mirror of
https://github.com/jlu5/PyLink.git
synced 2025-02-03 08:04:07 +01:00
classes: break Irc into three classes: PyLinkNetworkCore, PyLinkNetworkCoreWithUtils, PyLinkIRCNetwork (aliased to Irc)
This commit is contained in:
parent
8dae235b8d
commit
47e36a9249
730
classes.py
730
classes.py
@ -34,7 +34,7 @@ class ProtocolError(RuntimeError):
|
|||||||
|
|
||||||
### Internal classes (users, servers, channels)
|
### Internal classes (users, servers, channels)
|
||||||
|
|
||||||
class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase):
|
class PyLinkNetworkCore(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase):
|
||||||
"""Base IRC object for PyLink."""
|
"""Base IRC object for PyLink."""
|
||||||
|
|
||||||
def __init__(self, netname, proto, conf):
|
def __init__(self, netname, proto, conf):
|
||||||
@ -75,15 +75,6 @@ class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase):
|
|||||||
|
|
||||||
self.init_vars()
|
self.init_vars()
|
||||||
|
|
||||||
if world.testing:
|
|
||||||
# HACK: Don't thread if we're running tests.
|
|
||||||
self.connect()
|
|
||||||
else:
|
|
||||||
self.connection_thread = threading.Thread(target=self.connect,
|
|
||||||
name="Listener for %s" %
|
|
||||||
self.name)
|
|
||||||
self.connection_thread.start()
|
|
||||||
|
|
||||||
def log_setup(self):
|
def log_setup(self):
|
||||||
"""
|
"""
|
||||||
Initializes any channel loggers defined for the current network.
|
Initializes any channel loggers defined for the current network.
|
||||||
@ -178,295 +169,12 @@ class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase):
|
|||||||
# Set up channel logging for the network
|
# Set up channel logging for the network
|
||||||
self.log_setup()
|
self.log_setup()
|
||||||
|
|
||||||
def process_queue(self):
|
'''
|
||||||
"""Loop to process outgoing queue data."""
|
def __repr__(self):
|
||||||
while True:
|
return "<classes.Irc object for %r>" % self.name
|
||||||
throttle_time = self.serverdata.get('throttle_time', 0.005)
|
'''
|
||||||
if not self.aborted.wait(throttle_time):
|
|
||||||
data = self.queue.get()
|
|
||||||
if data is None:
|
|
||||||
log.debug('(%s) Stopping queue thread due to getting None as item', self.name)
|
|
||||||
break
|
|
||||||
elif data:
|
|
||||||
self._send(data)
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
|
|
||||||
def connect(self):
|
|
||||||
"""
|
|
||||||
Runs the connect loop for the IRC object. This is usually called by
|
|
||||||
__init__ in a separate thread to allow multiple concurrent connections.
|
|
||||||
"""
|
|
||||||
while True:
|
|
||||||
|
|
||||||
self.aborted.clear()
|
|
||||||
self.init_vars()
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.proto.validateServerConf()
|
|
||||||
except AssertionError as e:
|
|
||||||
log.exception("(%s) Configuration error: %s", self.name, e)
|
|
||||||
return
|
|
||||||
|
|
||||||
ip = self.serverdata["ip"]
|
|
||||||
port = self.serverdata["port"]
|
|
||||||
checks_ok = True
|
|
||||||
try:
|
|
||||||
# Set the socket type (IPv6 or IPv4).
|
|
||||||
stype = socket.AF_INET6 if self.serverdata.get("ipv6") else socket.AF_INET
|
|
||||||
|
|
||||||
# Creat the socket.
|
|
||||||
self.socket = socket.socket(stype)
|
|
||||||
self.socket.setblocking(0)
|
|
||||||
|
|
||||||
# Set the socket bind if applicable.
|
|
||||||
if 'bindhost' in self.serverdata:
|
|
||||||
self.socket.bind((self.serverdata['bindhost'], 0))
|
|
||||||
|
|
||||||
# Set the connection timeouts. Initial connection timeout is a
|
|
||||||
# lot smaller than the timeout after we've connected; this is
|
|
||||||
# intentional.
|
|
||||||
self.socket.settimeout(self.pingfreq)
|
|
||||||
|
|
||||||
# Resolve hostnames if it's not an IP address already.
|
|
||||||
old_ip = ip
|
|
||||||
ip = socket.getaddrinfo(ip, port, stype)[0][-1][0]
|
|
||||||
log.debug('(%s) Resolving address %s to %s', self.name, old_ip, ip)
|
|
||||||
|
|
||||||
# Enable SSL if set to do so.
|
|
||||||
self.ssl = self.serverdata.get('ssl')
|
|
||||||
if self.ssl:
|
|
||||||
log.info('(%s) Attempting SSL for this connection...', self.name)
|
|
||||||
certfile = self.serverdata.get('ssl_certfile')
|
|
||||||
keyfile = self.serverdata.get('ssl_keyfile')
|
|
||||||
|
|
||||||
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
|
||||||
# Disable SSLv2 and SSLv3 - these are insecure
|
|
||||||
context.options |= ssl.OP_NO_SSLv2
|
|
||||||
context.options |= ssl.OP_NO_SSLv3
|
|
||||||
|
|
||||||
# Cert and key files are optional, load them if specified.
|
|
||||||
if certfile and keyfile:
|
|
||||||
try:
|
|
||||||
context.load_cert_chain(certfile, keyfile)
|
|
||||||
except OSError:
|
|
||||||
log.exception('(%s) Caught OSError trying to '
|
|
||||||
'initialize the SSL connection; '
|
|
||||||
'are "ssl_certfile" and '
|
|
||||||
'"ssl_keyfile" set correctly?',
|
|
||||||
self.name)
|
|
||||||
checks_ok = False
|
|
||||||
|
|
||||||
self.socket = context.wrap_socket(self.socket)
|
|
||||||
|
|
||||||
log.info("Connecting to network %r on %s:%s", self.name, ip, port)
|
|
||||||
self.socket.connect((ip, port))
|
|
||||||
self.socket.settimeout(self.pingtimeout)
|
|
||||||
|
|
||||||
# If SSL was enabled, optionally verify the certificate
|
|
||||||
# fingerprint for some added security. I don't bother to check
|
|
||||||
# the entire certificate for validity, since most IRC networks
|
|
||||||
# self-sign their certificates anyways.
|
|
||||||
if self.ssl and checks_ok:
|
|
||||||
peercert = self.socket.getpeercert(binary_form=True)
|
|
||||||
|
|
||||||
# Hash type is configurable using the ssl_fingerprint_type
|
|
||||||
# value, and defaults to sha256.
|
|
||||||
hashtype = self.serverdata.get('ssl_fingerprint_type', 'sha256').lower()
|
|
||||||
|
|
||||||
try:
|
|
||||||
hashfunc = getattr(hashlib, hashtype)
|
|
||||||
except AttributeError:
|
|
||||||
log.error('(%s) Unsupported SSL certificate fingerprint type %r given, disconnecting...',
|
|
||||||
self.name, hashtype)
|
|
||||||
checks_ok = False
|
|
||||||
else:
|
|
||||||
fp = hashfunc(peercert).hexdigest()
|
|
||||||
expected_fp = self.serverdata.get('ssl_fingerprint')
|
|
||||||
|
|
||||||
if expected_fp and checks_ok:
|
|
||||||
if fp != expected_fp:
|
|
||||||
# SSL Fingerprint doesn't match; break.
|
|
||||||
log.error('(%s) Uplink\'s SSL certificate '
|
|
||||||
'fingerprint (%s) does not match the '
|
|
||||||
'one configured: expected %r, got %r; '
|
|
||||||
'disconnecting...', self.name, hashtype,
|
|
||||||
expected_fp, fp)
|
|
||||||
checks_ok = False
|
|
||||||
else:
|
|
||||||
log.info('(%s) Uplink SSL certificate fingerprint '
|
|
||||||
'(%s) verified: %r', self.name, hashtype,
|
|
||||||
fp)
|
|
||||||
else:
|
|
||||||
log.info('(%s) Uplink\'s SSL certificate fingerprint (%s) '
|
|
||||||
'is %r. You can enhance the security of your '
|
|
||||||
'link by specifying this in a "ssl_fingerprint"'
|
|
||||||
' option in your server block.', self.name,
|
|
||||||
hashtype, fp)
|
|
||||||
|
|
||||||
if checks_ok:
|
|
||||||
|
|
||||||
self.queue_thread = threading.Thread(name="Queue thread for %s" % self.name,
|
|
||||||
target=self.process_queue, daemon=True)
|
|
||||||
self.queue_thread.start()
|
|
||||||
|
|
||||||
self.sid = self.serverdata.get("sid")
|
|
||||||
# All our checks passed, get the protocol module to connect and run the listen
|
|
||||||
# loop. This also updates any SID values should the protocol module do so.
|
|
||||||
self.proto.connect()
|
|
||||||
|
|
||||||
log.info('(%s) Enumerating our own SID %s', self.name, self.sid)
|
|
||||||
host = self.hostname()
|
|
||||||
|
|
||||||
self.servers[self.sid] = IrcServer(None, host, internal=True,
|
|
||||||
desc=self.serverdata.get('serverdesc')
|
|
||||||
or conf.conf['bot']['serverdesc'])
|
|
||||||
|
|
||||||
log.info('(%s) Starting ping schedulers....', self.name)
|
|
||||||
self.schedule_ping()
|
|
||||||
log.info('(%s) Server ready; listening for data.', self.name)
|
|
||||||
self.autoconnect_active_multiplier = 1 # Reset any extra autoconnect delays
|
|
||||||
self.run()
|
|
||||||
else: # Configuration error :(
|
|
||||||
log.error('(%s) A configuration error was encountered '
|
|
||||||
'trying to set up this connection. Please check'
|
|
||||||
' your configuration file and try again.',
|
|
||||||
self.name)
|
|
||||||
# self.run() or the protocol module it called raised an exception, meaning we've disconnected!
|
|
||||||
# Note: socket.error, ConnectionError, IOError, etc. are included in OSError since Python 3.3,
|
|
||||||
# so we don't need to explicitly catch them here.
|
|
||||||
# We also catch SystemExit here as a way to abort out connection threads properly, and stop the
|
|
||||||
# IRC connection from freezing instead.
|
|
||||||
except (OSError, RuntimeError, SystemExit) as e:
|
|
||||||
log.exception('(%s) Disconnected from IRC:', self.name)
|
|
||||||
|
|
||||||
self.disconnect()
|
|
||||||
|
|
||||||
# If autoconnect is enabled, loop back to the start. Otherwise,
|
|
||||||
# return and stop.
|
|
||||||
autoconnect = self.serverdata.get('autoconnect')
|
|
||||||
|
|
||||||
# Sets the autoconnect growth multiplier (e.g. a value of 2 multiplies the autoconnect
|
|
||||||
# time by 2 on every failure, etc.)
|
|
||||||
autoconnect_multiplier = self.serverdata.get('autoconnect_multiplier', 2)
|
|
||||||
autoconnect_max = self.serverdata.get('autoconnect_max', 1800)
|
|
||||||
# These values must at least be 1.
|
|
||||||
autoconnect_multiplier = max(autoconnect_multiplier, 1)
|
|
||||||
autoconnect_max = max(autoconnect_max, 1)
|
|
||||||
|
|
||||||
log.debug('(%s) Autoconnect delay set to %s seconds.', self.name, autoconnect)
|
|
||||||
if autoconnect is not None and autoconnect >= 1:
|
|
||||||
log.debug('(%s) Multiplying autoconnect delay %s by %s.', self.name, autoconnect, self.autoconnect_active_multiplier)
|
|
||||||
autoconnect *= self.autoconnect_active_multiplier
|
|
||||||
# Add a cap on the max. autoconnect delay, so that we don't go on forever...
|
|
||||||
autoconnect = min(autoconnect, autoconnect_max)
|
|
||||||
|
|
||||||
log.info('(%s) Going to auto-reconnect in %s seconds.', self.name, autoconnect)
|
|
||||||
# Continue when either self.aborted is set or the autoconnect time passes.
|
|
||||||
# Compared to time.sleep(), this allows us to stop connections quicker if we
|
|
||||||
# break while while for autoconnect.
|
|
||||||
self.aborted.clear()
|
|
||||||
self.aborted.wait(autoconnect)
|
|
||||||
|
|
||||||
# Store in the local state what the autoconnect multiplier currently is.
|
|
||||||
self.autoconnect_active_multiplier *= autoconnect_multiplier
|
|
||||||
|
|
||||||
if self not in world.networkobjects.values():
|
|
||||||
log.debug('Stopping stale connect loop for old connection %r', self.name)
|
|
||||||
return
|
|
||||||
|
|
||||||
else:
|
|
||||||
log.info('(%s) Stopping connect loop (autoconnect value %r is < 1).', self.name, autoconnect)
|
|
||||||
return
|
|
||||||
|
|
||||||
def disconnect(self):
|
|
||||||
"""Handle disconnects from the remote server."""
|
|
||||||
was_successful = self.connected.is_set()
|
|
||||||
log.debug('(%s) disconnect: got %s for was_successful state', self.name, was_successful)
|
|
||||||
|
|
||||||
log.debug('(%s) disconnect: Clearing self.connected state.', self.name)
|
|
||||||
self.connected.clear()
|
|
||||||
|
|
||||||
log.debug('(%s) Removing channel logging handlers due to disconnect.', self.name)
|
|
||||||
while self.loghandlers:
|
|
||||||
log.removeHandler(self.loghandlers.pop())
|
|
||||||
|
|
||||||
try:
|
|
||||||
log.debug('(%s) disconnect: Shutting down socket.', self.name)
|
|
||||||
self.socket.shutdown(socket.SHUT_RDWR)
|
|
||||||
except Exception as e: # Socket timed out during creation; ignore
|
|
||||||
log.debug('(%s) error on socket shutdown: %s: %s', self.name, type(e).__name__, e)
|
|
||||||
|
|
||||||
self.socket.close()
|
|
||||||
|
|
||||||
# Stop the queue thread.
|
|
||||||
if self.queue:
|
|
||||||
# XXX: queue.Queue.queue isn't actually documented, so this is probably not reliable in the long run.
|
|
||||||
self.queue.queue.appendleft(None)
|
|
||||||
|
|
||||||
# Stop the ping timer.
|
|
||||||
if self.pingTimer:
|
|
||||||
log.debug('(%s) Canceling pingTimer at %s due to disconnect() call', self.name, time.time())
|
|
||||||
self.pingTimer.cancel()
|
|
||||||
|
|
||||||
log.debug('(%s) disconnect: Setting self.aborted to True.', self.name)
|
|
||||||
self.aborted.set()
|
|
||||||
|
|
||||||
# Internal hook signifying that a network has disconnected.
|
|
||||||
self.call_hooks([None, 'PYLINK_DISCONNECT', {'was_successful': was_successful}])
|
|
||||||
|
|
||||||
log.debug('(%s) disconnect: Clearing state via init_vars().', self.name)
|
|
||||||
self.init_vars()
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
"""Main IRC loop which listens for messages."""
|
|
||||||
buf = b""
|
|
||||||
data = b""
|
|
||||||
while not self.aborted.is_set():
|
|
||||||
|
|
||||||
try:
|
|
||||||
data = self.socket.recv(2048)
|
|
||||||
except OSError:
|
|
||||||
# Suppress socket read warnings from lingering recv() calls if
|
|
||||||
# we've been told to shutdown.
|
|
||||||
if self.aborted.is_set():
|
|
||||||
return
|
|
||||||
raise
|
|
||||||
|
|
||||||
buf += data
|
|
||||||
if not data:
|
|
||||||
log.error('(%s) No data received, disconnecting!', self.name)
|
|
||||||
return
|
|
||||||
elif (time.time() - self.lastping) > self.pingtimeout:
|
|
||||||
log.error('(%s) Connection timed out.', self.name)
|
|
||||||
return
|
|
||||||
|
|
||||||
while b'\n' in buf:
|
|
||||||
line, buf = buf.split(b'\n', 1)
|
|
||||||
line = line.strip(b'\r')
|
|
||||||
line = line.decode(self.encoding, "replace")
|
|
||||||
self.runline(line)
|
|
||||||
|
|
||||||
def runline(self, line):
|
|
||||||
"""Sends a command to the protocol module."""
|
|
||||||
log.debug("(%s) <- %s", self.name, line)
|
|
||||||
try:
|
|
||||||
hook_args = self.proto.handle_events(line)
|
|
||||||
except Exception:
|
|
||||||
log.exception('(%s) Caught error in handle_events, disconnecting!', self.name)
|
|
||||||
log.error('(%s) The offending line was: <- %s', self.name, line)
|
|
||||||
self.aborted.set()
|
|
||||||
return
|
|
||||||
# Only call our hooks if there's data to process. Handlers that support
|
|
||||||
# hooks will return a dict of parsed arguments, which can be passed on
|
|
||||||
# to plugins and the like. For example, the JOIN handler will return
|
|
||||||
# something like: {'channel': '#whatever', 'users': ['UID1', 'UID2',
|
|
||||||
# 'UID3']}, etc.
|
|
||||||
if hook_args is not None:
|
|
||||||
self.call_hooks(hook_args)
|
|
||||||
|
|
||||||
return hook_args
|
|
||||||
|
|
||||||
|
### General utility functions
|
||||||
def call_hooks(self, hook_args):
|
def call_hooks(self, hook_args):
|
||||||
"""Calls a hook function with the given hook args."""
|
"""Calls a hook function with the given hook args."""
|
||||||
numeric, command, parsed_args = hook_args
|
numeric, command, parsed_args = hook_args
|
||||||
@ -506,47 +214,6 @@ class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase):
|
|||||||
hook_args)
|
hook_args)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
def _send(self, data):
|
|
||||||
"""Sends raw text to the uplink server."""
|
|
||||||
# Safeguard against newlines in input!! Otherwise, each line gets
|
|
||||||
# treated as a separate command, which is particularly nasty.
|
|
||||||
data = data.replace('\n', ' ')
|
|
||||||
encoded_data = data.encode(self.encoding, 'replace') + b"\n"
|
|
||||||
|
|
||||||
log.debug("(%s) -> %s", self.name, data)
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.socket.send(encoded_data)
|
|
||||||
except (OSError, AttributeError):
|
|
||||||
log.exception("(%s) Failed to send message %r; did the network disconnect?", self.name, data)
|
|
||||||
|
|
||||||
def send(self, data, queue=True):
|
|
||||||
"""send() wrapper with optional queueing support."""
|
|
||||||
if self.aborted.is_set():
|
|
||||||
log.debug('(%s) refusing to queue data %r as self.aborted is set', self.name, data)
|
|
||||||
return
|
|
||||||
if queue:
|
|
||||||
# XXX: we don't really know how to handle blocking queues yet, so
|
|
||||||
# it's better to not expose that yet.
|
|
||||||
self.queue.put_nowait(data)
|
|
||||||
else:
|
|
||||||
self._send(data)
|
|
||||||
|
|
||||||
def schedule_ping(self):
|
|
||||||
"""Schedules periodic pings in a loop."""
|
|
||||||
self.proto.ping()
|
|
||||||
|
|
||||||
self.pingTimer = threading.Timer(self.pingfreq, self.schedule_ping)
|
|
||||||
self.pingTimer.daemon = True
|
|
||||||
self.pingTimer.name = 'Ping timer loop for %s' % self.name
|
|
||||||
self.pingTimer.start()
|
|
||||||
|
|
||||||
log.debug('(%s) Ping scheduled at %s', self.name, time.time())
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return "<classes.Irc object for %r>" % self.name
|
|
||||||
|
|
||||||
### General utility functions
|
|
||||||
def call_command(self, source, text):
|
def call_command(self, source, text):
|
||||||
"""
|
"""
|
||||||
Calls a PyLink bot command. source is the caller's UID, and text is the
|
Calls a PyLink bot command. source is the caller's UID, and text is the
|
||||||
@ -615,6 +282,29 @@ class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase):
|
|||||||
# This is a stub to alias error to reply
|
# This is a stub to alias error to reply
|
||||||
self.reply("Error: %s" % text, **kwargs)
|
self.reply("Error: %s" % text, **kwargs)
|
||||||
|
|
||||||
|
### Configuration-based lookup functions.
|
||||||
|
def version(self):
|
||||||
|
"""
|
||||||
|
Returns a detailed version string including the PyLink daemon version,
|
||||||
|
the protocol module in use, and the server hostname.
|
||||||
|
"""
|
||||||
|
fullversion = 'PyLink-%s. %s :[protocol:%s, encoding:%s]' % (__version__, self.hostname(), self.protoname, self.encoding)
|
||||||
|
return fullversion
|
||||||
|
|
||||||
|
def hostname(self):
|
||||||
|
"""
|
||||||
|
Returns the server hostname used by PyLink on the given server.
|
||||||
|
"""
|
||||||
|
return self.serverdata.get('hostname', world.fallback_hostname)
|
||||||
|
|
||||||
|
def get_full_network_name(self):
|
||||||
|
"""
|
||||||
|
Returns the full network name (as defined by the "netname" option), or the
|
||||||
|
short network name if that isn't defined.
|
||||||
|
"""
|
||||||
|
return self.serverdata.get('netname', self.name)
|
||||||
|
|
||||||
|
class PyLinkNetworkCoreWithUtils(PyLinkNetworkCore):
|
||||||
def to_lower(self, text):
|
def to_lower(self, text):
|
||||||
"""Returns a lowercase representation of text based on the IRC object's
|
"""Returns a lowercase representation of text based on the IRC object's
|
||||||
casemapping (rfc1459 or ascii)."""
|
casemapping (rfc1459 or ascii)."""
|
||||||
@ -1008,20 +698,6 @@ class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase):
|
|||||||
log.debug('wrap_modes: returning %s for %s', strings, orig_modes)
|
log.debug('wrap_modes: returning %s for %s', strings, orig_modes)
|
||||||
return strings
|
return strings
|
||||||
|
|
||||||
def version(self):
|
|
||||||
"""
|
|
||||||
Returns a detailed version string including the PyLink daemon version,
|
|
||||||
the protocol module in use, and the server hostname.
|
|
||||||
"""
|
|
||||||
fullversion = 'PyLink-%s. %s :[protocol:%s, encoding:%s]' % (__version__, self.hostname(), self.protoname, self.encoding)
|
|
||||||
return fullversion
|
|
||||||
|
|
||||||
def hostname(self):
|
|
||||||
"""
|
|
||||||
Returns the server hostname used by PyLink on the given server.
|
|
||||||
"""
|
|
||||||
return self.serverdata.get('hostname', world.fallback_hostname)
|
|
||||||
|
|
||||||
### State checking functions
|
### State checking functions
|
||||||
def nick_to_uid(self, nick):
|
def nick_to_uid(self, nick):
|
||||||
"""Looks up the UID of a user with the given nick, if one is present."""
|
"""Looks up the UID of a user with the given nick, if one is present."""
|
||||||
@ -1119,13 +795,6 @@ class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase):
|
|||||||
else:
|
else:
|
||||||
raise KeyError("Unknown UID/SID %s" % entityid)
|
raise KeyError("Unknown UID/SID %s" % entityid)
|
||||||
|
|
||||||
def get_full_network_name(self):
|
|
||||||
"""
|
|
||||||
Returns the full network name (as defined by the "netname" option), or the
|
|
||||||
short network name if that isn't defined.
|
|
||||||
"""
|
|
||||||
return self.serverdata.get('netname', self.name)
|
|
||||||
|
|
||||||
def is_oper(self, uid, allowAuthed=True, allowOper=True):
|
def is_oper(self, uid, allowAuthed=True, allowOper=True):
|
||||||
"""
|
"""
|
||||||
Returns whether the given user has operator status on PyLink. This can be achieved
|
Returns whether the given user has operator status on PyLink. This can be achieved
|
||||||
@ -1250,6 +919,347 @@ class Irc(utils.DeprecatedAttributesObject, utils.CamelCaseToSnakeCase):
|
|||||||
result = not result
|
result = not result
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
class PyLinkIRCNetwork(PyLinkNetworkCoreWithUtils):
|
||||||
|
def __init__(self, *args):
|
||||||
|
super().__init__(*args)
|
||||||
|
if world.testing:
|
||||||
|
# HACK: Don't thread if we're running tests.
|
||||||
|
self.connect()
|
||||||
|
else:
|
||||||
|
self.connection_thread = threading.Thread(target=self.connect,
|
||||||
|
name="Listener for %s" %
|
||||||
|
self.name)
|
||||||
|
self.connection_thread.start()
|
||||||
|
|
||||||
|
def schedule_ping(self):
|
||||||
|
"""Schedules periodic pings in a loop."""
|
||||||
|
self.proto.ping()
|
||||||
|
|
||||||
|
self.pingTimer = threading.Timer(self.pingfreq, self.schedule_ping)
|
||||||
|
self.pingTimer.daemon = True
|
||||||
|
self.pingTimer.name = 'Ping timer loop for %s' % self.name
|
||||||
|
self.pingTimer.start()
|
||||||
|
|
||||||
|
log.debug('(%s) Ping scheduled at %s', self.name, time.time())
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
"""
|
||||||
|
Runs the connect loop for the IRC object. This is usually called by
|
||||||
|
__init__ in a separate thread to allow multiple concurrent connections.
|
||||||
|
"""
|
||||||
|
while True:
|
||||||
|
|
||||||
|
self.aborted.clear()
|
||||||
|
self.init_vars()
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.proto.validateServerConf()
|
||||||
|
except AssertionError as e:
|
||||||
|
log.exception("(%s) Configuration error: %s", self.name, e)
|
||||||
|
return
|
||||||
|
|
||||||
|
ip = self.serverdata["ip"]
|
||||||
|
port = self.serverdata["port"]
|
||||||
|
checks_ok = True
|
||||||
|
try:
|
||||||
|
# Set the socket type (IPv6 or IPv4).
|
||||||
|
stype = socket.AF_INET6 if self.serverdata.get("ipv6") else socket.AF_INET
|
||||||
|
|
||||||
|
# Creat the socket.
|
||||||
|
self.socket = socket.socket(stype)
|
||||||
|
self.socket.setblocking(0)
|
||||||
|
|
||||||
|
# Set the socket bind if applicable.
|
||||||
|
if 'bindhost' in self.serverdata:
|
||||||
|
self.socket.bind((self.serverdata['bindhost'], 0))
|
||||||
|
|
||||||
|
# Set the connection timeouts. Initial connection timeout is a
|
||||||
|
# lot smaller than the timeout after we've connected; this is
|
||||||
|
# intentional.
|
||||||
|
self.socket.settimeout(self.pingfreq)
|
||||||
|
|
||||||
|
# Resolve hostnames if it's not an IP address already.
|
||||||
|
old_ip = ip
|
||||||
|
ip = socket.getaddrinfo(ip, port, stype)[0][-1][0]
|
||||||
|
log.debug('(%s) Resolving address %s to %s', self.name, old_ip, ip)
|
||||||
|
|
||||||
|
# Enable SSL if set to do so.
|
||||||
|
self.ssl = self.serverdata.get('ssl')
|
||||||
|
if self.ssl:
|
||||||
|
log.info('(%s) Attempting SSL for this connection...', self.name)
|
||||||
|
certfile = self.serverdata.get('ssl_certfile')
|
||||||
|
keyfile = self.serverdata.get('ssl_keyfile')
|
||||||
|
|
||||||
|
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
||||||
|
# Disable SSLv2 and SSLv3 - these are insecure
|
||||||
|
context.options |= ssl.OP_NO_SSLv2
|
||||||
|
context.options |= ssl.OP_NO_SSLv3
|
||||||
|
|
||||||
|
# Cert and key files are optional, load them if specified.
|
||||||
|
if certfile and keyfile:
|
||||||
|
try:
|
||||||
|
context.load_cert_chain(certfile, keyfile)
|
||||||
|
except OSError:
|
||||||
|
log.exception('(%s) Caught OSError trying to '
|
||||||
|
'initialize the SSL connection; '
|
||||||
|
'are "ssl_certfile" and '
|
||||||
|
'"ssl_keyfile" set correctly?',
|
||||||
|
self.name)
|
||||||
|
checks_ok = False
|
||||||
|
|
||||||
|
self.socket = context.wrap_socket(self.socket)
|
||||||
|
|
||||||
|
log.info("Connecting to network %r on %s:%s", self.name, ip, port)
|
||||||
|
self.socket.connect((ip, port))
|
||||||
|
self.socket.settimeout(self.pingtimeout)
|
||||||
|
|
||||||
|
# If SSL was enabled, optionally verify the certificate
|
||||||
|
# fingerprint for some added security. I don't bother to check
|
||||||
|
# the entire certificate for validity, since most IRC networks
|
||||||
|
# self-sign their certificates anyways.
|
||||||
|
if self.ssl and checks_ok:
|
||||||
|
peercert = self.socket.getpeercert(binary_form=True)
|
||||||
|
|
||||||
|
# Hash type is configurable using the ssl_fingerprint_type
|
||||||
|
# value, and defaults to sha256.
|
||||||
|
hashtype = self.serverdata.get('ssl_fingerprint_type', 'sha256').lower()
|
||||||
|
|
||||||
|
try:
|
||||||
|
hashfunc = getattr(hashlib, hashtype)
|
||||||
|
except AttributeError:
|
||||||
|
log.error('(%s) Unsupported SSL certificate fingerprint type %r given, disconnecting...',
|
||||||
|
self.name, hashtype)
|
||||||
|
checks_ok = False
|
||||||
|
else:
|
||||||
|
fp = hashfunc(peercert).hexdigest()
|
||||||
|
expected_fp = self.serverdata.get('ssl_fingerprint')
|
||||||
|
|
||||||
|
if expected_fp and checks_ok:
|
||||||
|
if fp != expected_fp:
|
||||||
|
# SSL Fingerprint doesn't match; break.
|
||||||
|
log.error('(%s) Uplink\'s SSL certificate '
|
||||||
|
'fingerprint (%s) does not match the '
|
||||||
|
'one configured: expected %r, got %r; '
|
||||||
|
'disconnecting...', self.name, hashtype,
|
||||||
|
expected_fp, fp)
|
||||||
|
checks_ok = False
|
||||||
|
else:
|
||||||
|
log.info('(%s) Uplink SSL certificate fingerprint '
|
||||||
|
'(%s) verified: %r', self.name, hashtype,
|
||||||
|
fp)
|
||||||
|
else:
|
||||||
|
log.info('(%s) Uplink\'s SSL certificate fingerprint (%s) '
|
||||||
|
'is %r. You can enhance the security of your '
|
||||||
|
'link by specifying this in a "ssl_fingerprint"'
|
||||||
|
' option in your server block.', self.name,
|
||||||
|
hashtype, fp)
|
||||||
|
|
||||||
|
if checks_ok:
|
||||||
|
|
||||||
|
self.queue_thread = threading.Thread(name="Queue thread for %s" % self.name,
|
||||||
|
target=self.process_queue, daemon=True)
|
||||||
|
self.queue_thread.start()
|
||||||
|
|
||||||
|
self.sid = self.serverdata.get("sid")
|
||||||
|
# All our checks passed, get the protocol module to connect and run the listen
|
||||||
|
# loop. This also updates any SID values should the protocol module do so.
|
||||||
|
self.proto.connect()
|
||||||
|
|
||||||
|
log.info('(%s) Enumerating our own SID %s', self.name, self.sid)
|
||||||
|
host = self.hostname()
|
||||||
|
|
||||||
|
self.servers[self.sid] = IrcServer(None, host, internal=True,
|
||||||
|
desc=self.serverdata.get('serverdesc')
|
||||||
|
or conf.conf['bot']['serverdesc'])
|
||||||
|
|
||||||
|
log.info('(%s) Starting ping schedulers....', self.name)
|
||||||
|
self.schedule_ping()
|
||||||
|
log.info('(%s) Server ready; listening for data.', self.name)
|
||||||
|
self.autoconnect_active_multiplier = 1 # Reset any extra autoconnect delays
|
||||||
|
self.run()
|
||||||
|
else: # Configuration error :(
|
||||||
|
log.error('(%s) A configuration error was encountered '
|
||||||
|
'trying to set up this connection. Please check'
|
||||||
|
' your configuration file and try again.',
|
||||||
|
self.name)
|
||||||
|
# self.run() or the protocol module it called raised an exception, meaning we've disconnected!
|
||||||
|
# Note: socket.error, ConnectionError, IOError, etc. are included in OSError since Python 3.3,
|
||||||
|
# so we don't need to explicitly catch them here.
|
||||||
|
# We also catch SystemExit here as a way to abort out connection threads properly, and stop the
|
||||||
|
# IRC connection from freezing instead.
|
||||||
|
except (OSError, RuntimeError, SystemExit) as e:
|
||||||
|
log.exception('(%s) Disconnected from IRC:', self.name)
|
||||||
|
|
||||||
|
self.disconnect()
|
||||||
|
|
||||||
|
# If autoconnect is enabled, loop back to the start. Otherwise,
|
||||||
|
# return and stop.
|
||||||
|
autoconnect = self.serverdata.get('autoconnect')
|
||||||
|
|
||||||
|
# Sets the autoconnect growth multiplier (e.g. a value of 2 multiplies the autoconnect
|
||||||
|
# time by 2 on every failure, etc.)
|
||||||
|
autoconnect_multiplier = self.serverdata.get('autoconnect_multiplier', 2)
|
||||||
|
autoconnect_max = self.serverdata.get('autoconnect_max', 1800)
|
||||||
|
# These values must at least be 1.
|
||||||
|
autoconnect_multiplier = max(autoconnect_multiplier, 1)
|
||||||
|
autoconnect_max = max(autoconnect_max, 1)
|
||||||
|
|
||||||
|
log.debug('(%s) Autoconnect delay set to %s seconds.', self.name, autoconnect)
|
||||||
|
if autoconnect is not None and autoconnect >= 1:
|
||||||
|
log.debug('(%s) Multiplying autoconnect delay %s by %s.', self.name, autoconnect, self.autoconnect_active_multiplier)
|
||||||
|
autoconnect *= self.autoconnect_active_multiplier
|
||||||
|
# Add a cap on the max. autoconnect delay, so that we don't go on forever...
|
||||||
|
autoconnect = min(autoconnect, autoconnect_max)
|
||||||
|
|
||||||
|
log.info('(%s) Going to auto-reconnect in %s seconds.', self.name, autoconnect)
|
||||||
|
# Continue when either self.aborted is set or the autoconnect time passes.
|
||||||
|
# Compared to time.sleep(), this allows us to stop connections quicker if we
|
||||||
|
# break while while for autoconnect.
|
||||||
|
self.aborted.clear()
|
||||||
|
self.aborted.wait(autoconnect)
|
||||||
|
|
||||||
|
# Store in the local state what the autoconnect multiplier currently is.
|
||||||
|
self.autoconnect_active_multiplier *= autoconnect_multiplier
|
||||||
|
|
||||||
|
if self not in world.networkobjects.values():
|
||||||
|
log.debug('Stopping stale connect loop for old connection %r', self.name)
|
||||||
|
return
|
||||||
|
|
||||||
|
else:
|
||||||
|
log.info('(%s) Stopping connect loop (autoconnect value %r is < 1).', self.name, autoconnect)
|
||||||
|
return
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
"""Handle disconnects from the remote server."""
|
||||||
|
was_successful = self.connected.is_set()
|
||||||
|
log.debug('(%s) disconnect: got %s for was_successful state', self.name, was_successful)
|
||||||
|
|
||||||
|
log.debug('(%s) disconnect: Clearing self.connected state.', self.name)
|
||||||
|
self.connected.clear()
|
||||||
|
|
||||||
|
log.debug('(%s) Removing channel logging handlers due to disconnect.', self.name)
|
||||||
|
while self.loghandlers:
|
||||||
|
log.removeHandler(self.loghandlers.pop())
|
||||||
|
|
||||||
|
try:
|
||||||
|
log.debug('(%s) disconnect: Shutting down socket.', self.name)
|
||||||
|
self.socket.shutdown(socket.SHUT_RDWR)
|
||||||
|
except Exception as e: # Socket timed out during creation; ignore
|
||||||
|
log.debug('(%s) error on socket shutdown: %s: %s', self.name, type(e).__name__, e)
|
||||||
|
|
||||||
|
self.socket.close()
|
||||||
|
|
||||||
|
# Stop the queue thread.
|
||||||
|
if self.queue:
|
||||||
|
# XXX: queue.Queue.queue isn't actually documented, so this is probably not reliable in the long run.
|
||||||
|
self.queue.queue.appendleft(None)
|
||||||
|
|
||||||
|
# Stop the ping timer.
|
||||||
|
if self.pingTimer:
|
||||||
|
log.debug('(%s) Canceling pingTimer at %s due to disconnect() call', self.name, time.time())
|
||||||
|
self.pingTimer.cancel()
|
||||||
|
|
||||||
|
log.debug('(%s) disconnect: Setting self.aborted to True.', self.name)
|
||||||
|
self.aborted.set()
|
||||||
|
|
||||||
|
# Internal hook signifying that a network has disconnected.
|
||||||
|
self.call_hooks([None, 'PYLINK_DISCONNECT', {'was_successful': was_successful}])
|
||||||
|
|
||||||
|
log.debug('(%s) disconnect: Clearing state via init_vars().', self.name)
|
||||||
|
self.init_vars()
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
"""Main IRC loop which listens for messages."""
|
||||||
|
buf = b""
|
||||||
|
data = b""
|
||||||
|
while not self.aborted.is_set():
|
||||||
|
|
||||||
|
try:
|
||||||
|
data = self.socket.recv(2048)
|
||||||
|
except OSError:
|
||||||
|
# Suppress socket read warnings from lingering recv() calls if
|
||||||
|
# we've been told to shutdown.
|
||||||
|
if self.aborted.is_set():
|
||||||
|
return
|
||||||
|
raise
|
||||||
|
|
||||||
|
buf += data
|
||||||
|
if not data:
|
||||||
|
log.error('(%s) No data received, disconnecting!', self.name)
|
||||||
|
return
|
||||||
|
elif (time.time() - self.lastping) > self.pingtimeout:
|
||||||
|
log.error('(%s) Connection timed out.', self.name)
|
||||||
|
return
|
||||||
|
|
||||||
|
while b'\n' in buf:
|
||||||
|
line, buf = buf.split(b'\n', 1)
|
||||||
|
line = line.strip(b'\r')
|
||||||
|
line = line.decode(self.encoding, "replace")
|
||||||
|
self.runline(line)
|
||||||
|
|
||||||
|
def runline(self, line):
|
||||||
|
"""Sends a command to the protocol module."""
|
||||||
|
log.debug("(%s) <- %s", self.name, line)
|
||||||
|
try:
|
||||||
|
hook_args = self.proto.handle_events(line)
|
||||||
|
except Exception:
|
||||||
|
log.exception('(%s) Caught error in handle_events, disconnecting!', self.name)
|
||||||
|
log.error('(%s) The offending line was: <- %s', self.name, line)
|
||||||
|
self.aborted.set()
|
||||||
|
return
|
||||||
|
# Only call our hooks if there's data to process. Handlers that support
|
||||||
|
# hooks will return a dict of parsed arguments, which can be passed on
|
||||||
|
# to plugins and the like. For example, the JOIN handler will return
|
||||||
|
# something like: {'channel': '#whatever', 'users': ['UID1', 'UID2',
|
||||||
|
# 'UID3']}, etc.
|
||||||
|
if hook_args is not None:
|
||||||
|
self.call_hooks(hook_args)
|
||||||
|
|
||||||
|
return hook_args
|
||||||
|
|
||||||
|
|
||||||
|
def _send(self, data):
|
||||||
|
"""Sends raw text to the uplink server."""
|
||||||
|
# Safeguard against newlines in input!! Otherwise, each line gets
|
||||||
|
# treated as a separate command, which is particularly nasty.
|
||||||
|
data = data.replace('\n', ' ')
|
||||||
|
encoded_data = data.encode(self.encoding, 'replace') + b"\n"
|
||||||
|
|
||||||
|
log.debug("(%s) -> %s", self.name, data)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.socket.send(encoded_data)
|
||||||
|
except (OSError, AttributeError):
|
||||||
|
log.exception("(%s) Failed to send message %r; did the network disconnect?", self.name, data)
|
||||||
|
|
||||||
|
def send(self, data, queue=True):
|
||||||
|
"""send() wrapper with optional queueing support."""
|
||||||
|
if self.aborted.is_set():
|
||||||
|
log.debug('(%s) refusing to queue data %r as self.aborted is set', self.name, data)
|
||||||
|
return
|
||||||
|
if queue:
|
||||||
|
# XXX: we don't really know how to handle blocking queues yet, so
|
||||||
|
# it's better to not expose that yet.
|
||||||
|
self.queue.put_nowait(data)
|
||||||
|
else:
|
||||||
|
self._send(data)
|
||||||
|
|
||||||
|
def process_queue(self):
|
||||||
|
"""Loop to process outgoing queue data."""
|
||||||
|
while True:
|
||||||
|
throttle_time = self.serverdata.get('throttle_time', 0.005)
|
||||||
|
if not self.aborted.wait(throttle_time):
|
||||||
|
data = self.queue.get()
|
||||||
|
if data is None:
|
||||||
|
log.debug('(%s) Stopping queue thread due to getting None as item', self.name)
|
||||||
|
break
|
||||||
|
elif data:
|
||||||
|
self._send(data)
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
Irc = PyLinkIRCNetwork
|
||||||
|
|
||||||
class IrcUser():
|
class IrcUser():
|
||||||
"""PyLink IRC user class."""
|
"""PyLink IRC user class."""
|
||||||
def __init__(self, nick, ts, uid, server, ident='null', host='null',
|
def __init__(self, nick, ts, uid, server, ident='null', host='null',
|
||||||
|
Loading…
Reference in New Issue
Block a user