X-Git-Url: https://mudpy.org/gitweb?p=mudpy.git;a=blobdiff_plain;f=mudpy%2Ftests%2Fselftest.py;h=766f01d044cdc6760b540dc6a6fd58b2dd7e153a;hp=d8f51063621aaac345d6ad6eb44c51603006f006;hb=0c50d321bc2ecb103c9e80896f6d79e77f1a72b2;hpb=87f655b9fd09c77b168f90139239f88fad7576eb diff --git a/mudpy/tests/selftest.py b/mudpy/tests/selftest.py index d8f5106..766f01d 100644 --- a/mudpy/tests/selftest.py +++ b/mudpy/tests/selftest.py @@ -1,7 +1,8 @@ -# Copyright (c) 2004-2017 Jeremy Stanley . Permission +# Copyright (c) 2004-2018 Jeremy Stanley . Permission # to use, copy, modify, and distribute this software is granted under # terms provided in the LICENSE file distributed with this software. +import os import re import sys import telnetlib @@ -142,6 +143,14 @@ test_admin_setup = ( (2, "Whom would you like to awaken?", ""), ) +test_telnet_iac = ( + # Send a double (escaped) IAC byte within other text, which should get + # unescaped and deduplicated to a single \xff in the buffer and then + # the line of input discarded as a non-UTF-8 sequence + (2, "> ", b"say argle\xff\xffbargle\r\n"), + (2, r"Non-UTF-8 sequence from admin: b'say argle\\xffbargle'.*> ", ""), +) + test_admin_restriction = ( (0, "> ", "help halt"), (0, r"That is not an available command\.", "halt"), @@ -154,11 +163,54 @@ test_admin_help = ( (2, "This will save all active accounts", ""), ) +test_reload = ( + (2, "> ", "reload"), + (2, r"Reloading all code modules, configs and data\." + r".* User admin reloaded the world\.", ""), +) + +test_set_facet = ( + (2, "> ", "set actor.avatar_admin_0 gender female"), + (2, r'You have successfully \(re\)set the "gender" facet of element', ""), +) + +test_set_refused = ( + (2, "> ", "set mudpy.limit password_tries 10"), + (2, r'The "mudpy\.limit" element is kept in read-only file', ""), +) + +test_show_files = ( + (2, "> ", "show files"), + (2, r'These are the current files containing the universe:.*' + r' \x1b\[31m\(rw\) \x1b\[32m/.*/account.yaml\x1b\[0m' + r' \x1b\[33m\[private\]\x1b\[0m.*> ', ""), +) + +test_show_file = ( + (2, "> ", "show file %s" % + os.path.join(os.getcwd(), "data/internal.yaml")), + (2, r'These are the nodes in the.*file:.*internal\.counters.*> ', ""), +) + +test_show_groups = ( + (2, "> ", "show groups"), + (2, r'These are the element groups:.*' + r' \x1b\[32maccount\x1b\[0m.*> ', ""), +) + +test_show_group = ( + (2, "> ", "show group account"), + (2, r'These are the elements in the "account" group:.*' + r' \x1b\[32maccount\.admin\x1b\[0m.*> ', ""), +) + test_show_element = ( - (2, "> ", "show element internal:counters"), - (2, r'These are the properties of the "internal:counters" element ' - r'\(in.*data/internal\.yaml"\):.* \x1b\[32melapsed: ' - r'\x1b\[31m[0-9]+\x1b\[0m', ""), + (2, "> ", "show element mudpy.limit"), + (2, r'These are the properties of the "mudpy\.limit" element.*' + r' \x1b\[32mpassword_tries: \x1b\[31m3.*> ', + "show element actor.avatar_admin_0"), + (2, r'These are the properties of the "actor.avatar_admin_0" element.*' + r' \x1b\[32mgender: \x1b\[31mfemale.*> ', ""), ) test_show_log = ( @@ -168,14 +220,14 @@ test_show_log = ( ) test_custom_loglevel = ( - (2, "> ", "set account:admin loglevel 2"), + (2, "> ", "set account.admin loglevel 2"), (2, "You have successfully .*> ", "show log"), (2, r"There are [0-9]+ log lines in memory and [0-9]+ at or above level " r"[0-9]+\. The matching lines\r\nfrom [0-9]+ to [0-9]+ are:", ""), ) test_invalid_loglevel = ( - (2, "> ", "set account:admin loglevel two"), + (2, "> ", "set account.admin loglevel two"), (2, r'''Value "two" of type "" cannot be coerced .*> ''', ""), ) @@ -197,8 +249,16 @@ dialogue = ( (test_actor_disappears, "actor spontaneous disappearance"), (test_account1_teardown, "second account teardown"), (test_admin_setup, "admin account setup"), + (test_telnet_iac, "escape stray telnet iac bytes"), (test_admin_restriction, "restricted admin commands"), (test_admin_help, "admin help"), + (test_reload, "reload"), + (test_set_facet, "set facet"), + (test_set_refused, "refuse altering read-only element"), + (test_show_files, "show a list of loaded files"), + (test_show_file, "show nodes from a specific file"), + (test_show_groups, "show groups"), + (test_show_group, "show group"), (test_show_element, "show element"), (test_show_log, "show log"), (test_custom_loglevel, "custom loglevel"), @@ -213,19 +273,31 @@ def main(): success = True start = time.time() for luser in lusers: - luser.open("::1", 6669) + luser.open("::1", 4000) for test, description in dialogue: print("\nTesting %s..." % description) test_start = time.time() for conversant, question, answer in test: print("luser%s waiting for: %s" % (conversant, question)) - index, match, received = lusers[conversant].expect( - [re.compile(question.encode("utf-8"), flags=re.DOTALL)], 5) - captures[conversant] += received.decode("utf-8") + try: + index, match, received = lusers[conversant].expect( + [re.compile(question.encode("utf-8"), flags=re.DOTALL)], 5) + captures[conversant] += received.decode("utf-8") + except ConnectionResetError: + print("ERROR: Unable to connect to server.") + success = False + break + except EOFError: + print("ERROR: luser%s premature disconnection expecting:\n\n" + "%s\n\n" + "Check the end of capture_%s.log for received data." + % (conversant, question, conversant)) + success = False + break try: captures[conversant] += lusers[ conversant].read_very_eager().decode("utf-8") - except: + except Exception: pass if index is not 0: print("ERROR: luser%s did not receive expected string:\n\n" @@ -234,9 +306,19 @@ def main(): % (conversant, question, conversant)) success = False break - print("luser%s sending: %s" % (conversant, answer)) - lusers[conversant].write(("%s\r\n" % answer).encode("utf-8")) - captures[conversant] += "%s\r\n" % answer + if type(answer) is str: + print("luser%s sending: %s" % (conversant, answer)) + lusers[conversant].write(("%s\r\n" % answer).encode("utf-8")) + captures[conversant] += "%s\r\n" % answer + elif type(answer) is bytes: + print("luser%s sending raw bytes: %s" % (conversant, answer)) + lusers[conversant].get_socket().send(answer) + captures[conversant] += "!!!RAW BYTES: %s" % answer + else: + print("ERROR: answer provided with unsupported type %s" + % type(answer)) + success = False + break if not success: break print("Completed in %.3f seconds." % (time.time() - test_start)) @@ -246,7 +328,7 @@ def main(): try: captures[conversant] += lusers[ conversant].read_very_eager().decode("utf-8") - except: + except Exception: pass lusers[conversant].close() logfile = "capture_%s.log" % conversant