From a9c695831c3cce25df4c487100525c618a926b3b Mon Sep 17 00:00:00 2001 From: Jeremy Stanley Date: Fri, 18 Feb 2011 18:53:14 +0000 Subject: [PATCH] PEP 8 conformance for telnet server library * lib/mudpy/telnet.py: Conform to the PEP 8 style guide. --- lib/mudpy/telnet.py | 312 ++++++++++++++++++++++++++++------------------------ 1 file changed, 167 insertions(+), 145 deletions(-) diff --git a/lib/mudpy/telnet.py b/lib/mudpy/telnet.py index 293d997..043c86b 100644 --- a/lib/mudpy/telnet.py +++ b/lib/mudpy/telnet.py @@ -1,185 +1,207 @@ # -*- coding: utf-8 -*- u"""Telnet functions and constants for the mudpy engine.""" -# Copyright (c) 2004-2010 Jeremy Stanley . Permission +# Copyright (c) 2004-2011 Jeremy Stanley . Permission # to use, copy, modify, and distribute this software is granted under # terms provided in the LICENSE file distributed with this software. # telnet options (from bsd's arpa/telnet.h since telnetlib's are ambiguous) -TELOPT_BINARY = 0 # transmit 8-bit data by the receiver (rfc 856) -TELOPT_ECHO = 1 # echo received data back to the sender (rfc 857) -TELOPT_SGA = 3 # suppress transmission of the go ahead character (rfc 858) -#TELOPT_TTYPE = 24 # exchange terminal type information (rfc 1091) -TELOPT_EOR = 25 # transmit end-of-record after transmitting data (rfc 885) -TELOPT_NAWS = 31 # negotiate about window size (rfc 1073) -TELOPT_LINEMODE = 34 # cooked line-by-line input mode (rfc 1184) +TELOPT_BINARY = 0 # transmit 8-bit data by the receiver (rfc 856) +TELOPT_ECHO = 1 # echo received data back to the sender (rfc 857) +TELOPT_SGA = 3 # suppress transmission of the go ahead character (rfc 858) +# TELOPT_TTYPE = 24 # exchange terminal type information (rfc 1091) +TELOPT_EOR = 25 # transmit end-of-record after transmitting data (rfc 885) +TELOPT_NAWS = 31 # negotiate about window size (rfc 1073) +TELOPT_LINEMODE = 34 # cooked line-by-line input mode (rfc 1184) supported = ( - TELOPT_BINARY, - TELOPT_ECHO, - TELOPT_SGA, - TELOPT_EOR, - TELOPT_NAWS, - TELOPT_LINEMODE + TELOPT_BINARY, + TELOPT_ECHO, + TELOPT_SGA, + TELOPT_EOR, + TELOPT_NAWS, + TELOPT_LINEMODE ) # telnet commands -EOR = 239 # end-of-record signal (rfc 885) -SE = 240 # end of subnegotiation parameters (rfc 854) -GA = 249 # go ahead signal (rfc 854) -SB = 250 # what follows is subnegotiation of the indicated option (rfc 854) -WILL = 251 # desire or confirmation of performing an option (rfc 854) -WONT = 252 # refusal or confirmation of performing an option (rfc 854) -DO = 253 # request or confirm performing an option (rfc 854) -DONT = 254 # demand or confirm no longer performing an option (rfc 854) -IAC = 255 # interpret as command escape character (rfc 854) +EOR = 239 # end-of-record signal (rfc 885) +SE = 240 # end of subnegotiation parameters (rfc 854) +GA = 249 # go ahead signal (rfc 854) +SB = 250 # what follows is subnegotiation of the indicated option (rfc 854) +WILL = 251 # desire or confirmation of performing an option (rfc 854) +WONT = 252 # refusal or confirmation of performing an option (rfc 854) +DO = 253 # request or confirm performing an option (rfc 854) +DONT = 254 # demand or confirm no longer performing an option (rfc 854) +IAC = 255 # interpret as command escape character (rfc 854) # RFC 1143 option negotiation states -NO = 0 # option is disabled -YES = 1 # option is enabled -WANTNO = 2 # demanded disabling option -WANTYES = 3 # requested enabling option -WANTNO_OPPOSITE = 4 # demanded disabling option but queued an enable after it -WANTYES_OPPOSITE = 5 # requested enabling option but queued a disable after it +NO = 0 # option is disabled +YES = 1 # option is enabled +WANTNO = 2 # demanded disabling option +WANTYES = 3 # requested enabling option +WANTNO_OPPOSITE = 4 # demanded disabling option but queued an enable after it +WANTYES_OPPOSITE = 5 # requested enabling option but queued a disable after it # RFC 1143 option negotiation parties HIM = 0 US = 1 + def telnet_proto(*arguments): - u"""Return a concatenated series of Telnet protocol commands.""" - # (this will need to be byte type during 2to3 migration) - return "".join( [chr(x) for x in arguments] ) + u"""Return a concatenated series of Telnet protocol commands.""" + # (this will need to be byte type during 2to3 migration) + return "".join([chr(x) for x in arguments]) + def send_command(user, *command): - u"""Sends a Telnet command string to the specified user's socket.""" - user.send( telnet_proto(IAC, *command), raw=True ) + u"""Sends a Telnet command string to the specified user's socket.""" + user.send(telnet_proto(IAC, *command), raw=True) + def is_enabled(user, telopt, party, state=YES): - u"""Returns True if the indicated Telnet option is enabled, False if not.""" - if (telopt, party) in user.telopts and user.telopts[ - (telopt, party) - ] is state: return True - else: return False + u"""Returns True if the indicated Telnet option is enabled, False if not.""" + if (telopt, party) in user.telopts and user.telopts[ + (telopt, party) + ] is state: + return True + else: + return False + def enable(user, telopt, party): - u"""Negotiates enabling a Telnet option for the indicated user's socket.""" - if party is HIM: txpos = DO - else: txpos = WILL - if not (telopt, party) in user.telopts or user.telopts[ - (telopt, party) - ] is NO: - user.telopts[ (telopt, party) ] = WANTYES - send_command(user, txpos, telopt) - elif user.telopts[ (telopt, party) ] is WANTNO: - user.telopts[ (telopt, party) ] = WANTNO_OPPOSITE - elif user.telopts[ (telopt, party) ] is WANTYES_OPPOSITE: - user.telopts[ (telopt, party) ] = WANTYES + u"""Negotiates enabling a Telnet option for the indicated user's socket.""" + if party is HIM: + txpos = DO + else: + txpos = WILL + if not (telopt, party) in user.telopts or user.telopts[ + (telopt, party) + ] is NO: + user.telopts[(telopt, party)] = WANTYES + send_command(user, txpos, telopt) + elif user.telopts[(telopt, party)] is WANTNO: + user.telopts[(telopt, party)] = WANTNO_OPPOSITE + elif user.telopts[(telopt, party)] is WANTYES_OPPOSITE: + user.telopts[(telopt, party)] = WANTYES + def disable(user, telopt, party): - u"""Negotiates disabling a Telnet option for the indicated user's socket.""" - if party is HIM: txneg = DONT - else: txneg = WONT - if not (telopt, party) in user.telopts: user.telopts[ (telopt, party) ] = NO - elif user.telopts[ (telopt, party) ] is YES: - user.telopts[ (telopt, party) ] = WANTNO - send_command(user, txneg, telopt) - elif user.telopts[ (telopt, party) ] is WANTYES: - user.telopts[ (telopt, party) ] = WANTYES_OPPOSITE - elif user.telopts[ (telopt, party) ] is WANTNO_OPPOSITE: - user.telopts[ (telopt, party) ] = WANTNO + u"""Negotiates disabling a Telnet option for the indicated user's socket.""" + if party is HIM: + txneg = DONT + else: + txneg = WONT + if not (telopt, party) in user.telopts: + user.telopts[(telopt, party)] = NO + elif user.telopts[(telopt, party)] is YES: + user.telopts[(telopt, party)] = WANTNO + send_command(user, txneg, telopt) + elif user.telopts[(telopt, party)] is WANTYES: + user.telopts[(telopt, party)] = WANTYES_OPPOSITE + elif user.telopts[(telopt, party)] is WANTNO_OPPOSITE: + user.telopts[(telopt, party)] = WANTNO + def negotiate_telnet_options(user): - u"""Reply to and remove telnet negotiation options from partial_input.""" - import misc + u"""Reply to and remove telnet negotiation options from partial_input.""" + import misc - # make a local copy to play with - text = user.partial_input + # make a local copy to play with + text = user.partial_input - # start at the begining of the input - position = 0 + # start at the begining of the input + position = 0 - # as long as we haven't checked it all - len_text = len(text) - while position < len_text: + # as long as we haven't checked it all + len_text = len(text) + while position < len_text: - # jump to the first IAC you find - position = text.find( telnet_proto(IAC), position ) + # jump to the first IAC you find + position = text.find(telnet_proto(IAC), position) - # if there wasn't an IAC in the input or it's at the end, we're done - if position < 0 or position >= len_text-1: break + # if there wasn't an IAC in the input or it's at the end, we're done + if position < 0 or position >= len_text - 1: + break - # the byte following the IAC is our command - # (this will need to be byte type during 2to3 migration) - command = ord( text[position+1] ) + # the byte following the IAC is our command + # (this will need to be byte type during 2to3 migration) + command = ord(text[position + 1]) - # replace a double (literal) IAC if there's an LF later - if command is IAC: - if text.find("\n", position) > 0: - position += 1 - text = text[:position] + text[position+1:] - else: position += 2 - - # implement an RFC 1143 option negotiation queue here - elif len_text > position+2 and WILL <= command <= DONT: - # this will need to be byte type during 2to3 migration - telopt = ord( text[position+2] ) - if telopt in supported: - if command <= WONT: - party = HIM - rxpos = WILL - txpos = DO - txneg = DONT + # replace a double (literal) IAC if there's an LF later + if command is IAC: + if text.find("\n", position) > 0: + position += 1 + text = text[:position] + text[position + 1:] else: - party = US - rxpos = DO - txpos = WILL - txneg = WONT - if (telopt, party) not in user.telopts: - user.telopts[ (telopt, party) ] = NO - if command is rxpos: - if user.telopts[ (telopt, party) ] is NO: - user.telopts[ (telopt, party) ] = YES - send_command(user, txpos, telopt) - elif user.telopts[ (telopt, party) ] is WANTNO: - user.telopts[ (telopt, party) ] = NO - elif user.telopts[ (telopt, party) ] is WANTNO_OPPOSITE: - user.telopts[ (telopt, party) ] = YES - elif user.telopts[ (telopt, party) ] is WANTYES_OPPOSITE: - user.telopts[ (telopt, party) ] = WANTNO - send_command(user, txneg, telopt) - else: user.telopts[ (telopt, party) ] = YES + position += 2 + + # implement an RFC 1143 option negotiation queue here + elif len_text > position + 2 and WILL <= command <= DONT: + # this will need to be byte type during 2to3 migration + telopt = ord(text[position + 2]) + if telopt in supported: + if command <= WONT: + party = HIM + rxpos = WILL + txpos = DO + txneg = DONT + else: + party = US + rxpos = DO + txpos = WILL + txneg = WONT + if (telopt, party) not in user.telopts: + user.telopts[(telopt, party)] = NO + if command is rxpos: + if user.telopts[(telopt, party)] is NO: + user.telopts[(telopt, party)] = YES + send_command(user, txpos, telopt) + elif user.telopts[(telopt, party)] is WANTNO: + user.telopts[(telopt, party)] = NO + elif user.telopts[(telopt, party)] is WANTNO_OPPOSITE: + user.telopts[(telopt, party)] = YES + elif user.telopts[(telopt, party)] is WANTYES_OPPOSITE: + user.telopts[(telopt, party)] = WANTNO + send_command(user, txneg, telopt) + else: + user.telopts[(telopt, party)] = YES + else: + if user.telopts[(telopt, party)] is YES: + user.telopts[(telopt, party)] = NO + send_command(user, txneg, telopt) + elif user.telopts[(telopt, party)] is WANTNO_OPPOSITE: + user.telopts[(telopt, party)] = WANTYES + send_command(user, txpos, telopt) + else: + user.telopts[(telopt, party)] = NO + elif command is WILL: + send_command(user, DONT, telopt) else: - if user.telopts[ (telopt, party) ] is YES: - user.telopts[ (telopt, party) ] = NO - send_command(user, txneg, telopt) - elif user.telopts[ (telopt, party) ] is WANTNO_OPPOSITE: - user.telopts[ (telopt, party) ] = WANTYES - send_command(user, txpos, telopt) - else: user.telopts[ (telopt, party) ] = NO - elif command is WILL: send_command(user, DONT, telopt ) - else: send_command(user, WONT, telopt) - text = text[:position] + text[position+3:] - - # subnegotiation options - elif len_text > position+4 and command is SB: - # this will need to be byte type during 2to3 migration - telopt = ord( text[position+2] ) - if telopt is TELOPT_NAWS: + send_command(user, WONT, telopt) + text = text[:position] + text[position + 3:] + + # subnegotiation options + elif len_text > position + 4 and command is SB: # this will need to be byte type during 2to3 migration - user.columns = ord(text[position+3])*256+ord(text[position+4]) - end_subnegotiation = text.find( telnet_proto(IAC, SE), position ) - if end_subnegotiation > 0: - text = text[:position] + text[end_subnegotiation+2:] - else: position += 1 - - # otherwise, strip out a two-byte IAC command - elif len_text > position+2: - misc.log(u"Unknown Telnet IAC command %s ignored." % command) - text = text[:position] + text[position+2:] - - # and this means we got the begining of an IAC - else: position += 1 - - # replace the input with our cleaned-up text - user.partial_input = text + telopt = ord(text[position + 2]) + if telopt is TELOPT_NAWS: + # this will need to be byte type during 2to3 migration + user.columns = ord(text[position + 3]) * \ + 256 + ord(text[position + 4]) + end_subnegotiation = text.find(telnet_proto(IAC, SE), position) + if end_subnegotiation > 0: + text = text[:position] + text[end_subnegotiation + 2:] + else: + position += 1 + + # otherwise, strip out a two-byte IAC command + elif len_text > position + 2: + misc.log(u"Unknown Telnet IAC command %s ignored." % command) + text = text[:position] + text[position + 2:] + + # and this means we got the begining of an IAC + else: + position += 1 + + # replace the input with our cleaned-up text + user.partial_input = text -- 2.11.0