3
0
mirror of https://github.com/jlu5/PyLink.git synced 2025-01-12 13:12:36 +01:00
PyLink/test/protocol_test_fixture.py

962 lines
39 KiB
Python

"""
A test fixture for PyLink protocol modules.
"""
import time
import unittest
import collections
import itertools
from unittest.mock import patch
from pylinkirc import conf, world
from pylinkirc.log import log
from pylinkirc.classes import User, Server, Channel
class DummySocket():
def __init__(self):
#self.recv_messages = collections.deque()
self.sent_messages = collections.deque()
@staticmethod
def connect(address):
return
'''
def recv(bufsize, *args):
if self.recv_messages:
data = self.recv_messages.popleft()
print('<-', data)
return data
else:
return None
'''
def recv(bufsize, *args):
raise NotImplementedError
def send(self, data):
print('->', data)
self.sent_messages.append(data)
class BaseProtocolTest(unittest.TestCase):
proto_class = None
netname = 'test'
serverdata = conf.conf['servers'][netname]
def setUp(self):
if not self.proto_class:
raise RuntimeError("Must set target protocol module in proto_class")
self.p = self.proto_class(self.netname)
# Stub connect() and the socket for now...
self.p.connect = lambda self: None
self.p.socket = DummySocket()
if self.serverdata:
self.p.serverdata = self.serverdata
def _make_user(self, nick, uid, ts=None, sid=None, **kwargs):
"""
Creates a user for testing.
"""
if ts is None:
ts = int(time.time())
userobj = User(self.p, nick, ts, uid, sid, **kwargs)
self.p.users[uid] = userobj
return userobj
### STATEKEEPING FUNCTIONS
def test_nick_to_uid(self):
self.assertEqual(self.p.nick_to_uid('TestUser'), None)
self._make_user('TestUser', 'testuid1')
self.assertEqual(self.p.nick_to_uid('TestUser'), 'testuid1')
self.assertEqual(self.p.nick_to_uid('TestUser', multi=True), ['testuid1'])
self.assertEqual(self.p.nick_to_uid('BestUser'), None)
self.assertEqual(self.p.nick_to_uid('RestUser', multi=True), [])
self._make_user('TestUser', 'testuid2')
self.assertEqual(self.p.nick_to_uid('TestUser', multi=True), ['testuid1', 'testuid2'])
def test_is_internal(self):
self.p.servers['internalserver'] = Server(self.p, None, 'internal.server', internal=True)
self.p.sid = 'internalserver'
self.p.servers['externalserver'] = Server(self.p, None, 'external.server', internal=False)
iuser = self._make_user('someone', 'uid1', sid='internalserver')
euser = self._make_user('sometwo', 'uid2', sid='externalserver')
self.assertTrue(self.p.is_internal_server('internalserver'))
self.assertFalse(self.p.is_internal_server('externalserver'))
self.assertTrue(self.p.is_internal_client('uid1'))
self.assertFalse(self.p.is_internal_client('uid2'))
def test_is_manipulatable(self):
self.p.servers['serv1'] = Server(self.p, None, 'myserv.local', internal=True)
iuser = self._make_user('yes', 'uid1', sid='serv1', manipulatable=True)
euser = self._make_user('no', 'uid2', manipulatable=False)
self.assertTrue(self.p.is_manipulatable_client('uid1'))
self.assertFalse(self.p.is_manipulatable_client('uid2'))
def test_get_service_bot(self):
self.assertFalse(self.p.get_service_bot('nonexistent'))
regular_user = self._make_user('Guest12345', 'Guest12345@1')
service_user = self._make_user('MyServ', 'MyServ@2')
service_user.service = 'myserv'
self.assertFalse(self.p.get_service_bot('Guest12345@1'))
with patch.dict(world.services, {'myserv': 'myserv instance'}, clear=True):
self.assertEqual(self.p.get_service_bot('MyServ@2'), 'myserv instance')
def test_to_lower(self):
check = lambda inp, expected: self.assertEqual(self.p.to_lower(inp), expected)
check_unchanged = lambda inp: self.assertEqual(self.p.to_lower(inp), inp)
check('BLAH!', 'blah!')
check('BLAH!', 'blah!') # since we memoize
check_unchanged('zabcdefghijklmnopqrstuvwxy')
check('123Xyz !@#$%^&*()-=+', '123xyz !@#$%^&*()-=+')
if self.p.casemapping == 'rfc1459':
check('hello [] {} |\\ ~^', 'hello [] [] \\\\ ^^')
check('{Test Case}', '[test case]')
else:
check_unchanged('hello [] {} |\\ ~^')
check('{Test Case}', '{test case}')
def test_is_nick(self):
assertT = lambda inp: self.assertTrue(self.p.is_nick(inp))
assertF = lambda inp: self.assertFalse(self.p.is_nick(inp))
assertT('test')
assertT('PyLink')
assertT('[bracketman]')
assertT('{RACKETman}')
assertT('bar|tender')
assertT('\\bar|bender\\')
assertT('GL|ovd')
assertT('B')
assertT('`')
assertT('Hello123')
assertT('test-')
assertT('test-test')
assertT('_jkl9')
assertT('_-jkl9')
assertF('')
assertF('?')
assertF('nick@network')
assertF('Space flight')
assertF(' ')
assertF(' Space blight')
assertF('0')
assertF('-test')
assertF('#lounge')
assertF('\\bar/bender\\')
assertF('GL/ovd') # Technically not valid, but some IRCds don't care ;)
assertF('100AAAAAC') # TS6 UID
self.assertFalse(self.p.is_nick('longnicklongnicklongnicklongnicklongnicklongnick', nicklen=20))
self.assertTrue(self.p.is_nick('ninechars', nicklen=9))
self.assertTrue(self.p.is_nick('ChanServ', nicklen=20))
self.assertTrue(self.p.is_nick('leneight', nicklen=9))
self.assertFalse(self.p.is_nick('bitmonster', nicklen=9))
self.assertFalse(self.p.is_nick('ninecharsplus', nicklen=12))
def test_is_channel(self):
assertT = lambda inp: self.assertTrue(self.p.is_channel(inp))
assertF = lambda inp: self.assertFalse(self.p.is_channel(inp))
assertT('#test')
assertT('#')
assertT('#a#b#c')
assertF('nick!user@host')
assertF('&channel') # we don't support these yet
assertF('lorem ipsum')
def test_is_server_name(self):
self.assertTrue(self.p.is_server_name('test.local'))
self.assertTrue(self.p.is_server_name('IRC.example.com'))
self.assertTrue(self.p.is_server_name('services.'))
self.assertFalse(self.p.is_server_name('.org'))
self.assertFalse(self.p.is_server_name('bacon'))
def test_is_hostmask(self):
assertT = lambda inp: self.assertTrue(self.p.is_hostmask(inp))
assertF = lambda inp: self.assertFalse(self.p.is_hostmask(inp))
assertT('nick!user@host')
assertT('abc123!~ident@ip1-2-3-4.example.net')
assertF('brick!user')
assertF('user@host')
assertF('!@')
assertF('!')
assertF('@abcd')
assertF('#channel')
assertF('test.host')
assertF('nick ! user @ host')
assertF('alpha!beta@example.net#otherchan') # Janus workaround
def test_get_SID(self):
self.p.servers['serv1'] = Server(self.p, None, 'myserv.local', internal=True)
check = lambda inp, expected: self.assertEqual(self.p._get_SID(inp), expected)
check('myserv.local', 'serv1')
check('MYSERV.local', 'serv1')
check('serv1', 'serv1')
check('other.server', 'other.server')
def test_get_UID(self):
u = self._make_user('you', uid='100')
check = lambda inp, expected: self.assertEqual(self.p._get_UID(inp), expected)
check('you', '100') # nick to UID
check('YOu', '100')
check('100', '100') # already a UID
check('Test', 'Test') # non-existent
def test_get_hostmask(self):
u = self._make_user('lorem', 'testUID', ident='ipsum', host='sit.amet')
self.assertEqual(self.p.get_hostmask(u.uid), 'lorem!ipsum@sit.amet')
def test_get_friendly_name(self):
u = self._make_user('lorem', 'testUID', ident='ipsum', host='sit.amet')
s = self.p.servers['mySID'] = Server(self.p, None, 'irc.example.org')
c = self.p._channels['#abc'] = Channel('#abc')
self.assertEqual(self.p.get_friendly_name(u.uid), 'lorem')
self.assertEqual(self.p.get_friendly_name('#abc'), '#abc')
self.assertEqual(self.p.get_friendly_name('mySID'), 'irc.example.org')
# TODO: _squit wrapper
### MISC UTILS
def test_get_service_option(self):
f = self.p.get_service_option
self.assertEqual(f('myserv', 'myopt'), None) # No value anywhere
self.assertEqual(f('myserv', 'myopt', default=0), 0)
# Define global option
with patch.dict(conf.conf, {'myserv': {'youropt': True, 'myopt': 1234}}):
self.assertEqual(f('myserv', 'myopt'), 1234) # Read global option
# Both global and local options exist
with patch.dict(self.p.serverdata, {'myserv_myopt': 2345}):
self.assertEqual(f('myserv', 'myopt'), 2345) # Read local option
self.assertEqual(f('myserv', 'myopt', global_option='youropt'), 2345)
# Custom global_option setting
self.assertEqual(f('myserv', 'abcdef', global_option='youropt'), True)
# Define local option
with patch.dict(self.p.serverdata, {'myserv_myopt': 998877}):
self.assertEqual(f('myserv', 'myopt'), 998877) # Read local option
self.assertEqual(f('myserv', 'myopt', default='unused'), 998877)
def test_get_service_options_list(self):
f = self.p.get_service_options
self.assertEqual(f('myserv', 'items', list), []) # No value anywhere
# Define global option
with patch.dict(conf.conf, {'myserv': {'items': [1, 10, 100], 'empty': []}}):
self.assertEqual(f('myserv', 'items', list), [1, 10, 100]) # Global value only
self.assertEqual(f('myserv', 'empty', list), [])
# Both global and local options exist
with patch.dict(self.p.serverdata, {'myserv_items': [2, 4, 6, 8], 'empty': []}):
self.assertEqual(f('myserv', 'items', list), [1, 10, 100, 2, 4, 6, 8])
# Custom global_option setting
self.assertEqual(f('myserv', 'items', list, global_option='nonexistent'), [2, 4, 6, 8])
self.assertEqual(f('myserv', 'empty', list), [])
# Define local option
with patch.dict(self.p.serverdata, {'myserv_items': [1, 0, 0, 3]}):
self.assertEqual(f('myserv', 'items', list), [1, 0, 0, 3]) # Read local option
def test_get_service_options_dict(self):
f = self.p.get_service_options
self.assertEqual(f('chanman', 'items', dict), {}) # No value anywhere
# This is just mildly relevant test data, it's not actually used anywhere.
globalopt = {'o': '@', 'v': '+', 'a': '!', 'h': '%'}
localopt = {'a': '&', 'q': '~'}
# Define global option
with patch.dict(conf.conf, {'chanman': {'prefixes': globalopt, 'empty': {}}}):
self.assertEqual(f('chanman', 'prefixes', dict), globalopt) # Global value only
self.assertEqual(f('chanman', 'empty', dict), {})
# Both global and local options exist
with patch.dict(self.p.serverdata, {'chanman_prefixes': localopt, 'empty': {}}):
self.assertEqual(f('chanman', 'prefixes', dict), {**globalopt, **localopt})
self.assertEqual(f('chanman', 'items', dict), {}) # No value anywhere
self.assertEqual(f('chanman', 'empty', dict), {})
# Define local option
with patch.dict(self.p.serverdata, {'chanman_prefixes': localopt}):
self.assertEqual(f('chanman', 'prefixes', dict), localopt) # Read local option
### MODE HANDLING
def test_parse_modes_channel_rfc(self):
# These are basic tests that only use RFC 1459 defined modes.
# IRCds supporting more complex modes can define new test cases if needed.
u = self._make_user('testuser', uid='100')
c = self.p.channels['#testruns'] = Channel(self.p, name='#testruns')
self.assertEqual(
self.p.parse_modes('#testruns', ['+m']),
[('+m', None)]
)
self.assertEqual(
self.p.parse_modes('#testruns', ['+l', '3']),
[('+l', '3')]
)
self.assertEqual(
self.p.parse_modes('#testruns', ['+ntl', '59']),
[('+n', None), ('+t', None), ('+l', '59')]
)
self.assertEqual(
self.p.parse_modes('#testruns', ['+k-n', 'test']),
[('+k', 'test'), ('-n', None)]
)
self.assertEqual(
self.p.parse_modes('#testruns', ['+o', '102']), # unknown UID
[]
)
c.users.add(u)
u.channels.add(c)
self.assertEqual(
self.p.parse_modes('#testruns', ['+o', '100']),
[('+o', '100')]
)
self.assertEqual(
self.p.parse_modes('#testruns', ['+vip', '100']),
[('+v', '100'), ('+i', None), ('+p', None)]
)
def test_parse_modes_channel_rfc(self):
# These are basic tests that only use RFC 1459 defined modes.
# IRCds supporting more complex modes can define new test cases if needed.
c = self.p.channels['#testruns'] = Channel(self.p, name='#testruns')
# Note: base case is not defined and raises AssertionError
self.assertEqual(
self.p.parse_modes('#testruns', ['+m']), # add modes
[('+m', None)]
)
self.assertEqual(
self.p.parse_modes('#testruns', ['-tn']), # remove modes
[('-t', None), ('-n', None)]
)
self.assertEqual(
self.p.parse_modes('#TESTRUNS', ['-tn']), # different case target
[('-t', None), ('-n', None)]
)
self.assertEqual(
self.p.parse_modes('#testruns', ['+l', '3']), # modes w/ arguments
[('+l', '3')]
)
self.assertEqual(
self.p.parse_modes('#testruns', ['+nlt', '59']), # combination
[('+n', None), ('+l', '59'), ('+t', None)]
)
self.assertEqual(
self.p.parse_modes('#testruns', ['+k-n', 'test']), # swapping +/-
[('+k', 'test'), ('-n', None)]
)
self.assertEqual(
self.p.parse_modes('#testruns', ['n-s']), # sloppy syntax
[('+n', None), ('-s', None)]
)
self.assertEqual(
self.p.parse_modes('#testruns', ['+bmi', '*!test@example.com']),
[('+b', '*!test@example.com'), ('+m', None), ('+i', None)]
)
def test_parse_modes_prefixmodes_rfc(self):
c = self.p.channels['#testruns'] = Channel(self.p, name='#testruns')
self.assertEqual(
self.p.parse_modes('#testruns', ['+ov', '102', '101']), # unknown UIDs are ignored
[]
)
u = self._make_user('test100', uid='100')
c.users.add(u)
u.channels.add(c)
self.assertEqual(
self.p.parse_modes('#testruns', ['+o', '100']),
[('+o', '100')]
)
self.assertEqual(
self.p.parse_modes('#testruns', ['+vip', '100']),
[('+v', '100'), ('+i', None), ('+p', None)]
)
self.assertEqual(
self.p.parse_modes('#testruns', ['-o+bn', '100', '*!test@example.com']),
[('-o', '100'), ('+b', '*!test@example.com'), ('+n', None)]
)
self.assertEqual(
# 2nd user missing
self.p.parse_modes('#testruns', ['+oovv', '100', '102', '100', '102']),
[('+o', '100'), ('+v', '100')]
)
u2 = self._make_user('test102', uid='102')
c.users.add(u2)
u2.channels.add(c)
self.assertEqual(
# two users interleaved
self.p.parse_modes('#testruns', ['+oovv', '100', '102', '100', '102']),
[('+o', '100'), ('+o', '102'), ('+v', '100'), ('+v', '102')]
)
self.assertEqual(
# Mode cycle
self.p.parse_modes('#testruns', ['+o-o', '100', '100']),
[('+o', '100'), ('-o', '100')]
)
def test_parse_modes_channel_ban_complex(self):
c = self.p.channels['#testruns'] = Channel(self.p, name='#testruns')
self.assertEqual(
# add first ban, but don't remove second because it doesn't exist
self.p.parse_modes('#testruns', ['+b-b', '*!*@test1', '*!*@test2']),
[('+b', '*!*@test1')],
"First ban should have been added, and second ignored"
)
self.p.apply_modes('#testruns', [('+b', '*!*@test1')])
self.assertEqual(
# remove first ban because it matches case
self.p.parse_modes('#testruns', ['-b', '*!*@test1']),
[('-b', '*!*@test1')],
"First ban should have been removed (same case)"
)
self.assertEqual(
# remove first ban despite different case
self.p.parse_modes('#testruns', ['-b', '*!*@TEST1']),
[('-b', '*!*@test1')],
"First ban should have been removed (different case)"
)
self.p.apply_modes('#testruns', [('+b', '*!*@Test2')])
self.assertEqual(
# remove second ban despite different case
self.p.parse_modes('#testruns', ['-b', '*!*@test2']),
[('-b', '*!*@Test2')],
"Second ban should have been removed (different case)"
)
def test_parse_modes_channel_ban_cycle(self):
c = self.p.channels['#testruns'] = Channel(self.p, name='#testruns')
self.assertEqual(
self.p.parse_modes('#testruns', ['+b-b', '*!*@example.com', '*!*@example.com']),
[('+b', '*!*@example.com'), ('-b', '*!*@example.com')],
"Cycling a ban +b-b should remove it"
)
self.assertEqual(
self.p.parse_modes('#testruns', ['-b+b', '*!*@example.com', '*!*@example.com']),
[('+b', '*!*@example.com')],
"Cycling a ban -b+b should add it"
)
self.assertEqual(
self.p.parse_modes('#testruns', ['+b-b', '*!*@example.com', '*!*@Example.com']),
[('+b', '*!*@example.com'), ('-b', '*!*@example.com')],
"Cycling a ban +b-b should remove it (different case)"
)
self.assertEqual(
self.p.parse_modes('#testruns', ['+b-b', '*!*@Example.com', '*!*@example.com']),
[('+b', '*!*@Example.com'), ('-b', '*!*@Example.com')],
"Cycling a ban +b-b should remove it (different case)"
)
def test_parse_modes_channel_prefixmode_has_nick(self):
c = self.p.channels['#'] = Channel(self.p, name='#')
u = self._make_user('mynick', uid='myuid')
c.users.add(u)
u.channels.add(c)
self.assertEqual(
self.p.parse_modes('#', ['+o', 'myuid']),
[('+o', 'myuid')],
"+o on UID should be registered"
)
self.assertEqual(
self.p.parse_modes('#', ['+o', 'mynick']),
[('+o', 'myuid')],
"+o on nick should be registered"
)
self.assertEqual(
self.p.parse_modes('#', ['+o', 'MyUiD']),
[],
"+o on wrong case UID should be ignored"
)
self.assertEqual(
self.p.parse_modes('#', ['+o', 'MyNick']),
[('+o', 'myuid')],
"+o on different case nick should be registered"
)
self.assertEqual(
self.p.parse_modes('#', ['-o', 'myuid']),
[('-o', 'myuid')],
"-o on UID should be registered"
)
self.assertEqual(
self.p.parse_modes('#', ['-o', 'mynick']),
[('-o', 'myuid')],
"-o on nick should be registered"
)
# TODO: check the output of parse_modes() here too
def test_parse_modes_channel_key(self):
c = self.p.channels['#pylink'] = Channel(self.p, name='#pylink')
c.modes = {('k', 'foobar')}
modes = self.p.parse_modes('#pylink', ['-k', 'foobar'])
self.assertEqual(modes, [('-k', 'foobar')], "Parse result should include -k")
modes = self.p.parse_modes('#pylink', ['-k', 'aBcDeF'])
self.assertEqual(modes, [], "Incorrect key removal should be ignored")
modes = self.p.parse_modes('#pylink', ['+k', 'aBcDeF'])
self.assertEqual(modes, [('+k', 'aBcDeF')], "Parse result should include +k (replace key)")
# Mismatched case - treat this as remove
# Some IRCds allow this (Unreal, P10), others do not (InspIRCd). However, if such a message
# makes actually its way to us it is most likely valid.
# Note: Charybdis and ngIRCd do -k * removal instead so this case will never happen there
modes = self.p.parse_modes('#pylink', ['-k', 'FooBar'])
self.assertEqual(modes, [('-k', 'foobar')], "Parse result should include -k (different case)")
modes = self.p.parse_modes('#pylink', ['-k', '*'])
self.assertEqual(modes, [('-k', 'foobar')], "Parse result should include -k (-k *)")
def test_parse_modes_user_rfc(self):
u = self._make_user('testuser', uid='100')
self.assertEqual(
self.p.parse_modes('100', ['+i-w+x']),
[('+i', None), ('-w', None), ('+x', None)]
)
self.assertEqual(
# Sloppy syntax, but OK
self.p.parse_modes('100', ['wx']),
[('+w', None), ('+x', None)]
)
def test_apply_modes_channel_simple(self):
c = self.p.channels['#'] = Channel(self.p, name='#')
self.p.apply_modes('#', [('+m', None)])
self.assertEqual(c.modes, {('m', None)})
self.p.apply_modes('#', []) # No-op
self.assertEqual(c.modes, {('m', None)})
self.p.apply_modes('#', [('-m', None)])
self.assertFalse(c.modes) # assert is empty
def test_apply_modes_channel_add_remove(self):
c = self.p.channels['#'] = Channel(self.p, name='#')
self.p.apply_modes('#', [('+i', None)])
self.assertEqual(c.modes, {('i', None)})
self.p.apply_modes('#', [('+n', None), ('-i', None)])
self.assertEqual(c.modes, {('n', None)})
def test_apply_modes_channel_remove_nonexistent(self):
c = self.p.channels['#abc'] = Channel(self.p, name='#abc')
self.p.apply_modes('#abc', [('+t', None)])
self.assertEqual(c.modes, {('t', None)})
self.p.apply_modes('#abc', [('-n', None), ('-i', None)])
self.assertEqual(c.modes, {('t', None)})
def test_apply_modes_channel_limit(self):
c = self.p.channels['#abc'] = Channel(self.p, name='#abc')
self.p.apply_modes('#abc', [('+t', None), ('+l', '30')])
self.assertEqual(c.modes, {('t', None), ('l', '30')})
self.p.apply_modes('#abc', [('+l', '55')])
self.assertEqual(c.modes, {('t', None), ('l', '55')})
self.p.apply_modes('#abc', [('-l', None)])
self.assertEqual(c.modes, {('t', None)})
def test_apply_modes_channel_key(self):
c = self.p.channels['#pylink'] = Channel(self.p, name='#pylink')
self.p.apply_modes('#pylink', [('+k', 'password123'), ('+s', None)])
self.assertEqual(c.modes, {('s', None), ('k', 'password123')})
self.p.apply_modes('#pylink', [('+k', 'qwerty')])
self.assertEqual(c.modes, {('s', None), ('k', 'qwerty')})
self.p.apply_modes('#pylink', [('-k', 'abcdef')])
# Trying to remove with wrong key is a no-op
self.assertEqual(c.modes, {('s', None), ('k', 'qwerty')})
self.p.apply_modes('#pylink', [('-k', 'qwerty')])
self.assertEqual(c.modes, {('s', None)})
self.p.apply_modes('#pylink', [('+k', 'qwerty')])
self.assertEqual(c.modes, {('s', None), ('k', 'qwerty')})
self.p.apply_modes('#pylink', [('-k', 'QWERTY')]) # Remove with different case
self.assertEqual(c.modes, {('s', None)})
self.p.apply_modes('#pylink', [('+k', '12345')]) # Replace existing key
self.assertEqual(c.modes, {('s', None), ('k', '12345')})
def test_apply_modes_channel_ban(self):
c = self.p.channels['#Magic'] = Channel(self.p, name='#Magic')
self.p.apply_modes('#Magic', [('+b', '*!*@test.host'), ('+b', '*!*@best.host')])
self.assertEqual(c.modes, {('b', '*!*@test.host'), ('b', '*!*@best.host')}, "Bans should be added")
# This should be a no-op
self.p.apply_modes('#Magic', [('-b', '*!*non-existent')])
self.assertEqual(c.modes, {('b', '*!*@test.host'), ('b', '*!*@best.host')}, "Trying to unset non-existent ban should be no-op")
# Simple removal
self.p.apply_modes('#Magic', [('-b', '*!*@test.host')])
self.assertEqual(c.modes, {('b', '*!*@best.host')}, "Ban on *!*@test.host be removed (same case as original)")
# Removal but different case than original
self.p.apply_modes('#Magic', [('-b', '*!*@BEST.HOST')])
self.assertFalse(c.modes, "Ban on *!*@best.host should be removed (different case)")
def test_apply_modes_channel_mode_cycle(self):
c = self.p.channels['#Magic'] = Channel(self.p, name='#Magic')
self.p.apply_modes('#Magic', [('+b', '*!*@example.net'), ('-b', '*!*@example.net')])
self.assertEqual(c.modes, set(), "Ban should have been removed (same case)")
self.p.apply_modes('#Magic', [('+b', '*!*@example.net'), ('-b', '*!*@Example.net')])
self.assertEqual(c.modes, set(), "Ban should have been removed (different case)")
u = self._make_user('nick', uid='user')
c.users.add(u.uid)
u.channels.add(c)
self.p.apply_modes('#Magic', [('+o', 'user'), ('-o', 'user')])
self.assertEqual(c.modes, set(), "No prefixmodes should have been set")
self.assertFalse(c.is_op('user'))
self.assertFalse(c.get_prefix_modes('user'))
def test_apply_modes_channel_prefixmodes(self):
# Make some users
c = self.p.channels['#staff'] = Channel(self.p, name='#staff')
u1 = self._make_user('user100', uid='100')
u2 = self._make_user('user101', uid='101')
c.users.add(u1.uid)
u1.channels.add(c)
c.users.add(u2.uid)
u2.channels.add(c)
# Set modes
self.p.apply_modes('#staff', [('+o', '100'), ('+v', '101'), ('+t', None)])
self.assertEqual(c.modes, {('t', None)})
# State checks, lots of them... TODO: move this into Channel class tests
self.assertTrue(c.is_op('100'))
self.assertTrue(c.is_op_plus('100'))
self.assertFalse(c.is_halfop('100'))
self.assertTrue(c.is_halfop_plus('100'))
self.assertFalse(c.is_voice('100'))
self.assertTrue(c.is_voice_plus('100'))
self.assertEqual(c.get_prefix_modes('100'), ['op'])
self.assertTrue(c.is_voice('101'))
self.assertTrue(c.is_voice_plus('101'))
self.assertFalse(c.is_halfop('101'))
self.assertFalse(c.is_halfop_plus('101'))
self.assertFalse(c.is_op('101'))
self.assertFalse(c.is_op_plus('101'))
self.assertEqual(c.get_prefix_modes('101'), ['voice'])
self.assertFalse(c.prefixmodes['owner'])
self.assertFalse(c.prefixmodes['admin'])
self.assertEqual(c.prefixmodes['op'], {'100'})
self.assertFalse(c.prefixmodes['halfop'])
self.assertEqual(c.prefixmodes['voice'], {'101'})
self.p.apply_modes('#staff', [('-o', '100')])
self.assertEqual(c.modes, {('t', None)})
self.assertFalse(c.get_prefix_modes('100'))
self.assertEqual(c.get_prefix_modes('101'), ['voice'])
def test_apply_modes_user(self):
u = self._make_user('nick', uid='user')
self.p.apply_modes('user', [('+o', None), ('+w', None)])
self.assertEqual(u.modes, {('o', None), ('w', None)})
self.p.apply_modes('user', [('-o', None), ('+i', None)])
self.assertEqual(u.modes, {('i', None), ('w', None)})
def test_reverse_modes_simple(self):
c = self.p.channels['#foobar'] = Channel(self.p, name='#foobar')
c.modes = {('m', None), ('n', None)}
# This function supports both strings and mode lists
# Base cases
for inp in {'', '+', '-'}:
self.assertEqual(self.p.reverse_modes('#foobar', inp), '+')
out = self.p.reverse_modes('#foobar', [])
self.assertEqual(out, [])
# One simple
out = self.p.reverse_modes('#foobar', '+t')
self.assertEqual(out, '-t')
out = self.p.reverse_modes('#foobar', [('+t', None)])
self.assertEqual(out, [('-t', None)])
out = self.p.reverse_modes('#foobar', '+m')
self.assertEqual(out, '+', 'Calling reverse_modes() on an already set mode is a no-op')
out = self.p.reverse_modes('#foobar', [('+m', None)])
self.assertEqual(out, [], 'Calling reverse_modes() on an already set mode is a no-op')
out = self.p.reverse_modes('#foobar', [('-i', None)])
self.assertEqual(out, [], 'Calling reverse_modes() on non-existent mode is a no-op')
def test_reverse_modes_multi(self):
c = self.p.channels['#foobar'] = Channel(self.p, name='#foobar')
c.modes = {('t', None), ('n', None)}
# reverse_modes should ignore modes that were already set
out = self.p.reverse_modes('#foobar', '+nt')
self.assertEqual(out, '+')
out = self.p.reverse_modes('#foobar', [('+m', None), ('+i', None)])
self.assertEqual(out, [('-m', None), ('-i', None)])
out = self.p.reverse_modes('#foobar', '+mint')
self.assertEqual(out, '-mi') # Ignore +nt since it already exists
out = self.p.reverse_modes('#foobar', '+m-t')
self.assertEqual(out, '-m+t')
out = self.p.reverse_modes('#foobar', '+mn-t')
self.assertEqual(out, '-m+t')
out = self.p.reverse_modes('#foobar', [('+m', None), ('+n', None), ('-t', None)])
self.assertEqual(out, [('-m', None), ('+t', None)])
def test_reverse_modes_bans(self):
c = self.p.channels['#foobar'] = Channel(self.p, name='#foobar')
c.modes = {('t', None), ('n', None), ('b', '*!*@example.com')}
out = self.p.reverse_modes('#foobar', [('+b', '*!*@test')])
self.assertEqual(out, [('-b', '*!*@test')], "non-existent ban should be removed")
out = self.p.reverse_modes('#foobar', '+b *!*@example.com')
self.assertEqual(out, '+', "+b existing ban should be no-op")
out = self.p.reverse_modes('#foobar', '+b *!*@Example.com')
self.assertEqual(out, '+', "Should ignore attempt to change case of ban mode")
out = self.p.reverse_modes('#foobar', '-b *!*@example.com')
self.assertEqual(out, '+b *!*@example.com', "-b existing ban should reset it")
out = self.p.reverse_modes('#foobar', '-b *!*@Example.com')
self.assertEqual(out, '+b *!*@example.com', "-b existing ban should reset it using original case")
out = self.p.reverse_modes('#foobar', '-b *!*@*')
self.assertEqual(out, '+', "Removing non-existent ban is no-op")
out = self.p.reverse_modes('#foobar', '+bbbm 1 2 3')
self.assertEqual(out, '-bbbm 1 2 3')
def test_reverse_modes_limit(self):
c = self.p.channels['#foobar'] = Channel(self.p, name='#foobar')
c.modes = {('t', None), ('n', None), ('l', '50')}
out = self.p.reverse_modes('#foobar', [('+l', '100')])
self.assertEqual(out, [('+l', '50')], "Setting +l should reset original mode")
out = self.p.reverse_modes('#foobar', [('-l', None)])
self.assertEqual(out, [('+l', '50')], "Unsetting +l should reset original mode")
out = self.p.reverse_modes('#foobar', [('+l', '50')])
self.assertEqual(out, [], "Setting +l with original value is no-op")
c.modes.clear()
out = self.p.reverse_modes('#foobar', [('+l', '111'), ('+m', None)])
self.assertEqual(out, [('-l', None), ('-m', None)], "Setting +l on channel without it should remove")
def test_reverse_modes_prefixmodes(self):
c = self.p.channels['#foobar'] = Channel(self.p, name='#foobar')
c.modes = {('t', None), ('n', None)}
u = self._make_user('nick', uid='user')
u.channels.add(c)
c.users.add(u)
c.prefixmodes['op'].add(u.uid)
out = self.p.reverse_modes('#foobar', '-o user')
self.assertEqual(out, '+o user')
out = self.p.reverse_modes('#foobar', '+o user')
self.assertEqual(out, '+')
out = self.p.reverse_modes('#foobar', '+ov user user')
self.assertEqual(out, '-v user') # ignore +o
out = self.p.reverse_modes('#foobar', '-ovt user user')
self.assertEqual(out, '+ot user') # ignore -v
def test_reverse_modes_cycle_simple(self):
c = self.p.channels['#weirdstuff'] = Channel(self.p, name='#weirdstuff')
c.modes = {('t', None), ('n', None)}
out = self.p.reverse_modes('#weirdstuff', '+n-n') # +- cycle existing mode
self.assertEqual(out, '+n')
out = self.p.reverse_modes('#weirdstuff', '-n+n') # -+ cycle existing mode
self.assertEqual(out, '+n') # Ugly but OK
out = self.p.reverse_modes('#weirdstuff', '+m-m') # +- cycle non-existent mode
self.assertEqual(out, '-m')
out = self.p.reverse_modes('#weirdstuff', '-m+m') # -+ cycle non-existent mode
self.assertEqual(out, '-m') # Ugly but OK
def test_reverse_modes_cycle_bans(self):
c = self.p.channels['#weirdstuff'] = Channel(self.p, name='#weirdstuff')
c.modes = {('t', None), ('n', None), ('b', '*!*@test.host')}
out = self.p.reverse_modes('#weirdstuff', '+b-b *!*@test.host *!*@test.host') # +- cycle existing ban
self.assertEqual(out, '+b *!*@test.host')
out = self.p.reverse_modes('#weirdstuff', '-b+b *!*@test.host *!*@test.host') # -+ cycle existing ban
self.assertEqual(out, '+b *!*@test.host') # Ugly but OK
out = self.p.reverse_modes('#weirdstuff', '+b-b *!*@* *!*@*') # +- cycle existing ban
self.assertEqual(out, '-b *!*@*')
out = self.p.reverse_modes('#weirdstuff', '-b+b *!*@* *!*@*') # -+ cycle existing ban
self.assertEqual(out, '-b *!*@*') # Ugly but OK
def test_reverse_modes_cycle_arguments(self):
# All of these cases are ugly, sometimes unsetting modes that don't exist...
c = self.p.channels['#weirdstuff'] = Channel(self.p, name='#weirdstuff')
out = self.p.reverse_modes('#weirdstuff', '+l-l 30')
self.assertEqual(out, '-l')
out = self.p.reverse_modes('#weirdstuff', '-l+l 30')
self.assertEqual(out, '-l')
out = self.p.reverse_modes('#weirdstuff', '+k-k aaaaaaaaaaaa aaaaaaaaaaaa')
self.assertEqual(out, '-k aaaaaaaaaaaa')
out = self.p.reverse_modes('#weirdstuff', '-k+k aaaaaaaaaaaa aaaaaaaaaaaa')
self.assertEqual(out, '-k aaaaaaaaaaaa')
c.modes = {('l', '555'), ('k', 'NO-PLEASE')}
out = self.p.reverse_modes('#weirdstuff', '+l-l 30')
self.assertEqual(out, '+l 555')
out = self.p.reverse_modes('#weirdstuff', '-l+l 30')
self.assertEqual(out, '+l 555')
out = self.p.reverse_modes('#weirdstuff', '+k-k aaaaaaaaaaaa aaaaaaaaaaaa')
self.assertEqual(out, '+k NO-PLEASE')
out = self.p.reverse_modes('#weirdstuff', '-k+k aaaaaaaaaaaa aaaaaaaaaaaa')
self.assertEqual(out, '+k NO-PLEASE')
def test_reverse_modes_cycle_prefixmodes(self):
# All of these cases are ugly, sometimes unsetting modes that don't exist...
c = self.p.channels['#weirdstuff'] = Channel(self.p, name='#weirdstuff')
u = self._make_user('nick', uid='user')
u.channels.add(c)
c.users.add(u)
# user not already opped
out = self.p.reverse_modes('#weirdstuff', '+o-o user user')
self.assertEqual(out, '-o user')
out = self.p.reverse_modes('#weirdstuff', '-o+o user user')
self.assertEqual(out, '-o user')
c.prefixmodes['op'].add(u.uid)
# user was opped
out = self.p.reverse_modes('#weirdstuff', '+o-o user user')
self.assertEqual(out, '+o user')
out = self.p.reverse_modes('#weirdstuff', '-o+o user user')
self.assertEqual(out, '+o user')
def test_join_modes(self):
# join_modes operates independently of state; the input just has to be valid modepairs
check = lambda inp, expected, sort=False: self.assertEqual(self.p.join_modes(inp, sort=sort), expected)
check([], '+') # base case
check([('+b', '*!*@test')], '+b *!*@test')
check([('-S', None)], '-S')
check([('+n', None), ('+t', None)], '+nt')
check([('+t', None), ('+n', None)], '+tn')
check([('+t', None), ('+n', None)], '+nt', sort=True)
check([('-n', None), ('-s', None)], '-ns')
check([('+q', '*'), ('-q', '*')], '+q-q * *')
check([('+l', '5'), ('-n', None), ('+R', None)], '+l-n+R 5')
# Sloppy syntax: assume leading mode is + if not otherwise stated
check([('o', '100AAAAAC'), ('m', None), ('-v', '100AAAAAC')], '+om-v 100AAAAAC 100AAAAAC')
def test_wrap_modes_small(self):
# wrap_modes is also state independent; it only calls join_modes
N_USERS = 5
modes = [('+m', None)] + [('-b', 'user%s!*@example.org' % num) for num in range(N_USERS)]
wr = self.p.wrap_modes(modes, 200)
log.debug('wrap_modes input: %s', modes)
log.debug('wrap_modes output: %s', wr)
self.assertEqual(len(wr), 1) # no split should have occurred
self.assertEqual(wr[0], self.p.join_modes(modes))
def test_wrap_modes_split_length_limit(self):
# wrap_modes is also state independent; it only calls join_modes
N_USERS = 50
modes = [('+o', 'user%s' % num) for num in range(N_USERS)]
wr = self.p.wrap_modes(modes, 120)
log.debug('wrap_modes input: %s', modes)
log.debug('wrap_modes output: %s', wr)
self.assertTrue(len(wr) > 1) # we should have induced a split
for s in wr:
# Check that each split item starts with the right mode char
self.assertTrue(s.startswith('+oooo'))
all_args = itertools.chain.from_iterable(s.split() for s in wr)
for num in range(N_USERS):
# Check that no users are missing
self.assertIn('user%s' % num, all_args)
def test_wrap_modes_split_max_modes(self):
# wrap_modes is also state independent; it only calls join_modes
N_USERS = 50
N_MAX_PER_MSG = 8
modes = [('+v', 'user%s' % num) for num in range(N_USERS)]
wr = self.p.wrap_modes(modes, 200, N_MAX_PER_MSG)
log.debug('wrap_modes input: %s', modes)
log.debug('wrap_modes output: %s', wr)
self.assertTrue(len(wr) > 1) # we should have induced a split
splits = [s.split() for s in wr]
for s in splits:
# Check that each message sets <= N_MAX_PER_MSG modes
self.assertTrue(len(s[0]) <= (N_MAX_PER_MSG + 1)) # add 1 to account for leading +
self.assertTrue(s[0].startswith('+'))
self.assertTrue(s[0].endswith('v'))
all_args = itertools.chain.from_iterable(splits)
for num in range(N_USERS):
# Check that no users are missing
self.assertIn('user%s' % num, all_args)
# TODO: test type coersion if channel or mode targets are ints