1 """Data interface functions for the mudpy engine."""
3 # Copyright (c) 2004-2017 Jeremy Stanley <fungi@yuggoth.org>. Permission
4 # to use, copy, modify, and distribute this software is granted under
5 # terms provided in the LICENSE file distributed with this software.
17 """A file containing universe elements and their facets."""
26 self.universe = universe
31 self.relative = relative
35 """Read a file, create elements and poplulate facets accordingly."""
37 self.source = find_file(
38 self.source, relative=self.relative, universe=self.universe)
40 self.data = yaml.safe_load(open(self.source))
41 except FileNotFoundError:
42 # it's normal if the file is one which doesn't exist yet
44 log_entry = ("File %s is unavailable." % self.source, 6)
46 mudpy.misc.log(*log_entry)
48 # happens when we're not far enough along in the init process
49 self.universe.setup_loglines.append(log_entry)
50 if not hasattr(self.universe, "files"):
51 self.universe.files = {}
52 self.universe.files[self.source] = self
54 for node in list(self.data):
56 for included in self.data["_load"]:
60 universe=self.universe)
61 if included not in includes:
62 includes.append(included)
64 if node.startswith("_"):
66 facet_pos = node.rfind(".") + 1
68 mudpy.misc.Element(node, self.universe, self, old_style=True)
70 prefix = node[:facet_pos].strip(".")
72 element = self.universe.contents[prefix]
74 element = mudpy.misc.Element(prefix, self.universe, self)
75 element.set(node[facet_pos:], self.data[node])
76 if prefix.startswith("mudpy.movement."):
77 self.universe.directions.add(
78 prefix[prefix.rfind(".") + 1:])
79 for include_file in includes:
80 if not os.path.isabs(include_file):
81 include_file = find_file(
84 universe=self.universe
86 if (include_file not in self.universe.files or not
87 self.universe.files[include_file].is_writeable()):
88 Data(include_file, self.universe)
91 """Write the data, if necessary."""
93 private_umask = 0o0077
94 private_file_mode = 0o0600
96 # when modified, writeable and has content or the file exists
97 if self.modified and self.is_writeable() and (
98 self.data or os.path.exists(self.source)
101 # make parent directories if necessary
102 if not os.path.exists(os.path.dirname(self.source)):
103 old_umask = os.umask(normal_umask)
104 os.makedirs(os.path.dirname(self.source))
108 if "mudpy.limit" in self.universe.contents:
109 max_count = self.universe.contents["mudpy.limit"].get(
113 if os.path.exists(self.source) and max_count:
115 for candidate in os.listdir(os.path.dirname(self.source)):
117 os.path.basename(self.source) +
118 r"""\.\d+$""", candidate
120 backups.append(int(candidate.split(".")[-1]))
123 for old_backup in backups:
124 if old_backup >= max_count - 1:
125 os.remove(self.source + "." + str(old_backup))
126 elif not os.path.exists(
127 self.source + "." + str(old_backup + 1)
130 self.source + "." + str(old_backup),
131 self.source + "." + str(old_backup + 1)
133 if not os.path.exists(self.source + ".0"):
134 os.rename(self.source, self.source + ".0")
137 if "private" in self.flags:
138 old_umask = os.umask(private_umask)
139 file_descriptor = open(self.source, "w")
140 if oct(stat.S_IMODE(os.stat(
141 self.source)[stat.ST_MODE])) != private_file_mode:
142 # if it's marked private, chmod it appropriately
143 os.chmod(self.source, private_file_mode)
145 old_umask = os.umask(normal_umask)
146 file_descriptor = open(self.source, "w")
149 # write and close the file
150 yaml.safe_dump(self.data, allow_unicode=True,
151 default_flow_style=False, stream=file_descriptor)
152 file_descriptor.close()
154 # unset the modified flag
155 self.modified = False
157 def is_writeable(self):
158 """Returns True if the _lock is False."""
160 return not self.data.get("_lock", False)
174 """Return an absolute file path based on configuration."""
176 # this is all unnecessary if it's already absolute
177 if file_name and os.path.isabs(file_name):
178 return os.path.realpath(file_name)
180 # if a universe was provided, try to get some defaults from there
184 universe, "contents") and "mudpy.filing" in universe.contents:
185 filing = universe.contents["mudpy.filing"]
187 prefix = filing.get("prefix")
189 search = filing.get("search")
191 stash = filing.get("stash")
193 # if there's only one file loaded, try to work around a chicken<egg
194 elif hasattr(universe, "files") and len(
196 ) == 1 and not universe.files[
197 list(universe.files.keys())[0]].is_writeable():
198 data_file = universe.files[list(universe.files.keys())[0]].data
200 # try for a fallback default directory
202 stash = data_file.get(".mudpy.filing.stash", "")
204 # try for a fallback root path
206 prefix = data_file.get(".mudpy.filing.prefix", "")
208 # try for a fallback search path
210 search = data_file.get(".mudpy.filing.search", "")
212 # another fallback root path, this time from the universe startdir
213 if hasattr(universe, "startdir"):
215 prefix = universe.startdir
216 elif not os.path.isabs(prefix):
217 prefix = os.path.join(universe.startdir, prefix)
219 # when no root path is specified, assume the current working directory
220 if (not prefix or prefix == ".") and hasattr(universe, "startdir"):
221 prefix = universe.startdir
223 # make sure it's absolute
224 prefix = os.path.realpath(prefix)
226 # if there's no search path, just use the root path and etc
228 search = [prefix, "etc"]
230 # work on a copy of the search path, to avoid modifying the caller's
234 # if there's no default path, use the last component of the search path
238 # if an existing file or directory reference was supplied, prepend it
240 if os.path.isdir(relative):
241 search = [relative] + search
243 search = [os.path.dirname(relative)] + search
245 # make the search path entries absolute and throw away any dupes
247 for each_path in search:
248 if not os.path.isabs(each_path):
249 each_path = os.path.realpath(os.path.join(prefix, each_path))
250 if each_path not in clean_search:
251 clean_search.append(each_path)
253 # start hunting for the file now
254 for each_path in clean_search:
256 # construct the candidate path
257 candidate = os.path.join(each_path, file_name)
259 # if the file exists and is readable, we're done
260 if os.path.isfile(candidate):
261 file_name = os.path.realpath(candidate)
264 # if the path is a directory, look for an __init__ file
265 if os.path.isdir(candidate):
266 file_name = os.path.realpath(
267 os.path.join(candidate, "__init__.yaml"))
270 # it didn't exist after all, so use the default path instead
271 if not os.path.isabs(file_name):
272 file_name = os.path.join(stash, file_name)
273 if not os.path.isabs(file_name):
274 file_name = os.path.join(prefix, file_name)
276 # and normalize it last thing before returning
277 file_name = os.path.realpath(file_name)