from random import choice, randrange
from socket import AF_INET, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket
from stat import S_IMODE, ST_MODE
+from syslog import LOG_PID, LOG_INFO, LOG_DAEMON, closelog, openlog, syslog
+from telnetlib import DO, DONT, ECHO, EOR, GA, IAC, LINEMODE, SB, SE, SGA, WILL, WONT
from time import asctime, sleep
class Element:
self.connection = None
self.authenticated = False
self.password_tries = 0
- self.state = "entering_account_name"
+ self.state = "initial"
self.menu_seen = False
self.error = ""
self.input_queue = []
self.output_queue = []
self.partial_input = ""
self.echoing = True
+ self.terminator = IAC+GA
+ self.negotiation_pause = 0
self.avatar = None
self.account = None
"""Send the user their current menu."""
if not self.menu_seen:
self.menu_choices = get_menu_choices(self)
- self.send(get_menu(self.state, self.error, self.echoing, self.menu_choices), "")
+ self.send(get_menu(self.state, self.error, self.echoing, self.terminator, self.menu_choices), "")
self.menu_seen = True
self.error = False
self.adjust_echoing()
"""Remove a user from the list of connected users."""
universe.userlist.remove(self)
- def send(self, output, eol="$(eol)"):
+ def send(self, output, eol="$(eol)", raw=False):
"""Send arbitrary text to a connected user."""
- # only when there is actual output
- #if output:
+ # unless raw mode is on, clean it up all nice and pretty
+ if not raw:
- # start with a newline, append the message, then end
- # with the optional eol string passed to this function
- # and the ansi escape to return to normal text
- output = "\r\n" + output + eol + chr(27) + "[0m"
+ # we'll take out GA or EOR and add them back on the end
+ if output.endswith(IAC+GA) or output.endswith(IAC+EOR):
+ terminate = True
+ output = output[:-2]
+ else: terminate = False
- # find and replace macros in the output
- output = replace_macros(self, output)
+ # start with a newline, append the message, then end
+ # with the optional eol string passed to this function
+ # and the ansi escape to return to normal text
+ output = "\r\n" + output + eol + chr(27) + "[0m"
- # wrap the text at 80 characters
- # TODO: prompt user for preferred wrap width
- output = wrap_ansi_text(output, 80)
+ # find and replace macros in the output
+ output = replace_macros(self, output)
- # drop the formatted output into the output queue
- self.output_queue.append(output)
+ # wrap the text at 80 characters
+ output = wrap_ansi_text(output, 80)
- # try to send the last item in the queue, remove it and
- # flag that menu display is not needed
- try:
- self.connection.send(self.output_queue[0])
- self.output_queue.remove(self.output_queue[0])
- self.menu_seen = False
+ # tack the terminator back on
+ if terminate: output += self.terminator
- # but if we can't, that's okay too
- except:
- pass
+ # drop the output into the user's output queue
+ self.output_queue.append(output)
def pulse(self):
"""All the things to do to the user per increment."""
self.state = "disconnecting"
self.menu_seen = False
+ # if output is paused, decrement the counter
+ if self.state == "initial":
+ if self.negotiation_pause: self.negotiation_pause -= 1
+ else: self.state = "entering_account_name"
+
# show the user a menu as needed
- self.show_menu()
+ else: self.show_menu()
# disconnect users with the appropriate state
- if self.state == "disconnecting":
- self.quit()
+ if self.state == "disconnecting": self.quit()
# the user is unique and not flagged to disconnect
else:
+ # try to send the last item in the queue and remove it
+ if self.output_queue:
+ try:
+ self.connection.send(self.output_queue[0])
+ del self.output_queue[0]
+
+ # but if we can't, that's okay too
+ except:
+ pass
+
# check for input and add it to the queue
self.enqueue_input()
# tack this on to any previous partial
self.partial_input += input_data
+ # reply to and remove any IAC negotiation codes
+ self.negotiate_telnet_options()
+
# separate multiple input lines
new_input_lines = self.partial_input.split("\n")
# iterate over the remaining lines
for line in new_input_lines:
+ # remove a trailing carriage return
+ if line.endswith("\r"): line = line.rstrip("\r")
+
+ # log non-printable characters remaining
+ removed = filter(lambda x: (x < " " or x > "~"), line)
+ if removed:
+ logline = "Non-printable characters from "
+ if self.account and self.account.get("name"): logline += self.account.get("name") + ": "
+ else: logline += "unknown user: "
+ logline += repr(removed)
+ log(logline)
+
# filter out non-printables
- line = filter(lambda x: x>=' ' and x<='~', line)
+ line = filter(lambda x: " " <= x <= "~", line)
# strip off extra whitespace
line = line.strip()
# put on the end of the queue
self.input_queue.append(line)
+ def negotiate_telnet_options(self):
+ """Reply to/remove partial_input telnet negotiation options."""
+
+ # start at the begining of the input
+ position = 0
+
+ # make a local copy to play with
+ text = self.partial_input
+
+ # as long as we haven't checked it all
+ while position < len(text):
+
+ # jump to the first IAC you find
+ position = text.find(IAC, position)
+
+ # if there wasn't an IAC in the input, skip to the end
+ if position < 0: position = len(text)
+
+ # replace a double (literal) IAC if there's an LF later
+ elif len(text) > position+1 and text[position+1] == IAC:
+ if text.find("\n", position) > 0: text = text.replace(IAC+IAC, IAC)
+ else: position += 1
+ position += 1
+
+ # this must be an option negotiation
+ elif len(text) > position+2 and text[position+1] in (DO, DONT, WILL, WONT):
+
+ negotiation = text[position+1:position+3]
+
+ # if we turned echo off, ignore the confirmation
+ if not self.echoing and negotiation == DO+ECHO: pass
+
+ # allow LINEMODE
+ elif negotiation == WILL+LINEMODE: self.send(IAC+DO+LINEMODE, raw=True)
+
+ # if the client likes EOR instead of GA, make a note of it
+ elif negotiation == DO+EOR: self.terminator = IAC+EOR
+ elif negotiation == DONT+EOR and self.terminator == IAC+EOR:
+ self.terminator = IAC+GA
+
+ # if the client doesn't want GA, oblige
+ elif negotiation == DO+SGA and self.terminator == IAC+GA:
+ self.terminator = ""
+ self.send(IAC+WILL+SGA, raw=True)
+
+ # we don't want to allow anything else
+ elif text[position+1] == DO: self.send(IAC+WONT+text[position+2], raw=True)
+ elif text[position+1] == WILL: self.send(IAC+DONT+text[position+2], raw=True)
+
+ # strip the negotiation from the input
+ text = text.replace(text[position:position+3], "")
+
+ # get rid of IAC SB .* IAC SE
+ elif len(text) > position+4 and text[position:position+2] == IAC+SB:
+ end_subnegotiation = text.find(IAC+SE, position)
+ if end_subnegotiation > 0: text = text[:position] + text[end_subnegotiation+2:]
+ else: position += 1
+
+ # otherwise, strip out a two-byte IAC command
+ elif len(text) > position+2: text = text.replace(text[position:position+2], "")
+
+ # and this means we got the begining of an IAC
+ else: position += 1
+
+ # replace the input with our cleaned-up text
+ self.partial_input = text
+
def can_run(self, command):
"""Check if the user can run this command object."""
# send the timestamp and message to standard output
print(timestamp + " " + message)
+ # send the message to the system log
+ openlog("mudpy", LOG_PID, LOG_INFO | LOG_DAEMON)
+ syslog(message)
+ closelog()
+
def wrap_ansi_text(text, width):
"""Wrap text with arbitrary width while ignoring ANSI colors."""
# set the user's ipa from the connection's ipa
user.address = address[0]
+ # let the client know we WILL EOR
+ user.send(IAC+WILL+EOR, raw=True)
+ user.negotiation_pause = 2
+
# return the new user object
return user
-def get_menu(state, error=None, echoing=True, choices={}):
+def get_menu(state, error=None, echoing=True, terminator="", choices=None):
"""Show the correct menu text to a user."""
+ # make sure we don't reuse a mutable sequence by default
+ if choices is None: choices = {}
+
# begin with a telnet echo command sequence if needed
message = get_echo_sequence(state, echoing)
# display a message indicating if echo is off
message += get_echo_message(state)
+ # tack on EOR or GA to indicate the prompt will not be followed by CRLF
+ message += terminator
+
# return the assembly of various strings defined above
return message
return universe.categories["menu"][state].getboolean("echo", True)
def get_echo_sequence(state, echoing):
- """Build the appropriate IAC ECHO sequence as needed."""
+ """Build the appropriate IAC WILL/WONT ECHO sequence as needed."""
# if the user has echo on and the menu specifies it should be turned
# off, send: iac + will + echo + null
- if echoing and not menu_echo_on(state): return chr(255) + chr(251) + chr(1) + chr(0)
+ if echoing and not menu_echo_on(state): return IAC+WILL+ECHO
# if echo is not set to off in the menu and the user curently has echo
# off, send: iac + wont + echo + null
- elif not echoing and menu_echo_on(state): return chr(255) + chr(252) + chr(1) + chr(0)
+ elif not echoing and menu_echo_on(state): return IAC+WONT+ECHO
# default is not to send an echo control sequence at all
else: return ""