Merge branch 'more-plugins' into testing

This commit is contained in:
Valentin Lorentz 2010-10-24 13:37:51 +02:00
commit 6ef2503416
17 changed files with 2785 additions and 0 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,62 @@
###
# Copyright (c) 2003-2005, James Vega
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
"""
This is a module to contain Debian-specific commands.
"""
import supybot
import supybot.world as world
# Use this for the version of this plugin. You may wish to put a CVS keyword
# in here if you're keeping the plugin in CVS or some similar system.
__version__ = "0.1"
__author__ = supybot.authors.jamessan
# This is a dictionary mapping supybot.Author instances to lists of
# contributions.
__contributors__ = {}
import config
import plugin
reload(plugin) # In case we're being reloaded.
# Add more reloads here if you add third-party modules and want them to be
# reloaded when this plugin is reloaded. Don't forget to import them as well!
import BeautifulSoup
reload(BeautifulSoup)
if world.testing:
import test
Class = plugin.Class
configure = config.configure
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:

75
plugins/Debian/config.py Normal file
View File

@ -0,0 +1,75 @@
###
# Copyright (c) 2003-2005, James Vega
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
import supybot.conf as conf
import supybot.registry as registry
def configure(advanced):
# This will be called by supybot to configure this module. advanced is
# a bool that specifies whether the user identified himself as an advanced
# user or not. You should effect your configuration by manipulating the
# registry as appropriate.
from supybot.questions import output, expect, anything, something, yn
conf.registerPlugin('Debian', True)
if not utils.findBinaryInPath('zgrep'):
if not advanced:
output("""I can't find zgrep in your path. This is necessary
to run the file command. I'll disable this command
now. When you get zgrep in your path, use the command
'enable Debian.file' to re-enable the command.""")
capabilities = conf.supybot.capabilities()
capabilities.add('-Debian.file')
conf.supybot.capabilities.set(capabilities)
else:
output("""I can't find zgrep in your path. If you want to run
the file command with any sort of expediency, you'll
need it. You can use a python equivalent, but it's
about two orders of magnitude slower. THIS MEANS IT
WILL TAKE AGES TO RUN THIS COMMAND. Don't do this.""")
if yn('Do you want to use a Python equivalent of zgrep?'):
conf.supybot.plugins.Debian.pythonZgrep.setValue(True)
else:
output('I\'ll disable file now.')
capabilities = conf.supybot.capabilities()
capabilities.add('-Debian.file')
conf.supybot.capabilities.set(capabilities)
Debian = conf.registerPlugin('Debian')
conf.registerGlobalValue(Debian, 'pythonZgrep',
registry.Boolean(False, """An advanced option, mostly just for testing;
uses a Python-coded zgrep rather than the actual zgrep executable,
generally resulting in a 50x slowdown. What would take 2 seconds will
take 100 with this enabled. Don't enable this."""))
conf.registerChannelValue(Debian, 'bold',
registry.Boolean(True, """Determines whether the plugin will use bold in
the responses to some of its commands."""))
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:

492
plugins/Debian/plugin.py Normal file
View File

