"""Miscellaneous functions for the mudpy engine."""
-# Copyright (c) 2004-2017 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2004-2018 Jeremy Stanley <fungi@yuggoth.org>. Permission
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
def reload(self):
"""Create a new element and replace this one."""
- Element(self.key, self.universe, self.origin)
- del(self)
+ args = (self.key, self.universe, self.origin)
+ self.destroy()
+ Element(*args)
def destroy(self):
"""Remove an element from the universe and destroy it."""
self.startdir = os.getcwd()
self.terminate_flag = False
self.userlist = []
+ self.versions = None
if not filename:
possible_filenames = [
"etc/mudpy.yaml",
# it's possible for this to enter before logging configuration is read
pending_loglines = []
- # the files dict must exist and filename needs to be read-only
- if not hasattr(
- self, "files"
- ) or not (
- self.filename in self.files and self.files[
- self.filename
- ].is_writeable()
- ):
-
- # clear out all read-only files
- if hasattr(self, "files"):
- for data_filename in list(self.files.keys()):
- if not self.files[data_filename].is_writeable():
- del self.files[data_filename]
-
- # start loading from the initial file
- mudpy.data.Data(self.filename, self)
+ # start populating the (re)files dict from the base config
+ self.files = {}
+ mudpy.data.Data(self.filename, self)
# load default storage locations for groups
if hasattr(self, "contents") and "mudpy.filing" in self.contents:
if user.avatar in inactive_avatars:
inactive_avatars.remove(user.avatar)
- # go through all elements to clear out inactive avatar locations
- for element in self.contents.values():
- area = element.get("location")
- if element in inactive_avatars and area:
- if area in self.contents and element.key in self.contents[
- area
- ].contents:
- del self.contents[area].contents[element.key]
- element.set("default_location", area)
- element.remove_facet("location")
-
# another pass to straighten out all the element contents
for element in self.contents.values():
element.update_location()
self.output_queue = []
self.partial_input = b""
self.password_tries = 0
- self.state = "initial"
+ self.state = "telopt_negotiation"
self.telopts = {}
def quit(self):
def reload(self):
"""Save, load a new user and relocate the connection."""
+ # copy old attributes
+ attributes = self.__dict__
+
# get out of the list
self.remove()
+ # get rid of the old user object
+ del(self)
+
# create a new user object
new_user = User()
# set everything equivalent
- for attribute in vars(self).keys():
- exec("new_user." + attribute + " = self." + attribute)
+ new_user.__dict__ = attributes
# the avatar needs a new owner
if new_user.avatar:
+ new_user.account = universe.contents[new_user.account.key]
+ new_user.avatar = universe.contents[new_user.avatar.key]
new_user.avatar.owner = new_user
# add it to the list
universe.userlist.append(new_user)
- # get rid of the old user object
- del(self)
-
def replace_old_connections(self):
"""Disconnect active users with the same name."""
self.check_idle()
# if output is paused, decrement the counter
- if self.state == "initial":
+ if self.state == "telopt_negotiation":
if self.negotiation_pause:
self.negotiation_pause -= 1
else:
if self.output_queue:
try:
self.connection.send(self.output_queue[0])
- except BrokenPipeError:
+ except (BrokenPipeError, ConnectionResetError):
if self.account and self.account.get("name"):
account = self.account.get("name")
else:
account = "an unknown user"
self.state = "disconnecting"
- log("Broken pipe sending to %s." % account, 7)
+ log("Disconnected while sending to %s." % account, 7)
del self.output_queue[0]
def enqueue_input(self):
line = line.strip()
# log non-printable characters remaining
- if mudpy.telnet.is_enabled(self, mudpy.telnet.TELOPT_BINARY,
- mudpy.telnet.HIM):
+ if not mudpy.telnet.is_enabled(
+ self, mudpy.telnet.TELOPT_BINARY, mudpy.telnet.HIM):
asciiline = bytes([x for x in line if 32 <= x <= 126])
if line != asciiline:
logline = "Non-ASCII characters from "
try:
line = line.decode("utf-8")
except UnicodeDecodeError:
- logline = "Non-UTF-8 characters from "
+ logline = "Non-UTF-8 sequence from "
if self.account and self.account.get("name"):
logline += self.account.get("name") + ": "
else:
# ignoring color escape sequences
rel_pos = 0
- # the absolute position of the most recent whitespace character
- last_whitespace = 0
+ # the absolute and relative positions of the most recent whitespace
+ # character
+ last_abs_whitespace = 0
+ last_rel_whitespace = 0
# whether the current character is part of a color escape sequence
escape = False
# the current character is the escape character
if each_character == "\x1b" and not escape:
escape = True
+ rel_pos -= 1
# the current character is within an escape sequence
elif escape:
-
- # the current character is m, which terminates the
- # escape sequence
+ rel_pos -= 1
if each_character == "m":
+ # the current character is m, which terminates the
+ # escape sequence
escape = False
+ # the current character is a space
+ elif each_character == " ":
+ last_abs_whitespace = abs_pos
+ last_rel_whitespace = rel_pos
+
# the current character is a newline, so reset the relative
- # position (start a new line)
+ # position too (start a new line)
elif each_character == "\n":
rel_pos = 0
- last_whitespace = abs_pos
-
- # the current character meets the requested maximum line width,
- # so we need to backtrack and find a space at which to wrap;
- # special care is taken to avoid an off-by-one in case the
- # current character is a double-width glyph
- elif each_character != "\r" and (
- rel_pos >= width or (
- rel_pos >= width - 1 and glyph_columns(
- each_character
- ) == 2
- )
- ):
+ last_abs_whitespace = abs_pos
+ last_rel_whitespace = rel_pos
- # it's always possible we landed on whitespace
- if unicodedata.category(each_character) in ("Cc", "Zs"):
- last_whitespace = abs_pos
+ # the current character meets the requested maximum line width, so we
+ # need to wrap unless the current word is wider than the terminal (in
+ # which case we let it do the wrapping instead)
+ if last_rel_whitespace != 0 and (rel_pos > width or (
+ rel_pos > width - 1 and glyph_columns(each_character) == 2)):
- # insert an eol in place of the space
- text = text[:last_whitespace] + "\r\n" + text[last_whitespace + 1:]
+ # insert an eol in place of the last space
+ text = (text[:last_abs_whitespace] + "\r\n" +
+ text[last_abs_whitespace + 1:])
# increase the absolute position because an eol is two
# characters but the space it replaced was only one
# now we're at the begining of a new line, plus the
# number of characters wrapped from the previous line
- rel_pos = 0
- for remaining_characters in text[last_whitespace:abs_pos]:
- rel_pos += glyph_columns(remaining_characters)
+ rel_pos -= last_rel_whitespace
+ last_rel_whitespace = 0
# as long as the character is not a carriage return and the
# other above conditions haven't been met, count it as a
# printable character
elif each_character != "\r":
rel_pos += glyph_columns(each_character)
- if unicodedata.category(each_character) in ("Cc", "Zs"):
- last_whitespace = abs_pos
+ if each_character in (" ", "\n"):
+ last_abs_whitespace = abs_pos
+ last_rel_whitespace = rel_pos
# increase the absolute position for every character
abs_pos += 1
def reload_data():
"""Reload all relevant objects."""
- for user in universe.userlist[:]:
- user.reload()
- for element in universe.contents.values():
- if element.origin.is_writeable():
- element.reload()
+ universe.save()
+ old_userlist = universe.userlist[:]
+ for element in list(universe.contents.values()):
+ element.destroy()
universe.load()
+ for user in old_userlist:
+ user.reload()
def check_for_connection(listening_socket):
arguments = parameters.split()
if not parameters:
message = "What do you want to show?"
+ elif arguments[0] == "version":
+ message = repr(universe.versions)
elif arguments[0] == "time":
message = universe.groups["internal"]["counters"].get(
"elapsed"
log(*logline)
universe.setup_loglines = []
- # log an initial message
- log("Started mudpy with command line: " + " ".join(sys.argv))
-
# fork and disassociate
daemonize(universe)
# make the pidfile
create_pidfile(universe)
+ # load and store diagnostic info
+ universe.versions = mudpy.version.Versions("mudpy")
+
+ # log startup diagnostic messages
+ log("On %s at %s" % (universe.versions.python_version, sys.executable), 1)
+ log("Import path: %s" % ", ".join(sys.path), 1)
+ log("Installed dependencies: %s" % universe.versions.dependencies_text, 1)
+ log("Other python packages: %s" % universe.versions.environment_text, 1)
+ log("Started %s with command line: %s" % (
+ universe.versions.version, " ".join(sys.argv)), 1)
+
# pass the initialized universe back
return universe