Group all imports at the tops of files and alphabetize, for ease of
maintainabiity.
# -*- coding: utf-8 -*-
"""Generates potential ChangeLog file contents from commit messages."""
# -*- coding: utf-8 -*-
"""Generates potential ChangeLog file contents from commit messages."""
-# Copyright (c) 2010-2012 Jeremy Stanley <fungi@yuggoth.org>. Permission
+# Copyright (c) 2010-2013 Jeremy Stanley <fungi@yuggoth.org>. Permission
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
-# needs GitPython: http://gitorious.org/git-python
-import git
copyright = """\
Copyright (c) 2004-2012 Jeremy Stanley <fungi@yuggoth.org>. Permission to
use, copy, modify, and distribute this software is granted under terms
copyright = """\
Copyright (c) 2004-2012 Jeremy Stanley <fungi@yuggoth.org>. Permission to
use, copy, modify, and distribute this software is granted under terms
# terms provided in the LICENSE file distributed with this software.
# core objects for the mudpy engine
# terms provided in the LICENSE file distributed with this software.
# core objects for the mudpy engine
import sys
sys.path.append( os.path.realpath("lib") )
import sys
sys.path.append( os.path.realpath("lib") )
import mudpy
# start it up
import mudpy
# start it up
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
conversation = (
("Identify yourself:", "testuser"),
("Enter your choice:", "n"),
conversation = (
("Identify yourself:", "testuser"),
("Enter your choice:", "n"),
("Disconnecting...", ""),
)
("Disconnecting...", ""),
)
mud = telnetlib.Telnet()
mud.open("::1", 6669)
for question, answer in conversation:
mud = telnetlib.Telnet()
mud.open("::1", 6669)
for question, answer in conversation:
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
+import imp
+
+import mudpy
+
def load():
"""Import/reload some modules (be careful, as this can result in loops)."""
def load():
"""Import/reload some modules (be careful, as this can result in loops)."""
# pick up the modules list from this package
global modules
# pick up the modules list from this package
global modules
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
+import codecs
+import os
+import re
+import stat
+import sys
+
+# TODO: remove this check after the switch to py3k
+try:
+ import configparser
+except ImportError:
+ import ConfigParser as configparser
+
+import mudpy
+
def load(self):
"""Read a file and create elements accordingly."""
def load(self):
"""Read a file and create elements accordingly."""
- import mudpy.misc
- import os
- import os.path
- # TODO: remove this check after the switch to py3k
- try:
- import configparser
- except ImportError:
- import ConfigParser as configparser
self.data = configparser.RawConfigParser()
self.modified = False
if os.access(self.filename, os.R_OK):
self.data = configparser.RawConfigParser()
self.modified = False
if os.access(self.filename, os.R_OK):
def save(self):
"""Write the data, if necessary."""
def save(self):
"""Write the data, if necessary."""
- import codecs
- import os
- import os.path
- import re
- import stat
# when modified, writeable and has content or the file exists
if self.modified and self.is_writeable() and (
# when modified, writeable and has content or the file exists
if self.modified and self.is_writeable() and (
universe=None
):
"""Return an absolute file path based on configuration."""
universe=None
):
"""Return an absolute file path based on configuration."""
- import os
- import os.path
- import sys
# make sure to get rid of any surrounding quotes first thing
if file_name:
# make sure to get rid of any surrounding quotes first thing
if file_name:
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
+import codecs
+import ctypes
+import ctypes.util
+import os
+import random
+import re
+import signal
+import socket
+import sys
+import syslog
+import time
+import traceback
+import unicodedata
+
+import mudpy
+
def __init__(self, key, universe, filename=None):
"""Set up a new element."""
def __init__(self, key, universe, filename=None):
"""Set up a new element."""
- import mudpy.data
- import os.path
# keep track of our key name
self.key = key
# keep track of our key name
self.key = key
def getlist(self, facet, default=None):
"""Return values as list type."""
def getlist(self, facet, default=None):
"""Return values as list type."""
if default is None:
default = []
value = self.get(facet)
if default is None:
default = []
value = self.get(facet)
def getdict(self, facet, default=None):
"""Return values as dict type."""
def getdict(self, facet, default=None):
"""Return values as dict type."""
if default is None:
default = {}
value = self.get(facet)
if default is None:
default = {}
value = self.get(facet)
def portals(self):
"""Map the portal directions for a room to neighbors."""
def portals(self):
"""Map the portal directions for a room to neighbors."""
portals = {}
if re.match("""^location:-?\d+,-?\d+,-?\d+$""", self.key):
coordinates = [(int(x))
portals = {}
if re.match("""^location:-?\d+,-?\d+,-?\d+$""", self.key):
coordinates = [(int(x))
def __init__(self, filename="", load=False):
"""Initialize the universe."""
def __init__(self, filename="", load=False):
"""Initialize the universe."""
- import os
- import os.path
self.categories = {}
self.contents = {}
self.default_origins = {}
self.categories = {}
self.contents = {}
self.default_origins = {}
def load(self):
"""Load universe data from persistent storage."""
def load(self):
"""Load universe data from persistent storage."""
# the files dict must exist and filename needs to be read-only
if not hasattr(
# the files dict must exist and filename needs to be read-only
if not hasattr(
def initialize_server_socket(self):
"""Create and open the listening socket."""
def initialize_server_socket(self):
"""Create and open the listening socket."""
# need to know the local address and port number for the listener
host = self.categories["internal"]["network"].get("host")
# need to know the local address and port number for the listener
host = self.categories["internal"]["network"].get("host")
def __init__(self):
"""Default values for the in-memory user variables."""
def __init__(self):
"""Default values for the in-memory user variables."""
self.account = None
self.address = ""
self.authenticated = False
self.account = None
self.address = ""
self.authenticated = False
def adjust_echoing(self):
"""Adjust echoing to match state menu requirements."""
def adjust_echoing(self):
"""Adjust echoing to match state menu requirements."""
if mudpy.telnet.is_enabled(self, mudpy.telnet.TELOPT_ECHO,
mudpy.telnet.US):
if menu_echo_on(self.state):
if mudpy.telnet.is_enabled(self, mudpy.telnet.TELOPT_ECHO,
mudpy.telnet.US):
if menu_echo_on(self.state):
prepend_padding=True
):
"""Send arbitrary text to a connected user."""
prepend_padding=True
):
"""Send arbitrary text to a connected user."""
# unless raw mode is on, clean it up all nice and pretty
if not raw:
# unless raw mode is on, clean it up all nice and pretty
if not raw:
def enqueue_input(self):
"""Process and enqueue any new input."""
def enqueue_input(self):
"""Process and enqueue any new input."""
- import mudpy.telnet
- import unicodedata
# check for some input
try:
# check for some input
try:
def log(message, level=0):
"""Log a message."""
def log(message, level=0):
"""Log a message."""
- import codecs
- import os.path
- import syslog
- import time
# a couple references we need
file_name = universe.categories["internal"]["logging"].get("file")
# a couple references we need
file_name = universe.categories["internal"]["logging"].get("file")
def glyph_columns(character):
"""Convenience function to return the column width of a glyph."""
def glyph_columns(character):
"""Convenience function to return the column width of a glyph."""
if unicodedata.east_asian_width(character) in "FW":
return 2
else:
if unicodedata.east_asian_width(character) in "FW":
return 2
else:
def wrap_ansi_text(text, width):
"""Wrap text with arbitrary width while ignoring ANSI colors."""
def wrap_ansi_text(text, width):
"""Wrap text with arbitrary width while ignoring ANSI colors."""
# the current position in the entire text string, including all
# characters, printable or otherwise
# the current position in the entire text string, including all
# characters, printable or otherwise
def weighted_choice(data):
"""Takes a dict weighted by value and returns a random key."""
def weighted_choice(data):
"""Takes a dict weighted by value and returns a random key."""
# this will hold our expanded list of keys from the data
expanded = []
# this will hold our expanded list of keys from the data
expanded = []
def random_name():
"""Returns a random character name."""
def random_name():
"""Returns a random character name."""
# the vowels and consonants needed to create romaji syllables
vowels = [
# the vowels and consonants needed to create romaji syllables
vowels = [
def replace_macros(user, text, is_input=False):
"""Replaces macros in text output."""
def replace_macros(user, text, is_input=False):
"""Replaces macros in text output."""
- import codecs
- import mudpy.data
- import os.path
# third person pronouns
pronouns = {
# third person pronouns
pronouns = {
def on_pulse():
"""The things which should happen on each pulse, aside from reloads."""
def on_pulse():
"""The things which should happen on each pulse, aside from reloads."""
# open the listening socket if it hasn't been already
if not hasattr(universe, "listening_socket"):
# open the listening socket if it hasn't been already
if not hasattr(universe, "listening_socket"):
def check_for_connection(listening_socket):
"""Check for a waiting connection and return a new user object."""
def check_for_connection(listening_socket):
"""Check for a waiting connection and return a new user object."""
# try to accept a new connection
try:
# try to accept a new connection
try:
def handle_user_input(user):
"""The main handler, branches to a state-specific handler."""
def handle_user_input(user):
"""The main handler, branches to a state-specific handler."""
# if the user's client echo is off, send a blank line for aesthetics
if mudpy.telnet.is_enabled(user, mudpy.telnet.TELOPT_ECHO,
# if the user's client echo is off, send a blank line for aesthetics
if mudpy.telnet.is_enabled(user, mudpy.telnet.TELOPT_ECHO,
def handler_checking_password(user):
"""Handle the login account password."""
def handler_checking_password(user):
"""Handle the login account password."""
# get the next waiting line of input
input_data = user.input_queue.pop(0)
# get the next waiting line of input
input_data = user.input_queue.pop(0)
def handler_entering_new_password(user):
"""Handle a new password entry."""
def handler_entering_new_password(user):
"""Handle a new password entry."""
# get the next waiting line of input
input_data = user.input_queue.pop(0)
# get the next waiting line of input
input_data = user.input_queue.pop(0)
def handler_verifying_new_password(user):
"""Handle the re-entered new password for verification."""
def handler_verifying_new_password(user):
"""Handle the re-entered new password for verification."""
# get the next waiting line of input
input_data = user.input_queue.pop(0)
# get the next waiting line of input
input_data = user.input_queue.pop(0)
def command_say(actor, parameters):
"""Speak to others in the same room."""
def command_say(actor, parameters):
"""Speak to others in the same room."""
# check for replacement macros and escape them
parameters = escape_macros(parameters)
# check for replacement macros and escape them
parameters = escape_macros(parameters)
def command_show(actor, parameters):
"""Show program data."""
def command_show(actor, parameters):
"""Show program data."""
message = ""
arguments = parameters.split()
if not parameters:
message = ""
arguments = parameters.split()
if not parameters:
def command_error(actor, input_data):
"""Generic error for an unrecognized command word."""
def command_error(actor, input_data):
"""Generic error for an unrecognized command word."""
# 90% of the time use a generic error
if random.randrange(10):
# 90% of the time use a generic error
if random.randrange(10):
def daemonize(universe):
"""Fork and disassociate from everything."""
def daemonize(universe):
"""Fork and disassociate from everything."""
- import codecs
- import ctypes
- import ctypes.util
- import os
- import os.path
- import sys
# only if this is what we're configured to do
if universe.contents["internal:process"].getboolean("daemon"):
# only if this is what we're configured to do
if universe.contents["internal:process"].getboolean("daemon"):
def create_pidfile(universe):
"""Write a file containing the current process ID."""
def create_pidfile(universe):
"""Write a file containing the current process ID."""
- import codecs
- import os
- import os.path
pid = str(os.getpid())
log("Process ID: " + pid)
file_name = universe.contents["internal:process"].get("pidfile")
pid = str(os.getpid())
log("Process ID: " + pid)
file_name = universe.contents["internal:process"].get("pidfile")
def remove_pidfile(universe):
"""Remove the file containing the current process ID."""
def remove_pidfile(universe):
"""Remove the file containing the current process ID."""
- import os
- import os.path
file_name = universe.contents["internal:process"].get("pidfile")
if file_name:
if not os.path.isabs(file_name):
file_name = universe.contents["internal:process"].get("pidfile")
if file_name:
if not os.path.isabs(file_name):
def excepthook(excepttype, value, tracebackdata):
"""Handle uncaught exceptions."""
def excepthook(excepttype, value, tracebackdata):
"""Handle uncaught exceptions."""
# assemble the list of errors into a single string
message = "".join(
# assemble the list of errors into a single string
message = "".join(
def sighook(what, where):
"""Handle external signals."""
def sighook(what, where):
"""Handle external signals."""
# a generic message
message = "Caught signal: "
# a generic message
message = "Caught signal: "
def override_excepthook():
"""Redefine sys.excepthook with our own."""
def override_excepthook():
"""Redefine sys.excepthook with our own."""
sys.excepthook = excepthook
def assign_sighook():
"""Assign a customized handler for some signals."""
sys.excepthook = excepthook
def assign_sighook():
"""Assign a customized handler for some signals."""
signal.signal(signal.SIGHUP, sighook)
signal.signal(signal.SIGTERM, sighook)
def setup():
"""This contains functions to be performed when starting the engine."""
signal.signal(signal.SIGHUP, sighook)
signal.signal(signal.SIGTERM, sighook)
def setup():
"""This contains functions to be performed when starting the engine."""
# see if a configuration file was specified
if len(sys.argv) > 1:
# see if a configuration file was specified
if len(sys.argv) > 1:
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
+import base64
+import hashlib
+import math
+import random
+import re
+import struct
+
# convenience constants for indexing the supported hashing algorithms,
# guaranteed a stable part of the interface
MD5 = 0 # hashlib.md5
# convenience constants for indexing the supported hashing algorithms,
# guaranteed a stable part of the interface
MD5 = 0 # hashlib.md5
This is a wrapper around struct.pack, used to turn a list of integers
between 0 and 255 into a packed sequence akin to a C-style string.
"""
This is a wrapper around struct.pack, used to turn a list of integers
between 0 and 255 into a packed sequence akin to a C-style string.
"""
packed = b""
for number in numbers:
number = int(number)
packed = b""
for number in numbers:
number = int(number)
This is a wrapper around base64.b64encode with preferences
appropriate for encoding Unix-style passwd hash strings.
"""
This is a wrapper around base64.b64encode with preferences
appropriate for encoding Unix-style passwd hash strings.
"""
return base64.b64encode(
byte_sequence,
b"./"
return base64.b64encode(
byte_sequence,
b"./"
need and discard any excess characters over the specified length.
This ensures full distribution over each character of the salt.
"""
need and discard any excess characters over the specified length.
This ensures full distribution over each character of the salt.
"""
- import math
- import random
salt = []
for i in range(int(math.ceil(salt_len * 0.75))):
salt.append(random.randint(0, 255))
salt = []
for i in range(int(math.ceil(salt_len * 0.75))):
salt.append(random.randint(0, 255))
facets to this function, a conforming new-style password hash will be
returned.
"""
facets to this function, a conforming new-style password hash will be
returned.
"""
assert re.match("^[0-9a-f]{32}$",
legacy_hash), "Not a valid MD5 hexdigest"
collapsed = b""
assert re.match("^[0-9a-f]{32}$",
legacy_hash), "Not a valid MD5 hexdigest"
collapsed = b""
create(password, algorithm=SHA256, rounds=12, salt_len=16)
"""
create(password, algorithm=SHA256, rounds=12, salt_len=16)
"""
# if a specific salt wasn't specified, we need to generate one
if not salt:
# if a specific salt wasn't specified, we need to generate one
if not salt:
comes out the same as the encoded_hash.
"""
sep = encoded_hash[0]
comes out the same as the encoded_hash.
"""
sep = encoded_hash[0]
algorithm, rounds, salt, hashed = encoded_hash.split(sep)[1:]
if encoded_hash == create(
password=password,
algorithm, rounds, salt, hashed = encoded_hash.split(sep)[1:]
if encoded_hash == create(
password=password,
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
# to use, copy, modify, and distribute this software is granted under
# terms provided in the LICENSE file distributed with this software.
# telnet options (from bsd's arpa/telnet.h since telnetlib's are ambiguous)
TELOPT_BINARY = 0 # transmit 8-bit data by the receiver (rfc 856)
TELOPT_ECHO = 1 # echo received data back to the sender (rfc 857)
# telnet options (from bsd's arpa/telnet.h since telnetlib's are ambiguous)
TELOPT_BINARY = 0 # transmit 8-bit data by the receiver (rfc 856)
TELOPT_ECHO = 1 # echo received data back to the sender (rfc 857)
def negotiate_telnet_options(user):
"""Reply to and remove telnet negotiation options from partial_input."""
def negotiate_telnet_options(user):
"""Reply to and remove telnet negotiation options from partial_input."""
# make a local copy to play with
text = user.partial_input
# make a local copy to play with
text = user.partial_input