Compare commits

..

19 Commits

Author SHA1 Message Date
103ae6d638 Merge pull request 'Improve' (#21) from improve into master
Reviewed-on: LimnoriaPlugins/SnoParser#21

had missed it
2024-12-16 09:14:04 +01:00
2b0207d12b
Reduce data variables
Signed-off-by: Georg Pfuetzenreuter <mail@georg-pfuetzenreuter.net>
2024-10-06 17:22:20 +02:00
58f1e22dde
Correct ipaddress argument
Signed-off-by: Georg Pfuetzenreuter <mail@georg-pfuetzenreuter.net>
2024-10-06 17:22:19 +02:00
3ba723b111
Correct OPER regex
Signed-off-by: Georg Pfuetzenreuter <mail@georg-pfuetzenreuter.net>
2024-10-06 17:22:19 +02:00
4d13d704dc
Clean up Redis logic
Simplify functions and move internal database interfaces into a more
appropriate data structure.
Configure all database numbers using a redis plugin group, instead of
using a custom setting just for the whois database to make the setup
more logical.

Signed-off-by: Georg Pfuetzenreuter <mail@georg-pfuetzenreuter.net>
2024-10-06 17:22:19 +02:00
20797b44cd
Clean up imports
Signed-off-by: Georg Pfuetzenreuter <mail@georg-pfuetzenreuter.net>
2024-10-05 19:48:09 +02:00
ab36cc85e8 Merge pull request 'black formatting' (#20) from black into master
Reviewed-on: LimnoriaPlugins/SnoParser#20
2023-05-03 16:44:48 +02:00
43526c19a6
black formatting
Signed-off-by: Pratyush Desai <pratyush.desai@liberta.casa>
2023-05-03 20:13:41 +05:30
f6c9705fc5 Merge pull request 'regex-cleanup' (#19) from regex-cleanup into master
Reviewed-on: LimnoriaPlugins/SnoParser#19
2023-05-03 13:01:49 +02:00
25ab2849ae regex efficiency wip need to look up SNO list
Signed-off-by: Pratyush Desai <pratyush.desai@liberta.casa>
2021-10-16 13:59:54 +05:30
8cacf16cb9 fix target chan issues
Signed-off-by: Pratyush Desai <pratyush.desai@liberta.casa>
2021-10-16 11:43:34 +05:30
a321b6584d
Disabling channel validation
Signed-off-by: Georg <georg@lysergic.dev>
2021-09-11 12:09:36 +02:00
ca02bba8c3 TargetChannel config + requirements.txt (#17)
Added requirements.txt
Updated configuration value errors

Reviewed-on: LimnoriaPlugins/SnoParser#17
Co-authored-by: Pratyush Desai <pratyush.desai@liberta.casa>
Co-committed-by: Pratyush Desai <pratyush.desai@liberta.casa>
2021-09-07 16:33:00 +02:00
46596e0b33
ipquery IP validation + repaired config integers
Signed-off-by: Georg <georg@lysergic.dev>
2021-08-31 16:29:17 +02:00
14e7d4e74e
OPER and DEOPER notices
Signed-off-by: Georg <georg@lysergic.dev>
2021-08-29 00:13:44 +02:00
9a035edef3
Fixing timedelta
Signed-off-by: Georg <georg@lysergic.dev>
2021-08-27 15:55:27 +02:00
ee7c161067
Adding branch info to README.rst
Signed-off-by: Georg <georg@lysergic.dev>
2021-08-27 15:05:25 +02:00
3ada63a085
Removing __pycache__
Signed-off-by: Georg <georg@lysergic.dev>
2021-08-27 14:53:38 +02:00
6b6a35cd94
Whois + Caching + Nickname/IP address counting
Signed-off-by: Georg <georg@lysergic.dev>
2021-08-27 14:25:42 +02:00
8 changed files with 377 additions and 246 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
env/ env/
.vscode .vscode
__pycache__/

View File

@ -1,8 +1,11 @@
SnoParser SnoParser + Redis
========= =================
Parses the Server Notices from ErgoIRCd. Parses the Server Notices from ErgoIRCd.
This branch contains Redis functionality:
Performs and caches WHOIS lookups for connecting IP addresses. Counts nicknames and IP addresses.
Assumptions Assumptions
----------- -----------

Binary file not shown.

Binary file not shown.

Binary file not shown.

244
config.py
View File

@ -28,15 +28,24 @@
### ###
from supybot import conf, registry from supybot import conf, registry, ircutils
try: try:
from supybot.i18n import PluginInternationalization from supybot.i18n import PluginInternationalization
_ = PluginInternationalization('SnoParser')
_ = PluginInternationalization("SnoParser")
except: except:
# Placeholder that allows to run the plugin on a bot # Placeholder that allows to run the plugin on a bot
# without the i18n module # without the i18n module
_ = lambda x: x _ = lambda x: x
# class ValidChannel(registry.string):
# """Value must be a valid channel"""
# def setValue(self, v):
# if not (ircutils.isChannel(v)):
# self.error()
# registry.String.setValue(self, v)
def configure(advanced): def configure(advanced):
# This will be called by supybot to configure this module. advanced is # This will be called by supybot to configure this module. advanced is
@ -44,115 +53,182 @@ def configure(advanced):
# user or not. You should effect your configuration by manipulating the # user or not. You should effect your configuration by manipulating the
# registry as appropriate. # registry as appropriate.
from supybot.questions import expect, anything, something, yn from supybot.questions import expect, anything, something, yn
conf.registerPlugin('SnoParser', True)
conf.registerPlugin("SnoParser", True)
SnoParser = conf.registerPlugin('SnoParser') SnoParser = conf.registerPlugin("SnoParser")
# This is where your configuration variables (if any) should go. For example: # This is where your configuration variables (if any) should go. For example:
# conf.registerGlobalValue(SnoParser, 'someConfigVariableName', # conf.registerGlobalValue(SnoParser, 'someConfigVariableName',
# registry.Boolean(False, _("""Help for someConfigVariableName."""))) # registry.Boolean(False, _("""Help for someConfigVariableName.""")))
conf.registerGlobalValue(SnoParser, 'targetChannel', # conf.registerNetworkValue(SnoParser, 'targetChannel', ValidChannel,
registry.String(None, ("""Sends reformatted snolines to the <channel>"""))) # ("", ("""Determines which channel the bot should send snolines""")))
conf.registerGlobalValue(SnoParser, 'AutoVhost', conf.registerNetworkValue(
registry.String('libcasa/user/', ("""Configure the vhost eg. libcasa/user/$account"""))) SnoParser,
"targetChannel",
registry.String(
"",
("""Determines which channel the bot should snolines example: `#snotices`"""),
),
)
conf.registerGlobalValue(SnoParser, 'preventHighlight', conf.registerGlobalValue(
registry.Boolean(True, ("""Toggles in channel highlights with ZWSP"""))) SnoParser,
"AutoVhost",
registry.String(
"libcasa/user/", ("""Configure the vhost eg. libcasa/user/$account""")
),
)
conf.registerGlobalValue(SnoParser, 'debug', conf.registerGlobalValue(
registry.Boolean('false', SnoParser,
""" "preventHighlight",
registry.Boolean(True, ("""Toggles in channel highlights with ZWSP""")),
)
conf.registerGlobalValue(
SnoParser,
"debug",
registry.Boolean(
"false",
"""
SnoParser: Verbose output. Note: there is a seperate debug option for the `whois` client. SnoParser: Verbose output. Note: there is a seperate debug option for the `whois` client.
""" """,
, private=True private=True,
)) ),
)
### ###
# REDIS related settings below: # REDIS related settings below:
### ###
conf.registerGroup(SnoParser, 'redis') conf.registerGroup(SnoParser, "redis")
conf.registerGlobalValue(SnoParser.redis, 'db1', conf.registerGroup(SnoParser.redis, "db")
registry.Integer('1', conf.registerGlobalValue(
""" SnoParser.redis.db,
Redis: Database number for counting of NICKNAMES. "ips",
""" registry.Integer(
, private=True 2,
)) """
conf.registerGlobalValue(SnoParser.redis, 'db2',
registry.Integer('2',
"""
Redis: Database number for counting of IP ADDRESSES. Redis: Database number for counting of IP ADDRESSES.
""" """,
, private=True private=True,
)) ),
conf.registerGlobalValue(SnoParser.redis, 'host', )
registry.String('127.0.0.1', conf.registerGlobalValue(
""" SnoParser.redis.db,
"nicks",
registry.Integer(
1,
"""
Redis: Database number for counting of NICKNAMES.
""",
private=True,
),
)
conf.registerGlobalValue(
SnoParser.redis.db,
"whois",
registry.Integer(
0,
"""
Redis: Database number for WHOIS query caching.
""",
),
)
conf.registerGlobalValue(
SnoParser.redis,
"host",
registry.String(
"localhost",
"""
Redis: IP address or hostname. Redis: IP address or hostname.
""" """,
, private=True private=True,
)) ),
conf.registerGlobalValue(SnoParser.redis, 'port', )
registry.Integer('6379', conf.registerGlobalValue(
""" SnoParser.redis,
"port",
registry.Integer(
6379,
"""
Redis: Port. Redis: Port.
""" """,
, private=True private=True,
)) ),
conf.registerGlobalValue(SnoParser.redis, 'username', )
registry.String('', conf.registerGlobalValue(
""" SnoParser.redis,
"username",
registry.String(
"",
"""
Redis: Username. This is optional and has not been tested. It is recommended to perform initial tests on a local instance with no username and password. Redis: Username. This is optional and has not been tested. It is recommended to perform initial tests on a local instance with no username and password.
""" """,
, private=True private=True,
)) ),
conf.registerGlobalValue(SnoParser.redis, 'password', )
registry.String('', conf.registerGlobalValue(
""" SnoParser.redis,
"password",
registry.String(
"",
"""
Redis: Password. This is optional and has not been tested. It is recommended to perform initial tests on a local instance with no username and password. Redis: Password. This is optional and has not been tested. It is recommended to perform initial tests on a local instance with no username and password.
""" """,
)) ),
conf.registerGlobalValue(SnoParser.redis, 'timeout', )
registry.Integer('5', conf.registerGlobalValue(
""" SnoParser.redis,
"timeout",
registry.Integer(
5,
"""
Redis: Socket Timeout. The developer does not know what to recommend here, but `5` seems to yield good results. Redis: Socket Timeout. The developer does not know what to recommend here, but `5` seems to yield good results.
""" """,
)) ),
)
### ###
## WHOIS related settings below: ## WHOIS related settings below:
### ###
conf.registerGroup(SnoParser, 'whois') conf.registerGroup(SnoParser, "whois")
conf.registerGlobalValue(SnoParser.whois, 'debug', conf.registerGlobalValue(
registry.Boolean('false', SnoParser.whois,
""" "debug",
registry.Boolean(
"false",
"""
SnoParser: True: Very verbose console output. False: Mostly silent operation. SnoParser: True: Very verbose console output. False: Mostly silent operation.
""" """,
, private=True private=True,
)) ),
conf.registerGlobalValue(SnoParser.whois, 'sample', )
registry.String('', conf.registerGlobalValue(
""" SnoParser.whois,
"sample",
registry.String(
"",
"""
SnoParser: This allows to set a testing IP address, if the plugin shall be evaluated on i.e. a local network. This will override all IP addresses from SNOTICES! SnoParser: This allows to set a testing IP address, if the plugin shall be evaluated on i.e. a local network. This will override all IP addresses from SNOTICES!
""" """,
, private=True private=True,
)) ),
conf.registerGlobalValue(SnoParser.whois, 'ttl', )
registry.Integer('3600', conf.registerGlobalValue(
""" SnoParser.whois,
"ttl",
registry.Integer(
3600,
"""
SnoParser: How long to cache WHOIS entries for. SnoParser: How long to cache WHOIS entries for.
""" """,
, private=True private=True,
)) ),
conf.registerGroup(SnoParser.whois, 'redis') )
conf.registerGlobalValue(SnoParser.whois.redis, 'db',
registry.Integer('0',
"""
Redis: Database number for WHOIS query caching.
"""
))
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79: # vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79:

