# -*- coding: utf-8 -*-
"""Miscellaneous functions for the mudpy engine."""
-# Copyright (c) 2004-2013 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2004-2014 Jeremy Stanley <fungi@yuggoth.org>. Permission
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
+import codecs
+import os
+import random
+import re
+import signal
+import socket
+import sys
+import syslog
+import time
+import traceback
+import unicodedata
+
+import mudpy
+
class Element:
def __init__(self, key, universe, filename=None):
"""Set up a new element."""
- import mudpy.data
- import os.path
# keep track of our key name
self.key = key
self.origin = self.universe.files[filename]
# add a data section to the origin if necessary
- if not self.origin.data.has_section(self.key):
- self.origin.data.add_section(self.key)
+ # TODO(fungi): remove this indirection after the YAML transition
+ if self.origin._format == "yaml":
+ if self.key not in self.origin.data:
+ self.origin.data[self.key] = {}
+ else:
+ if not self.origin.data.has_section(self.key):
+ self.origin.data.add_section(self.key)
# add or replace this element in the universe
self.universe.contents[self.key] = self
def reload(self):
"""Create a new element and replace this one."""
- new_element = Element(self.key, self.universe, self.origin.filename)
+ Element(self.key, self.universe, self.origin.filename)
del(self)
def destroy(self):
def facets(self):
"""Return a list of non-inherited facets for this element."""
- if self.key in self.origin.data.sections():
- return self.origin.data.options(self.key)
+ # TODO(fungi): remove this indirection after the YAML transition
+ if self.origin._format == "yaml":
+ try:
+ return self.origin.data[self.key].keys()
+ except (AttributeError, KeyError):
+ return []
else:
- return []
+ if self.key in self.origin.data.sections():
+ return self.origin.data.options(self.key)
+ else:
+ return []
def has_facet(self, facet):
"""Return whether the non-inherited facet exists."""
"""Retrieve values."""
if default is None:
default = ""
- if self.origin.data.has_option(self.key, facet):
- raw_data = self.origin.data.get(self.key, facet)
- if raw_data.startswith("u\"") or raw_data.startswith("u'"):
- raw_data = raw_data[1:]
- raw_data.strip("\"'")
- return raw_data
- elif self.has_facet("inherit"):
- for ancestor in self.ancestry():
- if self.universe.contents[ancestor].has_facet(facet):
- return self.universe.contents[ancestor].get(facet)
+ # TODO(fungi): remove this indirection after the YAML transition
+ if self.origin._format == "yaml":
+ try:
+ return self.origin.data[self.key][facet]
+ except (KeyError, TypeError):
+ pass
+ if self.has_facet("inherit"):
+ for ancestor in self.ancestry():
+ if self.universe.contents[ancestor].has_facet(facet):
+ return self.universe.contents[ancestor].get(facet)
+ else:
+ return default
else:
- return default
+ if self.origin.data.has_option(self.key, facet):
+ raw_data = self.origin.data.get(self.key, facet)
+ if raw_data.startswith("u\"") or raw_data.startswith("u'"):
+ raw_data = raw_data[1:]
+ raw_data.strip("\"'")
+ return raw_data
+ elif self.has_facet("inherit"):
+ for ancestor in self.ancestry():
+ if self.universe.contents[ancestor].has_facet(facet):
+ return self.universe.contents[ancestor].get(facet)
+ else:
+ return default
def getboolean(self, facet, default=None):
"""Retrieve values as boolean type."""
if default is None:
default = False
- if self.origin.data.has_option(self.key, facet):
- return self.origin.data.getboolean(self.key, facet)
- elif self.has_facet("inherit"):
+ # TODO(fungi): remove this indirection after the YAML transition
+ if self.origin._format == "yaml":
+ try:
+ return bool(self.origin.data[self.key][facet])
+ except KeyError:
+ pass
for ancestor in self.ancestry():
- if self.universe.contents[ancestor].has_facet(facet):
+ try:
return self.universe.contents[ancestor].getboolean(facet)
- else:
+ except KeyError:
+ pass
return default
+ else:
+ if self.origin.data.has_option(self.key, facet):
+ return self.origin.data.getboolean(self.key, facet)
+ elif self.has_facet("inherit"):
+ for ancestor in self.ancestry():
+ if self.universe.contents[ancestor].has_facet(facet):
+ return self.universe.contents[ancestor].getboolean(
+ facet)
+ else:
+ return default
def getint(self, facet, default=None):
"""Return values as int type."""
def getlist(self, facet, default=None):
"""Return values as list type."""
- import mudpy.data
if default is None:
default = []
value = self.get(facet)
def getdict(self, facet, default=None):
"""Return values as dict type."""
- import mudpy.data
if default is None:
default = {}
value = self.get(facet)
def set(self, facet, value):
"""Set values."""
if not self.has_facet(facet) or not self.get(facet) == value:
- # TODO: remove this check after the switch to py3k
- if repr(type(value)) == "<type 'unicode'>":
- value = str(value)
if not type(value) is str:
value = repr(value)
self.origin.data.set(self.key, facet, value)
newlist.append(value)
self.set(facet, newlist)
- def new_event(self, action, when=None):
- """Create, attach and enqueue an event element."""
-
- # if when isn't specified, that means now
- if not when:
- when = self.universe.get_time()
-
- # events are elements themselves
- event = Element("event:" + self.key + ":" + counter)
-
def send(
self,
message,
def update_location(self):
"""Make sure the location's contents contain this element."""
- location = self.get("location")
- if location in self.universe.contents:
- self.universe.contents[location].contents[self.key] = self
+ area = self.get("location")
+ if area in self.universe.contents:
+ self.universe.contents[area].contents[self.key] = self
def clean_contents(self):
"""Make sure the element's contents aren't bogus."""
if element.get("location") != self.key:
del self.contents[element.key]
- def go_to(self, location):
- """Relocate the element to a specific location."""
+ def go_to(self, area):
+ """Relocate the element to a specific area."""
current = self.get("location")
if current and self.key in self.universe.contents[current].contents:
del universe.contents[current].contents[self.key]
- if location in self.universe.contents:
- self.set("location", location)
- self.universe.contents[location].contents[self.key] = self
- self.look_at(location)
+ if area in self.universe.contents:
+ self.set("location", area)
+ self.universe.contents[area].contents[self.key] = self
+ self.look_at(area)
def go_home(self):
"""Relocate the element to its default location."""
self.send(message)
def portals(self):
- """Map the portal directions for a room to neighbors."""
- import re
+ """Map the portal directions for an area to neighbors."""
portals = {}
- if re.match("""^location:-?\d+,-?\d+,-?\d+$""", self.key):
+ if re.match("""^area:-?\d+,-?\d+,-?\d+$""", self.key):
coordinates = [(int(x))
for x in self.key.split(":")[1].split(",")]
directions = self.universe.categories["internal"]["directions"]
for portal in self.getlist("gridlinks"):
adjacent = map(lambda c, o: c + o,
coordinates, offsets[portal])
- neighbor = "location:" + ",".join(
+ neighbor = "area:" + ",".join(
[(str(x)) for x in adjacent]
)
if neighbor in self.universe.contents:
def __init__(self, filename="", load=False):
"""Initialize the universe."""
- import os
- import os.path
self.categories = {}
self.contents = {}
self.default_origins = {}
self.loglines = []
- self.pending_events_long = {}
- self.pending_events_short = {}
self.private_files = []
self.reload_flag = False
self.startdir = os.getcwd()
def load(self):
"""Load universe data from persistent storage."""
- import mudpy.data
# the files dict must exist and filename needs to be read-only
if not hasattr(
# clear out all read-only files
if hasattr(self, "files"):
- for data_filename in self.files.keys():
+ for data_filename in list(self.files.keys()):
if not self.files[data_filename].is_writeable():
del self.files[data_filename]
# go through all elements to clear out inactive avatar locations
for element in self.contents.values():
- location = element.get("location")
- if element in inactive_avatars and location:
- if location in self.contents and element.key in self.contents[
- location
+ area = element.get("location")
+ if element in inactive_avatars and area:
+ if area in self.contents and element.key in self.contents[
+ area
].contents:
- del self.contents[location].contents[element.key]
- element.set("default_location", location)
+ del self.contents[area].contents[element.key]
+ element.set("default_location", area)
element.remove_facet("location")
# another pass to straighten out all the element contents
def initialize_server_socket(self):
"""Create and open the listening socket."""
- import socket
# need to know the local address and port number for the listener
host = self.categories["internal"]["network"].get("host")
def __init__(self):
"""Default values for the in-memory user variables."""
- import mudpy.telnet
self.account = None
self.address = ""
self.authenticated = False
logline += self.account.get("name")
else:
logline += "an unknown user"
- logline += " after idling too long in the " + \
- self.state + " state."
+ logline += (" after idling too long in the " + self.state
+ + " state.")
log(logline, 2)
self.state = "disconnecting"
self.menu_seen = False
def adjust_echoing(self):
"""Adjust echoing to match state menu requirements."""
- import mudpy.telnet
if mudpy.telnet.is_enabled(self, mudpy.telnet.TELOPT_ECHO,
mudpy.telnet.US):
if menu_echo_on(self.state):
prepend_padding=True
):
"""Send arbitrary text to a connected user."""
- import mudpy.telnet
# unless raw mode is on, clean it up all nice and pretty
if not raw:
# with the optional eol string passed to this function
# and the ansi escape to return to normal text
if not just_prompt and prepend_padding:
- if not self.output_queue \
- or not self.output_queue[-1].endswith("\r\n"):
+ if (not self.output_queue or not
+ self.output_queue[-1].endswith(b"\r\n")):
output = "$(eol)" + output
elif not self.output_queue[-1].endswith(
- "\r\n\x1b[0m\r\n"
+ b"\r\n\x1b[0m\r\n"
) and not self.output_queue[-1].endswith(
- "\r\n\r\n"
+ b"\r\n\r\n"
):
output = "$(eol)" + output
output += eol + chr(27) + "[0m"
try:
self.connection.send(self.output_queue[0])
del self.output_queue[0]
- except:
+ except BrokenPipeError:
if self.account and self.account.get("name"):
account = self.account.get("name")
else:
account = "an unknown user"
- log("Sending to %s raised an exception (broken pipe?)."
- % account, 7)
- pass
+ log("Broken pipe sending to %s." % account, 7)
+ self.state = "disconnecting"
def enqueue_input(self):
"""Process and enqueue any new input."""
- import mudpy.telnet
- import unicodedata
# check for some input
try:
raw_input = self.connection.recv(1024)
- except:
+ except (BlockingIOError, OSError):
raw_input = b""
# we got something
def log(message, level=0):
"""Log a message."""
- import codecs
- import os.path
- import syslog
- import time
# a couple references we need
file_name = universe.categories["internal"]["logging"].get("file")
def glyph_columns(character):
"""Convenience function to return the column width of a glyph."""
- import unicodedata
if unicodedata.east_asian_width(character) in "FW":
return 2
else:
def wrap_ansi_text(text, width):
"""Wrap text with arbitrary width while ignoring ANSI colors."""
- import unicodedata
# the current position in the entire text string, including all
# characters, printable or otherwise
last_whitespace = abs_pos
# insert an eol in place of the space
- text = text[:last_whitespace] + \
- "\r\n" + text[last_whitespace + 1:]
+ text = text[:last_whitespace] + "\r\n" + text[last_whitespace + 1:]
# increase the absolute position because an eol is two
# characters but the space it replaced was only one
def weighted_choice(data):
"""Takes a dict weighted by value and returns a random key."""
- import random
# this will hold our expanded list of keys from the data
expanded = []
def random_name():
"""Returns a random character name."""
- import random
# the vowels and consonants needed to create romaji syllables
vowels = [
def replace_macros(user, text, is_input=False):
"""Replaces macros in text output."""
- import codecs
- import mudpy.data
- import os.path
# third person pronouns
pronouns = {
def on_pulse():
"""The things which should happen on each pulse, aside from reloads."""
- import time
# open the listening socket if it hasn't been already
if not hasattr(universe, "listening_socket"):
def check_for_connection(listening_socket):
"""Check for a waiting connection and return a new user object."""
- import mudpy.telnet
# try to accept a new connection
try:
connection, address = listening_socket.accept()
- except:
+ except BlockingIOError:
return None
# note that we got one
def handle_user_input(user):
"""The main handler, branches to a state-specific handler."""
- import mudpy.telnet
# if the user's client echo is off, send a blank line for aesthetics
if mudpy.telnet.is_enabled(user, mudpy.telnet.TELOPT_ECHO,
def handler_checking_password(user):
"""Handle the login account password."""
- import mudpy.password
# get the next waiting line of input
input_data = user.input_queue.pop(0)
def handler_entering_new_password(user):
"""Handle a new password entry."""
- import mudpy.password
# get the next waiting line of input
input_data = user.input_queue.pop(0)
def handler_verifying_new_password(user):
"""Handle the re-entered new password for verification."""
- import mudpy.password
# get the next waiting line of input
input_data = user.input_queue.pop(0)
else:
output += " $(grn)"
output += item + "$(nrm) - " + description + "$(eol)"
- output += "$(eol)Enter \"help COMMAND\" for help on a command " \
- + "named \"COMMAND\"."
+ output += ("$(eol)Enter \"help COMMAND\" for help on a command "
+ "named \"COMMAND\".")
# send the accumulated output to the user
actor.send(output)
def command_say(actor, parameters):
- """Speak to others in the same room."""
- import unicodedata
+ """Speak to others in the same area."""
# check for replacement macros and escape them
parameters = escape_macros(parameters)
# capitalize the first letter
message = message[0].upper() + message[1:]
- # tell the room
+ # tell the area
if message:
actor.echo_to_location(
actor.get("name") + " " + action + "s, \"" + message + "\""
def command_show(actor, parameters):
"""Show program data."""
- import re
message = ""
arguments = parameters.split()
if not parameters:
status = "rw"
else:
status = "ro"
- message += "$(eol) $(red)(" + status + ") $(grn)" + filename \
- + "$(nrm)"
+ message += ("$(eol) $(red)(" + status + ") $(grn)" + filename
+ + "$(nrm)")
elif arguments[0] == "category":
if len(arguments) != 2:
message = "You must specify one category."
elif arguments[1] in universe.categories:
- message = "These are the elements in the \"" + arguments[1] \
- + "\" category:$(eol)"
+ message = ("These are the elements in the \"" + arguments[1]
+ + "\" category:$(eol)")
elements = [
(
universe.categories[arguments[1]][x].key
if len(arguments) != 2:
message = "You must specify one file."
elif arguments[1] in universe.files:
- message = "These are the elements in the \"" + arguments[1] \
- + "\" file:$(eol)"
+ message = ("These are the elements in the \"" + arguments[1]
+ + "\" file:$(eol)")
elements = universe.files[arguments[1]].data.sections()
elements.sort()
for element in elements:
message = "You must specify one element."
elif arguments[1] in universe.contents:
element = universe.contents[arguments[1]]
- message = "These are the properties of the \"" + arguments[1] \
- + \
- "\" element (in \"" + \
- element.origin.filename + "\"):$(eol)"
+ message = ("These are the properties of the \"" + arguments[1]
+ + "\" element (in \"" + element.origin.filename
+ + "\"):$(eol)")
facets = element.facets()
facets.sort()
for facet in facets:
- message += "$(eol) $(grn)" + facet + ": $(red)" \
- + escape_macros(element.get(facet)) + "$(nrm)"
+ message += ("$(eol) $(grn)" + facet + ": $(red)"
+ + escape_macros(element.get(facet)) + "$(nrm)")
else:
message = "Element \"" + arguments[1] + "\" does not exist."
elif arguments[0] == "result":
else:
try:
message = repr(eval(" ".join(arguments[1:])))
- except:
- message = "Your expression raised an exception!"
+ except Exception as e:
+ message = ("$(red)Your expression raised an exception...$(eol)"
+ "$(eol)$(bld)%s$(nrm)" % e)
elif arguments[0] == "log":
if len(arguments) == 4:
if re.match("^\d+$", arguments[3]) and int(arguments[3]) >= 0:
if level > -1 and start > -1 and stop > -1:
message = get_loglines(level, start, stop)
else:
- message = "When specified, level must be 0-9 (default 1), " \
- + "start and stop must be >=1 (default 10 and 1)."
+ message = ("When specified, level must be 0-9 (default 1), "
+ "start and stop must be >=1 (default 10 and 1).")
else:
message = "I don't know what \"" + parameters + "\" is."
actor.send(message)
if element in universe.contents:
message = "The \"" + element + "\" element already exists."
else:
- message = "You create \"" + \
- element + "\" within the universe."
+ message = ("You create \"" +
+ element + "\" within the universe.")
logline = actor.owner.account.get(
"name"
) + " created an element: " + element
if filename:
logline += " in file " + filename
if filename not in universe.files:
- message += " Warning: \"" + filename \
- + "\" is not yet included in any other file and will " \
- + \
- "not be read on startup unless this is remedied."
+ message += (
+ " Warning: \"" + filename + "\" is not yet "
+ "included in any other file and will not be read "
+ "on startup unless this is remedied.")
Element(element, universe, filename)
log(logline, 6)
elif len(arguments) > 2:
message = "You must specify an element to destroy."
else:
if parameters not in universe.contents:
- message = "The \"" + parameters + \
- "\" element does not exist."
+ message = "The \"" + parameters + "\" element does not exist."
else:
universe.contents[parameters].destroy()
- message = "You destroy \"" + parameters \
- + "\" within the universe."
+ message = ("You destroy \"" + parameters
+ + "\" within the universe.")
log(
actor.owner.account.get(
"name"
else:
arguments = parameters.split(" ", 2)
if len(arguments) == 1:
- message = "What facet of element \"" + arguments[0] \
- + "\" would you like to set?"
+ message = ("What facet of element \"" + arguments[0]
+ + "\" would you like to set?")
elif len(arguments) == 2:
- message = "What value would you like to set for the \"" \
- + arguments[1] + "\" facet of the \"" + arguments[0] \
- + "\" element?"
+ message = ("What value would you like to set for the \"" +
+ arguments[1] + "\" facet of the \"" + arguments[0]
+ + "\" element?")
else:
element, facet, value = arguments
if element not in universe.contents:
message = "The \"" + element + "\" element does not exist."
else:
universe.contents[element].set(facet, value)
- message = "You have successfully (re)set the \"" + facet \
- + "\" facet of element \"" + element \
- + "\". Try \"show element " + \
- element + "\" for verification."
+ message = ("You have successfully (re)set the \"" + facet
+ + "\" facet of element \"" + element
+ + "\". Try \"show element " +
+ element + "\" for verification.")
actor.send(message)
else:
arguments = parameters.split(" ")
if len(arguments) == 1:
- message = "What facet of element \"" + arguments[0] \
- + "\" would you like to delete?"
+ message = ("What facet of element \"" + arguments[0]
+ + "\" would you like to delete?")
elif len(arguments) != 2:
message = "You may only specify an element and a facet."
else:
if element not in universe.contents:
message = "The \"" + element + "\" element does not exist."
elif facet not in universe.contents[element].facets():
- message = "The \"" + element + "\" element has no \"" + facet \
- + "\" facet."
+ message = ("The \"" + element + "\" element has no \"" + facet
+ + "\" facet.")
else:
universe.contents[element].remove_facet(facet)
- message = "You have successfully deleted the \"" + facet \
- + "\" facet of element \"" + element \
- + "\". Try \"show element " + \
- element + "\" for verification."
+ message = ("You have successfully deleted the \"" + facet
+ + "\" facet of element \"" + element
+ + "\". Try \"show element " +
+ element + "\" for verification.")
actor.send(message)
def command_error(actor, input_data):
"""Generic error for an unrecognized command word."""
- import random
# 90% of the time use a generic error
if random.randrange(10):
def daemonize(universe):
"""Fork and disassociate from everything."""
- import codecs
- import ctypes
- import ctypes.util
- import os
- import os.path
- import sys
# only if this is what we're configured to do
if universe.contents["internal:process"].getboolean("daemon"):
- # if possible, we want to rename the process to the same as the script
- new_argv = b"\x00".join(x.encode("utf-8") for x in sys.argv) + b"\x00"
- short_argv0 = os.path.basename(sys.argv[0]).encode("utf-8") + b"\x00"
-
- # attempt the linux way first
- try:
- argv_array = ctypes.POINTER(ctypes.c_char_p)
- ctypes.pythonapi.Py_GetArgcArgv.argtypes = (
- ctypes.POINTER(ctypes.c_int),
- ctypes.POINTER(argv_array)
- )
- argc = argv_array()
- ctypes.pythonapi.Py_GetArgcArgv(
- ctypes.c_int(0),
- ctypes.pointer(argc)
- )
- old_argv0_size = len(argc.contents.value)
- ctypes.memset(argc.contents, 0, len(new_argv) + old_argv0_size)
- ctypes.memmove(argc.contents, new_argv, len(new_argv))
- ctypes.CDLL(ctypes.util.find_library("c")).prctl(
- 15,
- short_argv0,
- 0,
- 0,
- 0
- )
-
- except:
-
- # since that failed, maybe it's bsd?
- try:
-
- # much simpler, since bsd has a libc function call for this
- ctypes.CDLL(ctypes.util.find_library("c")).setproctitle(
- new_argv
- )
-
- except:
-
- # that didn't work either, so just log that we couldn't
- log("Failed to rename the interpreter process (cosmetic).")
-
# log before we start forking around, so the terminal gets the message
log("Disassociating from the controlling terminal.")
def create_pidfile(universe):
"""Write a file containing the current process ID."""
- import codecs
- import os
- import os.path
pid = str(os.getpid())
log("Process ID: " + pid)
file_name = universe.contents["internal:process"].get("pidfile")
def remove_pidfile(universe):
"""Remove the file containing the current process ID."""
- import os
- import os.path
file_name = universe.contents["internal:process"].get("pidfile")
if file_name:
if not os.path.isabs(file_name):
def excepthook(excepttype, value, tracebackdata):
"""Handle uncaught exceptions."""
- import traceback
# assemble the list of errors into a single string
message = "".join(
# try to log it, if possible
try:
log(message, 9)
- except:
- pass
-
- # try to write it to stderr, if possible
- try:
- sys.stderr.write(message)
- except:
- pass
+ except Exception as e:
+ # try to write it to stderr, if possible
+ sys.stderr.write(message + "\nException while logging...\n%s" % e)
def sighook(what, where):
"""Handle external signals."""
- import signal
# a generic message
message = "Caught signal: "
def override_excepthook():
"""Redefine sys.excepthook with our own."""
- import sys
sys.excepthook = excepthook
def assign_sighook():
"""Assign a customized handler for some signals."""
- import signal
signal.signal(signal.SIGHUP, sighook)
signal.signal(signal.SIGTERM, sighook)
def setup():
"""This contains functions to be performed when starting the engine."""
- import sys
# see if a configuration file was specified
if len(sys.argv) > 1:
def finish():
- """This contains functions to be performed when shutting down the
- engine."""
+ """These are functions performed when shutting down the engine."""
# the loop has terminated, so save persistent data
universe.save()