3
0
mirror of https://github.com/jlu5/PyLink.git synced 2024-11-27 21:19:31 +01:00

Irc: rewrite sendq to use queue.Queue, and add an upper bound (maxsendq)

Closes #430. Closes #442.
This commit is contained in:
James Lu 2017-03-31 17:41:56 -07:00
parent ad4fe1924b
commit 348572bcb6
3 changed files with 16 additions and 9 deletions

View File

@ -15,8 +15,9 @@ import hashlib
from copy import deepcopy from copy import deepcopy
import inspect import inspect
import re import re
from collections import defaultdict, deque from collections import defaultdict
import ipaddress import ipaddress
import queue
try: try:
import ircmatch import ircmatch
@ -58,7 +59,7 @@ class Irc(utils.DeprecatedAttributesObject):
self.pingfreq = self.serverdata.get('pingfreq') or 90 self.pingfreq = self.serverdata.get('pingfreq') or 90
self.pingtimeout = self.pingfreq * 2 self.pingtimeout = self.pingfreq * 2
self.queue = deque() self.queue = None
self.connected = threading.Event() self.connected = threading.Event()
self.aborted = threading.Event() self.aborted = threading.Event()
@ -116,7 +117,8 @@ class Irc(utils.DeprecatedAttributesObject):
self.pseudoclient = None self.pseudoclient = None
self.lastping = time.time() self.lastping = time.time()
self.queue.clear() self.maxsendq = self.serverdata.get('maxsendq', 4096)
self.queue = queue.Queue(self.maxsendq)
# Internal variable to set the place and caller of the last command (in PM # Internal variable to set the place and caller of the last command (in PM
# or in a channel), used by fantasy command support. # or in a channel), used by fantasy command support.
@ -175,11 +177,10 @@ class Irc(utils.DeprecatedAttributesObject):
def processQueue(self): def processQueue(self):
"""Loop to process outgoing queue data.""" """Loop to process outgoing queue data."""
while not self.aborted.is_set(): while not self.aborted.is_set():
if self.queue: # Only process if there's data.
data = self.queue.popleft()
self._send(data)
throttle_time = self.serverdata.get('throttle_time', 0.005) throttle_time = self.serverdata.get('throttle_time', 0.005)
self.aborted.wait(throttle_time) data = self.queue.get(throttle_time)
if data:
self._send(data)
log.debug('(%s) Stopping queue thread as aborted is set', self.name) log.debug('(%s) Stopping queue thread as aborted is set', self.name)
def connect(self): def connect(self):
@ -510,7 +511,9 @@ class Irc(utils.DeprecatedAttributesObject):
def send(self, data, queue=True): def send(self, data, queue=True):
"""send() wrapper with optional queueing support.""" """send() wrapper with optional queueing support."""
if queue: if queue:
self.queue.append(data) # XXX: we don't really know how to handle blocking queues yet, so
# it's better to not expose that yet.
self.queue.put_nowait(data)
else: else:
self._send(data) self._send(data)

View File

@ -208,4 +208,4 @@ def clearqueue(irc, source, args):
Clears the outgoing text queue for the current connection.""" Clears the outgoing text queue for the current connection."""
permissions.checkPermissions(irc, source, ['core.clearqueue']) permissions.checkPermissions(irc, source, ['core.clearqueue'])
irc.queue.clear() irc.queue.queue.clear()

View File

@ -178,6 +178,10 @@ servers:
# cause netsplits! This defaults to 30 if not set. # cause netsplits! This defaults to 30 if not set.
maxnicklen: 30 maxnicklen: 30
# Determines the maximum size of the network's outgoing data queue (sendq), in message lines.
# This defaults to 4096 if not set.
#maxsendq: 4096
# Toggles SSL for this network. Defaults to False if not specified. # Toggles SSL for this network. Defaults to False if not specified.
#ssl: true #ssl: true