1 """Miscellaneous functions for the mudpy engine."""
3 # Copyright (c) 2004-2020 mudpy authors. Permission to use, copy,
4 # modify, and distribute this software is granted under terms
5 # provided in the LICENSE file distributed with this software.
25 """An element of the universe."""
27 def __init__(self, key, universe, origin=None):
28 """Set up a new element."""
30 # keep track of our key name
33 # keep track of what universe it's loading into
34 self.universe = universe
36 # set of facet keys from the universe
37 self.facethash = dict()
39 # not owned by a user by default (used for avatars)
42 # no contents in here by default
45 if self.key.find(".") > 0:
46 self.group, self.subkey = self.key.split(".")[-2:]
49 self.subkey = self.key
50 if self.group not in self.universe.groups:
51 self.universe.groups[self.group] = {}
53 # get an appropriate origin
55 self.universe.add_group(self.group)
56 origin = self.universe.files[
57 self.universe.origins[self.group]["fallback"]]
59 # record or reset a pointer to the origin file
60 self.origin = self.universe.files[origin.source]
62 # add or replace this element in the universe
63 self.universe.contents[self.key] = self
64 self.universe.groups[self.group][self.subkey] = self
67 """Create a new element and replace this one."""
68 args = (self.key, self.universe, self.origin)
73 """Remove an element from the universe and destroy it."""
74 for facet in dict(self.facethash):
75 self.remove_facet(facet)
76 del self.universe.groups[self.group][self.subkey]
77 del self.universe.contents[self.key]
81 """Return a list of non-inherited facets for this element."""
def has_facet(self, facet):
    """Report whether this element itself defines the named facet.

    Only the names returned by self.facets() are consulted.
    """
    own_facets = self.facets()
    return facet in own_facets
def remove_facet(self, facet):
    """Remove a facet from the element.

    Deletes the "<key>.<facet>" node from the origin's data and the
    facet's entry in self.facethash when either is present, then flags
    the origin as modified.
    """
    # NOTE(review): statement nesting is inferred from the statements
    # themselves -- confirm the modified flag is set unconditionally
    if ".".join((self.key, facet)) in self.origin.data:
        del self.origin.data[".".join((self.key, facet))]
    if facet in self.facethash:
        del self.facethash[facet]
    self.origin.modified = True
97 """Return a list of the element's inheritance lineage."""
98 if self.has_facet("inherit"):
99 ancestry = self.get("inherit")
102 for parent in ancestry[:]:
103 ancestors = self.universe.contents[parent].ancestry()
104 for ancestor in ancestors:
105 if ancestor not in ancestry:
106 ancestry.append(ancestor)
111 def get(self, facet, default=None):
112 """Retrieve values."""
116 return self.origin.data[".".join((self.key, facet))]
117 except (KeyError, TypeError):
119 if self.has_facet("inherit"):
120 for ancestor in self.ancestry():
121 if self.universe.contents[ancestor].has_facet(facet):
122 return self.universe.contents[ancestor].get(facet)
126 def set(self, facet, value):
128 if not self.origin.is_writeable() and not self.universe.loading:
129 # break if there is an attempt to update an element from a
130 # read-only file, unless the universe is in the midst of loading
131 # updated data from files
132 raise PermissionError("Altering elements in read-only files is "
134 # Coerce some values to appropriate data types
135 # TODO(fungi) Move these to a separate validation mechanism
136 if facet in ["loglevel"]:
138 elif facet in ["administrator"]:
141 # The canonical node for this facet within its origin
142 node = ".".join((self.key, facet))
144 if node not in self.origin.data or self.origin.data[node] != value:
145 # Be careful to only update the origin's contents when required,
146 # since that affects whether the backing file gets written
147 self.origin.data[node] = value
148 self.origin.modified = True
150 # Make sure this facet is included in the element's facets
151 self.facethash[facet] = self.origin.data[node]
153 def append(self, facet, value):
154 """Append value to a list."""
155 newlist = self.get(facet)
158 if type(newlist) is not list:
159 newlist = list(newlist)
160 newlist.append(value)
161 self.set(facet, newlist)
171 add_terminator=False,
174 """Convenience method to pass messages to an owner."""
187 def is_restricted(self):
188 """Boolean check whether command is administrative or debugging."""
190 self.get("administrative", False) or self.get("debugging", False))
193 """Boolean check whether an actor is controlled by an admin owner."""
194 return(self.owner and self.owner.is_admin())
196 def can_run(self, command):
197 """Check if the user can run this command object."""
199 # has to be in the commands group
200 if command not in self.universe.groups["command"].values():
203 # debugging commands are not allowed outside debug mode
204 if command.get("debugging") and not self.universe.debug_mode():
207 # avatars of administrators can run any command
211 # everyone can run non-administrative commands
212 if not command.is_restricted():
215 # otherwise the command cannot be run by this actor
def update_location(self):
    """Register this element in the contents of its current location.

    Nothing happens when the "location" facet does not name an element
    known to the universe.
    """
    location_key = self.get("location")
    world = self.universe.contents
    if location_key not in world:
        return
    world[location_key].contents[self.key] = self
def clean_contents(self):
    """Make sure the element's contents aren't bogus.

    Drops every entry of self.contents whose element does not report
    this element's key as its "location" facet.
    """
    # collect the stale keys before deleting anything, since removing
    # entries from a dict while iterating over it raises RuntimeError
    stale_keys = [
        key for key, element in self.contents.items()
        if element.get("location") != self.key
    ]
    for key in stale_keys:
        del self.contents[key]
