444 lines
16 KiB
Python
444 lines
16 KiB
Python
###
|
|
# Copyright (c) 2021, mogad0n
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are met:
|
|
#
|
|
# * Redistributions of source code must retain the above copyright notice,
|
|
# this list of conditions, and the following disclaimer.
|
|
# * Redistributions in binary form must reproduce the above copyright notice,
|
|
# this list of conditions, and the following disclaimer in the
|
|
# documentation and/or other materials provided with the distribution.
|
|
# * Neither the name of the author of this software nor the name of
|
|
# contributors to this software may be used to endorse or promote products
|
|
# derived from this software without specific prior written consent.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
###
|
|
|
|
# My Imports
|
|
|
|
|
|
|
|
from supybot import utils, plugins, ircutils, callbacks, irclib, ircmsgs,
|
|
from supybot import conf, world, log, ircdb, registry, schedule
|
|
from supybot.commands import *
|
|
try:
|
|
from supybot.i18n import PluginInternationalization
|
|
_ = PluginInternationalization('EgoServ')
|
|
except ImportError:
|
|
# Placeholder that allows to run the plugin on a bot
|
|
# without the i18n module
|
|
_ = lambda x: x
|
|
|
|
|
|
# import pickle
|
|
# import sys
|
|
|
|
# filename = conf.supybot.directories.data.dirize("EgoServ.db")
|
|
|
|
# Perms Decorator
|
|
def ensure_permissions(min_permission_group):
|
|
"Ensures user belongs to a least permission group"
|
|
def decorator(function):
|
|
def wrapper(*args,**kwargs):
|
|
print(args)
|
|
print(kwargs)
|
|
return function(*args, **kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
|
# Taken from plugins.Time.seconds
|
|
def getTs(irc, msg, args, state):
|
|
"""[<years>y] [<weeks>w] [<days>d] [<hours>h] [<minutes>m] [<seconds>s]
|
|
Returns the number of seconds in the number of <years>, <weeks>,
|
|
<days>, <hours>, <minutes>, and <seconds> given. An example usage is
|
|
"seconds 2h 30m", which would return 9000, which is '3600*2 + 30*60'.
|
|
Useful for scheduling events at a given number of seconds in the
|
|
future.
|
|
"""
|
|
# here there is some glitch / ugly hack to allow any('getTs'), with rest('test') after...
|
|
# TODO: check that bot can't kill itself with loop
|
|
seconds = -1
|
|
items = list(args)
|
|
for arg in items:
|
|
if not (arg and arg[-1] in 'ywdhms'):
|
|
try:
|
|
n = int(arg)
|
|
state.args.append(n)
|
|
except:
|
|
state.args.append(float(seconds))
|
|
raise callbacks.ArgumentError
|
|
(s, kind) = arg[:-1], arg[-1]
|
|
try:
|
|
i = int(s)
|
|
except ValueError:
|
|
state.args.append(float(seconds))
|
|
raise callbacks.ArgumentError
|
|
if kind == 'y':
|
|
seconds += i*31536000
|
|
elif kind == 'w':
|
|
seconds += i*604800
|
|
elif kind == 'd':
|
|
seconds += i*86400
|
|
elif kind == 'h':
|
|
seconds += i*3600
|
|
elif kind == 'm':
|
|
seconds += i*60
|
|
elif kind == 's':
|
|
if i == 0:
|
|
i = 1
|
|
seconds += i
|
|
elif kind == '-':
|
|
state.args.append(float(seconds))
|
|
raise callbacks.ArgumentError
|
|
args.pop(0)
|
|
state.args.append(float(seconds))
|
|
|
|
|
|
addConverter('getTs', getTs)
|
|
|
|
|
|
|
|
class EgoServ(callbacks.Plugin):
|
|
"""Suite of tools to help Network Operators run their ErgoIRCd nets"""
|
|
threaded = True
|
|
|
|
# OPER
|
|
# This should check if it has OPER right away
|
|
|
|
# PERMS
|
|
# This is designed to be a single network ErgoIRCd administrative bot
|
|
# So maybe I just set default perms to owner!
|
|
|
|
#####
|
|
# WHO
|
|
#####
|
|
|
|
# Listen for 354; when recieved send payload for various queries
|
|
# and internal functions below
|
|
|
|
# _isAccount()
|
|
|
|
# IP address from `ircmsgs.who()`
|
|
# @wrap(['nick'])
|
|
# def getip(self, irc, msg, args, nick):
|
|
# """<nick>
|
|
|
|
# returns current IP address of a nick
|
|
# """
|
|
|
|
######
|
|
# OPER Commands
|
|
######
|
|
|
|
# DEFCON
|
|
|
|
@wrap([("literal", (1, 2, 3, 4, 5))])
|
|
def defcon(self, irc, msg, args, level):
|
|
"""<level>
|
|
|
|
The DEFCON system can disable server features at runtime, to mitigate
|
|
spam or other hostile activity. It has five levels, which are cumulative
|
|
|
|
5: Normal operation
|
|
4: No new account or channel registrations; if Tor is enabled, no new
|
|
unauthenticated connections from Tor
|
|
3: All users are +R; no changes to vhosts
|
|
2: No new unauthenticated connections; all channels are +R
|
|
1: No new connections except from localhost or other trusted IPs
|
|
"""
|
|
|
|
arg = []
|
|
arg.append(level)
|
|
irc.sendMsg(msg=ircmsgs.IrcMsg(command='DEFCON', args=arg))
|
|
irc.replySuccess(f'Setting DEFCON level to {arg}! Good Luck!')
|
|
|
|
|
|
# KILL
|
|
@ensure_permissions(5)
|
|
@wrap(['nick', optional('something')])
|
|
def kill(self, irc, msg, args, nick, reason):
|
|
"""<nick> [<reason>]
|
|
|
|
Issues a KILL command on the <nick> with <reason> if provided.
|
|
"""
|
|
arg = [nick]
|
|
if reason:
|
|
arg.append(reason)
|
|
irc.queueMsg(msg=ircmsgs.IrcMsg(command='KILL',
|
|
args=arg))
|
|
irc.replySuccess(f'Killed connection for {nick}')
|
|
|
|
# SAJOIN
|
|
@wrap([getopts({'nick': 'nick'}), 'channel'])
|
|
def sajoin(self, irc, msg, args, opts, channel):
|
|
"""[--nick <nick>] [<channel>]
|
|
|
|
Forcibly joins a <nick> to a <channel>, ignoring restrictions like bans, user limits
|
|
and channel keys. If <nick> is omitted, it defaults to the bot itself.
|
|
<channel> is only necessary if the message is not sent on the channel itself
|
|
"""
|
|
opts = dict(opts)
|
|
arg = []
|
|
if 'nick' in opts:
|
|
arg.append(opts['nick'])
|
|
arg.append(channel)
|
|
irc.queueMsg(msg=ircmsgs.IrcMsg(command='SAJOIN',
|
|
args=arg))
|
|
if 'nick' in opts:
|
|
re = f"attempting to force join {opts['nick']} to {channel}"
|
|
else:
|
|
re = f'I am attempting to forcibly join {channel}'
|
|
irc.reply(re)
|
|
|
|
# SANICK
|
|
@wrap(['nick', 'something'])
|
|
def sanick(self, irc, msg, args, current, new):
|
|
"""<current> <new>
|
|
|
|
Issues a SANICK command and forcibly changes the <current> nick to the <new> nick
|
|
"""
|
|
arg = [current, new]
|
|
irc.queueMsg(msg=ircmsgs.IrcMsg(command='SANICK',
|
|
args=arg))
|
|
# This isn't handling any nick collision errors
|
|
irc.reply(f'Forcing nick change for {current} to {new}')
|
|
|
|
#####
|
|
# NickServ
|
|
# Nick and Account Administration
|
|
#####
|
|
|
|
# Figure out some clean way to start talking to
|
|
# and parsing information from NS. For eg NS CLIENTS LIST
|
|
|
|
|
|
|
|
######
|
|
# Channel Administration
|
|
######
|
|
|
|
def _ban(self, irc, msg, args,
|
|
channel, optlist, target, getTs, reason, kick):
|
|
# Check that they're not trying to make us kickban ourself.
|
|
if irc.isNick(target):
|
|
bannedNick = target
|
|
try:
|
|
bannedHostmask = irc.state.nickToHostmask(target)
|
|
banmaskstyle = conf.supybot.protocols.irc.banmask
|
|
banmask = banmaskstyle.makeBanmask(bannedHostmask, [o[0] for o in optlist])
|
|
except KeyError:
|
|
## WTF IS THIS
|
|
if not conf.supybot.protocols.irc.strictRfc() and \
|
|
target.startswith('$'):
|
|
# Select the last part, or the whole target:
|
|
bannedNick = target.split(':')[-1]
|
|
banmask = bannedHostmask = target
|
|
else:
|
|
irc.error(format(_('I haven\'t seen %s.'), bannedNick), Raise=True)
|
|
else:
|
|
bannedNick = ircutils.nickFromHostmask(target)
|
|
banmask = bannedHostmask = target
|
|
if not irc.isNick(bannedNick):
|
|
self.log.warning('%q tried to kban a non nick: %q',
|
|
msg.prefix, bannedNick)
|
|
raise callbacks.ArgumentError
|
|
elif bannedNick == irc.nick:
|
|
self.log.warning('%q tried to make me kban myself.', msg.prefix)
|
|
irc.error(_('I cowardly refuse to kickban myself.'))
|
|
return
|
|
if not reason:
|
|
reason = msg.nick
|
|
|
|
capability = ircdb.makeChannelCapability(channel, 'op')
|
|
# Check (again) that they're not trying to make us kickban ourself.
|
|
if ircutils.hostmaskPatternEqual(banmask, irc.prefix):
|
|
if ircutils.hostmaskPatternEqual(bannedHostmask, irc.prefix):
|
|
self.log.warning('%q tried to make me kban myself.',msg.prefix)
|
|
irc.error(_('I cowardly refuse to ban myself.'))
|
|
return
|
|
else:
|
|
self.log.warning('Using exact hostmask since banmask would '
|
|
'ban myself.')
|
|
banmask = bannedHostmask
|
|
# Now, let's actually get to it. Check to make sure they have
|
|
# #channel,op and the bannee doesn't have #channel,op; or that the
|
|
# bannee and the banner are both the same person.
|
|
def doBan():
|
|
if irc.state.channels[channel].isOp(bannedNick):
|
|
irc.queueMsg(ircmsgs.deop(channel, bannedNick))
|
|
irc.queueMsg(ircmsgs.ban(channel, banmask))
|
|
if kick:
|
|
irc.queueMsg(ircmsgs.kick(channel, bannedNick, reason))
|
|
if getTs > 0:
|
|
def f():
|
|
if channel in irc.state.channels and \
|
|
banmask in irc.state.channels[channel].bans:
|
|
irc.queueMsg(ircmsgs.unban(channel, banmask))
|
|
schedule.addEvent(f, )
|
|
if bannedNick == msg.nick:
|
|
doBan()
|
|
elif ircdb.checkCapability(msg.prefix, capability):
|
|
if ircdb.checkCapability(bannedHostmask, capability) and \
|
|
not ircdb.checkCapability(msg.prefix, 'owner'):
|
|
self.log.warning('%s tried to ban %q, but both have %s',
|
|
msg.prefix, bannedHostmask, capability)
|
|
irc.error(format(_('%s has %s too, you can\'t ban '
|
|
'them.'), bannedNick, capability))
|
|
else:
|
|
doBan()
|
|
else:
|
|
self.log.warning('%q attempted kban without %s',
|
|
msg.prefix, capability)
|
|
irc.errorNoCapability(capability)
|
|
|
|
def nban(self, irc, msg, args,
|
|
bannedNick, getTs, reason):
|
|
""" <nick> [<seconds>] [<reason>]
|
|
If you have the #channel,op capability, this will kickban <nick>'s banmask from
|
|
for
|
|
the time period specified. Not specifying the time period it will ban the user
|
|
indefinitely. <reason> is optional but recommended.
|
|
"""
|
|
self._ban(irc, msg, args, bannedNick, reason, True)
|
|
nban = wrap(nban,
|
|
['haveOp',
|
|
|
|
('haveHalfop+', _('kick or ban someone')),
|
|
'nickInChannel',
|
|
optional('', 0),
|
|
additional('text')])
|
|
|
|
|
|
|
|
|
|
@wrap(['nick', 'channel', optional('getTs'), 'something'])
|
|
def quiet(self, irc, msg, args, nick, channel, duration, reason="Fuck Knows!"):
|
|
"""<nick> [<channel>] [<seconds>] [<reason>]
|
|
|
|
Quiets the banmask associated with <nick> for the time period specified
|
|
<seconds>. If not specified, it will quiet the user indefinitely.
|
|
<reason> is optional but recommended! <channel> is only necessary if
|
|
the command is not sent in the channel itself.
|
|
"""
|
|
try:
|
|
(nick, ident, host) = \
|
|
ircutils.splitHostmask(irc.state.nickToHostmask(nick))
|
|
except KeyError:
|
|
irc.error(format("No such nick"), Raise=True)
|
|
|
|
|
|
@wrap(['channel', 'something', many('nick')])
|
|
def amode(self, irc, msg, args, channel, mode, nicks):
|
|
"""[<channel>] <mode> <nick> [<nick>....]
|
|
|
|
sets `CS AMODE` with <mode> on given <nick>/<nicks> for <channel>
|
|
The nick/nicks must be registered and <mode> must be (de)voice, (de)hop,
|
|
(de)op or de(admin).
|
|
<channel> is only necessary if the message is not sent on the channel itself
|
|
"""
|
|
# label = ircutils.makeLabel()
|
|
if mode == 'voice':
|
|
flag = '+v'
|
|
elif mode == 'admin':
|
|
flag = '+a'
|
|
elif mode == 'hop':
|
|
flag = '+h'
|
|
elif mode == 'op':
|
|
flag = '+o'
|
|
elif mode == 'devoice':
|
|
flag = '-v'
|
|
elif mode == 'dehop':
|
|
flag = '-h'
|
|
elif mode == 'deadmin':
|
|
flag = '-a'
|
|
elif mode == 'deop':
|
|
flag = '-o'
|
|
|
|
for nick in nicks:
|
|
irc.queueMsg(msg=ircmsgs.IrcMsg(command='PRIVMSG',
|
|
args=('chanserv', f'amode {channel} {flag} {nick}')))
|
|
else:
|
|
irc.error(f'Supplied mode {mode} is BAD INPUT!')
|
|
# Would love to handle responses for when it's not an account,
|
|
# https://github.com/ergochat/ergo/issues/1515 Waiting for this.
|
|
irc.replySuccess(f'= Auto {flag} applied to \x02registered\x0F nicks!')
|
|
|
|
|
|
@wrap([many('channel')])
|
|
def chanreg(self, irc, msg, args, channels):
|
|
"""[<channel>].. [<channel>..]
|
|
|
|
Registered the given channel/s by the bot
|
|
"""
|
|
|
|
for channel in channels:
|
|
arg = ['register']
|
|
arg.append(channel)
|
|
irc.queueMsg(msg=ircmsgs.IrcMsg(command='CS', args=arg))
|
|
irc.reply('Registered the channel(s) successfully')
|
|
|
|
|
|
@wrap(['channel',('literal', ('users', 'access'))])
|
|
def chanreset(self, irc, msg, args, channel, target):
|
|
"""[<channel>] <target>
|
|
|
|
<target> can either be 'users' (kicks all users except the bot)
|
|
or 'access' (resets all stored bans, invites, ban exceptions,
|
|
and persistent user-mode grants) for <channel>.
|
|
<channel> is only necessary if the message is not sent on the channel itself
|
|
"""
|
|
arg = ['CLEAR']
|
|
arg.append(channel)
|
|
arg.append(target)
|
|
irc.queueMsg(msg=ircmsgs.IrcMsg(command='CS',
|
|
args=arg))
|
|
if target == 'users':
|
|
irc.reply(f'Kicking all users out of {channel} besides me')
|
|
else:
|
|
irc.reply(f'Resetting bans and privileges for {channel}')
|
|
|
|
# chanpurge() & chanunpurge() are deprecated
|
|
# as they now require confirmation codes.
|
|
|
|
#####
|
|
# HostServ Commands
|
|
#####
|
|
|
|
# Add sanity checks for vhost input? Parse server disagreements
|
|
@wrap(['nick', 'something'])
|
|
def setvhost(self, irc, msg, args, nick, vhost):
|
|
"""<nick> <vhost>
|
|
|
|
sets a <nick>'s <vhost>. <vhost> must be in the format 'hello.world'
|
|
"""
|
|
label = ircutils.makeLabel()
|
|
|
|
arg = ['SET']
|
|
arg.append(nick)
|
|
arg.append(vhost)
|
|
irc.queueMsg(msg=ircmsgs.IrcMsg(command='HS',
|
|
args=arg))
|
|
# Doesn't handle not account error?
|
|
# Doesn't handle confirmation
|
|
irc.replySuccess(f'Setting Virtual Host {vhost} for {nick}')
|
|
|
|
|
|
Class = EgoServ
|
|
|
|
|
|
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:
|