Reduced functionality, but I doubt anyone will complain. Much cleaner and clearer now that we're using flat files.

Jeremy Fincher 2004-07-28 05:59:30 +00:00
parent 46383f9fc7
commit d594232c5c
3 changed files with 199 additions and 307 deletions

View File

@@ -1,3 +1,8 @@
* Changed the URL plugin to use flat files rather than an SQLite
database (a sketch of the new on-disk format follows below). Also
reduced the functionality of the last command by removing some
options that no one ever really used, and removed the random
command (who uses that anyway?)
* Changed the Words plugin not to use SQLite. We lose the
anagram command, but crossword and hangman become much easier to
use, since all the user has to do is put a words file in
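
For reference, a minimal sketch of the flat-file format the new URLDB writes; the data/ directory, channel name, and nicks here are illustrative, while the layout itself comes from URLDB.__init__ and addUrl in the diff below.

# Each addUrl() call appends one space-separated "url nick" record, e.g.:
#
#   http://gameknot.com/ jemfinch
#   http://lambda.weblogs.com/xml/rss.xml somenick
#
# so adding a record is nothing more than an append:
fd = open('data/#supybot-URL.db', 'a')  # '<dataDir>/<channel>-URL.db', per URLDB.__init__
fd.write('%s %s\n' % ('http://example.com/', 'somenick'))
fd.close()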

View File

@@ -55,12 +55,6 @@ import supybot.privmsgs as privmsgs
import supybot.registry as registry
import supybot.callbacks as callbacks
try:
import sqlite
except ImportError:
raise callbacks.Error, 'You need to have PySQLite installed to use this ' \
'plugin. Download it at <http://pysqlite.sf.net/>'
def configure(advanced):
from supybot.questions import output, expect, anything, something, yn
conf.registerPlugin('URL', True)
@@ -93,54 +87,57 @@ conf.registerChannelValue(conf.supybot.plugins.URL, 'nonSnarfingRegexp',
snarfed. Give the empty string if you have no URLs that you'd like to
exclude from being snarfed."""))
class URL(callbacks.PrivmsgCommandAndRegexp,
plugins.ChannelDBHandler):
class URLDB(object):
def __init__(self, channel, log):
self.log = log
dataDir = conf.supybot.directories.data()
self.filename = os.path.join(dataDir, '%s-URL.db' % channel)
def addUrl(self, url, nick):
fd = file(self.filename, 'a')
fd.write('%s %s\n' % (url, nick))
fd.close()
def numUrls(self):
try:
fd = file(self.filename)
except EnvironmentError, e:
self.log.warning('Couldn\'t open %s: %s',
self.filename, utils.exnToString(e))
return 0
try:
return itertools.ilen(fd)
finally:
fd.close()
def getUrls(self, p):
L = []
try:
fd = file(self.filename)
except EnvironmentError, e:
self.log.warning('Couldn\'t open %s: %s',
self.filename, utils.exnToString(e))
return []
try:
for line in fd:
line = line.strip()
(url, nick) = line.split()
if p(url, nick):
L.append(url)
L.reverse()
return L
finally:
fd.close()
class URL(callbacks.PrivmsgCommandAndRegexp):
regexps = ['tinyurlSnarfer', 'titleSnarfer']
_titleRe = re.compile('<title>(.*?)</title>', re.I)
def __init__(self):
self.nextMsgs = {}
plugins.ChannelDBHandler.__init__(self)
callbacks.PrivmsgCommandAndRegexp.__init__(self)
def die(self):
plugins.ChannelDBHandler.die(self)
callbacks.PrivmsgCommandAndRegexp.die(self)
def makeDb(self, filename):
if os.path.exists(filename):
return sqlite.connect(filename)
db = sqlite.connect(filename)
cursor = db.cursor()
cursor.execute("""CREATE TABLE urls (
id INTEGER PRIMARY KEY,
url TEXT,
added TIMESTAMP,
added_by TEXT,
previous_msg TEXT,
current_msg TEXT,
next_msg TEXT,
protocol TEXT,
site TEXT,
filename TEXT
)""")
cursor.execute("""CREATE TABLE tinyurls (
id INTEGER PRIMARY KEY,
url_id INTEGER,
tinyurl TEXT
)""")
db.commit()
return db
_titleRe = re.compile('<title>(.*?)</title>', re.I | re.S)
def getDb(self, channel):
return URLDB(channel, self.log)
def doPrivmsg(self, irc, msg):
channel = msg.args[0]
db = self.getDb(channel)
cursor = db.cursor()
if (msg.nick, channel) in self.nextMsgs:
L = self.nextMsgs.pop((msg.nick, msg.args[0]))
for (url, added) in L:
cursor.execute("""UPDATE urls SET next_msg=%s
WHERE url=%s AND added=%s""",
msg.args[1], url, added)
if ircmsgs.isAction(msg):
text = ircmsgs.unAction(msg)
else:
@@ -149,31 +146,17 @@ class URL(callbacks.PrivmsgCommandAndRegexp,
r = self.registryValue('nonSnarfingRegexp', channel)
#self.log.warning(repr(r))
if r and r.search(url):
#self.log.warning('Skipping addition of URL to db.')
self.log.debug('Skipping adding %r to db.', url)
continue
#self.log.warning('Adding URL to db.')
(protocol, site, filename, _, _, _) = urlparse.urlparse(url)
previousMsg = ''
for oldMsg in reversed(irc.state.history):
if oldMsg.command == 'PRIVMSG':
if oldMsg.nick == msg.nick and oldMsg.args[0] == channel:
previousMsg = oldMsg.args[1]
addedBy = msg.nick
added = int(time.time())
cursor.execute("""INSERT INTO urls VALUES
(NULL, %s, %s, %s, %s, %s, '', %s, %s, %s)""",
url, added, addedBy, msg.args[1], previousMsg,
protocol, site, filename)
key = (msg.nick, channel)
self.nextMsgs.setdefault(key, []).append((url, added))
db.commit()
super(URL, self).doPrivmsg(irc, msg)
self.log.debug('Adding %r to db.', url)
db.addUrl(url, msg.nick)
callbacks.PrivmsgCommandAndRegexp.doPrivmsg(self, irc, msg)
def tinyurlSnarfer(self, irc, msg, match):
r"https?://[^\])>\s]{18,}"
if not ircutils.isChannel(msg.args[0]):
return
channel = msg.args[0]
if not ircutils.isChannel(channel):
return
r = self.registryValue('nonSnarfingRegexp', channel)
if self.registryValue('tinyurlSnarfer', channel):
url = match.group(0)
@@ -181,18 +164,10 @@ class URL(callbacks.PrivmsgCommandAndRegexp,
return
minlen = self.registryValue('tinyurlSnarfer.minimumLength',channel)
if len(url) >= minlen:
db = self.getDb(channel)
cursor = db.cursor()
try:
(tinyurl, updateDb) = self._getTinyUrl(url, channel)
except sqlite.OperationalError:
irc.error('The database just decided to crap itself.')
return
tinyurl = self._getTinyUrl(url, channel)
if tinyurl is None:
self.log.warning('tinyurl was None for url %r', url)
self.log.warning('Couldn\'t get tinyurl for %r', url)
return
elif updateDb:
self._updateTinyDb(url, tinyurl, channel)
domain = webutils.getDomain(url)
s = '%s (at %s)' % (ircutils.bold(tinyurl), domain)
irc.reply(s, prefixName=False)
@@ -200,16 +175,16 @@ class URL(callbacks.PrivmsgCommandAndRegexp,
def titleSnarfer(self, irc, msg, match):
r"https?://[^\])>\s]+"
if not ircutils.isChannel(msg.args[0]):
channel = msg.args[0]
if not ircutils.isChannel(channel):
return
if callbacks.addressed(irc.nick, msg):
return
channel = msg.args[0]
r = self.registryValue('nonSnarfingRegexp', channel)
#self.log.warning('Title: %r' % r)
if self.registryValue('titleSnarfer', channel):
url = match.group(0)
r = self.registryValue('nonSnarfingRegexp', channel)
if r and r.search(url):
self.log.debug('Not titleSnarfing %r.', url)
return
try:
size = conf.supybot.protocols.http.peekSize()
@@ -225,71 +200,24 @@ class URL(callbacks.PrivmsgCommandAndRegexp,
irc.reply(s, prefixName=False)
titleSnarfer = privmsgs.urlSnarfer(titleSnarfer)
def _updateTinyDb(self, url, tinyurl, channel):
db = self.getDb(channel)
cursor = db.cursor()
cursor.execute("""INSERT INTO tinyurls
VALUES (NULL, 0, %s)""", tinyurl)
cursor.execute("""SELECT id FROM urls WHERE url=%s""", url)
id = cursor.fetchone()[0]
cursor.execute("""UPDATE tinyurls SET url_id=%s
WHERE tinyurl=%s""", id, tinyurl)
db.commit()
_tinyRe = re.compile(r'<blockquote><b>(http://tinyurl\.com/\w+)</b>')
def _getTinyUrl(self, url, channel, cmd=False):
db = self.getDb(channel)
cursor = db.cursor()
try:
cursor.execute("""SELECT tinyurls.tinyurl FROM urls, tinyurls
WHERE urls.url=%s AND
tinyurls.url_id=urls.id""", url)
except sqlite.OperationalError:
raise
if cursor.rowcount == 0:
updateDb = True
try:
fd = urllib2.urlopen('http://tinyurl.com/create.php?url=%s' %
url)
s = fd.read()
fd.close()
m = self._tinyRe.search(s)
if m is None:
tinyurl = None
else:
tinyurl = m.group(1)
except urllib2.HTTPError, e:
if cmd:
raise callbacks.Error, e.msg()
else:
self.log.warning(str(e))
else:
updateDb = False
tinyurl = cursor.fetchone()[0]
return (tinyurl, updateDb)
def _formatUrl(self, url, added, addedBy):
when = time.strftime(conf.supybot.humanTimestampFormat(),
time.localtime(int(added)))
return '<%s> (added by %s at %s)' % (url, addedBy, when)
def random(self, irc, msg, args):
"""[<channel>]
Returns a random URL from the URL database. <channel> is only required
if the message isn't sent in the channel itself.
"""
channel = privmsgs.getChannel(msg, args)
db = self.getDb(channel)
cursor = db.cursor()
cursor.execute("""SELECT url, added, added_by
FROM urls
ORDER BY random()
LIMIT 1""")
if cursor.rowcount == 0:
irc.reply('I have no URLs in my database for %s' % channel)
else:
irc.reply(self._formatUrl(*cursor.fetchone()))
fd = urllib2.urlopen('http://tinyurl.com/create.php?url=%s' %
url)
s = fd.read()
fd.close()
m = self._tinyRe.search(s)
if m is None:
tinyurl = None
else:
tinyurl = m.group(1)
return tinyurl
except urllib2.HTTPError, e:
if cmd:
raise callbacks.Error, e.msg()
else:
self.log.warning(str(e))
def tiny(self, irc, msg, args):
"""<url>
@@ -305,15 +233,10 @@ class URL(callbacks.PrivmsgCommandAndRegexp,
minlen = self.registryValue('tinyurlSnarfer.minimumLength', channel)
r = self.registryValue('nonSnarfingRegexp', channel)
if snarf and len(url) >= minlen and not r.search(url):
self.log.debug('Not applying tiny command, snarfer is active.')
return
try:
(tinyurl, updateDb) = self._getTinyUrl(url, channel, cmd=True)
except sqlite.OperationalError:
irc.error('The database just decided to crap itself.')
return
tinyurl = self._getTinyUrl(url, channel, cmd=True)
if tinyurl is not None:
if updateDb:
self._updateTinyDb(url, tinyurl, channel)
irc.reply(tinyurl)
else:
s = 'Could not parse the TinyURL.com results page.'
@@ -328,85 +251,54 @@ class URL(callbacks.PrivmsgCommandAndRegexp,
"""
channel = privmsgs.getChannel(msg, args)
db = self.getDb(channel)
cursor = db.cursor()
cursor.execute("""SELECT COUNT(*) FROM urls""")
(count,) = cursor.fetchone()
count = int(count)
irc.reply('I have %s %s in my database.' %
(count, count == 1 and 'URL' or 'URLs'))
count = db.numUrls()
irc.reply('I have %s in my database.' % utils.nItems('URL', count))
def last(self, irc, msg, args):
"""[<channel>] [--{from,with,at,proto,near}=<value>] --{nolimit,fancy}
"""[<channel>] [--{from,with,proto}=<value>] --{nolimit}
Gives the last URL matching the given criteria. --from is from whom
the URL came; --at is the site of the URL; --proto is the protocol the
URL used; --with is something inside the URL; --near is a string in the
messages before and after the link. If --nolimit is given, returns
all the URLs that are found. --fancy returns information in addition
to just the URL. <channel> is only necessary if the message isn't sent
in the channel itself.
the URL came; --proto is the protocol the URL used; --with is something
inside the URL. If --nolimit is given, returns all the URLs that are
found. <channel> is only necessary if the message isn't sent in the
channel itself.
"""
channel = privmsgs.getChannel(msg, args)
(optlist, rest) = getopt.getopt(args, '', ['from=', 'with=', 'at=',
'proto=', 'near=',
'nolimit', 'fancy'])
criteria = ['1=1']
formats = []
simple = True
(optlist, rest) = getopt.getopt(args, '', ['from=', 'with=',
'proto=', 'nolimit',])
predicates = []
nolimit = False
for (option, argument) in optlist:
option = option.lstrip('-') # Strip off the --.
if option == 'nolimit':
for (option, arg) in optlist:
if option == '--nolimit':
nolimit = True
if option == 'fancy':
simple = False
elif option == 'from':
criteria.append('added_by LIKE %s')
formats.append(argument)
elif option == 'with':
if '%' not in argument and '_' not in argument:
argument = '%%%s%%' % argument
criteria.append('url LIKE %s')
formats.append(argument)
elif option == 'at':
if '%' not in argument and '_' not in argument:
argument = '%' + argument
criteria.append('site LIKE %s')
formats.append(argument)
elif option == 'proto':
criteria.append('protocol=%s')
formats.append(argument)
elif option == 'near':
criteria.append("""(previous_msg LIKE %s OR
next_msg LIKE %s OR
current_msg LIKE %s)""")
if '%' not in argument:
argument = '%%%s%%' % argument
formats.append(argument)
formats.append(argument)
formats.append(argument)
elif option == '--from':
def from_(url, nick, arg=arg):
return nick.lower() == arg.lower()
predicates.append(from_)
elif option == '--with':
def with(url, nick, arg=arg):
return arg in url
predicates.append(with)
elif option == '--proto':
def proto(url, nick, arg=arg):
return url.startswith(arg)
predicates.append(proto)
db = self.getDb(channel)
cursor = db.cursor()
criterion = ' AND '.join(criteria)
sql = """SELECT id, url, added, added_by
FROM urls
WHERE %s ORDER BY id DESC
LIMIT 100""" % criterion
cursor.execute(sql, *formats)
if cursor.rowcount == 0:
def predicate(url, nick):
for predicate in predicates:
if not predicate(url, nick):
return False
return True
urls = db.getUrls(predicate)
if not urls:
irc.reply('No URLs matched that criteria.')
else:
if nolimit:
urls = ['<%s>' % t[1] for t in cursor.fetchall()]
urls = ['<%s>' % url for url in urls]
s = ', '.join(urls)
elif simple:
s = cursor.fetchone()[1]
else:
(id, url, added, added_by) = cursor.fetchone()
timestamp = time.strftime('%I:%M %p, %B %d, %Y',
time.localtime(int(added)))
s = '#%s: <%s>, added by %s at %s.' % \
(id, url, added_by, timestamp)
# We should optimize this with another URLDB method eventually.
s = urls[0]
irc.reply(s)
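
Taken together, URLDB.getUrls plus the option-built predicates above replace the old SQL WHERE clauses. Below is a standalone sketch of that filtering under the same one-record-per-line assumption; the function and variable names here are illustrative and not part of the plugin.

# Read the flat file and keep the URLs that satisfy a predicate, newest first,
# mirroring URLDB.getUrls() above.
def get_urls(filename, predicate):
    urls = []
    fd = open(filename)
    try:
        for line in fd:
            (url, nick) = line.strip().split()
            if predicate(url, nick):
                urls.append(url)
    finally:
        fd.close()
    urls.reverse()  # the file is append-only, so reversing yields newest first
    return urls

# Combine the --from/--with/--proto options into a single predicate, the way
# the new "last" command builds its predicates list.
def make_predicate(frm=None, substring=None, proto=None):
    preds = []
    if frm is not None:
        preds.append(lambda url, nick: nick.lower() == frm.lower())
    if substring is not None:
        preds.append(lambda url, nick: substring in url)
    if proto is not None:
        preds.append(lambda url, nick: url.startswith(proto))
    def predicate(url, nick):
        for p in preds:
            if not p(url, nick):
                return False
        return True
    return predicate

# For example, the most recent HTTPS URL posted by jemfinch:
#   get_urls('data/#supybot-URL.db',
#            make_predicate(frm='jemfinch', proto='https'))[0]

Note that the whole file is scanned on every query, which is why the code above carries the "We should optimize this with another URLDB method eventually" comment.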

View File

@@ -66,111 +66,106 @@ http://gameknot.com/tsignup.pl
http://lambda.weblogs.com/xml/rss.xml
""".strip().splitlines()
try:
import sqlite
except ImportError:
sqlite = None
class URLTestCase(ChannelPluginTestCase, PluginDocumentation):
plugins = ('URL',)
def setUp(self):
ChannelPluginTestCase.setUp(self)
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(False)
if sqlite is not None:
class URLTestCase(ChannelPluginTestCase, PluginDocumentation):
plugins = ('URL',)
def setUp(self):
ChannelPluginTestCase.setUp(self)
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(False)
def test(self):
counter = 0
self.assertNotError('url random')
for url in urls:
self.assertRegexp('url stats', str(counter))
self.feedMsg(url)
counter += 1
def test(self):
counter = 0
#self.assertNotError('url random')
for url in urls:
self.assertRegexp('url stats', str(counter))
self.assertRegexp('url last', re.escape(urls[-1]))
self.assertRegexp('url last --proto https', re.escape(urls[-3]))
self.assertRegexp('url last --at gameknot.com',re.escape(urls[-2]))
self.assertRegexp('url last --with dhcp', re.escape(urls[-4]))
self.assertRegexp('url last --from alsdkjf', '^No')
self.assertNotError('url random')
self.feedMsg(url)
counter += 1
self.assertRegexp('url stats', str(counter))
self.assertRegexp('url last', re.escape(urls[-1]))
self.assertRegexp('url last --proto https', re.escape(urls[-3]))
self.assertRegexp('url last --with gameknot.com',
re.escape(urls[-2]))
self.assertRegexp('url last --with dhcp', re.escape(urls[-4]))
self.assertRegexp('url last --from alsdkjf', '^No')
self.assertNotError('url random')
def testDefaultNotFancy(self):
self.feedMsg(urls[0])
self.assertResponse('url last', urls[0])
def testDefaultNotFancy(self):
self.feedMsg(urls[0])
self.assertResponse('url last', urls[0])
def testAction(self):
self.irc.feedMsg(ircmsgs.action(self.channel, urls[1]))
self.assertNotRegexp('url last', '\\x01')
def testAction(self):
self.irc.feedMsg(ircmsgs.action(self.channel, urls[1]))
self.assertNotRegexp('url last', '\\x01')
def testNonSnarfingRegexpConfigurable(self):
self.assertNoResponse('http://foo.bar.baz/', 2)
def testNonSnarfingRegexpConfigurable(self):
self.assertNoResponse('http://foo.bar.baz/', 2)
self.assertResponse('url last', 'http://foo.bar.baz/')
try:
conf.supybot.plugins.URL.nonSnarfingRegexp.set('m/biff/i')
self.assertNoResponse('http://biff.bar.baz/', 2)
self.assertResponse('url last', 'http://foo.bar.baz/')
finally:
conf.supybot.plugins.URL.nonSnarfingRegexp.set('')
if network:
def testTinyurl(self):
try:
conf.supybot.plugins.URL.nonSnarfingRegexp.set('m/biff/i')
self.assertNoResponse('http://biff.bar.baz/', 2)
self.assertResponse('url last', 'http://foo.bar.baz/')
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(False)
self.assertRegexp(
'url tiny http://sourceforge.net/tracker/?'
'func=add&group_id=58965&atid=489447',
r'http://tinyurl.com/rqac')
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(True)
self.assertRegexp(
'url tiny http://sourceforge.net/tracker/?'
'func=add&group_id=58965&atid=489447',
r'http://tinyurl.com/rqac')
finally:
conf.supybot.plugins.URL.nonSnarfingRegexp.set('')
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(False)
if network:
def testTinyurl(self):
try:
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(False)
self.assertRegexp(
'url tiny http://sourceforge.net/tracker/?'
'func=add&group_id=58965&atid=489447',
r'http://tinyurl.com/rqac')
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(True)
self.assertRegexp(
'url tiny http://sourceforge.net/tracker/?'
'func=add&group_id=58965&atid=489447',
r'http://tinyurl.com/rqac')
finally:
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(False)
def testTinysnarf(self):
try:
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(True)
self.assertRegexp('http://sourceforge.net/tracker/?'
'func=add&group_id=58965&atid=489447',
r'http://tinyurl.com/rqac.* \(at')
self.assertRegexp(
'http://www.urbandictionary.com/define.php?'
'term=all+your+base+are+belong+to+us',
r'http://tinyurl.com/u479.* \(at')
finally:
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(False)
def testTinysnarf(self):
def testTitleSnarfer(self):
try:
conf.supybot.plugins.URL.titleSnarfer.setValue(True)
self.assertResponse('http://microsoft.com/',
'Title: Microsoft Corporation'
' (at microsoft.com)')
finally:
conf.supybot.plugins.URL.titleSnarfer.setValue(False)
def testNonSnarfing(self):
tiny = conf.supybot.plugins.URL.tinyurlSnarfer()
snarf = conf.supybot.plugins.URL.nonSnarfingRegexp()
title = conf.supybot.plugins.URL.titleSnarfer()
try:
conf.supybot.plugins.URL.nonSnarfingRegexp.set('m/sf/')
try:
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(True)
self.assertRegexp('http://sourceforge.net/tracker/?'
'func=add&group_id=58965&atid=489447',
r'http://tinyurl.com/rqac.* \(at')
self.assertRegexp(
'http://www.urbandictionary.com/define.php?'
'term=all+your+base+are+belong+to+us',
r'http://tinyurl.com/u479.* \(at')
self.assertNoResponse('http://sf.net/', 2)
self.assertResponse('http://www.sourceforge.net/',
'http://tinyurl.com/2cnkf')
finally:
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(False)
def testTitleSnarfer(self):
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(tiny)
try:
conf.supybot.plugins.URL.titleSnarfer.setValue(True)
self.assertResponse('http://microsoft.com/',
'Title: Microsoft Corporation'
' (at microsoft.com)')
self.assertNoResponse('http://sf.net/', 2)
self.assertRegexp('http://www.sourceforge.net/',
r'Sourceforge\.net')
finally:
conf.supybot.plugins.URL.titleSnarfer.setValue(False)
def testNonSnarfing(self):
tiny = conf.supybot.plugins.URL.tinyurlSnarfer()
snarf = conf.supybot.plugins.URL.nonSnarfingRegexp()
title = conf.supybot.plugins.URL.titleSnarfer()
try:
conf.supybot.plugins.URL.nonSnarfingRegexp.set('m/sf/')
try:
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(True)
self.assertNoResponse('http://sf.net/', 2)
self.assertResponse('http://www.sourceforge.net/',
'http://tinyurl.com/2cnkf')
finally:
conf.supybot.plugins.URL.tinyurlSnarfer.setValue(tiny)
try:
conf.supybot.plugins.URL.titleSnarfer.setValue(True)
self.assertNoResponse('http://sf.net/', 2)
self.assertRegexp('http://www.sourceforge.net/',
r'Sourceforge\.net')
finally:
conf.supybot.plugins.URL.titleSnarfer.setValue(title)
finally:
conf.supybot.plugins.URL.nonSnarfingRegexp.setValue(snarf)
conf.supybot.plugins.URL.titleSnarfer.setValue(title)
finally:
conf.supybot.plugins.URL.nonSnarfingRegexp.setValue(snarf)