This repository has been archived on 2021-08-24. You can view files and clone it, but cannot push or open issues or pull requests.
EgoServ-LDAP/plugin.py

392 lines
14 KiB
Python

###
# Copyright (c) 2021, mogad0n
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
import ldap
from supybot import utils, plugins, ircutils, callbacks, irclib, ircmsgs, conf, world, log
from supybot.commands import *
try:
from supybot.i18n import PluginInternationalization
_ = PluginInternationalization('EgoServ')
except ImportError:
# Placeholder that allows to run the plugin on a bot
# without the i18n module
_ = lambda x: x
# import pickle
# import sys
# filename = conf.supybot.directories.data.dirize("EgoServ.db")
class EgoServ(callbacks.Plugin):
"""Suite of tools to help Network Operators run their ErgoIRCd nets"""
threaded = True
# OPER
# This should check if it has OPER right away
# PERMS
# This is designed to be a single network ErgoIRCd administrative bot
# So maybe I just set default perms to owner!
#####
# WHO
#####
# Listen for 354; when recieved send payload for various queries
# and internal functions below
# _isAccount()
# IP address from `ircmsgs.who()`
# @wrap(['nick'])
# def getip(self, irc, msg, args, nick):
# """<nick>
# returns current IP address of a nick
# """
######
# OPER Commands
######
# DEFCON
@wrap([("literal", (1, 2, 3, 4, 5))])
def defcon(self, irc, msg, args, level):
"""<level>
The DEFCON system can disable server features at runtime, to mitigate
spam or other hostile activity. It has five levels, which are cumulative
5: Normal operation
4: No new account or channel registrations; if Tor is enabled, no new
unauthenticated connections from Tor
3: All users are +R; no changes to vhosts
2: No new unauthenticated connections; all channels are +R
1: No new connections except from localhost or other trusted IPs
"""
arg = []
arg.append(level)
irc.sendMsg(msg=ircmsgs.IrcMsg(command='DEFCON', args=arg))
irc.replySuccess(f'Setting DEFCON level to {arg}! Good Luck!')
# QUERY
@wrap(['nick'])
def query(self, irc, msg, args, nick):
"""<nick>
Dumps shit.
"""
arg = [nick]
searchFilter = "(&(uid=" + nick + "*)(objectClass=nsPerson))"
searchAttribute = ["uid","uidNumber","nsAccountLock"]
searchScope = ldap.SCOPE_SUBTREE
host = self.registryValue('ldap.host')
binddn = self.registryValue('ldap.binddn')
pw = self.registryValue('ldap.bindpw')
basedn = self.registryValue('ldap.basedn')
secure = self.registryValue('ldap.secure')
if secure == False:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
else:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD)
l = ldap.initialize(host)
try:
ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
l.protocol_version = ldap.VERSION3
l.simple_bind_s(binddn, pw)
except ldap.INVALID_CREDENTIALS:
print("Invalid LDAP credentials")
irc.reply("Internal error.")
except ldap.LDAPError:
print(ldap.LDAPError())
irc.reply("Error output sent to console.")
try:
ldap_result_id = l.search(basedn, searchScope, searchFilter, searchAttribute)
result_set = []
res = l.search_s(basedn, searchScope, searchFilter, searchAttribute)
print(res)
irc.reply(res)
except ldap.LDAPError:
print(ldap.LDAPError)
irc.reply("Error output sent to console.")
l.unbind_s()
# KILL
@wrap(['nick', optional('something')])
def kill(self, irc, msg, args, nick, reason):
"""<nick> [<reason>]
Issues a KILL command on the <nick> with <reason> if provided.
"""
arg = [nick]
if reason:
arg.append(reason)
irc.queueMsg(msg=ircmsgs.IrcMsg(command='KILL',
args=arg))
irc.replySuccess(f'Killed connection for {nick}')
# SAJOIN
@wrap([getopts({'nick': 'nick'}), 'channel'])
def sajoin(self, irc, msg, args, opts, channel):
"""[--nick <nick>] [<channel>]
Forcibly joins a <nick> to a <channel>, ignoring restrictions like bans, user limits
and channel keys. If <nick> is omitted, it defaults to the bot itself.
<channel> is only necessary if the message is not sent on the channel itself
"""
opts = dict(opts)
arg = []
if 'nick' in opts:
arg.append(opts['nick'])
arg.append(channel)
irc.queueMsg(msg=ircmsgs.IrcMsg(command='SAJOIN',
args=arg))
if 'nick' in opts:
re = f"attempting to force join {opts['nick']} to {channel}"
else:
re = f'I am attempting to forcibly join {channel}'
irc.reply(re)
# SANICK
@wrap(['nick', 'something'])
def sanick(self, irc, msg, args, current, new):
"""<current> <new>
Issues a SANICK command and forcibly changes the <current> nick to the <new> nick
"""
arg = [current, new]
irc.queueMsg(msg=ircmsgs.IrcMsg(command='SANICK',
args=arg))
# This isn't handling any nick collision errors
irc.reply(f'Forcing nick change for {current} to {new}')
#####
# NickServ
# Nick and Account Administration
#####
# Figure out some clean way to start talking to
# and parsing information from NS. For eg NS CLIENTS LIST
######
# Channel Administration
######
# Only Talking to CS isn't enough because
# CS doesn't handle expiring bans mutes
# Use mute Extban
# @wrap(['nick', 'channel', optional('expiry', 0), 'something'])
# def mute(self, irc, msg, args, nick, channel, duration, reason):
# @wrap(['nick', 'channel'])
# def mute(self, irc, msg, args, nick, channel):
# """<nick> [<channel>]
# Issues an extban which mutes the hostmask associated with the
# <nick>
# """
# # add --all flag for muting the hostmask/account in all channels
# # on the current network
# # Seconds needs to be human readable i.e. 5w 3d 2h
# # Do we care about adding quiets/mutes for accounts not online
# # via --host specification or even an --account
# # Fuck 'duration' and 'reason' for now
# # What I'm doing is very filthy
# # Refer to how the core channel plugin deals with this.
# # https://github.com/ProgVal/Limnoria/blob/master/plugins/Channel/plugin.py#L358
# try:
# (nick, ident, host) = ircutils.splitHostmask(irc.state.nickToHostmask(nick))
# except KeyError:
# irc.error(
# "No such nick",
# Raise=True,
# )
# if nick == irc.nick:
# self.log.warning(f'{msg.prefix} tried to make me mute myself.', )
# irc.error(_('No, I would like to preserve my right to speech!'))
# return
# irc.queueMsg()
@wrap(['channel', 'something', many('nick')])
def amode(self, irc, msg, args, channel, mode, nicks):
"""[<channel>] <mode> <nick> [<nick>....]
sets `CS AMODE` with <mode> on given <nick>/<nicks> for <channel>
The nick/nicks must be registered and <mode> must be (de)voice, (de)hop,
(de)op or de(admin).
<channel> is only necessary if the message is not sent on the channel itself
"""
# label = ircutils.makeLabel()
if mode == 'voice':
flag = '+v'
elif mode == 'admin':
flag = '+a'
elif mode == 'hop':
flag = '+h'
elif mode == 'op':
flag = '+o'
elif mode == 'devoice':
flag = '-v'
elif mode == 'dehop':
flag = '-h'
elif mode == 'deadmin':
flag = '-a'
elif mode == 'deop':
flag = '-o'
else:
irc.error(f'Supplied mode {mode} is not allowed/valid')
for nick in nicks:
irc.queueMsg(msg=ircmsgs.IrcMsg(command='PRIVMSG',
args=('chanserv', f'amode {channel} {flag} {nick}')))
# Would love to handle responses for when it's not an account,
# https://github.com/ergochat/ergo/issues/1515 Waiting for this.
irc.replySuccess(f'Setting mode {flag} on given nick(s), if nick(s) weren\'t given the {flag} mode it/they is/are unregistered')
@wrap([many('channel')])
def chanreg(self, irc, msg, args, channels):
"""[<channel>].. [<channel>..]
Registered the given channel/s by the bot
"""
for channel in channels:
arg = ['register']
arg.append(channel)
irc.queueMsg(msg=ircmsgs.IrcMsg(command='CS', args=arg))
irc.reply('Registered the channel(s) successfully')
@wrap(['channel',('literal', ('users', 'access'))])
def chanreset(self, irc, msg, args, channel, target):
"""[<channel>] <target>
<target> can either be 'users' (kicks all users except the bot)
or 'access' (resets all stored bans, invites, ban exceptions,
and persistent user-mode grants) for <channel>.
<channel> is only necessary if the message is not sent on the channel itself
"""
arg = ['CLEAR']
arg.append(channel)
arg.append(target)
irc.queueMsg(msg=ircmsgs.IrcMsg(command='CS',
args=arg))
if target == 'users':
irc.reply(f'Kicking all users out of {channel} besides me')
else:
irc.reply(f'Resetting bans and privileges for {channel}')
# chanpurge() & chanunpurge() are deprecated
# as they now require confirmation codes.
#####
# HostServ Commands
#####
# Add sanity checks for vhost input? Parse server disagreements
@wrap(['nick', 'something'])
def setvhost(self, irc, msg, args, nick, vhost):
"""<nick> <vhost>
sets a <nick>'s <vhost>. <vhost> must be in the format 'hello.world'
"""
arg = [nick]
searchFilter = "(&(uid=" + nick + "*)(objectClass=nsPerson))"
nickdn = "CN=" + nick + ",ou=lab_users,dc=lab,dc=local"
searchAttribute = ["uid"]
searchScope = ldap.SCOPE_SUBTREE
host = self.registryValue('ldap.host')
binddn = self.registryValue('ldap.binddn')
pw = self.registryValue('ldap.bindpw')
basedn = self.registryValue('ldap.basedn')
secure = self.registryValue('ldap.secure')
attr_vhost = 'displayName'
if secure == False:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
else:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD)
l = ldap.initialize(host)
try:
ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
l.protocol_version = ldap.VERSION3
l.simple_bind_s(binddn, pw)
except ldap.INVALID_CREDENTIALS:
print("Invalid LDAP credentials")
irc.reply("Internal error.")
except ldap.LDAPError:
print(ldap.LDAPError())
irc.reply("Error output sent to console.")
try:
#this is not yet operational
#attrmod = [( ldap.MOD_REPLACE, attr_vhost, vhost)]
l.modify_s(nickdn, [( ldap.MOD_REPLACE, attr_vhost, vhost)])
print("Modifying " + nickdn + ": setting " + attr_vhost + " to" + vhost)
irc.reply("Attribute updated.")
except ldap.LDAPError:
print(ldap.LDAPError)
irc.reply("Error output sent to console.")
l.unbind_s()
label = ircutils.makeLabel()
irc.queueMsg(ircmsgs.IrcMsg(command='OPER', args=['coffee', '1234567890']))
arg = ['SET']
arg.append(nick)
arg.append(vhost)
irc.queueMsg(msg=ircmsgs.IrcMsg(command='HS',
args=arg))
# Doesn't handle not account error?
# Doesn't handle confirmation
irc.replySuccess(f'Setting Virtual Host {vhost} for {nick}')
Class = EgoServ
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79: