+def start_service(config):
+ # Clean up any previously run daemon which didn't terminate
+ if os.path.exists(pidfile):
+ pidfd = open(pidfile)
+ pid = int(pidfd.read())
+ try:
+ # Stop the running service
+ os.kill(pid, 15)
+ time.sleep(1)
+ except ProcessLookupError:
+ # If there was no process, just remove the stale PID file
+ os.remove(pidfile)
+ # If there's a preexisting hung service, we can't proceed
+ assert not os.path.exists(pidfile)
+
+ # Clean up any previous test output
+ for f in pathlib.Path(".").glob("capture_*.log"):
+ # have to use .name here since remove() doesn't support passing a
+ # PosixPath argument until Python3.6
+ os.remove(f.name)
+ for d in ("data", "var"):
+ shutil.rmtree(d, ignore_errors=True)
+
+ # Start the service and wait for it to be ready for connections
+ service = subprocess.Popen(("mudpy", config),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ time.sleep(1)
+ return(service)
+
+
+def stop_service(service):
+ success = True
+
+ # The no-op case when no service was started
+ if service is None:
+ return(success)
+
+ # This handles when the service is running as a direct child process
+ service.terminate()
+ returncode = service.wait(10)
+ if returncode != 0:
+ tlog("\nERROR: Service exited with code %s." % returncode)
+ success = False
+
+ # This cleans up a daemonized and disassociated service
+ if os.path.exists(pidfile):
+ pidfd = open(pidfile)
+ pid = int(pidfd.read())
+ try:
+ # Stop the running service
+ os.kill(pid, 15)
+ time.sleep(1)
+ except ProcessLookupError:
+ # If there was no process, just remove the stale PID file
+ os.remove(pidfile)
+ # The PID file didn't disappear, so we have a hung service
+ if os.path.exists(pidfile):
+ tlog("\nERROR: Hung daemon with PID %s." % pid)
+ success = False
+
+ # Log the contents of stdout and stderr, if any
+ stdout, stderr = service.communicate()
+ tlog("\nRecording stdout as capture_stdout.log.")
+ serviceout = open("capture_stdout.log", "w")
+ serviceout.write(stdout.decode("utf-8"))
+ tlog("\nRecording stderr as capture_stderr.log.")
+ serviceerr = open("capture_stderr.log", "w")
+ serviceerr.write(stderr.decode("utf-8"))
+
+ return(success)
+
+
+def tlog(message, quiet=False):
+ logfile = "capture_tests.log"
+ with open(logfile, "a") as logfd:
+ logfd.write(message + "\n")
+ if not quiet:
+ sys.stdout.write(message)
+ return True
+
+
+def option_callback(telnet_socket, command, option):
+ if option == b'\x7f':
+ # We use this unassigned option value as a canary, so short-circuit
+ # any response to avoid endlessly looping
+ pass
+ elif command in (telnetlib.DO, telnetlib.DONT):
+ telnet_socket.send(b"%s%s%s" % (telnetlib.IAC, telnetlib.WONT, option))
+ elif command in (telnetlib.WILL, telnetlib.WONT):
+ telnet_socket.send(b"%s%s%s" % (telnetlib.IAC, telnetlib.DONT, option))
+
+