230 def go_to(self, area):
231 """Relocate the element to a specific area."""
232 current = self.get("location")
233 if current and self.key in self.universe.contents[current].contents:
234 del universe.contents[current].contents[self.key]
235 if area in self.universe.contents:
236 self.set("location", area)
237 self.universe.contents[area].contents[self.key] = self
241 """Relocate the element to its default location."""
242 self.go_to(self.get("default_location"))
243 self.echo_to_location(
244 "You suddenly realize that " + self.get("name") + " is here."
247 def move_direction(self, direction):
248 """Relocate the element in a specified direction."""
249 motion = self.universe.contents["mudpy.movement.%s" % direction]
250 enter_term = motion.get("enter_term")
251 exit_term = motion.get("exit_term")
252 self.echo_to_location("%s exits %s." % (self.get("name"), exit_term))
253 self.send("You exit %s." % exit_term, add_prompt=False)
255 self.universe.contents[
256 self.get("location")].link_neighbor(direction)
258 self.echo_to_location("%s arrives from %s." % (
259 self.get("name"), enter_term))
261 def look_at(self, key):
262 """Show an element to another element."""
264 element = self.universe.contents[key]
266 name = element.get("name")
268 message += "$(cyn)" + name + "$(nrm)$(eol)"
269 description = element.get("description")
271 message += description + "$(eol)"
272 portal_list = list(element.portals().keys())
275 message += "$(cyn)[ Exits: " + ", ".join(
278 for element in self.universe.contents[
281 if element.get("is_actor") and element is not self:
282 message += "$(yel)" + element.get(
284 ) + " is here.$(nrm)$(eol)"
285 elif element is not self:
286 message += "$(grn)" + element.get(
292 """Map the portal directions for an area to neighbors."""
294 if re.match(r"""^area\.-?\d+,-?\d+,-?\d+$""", self.key):
295 coordinates = [(int(x))
296 for x in self.key.split(".")[-1].split(",")]
299 self.universe.contents["mudpy.movement.%s" % x].get("vector")
300 ) for x in self.universe.directions)
301 for portal in self.get("gridlinks"):
302 adjacent = map(lambda c, o: c + o,
303 coordinates, offsets[portal])
304 neighbor = "area." + ",".join(
305 [(str(x)) for x in adjacent]
307 if neighbor in self.universe.contents:
308 portals[portal] = neighbor
309 for facet in self.facets():
310 if facet.startswith("link_"):
311 neighbor = self.get(facet)
312 if neighbor in self.universe.contents:
313 portal = facet.split("_")[1]
314 portals[portal] = neighbor
def link_neighbor(self, direction):
    """Look up the neighbor reachable through the given direction.

    Returns the entry self.portals() holds for that direction, or None
    when the direction is not among the portals.
    """
    exits = self.portals()
    return exits[direction] if direction in exits else None
323 def echo_to_location(self, message):
324 """Show a message to other elements in the current location."""
325 for element in self.universe.contents[
328 if element is not self:
329 element.send(message)
336 def __init__(self, filename="", load=False):
337 """Initialize the universe."""
340 self.directions = set()
344 self.reload_flag = False
345 self.setup_loglines = []
346 self.startdir = os.getcwd()
347 self.terminate_flag = False
351 possible_filenames = [
353 "/usr/local/mudpy/etc/mudpy.yaml",
354 "/usr/local/etc/mudpy.yaml",
355 "/etc/mudpy/mudpy.yaml",
358 for filename in possible_filenames:
359 if os.access(filename, os.R_OK):
361 if not os.path.isabs(filename):
362 filename = os.path.join(self.startdir, filename)
363 self.filename = filename
365 # make sure to preserve any accumulated log entries during load
366 self.setup_loglines += self.load()
369 """Load universe data from persistent storage."""
371 # while loading, it's safe to update elements from read-only files
374 # it's possible for this to enter before logging configuration is read
375 pending_loglines = []
377 # start populating the (re)files dict from the base config
379 mudpy.data.Data(self.filename, self)
381 # load default storage locations for groups
382 if hasattr(self, "contents") and "mudpy.filing" in self.contents:
383 self.origins.update(self.contents["mudpy.filing"].get(
386 # add some builtin groups we know we'll need
387 for group in ("account", "actor", "internal"):
388 self.add_group(group)
390 # make a list of inactive avatars
391 inactive_avatars = []
392 for account in self.groups.get("account", {}).values():
393 for avatar in account.get("avatars"):
395 inactive_avatars.append(self.contents[avatar])
397 pending_loglines.append((
398 'Missing avatar "%s", possible data corruption' %
400 for user in self.userlist:
401 if user.avatar in inactive_avatars:
402 inactive_avatars.remove(user.avatar)
404 # another pass to straighten out all the element contents
405 for element in self.contents.values():
406 element.update_location()
407 element.clean_contents()
409 # done loading, so disallow updating elements from read-only files
412 return pending_loglines
415 """Create a new, empty Universe (the Big Bang)."""
416 new_universe = Universe()
417 for attribute in vars(self).keys():
418 setattr(new_universe, attribute, getattr(self, attribute))
419 new_universe.reload_flag = False
424 """Save the universe to persistent storage."""
425 for key in self.files:
426 self.files[key].save()
428 def initialize_server_socket(self):
429 """Create and open the listening socket."""
431 # need to know the local address and port number for the listener
432 host = self.contents["mudpy.network"].get("host")
433 port = self.contents["mudpy.network"].get("port")
435 # if no host was specified, bind to all local addresses (preferring
443 # figure out if this is ipv4 or v6
444 family = socket.getaddrinfo(host, port)[0][0]
445 if family is socket.AF_INET6 and not socket.has_ipv6:
446 log("No support for IPv6 address %s (use IPv4 instead)." % host)
448 # create a new stream-type socket object
449 self.listening_socket = socket.socket(family, socket.SOCK_STREAM)
451 # set the socket options to allow existing open ones to be
452 # reused (fixes a bug where the server can't bind for a minute
453 # when restarting on linux systems)
454 self.listening_socket.setsockopt(
455 socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
458 # bind the socket to to our desired server ipa and port
459 self.listening_socket.bind((host, port))
461 # disable blocking so we can proceed whether or not we can
463 self.listening_socket.setblocking(0)
465 # start listening on the socket
466 self.listening_socket.listen(1)
468 # note that we're now ready for user connections
469 log("Listening for Telnet connections on %s port %s" % (
473 """Convenience method to get the elapsed time counter."""
474 return self.groups["internal"]["counters"].get("elapsed")
476 def add_group(self, group, fallback=None):
477 """Set up group tracking/metadata."""
478 if group not in self.origins:
479 self.origins[group] = {}
481 fallback = mudpy.data.find_file(
482 ".".join((group, "yaml")), universe=self)
483 if "fallback" not in self.origins[group]:
484 self.origins[group]["fallback"] = fallback
485 flags = self.origins[group].get("flags", None)
486 if fallback not in self.files:
487 mudpy.data.Data(fallback, self, flags=flags)
def debug_mode(self):
    """Tell whether unsafe debugging has been switched on.

    Reads the "debug" facet of the "limit" element in the "mudpy" group,
    falling back to False when that facet is unset.
    """
    mudpy_group = self.groups["mudpy"]
    limits = mudpy_group["limit"]
    return limits.get("debug", False)
496 """This is a connected user."""
499 """Default values for the in-memory user variables."""
502 self.authenticated = False
506 self.connection = None
508 self.input_queue = []
509 self.last_address = ""
510 self.last_input = universe.get_time()
511 self.menu_choices = {}
512 self.menu_seen = False
513 self.negotiation_pause = 0
514 self.output_queue = []
515 self.partial_input = b""
516 self.password_tries = 0
518 self.state = "telopt_negotiation"
521 self.universe = universe
524 """Log, close the connection and remove."""
526 name = self.account.get("name", self)
529 log("Logging out %s" % name, 2)
530 self.deactivate_avatar()
531 self.connection.close()
534 def check_idle(self):
535 """Warn or disconnect idle users as appropriate."""
536 idletime = universe.get_time() - self.last_input
537 linkdead_dict = universe.contents[
538 "mudpy.timing.idle.disconnect"].facets()
539 if self.state in linkdead_dict:
540 linkdead_state = self.state
542 linkdead_state = "default"
543 if idletime > linkdead_dict[linkdead_state]:
545 "$(eol)$(red)You've done nothing for far too long... goodbye!"
550 logline = "Disconnecting "
551 if self.account and self.account.get("name"):
552 logline += self.account.get("name")
554 logline += "an unknown user"
555 logline += (" after idling too long in the " + self.state
558 self.state = "disconnecting"
559 self.menu_seen = False
560 idle_dict = universe.contents["mudpy.timing.idle.warn"].facets()
561 if self.state in idle_dict:
562 idle_state = self.state
564 idle_state = "default"
565 if idletime == idle_dict[idle_state]:
567 "$(eol)$(red)If you continue to be unproductive, "
568 + "you'll be shown the door...$(nrm)$(eol)"
572 """Save, load a new user and relocate the connection."""
574 # copy old attributes
575 attributes = self.__dict__
577 # get out of the list
580 # get rid of the old user object
583 # create a new user object
586 # set everything equivalent
587 new_user.__dict__ = attributes
589 # the avatar needs a new owner
591 new_user.account = universe.contents[new_user.account.key]
592 new_user.avatar = universe.contents[new_user.avatar.key]
593 new_user.avatar.owner = new_user
596 universe.userlist.append(new_user)
598 def replace_old_connections(self):
599 """Disconnect active users with the same name."""
601 # the default return value
604 # iterate over each user in the list
605 for old_user in universe.userlist:
607 # the name is the same but it's not us
610 ) and old_user.account and old_user.account.get(
612 ) == self.account.get(
614 ) and old_user is not self:
618 "User " + self.account.get(
620 ) + " reconnected--closing old connection to "
621 + old_user.address + ".",
625 "$(eol)$(red)New connection from " + self.address
626 + ". Terminating old connection...$(nrm)$(eol)",
631 # close the old connection
632 old_user.connection.close()
634 # replace the old connection with this one
636 "$(eol)$(red)Taking over old connection from "
637 + old_user.address + ".$(nrm)"
639 old_user.connection = self.connection
640 old_user.last_address = old_user.address
641 old_user.address = self.address
643 # take this one out of the list and delete
649 # true if an old connection was replaced, false if not
def authenticate(self):
    """Flag the user as authenticated and apply configured admin rights.

    Nothing is done when the user's state is already "authenticated".
    """
    # NOTE(review): block nesting below is inferred from the statements
    # themselves -- confirm it against the canonical file
    if self.state != "authenticated":
        self.authenticated = True
        log("User %s authenticated for account %s." % (
            self, self.account.subkey), 2)
        # accounts whose subkey is listed in the "admins" facet of the
        # mudpy.limit element get their "administrator" facet set
        if ("mudpy.limit" in universe.contents and self.account.subkey in
                universe.contents["mudpy.limit"].get("admins")):
            self.account.set("administrator", True)
            log("Account %s is an administrator." % (
                self.account.subkey), 2)
665 """Send the user their current menu."""
666 if not self.menu_seen:
667 self.menu_choices = get_menu_choices(self)
669 get_menu(self.state, self.error, self.menu_choices),
673 self.menu_seen = True
675 self.adjust_echoing()
678 """"Generate and return an input prompt."""
680 # Start with the user's preference, if one was provided
681 prompt = self.account.get("prompt")
683 # If the user has not set a prompt, then immediately return the default
684 # provided for the current state
686 return get_menu_prompt(self.state)
688 # Allow including the World clock state
689 if "$_(time)" in prompt:
690 prompt = prompt.replace(
692 str(universe.groups["internal"]["counters"].get("elapsed")))
694 # Append a single space for clear separation from user input
695 if prompt[-1] != " ":
696 prompt = "%s " % prompt
698 # Return the cooked prompt
701 def adjust_echoing(self):
702 """Adjust echoing to match state menu requirements."""
703 if mudpy.telnet.is_enabled(self, mudpy.telnet.TELOPT_ECHO,
705 if menu_echo_on(self.state):
706 mudpy.telnet.disable(self, mudpy.telnet.TELOPT_ECHO,
708 elif not menu_echo_on(self.state):
709 mudpy.telnet.enable(self, mudpy.telnet.TELOPT_ECHO,
713 """Remove a user from the list of connected users."""
714 log("Disconnecting account %s." % self, 0)
715 universe.userlist.remove(self)
725 add_terminator=False,
728 """Send arbitrary text to a connected user."""
730 # unless raw mode is on, clean it up all nice and pretty
733 # strip extra $(eol) off if present
734 while output.startswith("$(eol)"):
736 while output.endswith("$(eol)"):
738 extra_lines = output.find("$(eol)$(eol)$(eol)")
739 while extra_lines > -1:
740 output = output[:extra_lines] + output[extra_lines + 6:]
741 extra_lines = output.find("$(eol)$(eol)$(eol)")
743 # start with a newline, append the message, then end
744 # with the optional eol string passed to this function
745 # and the ansi escape to return to normal text
746 if not just_prompt and prepend_padding:
747 if (not self.output_queue or not
748 self.output_queue[-1].endswith(b"\r\n")):
749 output = "$(eol)" + output
750 elif not self.output_queue[-1].endswith(
752 ) and not self.output_queue[-1].endswith(
755 output = "$(eol)" + output
756 output += eol + chr(27) + "[0m"
758 # tack on a prompt if active
759 if self.state == "active":
763 output += self.prompt()
764 mode = self.avatar.get("mode")
766 output += "(" + mode + ") "
768 # find and replace macros in the output
769 output = replace_macros(self, output)
771 # wrap the text at the client's width (min 40, 0 disables)
773 if self.columns < 40:
777 output = wrap_ansi_text(output, wrap)
779 # if supported by the client, encode it utf-8
780 if mudpy.telnet.is_enabled(self, mudpy.telnet.TELOPT_BINARY,
782 encoded_output = output.encode("utf-8")
784 # otherwise just send ascii
786 encoded_output = output.encode("ascii", "replace")
788 # end with a terminator if requested
789 if add_prompt or add_terminator:
790 if mudpy.telnet.is_enabled(
791 self, mudpy.telnet.TELOPT_EOR, mudpy.telnet.US):
792 encoded_output += mudpy.telnet.telnet_proto(
793 mudpy.telnet.IAC, mudpy.telnet.EOR)
794 elif not mudpy.telnet.is_enabled(
795 self, mudpy.telnet.TELOPT_SGA, mudpy.telnet.US):
796 encoded_output += mudpy.telnet.telnet_proto(
797 mudpy.telnet.IAC, mudpy.telnet.GA)
799 # and tack it onto the queue
800 self.output_queue.append(encoded_output)
802 # if this is urgent, flush all pending output
806 # just dump raw bytes as requested
808 self.output_queue.append(output)
812 """All the things to do to the user per increment."""
814 # if the world is terminating, disconnect
815 if universe.terminate_flag:
816 self.state = "disconnecting"
817 self.menu_seen = False
819 # check for an idle connection and act appropriately
823 # ask the client for their current terminal type (RFC 1091); it's None
824 # if it's not been initialized, the empty string if it has but the
825 # output was indeterminate, "UNKNOWN" if the client specified it has no
826 # terminal types to supply
827 if self.ttype is None:
828 mudpy.telnet.request_ttype(self)
830 # if output is paused, decrement the counter
831 if self.state == "telopt_negotiation":
832 if self.negotiation_pause:
833 self.negotiation_pause -= 1
835 self.state = "entering_account_name"
837 # show the user a menu as needed
838 elif not self.state == "active":
841 # flush any pending output in the queue
844 # disconnect users with the appropriate state
845 if self.state == "disconnecting":
848 # check for input and add it to the queue
851 # there is input waiting in the queue
853 handle_user_input(self)
856 """Try to send the last item in the queue and remove it."""
857 if self.output_queue:
859 self.connection.send(self.output_queue[0])
860 except (BrokenPipeError, ConnectionResetError):
861 if self.account and self.account.get("name"):
862 account = self.account.get("name")
864 account = "an unknown user"
865 self.state = "disconnecting"
866 log("Disconnected while sending to %s." % account, 7)
867 del self.output_queue[0]
869 def enqueue_input(self):
870 """Process and enqueue any new input."""
872 # check for some input
874 raw_input = self.connection.recv(1024)
881 # tack this on to any previous partial
882 self.partial_input += raw_input
884 # reply to and remove any IAC negotiation codes
885 mudpy.telnet.negotiate_telnet_options(self)
887 # separate multiple input lines
888 new_input_lines = self.partial_input.split(b"\r\0")
889 if len(new_input_lines) == 1:
890 new_input_lines = new_input_lines[0].split(b"\r\n")
892 # if input doesn't end in a newline, replace the
893 # held partial input with the last line of it
895 self.partial_input.endswith(b"\r\0") or
896 self.partial_input.endswith(b"\r\n")):
897 self.partial_input = new_input_lines.pop()
899 # otherwise, chop off the extra null input and reset
900 # the held partial input
902 new_input_lines.pop()
903 self.partial_input = b""
905 # iterate over the remaining lines
906 for line in new_input_lines:
908 # strip off extra whitespace
911 # log non-printable characters remaining
912 if not mudpy.telnet.is_enabled(
913 self, mudpy.telnet.TELOPT_BINARY, mudpy.telnet.HIM):
914 asciiline = bytes([x for x in line if 32 <= x <= 126])
915 if line != asciiline:
916 logline = "Non-ASCII characters from "
917 if self.account and self.account.get("name"):
918 logline += self.account.get("name") + ": "
920 logline += "unknown user: "
921 logline += repr(line)
926 line = line.decode("utf-8")
927 except UnicodeDecodeError:
928 logline = "Non-UTF-8 sequence from "
929 if self.account and self.account.get("name"):
930 logline += self.account.get("name") + ": "
932 logline += "unknown user: "
933 logline += repr(line)
937 line = unicodedata.normalize("NFKC", line)
939 # put on the end of the queue
940 self.input_queue.append(line)
942 def new_avatar(self):
943 """Instantiate a new, unconfigured avatar for this user."""
945 while ("avatar_%s_%s" % (self.account.get("name"), counter)
946 in universe.groups.get("actor", {}).keys()):
948 self.avatar = Element(
949 "actor.avatar_%s_%s" % (self.account.get("name"), counter),
951 self.avatar.append("inherit", "archetype.avatar")
952 self.account.append("avatars", self.avatar.key)
953 log("Created new avatar %s for user %s." % (
954 self.avatar.key, self.account.get("name")), 0)
956 def delete_avatar(self, avatar):
957 """Remove an avatar from the world and from the user's list."""
958 if self.avatar is universe.contents[avatar]:
960 log("Deleting avatar %s for user %s." % (
961 avatar, self.account.get("name")), 0)
962 universe.contents[avatar].destroy()
963 avatars = self.account.get("avatars")
964 avatars.remove(avatar)
965 self.account.set("avatars", avatars)
def activate_avatar_by_index(self, index):
    """Take control of one of the account's avatars, chosen by position.

    The index selects a key from the account's "avatars" facet; the
    matching element becomes self.avatar, gets this user as its owner,
    and is sent home once the user's state has switched to "active".
    """
    world = universe.contents
    avatar_key = self.account.get("avatars")[index]
    chosen = world[avatar_key]

    # bind avatar and user to each other before entering the world
    self.avatar = chosen
    chosen.owner = self
    self.state = "active"

    details = (chosen.get("name"), chosen.key)
    log("Activated avatar %s (%s)." % details, 0)
    chosen.go_home()
977 def deactivate_avatar(self):
978 """Have the active avatar leave the world."""
980 log("Deactivating avatar %s (%s) for user %s." % (
981 self.avatar.get("name"), self.avatar.key,
982 self.account.get("name")), 0)
983 current = self.avatar.get("location")
985 self.avatar.set("default_location", current)
986 self.avatar.echo_to_location(
987 "You suddenly wonder where " + self.avatar.get(
991 del universe.contents[current].contents[self.avatar.key]
992 self.avatar.remove_facet("location")
993 self.avatar.owner = None
997 """Destroy the user and associated avatars."""
998 for avatar in self.account.get("avatars"):
999 self.delete_avatar(avatar)
1000 log("Destroying account %s for user %s." % (
1001 self.account.get("name"), self), 0)
1002 self.account.destroy()
1004 def list_avatar_names(self):
1005 """List names of assigned avatars."""
1007 for avatar in self.account.get("avatars"):
1009 avatars.append(universe.contents[avatar].get("name"))
1011 log('Missing avatar "%s", possible data corruption.' %
1016 """Boolean check whether user's account is an admin."""
1017 return(self.account.get("administrator", False))
def broadcast(message, add_prompt=True):
    """Deliver a message to every user in the universe's user list.

    Each copy is prefixed with "$(eol)", and add_prompt is handed on
    unchanged to the user's send() method.
    """
    for recipient in universe.userlist:
        padded = "$(eol)" + message
        recipient.send(padded, add_prompt=add_prompt)
1026 def log(message, level=0):
1027 """Log a message."""
1029 # a couple references we need
1030 if "mudpy.log" in universe.contents:
1031 file_name = universe.contents["mudpy.log"].get("file", "")
1032 max_log_lines = universe.contents["mudpy.log"].get("lines", 0)
1033 syslog_name = universe.contents["mudpy.log"].get("syslog", "")
1038 timestamp = datetime.datetime.now().isoformat(' ')
1040 # turn the message into a list of nonempty lines
1041 lines = [x for x in [(x.rstrip()) for x in message.split("\n")] if x != ""]
1043 # send the timestamp and line to a file
1045 if not os.path.isabs(file_name):
1046 file_name = os.path.join(universe.startdir, file_name)
1047 os.makedirs(os.path.dirname(file_name), exist_ok=True)
1048 file_descriptor = codecs.open(file_name, "a", "utf-8")
1050 file_descriptor.write(timestamp + " " + line + "\n")
1051 file_descriptor.flush()
1052 file_descriptor.close()
1054 # send the timestamp and line to standard output
1055 if ("mudpy.log" in universe.contents and
1056 universe.contents["mudpy.log"].get("stdout")):
1058 print(timestamp + " " + line)
1060 # send the line to the system log
1063 syslog_name.encode("utf-8"),
1065 syslog.LOG_INFO | syslog.LOG_DAEMON
1071 # display to connected administrators
1072 for user in universe.userlist:
1074 user.state == "active"
1076 and user.account.get("loglevel", 0) <= level):
1077 # iterate over every line in the message
1081 "$(bld)$(red)" + timestamp + " "
1082 + line.replace("$(", "$_(") + "$(nrm)$(eol)")
1083 user.send(full_message, flush=True)
1085 # add to the recent log list
1087 while 0 < len(universe.loglines) >= max_log_lines:
1088 del universe.loglines[0]
1089 universe.loglines.append((level, timestamp + " " + line))
1092 def get_loglines(level, start, stop):
1093 """Return a specific range of loglines filtered by level."""
1095 # filter the log lines
1096 loglines = [x for x in universe.loglines if x[0] >= level]
1098 # we need these in several places
1099 total_count = str(len(universe.loglines))
1100 filtered_count = len(loglines)
1102 # don't proceed if there are no lines
1105 # can't start before the beginning or at the end
1106 if start > filtered_count:
1107 start = filtered_count
1111 # can't stop before we start
1118 message = "There are " + str(total_count)
1119 message += " log lines in memory and " + str(filtered_count)
1120 message += " at or above level " + str(level) + "."
1121 message += " The matching lines from " + str(stop) + " to "
1122 message += str(start) + " are:$(eol)$(eol)"
1124 # add the text from the selected lines
1126 range_lines = loglines[-start:-(stop - 1)]
1128 range_lines = loglines[-start:]
1129 for line in range_lines:
1130 message += " (" + str(line[0]) + ") " + line[1].replace(
1134 # there were no lines
1136 message = "None of the " + str(total_count)
1137 message += " lines in memory matches your request."
1143 def glyph_columns(character):
1144 """Convenience function to return the column width of a glyph."""
1145 if unicodedata.east_asian_width(character) in "FW":
1151 def wrap_ansi_text(text, width):
1152 """Wrap text with arbitrary width while ignoring ANSI colors."""
1154 # the current position in the entire text string, including all
1155 # characters, printable or otherwise
1158 # the current text position relative to the beginning of the line,
1159 # ignoring color escape sequences
1162 # the absolute and relative positions of the most recent whitespace
1164 last_abs_whitespace = 0
1165 last_rel_whitespace = 0
1167 # whether the current character is part of a color escape sequence
1170 # normalize any potentially composited unicode before we count it
1171 text = unicodedata.normalize("NFKC", text)
1173 # iterate over each character from the beginning of the text
1174 for each_character in text:
1176 # the current character is the escape character
1177 if each_character == "\x1b" and not escape:
1181 # the current character is within an escape sequence
1184 if each_character == "m":
1185 # the current character is m, which terminates the
1189 # the current character is a space
1190 elif each_character == " ":
1191 last_abs_whitespace = abs_pos
1192 last_rel_whitespace = rel_pos
1194 # the current character is a newline, so reset the relative
1195 # position too (start a new line)
1196 elif each_character == "\n":
1198 last_abs_whitespace = abs_pos
1199 last_rel_whitespace = rel_pos
1201 # the current character meets the requested maximum line width, so we
1202 # need to wrap unless the current word is wider than the terminal (in
1203 # which case we let it do the wrapping instead)
1204 if last_rel_whitespace != 0 and (rel_pos > width or (
1205 rel_pos > width - 1 and glyph_columns(each_character) == 2)):
1207 # insert an eol in place of the last space
1208 text = (text[:last_abs_whitespace] + "\r\n" +
1209 text[last_abs_whitespace + 1:])
1211 # increase the absolute position because an eol is two
1212 # characters but the space it replaced was only one
1215 # now we're at the beginning of a new line, plus the
1216 # number of characters wrapped from the previous line
1217 rel_pos -= last_rel_whitespace
1218 last_rel_whitespace = 0
1220 # as long as the character is not a carriage return and the
1221 # other above conditions haven't been met, count it as a
1222 # printable character
1223 elif each_character != "\r":
1224 rel_pos += glyph_columns(each_character)
1225 if each_character in (" ", "\n"):
1226 last_abs_whitespace = abs_pos
1227 last_rel_whitespace = rel_pos
1229 # increase the absolute position for every character
1232 # return the newly-wrapped text
1236 def weighted_choice(data):
1237 """Takes a dict weighted by value and returns a random key."""
1239 # this will hold our expanded list of keys from the data
1242 # create the expanded list of keys
1243 for key in data.keys():
1244 for _count in range(data[key]):
1245 expanded.append(key)
1247 # return one at random
1248 # Allow the random.randrange() call in bandit since it's not used for
1249 # security/cryptographic purposes
1250 return random.choice(expanded) # nosec
1254 """Returns a random character name."""
1256 # the vowels and consonants needed to create romaji syllables
1285 # this dict will hold our weighted list of syllables
1288 # generate the list with an even weighting
1289 for consonant in consonants:
1290 for vowel in vowels:
1291 syllables[consonant + vowel] = 1
1293 # we'll build the name into this string
1296 # create a name of random length from the syllables
1297 # Allow the random.randrange() call in bandit since it's not used for
1298 # security/cryptographic purposes
1299 for _syllable in range(random.randrange(2, 6)): # nosec
1300 name += weighted_choice(syllables)
1302 # strip any leading quotemark, capitalize and return the name
1303 return name.strip("'").capitalize()
def replace_macros(user, text, is_input=False):
    """Replace $(name) macros in text and return the expanded text.

    Static macros cover the end-of-line sequence and the ANSI style and
    color escapes. If the user has an account with a name, $(account) is
    added; if the user has an avatar with a gender, the third person
    pronoun macros $(tpop), $(tppp) and $(tpsp) are added. A macro of the
    form $(inc:name) is replaced with the contents of the file located by
    mudpy.data.find_file(). Any other macro is replaced with an empty
    string, and logged unless is_input is true. Once no macros remain,
    the escaped form "$_(" is turned back into a literal "$(".

    user: object providing the account and avatar attributes
    text: the string in which to expand macros
    is_input: when true, unknown macros are dropped without logging
    """

    # third person pronouns
    pronouns = {
        "female": {"obj": "her", "pos": "hers", "sub": "she"},
        "male": {"obj": "him", "pos": "his", "sub": "he"},
        "neuter": {"obj": "it", "pos": "its", "sub": "it"}
    }

    # a dict of replacement macros
    macros = {
        "eol": "\r\n",
        "bld": chr(27) + "[1m",
        "nrm": chr(27) + "[0m",
        "blk": chr(27) + "[30m",
        "blu": chr(27) + "[34m",
        "cyn": chr(27) + "[36m",
        "grn": chr(27) + "[32m",
        "mgt": chr(27) + "[35m",
        "red": chr(27) + "[31m",
        "yel": chr(27) + "[33m",
    }

    # add dynamic macros where possible
    if user.account:
        account_name = user.account.get("name")
        if account_name:
            macros["account"] = account_name
    if user.avatar:
        avatar_gender = user.avatar.get("gender")
        if avatar_gender:
            macros["tpop"] = pronouns[avatar_gender]["obj"]
            macros["tppp"] = pronouns[avatar_gender]["pos"]
            macros["tpsp"] = pronouns[avatar_gender]["sub"]

    # loop until broken
    while True:

        # find and replace per the macros dict
        macro_start = text.find("$(")
        if macro_start == -1:
            break
        macro_close = text.find(")", macro_start)
        if macro_close == -1:
            # an opening "$(" with no closing ")" can never be replaced,
            # so stop here rather than looping on it forever
            break
        macro = text[macro_start + 2:macro_close]
        if macro in macros:
            replacement = macros[macro]

        # this is how we handle local file inclusion (dangerous!)
        elif macro.startswith("inc:"):
            incfile = mudpy.data.find_file(macro[4:], universe=universe)
            if os.path.exists(incfile):
                included = []
                # the with block closes the file even if reading fails
                with codecs.open(incfile, "r", "utf-8") as incfd:
                    for line in incfd:
                        if line.endswith("\n") and not line.endswith("\r\n"):
                            line = line.replace("\n", "\r\n")
                        included.append(line)
                replacement = "".join(included)

                # lose the trailing eol, but only if there really is one
                if replacement.endswith("\r\n"):
                    replacement = replacement[:-2]
            else:
                replacement = ""
                log("Couldn't read included " + incfile + " file.", 7)

        # if we get here, log and replace it with null
        else:
            replacement = ""
            if not is_input:
                log("Unexpected replacement macro " +
                    macro + " encountered.", 6)

        # and now we act on the replacement
        text = text.replace("$(" + macro + ")", replacement)

    # replace the look-like-a-macro sequence
    text = text.replace("$_(", "$(")

    return text
def escape_macros(value):
    """Escape replacement macros in text.

    Every "$(" in a string value is rewritten as "$_(". Any str instance is
    escaped, including instances of str subclasses. Values which are not
    strings are returned unchanged.
    """
    if isinstance(value, str):
        return value.replace("$(", "$_(")
    return value
def first_word(text, separator=" "):
    """Return a tuple of the first word and the rest.

    The text is split at the first occurrence of separator, provided the
    separator is present and is not the very first thing in the text;
    otherwise the whole text is the first word and the rest is empty.
    Empty or missing text yields a pair of empty strings. The result is
    always a two-item tuple.
    """
    if not text:
        return "", ""
    if text.find(separator) > 0:
        word, rest = text.split(separator, 1)
        return word, rest
    return text, ""
1406 """The things which should happen on each pulse, aside from reloads."""
1408 # open the listening socket if it hasn't been already
1409 if not hasattr(universe, "listening_socket"):
1410 universe.initialize_server_socket()
1412 # assign a user if a new connection is waiting
1413 user = check_for_connection(universe.listening_socket)
1415 universe.userlist.append(user)
1417 # iterate over the connected users
1418 for user in universe.userlist:
1421 # add an element for counters if it doesn't exist
1422 if "counters" not in universe.groups.get("internal", {}):
1423 Element("internal.counters", universe)
1425 # update the log every now and then
1426 if not universe.groups["internal"]["counters"].get("mark"):
1427 log(str(len(universe.userlist)) + " connection(s)")
1428 universe.groups["internal"]["counters"].set(
1429 "mark", universe.contents["mudpy.timing"].get("status")
1432 universe.groups["internal"]["counters"].set(
1433 "mark", universe.groups["internal"]["counters"].get(
1438 # periodically save everything
1439 if not universe.groups["internal"]["counters"].get("save"):
1441 universe.groups["internal"]["counters"].set(
1442 "save", universe.contents["mudpy.timing"].get("save")
1445 universe.groups["internal"]["counters"].set(
1446 "save", universe.groups["internal"]["counters"].get(
1451 # pause for a configurable amount of time (decimal seconds)
1452 time.sleep(universe.contents["mudpy.timing"].get("increment"))
1454 # increase the elapsed increment counter
1455 universe.groups["internal"]["counters"].set(
1456 "elapsed", universe.groups["internal"]["counters"].get(
1463 """Reload all relevant objects."""
1465 old_userlist = universe.userlist[:]
1466 old_loglines = universe.loglines[:]
1467 for element in list(universe.contents.values()):
1470 new_loglines = universe.loglines[:]
1471 universe.loglines = old_loglines + new_loglines
1472 for user in old_userlist:
def check_for_connection(listening_socket):
    """Accept one waiting connection and wrap it in a new user object.

    Returns None when accept() raises BlockingIOError (nothing is waiting),
    otherwise the newly created user with its connection and address set.
    """

    # nothing to do if no client is waiting on the socket
    try:
        accepted = listening_socket.accept()
    except BlockingIOError:
        return None
    connection, address = accepted
    peer = address[0]

    # note that we got one
    log("New connection from %s." % peer, 2)

    # disable blocking so we can proceed whether or not we can send/receive
    connection.setblocking(0)

    # create a new user object and tie the connection and its ipa to it
    user = User()
    log("Instantiated %s for %s." % (user, peer), 0)
    user.connection = connection
    user.address = peer

    # let the client know we WILL EOR (RFC 885)
    mudpy.telnet.enable(user, mudpy.telnet.TELOPT_EOR, mudpy.telnet.US)
    user.negotiation_pause = 2

    return user
def find_command(command_name):
    """Look up a command by its name or by an abbreviation of it.

    The name is lowercased first. An exact match in the "command" group of
    the global universe wins; failing that, the first command in sorted
    order which starts with the given text and is not restricted is used.
    Returns None when nothing matches.
    """
    wanted = command_name.lower()
    commands = universe.groups["command"]

    # the command matches a command word for which we have data
    if wanted in commands:
        return commands[wanted]

    # otherwise take the first unrestricted command word it abbreviates
    for candidate in sorted(commands):
        if not candidate.startswith(wanted):
            continue
        if not commands[candidate].is_restricted():
            return commands[candidate]
    return None
def get_menu(state, error=None, choices=None):
    """Assemble the complete menu text for a state.

    The text is the concatenation, in order, of the description (or error
    text), the formatted choices, the prompt, the formatted default choice
    and the echo notice.
    """

    # make sure we don't reuse a mutable sequence by default
    if choices is None:
        choices = {}

    # each piece is produced and appended strictly left to right
    return (
        get_menu_description(state, error)
        + get_formatted_menu_choices(state, choices)
        + get_menu_prompt(state)
        + get_formatted_default_menu_choice(state)
        + get_echo_message(state)
    )
def menu_echo_on(state):
    """Tell whether echo is on for a menu state.

    Asks the menu element for state in the global universe for its "echo"
    facet, with True as the fallback value.
    """
    menu = universe.groups["menu"][state]
    return menu.get("echo", True)
def get_echo_message(state):
    """Return a notice that input won't be echoed, or an empty string.

    The notice is only returned when menu_echo_on() is false for state.
    """
    return "" if menu_echo_on(state) else "(won't echo) "
def get_default_menu_choice(state):
    """Return the "default" facet of the menu element for a state."""
    menu = universe.groups["menu"][state]
    return menu.get("default")
def get_formatted_default_menu_choice(state):
    """Default menu choice formatted for inclusion in a prompt string.

    Returns the choice in red between square brackets followed by a space,
    or an empty string when the state has no default choice.
    """
    default_choice = get_default_menu_choice(state)
    if not default_choice:
        return ""
    return "[$(red)" + default_choice + "$(nrm)] "
def get_menu_description(state, error):
    """Return the description, or the error text, for a menu state.

    With an error condition, the "error_<condition>" facet of the menu is
    used (or a stock message if that is not set), wrapped in red. Without
    one, the "description" facet is used. A non-empty result gets two
    end-of-line macros appended; an empty or missing one is returned as is.
    """
    menu = universe.groups["menu"][state]

    if not error:
        # there was no error condition, so use the plain description
        description = menu.get("description")
    else:
        # use the message matching the condition, or else a stock one
        description = menu.get("error_" + error)
        if not description:
            description = "That is not a valid choice..."
        description = "$(red)" + description + "$(nrm)"

    if not description:
        return description
    return description + "$(eol)$(eol)"
def get_menu_prompt(state):
    """Return the prompt for a menu state, if one was defined.

    A non-empty "prompt" facet is returned with a trailing space added;
    an empty or missing one is returned unchanged.
    """
    prompt = universe.groups["menu"][state].get("prompt")
    if not prompt:
        return prompt
    return prompt + " "
def get_menu_choices(user):
    """Return a dict of choice:meaning for the user's current menu state.

    The dict starts from the result of the menu's "create" hook, if it has
    one. Facets named "create_<choice>" add the result of calling the named
    hook, and facets named "choice_<choice>" add the facet value itself.
    A choice is left out when a "demand_<choice>" facet names a hook which
    returns a false value for this user.
    """
    menu = universe.groups["menu"][user.state]

    # begin with whatever the menu-wide creation hook supplies
    builder = menu.get("create")
    choices = call_hook_function(builder, (user,)) if builder else {}

    # sort the facets into suppressed, generated and static choices
    suppressed = []
    generated = {}
    static = {}
    for facet in menu.facets():
        if facet.startswith("demand_") and not call_hook_function(
                universe.groups["menu"][user.state].get(facet), (user,)):
            suppressed.append(facet.split("_", 2)[1])
        elif facet.startswith("create_"):
            generated[facet] = facet.split("_", 2)[1]
        elif facet.startswith("choice_"):
            static[facet] = facet.split("_", 2)[1]

    # generated choices get their meaning from a hook function
    for facet, choice in generated.items():
        if choice not in suppressed:
            choices[choice] = call_hook_function(menu.get(facet), (user,))

    # static choices get their meaning straight from the facet
    for facet, choice in static.items():
        if choice not in suppressed:
            choices[choice] = menu.get(facet)

    return choices
def get_formatted_menu_choices(state, choices):
    """Return the menu choices formatted as one string.

    Choices are listed in sorted key order, one per line, with the key in
    red between square brackets ahead of its meaning. A further end-of-line
    macro follows the list; with no choices the result is an empty string.
    The state argument is not used here.
    """
    rows = [
        " [$(red)" + choice + "$(nrm)] " + choices[choice] + "$(eol)"
        for choice in sorted(choices.keys())
    ]
    if not rows:
        return ""
    return "".join(rows) + "$(eol)"
def get_menu_branches(state):
    """Return a dict of choice:branch for a menu state.

    Every facet of the menu named "branch_<choice>" contributes one entry,
    keyed by the choice part of the facet name.
    """
    menu = universe.groups["menu"][state]
    return {
        facet.split("_", 2)[1]: menu.get(facet)
        for facet in menu.facets()
        if facet.startswith("branch_")
    }
def get_default_branch(state):
    """Return the "branch" facet of the menu element for a state."""
    menu = universe.groups["menu"][state]
    return menu.get("branch")
def get_choice_branch(user):
    """Return the new state matching the user's choice.

    A branch specific to the choice takes precedence. Otherwise any choice
    which is on the user's menu gets the default branch of the state, and
    anything else gets an empty string.
    """
    branches = get_menu_branches(user.state)
    if user.choice in branches:
        return branches[user.choice]
    if user.choice in user.menu_choices.keys():
        return get_default_branch(user.state)
    return ""
def get_menu_actions(state):
    """Return a dict of choice:action for a menu state.

    Every facet of the menu named "action_<choice>" contributes one entry,
    keyed by the choice part of the facet name.
    """
    menu = universe.groups["menu"][state]
    return {
        facet.split("_", 2)[1]: menu.get(facet)
        for facet in menu.facets()
        if facet.startswith("action_")
    }
def get_default_action(state):
    """Return the "action" facet of the menu element for a state."""
    menu = universe.groups["menu"][state]
    return menu.get("action")
def get_choice_action(user):
    """Return the action matching the user's choice.

    An action specific to the choice takes precedence. Otherwise any choice
    which is on the user's menu gets the default action of the state, and
    anything else gets an empty string.
    """
    actions = get_menu_actions(user.state)
    if user.choice in actions:
        return actions[user.choice]
    if user.choice in user.menu_choices.keys():
        return get_default_action(user.state)
    return ""
def call_hook_function(fname, arglist):
    """Call a function named relative to the mudpy package, return result.

    fname is a dotted path resolved one attribute at a time starting from
    the mudpy package, and arglist holds the positional arguments. If the
    path can't be resolved, or the call raises an exception, the problem is
    logged and None is returned.
    """
    unresolved = object()

    # all functions relative to mudpy package
    target = mudpy
    for component in fname.split("."):
        target = getattr(target, component, unresolved)
        if target is unresolved:
            log('Could not find mudpy.%s() for arguments "%s"'
                % (fname, arglist), 7)
            return None

    # whatever the path resolved to must be something worth calling
    if not target:
        return None
    try:
        return target(*arglist)
    except Exception:
        log('Calling mudpy.%s(%s) raised an exception...\n%s'
            % (fname, tuple(arglist), traceback.format_exc()), 7)
        return None
def handle_user_input(user):
    """The main handler, branches to a state-specific handler.

    A module-level function named "handler_<state>" is used when one
    exists for the user's state, otherwise generic_menu_handler() is. The
    handler is looked up first and called afterward, so an exception raised
    inside a handler is never mistaken for a missing handler. Afterward the
    menu is flagged for redisplay and the user's last input time updated.
    """

    # if the user's client echo is off, send a blank line for aesthetics
    if mudpy.telnet.is_enabled(user, mudpy.telnet.TELOPT_ECHO,
                               mudpy.telnet.US):
        user.send("", add_prompt=False, prepend_padding=False)

    # check to make sure the state is expected, then call that handler
    handler = globals().get("handler_" + user.state, generic_menu_handler)
    handler(user)

    # since we got input, flag that the menu/prompt needs to be redisplayed
    user.menu_seen = False

    # update the last_input timestamp while we're at it
    user.last_input = universe.get_time()
def generic_menu_handler(user):
    """Handle a menu choice for states without a handler of their own.

    The next queued line of input, lowercased, is the choice; with no input
    or an empty line the default choice of the state is used. A choice on
    the user's menu has its action (if any) called and its branch (if any)
    becomes the new state. Any other choice sets the "default" error.
    """

    # get a lower-case representation of the next line of input
    choice = user.input_queue.pop(0) if user.input_queue else ""
    if choice:
        choice = choice.lower()

    # fall back on the default when nothing useful was entered
    if not choice:
        choice = get_default_menu_choice(user.state)
    user.choice = choice

    # anything not on the menu is an error
    if user.choice not in user.menu_choices:
        user.error = "default"
        return

    # run the action for the choice, then follow its branch
    action = get_choice_action(user)
    if action:
        call_hook_function(action, (user,))
    new_state = get_choice_branch(user)
    if new_state:
        user.state = new_state
def handler_entering_account_name(user):
    """Handle the login account name.

    Empty input moves the user to the "disconnecting" state. A name with
    characters other than 0-9 and a-z (after lowercasing) sets the
    "bad_name" error. The name of an existing account moves the user on to
    "checking_password"; any other name creates a new account element and
    moves on to "checking_new_account_name".
    """

    # get the next waiting line of input
    input_data = user.input_queue.pop(0)

    # if the user entered nothing for a name, then buhbye
    if not input_data:
        user.state = "disconnecting"
        return

    # names are compared and stored in lower-case
    name = input_data.lower()
    allowed = "0123456789abcdefghijklmnopqrstuvwxyz"

    # fail if there are non-alphanumeric characters
    if any(character not in allowed for character in name):
        user.error = "bad_name"

    # if that account exists, time to request a password
    elif name in universe.groups.get("account", {}):
        user.account = universe.groups["account"][name]
        user.state = "checking_password"

    # otherwise, this could be a brand new user
    else:
        user.account = Element("account.%s" % name, universe)
        user.account.set("name", name)
        log("New user: " + name, 2)
        user.state = "checking_new_account_name"
def handler_checking_password(user):
    """Handle the login account password.

    A password matching the account's stored hash authenticates the user
    and moves them to "main_utility", unless replace_old_connections()
    returns a true value. A mismatch counts as a failed try and sets the
    "incorrect" error; once the limit on tries is reached (the
    "password_tries" facet of mudpy.limit, or 3) the user is told so and
    moved to "disconnecting".
    """

    # get the next waiting line of input
    input_data = user.input_queue.pop(0)

    # work out how many tries are allowed
    max_password_tries = 3
    if "mudpy.limit" in universe.contents:
        max_password_tries = universe.contents["mudpy.limit"].get(
            "password_tries", 3)

    # does the hashed input equal the stored hash?
    matched = mudpy.password.verify(input_data, user.account.get("passhash"))
    if matched:

        # if so, set the username and load from cold storage
        if not user.replace_old_connections():
            user.authenticate()
            user.state = "main_utility"
        return

    # if at first your hashes don't match, try, try again
    if user.password_tries < max_password_tries - 1:
        user.password_tries += 1
        user.error = "incorrect"
        return

    # we've exceeded the maximum number of password failures, so disconnect
    user.send(
        "$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)"
    )
    user.state = "disconnecting"
def handler_entering_new_password(user):
    """Handle a new password entry.

    A password of seven or more characters with at least one digit, one
    upper-case and one lower-case ASCII letter is hashed, stored on the
    account, and the user moves to "verifying_new_password". Anything else
    counts as a failed try and sets the "weak" error; once the limit on
    tries is reached (the "password_tries" facet of mudpy.limit, or 3) the
    account is destroyed and the user moved to "disconnecting".
    """

    # get the next waiting line of input
    input_data = user.input_queue.pop(0)

    # work out how many tries are allowed
    max_password_tries = 3
    if "mudpy.limit" in universe.contents:
        max_password_tries = universe.contents["mudpy.limit"].get(
            "password_tries", 3)

    # make sure the password is strong--at least one upper, one lower and
    # one digit, seven or more characters in length
    has_digit = any("0" <= character <= "9" for character in input_data)
    has_upper = any("A" <= character <= "Z" for character in input_data)
    has_lower = any("a" <= character <= "z" for character in input_data)
    if len(input_data) > 6 and has_digit and has_upper and has_lower:

        # hash and store it, then move on to verification
        user.account.set("passhash", mudpy.password.create(input_data))
        user.state = "verifying_new_password"
        return

    # the password was weak, try again if you haven't tried too many times
    if user.password_tries < max_password_tries - 1:
        user.password_tries += 1
        user.error = "weak"
        return

    # too many tries, so adios
    user.send(
        "$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)"
    )
    user.account.destroy()
    user.state = "disconnecting"
def handler_verifying_new_password(user):
    """Handle the re-entered new password for verification.

    Input matching the stored hash authenticates the user, who then moves
    to "main_utility" unless replace_old_connections() returns a true
    value. A mismatch counts as a failed try, sets the "differs" error and
    returns the user to "entering_new_password"; once the limit on tries is
    reached (the "password_tries" facet of mudpy.limit, or 3) the account
    is destroyed and the user moved to "disconnecting".
    """

    # get the next waiting line of input
    input_data = user.input_queue.pop(0)

    # work out how many tries are allowed
    max_password_tries = 3
    if "mudpy.limit" in universe.contents:
        max_password_tries = universe.contents["mudpy.limit"].get(
            "password_tries", 3)

    # hash the input and match it to storage
    matched = mudpy.password.verify(input_data, user.account.get("passhash"))
    if matched:
        user.authenticate()

        # the hashes matched, so go active
        if not user.replace_old_connections():
            user.state = "main_utility"
        return

    # go back to entering the new password as long as you haven't tried
    # too many times
    if user.password_tries < max_password_tries - 1:
        user.password_tries += 1
        user.error = "differs"
        user.state = "entering_new_password"
        return

    # otherwise, sayonara
    user.send(
        "$(eol)$(red)Too many failed password attempts...$(nrm)$(eol)"
    )
    user.account.destroy()
    user.state = "disconnecting"
def handler_active(user):
    """Handle input for active users.

    Empty input just sends a prompt back. Otherwise the input is split
    into a command name and parameters: when the avatar has a mode set, a
    leading "!" marks an ordinary command, and in "chat" mode everything
    else is passed to "say". The command is run through its action hook if
    the avatar is allowed to run it, and a command error is reported when
    that produces no true result.
    """

    # get the next waiting line of input
    input_data = user.input_queue.pop(0)

    # if no input, just idle back with a prompt
    if not input_data:
        user.send("", just_prompt=True)
        return

    # split out the command and parameters
    actor = user.avatar
    mode = actor.get("mode")
    if mode and input_data.startswith("!"):
        command_name, parameters = first_word(input_data[1:])
    elif mode == "chat":
        command_name, parameters = "say", input_data
    else:
        command_name, parameters = first_word(input_data)

    # expand to an actual command
    command = find_command(command_name)

    # if it's allowed, do it
    result = None
    if actor.can_run(command):
        action_fname = command.get("action", command.key)
        if action_fname:
            result = call_hook_function(action_fname, (actor, parameters))

    # if the command was not run, give an error
    if not result:
        mudpy.command.error(actor, input_data)
def daemonize(universe):
    """Fork and disassociate from everything.

    Does nothing unless the universe has a mudpy.process element whose
    "daemon" facet is true. Otherwise the process forks, starts a new
    session, forks again, changes directory to "/", clears the umask,
    closes descriptors 0 through 2 and points the standard streams at
    /dev/null.
    """

    # only if this is what we're configured to do
    if "mudpy.process" not in universe.contents:
        return
    if not universe.contents["mudpy.process"].get("daemon"):
        return

    # log before we start forking around, so the terminal gets the message
    log("Disassociating from the controlling terminal.")

    # fork off and die, so we free up the controlling terminal
    if os.fork():
        os._exit(0)

    # switch to a new process group
    os.setsid()

    # fork some more, this time to free us from the old process group
    if os.fork():
        os._exit(0)

    # reset the working directory so we don't needlessly tie up mounts
    os.chdir("/")

    # clear the file creation mask so we can bend it to our will later
    os.umask(0)

    # redirect stdin/stdout/stderr and close off their former descriptors
    for stdpipe in range(3):
        os.close(stdpipe)

    # the streams are reopened one at a time in this fixed order
    replacements = (
        ("stdin", "r"),
        ("__stdin__", "r"),
        ("stdout", "w"),
        ("stderr", "w"),
        ("__stdout__", "w"),
        ("__stderr__", "w"),
    )
    for stream_name, mode in replacements:
        setattr(sys, stream_name, codecs.open("/dev/null", mode, "utf-8"))
def create_pidfile(universe):
    """Write a file containing the current process ID.

    The process ID is always logged. It is also written, followed by a
    newline, to the file named by the "pidfile" facet of the mudpy.process
    element when that is set. A relative name is taken relative to
    universe.startdir, and missing parent directories are created.
    """
    pid = str(os.getpid())
    log("Process ID: " + pid)

    # find out where the pidfile should go, if anywhere
    if "mudpy.process" in universe.contents:
        file_name = universe.contents["mudpy.process"].get("pidfile", "")
    else:
        file_name = ""
    if not file_name:
        return

    if not os.path.isabs(file_name):
        file_name = os.path.join(universe.startdir, file_name)
    os.makedirs(os.path.dirname(file_name), exist_ok=True)

    # the with block flushes and closes the file even if the write fails
    with codecs.open(file_name, "w", "utf-8") as file_descriptor:
        file_descriptor.write(pid + "\n")
def remove_pidfile(universe):
    """Remove the file containing the current process ID.

    The file is the one named by the "pidfile" facet of the mudpy.process
    element, with a relative name taken relative to universe.startdir.
    Nothing happens when no pidfile is configured or when the file is not
    writable according to os.access().
    """
    file_name = ""
    if "mudpy.process" in universe.contents:
        file_name = universe.contents["mudpy.process"].get("pidfile", "")

    # no pidfile configured means there is nothing to clean up
    if not file_name:
        return

    if not os.path.isabs(file_name):
        file_name = os.path.join(universe.startdir, file_name)
    if os.access(file_name, os.W_OK):
        os.remove(file_name)
def excepthook(excepttype, value, tracebackdata):
    """Handle uncaught exceptions.

    The formatted traceback is logged; if logging itself raises, the
    traceback and the logging error are written to stderr instead.
    """

    # assemble the list of errors into a single string
    formatted = traceback.format_exception(excepttype, value, tracebackdata)
    message = "".join(formatted)

    # try to log it, if possible
    try:
        log(message, 9)
    except Exception as e:
        # try to write it to stderr, if possible
        sys.stderr.write(message + "\nException while logging...\n%s" % e)
def sighook(what, where):
    """Handle external signals.

    A hangup sets the reload flag on the global universe and a terminate
    sets its terminate flag; any other signal is only reported. In every
    case a message naming the signal is logged.

    what: the signal received
    where: second argument passed to the handler; not used here
    """

    # for a hangup signal
    if what == signal.SIGHUP:
        detail = "hangup (reloading)"
        universe.reload_flag = True

    # for a terminate signal
    elif what == signal.SIGTERM:
        detail = "terminate (halting)"
        universe.terminate_flag = True

    # catchall for unexpected signals
    else:
        detail = str(what) + " (unhandled)"

    # log what happened
    log("Caught signal: " + detail, 8)
def override_excepthook():
    """Install this module's excepthook() as sys.excepthook."""
    replacement = excepthook
    sys.excepthook = replacement
def assign_sighook():
    """Register sighook() as the handler for SIGHUP and SIGTERM."""
    for handled_signal in (signal.SIGHUP, signal.SIGTERM):
        signal.signal(handled_signal, sighook)
2084 """This contains functions to be performed when starting the engine."""
2086 # see if a configuration file was specified
2087 if len(sys.argv) > 1:
2088 conffile = sys.argv[1]
2094 universe = Universe(conffile, True)
2096 # report any loglines which accumulated during setup
2097 for logline in universe.setup_loglines:
2099 universe.setup_loglines = []
2101 # fork and disassociate
2104 # override the default exception handler so we get logging first thing
2105 override_excepthook()
2107 # set up custom signal handlers
2111 create_pidfile(universe)
2113 # load and store diagnostic info
2114 universe.versions = mudpy.version.Versions("mudpy")
2116 # log startup diagnostic messages
2117 log("On %s at %s" % (universe.versions.python_version, sys.executable), 1)
2118 log("Import path: %s" % ", ".join(sys.path), 1)
2119 log("Installed dependencies: %s" % universe.versions.dependencies_text, 1)
2120 log("Other python packages: %s" % universe.versions.environment_text, 1)
2121 log("Running version: %s" % universe.versions.version, 1)
2122 log("Initial directory: %s" % universe.startdir, 1)
2123 log("Command line: %s" % " ".join(sys.argv), 1)
2125 # pass the initialized universe back
2130 """These are functions performed when shutting down the engine."""
2132 # the loop has terminated, so save persistent data
2135 # log a final message
2136 log("Shutting down now.")
2138 # get rid of the pidfile
2139 remove_pidfile(universe)