X-Git-Url: https://mudpy.org/gitweb?p=mudpy.git;a=blobdiff_plain;f=lib%2Fmudpy%2Ftelnet.py;fp=lib%2Fmudpy%2Ftelnet.py;h=293d997468ec1899648e86f8f67d77179bbb9d5f;hp=0000000000000000000000000000000000000000;hb=0fca7fa3d5ac08111c8850555813d341d699797f;hpb=81fdc8035234f4e62f130297df109b6e3c52a37f diff --git a/lib/mudpy/telnet.py b/lib/mudpy/telnet.py new file mode 100644 index 0000000..293d997 --- /dev/null +++ b/lib/mudpy/telnet.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- +u"""Telnet functions and constants for the mudpy engine.""" + +# Copyright (c) 2004-2010 Jeremy Stanley . Permission +# to use, copy, modify, and distribute this software is granted under +# terms provided in the LICENSE file distributed with this software. + +# telnet options (from bsd's arpa/telnet.h since telnetlib's are ambiguous) +TELOPT_BINARY = 0 # transmit 8-bit data by the receiver (rfc 856) +TELOPT_ECHO = 1 # echo received data back to the sender (rfc 857) +TELOPT_SGA = 3 # suppress transmission of the go ahead character (rfc 858) +#TELOPT_TTYPE = 24 # exchange terminal type information (rfc 1091) +TELOPT_EOR = 25 # transmit end-of-record after transmitting data (rfc 885) +TELOPT_NAWS = 31 # negotiate about window size (rfc 1073) +TELOPT_LINEMODE = 34 # cooked line-by-line input mode (rfc 1184) + +supported = ( + TELOPT_BINARY, + TELOPT_ECHO, + TELOPT_SGA, + TELOPT_EOR, + TELOPT_NAWS, + TELOPT_LINEMODE +) + +# telnet commands +EOR = 239 # end-of-record signal (rfc 885) +SE = 240 # end of subnegotiation parameters (rfc 854) +GA = 249 # go ahead signal (rfc 854) +SB = 250 # what follows is subnegotiation of the indicated option (rfc 854) +WILL = 251 # desire or confirmation of performing an option (rfc 854) +WONT = 252 # refusal or confirmation of performing an option (rfc 854) +DO = 253 # request or confirm performing an option (rfc 854) +DONT = 254 # demand or confirm no longer performing an option (rfc 854) +IAC = 255 # interpret as command escape character (rfc 854) + +# RFC 1143 option negotiation states +NO = 0 # option is disabled +YES = 1 # option is enabled +WANTNO = 2 # demanded disabling option +WANTYES = 3 # requested enabling option +WANTNO_OPPOSITE = 4 # demanded disabling option but queued an enable after it +WANTYES_OPPOSITE = 5 # requested enabling option but queued a disable after it + +# RFC 1143 option negotiation parties +HIM = 0 +US = 1 + +def telnet_proto(*arguments): + u"""Return a concatenated series of Telnet protocol commands.""" + # (this will need to be byte type during 2to3 migration) + return "".join( [chr(x) for x in arguments] ) + +def send_command(user, *command): + u"""Sends a Telnet command string to the specified user's socket.""" + user.send( telnet_proto(IAC, *command), raw=True ) + +def is_enabled(user, telopt, party, state=YES): + u"""Returns True if the indicated Telnet option is enabled, False if not.""" + if (telopt, party) in user.telopts and user.telopts[ + (telopt, party) + ] is state: return True + else: return False + +def enable(user, telopt, party): + u"""Negotiates enabling a Telnet option for the indicated user's socket.""" + if party is HIM: txpos = DO + else: txpos = WILL + if not (telopt, party) in user.telopts or user.telopts[ + (telopt, party) + ] is NO: + user.telopts[ (telopt, party) ] = WANTYES + send_command(user, txpos, telopt) + elif user.telopts[ (telopt, party) ] is WANTNO: + user.telopts[ (telopt, party) ] = WANTNO_OPPOSITE + elif user.telopts[ (telopt, party) ] is WANTYES_OPPOSITE: + user.telopts[ (telopt, party) ] = WANTYES + +def disable(user, telopt, party): + u"""Negotiates disabling a Telnet option for the indicated user's socket.""" + if party is HIM: txneg = DONT + else: txneg = WONT + if not (telopt, party) in user.telopts: user.telopts[ (telopt, party) ] = NO + elif user.telopts[ (telopt, party) ] is YES: + user.telopts[ (telopt, party) ] = WANTNO + send_command(user, txneg, telopt) + elif user.telopts[ (telopt, party) ] is WANTYES: + user.telopts[ (telopt, party) ] = WANTYES_OPPOSITE + elif user.telopts[ (telopt, party) ] is WANTNO_OPPOSITE: + user.telopts[ (telopt, party) ] = WANTNO + +def negotiate_telnet_options(user): + u"""Reply to and remove telnet negotiation options from partial_input.""" + import misc + + # make a local copy to play with + text = user.partial_input + + # start at the begining of the input + position = 0 + + # as long as we haven't checked it all + len_text = len(text) + while position < len_text: + + # jump to the first IAC you find + position = text.find( telnet_proto(IAC), position ) + + # if there wasn't an IAC in the input or it's at the end, we're done + if position < 0 or position >= len_text-1: break + + # the byte following the IAC is our command + # (this will need to be byte type during 2to3 migration) + command = ord( text[position+1] ) + + # replace a double (literal) IAC if there's an LF later + if command is IAC: + if text.find("\n", position) > 0: + position += 1 + text = text[:position] + text[position+1:] + else: position += 2 + + # implement an RFC 1143 option negotiation queue here + elif len_text > position+2 and WILL <= command <= DONT: + # this will need to be byte type during 2to3 migration + telopt = ord( text[position+2] ) + if telopt in supported: + if command <= WONT: + party = HIM + rxpos = WILL + txpos = DO + txneg = DONT + else: + party = US + rxpos = DO + txpos = WILL + txneg = WONT + if (telopt, party) not in user.telopts: + user.telopts[ (telopt, party) ] = NO + if command is rxpos: + if user.telopts[ (telopt, party) ] is NO: + user.telopts[ (telopt, party) ] = YES + send_command(user, txpos, telopt) + elif user.telopts[ (telopt, party) ] is WANTNO: + user.telopts[ (telopt, party) ] = NO + elif user.telopts[ (telopt, party) ] is WANTNO_OPPOSITE: + user.telopts[ (telopt, party) ] = YES + elif user.telopts[ (telopt, party) ] is WANTYES_OPPOSITE: + user.telopts[ (telopt, party) ] = WANTNO + send_command(user, txneg, telopt) + else: user.telopts[ (telopt, party) ] = YES + else: + if user.telopts[ (telopt, party) ] is YES: + user.telopts[ (telopt, party) ] = NO + send_command(user, txneg, telopt) + elif user.telopts[ (telopt, party) ] is WANTNO_OPPOSITE: + user.telopts[ (telopt, party) ] = WANTYES + send_command(user, txpos, telopt) + else: user.telopts[ (telopt, party) ] = NO + elif command is WILL: send_command(user, DONT, telopt ) + else: send_command(user, WONT, telopt) + text = text[:position] + text[position+3:] + + # subnegotiation options + elif len_text > position+4 and command is SB: + # this will need to be byte type during 2to3 migration + telopt = ord( text[position+2] ) + if telopt is TELOPT_NAWS: + # this will need to be byte type during 2to3 migration + user.columns = ord(text[position+3])*256+ord(text[position+4]) + end_subnegotiation = text.find( telnet_proto(IAC, SE), position ) + if end_subnegotiation > 0: + text = text[:position] + text[end_subnegotiation+2:] + else: position += 1 + + # otherwise, strip out a two-byte IAC command + elif len_text > position+2: + misc.log(u"Unknown Telnet IAC command %s ignored." % command) + text = text[:position] + text[position+2:] + + # and this means we got the begining of an IAC + else: position += 1 + + # replace the input with our cleaned-up text + user.partial_input = text