This repository has been archived on 2021-08-20. You can view files and clone it, but cannot push or open issues or pull requests.
EgoServ/plugin.py
2021-08-20 14:05:26 +05:30

444 lines
16 KiB
Python

###
# Copyright (c) 2021, mogad0n
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
# My Imports
from supybot import utils, plugins, ircutils, callbacks, irclib, ircmsgs,
from supybot import conf, world, log, ircdb, registry, schedule
from supybot.commands import *
try:
from supybot.i18n import PluginInternationalization
_ = PluginInternationalization('EgoServ')
except ImportError:
# Placeholder that allows to run the plugin on a bot
# without the i18n module
_ = lambda x: x
# import pickle
# import sys
# filename = conf.supybot.directories.data.dirize("EgoServ.db")
# Perms Decorator
def ensure_permissions(min_permission_group):
"Ensures user belongs to a least permission group"
def decorator(function):
def wrapper(*args,**kwargs):
print(args)
print(kwargs)
return function(*args, **kwargs)
return wrapper
return decorator
# Taken from plugins.Time.seconds
def getTs(irc, msg, args, state):
"""[<years>y] [<weeks>w] [<days>d] [<hours>h] [<minutes>m] [<seconds>s]
Returns the number of seconds in the number of <years>, <weeks>,
<days>, <hours>, <minutes>, and <seconds> given. An example usage is
"seconds 2h 30m", which would return 9000, which is '3600*2 + 30*60'.
Useful for scheduling events at a given number of seconds in the
future.
"""
# here there is some glitch / ugly hack to allow any('getTs'), with rest('test') after...
# TODO: check that bot can't kill itself with loop
seconds = -1
items = list(args)
for arg in items:
if not (arg and arg[-1] in 'ywdhms'):
try:
n = int(arg)
state.args.append(n)
except:
state.args.append(float(seconds))
raise callbacks.ArgumentError
(s, kind) = arg[:-1], arg[-1]
try:
i = int(s)
except ValueError:
state.args.append(float(seconds))
raise callbacks.ArgumentError
if kind == 'y':
seconds += i*31536000
elif kind == 'w':
seconds += i*604800
elif kind == 'd':
seconds += i*86400
elif kind == 'h':
seconds += i*3600
elif kind == 'm':
seconds += i*60
elif kind == 's':
if i == 0:
i = 1
seconds += i
elif kind == '-':
state.args.append(float(seconds))
raise callbacks.ArgumentError
args.pop(0)
state.args.append(float(seconds))
addConverter('getTs', getTs)
class EgoServ(callbacks.Plugin):
"""Suite of tools to help Network Operators run their ErgoIRCd nets"""
threaded = True
# OPER
# This should check if it has OPER right away
# PERMS
# This is designed to be a single network ErgoIRCd administrative bot
# So maybe I just set default perms to owner!
#####
# WHO
#####
# Listen for 354; when recieved send payload for various queries
# and internal functions below
# _isAccount()
# IP address from `ircmsgs.who()`
# @wrap(['nick'])
# def getip(self, irc, msg, args, nick):
# """<nick>
# returns current IP address of a nick
# """
######
# OPER Commands
######
# DEFCON
@wrap([("literal", (1, 2, 3, 4, 5))])
def defcon(self, irc, msg, args, level):
"""<level>
The DEFCON system can disable server features at runtime, to mitigate
spam or other hostile activity. It has five levels, which are cumulative
5: Normal operation
4: No new account or channel registrations; if Tor is enabled, no new
unauthenticated connections from Tor
3: All users are +R; no changes to vhosts
2: No new unauthenticated connections; all channels are +R
1: No new connections except from localhost or other trusted IPs
"""
arg = []
arg.append(level)
irc.sendMsg(msg=ircmsgs.IrcMsg(command='DEFCON', args=arg))
irc.replySuccess(f'Setting DEFCON level to {arg}! Good Luck!')
# KILL
@ensure_permissions(5)
@wrap(['nick', optional('something')])
def kill(self, irc, msg, args, nick, reason):
"""<nick> [<reason>]
Issues a KILL command on the <nick> with <reason> if provided.
"""
arg = [nick]
if reason:
arg.append(reason)
irc.queueMsg(msg=ircmsgs.IrcMsg(command='KILL',
args=arg))
irc.replySuccess(f'Killed connection for {nick}')
# SAJOIN
@wrap([getopts({'nick': 'nick'}), 'channel'])
def sajoin(self, irc, msg, args, opts, channel):
"""[--nick <nick>] [<channel>]
Forcibly joins a <nick> to a <channel>, ignoring restrictions like bans, user limits
and channel keys. If <nick> is omitted, it defaults to the bot itself.
<channel> is only necessary if the message is not sent on the channel itself
"""
opts = dict(opts)
arg = []
if 'nick' in opts:
arg.append(opts['nick'])
arg.append(channel)
irc.queueMsg(msg=ircmsgs.IrcMsg(command='SAJOIN',
args=arg))
if 'nick' in opts:
re = f"attempting to force join {opts['nick']} to {channel}"
else:
re = f'I am attempting to forcibly join {channel}'
irc.reply(re)
# SANICK
@wrap(['nick', 'something'])
def sanick(self, irc, msg, args, current, new):
"""<current> <new>
Issues a SANICK command and forcibly changes the <current> nick to the <new> nick
"""
arg = [current, new]
irc.queueMsg(msg=ircmsgs.IrcMsg(command='SANICK',
args=arg))
# This isn't handling any nick collision errors
irc.reply(f'Forcing nick change for {current} to {new}')
#####
# NickServ
# Nick and Account Administration
#####
# Figure out some clean way to start talking to
# and parsing information from NS. For eg NS CLIENTS LIST
######
# Channel Administration
######
def _ban(self, irc, msg, args,
channel, optlist, target, getTs, reason, kick):
# Check that they're not trying to make us kickban ourself.
if irc.isNick(target):
bannedNick = target
try:
bannedHostmask = irc.state.nickToHostmask(target)
banmaskstyle = conf.supybot.protocols.irc.banmask
banmask = banmaskstyle.makeBanmask(bannedHostmask, [o[0] for o in optlist])
except KeyError:
## WTF IS THIS
if not conf.supybot.protocols.irc.strictRfc() and \
target.startswith('$'):
# Select the last part, or the whole target:
bannedNick = target.split(':')[-1]
banmask = bannedHostmask = target
else:
irc.error(format(_('I haven\'t seen %s.'), bannedNick), Raise=True)
else:
bannedNick = ircutils.nickFromHostmask(target)
banmask = bannedHostmask = target
if not irc.isNick(bannedNick):
self.log.warning('%q tried to kban a non nick: %q',
msg.prefix, bannedNick)
raise callbacks.ArgumentError
elif bannedNick == irc.nick:
self.log.warning('%q tried to make me kban myself.', msg.prefix)
irc.error(_('I cowardly refuse to kickban myself.'))
return
if not reason:
reason = msg.nick
capability = ircdb.makeChannelCapability(channel, 'op')
# Check (again) that they're not trying to make us kickban ourself.
if ircutils.hostmaskPatternEqual(banmask, irc.prefix):
if ircutils.hostmaskPatternEqual(bannedHostmask, irc.prefix):
self.log.warning('%q tried to make me kban myself.',msg.prefix)
irc.error(_('I cowardly refuse to ban myself.'))
return
else:
self.log.warning('Using exact hostmask since banmask would '
'ban myself.')
banmask = bannedHostmask
# Now, let's actually get to it. Check to make sure they have
# #channel,op and the bannee doesn't have #channel,op; or that the
# bannee and the banner are both the same person.
def doBan():
if irc.state.channels[channel].isOp(bannedNick):
irc.queueMsg(ircmsgs.deop(channel, bannedNick))
irc.queueMsg(ircmsgs.ban(channel, banmask))
if kick:
irc.queueMsg(ircmsgs.kick(channel, bannedNick, reason))
if getTs > 0:
def f():
if channel in irc.state.channels and \
banmask in irc.state.channels[channel].bans:
irc.queueMsg(ircmsgs.unban(channel, banmask))
schedule.addEvent(f, )
if bannedNick == msg.nick:
doBan()
elif ircdb.checkCapability(msg.prefix, capability):
if ircdb.checkCapability(bannedHostmask, capability) and \
not ircdb.checkCapability(msg.prefix, 'owner'):
self.log.warning('%s tried to ban %q, but both have %s',
msg.prefix, bannedHostmask, capability)
irc.error(format(_('%s has %s too, you can\'t ban '
'them.'), bannedNick, capability))
else:
doBan()
else:
self.log.warning('%q attempted kban without %s',
msg.prefix, capability)
irc.errorNoCapability(capability)
def nban(self, irc, msg, args,
bannedNick, getTs, reason):
""" <nick> [<seconds>] [<reason>]
If you have the #channel,op capability, this will kickban <nick>'s banmask from
for
the time period specified. Not specifying the time period it will ban the user
indefinitely. <reason> is optional but recommended.
"""
self._ban(irc, msg, args, bannedNick, reason, True)
nban = wrap(nban,
['haveOp',
('haveHalfop+', _('kick or ban someone')),
'nickInChannel',
optional('', 0),
additional('text')])
@wrap(['nick', 'channel', optional('getTs'), 'something'])
def quiet(self, irc, msg, args, nick, channel, duration, reason="Fuck Knows!"):
"""<nick> [<channel>] [<seconds>] [<reason>]
Quiets the banmask associated with <nick> for the time period specified
<seconds>. If not specified, it will quiet the user indefinitely.
<reason> is optional but recommended! <channel> is only necessary if
the command is not sent in the channel itself.
"""
try:
(nick, ident, host) = \
ircutils.splitHostmask(irc.state.nickToHostmask(nick))
except KeyError:
irc.error(format("No such nick"), Raise=True)
@wrap(['channel', 'something', many('nick')])
def amode(self, irc, msg, args, channel, mode, nicks):
"""[<channel>] <mode> <nick> [<nick>....]
sets `CS AMODE` with <mode> on given <nick>/<nicks> for <channel>
The nick/nicks must be registered and <mode> must be (de)voice, (de)hop,
(de)op or de(admin).
<channel> is only necessary if the message is not sent on the channel itself
"""
# label = ircutils.makeLabel()
if mode == 'voice':
flag = '+v'
elif mode == 'admin':
flag = '+a'
elif mode == 'hop':
flag = '+h'
elif mode == 'op':
flag = '+o'
elif mode == 'devoice':
flag = '-v'
elif mode == 'dehop':
flag = '-h'
elif mode == 'deadmin':
flag = '-a'
elif mode == 'deop':
flag = '-o'
for nick in nicks:
irc.queueMsg(msg=ircmsgs.IrcMsg(command='PRIVMSG',
args=('chanserv', f'amode {channel} {flag} {nick}')))
else:
irc.error(f'Supplied mode {mode} is BAD INPUT!')
# Would love to handle responses for when it's not an account,
# https://github.com/ergochat/ergo/issues/1515 Waiting for this.
irc.replySuccess(f'= Auto {flag} applied to \x02registered\x0F nicks!')
@wrap([many('channel')])
def chanreg(self, irc, msg, args, channels):
"""[<channel>].. [<channel>..]
Registered the given channel/s by the bot
"""
for channel in channels:
arg = ['register']
arg.append(channel)
irc.queueMsg(msg=ircmsgs.IrcMsg(command='CS', args=arg))
irc.reply('Registered the channel(s) successfully')
@wrap(['channel',('literal', ('users', 'access'))])
def chanreset(self, irc, msg, args, channel, target):
"""[<channel>] <target>
<target> can either be 'users' (kicks all users except the bot)
or 'access' (resets all stored bans, invites, ban exceptions,
and persistent user-mode grants) for <channel>.
<channel> is only necessary if the message is not sent on the channel itself
"""
arg = ['CLEAR']
arg.append(channel)
arg.append(target)
irc.queueMsg(msg=ircmsgs.IrcMsg(command='CS',
args=arg))
if target == 'users':
irc.reply(f'Kicking all users out of {channel} besides me')
else:
irc.reply(f'Resetting bans and privileges for {channel}')
# chanpurge() & chanunpurge() are deprecated
# as they now require confirmation codes.
#####
# HostServ Commands
#####
# Add sanity checks for vhost input? Parse server disagreements
@wrap(['nick', 'something'])
def setvhost(self, irc, msg, args, nick, vhost):
"""<nick> <vhost>
sets a <nick>'s <vhost>. <vhost> must be in the format 'hello.world'
"""
label = ircutils.makeLabel()
arg = ['SET']
arg.append(nick)
arg.append(vhost)
irc.queueMsg(msg=ircmsgs.IrcMsg(command='HS',
args=arg))
# Doesn't handle not account error?
# Doesn't handle confirmation
irc.replySuccess(f'Setting Virtual Host {vhost} for {nick}')
Class = EgoServ
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79: