Massive updates. urllib2 -> webutils, configurables -> registry, fix

Debian.version
This commit is contained in:
James Vega 2004-01-21 04:17:18 +00:00
parent 2f3ad5893c
commit c801aa9c46
2 changed files with 36 additions and 46 deletions

View File

@ -46,14 +46,15 @@ import socket
import urllib import urllib
import fnmatch import fnmatch
import os.path import os.path
import urllib2
from itertools import imap, ifilter from itertools import imap, ifilter
import registry
import conf import conf
import utils import utils
import privmsgs import privmsgs
import webutils
import callbacks import callbacks
import configurable
def configure(onStart, afterConnect, advanced): def configure(onStart, afterConnect, advanced):
@ -77,13 +78,18 @@ def configure(onStart, afterConnect, advanced):
print 'orders of magnitude slower. THIS MEANS IT WILL TAKE AGES ' print 'orders of magnitude slower. THIS MEANS IT WILL TAKE AGES '
print 'TO RUN THIS COMMAND. Don\'t do this.' print 'TO RUN THIS COMMAND. Don\'t do this.'
if yn('Do you want to use a Python equivalent of zegrep?') == 'y': if yn('Do you want to use a Python equivalent of zegrep?') == 'y':
onStart.append('usepythonzegrep') conf.supybot.plugins.Debian.pythonZegrep.setValue(True)
else: else:
print 'I\'ll disable file now.' print 'I\'ll disable file now.'
onStart.append('disable file') onStart.append('disable file')
conf.registerPlugin('Debian')
conf.registerGlobalValue(conf.supybot.plugins.Debian, 'pythonZegrep',
registry.Boolean(False, """An advanced option, mostly just for testing;
uses a Python-coded zegrep rather than the actual zegrep executable,
generally resulting in a 50x slowdown. What would take 2 seconds will
take 100 with this enabled. Don't enable this."""))
class Debian(callbacks.Privmsg, class Debian(callbacks.Privmsg,
configurable.Mixin,
plugins.PeriodicFileDownloader): plugins.PeriodicFileDownloader):
threaded = True threaded = True
periodicFiles = { periodicFiles = {
@ -94,21 +100,12 @@ class Debian(callbacks.Privmsg,
604800, None) 604800, None)
} }
contents = os.path.join(conf.supybot.directories.data(),'Contents-i386.gz') contents = os.path.join(conf.supybot.directories.data(),'Contents-i386.gz')
configurables = configurable.Dictionary(
[('python-zegrep', configurable.BoolType, False,
"""An advanced option, mostly just for testing; uses a Python-coded
zegrep rather than the actual zegrep executable, generally resulting
in a 50x slowdown. What would take 2 seconds will take 100 with this
enabled. Don't enable this.""")]
)
def __init__(self): def __init__(self):
callbacks.Privmsg.__init__(self) callbacks.Privmsg.__init__(self)
configurable.Mixin.__init__(self)
plugins.PeriodicFileDownloader.__init__(self) plugins.PeriodicFileDownloader.__init__(self)
def die(self): def die(self):
callbacks.Privmsg.die(self) callbacks.Privmsg.die(self)
configurable.Mixin.die(self)
def file(self, irc, msg, args): def file(self, irc, msg, args):
"""[--{regexp,exact}=<value>] [<glob>] """[--{regexp,exact}=<value>] [<glob>]
@ -141,7 +138,7 @@ class Debian(callbacks.Privmsg,
except re.error, e: except re.error, e:
irc.error("Error in regexp: %s" % e) irc.error("Error in regexp: %s" % e)
return return
if self.configurables.get('python-zegrep', None): if self.registryValue('pythonZegrep'):
fd = gzip.open(self.contents) fd = gzip.open(self.contents)
r = imap(lambda tup: tup[0], r = imap(lambda tup: tup[0],
ifilter(lambda tup: tup[0], ifilter(lambda tup: tup[0],
@ -181,10 +178,9 @@ class Debian(callbacks.Privmsg,
irc.reply(utils.commaAndify(packages)) irc.reply(utils.commaAndify(packages))
_debreflags = re.DOTALL | re.IGNORECASE _debreflags = re.DOTALL | re.IGNORECASE
_debpkgre = re.compile(r'<a[^>]+>(.*?)</a>', _debreflags) _debbrre = re.compile(r'<li><a href[^>]+>(.*?)</a> \(', _debreflags)
_debbrre = re.compile(r'<td align="center">(\S+)\s*</?td>', _debreflags) _debverre = re.compile(r'<br>\d+?:(\S+):', _debreflags)
_debtablere = re.compile(r'<table[^>]*>(.*?)</table>', _debreflags) _deblistre = re.compile(r'<h3>Package ([^<]+)</h3>(.*?)</ul>', _debreflags)
_debnumpkgsre = re.compile(r'out of total of (\d+)', _debreflags)
_debBranches = ('stable', 'testing', 'unstable', 'experimental') _debBranches = ('stable', 'testing', 'unstable', 'experimental')
def version(self, irc, msg, args): def version(self, irc, msg, args):
"""[stable|testing|unstable|experimental] <package name> """[stable|testing|unstable|experimental] <package name>
@ -202,47 +198,37 @@ class Debian(callbacks.Privmsg,
irc.error('You must give a package name.') irc.error('You must give a package name.')
return return
responses = [] responses = []
numberOfPackages = 0
package = privmsgs.getArgs(args) package = privmsgs.getArgs(args)
package = urllib.quote(package) package = urllib.quote(package)
url = 'http://packages.debian.org/cgi-bin/search_packages.pl?keywords'\ url = 'http://packages.debian.org/cgi-bin/search_packages.pl?keywords'\
'=%s&searchon=names&version=%s&release=all' % (package, branch) '=%s&searchon=names&version=%s&release=all' % (package, branch)
try: try:
fd = urllib2.urlopen(url) html = webutils.getUrl(url)
html = fd.read() except webutils.WebError, e:
fd.close()
except urllib2.HTTPError, e:
irc.error('I couldn\'t reach the search page (%s).' % e) irc.error('I couldn\'t reach the search page (%s).' % e)
return return
except socket.error, e:
if e.args[0] == 110 or e.args[0] == 10060:
irc.error('Connection timed out to packages.debian.org.')
return
else:
raise
if 'is down at the moment' in html: if 'is down at the moment' in html:
irc.error('Packages.debian.org is down at the moment. ' irc.error('Packages.debian.org is down at the moment. '
'Please try again later.') 'Please try again later.')
return return
m = self._debnumpkgsre.search(html) pkgs = self._deblistre.findall(html)
if m: self.log.warning(pkgs)
numberOfPackages = m.group(1) if not pkgs:
m = self._debtablere.search(html)
if m is None:
irc.reply('No package found for %s (%s)' % irc.reply('No package found for %s (%s)' %
(urllib.unquote(package), branch)) (urllib.unquote(package), branch))
else: else:
tableData = m.group(1) for pkg in pkgs:
rows = tableData.split('</TR>') pkgMatch = pkg[0]
for row in rows: brMatch = self._debbrre.findall(pkg[1])
pkgMatch = self._debpkgre.search(row) verMatch = self._debverre.findall(pkg[1])
brMatch = self._debbrre.search(row) if pkgMatch and brMatch and verMatch:
if pkgMatch and brMatch: versions = zip(brMatch, verMatch)
s = '%s (%s)' % (pkgMatch.group(1), brMatch.group(1)) for version in versions:
responses.append(s) s = '%s (%s)' % (pkgMatch, ': '.join(version))
resp = 'Total matches: %s, shown: %s. %s' % \ responses.append(s)
(numberOfPackages, len(responses), ', '.join(responses)) resp = '%s matches found: %s' % \
(len(responses), '; '.join(responses))
irc.reply(resp) irc.reply(resp)
_incomingRe = re.compile(r'<a href="(.*?\.deb)">', re.I) _incomingRe = re.compile(r'<a href="(.*?\.deb)">', re.I)
@ -275,7 +261,11 @@ class Debian(callbacks.Privmsg,
glob = '*%s*' % glob glob = '*%s*' % glob
predicates.append(lambda s: fnmatch.fnmatch(s, glob)) predicates.append(lambda s: fnmatch.fnmatch(s, glob))
packages = [] packages = []
fd = urllib2.urlopen('http://incoming.debian.org/') try:
fd = webutils.getUrlFd('http://incoming.debian.org/')
except webutils.WebError, e:
irc.error(e)
return
for line in fd: for line in fd:
m = self._incomingRe.search(line) m = self._incomingRe.search(line)
if m: if m:

View File

@ -67,7 +67,7 @@ if network:
self.assertRegexp('debian version unstable alkdjfad', self.assertRegexp('debian version unstable alkdjfad',
r'^No package.*\(unstable\)') r'^No package.*\(unstable\)')
self.assertRegexp('debian version gaim', self.assertRegexp('debian version gaim',
r'Total matches:.*gaim.*\(stable\)') r'\d+ matches found:.*gaim.*\(stable')
self.assertError('debian version unstable') self.assertError('debian version unstable')
def testDebfile(self): def testDebfile(self):