-# Copyright (c) 2004-2017 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2004-2018 Jeremy Stanley <fungi@yuggoth.org>. Permission
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
(0, r'says, "Now less chatty\."', ""),
)
+test_wrapping = (
+ (0, '> ', "say " + 100 * "o"),
+ (1, r'says,\r\n"O[o]+\."', ""),
+)
+
+test_forbid_ansi_input = (
+ (0, '> ', "say \x1b[35mfoo\x1b[0m"),
+ (1, r'says, "\[35mfoo\[0m\."', ""),
+)
+
test_movement = (
(0, "> ", "move north"),
(0, r"You exit to the north\.", ""),
(2, "Whom would you like to awaken?", ""),
)
+test_telnet_iac = (
+ # Send a double (escaped) IAC byte within other text, which should get
+ # unescaped and deduplicated to a single \xff in the buffer and then
+ # the line of input discarded as a non-ASCII sequence
+ (2, "> ", b"say argle\xff\xffbargle\r\n"),
+ (2, r"Non-ASCII characters from admin: b'say argle\\xffbargle'.*> ", ""),
+)
+
+test_telnet_unknown = (
+ # Send an unsupported negotiation command #127 which should get filtered
+ # from the line of input
+ (2, "> ", b"say glop\xff\x7fglyf\r\n"),
+ (2, r'Unknown Telnet IAC command 127 ignored\..*"Glopglyf\.".*> ', ""),
+)
+
test_admin_restriction = (
(0, "> ", "help halt"),
(0, r"That is not an available command\.", "halt"),
test_reload = (
(2, "> ", "reload"),
(2, r"Reloading all code modules, configs and data\."
- r".* User admin reloaded the world\.", ""),
+ r".* User admin reloaded the world\.",
+ "show element account.admin"),
+ (2, 'These are the properties of the "account.admin" element.*'
+ r' \x1b\[32mpasshash:\r\n\x1b\[31m\$.*> ', ""),
)
test_set_facet = (
(2, r'The "mudpy\.limit" element is kept in read-only file', ""),
)
+test_show_version = (
+ (2, "> ", "show version"),
+ (2, r"Running mudpy .* on .* Python 3.*with.*pyyaml.*> ", ""),
+)
+
test_show_files = (
(2, "> ", "show files"),
(2, r'These are the current files containing the universe:.*'
(2, r"None of the [0-9]+ lines in memory matches your request\.", ""),
)
+final_cleanup = (
+ (0, "> ", "quit"),
+ (0, "What would you like to do?", "d"),
+ (0, "Whom would you like to delete?", ""),
+ (0, "What would you like to do?", "p"),
+ (0, "permanently delete your account?", "y"),
+ (0, "Disconnecting...", ""),
+ (2, "> ", "quit"),
+ (2, "What would you like to do?", "d"),
+ (2, "Whom would you like to delete?", ""),
+ (2, "What would you like to do?", "p"),
+ (2, "permanently delete your account?", "y"),
+ (2, "Disconnecting...", ""),
+)
+
dialogue = (
(test_account0_setup, "first account setup"),
(test_account1_setup, "second account setup"),
(test_typo_replacement, "typo replacement"),
(test_sentence_capitalization, "sentence capitalization"),
(test_chat_mode, "chat mode"),
+ (test_wrapping, "wrapping"),
+ (test_forbid_ansi_input, "raw escape input is filtered"),
(test_movement, "movement"),
(test_actor_disappears, "actor spontaneous disappearance"),
(test_account1_teardown, "second account teardown"),
(test_admin_setup, "admin account setup"),
+ (test_telnet_iac, "escape stray telnet iac bytes"),
+ (test_telnet_unknown, "strip unknown telnet command"),
(test_admin_restriction, "restricted admin commands"),
(test_admin_help, "admin help"),
(test_reload, "reload"),
(test_set_facet, "set facet"),
(test_set_refused, "refuse altering read-only element"),
+ (test_show_version, "show version and diagnostic info"),
(test_show_files, "show a list of loaded files"),
(test_show_file, "show nodes from a specific file"),
(test_show_groups, "show groups"),
(test_custom_loglevel, "custom loglevel"),
(test_invalid_loglevel, "invalid loglevel"),
(test_log_no_errors, "no errors logged"),
+ (final_cleanup, "delete remaining accounts"),
)
% (conversant, question, conversant))
success = False
break
- print("luser%s sending: %s" % (conversant, answer))
- lusers[conversant].write(("%s\r\n" % answer).encode("utf-8"))
- captures[conversant] += "%s\r\n" % answer
+ if type(answer) is str:
+ print("luser%s sending: %s" % (conversant, answer))
+ lusers[conversant].write(("%s\r\n" % answer).encode("utf-8"))
+ captures[conversant] += "%s\r\n" % answer
+ elif type(answer) is bytes:
+ print("luser%s sending raw bytes: %s" % (conversant, answer))
+ lusers[conversant].get_socket().send(answer)
+ captures[conversant] += "!!!RAW BYTES: %s" % answer
+ else:
+ print("ERROR: answer provided with unsupported type %s"
+ % type(answer))
+ success = False
+ break
if not success:
break
print("Completed in %.3f seconds." % (time.time() - test_start))