X-Git-Url: https://mudpy.org/gitweb?p=mudpy.git;a=blobdiff_plain;f=mudpy%2Ftests%2Fselftest.py;h=3012c617b9b2dca0bb20467f9c83f4873eb777ea;hp=edcefbd8d7c7bbcd2b5f3f234c05c2534821d091;hb=6375b4be5c22662376bfb1534a5ec77506f6402e;hpb=8a903d4a3899982b281ca33ee9c131b2fd9ef04d diff --git a/mudpy/tests/selftest.py b/mudpy/tests/selftest.py index edcefbd..3012c61 100644 --- a/mudpy/tests/selftest.py +++ b/mudpy/tests/selftest.py @@ -158,19 +158,26 @@ test_admin_setup = ( (2, "Whom would you like to awaken?", ""), ) +test_crlf_eol = ( + # Send a CR+LF at the end of the line instead of the default CR+NUL, + # to make sure they're treated the same + (2, "> ", b"say I use CR+LF as my EOL, not CR+NUL.\r\n"), + (2, r'You say, "I use CR\+LF as my EOL, not CR\+NUL\.".*> ', ""), +) + test_telnet_iac = ( # Send a double (escaped) IAC byte within other text, which should get # unescaped and deduplicated to a single \xff in the buffer and then # the line of input discarded as a non-ASCII sequence - (2, "> ", b"say argle\xff\xffbargle\r\n"), + (2, "> ", b"say argle\xff\xffbargle\r\0"), (2, r"Non-ASCII characters from admin: b'say argle\\xffbargle'.*> ", ""), ) test_telnet_unknown = ( # Send an unsupported negotiation command #127 which should get filtered # from the line of input - (2, "> ", b"say glop\xff\x7fglyf\r\n"), - (2, r'Unknown Telnet IAC command 127 ignored\..*"Glopglyf\.".*> ', ""), + (2, "> ", b"say glop\xff\x7fglyf\r\0"), + (2, r'Ignored unknown command 127 from admin\..*"Glopglyf\.".*> ', ""), ) test_admin_restriction = ( @@ -285,7 +292,7 @@ dialogue = ( (test_account0_setup, "first account setup"), (test_account1_setup, "second account setup"), (test_actor_appears, "actor spontaneous appearance"), - (test_explicit_punctuation, " explicit punctuation"), + (test_explicit_punctuation, "explicit punctuation"), (test_implicit_punctuation, "implicit punctuation"), (test_typo_replacement, "typo replacement"), (test_sentence_capitalization, "sentence capitalization"), @@ -296,6 +303,7 @@ dialogue = ( (test_actor_disappears, "actor spontaneous disappearance"), (test_account1_teardown, "second account teardown"), (test_admin_setup, "admin account setup"), + (test_crlf_eol, "send crlf from the client as eol"), (test_telnet_iac, "escape stray telnet iac bytes"), (test_telnet_unknown, "strip unknown telnet command"), (test_admin_restriction, "restricted admin commands"), @@ -360,7 +368,7 @@ def stop_service(service): service.terminate() returncode = service.wait(10) if returncode != 0: - print("ERROR: Service exited with code %s." % returncode) + tlog("\nERROR: Service exited with code %s." % returncode) success = False # This cleans up a daemonized and disassociated service @@ -376,21 +384,30 @@ def stop_service(service): os.remove(pidfile) # The PID file didn't disappear, so we have a hung service if os.path.exists(pidfile): - print("ERROR: Hung daemon with PID %s." % pid) + tlog("\nERROR: Hung daemon with PID %s." % pid) success = False # Log the contents of stdout and stderr, if any stdout, stderr = service.communicate() - print("Recording stdout as capture_stdout.log.") + tlog("\nRecording stdout as capture_stdout.log.") serviceout = open("capture_stdout.log", "w") serviceout.write(stdout.decode("utf-8")) - print("Recording stderr as capture_stderr.log.") + tlog("\nRecording stderr as capture_stderr.log.") serviceerr = open("capture_stderr.log", "w") serviceerr.write(stderr.decode("utf-8")) return(success) +def tlog(message, quiet=False): + logfile = "capture_tests.log" + with open(logfile, "a") as logfd: + logfd.write(message + "\n") + if not quiet: + sys.stdout.write(message) + return True + + def main(): captures = ["", "", ""] lusers = [telnetlib.Telnet(), telnetlib.Telnet(), telnetlib.Telnet()] @@ -403,23 +420,24 @@ def main(): for luser in lusers: luser.open("::1", 4000) for test, description in dialogue: - print("\nTesting %s..." % description) + tlog("\nTesting %s..." % description) test_start = time.time() for conversant, question, answer in test: - print("luser%s waiting for: %s" % (conversant, question)) + tlog("luser%s waiting for: %s" % (conversant, question), + quiet=True) try: index, match, received = lusers[conversant].expect( [re.compile(question.encode("utf-8"), flags=re.DOTALL)], 5) captures[conversant] += received.decode("utf-8") except ConnectionResetError: - print("ERROR: Unable to connect to server.") + tlog("\nERROR: Unable to connect to server.") success = False break except EOFError: - print("ERROR: luser%s premature disconnection expecting:\n\n" - "%s\n\n" - "Check the end of capture_%s.log for received data." - % (conversant, question, conversant)) + tlog("\nERROR: luser%s premature disconnection expecting:\n\n" + "%s\n\n" + "Check the end of capture_%s.log for received data." + % (conversant, question, conversant)) success = False break try: @@ -428,30 +446,30 @@ def main(): except Exception: pass if index is not 0: - print("ERROR: luser%s did not receive expected string:\n\n" - "%s\n\n" - "Check the end of capture_%s.log for received data." - % (conversant, question, conversant)) + tlog("\nERROR: luser%s did not receive expected string:\n\n" + "%s\n\n" + "Check the end of capture_%s.log for received data." + % (conversant, question, conversant)) success = False break if type(answer) is str: - print("luser%s sending: %s" % (conversant, answer)) - lusers[conversant].write(("%s\r\n" % answer).encode("utf-8")) + tlog("luser%s sending: %s" % (conversant, answer), quiet=True) + lusers[conversant].write(("%s\r\0" % answer).encode("utf-8")) captures[conversant] += "%s\r\n" % answer elif type(answer) is bytes: - print("luser%s sending raw bytes: %s" % (conversant, answer)) + tlog("luser%s sending raw bytes: %s" % (conversant, answer), + quiet=True) lusers[conversant].get_socket().send(answer) captures[conversant] += "!!!RAW BYTES: %s" % answer else: - print("ERROR: answer provided with unsupported type %s" - % type(answer)) + tlog("\nERROR: answer provided with unsupported type %s" + % type(answer)) success = False break if not success: break - print("Completed in %.3f seconds." % (time.time() - test_start)) + tlog("Completed in %.3f seconds." % (time.time() - test_start)) duration = time.time() - start - print("") for conversant in range(len(captures)): try: captures[conversant] += lusers[ @@ -460,17 +478,17 @@ def main(): pass lusers[conversant].close() logfile = "capture_%s.log" % conversant - print("Recording session %s as %s." % (conversant, logfile)) + tlog("\nRecording session %s as %s." % (conversant, logfile)) log = open(logfile, "w") log.write(captures[conversant]) log.close() if not stop_service(service): success = False - print("\nRan %s tests in %.3f seconds." % (len(dialogue), duration)) + tlog("\nRan %s tests in %.3f seconds." % (len(dialogue), duration)) if success: - print("SUCCESS") + tlog("\nSUCCESS\n") else: - print("FAILURE") + tlog("\nFAILURE\n") sys.exit(1)