-# Copyright (c) 2004-2017 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2004-2018 Jeremy Stanley <fungi@yuggoth.org>. Permission
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
import os
+import pathlib
import re
+import shutil
+import subprocess
import sys
import telnetlib
import time
+pidfile = "var/mudpy.pid"
+
test_account0_setup = (
(0, "Identify yourself:", "luser0"),
(0, "Enter your choice:", "n"),
(0, r'says, "Now less chatty\."', ""),
)
+test_wrapping = (
+ (0, '> ', "say " + 100 * "o"),
+ (1, r'says,\r\n"O[o]+\."', ""),
+)
+
+test_forbid_ansi_input = (
+ (0, '> ', "say \x1b[35mfoo\x1b[0m"),
+ (1, r'says, "\[35mfoo\[0m\."', ""),
+)
+
test_movement = (
(0, "> ", "move north"),
(0, r"You exit to the north\.", ""),
(2, "Whom would you like to awaken?", ""),
)
+test_crlf_eol = (
+ # Send a CR+LF at the end of the line instead of the default CR+NUL,
+ # to make sure they're treated the same
+ (2, "> ", b"say I use CR+LF as my EOL, not CR+NUL.\r\n"),
+ (2, r'You say, "I use CR\+LF as my EOL, not CR\+NUL\.".*> ', ""),
+)
+
+test_telnet_iac = (
+ # Send a double (escaped) IAC byte within other text, which should get
+ # unescaped and deduplicated to a single \xff in the buffer and then
+ # the line of input discarded as a non-ASCII sequence
+ (2, "> ", b"say argle\xff\xffbargle\r\0"),
+ (2, r"Non-ASCII characters from admin: b'say argle\\xffbargle'.*> ", ""),
+)
+
+test_telnet_unknown = (
+ # Send an unsupported negotiation command #127 which should get filtered
+ # from the line of input
+ (2, "> ", b"say glop\xff\x7fglyf\r\0"),
+ (2, r'Ignored unknown command 127 from admin\..*"Glopglyf\.".*> ', ""),
+)
+
test_admin_restriction = (
(0, "> ", "help halt"),
(0, r"That is not an available command\.", "halt"),
test_reload = (
(2, "> ", "reload"),
(2, r"Reloading all code modules, configs and data\."
- r".* User admin reloaded the world\.", ""),
+ r".* User admin reloaded the world\.",
+ "show element account.admin"),
+ (2, 'These are the properties of the "account.admin" element.*'
+ r' \x1b\[32mpasshash:\r\n\x1b\[31m\$.*> ', ""),
)
test_set_facet = (
(2, r'The "mudpy\.limit" element is kept in read-only file', ""),
)
+test_show_version = (
+ (2, "> ", "show version"),
+ (2, r"Running mudpy .* on .* Python 3.*with.*pyyaml.*> ", ""),
+)
+
test_show_files = (
(2, "> ", "show files"),
(2, r'These are the current files containing the universe:.*'
test_show_file = (
(2, "> ", "show file %s" %
os.path.join(os.getcwd(), "data/internal.yaml")),
- (2, "These are the nodes in the.*file:.*internal:counters.*> ", ""),
+ (2, r'These are the nodes in the.*file:.*internal\.counters.*> ', ""),
+)
+
+test_show_groups = (
+ (2, "> ", "show groups"),
+ (2, r'These are the element groups:.*'
+ r' \x1b\[32maccount\x1b\[0m.*> ', ""),
+)
+
+test_show_group = (
+ (2, "> ", "show group account"),
+ (2, r'These are the elements in the "account" group:.*'
+ r' \x1b\[32maccount\.admin\x1b\[0m.*> ', ""),
)
test_show_element = (
(2, r"None of the [0-9]+ lines in memory matches your request\.", ""),
)
+final_cleanup = (
+ (0, "> ", "quit"),
+ (0, "What would you like to do?", "d"),
+ (0, "Whom would you like to delete?", ""),
+ (0, "What would you like to do?", "p"),
+ (0, "permanently delete your account?", "y"),
+ (0, "Disconnecting...", ""),
+ (2, "> ", "quit"),
+ (2, "What would you like to do?", "d"),
+ (2, "Whom would you like to delete?", ""),
+ (2, "What would you like to do?", "p"),
+ (2, "permanently delete your account?", "y"),
+ (2, "Disconnecting...", ""),
+)
+
dialogue = (
(test_account0_setup, "first account setup"),
(test_account1_setup, "second account setup"),
(test_actor_appears, "actor spontaneous appearance"),
- (test_explicit_punctuation, " explicit punctuation"),
+ (test_explicit_punctuation, "explicit punctuation"),
(test_implicit_punctuation, "implicit punctuation"),
(test_typo_replacement, "typo replacement"),
(test_sentence_capitalization, "sentence capitalization"),
(test_chat_mode, "chat mode"),
+ (test_wrapping, "wrapping"),
+ (test_forbid_ansi_input, "raw escape input is filtered"),
(test_movement, "movement"),
(test_actor_disappears, "actor spontaneous disappearance"),
(test_account1_teardown, "second account teardown"),
(test_admin_setup, "admin account setup"),
+ (test_crlf_eol, "send crlf from the client as eol"),
+ (test_telnet_iac, "escape stray telnet iac bytes"),
+ (test_telnet_unknown, "strip unknown telnet command"),
(test_admin_restriction, "restricted admin commands"),
(test_admin_help, "admin help"),
(test_reload, "reload"),
(test_set_facet, "set facet"),
(test_set_refused, "refuse altering read-only element"),
+ (test_show_version, "show version and diagnostic info"),
(test_show_files, "show a list of loaded files"),
(test_show_file, "show nodes from a specific file"),
+ (test_show_groups, "show groups"),
+ (test_show_group, "show group"),
(test_show_element, "show element"),
(test_show_log, "show log"),
(test_custom_loglevel, "custom loglevel"),
(test_invalid_loglevel, "invalid loglevel"),
(test_log_no_errors, "no errors logged"),
+ (final_cleanup, "delete remaining accounts"),
)
+def start_service(config):
+ # Clean up any previously run daemon which didn't terminate
+ if os.path.exists(pidfile):
+ pidfd = open(pidfile)
+ pid = int(pidfd.read())
+ try:
+ # Stop the running service
+ os.kill(pid, 15)
+ time.sleep(1)
+ except ProcessLookupError:
+ # If there was no process, just remove the stale PID file
+ os.remove(pidfile)
+ # If there's a preexisting hung service, we can't proceed
+ assert not os.path.exists(pidfile)
+
+ # Clean up any previous test output
+ for f in pathlib.Path(".").glob("capture_*.log"):
+ # have to use .name here since remove() doesn't support passing a
+ # PosixPath argument until Python3.6
+ os.remove(f.name)
+ for d in ("data", "var"):
+ shutil.rmtree(d, ignore_errors=True)
+ os.mkdir("var")
+
+ # Start the service and wait for it to be ready for connections
+ service = subprocess.Popen(("mudpy", config),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ time.sleep(1)
+ return(service)
+
+
+def stop_service(service):
+ success = True
+
+ # The no-op case when no service was started
+ if service is None:
+ return(success)
+
+ # This handles when the service is running as a direct child process
+ service.terminate()
+ returncode = service.wait(10)
+ if returncode != 0:
+ tlog("\nERROR: Service exited with code %s." % returncode)
+ success = False
+
+ # This cleans up a daemonized and disassociated service
+ if os.path.exists(pidfile):
+ pidfd = open(pidfile)
+ pid = int(pidfd.read())
+ try:
+ # Stop the running service
+ os.kill(pid, 15)
+ time.sleep(1)
+ except ProcessLookupError:
+ # If there was no process, just remove the stale PID file
+ os.remove(pidfile)
+ # The PID file didn't disappear, so we have a hung service
+ if os.path.exists(pidfile):
+ tlog("\nERROR: Hung daemon with PID %s." % pid)
+ success = False
+
+ # Log the contents of stdout and stderr, if any
+ stdout, stderr = service.communicate()
+ tlog("\nRecording stdout as capture_stdout.log.")
+ serviceout = open("capture_stdout.log", "w")
+ serviceout.write(stdout.decode("utf-8"))
+ tlog("\nRecording stderr as capture_stderr.log.")
+ serviceerr = open("capture_stderr.log", "w")
+ serviceerr.write(stderr.decode("utf-8"))
+
+ return(success)
+
+
+def tlog(message, quiet=False):
+ logfile = "capture_tests.log"
+ with open(logfile, "a") as logfd:
+ logfd.write(message + "\n")
+ if not quiet:
+ sys.stdout.write(message)
+ return True
+
+
def main():
captures = ["", "", ""]
lusers = [telnetlib.Telnet(), telnetlib.Telnet(), telnetlib.Telnet()]
success = True
start = time.time()
+ service = None
+ if len(sys.argv) > 1:
+ # Start the service if a config file was provided on the command line
+ service = start_service(sys.argv[1])
for luser in lusers:
luser.open("::1", 4000)
for test, description in dialogue:
- print("\nTesting %s..." % description)
+ tlog("\nTesting %s..." % description)
test_start = time.time()
for conversant, question, answer in test:
- print("luser%s waiting for: %s" % (conversant, question))
+ tlog("luser%s waiting for: %s" % (conversant, question),
+ quiet=True)
try:
index, match, received = lusers[conversant].expect(
[re.compile(question.encode("utf-8"), flags=re.DOTALL)], 5)
captures[conversant] += received.decode("utf-8")
except ConnectionResetError:
- print("ERROR: Unable to connect to server.")
+ tlog("\nERROR: Unable to connect to server.")
success = False
break
except EOFError:
- print("ERROR: luser%s premature disconnection expecting:\n\n"
- "%s\n\n"
- "Check the end of capture_%s.log for received data."
- % (conversant, question, conversant))
+ tlog("\nERROR: luser%s premature disconnection expecting:\n\n"
+ "%s\n\n"
+ "Check the end of capture_%s.log for received data."
+ % (conversant, question, conversant))
success = False
break
try:
except Exception:
pass
if index is not 0:
- print("ERROR: luser%s did not receive expected string:\n\n"
- "%s\n\n"
- "Check the end of capture_%s.log for received data."
- % (conversant, question, conversant))
+ tlog("\nERROR: luser%s did not receive expected string:\n\n"
+ "%s\n\n"
+ "Check the end of capture_%s.log for received data."
+ % (conversant, question, conversant))
+ success = False
+ break
+ if type(answer) is str:
+ tlog("luser%s sending: %s" % (conversant, answer), quiet=True)
+ lusers[conversant].write(("%s\r\0" % answer).encode("utf-8"))
+ captures[conversant] += "%s\r\n" % answer
+ elif type(answer) is bytes:
+ tlog("luser%s sending raw bytes: %s" % (conversant, answer),
+ quiet=True)
+ lusers[conversant].get_socket().send(answer)
+ captures[conversant] += "!!!RAW BYTES: %s" % answer
+ else:
+ tlog("\nERROR: answer provided with unsupported type %s"
+ % type(answer))
success = False
break
- print("luser%s sending: %s" % (conversant, answer))
- lusers[conversant].write(("%s\r\n" % answer).encode("utf-8"))
- captures[conversant] += "%s\r\n" % answer
if not success:
break
- print("Completed in %.3f seconds." % (time.time() - test_start))
+ tlog("Completed in %.3f seconds." % (time.time() - test_start))
duration = time.time() - start
- print("")
for conversant in range(len(captures)):
try:
captures[conversant] += lusers[
pass
lusers[conversant].close()
logfile = "capture_%s.log" % conversant
- print("Recording session %s as %s." % (conversant, logfile))
+ tlog("\nRecording session %s as %s." % (conversant, logfile))
log = open(logfile, "w")
log.write(captures[conversant])
log.close()
- print("\nRan %s tests in %.3f seconds." % (len(dialogue), duration))
+ if not stop_service(service):
+ success = False
+ tlog("\nRan %s tests in %.3f seconds." % (len(dialogue), duration))
if success:
- print("SUCCESS")
+ tlog("\nSUCCESS\n")
else:
- print("FAILURE")
+ tlog("\nFAILURE\n")
sys.exit(1)