# -*- coding: utf-8 -*-
-u"""Core objects for the mudpy engine."""
+u"""Miscellaneous functions for the mudpy engine."""
# Copyright (c) 2004-2009 Jeremy Stanley <fungi@yuggoth.org>. Permission
# to use, copy, modify, and distribute this software is granted under
if default is None: default = u""
if self.origin.data.has_option(self.key, facet):
raw_data = self.origin.data.get(self.key, facet)
+ if raw_data.startswith(u"u\"") or raw_data.startswith(u"u'"):
+ raw_data = raw_data[1:]
+ raw_data.strip(u"\"'")
if type(raw_data) == str: return unicode(raw_data, "utf-8")
else: return raw_data
elif self.has_facet(u"inherit"):
if os.access(self.filename, os.R_OK): self.data.read(self.filename)
if not hasattr(self.universe, u"files"): self.universe.files = {}
self.universe.files[self.filename] = self
+ includes = []
if self.data.has_option(u"__control__", u"include_files"):
- includes = makelist(self.data.get(u"__control__", u"include_files"))
- else: includes = []
+ for included in makelist(
+ self.data.get(u"__control__", u"include_files")
+ ):
+ included = find_file(
+ included,
+ relative=self.filename,
+ universe=self.universe
+ )
+ if included not in includes: includes.append(included)
+ if self.data.has_option(u"__control__", u"include_dirs"):
+ for included in [ os.path.join(x, u"__init__.mpy") for x in makelist(
+ self.data.get(u"__control__", u"include_dirs")
+ ) ]:
+ included = find_file(
+ included,
+ relative=self.filename,
+ universe=self.universe
+ )
+ if included not in includes: includes.append(included)
if self.data.has_option(u"__control__", u"default_files"):
origins = makedict(self.data.get(u"__control__", u"default_files"))
for key in origins.keys():
- if not os.path.isabs(origins[key]):
- origins[key] = os.path.join(
- os.path.dirname(self.filename), origins[key]
- )
- if not origins[key] in includes: includes.append(origins[key])
+ origins[key] = find_file(
+ origins[key],
+ relative=self.filename,
+ universe=self.universe
+ )
+ if origins[key] not in includes: includes.append(origins[key])
self.universe.default_origins[key] = origins[key]
- if not key in self.universe.categories:
+ if key not in self.universe.categories:
self.universe.categories[key] = {}
if self.data.has_option(u"__control__", u"private_files"):
for item in makelist(self.data.get(u"__control__", u"private_files")):
- if not item in includes: includes.append(item)
- if not item in self.universe.private_files:
- if not os.path.isabs(item):
- item = os.path.join(os.path.dirname(self.filename), item)
+ item = find_file(
+ item,
+ relative=self.filename,
+ universe=self.universe
+ )
+ if item not in includes: includes.append(item)
+ if item not in self.universe.private_files:
self.universe.private_files.append(item)
for section in self.data.sections():
if section != u"__control__":
Element(section, self.universe, self.filename)
for include_file in includes:
if not os.path.isabs(include_file):
- include_file = os.path.join(
- os.path.dirname(self.filename), include_file
+ include_file = find_file(
+ include_file,
+ relative=self.filename,
+ universe=self.universe
)
if include_file not in self.universe.files or not self.universe.files[
include_file
u"""Create and open the listening socket."""
import socket
- # create a new ipv4 stream-type socket object
- self.listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ # need to know the local address and port number for the listener
+ host = self.categories[u"internal"][u"network"].get(u"host")
+ port = self.categories[u"internal"][u"network"].getint(u"port")
+
+ # if no host was specified, bind to all local addresses (preferring ipv6)
+ if not host:
+ if socket.has_ipv6: host = u"::"
+ else: host = u"0.0.0.0"
+
+ # figure out if this is ipv4 or v6
+ family = socket.getaddrinfo(host, port)[0][0]
+ if family is socket.AF_INET6 and not socket.has_ipv6:
+ log(u"No support for IPv6 address %s (use IPv4 instead)." % host)
+
+ # create a new stream-type socket object
+ self.listening_socket = socket.socket(family, socket.SOCK_STREAM)
# set the socket options to allow existing open ones to be
# reused (fixes a bug where the server can't bind for a minute
)
# bind the socket to to our desired server ipa and port
- host = self.categories[u"internal"][u"network"].get(u"host")
- port = self.categories[u"internal"][u"network"].getint(u"port")
self.listening_socket.bind((host, port))
# disable blocking so we can proceed whether or not we can
self.listening_socket.listen(1)
# note that we're now ready for user connections
- if not host: host = u"0.0.0.0"
log(
u"Listening for Telnet connections on: " + host + u":" + unicode(port)
)
logline += self.account.get(u"name")
else:
logline += u"an unknown user"
- logline += u" after idling too long in a " + self.state + u" state."
+ logline += u" after idling too long in the " + self.state + u" state."
log(logline, 2)
self.state = u"disconnecting"
self.menu_seen = False
account = self.account.get(u"name")
else: account = u"an unknown user"
log(
- u"Sending to " + account \
- + u" raised an exception (broken pipe?)."
+ u"Sending to %s raised an exception (broken pipe?)." % account,
+ 7
)
pass
def makelist(value):
u"""Turn string into list type."""
if value[0] + value[-1] == u"[]": return eval(value)
+ elif value[0] + value[-1] == u"\"\"": return [ value[1:-1] ]
else: return [ value ]
def makedict(value):
# this is how we handle local file inclusion (dangerous!)
elif macro.startswith(u"inc:"):
- incfile = os.path.join(universe.startdir, macro[4:])
+ incfile = find_file(macro[4:], universe=universe)
if os.path.exists(incfile):
incfd = codecs.open(incfile, u"r", u"utf-8")
replacement = u""
u"""Speak to others in the same room."""
import unicodedata
- # check for replacement macros
- if replace_macros(actor.owner, parameters, True) != parameters:
- actor.send(u"You cannot speak $_(replacement macros).")
+ # check for replacement macros and escape them
+ parameters = escape_macros(parameters)
- # the user entered a message
- elif parameters:
+ # if the message is wrapped in quotes, remove them and leave contents intact
+ if parameters.startswith(u"\"") and parameters.endswith(u"\""):
+ message = parameters[1:-1]
+ literal = True
- # get rid of quote marks on the ends of the message
+ # otherwise, get rid of stray quote marks on the ends of the message
+ else:
message = parameters.strip(u"\"'`")
+ literal = False
+
+ # the user entered a message
+ if message:
# match the punctuation used, if any, to an action
actions = universe.categories[u"internal"][u"language"].getdict(
)
action = u""
for mark in actions.keys():
- if message.endswith(mark):
+ if not literal and message.endswith(mark):
action = actions[mark]
break
# add punctuation if needed
if not action:
action = actions[default_punctuation]
- if message and not unicodedata.category(message[-1]) == u"Po":
+ if message and not (
+ literal or unicodedata.category(message[-1]) == u"Po"
+ ):
message += default_punctuation
- # decapitalize the first letter to improve matching
- message = message[0].lower() + message[1:]
-
- # iterate over all words in message, replacing typos
- typos = universe.categories[u"internal"][u"language"].getdict(u"typos")
- words = message.split()
- for index in range(len(words)):
- word = words[index]
- while unicodedata.category(word[0]) == u"Po":
- word = word[1:]
- while unicodedata.category(word[-1]) == u"Po":
- word = word[:-1]
- if word in typos.keys():
- words[index] = words[index].replace(word, typos[word])
- message = u" ".join(words)
-
- # capitalize the first letter
- message = message[0].upper() + message[1:]
-
- # tell the room
+ # failsafe checks to avoid unwanted reformatting and null strings
+ if message and not literal:
+
+ # decapitalize the first letter to improve matching
+ message = message[0].lower() + message[1:]
+
+ # iterate over all words in message, replacing typos
+ typos = universe.categories[u"internal"][u"language"].getdict(
+ u"typos"
+ )
+ words = message.split()
+ for index in range(len(words)):
+ word = words[index]
+ while unicodedata.category(word[0]) == u"Po":
+ word = word[1:]
+ while unicodedata.category(word[-1]) == u"Po":
+ word = word[:-1]
+ if word in typos.keys():
+ words[index] = words[index].replace(word, typos[word])
+ message = u" ".join(words)
+
+ # capitalize the first letter
+ message = message[0].upper() + message[1:]
+
+ # tell the room
+ if message:
actor.echo_to_location(
actor.get(u"name") + u" " + action + u"s, \"" + message + u"\""
)
# send the error message
actor.send(message)
+def find_file(
+ file_name=None,
+ root_path=None,
+ search_path=None,
+ default_dir=None,
+ relative=None,
+ universe=None
+):
+ u"""Return an absolute file path based on configuration."""
+ import os, os.path, sys
+
+ # make sure to get rid of any surrounding quotes first thing
+ if file_name: file_name = file_name.strip(u"\"'")
+
+ # this is all unnecessary if it's already absolute
+ if file_name and os.path.isabs(file_name):
+ return os.path.realpath(file_name)
+
+ # when no file name is specified, look for <argv[0]>.conf
+ elif not file_name: file_name = os.path.basename( sys.argv[0] ) + u".conf"
+
+ # if a universe was provided, try to get some defaults from there
+ if universe:
+
+ if hasattr(
+ universe,
+ u"contents"
+ ) and u"internal:storage" in universe.contents:
+ storage = universe.categories[u"internal"][u"storage"]
+ if not root_path: root_path = storage.get(u"root_path").strip("\"'")
+ if not search_path: search_path = storage.getlist(u"search_path")
+ if not default_dir: default_dir = storage.get(u"default_dir").strip("\"'")
+
+ # if there's only one file loaded, try to work around a chicken<egg
+ elif hasattr(universe, u"files") and len(
+ universe.files
+ ) == 1 and not universe.files[universe.files.keys()[0]].is_writeable():
+ data_file = universe.files[universe.files.keys()[0]].data
+
+ # try for a fallback default directory
+ if not default_dir and data_file.has_option(
+ u"internal:storage",
+ u"default_dir"
+ ):
+ default_dir = data_file.get(
+ u"internal:storage",
+ u"default_dir"
+ ).strip(u"\"'")
+
+ # try for a fallback root path
+ if not root_path and data_file.has_option(
+ u"internal:storage",
+ u"root_path"
+ ):
+ root_path = data_file.get(
+ u"internal:storage",
+ u"root_path"
+ ).strip(u"\"'")
+
+ # try for a fallback search path
+ if not search_path and data_file.has_option(
+ u"internal:storage",
+ u"search_path"
+ ):
+ search_path = makelist(
+ data_file.get(u"internal:storage", u"search_path").strip(u"\"'")
+ )
+
+ # another fallback root path, this time from the universe startdir
+ if not root_path and hasattr(universe, "startdir"):
+ root_path = universe.startdir
+
+ # when no root path is specified, assume the current working directory
+ if not root_path: root_path = os.getcwd()
+
+ # otherwise, make sure it's absolute
+ elif not os.path.isabs(root_path): root_path = os.path.realpath(root_path)
+
+ # if there's no search path, just use the root path and etc
+ if not search_path: search_path = [root_path, u"etc"]
+
+ # work on a copy of the search path, to avoid modifying the caller's
+ else: search_path = search_path[:]
+
+ # if there's no default path, use the last element of the search path
+ if not default_dir: default_dir = search_path[-1]
+
+ # if an existing file or directory reference was supplied, prepend it
+ if relative:
+ relative = relative.strip(u"\"'")
+ if os.path.isdir(relative): search_path = [relative] + search_path
+ else: search_path = [ os.path.dirname(relative) ] + search_path
+
+ # make the search path entries absolute and throw away any dupes
+ clean_search_path = []
+ for each_path in search_path:
+ each_path = each_path.strip(u"\"'")
+ if not os.path.isabs(each_path):
+ each_path = os.path.realpath( os.path.join(root_path, each_path) )
+ if each_path not in clean_search_path:
+ clean_search_path.append(each_path)
+
+ # start hunting for the file now
+ for each_path in clean_search_path:
+
+ # if the file exists and is readable, we're done
+ if os.path.isfile( os.path.join(each_path, file_name) ):
+ file_name = os.path.realpath( os.path.join(each_path, file_name) )
+ break
+
+ # it didn't exist after all, so use the default path instead
+ if not os.path.isabs(file_name):
+ file_name = os.path.join(default_dir, file_name)
+ if not os.path.isabs(file_name):
+ file_name = os.path.join(root_path, file_name)
+
+ # and normalize it last thing before returning
+ file_name = os.path.realpath(file_name)
+
+ # normalize the resulting file path and hand it back
+ return file_name
+
def daemonize(universe):
u"""Fork and disassociate from everything."""
import codecs, ctypes, ctypes.util, os, os.path, sys
# log what happened
log(message, 8)
-# redefine sys.excepthook with ours
-import sys
-sys.excepthook = excepthook
+def override_excepthook():
+ u"""Redefine sys.excepthook with our own."""
+ import sys
+ sys.excepthook = excepthook
-# assign the sgnal handlers
-import signal
-signal.signal(signal.SIGHUP, sighook)
-signal.signal(signal.SIGTERM, sighook)
+def assign_sighook():
+ u"""Assign a customized handler for some signals."""
+ import signal
+ signal.signal(signal.SIGHUP, sighook)
+ signal.signal(signal.SIGTERM, sighook)
+
+def setup():
+ """This contains functions to be performed when starting the engine."""
+ import sys
-# if there is no universe, create an empty one
-if not u"universe" in locals():
+ # see if a configuration file was specified
if len(sys.argv) > 1: conffile = sys.argv[1]
else: conffile = u""
+
+ # the big bang
+ global universe
universe = Universe(conffile, True)
-elif universe.reload_flag:
- universe = universe.new()
- reload_data()
+ # log an initial message
+ log(u"Started mudpy with command line: " + u" ".join(sys.argv))
+
+ # fork and disassociate
+ daemonize(universe)
+
+ # override the default exception handler so we get logging first thing
+ override_excepthook()
+
+ # set up custom signal handlers
+ assign_sighook()
+
+ # make the pidfile
+ create_pidfile(universe)
+
+ # pass the initialized universe back
+ return universe
+
+def finish():
+ """This contains functions to be performed when shutting down the engine."""
+
+ # the loop has terminated, so save persistent data
+ universe.save()
+
+ # log a final message
+ log(u"Shutting down now.")
+
+ # get rid of the pidfile
+ remove_pidfile(universe)