Compare commits

..

No commits in common. "master" and "master" have entirely different histories.

8 changed files with 246 additions and 377 deletions

3
.gitignore vendored
View File

@ -1,3 +1,2 @@
env/
.vscode
__pycache__/
.vscode

View File

@ -1,11 +1,8 @@
SnoParser + Redis
=================
SnoParser
=========
Parses the Server Notices from ErgoIRCd.
This branch contains Redis functionality:
Performs and caches WHOIS lookups for connecting IP addresses. Counts nicknames and IP addresses.
Assumptions
-----------
@ -27,4 +24,4 @@ Configurations
1. Formatting for 'AutoVhost' post Registration (TODO: Disable)
2. Configure the '#channel' to send the SNO lines (disabled by default) (TODO: Add Exceptions)
3. Use ZWSP to toggle nickhighlights when in channel.
3. Use ZWSP to toggle nickhighlights when in channel.

Binary file not shown.

Binary file not shown.

Binary file not shown.

244
config.py
View File

@ -28,24 +28,15 @@
###
from supybot import conf, registry, ircutils
from supybot import conf, registry
try:
from supybot.i18n import PluginInternationalization
_ = PluginInternationalization("SnoParser")
_ = PluginInternationalization('SnoParser')
except:
# Placeholder that allows to run the plugin on a bot
# without the i18n module
_ = lambda x: x
# class ValidChannel(registry.string):
# """Value must be a valid channel"""
# def setValue(self, v):
# if not (ircutils.isChannel(v)):
# self.error()
# registry.String.setValue(self, v)
def configure(advanced):
# This will be called by supybot to configure this module. advanced is
@ -53,182 +44,115 @@ def configure(advanced):
# user or not. You should effect your configuration by manipulating the
# registry as appropriate.
from supybot.questions import expect, anything, something, yn
conf.registerPlugin("SnoParser", True)
conf.registerPlugin('SnoParser', True)
SnoParser = conf.registerPlugin("SnoParser")
SnoParser = conf.registerPlugin('SnoParser')
# This is where your configuration variables (if any) should go. For example:
# conf.registerGlobalValue(SnoParser, 'someConfigVariableName',
# registry.Boolean(False, _("""Help for someConfigVariableName.""")))
# conf.registerNetworkValue(SnoParser, 'targetChannel', ValidChannel,
# ("", ("""Determines which channel the bot should send snolines""")))
conf.registerGlobalValue(SnoParser, 'targetChannel',
registry.String(None, ("""Sends reformatted snolines to the <channel>""")))
conf.registerNetworkValue(
SnoParser,
"targetChannel",
registry.String(
"",
("""Determines which channel the bot should snolines example: `#snotices`"""),
),
)
conf.registerGlobalValue(SnoParser, 'AutoVhost',
registry.String('libcasa/user/', ("""Configure the vhost eg. libcasa/user/$account""")))
conf.registerGlobalValue(
SnoParser,
"AutoVhost",
registry.String(
"libcasa/user/", ("""Configure the vhost eg. libcasa/user/$account""")
),
)
conf.registerGlobalValue(SnoParser, 'preventHighlight',
registry.Boolean(True, ("""Toggles in channel highlights with ZWSP""")))
conf.registerGlobalValue(
SnoParser,
"preventHighlight",
registry.Boolean(True, ("""Toggles in channel highlights with ZWSP""")),
)
conf.registerGlobalValue(
SnoParser,
"debug",
registry.Boolean(
"false",
"""
conf.registerGlobalValue(SnoParser, 'debug',
registry.Boolean('false',
"""
SnoParser: Verbose output. Note: there is a seperate debug option for the `whois` client.
""",
private=True,
),
)
"""
, private=True
))
###
# REDIS related settings below:
###
conf.registerGroup(SnoParser, "redis")
conf.registerGroup(SnoParser.redis, "db")
conf.registerGlobalValue(
SnoParser.redis.db,
"ips",
registry.Integer(
2,
"""
Redis: Database number for counting of IP ADDRESSES.
""",
private=True,
),
)
conf.registerGlobalValue(
SnoParser.redis.db,
"nicks",
registry.Integer(
1,
"""
conf.registerGroup(SnoParser, 'redis')
conf.registerGlobalValue(SnoParser.redis, 'db1',
registry.Integer('1',
"""
Redis: Database number for counting of NICKNAMES.
""",
private=True,
),
)
conf.registerGlobalValue(
SnoParser.redis.db,
"whois",
registry.Integer(
0,
"""
Redis: Database number for WHOIS query caching.
""",
),
)
conf.registerGlobalValue(
SnoParser.redis,
"host",
registry.String(
"localhost",
"""
"""
, private=True
))
conf.registerGlobalValue(SnoParser.redis, 'db2',
registry.Integer('2',
"""
Redis: Database number for counting of IP ADDRESSES.
"""
, private=True
))
conf.registerGlobalValue(SnoParser.redis, 'host',
registry.String('127.0.0.1',
"""
Redis: IP address or hostname.
""",
private=True,
),
)
conf.registerGlobalValue(
SnoParser.redis,
"port",
registry.Integer(
6379,
"""
"""
, private=True
))
conf.registerGlobalValue(SnoParser.redis, 'port',
registry.Integer('6379',
"""
Redis: Port.
""",
private=True,
),
)
conf.registerGlobalValue(
SnoParser.redis,
"username",
registry.String(
"",
"""
"""
, private=True
))
conf.registerGlobalValue(SnoParser.redis, 'username',
registry.String('',
"""
Redis: Username. This is optional and has not been tested. It is recommended to perform initial tests on a local instance with no username and password.
""",
private=True,
),
)
conf.registerGlobalValue(
SnoParser.redis,
"password",
registry.String(
"",
"""
"""
, private=True
))
conf.registerGlobalValue(SnoParser.redis, 'password',
registry.String('',
"""
Redis: Password. This is optional and has not been tested. It is recommended to perform initial tests on a local instance with no username and password.
""",
),
)
conf.registerGlobalValue(
SnoParser.redis,
"timeout",
registry.Integer(
5,
"""
"""
))
conf.registerGlobalValue(SnoParser.redis, 'timeout',
registry.Integer('5',
"""
Redis: Socket Timeout. The developer does not know what to recommend here, but `5` seems to yield good results.
""",
),
)
"""
))
###
## WHOIS related settings below:
###
conf.registerGroup(SnoParser, "whois")
conf.registerGlobalValue(
SnoParser.whois,
"debug",
registry.Boolean(
"false",
"""
conf.registerGroup(SnoParser, 'whois')
conf.registerGlobalValue(SnoParser.whois, 'debug',
registry.Boolean('false',
"""
SnoParser: True: Very verbose console output. False: Mostly silent operation.
""",
private=True,
),
)
conf.registerGlobalValue(
SnoParser.whois,
"sample",
registry.String(
"",
"""
"""
, private=True
))
conf.registerGlobalValue(SnoParser.whois, 'sample',
registry.String('',
"""
SnoParser: This allows to set a testing IP address, if the plugin shall be evaluated on i.e. a local network. This will override all IP addresses from SNOTICES!
""",
private=True,
),
)
conf.registerGlobalValue(
SnoParser.whois,
"ttl",
registry.Integer(
3600,
"""
"""
, private=True
))
conf.registerGlobalValue(SnoParser.whois, 'ttl',
registry.Integer('3600',
"""
SnoParser: How long to cache WHOIS entries for.
""",
private=True,
),
)
"""
, private=True
))
conf.registerGroup(SnoParser.whois, 'redis')
conf.registerGlobalValue(SnoParser.whois.redis, 'db',
registry.Integer('0',
"""
Redis: Database number for WHOIS query caching.
"""
))
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79:

