"""Core objects for the mudpy engine.""" # Copyright (c) 2004-2008 Jeremy Stanley . Permission # to use, copy, modify, and distribute this software is granted under # terms provided in the LICENSE file distributed with this software. # import some things we need from ConfigParser import RawConfigParser from md5 import new as new_md5 from os import _exit, R_OK, W_OK, access, chdir, chmod, close, fork, getcwd, getpid, listdir, makedirs, remove, rename, setsid, stat, umask from os.path import abspath, basename, dirname, exists, isabs, join as path_join from random import choice, randrange from re import match from signal import SIGHUP, SIGTERM, signal from socket import AF_INET, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket from stat import S_IMODE, ST_MODE from string import digits, letters, punctuation, uppercase from sys import argv, stderr from syslog import LOG_PID, LOG_INFO, LOG_DAEMON, closelog, openlog, syslog from telnetlib import DO, DONT, ECHO, EOR, GA, IAC, LINEMODE, SB, SE, SGA, WILL, WONT from time import asctime, sleep from traceback import format_exception class Element: """An element of the universe.""" def __init__(self, key, universe, filename=None): """Set up a new element.""" # keep track of our key name self.key = key # keep track of what universe it's loading` into self.universe = universe # clone attributes if this is replacing another element if self.key in self.universe.contents: old_element = self.universe.contents[self.key] for attribute in vars(old_element).keys(): exec("self." + attribute + " = old_element." + attribute) if self.owner: self.owner.avatar = self # i guess this is a new element then else: # not owned by a user by default (used for avatars) self.owner = None # no contents in here by default self.contents = {} # parse out appropriate category and subkey names, add to list if self.key.find(":") > 0: self.category, self.subkey = self.key.split(":", 1) else: self.category = "other" self.subkey = self.key if not self.category in self.universe.categories: self.category = "other" self.subkey = self.key # get an appropriate filename for the origin if not filename: filename = self.universe.default_origins[self.category] if not isabs(filename): filename = abspath(filename) # add the file if it doesn't exist yet if not filename in self.universe.files: DataFile(filename, self.universe) # record or reset a pointer to the origin file self.origin = self.universe.files[filename] # add a data section to the origin if necessary if not self.origin.data.has_section(self.key): self.origin.data.add_section(self.key) # add or replace this element in the universe self.universe.contents[self.key] = self self.universe.categories[self.category][self.subkey] = self def reload(self): """Create a new element and replace this one.""" new_element = Element(self.key, self.universe, self.origin.filename) del(self) def destroy(self): """Remove an element from the universe and destroy it.""" self.origin.data.remove_section(self.key) del self.universe.categories[self.category][self.subkey] del self.universe.contents[self.key] del self def facets(self): """Return a list of non-inherited facets for this element.""" if self.key in self.origin.data.sections(): return self.origin.data.options(self.key) else: return [] def has_facet(self, facet): """Return whether the non-inherited facet exists.""" return facet in self.facets() def remove_facet(self, facet): """Remove a facet from the element.""" if self.has_facet(facet): self.origin.data.remove_option(self.key, facet) self.origin.modified = True def ancestry(self): """Return a list of the element's inheritance lineage.""" if self.has_facet("inherit"): ancestry = self.getlist("inherit") for parent in ancestry[:]: ancestors = self.universe.contents[parent].ancestry() for ancestor in ancestors: if ancestor not in ancestry: ancestry.append(ancestor) return ancestry else: return [] def get(self, facet, default=None): """Retrieve values.""" if default is None: default = "" if self.origin.data.has_option(self.key, facet): return self.origin.data.get(self.key, facet) elif self.has_facet("inherit"): for ancestor in self.ancestry(): if self.universe.contents[ancestor].has_facet(facet): return self.universe.contents[ancestor].get(facet) else: return default def getboolean(self, facet, default=None): """Retrieve values as boolean type.""" if default is None: default=False if self.origin.data.has_option(self.key, facet): return self.origin.data.getboolean(self.key, facet) elif self.has_facet("inherit"): for ancestor in self.ancestry(): if self.universe.contents[ancestor].has_facet(facet): return self.universe.contents[ancestor].getboolean(facet) else: return default def getint(self, facet, default=None): """Return values as int/long type.""" if default is None: default = 0 if self.origin.data.has_option(self.key, facet): return self.origin.data.getint(self.key, facet) elif self.has_facet("inherit"): for ancestor in self.ancestry(): if self.universe.contents[ancestor].has_facet(facet): return self.universe.contents[ancestor].getint(facet) else: return default def getfloat(self, facet, default=None): """Return values as float type.""" if default is None: default = 0.0 if self.origin.data.has_option(self.key, facet): return self.origin.data.getfloat(self.key, facet) elif self.has_facet("inherit"): for ancestor in self.ancestry(): if self.universe.contents[ancestor].has_facet(facet): return self.universe.contents[ancestor].getfloat(facet) else: return default def getlist(self, facet, default=None): """Return values as list type.""" if default is None: default = [] value = self.get(facet) if value: return makelist(value) else: return default def getdict(self, facet, default=None): """Return values as dict type.""" if default is None: default = {} value = self.get(facet) if value: return makedict(value) else: return default def set(self, facet, value): """Set values.""" if not self.has_facet(facet) or not self.get(facet) == value: if type(value) is long: value = str(value) elif not type(value) is str: value = repr(value) self.origin.data.set(self.key, facet, value) self.origin.modified = True def append(self, facet, value): """Append value tp a list.""" if type(value) is long: value = str(value) elif not type(value) is str: value = repr(value) newlist = self.getlist(facet) newlist.append(value) self.set(facet, newlist) def new_event(self, action, when=None): """Create, attach and enqueue an event element.""" # if when isn't specified, that means now if not when: when = self.universe.get_time() # events are elements themselves event = Element("event:" + self.key + ":" + counter) def send(self, message, eol="$(eol)", raw=False, flush=False, add_prompt=True, just_prompt=False): """Convenience method to pass messages to an owner.""" if self.owner: self.owner.send(message, eol, raw, flush, add_prompt, just_prompt) def can_run(self, command): """Check if the user can run this command object.""" # has to be in the commands category if command not in self.universe.categories["command"].values(): result = False # avatars of administrators can run any command elif self.owner and self.owner.account.getboolean("administrator"): result = True # everyone can run non-administrative commands elif not command.getboolean("administrative"): result = True # otherwise the command cannot be run by this actor else: result = False # pass back the result return result def update_location(self): """Make sure the location's contents contain this element.""" location = self.get("location") if location in self.universe.contents: self.universe.contents[location].contents[self.key] = self def clean_contents(self): """Make sure the element's contents aren't bogus.""" for element in self.contents.values(): if element.get("location") != self.key: del self.contents[element.key] def go_to(self, location): """Relocate the element to a specific location.""" current = self.get("location") if current and self.key in self.universe.contents[current].contents: del universe.contents[current].contents[self.key] if location in self.universe.contents: self.set("location", location) self.universe.contents[location].contents[self.key] = self self.look_at(location) def go_home(self): """Relocate the element to its default location.""" self.go_to(self.get("default_location")) self.echo_to_location("You suddenly realize that " + self.get("name") + " is here.") def move_direction(self, direction): """Relocate the element in a specified direction.""" self.echo_to_location(self.get("name") + " exits " + self.universe.categories["internal"]["directions"].getdict(direction)["exit"] + ".") self.send("You exit " + self.universe.categories["internal"]["directions"].getdict(direction)["exit"] + ".", add_prompt=False) self.go_to(self.universe.contents[self.get("location")].link_neighbor(direction)) self.echo_to_location(self.get("name") + " arrives from " + self.universe.categories["internal"]["directions"].getdict(direction)["enter"] + ".") def look_at(self, key): """Show an element to another element.""" if self.owner: element = self.universe.contents[key] message = "" name = element.get("name") if name: message += "$(cyn)" + name + "$(nrm)$(eol)" description = element.get("description") if description: message += description + "$(eol)" portal_list = element.portals().keys() if portal_list: portal_list.sort() message += "$(cyn)[ Exits: " + ", ".join(portal_list) + " ]$(nrm)$(eol)" for element in self.universe.contents[self.get("location")].contents.values(): if element.getboolean("is_actor") and element is not self: message += "$(yel)" + element.get("name") + " is here.$(nrm)$(eol)" elif element is not self: message += "$(grn)" + element.get("impression") + "$(nrm)$(eol)" self.send(message) def portals(self): """Map the portal directions for a room to neighbors.""" portals = {} if match("""^location:-?\d+,-?\d+,-?\d+$""", self.key): coordinates = [(int(x)) for x in self.key.split(":")[1].split(",")] directions = self.universe.categories["internal"]["directions"] offsets = dict([(x, directions.getdict(x)["vector"]) for x in directions.facets()]) for portal in self.getlist("gridlinks"): adjacent = map(lambda c,o: c+o, coordinates, offsets[portal]) neighbor = "location:" + ",".join([(str(x)) for x in adjacent]) if neighbor in self.universe.contents: portals[portal] = neighbor for facet in self.facets(): if facet.startswith("link_"): neighbor = self.get(facet) if neighbor in self.universe.contents: portal = facet.split("_")[1] portals[portal] = neighbor return portals def link_neighbor(self, direction): """Return the element linked in a given direction.""" portals = self.portals() if direction in portals: return portals[direction] def echo_to_location(self, message): """Show a message to other elements in the current location.""" for element in self.universe.contents[self.get("location")].contents.values(): if element is not self: element.send(message) class DataFile: """A file containing universe elements.""" def __init__(self, filename, universe): self.filename = filename self.universe = universe self.load() def load(self): """Read a file and create elements accordingly.""" self.modified = False self.data = RawConfigParser() if access(self.filename, R_OK): self.data.read(self.filename) if not hasattr(self.universe, "files"): self.universe.files = {} self.universe.files[self.filename] = self if self.data.has_option("__control__", "include_files"): includes = makelist(self.data.get("__control__", "include_files")) else: includes = [] if self.data.has_option("__control__", "default_files"): origins = makedict(self.data.get("__control__", "default_files")) for key in origins.keys(): if not isabs(origins[key]): origins[key] = path_join(dirname(self.filename), origins[key]) if not origins[key] in includes: includes.append(origins[key]) self.universe.default_origins[key] = origins[key] if not key in self.universe.categories: self.universe.categories[key] = {} if self.data.has_option("__control__", "private_files"): for item in makelist(self.data.get("__control__", "private_files")): if not item in includes: includes.append(item) if not item in self.universe.private_files: if not isabs(item): item = path_join(dirname(self.filename), item) self.universe.private_files.append(item) for section in self.data.sections(): if section != "__control__": Element(section, self.universe, self.filename) for include_file in includes: if not isabs(include_file): include_file = path_join(dirname(self.filename), include_file) if include_file not in self.universe.files or not self.universe.files[include_file].is_writeable(): DataFile(include_file, self.universe) def save(self): """Write the data, if necessary.""" # when modified, writeable and has content or the file exists if self.modified and self.is_writeable() and ( self.data.sections() or exists(self.filename) ): # make parent directories if necessary if not exists(dirname(self.filename)): makedirs(dirname(self.filename)) # backup the file if self.data.has_option("__control__", "backup_count"): max_count = self.data.has_option("__control__", "backup_count") else: max_count = universe.categories["internal"]["limits"].getint("default_backup_count") if exists(self.filename) and max_count: backups = [] for candidate in listdir(dirname(self.filename)): if match(basename(self.filename) + """\.\d+$""", candidate): backups.append(int(candidate.split(".")[-1])) backups.sort() backups.reverse() for old_backup in backups: if old_backup >= max_count-1: remove(self.filename+"."+str(old_backup)) elif not exists(self.filename+"."+str(old_backup+1)): rename(self.filename+"."+str(old_backup), self.filename+"."+str(old_backup+1)) if not exists(self.filename+".0"): rename(self.filename, self.filename+".0") # our data file file_descriptor = file(self.filename, "w") # if it's marked private, chmod it appropriately if self.filename in self.universe.private_files and oct(S_IMODE(stat(self.filename)[ST_MODE])) != 0600: chmod(self.filename, 0600) # write it back sorted, instead of using ConfigParser sections = self.data.sections() sections.sort() for section in sections: file_descriptor.write("[" + section + "]\n") options = self.data.options(section) options.sort() for option in options: file_descriptor.write(option + " = " + self.data.get(section, option) + "\n") file_descriptor.write("\n") # flush and close the file file_descriptor.flush() file_descriptor.close() # unset the modified flag self.modified = False def is_writeable(self): """Returns True if the __control__ read_only is False.""" return not self.data.has_option("__control__", "read_only") or not self.data.getboolean("__control__", "read_only") class Universe: """The universe.""" def __init__(self, filename="", load=False): """Initialize the universe.""" self.categories = {} self.contents = {} self.default_origins = {} self.loglines = [] self.pending_events_long = {} self.pending_events_short = {} self.private_files = [] self.reload_flag = False self.startdir = getcwd() self.terminate_flag = False self.userlist = [] if not filename: possible_filenames = [ ".mudpyrc", ".mudpy/mudpyrc", ".mudpy/mudpy.conf", "mudpy.conf", "etc/mudpy.conf", "/usr/local/mudpy/mudpy.conf", "/usr/local/mudpy/etc/mudpy.conf", "/etc/mudpy/mudpy.conf", "/etc/mudpy.conf" ] for filename in possible_filenames: if access(filename, R_OK): break if not isabs(filename): filename = path_join(self.startdir, filename) self.filename = filename if load: self.load() def load(self): """Load universe data from persistent storage.""" # the files dict must exist and filename needs to be read-only if not hasattr(self, "files") or not ( self.filename in self.files and self.files[self.filename].is_writeable() ): # clear out all read-only files if hasattr(self, "files"): for data_filename in self.files.keys(): if not self.files[data_filename].is_writeable(): del self.files[data_filename] # start loading from the initial file DataFile(self.filename, self) # make a list of inactive avatars inactive_avatars = [] for account in self.categories["account"].values(): inactive_avatars += [ (self.contents[x]) for x in account.getlist("avatars") ] for user in self.userlist: if user.avatar in inactive_avatars: inactive_avatars.remove(user.avatar) # go through all elements to clear out inactive avatar locations for element in self.contents.values(): location = element.get("location") if element in inactive_avatars and location: if location in self.contents and element.key in self.contents[location].contents: del self.contents[location].contents[element.key] element.set("default_location", location) element.remove_facet("location") # another pass to straighten out all the element contents for element in self.contents.values(): element.update_location() element.clean_contents() def new(self): """Create a new, empty Universe (the Big Bang).""" new_universe = Universe() for attribute in vars(self).keys(): exec("new_universe." + attribute + " = self." + attribute) new_universe.reload_flag = False del self return new_universe def save(self): """Save the universe to persistent storage.""" for key in self.files: self.files[key].save() def initialize_server_socket(self): """Create and open the listening socket.""" # create a new ipv4 stream-type socket object self.listening_socket = socket(AF_INET, SOCK_STREAM) # set the socket options to allow existing open ones to be # reused (fixes a bug where the server can't bind for a minute # when restarting on linux systems) self.listening_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) # bind the socket to to our desired server ipa and port host = self.categories["internal"]["network"].get("host") port = self.categories["internal"]["network"].getint("port") self.listening_socket.bind((host, port)) # disable blocking so we can proceed whether or not we can # send/receive self.listening_socket.setblocking(0) # start listening on the socket self.listening_socket.listen(1) # note that we're now ready for user connections if not host: host = "0.0.0.0" log("Listening for Telnet connections on: " + host + ":" + str(port)) def get_time(self): """Convenience method to get the elapsed time counter.""" return self.categories["internal"]["counters"].getint("elapsed") class User: """This is a connected user.""" def __init__(self): """Default values for the in-memory user variables.""" self.account = None self.address = "" self.authenticated = False self.avatar = None self.connection = None self.echoing = True self.error = "" self.input_queue = [] self.last_address = "" self.last_input = universe.get_time() self.menu_choices = {} self.menu_seen = False self.negotiation_pause = 0 self.output_queue = [] self.partial_input = "" self.password_tries = 0 self.received_newline = True self.state = "initial" self.terminator = IAC+GA def quit(self): """Log, close the connection and remove.""" if self.account: name = self.account.get("name") else: name = "" if name: message = "User " + name else: message = "An unnamed user" message += " logged out." log(message, 2) self.deactivate_avatar() self.connection.close() self.remove() def check_idle(self): """Warn or disconnect idle users as appropriate.""" idletime = universe.get_time() - self.last_input linkdead_dict = universe.categories["internal"]["time"].getdict("linkdead") if self.state in linkdead_dict: linkdead_state = self.state else: linkdead_state = "default" if idletime > linkdead_dict[linkdead_state]: self.send("$(eol)$(red)You've done nothing for far too long... goodbye!$(nrm)$(eol)", flush=True, add_prompt=False) logline = "Disconnecting " if self.account and self.account.get("name"): logline += self.account.get("name") else: logline += "an unknown user" logline += " after idling too long in a " + self.state + " state." log(logline, 2) self.state = "disconnecting" self.menu_seen = False idle_dict = universe.categories["internal"]["time"].getdict("idle") if self.state in idle_dict: idle_state = self.state else: idle_state = "default" if idletime == idle_dict[idle_state]: self.send("$(eol)$(red)If you continue to be unproductive, you'll be shown the door...$(nrm)$(eol)") def reload(self): """Save, load a new user and relocate the connection.""" # get out of the list self.remove() # create a new user object new_user = User() # set everything equivalent for attribute in vars(self).keys(): exec("new_user." + attribute + " = self." + attribute) # the avatar needs a new owner if new_user.avatar: new_user.avatar.owner = new_user # add it to the list universe.userlist.append(new_user) # get rid of the old user object del(self) def replace_old_connections(self): """Disconnect active users with the same name.""" # the default return value return_value = False # iterate over each user in the list for old_user in universe.userlist: # the name is the same but it's not us if hasattr(old_user, "account") and old_user.account and old_user.account.get("name") == self.account.get("name") and old_user is not self: # make a note of it log("User " + self.account.get("name") + " reconnected--closing old connection to " + old_user.address + ".", 2) old_user.send("$(eol)$(red)New connection from " + self.address + ". Terminating old connection...$(nrm)$(eol)", flush=True, add_prompt=False) # close the old connection old_user.connection.close() # replace the old connection with this one old_user.send("$(eol)$(red)Taking over old connection from " + old_user.address + ".$(nrm)") old_user.connection = self.connection old_user.last_address = old_user.address old_user.address = self.address # may need to tell the new connection to echo if old_user.echoing: old_user.send(get_echo_sequence(old_user.state, self.echoing), raw=True) # take this one out of the list and delete self.remove() del(self) return_value = True break # true if an old connection was replaced, false if not return return_value def authenticate(self): """Flag the user as authenticated and disconnect duplicates.""" if not self.state is "authenticated": log("User " + self.account.get("name") + " logged in.", 2) self.authenticated = True if self.account.subkey in universe.categories["internal"]["limits"].getlist("default_admins"): self.account.set("administrator", "True") def show_menu(self): """Send the user their current menu.""" if not self.menu_seen: self.menu_choices = get_menu_choices(self) self.send(get_menu(self.state, self.error, self.echoing, self.terminator, self.menu_choices), "") self.menu_seen = True self.error = False self.adjust_echoing() def adjust_echoing(self): """Adjust echoing to match state menu requirements.""" if self.echoing and not menu_echo_on(self.state): self.echoing = False elif not self.echoing and menu_echo_on(self.state): self.echoing = True def remove(self): """Remove a user from the list of connected users.""" universe.userlist.remove(self) def send(self, output, eol="$(eol)", raw=False, flush=False, add_prompt=True, just_prompt=False): """Send arbitrary text to a connected user.""" # unless raw mode is on, clean it up all nice and pretty if not raw: # strip extra $(eol) off if present while output.startswith("$(eol)"): output = output[6:] while output.endswith("$(eol)"): output = output[:-6] extra_lines = output.find("$(eol)$(eol)$(eol)") while extra_lines > -1: output = output[:extra_lines] + output[extra_lines+6:] extra_lines = output.find("$(eol)$(eol)$(eol)") # we'll take out GA or EOR and add them back on the end if output.endswith(IAC+GA) or output.endswith(IAC+EOR): terminate = True output = output[:-2] else: terminate = False # start with a newline, append the message, then end # with the optional eol string passed to this function # and the ansi escape to return to normal text if not just_prompt: if not self.output_queue or not self.output_queue[-1].endswith("\r\n"): output = "$(eol)$(eol)" + output elif not self.output_queue[-1].endswith("\r\n"+chr(27)+"[0m"+"\r\n") and not self.output_queue[-1].endswith("\r\n\r\n"): output = "$(eol)" + output output += eol + chr(27) + "[0m" # tack on a prompt if active if self.state == "active": if not just_prompt: output += "$(eol)" if add_prompt: output += "> " mode = self.avatar.get("mode") if mode: output += "(" + mode + ") " # find and replace macros in the output output = replace_macros(self, output) # wrap the text at 79 characters output = wrap_ansi_text(output, 79) # tack the terminator back on if terminate: output += self.terminator # drop the output into the user's output queue self.output_queue.append(output) # if this is urgent, flush all pending output if flush: self.flush() def pulse(self): """All the things to do to the user per increment.""" # if the world is terminating, disconnect if universe.terminate_flag: self.state = "disconnecting" self.menu_seen = False # check for an idle connection and act appropriately else: self.check_idle() # if output is paused, decrement the counter if self.state == "initial": if self.negotiation_pause: self.negotiation_pause -= 1 else: self.state = "entering_account_name" # show the user a menu as needed elif not self.state == "active": self.show_menu() # flush any pending output in teh queue self.flush() # disconnect users with the appropriate state if self.state == "disconnecting": self.quit() # check for input and add it to the queue self.enqueue_input() # there is input waiting in the queue if self.input_queue: handle_user_input(self) def flush(self): """Try to send the last item in the queue and remove it.""" if self.output_queue: if self.received_newline: self.received_newline = False if self.output_queue[0].startswith("\r\n"): self.output_queue[0] = self.output_queue[0][2:] try: self.connection.send(self.output_queue[0]) del self.output_queue[0] except: pass def enqueue_input(self): """Process and enqueue any new input.""" # check for some input try: input_data = self.connection.recv(1024) except: input_data = "" # we got something if input_data: # tack this on to any previous partial self.partial_input += input_data # reply to and remove any IAC negotiation codes self.negotiate_telnet_options() # separate multiple input lines new_input_lines = self.partial_input.split("\n") # if input doesn't end in a newline, replace the # held partial input with the last line of it if not self.partial_input.endswith("\n"): self.partial_input = new_input_lines.pop() # otherwise, chop off the extra null input and reset # the held partial input else: new_input_lines.pop() self.partial_input = "" # iterate over the remaining lines for line in new_input_lines: # remove a trailing carriage return if line.endswith("\r"): line = line.rstrip("\r") # log non-printable characters remaining removed = filter(lambda x: (x < " " or x > "~"), line) if removed: logline = "Non-printable characters from " if self.account and self.account.get("name"): logline += self.account.get("name") + ": " else: logline += "unknown user: " logline += repr(removed) log(logline, 4) # filter out non-printables line = filter(lambda x: " " <= x <= "~", line) # strip off extra whitespace line = line.strip() # put on the end of the queue self.input_queue.append(line) def negotiate_telnet_options(self): """Reply to/remove partial_input telnet negotiation options.""" # start at the begining of the input position = 0 # make a local copy to play with text = self.partial_input # as long as we haven't checked it all while position < len(text): # jump to the first IAC you find position = text.find(IAC, position) # if there wasn't an IAC in the input, skip to the end if position < 0: position = len(text) # replace a double (literal) IAC if there's an LF later elif len(text) > position+1 and text[position+1] == IAC: if text.find("\n", position) > 0: text = text.replace(IAC+IAC, IAC) else: position += 1 position += 1 # this must be an option negotiation elif len(text) > position+2 and text[position+1] in (DO, DONT, WILL, WONT): negotiation = text[position+1:position+3] # if we turned echo off, ignore the confirmation if not self.echoing and negotiation == DO+ECHO: pass # allow LINEMODE elif negotiation == WILL+LINEMODE: self.send(IAC+DO+LINEMODE, raw=True) # if the client likes EOR instead of GA, make a note of it elif negotiation == DO+EOR: self.terminator = IAC+EOR elif negotiation == DONT+EOR and self.terminator == IAC+EOR: self.terminator = IAC+GA # if the client doesn't want GA, oblige elif negotiation == DO+SGA and self.terminator == IAC+GA: self.terminator = "" self.send(IAC+WILL+SGA, raw=True) # we don't want to allow anything else elif text[position+1] == DO: self.send(IAC+WONT+text[position+2], raw=True) elif text[position+1] == WILL: self.send(IAC+DONT+text[position+2], raw=True) # strip the negotiation from the input text = text.replace(text[position:position+3], "") # get rid of IAC SB .* IAC SE elif len(text) > position+4 and text[position:position+2] == IAC+SB: end_subnegotiation = text.find(IAC+SE, position) if end_subnegotiation > 0: text = text[:position] + text[end_subnegotiation+2:] else: position += 1 # otherwise, strip out a two-byte IAC command elif len(text) > position+2: text = text.replace(text[position:position+2], "") # and this means we got the begining of an IAC else: position += 1 # replace the input with our cleaned-up text self.partial_input = text def new_avatar(self): """Instantiate a new, unconfigured avatar for this user.""" counter = 0 while "avatar:" + self.account.get("name") + ":" + str(counter) in universe.categories["actor"].keys(): counter += 1 self.avatar = Element("actor:avatar:" + self.account.get("name") + ":" + str(counter), universe) self.avatar.append("inherit", "archetype:avatar") self.account.append("avatars", self.avatar.key) def delete_avatar(self, avatar): """Remove an avatar from the world and from the user's list.""" if self.avatar is universe.contents[avatar]: self.avatar = None universe.contents[avatar].destroy() avatars = self.account.getlist("avatars") avatars.remove(avatar) self.account.set("avatars", avatars) def activate_avatar_by_index(self, index): """Enter the world with a particular indexed avatar.""" self.avatar = universe.contents[self.account.getlist("avatars")[index]] self.avatar.owner = self self.state = "active" self.avatar.go_home() def deactivate_avatar(self): """Have the active avatar leave the world.""" if self.avatar: current = self.avatar.get("location") if current: self.avatar.set("default_location", current) self.avatar.echo_to_location("You suddenly wonder where " + self.avatar.get("name") + " went.") del universe.contents[current].contents[self.avatar.key] self.avatar.remove_facet("location") self.avatar.owner = None self.avatar = None def destroy(self): """Destroy the user and associated avatars.""" for avatar in self.account.getlist("avatars"): self.delete_avatar(avatar) self.account.destroy() def list_avatar_names(self): """List names of assigned avatars.""" return [ universe.contents[avatar].get("name") for avatar in self.account.getlist("avatars") ] def makelist(value): """Turn string into list type.""" if value[0] + value[-1] == "[]": return eval(value) else: return [ value ] def makedict(value): """Turn string into dict type.""" if value[0] + value[-1] == "{}": return eval(value) elif value.find(":") > 0: return eval("{" + value + "}") else: return { value: None } def broadcast(message, add_prompt=True): """Send a message to all connected users.""" for each_user in universe.userlist: each_user.send("$(eol)" + message, add_prompt=add_prompt) def log(message, level=0): """Log a message.""" # a couple references we need file_name = universe.categories["internal"]["logging"].get("file") max_log_lines = universe.categories["internal"]["logging"].getint("max_log_lines") syslog_name = universe.categories["internal"]["logging"].get("syslog") timestamp = asctime()[4:19] # turn the message into a list of lines lines = filter(lambda x: x!="", [(x.rstrip()) for x in message.split("\n")]) # send the timestamp and line to a file if file_name: if not isabs(file_name): file_name = path_join(universe.startdir, file_name) file_descriptor = file(file_name, "a") for line in lines: file_descriptor.write(timestamp + " " + line + "\n") file_descriptor.flush() file_descriptor.close() # send the timestamp and line to standard output if universe.categories["internal"]["logging"].getboolean("stdout"): for line in lines: print(timestamp + " " + line) # send the line to the system log if syslog_name: openlog(syslog_name, LOG_PID, LOG_INFO | LOG_DAEMON) for line in lines: syslog(line) closelog() # display to connected administrators for user in universe.userlist: if user.state == "active" and user.account.getboolean("administrator") and user.account.getint("loglevel") <= level: # iterate over every line in the message full_message = "" for line in lines: full_message += "$(bld)$(red)" + timestamp + " " + line + "$(nrm)$(eol)" user.send(full_message, flush=True) # add to the recent log list for line in lines: while 0 < len(universe.loglines) >= max_log_lines: del universe.loglines[0] universe.loglines.append((level, timestamp + " " + line)) def get_loglines(level, start, stop): """Return a specific range of loglines filtered by level.""" # filter the log lines loglines = filter(lambda x: x[0]>=level, universe.loglines) # we need these in several places total_count = str(len(universe.loglines)) filtered_count = len(loglines) # don't proceed if there are no lines if filtered_count: # can't start before the begining or at the end if start > filtered_count: start = filtered_count if start < 1: start = 1 # can't stop before we start if stop > start: stop = start elif stop < 1: stop = 1 # some preamble message = "There are " + str(total_count) message += " log lines in memory and " + str(filtered_count) message += " at or above level " + str(level) + "." message += " The matching lines from " + str(stop) + " to " message += str(start) + " are:$(eol)$(eol)" # add the text from the selected lines if stop > 1: range_lines = loglines[-start:-(stop-1)] else: range_lines = loglines[-start:] for line in range_lines: message += " (" + str(line[0]) + ") " + line[1] + "$(eol)" # there were no lines else: message = "None of the " + str(total_count) message += " lines in memory matches your request." # pass it back return message def wrap_ansi_text(text, width): """Wrap text with arbitrary width while ignoring ANSI colors.""" # the current position in the entire text string, including all # characters, printable or otherwise absolute_position = 0 # the current text position relative to the begining of the line, # ignoring color escape sequences relative_position = 0 # whether the current character is part of a telnet IAC sequence iac_counter = 0 # whether the current character is part of a color escape sequence escape = False # iterate over each character from the begining of the text for each_character in text: # the current character is the telnet IAC character if each_character == IAC and not iac_counter: iac_counter = 2 # the current character is within an IAC sequence elif iac_counter: # the current character is another IAC, # terminating the sequence if each_character == IAC: iac_counter = 0 # otherwise, decrement the IAC counter else: iac_counter -= 1 # the current character is the escape character elif each_character == chr(27) and not escape: escape = True # the current character is within an escape sequence elif escape: # the current character is m, which terminates the # escape sequence if each_character == "m": escape = False # the current character is a newline, so reset the relative # position (start a new line) elif each_character == "\n": relative_position = 0 # the current character meets the requested maximum line width, # so we need to backtrack and find a space at which to wrap elif relative_position == width and not each_character == "\r": # distance of the current character examined from the # relative position wrap_offset = 0 # count backwards until we find a space while text[absolute_position - wrap_offset] != " ": wrap_offset += 1 # insert an eol in place of the space text = text[:absolute_position - wrap_offset] + "\r\n" + text[absolute_position - wrap_offset + 1:] # increase the absolute position because an eol is two # characters but the space it replaced was only one absolute_position += 1 # now we're at the begining of a new line, plus the # number of characters wrapped from the previous line relative_position = wrap_offset # as long as the character is not a carriage return and the # other above conditions haven't been met, count it as a # printable character elif each_character != "\r": relative_position += 1 # increase the absolute position for every character absolute_position += 1 # return the newly-wrapped text return text def weighted_choice(data): """Takes a dict weighted by value and returns a random key.""" # this will hold our expanded list of keys from the data expanded = [] # create thee expanded list of keys for key in data.keys(): for count in range(data[key]): expanded.append(key) # return one at random return choice(expanded) def random_name(): """Returns a random character name.""" # the vowels and consonants needed to create romaji syllables vowels = [ "a", "i", "u", "e", "o" ] consonants = ["'", "k", "z", "s", "sh", "z", "j", "t", "ch", "ts", "d", "n", "h", "f", "m", "y", "r", "w" ] # this dict will hold our weighted list of syllables syllables = {} # generate the list with an even weighting for consonant in consonants: for vowel in vowels: syllables[consonant + vowel] = 1 # we'll build the name into this string name = "" # create a name of random length from the syllables for syllable in range(randrange(2, 6)): name += weighted_choice(syllables) # strip any leading quotemark, capitalize and return the name return name.strip("'").capitalize() def replace_macros(user, text, is_input=False): """Replaces macros in text output.""" # third person pronouns pronouns = { "female": { "obj": "her", "pos": "hers", "sub": "she" }, "male": { "obj": "him", "pos": "his", "sub": "he" }, "neuter": { "obj": "it", "pos": "its", "sub": "it" } } # a dict of replacement macros macros = { "eol": "\r\n", "bld": chr(27) + "[1m", "nrm": chr(27) + "[0m", "blk": chr(27) + "[30m", "blu": chr(27) + "[34m", "cyn": chr(27) + "[36m", "grn": chr(27) + "[32m", "mgt": chr(27) + "[35m", "red": chr(27) + "[31m", "yel": chr(27) + "[33m", } # add dynamic macros where possible if user.account: account_name = user.account.get("name") if account_name: macros["account"] = account_name if user.avatar: avatar_gender = user.avatar.get("gender") if avatar_gender: macros["tpop"] = pronouns[avatar_gender]["obj"] macros["tppp"] = pronouns[avatar_gender]["pos"] macros["tpsp"] = pronouns[avatar_gender]["sub"] # loop until broken while True: # find and replace per the macros dict macro_start = text.find("$(") if macro_start == -1: break macro_end = text.find(")", macro_start) + 1 macro = text[macro_start+2:macro_end-1] if macro in macros.keys(): replacement = macros[macro] # this is how we handle local file inclusion (dangerous!) elif macro.startswith("inc:"): incfile = path_join(universe.startdir, macro[4:]) if exists(incfile): incfd = file(incfile) replacement = "" for line in incfd: if line.endswith("\n") and not line.endswith("\r\n"): line = line.replace("\n", "\r\n") replacement += line # lose the trailing eol replacement = replacement[:-2] else: replacement = "" log("Couldn't read included " + incfile + " file.", 6) # if we get here, log and replace it with null else: replacement = "" if not is_input: log("Unexpected replacement macro " + macro + " encountered.", 6) # and now we act on the replacement text = text.replace("$(" + macro + ")", replacement) # replace the look-like-a-macro sequence text = text.replace("$_(", "$(") return text def escape_macros(text): """Escapes replacement macros in text.""" return text.replace("$(", "$_(") def first_word(text, separator=" "): """Returns a tuple of the first word and the rest.""" if text: if text.find(separator) > 0: return text.split(separator, 1) else: return text, "" else: return "", "" def on_pulse(): """The things which should happen on each pulse, aside from reloads.""" # open the listening socket if it hasn't been already if not hasattr(universe, "listening_socket"): universe.initialize_server_socket() # assign a user if a new connection is waiting user = check_for_connection(universe.listening_socket) if user: universe.userlist.append(user) # iterate over the connected users for user in universe.userlist: user.pulse() # add an element for counters if it doesn't exist if not "counters" in universe.categories["internal"]: universe.categories["internal"]["counters"] = Element("internal:counters", universe) # update the log every now and then if not universe.categories["internal"]["counters"].getint("mark"): log(str(len(universe.userlist)) + " connection(s)") universe.categories["internal"]["counters"].set("mark", universe.categories["internal"]["time"].getint("frequency_log")) else: universe.categories["internal"]["counters"].set("mark", universe.categories["internal"]["counters"].getint("mark") - 1) # periodically save everything if not universe.categories["internal"]["counters"].getint("save"): universe.save() universe.categories["internal"]["counters"].set("save", universe.categories["internal"]["time"].getint("frequency_save")) else: universe.categories["internal"]["counters"].set("save", universe.categories["internal"]["counters"].getint("save") - 1) # pause for a configurable amount of time (decimal seconds) sleep(universe.categories["internal"]["time"].getfloat("increment")) # increase the elapsed increment counter universe.categories["internal"]["counters"].set("elapsed", universe.categories["internal"]["counters"].getint("elapsed") + 1) def reload_data(): """Reload all relevant objects.""" for user in universe.userlist[:]: user.reload() for element in universe.contents.values(): if element.origin.is_writeable(): element.reload() universe.load() def check_for_connection(listening_socket): """Check for a waiting connection and return a new user object.""" # try to accept a new connection try: connection, address = listening_socket.accept() except: return None # note that we got one log("Connection from " + address[0], 2) # disable blocking so we can proceed whether or not we can send/receive connection.setblocking(0) # create a new user object user = User() # associate this connection with it user.connection = connection # set the user's ipa from the connection's ipa user.address = address[0] # let the client know we WILL EOR user.send(IAC+WILL+EOR, raw=True) user.negotiation_pause = 2 # return the new user object return user def get_menu(state, error=None, echoing=True, terminator="", choices=None): """Show the correct menu text to a user.""" # make sure we don't reuse a mutable sequence by default if choices is None: choices = {} # begin with a telnet echo command sequence if needed message = get_echo_sequence(state, echoing) # get the description or error text message += get_menu_description(state, error) # get menu choices for the current state message += get_formatted_menu_choices(state, choices) # try to get a prompt, if it was defined message += get_menu_prompt(state) # throw in the default choice, if it exists message += get_formatted_default_menu_choice(state) # display a message indicating if echo is off message += get_echo_message(state) # tack on EOR or GA to indicate the prompt will not be followed by CRLF message += terminator # return the assembly of various strings defined above return message def menu_echo_on(state): """True if echo is on, false if it is off.""" return universe.categories["menu"][state].getboolean("echo", True) def get_echo_sequence(state, echoing): """Build the appropriate IAC WILL/WONT ECHO sequence as needed.""" # if the user has echo on and the menu specifies it should be turned # off, send: iac + will + echo + null if echoing and not menu_echo_on(state): return IAC+WILL+ECHO # if echo is not set to off in the menu and the user curently has echo # off, send: iac + wont + echo + null elif not echoing and menu_echo_on(state): return IAC+WONT+ECHO # default is not to send an echo control sequence at all else: return "" def get_echo_message(state): """Return a message indicating that echo is off.""" if menu_echo_on(state): return "" else: return "(won't echo) " def get_default_menu_choice(state): """Return the default choice for a menu.""" return universe.categories["menu"][state].get("default") def get_formatted_default_menu_choice(state): """Default menu choice foratted for inclusion in a prompt string.""" default_choice = get_default_menu_choice(state) if default_choice: return "[$(red)" + default_choice + "$(nrm)] " else: return "" def get_menu_description(state, error): """Get the description or error text.""" # an error condition was raised by the handler if error: # try to get an error message matching the condition # and current state description = universe.categories["menu"][state].get("error_" + error) if not description: description = "That is not a valid choice..." description = "$(red)" + description + "$(nrm)" # there was no error condition else: # try to get a menu description for the current state description = universe.categories["menu"][state].get("description") # return the description or error message if description: description += "$(eol)$(eol)" return description def get_menu_prompt(state): """Try to get a prompt, if it was defined.""" prompt = universe.categories["menu"][state].get("prompt") if prompt: prompt += " " return prompt def get_menu_choices(user): """Return a dict of choice:meaning.""" menu = universe.categories["menu"][user.state] create_choices = menu.get("create") if create_choices: choices = eval(create_choices) else: choices = {} ignores = [] options = {} creates = {} for facet in menu.facets(): if facet.startswith("demand_") and not eval(universe.categories["menu"][user.state].get(facet)): ignores.append(facet.split("_", 2)[1]) elif facet.startswith("create_"): creates[facet] = facet.split("_", 2)[1] elif facet.startswith("choice_"): options[facet] = facet.split("_", 2)[1] for facet in creates.keys(): if not creates[facet] in ignores: choices[creates[facet]] = eval(menu.get(facet)) for facet in options.keys(): if not options[facet] in ignores: choices[options[facet]] = menu.get(facet) return choices def get_formatted_menu_choices(state, choices): """Returns a formatted string of menu choices.""" choice_output = "" choice_keys = choices.keys() choice_keys.sort() for choice in choice_keys: choice_output += " [$(red)" + choice + "$(nrm)] " + choices[choice] + "$(eol)" if choice_output: choice_output += "$(eol)" return choice_output def get_menu_branches(state): """Return a dict of choice:branch.""" branches = {} for facet in universe.categories["menu"][state].facets(): if facet.startswith("branch_"): branches[facet.split("_", 2)[1]] = universe.categories["menu"][state].get(facet) return branches def get_default_branch(state): """Return the default branch.""" return universe.categories["menu"][state].get("branch") def get_choice_branch(user, choice): """Returns the new state matching the given choice.""" branches = get_menu_branches(user.state) if choice in branches.keys(): return branches[choice] elif choice in user.menu_choices.keys(): return get_default_branch(user.state) else: return "" def get_menu_actions(state): """Return a dict of choice:branch.""" actions = {} for facet in universe.categories["menu"][state].facets(): if facet.startswith("action_"): actions[facet.split("_", 2)[1]] = universe.categories["menu"][state].get(facet) return actions def get_default_action(state): """Return the default action.""" return universe.categories["menu"][state].get("action") def get_choice_action(user, choice): """Run any indicated script for the given choice.""" actions = get_menu_actions(user.state) if choice in actions.keys(): return actions[choice] elif choice in user.menu_choices.keys(): return get_default_action(user.state) else: return "" def handle_user_input(user): """The main handler, branches to a state-specific handler.""" # if the user's client echo is off, send a blank line for aesthetics if user.echoing: user.received_newline = True # check to make sure the state is expected, then call that handler if "handler_" + user.state in globals(): exec("handler_" + user.state + "(user)") else: generic_menu_handler(user) # since we got input, flag that the menu/prompt needs to be redisplayed user.menu_seen = False # update the last_input timestamp while we're at it user.last_input = universe.get_time() def generic_menu_handler(user): """A generic menu choice handler.""" # get a lower-case representation of the next line of input if user.input_queue: choice = user.input_queue.pop(0) if choice: choice = choice.lower() else: choice = "" if not choice: choice = get_default_menu_choice(user.state) if choice in user.menu_choices: exec(get_choice_action(user, choice)) new_state = get_choice_branch(user, choice) if new_state: user.state = new_state else: user.error = "default" def handler_entering_account_name(user): """Handle the login account name.""" # get the next waiting line of input input_data = user.input_queue.pop(0) # did the user enter anything? if input_data: # keep only the first word and convert to lower-case name = input_data.lower() # fail if there are non-alphanumeric characters if name != filter(lambda x: x>="0" and x<="9" or x>="a" and x<="z", name): user.error = "bad_name" # if that account exists, time to request a password elif name in universe.categories["account"]: user.account = universe.categories["account"][name] user.state = "checking_password" # otherwise, this could be a brand new user else: user.account = Element("account:" + name, universe) user.account.set("name", name) log("New user: " + name, 2) user.state = "checking_new_account_name" # if the user entered nothing for a name, then buhbye else: user.state = "disconnecting" def handler_checking_password(user): """Handle the login account password.""" # get the next waiting line of input input_data = user.input_queue.pop(0) # does the hashed input equal the stored hash? if new_md5(user.account.get("name") + input_data).hexdigest() == user.account.get("passhash"): # if so, set the username and load from cold storage if not user.replace_old_connections(): user.authenticate() user.state = "main_utility" # if at first your hashes don't match, try, try again elif user.password_tries < universe.categories["internal"]["limits"].getint("password_tries") - 1: user.password_tries += 1 user.error = "incorrect" # we've exceeded the maximum number of password failures, so disconnect else: user.send("$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)") user.state = "disconnecting" def handler_entering_new_password(user): """Handle a new password entry.""" # get the next waiting line of input input_data = user.input_queue.pop(0) # make sure the password is strong--at least one upper, one lower and # one digit, seven or more characters in length if len(input_data) > 6 and len(filter(lambda x: x>="0" and x<="9", input_data)) and len(filter(lambda x: x>="A" and x<="Z", input_data)) and len(filter(lambda x: x>="a" and x<="z", input_data)): # hash and store it, then move on to verification user.account.set("passhash", new_md5(user.account.get("name") + input_data).hexdigest()) user.state = "verifying_new_password" # the password was weak, try again if you haven't tried too many times elif user.password_tries < universe.categories["internal"]["limits"].getint("password_tries") - 1: user.password_tries += 1 user.error = "weak" # too many tries, so adios else: user.send("$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)") user.account.destroy() user.state = "disconnecting" def handler_verifying_new_password(user): """Handle the re-entered new password for verification.""" # get the next waiting line of input input_data = user.input_queue.pop(0) # hash the input and match it to storage if new_md5(user.account.get("name") + input_data).hexdigest() == user.account.get("passhash"): user.authenticate() # the hashes matched, so go active if not user.replace_old_connections(): user.state = "main_utility" # go back to entering the new password as long as you haven't tried # too many times elif user.password_tries < universe.categories["internal"]["limits"].getint("password_tries") - 1: user.password_tries += 1 user.error = "differs" user.state = "entering_new_password" # otherwise, sayonara else: user.send("$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)") user.account.destroy() user.state = "disconnecting" def handler_active(user): """Handle input for active users.""" # get the next waiting line of input input_data = user.input_queue.pop(0) # is there input? if input_data: # split out the command and parameters actor = user.avatar mode = actor.get("mode") if mode and input_data.startswith("!"): command_name, parameters = first_word(input_data[1:]) elif mode == "chat": command_name = "say" parameters = input_data else: command_name, parameters = first_word(input_data) # lowercase the command command_name = command_name.lower() # the command matches a command word for which we have data if command_name in universe.categories["command"]: command = universe.categories["command"][command_name] else: command = None # if it's allowed, do it if actor.can_run(command): exec(command.get("action")) # otherwise, give an error elif command_name: command_error(actor, input_data) # if no input, just idle back with a prompt else: user.send("", just_prompt=True) def command_halt(actor, parameters): """Halt the world.""" if actor.owner: # see if there's a message or use a generic one if parameters: message = "Halting: " + parameters else: message = "User " + actor.owner.account.get("name") + " halted the world." # let everyone know broadcast(message, add_prompt=False) log(message, 8) # set a flag to terminate the world universe.terminate_flag = True def command_reload(actor): """Reload all code modules, configs and data.""" if actor.owner: # let the user know and log actor.send("Reloading all code modules, configs and data.") log("User " + actor.owner.account.get("name") + " reloaded the world.", 8) # set a flag to reload universe.reload_flag = True def command_quit(actor): """Leave the world and go back to the main menu.""" if actor.owner: actor.owner.state = "main_utility" actor.owner.deactivate_avatar() def command_help(actor, parameters): """List available commands and provide help for commands.""" # did the user ask for help on a specific command word? if parameters and actor.owner: # is the command word one for which we have data? if parameters in universe.categories["command"]: command = universe.categories["command"][parameters] else: command = None # only for allowed commands if actor.can_run(command): # add a description if provided description = command.get("description") if not description: description = "(no short description provided)" if command.getboolean("administrative"): output = "$(red)" else: output = "$(grn)" output += parameters + "$(nrm) - " + description + "$(eol)$(eol)" # add the help text if provided help_text = command.get("help") if not help_text: help_text = "No help is provided for this command." output += help_text # list related commands see_also = command.getlist("see_also") if see_also: really_see_also = "" for item in see_also: if item in universe.categories["command"]: command = universe.categories["command"][item] if actor.can_run(command): if really_see_also: really_see_also += ", " if command.getboolean("administrative"): really_see_also += "$(red)" else: really_see_also += "$(grn)" really_see_also += item + "$(nrm)" if really_see_also: output += "$(eol)$(eol)See also: " + really_see_also # no data for the requested command word else: output = "That is not an available command." # no specific command word was indicated else: # give a sorted list of commands with descriptions if provided output = "These are the commands available to you:$(eol)$(eol)" sorted_commands = universe.categories["command"].keys() sorted_commands.sort() for item in sorted_commands: command = universe.categories["command"][item] if actor.can_run(command): description = command.get("description") if not description: description = "(no short description provided)" if command.getboolean("administrative"): output += " $(red)" else: output += " $(grn)" output += item + "$(nrm) - " + description + "$(eol)" output += "$(eol)Enter \"help COMMAND\" for help on a command named \"COMMAND\"." # send the accumulated output to the user actor.send(output) def command_move(actor, parameters): """Move the avatar in a given direction.""" if parameters in universe.contents[actor.get("location")].portals(): actor.move_direction(parameters) else: actor.send("You cannot go that way.") def command_look(actor, parameters): """Look around.""" if parameters: actor.send("You can't look at or in anything yet.") else: actor.look_at(actor.get("location")) def command_say(actor, parameters): """Speak to others in the same room.""" # check for replacement macros if replace_macros(actor.owner, parameters, True) != parameters: actor.send("You cannot speak $_(replacement macros).") # the user entered a message elif parameters: # get rid of quote marks on the ends of the message message = parameters.strip("\"'`") # match the punctuation used, if any, to an action actions = universe.categories["internal"]["language"].getdict("actions") default_punctuation = universe.categories["internal"]["language"].get("default_punctuation") action = "" for mark in actions.keys(): if message.endswith(mark): action = actions[mark] break # add punctuation if needed if not action: action = actions[default_punctuation] if message and not message[-1] in punctuation: message += default_punctuation # decapitalize the first letter to improve matching if message and message[0] in uppercase: message = message[0].lower() + message[1:] # iterate over all words in message, replacing typos typos = universe.categories["internal"]["language"].getdict("typos") words = message.split() for index in range(len(words)): word = words[index] bare_word = word.strip(punctuation) if bare_word in typos.keys(): words[index] = word.replace(bare_word, typos[bare_word]) message = " ".join(words) # capitalize the first letter message = message[0].upper() + message[1:] # tell the room actor.echo_to_location(actor.get("name") + " " + action + "s, \"" + message + "\"") actor.send("You " + action + ", \"" + message + "\"") # there was no message else: actor.send("What do you want to say?") def command_chat(actor): """Toggle chat mode.""" mode = actor.get("mode") if not mode: actor.set("mode", "chat") actor.send("Entering chat mode (use $(grn)!chat$(nrm) to exit).") elif mode == "chat": actor.remove_facet("mode") actor.send("Exiting chat mode.") else: actor.send("Sorry, but you're already busy with something else!") def command_show(actor, parameters): """Show program data.""" message = "" arguments = parameters.split() if not parameters: message = "What do you want to show?" elif arguments[0] == "time": message = universe.categories["internal"]["counters"].get("elapsed") + " increments elapsed since the world was created." elif arguments[0] == "categories": message = "These are the element categories:$(eol)" categories = universe.categories.keys() categories.sort() for category in categories: message += "$(eol) $(grn)" + category + "$(nrm)" elif arguments[0] == "files": message = "These are the current files containing the universe:$(eol)" filenames = universe.files.keys() filenames.sort() for filename in filenames: if universe.files[filename].is_writeable(): status = "rw" else: status = "ro" message += "$(eol) $(red)(" + status + ") $(grn)" + filename + "$(nrm)" elif arguments[0] == "category": if len(arguments) != 2: message = "You must specify one category." elif arguments[1] in universe.categories: message = "These are the elements in the \"" + arguments[1] + "\" category:$(eol)" elements = [(universe.categories[arguments[1]][x].key) for x in universe.categories[arguments[1]].keys()] elements.sort() for element in elements: message += "$(eol) $(grn)" + element + "$(nrm)" else: message = "Category \"" + arguments[1] + "\" does not exist." elif arguments[0] == "file": if len(arguments) != 2: message = "You must specify one file." elif arguments[1] in universe.files: message = "These are the elements in the \"" + arguments[1] + "\" file:$(eol)" elements = universe.files[arguments[1]].data.sections() elements.sort() for element in elements: message += "$(eol) $(grn)" + element + "$(nrm)" else: message = "Category \"" + arguments[1] + "\" does not exist." elif arguments[0] == "element": if len(arguments) != 2: message = "You must specify one element." elif arguments[1] in universe.contents: element = universe.contents[arguments[1]] message = "These are the properties of the \"" + arguments[1] + "\" element (in \"" + element.origin.filename + "\"):$(eol)" facets = element.facets() facets.sort() for facet in facets: message += "$(eol) $(grn)" + facet + ": $(red)" + escape_macros(element.get(facet)) + "$(nrm)" else: message = "Element \"" + arguments[1] + "\" does not exist." elif arguments[0] == "result": if len(arguments) < 2: message = "You need to specify an expression." else: try: message = repr(eval(" ".join(arguments[1:]))) except: message = "Your expression raised an exception!" elif arguments[0] == "log": if len(arguments) == 4: if match("^\d+$", arguments[3]) and int(arguments[3]) >= 0: stop = int(arguments[3]) else: stop = -1 else: stop = 0 if len(arguments) >= 3: if match("^\d+$", arguments[2]) and int(arguments[2]) > 0: start = int(arguments[2]) else: start = -1 else: start = 10 if len(arguments) >= 2: if match("^\d+$", arguments[1]) and 0 <= int(arguments[1]) <= 9: level = int(arguments[1]) else: level = -1 elif 0 <= actor.owner.account.getint("loglevel") <= 9: level = actor.owner.account.getint("loglevel") else: level = 1 if level > -1 and start > -1 and stop > -1: message = get_loglines(level, start, stop) else: message = "When specified, level must be 0-9 (default 1), start and stop must be >=1 (default 10 and 1)." else: message = "I don't know what \"" + parameters + "\" is." actor.send(message) def command_create(actor, parameters): """Create an element if it does not exist.""" if not parameters: message = "You must at least specify an element to create." elif not actor.owner: message = "" else: arguments = parameters.split() if len(arguments) == 1: arguments.append("") if len(arguments) == 2: element, filename = arguments if element in universe.contents: message = "The \"" + element + "\" element already exists." else: message = "You create \"" + element + "\" within the universe." logline = actor.owner.account.get("name") + " created an element: " + element if filename: logline += " in file " + filename if filename not in universe.files: message += " Warning: \"" + filename + "\" is not yet included in any other file and will not be read on startup unless this is remedied." Element(element, universe, filename) log(logline, 6) elif len(arguments) > 2: message = "You can only specify an element and a filename." actor.send(message) def command_destroy(actor, parameters): """Destroy an element if it exists.""" if actor.owner: if not parameters: message = "You must specify an element to destroy." else: if parameters not in universe.contents: message = "The \"" + parameters + "\" element does not exist." else: universe.contents[parameters].destroy() message = "You destroy \"" + parameters + "\" within the universe." log(actor.owner.account.get("name") + " destroyed an element: " + parameters, 6) actor.send(message) def command_set(actor, parameters): """Set a facet of an element.""" if not parameters: message = "You must specify an element, a facet and a value." else: arguments = parameters.split(" ", 2) if len(arguments) == 1: message = "What facet of element \"" + arguments[0] + "\" would you like to set?" elif len(arguments) == 2: message = "What value would you like to set for the \"" + arguments[1] + "\" facet of the \"" + arguments[0] + "\" element?" else: element, facet, value = arguments if element not in universe.contents: message = "The \"" + element + "\" element does not exist." else: universe.contents[element].set(facet, value) message = "You have successfully (re)set the \"" + facet + "\" facet of element \"" + element + "\". Try \"show element " + element + "\" for verification." actor.send(message) def command_delete(actor, parameters): """Delete a facet from an element.""" if not parameters: message = "You must specify an element and a facet." else: arguments = parameters.split(" ") if len(arguments) == 1: message = "What facet of element \"" + arguments[0] + "\" would you like to delete?" elif len(arguments) != 2: message = "You may only specify an element and a facet." else: element, facet = arguments if element not in universe.contents: message = "The \"" + element + "\" element does not exist." elif facet not in universe.contents[element].facets(): message = "The \"" + element + "\" element has no \"" + facet + "\" facet." else: universe.contents[element].remove_facet(facet) message = "You have successfully deleted the \"" + facet + "\" facet of element \"" + element + "\". Try \"show element " + element + "\" for verification." actor.send(message) def command_error(actor, input_data): """Generic error for an unrecognized command word.""" # 90% of the time use a generic error if randrange(10): message = "I'm not sure what \"" + input_data + "\" means..." # 10% of the time use the classic diku error else: message = "Arglebargle, glop-glyf!?!" # send the error message actor.send(message) def daemonize(): """Fork and disassociate from everything.""" if universe.contents["internal:process"].getboolean("daemon"): import sys from resource import getrlimit, RLIMIT_NOFILE log("Disassociating from the controlling terminal.") if fork(): _exit(0) setsid() if fork(): _exit(0) chdir("/") umask(0) for stdpipe in range(3): close(stdpipe) sys.stdin = sys.__stdin__ = file("/dev/null", "r") sys.stdout = sys.stderr = sys.__stdout__ = sys.__stderr__ = file("/dev/null", "w") def create_pidfile(universe): """Write a file containing the current process ID.""" pid = str(getpid()) log("Process ID: " + pid) file_name = universe.contents["internal:process"].get("pidfile") if file_name: if not isabs(file_name): file_name = path_join(universe.startdir, file_name) file_descriptor = file(file_name, 'w') file_descriptor.write(pid + "\n") file_descriptor.flush() file_descriptor.close() def remove_pidfile(universe): """Remove the file containing the current process ID.""" file_name = universe.contents["internal:process"].get("pidfile") if file_name: if not isabs(file_name): file_name = path_join(universe.startdir, file_name) if access(file_name, W_OK): remove(file_name) def excepthook(excepttype, value, traceback): """Handle uncaught exceptions.""" # assemble the list of errors into a single string message = "".join(format_exception(excepttype, value, traceback)) # try to log it, if possible try: log(message, 9) except: pass # try to write it to stderr, if possible try: stderr.write(message) except: pass def sighook(what, where): """Handle external signals.""" # a generic message message = "Caught signal: " # for a hangup signal if what == SIGHUP: message += "hangup (reloading)" universe.reload_flag = True # for a terminate signal elif what == SIGTERM: message += "terminate (halting)" universe.terminate_flag = True # catchall for unexpected signals else: message += str(what) + " (unhandled)" # log what happened log(message, 8) # redefine sys.excepthook with ours import sys sys.excepthook = excepthook # assign the sgnal handlers signal(SIGHUP, sighook) signal(SIGTERM, sighook) # if there is no universe, create an empty one if not "universe" in locals(): if len(argv) > 1: conffile = argv[1] else: conffile = "" universe = Universe(conffile, True) elif universe.reload_flag: universe = universe.new() reload_data()