X-Git-Url: https://mudpy.org/gitweb?p=mudpy.git;a=blobdiff_plain;f=mudpy%2Fmisc.py;h=6d61085731a8e35778390a0cff63c91055788f2b;hp=f99fac357cc79cbf196fa341581d4edd19401290;hb=8c920b5c4b87fd8b54fd566c462c47f9e7f47693;hpb=f38d6c0f396d2ff3da597b0fbac2cee2891df934 diff --git a/mudpy/misc.py b/mudpy/misc.py index f99fac3..6d61085 100644 --- a/mudpy/misc.py +++ b/mudpy/misc.py @@ -1,6 +1,6 @@ """Miscellaneous functions for the mudpy engine.""" -# Copyright (c) 2004-2019 mudpy authors. Permission to use, copy, +# Copyright (c) 2004-2021 mudpy authors. Permission to use, copy, # modify, and distribute this software is granted under terms # provided in the LICENSE file distributed with this software. @@ -184,27 +184,35 @@ class Element: prepend_padding ) + def is_restricted(self): + """Boolean check whether command is administrative or debugging.""" + return bool(self.get("administrative") or self.get("debugging")) + + def is_admin(self): + """Boolean check whether an actor is controlled by an admin owner.""" + return self.owner and self.owner.is_admin() + def can_run(self, command): """Check if the user can run this command object.""" # has to be in the commands group if command not in self.universe.groups["command"].values(): - result = False + return False + + # debugging commands are not allowed outside debug mode + if command.get("debugging") and not self.universe.debug_mode(): + return False # avatars of administrators can run any command - elif self.owner and self.owner.account.get("administrator"): - result = True + if self.is_admin(): + return True # everyone can run non-administrative commands - elif not command.get("administrative"): - result = True + if not command.is_restricted(): + return True # otherwise the command cannot be run by this actor - else: - result = False - - # pass back the result - return result + return False def update_location(self): """Make sure the location's contents contain this element.""" @@ -397,6 +405,11 @@ class Universe: element.update_location() element.clean_contents() + # warn when debug mode has been engaged + if self.debug_mode(): + pending_loglines.append(( + "WARNING: Unsafe debugging mode is enabled!", 6)) + # done loading, so disallow updating elements from read-only files self.loading = False @@ -406,7 +419,7 @@ class Universe: """Create a new, empty Universe (the Big Bang).""" new_universe = Universe() for attribute in vars(self).keys(): - exec("new_universe." + attribute + " = self." + attribute) + setattr(new_universe, attribute, getattr(self, attribute)) new_universe.reload_flag = False del self return new_universe @@ -423,13 +436,13 @@ class Universe: host = self.contents["mudpy.network"].get("host") port = self.contents["mudpy.network"].get("port") - # if no host was specified, bind to all local addresses (preferring + # if no host was specified, bind to the loopback address (preferring # ipv6) if not host: if socket.has_ipv6: - host = "::" + host = "::1" else: - host = "0.0.0.0" + host = "127.0.0.1" # figure out if this is ipv4 or v6 family = socket.getaddrinfo(host, port)[0][0] @@ -457,10 +470,8 @@ class Universe: self.listening_socket.listen(1) # note that we're now ready for user connections - log( - "Listening for Telnet connections on: " + - host + ":" + str(port) - ) + log("Listening for Telnet connections on %s port %s" % ( + host, str(port))) def get_time(self): """Convenience method to get the elapsed time counter.""" @@ -479,6 +490,10 @@ class Universe: if fallback not in self.files: mudpy.data.Data(fallback, self, flags=flags) + def debug_mode(self): + """Boolean method to indicate whether unsafe debugging is enabled.""" + return self.groups["mudpy"]["limit"].get("debug", False) + class User: @@ -490,6 +505,7 @@ class User: self.address = "" self.authenticated = False self.avatar = None + self.choice = "" self.columns = 79 self.connection = None self.error = "" @@ -502,8 +518,10 @@ class User: self.output_queue = [] self.partial_input = b"" self.password_tries = 0 + self.rows = 23 self.state = "telopt_negotiation" self.telopts = {} + self.ttype = None self.universe = universe def quit(self): @@ -564,7 +582,7 @@ class User: self.remove() # get rid of the old user object - del(self) + del self # create a new user object new_user = User() @@ -628,7 +646,7 @@ class User: # take this one out of the list and delete self.remove() - del(self) + del self return_value = True break @@ -639,14 +657,13 @@ class User: """Flag the user as authenticated and disconnect duplicates.""" if self.state != "authenticated": self.authenticated = True + log("User %s authenticated for account %s." % ( + self, self.account.subkey), 2) if ("mudpy.limit" in universe.contents and self.account.subkey in universe.contents["mudpy.limit"].get("admins")): self.account.set("administrator", True) - log("Administrator %s authenticated." % - self.account.get("name"), 2) - else: - log("User %s authenticated for account %s." % ( - self, self.account.subkey), 2) + log("Account %s is an administrator." % ( + self.account.subkey), 2) def show_menu(self): """Send the user their current menu.""" @@ -807,6 +824,13 @@ class User: else: self.check_idle() + # ask the client for their current terminal type (RFC 1091); it's None + # if it's not been initialized, the empty string if it has but the + # output was indeterminate, "UNKNOWN" if the client specified it has no + # terminal types to supply + if self.ttype is None: + mudpy.telnet.request_ttype(self) + # if output is paused, decrement the counter if self.state == "telopt_negotiation": if self.negotiation_pause: @@ -852,7 +876,7 @@ class User: # check for some input try: raw_input = self.connection.recv(1024) - except (BlockingIOError, OSError): + except OSError: raw_input = b"" # we got something @@ -992,6 +1016,10 @@ class User: avatar, 6) return avatars + def is_admin(self): + """Boolean check whether user's account is an admin.""" + return self.account.get("administrator", False) + def broadcast(message, add_prompt=True): """Send a message to all connected users.""" @@ -1046,9 +1074,10 @@ def log(message, level=0): # display to connected administrators for user in universe.userlist: - if user.state == "active" and user.account.get( - "administrator" - ) and user.account.get("loglevel", 0) <= level: + if ( + user.state == "active" + and user.is_admin() + and user.account.get("loglevel", 0) <= level): # iterate over every line in the message full_message = "" for line in lines: @@ -1061,14 +1090,14 @@ def log(message, level=0): for line in lines: while 0 < len(universe.loglines) >= max_log_lines: del universe.loglines[0] - universe.loglines.append((level, timestamp + " " + line)) + universe.loglines.append((timestamp + " " + line, level)) def get_loglines(level, start, stop): """Return a specific range of loglines filtered by level.""" # filter the log lines - loglines = [x for x in universe.loglines if x[0] >= level] + loglines = [x for x in universe.loglines if x[1] >= level] # we need these in several places total_count = str(len(universe.loglines)) @@ -1077,7 +1106,7 @@ def get_loglines(level, start, stop): # don't proceed if there are no lines if filtered_count: - # can't start before the begining or at the end + # can't start before the beginning or at the end if start > filtered_count: start = filtered_count if start < 1: @@ -1090,11 +1119,10 @@ def get_loglines(level, start, stop): stop = 1 # some preamble - message = "There are " + str(total_count) - message += " log lines in memory and " + str(filtered_count) - message += " at or above level " + str(level) + "." - message += " The matching lines from " + str(stop) + " to " - message += str(start) + " are:$(eol)$(eol)" + message = ( + "There are %s log lines in memory and %s at or above level %s. " + "The matching lines from %s to %s are:$(eol)$(eol)" % + (total_count, filtered_count, level, stop, start)) # add the text from the selected lines if stop > 1: @@ -1102,14 +1130,13 @@ def get_loglines(level, start, stop): else: range_lines = loglines[-start:] for line in range_lines: - message += " (" + str(line[0]) + ") " + line[1].replace( - "$(", "$_(" - ) + "$(eol)" + message += " (%s) %s$(eol)" % ( + line[1], line[0].replace("$(", "$_(")) # there were no lines else: - message = "None of the " + str(total_count) - message += " lines in memory matches your request." + message = "None of the %s lines in memory matches your request." % ( + total_count) # pass it back return message @@ -1130,7 +1157,7 @@ def wrap_ansi_text(text, width): # characters, printable or otherwise abs_pos = 0 - # the current text position relative to the begining of the line, + # the current text position relative to the beginning of the line, # ignoring color escape sequences rel_pos = 0 @@ -1145,7 +1172,7 @@ def wrap_ansi_text(text, width): # normalize any potentially composited unicode before we count it text = unicodedata.normalize("NFKC", text) - # iterate over each character from the begining of the text + # iterate over each character from the beginning of the text for each_character in text: # the current character is the escape character @@ -1187,7 +1214,7 @@ def wrap_ansi_text(text, width): # characters but the space it replaced was only one abs_pos += 1 - # now we're at the begining of a new line, plus the + # now we're at the beginning of a new line, plus the # number of characters wrapped from the previous line rel_pos -= last_rel_whitespace last_rel_whitespace = 0 @@ -1220,7 +1247,9 @@ def weighted_choice(data): expanded.append(key) # return one at random - return random.choice(expanded) + # Allow the random.randrange() call in bandit since it's not used for + # security/cryptographic purposes + return random.choice(expanded) # nosec def random_name(): @@ -1267,7 +1296,9 @@ def random_name(): name = "" # create a name of random length from the syllables - for _syllable in range(random.randrange(2, 6)): + # Allow the random.randrange() call in bandit since it's not used for + # security/cryptographic purposes + for _syllable in range(random.randrange(2, 6)): # nosec name += weighted_choice(syllables) # strip any leading quotemark, capitalize and return the name @@ -1434,9 +1465,12 @@ def reload_data(): """Reload all relevant objects.""" universe.save() old_userlist = universe.userlist[:] + old_loglines = universe.loglines[:] for element in list(universe.contents.values()): element.destroy() - universe.load() + pending_loglines = universe.load() + new_loglines = universe.loglines[:] + universe.loglines = old_loglines + new_loglines + pending_loglines for user in old_userlist: user.reload() @@ -1487,7 +1521,7 @@ def find_command(command_name): else: for candidate in sorted(universe.groups["command"]): if candidate.startswith(command_name) and not universe.groups[ - "command"][candidate].get("administrative"): + "command"][candidate].is_restricted(): # the command matches the start of a command word and is not # restricted to administrators command = universe.groups["command"][candidate] @@ -1584,19 +1618,18 @@ def get_menu_prompt(state): def get_menu_choices(user): """Return a dict of choice:meaning.""" - menu = universe.groups["menu"][user.state] - create_choices = menu.get("create") + state = universe.groups["menu"][user.state] + create_choices = state.get("create") if create_choices: - choices = eval(create_choices) + choices = call_hook_function(create_choices, (user,)) else: choices = {} ignores = [] options = {} creates = {} - for facet in menu.facets(): - if facet.startswith("demand_") and not eval( - universe.groups["menu"][user.state].get(facet) - ): + for facet in state.facets(): + if facet.startswith("demand_") and not call_hook_function( + universe.groups["menu"][user.state].get(facet), (user,)): ignores.append(facet.split("_", 2)[1]) elif facet.startswith("create_"): creates[facet] = facet.split("_", 2)[1] @@ -1604,10 +1637,11 @@ def get_menu_choices(user): options[facet] = facet.split("_", 2)[1] for facet in creates.keys(): if not creates[facet] in ignores: - choices[creates[facet]] = eval(menu.get(facet)) + choices[creates[facet]] = call_hook_function( + state.get(facet), (user,)) for facet in options.keys(): if not options[facet] in ignores: - choices[options[facet]] = menu.get(facet) + choices[options[facet]] = state.get(facet) return choices @@ -1641,12 +1675,12 @@ def get_default_branch(state): return universe.groups["menu"][state].get("branch") -def get_choice_branch(user, choice): +def get_choice_branch(user): """Returns the new state matching the given choice.""" branches = get_menu_branches(user.state) - if choice in branches.keys(): - return branches[choice] - elif choice in user.menu_choices.keys(): + if user.choice in branches.keys(): + return branches[user.choice] + elif user.choice in user.menu_choices.keys(): return get_default_branch(user.state) else: return "" @@ -1668,17 +1702,39 @@ def get_default_action(state): return universe.groups["menu"][state].get("action") -def get_choice_action(user, choice): +def get_choice_action(user): """Run any indicated script for the given choice.""" actions = get_menu_actions(user.state) - if choice in actions.keys(): - return actions[choice] - elif choice in user.menu_choices.keys(): + if user.choice in actions.keys(): + return actions[user.choice] + elif user.choice in user.menu_choices.keys(): return get_default_action(user.state) else: return "" +def call_hook_function(fname, arglist): + """Safely execute named function with supplied arguments, return result.""" + + # all functions relative to mudpy package + function = mudpy + + for component in fname.split("."): + try: + function = getattr(function, component) + except AttributeError: + log('Could not find mudpy.%s() for arguments "%s"' + % (fname, arglist), 7) + function = None + break + if function: + try: + return function(*arglist) + except Exception: + log('Calling mudpy.%s(%s) raised an exception...\n%s' + % (fname, (*arglist,), traceback.format_exc()), 7) + + def handle_user_input(user): """The main handler, branches to a state-specific handler.""" @@ -1688,9 +1744,9 @@ def handle_user_input(user): user.send("", add_prompt=False, prepend_padding=False) # check to make sure the state is expected, then call that handler - if "handler_" + user.state in globals(): - exec("handler_" + user.state + "(user)") - else: + try: + globals()["handler_" + user.state](user) + except KeyError: generic_menu_handler(user) # since we got input, flag that the menu/prompt needs to be redisplayed @@ -1705,16 +1761,18 @@ def generic_menu_handler(user): # get a lower-case representation of the next line of input if user.input_queue: - choice = user.input_queue.pop(0) - if choice: - choice = choice.lower() + user.choice = user.input_queue.pop(0) + if user.choice: + user.choice = user.choice.lower() else: - choice = "" - if not choice: - choice = get_default_menu_choice(user.state) - if choice in user.menu_choices: - exec(get_choice_action(user, choice)) - new_state = get_choice_branch(user, choice) + user.choice = "" + if not user.choice: + user.choice = get_default_menu_choice(user.state) + if user.choice in user.menu_choices: + action = get_choice_action(user) + if action: + call_hook_function(action, (user,)) + new_state = get_choice_branch(user) if new_state: user.state = new_state else: @@ -1889,31 +1947,14 @@ def handler_active(user): command = find_command(command_name) # if it's allowed, do it - ran = False + result = None if actor.can_run(command): - # dereference the relative object path for the requested function - action = mudpy action_fname = command.get("action", command.key) - for component in action_fname.split("."): - try: - action = getattr(action, component) - ran = True - except AttributeError: - log('Could not find action function "%s" for command "%s"' - % (action_fname, command_name)) - action = None - break - if action: - try: - action(actor, parameters) - except Exception: - log('Command string "%s" from user %s raised an ' - 'exception...\n%s' % ( - input_data, actor.owner.account.get("name"), - traceback.format_exc())) + if action_fname: + result = call_hook_function(action_fname, (actor, parameters)) # if the command was not run, give an error - if not ran: + if not result: mudpy.command.error(actor, input_data) # if no input, just idle back with a prompt @@ -2079,8 +2120,9 @@ def setup(): log("Import path: %s" % ", ".join(sys.path), 1) log("Installed dependencies: %s" % universe.versions.dependencies_text, 1) log("Other python packages: %s" % universe.versions.environment_text, 1) - log("Started %s with command line: %s" % ( - universe.versions.version, " ".join(sys.argv)), 1) + log("Running version: %s" % universe.versions.version, 1) + log("Initial directory: %s" % universe.startdir, 1) + log("Command line: %s" % " ".join(sys.argv), 1) # pass the initialized universe back return universe