1 """Miscellaneous functions for the mudpy engine."""
3 # Copyright (c) 2004-2021 mudpy authors. Permission to use, copy,
4 # modify, and distribute this software is granted under terms
5 # provided in the LICENSE file distributed with this software.
25 """An element of the universe."""
27 def __init__(self, key, universe, origin=None):
28 """Set up a new element."""
30 # keep track of our key name
33 # keep track of what universe it's loading into
34 self.universe = universe
36 # set of facet keys from the universe
37 self.facethash = dict()
39 # not owned by a user by default (used for avatars)
42 # no contents in here by default
45 if self.key.find(".") > 0:
46 self.group, self.subkey = self.key.split(".")[-2:]
49 self.subkey = self.key
50 if self.group not in self.universe.groups:
51 self.universe.groups[self.group] = {}
53 # get an appropriate origin
55 self.universe.add_group(self.group)
56 origin = self.universe.files[
57 self.universe.origins[self.group]["fallback"]]
59 # record or reset a pointer to the origin file
60 self.origin = self.universe.files[origin.source]
62 # add or replace this element in the universe
63 self.universe.contents[self.key] = self
64 self.universe.groups[self.group][self.subkey] = self
67 """Create a new element and replace this one."""
68 args = (self.key, self.universe, self.origin)
73 """Remove an element from the universe and destroy it."""
74 for facet in dict(self.facethash):
75 self.remove_facet(facet)
76 del self.universe.groups[self.group][self.subkey]
77 del self.universe.contents[self.key]
81 """Return a list of non-inherited facets for this element."""
def has_facet(self, facet):
    """Report whether this element itself defines the named facet.

    Only the names returned by ``self.facets()`` are consulted.
    """
    own_facets = self.facets()
    return facet in own_facets
def remove_facet(self, facet):
    """Drop a facet from this element and from its origin's data.

    The origin stores the value under the dotted node ``<key>.<facet>``.
    The origin is flagged as modified whether or not anything was
    actually removed.
    """
    node = ".".join((self.key, facet))
    stored = self.origin.data
    if node in stored:
        del stored[node]
    # also forget the facet in the element's own facet table
    self.facethash.pop(facet, None)
    self.origin.modified = True
