"""Core objects for the mudpy engine."""
-# Copyright (c) 2006 mudpy, Jeremy Stanley <fungi@yuggoth.org>, all rights reserved.
-# Licensed per terms in the LICENSE file distributed with this software.
+# Copyright (c) 2004-2008 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# to use, copy, modify, and distribute this software is granted under
+# terms provided in the LICENSE file distributed with this software.
# import some things we need
from ConfigParser import RawConfigParser
from signal import SIGHUP, SIGTERM, signal
from socket import AF_INET, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket
from stat import S_IMODE, ST_MODE
+from string import digits, letters, punctuation, uppercase
from sys import argv, stderr
from syslog import LOG_PID, LOG_INFO, LOG_DAEMON, closelog, openlog, syslog
from telnetlib import DO, DONT, ECHO, EOR, GA, IAC, LINEMODE, SB, SE, SGA, WILL, WONT
element.clean_contents()
def new(self):
+ """Create a new, empty Universe (the Big Bang)."""
new_universe = Universe()
for attribute in vars(self).keys():
exec("new_universe." + attribute + " = self." + attribute)
self.error = ""
self.input_queue = []
self.last_address = ""
+ self.last_input = universe.get_time()
self.menu_choices = {}
self.menu_seen = False
self.negotiation_pause = 0
self.connection.close()
self.remove()
+ def check_idle(self):
+ """Warn or disconnect idle users as appropriate."""
+ idletime = universe.get_time() - self.last_input
+ linkdead_dict = universe.categories["internal"]["time"].getdict("linkdead")
+ if self.state in linkdead_dict: linkdead_state = self.state
+ else: linkdead_state = "default"
+ if idletime > linkdead_dict[linkdead_state]:
+ self.send("$(eol)$(red)You've done nothing for far too long... goodbye!$(nrm)$(eol)", flush=True, add_prompt=False)
+ logline = "Disconnecting "
+ if self.account and self.account.get("name"): logline += self.account.get("name")
+ else: logline += "an unknown user"
+ logline += " after idling too long in a " + self.state + " state."
+ log(logline, 2)
+ self.state = "disconnecting"
+ self.menu_seen = False
+ idle_dict = universe.categories["internal"]["time"].getdict("idle")
+ if self.state in idle_dict: idle_state = self.state
+ else: idle_state = "default"
+ if idletime == idle_dict[idle_state]:
+ self.send("$(eol)$(red)If you continue to be unproductive, you'll be shown the door...$(nrm)$(eol)")
+
def reload(self):
"""Save, load a new user and relocate the connection."""
for old_user in universe.userlist:
# the name is the same but it's not us
- if old_user.account.get("name") == self.account.get("name") and old_user is not self:
+ if hasattr(old_user, "account") and old_user.account and old_user.account.get("name") == self.account.get("name") and old_user is not self:
# make a note of it
log("User " + self.account.get("name") + " reconnected--closing old connection to " + old_user.address + ".", 2)
# tack on a prompt if active
if self.state == "active":
if not just_prompt: output += "$(eol)"
- if add_prompt: output += "> "
+ if add_prompt:
+ output += "> "
+ mode = self.avatar.get("mode")
+ if mode: output += "(" + mode + ") "
# find and replace macros in the output
output = replace_macros(self, output)
- # wrap the text at 80 characters
- output = wrap_ansi_text(output, 80)
+ # wrap the text at 79 characters
+ output = wrap_ansi_text(output, 79)
# tack the terminator back on
if terminate: output += self.terminator
self.state = "disconnecting"
self.menu_seen = False
+ # check for an idle connection and act appropriately
+ else: self.check_idle()
+
# if output is paused, decrement the counter
if self.state == "initial":
if self.negotiation_pause: self.negotiation_pause -= 1
"""Have the active avatar leave the world."""
if self.avatar:
current = self.avatar.get("location")
- self.avatar.set("default_location", current)
- self.avatar.echo_to_location("You suddenly wonder where " + self.avatar.get("name") + " went.")
- del universe.contents[current].contents[self.avatar.key]
- self.avatar.remove_facet("location")
+ if current:
+ self.avatar.set("default_location", current)
+ self.avatar.echo_to_location("You suddenly wonder where " + self.avatar.get("name") + " went.")
+ del universe.contents[current].contents[self.avatar.key]
+ self.avatar.remove_facet("location")
self.avatar.owner = None
self.avatar = None
# a couple references we need
file_name = universe.categories["internal"]["logging"].get("file")
- if not isabs(file_name):
- file_name = path_join(universe.startdir, file_name)
max_log_lines = universe.categories["internal"]["logging"].getint("max_log_lines")
syslog_name = universe.categories["internal"]["logging"].get("syslog")
timestamp = asctime()[4:19]
# send the timestamp and line to a file
if file_name:
+ if not isabs(file_name):
+ file_name = path_join(universe.startdir, file_name)
file_descriptor = file(file_name, "a")
for line in lines: file_descriptor.write(timestamp + " " + line + "\n")
file_descriptor.flush()
message = "There are " + str(total_count)
message += " log lines in memory and " + str(filtered_count)
message += " at or above level " + str(level) + "."
- message += " The lines from " + str(stop) + " to " + str(start)
- message += " are:$(eol)$(eol)"
+ message += " The matching lines from " + str(stop) + " to "
+ message += str(start) + " are:$(eol)$(eol)"
# add the text from the selected lines
if stop > 1: range_lines = loglines[-start:-(stop-1)]
# ignoring color escape sequences
relative_position = 0
+ # whether the current character is part of a telnet IAC sequence
+ iac_counter = 0
+
# whether the current character is part of a color escape sequence
escape = False
# iterate over each character from the begining of the text
for each_character in text:
+ # the current character is the telnet IAC character
+ if each_character == IAC and not iac_counter:
+ iac_counter = 2
+
+ # the current character is within an IAC sequence
+ elif iac_counter:
+
+ # the current character is another IAC,
+ # terminating the sequence
+ if each_character == IAC:
+ iac_counter = 0
+
+ # otherwise, decrement the IAC counter
+ else:
+ iac_counter -= 1
+
# the current character is the escape character
- if each_character == chr(27):
+ elif each_character == chr(27) and not escape:
escape = True
# the current character is within an escape sequence
elif escape:
# the current character is m, which terminates the
- # current escape sequence
+ # escape sequence
if each_character == "m":
escape = False
# the current character meets the requested maximum line width,
# so we need to backtrack and find a space at which to wrap
- elif relative_position == width:
+ elif relative_position == width and not each_character == "\r":
# distance of the current character examined from the
# relative position
def replace_macros(user, text, is_input=False):
"""Replaces macros in text output."""
+ # third person pronouns
+ pronouns = {
+ "female": { "obj": "her", "pos": "hers", "sub": "she" },
+ "male": { "obj": "him", "pos": "his", "sub": "he" },
+ "neuter": { "obj": "it", "pos": "its", "sub": "it" }
+ }
+
+ # a dict of replacement macros
+ macros = {
+ "eol": "\r\n",
+ "bld": chr(27) + "[1m",
+ "nrm": chr(27) + "[0m",
+ "blk": chr(27) + "[30m",
+ "blu": chr(27) + "[34m",
+ "cyn": chr(27) + "[36m",
+ "grn": chr(27) + "[32m",
+ "mgt": chr(27) + "[35m",
+ "red": chr(27) + "[31m",
+ "yel": chr(27) + "[33m",
+ }
+
+ # add dynamic macros where possible
+ if user.account:
+ account_name = user.account.get("name")
+ if account_name:
+ macros["account"] = account_name
+ if user.avatar:
+ avatar_gender = user.avatar.get("gender")
+ if avatar_gender:
+ macros["tpop"] = pronouns[avatar_gender]["obj"]
+ macros["tppp"] = pronouns[avatar_gender]["pos"]
+ macros["tpsp"] = pronouns[avatar_gender]["sub"]
+
# loop until broken
while True:
- # third person pronouns
- pronouns = {
- "female": { "obj": "her", "pos": "hers", "sub": "she" },
- "male": { "obj": "him", "pos": "his", "sub": "he" },
- "neuter": { "obj": "it", "pos": "its", "sub": "it" }
- }
-
- # a dict of replacement macros
- macros = {
- "$(eol)": "\r\n",
- "$(bld)": chr(27) + "[1m",
- "$(nrm)": chr(27) + "[0m",
- "$(blk)": chr(27) + "[30m",
- "$(blu)": chr(27) + "[34m",
- "$(cyn)": chr(27) + "[36m",
- "$(grn)": chr(27) + "[32m",
- "$(mgt)": chr(27) + "[35m",
- "$(red)": chr(27) + "[31m",
- "$(yel)": chr(27) + "[33m",
- }
-
- # add dynamic macros where possible
- if user.account:
- account_name = user.account.get("name")
- if account_name:
- macros["$(account)"] = account_name
- if user.avatar:
- avatar_gender = user.avatar.get("gender")
- if avatar_gender:
- macros["$(tpop)"] = pronouns[avatar_gender]["obj"]
- macros["$(tppp)"] = pronouns[avatar_gender]["pos"]
- macros["$(tpsp)"] = pronouns[avatar_gender]["sub"]
-
# find and replace per the macros dict
macro_start = text.find("$(")
if macro_start == -1: break
macro_end = text.find(")", macro_start) + 1
- macro = text[macro_start:macro_end]
+ macro = text[macro_start+2:macro_end-1]
if macro in macros.keys():
- text = text.replace(macro, macros[macro])
+ replacement = macros[macro]
+
+ # this is how we handle local file inclusion (dangerous!)
+ elif macro.startswith("inc:"):
+ incfile = path_join(universe.startdir, macro[4:])
+ if exists(incfile):
+ incfd = file(incfile)
+ replacement = ""
+ for line in incfd:
+ if line.endswith("\n") and not line.endswith("\r\n"):
+ line = line.replace("\n", "\r\n")
+ replacement += line
+ # lose the trailing eol
+ replacement = replacement[:-2]
+ else:
+ replacement = ""
+ log("Couldn't read included " + incfile + " file.", 6)
# if we get here, log and replace it with null
else:
- text = text.replace(macro, "")
+ replacement = ""
if not is_input:
log("Unexpected replacement macro " + macro + " encountered.", 6)
+ # and now we act on the replacement
+ text = text.replace("$(" + macro + ")", replacement)
+
# replace the look-like-a-macro sequence
text = text.replace("$_(", "$(")
"""Escapes replacement macros in text."""
return text.replace("$(", "$_(")
+def first_word(text, separator=" "):
+ """Returns a tuple of the first word and the rest."""
+ if text:
+ if text.find(separator) > 0: return text.split(separator, 1)
+ else: return text, ""
+ else: return "", ""
+
def on_pulse():
"""The things which should happen on each pulse, aside from reloads."""
# iterate over the connected users
for user in universe.userlist: user.pulse()
+ # add an element for counters if it doesn't exist
+ if not "counters" in universe.categories["internal"]:
+ universe.categories["internal"]["counters"] = Element("internal:counters", universe)
+
# update the log every now and then
if not universe.categories["internal"]["counters"].getint("mark"):
log(str(len(universe.userlist)) + " connection(s)")
# since we got input, flag that the menu/prompt needs to be redisplayed
user.menu_seen = False
+ # update the last_input timestamp while we're at it
+ user.last_input = universe.get_time()
+
def generic_menu_handler(user):
"""A generic menu choice handler."""
# is there input?
if input_data:
- # split out the command (first word) and parameters (everything else)
- if input_data.find(" ") > 0:
- command_name, parameters = input_data.split(" ", 1)
+ # split out the command and parameters
+ actor = user.avatar
+ mode = actor.get("mode")
+ if mode and input_data.startswith("!"):
+ command_name, parameters = first_word(input_data[1:])
+ elif mode == "chat":
+ command_name = "say"
+ parameters = input_data
else:
- command_name = input_data
- parameters = ""
+ command_name, parameters = first_word(input_data)
# lowercase the command
command_name = command_name.lower()
else: command = None
# if it's allowed, do it
- actor = user.avatar
if actor.can_run(command): exec(command.get("action"))
# otherwise, give an error
help_text = "No help is provided for this command."
output += help_text
+ # list related commands
+ see_also = command.getlist("see_also")
+ if see_also:
+ really_see_also = ""
+ for item in see_also:
+ if item in universe.categories["command"]:
+ command = universe.categories["command"][item]
+ if actor.can_run(command):
+ if really_see_also:
+ really_see_also += ", "
+ if command.getboolean("administrative"):
+ really_see_also += "$(red)"
+ else:
+ really_see_also += "$(grn)"
+ really_see_also += item + "$(nrm)"
+ if really_see_also:
+ output += "$(eol)$(eol)See also: " + really_see_also
+
# no data for the requested command word
else:
output = "That is not an available command."
# the user entered a message
elif parameters:
- # get rid of quote marks on the ends of the message and
- # capitalize the first letter
+ # get rid of quote marks on the ends of the message
message = parameters.strip("\"'`")
- message = message[0].capitalize() + message[1:]
-
- # a dictionary of punctuation:action pairs
- actions = {}
- for facet in universe.categories["internal"]["language"].facets():
- if facet.startswith("punctuation_"):
- action = facet.split("_")[1]
- for mark in universe.categories["internal"]["language"].getlist(facet):
- actions[mark] = action
# match the punctuation used, if any, to an action
+ actions = universe.categories["internal"]["language"].getdict("actions")
default_punctuation = universe.categories["internal"]["language"].get("default_punctuation")
- action = actions[default_punctuation]
+ action = ""
for mark in actions.keys():
- if message.endswith(mark) and mark != default_punctuation:
+ if message.endswith(mark):
action = actions[mark]
break
- # if the action is default and there is no mark, add one
- if action == actions[default_punctuation] and not message.endswith(default_punctuation):
- message += default_punctuation
+ # add punctuation if needed
+ if not action:
+ action = actions[default_punctuation]
+ if message and not message[-1] in punctuation:
+ message += default_punctuation
+
+ # decapitalize the first letter to improve matching
+ if message and message[0] in uppercase:
+ message = message[0].lower() + message[1:]
+
+ # iterate over all words in message, replacing typos
+ typos = universe.categories["internal"]["language"].getdict("typos")
+ words = message.split()
+ for index in range(len(words)):
+ word = words[index]
+ bare_word = word.strip(punctuation)
+ if bare_word in typos.keys():
+ words[index] = word.replace(bare_word, typos[bare_word])
+ message = " ".join(words)
- # capitalize a list of words within the message
- capitalize_words = universe.categories["internal"]["language"].getlist("capitalize_words")
- for word in capitalize_words:
- message = message.replace(" " + word + " ", " " + word.capitalize() + " ")
+ # capitalize the first letter
+ message = message[0].upper() + message[1:]
# tell the room
actor.echo_to_location(actor.get("name") + " " + action + "s, \"" + message + "\"")
else:
actor.send("What do you want to say?")
+def command_chat(actor):
+ """Toggle chat mode."""
+ mode = actor.get("mode")
+ if not mode:
+ actor.set("mode", "chat")
+ actor.send("Entering chat mode (use $(grn)!chat$(nrm) to exit).")
+ elif mode == "chat":
+ actor.remove_facet("mode")
+ actor.send("Exiting chat mode.")
+ else: actor.send("Sorry, but you're already busy with something else!")
+
def command_show(actor, parameters):
"""Show program data."""
message = ""
pid = str(getpid())
log("Process ID: " + pid)
file_name = universe.contents["internal:process"].get("pidfile")
- if not isabs(file_name):
- file_name = path_join(universe.startdir, file_name)
if file_name:
+ if not isabs(file_name):
+ file_name = path_join(universe.startdir, file_name)
file_descriptor = file(file_name, 'w')
file_descriptor.write(pid + "\n")
file_descriptor.flush()
def remove_pidfile(universe):
"""Remove the file containing the current process ID."""
file_name = universe.contents["internal:process"].get("pidfile")
- if not isabs(file_name):
- file_name = path_join(universe.startdir, file_name)
- if file_name and access(file_name, W_OK): remove(file_name)
+ if file_name:
+ if not isabs(file_name):
+ file_name = path_join(universe.startdir, file_name)
+ if access(file_name, W_OK): remove(file_name)
def excepthook(excepttype, value, traceback):
"""Handle uncaught exceptions."""