Copyright (c) 2005 mudpy, Jeremy Stanley <fungi@yuggoth.org>, all rights reserved.
Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions
-are met:
+modification, are permitted provided that the following conditions are met:
- - Redistributions of source code must retain the above copyright
- notice, this list of conditions and the following disclaimer.
- - Redistributions in binary form must reproduce the above
- copyright notice, this list of conditions and the following
- disclaimer in the documentation and/or other materials provided
- with the distribution.
+ - Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+ - Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
-COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
-INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
-BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
-CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-POSSIBILITY OF SUCH DAMAGE.
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
[menu:choose_name]
prompt = Choose a name for $(tpop):
description = Your new avatar needs a name. This will be the name with which $(tpsp) grew up, and will initially be the name by which $(tpsp) is known in the world of Example. There are ways for your new avatar to make a name for $(tpop)self over time, so $(tpsp) won't be stuck going by such an unremarkable name forever.
-create_1 = muffmisc.random_name()
-create_3 = muffmisc.random_name()
-create_2 = muffmisc.random_name()
-create_5 = muffmisc.random_name()
-create_4 = muffmisc.random_name()
-create_7 = muffmisc.random_name()
-create_6 = muffmisc.random_name()
+create_1 = random_name()
+create_3 = random_name()
+create_2 = random_name()
+create_5 = random_name()
+create_4 = random_name()
+create_7 = random_name()
+create_6 = random_name()
branch = active
action = user.avatar.set("name", user.menu_choices[choice])
#!/usr/bin/python
-"""Skeletal executable for the MUFF Engine"""
+"""Skeletal executable for the mudpy engine."""
# Copyright (c) 2005 mudpy, Jeremy Stanley <fungi@yuggoth.org>, all rights reserved.
# Licensed per terms in the LICENSE file distributed with this software.
-# muff uses the ini-style configs supported by the ConfigParser module
-import ConfigParser
+# core objects for the mudpy engine
+import mudpy
+from mudpy import log, on_pulse, reload_data, universe
-# need the sys module to alter the import path appropriately
-import sys
+# loop indefinitely while the world is not flagged for termination or
+# there are connected users
+while not universe.terminate_world or universe.userlist:
-def get_main_loop():
- """Find and return the main loop function"""
+ # the world was flagged for a reload of all code/data
+ if universe.reload_modules:
- # figure out where to find our main configuration file
- config_data = ConfigParser.SafeConfigParser()
- config_dirs = [".", "./etc", "/usr/local/muff", "/usr/local/muff/etc", "/etc/muff", "/etc" ]
- config_name = "mudpy.conf"
- config_files = []
- for each_dir in config_dirs:
- config_files.append(each_dir + "/" + config_name)
+ # reload the mudpy module
+ reload(mudpy)
- # load the config file, get the module path and add it to sys.path
- config_data.read(config_files)
- module_path = config_data.get("files", "modules")
- sys.path.append(module_path)
+ # move data into new persistent objects
+ reload_data()
- # import the main loop function
- from muff.muffmain import main
- return main
+ # reset the reload flag
+ universe.reload_modules = False
-# load the main loop and run it
-main = get_main_loop()
-main()
+ # do what needs to be done on each pulse
+ on_pulse()
+
+# the loop has terminated, so save persistent data
+universe.save()
+
+# log a final message
+log("Shutting down now.")
[include]
testdata = testdata
-[files]
-modules =
-
[internal:general]
password_tries = 3
--- /dev/null
+"""Core objects for the mudpy engine."""
+
+# Copyright (c) 2005 mudpy, Jeremy Stanley <fungi@yuggoth.org>, all rights reserved.
+# Licensed per terms in the LICENSE file distributed with this software.
+
+# import some things we need
+from ConfigParser import SafeConfigParser
+from md5 import new as new_md5
+from os import F_OK, R_OK, access, getcwd, makedirs, sep
+from random import choice, randrange
+from socket import AF_INET, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket
+from time import asctime, sleep
+
+# a dict of replacement macros
+macros = {
+ "$(eol)": "\r\n",
+ "$(bld)": chr(27) + "[1m",
+ "$(nrm)": chr(27) + "[0m",
+ "$(blk)": chr(27) + "[30m",
+ "$(grn)": chr(27) + "[32m",
+ "$(red)": chr(27) + "[31m"
+ }
+
+class Element:
+ """An element of the universe."""
+ def __init__(self, key, universe, origin=""):
+ """Default values for the in-memory element variables."""
+ self.key = key
+ if self.key.find(":") > 0:
+ self.category, self.subkey = self.key.split(":", 1)
+ else:
+ self.category = "other"
+ self.subkey = self.key
+ if not self.category in universe.categories: self.category = "other"
+ universe.categories[self.category][self.subkey] = self
+ self.origin = origin
+ if not self.origin: self.origin = universe.default_origins[self.category]
+ if not self.origin.startswith(sep):
+ self.origin = getcwd() + sep + self.origin
+ universe.contents[self.key] = self
+ if not self.origin in universe.files:
+ DataFile(self.origin, universe)
+ if not universe.files[self.origin].data.has_section(self.key):
+ universe.files[self.origin].data.add_section(self.key)
+ def delete(self):
+ log("Deleting: " + self.key + ".")
+ universe.files[self.origin].data.remove_section(self.key)
+ del universe.categories[self.category][self.subkey]
+ del universe.contents[self.key]
+ del self
+ def facets(self):
+ """Return a list of facets for this element."""
+ return universe.files[self.origin].data.options(self.key)
+ def get(self, facet):
+ """Retrieve values."""
+ if universe.files[self.origin].data.has_option(self.key, facet):
+ return universe.files[self.origin].data.get(self.key, facet)
+ else:
+ return ""
+ def getboolean(self, facet, default=False):
+ """Retrieve values as boolean type."""
+ if universe.files[self.origin].data.has_option(self.key, facet):
+ return universe.files[self.origin].data.getboolean(self.key, facet)
+ else:
+ return default
+ def getint(self, facet):
+ """Convenience method to coerce return values as type int."""
+ value = self.get(facet)
+ if not value: value = 0
+ elif type(value) is str: value = value.rstrip("L")
+ return int(value)
+ def getfloat(self, facet):
+ """Convenience method to coerce return values as type float."""
+ value = self.get(facet)
+ if not value: value = 0
+ elif type(value) is str: value = value.rstrip("L")
+ return float(value)
+ def set(self, facet, value):
+ """Set values."""
+ if not type(value) is str: value = repr(value)
+ universe.files[self.origin].data.set(self.key, facet, value)
+
+class DataFile:
+ """A file containing universe elements."""
+ def __init__(self, filename, universe):
+ filedir = sep.join(filename.split(sep)[:-1])
+ self.data = SafeConfigParser()
+ if access(filename, R_OK): self.data.read(filename)
+ self.filename = filename
+ universe.files[filename] = self
+ if "categories" in self.data.sections():
+ for option in self.data.options("categories"):
+ universe.default_origins[option] = self.data.get("categories", option)
+ if not option in universe.categories:
+ universe.categories[option] = {}
+ for section in self.data.sections():
+ if section == "categories" or section == "include":
+ for option in self.data.options(section):
+ includefile = self.data.get(section, option)
+ if not includefile.startswith(sep):
+ includefile = filedir + sep + includefile
+ DataFile(includefile, universe)
+ elif section != "control":
+ Element(section, universe, filename)
+ def save(self):
+ if self.data.sections() and not ( "control" in self.data.sections() and self.data.getboolean("control", "read_only") ):
+ basedir = sep.join(self.filename.split(sep)[:-1])
+ if not access(basedir, F_OK): makedirs(basedir)
+ file_descriptor = file(self.filename, "w")
+ self.data.write(file_descriptor)
+ file_descriptor.flush()
+ file_descriptor.close()
+
+class Universe:
+ """The universe."""
+ def __init__(self, filename=""):
+ """Initialize the universe."""
+ self.categories = {}
+ self.contents = {}
+ self.default_origins = {}
+ self.files = {}
+ self.userlist = []
+ self.terminate_world = False
+ self.reload_modules = False
+ if not filename:
+ possible_filenames = [
+ ".mudpyrc",
+ ".mudpy/mudpyrc",
+ ".mudpy/mudpy.conf",
+ "mudpy.conf",
+ "etc/mudpy.conf",
+ "/usr/local/mudpy/mudpy.conf",
+ "/usr/local/mudpy/etc/mudpy.conf",
+ "/etc/mudpy/mudpy.conf",
+ "/etc/mudpy.conf"
+ ]
+ for filename in possible_filenames:
+ if access(filename, R_OK): break
+ if not filename.startswith(sep):
+ filename = getcwd() + sep + filename
+ DataFile(filename, self)
+ def save(self):
+ """Save the universe to persistent storage."""
+ for key in self.files: self.files[key].save()
+
+ def initialize_server_socket(self):
+ """Create and open the listening socket."""
+
+ # create a new ipv4 stream-type socket object
+ self.listening_socket = socket(AF_INET, SOCK_STREAM)
+
+ # set the socket options to allow existing open ones to be
+ # reused (fixes a bug where the server can't bind for a minute
+ # when restarting on linux systems)
+ self.listening_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+
+ # bind the socket to to our desired server ipa and port
+ self.listening_socket.bind((self.categories["internal"]["network"].get("host"), self.categories["internal"]["network"].getint("port")))
+
+ # disable blocking so we can proceed whether or not we can
+ # send/receive
+ self.listening_socket.setblocking(0)
+
+ # start listening on the socket
+ self.listening_socket.listen(1)
+
+ # note that we're now ready for user connections
+ log("Waiting for connection(s)...")
+
+class User:
+ """This is a connected user."""
+
+ def __init__(self):
+ """Default values for the in-memory user variables."""
+ self.address = ""
+ self.last_address = ""
+ self.connection = None
+ self.authenticated = False
+ self.password_tries = 1
+ self.state = "entering_account_name"
+ self.menu_seen = False
+ self.error = ""
+ self.input_queue = []
+ self.output_queue = []
+ self.partial_input = ""
+ self.echoing = True
+ self.avatar = None
+ self.account = None
+
+ def quit(self):
+ """Log, close the connection and remove."""
+ name = self.account.get("name")
+ if name: message = "User " + name
+ else: message = "An unnamed user"
+ message += " logged out."
+ log(message)
+ self.connection.close()
+ self.remove()
+
+ def reload(self):
+ """Save, load a new user and relocate the connection."""
+
+ # get out of the list
+ self.remove()
+
+ # create a new user object
+ new_user = User()
+
+ # set everything else equivalent
+ for attribute in [
+ "address",
+ "last_address",
+ "connection",
+ "authenticated",
+ "password_tries",
+ "state",
+ "menu_seen",
+ "error",
+ "input_queue",
+ "output_queue",
+ "partial_input",
+ "echoing",
+ "avatar",
+ "account"
+ ]:
+ exec("new_user." + attribute + " = self." + attribute)
+
+ # add it to the list
+ universe.userlist.append(new_user)
+
+ # get rid of the old user object
+ del(self)
+
+ def replace_old_connections(self):
+ """Disconnect active users with the same name."""
+
+ # the default return value
+ return_value = False
+
+ # iterate over each user in the list
+ for old_user in universe.userlist:
+
+ # the name is the same but it's not us
+ if old_user.account.get("name") == self.account.get("name") and old_user is not self:
+
+ # make a note of it
+ log("User " + self.account.get("name") + " reconnected--closing old connection to " + old_user.address + ".")
+ old_user.send("$(eol)$(red)New connection from " + self.address + ". Terminating old connection...$(nrm)$(eol)")
+ self.send("$(eol)$(red)Taking over old connection from " + old_user.address + ".$(nrm)")
+
+ # close the old connection
+ old_user.connection.close()
+
+ # replace the old connection with this one
+ old_user.connection = self.connection
+ old_user.last_address = old_user.address
+ old_user.address = self.address
+ old_user.echoing = self.echoing
+
+ # take this one out of the list and delete
+ self.remove()
+ del(self)
+ return_value = True
+ break
+
+ # true if an old connection was replaced, false if not
+ return return_value
+
+ def authenticate(self):
+ """Flag the user as authenticated and disconnect duplicates."""
+ if not self.state is "authenticated":
+ log("User " + self.account.get("name") + " logged in.")
+ self.authenticated = True
+
+ def show_menu(self):
+ """Send the user their current menu."""
+ if not self.menu_seen:
+ self.menu_choices = get_menu_choices(self)
+ self.send(get_menu(self.state, self.error, self.echoing, self.menu_choices), "")
+ self.menu_seen = True
+ self.error = False
+ self.adjust_echoing()
+
+ def adjust_echoing(self):
+ """Adjust echoing to match state menu requirements."""
+ if self.echoing and not menu_echo_on(self.state): self.echoing = False
+ elif not self.echoing and menu_echo_on(self.state): self.echoing = True
+
+ def remove(self):
+ """Remove a user from the list of connected users."""
+ universe.userlist.remove(self)
+
+ def send(self, output, eol="$(eol)"):
+ """Send arbitrary text to a connected user."""
+
+ # only when there is actual output
+ #if output:
+
+ # start with a newline, append the message, then end
+ # with the optional eol string passed to this function
+ # and the ansi escape to return to normal text
+ output = "\r\n" + output + eol + chr(27) + "[0m"
+
+ # find and replace macros in the output
+ output = replace_macros(self, output)
+
+ # wrap the text at 80 characters
+ # TODO: prompt user for preferred wrap width
+ output = wrap_ansi_text(output, 80)
+
+ # drop the formatted output into the output queue
+ self.output_queue.append(output)
+
+ # try to send the last item in the queue, remove it and
+ # flag that menu display is not needed
+ try:
+ self.connection.send(self.output_queue[0])
+ self.output_queue.remove(self.output_queue[0])
+ self.menu_seen = False
+
+ # but if we can't, that's okay too
+ except:
+ pass
+
+ def pulse(self):
+ """All the things to do to the user per increment."""
+
+ # if the world is terminating, disconnect
+ if universe.terminate_world:
+ self.state = "disconnecting"
+ self.menu_seen = False
+
+ # show the user a menu as needed
+ self.show_menu()
+
+ # disconnect users with the appropriate state
+ if self.state == "disconnecting":
+ self.quit()
+
+ # the user is unique and not flagged to disconnect
+ else:
+
+ # check for input and add it to the queue
+ self.enqueue_input()
+
+ # there is input waiting in the queue
+ if self.input_queue: handle_user_input(self)
+
+ def enqueue_input(self):
+ """Process and enqueue any new input."""
+
+ # check for some input
+ try:
+ input_data = self.connection.recv(1024)
+ except:
+ input_data = ""
+
+ # we got something
+ if input_data:
+
+ # tack this on to any previous partial
+ self.partial_input += input_data
+
+ # separate multiple input lines
+ new_input_lines = self.partial_input.split("\n")
+
+ # if input doesn't end in a newline, replace the
+ # held partial input with the last line of it
+ if not self.partial_input.endswith("\n"):
+ self.partial_input = new_input_lines.pop()
+
+ # otherwise, chop off the extra null input and reset
+ # the held partial input
+ else:
+ new_input_lines.pop()
+ self.partial_input = ""
+
+ # iterate over the remaining lines
+ for line in new_input_lines:
+
+ # filter out non-printables
+ line = filter(lambda x: x>=' ' and x<='~', line)
+
+ # strip off extra whitespace
+ line = line.strip()
+
+ # put on the end of the queue
+ self.input_queue.append(line)
+
+ def new_avatar(self):
+ """Instantiate a new, unconfigured avatar for this user."""
+ counter = universe.categories["internal"]["counters"].getint("next_avatar")
+ while "avatar:" + repr(counter + 1) in universe.categories["actor"].keys(): counter += 1
+ universe.categories["internal"]["counters"].set("next_avatar", counter + 1)
+ self.avatar = Element("actor:avatar:" + repr(counter), universe)
+ avatars = self.account.get("avatars").split()
+ avatars.append(self.avatar.key)
+ self.account.set("avatars", " ".join(avatars))
+
+ def list_avatar_names(self):
+ """A test function to list names of assigned avatars."""
+ try:
+ avatars = self.account.get("avatars").split()
+ except:
+ avatars = []
+ avatar_names = []
+ for avatar in avatars:
+ avatar_names.append(universe.contents[avatar].get("name"))
+ return avatar_names
+
+def broadcast(message):
+ """Send a message to all connected users."""
+ for each_user in universe.userlist: each_user.send("$(eol)" + message)
+
+def log(message):
+ """Log a message."""
+
+ # the time in posix log timestamp format
+ timestamp = asctime()[4:19]
+
+ # send the timestamp and message to standard output
+ print(timestamp + " " + message)
+
+def wrap_ansi_text(text, width):
+ """Wrap text with arbitrary width while ignoring ANSI colors."""
+
+ # the current position in the entire text string, including all
+ # characters, printable or otherwise
+ absolute_position = 0
+
+ # the current text position relative to the begining of the line,
+ # ignoring color escape sequences
+ relative_position = 0
+
+ # whether the current character is part of a color escape sequence
+ escape = False
+
+ # iterate over each character from the begining of the text
+ for each_character in text:
+
+ # the current character is the escape character
+ if each_character == chr(27):
+ escape = True
+
+ # the current character is within an escape sequence
+ elif escape:
+
+ # the current character is m, which terminates the
+ # current escape sequence
+ if each_character == "m":
+ escape = False
+
+ # the current character is a newline, so reset the relative
+ # position (start a new line)
+ elif each_character == "\n":
+ relative_position = 0
+
+ # the current character meets the requested maximum line width,
+ # so we need to backtrack and find a space at which to wrap
+ elif relative_position == width:
+
+ # distance of the current character examined from the
+ # relative position
+ wrap_offset = 0
+
+ # count backwards until we find a space
+ while text[absolute_position - wrap_offset] != " ":
+ wrap_offset += 1
+
+ # insert an eol in place of the space
+ text = text[:absolute_position - wrap_offset] + "\r\n" + text[absolute_position - wrap_offset + 1:]
+
+ # increase the absolute position because an eol is two
+ # characters but the space it replaced was only one
+ absolute_position += 1
+
+ # now we're at the begining of a new line, plus the
+ # number of characters wrapped from the previous line
+ relative_position = wrap_offset
+
+ # as long as the character is not a carriage return and the
+ # other above conditions haven't been met, count it as a
+ # printable character
+ elif each_character != "\r":
+ relative_position += 1
+
+ # increase the absolute position for every character
+ absolute_position += 1
+
+ # return the newly-wrapped text
+ return text
+
+def weighted_choice(data):
+ """Takes a dict weighted by value and returns a random key."""
+
+ # this will hold our expanded list of keys from the data
+ expanded = []
+
+ # create thee expanded list of keys
+ for key in data.keys():
+ for count in range(data[key]):
+ expanded.append(key)
+
+ # return one at random
+ return choice(expanded)
+
+def random_name():
+ """Returns a random character name."""
+
+ # the vowels and consonants needed to create romaji syllables
+ vowels = [ "a", "i", "u", "e", "o" ]
+ consonants = ["'", "k", "z", "s", "sh", "z", "j", "t", "ch", "ts", "d", "n", "h", "f", "m", "y", "r", "w" ]
+
+ # this dict will hold our weighted list of syllables
+ syllables = {}
+
+ # generate the list with an even weighting
+ for consonant in consonants:
+ for vowel in vowels:
+ syllables[consonant + vowel] = 1
+
+ # we'll build the name into this string
+ name = ""
+
+ # create a name of random length from the syllables
+ for syllable in range(randrange(2, 6)):
+ name += weighted_choice(syllables)
+
+ # strip any leading quotemark, capitalize and return the name
+ return name.strip("'").capitalize()
+
+def replace_macros(user, text, is_input=False):
+ """Replaces macros in text output."""
+ while True:
+ macro_start = text.find("$(")
+ if macro_start == -1: break
+ macro_end = text.find(")", macro_start) + 1
+ macro = text[macro_start:macro_end]
+ if macro in macros.keys():
+ text = text.replace(macro, macros[macro])
+
+ # the user's account name
+ elif macro == "$(account)":
+ text = text.replace(macro, user.account.get("name"))
+
+ # third person subjective pronoun
+ elif macro == "$(tpsp)":
+ if user.avatar.get("gender") == "male":
+ text = text.replace(macro, "he")
+ elif user.avatar.get("gender") == "female":
+ text = text.replace(macro, "she")
+ else:
+ text = text.replace(macro, "it")
+
+ # third person objective pronoun
+ elif macro == "$(tpop)":
+ if user.avatar.get("gender") == "male":
+ text = text.replace(macro, "him")
+ elif user.avatar.get("gender") == "female":
+ text = text.replace(macro, "her")
+ else:
+ text = text.replace(macro, "it")
+
+ # third person possessive pronoun
+ elif macro == "$(tppp)":
+ if user.avatar.get("gender") == "male":
+ text = text.replace(macro, "his")
+ elif user.avatar.get("gender") == "female":
+ text = text.replace(macro, "hers")
+ else:
+ text = text.replace(macro, "its")
+
+ # if we get here, log and replace it with null
+ else:
+ text = text.replace(macro, "")
+ if not is_input:
+ log("Unexpected replacement macro " + macro + " encountered.")
+
+ # replace the look-like-a-macro sequence
+ text = text.replace("$_(", "$(")
+
+ return text
+
+def check_time(frequency):
+ """Check for a factor of the current increment count."""
+ if type(frequency) is str:
+ frequency = universe.categories["internal"]["time"].getint(frequency)
+ if not "counters" in universe.categories["internal"]:
+ Element("internal:counters", universe)
+ return not universe.categories["internal"]["counters"].getint("elapsed") % frequency
+
+def on_pulse():
+ """The things which should happen on each pulse, aside from reloads."""
+
+ # open the listening socket if it hasn't been already
+ if not hasattr(universe, "listening_socket"):
+ universe.initialize_server_socket()
+
+ # assign a user if a new connection is waiting
+ user = check_for_connection(universe.listening_socket)
+ if user: universe.userlist.append(user)
+
+ # iterate over the connected users
+ for user in universe.userlist: user.pulse()
+
+ # update the log every now and then
+ if check_time("frequency_log"):
+ log(repr(len(universe.userlist)) + " connection(s)")
+
+ # periodically save everything
+ if check_time("frequency_save"):
+ universe.save()
+
+ # pause for a configurable amount of time (decimal seconds)
+ sleep(universe.categories["internal"]["time"].getfloat("increment"))
+
+ # increment the elapsed increment counter
+ universe.categories["internal"]["counters"].set("elapsed", universe.categories["internal"]["counters"].getint("elapsed") + 1)
+
+def reload_data():
+ """Reload data into new persistent objects."""
+ for user in universe.userlist[:]: user.reload()
+
+def check_for_connection(listening_socket):
+ """Check for a waiting connection and return a new user object."""
+
+ # try to accept a new connection
+ try:
+ connection, address = listening_socket.accept()
+ except:
+ return None
+
+ # note that we got one
+ log("Connection from " + address[0])
+
+ # disable blocking so we can proceed whether or not we can send/receive
+ connection.setblocking(0)
+
+ # create a new user object
+ user = User()
+
+ # associate this connection with it
+ user.connection = connection
+
+ # set the user's ipa from the connection's ipa
+ user.address = address[0]
+
+ # return the new user object
+ return user
+
+def get_menu(state, error=None, echoing=True, choices={}):
+ """Show the correct menu text to a user."""
+
+ # begin with a telnet echo command sequence if needed
+ message = get_echo_sequence(state, echoing)
+
+ # get the description or error text
+ message += get_menu_description(state, error)
+
+ # get menu choices for the current state
+ message += get_formatted_menu_choices(state, choices)
+
+ # try to get a prompt, if it was defined
+ message += get_menu_prompt(state)
+
+ # throw in the default choice, if it exists
+ message += get_formatted_default_menu_choice(state)
+
+ # display a message indicating if echo is off
+ message += get_echo_message(state)
+
+ # return the assembly of various strings defined above
+ return message
+
+def menu_echo_on(state):
+ """True if echo is on, false if it is off."""
+ return universe.categories["menu"][state].getboolean("echo", True)
+
+def get_echo_sequence(state, echoing):
+ """Build the appropriate IAC ECHO sequence as needed."""
+
+ # if the user has echo on and the menu specifies it should be turned
+ # off, send: iac + will + echo + null
+ if echoing and not menu_echo_on(state): return chr(255) + chr(251) + chr(1) + chr(0)
+
+ # if echo is not set to off in the menu and the user curently has echo
+ # off, send: iac + wont + echo + null
+ elif not echoing and menu_echo_on(state): return chr(255) + chr(252) + chr(1) + chr(0)
+
+ # default is not to send an echo control sequence at all
+ else: return ""
+
+def get_echo_message(state):
+ """Return a message indicating that echo is off."""
+ if menu_echo_on(state): return ""
+ else: return "(won't echo) "
+
+def get_default_menu_choice(state):
+ """Return the default choice for a menu."""
+ return universe.categories["menu"][state].get("default")
+
+def get_formatted_default_menu_choice(state):
+ """Default menu choice foratted for inclusion in a prompt string."""
+ default = get_default_menu_choice(state)
+ if default: return "[$(red)" + default + "$(nrm)] "
+ else: return ""
+
+def get_menu_description(state, error):
+ """Get the description or error text."""
+
+ # an error condition was raised by the handler
+ if error:
+
+ # try to get an error message matching the condition
+ # and current state
+ description = universe.categories["menu"][state].get("error_" + error)
+ if not description: description = "That is not a valid choice..."
+ description = "$(red)" + description + "$(nrm)"
+
+ # there was no error condition
+ else:
+
+ # try to get a menu description for the current state
+ description = universe.categories["menu"][state].get("description")
+
+ # return the description or error message
+ if description: description += "$(eol)$(eol)"
+ return description
+
+def get_menu_prompt(state):
+ """Try to get a prompt, if it was defined."""
+ prompt = universe.categories["menu"][state].get("prompt")
+ if prompt: prompt += " "
+ return prompt
+
+def get_menu_choices(user):
+ """Return a dict of choice:meaning."""
+ choices = {}
+ for facet in universe.categories["menu"][user.state].facets():
+ if facet.startswith("choice_"):
+ choices[facet.split("_", 2)[1]] = universe.categories["menu"][user.state].get(facet)
+ elif facet.startswith("create_"):
+ choices[facet.split("_", 2)[1]] = eval(universe.categories["menu"][user.state].get(facet))
+ return choices
+
+def get_formatted_menu_choices(state, choices):
+ """Returns a formatted string of menu choices."""
+ choice_output = ""
+ choice_keys = choices.keys()
+ choice_keys.sort()
+ for choice in choice_keys:
+ choice_output += " [$(red)" + choice + "$(nrm)] " + choices[choice] + "$(eol)"
+ if choice_output: choice_output += "$(eol)"
+ return choice_output
+
+def get_menu_branches(state):
+ """Return a dict of choice:branch."""
+ branches = {}
+ for facet in universe.categories["menu"][state].facets():
+ if facet.startswith("branch_"):
+ branches[facet.split("_", 2)[1]] = universe.categories["menu"][state].get(facet)
+ return branches
+
+def get_default_branch(state):
+ """Return the default branch."""
+ return universe.categories["menu"][state].get("branch")
+
+def get_choice_branch(user, choice):
+ """Returns the new state matching the given choice."""
+ branches = get_menu_branches(user.state)
+ if not choice: choice = get_default_menu_choice(user.state)
+ if choice in branches.keys(): return branches[choice]
+ elif choice in user.menu_choices.keys(): return get_default_branch(user.state)
+ else: return ""
+
+def get_menu_actions(state):
+ """Return a dict of choice:branch."""
+ actions = {}
+ for facet in universe.categories["menu"][state].facets():
+ if facet.startswith("action_"):
+ actions[facet.split("_", 2)[1]] = universe.categories["menu"][state].get(facet)
+ return actions
+
+def get_default_action(state):
+ """Return the default action."""
+ return universe.categories["menu"][state].get("action")
+
+def get_choice_action(user, choice):
+ """Run any indicated script for the given choice."""
+ actions = get_menu_actions(user.state)
+ if not choice: choice = get_default_menu_choice(user.state)
+ if choice in actions.keys(): return actions[choice]
+ elif choice in user.menu_choices.keys(): return get_default_action(user.state)
+ else: return ""
+
+def handle_user_input(user):
+ """The main handler, branches to a state-specific handler."""
+
+ # check to make sure the state is expected, then call that handler
+ if "handler_" + user.state in globals():
+ exec("handler_" + user.state + "(user)")
+ else:
+ generic_menu_handler(user)
+
+ # since we got input, flag that the menu/prompt needs to be redisplayed
+ user.menu_seen = False
+
+ # if the user's client echo is off, send a blank line for aesthetics
+ if not user.echoing: user.send("", "")
+
+def generic_menu_handler(user):
+ """A generic menu choice handler."""
+
+ # get a lower-case representation of the next line of input
+ if user.input_queue:
+ choice = user.input_queue.pop(0)
+ if choice: choice = choice.lower()
+ else: choice = ""
+
+ # run any script related to this choice
+ exec(get_choice_action(user, choice))
+
+ # move on to the next state or return an error
+ new_state = get_choice_branch(user, choice)
+ if new_state: user.state = new_state
+ else: user.error = "default"
+
+def handler_entering_account_name(user):
+ """Handle the login account name."""
+
+ # get the next waiting line of input
+ input_data = user.input_queue.pop(0)
+
+ # did the user enter anything?
+ if input_data:
+
+ # keep only the first word and convert to lower-case
+ name = input_data.lower()
+
+ # fail if there are non-alphanumeric characters
+ if name != filter(lambda x: x>="0" and x<="9" or x>="a" and x<="z", name):
+ user.error = "bad_name"
+
+ # if that account exists, time to request a password
+ elif name in universe.categories["account"]:
+ user.account = universe.categories["account"][name]
+ user.state = "checking_password"
+
+ # otherwise, this could be a brand new user
+ else:
+ user.account = Element("account:" + name, universe)
+ user.account.set("name", name)
+ log("New user: " + name)
+ user.state = "checking_new_account_name"
+
+ # if the user entered nothing for a name, then buhbye
+ else:
+ user.state = "disconnecting"
+
+def handler_checking_password(user):
+ """Handle the login account password."""
+
+ # get the next waiting line of input
+ input_data = user.input_queue.pop(0)
+
+ # does the hashed input equal the stored hash?
+ if new_md5(user.account.get("name") + input_data).hexdigest() == user.account.get("passhash"):
+
+ # if so, set the username and load from cold storage
+ if not user.replace_old_connections():
+ user.authenticate()
+ user.state = "main_utility"
+
+ # if at first your hashes don't match, try, try again
+ elif user.password_tries < universe.categories["internal"]["general"].getint("password_tries"):
+ user.password_tries += 1
+ user.error = "incorrect"
+
+ # we've exceeded the maximum number of password failures, so disconnect
+ else:
+ user.send("$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)")
+ user.state = "disconnecting"
+
+def handler_checking_new_account_name(user):
+ """Handle input for the new user menu."""
+
+ # get the next waiting line of input
+ input_data = user.input_queue.pop(0)
+
+ # if there's input, take the first character and lowercase it
+ if input_data:
+ choice = input_data.lower()[0]
+
+ # if there's no input, use the default
+ else:
+ choice = get_default_menu_choice(user.state)
+
+ # user selected to disconnect
+ if choice == "d":
+ user.account.delete()
+ user.state = "disconnecting"
+
+ # go back to the login screen
+ elif choice == "g":
+ user.account.delete()
+ user.state = "entering_account_name"
+
+ # new user, so ask for a password
+ elif choice == "n":
+ user.state = "entering_new_password"
+
+ # user entered a non-existent option
+ else:
+ user.error = "default"
+
+def handler_entering_new_password(user):
+ """Handle a new password entry."""
+
+ # get the next waiting line of input
+ input_data = user.input_queue.pop(0)
+
+ # make sure the password is strong--at least one upper, one lower and
+ # one digit, seven or more characters in length
+ if len(input_data) > 6 and len(filter(lambda x: x>="0" and x<="9", input_data)) and len(filter(lambda x: x>="A" and x<="Z", input_data)) and len(filter(lambda x: x>="a" and x<="z", input_data)):
+
+ # hash and store it, then move on to verification
+ user.account.set("passhash", new_md5(user.account.get("name") + input_data).hexdigest())
+ user.state = "verifying_new_password"
+
+ # the password was weak, try again if you haven't tried too many times
+ elif user.password_tries < universe.categories["internal"]["general"].getint("password_tries"):
+ user.password_tries += 1
+ user.error = "weak"
+
+ # too many tries, so adios
+ else:
+ user.send("$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)")
+ user.account.delete()
+ user.state = "disconnecting"
+
+def handler_verifying_new_password(user):
+ """Handle the re-entered new password for verification."""
+
+ # get the next waiting line of input
+ input_data = user.input_queue.pop(0)
+
+ # hash the input and match it to storage
+ if new_md5(user.account.get("name") + input_data).hexdigest() == user.account.get("passhash"):
+ user.authenticate()
+
+ # the hashes matched, so go active
+ if not user.replace_old_connections(): user.state = "main_utility"
+
+ # go back to entering the new password as long as you haven't tried
+ # too many times
+ elif user.password_tries < universe.categories["internal"]["general"].getint("password_tries"):
+ user.password_tries += 1
+ user.error = "differs"
+ user.state = "entering_new_password"
+
+ # otherwise, sayonara
+ else:
+ user.send("$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)")
+ user.account.delete()
+ user.state = "disconnecting"
+
+def handler_active(user):
+ """Handle input for active users."""
+
+ # get the next waiting line of input
+ input_data = user.input_queue.pop(0)
+
+ # split out the command (first word) and parameters (everything else)
+ if input_data.find(" ") > 0:
+ command, parameters = input_data.split(" ", 1)
+ else:
+ command = input_data
+ parameters = ""
+
+ # lowercase the command
+ command = command.lower()
+
+ # the command matches a command word for which we have data
+ if command in universe.categories["command"]:
+ exec(universe.categories["command"][command].get("action"))
+
+ # no data matching the entered command word
+ elif command: command_error(user, command, parameters)
+
+def command_halt(user, command="", parameters=""):
+ """Halt the world."""
+
+ # see if there's a message or use a generic one
+ if parameters: message = "Halting: " + parameters
+ else: message = "User " + user.account.get("name") + " halted the world."
+
+ # let everyone know
+ broadcast(message)
+ log(message)
+
+ # set a flag to terminate the world
+ universe.terminate_world = True
+
+def command_reload(user, command="", parameters=""):
+ """Reload all code modules, configs and data."""
+
+ # let the user know and log
+ user.send("Reloading all code modules, configs and data.")
+ log("User " + user.account.get("name") + " reloaded the world.")
+
+ # set a flag to reload
+ universe.reload_modules = True
+
+def command_quit(user, command="", parameters=""):
+ """Quit the world."""
+ user.state = "disconnecting"
+
+def command_help(user, command="", parameters=""):
+ """List available commands and provide help for commands."""
+
+ # did the user ask for help on a specific command word?
+ if parameters:
+
+ # is the command word one for which we have data?
+ if parameters in universe.categories["command"]:
+
+ # add a description if provided
+ description = universe.categories["command"][parameters].get("description")
+ if not description:
+ description = "(no short description provided)"
+ output = "$(grn)" + parameters + "$(nrm) - " + description + "$(eol)$(eol)"
+
+ # add the help text if provided
+ help_text = universe.categories["command"][parameters].get("help")
+ if not help_text:
+ help_text = "No help is provided for this command."
+ output += help_text
+
+ # no data for the requested command word
+ else:
+ output = "That is not an available command."
+
+ # no specific command word was indicated
+ else:
+
+ # give a sorted list of commands with descriptions if provided
+ output = "These are the commands available to you:$(eol)$(eol)"
+ sorted_commands = universe.categories["command"].keys()
+ sorted_commands.sort()
+ for item in sorted_commands:
+ description = universe.categories["command"][item].get("description")
+ if not description:
+ description = "(no short description provided)"
+ output += " $(grn)" + item + "$(nrm) - " + description + "$(eol)"
+ output += "$(eol)Enter \"help COMMAND\" for help on a command named \"COMMAND\"."
+
+ # send the accumulated output to the user
+ user.send(output)
+
+def command_say(user, command="", parameters=""):
+ """Speak to others in the same room."""
+
+ # check for replacement macros
+ if replace_macros(user, parameters, True) != parameters:
+ user.send("You cannot speak $_(replacement macros).")
+
+ # the user entered a message
+ elif parameters:
+
+ # get rid of quote marks on the ends of the message and
+ # capitalize the first letter
+ message = parameters.strip("\"'`").capitalize()
+
+ # a dictionary of punctuation:action pairs
+ actions = {}
+ for facet in universe.categories["internal"]["language"].facets():
+ if facet.startswith("punctuation_"):
+ action = facet.split("_")[1]
+ for mark in universe.categories["internal"]["language"].get(facet).split():
+ actions[mark] = action
+
+ # match the punctuation used, if any, to an action
+ default_punctuation = universe.categories["internal"]["language"].get("default_punctuation")
+ action = actions[default_punctuation]
+ for mark in actions.keys():
+ if message.endswith(mark) and mark != default_punctuation:
+ action = actions[mark]
+ break
+
+ # if the action is default and there is no mark, add one
+ if action == actions[default_punctuation] and not message.endswith(default_punctuation):
+ message += default_punctuation
+
+ # capitalize a list of words within the message
+ capitalize = universe.categories["internal"]["language"].get("capitalize").split()
+ for word in capitalize:
+ message = message.replace(" " + word + " ", " " + word.capitalize() + " ")
+
+ # tell the room
+ # TODO: we won't be using broadcast once there are actual rooms
+ broadcast(user.account.get("name") + " " + action + "s, \"" + message + "\"")
+
+ # there was no message
+ else:
+ user.send("What do you want to say?")
+
+def command_show(user, command="", parameters=""):
+ """Show program data."""
+ if parameters == "avatars":
+ message = "These are the avatars managed by your account:$(eol)"
+ avatars = user.list_avatar_names()
+ avatars.sort()
+ for avatar in avatars: message += "$(eol) $(grn)" + avatar + "$(nrm)"
+ elif parameters == "files":
+ message = "These are the current files containing the universe:$(eol)"
+ keys = universe.files.keys()
+ keys.sort()
+ for key in keys: message += "$(eol) $(grn)" + key + "$(nrm)"
+ elif parameters == "universe":
+ message = "These are the current elements in the universe:$(eol)"
+ keys = universe.contents.keys()
+ keys.sort()
+ for key in keys: message += "$(eol) $(grn)" + key + "$(nrm)"
+ elif parameters == "time":
+ message = universe.categories["internal"]["counters"].get("elapsed") + " increments elapsed since the world was created."
+ elif parameters: message = "I don't know what \"" + parameters + "\" is."
+ else: message = "What do you want to show?"
+ user.send(message)
+
+def command_error(user, command="", parameters=""):
+ """Generic error for an unrecognized command word."""
+
+ # 90% of the time use a generic error
+ if randrange(10):
+ message = "I'm not sure what \"" + command
+ if parameters:
+ message += " " + parameters
+ message += "\" means..."
+
+ # 10% of the time use the classic diku error
+ else:
+ message = "Arglebargle, glop-glyf!?!"
+
+ # send the error message
+ user.send(message)
+
+# if there is no universe, create an empty one
+if not "universe" in locals(): universe = Universe()
+
+++ /dev/null
-"""Initialization for MUFF Modules"""
-
-# Copyright (c) 2005 mudpy, Jeremy Stanley <fungi@yuggoth.org>, all rights reserved.
-# Licensed per terms in the LICENSE file distributed with this software.
-
-# these are all the modules included in the muff package; if you create
-# another, be sure to add it to this list
-__all__ = [ "muffcmds", "muffmain", "muffmenu", "muffmisc", "muffsock", "muffuniv", "muffuser", "muffvars" ]
-
+++ /dev/null
-"""Command objects for the MUFF Engine"""
-
-# Copyright (c) 2005 mudpy, Jeremy Stanley <fungi@yuggoth.org>, all rights reserved.
-# Licensed per terms in the LICENSE file distributed with this software.
-
-# command data like descriptions, help text, limits, et cetera, are stored in
-# ini-style configuration files supported by the ConfigParser module
-import ConfigParser
-
-# md5 hashing is used for verification of account passwords
-import md5
-
-# the os module is used to we can get a directory listing and build lists of
-# multiple config files in one directory
-import os
-
-# the random module is useful for creating random conditional output
-import random
-
-# string.split is used extensively to tokenize user input (break up command
-# names and parameters)
-import string
-
-# bit of a hack to load all modules in the muff package
-import muff
-for module in muff.__all__:
- exec("import " + module)
-
-def handle_user_input(user):
- """The main handler, branches to a state-specific handler."""
-
- # check to make sure the state is expected, then call that handler
- if "handler_" + user.state in globals():
- exec("handler_" + user.state + "(user)")
- else:
- generic_menu_handler(user)
-
- # since we got input, flag that the menu/prompt needs to be redisplayed
- user.menu_seen = False
-
- # if the user's client echo is off, send a blank line for aesthetics
- if not user.echoing: user.send("", "")
-
-def generic_menu_handler(user):
- """A generic menu choice handler."""
-
- # get a lower-case representation of the next line of input
- if user.input_queue:
- choice = user.input_queue.pop(0)
- if choice: choice = choice.lower()
- else: choice = ""
-
- # run any script related to this choice
- exec(muffmenu.get_choice_action(user, choice))
-
- # move on to the next state or return an error
- new_state = muffmenu.get_choice_branch(user, choice)
- if new_state: user.state = new_state
- else: user.error = "default"
-
-def handler_entering_account_name(user):
- """Handle the login account name."""
-
- # get the next waiting line of input
- input_data = user.input_queue.pop(0)
-
- # did the user enter anything?
- if input_data:
-
- # keep only the first word and convert to lower-case
- name = input_data.lower()
-
- # fail if there are non-alphanumeric characters
- if name != filter(lambda x: x>="0" and x<="9" or x>="a" and x<="z", name):
- user.error = "bad_name"
-
- # if that account exists, time to request a password
- elif name in muffuniv.universe.categories["account"]:
- user.account = muffuniv.universe.categories["account"][name]
- user.state = "checking_password"
-
- # otherwise, this could be a brand new user
- else:
- user.account = muffuniv.Element("account:" + name, muffuniv.universe)
- user.account.set("name", name)
- muffmisc.log("New user: " + name)
- user.state = "checking_new_account_name"
-
- # if the user entered nothing for a name, then buhbye
- else:
- user.state = "disconnecting"
-
-def handler_checking_password(user):
- """Handle the login account password."""
-
- # get the next waiting line of input
- input_data = user.input_queue.pop(0)
-
- # does the hashed input equal the stored hash?
- if md5.new(user.account.get("name") + input_data).hexdigest() == user.account.get("passhash"):
-
- # if so, set the username and load from cold storage
- if not user.replace_old_connections():
- user.authenticate()
- user.state = "main_utility"
-
- # if at first your hashes don't match, try, try again
- elif user.password_tries < muffuniv.universe.categories["internal"]["general"].getint("password_tries"):
- user.password_tries += 1
- user.error = "incorrect"
-
- # we've exceeded the maximum number of password failures, so disconnect
- else:
- user.send("$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)")
- user.state = "disconnecting"
-
-def handler_checking_new_account_name(user):
- """Handle input for the new user menu."""
-
- # get the next waiting line of input
- input_data = user.input_queue.pop(0)
-
- # if there's input, take the first character and lowercase it
- if input_data:
- choice = input_data.lower()[0]
-
- # if there's no input, use the default
- else:
- choice = muffmenu.get_default_menu_choice(user.state)
-
- # user selected to disconnect
- if choice == "d":
- user.account.delete()
- user.state = "disconnecting"
-
- # go back to the login screen
- elif choice == "g":
- user.account.delete()
- user.state = "entering_account_name"
-
- # new user, so ask for a password
- elif choice == "n":
- user.state = "entering_new_password"
-
- # user entered a non-existent option
- else:
- user.error = "default"
-
-def handler_entering_new_password(user):
- """Handle a new password entry."""
-
- # get the next waiting line of input
- input_data = user.input_queue.pop(0)
-
- # make sure the password is strong--at least one upper, one lower and
- # one digit, seven or more characters in length
- if len(input_data) > 6 and len(filter(lambda x: x>="0" and x<="9", input_data)) and len(filter(lambda x: x>="A" and x<="Z", input_data)) and len(filter(lambda x: x>="a" and x<="z", input_data)):
-
- # hash and store it, then move on to verification
- user.account.set("passhash", md5.new(user.account.get("name") + input_data).hexdigest())
- user.state = "verifying_new_password"
-
- # the password was weak, try again if you haven't tried too many times
- elif user.password_tries < muffuniv.universe.categories["internal"]["general"].getint("password_tries"):
- user.password_tries += 1
- user.error = "weak"
-
- # too many tries, so adios
- else:
- user.send("$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)")
- user.account.delete()
- user.state = "disconnecting"
-
-def handler_verifying_new_password(user):
- """Handle the re-entered new password for verification."""
-
- # get the next waiting line of input
- input_data = user.input_queue.pop(0)
-
- # hash the input and match it to storage
- if md5.new(user.account.get("name") + input_data).hexdigest() == user.account.get("passhash"):
- user.authenticate()
-
- # the hashes matched, so go active
- if not user.replace_old_connections(): user.state = "main_utility"
-
- # go back to entering the new password as long as you haven't tried
- # too many times
- elif user.password_tries < muffuniv.universe.categories["internal"]["general"].getint("password_tries"):
- user.password_tries += 1
- user.error = "differs"
- user.state = "entering_new_password"
-
- # otherwise, sayonara
- else:
- user.send("$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)")
- user.account.delete()
- user.state = "disconnecting"
-
-def handler_active(user):
- """Handle input for active users."""
-
- # get the next waiting line of input
- input_data = user.input_queue.pop(0)
-
- # split out the command (first word) and parameters (everything else)
- if input_data.find(" ") > 0:
- command, parameters = input_data.split(" ", 1)
- else:
- command = input_data
- parameters = ""
-
- # lowercase the command
- command = command.lower()
-
- # the command matches a command word for which we have data
- if command in muffuniv.universe.categories["command"]:
- exec(muffuniv.universe.categories["command"][command].get("action"))
-
- # no data matching the entered command word
- elif command: command_error(user, command, parameters)
-
-def command_halt(user, command="", parameters=""):
- """Halt the world."""
-
- # see if there's a message or use a generic one
- if parameters: message = "Halting: " + parameters
- else: message = "User " + user.account.get("name") + " halted the world."
-
- # let everyone know
- muffmisc.broadcast(message)
- muffmisc.log(message)
-
- # set a flag to terminate the world
- muffvars.terminate_world = True
-
-def command_reload(user, command="", parameters=""):
- """Reload all code modules, configs and data."""
-
- # let the user know and log
- user.send("Reloading all code modules, configs and data.")
- muffmisc.log("User " + user.account.get("name") + " reloaded the world.")
-
- # set a flag to reload
- muffvars.reload_modules = True
-
-def command_quit(user, command="", parameters=""):
- """Quit the world."""
- user.state = "disconnecting"
-
-def command_help(user, command="", parameters=""):
- """List available commands and provide help for commands."""
-
- # did the user ask for help on a specific command word?
- if parameters:
-
- # is the command word one for which we have data?
- if parameters in muffuniv.universe.categories["command"]:
-
- # add a description if provided
- description = muffuniv.universe.categories["command"][parameters].get("description")
- if not description:
- description = "(no short description provided)"
- output = "$(grn)" + parameters + "$(nrm) - " + description + "$(eol)$(eol)"
-
- # add the help text if provided
- help_text = muffuniv.universe.categories["command"][parameters].get("help")
- if not help_text:
- help_text = "No help is provided for this command."
- output += help_text
-
- # no data for the requested command word
- else:
- output = "That is not an available command."
-
- # no specific command word was indicated
- else:
-
- # give a sorted list of commands with descriptions if provided
- output = "These are the commands available to you:$(eol)$(eol)"
- sorted_commands = muffuniv.universe.categories["command"].keys()
- sorted_commands.sort()
- for item in sorted_commands:
- description = muffuniv.universe.categories["command"][item].get("description")
- if not description:
- description = "(no short description provided)"
- output += " $(grn)" + item + "$(nrm) - " + description + "$(eol)"
- output += "$(eol)Enter \"help COMMAND\" for help on a command named \"COMMAND\"."
-
- # send the accumulated output to the user
- user.send(output)
-
-def command_say(user, command="", parameters=""):
- """Speak to others in the same room."""
-
- # check for replacement macros
- if muffmisc.replace_macros(user, parameters, True) != parameters:
- user.send("You cannot speak $_(replacement macros).")
-
- # the user entered a message
- elif parameters:
-
- # get rid of quote marks on the ends of the message and
- # capitalize the first letter
- message = parameters.strip("\"'`").capitalize()
-
- # a dictionary of punctuation:action pairs
- actions = {}
- for facet in muffuniv.universe.categories["internal"]["language"].facets():
- if facet.startswith("punctuation_"):
- action = facet.split("_")[1]
- for mark in muffuniv.universe.categories["internal"]["language"].get(facet).split():
- actions[mark] = action
-
- # match the punctuation used, if any, to an action
- default_punctuation = muffuniv.universe.categories["internal"]["language"].get("default_punctuation")
- action = actions[default_punctuation]
- for mark in actions.keys():
- if message.endswith(mark) and mark != default_punctuation:
- action = actions[mark]
- break
-
- # if the action is default and there is no mark, add one
- if action == actions[default_punctuation] and not message.endswith(default_punctuation):
- message += default_punctuation
-
- # capitalize a list of words within the message
- capitalize = muffuniv.universe.categories["internal"]["language"].get("capitalize").split()
- for word in capitalize:
- message = message.replace(" " + word + " ", " " + word.capitalize() + " ")
-
- # tell the room
- # TODO: we won't be using broadcast once there are actual rooms
- muffmisc.broadcast(user.account.get("name") + " " + action + "s, \"" + message + "\"")
-
- # there was no message
- else:
- user.send("What do you want to say?")
-
-def command_show(user, command="", parameters=""):
- """Show program data."""
- if parameters == "avatars":
- message = "These are the avatars managed by your account:$(eol)"
- avatars = user.list_avatar_names()
- avatars.sort()
- for avatar in avatars: message += "$(eol) $(grn)" + avatar + "$(nrm)"
- elif parameters == "files":
- message = "These are the current files containing the universe:$(eol)"
- keys = muffuniv.universe.files.keys()
- keys.sort()
- for key in keys: message += "$(eol) $(grn)" + key + "$(nrm)"
- elif parameters == "universe":
- message = "These are the current elements in the universe:$(eol)"
- keys = muffuniv.universe.contents.keys()
- keys.sort()
- for key in keys: message += "$(eol) $(grn)" + key + "$(nrm)"
- elif parameters == "time":
- message = muffuniv.universe.categories["internal"]["counters"].get("elapsed") + " increments elapsed since the world was created."
- elif parameters: message = "I don't know what \"" + parameters + "\" is."
- else: message = "What do you want to show?"
- user.send(message)
-
-def command_error(user, command="", parameters=""):
- """Generic error for an unrecognized command word."""
-
- # 90% of the time use a generic error
- if random.randrange(10):
- message = "I'm not sure what \"" + command
- if parameters:
- message += " " + parameters
- message += "\" means..."
-
- # 10% of the time use the classic diku error
- else:
- message = "Arglebargle, glop-glyf!?!"
-
- # send the error message
- user.send(message)
-
+++ /dev/null
-"""Main loop for the MUFF Engine"""
-
-# Copyright (c) 2005 mudpy, Jeremy Stanley <fungi@yuggoth.org>, all rights reserved.
-# Licensed per terms in the LICENSE file distributed with this software.
-
-# string.strip is used to clean up leading/trailing whitespace in user input
-import string
-
-# time.sleep is used in the loop to save cpu and provide crude pulse timing
-import time
-
-# hack to load all modules in the muff package
-import muff
-for module in muff.__all__:
- exec("import " + module)
-
-def main():
- """The main loop."""
-
- # loop indefinitely while the world is not flagged for termination or
- # there are connected users
- while not muffvars.terminate_world or muffvars.userlist:
-
- # the world was flagged for a reload of all code/data
- if muffvars.reload_modules:
-
- # reload the muff package
- reload(muff)
-
- # reload all modules listed in the muff package
- for module in muff.__all__:
- exec("reload(muff." + module + ")")
-
- # move data into new persistent objects
- muffmisc.reload_data()
-
- # reset the reload flag
- muffvars.reload_modules = False
-
- # do what needs to be done on each pulse
- muffmisc.on_pulse()
-
- # the loop has terminated, so save persistent data
- muffuniv.universe.save()
-
- # log a final message
- muffmisc.log("Shutting down now.")
-
+++ /dev/null
-"""Menu objects for the MUFF Engine"""
-
-# Copyright (c) 2005 mudpy, Jeremy Stanley <fungi@yuggoth.org>, all rights reserved.
-# Licensed per terms in the LICENSE file distributed with this software.
-
-# muff uses menu data stored in ini-style files supported by ConfigParser
-import ConfigParser
-
-# hack to load all modules in the muff package
-import muff
-for module in muff.__all__:
- exec("import " + module)
-
-def get_menu(state, error=None, echoing=True, choices={}):
- """Show the correct menu text to a user."""
-
- # begin with a telnet echo command sequence if needed
- message = get_echo_sequence(state, echoing)
-
- # get the description or error text
- message += get_menu_description(state, error)
-
- # get menu choices for the current state
- message += get_formatted_menu_choices(state, choices)
-
- # try to get a prompt, if it was defined
- message += get_menu_prompt(state)
-
- # throw in the default choice, if it exists
- message += get_formatted_default_menu_choice(state)
-
- # display a message indicating if echo is off
- message += get_echo_message(state)
-
- # return the assembly of various strings defined above
- return message
-
-def menu_echo_on(state):
- """True if echo is on, false if it is off."""
- return muffuniv.universe.categories["menu"][state].getboolean("echo", True)
-
-def get_echo_sequence(state, echoing):
- """Build the appropriate IAC ECHO sequence as needed."""
-
- # if the user has echo on and the menu specifies it should be turned
- # off, send: iac + will + echo + null
- if echoing and not menu_echo_on(state): return chr(255) + chr(251) + chr(1) + chr(0)
-
- # if echo is not set to off in the menu and the user curently has echo
- # off, send: iac + wont + echo + null
- elif not echoing and menu_echo_on(state): return chr(255) + chr(252) + chr(1) + chr(0)
-
- # default is not to send an echo control sequence at all
- else: return ""
-
-def get_echo_message(state):
- """Return a message indicating that echo is off."""
- if menu_echo_on(state): return ""
- else: return "(won't echo) "
-
-def get_default_menu_choice(state):
- """Return the default choice for a menu."""
- return muffuniv.universe.categories["menu"][state].get("default")
-
-def get_formatted_default_menu_choice(state):
- """Default menu choice foratted for inclusion in a prompt string."""
- default = get_default_menu_choice(state)
- if default: return "[$(red)" + default + "$(nrm)] "
- else: return ""
-
-def get_menu_description(state, error):
- """Get the description or error text."""
-
- # an error condition was raised by the handler
- if error:
-
- # try to get an error message matching the condition
- # and current state
- description = muffuniv.universe.categories["menu"][state].get("error_" + error)
- if not description: description = "That is not a valid choice..."
- description = "$(red)" + description + "$(nrm)"
-
- # there was no error condition
- else:
-
- # try to get a menu description for the current state
- description = muffuniv.universe.categories["menu"][state].get("description")
-
- # return the description or error message
- if description: description += "$(eol)$(eol)"
- return description
-
-def get_menu_prompt(state):
- """Try to get a prompt, if it was defined."""
- prompt = muffuniv.universe.categories["menu"][state].get("prompt")
- if prompt: prompt += " "
- return prompt
-
-def get_menu_choices(user):
- """Return a dict of choice:meaning."""
- choices = {}
- for facet in muffuniv.universe.categories["menu"][user.state].facets():
- if facet.startswith("choice_"):
- choices[facet.split("_", 2)[1]] = muffuniv.universe.categories["menu"][user.state].get(facet)
- elif facet.startswith("create_"):
- choices[facet.split("_", 2)[1]] = eval(muffuniv.universe.categories["menu"][user.state].get(facet))
- return choices
-
-def get_formatted_menu_choices(state, choices):
- """Returns a formatted string of menu choices."""
- choice_output = ""
- choice_keys = choices.keys()
- choice_keys.sort()
- for choice in choice_keys:
- choice_output += " [$(red)" + choice + "$(nrm)] " + choices[choice] + "$(eol)"
- if choice_output: choice_output += "$(eol)"
- return choice_output
-
-def get_menu_branches(state):
- """Return a dict of choice:branch."""
- branches = {}
- for facet in muffuniv.universe.categories["menu"][state].facets():
- if facet.startswith("branch_"):
- branches[facet.split("_", 2)[1]] = muffuniv.universe.categories["menu"][state].get(facet)
- return branches
-
-def get_default_branch(state):
- """Return the default branch."""
- return muffuniv.universe.categories["menu"][state].get("branch")
-
-def get_choice_branch(user, choice):
- """Returns the new state matching the given choice."""
- branches = get_menu_branches(user.state)
- if not choice: choice = get_default_menu_choice(user.state)
- if choice in branches.keys(): return branches[choice]
- elif choice in user.menu_choices.keys(): return get_default_branch(user.state)
- else: return ""
-
-def get_menu_actions(state):
- """Return a dict of choice:branch."""
- actions = {}
- for facet in muffuniv.universe.categories["menu"][state].facets():
- if facet.startswith("action_"):
- actions[facet.split("_", 2)[1]] = muffuniv.universe.categories["menu"][state].get(facet)
- return actions
-
-def get_default_action(state):
- """Return the default action."""
- return muffuniv.universe.categories["menu"][state].get("action")
-
-def get_choice_action(user, choice):
- """Run any indicated script for the given choice."""
- actions = get_menu_actions(user.state)
- if not choice: choice = get_default_menu_choice(user.state)
- if choice in actions.keys(): return actions[choice]
- elif choice in user.menu_choices.keys(): return get_default_action(user.state)
- else: return ""
-
+++ /dev/null
-"""Miscellaneous objects for the MUFF Engine"""
-
-# Copyright (c) 2005 mudpy, Jeremy Stanley <fungi@yuggoth.org>, all rights reserved.
-# Licensed per terms in the LICENSE file distributed with this software.
-
-import ConfigParser
-
-# used by several functions for random calls
-import random
-
-# random_name uses string.strip
-import string
-
-# the log function uses time.asctime for creating timestamps
-import time
-
-# hack to load all modules in the muff package
-import muff
-for module in muff.__all__:
- exec("import " + module)
-
-def broadcast(message):
- """Send a message to all connected users."""
- for each_user in muffvars.userlist: each_user.send("$(eol)" + message)
-
-def log(message):
- """Log a message."""
-
- # the time in posix log timestamp format
- timestamp = time.asctime()[4:19]
-
- # send the timestamp and message to standard output
- print(timestamp + " " + message)
-
-def wrap_ansi_text(text, width):
- """Wrap text with arbitrary width while ignoring ANSI colors."""
-
- # the current position in the entire text string, including all
- # characters, printable or otherwise
- absolute_position = 0
-
- # the current text position relative to the begining of the line,
- # ignoring color escape sequences
- relative_position = 0
-
- # whether the current character is part of a color escape sequence
- escape = False
-
- # iterate over each character from the begining of the text
- for each_character in text:
-
- # the current character is the escape character
- if each_character == chr(27):
- escape = True
-
- # the current character is within an escape sequence
- elif escape:
-
- # the current character is m, which terminates the
- # current escape sequence
- if each_character == "m":
- escape = False
-
- # the current character is a newline, so reset the relative
- # position (start a new line)
- elif each_character == "\n":
- relative_position = 0
-
- # the current character meets the requested maximum line width,
- # so we need to backtrack and find a space at which to wrap
- elif relative_position == width:
-
- # distance of the current character examined from the
- # relative position
- wrap_offset = 0
-
- # count backwards until we find a space
- while text[absolute_position - wrap_offset] != " ":
- wrap_offset += 1
-
- # insert an eol in place of the space
- text = text[:absolute_position - wrap_offset] + "\r\n" + text[absolute_position - wrap_offset + 1:]
-
- # increase the absolute position because an eol is two
- # characters but the space it replaced was only one
- absolute_position += 1
-
- # now we're at the begining of a new line, plus the
- # number of characters wrapped from the previous line
- relative_position = wrap_offset
-
- # as long as the character is not a carriage return and the
- # other above conditions haven't been met, count it as a
- # printable character
- elif each_character != "\r":
- relative_position += 1
-
- # increase the absolute position for every character
- absolute_position += 1
-
- # return the newly-wrapped text
- return text
-
-def weighted_choice(data):
- """Takes a dict weighted by value and returns a random key."""
-
- # this will hold our expanded list of keys from the data
- expanded = []
-
- # create thee expanded list of keys
- for key in data.keys():
- for count in range(data[key]):
- expanded.append(key)
-
- # return one at random
- return random.choice(expanded)
-
-def random_name():
- """Returns a random character name."""
-
- # the vowels and consonants needed to create romaji syllables
- vowels = [ "a", "i", "u", "e", "o" ]
- consonants = ["'", "k", "z", "s", "sh", "z", "j", "t", "ch", "ts", "d", "n", "h", "f", "m", "y", "r", "w" ]
-
- # this dict will hold our weighted list of syllables
- syllables = {}
-
- # generate the list with an even weighting
- for consonant in consonants:
- for vowel in vowels:
- syllables[consonant + vowel] = 1
-
- # we'll build the name into this string
- name = ""
-
- # create a name of random length from the syllables
- for syllable in range(random.randrange(2, 6)):
- name += weighted_choice(syllables)
-
- # strip any leading quotemark, capitalize and return the name
- return string.strip(name, "'").capitalize()
-
-def replace_macros(user, text, is_input=False):
- """Replaces macros in text output."""
- while True:
- macro_start = string.find(text, "$(")
- if macro_start == -1: break
- macro_end = string.find(text, ")", macro_start) + 1
- macro = text[macro_start:macro_end]
- if macro in muffvars.macros.keys():
- text = string.replace(text, macro, muffvars.macros[macro])
-
- # the user's account name
- elif macro == "$(account)":
- text = string.replace(text, macro, user.account.get("name"))
-
- # third person subjective pronoun
- elif macro == "$(tpsp)":
- if user.avatar.get("gender") == "male":
- text = string.replace(text, macro, "he")
- elif user.avatar.get("gender") == "female":
- text = string.replace(text, macro, "she")
- else:
- text = string.replace(text, macro, "it")
-
- # third person objective pronoun
- elif macro == "$(tpop)":
- if user.avatar.get("gender") == "male":
- text = string.replace(text, macro, "him")
- elif user.avatar.get("gender") == "female":
- text = string.replace(text, macro, "her")
- else:
- text = string.replace(text, macro, "it")
-
- # third person possessive pronoun
- elif macro == "$(tppp)":
- if user.avatar.get("gender") == "male":
- text = string.replace(text, macro, "his")
- elif user.avatar.get("gender") == "female":
- text = string.replace(text, macro, "hers")
- else:
- text = string.replace(text, macro, "its")
-
- # if we get here, log and replace it with null
- else:
- text = string.replace(text, macro, "")
- if not is_input:
- log("Unexpected replacement macro " + macro + " encountered.")
-
- # replace the look-like-a-macro sequence
- text = string.replace(text, "$_(", "$(")
-
- return text
-
-def check_time(frequency):
- """Check for a factor of the current increment count."""
- if type(frequency) is str:
- frequency = muffuniv.universe.categories["internal"]["time"].getint(frequency)
- if not "counters" in muffuniv.universe.categories["internal"]:
- muffuniv.Element("internal:counters", muffuniv.universe)
- return not muffuniv.universe.categories["internal"]["counters"].getint("elapsed") % frequency
-
-def on_pulse():
- """The things which should happen on each pulse, aside from reloads."""
-
- # open the listening socket if it hasn't been already
- if not muffvars.newsocket: muffsock.initialize_server_socket()
-
- # assign a user if a new connection is waiting
- user = muffsock.check_for_connection(muffvars.newsocket)
- if user: muffvars.userlist.append(user)
-
- # iterate over the connected users
- for user in muffvars.userlist: user.pulse()
-
- # update the log every now and then
- if check_time("frequency_log"):
- log(repr(len(muffvars.userlist)) + " connection(s)")
-
- # periodically save everything
- if check_time("frequency_save"):
- muffuniv.universe.save()
-
- # pause for a configurable amount of time (decimal seconds)
- time.sleep(muffuniv.universe.categories["internal"]["time"].getfloat("increment"))
-
- # increment the elapsed increment counter
- muffuniv.universe.categories["internal"]["counters"].set("elapsed", muffuniv.universe.categories["internal"]["counters"].getint("elapsed") + 1)
-
-def reload_data():
- """Reload data into new persistent objects."""
-
- # reload the users
- temporary_userlist = []
- for user in muffvars.userlist: temporary_userlist.append(user)
- for user in temporary_userlist: user.reload()
- del(temporary_userlist)
-
+++ /dev/null
-"""Socket objects for the MUFF Engine"""
-
-# Copyright (c) 2005 mudpy, Jeremy Stanley <fungi@yuggoth.org>, all rights reserved.
-# Licensed per terms in the LICENSE file distributed with this software.
-
-# need socket.socket for new connection objects and the server's listener
-import socket
-
-# hack to load all modules in the muff package
-import muff
-import muffuniv
-for module in muff.__all__:
- exec("import " + module)
-
-def check_for_connection(newsocket):
- """Check for a waiting connection and return a new user object."""
-
- # try to accept a new connection
- try:
- connection, address = newsocket.accept()
- except:
- return None
-
- # note that we got one
- muffmisc.log("Connection from " + address[0])
-
- # disable blocking so we can proceed whether or not we can send/receive
- connection.setblocking(0)
-
- # create a new user object
- user = muffuser.User()
-
- # associate this connection with it
- user.connection = connection
-
- # set the user's ipa from the connection's ipa
- user.address = address[0]
-
- # return the new user object
- return user
-
-def initialize_server_socket():
- """Create and open the listening socket."""
-
- # create a new ipv4 stream-type socket object
- newsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
- # set the socket options to allow existing open ones to be reused
- # (fixes a bug where the server can't bind for a minute when restarting
- # on linux systems)
- newsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
- # bind the socket to to our desired server ipa and port
- newsocket.bind((muffuniv.universe.categories["internal"]["network"].get("host"), muffuniv.universe.categories["internal"]["network"].getint("port")))
-
- # disable blocking so we can proceed whether or not we can send/receive
- newsocket.setblocking(0)
-
- # start listening on the socket
- newsocket.listen(1)
-
- # note that we're now ready for user connections
- muffmisc.log("Waiting for connection(s)...")
-
- # store this in a globally-accessible place
- muffvars.newsocket = newsocket
-
+++ /dev/null
-# Copyright (c) 2005 mudpy, Jeremy Stanley <fungi@yuggoth.org>, all rights reserved.
-# Licensed per terms in the LICENSE file distributed with this software.
-
-# persistent variables are stored in ini-style files supported by ConfigParser
-import ConfigParser
-
-# need to know the directory separator
-import os
-
-# hack to load all modules in the muff package
-import muff
-for module in muff.__all__:
- exec("import " + module)
-
-class Element:
- """An element of the universe."""
- def __init__(self, key, universe, origin=""):
- """Default values for the in-memory element variables."""
- self.key = key
- if self.key.find(":") > 0:
- self.category, self.subkey = self.key.split(":", 1)
- else:
- self.category = "other"
- self.subkey = self.key
- if not self.category in universe.categories: self.category = "other"
- universe.categories[self.category][self.subkey] = self
- self.origin = origin
- if not self.origin: self.origin = universe.default_origins[self.category]
- if not self.origin.startswith(os.sep):
- self.origin = os.getcwd() + os.sep + self.origin
- universe.contents[self.key] = self
- if not self.origin in universe.files:
- DataFile(self.origin, universe)
- if not universe.files[self.origin].data.has_section(self.key):
- universe.files[self.origin].data.add_section(self.key)
- def delete(self):
- muffmisc.log("Deleting: " + self.key + ".")
- universe.files[self.origin].data.remove_section(self.key)
- del universe.categories[self.category][self.subkey]
- del universe.contents[self.key]
- del self
- def facets(self):
- """Return a list of facets for this element."""
- return universe.files[self.origin].data.options(self.key)
- def get(self, facet):
- """Retrieve values."""
- if universe.files[self.origin].data.has_option(self.key, facet):
- return universe.files[self.origin].data.get(self.key, facet)
- else:
- return ""
- def getboolean(self, facet, default=False):
- """Retrieve values as boolean type."""
- if universe.files[self.origin].data.has_option(self.key, facet):
- return universe.files[self.origin].data.getboolean(self.key, facet)
- else:
- return default
- def getint(self, facet):
- """Convenience method to coerce return values as type int."""
- value = self.get(facet)
- if not value: value = 0
- elif type(value) is str: value = value.rstrip("L")
- return int(value)
- def getfloat(self, facet):
- """Convenience method to coerce return values as type float."""
- value = self.get(facet)
- if not value: value = 0
- elif type(value) is str: value = value.rstrip("L")
- return float(value)
- def set(self, facet, value):
- """Set values."""
- if not type(value) is str: value = repr(value)
- universe.files[self.origin].data.set(self.key, facet, value)
-
-class DataFile:
- """A file containing universe elements."""
- def __init__(self, filename, universe):
- filedir = os.sep.join(filename.split(os.sep)[:-1])
- self.data = ConfigParser.SafeConfigParser()
- if os.access(filename, os.R_OK): self.data.read(filename)
- self.filename = filename
- universe.files[filename] = self
- if "categories" in self.data.sections():
- for option in self.data.options("categories"):
- universe.default_origins[option] = self.data.get("categories", option)
- if not option in universe.categories:
- universe.categories[option] = {}
- for section in self.data.sections():
- if section == "categories" or section == "include":
- for option in self.data.options(section):
- includefile = self.data.get(section, option)
- if not includefile.startswith(os.sep):
- includefile = filedir + os.sep + includefile
- DataFile(includefile, universe)
- elif section != "control":
- Element(section, universe, filename)
- def save(self):
- if self.data.sections() and not ( "control" in self.data.sections() and self.data.getboolean("control", "read_only") ):
- basedir = os.sep.join(self.filename.split(os.sep)[:-1])
- if not os.access(basedir, os.F_OK): os.makedirs(basedir)
- file_descriptor = file(self.filename, "w")
- self.data.write(file_descriptor)
- file_descriptor.flush()
- file_descriptor.close()
-
-class Universe:
- """The universe."""
- def __init__(self, filename=""):
- """Initialize the universe."""
- self.categories = {}
- self.contents = {}
- self.default_origins = {}
- self.files = {}
- if not filename:
- possible_filenames = [
- ".mudpyrc",
- ".mudpy/mudpyrc",
- ".mudpy/mudpy.conf",
- "mudpy.conf",
- "etc/mudpy.conf",
- "/usr/local/mudpy/mudpy.conf",
- "/usr/local/mudpy/etc/mudpy.conf",
- "/etc/mudpy/mudpy.conf",
- "/etc/mudpy.conf"
- ]
- for filename in possible_filenames:
- if os.access(filename, os.R_OK): break
- if not filename.startswith(os.sep):
- filename = os.getcwd() + os.sep + filename
- DataFile(filename, self)
- def save(self):
- """Save the universe to persistent storage."""
- for key in self.files: self.files[key].save()
-
-# if there is no universe, create an empty one
-if not "universe" in locals(): universe = Universe()
-
+++ /dev/null
-"""User objects for the MUFF Engine"""
-
-# Copyright (c) 2005 mudpy, Jeremy Stanley <fungi@yuggoth.org>, all rights reserved.
-# Licensed per terms in the LICENSE file distributed with this software.
-
-# user accounts are stored in ini-style files supported by ConfigParser
-import ConfigParser
-
-# os is used to test for existence of the account dir and, if necessary, make it
-import os
-
-# string.replace is used to perform substitutions for color codes and the like
-import string
-
-# hack to load all modules in the muff package
-import muff
-for module in muff.__all__:
- exec("import " + module)
-
-class User:
- """This is a connected user."""
-
- def __init__(self):
- """Default values for the in-memory user variables."""
- self.address = ""
- self.last_address = ""
- self.connection = None
- self.authenticated = False
- self.password_tries = 1
- self.state = "entering_account_name"
- self.menu_seen = False
- self.error = ""
- self.input_queue = []
- self.output_queue = []
- self.partial_input = ""
- self.echoing = True
- self.avatar = None
- self.account = None
-
- def quit(self):
- """Log, save, close the connection and remove."""
- name = self.account.get("name")
- if name: message = "User " + name
- else: message = "An unnamed user"
- message += " logged out."
- muffmisc.log(message)
- self.connection.close()
- self.remove()
-
- def reload(self):
- """Save, load a new user and relocate the connection."""
-
- # unauthenticated connections get the boot
- if not self.authenticated:
- muffmisc.log("An unauthenticated user was disconnected during reload.")
- self.state = "disconnecting"
-
- # authenticated users
- else:
-
- # save and get out of the list
- self.save()
- self.remove()
-
- # create a new user object
- new_user = muffuser.User()
-
- # give it the same name
- new_user.account.set("name", self.account.get("name"))
-
- # set everything else equivalent
- new_user.address = self.address
- new_user.last_address = self.last_address
- new_user.connection = self.connection
- new_user.authenticated = self.authenticated
- new_user.password_tries = self.password_tries
- new_user.state = self.state
- new_user.menu_seen = self.menu_seen
- new_user.error = self.error
- new_user.input_queue = self.input_queue
- new_user.output_queue = self.output_queue
- new_user.partial_input = self.partial_input
- new_user.echoing = self.echoing
-
- # add it to the list
- muffvars.userlist.append(new_user)
-
- # get rid of the old user object
- del(self)
-
- def replace_old_connections(self):
- """Disconnect active users with the same name."""
-
- # the default return value
- return_value = False
-
- # iterate over each user in the list
- for old_user in muffvars.userlist:
-
- # the name is the same but it's not us
- if old_user.account.get("name") == self.account.get("name") and old_user is not self:
-
- # make a note of it
- muffmisc.log("User " + self.account.get("name") + " reconnected--closing old connection to " + old_user.address + ".")
- old_user.send("$(eol)$(red)New connection from " + self.address + ". Terminating old connection...$(nrm)$(eol)")
- self.send("$(eol)$(red)Taking over old connection from " + old_user.address + ".$(nrm)")
-
- # close the old connection
- old_user.connection.close()
-
- # replace the old connection with this one
- old_user.connection = self.connection
- old_user.last_address = old_user.address
- old_user.address = self.address
- old_user.echoing = self.echoing
-
- # take this one out of the list and delete
- self.remove()
- del(self)
- return_value = True
- break
-
- # true if an old connection was replaced, false if not
- return return_value
-
- def authenticate(self):
- """Flag the user as authenticated and disconnect duplicates."""
- if not self.state is "authenticated":
- muffmisc.log("User " + self.account.get("name") + " logged in.")
- self.authenticated = True
-
- def show_menu(self):
- """Send the user their current menu."""
- if not self.menu_seen:
- self.menu_choices = muffmenu.get_menu_choices(self)
- self.send(muffmenu.get_menu(self.state, self.error, self.echoing, self.menu_choices), "")
- self.menu_seen = True
- self.error = False
- self.adjust_echoing()
-
- def adjust_echoing(self):
- """Adjust echoing to match state menu requirements."""
- if self.echoing and not muffmenu.menu_echo_on(self.state): self.echoing = False
- elif not self.echoing and muffmenu.menu_echo_on(self.state): self.echoing = True
-
- def remove(self):
- """Remove a user from the list of connected users."""
- muffvars.userlist.remove(self)
-
- def send(self, output, eol="$(eol)"):
- """Send arbitrary text to a connected user."""
-
- # only when there is actual output
- #if output:
-
- # start with a newline, append the message, then end
- # with the optional eol string passed to this function
- # and the ansi escape to return to normal text
- output = "\r\n" + output + eol + chr(27) + "[0m"
-
- # find and replace macros in the output
- output = muffmisc.replace_macros(self, output)
-
- # wrap the text at 80 characters
- # TODO: prompt user for preferred wrap width
- output = muffmisc.wrap_ansi_text(output, 80)
-
- # drop the formatted output into the output queue
- self.output_queue.append(output)
-
- # try to send the last item in the queue, remove it and
- # flag that menu display is not needed
- try:
- self.connection.send(self.output_queue[0])
- self.output_queue.remove(self.output_queue[0])
- self.menu_seen = False
-
- # but if we can't, that's okay too
- except:
- pass
-
- def pulse(self):
- """All the things to do to the user per increment."""
-
- # if the world is terminating, disconnect
- if muffvars.terminate_world:
- self.state = "disconnecting"
- self.menu_seen = False
-
- # show the user a menu as needed
- self.show_menu()
-
- # disconnect users with the appropriate state
- if self.state == "disconnecting":
- self.quit()
-
- # the user is unique and not flagged to disconnect
- else:
-
- # check for input and add it to the queue
- self.enqueue_input()
-
- # there is input waiting in the queue
- if self.input_queue: muffcmds.handle_user_input(self)
-
- def enqueue_input(self):
- """Process and enqueue any new input."""
-
- # check for some input
- try:
- input_data = self.connection.recv(1024)
- except:
- input_data = ""
-
- # we got something
- if input_data:
-
- # tack this on to any previous partial
- self.partial_input += input_data
-
- # separate multiple input lines
- new_input_lines = self.partial_input.split("\n")
-
- # if input doesn't end in a newline, replace the
- # held partial input with the last line of it
- if not self.partial_input.endswith("\n"):
- self.partial_input = new_input_lines.pop()
-
- # otherwise, chop off the extra null input and reset
- # the held partial input
- else:
- new_input_lines.pop()
- self.partial_input = ""
-
- # iterate over the remaining lines
- for line in new_input_lines:
-
- # filter out non-printables
- line = filter(lambda x: x>=' ' and x<='~', line)
-
- # strip off extra whitespace
- line = line.strip()
-
- # put on the end of the queue
- self.input_queue.append(line)
-
- def new_avatar(self):
- """Instantiate a new, unconfigured avatar for this user."""
- counter = muffuniv.universe.categories["internal"]["counters"].getint("next_avatar")
- while "avatar:" + repr(counter + 1) in muffuniv.universe.categories["actor"].keys(): counter += 1
- muffuniv.universe.categories["internal"]["counters"].set("next_avatar", counter + 1)
- self.avatar = muffuniv.Element("actor:avatar:" + repr(counter), muffuniv.universe)
- avatars = self.account.get("avatars").split()
- avatars.append(self.avatar.key)
- self.account.set("avatars", " ".join(avatars))
-
- def list_avatar_names(self):
- """A test function to list names of assigned avatars."""
- try:
- avatars = self.account.get("avatars").split()
- except:
- avatars = []
- avatar_names = []
- for avatar in avatars:
- avatar_names.append(muffuniv.universe.contents[avatar].get("name"))
- return avatar_names
-
+++ /dev/null
-"""Global variable objects for the MUFF Engine"""
-
-# Copyright (c) 2005 mudpy, Jeremy Stanley <fungi@yuggoth.org>, all rights reserved.
-# Licensed per terms in the LICENSE file distributed with this software.
-
-# persistent variables are stored in ini-style files supported by ConfigParser
-import ConfigParser
-
-# need to be able to create the variable save file's directory
-import os
-
-# hack to load all modules in the muff package
-import muff
-for module in muff.__all__:
- exec("import " + module)
-
-# if there is no userlist, create an empty one
-try:
- if userlist: pass
-except NameError:
- userlist = []
-
-# if there is no listening socket, create an empty one
-try:
- if newsocket: pass
-except NameError:
- newsocket = None
-
-# flag to raise when the world should be shut down
-terminate_world = False
-
-# flag to raise when all code modules, config and data should be reloaded
-reload_modules = False
-
-# a dict of replacement macros
-macros = {
- "$(eol)": "\r\n",
- "$(bld)": chr(27) + "[1m",
- "$(nrm)": chr(27) + "[0m",
- "$(blk)": chr(27) + "[30m",
- "$(grn)": chr(27) + "[32m",
- "$(red)": chr(27) + "[31m"
- }
-
--- /dev/null
+[prop:example_prop]
+name = The Example Prop
+
+[actor:example_actor]
+name = The Example Actor
+
+[location:0:0:0:0]
+name = The Origin of the Universe
+