-class Element:
- u"""An element of the universe."""
- def __init__(self, key, universe, filename=None):
- u"""Set up a new element."""
- import os.path
-
- # keep track of our key name
- self.key = key
-
- # keep track of what universe it's loading into
- self.universe = universe
-
- # clone attributes if this is replacing another element
- if self.key in self.universe.contents:
- old_element = self.universe.contents[self.key]
- for attribute in vars(old_element).keys():
- exec(u"self." + attribute + u" = old_element." + attribute)
- if self.owner: self.owner.avatar = self
-
- # i guess this is a new element then
- else:
-
- # not owned by a user by default (used for avatars)
- self.owner = None
-
- # no contents in here by default
- self.contents = {}
-
- # parse out appropriate category and subkey names, add to list
- if self.key.find(u":") > 0:
- self.category, self.subkey = self.key.split(u":", 1)
- else:
- self.category = u"other"
- self.subkey = self.key
- if not self.category in self.universe.categories:
- self.category = u"other"
- self.subkey = self.key
-
- # get an appropriate filename for the origin
- if not filename:
- filename = self.universe.default_origins[self.category]
- if not os.path.isabs(filename):
- filename = os.path.abspath(filename)
-
- # add the file if it doesn't exist yet
- if not filename in self.universe.files:
- DataFile(filename, self.universe)
-
- # record or reset a pointer to the origin file
- self.origin = self.universe.files[filename]
-
- # add a data section to the origin if necessary
- if not self.origin.data.has_section(self.key):
- self.origin.data.add_section(self.key)
-
- # add or replace this element in the universe
- self.universe.contents[self.key] = self
- self.universe.categories[self.category][self.subkey] = self
-
- def reload(self):
- u"""Create a new element and replace this one."""
- new_element = Element(self.key, self.universe, self.origin.filename)
- del(self)
- def destroy(self):
- u"""Remove an element from the universe and destroy it."""
- self.origin.data.remove_section(self.key)
- del self.universe.categories[self.category][self.subkey]
- del self.universe.contents[self.key]
- del self
- def facets(self):
- u"""Return a list of non-inherited facets for this element."""
- if self.key in self.origin.data.sections():
- return self.origin.data.options(self.key)
- else: return []
- def has_facet(self, facet):
- u"""Return whether the non-inherited facet exists."""
- return facet in self.facets()
- def remove_facet(self, facet):
- u"""Remove a facet from the element."""
- if self.has_facet(facet):
- self.origin.data.remove_option(self.key, facet)
- self.origin.modified = True
- def ancestry(self):
- u"""Return a list of the element's inheritance lineage."""
- if self.has_facet(u"inherit"):
- ancestry = self.getlist(u"inherit")
- for parent in ancestry[:]:
- ancestors = self.universe.contents[parent].ancestry()
- for ancestor in ancestors:
- if ancestor not in ancestry: ancestry.append(ancestor)
- return ancestry
- else: return []
- def get(self, facet, default=None):
- u"""Retrieve values."""
- if default is None: default = u""
- if self.origin.data.has_option(self.key, facet):
- raw_data = self.origin.data.get(self.key, facet)
- if raw_data.startswith(u"u\"") or raw_data.startswith(u"u'"):
- raw_data = raw_data[1:]
- raw_data.strip(u"\"'")
- if type(raw_data) == str: return unicode(raw_data, "utf-8")
- else: return raw_data
- elif self.has_facet(u"inherit"):
- for ancestor in self.ancestry():
- if self.universe.contents[ancestor].has_facet(facet):
- return self.universe.contents[ancestor].get(facet)
- else: return default
- def getboolean(self, facet, default=None):
- u"""Retrieve values as boolean type."""
- if default is None: default=False
- if self.origin.data.has_option(self.key, facet):
- return self.origin.data.getboolean(self.key, facet)
- elif self.has_facet(u"inherit"):
- for ancestor in self.ancestry():
- if self.universe.contents[ancestor].has_facet(facet):
- return self.universe.contents[ancestor].getboolean(facet)
- else: return default
- def getint(self, facet, default=None):
- u"""Return values as int/long type."""
- if default is None: default = 0
- if self.origin.data.has_option(self.key, facet):
- return self.origin.data.getint(self.key, facet)
- elif self.has_facet(u"inherit"):
- for ancestor in self.ancestry():
- if self.universe.contents[ancestor].has_facet(facet):
- return self.universe.contents[ancestor].getint(facet)
- else: return default
- def getfloat(self, facet, default=None):
- u"""Return values as float type."""
- if default is None: default = 0.0
- if self.origin.data.has_option(self.key, facet):
- return self.origin.data.getfloat(self.key, facet)
- elif self.has_facet(u"inherit"):
- for ancestor in self.ancestry():
- if self.universe.contents[ancestor].has_facet(facet):
- return self.universe.contents[ancestor].getfloat(facet)
- else: return default
- def getlist(self, facet, default=None):
- u"""Return values as list type."""
- if default is None: default = []
- value = self.get(facet)
- if value: return makelist(value)
- else: return default
- def getdict(self, facet, default=None):
- u"""Return values as dict type."""
- if default is None: default = {}
- value = self.get(facet)
- if value: return makedict(value)
- else: return default
- def set(self, facet, value):
- u"""Set values."""
- if not self.has_facet(facet) or not self.get(facet) == value:
- if type(value) is long: value = unicode(value)
- elif not type(value) is unicode: value = repr(value)
- self.origin.data.set(self.key, facet, value)
- self.origin.modified = True
- def append(self, facet, value):
- u"""Append value tp a list."""
- if type(value) is long: value = unicode(value)
- elif not type(value) is unicode: value = repr(value)
- newlist = self.getlist(facet)
- newlist.append(value)
- self.set(facet, newlist)
-
- def new_event(self, action, when=None):
- u"""Create, attach and enqueue an event element."""
-
- # if when isn't specified, that means now
- if not when: when = self.universe.get_time()
-
- # events are elements themselves
- event = Element(u"event:" + self.key + u":" + counter)
-
- def send(
- self,
- message,
- eol=u"$(eol)",
- raw=False,
- flush=False,
- add_prompt=True,
- just_prompt=False
- ):
- u"""Convenience method to pass messages to an owner."""
- if self.owner:
- self.owner.send(message, eol, raw, flush, add_prompt, just_prompt)
-
- def can_run(self, command):
- u"""Check if the user can run this command object."""
-
- # has to be in the commands category
- if command not in self.universe.categories[u"command"].values():
- result = False
-
- # avatars of administrators can run any command
- elif self.owner and self.owner.account.getboolean(u"administrator"):
- result = True
-
- # everyone can run non-administrative commands
- elif not command.getboolean(u"administrative"):
- result = True
-
- # otherwise the command cannot be run by this actor
- else:
- result = False
-
- # pass back the result
- return result
-
- def update_location(self):
- u"""Make sure the location's contents contain this element."""
- location = self.get(u"location")
- if location in self.universe.contents:
- self.universe.contents[location].contents[self.key] = self
- def clean_contents(self):
- u"""Make sure the element's contents aren't bogus."""
- for element in self.contents.values():
- if element.get(u"location") != self.key:
- del self.contents[element.key]
- def go_to(self, location):
- u"""Relocate the element to a specific location."""
- current = self.get(u"location")
- if current and self.key in self.universe.contents[current].contents:
- del universe.contents[current].contents[self.key]
- if location in self.universe.contents: self.set(u"location", location)
- self.universe.contents[location].contents[self.key] = self
- self.look_at(location)
- def go_home(self):
- u"""Relocate the element to its default location."""
- self.go_to(self.get(u"default_location"))
- self.echo_to_location(
- u"You suddenly realize that " + self.get(u"name") + u" is here."
- )
- def move_direction(self, direction):
- u"""Relocate the element in a specified direction."""
- self.echo_to_location(
- self.get(
- u"name"
- ) + u" exits " + self.universe.categories[
- u"internal"
- ][
- u"directions"
- ].getdict(
- direction
- )[
- u"exit"
- ] + u"."
- )
- self.send(
- u"You exit " + self.universe.categories[
- u"internal"
- ][
- u"directions"
- ].getdict(
- direction
- )[
- u"exit"
- ] + u".",
- add_prompt=False
- )
- self.go_to(
- self.universe.contents[self.get(u"location")].link_neighbor(direction)
- )
- self.echo_to_location(
- self.get(
- u"name"
- ) + u" arrives from " + self.universe.categories[
- u"internal"
- ][
- u"directions"
- ].getdict(
- direction
- )[
- u"enter"
- ] + u"."
- )
- def look_at(self, key):
- u"""Show an element to another element."""
- if self.owner:
- element = self.universe.contents[key]
- message = u""
- name = element.get(u"name")
- if name: message += u"$(cyn)" + name + u"$(nrm)$(eol)"
- description = element.get(u"description")
- if description: message += description + u"$(eol)"
- portal_list = element.portals().keys()
- if portal_list:
- portal_list.sort()
- message += u"$(cyn)[ Exits: " + u", ".join(
- portal_list
- ) + u" ]$(nrm)$(eol)"
- for element in self.universe.contents[
- self.get(u"location")
- ].contents.values():
- if element.getboolean(u"is_actor") and element is not self:
- message += u"$(yel)" + element.get(
- u"name"
- ) + u" is here.$(nrm)$(eol)"
- elif element is not self:
- message += u"$(grn)" + element.get(
- u"impression"
- ) + u"$(nrm)$(eol)"
- self.send(message)
- def portals(self):
- u"""Map the portal directions for a room to neighbors."""
- import re
- portals = {}
- if re.match(u"""^location:-?\d+,-?\d+,-?\d+$""", self.key):
- coordinates = [(int(x)) for x in self.key.split(u":")[1].split(u",")]
- directions = self.universe.categories[u"internal"][u"directions"]
- offsets = dict(
- [
- (
- x, directions.getdict(x)[u"vector"]
- ) for x in directions.facets()
- ]
- )
- for portal in self.getlist(u"gridlinks"):
- adjacent = map(lambda c,o: c+o, coordinates, offsets[portal])
- neighbor = u"location:" + u",".join(
- [(unicode(x)) for x in adjacent]
- )
- if neighbor in self.universe.contents: portals[portal] = neighbor
- for facet in self.facets():
- if facet.startswith(u"link_"):
- neighbor = self.get(facet)
- if neighbor in self.universe.contents:
- portal = facet.split(u"_")[1]
- portals[portal] = neighbor
- return portals
- def link_neighbor(self, direction):
- u"""Return the element linked in a given direction."""
- portals = self.portals()
- if direction in portals: return portals[direction]
- def echo_to_location(self, message):
- u"""Show a message to other elements in the current location."""
- for element in self.universe.contents[
- self.get(u"location")
- ].contents.values():
- if element is not self: element.send(message)
-
-class DataFile:
- u"""A file containing universe elements."""
- def __init__(self, filename, universe):
- self.filename = filename
- self.universe = universe
- self.load()
- def load(self):
- u"""Read a file and create elements accordingly."""
- import ConfigParser, os, os.path
- self.data = ConfigParser.RawConfigParser()
- self.modified = False
- if os.access(self.filename, os.R_OK): self.data.read(self.filename)
- if not hasattr(self.universe, u"files"): self.universe.files = {}
- self.universe.files[self.filename] = self
- includes = []
- if self.data.has_option(u"__control__", u"include_files"):
- for included in makelist(
- self.data.get(u"__control__", u"include_files")
- ):
- included = find_file(
- included,
- relative=self.filename,
- universe=self.universe
- )
- if included not in includes: includes.append(included)
- if self.data.has_option(u"__control__", u"include_dirs"):
- for included in [ os.path.join(x, u"__init__.mpy") for x in makelist(
- self.data.get(u"__control__", u"include_dirs")
- ) ]:
- included = find_file(
- included,
- relative=self.filename,
- universe=self.universe
- )
- if included not in includes: includes.append(included)
- if self.data.has_option(u"__control__", u"default_files"):
- origins = makedict(self.data.get(u"__control__", u"default_files"))
- for key in origins.keys():
- origins[key] = find_file(
- origins[key],
- relative=self.filename,
- universe=self.universe
- )
- if origins[key] not in includes: includes.append(origins[key])
- self.universe.default_origins[key] = origins[key]
- if key not in self.universe.categories:
- self.universe.categories[key] = {}
- if self.data.has_option(u"__control__", u"private_files"):
- for item in makelist(self.data.get(u"__control__", u"private_files")):
- item = find_file(
- item,
- relative=self.filename,
- universe=self.universe
- )
- if item not in includes: includes.append(item)
- if item not in self.universe.private_files:
- self.universe.private_files.append(item)
- for section in self.data.sections():
- if section != u"__control__":
- Element(section, self.universe, self.filename)
- for include_file in includes:
- if not os.path.isabs(include_file):
- include_file = find_file(
- include_file,
- relative=self.filename,
- universe=self.universe
- )
- if include_file not in self.universe.files or not self.universe.files[
- include_file
- ].is_writeable():
- DataFile(include_file, self.universe)
- def save(self):
- u"""Write the data, if necessary."""
- import codecs, os, os.path, re, stat
-
- # when modified, writeable and has content or the file exists
- if self.modified and self.is_writeable() and (
- self.data.sections() or os.path.exists(self.filename)
- ):
-
- # make parent directories if necessary
- if not os.path.exists(os.path.dirname(self.filename)):
- os.makedirs(os.path.dirname(self.filename))
-
- # backup the file
- if self.data.has_option(u"__control__", u"backup_count"):
- max_count = self.data.has_option(u"__control__", u"backup_count")
- else:
- max_count = universe.categories[u"internal"][u"limits"].getint(
- u"default_backup_count"
- )
- if os.path.exists(self.filename) and max_count:
- backups = []
- for candidate in os.listdir(os.path.dirname(self.filename)):
- if re.match(
- os.path.basename(self.filename) + u"""\.\d+$""", candidate
- ):
- backups.append(int(candidate.split(u".")[-1]))
- backups.sort()
- backups.reverse()
- for old_backup in backups:
- if old_backup >= max_count-1:
- os.remove(self.filename+u"."+unicode(old_backup))
- elif not os.path.exists(
- self.filename+u"."+unicode(old_backup+1)
- ):
- os.rename(
- self.filename + u"."+unicode(old_backup),
- self.filename + u"."+unicode( old_backup + 1 )
- )
- if not os.path.exists(self.filename+u".0"):
- os.rename( self.filename, self.filename + u".0" )
-
- # our data file
- file_descriptor = codecs.open(self.filename, u"w", u"utf-8")
-
- # if it's marked private, chmod it appropriately
- if self.filename in self.universe.private_files and oct(
- stat.S_IMODE( os.stat(self.filename)[stat.ST_MODE] )
- ) != 0600:
- os.chmod(self.filename, 0600)
-
- # write it back sorted, instead of using ConfigParser
- sections = self.data.sections()
- sections.sort()
- for section in sections:
- file_descriptor.write(u"[" + section + u"]\n")
- options = self.data.options(section)
- options.sort()
- for option in options:
- file_descriptor.write(
- option + u" = " + self.data.get(section, option) + u"\n"
- )
- file_descriptor.write(u"\n")
-
- # flush and close the file
- file_descriptor.flush()
- file_descriptor.close()
-
- # unset the modified flag
- self.modified = False
- def is_writeable(self):
- u"""Returns True if the __control__ read_only is False."""
- return not self.data.has_option(
- u"__control__", u"read_only"
- ) or not self.data.getboolean(
- u"__control__", u"read_only"
- )