# -*- coding: utf-8 -*- u"""Miscellaneous functions for the mudpy engine.""" # Copyright (c) 2004-2011 Jeremy Stanley . Permission # to use, copy, modify, and distribute this software is granted under # terms provided in the LICENSE file distributed with this software. class Element: u"""An element of the universe.""" def __init__(self, key, universe, filename=None): u"""Set up a new element.""" import data, os.path # keep track of our key name self.key = key # keep track of what universe it's loading into self.universe = universe # clone attributes if this is replacing another element if self.key in self.universe.contents: old_element = self.universe.contents[self.key] for attribute in vars(old_element).keys(): exec(u"self." + attribute + u" = old_element." + attribute) if self.owner: self.owner.avatar = self # i guess this is a new element then else: # not owned by a user by default (used for avatars) self.owner = None # no contents in here by default self.contents = {} # parse out appropriate category and subkey names, add to list if self.key.find(u":") > 0: self.category, self.subkey = self.key.split(u":", 1) else: self.category = u"other" self.subkey = self.key if not self.category in self.universe.categories: self.category = u"other" self.subkey = self.key # get an appropriate filename for the origin if not filename: filename = self.universe.default_origins[self.category] if not os.path.isabs(filename): filename = os.path.abspath(filename) # add the file if it doesn't exist yet if not filename in self.universe.files: data.DataFile(filename, self.universe) # record or reset a pointer to the origin file self.origin = self.universe.files[filename] # add a data section to the origin if necessary if not self.origin.data.has_section(self.key): self.origin.data.add_section(self.key) # add or replace this element in the universe self.universe.contents[self.key] = self self.universe.categories[self.category][self.subkey] = self def reload(self): u"""Create a new element and replace this one.""" new_element = Element(self.key, self.universe, self.origin.filename) del(self) def destroy(self): u"""Remove an element from the universe and destroy it.""" self.origin.data.remove_section(self.key) del self.universe.categories[self.category][self.subkey] del self.universe.contents[self.key] del self def facets(self): u"""Return a list of non-inherited facets for this element.""" if self.key in self.origin.data.sections(): return self.origin.data.options(self.key) else: return [] def has_facet(self, facet): u"""Return whether the non-inherited facet exists.""" return facet in self.facets() def remove_facet(self, facet): u"""Remove a facet from the element.""" if self.has_facet(facet): self.origin.data.remove_option(self.key, facet) self.origin.modified = True def ancestry(self): u"""Return a list of the element's inheritance lineage.""" if self.has_facet(u"inherit"): ancestry = self.getlist(u"inherit") for parent in ancestry[:]: ancestors = self.universe.contents[parent].ancestry() for ancestor in ancestors: if ancestor not in ancestry: ancestry.append(ancestor) return ancestry else: return [] def get(self, facet, default=None): u"""Retrieve values.""" if default is None: default = u"" if self.origin.data.has_option(self.key, facet): raw_data = self.origin.data.get(self.key, facet) if raw_data.startswith(u"u\"") or raw_data.startswith(u"u'"): raw_data = raw_data[1:] raw_data.strip(u"\"'") if type(raw_data) == str: return unicode(raw_data, "utf-8") else: return raw_data elif self.has_facet(u"inherit"): for ancestor in self.ancestry(): if self.universe.contents[ancestor].has_facet(facet): return self.universe.contents[ancestor].get(facet) else: return default def getboolean(self, facet, default=None): u"""Retrieve values as boolean type.""" if default is None: default=False if self.origin.data.has_option(self.key, facet): return self.origin.data.getboolean(self.key, facet) elif self.has_facet(u"inherit"): for ancestor in self.ancestry(): if self.universe.contents[ancestor].has_facet(facet): return self.universe.contents[ancestor].getboolean(facet) else: return default def getint(self, facet, default=None): u"""Return values as int/long type.""" if default is None: default = 0 if self.origin.data.has_option(self.key, facet): return self.origin.data.getint(self.key, facet) elif self.has_facet(u"inherit"): for ancestor in self.ancestry(): if self.universe.contents[ancestor].has_facet(facet): return self.universe.contents[ancestor].getint(facet) else: return default def getfloat(self, facet, default=None): u"""Return values as float type.""" if default is None: default = 0.0 if self.origin.data.has_option(self.key, facet): return self.origin.data.getfloat(self.key, facet) elif self.has_facet(u"inherit"): for ancestor in self.ancestry(): if self.universe.contents[ancestor].has_facet(facet): return self.universe.contents[ancestor].getfloat(facet) else: return default def getlist(self, facet, default=None): u"""Return values as list type.""" import data if default is None: default = [] value = self.get(facet) if value: return data.makelist(value) else: return default def getdict(self, facet, default=None): u"""Return values as dict type.""" import data if default is None: default = {} value = self.get(facet) if value: return data.makedict(value) else: return default def set(self, facet, value): u"""Set values.""" if not self.has_facet(facet) or not self.get(facet) == value: if type(value) is long: value = unicode(value) elif not type(value) is unicode: value = repr(value) self.origin.data.set(self.key, facet, value) self.origin.modified = True def append(self, facet, value): u"""Append value tp a list.""" if type(value) is long: value = unicode(value) elif not type(value) is unicode: value = repr(value) newlist = self.getlist(facet) newlist.append(value) self.set(facet, newlist) def new_event(self, action, when=None): u"""Create, attach and enqueue an event element.""" # if when isn't specified, that means now if not when: when = self.universe.get_time() # events are elements themselves event = Element(u"event:" + self.key + u":" + counter) def send( self, message, eol=u"$(eol)", raw=False, flush=False, add_prompt=True, just_prompt=False, add_terminator=False, prepend_padding=True ): u"""Convenience method to pass messages to an owner.""" if self.owner: self.owner.send( message, eol, raw, flush, add_prompt, just_prompt, add_terminator, prepend_padding ) def can_run(self, command): u"""Check if the user can run this command object.""" # has to be in the commands category if command not in self.universe.categories[u"command"].values(): result = False # avatars of administrators can run any command elif self.owner and self.owner.account.getboolean(u"administrator"): result = True # everyone can run non-administrative commands elif not command.getboolean(u"administrative"): result = True # otherwise the command cannot be run by this actor else: result = False # pass back the result return result def update_location(self): u"""Make sure the location's contents contain this element.""" location = self.get(u"location") if location in self.universe.contents: self.universe.contents[location].contents[self.key] = self def clean_contents(self): u"""Make sure the element's contents aren't bogus.""" for element in self.contents.values(): if element.get(u"location") != self.key: del self.contents[element.key] def go_to(self, location): u"""Relocate the element to a specific location.""" current = self.get(u"location") if current and self.key in self.universe.contents[current].contents: del universe.contents[current].contents[self.key] if location in self.universe.contents: self.set(u"location", location) self.universe.contents[location].contents[self.key] = self self.look_at(location) def go_home(self): u"""Relocate the element to its default location.""" self.go_to(self.get(u"default_location")) self.echo_to_location( u"You suddenly realize that " + self.get(u"name") + u" is here." ) def move_direction(self, direction): u"""Relocate the element in a specified direction.""" self.echo_to_location( self.get( u"name" ) + u" exits " + self.universe.categories[ u"internal" ][ u"directions" ].getdict( direction )[ u"exit" ] + u"." ) self.send( u"You exit " + self.universe.categories[ u"internal" ][ u"directions" ].getdict( direction )[ u"exit" ] + u".", add_prompt=False ) self.go_to( self.universe.contents[self.get(u"location")].link_neighbor(direction) ) self.echo_to_location( self.get( u"name" ) + u" arrives from " + self.universe.categories[ u"internal" ][ u"directions" ].getdict( direction )[ u"enter" ] + u"." ) def look_at(self, key): u"""Show an element to another element.""" if self.owner: element = self.universe.contents[key] message = u"" name = element.get(u"name") if name: message += u"$(cyn)" + name + u"$(nrm)$(eol)" description = element.get(u"description") if description: message += description + u"$(eol)" portal_list = element.portals().keys() if portal_list: portal_list.sort() message += u"$(cyn)[ Exits: " + u", ".join( portal_list ) + u" ]$(nrm)$(eol)" for element in self.universe.contents[ self.get(u"location") ].contents.values(): if element.getboolean(u"is_actor") and element is not self: message += u"$(yel)" + element.get( u"name" ) + u" is here.$(nrm)$(eol)" elif element is not self: message += u"$(grn)" + element.get( u"impression" ) + u"$(nrm)$(eol)" self.send(message) def portals(self): u"""Map the portal directions for an area to neighbors.""" import re portals = {} if re.match(u"""^location:-?\d+,-?\d+,-?\d+$""", self.key): coordinates = [(int(x)) for x in self.key.split(u":")[1].split(u",")] directions = self.universe.categories[u"internal"][u"directions"] offsets = dict( [ ( x, directions.getdict(x)[u"vector"] ) for x in directions.facets() ] ) for portal in self.getlist(u"gridlinks"): adjacent = map(lambda c,o: c+o, coordinates, offsets[portal]) neighbor = u"location:" + u",".join( [(unicode(x)) for x in adjacent] ) if neighbor in self.universe.contents: portals[portal] = neighbor for facet in self.facets(): if facet.startswith(u"link_"): neighbor = self.get(facet) if neighbor in self.universe.contents: portal = facet.split(u"_")[1] portals[portal] = neighbor return portals def link_neighbor(self, direction): u"""Return the element linked in a given direction.""" portals = self.portals() if direction in portals: return portals[direction] def echo_to_location(self, message): u"""Show a message to other elements in the current location.""" for element in self.universe.contents[ self.get(u"location") ].contents.values(): if element is not self: element.send(message) class Universe: u"""The universe.""" def __init__(self, filename=u"", load=False): u"""Initialize the universe.""" import os, os.path self.categories = {} self.contents = {} self.default_origins = {} self.loglines = [] self.pending_events_long = {} self.pending_events_short = {} self.private_files = [] self.reload_flag = False self.startdir = os.getcwd() self.terminate_flag = False self.userlist = [] if not filename: possible_filenames = [ u".mudpyrc", u".mudpy/mudpyrc", u".mudpy/mudpy.conf", u"mudpy.conf", u"etc/mudpy.conf", u"/usr/local/mudpy/mudpy.conf", u"/usr/local/mudpy/etc/mudpy.conf", u"/etc/mudpy/mudpy.conf", u"/etc/mudpy.conf" ] for filename in possible_filenames: if os.access(filename, os.R_OK): break if not os.path.isabs(filename): filename = os.path.join(self.startdir, filename) self.filename = filename if load: self.load() def load(self): u"""Load universe data from persistent storage.""" import data # the files dict must exist and filename needs to be read-only if not hasattr( self, u"files" ) or not ( self.filename in self.files and self.files[ self.filename ].is_writeable() ): # clear out all read-only files if hasattr(self, u"files"): for data_filename in self.files.keys(): if not self.files[data_filename].is_writeable(): del self.files[data_filename] # start loading from the initial file data.DataFile(self.filename, self) # make a list of inactive avatars inactive_avatars = [] for account in self.categories[u"account"].values(): inactive_avatars += [ (self.contents[x]) for x in account.getlist(u"avatars") ] for user in self.userlist: if user.avatar in inactive_avatars: inactive_avatars.remove(user.avatar) # go through all elements to clear out inactive avatar locations for element in self.contents.values(): location = element.get(u"location") if element in inactive_avatars and location: if location in self.contents and element.key in self.contents[ location ].contents: del self.contents[location].contents[element.key] element.set(u"default_location", location) element.remove_facet(u"location") # another pass to straighten out all the element contents for element in self.contents.values(): element.update_location() element.clean_contents() def new(self): u"""Create a new, empty Universe (the Big Bang).""" new_universe = Universe() for attribute in vars(self).keys(): exec(u"new_universe." + attribute + u" = self." + attribute) new_universe.reload_flag = False del self return new_universe def save(self): u"""Save the universe to persistent storage.""" for key in self.files: self.files[key].save() def initialize_server_socket(self): u"""Create and open the listening socket.""" import socket # need to know the local address and port number for the listener host = self.categories[u"internal"][u"network"].get(u"host") port = self.categories[u"internal"][u"network"].getint(u"port") # if no host was specified, bind to all local addresses (preferring ipv6) if not host: if socket.has_ipv6: host = u"::" else: host = u"0.0.0.0" # figure out if this is ipv4 or v6 family = socket.getaddrinfo(host, port)[0][0] if family is socket.AF_INET6 and not socket.has_ipv6: log(u"No support for IPv6 address %s (use IPv4 instead)." % host) # create a new stream-type socket object self.listening_socket = socket.socket(family, socket.SOCK_STREAM) # set the socket options to allow existing open ones to be # reused (fixes a bug where the server can't bind for a minute # when restarting on linux systems) self.listening_socket.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 ) # bind the socket to to our desired server ipa and port self.listening_socket.bind((host, port)) # disable blocking so we can proceed whether or not we can # send/receive self.listening_socket.setblocking(0) # start listening on the socket self.listening_socket.listen(1) # note that we're now ready for user connections log( u"Listening for Telnet connections on: " + host + u":" + unicode(port) ) def get_time(self): u"""Convenience method to get the elapsed time counter.""" return self.categories[u"internal"][u"counters"].getint(u"elapsed") class User: u"""This is a connected user.""" def __init__(self): u"""Default values for the in-memory user variables.""" import telnet self.account = None self.address = u"" self.authenticated = False self.avatar = None self.columns = 79 self.connection = None self.error = u"" self.input_queue = [] self.last_address = u"" self.last_input = universe.get_time() self.menu_choices = {} self.menu_seen = False self.negotiation_pause = 0 self.output_queue = [] self.partial_input = "" self.password_tries = 0 self.state = u"initial" self.telopts = {} def quit(self): u"""Log, close the connection and remove.""" if self.account: name = self.account.get(u"name") else: name = u"" if name: message = u"User " + name else: message = u"An unnamed user" message += u" logged out." log(message, 2) self.deactivate_avatar() self.connection.close() self.remove() def check_idle(self): u"""Warn or disconnect idle users as appropriate.""" idletime = universe.get_time() - self.last_input linkdead_dict = universe.categories[u"internal"][u"time"].getdict( u"linkdead" ) if self.state in linkdead_dict: linkdead_state = self.state else: linkdead_state = u"default" if idletime > linkdead_dict[linkdead_state]: self.send( u"$(eol)$(red)You've done nothing for far too long... goodbye!" \ + u"$(nrm)$(eol)", flush=True, add_prompt=False ) logline = u"Disconnecting " if self.account and self.account.get(u"name"): logline += self.account.get(u"name") else: logline += u"an unknown user" logline += u" after idling too long in the " + self.state + u" state." log(logline, 2) self.state = u"disconnecting" self.menu_seen = False idle_dict = universe.categories[u"internal"][u"time"].getdict(u"idle") if self.state in idle_dict: idle_state = self.state else: idle_state = u"default" if idletime == idle_dict[idle_state]: self.send( u"$(eol)$(red)If you continue to be unproductive, " \ + u"you'll be shown the door...$(nrm)$(eol)" ) def reload(self): u"""Save, load a new user and relocate the connection.""" # get out of the list self.remove() # create a new user object new_user = User() # set everything equivalent for attribute in vars(self).keys(): exec(u"new_user." + attribute + u" = self." + attribute) # the avatar needs a new owner if new_user.avatar: new_user.avatar.owner = new_user # add it to the list universe.userlist.append(new_user) # get rid of the old user object del(self) def replace_old_connections(self): u"""Disconnect active users with the same name.""" # the default return value return_value = False # iterate over each user in the list for old_user in universe.userlist: # the name is the same but it's not us if hasattr( old_user, u"account" ) and old_user.account and old_user.account.get( u"name" ) == self.account.get( u"name" ) and old_user is not self: # make a note of it log( u"User " + self.account.get( u"name" ) + u" reconnected--closing old connection to " \ + old_user.address + u".", 2 ) old_user.send( u"$(eol)$(red)New connection from " + self.address \ + u". Terminating old connection...$(nrm)$(eol)", \ flush=True, add_prompt=False ) # close the old connection old_user.connection.close() # replace the old connection with this one old_user.send( u"$(eol)$(red)Taking over old connection from " \ + old_user.address + u".$(nrm)" ) old_user.connection = self.connection old_user.last_address = old_user.address old_user.address = self.address # take this one out of the list and delete self.remove() del(self) return_value = True break # true if an old connection was replaced, false if not return return_value def authenticate(self): u"""Flag the user as authenticated and disconnect duplicates.""" if not self.state is u"authenticated": log(u"User " + self.account.get(u"name") + u" logged in.", 2) self.authenticated = True if self.account.subkey in universe.categories[ u"internal" ][ u"limits" ].getlist( u"default_admins" ): self.account.set(u"administrator", u"True") def show_menu(self): u"""Send the user their current menu.""" if not self.menu_seen: self.menu_choices = get_menu_choices(self) self.send( get_menu(self.state, self.error, self.menu_choices), u"", add_terminator=True ) self.menu_seen = True self.error = False self.adjust_echoing() def adjust_echoing(self): u"""Adjust echoing to match state menu requirements.""" import telnet if telnet.is_enabled(self, telnet.TELOPT_ECHO, telnet.US): if menu_echo_on(self.state): telnet.disable(self, telnet.TELOPT_ECHO, telnet.US) elif not menu_echo_on(self.state): telnet.enable(self, telnet.TELOPT_ECHO, telnet.US) def remove(self): u"""Remove a user from the list of connected users.""" universe.userlist.remove(self) def send( self, output, eol=u"$(eol)", raw=False, flush=False, add_prompt=True, just_prompt=False, add_terminator=False, prepend_padding=True ): u"""Send arbitrary text to a connected user.""" import telnet # unless raw mode is on, clean it up all nice and pretty if not raw: # strip extra $(eol) off if present while output.startswith(u"$(eol)"): output = output[6:] while output.endswith(u"$(eol)"): output = output[:-6] extra_lines = output.find(u"$(eol)$(eol)$(eol)") while extra_lines > -1: output = output[:extra_lines] + output[extra_lines+6:] extra_lines = output.find(u"$(eol)$(eol)$(eol)") # start with a newline, append the message, then end # with the optional eol string passed to this function # and the ansi escape to return to normal text if not just_prompt and prepend_padding: if not self.output_queue \ or not self.output_queue[-1].endswith("\r\n"): output = u"$(eol)" + output elif not self.output_queue[-1].endswith( "\r\n\x1b[0m\r\n" ) and not self.output_queue[-1].endswith( "\r\n\r\n" ): output = u"$(eol)" + output output += eol + unichr(27) + u"[0m" # tack on a prompt if active if self.state == u"active": if not just_prompt: output += u"$(eol)" if add_prompt: output += u"> " mode = self.avatar.get(u"mode") if mode: output += u"(" + mode + u") " # find and replace macros in the output output = replace_macros(self, output) # wrap the text at the client's width (min 40, 0 disables) if self.columns: if self.columns < 40: wrap = 40 else: wrap = self.columns output = wrap_ansi_text(output, wrap) # if supported by the client, encode it utf-8 if telnet.is_enabled(self, telnet.TELOPT_BINARY, telnet.US): encoded_output = output.encode(u"utf-8") # otherwise just send ascii else: encoded_output = output.encode(u"ascii", u"replace") # end with a terminator if requested if add_prompt or add_terminator: if telnet.is_enabled(self, telnet.TELOPT_EOR, telnet.US): encoded_output += telnet.telnet_proto(telnet.IAC, telnet.EOR) elif not telnet.is_enabled(self, telnet.TELOPT_SGA, telnet.US): encoded_output += telnet.telnet_proto(telnet.IAC, telnet.GA) # and tack it onto the queue self.output_queue.append(encoded_output) # if this is urgent, flush all pending output if flush: self.flush() # just dump raw bytes as requested else: self.output_queue.append(output) self.flush() def pulse(self): u"""All the things to do to the user per increment.""" # if the world is terminating, disconnect if universe.terminate_flag: self.state = u"disconnecting" self.menu_seen = False # check for an idle connection and act appropriately else: self.check_idle() # if output is paused, decrement the counter if self.state == u"initial": if self.negotiation_pause: self.negotiation_pause -= 1 else: self.state = u"entering_account_name" # show the user a menu as needed elif not self.state == u"active": self.show_menu() # flush any pending output in the queue self.flush() # disconnect users with the appropriate state if self.state == u"disconnecting": self.quit() # check for input and add it to the queue self.enqueue_input() # there is input waiting in the queue if self.input_queue: handle_user_input(self) def flush(self): u"""Try to send the last item in the queue and remove it.""" if self.output_queue: try: self.connection.send(self.output_queue[0]) del self.output_queue[0] except: if self.account and self.account.get(u"name"): account = self.account.get(u"name") else: account = u"an unknown user" log( u"Sending to %s raised an exception (broken pipe?)." % account, 7 ) pass def enqueue_input(self): u"""Process and enqueue any new input.""" import telnet, unicodedata # check for some input try: raw_input = self.connection.recv(1024) except: raw_input = "" # we got something if raw_input: # tack this on to any previous partial self.partial_input += raw_input # reply to and remove any IAC negotiation codes telnet.negotiate_telnet_options(self) # separate multiple input lines new_input_lines = self.partial_input.split("\n") # if input doesn't end in a newline, replace the # held partial input with the last line of it if not self.partial_input.endswith("\n"): self.partial_input = new_input_lines.pop() # otherwise, chop off the extra null input and reset # the held partial input else: new_input_lines.pop() self.partial_input = "" # iterate over the remaining lines for line in new_input_lines: # strip off extra whitespace line = line.strip() # make sure it's valid unicode (probably no longer needed) try: unicode(line, u"utf-8") except UnicodeDecodeError: logline = u"Non-unicode data from " if self.account and self.account.get(u"name"): logline += self.account.get(u"name") + u": " else: logline += u"unknown user: " logline += repr(line) log(logline, 4) line = "" # log non-printable characters remaining if telnet.is_enabled(self, telnet.TELOPT_BINARY, telnet.HIM): asciiline = filter(lambda x: " " <= x <= "~", line) if line != asciiline: logline = u"Non-ASCII characters from " if self.account and self.account.get(u"name"): logline += self.account.get(u"name") + u": " else: logline += u"unknown user: " logline += repr(line) log(logline, 4) line = asciiline # put on the end of the queue self.input_queue.append( unicodedata.normalize( u"NFKC", unicode(line, u"utf-8") ) ) def new_avatar(self): u"""Instantiate a new, unconfigured avatar for this user.""" counter = 0 while u"avatar:" + self.account.get(u"name") + u":" + unicode( counter ) in universe.categories[u"actor"].keys(): counter += 1 self.avatar = Element( u"actor:avatar:" + self.account.get(u"name") + u":" + unicode( counter ), universe ) self.avatar.append(u"inherit", u"archetype:avatar") self.account.append(u"avatars", self.avatar.key) def delete_avatar(self, avatar): u"""Remove an avatar from the world and from the user's list.""" if self.avatar is universe.contents[avatar]: self.avatar = None universe.contents[avatar].destroy() avatars = self.account.getlist(u"avatars") avatars.remove(avatar) self.account.set(u"avatars", avatars) def activate_avatar_by_index(self, index): u"""Enter the world with a particular indexed avatar.""" self.avatar = universe.contents[self.account.getlist(u"avatars")[index]] self.avatar.owner = self self.state = u"active" self.avatar.go_home() def deactivate_avatar(self): u"""Have the active avatar leave the world.""" if self.avatar: current = self.avatar.get(u"location") if current: self.avatar.set(u"default_location", current) self.avatar.echo_to_location( u"You suddenly wonder where " + self.avatar.get( u"name" ) + u" went." ) del universe.contents[current].contents[self.avatar.key] self.avatar.remove_facet(u"location") self.avatar.owner = None self.avatar = None def destroy(self): u"""Destroy the user and associated avatars.""" for avatar in self.account.getlist(u"avatars"): self.delete_avatar(avatar) self.account.destroy() def list_avatar_names(self): u"""List names of assigned avatars.""" return [ universe.contents[avatar].get( u"name" ) for avatar in self.account.getlist(u"avatars") ] def broadcast(message, add_prompt=True): u"""Send a message to all connected users.""" for each_user in universe.userlist: each_user.send(u"$(eol)" + message, add_prompt=add_prompt) def log(message, level=0): u"""Log a message.""" import codecs, os.path, syslog, time # a couple references we need file_name = universe.categories[u"internal"][u"logging"].get(u"file") max_log_lines = universe.categories[u"internal"][u"logging"].getint( u"max_log_lines" ) syslog_name = universe.categories[u"internal"][u"logging"].get(u"syslog") timestamp = time.asctime()[4:19] # turn the message into a list of lines lines = filter( lambda x: x!=u"", [ (x.rstrip()) for x in message.split(u"\n") ] ) # send the timestamp and line to a file if file_name: if not os.path.isabs(file_name): file_name = os.path.join(universe.startdir, file_name) file_descriptor = codecs.open(file_name, u"a", u"utf-8") for line in lines: file_descriptor.write(timestamp + u" " + line + u"\n") file_descriptor.flush() file_descriptor.close() # send the timestamp and line to standard output if universe.categories[u"internal"][u"logging"].getboolean(u"stdout"): for line in lines: print(timestamp + u" " + line) # send the line to the system log if syslog_name: syslog.openlog( syslog_name.encode("utf-8"), syslog.LOG_PID, syslog.LOG_INFO | syslog.LOG_DAEMON ) for line in lines: syslog.syslog(line) syslog.closelog() # display to connected administrators for user in universe.userlist: if user.state == u"active" and user.account.getboolean( u"administrator" ) and user.account.getint(u"loglevel") <= level: # iterate over every line in the message full_message = u"" for line in lines: full_message += u"$(bld)$(red)" + timestamp + u" " + line.replace( u"$(", u"$_(" ) + u"$(nrm)$(eol)" user.send(full_message, flush=True) # add to the recent log list for line in lines: while 0 < len(universe.loglines) >= max_log_lines: del universe.loglines[0] universe.loglines.append((level, timestamp + u" " + line)) def get_loglines(level, start, stop): u"""Return a specific range of loglines filtered by level.""" # filter the log lines loglines = filter(lambda x: x[0]>=level, universe.loglines) # we need these in several places total_count = unicode(len(universe.loglines)) filtered_count = len(loglines) # don't proceed if there are no lines if filtered_count: # can't start before the begining or at the end if start > filtered_count: start = filtered_count if start < 1: start = 1 # can't stop before we start if stop > start: stop = start elif stop < 1: stop = 1 # some preamble message = u"There are " + unicode(total_count) message += u" log lines in memory and " + unicode(filtered_count) message += u" at or above level " + unicode(level) + u"." message += u" The matching lines from " + unicode(stop) + u" to " message += unicode(start) + u" are:$(eol)$(eol)" # add the text from the selected lines if stop > 1: range_lines = loglines[-start:-(stop-1)] else: range_lines = loglines[-start:] for line in range_lines: message += u" (" + unicode(line[0]) + u") " + line[1].replace( u"$(", u"$_(" ) + u"$(eol)" # there were no lines else: message = u"None of the " + unicode(total_count) message += u" lines in memory matches your request." # pass it back return message def glyph_columns(character): u"""Convenience function to return the column width of a glyph.""" import unicodedata if unicodedata.east_asian_width(character) in u"FW": return 2 else: return 1 def wrap_ansi_text(text, width): u"""Wrap text with arbitrary width while ignoring ANSI colors.""" import unicodedata # the current position in the entire text string, including all # characters, printable or otherwise absolute_position = 0 # the current text position relative to the begining of the line, # ignoring color escape sequences relative_position = 0 # the absolute position of the most recent whitespace character last_whitespace = 0 # whether the current character is part of a color escape sequence escape = False # normalize any potentially composited unicode before we count it text = unicodedata.normalize(u"NFKC", text) # iterate over each character from the begining of the text for each_character in text: # the current character is the escape character if each_character == u"\x1b" and not escape: escape = True # the current character is within an escape sequence elif escape: # the current character is m, which terminates the # escape sequence if each_character == u"m": escape = False # the current character is a newline, so reset the relative # position (start a new line) elif each_character == u"\n": relative_position = 0 last_whitespace = absolute_position # the current character meets the requested maximum line width, # so we need to backtrack and find a space at which to wrap; # special care is taken to avoid an off-by-one in case the # current character is a double-width glyph elif each_character != u"\r" and ( relative_position >= width or ( relative_position >= width -1 and glyph_columns( each_character ) == 2 ) ): # it's always possible we landed on whitespace if unicodedata.category(each_character) in (u"Cc",u"Zs"): last_whitespace = absolute_position # insert an eol in place of the space text = text[:last_whitespace] + u"\r\n" + text[last_whitespace + 1:] # increase the absolute position because an eol is two # characters but the space it replaced was only one absolute_position += 1 # now we're at the begining of a new line, plus the # number of characters wrapped from the previous line relative_position = 0 for remaining_characters in text[last_whitespace:absolute_position]: relative_position += glyph_columns(remaining_characters) # as long as the character is not a carriage return and the # other above conditions haven't been met, count it as a # printable character elif each_character != u"\r": relative_position += glyph_columns(each_character) if unicodedata.category(each_character) in (u"Cc",u"Zs"): last_whitespace = absolute_position # increase the absolute position for every character absolute_position += 1 # return the newly-wrapped text return text def weighted_choice(data): u"""Takes a dict weighted by value and returns a random key.""" import random # this will hold our expanded list of keys from the data expanded = [] # create the expanded list of keys for key in data.keys(): for count in range(data[key]): expanded.append(key) # return one at random return random.choice(expanded) def random_name(): u"""Returns a random character name.""" import random # the vowels and consonants needed to create romaji syllables vowels = [ u"a", u"i", u"u", u"e", u"o" ] consonants = [ u"'", u"k", u"z", u"s", u"sh", u"z", u"j", u"t", u"ch", u"ts", u"d", u"n", u"h", u"f", u"m", u"y", u"r", u"w" ] # this dict will hold our weighted list of syllables syllables = {} # generate the list with an even weighting for consonant in consonants: for vowel in vowels: syllables[consonant + vowel] = 1 # we'll build the name into this string name = u"" # create a name of random length from the syllables for syllable in range(random.randrange(2, 6)): name += weighted_choice(syllables) # strip any leading quotemark, capitalize and return the name return name.strip(u"'").capitalize() def replace_macros(user, text, is_input=False): u"""Replaces macros in text output.""" import codecs, data, os.path # third person pronouns pronouns = { u"female": { u"obj": u"her", u"pos": u"hers", u"sub": u"she" }, u"male": { u"obj": u"him", u"pos": u"his", u"sub": u"he" }, u"neuter": { u"obj": u"it", u"pos": u"its", u"sub": u"it" } } # a dict of replacement macros macros = { u"eol": u"\r\n", u"bld": unichr(27) + u"[1m", u"nrm": unichr(27) + u"[0m", u"blk": unichr(27) + u"[30m", u"blu": unichr(27) + u"[34m", u"cyn": unichr(27) + u"[36m", u"grn": unichr(27) + u"[32m", u"mgt": unichr(27) + u"[35m", u"red": unichr(27) + u"[31m", u"yel": unichr(27) + u"[33m", } # add dynamic macros where possible if user.account: account_name = user.account.get(u"name") if account_name: macros[u"account"] = account_name if user.avatar: avatar_gender = user.avatar.get(u"gender") if avatar_gender: macros[u"tpop"] = pronouns[avatar_gender][u"obj"] macros[u"tppp"] = pronouns[avatar_gender][u"pos"] macros[u"tpsp"] = pronouns[avatar_gender][u"sub"] # loop until broken while True: # find and replace per the macros dict macro_start = text.find(u"$(") if macro_start == -1: break macro_end = text.find(u")", macro_start) + 1 macro = text[macro_start+2:macro_end-1] if macro in macros.keys(): replacement = macros[macro] # this is how we handle local file inclusion (dangerous!) elif macro.startswith(u"inc:"): incfile = data.find_file(macro[4:], universe=universe) if os.path.exists(incfile): incfd = codecs.open(incfile, u"r", u"utf-8") replacement = u"" for line in incfd: if line.endswith(u"\n") and not line.endswith(u"\r\n"): line = line.replace(u"\n", u"\r\n") replacement += line # lose the trailing eol replacement = replacement[:-2] else: replacement = u"" log(u"Couldn't read included " + incfile + u" file.", 6) # if we get here, log and replace it with null else: replacement = u"" if not is_input: log(u"Unexpected replacement macro " + macro + u" encountered.", 6) # and now we act on the replacement text = text.replace(u"$(" + macro + u")", replacement) # replace the look-like-a-macro sequence text = text.replace(u"$_(", u"$(") return text def escape_macros(text): u"""Escapes replacement macros in text.""" return text.replace(u"$(", u"$_(") def first_word(text, separator=u" "): u"""Returns a tuple of the first word and the rest.""" if text: if text.find(separator) > 0: return text.split(separator, 1) else: return text, u"" else: return u"", u"" def on_pulse(): u"""The things which should happen on each pulse, aside from reloads.""" import time # open the listening socket if it hasn't been already if not hasattr(universe, u"listening_socket"): universe.initialize_server_socket() # assign a user if a new connection is waiting user = check_for_connection(universe.listening_socket) if user: universe.userlist.append(user) # iterate over the connected users for user in universe.userlist: user.pulse() # add an element for counters if it doesn't exist if not u"counters" in universe.categories[u"internal"]: universe.categories[u"internal"][u"counters"] = Element( u"internal:counters", universe ) # update the log every now and then if not universe.categories[u"internal"][u"counters"].getint(u"mark"): log(unicode(len(universe.userlist)) + u" connection(s)") universe.categories[u"internal"][u"counters"].set( u"mark", universe.categories[u"internal"][u"time"].getint( u"frequency_log" ) ) else: universe.categories[u"internal"][u"counters"].set( u"mark", universe.categories[u"internal"][u"counters"].getint( u"mark" ) - 1 ) # periodically save everything if not universe.categories[u"internal"][u"counters"].getint(u"save"): universe.save() universe.categories[u"internal"][u"counters"].set( u"save", universe.categories[u"internal"][u"time"].getint( u"frequency_save" ) ) else: universe.categories[u"internal"][u"counters"].set( u"save", universe.categories[u"internal"][u"counters"].getint( u"save" ) - 1 ) # pause for a configurable amount of time (decimal seconds) time.sleep(universe.categories[u"internal"][u"time"].getfloat(u"increment")) # increase the elapsed increment counter universe.categories[u"internal"][u"counters"].set( u"elapsed", universe.categories[u"internal"][u"counters"].getint( u"elapsed" ) + 1 ) def reload_data(): u"""Reload all relevant objects.""" for user in universe.userlist[:]: user.reload() for element in universe.contents.values(): if element.origin.is_writeable(): element.reload() universe.load() def check_for_connection(listening_socket): u"""Check for a waiting connection and return a new user object.""" import telnet # try to accept a new connection try: connection, address = listening_socket.accept() except: return None # note that we got one log(u"Connection from " + address[0], 2) # disable blocking so we can proceed whether or not we can send/receive connection.setblocking(0) # create a new user object user = User() # associate this connection with it user.connection = connection # set the user's ipa from the connection's ipa user.address = address[0] # let the client know we WILL EOR (RFC 885) telnet.enable(user, telnet.TELOPT_EOR, telnet.US) user.negotiation_pause = 2 # return the new user object return user def get_menu(state, error=None, choices=None): u"""Show the correct menu text to a user.""" # make sure we don't reuse a mutable sequence by default if choices is None: choices = {} # get the description or error text message = get_menu_description(state, error) # get menu choices for the current state message += get_formatted_menu_choices(state, choices) # try to get a prompt, if it was defined message += get_menu_prompt(state) # throw in the default choice, if it exists message += get_formatted_default_menu_choice(state) # display a message indicating if echo is off message += get_echo_message(state) # return the assembly of various strings defined above return message def menu_echo_on(state): u"""True if echo is on, false if it is off.""" return universe.categories[u"menu"][state].getboolean(u"echo", True) def get_echo_message(state): u"""Return a message indicating that echo is off.""" if menu_echo_on(state): return u"" else: return u"(won't echo) " def get_default_menu_choice(state): u"""Return the default choice for a menu.""" return universe.categories[u"menu"][state].get(u"default") def get_formatted_default_menu_choice(state): u"""Default menu choice foratted for inclusion in a prompt string.""" default_choice = get_default_menu_choice(state) if default_choice: return u"[$(red)" + default_choice + u"$(nrm)] " else: return u"" def get_menu_description(state, error): u"""Get the description or error text.""" # an error condition was raised by the handler if error: # try to get an error message matching the condition # and current state description = universe.categories[u"menu"][state].get(u"error_" + error) if not description: description = u"That is not a valid choice..." description = u"$(red)" + description + u"$(nrm)" # there was no error condition else: # try to get a menu description for the current state description = universe.categories[u"menu"][state].get(u"description") # return the description or error message if description: description += u"$(eol)$(eol)" return description def get_menu_prompt(state): u"""Try to get a prompt, if it was defined.""" prompt = universe.categories[u"menu"][state].get(u"prompt") if prompt: prompt += u" " return prompt def get_menu_choices(user): u"""Return a dict of choice:meaning.""" menu = universe.categories[u"menu"][user.state] create_choices = menu.get(u"create") if create_choices: choices = eval(create_choices) else: choices = {} ignores = [] options = {} creates = {} for facet in menu.facets(): if facet.startswith(u"demand_") and not eval( universe.categories[u"menu"][user.state].get(facet) ): ignores.append(facet.split(u"_", 2)[1]) elif facet.startswith(u"create_"): creates[facet] = facet.split(u"_", 2)[1] elif facet.startswith(u"choice_"): options[facet] = facet.split(u"_", 2)[1] for facet in creates.keys(): if not creates[facet] in ignores: choices[creates[facet]] = eval(menu.get(facet)) for facet in options.keys(): if not options[facet] in ignores: choices[options[facet]] = menu.get(facet) return choices def get_formatted_menu_choices(state, choices): u"""Returns a formatted string of menu choices.""" choice_output = u"" choice_keys = choices.keys() choice_keys.sort() for choice in choice_keys: choice_output += u" [$(red)" + choice + u"$(nrm)] " + choices[ choice ] + u"$(eol)" if choice_output: choice_output += u"$(eol)" return choice_output def get_menu_branches(state): u"""Return a dict of choice:branch.""" branches = {} for facet in universe.categories[u"menu"][state].facets(): if facet.startswith(u"branch_"): branches[ facet.split(u"_", 2)[1] ] = universe.categories[u"menu"][state].get(facet) return branches def get_default_branch(state): u"""Return the default branch.""" return universe.categories[u"menu"][state].get(u"branch") def get_choice_branch(user, choice): u"""Returns the new state matching the given choice.""" branches = get_menu_branches(user.state) if choice in branches.keys(): return branches[choice] elif choice in user.menu_choices.keys(): return get_default_branch(user.state) else: return u"" def get_menu_actions(state): u"""Return a dict of choice:branch.""" actions = {} for facet in universe.categories[u"menu"][state].facets(): if facet.startswith(u"action_"): actions[ facet.split(u"_", 2)[1] ] = universe.categories[u"menu"][state].get(facet) return actions def get_default_action(state): u"""Return the default action.""" return universe.categories[u"menu"][state].get(u"action") def get_choice_action(user, choice): u"""Run any indicated script for the given choice.""" actions = get_menu_actions(user.state) if choice in actions.keys(): return actions[choice] elif choice in user.menu_choices.keys(): return get_default_action(user.state) else: return u"" def handle_user_input(user): u"""The main handler, branches to a state-specific handler.""" import telnet # if the user's client echo is off, send a blank line for aesthetics if telnet.is_enabled(user, telnet.TELOPT_ECHO, telnet.US): user.send(u"", add_prompt=False, prepend_padding=False) # check to make sure the state is expected, then call that handler if u"handler_" + user.state in globals(): exec(u"handler_" + user.state + u"(user)") else: generic_menu_handler(user) # since we got input, flag that the menu/prompt needs to be redisplayed user.menu_seen = False # update the last_input timestamp while we're at it user.last_input = universe.get_time() def generic_menu_handler(user): u"""A generic menu choice handler.""" # get a lower-case representation of the next line of input if user.input_queue: choice = user.input_queue.pop(0) if choice: choice = choice.lower() else: choice = u"" if not choice: choice = get_default_menu_choice(user.state) if choice in user.menu_choices: exec(get_choice_action(user, choice)) new_state = get_choice_branch(user, choice) if new_state: user.state = new_state else: user.error = u"default" def handler_entering_account_name(user): u"""Handle the login account name.""" # get the next waiting line of input input_data = user.input_queue.pop(0) # did the user enter anything? if input_data: # keep only the first word and convert to lower-case name = input_data.lower() # fail if there are non-alphanumeric characters if name != filter( lambda x: x>=u"0" and x<=u"9" or x>=u"a" and x<=u"z", name ): user.error = u"bad_name" # if that account exists, time to request a password elif name in universe.categories[u"account"]: user.account = universe.categories[u"account"][name] user.state = u"checking_password" # otherwise, this could be a brand new user else: user.account = Element(u"account:" + name, universe) user.account.set(u"name", name) log(u"New user: " + name, 2) user.state = u"checking_new_account_name" # if the user entered nothing for a name, then buhbye else: user.state = u"disconnecting" def handler_checking_password(user): u"""Handle the login account password.""" import password # get the next waiting line of input input_data = user.input_queue.pop(0) # does the hashed input equal the stored hash? if password.verify( input_data, user.account.get(u"passhash") ): # if so, set the username and load from cold storage if not user.replace_old_connections(): user.authenticate() user.state = u"main_utility" # if at first your hashes don't match, try, try again elif user.password_tries < universe.categories[ u"internal" ][ u"limits" ].getint( u"password_tries" ) - 1: user.password_tries += 1 user.error = u"incorrect" # we've exceeded the maximum number of password failures, so disconnect else: user.send( u"$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)" ) user.state = u"disconnecting" def handler_entering_new_password(user): u"""Handle a new password entry.""" import password # get the next waiting line of input input_data = user.input_queue.pop(0) # make sure the password is strong--at least one upper, one lower and # one digit, seven or more characters in length if len(input_data) > 6 and len( filter( lambda x: x>=u"0" and x<=u"9", input_data ) ) and len( filter( lambda x: x>=u"A" and x<=u"Z", input_data ) ) and len( filter( lambda x: x>=u"a" and x<=u"z", input_data ) ): # hash and store it, then move on to verification user.account.set( u"passhash", password.create(input_data) ) user.state = u"verifying_new_password" # the password was weak, try again if you haven't tried too many times elif user.password_tries < universe.categories[ u"internal" ][ u"limits" ].getint( u"password_tries" ) - 1: user.password_tries += 1 user.error = u"weak" # too many tries, so adios else: user.send( u"$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)" ) user.account.destroy() user.state = u"disconnecting" def handler_verifying_new_password(user): u"""Handle the re-entered new password for verification.""" import password # get the next waiting line of input input_data = user.input_queue.pop(0) # hash the input and match it to storage if password.verify( input_data, user.account.get(u"passhash") ): user.authenticate() # the hashes matched, so go active if not user.replace_old_connections(): user.state = u"main_utility" # go back to entering the new password as long as you haven't tried # too many times elif user.password_tries < universe.categories[ u"internal" ][ u"limits" ].getint( u"password_tries" ) - 1: user.password_tries += 1 user.error = u"differs" user.state = u"entering_new_password" # otherwise, sayonara else: user.send( u"$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)" ) user.account.destroy() user.state = u"disconnecting" def handler_active(user): u"""Handle input for active users.""" # get the next waiting line of input input_data = user.input_queue.pop(0) # is there input? if input_data: # split out the command and parameters actor = user.avatar mode = actor.get(u"mode") if mode and input_data.startswith(u"!"): command_name, parameters = first_word(input_data[1:]) elif mode == u"chat": command_name = u"say" parameters = input_data else: command_name, parameters = first_word(input_data) # lowercase the command command_name = command_name.lower() # the command matches a command word for which we have data if command_name in universe.categories[u"command"]: command = universe.categories[u"command"][command_name] else: command = None # if it's allowed, do it if actor.can_run(command): exec(command.get(u"action")) # otherwise, give an error elif command_name: command_error(actor, input_data) # if no input, just idle back with a prompt else: user.send(u"", just_prompt=True) def command_halt(actor, parameters): u"""Halt the world.""" if actor.owner: # see if there's a message or use a generic one if parameters: message = u"Halting: " + parameters else: message = u"User " + actor.owner.account.get( u"name" ) + u" halted the world." # let everyone know broadcast(message, add_prompt=False) log(message, 8) # set a flag to terminate the world universe.terminate_flag = True def command_reload(actor): u"""Reload all code modules, configs and data.""" if actor.owner: # let the user know and log actor.send(u"Reloading all code modules, configs and data.") log( u"User " + actor.owner.account.get(u"name") + u" reloaded the world.", 8 ) # set a flag to reload universe.reload_flag = True def command_quit(actor): u"""Leave the world and go back to the main menu.""" if actor.owner: actor.owner.state = u"main_utility" actor.owner.deactivate_avatar() def command_help(actor, parameters): u"""List available commands and provide help for commands.""" # did the user ask for help on a specific command word? if parameters and actor.owner: # is the command word one for which we have data? if parameters in universe.categories[u"command"]: command = universe.categories[u"command"][parameters] else: command = None # only for allowed commands if actor.can_run(command): # add a description if provided description = command.get(u"description") if not description: description = u"(no short description provided)" if command.getboolean(u"administrative"): output = u"$(red)" else: output = u"$(grn)" output += parameters + u"$(nrm) - " + description + u"$(eol)$(eol)" # add the help text if provided help_text = command.get(u"help") if not help_text: help_text = u"No help is provided for this command." output += help_text # list related commands see_also = command.getlist(u"see_also") if see_also: really_see_also = u"" for item in see_also: if item in universe.categories[u"command"]: command = universe.categories[u"command"][item] if actor.can_run(command): if really_see_also: really_see_also += u", " if command.getboolean(u"administrative"): really_see_also += u"$(red)" else: really_see_also += u"$(grn)" really_see_also += item + u"$(nrm)" if really_see_also: output += u"$(eol)$(eol)See also: " + really_see_also # no data for the requested command word else: output = u"That is not an available command." # no specific command word was indicated else: # give a sorted list of commands with descriptions if provided output = u"These are the commands available to you:$(eol)$(eol)" sorted_commands = universe.categories[u"command"].keys() sorted_commands.sort() for item in sorted_commands: command = universe.categories[u"command"][item] if actor.can_run(command): description = command.get(u"description") if not description: description = u"(no short description provided)" if command.getboolean(u"administrative"): output += u" $(red)" else: output += u" $(grn)" output += item + u"$(nrm) - " + description + u"$(eol)" output += u"$(eol)Enter \"help COMMAND\" for help on a command " \ + u"named \"COMMAND\"." # send the accumulated output to the user actor.send(output) def command_move(actor, parameters): u"""Move the avatar in a given direction.""" if parameters in universe.contents[actor.get(u"location")].portals(): actor.move_direction(parameters) else: actor.send(u"You cannot go that way.") def command_look(actor, parameters): u"""Look around.""" if parameters: actor.send(u"You can't look at or in anything yet.") else: actor.look_at(actor.get(u"location")) def command_say(actor, parameters): u"""Speak to others in the same area.""" import unicodedata # check for replacement macros and escape them parameters = escape_macros(parameters) # if the message is wrapped in quotes, remove them and leave contents intact if parameters.startswith(u"\"") and parameters.endswith(u"\""): message = parameters[1:-1] literal = True # otherwise, get rid of stray quote marks on the ends of the message else: message = parameters.strip(u"\"'`") literal = False # the user entered a message if message: # match the punctuation used, if any, to an action actions = universe.categories[u"internal"][u"language"].getdict( u"actions" ) default_punctuation = universe.categories[u"internal"][u"language"].get( u"default_punctuation" ) action = u"" for mark in actions.keys(): if not literal and message.endswith(mark): action = actions[mark] break # add punctuation if needed if not action: action = actions[default_punctuation] if message and not ( literal or unicodedata.category(message[-1]) == u"Po" ): message += default_punctuation # failsafe checks to avoid unwanted reformatting and null strings if message and not literal: # decapitalize the first letter to improve matching message = message[0].lower() + message[1:] # iterate over all words in message, replacing typos typos = universe.categories[u"internal"][u"language"].getdict( u"typos" ) words = message.split() for index in range(len(words)): word = words[index] while unicodedata.category(word[0]) == u"Po": word = word[1:] while unicodedata.category(word[-1]) == u"Po": word = word[:-1] if word in typos.keys(): words[index] = words[index].replace(word, typos[word]) message = u" ".join(words) # capitalize the first letter message = message[0].upper() + message[1:] # tell the area if message: actor.echo_to_location( actor.get(u"name") + u" " + action + u"s, \"" + message + u"\"" ) actor.send(u"You " + action + u", \"" + message + u"\"") # there was no message else: actor.send(u"What do you want to say?") def command_chat(actor): u"""Toggle chat mode.""" mode = actor.get(u"mode") if not mode: actor.set(u"mode", u"chat") actor.send(u"Entering chat mode (use $(grn)!chat$(nrm) to exit).") elif mode == u"chat": actor.remove_facet(u"mode") actor.send(u"Exiting chat mode.") else: actor.send(u"Sorry, but you're already busy with something else!") def command_show(actor, parameters): u"""Show program data.""" import re message = u"" arguments = parameters.split() if not parameters: message = u"What do you want to show?" elif arguments[0] == u"time": message = universe.categories[u"internal"][u"counters"].get( u"elapsed" ) + u" increments elapsed since the world was created." elif arguments[0] == u"categories": message = u"These are the element categories:$(eol)" categories = universe.categories.keys() categories.sort() for category in categories: message += u"$(eol) $(grn)" + category + u"$(nrm)" elif arguments[0] == u"files": message = u"These are the current files containing the universe:$(eol)" filenames = universe.files.keys() filenames.sort() for filename in filenames: if universe.files[filename].is_writeable(): status = u"rw" else: status = u"ro" message += u"$(eol) $(red)(" + status + u") $(grn)" + filename \ + u"$(nrm)" elif arguments[0] == u"category": if len(arguments) != 2: message = u"You must specify one category." elif arguments[1] in universe.categories: message = u"These are the elements in the \"" + arguments[1] \ + u"\" category:$(eol)" elements = [ ( universe.categories[arguments[1]][x].key ) for x in universe.categories[arguments[1]].keys() ] elements.sort() for element in elements: message += u"$(eol) $(grn)" + element + u"$(nrm)" else: message = u"Category \"" + arguments[1] + u"\" does not exist." elif arguments[0] == u"file": if len(arguments) != 2: message = u"You must specify one file." elif arguments[1] in universe.files: message = u"These are the elements in the \"" + arguments[1] \ + u"\" file:$(eol)" elements = universe.files[arguments[1]].data.sections() elements.sort() for element in elements: message += u"$(eol) $(grn)" + element + u"$(nrm)" else: message = u"Category \"" + arguments[1] + u"\" does not exist." elif arguments[0] == u"element": if len(arguments) != 2: message = u"You must specify one element." elif arguments[1] in universe.contents: element = universe.contents[arguments[1]] message = u"These are the properties of the \"" + arguments[1] \ + u"\" element (in \"" + element.origin.filename + u"\"):$(eol)" facets = element.facets() facets.sort() for facet in facets: message += u"$(eol) $(grn)" + facet + u": $(red)" \ + escape_macros(element.get(facet)) + u"$(nrm)" else: message = u"Element \"" + arguments[1] + u"\" does not exist." elif arguments[0] == u"result": if len(arguments) < 2: message = u"You need to specify an expression." else: try: message = repr(eval(u" ".join(arguments[1:]))) except: message = u"Your expression raised an exception!" elif arguments[0] == u"log": if len(arguments) == 4: if re.match(u"^\d+$", arguments[3]) and int(arguments[3]) >= 0: stop = int(arguments[3]) else: stop = -1 else: stop = 0 if len(arguments) >= 3: if re.match(u"^\d+$", arguments[2]) and int(arguments[2]) > 0: start = int(arguments[2]) else: start = -1 else: start = 10 if len(arguments) >= 2: if re.match(u"^\d+$", arguments[1]) and 0 <= int(arguments[1]) <= 9: level = int(arguments[1]) else: level = -1 elif 0 <= actor.owner.account.getint(u"loglevel") <= 9: level = actor.owner.account.getint(u"loglevel") else: level = 1 if level > -1 and start > -1 and stop > -1: message = get_loglines(level, start, stop) else: message = u"When specified, level must be 0-9 (default 1), " \ + u"start and stop must be >=1 (default 10 and 1)." else: message = u"I don't know what \"" + parameters + u"\" is." actor.send(message) def command_create(actor, parameters): u"""Create an element if it does not exist.""" if not parameters: message = u"You must at least specify an element to create." elif not actor.owner: message = u"" else: arguments = parameters.split() if len(arguments) == 1: arguments.append(u"") if len(arguments) == 2: element, filename = arguments if element in universe.contents: message = u"The \"" + element + u"\" element already exists." else: message = u"You create \"" + element + u"\" within the universe." logline = actor.owner.account.get( u"name" ) + u" created an element: " + element if filename: logline += u" in file " + filename if filename not in universe.files: message += u" Warning: \"" + filename \ + u"\" is not yet included in any other file and will " \ + u"not be read on startup unless this is remedied." Element(element, universe, filename) log(logline, 6) elif len(arguments) > 2: message = u"You can only specify an element and a filename." actor.send(message) def command_destroy(actor, parameters): u"""Destroy an element if it exists.""" if actor.owner: if not parameters: message = u"You must specify an element to destroy." else: if parameters not in universe.contents: message = u"The \"" + parameters + u"\" element does not exist." else: universe.contents[parameters].destroy() message = u"You destroy \"" + parameters \ + u"\" within the universe." log( actor.owner.account.get( u"name" ) + u" destroyed an element: " + parameters, 6 ) actor.send(message) def command_set(actor, parameters): u"""Set a facet of an element.""" if not parameters: message = u"You must specify an element, a facet and a value." else: arguments = parameters.split(u" ", 2) if len(arguments) == 1: message = u"What facet of element \"" + arguments[0] \ + u"\" would you like to set?" elif len(arguments) == 2: message = u"What value would you like to set for the \"" \ + arguments[1] + u"\" facet of the \"" + arguments[0] \ + u"\" element?" else: element, facet, value = arguments if element not in universe.contents: message = u"The \"" + element + u"\" element does not exist." else: universe.contents[element].set(facet, value) message = u"You have successfully (re)set the \"" + facet \ + u"\" facet of element \"" + element \ + u"\". Try \"show element " + element + u"\" for verification." actor.send(message) def command_delete(actor, parameters): u"""Delete a facet from an element.""" if not parameters: message = u"You must specify an element and a facet." else: arguments = parameters.split(u" ") if len(arguments) == 1: message = u"What facet of element \"" + arguments[0] \ + u"\" would you like to delete?" elif len(arguments) != 2: message = u"You may only specify an element and a facet." else: element, facet = arguments if element not in universe.contents: message = u"The \"" + element + u"\" element does not exist." elif facet not in universe.contents[element].facets(): message = u"The \"" + element + u"\" element has no \"" + facet \ + u"\" facet." else: universe.contents[element].remove_facet(facet) message = u"You have successfully deleted the \"" + facet \ + u"\" facet of element \"" + element \ + u"\". Try \"show element " + element + u"\" for verification." actor.send(message) def command_error(actor, input_data): u"""Generic error for an unrecognized command word.""" import random # 90% of the time use a generic error if random.randrange(10): message = u"I'm not sure what \"" + input_data + u"\" means..." # 10% of the time use the classic diku error else: message = u"Arglebargle, glop-glyf!?!" # send the error message actor.send(message) def daemonize(universe): u"""Fork and disassociate from everything.""" import codecs, ctypes, ctypes.util, os, os.path, sys # only if this is what we're configured to do if universe.contents[u"internal:process"].getboolean(u"daemon"): # if possible, we want to rename the process to the same as the script # (these will need to be byte type during 2to3 migration) new_argv = "\0".join(sys.argv) + "\0" new_short_argv0 = os.path.basename(sys.argv[0]) + "\0" # attempt the linux way first try: argv_array = ctypes.POINTER(ctypes.c_char_p) ctypes.pythonapi.Py_GetArgcArgv.argtypes = ( ctypes.POINTER(ctypes.c_int), ctypes.POINTER(argv_array) ) argc = argv_array() ctypes.pythonapi.Py_GetArgcArgv( ctypes.c_int(0), ctypes.pointer(argc) ) old_argv0_size = len(argc.contents.value) ctypes.memset( argc.contents, 0, len(new_argv)+old_argv0_size ) ctypes.memmove( argc.contents, new_argv, len(new_argv) ) ctypes.CDLL( ctypes.util.find_library(u"c") ).prctl( 15, new_short_argv0, 0, 0, 0 ) except: # since that failed, maybe it's bsd? try: # much simpler, since bsd has a libc function call for this ctypes.CDLL( ctypes.util.find_library(u"c") ).setproctitle( new_argv ) except: # that didn't work either, so just log that we couldn't log(u"Failed to rename the interpreter process (cosmetic).") # log before we start forking around, so the terminal gets the message log(u"Disassociating from the controlling terminal.") # fork off and die, so we free up the controlling terminal if os.fork(): os._exit(0) # switch to a new process group os.setsid() # fork some more, this time to free us from the old process group if os.fork(): os._exit(0) # reset the working directory so we don't needlessly tie up mounts os.chdir(u"/") # clear the file creation mask so we can bend it to our will later os.umask(0) # redirect stdin/stdout/stderr and close off their former descriptors for stdpipe in range(3): os.close(stdpipe) sys.stdin = codecs.open(u"/dev/null", u"r", u"utf-8") sys.__stdin__ = codecs.open(u"/dev/null", u"r", u"utf-8") sys.stdout = codecs.open(u"/dev/null", u"w", u"utf-8") sys.stderr = codecs.open(u"/dev/null", u"w", u"utf-8") sys.__stdout__ = codecs.open(u"/dev/null", u"w", u"utf-8") sys.__stderr__ = codecs.open(u"/dev/null", u"w", u"utf-8") def create_pidfile(universe): u"""Write a file containing the current process ID.""" import codecs, os, os.path pid = unicode(os.getpid()) log(u"Process ID: " + pid) file_name = universe.contents[u"internal:process"].get(u"pidfile") if file_name: if not os.path.isabs(file_name): file_name = os.path.join(universe.startdir, file_name) file_descriptor = codecs.open(file_name, u"w", u"utf-8") file_descriptor.write(pid + u"\n") file_descriptor.flush() file_descriptor.close() def remove_pidfile(universe): u"""Remove the file containing the current process ID.""" import os, os.path file_name = universe.contents[u"internal:process"].get(u"pidfile") if file_name: if not os.path.isabs(file_name): file_name = os.path.join(universe.startdir, file_name) if os.access(file_name, os.W_OK): os.remove(file_name) def excepthook(excepttype, value, tracebackdata): u"""Handle uncaught exceptions.""" import traceback # assemble the list of errors into a single string message = u"".join( traceback.format_exception(excepttype, value, tracebackdata) ) # try to log it, if possible try: log(message, 9) except: pass # try to write it to stderr, if possible try: sys.stderr.write(message) except: pass def sighook(what, where): u"""Handle external signals.""" import signal # a generic message message = u"Caught signal: " # for a hangup signal if what == signal.SIGHUP: message += u"hangup (reloading)" universe.reload_flag = True # for a terminate signal elif what == signal.SIGTERM: message += u"terminate (halting)" universe.terminate_flag = True # catchall for unexpected signals else: message += unicode(what) + u" (unhandled)" # log what happened log(message, 8) def override_excepthook(): u"""Redefine sys.excepthook with our own.""" import sys sys.excepthook = excepthook def assign_sighook(): u"""Assign a customized handler for some signals.""" import signal signal.signal(signal.SIGHUP, sighook) signal.signal(signal.SIGTERM, sighook) def setup(): """This contains functions to be performed when starting the engine.""" import sys # see if a configuration file was specified if len(sys.argv) > 1: conffile = sys.argv[1] else: conffile = u"" # the big bang global universe universe = Universe(conffile, True) # log an initial message log(u"Started mudpy with command line: " + u" ".join(sys.argv)) # fork and disassociate daemonize(universe) # override the default exception handler so we get logging first thing override_excepthook() # set up custom signal handlers assign_sighook() # make the pidfile create_pidfile(universe) # pass the initialized universe back return universe def finish(): """This contains functions to be performed when shutting down the engine.""" # the loop has terminated, so save persistent data universe.save() # log a final message log(u"Shutting down now.") # get rid of the pidfile remove_pidfile(universe)