351
plugin.py
View File

@ -28,143 +28,116 @@
### ###
from supybot import utils, plugins, ircutils, callbacks, ircdb, conf, log, world, ircmsgs from supybot import (
ircutils,
callbacks,
ircmsgs,
)
from supybot.commands import * from supybot.commands import *
try: try:
from supybot.i18n import PluginInternationalization from supybot.i18n import PluginInternationalization
_ = PluginInternationalization('SnoParser')
_ = PluginInternationalization("SnoParser")
except ImportError: except ImportError:
# Placeholder that allows to run the plugin on a bot # Placeholder that allows to run the plugin on a bot
# without the i18n module # without the i18n module
_ = lambda x: x _ = lambda x: x
import re import re
import os
import sys
import time
import sqlite3
import redis import redis
import json import json
from datetime import timedelta from datetime import timedelta
from ipwhois import IPWhois from ipwhois import IPWhois
import ipwhois from ipwhois.exceptions import IPDefinedError
class SnoParser(callbacks.Plugin): class SnoParser(callbacks.Plugin):
"""Parses the Server Notices from ErgoIRCd""" """Parses the Server Notices from ErgoIRCd"""
threaded = True threaded = True
def redis_connect_whois(self) -> redis.client.Redis: def _redis_connect(self, db) -> redis.client.Redis:
try: try:
redis_client_whois = redis.Redis( redis_client = redis.Redis(
host = self.registryValue('redis.host'), host=self.registryValue("redis.host"),
port = self.registryValue('redis.port'), port=self.registryValue("redis.port"),
password = self.registryValue('redis.password'), password=self.registryValue("redis.password"),
username = self.registryValue('redis.username'), username=self.registryValue("redis.username"),
db = self.registryValue('whois.redis.db'), db=self.registryValue(f"redis.db.{db}"),
socket_timeout = int(self.registryValue('redis.timeout')) socket_timeout=int(self.registryValue("redis.timeout")),
) )
ping = redis_client_whois.ping() ping = redis_client.ping()
if ping is True: if ping is True:
return redis_client_whois return redis_client
except redis.AuthenticationError: except redis.AuthenticationError:
print("Could not authenticate to Redis backend.") print("Could not authenticate to Redis backend.")
def redis_connect_nicks(self) -> redis.client.Redis:
try:
redis_client_nicks = redis.Redis(
host = self.registryValue('redis.host'),
port = self.registryValue('redis.port'),
password = self.registryValue('redis.password'),
username = self.registryValue('redis.username'),
db = self.registryValue('redis.db1'),
socket_timeout = int(self.registryValue('redis.timeout'))
)
ping = redis_client_nicks.ping()
if ping is True:
return redis_client_nicks
except redis.AuthenticationError:
print("Could not authenticate to Redis backend.")
def redis_connect_ips(self) -> redis.client.Redis:
try:
redis_client_ips = redis.Redis(
host = self.registryValue('redis.host'),
port = self.registryValue('redis.port'),
password = self.registryValue('redis.password'),
username = self.registryValue('redis.username'),
db = self.registryValue('redis.db2'),
socket_timeout = int(self.registryValue('redis.timeout'))
)
ping = redis_client_ips.ping()
if ping is True:
return redis_client_ips
except redis.AuthenticationError:
print("Could not authenticate to Redis backend.")
def __init__(self, irc): def __init__(self, irc):
super().__init__(irc) super().__init__(irc)
self.redis_client_whois = self.redis_connect_whois() self.redis_clients = {
self.redis_client_nicks = self.redis_connect_nicks() 'ips': self._redis_connect("ips"),
self.redis_client_ips = self.redis_connect_ips() 'nicks': self._redis_connect("nicks"),
'whois': self._redis_connect("whois"),
}
def _get_from_cache(self, db, key):
""" Get value from Redis cache """
return self.redis_clients[db].get(key)
def whois_fresh(self, sourceip: str) -> dict: def whois_fresh(self, sourceip: str) -> dict:
"""Data from cache.""" """Data from WHOIS backend (IANA or respective RIR)."""
asn = 0 asn = 0
subnet = '' subnet = ""
try: try:
whois = IPWhois(sourceip) whois = IPWhois(sourceip)
whoisres = whois.lookup_rdap(depth=1,retry_count=0) whoisres = whois.lookup_rdap(depth=1, retry_count=0)
results = whoisres results = whoisres
if self.registryValue('whois.debug'): if self.registryValue("whois.debug"):
print(results) print(results)
asn = whoisres['asn_registry'] asn = whoisres["asn_registry"]
country = whoisres['asn_country_code'] country = whoisres["asn_country_code"]
description = whoisres['asn_description'] description = whoisres["asn_description"]
whoisout = asn + ' ' + country + ' ' + description whoisout = asn + " " + country + " " + description
except ipwhois.exceptions.IPDefinedError: except IPDefinedError:
whoisout = 'RFC 4291 (Local)' whoisout = "RFC 4291 (Local)"
response = whoisout response = whoisout
return response return response
def whois_get_cache(self, key: str) -> str:
"""Data from Redis."""
k = self.redis_client_whois.get(key)
# self = SnoParser()
# val = self.redis_client_whois.get(key)
val = k
return val
def whois_set_cache(self, key: str, value: str) -> bool: def whois_set_cache(self, key: str, value: str) -> bool:
"""Data to Redis.""" """Data to Redis."""
duration = self.registryValue('whois.ttl') return self.redis_clients['whois'].setex(
state = self.redis_client_whois.setex(key, timedelta(seconds=duration), value=value,) key,
return state timedelta(seconds=int(self.registryValue("whois.ttl"))),
value=value,
)
def whois_run(self, sourceip: str) -> dict: def whois_run(self, sourceip: str) -> dict:
"""Whois query router.""" """Whois query router."""
data = self.whois_get_cache(key=sourceip) data = self._get_from_cache("whois", sourceip)
if data is not None: if data is not None:
data = json.loads(data) data = json.loads(data)
if self.registryValue('whois.debug'): if self.registryValue("whois.debug"):
print("SNOPARSER DEBUG - WHOIS_RUN WITH CACHE: TRUE") print("SNOPARSER DEBUG - WHOIS_RUN WITH CACHE: TRUE")
print(data) print(data)
print(sourceip) print(sourceip)
return data return data
else: else:
data = self.whois_fresh(sourceip) data = self.whois_fresh(sourceip)
if self.registryValue('whois.debug'): if self.registryValue("whois.debug"):
print("SNOPARSER DEBUG - WHOIS_RUN WITH CACHE: FALSE") print("SNOPARSER DEBUG - WHOIS_RUN WITH CACHE: FALSE")
print(data) print(data)
print(sourceip) print(sourceip)
if data.startswith: if data.startswith:
if self.registryValue('whois.debug'): if self.registryValue("whois.debug"):
print("SNOPARSER DEBUG - WHOIS_RUN WITH CACHE: FALSE AND CORRECT STARTING CHARACTER") print(
"SNOPARSER DEBUG - WHOIS_RUN WITH CACHE: FALSE AND CORRECT STARTING CHARACTER"
)
print(data) print(data)
data = json.dumps(data) data = json.dumps(data)
state = self.whois_set_cache(key=sourceip, value=data) state = self.whois_set_cache(key=sourceip, value=data)
@ -172,35 +145,37 @@ class SnoParser(callbacks.Plugin):
if state is True: if state is True:
return json.loads(data) return json.loads(data)
else: else:
if self.registryValue('whois.debug'): if self.registryValue("whois.debug"):
print("SNOPARSER DEBUG _ WHOIS_RUN WITH CACHE: FALSE AND WRONG STARTING CHARACTER") print(
"SNOPARSER DEBUG _ WHOIS_RUN WITH CACHE: FALSE AND WRONG STARTING CHARACTER"
)
print(data) print(data)
return data return data
def nick_run(self, nickname: str) -> dict: def nick_run(self, nickname: str) -> dict:
"""Tracks nicknames""" """Tracks nicknames"""
data = self.redis_client_nicks.get(nickname) data = self._get_from_cache("nicks", nickname)
if data is not None: if data is not None:
if self.registryValue('debug'): if self.registryValue("debug"):
print("SNOPARSER DEBUG - NICK_RUN, SEEN: TRUE") print("SNOPARSER DEBUG - NICK_RUN, SEEN: TRUE")
print(nickname) print(nickname)
print(data) print(data)
self.redis_client_nicks.incrby(nickname,amount=1) self.redis_clients['nicks'].incrby(nickname, amount=1)
if data: if data:
decoded = data.decode('utf-8') decoded = data.decode("utf-8")
return decoded return decoded
else: else:
return 0 return 0
else: else:
if self.registryValue('debug'): if self.registryValue("debug"):
print("SNOPARSER DEBUG _ NICK_RUN, SEEN: FALSE") print("SNOPARSER DEBUG _ NICK_RUN, SEEN: FALSE")
print(nickname) print(nickname)
print(data) print(data)
self.redis_client_nicks.set(nickname,value='1') self.redis_clients['nicks'].set(nickname, value="1")
if data: if data:
decoded = data.decode('utf-8') decoded = data.decode("utf-8")
return decoded return decoded
else: else:
return 0 return 0
@ -208,27 +183,26 @@ class SnoParser(callbacks.Plugin):
def ip_run(self, ipaddress: str) -> dict: def ip_run(self, ipaddress: str) -> dict:
"""Tracks IP addresses""" """Tracks IP addresses"""
data = self.redis_client_ips.get(ipaddress) data = self._get_from_cache("ips", ipaddress)
if data is not None: if data is not None:
if self.registryValue('debug'): if self.registryValue("debug"):
print("SNOPARSER DEBUG - IP_RUN, SEEN: TRUE") print("SNOPARSER DEBUG - IP_RUN, SEEN: TRUE")
print(ipaddress) print(ipaddress)
print(data) print(data)
self.redis_client_ips.incrby(ipaddress,amount=1) self.redis_clients['ips'].incrby(ipaddress, amount=1)
if data: if data:
decoded = data.decode('utf-8') decoded = data.decode("utf-8")
return decoded return decoded
else: else:
return 0 return 0
else: else:
if self.registryValue('debug'): if self.registryValue("debug"):
print("SNOPARSER DEBUG _ IP_RUN, SEEN: FALSE") print("SNOPARSER DEBUG _ IP_RUN, SEEN: FALSE")
print(ipaddress) print(ipaddress)
print(data) print(data)
self.redis_client_ips.set(ipaddress,value='1') self.redis_clients['ips'].set(ipaddress, value="1")
if data: if data:
decoded = data.decode('utf-8') decoded = data.decode("utf-8")
return decoded return decoded
else: else:
return 0 return 0
@ -238,32 +212,36 @@ class SnoParser(callbacks.Plugin):
Queries the cache for an address. Queries the cache for an address.
""" """
data = self.whois_get_cache(key=ipaddress) data = self.redis_clients['whois'].get(ipaddress)
decoded_data = data.decode('utf-8') if data is not None:
ttl = self.redis_client_whois.ttl(ipaddress) data = data.decode("utf-8")
count = self.redis_client_ips.get(ipaddress) ttl = self.redis_clients['whois'].ttl(ipaddress)
decoded_count = count.decode('utf-8') count = self.redis_clients['ips'].get(ipaddress)
if count is not None:
print('SnoParser manual query: ', data, ' ', ttl) count = count.decode("utf-8")
irc.reply(f'{decoded_data} - Count: {decoded_count} - Remaining: {ttl}s') print("SnoParser manual query: ", data, " ", ttl)
irc.reply(f"{data} - Count: {count} - Remaining: {ttl}s")
ipquery = wrap(ipquery, ['anything'])
ipquery = wrap(ipquery, ["ip"])
def doNotice(self, irc, msg): def doNotice(self, irc, msg):
(target, text) = msg.args (target, text) = msg.args
if target == irc.nick: if target == irc.nick:
# server notices CONNECT, KILL, XLINE, NICK, ACCOUNT # server notices CONNECT, KILL, XLINE, NICK, ACCOUNT, OPER, QUIT,
text = ircutils.stripFormatting(text) text = ircutils.stripFormatting(text)
if 'CONNECT' in text: # if 'CONNECT' in text:
connregex = "^-CONNECT- Client connected \[(.+)\] \[u\:~(.+)\] \[h\:(.+)\] \[ip\:(.+)\] \[r\:(.+)\]$" RE_CLICONN = re.compile(
couple = re.match(connregex, text) r"^-CONNECT- Client connected \[(.+)\] \[u\:~(.+)\] \[h\:(.+)\] \[ip\:(.+)\] \[r\:(.+)\]$"
)
couple = RE_CLICONN.match(text)
# check `if couple:` ie was there a match even? if yes proceed
if couple:
nickname = couple.group(1) nickname = couple.group(1)
username = couple.group(2) username = couple.group(2)
host = couple.group(3) host = couple.group(3)
if self.registryValue('whois.sample'): if self.registryValue("whois.sample"):
ip = self.registryValue('whois.sample') ip = self.registryValue("whois.sample")
else: else:
ip = couple.group(4) ip = couple.group(4)
realname = couple.group(5) realname = couple.group(5)
@ -272,43 +250,74 @@ class SnoParser(callbacks.Plugin):
nick_seen = self.nick_run(nickname=nickname) nick_seen = self.nick_run(nickname=nickname)
whois = self.whois_run(sourceip=ip) whois = self.whois_run(sourceip=ip)
DictFromSnotice = {'notice': 'connect', 'nickname': nickname, 'username': username, 'host': host, 'ip': ip, 'realname': realname, 'ipCount': ip_seen, 'nickCount': nick_seen} snote_dict = {
#repl = f"\x02\x1FNOTICE: {DictFromSnotice['notice']} \x0F\x11\x0303==>>\x0F \x02Nick:\x0F {DictFromSnotice['nickname']} \x02Username:\x0F {DictFromSnotice['username']} \x02Hostname:\x0F {DictFromSnotice['host']} \x02IP:\x0F {DictFromSnotice['ip']} \x02Realname:\x0F {DictFromSnotice['realname']} \x02IPcount:\x0F {DictFromSnotice['ipCount']} \x02NickCount:\x0F {DictFromSnotice['nickCount']}" "notice": "connect",
repl = f"\x02\x1F{DictFromSnotice['notice']} \x0F\x11\x0303==>>\x0F \x02Nick:\x0F {DictFromSnotice['nickname']} \x02Username:\x0F {DictFromSnotice['username']} \x02Hostname:\x0F {DictFromSnotice['host']} \x02IP:\x0F {DictFromSnotice['ip']} {whois} \x02Realname:\x0F {DictFromSnotice['realname']} \x02IPcount:\x0F {DictFromSnotice['ipCount']} \x02NickCount:\x0F {DictFromSnotice['nickCount']}" "nickname": nickname,
"username": username,
"host": host,
"ip": ip,
"realname": realname,
"ipCount": ip_seen,
"nickCount": nick_seen,
}
repl = f"\x02\x1F{snote_dict['notice']} \x0F\x11\x0303==>>\x0F \x02Nick:\x0F {snote_dict['nickname']} \x02Username:\x0F {snote_dict['username']} \x02Hostname:\x0F {snote_dict['host']} \x02IP:\x0F {snote_dict['ip']} {whois} \x02Realname:\x0F {snote_dict['realname']} \x02IPcount:\x0F {snote_dict['ipCount']} \x02NickCount:\x0F {snote_dict['nickCount']}"
self._sendSnotice(irc, msg, repl) self._sendSnotice(irc, msg, repl)
if 'XLINE' in text and 'temporary' in text: # if 'XLINE' in text and 'temporary' in text:
xlineregex = "^-XLINE- (.+) \[(.+)\] added temporary \((.+)\) (K-Line|D-Line) for (.+)$" RE_XLINE = re.compile(
couple = re.match(xlineregex, text) r"^-XLINE- (.+) \[(.+)\] added temporary \((.+)\) (K-Line|D-Line) for (.+)$"
)
couple = RE_XLINE.match(text)
if couple:
who = couple.group(1) who = couple.group(1)
who_operator = couple.group(2) who_operator = couple.group(2)
duration = couple.group(3) duration = couple.group(3)
which_line = couple.group(4) which_line = couple.group(4)
host_or_ip = couple.group(5) host_or_ip = couple.group(5)
DictFromSnotice = {'notice': 'tempban', 'who': who, 'operator': who_operator, 'duration': duration, 'type': which_line, 'target': host_or_ip} snote_dict = {
repl = f"\x02\x1FNOTICE: {DictFromSnotice['notice']}\x0F \x11\x0303 X_X \x0F \x02BannedBy:\x0F {DictFromSnotice['who']} \x02BannedByOper:\x0F {DictFromSnotice['operator']} \x02Duration:\x0F {DictFromSnotice['duration']} \x02XLINE Type:\x0F {DictFromSnotice['type']} \x02Nick:\x0F {DictFromSnotice['target']}" "notice": "tempban",
"who": who,
"operator": who_operator,
"duration": duration,
"type": which_line,
"target": host_or_ip,
}
repl = f"\x02\x1FNOTICE: {snote_dict['notice']}\x0F \x11\x0303 X_X \x0F \x02BannedBy:\x0F {snote_dict['who']} \x02BannedByOper:\x0F {snote_dict['operator']} \x02Duration:\x0F {snote_dict['duration']} \x02XLINE Type:\x0F {snote_dict['type']} \x02Nick:\x0F {snote_dict['target']}"
self._sendSnotice(irc, msg, repl) self._sendSnotice(irc, msg, repl)
# WHY THE FUCK IS IT elif ?? # WHY THE FUCK IS IT elif ??
elif 'XLINE' in text and 'temporary' not in text and 'removed' not in text: elif "XLINE" in text and "temporary" not in text and "removed" not in text:
perm_xline_regex = "^-XLINE- (.+) \[(.+)\] added (D-Line|K-Line) for (.+)$" perm_xline_regex = (
"^-XLINE- (.+) \[(.+)\] added (D-Line|K-Line) for (.+)$"
)
couple = re.match(perm_xline_regex, text) couple = re.match(perm_xline_regex, text)
who = couple.group(1) who = couple.group(1)
who_operator = couple.group(2) who_operator = couple.group(2)
which_line = couple.group(3) which_line = couple.group(3)
host_or_ip = couple.group(4) host_or_ip = couple.group(4)
DictFromSnotice = {'notice': 'Permaban', 'who': who, 'operator': who_operator, 'type': which_line, 'target': host_or_ip} snote_dict = {
repl = f"\x02\x1FNOTICE: {DictFromSnotice['notice']} \x0F \x11\x0303 X_X \x0F \x02BannedBy:\x0F {DictFromSnotice['who']} \x02BannedByOper:\x0F {DictFromSnotice['operator']} \x02XLINE Type:\x0F {DictFromSnotice['type']} \x02Host/IP:\x0F {DictFromSnotice['target']}" "notice": "Permaban",
"who": who,
"operator": who_operator,
"type": which_line,
"target": host_or_ip,
}
repl = f"\x02\x1FNOTICE: {snote_dict['notice']} \x0F \x11\x0303 X_X \x0F \x02BannedBy:\x0F {snote_dict['who']} \x02BannedByOper:\x0F {snote_dict['operator']} \x02XLINE Type:\x0F {snote_dict['type']} \x02Host/IP:\x0F {snote_dict['target']}"
self._sendSnotice(irc, msg, repl) self._sendSnotice(irc, msg, repl)
elif 'XLINE' in text and 'removed' in text: elif "XLINE" in text and "removed" in text:
unxlineregex = "^-XLINE- (.+) removed (D-Line|K-Line) for (.+)$" unxlineregex = "^-XLINE- (.+) removed (D-Line|K-Line) for (.+)$"
couple = re.match(unxlineregex, text) couple = re.match(unxlineregex, text)
who = couple.group(1) who = couple.group(1)
which_line = couple.group(2) which_line = couple.group(2)
host_or_ip = couple.group(3) host_or_ip = couple.group(3)
DictFromSnotice = {'notice': 'unxline', 'who': who, 'type': which_line, 'target': host_or_ip} snote_dict = {
repl = f"\x02\x1FNOTICE: {DictFromSnotice['notice']} \x0F\x11\x0303 :=D\x0F \x02UnbannedBy:\x0F {DictFromSnotice['who']} \x02XLINE type:\x0F {DictFromSnotice['type']} \x02Host/IP:\x0F {DictFromSnotice['target']}" "notice": "unxline",
"who": who,
"type": which_line,
"target": host_or_ip,
}
repl = f"\x02\x1FNOTICE: {snote_dict['notice']} \x0F\x11\x0303 :=D\x0F \x02UnbannedBy:\x0F {snote_dict['who']} \x02XLINE type:\x0F {snote_dict['type']} \x02Host/IP:\x0F {snote_dict['target']}"
self._sendSnotice(irc, msg, repl) self._sendSnotice(irc, msg, repl)
if 'KILL' in text: if "KILL" in text:
killregex = "^-KILL- (.+) \[(.+)\] killed (\d) clients with a (KLINE|DLINE) \[(.+)\]$" killregex = "^-KILL- (.+) \[(.+)\] killed (\d) clients with a (KLINE|DLINE) \[(.+)\]$"
couple = re.match(killregex, text) couple = re.match(killregex, text)
who = couple.group(1) who = couple.group(1)
@ -316,68 +325,106 @@ class SnoParser(callbacks.Plugin):
clients = couple.group(3) clients = couple.group(3)
which_line = couple.group(4) which_line = couple.group(4)
nick = couple.group(5) nick = couple.group(5)
DictFromSnotice = {'notice': 'kill', 'who': who, 'operator': who_operator, "client": clients, 'type': which_line, 'nick': nick} snote_dict = {
repl = f"\x02\x1FNOTICE: {DictFromSnotice['notice']} \x0F\x11\x0303☠\x0F \x02KilledBy:\x0F {DictFromSnotice['who']} \x02KilledByOper:\x0F {DictFromSnotice['operator']} \x02NumofClientsAffected:\x0F {DictFromSnotice['client']} \x02XLINE Type:\x0F {DictFromSnotice['type']} \x02Nick:\x0F {DictFromSnotice['nick']}" "notice": "kill",
"who": who,
"operator": who_operator,
"client": clients,
"type": which_line,
"nick": nick,
}
repl = f"\x02\x1FNOTICE: {snote_dict['notice']} \x0F\x11\x0303☠\x0F \x02KilledBy:\x0F {snote_dict['who']} \x02KilledByOper:\x0F {snote_dict['operator']} \x02NumofClientsAffected:\x0F {snote_dict['client']} \x02XLINE Type:\x0F {snote_dict['type']} \x02Nick:\x0F {snote_dict['nick']}"
self._sendSnotice(irc, msg, repl) self._sendSnotice(irc, msg, repl)
if 'NICK' in text and 'changed nickname to' in text: if "NICK" in text and "changed nickname to" in text:
nickregex = "^-NICK- (.+) changed nickname to (.+)$" nickregex = "^-NICK- (.+) changed nickname to (.+)$"
couple = re.match(nickregex, text) couple = re.match(nickregex, text)
old_nick = couple.group(1) old_nick = couple.group(1)
new_nick = couple.group(2) new_nick = couple.group(2)
DictFromSnotice = {'notice': 'nick change', 'old_nick': old_nick, 'new_nick': new_nick} snote_dict = {
repl = f"\x02\x1FNOTICE: {DictFromSnotice['notice']} ==> {DictFromSnotice['old_nick']} changed their nick to {DictFromSnotice['new_nick']}" "notice": "nick change",
"old_nick": old_nick,
"new_nick": new_nick,
}
repl = f"\x02\x1FNOTICE: {snote_dict['notice']} ==> {snote_dict['old_nick']} changed their nick to {snote_dict['new_nick']}"
self._sendSnotice(irc, msg, repl) self._sendSnotice(irc, msg, repl)
if 'QUIT' in text and 'exited' in text: if "QUIT" in text and "exited" in text:
quitregex = "^-QUIT- (.+) exited the network$" quitregex = "^-QUIT- (.+) exited the network$"
couple = re.match(quitregex, text) couple = re.match(quitregex, text)
nick = couple.group(1) nick = couple.group(1)
DictFromSnotice = {'notice': 'quit', 'nick': nick} snote_dict = {"notice": "quit", "nick": nick}
repl = f"\x02\x1FNOTICE: quit nick: {nick} has exited the network" repl = f"\x02\x1FNOTICE: quit nick: {nick} has exited the network"
self._sendSnotice(irc, msg, repl) self._sendSnotice(irc, msg, repl)
if 'ACCOUNT' in text and 'registered account' in text: if "ACCOUNT" in text and "registered account" in text:
accregex = "^-ACCOUNT- Client \[(.*)\] registered account \[(.*)\] from IP (.*)$" accregex = "^-ACCOUNT- Client \[(.*)\] registered account \[(.*)\] from IP (.*)$"
couple = re.match(accregex, text) couple = re.match(accregex, text)
hostmask = couple.group(1) hostmask = couple.group(1)
account = couple.group(2) account = couple.group(2)
ip = couple.group(3) ip = couple.group(3)
DictFromSnotice = {'notice': 'accreg', 'hostmask': hostmask, 'account': account, 'ip': ip} snote_dict = {
"notice": "accreg",
"hostmask": hostmask,
"account": account,
"ip": ip,
}
repl = f"\x02\x1FNOTICE: accreg -> [{account}] was registered by hostmask [{hostmask}] from IP {ip}" repl = f"\x02\x1FNOTICE: accreg -> [{account}] was registered by hostmask [{hostmask}] from IP {ip}"
# Trigger HS SET # Trigger HS SET
self._setvhost(irc, msg, account) self._setvhost(irc, msg, account)
self._sendSnotice(irc, msg, repl) self._sendSnotice(irc, msg, repl)
if 'ACCOUNT' in text and 'registered account' in text and 'SAREGISTER' in text:
if (
"ACCOUNT" in text
and "registered account" in text
and "SAREGISTER" in text
):
accregex = "^-ACCOUNT- Operator \[(.*)\] registered account \[(.*)\] with SAREGISTER$" accregex = "^-ACCOUNT- Operator \[(.*)\] registered account \[(.*)\] with SAREGISTER$"
couple = re.match(accregex, text) couple = re.match(accregex, text)
oper = couple.group(1) oper = couple.group(1)
account = couple.group(2) account = couple.group(2)
DictFromSnotice = {'notice': 'sareg', 'oper': oper, 'account': account} snote_dict = {"notice": "sareg", "oper": oper, "account": account}
repl = f"\x02\x1FNOTICE: sareg -> [{account}] was registered by operator [{oper}]" repl = f"\x02\x1FNOTICE: sareg -> [{account}] was registered by operator [{oper}]"
self._setvhost(irc, msg, account) self._setvhost(irc, msg, account)
self._sendSnotice(irc, msg, repl) self._sendSnotice(irc, msg, repl)
if "OPER" in text and "Client opered up" in text:
operregex = "^-OPER- Client opered up \[(.*), (.*)\]$"
couple = re.match(operregex, text)
hostmask = couple.group(1)
oper = couple.group(2)
print(couple)
snote_dict = {"notice": "opered", "hostmask": hostmask, "oper": oper}
repl = f"\x02\x1FNOTICE:\x0F [{hostmask}] opered up as [{oper}]."
self._sendSnotice(irc, msg, repl)
if "OPER" in text and "Client deopered" in text:
operregex = "^-OPER- Client deopered \[(.*)\]"
couple = re.match(operregex, text)
account = couple.group(1)
snote_dict = {"notice": "deopered", "name": account}
repl = f"\x02\x1FNOTICE:\x0F [{account}] opered down."
self._sendSnotice(irc, msg, repl)
# Post Registration # Post Registration
def _setvhost(self, irc, msg, account): def _setvhost(self, irc, msg, account):
arg = ['SET'] arg = ["SET"]
arg.append(account) arg.append(account)
vhost = self.registryValue('AutoVhost') vhost = self.registryValue("AutoVhost")
arg.append(f'{vhost}{account}') arg.append(f"{vhost}{account}")
irc.sendMsg(msg=ircmsgs.IrcMsg(command='HS', irc.sendMsg(msg=ircmsgs.IrcMsg(command="HS", args=arg))
args=arg))
# Send formatted SNO to channel # Send formatted SNO to channel
def _sendSnotice(self, irc, msg, repl): def _sendSnotice(self, irc, msg, repl):
try: try:
channel = self.registryValue('targetChannel') channel = self.registryValue("targetChannel")
if channel[0] == '#': if irc.isChannel(channel):
irc.queueMsg(msg=ircmsgs.IrcMsg(command='NOTICE', irc.queueMsg(msg=ircmsgs.IrcMsg(command="NOTICE", args=(channel, repl)))
args=(channel, repl)))
# what sort of exception does one raise # what sort of exception does one raise
except: except:
pass pass

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
async-timeout==4.0.2
dnspython==2.0.0
ipwhois==1.2.0
redis==4.5.4