@ -0,0 +1,492 @@
###
# Copyright (c) 2003-2005, James Vega
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
import os
import re
import gzip
import time
import popen2
import fnmatch
import threading
import BeautifulSoup
import supybot.conf as conf
import supybot.utils as utils
import supybot.world as world
from supybot.commands import *
import supybot.plugins as plugins
import supybot.ircutils as ircutils
import supybot.callbacks as callbacks
from supybot.utils.iter import all, imap, ifilter
class PeriodicFileDownloader(object):
"""A class to periodically download a file/files.
A class-level dictionary 'periodicFiles' maps names of files to
three-tuples of
(url, seconds between downloads, function to run with downloaded file).
'url' should be in some form that urllib2.urlopen can handle (do note that
urllib2.urlopen handles file:// links perfectly well.)
'seconds between downloads' is the number of seconds between downloads,
obviously. An important point to remember, however, is that it is only
engaged when a command is run. I.e., if you say you want the file
downloaded every day, but no commands that use it are run in a week, the
next time such a command is run, it'll be using a week-old file. If you
don't want such behavior, you'll have to give an error mess age to the user
and tell him to call you back in the morning.
'function to run with downloaded file' is a function that will be passed
a string *filename* of the downloaded file. This will be some random
filename probably generated via some mktemp-type-thing. You can do what
you want with this; you may want to build a database, take some stats,
or simply rename the file. You can pass None as your function and the
file with automatically be renamed to match the filename you have it listed
under. It'll be in conf.supybot.directories.data, of course.
Aside from that dictionary, simply use self.getFile(filename) in any method
that makes use of a periodically downloaded file, and you'll be set.
"""
periodicFiles = None
def __init__(self, *args, **kwargs):
if self.periodicFiles is None:
raise ValueError, 'You must provide files to download'
self.lastDownloaded = {}
self.downloadedCounter = {}
for filename in self.periodicFiles:
if self.periodicFiles[filename][-1] is None:
fullname = os.path.join(conf.supybot.directories.data(),
filename)
if os.path.exists(fullname):
self.lastDownloaded[filename] = os.stat(fullname).st_ctime
else:
self.lastDownloaded[filename] = 0
else:
self.lastDownloaded[filename] = 0
self.currentlyDownloading = set()
self.downloadedCounter[filename] = 0
self.getFile(filename)
super(PeriodicFileDownloader, self).__init__(*args, **kwargs)
def _downloadFile(self, filename, url, f):
self.currentlyDownloading.add(filename)
try:
try:
infd = utils.web.getUrlFd(url)
except IOError, e:
self.log.warning('Error downloading %s: %s', url, e)
return
except utils.web.Error, e:
self.log.warning('Error downloading %s: %s', url, e)
return
confDir = conf.supybot.directories.data()
newFilename = os.path.join(confDir, utils.file.mktemp())
outfd = file(newFilename, 'wb')
start = time.time()
s = infd.read(4096)
while s:
outfd.write(s)
s = infd.read(4096)
infd.close()
outfd.close()
self.log.info('Downloaded %s in %s seconds',
filename, time.time()-start)
self.downloadedCounter[filename] += 1
self.lastDownloaded[filename] = time.time()
if f is None:
toFilename = os.path.join(confDir, filename)
if os.name == 'nt':
# Windows, grrr...
if os.path.exists(toFilename):
os.remove(toFilename)
os.rename(newFilename, toFilename)
else:
start = time.time()
f(newFilename)
total = time.time() - start
self.log.info('Function ran on %s in %s seconds',
filename, total)
finally:
self.currentlyDownloading.remove(filename)
def getFile(self, filename):
if world.documenting:
return
(url, timeLimit, f) = self.periodicFiles[filename]
if time.time() - self.lastDownloaded[filename] > timeLimit and \
filename not in self.currentlyDownloading:
self.log.info('Beginning download of %s', url)
args = (filename, url, f)
name = '%s #%s' % (filename, self.downloadedCounter[filename])
t = threading.Thread(target=self._downloadFile, name=name,
args=(filename, url, f))
t.setDaemon(True)
t.start()
world.threadsSpawned += 1
class Debian(callbacks.Plugin, PeriodicFileDownloader):
threaded = True
periodicFiles = {
# This file is only updated once a week, so there's no sense in
# downloading a new one every day.
'Contents-i386.gz': ('ftp://ftp.us.debian.org/'
'debian/dists/unstable/Contents-i386.gz',
604800, None)
}
contents = conf.supybot.directories.data.dirize('Contents-i386.gz')
def file(self, irc, msg, args, optlist, glob):
"""[--{regexp,exact} <value>] [<glob>]
Returns packages in Debian that includes files matching <glob>. If
--regexp is given, returns packages that include files matching the
given regexp. If --exact is given, returns packages that include files
matching exactly the string given.
"""
self.getFile('Contents-i386.gz')
# Make sure it's anchored, make sure it doesn't have a leading slash
# (the filenames don't have leading slashes, and people may not know
# that).
if not optlist and not glob:
raise callbacks.ArgumentError
if optlist and glob:
irc.error('You must specify either a glob or a regexp/exact '
'search, but not both.', Raise=True)
for (option, arg) in optlist:
if option == 'exact':
regexp = arg.lstrip('/')
elif option == 'regexp':
regexp = arg
if glob:
regexp = fnmatch.translate(glob.lstrip('/'))
regexp = regexp.rstrip('$')
regexp = ".*%s.* " % regexp
try:
re_obj = re.compile(regexp, re.I)
except re.error, e:
irc.error(format('Error in regexp: %s', e), Raise=True)
if self.registryValue('pythonZgrep'):
fd = gzip.open(self.contents)
r = imap(lambda tup: tup[0],
ifilter(lambda tup: tup[0],
imap(lambda line:(re_obj.search(line), line),fd)))
else:
try:
(r, w) = popen2.popen4(['zgrep', '-ie', regexp, self.contents])
w.close()
except TypeError:
# We're on Windows.
irc.error('This command won\'t work on this platform. '
'If you think it should (i.e., you know that you '
'have a zgrep binary somewhere) then file a bug '
'about it at http://supybot.sf.net/ .', Raise=True)
packages = set() # Make packages unique
try:
for line in r:
if len(packages) > 100:
irc.error('More than 100 packages matched, '
'please narrow your search.', Raise=True)
try:
if hasattr(line, 'group'): # we're actually using
line = line.group(0) # pythonZgrep :(
(filename, pkg_list) = line.split()
if filename == 'FILE':
# This is the last line before the actual files.
continue
except ValueError: # Unpack list of wrong size.
continue # We've not gotten to the files yet.
packages.update(pkg_list.split(','))
finally:
if hasattr(r, 'close'):
r.close()
if len(packages) == 0:
irc.reply('I found no packages with that file.')
else:
irc.reply(format('%L', sorted(packages)))
file = wrap(file, [getopts({'regexp':'regexpMatcher','exact':'something'}),
additional('glob')])
_debreflags = re.DOTALL | re.IGNORECASE
_deblistre = re.compile(r'<h3>Package ([^<]+)</h3>(.*?)</ul>', _debreflags)
def version(self, irc, msg, args, optlist, branch, package):
"""[--exact] [{stable,testing,unstable,experimental}] <package name>
Returns the current version(s) of a Debian package in the given branch
(if any, otherwise all available ones are displayed). If --exact is
specified, only packages whose name exactly matches <package name>
will be reported.
"""
url = 'http://packages.debian.org/cgi-bin/search_packages.pl?keywords'\
'=%s&searchon=names&version=%s&release=all&subword=1'
for (option, _) in optlist:
if option == 'exact':
url = url.replace('&subword=1','')
responses = []
if '*' in package:
irc.error('Wildcard characters can not be specified.', Raise=True)
package = utils.web.urlquote(package)
url %= (package, branch)
try:
html = utils.web.getUrl(url)
except utils.web.Error, e:
irc.error(format('I couldn\'t reach the search page (%s).', e),
Raise=True)
if 'is down at the moment' in html:
irc.error('Packages.debian.org is down at the moment. '
'Please try again later.', Raise=True)
pkgs = self._deblistre.findall(html)
if not pkgs:
irc.reply(format('No package found for %s (%s)',
utils.web.urlunquote(package), branch))
else:
for pkg in pkgs:
pkgMatch = pkg[0]
soup = BeautifulSoup.BeautifulSoup()
soup.feed(pkg[1])
liBranches = soup.fetch('li')
branches = []
versions = []
def branchVers(br):
vers = [b.next.string.strip() for b in br]
return [utils.str.rsplit(v, ':', 1)[0] for v in vers]
for li in liBranches:
branches.append(li.a.string)
versions.append(branchVers(li.fetch('br')))
if branches and versions:
for pairs in zip(branches, versions):
branch = pairs[0]
ver = ', '.join(pairs[1])
s = format('%s (%s)', pkgMatch,
': '.join([branch, ver]))
responses.append(s)
resp = format('%i matches found: %s',
len(responses), '; '.join(responses))
irc.reply(resp)
version = wrap(version, [getopts({'exact':''}),
optional(('literal', ('stable', 'testing',
'unstable', 'experimental')), 'all'),
'text'])
_incomingRe = re.compile(r'<a href="(.*?\.deb)">', re.I)
def incoming(self, irc, msg, args, optlist, globs):
"""[--{regexp,arch} <value>] [<glob> ...]
Checks debian incoming for a matching package name. The arch
parameter defaults to i386; --regexp returns only those package names
that match a given regexp, and normal matches use standard *nix
globbing.
"""
predicates = []
archPredicate = lambda s: ('_i386.' in s)
for (option, arg) in optlist:
if option == 'regexp':
predicates.append(r.search)
elif option == 'arch':
arg = '_%s.' % arg
archPredicate = lambda s, arg=arg: (arg in s)
predicates.append(archPredicate)
for glob in globs:
glob = fnmatch.translate(glob)
predicates.append(re.compile(glob).search)
packages = []
try:
fd = utils.web.getUrlFd('http://incoming.debian.org/')
except utils.web.Error, e:
irc.error(str(e), Raise=True)
for line in fd:
m = self._incomingRe.search(line)
if m:
name = m.group(1)
if all(None, imap(lambda p: p(name), predicates)):
realname = utils.str.rsplit(name, '_', 1)[0]
packages.append(realname)
if len(packages) == 0:
irc.error('No packages matched that search.')
else:
irc.reply(format('%L', packages))
incoming = thread(wrap(incoming,
[getopts({'regexp': 'regexpMatcher',
'arch': 'something'}),
any('glob')]))
def bold(self, s):
if self.registryValue('bold', dynamic.channel):
return ircutils.bold(s)
return s
_update = re.compile(r' : ([^<]+)</body', re.I)
def stats(self, irc, msg, args, pkg):
"""<source package>
Reports various statistics (from http://packages.qa.debian.org/) about
<source package>.
"""
pkg = pkg.lower()
text = utils.web.getUrl('http://packages.qa.debian.org/%s/%s.html' %
(pkg[0], pkg))
if "Error 404" in text:
irc.errorInvalid('source package name')
updated = None
m = self._update.search(text)
if m:
updated = m.group(1)
soup = BeautifulSoup.BeautifulSoup()
soup.feed(text)
pairs = zip(soup.fetch('td', {'class': 'labelcell'}),
soup.fetch('td', {'class': 'contentcell'}))
for (label, content) in pairs:
if label.string == 'Last version':
version = '%s: %s' % (self.bold(label.string), content.string)
elif label.string == 'Maintainer':
name = content.a.string
email = content.fetch('a')[1]['href'][7:]
maintainer = format('%s: %s %u', self.bold('Maintainer'),
name, utils.web.mungeEmail(email))
elif label.string == 'All bugs':
bugsAll = format('%i Total', content.first('a').string)
elif label.string == 'Release Critical':
bugsRC = format('%i RC', content.first('a').string)
elif label.string == 'Important and Normal':
bugs = format('%i Important/Normal',
content.first('a').string)
elif label.string == 'Minor and Wishlist':
bugsMinor = format('%i Minor/Wishlist',
content.first('a').string)
elif label.string == 'Fixed and Pending':
bugsFixed = format('%i Fixed/Pending',
content.first('a').string)
elif label.string == 'Subscribers count':
subscribers = format('%s: %i',
self.bold('Subscribers'), content.string)
bugL = (bugsAll, bugsRC, bugs, bugsMinor, bugsFixed)
s = '. '.join((version, maintainer, subscribers,
'%s: %s' % (self.bold('Bugs'), '; '.join(bugL))))
if updated:
s = 'As of %s, %s' % (updated, s)
irc.reply(s)
stats = wrap(stats, ['somethingWithoutSpaces'])
_newpkgre = re.compile(r'<li><a href[^>]+>([^<]+)</a>')
def new(self, irc, msg, args, section, glob):
"""[{main,contrib,non-free}] [<glob>]
Checks for packages that have been added to Debian's unstable branch
in the past week. If no glob is specified, returns a list of all
packages. If no section is specified, defaults to main.
"""
try:
fd = utils.web.getUrlFd(
'http://packages.debian.org/unstable/newpkg_%s' % section)
except utils.web.Error, e:
irc.error(str(e), Raise=True)
packages = []
for line in fd:
m = self._newpkgre.search(line)
if m:
m = m.group(1)
if fnmatch.fnmatch(m, glob):
packages.append(m)
fd.close()
if packages:
irc.reply(format('%L', packages))
else:
irc.error('No packages matched that search.')
new = wrap(new, [optional(('literal', ('main', 'contrib', 'non-free')),
'main'),
additional('glob', '*')])
_severity = re.compile(r'.*(?:severity set to `([^\']+)\'|'
r'severity:\s+<em>([^<]+)</em>)', re.I)
_package = re.compile(r'Package: <[^>]+>([^<]+)<', re.I | re.S)
_reporter = re.compile(r'Reported by: <[^>]+>([^<]+)<', re.I | re.S)
_subject = re.compile(r'<br>([^<]+)</h1>', re.I | re.S)
_date = re.compile(r'Date: ([^;]+);', re.I | re.S)
_tags = re.compile(r'Tags: <strong>([^<]+)</strong>', re.I)
_searches = (_package, _subject, _reporter, _date)
def bug(self, irc, msg, args, bug):
"""<num>
Returns a description of the bug with bug id <num>.
"""
url = 'http://bugs.debian.org/%s' % bug
try:
text = utils.web.getUrl(url)
except utils.web.Error, e:
irc.error(str(e), Raise=True)
if "There is no record of Bug" in text:
irc.error('I could not find a bug report matching that number.',
Raise=True)
searches = map(lambda p: p.search(text), self._searches)
sev = self._severity.search(text)
tags = self._tags.search(text)
# This section should be cleaned up to ease future modifications
if all(None, searches):
L = map(self.bold, ('Package', 'Subject', 'Reported'))
resp = format('%s: %%s; %s: %%s; %s: by %%s on %%s', *L)
L = map(utils.web.htmlToText, map(lambda p: p.group(1), searches))
resp = format(resp, *L)
if sev:
sev = filter(None, sev.groups())
if sev:
sev = utils.web.htmlToText(sev[0])
resp += format('; %s: %s', self.bold('Severity'), sev)
if tags:
resp += format('; %s: %s', self.bold('Tags'), tags.group(1))
resp += format('; %u', url)
irc.reply(resp)
else:
irc.reply('I was unable to properly parse the BTS page.')
bug = wrap(bug, [('id', 'bug')])
_dpnRe = re.compile(r'"\+2">([^<]+)</font', re.I)
def debianize(self, irc, msg, args, words):
"""<text>
Turns <text> into a 'debian package name' using
http://www.pigdog.com/features/dpn.html.
"""
url = r'http://www.pigdog.org/cgi_bin/dpn.phtml?name=%s'
try:
text = utils.web.getUrl(url % '+'.join(words))
except utils.web.Error, e:
irc.error(str(e), Raise=True)
m = self._dpnRe.search(text)
if m is not None:
irc.reply(m.group(1))
else:
irc.errorPossibleBug('Unable to parse webpage.')
debianize = wrap(debianize, [many('something')])
Class = Debian
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:

