- """This is a connected user."""
-
- def __init__(self):
- """Default values for the in-memory user variables."""
- self.address = ""
- self.last_address = ""
- self.connection = None
- self.authenticated = False
- self.password_tries = 1
- self.state = "entering_account_name"
- self.menu_seen = False
- self.error = ""
- self.input_queue = []
- self.output_queue = []
- self.partial_input = ""
- self.echoing = True
- self.avatar = None
- self.account = None
-
- def quit(self):
- """Log, close the connection and remove."""
- name = self.account.get("name")
- if name: message = "User " + name
- else: message = "An unnamed user"
- message += " logged out."
- log(message)
- self.connection.close()
- self.remove()
-
- def reload(self):
- """Save, load a new user and relocate the connection."""
-
- # get out of the list
- self.remove()
-
- # create a new user object
- new_user = User()
-
- # set everything else equivalent
- for attribute in [
- "address",
- "last_address",
- "connection",
- "authenticated",
- "password_tries",
- "state",
- "menu_seen",
- "error",
- "input_queue",
- "output_queue",
- "partial_input",
- "echoing",
- "avatar",
- "account"
- ]:
- exec("new_user." + attribute + " = self." + attribute)
-
- # add it to the list
- universe.userlist.append(new_user)
-
- # get rid of the old user object
- del(self)
-
- def replace_old_connections(self):
- """Disconnect active users with the same name."""
-
- # the default return value
- return_value = False
-
- # iterate over each user in the list
- for old_user in universe.userlist:
-
- # the name is the same but it's not us
- if old_user.account.get("name") == self.account.get("name") and old_user is not self:
-
- # make a note of it
- log("User " + self.account.get("name") + " reconnected--closing old connection to " + old_user.address + ".")
- old_user.send("$(eol)$(red)New connection from " + self.address + ". Terminating old connection...$(nrm)$(eol)")
- self.send("$(eol)$(red)Taking over old connection from " + old_user.address + ".$(nrm)")
-
- # close the old connection
- old_user.connection.close()
-
- # replace the old connection with this one
- old_user.connection = self.connection
- old_user.last_address = old_user.address
- old_user.address = self.address
- old_user.echoing = self.echoing
-
- # take this one out of the list and delete
- self.remove()
- del(self)
- return_value = True
- break
-
- # true if an old connection was replaced, false if not
- return return_value
-
- def authenticate(self):
- """Flag the user as authenticated and disconnect duplicates."""
- if not self.state is "authenticated":
- log("User " + self.account.get("name") + " logged in.")
- self.authenticated = True
-
- def show_menu(self):
- """Send the user their current menu."""
- if not self.menu_seen:
- self.menu_choices = get_menu_choices(self)
- self.send(get_menu(self.state, self.error, self.echoing, self.menu_choices), "")
- self.menu_seen = True
- self.error = False
- self.adjust_echoing()
-
- def adjust_echoing(self):
- """Adjust echoing to match state menu requirements."""
- if self.echoing and not menu_echo_on(self.state): self.echoing = False
- elif not self.echoing and menu_echo_on(self.state): self.echoing = True
-
- def remove(self):
- """Remove a user from the list of connected users."""
- universe.userlist.remove(self)
-
- def send(self, output, eol="$(eol)"):
- """Send arbitrary text to a connected user."""
-
- # only when there is actual output
- #if output:
-
- # start with a newline, append the message, then end
- # with the optional eol string passed to this function
- # and the ansi escape to return to normal text
- output = "\r\n" + output + eol + chr(27) + "[0m"
-
- # find and replace macros in the output
- output = replace_macros(self, output)
-
- # wrap the text at 80 characters
- # TODO: prompt user for preferred wrap width
- output = wrap_ansi_text(output, 80)
-
- # drop the formatted output into the output queue
- self.output_queue.append(output)
-
- # try to send the last item in the queue, remove it and
- # flag that menu display is not needed
- try:
- self.connection.send(self.output_queue[0])
- self.output_queue.remove(self.output_queue[0])
- self.menu_seen = False
-
- # but if we can't, that's okay too
- except:
- pass
-
- def pulse(self):
- """All the things to do to the user per increment."""
-
- # if the world is terminating, disconnect
- if universe.terminate_world:
- self.state = "disconnecting"
- self.menu_seen = False
-
- # show the user a menu as needed
- self.show_menu()
-
- # disconnect users with the appropriate state
- if self.state == "disconnecting":
- self.quit()
-
- # the user is unique and not flagged to disconnect
- else:
-
- # check for input and add it to the queue
- self.enqueue_input()
-
- # there is input waiting in the queue
- if self.input_queue: handle_user_input(self)
-
- def enqueue_input(self):
- """Process and enqueue any new input."""
-
- # check for some input
- try:
- input_data = self.connection.recv(1024)
- except:
- input_data = ""
-
- # we got something
- if input_data:
-
- # tack this on to any previous partial
- self.partial_input += input_data
-
- # separate multiple input lines
- new_input_lines = self.partial_input.split("\n")
-
- # if input doesn't end in a newline, replace the
- # held partial input with the last line of it
- if not self.partial_input.endswith("\n"):
- self.partial_input = new_input_lines.pop()
-
- # otherwise, chop off the extra null input and reset
- # the held partial input
- else:
- new_input_lines.pop()
- self.partial_input = ""
-
- # iterate over the remaining lines
- for line in new_input_lines:
-
- # filter out non-printables
- line = filter(lambda x: x>=' ' and x<='~', line)
-
- # strip off extra whitespace
- line = line.strip()
-
- # put on the end of the queue
- self.input_queue.append(line)
-
- def new_avatar(self):
- """Instantiate a new, unconfigured avatar for this user."""
- counter = universe.categories["internal"]["counters"].getint("next_avatar")
- while "avatar:" + repr(counter + 1) in universe.categories["actor"].keys(): counter += 1
- universe.categories["internal"]["counters"].set("next_avatar", counter + 1)
- self.avatar = Element("actor:avatar:" + repr(counter), universe)
- avatars = self.account.get("avatars").split()
- avatars.append(self.avatar.key)
- self.account.set("avatars", " ".join(avatars))
-
- def list_avatar_names(self):
- """A test function to list names of assigned avatars."""
- try:
- avatars = self.account.get("avatars").split()
- except:
- avatars = []
- avatar_names = []
- for avatar in avatars:
- avatar_names.append(universe.contents[avatar].get("name"))
- return avatar_names
-
-def broadcast(message):
- """Send a message to all connected users."""
- for each_user in universe.userlist: each_user.send("$(eol)" + message)
-
-def log(message):
- """Log a message."""
-
- # the time in posix log timestamp format
- timestamp = asctime()[4:19]
-
- # send the timestamp and message to standard output
- print(timestamp + " " + message)
+ u"""This is a connected user."""
+
+ def __init__(self):
+ u"""Default values for the in-memory user variables."""
+ self.account = None
+ self.address = u""
+ self.authenticated = False
+ self.avatar = None
+ self.client_displays_binary = False
+ self.client_sends_binary = False
+ self.columns = 79
+ self.connection = None
+ self.echoing = True
+ self.error = u""
+ self.input_queue = []
+ self.last_address = u""
+ self.last_input = universe.get_time()
+ self.menu_choices = {}
+ self.menu_seen = False
+ self.negotiation_pause = 0
+ self.output_queue = []
+ self.partial_input = ""
+ self.password_tries = 0
+ self.received_newline = True
+ self.state = u"initial"
+ self.terminator = telnet_proto([u"IAC",u"GA"])
+
+ def quit(self):
+ u"""Log, close the connection and remove."""
+ if self.account: name = self.account.get(u"name")
+ else: name = u""
+ if name: message = u"User " + name
+ else: message = u"An unnamed user"
+ message += u" logged out."
+ log(message, 2)
+ self.deactivate_avatar()
+ self.connection.close()
+ self.remove()
+
+ def check_idle(self):
+ u"""Warn or disconnect idle users as appropriate."""
+ idletime = universe.get_time() - self.last_input
+ linkdead_dict = universe.categories[u"internal"][u"time"].getdict(
+ u"linkdead"
+ )
+ if self.state in linkdead_dict: linkdead_state = self.state
+ else: linkdead_state = u"default"
+ if idletime > linkdead_dict[linkdead_state]:
+ self.send(
+ u"$(eol)$(red)You've done nothing for far too long... goodbye!" \
+ + u"$(nrm)$(eol)",
+ flush=True,
+ add_prompt=False
+ )
+ logline = u"Disconnecting "
+ if self.account and self.account.get(u"name"):
+ logline += self.account.get(u"name")
+ else:
+ logline += u"an unknown user"
+ logline += u" after idling too long in a " + self.state + u" state."
+ log(logline, 2)
+ self.state = u"disconnecting"
+ self.menu_seen = False
+ idle_dict = universe.categories[u"internal"][u"time"].getdict(u"idle")
+ if self.state in idle_dict: idle_state = self.state
+ else: idle_state = u"default"
+ if idletime == idle_dict[idle_state]:
+ self.send(
+ u"$(eol)$(red)If you continue to be unproductive, " \
+ + u"you'll be shown the door...$(nrm)$(eol)"
+ )
+
+ def reload(self):
+ u"""Save, load a new user and relocate the connection."""
+
+ # get out of the list
+ self.remove()
+
+ # create a new user object
+ new_user = User()
+
+ # set everything equivalent
+ for attribute in vars(self).keys():
+ exec(u"new_user." + attribute + u" = self." + attribute)
+
+ # the avatar needs a new owner
+ if new_user.avatar: new_user.avatar.owner = new_user
+
+ # add it to the list
+ universe.userlist.append(new_user)
+
+ # get rid of the old user object
+ del(self)
+
+ def replace_old_connections(self):
+ u"""Disconnect active users with the same name."""
+
+ # the default return value
+ return_value = False
+
+ # iterate over each user in the list
+ for old_user in universe.userlist:
+
+ # the name is the same but it's not us
+ if hasattr(
+ old_user, u"account"
+ ) and old_user.account and old_user.account.get(
+ u"name"
+ ) == self.account.get(
+ u"name"
+ ) and old_user is not self:
+
+ # make a note of it
+ log(
+ u"User " + self.account.get(
+ u"name"
+ ) + u" reconnected--closing old connection to " \
+ + old_user.address + u".",
+ 2
+ )
+ old_user.send(
+ u"$(eol)$(red)New connection from " + self.address \
+ + u". Terminating old connection...$(nrm)$(eol)", \
+ flush=True,
+ add_prompt=False
+ )
+
+ # close the old connection
+ old_user.connection.close()
+
+ # replace the old connection with this one
+ old_user.send(
+ u"$(eol)$(red)Taking over old connection from " \
+ + old_user.address + u".$(nrm)"
+ )
+ old_user.connection = self.connection
+ old_user.last_address = old_user.address
+ old_user.address = self.address
+ old_user.client_displays_binary = self.client_displays_binary
+ old_user.client_sends_binary = self.client_sends_binary
+
+ # may need to tell the new connection to echo
+ if old_user.echoing:
+ old_user.send(
+ get_echo_sequence(old_user.state, self.echoing), raw=True
+ )
+
+ # take this one out of the list and delete
+ self.remove()
+ del(self)
+ return_value = True
+ break
+
+ # true if an old connection was replaced, false if not
+ return return_value
+
+ def authenticate(self):
+ u"""Flag the user as authenticated and disconnect duplicates."""
+ if not self.state is u"authenticated":
+ log(u"User " + self.account.get(u"name") + u" logged in.", 2)
+ self.authenticated = True
+ if self.account.subkey in universe.categories[
+ u"internal"
+ ][
+ u"limits"
+ ].getlist(
+ u"default_admins"
+ ):
+ self.account.set(u"administrator", u"True")
+
+ def show_menu(self):
+ u"""Send the user their current menu."""
+ if not self.menu_seen:
+ self.menu_choices = get_menu_choices(self)
+ self.send(
+ get_menu(self.state, self.error, self.menu_choices),
+ u"",
+ add_terminator=True
+ )
+ self.menu_seen = True
+ self.error = False
+ self.adjust_echoing()
+
+ def adjust_echoing(self):
+ u"""Adjust echoing to match state menu requirements."""
+ if self.echoing and not menu_echo_on(self.state): self.echoing = False
+ elif not self.echoing and menu_echo_on(self.state): self.echoing = True
+
+ def remove(self):
+ u"""Remove a user from the list of connected users."""
+ universe.userlist.remove(self)
+
+ def send(
+ self,
+ output,
+ eol=u"$(eol)",
+ raw=False,
+ flush=False,
+ add_prompt=True,
+ just_prompt=False,
+ add_terminator=False
+ ):
+ u"""Send arbitrary text to a connected user."""
+
+ # unless raw mode is on, clean it up all nice and pretty
+ if not raw:
+
+ # strip extra $(eol) off if present
+ while output.startswith(u"$(eol)"): output = output[6:]
+ while output.endswith(u"$(eol)"): output = output[:-6]
+ extra_lines = output.find(u"$(eol)$(eol)$(eol)")
+ while extra_lines > -1:
+ output = output[:extra_lines] + output[extra_lines+6:]
+ extra_lines = output.find(u"$(eol)$(eol)$(eol)")
+
+ # start with a newline, append the message, then end
+ # with the optional eol string passed to this function
+ # and the ansi escape to return to normal text
+ if not just_prompt:
+ if not self.output_queue or not self.output_queue[-1].endswith(
+ "\r\n"
+ ):
+ output = u"$(eol)$(eol)" + output
+ elif not self.output_queue[-1].endswith(
+ "\r\n\x1b[0m\r\n"
+ ) and not self.output_queue[-1].endswith(
+ "\r\n\r\n"
+ ):
+ output = u"$(eol)" + output
+ output += eol + unichr(27) + u"[0m"
+
+ # tack on a prompt if active
+ if self.state == u"active":
+ if not just_prompt: output += u"$(eol)"
+ if add_prompt:
+ output += u"> "
+ mode = self.avatar.get(u"mode")
+ if mode: output += u"(" + mode + u") "
+
+ # find and replace macros in the output
+ output = replace_macros(self, output)
+
+ # wrap the text at the client's width (min 40, 0 disables)
+ if self.columns:
+ if self.columns < 40: wrap = 40
+ else: wrap = self.columns
+ output = wrap_ansi_text(output, wrap)
+
+ # if supported by the client, encode it utf-8
+ if self.client_displays_binary:
+ encoded_output = output.encode(u"utf-8")
+
+ # otherwise just send ascii
+ else:
+ encoded_output = output.encode(u"ascii", u"replace")
+
+ # inject appropriate echoing changes
+ encoded_output += get_echo_sequence(self.state, self.echoing)
+
+ # end with a terminator if requested
+ if add_terminator: encoded_output += self.terminator
+
+ # and tack it onto the queue
+ self.output_queue.append(encoded_output)
+
+ # if this is urgent, flush all pending output
+ if flush: self.flush()
+
+ # just dump raw bytes as requested
+ else:
+ self.output_queue.append(output)
+ self.flush()
+
+ def pulse(self):
+ u"""All the things to do to the user per increment."""
+
+ # if the world is terminating, disconnect
+ if universe.terminate_flag:
+ self.state = u"disconnecting"
+ self.menu_seen = False
+
+ # check for an idle connection and act appropriately
+ else: self.check_idle()
+
+ # if output is paused, decrement the counter
+ if self.state == u"initial":
+ if self.negotiation_pause: self.negotiation_pause -= 1
+ else: self.state = u"entering_account_name"
+
+ # show the user a menu as needed
+ elif not self.state == u"active": self.show_menu()
+
+ # flush any pending output in the queue
+ self.flush()
+
+ # disconnect users with the appropriate state
+ if self.state == u"disconnecting": self.quit()
+
+ # check for input and add it to the queue
+ self.enqueue_input()
+
+ # there is input waiting in the queue
+ if self.input_queue:
+ handle_user_input(self)
+
+ def flush(self):
+ u"""Try to send the last item in the queue and remove it."""
+ if self.output_queue:
+ if self.received_newline:
+ self.received_newline = False
+ if self.output_queue[0].startswith("\r\n"):
+ self.output_queue[0] = self.output_queue[0][2:]
+ try:
+ self.connection.send(self.output_queue[0])
+ del self.output_queue[0]
+ except:
+ if self.account and self.account.get(u"name"):
+ account = self.account.get(u"name")
+ else: account = u"an unknown user"
+ log(
+ u"Sending to " + account \
+ + u" raised an exception (broken pipe?)."
+ )
+ pass
+
+
+ def enqueue_input(self):
+ u"""Process and enqueue any new input."""
+ import unicodedata
+
+ # check for some input
+ try:
+ raw_input = self.connection.recv(1024)
+ except:
+ raw_input = ""
+
+ # we got something
+ if raw_input:
+
+ # tack this on to any previous partial
+ self.partial_input += raw_input
+
+ # reply to and remove any IAC negotiation codes
+ self.negotiate_telnet_options()
+
+ # separate multiple input lines
+ new_input_lines = self.partial_input.split("\n")
+
+ # if input doesn't end in a newline, replace the
+ # held partial input with the last line of it
+ if not self.partial_input.endswith("\n"):
+ self.partial_input = new_input_lines.pop()
+
+ # otherwise, chop off the extra null input and reset
+ # the held partial input
+ else:
+ new_input_lines.pop()
+ self.partial_input = ""
+
+ # iterate over the remaining lines
+ for line in new_input_lines:
+
+ # strip off extra whitespace
+ line = line.strip()
+
+ # make sure it's valid unicode (probably no longer needed)
+ try: unicode(line, u"utf-8")
+ except UnicodeDecodeError:
+ logline = u"Non-unicode data from "
+ if self.account and self.account.get(u"name"):
+ logline += self.account.get(u"name") + u": "
+ else: logline += u"unknown user: "
+ logline += repr(line)
+ log(logline, 4)
+ line = ""
+
+ # log non-printable characters remaining
+ if not hasattr(
+ self, u"client_sends_binary"
+ ) or not self.client_sends_binary:
+ asciiline = filter(lambda x: " " <= x <= "~", line)
+ if line != asciiline:
+ logline = u"Non-ASCII characters from "
+ if self.account and self.account.get(u"name"):
+ logline += self.account.get(u"name") + u": "
+ else: logline += u"unknown user: "
+ logline += repr(line)
+ log(logline, 4)
+ line = asciiline
+
+ # put on the end of the queue
+ self.input_queue.append(
+ unicodedata.normalize( u"NFKC", unicode(line, u"utf-8") )
+ )
+
+ def negotiate_telnet_options(self):
+ u"""Reply to/remove partial_input telnet negotiation options."""
+
+ # start at the begining of the input
+ position = 0
+
+ # make a local copy to play with
+ text = self.partial_input
+
+ # as long as we haven't checked it all
+ while position < len(text):
+
+ # jump to the first IAC you find
+ position = text.find(telnet_proto([u"IAC"]), position)
+
+ # if there wasn't an IAC in the input, we're done
+ if position < 0: break
+
+ # replace a double (literal) IAC if there's an LF later
+ elif len(text) > position+1 and text[position+1] == telnet_proto(
+ [u"IAC"]
+ ):
+ if text.find("\n", position) > 0:
+ text = text.replace(
+ telnet_proto([u"IAC",u"IAC"]), telnet_proto([u"IAC"])
+ )
+ else: position += 1
+ position += 1
+
+ # implement an RFC 1143 option negotiation queue here
+ elif len(text) > position+2 and text[position+1] in (
+ telnet_proto([u"DO",u"DONT",u"WILL",u"WONT"])
+ ):
+ negotiation = text[position+1:position+3]
+
+ # if we turned echo off, ignore the confirmation
+ if not self.echoing and negotiation == telnet_proto(
+ [u"DO",u"TELOPT_ECHO"]
+ ):
+ self.send(
+ telnet_proto([u"IAC",u"WILL",u"TELOPT_ECHO"]), raw=True
+ )
+
+ # BINARY mode handling for unicode support (RFC 856)
+ elif negotiation == telnet_proto([u"DO",u"TELOPT_BINARY"]):
+ self.send(
+ telnet_proto([u"IAC",u"WILL",u"TELOPT_BINARY"]), raw=True
+ )
+ self.client_displays_binary = True
+ elif negotiation == telnet_proto([u"DONT",u"TELOPT_BINARY"]):
+ self.send(
+ telnet_proto([u"IAC",u"WONT",u"TELOPT_BINARY"]), raw=True
+ )
+ self.client_displays_binary = False
+ elif negotiation == telnet_proto([u"WILL",u"TELOPT_BINARY"]):
+ self.send(
+ telnet_proto([u"IAC",u"DO",u"TELOPT_BINARY"]), raw=True
+ )
+ self.client_sends_binary = True
+ elif negotiation == telnet_proto([u"WONT",u"TELOPT_BINARY"]):
+ self.send(
+ telnet_proto([u"IAC",u"DONT",u"TELOPT_BINARY"]), raw=True
+ )
+ self.client_sends_binary = False
+
+ # allow LINEMODE (RFC 1184)
+ elif negotiation == telnet_proto([u"WILL",u"TELOPT_LINEMODE"]):
+ self.send(
+ telnet_proto([u"IAC",u"DO",u"TELOPT_LINEMODE"]), raw=True
+ )
+ elif negotiation == telnet_proto([u"WONT",u"TELOPT_LINEMODE"]):
+ self.send(
+ telnet_proto([u"IAC",u"DONT",u"TELOPT_LINEMODE"]), raw=True
+ )
+
+ # allow NAWS (RFC 1073)
+ elif negotiation == telnet_proto([u"WILL",u"TELOPT_NAWS"]):
+ self.send(
+ telnet_proto([u"IAC",u"DO",u"TELOPT_NAWS"]), raw=True
+ )
+ elif negotiation == telnet_proto([u"WONT",u"TELOPT_NAWS"]):
+ self.send(
+ telnet_proto([u"IAC",u"DONT",u"TELOPT_NAWS"]), raw=True
+ )
+
+ # if the client likes EOR (RFC 885) instead of GA, note it
+ elif negotiation == telnet_proto([u"DO",u"TELOPT_EOR"]):
+ self.send(
+ telnet_proto([u"IAC",u"WILL",u"TELOPT_EOR"]), raw=True
+ )
+ self.terminator = telnet_proto([u"IAC",u"EOR"])
+ elif negotiation == telnet_proto([u"DONT",u"TELOPT_EOR"]):
+ self.send(
+ telnet_proto([u"IAC",u"WONT",u"TELOPT_EOR"]), raw=True
+ )
+ if self.terminator == telnet_proto([u"IAC",u"EOR"]):
+ self.terminator = telnet_proto([u"IAC",u"GA"])
+
+ # if the client doesn't want GA, oblige (RFC 858)
+ elif negotiation == telnet_proto([u"DO",u"TELOPT_SGA"]):
+ self.send(telnet_proto([u"IAC",u"WILL",u"TELOPT_SGA"]),
+ raw=True)
+ if self.terminator == telnet_proto([u"IAC",u"GA"]):
+ self.terminator = ""
+
+ # we don't want to allow anything else
+ elif text[position+1] == telnet_proto([u"DO"]):
+ self.send(
+ telnet_proto([u"IAC",u"WONT"])+text[position+2], raw=True
+ )
+ elif text[position+1] == telnet_proto([u"WILL"]):
+ self.send(
+ telnet_proto([u"IAC",u"DONT"])+text[position+2], raw=True
+ )
+
+ # strip the negotiation from the input
+ text = text.replace(text[position:position+3], "")
+
+ # subnegotiation options
+ elif len(text) > position+4 and text[
+ position:position+2
+ ] == telnet_proto([u"IAC",u"SB"]):
+ if text[position+2] == telnet_proto([u"TELOPT_NAWS"]):
+ self.columns = ord(text[position+3])*256+ord(text[position+4])
+ end_subnegotiation = text.find(
+ telnet_proto([u"IAC",u"SE"]), position
+ )
+ if end_subnegotiation > 0:
+ text = text[:position] + text[end_subnegotiation+2:]
+ else: position += 1
+
+ # otherwise, strip out a two-byte IAC command
+ elif len(text) > position+2:
+ text = text.replace(text[position:position+2], "")
+
+ # and this means we got the begining of an IAC
+ else: position += 1
+
+ # replace the input with our cleaned-up text
+ self.partial_input = text
+
+ def new_avatar(self):
+ u"""Instantiate a new, unconfigured avatar for this user."""
+ counter = 0
+ while u"avatar:" + self.account.get(u"name") + u":" + unicode(
+ counter
+ ) in universe.categories[u"actor"].keys():
+ counter += 1
+ self.avatar = Element(
+ u"actor:avatar:" + self.account.get(u"name") + u":" + unicode(
+ counter
+ ),
+ universe
+ )
+ self.avatar.append(u"inherit", u"archetype:avatar")
+ self.account.append(u"avatars", self.avatar.key)
+
+ def delete_avatar(self, avatar):
+ u"""Remove an avatar from the world and from the user's list."""
+ if self.avatar is universe.contents[avatar]: self.avatar = None
+ universe.contents[avatar].destroy()
+ avatars = self.account.getlist(u"avatars")
+ avatars.remove(avatar)
+ self.account.set(u"avatars", avatars)
+
+ def activate_avatar_by_index(self, index):
+ u"""Enter the world with a particular indexed avatar."""
+ self.avatar = universe.contents[self.account.getlist(u"avatars")[index]]
+ self.avatar.owner = self
+ self.state = u"active"
+ self.avatar.go_home()
+
+ def deactivate_avatar(self):
+ u"""Have the active avatar leave the world."""
+ if self.avatar:
+ current = self.avatar.get(u"location")
+ if current:
+ self.avatar.set(u"default_location", current)
+ self.avatar.echo_to_location(
+ u"You suddenly wonder where " + self.avatar.get(
+ u"name"
+ ) + u" went."
+ )
+ del universe.contents[current].contents[self.avatar.key]
+ self.avatar.remove_facet(u"location")
+ self.avatar.owner = None
+ self.avatar = None
+
+ def destroy(self):
+ u"""Destroy the user and associated avatars."""
+ for avatar in self.account.getlist(u"avatars"):
+ self.delete_avatar(avatar)
+ self.account.destroy()
+
+ def list_avatar_names(self):
+ u"""List names of assigned avatars."""
+ return [
+ universe.contents[avatar].get(
+ u"name"
+ ) for avatar in self.account.getlist(u"avatars")
+ ]
+
+def makelist(value):
+ u"""Turn string into list type."""
+ if value[0] + value[-1] == u"[]": return eval(value)
+ else: return [ value ]
+
+def makedict(value):
+ u"""Turn string into dict type."""
+ if value[0] + value[-1] == u"{}": return eval(value)
+ elif value.find(u":") > 0: return eval(u"{" + value + u"}")
+ else: return { value: None }
+
+def telnet_proto(arguments):
+ u"""Return a concatenated series of Telnet protocol commands."""
+
+ # same names as bsd's arpa/telnet.h (telnetlib's are ambiguous)
+ telnet_commands = {
+ u"TELOPT_BINARY": 0, # RFC 856
+ u"TELOPT_ECHO": 1, # RFC 857
+ u"TELOPT_SGA": 3, # RFC 858
+ u"TELOPT_EOR": 25, # RFC 885
+ u"TELOPT_NAWS": 31, # RFC 1073
+ u"TELOPT_LINEMODE": 34, # RFC 1184
+ u"EOR": 239,
+ u"SE": 240,
+ u"GA": 249,
+ u"SB": 250,
+ u"WILL": 251,
+ u"WONT": 252,
+ u"DO": 253,
+ u"DONT": 254,
+ u"IAC": 255
+ }
+
+ # this will need to be a byte type during 2to3 migration
+ command_series = ""
+ for argument in arguments:
+ command_series += chr(telnet_commands[argument])
+ return command_series
+
+def broadcast(message, add_prompt=True):
+ u"""Send a message to all connected users."""
+ for each_user in universe.userlist:
+ each_user.send(u"$(eol)" + message, add_prompt=add_prompt)
+
+def log(message, level=0):
+ u"""Log a message."""
+ import codecs, os.path, syslog, time
+
+ # a couple references we need
+ file_name = universe.categories[u"internal"][u"logging"].get(u"file")
+ max_log_lines = universe.categories[u"internal"][u"logging"].getint(
+ u"max_log_lines"
+ )
+ syslog_name = universe.categories[u"internal"][u"logging"].get(u"syslog")
+ timestamp = time.asctime()[4:19]
+
+ # turn the message into a list of lines
+ lines = filter(
+ lambda x: x!=u"", [ (x.rstrip()) for x in message.split(u"\n") ]
+ )
+
+ # send the timestamp and line to a file
+ if file_name:
+ if not os.path.isabs(file_name):
+ file_name = os.path.join(universe.startdir, file_name)
+ file_descriptor = codecs.open(file_name, u"a", u"utf-8")
+ for line in lines: file_descriptor.write(timestamp + u" " + line + u"\n")
+ file_descriptor.flush()
+ file_descriptor.close()
+
+ # send the timestamp and line to standard output
+ if universe.categories[u"internal"][u"logging"].getboolean(u"stdout"):
+ for line in lines: print(timestamp + u" " + line)
+
+ # send the line to the system log
+ if syslog_name:
+ syslog.openlog(
+ syslog_name.encode("utf-8"),
+ syslog.LOG_PID,
+ syslog.LOG_INFO | syslog.LOG_DAEMON
+ )
+ for line in lines: syslog.syslog(line)
+ syslog.closelog()
+
+ # display to connected administrators
+ for user in universe.userlist:
+ if user.state == u"active" and user.account.getboolean(
+ u"administrator"
+ ) and user.account.getint(u"loglevel") <= level:
+ # iterate over every line in the message
+ full_message = u""
+ for line in lines:
+ full_message += u"$(bld)$(red)" + timestamp + u" " + line.replace(
+ u"$(", u"$_("
+ ) + u"$(nrm)$(eol)"
+ user.send(full_message, flush=True)
+
+ # add to the recent log list
+ for line in lines:
+ while 0 < len(universe.loglines) >= max_log_lines:
+ del universe.loglines[0]
+ universe.loglines.append((level, timestamp + u" " + line))
+
+def get_loglines(level, start, stop):
+ u"""Return a specific range of loglines filtered by level."""
+
+ # filter the log lines
+ loglines = filter(lambda x: x[0]>=level, universe.loglines)
+
+ # we need these in several places
+ total_count = unicode(len(universe.loglines))
+ filtered_count = len(loglines)
+
+ # don't proceed if there are no lines
+ if filtered_count:
+
+ # can't start before the begining or at the end
+ if start > filtered_count: start = filtered_count
+ if start < 1: start = 1
+
+ # can't stop before we start
+ if stop > start: stop = start
+ elif stop < 1: stop = 1
+
+ # some preamble
+ message = u"There are " + unicode(total_count)
+ message += u" log lines in memory and " + unicode(filtered_count)
+ message += u" at or above level " + unicode(level) + u"."
+ message += u" The matching lines from " + unicode(stop) + u" to "
+ message += unicode(start) + u" are:$(eol)$(eol)"
+
+ # add the text from the selected lines
+ if stop > 1: range_lines = loglines[-start:-(stop-1)]
+ else: range_lines = loglines[-start:]
+ for line in range_lines:
+ message += u" (" + unicode(line[0]) + u") " + line[1].replace(
+ u"$(", u"$_("
+ ) + u"$(eol)"
+
+ # there were no lines
+ else:
+ message = u"None of the " + unicode(total_count)
+ message += u" lines in memory matches your request."
+
+ # pass it back
+ return message
+
+def glyph_columns(character):
+ u"""Convenience function to return the column width of a glyph."""
+ import unicodedata
+ if unicodedata.east_asian_width(character) in u"FW": return 2
+ else: return 1