X-Git-Url: https://mudpy.org/gitweb?p=mudpy.git;a=blobdiff_plain;f=mudpy%2Ftests%2Fselftest.py;h=949488d969762829482f7e6ae3b3869aaff92857;hp=bf324345e9e7acf780e4dc46ec00558b23dd5603;hb=46bbc0afe053d9c8bac9f3a343ead69807fd886d;hpb=34d256939f22a5186b32d0628ed399e4b644e798 diff --git a/mudpy/tests/selftest.py b/mudpy/tests/selftest.py index bf32434..949488d 100644 --- a/mudpy/tests/selftest.py +++ b/mudpy/tests/selftest.py @@ -3,11 +3,16 @@ # terms provided in the LICENSE file distributed with this software. import os +import pathlib import re +import shutil +import subprocess import sys import telnetlib import time +pidfile = "var/mudpy.pid" + test_account0_setup = ( (0, "Identify yourself:", "luser0"), (0, "Enter your choice:", "n"), @@ -101,6 +106,11 @@ test_wrapping = ( (1, r'says,\r\n"O[o]+\."', ""), ) +test_forbid_ansi_input = ( + (0, '> ', "say \x1b[35mfoo\x1b[0m"), + (1, r'says, "\[35mfoo\[0m\."', ""), +) + test_movement = ( (0, "> ", "move north"), (0, r"You exit to the north\.", ""), @@ -151,9 +161,9 @@ test_admin_setup = ( test_telnet_iac = ( # Send a double (escaped) IAC byte within other text, which should get # unescaped and deduplicated to a single \xff in the buffer and then - # the line of input discarded as a non-UTF-8 sequence + # the line of input discarded as a non-ASCII sequence (2, "> ", b"say argle\xff\xffbargle\r\n"), - (2, r"Non-UTF-8 sequence from admin: b'say argle\\xffbargle'.*> ", ""), + (2, r"Non-ASCII characters from admin: b'say argle\\xffbargle'.*> ", ""), ) test_telnet_unknown = ( @@ -178,7 +188,10 @@ test_admin_help = ( test_reload = ( (2, "> ", "reload"), (2, r"Reloading all code modules, configs and data\." - r".* User admin reloaded the world\.", ""), + r".* User admin reloaded the world\.", + "show element account.admin"), + (2, 'These are the properties of the "account.admin" element.*' + r' \x1b\[32mpasshash:\r\n\x1b\[31m\$.*> ', ""), ) test_set_facet = ( @@ -253,16 +266,32 @@ test_log_no_errors = ( (2, r"None of the [0-9]+ lines in memory matches your request\.", ""), ) +final_cleanup = ( + (0, "> ", "quit"), + (0, "What would you like to do?", "d"), + (0, "Whom would you like to delete?", ""), + (0, "What would you like to do?", "p"), + (0, "permanently delete your account?", "y"), + (0, "Disconnecting...", ""), + (2, "> ", "quit"), + (2, "What would you like to do?", "d"), + (2, "Whom would you like to delete?", ""), + (2, "What would you like to do?", "p"), + (2, "permanently delete your account?", "y"), + (2, "Disconnecting...", ""), +) + dialogue = ( (test_account0_setup, "first account setup"), (test_account1_setup, "second account setup"), (test_actor_appears, "actor spontaneous appearance"), - (test_explicit_punctuation, " explicit punctuation"), + (test_explicit_punctuation, "explicit punctuation"), (test_implicit_punctuation, "implicit punctuation"), (test_typo_replacement, "typo replacement"), (test_sentence_capitalization, "sentence capitalization"), (test_chat_mode, "chat mode"), (test_wrapping, "wrapping"), + (test_forbid_ansi_input, "raw escape input is filtered"), (test_movement, "movement"), (test_actor_disappears, "actor spontaneous disappearance"), (test_account1_teardown, "second account teardown"), @@ -284,14 +313,93 @@ dialogue = ( (test_custom_loglevel, "custom loglevel"), (test_invalid_loglevel, "invalid loglevel"), (test_log_no_errors, "no errors logged"), + (final_cleanup, "delete remaining accounts"), ) +def start_service(config): + # Clean up any previously run daemon which didn't terminate + if os.path.exists(pidfile): + pidfd = open(pidfile) + pid = int(pidfd.read()) + try: + # Stop the running service + os.kill(pid, 15) + time.sleep(1) + except ProcessLookupError: + # If there was no process, just remove the stale PID file + os.remove(pidfile) + # If there's a preexisting hung service, we can't proceed + assert not os.path.exists(pidfile) + + # Clean up any previous test output + for f in pathlib.Path(".").glob("capture_*.log"): + # have to use .name here since remove() doesn't support passing a + # PosixPath argument until Python3.6 + os.remove(f.name) + for d in ("data", "var"): + shutil.rmtree(d, ignore_errors=True) + os.mkdir("var") + + # Start the service and wait for it to be ready for connections + service = subprocess.Popen(("mudpy", config), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + time.sleep(1) + return(service) + + +def stop_service(service): + success = True + + # The no-op case when no service was started + if service is None: + return(success) + + # This handles when the service is running as a direct child process + service.terminate() + returncode = service.wait(10) + if returncode != 0: + print("ERROR: Service exited with code %s." % returncode) + success = False + + # This cleans up a daemonized and disassociated service + if os.path.exists(pidfile): + pidfd = open(pidfile) + pid = int(pidfd.read()) + try: + # Stop the running service + os.kill(pid, 15) + time.sleep(1) + except ProcessLookupError: + # If there was no process, just remove the stale PID file + os.remove(pidfile) + # The PID file didn't disappear, so we have a hung service + if os.path.exists(pidfile): + print("ERROR: Hung daemon with PID %s." % pid) + success = False + + # Log the contents of stdout and stderr, if any + stdout, stderr = service.communicate() + print("Recording stdout as capture_stdout.log.") + serviceout = open("capture_stdout.log", "w") + serviceout.write(stdout.decode("utf-8")) + print("Recording stderr as capture_stderr.log.") + serviceerr = open("capture_stderr.log", "w") + serviceerr.write(stderr.decode("utf-8")) + + return(success) + + def main(): captures = ["", "", ""] lusers = [telnetlib.Telnet(), telnetlib.Telnet(), telnetlib.Telnet()] success = True start = time.time() + service = None + if len(sys.argv) > 1: + # Start the service if a config file was provided on the command line + service = start_service(sys.argv[1]) for luser in lusers: luser.open("::1", 4000) for test, description in dialogue: @@ -356,6 +464,8 @@ def main(): log = open(logfile, "w") log.write(captures[conversant]) log.close() + if not stop_service(service): + success = False print("\nRan %s tests in %.3f seconds." % (len(dialogue), duration)) if success: print("SUCCESS")