diff --git a/classes.py b/classes.py index fd309b6..dc5aca4 100644 --- a/classes.py +++ b/classes.py @@ -34,7 +34,7 @@ class ProtocolError(RuntimeError): ### Internal classes (users, servers, channels) -class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase): +class PyLinkNetworkCore(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase): """Base IRC object for PyLink.""" def __init__(self, netname, proto, conf): @@ -75,15 +75,6 @@ class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase): self.init_vars() - if world.testing: - # HACK: Don't thread if we're running tests. - self.connect() - else: - self.connection_thread = threading.Thread(target=self.connect, - name="Listener for %s" % - self.name) - self.connection_thread.start() - def log_setup(self): """ Initializes any channel loggers defined for the current network. @@ -178,295 +169,12 @@ class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase): # Set up channel logging for the network self.log_setup() - def process_queue(self): - """Loop to process outgoing queue data.""" - while True: - throttle_time = self.serverdata.get('throttle_time', 0.005) - if not self.aborted.wait(throttle_time): - data = self.queue.get() - if data is None: - log.debug('(%s) Stopping queue thread due to getting None as item', self.name) - break - elif data: - self._send(data) - else: - break - - def connect(self): - """ - Runs the connect loop for the IRC object. This is usually called by - __init__ in a separate thread to allow multiple concurrent connections. - """ - while True: - - self.aborted.clear() - self.init_vars() - - try: - self.proto.validateServerConf() - except AssertionError as e: - log.exception("(%s) Configuration error: %s", self.name, e) - return - - ip = self.serverdata["ip"] - port = self.serverdata["port"] - checks_ok = True - try: - # Set the socket type (IPv6 or IPv4). - stype = socket.AF_INET6 if self.serverdata.get("ipv6") else socket.AF_INET - - # Creat the socket. - self.socket = socket.socket(stype) - self.socket.setblocking(0) - - # Set the socket bind if applicable. - if 'bindhost' in self.serverdata: - self.socket.bind((self.serverdata['bindhost'], 0)) - - # Set the connection timeouts. Initial connection timeout is a - # lot smaller than the timeout after we've connected; this is - # intentional. - self.socket.settimeout(self.pingfreq) - - # Resolve hostnames if it's not an IP address already. - old_ip = ip - ip = socket.getaddrinfo(ip, port, stype)[0][-1][0] - log.debug('(%s) Resolving address %s to %s', self.name, old_ip, ip) - - # Enable SSL if set to do so. - self.ssl = self.serverdata.get('ssl') - if self.ssl: - log.info('(%s) Attempting SSL for this connection...', self.name) - certfile = self.serverdata.get('ssl_certfile') - keyfile = self.serverdata.get('ssl_keyfile') - - context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - # Disable SSLv2 and SSLv3 - these are insecure - context.options |= ssl.OP_NO_SSLv2 - context.options |= ssl.OP_NO_SSLv3 - - # Cert and key files are optional, load them if specified. - if certfile and keyfile: - try: - context.load_cert_chain(certfile, keyfile) - except OSError: - log.exception('(%s) Caught OSError trying to ' - 'initialize the SSL connection; ' - 'are "ssl_certfile" and ' - '"ssl_keyfile" set correctly?', - self.name) - checks_ok = False - - self.socket = context.wrap_socket(self.socket) - - log.info("Connecting to network %r on %s:%s", self.name, ip, port) - self.socket.connect((ip, port)) - self.socket.settimeout(self.pingtimeout) - - # If SSL was enabled, optionally verify the certificate - # fingerprint for some added security. I don't bother to check - # the entire certificate for validity, since most IRC networks - # self-sign their certificates anyways. - if self.ssl and checks_ok: - peercert = self.socket.getpeercert(binary_form=True) - - # Hash type is configurable using the ssl_fingerprint_type - # value, and defaults to sha256. - hashtype = self.serverdata.get('ssl_fingerprint_type', 'sha256').lower() - - try: - hashfunc = getattr(hashlib, hashtype) - except AttributeError: - log.error('(%s) Unsupported SSL certificate fingerprint type %r given, disconnecting...', - self.name, hashtype) - checks_ok = False - else: - fp = hashfunc(peercert).hexdigest() - expected_fp = self.serverdata.get('ssl_fingerprint') - - if expected_fp and checks_ok: - if fp != expected_fp: - # SSL Fingerprint doesn't match; break. - log.error('(%s) Uplink\'s SSL certificate ' - 'fingerprint (%s) does not match the ' - 'one configured: expected %r, got %r; ' - 'disconnecting...', self.name, hashtype, - expected_fp, fp) - checks_ok = False - else: - log.info('(%s) Uplink SSL certificate fingerprint ' - '(%s) verified: %r', self.name, hashtype, - fp) - else: - log.info('(%s) Uplink\'s SSL certificate fingerprint (%s) ' - 'is %r. You can enhance the security of your ' - 'link by specifying this in a "ssl_fingerprint"' - ' option in your server block.', self.name, - hashtype, fp) - - if checks_ok: - - self.queue_thread = threading.Thread(name="Queue thread for %s" % self.name, - target=self.process_queue, daemon=True) - self.queue_thread.start() - - self.sid = self.serverdata.get("sid") - # All our checks passed, get the protocol module to connect and run the listen - # loop. This also updates any SID values should the protocol module do so. - self.proto.connect() - - log.info('(%s) Enumerating our own SID %s', self.name, self.sid) - host = self.hostname() - - self.servers[self.sid] = IrcServer(None, host, internal=True, - desc=self.serverdata.get('serverdesc') - or conf.conf['bot']['serverdesc']) - - log.info('(%s) Starting ping schedulers....', self.name) - self.schedule_ping() - log.info('(%s) Server ready; listening for data.', self.name) - self.autoconnect_active_multiplier = 1 # Reset any extra autoconnect delays - self.run() - else: # Configuration error :( - log.error('(%s) A configuration error was encountered ' - 'trying to set up this connection. Please check' - ' your configuration file and try again.', - self.name) - # self.run() or the protocol module it called raised an exception, meaning we've disconnected! - # Note: socket.error, ConnectionError, IOError, etc. are included in OSError since Python 3.3, - # so we don't need to explicitly catch them here. - # We also catch SystemExit here as a way to abort out connection threads properly, and stop the - # IRC connection from freezing instead. - except (OSError, RuntimeError, SystemExit) as e: - log.exception('(%s) Disconnected from IRC:', self.name) - - self.disconnect() - - # If autoconnect is enabled, loop back to the start. Otherwise, - # return and stop. - autoconnect = self.serverdata.get('autoconnect') - - # Sets the autoconnect growth multiplier (e.g. a value of 2 multiplies the autoconnect - # time by 2 on every failure, etc.) - autoconnect_multiplier = self.serverdata.get('autoconnect_multiplier', 2) - autoconnect_max = self.serverdata.get('autoconnect_max', 1800) - # These values must at least be 1. - autoconnect_multiplier = max(autoconnect_multiplier, 1) - autoconnect_max = max(autoconnect_max, 1) - - log.debug('(%s) Autoconnect delay set to %s seconds.', self.name, autoconnect) - if autoconnect is not None and autoconnect >= 1: - log.debug('(%s) Multiplying autoconnect delay %s by %s.', self.name, autoconnect, self.autoconnect_active_multiplier) - autoconnect *= self.autoconnect_active_multiplier - # Add a cap on the max. autoconnect delay, so that we don't go on forever... - autoconnect = min(autoconnect, autoconnect_max) - - log.info('(%s) Going to auto-reconnect in %s seconds.', self.name, autoconnect) - # Continue when either self.aborted is set or the autoconnect time passes. - # Compared to time.sleep(), this allows us to stop connections quicker if we - # break while while for autoconnect. - self.aborted.clear() - self.aborted.wait(autoconnect) - - # Store in the local state what the autoconnect multiplier currently is. - self.autoconnect_active_multiplier *= autoconnect_multiplier - - if self not in world.networkobjects.values(): - log.debug('Stopping stale connect loop for old connection %r', self.name) - return - - else: - log.info('(%s) Stopping connect loop (autoconnect value %r is < 1).', self.name, autoconnect) - return - - def disconnect(self): - """Handle disconnects from the remote server.""" - was_successful = self.connected.is_set() - log.debug('(%s) disconnect: got %s for was_successful state', self.name, was_successful) - - log.debug('(%s) disconnect: Clearing self.connected state.', self.name) - self.connected.clear() - - log.debug('(%s) Removing channel logging handlers due to disconnect.', self.name) - while self.loghandlers: - log.removeHandler(self.loghandlers.pop()) - - try: - log.debug('(%s) disconnect: Shutting down socket.', self.name) - self.socket.shutdown(socket.SHUT_RDWR) - except Exception as e: # Socket timed out during creation; ignore - log.debug('(%s) error on socket shutdown: %s: %s', self.name, type(e).__name__, e) - - self.socket.close() - - # Stop the queue thread. - if self.queue: - # XXX: queue.Queue.queue isn't actually documented, so this is probably not reliable in the long run. - self.queue.queue.appendleft(None) - - # Stop the ping timer. - if self.pingTimer: - log.debug('(%s) Canceling pingTimer at %s due to disconnect() call', self.name, time.time()) - self.pingTimer.cancel() - - log.debug('(%s) disconnect: Setting self.aborted to True.', self.name) - self.aborted.set() - - # Internal hook signifying that a network has disconnected. - self.call_hooks([None, 'PYLINK_DISCONNECT', {'was_successful': was_successful}]) - - log.debug('(%s) disconnect: Clearing state via init_vars().', self.name) - self.init_vars() - - def run(self): - """Main IRC loop which listens for messages.""" - buf = b"" - data = b"" - while not self.aborted.is_set(): - - try: - data = self.socket.recv(2048) - except OSError: - # Suppress socket read warnings from lingering recv() calls if - # we've been told to shutdown. - if self.aborted.is_set(): - return - raise - - buf += data - if not data: - log.error('(%s) No data received, disconnecting!', self.name) - return - elif (time.time() - self.lastping) > self.pingtimeout: - log.error('(%s) Connection timed out.', self.name) - return - - while b'\n' in buf: - line, buf = buf.split(b'\n', 1) - line = line.strip(b'\r') - line = line.decode(self.encoding, "replace") - self.runline(line) - - def runline(self, line): - """Sends a command to the protocol module.""" - log.debug("(%s) <- %s", self.name, line) - try: - hook_args = self.proto.handle_events(line) - except Exception: - log.exception('(%s) Caught error in handle_events, disconnecting!', self.name) - log.error('(%s) The offending line was: <- %s', self.name, line) - self.aborted.set() - return - # Only call our hooks if there's data to process. Handlers that support - # hooks will return a dict of parsed arguments, which can be passed on - # to plugins and the like. For example, the JOIN handler will return - # something like: {'channel': '#whatever', 'users': ['UID1', 'UID2', - # 'UID3']}, etc. - if hook_args is not None: - self.call_hooks(hook_args) - - return hook_args + ''' + def __repr__(self): + return "" % self.name + ''' + ### General utility functions def call_hooks(self, hook_args): """Calls a hook function with the given hook args.""" numeric, command, parsed_args = hook_args @@ -506,47 +214,6 @@ class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase): hook_args) continue - def _send(self, data): - """Sends raw text to the uplink server.""" - # Safeguard against newlines in input!! Otherwise, each line gets - # treated as a separate command, which is particularly nasty. - data = data.replace('\n', ' ') - encoded_data = data.encode(self.encoding, 'replace') + b"\n" - - log.debug("(%s) -> %s", self.name, data) - - try: - self.socket.send(encoded_data) - except (OSError, AttributeError): - log.exception("(%s) Failed to send message %r; did the network disconnect?", self.name, data) - - def send(self, data, queue=True): - """send() wrapper with optional queueing support.""" - if self.aborted.is_set(): - log.debug('(%s) refusing to queue data %r as self.aborted is set', self.name, data) - return - if queue: - # XXX: we don't really know how to handle blocking queues yet, so - # it's better to not expose that yet. - self.queue.put_nowait(data) - else: - self._send(data) - - def schedule_ping(self): - """Schedules periodic pings in a loop.""" - self.proto.ping() - - self.pingTimer = threading.Timer(self.pingfreq, self.schedule_ping) - self.pingTimer.daemon = True - self.pingTimer.name = 'Ping timer loop for %s' % self.name - self.pingTimer.start() - - log.debug('(%s) Ping scheduled at %s', self.name, time.time()) - - def __repr__(self): - return "" % self.name - - ### General utility functions def call_command(self, source, text): """ Calls a PyLink bot command. source is the caller's UID, and text is the @@ -615,6 +282,29 @@ class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase): # This is a stub to alias error to reply self.reply("Error: %s" % text, **kwargs) + ### Configuration-based lookup functions. + def version(self): + """ + Returns a detailed version string including the PyLink daemon version, + the protocol module in use, and the server hostname. + """ + fullversion = 'PyLink-%s. %s :[protocol:%s, encoding:%s]' % (__version__, self.hostname(), self.protoname, self.encoding) + return fullversion + + def hostname(self): + """ + Returns the server hostname used by PyLink on the given server. + """ + return self.serverdata.get('hostname', world.fallback_hostname) + + def get_full_network_name(self): + """ + Returns the full network name (as defined by the "netname" option), or the + short network name if that isn't defined. + """ + return self.serverdata.get('netname', self.name) + +class PyLinkNetworkCoreWithUtils(PyLinkNetworkCore): def to_lower(self, text): """Returns a lowercase representation of text based on the IRC object's casemapping (rfc1459 or ascii).""" @@ -1008,20 +698,6 @@ class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase): log.debug('wrap_modes: returning %s for %s', strings, orig_modes) return strings - def version(self): - """ - Returns a detailed version string including the PyLink daemon version, - the protocol module in use, and the server hostname. - """ - fullversion = 'PyLink-%s. %s :[protocol:%s, encoding:%s]' % (__version__, self.hostname(), self.protoname, self.encoding) - return fullversion - - def hostname(self): - """ - Returns the server hostname used by PyLink on the given server. - """ - return self.serverdata.get('hostname', world.fallback_hostname) - ### State checking functions def nick_to_uid(self, nick): """Looks up the UID of a user with the given nick, if one is present.""" @@ -1119,13 +795,6 @@ class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase): else: raise KeyError("Unknown UID/SID %s" % entityid) - def get_full_network_name(self): - """ - Returns the full network name (as defined by the "netname" option), or the - short network name if that isn't defined. - """ - return self.serverdata.get('netname', self.name) - def is_oper(self, uid, allowAuthed=True, allowOper=True): """ Returns whether the given user has operator status on PyLink. This can be achieved @@ -1250,6 +919,347 @@ class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase): result = not result return result +class PyLinkIRCNetwork(PyLinkNetworkCoreWithUtils): + def __init__(self, *args): + super().__init__(*args) + if world.testing: + # HACK: Don't thread if we're running tests. + self.connect() + else: + self.connection_thread = threading.Thread(target=self.connect, + name="Listener for %s" % + self.name) + self.connection_thread.start() + + def schedule_ping(self): + """Schedules periodic pings in a loop.""" + self.proto.ping() + + self.pingTimer = threading.Timer(self.pingfreq, self.schedule_ping) + self.pingTimer.daemon = True + self.pingTimer.name = 'Ping timer loop for %s' % self.name + self.pingTimer.start() + + log.debug('(%s) Ping scheduled at %s', self.name, time.time()) + + def connect(self): + """ + Runs the connect loop for the IRC object. This is usually called by + __init__ in a separate thread to allow multiple concurrent connections. + """ + while True: + + self.aborted.clear() + self.init_vars() + + try: + self.proto.validateServerConf() + except AssertionError as e: + log.exception("(%s) Configuration error: %s", self.name, e) + return + + ip = self.serverdata["ip"] + port = self.serverdata["port"] + checks_ok = True + try: + # Set the socket type (IPv6 or IPv4). + stype = socket.AF_INET6 if self.serverdata.get("ipv6") else socket.AF_INET + + # Creat the socket. + self.socket = socket.socket(stype) + self.socket.setblocking(0) + + # Set the socket bind if applicable. + if 'bindhost' in self.serverdata: + self.socket.bind((self.serverdata['bindhost'], 0)) + + # Set the connection timeouts. Initial connection timeout is a + # lot smaller than the timeout after we've connected; this is + # intentional. + self.socket.settimeout(self.pingfreq) + + # Resolve hostnames if it's not an IP address already. + old_ip = ip + ip = socket.getaddrinfo(ip, port, stype)[0][-1][0] + log.debug('(%s) Resolving address %s to %s', self.name, old_ip, ip) + + # Enable SSL if set to do so. + self.ssl = self.serverdata.get('ssl') + if self.ssl: + log.info('(%s) Attempting SSL for this connection...', self.name) + certfile = self.serverdata.get('ssl_certfile') + keyfile = self.serverdata.get('ssl_keyfile') + + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + # Disable SSLv2 and SSLv3 - these are insecure + context.options |= ssl.OP_NO_SSLv2 + context.options |= ssl.OP_NO_SSLv3 + + # Cert and key files are optional, load them if specified. + if certfile and keyfile: + try: + context.load_cert_chain(certfile, keyfile) + except OSError: + log.exception('(%s) Caught OSError trying to ' + 'initialize the SSL connection; ' + 'are "ssl_certfile" and ' + '"ssl_keyfile" set correctly?', + self.name) + checks_ok = False + + self.socket = context.wrap_socket(self.socket) + + log.info("Connecting to network %r on %s:%s", self.name, ip, port) + self.socket.connect((ip, port)) + self.socket.settimeout(self.pingtimeout) + + # If SSL was enabled, optionally verify the certificate + # fingerprint for some added security. I don't bother to check + # the entire certificate for validity, since most IRC networks + # self-sign their certificates anyways. + if self.ssl and checks_ok: + peercert = self.socket.getpeercert(binary_form=True) + + # Hash type is configurable using the ssl_fingerprint_type + # value, and defaults to sha256. + hashtype = self.serverdata.get('ssl_fingerprint_type', 'sha256').lower() + + try: + hashfunc = getattr(hashlib, hashtype) + except AttributeError: + log.error('(%s) Unsupported SSL certificate fingerprint type %r given, disconnecting...', + self.name, hashtype) + checks_ok = False + else: + fp = hashfunc(peercert).hexdigest() + expected_fp = self.serverdata.get('ssl_fingerprint') + + if expected_fp and checks_ok: + if fp != expected_fp: + # SSL Fingerprint doesn't match; break. + log.error('(%s) Uplink\'s SSL certificate ' + 'fingerprint (%s) does not match the ' + 'one configured: expected %r, got %r; ' + 'disconnecting...', self.name, hashtype, + expected_fp, fp) + checks_ok = False + else: + log.info('(%s) Uplink SSL certificate fingerprint ' + '(%s) verified: %r', self.name, hashtype, + fp) + else: + log.info('(%s) Uplink\'s SSL certificate fingerprint (%s) ' + 'is %r. You can enhance the security of your ' + 'link by specifying this in a "ssl_fingerprint"' + ' option in your server block.', self.name, + hashtype, fp) + + if checks_ok: + + self.queue_thread = threading.Thread(name="Queue thread for %s" % self.name, + target=self.process_queue, daemon=True) + self.queue_thread.start() + + self.sid = self.serverdata.get("sid") + # All our checks passed, get the protocol module to connect and run the listen + # loop. This also updates any SID values should the protocol module do so. + self.proto.connect() + + log.info('(%s) Enumerating our own SID %s', self.name, self.sid) + host = self.hostname() + + self.servers[self.sid] = IrcServer(None, host, internal=True, + desc=self.serverdata.get('serverdesc') + or conf.conf['bot']['serverdesc']) + + log.info('(%s) Starting ping schedulers....', self.name) + self.schedule_ping() + log.info('(%s) Server ready; listening for data.', self.name) + self.autoconnect_active_multiplier = 1 # Reset any extra autoconnect delays + self.run() + else: # Configuration error :( + log.error('(%s) A configuration error was encountered ' + 'trying to set up this connection. Please check' + ' your configuration file and try again.', + self.name) + # self.run() or the protocol module it called raised an exception, meaning we've disconnected! + # Note: socket.error, ConnectionError, IOError, etc. are included in OSError since Python 3.3, + # so we don't need to explicitly catch them here. + # We also catch SystemExit here as a way to abort out connection threads properly, and stop the + # IRC connection from freezing instead. + except (OSError, RuntimeError, SystemExit) as e: + log.exception('(%s) Disconnected from IRC:', self.name) + + self.disconnect() + + # If autoconnect is enabled, loop back to the start. Otherwise, + # return and stop. + autoconnect = self.serverdata.get('autoconnect') + + # Sets the autoconnect growth multiplier (e.g. a value of 2 multiplies the autoconnect + # time by 2 on every failure, etc.) + autoconnect_multiplier = self.serverdata.get('autoconnect_multiplier', 2) + autoconnect_max = self.serverdata.get('autoconnect_max', 1800) + # These values must at least be 1. + autoconnect_multiplier = max(autoconnect_multiplier, 1) + autoconnect_max = max(autoconnect_max, 1) + + log.debug('(%s) Autoconnect delay set to %s seconds.', self.name, autoconnect) + if autoconnect is not None and autoconnect >= 1: + log.debug('(%s) Multiplying autoconnect delay %s by %s.', self.name, autoconnect, self.autoconnect_active_multiplier) + autoconnect *= self.autoconnect_active_multiplier + # Add a cap on the max. autoconnect delay, so that we don't go on forever... + autoconnect = min(autoconnect, autoconnect_max) + + log.info('(%s) Going to auto-reconnect in %s seconds.', self.name, autoconnect) + # Continue when either self.aborted is set or the autoconnect time passes. + # Compared to time.sleep(), this allows us to stop connections quicker if we + # break while while for autoconnect. + self.aborted.clear() + self.aborted.wait(autoconnect) + + # Store in the local state what the autoconnect multiplier currently is. + self.autoconnect_active_multiplier *= autoconnect_multiplier + + if self not in world.networkobjects.values(): + log.debug('Stopping stale connect loop for old connection %r', self.name) + return + + else: + log.info('(%s) Stopping connect loop (autoconnect value %r is < 1).', self.name, autoconnect) + return + + def disconnect(self): + """Handle disconnects from the remote server.""" + was_successful = self.connected.is_set() + log.debug('(%s) disconnect: got %s for was_successful state', self.name, was_successful) + + log.debug('(%s) disconnect: Clearing self.connected state.', self.name) + self.connected.clear() + + log.debug('(%s) Removing channel logging handlers due to disconnect.', self.name) + while self.loghandlers: + log.removeHandler(self.loghandlers.pop()) + + try: + log.debug('(%s) disconnect: Shutting down socket.', self.name) + self.socket.shutdown(socket.SHUT_RDWR) + except Exception as e: # Socket timed out during creation; ignore + log.debug('(%s) error on socket shutdown: %s: %s', self.name, type(e).__name__, e) + + self.socket.close() + + # Stop the queue thread. + if self.queue: + # XXX: queue.Queue.queue isn't actually documented, so this is probably not reliable in the long run. + self.queue.queue.appendleft(None) + + # Stop the ping timer. + if self.pingTimer: + log.debug('(%s) Canceling pingTimer at %s due to disconnect() call', self.name, time.time()) + self.pingTimer.cancel() + + log.debug('(%s) disconnect: Setting self.aborted to True.', self.name) + self.aborted.set() + + # Internal hook signifying that a network has disconnected. + self.call_hooks([None, 'PYLINK_DISCONNECT', {'was_successful': was_successful}]) + + log.debug('(%s) disconnect: Clearing state via init_vars().', self.name) + self.init_vars() + + def run(self): + """Main IRC loop which listens for messages.""" + buf = b"" + data = b"" + while not self.aborted.is_set(): + + try: + data = self.socket.recv(2048) + except OSError: + # Suppress socket read warnings from lingering recv() calls if + # we've been told to shutdown. + if self.aborted.is_set(): + return + raise + + buf += data + if not data: + log.error('(%s) No data received, disconnecting!', self.name) + return + elif (time.time() - self.lastping) > self.pingtimeout: + log.error('(%s) Connection timed out.', self.name) + return + + while b'\n' in buf: + line, buf = buf.split(b'\n', 1) + line = line.strip(b'\r') + line = line.decode(self.encoding, "replace") + self.runline(line) + + def runline(self, line): + """Sends a command to the protocol module.""" + log.debug("(%s) <- %s", self.name, line) + try: + hook_args = self.proto.handle_events(line) + except Exception: + log.exception('(%s) Caught error in handle_events, disconnecting!', self.name) + log.error('(%s) The offending line was: <- %s', self.name, line) + self.aborted.set() + return + # Only call our hooks if there's data to process. Handlers that support + # hooks will return a dict of parsed arguments, which can be passed on + # to plugins and the like. For example, the JOIN handler will return + # something like: {'channel': '#whatever', 'users': ['UID1', 'UID2', + # 'UID3']}, etc. + if hook_args is not None: + self.call_hooks(hook_args) + + return hook_args + + + def _send(self, data): + """Sends raw text to the uplink server.""" + # Safeguard against newlines in input!! Otherwise, each line gets + # treated as a separate command, which is particularly nasty. + data = data.replace('\n', ' ') + encoded_data = data.encode(self.encoding, 'replace') + b"\n" + + log.debug("(%s) -> %s", self.name, data) + + try: + self.socket.send(encoded_data) + except (OSError, AttributeError): + log.exception("(%s) Failed to send message %r; did the network disconnect?", self.name, data) + + def send(self, data, queue=True): + """send() wrapper with optional queueing support.""" + if self.aborted.is_set(): + log.debug('(%s) refusing to queue data %r as self.aborted is set', self.name, data) + return + if queue: + # XXX: we don't really know how to handle blocking queues yet, so + # it's better to not expose that yet. + self.queue.put_nowait(data) + else: + self._send(data) + + def process_queue(self): + """Loop to process outgoing queue data.""" + while True: + throttle_time = self.serverdata.get('throttle_time', 0.005) + if not self.aborted.wait(throttle_time): + data = self.queue.get() + if data is None: + log.debug('(%s) Stopping queue thread due to getting None as item', self.name) + break + elif data: + self._send(data) + else: + break + +Irc = PyLinkIRCNetwork + class IrcUser(): """PyLink IRC user class.""" def __init__(self, nick, ts, uid, server, ident='null', host='null',