Update the copyright year in the LICENSE file
[mudpy.git] / mudpy / data.py
1 """Data interface functions for the mudpy engine."""
2
3 # Copyright (c) 2004-2018 mudpy authors. Permission to use, copy,
4 # modify, and distribute this software is granted under terms
5 # provided in the LICENSE file distributed with this software.
6
7 import os
8 import re
9 import stat
10
11 import mudpy
12 import yaml
13
14
class _IBSEmitter(yaml.emitter.Emitter):

    """YAML Emitter variant which indents the items of block sequences."""

    def expect_block_sequence(self):
        """Start a block sequence, never using the indentless form.

        The resulting layout is what the ``yamllint`` style checker
        expects.
        """

        # TODO(fungi) Get an option for this implemented upstream in
        # the pyyaml library
        self.increase_indent(indentless=False, flow=False)
        self.state = self.expect_first_block_sequence_item
26
27
class _IBSDumper(yaml.SafeDumper, _IBSEmitter):

    """A ``yaml.SafeDumper`` which also inherits from our _IBSEmitter.

    Nothing is added here; the class exists only to combine its bases.
    """
33
34
class Data:

    """A file containing universe elements and their facets."""

    def __init__(self,
                 source,
                 universe,
                 flags=None,
                 relative=None,
                 ):
        """Record where the data lives, then load it right away.

        :param source: file name or path; resolved with find_file()
            each time load() runs
        :param universe: the universe object this file belongs to
        :param flags: optional list of flag strings (save() checks for
            ``"private"``); the list is copied, not kept by reference
        :param relative: optional file or directory path handed to
            find_file() as its ``relative`` argument
        """
        self.source = source
        self.universe = universe
        # copy so later changes to the caller's list don't show up here
        self.flags = [] if flags is None else flags[:]
        self.relative = relative
        self.load()

    def load(self):
        """Read a file, create elements and populate facets accordingly.

        A missing file is not an error: the data simply starts out
        empty. Every file named under the ``_load`` key is loaded as
        well, unless it is already loaded and writeable.
        """
        self.modified = False
        self.source = find_file(
                self.source, relative=self.relative, universe=self.universe)
        try:
            # the context manager closes the handle even if parsing fails
            with open(self.source) as source_file:
                self.data = yaml.safe_load(source_file)
            log_entry = ("Loaded file %s into memory." % self.source, 5)
        except FileNotFoundError:
            # it's normal if the file is one which doesn't exist yet
            self.data = {}
            log_entry = ("File %s is unavailable." % self.source, 6)
        if self.data is None:
            # an empty YAML document parses to None; treat it as no data
            self.data = {}
        try:
            mudpy.misc.log(*log_entry)
        except NameError:
            # happens when we're not far enough along in the init process
            self.universe.setup_loglines.append(log_entry)
        if not hasattr(self.universe, "files"):
            self.universe.files = {}
        self.universe.files[self.source] = self
        includes = []
        # iterate over a copy of the keys in case setting a facet
        # changes self.data along the way
        for node in list(self.data):
            if node == "_load":
                includes += self.data["_load"]
                continue
            if node.startswith("_"):
                continue
            # everything before the last dot names the element, the
            # remainder names the facet
            facet_pos = node.rfind(".") + 1
            prefix = node[:facet_pos].strip(".")
            try:
                element = self.universe.contents[prefix]
            except KeyError:
                element = mudpy.misc.Element(prefix, self.universe, self)
            element.set(node[facet_pos:], self.data[node])
            if prefix.startswith("mudpy.movement."):
                self.universe.directions.add(
                    prefix[prefix.rfind(".") + 1:])
        for include_file in includes:
            if not os.path.isabs(include_file):
                include_file = find_file(
                    include_file,
                    relative=self.source,
                    universe=self.universe
                )
            if (include_file not in self.universe.files or not
                    self.universe.files[include_file].is_writeable()):
                # constructing a Data object loads and registers the file
                Data(include_file, self.universe)

    def save(self):
        """Write the data, if necessary.

        Nothing happens unless the data was modified, is writeable, and
        either has content or already exists on disk. Existing files
        are rotated into numbered backups first when the
        ``mudpy.limit`` element provides a nonzero ``backups`` value.
        """
        normal_umask = 0o0022
        private_umask = 0o0077
        private_file_mode = 0o0600

        # when modified, writeable and has content or the file exists
        if not (self.modified and self.is_writeable() and (
                self.data or os.path.exists(self.source))):
            return

        # make parent directories if necessary
        old_umask = os.umask(normal_umask)
        try:
            os.makedirs(os.path.dirname(self.source), exist_ok=True)
        finally:
            # never leave the process with a changed umask
            os.umask(old_umask)

        # backup the file
        if "mudpy.limit" in self.universe.contents:
            max_count = self.universe.contents["mudpy.limit"].get(
                "backups", 0)
        else:
            max_count = 0
        if max_count and os.path.exists(self.source):
            self._rotate_backups(max_count)

        # our data file
        private = "private" in self.flags
        old_umask = os.umask(private_umask if private else normal_umask)
        try:
            file_descriptor = open(self.source, "w")
        finally:
            os.umask(old_umask)

        # write and close the file
        with file_descriptor:
            # if it's marked private, chmod it appropriately; the umask
            # has no effect on a file which already existed
            if private and stat.S_IMODE(
                    os.stat(self.source).st_mode) != private_file_mode:
                os.chmod(self.source, private_file_mode)
            yaml.dump(self.data, Dumper=_IBSDumper, allow_unicode=True,
                      default_flow_style=False, explicit_start=True, indent=4,
                      stream=file_descriptor)

        # unset the modified flag
        self.modified = False

    def _rotate_backups(self, max_count):
        """Age the numbered backups, then move the file itself to ``.0``.

        Backups numbered ``max_count - 1`` or higher are deleted and
        the remaining ones are each renamed one number higher.
        """
        # escape the name so its dots only match literal dots, and only
        # accept canonical decimal suffixes so that the paths rebuilt
        # from the parsed numbers are certain to name the files found
        numbered = re.compile(
            re.escape(os.path.basename(self.source)) + r"\.(0|[1-9][0-9]*)")
        backups = []
        for candidate in os.listdir(os.path.dirname(self.source)):
            found = numbered.fullmatch(candidate)
            if found:
                backups.append(int(found.group(1)))

        # work from the oldest down so each rename finds its target free
        for old_backup in sorted(backups, reverse=True):
            old_path = "%s.%d" % (self.source, old_backup)
            new_path = "%s.%d" % (self.source, old_backup + 1)
            if old_backup >= max_count - 1:
                os.remove(old_path)
            elif not os.path.exists(new_path):
                os.rename(old_path, new_path)
        if not os.path.exists(self.source + ".0"):
            os.rename(self.source, self.source + ".0")

    def is_writeable(self):
        """Return True unless the data has a true ``_lock`` value."""
        return not self.data.get("_lock", False)
