"""Miscellaneous functions for the mudpy engine.""" # Copyright (c) 2004-2017 Jeremy Stanley . Permission # to use, copy, modify, and distribute this software is granted under # terms provided in the LICENSE file distributed with this software. import codecs import os import random import re import signal import socket import sys import syslog import time import traceback import unicodedata import mudpy class Element: """An element of the universe.""" def __init__(self, key, universe, filename=None, old_style=False): """Set up a new element.""" # TODO(fungi): This can be removed after the transition is complete self.old_style = old_style # keep track of our key name self.key = key # keep track of what universe it's loading into self.universe = universe # clone attributes if this is replacing another element if self.old_style and self.key in self.universe.contents: old_element = self.universe.contents[self.key] for attribute in vars(old_element).keys(): exec("self." + attribute + " = old_element." + attribute) if self.owner: self.owner.avatar = self # i guess this is a new element then else: # set of facet keys from the universe self.facethash = dict() # not owned by a user by default (used for avatars) self.owner = None # no contents in here by default self.contents = {} # parse out appropriate category and subkey names, add to list if self.key.find(":") > 0: self.category, self.subkey = self.key.split(":", 1) else: self.category = "other" self.subkey = self.key if self.category not in self.universe.categories: self.category = "other" self.subkey = self.key # get an appropriate filename for the origin if not filename: filename = self.universe.default_origins[self.category] if not os.path.isabs(filename): filename = os.path.abspath(filename) # add the file if it doesn't exist yet if filename not in self.universe.files: mudpy.data.DataFile(filename, self.universe) # record or reset a pointer to the origin file self.origin = self.universe.files[filename] # add a data section to the origin if necessary if self.key not in self.origin.data: self.origin.data[self.key] = {} # add or replace this element in the universe self.universe.contents[self.key] = self self.universe.categories[self.category][self.subkey] = self def reload(self): """Create a new element and replace this one.""" Element(self.key, self.universe, self.origin.filename, old_style=self.old_style) del(self) def destroy(self): """Remove an element from the universe and destroy it.""" del(self.origin.data[self.key]) del self.universe.categories[self.category][self.subkey] del self.universe.contents[self.key] del self def facets(self): """Return a list of non-inherited facets for this element.""" if self.old_style: try: return self.origin.data[self.key].keys() except (AttributeError, KeyError): return [] else: return self.facethash def has_facet(self, facet): """Return whether the non-inherited facet exists.""" return facet in self.facets() def remove_facet(self, facet): """Remove a facet from the element.""" if self.has_facet(facet): del(self.origin.data[self.key][facet]) self.origin.modified = True def ancestry(self): """Return a list of the element's inheritance lineage.""" if self.has_facet("inherit"): ancestry = self.get("inherit") if not ancestry: ancestry = [] for parent in ancestry[:]: ancestors = self.universe.contents[parent].ancestry() for ancestor in ancestors: if ancestor not in ancestry: ancestry.append(ancestor) return ancestry else: return [] def get(self, facet, default=None): """Retrieve values.""" if default is None: default = "" try: if self.old_style: return self.origin.data[self.key][facet] else: return self.origin.data[".".join((self.key, facet))] except (KeyError, TypeError): pass if self.has_facet("inherit"): for ancestor in self.ancestry(): if self.universe.contents[ancestor].has_facet(facet): return self.universe.contents[ancestor].get(facet) else: return default def set(self, facet, value): """Set values.""" if facet in ["loglevel"]: value = int(value) if not self.has_facet(facet) or not self.get(facet) == value: if self.old_style: if self.key not in self.origin.data: self.origin.data[self.key] = {} self.origin.data[self.key][facet] = value else: node = ".".join((self.key, facet)) self.origin.data[node] = value self.facethash[facet] = self.origin.data[node] self.origin.modified = True def append(self, facet, value): """Append value to a list.""" newlist = self.get(facet) if not newlist: newlist = [] if type(newlist) is not list: newlist = list(newlist) newlist.append(value) self.set(facet, newlist) def send( self, message, eol="$(eol)", raw=False, flush=False, add_prompt=True, just_prompt=False, add_terminator=False, prepend_padding=True ): """Convenience method to pass messages to an owner.""" if self.owner: self.owner.send( message, eol, raw, flush, add_prompt, just_prompt, add_terminator, prepend_padding ) def can_run(self, command): """Check if the user can run this command object.""" # has to be in the commands category if command not in self.universe.categories["command"].values(): result = False # avatars of administrators can run any command elif self.owner and self.owner.account.get("administrator"): result = True # everyone can run non-administrative commands elif not command.get("administrative"): result = True # otherwise the command cannot be run by this actor else: result = False # pass back the result return result def update_location(self): """Make sure the location's contents contain this element.""" area = self.get("location") if area in self.universe.contents: self.universe.contents[area].contents[self.key] = self def clean_contents(self): """Make sure the element's contents aren't bogus.""" for element in self.contents.values(): if element.get("location") != self.key: del self.contents[element.key] def go_to(self, area): """Relocate the element to a specific area.""" current = self.get("location") if current and self.key in self.universe.contents[current].contents: del universe.contents[current].contents[self.key] if area in self.universe.contents: self.set("location", area) self.universe.contents[area].contents[self.key] = self self.look_at(area) def go_home(self): """Relocate the element to its default location.""" self.go_to(self.get("default_location")) self.echo_to_location( "You suddenly realize that " + self.get("name") + " is here." ) def move_direction(self, direction): """Relocate the element in a specified direction.""" motion = self.universe.contents["mudpy.movement.%s" % direction] enter_term = motion.get("enter_term") exit_term = motion.get("exit_term") self.echo_to_location("%s exits %s." % (self.get("name"), exit_term)) self.send("You exit %s." % exit_term, add_prompt=False) self.go_to( self.universe.contents[ self.get("location")].link_neighbor(direction) ) self.echo_to_location("%s arrives from %s." % ( self.get("name"), enter_term)) def look_at(self, key): """Show an element to another element.""" if self.owner: element = self.universe.contents[key] message = "" name = element.get("name") if name: message += "$(cyn)" + name + "$(nrm)$(eol)" description = element.get("description") if description: message += description + "$(eol)" portal_list = list(element.portals().keys()) if portal_list: portal_list.sort() message += "$(cyn)[ Exits: " + ", ".join( portal_list ) + " ]$(nrm)$(eol)" for element in self.universe.contents[ self.get("location") ].contents.values(): if element.get("is_actor") and element is not self: message += "$(yel)" + element.get( "name" ) + " is here.$(nrm)$(eol)" elif element is not self: message += "$(grn)" + element.get( "impression" ) + "$(nrm)$(eol)" self.send(message) def portals(self): """Map the portal directions for an area to neighbors.""" portals = {} if re.match(r"""^area:-?\d+,-?\d+,-?\d+$""", self.key): coordinates = [(int(x)) for x in self.key.split(":")[1].split(",")] offsets = dict( (x, self.universe.contents["mudpy.movement.%s" % x].get("vector") ) for x in self.universe.directions) for portal in self.get("gridlinks"): adjacent = map(lambda c, o: c + o, coordinates, offsets[portal]) neighbor = "area:" + ",".join( [(str(x)) for x in adjacent] ) if neighbor in self.universe.contents: portals[portal] = neighbor for facet in self.facets(): if facet.startswith("link_"): neighbor = self.get(facet) if neighbor in self.universe.contents: portal = facet.split("_")[1] portals[portal] = neighbor return portals def link_neighbor(self, direction): """Return the element linked in a given direction.""" portals = self.portals() if direction in portals: return portals[direction] def echo_to_location(self, message): """Show a message to other elements in the current location.""" for element in self.universe.contents[ self.get("location") ].contents.values(): if element is not self: element.send(message) class Universe: """The universe.""" def __init__(self, filename="", load=False): """Initialize the universe.""" self.categories = {} self.contents = {} self.default_origins = {} self.directions = set() self.loading = False self.loglines = [] self.private_files = [] self.reload_flag = False self.setup_loglines = [] self.startdir = os.getcwd() self.terminate_flag = False self.userlist = [] if not filename: possible_filenames = [ "etc/mudpy.yaml", "/usr/local/mudpy/etc/mudpy.yaml", "/usr/local/etc/mudpy.yaml", "/etc/mudpy/mudpy.yaml", "/etc/mudpy.yaml" ] for filename in possible_filenames: if os.access(filename, os.R_OK): break if not os.path.isabs(filename): filename = os.path.join(self.startdir, filename) self.filename = filename if load: # make sure to preserve any accumulated log entries during load self.setup_loglines += self.load() def load(self): """Load universe data from persistent storage.""" # while loading, it's safe to update elements from read-only files self.loading = True # it's possible for this to enter before logging configuration is read pending_loglines = [] # the files dict must exist and filename needs to be read-only if not hasattr( self, "files" ) or not ( self.filename in self.files and self.files[ self.filename ].is_writeable() ): # clear out all read-only files if hasattr(self, "files"): for data_filename in list(self.files.keys()): if not self.files[data_filename].is_writeable(): del self.files[data_filename] # start loading from the initial file mudpy.data.DataFile(self.filename, self) # make a list of inactive avatars inactive_avatars = [] for account in self.categories["account"].values(): for avatar in account.get("avatars"): try: inactive_avatars.append(self.contents[avatar]) except KeyError: pending_loglines.append(( 'Missing avatar "%s", possible data corruption' % avatar, 6)) for user in self.userlist: if user.avatar in inactive_avatars: inactive_avatars.remove(user.avatar) # go through all elements to clear out inactive avatar locations for element in self.contents.values(): area = element.get("location") if element in inactive_avatars and area: if area in self.contents and element.key in self.contents[ area ].contents: del self.contents[area].contents[element.key] element.set("default_location", area) element.remove_facet("location") # another pass to straighten out all the element contents for element in self.contents.values(): element.update_location() element.clean_contents() # done loading, so disallow updating elements from read-only files self.loading = False return pending_loglines def new(self): """Create a new, empty Universe (the Big Bang).""" new_universe = Universe() for attribute in vars(self).keys(): exec("new_universe." + attribute + " = self." + attribute) new_universe.reload_flag = False del self return new_universe def save(self): """Save the universe to persistent storage.""" for key in self.files: self.files[key].save() def initialize_server_socket(self): """Create and open the listening socket.""" # need to know the local address and port number for the listener host = self.contents["mudpy.network"].get("host") port = self.contents["mudpy.network"].get("port") # if no host was specified, bind to all local addresses (preferring # ipv6) if not host: if socket.has_ipv6: host = "::" else: host = "0.0.0.0" # figure out if this is ipv4 or v6 family = socket.getaddrinfo(host, port)[0][0] if family is socket.AF_INET6 and not socket.has_ipv6: log("No support for IPv6 address %s (use IPv4 instead)." % host) # create a new stream-type socket object self.listening_socket = socket.socket(family, socket.SOCK_STREAM) # set the socket options to allow existing open ones to be # reused (fixes a bug where the server can't bind for a minute # when restarting on linux systems) self.listening_socket.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 ) # bind the socket to to our desired server ipa and port self.listening_socket.bind((host, port)) # disable blocking so we can proceed whether or not we can # send/receive self.listening_socket.setblocking(0) # start listening on the socket self.listening_socket.listen(1) # note that we're now ready for user connections log( "Listening for Telnet connections on: " + host + ":" + str(port) ) def get_time(self): """Convenience method to get the elapsed time counter.""" return self.categories["internal"]["counters"].get("elapsed") class User: """This is a connected user.""" def __init__(self): """Default values for the in-memory user variables.""" self.account = None self.address = "" self.authenticated = False self.avatar = None self.columns = 79 self.connection = None self.error = "" self.input_queue = [] self.last_address = "" self.last_input = universe.get_time() self.menu_choices = {} self.menu_seen = False self.negotiation_pause = 0 self.output_queue = [] self.partial_input = b"" self.password_tries = 0 self.state = "initial" self.telopts = {} def quit(self): """Log, close the connection and remove.""" if self.account: name = self.account.get("name") else: name = "" if name: message = "User " + name else: message = "An unnamed user" message += " logged out." log(message, 2) self.deactivate_avatar() self.connection.close() self.remove() def check_idle(self): """Warn or disconnect idle users as appropriate.""" idletime = universe.get_time() - self.last_input linkdead_dict = universe.contents[ "mudpy.timing.idle.disconnect"].facets() if self.state in linkdead_dict: linkdead_state = self.state else: linkdead_state = "default" if idletime > linkdead_dict[linkdead_state]: self.send( "$(eol)$(red)You've done nothing for far too long... goodbye!" + "$(nrm)$(eol)", flush=True, add_prompt=False ) logline = "Disconnecting " if self.account and self.account.get("name"): logline += self.account.get("name") else: logline += "an unknown user" logline += (" after idling too long in the " + self.state + " state.") log(logline, 2) self.state = "disconnecting" self.menu_seen = False idle_dict = universe.contents["mudpy.timing.idle.warn"].facets() if self.state in idle_dict: idle_state = self.state else: idle_state = "default" if idletime == idle_dict[idle_state]: self.send( "$(eol)$(red)If you continue to be unproductive, " + "you'll be shown the door...$(nrm)$(eol)" ) def reload(self): """Save, load a new user and relocate the connection.""" # get out of the list self.remove() # create a new user object new_user = User() # set everything equivalent for attribute in vars(self).keys(): exec("new_user." + attribute + " = self." + attribute) # the avatar needs a new owner if new_user.avatar: new_user.avatar.owner = new_user # add it to the list universe.userlist.append(new_user) # get rid of the old user object del(self) def replace_old_connections(self): """Disconnect active users with the same name.""" # the default return value return_value = False # iterate over each user in the list for old_user in universe.userlist: # the name is the same but it's not us if hasattr( old_user, "account" ) and old_user.account and old_user.account.get( "name" ) == self.account.get( "name" ) and old_user is not self: # make a note of it log( "User " + self.account.get( "name" ) + " reconnected--closing old connection to " + old_user.address + ".", 2 ) old_user.send( "$(eol)$(red)New connection from " + self.address + ". Terminating old connection...$(nrm)$(eol)", flush=True, add_prompt=False ) # close the old connection old_user.connection.close() # replace the old connection with this one old_user.send( "$(eol)$(red)Taking over old connection from " + old_user.address + ".$(nrm)" ) old_user.connection = self.connection old_user.last_address = old_user.address old_user.address = self.address # take this one out of the list and delete self.remove() del(self) return_value = True break # true if an old connection was replaced, false if not return return_value def authenticate(self): """Flag the user as authenticated and disconnect duplicates.""" if self.state is not "authenticated": log("User " + self.account.get("name") + " logged in.", 2) self.authenticated = True if ("mudpy.limit" in universe.contents and self.account.subkey in universe.contents["mudpy.limit"].get("admins")): self.account.set("administrator", "True") def show_menu(self): """Send the user their current menu.""" if not self.menu_seen: self.menu_choices = get_menu_choices(self) self.send( get_menu(self.state, self.error, self.menu_choices), "", add_terminator=True ) self.menu_seen = True self.error = False self.adjust_echoing() def adjust_echoing(self): """Adjust echoing to match state menu requirements.""" if mudpy.telnet.is_enabled(self, mudpy.telnet.TELOPT_ECHO, mudpy.telnet.US): if menu_echo_on(self.state): mudpy.telnet.disable(self, mudpy.telnet.TELOPT_ECHO, mudpy.telnet.US) elif not menu_echo_on(self.state): mudpy.telnet.enable(self, mudpy.telnet.TELOPT_ECHO, mudpy.telnet.US) def remove(self): """Remove a user from the list of connected users.""" universe.userlist.remove(self) def send( self, output, eol="$(eol)", raw=False, flush=False, add_prompt=True, just_prompt=False, add_terminator=False, prepend_padding=True ): """Send arbitrary text to a connected user.""" # unless raw mode is on, clean it up all nice and pretty if not raw: # strip extra $(eol) off if present while output.startswith("$(eol)"): output = output[6:] while output.endswith("$(eol)"): output = output[:-6] extra_lines = output.find("$(eol)$(eol)$(eol)") while extra_lines > -1: output = output[:extra_lines] + output[extra_lines + 6:] extra_lines = output.find("$(eol)$(eol)$(eol)") # start with a newline, append the message, then end # with the optional eol string passed to this function # and the ansi escape to return to normal text if not just_prompt and prepend_padding: if (not self.output_queue or not self.output_queue[-1].endswith(b"\r\n")): output = "$(eol)" + output elif not self.output_queue[-1].endswith( b"\r\n\x1b[0m\r\n" ) and not self.output_queue[-1].endswith( b"\r\n\r\n" ): output = "$(eol)" + output output += eol + chr(27) + "[0m" # tack on a prompt if active if self.state == "active": if not just_prompt: output += "$(eol)" if add_prompt: output += "> " mode = self.avatar.get("mode") if mode: output += "(" + mode + ") " # find and replace macros in the output output = replace_macros(self, output) # wrap the text at the client's width (min 40, 0 disables) if self.columns: if self.columns < 40: wrap = 40 else: wrap = self.columns output = wrap_ansi_text(output, wrap) # if supported by the client, encode it utf-8 if mudpy.telnet.is_enabled(self, mudpy.telnet.TELOPT_BINARY, mudpy.telnet.US): encoded_output = output.encode("utf-8") # otherwise just send ascii else: encoded_output = output.encode("ascii", "replace") # end with a terminator if requested if add_prompt or add_terminator: if mudpy.telnet.is_enabled( self, mudpy.telnet.TELOPT_EOR, mudpy.telnet.US): encoded_output += mudpy.telnet.telnet_proto( mudpy.telnet.IAC, mudpy.telnet.EOR) elif not mudpy.telnet.is_enabled( self, mudpy.telnet.TELOPT_SGA, mudpy.telnet.US): encoded_output += mudpy.telnet.telnet_proto( mudpy.telnet.IAC, mudpy.telnet.GA) # and tack it onto the queue self.output_queue.append(encoded_output) # if this is urgent, flush all pending output if flush: self.flush() # just dump raw bytes as requested else: self.output_queue.append(output) self.flush() def pulse(self): """All the things to do to the user per increment.""" # if the world is terminating, disconnect if universe.terminate_flag: self.state = "disconnecting" self.menu_seen = False # check for an idle connection and act appropriately else: self.check_idle() # if output is paused, decrement the counter if self.state == "initial": if self.negotiation_pause: self.negotiation_pause -= 1 else: self.state = "entering_account_name" # show the user a menu as needed elif not self.state == "active": self.show_menu() # flush any pending output in the queue self.flush() # disconnect users with the appropriate state if self.state == "disconnecting": self.quit() # check for input and add it to the queue self.enqueue_input() # there is input waiting in the queue if self.input_queue: handle_user_input(self) def flush(self): """Try to send the last item in the queue and remove it.""" if self.output_queue: try: self.connection.send(self.output_queue[0]) except BrokenPipeError: if self.account and self.account.get("name"): account = self.account.get("name") else: account = "an unknown user" self.state = "disconnecting" log("Broken pipe sending to %s." % account, 7) del self.output_queue[0] def enqueue_input(self): """Process and enqueue any new input.""" # check for some input try: raw_input = self.connection.recv(1024) except (BlockingIOError, OSError): raw_input = b"" # we got something if raw_input: # tack this on to any previous partial self.partial_input += raw_input # reply to and remove any IAC negotiation codes mudpy.telnet.negotiate_telnet_options(self) # separate multiple input lines new_input_lines = self.partial_input.split(b"\n") # if input doesn't end in a newline, replace the # held partial input with the last line of it if not self.partial_input.endswith(b"\n"): self.partial_input = new_input_lines.pop() # otherwise, chop off the extra null input and reset # the held partial input else: new_input_lines.pop() self.partial_input = b"" # iterate over the remaining lines for line in new_input_lines: # strip off extra whitespace line = line.strip() # log non-printable characters remaining if mudpy.telnet.is_enabled(self, mudpy.telnet.TELOPT_BINARY, mudpy.telnet.HIM): asciiline = bytes([x for x in line if 32 <= x <= 126]) if line != asciiline: logline = "Non-ASCII characters from " if self.account and self.account.get("name"): logline += self.account.get("name") + ": " else: logline += "unknown user: " logline += repr(line) log(logline, 4) line = asciiline try: line = line.decode("utf-8") except UnicodeDecodeError: logline = "Non-UTF-8 characters from " if self.account and self.account.get("name"): logline += self.account.get("name") + ": " else: logline += "unknown user: " logline += repr(line) log(logline, 4) return line = unicodedata.normalize("NFKC", line) # put on the end of the queue self.input_queue.append(line) def new_avatar(self): """Instantiate a new, unconfigured avatar for this user.""" counter = 0 while "avatar:" + self.account.get("name") + ":" + str( counter ) in universe.categories["actor"].keys(): counter += 1 self.avatar = Element( "actor:avatar:" + self.account.get("name") + ":" + str( counter ), universe, old_style=True ) self.avatar.append("inherit", "archetype:avatar") self.account.append("avatars", self.avatar.key) def delete_avatar(self, avatar): """Remove an avatar from the world and from the user's list.""" if self.avatar is universe.contents[avatar]: self.avatar = None universe.contents[avatar].destroy() avatars = self.account.get("avatars") avatars.remove(avatar) self.account.set("avatars", avatars) def activate_avatar_by_index(self, index): """Enter the world with a particular indexed avatar.""" self.avatar = universe.contents[ self.account.get("avatars")[index]] self.avatar.owner = self self.state = "active" self.avatar.go_home() def deactivate_avatar(self): """Have the active avatar leave the world.""" if self.avatar: current = self.avatar.get("location") if current: self.avatar.set("default_location", current) self.avatar.echo_to_location( "You suddenly wonder where " + self.avatar.get( "name" ) + " went." ) del universe.contents[current].contents[self.avatar.key] self.avatar.remove_facet("location") self.avatar.owner = None self.avatar = None def destroy(self): """Destroy the user and associated avatars.""" for avatar in self.account.get("avatars"): self.delete_avatar(avatar) self.account.destroy() def list_avatar_names(self): """List names of assigned avatars.""" avatars = [] for avatar in self.account.get("avatars"): try: avatars.append(universe.contents[avatar].get("name")) except KeyError: log('Missing avatar "%s", possible data corruption.' % avatar, 6) return avatars def broadcast(message, add_prompt=True): """Send a message to all connected users.""" for each_user in universe.userlist: each_user.send("$(eol)" + message, add_prompt=add_prompt) def log(message, level=0): """Log a message.""" # a couple references we need if "mudpy.log" in universe.contents: file_name = universe.contents["mudpy.log"].get("file", "") max_log_lines = universe.contents["mudpy.log"].get("lines", 0) syslog_name = universe.contents["mudpy.log"].get("syslog", "") else: file_name = "" max_log_lines = 0 syslog_name = "" timestamp = time.asctime()[4:19] # turn the message into a list of nonempty lines lines = [x for x in [(x.rstrip()) for x in message.split("\n")] if x != ""] # send the timestamp and line to a file if file_name: if not os.path.isabs(file_name): file_name = os.path.join(universe.startdir, file_name) file_descriptor = codecs.open(file_name, "a", "utf-8") for line in lines: file_descriptor.write(timestamp + " " + line + "\n") file_descriptor.flush() file_descriptor.close() # send the timestamp and line to standard output if ("mudpy.log" in universe.contents and universe.contents["mudpy.log"].get("stdout")): for line in lines: print(timestamp + " " + line) # send the line to the system log if syslog_name: syslog.openlog( syslog_name.encode("utf-8"), syslog.LOG_PID, syslog.LOG_INFO | syslog.LOG_DAEMON ) for line in lines: syslog.syslog(line) syslog.closelog() # display to connected administrators for user in universe.userlist: if user.state == "active" and user.account.get( "administrator" ) and user.account.get("loglevel", 0) <= level: # iterate over every line in the message full_message = "" for line in lines: full_message += ( "$(bld)$(red)" + timestamp + " " + line.replace("$(", "$_(") + "$(nrm)$(eol)") user.send(full_message, flush=True) # add to the recent log list for line in lines: while 0 < len(universe.loglines) >= max_log_lines: del universe.loglines[0] universe.loglines.append((level, timestamp + " " + line)) def get_loglines(level, start, stop): """Return a specific range of loglines filtered by level.""" # filter the log lines loglines = [x for x in universe.loglines if x[0] >= level] # we need these in several places total_count = str(len(universe.loglines)) filtered_count = len(loglines) # don't proceed if there are no lines if filtered_count: # can't start before the begining or at the end if start > filtered_count: start = filtered_count if start < 1: start = 1 # can't stop before we start if stop > start: stop = start elif stop < 1: stop = 1 # some preamble message = "There are " + str(total_count) message += " log lines in memory and " + str(filtered_count) message += " at or above level " + str(level) + "." message += " The matching lines from " + str(stop) + " to " message += str(start) + " are:$(eol)$(eol)" # add the text from the selected lines if stop > 1: range_lines = loglines[-start:-(stop - 1)] else: range_lines = loglines[-start:] for line in range_lines: message += " (" + str(line[0]) + ") " + line[1].replace( "$(", "$_(" ) + "$(eol)" # there were no lines else: message = "None of the " + str(total_count) message += " lines in memory matches your request." # pass it back return message def glyph_columns(character): """Convenience function to return the column width of a glyph.""" if unicodedata.east_asian_width(character) in "FW": return 2 else: return 1 def wrap_ansi_text(text, width): """Wrap text with arbitrary width while ignoring ANSI colors.""" # the current position in the entire text string, including all # characters, printable or otherwise abs_pos = 0 # the current text position relative to the begining of the line, # ignoring color escape sequences rel_pos = 0 # the absolute position of the most recent whitespace character last_whitespace = 0 # whether the current character is part of a color escape sequence escape = False # normalize any potentially composited unicode before we count it text = unicodedata.normalize("NFKC", text) # iterate over each character from the begining of the text for each_character in text: # the current character is the escape character if each_character == "\x1b" and not escape: escape = True # the current character is within an escape sequence elif escape: # the current character is m, which terminates the # escape sequence if each_character == "m": escape = False # the current character is a newline, so reset the relative # position (start a new line) elif each_character == "\n": rel_pos = 0 last_whitespace = abs_pos # the current character meets the requested maximum line width, # so we need to backtrack and find a space at which to wrap; # special care is taken to avoid an off-by-one in case the # current character is a double-width glyph elif each_character != "\r" and ( rel_pos >= width or ( rel_pos >= width - 1 and glyph_columns( each_character ) == 2 ) ): # it's always possible we landed on whitespace if unicodedata.category(each_character) in ("Cc", "Zs"): last_whitespace = abs_pos # insert an eol in place of the space text = text[:last_whitespace] + "\r\n" + text[last_whitespace + 1:] # increase the absolute position because an eol is two # characters but the space it replaced was only one abs_pos += 1 # now we're at the begining of a new line, plus the # number of characters wrapped from the previous line rel_pos = 0 for remaining_characters in text[last_whitespace:abs_pos]: rel_pos += glyph_columns(remaining_characters) # as long as the character is not a carriage return and the # other above conditions haven't been met, count it as a # printable character elif each_character != "\r": rel_pos += glyph_columns(each_character) if unicodedata.category(each_character) in ("Cc", "Zs"): last_whitespace = abs_pos # increase the absolute position for every character abs_pos += 1 # return the newly-wrapped text return text def weighted_choice(data): """Takes a dict weighted by value and returns a random key.""" # this will hold our expanded list of keys from the data expanded = [] # create the expanded list of keys for key in data.keys(): for count in range(data[key]): expanded.append(key) # return one at random return random.choice(expanded) def random_name(): """Returns a random character name.""" # the vowels and consonants needed to create romaji syllables vowels = [ "a", "i", "u", "e", "o" ] consonants = [ "'", "k", "z", "s", "sh", "z", "j", "t", "ch", "ts", "d", "n", "h", "f", "m", "y", "r", "w" ] # this dict will hold our weighted list of syllables syllables = {} # generate the list with an even weighting for consonant in consonants: for vowel in vowels: syllables[consonant + vowel] = 1 # we'll build the name into this string name = "" # create a name of random length from the syllables for syllable in range(random.randrange(2, 6)): name += weighted_choice(syllables) # strip any leading quotemark, capitalize and return the name return name.strip("'").capitalize() def replace_macros(user, text, is_input=False): """Replaces macros in text output.""" # third person pronouns pronouns = { "female": {"obj": "her", "pos": "hers", "sub": "she"}, "male": {"obj": "him", "pos": "his", "sub": "he"}, "neuter": {"obj": "it", "pos": "its", "sub": "it"} } # a dict of replacement macros macros = { "eol": "\r\n", "bld": chr(27) + "[1m", "nrm": chr(27) + "[0m", "blk": chr(27) + "[30m", "blu": chr(27) + "[34m", "cyn": chr(27) + "[36m", "grn": chr(27) + "[32m", "mgt": chr(27) + "[35m", "red": chr(27) + "[31m", "yel": chr(27) + "[33m", } # add dynamic macros where possible if user.account: account_name = user.account.get("name") if account_name: macros["account"] = account_name if user.avatar: avatar_gender = user.avatar.get("gender") if avatar_gender: macros["tpop"] = pronouns[avatar_gender]["obj"] macros["tppp"] = pronouns[avatar_gender]["pos"] macros["tpsp"] = pronouns[avatar_gender]["sub"] # loop until broken while True: # find and replace per the macros dict macro_start = text.find("$(") if macro_start == -1: break macro_end = text.find(")", macro_start) + 1 macro = text[macro_start + 2:macro_end - 1] if macro in macros.keys(): replacement = macros[macro] # this is how we handle local file inclusion (dangerous!) elif macro.startswith("inc:"): incfile = mudpy.data.find_file(macro[4:], universe=universe) if os.path.exists(incfile): incfd = codecs.open(incfile, "r", "utf-8") replacement = "" for line in incfd: if line.endswith("\n") and not line.endswith("\r\n"): line = line.replace("\n", "\r\n") replacement += line # lose the trailing eol replacement = replacement[:-2] else: replacement = "" log("Couldn't read included " + incfile + " file.", 7) # if we get here, log and replace it with null else: replacement = "" if not is_input: log("Unexpected replacement macro " + macro + " encountered.", 6) # and now we act on the replacement text = text.replace("$(" + macro + ")", replacement) # replace the look-like-a-macro sequence text = text.replace("$_(", "$(") return text def escape_macros(value): """Escapes replacement macros in text.""" if type(value) is str: return value.replace("$(", "$_(") else: return value def first_word(text, separator=" "): """Returns a tuple of the first word and the rest.""" if text: if text.find(separator) > 0: return text.split(separator, 1) else: return text, "" else: return "", "" def on_pulse(): """The things which should happen on each pulse, aside from reloads.""" # open the listening socket if it hasn't been already if not hasattr(universe, "listening_socket"): universe.initialize_server_socket() # assign a user if a new connection is waiting user = check_for_connection(universe.listening_socket) if user: universe.userlist.append(user) # iterate over the connected users for user in universe.userlist: user.pulse() # add an element for counters if it doesn't exist if "counters" not in universe.categories["internal"]: universe.categories["internal"]["counters"] = Element( "internal:counters", universe, old_style=True ) # update the log every now and then if not universe.categories["internal"]["counters"].get("mark"): log(str(len(universe.userlist)) + " connection(s)") universe.categories["internal"]["counters"].set( "mark", universe.contents["mudpy.timing"].get("status") ) else: universe.categories["internal"]["counters"].set( "mark", universe.categories["internal"]["counters"].get( "mark" ) - 1 ) # periodically save everything if not universe.categories["internal"]["counters"].get("save"): universe.save() universe.categories["internal"]["counters"].set( "save", universe.contents["mudpy.timing"].get("save") ) else: universe.categories["internal"]["counters"].set( "save", universe.categories["internal"]["counters"].get( "save" ) - 1 ) # pause for a configurable amount of time (decimal seconds) time.sleep(universe.contents["mudpy.timing"].get("increment")) # increase the elapsed increment counter universe.categories["internal"]["counters"].set( "elapsed", universe.categories["internal"]["counters"].get( "elapsed", 0 ) + 1 ) def reload_data(): """Reload all relevant objects.""" for user in universe.userlist[:]: user.reload() for element in universe.contents.values(): if element.origin.is_writeable(): element.reload() universe.load() def check_for_connection(listening_socket): """Check for a waiting connection and return a new user object.""" # try to accept a new connection try: connection, address = listening_socket.accept() except BlockingIOError: return None # note that we got one log("Connection from " + address[0], 2) # disable blocking so we can proceed whether or not we can send/receive connection.setblocking(0) # create a new user object user = User() # associate this connection with it user.connection = connection # set the user's ipa from the connection's ipa user.address = address[0] # let the client know we WILL EOR (RFC 885) mudpy.telnet.enable(user, mudpy.telnet.TELOPT_EOR, mudpy.telnet.US) user.negotiation_pause = 2 # return the new user object return user def get_menu(state, error=None, choices=None): """Show the correct menu text to a user.""" # make sure we don't reuse a mutable sequence by default if choices is None: choices = {} # get the description or error text message = get_menu_description(state, error) # get menu choices for the current state message += get_formatted_menu_choices(state, choices) # try to get a prompt, if it was defined message += get_menu_prompt(state) # throw in the default choice, if it exists message += get_formatted_default_menu_choice(state) # display a message indicating if echo is off message += get_echo_message(state) # return the assembly of various strings defined above return message def menu_echo_on(state): """True if echo is on, false if it is off.""" return universe.categories["menu"][state].get("echo", True) def get_echo_message(state): """Return a message indicating that echo is off.""" if menu_echo_on(state): return "" else: return "(won't echo) " def get_default_menu_choice(state): """Return the default choice for a menu.""" return universe.categories["menu"][state].get("default") def get_formatted_default_menu_choice(state): """Default menu choice foratted for inclusion in a prompt string.""" default_choice = get_default_menu_choice(state) if default_choice: return "[$(red)" + default_choice + "$(nrm)] " else: return "" def get_menu_description(state, error): """Get the description or error text.""" # an error condition was raised by the handler if error: # try to get an error message matching the condition # and current state description = universe.categories[ "menu"][state].get("error_" + error) if not description: description = "That is not a valid choice..." description = "$(red)" + description + "$(nrm)" # there was no error condition else: # try to get a menu description for the current state description = universe.categories["menu"][state].get("description") # return the description or error message if description: description += "$(eol)$(eol)" return description def get_menu_prompt(state): """Try to get a prompt, if it was defined.""" prompt = universe.categories["menu"][state].get("prompt") if prompt: prompt += " " return prompt def get_menu_choices(user): """Return a dict of choice:meaning.""" menu = universe.categories["menu"][user.state] create_choices = menu.get("create") if create_choices: choices = eval(create_choices) else: choices = {} ignores = [] options = {} creates = {} for facet in menu.facets(): if facet.startswith("demand_") and not eval( universe.categories["menu"][user.state].get(facet) ): ignores.append(facet.split("_", 2)[1]) elif facet.startswith("create_"): creates[facet] = facet.split("_", 2)[1] elif facet.startswith("choice_"): options[facet] = facet.split("_", 2)[1] for facet in creates.keys(): if not creates[facet] in ignores: choices[creates[facet]] = eval(menu.get(facet)) for facet in options.keys(): if not options[facet] in ignores: choices[options[facet]] = menu.get(facet) return choices def get_formatted_menu_choices(state, choices): """Returns a formatted string of menu choices.""" choice_output = "" choice_keys = list(choices.keys()) choice_keys.sort() for choice in choice_keys: choice_output += " [$(red)" + choice + "$(nrm)] " + choices[ choice ] + "$(eol)" if choice_output: choice_output += "$(eol)" return choice_output def get_menu_branches(state): """Return a dict of choice:branch.""" branches = {} for facet in universe.categories["menu"][state].facets(): if facet.startswith("branch_"): branches[ facet.split("_", 2)[1] ] = universe.categories["menu"][state].get(facet) return branches def get_default_branch(state): """Return the default branch.""" return universe.categories["menu"][state].get("branch") def get_choice_branch(user, choice): """Returns the new state matching the given choice.""" branches = get_menu_branches(user.state) if choice in branches.keys(): return branches[choice] elif choice in user.menu_choices.keys(): return get_default_branch(user.state) else: return "" def get_menu_actions(state): """Return a dict of choice:branch.""" actions = {} for facet in universe.categories["menu"][state].facets(): if facet.startswith("action_"): actions[ facet.split("_", 2)[1] ] = universe.categories["menu"][state].get(facet) return actions def get_default_action(state): """Return the default action.""" return universe.categories["menu"][state].get("action") def get_choice_action(user, choice): """Run any indicated script for the given choice.""" actions = get_menu_actions(user.state) if choice in actions.keys(): return actions[choice] elif choice in user.menu_choices.keys(): return get_default_action(user.state) else: return "" def handle_user_input(user): """The main handler, branches to a state-specific handler.""" # if the user's client echo is off, send a blank line for aesthetics if mudpy.telnet.is_enabled(user, mudpy.telnet.TELOPT_ECHO, mudpy.telnet.US): user.send("", add_prompt=False, prepend_padding=False) # check to make sure the state is expected, then call that handler if "handler_" + user.state in globals(): exec("handler_" + user.state + "(user)") else: generic_menu_handler(user) # since we got input, flag that the menu/prompt needs to be redisplayed user.menu_seen = False # update the last_input timestamp while we're at it user.last_input = universe.get_time() def generic_menu_handler(user): """A generic menu choice handler.""" # get a lower-case representation of the next line of input if user.input_queue: choice = user.input_queue.pop(0) if choice: choice = choice.lower() else: choice = "" if not choice: choice = get_default_menu_choice(user.state) if choice in user.menu_choices: exec(get_choice_action(user, choice)) new_state = get_choice_branch(user, choice) if new_state: user.state = new_state else: user.error = "default" def handler_entering_account_name(user): """Handle the login account name.""" # get the next waiting line of input input_data = user.input_queue.pop(0) # did the user enter anything? if input_data: # keep only the first word and convert to lower-case name = input_data.lower() # fail if there are non-alphanumeric characters if name != "".join(filter( lambda x: x >= "0" and x <= "9" or x >= "a" and x <= "z", name)): user.error = "bad_name" # if that account exists, time to request a password elif name in universe.categories["account"]: user.account = universe.categories["account"][name] user.state = "checking_password" # otherwise, this could be a brand new user else: user.account = Element("account:" + name, universe, old_style=True) user.account.set("name", name) log("New user: " + name, 2) user.state = "checking_new_account_name" # if the user entered nothing for a name, then buhbye else: user.state = "disconnecting" def handler_checking_password(user): """Handle the login account password.""" # get the next waiting line of input input_data = user.input_queue.pop(0) if "mudpy.limit" in universe.contents: max_password_tries = universe.contents["mudpy.limit"].get( "password_tries", 3) else: max_password_tries = 3 # does the hashed input equal the stored hash? if mudpy.password.verify(input_data, user.account.get("passhash")): # if so, set the username and load from cold storage if not user.replace_old_connections(): user.authenticate() user.state = "main_utility" # if at first your hashes don't match, try, try again elif user.password_tries < max_password_tries - 1: user.password_tries += 1 user.error = "incorrect" # we've exceeded the maximum number of password failures, so disconnect else: user.send( "$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)" ) user.state = "disconnecting" def handler_entering_new_password(user): """Handle a new password entry.""" # get the next waiting line of input input_data = user.input_queue.pop(0) if "mudpy.limit" in universe.contents: max_password_tries = universe.contents["mudpy.limit"].get( "password_tries", 3) else: max_password_tries = 3 # make sure the password is strong--at least one upper, one lower and # one digit, seven or more characters in length if len(input_data) > 6 and len( list(filter(lambda x: x >= "0" and x <= "9", input_data)) ) and len( list(filter(lambda x: x >= "A" and x <= "Z", input_data)) ) and len( list(filter(lambda x: x >= "a" and x <= "z", input_data)) ): # hash and store it, then move on to verification user.account.set("passhash", mudpy.password.create(input_data)) user.state = "verifying_new_password" # the password was weak, try again if you haven't tried too many times elif user.password_tries < max_password_tries - 1: user.password_tries += 1 user.error = "weak" # too many tries, so adios else: user.send( "$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)" ) user.account.destroy() user.state = "disconnecting" def handler_verifying_new_password(user): """Handle the re-entered new password for verification.""" # get the next waiting line of input input_data = user.input_queue.pop(0) if "mudpy.limit" in universe.contents: max_password_tries = universe.contents["mudpy.limit"].get( "password_tries", 3) else: max_password_tries = 3 # hash the input and match it to storage if mudpy.password.verify(input_data, user.account.get("passhash")): user.authenticate() # the hashes matched, so go active if not user.replace_old_connections(): user.state = "main_utility" # go back to entering the new password as long as you haven't tried # too many times elif user.password_tries < max_password_tries - 1: user.password_tries += 1 user.error = "differs" user.state = "entering_new_password" # otherwise, sayonara else: user.send( "$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)" ) user.account.destroy() user.state = "disconnecting" def handler_active(user): """Handle input for active users.""" # get the next waiting line of input input_data = user.input_queue.pop(0) # is there input? if input_data: # split out the command and parameters actor = user.avatar mode = actor.get("mode") if mode and input_data.startswith("!"): command_name, parameters = first_word(input_data[1:]) elif mode == "chat": command_name = "say" parameters = input_data else: command_name, parameters = first_word(input_data) # lowercase the command command_name = command_name.lower() # the command matches a command word for which we have data if command_name in universe.categories["command"]: command = universe.categories["command"][command_name] else: command = None # if it's allowed, do it if actor.can_run(command): exec(command.get("action")) # otherwise, give an error elif command_name: command_error(actor, input_data) # if no input, just idle back with a prompt else: user.send("", just_prompt=True) def command_halt(actor, parameters): """Halt the world.""" if actor.owner: # see if there's a message or use a generic one if parameters: message = "Halting: " + parameters else: message = "User " + actor.owner.account.get( "name" ) + " halted the world." # let everyone know broadcast(message, add_prompt=False) log(message, 8) # set a flag to terminate the world universe.terminate_flag = True def command_reload(actor): """Reload all code modules, configs and data.""" if actor.owner: # let the user know and log actor.send("Reloading all code modules, configs and data.") log( "User " + actor.owner.account.get("name") + " reloaded the world.", 6 ) # set a flag to reload universe.reload_flag = True def command_quit(actor): """Leave the world and go back to the main menu.""" if actor.owner: actor.owner.state = "main_utility" actor.owner.deactivate_avatar() def command_help(actor, parameters): """List available commands and provide help for commands.""" # did the user ask for help on a specific command word? if parameters and actor.owner: # is the command word one for which we have data? if parameters in universe.categories["command"]: command = universe.categories["command"][parameters] else: command = None # only for allowed commands if actor.can_run(command): # add a description if provided description = command.get("description") if not description: description = "(no short description provided)" if command.get("administrative"): output = "$(red)" else: output = "$(grn)" output += parameters + "$(nrm) - " + description + "$(eol)$(eol)" # add the help text if provided help_text = command.get("help") if not help_text: help_text = "No help is provided for this command." output += help_text # list related commands see_also = command.get("see_also") if see_also: really_see_also = "" for item in see_also: if item in universe.categories["command"]: command = universe.categories["command"][item] if actor.can_run(command): if really_see_also: really_see_also += ", " if command.get("administrative"): really_see_also += "$(red)" else: really_see_also += "$(grn)" really_see_also += item + "$(nrm)" if really_see_also: output += "$(eol)$(eol)See also: " + really_see_also # no data for the requested command word else: output = "That is not an available command." # no specific command word was indicated else: # give a sorted list of commands with descriptions if provided output = "These are the commands available to you:$(eol)$(eol)" sorted_commands = list(universe.categories["command"].keys()) sorted_commands.sort() for item in sorted_commands: command = universe.categories["command"][item] if actor.can_run(command): description = command.get("description") if not description: description = "(no short description provided)" if command.get("administrative"): output += " $(red)" else: output += " $(grn)" output += item + "$(nrm) - " + description + "$(eol)" output += ('$(eol)Enter "help COMMAND" for help on a command ' 'named "COMMAND".') # send the accumulated output to the user actor.send(output) def command_move(actor, parameters): """Move the avatar in a given direction.""" if parameters in universe.contents[actor.get("location")].portals(): actor.move_direction(parameters) else: actor.send("You cannot go that way.") def command_look(actor, parameters): """Look around.""" if parameters: actor.send("You can't look at or in anything yet.") else: actor.look_at(actor.get("location")) def command_say(actor, parameters): """Speak to others in the same area.""" # check for replacement macros and escape them parameters = escape_macros(parameters) # if the message is wrapped in quotes, remove them and leave contents # intact if parameters.startswith('"') and parameters.endswith('"'): message = parameters[1:-1] literal = True # otherwise, get rid of stray quote marks on the ends of the message else: message = parameters.strip('''"'`''') literal = False # the user entered a message if message: # match the punctuation used, if any, to an action if "mudpy.linguistic" in universe.contents: actions = universe.contents["mudpy.linguistic"].get("actions", {}) default_punctuation = (universe.contents["mudpy.linguistic"].get( "default_punctuation", ".")) else: actions = {} default_punctuation = "." action = "" # reverse sort punctuation options so the longest match wins for mark in sorted(actions.keys(), reverse=True): if not literal and message.endswith(mark): action = actions[mark] break # add punctuation if needed if not action: action = actions[default_punctuation] if message and not ( literal or unicodedata.category(message[-1]) == "Po" ): message += default_punctuation # failsafe checks to avoid unwanted reformatting and null strings if message and not literal: # decapitalize the first letter to improve matching message = message[0].lower() + message[1:] # iterate over all words in message, replacing typos if "mudpy.linguistic" in universe.contents: typos = universe.contents["mudpy.linguistic"].get("typos", {}) else: typos = {} words = message.split() for index in range(len(words)): word = words[index] while unicodedata.category(word[0]) == "Po": word = word[1:] while unicodedata.category(word[-1]) == "Po": word = word[:-1] if word in typos.keys(): words[index] = words[index].replace(word, typos[word]) message = " ".join(words) # capitalize the first letter message = message[0].upper() + message[1:] # tell the area if message: actor.echo_to_location( actor.get("name") + " " + action + 's, "' + message + '"' ) actor.send("You " + action + ', "' + message + '"') # there was no message else: actor.send("What do you want to say?") def command_chat(actor): """Toggle chat mode.""" mode = actor.get("mode") if not mode: actor.set("mode", "chat") actor.send("Entering chat mode (use $(grn)!chat$(nrm) to exit).") elif mode == "chat": actor.remove_facet("mode") actor.send("Exiting chat mode.") else: actor.send("Sorry, but you're already busy with something else!") def command_show(actor, parameters): """Show program data.""" message = "" arguments = parameters.split() if not parameters: message = "What do you want to show?" elif arguments[0] == "time": message = universe.categories["internal"]["counters"].get( "elapsed" ) + " increments elapsed since the world was created." elif arguments[0] == "categories": message = "These are the element categories:$(eol)" categories = list(universe.categories.keys()) categories.sort() for category in categories: message += "$(eol) $(grn)" + category + "$(nrm)" elif arguments[0] == "files": message = "These are the current files containing the universe:$(eol)" filenames = list(universe.files.keys()) filenames.sort() for filename in filenames: if universe.files[filename].is_writeable(): status = "rw" else: status = "ro" message += ("$(eol) $(red)(" + status + ") $(grn)" + filename + "$(nrm)") elif arguments[0] == "category": if len(arguments) != 2: message = "You must specify one category." elif arguments[1] in universe.categories: message = ('These are the elements in the "' + arguments[1] + '" category:$(eol)') elements = [ ( universe.categories[arguments[1]][x].key ) for x in universe.categories[arguments[1]].keys() ] elements.sort() for element in elements: message += "$(eol) $(grn)" + element + "$(nrm)" else: message = 'Category "' + arguments[1] + '" does not exist.' elif arguments[0] == "file": if len(arguments) != 2: message = "You must specify one file." elif arguments[1] in universe.files: message = ('These are the elements in the "' + arguments[1] + '" file:$(eol)') elements = universe.files[arguments[1]].data.keys() elements.sort() for element in elements: message += "$(eol) $(grn)" + element + "$(nrm)" else: message = 'Category "' + arguments[1] + '" does not exist.' elif arguments[0] == "element": if len(arguments) != 2: message = "You must specify one element." elif arguments[1].strip(".") in universe.contents: element = universe.contents[arguments[1].strip(".")] message = ('These are the properties of the "' + arguments[1] + '" element (in "' + element.origin.filename + '"):$(eol)') facets = element.facets() for facet in sorted(facets): if element.old_style: message += ("$(eol) $(grn)%s: $(red)%s$(nrm)" % (facet, escape_macros(element.get(facet)))) else: message += ("$(eol) $(grn)%s: $(red)%s$(nrm)" % (facet, str(facets[facet]))) else: message = 'Element "' + arguments[1] + '" does not exist.' elif arguments[0] == "result": if len(arguments) < 2: message = "You need to specify an expression." else: try: message = repr(eval(" ".join(arguments[1:]))) except Exception as e: message = ("$(red)Your expression raised an exception...$(eol)" "$(eol)$(bld)%s$(nrm)" % e) elif arguments[0] == "log": if len(arguments) == 4: if re.match(r"^\d+$", arguments[3]) and int(arguments[3]) >= 0: stop = int(arguments[3]) else: stop = -1 else: stop = 0 if len(arguments) >= 3: if re.match(r"^\d+$", arguments[2]) and int(arguments[2]) > 0: start = int(arguments[2]) else: start = -1 else: start = 10 if len(arguments) >= 2: if (re.match(r"^\d+$", arguments[1]) and 0 <= int(arguments[1]) <= 9): level = int(arguments[1]) else: level = -1 elif 0 <= actor.owner.account.get("loglevel", 0) <= 9: level = actor.owner.account.get("loglevel", 0) else: level = 1 if level > -1 and start > -1 and stop > -1: message = get_loglines(level, start, stop) else: message = ("When specified, level must be 0-9 (default 1), " "start and stop must be >=1 (default 10 and 1).") else: message = '''I don't know what "''' + parameters + '" is.' actor.send(message) def command_create(actor, parameters): """Create an element if it does not exist.""" if not parameters: message = "You must at least specify an element to create." elif not actor.owner: message = "" else: arguments = parameters.split() if len(arguments) == 1: arguments.append("") if len(arguments) == 2: element, filename = arguments if element in universe.contents: message = 'The "' + element + '" element already exists.' else: message = ('You create "' + element + '" within the universe.') logline = actor.owner.account.get( "name" ) + " created an element: " + element if filename: logline += " in file " + filename if filename not in universe.files: message += ( ' Warning: "' + filename + '" is not yet ' "included in any other file and will not be read " "on startup unless this is remedied.") Element(element, universe, filename, old_style=True) log(logline, 6) elif len(arguments) > 2: message = "You can only specify an element and a filename." actor.send(message) def command_destroy(actor, parameters): """Destroy an element if it exists.""" if actor.owner: if not parameters: message = "You must specify an element to destroy." else: if parameters not in universe.contents: message = 'The "' + parameters + '" element does not exist.' else: universe.contents[parameters].destroy() message = ('You destroy "' + parameters + '" within the universe.') log( actor.owner.account.get( "name" ) + " destroyed an element: " + parameters, 6 ) actor.send(message) def command_set(actor, parameters): """Set a facet of an element.""" if not parameters: message = "You must specify an element, a facet and a value." else: arguments = parameters.split(" ", 2) if len(arguments) == 1: message = ('What facet of element "' + arguments[0] + '" would you like to set?') elif len(arguments) == 2: message = ('What value would you like to set for the "' + arguments[1] + '" facet of the "' + arguments[0] + '" element?') else: element, facet, value = arguments if element not in universe.contents: message = 'The "' + element + '" element does not exist.' else: try: universe.contents[element].set(facet, value) except ValueError: message = ('Value "%s" of type "%s" cannot be coerced ' 'to the correct datatype for facet "%s".' % (value, type(value), facet)) else: message = ('You have successfully (re)set the "' + facet + '" facet of element "' + element + '". Try "show element ' + element + '" for verification.') actor.send(message) def command_delete(actor, parameters): """Delete a facet from an element.""" if not parameters: message = "You must specify an element and a facet." else: arguments = parameters.split(" ") if len(arguments) == 1: message = ('What facet of element "' + arguments[0] + '" would you like to delete?') elif len(arguments) != 2: message = "You may only specify an element and a facet." else: element, facet = arguments if element not in universe.contents: message = 'The "' + element + '" element does not exist.' elif facet not in universe.contents[element].facets(): message = ('The "' + element + '" element has no "' + facet + '" facet.') else: universe.contents[element].remove_facet(facet) message = ('You have successfully deleted the "' + facet + '" facet of element "' + element + '". Try "show element ' + element + '" for verification.') actor.send(message) def command_error(actor, input_data): """Generic error for an unrecognized command word.""" # 90% of the time use a generic error if random.randrange(10): message = '''I'm not sure what "''' + input_data + '''" means...''' # 10% of the time use the classic diku error else: message = "Arglebargle, glop-glyf!?!" # send the error message actor.send(message) def daemonize(universe): """Fork and disassociate from everything.""" # only if this is what we're configured to do if "mudpy.process" in universe.contents and universe.contents[ "mudpy.process"].get("daemon"): # log before we start forking around, so the terminal gets the message log("Disassociating from the controlling terminal.") # fork off and die, so we free up the controlling terminal if os.fork(): os._exit(0) # switch to a new process group os.setsid() # fork some more, this time to free us from the old process group if os.fork(): os._exit(0) # reset the working directory so we don't needlessly tie up mounts os.chdir("/") # clear the file creation mask so we can bend it to our will later os.umask(0) # redirect stdin/stdout/stderr and close off their former descriptors for stdpipe in range(3): os.close(stdpipe) sys.stdin = codecs.open("/dev/null", "r", "utf-8") sys.__stdin__ = codecs.open("/dev/null", "r", "utf-8") sys.stdout = codecs.open("/dev/null", "w", "utf-8") sys.stderr = codecs.open("/dev/null", "w", "utf-8") sys.__stdout__ = codecs.open("/dev/null", "w", "utf-8") sys.__stderr__ = codecs.open("/dev/null", "w", "utf-8") def create_pidfile(universe): """Write a file containing the current process ID.""" pid = str(os.getpid()) log("Process ID: " + pid) if "mudpy.process" in universe.contents: file_name = universe.contents["mudpy.process"].get("pidfile", "") else: file_name = "" if file_name: if not os.path.isabs(file_name): file_name = os.path.join(universe.startdir, file_name) file_descriptor = codecs.open(file_name, "w", "utf-8") file_descriptor.write(pid + "\n") file_descriptor.flush() file_descriptor.close() def remove_pidfile(universe): """Remove the file containing the current process ID.""" if "mudpy.process" in universe.contents: file_name = universe.contents["mudpy.process"].get("pidfile", "") else: file_name = "" if file_name: if not os.path.isabs(file_name): file_name = os.path.join(universe.startdir, file_name) if os.access(file_name, os.W_OK): os.remove(file_name) def excepthook(excepttype, value, tracebackdata): """Handle uncaught exceptions.""" # assemble the list of errors into a single string message = "".join( traceback.format_exception(excepttype, value, tracebackdata) ) # try to log it, if possible try: log(message, 9) except Exception as e: # try to write it to stderr, if possible sys.stderr.write(message + "\nException while logging...\n%s" % e) def sighook(what, where): """Handle external signals.""" # a generic message message = "Caught signal: " # for a hangup signal if what == signal.SIGHUP: message += "hangup (reloading)" universe.reload_flag = True # for a terminate signal elif what == signal.SIGTERM: message += "terminate (halting)" universe.terminate_flag = True # catchall for unexpected signals else: message += str(what) + " (unhandled)" # log what happened log(message, 8) def override_excepthook(): """Redefine sys.excepthook with our own.""" sys.excepthook = excepthook def assign_sighook(): """Assign a customized handler for some signals.""" signal.signal(signal.SIGHUP, sighook) signal.signal(signal.SIGTERM, sighook) def setup(): """This contains functions to be performed when starting the engine.""" # see if a configuration file was specified if len(sys.argv) > 1: conffile = sys.argv[1] else: conffile = "" # the big bang global universe universe = Universe(conffile, True) # report any loglines which accumulated during setup for logline in universe.setup_loglines: log(*logline) universe.setup_loglines = [] # log an initial message log("Started mudpy with command line: " + " ".join(sys.argv)) # fork and disassociate daemonize(universe) # override the default exception handler so we get logging first thing override_excepthook() # set up custom signal handlers assign_sighook() # make the pidfile create_pidfile(universe) # pass the initialized universe back return universe def finish(): """These are functions performed when shutting down the engine.""" # the loop has terminated, so save persistent data universe.save() # log a final message log("Shutting down now.") # get rid of the pidfile remove_pidfile(universe)