From d51c39935118223a3e7087c821af50e52ff152fd Mon Sep 17 00:00:00 2001 From: James Lu Date: Thu, 4 May 2017 18:50:42 -0700 Subject: [PATCH] Revert "relay: add locks in db read/writes (thread safety)" Unfortunately, this made relay prone to freezing the entire PyLink server. This reverts commit 2b4943a7801149f27d48cd798d7c5fe31cdcace5. --- plugins/relay.py | 320 ++++++++++++++++++++++------------------------- 1 file changed, 148 insertions(+), 172 deletions(-) diff --git a/plugins/relay.py b/plugins/relay.py index 7bb5481..f82469a 100644 --- a/plugins/relay.py +++ b/plugins/relay.py @@ -13,7 +13,6 @@ relayusers = defaultdict(dict) relayservers = defaultdict(dict) spawnlocks = defaultdict(threading.RLock) spawnlocks_servers = defaultdict(threading.RLock) -db_lock = threading.RLock() dbname = utils.getDatabaseName('pylinkrelay') datastore = structures.PickleDataStore('pylinkrelay', dbname) @@ -34,16 +33,14 @@ def initialize_all(irc): # which would break connections. world.started.wait(2) - with db_lock: - for chanpair, entrydata in db.items(): - # Iterate over all the channels stored in our relay links DB. - network, channel = chanpair - - # Initialize each relay channel on their home network, and on every linked one too. + for chanpair, entrydata in db.items(): + # Iterate over all the channels stored in our relay links DB. + network, channel = chanpair + # Initialize each relay channel on their home network, and on every linked one too. + initialize_channel(irc, channel) + for link in entrydata['links']: + network, channel = link initialize_channel(irc, channel) - for link in entrydata['links']: - network, channel = link - initialize_channel(irc, channel) def main(irc=None): """Main function, called during plugin loading at start.""" @@ -414,13 +411,12 @@ def get_orig_user(irc, user, targetirc=None): def get_relay(chanpair): """Finds the matching relay entry name for the given (network name, channel) pair, if one exists.""" - with db_lock: - if chanpair in db: # This chanpair is a shared channel; others link to it - return chanpair - # This chanpair is linked *to* a remote channel - for name, dbentry in db.items(): - if chanpair in dbentry['links']: - return name + if chanpair in db: # This chanpair is a shared channel; others link to it + return chanpair + # This chanpair is linked *to* a remote channel + for name, dbentry in db.items(): + if chanpair in dbentry['links']: + return name def get_remote_channel(irc, remoteirc, channel): """Returns the linked channel name for the given channel on remoteirc, @@ -433,10 +429,9 @@ def get_remote_channel(irc, remoteirc, channel): if chanpair[0] == remotenetname: return chanpair[1] else: - with db_lock: - for link in db[chanpair]['links']: - if link[0] == remotenetname: - return link[1] + for link in db[chanpair]['links']: + if link[0] == remotenetname: + return link[1] def initialize_channel(irc, channel): """Initializes a relay channel (merge local/remote users, set modes, etc.).""" @@ -447,9 +442,8 @@ def initialize_channel(irc, channel): log.debug('(%s) relay.initialize_channel: relay pair found to be %s', irc.name, relay) queued_users = [] if relay: - with db_lock: - all_links = db[relay]['links'].copy() - all_links.update((relay,)) + all_links = db[relay]['links'].copy() + all_links.update((relay,)) log.debug('(%s) relay.initialize_channel: all_links: %s', irc.name, all_links) # Iterate over all the remote channels linked in this relay. @@ -531,14 +525,12 @@ def check_claim(irc, channel, sender, chanobj=None): sender_modes = get_prefix_modes(irc, irc, channel, sender, mlist=mlist) log.debug('(%s) relay.check_claim: sender modes (%s/%s) are %s (mlist=%s)', irc.name, sender, channel, sender_modes, mlist) - # XXX: stop hardcoding modes to check for and support mlist in isHalfopPlus and friends - with db_lock: - return (not relay) or irc.name == relay[0] or not db[relay]['claim'] or \ - irc.name in db[relay]['claim'] or \ - any([mode in sender_modes for mode in ('y', 'q', 'a', 'o', 'h')]) \ - or irc.isInternalClient(sender) or \ - irc.isInternalServer(sender) + return (not relay) or irc.name == relay[0] or not db[relay]['claim'] or \ + irc.name in db[relay]['claim'] or \ + any([mode in sender_modes for mode in ('y', 'q', 'a', 'o', 'h')]) \ + or irc.isInternalClient(sender) or \ + irc.isInternalServer(sender) def get_supported_umodes(irc, remoteirc, modes): """Given a list of user modes, filters out all of those not supported by the @@ -1511,19 +1503,18 @@ def handle_disconnect(irc, numeric, command, args): announcement = conf.conf.get('relay', {}).get('disconnect_announcement') log.debug('(%s) relay: last connection successful: %s', irc.name, args.get('was_successful')) if announcement and args.get('was_successful'): - with db_lock: - for chanpair, entrydata in db.items(): - log.debug('(%s) relay: Looking up %s', irc.name, chanpair) - if chanpair[0] == irc.name: - for leaf in entrydata['links']: - log.debug('(%s) relay: Announcing disconnect to %s%s', irc.name, - leaf[0], leaf[1]) - remoteirc = world.networkobjects.get(leaf[0]) - if remoteirc and remoteirc.connected.is_set(): - text = string.Template(announcement).safe_substitute( - {'homenetwork': irc.name, 'homechannel': chanpair[1], - 'network': remoteirc.name, 'channel': leaf[1]}) - remoteirc.msg(leaf[1], text, loopback=False) + for chanpair, entrydata in db.items(): + log.debug('(%s) relay: Looking up %s', irc.name, chanpair) + if chanpair[0] == irc.name: + for leaf in entrydata['links']: + log.debug('(%s) relay: Announcing disconnect to %s%s', irc.name, + leaf[0], leaf[1]) + remoteirc = world.networkobjects.get(leaf[0]) + if remoteirc and remoteirc.connected.is_set(): + text = string.Template(announcement).safe_substitute( + {'homenetwork': irc.name, 'homechannel': chanpair[1], + 'network': remoteirc.name, 'channel': leaf[1]}) + remoteirc.msg(leaf[1], text, loopback=False) utils.add_hook(handle_disconnect, "PYLINK_DISCONNECT") @@ -1601,10 +1592,9 @@ def create(irc, source, args): creator = irc.getHostmask(source) # Create the relay database entry with the (network name, channel name) # pair - this is just a dict with various keys. - with db_lock: - db[(irc.name, channel)] = {'claim': [irc.name], 'links': set(), - 'blocked_nets': set(), 'creator': creator, - 'ts': time.time()} + db[(irc.name, channel)] = {'claim': [irc.name], 'links': set(), + 'blocked_nets': set(), 'creator': creator, + 'ts': time.time()} log.info('(%s) relay: Channel %s created by %s.', irc.name, channel, creator) initialize_channel(irc, channel) irc.reply('Done.') @@ -1614,10 +1604,9 @@ def stop_relay(entry): """Internal function to deinitialize a relay link and its leaves.""" network, channel = entry # Iterate over all the channel links and deinitialize them. - with db_lock: - for link in db[entry]['links']: - remove_channel(world.networkobjects.get(link[0]), link[1]) - remove_channel(world.networkobjects.get(network), channel) + for link in db[entry]['links']: + remove_channel(world.networkobjects.get(link[0]), link[1]) + remove_channel(world.networkobjects.get(network), channel) def destroy(irc, source, args): """[] @@ -1646,19 +1635,17 @@ def destroy(irc, source, args): permissions.checkPermissions(irc, source, ['relay.destroy.remote']) entry = (network, channel) + if entry in db: + stop_relay(entry) + del db[entry] - with db_lock: - if entry in db: - stop_relay(entry) - del db[entry] - - log.info('(%s) relay: Channel %s destroyed by %s.', irc.name, - channel, irc.getHostmask(source)) - irc.reply('Done.') - else: - irc.error("No such channel %r exists. If you're trying to delink a channel from " - "another network, use the DESTROY command." % channel) - return + log.info('(%s) relay: Channel %s destroyed by %s.', irc.name, + channel, irc.getHostmask(source)) + irc.reply('Done.') + else: + irc.error("No such channel %r exists. If you're trying to delink a channel from " + "another network, use the DESTROY command." % channel) + return destroy = utils.add_cmd(destroy, featured=True) @utils.add_cmd @@ -1675,20 +1662,19 @@ def purge(irc, source, args): count = 0 - with db_lock: - for entry in db.copy(): - # Entry was owned by the target network; remove it - if entry[0] == network: - count += 1 - stop_relay(entry) - del db[entry] - else: - # Drop leaf channels involving the target network - for link in db[entry]['links'].copy(): - if link[0] == network: - count += 1 - remove_channel(world.networkobjects.get(network), link[1]) - db[entry]['links'].remove(link) + for entry in db.copy(): + # Entry was owned by the target network; remove it + if entry[0] == network: + count += 1 + stop_relay(entry) + del db[entry] + else: + # Drop leaf channels involving the target network + for link in db[entry]['links'].copy(): + if link[0] == network: + count += 1 + remove_channel(world.networkobjects.get(network), link[1]) + db[entry]['links'].remove(link) irc.reply("Done. Purged %s entries involving the network %s." % (count, network)) @@ -1751,8 +1737,7 @@ def link(irc, source, args): return try: - with db_lock: - entry = db[(remotenet, channel)] + entry = db[(remotenet, channel)] except KeyError: irc.error('No such relay %r exists.' % args.channel) return @@ -1826,15 +1811,13 @@ def delink(irc, source, args): "network).") return else: - with db_lock: - for link in db[entry]['links'].copy(): - if link[0] == remotenet: - remove_channel(world.networkobjects.get(remotenet), link[1]) - db[entry]['links'].remove(link) + for link in db[entry]['links'].copy(): + if link[0] == remotenet: + remove_channel(world.networkobjects.get(remotenet), link[1]) + db[entry]['links'].remove(link) else: remove_channel(irc, channel) - with db_lock: - db[entry]['links'].remove((irc.name, channel)) + db[entry]['links'].remove((irc.name, channel)) irc.reply('Done.') log.info('(%s) relay: Channel %s delinked from %s%s by %s.', irc.name, channel, entry[0], entry[1], irc.getHostmask(source)) @@ -1871,59 +1854,58 @@ def linked(irc, source, args): irc.reply("Showing channels linked to %s:" % net, private=True) # Sort the list of shared channels when displaying - with db_lock: - for k, v in sorted(db.items()): + for k, v in sorted(db.items()): - # Skip if we're filtering by network and the network given isn't relayed - # to the channel. - if net and not (net == k[0] or net in [link[0] for link in v['links']]): - continue + # Skip if we're filtering by network and the network given isn't relayed + # to the channel. + if net and not (net == k[0] or net in [link[0] for link in v['links']]): + continue - # Bold each network/channel name pair - s = '\x02%s%s\x02 ' % k - remoteirc = world.networkobjects.get(k[0]) - channel = k[1] # Get the channel name from the network/channel pair + # Bold each network/channel name pair + s = '\x02%s%s\x02 ' % k + remoteirc = world.networkobjects.get(k[0]) + channel = k[1] # Get the channel name from the network/channel pair - if remoteirc and channel in remoteirc.channels: - c = remoteirc.channels[channel] - if ('s', None) in c.modes or ('p', None) in c.modes: - # Only show secret channels to opers or those in the channel, and tag them as - # [secret]. - localchan = get_remote_channel(remoteirc, irc, channel) - if irc.isOper(source) or (localchan and source in irc.channels[localchan].users): - s += '\x02[secret]\x02 ' - else: - continue + if remoteirc and channel in remoteirc.channels: + c = remoteirc.channels[channel] + if ('s', None) in c.modes or ('p', None) in c.modes: + # Only show secret channels to opers or those in the channel, and tag them as + # [secret]. + localchan = get_remote_channel(remoteirc, irc, channel) + if irc.isOper(source) or (localchan and source in irc.channels[localchan].users): + s += '\x02[secret]\x02 ' + else: + continue - if v['links']: - # Sort, join up and output all the linked channel names. Silently drop - # entries for disconnected networks. - s += ' '.join([''.join(link) for link in sorted(v['links']) if link[0] in world.networkobjects - and world.networkobjects[link[0]].connected.is_set()]) + if v['links']: + # Sort, join up and output all the linked channel names. Silently drop + # entries for disconnected networks. + s += ' '.join([''.join(link) for link in sorted(v['links']) if link[0] in world.networkobjects + and world.networkobjects[link[0]].connected.is_set()]) - else: # Unless it's empty; then, well... just say no relays yet. - s += '(no relays yet)' + else: # Unless it's empty; then, well... just say no relays yet. + s += '(no relays yet)' - irc.reply(s, private=True) + irc.reply(s, private=True) - if irc.isOper(source): - s = '' + if irc.isOper(source): + s = '' - # If the caller is an oper, we can show the hostmasks of people - # that created all the available channels (Janus does this too!!) - creator = v.get('creator') - if creator: - # But only if the value actually exists (old DBs will have it - # missing). - s += ' by \x02%s\x02' % creator + # If the caller is an oper, we can show the hostmasks of people + # that created all the available channels (Janus does this too!!) + creator = v.get('creator') + if creator: + # But only if the value actually exists (old DBs will have it + # missing). + s += ' by \x02%s\x02' % creator - # Ditto for creation date - ts = v.get('ts') - if ts: - s += ' on %s' % time.ctime(ts) + # Ditto for creation date + ts = v.get('ts') + if ts: + s += ' on %s' % time.ctime(ts) - if s: # Indent to make the list look nicer - irc.reply(' Channel created%s.' % s, private=True) + if s: # Indent to make the list look nicer + irc.reply(' Channel created%s.' % s, private=True) linked = utils.add_cmd(linked, featured=True) @utils.add_cmd @@ -1948,32 +1930,30 @@ def linkacl(irc, source, args): if not relay: irc.error('No such relay %r exists.' % channel) return + if cmd == 'list': + permissions.checkPermissions(irc, source, ['relay.linkacl.view']) + s = 'Blocked networks for \x02%s\x02: \x02%s\x02' % (channel, ', '.join(db[relay]['blocked_nets']) or '(empty)') + irc.reply(s) + return - with db_lock: - if cmd == 'list': - permissions.checkPermissions(irc, source, ['relay.linkacl.view']) - s = 'Blocked networks for \x02%s\x02: \x02%s\x02' % (channel, ', '.join(db[relay]['blocked_nets']) or '(empty)') - irc.reply(s) - return - - permissions.checkPermissions(irc, source, ['relay.linkacl']) + permissions.checkPermissions(irc, source, ['relay.linkacl']) + try: + remotenet = args[2] + except IndexError: + irc.error(missingargs) + return + if cmd == 'deny': + db[relay]['blocked_nets'].add(remotenet) + irc.reply('Done.') + elif cmd == 'allow': try: - remotenet = args[2] - except IndexError: - irc.error(missingargs) - return - if cmd == 'deny': - db[relay]['blocked_nets'].add(remotenet) - irc.reply('Done.') - elif cmd == 'allow': - try: - db[relay]['blocked_nets'].remove(remotenet) - except KeyError: - irc.error('Network %r is not on the blacklist for %r.' % (remotenet, channel)) - else: - irc.reply('Done.') + db[relay]['blocked_nets'].remove(remotenet) + except KeyError: + irc.error('Network %r is not on the blacklist for %r.' % (remotenet, channel)) else: - irc.error('Unknown subcommand %r: valid ones are ALLOW, DENY, and LIST.' % cmd) + irc.reply('Done.') + else: + irc.error('Unknown subcommand %r: valid ones are ALLOW, DENY, and LIST.' % cmd) @utils.add_cmd def showuser(irc, source, args): @@ -2076,22 +2056,18 @@ def claim(irc, source, args): # We override get_relay() here to limit the search to the current network. relay = (irc.name, channel) - with db_lock: - if relay not in db: - irc.error('No relay %r exists on this network (this command must be run on the ' - 'network this channel was created on).' % channel) - return - claimed = db[relay]["claim"] - try: - nets = args[1].strip() - except IndexError: # No networks given. - irc.reply('Channel \x02%s\x02 is claimed by: %s' % - (channel, ', '.join(claimed) or '\x1D(none)\x1D')) - else: - if nets == '-' or not nets: - claimed = set() - else: - claimed = set(nets.split(',')) - db[relay]["claim"] = claimed - irc.reply('CLAIM for channel \x02%s\x02 set to: %s' % - (channel, ', '.join(claimed) or '\x1D(none)\x1D')) + if relay not in db: + irc.error('No relay %r exists on this network (this command must be run on the ' + 'network this channel was created on).' % channel) + return + claimed = db[relay]["claim"] + try: + nets = args[1].strip() + except IndexError: # No networks given. + irc.reply('Channel \x02%s\x02 is claimed by: %s' % + (channel, ', '.join(claimed) or '\x1D(none)\x1D')) + else: + claimed = set(nets.split(',')) + db[relay]["claim"] = claimed + irc.reply('CLAIM for channel \x02%s\x02 set to: %s' % + (channel, ', '.join(claimed) or '\x1D(none)\x1D'))