1 """Data interface functions for the mudpy engine."""
3 # Copyright (c) 2004-2018 Jeremy Stanley <fungi@yuggoth.org>. Permission
4 # to use, copy, modify, and distribute this software is granted under
5 # terms provided in the LICENSE file distributed with this software.
class _IBSEmitter(yaml.emitter.Emitter):

    """YAML emitter subclass which indents nested block sequences."""

    def expect_block_sequence(self):
        """Start a block sequence with indented, not indentless, items.

        PyYAML's stock emitter writes block sequence entries flush with
        the parent mapping key; the ``yamllint`` style checker expects
        them indented one level, so force an indented block here.
        """
        # TODO(fungi) Get an option for this implemented upstream in
        # the pyyaml library
        self.increase_indent(flow=False, indentless=False)
        self.state = self.expect_first_block_sequence_item
class _IBSDumper(yaml.SafeDumper, _IBSEmitter):

    """Safe YAML dumper wired to the block-sequence-indenting emitter."""
37 """A file containing universe elements and their facets."""
46 self.universe = universe
51 self.relative = relative
55 """Read a file, create elements and poplulate facets accordingly."""
57 self.source = find_file(
58 self.source, relative=self.relative, universe=self.universe)
60 self.data = yaml.safe_load(open(self.source))
61 log_entry = ("Loaded file %s into memory." % self.source, 5)
62 except FileNotFoundError:
63 # it's normal if the file is one which doesn't exist yet
65 log_entry = ("File %s is unavailable." % self.source, 6)
67 mudpy.misc.log(*log_entry)
69 # happens when we're not far enough along in the init process
70 self.universe.setup_loglines.append(log_entry)
71 if not hasattr(self.universe, "files"):
72 self.universe.files = {}
73 self.universe.files[self.source] = self
75 for node in list(self.data):
77 for included in self.data["_load"]:
81 universe=self.universe)
82 if included not in includes:
83 includes.append(included)
85 if node.startswith("_"):
87 facet_pos = node.rfind(".") + 1
88 prefix = node[:facet_pos].strip(".")
90 element = self.universe.contents[prefix]
92 element = mudpy.misc.Element(prefix, self.universe, self)
93 element.set(node[facet_pos:], self.data[node])
94 if prefix.startswith("mudpy.movement."):
95 self.universe.directions.add(
96 prefix[prefix.rfind(".") + 1:])
97 for include_file in includes:
98 if not os.path.isabs(include_file):
99 include_file = find_file(
101 relative=self.source,
102 universe=self.universe
104 if (include_file not in self.universe.files or not
105 self.universe.files[include_file].is_writeable()):
106 Data(include_file, self.universe)
109 """Write the data, if necessary."""
110 normal_umask = 0o0022
111 private_umask = 0o0077
112 private_file_mode = 0o0600
114 # when modified, writeable and has content or the file exists
115 if self.modified and self.is_writeable() and (
116 self.data or os.path.exists(self.source)
119 # make parent directories if necessary
120 if not os.path.exists(os.path.dirname(self.source)):
121 old_umask = os.umask(normal_umask)
122 os.makedirs(os.path.dirname(self.source))
126 if "mudpy.limit" in self.universe.contents:
127 max_count = self.universe.contents["mudpy.limit"].get(
131 if os.path.exists(self.source) and max_count:
133 for candidate in os.listdir(os.path.dirname(self.source)):
135 os.path.basename(self.source) +
136 r"""\.\d+$""", candidate
138 backups.append(int(candidate.split(".")[-1]))
141 for old_backup in backups:
142 if old_backup >= max_count - 1:
143 os.remove(self.source + "." + str(old_backup))
144 elif not os.path.exists(
145 self.source + "." + str(old_backup + 1)
148 self.source + "." + str(old_backup),
149 self.source + "." + str(old_backup + 1)
151 if not os.path.exists(self.source + ".0"):
152 os.rename(self.source, self.source + ".0")
155 if "private" in self.flags:
156 old_umask = os.umask(private_umask)
157 file_descriptor = open(self.source, "w")
158 if oct(stat.S_IMODE(os.stat(
159 self.source)[stat.ST_MODE])) != private_file_mode:
160 # if it's marked private, chmod it appropriately
161 os.chmod(self.source, private_file_mode)
163 old_umask = os.umask(normal_umask)
164 file_descriptor = open(self.source, "w")
167 # write and close the file
168 yaml.dump(self.data, Dumper=_IBSDumper, allow_unicode=True,
169 default_flow_style=False, explicit_start=True, indent=4,
170 stream=file_descriptor)
171 file_descriptor.close()
173 # unset the modified flag
174 self.modified = False
def is_writeable(self):
    """Report whether this file may be modified and saved.

    A file is treated as read-only when its loaded data carries a
    truthy ``_lock`` entry; an absent key means the file is writeable.
    """
    locked = self.data.get("_lock", False)
    return not locked
193 """Return an absolute file path based on configuration."""
195 # this is all unnecessary if it's already absolute
196 if file_name and os.path.isabs(file_name):
197 return os.path.realpath(file_name)
199 # if a universe was provided, try to get some defaults from there
203 universe, "contents") and "mudpy.filing" in universe.contents:
204 filing = universe.contents["mudpy.filing"]
206 prefix = filing.get("prefix")
208 search = filing.get("search")
210 stash = filing.get("stash")
    # if there's only one file loaded, try to work around a chicken-and-egg
213 elif hasattr(universe, "files") and len(
215 ) == 1 and not universe.files[
216 list(universe.files.keys())[0]].is_writeable():
217 data_file = universe.files[list(universe.files.keys())[0]].data
219 # try for a fallback default directory
221 stash = data_file.get(".mudpy.filing.stash", "")
223 # try for a fallback root path
225 prefix = data_file.get(".mudpy.filing.prefix", "")
227 # try for a fallback search path
229 search = data_file.get(".mudpy.filing.search", "")
231 # another fallback root path, this time from the universe startdir
232 if hasattr(universe, "startdir"):
234 prefix = universe.startdir
235 elif not os.path.isabs(prefix):
236 prefix = os.path.join(universe.startdir, prefix)
238 # when no root path is specified, assume the current working directory
239 if (not prefix or prefix == ".") and hasattr(universe, "startdir"):
240 prefix = universe.startdir
242 # make sure it's absolute
243 prefix = os.path.realpath(prefix)
245 # if there's no search path, just use the root path and etc
247 search = [prefix, "etc"]
249 # work on a copy of the search path, to avoid modifying the caller's
253 # if there's no default path, use the last component of the search path
257 # if an existing file or directory reference was supplied, prepend it
259 if os.path.isdir(relative):
260 search = [relative] + search
262 search = [os.path.dirname(relative)] + search
264 # make the search path entries absolute and throw away any dupes
266 for each_path in search:
267 if not os.path.isabs(each_path):
268 each_path = os.path.realpath(os.path.join(prefix, each_path))
269 if each_path not in clean_search:
270 clean_search.append(each_path)
272 # start hunting for the file now
273 for each_path in clean_search:
275 # construct the candidate path
276 candidate = os.path.join(each_path, file_name)
278 # if the file exists and is readable, we're done
279 if os.path.isfile(candidate):
280 file_name = os.path.realpath(candidate)
283 # if the path is a directory, look for an __init__ file
284 if os.path.isdir(candidate):
285 file_name = os.path.realpath(
286 os.path.join(candidate, "__init__.yaml"))
289 # it didn't exist after all, so use the default path instead
290 if not os.path.isabs(file_name):
291 file_name = os.path.join(stash, file_name)
292 if not os.path.isabs(file_name):
293 file_name = os.path.join(prefix, file_name)
295 # and normalize it last thing before returning
296 file_name = os.path.realpath(file_name)