diff --git a/autotests/testEAP-TLS/connection_test.py b/autotests/testEAP-TLS/connection_test.py index 24e163bc..631d032e 100644 --- a/autotests/testEAP-TLS/connection_test.py +++ b/autotests/testEAP-TLS/connection_test.py @@ -7,6 +7,7 @@ sys.path.append('../util') import iwd from iwd import IWD from iwd import NetworkType +import testutil class Test(unittest.TestCase): @@ -39,6 +40,8 @@ class Test(unittest.TestCase): condition = 'obj.connected' wd.wait_for_object_condition(ordered_network.network_object, condition) + testutil.test_ifaces_connected() + device.disconnect() condition = 'not obj.connected' diff --git a/autotests/testFT-PSK-roam/test.py b/autotests/testFT-PSK-roam/test.py index c9a7c2f6..482de5a8 100644 --- a/autotests/testFT-PSK-roam/test.py +++ b/autotests/testFT-PSK-roam/test.py @@ -11,6 +11,7 @@ from iwd import NetworkType from hwsim import Hwsim from hostapd import HostapdCLI from wiphy import wiphy_map +import testutil class Test(unittest.TestCase): def test_roam_success(self): @@ -109,6 +110,10 @@ class Test(unittest.TestCase): wd.unregister_psk_agent(psk_agent) + testutil.test_ifaces_connected(bss_hostapd[0].ifname, device.name) + self.assertRaises(Exception, testutil.test_ifaces_connected, + (bss_hostapd[1].ifname, device.name)) + # Check that iwd starts transition to BSS 1 in less than 10 seconds rule0.signal = -8000 @@ -123,6 +128,10 @@ class Test(unittest.TestCase): self.assertEqual(device.state, iwd.DeviceState.connected) self.assertTrue(bss_hostapd[1].list_sta()) + testutil.test_ifaces_connected(bss_hostapd[1].ifname, device.name) + self.assertRaises(Exception, testutil.test_ifaces_connected, + (bss_hostapd[0].ifname, device.name)) + device.disconnect() condition = 'not obj.connected' diff --git a/autotests/testOpen/connection_test.py b/autotests/testOpen/connection_test.py index 7e50cc60..a3b8cae3 100644 --- a/autotests/testOpen/connection_test.py +++ b/autotests/testOpen/connection_test.py @@ -7,6 +7,7 @@ sys.path.append('../util') import iwd from iwd import IWD from iwd import NetworkType +import testutil class Test(unittest.TestCase): @@ -39,6 +40,8 @@ class Test(unittest.TestCase): condition = 'obj.connected' wd.wait_for_object_condition(ordered_network.network_object, condition) + testutil.test_ifaces_connected() + device.disconnect() condition = 'not obj.connected' diff --git a/autotests/testPreauth-roam/test.py b/autotests/testPreauth-roam/test.py index b95d605f..b112a829 100644 --- a/autotests/testPreauth-roam/test.py +++ b/autotests/testPreauth-roam/test.py @@ -11,6 +11,7 @@ from iwd import NetworkType from hwsim import Hwsim from hostapd import HostapdCLI from wiphy import wiphy_map +import testutil class Test(unittest.TestCase): def test_preauth_success(self): @@ -100,6 +101,10 @@ class Test(unittest.TestCase): self.assertTrue(bss_hostapd[0].list_sta()) self.assertFalse(bss_hostapd[1].list_sta()) + testutil.test_ifaces_connected(bss_hostapd[0].ifname, device.name) + self.assertRaises(Exception, testutil.test_ifaces_connected, + (bss_hostapd[1].ifname, device.name)) + # Check that iwd starts transition to BSS 1 in less than 15 seconds rule0.signal = -8000 @@ -116,6 +121,10 @@ class Test(unittest.TestCase): self.assertEqual(device.state, iwd.DeviceState.connected) self.assertTrue(bss_hostapd[1].list_sta()) + testutil.test_ifaces_connected(bss_hostapd[1].ifname, device.name) + self.assertRaises(Exception, testutil.test_ifaces_connected, + (bss_hostapd[0].ifname, device.name)) + device.disconnect() condition = 'not obj.connected' diff --git a/autotests/testWPA2/connection_test.py b/autotests/testWPA2/connection_test.py index dbdbb281..076b84a3 100644 --- a/autotests/testWPA2/connection_test.py +++ b/autotests/testWPA2/connection_test.py @@ -8,6 +8,7 @@ import iwd from iwd import IWD from iwd import PSKAgent from iwd import NetworkType +import testutil class Test(unittest.TestCase): @@ -43,6 +44,8 @@ class Test(unittest.TestCase): condition = 'obj.connected' wd.wait_for_object_condition(ordered_network.network_object, condition) + testutil.test_ifaces_connected() + device.disconnect() condition = 'not obj.connected' diff --git a/autotests/util/testutil.py b/autotests/util/testutil.py new file mode 100644 index 00000000..7a46d5dd --- /dev/null +++ b/autotests/util/testutil.py @@ -0,0 +1,102 @@ +#! /usr/bin/python3 +# Rougly based on wpa_supplicant's mac80211_hwsim/tools/hwsim_test.c utility. +import socket +import fcntl +import struct +import select + +import wiphy + +HWSIM_ETHERTYPE = 0x0800 +HWSIM_PACKETLEN = 250 + +def raw_if_socket(intf): + sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, + socket.htons(HWSIM_ETHERTYPE)) + + sock.bind((intf, HWSIM_ETHERTYPE)) + + return (sock, sock.getsockname()[4]) + +def checksum(buf): + pairs = zip(buf[0::2], buf[1::2]) + s = sum([(h << 8) + l for h, l in pairs]) + + while s >> 16: + s = (s & 0xffff) + (s >> 16) + + return s ^ 0xffff + +def tx(fromsock, tosock, src, dst): + frame = b''.join([ + dst, # eth.rmac + src, # eth.lmac + struct.pack('!H', HWSIM_ETHERTYPE), # eth.type + b'\x45', # ip.hdr_len + b'\x00', # ip.dsfield + struct.pack('!H', HWSIM_PACKETLEN - 14), # ip.len + b'\x01\x23', # ip.id + b'\x40\x00', # ip.flags, ip.frag_offset + b'\x40', # ip.ttl + b'\x01', # ip.proto + struct.pack('>H', 0), # ip.checksum + socket.inet_aton('192.168.1.1'), # ip.src + socket.inet_aton('192.168.1.2'), # ip.dst + bytes(range(0, HWSIM_PACKETLEN - 14 - 20)) + ]) + frame = frame[:24] + struct.pack('>H', checksum(frame[14:34])) + frame[26:] + + fromsock.send(frame) + + return (frame, fromsock, tosock, src, dst) + +def test_ifaces_connected(if0=None, if1=None): + for wname in wiphy.wiphy_map: + for intf in wiphy.wiphy_map[wname]: + if if0 is None: + if0 = intf + elif if1 is None and intf != if0: + if1 = intf + + sock0, addr0 = raw_if_socket(if0) + sock1, addr1 = raw_if_socket(if1) + bcast = b'\xff\xff\xff\xff\xff\xff' + + try: + frames = [ + tx(sock0, sock1, addr0, addr1), + tx(sock0, sock1, addr0, bcast), + tx(sock1, sock0, addr1, addr0), + tx(sock1, sock0, addr1, bcast), + ] + + rec = [False, False, False, False] + + while not all(rec): + r, w, x = select.select([sock0, sock1], [], [], 1.0) + if not r: + raise Exception('timeout waiting for packets: ' + repr(rec)) + + for s in r: + data, src = s.recvfrom(HWSIM_PACKETLEN + 1) + print('received ' + repr(data[:40]) + '... from ' + str(src)) + if len(data) != HWSIM_PACKETLEN: + continue + + idx = 0 + for origdata, fromsock, tosock, origsrc, origdst in frames: + if s is tosock and src[4] == origsrc and data == origdata: + print('matches frame ' + str(idx)) + break + idx += 1 + else: + print('doesn\'t match any of our frames') + continue + + if rec[idx]: + raise Exception('duplicate frame ' + str(idx)) + + rec[idx] = True + finally: + sock0.close() + sock1.close()