-# Copyright (c) 2004-2017 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2004-2018 Jeremy Stanley <fungi@yuggoth.org>. Permission
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
+import os
import re
import sys
import telnetlib
(0, r'says, "Now less chatty\."', ""),
)
+test_wrapping = (
+ (0, '> ', "say " + 100 * "o"),
+ (1, r'says,\r\n"O[o]+\."', ""),
+)
+
test_movement = (
(0, "> ", "move north"),
(0, r"You exit to the north\.", ""),
(2, "Whom would you like to awaken?", ""),
)
+test_telnet_iac = (
+ # Send a double (escaped) IAC byte within other text, which should get
+ # unescaped and deduplicated to a single \xff in the buffer and then
+ # the line of input discarded as a non-UTF-8 sequence
+ (2, "> ", b"say argle\xff\xffbargle\r\n"),
+ (2, r"Non-UTF-8 sequence from admin: b'say argle\\xffbargle'.*> ", ""),
+)
+
+test_telnet_unknown = (
+ # Send an unsupported negotiation command #127 which should get filtered
+ # from the line of input
+ (2, "> ", b"say glop\xff\x7fglyf\r\n"),
+ (2, r'Unknown Telnet IAC command 127 ignored\..*"Glopglyf\.".*> ', ""),
+)
+
test_admin_restriction = (
(0, "> ", "help halt"),
(0, r"That is not an available command\.", "halt"),
r".* User admin reloaded the world\.", ""),
)
+test_set_facet = (
+ (2, "> ", "set actor.avatar_admin_0 gender female"),
+ (2, r'You have successfully \(re\)set the "gender" facet of element', ""),
+)
+
+test_set_refused = (
+ (2, "> ", "set mudpy.limit password_tries 10"),
+ (2, r'The "mudpy\.limit" element is kept in read-only file', ""),
+)
+
+test_show_version = (
+ (2, "> ", "show version"),
+ (2, r"Running mudpy .* on .* Python 3.*with.*pyyaml.*> ", ""),
+)
+
+test_show_files = (
+ (2, "> ", "show files"),
+ (2, r'These are the current files containing the universe:.*'
+ r' \x1b\[31m\(rw\) \x1b\[32m/.*/account.yaml\x1b\[0m'
+ r' \x1b\[33m\[private\]\x1b\[0m.*> ', ""),
+)
+
+test_show_file = (
+ (2, "> ", "show file %s" %
+ os.path.join(os.getcwd(), "data/internal.yaml")),
+ (2, r'These are the nodes in the.*file:.*internal\.counters.*> ', ""),
+)
+
+test_show_groups = (
+ (2, "> ", "show groups"),
+ (2, r'These are the element groups:.*'
+ r' \x1b\[32maccount\x1b\[0m.*> ', ""),
+)
+
+test_show_group = (
+ (2, "> ", "show group account"),
+ (2, r'These are the elements in the "account" group:.*'
+ r' \x1b\[32maccount\.admin\x1b\[0m.*> ', ""),
+)
+
test_show_element = (
(2, "> ", "show element mudpy.limit"),
(2, r'These are the properties of the "mudpy\.limit" element.*'
- r' \x1b\[32mpassword_tries: \x1b\[31m[0-9]+.*> ',
- "show element actor:avatar:admin:0"),
- (2, r'These are the properties of the "actor:avatar:admin:0" element.*'
- r' \x1b\[32mgender: \x1b\[31mmale.*> ', ""),
+ r' \x1b\[32mpassword_tries: \x1b\[31m3.*> ',
+ "show element actor.avatar_admin_0"),
+ (2, r'These are the properties of the "actor.avatar_admin_0" element.*'
+ r' \x1b\[32mgender: \x1b\[31mfemale.*> ', ""),
)
test_show_log = (
)
test_custom_loglevel = (
- (2, "> ", "set account:admin loglevel 2"),
+ (2, "> ", "set account.admin loglevel 2"),
(2, "You have successfully .*> ", "show log"),
(2, r"There are [0-9]+ log lines in memory and [0-9]+ at or above level "
r"[0-9]+\. The matching lines\r\nfrom [0-9]+ to [0-9]+ are:", ""),
)
test_invalid_loglevel = (
- (2, "> ", "set account:admin loglevel two"),
+ (2, "> ", "set account.admin loglevel two"),
(2, r'''Value "two" of type "<class 'str'>" cannot be coerced .*> ''', ""),
)
(test_typo_replacement, "typo replacement"),
(test_sentence_capitalization, "sentence capitalization"),
(test_chat_mode, "chat mode"),
+ (test_wrapping, "wrapping"),
(test_movement, "movement"),
(test_actor_disappears, "actor spontaneous disappearance"),
(test_account1_teardown, "second account teardown"),
(test_admin_setup, "admin account setup"),
+ (test_telnet_iac, "escape stray telnet iac bytes"),
+ (test_telnet_unknown, "strip unknown telnet command"),
(test_admin_restriction, "restricted admin commands"),
(test_admin_help, "admin help"),
(test_reload, "reload"),
+ (test_set_facet, "set facet"),
+ (test_set_refused, "refuse altering read-only element"),
+ (test_show_version, "show version and diagnostic info"),
+ (test_show_files, "show a list of loaded files"),
+ (test_show_file, "show nodes from a specific file"),
+ (test_show_groups, "show groups"),
+ (test_show_group, "show group"),
(test_show_element, "show element"),
(test_show_log, "show log"),
(test_custom_loglevel, "custom loglevel"),
test_start = time.time()
for conversant, question, answer in test:
print("luser%s waiting for: %s" % (conversant, question))
- index, match, received = lusers[conversant].expect(
- [re.compile(question.encode("utf-8"), flags=re.DOTALL)], 5)
- captures[conversant] += received.decode("utf-8")
+ try:
+ index, match, received = lusers[conversant].expect(
+ [re.compile(question.encode("utf-8"), flags=re.DOTALL)], 5)
+ captures[conversant] += received.decode("utf-8")
+ except ConnectionResetError:
+ print("ERROR: Unable to connect to server.")
+ success = False
+ break
+ except EOFError:
+ print("ERROR: luser%s premature disconnection expecting:\n\n"
+ "%s\n\n"
+ "Check the end of capture_%s.log for received data."
+ % (conversant, question, conversant))
+ success = False
+ break
try:
captures[conversant] += lusers[
conversant].read_very_eager().decode("utf-8")
- except:
+ except Exception:
pass
if index is not 0:
print("ERROR: luser%s did not receive expected string:\n\n"
% (conversant, question, conversant))
success = False
break
- print("luser%s sending: %s" % (conversant, answer))
- lusers[conversant].write(("%s\r\n" % answer).encode("utf-8"))
- captures[conversant] += "%s\r\n" % answer
+ if type(answer) is str:
+ print("luser%s sending: %s" % (conversant, answer))
+ lusers[conversant].write(("%s\r\n" % answer).encode("utf-8"))
+ captures[conversant] += "%s\r\n" % answer
+ elif type(answer) is bytes:
+ print("luser%s sending raw bytes: %s" % (conversant, answer))
+ lusers[conversant].get_socket().send(answer)
+ captures[conversant] += "!!!RAW BYTES: %s" % answer
+ else:
+ print("ERROR: answer provided with unsupported type %s"
+ % type(answer))
+ success = False
+ break
if not success:
break
print("Completed in %.3f seconds." % (time.time() - test_start))
try:
captures[conversant] += lusers[
conversant].read_very_eager().decode("utf-8")
- except:
+ except Exception:
pass
lusers[conversant].close()
logfile = "capture_%s.log" % conversant