Filter non-ASCII input when not in binary mode
[mudpy.git] / mudpy / tests / selftest.py
index 1b9c81b..e120e70 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2004-2017 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2004-2018 Jeremy Stanley <fungi@yuggoth.org>. Permission
 # to use, copy, modify, and distribute this software is granted under
 # terms provided in the LICENSE file distributed with this software.
 
@@ -96,6 +96,11 @@ test_chat_mode = (
     (0, r'says, "Now less chatty\."', ""),
 )
 
+test_wrapping = (
+    (0, '> ', "say " + 100 * "o"),
+    (1, r'says,\r\n"O[o]+\."', ""),
+)
+
 test_movement = (
     (0, "> ", "move north"),
     (0, r"You exit to the north\.", ""),
@@ -143,6 +148,21 @@ test_admin_setup = (
     (2, "Whom would you like to awaken?", ""),
 )
 
+test_telnet_iac = (
+    # Send a double (escaped) IAC byte within other text, which should get
+    # unescaped and deduplicated to a single \xff in the buffer and then
+    # the line of input discarded as a non-ASCII sequence
+    (2, "> ", b"say argle\xff\xffbargle\r\n"),
+    (2, r"Non-ASCII characters from admin: b'say argle\\xffbargle'.*> ", ""),
+)
+
+test_telnet_unknown = (
+    # Send an unsupported negotiation command #127 which should get filtered
+    # from the line of input
+    (2, "> ", b"say glop\xff\x7fglyf\r\n"),
+    (2, r'Unknown Telnet IAC command 127 ignored\..*"Glopglyf\.".*> ', ""),
+)
+
 test_admin_restriction = (
     (0, "> ", "help halt"),
     (0, r"That is not an available command\.", "halt"),
@@ -171,6 +191,11 @@ test_set_refused = (
     (2, r'The "mudpy\.limit" element is kept in read-only file', ""),
 )
 
+test_show_version = (
+    (2, "> ", "show version"),
+    (2, r"Running mudpy .* on .* Python 3.*with.*pyyaml.*> ", ""),
+)
+
 test_show_files = (
     (2, "> ", "show files"),
     (2, r'These are the current files containing the universe:.*'
@@ -184,6 +209,18 @@ test_show_file = (
     (2, r'These are the nodes in the.*file:.*internal\.counters.*> ', ""),
 )
 
+test_show_groups = (
+    (2, "> ", "show groups"),
+    (2, r'These are the element groups:.*'
+        r'  \x1b\[32maccount\x1b\[0m.*> ', ""),
+)
+
+test_show_group = (
+    (2, "> ", "show group account"),
+    (2, r'These are the elements in the "account" group:.*'
+        r'  \x1b\[32maccount\.admin\x1b\[0m.*> ', ""),
+)
+
 test_show_element = (
     (2, "> ", "show element mudpy.limit"),
     (2, r'These are the properties of the "mudpy\.limit" element.*'
@@ -225,17 +262,23 @@ dialogue = (
     (test_typo_replacement, "typo replacement"),
     (test_sentence_capitalization, "sentence capitalization"),
     (test_chat_mode, "chat mode"),
+    (test_wrapping, "wrapping"),
     (test_movement, "movement"),
     (test_actor_disappears, "actor spontaneous disappearance"),
     (test_account1_teardown, "second account teardown"),
     (test_admin_setup, "admin account setup"),
+    (test_telnet_iac, "escape stray telnet iac bytes"),
+    (test_telnet_unknown, "strip unknown telnet command"),
     (test_admin_restriction, "restricted admin commands"),
     (test_admin_help, "admin help"),
     (test_reload, "reload"),
     (test_set_facet, "set facet"),
     (test_set_refused, "refuse altering read-only element"),
+    (test_show_version, "show version and diagnostic info"),
     (test_show_files, "show a list of loaded files"),
     (test_show_file, "show nodes from a specific file"),
+    (test_show_groups, "show groups"),
+    (test_show_group, "show group"),
     (test_show_element, "show element"),
     (test_show_log, "show log"),
     (test_custom_loglevel, "custom loglevel"),
@@ -283,9 +326,19 @@ def main():
                       % (conversant, question, conversant))
                 success = False
                 break
-            print("luser%s sending: %s" % (conversant, answer))
-            lusers[conversant].write(("%s\r\n" % answer).encode("utf-8"))
-            captures[conversant] += "%s\r\n" % answer
+            if type(answer) is str:
+                print("luser%s sending: %s" % (conversant, answer))
+                lusers[conversant].write(("%s\r\n" % answer).encode("utf-8"))
+                captures[conversant] += "%s\r\n" % answer
+            elif type(answer) is bytes:
+                print("luser%s sending raw bytes: %s" % (conversant, answer))
+                lusers[conversant].get_socket().send(answer)
+                captures[conversant] += "!!!RAW BYTES: %s" % answer
+            else:
+                print("ERROR: answer provided with unsupported type %s"
+                      % type(answer))
+                success = False
+                break
         if not success:
             break
         print("Completed in %.3f seconds." % (time.time() - test_start))