Escape replacement macros in preferences
[mudpy.git] / mudpy / tests / selftest.py
index 949488d..d98755a 100644 (file)
@@ -1,6 +1,6 @@
-# Copyright (c) 2004-2018 Jeremy Stanley <fungi@yuggoth.org>. Permission
-# to use, copy, modify, and distribute this software is granted under
-# terms provided in the LICENSE file distributed with this software.
+# Copyright (c) 2004-2019 mudpy authors. Permission to use, copy,
+# modify, and distribute this software is granted under terms
+# provided in the LICENSE file distributed with this software.
 
 import os
 import pathlib
@@ -111,6 +111,11 @@ test_forbid_ansi_input = (
     (1, r'says, "\[35mfoo\[0m\."', ""),
 )
 
+test_escape_macros = (
+    (0, '> ', "say $(red)bar$(nrm)"),
+    (1, r'says, "\$\(red\)bar\$\(nrm\)."', ""),
+)
+
 test_movement = (
     (0, "> ", "move north"),
     (0, r"You exit to the north\.", ""),
@@ -158,19 +163,43 @@ test_admin_setup = (
     (2, "Whom would you like to awaken?", ""),
 )
 
+test_preferences = (
+    (0, "> ", "preferences"),
+    (0, r"prompt \x1b\[32m.*> ", "preferences prompt $(foo)"),
+    (0, r"\$\(foo\) ", "preferences prompt"),
+    (0, r"\$\(foo\).*\$\(foo\) ", "preferences prompt >"),
+    (2, "> ", "preferences loglevel 0"),
+    (2, "> ", "preferences"),
+    (2, r"loglevel \x1b\[32m0\x1b\[0m.*> ", "preferences loglevel zero"),
+    (2, r'''cannot be set to type "<class 'str'>"\..*> ''', ""),
+)
+
+test_crlf_eol = (
+    # Send a CR+LF at the end of the line instead of the default CR+NUL,
+    # to make sure they're treated the same
+    (2, "> ", b"say I use CR+LF as my EOL, not CR+NUL.\r\n"),
+    (2, r'You say, "I use CR\+LF as my EOL, not CR\+NUL\.".*> ', ""),
+)
+
 test_telnet_iac = (
     # Send a double (escaped) IAC byte within other text, which should get
     # unescaped and deduplicated to a single \xff in the buffer and then
     # the line of input discarded as a non-ASCII sequence
-    (2, "> ", b"say argle\xff\xffbargle\r\n"),
-    (2, r"Non-ASCII characters from admin: b'say argle\\xffbargle'.*> ", ""),
+    (2, "> ", b"say argle\xff\xffbargle\r\0"),
+    (2, r"Non-ASCII characters from admin: b'say.*argle\\xffbargle'.*> ", ""),
 )
 
-test_telnet_unknown = (
+test_telnet_unknown_command = (
     # Send an unsupported negotiation command #127 which should get filtered
     # from the line of input
-    (2, "> ", b"say glop\xff\x7fglyf\r\n"),
-    (2, r'Unknown Telnet IAC command 127 ignored\..*"Glopglyf\.".*> ', ""),
+    (2, "> ", b"say glop\xff\x7fglyf\r\0"),
+    (2, r'Ignored unknown command 127 from admin\..*"Glopglyf\.".*> ', ""),
+)
+
+test_telnet_unknown_option = (
+    # Send an unassigned negotiation option #127 which should get logged
+    (2, "> ", b"\xff\xfe\x7f\r\0"),
+    (2, r'''Received "don't 127" from admin\..*> ''', ""),
 )
 
 test_admin_restriction = (
@@ -292,12 +321,16 @@ dialogue = (
     (test_chat_mode, "chat mode"),
     (test_wrapping, "wrapping"),
     (test_forbid_ansi_input, "raw escape input is filtered"),
+    (test_escape_macros, "replacement macros are escaped"),
     (test_movement, "movement"),
     (test_actor_disappears, "actor spontaneous disappearance"),
     (test_account1_teardown, "second account teardown"),
     (test_admin_setup, "admin account setup"),
+    (test_preferences, "set and show preferences"),
+    (test_crlf_eol, "send crlf from the client as eol"),
     (test_telnet_iac, "escape stray telnet iac bytes"),
-    (test_telnet_unknown, "strip unknown telnet command"),
+    (test_telnet_unknown_command, "strip unknown telnet command"),
+    (test_telnet_unknown_option, "log unknown telnet option"),
     (test_admin_restriction, "restricted admin commands"),
     (test_admin_help, "admin help"),
     (test_reload, "reload"),
@@ -339,7 +372,6 @@ def start_service(config):
         os.remove(f.name)
     for d in ("data", "var"):
         shutil.rmtree(d, ignore_errors=True)
-    os.mkdir("var")
 
     # Start the service and wait for it to be ready for connections
     service = subprocess.Popen(("mudpy", config),
@@ -360,7 +392,7 @@ def stop_service(service):
     service.terminate()
     returncode = service.wait(10)
     if returncode != 0:
-        print("ERROR: Service exited with code %s." % returncode)
+        tlog("\nERROR: Service exited with code %s." % returncode)
         success = False
 
     # This cleans up a daemonized and disassociated service
@@ -376,21 +408,41 @@ def stop_service(service):
             os.remove(pidfile)
         # The PID file didn't disappear, so we have a hung service
         if os.path.exists(pidfile):
-            print("ERROR: Hung daemon with PID %s." % pid)
+            tlog("\nERROR: Hung daemon with PID %s." % pid)
             success = False
 
     # Log the contents of stdout and stderr, if any
     stdout, stderr = service.communicate()
-    print("Recording stdout as capture_stdout.log.")
+    tlog("\nRecording stdout as capture_stdout.log.")
     serviceout = open("capture_stdout.log", "w")
     serviceout.write(stdout.decode("utf-8"))
-    print("Recording stderr as capture_stderr.log.")
+    tlog("\nRecording stderr as capture_stderr.log.")
     serviceerr = open("capture_stderr.log", "w")
     serviceerr.write(stderr.decode("utf-8"))
 
     return(success)
 
 
+def tlog(message, quiet=False):
+    logfile = "capture_tests.log"
+    with open(logfile, "a") as logfd:
+        logfd.write(message + "\n")
+    if not quiet:
+        sys.stdout.write(message)
+    return True
+
+
+def option_callback(telnet_socket, command, option):
+    if option == b'\x7f':
+        # We use this unassigned option value as a canary, so short-circuit
+        # any response to avoid endlessly looping
+        pass
+    elif command in (telnetlib.DO, telnetlib.DONT):
+        telnet_socket.send(telnetlib.IAC + telnetlib.WONT + option)
+    elif command in (telnetlib.WILL, telnetlib.WONT):
+        telnet_socket.send(telnetlib.IAC + telnetlib.DONT + option)
+
+
 def main():
     captures = ["", "", ""]
     lusers = [telnetlib.Telnet(), telnetlib.Telnet(), telnetlib.Telnet()]
@@ -402,24 +454,26 @@ def main():
         service = start_service(sys.argv[1])
     for luser in lusers:
         luser.open("::1", 4000)
+        luser.set_option_negotiation_callback(option_callback)
     for test, description in dialogue:
-        print("\nTesting %s..." % description)
+        tlog("\nTesting %s..." % description)
         test_start = time.time()
         for conversant, question, answer in test:
-            print("luser%s waiting for: %s" % (conversant, question))
+            tlog("luser%s waiting for: %s" % (conversant, question),
+                 quiet=True)
             try:
                 index, match, received = lusers[conversant].expect(
                     [re.compile(question.encode("utf-8"), flags=re.DOTALL)], 5)
                 captures[conversant] += received.decode("utf-8")
             except ConnectionResetError:
-                print("ERROR: Unable to connect to server.")
+                tlog("\nERROR: Unable to connect to server.")
                 success = False
                 break
             except EOFError:
-                print("ERROR: luser%s premature disconnection expecting:\n\n"
-                      "%s\n\n"
-                      "Check the end of capture_%s.log for received data."
-                      % (conversant, question, conversant))
+                tlog("\nERROR: luser%s premature disconnection expecting:\n\n"
+                     "%s\n\n"
+                     "Check the end of capture_%s.log for received data."
+                     % (conversant, question, conversant))
                 success = False
                 break
             try:
@@ -427,31 +481,31 @@ def main():
                     conversant].read_very_eager().decode("utf-8")
             except Exception:
                 pass
-            if index is not 0:
-                print("ERROR: luser%s did not receive expected string:\n\n"
-                      "%s\n\n"
-                      "Check the end of capture_%s.log for received data."
-                      % (conversant, question, conversant))
+            if index != 0:
+                tlog("\nERROR: luser%s did not receive expected string:\n\n"
+                     "%s\n\n"
+                     "Check the end of capture_%s.log for received data."
+                     % (conversant, question, conversant))
                 success = False
                 break
             if type(answer) is str:
-                print("luser%s sending: %s" % (conversant, answer))
-                lusers[conversant].write(("%s\r\n" % answer).encode("utf-8"))
+                tlog("luser%s sending: %s" % (conversant, answer), quiet=True)
+                lusers[conversant].write(("%s\r\0" % answer).encode("utf-8"))
                 captures[conversant] += "%s\r\n" % answer
             elif type(answer) is bytes:
-                print("luser%s sending raw bytes: %s" % (conversant, answer))
+                tlog("luser%s sending raw bytes: %s" % (conversant, answer),
+                     quiet=True)
                 lusers[conversant].get_socket().send(answer)
                 captures[conversant] += "!!!RAW BYTES: %s" % answer
             else:
-                print("ERROR: answer provided with unsupported type %s"
-                      % type(answer))
+                tlog("\nERROR: answer provided with unsupported type %s"
+                     % type(answer))
                 success = False
                 break
         if not success:
             break
-        print("Completed in %.3f seconds." % (time.time() - test_start))
+        tlog("Completed in %.3f seconds." % (time.time() - test_start))
     duration = time.time() - start
-    print("")
     for conversant in range(len(captures)):
         try:
             captures[conversant] += lusers[
@@ -460,17 +514,17 @@ def main():
             pass
         lusers[conversant].close()
         logfile = "capture_%s.log" % conversant
-        print("Recording session %s as %s." % (conversant, logfile))
+        tlog("\nRecording session %s as %s." % (conversant, logfile))
         log = open(logfile, "w")
         log.write(captures[conversant])
         log.close()
     if not stop_service(service):
         success = False
-    print("\nRan %s tests in %.3f seconds." % (len(dialogue), duration))
+    tlog("\nRan %s tests in %.3f seconds." % (len(dialogue), duration))
     if success:
-        print("SUCCESS")
+        tlog("\nSUCCESS\n")
     else:
-        print("FAILURE")
+        tlog("\nFAILURE\n")
         sys.exit(1)