1 """Miscellaneous objects for the MUFF Engine"""
3 # Copyright (c) 2005 MUDpy, The Fungi <fungi@yuggoth.org>, all rights reserved.
4 # Licensed per terms in the LICENSE file distributed with this software.
8 # used by several functions for random calls
11 # random_name uses string.strip
14 # the log function uses time.asctime for creating timestamps
17 # hack to load all modules in the muff package
19 for module in muff.__all__:
20 exec("import " + module)
22 def broadcast(message):
23 """Send a message to all connected users."""
24 for each_user in muffvars.userlist: each_user.send("$(eol)" + message)
29 # the time in posix log timestamp format
30 timestamp = time.asctime()[4:19]
32 # send the timestamp and message to standard output
33 print(timestamp + " " + message)
35 def wrap_ansi_text(text, width):
36 """Wrap text with arbitrary width while ignoring ANSI colors."""
38 # the current position in the entire text string, including all
39 # characters, printable or otherwise
42 # the current text position relative to the begining of the line,
43 # ignoring color escape sequences
46 # whether the current character is part of a color escape sequence
49 # iterate over each character from the begining of the text
50 for each_character in text:
52 # the current character is the escape character
53 if each_character == chr(27):
56 # the current character is within an escape sequence
59 # the current character is m, which terminates the
60 # current escape sequence
61 if each_character == "m":
64 # the current character is a newline, so reset the relative
65 # position (start a new line)
66 elif each_character == "\n":
69 # the current character meets the requested maximum line width,
70 # so we need to backtrack and find a space at which to wrap
71 elif relative_position == width:
73 # distance of the current character examined from the
77 # count backwards until we find a space
78 while text[absolute_position - wrap_offset] != " ":
81 # insert an eol in place of the space
82 text = text[:absolute_position - wrap_offset] + "\r\n" + text[absolute_position - wrap_offset + 1:]
84 # increase the absolute position because an eol is two
85 # characters but the space it replaced was only one
86 absolute_position += 1
88 # now we're at the begining of a new line, plus the
89 # number of characters wrapped from the previous line
90 relative_position = wrap_offset
92 # as long as the character is not a carriage return and the
93 # other above conditions haven't been met, count it as a
95 elif each_character != "\r":
96 relative_position += 1
98 # increase the absolute position for every character
99 absolute_position += 1
101 # return the newly-wrapped text
104 def weighted_choice(data):
105 """Takes a dict weighted by value and returns a random key."""
107 # this will hold our expanded list of keys from the data
110 # create thee expanded list of keys
111 for key in data.keys():
112 for count in range(data[key]):
115 # return one at random
116 return random.choice(expanded)
119 """Returns a random character name."""
121 # the vowels and consonants needed to create romaji syllables
122 vowels = [ "a", "i", "u", "e", "o" ]
123 consonants = ["'", "k", "z", "s", "sh", "z", "j", "t", "ch", "ts", "d", "n", "h", "f", "m", "y", "r", "w" ]
125 # this dict will hold our weighted list of syllables
128 # generate the list with an even weighting
129 for consonant in consonants:
131 syllables[consonant + vowel] = 1
133 # we'll build the name into this string
136 # create a name of random length from the syllables
137 for syllable in range(random.randrange(2, 6)):
138 name += weighted_choice(syllables)
140 # strip any leading quotemark, capitalize and return the name
141 return string.strip(name, "'").capitalize()
143 def replace_macros(user, text, is_input=False):
144 """Replaces macros in text output."""
146 macro_start = string.find(text, "$(")
147 if macro_start == -1: break
148 macro_end = string.find(text, ")", macro_start) + 1
149 macro = text[macro_start:macro_end]
150 if macro in muffvars.macros.keys():
151 text = string.replace(text, macro, muffvars.macros[macro])
153 # the user's account name
154 elif macro == "$(account)":
155 text = string.replace(text, macro, user.name)
157 # third person subjective pronoun
158 elif macro == "$(tpsp)":
159 if user.avatar.get("gender") == "male":
160 text = string.replace(text, macro, "he")
161 elif user.avatar.get("gender") == "female":
162 text = string.replace(text, macro, "she")
164 text = string.replace(text, macro, "it")
166 # third person objective pronoun
167 elif macro == "$(tpop)":
168 if user.avatar.get("gender") == "male":
169 text = string.replace(text, macro, "him")
170 elif user.avatar.get("gender") == "female":
171 text = string.replace(text, macro, "her")
173 text = string.replace(text, macro, "it")
175 # third person possessive pronoun
176 elif macro == "$(tppp)":
177 if user.avatar.get("gender") == "male":
178 text = string.replace(text, macro, "his")
179 elif user.avatar.get("gender") == "female":
180 text = string.replace(text, macro, "hers")
182 text = string.replace(text, macro, "its")
184 # if we get here, log and replace it with null
186 text = string.replace(text, macro, "")
188 log("Unexpected replacement macro " + macro + " encountered.")
190 # replace the look-like-a-macro sequence
191 text = string.replace(text, "$_(", "$(")
195 def check_time(frequency):
196 """Check for a factor of the current increment count."""
197 if type(frequency) is str:
198 frequency = muffconf.getint("time", frequency)
199 return not muffuniv.universe.internals["counters"].getint("elapsed") % frequency
202 """The things which should happen on each pulse, aside from reloads."""
204 # open the listening socket if it hasn't been already
205 if not muffvars.newsocket: muffsock.initialize_server_socket()
207 # assign a user if a new connection is waiting
208 user = muffsock.check_for_connection(muffvars.newsocket)
209 if user: muffvars.userlist.append(user)
211 # iterate over the connected users
212 for user in muffvars.userlist: user.pulse()
214 # update the log every now and then
215 if check_time("frequency_log"):
216 log(repr(len(muffvars.userlist)) + " connection(s)")
218 # periodically save everything
219 if check_time("frequency_save"):
220 for user in muffvars.userlist: user.save()
221 muffuniv.universe.save()
223 # pause for a configurable amount of time (decimal seconds)
224 time.sleep(muffconf.getfloat("time", "increment"))
226 # increment the elapsed increment counter
227 muffuniv.universe.internals["counters"].set("elapsed", muffuniv.universe.internals["counters"].getint("elapsed") + 1)
230 """Reload data into new persistent objects."""
233 temporary_userlist = []
234 for user in muffvars.userlist: temporary_userlist.append(user)
235 for user in temporary_userlist: user.reload()
236 del(temporary_userlist)