[wip] support outgoing batches

This commit is contained in:
Valentin Lorentz 2021-03-13 12:12:03 +01:00
parent 8b90884fa0
commit e19436a4ba

View File

@ -1218,6 +1218,15 @@ class Irc(IrcCommandDispatcher, log.Firewalled):
self.state = IrcState() self.state = IrcState()
self.queue = IrcMsgQueue() self.queue = IrcMsgQueue()
self.fastqueue = smallqueue() self.fastqueue = smallqueue()
# Messages of batches that are currently in one self.queue (not
# self.fastqueue).
# This works by adding only the first message of a batch in a queue,
# and when self.takeMsg pops that message from the queue, it will
# also pop the whole batch from self._queued_batches and atomically
# add it to self.fastqueue
self._queued_batches = {}
self.driver = None # The driver should set this later. self.driver = None # The driver should set this later.
self._setNonResettingVariables() self._setNonResettingVariables()
self._queueConnectMessages() self._queueConnectMessages()
@ -1306,6 +1315,9 @@ class Irc(IrcCommandDispatcher, log.Firewalled):
def queueMsg(self, msg): def queueMsg(self, msg):
"""Queues a message to be sent to the server.""" """Queues a message to be sent to the server."""
if msg.command.upper() == 'BATCH':
log.error('Tried to send a BATCH message using queueMsg '
'instead of queueBatch: %r', msg)
if not self.zombie: if not self.zombie:
return self.queue.enqueue(msg) return self.queue.enqueue(msg)
else: else:
@ -1314,11 +1326,60 @@ class Irc(IrcCommandDispatcher, log.Firewalled):
def sendMsg(self, msg): def sendMsg(self, msg):
"""Queues a message to be sent to the server *immediately*""" """Queues a message to be sent to the server *immediately*"""
if msg.command.upper() == 'BATCH':
log.error('Tried to send a BATCH message using sendMsg '
'instead of queueBatch: %r', msg)
if not self.zombie: if not self.zombie:
self.fastqueue.enqueue(msg) self.fastqueue.enqueue(msg)
else: else:
log.warning('Refusing to send %r; %s is a zombie.', msg, self) log.warning('Refusing to send %r; %s is a zombie.', msg, self)
def queueBatch(self, msgs):
"""Queues a batch of messages to be sent to the server.
See <https://ircv3.net/specs/extensions/batch-3.2>
queueMsg/sendMsg must not be used repeatedly to send a batch, because
they do not guarantee the batch is send atomically, which is
required because "Clients MUST NOT send messages other than PRIVMSG
while a multiline batch is open."
-- <https://ircv3.net/specs/extensions/multiline>
"""
if not conf.supybot.protocols.irc.experimentalExtensions():
log.error('queueBatch is disabled because it depends on draft '
'IRC specifications. If you know what you are doing, '
'set supybot.protocols.irc.experimentalExtensions.')
return
if len(msg) < 2:
log.error('queueBatch called with less than two messages.')
return
if msgs[0].command.upper() != 'BATCH' or msgs[0].args[0][0] != '+':
log.error('queueBatch called with non-"BATCH +" as first message.')
return
if msgs[-1].command.upper() != 'BATCH' or msgs[-1].args[0][0] != '-':
log.error('queueBatch called with non-"BATCH -" as last message.')
return
batch_name = msgs[0].args[0][1:]
if msgs[0].args[0][1:] != batch_name:
log.error('queueBatch called with mismatched BATCH name args.')
return
if any(msg.server_tags.get('batch') != batch_name for msg in msgs):
log.error('queueBatch called with mismatched batch names.')
return
if batch_name in self._queued_batches:
log.error('queueBatch called with a batch name already in flight')
return
self._queued_batches[batch_name] = msgs
# Enqueue only the start of the batch. When takeMsg sees it, it will
# enqueue the full batch in self.fastqueue.
# We don't enqueue the full batch in self.fastqueue here, because
# there is no reason for this batch to jump in front of all other
# queued messages.
# TODO: the batch will be ordered with the priority of a BATCH
# message (ie. normal), but if the batch is made only of low-priority
# messages like PRIVMSG, it should have that priority.
self.queue.enqueue(msgs[0])
def _truncateMsg(self, msg): def _truncateMsg(self, msg):
msg_str = str(msg) msg_str = str(msg)
if msg_str[0] == '@': if msg_str[0] == '@':
@ -1381,7 +1442,36 @@ class Irc(IrcCommandDispatcher, log.Firewalled):
now = str(int(now)) now = str(int(now))
self.outstandingPing = True self.outstandingPing = True
self.queueMsg(ircmsgs.ping(now)) self.queueMsg(ircmsgs.ping(now))
if msg: if msg:
if msg.command.upper() == 'BATCH':
if not conf.supybot.protocols.irc.experimentalExtensions():
log.error('Dropping outgoing batch.'
'supybot.protocols.irc.experimentalExtensions'
'is disabled, so plugins should not send '
'batches. This is a bug, please report it.')
return None
if msg.args[0].startswith('+'):
# Start of a batch; created by self.queueBatch. We need to
# add *prepend* the rest of the batch to the fastqueue
# so that no other message is sent while the batch is
# open.
# "Clients MUST NOT send messages other than PRIVMSG while
# a multiline batch is open."
# -- <https://ircv3.net/specs/extensions/multiline>
#
# (Yes, *prepend* to the queue. Fortunately, it should be
# empty, because BATCH cannot be queued in the fastqueue
# and we just got a BATCH, which means it's from the
# regular queue, which means the fastqueue is empty.
# But let's not take any risk, eg. if race condition
# with a plugin appending directly to the fastqueue.)
batch_messages = self._queued_batches
if batch_messages[0] != msg:
log.error('Enqueue "BATCH +" message does not match '
'the one of the batch in flight.')
self.fastqueue[:0] = batch_messages[1:]
if not world.testing and 'label' not in msg.server_tags \ if not world.testing and 'label' not in msg.server_tags \
and 'labeled-response' in self.state.capabilities_ack: and 'labeled-response' in self.state.capabilities_ack:
# Not adding labels while testing, because it would break # Not adding labels while testing, because it would break