X-Git-Url: https://mudpy.org/gitweb?p=mudpy.git;a=blobdiff_plain;f=mudpy%2Fmisc.py;h=f111b9edb76fc24bdca2deca174697e5e94149a3;hp=2cd7992b9fbe1ec0e0ed677fffdc42bdc426f9ae;hb=8e082a95481d8f6f30a29f4a95fdd46d24ed992f;hpb=a657b530f33267e5e1237355db4a79d40c914baf diff --git a/mudpy/misc.py b/mudpy/misc.py index 2cd7992..f111b9e 100644 --- a/mudpy/misc.py +++ b/mudpy/misc.py @@ -1,6 +1,6 @@ """Miscellaneous functions for the mudpy engine.""" -# Copyright (c) 2004-2019 mudpy authors. Permission to use, copy, +# Copyright (c) 2004-2021 mudpy authors. Permission to use, copy, # modify, and distribute this software is granted under terms # provided in the LICENSE file distributed with this software. @@ -184,27 +184,35 @@ class Element: prepend_padding ) + def is_restricted(self): + """Boolean check whether command is administrative or debugging.""" + return bool(self.get("administrative") or self.get("debugging")) + + def is_admin(self): + """Boolean check whether an actor is controlled by an admin owner.""" + return self.owner and self.owner.is_admin() + def can_run(self, command): """Check if the user can run this command object.""" # has to be in the commands group if command not in self.universe.groups["command"].values(): - result = False + return False + + # debugging commands are not allowed outside debug mode + if command.get("debugging") and not self.universe.debug_mode(): + return False # avatars of administrators can run any command - elif self.owner and self.owner.account.get("administrator"): - result = True + if self.is_admin(): + return True # everyone can run non-administrative commands - elif not command.get("administrative"): - result = True + if not command.is_restricted(): + return True # otherwise the command cannot be run by this actor - else: - result = False - - # pass back the result - return result + return False def update_location(self): """Make sure the location's contents contain this element.""" @@ -397,6 +405,11 @@ class Universe: element.update_location() element.clean_contents() + # warn when debug mode has been engaged + if self.debug_mode(): + pending_loglines.append(( + "WARNING: Unsafe debugging mode is enabled!", 6)) + # done loading, so disallow updating elements from read-only files self.loading = False @@ -423,13 +436,13 @@ class Universe: host = self.contents["mudpy.network"].get("host") port = self.contents["mudpy.network"].get("port") - # if no host was specified, bind to all local addresses (preferring + # if no host was specified, bind to the loopback address (preferring # ipv6) if not host: if socket.has_ipv6: - host = "::" + host = "::1" else: - host = "0.0.0.0" + host = "127.0.0.1" # figure out if this is ipv4 or v6 family = socket.getaddrinfo(host, port)[0][0] @@ -462,7 +475,19 @@ class Universe: def get_time(self): """Convenience method to get the elapsed time counter.""" - return self.groups["internal"]["counters"].get("elapsed") + try: + return self.groups["internal"]["counters"].get("elapsed", 0) + except KeyError: + return 0 + + def set_time(self, elapsed): + """Convenience method to set the elapsed time counter.""" + try: + self.groups["internal"]["counters"].set("elapsed", elapsed) + except KeyError: + # add an element for counters if it doesn't exist + Element("internal.counters", universe) + self.groups["internal"]["counters"].set("elapsed", elapsed) def add_group(self, group, fallback=None): """Set up group tracking/metadata.""" @@ -477,6 +502,10 @@ class Universe: if fallback not in self.files: mudpy.data.Data(fallback, self, flags=flags) + def debug_mode(self): + """Boolean method to indicate whether unsafe debugging is enabled.""" + return self.groups["mudpy"]["limit"].get("debug", False) + class User: @@ -501,8 +530,10 @@ class User: self.output_queue = [] self.partial_input = b"" self.password_tries = 0 + self.rows = 23 self.state = "telopt_negotiation" self.telopts = {} + self.ttype = None self.universe = universe def quit(self): @@ -563,7 +594,7 @@ class User: self.remove() # get rid of the old user object - del(self) + del self # create a new user object new_user = User() @@ -624,10 +655,12 @@ class User: old_user.connection = self.connection old_user.last_address = old_user.address old_user.address = self.address + old_user.telopts = self.telopts + old_user.adjust_echoing() # take this one out of the list and delete self.remove() - del(self) + del self return_value = True break @@ -674,7 +707,7 @@ class User: if "$_(time)" in prompt: prompt = prompt.replace( "$_(time)", - str(universe.groups["internal"]["counters"].get("elapsed"))) + str(universe.get_time())) # Append a single space for clear separation from user input if prompt[-1] != " ": @@ -805,6 +838,13 @@ class User: else: self.check_idle() + # ask the client for their current terminal type (RFC 1091); it's None + # if it's not been initialized, the empty string if it has but the + # output was indeterminate, "UNKNOWN" if the client specified it has no + # terminal types to supply + if self.ttype is None: + mudpy.telnet.request_ttype(self) + # if output is paused, decrement the counter if self.state == "telopt_negotiation": if self.negotiation_pause: @@ -850,7 +890,7 @@ class User: # check for some input try: raw_input = self.connection.recv(1024) - except (BlockingIOError, OSError): + except OSError: raw_input = b"" # we got something @@ -990,6 +1030,10 @@ class User: avatar, 6) return avatars + def is_admin(self): + """Boolean check whether user's account is an admin.""" + return self.account.get("administrator", False) + def broadcast(message, add_prompt=True): """Send a message to all connected users.""" @@ -1044,9 +1088,10 @@ def log(message, level=0): # display to connected administrators for user in universe.userlist: - if user.state == "active" and user.account.get( - "administrator" - ) and user.account.get("loglevel", 0) <= level: + if ( + user.state == "active" + and user.is_admin() + and user.account.get("loglevel", 0) <= level): # iterate over every line in the message full_message = "" for line in lines: @@ -1059,14 +1104,14 @@ def log(message, level=0): for line in lines: while 0 < len(universe.loglines) >= max_log_lines: del universe.loglines[0] - universe.loglines.append((level, timestamp + " " + line)) + universe.loglines.append((timestamp + " " + line, level)) def get_loglines(level, start, stop): """Return a specific range of loglines filtered by level.""" # filter the log lines - loglines = [x for x in universe.loglines if x[0] >= level] + loglines = [x for x in universe.loglines if x[1] >= level] # we need these in several places total_count = str(len(universe.loglines)) @@ -1075,7 +1120,7 @@ def get_loglines(level, start, stop): # don't proceed if there are no lines if filtered_count: - # can't start before the begining or at the end + # can't start before the beginning or at the end if start > filtered_count: start = filtered_count if start < 1: @@ -1088,11 +1133,10 @@ def get_loglines(level, start, stop): stop = 1 # some preamble - message = "There are " + str(total_count) - message += " log lines in memory and " + str(filtered_count) - message += " at or above level " + str(level) + "." - message += " The matching lines from " + str(stop) + " to " - message += str(start) + " are:$(eol)$(eol)" + message = ( + "There are %s log lines in memory and %s at or above level %s. " + "The matching lines from %s to %s are:$(eol)$(eol)" % + (total_count, filtered_count, level, stop, start)) # add the text from the selected lines if stop > 1: @@ -1100,14 +1144,13 @@ def get_loglines(level, start, stop): else: range_lines = loglines[-start:] for line in range_lines: - message += " (" + str(line[0]) + ") " + line[1].replace( - "$(", "$_(" - ) + "$(eol)" + message += " (%s) %s$(eol)" % ( + line[1], line[0].replace("$(", "$_(")) # there were no lines else: - message = "None of the " + str(total_count) - message += " lines in memory matches your request." + message = "None of the %s lines in memory matches your request." % ( + total_count) # pass it back return message @@ -1128,7 +1171,7 @@ def wrap_ansi_text(text, width): # characters, printable or otherwise abs_pos = 0 - # the current text position relative to the begining of the line, + # the current text position relative to the beginning of the line, # ignoring color escape sequences rel_pos = 0 @@ -1143,7 +1186,7 @@ def wrap_ansi_text(text, width): # normalize any potentially composited unicode before we count it text = unicodedata.normalize("NFKC", text) - # iterate over each character from the begining of the text + # iterate over each character from the beginning of the text for each_character in text: # the current character is the escape character @@ -1185,7 +1228,7 @@ def wrap_ansi_text(text, width): # characters but the space it replaced was only one abs_pos += 1 - # now we're at the begining of a new line, plus the + # now we're at the beginning of a new line, plus the # number of characters wrapped from the previous line rel_pos -= last_rel_whitespace last_rel_whitespace = 0 @@ -1218,7 +1261,9 @@ def weighted_choice(data): expanded.append(key) # return one at random - return random.choice(expanded) + # Allow the random.randrange() call in bandit since it's not used for + # security/cryptographic purposes + return random.choice(expanded) # nosec def random_name(): @@ -1265,7 +1310,9 @@ def random_name(): name = "" # create a name of random length from the syllables - for _syllable in range(random.randrange(2, 6)): + # Allow the random.randrange() call in bandit since it's not used for + # security/cryptographic purposes + for _syllable in range(random.randrange(2, 6)): # nosec name += weighted_choice(syllables) # strip any leading quotemark, capitalize and return the name @@ -1324,14 +1371,14 @@ def replace_macros(user, text, is_input=False): elif macro.startswith("inc:"): incfile = mudpy.data.find_file(macro[4:], universe=universe) if os.path.exists(incfile): - incfd = codecs.open(incfile, "r", "utf-8") replacement = "" - for line in incfd: - if line.endswith("\n") and not line.endswith("\r\n"): - line = line.replace("\n", "\r\n") - replacement += line - # lose the trailing eol - replacement = replacement[:-2] + with codecs.open(incfile, "r", "utf-8") as incfd: + for line in incfd: + if line.endswith("\n") and not line.endswith("\r\n"): + line = line.replace("\n", "\r\n") + replacement += line + # lose the trailing eol + replacement = replacement[:-2] else: replacement = "" log("Couldn't read included " + incfile + " file.", 7) @@ -1374,22 +1421,8 @@ def first_word(text, separator=" "): def on_pulse(): """The things which should happen on each pulse, aside from reloads.""" - # open the listening socket if it hasn't been already - if not hasattr(universe, "listening_socket"): - universe.initialize_server_socket() - - # assign a user if a new connection is waiting - user = check_for_connection(universe.listening_socket) - if user: - universe.userlist.append(user) - - # iterate over the connected users - for user in universe.userlist: - user.pulse() - - # add an element for counters if it doesn't exist - if "counters" not in universe.groups.get("internal", {}): - Element("internal.counters", universe) + # increase the elapsed increment counter + universe.set_time(universe.get_time() + 1) # update the log every now and then if not universe.groups["internal"]["counters"].get("mark"): @@ -1417,24 +1450,33 @@ def on_pulse(): ) - 1 ) + # open the listening socket if it hasn't been already + if not hasattr(universe, "listening_socket"): + universe.initialize_server_socket() + + # assign a user if a new connection is waiting + user = check_for_connection(universe.listening_socket) + if user: + universe.userlist.append(user) + + # iterate over the connected users + for user in universe.userlist: + user.pulse() + # pause for a configurable amount of time (decimal seconds) time.sleep(universe.contents["mudpy.timing"].get("increment")) - # increase the elapsed increment counter - universe.groups["internal"]["counters"].set( - "elapsed", universe.groups["internal"]["counters"].get( - "elapsed", 0 - ) + 1 - ) - def reload_data(): """Reload all relevant objects.""" universe.save() old_userlist = universe.userlist[:] + old_loglines = universe.loglines[:] for element in list(universe.contents.values()): element.destroy() - universe.load() + pending_loglines = universe.load() + new_loglines = universe.loglines[:] + universe.loglines = old_loglines + new_loglines + pending_loglines for user in old_userlist: user.reload() @@ -1485,7 +1527,7 @@ def find_command(command_name): else: for candidate in sorted(universe.groups["command"]): if candidate.startswith(command_name) and not universe.groups[ - "command"][candidate].get("administrative"): + "command"][candidate].is_restricted(): # the command matches the start of a command word and is not # restricted to administrators command = universe.groups["command"][candidate] @@ -1680,13 +1722,6 @@ def get_choice_action(user): def call_hook_function(fname, arglist): """Safely execute named function with supplied arguments, return result.""" - # strip any explicit leader or parameter - # TODO(fungi) remove this once the menu functions transition is complete - if fname.startswith("mudpy."): - fname = fname[6:] - if fname.endswith("(user)"): - fname = fname[:-6] - # all functions relative to mudpy package function = mudpy @@ -1918,33 +1953,14 @@ def handler_active(user): command = find_command(command_name) # if it's allowed, do it - ran = False + result = None if actor.can_run(command): - # dereference the relative object path for the requested function - # TODO(fungi) use call_hook_function() here instead - action = mudpy action_fname = command.get("action", command.key) - for component in action_fname.split("."): - try: - action = getattr(action, component) - ran = True - except AttributeError: - log('Could not find action function "%s" for command "%s"' - % (action_fname, command_name)) - action = None - break - if action: - try: - action(actor, parameters) - except Exception: - log('Command string "%s" from user %s raised an ' - 'exception...\n%s' % ( - input_data, actor.owner.account.get("name"), - traceback.format_exc())) - mudpy.command.error(actor, input_data) + if action_fname: + result = call_hook_function(action_fname, (actor, parameters)) # if the command was not run, give an error - if not ran: + if not result: mudpy.command.error(actor, input_data) # if no input, just idle back with a prompt