# -*- coding: utf-8 -*-
"""Miscellaneous functions for the mudpy engine."""
-# Copyright (c) 2004-2012 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2004-2014 Jeremy Stanley <fungi@yuggoth.org>. Permission
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
+import codecs
+import ctypes
+import ctypes.util
+import os
+import random
+import re
+import signal
+import socket
+import sys
+import syslog
+import time
+import traceback
+import unicodedata
+
+import mudpy
+
class Element:
def __init__(self, key, universe, filename=None):
"""Set up a new element."""
- import mudpy.data
- import os.path
# keep track of our key name
self.key = key
return default
def getint(self, facet, default=None):
- """Return values as int/long type."""
+ """Return values as int type."""
if default is None:
default = 0
if self.origin.data.has_option(self.key, facet):
def getlist(self, facet, default=None):
"""Return values as list type."""
- import mudpy.data
if default is None:
default = []
value = self.get(facet)
def getdict(self, facet, default=None):
"""Return values as dict type."""
- import mudpy.data
if default is None:
default = {}
value = self.get(facet)
def set(self, facet, value):
"""Set values."""
if not self.has_facet(facet) or not self.get(facet) == value:
- if type(value) is long or repr(type(value)) == "<type 'unicode'>":
- value = str(value)
- elif not type(value) is str:
+ if not type(value) is str:
value = repr(value)
self.origin.data.set(self.key, facet, value)
self.origin.modified = True
def append(self, facet, value):
- """Append value tp a list."""
- if type(value) is long:
- value = str(value)
- elif not type(value) is str:
+ """Append value to a list."""
+ if not type(value) is str:
value = repr(value)
newlist = self.getlist(facet)
newlist.append(value)
self.set(facet, newlist)
- def new_event(self, action, when=None):
- """Create, attach and enqueue an event element."""
-
- # if when isn't specified, that means now
- if not when:
- when = self.universe.get_time()
-
- # events are elements themselves
- event = Element("event:" + self.key + ":" + counter)
-
def send(
self,
message,
def portals(self):
"""Map the portal directions for a room to neighbors."""
- import re
portals = {}
if re.match("""^location:-?\d+,-?\d+,-?\d+$""", self.key):
coordinates = [(int(x))
def __init__(self, filename="", load=False):
"""Initialize the universe."""
- import os
- import os.path
self.categories = {}
self.contents = {}
self.default_origins = {}
self.loglines = []
- self.pending_events_long = {}
- self.pending_events_short = {}
self.private_files = []
self.reload_flag = False
self.startdir = os.getcwd()
def load(self):
"""Load universe data from persistent storage."""
- import mudpy.data
# the files dict must exist and filename needs to be read-only
if not hasattr(
def initialize_server_socket(self):
"""Create and open the listening socket."""
- import socket
# need to know the local address and port number for the listener
host = self.categories["internal"]["network"].get("host")
def __init__(self):
"""Default values for the in-memory user variables."""
- import mudpy.telnet
self.account = None
self.address = ""
self.authenticated = False
self.menu_seen = False
self.negotiation_pause = 0
self.output_queue = []
- self.partial_input = ""
+ self.partial_input = b""
self.password_tries = 0
self.state = "initial"
self.telopts = {}
def adjust_echoing(self):
"""Adjust echoing to match state menu requirements."""
- import mudpy.telnet
if mudpy.telnet.is_enabled(self, mudpy.telnet.TELOPT_ECHO,
mudpy.telnet.US):
if menu_echo_on(self.state):
prepend_padding=True
):
"""Send arbitrary text to a connected user."""
- import mudpy.telnet
# unless raw mode is on, clean it up all nice and pretty
if not raw:
# and the ansi escape to return to normal text
if not just_prompt and prepend_padding:
if not self.output_queue \
- or not self.output_queue[-1].endswith("\r\n"):
+ or not self.output_queue[-1].endswith(b"\r\n"):
output = "$(eol)" + output
elif not self.output_queue[-1].endswith(
- "\r\n\x1b[0m\r\n"
+ b"\r\n\x1b[0m\r\n"
) and not self.output_queue[-1].endswith(
- "\r\n\r\n"
+ b"\r\n\r\n"
):
output = "$(eol)" + output
- output += eol + unichr(27) + "[0m"
+ output += eol + chr(27) + "[0m"
# tack on a prompt if active
if self.state == "active":
def enqueue_input(self):
"""Process and enqueue any new input."""
- import mudpy.telnet
- import unicodedata
# check for some input
try:
raw_input = self.connection.recv(1024)
except:
- raw_input = ""
+ raw_input = b""
# we got something
if raw_input:
mudpy.telnet.negotiate_telnet_options(self)
# separate multiple input lines
- new_input_lines = self.partial_input.split("\n")
+ new_input_lines = self.partial_input.split(b"\n")
# if input doesn't end in a newline, replace the
# held partial input with the last line of it
- if not self.partial_input.endswith("\n"):
+ if not self.partial_input.endswith(b"\n"):
self.partial_input = new_input_lines.pop()
# otherwise, chop off the extra null input and reset
# the held partial input
else:
new_input_lines.pop()
- self.partial_input = ""
+ self.partial_input = b""
# iterate over the remaining lines
for line in new_input_lines:
# log non-printable characters remaining
if mudpy.telnet.is_enabled(self, mudpy.telnet.TELOPT_BINARY,
mudpy.telnet.HIM):
- asciiline = "".join(
- filter(lambda x: " " <= x <= "~", line))
+ asciiline = b"".join(
+ filter(lambda x: b" " <= x <= b"~", line))
if line != asciiline:
logline = "Non-ASCII characters from "
if self.account and self.account.get("name"):
log(logline, 4)
line = asciiline
+ try:
+ line = line.decode("utf-8")
+ except UnicodeDecodeError:
+ logline = "Non-UTF-8 characters from "
+ if self.account and self.account.get("name"):
+ logline += self.account.get("name") + ": "
+ else:
+ logline += "unknown user: "
+ logline += repr(line)
+ log(logline, 4)
+ return
+
+ line = unicodedata.normalize("NFKC", line)
+
# put on the end of the queue
- self.input_queue.append(
- unicodedata.normalize("NFKC", line.decode("utf-8"))
- )
+ self.input_queue.append(line)
def new_avatar(self):
"""Instantiate a new, unconfigured avatar for this user."""
def log(message, level=0):
"""Log a message."""
- import codecs
- import os.path
- import syslog
- import time
# a couple references we need
file_name = universe.categories["internal"]["logging"].get("file")
def glyph_columns(character):
"""Convenience function to return the column width of a glyph."""
- import unicodedata
if unicodedata.east_asian_width(character) in "FW":
return 2
else:
def wrap_ansi_text(text, width):
"""Wrap text with arbitrary width while ignoring ANSI colors."""
- import unicodedata
# the current position in the entire text string, including all
# characters, printable or otherwise
def weighted_choice(data):
"""Takes a dict weighted by value and returns a random key."""
- import random
# this will hold our expanded list of keys from the data
expanded = []
def random_name():
"""Returns a random character name."""
- import random
# the vowels and consonants needed to create romaji syllables
vowels = [
def replace_macros(user, text, is_input=False):
"""Replaces macros in text output."""
- import codecs
- import mudpy.data
- import os.path
# third person pronouns
pronouns = {
# a dict of replacement macros
macros = {
"eol": "\r\n",
- "bld": unichr(27) + "[1m",
- "nrm": unichr(27) + "[0m",
- "blk": unichr(27) + "[30m",
- "blu": unichr(27) + "[34m",
- "cyn": unichr(27) + "[36m",
- "grn": unichr(27) + "[32m",
- "mgt": unichr(27) + "[35m",
- "red": unichr(27) + "[31m",
- "yel": unichr(27) + "[33m",
+ "bld": chr(27) + "[1m",
+ "nrm": chr(27) + "[0m",
+ "blk": chr(27) + "[30m",
+ "blu": chr(27) + "[34m",
+ "cyn": chr(27) + "[36m",
+ "grn": chr(27) + "[32m",
+ "mgt": chr(27) + "[35m",
+ "red": chr(27) + "[31m",
+ "yel": chr(27) + "[33m",
}
# add dynamic macros where possible
def on_pulse():
"""The things which should happen on each pulse, aside from reloads."""
- import time
# open the listening socket if it hasn't been already
if not hasattr(universe, "listening_socket"):
def check_for_connection(listening_socket):
"""Check for a waiting connection and return a new user object."""
- import mudpy.telnet
# try to accept a new connection
try:
def handle_user_input(user):
"""The main handler, branches to a state-specific handler."""
- import mudpy.telnet
# if the user's client echo is off, send a blank line for aesthetics
if mudpy.telnet.is_enabled(user, mudpy.telnet.TELOPT_ECHO,
def handler_checking_password(user):
"""Handle the login account password."""
- import mudpy.password
# get the next waiting line of input
input_data = user.input_queue.pop(0)
def handler_entering_new_password(user):
"""Handle a new password entry."""
- import mudpy.password
# get the next waiting line of input
input_data = user.input_queue.pop(0)
def handler_verifying_new_password(user):
"""Handle the re-entered new password for verification."""
- import mudpy.password
# get the next waiting line of input
input_data = user.input_queue.pop(0)
def command_say(actor, parameters):
"""Speak to others in the same room."""
- import unicodedata
# check for replacement macros and escape them
parameters = escape_macros(parameters)
def command_show(actor, parameters):
"""Show program data."""
- import re
message = ""
arguments = parameters.split()
if not parameters:
def command_error(actor, input_data):
"""Generic error for an unrecognized command word."""
- import random
# 90% of the time use a generic error
if random.randrange(10):
def daemonize(universe):
"""Fork and disassociate from everything."""
- import codecs
- import ctypes
- import ctypes.util
- import os
- import os.path
- import sys
# only if this is what we're configured to do
if universe.contents["internal:process"].getboolean("daemon"):
# if possible, we want to rename the process to the same as the script
- # (these will need to be byte type during 2to3 migration)
- new_argv = "\0".join(sys.argv) + "\0"
- new_short_argv0 = os.path.basename(sys.argv[0]) + "\0"
+ new_argv = b"\x00".join(x.encode("utf-8") for x in sys.argv) + b"\x00"
+ short_argv0 = os.path.basename(sys.argv[0]).encode("utf-8") + b"\x00"
# attempt the linux way first
try:
ctypes.memmove(argc.contents, new_argv, len(new_argv))
ctypes.CDLL(ctypes.util.find_library("c")).prctl(
15,
- new_short_argv0,
+ short_argv0,
0,
0,
0
def create_pidfile(universe):
"""Write a file containing the current process ID."""
- import codecs
- import os
- import os.path
pid = str(os.getpid())
log("Process ID: " + pid)
file_name = universe.contents["internal:process"].get("pidfile")
def remove_pidfile(universe):
"""Remove the file containing the current process ID."""
- import os
- import os.path
file_name = universe.contents["internal:process"].get("pidfile")
if file_name:
if not os.path.isabs(file_name):
def excepthook(excepttype, value, tracebackdata):
"""Handle uncaught exceptions."""
- import traceback
# assemble the list of errors into a single string
message = "".join(
def sighook(what, where):
"""Handle external signals."""
- import signal
# a generic message
message = "Caught signal: "
def override_excepthook():
"""Redefine sys.excepthook with our own."""
- import sys
sys.excepthook = excepthook
def assign_sighook():
"""Assign a customized handler for some signals."""
- import signal
signal.signal(signal.SIGHUP, sighook)
signal.signal(signal.SIGTERM, sighook)
def setup():
"""This contains functions to be performed when starting the engine."""
- import sys
# see if a configuration file was specified
if len(sys.argv) > 1: