Package mudpy :: Module telnet
[frames] | no frames]

Source Code for Module mudpy.telnet

  1  # -*- coding: utf-8 -*- 
  2  u"""Telnet functions and constants for the mudpy engine.""" 
  3   
  4  # Copyright (c) 2004-2011 Jeremy Stanley <fungi@yuggoth.org>. Permission 
  5  # to use, copy, modify, and distribute this software is granted under 
  6  # terms provided in the LICENSE file distributed with this software. 
  7   
  8  # telnet options (from bsd's arpa/telnet.h since telnetlib's are ambiguous) 
  9  TELOPT_BINARY = 0 # transmit 8-bit data by the receiver (rfc 856) 
 10  TELOPT_ECHO = 1 # echo received data back to the sender (rfc 857) 
 11  TELOPT_SGA = 3 # suppress transmission of the go ahead character (rfc 858) 
 12  #TELOPT_TTYPE = 24 # exchange terminal type information (rfc 1091) 
 13  TELOPT_EOR = 25 # transmit end-of-record after transmitting data (rfc 885) 
 14  TELOPT_NAWS = 31 # negotiate about window size (rfc 1073) 
 15  TELOPT_LINEMODE = 34 # cooked line-by-line input mode (rfc 1184) 
 16   
 17  supported = ( 
 18     TELOPT_BINARY, 
 19     TELOPT_ECHO, 
 20     TELOPT_SGA, 
 21     TELOPT_EOR, 
 22     TELOPT_NAWS, 
 23     TELOPT_LINEMODE 
 24  ) 
 25   
 26  # telnet commands 
 27  EOR = 239 # end-of-record signal (rfc 885) 
 28  SE = 240 # end of subnegotiation parameters (rfc 854) 
 29  GA = 249 # go ahead signal (rfc 854) 
 30  SB = 250 # what follows is subnegotiation of the indicated option (rfc 854) 
 31  WILL = 251 # desire or confirmation of performing an option (rfc 854) 
 32  WONT = 252 # refusal or confirmation of performing an option (rfc 854) 
 33  DO = 253 # request or confirm performing an option (rfc 854) 
 34  DONT = 254 # demand or confirm no longer performing an option (rfc 854) 
 35  IAC = 255 # interpret as command escape character (rfc 854) 
 36   
 37  # RFC 1143 option negotiation states 
 38  NO = 0 # option is disabled 
 39  YES = 1 # option is enabled 
 40  WANTNO = 2 # demanded disabling option 
 41  WANTYES = 3 # requested enabling option 
 42  WANTNO_OPPOSITE = 4 # demanded disabling option but queued an enable after it 
 43  WANTYES_OPPOSITE = 5 # requested enabling option but queued a disable after it 
 44   
 45  # RFC 1143 option negotiation parties 
 46  HIM = 0 
 47  US = 1 
 48   
