X-Git-Url: https://mudpy.org/gitweb?a=blobdiff_plain;f=lib%2Fmudpy%2Fmisc.py;h=6a1643647481398218a00e966f73105be8af7690;hb=4fe66ea61adda4bb6a15989dbb0a9a70d30b4038;hp=033699c20ac8379e475e904565a70d21d65720a5;hpb=8bf6b7a787510321b75e477079ebf70ac150d853;p=mudpy.git diff --git a/lib/mudpy/misc.py b/lib/mudpy/misc.py index 033699c..6a16436 100644 --- a/lib/mudpy/misc.py +++ b/lib/mudpy/misc.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- u"""Miscellaneous functions for the mudpy engine.""" -# Copyright (c) 2004-2009 Jeremy Stanley . Permission +# Copyright (c) 2004-2010 Jeremy Stanley . Permission # to use, copy, modify, and distribute this software is granted under # terms provided in the LICENSE file distributed with this software. @@ -9,7 +9,7 @@ class Element: u"""An element of the universe.""" def __init__(self, key, universe, filename=None): u"""Set up a new element.""" - import os.path + import data, os.path # keep track of our key name self.key = key @@ -51,7 +51,7 @@ class Element: # add the file if it doesn't exist yet if not filename in self.universe.files: - DataFile(filename, self.universe) + data.DataFile(filename, self.universe) # record or reset a pointer to the origin file self.origin = self.universe.files[filename] @@ -144,15 +144,17 @@ class Element: else: return default def getlist(self, facet, default=None): u"""Return values as list type.""" + import data if default is None: default = [] value = self.get(facet) - if value: return makelist(value) + if value: return data.makelist(value) else: return default def getdict(self, facet, default=None): u"""Return values as dict type.""" + import data if default is None: default = {} value = self.get(facet) - if value: return makedict(value) + if value: return data.makedict(value) else: return default def set(self, facet, value): u"""Set values.""" @@ -185,11 +187,22 @@ class Element: raw=False, flush=False, add_prompt=True, - just_prompt=False + just_prompt=False, + add_terminator=False, + prepend_padding=True ): u"""Convenience method to pass messages to an owner.""" if self.owner: - self.owner.send(message, eol, raw, flush, add_prompt, just_prompt) + self.owner.send( + message, + eol, + raw, + flush, + add_prompt, + just_prompt, + add_terminator, + prepend_padding + ) def can_run(self, command): u"""Check if the user can run this command object.""" @@ -345,155 +358,6 @@ class Element: ].contents.values(): if element is not self: element.send(message) -class DataFile: - u"""A file containing universe elements.""" - def __init__(self, filename, universe): - self.filename = filename - self.universe = universe - self.load() - def load(self): - u"""Read a file and create elements accordingly.""" - import ConfigParser, os, os.path - self.data = ConfigParser.RawConfigParser() - self.modified = False - if os.access(self.filename, os.R_OK): self.data.read(self.filename) - if not hasattr(self.universe, u"files"): self.universe.files = {} - self.universe.files[self.filename] = self - includes = [] - if self.data.has_option(u"__control__", u"include_files"): - for included in makelist( - self.data.get(u"__control__", u"include_files") - ): - included = find_file( - included, - relative=self.filename, - universe=self.universe - ) - if included not in includes: includes.append(included) - if self.data.has_option(u"__control__", u"include_dirs"): - for included in [ os.path.join(x, u"__init__.mpy") for x in makelist( - self.data.get(u"__control__", u"include_dirs") - ) ]: - included = find_file( - included, - relative=self.filename, - universe=self.universe - ) - if included not in includes: includes.append(included) - if self.data.has_option(u"__control__", u"default_files"): - origins = makedict(self.data.get(u"__control__", u"default_files")) - for key in origins.keys(): - origins[key] = find_file( - origins[key], - relative=self.filename, - universe=self.universe - ) - if origins[key] not in includes: includes.append(origins[key]) - self.universe.default_origins[key] = origins[key] - if key not in self.universe.categories: - self.universe.categories[key] = {} - if self.data.has_option(u"__control__", u"private_files"): - for item in makelist(self.data.get(u"__control__", u"private_files")): - item = find_file( - item, - relative=self.filename, - universe=self.universe - ) - if item not in includes: includes.append(item) - if item not in self.universe.private_files: - self.universe.private_files.append(item) - for section in self.data.sections(): - if section != u"__control__": - Element(section, self.universe, self.filename) - for include_file in includes: - if not os.path.isabs(include_file): - include_file = find_file( - include_file, - relative=self.filename, - universe=self.universe - ) - if include_file not in self.universe.files or not self.universe.files[ - include_file - ].is_writeable(): - DataFile(include_file, self.universe) - def save(self): - u"""Write the data, if necessary.""" - import codecs, os, os.path, re, stat - - # when modified, writeable and has content or the file exists - if self.modified and self.is_writeable() and ( - self.data.sections() or os.path.exists(self.filename) - ): - - # make parent directories if necessary - if not os.path.exists(os.path.dirname(self.filename)): - os.makedirs(os.path.dirname(self.filename)) - - # backup the file - if self.data.has_option(u"__control__", u"backup_count"): - max_count = self.data.has_option(u"__control__", u"backup_count") - else: - max_count = universe.categories[u"internal"][u"limits"].getint( - u"default_backup_count" - ) - if os.path.exists(self.filename) and max_count: - backups = [] - for candidate in os.listdir(os.path.dirname(self.filename)): - if re.match( - os.path.basename(self.filename) + u"""\.\d+$""", candidate - ): - backups.append(int(candidate.split(u".")[-1])) - backups.sort() - backups.reverse() - for old_backup in backups: - if old_backup >= max_count-1: - os.remove(self.filename+u"."+unicode(old_backup)) - elif not os.path.exists( - self.filename+u"."+unicode(old_backup+1) - ): - os.rename( - self.filename + u"."+unicode(old_backup), - self.filename + u"."+unicode( old_backup + 1 ) - ) - if not os.path.exists(self.filename+u".0"): - os.rename( self.filename, self.filename + u".0" ) - - # our data file - file_descriptor = codecs.open(self.filename, u"w", u"utf-8") - - # if it's marked private, chmod it appropriately - if self.filename in self.universe.private_files and oct( - stat.S_IMODE( os.stat(self.filename)[stat.ST_MODE] ) - ) != 0600: - os.chmod(self.filename, 0600) - - # write it back sorted, instead of using ConfigParser - sections = self.data.sections() - sections.sort() - for section in sections: - file_descriptor.write(u"[" + section + u"]\n") - options = self.data.options(section) - options.sort() - for option in options: - file_descriptor.write( - option + u" = " + self.data.get(section, option) + u"\n" - ) - file_descriptor.write(u"\n") - - # flush and close the file - file_descriptor.flush() - file_descriptor.close() - - # unset the modified flag - self.modified = False - def is_writeable(self): - u"""Returns True if the __control__ read_only is False.""" - return not self.data.has_option( - u"__control__", u"read_only" - ) or not self.data.getboolean( - u"__control__", u"read_only" - ) - class Universe: u"""The universe.""" @@ -532,6 +396,7 @@ class Universe: def load(self): u"""Load universe data from persistent storage.""" + import data # the files dict must exist and filename needs to be read-only if not hasattr( @@ -549,7 +414,7 @@ class Universe: del self.files[data_filename] # start loading from the initial file - DataFile(self.filename, self) + data.DataFile(self.filename, self) # make a list of inactive avatars inactive_avatars = [] @@ -642,15 +507,13 @@ class User: def __init__(self): u"""Default values for the in-memory user variables.""" + import telnet self.account = None self.address = u"" self.authenticated = False self.avatar = None - self.client_displays_binary = False - self.client_sends_binary = False self.columns = 79 self.connection = None - self.echoing = True self.error = u"" self.input_queue = [] self.last_address = u"" @@ -661,9 +524,8 @@ class User: self.output_queue = [] self.partial_input = "" self.password_tries = 0 - self.received_newline = True self.state = u"initial" - self.terminator = telnet_proto([u"IAC",u"GA"]) + self.telopts = {} def quit(self): u"""Log, close the connection and remove.""" @@ -776,14 +638,6 @@ class User: old_user.connection = self.connection old_user.last_address = old_user.address old_user.address = self.address - old_user.client_displays_binary = self.client_displays_binary - old_user.client_sends_binary = self.client_sends_binary - - # may need to tell the new connection to echo - if old_user.echoing: - old_user.send( - get_echo_sequence(old_user.state, self.echoing), raw=True - ) # take this one out of the list and delete self.remove() @@ -823,8 +677,12 @@ class User: def adjust_echoing(self): u"""Adjust echoing to match state menu requirements.""" - if self.echoing and not menu_echo_on(self.state): self.echoing = False - elif not self.echoing and menu_echo_on(self.state): self.echoing = True + import telnet + if telnet.is_enabled(self, telnet.TELOPT_ECHO, telnet.US): + if menu_echo_on(self.state): + telnet.disable(self, telnet.TELOPT_ECHO, telnet.US) + elif not menu_echo_on(self.state): + telnet.enable(self, telnet.TELOPT_ECHO, telnet.US) def remove(self): u"""Remove a user from the list of connected users.""" @@ -838,9 +696,11 @@ class User: flush=False, add_prompt=True, just_prompt=False, - add_terminator=False + add_terminator=False, + prepend_padding=True ): u"""Send arbitrary text to a connected user.""" + import telnet # unless raw mode is on, clean it up all nice and pretty if not raw: @@ -856,11 +716,10 @@ class User: # start with a newline, append the message, then end # with the optional eol string passed to this function # and the ansi escape to return to normal text - if not just_prompt: - if not self.output_queue or not self.output_queue[-1].endswith( - "\r\n" - ): - output = u"$(eol)$(eol)" + output + if not just_prompt and prepend_padding: + if not self.output_queue \ + or not self.output_queue[-1].endswith("\r\n"): + output = u"$(eol)" + output elif not self.output_queue[-1].endswith( "\r\n\x1b[0m\r\n" ) and not self.output_queue[-1].endswith( @@ -887,18 +746,18 @@ class User: output = wrap_ansi_text(output, wrap) # if supported by the client, encode it utf-8 - if self.client_displays_binary: + if telnet.is_enabled(self, telnet.TELOPT_BINARY, telnet.US): encoded_output = output.encode(u"utf-8") # otherwise just send ascii - else: - encoded_output = output.encode(u"ascii", u"replace") - - # inject appropriate echoing changes - encoded_output += get_echo_sequence(self.state, self.echoing) + else: encoded_output = output.encode(u"ascii", u"replace") # end with a terminator if requested - if add_terminator: encoded_output += self.terminator + if add_prompt or add_terminator: + if telnet.is_enabled(self, telnet.TELOPT_EOR, telnet.US): + encoded_output += telnet.telnet_proto(telnet.IAC, telnet.EOR) + elif not telnet.is_enabled(self, telnet.TELOPT_SGA, telnet.US): + encoded_output += telnet.telnet_proto(telnet.IAC, telnet.GA) # and tack it onto the queue self.output_queue.append(encoded_output) @@ -946,10 +805,6 @@ class User: def flush(self): u"""Try to send the last item in the queue and remove it.""" if self.output_queue: - if self.received_newline: - self.received_newline = False - if self.output_queue[0].startswith("\r\n"): - self.output_queue[0] = self.output_queue[0][2:] try: self.connection.send(self.output_queue[0]) del self.output_queue[0] @@ -966,7 +821,7 @@ class User: def enqueue_input(self): u"""Process and enqueue any new input.""" - import unicodedata + import telnet, unicodedata # check for some input try: @@ -981,7 +836,7 @@ class User: self.partial_input += raw_input # reply to and remove any IAC negotiation codes - self.negotiate_telnet_options() + telnet.negotiate_telnet_options(self) # separate multiple input lines new_input_lines = self.partial_input.split("\n") @@ -1015,9 +870,7 @@ class User: line = "" # log non-printable characters remaining - if not hasattr( - self, u"client_sends_binary" - ) or not self.client_sends_binary: + if telnet.is_enabled(self, telnet.TELOPT_BINARY, telnet.HIM): asciiline = filter(lambda x: " " <= x <= "~", line) if line != asciiline: logline = u"Non-ASCII characters from " @@ -1033,147 +886,6 @@ class User: unicodedata.normalize( u"NFKC", unicode(line, u"utf-8") ) ) - def negotiate_telnet_options(self): - u"""Reply to/remove partial_input telnet negotiation options.""" - - # start at the begining of the input - position = 0 - - # make a local copy to play with - text = self.partial_input - - # as long as we haven't checked it all - while position < len(text): - - # jump to the first IAC you find - position = text.find(telnet_proto([u"IAC"]), position) - - # if there wasn't an IAC in the input, we're done - if position < 0: break - - # replace a double (literal) IAC if there's an LF later - elif len(text) > position+1 and text[position+1] == telnet_proto( - [u"IAC"] - ): - if text.find("\n", position) > 0: - text = text.replace( - telnet_proto([u"IAC",u"IAC"]), telnet_proto([u"IAC"]) - ) - else: position += 1 - position += 1 - - # implement an RFC 1143 option negotiation queue here - elif len(text) > position+2 and text[position+1] in ( - telnet_proto([u"DO",u"DONT",u"WILL",u"WONT"]) - ): - negotiation = text[position+1:position+3] - - # if we turned echo off, ignore the confirmation - if not self.echoing and negotiation == telnet_proto( - [u"DO",u"TELOPT_ECHO"] - ): - self.send( - telnet_proto([u"IAC",u"WILL",u"TELOPT_ECHO"]), raw=True - ) - - # BINARY mode handling for unicode support (RFC 856) - elif negotiation == telnet_proto([u"DO",u"TELOPT_BINARY"]): - self.send( - telnet_proto([u"IAC",u"WILL",u"TELOPT_BINARY"]), raw=True - ) - self.client_displays_binary = True - elif negotiation == telnet_proto([u"DONT",u"TELOPT_BINARY"]): - self.send( - telnet_proto([u"IAC",u"WONT",u"TELOPT_BINARY"]), raw=True - ) - self.client_displays_binary = False - elif negotiation == telnet_proto([u"WILL",u"TELOPT_BINARY"]): - self.send( - telnet_proto([u"IAC",u"DO",u"TELOPT_BINARY"]), raw=True - ) - self.client_sends_binary = True - elif negotiation == telnet_proto([u"WONT",u"TELOPT_BINARY"]): - self.send( - telnet_proto([u"IAC",u"DONT",u"TELOPT_BINARY"]), raw=True - ) - self.client_sends_binary = False - - # allow LINEMODE (RFC 1184) - elif negotiation == telnet_proto([u"WILL",u"TELOPT_LINEMODE"]): - self.send( - telnet_proto([u"IAC",u"DO",u"TELOPT_LINEMODE"]), raw=True - ) - elif negotiation == telnet_proto([u"WONT",u"TELOPT_LINEMODE"]): - self.send( - telnet_proto([u"IAC",u"DONT",u"TELOPT_LINEMODE"]), raw=True - ) - - # allow NAWS (RFC 1073) - elif negotiation == telnet_proto([u"WILL",u"TELOPT_NAWS"]): - self.send( - telnet_proto([u"IAC",u"DO",u"TELOPT_NAWS"]), raw=True - ) - elif negotiation == telnet_proto([u"WONT",u"TELOPT_NAWS"]): - self.send( - telnet_proto([u"IAC",u"DONT",u"TELOPT_NAWS"]), raw=True - ) - - # if the client likes EOR (RFC 885) instead of GA, note it - elif negotiation == telnet_proto([u"DO",u"TELOPT_EOR"]): - self.send( - telnet_proto([u"IAC",u"WILL",u"TELOPT_EOR"]), raw=True - ) - self.terminator = telnet_proto([u"IAC",u"EOR"]) - elif negotiation == telnet_proto([u"DONT",u"TELOPT_EOR"]): - self.send( - telnet_proto([u"IAC",u"WONT",u"TELOPT_EOR"]), raw=True - ) - if self.terminator == telnet_proto([u"IAC",u"EOR"]): - self.terminator = telnet_proto([u"IAC",u"GA"]) - - # if the client doesn't want GA, oblige (RFC 858) - elif negotiation == telnet_proto([u"DO",u"TELOPT_SGA"]): - self.send(telnet_proto([u"IAC",u"WILL",u"TELOPT_SGA"]), - raw=True) - if self.terminator == telnet_proto([u"IAC",u"GA"]): - self.terminator = "" - - # we don't want to allow anything else - elif text[position+1] == telnet_proto([u"DO"]): - self.send( - telnet_proto([u"IAC",u"WONT"])+text[position+2], raw=True - ) - elif text[position+1] == telnet_proto([u"WILL"]): - self.send( - telnet_proto([u"IAC",u"DONT"])+text[position+2], raw=True - ) - - # strip the negotiation from the input - text = text.replace(text[position:position+3], "") - - # subnegotiation options - elif len(text) > position+4 and text[ - position:position+2 - ] == telnet_proto([u"IAC",u"SB"]): - if text[position+2] == telnet_proto([u"TELOPT_NAWS"]): - self.columns = ord(text[position+3])*256+ord(text[position+4]) - end_subnegotiation = text.find( - telnet_proto([u"IAC",u"SE"]), position - ) - if end_subnegotiation > 0: - text = text[:position] + text[end_subnegotiation+2:] - else: position += 1 - - # otherwise, strip out a two-byte IAC command - elif len(text) > position+2: - text = text.replace(text[position:position+2], "") - - # and this means we got the begining of an IAC - else: position += 1 - - # replace the input with our cleaned-up text - self.partial_input = text - def new_avatar(self): u"""Instantiate a new, unconfigured avatar for this user.""" counter = 0 @@ -1235,46 +947,6 @@ class User: ) for avatar in self.account.getlist(u"avatars") ] -def makelist(value): - u"""Turn string into list type.""" - if value[0] + value[-1] == u"[]": return eval(value) - elif value[0] + value[-1] == u"\"\"": return [ value[1:-1] ] - else: return [ value ] - -def makedict(value): - u"""Turn string into dict type.""" - if value[0] + value[-1] == u"{}": return eval(value) - elif value.find(u":") > 0: return eval(u"{" + value + u"}") - else: return { value: None } - -def telnet_proto(arguments): - u"""Return a concatenated series of Telnet protocol commands.""" - - # same names as bsd's arpa/telnet.h (telnetlib's are ambiguous) - telnet_commands = { - u"TELOPT_BINARY": 0, # RFC 856 - u"TELOPT_ECHO": 1, # RFC 857 - u"TELOPT_SGA": 3, # RFC 858 - u"TELOPT_EOR": 25, # RFC 885 - u"TELOPT_NAWS": 31, # RFC 1073 - u"TELOPT_LINEMODE": 34, # RFC 1184 - u"EOR": 239, - u"SE": 240, - u"GA": 249, - u"SB": 250, - u"WILL": 251, - u"WONT": 252, - u"DO": 253, - u"DONT": 254, - u"IAC": 255 - } - - # (this will need to be byte type during 2to3 migration) - command_series = "" - for argument in arguments: - command_series += chr(telnet_commands[argument]) - return command_series - def broadcast(message, add_prompt=True): u"""Send a message to all connected users.""" for each_user in universe.userlist: @@ -1542,7 +1214,7 @@ def random_name(): def replace_macros(user, text, is_input=False): u"""Replaces macros in text output.""" - import codecs, os.path + import codecs, data, os.path # third person pronouns pronouns = { @@ -1590,7 +1262,7 @@ def replace_macros(user, text, is_input=False): # this is how we handle local file inclusion (dangerous!) elif macro.startswith(u"inc:"): - incfile = find_file(macro[4:], universe=universe) + incfile = data.find_file(macro[4:], universe=universe) if os.path.exists(incfile): incfd = codecs.open(incfile, u"r", u"utf-8") replacement = u"" @@ -1699,6 +1371,7 @@ def reload_data(): def check_for_connection(listening_socket): u"""Check for a waiting connection and return a new user object.""" + import telnet # try to accept a new connection try: @@ -1722,7 +1395,7 @@ def check_for_connection(listening_socket): user.address = address[0] # let the client know we WILL EOR (RFC 885) - user.send(telnet_proto([u"IAC",u"WILL",u"TELOPT_EOR"]), raw=True) + telnet.enable(user, telnet.TELOPT_EOR, telnet.US) user.negotiation_pause = 2 # return the new user object @@ -1756,22 +1429,6 @@ def menu_echo_on(state): u"""True if echo is on, false if it is off.""" return universe.categories[u"menu"][state].getboolean(u"echo", True) -def get_echo_sequence(state, echoing): - u"""Build the appropriate IAC WILL/WONT ECHO sequence as needed.""" - - # if the user has echo on and the menu specifies it should be turned - # off, send: iac + will + echo + null - if echoing and not menu_echo_on(state): - return telnet_proto([u"IAC",u"WILL",u"TELOPT_ECHO"]) - - # if echo is not set to off in the menu and the user curently has echo - # off, send: iac + wont + echo + null - elif not echoing and menu_echo_on(state): - return telnet_proto([u"IAC",u"WONT",u"TELOPT_ECHO"]) - - # default is not to send an echo control sequence at all - else: return "" - def get_echo_message(state): u"""Return a message indicating that echo is off.""" if menu_echo_on(state): return u"" @@ -1899,9 +1556,11 @@ def get_choice_action(user, choice): def handle_user_input(user): u"""The main handler, branches to a state-specific handler.""" + import telnet # if the user's client echo is off, send a blank line for aesthetics - if user.echoing: user.received_newline = True + if telnet.is_enabled(user, telnet.TELOPT_ECHO, telnet.US): + user.send(u"", add_prompt=False, prepend_padding=False) # check to make sure the state is expected, then call that handler if u"handler_" + user.state in globals(): @@ -1966,17 +1625,13 @@ def handler_entering_account_name(user): def handler_checking_password(user): u"""Handle the login account password.""" - import md5 + import password # get the next waiting line of input input_data = user.input_queue.pop(0) # does the hashed input equal the stored hash? - if unicode( - md5.new( - ( user.account.get(u"name") + input_data ).encode(u"utf-8") - ).hexdigest() - ) == user.account.get(u"passhash"): + if password.verify( input_data, user.account.get(u"passhash") ): # if so, set the username and load from cold storage if not user.replace_old_connections(): @@ -2003,7 +1658,7 @@ def handler_checking_password(user): def handler_entering_new_password(user): u"""Handle a new password entry.""" - import md5 + import password # get the next waiting line of input input_data = user.input_queue.pop(0) @@ -2019,14 +1674,7 @@ def handler_entering_new_password(user): ): # hash and store it, then move on to verification - user.account.set( - u"passhash", - unicode( - md5.new( - ( user.account.get(u"name") + input_data ).encode(u"utf-8") - ).hexdigest() - ) - ) + user.account.set( u"passhash", password.create(input_data) ) user.state = u"verifying_new_password" # the password was weak, try again if you haven't tried too many times @@ -2050,17 +1698,13 @@ def handler_entering_new_password(user): def handler_verifying_new_password(user): u"""Handle the re-entered new password for verification.""" - import md5 + import password # get the next waiting line of input input_data = user.input_queue.pop(0) # hash the input and match it to storage - if unicode( - md5.new( - ( user.account.get(u"name") + input_data ).encode(u"utf-8") - ).hexdigest() - ) == user.account.get(u"passhash"): + if password.verify( input_data, user.account.get(u"passhash") ): user.authenticate() # the hashes matched, so go active @@ -2537,128 +2181,6 @@ def command_error(actor, input_data): # send the error message actor.send(message) -def find_file( - file_name=None, - root_path=None, - search_path=None, - default_dir=None, - relative=None, - universe=None -): - u"""Return an absolute file path based on configuration.""" - import os, os.path, sys - - # make sure to get rid of any surrounding quotes first thing - if file_name: file_name = file_name.strip(u"\"'") - - # this is all unnecessary if it's already absolute - if file_name and os.path.isabs(file_name): - return os.path.realpath(file_name) - - # when no file name is specified, look for .conf - elif not file_name: file_name = os.path.basename( sys.argv[0] ) + u".conf" - - # if a universe was provided, try to get some defaults from there - if universe: - - if hasattr( - universe, - u"contents" - ) and u"internal:storage" in universe.contents: - storage = universe.categories[u"internal"][u"storage"] - if not root_path: root_path = storage.get(u"root_path").strip("\"'") - if not search_path: search_path = storage.getlist(u"search_path") - if not default_dir: default_dir = storage.get(u"default_dir").strip("\"'") - - # if there's only one file loaded, try to work around a chicken