-# -*- coding: utf-8 -*-
"""Miscellaneous functions for the mudpy engine."""
-# Copyright (c) 2004-2014 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2004-2016 Jeremy Stanley <fungi@yuggoth.org>. Permission
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
"""An element of the universe."""
- def __init__(self, key, universe, filename=None):
+ def __init__(self, key, universe, filename=None, old_style=False):
"""Set up a new element."""
+ # TODO(fungi): This can be removed after the transition is complete
+ self.old_style = old_style
+
# keep track of our key name
self.key = key
self.universe = universe
# clone attributes if this is replacing another element
- if self.key in self.universe.contents:
+ if self.old_style and self.key in self.universe.contents:
old_element = self.universe.contents[self.key]
for attribute in vars(old_element).keys():
exec("self." + attribute + " = old_element." + attribute)
# i guess this is a new element then
else:
+ # set of facet keys from the universe
+ self.facethash = dict()
+
# not owned by a user by default (used for avatars)
self.owner = None
self.origin = self.universe.files[filename]
# add a data section to the origin if necessary
- if not self.origin.data.has_section(self.key):
- self.origin.data.add_section(self.key)
+ if self.key not in self.origin.data:
+ self.origin.data[self.key] = {}
# add or replace this element in the universe
self.universe.contents[self.key] = self
def reload(self):
"""Create a new element and replace this one."""
- Element(self.key, self.universe, self.origin.filename)
+ Element(self.key, self.universe, self.origin.filename,
+ old_style=self.old_style)
del(self)
def destroy(self):
"""Remove an element from the universe and destroy it."""
- self.origin.data.remove_section(self.key)
+ del(self.origin.data[self.key])
del self.universe.categories[self.category][self.subkey]
del self.universe.contents[self.key]
del self
def facets(self):
"""Return a list of non-inherited facets for this element."""
- if self.key in self.origin.data.sections():
- return self.origin.data.options(self.key)
+ if self.old_style:
+ try:
+ return self.origin.data[self.key].keys()
+ except (AttributeError, KeyError):
+ return []
else:
- return []
+ return self.facethash
def has_facet(self, facet):
"""Return whether the non-inherited facet exists."""
def remove_facet(self, facet):
"""Remove a facet from the element."""
if self.has_facet(facet):
- self.origin.data.remove_option(self.key, facet)
+ del(self.origin.data[self.key][facet])
self.origin.modified = True
def ancestry(self):
"""Return a list of the element's inheritance lineage."""
if self.has_facet("inherit"):
- ancestry = self.getlist("inherit")
+ ancestry = self.get("inherit")
+ if not ancestry:
+ ancestry = []
for parent in ancestry[:]:
ancestors = self.universe.contents[parent].ancestry()
for ancestor in ancestors:
"""Retrieve values."""
if default is None:
default = ""
- if self.origin.data.has_option(self.key, facet):
- raw_data = self.origin.data.get(self.key, facet)
- if raw_data.startswith("u\"") or raw_data.startswith("u'"):
- raw_data = raw_data[1:]
- raw_data.strip("\"'")
- return raw_data
- elif self.has_facet("inherit"):
+ try:
+ if self.old_style:
+ return self.origin.data[self.key][facet]
+ else:
+ return self.origin.data[".".join((self.key, facet))]
+ except (KeyError, TypeError):
+ pass
+ if self.has_facet("inherit"):
for ancestor in self.ancestry():
if self.universe.contents[ancestor].has_facet(facet):
return self.universe.contents[ancestor].get(facet)
else:
return default
- def getboolean(self, facet, default=None):
- """Retrieve values as boolean type."""
- if default is None:
- default = False
- if self.origin.data.has_option(self.key, facet):
- return self.origin.data.getboolean(self.key, facet)
- elif self.has_facet("inherit"):
- for ancestor in self.ancestry():
- if self.universe.contents[ancestor].has_facet(facet):
- return self.universe.contents[ancestor].getboolean(facet)
- else:
- return default
-
- def getint(self, facet, default=None):
- """Return values as int type."""
- if default is None:
- default = 0
- if self.origin.data.has_option(self.key, facet):
- return self.origin.data.getint(self.key, facet)
- elif self.has_facet("inherit"):
- for ancestor in self.ancestry():
- if self.universe.contents[ancestor].has_facet(facet):
- return self.universe.contents[ancestor].getint(facet)
- else:
- return default
-
- def getfloat(self, facet, default=None):
- """Return values as float type."""
- if default is None:
- default = 0.0
- if self.origin.data.has_option(self.key, facet):
- return self.origin.data.getfloat(self.key, facet)
- elif self.has_facet("inherit"):
- for ancestor in self.ancestry():
- if self.universe.contents[ancestor].has_facet(facet):
- return self.universe.contents[ancestor].getfloat(facet)
- else:
- return default
-
- def getlist(self, facet, default=None):
- """Return values as list type."""
- if default is None:
- default = []
- value = self.get(facet)
- if value:
- return mudpy.data.makelist(value)
- else:
- return default
-
- def getdict(self, facet, default=None):
- """Return values as dict type."""
- if default is None:
- default = {}
- value = self.get(facet)
- if value:
- return mudpy.data.makedict(value)
- else:
- return default
-
def set(self, facet, value):
"""Set values."""
if not self.has_facet(facet) or not self.get(facet) == value:
- if not type(value) is str:
- value = repr(value)
- self.origin.data.set(self.key, facet, value)
+ if self.old_style:
+ if self.key not in self.origin.data:
+ self.origin.data[self.key] = {}
+ self.origin.data[self.key][facet] = value
+ else:
+ node = ".".join((self.key, facet))
+ self.origin.data[node] = value
+ self.facethash[node] = self.origin.data[node]
self.origin.modified = True
def append(self, facet, value):
"""Append value to a list."""
- if not type(value) is str:
- value = repr(value)
- newlist = self.getlist(facet)
+ newlist = self.get(facet)
+ if not newlist:
+ newlist = []
+ if type(newlist) is not list:
+ newlist = list(newlist)
newlist.append(value)
self.set(facet, newlist)
result = False
# avatars of administrators can run any command
- elif self.owner and self.owner.account.getboolean("administrator"):
+ elif self.owner and self.owner.account.get("administrator"):
result = True
# everyone can run non-administrative commands
- elif not command.getboolean("administrative"):
+ elif not command.get("administrative"):
result = True
# otherwise the command cannot be run by this actor
def update_location(self):
"""Make sure the location's contents contain this element."""
- location = self.get("location")
- if location in self.universe.contents:
- self.universe.contents[location].contents[self.key] = self
+ area = self.get("location")
+ if area in self.universe.contents:
+ self.universe.contents[area].contents[self.key] = self
def clean_contents(self):
"""Make sure the element's contents aren't bogus."""
if element.get("location") != self.key:
del self.contents[element.key]
- def go_to(self, location):
- """Relocate the element to a specific location."""
+ def go_to(self, area):
+ """Relocate the element to a specific area."""
current = self.get("location")
if current and self.key in self.universe.contents[current].contents:
del universe.contents[current].contents[self.key]
- if location in self.universe.contents:
- self.set("location", location)
- self.universe.contents[location].contents[self.key] = self
- self.look_at(location)
+ if area in self.universe.contents:
+ self.set("location", area)
+ self.universe.contents[area].contents[self.key] = self
+ self.look_at(area)
def go_home(self):
"""Relocate the element to its default location."""
"internal"
][
"directions"
- ].getdict(
+ ].get(
direction
)[
"exit"
"internal"
][
"directions"
- ].getdict(
+ ].get(
direction
)[
"exit"
"internal"
][
"directions"
- ].getdict(
+ ].get(
direction
)[
"enter"
for element in self.universe.contents[
self.get("location")
].contents.values():
- if element.getboolean("is_actor") and element is not self:
+ if element.get("is_actor") and element is not self:
message += "$(yel)" + element.get(
"name"
) + " is here.$(nrm)$(eol)"
self.send(message)
def portals(self):
- """Map the portal directions for a room to neighbors."""
+ """Map the portal directions for an area to neighbors."""
portals = {}
- if re.match("""^location:-?\d+,-?\d+,-?\d+$""", self.key):
+ if re.match("""^area:-?\d+,-?\d+,-?\d+$""", self.key):
coordinates = [(int(x))
for x in self.key.split(":")[1].split(",")]
directions = self.universe.categories["internal"]["directions"]
offsets = dict(
[
(
- x, directions.getdict(x)["vector"]
+ x, directions.get(x)["vector"]
) for x in directions.facets()
]
)
- for portal in self.getlist("gridlinks"):
+ for portal in self.get("gridlinks"):
adjacent = map(lambda c, o: c + o,
coordinates, offsets[portal])
- neighbor = "location:" + ",".join(
+ neighbor = "area:" + ",".join(
[(str(x)) for x in adjacent]
)
if neighbor in self.universe.contents:
self.loglines = []
self.private_files = []
self.reload_flag = False
+ self.setup_loglines = []
self.startdir = os.getcwd()
self.terminate_flag = False
self.userlist = []
if not filename:
possible_filenames = [
- ".mudpyrc",
- ".mudpy/mudpyrc",
- ".mudpy/mudpy.conf",
- "mudpy.conf",
- "etc/mudpy.conf",
- "/usr/local/mudpy/mudpy.conf",
- "/usr/local/mudpy/etc/mudpy.conf",
- "/etc/mudpy/mudpy.conf",
- "/etc/mudpy.conf"
+ "etc/mudpy.yaml",
+ "/usr/local/mudpy/etc/mudpy.yaml",
+ "/usr/local/etc/mudpy.yaml",
+ "/etc/mudpy/mudpy.yaml",
+ "/etc/mudpy.yaml"
]
for filename in possible_filenames:
if os.access(filename, os.R_OK):
filename = os.path.join(self.startdir, filename)
self.filename = filename
if load:
- self.load()
+ # make sure to preserve any accumulated log entries during load
+ self.setup_loglines += self.load()
def load(self):
"""Load universe data from persistent storage."""
+ # it's possible for this to enter before logging configuration is read
+ pending_loglines = []
+
# the files dict must exist and filename needs to be read-only
if not hasattr(
self, "files"
# make a list of inactive avatars
inactive_avatars = []
for account in self.categories["account"].values():
- inactive_avatars += [
- (self.contents[x]) for x in account.getlist("avatars")
- ]
+ for avatar in account.get("avatars"):
+ try:
+ inactive_avatars.append(self.contents[avatar])
+ except KeyError:
+ pending_loglines.append((
+ "Missing avatar \"%s\", possible data corruption" %
+ avatar, 6))
for user in self.userlist:
if user.avatar in inactive_avatars:
inactive_avatars.remove(user.avatar)
# go through all elements to clear out inactive avatar locations
for element in self.contents.values():
- location = element.get("location")
- if element in inactive_avatars and location:
- if location in self.contents and element.key in self.contents[
- location
+ area = element.get("location")
+ if element in inactive_avatars and area:
+ if area in self.contents and element.key in self.contents[
+ area
].contents:
- del self.contents[location].contents[element.key]
- element.set("default_location", location)
+ del self.contents[area].contents[element.key]
+ element.set("default_location", area)
element.remove_facet("location")
# another pass to straighten out all the element contents
for element in self.contents.values():
element.update_location()
element.clean_contents()
+ return pending_loglines
def new(self):
"""Create a new, empty Universe (the Big Bang)."""
# need to know the local address and port number for the listener
host = self.categories["internal"]["network"].get("host")
- port = self.categories["internal"]["network"].getint("port")
+ port = self.categories["internal"]["network"].get("port")
# if no host was specified, bind to all local addresses (preferring
# ipv6)
def get_time(self):
"""Convenience method to get the elapsed time counter."""
- return self.categories["internal"]["counters"].getint("elapsed")
+ return self.categories["internal"]["counters"].get("elapsed")
class User:
def check_idle(self):
"""Warn or disconnect idle users as appropriate."""
idletime = universe.get_time() - self.last_input
- linkdead_dict = universe.categories["internal"]["time"].getdict(
+ linkdead_dict = universe.categories["internal"]["time"].get(
"linkdead"
)
if self.state in linkdead_dict:
log(logline, 2)
self.state = "disconnecting"
self.menu_seen = False
- idle_dict = universe.categories["internal"]["time"].getdict("idle")
+ idle_dict = universe.categories["internal"]["time"].get("idle")
if self.state in idle_dict:
idle_state = self.state
else:
"internal"
][
"limits"
- ].getlist(
+ ].get(
"default_admins"
):
self.account.set("administrator", "True")
if self.output_queue:
try:
self.connection.send(self.output_queue[0])
- del self.output_queue[0]
except BrokenPipeError:
if self.account and self.account.get("name"):
account = self.account.get("name")
else:
account = "an unknown user"
- log("Broken pipe sending to %s." % account, 7)
self.state = "disconnecting"
+ log("Broken pipe sending to %s." % account, 7)
+ del self.output_queue[0]
def enqueue_input(self):
"""Process and enqueue any new input."""
# check for some input
try:
raw_input = self.connection.recv(1024)
- except:
+ except (BlockingIOError, OSError):
raw_input = b""
# we got something
# log non-printable characters remaining
if mudpy.telnet.is_enabled(self, mudpy.telnet.TELOPT_BINARY,
mudpy.telnet.HIM):
- asciiline = b"".join(
- filter(lambda x: b" " <= x <= b"~", line))
+ asciiline = bytes([x for x in line if 32 <= x <= 126])
if line != asciiline:
logline = "Non-ASCII characters from "
if self.account and self.account.get("name"):
"actor:avatar:" + self.account.get("name") + ":" + str(
counter
),
- universe
+ universe, old_style=True
)
self.avatar.append("inherit", "archetype:avatar")
self.account.append("avatars", self.avatar.key)
if self.avatar is universe.contents[avatar]:
self.avatar = None
universe.contents[avatar].destroy()
- avatars = self.account.getlist("avatars")
+ avatars = self.account.get("avatars")
avatars.remove(avatar)
self.account.set("avatars", avatars)
def activate_avatar_by_index(self, index):
"""Enter the world with a particular indexed avatar."""
self.avatar = universe.contents[
- self.account.getlist("avatars")[index]]
+ self.account.get("avatars")[index]]
self.avatar.owner = self
self.state = "active"
self.avatar.go_home()
def destroy(self):
"""Destroy the user and associated avatars."""
- for avatar in self.account.getlist("avatars"):
+ for avatar in self.account.get("avatars"):
self.delete_avatar(avatar)
self.account.destroy()
def list_avatar_names(self):
"""List names of assigned avatars."""
- return [
- universe.contents[avatar].get(
- "name"
- ) for avatar in self.account.getlist("avatars")
- ]
+ avatars = []
+ for avatar in self.account.get("avatars"):
+ try:
+ avatars.append(universe.contents[avatar].get("name"))
+ except KeyError:
+ log("Missing avatar \"%s\", possible data corruption." %
+ avatar, 6)
+ return avatars
def broadcast(message, add_prompt=True):
# a couple references we need
file_name = universe.categories["internal"]["logging"].get("file")
- max_log_lines = universe.categories["internal"]["logging"].getint(
+ max_log_lines = universe.categories["internal"]["logging"].get(
"max_log_lines"
)
syslog_name = universe.categories["internal"]["logging"].get("syslog")
timestamp = time.asctime()[4:19]
- # turn the message into a list of lines
- lines = filter(
- lambda x: x != "", [(x.rstrip()) for x in message.split("\n")]
- )
+ # turn the message into a list of nonempty lines
+ lines = [x for x in [(x.rstrip()) for x in message.split("\n")] if x != ""]
# send the timestamp and line to a file
if file_name:
file_descriptor.close()
# send the timestamp and line to standard output
- if universe.categories["internal"]["logging"].getboolean("stdout"):
+ if universe.categories["internal"]["logging"].get("stdout"):
for line in lines:
print(timestamp + " " + line)
# display to connected administrators
for user in universe.userlist:
- if user.state == "active" and user.account.getboolean(
+ if user.state == "active" and user.account.get(
"administrator"
- ) and user.account.getint("loglevel") <= level:
+ ) and user.account.get("loglevel", 0) <= level:
# iterate over every line in the message
full_message = ""
for line in lines:
"""Return a specific range of loglines filtered by level."""
# filter the log lines
- loglines = filter(lambda x: x[0] >= level, universe.loglines)
+ loglines = [x for x in universe.loglines if x[0] >= level]
# we need these in several places
total_count = str(len(universe.loglines))
return text
-def escape_macros(text):
+def escape_macros(value):
"""Escapes replacement macros in text."""
- return text.replace("$(", "$_(")
+ if type(value) is str:
+ return value.replace("$(", "$_(")
+ else:
+ return value
def first_word(text, separator=" "):
# add an element for counters if it doesn't exist
if "counters" not in universe.categories["internal"]:
universe.categories["internal"]["counters"] = Element(
- "internal:counters", universe
+ "internal:counters", universe, old_style=True
)
# update the log every now and then
- if not universe.categories["internal"]["counters"].getint("mark"):
+ if not universe.categories["internal"]["counters"].get("mark"):
log(str(len(universe.userlist)) + " connection(s)")
universe.categories["internal"]["counters"].set(
- "mark", universe.categories["internal"]["time"].getint(
+ "mark", universe.categories["internal"]["time"].get(
"frequency_log"
)
)
else:
universe.categories["internal"]["counters"].set(
- "mark", universe.categories["internal"]["counters"].getint(
+ "mark", universe.categories["internal"]["counters"].get(
"mark"
) - 1
)
# periodically save everything
- if not universe.categories["internal"]["counters"].getint("save"):
+ if not universe.categories["internal"]["counters"].get("save"):
universe.save()
universe.categories["internal"]["counters"].set(
- "save", universe.categories["internal"]["time"].getint(
+ "save", universe.categories["internal"]["time"].get(
"frequency_save"
)
)
else:
universe.categories["internal"]["counters"].set(
- "save", universe.categories["internal"]["counters"].getint(
+ "save", universe.categories["internal"]["counters"].get(
"save"
) - 1
)
# pause for a configurable amount of time (decimal seconds)
time.sleep(universe.categories["internal"]
- ["time"].getfloat("increment"))
+ ["time"].get("increment"))
# increase the elapsed increment counter
universe.categories["internal"]["counters"].set(
- "elapsed", universe.categories["internal"]["counters"].getint(
- "elapsed"
+ "elapsed", universe.categories["internal"]["counters"].get(
+ "elapsed", 0
) + 1
)
# try to accept a new connection
try:
connection, address = listening_socket.accept()
- except:
+ except BlockingIOError:
return None
# note that we got one
def menu_echo_on(state):
"""True if echo is on, false if it is off."""
- return universe.categories["menu"][state].getboolean("echo", True)
+ return universe.categories["menu"][state].get("echo", True)
def get_echo_message(state):
# otherwise, this could be a brand new user
else:
- user.account = Element("account:" + name, universe)
+ user.account = Element("account:" + name, universe, old_style=True)
user.account.set("name", name)
log("New user: " + name, 2)
user.state = "checking_new_account_name"
"internal"
][
"limits"
- ].getint(
+ ].get(
"password_tries"
) - 1:
user.password_tries += 1
"internal"
][
"limits"
- ].getint(
+ ].get(
"password_tries"
) - 1:
user.password_tries += 1
"internal"
][
"limits"
- ].getint(
+ ].get(
"password_tries"
) - 1:
user.password_tries += 1
description = command.get("description")
if not description:
description = "(no short description provided)"
- if command.getboolean("administrative"):
+ if command.get("administrative"):
output = "$(red)"
else:
output = "$(grn)"
output += help_text
# list related commands
- see_also = command.getlist("see_also")
+ see_also = command.get("see_also")
if see_also:
really_see_also = ""
for item in see_also:
if actor.can_run(command):
if really_see_also:
really_see_also += ", "
- if command.getboolean("administrative"):
+ if command.get("administrative"):
really_see_also += "$(red)"
else:
really_see_also += "$(grn)"
description = command.get("description")
if not description:
description = "(no short description provided)"
- if command.getboolean("administrative"):
+ if command.get("administrative"):
output += " $(red)"
else:
output += " $(grn)"
def command_say(actor, parameters):
- """Speak to others in the same room."""
+ """Speak to others in the same area."""
# check for replacement macros and escape them
parameters = escape_macros(parameters)
if message:
# match the punctuation used, if any, to an action
- actions = universe.categories["internal"]["language"].getdict(
+ actions = universe.contents["mudpy.linguistic"].get(
"actions"
)
default_punctuation = (
- universe.categories["internal"]["language"].get(
+ universe.contents["mudpy.linguistic"].get(
"default_punctuation"))
action = ""
- for mark in actions.keys():
+
+ # reverse sort punctuation options so the longest match wins
+ for mark in sorted(actions.keys(), reverse=True):
if not literal and message.endswith(mark):
action = actions[mark]
break
message = message[0].lower() + message[1:]
# iterate over all words in message, replacing typos
- typos = universe.categories["internal"]["language"].getdict(
+ typos = universe.contents["mudpy.linguistic"].get(
"typos"
)
words = message.split()
# capitalize the first letter
message = message[0].upper() + message[1:]
- # tell the room
+ # tell the area
if message:
actor.echo_to_location(
actor.get("name") + " " + action + "s, \"" + message + "\""
elif arguments[1] in universe.files:
message = ("These are the elements in the \"" + arguments[1]
+ "\" file:$(eol)")
- elements = universe.files[arguments[1]].data.sections()
+ elements = universe.files[arguments[1]].data.keys()
elements.sort()
for element in elements:
message += "$(eol) $(grn)" + element + "$(nrm)"
+ "\" element (in \"" + element.origin.filename
+ "\"):$(eol)")
facets = element.facets()
- facets.sort()
- for facet in facets:
- message += ("$(eol) $(grn)" + facet + ": $(red)"
- + escape_macros(element.get(facet)) + "$(nrm)")
+ for facet in sorted(facets):
+ if element.old_style:
+ message += ("$(eol) $(grn)%s: $(red)%s$(nrm)" %
+ (facet, escape_macros(element.get(facet))))
+ else:
+ message += ("$(eol) $(grn)%s: $(red)%s$(nrm)" %
+ (facet, str(facets[facet])))
else:
message = "Element \"" + arguments[1] + "\" does not exist."
elif arguments[0] == "result":
else:
try:
message = repr(eval(" ".join(arguments[1:])))
- except:
- message = "Your expression raised an exception!"
+ except Exception as e:
+ message = ("$(red)Your expression raised an exception...$(eol)"
+ "$(eol)$(bld)%s$(nrm)" % e)
elif arguments[0] == "log":
if len(arguments) == 4:
if re.match("^\d+$", arguments[3]) and int(arguments[3]) >= 0:
level = int(arguments[1])
else:
level = -1
- elif 0 <= actor.owner.account.getint("loglevel") <= 9:
- level = actor.owner.account.getint("loglevel")
+ elif 0 <= actor.owner.account.get("loglevel", 0) <= 9:
+ level = actor.owner.account.get("loglevel", 0)
else:
level = 1
if level > -1 and start > -1 and stop > -1:
" Warning: \"" + filename + "\" is not yet "
"included in any other file and will not be read "
"on startup unless this is remedied.")
- Element(element, universe, filename)
+ Element(element, universe, filename, old_style=True)
log(logline, 6)
elif len(arguments) > 2:
message = "You can only specify an element and a filename."
"""Fork and disassociate from everything."""
# only if this is what we're configured to do
- if universe.contents["internal:process"].getboolean("daemon"):
+ if universe.contents["internal:process"].get("daemon"):
# log before we start forking around, so the terminal gets the message
log("Disassociating from the controlling terminal.")
# try to log it, if possible
try:
log(message, 9)
- except:
- pass
-
- # try to write it to stderr, if possible
- try:
- sys.stderr.write(message)
- except:
- pass
+ except Exception as e:
+ # try to write it to stderr, if possible
+ sys.stderr.write(message + "\nException while logging...\n%s" % e)
def sighook(what, where):
global universe
universe = Universe(conffile, True)
+ # report any loglines which accumulated during setup
+ for logline in universe.setup_loglines:
+ log(*logline)
+ universe.setup_loglines = []
+
# log an initial message
log("Started mudpy with command line: " + " ".join(sys.argv))