Clean up leading space typo in test title
[mudpy.git] / mudpy / tests / selftest.py
index bc87885..949488d 100644 (file)
@@ -3,11 +3,16 @@
 # terms provided in the LICENSE file distributed with this software.
 
 import os
+import pathlib
 import re
+import shutil
+import subprocess
 import sys
 import telnetlib
 import time
 
+pidfile = "var/mudpy.pid"
+
 test_account0_setup = (
     (0, "Identify yourself:", "luser0"),
     (0, "Enter your choice:", "n"),
@@ -280,7 +285,7 @@ dialogue = (
     (test_account0_setup, "first account setup"),
     (test_account1_setup, "second account setup"),
     (test_actor_appears, "actor spontaneous appearance"),
-    (test_explicit_punctuation, " explicit punctuation"),
+    (test_explicit_punctuation, "explicit punctuation"),
     (test_implicit_punctuation, "implicit punctuation"),
     (test_typo_replacement, "typo replacement"),
     (test_sentence_capitalization, "sentence capitalization"),
@@ -312,11 +317,89 @@ dialogue = (
 )
 
 
+def start_service(config):
+    # Clean up any previously run daemon which didn't terminate
+    if os.path.exists(pidfile):
+        pidfd = open(pidfile)
+        pid = int(pidfd.read())
+        try:
+            # Stop the running service
+            os.kill(pid, 15)
+            time.sleep(1)
+        except ProcessLookupError:
+            # If there was no process, just remove the stale PID file
+            os.remove(pidfile)
+        # If there's a preexisting hung service, we can't proceed
+        assert not os.path.exists(pidfile)
+
+    # Clean up any previous test output
+    for f in pathlib.Path(".").glob("capture_*.log"):
+        # have to use .name here since remove() doesn't support passing a
+        # PosixPath argument until Python3.6
+        os.remove(f.name)
+    for d in ("data", "var"):
+        shutil.rmtree(d, ignore_errors=True)
+    os.mkdir("var")
+
+    # Start the service and wait for it to be ready for connections
+    service = subprocess.Popen(("mudpy", config),
+                               stdout=subprocess.PIPE,
+                               stderr=subprocess.PIPE)
+    time.sleep(1)
+    return(service)
+
+
+def stop_service(service):
+    success = True
+
+    # The no-op case when no service was started
+    if service is None:
+        return(success)
+
+    # This handles when the service is running as a direct child process
+    service.terminate()
+    returncode = service.wait(10)
+    if returncode != 0:
+        print("ERROR: Service exited with code %s." % returncode)
+        success = False
+
+    # This cleans up a daemonized and disassociated service
+    if os.path.exists(pidfile):
+        pidfd = open(pidfile)
+        pid = int(pidfd.read())
+        try:
+            # Stop the running service
+            os.kill(pid, 15)
+            time.sleep(1)
+        except ProcessLookupError:
+            # If there was no process, just remove the stale PID file
+            os.remove(pidfile)
+        # The PID file didn't disappear, so we have a hung service
+        if os.path.exists(pidfile):
+            print("ERROR: Hung daemon with PID %s." % pid)
+            success = False
+
+    # Log the contents of stdout and stderr, if any
+    stdout, stderr = service.communicate()
+    print("Recording stdout as capture_stdout.log.")
+    serviceout = open("capture_stdout.log", "w")
+    serviceout.write(stdout.decode("utf-8"))
+    print("Recording stderr as capture_stderr.log.")
+    serviceerr = open("capture_stderr.log", "w")
+    serviceerr.write(stderr.decode("utf-8"))
+
+    return(success)
+
+
 def main():
     captures = ["", "", ""]
     lusers = [telnetlib.Telnet(), telnetlib.Telnet(), telnetlib.Telnet()]
     success = True
     start = time.time()
+    service = None
+    if len(sys.argv) > 1:
+        # Start the service if a config file was provided on the command line
+        service = start_service(sys.argv[1])
     for luser in lusers:
         luser.open("::1", 4000)
     for test, description in dialogue:
@@ -381,6 +464,8 @@ def main():
         log = open(logfile, "w")
         log.write(captures[conversant])
         log.close()
+    if not stop_service(service):
+        success = False
     print("\nRan %s tests in %.3f seconds." % (len(dialogue), duration))
     if success:
         print("SUCCESS")