mirror of
https://github.com/jlu5/PyLink.git
synced 2024-11-01 09:19:23 +01:00
03766b9f89
Closes #307.
456 lines
17 KiB
Python
456 lines
17 KiB
Python
"""
|
|
utils.py - PyLink utilities module.
|
|
|
|
This module contains various utility functions related to IRC and/or the PyLink
|
|
framework.
|
|
"""
|
|
|
|
import string
|
|
import re
|
|
import importlib
|
|
import os
|
|
import collections
|
|
|
|
from .log import log
|
|
from . import world, conf
|
|
# This is just so protocols and plugins are importable.
|
|
from pylinkirc import protocols, plugins
|
|
|
|
NORMALIZEWHITESPACE_RE = re.compile(r'\s+')
|
|
|
|
class NotAuthorizedError(Exception):
|
|
"""
|
|
Exception raised by checkAuthenticated() when a user fails authentication
|
|
requirements.
|
|
"""
|
|
pass
|
|
|
|
class IncrementalUIDGenerator():
|
|
"""
|
|
Incremental UID Generator module, adapted from InspIRCd source:
|
|
https://github.com/inspircd/inspircd/blob/f449c6b296ab/src/server.cpp#L85-L156
|
|
"""
|
|
|
|
def __init__(self, sid):
|
|
if not (hasattr(self, 'allowedchars') and hasattr(self, 'length')):
|
|
raise RuntimeError("Allowed characters list not defined. Subclass "
|
|
"%s by defining self.allowedchars and self.length "
|
|
"and then calling super().__init__()." % self.__class__.__name__)
|
|
self.uidchars = [self.allowedchars[0]]*self.length
|
|
self.sid = str(sid)
|
|
|
|
def increment(self, pos=None):
|
|
"""
|
|
Increments the UID generator to the next available UID.
|
|
"""
|
|
# Position starts at 1 less than the UID length.
|
|
if pos is None:
|
|
pos = self.length - 1
|
|
|
|
# If we're at the last character in the list of allowed ones, reset
|
|
# and increment the next level above.
|
|
if self.uidchars[pos] == self.allowedchars[-1]:
|
|
self.uidchars[pos] = self.allowedchars[0]
|
|
self.increment(pos-1)
|
|
else:
|
|
# Find what position in the allowed characters list we're currently
|
|
# on, and add one.
|
|
idx = self.allowedchars.find(self.uidchars[pos])
|
|
self.uidchars[pos] = self.allowedchars[idx+1]
|
|
|
|
def next_uid(self):
|
|
"""
|
|
Returns the next unused UID for the server.
|
|
"""
|
|
uid = self.sid + ''.join(self.uidchars)
|
|
self.increment()
|
|
return uid
|
|
|
|
class PUIDGenerator():
|
|
"""
|
|
Pseudo UID Generator module, using a prefix and a simple counter.
|
|
"""
|
|
|
|
def __init__(self, prefix):
|
|
self.prefix = prefix
|
|
self.counter = 0
|
|
|
|
def next_uid(self, prefix=''):
|
|
"""
|
|
Generates the next PUID.
|
|
"""
|
|
uid = '%s@%s' % (prefix or self.prefix, self.counter)
|
|
self.counter += 1
|
|
return uid
|
|
next_sid = next_uid
|
|
|
|
def add_cmd(func, name=None, **kwargs):
|
|
"""Binds an IRC command function to the given command name."""
|
|
world.services['pylink'].add_cmd(func, name=name, **kwargs)
|
|
return func
|
|
|
|
def add_hook(func, command):
|
|
"""Binds a hook function to the given command name."""
|
|
command = command.upper()
|
|
world.hooks[command].append(func)
|
|
return func
|
|
|
|
_nickregex = r'^[A-Za-z\|\\_\[\]\{\}\^\`][A-Z0-9a-z\-\|\\_\[\]\{\}\^\`]*$'
|
|
def isNick(s, nicklen=None):
|
|
"""Returns whether the string given is a valid nick."""
|
|
if nicklen and len(s) > nicklen:
|
|
return False
|
|
return bool(re.match(_nickregex, s))
|
|
|
|
def isChannel(s):
|
|
"""Returns whether the string given is a valid channel name."""
|
|
return str(s).startswith('#')
|
|
|
|
def _isASCII(s):
|
|
"""Returns whether the string given is valid ASCII."""
|
|
chars = string.ascii_letters + string.digits + string.punctuation
|
|
return all(char in chars for char in s)
|
|
|
|
def isServerName(s):
|
|
"""Returns whether the string given is a valid IRC server name."""
|
|
return _isASCII(s) and '.' in s and not s.startswith('.')
|
|
|
|
hostmaskRe = re.compile(r'^\S+!\S+@\S+$')
|
|
def isHostmask(text):
|
|
"""Returns whether the given text is a valid hostmask."""
|
|
# Band-aid patch here to prevent bad bans set by Janus forwarding people into invalid channels.
|
|
return hostmaskRe.match(text) and '#' not in text
|
|
|
|
def parseModes(irc, target, args):
|
|
"""Parses a modestring list into a list of (mode, argument) tuples.
|
|
['+mitl-o', '3', 'person'] => [('+m', None), ('+i', None), ('+t', None), ('+l', '3'), ('-o', 'person')]
|
|
|
|
This method is deprecated. Use irc.parseModes() instead.
|
|
"""
|
|
log.warning("(%s) utils.parseModes is deprecated. Use irc.parseModes() instead!", irc.name)
|
|
return irc.parseModes(target, args)
|
|
|
|
def applyModes(irc, target, changedmodes):
|
|
"""Takes a list of parsed IRC modes, and applies them on the given target.
|
|
|
|
The target can be either a channel or a user; this is handled automatically.
|
|
|
|
This method is deprecated. Use irc.applyModes() instead.
|
|
"""
|
|
log.warning("(%s) utils.applyModes is deprecated. Use irc.applyModes() instead!", irc.name)
|
|
return irc.applyModes(target, changedmodes)
|
|
|
|
def loadPlugin(name):
|
|
"""
|
|
Imports and returns the requested plugin.
|
|
"""
|
|
return importlib.import_module('pylinkirc.plugins.' + name)
|
|
|
|
def getProtocolModule(name):
|
|
"""
|
|
Imports and returns the protocol module requested.
|
|
"""
|
|
return importlib.import_module('pylinkirc.protocols.' + name)
|
|
|
|
def getDatabaseName(dbname):
|
|
"""
|
|
Returns a database filename with the given base DB name appropriate for the
|
|
current PyLink instance.
|
|
|
|
This returns '<dbname>.db' if the running config name is PyLink's default
|
|
(pylink.yml), and '<dbname>-<config name>.db' for anything else. For example,
|
|
if this is called from an instance running as './pylink testing.yml', it
|
|
would return '<dbname>-testing.db'."""
|
|
if conf.confname != 'pylink':
|
|
dbname += '-%s' % conf.confname
|
|
dbname += '.db'
|
|
return dbname
|
|
|
|
def splitHostmask(mask):
|
|
"""
|
|
Returns a nick!user@host hostmask split into three fields: nick, user, and host.
|
|
"""
|
|
nick, identhost = mask.split('!', 1)
|
|
ident, host = identhost.split('@', 1)
|
|
return [nick, ident, host]
|
|
|
|
class ServiceBot():
|
|
"""
|
|
PyLink IRC Service class.
|
|
"""
|
|
|
|
def __init__(self, name, default_help=True, default_list=True,
|
|
nick=None, ident=None, manipulatable=False, extra_channels=None,
|
|
desc=None):
|
|
# Service name
|
|
self.name = name
|
|
|
|
# Nick/ident to take. Defaults to the same as the service name if not given.
|
|
self.nick = nick
|
|
self.ident = ident
|
|
|
|
# Tracks whether the bot should be manipulatable by the 'bots' plugin and other commands.
|
|
self.manipulatable = manipulatable
|
|
|
|
# We make the command definitions a dict of lists of functions. Multiple
|
|
# plugins are actually allowed to bind to one function name; this just causes
|
|
# them to be called in the order that they are bound.
|
|
self.commands = collections.defaultdict(list)
|
|
|
|
# This tracks the UIDs of the service bot on different networks, as they are
|
|
# spawned.
|
|
self.uids = {}
|
|
|
|
# Track what channels other than those defined in the config
|
|
# that the bot should join by default.
|
|
self.extra_channels = extra_channels or collections.defaultdict(set)
|
|
|
|
# Service description, used in the default help command if one is given.
|
|
self.desc = desc
|
|
|
|
# List of command names to "feature"
|
|
self.featured_cmds = set()
|
|
|
|
if default_help:
|
|
self.add_cmd(self.help)
|
|
|
|
if default_list:
|
|
self.add_cmd(self.listcommands, 'list')
|
|
|
|
def spawn(self, irc=None):
|
|
"""
|
|
Spawns instances of this service on all connected networks.
|
|
"""
|
|
# Spawn the new service by calling the PYLINK_NEW_SERVICE hook,
|
|
# which is handled by coreplugin.
|
|
if irc is None:
|
|
for irc in world.networkobjects.values():
|
|
irc.callHooks([None, 'PYLINK_NEW_SERVICE', {'name': self.name}])
|
|
else:
|
|
raise NotImplementedError("Network specific plugins not supported yet.")
|
|
|
|
def join(self, irc, channels, autojoin=True):
|
|
"""
|
|
Joins the given service bot to the given channel(s).
|
|
"""
|
|
|
|
if type(irc) == str:
|
|
netname = irc
|
|
else:
|
|
netname = irc.name
|
|
|
|
# Ensure type safety: pluralize strings if only one channel was given, then convert to set.
|
|
if type(channels) == str:
|
|
channels = [channels]
|
|
channels = set(channels)
|
|
|
|
if autojoin:
|
|
log.debug('(%s/%s) Adding channels %s to autojoin', netname, self.name, channels)
|
|
self.extra_channels[netname] |= channels
|
|
|
|
# If the network was given as a string, look up the Irc object here.
|
|
try:
|
|
irc = world.networkobjects[netname]
|
|
except KeyError:
|
|
log.debug('(%s/%s) Skipping join(), IRC object not initialized yet', irc, self.name)
|
|
return
|
|
|
|
try:
|
|
u = self.uids[irc.name]
|
|
except KeyError:
|
|
log.debug('(%s/%s) Skipping join(), UID not initialized yet', irc.name, self.name)
|
|
return
|
|
|
|
# Specify modes to join the services bot with.
|
|
joinmodes = irc.serverdata.get("%s_joinmodes" % self.name) or conf.conf.get(self.name, {}).get('joinmodes') or ''
|
|
joinmodes = ''.join([m for m in joinmodes if m in irc.prefixmodes])
|
|
|
|
for chan in channels:
|
|
if isChannel(chan):
|
|
if u in irc.channels[chan].users:
|
|
log.debug('(%s) Skipping join of services %s to channel %s - it is already present', irc.name, self.name, chan)
|
|
continue
|
|
log.debug('(%s) Joining services %s to channel %s with modes %r', irc.name, self.name, chan, joinmodes)
|
|
if joinmodes: # Modes on join were specified; use SJOIN to burst our service
|
|
irc.proto.sjoin(irc.sid, chan, [(joinmodes, u)])
|
|
else:
|
|
irc.proto.join(u, chan)
|
|
|
|
irc.callHooks([irc.sid, 'PYLINK_SERVICE_JOIN', {'channel': chan, 'users': [u]}])
|
|
else:
|
|
log.warning('(%s) Ignoring invalid autojoin channel %r.', irc.name, chan)
|
|
|
|
def reply(self, irc, text, notice=False, private=False):
|
|
"""Replies to a message as the service in question."""
|
|
servuid = self.uids.get(irc.name)
|
|
if not servuid:
|
|
log.warning("(%s) Possible desync? UID for service %s doesn't exist!", irc.name, self.name)
|
|
return
|
|
|
|
irc.reply(text, notice=notice, source=servuid, private=private)
|
|
|
|
def error(self, irc, text, notice=False, private=False):
|
|
"""Replies with an error, as the service in question."""
|
|
servuid = self.uids.get(irc.name)
|
|
if not servuid:
|
|
log.warning("(%s) Possible desync? UID for service %s doesn't exist!", irc.name, self.name)
|
|
return
|
|
|
|
irc.error(text, notice=notice, source=servuid, private=private)
|
|
|
|
def call_cmd(self, irc, source, text, called_in=None):
|
|
"""
|
|
Calls a PyLink bot command. source is the caller's UID, and text is the
|
|
full, unparsed text of the message.
|
|
"""
|
|
irc.called_in = called_in or source
|
|
irc.called_by = source
|
|
|
|
cmd_args = text.strip().split(' ')
|
|
cmd = cmd_args[0].lower()
|
|
cmd_args = cmd_args[1:]
|
|
if cmd not in self.commands:
|
|
if not cmd.startswith('\x01'):
|
|
# Ignore invalid command errors from CTCPs.
|
|
self.reply(irc, 'Error: Unknown command %r.' % cmd)
|
|
log.info('(%s/%s) Received unknown command %r from %s', irc.name, self.name, cmd, irc.getHostmask(source))
|
|
return
|
|
|
|
log.info('(%s/%s) Calling command %r for %s', irc.name, self.name, cmd, irc.getHostmask(source))
|
|
for func in self.commands[cmd]:
|
|
try:
|
|
func(irc, source, cmd_args)
|
|
except NotAuthorizedError as e:
|
|
self.reply(irc, 'Error: %s' % e)
|
|
except Exception as e:
|
|
log.exception('Unhandled exception caught in command %r', cmd)
|
|
self.reply(irc, 'Uncaught exception in command %r: %s: %s' % (cmd, type(e).__name__, str(e)))
|
|
|
|
def add_cmd(self, func, name=None, featured=False):
|
|
"""Binds an IRC command function to the given command name."""
|
|
if name is None:
|
|
name = func.__name__
|
|
name = name.lower()
|
|
|
|
# Mark as a featured command if requested to do so.
|
|
if featured:
|
|
self.featured_cmds.add(name)
|
|
|
|
self.commands[name].append(func)
|
|
return func
|
|
|
|
def _show_command_help(self, irc, command, private=False, shortform=False):
|
|
"""
|
|
Shows help for the given command.
|
|
"""
|
|
def _reply(text):
|
|
"""
|
|
reply() wrapper to handle the private argument.
|
|
"""
|
|
self.reply(irc, text, private=private)
|
|
|
|
if command not in self.commands:
|
|
_reply('Error: Unknown command %r.' % command)
|
|
return
|
|
else:
|
|
funcs = self.commands[command]
|
|
if len(funcs) > 1:
|
|
_reply('The following \x02%s\x02 plugins bind to the \x02%s\x02 command: %s'
|
|
% (len(funcs), command, ', '.join([func.__module__ for func in funcs])))
|
|
for func in funcs:
|
|
doc = func.__doc__
|
|
mod = func.__module__
|
|
if doc:
|
|
lines = doc.splitlines()
|
|
# Bold the first line, which usually just tells you what
|
|
# arguments the command takes.
|
|
args_desc = '\x02%s %s\x02' % (command, lines[0])
|
|
|
|
_reply(args_desc.strip())
|
|
if not shortform:
|
|
# Note: we handle newlines in docstrings a bit differently. Per
|
|
# https://github.com/GLolol/PyLink/issues/307, only double newlines
|
|
# have the effect of showing a new line on IRC. Single newlines
|
|
# are stripped so that word wrap can be applied in the source code
|
|
# without actually affecting the output on IRC.
|
|
for line in doc.replace('\r', '').split('\n\n')[1:]:
|
|
_reply(' ') # Empty line to break up output a bit.
|
|
real_line = line.replace('\n', ' ')
|
|
real_line = real_line.strip()
|
|
real_line = NORMALIZEWHITESPACE_RE.sub(' ', real_line)
|
|
_reply(real_line)
|
|
else:
|
|
_reply("Error: Command %r doesn't offer any help." % command)
|
|
return
|
|
|
|
def help(self, irc, source, args):
|
|
"""<command>
|
|
|
|
Gives help for <command>, if it is available."""
|
|
try:
|
|
command = args[0].lower()
|
|
except IndexError:
|
|
# No argument given: show service description (if present), 'list' output, and a list
|
|
# of featured commands.
|
|
if self.desc:
|
|
self.reply(irc, self.desc)
|
|
self.reply(irc, " ") # Extra newline to unclutter the output text
|
|
|
|
self.listcommands(irc, source, args)
|
|
return
|
|
else:
|
|
self._show_command_help(irc, command)
|
|
|
|
def listcommands(self, irc, source, args):
|
|
"""takes no arguments.
|
|
|
|
Returns a list of available commands this service has to offer."""
|
|
|
|
# Don't show CTCP handlers in the public command list.
|
|
cmds = sorted([cmd for cmd in self.commands.keys() if '\x01' not in cmd])
|
|
|
|
if cmds:
|
|
self.reply(irc, 'Available commands include: %s' % ', '.join(cmds))
|
|
self.reply(irc, 'To see help on a specific command, type \x02help <command>\x02.')
|
|
else:
|
|
self.reply(irc, 'This service doesn\'t provide any public commands.')
|
|
|
|
# If there are featured commands, list them by showing the help for each.
|
|
# These definitions are sent in private to prevent flooding in channels.
|
|
if self.featured_cmds:
|
|
self.reply(irc, " ", private=True)
|
|
self.reply(irc, 'Featured commands include:', private=True)
|
|
for cmd in sorted(self.featured_cmds):
|
|
if self.commands.get(cmd):
|
|
# Only show featured commands that are both defined and loaded.
|
|
# TODO: perhaps plugin unload should remove unused featured command
|
|
# definitions automatically?
|
|
self._show_command_help(irc, cmd, private=True, shortform=True)
|
|
self.reply(irc, 'End of command listing.', private=True)
|
|
|
|
def registerService(name, *args, **kwargs):
|
|
"""Registers a service bot."""
|
|
name = name.lower()
|
|
if name in world.services:
|
|
raise ValueError("Service name %s is already bound!" % name)
|
|
|
|
world.services[name] = sbot = ServiceBot(name, *args, **kwargs)
|
|
sbot.spawn()
|
|
return sbot
|
|
|
|
def unregisterService(name):
|
|
"""Unregisters an existing service bot."""
|
|
assert name in world.services, "Unknown service %s" % name
|
|
name = name.lower()
|
|
sbot = world.services[name]
|
|
for ircnet, uid in sbot.uids.items():
|
|
ircobj = world.networkobjects[ircnet]
|
|
# Special case for the main PyLink client. If we're unregistering that,
|
|
# clear the irc.pseudoclient entry.
|
|
if name == 'pylink':
|
|
ircobj.pseudoclient = None
|
|
|
|
ircobj.proto.quit(uid, "Service unloaded.")
|
|
|
|
del world.services[name]
|