Use PosixPath arguments with os.remove()
[mudpy.git] / mudpy / tests / selftest.py
index 6e130c3..bb19d33 100644 (file)
@@ -11,6 +11,15 @@ import sys
 import telnetlib
 import time
 
+import yaml
+
+# TODO(fungi) Clean this up once Python 3.5 is no longer supported
+if sys.version < "3.6":
+    import collections
+    odict = collections.OrderedDict
+else:
+    odict = dict
+
 pidfile = "var/mudpy.pid"
 
 test_account0_setup = (
@@ -331,17 +340,32 @@ test_show_element = (
         r'  \x1b\[32mgender: \x1b\[31mfemale.*> ', ""),
 )
 
-test_show_result = (
-    (2, "> ", "show result 12345*67890"),
-    (2, r"\r\n838102050\r\n.*> ", "show result 1/0"),
-    (2, r"Your expression raised an exception.*division by zero.*> ",
-     "show result mudpy"),
-    (2, r"<module 'mudpy' from.*> ", "show result re"),
-    (2, r"Your expression raised an exception.*name 're' is not defined.*> ",
-     "show result universe"),
-    (2, r"<mudpy\.misc\.Universe object at 0x.*> ", "show result actor"),
-    (2, r"Your expression raised an exception.*name 'actor' is not "
-        r"defined.*> ", ""),
+test_evaluate = (
+    (2, "> ", "evaluate 12345*67890"),
+    (2, r"\r\n838102050\r\n.*> ", "evaluate 1/0"),
+    (2, "Your expression raised an exception.*division by zero.*> ",
+     "evaluate mudpy"),
+    (2, "<module 'mudpy' from.*> ", "evaluate re"),
+    (2, "Your expression raised an exception.*name 're' is not defined.*> ",
+     "evaluate universe"),
+    (2, r"<mudpy\.misc\.Universe object at 0x.*> ", "evaluate actor"),
+    (2, "Your expression raised an exception.*name 'actor' is not defined.*> ",
+        "evaluate dir(mudpy)"),
+    (2, "__builtins__.*> ", "evaluate mudpy.__builtins__.open"),
+    (2, "not allowed.*> ", "evaluate (lambda x: x + 1)(2)"),
+    (2, "not allowed.*> ", ""),
+)
+
+test_debug_restricted = (
+    (0, "> ", "help evaluate"),
+    (0, r"That is not an available command\.", "evaluate"),
+    (0, '(not sure what "evaluate" means|Arglebargle, glop-glyf)', ""),
+)
+
+test_debug_disabled = (
+    (2, "> ", "help evaluate"),
+    (2, r"That is not an available command\.", "evaluate"),
+    (2, '(not sure what "evaluate" means|Arglebargle, glop-glyf)', ""),
 )
 
 test_show_log = (
@@ -378,7 +402,7 @@ final_cleanup = (
     (2, r"Disconnecting\.\.\.", ""),
 )
 
-dialogue = (
+dialogue = odict((
     (test_account0_setup, "first account setup"),
     (test_account1_setup, "second account setup"),
     (test_actor_appears, "actor spontaneous appearance"),
@@ -417,20 +441,30 @@ dialogue = (
     (test_show_groups, "show groups"),
     (test_show_group, "show group"),
     (test_show_element, "show element"),
-    (test_show_result, "show result of a python expression"),
+    (test_evaluate, "show results of python expressions"),
+    (test_debug_restricted, "only admins can run debug commands"),
+    (test_debug_disabled, "debugging commands only in debug mode"),
     (test_show_log, "show log"),
     (test_custom_loglevel, "custom loglevel"),
     (test_invalid_loglevel, "invalid loglevel"),
     (test_log_no_errors, "no errors logged"),
     (final_cleanup, "delete remaining accounts"),
+))
+
+debug_tests = (
+    test_evaluate,
+)
+
+nondebug_tests = (
+    test_debug_disabled,
 )
 
 
 def start_service(config):
     # Clean up any previously run daemon which didn't terminate
     if os.path.exists(pidfile):
-        pidfd = open(pidfile)
-        pid = int(pidfd.read())
+        with open(pidfile) as pidfd:
+            pid = int(pidfd.read())
         try:
             # Stop the running service
             os.kill(pid, 15)
@@ -443,9 +477,7 @@ def start_service(config):
 
     # Clean up any previous test output
     for f in pathlib.Path(".").glob("capture_*.log"):
-        # have to use .name here since remove() doesn't support passing a
-        # PosixPath argument until Python3.6
-        os.remove(f.name)
+        os.remove(f)
     for d in ("data", "var"):
         shutil.rmtree(d, ignore_errors=True)
 
@@ -454,7 +486,7 @@ def start_service(config):
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
     time.sleep(1)
-    return(service)
+    return service
 
 
 def stop_service(service):
@@ -462,7 +494,7 @@ def stop_service(service):
 
     # The no-op case when no service was started
     if service is None:
-        return(success)
+        return success
 
     # This handles when the service is running as a direct child process
     service.terminate()
@@ -473,8 +505,8 @@ def stop_service(service):
 
     # This cleans up a daemonized and disassociated service
     if os.path.exists(pidfile):
-        pidfd = open(pidfile)
-        pid = int(pidfd.read())
+        with open(pidfile) as pidfd:
+            pid = int(pidfd.read())
         try:
             # Stop the running service
             os.kill(pid, 15)
@@ -490,13 +522,13 @@ def stop_service(service):
     # Log the contents of stdout and stderr, if any
     stdout, stderr = service.communicate()
     tlog("\nRecording stdout as capture_stdout.log.")
-    serviceout = open("capture_stdout.log", "w")
-    serviceout.write(stdout.decode("utf-8"))
+    with open("capture_stdout.log", "w") as serviceout:
+        serviceout.write(stdout.decode("utf-8"))
     tlog("\nRecording stderr as capture_stderr.log.")
-    serviceerr = open("capture_stderr.log", "w")
-    serviceerr.write(stderr.decode("utf-8"))
+    with open("capture_stderr.log", "w") as serviceerr:
+        serviceerr.write(stderr.decode("utf-8"))
 
-    return(success)
+    return success
 
 
 def tlog(message, quiet=False):
@@ -519,6 +551,13 @@ def option_callback(telnet_socket, command, option):
         telnet_socket.send(telnetlib.IAC + telnetlib.DONT + option)
 
 
+def check_debug():
+    if len(sys.argv) > 1:
+        config = yaml.safe_load(open(sys.argv[1]))
+        return config.get(".mudpy.limit.debug", False)
+    return False
+
+
 def main():
     captures = ["", "", ""]
     lusers = [telnetlib.Telnet(), telnetlib.Telnet(), telnetlib.Telnet()]
@@ -534,7 +573,14 @@ def main():
     for luser in lusers:
         luser.open("::1", 4000)
         luser.set_option_negotiation_callback(option_callback)
-    for test, description in dialogue:
+    selected_dialogue = odict(dialogue)
+    if check_debug():
+        for test in nondebug_tests:
+            del selected_dialogue[test]
+    else:
+        for test in debug_tests:
+            del selected_dialogue[test]
+    for test, description in selected_dialogue.items():
         tlog("\nTesting %s..." % description)
         test_start = time.time()
         for conversant, question, answer in test: