Fail selftest if anything is written to stderr
[mudpy.git] / mudpy / data.py
1 """Data interface functions for the mudpy engine."""
2
3 # Copyright (c) 2004-2021 mudpy authors. Permission to use, copy,
4 # modify, and distribute this software is granted under terms
5 # provided in the LICENSE file distributed with this software.
6
7 import os
8 import re
9 import stat
10
11 import mudpy
12 import yaml
13
14
15 class _IBSEmitter(yaml.emitter.Emitter):
16
17     """Override the default YAML Emitter to indent block sequences."""
18
19     def expect_block_sequence(self):
20         """Match the expectations of the ``yamllint`` style checker."""
21
22         # TODO(fungi) Get an option for this implemented upstream in
23         # the pyyaml library
24         self.increase_indent(flow=False, indentless=False)
25         self.state = self.expect_first_block_sequence_item
26
27
28 class _IBSDumper(yaml.SafeDumper, _IBSEmitter):
29
30     """Use our _IBSEmitter instead of the default implementation."""
31
32     pass
33
34
35 class Data:
36
37     """A file containing universe elements and their facets."""
38
39     def __init__(self,
40                  source,
41                  universe,
42                  flags=None,
43                  relative=None,
44                  ):
45         self.source = source
46         self.universe = universe
47         if flags is None:
48             self.flags = []
49         else:
50             self.flags = flags[:]
51         self.relative = relative
52         self.load()
53
54     def load(self):
55         """Read a file, create elements and poplulate facets accordingly."""
56         self.modified = False
57         self.source = find_file(
58                 self.source, relative=self.relative, universe=self.universe)
59         try:
60             with open(self.source) as datafd:
61                 self.data = yaml.safe_load(datafd)
62             log_entry = ("Loaded file %s into memory." % self.source, 5)
63         except FileNotFoundError:
64             # it's normal if the file is one which doesn't exist yet
65             self.data = {}
66             log_entry = ("File %s is unavailable." % self.source, 6)
67         try:
68             mudpy.misc.log(*log_entry)
69         except NameError:
70             # happens when we're not far enough along in the init process
71             self.universe.setup_loglines.append(log_entry)
72         if not hasattr(self.universe, "files"):
73             self.universe.files = {}
74         self.universe.files[self.source] = self
75         includes = []
76         for node in list(self.data):
77             if node == "_load":
78                 includes += self.data["_load"]
79                 continue
80             if node.startswith("_"):
81                 continue
82             facet_pos = node.rfind(".") + 1
83             prefix = node[:facet_pos].strip(".")
84             try:
85                 element = self.universe.contents[prefix]
86             except KeyError:
87                 element = mudpy.misc.Element(prefix, self.universe, self)
88             element.set(node[facet_pos:], self.data[node])
89             if prefix.startswith("mudpy.movement."):
90                 self.universe.directions.add(
91                     prefix[prefix.rfind(".") + 1:])
92         for include_file in includes:
93             if not os.path.isabs(include_file):
94                 include_file = find_file(
95                     include_file,
96                     relative=self.source,
97                     universe=self.universe
98                 )
99             if (include_file not in self.universe.files or not
100                     self.universe.files[include_file].is_writeable()):
101                 Data(include_file, self.universe)
102
103     def save(self):
104         """Write the data, if necessary."""
105         normal_umask = 0o0022
106         private_umask = 0o0077
107         private_file_mode = 0o0600
108
109         # when modified, writeable and has content or the file exists
110         if self.modified and self.is_writeable() and (
111            self.data or os.path.exists(self.source)
112            ):
113
114             # make parent directories if necessary
115             old_umask = os.umask(normal_umask)
116             os.makedirs(os.path.dirname(self.source), exist_ok=True)
117             os.umask(old_umask)
118
119             # backup the file
120             if "mudpy.limit" in self.universe.contents:
121                 max_count = self.universe.contents["mudpy.limit"].get(
122                     "backups", 0)
123             else:
124                 max_count = 0
125             if os.path.exists(self.source) and max_count:
126                 backups = []
127                 for candidate in os.listdir(os.path.dirname(self.source)):
128                     if re.match(
129                        os.path.basename(self.source) +
130                        r"""\.\d+$""", candidate
131                        ):
132                         backups.append(int(candidate.split(".")[-1]))
133                 backups.sort()
134                 backups.reverse()
135                 for old_backup in backups:
136                     if old_backup >= max_count - 1:
137                         os.remove(self.source + "." + str(old_backup))
138                     elif not os.path.exists(
139                         self.source + "." + str(old_backup + 1)
140                     ):
141                         os.rename(
142                             self.source + "." + str(old_backup),
143                             self.source + "." + str(old_backup + 1)
144                         )
145                 if not os.path.exists(self.source + ".0"):
146                     os.rename(self.source, self.source + ".0")
147
148             # our data file
149             if "private" in self.flags:
150                 old_umask = os.umask(private_umask)
151                 file_descriptor = open(self.source, "w")
152                 if oct(stat.S_IMODE(os.stat(
153                         self.source)[stat.ST_MODE])) != private_file_mode:
154                     # if it's marked private, chmod it appropriately
155                     os.chmod(self.source, private_file_mode)
156             else:
157                 old_umask = os.umask(normal_umask)
158                 file_descriptor = open(self.source, "w")
159             os.umask(old_umask)
160
161             # write and close the file
162             yaml.dump(self.data, Dumper=_IBSDumper, allow_unicode=True,
163                       default_flow_style=False, explicit_start=True, indent=4,
164                       stream=file_descriptor)
165             file_descriptor.close()
166
167             # unset the modified flag
168             self.modified = False
169
170     def is_writeable(self):
171         """Returns True if the _lock is False."""
172         try:
173             return not self.data.get("_lock", False)
174         except KeyError:
175             return True
176
177
178 def find_file(
179     file_name=None,
180     group=None,
181     prefix=None,
182     relative=None,
183     search=None,
184     stash=None,
185     universe=None
186 ):
187     """Return an absolute file path based on configuration."""
188
189     # this is all unnecessary if it's already absolute
190     if file_name and os.path.isabs(file_name):
191         return os.path.realpath(file_name)
192
193     # if a universe was provided, try to get some defaults from there
194     if universe:
195
196         if hasattr(
197                 universe, "contents") and "mudpy.filing" in universe.contents:
198             filing = universe.contents["mudpy.filing"]
199             if not prefix:
200                 prefix = filing.get("prefix")
201             if not search:
202                 search = filing.get("search")
203             if not stash:
204                 stash = filing.get("stash")
205
206         # if there's only one file loaded, try to work around a chicken<egg
207         elif hasattr(universe, "files") and len(
208             universe.files
209         ) == 1 and not universe.files[
210                 list(universe.files.keys())[0]].is_writeable():
211             data_file = universe.files[list(universe.files.keys())[0]].data
212
213             # try for a fallback default directory
214             if not stash:
215                 stash = data_file.get(".mudpy.filing.stash", "")
216
217             # try for a fallback root path
218             if not prefix:
219                 prefix = data_file.get(".mudpy.filing.prefix", "")
220
221             # try for a fallback search path
222             if not search:
223                 search = data_file.get(".mudpy.filing.search", "")
224
225         # another fallback root path, this time from the universe startdir
226         if hasattr(universe, "startdir"):
227             if not prefix:
228                 prefix = universe.startdir
229             elif not os.path.isabs(prefix):
230                 prefix = os.path.join(universe.startdir, prefix)
231
232     # when no root path is specified, assume the current working directory
233     if (not prefix or prefix == ".") and hasattr(universe, "startdir"):
234         prefix = universe.startdir
235
236     # make sure it's absolute
237     prefix = os.path.realpath(prefix)
238
239     # if there's no search path, just use the root path and etc
240     if not search:
241         search = [prefix, "etc"]
242
243     # work on a copy of the search path, to avoid modifying the caller's
244     else:
245         search = search[:]
246
247     # if there's no default path, use the last component of the search path
248     if not stash:
249         stash = search[-1]
250
251     # if an existing file or directory reference was supplied, prepend it
252     if relative:
253         if os.path.isdir(relative):
254             search = [relative] + search
255         else:
256             search = [os.path.dirname(relative)] + search
257
258     # make the search path entries absolute and throw away any dupes
259     clean_search = []
260     for each_path in search:
261         if not os.path.isabs(each_path):
262             each_path = os.path.realpath(os.path.join(prefix, each_path))
263         if each_path not in clean_search:
264             clean_search.append(each_path)
265
266     # start hunting for the file now
267     for each_path in clean_search:
268
269         # construct the candidate path
270         candidate = os.path.join(each_path, file_name)
271
272         # if the file exists and is readable, we're done
273         if os.path.isfile(candidate):
274             file_name = os.path.realpath(candidate)
275             break
276
277         # if the path is a directory, look for an __init__ file
278         if os.path.isdir(candidate):
279             file_name = os.path.realpath(
280                     os.path.join(candidate, "__init__.yaml"))
281             break
282
283     # it didn't exist after all, so use the default path instead
284     if not os.path.isabs(file_name):
285         file_name = os.path.join(stash, file_name)
286     if not os.path.isabs(file_name):
287         file_name = os.path.join(prefix, file_name)
288
289     # and normalize it last thing before returning
290     file_name = os.path.realpath(file_name)
291     return file_name