From: Jeremy Stanley Date: Sun, 21 Jan 2018 23:39:45 +0000 (+0000) Subject: Fix TypeError with IAC escaping X-Git-Url: https://mudpy.org/gitweb?p=mudpy.git;a=commitdiff_plain;h=c677014e04ec3818187eff679bf378b9be735c15 Fix TypeError with IAC escaping Declare the literal newline as a byte type when analyzing an input sequence with a double IAC byte, fixing a lingering crash which raised a TypeError in negotiate_telnet_options(). Add regression testing for proper handling and deduplication of an inline IAC pair. While we're here, improve the log message about undecodeable UTF-8 sequences to be clear it's not just some of the bytes which were discarded. --- diff --git a/mudpy/misc.py b/mudpy/misc.py index f325364..154b555 100644 --- a/mudpy/misc.py +++ b/mudpy/misc.py @@ -1,6 +1,6 @@ """Miscellaneous functions for the mudpy engine.""" -# Copyright (c) 2004-2017 Jeremy Stanley . Permission +# Copyright (c) 2004-2018 Jeremy Stanley . Permission # to use, copy, modify, and distribute this software is granted under # terms provided in the LICENSE file distributed with this software. @@ -898,7 +898,7 @@ class User: try: line = line.decode("utf-8") except UnicodeDecodeError: - logline = "Non-UTF-8 characters from " + logline = "Non-UTF-8 sequence from " if self.account and self.account.get("name"): logline += self.account.get("name") + ": " else: diff --git a/mudpy/telnet.py b/mudpy/telnet.py index 8d8870c..245f44a 100644 --- a/mudpy/telnet.py +++ b/mudpy/telnet.py @@ -1,6 +1,6 @@ """Telnet functions and constants for the mudpy engine.""" -# Copyright (c) 2004-2017 Jeremy Stanley . Permission +# Copyright (c) 2004-2018 Jeremy Stanley . Permission # to use, copy, modify, and distribute this software is granted under # terms provided in the LICENSE file distributed with this software. @@ -127,7 +127,7 @@ def negotiate_telnet_options(user): # replace a double (literal) IAC if there's an LF later if command is IAC: - if text.find("\n", position) > 0: + if text.find(b"\n", position) > 0: position += 1 text = text[:position] + text[position + 1:] else: diff --git a/mudpy/tests/selftest.py b/mudpy/tests/selftest.py index cba69c6..766f01d 100644 --- a/mudpy/tests/selftest.py +++ b/mudpy/tests/selftest.py @@ -1,4 +1,4 @@ -# Copyright (c) 2004-2017 Jeremy Stanley . Permission +# Copyright (c) 2004-2018 Jeremy Stanley . Permission # to use, copy, modify, and distribute this software is granted under # terms provided in the LICENSE file distributed with this software. @@ -143,6 +143,14 @@ test_admin_setup = ( (2, "Whom would you like to awaken?", ""), ) +test_telnet_iac = ( + # Send a double (escaped) IAC byte within other text, which should get + # unescaped and deduplicated to a single \xff in the buffer and then + # the line of input discarded as a non-UTF-8 sequence + (2, "> ", b"say argle\xff\xffbargle\r\n"), + (2, r"Non-UTF-8 sequence from admin: b'say argle\\xffbargle'.*> ", ""), +) + test_admin_restriction = ( (0, "> ", "help halt"), (0, r"That is not an available command\.", "halt"), @@ -241,6 +249,7 @@ dialogue = ( (test_actor_disappears, "actor spontaneous disappearance"), (test_account1_teardown, "second account teardown"), (test_admin_setup, "admin account setup"), + (test_telnet_iac, "escape stray telnet iac bytes"), (test_admin_restriction, "restricted admin commands"), (test_admin_help, "admin help"), (test_reload, "reload"), @@ -297,9 +306,19 @@ def main(): % (conversant, question, conversant)) success = False break - print("luser%s sending: %s" % (conversant, answer)) - lusers[conversant].write(("%s\r\n" % answer).encode("utf-8")) - captures[conversant] += "%s\r\n" % answer + if type(answer) is str: + print("luser%s sending: %s" % (conversant, answer)) + lusers[conversant].write(("%s\r\n" % answer).encode("utf-8")) + captures[conversant] += "%s\r\n" % answer + elif type(answer) is bytes: + print("luser%s sending raw bytes: %s" % (conversant, answer)) + lusers[conversant].get_socket().send(answer) + captures[conversant] += "!!!RAW BYTES: %s" % answer + else: + print("ERROR: answer provided with unsupported type %s" + % type(answer)) + success = False + break if not success: break print("Completed in %.3f seconds." % (time.time() - test_start))