X-Git-Url: https://mudpy.org/gitweb?p=mudpy.git;a=blobdiff_plain;f=mudpy%2Fmisc.py;h=f111b9edb76fc24bdca2deca174697e5e94149a3;hp=3968039fff7d240a8ca2b05e5aa91bd69984b4bc;hb=8e082a95481d8f6f30a29f4a95fdd46d24ed992f;hpb=2c533eb48672110a04d6c4850d3e103d6294ee8d diff --git a/mudpy/misc.py b/mudpy/misc.py index 3968039..f111b9e 100644 --- a/mudpy/misc.py +++ b/mudpy/misc.py @@ -1,6 +1,6 @@ """Miscellaneous functions for the mudpy engine.""" -# Copyright (c) 2004-2020 mudpy authors. Permission to use, copy, +# Copyright (c) 2004-2021 mudpy authors. Permission to use, copy, # modify, and distribute this software is granted under terms # provided in the LICENSE file distributed with this software. @@ -186,34 +186,33 @@ class Element: def is_restricted(self): """Boolean check whether command is administrative or debugging.""" - return( - self.get("administrative", False) or self.get("debugging", False)) + return bool(self.get("administrative") or self.get("debugging")) def is_admin(self): """Boolean check whether an actor is controlled by an admin owner.""" - return(self.owner and self.owner.is_admin()) + return self.owner and self.owner.is_admin() def can_run(self, command): """Check if the user can run this command object.""" # has to be in the commands group if command not in self.universe.groups["command"].values(): - return(False) + return False # debugging commands are not allowed outside debug mode if command.get("debugging") and not self.universe.debug_mode(): - return(False) + return False # avatars of administrators can run any command if self.is_admin(): - return(True) + return True # everyone can run non-administrative commands if not command.is_restricted(): - return(True) + return True # otherwise the command cannot be run by this actor - return(False) + return False def update_location(self): """Make sure the location's contents contain this element.""" @@ -406,6 +405,11 @@ class Universe: element.update_location() element.clean_contents() + # warn when debug mode has been engaged + if self.debug_mode(): + pending_loglines.append(( + "WARNING: Unsafe debugging mode is enabled!", 6)) + # done loading, so disallow updating elements from read-only files self.loading = False @@ -432,13 +436,13 @@ class Universe: host = self.contents["mudpy.network"].get("host") port = self.contents["mudpy.network"].get("port") - # if no host was specified, bind to all local addresses (preferring + # if no host was specified, bind to the loopback address (preferring # ipv6) if not host: if socket.has_ipv6: - host = "::" + host = "::1" else: - host = "0.0.0.0" + host = "127.0.0.1" # figure out if this is ipv4 or v6 family = socket.getaddrinfo(host, port)[0][0] @@ -471,7 +475,19 @@ class Universe: def get_time(self): """Convenience method to get the elapsed time counter.""" - return self.groups["internal"]["counters"].get("elapsed") + try: + return self.groups["internal"]["counters"].get("elapsed", 0) + except KeyError: + return 0 + + def set_time(self, elapsed): + """Convenience method to set the elapsed time counter.""" + try: + self.groups["internal"]["counters"].set("elapsed", elapsed) + except KeyError: + # add an element for counters if it doesn't exist + Element("internal.counters", universe) + self.groups["internal"]["counters"].set("elapsed", elapsed) def add_group(self, group, fallback=None): """Set up group tracking/metadata.""" @@ -578,7 +594,7 @@ class User: self.remove() # get rid of the old user object - del(self) + del self # create a new user object new_user = User() @@ -639,10 +655,12 @@ class User: old_user.connection = self.connection old_user.last_address = old_user.address old_user.address = self.address + old_user.telopts = self.telopts + old_user.adjust_echoing() # take this one out of the list and delete self.remove() - del(self) + del self return_value = True break @@ -689,7 +707,7 @@ class User: if "$_(time)" in prompt: prompt = prompt.replace( "$_(time)", - str(universe.groups["internal"]["counters"].get("elapsed"))) + str(universe.get_time())) # Append a single space for clear separation from user input if prompt[-1] != " ": @@ -1014,7 +1032,7 @@ class User: def is_admin(self): """Boolean check whether user's account is an admin.""" - return(self.account.get("administrator", False)) + return self.account.get("administrator", False) def broadcast(message, add_prompt=True): @@ -1086,14 +1104,14 @@ def log(message, level=0): for line in lines: while 0 < len(universe.loglines) >= max_log_lines: del universe.loglines[0] - universe.loglines.append((level, timestamp + " " + line)) + universe.loglines.append((timestamp + " " + line, level)) def get_loglines(level, start, stop): """Return a specific range of loglines filtered by level.""" # filter the log lines - loglines = [x for x in universe.loglines if x[0] >= level] + loglines = [x for x in universe.loglines if x[1] >= level] # we need these in several places total_count = str(len(universe.loglines)) @@ -1115,11 +1133,10 @@ def get_loglines(level, start, stop): stop = 1 # some preamble - message = "There are " + str(total_count) - message += " log lines in memory and " + str(filtered_count) - message += " at or above level " + str(level) + "." - message += " The matching lines from " + str(stop) + " to " - message += str(start) + " are:$(eol)$(eol)" + message = ( + "There are %s log lines in memory and %s at or above level %s. " + "The matching lines from %s to %s are:$(eol)$(eol)" % + (total_count, filtered_count, level, stop, start)) # add the text from the selected lines if stop > 1: @@ -1127,14 +1144,13 @@ def get_loglines(level, start, stop): else: range_lines = loglines[-start:] for line in range_lines: - message += " (" + str(line[0]) + ") " + line[1].replace( - "$(", "$_(" - ) + "$(eol)" + message += " (%s) %s$(eol)" % ( + line[1], line[0].replace("$(", "$_(")) # there were no lines else: - message = "None of the " + str(total_count) - message += " lines in memory matches your request." + message = "None of the %s lines in memory matches your request." % ( + total_count) # pass it back return message @@ -1355,14 +1371,14 @@ def replace_macros(user, text, is_input=False): elif macro.startswith("inc:"): incfile = mudpy.data.find_file(macro[4:], universe=universe) if os.path.exists(incfile): - incfd = codecs.open(incfile, "r", "utf-8") replacement = "" - for line in incfd: - if line.endswith("\n") and not line.endswith("\r\n"): - line = line.replace("\n", "\r\n") - replacement += line - # lose the trailing eol - replacement = replacement[:-2] + with codecs.open(incfile, "r", "utf-8") as incfd: + for line in incfd: + if line.endswith("\n") and not line.endswith("\r\n"): + line = line.replace("\n", "\r\n") + replacement += line + # lose the trailing eol + replacement = replacement[:-2] else: replacement = "" log("Couldn't read included " + incfile + " file.", 7) @@ -1405,22 +1421,8 @@ def first_word(text, separator=" "): def on_pulse(): """The things which should happen on each pulse, aside from reloads.""" - # open the listening socket if it hasn't been already - if not hasattr(universe, "listening_socket"): - universe.initialize_server_socket() - - # assign a user if a new connection is waiting - user = check_for_connection(universe.listening_socket) - if user: - universe.userlist.append(user) - - # iterate over the connected users - for user in universe.userlist: - user.pulse() - - # add an element for counters if it doesn't exist - if "counters" not in universe.groups.get("internal", {}): - Element("internal.counters", universe) + # increase the elapsed increment counter + universe.set_time(universe.get_time() + 1) # update the log every now and then if not universe.groups["internal"]["counters"].get("mark"): @@ -1448,16 +1450,22 @@ def on_pulse(): ) - 1 ) + # open the listening socket if it hasn't been already + if not hasattr(universe, "listening_socket"): + universe.initialize_server_socket() + + # assign a user if a new connection is waiting + user = check_for_connection(universe.listening_socket) + if user: + universe.userlist.append(user) + + # iterate over the connected users + for user in universe.userlist: + user.pulse() + # pause for a configurable amount of time (decimal seconds) time.sleep(universe.contents["mudpy.timing"].get("increment")) - # increase the elapsed increment counter - universe.groups["internal"]["counters"].set( - "elapsed", universe.groups["internal"]["counters"].get( - "elapsed", 0 - ) + 1 - ) - def reload_data(): """Reload all relevant objects.""" @@ -1466,9 +1474,9 @@ def reload_data(): old_loglines = universe.loglines[:] for element in list(universe.contents.values()): element.destroy() - universe.load() + pending_loglines = universe.load() new_loglines = universe.loglines[:] - universe.loglines = old_loglines + new_loglines + universe.loglines = old_loglines + new_loglines + pending_loglines for user in old_userlist: user.reload()