From 71c15306fdaef2e4ff5834a655ad51c5c6794f17 Mon Sep 17 00:00:00 2001 From: Andrew Zaborowski Date: Mon, 22 May 2017 10:49:47 +0200 Subject: [PATCH] autotests: Test communication when iwd reports connected Test that the AP interface and the station interface managed by iwd can actually send and receive ethernet traffic when iwd is in the connected state. Due to linux routing none of the high level utilities like ping or arping can be easily used to test communication between two interfaces of the same machine so use a method based on the mac80211_hwsim/tools/hwsim_test.c utility in the wpa_supplicant tree that uses a raw socket to inject unicast and broadcast frames. Add this check in three tests of different security type connections that simulate a single AP, and the two roaming tests with two APs. Check that the station can't communicate with the other AP's interface. --- autotests/testEAP-TLS/connection_test.py | 3 + autotests/testFT-PSK-roam/test.py | 9 ++ autotests/testOpen/connection_test.py | 3 + autotests/testPreauth-roam/test.py | 9 ++ autotests/testWPA2/connection_test.py | 3 + autotests/util/testutil.py | 102 +++++++++++++++++++++++ 6 files changed, 129 insertions(+) create mode 100644 autotests/util/testutil.py diff --git a/autotests/testEAP-TLS/connection_test.py b/autotests/testEAP-TLS/connection_test.py index 24e163bc..631d032e 100644 --- a/autotests/testEAP-TLS/connection_test.py +++ b/autotests/testEAP-TLS/connection_test.py @@ -7,6 +7,7 @@ sys.path.append('../util') import iwd from iwd import IWD from iwd import NetworkType +import testutil class Test(unittest.TestCase): @@ -39,6 +40,8 @@ class Test(unittest.TestCase): condition = 'obj.connected' wd.wait_for_object_condition(ordered_network.network_object, condition) + testutil.test_ifaces_connected() + device.disconnect() condition = 'not obj.connected' diff --git a/autotests/testFT-PSK-roam/test.py b/autotests/testFT-PSK-roam/test.py index c9a7c2f6..482de5a8 100644 --- a/autotests/testFT-PSK-roam/test.py +++ b/autotests/testFT-PSK-roam/test.py @@ -11,6 +11,7 @@ from iwd import NetworkType from hwsim import Hwsim from hostapd import HostapdCLI from wiphy import wiphy_map +import testutil class Test(unittest.TestCase): def test_roam_success(self): @@ -109,6 +110,10 @@ class Test(unittest.TestCase): wd.unregister_psk_agent(psk_agent) + testutil.test_ifaces_connected(bss_hostapd[0].ifname, device.name) + self.assertRaises(Exception, testutil.test_ifaces_connected, + (bss_hostapd[1].ifname, device.name)) + # Check that iwd starts transition to BSS 1 in less than 10 seconds rule0.signal = -8000 @@ -123,6 +128,10 @@ class Test(unittest.TestCase): self.assertEqual(device.state, iwd.DeviceState.connected) self.assertTrue(bss_hostapd[1].list_sta()) + testutil.test_ifaces_connected(bss_hostapd[1].ifname, device.name) + self.assertRaises(Exception, testutil.test_ifaces_connected, + (bss_hostapd[0].ifname, device.name)) + device.disconnect() condition = 'not obj.connected' diff --git a/autotests/testOpen/connection_test.py b/autotests/testOpen/connection_test.py index 7e50cc60..a3b8cae3 100644 --- a/autotests/testOpen/connection_test.py +++ b/autotests/testOpen/connection_test.py @@ -7,6 +7,7 @@ sys.path.append('../util') import iwd from iwd import IWD from iwd import NetworkType +import testutil class Test(unittest.TestCase): @@ -39,6 +40,8 @@ class Test(unittest.TestCase): condition = 'obj.connected' wd.wait_for_object_condition(ordered_network.network_object, condition) + testutil.test_ifaces_connected() + device.disconnect() condition = 'not obj.connected' diff --git a/autotests/testPreauth-roam/test.py b/autotests/testPreauth-roam/test.py index b95d605f..b112a829 100644 --- a/autotests/testPreauth-roam/test.py +++ b/autotests/testPreauth-roam/test.py @@ -11,6 +11,7 @@ from iwd import NetworkType from hwsim import Hwsim from hostapd import HostapdCLI from wiphy import wiphy_map +import testutil class Test(unittest.TestCase): def test_preauth_success(self): @@ -100,6 +101,10 @@ class Test(unittest.TestCase): self.assertTrue(bss_hostapd[0].list_sta()) self.assertFalse(bss_hostapd[1].list_sta()) + testutil.test_ifaces_connected(bss_hostapd[0].ifname, device.name) + self.assertRaises(Exception, testutil.test_ifaces_connected, + (bss_hostapd[1].ifname, device.name)) + # Check that iwd starts transition to BSS 1 in less than 15 seconds rule0.signal = -8000 @@ -116,6 +121,10 @@ class Test(unittest.TestCase): self.assertEqual(device.state, iwd.DeviceState.connected) self.assertTrue(bss_hostapd[1].list_sta()) + testutil.test_ifaces_connected(bss_hostapd[1].ifname, device.name) + self.assertRaises(Exception, testutil.test_ifaces_connected, + (bss_hostapd[0].ifname, device.name)) + device.disconnect() condition = 'not obj.connected' diff --git a/autotests/testWPA2/connection_test.py b/autotests/testWPA2/connection_test.py index dbdbb281..076b84a3 100644 --- a/autotests/testWPA2/connection_test.py +++ b/autotests/testWPA2/connection_test.py @@ -8,6 +8,7 @@ import iwd from iwd import IWD from iwd import PSKAgent from iwd import NetworkType +import testutil class Test(unittest.TestCase): @@ -43,6 +44,8 @@ class Test(unittest.TestCase): condition = 'obj.connected' wd.wait_for_object_condition(ordered_network.network_object, condition) + testutil.test_ifaces_connected() + device.disconnect() condition = 'not obj.connected' diff --git a/autotests/util/testutil.py b/autotests/util/testutil.py new file mode 100644 index 00000000..7a46d5dd --- /dev/null +++ b/autotests/util/testutil.py @@ -0,0 +1,102 @@ +#! /usr/bin/python3 +# Rougly based on wpa_supplicant's mac80211_hwsim/tools/hwsim_test.c utility. +import socket +import fcntl +import struct +import select + +import wiphy + +HWSIM_ETHERTYPE = 0x0800 +HWSIM_PACKETLEN = 250 + +def raw_if_socket(intf): + sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, + socket.htons(HWSIM_ETHERTYPE)) + + sock.bind((intf, HWSIM_ETHERTYPE)) + + return (sock, sock.getsockname()[4]) + +def checksum(buf): + pairs = zip(buf[0::2], buf[1::2]) + s = sum([(h << 8) + l for h, l in pairs]) + + while s >> 16: + s = (s & 0xffff) + (s >> 16) + + return s ^ 0xffff + +def tx(fromsock, tosock, src, dst): + frame = b''.join([ + dst, # eth.rmac + src, # eth.lmac + struct.pack('!H', HWSIM_ETHERTYPE), # eth.type + b'\x45', # ip.hdr_len + b'\x00', # ip.dsfield + struct.pack('!H', HWSIM_PACKETLEN - 14), # ip.len + b'\x01\x23', # ip.id + b'\x40\x00', # ip.flags, ip.frag_offset + b'\x40', # ip.ttl + b'\x01', # ip.proto + struct.pack('>H', 0), # ip.checksum + socket.inet_aton('192.168.1.1'), # ip.src + socket.inet_aton('192.168.1.2'), # ip.dst + bytes(range(0, HWSIM_PACKETLEN - 14 - 20)) + ]) + frame = frame[:24] + struct.pack('>H', checksum(frame[14:34])) + frame[26:] + + fromsock.send(frame) + + return (frame, fromsock, tosock, src, dst) + +def test_ifaces_connected(if0=None, if1=None): + for wname in wiphy.wiphy_map: + for intf in wiphy.wiphy_map[wname]: + if if0 is None: + if0 = intf + elif if1 is None and intf != if0: + if1 = intf + + sock0, addr0 = raw_if_socket(if0) + sock1, addr1 = raw_if_socket(if1) + bcast = b'\xff\xff\xff\xff\xff\xff' + + try: + frames = [ + tx(sock0, sock1, addr0, addr1), + tx(sock0, sock1, addr0, bcast), + tx(sock1, sock0, addr1, addr0), + tx(sock1, sock0, addr1, bcast), + ] + + rec = [False, False, False, False] + + while not all(rec): + r, w, x = select.select([sock0, sock1], [], [], 1.0) + if not r: + raise Exception('timeout waiting for packets: ' + repr(rec)) + + for s in r: + data, src = s.recvfrom(HWSIM_PACKETLEN + 1) + print('received ' + repr(data[:40]) + '... from ' + str(src)) + if len(data) != HWSIM_PACKETLEN: + continue + + idx = 0 + for origdata, fromsock, tosock, origsrc, origdst in frames: + if s is tosock and src[4] == origsrc and data == origdata: + print('matches frame ' + str(idx)) + break + idx += 1 + else: + print('doesn\'t match any of our frames') + continue + + if rec[idx]: + raise Exception('duplicate frame ' + str(idx)) + + rec[idx] = True + finally: + sock0.close() + sock1.close()