X-Git-Url: https://mudpy.org/gitweb?p=mudpy.git;a=blobdiff_plain;f=mudpy%2Ftests%2Fselftest.py;h=d98755a552af1f61c1e776f8c6f9ad537777ce2b;hp=43e93ae192f3ad46e4c4dd64128d887b9fa4ad20;hb=641a045ea5407a56626c7ea718efc7bbf5152ba1;hpb=b64bdabe02e5d30113df5052050cfb9b62683a74 diff --git a/mudpy/tests/selftest.py b/mudpy/tests/selftest.py index 43e93ae..d98755a 100644 --- a/mudpy/tests/selftest.py +++ b/mudpy/tests/selftest.py @@ -111,6 +111,11 @@ test_forbid_ansi_input = ( (1, r'says, "\[35mfoo\[0m\."', ""), ) +test_escape_macros = ( + (0, '> ', "say $(red)bar$(nrm)"), + (1, r'says, "\$\(red\)bar\$\(nrm\)."', ""), +) + test_movement = ( (0, "> ", "move north"), (0, r"You exit to the north\.", ""), @@ -160,9 +165,9 @@ test_admin_setup = ( test_preferences = ( (0, "> ", "preferences"), - (0, r"prompt \x1b\[32m.*> ", "preferences prompt #"), - (0, r"# ", "preferences prompt"), - (0, r"#.*# ", "preferences prompt >"), + (0, r"prompt \x1b\[32m.*> ", "preferences prompt $(foo)"), + (0, r"\$\(foo\) ", "preferences prompt"), + (0, r"\$\(foo\).*\$\(foo\) ", "preferences prompt >"), (2, "> ", "preferences loglevel 0"), (2, "> ", "preferences"), (2, r"loglevel \x1b\[32m0\x1b\[0m.*> ", "preferences loglevel zero"), @@ -181,16 +186,22 @@ test_telnet_iac = ( # unescaped and deduplicated to a single \xff in the buffer and then # the line of input discarded as a non-ASCII sequence (2, "> ", b"say argle\xff\xffbargle\r\0"), - (2, r"Non-ASCII characters from admin: b'say argle\\xffbargle'.*> ", ""), + (2, r"Non-ASCII characters from admin: b'say.*argle\\xffbargle'.*> ", ""), ) -test_telnet_unknown = ( +test_telnet_unknown_command = ( # Send an unsupported negotiation command #127 which should get filtered # from the line of input (2, "> ", b"say glop\xff\x7fglyf\r\0"), (2, r'Ignored unknown command 127 from admin\..*"Glopglyf\.".*> ', ""), ) +test_telnet_unknown_option = ( + # Send an unassigned negotiation option #127 which should get logged + (2, "> ", b"\xff\xfe\x7f\r\0"), + (2, r'''Received "don't 127" from admin\..*> ''', ""), +) + test_admin_restriction = ( (0, "> ", "help halt"), (0, r"That is not an available command\.", "halt"), @@ -310,6 +321,7 @@ dialogue = ( (test_chat_mode, "chat mode"), (test_wrapping, "wrapping"), (test_forbid_ansi_input, "raw escape input is filtered"), + (test_escape_macros, "replacement macros are escaped"), (test_movement, "movement"), (test_actor_disappears, "actor spontaneous disappearance"), (test_account1_teardown, "second account teardown"), @@ -317,7 +329,8 @@ dialogue = ( (test_preferences, "set and show preferences"), (test_crlf_eol, "send crlf from the client as eol"), (test_telnet_iac, "escape stray telnet iac bytes"), - (test_telnet_unknown, "strip unknown telnet command"), + (test_telnet_unknown_command, "strip unknown telnet command"), + (test_telnet_unknown_option, "log unknown telnet option"), (test_admin_restriction, "restricted admin commands"), (test_admin_help, "admin help"), (test_reload, "reload"), @@ -419,6 +432,17 @@ def tlog(message, quiet=False): return True +def option_callback(telnet_socket, command, option): + if option == b'\x7f': + # We use this unassigned option value as a canary, so short-circuit + # any response to avoid endlessly looping + pass + elif command in (telnetlib.DO, telnetlib.DONT): + telnet_socket.send(telnetlib.IAC + telnetlib.WONT + option) + elif command in (telnetlib.WILL, telnetlib.WONT): + telnet_socket.send(telnetlib.IAC + telnetlib.DONT + option) + + def main(): captures = ["", "", ""] lusers = [telnetlib.Telnet(), telnetlib.Telnet(), telnetlib.Telnet()] @@ -430,6 +454,7 @@ def main(): service = start_service(sys.argv[1]) for luser in lusers: luser.open("::1", 4000) + luser.set_option_negotiation_callback(option_callback) for test, description in dialogue: tlog("\nTesting %s..." % description) test_start = time.time() @@ -456,7 +481,7 @@ def main(): conversant].read_very_eager().decode("utf-8") except Exception: pass - if index is not 0: + if index != 0: tlog("\nERROR: luser%s did not receive expected string:\n\n" "%s\n\n" "Check the end of capture_%s.log for received data."