Limnoria/src/ircdb.py

1360 lines
50 KiB
Python

###
# Copyright (c) 2002-2009, Jeremiah Fincher
# Copyright (c) 2011, Valentin Lorentz
# Copyright (c) 2009,2013, James McCoy
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
import os
import time
import operator
from . import conf, ircutils, log, registry, unpreserve, utils, world
from .utils import minisix
def isCapability(capability):
return len(capability.split(None, 1)) == 1
def fromChannelCapability(capability):
"""Returns a (channel, capability) tuple from a channel capability."""
assert isChannelCapability(capability), 'got %s' % capability
return capability.split(',', 1)
def isChannelCapability(capability):
"""Returns True if capability is a channel capability; False otherwise."""
if ',' in capability:
(channel, capability) = capability.split(',', 1)
return ircutils.isChannel(channel) and isCapability(capability)
else:
return False
def makeChannelCapability(channel, capability):
"""Makes a channel capability given a channel and a capability."""
assert isCapability(capability), 'got %s' % capability
assert ircutils.isChannel(channel), 'got %s' % channel
return '%s,%s' % (channel, capability)
def isAntiCapability(capability):
"""Returns True if capability is an anticapability; False otherwise."""
if isChannelCapability(capability):
(_, capability) = fromChannelCapability(capability)
return isCapability(capability) and capability[0] == '-'
def makeAntiCapability(capability):
"""Returns the anticapability of a given capability."""
assert isCapability(capability), 'got %s' % capability
assert not isAntiCapability(capability), \
'makeAntiCapability does not work on anticapabilities. ' \
'You probably want invertCapability; got %s.' % capability
if isChannelCapability(capability):
(channel, capability) = fromChannelCapability(capability)
return makeChannelCapability(channel, '-' + capability)
else:
return '-' + capability
def unAntiCapability(capability):
"""Takes an anticapability and returns the non-anti form."""
assert isCapability(capability), 'got %s' % capability
if not isAntiCapability(capability):
raise ValueError('%s is not an anti capability' % capability)
if isChannelCapability(capability):
(channel, capability) = fromChannelCapability(capability)
return ','.join((channel, capability[1:]))
else:
return capability[1:]
def invertCapability(capability):
"""Make a capability into an anticapability and vice versa."""
assert isCapability(capability), 'got %s' % capability
if isAntiCapability(capability):
return unAntiCapability(capability)
else:
return makeAntiCapability(capability)
def canonicalCapability(capability):
if callable(capability):
capability = capability()
assert isCapability(capability), 'got %s' % capability
return capability.lower()
_unwildcard_remover = utils.str.MultipleRemover('!@*?')
def unWildcardHostmask(hostmask):
return _unwildcard_remover(hostmask)
_invert = invertCapability
class CapabilitySet(set):
"""A subclass of set handling basic capability stuff."""
__slots__ = ('__parent',)
def __init__(self, capabilities=()):
self.__parent = super(CapabilitySet, self)
self.__parent.__init__()
for capability in capabilities:
self.add(capability)
def add(self, capability):
"""Adds a capability to the set."""
capability = ircutils.toLower(capability)
inverted = _invert(capability)
if self.__parent.__contains__(inverted):
self.__parent.remove(inverted)
self.__parent.add(capability)
def remove(self, capability):
"""Removes a capability from the set."""
capability = ircutils.toLower(capability)
self.__parent.remove(capability)
def __contains__(self, capability):
capability = ircutils.toLower(capability)
if self.__parent.__contains__(capability):
return True
if self.__parent.__contains__(_invert(capability)):
return True
else:
return False
def check(self, capability, ignoreOwner=False):
"""Returns the appropriate boolean for whether a given capability is
'allowed' given its (or its anticapability's) presence in the set.
"""
capability = ircutils.toLower(capability)
if self.__parent.__contains__(capability):
return True
elif self.__parent.__contains__(_invert(capability)):
return False
else:
raise KeyError
def __repr__(self):
return '%s([%s])' % (self.__class__.__name__,
', '.join(map(repr, self)))
antiOwner = makeAntiCapability('owner')
class UserCapabilitySet(CapabilitySet):
"""A subclass of CapabilitySet to handle the owner capability correctly."""
__slots__ = ('__parent',)
def __init__(self, *args, **kwargs):
self.__parent = super(UserCapabilitySet, self)
self.__parent.__init__(*args, **kwargs)
def __contains__(self, capability, ignoreOwner=False):
capability = ircutils.toLower(capability)
if not ignoreOwner and capability == 'owner' or capability == antiOwner:
return True
elif not ignoreOwner and self.__parent.__contains__('owner'):
return True
else:
return self.__parent.__contains__(capability)
def check(self, capability, ignoreOwner=False):
"""Returns the appropriate boolean for whether a given capability is
'allowed' given its (or its anticapability's) presence in the set.
Differs from CapabilitySet in that it handles the 'owner' capability
appropriately.
"""
capability = ircutils.toLower(capability)
if capability == 'owner' or capability == antiOwner:
if self.__parent.__contains__('owner'):
return not isAntiCapability(capability)
else:
return isAntiCapability(capability)
elif not ignoreOwner and self.__parent.__contains__('owner'):
if isAntiCapability(capability):
return False
else:
return True
else:
return self.__parent.check(capability)
def add(self, capability):
"""Adds a capability to the set. Just make sure it's not -owner."""
capability = ircutils.toLower(capability)
assert capability != '-owner', '"-owner" disallowed.'
self.__parent.add(capability)
class IrcUser(object):
"""This class holds the capabilities and authentications for a user."""
__slots__ = ('id', 'auth', 'name', 'ignore', 'secure', 'hashed',
'password', 'capabilities', 'hostmasks', 'nicks', 'gpgkeys')
def __init__(self, ignore=False, password='', name='',
capabilities=(), hostmasks=None, nicks=None,
secure=False, hashed=False):
self.id = None
self.auth = [] # The (time, hostmask) list of auth crap.
self.name = name # The name of the user.
self.ignore = ignore # A boolean deciding if the person is ignored.
self.secure = secure # A boolean describing if hostmasks *must* match.
self.hashed = hashed # True if the password is hashed on disk.
self.password = password # password (plaintext? hashed?)
self.capabilities = UserCapabilitySet()
for capability in capabilities:
self.capabilities.add(capability)
# hostmasks used for recognition
self.hostmasks = ircutils.HostmaskSet(hostmasks or [])
if nicks is None:
# {'network1': ['foo', 'bar'], 'network': ['baz']}
self.nicks = ircutils.IrcDict()
else:
self.nicks = nicks
self.gpgkeys = [] # GPG key ids
def __repr__(self):
return format('%s(id=%s, ignore=%s, password="", name=%q, hashed=%r, '
'capabilities=%r, hostmasks=[], secure=%r)\n',
self.__class__.__name__, self.id, self.ignore,
self.name, self.hashed, self.capabilities, self.secure)
def __hash__(self):
return hash(self.id)
def addCapability(self, capability):
"""Gives the user the given capability."""
self.capabilities.add(capability)
def removeCapability(self, capability):
"""Takes from the user the given capability."""
self.capabilities.remove(capability)
def _checkCapability(self, capability, ignoreOwner=False):
"""Checks the user for a given capability."""
if self.ignore:
if isAntiCapability(capability):
return True
else:
return False
else:
return self.capabilities.check(capability, ignoreOwner=ignoreOwner)
def setPassword(self, password, hashed=False):
"""Sets the user's password. If password is None, it will be disabled."""
if hashed or self.hashed:
self.hashed = True
if password is None:
self.password = ""
else:
self.password = utils.saltHash(password)
else:
self.password = password
def checkPassword(self, password):
"""Checks the user's password."""
if password is None or not self.password:
return False
if self.hashed:
(salt, _) = self.password.split('|')
return (self.password == utils.saltHash(password, salt=salt))
else:
return (self.password == password)
def checkHostmask(self, hostmask, useAuth=True):
"""Checks a given hostmask against the user's hostmasks or current
authentication. If useAuth is False, only checks against the user's
hostmasks.
"""
if useAuth:
timeout = conf.supybot.databases.users.timeoutIdentification()
removals = []
try:
for (when, authmask) in self.auth:
if timeout and when+timeout < time.time():
removals.append((when, authmask))
elif hostmask == authmask:
return True
finally:
while removals:
self.auth.remove(removals.pop())
matched_pattern = self.hostmasks.match(hostmask)
if matched_pattern is not None:
return matched_pattern
return False
def addHostmask(self, hostmask):
"""Adds a hostmask to the user's hostmasks."""
assert ircutils.isUserHostmask(hostmask), 'got %s' % hostmask
if len(unWildcardHostmask(hostmask)) < 3:
raise ValueError('Hostmask must contain at least 3 non-wildcard characters.')
self.hostmasks.add(hostmask)
def removeHostmask(self, hostmask):
"""Removes a hostmask from the user's hostmasks."""
self.hostmasks.remove(hostmask)
def checkNick(self, network, nick):
"""Checks a given nick against the user's nicks."""
return nick in self.nicks[network]
def addNick(self, network, nick):
"""Adds a nick to the user's registered nicks on the network."""
global users
assert isinstance(network, minisix.string_types)
assert ircutils.isNick(nick), 'got %s' % nick
if users.getUserFromNick(network, nick) is not None:
raise KeyError
if network not in self.nicks:
self.nicks[network] = []
if nick not in self.nicks[network]:
self.nicks[network].append(nick)
def removeNick(self, network, nick):
"""Removes a nick from the user's registered nicks on the network."""
assert isinstance(network, minisix.string_types)
if nick not in self.nicks[network]:
raise KeyError
self.nicks[network].remove(nick)
def addAuth(self, hostmask):
"""Sets a user's authenticated hostmask. This times out according to
conf.supybot.timeoutIdentification. If hostmask exactly matches an
existing, known hostmask, the previous entry is removed."""
if self.checkHostmask(hostmask, useAuth=False) or not self.secure:
self.auth.append((time.time(), hostmask))
knownHostmasks = set()
def uniqueHostmask(auth):
(_, mask) = auth
if mask not in knownHostmasks:
knownHostmasks.add(mask)
return True
return False
uniqued = list(filter(uniqueHostmask, reversed(self.auth)))
self.auth = list(reversed(uniqued))
else:
raise ValueError('secure flag set, unmatched hostmask')
def clearAuth(self):
"""Unsets a user's authenticated hostmask."""
for (when, hostmask) in self.auth:
users.invalidateCache(hostmask=hostmask)
self.auth = []
def preserve(self, fd, indent=''):
def write(s):
fd.write(indent)
fd.write(s)
fd.write(os.linesep)
write('name %s' % self.name)
write('ignore %s' % self.ignore)
write('secure %s' % self.secure)
if self.password:
write('hashed %s' % self.hashed)
write('password %s' % self.password)
for capability in self.capabilities:
write('capability %s' % capability)
for hostmask in self.hostmasks:
write('hostmask %s' % hostmask)
for network, nicks in self.nicks.items():
write('nicks %s %s' % (network, ' '.join(nicks)))
for key in self.gpgkeys:
write('gpgkey %s' % key)
fd.write(os.linesep)
class IrcChannel(object):
"""This class holds the capabilities, bans, and ignores of a channel."""
__slots__ = ('defaultAllow', 'expiredBans', 'bans', 'ignores', 'silences',
'exceptions', 'capabilities', 'lobotomized')
defaultOff = ('op', 'halfop', 'voice', 'protected')
def __init__(self, bans=None, silences=None, exceptions=None, ignores=None,
capabilities=None, lobotomized=False, defaultAllow=True):
self.defaultAllow = defaultAllow
self.expiredBans = []
self.bans = bans or {}
self.ignores = ircutils.ExpiringHostmaskDict(ignores)
self.silences = silences or []
self.exceptions = exceptions or []
self.capabilities = capabilities or CapabilitySet()
for capability in self.defaultOff:
if capability not in self.capabilities:
self.capabilities.add(makeAntiCapability(capability))
self.lobotomized = lobotomized
def __repr__(self):
return '%s(bans=%r, ignores=%r, capabilities=%r, ' \
'lobotomized=%r, defaultAllow=%s, ' \
'silences=%r, exceptions=%r)\n' % \
(self.__class__.__name__, self.bans, self.ignores,
self.capabilities, self.lobotomized,
self.defaultAllow, self.silences, self.exceptions)
def addBan(self, hostmask, expiration=0):
"""Adds a ban to the channel banlist."""
assert not conf.supybot.protocols.irc.strictRfc() or \
ircutils.isUserHostmask(hostmask), 'got %s' % hostmask
self.bans[hostmask] = int(expiration)
def removeBan(self, hostmask):
"""Removes a ban from the channel banlist."""
assert not conf.supybot.protocols.irc.strictRfc() or \
ircutils.isUserHostmask(hostmask), 'got %s' % hostmask
return self.bans.pop(hostmask)
def checkBan(self, hostmask):
"""Checks whether a given hostmask is banned by the channel banlist."""
assert ircutils.isUserHostmask(hostmask), 'got %s' % hostmask
now = time.time()
for (pattern, expiration) in list(self.bans.items()):
if now < expiration or not expiration:
if ircutils.hostmaskPatternEqual(pattern, hostmask):
return True
else:
self.expiredBans.append((pattern, expiration))
del self.bans[pattern]
return False
def addIgnore(self, hostmask, expiration=0):
"""Adds an ignore to the channel ignore list."""
assert ircutils.isUserHostmask(hostmask), 'got %s' % hostmask
self.ignores[hostmask] = int(expiration)
def removeIgnore(self, hostmask):
"""Removes an ignore from the channel ignore list."""
assert ircutils.isUserHostmask(hostmask), 'got %s' % hostmask
return self.ignores.pop(hostmask)
def addCapability(self, capability):
"""Adds a capability to the channel's default capabilities."""
assert isCapability(capability), 'got %s' % capability
self.capabilities.add(capability)
def removeCapability(self, capability):
"""Removes a capability from the channel's default capabilities."""
assert isCapability(capability), 'got %s' % capability
self.capabilities.remove(capability)
def setDefaultCapability(self, b):
"""Sets the default capability in the channel."""
self.defaultAllow = b
def _checkCapability(self, capability, ignoreOwner=False):
"""Checks whether a certain capability is allowed by the channel."""
assert isCapability(capability), 'got %s' % capability
if capability in self.capabilities:
return self.capabilities.check(capability, ignoreOwner=ignoreOwner)
else:
if isAntiCapability(capability):
return not self.defaultAllow
else:
return self.defaultAllow
def checkIgnored(self, hostmask):
"""Checks whether a given hostmask is to be ignored by the channel."""
if self.lobotomized:
return True
if world.testing:
return False
assert ircutils.isUserHostmask(hostmask), 'got %s' % hostmask
if self.checkBan(hostmask):
return True
if self.ignores.match(hostmask):
return True
return False
def preserve(self, fd, indent=''):
def write(s):
fd.write(indent)
fd.write(s)
fd.write(os.linesep)
write('lobotomized %s' % self.lobotomized)
write('defaultAllow %s' % self.defaultAllow)
for capability in self.capabilities:
write('capability ' + capability)
bans = list(self.bans.items())
utils.sortBy(operator.itemgetter(1), bans)
for (ban, expiration) in bans:
write('ban %s %d' % (ban, expiration))
ignores = list(self.ignores.items())
utils.sortBy(operator.itemgetter(1), ignores)
for (ignore, expiration) in ignores:
write('ignore %s %d' % (ignore, expiration))
fd.write(os.linesep)
class IrcNetwork(object):
"""This class holds dynamic information about a network that should be
preserved across restarts."""
__slots__ = ('stsPolicies', 'lastDisconnectTimes')
def __init__(self, stsPolicies=None, lastDisconnectTimes=None):
self.stsPolicies = stsPolicies or {}
self.lastDisconnectTimes = lastDisconnectTimes or {}
def __repr__(self):
return '%s(stsPolicies=%r, lastDisconnectTimes=%s)' % \
(self.__class__.__name__, self.stsPolicies,
self.lastDisconnectTimes)
def addStsPolicy(self, server, stsPolicy):
assert isinstance(stsPolicy, str)
self.stsPolicies[server] = stsPolicy
def expireStsPolicy(self, server):
if server in self.stsPolicies:
del self.stsPolicies[server]
def addDisconnection(self, server):
self.lastDisconnectTimes[server] = int(time.time())
def preserve(self, fd, indent=''):
def write(s):
fd.write(indent)
fd.write(s)
fd.write(os.linesep)
for (server, stsPolicy) in sorted(self.stsPolicies.items()):
write('stsPolicy %s %s' % (server, stsPolicy))
for (server, disconnectTime) in \
sorted(self.lastDisconnectTimes.items()):
write('lastDisconnectTime %s %s' % (server, disconnectTime))
fd.write(os.linesep)
class Creator(object):
__slots__ = ()
def badCommand(self, command, rest, lineno):
raise ValueError('Invalid command on line %s: %s' % (lineno, command))
class IrcUserCreator(Creator):
__slots__ = ('users')
u = None
def __init__(self, users):
if self.u is None:
IrcUserCreator.u = IrcUser()
self.users = users
def user(self, rest, lineno):
if self.u.id is not None:
raise ValueError('Unexpected user command on line %s.' % lineno)
self.u.id = int(rest)
def _checkId(self):
if self.u.id is None:
raise ValueError('Unexpected user description without user.')
def name(self, rest, lineno):
self._checkId()
self.u.name = rest
def ignore(self, rest, lineno):
self._checkId()
self.u.ignore = bool(utils.gen.safeEval(rest))
def secure(self, rest, lineno):
self._checkId()
self.u.secure = bool(utils.gen.safeEval(rest))
def hashed(self, rest, lineno):
self._checkId()
self.u.hashed = bool(utils.gen.safeEval(rest))
def password(self, rest, lineno):
self._checkId()
self.u.password = rest
def hostmask(self, rest, lineno):
self._checkId()
self.u.hostmasks.add(rest)
def nicks(self, rest, lineno):
self._checkId()
network, nicks = rest.split(' ', 1)
self.u.nicks[network] = nicks.split(' ')
def capability(self, rest, lineno):
self._checkId()
self.u.capabilities.add(rest)
def gpgkey(self, rest, lineno):
self._checkId()
self.u.gpgkeys.append(rest)
def finish(self):
if self.u.name:
try:
self.users.setUser(self.u)
except DuplicateHostmask:
log.error('Hostmasks for %s collided with another user\'s. '
'Resetting hostmasks for %s.',
self.u.name, self.u.name)
# Some might argue that this is arbitrary, and perhaps it is.
# But we've got to do *something*, so we'll show some deference
# to our lower-numbered users.
self.u.hostmasks.clear()
self.users.setUser(self.u)
IrcUserCreator.u = None
class IrcChannelCreator(Creator):
__slots__ = ('c', 'channels', 'hadChannel')
name = None
def __init__(self, channels):
self.c = IrcChannel()
self.channels = channels
self.hadChannel = bool(self.name)
def channel(self, rest, lineno):
if self.name is not None:
raise ValueError('Unexpected channel command on line %s' % lineno)
IrcChannelCreator.name = rest
def _checkId(self):
if self.name is None:
raise ValueError('Unexpected channel description without channel.')
def lobotomized(self, rest, lineno):
self._checkId()
self.c.lobotomized = bool(utils.gen.safeEval(rest))
def defaultallow(self, rest, lineno):
self._checkId()
self.c.defaultAllow = bool(utils.gen.safeEval(rest))
def capability(self, rest, lineno):
self._checkId()
self.c.capabilities.add(rest)
def ban(self, rest, lineno):
self._checkId()
(pattern, expiration) = rest.split()
self.c.bans[pattern] = int(float(expiration))
def ignore(self, rest, lineno):
self._checkId()
(pattern, expiration) = rest.split()
self.c.ignores[pattern] = int(float(expiration))
def finish(self):
if self.hadChannel:
self.channels.setChannel(self.name, self.c)
IrcChannelCreator.name = None
class IrcNetworkCreator(Creator):
__slots__ = ('net', 'networks')
name = None
def __init__(self, networks):
self.net = IrcNetwork()
self.networks = networks
def network(self, rest, lineno):
IrcNetworkCreator.name = rest
def stspolicy(self, rest, lineno):
(server, stsPolicy) = rest.split()
self.net.addStsPolicy(server, stsPolicy)
def lastdisconnecttime(self, rest, lineno):
(server, when) = rest.split()
when = int(when)
self.net.lastDisconnectTimes[server] = when
def finish(self):
if self.name:
self.networks.setNetwork(self.name, self.net)
self.net = IrcNetwork()
class DuplicateHostmask(ValueError):
pass
class UsersDictionary(utils.IterableMap):
"""A simple serialized-to-file User Database."""
__slots__ = ('noFlush', 'filename', 'users', '_nameCache',
'_hostmaskCache', 'nextId')
def __init__(self):
self.noFlush = False
self.filename = None
self.users = {}
self.nextId = 0
self._nameCache = utils.structures.CacheDict(1000)
self._hostmaskCache = utils.structures.CacheDict(1000)
# This is separate because the Creator has to access our instance.
def open(self, filename):
self.filename = filename
reader = unpreserve.Reader(IrcUserCreator, self)
try:
self.noFlush = True
try:
reader.readFile(filename)
self.noFlush = False
self.flush()
except EnvironmentError as e:
log.error('Invalid user dictionary file, resetting to empty.')
log.error('Exact error: %s', utils.exnToString(e))
except Exception as e:
log.exception('Exact error:')
finally:
self.noFlush = False
def reload(self):
"""Reloads the database from its file."""
if self.filename is not None:
self.nextId = 0
self.users.clear()
self._nameCache.clear()
self._hostmaskCache.clear()
try:
self.open(self.filename)
except EnvironmentError as e:
log.warning('UsersDictionary.reload failed: %s', e)
else:
log.error('UsersDictionary.reload called with no filename.')
def flush(self):
"""Flushes the database to its file."""
if not self.noFlush:
if self.filename is not None:
fd = utils.file.AtomicFile(self.filename)
for (id, u) in sorted(self.users.items()):
fd.write('user %s' % id)
fd.write(os.linesep)
u.preserve(fd, indent=' ')
fd.close()
else:
log.error('UsersDictionary.flush called with no filename.')
else:
log.debug('Not flushing UsersDictionary because of noFlush.')
def close(self):
self.flush()
if self.flush in world.flushers:
world.flushers.remove(self.flush)
self.users.clear()
def items(self):
return self.users.items()
def getUserId(self, s):
"""Returns the user ID of a given name or hostmask."""
if ircutils.isUserHostmask(s):
try:
return self._hostmaskCache[s]
except KeyError:
ids = {}
for (id, user) in self.users.items():
x = user.checkHostmask(s)
if x:
ids[id] = x
if len(ids) == 1:
id = list(ids.keys())[0]
self._hostmaskCache[s] = id
try:
self._hostmaskCache[id].add(s)
except KeyError:
self._hostmaskCache[id] = set([s])
return id
elif len(ids) == 0:
raise KeyError(s)
else:
log.error('Multiple matches found in user database. '
'Removing the offending hostmasks.')
for (id, hostmask) in ids.items():
log.error('Removing %q from user %s.', hostmask, id)
self.users[id].removeHostmask(hostmask)
raise DuplicateHostmask('Ids %r matched.' % ids)
else: # Not a hostmask, must be a name.
s = s.lower()
try:
return self._nameCache[s]
except KeyError:
for (id, user) in self.users.items():
if s == user.name.lower():
self._nameCache[s] = id
self._nameCache[id] = s
return id
else:
raise KeyError(s)
def getUser(self, id):
"""Returns a user given its id, name, or hostmask."""
if not isinstance(id, int):
# Must be a string. Get the UserId first.
id = self.getUserId(id)
u = self.users[id]
while isinstance(u, int):
id = u
u = self.users[id]
u.id = id
return u
def getUserFromNick(self, network, nick):
"""Return a user given its nick."""
for user in self.users.values():
try:
if nick in user.nicks[network]:
return user
except KeyError:
pass
return None
def hasUser(self, id):
"""Returns the database has a user given its id, name, or hostmask."""
try:
self.getUser(id)
return True
except KeyError:
return False
def numUsers(self):
return len(self.users)
def invalidateCache(self, id=None, hostmask=None, name=None):
if hostmask is not None:
if hostmask in self._hostmaskCache:
id = self._hostmaskCache.pop(hostmask)
self._hostmaskCache[id].remove(hostmask)
if not self._hostmaskCache[id]:
del self._hostmaskCache[id]
if name is not None:
del self._nameCache[self._nameCache[id]]
del self._nameCache[id]
if id is not None:
if id in self._nameCache:
del self._nameCache[self._nameCache[id]]
del self._nameCache[id]
if id in self._hostmaskCache:
for hostmask in self._hostmaskCache[id]:
del self._hostmaskCache[hostmask]
del self._hostmaskCache[id]
def setUser(self, user, flush=True):
"""Sets a user (given its id) to the IrcUser given it."""
self.nextId = max(self.nextId, user.id)
try:
if self.getUserId(user.name) != user.id:
raise DuplicateHostmask(user.name, user.name)
except KeyError:
pass
for hostmask in user.hostmasks:
for (i, u) in self.items():
if i == user.id:
continue
elif u.checkHostmask(hostmask):
# We used to remove the hostmask here, but it's not
# appropriate for us both to remove the hostmask and to
# raise an exception. So instead, we'll raise an
# exception, but be nice and give the offending hostmask
# back at the same time.
raise DuplicateHostmask(u.name, hostmask)
for otherHostmask in u.hostmasks:
if ircutils.hostmaskPatternEqual(hostmask, otherHostmask):
raise DuplicateHostmask(u.name, hostmask)
self.invalidateCache(user.id)
self.users[user.id] = user
if flush:
self.flush()
def delUser(self, id):
"""Removes a user from the database."""
del self.users[id]
if id in self._nameCache:
del self._nameCache[self._nameCache[id]]
del self._nameCache[id]
if id in self._hostmaskCache:
for hostmask in list(self._hostmaskCache[id]):
del self._hostmaskCache[hostmask]
del self._hostmaskCache[id]
self.flush()
def newUser(self):
"""Allocates a new user in the database and returns it and its id."""
user = IrcUser(hashed=True)
self.nextId += 1
id = self.nextId
self.users[id] = user
self.flush()
user.id = id
return user
class ChannelsDictionary(utils.IterableMap):
__slots__ = ('noFlush', 'filename', 'channels')
def __init__(self):
self.noFlush = False
self.filename = None
self.channels = ircutils.IrcDict()
def open(self, filename):
self.noFlush = True
try:
self.filename = filename
reader = unpreserve.Reader(IrcChannelCreator, self)
try:
reader.readFile(filename)
self.noFlush = False
self.flush()
except EnvironmentError as e:
log.error('Invalid channel database, resetting to empty.')
log.error('Exact error: %s', utils.exnToString(e))
except Exception as e:
log.error('Invalid channel database, resetting to empty.')
log.exception('Exact error:')
finally:
self.noFlush = False
def flush(self):
"""Flushes the channel database to its file."""
if not self.noFlush:
if self.filename is not None:
fd = utils.file.AtomicFile(self.filename)
for (channel, c) in sorted(self.channels.items()):
fd.write('channel %s' % channel)
fd.write(os.linesep)
c.preserve(fd, indent=' ')
fd.close()
else:
log.warning('ChannelsDictionary.flush without self.filename.')
else:
log.debug('Not flushing ChannelsDictionary because of noFlush.')
def close(self):
self.flush()
if self.flush in world.flushers:
world.flushers.remove(self.flush)
self.channels.clear()
def reload(self):
"""Reloads the channel database from its file."""
if self.filename is not None:
self.channels.clear()
try:
self.open(self.filename)
except EnvironmentError as e:
log.warning('ChannelsDictionary.reload failed: %s', e)
else:
log.warning('ChannelsDictionary.reload without self.filename.')
def getChannel(self, channel):
"""Returns an IrcChannel object for the given channel."""
channel = channel.lower()
if channel in self.channels:
return self.channels[channel]
else:
c = IrcChannel()
self.channels[channel] = c
return c
def setChannel(self, channel, ircChannel):
"""Sets a given channel to the IrcChannel object given."""
channel = channel.lower()
self.channels[channel] = ircChannel
self.flush()
def items(self):
return self.channels.items()
class NetworksDictionary(utils.IterableMap):
__slots__ = ('noFlush', 'filename', 'networks')
def __init__(self):
self.noFlush = False
self.filename = None
self.networks = ircutils.IrcDict()
def open(self, filename):
self.noFlush = True
try:
self.filename = filename
reader = unpreserve.Reader(IrcNetworkCreator, self)
try:
reader.readFile(filename)
self.noFlush = False
self.flush()
except EnvironmentError as e:
log.error('Invalid network database, resetting to empty.')
log.error('Exact error: %s', utils.exnToString(e))
except Exception as e:
log.error('Invalid network database, resetting to empty.')
log.exception('Exact error:')
finally:
self.noFlush = False
def flush(self):
"""Flushes the network database to its file."""
if not self.noFlush:
if self.filename is not None:
fd = utils.file.AtomicFile(self.filename)
for (network, net) in sorted(self.networks.items()):
fd.write('network %s' % network)
fd.write(os.linesep)
net.preserve(fd, indent=' ')
fd.close()
else:
log.warning('NetworksDictionary.flush without self.filename.')
else:
log.debug('Not flushing NetworksDictionary because of noFlush.')
def close(self):
self.flush()
if self.flush in world.flushers:
world.flushers.remove(self.flush)
self.networks.clear()
def reload(self):
"""Reloads the network database from its file."""
if self.filename is not None:
self.networks.clear()
try:
self.open(self.filename)
except EnvironmentError as e:
log.warning('NetworksDictionary.reload failed: %s', e)
else:
log.warning('NetworksDictionary.reload without self.filename.')
def getNetwork(self, network):
"""Returns an IrcNetwork object for the given network."""
network = network.lower()
if network in self.networks:
return self.networks[network]
else:
c = IrcNetwork()
self.networks[network] = c
return c
def setNetwork(self, network, ircNetwork):
"""Sets a given network to the IrcNetwork object given."""
network = network.lower()
self.networks[network] = ircNetwork
self.flush()
def items(self):
return self.networks.items()
class IgnoresDB(object):
__slots__ = ('filename', 'hostmasks')
def __init__(self):
self.filename = None
self.hostmasks = ircutils.ExpiringHostmaskDict()
def open(self, filename):
self.filename = filename
fd = open(self.filename)
for line in utils.file.nonCommentNonEmptyLines(fd):
try:
line = line.rstrip('\r\n')
L = line.split()
hostmask = L.pop(0)
if L:
expiration = int(float(L.pop(0)))
else:
expiration = 0
self.add(hostmask, expiration)
except Exception:
log.error('Invalid line in ignores database: %q', line)
fd.close()
def flush(self):
if self.filename is not None:
fd = utils.file.AtomicFile(self.filename)
now = time.time()
for (hostmask, expiration) in self.hostmasks.items():
if now < expiration or not expiration:
fd.write('%s %s' % (hostmask, expiration))
fd.write(os.linesep)
fd.close()
else:
log.warning('IgnoresDB.flush called without self.filename.')
def close(self):
if self.flush in world.flushers:
world.flushers.remove(self.flush)
self.flush()
self.hostmasks.clear()
def reload(self):
if self.filename is not None:
oldhostmasks = list(self.hostmasks.items())
self.hostmasks.clear()
try:
self.open(self.filename)
except EnvironmentError as e:
log.warning('IgnoresDB.reload failed: %s', e)
# Let's be somewhat transactional.
for (hostmask, expiration) in oldhostmasks:
self.hostmasks.add(hostmask, expiration)
else:
log.warning('IgnoresDB.reload called without self.filename.')
def checkIgnored(self, prefix):
return bool(self.hostmasks.match(prefix))
def add(self, hostmask, expiration=0):
assert ircutils.isUserHostmask(hostmask), 'got %s' % hostmask
self.hostmasks[hostmask] = expiration
def remove(self, hostmask):
del self.hostmasks[hostmask]
confDir = conf.supybot.directories.conf()
try:
userFile = os.path.join(confDir, conf.supybot.databases.users.filename())
users = UsersDictionary()
users.open(userFile)
except EnvironmentError as e:
log.warning('Couldn\'t open user database: %s', e)
try:
channelFile = os.path.join(confDir,
conf.supybot.databases.channels.filename())
channels = ChannelsDictionary()
channels.open(channelFile)
except EnvironmentError as e:
log.warning('Couldn\'t open channel database: %s', e)
try:
networkFile = os.path.join(confDir,
conf.supybot.databases.networks.filename())
networks = NetworksDictionary()
networks.open(networkFile)
except EnvironmentError as e:
log.warning('Couldn\'t open network database: %s', e)
try:
ignoreFile = os.path.join(confDir,
conf.supybot.databases.ignores.filename())
ignores = IgnoresDB()
ignores.open(ignoreFile)
except EnvironmentError as e:
log.warning('Couldn\'t open ignore database: %s', e)
world.flushers.append(users.flush)
world.flushers.append(channels.flush)
world.flushers.append(networks.flush)
world.flushers.append(ignores.flush)
###
# Useful functions for checking credentials.
###
def checkIgnored(hostmask, recipient='', users=users, channels=channels):
"""checkIgnored(hostmask, recipient='') -> True/False
Checks if the user is ignored by the recipient of the message.
"""
try:
id = users.getUserId(hostmask)
user = users.getUser(id)
except KeyError:
# If there's no user...
if conf.supybot.defaultIgnore():
log.debug('Ignoring %s due to conf.supybot.defaultIgnore',
hostmask)
return True
else:
try:
if user._checkCapability('trusted'):
# Trusted users (including owners) shouldn't ever be ignored.
return False
except KeyError:
# neither explicitly trusted or -trusted -> consider them not
# trusted
pass
if user.ignore:
log.debug('Ignoring %s due to their IrcUser ignore flag.', hostmask)
return True
if ignores.checkIgnored(hostmask):
log.debug('Ignoring %s due to ignore database.', hostmask)
return True
if ircutils.isChannel(recipient):
channel = channels.getChannel(recipient)
if channel.checkIgnored(hostmask):
log.debug('Ignoring %s due to the channel ignores.', hostmask)
return True
return False
def _x(capability, ret):
if isAntiCapability(capability):
return not ret
else:
return ret
def _checkCapabilityForUnknownUser(capability, users=users, channels=channels,
ignoreDefaultAllow=False):
if isChannelCapability(capability):
(channel, capability) = fromChannelCapability(capability)
try:
c = channels.getChannel(channel)
if capability in c.capabilities:
return c._checkCapability(capability)
else:
return _x(capability, (not ignoreDefaultAllow) and c.defaultAllow)
except KeyError:
pass
defaultCapabilities = conf.supybot.capabilities()
if capability in defaultCapabilities:
return defaultCapabilities.check(capability)
elif ignoreDefaultAllow:
return _x(capability, False)
else:
return _x(capability, conf.supybot.capabilities.default())
def checkCapability(hostmask, capability, users=users, channels=channels,
ignoreOwner=False, ignoreChannelOp=False,
ignoreDefaultAllow=False):
"""Checks that the user specified by name/hostmask has the capability given.
``users`` and ``channels`` default to ``ircdb.users`` and
``ircdb.channels``.
``ignoreOwner``, ``ignoreChannelOp``, and ``ignoreDefaultAllow`` are
used to override default behavior of the capability system in special
cases (actually, in the AutoMode plugin):
* ``ignoreOwner`` disables the behavior "owners have all capabilites"
* ``ignoreChannelOp`` disables the behavior "channel ops have all
channel capabilities"
* ``ignoreDefaultAllow`` disables the behavior "if a user does not have
a capability or the associated anticapability, then they have the
capability"
"""
if world.testing and (not isinstance(hostmask, str) or
'@' not in hostmask or
'__no_testcap__' not in hostmask.split('@')[1]):
return _x(capability, True)
try:
u = users.getUser(hostmask)
if u.secure and not u.checkHostmask(hostmask, useAuth=False):
raise KeyError
except KeyError:
# Raised when no hostmasks match.
return _checkCapabilityForUnknownUser(capability, users=users,
channels=channels, ignoreDefaultAllow=ignoreDefaultAllow)
except ValueError as e:
# Raised when multiple hostmasks match.
log.warning('%s: %s', hostmask, e)
return _checkCapabilityForUnknownUser(capability, users=users,
channels=channels, ignoreDefaultAllow=ignoreDefaultAllow)
if capability in u.capabilities:
try:
return u._checkCapability(capability, ignoreOwner)
except KeyError:
pass
if isChannelCapability(capability):
(channel, capability) = fromChannelCapability(capability)
if not ignoreChannelOp:
try:
chanop = makeChannelCapability(channel, 'op')
if u._checkCapability(chanop):
return _x(capability, True)
except KeyError:
pass
c = channels.getChannel(channel)
if capability in c.capabilities:
return c._checkCapability(capability)
elif not ignoreDefaultAllow:
return _x(capability, c.defaultAllow)
else:
return False
defaultCapabilities = conf.supybot.capabilities()
defaultCapabilitiesRegistered = conf.supybot.capabilities.registeredUsers()
if capability in defaultCapabilities:
return defaultCapabilities.check(capability)
elif capability in defaultCapabilitiesRegistered:
return defaultCapabilitiesRegistered.check(capability)
elif ignoreDefaultAllow:
return _x(capability, False)
else:
return _x(capability, conf.supybot.capabilities.default())
def checkCapabilities(hostmask, capabilities, requireAll=False):
"""Checks that a user has capabilities in a list.
requireAll is True if *all* capabilities in the list must be had, False if
*any* of the capabilities in the list must be had.
"""
for capability in capabilities:
if requireAll:
if not checkCapability(hostmask, capability):
return False
else:
if checkCapability(hostmask, capability):
return True
return requireAll
###
# supybot.capabilities
###
class SpaceSeparatedListOfCapabilities(registry.SpaceSeparatedListOfStrings):
__slots__ = ()
List = CapabilitySet
class DefaultCapabilities(SpaceSeparatedListOfCapabilities):
__slots__ = ()
# We use a keyword argument trick here to prevent eval'ing of code that
# changes allowDefaultOwner from affecting this. It's not perfect, but
# it's still an improvement, raising the bar for potential crackers.
def setValue(self, v, allowDefaultOwner=conf.allowDefaultOwner):
registry.SpaceSeparatedListOfStrings.setValue(self, v)
if '-owner' not in self.value and not allowDefaultOwner:
print('*** You must run supybot with the --allow-default-owner')
print('*** option in order to allow a default capability of owner.')
print('*** Don\'t do that, it\'s dumb.')
self.value.add('-owner')
conf.registerGlobalValue(conf.supybot, 'capabilities',
DefaultCapabilities([
'-owner', '-admin', '-trusted',
'-aka.add', '-aka.set', '-aka.remove',
'-alias.add', '-alias.remove',
'-scheduler.add', '-scheduler.remove',
'-scheduler.repeat',
],
"""These are the
capabilities that are given to everyone by default. If they are normal
capabilities, then the user will have to have the appropriate
anti-capability if you want to override these capabilities; if they are
anti-capabilities, then the user will have to have the actual capability
to override these capabilities. See docs/CAPABILITIES if you don't
understand why these default to what they do."""))
conf.registerGlobalValue(conf.supybot.capabilities, 'registeredUsers',
SpaceSeparatedListOfCapabilities([], """These are the
capabilities that are given to every authenticated user by default.
You probably want to use supybot.capabilities instead, to give these
capabilities both to registered and non-registered users."""))
conf.registerGlobalValue(conf.supybot.capabilities, 'default',
registry.Boolean(True, """Determines whether the bot by default will allow
users to have a capability. If this is disabled, a user must explicitly
have the capability for whatever command they wish to run.
To set this in a channel-specific way, use the 'channel capability
setdefault' command."""))
conf.registerGlobalValue(conf.supybot.capabilities, 'private',
registry.SpaceSeparatedListOfStrings([], """Determines what capabilities
the bot will never tell to a non-admin whether or not a user has them."""))
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79: