Misc: Make @last handle --regexp in a single process for all messages

Spawning one process for each message was a little silly, considering
there can be thousands of messages.

Plus, some instances do reach the timeout after running for a few weeks,
so we really need to fix this.

Ideally, `regexp_wrapper` should also be removed from other plugins
(Todo, Notes, ...) as they have the same issues, but this will do for
now.
This commit is contained in:
Valentin Lorentz 2022-02-16 21:27:26 +01:00
parent 4b892c2b1d
commit 4e60d8812d

View File

@ -1,7 +1,7 @@
###
# Copyright (c) 2002-2005, Jeremiah Fincher
# Copyright (c) 2009, James McCoy
# Copyright (c) 2010-2021, Valentin Lorentz
# Copyright (c) 2010-2022, Valentin Lorentz
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -34,7 +34,9 @@ import os
import sys
import json
import time
import queue
import functools
import multiprocessing
import supybot
@ -43,6 +45,7 @@ import supybot.conf as conf
from supybot import commands
import supybot.utils as utils
from supybot.commands import *
from supybot.commands import ProcessTimeoutError
import supybot.ircdb as ircdb
import supybot.irclib as irclib
import supybot.utils.minisix as minisix
@ -458,6 +461,9 @@ class Misc(callbacks.Plugin):
msg.channel)
else:
skipfirst = False
final_predicates = []
for (option, arg) in optlist:
if option == 'from':
def f(m, arg=arg):
@ -482,22 +488,34 @@ class Misc(callbacks.Plugin):
return arg.lower() not in m.args[1].lower()
predicates.setdefault('without', []).append(f)
elif option == 'regexp':
def f(m, arg=arg):
def f1(s, arg):
"""Since we can't enqueue match objects into the multiprocessing queue,
we'll just wrap the function to return bools."""
if process(arg.search, s, timeout=0.1) is not None:
return True
else:
return False
if ircmsgs.isAction(m):
m1 = ircmsgs.unAction(m)
else:
m1 = m.args[1]
return regexp_wrapper(m1, reobj=arg, timeout=0.1,
plugin_name=self.name(),
fcn_name='last')
predicates.setdefault('regexp', []).append(f)
def f(messages, arg=arg):
reobj = re.compile(arg)
# using a queue to return results, so we can return at
# least some results in case of timeout
q = multiprocessing.Queue()
def p(messages):
for m in messages:
if ircmsgs.isAction(m):
s = ircmsgs.unAction(m)
else:
s = m.args[1]
if reobj.search(s):
q.put(m)
try:
process(p, messages, timeout=3.,
pn=self.name(), cn='last')
except ProcessTimeoutError:
pass
results = []
while True:
try:
results.append(q.get(False))
except queue.Empty:
break
return results
final_predicates.append(f)
elif option == 'nolimit':
nolimit = True
iterable = filter(functools.partial(self._validLastMsg, irc),
@ -531,24 +549,32 @@ class Misc(callbacks.Plugin):
showNick = False
else:
showNick = True
candidates = []
# Run predicates that filter on individual messages
for m in iterable:
for predicate in predicates:
try:
if not predicate(m):
break
except RegexpTimeout:
irc.error(_('The regular expression timed out.'))
return
if not predicate(m):
break
else:
if nolimit:
resp.append(ircmsgs.prettyPrint(m,
timestampFormat=tsf,
showNick=showNick))
else:
irc.reply(ircmsgs.prettyPrint(m,
timestampFormat=tsf,
showNick=showNick))
return
candidates.append(m)
# Run predicates that filter lists of messages
for predicate in final_predicates:
candidates = predicate(candidates)
for m in candidates:
if nolimit:
resp.append(ircmsgs.prettyPrint(m,
timestampFormat=tsf,
showNick=showNick))
else:
irc.reply(ircmsgs.prettyPrint(m,
timestampFormat=tsf,
showNick=showNick))
return
if not resp:
irc.error(_('I couldn\'t find a message matching that criteria in '
'my history of %s messages.') % len(irc.state.history))