3
0
mirror of https://git.kernel.org/pub/scm/network/wireless/iwd.git synced 2024-12-19 01:42:33 +01:00

autotests: Test communication when iwd reports connected

Test that the AP interface and the station interface managed by iwd
can actually send and receive ethernet traffic when iwd is in the
connected state.  Due to linux routing none of the high level utilities
like ping or arping can be easily used to test communication between
two interfaces of the same machine so use a method based on the
mac80211_hwsim/tools/hwsim_test.c utility in the wpa_supplicant tree
that uses a raw socket to inject unicast and broadcast frames.

Add this check in three tests of different security type connections
that simulate a single AP, and the two roaming tests with two APs.
Check that the station can't communicate with the other AP's interface.
This commit is contained in:
Andrew Zaborowski 2017-05-22 10:49:47 +02:00 committed by Denis Kenzior
parent f5decb274d
commit 71c15306fd
6 changed files with 129 additions and 0 deletions

View File

@ -7,6 +7,7 @@ sys.path.append('../util')
import iwd import iwd
from iwd import IWD from iwd import IWD
from iwd import NetworkType from iwd import NetworkType
import testutil
class Test(unittest.TestCase): class Test(unittest.TestCase):
@ -39,6 +40,8 @@ class Test(unittest.TestCase):
condition = 'obj.connected' condition = 'obj.connected'
wd.wait_for_object_condition(ordered_network.network_object, condition) wd.wait_for_object_condition(ordered_network.network_object, condition)
testutil.test_ifaces_connected()
device.disconnect() device.disconnect()
condition = 'not obj.connected' condition = 'not obj.connected'

View File

@ -11,6 +11,7 @@ from iwd import NetworkType
from hwsim import Hwsim from hwsim import Hwsim
from hostapd import HostapdCLI from hostapd import HostapdCLI
from wiphy import wiphy_map from wiphy import wiphy_map
import testutil
class Test(unittest.TestCase): class Test(unittest.TestCase):
def test_roam_success(self): def test_roam_success(self):
@ -109,6 +110,10 @@ class Test(unittest.TestCase):
wd.unregister_psk_agent(psk_agent) wd.unregister_psk_agent(psk_agent)
testutil.test_ifaces_connected(bss_hostapd[0].ifname, device.name)
self.assertRaises(Exception, testutil.test_ifaces_connected,
(bss_hostapd[1].ifname, device.name))
# Check that iwd starts transition to BSS 1 in less than 10 seconds # Check that iwd starts transition to BSS 1 in less than 10 seconds
rule0.signal = -8000 rule0.signal = -8000
@ -123,6 +128,10 @@ class Test(unittest.TestCase):
self.assertEqual(device.state, iwd.DeviceState.connected) self.assertEqual(device.state, iwd.DeviceState.connected)
self.assertTrue(bss_hostapd[1].list_sta()) self.assertTrue(bss_hostapd[1].list_sta())
testutil.test_ifaces_connected(bss_hostapd[1].ifname, device.name)
self.assertRaises(Exception, testutil.test_ifaces_connected,
(bss_hostapd[0].ifname, device.name))
device.disconnect() device.disconnect()
condition = 'not obj.connected' condition = 'not obj.connected'

View File

@ -7,6 +7,7 @@ sys.path.append('../util')
import iwd import iwd
from iwd import IWD from iwd import IWD
from iwd import NetworkType from iwd import NetworkType
import testutil
class Test(unittest.TestCase): class Test(unittest.TestCase):
@ -39,6 +40,8 @@ class Test(unittest.TestCase):
condition = 'obj.connected' condition = 'obj.connected'
wd.wait_for_object_condition(ordered_network.network_object, condition) wd.wait_for_object_condition(ordered_network.network_object, condition)
testutil.test_ifaces_connected()
device.disconnect() device.disconnect()
condition = 'not obj.connected' condition = 'not obj.connected'

View File

@ -11,6 +11,7 @@ from iwd import NetworkType
from hwsim import Hwsim from hwsim import Hwsim
from hostapd import HostapdCLI from hostapd import HostapdCLI
from wiphy import wiphy_map from wiphy import wiphy_map
import testutil
class Test(unittest.TestCase): class Test(unittest.TestCase):
def test_preauth_success(self): def test_preauth_success(self):
@ -100,6 +101,10 @@ class Test(unittest.TestCase):
self.assertTrue(bss_hostapd[0].list_sta()) self.assertTrue(bss_hostapd[0].list_sta())
self.assertFalse(bss_hostapd[1].list_sta()) self.assertFalse(bss_hostapd[1].list_sta())
testutil.test_ifaces_connected(bss_hostapd[0].ifname, device.name)
self.assertRaises(Exception, testutil.test_ifaces_connected,
(bss_hostapd[1].ifname, device.name))
# Check that iwd starts transition to BSS 1 in less than 15 seconds # Check that iwd starts transition to BSS 1 in less than 15 seconds
rule0.signal = -8000 rule0.signal = -8000
@ -116,6 +121,10 @@ class Test(unittest.TestCase):
self.assertEqual(device.state, iwd.DeviceState.connected) self.assertEqual(device.state, iwd.DeviceState.connected)
self.assertTrue(bss_hostapd[1].list_sta()) self.assertTrue(bss_hostapd[1].list_sta())
testutil.test_ifaces_connected(bss_hostapd[1].ifname, device.name)
self.assertRaises(Exception, testutil.test_ifaces_connected,
(bss_hostapd[0].ifname, device.name))
device.disconnect() device.disconnect()
condition = 'not obj.connected' condition = 'not obj.connected'