175
176
def find_file(
    file_name=None,
    group=None,
    prefix=None,
    relative=None,
    search=None,
    stash=None,
    universe=None
):
    """Return an absolute file path based on configuration.

    :param file_name: name or path of the file to locate; despite the
        default, a relative lookup needs a string here because it is
        joined onto each search directory
    :param group: accepted but not used by this function
    :param prefix: root directory against which relative search and
        stash paths are resolved
    :param relative: existing file or directory; its directory (or the
        directory itself) is searched before anything else
    :param search: list of directories to look in, in order; it is
        copied, never modified
    :param stash: directory used for the result when the file is not
        found anywhere on the search path
    :param universe: optional object supplying defaults through its
        ``contents``, ``files`` and ``startdir`` attributes
    :returns: the normalized absolute path, whether or not anything
        exists there
    """

    # this is all unnecessary if it's already absolute
    if file_name and os.path.isabs(file_name):
        return os.path.realpath(file_name)

    # if a universe was provided, try to get some defaults from there
    if universe:

        if hasattr(
                universe, "contents") and "mudpy.filing" in universe.contents:
            filing = universe.contents["mudpy.filing"]
            if not prefix:
                prefix = filing.get("prefix")
            if not search:
                search = filing.get("search")
            if not stash:
                stash = filing.get("stash")

        # if there's only one file loaded, try to work around a chicken<egg
        elif hasattr(universe, "files") and len(universe.files) == 1:
            only_file = next(iter(universe.files.values()))
            if not only_file.is_writeable():
                data_file = only_file.data

                # try for a fallback default directory
                if not stash:
                    stash = data_file.get(".mudpy.filing.stash", "")

                # try for a fallback root path
                if not prefix:
                    prefix = data_file.get(".mudpy.filing.prefix", "")

                # try for a fallback search path
                if not search:
                    search = data_file.get(".mudpy.filing.search", "")

        # another fallback root path, this time from the universe startdir
        if hasattr(universe, "startdir"):
            if not prefix:
                prefix = universe.startdir
            elif not os.path.isabs(prefix):
                prefix = os.path.join(universe.startdir, prefix)

    # when no root path is specified, prefer the universe startdir
    if (not prefix or prefix == ".") and hasattr(universe, "startdir"):
        prefix = universe.startdir

    # failing that, assume the current working directory; without this
    # a prefix of None would make realpath() raise a TypeError
    if not prefix:
        prefix = os.getcwd()

    # make sure it's absolute
    prefix = os.path.realpath(prefix)

    # if there's no search path, just use the root path and etc
    if not search:
        search = [prefix, "etc"]

    # work on a copy of the search path, to avoid modifying the caller's
    else:
        search = search[:]

    # if there's no default path, use the last component of the search path
    if not stash:
        stash = search[-1]

    # if an existing file or directory reference was supplied, prepend it
    if relative:
        if os.path.isdir(relative):
            search = [relative] + search
        else:
            search = [os.path.dirname(relative)] + search

    # make the search path entries absolute and throw away any dupes
    clean_search = []
    for each_path in search:
        if not os.path.isabs(each_path):
            each_path = os.path.realpath(os.path.join(prefix, each_path))
        if each_path not in clean_search:
            clean_search.append(each_path)

    # start hunting for the file now
    for each_path in clean_search:

        # construct the candidate path
        candidate = os.path.join(each_path, file_name)

        # if the file exists, we're done
        if os.path.isfile(candidate):
            file_name = os.path.realpath(candidate)
            break

        # if the path is a directory, look for an __init__ file
        if os.path.isdir(candidate):
            file_name = os.path.realpath(
                    os.path.join(candidate, "__init__.yaml"))
            break

    # it didn't exist after all, so use the default path instead
    if not os.path.isabs(file_name):
        file_name = os.path.join(stash, file_name)
    if not os.path.isabs(file_name):
        file_name = os.path.join(prefix, file_name)

    # and normalize it last thing before returning
    return os.path.realpath(file_name)