"""Data interface functions for the mudpy engine.""" # Copyright (c) 2004-2018 mudpy authors. Permission to use, copy, # modify, and distribute this software is granted under terms # provided in the LICENSE file distributed with this software. import os import re import stat import mudpy import yaml class _IBSEmitter(yaml.emitter.Emitter): """Override the default YAML Emitter to indent block sequences.""" def expect_block_sequence(self): """Match the expectations of the ``yamllint`` style checker.""" # TODO(fungi) Get an option for this implemented upstream in # the pyyaml library self.increase_indent(flow=False, indentless=False) self.state = self.expect_first_block_sequence_item class _IBSDumper(yaml.SafeDumper, _IBSEmitter): """Use our _IBSEmitter instead of the default implementation.""" pass class Data: """A file containing universe elements and their facets.""" def __init__(self, source, universe, flags=None, relative=None, ): self.source = source self.universe = universe if flags is None: self.flags = [] else: self.flags = flags[:] self.relative = relative self.load() def load(self): """Read a file, create elements and poplulate facets accordingly.""" self.modified = False self.source = find_file( self.source, relative=self.relative, universe=self.universe) try: self.data = yaml.safe_load(open(self.source)) log_entry = ("Loaded file %s into memory." % self.source, 5) except FileNotFoundError: # it's normal if the file is one which doesn't exist yet self.data = {} log_entry = ("File %s is unavailable." % self.source, 6) try: mudpy.misc.log(*log_entry) except NameError: # happens when we're not far enough along in the init process self.universe.setup_loglines.append(log_entry) if not hasattr(self.universe, "files"): self.universe.files = {} self.universe.files[self.source] = self includes = [] for node in list(self.data): if node == "_load": includes += self.data["_load"] continue if node.startswith("_"): continue facet_pos = node.rfind(".") + 1 prefix = node[:facet_pos].strip(".") try: element = self.universe.contents[prefix] except KeyError: element = mudpy.misc.Element(prefix, self.universe, self) element.set(node[facet_pos:], self.data[node]) if prefix.startswith("mudpy.movement."): self.universe.directions.add( prefix[prefix.rfind(".") + 1:]) for include_file in includes: if not os.path.isabs(include_file): include_file = find_file( include_file, relative=self.source, universe=self.universe ) if (include_file not in self.universe.files or not self.universe.files[include_file].is_writeable()): Data(include_file, self.universe) def save(self): """Write the data, if necessary.""" normal_umask = 0o0022 private_umask = 0o0077 private_file_mode = 0o0600 # when modified, writeable and has content or the file exists if self.modified and self.is_writeable() and ( self.data or os.path.exists(self.source) ): # make parent directories if necessary old_umask = os.umask(normal_umask) os.makedirs(os.path.dirname(self.source), exist_ok=True) os.umask(old_umask) # backup the file if "mudpy.limit" in self.universe.contents: max_count = self.universe.contents["mudpy.limit"].get( "backups", 0) else: max_count = 0 if os.path.exists(self.source) and max_count: backups = [] for candidate in os.listdir(os.path.dirname(self.source)): if re.match( os.path.basename(self.source) + r"""\.\d+$""", candidate ): backups.append(int(candidate.split(".")[-1])) backups.sort() backups.reverse() for old_backup in backups: if old_backup >= max_count - 1: os.remove(self.source + "." + str(old_backup)) elif not os.path.exists( self.source + "." + str(old_backup + 1) ): os.rename( self.source + "." + str(old_backup), self.source + "." + str(old_backup + 1) ) if not os.path.exists(self.source + ".0"): os.rename(self.source, self.source + ".0") # our data file if "private" in self.flags: old_umask = os.umask(private_umask) file_descriptor = open(self.source, "w") if oct(stat.S_IMODE(os.stat( self.source)[stat.ST_MODE])) != private_file_mode: # if it's marked private, chmod it appropriately os.chmod(self.source, private_file_mode) else: old_umask = os.umask(normal_umask) file_descriptor = open(self.source, "w") os.umask(old_umask) # write and close the file yaml.dump(self.data, Dumper=_IBSDumper, allow_unicode=True, default_flow_style=False, explicit_start=True, indent=4, stream=file_descriptor) file_descriptor.close() # unset the modified flag self.modified = False def is_writeable(self): """Returns True if the _lock is False.""" try: return not self.data.get("_lock", False) except KeyError: return True def find_file( file_name=None, group=None, prefix=None, relative=None, search=None, stash=None, universe=None ): """Return an absolute file path based on configuration.""" # this is all unnecessary if it's already absolute if file_name and os.path.isabs(file_name): return os.path.realpath(file_name) # if a universe was provided, try to get some defaults from there if universe: if hasattr( universe, "contents") and "mudpy.filing" in universe.contents: filing = universe.contents["mudpy.filing"] if not prefix: prefix = filing.get("prefix") if not search: search = filing.get("search") if not stash: stash = filing.get("stash") # if there's only one file loaded, try to work around a chicken