mirror of
https://github.com/jlu5/PyLink.git
synced 2024-11-01 01:09:22 +01:00
Add get_service_options API to merge together global & local network options
First part of #642.
This commit is contained in:
parent
9ec83f3995
commit
e0d82cdf3d
16
classes.py
16
classes.py
@ -544,6 +544,22 @@ class PyLinkNetworkCore(structures.CamelCaseToSnakeCase):
|
||||
|
||||
return default
|
||||
|
||||
def get_service_options(self, servicename: str, option: str, itertype: type, global_option=None):
|
||||
"""
|
||||
Returns a merged copy of the requested service bot option. This includes:
|
||||
|
||||
1) If present, the value of the config option servers::<NETNAME>::<SERVICENAME>_<OPTION> (netopt)
|
||||
2) If present, the value of the config option <SERVICENAME>::<GLOBAL_OPTION>, where
|
||||
<GLOBAL_OPTION> is either the 'global_option' keyword value or <OPTION> (globalopt)
|
||||
|
||||
For itertype, the following types are allowed:
|
||||
- list: items are combined as globalopt + netopt
|
||||
- dict: items are combined as {**globalopt, **netopt}
|
||||
"""
|
||||
netopt = self.serverdata.get('%s_%s' % (servicename, option)) or itertype()
|
||||
globalopt = conf.conf.get(servicename, {}).get(global_option or option) or itertype()
|
||||
return utils.merge_iterables(globalopt, netopt)
|
||||
|
||||
def has_cap(self, capab):
|
||||
"""
|
||||
Returns whether this protocol module instance has the requested capability.
|
||||
|
@ -258,6 +258,49 @@ class BaseProtocolTest(unittest.TestCase):
|
||||
self.assertEqual(f('myserv', 'myopt'), 998877) # Read local option
|
||||
self.assertEqual(f('myserv', 'myopt', default='unused'), 998877)
|
||||
|
||||
def test_get_service_options_list(self):
|
||||
f = self.p.get_service_options
|
||||
self.assertEqual(f('myserv', 'items', list), []) # No value anywhere
|
||||
|
||||
# Define global option
|
||||
with patch.dict(conf.conf, {'myserv': {'items': [1, 10, 100], 'empty': []}}):
|
||||
self.assertEqual(f('myserv', 'items', list), [1, 10, 100]) # Global value only
|
||||
self.assertEqual(f('myserv', 'empty', list), [])
|
||||
|
||||
# Both global and local options exist
|
||||
with patch.dict(self.p.serverdata, {'myserv_items': [2, 4, 6, 8], 'empty': []}):
|
||||
self.assertEqual(f('myserv', 'items', list), [1, 10, 100, 2, 4, 6, 8])
|
||||
# Custom global_option setting
|
||||
self.assertEqual(f('myserv', 'items', list, global_option='nonexistent'), [2, 4, 6, 8])
|
||||
self.assertEqual(f('myserv', 'empty', list), [])
|
||||
|
||||
# Define local option
|
||||
with patch.dict(self.p.serverdata, {'myserv_items': [1, 0, 0, 3]}):
|
||||
self.assertEqual(f('myserv', 'items', list), [1, 0, 0, 3]) # Read local option
|
||||
|
||||
def test_get_service_options_dict(self):
|
||||
f = self.p.get_service_options
|
||||
self.assertEqual(f('chanman', 'items', dict), {}) # No value anywhere
|
||||
|
||||
# This is just mildly relevant test data, it's not actually used anywhere.
|
||||
globalopt = {'o': '@', 'v': '+', 'a': '!', 'h': '%'}
|
||||
localopt = {'a': '&', 'q': '~'}
|
||||
# Define global option
|
||||
with patch.dict(conf.conf, {'chanman': {'prefixes': globalopt, 'empty': {}}}):
|
||||
self.assertEqual(f('chanman', 'prefixes', dict), globalopt) # Global value only
|
||||
self.assertEqual(f('chanman', 'empty', dict), {})
|
||||
|
||||
# Both global and local options exist
|
||||
with patch.dict(self.p.serverdata, {'chanman_prefixes': localopt, 'empty': {}}):
|
||||
self.assertEqual(f('chanman', 'prefixes', dict), {**globalopt, **localopt})
|
||||
|
||||
self.assertEqual(f('chanman', 'items', dict), {}) # No value anywhere
|
||||
self.assertEqual(f('chanman', 'empty', dict), {})
|
||||
|
||||
# Define local option
|
||||
with patch.dict(self.p.serverdata, {'chanman_prefixes': localopt}):
|
||||
self.assertEqual(f('chanman', 'prefixes', dict), localopt) # Read local option
|
||||
|
||||
### MODE HANDLING
|
||||
def test_parse_modes_channel_rfc(self):
|
||||
# These are basic tests that only use RFC 1459 defined modes.
|
||||
@ -436,7 +479,7 @@ class BaseProtocolTest(unittest.TestCase):
|
||||
"Cycling a ban +b-b should remove it (different case)"
|
||||
)
|
||||
|
||||
def test_parse_mode_channel_prefixmode_has_nick(self):
|
||||
def test_parse_modes_channel_prefixmode_has_nick(self):
|
||||
c = self.p.channels['#'] = Channel(self.p, name='#')
|
||||
u = self._make_user('mynick', uid='myuid')
|
||||
c.users.add(u)
|
||||
|
@ -254,5 +254,22 @@ class UtilsTestCase(unittest.TestCase):
|
||||
self.assertFalse(f('*9*', '14', lambda s: s.zfill(13)))
|
||||
self.assertTrue(f('*chin*', 'machine', str.upper))
|
||||
|
||||
def test_merge_iterables(self):
|
||||
f = utils.merge_iterables
|
||||
self.assertEqual(f([], []), [])
|
||||
self.assertEqual(f({}, {}), {})
|
||||
self.assertEqual(f(set(), set()), set())
|
||||
|
||||
self.assertEqual(f([1,2], [4,5,6]), [1,2,4,5,6])
|
||||
self.assertEqual(f({'a': 'b'}, {'c': 'd', 'e': 'f'}),
|
||||
{'a': 'b', 'c': 'd', 'e': 'f'})
|
||||
self.assertEqual(f({0,1,2}, {1,3,5}),
|
||||
{0,1,2,3,5})
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
f([1,2,3], {'a': 'b'}) # mismatched type
|
||||
with self.assertRaises(ValueError):
|
||||
f([], set()) # mismatched type
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
18
utils.py
18
utils.py
@ -851,3 +851,21 @@ def match_text(glob, text, filterfunc=str.lower):
|
||||
text = filterfunc(text)
|
||||
|
||||
return re.match(_glob2re(glob), text)
|
||||
|
||||
def merge_iterables(A, B):
|
||||
"""
|
||||
Merges the values in two iterables. A and B must be of the same type, and one of the following:
|
||||
|
||||
- list: items are combined as A + B
|
||||
- set: items are combined as A | B
|
||||
- dict: items are combined as {**A, **B}
|
||||
"""
|
||||
if type(A) != type(B):
|
||||
raise ValueError("inputs must be the same type")
|
||||
|
||||
if isinstance(A, list):
|
||||
return A + B
|
||||
elif isinstance(A, set):
|
||||
return A | B
|
||||
elif isinstance(A, dict):
|
||||
return {**A, **B}
|
||||
|
Loading…
Reference in New Issue
Block a user