92
plugins/Debian/test.py Normal file
View File

@ -0,0 +1,92 @@
###
# Copyright (c) 2003-2005, James Vega
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
import os
import time
from supybot.test import *
class DebianTestCase(PluginTestCase):
plugins = ('Debian',)
timeout = 100
cleanDataDir = False
fileDownloaded = False
if network:
def setUp(self, nick='test'):
PluginTestCase.setUp(self)
try:
datadir = conf.supybot.directories.data
if os.path.exists(datadir.dirize('Contents-i386.gz')):
pass
else:
print
print "Downloading files, this may take awhile."
filename = datadir.dirize('Contents-i386.gz')
while not os.path.exists(filename):
time.sleep(1)
print "Download complete."
print "Starting test ..."
self.fileDownloaded = True
except KeyboardInterrupt:
pass
def testDebBugNoHtml(self):
self.assertNotRegexp('debian bug 287792', r'\<em\>')
def testDebversion(self):
self.assertHelp('debian version')
self.assertRegexp('debian version lakjdfad',
r'^No package.*\(all\)')
self.assertRegexp('debian version unstable alkdjfad',
r'^No package.*\(unstable\)')
self.assertRegexp('debian version gaim',
r'\d+ matches found:.*gaim.*\(stable')
self.assertRegexp('debian version linux-wlan',
r'\d+ matches found:.*linux-wlan.*')
self.assertRegexp('debian version --exact linux-wlan',
r'^No package.*\(all\)')
self.assertError('debian version unstable')
def testDebfile(self):
self.assertHelp('file')
if not self.fileDownloaded:
pass
self.assertRegexp('file --exact bin/gaim', r'net/gaim')
def testDebincoming(self):
self.assertNotError('incoming')
def testDebianize(self):
self.assertNotError('debianize supybot')
def testDebstats(self):
self.assertNotError('stats supybot')
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:

