"""Miscellaneous functions for the mudpy engine."""
-# Copyright (c) 2004-2017 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2004-2018 Jeremy Stanley <fungi@yuggoth.org>. Permission
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
try:
line = line.decode("utf-8")
except UnicodeDecodeError:
- logline = "Non-UTF-8 characters from "
+ logline = "Non-UTF-8 sequence from "
if self.account and self.account.get("name"):
logline += self.account.get("name") + ": "
else:
"""Telnet functions and constants for the mudpy engine."""
-# Copyright (c) 2004-2017 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2004-2018 Jeremy Stanley <fungi@yuggoth.org>. Permission
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
# replace a double (literal) IAC if there's an LF later
if command is IAC:
- if text.find("\n", position) > 0:
+ if text.find(b"\n", position) > 0:
position += 1
text = text[:position] + text[position + 1:]
else:
-# Copyright (c) 2004-2017 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2004-2018 Jeremy Stanley <fungi@yuggoth.org>. Permission
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
(2, "Whom would you like to awaken?", ""),
)
+test_telnet_iac = (
+ # Send a double (escaped) IAC byte within other text, which should get
+ # unescaped and deduplicated to a single \xff in the buffer and then
+ # the line of input discarded as a non-UTF-8 sequence
+ (2, "> ", b"say argle\xff\xffbargle\r\n"),
+ (2, r"Non-UTF-8 sequence from admin: b'say argle\\xffbargle'.*> ", ""),
+)
+
test_admin_restriction = (
(0, "> ", "help halt"),
(0, r"That is not an available command\.", "halt"),
(test_actor_disappears, "actor spontaneous disappearance"),
(test_account1_teardown, "second account teardown"),
(test_admin_setup, "admin account setup"),
+ (test_telnet_iac, "escape stray telnet iac bytes"),
(test_admin_restriction, "restricted admin commands"),
(test_admin_help, "admin help"),
(test_reload, "reload"),
% (conversant, question, conversant))
success = False
break
- print("luser%s sending: %s" % (conversant, answer))
- lusers[conversant].write(("%s\r\n" % answer).encode("utf-8"))
- captures[conversant] += "%s\r\n" % answer
+ if type(answer) is str:
+ print("luser%s sending: %s" % (conversant, answer))
+ lusers[conversant].write(("%s\r\n" % answer).encode("utf-8"))
+ captures[conversant] += "%s\r\n" % answer
+ elif type(answer) is bytes:
+ print("luser%s sending raw bytes: %s" % (conversant, answer))
+ lusers[conversant].get_socket().send(answer)
+ captures[conversant] += "!!!RAW BYTES: %s" % answer
+ else:
+ print("ERROR: answer provided with unsupported type %s"
+ % type(answer))
+ success = False
+ break
if not success:
break
print("Completed in %.3f seconds." % (time.time() - test_start))