-dialogue = (
- (test_account0_setup, "first account setup"),
- (test_account1_setup, "second account setup"),
- (test_actor_appears, "actor spontaneous appearance"),
- (test_explicit_punctuation, " explicit punctuation"),
- (test_implicit_punctuation, "implicit punctuation"),
- (test_typo_replacement, "typo replacement"),
- (test_sentence_capitalization, "sentence capitalization"),
- (test_chat_mode, "chat mode"),
- (test_wrapping, "wrapping"),
- (test_forbid_ansi_input, "raw escape input is filtered"),
- (test_movement, "movement"),
- (test_actor_disappears, "actor spontaneous disappearance"),
- (test_account1_teardown, "second account teardown"),
- (test_admin_setup, "admin account setup"),
- (test_telnet_iac, "escape stray telnet iac bytes"),
- (test_telnet_unknown, "strip unknown telnet command"),
- (test_admin_restriction, "restricted admin commands"),
- (test_admin_help, "admin help"),
- (test_reload, "reload"),
- (test_set_facet, "set facet"),
- (test_set_refused, "refuse altering read-only element"),
- (test_show_version, "show version and diagnostic info"),
- (test_show_files, "show a list of loaded files"),
- (test_show_file, "show nodes from a specific file"),
- (test_show_groups, "show groups"),
- (test_show_group, "show group"),
- (test_show_element, "show element"),
- (test_show_log, "show log"),
- (test_custom_loglevel, "custom loglevel"),
- (test_invalid_loglevel, "invalid loglevel"),
- (test_log_no_errors, "no errors logged"),
-)
+final_cleanup = (
+ (0, "> ", "quit"),
+ (0, r"What would you like to do\?", "p"),
+ (0, r"permanently delete your account\?", "y"),
+ (0, r"Disconnecting\.\.\.", ""),
+ (2, "> ", "quit"),
+ (2, r"What would you like to do\?", "p"),
+ (2, r"permanently delete your account\?", "y"),
+ (2, r"Disconnecting\.\.\.", ""),
+)
+
+dialogue = {
+ test_account0_setup: "first account setup",
+ test_account1_setup: "second account setup",
+ test_actor_appears: "actor spontaneous appearance",
+ test_explicit_punctuation: "explicit punctuation",
+ test_implicit_punctuation: "implicit punctuation",
+ test_typo_replacement: "typo replacement",
+ test_sentence_capitalization: "sentence capitalization",
+ test_chat_mode: "chat mode",
+ test_wrapping: "wrapping",
+ test_forbid_ansi_input: "raw escape input is filtered",
+ test_escape_macros: "replacement macros are escaped",
+ test_movement: "movement",
+ test_actor_disappears: "actor spontaneous disappearance",
+ test_abort_avatar_deletion: "abort avatar deletion",
+ test_avatar_creation_limit: "avatar creation limit",
+ test_avatar_deletion: "avatar deletion",
+ test_abort_account_deletion: "abort account deletion",
+ test_account_deletion: "account deletion",
+ test_admin_setup: "admin account setup",
+ test_preferences: "set and show preferences",
+ test_crlf_eol: "send crlf from the client as eol",
+ test_telnet_iac: "escape stray telnet iac bytes",
+ test_telnet_unknown_command: "strip unknown telnet command",
+ test_telnet_unknown_option: "log unknown telnet option",
+ test_admin_restriction: "restricted admin commands",
+ test_admin_help: "admin help",
+ test_help: "help command",
+ test_abbrev: "command abbreviation",
+ test_reload: "reload",
+ test_set_facet: "set facet",
+ test_set_refused: "refuse altering read-only element",
+ test_show_version: "show version and diagnostic info",
+ test_show_time: "show elapsed world clock increments",
+ test_show_files: "show a list of loaded files",
+ test_show_file: "show nodes from a specific file",
+ test_show_groups: "show groups",
+ test_show_group: "show group",
+ test_show_element: "show element",
+ test_evaluate: "show results of python expressions",
+ test_debug_restricted: "only admins can run debug commands",
+ test_debug_disabled: "debugging commands only in debug mode",
+ test_show_log: "show log",
+ test_custom_loglevel: "custom loglevel",
+ test_invalid_loglevel: "invalid loglevel",
+ test_log_no_errors: "no errors logged",
+ final_cleanup: "delete remaining accounts",
+}
+
+debug_tests = (
+ test_evaluate,
+)
+
+nondebug_tests = (
+ test_debug_disabled,
+)
+
+
+def start_service(config):
+ # Clean up any previously run daemon which didn't terminate
+ if os.path.exists(pidfile):
+ with open(pidfile) as pidfd:
+ pid = int(pidfd.read())
+ try:
+ # Stop the running service
+ os.kill(pid, 15)
+ except ProcessLookupError:
+ # If there was no process, just remove the stale PID file
+ os.remove(pidfile)
+ # If there's a preexisting hung service, we can't proceed
+ assert not os.path.exists(pidfile)
+
+ # Clean up any previous test output
+ for f in pathlib.Path(".").glob("capture_*.log"):
+ os.remove(f)
+ for d in ("data", "var"):
+ shutil.rmtree(d, ignore_errors=True)
+
+ # Start the service and wait for it to be ready for connections
+ service = subprocess.Popen(("mudpy", config),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ return service
+
+
+def stop_service(service):
+ success = True
+
+ # The no-op case when no service was started
+ if service is None:
+ return success
+
+ # This handles when the service is running as a direct child process
+ service.terminate()
+ returncode = service.wait(10)
+ if returncode != 0:
+ tlog("\nERROR: Service exited with code %s." % returncode)
+ success = False
+
+ # This cleans up a daemonized and disassociated service
+ if os.path.exists(pidfile):
+ with open(pidfile) as pidfd:
+ pid = int(pidfd.read())
+ try:
+ # Stop the running service
+ os.kill(pid, 15)
+ time.sleep(1)
+ except ProcessLookupError:
+ # If there was no process, just remove the stale PID file
+ os.remove(pidfile)
+ # The PID file didn't disappear, so we have a hung service
+ if os.path.exists(pidfile):
+ tlog("\nERROR: Hung daemon with PID %s." % pid)
+ success = False
+
+ # Log the contents of stdout and stderr, if any
+ stdout, stderr = service.communicate()
+ tlog("\nRecording stdout as capture_stdout.log.")
+ with open("capture_stdout.log", "w") as serviceout:
+ serviceout.write(stdout.decode("utf-8"))
+ tlog("\nRecording stderr as capture_stderr.log.")
+ with open("capture_stderr.log", "w") as serviceerr:
+ serviceerr.write(stderr.decode("utf-8"))
+
+ return success
+
+
+def tlog(message, quiet=False):
+ logfile = "capture_tests.log"
+ with open(logfile, "a") as logfd:
+ logfd.write(message + "\n")
+ if not quiet:
+ sys.stdout.write(message)
+ return True
+
+
+def option_callback(telnet_socket, command, option):
+ if option == b'\x7f':
+ # We use this unassigned option value as a canary, so short-circuit
+ # any response to avoid endlessly looping
+ pass
+ elif command in (telnetlib.DO, telnetlib.DONT):
+ telnet_socket.send(telnetlib.IAC + telnetlib.WONT + option)
+ elif command in (telnetlib.WILL, telnetlib.WONT):
+ telnet_socket.send(telnetlib.IAC + telnetlib.DONT + option)
+
+
+def check_debug():
+ if len(sys.argv) > 1:
+ with open(sys.argv[1]) as config_fd:
+ config = yaml.safe_load(config_fd)
+ return config.get(".mudpy.limit.debug", False)
+ return False
+
+
+def connect_client(luser, service):
+ # Try multiple times to connect, with an exponential backoff
+ for retry in range(5):
+ try:
+ # Skipping the retry=0 case gives an immediate first attempt
+ if retry:
+ time.sleep((2 ** retry) / 10)
+ luser.open("::1", 4000)
+ # Attempt to poll the connection, closing if unusable
+ try:
+ luser.fill_rawq()
+ except ConnectionResetError:
+ luser.close()
+ continue
+ # Short-circuit if we get this far, connection is safe to use
+ return luser
+ except ConnectionRefusedError:
+ continue
+ else:
+ # Connection retries have been exhausted, so give up
+ tlog("\nERROR: Client could not connect.\n")
+ stop_service(service)
+ sys.exit(1)