View File

@ -8,6 +8,7 @@ import iwd
from iwd import IWD from iwd import IWD
from iwd import PSKAgent from iwd import PSKAgent
from iwd import NetworkType from iwd import NetworkType
import testutil
class Test(unittest.TestCase): class Test(unittest.TestCase):
@ -43,6 +44,8 @@ class Test(unittest.TestCase):
condition = 'obj.connected' condition = 'obj.connected'
wd.wait_for_object_condition(ordered_network.network_object, condition) wd.wait_for_object_condition(ordered_network.network_object, condition)
testutil.test_ifaces_connected()
device.disconnect() device.disconnect()
condition = 'not obj.connected' condition = 'not obj.connected'

102
autotests/util/testutil.py Normal file
View File

@ -0,0 +1,102 @@
#! /usr/bin/python3
# Rougly based on wpa_supplicant's mac80211_hwsim/tools/hwsim_test.c utility.
import socket
import fcntl
import struct
import select
import wiphy
HWSIM_ETHERTYPE = 0x0800
HWSIM_PACKETLEN = 250
def raw_if_socket(intf):
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW,
socket.htons(HWSIM_ETHERTYPE))
sock.bind((intf, HWSIM_ETHERTYPE))
return (sock, sock.getsockname()[4])
def checksum(buf):
pairs = zip(buf[0::2], buf[1::2])
s = sum([(h << 8) + l for h, l in pairs])
while s >> 16:
s = (s & 0xffff) + (s >> 16)
return s ^ 0xffff
def tx(fromsock, tosock, src, dst):
frame = b''.join([
dst, # eth.rmac
src, # eth.lmac
struct.pack('!H', HWSIM_ETHERTYPE), # eth.type
b'\x45', # ip.hdr_len
b'\x00', # ip.dsfield
struct.pack('!H', HWSIM_PACKETLEN - 14), # ip.len
b'\x01\x23', # ip.id
b'\x40\x00', # ip.flags, ip.frag_offset
b'\x40', # ip.ttl
b'\x01', # ip.proto
struct.pack('>H', 0), # ip.checksum
socket.inet_aton('192.168.1.1'), # ip.src
socket.inet_aton('192.168.1.2'), # ip.dst
bytes(range(0, HWSIM_PACKETLEN - 14 - 20))
])
frame = frame[:24] + struct.pack('>H', checksum(frame[14:34])) + frame[26:]
fromsock.send(frame)
return (frame, fromsock, tosock, src, dst)
def test_ifaces_connected(if0=None, if1=None):
for wname in wiphy.wiphy_map:
for intf in wiphy.wiphy_map[wname]:
if if0 is None:
if0 = intf
elif if1 is None and intf != if0:
if1 = intf
sock0, addr0 = raw_if_socket(if0)
sock1, addr1 = raw_if_socket(if1)
bcast = b'\xff\xff\xff\xff\xff\xff'
try:
frames = [
tx(sock0, sock1, addr0, addr1),
tx(sock0, sock1, addr0, bcast),
tx(sock1, sock0, addr1, addr0),
tx(sock1, sock0, addr1, bcast),
]
rec = [False, False, False, False]
while not all(rec):
r, w, x = select.select([sock0, sock1], [], [], 1.0)
if not r:
raise Exception('timeout waiting for packets: ' + repr(rec))
for s in r:
data, src = s.recvfrom(HWSIM_PACKETLEN + 1)
print('received ' + repr(data[:40]) + '... from ' + str(src))
if len(data) != HWSIM_PACKETLEN:
continue
idx = 0
for origdata, fromsock, tosock, origsrc, origdst in frames:
if s is tosock and src[4] == origsrc and data == origdata:
print('matches frame ' + str(idx))
break
idx += 1
else:
print('doesn\'t match any of our frames')
continue
if rec[idx]:
raise Exception('duplicate frame ' + str(idx))
rec[idx] = True
finally:
sock0.close()
sock1.close()