Fix TypeError with IAC escaping
authorJeremy Stanley <fungi@yuggoth.org>
Sun, 21 Jan 2018 23:39:45 +0000 (23:39 +0000)
committerJeremy Stanley <fungi@yuggoth.org>
Sun, 21 Jan 2018 23:39:45 +0000 (23:39 +0000)
Declare the literal newline as a byte type when analyzing an input
sequence with a double IAC byte, fixing a lingering crash which
raised a TypeError in negotiate_telnet_options(). Add regression
testing for proper handling and deduplication of an inline IAC pair.
While we're here, improve the log message about undecodeable UTF-8
sequences to be clear it's not just some of the bytes which were
discarded.

mudpy/misc.py
mudpy/telnet.py
mudpy/tests/selftest.py

index f325364..154b555 100644 (file)
@@ -1,6 +1,6 @@
 """Miscellaneous functions for the mudpy engine."""
 
-# Copyright (c) 2004-2017 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2004-2018 Jeremy Stanley <fungi@yuggoth.org>. Permission
 # to use, copy, modify, and distribute this software is granted under
 # terms provided in the LICENSE file distributed with this software.
 
@@ -898,7 +898,7 @@ class User:
                 try:
                     line = line.decode("utf-8")
                 except UnicodeDecodeError:
-                    logline = "Non-UTF-8 characters from "
+                    logline = "Non-UTF-8 sequence from "
                     if self.account and self.account.get("name"):
                         logline += self.account.get("name") + ": "
                     else:
index 8d8870c..245f44a 100644 (file)
@@ -1,6 +1,6 @@
 """Telnet functions and constants for the mudpy engine."""
 
-# Copyright (c) 2004-2017 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2004-2018 Jeremy Stanley <fungi@yuggoth.org>. Permission
 # to use, copy, modify, and distribute this software is granted under
 # terms provided in the LICENSE file distributed with this software.
 
@@ -127,7 +127,7 @@ def negotiate_telnet_options(user):
 
         # replace a double (literal) IAC if there's an LF later
         if command is IAC:
-            if text.find("\n", position) > 0:
+            if text.find(b"\n", position) > 0:
                 position += 1
                 text = text[:position] + text[position + 1:]
             else:
index cba69c6..766f01d 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2004-2017 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2004-2018 Jeremy Stanley <fungi@yuggoth.org>. Permission
 # to use, copy, modify, and distribute this software is granted under
 # terms provided in the LICENSE file distributed with this software.
 
@@ -143,6 +143,14 @@ test_admin_setup = (
     (2, "Whom would you like to awaken?", ""),
 )
 
+test_telnet_iac = (
+    # Send a double (escaped) IAC byte within other text, which should get
+    # unescaped and deduplicated to a single \xff in the buffer and then
+    # the line of input discarded as a non-UTF-8 sequence
+    (2, "> ", b"say argle\xff\xffbargle\r\n"),
+    (2, r"Non-UTF-8 sequence from admin: b'say argle\\xffbargle'.*> ", ""),
+)
+
 test_admin_restriction = (
     (0, "> ", "help halt"),
     (0, r"That is not an available command\.", "halt"),
@@ -241,6 +249,7 @@ dialogue = (
     (test_actor_disappears, "actor spontaneous disappearance"),
     (test_account1_teardown, "second account teardown"),
     (test_admin_setup, "admin account setup"),
+    (test_telnet_iac, "escape stray telnet iac bytes"),
     (test_admin_restriction, "restricted admin commands"),
     (test_admin_help, "admin help"),
     (test_reload, "reload"),
@@ -297,9 +306,19 @@ def main():
                       % (conversant, question, conversant))
                 success = False
                 break
-            print("luser%s sending: %s" % (conversant, answer))
-            lusers[conversant].write(("%s\r\n" % answer).encode("utf-8"))
-            captures[conversant] += "%s\r\n" % answer
+            if type(answer) is str:
+                print("luser%s sending: %s" % (conversant, answer))
+                lusers[conversant].write(("%s\r\n" % answer).encode("utf-8"))
+                captures[conversant] += "%s\r\n" % answer
+            elif type(answer) is bytes:
+                print("luser%s sending raw bytes: %s" % (conversant, answer))
+                lusers[conversant].get_socket().send(answer)
+                captures[conversant] += "!!!RAW BYTES: %s" % answer
+            else:
+                print("ERROR: answer provided with unsupported type %s"
+                      % type(answer))
+                success = False
+                break
         if not success:
             break
         print("Completed in %.3f seconds." % (time.time() - test_start))