97 """Return a list of the element's inheritance lineage."""
98 if self.has_facet("inherit"):
99 ancestry = self.get("inherit")
102 for parent in ancestry[:]:
103 ancestors = self.universe.contents[parent].ancestry()
104 for ancestor in ancestors:
105 if ancestor not in ancestry:
106 ancestry.append(ancestor)
111 def get(self, facet, default=None):
112 """Retrieve values."""
116 return self.origin.data[".".join((self.key, facet))]
117 except (KeyError, TypeError):
119 if self.has_facet("inherit"):
120 for ancestor in self.ancestry():
121 if self.universe.contents[ancestor].has_facet(facet):
122 return self.universe.contents[ancestor].get(facet)
126 def set(self, facet, value):
128 if not self.origin.is_writeable() and not self.universe.loading:
129 # break if there is an attempt to update an element from a
130 # read-only file, unless the universe is in the midst of loading
131 # updated data from files
132 raise PermissionError("Altering elements in read-only files is "
134 # Coerce some values to appropriate data types
135 # TODO(fungi) Move these to a separate validation mechanism
136 if facet in ["loglevel"]:
138 elif facet in ["administrator"]:
141 # The canonical node for this facet within its origin
142 node = ".".join((self.key, facet))
144 if node not in self.origin.data or self.origin.data[node] != value:
145 # Be careful to only update the origin's contents when required,
146 # since that affects whether the backing file gets written
147 self.origin.data[node] = value
148 self.origin.modified = True
150 # Make sure this facet is included in the element's facets
151 self.facethash[facet] = self.origin.data[node]
153 def append(self, facet, value):
154 """Append value to a list."""
155 newlist = self.get(facet)
158 if type(newlist) is not list:
159 newlist = list(newlist)
160 newlist.append(value)
161 self.set(facet, newlist)
171 add_terminator=False,
174 """Convenience method to pass messages to an owner."""
def is_restricted(self):
    """Tell whether this command is flagged administrative or debugging."""
    if self.get("administrative"):
        return True
    return bool(self.get("debugging"))
192 """Boolean check whether an actor is controlled by an admin owner."""
193 return self.owner and self.owner.is_admin()
195 def can_run(self, command):
196 """Check if the user can run this command object."""
198 # has to be in the commands group
199 if command not in self.universe.groups["command"].values():
202 # debugging commands are not allowed outside debug mode
203 if command.get("debugging") and not self.universe.debug_mode():
206 # avatars of administrators can run any command
210 # everyone can run non-administrative commands
211 if not command.is_restricted():
214 # otherwise the command cannot be run by this actor
def update_location(self):
    """Register this element in the contents of the area it is located in.

    Nothing happens when the "location" facet does not name an element
    present in the universe.
    """
    area = self.get("location")
    everything = self.universe.contents
    if area not in everything:
        return
    everything[area].contents[self.key] = self
def clean_contents(self):
    """Evict contained elements whose "location" facet points elsewhere.

    Every element held in ``self.contents`` is expected to report this
    element's key as its "location"; any that does not is removed from
    ``self.contents`` under its own key.
    """
    # Walk a snapshot of the values: deleting entries while iterating the
    # live values() view raises "RuntimeError: dictionary changed size
    # during iteration" as soon as one stale entry is removed.
    for element in list(self.contents.values()):
        if element.get("location") != self.key:
            del self.contents[element.key]
229 def go_to(self, area):
230 """Relocate the element to a specific area."""
231 current = self.get("location")
232 if current and self.key in self.universe.contents[current].contents:
233 del universe.contents[current].contents[self.key]
234 if area in self.universe.contents:
235 self.set("location", area)
236 self.universe.contents[area].contents[self.key] = self
240 """Relocate the element to its default location."""
241 self.go_to(self.get("default_location"))
242 self.echo_to_location(
243 "You suddenly realize that " + self.get("name") + " is here."
246 def move_direction(self, direction):
247 """Relocate the element in a specified direction."""
248 motion = self.universe.contents["mudpy.movement.%s" % direction]
249 enter_term = motion.get("enter_term")
250 exit_term = motion.get("exit_term")
251 self.echo_to_location("%s exits %s." % (self.get("name"), exit_term))
252 self.send("You exit %s." % exit_term, add_prompt=False)
254 self.universe.contents[
255 self.get("location")].link_neighbor(direction)
257 self.echo_to_location("%s arrives from %s." % (
258 self.get("name"), enter_term))
260 def look_at(self, key):
261 """Show an element to another element."""
263 element = self.universe.contents[key]
265 name = element.get("name")
267 message += "$(cyn)" + name + "$(nrm)$(eol)"
268 description = element.get("description")
270 message += description + "$(eol)"
271 portal_list = list(element.portals().keys())
274 message += "$(cyn)[ Exits: " + ", ".join(
277 for element in self.universe.contents[
280 if element.get("is_actor") and element is not self:
281 message += "$(yel)" + element.get(
283 ) + " is here.$(nrm)$(eol)"
284 elif element is not self:
285 message += "$(grn)" + element.get(
291 """Map the portal directions for an area to neighbors."""
293 if re.match(r"""^area\.-?\d+,-?\d+,-?\d+$""", self.key):
294 coordinates = [(int(x))
295 for x in self.key.split(".")[-1].split(",")]
298 self.universe.contents["mudpy.movement.%s" % x].get("vector")
299 ) for x in self.universe.directions)
300 for portal in self.get("gridlinks"):
301 adjacent = map(lambda c, o: c + o,
302 coordinates, offsets[portal])
303 neighbor = "area." + ",".join(
304 [(str(x)) for x in adjacent]
306 if neighbor in self.universe.contents:
307 portals[portal] = neighbor
308 for facet in self.facets():
309 if facet.startswith("link_"):
310 neighbor = self.get(facet)
311 if neighbor in self.universe.contents:
312 portal = facet.split("_")[1]
313 portals[portal] = neighbor
def link_neighbor(self, direction):
    """Look up what lies through the portal in a given direction.

    Returns the entry ``self.portals()`` holds for the direction, or None
    when there is no portal that way.
    """
    exits = self.portals()
    return exits[direction] if direction in exits else None
322 def echo_to_location(self, message):
323 """Show a message to other elements in the current location."""
324 for element in self.universe.contents[
327 if element is not self:
328 element.send(message)
335 def __init__(self, filename="", load=False):
336 """Initialize the universe."""
339 self.directions = set()
343 self.reload_flag = False
344 self.setup_loglines = []
345 self.startdir = os.getcwd()
346 self.terminate_flag = False
350 possible_filenames = [
352 "/usr/local/mudpy/etc/mudpy.yaml",
353 "/usr/local/etc/mudpy.yaml",
354 "/etc/mudpy/mudpy.yaml",
357 for filename in possible_filenames:
358 if os.access(filename, os.R_OK):
360 if not os.path.isabs(filename):
361 filename = os.path.join(self.startdir, filename)
362 self.filename = filename
364 # make sure to preserve any accumulated log entries during load
365 self.setup_loglines += self.load()
368 """Load universe data from persistent storage."""
370 # while loading, it's safe to update elements from read-only files
373 # it's possible for this to enter before logging configuration is read
374 pending_loglines = []
376 # start populating the (re)files dict from the base config
378 mudpy.data.Data(self.filename, self)
380 # load default storage locations for groups
381 if hasattr(self, "contents") and "mudpy.filing" in self.contents:
382 self.origins.update(self.contents["mudpy.filing"].get(
385 # add some builtin groups we know we'll need
386 for group in ("account", "actor", "internal"):
387 self.add_group(group)
389 # make a list of inactive avatars
390 inactive_avatars = []
391 for account in self.groups.get("account", {}).values():
392 for avatar in account.get("avatars"):
394 inactive_avatars.append(self.contents[avatar])
396 pending_loglines.append((
397 'Missing avatar "%s", possible data corruption' %
399 for user in self.userlist:
400 if user.avatar in inactive_avatars:
401 inactive_avatars.remove(user.avatar)
403 # another pass to straighten out all the element contents
404 for element in self.contents.values():
405 element.update_location()
406 element.clean_contents()
408 # warn when debug mode has been engaged
409 if self.debug_mode():
410 pending_loglines.append((
411 "WARNING: Unsafe debugging mode is enabled!", 6))
413 # done loading, so disallow updating elements from read-only files
416 return pending_loglines
419 """Create a new, empty Universe (the Big Bang)."""
420 new_universe = Universe()
421 for attribute in vars(self).keys():
422 setattr(new_universe, attribute, getattr(self, attribute))
423 new_universe.reload_flag = False
428 """Save the universe to persistent storage."""
429 for key in self.files:
430 self.files[key].save()
432 def initialize_server_socket(self):
433 """Create and open the listening socket."""
435 # need to know the local address and port number for the listener
436 host = self.contents["mudpy.network"].get("host")
437 port = self.contents["mudpy.network"].get("port")
439 # if no host was specified, bind to the loopback address (preferring
447 # figure out if this is ipv4 or v6
448 family = socket.getaddrinfo(host, port)[0][0]
449 if family is socket.AF_INET6 and not socket.has_ipv6:
450 log("No support for IPv6 address %s (use IPv4 instead)." % host)
452 # create a new stream-type socket object
453 self.listening_socket = socket.socket(family, socket.SOCK_STREAM)
455 # set the socket options to allow existing open ones to be
456 # reused (fixes a bug where the server can't bind for a minute
457 # when restarting on linux systems)
458 self.listening_socket.setsockopt(
459 socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
462 # bind the socket to to our desired server ipa and port
463 self.listening_socket.bind((host, port))
465 # disable blocking so we can proceed whether or not we can
467 self.listening_socket.setblocking(0)
469 # start listening on the socket
470 self.listening_socket.listen(1)
472 # note that we're now ready for user connections
473 log("Listening for Telnet connections on %s port %s" % (
477 """Convenience method to get the elapsed time counter."""
479 return self.groups["internal"]["counters"].get("elapsed", 0)
483 def set_time(self, elapsed):
484 """Convenience method to set the elapsed time counter."""
486 self.groups["internal"]["counters"].set("elapsed", elapsed)
488 # add an element for counters if it doesn't exist
489 Element("internal.counters", universe)
490 self.groups["internal"]["counters"].set("elapsed", elapsed)
492 def add_group(self, group, fallback=None):
493 """Set up group tracking/metadata."""
494 if group not in self.origins:
495 self.origins[group] = {}
497 fallback = mudpy.data.find_file(
498 ".".join((group, "yaml")), universe=self)
499 if "fallback" not in self.origins[group]:
500 self.origins[group]["fallback"] = fallback
501 flags = self.origins[group].get("flags", None)
502 if fallback not in self.files:
503 mudpy.data.Data(fallback, self, flags=flags)
def debug_mode(self):
    """Report whether the unsafe debugging mode is switched on.

    Reads the "debug" facet of the "limit" element in the "mudpy" group,
    passing False as the default.
    """
    limits = self.groups["mudpy"]["limit"]
    return limits.get("debug", False)
512 """This is a connected user."""
515 """Default values for the in-memory user variables."""
518 self.authenticated = False
522 self.connection = None
524 self.input_queue = []
525 self.last_address = ""
526 self.last_input = universe.get_time()
527 self.menu_choices = {}
528 self.menu_seen = False
529 self.negotiation_pause = 0
530 self.output_queue = []
531 self.partial_input = b""
532 self.password_tries = 0
534 self.state = "telopt_negotiation"
537 self.universe = universe
540 """Log, close the connection and remove."""
542 name = self.account.get("name", self)
545 log("Logging out %s" % name, 2)
546 self.deactivate_avatar()
547 self.connection.close()
550 def check_idle(self):
551 """Warn or disconnect idle users as appropriate."""
552 idletime = universe.get_time() - self.last_input
553 linkdead_dict = universe.contents[
554 "mudpy.timing.idle.disconnect"].facets()
555 if self.state in linkdead_dict:
556 linkdead_state = self.state
558 linkdead_state = "default"
559 if idletime > linkdead_dict[linkdead_state]:
561 "$(eol)$(red)You've done nothing for far too long... goodbye!"
566 logline = "Disconnecting "
567 if self.account and self.account.get("name"):
568 logline += self.account.get("name")
570 logline += "an unknown user"
571 logline += (" after idling too long in the " + self.state
574 self.state = "disconnecting"
575 self.menu_seen = False
576 idle_dict = universe.contents["mudpy.timing.idle.warn"].facets()
577 if self.state in idle_dict:
578 idle_state = self.state
580 idle_state = "default"
581 if idletime == idle_dict[idle_state]:
583 "$(eol)$(red)If you continue to be unproductive, "
584 + "you'll be shown the door...$(nrm)$(eol)"
588 """Save, load a new user and relocate the connection."""
590 # copy old attributes
591 attributes = self.__dict__
593 # get out of the list
596 # get rid of the old user object
599 # create a new user object
602 # set everything equivalent
603 new_user.__dict__ = attributes
605 # the avatar needs a new owner
607 new_user.account = universe.contents[new_user.account.key]
608 new_user.avatar = universe.contents[new_user.avatar.key]
609 new_user.avatar.owner = new_user
612 universe.userlist.append(new_user)
614 def replace_old_connections(self):
615 """Disconnect active users with the same name."""
617 # the default return value
620 # iterate over each user in the list
621 for old_user in universe.userlist:
623 # the name is the same but it's not us
626 ) and old_user.account and old_user.account.get(
628 ) == self.account.get(
630 ) and old_user is not self:
634 "User " + self.account.get(
636 ) + " reconnected--closing old connection to "
637 + old_user.address + ".",
641 "$(eol)$(red)New connection from " + self.address
642 + ". Terminating old connection...$(nrm)$(eol)",
647 # close the old connection
648 old_user.connection.close()
650 # replace the old connection with this one
652 "$(eol)$(red)Taking over old connection from "
653 + old_user.address + ".$(nrm)"
655 old_user.connection = self.connection
656 old_user.last_address = old_user.address
657 old_user.address = self.address
658 old_user.telopts = self.telopts
659 old_user.adjust_echoing()
661 # take this one out of the list and delete
667 # true if an old connection was replaced, false if not
def authenticate(self):
    """Flag the user as authenticated and mark configured administrators.

    Does nothing when the user's state is already "authenticated".
    Otherwise the authentication is logged and, if the account's subkey is
    listed in the "admins" facet of the "mudpy.limit" element, the
    account's "administrator" facet is set to True and that is logged too.
    """
    if self.state == "authenticated":
        return
    self.authenticated = True
    log("User %s authenticated for account %s." % (
        self, self.account.subkey), 2)
    if "mudpy.limit" not in universe.contents:
        return
    admins = universe.contents["mudpy.limit"].get("admins")
    if self.account.subkey in admins:
        self.account.set("administrator", True)
        log("Account %s is an administrator." % (self.account.subkey), 2)
683 """Send the user their current menu."""
684 if not self.menu_seen:
685 self.menu_choices = get_menu_choices(self)
687 get_menu(self.state, self.error, self.menu_choices),
691 self.menu_seen = True
693 self.adjust_echoing()
696 """"Generate and return an input prompt."""
698 # Start with the user's preference, if one was provided
699 prompt = self.account.get("prompt")
701 # If the user has not set a prompt, then immediately return the default
702 # provided for the current state
704 return get_menu_prompt(self.state)
706 # Allow including the World clock state
707 if "$_(time)" in prompt:
708 prompt = prompt.replace(
710 str(universe.get_time()))
712 # Append a single space for clear separation from user input
713 if prompt[-1] != " ":
714 prompt = "%s " % prompt
716 # Return the cooked prompt
719 def adjust_echoing(self):
720 """Adjust echoing to match state menu requirements."""
721 if mudpy.telnet.is_enabled(self, mudpy.telnet.TELOPT_ECHO,
723 if menu_echo_on(self.state):
724 mudpy.telnet.disable(self, mudpy.telnet.TELOPT_ECHO,
726 elif not menu_echo_on(self.state):
727 mudpy.telnet.enable(self, mudpy.telnet.TELOPT_ECHO,
731 """Remove a user from the list of connected users."""
732 log("Disconnecting account %s." % self, 0)
733 universe.userlist.remove(self)
743 add_terminator=False,
746 """Send arbitrary text to a connected user."""
748 # unless raw mode is on, clean it up all nice and pretty
751 # strip extra $(eol) off if present
752 while output.startswith("$(eol)"):
754 while output.endswith("$(eol)"):
756 extra_lines = output.find("$(eol)$(eol)$(eol)")
757 while extra_lines > -1:
758 output = output[:extra_lines] + output[extra_lines + 6:]
759 extra_lines = output.find("$(eol)$(eol)$(eol)")
761 # start with a newline, append the message, then end
762 # with the optional eol string passed to this function
763 # and the ansi escape to return to normal text
764 if not just_prompt and prepend_padding:
765 if (not self.output_queue or not
766 self.output_queue[-1].endswith(b"\r\n")):
767 output = "$(eol)" + output
768 elif not self.output_queue[-1].endswith(
770 ) and not self.output_queue[-1].endswith(
773 output = "$(eol)" + output
774 output += eol + chr(27) + "[0m"
776 # tack on a prompt if active
777 if self.state == "active":
781 output += self.prompt()
782 mode = self.avatar.get("mode")
784 output += "(" + mode + ") "
786 # find and replace macros in the output
787 output = replace_macros(self, output)
789 # wrap the text at the client's width (min 40, 0 disables)
791 if self.columns < 40:
795 output = wrap_ansi_text(output, wrap)
797 # if supported by the client, encode it utf-8
798 if mudpy.telnet.is_enabled(self, mudpy.telnet.TELOPT_BINARY,
800 encoded_output = output.encode("utf-8")
802 # otherwise just send ascii
804 encoded_output = output.encode("ascii", "replace")
806 # end with a terminator if requested
807 if add_prompt or add_terminator:
808 if mudpy.telnet.is_enabled(
809 self, mudpy.telnet.TELOPT_EOR, mudpy.telnet.US):
810 encoded_output += mudpy.telnet.telnet_proto(
811 mudpy.telnet.IAC, mudpy.telnet.EOR)
812 elif not mudpy.telnet.is_enabled(
813 self, mudpy.telnet.TELOPT_SGA, mudpy.telnet.US):
814 encoded_output += mudpy.telnet.telnet_proto(
815 mudpy.telnet.IAC, mudpy.telnet.GA)
817 # and tack it onto the queue
818 self.output_queue.append(encoded_output)
820 # if this is urgent, flush all pending output
824 # just dump raw bytes as requested
826 self.output_queue.append(output)
830 """All the things to do to the user per increment."""
832 # if the world is terminating, disconnect
833 if universe.terminate_flag:
834 self.state = "disconnecting"
835 self.menu_seen = False
837 # check for an idle connection and act appropriately
841 # ask the client for their current terminal type (RFC 1091); it's None
842 # if it's not been initialized, the empty string if it has but the
843 # output was indeterminate, "UNKNOWN" if the client specified it has no
844 # terminal types to supply
845 if self.ttype is None:
846 mudpy.telnet.request_ttype(self)
848 # if output is paused, decrement the counter
849 if self.state == "telopt_negotiation":
850 if self.negotiation_pause:
851 self.negotiation_pause -= 1
853 self.state = "entering_account_name"
855 # show the user a menu as needed
856 elif not self.state == "active":
859 # flush any pending output in the queue
862 # disconnect users with the appropriate state
863 if self.state == "disconnecting":
866 # check for input and add it to the queue
869 # there is input waiting in the queue
871 handle_user_input(self)
874 """Try to send the last item in the queue and remove it."""
875 if self.output_queue:
877 self.connection.send(self.output_queue[0])
878 except (BrokenPipeError, ConnectionResetError):
879 if self.account and self.account.get("name"):
880 account = self.account.get("name")
882 account = "an unknown user"
883 self.state = "disconnecting"
884 log("Disconnected while sending to %s." % account, 7)
885 del self.output_queue[0]
887 def enqueue_input(self):
888 """Process and enqueue any new input."""
890 # check for some input
892 raw_input = self.connection.recv(1024)
899 # tack this on to any previous partial
900 self.partial_input += raw_input
902 # reply to and remove any IAC negotiation codes
903 mudpy.telnet.negotiate_telnet_options(self)
905 # separate multiple input lines
906 new_input_lines = self.partial_input.split(b"\r\0")
907 if len(new_input_lines) == 1:
908 new_input_lines = new_input_lines[0].split(b"\r\n")
910 # if input doesn't end in a newline, replace the
911 # held partial input with the last line of it
913 self.partial_input.endswith(b"\r\0") or
914 self.partial_input.endswith(b"\r\n")):
915 self.partial_input = new_input_lines.pop()
917 # otherwise, chop off the extra null input and reset
918 # the held partial input
920 new_input_lines.pop()
921 self.partial_input = b""
923 # iterate over the remaining lines
924 for line in new_input_lines:
926 # strip off extra whitespace
929 # log non-printable characters remaining
930 if not mudpy.telnet.is_enabled(
931 self, mudpy.telnet.TELOPT_BINARY, mudpy.telnet.HIM):
932 asciiline = bytes([x for x in line if 32 <= x <= 126])
933 if line != asciiline:
934 logline = "Non-ASCII characters from "
935 if self.account and self.account.get("name"):
936 logline += self.account.get("name") + ": "
938 logline += "unknown user: "
939 logline += repr(line)
944 line = line.decode("utf-8")
945 except UnicodeDecodeError:
946 logline = "Non-UTF-8 sequence from "
947 if self.account and self.account.get("name"):
948 logline += self.account.get("name") + ": "
950 logline += "unknown user: "
951 logline += repr(line)
955 line = unicodedata.normalize("NFKC", line)
957 # put on the end of the queue
958 self.input_queue.append(line)
960 def new_avatar(self):
961 """Instantiate a new, unconfigured avatar for this user."""
963 while ("avatar_%s_%s" % (self.account.get("name"), counter)
964 in universe.groups.get("actor", {}).keys()):
966 self.avatar = Element(
967 "actor.avatar_%s_%s" % (self.account.get("name"), counter),
969 self.avatar.append("inherit", "archetype.avatar")
970 self.account.append("avatars", self.avatar.key)
971 log("Created new avatar %s for user %s." % (
972 self.avatar.key, self.account.get("name")), 0)
974 def delete_avatar(self, avatar):
975 """Remove an avatar from the world and from the user's list."""
976 if self.avatar is universe.contents[avatar]:
978 log("Deleting avatar %s for user %s." % (
979 avatar, self.account.get("name")), 0)
980 universe.contents[avatar].destroy()
981 avatars = self.account.get("avatars")
982 avatars.remove(avatar)
983 self.account.set("avatars", avatars)
def activate_avatar_by_index(self, index):
    """Make the account's avatar at the given list position the active one.

    The avatar is looked up in the universe by the key stored at ``index``
    in the account's "avatars" facet. It takes this user as its owner, the
    user's state becomes "active", the activation is logged, and the
    avatar's ``go_home()`` is called.
    """
    avatar_key = self.account.get("avatars")[index]
    self.avatar = universe.contents[avatar_key]
    self.avatar.owner = self
    self.state = "active"
    details = (self.avatar.get("name"), self.avatar.key)
    log("Activated avatar %s (%s)." % details, 0)
    self.avatar.go_home()
995 def deactivate_avatar(self):
996 """Have the active avatar leave the world."""
998 log("Deactivating avatar %s (%s) for user %s." % (
999 self.avatar.get("name"), self.avatar.key,
1000 self.account.get("name")), 0)
1001 current = self.avatar.get("location")
1003 self.avatar.set("default_location", current)
1004 self.avatar.echo_to_location(
1005 "You suddenly wonder where " + self.avatar.get(
1009 del universe.contents[current].contents[self.avatar.key]
1010 self.avatar.remove_facet("location")
1011 self.avatar.owner = None
1015 """Destroy the user and associated avatars."""
1016 for avatar in self.account.get("avatars"):
1017 self.delete_avatar(avatar)
1018 log("Destroying account %s for user %s." % (
1019 self.account.get("name"), self), 0)
1020 self.account.destroy()
1022 def list_avatar_names(self):
1023 """List names of assigned avatars."""
1025 for avatar in self.account.get("avatars"):
1027 avatars.append(universe.contents[avatar].get("name"))
1029 log('Missing avatar "%s", possible data corruption.' %
1034 """Boolean check whether user's account is an admin."""
1035 return self.account.get("administrator", False)
def broadcast(message, add_prompt=True):
    """Deliver a message to every user in the universe's user list.

    Each copy is prefixed with "$(eol)"; add_prompt is handed through to
    every user's send().
    """
    for recipient in universe.userlist:
        line = "$(eol)" + message
        recipient.send(line, add_prompt=add_prompt)
1044 def log(message, level=0):
1045 """Log a message."""
1047 # a couple references we need
1048 if "mudpy.log" in universe.contents:
1049 file_name = universe.contents["mudpy.log"].get("file", "")
1050 max_log_lines = universe.contents["mudpy.log"].get("lines", 0)
1051 syslog_name = universe.contents["mudpy.log"].get("syslog", "")
1056 timestamp = datetime.datetime.now().isoformat(' ')
1058 # turn the message into a list of nonempty lines
1059 lines = [x for x in [(x.rstrip()) for x in message.split("\n")] if x != ""]
1061 # send the timestamp and line to a file
1063 if not os.path.isabs(file_name):
1064 file_name = os.path.join(universe.startdir, file_name)
1065 os.makedirs(os.path.dirname(file_name), exist_ok=True)
1066 file_descriptor = codecs.open(file_name, "a", "utf-8")
1068 file_descriptor.write(timestamp + " " + line + "\n")
1069 file_descriptor.flush()
1070 file_descriptor.close()
1072 # send the timestamp and line to standard output
1073 if ("mudpy.log" in universe.contents and
1074 universe.contents["mudpy.log"].get("stdout")):
1076 print(timestamp + " " + line)
1078 # send the line to the system log
1081 syslog_name.encode("utf-8"),
1083 syslog.LOG_INFO | syslog.LOG_DAEMON
1089 # display to connected administrators
1090 for user in universe.userlist:
1092 user.state == "active"
1094 and user.account.get("loglevel", 0) <= level):
1095 # iterate over every line in the message
1099 "$(bld)$(red)" + timestamp + " "
1100 + line.replace("$(", "$_(") + "$(nrm)$(eol)")
1101 user.send(full_message, flush=True)
1103 # add to the recent log list
1105 while 0 < len(universe.loglines) >= max_log_lines:
1106 del universe.loglines[0]
1107 universe.loglines.append((timestamp + " " + line, level))
1110 def get_loglines(level, start, stop):
1111 """Return a specific range of loglines filtered by level."""
1113 # filter the log lines
1114 loglines = [x for x in universe.loglines if x[1] >= level]
1116 # we need these in several places
1117 total_count = str(len(universe.loglines))
1118 filtered_count = len(loglines)
1120 # don't proceed if there are no lines
1123 # can't start before the beginning or at the end
1124 if start > filtered_count:
1125 start = filtered_count
1129 # can't stop before we start
1137 "There are %s log lines in memory and %s at or above level %s. "
1138 "The matching lines from %s to %s are:$(eol)$(eol)" %
1139 (total_count, filtered_count, level, stop, start))
1141 # add the text from the selected lines
1143 range_lines = loglines[-start:-(stop - 1)]
1145 range_lines = loglines[-start:]
1146 for line in range_lines:
1147 message += " (%s) %s$(eol)" % (
1148 line[1], line[0].replace("$(", "$_("))
1150 # there were no lines
1152 message = "None of the %s lines in memory matches your request." % (
1159 def glyph_columns(character):
1160 """Convenience function to return the column width of a glyph."""
1161 if unicodedata.east_asian_width(character) in "FW":
1167 def wrap_ansi_text(text, width):
1168 """Wrap text with arbitrary width while ignoring ANSI colors."""
1170 # the current position in the entire text string, including all
1171 # characters, printable or otherwise
1174 # the current text position relative to the beginning of the line,
1175 # ignoring color escape sequences
1178 # the absolute and relative positions of the most recent whitespace
1180 last_abs_whitespace = 0
1181 last_rel_whitespace = 0
1183 # whether the current character is part of a color escape sequence
1186 # normalize any potentially composited unicode before we count it
1187 text = unicodedata.normalize("NFKC", text)
1189 # iterate over each character from the beginning of the text
1190 for each_character in text:
1192 # the current character is the escape character
1193 if each_character == "\x1b" and not escape:
1197 # the current character is within an escape sequence
1200 if each_character == "m":
1201 # the current character is m, which terminates the
1205 # the current character is a space
1206 elif each_character == " ":
1207 last_abs_whitespace = abs_pos
1208 last_rel_whitespace = rel_pos
1210 # the current character is a newline, so reset the relative
1211 # position too (start a new line)
1212 elif each_character == "\n":
1214 last_abs_whitespace = abs_pos
1215 last_rel_whitespace = rel_pos
1217 # the current character meets the requested maximum line width, so we
1218 # need to wrap unless the current word is wider than the terminal (in
1219 # which case we let it do the wrapping instead)
1220 if last_rel_whitespace != 0 and (rel_pos > width or (
1221 rel_pos > width - 1 and glyph_columns(each_character) == 2)):
1223 # insert an eol in place of the last space
1224 text = (text[:last_abs_whitespace] + "\r\n" +
1225 text[last_abs_whitespace + 1:])
1227 # increase the absolute position because an eol is two
1228 # characters but the space it replaced was only one
1231 # now we're at the beginning of a new line, plus the
1232 # number of characters wrapped from the previous line
1233 rel_pos -= last_rel_whitespace
1234 last_rel_whitespace = 0
1236 # as long as the character is not a carriage return and the
1237 # other above conditions haven't been met, count it as a
1238 # printable character
1239 elif each_character != "\r":
1240 rel_pos += glyph_columns(each_character)
1241 if each_character in (" ", "\n"):
1242 last_abs_whitespace = abs_pos
1243 last_rel_whitespace = rel_pos
1245 # increase the absolute position for every character
1248 # return the newly-wrapped text
1252 def weighted_choice(data):
1253 """Takes a dict weighted by value and returns a random key."""
1255 # this will hold our expanded list of keys from the data
1258 # create the expanded list of keys
1259 for key in data.keys():
1260 for _count in range(data[key]):
1261 expanded.append(key)
1263 # return one at random
1264 # Allow the random.randrange() call in bandit since it's not used for
1265 # security/cryptographic purposes
1266 return random.choice(expanded) # nosec
1270 """Returns a random character name."""
1272 # the vowels and consonants needed to create romaji syllables
1301 # this dict will hold our weighted list of syllables
1304 # generate the list with an even weighting
1305 for consonant in consonants:
1306 for vowel in vowels:
1307 syllables[consonant + vowel] = 1
1309 # we'll build the name into this string
1312 # create a name of random length from the syllables
1313 # Allow the random.randrange() call in bandit since it's not used for
1314 # security/cryptographic purposes
1315 for _syllable in range(random.randrange(2, 6)): # nosec
1316 name += weighted_choice(syllables)
1318 # strip any leading quotemark, capitalize and return the name
1319 return name.strip("'").capitalize()
def replace_macros(user, text, is_input=False):
    """Replace $(...) macros in text output.

    user provides the optional account and avatar consulted for the
    dynamic macros, text is the string to expand, and is_input silences
    the log entry otherwise written for an unrecognized macro. Returns
    the expanded text, with escaped $_( sequences turned back into $(.
    """

    # third person pronouns
    pronouns = {
        "female": {"obj": "her", "pos": "hers", "sub": "she"},
        "male": {"obj": "him", "pos": "his", "sub": "he"},
        "neuter": {"obj": "it", "pos": "its", "sub": "it"}
    }

    # a dict of replacement macros
    macros = {
        "eol": "\r\n",
        "bld": chr(27) + "[1m",
        "nrm": chr(27) + "[0m",
        "blk": chr(27) + "[30m",
        "blu": chr(27) + "[34m",
        "cyn": chr(27) + "[36m",
        "grn": chr(27) + "[32m",
        "mgt": chr(27) + "[35m",
        "red": chr(27) + "[31m",
        "yel": chr(27) + "[33m",
    }

    # add dynamic macros where possible
    if user.account:
        account_name = user.account.get("name")
        if account_name:
            macros["account"] = account_name
    if user.avatar:
        avatar_gender = user.avatar.get("gender")
        if avatar_gender:
            macros["tpop"] = pronouns[avatar_gender]["obj"]
            macros["tppp"] = pronouns[avatar_gender]["pos"]
            macros["tpsp"] = pronouns[avatar_gender]["sub"]

    # loop until broken
    while True:

        # find and replace per the macros dict
        macro_start = text.find("$(")
        if macro_start == -1:
            break

        # an unterminated macro can never be replaced, so stop here
        # rather than searching for it forever
        macro_close = text.find(")", macro_start)
        if macro_close == -1:
            break
        macro = text[macro_start + 2:macro_close]
        if macro in macros:
            replacement = macros[macro]

        # this is how we handle local file inclusion (dangerous!)
        elif macro.startswith("inc:"):
            incfile = mudpy.data.find_file(macro[4:], universe=universe)
            if os.path.exists(incfile):
                included = []
                with codecs.open(incfile, "r", "utf-8") as incfd:
                    for line in incfd:
                        if line.endswith("\n") and not line.endswith("\r\n"):
                            line = line.replace("\n", "\r\n")
                        included.append(line)
                replacement = "".join(included)

                # lose the trailing eol, but only if there really is one
                if replacement.endswith("\r\n"):
                    replacement = replacement[:-2]
            else:
                replacement = ""
                log("Couldn't read included " + incfile + " file.", 7)

        # if we get here, log and replace it with null
        else:
            replacement = ""
            if not is_input:
                log("Unexpected replacement macro " +
                    macro + " encountered.", 6)

        # and now we act on the replacement
        text = text.replace("$(" + macro + ")", replacement)

    # replace the look-like-a-macro sequence
    text = text.replace("$_(", "$(")

    return text
def escape_macros(value):
    """Escape replacement macros in text.

    A string (or instance of a str subclass) is returned with every
    "$(" rewritten as "$_("; any other value is returned unchanged.
    """
    if isinstance(value, str):
        return value.replace("$(", "$_(")
    return value
def first_word(text, separator=" "):
    """Return a tuple of the first word and the rest.

    The text is divided at the first occurrence of separator. When the
    text is empty the result is ("", ""); when the separator is absent,
    or the text begins with it, the result is (text, "").
    """
    if not text:
        return "", ""

    # only split when the separator follows at least one character
    if text.find(separator) <= 0:
        return text, ""
    word, _separator, rest = text.partition(separator)
    return word, rest
def on_pulse():
    """Do the per-pulse housekeeping, aside from reloads.

    Advances the universe clock, counts down the periodic log and save
    timers, opens the listening socket on first use, admits at most one
    waiting connection, pulses every connected user and finally sleeps
    for the configured increment.
    """

    # increase the elapsed increment counter
    universe.set_time(universe.get_time() + 1)

    counters = universe.groups["internal"]["counters"]

    # count down to the next status log entry, writing one at zero
    if counters.get("mark"):
        counters.set("mark", counters.get("mark") - 1)
    else:
        log(str(len(universe.userlist)) + " connection(s)")
        counters.set("mark", universe.contents["mudpy.timing"].get("status"))

    # count down to the next save, saving everything at zero
    if counters.get("save"):
        counters.set("save", counters.get("save") - 1)
    else:
        universe.save()
        counters.set("save", universe.contents["mudpy.timing"].get("save"))

    # open the listening socket if it hasn't been already
    if not hasattr(universe, "listening_socket"):
        universe.initialize_server_socket()

    # assign a user if a new connection is waiting
    newcomer = check_for_connection(universe.listening_socket)
    if newcomer:
        universe.userlist.append(newcomer)

    # iterate over the connected users
    for user in universe.userlist:
        user.pulse()

    # pause for a configurable amount of time (decimal seconds)
    time.sleep(universe.contents["mudpy.timing"].get("increment"))
def reload_data():
    """Reload all relevant objects.

    Every element is destroyed and the universe loaded afresh; log
    lines from before and during the reload are kept, and each user
    connected beforehand is told to reload.
    """

    # snapshot what has to survive the reload
    users_before = universe.userlist[:]
    loglines_before = universe.loglines[:]

    # tear everything down (iterating a copy, since destroying mutates)
    for element in list(universe.contents.values()):
        element.destroy()

    # bring it all back and stitch the log together in order
    pending_loglines = universe.load()
    loglines_after = universe.loglines[:]
    universe.loglines = loglines_before + loglines_after + pending_loglines

    for user in users_before:
        user.reload()
def check_for_connection(listening_socket):
    """Accept one waiting connection and wrap it in a new user object.

    Returns the new user, or None when no connection is waiting.
    """

    # nothing to do if nobody is knocking
    try:
        connection, address = listening_socket.accept()
    except BlockingIOError:
        return None
    peer = address[0]

    # note that we got one
    log("New connection from %s." % peer, 2)

    # disable blocking so we can proceed whether or not we can send/receive
    connection.setblocking(0)

    # create a new user object
    user = User()
    log("Instantiated %s for %s." % (user, peer), 0)

    # tie the connection and its ip address to the user
    user.connection = connection
    user.address = peer

    # let the client know we WILL EOR (RFC 885)
    telnet = mudpy.telnet
    telnet.enable(user, telnet.TELOPT_EOR, telnet.US)
    user.negotiation_pause = 2

    return user
def find_command(command_name):
    """Try to find a command by name or abbreviation.

    Returns the matching command element, or None if nothing matches.
    """
    wanted = command_name.lower()
    commands = universe.groups["command"]

    # the name matches a command word for which we have data
    if wanted in commands:
        return commands[wanted]

    # otherwise take the first command, in sorted order, which starts
    # with the name and is not restricted to administrators
    for candidate in sorted(commands):
        if not candidate.startswith(wanted):
            continue
        if not commands[candidate].is_restricted():
            return commands[candidate]
    return None
def get_menu(state, error=None, choices=None):
    """Assemble the menu text to show a user for the given state."""

    # make sure we don't reuse a mutable sequence by default
    if choices is None:
        choices = {}

    # description or error text, then the choices, the prompt, the
    # default choice and lastly a note when echo is off
    return (
        get_menu_description(state, error)
        + get_formatted_menu_choices(state, choices)
        + get_menu_prompt(state)
        + get_formatted_default_menu_choice(state)
        + get_echo_message(state)
    )
def menu_echo_on(state):
    """Return the menu's echo setting, which is True unless set."""
    menu = universe.groups["menu"][state]
    return menu.get("echo", True)
def get_echo_message(state):
    """Return a notice that echo is off, or nothing when it is on."""
    return "" if menu_echo_on(state) else "(won't echo) "
def get_default_menu_choice(state):
    """Return the value of the menu's "default" facet."""
    menu = universe.groups["menu"][state]
    return menu.get("default")
def get_formatted_default_menu_choice(state):
    """Default menu choice formatted for inclusion in a prompt string."""
    default_choice = get_default_menu_choice(state)

    # no default means nothing to show
    if not default_choice:
        return ""
    return "[$(red)" + default_choice + "$(nrm)] "
def get_menu_description(state, error):
    """Get the menu's description, or its error text if error is set."""
    menu = universe.groups["menu"][state]

    if not error:
        # there was no error condition, so use the menu description
        description = menu.get("description")
    else:
        # use the message matching the condition raised by the
        # handler, or a stock one if the menu defines none
        description = menu.get("error_" + error)
        if not description:
            description = "That is not a valid choice..."
        description = "$(red)" + description + "$(nrm)"

    # set whatever we found apart from the rest of the menu
    if description:
        description += "$(eol)$(eol)"
    return description
def get_menu_prompt(state):
    """Return the menu's prompt with a trailing space, if it has one."""
    prompt = universe.groups["menu"][state].get("prompt")
    return prompt + " " if prompt else prompt
def get_menu_choices(user):
    """Return a dict of choice:meaning for the user's current menu."""
    menu = universe.groups["menu"][user.state]

    # start from whatever the menu's "create" hook builds, if any
    builder = menu.get("create")
    choices = call_hook_function(builder, (user,)) if builder else {}

    # sort the facets into unmet demands, created and fixed choices
    ignored = []
    fixed = {}
    created = {}
    for facet in menu.facets():
        if facet.startswith("demand_"):
            demand = universe.groups["menu"][user.state].get(facet)
            if not call_hook_function(demand, (user,)):
                ignored.append(facet.split("_", 2)[1])
        elif facet.startswith("create_"):
            created[facet] = facet.split("_", 2)[1]
        elif facet.startswith("choice_"):
            fixed[facet] = facet.split("_", 2)[1]

    # add every choice whose demand was not refused
    for facet, choice in created.items():
        if choice not in ignored:
            choices[choice] = call_hook_function(menu.get(facet), (user,))
    for facet, choice in fixed.items():
        if choice not in ignored:
            choices[choice] = menu.get(facet)
    return choices
def get_formatted_menu_choices(state, choices):
    """Return the menu choices formatted one per line, sorted by key."""
    rows = [
        " [$(red)" + choice + "$(nrm)] " + choices[choice] + "$(eol)"
        for choice in sorted(choices.keys())
    ]

    # no choices, no output; otherwise finish with a blank line
    if not rows:
        return ""
    return "".join(rows) + "$(eol)"
def get_menu_branches(state):
    """Return a dict of choice:branch built from "branch_" facets."""
    menu = universe.groups["menu"][state]
    return {
        facet.split("_", 2)[1]: menu.get(facet)
        for facet in menu.facets()
        if facet.startswith("branch_")
    }
def get_default_branch(state):
    """Return the value of the menu's "branch" facet."""
    menu = universe.groups["menu"][state]
    return menu.get("branch")
def get_choice_branch(user):
    """Return the new state matching the user's choice.

    A branch specific to the choice wins; any other valid menu choice
    gets the menu's default branch; anything else gets "".
    """
    branches = get_menu_branches(user.state)
    if user.choice in branches:
        return branches[user.choice]
    if user.choice in user.menu_choices.keys():
        return get_default_branch(user.state)
    return ""
def get_menu_actions(state):
    """Return a dict of choice:action built from "action_" facets."""
    menu = universe.groups["menu"][state]
    return {
        facet.split("_", 2)[1]: menu.get(facet)
        for facet in menu.facets()
        if facet.startswith("action_")
    }
def get_default_action(state):
    """Return the value of the menu's "action" facet."""
    menu = universe.groups["menu"][state]
    return menu.get("action")
def get_choice_action(user):
    """Return the action indicated for the user's choice.

    An action specific to the choice wins; any other valid menu choice
    gets the menu's default action; anything else gets "".
    """
    actions = get_menu_actions(user.state)
    if user.choice in actions:
        return actions[user.choice]
    if user.choice in user.menu_choices.keys():
        return get_default_action(user.state)
    return ""
def call_hook_function(fname, arglist):
    """Safely execute named function with supplied arguments, return result.

    fname is a dotted path resolved attribute by attribute starting at
    the mudpy package. A missing attribute or an exception raised by
    the call is logged, and None is returned in either case.
    """
    components = fname.split(".")

    # all functions relative to mudpy package
    function = mudpy
    try:
        for component in components:
            function = getattr(function, component)
    except AttributeError:
        log('Could not find mudpy.%s() for arguments "%s"'
            % (fname, arglist), 7)
        return None

    # a falsy result of the lookup is not called
    if not function:
        return None
    try:
        return function(*arglist)
    except Exception:
        log('Calling mudpy.%s(%s) raised an exception...\n%s'
            % (fname, (*arglist,), traceback.format_exc()), 7)
        return None
def handle_user_input(user):
    """Dispatch a user's input to the handler for their current state."""
    telnet = mudpy.telnet

    # if the user's client echo is off, send a blank line for aesthetics
    if telnet.is_enabled(user, telnet.TELOPT_ECHO, telnet.US):
        user.send("", add_prompt=False, prepend_padding=False)

    # call the handler named after the state, falling back to the
    # generic menu handler when a KeyError is raised
    try:
        globals()["handler_" + user.state](user)
    except KeyError:
        generic_menu_handler(user)

    # since we got input, flag that the menu/prompt needs to be redisplayed
    user.menu_seen = False

    # update the last_input timestamp while we're at it
    user.last_input = universe.get_time()
def generic_menu_handler(user):
    """Handle a menu choice for states without a dedicated handler."""

    # get a lower-case representation of the next line of input
    entered = user.input_queue.pop(0) if user.input_queue else ""
    user.choice = entered.lower() if entered else entered

    # an empty answer selects the menu's default choice
    if not user.choice:
        user.choice = get_default_menu_choice(user.state)

    # anything not on the menu is an error
    if user.choice not in user.menu_choices:
        user.error = "default"
        return

    # run the action for the choice, then follow its branch
    action = get_choice_action(user)
    if action:
        call_hook_function(action, (user,))
    new_state = get_choice_branch(user)
    if new_state:
        user.state = new_state
def handler_entering_account_name(user):
    """Handle the login account name."""

    # get the next waiting line of input
    input_data = user.input_queue.pop(0)

    # if the user entered nothing for a name, then buhbye
    if not input_data:
        user.state = "disconnecting"
        return

    # convert to lower-case
    name = input_data.lower()

    # fail if there are characters other than 0-9 and a-z
    if not all("0" <= char <= "9" or "a" <= char <= "z" for char in name):
        user.error = "bad_name"

    # if that account exists, time to request a password
    elif name in universe.groups.get("account", {}):
        user.account = universe.groups["account"][name]
        user.state = "checking_password"

    # otherwise, this could be a brand new user
    else:
        user.account = Element("account.%s" % name, universe)
        user.account.set("name", name)
        log("New user: " + name, 2)
        user.state = "checking_new_account_name"
def handler_checking_password(user):
    """Handle the login account password."""

    # get the next waiting line of input
    input_data = user.input_queue.pop(0)

    # allow three tries unless the configuration says otherwise
    max_password_tries = (
        universe.contents["mudpy.limit"].get("password_tries", 3)
        if "mudpy.limit" in universe.contents else 3
    )

    # the input matches the stored hash, so authenticate unless an
    # old connection was replaced
    if mudpy.password.verify(input_data, user.account.get("passhash")):
        if not user.replace_old_connections():
            user.authenticate()
            user.state = "main_utility"
        return

    # if at first your hashes don't match, try, try again
    if user.password_tries < max_password_tries - 1:
        user.password_tries += 1
        user.error = "incorrect"
        return

    # we've exceeded the maximum number of password failures, so disconnect
    user.send(
        "$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)"
    )
    user.state = "disconnecting"
def handler_entering_new_password(user):
    """Handle a new password entry."""

    # get the next waiting line of input
    input_data = user.input_queue.pop(0)

    # allow three tries unless the configuration says otherwise
    max_password_tries = (
        universe.contents["mudpy.limit"].get("password_tries", 3)
        if "mudpy.limit" in universe.contents else 3
    )

    # make sure the password is strong--at least one upper, one lower and
    # one digit, seven or more characters in length
    strong = (
        len(input_data) > 6
        and any("0" <= char <= "9" for char in input_data)
        and any("A" <= char <= "Z" for char in input_data)
        and any("a" <= char <= "z" for char in input_data)
    )

    # hash and store it, then move on to verification
    if strong:
        user.account.set("passhash", mudpy.password.create(input_data))
        user.state = "verifying_new_password"
        return

    # the password was weak, try again if you haven't tried too many times
    if user.password_tries < max_password_tries - 1:
        user.password_tries += 1
        user.error = "weak"
        return

    # too many tries, so adios
    user.send(
        "$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)"
    )
    user.account.destroy()
    user.state = "disconnecting"
def handler_verifying_new_password(user):
    """Handle the re-entered new password for verification."""

    # get the next waiting line of input
    input_data = user.input_queue.pop(0)

    # allow three tries unless the configuration says otherwise
    max_password_tries = (
        universe.contents["mudpy.limit"].get("password_tries", 3)
        if "mudpy.limit" in universe.contents else 3
    )

    # the input matches the stored hash, so authenticate and go
    # active unless an old connection was replaced
    if mudpy.password.verify(input_data, user.account.get("passhash")):
        user.authenticate()
        if not user.replace_old_connections():
            user.state = "main_utility"
        return

    # go back to entering the new password as long as you haven't tried
    # too many times
    if user.password_tries < max_password_tries - 1:
        user.password_tries += 1
        user.error = "differs"
        user.state = "entering_new_password"
        return

    # otherwise, sayonara
    user.send(
        "$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)"
    )
    user.account.destroy()
    user.state = "disconnecting"
def handler_active(user):
    """Handle input for active users."""

    # get the next waiting line of input
    input_data = user.input_queue.pop(0)

    # if no input, just idle back with a prompt
    if not input_data:
        user.send("", just_prompt=True)
        return

    # split out the command and parameters; with a mode set, a leading
    # "!" runs a command, and in chat mode anything else is said
    actor = user.avatar
    mode = actor.get("mode")
    if mode and input_data.startswith("!"):
        command_name, parameters = first_word(input_data[1:])
    elif mode == "chat":
        command_name, parameters = "say", input_data
    else:
        command_name, parameters = first_word(input_data)

    # expand to an actual command
    command = find_command(command_name)

    # if it's allowed, do it
    result = None
    if actor.can_run(command):
        action_fname = command.get("action", command.key)
        if action_fname:
            result = call_hook_function(action_fname, (actor, parameters))

    # if the command was not run, give an error
    if not result:
        mudpy.command.error(actor, input_data)
def daemonize(universe):
    """Fork and disassociate from everything, if configured to."""

    # only if this is what we're configured to do
    if not ("mudpy.process" in universe.contents and universe.contents[
            "mudpy.process"].get("daemon")):
        return

    # log before we start forking around, so the terminal gets the message
    log("Disassociating from the controlling terminal.")

    # fork off and die, so we free up the controlling terminal
    if os.fork():
        os._exit(0)

    # switch to a new process group
    os.setsid()

    # fork some more, this time to free us from the old process group
    if os.fork():
        os._exit(0)

    # reset the working directory so we don't needlessly tie up mounts
    os.chdir("/")

    # clear the file creation mask so we can bend it to our will later
    os.umask(0)

    # redirect stdin/stdout/stderr and close off their former descriptors
    for stdpipe in range(3):
        os.close(stdpipe)
    replacements = (
        ("stdin", "r"),
        ("__stdin__", "r"),
        ("stdout", "w"),
        ("stderr", "w"),
        ("__stdout__", "w"),
        ("__stderr__", "w"),
    )
    for stream_name, mode in replacements:
        setattr(sys, stream_name, codecs.open("/dev/null", mode, "utf-8"))
def create_pidfile(universe):
    """Write a file containing the current process ID.

    The path comes from the "pidfile" setting of mudpy.process; nothing
    is written when it is unset. A relative path is taken relative to
    the universe's starting directory, and missing parent directories
    are created.
    """
    pid = str(os.getpid())
    log("Process ID: " + pid)
    if "mudpy.process" in universe.contents:
        file_name = universe.contents["mudpy.process"].get("pidfile", "")
    else:
        file_name = ""
    if file_name:
        if not os.path.isabs(file_name):
            file_name = os.path.join(universe.startdir, file_name)
        os.makedirs(os.path.dirname(file_name), exist_ok=True)

        # the with block closes (and so flushes) the file even if the
        # write fails
        with codecs.open(file_name, "w", "utf-8") as file_descriptor:
            file_descriptor.write(pid + "\n")
def remove_pidfile(universe):
    """Remove the file containing the current process ID.

    Does nothing when no pidfile is configured or the file is not
    writable by this process.
    """
    file_name = ""
    if "mudpy.process" in universe.contents:
        file_name = universe.contents["mudpy.process"].get("pidfile", "")
    if not file_name:
        return

    # a relative path is relative to where we started
    if not os.path.isabs(file_name):
        file_name = os.path.join(universe.startdir, file_name)
    if os.access(file_name, os.W_OK):
        os.remove(file_name)
def excepthook(excepttype, value, tracebackdata):
    """Log an uncaught exception, falling back to stderr."""

    # assemble the list of errors into a single string
    lines = traceback.format_exception(excepttype, value, tracebackdata)
    message = "".join(lines)

    # try to log it, if possible
    try:
        log(message, 9)
    except Exception as problem:
        # try to write it to stderr, if possible
        sys.stderr.write(
            message + "\nException while logging...\n%s" % problem)
def sighook(what, where):
    """Handle external signals by flagging the universe and logging."""

    if what == signal.SIGHUP:
        # a hangup asks for a reload
        detail = "hangup (reloading)"
        universe.reload_flag = True
    elif what == signal.SIGTERM:
        # a terminate asks for a halt
        detail = "terminate (halting)"
        universe.terminate_flag = True
    else:
        # catchall for unexpected signals
        detail = str(what) + " (unhandled)"

    # log what happened
    log("Caught signal: " + detail, 8)
def override_excepthook():
    """Install this module's excepthook as sys.excepthook."""
    setattr(sys, "excepthook", excepthook)
def assign_sighook():
    """Route the hangup and terminate signals to sighook."""
    for signum in (signal.SIGHUP, signal.SIGTERM):
        signal.signal(signum, sighook)
def setup():
    """Perform the tasks required when starting the engine.

    Builds the global universe from the configuration file named by the
    first command line argument (if any), replays log lines gathered
    during setup, daemonizes, installs the exception and signal hooks,
    writes the pidfile, logs startup diagnostics and returns the
    universe.
    """

    # the big bang
    global universe

    # see if a configuration file was specified
    conffile = sys.argv[1] if len(sys.argv) > 1 else ""
    universe = Universe(conffile, True)

    # report any loglines which accumulated during setup
    for logline in universe.setup_loglines:
        log(*logline)
    universe.setup_loglines = []

    # fork and disassociate
    daemonize(universe)

    # override the default exception handler so we get logging first thing
    override_excepthook()

    # set up custom signal handlers
    assign_sighook()

    # make the pidfile
    create_pidfile(universe)

    # load and store diagnostic info
    versions = mudpy.version.Versions("mudpy")
    universe.versions = versions

    # log startup diagnostic messages
    log("On %s at %s" % (versions.python_version, sys.executable), 1)
    log("Import path: %s" % ", ".join(sys.path), 1)
    log("Installed dependencies: %s" % versions.dependencies_text, 1)
    log("Other python packages: %s" % versions.environment_text, 1)
    log("Running version: %s" % versions.version, 1)
    log("Initial directory: %s" % universe.startdir, 1)
    log("Command line: %s" % " ".join(sys.argv), 1)

    # pass the initialized universe back
    return universe
2138 """These are functions performed when shutting down the engine."""
2140 # the loop has terminated, so save persistent data
2143 # log a final message
2144 log("Shutting down now.")
2146 # get rid of the pidfile
2147 remove_pidfile(universe)