Merge branch 'multiprocess'

This commit is contained in:
Daniel Folkinshteyn 2010-08-05 14:01:23 -04:00
commit e779b70609
6 changed files with 77 additions and 4 deletions

View File

@ -94,6 +94,18 @@ class Status(callbacks.Plugin):
irc.reply(s) irc.reply(s)
threads = wrap(threads) threads = wrap(threads)
def processes(self, irc, msg, args):
"""takes no arguments
Returns the number of processes that have been spawned.
"""
# TODO: maintain a dict of active subprocesses, so we can
# include a list thereof in output, linke in threads(). maybe?
s = format('I have spawned %n.',
(world.processesSpawned, 'process'))
irc.reply(s)
processes = wrap(processes)
def net(self, irc, msg, args): def net(self, irc, msg, args):
"""takes no arguments """takes no arguments

View File

@ -71,6 +71,8 @@ class StatusTestCase(PluginTestCase):
def testThreads(self): def testThreads(self):
self.assertNotError('threads') self.assertNotError('threads')
def testProcesses(self):
self.assertNotError('processes')
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79: # vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:

View File

@ -33,10 +33,12 @@ import binascii
import supybot.utils as utils import supybot.utils as utils
from supybot.commands import * from supybot.commands import *
import supybot.commands as commands
import supybot.plugins as plugins import supybot.plugins as plugins
import supybot.ircutils as ircutils import supybot.ircutils as ircutils
import supybot.callbacks as callbacks import supybot.callbacks as callbacks
import multiprocessing
class String(callbacks.Plugin): class String(callbacks.Plugin):
def ord(self, irc, msg, args, letter): def ord(self, irc, msg, args, letter):
@ -136,10 +138,10 @@ class String(callbacks.Plugin):
s = 'You probably don\'t want to match the empty string.' s = 'You probably don\'t want to match the empty string.'
irc.error(s) irc.error(s)
else: else:
irc.reply(f(text)) v = commands.process(f, text, timeout=10, pn=self.name(), cn='re')
re = wrap(re, [('checkCapability', 'trusted'), irc.reply(v)
first('regexpMatcher', 'regexpReplacer'), re = thread(wrap(re, [first('regexpMatcher', 'regexpReplacer'),
'text']) 'text']))
def xor(self, irc, msg, args, password, text): def xor(self, irc, msg, args, password, text):
"""<password> <text> """<password> <text>

View File

@ -976,6 +976,23 @@ class CommandThread(world.SupyThread):
finally: finally:
self.cb.threaded = self.originalThreaded self.cb.threaded = self.originalThreaded
class CommandProcess(world.SupyProcess):
"""Just does some extra logging and error-recovery for commands that need
to run in processes.
"""
def __init__(self, target=None, args=(), kwargs={}):
pn = kwargs.pop('pn', 'Unknown')
cn = kwargs.pop('cn', 'unknown')
procName = 'Process #%s (for %s.%s)' % (world.processesSpawned,
pn,
cn)
log.debug('Spawning process %s (args: %r)', procName, args)
self.__parent = super(CommandProcess, self)
self.__parent.__init__(target=target, name=procName,
args=args, kwargs=kwargs)
def run(self):
self.__parent.run()
class CanonicalString(registry.NormalizedString): class CanonicalString(registry.NormalizedString):
def normalize(self, s): def normalize(self, s):

View File

@ -37,6 +37,8 @@ import types
import getopt import getopt
import inspect import inspect
import threading import threading
import multiprocessing #python2.6 or later!
import Queue
import supybot.log as log import supybot.log as log
import supybot.conf as conf import supybot.conf as conf
@ -67,6 +69,34 @@ def thread(f):
f(self, irc, msg, args, *L, **kwargs) f(self, irc, msg, args, *L, **kwargs)
return utils.python.changeFunctionName(newf, f.func_name, f.__doc__) return utils.python.changeFunctionName(newf, f.func_name, f.__doc__)
def process(f, *args, **kwargs):
"""Runs a function <f> in a subprocess.
Several extra keyword arguments can be supplied.
<pn>, the pluginname, and <cn>, the command name, are strings used to
create the process name, for identification purposes.
<timeout>, if supplied, limits the length of execution of target
function to <timeout> seconds."""
timeout = kwargs.pop('timeout', None)
q = multiprocessing.Queue()
def newf(f, q, *args, **kwargs):
r = f(*args, **kwargs)
q.put(r)
targetArgs = (f, q,) + args
p = callbacks.CommandProcess(target=newf,
args=targetArgs, kwargs=kwargs)
p.start()
p.join(timeout)
if p.is_alive():
p.terminate()
q.put("%s aborted due to timeout." % (p.name,))
try:
v = q.get(block=False)
except Queue.Empty:
v = "Nothing returned."
return v
class UrlSnarfThread(world.SupyThread): class UrlSnarfThread(world.SupyThread):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
assert 'url' in kwargs assert 'url' in kwargs

View File

@ -37,6 +37,7 @@ import sys
import time import time
import atexit import atexit
import threading import threading
import multiprocessing # python 2.6 and later!
if sys.version_info >= (2, 5, 0): if sys.version_info >= (2, 5, 0):
import re as sre import re as sre
@ -67,6 +68,15 @@ class SupyThread(threading.Thread):
super(SupyThread, self).__init__(*args, **kwargs) super(SupyThread, self).__init__(*args, **kwargs)
log.debug('Spawning thread %q.', self.getName()) log.debug('Spawning thread %q.', self.getName())
processesSpawned = 1 # Starts at one for the initial process.
class SupyProcess(multiprocessing.Process):
def __init__(self, *args, **kwargs):
global processesSpawned
processesSpawned += 1
super(SupyProcess, self).__init__(*args, **kwargs)
log.debug('Spawning process %q.', self.name)
commandsProcessed = 0 commandsProcessed = 0
ircs = [] # A list of all the IRCs. ircs = [] # A list of all the IRCs.