# -*- coding: utf-8 -*-
-u"""Miscellaneous functions for the mudpy engine."""
+"""Miscellaneous functions for the mudpy engine."""
# Copyright (c) 2004-2011 Jeremy Stanley <fungi@yuggoth.org>. Permission
# to use, copy, modify, and distribute this software is granted under
class Element:
- u"""An element of the universe."""
+ """An element of the universe."""
def __init__(self, key, universe, filename=None):
- u"""Set up a new element."""
+ """Set up a new element."""
import data
import os.path
if self.key in self.universe.contents:
old_element = self.universe.contents[self.key]
for attribute in vars(old_element).keys():
- exec(u"self." + attribute + u" = old_element." + attribute)
+ exec("self." + attribute + " = old_element." + attribute)
if self.owner:
self.owner.avatar = self
self.contents = {}
# parse out appropriate category and subkey names, add to list
- if self.key.find(u":") > 0:
- self.category, self.subkey = self.key.split(u":", 1)
+ if self.key.find(":") > 0:
+ self.category, self.subkey = self.key.split(":", 1)
else:
- self.category = u"other"
+ self.category = "other"
self.subkey = self.key
if self.category not in self.universe.categories:
- self.category = u"other"
+ self.category = "other"
self.subkey = self.key
# get an appropriate filename for the origin
self.universe.categories[self.category][self.subkey] = self
def reload(self):
- u"""Create a new element and replace this one."""
+ """Create a new element and replace this one."""
new_element = Element(self.key, self.universe, self.origin.filename)
del(self)
def destroy(self):
- u"""Remove an element from the universe and destroy it."""
+ """Remove an element from the universe and destroy it."""
self.origin.data.remove_section(self.key)
del self.universe.categories[self.category][self.subkey]
del self.universe.contents[self.key]
del self
def facets(self):
- u"""Return a list of non-inherited facets for this element."""
+ """Return a list of non-inherited facets for this element."""
if self.key in self.origin.data.sections():
return self.origin.data.options(self.key)
else:
return []
def has_facet(self, facet):
- u"""Return whether the non-inherited facet exists."""
+ """Return whether the non-inherited facet exists."""
return facet in self.facets()
def remove_facet(self, facet):
- u"""Remove a facet from the element."""
+ """Remove a facet from the element."""
if self.has_facet(facet):
self.origin.data.remove_option(self.key, facet)
self.origin.modified = True
def ancestry(self):
- u"""Return a list of the element's inheritance lineage."""
- if self.has_facet(u"inherit"):
- ancestry = self.getlist(u"inherit")
+ """Return a list of the element's inheritance lineage."""
+ if self.has_facet("inherit"):
+ ancestry = self.getlist("inherit")
for parent in ancestry[:]:
ancestors = self.universe.contents[parent].ancestry()
for ancestor in ancestors:
return []
def get(self, facet, default=None):
- u"""Retrieve values."""
+ """Retrieve values."""
if default is None:
- default = u""
+ default = ""
if self.origin.data.has_option(self.key, facet):
raw_data = self.origin.data.get(self.key, facet)
- if raw_data.startswith(u"u\"") or raw_data.startswith(u"u'"):
+ if raw_data.startswith("u\"") or raw_data.startswith("u'"):
raw_data = raw_data[1:]
- raw_data.strip(u"\"'")
- if type(raw_data) == str:
- return unicode(raw_data, "utf-8")
- else:
- return raw_data
- elif self.has_facet(u"inherit"):
+ raw_data.strip("\"'")
+ return raw_data
+ elif self.has_facet("inherit"):
for ancestor in self.ancestry():
if self.universe.contents[ancestor].has_facet(facet):
return self.universe.contents[ancestor].get(facet)
return default
def getboolean(self, facet, default=None):
- u"""Retrieve values as boolean type."""
+ """Retrieve values as boolean type."""
if default is None:
default = False
if self.origin.data.has_option(self.key, facet):
return self.origin.data.getboolean(self.key, facet)
- elif self.has_facet(u"inherit"):
+ elif self.has_facet("inherit"):
for ancestor in self.ancestry():
if self.universe.contents[ancestor].has_facet(facet):
return self.universe.contents[ancestor].getboolean(facet)
return default
def getint(self, facet, default=None):
- u"""Return values as int/long type."""
+ """Return values as int/long type."""
if default is None:
default = 0
if self.origin.data.has_option(self.key, facet):
return self.origin.data.getint(self.key, facet)
- elif self.has_facet(u"inherit"):
+ elif self.has_facet("inherit"):
for ancestor in self.ancestry():
if self.universe.contents[ancestor].has_facet(facet):
return self.universe.contents[ancestor].getint(facet)
return default
def getfloat(self, facet, default=None):
- u"""Return values as float type."""
+ """Return values as float type."""
if default is None:
default = 0.0
if self.origin.data.has_option(self.key, facet):
return self.origin.data.getfloat(self.key, facet)
- elif self.has_facet(u"inherit"):
+ elif self.has_facet("inherit"):
for ancestor in self.ancestry():
if self.universe.contents[ancestor].has_facet(facet):
return self.universe.contents[ancestor].getfloat(facet)
return default
def getlist(self, facet, default=None):
- u"""Return values as list type."""
+ """Return values as list type."""
import data
if default is None:
default = []
return default
def getdict(self, facet, default=None):
- u"""Return values as dict type."""
+ """Return values as dict type."""
import data
if default is None:
default = {}
return default
def set(self, facet, value):
- u"""Set values."""
+ """Set values."""
if not self.has_facet(facet) or not self.get(facet) == value:
- if type(value) is long:
- value = unicode(value)
- elif not type(value) is unicode:
+ if type(value) is long or repr(type(value)) == "<type 'unicode'>":
+ value = str(value)
+ elif not type(value) is str:
value = repr(value)
self.origin.data.set(self.key, facet, value)
self.origin.modified = True
def append(self, facet, value):
- u"""Append value tp a list."""
+ """Append value tp a list."""
if type(value) is long:
- value = unicode(value)
- elif not type(value) is unicode:
+ value = str(value)
+ elif not type(value) is str:
value = repr(value)
newlist = self.getlist(facet)
newlist.append(value)
self.set(facet, newlist)
def new_event(self, action, when=None):
- u"""Create, attach and enqueue an event element."""
+ """Create, attach and enqueue an event element."""
# if when isn't specified, that means now
if not when:
when = self.universe.get_time()
# events are elements themselves
- event = Element(u"event:" + self.key + u":" + counter)
+ event = Element("event:" + self.key + ":" + counter)
def send(
self,
message,
- eol=u"$(eol)",
+ eol="$(eol)",
raw=False,
flush=False,
add_prompt=True,
add_terminator=False,
prepend_padding=True
):
- u"""Convenience method to pass messages to an owner."""
+ """Convenience method to pass messages to an owner."""
if self.owner:
self.owner.send(
message,
)
def can_run(self, command):
- u"""Check if the user can run this command object."""
+ """Check if the user can run this command object."""
# has to be in the commands category
- if command not in self.universe.categories[u"command"].values():
+ if command not in self.universe.categories["command"].values():
result = False
# avatars of administrators can run any command
- elif self.owner and self.owner.account.getboolean(u"administrator"):
+ elif self.owner and self.owner.account.getboolean("administrator"):
result = True
# everyone can run non-administrative commands
- elif not command.getboolean(u"administrative"):
+ elif not command.getboolean("administrative"):
result = True
# otherwise the command cannot be run by this actor
return result
def update_location(self):
- u"""Make sure the location's contents contain this element."""
- location = self.get(u"location")
+ """Make sure the location's contents contain this element."""
+ location = self.get("location")
if location in self.universe.contents:
self.universe.contents[location].contents[self.key] = self
def clean_contents(self):
- u"""Make sure the element's contents aren't bogus."""
+ """Make sure the element's contents aren't bogus."""
for element in self.contents.values():
- if element.get(u"location") != self.key:
+ if element.get("location") != self.key:
del self.contents[element.key]
def go_to(self, location):
- u"""Relocate the element to a specific location."""
- current = self.get(u"location")
+ """Relocate the element to a specific location."""
+ current = self.get("location")
if current and self.key in self.universe.contents[current].contents:
del universe.contents[current].contents[self.key]
if location in self.universe.contents:
- self.set(u"location", location)
+ self.set("location", location)
self.universe.contents[location].contents[self.key] = self
self.look_at(location)
def go_home(self):
- u"""Relocate the element to its default location."""
- self.go_to(self.get(u"default_location"))
+ """Relocate the element to its default location."""
+ self.go_to(self.get("default_location"))
self.echo_to_location(
- u"You suddenly realize that " + self.get(u"name") + u" is here."
+ "You suddenly realize that " + self.get("name") + " is here."
)
def move_direction(self, direction):
- u"""Relocate the element in a specified direction."""
+ """Relocate the element in a specified direction."""
self.echo_to_location(
self.get(
- u"name"
- ) + u" exits " + self.universe.categories[
- u"internal"
+ "name"
+ ) + " exits " + self.universe.categories[
+ "internal"
][
- u"directions"
+ "directions"
].getdict(
direction
)[
- u"exit"
- ] + u"."
+ "exit"
+ ] + "."
)
self.send(
- u"You exit " + self.universe.categories[
- u"internal"
+ "You exit " + self.universe.categories[
+ "internal"
][
- u"directions"
+ "directions"
].getdict(
direction
)[
- u"exit"
- ] + u".",
+ "exit"
+ ] + ".",
add_prompt=False
)
self.go_to(
self.universe.contents[
- self.get(u"location")].link_neighbor(direction)
+ self.get("location")].link_neighbor(direction)
)
self.echo_to_location(
self.get(
- u"name"
- ) + u" arrives from " + self.universe.categories[
- u"internal"
+ "name"
+ ) + " arrives from " + self.universe.categories[
+ "internal"
][
- u"directions"
+ "directions"
].getdict(
direction
)[
- u"enter"
- ] + u"."
+ "enter"
+ ] + "."
)
def look_at(self, key):
- u"""Show an element to another element."""
+ """Show an element to another element."""
if self.owner:
element = self.universe.contents[key]
- message = u""
- name = element.get(u"name")
+ message = ""
+ name = element.get("name")
if name:
- message += u"$(cyn)" + name + u"$(nrm)$(eol)"
- description = element.get(u"description")
+ message += "$(cyn)" + name + "$(nrm)$(eol)"
+ description = element.get("description")
if description:
- message += description + u"$(eol)"
+ message += description + "$(eol)"
portal_list = element.portals().keys()
if portal_list:
portal_list.sort()
- message += u"$(cyn)[ Exits: " + u", ".join(
+ message += "$(cyn)[ Exits: " + ", ".join(
portal_list
- ) + u" ]$(nrm)$(eol)"
+ ) + " ]$(nrm)$(eol)"
for element in self.universe.contents[
- self.get(u"location")
+ self.get("location")
].contents.values():
- if element.getboolean(u"is_actor") and element is not self:
- message += u"$(yel)" + element.get(
- u"name"
- ) + u" is here.$(nrm)$(eol)"
+ if element.getboolean("is_actor") and element is not self:
+ message += "$(yel)" + element.get(
+ "name"
+ ) + " is here.$(nrm)$(eol)"
elif element is not self:
- message += u"$(grn)" + element.get(
- u"impression"
- ) + u"$(nrm)$(eol)"
+ message += "$(grn)" + element.get(
+ "impression"
+ ) + "$(nrm)$(eol)"
self.send(message)
def portals(self):
- u"""Map the portal directions for a room to neighbors."""
+ """Map the portal directions for a room to neighbors."""
import re
portals = {}
- if re.match(u"""^location:-?\d+,-?\d+,-?\d+$""", self.key):
+ if re.match("""^location:-?\d+,-?\d+,-?\d+$""", self.key):
coordinates = [(int(x))
- for x in self.key.split(u":")[1].split(u",")]
- directions = self.universe.categories[u"internal"][u"directions"]
+ for x in self.key.split(":")[1].split(",")]
+ directions = self.universe.categories["internal"]["directions"]
offsets = dict(
[
(
- x, directions.getdict(x)[u"vector"]
+ x, directions.getdict(x)["vector"]
) for x in directions.facets()
]
)
- for portal in self.getlist(u"gridlinks"):
+ for portal in self.getlist("gridlinks"):
adjacent = map(lambda c, o: c + o,
coordinates, offsets[portal])
- neighbor = u"location:" + u",".join(
- [(unicode(x)) for x in adjacent]
+ neighbor = "location:" + ",".join(
+ [(str(x)) for x in adjacent]
)
if neighbor in self.universe.contents:
portals[portal] = neighbor
for facet in self.facets():
- if facet.startswith(u"link_"):
+ if facet.startswith("link_"):
neighbor = self.get(facet)
if neighbor in self.universe.contents:
- portal = facet.split(u"_")[1]
+ portal = facet.split("_")[1]
portals[portal] = neighbor
return portals
def link_neighbor(self, direction):
- u"""Return the element linked in a given direction."""
+ """Return the element linked in a given direction."""
portals = self.portals()
if direction in portals:
return portals[direction]
def echo_to_location(self, message):
- u"""Show a message to other elements in the current location."""
+ """Show a message to other elements in the current location."""
for element in self.universe.contents[
- self.get(u"location")
+ self.get("location")
].contents.values():
if element is not self:
element.send(message)
class Universe:
- u"""The universe."""
+ """The universe."""
- def __init__(self, filename=u"", load=False):
- u"""Initialize the universe."""
+ def __init__(self, filename="", load=False):
+ """Initialize the universe."""
import os
import os.path
self.categories = {}
self.userlist = []
if not filename:
possible_filenames = [
- u".mudpyrc",
- u".mudpy/mudpyrc",
- u".mudpy/mudpy.conf",
- u"mudpy.conf",
- u"etc/mudpy.conf",
- u"/usr/local/mudpy/mudpy.conf",
- u"/usr/local/mudpy/etc/mudpy.conf",
- u"/etc/mudpy/mudpy.conf",
- u"/etc/mudpy.conf"
+ ".mudpyrc",
+ ".mudpy/mudpyrc",
+ ".mudpy/mudpy.conf",
+ "mudpy.conf",
+ "etc/mudpy.conf",
+ "/usr/local/mudpy/mudpy.conf",
+ "/usr/local/mudpy/etc/mudpy.conf",
+ "/etc/mudpy/mudpy.conf",
+ "/etc/mudpy.conf"
]
for filename in possible_filenames:
if os.access(filename, os.R_OK):
self.load()
def load(self):
- u"""Load universe data from persistent storage."""
+ """Load universe data from persistent storage."""
import data
# the files dict must exist and filename needs to be read-only
if not hasattr(
- self, u"files"
+ self, "files"
) or not (
self.filename in self.files and self.files[
self.filename
):
# clear out all read-only files
- if hasattr(self, u"files"):
+ if hasattr(self, "files"):
for data_filename in self.files.keys():
if not self.files[data_filename].is_writeable():
del self.files[data_filename]
# make a list of inactive avatars
inactive_avatars = []
- for account in self.categories[u"account"].values():
+ for account in self.categories["account"].values():
inactive_avatars += [
- (self.contents[x]) for x in account.getlist(u"avatars")
+ (self.contents[x]) for x in account.getlist("avatars")
]
for user in self.userlist:
if user.avatar in inactive_avatars:
# go through all elements to clear out inactive avatar locations
for element in self.contents.values():
- location = element.get(u"location")
+ location = element.get("location")
if element in inactive_avatars and location:
if location in self.contents and element.key in self.contents[
location
].contents:
del self.contents[location].contents[element.key]
- element.set(u"default_location", location)
- element.remove_facet(u"location")
+ element.set("default_location", location)
+ element.remove_facet("location")
# another pass to straighten out all the element contents
for element in self.contents.values():
element.clean_contents()
def new(self):
- u"""Create a new, empty Universe (the Big Bang)."""
+ """Create a new, empty Universe (the Big Bang)."""
new_universe = Universe()
for attribute in vars(self).keys():
- exec(u"new_universe." + attribute + u" = self." + attribute)
+ exec("new_universe." + attribute + " = self." + attribute)
new_universe.reload_flag = False
del self
return new_universe
def save(self):
- u"""Save the universe to persistent storage."""
+ """Save the universe to persistent storage."""
for key in self.files:
self.files[key].save()
def initialize_server_socket(self):
- u"""Create and open the listening socket."""
+ """Create and open the listening socket."""
import socket
# need to know the local address and port number for the listener
- host = self.categories[u"internal"][u"network"].get(u"host")
- port = self.categories[u"internal"][u"network"].getint(u"port")
+ host = self.categories["internal"]["network"].get("host")
+ port = self.categories["internal"]["network"].getint("port")
# if no host was specified, bind to all local addresses (preferring
# ipv6)
if not host:
if socket.has_ipv6:
- host = u"::"
+ host = "::"
else:
- host = u"0.0.0.0"
+ host = "0.0.0.0"
# figure out if this is ipv4 or v6
family = socket.getaddrinfo(host, port)[0][0]
if family is socket.AF_INET6 and not socket.has_ipv6:
- log(u"No support for IPv6 address %s (use IPv4 instead)." % host)
+ log("No support for IPv6 address %s (use IPv4 instead)." % host)
# create a new stream-type socket object
self.listening_socket = socket.socket(family, socket.SOCK_STREAM)
# note that we're now ready for user connections
log(
- u"Listening for Telnet connections on: " +
- host + u":" + unicode(port)
+ "Listening for Telnet connections on: " +
+ host + ":" + str(port)
)
def get_time(self):
- u"""Convenience method to get the elapsed time counter."""
- return self.categories[u"internal"][u"counters"].getint(u"elapsed")
+ """Convenience method to get the elapsed time counter."""
+ return self.categories["internal"]["counters"].getint("elapsed")
class User:
- u"""This is a connected user."""
+ """This is a connected user."""
def __init__(self):
- u"""Default values for the in-memory user variables."""
+ """Default values for the in-memory user variables."""
import telnet
self.account = None
- self.address = u""
+ self.address = ""
self.authenticated = False
self.avatar = None
self.columns = 79
self.connection = None
- self.error = u""
+ self.error = ""
self.input_queue = []
- self.last_address = u""
+ self.last_address = ""
self.last_input = universe.get_time()
self.menu_choices = {}
self.menu_seen = False
self.output_queue = []
self.partial_input = ""
self.password_tries = 0
- self.state = u"initial"
+ self.state = "initial"
self.telopts = {}
def quit(self):
- u"""Log, close the connection and remove."""
+ """Log, close the connection and remove."""
if self.account:
- name = self.account.get(u"name")
+ name = self.account.get("name")
else:
- name = u""
+ name = ""
if name:
- message = u"User " + name
+ message = "User " + name
else:
- message = u"An unnamed user"
- message += u" logged out."
+ message = "An unnamed user"
+ message += " logged out."
log(message, 2)
self.deactivate_avatar()
self.connection.close()
self.remove()
def check_idle(self):
- u"""Warn or disconnect idle users as appropriate."""
+ """Warn or disconnect idle users as appropriate."""
idletime = universe.get_time() - self.last_input
- linkdead_dict = universe.categories[u"internal"][u"time"].getdict(
- u"linkdead"
+ linkdead_dict = universe.categories["internal"]["time"].getdict(
+ "linkdead"
)
if self.state in linkdead_dict:
linkdead_state = self.state
else:
- linkdead_state = u"default"
+ linkdead_state = "default"
if idletime > linkdead_dict[linkdead_state]:
self.send(
- u"$(eol)$(red)You've done nothing for far too long... goodbye!"
- + u"$(nrm)$(eol)",
+ "$(eol)$(red)You've done nothing for far too long... goodbye!"
+ + "$(nrm)$(eol)",
flush=True,
add_prompt=False
)
- logline = u"Disconnecting "
- if self.account and self.account.get(u"name"):
- logline += self.account.get(u"name")
+ logline = "Disconnecting "
+ if self.account and self.account.get("name"):
+ logline += self.account.get("name")
else:
- logline += u"an unknown user"
- logline += u" after idling too long in the " + \
- self.state + u" state."
+ logline += "an unknown user"
+ logline += " after idling too long in the " + \
+ self.state + " state."
log(logline, 2)
- self.state = u"disconnecting"
+ self.state = "disconnecting"
self.menu_seen = False
- idle_dict = universe.categories[u"internal"][u"time"].getdict(u"idle")
+ idle_dict = universe.categories["internal"]["time"].getdict("idle")
if self.state in idle_dict:
idle_state = self.state
else:
- idle_state = u"default"
+ idle_state = "default"
if idletime == idle_dict[idle_state]:
self.send(
- u"$(eol)$(red)If you continue to be unproductive, "
- + u"you'll be shown the door...$(nrm)$(eol)"
+ "$(eol)$(red)If you continue to be unproductive, "
+ + "you'll be shown the door...$(nrm)$(eol)"
)
def reload(self):
- u"""Save, load a new user and relocate the connection."""
+ """Save, load a new user and relocate the connection."""
# get out of the list
self.remove()
# set everything equivalent
for attribute in vars(self).keys():
- exec(u"new_user." + attribute + u" = self." + attribute)
+ exec("new_user." + attribute + " = self." + attribute)
# the avatar needs a new owner
if new_user.avatar:
del(self)
def replace_old_connections(self):
- u"""Disconnect active users with the same name."""
+ """Disconnect active users with the same name."""
# the default return value
return_value = False
# the name is the same but it's not us
if hasattr(
- old_user, u"account"
+ old_user, "account"
) and old_user.account and old_user.account.get(
- u"name"
+ "name"
) == self.account.get(
- u"name"
+ "name"
) and old_user is not self:
# make a note of it
log(
- u"User " + self.account.get(
- u"name"
- ) + u" reconnected--closing old connection to "
- + old_user.address + u".",
+ "User " + self.account.get(
+ "name"
+ ) + " reconnected--closing old connection to "
+ + old_user.address + ".",
2
)
old_user.send(
- u"$(eol)$(red)New connection from " + self.address
- + u". Terminating old connection...$(nrm)$(eol)",
+ "$(eol)$(red)New connection from " + self.address
+ + ". Terminating old connection...$(nrm)$(eol)",
flush=True,
add_prompt=False
)
# replace the old connection with this one
old_user.send(
- u"$(eol)$(red)Taking over old connection from "
- + old_user.address + u".$(nrm)"
+ "$(eol)$(red)Taking over old connection from "
+ + old_user.address + ".$(nrm)"
)
old_user.connection = self.connection
old_user.last_address = old_user.address
return return_value
def authenticate(self):
- u"""Flag the user as authenticated and disconnect duplicates."""
- if self.state is not u"authenticated":
- log(u"User " + self.account.get(u"name") + u" logged in.", 2)
+ """Flag the user as authenticated and disconnect duplicates."""
+ if self.state is not "authenticated":
+ log("User " + self.account.get("name") + " logged in.", 2)
self.authenticated = True
if self.account.subkey in universe.categories[
- u"internal"
+ "internal"
][
- u"limits"
+ "limits"
].getlist(
- u"default_admins"
+ "default_admins"
):
- self.account.set(u"administrator", u"True")
+ self.account.set("administrator", "True")
def show_menu(self):
- u"""Send the user their current menu."""
+ """Send the user their current menu."""
if not self.menu_seen:
self.menu_choices = get_menu_choices(self)
self.send(
get_menu(self.state, self.error, self.menu_choices),
- u"",
+ "",
add_terminator=True
)
self.menu_seen = True
self.adjust_echoing()
def adjust_echoing(self):
- u"""Adjust echoing to match state menu requirements."""
+ """Adjust echoing to match state menu requirements."""
import telnet
if telnet.is_enabled(self, telnet.TELOPT_ECHO, telnet.US):
if menu_echo_on(self.state):
telnet.enable(self, telnet.TELOPT_ECHO, telnet.US)
def remove(self):
- u"""Remove a user from the list of connected users."""
+ """Remove a user from the list of connected users."""
universe.userlist.remove(self)
def send(
self,
output,
- eol=u"$(eol)",
+ eol="$(eol)",
raw=False,
flush=False,
add_prompt=True,
add_terminator=False,
prepend_padding=True
):
- u"""Send arbitrary text to a connected user."""
+ """Send arbitrary text to a connected user."""
import telnet
# unless raw mode is on, clean it up all nice and pretty
if not raw:
# strip extra $(eol) off if present
- while output.startswith(u"$(eol)"):
+ while output.startswith("$(eol)"):
output = output[6:]
- while output.endswith(u"$(eol)"):
+ while output.endswith("$(eol)"):
output = output[:-6]
- extra_lines = output.find(u"$(eol)$(eol)$(eol)")
+ extra_lines = output.find("$(eol)$(eol)$(eol)")
while extra_lines > -1:
output = output[:extra_lines] + output[extra_lines + 6:]
- extra_lines = output.find(u"$(eol)$(eol)$(eol)")
+ extra_lines = output.find("$(eol)$(eol)$(eol)")
# start with a newline, append the message, then end
# with the optional eol string passed to this function
if not just_prompt and prepend_padding:
if not self.output_queue \
or not self.output_queue[-1].endswith("\r\n"):
- output = u"$(eol)" + output
+ output = "$(eol)" + output
elif not self.output_queue[-1].endswith(
"\r\n\x1b[0m\r\n"
) and not self.output_queue[-1].endswith(
"\r\n\r\n"
):
- output = u"$(eol)" + output
- output += eol + unichr(27) + u"[0m"
+ output = "$(eol)" + output
+ output += eol + unichr(27) + "[0m"
# tack on a prompt if active
- if self.state == u"active":
+ if self.state == "active":
if not just_prompt:
- output += u"$(eol)"
+ output += "$(eol)"
if add_prompt:
- output += u"> "
- mode = self.avatar.get(u"mode")
+ output += "> "
+ mode = self.avatar.get("mode")
if mode:
- output += u"(" + mode + u") "
+ output += "(" + mode + ") "
# find and replace macros in the output
output = replace_macros(self, output)
# if supported by the client, encode it utf-8
if telnet.is_enabled(self, telnet.TELOPT_BINARY, telnet.US):
- encoded_output = output.encode(u"utf-8")
+ encoded_output = output.encode("utf-8")
# otherwise just send ascii
else:
- encoded_output = output.encode(u"ascii", u"replace")
+ encoded_output = output.encode("ascii", "replace")
# end with a terminator if requested
if add_prompt or add_terminator:
self.flush()
def pulse(self):
- u"""All the things to do to the user per increment."""
+ """All the things to do to the user per increment."""
# if the world is terminating, disconnect
if universe.terminate_flag:
- self.state = u"disconnecting"
+ self.state = "disconnecting"
self.menu_seen = False
# check for an idle connection and act appropriately
self.check_idle()
# if output is paused, decrement the counter
- if self.state == u"initial":
+ if self.state == "initial":
if self.negotiation_pause:
self.negotiation_pause -= 1
else:
- self.state = u"entering_account_name"
+ self.state = "entering_account_name"
# show the user a menu as needed
- elif not self.state == u"active":
+ elif not self.state == "active":
self.show_menu()
# flush any pending output in the queue
self.flush()
# disconnect users with the appropriate state
- if self.state == u"disconnecting":
+ if self.state == "disconnecting":
self.quit()
# check for input and add it to the queue
handle_user_input(self)
def flush(self):
- u"""Try to send the last item in the queue and remove it."""
+ """Try to send the last item in the queue and remove it."""
if self.output_queue:
try:
self.connection.send(self.output_queue[0])
del self.output_queue[0]
except:
- if self.account and self.account.get(u"name"):
- account = self.account.get(u"name")
+ if self.account and self.account.get("name"):
+ account = self.account.get("name")
else:
- account = u"an unknown user"
- log(u"Sending to %s raised an exception (broken pipe?)."
+ account = "an unknown user"
+ log("Sending to %s raised an exception (broken pipe?)."
% account, 7)
pass
def enqueue_input(self):
- u"""Process and enqueue any new input."""
+ """Process and enqueue any new input."""
import telnet
import unicodedata
# strip off extra whitespace
line = line.strip()
- # make sure it's valid unicode (probably no longer needed)
- try:
- unicode(line, u"utf-8")
- except UnicodeDecodeError:
- logline = u"Non-unicode data from "
- if self.account and self.account.get(u"name"):
- logline += self.account.get(u"name") + u": "
- else:
- logline += u"unknown user: "
- logline += repr(line)
- log(logline, 4)
- line = ""
-
# log non-printable characters remaining
if telnet.is_enabled(self, telnet.TELOPT_BINARY, telnet.HIM):
asciiline = filter(lambda x: " " <= x <= "~", line)
if line != asciiline:
- logline = u"Non-ASCII characters from "
- if self.account and self.account.get(u"name"):
- logline += self.account.get(u"name") + u": "
+ logline = "Non-ASCII characters from "
+ if self.account and self.account.get("name"):
+ logline += self.account.get("name") + ": "
else:
- logline += u"unknown user: "
+ logline += "unknown user: "
logline += repr(line)
log(logline, 4)
line = asciiline
# put on the end of the queue
self.input_queue.append(
- unicodedata.normalize(u"NFKC", unicode(line, u"utf-8"))
+ unicodedata.normalize("NFKC", line.decode("utf-8"))
)
def new_avatar(self):
- u"""Instantiate a new, unconfigured avatar for this user."""
+ """Instantiate a new, unconfigured avatar for this user."""
counter = 0
- while u"avatar:" + self.account.get(u"name") + u":" + unicode(
+ while "avatar:" + self.account.get("name") + ":" + str(
counter
- ) in universe.categories[u"actor"].keys():
+ ) in universe.categories["actor"].keys():
counter += 1
self.avatar = Element(
- u"actor:avatar:" + self.account.get(u"name") + u":" + unicode(
+ "actor:avatar:" + self.account.get("name") + ":" + str(
counter
),
universe
)
- self.avatar.append(u"inherit", u"archetype:avatar")
- self.account.append(u"avatars", self.avatar.key)
+ self.avatar.append("inherit", "archetype:avatar")
+ self.account.append("avatars", self.avatar.key)
def delete_avatar(self, avatar):
- u"""Remove an avatar from the world and from the user's list."""
+ """Remove an avatar from the world and from the user's list."""
if self.avatar is universe.contents[avatar]:
self.avatar = None
universe.contents[avatar].destroy()
- avatars = self.account.getlist(u"avatars")
+ avatars = self.account.getlist("avatars")
avatars.remove(avatar)
- self.account.set(u"avatars", avatars)
+ self.account.set("avatars", avatars)
def activate_avatar_by_index(self, index):
- u"""Enter the world with a particular indexed avatar."""
+ """Enter the world with a particular indexed avatar."""
self.avatar = universe.contents[
- self.account.getlist(u"avatars")[index]]
+ self.account.getlist("avatars")[index]]
self.avatar.owner = self
- self.state = u"active"
+ self.state = "active"
self.avatar.go_home()
def deactivate_avatar(self):
- u"""Have the active avatar leave the world."""
+ """Have the active avatar leave the world."""
if self.avatar:
- current = self.avatar.get(u"location")
+ current = self.avatar.get("location")
if current:
- self.avatar.set(u"default_location", current)
+ self.avatar.set("default_location", current)
self.avatar.echo_to_location(
- u"You suddenly wonder where " + self.avatar.get(
- u"name"
- ) + u" went."
+ "You suddenly wonder where " + self.avatar.get(
+ "name"
+ ) + " went."
)
del universe.contents[current].contents[self.avatar.key]
- self.avatar.remove_facet(u"location")
+ self.avatar.remove_facet("location")
self.avatar.owner = None
self.avatar = None
def destroy(self):
- u"""Destroy the user and associated avatars."""
- for avatar in self.account.getlist(u"avatars"):
+ """Destroy the user and associated avatars."""
+ for avatar in self.account.getlist("avatars"):
self.delete_avatar(avatar)
self.account.destroy()
def list_avatar_names(self):
- u"""List names of assigned avatars."""
+ """List names of assigned avatars."""
return [
universe.contents[avatar].get(
- u"name"
- ) for avatar in self.account.getlist(u"avatars")
+ "name"
+ ) for avatar in self.account.getlist("avatars")
]
def broadcast(message, add_prompt=True):
- u"""Send a message to all connected users."""
+ """Send a message to all connected users."""
for each_user in universe.userlist:
- each_user.send(u"$(eol)" + message, add_prompt=add_prompt)
+ each_user.send("$(eol)" + message, add_prompt=add_prompt)
def log(message, level=0):
- u"""Log a message."""
+ """Log a message."""
import codecs
import os.path
import syslog
import time
# a couple references we need
- file_name = universe.categories[u"internal"][u"logging"].get(u"file")
- max_log_lines = universe.categories[u"internal"][u"logging"].getint(
- u"max_log_lines"
+ file_name = universe.categories["internal"]["logging"].get("file")
+ max_log_lines = universe.categories["internal"]["logging"].getint(
+ "max_log_lines"
)
- syslog_name = universe.categories[u"internal"][u"logging"].get(u"syslog")
+ syslog_name = universe.categories["internal"]["logging"].get("syslog")
timestamp = time.asctime()[4:19]
# turn the message into a list of lines
lines = filter(
- lambda x: x != u"", [(x.rstrip()) for x in message.split(u"\n")]
+ lambda x: x != "", [(x.rstrip()) for x in message.split("\n")]
)
# send the timestamp and line to a file
if file_name:
if not os.path.isabs(file_name):
file_name = os.path.join(universe.startdir, file_name)
- file_descriptor = codecs.open(file_name, u"a", u"utf-8")
+ file_descriptor = codecs.open(file_name, "a", "utf-8")
for line in lines:
- file_descriptor.write(timestamp + u" " + line + u"\n")
+ file_descriptor.write(timestamp + " " + line + "\n")
file_descriptor.flush()
file_descriptor.close()
# send the timestamp and line to standard output
- if universe.categories[u"internal"][u"logging"].getboolean(u"stdout"):
+ if universe.categories["internal"]["logging"].getboolean("stdout"):
for line in lines:
- print(timestamp + u" " + line)
+ print(timestamp + " " + line)
# send the line to the system log
if syslog_name:
# display to connected administrators
for user in universe.userlist:
- if user.state == u"active" and user.account.getboolean(
- u"administrator"
- ) and user.account.getint(u"loglevel") <= level:
+ if user.state == "active" and user.account.getboolean(
+ "administrator"
+ ) and user.account.getint("loglevel") <= level:
# iterate over every line in the message
- full_message = u""
+ full_message = ""
for line in lines:
full_message += (
- u"$(bld)$(red)" + timestamp + u" "
- + line.replace(u"$(", u"$_(") + u"$(nrm)$(eol)")
+ "$(bld)$(red)" + timestamp + " "
+ + line.replace("$(", "$_(") + "$(nrm)$(eol)")
user.send(full_message, flush=True)
# add to the recent log list
for line in lines:
while 0 < len(universe.loglines) >= max_log_lines:
del universe.loglines[0]
- universe.loglines.append((level, timestamp + u" " + line))
+ universe.loglines.append((level, timestamp + " " + line))
def get_loglines(level, start, stop):
- u"""Return a specific range of loglines filtered by level."""
+ """Return a specific range of loglines filtered by level."""
# filter the log lines
loglines = filter(lambda x: x[0] >= level, universe.loglines)
# we need these in several places
- total_count = unicode(len(universe.loglines))
+ total_count = str(len(universe.loglines))
filtered_count = len(loglines)
# don't proceed if there are no lines
stop = 1
# some preamble
- message = u"There are " + unicode(total_count)
- message += u" log lines in memory and " + unicode(filtered_count)
- message += u" at or above level " + unicode(level) + u"."
- message += u" The matching lines from " + unicode(stop) + u" to "
- message += unicode(start) + u" are:$(eol)$(eol)"
+ message = "There are " + str(total_count)
+ message += " log lines in memory and " + str(filtered_count)
+ message += " at or above level " + str(level) + "."
+ message += " The matching lines from " + str(stop) + " to "
+ message += str(start) + " are:$(eol)$(eol)"
# add the text from the selected lines
if stop > 1:
else:
range_lines = loglines[-start:]
for line in range_lines:
- message += u" (" + unicode(line[0]) + u") " + line[1].replace(
- u"$(", u"$_("
- ) + u"$(eol)"
+ message += " (" + str(line[0]) + ") " + line[1].replace(
+ "$(", "$_("
+ ) + "$(eol)"
# there were no lines
else:
- message = u"None of the " + unicode(total_count)
- message += u" lines in memory matches your request."
+ message = "None of the " + str(total_count)
+ message += " lines in memory matches your request."
# pass it back
return message
def glyph_columns(character):
- u"""Convenience function to return the column width of a glyph."""
+ """Convenience function to return the column width of a glyph."""
import unicodedata
- if unicodedata.east_asian_width(character) in u"FW":
+ if unicodedata.east_asian_width(character) in "FW":
return 2
else:
return 1
def wrap_ansi_text(text, width):
- u"""Wrap text with arbitrary width while ignoring ANSI colors."""
+ """Wrap text with arbitrary width while ignoring ANSI colors."""
import unicodedata
# the current position in the entire text string, including all
escape = False
# normalize any potentially composited unicode before we count it
- text = unicodedata.normalize(u"NFKC", text)
+ text = unicodedata.normalize("NFKC", text)
# iterate over each character from the begining of the text
for each_character in text:
# the current character is the escape character
- if each_character == u"\x1b" and not escape:
+ if each_character == "\x1b" and not escape:
escape = True
# the current character is within an escape sequence
# the current character is m, which terminates the
# escape sequence
- if each_character == u"m":
+ if each_character == "m":
escape = False
# the current character is a newline, so reset the relative
# position (start a new line)
- elif each_character == u"\n":
+ elif each_character == "\n":
rel_pos = 0
last_whitespace = abs_pos
# so we need to backtrack and find a space at which to wrap;
# special care is taken to avoid an off-by-one in case the
# current character is a double-width glyph
- elif each_character != u"\r" and (
+ elif each_character != "\r" and (
rel_pos >= width or (
rel_pos >= width - 1 and glyph_columns(
each_character
):
# it's always possible we landed on whitespace
- if unicodedata.category(each_character) in (u"Cc", u"Zs"):
+ if unicodedata.category(each_character) in ("Cc", "Zs"):
last_whitespace = abs_pos
# insert an eol in place of the space
text = text[:last_whitespace] + \
- u"\r\n" + text[last_whitespace + 1:]
+ "\r\n" + text[last_whitespace + 1:]
# increase the absolute position because an eol is two
# characters but the space it replaced was only one
# as long as the character is not a carriage return and the
# other above conditions haven't been met, count it as a
# printable character
- elif each_character != u"\r":
+ elif each_character != "\r":
rel_pos += glyph_columns(each_character)
- if unicodedata.category(each_character) in (u"Cc", u"Zs"):
+ if unicodedata.category(each_character) in ("Cc", "Zs"):
last_whitespace = abs_pos
# increase the absolute position for every character
def weighted_choice(data):
- u"""Takes a dict weighted by value and returns a random key."""
+ """Takes a dict weighted by value and returns a random key."""
import random
# this will hold our expanded list of keys from the data
def random_name():
- u"""Returns a random character name."""
+ """Returns a random character name."""
import random
# the vowels and consonants needed to create romaji syllables
vowels = [
- u"a",
- u"i",
- u"u",
- u"e",
- u"o"
+ "a",
+ "i",
+ "u",
+ "e",
+ "o"
]
consonants = [
- u"'",
- u"k",
- u"z",
- u"s",
- u"sh",
- u"z",
- u"j",
- u"t",
- u"ch",
- u"ts",
- u"d",
- u"n",
- u"h",
- u"f",
- u"m",
- u"y",
- u"r",
- u"w"
+ "'",
+ "k",
+ "z",
+ "s",
+ "sh",
+ "z",
+ "j",
+ "t",
+ "ch",
+ "ts",
+ "d",
+ "n",
+ "h",
+ "f",
+ "m",
+ "y",
+ "r",
+ "w"
]
# this dict will hold our weighted list of syllables
syllables[consonant + vowel] = 1
# we'll build the name into this string
- name = u""
+ name = ""
# create a name of random length from the syllables
for syllable in range(random.randrange(2, 6)):
name += weighted_choice(syllables)
# strip any leading quotemark, capitalize and return the name
- return name.strip(u"'").capitalize()
+ return name.strip("'").capitalize()
def replace_macros(user, text, is_input=False):
- u"""Replaces macros in text output."""
+ """Replaces macros in text output."""
import codecs
import data
import os.path
# third person pronouns
pronouns = {
- u"female": {u"obj": u"her", u"pos": u"hers", u"sub": u"she"},
- u"male": {u"obj": u"him", u"pos": u"his", u"sub": u"he"},
- u"neuter": {u"obj": u"it", u"pos": u"its", u"sub": u"it"}
+ "female": {"obj": "her", "pos": "hers", "sub": "she"},
+ "male": {"obj": "him", "pos": "his", "sub": "he"},
+ "neuter": {"obj": "it", "pos": "its", "sub": "it"}
}
# a dict of replacement macros
macros = {
- u"eol": u"\r\n",
- u"bld": unichr(27) + u"[1m",
- u"nrm": unichr(27) + u"[0m",
- u"blk": unichr(27) + u"[30m",
- u"blu": unichr(27) + u"[34m",
- u"cyn": unichr(27) + u"[36m",
- u"grn": unichr(27) + u"[32m",
- u"mgt": unichr(27) + u"[35m",
- u"red": unichr(27) + u"[31m",
- u"yel": unichr(27) + u"[33m",
+ "eol": "\r\n",
+ "bld": unichr(27) + "[1m",
+ "nrm": unichr(27) + "[0m",
+ "blk": unichr(27) + "[30m",
+ "blu": unichr(27) + "[34m",
+ "cyn": unichr(27) + "[36m",
+ "grn": unichr(27) + "[32m",
+ "mgt": unichr(27) + "[35m",
+ "red": unichr(27) + "[31m",
+ "yel": unichr(27) + "[33m",
}
# add dynamic macros where possible
if user.account:
- account_name = user.account.get(u"name")
+ account_name = user.account.get("name")
if account_name:
- macros[u"account"] = account_name
+ macros["account"] = account_name
if user.avatar:
- avatar_gender = user.avatar.get(u"gender")
+ avatar_gender = user.avatar.get("gender")
if avatar_gender:
- macros[u"tpop"] = pronouns[avatar_gender][u"obj"]
- macros[u"tppp"] = pronouns[avatar_gender][u"pos"]
- macros[u"tpsp"] = pronouns[avatar_gender][u"sub"]
+ macros["tpop"] = pronouns[avatar_gender]["obj"]
+ macros["tppp"] = pronouns[avatar_gender]["pos"]
+ macros["tpsp"] = pronouns[avatar_gender]["sub"]
# loop until broken
while True:
# find and replace per the macros dict
- macro_start = text.find(u"$(")
+ macro_start = text.find("$(")
if macro_start == -1:
break
- macro_end = text.find(u")", macro_start) + 1
+ macro_end = text.find(")", macro_start) + 1
macro = text[macro_start + 2:macro_end - 1]
if macro in macros.keys():
replacement = macros[macro]
# this is how we handle local file inclusion (dangerous!)
- elif macro.startswith(u"inc:"):
+ elif macro.startswith("inc:"):
incfile = data.find_file(macro[4:], universe=universe)
if os.path.exists(incfile):
- incfd = codecs.open(incfile, u"r", u"utf-8")
- replacement = u""
+ incfd = codecs.open(incfile, "r", "utf-8")
+ replacement = ""
for line in incfd:
- if line.endswith(u"\n") and not line.endswith(u"\r\n"):
- line = line.replace(u"\n", u"\r\n")
+ if line.endswith("\n") and not line.endswith("\r\n"):
+ line = line.replace("\n", "\r\n")
replacement += line
# lose the trailing eol
replacement = replacement[:-2]
else:
- replacement = u""
- log(u"Couldn't read included " + incfile + u" file.", 6)
+ replacement = ""
+ log("Couldn't read included " + incfile + " file.", 6)
# if we get here, log and replace it with null
else:
- replacement = u""
+ replacement = ""
if not is_input:
- log(u"Unexpected replacement macro " +
- macro + u" encountered.", 6)
+ log("Unexpected replacement macro " +
+ macro + " encountered.", 6)
# and now we act on the replacement
- text = text.replace(u"$(" + macro + u")", replacement)
+ text = text.replace("$(" + macro + ")", replacement)
# replace the look-like-a-macro sequence
- text = text.replace(u"$_(", u"$(")
+ text = text.replace("$_(", "$(")
return text
def escape_macros(text):
- u"""Escapes replacement macros in text."""
- return text.replace(u"$(", u"$_(")
+ """Escapes replacement macros in text."""
+ return text.replace("$(", "$_(")
-def first_word(text, separator=u" "):
- u"""Returns a tuple of the first word and the rest."""
+def first_word(text, separator=" "):
+ """Returns a tuple of the first word and the rest."""
if text:
if text.find(separator) > 0:
return text.split(separator, 1)
else:
- return text, u""
+ return text, ""
else:
- return u"", u""
+ return "", ""
def on_pulse():
- u"""The things which should happen on each pulse, aside from reloads."""
+ """The things which should happen on each pulse, aside from reloads."""
import time
# open the listening socket if it hasn't been already
- if not hasattr(universe, u"listening_socket"):
+ if not hasattr(universe, "listening_socket"):
universe.initialize_server_socket()
# assign a user if a new connection is waiting
user.pulse()
# add an element for counters if it doesn't exist
- if u"counters" not in universe.categories[u"internal"]:
- universe.categories[u"internal"][u"counters"] = Element(
- u"internal:counters", universe
+ if "counters" not in universe.categories["internal"]:
+ universe.categories["internal"]["counters"] = Element(
+ "internal:counters", universe
)
# update the log every now and then
- if not universe.categories[u"internal"][u"counters"].getint(u"mark"):
- log(unicode(len(universe.userlist)) + u" connection(s)")
- universe.categories[u"internal"][u"counters"].set(
- u"mark", universe.categories[u"internal"][u"time"].getint(
- u"frequency_log"
+ if not universe.categories["internal"]["counters"].getint("mark"):
+ log(str(len(universe.userlist)) + " connection(s)")
+ universe.categories["internal"]["counters"].set(
+ "mark", universe.categories["internal"]["time"].getint(
+ "frequency_log"
)
)
else:
- universe.categories[u"internal"][u"counters"].set(
- u"mark", universe.categories[u"internal"][u"counters"].getint(
- u"mark"
+ universe.categories["internal"]["counters"].set(
+ "mark", universe.categories["internal"]["counters"].getint(
+ "mark"
) - 1
)
# periodically save everything
- if not universe.categories[u"internal"][u"counters"].getint(u"save"):
+ if not universe.categories["internal"]["counters"].getint("save"):
universe.save()
- universe.categories[u"internal"][u"counters"].set(
- u"save", universe.categories[u"internal"][u"time"].getint(
- u"frequency_save"
+ universe.categories["internal"]["counters"].set(
+ "save", universe.categories["internal"]["time"].getint(
+ "frequency_save"
)
)
else:
- universe.categories[u"internal"][u"counters"].set(
- u"save", universe.categories[u"internal"][u"counters"].getint(
- u"save"
+ universe.categories["internal"]["counters"].set(
+ "save", universe.categories["internal"]["counters"].getint(
+ "save"
) - 1
)
# pause for a configurable amount of time (decimal seconds)
- time.sleep(universe.categories[u"internal"]
- [u"time"].getfloat(u"increment"))
+ time.sleep(universe.categories["internal"]
+ ["time"].getfloat("increment"))
# increase the elapsed increment counter
- universe.categories[u"internal"][u"counters"].set(
- u"elapsed", universe.categories[u"internal"][u"counters"].getint(
- u"elapsed"
+ universe.categories["internal"]["counters"].set(
+ "elapsed", universe.categories["internal"]["counters"].getint(
+ "elapsed"
) + 1
)
def reload_data():
- u"""Reload all relevant objects."""
+ """Reload all relevant objects."""
for user in universe.userlist[:]:
user.reload()
for element in universe.contents.values():
def check_for_connection(listening_socket):
- u"""Check for a waiting connection and return a new user object."""
+ """Check for a waiting connection and return a new user object."""
import telnet
# try to accept a new connection
return None
# note that we got one
- log(u"Connection from " + address[0], 2)
+ log("Connection from " + address[0], 2)
# disable blocking so we can proceed whether or not we can send/receive
connection.setblocking(0)
def get_menu(state, error=None, choices=None):
- u"""Show the correct menu text to a user."""
+ """Show the correct menu text to a user."""
# make sure we don't reuse a mutable sequence by default
if choices is None:
def menu_echo_on(state):
- u"""True if echo is on, false if it is off."""
- return universe.categories[u"menu"][state].getboolean(u"echo", True)
+ """True if echo is on, false if it is off."""
+ return universe.categories["menu"][state].getboolean("echo", True)
def get_echo_message(state):
- u"""Return a message indicating that echo is off."""
+ """Return a message indicating that echo is off."""
if menu_echo_on(state):
- return u""
+ return ""
else:
- return u"(won't echo) "
+ return "(won't echo) "
def get_default_menu_choice(state):
- u"""Return the default choice for a menu."""
- return universe.categories[u"menu"][state].get(u"default")
+ """Return the default choice for a menu."""
+ return universe.categories["menu"][state].get("default")
def get_formatted_default_menu_choice(state):
- u"""Default menu choice foratted for inclusion in a prompt string."""
+ """Default menu choice foratted for inclusion in a prompt string."""
default_choice = get_default_menu_choice(state)
if default_choice:
- return u"[$(red)" + default_choice + u"$(nrm)] "
+ return "[$(red)" + default_choice + "$(nrm)] "
else:
- return u""
+ return ""
def get_menu_description(state, error):
- u"""Get the description or error text."""
+ """Get the description or error text."""
# an error condition was raised by the handler
if error:
# try to get an error message matching the condition
# and current state
description = universe.categories[
- u"menu"][state].get(u"error_" + error)
+ "menu"][state].get("error_" + error)
if not description:
- description = u"That is not a valid choice..."
- description = u"$(red)" + description + u"$(nrm)"
+ description = "That is not a valid choice..."
+ description = "$(red)" + description + "$(nrm)"
# there was no error condition
else:
# try to get a menu description for the current state
- description = universe.categories[u"menu"][state].get(u"description")
+ description = universe.categories["menu"][state].get("description")
# return the description or error message
if description:
- description += u"$(eol)$(eol)"
+ description += "$(eol)$(eol)"
return description
def get_menu_prompt(state):
- u"""Try to get a prompt, if it was defined."""
- prompt = universe.categories[u"menu"][state].get(u"prompt")
+ """Try to get a prompt, if it was defined."""
+ prompt = universe.categories["menu"][state].get("prompt")
if prompt:
- prompt += u" "
+ prompt += " "
return prompt
def get_menu_choices(user):
- u"""Return a dict of choice:meaning."""
- menu = universe.categories[u"menu"][user.state]
- create_choices = menu.get(u"create")
+ """Return a dict of choice:meaning."""
+ menu = universe.categories["menu"][user.state]
+ create_choices = menu.get("create")
if create_choices:
choices = eval(create_choices)
else:
options = {}
creates = {}
for facet in menu.facets():
- if facet.startswith(u"demand_") and not eval(
- universe.categories[u"menu"][user.state].get(facet)
+ if facet.startswith("demand_") and not eval(
+ universe.categories["menu"][user.state].get(facet)
):
- ignores.append(facet.split(u"_", 2)[1])
- elif facet.startswith(u"create_"):
- creates[facet] = facet.split(u"_", 2)[1]
- elif facet.startswith(u"choice_"):
- options[facet] = facet.split(u"_", 2)[1]
+ ignores.append(facet.split("_", 2)[1])
+ elif facet.startswith("create_"):
+ creates[facet] = facet.split("_", 2)[1]
+ elif facet.startswith("choice_"):
+ options[facet] = facet.split("_", 2)[1]
for facet in creates.keys():
if not creates[facet] in ignores:
choices[creates[facet]] = eval(menu.get(facet))
def get_formatted_menu_choices(state, choices):
- u"""Returns a formatted string of menu choices."""
- choice_output = u""
+ """Returns a formatted string of menu choices."""
+ choice_output = ""
choice_keys = choices.keys()
choice_keys.sort()
for choice in choice_keys:
- choice_output += u" [$(red)" + choice + u"$(nrm)] " + choices[
+ choice_output += " [$(red)" + choice + "$(nrm)] " + choices[
choice
- ] + u"$(eol)"
+ ] + "$(eol)"
if choice_output:
- choice_output += u"$(eol)"
+ choice_output += "$(eol)"
return choice_output
def get_menu_branches(state):
- u"""Return a dict of choice:branch."""
+ """Return a dict of choice:branch."""
branches = {}
- for facet in universe.categories[u"menu"][state].facets():
- if facet.startswith(u"branch_"):
+ for facet in universe.categories["menu"][state].facets():
+ if facet.startswith("branch_"):
branches[
- facet.split(u"_", 2)[1]
- ] = universe.categories[u"menu"][state].get(facet)
+ facet.split("_", 2)[1]
+ ] = universe.categories["menu"][state].get(facet)
return branches
def get_default_branch(state):
- u"""Return the default branch."""
- return universe.categories[u"menu"][state].get(u"branch")
+ """Return the default branch."""
+ return universe.categories["menu"][state].get("branch")
def get_choice_branch(user, choice):
- u"""Returns the new state matching the given choice."""
+ """Returns the new state matching the given choice."""
branches = get_menu_branches(user.state)
if choice in branches.keys():
return branches[choice]
elif choice in user.menu_choices.keys():
return get_default_branch(user.state)
else:
- return u""
+ return ""
def get_menu_actions(state):
- u"""Return a dict of choice:branch."""
+ """Return a dict of choice:branch."""
actions = {}
- for facet in universe.categories[u"menu"][state].facets():
- if facet.startswith(u"action_"):
+ for facet in universe.categories["menu"][state].facets():
+ if facet.startswith("action_"):
actions[
- facet.split(u"_", 2)[1]
- ] = universe.categories[u"menu"][state].get(facet)
+ facet.split("_", 2)[1]
+ ] = universe.categories["menu"][state].get(facet)
return actions
def get_default_action(state):
- u"""Return the default action."""
- return universe.categories[u"menu"][state].get(u"action")
+ """Return the default action."""
+ return universe.categories["menu"][state].get("action")
def get_choice_action(user, choice):
- u"""Run any indicated script for the given choice."""
+ """Run any indicated script for the given choice."""
actions = get_menu_actions(user.state)
if choice in actions.keys():
return actions[choice]
elif choice in user.menu_choices.keys():
return get_default_action(user.state)
else:
- return u""
+ return ""
def handle_user_input(user):
- u"""The main handler, branches to a state-specific handler."""
+ """The main handler, branches to a state-specific handler."""
import telnet
# if the user's client echo is off, send a blank line for aesthetics
if telnet.is_enabled(user, telnet.TELOPT_ECHO, telnet.US):
- user.send(u"", add_prompt=False, prepend_padding=False)
+ user.send("", add_prompt=False, prepend_padding=False)
# check to make sure the state is expected, then call that handler
- if u"handler_" + user.state in globals():
- exec(u"handler_" + user.state + u"(user)")
+ if "handler_" + user.state in globals():
+ exec("handler_" + user.state + "(user)")
else:
generic_menu_handler(user)
def generic_menu_handler(user):
- u"""A generic menu choice handler."""
+ """A generic menu choice handler."""
# get a lower-case representation of the next line of input
if user.input_queue:
if choice:
choice = choice.lower()
else:
- choice = u""
+ choice = ""
if not choice:
choice = get_default_menu_choice(user.state)
if choice in user.menu_choices:
if new_state:
user.state = new_state
else:
- user.error = u"default"
+ user.error = "default"
def handler_entering_account_name(user):
- u"""Handle the login account name."""
+ """Handle the login account name."""
# get the next waiting line of input
input_data = user.input_queue.pop(0)
# fail if there are non-alphanumeric characters
if name != filter(
- lambda x: x >= u"0" and x <= u"9" or x >= u"a" and x <= u"z", name
+ lambda x: x >= "0" and x <= "9" or x >= "a" and x <= "z", name
):
- user.error = u"bad_name"
+ user.error = "bad_name"
# if that account exists, time to request a password
- elif name in universe.categories[u"account"]:
- user.account = universe.categories[u"account"][name]
- user.state = u"checking_password"
+ elif name in universe.categories["account"]:
+ user.account = universe.categories["account"][name]
+ user.state = "checking_password"
# otherwise, this could be a brand new user
else:
- user.account = Element(u"account:" + name, universe)
- user.account.set(u"name", name)
- log(u"New user: " + name, 2)
- user.state = u"checking_new_account_name"
+ user.account = Element("account:" + name, universe)
+ user.account.set("name", name)
+ log("New user: " + name, 2)
+ user.state = "checking_new_account_name"
# if the user entered nothing for a name, then buhbye
else:
- user.state = u"disconnecting"
+ user.state = "disconnecting"
def handler_checking_password(user):
- u"""Handle the login account password."""
+ """Handle the login account password."""
import password
# get the next waiting line of input
input_data = user.input_queue.pop(0)
# does the hashed input equal the stored hash?
- if password.verify(input_data, user.account.get(u"passhash")):
+ if password.verify(input_data, user.account.get("passhash")):
# if so, set the username and load from cold storage
if not user.replace_old_connections():
user.authenticate()
- user.state = u"main_utility"
+ user.state = "main_utility"
# if at first your hashes don't match, try, try again
elif user.password_tries < universe.categories[
- u"internal"
+ "internal"
][
- u"limits"
+ "limits"
].getint(
- u"password_tries"
+ "password_tries"
) - 1:
user.password_tries += 1
- user.error = u"incorrect"
+ user.error = "incorrect"
# we've exceeded the maximum number of password failures, so disconnect
else:
user.send(
- u"$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)"
+ "$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)"
)
- user.state = u"disconnecting"
+ user.state = "disconnecting"
def handler_entering_new_password(user):
- u"""Handle a new password entry."""
+ """Handle a new password entry."""
import password
# get the next waiting line of input
# make sure the password is strong--at least one upper, one lower and
# one digit, seven or more characters in length
if len(input_data) > 6 and len(
- filter(lambda x: x >= u"0" and x <= u"9", input_data)
+ filter(lambda x: x >= "0" and x <= "9", input_data)
) and len(
- filter(lambda x: x >= u"A" and x <= u"Z", input_data)
+ filter(lambda x: x >= "A" and x <= "Z", input_data)
) and len(
- filter(lambda x: x >= u"a" and x <= u"z", input_data)
+ filter(lambda x: x >= "a" and x <= "z", input_data)
):
# hash and store it, then move on to verification
- user.account.set(u"passhash", password.create(input_data))
- user.state = u"verifying_new_password"
+ user.account.set("passhash", password.create(input_data))
+ user.state = "verifying_new_password"
# the password was weak, try again if you haven't tried too many times
elif user.password_tries < universe.categories[
- u"internal"
+ "internal"
][
- u"limits"
+ "limits"
].getint(
- u"password_tries"
+ "password_tries"
) - 1:
user.password_tries += 1
- user.error = u"weak"
+ user.error = "weak"
# too many tries, so adios
else:
user.send(
- u"$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)"
+ "$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)"
)
user.account.destroy()
- user.state = u"disconnecting"
+ user.state = "disconnecting"
def handler_verifying_new_password(user):
- u"""Handle the re-entered new password for verification."""
+ """Handle the re-entered new password for verification."""
import password
# get the next waiting line of input
input_data = user.input_queue.pop(0)
# hash the input and match it to storage
- if password.verify(input_data, user.account.get(u"passhash")):
+ if password.verify(input_data, user.account.get("passhash")):
user.authenticate()
# the hashes matched, so go active
if not user.replace_old_connections():
- user.state = u"main_utility"
+ user.state = "main_utility"
# go back to entering the new password as long as you haven't tried
# too many times
elif user.password_tries < universe.categories[
- u"internal"
+ "internal"
][
- u"limits"
+ "limits"
].getint(
- u"password_tries"
+ "password_tries"
) - 1:
user.password_tries += 1
- user.error = u"differs"
- user.state = u"entering_new_password"
+ user.error = "differs"
+ user.state = "entering_new_password"
# otherwise, sayonara
else:
user.send(
- u"$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)"
+ "$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)"
)
user.account.destroy()
- user.state = u"disconnecting"
+ user.state = "disconnecting"
def handler_active(user):
- u"""Handle input for active users."""
+ """Handle input for active users."""
# get the next waiting line of input
input_data = user.input_queue.pop(0)
# split out the command and parameters
actor = user.avatar
- mode = actor.get(u"mode")
- if mode and input_data.startswith(u"!"):
+ mode = actor.get("mode")
+ if mode and input_data.startswith("!"):
command_name, parameters = first_word(input_data[1:])
- elif mode == u"chat":
- command_name = u"say"
+ elif mode == "chat":
+ command_name = "say"
parameters = input_data
else:
command_name, parameters = first_word(input_data)
command_name = command_name.lower()
# the command matches a command word for which we have data
- if command_name in universe.categories[u"command"]:
- command = universe.categories[u"command"][command_name]
+ if command_name in universe.categories["command"]:
+ command = universe.categories["command"][command_name]
else:
command = None
# if it's allowed, do it
if actor.can_run(command):
- exec(command.get(u"action"))
+ exec(command.get("action"))
# otherwise, give an error
elif command_name:
# if no input, just idle back with a prompt
else:
- user.send(u"", just_prompt=True)
+ user.send("", just_prompt=True)
def command_halt(actor, parameters):
- u"""Halt the world."""
+ """Halt the world."""
if actor.owner:
# see if there's a message or use a generic one
if parameters:
- message = u"Halting: " + parameters
+ message = "Halting: " + parameters
else:
- message = u"User " + actor.owner.account.get(
- u"name"
- ) + u" halted the world."
+ message = "User " + actor.owner.account.get(
+ "name"
+ ) + " halted the world."
# let everyone know
broadcast(message, add_prompt=False)
def command_reload(actor):
- u"""Reload all code modules, configs and data."""
+ """Reload all code modules, configs and data."""
if actor.owner:
# let the user know and log
- actor.send(u"Reloading all code modules, configs and data.")
+ actor.send("Reloading all code modules, configs and data.")
log(
- u"User " +
- actor.owner.account.get(u"name") + u" reloaded the world.",
+ "User " +
+ actor.owner.account.get("name") + " reloaded the world.",
8
)
def command_quit(actor):
- u"""Leave the world and go back to the main menu."""
+ """Leave the world and go back to the main menu."""
if actor.owner:
- actor.owner.state = u"main_utility"
+ actor.owner.state = "main_utility"
actor.owner.deactivate_avatar()
def command_help(actor, parameters):
- u"""List available commands and provide help for commands."""
+ """List available commands and provide help for commands."""
# did the user ask for help on a specific command word?
if parameters and actor.owner:
# is the command word one for which we have data?
- if parameters in universe.categories[u"command"]:
- command = universe.categories[u"command"][parameters]
+ if parameters in universe.categories["command"]:
+ command = universe.categories["command"][parameters]
else:
command = None
if actor.can_run(command):
# add a description if provided
- description = command.get(u"description")
+ description = command.get("description")
if not description:
- description = u"(no short description provided)"
- if command.getboolean(u"administrative"):
- output = u"$(red)"
+ description = "(no short description provided)"
+ if command.getboolean("administrative"):
+ output = "$(red)"
else:
- output = u"$(grn)"
- output += parameters + u"$(nrm) - " + description + u"$(eol)$(eol)"
+ output = "$(grn)"
+ output += parameters + "$(nrm) - " + description + "$(eol)$(eol)"
# add the help text if provided
- help_text = command.get(u"help")
+ help_text = command.get("help")
if not help_text:
- help_text = u"No help is provided for this command."
+ help_text = "No help is provided for this command."
output += help_text
# list related commands
- see_also = command.getlist(u"see_also")
+ see_also = command.getlist("see_also")
if see_also:
- really_see_also = u""
+ really_see_also = ""
for item in see_also:
- if item in universe.categories[u"command"]:
- command = universe.categories[u"command"][item]
+ if item in universe.categories["command"]:
+ command = universe.categories["command"][item]
if actor.can_run(command):
if really_see_also:
- really_see_also += u", "
- if command.getboolean(u"administrative"):
- really_see_also += u"$(red)"
+ really_see_also += ", "
+ if command.getboolean("administrative"):
+ really_see_also += "$(red)"
else:
- really_see_also += u"$(grn)"
- really_see_also += item + u"$(nrm)"
+ really_see_also += "$(grn)"
+ really_see_also += item + "$(nrm)"
if really_see_also:
- output += u"$(eol)$(eol)See also: " + really_see_also
+ output += "$(eol)$(eol)See also: " + really_see_also
# no data for the requested command word
else:
- output = u"That is not an available command."
+ output = "That is not an available command."
# no specific command word was indicated
else:
# give a sorted list of commands with descriptions if provided
- output = u"These are the commands available to you:$(eol)$(eol)"
- sorted_commands = universe.categories[u"command"].keys()
+ output = "These are the commands available to you:$(eol)$(eol)"
+ sorted_commands = universe.categories["command"].keys()
sorted_commands.sort()
for item in sorted_commands:
- command = universe.categories[u"command"][item]
+ command = universe.categories["command"][item]
if actor.can_run(command):
- description = command.get(u"description")
+ description = command.get("description")
if not description:
- description = u"(no short description provided)"
- if command.getboolean(u"administrative"):
- output += u" $(red)"
+ description = "(no short description provided)"
+ if command.getboolean("administrative"):
+ output += " $(red)"
else:
- output += u" $(grn)"
- output += item + u"$(nrm) - " + description + u"$(eol)"
- output += u"$(eol)Enter \"help COMMAND\" for help on a command " \
- + u"named \"COMMAND\"."
+ output += " $(grn)"
+ output += item + "$(nrm) - " + description + "$(eol)"
+ output += "$(eol)Enter \"help COMMAND\" for help on a command " \
+ + "named \"COMMAND\"."
# send the accumulated output to the user
actor.send(output)
def command_move(actor, parameters):
- u"""Move the avatar in a given direction."""
- if parameters in universe.contents[actor.get(u"location")].portals():
+ """Move the avatar in a given direction."""
+ if parameters in universe.contents[actor.get("location")].portals():
actor.move_direction(parameters)
else:
- actor.send(u"You cannot go that way.")
+ actor.send("You cannot go that way.")
def command_look(actor, parameters):
- u"""Look around."""
+ """Look around."""
if parameters:
- actor.send(u"You can't look at or in anything yet.")
+ actor.send("You can't look at or in anything yet.")
else:
- actor.look_at(actor.get(u"location"))
+ actor.look_at(actor.get("location"))
def command_say(actor, parameters):
- u"""Speak to others in the same room."""
+ """Speak to others in the same room."""
import unicodedata
# check for replacement macros and escape them
# if the message is wrapped in quotes, remove them and leave contents
# intact
- if parameters.startswith(u"\"") and parameters.endswith(u"\""):
+ if parameters.startswith("\"") and parameters.endswith("\""):
message = parameters[1:-1]
literal = True
# otherwise, get rid of stray quote marks on the ends of the message
else:
- message = parameters.strip(u"\"'`")
+ message = parameters.strip("\"'`")
literal = False
# the user entered a message
if message:
# match the punctuation used, if any, to an action
- actions = universe.categories[u"internal"][u"language"].getdict(
- u"actions"
+ actions = universe.categories["internal"]["language"].getdict(
+ "actions"
)
default_punctuation = (
- universe.categories[u"internal"][u"language"].get(
- u"default_punctuation"))
- action = u""
+ universe.categories["internal"]["language"].get(
+ "default_punctuation"))
+ action = ""
for mark in actions.keys():
if not literal and message.endswith(mark):
action = actions[mark]
if not action:
action = actions[default_punctuation]
if message and not (
- literal or unicodedata.category(message[-1]) == u"Po"
+ literal or unicodedata.category(message[-1]) == "Po"
):
message += default_punctuation
message = message[0].lower() + message[1:]
# iterate over all words in message, replacing typos
- typos = universe.categories[u"internal"][u"language"].getdict(
- u"typos"
+ typos = universe.categories["internal"]["language"].getdict(
+ "typos"
)
words = message.split()
for index in range(len(words)):
word = words[index]
- while unicodedata.category(word[0]) == u"Po":
+ while unicodedata.category(word[0]) == "Po":
word = word[1:]
- while unicodedata.category(word[-1]) == u"Po":
+ while unicodedata.category(word[-1]) == "Po":
word = word[:-1]
if word in typos.keys():
words[index] = words[index].replace(word, typos[word])
- message = u" ".join(words)
+ message = " ".join(words)
# capitalize the first letter
message = message[0].upper() + message[1:]
# tell the room
if message:
actor.echo_to_location(
- actor.get(u"name") + u" " + action + u"s, \"" + message + u"\""
+ actor.get("name") + " " + action + "s, \"" + message + "\""
)
- actor.send(u"You " + action + u", \"" + message + u"\"")
+ actor.send("You " + action + ", \"" + message + "\"")
# there was no message
else:
- actor.send(u"What do you want to say?")
+ actor.send("What do you want to say?")
def command_chat(actor):
- u"""Toggle chat mode."""
- mode = actor.get(u"mode")
+ """Toggle chat mode."""
+ mode = actor.get("mode")
if not mode:
- actor.set(u"mode", u"chat")
- actor.send(u"Entering chat mode (use $(grn)!chat$(nrm) to exit).")
- elif mode == u"chat":
- actor.remove_facet(u"mode")
- actor.send(u"Exiting chat mode.")
+ actor.set("mode", "chat")
+ actor.send("Entering chat mode (use $(grn)!chat$(nrm) to exit).")
+ elif mode == "chat":
+ actor.remove_facet("mode")
+ actor.send("Exiting chat mode.")
else:
- actor.send(u"Sorry, but you're already busy with something else!")
+ actor.send("Sorry, but you're already busy with something else!")
def command_show(actor, parameters):
- u"""Show program data."""
+ """Show program data."""
import re
- message = u""
+ message = ""
arguments = parameters.split()
if not parameters:
- message = u"What do you want to show?"
- elif arguments[0] == u"time":
- message = universe.categories[u"internal"][u"counters"].get(
- u"elapsed"
- ) + u" increments elapsed since the world was created."
- elif arguments[0] == u"categories":
- message = u"These are the element categories:$(eol)"
+ message = "What do you want to show?"
+ elif arguments[0] == "time":
+ message = universe.categories["internal"]["counters"].get(
+ "elapsed"
+ ) + " increments elapsed since the world was created."
+ elif arguments[0] == "categories":
+ message = "These are the element categories:$(eol)"
categories = universe.categories.keys()
categories.sort()
for category in categories:
- message += u"$(eol) $(grn)" + category + u"$(nrm)"
- elif arguments[0] == u"files":
- message = u"These are the current files containing the universe:$(eol)"
+ message += "$(eol) $(grn)" + category + "$(nrm)"
+ elif arguments[0] == "files":
+ message = "These are the current files containing the universe:$(eol)"
filenames = universe.files.keys()
filenames.sort()
for filename in filenames:
if universe.files[filename].is_writeable():
- status = u"rw"
+ status = "rw"
else:
- status = u"ro"
- message += u"$(eol) $(red)(" + status + u") $(grn)" + filename \
- + u"$(nrm)"
- elif arguments[0] == u"category":
+ status = "ro"
+ message += "$(eol) $(red)(" + status + ") $(grn)" + filename \
+ + "$(nrm)"
+ elif arguments[0] == "category":
if len(arguments) != 2:
- message = u"You must specify one category."
+ message = "You must specify one category."
elif arguments[1] in universe.categories:
- message = u"These are the elements in the \"" + arguments[1] \
- + u"\" category:$(eol)"
+ message = "These are the elements in the \"" + arguments[1] \
+ + "\" category:$(eol)"
elements = [
(
universe.categories[arguments[1]][x].key
]
elements.sort()
for element in elements:
- message += u"$(eol) $(grn)" + element + u"$(nrm)"
+ message += "$(eol) $(grn)" + element + "$(nrm)"
else:
- message = u"Category \"" + arguments[1] + u"\" does not exist."
- elif arguments[0] == u"file":
+ message = "Category \"" + arguments[1] + "\" does not exist."
+ elif arguments[0] == "file":
if len(arguments) != 2:
- message = u"You must specify one file."
+ message = "You must specify one file."
elif arguments[1] in universe.files:
- message = u"These are the elements in the \"" + arguments[1] \
- + u"\" file:$(eol)"
+ message = "These are the elements in the \"" + arguments[1] \
+ + "\" file:$(eol)"
elements = universe.files[arguments[1]].data.sections()
elements.sort()
for element in elements:
- message += u"$(eol) $(grn)" + element + u"$(nrm)"
+ message += "$(eol) $(grn)" + element + "$(nrm)"
else:
- message = u"Category \"" + arguments[1] + u"\" does not exist."
- elif arguments[0] == u"element":
+ message = "Category \"" + arguments[1] + "\" does not exist."
+ elif arguments[0] == "element":
if len(arguments) != 2:
- message = u"You must specify one element."
+ message = "You must specify one element."
elif arguments[1] in universe.contents:
element = universe.contents[arguments[1]]
- message = u"These are the properties of the \"" + arguments[1] \
+ message = "These are the properties of the \"" + arguments[1] \
+ \
- u"\" element (in \"" + \
- element.origin.filename + u"\"):$(eol)"
+ "\" element (in \"" + \
+ element.origin.filename + "\"):$(eol)"
facets = element.facets()
facets.sort()
for facet in facets:
- message += u"$(eol) $(grn)" + facet + u": $(red)" \
- + escape_macros(element.get(facet)) + u"$(nrm)"
+ message += "$(eol) $(grn)" + facet + ": $(red)" \
+ + escape_macros(element.get(facet)) + "$(nrm)"
else:
- message = u"Element \"" + arguments[1] + u"\" does not exist."
- elif arguments[0] == u"result":
+ message = "Element \"" + arguments[1] + "\" does not exist."
+ elif arguments[0] == "result":
if len(arguments) < 2:
- message = u"You need to specify an expression."
+ message = "You need to specify an expression."
else:
try:
- message = repr(eval(u" ".join(arguments[1:])))
+ message = repr(eval(" ".join(arguments[1:])))
except:
- message = u"Your expression raised an exception!"
- elif arguments[0] == u"log":
+ message = "Your expression raised an exception!"
+ elif arguments[0] == "log":
if len(arguments) == 4:
- if re.match(u"^\d+$", arguments[3]) and int(arguments[3]) >= 0:
+ if re.match("^\d+$", arguments[3]) and int(arguments[3]) >= 0:
stop = int(arguments[3])
else:
stop = -1
else:
stop = 0
if len(arguments) >= 3:
- if re.match(u"^\d+$", arguments[2]) and int(arguments[2]) > 0:
+ if re.match("^\d+$", arguments[2]) and int(arguments[2]) > 0:
start = int(arguments[2])
else:
start = -1
else:
start = 10
if len(arguments) >= 2:
- if (re.match(u"^\d+$", arguments[1])
+ if (re.match("^\d+$", arguments[1])
and 0 <= int(arguments[1]) <= 9):
level = int(arguments[1])
else:
level = -1
- elif 0 <= actor.owner.account.getint(u"loglevel") <= 9:
- level = actor.owner.account.getint(u"loglevel")
+ elif 0 <= actor.owner.account.getint("loglevel") <= 9:
+ level = actor.owner.account.getint("loglevel")
else:
level = 1
if level > -1 and start > -1 and stop > -1:
message = get_loglines(level, start, stop)
else:
- message = u"When specified, level must be 0-9 (default 1), " \
- + u"start and stop must be >=1 (default 10 and 1)."
+ message = "When specified, level must be 0-9 (default 1), " \
+ + "start and stop must be >=1 (default 10 and 1)."
else:
- message = u"I don't know what \"" + parameters + u"\" is."
+ message = "I don't know what \"" + parameters + "\" is."
actor.send(message)
def command_create(actor, parameters):
- u"""Create an element if it does not exist."""
+ """Create an element if it does not exist."""
if not parameters:
- message = u"You must at least specify an element to create."
+ message = "You must at least specify an element to create."
elif not actor.owner:
- message = u""
+ message = ""
else:
arguments = parameters.split()
if len(arguments) == 1:
- arguments.append(u"")
+ arguments.append("")
if len(arguments) == 2:
element, filename = arguments
if element in universe.contents:
- message = u"The \"" + element + u"\" element already exists."
+ message = "The \"" + element + "\" element already exists."
else:
- message = u"You create \"" + \
- element + u"\" within the universe."
+ message = "You create \"" + \
+ element + "\" within the universe."
logline = actor.owner.account.get(
- u"name"
- ) + u" created an element: " + element
+ "name"
+ ) + " created an element: " + element
if filename:
- logline += u" in file " + filename
+ logline += " in file " + filename
if filename not in universe.files:
- message += u" Warning: \"" + filename \
- + u"\" is not yet included in any other file and will " \
+ message += " Warning: \"" + filename \
+ + "\" is not yet included in any other file and will " \
+ \
- u"not be read on startup unless this is remedied."
+ "not be read on startup unless this is remedied."
Element(element, universe, filename)
log(logline, 6)
elif len(arguments) > 2:
- message = u"You can only specify an element and a filename."
+ message = "You can only specify an element and a filename."
actor.send(message)
def command_destroy(actor, parameters):
- u"""Destroy an element if it exists."""
+ """Destroy an element if it exists."""
if actor.owner:
if not parameters:
- message = u"You must specify an element to destroy."
+ message = "You must specify an element to destroy."
else:
if parameters not in universe.contents:
- message = u"The \"" + parameters + \
- u"\" element does not exist."
+ message = "The \"" + parameters + \
+ "\" element does not exist."
else:
universe.contents[parameters].destroy()
- message = u"You destroy \"" + parameters \
- + u"\" within the universe."
+ message = "You destroy \"" + parameters \
+ + "\" within the universe."
log(
actor.owner.account.get(
- u"name"
- ) + u" destroyed an element: " + parameters,
+ "name"
+ ) + " destroyed an element: " + parameters,
6
)
actor.send(message)
def command_set(actor, parameters):
- u"""Set a facet of an element."""
+ """Set a facet of an element."""
if not parameters:
- message = u"You must specify an element, a facet and a value."
+ message = "You must specify an element, a facet and a value."
else:
- arguments = parameters.split(u" ", 2)
+ arguments = parameters.split(" ", 2)
if len(arguments) == 1:
- message = u"What facet of element \"" + arguments[0] \
- + u"\" would you like to set?"
+ message = "What facet of element \"" + arguments[0] \
+ + "\" would you like to set?"
elif len(arguments) == 2:
- message = u"What value would you like to set for the \"" \
- + arguments[1] + u"\" facet of the \"" + arguments[0] \
- + u"\" element?"
+ message = "What value would you like to set for the \"" \
+ + arguments[1] + "\" facet of the \"" + arguments[0] \
+ + "\" element?"
else:
element, facet, value = arguments
if element not in universe.contents:
- message = u"The \"" + element + u"\" element does not exist."
+ message = "The \"" + element + "\" element does not exist."
else:
universe.contents[element].set(facet, value)
- message = u"You have successfully (re)set the \"" + facet \
- + u"\" facet of element \"" + element \
- + u"\". Try \"show element " + \
- element + u"\" for verification."
+ message = "You have successfully (re)set the \"" + facet \
+ + "\" facet of element \"" + element \
+ + "\". Try \"show element " + \
+ element + "\" for verification."
actor.send(message)
def command_delete(actor, parameters):
- u"""Delete a facet from an element."""
+ """Delete a facet from an element."""
if not parameters:
- message = u"You must specify an element and a facet."
+ message = "You must specify an element and a facet."
else:
- arguments = parameters.split(u" ")
+ arguments = parameters.split(" ")
if len(arguments) == 1:
- message = u"What facet of element \"" + arguments[0] \
- + u"\" would you like to delete?"
+ message = "What facet of element \"" + arguments[0] \
+ + "\" would you like to delete?"
elif len(arguments) != 2:
- message = u"You may only specify an element and a facet."
+ message = "You may only specify an element and a facet."
else:
element, facet = arguments
if element not in universe.contents:
- message = u"The \"" + element + u"\" element does not exist."
+ message = "The \"" + element + "\" element does not exist."
elif facet not in universe.contents[element].facets():
- message = u"The \"" + element + u"\" element has no \"" + facet \
- + u"\" facet."
+ message = "The \"" + element + "\" element has no \"" + facet \
+ + "\" facet."
else:
universe.contents[element].remove_facet(facet)
- message = u"You have successfully deleted the \"" + facet \
- + u"\" facet of element \"" + element \
- + u"\". Try \"show element " + \
- element + u"\" for verification."
+ message = "You have successfully deleted the \"" + facet \
+ + "\" facet of element \"" + element \
+ + "\". Try \"show element " + \
+ element + "\" for verification."
actor.send(message)
def command_error(actor, input_data):
- u"""Generic error for an unrecognized command word."""
+ """Generic error for an unrecognized command word."""
import random
# 90% of the time use a generic error
if random.randrange(10):
- message = u"I'm not sure what \"" + input_data + u"\" means..."
+ message = "I'm not sure what \"" + input_data + "\" means..."
# 10% of the time use the classic diku error
else:
- message = u"Arglebargle, glop-glyf!?!"
+ message = "Arglebargle, glop-glyf!?!"
# send the error message
actor.send(message)
def daemonize(universe):
- u"""Fork and disassociate from everything."""
+ """Fork and disassociate from everything."""
import codecs
import ctypes
import ctypes.util
import sys
# only if this is what we're configured to do
- if universe.contents[u"internal:process"].getboolean(u"daemon"):
+ if universe.contents["internal:process"].getboolean("daemon"):
# if possible, we want to rename the process to the same as the script
# (these will need to be byte type during 2to3 migration)
old_argv0_size = len(argc.contents.value)
ctypes.memset(argc.contents, 0, len(new_argv) + old_argv0_size)
ctypes.memmove(argc.contents, new_argv, len(new_argv))
- ctypes.CDLL(ctypes.util.find_library(u"c")).prctl(
+ ctypes.CDLL(ctypes.util.find_library("c")).prctl(
15,
new_short_argv0,
0,
try:
# much simpler, since bsd has a libc function call for this
- ctypes.CDLL(ctypes.util.find_library(u"c")).setproctitle(
+ ctypes.CDLL(ctypes.util.find_library("c")).setproctitle(
new_argv
)
except:
# that didn't work either, so just log that we couldn't
- log(u"Failed to rename the interpreter process (cosmetic).")
+ log("Failed to rename the interpreter process (cosmetic).")
# log before we start forking around, so the terminal gets the message
- log(u"Disassociating from the controlling terminal.")
+ log("Disassociating from the controlling terminal.")
# fork off and die, so we free up the controlling terminal
if os.fork():
os._exit(0)
# reset the working directory so we don't needlessly tie up mounts
- os.chdir(u"/")
+ os.chdir("/")
# clear the file creation mask so we can bend it to our will later
os.umask(0)
# redirect stdin/stdout/stderr and close off their former descriptors
for stdpipe in range(3):
os.close(stdpipe)
- sys.stdin = codecs.open(u"/dev/null", u"r", u"utf-8")
- sys.__stdin__ = codecs.open(u"/dev/null", u"r", u"utf-8")
- sys.stdout = codecs.open(u"/dev/null", u"w", u"utf-8")
- sys.stderr = codecs.open(u"/dev/null", u"w", u"utf-8")
- sys.__stdout__ = codecs.open(u"/dev/null", u"w", u"utf-8")
- sys.__stderr__ = codecs.open(u"/dev/null", u"w", u"utf-8")
+ sys.stdin = codecs.open("/dev/null", "r", "utf-8")
+ sys.__stdin__ = codecs.open("/dev/null", "r", "utf-8")
+ sys.stdout = codecs.open("/dev/null", "w", "utf-8")
+ sys.stderr = codecs.open("/dev/null", "w", "utf-8")
+ sys.__stdout__ = codecs.open("/dev/null", "w", "utf-8")
+ sys.__stderr__ = codecs.open("/dev/null", "w", "utf-8")
def create_pidfile(universe):
- u"""Write a file containing the current process ID."""
+ """Write a file containing the current process ID."""
import codecs
import os
import os.path
- pid = unicode(os.getpid())
- log(u"Process ID: " + pid)
- file_name = universe.contents[u"internal:process"].get(u"pidfile")
+ pid = str(os.getpid())
+ log("Process ID: " + pid)
+ file_name = universe.contents["internal:process"].get("pidfile")
if file_name:
if not os.path.isabs(file_name):
file_name = os.path.join(universe.startdir, file_name)
- file_descriptor = codecs.open(file_name, u"w", u"utf-8")
- file_descriptor.write(pid + u"\n")
+ file_descriptor = codecs.open(file_name, "w", "utf-8")
+ file_descriptor.write(pid + "\n")
file_descriptor.flush()
file_descriptor.close()
def remove_pidfile(universe):
- u"""Remove the file containing the current process ID."""
+ """Remove the file containing the current process ID."""
import os
import os.path
- file_name = universe.contents[u"internal:process"].get(u"pidfile")
+ file_name = universe.contents["internal:process"].get("pidfile")
if file_name:
if not os.path.isabs(file_name):
file_name = os.path.join(universe.startdir, file_name)
def excepthook(excepttype, value, tracebackdata):
- u"""Handle uncaught exceptions."""
+ """Handle uncaught exceptions."""
import traceback
# assemble the list of errors into a single string
- message = u"".join(
+ message = "".join(
traceback.format_exception(excepttype, value, tracebackdata)
)
def sighook(what, where):
- u"""Handle external signals."""
+ """Handle external signals."""
import signal
# a generic message
- message = u"Caught signal: "
+ message = "Caught signal: "
# for a hangup signal
if what == signal.SIGHUP:
- message += u"hangup (reloading)"
+ message += "hangup (reloading)"
universe.reload_flag = True
# for a terminate signal
elif what == signal.SIGTERM:
- message += u"terminate (halting)"
+ message += "terminate (halting)"
universe.terminate_flag = True
# catchall for unexpected signals
else:
- message += unicode(what) + u" (unhandled)"
+ message += str(what) + " (unhandled)"
# log what happened
log(message, 8)
def override_excepthook():
- u"""Redefine sys.excepthook with our own."""
+ """Redefine sys.excepthook with our own."""
import sys
sys.excepthook = excepthook
def assign_sighook():
- u"""Assign a customized handler for some signals."""
+ """Assign a customized handler for some signals."""
import signal
signal.signal(signal.SIGHUP, sighook)
signal.signal(signal.SIGTERM, sighook)
if len(sys.argv) > 1:
conffile = sys.argv[1]
else:
- conffile = u""
+ conffile = ""
# the big bang
global universe
universe = Universe(conffile, True)
# log an initial message
- log(u"Started mudpy with command line: " + u" ".join(sys.argv))
+ log("Started mudpy with command line: " + " ".join(sys.argv))
# fork and disassociate
daemonize(universe)
universe.save()
# log a final message
- log(u"Shutting down now.")
+ log("Shutting down now.")
# get rid of the pidfile
remove_pidfile(universe)