3
0
mirror of https://github.com/jlu5/PyLink.git synced 2025-01-11 12:42:34 +01:00

relay: add locks in db read/writes (thread safety)

This commit is contained in:
James Lu 2016-12-09 17:43:50 -08:00
parent e40b2f6529
commit 2b4943a780

View File

@ -14,6 +14,7 @@ relayusers = defaultdict(dict)
relayservers = defaultdict(dict)
spawnlocks = defaultdict(threading.RLock)
spawnlocks_servers = defaultdict(threading.RLock)
db_lock = threading.RLock()
dbname = utils.getDatabaseName('pylinkrelay')
datastore = structures.PickleDataStore('pylinkrelay', dbname)
@ -34,15 +35,16 @@ def initializeAll(irc):
# which would break connections.
world.started.wait(2)
for chanpair, entrydata in db.items():
# Iterate over all the channels stored in our relay links DB.
network, channel = chanpair
with db_lock:
for chanpair, entrydata in db.items():
# Iterate over all the channels stored in our relay links DB.
network, channel = chanpair
# Initialize each relay channel on their home network, and on every linked one too.
initializeChannel(irc, channel)
for link in entrydata['links']:
network, channel = link
# Initialize each relay channel on their home network, and on every linked one too.
initializeChannel(irc, channel)
for link in entrydata['links']:
network, channel = link
initializeChannel(irc, channel)
def main(irc=None):
"""Main function, called during plugin loading at start."""
@ -410,12 +412,13 @@ def getOrigUser(irc, user, targetirc=None):
def getRelay(chanpair):
"""Finds the matching relay entry name for the given (network name, channel)
pair, if one exists."""
if chanpair in db: # This chanpair is a shared channel; others link to it
return chanpair
# This chanpair is linked *to* a remote channel
for name, dbentry in db.items():
if chanpair in dbentry['links']:
return name
with db_lock:
if chanpair in db: # This chanpair is a shared channel; others link to it
return chanpair
# This chanpair is linked *to* a remote channel
for name, dbentry in db.items():
if chanpair in dbentry['links']:
return name
def getRemoteChan(irc, remoteirc, channel):
"""Returns the linked channel name for the given channel on remoteirc,
@ -428,9 +431,10 @@ def getRemoteChan(irc, remoteirc, channel):
if chanpair[0] == remotenetname:
return chanpair[1]
else:
for link in db[chanpair]['links']:
if link[0] == remotenetname:
return link[1]
with db_lock:
for link in db[chanpair]['links']:
if link[0] == remotenetname:
return link[1]
def initializeChannel(irc, channel):
"""Initializes a relay channel (merge local/remote users, set modes, etc.)."""
@ -441,8 +445,9 @@ def initializeChannel(irc, channel):
log.debug('(%s) relay.initializeChannel: relay pair found to be %s', irc.name, relay)
queued_users = []
if relay:
all_links = db[relay]['links'].copy()
all_links.update((relay,))
with db_lock:
all_links = db[relay]['links'].copy()
all_links.update((relay,))
log.debug('(%s) relay.initializeChannel: all_links: %s', irc.name, all_links)
# Iterate over all the remote channels linked in this relay.
@ -524,12 +529,14 @@ def checkClaim(irc, channel, sender, chanobj=None):
sender_modes = getPrefixModes(irc, irc, channel, sender, mlist=mlist)
log.debug('(%s) relay.checkClaim: sender modes (%s/%s) are %s (mlist=%s)', irc.name,
sender, channel, sender_modes, mlist)
# XXX: stop hardcoding modes to check for and support mlist in isHalfopPlus and friends
return (not relay) or irc.name == relay[0] or not db[relay]['claim'] or \
irc.name in db[relay]['claim'] or \
any([mode in sender_modes for mode in ('y', 'q', 'a', 'o', 'h')]) \
or irc.isInternalClient(sender) or \
irc.isInternalServer(sender)
with db_lock:
return (not relay) or irc.name == relay[0] or not db[relay]['claim'] or \
irc.name in db[relay]['claim'] or \
any([mode in sender_modes for mode in ('y', 'q', 'a', 'o', 'h')]) \
or irc.isInternalClient(sender) or \
irc.isInternalServer(sender)
def getSupportedUmodes(irc, remoteirc, modes):
"""Given a list of user modes, filters out all of those not supported by the
@ -1542,9 +1549,10 @@ def create(irc, source, args):
creator = irc.getHostmask(source)
# Create the relay database entry with the (network name, channel name)
# pair - this is just a dict with various keys.
db[(irc.name, channel)] = {'claim': [irc.name], 'links': set(),
'blocked_nets': set(), 'creator': creator,
'ts': time.time()}
with db_lock:
db[(irc.name, channel)] = {'claim': [irc.name], 'links': set(),
'blocked_nets': set(), 'creator': creator,
'ts': time.time()}
log.info('(%s) relay: Channel %s created by %s.', irc.name, channel, creator)
initializeChannel(irc, channel)
irc.reply('Done.')
@ -1554,9 +1562,10 @@ def _stop_relay(entry):
"""Internal function to deinitialize a relay link and its leaves."""
network, channel = entry
# Iterate over all the channel links and deinitialize them.
for link in db[entry]['links']:
removeChannel(world.networkobjects.get(link[0]), link[1])
removeChannel(world.networkobjects.get(network), channel)
with db_lock:
for link in db[entry]['links']:
removeChannel(world.networkobjects.get(link[0]), link[1])
removeChannel(world.networkobjects.get(network), channel)
def destroy(irc, source, args):
"""[<home network>] <channel>
@ -1586,17 +1595,18 @@ def destroy(irc, source, args):
entry = (network, channel)
if entry in db:
_stop_relay(entry)
del db[entry]
with db_lock:
if entry in db:
_stop_relay(entry)
del db[entry]
log.info('(%s) relay: Channel %s destroyed by %s.', irc.name,
channel, irc.getHostmask(source))
irc.reply('Done.')
else:
irc.error("No such channel %r exists. If you're trying to delink a channel from "
"another network, use the DESTROY command." % channel)
return
log.info('(%s) relay: Channel %s destroyed by %s.', irc.name,
channel, irc.getHostmask(source))
irc.reply('Done.')
else:
irc.error("No such channel %r exists. If you're trying to delink a channel from "
"another network, use the DESTROY command." % channel)
return
destroy = utils.add_cmd(destroy, featured=True)
@utils.add_cmd
@ -1613,20 +1623,20 @@ def purge(irc, source, args):
count = 0
### XXX lock to make this thread safe!
for entry in db.copy():
# Entry was owned by the target network; remove it
if entry[0] == network:
count += 1
_stop_relay(entry)
del db[entry]
else:
# Drop leaf channels involving the target network
for link in db[entry]['links'].copy():
if link[0] == network:
count += 1
removeChannel(world.networkobjects.get(network), link[1])
db[entry]['links'].remove(link)
with db_lock:
for entry in db.copy():
# Entry was owned by the target network; remove it
if entry[0] == network:
count += 1
_stop_relay(entry)
del db[entry]
else:
# Drop leaf channels involving the target network
for link in db[entry]['links'].copy():
if link[0] == network:
count += 1
removeChannel(world.networkobjects.get(network), link[1])
db[entry]['links'].remove(link)
irc.reply("Done. Purged %s entries involving the network %s." % (count, network))
@ -1686,7 +1696,8 @@ def link(irc, source, args):
return
try:
entry = db[(remotenet, channel)]
with db_lock:
entry = db[(remotenet, channel)]
except KeyError:
irc.error('No such relay %r exists.' % channel)
return
@ -1747,13 +1758,15 @@ def delink(irc, source, args):
"network).")
return
else:
for link in db[entry]['links'].copy():
if link[0] == remotenet:
removeChannel(world.networkobjects.get(remotenet), link[1])
db[entry]['links'].remove(link)
with db_lock:
for link in db[entry]['links'].copy():
if link[0] == remotenet:
removeChannel(world.networkobjects.get(remotenet), link[1])
db[entry]['links'].remove(link)
else:
removeChannel(irc, channel)
db[entry]['links'].remove((irc.name, channel))
with db_lock:
db[entry]['links'].remove((irc.name, channel))
irc.reply('Done.')
log.info('(%s) relay: Channel %s delinked from %s%s by %s.', irc.name,
channel, entry[0], entry[1], irc.getHostmask(source))
@ -1788,58 +1801,59 @@ def linked(irc, source, args):
irc.reply("Showing channels linked to %s:" % net, private=True)
# Sort the list of shared channels when displaying
for k, v in sorted(db.items()):
with db_lock:
for k, v in sorted(db.items()):
# Skip if we're filtering by network and the network given isn't relayed
# to the channel.
if net and not (net == k[0] or net in [link[0] for link in v['links']]):
continue
# Skip if we're filtering by network and the network given isn't relayed
# to the channel.
if net and not (net == k[0] or net in [link[0] for link in v['links']]):
continue
# Bold each network/channel name pair
s = '\x02%s%s\x02 ' % k
remoteirc = world.networkobjects.get(k[0])
channel = k[1] # Get the channel name from the network/channel pair
# Bold each network/channel name pair
s = '\x02%s%s\x02 ' % k
remoteirc = world.networkobjects.get(k[0])
channel = k[1] # Get the channel name from the network/channel pair
if remoteirc and channel in remoteirc.channels:
c = remoteirc.channels[channel]
if ('s', None) in c.modes or ('p', None) in c.modes:
# Only show secret channels to opers or those in the channel, and tag them as
# [secret].
localchan = getRemoteChan(remoteirc, irc, channel)
if irc.isOper(source) or (localchan and source in irc.channels[localchan].users):
s += '\x02[secret]\x02 '
else:
continue
if remoteirc and channel in remoteirc.channels:
c = remoteirc.channels[channel]
if ('s', None) in c.modes or ('p', None) in c.modes:
# Only show secret channels to opers or those in the channel, and tag them as
# [secret].
localchan = getRemoteChan(remoteirc, irc, channel)
if irc.isOper(source) or (localchan and source in irc.channels[localchan].users):
s += '\x02[secret]\x02 '
else:
continue
if v['links']:
# Sort, join up and output all the linked channel names. Silently drop
# entries for disconnected networks.
s += ' '.join([''.join(link) for link in sorted(v['links']) if link[0] in world.networkobjects
and world.networkobjects[link[0]].connected.is_set()])
if v['links']:
# Sort, join up and output all the linked channel names. Silently drop
# entries for disconnected networks.
s += ' '.join([''.join(link) for link in sorted(v['links']) if link[0] in world.networkobjects
and world.networkobjects[link[0]].connected.is_set()])
else: # Unless it's empty; then, well... just say no relays yet.
s += '(no relays yet)'
else: # Unless it's empty; then, well... just say no relays yet.
s += '(no relays yet)'
irc.reply(s, private=True)
irc.reply(s, private=True)
if irc.isOper(source):
s = ''
if irc.isOper(source):
s = ''
# If the caller is an oper, we can show the hostmasks of people
# that created all the available channels (Janus does this too!!)
creator = v.get('creator')
if creator:
# But only if the value actually exists (old DBs will have it
# missing).
s += ' by \x02%s\x02' % creator
# If the caller is an oper, we can show the hostmasks of people
# that created all the available channels (Janus does this too!!)
creator = v.get('creator')
if creator:
# But only if the value actually exists (old DBs will have it
# missing).
s += ' by \x02%s\x02' % creator
# Ditto for creation date
ts = v.get('ts')
if ts:
s += ' on %s' % time.ctime(ts)
# Ditto for creation date
ts = v.get('ts')
if ts:
s += ' on %s' % time.ctime(ts)
if s: # Indent to make the list look nicer
irc.reply(' Channel created%s.' % s, private=True)
if s: # Indent to make the list look nicer
irc.reply(' Channel created%s.' % s, private=True)
linked = utils.add_cmd(linked, featured=True)
@utils.add_cmd
@ -1864,30 +1878,32 @@ def linkacl(irc, source, args):
if not relay:
irc.error('No such relay %r exists.' % channel)
return
if cmd == 'list':
permissions.checkPermissions(irc, source, ['relay.linkacl.view'])
s = 'Blocked networks for \x02%s\x02: \x02%s\x02' % (channel, ', '.join(db[relay]['blocked_nets']) or '(empty)')
irc.reply(s)
return
permissions.checkPermissions(irc, source, ['relay.linkacl'])
try:
remotenet = args[2]
except IndexError:
irc.error(missingargs)
return
if cmd == 'deny':
db[relay]['blocked_nets'].add(remotenet)
irc.reply('Done.')
elif cmd == 'allow':
with db_lock:
if cmd == 'list':
permissions.checkPermissions(irc, source, ['relay.linkacl.view'])
s = 'Blocked networks for \x02%s\x02: \x02%s\x02' % (channel, ', '.join(db[relay]['blocked_nets']) or '(empty)')
irc.reply(s)
return
permissions.checkPermissions(irc, source, ['relay.linkacl'])
try:
db[relay]['blocked_nets'].remove(remotenet)
except KeyError:
irc.error('Network %r is not on the blacklist for %r.' % (remotenet, channel))
else:
remotenet = args[2]
except IndexError:
irc.error(missingargs)
return
if cmd == 'deny':
db[relay]['blocked_nets'].add(remotenet)
irc.reply('Done.')
else:
irc.error('Unknown subcommand %r: valid ones are ALLOW, DENY, and LIST.' % cmd)
elif cmd == 'allow':
try:
db[relay]['blocked_nets'].remove(remotenet)
except KeyError:
irc.error('Network %r is not on the blacklist for %r.' % (remotenet, channel))
else:
irc.reply('Done.')
else:
irc.error('Unknown subcommand %r: valid ones are ALLOW, DENY, and LIST.' % cmd)
@utils.add_cmd
def showuser(irc, source, args):
@ -1982,20 +1998,21 @@ def claim(irc, source, args):
# We override getRelay() here to limit the search to the current network.
relay = (irc.name, channel)
if relay not in db:
irc.error('No such relay %r exists.' % channel)
return
claimed = db[relay]["claim"]
try:
nets = args[1].strip()
except IndexError: # No networks given.
irc.reply('Channel \x02%s\x02 is claimed by: %s' %
(channel, ', '.join(claimed) or '\x1D(none)\x1D'))
else:
if nets == '-' or not nets:
claimed = set()
with db_lock:
if relay not in db:
irc.error('No such relay %r exists.' % channel)
return
claimed = db[relay]["claim"]
try:
nets = args[1].strip()
except IndexError: # No networks given.
irc.reply('Channel \x02%s\x02 is claimed by: %s' %
(channel, ', '.join(claimed) or '\x1D(none)\x1D'))
else:
claimed = set(nets.split(','))
db[relay]["claim"] = claimed
irc.reply('CLAIM for channel \x02%s\x02 set to: %s' %
(channel, ', '.join(claimed) or '\x1D(none)\x1D'))
if nets == '-' or not nets:
claimed = set()
else:
claimed = set(nets.split(','))
db[relay]["claim"] = claimed
irc.reply('CLAIM for channel \x02%s\x02 set to: %s' %
(channel, ', '.join(claimed) or '\x1D(none)\x1D'))