From 03e02dda51f1fb5acfee791794c8da03c4e726d7 Mon Sep 17 00:00:00 2001 From: James Lu Date: Sun, 12 Nov 2017 11:56:33 -0800 Subject: [PATCH] relay: replace garbage locking code with proper filtering in relay_joins I'm not even going to start on how much time I spent working on this... Closes #548, #529 --- plugins/relay.py | 91 ++++++++++++++++++++++++++---------------------- 1 file changed, 50 insertions(+), 41 deletions(-) diff --git a/plugins/relay.py b/plugins/relay.py index 0bc34b5..7814984 100644 --- a/plugins/relay.py +++ b/plugins/relay.py @@ -473,54 +473,59 @@ def initialize_channel(irc, channel): # We're initializing a relay that already exists. This can be done at # ENDBURST, or on the LINK command. relay = get_relay(irc, channel) + log.debug('(%s) relay.initialize_channel being called on %s', irc.name, channel) log.debug('(%s) relay.initialize_channel: relay pair found to be %s', irc.name, relay) queued_users = [] if relay: - # Only allow one thread to initialize channels at a time. - if relay in channels_init_in_progress and channels_init_in_progress[relay].is_set(): - log.debug('(%s) relay.initialize_channel: skipping init of %s since another one is in progress', irc.name, relay) - return + all_links = db[relay]['links'].copy() + all_links.update((relay,)) + log.debug('(%s) relay.initialize_channel: all_links: %s', irc.name, all_links) - channels_init_in_progress[relay].set() - try: - all_links = db[relay]['links'].copy() - all_links.update((relay,)) - log.debug('(%s) relay.initialize_channel: all_links: %s', irc.name, all_links) + # Iterate over all the remote channels linked in this relay. + for link in all_links: + remotenet, remotechan = link + if remotenet == irc.name: # If the network is us, skip. + continue + remoteirc = world.networkobjects.get(remotenet) - # Iterate over all the remote channels linked in this relay. - for link in all_links: - remotenet, remotechan = link - if remotenet == irc.name: # If the network is us, skip. + if remoteirc is None: + # Remote network doesn't have an IRC object; e.g. it was removed + # from the config. Skip this. + continue + + # Give each network a tiny bit of leeway to finish up its connection. + # This is better than just dropping users their completely. + if not remoteirc.connected.wait(TCONDITION_TIMEOUT): + continue + + # Join their (remote) users and set their modes, if applicable. + if remotechan in remoteirc.channels: + rc = remoteirc.channels[remotechan] + ''' + if not hasattr(rc, '_relay_initial_burst'): + rc._relay_initial_burst = threading.Event() + + if rc._relay_initial_burst.is_set(): + log.debug('(%s) relay.initialize_channel: skipping inbound burst from %s/%s => %s/%s ' + 'as it has already been bursted', irc.name, remoteirc.name, remotechan, irc.name, channel) continue - remoteirc = world.networkobjects.get(remotenet) + rc._relay_initial_burst.set() + ''' + relay_joins(remoteirc, remotechan, rc.users, rc.ts, targetirc=irc) - if remoteirc is None: - # Remote network doesn't have an IRC object; e.g. it was removed - # from the config. Skip this. - continue + # Only update the topic if it's different from what we already have, + # and topic bursting is complete. + if rc.topicset and rc.topic != irc.channels[channel].topic: + irc.topic_burst(irc.sid, channel, rc.topic) - if not (remoteirc.connected.is_set() and get_remote_channel(remoteirc, irc, remotechan)): - continue # Remote network isn't connected. + # Send our users and channel modes to the other nets + if channel in irc.channels: + c = irc._channels[channel] + relay_joins(irc, channel, c.users, c.ts) - # Join their (remote) users and set their modes, if applicable. - if remotechan in remoteirc.channels: - rc = remoteirc.channels[remotechan] - relay_joins(remoteirc, remotechan, rc.users, rc.ts) - - # Only update the topic if it's different from what we already have, - # and topic bursting is complete. - if rc.topicset and rc.topic != irc.channels[channel].topic: - irc.topic_burst(irc.sid, channel, rc.topic) - - # Send our users and channel modes to the other nets - if channel in irc.channels: - relay_joins(irc, channel, irc.channels[channel].users, irc.channels[channel].ts) - - if 'pylink' in world.services: - world.services['pylink'].join(irc, channel) - finally: - channels_init_in_progress[relay].clear() + if 'pylink' in world.services: + world.services['pylink'].join(irc, channel) def remove_channel(irc, channel): """Destroys a relay channel by parting all of its users.""" @@ -673,9 +678,10 @@ def iterate_all_present(origirc, origuser, func, extra_args=(), kwargs=None): ### EVENT HANDLER INTERNALS -def relay_joins(irc, channel, users, ts, **kwargs): +def relay_joins(irc, channel, users, ts, targetirc=None, **kwargs): """ - Relays one or more users' joins from a channel to its relay links. + Relays one or more users' joins from a channel to its relay links. If targetirc is given, only burst + to that specific network. """ if ts < 750000: @@ -740,7 +746,10 @@ def relay_joins(irc, channel, users, ts, **kwargs): remoteirc.call_hooks([rsid, 'PYLINK_RELAY_JOIN', {'channel': remotechan, 'users': [u[-1] for u in queued_users]}]) - iterate_all(irc, _relay_joins_loop, extra_args=(channel, users, ts), kwargs=kwargs) + if targetirc: + _relay_joins_loop(irc, targetirc, channel, users, ts, **kwargs) + else: + iterate_all(irc, _relay_joins_loop, extra_args=(channel, users, ts), kwargs=kwargs) def relay_part(irc, *args, **kwargs): """