X-Git-Url: https://mudpy.org/gitweb?p=mudpy.git;a=blobdiff_plain;f=mudpy%2Ftests%2Fselftest.py;h=ddb09f033569909e65974d6c17fcfa807bf76a17;hp=799ac506234844c4e44c212c3441e3dc6f6547fc;hb=73a4f24fc4819d7b6b50064d660b24b231a33d9e;hpb=68261e7b241348089399e7f0ccbb8e05263f7dc9 diff --git a/mudpy/tests/selftest.py b/mudpy/tests/selftest.py index 799ac50..ddb09f0 100644 --- a/mudpy/tests/selftest.py +++ b/mudpy/tests/selftest.py @@ -1,6 +1,6 @@ -# Copyright (c) 2004-2018 Jeremy Stanley . Permission -# to use, copy, modify, and distribute this software is granted under -# terms provided in the LICENSE file distributed with this software. +# Copyright (c) 2004-2019 mudpy authors. Permission to use, copy, +# modify, and distribute this software is granted under terms +# provided in the LICENSE file distributed with this software. import os import pathlib @@ -111,6 +111,11 @@ test_forbid_ansi_input = ( (1, r'says, "\[35mfoo\[0m\."', ""), ) +test_escape_macros = ( + (0, '> ', "say $(red)bar$(nrm)"), + (1, r'says, "\$\(red\)bar\$\(nrm\)."', ""), +) + test_movement = ( (0, "> ", "move north"), (0, r"You exit to the north\.", ""), @@ -158,19 +163,44 @@ test_admin_setup = ( (2, "Whom would you like to awaken?", ""), ) +test_preferences = ( + (0, "> ", "preferences"), + (0, r"prompt \x1b\[32m.*> ", "preferences prompt $(foo)"), + (0, r"\$\(foo\) ", "preferences prompt"), + (0, r"\$\(foo\).*\$\(foo\) ", "preferences prompt $(time)>"), + (0, "[0-9]> ", "preferences prompt >"), + (2, "> ", "preferences loglevel 0"), + (2, "> ", "preferences"), + (2, r"loglevel \x1b\[32m0\x1b\[0m.*> ", "preferences loglevel zero"), + (2, r'''cannot be set to type ""\..*> ''', ""), +) + +test_crlf_eol = ( + # Send a CR+LF at the end of the line instead of the default CR+NUL, + # to make sure they're treated the same + (2, "> ", b"say I use CR+LF as my EOL, not CR+NUL.\r\n"), + (2, r'You say, "I use CR\+LF as my EOL, not CR\+NUL\.".*> ', ""), +) + test_telnet_iac = ( # Send a double (escaped) IAC byte within other text, which should get # unescaped and deduplicated to a single \xff in the buffer and then # the line of input discarded as a non-ASCII sequence - (2, "> ", b"say argle\xff\xffbargle\r\n"), - (2, r"Non-ASCII characters from admin: b'say argle\\xffbargle'.*> ", ""), + (2, "> ", b"say argle\xff\xffbargle\r\0"), + (2, r"Non-ASCII characters from admin: b'say.*argle\\xffbargle'.*> ", ""), ) -test_telnet_unknown = ( +test_telnet_unknown_command = ( # Send an unsupported negotiation command #127 which should get filtered # from the line of input - (2, "> ", b"say glop\xff\x7fglyf\r\n"), - (2, r'Unknown Telnet IAC command 127 ignored\..*"Glopglyf\.".*> ', ""), + (2, "> ", b"say glop\xff\x7fglyf\r\0"), + (2, r'Ignored unknown command 127 from admin\..*"Glopglyf\.".*> ', ""), +) + +test_telnet_unknown_option = ( + # Send an unassigned negotiation option #127 which should get logged + (2, "> ", b"\xff\xfe\x7f\r\0"), + (2, r'''Received "don't 127" from admin\..*> ''', ""), ) test_admin_restriction = ( @@ -185,6 +215,12 @@ test_admin_help = ( (2, "This will save all active accounts", ""), ) +test_abbrev = ( + (0, "> ", "help mov"), + (0, r"Move in a specific direction\.", "mov north"), + (0, r"You exit to the north\.", ""), +) + test_reload = ( (2, "> ", "reload"), (2, r"Reloading all code modules, configs and data\." @@ -209,6 +245,11 @@ test_show_version = ( (2, r"Running mudpy .* on .* Python 3.*with.*pyyaml.*> ", ""), ) +test_show_time = ( + (2, "> ", "show time"), + (2, r"\r\n[0-9]+ increments elapsed.*> ", ""), +) + test_show_files = ( (2, "> ", "show files"), (2, r'These are the current files containing the universe:.*' @@ -243,6 +284,12 @@ test_show_element = ( r' \x1b\[32mgender: \x1b\[31mfemale.*> ', ""), ) +test_show_result = ( + (2, "> ", "show result 12345*67890"), + (2, r"\r\n838102050\r\n.*> ", "show result 1/0"), + (2, r"Your expression raised an exception.*division by zero.*> ", ""), +) + test_show_log = ( (2, "> ", "show log"), (2, r"There are [0-9]+ log lines in memory and [0-9]+ at or above level " @@ -292,23 +339,30 @@ dialogue = ( (test_chat_mode, "chat mode"), (test_wrapping, "wrapping"), (test_forbid_ansi_input, "raw escape input is filtered"), + (test_escape_macros, "replacement macros are escaped"), (test_movement, "movement"), (test_actor_disappears, "actor spontaneous disappearance"), (test_account1_teardown, "second account teardown"), (test_admin_setup, "admin account setup"), + (test_preferences, "set and show preferences"), + (test_crlf_eol, "send crlf from the client as eol"), (test_telnet_iac, "escape stray telnet iac bytes"), - (test_telnet_unknown, "strip unknown telnet command"), + (test_telnet_unknown_command, "strip unknown telnet command"), + (test_telnet_unknown_option, "log unknown telnet option"), (test_admin_restriction, "restricted admin commands"), (test_admin_help, "admin help"), + (test_abbrev, "command abbreviation"), (test_reload, "reload"), (test_set_facet, "set facet"), (test_set_refused, "refuse altering read-only element"), (test_show_version, "show version and diagnostic info"), + (test_show_time, "show elapsed world clock increments"), (test_show_files, "show a list of loaded files"), (test_show_file, "show nodes from a specific file"), (test_show_groups, "show groups"), (test_show_group, "show group"), (test_show_element, "show element"), + (test_show_result, "show result of a python expression"), (test_show_log, "show log"), (test_custom_loglevel, "custom loglevel"), (test_invalid_loglevel, "invalid loglevel"), @@ -339,7 +393,6 @@ def start_service(config): os.remove(f.name) for d in ("data", "var"): shutil.rmtree(d, ignore_errors=True) - os.mkdir("var") # Start the service and wait for it to be ready for connections service = subprocess.Popen(("mudpy", config), @@ -400,6 +453,17 @@ def tlog(message, quiet=False): return True +def option_callback(telnet_socket, command, option): + if option == b'\x7f': + # We use this unassigned option value as a canary, so short-circuit + # any response to avoid endlessly looping + pass + elif command in (telnetlib.DO, telnetlib.DONT): + telnet_socket.send(telnetlib.IAC + telnetlib.WONT + option) + elif command in (telnetlib.WILL, telnetlib.WONT): + telnet_socket.send(telnetlib.IAC + telnetlib.DONT + option) + + def main(): captures = ["", "", ""] lusers = [telnetlib.Telnet(), telnetlib.Telnet(), telnetlib.Telnet()] @@ -411,6 +475,7 @@ def main(): service = start_service(sys.argv[1]) for luser in lusers: luser.open("::1", 4000) + luser.set_option_negotiation_callback(option_callback) for test, description in dialogue: tlog("\nTesting %s..." % description) test_start = time.time() @@ -437,7 +502,7 @@ def main(): conversant].read_very_eager().decode("utf-8") except Exception: pass - if index is not 0: + if index != 0: tlog("\nERROR: luser%s did not receive expected string:\n\n" "%s\n\n" "Check the end of capture_%s.log for received data." @@ -446,7 +511,7 @@ def main(): break if type(answer) is str: tlog("luser%s sending: %s" % (conversant, answer), quiet=True) - lusers[conversant].write(("%s\r\n" % answer).encode("utf-8")) + lusers[conversant].write(("%s\r\0" % answer).encode("utf-8")) captures[conversant] += "%s\r\n" % answer elif type(answer) is bytes: tlog("luser%s sending raw bytes: %s" % (conversant, answer),