mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2024-11-29 13:59:24 +01:00
b1cc4c236b
The parsing code was breaking out of the loop on the first comment which is incorrect and causes only part of the file to be parsed. Its odd this hasn't popped up until now but its likely due to differing dhcpd versions, some which add comments and others that do not.
324 lines
15 KiB
Python
324 lines
15 KiB
Python
#!/usr/bin/python3
|
|
|
|
import unittest
|
|
import sys
|
|
|
|
sys.path.append('../util')
|
|
import iwd
|
|
from iwd import IWD
|
|
from iwd import PSKAgent
|
|
from iwd import NetworkType
|
|
from hostapd import HostapdCLI
|
|
import testutil
|
|
from config import ctx
|
|
import os
|
|
import socket
|
|
import datetime, time
|
|
|
|
class Test(unittest.TestCase):
|
|
|
|
def test_connection_success(self):
|
|
def check_addr(device):
|
|
try:
|
|
# DHCPv6 addresses always have a prefix length of 128 bits, the actual
|
|
# subnet's prefix length is in the route.
|
|
testutil.test_ip_address_match(device.name, '3ffe:501:ffff:100::1', 128, 112)
|
|
except:
|
|
return False
|
|
|
|
return True
|
|
|
|
def get_ll_addrs6(ns, ifname):
|
|
show_ip = ns.start_process(['ip', 'addr', 'show', ifname])
|
|
show_ip.wait()
|
|
for l in show_ip.out.split('\n'):
|
|
if 'inet6 fe80::' in l:
|
|
return socket.inet_pton(socket.AF_INET6, l.split(None, 1)[1].split('/', 1)[0])
|
|
return None
|
|
|
|
os.system("hostname test-sta")
|
|
IWD.copy_to_storage('auto.psk', name='ap-ns1.psk')
|
|
wd = IWD(True)
|
|
|
|
psk_agent = PSKAgent("secret123")
|
|
wd.register_psk_agent(psk_agent)
|
|
|
|
devices = wd.list_devices(1)
|
|
device = devices[0]
|
|
|
|
ordered_network = device.get_ordered_network('ap-ns1')
|
|
|
|
self.assertEqual(ordered_network.type, NetworkType.psk)
|
|
|
|
condition = 'not obj.connected'
|
|
wd.wait_for_object_condition(ordered_network.network_object, condition)
|
|
|
|
ordered_network.network_object.connect()
|
|
|
|
condition = 'obj.state == DeviceState.connected'
|
|
wd.wait_for_object_condition(device, condition)
|
|
connect_time = time.time()
|
|
|
|
testutil.test_iface_operstate()
|
|
|
|
testutil.test_ip_address_match(device.name, '192.168.1.10', 17, 24)
|
|
ctx.non_block_wait(check_addr, 10, device,
|
|
exception=Exception("IPv6 address was not set"))
|
|
|
|
# Cannot use test_ifaces_connected() across namespaces (implementation details)
|
|
testutil.test_ip_connected(('192.168.1.10', ctx), ('192.168.1.1', self.ns1))
|
|
|
|
ifname = str(device.name)
|
|
router_ll_addr = get_ll_addrs6(self.ns1, self.hapd.ifname)
|
|
# Since we're in an isolated VM with freshly created interfaces we know any routes
|
|
# will have been created by IWD and we don't have to allow for pre-existing routes
|
|
# in the table.
|
|
# Flags: 1=RTF_UP, 2=RTF_GATEWAY
|
|
expected_routes4 = {
|
|
testutil.RouteInfo(gw=socket.inet_pton(socket.AF_INET, '192.168.1.1'),
|
|
flags=3, ifname=ifname),
|
|
testutil.RouteInfo(dst=socket.inet_pton(socket.AF_INET, '192.168.0.0'), plen=17,
|
|
flags=1, ifname=ifname)
|
|
}
|
|
expected_routes6 = {
|
|
# Default router
|
|
testutil.RouteInfo(gw=router_ll_addr, flags=3, ifname=ifname),
|
|
# On-link prefix
|
|
testutil.RouteInfo(dst=socket.inet_pton(socket.AF_INET6, '3ffe:501:ffff:100::'), plen=72,
|
|
flags=1, ifname=ifname),
|
|
# Router for an off-link prefix, medium preference
|
|
testutil.RouteInfo(dst=socket.inet_pton(socket.AF_INET6, '3ffe:501:ffff:300::'), plen=64,
|
|
gw=router_ll_addr, flags=3, ifname=ifname),
|
|
# Router for an off-link prefix, high preference
|
|
testutil.RouteInfo(dst=socket.inet_pton(socket.AF_INET6, '3ffe:501:ffff:400::'), plen=65,
|
|
gw=router_ll_addr, flags=3, ifname=ifname),
|
|
# Router for an off-link prefix, low preference
|
|
testutil.RouteInfo(dst=socket.inet_pton(socket.AF_INET6, '3ffe:501:ffff:500::'), plen=66,
|
|
gw=router_ll_addr, flags=3, ifname=ifname)
|
|
}
|
|
self.maxDiff = None
|
|
self.assertEqual(expected_routes4, set(testutil.get_routes4(ifname)))
|
|
self.assertEqual(expected_routes6, set(testutil.get_routes6(ifname)))
|
|
|
|
rclog = open('/tmp/resolvconf.log', 'r')
|
|
entries = rclog.readlines()
|
|
rclog.close()
|
|
expected_rclog = ['-a %s.dns\n' % ifname, 'nameserver 192.168.1.2\n', 'nameserver 3ffe:501:ffff:100::2\n']
|
|
# Every real resolvconf -a run overwrites the previous settings. Check the last three lines
|
|
# of our log since we care about the end result here.
|
|
self.assertEqual(expected_rclog, entries[-3:])
|
|
|
|
leases_file = self.parse_lease_file('/tmp/dhcpd.leases', socket.AF_INET)
|
|
lease = leases_file['leases'][socket.inet_pton(socket.AF_INET, '192.168.1.10')]
|
|
self.assertEqual(lease['state'], 'active')
|
|
self.assertEqual(lease['client-hostname'], 'test-sta')
|
|
self.assertTrue(lease['starts'] < connect_time)
|
|
self.assertTrue(lease['ends'] > connect_time)
|
|
# The T1 is 15 seconds per dhcpd.conf. This is the approximate interval between lease
|
|
# renewals we should see from the client (+/- 1 second + some jitter). Wait a little
|
|
# less than twice that time (25s) so that we can expect the lease was renewed strictly
|
|
# once (not 0 or 2 times) by that time, check that the lease timestamps have changed by
|
|
# at least 10s so as to leave a lot of margin.
|
|
renew_time = lease['starts'] + 15
|
|
now = time.time()
|
|
ctx.non_block_wait(lambda: False, renew_time + 10 - now, exception=False)
|
|
|
|
leases_file = self.parse_lease_file('/tmp/dhcpd.leases', socket.AF_INET)
|
|
new_lease = leases_file['leases'][socket.inet_pton(socket.AF_INET, '192.168.1.10')]
|
|
self.assertEqual(new_lease['state'], 'active')
|
|
self.assertEqual(new_lease['client-hostname'], 'test-sta')
|
|
self.assertTrue(new_lease['starts'] > lease['starts'] + 10)
|
|
self.assertTrue(new_lease['starts'] < lease['starts'] + 25)
|
|
self.assertTrue(new_lease['ends'] > lease['ends'] + 10)
|
|
self.assertTrue(new_lease['ends'] < lease['ends'] + 25)
|
|
|
|
# Now wait another T1 seconds but don't let our DHCP client get its REQUEST out this
|
|
# time so as to test renew timeouts and resends. The retry interval is 60 seconds
|
|
# since (T2 - T1) / 2 is shorter than 60s. It is now about 10s since the last
|
|
# renewal or 5s before the next DHCPREQUEST frame that is going to be lost. We'll
|
|
# wait T1 seconds, so until about 10s after the failed attempt, we'll check that
|
|
# there was no renewal by that time, just in case, and we'll reenable frame delivery.
|
|
# We'll then wait another 60s and we should see the lease has been successfully
|
|
# renewed some 10 seconds earlier on the 1st DHCPREQUEST retransmission.
|
|
#
|
|
# We can't use hswim to block the frames from reaching the AP because we'd lose
|
|
# beacons and get disconnected. We also can't drop our subnet route or IP address
|
|
# because IWD's sendto() call would synchronously error out and the DHCP client
|
|
# would just give up. Add a false route to break routing to 192.168.1.1 and delete
|
|
# it afterwards.
|
|
os.system('ip route add 192.168.1.1/32 dev ' + ifname + ' via 192.168.1.100 preference 0')
|
|
|
|
lease = new_lease
|
|
renew_time = lease['starts'] + 15
|
|
now = time.time()
|
|
ctx.non_block_wait(lambda: False, renew_time + 10 - now, exception=False)
|
|
|
|
leases_file = self.parse_lease_file('/tmp/dhcpd.leases', socket.AF_INET)
|
|
new_lease = leases_file['leases'][socket.inet_pton(socket.AF_INET, '192.168.1.10')]
|
|
self.assertEqual(new_lease['starts'], lease['starts'])
|
|
|
|
os.system('ip route del 192.168.1.1/32 dev ' + ifname + ' via 192.168.1.100 preference 0')
|
|
|
|
retry_time = lease['starts'] + 75
|
|
now = time.time()
|
|
ctx.non_block_wait(lambda: False, retry_time + 10 - now, exception=False)
|
|
|
|
leases_file = self.parse_lease_file('/tmp/dhcpd.leases', socket.AF_INET)
|
|
new_lease = leases_file['leases'][socket.inet_pton(socket.AF_INET, '192.168.1.10')]
|
|
self.assertEqual(new_lease['state'], 'active')
|
|
self.assertEqual(lease['client-hostname'], 'test-sta')
|
|
self.assertTrue(new_lease['starts'] > lease['starts'] + 70)
|
|
self.assertTrue(new_lease['starts'] < lease['starts'] + 85)
|
|
self.assertTrue(new_lease['ends'] > lease['ends'] + 70)
|
|
self.assertTrue(new_lease['ends'] < lease['ends'] + 85)
|
|
|
|
device.disconnect()
|
|
|
|
condition = 'not obj.connected'
|
|
wd.wait_for_object_condition(ordered_network.network_object, condition)
|
|
|
|
wd.unregister_psk_agent(psk_agent)
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
def remove_lease4():
|
|
try:
|
|
os.remove('/tmp/dhcpd.leases')
|
|
os.remove('/tmp/dhcpd.leases~')
|
|
except:
|
|
pass
|
|
def remove_lease6():
|
|
try:
|
|
os.remove('/tmp/dhcpd6.leases')
|
|
os.remove('/tmp/dhcpd6.leases~')
|
|
except:
|
|
pass
|
|
|
|
cls.ns1 = ctx.get_namespace('ns1')
|
|
cls.hapd = HostapdCLI('ap-ns1.conf')
|
|
# TODO: This could be moved into test-runner itself if other tests ever
|
|
# require this functionality (p2p, FILS, etc.). Since its simple
|
|
# enough it can stay here for now.
|
|
cls.ns1.start_process(['ip', 'addr','add', '192.168.1.1/17',
|
|
'dev', cls.hapd.ifname]).wait()
|
|
cls.ns1.start_process(['touch', '/tmp/dhcpd.leases']).wait()
|
|
cls.dhcpd_pid = cls.ns1.start_process(['dhcpd', '-f', '-d', '-cf', '/tmp/dhcpd.conf',
|
|
'-lf', '/tmp/dhcpd.leases',
|
|
cls.hapd.ifname], cleanup=remove_lease4)
|
|
|
|
cls.ns1.start_process(['ip', 'addr', 'add', '3ffe:501:ffff:100::1/72',
|
|
'dev', cls.hapd.ifname]).wait()
|
|
cls.ns1.start_process(['touch', '/tmp/dhcpd6.leases']).wait()
|
|
cls.dhcpd6_pid = cls.ns1.start_process(['dhcpd', '-6', '-f', '-d',
|
|
'-cf', '/tmp/dhcpd-v6.conf',
|
|
'-lf', '/tmp/dhcpd6.leases',
|
|
cls.hapd.ifname], cleanup=remove_lease6)
|
|
cls.ns1.start_process(['sysctl',
|
|
'net.ipv6.conf.' + cls.hapd.ifname + '.forwarding=1']).wait()
|
|
# Send out Router Advertisements telling clients to use DHCPv6.
|
|
# Note trying to send the RAs from the router's global IPv6 address by adding a
|
|
# "AdvRASrcAddress { 3ffe:501:ffff:100::1; };" line will fail because the client
|
|
# and the router interfaces are in the same namespace and Linux won't allow routes
|
|
# with a non-link-local gateway address that is present on another interface in the
|
|
# same namespace.
|
|
config = open('/tmp/radvd.conf', 'w')
|
|
config.write('interface ' + cls.hapd.ifname + ''' {
|
|
AdvSendAdvert on;
|
|
AdvManagedFlag on;
|
|
prefix 3ffe:501:ffff:100::/72 { AdvAutonomous off; };
|
|
route 3ffe:501:ffff:300::/64 {};
|
|
route 3ffe:501:ffff:400::/65 { AdvRoutePreference low; };
|
|
route 3ffe:501:ffff:500::/66 { AdvRoutePreference high; };
|
|
};''')
|
|
config.close()
|
|
cls.radvd_pid = cls.ns1.start_process(['radvd', '-n', '-d5',
|
|
'-p', '/tmp/radvd.pid', '-C', '/tmp/radvd.conf'])
|
|
|
|
cls.orig_path = os.environ['PATH']
|
|
os.environ['PATH'] = '/tmp/test-bin:' + os.environ['PATH']
|
|
IWD.copy_to_storage('resolvconf', '/tmp/test-bin')
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
IWD.clear_storage()
|
|
cls.ns1.stop_process(cls.dhcpd_pid)
|
|
cls.dhcpd_pid = None
|
|
cls.ns1.stop_process(cls.dhcpd6_pid)
|
|
cls.dhcpd6_pid = None
|
|
cls.ns1.stop_process(cls.radvd_pid)
|
|
cls.radvd_pid = None
|
|
os.system('rm -rf /tmp/radvd.conf /tmp/resolvconf.log /tmp/test-bin')
|
|
os.environ['PATH'] = cls.orig_path
|
|
|
|
@staticmethod
|
|
def parse_lease_file(path, family):
|
|
file = open(path, 'r')
|
|
lines = file.readlines()
|
|
file.close()
|
|
|
|
stack = [[]]
|
|
statement = []
|
|
token = ''
|
|
for line in lines:
|
|
whitespace = False
|
|
quote = False
|
|
for ch in line:
|
|
if not quote and ch in ' \t\r\n;{}=#':
|
|
if len(token):
|
|
statement.append(token)
|
|
token = ''
|
|
if not quote and ch in ';{}':
|
|
if len(statement):
|
|
stack[-1].append(statement)
|
|
statement = []
|
|
if ch == '"':
|
|
quote = not quote
|
|
elif quote or ch not in ' \t\r\n;{}#':
|
|
token += ch
|
|
if ch == '#':
|
|
continue
|
|
elif ch == '{':
|
|
stack.append([])
|
|
elif ch == '}':
|
|
statements = stack.pop()
|
|
stack[-1][-1].append(statements)
|
|
if len(token):
|
|
statement.append(token)
|
|
token = ''
|
|
if len(statement):
|
|
stack[-1].append(statement)
|
|
statements = stack.pop(0)
|
|
if len(stack):
|
|
raise Exception('Unclosed block(s)')
|
|
|
|
contents = {'leases':{}}
|
|
for s in statements:
|
|
if s[0] == 'lease':
|
|
ip = socket.inet_pton(family, s[1])
|
|
lease = {}
|
|
for param in s[2]:
|
|
if param[0] in ('starts', 'ends', 'tstp', 'tsfp', 'atsfp', 'cltt'):
|
|
weekday = param[1]
|
|
year, month, day = param[2].split('/')
|
|
hour, minute, second = param[3].split(':')
|
|
dt = datetime.datetime(
|
|
int(year), int(month), int(day),
|
|
int(hour), int(minute), int(second),
|
|
tzinfo=datetime.timezone.utc)
|
|
lease[param[0]] = dt.timestamp()
|
|
elif param[0:2] == ['binding', 'state']:
|
|
lease['state'] = param[2]
|
|
elif param[0:2] == ['hardware', 'ethernet']:
|
|
lease['hwaddr'] = bytes([int(v, 16) for v in param[2].split(':')])
|
|
elif param[0] in ('preferred-life', 'max-life'):
|
|
lease[param[0]] = int(param[1])
|
|
elif param[0] in ('client-hostname'):
|
|
lease[param[0]] = param[1]
|
|
contents['leases'][ip] = lease # New entries overwrite older ones
|
|
elif s[0] == 'server-duid':
|
|
contents[s[0]] = s[1]
|
|
return contents
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main(exit=True)
|