3
0
mirror of https://git.kernel.org/pub/scm/network/wireless/iwd.git synced 2026-02-15 19:27:55 +01:00
iwd/autotests/util/testutil.py
Andrew Zaborowski 57888632a3 auto-t: Validate netmasks in testNetconfig, add utility
Extend test_ip_address_match to support IPv6 and to test the
netmask/prefix length while it reads the local address since those are
retrieved using the same API.

Modify testNetconfig to validate the prefix lengths, change the prefix
lengths to be less common values (not 24 bits for IPv4 or 64 for IPv6),
minor cleanup.
2022-06-22 15:56:35 -05:00

233 lines
7.5 KiB
Python

#! /usr/bin/python3
# Roughly based on wpa_supplicant's mac80211_hwsim/tools/hwsim_test.c utility.
import socket
import fcntl
import struct
import select
import codecs
import iwd
from config import ctx
HWSIM_ETHERTYPE = 0x0800
HWSIM_PACKETLEN = 250
def raw_if_socket(intf):
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW,
socket.htons(HWSIM_ETHERTYPE))
sock.bind((intf, HWSIM_ETHERTYPE))
return (sock, sock.getsockname()[4])
def checksum(buf):
pairs = zip(buf[0::2], buf[1::2])
s = sum([(h << 8) + l for h, l in pairs])
while s >> 16:
s = (s & 0xffff) + (s >> 16)
return s ^ 0xffff
def tx(fromsock, tosock, src, dst):
frame = b''.join([
dst, # eth.rmac
src, # eth.lmac
struct.pack('!H', HWSIM_ETHERTYPE), # eth.type
b'\x45', # ip.hdr_len
b'\x00', # ip.dsfield
struct.pack('!H', HWSIM_PACKETLEN - 14), # ip.len
b'\x01\x23', # ip.id
b'\x40\x00', # ip.flags, ip.frag_offset
b'\x40', # ip.ttl
b'\x01', # ip.proto
struct.pack('>H', 0), # ip.checksum
socket.inet_aton('192.168.1.1'), # ip.src
socket.inet_aton('192.168.1.2'), # ip.dst
bytes(range(0, HWSIM_PACKETLEN - 14 - 20))
])
frame = frame[:24] + struct.pack('>H', checksum(frame[14:34])) + frame[26:]
fromsock.send(frame)
return (frame, fromsock, tosock, src, dst)
def test_connected(if0=None, if1=None, group=True, expect_fail=False):
if expect_fail:
timeout = 0
else:
timeout = 10
if if0 is None or if1 is None:
iwd_list = [dev.name for dev in iwd.IWD.get_instance().list_devices()]
non_iwd_list = [rad.interface.name for rad in ctx.radios if rad.interface is not None]
for intf in iwd_list + non_iwd_list:
if if0 is None:
if0 = intf
elif if1 is None and intf != if0:
if1 = intf
sock0, addr0 = raw_if_socket(if0)
sock1, addr1 = raw_if_socket(if1)
bcast = b'\xff\xff\xff\xff\xff\xff'
try:
frames = [
tx(sock0, sock1, addr0, addr1),
tx(sock1, sock0, addr1, addr0),
]
rec = [False, False]
if group:
frames.append(tx(sock0, sock1, addr0, bcast))
frames.append(tx(sock1, sock0, addr1, bcast))
rec.append(False)
rec.append(False)
while not all(rec):
r, w, x = select.select([sock0, sock1], [], [], timeout)
if not r:
raise Exception('timeout waiting for packets: ' + repr(rec))
for s in r:
data, src = s.recvfrom(HWSIM_PACKETLEN + 1)
print('received ' + repr(data[:40]) + '... from ' + str(src))
if len(data) != HWSIM_PACKETLEN:
continue
idx = 0
for origdata, fromsock, tosock, origsrc, origdst in frames:
if s is tosock and src[4] == origsrc and data == origdata:
print('matches frame ' + str(idx))
break
idx += 1
else:
print('doesn\'t match any of our frames')
continue
if rec[idx]:
raise Exception('duplicate frame ' + str(idx))
rec[idx] = True
finally:
sock0.close()
sock1.close()
def test_ifaces_connected(if0=None, if1=None, group=True, expect_fail=False):
retry = 0
while True:
try:
test_connected(if0, if1, group, expect_fail)
break
except Exception as e:
if retry < 3 and not expect_fail:
print('retrying connection test: %i' % retry)
retry += 1
continue
raise e
SIOCGIFFLAGS = 0x8913
SIOCGIFADDR = 0x8915
SIOCGIFNETMASK = 0x891b
IFF_UP = 1 << 0
IFF_RUNNING = 1 << 6
def _test_operstate(intf):
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW)
try:
ifreq = struct.pack('16sh', intf.encode('utf8'), 0)
flags = struct.unpack('16sh', fcntl.ioctl(sock, SIOCGIFFLAGS, ifreq))[1]
# IFF_LOWER_UP and IFF_DORMANT not returned by SIOCGIFFLAGS
if flags & (IFF_UP | IFF_RUNNING) != IFF_UP | IFF_RUNNING:
return False
return True
finally:
sock.close()
def test_iface_operstate(intf=None):
if not intf:
intf = iwd.IWD.get_instance().list_devices()[0].name
ctx.non_block_wait(_test_operstate, 10, intf,
exception=Exception(intf + ' operstate wrong'))
def get_addrs6(ifname):
f = open('/proc/net/if_inet6', 'r')
lines = f.readlines()
f.close()
for line in lines:
addr_str, _, plen, _, _, addr_ifname = line.split()
if ifname is not None and addr_ifname != ifname:
continue
yield (codecs.decode(addr_str, 'hex'), int(plen, 16), addr_ifname)
def test_ip_address_match(intf, expected_addr_str, expected_plen=None, match_plen=None):
def mask_addr(addr, plen):
if plen is None or len(addr) * 8 <= plen:
return addr
bytelen = int(plen / 8)
return addr[0:bytelen] + bytes([addr[bytelen] & (0xff00 >> (plen & 7))]) + b'\0' * (len(addr) - bytelen - 1)
if expected_addr_str is not None:
try:
expected_addr = socket.inet_pton(socket.AF_INET, expected_addr_str)
family = socket.AF_INET
except OSError as e:
try:
expected_addr = socket.inet_pton(socket.AF_INET6, expected_addr_str)
family = socket.AF_INET6
except OSError as e2:
raise e2 from None
expected_addr = mask_addr(expected_addr, match_plen)
else:
expected_addr = None
family = socket.AF_INET
if family == socket.AF_INET:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
out = fcntl.ioctl(s.fileno(), SIOCGIFADDR, struct.pack('256s', intf.encode('utf-8')))
actual_addr = mask_addr(out[20:24], match_plen)
out = fcntl.ioctl(s.fileno(), SIOCGIFNETMASK, struct.pack('256s', intf.encode('utf-8')))
actual_plen = sum([sum([(byte >> bit) & 1 for bit in range(0, 8)]) for byte in out[20:24]]) # count bits
except OSError as e:
if e.errno == 99 and expected_addr is None:
return
raise Exception('SIOCGIFADDR/SIOCGIFNETMASK failed with %d' % e.errno)
else:
# The "get" ioctls don't work for IPv6, netdevice(7) recommends reading /proc/net instead,
# which on the other hand works *only* for IPv6
actual_addr = None
actual_plen = None
for addr, plen, _ in get_addrs6(intf):
actual_addr = mask_addr(addr, match_plen)
actual_plen = plen
if actual_addr == expected_addr:
break
if expected_addr != actual_addr:
raise Exception('IP for %s did not match %s (was %s)' %
(intf, expected_addr_str, socket.inet_ntop(family, actual_addr)))
if expected_plen is not None and expected_plen != actual_plen:
raise Exception('Prefix Length for %s did not match %i (was %i)' %
(intf, expected_plen, actual_plen))
def test_ip_connected(tup0, tup1):
ip0, ns0 = tup0
ip1, ns1 = tup1
try:
ns0.start_process(['ping', '-c', '5', '-i', '0.2', ip1], check=True)
ns1.start_process(['ping', '-c', '5', '-i', '0.2', ip0], check=True)
except:
raise Exception('Could not ping between %s and %s' % (ip0, ip1))