View File

@ -0,0 +1 @@
Insert a description of your plugin here, with any notes, etc. about using it.

View File

@ -0,0 +1,66 @@
###
# Copyright (c) 2010, Valentin Lorentz
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
"""
This plugins provides a MegaHAL integration for Supybot.
MegaHAL must be installed ('apt-get install megahal' on Debian)
"""
import supybot
import supybot.world as world
# Use this for the version of this plugin. You may wish to put a CVS keyword
# in here if you're keeping the plugin in CVS or some similar system.
__version__ = ""
# XXX Replace this with an appropriate author or supybot.Author instance.
__author__ = supybot.authors.unknown
# This is a dictionary mapping supybot.Author instances to lists of
# contributions.
__contributors__ = {}
# This is a url where the most recent plugin package can be downloaded.
__url__ = '' # 'http://supybot.com/Members/yourname/MegaHAL/download'
import config
import plugin
reload(plugin) # In case we're being reloaded.
# Add more reloads here if you add third-party modules and want them to be
# reloaded when this plugin is reloaded. Don't forget to import them as well!
if world.testing:
import test
Class = plugin.Class
configure = config.configure
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79:

72
plugins/MegaHAL/config.py Normal file
View File

@ -0,0 +1,72 @@
###
# Copyright (c) 2010, Valentin Lorentz
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
import supybot.conf as conf
import supybot.registry as registry
try:
from supybot.i18n import PluginInternationalization
from supybot.i18n import internationalizeDocstring
_ = PluginInternationalization('MegaHAL')
except:
_ = lambda x:x
internationalizeDocstring = lambda x:x
def configure(advanced):
# This will be called by supybot to configure this module. advanced is
# a bool that specifies whether the user identified himself as an advanced
# user or not. You should effect your configuration by manipulating the
# registry as appropriate.
from supybot.questions import expect, anything, something, yn
conf.registerPlugin('MegaHAL', True)
MegaHAL = conf.registerPlugin('MegaHAL')
# This is where your configuration variables (if any) should go. For example:
# conf.registerGlobalValue(MegaHAL, 'someConfigVariableName',
# registry.Boolean(False, """Help for someConfigVariableName."""))
conf.registerGroup(MegaHAL, 'learn')
conf.registerGlobalValue(MegaHAL.learn, 'commands',
registry.Boolean(False, _("""Determines whether the bot answers to messages
beginning by a non-alphanumeric char.""")))
conf.registerGroup(MegaHAL, 'answer')
conf.registerChannelValue(MegaHAL.answer, 'commands',
registry.Boolean(False, _("""Determines whether messages beginning by a
non-alphanumeric char are learned.""")))
conf.registerChannelValue(MegaHAL.answer, 'probability',
registry.Integer(10, _("""Determines the percent of messages the bot will
answer (zero is recommended if you have a tiny database).""")))
conf.registerChannelValue(MegaHAL.answer, 'probabilityWhenAddressed',
registry.Integer(100, _("""Determines the percent of messages adressed to
the bot the bot will answer.""")))
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79:

