3
0
mirror of https://github.com/jlu5/PyLink.git synced 2024-11-27 21:19:31 +01:00
PyLink/plugins/antispam.py

410 lines
19 KiB
Python
Raw Normal View History

# antispam.py: Basic services-side spamfilters for IRC
2019-07-15 00:12:29 +02:00
from pylinkirc import conf, utils, world
from pylinkirc.log import log
mydesc = ("Provides anti-spam functionality.")
sbot = utils.register_service("antispam", default_nick="AntiSpam", desc=mydesc)
def die(irc=None):
utils.unregister_service("antispam")
_UNICODE_CHARMAP = {
'A': 'AΑА𝐀𝐴𝑨𝒜𝓐𝔄𝔸𝕬𝖠𝗔𝘈𝘼𝙰𝚨𝛢𝜜𝝖𝞐',
'B': 'ΒВв𐌁𝐁𝐵𝑩𝓑𝔅𝔹𝕭𝖡𝗕𝘉𝘽𝙱𝚩𝛣𝜝𝝗𝞑',
'C': 'CϹС𐌂𝐂𝐶𝑪𝒞𝓒𝕮𝖢𝗖𝘊𝘾𝙲',
'D': 'D𝐃𝐷𝑫𝒟𝓓𝔇𝔻𝕯𝖣𝗗𝘋𝘿𝙳',
'E': 'EΕЕ𝐄𝐸𝑬𝓔𝔈𝔼𝕰𝖤𝗘𝘌𝙀𝙴𝚬𝛦𝜠𝝚𝞔',
'F': 'FϜ𝐅𝐹𝑭𝓕𝔉𝔽𝕱𝖥𝗙𝘍𝙁𝙵𝟊',
'G': 'Ԍԍ𝐆𝐺𝑮𝒢𝓖𝔊𝔾𝕲𝖦𝗚𝘎𝙂𝙶',
'H': 'ΗНн𝐇𝐻𝑯𝓗𝕳𝖧𝗛𝘏𝙃𝙷𝚮𝛨𝜢𝝜𝞖',
'J': 'JЈ𝐉𝐽𝑱𝒥𝓙𝔍𝕁𝕵𝖩𝗝𝘑𝙅𝙹',
'K': 'KΚК𝐊𝐾𝑲𝒦𝓚𝔎𝕂𝕶𝖪𝗞𝘒𝙆𝙺𝚱𝛫𝜥𝝟𝞙',
'L': '𝐋𝐿𝑳𝓛𝔏𝕃𝕷𝖫𝗟𝘓𝙇𝙻',
'M': 'MΜϺМ𐌑𝐌𝑀𝑴𝓜𝔐𝕄𝕸𝖬𝗠𝘔𝙈𝙼𝚳𝛭𝜧𝝡𝞛',
'N': 'Ν𝐍𝑁𝑵𝒩𝓝𝔑𝕹𝖭𝗡𝘕𝙉𝙽𝚴𝛮𝜨𝝢𝞜',
'P': 'PΡРᴘᴩ𝐏𝑃𝑷𝒫𝓟𝔓𝕻𝖯𝗣𝘗𝙋𝙿𝚸𝛲𝜬𝝦𝞠',
'Q': 'Q𝐐𝑄𝑸𝒬𝓠𝔔𝕼𝖰𝗤𝘘𝙌𝚀',
'R': 'RƦʀ𝐑𝑅𝑹𝓡𝕽𝖱𝗥𝘙𝙍𝚁',
'S': 'SЅՏ𝐒𝑆𝑺𝒮𝓢𝔖𝕊𝕾𝖲𝗦𝘚𝙎𝚂',
'T': 'TΤτТт𐌕𝐓𝑇𝑻𝒯𝓣𝔗𝕋𝕿𝖳𝗧𝘛𝙏𝚃𝚻𝛕𝛵𝜏𝜯𝝉𝝩𝞃𝞣𝞽',
'U': 'UՍ𝐔𝑈𝑼𝒰𝓤𝔘𝕌𝖀𝖴𝗨𝘜𝙐𝚄',
'V': 'VѴ٧۷𝐕𝑉𝑽𝒱𝓥𝔙𝕍𝖁𝖵𝗩𝘝𝙑𝚅',
'W': 'WԜ𝐖𝑊𝑾𝒲𝓦𝔚𝕎𝖂𝖶𝗪𝘞𝙒𝚆',
'X': 'XΧХ𐌗𐌢𝐗𝑋𝑿𝒳𝓧𝔛𝕏𝖃𝖷𝗫𝘟𝙓𝚇𝚾𝛸𝜲𝝬𝞦',
'Y': 'YΥϒУҮ𝐘𝑌𝒀𝒴𝓨𝔜𝕐𝖄𝖸𝗬𝘠𝙔𝚈𝚼𝛶𝜰𝝪𝞤',
'Z': 'ZΖ𝐙𝑍𝒁𝒵𝓩𝖅𝖹𝗭𝘡𝙕𝚉𝚭𝛧𝜡𝝛𝞕',
'a': 'aɑαа𝐚𝑎𝒂𝒶𝓪𝔞𝕒𝖆𝖺𝗮𝘢𝙖𝚊𝛂𝛼𝜶𝝰𝞪',
'b': 'bƄЬ𝐛𝑏𝒃𝒷𝓫𝔟𝕓𝖇𝖻𝗯𝘣𝙗𝚋',
'c': 'cϲс𝐜𝑐𝒄𝒸𝓬𝔠𝕔𝖈𝖼𝗰𝘤𝙘𝚌',
'd': 'dԁ𝐝𝑑𝒅𝒹𝓭𝔡𝕕𝖉𝖽𝗱𝘥𝙙𝚍',
'e': 'eеҽ𝐞𝑒𝒆𝓮𝔢𝕖𝖊𝖾𝗲𝘦𝙚𝚎',
'f': 'fſϝք𝐟𝑓𝒇𝒻𝓯𝔣𝕗𝖋𝖿𝗳𝘧𝙛𝚏𝟋',
'g': 'gƍɡց𝐠𝑔𝒈𝓰𝔤𝕘𝖌𝗀𝗴𝘨𝙜𝚐',
'h': 'hһհ𝐡𝒉𝒽𝓱𝔥𝕙𝖍𝗁𝗵𝘩𝙝𝚑',
'i': 'iıɩɪιіӏ𝐢𝑖𝒊𝒾𝓲𝔦𝕚𝖎𝗂𝗶𝘪𝙞𝚒𝚤𝛊𝜄𝜾𝝸𝞲',
'j': 'jϳј𝐣𝑗𝒋𝒿𝓳𝔧𝕛𝖏𝗃𝗷𝘫𝙟𝚓',
'k': 'k𝐤𝑘𝒌𝓀𝓴𝔨𝕜𝖐𝗄𝗸𝘬𝙠𝚔',
'l': '',
'm': 'ⅿm',
'n': 'nոռ𝐧𝑛𝒏𝓃𝓷𝔫𝕟𝖓𝗇𝗻𝘯𝙣𝚗',
'o': 'οо',
'p': 'pρϱр𝐩𝑝𝒑𝓅𝓹𝔭𝕡𝖕𝗉𝗽𝘱𝙥𝚙𝛒𝛠𝜌𝜚𝝆𝝔𝞀𝞎𝞺𝟈',
'q': 'qԛգզ𝐪𝑞𝒒𝓆𝓺𝔮𝕢𝖖𝗊𝗾𝘲𝙦𝚚',
'r': 'rг𝐫𝑟𝒓𝓇𝓻𝔯𝕣𝖗𝗋𝗿𝘳𝙧𝚛',
's': 'sƽѕ𝐬𝑠𝒔𝓈𝓼𝔰𝕤𝖘𝗌𝘀𝘴𝙨𝚜',
't': 't𝐭𝑡𝒕𝓉𝓽𝔱𝕥𝖙𝗍𝘁𝘵𝙩𝚝',
'u': 'uʋυս𝐮𝑢𝒖𝓊𝓾𝔲𝕦𝖚𝗎𝘂𝘶𝙪𝚞𝛖𝜐𝝊𝞄𝞾',
'v': 'vνѵט𝐯𝑣𝒗𝓋𝓿𝔳𝕧𝖛𝗏𝘃𝘷𝙫𝚟𝛎𝜈𝝂𝝼𝞶',
'w': 'wɯѡԝա𝐰𝑤𝒘𝓌𝔀𝔴𝕨𝖜𝗐𝘄𝘸𝙬𝚠',
'x': 'x×х𝐱𝑥𝒙𝓍𝔁𝔵𝕩𝖝𝗑𝘅𝘹𝙭𝚡',
'y': 'yɣʏγуүỿ𝐲𝑦𝒚𝓎𝔂𝔶𝕪𝖞𝗒𝘆𝘺𝙮𝚢𝛄𝛾𝜸𝝲𝞬',
'z': 'z𝐳𝑧𝒛𝓏𝔃𝔷𝕫𝖟𝗓𝘇𝘻𝙯𝚣',
'/': '',
'\\': '',
' ': '\u2000\u2001\u2002\u2003\u2004\u2005\u2006\u2007\u2008\u2009\u200a\xa0\u202f\u205f',
'.': '',
'-': '˗╴﹣-−⎼',
'!': '﹗!ǃⵑ︕',
':': ':˸։፡᛬⁚∶⠆︓﹕',
'#': '#﹟'
}
def _prep_maketrans(data):
from_s = ''
to_s = ''
for target, chars in data.items():
from_s += chars
to_s += target * len(chars)
return str.maketrans(from_s, to_s)
UNICODE_CHARMAP = _prep_maketrans(_UNICODE_CHARMAP)
PUNISH_OPTIONS = ['kill', 'ban', 'quiet', 'kick', 'block']
EXEMPT_OPTIONS = ['voice', 'halfop', 'op']
DEFAULT_EXEMPT_OPTION = 'halfop'
def _punish(irc, target, channel, punishment, reason):
"""Punishes the target user. This function returns True if the user was successfully punished."""
if target not in irc.users:
log.warning("(%s) antispam: got target %r that isn't a user?", irc.name, target)
return False
elif irc.is_oper(target):
log.debug("(%s) antispam: refusing to punish oper %s/%s", irc.name, target, irc.get_friendly_name(target))
return False
target_nick = irc.get_friendly_name(target)
if channel:
c = irc.channels[channel]
exempt_level = irc.get_service_option('antispam', 'exempt_level', DEFAULT_EXEMPT_OPTION).lower()
if exempt_level not in EXEMPT_OPTIONS:
log.error('(%s) Antispam exempt %r is not a valid setting, '
'falling back to defaults; accepted settings include: %s',
irc.name, exempt_level, ', '.join(EXEMPT_OPTIONS))
exempt_level = DEFAULT_EXEMPT_OPTION
if exempt_level == 'voice' and c.is_voice_plus(target):
log.debug("(%s) antispam: refusing to punish voiced and above %s/%s", irc.name, target, target_nick)
return False
elif exempt_level == 'halfop' and c.is_halfop_plus(target):
log.debug("(%s) antispam: refusing to punish halfop and above %s/%s", irc.name, target, target_nick)
return False
elif exempt_level == 'op' and c.is_op_plus(target):
log.debug("(%s) antispam: refusing to punish op and above %s/%s", irc.name, target, target_nick)
return False
my_uid = sbot.uids.get(irc.name)
# XXX workaround for single-bot protocols like Clientbot
if irc.pseudoclient and not irc.has_cap('can-spawn-clients'):
my_uid = irc.pseudoclient.uid
bans = set()
log.debug('(%s) antispam: got %r as punishment for %s/%s', irc.name, punishment,
target, irc.get_friendly_name(target))
def _ban():
bans.add(irc.make_channel_ban(target))
def _quiet():
bans.add(irc.make_channel_ban(target, ban_type='quiet'))
def _kick():
irc.kick(my_uid, channel, target, reason)
irc.call_hooks([my_uid, 'ANTISPAM_KICK', {'channel': channel, 'text': reason, 'target': target,
'parse_as': 'KICK'}])
def _kill():
if target not in irc.users:
log.debug('(%s) antispam: not killing %s/%s; they already left', irc.name, target,
irc.get_friendly_name(target))
return
userdata = irc.users[target]
irc.kill(my_uid, target, reason)
irc.call_hooks([my_uid, 'ANTISPAM_KILL', {'target': target, 'text': reason,
'userdata': userdata, 'parse_as': 'KILL'}])
kill = False
successful_punishments = 0
for action in set(punishment.split('+')):
if action not in PUNISH_OPTIONS:
log.error('(%s) Antispam punishment %r is not a valid setting; '
'accepted settings include: %s OR any combination of '
'these joined together with a "+".',
irc.name, punishment, ', '.join(PUNISH_OPTIONS))
return
elif action == 'block':
# We only need to increment this for this function to return True
successful_punishments += 1
elif action == 'kill':
kill = True # Delay kills so that the user data doesn't disappear.
# XXX factorize these blocks
elif action == 'kick' and channel:
try:
_kick()
except NotImplementedError:
log.warning("(%s) antispam: Kicks are not supported on this network, skipping; "
"target was %s/%s", irc.name, target_nick, channel)
else:
successful_punishments += 1
elif action == 'ban' and channel:
try:
_ban()
except (ValueError, NotImplementedError):
log.warning("(%s) antispam: Bans are not supported on this network, skipping; "
"target was %s/%s", irc.name, target_nick, channel)
else:
successful_punishments += 1
elif action == 'quiet' and channel:
try:
_quiet()
except (ValueError, NotImplementedError):
log.warning("(%s) antispam: Quiet is not supported on this network, skipping; "
"target was %s/%s", irc.name, target_nick, channel)
else:
successful_punishments += 1
if bans: # Set all bans at once to prevent spam
irc.mode(my_uid, channel, bans)
irc.call_hooks([my_uid, 'ANTISPAM_BAN',
{'target': channel, 'modes': bans, 'parse_as': 'MODE'}])
if kill:
try:
_kill()
except NotImplementedError:
log.warning("(%s) antispam: Kills are not supported on this network, skipping; "
"target was %s/%s", irc.name, target_nick, channel)
else:
successful_punishments += 1
if not successful_punishments:
log.warning('(%s) antispam: Failed to punish %s with %r, target was %s', irc.name,
target_nick, punishment, channel or 'a PM')
return bool(successful_punishments)
MASSHIGHLIGHT_DEFAULTS = {
'min_length': 50,
'min_nicks': 5,
'reason': "Mass highlight spam is prohibited",
'punishment': 'kick+ban',
'enabled': False
}
def handle_masshighlight(irc, source, command, args):
"""Handles mass highlight attacks."""
channel = args['target']
text = args['text']
mhl_settings = irc.get_service_option('antispam', 'masshighlight',
MASSHIGHLIGHT_DEFAULTS)
if not mhl_settings.get('enabled', False):
return
my_uid = sbot.uids.get(irc.name)
# XXX workaround for single-bot protocols like Clientbot
if irc.pseudoclient and not irc.has_cap('can-spawn-clients'):
my_uid = irc.pseudoclient.uid
if (not irc.connected.is_set()) or (not my_uid):
# Break if the network isn't ready.
log.debug("(%s) antispam.masshighlight: skipping processing; network isn't ready", irc.name)
return
elif not irc.is_channel(channel):
# Not a channel - mass highlight blocking only makes sense within channels
log.debug("(%s) antispam.masshighlight: skipping processing; %r is not a channel", irc.name, channel)
return
elif irc.is_internal_client(source):
# Ignore messages from our own clients.
log.debug("(%s) antispam.masshighlight: skipping processing message from internal client %s", irc.name, source)
return
elif source not in irc.users:
log.debug("(%s) antispam.masshighlight: ignoring message from non-user %s", irc.name, source)
return
elif channel not in irc.channels or my_uid not in irc.channels[channel].users:
# We're not monitoring this channel.
log.debug("(%s) antispam.masshighlight: skipping processing message from channel %r we're not in", irc.name, channel)
return
elif len(text) < mhl_settings.get('min_length', MASSHIGHLIGHT_DEFAULTS['min_length']):
log.debug("(%s) antispam.masshighlight: skipping processing message %r; it's too short", irc.name, text)
return
if irc.get_service_option('antispam', 'strip_formatting', True):
text = utils.strip_irc_formatting(text)
# Strip :, from potential nicks
words = [word.rstrip(':,') for word in text.split()]
userlist = [irc.users[uid].nick for uid in irc.channels[channel].users.copy()]
min_nicks = mhl_settings.get('min_nicks', MASSHIGHLIGHT_DEFAULTS['min_nicks'])
# Don't allow repeating the same nick to trigger punishment
nicks_caught = set()
punished = False
for word in words:
if word in userlist:
nicks_caught.add(word)
if len(nicks_caught) >= min_nicks:
# Get the punishment and reason.
punishment = mhl_settings.get('punishment', MASSHIGHLIGHT_DEFAULTS['punishment']).lower()
reason = mhl_settings.get('reason', MASSHIGHLIGHT_DEFAULTS['reason'])
log.info("(%s) antispam: punishing %s => %s for mass highlight spam",
irc.name,
irc.get_friendly_name(source),
channel)
punished = _punish(irc, source, channel, punishment, reason)
break
log.debug('(%s) antispam.masshighlight: got %s/%s nicks on message to %r', irc.name,
len(nicks_caught), min_nicks, channel)
return not punished # Filter this message from relay, etc. if it triggered protection
utils.add_hook(handle_masshighlight, 'PRIVMSG', priority=1000)
utils.add_hook(handle_masshighlight, 'NOTICE', priority=1000)
TEXTFILTER_DEFAULTS = {
'reason': "Spam is prohibited",
'punishment': 'kick+ban+block',
'watch_pms': False,
'enabled': False,
'munge_unicode': True,
}
def handle_textfilter(irc, source, command, args):
"""Antispam text filter handler."""
target = args['target']
text = args['text']
txf_settings = irc.get_service_option('antispam', 'textfilter',
TEXTFILTER_DEFAULTS)
if not txf_settings.get('enabled', False):
return
my_uid = sbot.uids.get(irc.name)
# XXX workaround for single-bot protocols like Clientbot
if irc.pseudoclient and not irc.has_cap('can-spawn-clients'):
my_uid = irc.pseudoclient.uid
if (not irc.connected.is_set()) or (not my_uid):
# Break if the network isn't ready.
log.debug("(%s) antispam.textfilters: skipping processing; network isn't ready", irc.name)
return
elif irc.is_internal_client(source):
# Ignore messages from our own clients.
log.debug("(%s) antispam.textfilters: skipping processing message from internal client %s", irc.name, source)
return
elif source not in irc.users:
log.debug("(%s) antispam.textfilters: ignoring message from non-user %s", irc.name, source)
return
if irc.is_channel(target):
channel_or_none = target
if target not in irc.channels or my_uid not in irc.channels[target].users:
# We're not monitoring this channel.
log.debug("(%s) antispam.textfilters: skipping processing message from channel %r we're not in", irc.name, target)
return
else:
channel_or_none = None
watch_pms = txf_settings.get('watch_pms', TEXTFILTER_DEFAULTS['watch_pms'])
if watch_pms == 'services':
if not irc.get_service_bot(target):
log.debug("(%s) antispam.textfilters: skipping processing; %r is not a service bot (watch_pms='services')", irc.name, target)
return
elif watch_pms == 'all':
log.debug("(%s) antispam.textfilters: checking all PMs (watch_pms='all')", irc.name)
pass
else:
# Not a channel.
log.debug("(%s) antispam.textfilters: skipping processing; %r is not a channel and watch_pms is disabled", irc.name, target)
return
# Merge together global and local textfilter lists.
txf_globs = set(conf.conf.get('antispam', {}).get('textfilter_globs', [])) | \
set(irc.serverdata.get('antispam_textfilter_globs', []))
punishment = txf_settings.get('punishment', TEXTFILTER_DEFAULTS['punishment']).lower()
reason = txf_settings.get('reason', TEXTFILTER_DEFAULTS['reason'])
if irc.get_service_option('antispam', 'strip_formatting', True):
text = utils.strip_irc_formatting(text)
if txf_settings.get('munge_unicode', TEXTFILTER_DEFAULTS['munge_unicode']):
text = str.translate(text, UNICODE_CHARMAP)
punished = False
for filterglob in txf_globs:
if utils.match_text(filterglob, text):
log.info("(%s) antispam: punishing %s => %s for text filter %r",
irc.name,
irc.get_friendly_name(source),
irc.get_friendly_name(target),
filterglob)
punished = _punish(irc, source, channel_or_none, punishment, reason)
break
return not punished # Filter this message from relay, etc. if it triggered protection
utils.add_hook(handle_textfilter, 'PRIVMSG', priority=999)
utils.add_hook(handle_textfilter, 'NOTICE', priority=999)
PARTQUIT_DEFAULTS = {
'watch_quits': True,
'watch_parts': True,
'part_filter_message': "Reason filtered",
'quit_filter_message': "Reason filtered",
}
def handle_partquit(irc, source, command, args):
"""Antispam part/quit message filter."""
text = args.get('text')
pq_settings = irc.get_service_option('antispam', 'partquit',
PARTQUIT_DEFAULTS)
if not text:
return # No text to match against
elif command == 'QUIT' and not pq_settings.get('watch_quits', True):
return # Not enabled
elif command == 'PART' and not pq_settings.get('watch_parts', True):
return
# Merge together global and local partquit filter lists.
pq_globs = set(conf.conf.get('antispam', {}).get('partquit_globs', [])) | \
set(irc.serverdata.get('antispam_partquit_globs', []))
if not pq_globs:
return
for filterglob in pq_globs:
if utils.match_text(filterglob, text):
# For parts, also log the affected channels
if command == 'PART':
filtered_message = pq_settings.get('part_filter_message', PARTQUIT_DEFAULTS['part_filter_message'])
log.info('(%s) antispam: filtered part message from %s on %s due to part/quit filter glob %s',
irc.name, irc.get_hostmask(source), ','.join(args['channels']), filterglob)
else:
filtered_message = pq_settings.get('quit_filter_message', PARTQUIT_DEFAULTS['quit_filter_message'])
log.info('(%s) antispam: filtered quit message from %s due to part/quit filter glob %s',
irc.name, args['userdata'].nick, filterglob)
args['text'] = filtered_message
break
utils.add_hook(handle_partquit, 'PART', priority=999)
utils.add_hook(handle_partquit, 'QUIT', priority=999)