+final_cleanup = (
+ (0, "> ", "quit"),
+ (0, r"What would you like to do\?", "p"),
+ (0, r"permanently delete your account\?", "y"),
+ (0, r"Disconnecting\.\.\.", ""),
+ (2, "> ", "quit"),
+ (2, r"What would you like to do\?", "p"),
+ (2, r"permanently delete your account\?", "y"),
+ (2, r"Disconnecting\.\.\.", ""),
+)
+
+dialogue = {
+ test_account0_setup: "first account setup",
+ test_account1_setup: "second account setup",
+ test_actor_appears: "actor spontaneous appearance",
+ test_explicit_punctuation: "explicit punctuation",
+ test_implicit_punctuation: "implicit punctuation",
+ test_typo_replacement: "typo replacement",
+ test_sentence_capitalization: "sentence capitalization",
+ test_chat_mode: "chat mode",
+ test_wrapping: "wrapping",
+ test_forbid_ansi_input: "raw escape input is filtered",
+ test_escape_macros: "replacement macros are escaped",
+ test_movement: "movement",
+ test_actor_disappears: "actor spontaneous disappearance",
+ test_abort_avatar_deletion: "abort avatar deletion",
+ test_avatar_creation_limit: "avatar creation limit",
+ test_avatar_deletion: "avatar deletion",
+ test_abort_account_deletion: "abort account deletion",
+ test_account_deletion: "account deletion",
+ test_admin_setup: "admin account setup",
+ test_preferences: "set and show preferences",
+ test_crlf_eol: "send crlf from the client as eol",
+ test_telnet_iac: "escape stray telnet iac bytes",
+ test_telnet_unknown_command: "strip unknown telnet command",
+ test_telnet_unknown_option: "log unknown telnet option",
+ test_admin_restriction: "restricted admin commands",
+ test_admin_help: "admin help",
+ test_help: "help command",
+ test_abbrev: "command abbreviation",
+ test_reload: "reload",
+ test_set_facet: "set facet",
+ test_set_refused: "refuse altering read-only element",
+ test_show_version: "show version and diagnostic info",
+ test_show_time: "show elapsed world clock increments",
+ test_show_files: "show a list of loaded files",
+ test_show_file: "show nodes from a specific file",
+ test_show_groups: "show groups",
+ test_show_group: "show group",
+ test_show_element: "show element",
+ test_evaluate: "show results of python expressions",
+ test_debug_restricted: "only admins can run debug commands",
+ test_debug_disabled: "debugging commands only in debug mode",
+ test_show_log: "show log",
+ test_custom_loglevel: "custom loglevel",
+ test_invalid_loglevel: "invalid loglevel",
+ test_log_no_errors: "no errors logged",
+ final_cleanup: "delete remaining accounts",
+}
+
+debug_tests = (
+ test_evaluate,
+)
+
+nondebug_tests = (
+ test_debug_disabled,
+)
+
+
+def start_service(config):
+ # Clean up any previously run daemon which didn't terminate
+ if os.path.exists(pidfile):
+ with open(pidfile) as pidfd:
+ pid = int(pidfd.read())
+ try:
+ # Stop the running service
+ os.kill(pid, 15)
+ except ProcessLookupError:
+ # If there was no process, just remove the stale PID file
+ os.remove(pidfile)
+ # If there's a preexisting hung service, we can't proceed
+ assert not os.path.exists(pidfile)
+
+ # Clean up any previous test output
+ for f in pathlib.Path(".").glob("capture_*.log"):
+ os.remove(f)
+ for d in ("data", "var"):
+ shutil.rmtree(d, ignore_errors=True)
+
+ # Start the service and wait for it to be ready for connections
+ service = subprocess.Popen(("mudpy", config),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ return service
+
+
+def stop_service(service):
+ success = True
+
+ # The no-op case when no service was started
+ if service is None:
+ return success
+
+ # This handles when the service is running as a direct child process
+ service.terminate()
+ returncode = service.wait(10)
+ if returncode != 0:
+ tlog("\nERROR: Service exited with code %s." % returncode)
+ success = False
+
+ # This cleans up a daemonized and disassociated service
+ if os.path.exists(pidfile):
+ with open(pidfile) as pidfd:
+ pid = int(pidfd.read())
+ try:
+ # Stop the running service
+ os.kill(pid, 15)
+ time.sleep(1)
+ except ProcessLookupError:
+ # If there was no process, just remove the stale PID file
+ os.remove(pidfile)
+ # The PID file didn't disappear, so we have a hung service
+ if os.path.exists(pidfile):
+ tlog("\nERROR: Hung daemon with PID %s." % pid)
+ success = False
+
+ # Log the contents of stdout and stderr, if any
+ stdout, stderr = service.communicate()
+ tlog("\nRecording stdout as capture_stdout.log.")
+ with open("capture_stdout.log", "w") as serviceout:
+ serviceout.write(stdout.decode("utf-8"))
+ tlog("\nRecording stderr as capture_stderr.log.")
+ with open("capture_stderr.log", "w") as serviceerr:
+ serviceerr.write(stderr.decode("utf-8"))
+
+ return success
+
+
+def tlog(message, quiet=False):
+ logfile = "capture_tests.log"
+ with open(logfile, "a") as logfd:
+ logfd.write(message + "\n")
+ if not quiet:
+ sys.stdout.write(message)
+ return True
+
+
+def option_callback(telnet_socket, command, option):
+ if option == b'\x7f':
+ # We use this unassigned option value as a canary, so short-circuit
+ # any response to avoid endlessly looping
+ pass
+ elif command in (telnetlib.DO, telnetlib.DONT):
+ telnet_socket.send(telnetlib.IAC + telnetlib.WONT + option)
+ elif command in (telnetlib.WILL, telnetlib.WONT):
+ telnet_socket.send(telnetlib.IAC + telnetlib.DONT + option)
+
+
+def check_debug():
+ if len(sys.argv) > 1:
+ with open(sys.argv[1]) as config_fd:
+ config = yaml.safe_load(config_fd)
+ return config.get(".mudpy.limit.debug", False)
+ return False
+
+
+def connect_client(luser, service):
+ # Try multiple times to connect, with an exponential backoff
+ for retry in range(5):
+ try:
+ # Skipping the retry=0 case gives an immediate first attempt
+ if retry:
+ time.sleep((2 ** retry) / 10)
+ luser.open("::1", 4000)
+ # Attempt to poll the connection, closing if unusable
+ try:
+ luser.fill_rawq()
+ except ConnectionResetError:
+ luser.close()
+ continue
+ # Short-circuit if we get this far, connection is safe to use
+ return luser
+ except ConnectionRefusedError:
+ continue
+ else:
+ # Connection retries have been exhausted, so give up
+ tlog("\nERROR: Client could not connect.\n")
+ stop_service(service)
+ sys.exit(1)
+