148
plugins/MegaHAL/plugin.py Normal file
View File

@ -0,0 +1,148 @@
###
# Copyright (c) 2010, Valentin Lorentz
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
import re
import os
import sys
import random
import supybot.conf as conf
import mh_python as megahal
import supybot.utils as utils
from cStringIO import StringIO
from supybot.commands import *
import supybot.plugins as plugins
import supybot.ircutils as ircutils
import supybot.callbacks as callbacks
try:
from supybot.i18n import PluginInternationalization
from supybot.i18n import internationalizeDocstring
_ = PluginInternationalization('MegaHAL')
except:
# This are useless function that's allow to run the plugin on a bot
# without the i18n plugin
_ = lambda x:x
internationalizeDocstring = lambda x:x
class MegaHAL(callbacks.Plugin):
"""This plugins provides a MegaHAL integration for Supybot.
MegaHAL must be installed ('apt-get install megahal' on Debian)"""
callAfter = ['MoobotFactoids', 'Factoids', 'Infobot']
callBefore = ['Dunno']
def __init__(self, irc):
# Call Supybot's scripts
self.__parent = super(MegaHAL, self)
self.__parent.__init__(irc)
# Save state
saved = (sys.stdout, os.getcwd())
# Create proxy for MegaHAL
os.chdir(conf.supybot.directories.data())
sys.stdout = StringIO()
# Initialize MegaHAL
megahal.initbrain()
# Restore state
sys.stdout, cwd = saved
os.chdir(cwd)
random.seed()
_dontKnow = [
'I don\'t know enough to answer you yet!',
'I am utterly speechless!',
'I forgot what I was going to say!'
]
_translations = {
'I don\'t know enough to answer you yet!':
_('I don\'t know enough to answer you yet!'),
'I am utterly speechless!':
_('I am utterly speechless!'),
'I forgot what I was going to say!':
_('I forgot what I was going to say!'),
}
def _response(self, msg, prb, reply):
if random.randint(0, 100) < prb:
response = megahal.doreply(msg)
if self._translations.has_key(response):
response = self._translations[response]
reply(response)
else:
megahal.learn(msg)
def doPrivmsg(self, irc, msg):
message = msg.args[1]
probability = self.registryValue('answer.probability')
answer = False
learn = False
if message.startswith(irc.nick) or re.match('\W.*', message):
# Managed by invalidCommand
return
print repr(message)
if answer:
self._response(message,
self.registryValue('answer.probability'),
irc.reply)
elif learn:
megahal.learn(message)
def invalidCommand(self, irc, msg, tokens):
message = msg.args[1]
usedToStartWithNick = False
if message.startswith(message):
parsed = re.match('(.+ |\W)?(?P<message>\w.*)', message)
message = parsed.group('message')
usedToStartWithNick = True
if self.registryValue('answer.commands') or usedToStartWithNick:
self._response(message,
self.registryValue('answer.probabilityWhenAdressed'),
irc.reply)
elif self.registryValue('learn.commands'):
megahal.learn(message)
@internationalizeDocstring
def cleanup(self, irc, msg, args):
"""takes no argument
Saves MegaHAL brain to disk."""
megahal.cleanup()
irc.replySuccess()
Class = MegaHAL
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:

