mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2025-01-24 13:44:06 +01:00
1fca13f07a
Depending on timing the operstate may not be set even when IWD shows as connected. Instead wait for the operstate to become set.
182 lines
5.3 KiB
Python
182 lines
5.3 KiB
Python
#! /usr/bin/python3
|
|
# Roughly based on wpa_supplicant's mac80211_hwsim/tools/hwsim_test.c utility.
|
|
import socket
|
|
import fcntl
|
|
import struct
|
|
import select
|
|
|
|
import iwd
|
|
from config import ctx
|
|
|
|
HWSIM_ETHERTYPE = 0x0800
|
|
HWSIM_PACKETLEN = 250
|
|
|
|
def raw_if_socket(intf):
|
|
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW,
|
|
socket.htons(HWSIM_ETHERTYPE))
|
|
|
|
sock.bind((intf, HWSIM_ETHERTYPE))
|
|
|
|
return (sock, sock.getsockname()[4])
|
|
|
|
def checksum(buf):
|
|
pairs = zip(buf[0::2], buf[1::2])
|
|
s = sum([(h << 8) + l for h, l in pairs])
|
|
|
|
while s >> 16:
|
|
s = (s & 0xffff) + (s >> 16)
|
|
|
|
return s ^ 0xffff
|
|
|
|
def tx(fromsock, tosock, src, dst):
|
|
frame = b''.join([
|
|
dst, # eth.rmac
|
|
src, # eth.lmac
|
|
struct.pack('!H', HWSIM_ETHERTYPE), # eth.type
|
|
b'\x45', # ip.hdr_len
|
|
b'\x00', # ip.dsfield
|
|
struct.pack('!H', HWSIM_PACKETLEN - 14), # ip.len
|
|
b'\x01\x23', # ip.id
|
|
b'\x40\x00', # ip.flags, ip.frag_offset
|
|
b'\x40', # ip.ttl
|
|
b'\x01', # ip.proto
|
|
struct.pack('>H', 0), # ip.checksum
|
|
socket.inet_aton('192.168.1.1'), # ip.src
|
|
socket.inet_aton('192.168.1.2'), # ip.dst
|
|
bytes(range(0, HWSIM_PACKETLEN - 14 - 20))
|
|
])
|
|
frame = frame[:24] + struct.pack('>H', checksum(frame[14:34])) + frame[26:]
|
|
|
|
fromsock.send(frame)
|
|
|
|
return (frame, fromsock, tosock, src, dst)
|
|
|
|
def test_connected(if0=None, if1=None, group=True, expect_fail=False):
|
|
if expect_fail:
|
|
timeout = 0
|
|
else:
|
|
timeout = 10
|
|
|
|
if if0 is None or if1 is None:
|
|
iwd_list = [dev.name for dev in iwd.IWD.get_instance().list_devices()]
|
|
|
|
non_iwd_list = [rad.interface.name for rad in ctx.radios if rad.interface is not None]
|
|
|
|
for intf in iwd_list + non_iwd_list:
|
|
if if0 is None:
|
|
if0 = intf
|
|
elif if1 is None and intf != if0:
|
|
if1 = intf
|
|
|
|
sock0, addr0 = raw_if_socket(if0)
|
|
sock1, addr1 = raw_if_socket(if1)
|
|
bcast = b'\xff\xff\xff\xff\xff\xff'
|
|
|
|
try:
|
|
frames = [
|
|
tx(sock0, sock1, addr0, addr1),
|
|
tx(sock1, sock0, addr1, addr0),
|
|
]
|
|
|
|
rec = [False, False]
|
|
|
|
if group:
|
|
frames.append(tx(sock0, sock1, addr0, bcast))
|
|
frames.append(tx(sock1, sock0, addr1, bcast))
|
|
rec.append(False)
|
|
rec.append(False)
|
|
|
|
while not all(rec):
|
|
r, w, x = select.select([sock0, sock1], [], [], timeout)
|
|
if not r:
|
|
raise Exception('timeout waiting for packets: ' + repr(rec))
|
|
|
|
for s in r:
|
|
data, src = s.recvfrom(HWSIM_PACKETLEN + 1)
|
|
print('received ' + repr(data[:40]) + '... from ' + str(src))
|
|
if len(data) != HWSIM_PACKETLEN:
|
|
continue
|
|
|
|
idx = 0
|
|
for origdata, fromsock, tosock, origsrc, origdst in frames:
|
|
if s is tosock and src[4] == origsrc and data == origdata:
|
|
print('matches frame ' + str(idx))
|
|
break
|
|
idx += 1
|
|
else:
|
|
print('doesn\'t match any of our frames')
|
|
continue
|
|
|
|
if rec[idx]:
|
|
raise Exception('duplicate frame ' + str(idx))
|
|
|
|
rec[idx] = True
|
|
finally:
|
|
sock0.close()
|
|
sock1.close()
|
|
|
|
def test_ifaces_connected(if0=None, if1=None, group=True, expect_fail=False):
|
|
retry = 0
|
|
while True:
|
|
try:
|
|
test_connected(if0, if1, group, expect_fail)
|
|
break
|
|
|
|
except Exception as e:
|
|
if retry < 3 and not expect_fail:
|
|
print('retrying connection test: %i' % retry)
|
|
retry += 1
|
|
continue
|
|
raise e
|
|
|
|
SIOCGIFFLAGS = 0x8913
|
|
SIOCGIFADDR = 0x8915
|
|
IFF_UP = 1 << 0
|
|
IFF_RUNNING = 1 << 6
|
|
|
|
def _test_operstate(intf):
|
|
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW)
|
|
|
|
try:
|
|
ifreq = struct.pack('16sh', intf.encode('utf8'), 0)
|
|
flags = struct.unpack('16sh', fcntl.ioctl(sock, SIOCGIFFLAGS, ifreq))[1]
|
|
|
|
# IFF_LOWER_UP and IFF_DORMANT not returned by SIOCGIFFLAGS
|
|
if flags & (IFF_UP | IFF_RUNNING) != IFF_UP | IFF_RUNNING:
|
|
return False
|
|
|
|
return True
|
|
finally:
|
|
sock.close()
|
|
|
|
def test_iface_operstate(intf=None):
|
|
if not intf:
|
|
intf = iwd.IWD.get_instance().list_devices()[0].name
|
|
|
|
ctx.non_block_wait(_test_operstate, 10, intf,
|
|
exception=Exception(intf + ' operstate wrong'))
|
|
|
|
def test_ip_address_match(intf, ip):
|
|
try:
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
addr = fcntl.ioctl(s.fileno(), SIOCGIFADDR, struct.pack('256s', intf.encode('utf-8')))
|
|
addr = socket.inet_ntoa(addr[20:24])
|
|
except OSError as e:
|
|
if e.errno != 99 or ip != None:
|
|
raise Exception('SIOCGIFADDR failed with %d' % e.errno)
|
|
|
|
return
|
|
|
|
if ip != addr:
|
|
raise Exception('IP for %s did not match %s (was %s)' % (intf, ip, addr))
|
|
|
|
def test_ip_connected(tup0, tup1):
|
|
ip0, ns0 = tup0
|
|
ip1, ns1 = tup1
|
|
|
|
try:
|
|
ns0.start_process(['ping', '-c', '5', '-i', '0.2', ip1], check=True)
|
|
ns1.start_process(['ping', '-c', '5', '-i', '0.2', ip0], check=True)
|
|
except:
|
|
raise Exception('Could not ping between %s and %s' % (ip0, ip1))
|