3
0
mirror of https://github.com/jlu5/PyLink.git synced 2024-11-24 03:29:28 +01:00

Revert "relay: add locks in db read/writes (thread safety)"

Unfortunately, this made relay prone to freezing the entire PyLink server.

This reverts commit 2b4943a780.
This commit is contained in:
James Lu 2017-05-04 18:50:42 -07:00
parent 1358fedca6
commit d51c399351

View File

@ -13,7 +13,6 @@ relayusers = defaultdict(dict)
relayservers = defaultdict(dict) relayservers = defaultdict(dict)
spawnlocks = defaultdict(threading.RLock) spawnlocks = defaultdict(threading.RLock)
spawnlocks_servers = defaultdict(threading.RLock) spawnlocks_servers = defaultdict(threading.RLock)
db_lock = threading.RLock()
dbname = utils.getDatabaseName('pylinkrelay') dbname = utils.getDatabaseName('pylinkrelay')
datastore = structures.PickleDataStore('pylinkrelay', dbname) datastore = structures.PickleDataStore('pylinkrelay', dbname)
@ -34,16 +33,14 @@ def initialize_all(irc):
# which would break connections. # which would break connections.
world.started.wait(2) world.started.wait(2)
with db_lock: for chanpair, entrydata in db.items():
for chanpair, entrydata in db.items(): # Iterate over all the channels stored in our relay links DB.
# Iterate over all the channels stored in our relay links DB. network, channel = chanpair
network, channel = chanpair # Initialize each relay channel on their home network, and on every linked one too.
initialize_channel(irc, channel)
# Initialize each relay channel on their home network, and on every linked one too. for link in entrydata['links']:
network, channel = link
initialize_channel(irc, channel) initialize_channel(irc, channel)
for link in entrydata['links']:
network, channel = link
initialize_channel(irc, channel)
def main(irc=None): def main(irc=None):
"""Main function, called during plugin loading at start.""" """Main function, called during plugin loading at start."""
@ -414,13 +411,12 @@ def get_orig_user(irc, user, targetirc=None):
def get_relay(chanpair): def get_relay(chanpair):
"""Finds the matching relay entry name for the given (network name, channel) """Finds the matching relay entry name for the given (network name, channel)
pair, if one exists.""" pair, if one exists."""
with db_lock: if chanpair in db: # This chanpair is a shared channel; others link to it
if chanpair in db: # This chanpair is a shared channel; others link to it return chanpair
return chanpair # This chanpair is linked *to* a remote channel
# This chanpair is linked *to* a remote channel for name, dbentry in db.items():
for name, dbentry in db.items(): if chanpair in dbentry['links']:
if chanpair in dbentry['links']: return name
return name
def get_remote_channel(irc, remoteirc, channel): def get_remote_channel(irc, remoteirc, channel):
"""Returns the linked channel name for the given channel on remoteirc, """Returns the linked channel name for the given channel on remoteirc,
@ -433,10 +429,9 @@ def get_remote_channel(irc, remoteirc, channel):
if chanpair[0] == remotenetname: if chanpair[0] == remotenetname:
return chanpair[1] return chanpair[1]
else: else:
with db_lock: for link in db[chanpair]['links']:
for link in db[chanpair]['links']: if link[0] == remotenetname:
if link[0] == remotenetname: return link[1]
return link[1]
def initialize_channel(irc, channel): def initialize_channel(irc, channel):
"""Initializes a relay channel (merge local/remote users, set modes, etc.).""" """Initializes a relay channel (merge local/remote users, set modes, etc.)."""
@ -447,9 +442,8 @@ def initialize_channel(irc, channel):
log.debug('(%s) relay.initialize_channel: relay pair found to be %s', irc.name, relay) log.debug('(%s) relay.initialize_channel: relay pair found to be %s', irc.name, relay)
queued_users = [] queued_users = []
if relay: if relay:
with db_lock: all_links = db[relay]['links'].copy()
all_links = db[relay]['links'].copy() all_links.update((relay,))
all_links.update((relay,))
log.debug('(%s) relay.initialize_channel: all_links: %s', irc.name, all_links) log.debug('(%s) relay.initialize_channel: all_links: %s', irc.name, all_links)
# Iterate over all the remote channels linked in this relay. # Iterate over all the remote channels linked in this relay.
@ -531,14 +525,12 @@ def check_claim(irc, channel, sender, chanobj=None):
sender_modes = get_prefix_modes(irc, irc, channel, sender, mlist=mlist) sender_modes = get_prefix_modes(irc, irc, channel, sender, mlist=mlist)
log.debug('(%s) relay.check_claim: sender modes (%s/%s) are %s (mlist=%s)', irc.name, log.debug('(%s) relay.check_claim: sender modes (%s/%s) are %s (mlist=%s)', irc.name,
sender, channel, sender_modes, mlist) sender, channel, sender_modes, mlist)
# XXX: stop hardcoding modes to check for and support mlist in isHalfopPlus and friends # XXX: stop hardcoding modes to check for and support mlist in isHalfopPlus and friends
with db_lock: return (not relay) or irc.name == relay[0] or not db[relay]['claim'] or \
return (not relay) or irc.name == relay[0] or not db[relay]['claim'] or \ irc.name in db[relay]['claim'] or \
irc.name in db[relay]['claim'] or \ any([mode in sender_modes for mode in ('y', 'q', 'a', 'o', 'h')]) \
any([mode in sender_modes for mode in ('y', 'q', 'a', 'o', 'h')]) \ or irc.isInternalClient(sender) or \
or irc.isInternalClient(sender) or \ irc.isInternalServer(sender)
irc.isInternalServer(sender)
def get_supported_umodes(irc, remoteirc, modes): def get_supported_umodes(irc, remoteirc, modes):
"""Given a list of user modes, filters out all of those not supported by the """Given a list of user modes, filters out all of those not supported by the
@ -1511,19 +1503,18 @@ def handle_disconnect(irc, numeric, command, args):
announcement = conf.conf.get('relay', {}).get('disconnect_announcement') announcement = conf.conf.get('relay', {}).get('disconnect_announcement')
log.debug('(%s) relay: last connection successful: %s', irc.name, args.get('was_successful')) log.debug('(%s) relay: last connection successful: %s', irc.name, args.get('was_successful'))
if announcement and args.get('was_successful'): if announcement and args.get('was_successful'):
with db_lock: for chanpair, entrydata in db.items():
for chanpair, entrydata in db.items(): log.debug('(%s) relay: Looking up %s', irc.name, chanpair)
log.debug('(%s) relay: Looking up %s', irc.name, chanpair) if chanpair[0] == irc.name:
if chanpair[0] == irc.name: for leaf in entrydata['links']:
for leaf in entrydata['links']: log.debug('(%s) relay: Announcing disconnect to %s%s', irc.name,
log.debug('(%s) relay: Announcing disconnect to %s%s', irc.name, leaf[0], leaf[1])
leaf[0], leaf[1]) remoteirc = world.networkobjects.get(leaf[0])
remoteirc = world.networkobjects.get(leaf[0]) if remoteirc and remoteirc.connected.is_set():
if remoteirc and remoteirc.connected.is_set(): text = string.Template(announcement).safe_substitute(
text = string.Template(announcement).safe_substitute( {'homenetwork': irc.name, 'homechannel': chanpair[1],
{'homenetwork': irc.name, 'homechannel': chanpair[1], 'network': remoteirc.name, 'channel': leaf[1]})
'network': remoteirc.name, 'channel': leaf[1]}) remoteirc.msg(leaf[1], text, loopback=False)
remoteirc.msg(leaf[1], text, loopback=False)
utils.add_hook(handle_disconnect, "PYLINK_DISCONNECT") utils.add_hook(handle_disconnect, "PYLINK_DISCONNECT")
@ -1601,10 +1592,9 @@ def create(irc, source, args):
creator = irc.getHostmask(source) creator = irc.getHostmask(source)
# Create the relay database entry with the (network name, channel name) # Create the relay database entry with the (network name, channel name)
# pair - this is just a dict with various keys. # pair - this is just a dict with various keys.
with db_lock: db[(irc.name, channel)] = {'claim': [irc.name], 'links': set(),
db[(irc.name, channel)] = {'claim': [irc.name], 'links': set(), 'blocked_nets': set(), 'creator': creator,
'blocked_nets': set(), 'creator': creator, 'ts': time.time()}
'ts': time.time()}
log.info('(%s) relay: Channel %s created by %s.', irc.name, channel, creator) log.info('(%s) relay: Channel %s created by %s.', irc.name, channel, creator)
initialize_channel(irc, channel) initialize_channel(irc, channel)
irc.reply('Done.') irc.reply('Done.')
@ -1614,10 +1604,9 @@ def stop_relay(entry):
"""Internal function to deinitialize a relay link and its leaves.""" """Internal function to deinitialize a relay link and its leaves."""
network, channel = entry network, channel = entry
# Iterate over all the channel links and deinitialize them. # Iterate over all the channel links and deinitialize them.
with db_lock: for link in db[entry]['links']:
for link in db[entry]['links']: remove_channel(world.networkobjects.get(link[0]), link[1])
remove_channel(world.networkobjects.get(link[0]), link[1]) remove_channel(world.networkobjects.get(network), channel)
remove_channel(world.networkobjects.get(network), channel)
def destroy(irc, source, args): def destroy(irc, source, args):
"""[<home network>] <channel> """[<home network>] <channel>
@ -1646,19 +1635,17 @@ def destroy(irc, source, args):
permissions.checkPermissions(irc, source, ['relay.destroy.remote']) permissions.checkPermissions(irc, source, ['relay.destroy.remote'])
entry = (network, channel) entry = (network, channel)
if entry in db:
stop_relay(entry)
del db[entry]
with db_lock: log.info('(%s) relay: Channel %s destroyed by %s.', irc.name,
if entry in db: channel, irc.getHostmask(source))
stop_relay(entry) irc.reply('Done.')
del db[entry] else:
irc.error("No such channel %r exists. If you're trying to delink a channel from "
log.info('(%s) relay: Channel %s destroyed by %s.', irc.name, "another network, use the DESTROY command." % channel)
channel, irc.getHostmask(source)) return
irc.reply('Done.')
else:
irc.error("No such channel %r exists. If you're trying to delink a channel from "
"another network, use the DESTROY command." % channel)
return
destroy = utils.add_cmd(destroy, featured=True) destroy = utils.add_cmd(destroy, featured=True)
@utils.add_cmd @utils.add_cmd
@ -1675,20 +1662,19 @@ def purge(irc, source, args):
count = 0 count = 0
with db_lock: for entry in db.copy():
for entry in db.copy(): # Entry was owned by the target network; remove it
# Entry was owned by the target network; remove it if entry[0] == network:
if entry[0] == network: count += 1
count += 1 stop_relay(entry)
stop_relay(entry) del db[entry]
del db[entry] else:
else: # Drop leaf channels involving the target network
# Drop leaf channels involving the target network for link in db[entry]['links'].copy():
for link in db[entry]['links'].copy(): if link[0] == network:
if link[0] == network: count += 1
count += 1 remove_channel(world.networkobjects.get(network), link[1])
remove_channel(world.networkobjects.get(network), link[1]) db[entry]['links'].remove(link)
db[entry]['links'].remove(link)
irc.reply("Done. Purged %s entries involving the network %s." % (count, network)) irc.reply("Done. Purged %s entries involving the network %s." % (count, network))
@ -1751,8 +1737,7 @@ def link(irc, source, args):
return return
try: try:
with db_lock: entry = db[(remotenet, channel)]
entry = db[(remotenet, channel)]
except KeyError: except KeyError:
irc.error('No such relay %r exists.' % args.channel) irc.error('No such relay %r exists.' % args.channel)
return return
@ -1826,15 +1811,13 @@ def delink(irc, source, args):
"network).") "network).")
return return
else: else:
with db_lock: for link in db[entry]['links'].copy():
for link in db[entry]['links'].copy(): if link[0] == remotenet:
if link[0] == remotenet: remove_channel(world.networkobjects.get(remotenet), link[1])
remove_channel(world.networkobjects.get(remotenet), link[1]) db[entry]['links'].remove(link)
db[entry]['links'].remove(link)
else: else:
remove_channel(irc, channel) remove_channel(irc, channel)
with db_lock: db[entry]['links'].remove((irc.name, channel))
db[entry]['links'].remove((irc.name, channel))
irc.reply('Done.') irc.reply('Done.')
log.info('(%s) relay: Channel %s delinked from %s%s by %s.', irc.name, log.info('(%s) relay: Channel %s delinked from %s%s by %s.', irc.name,
channel, entry[0], entry[1], irc.getHostmask(source)) channel, entry[0], entry[1], irc.getHostmask(source))
@ -1871,59 +1854,58 @@ def linked(irc, source, args):
irc.reply("Showing channels linked to %s:" % net, private=True) irc.reply("Showing channels linked to %s:" % net, private=True)
# Sort the list of shared channels when displaying # Sort the list of shared channels when displaying
with db_lock: for k, v in sorted(db.items()):
for k, v in sorted(db.items()):
# Skip if we're filtering by network and the network given isn't relayed # Skip if we're filtering by network and the network given isn't relayed
# to the channel. # to the channel.
if net and not (net == k[0] or net in [link[0] for link in v['links']]): if net and not (net == k[0] or net in [link[0] for link in v['links']]):
continue continue
# Bold each network/channel name pair # Bold each network/channel name pair
s = '\x02%s%s\x02 ' % k s = '\x02%s%s\x02 ' % k
remoteirc = world.networkobjects.get(k[0]) remoteirc = world.networkobjects.get(k[0])
channel = k[1] # Get the channel name from the network/channel pair channel = k[1] # Get the channel name from the network/channel pair
if remoteirc and channel in remoteirc.channels: if remoteirc and channel in remoteirc.channels:
c = remoteirc.channels[channel] c = remoteirc.channels[channel]
if ('s', None) in c.modes or ('p', None) in c.modes: if ('s', None) in c.modes or ('p', None) in c.modes:
# Only show secret channels to opers or those in the channel, and tag them as # Only show secret channels to opers or those in the channel, and tag them as
# [secret]. # [secret].
localchan = get_remote_channel(remoteirc, irc, channel) localchan = get_remote_channel(remoteirc, irc, channel)
if irc.isOper(source) or (localchan and source in irc.channels[localchan].users): if irc.isOper(source) or (localchan and source in irc.channels[localchan].users):
s += '\x02[secret]\x02 ' s += '\x02[secret]\x02 '
else: else:
continue continue
if v['links']: if v['links']:
# Sort, join up and output all the linked channel names. Silently drop # Sort, join up and output all the linked channel names. Silently drop
# entries for disconnected networks. # entries for disconnected networks.
s += ' '.join([''.join(link) for link in sorted(v['links']) if link[0] in world.networkobjects s += ' '.join([''.join(link) for link in sorted(v['links']) if link[0] in world.networkobjects
and world.networkobjects[link[0]].connected.is_set()]) and world.networkobjects[link[0]].connected.is_set()])
else: # Unless it's empty; then, well... just say no relays yet. else: # Unless it's empty; then, well... just say no relays yet.
s += '(no relays yet)' s += '(no relays yet)'
irc.reply(s, private=True) irc.reply(s, private=True)
if irc.isOper(source): if irc.isOper(source):
s = '' s = ''
# If the caller is an oper, we can show the hostmasks of people # If the caller is an oper, we can show the hostmasks of people
# that created all the available channels (Janus does this too!!) # that created all the available channels (Janus does this too!!)
creator = v.get('creator') creator = v.get('creator')
if creator: if creator:
# But only if the value actually exists (old DBs will have it # But only if the value actually exists (old DBs will have it
# missing). # missing).
s += ' by \x02%s\x02' % creator s += ' by \x02%s\x02' % creator
# Ditto for creation date # Ditto for creation date
ts = v.get('ts') ts = v.get('ts')
if ts: if ts:
s += ' on %s' % time.ctime(ts) s += ' on %s' % time.ctime(ts)
if s: # Indent to make the list look nicer if s: # Indent to make the list look nicer
irc.reply(' Channel created%s.' % s, private=True) irc.reply(' Channel created%s.' % s, private=True)
linked = utils.add_cmd(linked, featured=True) linked = utils.add_cmd(linked, featured=True)
@utils.add_cmd @utils.add_cmd
@ -1948,32 +1930,30 @@ def linkacl(irc, source, args):
if not relay: if not relay:
irc.error('No such relay %r exists.' % channel) irc.error('No such relay %r exists.' % channel)
return return
if cmd == 'list':
permissions.checkPermissions(irc, source, ['relay.linkacl.view'])
s = 'Blocked networks for \x02%s\x02: \x02%s\x02' % (channel, ', '.join(db[relay]['blocked_nets']) or '(empty)')
irc.reply(s)
return
with db_lock: permissions.checkPermissions(irc, source, ['relay.linkacl'])
if cmd == 'list': try:
permissions.checkPermissions(irc, source, ['relay.linkacl.view']) remotenet = args[2]
s = 'Blocked networks for \x02%s\x02: \x02%s\x02' % (channel, ', '.join(db[relay]['blocked_nets']) or '(empty)') except IndexError:
irc.reply(s) irc.error(missingargs)
return return
if cmd == 'deny':
permissions.checkPermissions(irc, source, ['relay.linkacl']) db[relay]['blocked_nets'].add(remotenet)
irc.reply('Done.')
elif cmd == 'allow':
try: try:
remotenet = args[2] db[relay]['blocked_nets'].remove(remotenet)
except IndexError: except KeyError:
irc.error(missingargs) irc.error('Network %r is not on the blacklist for %r.' % (remotenet, channel))
return
if cmd == 'deny':
db[relay]['blocked_nets'].add(remotenet)
irc.reply('Done.')
elif cmd == 'allow':
try:
db[relay]['blocked_nets'].remove(remotenet)
except KeyError:
irc.error('Network %r is not on the blacklist for %r.' % (remotenet, channel))
else:
irc.reply('Done.')
else: else:
irc.error('Unknown subcommand %r: valid ones are ALLOW, DENY, and LIST.' % cmd) irc.reply('Done.')
else:
irc.error('Unknown subcommand %r: valid ones are ALLOW, DENY, and LIST.' % cmd)
@utils.add_cmd @utils.add_cmd
def showuser(irc, source, args): def showuser(irc, source, args):
@ -2076,22 +2056,18 @@ def claim(irc, source, args):
# We override get_relay() here to limit the search to the current network. # We override get_relay() here to limit the search to the current network.
relay = (irc.name, channel) relay = (irc.name, channel)
with db_lock: if relay not in db:
if relay not in db: irc.error('No relay %r exists on this network (this command must be run on the '
irc.error('No relay %r exists on this network (this command must be run on the ' 'network this channel was created on).' % channel)
'network this channel was created on).' % channel) return
return claimed = db[relay]["claim"]
claimed = db[relay]["claim"] try:
try: nets = args[1].strip()
nets = args[1].strip() except IndexError: # No networks given.
except IndexError: # No networks given. irc.reply('Channel \x02%s\x02 is claimed by: %s' %
irc.reply('Channel \x02%s\x02 is claimed by: %s' % (channel, ', '.join(claimed) or '\x1D(none)\x1D'))
(channel, ', '.join(claimed) or '\x1D(none)\x1D')) else:
else: claimed = set(nets.split(','))
if nets == '-' or not nets: db[relay]["claim"] = claimed
claimed = set() irc.reply('CLAIM for channel \x02%s\x02 set to: %s' %
else: (channel, ', '.join(claimed) or '\x1D(none)\x1D'))
claimed = set(nets.split(','))
db[relay]["claim"] = claimed
irc.reply('CLAIM for channel \x02%s\x02 set to: %s' %
(channel, ', '.join(claimed) or '\x1D(none)\x1D'))