43
plugins/MegaHAL/test.py Normal file
View File

@ -0,0 +1,43 @@
###
# Copyright (c) 2010, Valentin Lorentz
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
from supybot.test import *
class MegaHALTestCase(PluginTestCase):
plugins = ('MegaHAL',)
def testCleanup(self):
self.assertNotError('cleanup')
def testAnswer(self):
self.assertNotRegexp('foo', '.*not a valid.*')
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79:

View File

@ -0,0 +1 @@
Insert a description of your plugin here, with any notes, etc. about using it.

View File

@ -0,0 +1,68 @@
# -*- coding: utf8 -*-
###
# Copyright (c) 2010, Valentin Lorentz
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
"""
Ce plugin est un portage du robot IRC Fschfsch, servant à fournir un accès
à la pysandbox sur IRC.
"""
import supybot
import supybot.world as world
# Use this for the version of this plugin. You may wish to put a CVS keyword
# in here if you're keeping the plugin in CVS or some similar system.
__version__ = "0.1"
# XXX Replace this with an appropriate author or supybot.Author instance.
__author__ = supybot.Author('Valentin Lorentz', 'ProgVal',
'progval@gmail.com')
# This is a dictionary mapping supybot.Author instances to lists of
# contributions.
__contributors__ = {}
# This is a url where the most recent plugin package can be downloaded.
__url__ = 'http://supybot-fr.tk/SupySandbox'
import config
import plugin
reload(plugin) # In case we're being reloaded.
# Add more reloads here if you add third-party modules and want them to be
# reloaded when this plugin is reloaded. Don't forget to import them as well!
if world.testing:
import test
Class = plugin.Class
configure = config.configure
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79:

View File

@ -0,0 +1,50 @@
# -*- coding: utf8 -*-
###
# Copyright (c) 2010, Valentin Lorentz
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
import supybot.conf as conf
import supybot.registry as registry
def configure(advanced):
# This will be called by supybot to configure this module. advanced is
# a bool that specifies whether the user identified himself as an advanced
# user or not. You should effect your configuration by manipulating the
# registry as appropriate.
from supybot.questions import expect, anything, something, yn
conf.registerPlugin('SupySandbox', True)
PySandbox = conf.registerPlugin('SupySandbox')
# This is where your configuration variables (if any) should go. For example:
# conf.registerGlobalValue(PySandbox, 'someConfigVariableName',
# registry.Boolean(False, """Help for someConfigVariableName."""))
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79:

View File

@ -0,0 +1 @@
# Stub so local is a module, used for third-party modules

View File