363
plugin.py
View File

@ -28,154 +28,179 @@
###
from supybot import (
ircutils,
callbacks,
ircmsgs,
)
from supybot import utils, plugins, ircutils, callbacks, ircdb, conf, log, world, ircmsgs
from supybot.commands import *
try:
from supybot.i18n import PluginInternationalization
_ = PluginInternationalization("SnoParser")
_ = PluginInternationalization('SnoParser')
except ImportError:
# Placeholder that allows to run the plugin on a bot
# without the i18n module
_ = lambda x: x
import re
import os
import sys
import time
import sqlite3
import redis
import json
from datetime import timedelta
from ipwhois import IPWhois
from ipwhois.exceptions import IPDefinedError
import ipwhois
class SnoParser(callbacks.Plugin):
"""Parses the Server Notices from ErgoIRCd"""
threaded = True
def _redis_connect(self, db) -> redis.client.Redis:
def redis_connect_whois(self) -> redis.client.Redis:
try:
redis_client = redis.Redis(
host=self.registryValue("redis.host"),
port=self.registryValue("redis.port"),
password=self.registryValue("redis.password"),
username=self.registryValue("redis.username"),
db=self.registryValue(f"redis.db.{db}"),
socket_timeout=int(self.registryValue("redis.timeout")),
redis_client_whois = redis.Redis(
host = self.registryValue('redis.host'),
port = self.registryValue('redis.port'),
password = self.registryValue('redis.password'),
username = self.registryValue('redis.username'),
db = self.registryValue('whois.redis.db'),
socket_timeout = int(self.registryValue('redis.timeout'))
)
ping = redis_client.ping()
ping = redis_client_whois.ping()
if ping is True:
return redis_client
return redis_client_whois
except redis.AuthenticationError:
print("Could not authenticate to Redis backend.")
def redis_connect_nicks(self) -> redis.client.Redis:
try:
redis_client_nicks = redis.Redis(
host = self.registryValue('redis.host'),
port = self.registryValue('redis.port'),
password = self.registryValue('redis.password'),
username = self.registryValue('redis.username'),
db = self.registryValue('redis.db1'),
socket_timeout = int(self.registryValue('redis.timeout'))
)
ping = redis_client_nicks.ping()
if ping is True:
return redis_client_nicks
except redis.AuthenticationError:
print("Could not authenticate to Redis backend.")
def redis_connect_ips(self) -> redis.client.Redis:
try:
redis_client_ips = redis.Redis(
host = self.registryValue('redis.host'),
port = self.registryValue('redis.port'),
password = self.registryValue('redis.password'),
username = self.registryValue('redis.username'),
db = self.registryValue('redis.db2'),
socket_timeout = int(self.registryValue('redis.timeout'))
)
ping = redis_client_ips.ping()
if ping is True:
return redis_client_ips
except redis.AuthenticationError:
print("Could not authenticate to Redis backend.")
def __init__(self, irc):
super().__init__(irc)
self.redis_clients = {
'ips': self._redis_connect("ips"),
'nicks': self._redis_connect("nicks"),
'whois': self._redis_connect("whois"),
}
def _get_from_cache(self, db, key):
""" Get value from Redis cache """
return self.redis_clients[db].get(key)
self.redis_client_whois = self.redis_connect_whois()
self.redis_client_nicks = self.redis_connect_nicks()
self.redis_client_ips = self.redis_connect_ips()
def whois_fresh(self, sourceip: str) -> dict:
"""Data from WHOIS backend (IANA or respective RIR)."""
"""Data from cache."""
asn = 0
subnet = ""
subnet = ''
try:
whois = IPWhois(sourceip)
whoisres = whois.lookup_rdap(depth=1, retry_count=0)
whoisres = whois.lookup_rdap(depth=1,retry_count=0)
results = whoisres
if self.registryValue("whois.debug"):
if self.registryValue('whois.debug'):
print(results)
asn = whoisres["asn_registry"]
country = whoisres["asn_country_code"]
description = whoisres["asn_description"]
whoisout = asn + " " + country + " " + description
except IPDefinedError:
whoisout = "RFC 4291 (Local)"
asn = whoisres['asn_registry']
country = whoisres['asn_country_code']
description = whoisres['asn_description']
whoisout = asn + ' ' + country + ' ' + description
except ipwhois.exceptions.IPDefinedError:
whoisout = 'RFC 4291 (Local)'
response = whoisout
return response
def whois_get_cache(self, key: str) -> str:
"""Data from Redis."""
k = self.redis_client_whois.get(key)
# self = SnoParser()
# val = self.redis_client_whois.get(key)
val = k
return val
def whois_set_cache(self, key: str, value: str) -> bool:
"""Data to Redis."""
return self.redis_clients['whois'].setex(
key,
timedelta(seconds=int(self.registryValue("whois.ttl"))),
value=value,
)
duration = self.registryValue('whois.ttl')
state = self.redis_client_whois.setex(key, timedelta(seconds=duration), value=value,)
return state
def whois_run(self, sourceip: str) -> dict:
"""Whois query router."""
data = self._get_from_cache("whois", sourceip)
data = self.whois_get_cache(key=sourceip)
if data is not None:
data = json.loads(data)
if self.registryValue("whois.debug"):
if self.registryValue('whois.debug'):
print("SNOPARSER DEBUG - WHOIS_RUN WITH CACHE: TRUE")
print(data)
print(sourceip)
return data
else:
data = self.whois_fresh(sourceip)
if self.registryValue("whois.debug"):
if self.registryValue('whois.debug'):
print("SNOPARSER DEBUG - WHOIS_RUN WITH CACHE: FALSE")
print(data)
print(sourceip)
if data.startswith:
if self.registryValue("whois.debug"):
print(
"SNOPARSER DEBUG - WHOIS_RUN WITH CACHE: FALSE AND CORRECT STARTING CHARACTER"
)
if self.registryValue('whois.debug'):
print("SNOPARSER DEBUG - WHOIS_RUN WITH CACHE: FALSE AND CORRECT STARTING CHARACTER")
print(data)
data = json.dumps(data)
state = self.whois_set_cache(key=sourceip, value=data)
if state is True:
return json.loads(data)
else:
if self.registryValue("whois.debug"):
print(
"SNOPARSER DEBUG _ WHOIS_RUN WITH CACHE: FALSE AND WRONG STARTING CHARACTER"
)
if self.registryValue('whois.debug'):
print("SNOPARSER DEBUG _ WHOIS_RUN WITH CACHE: FALSE AND WRONG STARTING CHARACTER")
print(data)
return data
def nick_run(self, nickname: str) -> dict:
"""Tracks nicknames"""
data = self._get_from_cache("nicks", nickname)
data = self.redis_client_nicks.get(nickname)
if data is not None:
if self.registryValue("debug"):
if self.registryValue('debug'):
print("SNOPARSER DEBUG - NICK_RUN, SEEN: TRUE")
print(nickname)
print(data)
self.redis_clients['nicks'].incrby(nickname, amount=1)
self.redis_client_nicks.incrby(nickname,amount=1)
if data:
decoded = data.decode("utf-8")
decoded = data.decode('utf-8')
return decoded
else:
return 0
else:
if self.registryValue("debug"):
if self.registryValue('debug'):
print("SNOPARSER DEBUG _ NICK_RUN, SEEN: FALSE")
print(nickname)
print(data)
self.redis_clients['nicks'].set(nickname, value="1")
self.redis_client_nicks.set(nickname,value='1')
if data:
decoded = data.decode("utf-8")
decoded = data.decode('utf-8')
return decoded
else:
return 0
@ -183,26 +208,27 @@ class SnoParser(callbacks.Plugin):
def ip_run(self, ipaddress: str) -> dict:
"""Tracks IP addresses"""
data = self._get_from_cache("ips", ipaddress)
data = self.redis_client_ips.get(ipaddress)
if data is not None:
if self.registryValue("debug"):
if self.registryValue('debug'):
print("SNOPARSER DEBUG - IP_RUN, SEEN: TRUE")
print(ipaddress)
print(data)
self.redis_clients['ips'].incrby(ipaddress, amount=1)
self.redis_client_ips.incrby(ipaddress,amount=1)
if data:
decoded = data.decode("utf-8")
decoded = data.decode('utf-8')
return decoded
else:
return 0
else:
if self.registryValue("debug"):
if self.registryValue('debug'):
print("SNOPARSER DEBUG _ IP_RUN, SEEN: FALSE")
print(ipaddress)
print(data)
self.redis_clients['ips'].set(ipaddress, value="1")
self.redis_client_ips.set(ipaddress,value='1')
if data:
decoded = data.decode("utf-8")
decoded = data.decode('utf-8')
return decoded
else:
return 0
@ -212,36 +238,32 @@ class SnoParser(callbacks.Plugin):
Queries the cache for an address.
"""
data = self.redis_clients['whois'].get(ipaddress)
if data is not None:
data = data.decode("utf-8")
ttl = self.redis_clients['whois'].ttl(ipaddress)
count = self.redis_clients['ips'].get(ipaddress)
if count is not None:
count = count.decode("utf-8")
print("SnoParser manual query: ", data, " ", ttl)
irc.reply(f"{data} - Count: {count} - Remaining: {ttl}s")
data = self.whois_get_cache(key=ipaddress)
decoded_data = data.decode('utf-8')
ttl = self.redis_client_whois.ttl(ipaddress)
count = self.redis_client_ips.get(ipaddress)
decoded_count = count.decode('utf-8')
print('SnoParser manual query: ', data, ' ', ttl)
irc.reply(f'{decoded_data} - Count: {decoded_count} - Remaining: {ttl}s')
ipquery = wrap(ipquery, ['anything'])
ipquery = wrap(ipquery, ["ip"])
def doNotice(self, irc, msg):
(target, text) = msg.args
if target == irc.nick:
# server notices CONNECT, KILL, XLINE, NICK, ACCOUNT, OPER, QUIT,
# server notices CONNECT, KILL, XLINE, NICK, ACCOUNT
text = ircutils.stripFormatting(text)
# if 'CONNECT' in text:
RE_CLICONN = re.compile(
r"^-CONNECT- Client connected \[(.+)\] \[u\:~(.+)\] \[h\:(.+)\] \[ip\:(.+)\] \[r\:(.+)\]$"
)
couple = RE_CLICONN.match(text)
# check `if couple:` ie was there a match even? if yes proceed
if couple:
if 'CONNECT' in text:
connregex = "^-CONNECT- Client connected \[(.+)\] \[u\:~(.+)\] \[h\:(.+)\] \[ip\:(.+)\] \[r\:(.+)\]$"
couple = re.match(connregex, text)
nickname = couple.group(1)
username = couple.group(2)
host = couple.group(3)
if self.registryValue("whois.sample"):
ip = self.registryValue("whois.sample")
if self.registryValue('whois.sample'):
ip = self.registryValue('whois.sample')
else:
ip = couple.group(4)
realname = couple.group(5)
@ -250,74 +272,43 @@ class SnoParser(callbacks.Plugin):
nick_seen = self.nick_run(nickname=nickname)
whois = self.whois_run(sourceip=ip)
snote_dict = {
"notice": "connect",
"nickname": nickname,
"username": username,
"host": host,
"ip": ip,
"realname": realname,
"ipCount": ip_seen,
"nickCount": nick_seen,
}
repl = f"\x02\x1F{snote_dict['notice']} \x0F\x11\x0303==>>\x0F \x02Nick:\x0F {snote_dict['nickname']} \x02Username:\x0F {snote_dict['username']} \x02Hostname:\x0F {snote_dict['host']} \x02IP:\x0F {snote_dict['ip']} {whois} \x02Realname:\x0F {snote_dict['realname']} \x02IPcount:\x0F {snote_dict['ipCount']} \x02NickCount:\x0F {snote_dict['nickCount']}"
DictFromSnotice = {'notice': 'connect', 'nickname': nickname, 'username': username, 'host': host, 'ip': ip, 'realname': realname, 'ipCount': ip_seen, 'nickCount': nick_seen}
#repl = f"\x02\x1FNOTICE: {DictFromSnotice['notice']} \x0F\x11\x0303==>>\x0F \x02Nick:\x0F {DictFromSnotice['nickname']} \x02Username:\x0F {DictFromSnotice['username']} \x02Hostname:\x0F {DictFromSnotice['host']} \x02IP:\x0F {DictFromSnotice['ip']} \x02Realname:\x0F {DictFromSnotice['realname']} \x02IPcount:\x0F {DictFromSnotice['ipCount']} \x02NickCount:\x0F {DictFromSnotice['nickCount']}"
repl = f"\x02\x1F{DictFromSnotice['notice']} \x0F\x11\x0303==>>\x0F \x02Nick:\x0F {DictFromSnotice['nickname']} \x02Username:\x0F {DictFromSnotice['username']} \x02Hostname:\x0F {DictFromSnotice['host']} \x02IP:\x0F {DictFromSnotice['ip']} {whois} \x02Realname:\x0F {DictFromSnotice['realname']} \x02IPcount:\x0F {DictFromSnotice['ipCount']} \x02NickCount:\x0F {DictFromSnotice['nickCount']}"
self._sendSnotice(irc, msg, repl)
# if 'XLINE' in text and 'temporary' in text:
RE_XLINE = re.compile(
r"^-XLINE- (.+) \[(.+)\] added temporary \((.+)\) (K-Line|D-Line) for (.+)$"
)
couple = RE_XLINE.match(text)
if couple:
if 'XLINE' in text and 'temporary' in text:
xlineregex = "^-XLINE- (.+) \[(.+)\] added temporary \((.+)\) (K-Line|D-Line) for (.+)$"
couple = re.match(xlineregex, text)
who = couple.group(1)
who_operator = couple.group(2)
duration = couple.group(3)
which_line = couple.group(4)
host_or_ip = couple.group(5)
snote_dict = {
"notice": "tempban",
"who": who,
"operator": who_operator,
"duration": duration,
"type": which_line,
"target": host_or_ip,
}
repl = f"\x02\x1FNOTICE: {snote_dict['notice']}\x0F \x11\x0303 X_X \x0F \x02BannedBy:\x0F {snote_dict['who']} \x02BannedByOper:\x0F {snote_dict['operator']} \x02Duration:\x0F {snote_dict['duration']} \x02XLINE Type:\x0F {snote_dict['type']} \x02Nick:\x0F {snote_dict['target']}"
DictFromSnotice = {'notice': 'tempban', 'who': who, 'operator': who_operator, 'duration': duration, 'type': which_line, 'target': host_or_ip}
repl = f"\x02\x1FNOTICE: {DictFromSnotice['notice']}\x0F \x11\x0303 X_X \x0F \x02BannedBy:\x0F {DictFromSnotice['who']} \x02BannedByOper:\x0F {DictFromSnotice['operator']} \x02Duration:\x0F {DictFromSnotice['duration']} \x02XLINE Type:\x0F {DictFromSnotice['type']} \x02Nick:\x0F {DictFromSnotice['target']}"
self._sendSnotice(irc, msg, repl)
# WHY THE FUCK IS IT elif ??
elif "XLINE" in text and "temporary" not in text and "removed" not in text:
perm_xline_regex = (
"^-XLINE- (.+) \[(.+)\] added (D-Line|K-Line) for (.+)$"
)
elif 'XLINE' in text and 'temporary' not in text and 'removed' not in text:
perm_xline_regex = "^-XLINE- (.+) \[(.+)\] added (D-Line|K-Line) for (.+)$"
couple = re.match(perm_xline_regex, text)
who = couple.group(1)
who_operator = couple.group(2)
which_line = couple.group(3)
host_or_ip = couple.group(4)
snote_dict = {
"notice": "Permaban",
"who": who,
"operator": who_operator,
"type": which_line,
"target": host_or_ip,
}
repl = f"\x02\x1FNOTICE: {snote_dict['notice']} \x0F \x11\x0303 X_X \x0F \x02BannedBy:\x0F {snote_dict['who']} \x02BannedByOper:\x0F {snote_dict['operator']} \x02XLINE Type:\x0F {snote_dict['type']} \x02Host/IP:\x0F {snote_dict['target']}"
DictFromSnotice = {'notice': 'Permaban', 'who': who, 'operator': who_operator, 'type': which_line, 'target': host_or_ip}
repl = f"\x02\x1FNOTICE: {DictFromSnotice['notice']} \x0F \x11\x0303 X_X \x0F \x02BannedBy:\x0F {DictFromSnotice['who']} \x02BannedByOper:\x0F {DictFromSnotice['operator']} \x02XLINE Type:\x0F {DictFromSnotice['type']} \x02Host/IP:\x0F {DictFromSnotice['target']}"
self._sendSnotice(irc, msg, repl)
elif "XLINE" in text and "removed" in text:
elif 'XLINE' in text and 'removed' in text:
unxlineregex = "^-XLINE- (.+) removed (D-Line|K-Line) for (.+)$"
couple = re.match(unxlineregex, text)
who = couple.group(1)
which_line = couple.group(2)
host_or_ip = couple.group(3)
snote_dict = {
"notice": "unxline",
"who": who,
"type": which_line,
"target": host_or_ip,
}
repl = f"\x02\x1FNOTICE: {snote_dict['notice']} \x0F\x11\x0303 :=D\x0F \x02UnbannedBy:\x0F {snote_dict['who']} \x02XLINE type:\x0F {snote_dict['type']} \x02Host/IP:\x0F {snote_dict['target']}"
DictFromSnotice = {'notice': 'unxline', 'who': who, 'type': which_line, 'target': host_or_ip}
repl = f"\x02\x1FNOTICE: {DictFromSnotice['notice']} \x0F\x11\x0303 :=D\x0F \x02UnbannedBy:\x0F {DictFromSnotice['who']} \x02XLINE type:\x0F {DictFromSnotice['type']} \x02Host/IP:\x0F {DictFromSnotice['target']}"
self._sendSnotice(irc, msg, repl)
if "KILL" in text:
if 'KILL' in text:
killregex = "^-KILL- (.+) \[(.+)\] killed (\d) clients with a (KLINE|DLINE) \[(.+)\]$"
couple = re.match(killregex, text)
who = couple.group(1)
@ -325,110 +316,72 @@ class SnoParser(callbacks.Plugin):
clients = couple.group(3)
which_line = couple.group(4)
nick = couple.group(5)
snote_dict = {
"notice": "kill",
"who": who,
"operator": who_operator,
"client": clients,
"type": which_line,
"nick": nick,
}
repl = f"\x02\x1FNOTICE: {snote_dict['notice']} \x0F\x11\x0303☠\x0F \x02KilledBy:\x0F {snote_dict['who']} \x02KilledByOper:\x0F {snote_dict['operator']} \x02NumofClientsAffected:\x0F {snote_dict['client']} \x02XLINE Type:\x0F {snote_dict['type']} \x02Nick:\x0F {snote_dict['nick']}"
DictFromSnotice = {'notice': 'kill', 'who': who, 'operator': who_operator, "client": clients, 'type': which_line, 'nick': nick}
repl = f"\x02\x1FNOTICE: {DictFromSnotice['notice']} \x0F\x11\x0303☠\x0F \x02KilledBy:\x0F {DictFromSnotice['who']} \x02KilledByOper:\x0F {DictFromSnotice['operator']} \x02NumofClientsAffected:\x0F {DictFromSnotice['client']} \x02XLINE Type:\x0F {DictFromSnotice['type']} \x02Nick:\x0F {DictFromSnotice['nick']}"
self._sendSnotice(irc, msg, repl)
if "NICK" in text and "changed nickname to" in text:
if 'NICK' in text and 'changed nickname to' in text:
nickregex = "^-NICK- (.+) changed nickname to (.+)$"
couple = re.match(nickregex, text)
old_nick = couple.group(1)
new_nick = couple.group(2)
snote_dict = {
"notice": "nick change",
"old_nick": old_nick,
"new_nick": new_nick,
}
repl = f"\x02\x1FNOTICE: {snote_dict['notice']} ==> {snote_dict['old_nick']} changed their nick to {snote_dict['new_nick']}"
DictFromSnotice = {'notice': 'nick change', 'old_nick': old_nick, 'new_nick': new_nick}
repl = f"\x02\x1FNOTICE: {DictFromSnotice['notice']} ==> {DictFromSnotice['old_nick']} changed their nick to {DictFromSnotice['new_nick']}"
self._sendSnotice(irc, msg, repl)
if "QUIT" in text and "exited" in text:
if 'QUIT' in text and 'exited' in text:
quitregex = "^-QUIT- (.+) exited the network$"
couple = re.match(quitregex, text)
nick = couple.group(1)
snote_dict = {"notice": "quit", "nick": nick}
DictFromSnotice = {'notice': 'quit', 'nick': nick}
repl = f"\x02\x1FNOTICE: quit nick: {nick} has exited the network"
self._sendSnotice(irc, msg, repl)
if "ACCOUNT" in text and "registered account" in text:
if 'ACCOUNT' in text and 'registered account' in text:
accregex = "^-ACCOUNT- Client \[(.*)\] registered account \[(.*)\] from IP (.*)$"
couple = re.match(accregex, text)
hostmask = couple.group(1)
account = couple.group(2)
ip = couple.group(3)
snote_dict = {
"notice": "accreg",
"hostmask": hostmask,
"account": account,
"ip": ip,
}
DictFromSnotice = {'notice': 'accreg', 'hostmask': hostmask, 'account': account, 'ip': ip}
repl = f"\x02\x1FNOTICE: accreg -> [{account}] was registered by hostmask [{hostmask}] from IP {ip}"
# Trigger HS SET
self._setvhost(irc, msg, account)
self._sendSnotice(irc, msg, repl)
if (
"ACCOUNT" in text
and "registered account" in text
and "SAREGISTER" in text
):
self._sendSnotice(irc, msg, repl)
if 'ACCOUNT' in text and 'registered account' in text and 'SAREGISTER' in text:
accregex = "^-ACCOUNT- Operator \[(.*)\] registered account \[(.*)\] with SAREGISTER$"
couple = re.match(accregex, text)
oper = couple.group(1)
account = couple.group(2)
snote_dict = {"notice": "sareg", "oper": oper, "account": account}
DictFromSnotice = {'notice': 'sareg', 'oper': oper, 'account': account}
repl = f"\x02\x1FNOTICE: sareg -> [{account}] was registered by operator [{oper}]"
self._setvhost(irc, msg, account)
self._sendSnotice(irc, msg, repl)
if "OPER" in text and "Client opered up" in text:
operregex = "^-OPER- Client opered up \[(.*), (.*)\]$"
couple = re.match(operregex, text)
hostmask = couple.group(1)
oper = couple.group(2)
print(couple)
snote_dict = {"notice": "opered", "hostmask": hostmask, "oper": oper}
repl = f"\x02\x1FNOTICE:\x0F [{hostmask}] opered up as [{oper}]."
self._sendSnotice(irc, msg, repl)
if "OPER" in text and "Client deopered" in text:
operregex = "^-OPER- Client deopered \[(.*)\]"
couple = re.match(operregex, text)
account = couple.group(1)
snote_dict = {"notice": "deopered", "name": account}
repl = f"\x02\x1FNOTICE:\x0F [{account}] opered down."
self._sendSnotice(irc, msg, repl)
# Post Registration
def _setvhost(self, irc, msg, account):
arg = ["SET"]
arg = ['SET']
arg.append(account)
vhost = self.registryValue("AutoVhost")
arg.append(f"{vhost}{account}")
irc.sendMsg(msg=ircmsgs.IrcMsg(command="HS", args=arg))
vhost = self.registryValue('AutoVhost')
arg.append(f'{vhost}{account}')
irc.sendMsg(msg=ircmsgs.IrcMsg(command='HS',
args=arg))
# Send formatted SNO to channel
# Send formatted SNO to channel
def _sendSnotice(self, irc, msg, repl):
try:
channel = self.registryValue("targetChannel")
if irc.isChannel(channel):
irc.queueMsg(msg=ircmsgs.IrcMsg(command="NOTICE", args=(channel, repl)))
channel = self.registryValue('targetChannel')
if channel[0] == '#':
irc.queueMsg(msg=ircmsgs.IrcMsg(command='NOTICE',
args=(channel, repl)))
# what sort of exception does one raise
except:
pass
Class = SnoParser

View File

@ -1,4 +0,0 @@
async-timeout==4.0.2
dnspython==2.0.0
ipwhois==1.2.0
redis==4.5.4