49 -def telnet_proto(*arguments):
50 u"""Return a concatenated series of Telnet protocol commands.""" 51 # (this will need to be byte type during 2to3 migration) 52 return "".join( [chr(x) for x in arguments] )
53
54 -def send_command(user, *command):
55 u"""Sends a Telnet command string to the specified user's socket.""" 56 user.send( telnet_proto(IAC, *command), raw=True )
57
58 -def is_enabled(user, telopt, party, state=YES):
59 u"""Returns True if the indicated Telnet option is enabled, False if not.""" 60 if (telopt, party) in user.telopts and user.telopts[ 61 (telopt, party) 62 ] is state: return True 63 else: return False
64
65 -def enable(user, telopt, party):
66 u"""Negotiates enabling a Telnet option for the indicated user's socket.""" 67 if party is HIM: txpos = DO 68 else: txpos = WILL 69 if not (telopt, party) in user.telopts or user.telopts[ 70 (telopt, party) 71 ] is NO: 72 user.telopts[ (telopt, party) ] = WANTYES 73 send_command(user, txpos, telopt) 74 elif user.telopts[ (telopt, party) ] is WANTNO: 75 user.telopts[ (telopt, party) ] = WANTNO_OPPOSITE 76 elif user.telopts[ (telopt, party) ] is WANTYES_OPPOSITE: 77 user.telopts[ (telopt, party) ] = WANTYES
78
79 -def disable(user, telopt, party):
80 u"""Negotiates disabling a Telnet option for the indicated user's socket.""" 81 if party is HIM: txneg = DONT 82 else: txneg = WONT 83 if not (telopt, party) in user.telopts: user.telopts[ (telopt, party) ] = NO 84 elif user.telopts[ (telopt, party) ] is YES: 85 user.telopts[ (telopt, party) ] = WANTNO 86 send_command(user, txneg, telopt) 87 elif user.telopts[ (telopt, party) ] is WANTYES: 88 user.telopts[ (telopt, party) ] = WANTYES_OPPOSITE 89 elif user.telopts[ (telopt, party) ] is WANTNO_OPPOSITE: 90 user.telopts[ (telopt, party) ] = WANTNO
91
92 -def negotiate_telnet_options(user):
93 u"""Reply to and remove telnet negotiation options from partial_input.""" 94 import misc 95 96 # make a local copy to play with 97 text = user.partial_input 98 99 # start at the begining of the input 100 position = 0 101 102 # as long as we haven't checked it all 103 len_text = len(text) 104 while position < len_text: 105 106 # jump to the first IAC you find 107 position = text.find( telnet_proto(IAC), position ) 108 109 # if there wasn't an IAC in the input or it's at the end, we're done 110 if position < 0 or position >= len_text-1: break 111 112 # the byte following the IAC is our command 113 # (this will need to be byte type during 2to3 migration) 114 command = ord( text[position+1] ) 115 116 # replace a double (literal) IAC if there's an LF later 117 if command is IAC: 118 if text.find("\n", position) > 0: 119 position += 1 120 text = text[:position] + text[position+1:] 121 else: position += 2 122 123 # implement an RFC 1143 option negotiation queue here 124 elif len_text > position+2 and WILL <= command <= DONT: 125 # this will need to be byte type during 2to3 migration 126 telopt = ord( text[position+2] ) 127 if telopt in supported: 128 if command <= WONT: 129 party = HIM 130 rxpos = WILL 131 txpos = DO 132 txneg = DONT 133 else: 134 party = US 135 rxpos = DO 136 txpos = WILL 137 txneg = WONT 138 if (telopt, party) not in user.telopts: 139 user.telopts[ (telopt, party) ] = NO 140 if command is rxpos: 141 if user.telopts[ (telopt, party) ] is NO: 142 user.telopts[ (telopt, party) ] = YES 143 send_command(user, txpos, telopt) 144 elif user.telopts[ (telopt, party) ] is WANTNO: 145 user.telopts[ (telopt, party) ] = NO 146 elif user.telopts[ (telopt, party) ] is WANTNO_OPPOSITE: 147 user.telopts[ (telopt, party) ] = YES 148 elif user.telopts[ (telopt, party) ] is WANTYES_OPPOSITE: 149 user.telopts[ (telopt, party) ] = WANTNO 150 send_command(user, txneg, telopt) 151 else: user.telopts[ (telopt, party) ] = YES 152 else: 153 if user.telopts[ (telopt, party) ] is YES: 154 user.telopts[ (telopt, party) ] = NO 155 send_command(user, txneg, telopt) 156 elif user.telopts[ (telopt, party) ] is WANTNO_OPPOSITE: 157 user.telopts[ (telopt, party) ] = WANTYES 158 send_command(user, txpos, telopt) 159 else: user.telopts[ (telopt, party) ] = NO 160 elif command is WILL: send_command(user, DONT, telopt ) 161 else: send_command(user, WONT, telopt) 162 text = text[:position] + text[position+3:] 163 164 # subnegotiation options 165 elif len_text > position+4 and command is SB: 166 # this will need to be byte type during 2to3 migration 167 telopt = ord( text[position+2] ) 168 if telopt is TELOPT_NAWS: 169 # this will need to be byte type during 2to3 migration 170 user.columns = ord(text[position+3])*256+ord(text[position+4]) 171 end_subnegotiation = text.find( telnet_proto(IAC, SE), position ) 172 if end_subnegotiation > 0: 173 text = text[:position] + text[end_subnegotiation+2:] 174 else: position += 1 175 176 # otherwise, strip out a two-byte IAC command 177 elif len_text > position+2: 178 misc.log(u"Unknown Telnet IAC command %s ignored." % command) 179 text = text[:position] + text[position+2:] 180 181 # and this means we got the begining of an IAC 182 else: position += 1 183 184 # replace the input with our cleaned-up text 185 user.partial_input = text
186