@ -0,0 +1,279 @@
#!/usr/bin/env python
# this file is under the WTFPLv2 [http://sam.zoy.org/wtfpl]
# v1: 2010/05/23
# Author: Tila
# You need a configuration file: ~/.fschfsch.py. Config example:
# ---
# host = 'irc.freenode.net'
# port = 7000
# ssl = True
# nickname = 'botnickname'
# password = 'secret'
# channels = ['##fschfsch', '#channel2', '#channel3']
# texts = {'help': 'I am fschfsch, a robot snake that evals python code',
# 'sandbox': "I am powered by setrlimit and pysandbox [http://github.com/haypo/pysandbox], I don't fear you"}
# ---
'''
fschfsch is a Python-evaluating bot. fschfsch is pronounced "fssshh! fssshh!".
'''
IN_MAXLEN = 300 # bytes
OUT_MAXLEN = 300 # bytes
TIMEOUT = 3 # seconds
EVAL_MAXTIMESECONDS = TIMEOUT
EVAL_MAXMEMORYBYTES = 10 * 1024 * 1024 # 10 MiB
try:
import sandbox as S
except ImportError:
print 'You need pysandbox in order to run fschfsch [http://github.com/haypo/pysandbox].'
raise
try:
import twisted
except ImportError:
print 'You need twisted in order to run fschfsch.'
raise
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.internet import ssl, reactor
from twisted.words.im.ircsupport import IRCProto
from twisted.words.protocols.irc import IRCClient
# other imports
import re
import sys
import os
import resource as R
import select
import signal
import time
import threading
import random
def createSandboxConfig():
cfg = S.SandboxConfig(
'stdout',
'stderr',
'regex',
'unicodedata', # flow wants u'\{ATOM SYMBOL}' :-)
'future',
'code',
'time',
'datetime',
'math',
'itertools',
'random',
'encodings',
)
cfg.allowModule('sys',
'version', 'hexversion', 'version_info')
return cfg
def _evalPython(line, locals):
locals = dict(locals)
try:
if "\n" in line:
raise SyntaxError()
code = compile(line, "<irc>", "single")
except SyntaxError:
code = compile(line, "<irc>", "exec")
exec code in locals
def evalPython(line, locals=None):
sandbox = S.Sandbox(config=createSandboxConfig())
if locals is not None:
locals = dict(locals)
else:
locals = dict()
try:
sandbox.call(_evalPython, line, locals)
except BaseException, e:
print 'Error: [%s] %s' % (e.__class__.__name__, str(e))
except:
print 'Error: <unknown exception>'
sys.stdout.flush()
def childProcess(line, w, locals):
# reseed after a fork to avoid generating the same sequence for each child
random.seed()
sys.stdout = sys.stderr = os.fdopen(w, 'w')
R.setrlimit(R.RLIMIT_CPU, (EVAL_MAXTIMESECONDS, EVAL_MAXTIMESECONDS))
R.setrlimit(R.RLIMIT_AS, (EVAL_MAXMEMORYBYTES, EVAL_MAXMEMORYBYTES))
R.setrlimit(R.RLIMIT_NPROC, (0, 0)) # 0 forks
evalPython(line, locals)
def handleChild(childpid, r):
txt = ''
if any(select.select([r], [], [], TIMEOUT)):
txt = os.read(r, OUT_MAXLEN + 1)
os.close(r)
if OUT_MAXLEN < len(txt):
txt = txt[:OUT_MAXLEN] + '...'
n = 0
while n < 6:
pid, status = os.waitpid(childpid, os.WNOHANG)
if pid:
break
time.sleep(.5)
n += 1
if not pid:
os.kill(childpid, signal.SIGKILL)
return 'Timeout'
elif os.WIFEXITED(status):
txts = txt.rstrip().split('\n')
if len(txts) > 1:
txt = txts[0].rstrip() + ' [+ %d line(s)]' % (len(txts) - 1)
else:
txt = txts[0].rstrip()
return 'Output: ' + txt
elif os.WIFSIGNALED(status):
return 'Killed'
class EvalJob(threading.Thread):
def __init__(self, line, irc, channel):
super(EvalJob, self).__init__()
self.line = line
self.irc = irc
self.channel = channel
def run(self):
output = self.handle_line(self.line)
reactor.callFromThread(self.irc.say, self.channel, output)
self.irc.executionLock.release()
def handle_line(self, line):
if IN_MAXLEN < len(line):
return '(command is too long: %s bytes, the maximum is %s)' % (len(line), IN_MAXLEN)
print("Process %s" % repr(line))
r, w = os.pipe()
childpid = os.fork()
if not childpid:
os.close(r)
childProcess(line, w, self.irc.factory.morevars)
os._exit(0)
else:
os.close(w)
result = handleChild(childpid, r)
print("=> %s" % repr(result))
return result
class EvalBot(IRCClient):
versionName = 'fschfsch'
versionNum = '0.1'
#~ def __init__(self, *a, **k):
def connectionMade(self):
self.nickname = self.factory.nick
self.password = self.factory.password
self.talkre = re.compile('^%s[>:,] (.*)$' % self.nickname)
self.executionLock = threading.Semaphore()
self.pingSelfId = None
IRCClient.connectionMade(self)
def signedOn(self):
self.pingSelfId = reactor.callLater(180, self.pingSelf)
for chan in self.factory.channels:
self.join(chan)
def pingSelf(self):
# used to avoid some timeouts where fschfsch does not reconnect
self.ping(self.nickname)
self.pingSelfId = reactor.callLater(180, self.pingSelf)
def privmsg(self, user, channel, message):
if self.pingSelfId is not None:
self.pingSelfId.reset(180)
if user.startswith('haypo') and message.startswith('exit'):
os._exit(0)
if not channel:
return
if not message.startswith(self.nickname):
return
if not self.talkre.match(message):
return
if not self.executionLock.acquire(blocking=False):
return
pyline = self.talkre.match(message).group(1)
pyline = pyline.replace(' $$ ', '\n')
self.handleThread = EvalJob(pyline, self, channel)
self.handleThread.start()
class MyFactory(ReconnectingClientFactory):
def __init__(self, **kw):
for k in kw:
if k in ('nick', 'password', 'channels', 'morevars'):
setattr(self, k, kw[k])
protocol = EvalBot
def check_output(expr, expected, locals=None):
from cStringIO import StringIO
original_stdout = sys.stdout
try:
output = StringIO()
sys.stdout = output
evalPython(expr, locals)
stdout = output.getvalue()
assert stdout == expected, "%r != %r" % (stdout, expected)
finally:
sys.stdout = original_stdout
def runTests():
# single
check_output('1+1', '2\n')
check_output('1; 2', '1\n2\n')
check_output(
# written in a single line
"prime=lambda n,i=2:"
"False if n%i==0 else prime(n,i+1) if i*i<n else True; "
"prime(17)",
"True\n")
# exec
check_output('def f(): print("hello")\nf()', 'hello\n')
check_output('print 1\nprint 2', '1\n2\n')
check_output('text', "'abc'\n", {'text': 'abc'})
return True
def main():
if len(sys.argv) == 2 and sys.argv[1] == 'tests':
ok = runTests()
if ok:
print("no failure")
else:
print("failure!")
sys.exit(int(not ok))
elif len(sys.argv) != 1:
print 'usage: %s -- run the bot' % sys.argv[0]
print ' or: %s tests -- run self tests' % sys.argv[0]
print
print 'Edit ~/.fschfschrc.py first'
sys.exit(4)
conf = {}
execfile(os.path.expanduser('~/.fschfschrc.py'), conf)
factory = MyFactory(nick=conf['nickname'], password=conf.get('password', None), channels=conf.get('channels', []), morevars=conf.get('texts', {}))
if conf.get('ssl', 0):
reactor.connectSSL(conf['host'], conf['port'], factory, ssl.ClientContextFactory())
else:
reactor.connectTCP(conf['host'], conf['port'], factory)
reactor.run()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,199 @@
###
# Copyright (c) 2010, Valentin Lorentz
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# pysandbox were originally writen by haypo (under the BSD license),
# and fschfsch by Tila (under the WTFPL license).
###
IN_MAXLEN = 1000 # bytes
OUT_MAXLEN = 1000 # bytes
TIMEOUT = 3 # seconds
EVAL_MAXTIMESECONDS = TIMEOUT
EVAL_MAXMEMORYBYTES = 75 * 1024 * 1024 # 10 MiB
try:
import sandbox as S
except ImportError:
print 'You need pysandbox in order to run fschfsch ' + \
'[http://github.com/haypo/pysandbox].'
raise
import re
import os
import sys
import time
import random
import select
import resource as R
import supybot.utils as utils
from supybot.commands import *
import supybot.plugins as plugins
import supybot.ircutils as ircutils
import supybot.callbacks as callbacks
from cStringIO import StringIO
def createSandboxConfig():
cfg = S.SandboxConfig(
'stdout',
'stderr',
'regex',
'unicodedata', # flow wants u'\{ATOM SYMBOL}' :-)
'future',
'code',
'time',
'datetime',
'math',
'itertools',
'random',
'encodings',
)
cfg.allowModule('sys',
'version', 'hexversion', 'version_info')
return cfg
def _evalPython(line, locals):
locals = dict(locals)
try:
if "\n" in line:
raise SyntaxError()
code = compile(line, "<irc>", "single")
except SyntaxError:
code = compile(line, "<irc>", "exec")
exec code in locals
def evalPython(line, locals=None):
sandbox = S.Sandbox(config=createSandboxConfig())
if locals is not None:
locals = dict(locals)
else:
locals = dict()
try:
sandbox.call(_evalPython, line, locals)
except BaseException, e:
print 'Error: [%s] %s' % (e.__class__.__name__, str(e))
except:
print 'Error: <unknown exception>'
sys.stdout.flush()
def check_output(expr, expected, locals=None):
from cStringIO import StringIO
original_stdout = sys.stdout
try:
output = StringIO()
sys.stdout = output
evalPython(expr, locals)
stdout = output.getvalue()
assert stdout == expected, "%r != %r" % (stdout, expected)
finally:
sys.stdout = original_stdout
def runTests():
# single
check_output('1+1', '2\n')
check_output('1; 2', '1\n2\n')
check_output(
# written in a single line
"prime=lambda n,i=2:"
"False if n%i==0 else prime(n,i+1) if i*i<n else True; "
"prime(17)",
"True\n")
# exec
check_output('def f(): print("hello")\nf()', 'hello\n')
check_output('print 1\nprint 2', '1\n2\n')
check_output('text', "'abc'\n", {'text': 'abc'})
return True
def childProcess(line, w, locals):
# reseed after a fork to avoid generating the same sequence for each child
random.seed()
sys.stdout = sys.stderr = os.fdopen(w, 'w')
R.setrlimit(R.RLIMIT_CPU, (EVAL_MAXTIMESECONDS, EVAL_MAXTIMESECONDS))
R.setrlimit(R.RLIMIT_AS, (EVAL_MAXMEMORYBYTES, EVAL_MAXMEMORYBYTES))
R.setrlimit(R.RLIMIT_NPROC, (0, 0)) # 0 forks
evalPython(line, locals)
def handleChild(childpid, r):
txt = ''
if __import__("__builtin__").any(select.select([r], [], [], TIMEOUT)):
txt = os.read(r, OUT_MAXLEN + 1)
os.close(r)
n = 0
while n < 6:
pid, status = os.waitpid(childpid, os.WNOHANG)
if pid:
break
time.sleep(.5)
n += 1
if not pid:
os.kill(childpid, signal.SIGKILL)
return 'Timeout'
elif os.WIFEXITED(status):
return txt.rstrip()
elif os.WIFSIGNALED(status):
return 'Killed'
def handle_line(line):
r, w = os.pipe()
childpid = os.fork()
if not childpid:
os.close(r)
childProcess(line, w, {})
os._exit(0)
else:
os.close(w)
result = handleChild(childpid, r)
return result
class SupySandbox(callbacks.Plugin):
"""Add the help for "@plugin help SupySandbox" here
This should describe *how* to use this plugin."""
_parser = re.compile(r'(.?sandbox)? (?P<code>.*)')
def sandbox(self, irc, msg, args):
"""<code>
Runs Python code safely thanks to pysandbox"""
code = self._parser.match(msg.args[1]).group('code')
irc.reply(handle_line(code.replace(' $$ ', '\n')))
def runtests(self, irc, msg, args):
irc.reply(runTests())
Class = SupySandbox
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:

View File

@ -0,0 +1,56 @@
# -*- coding: utf8 -*-
###
# Copyright (c) 2010, Valentin Lorentz
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
from supybot.test import *
class SupySandboxTestCase(PluginTestCase):
plugins = ('SupySandbox',)
def testFschfschTestcase(self):
self.assertResponse('runtests', 'True')
def testCodeIsSuccessfullyRunned(self):
self.assertResponse('sandbox 1+1', "2")
self.assertResponse('sandbox print 1+1', "2")
self.assertResponse('sandbox print \'toto\'', "toto")
def testMultine(self):
self.assertResponse('sandbox print 1; print 2', "'1\\n2'")
self.assertResponse('sandbox print 1 $$ print 2', "'1\\n2'")
self.assertResponse('sandbox toto=True $$ while toto: $$ print "foo"'
' $$ toto=False', "foo")
def testProtections(self):
#self.assertResponse('sandbox while True: print 1', "Timeout")
pass
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79: