"""Miscellaneous functions for the mudpy engine."""
-# Copyright (c) 2004-2020 mudpy authors. Permission to use, copy,
+# Copyright (c) 2004-2021 mudpy authors. Permission to use, copy,
# modify, and distribute this software is granted under terms
# provided in the LICENSE file distributed with this software.
element.update_location()
element.clean_contents()
+ # warn when debug mode has been engaged
+ if self.debug_mode():
+ pending_loglines.append((
+ "WARNING: Unsafe debugging mode is enabled!", 6))
+
# done loading, so disallow updating elements from read-only files
self.loading = False
host = self.contents["mudpy.network"].get("host")
port = self.contents["mudpy.network"].get("port")
- # if no host was specified, bind to all local addresses (preferring
+ # if no host was specified, bind to the loopback address (preferring
# ipv6)
if not host:
if socket.has_ipv6:
- host = "::"
+ host = "::1"
else:
- host = "0.0.0.0"
+ host = "127.0.0.1"
# figure out if this is ipv4 or v6
family = socket.getaddrinfo(host, port)[0][0]
def get_time(self):
"""Convenience method to get the elapsed time counter."""
- return self.groups["internal"]["counters"].get("elapsed")
+ try:
+ return self.groups["internal"]["counters"].get("elapsed", 0)
+ except KeyError:
+ return 0
+
+ def set_time(self, elapsed):
+ """Convenience method to set the elapsed time counter."""
+ try:
+ self.groups["internal"]["counters"].set("elapsed", elapsed)
+ except KeyError:
+ # add an element for counters if it doesn't exist
+ Element("internal.counters", universe)
+ self.groups["internal"]["counters"].set("elapsed", elapsed)
def add_group(self, group, fallback=None):
"""Set up group tracking/metadata."""
old_user.connection = self.connection
old_user.last_address = old_user.address
old_user.address = self.address
+ old_user.telopts = self.telopts
+ old_user.adjust_echoing()
# take this one out of the list and delete
self.remove()
if "$_(time)" in prompt:
prompt = prompt.replace(
"$_(time)",
- str(universe.groups["internal"]["counters"].get("elapsed")))
+ str(universe.get_time()))
# Append a single space for clear separation from user input
if prompt[-1] != " ":
for line in lines:
while 0 < len(universe.loglines) >= max_log_lines:
del universe.loglines[0]
- universe.loglines.append((level, timestamp + " " + line))
+ universe.loglines.append((timestamp + " " + line, level))
def get_loglines(level, start, stop):
"""Return a specific range of loglines filtered by level."""
# filter the log lines
- loglines = [x for x in universe.loglines if x[0] >= level]
+ loglines = [x for x in universe.loglines if x[1] >= level]
# we need these in several places
total_count = str(len(universe.loglines))
stop = 1
# some preamble
- message = "There are " + str(total_count)
- message += " log lines in memory and " + str(filtered_count)
- message += " at or above level " + str(level) + "."
- message += " The matching lines from " + str(stop) + " to "
- message += str(start) + " are:$(eol)$(eol)"
+ message = (
+ "There are %s log lines in memory and %s at or above level %s. "
+ "The matching lines from %s to %s are:$(eol)$(eol)" %
+ (total_count, filtered_count, level, stop, start))
# add the text from the selected lines
if stop > 1:
else:
range_lines = loglines[-start:]
for line in range_lines:
- message += " (" + str(line[0]) + ") " + line[1].replace(
- "$(", "$_("
- ) + "$(eol)"
+ message += " (%s) %s$(eol)" % (
+ line[1], line[0].replace("$(", "$_("))
# there were no lines
else:
- message = "None of the " + str(total_count)
- message += " lines in memory matches your request."
+ message = "None of the %s lines in memory matches your request." % (
+ total_count)
# pass it back
return message
elif macro.startswith("inc:"):
incfile = mudpy.data.find_file(macro[4:], universe=universe)
if os.path.exists(incfile):
- incfd = codecs.open(incfile, "r", "utf-8")
replacement = ""
- for line in incfd:
- if line.endswith("\n") and not line.endswith("\r\n"):
- line = line.replace("\n", "\r\n")
- replacement += line
- # lose the trailing eol
- replacement = replacement[:-2]
+ with codecs.open(incfile, "r", "utf-8") as incfd:
+ for line in incfd:
+ if line.endswith("\n") and not line.endswith("\r\n"):
+ line = line.replace("\n", "\r\n")
+ replacement += line
+ # lose the trailing eol
+ replacement = replacement[:-2]
else:
replacement = ""
log("Couldn't read included " + incfile + " file.", 7)
def on_pulse():
"""The things which should happen on each pulse, aside from reloads."""
- # open the listening socket if it hasn't been already
- if not hasattr(universe, "listening_socket"):
- universe.initialize_server_socket()
-
- # assign a user if a new connection is waiting
- user = check_for_connection(universe.listening_socket)
- if user:
- universe.userlist.append(user)
-
- # iterate over the connected users
- for user in universe.userlist:
- user.pulse()
-
- # add an element for counters if it doesn't exist
- if "counters" not in universe.groups.get("internal", {}):
- Element("internal.counters", universe)
+ # increase the elapsed increment counter
+ universe.set_time(universe.get_time() + 1)
# update the log every now and then
if not universe.groups["internal"]["counters"].get("mark"):
) - 1
)
+ # open the listening socket if it hasn't been already
+ if not hasattr(universe, "listening_socket"):
+ universe.initialize_server_socket()
+
+ # assign a user if a new connection is waiting
+ user = check_for_connection(universe.listening_socket)
+ if user:
+ universe.userlist.append(user)
+
+ # iterate over the connected users
+ for user in universe.userlist:
+ user.pulse()
+
# pause for a configurable amount of time (decimal seconds)
time.sleep(universe.contents["mudpy.timing"].get("increment"))
- # increase the elapsed increment counter
- universe.groups["internal"]["counters"].set(
- "elapsed", universe.groups["internal"]["counters"].get(
- "elapsed", 0
- ) + 1
- )
-
def reload_data():
"""Reload all relevant objects."""
old_loglines = universe.loglines[:]
for element in list(universe.contents.values()):
element.destroy()
- universe.load()
+ pending_loglines = universe.load()
new_loglines = universe.loglines[:]
- universe.loglines = old_loglines + new_loglines
+ universe.loglines = old_loglines + new_loglines + pending_loglines
for user in old_userlist:
user.reload()
log("Running version: %s" % universe.versions.version, 1)
log("Initial directory: %s" % universe.startdir, 1)
log("Command line: %s" % " ".join(sys.argv), 1)
- if universe.debug_mode():
- log("WARNING: Unsafe debugging mode is enabled!", 6)
# pass the initialized universe back
return universe