-# Copyright (c) 2004-2019 mudpy authors. Permission to use, copy,
+# Copyright (c) 2004-2020 mudpy authors. Permission to use, copy,
# modify, and distribute this software is granted under terms
# provided in the LICENSE file distributed with this software.
import telnetlib
import time
+import yaml
+
+# TODO(fungi) Clean this up once Python 3.5 is no longer supported
+if sys.version < "3.6":
+ import collections
+ odict = collections.OrderedDict
+else:
+ odict = dict
+
pidfile = "var/mudpy.pid"
test_account0_setup = (
r' \x1b\[32mgender: \x1b\[31mfemale.*> ', ""),
)
-test_show_result = (
- (2, "> ", "show result 12345*67890"),
- (2, r"\r\n838102050\r\n.*> ", "show result 1/0"),
- (2, r"Your expression raised an exception.*division by zero.*> ",
- "show result mudpy"),
- (2, r"<module 'mudpy' from .*> ", "show result re"),
- (2, r"Your expression raised an exception.*name 're' is not defined.*> ",
- "show result universe"),
- (2, r"<mudpy\.misc\.Universe object at 0x.*> ", "show result actor"),
- (2, r"Your expression raised an exception.*name 'actor' is not "
- r"defined.*> ", ""),
+test_evaluate = (
+ (2, "> ", "evaluate 12345*67890"),
+ (2, r"\r\n838102050\r\n.*> ", "evaluate 1/0"),
+ (2, "Your expression raised an exception.*division by zero.*> ",
+ "evaluate mudpy"),
+ (2, "<module 'mudpy' from.*> ", "evaluate re"),
+ (2, "Your expression raised an exception.*name 're' is not defined.*> ",
+ "evaluate universe"),
+ (2, r"<mudpy\.misc\.Universe object at 0x.*> ", "evaluate actor"),
+ (2, "Your expression raised an exception.*name 'actor' is not defined.*> ",
+ "evaluate dir(mudpy)"),
+ (2, "__builtins__.*> ", "evaluate mudpy.__builtins__.open"),
+ (2, "not allowed.*> ", "evaluate (lambda x: x + 1)(2)"),
+ (2, "not allowed.*> ", ""),
+)
+
+test_debug_restricted = (
+ (0, "> ", "help evaluate"),
+ (0, r"That is not an available command\.", "evaluate"),
+ (0, '(not sure what "evaluate" means|Arglebargle, glop-glyf)', ""),
+)
+
+test_debug_disabled = (
+ (2, "> ", "help evaluate"),
+ (2, r"That is not an available command\.", "evaluate"),
+ (2, '(not sure what "evaluate" means|Arglebargle, glop-glyf)', ""),
)
test_show_log = (
(2, "> ", "show log"),
(2, r"There are [0-9]+ log lines in memory and [0-9]+ at or above level "
- r"[0-9]+\. The matching lines\r\nfrom [0-9]+ to [0-9]+ are:", ""),
+ r"[0-9]+\. The matching.*from [0-9]+ to [0-9]+ are:", ""),
)
test_custom_loglevel = (
(2, "> ", "set account.admin loglevel 2"),
(2, "You have successfully .*> ", "show log"),
(2, r"There are [0-9]+ log lines in memory and [0-9]+ at or above level "
- r"[0-9]+\. The matching lines\r\nfrom [0-9]+ to [0-9]+ are:", ""),
+ r"[0-9]+\. The matching.*from [0-9]+ to [0-9]+ are:", ""),
)
test_invalid_loglevel = (
(2, r"Disconnecting\.\.\.", ""),
)
-dialogue = (
+dialogue = odict((
(test_account0_setup, "first account setup"),
(test_account1_setup, "second account setup"),
(test_actor_appears, "actor spontaneous appearance"),
(test_show_groups, "show groups"),
(test_show_group, "show group"),
(test_show_element, "show element"),
- (test_show_result, "show result of a python expression"),
+ (test_evaluate, "show results of python expressions"),
+ (test_debug_restricted, "only admins can run debug commands"),
+ (test_debug_disabled, "debugging commands only in debug mode"),
(test_show_log, "show log"),
(test_custom_loglevel, "custom loglevel"),
(test_invalid_loglevel, "invalid loglevel"),
(test_log_no_errors, "no errors logged"),
(final_cleanup, "delete remaining accounts"),
+))
+
+debug_tests = (
+ test_evaluate,
+)
+
+nondebug_tests = (
+ test_debug_disabled,
)
def start_service(config):
# Clean up any previously run daemon which didn't terminate
if os.path.exists(pidfile):
- pidfd = open(pidfile)
- pid = int(pidfd.read())
+ with open(pidfile) as pidfd:
+ pid = int(pidfd.read())
try:
# Stop the running service
os.kill(pid, 15)
# This cleans up a daemonized and disassociated service
if os.path.exists(pidfile):
- pidfd = open(pidfile)
- pid = int(pidfd.read())
+ with open(pidfile) as pidfd:
+ pid = int(pidfd.read())
try:
# Stop the running service
os.kill(pid, 15)
# Log the contents of stdout and stderr, if any
stdout, stderr = service.communicate()
tlog("\nRecording stdout as capture_stdout.log.")
- serviceout = open("capture_stdout.log", "w")
- serviceout.write(stdout.decode("utf-8"))
+ with open("capture_stdout.log", "w") as serviceout:
+ serviceout.write(stdout.decode("utf-8"))
tlog("\nRecording stderr as capture_stderr.log.")
- serviceerr = open("capture_stderr.log", "w")
- serviceerr.write(stderr.decode("utf-8"))
+ with open("capture_stderr.log", "w") as serviceerr:
+ serviceerr.write(stderr.decode("utf-8"))
return(success)
telnet_socket.send(telnetlib.IAC + telnetlib.DONT + option)
+def check_debug():
+ if len(sys.argv) > 1:
+ config = yaml.safe_load(open(sys.argv[1]))
+ return config.get(".mudpy.limit.debug", False)
+ return False
+
+
def main():
captures = ["", "", ""]
lusers = [telnetlib.Telnet(), telnetlib.Telnet(), telnetlib.Telnet()]
if len(sys.argv) > 1:
# Start the service if a config file was provided on the command line
service = start_service(sys.argv[1])
+ if not service:
+ tlog("\nERROR: Service did not start.\n")
+ sys.exit(1)
for luser in lusers:
luser.open("::1", 4000)
luser.set_option_negotiation_callback(option_callback)
- for test, description in dialogue:
+ selected_dialogue = odict(dialogue)
+ if check_debug():
+ for test in nondebug_tests:
+ del selected_dialogue[test]
+ else:
+ for test in debug_tests:
+ del selected_dialogue[test]
+ for test, description in selected_dialogue.items():
tlog("\nTesting %s..." % description)
test_start = time.time()
for conversant, question, answer in test: