mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2024-12-21 03:32:42 +01:00
c4fe900a71
Check that the right set of routes is being added for IPv4 and IPv6. Chane gateway addresses to differ from the AP or dhcpd addresses.
290 lines
9.6 KiB
Python
290 lines
9.6 KiB
Python
#! /usr/bin/python3
|
|
# Roughly based on wpa_supplicant's mac80211_hwsim/tools/hwsim_test.c utility.
|
|
import socket
|
|
import fcntl
|
|
import struct
|
|
import select
|
|
import codecs
|
|
import collections
|
|
|
|
import iwd
|
|
from config import ctx
|
|
|
|
HWSIM_ETHERTYPE = 0x0800
|
|
HWSIM_PACKETLEN = 250
|
|
|
|
def raw_if_socket(intf):
|
|
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW,
|
|
socket.htons(HWSIM_ETHERTYPE))
|
|
|
|
sock.bind((intf, HWSIM_ETHERTYPE))
|
|
|
|
return (sock, sock.getsockname()[4])
|
|
|
|
def checksum(buf):
|
|
pairs = zip(buf[0::2], buf[1::2])
|
|
s = sum([(h << 8) + l for h, l in pairs])
|
|
|
|
while s >> 16:
|
|
s = (s & 0xffff) + (s >> 16)
|
|
|
|
return s ^ 0xffff
|
|
|
|
def tx(fromsock, tosock, src, dst):
|
|
frame = b''.join([
|
|
dst, # eth.rmac
|
|
src, # eth.lmac
|
|
struct.pack('!H', HWSIM_ETHERTYPE), # eth.type
|
|
b'\x45', # ip.hdr_len
|
|
b'\x00', # ip.dsfield
|
|
struct.pack('!H', HWSIM_PACKETLEN - 14), # ip.len
|
|
b'\x01\x23', # ip.id
|
|
b'\x40\x00', # ip.flags, ip.frag_offset
|
|
b'\x40', # ip.ttl
|
|
b'\x01', # ip.proto
|
|
struct.pack('>H', 0), # ip.checksum
|
|
socket.inet_aton('192.168.1.1'), # ip.src
|
|
socket.inet_aton('192.168.1.2'), # ip.dst
|
|
bytes(range(0, HWSIM_PACKETLEN - 14 - 20))
|
|
])
|
|
frame = frame[:24] + struct.pack('>H', checksum(frame[14:34])) + frame[26:]
|
|
|
|
fromsock.send(frame)
|
|
|
|
return (frame, fromsock, tosock, src, dst)
|
|
|
|
def test_connected(if0=None, if1=None, group=True, expect_fail=False):
|
|
if expect_fail:
|
|
timeout = 0
|
|
else:
|
|
timeout = 10
|
|
|
|
if if0 is None or if1 is None:
|
|
iwd_list = [dev.name for dev in iwd.IWD.get_instance().list_devices()]
|
|
|
|
non_iwd_list = [rad.interface.name for rad in ctx.radios if rad.interface is not None]
|
|
|
|
for intf in iwd_list + non_iwd_list:
|
|
if if0 is None:
|
|
if0 = intf
|
|
elif if1 is None and intf != if0:
|
|
if1 = intf
|
|
|
|
sock0, addr0 = raw_if_socket(if0)
|
|
sock1, addr1 = raw_if_socket(if1)
|
|
bcast = b'\xff\xff\xff\xff\xff\xff'
|
|
|
|
try:
|
|
frames = [
|
|
tx(sock0, sock1, addr0, addr1),
|
|
tx(sock1, sock0, addr1, addr0),
|
|
]
|
|
|
|
rec = [False, False]
|
|
|
|
if group:
|
|
frames.append(tx(sock0, sock1, addr0, bcast))
|
|
frames.append(tx(sock1, sock0, addr1, bcast))
|
|
rec.append(False)
|
|
rec.append(False)
|
|
|
|
while not all(rec):
|
|
r, w, x = select.select([sock0, sock1], [], [], timeout)
|
|
if not r:
|
|
raise Exception('timeout waiting for packets: ' + repr(rec))
|
|
|
|
for s in r:
|
|
data, src = s.recvfrom(HWSIM_PACKETLEN + 1)
|
|
print('received ' + repr(data[:40]) + '... from ' + str(src))
|
|
if len(data) != HWSIM_PACKETLEN:
|
|
continue
|
|
|
|
idx = 0
|
|
for origdata, fromsock, tosock, origsrc, origdst in frames:
|
|
if s is tosock and src[4] == origsrc and data == origdata:
|
|
print('matches frame ' + str(idx))
|
|
break
|
|
idx += 1
|
|
else:
|
|
print('doesn\'t match any of our frames')
|
|
continue
|
|
|
|
if rec[idx]:
|
|
raise Exception('duplicate frame ' + str(idx))
|
|
|
|
rec[idx] = True
|
|
finally:
|
|
sock0.close()
|
|
sock1.close()
|
|
|
|
def test_ifaces_connected(if0=None, if1=None, group=True, expect_fail=False):
|
|
retry = 0
|
|
while True:
|
|
try:
|
|
test_connected(if0, if1, group, expect_fail)
|
|
break
|
|
|
|
except Exception as e:
|
|
if retry < 3 and not expect_fail:
|
|
print('retrying connection test: %i' % retry)
|
|
retry += 1
|
|
continue
|
|
raise e
|
|
|
|
SIOCGIFFLAGS = 0x8913
|
|
SIOCGIFADDR = 0x8915
|
|
SIOCGIFNETMASK = 0x891b
|
|
IFF_UP = 1 << 0
|
|
IFF_RUNNING = 1 << 6
|
|
|
|
def _test_operstate(intf):
|
|
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW)
|
|
|
|
try:
|
|
ifreq = struct.pack('16sh', intf.encode('utf8'), 0)
|
|
flags = struct.unpack('16sh', fcntl.ioctl(sock, SIOCGIFFLAGS, ifreq))[1]
|
|
|
|
# IFF_LOWER_UP and IFF_DORMANT not returned by SIOCGIFFLAGS
|
|
if flags & (IFF_UP | IFF_RUNNING) != IFF_UP | IFF_RUNNING:
|
|
return False
|
|
|
|
return True
|
|
finally:
|
|
sock.close()
|
|
|
|
def test_iface_operstate(intf=None):
|
|
if not intf:
|
|
intf = iwd.IWD.get_instance().list_devices()[0].name
|
|
|
|
ctx.non_block_wait(_test_operstate, 10, intf,
|
|
exception=Exception(intf + ' operstate wrong'))
|
|
|
|
def get_addrs6(ifname):
|
|
f = open('/proc/net/if_inet6', 'r')
|
|
lines = f.readlines()
|
|
f.close()
|
|
for line in lines:
|
|
addr_str, _, plen, _, _, addr_ifname = line.split()
|
|
if ifname is not None and addr_ifname != ifname:
|
|
continue
|
|
|
|
yield (codecs.decode(addr_str, 'hex'), int(plen, 16), addr_ifname)
|
|
|
|
def test_ip_address_match(intf, expected_addr_str, expected_plen=None, match_plen=None):
|
|
def mask_addr(addr, plen):
|
|
if plen is None or len(addr) * 8 <= plen:
|
|
return addr
|
|
bytelen = int(plen / 8)
|
|
return addr[0:bytelen] + bytes([addr[bytelen] & (0xff00 >> (plen & 7))]) + b'\0' * (len(addr) - bytelen - 1)
|
|
if expected_addr_str is not None:
|
|
try:
|
|
expected_addr = socket.inet_pton(socket.AF_INET, expected_addr_str)
|
|
family = socket.AF_INET
|
|
except OSError as e:
|
|
try:
|
|
expected_addr = socket.inet_pton(socket.AF_INET6, expected_addr_str)
|
|
family = socket.AF_INET6
|
|
except OSError as e2:
|
|
raise e2 from None
|
|
expected_addr = mask_addr(expected_addr, match_plen)
|
|
else:
|
|
expected_addr = None
|
|
family = socket.AF_INET
|
|
|
|
if family == socket.AF_INET:
|
|
try:
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
out = fcntl.ioctl(s.fileno(), SIOCGIFADDR, struct.pack('256s', intf.encode('utf-8')))
|
|
actual_addr = mask_addr(out[20:24], match_plen)
|
|
out = fcntl.ioctl(s.fileno(), SIOCGIFNETMASK, struct.pack('256s', intf.encode('utf-8')))
|
|
actual_plen = sum([sum([(byte >> bit) & 1 for bit in range(0, 8)]) for byte in out[20:24]]) # count bits
|
|
except OSError as e:
|
|
if e.errno == 99 and expected_addr is None:
|
|
return
|
|
|
|
raise Exception('SIOCGIFADDR/SIOCGIFNETMASK failed with %d' % e.errno)
|
|
else:
|
|
# The "get" ioctls don't work for IPv6, netdevice(7) recommends reading /proc/net instead,
|
|
# which on the other hand works *only* for IPv6
|
|
actual_addr = None
|
|
actual_plen = None
|
|
for addr, plen, _ in get_addrs6(intf):
|
|
actual_addr = mask_addr(addr, match_plen)
|
|
actual_plen = plen
|
|
if actual_addr == expected_addr:
|
|
break
|
|
|
|
if expected_addr != actual_addr:
|
|
raise Exception('IP for %s did not match %s (was %s)' %
|
|
(intf, expected_addr_str, socket.inet_ntop(family, actual_addr)))
|
|
|
|
if expected_plen is not None and expected_plen != actual_plen:
|
|
raise Exception('Prefix Length for %s did not match %i (was %i)' %
|
|
(intf, expected_plen, actual_plen))
|
|
|
|
def test_ip_connected(tup0, tup1):
|
|
ip0, ns0 = tup0
|
|
ip1, ns1 = tup1
|
|
|
|
try:
|
|
ns0.start_process(['ping', '-c', '5', '-i', '0.2', ip1], check=True)
|
|
ns1.start_process(['ping', '-c', '5', '-i', '0.2', ip0], check=True)
|
|
except:
|
|
raise Exception('Could not ping between %s and %s' % (ip0, ip1))
|
|
|
|
RouteInfo = collections.namedtuple('RouteInfo', 'dst plen gw flags ifname',
|
|
defaults=(None, None, None, 0, ''))
|
|
|
|
def get_routes4(ifname=None):
|
|
f = open('/proc/net/route', 'r')
|
|
lines = f.readlines()
|
|
f.close()
|
|
for line in lines[1:]: # Skip header line
|
|
route_ifname, dst_str, gw_str, flags, ref_cnt, use_cnt, metric, mask_str, \
|
|
mtu = line.strip().split(maxsplit=8)
|
|
if ifname is not None and route_ifname != ifname:
|
|
continue
|
|
|
|
dst = codecs.decode(dst_str, 'hex')[::-1]
|
|
mask = int(mask_str, 16)
|
|
plen = sum([(mask >> bit) & 1 for bit in range(0, 32)]) # count bits
|
|
gw = codecs.decode(gw_str, 'hex')[::-1]
|
|
|
|
if dst == b'\0\0\0\0':
|
|
dst = None
|
|
plen = None
|
|
if gw == b'\0\0\0\0':
|
|
gw = None
|
|
yield RouteInfo(dst, plen, gw, int(flags, 16), route_ifname)
|
|
|
|
def get_routes6(ifname=None):
|
|
f = open('/proc/net/ipv6_route', 'r')
|
|
lines = f.readlines()
|
|
f.close()
|
|
for line in lines:
|
|
dst_str, dst_plen_str, src_str, src_plen_str, gw_str, metric, ref_cnt, \
|
|
use_cnt, flags, route_ifname = line.strip().split(maxsplit=9)
|
|
if ifname is not None and route_ifname != ifname:
|
|
continue
|
|
|
|
dst = codecs.decode(dst_str, 'hex')
|
|
plen = int(dst_plen_str, 16)
|
|
gw = codecs.decode(gw_str, 'hex')
|
|
|
|
if dst[0] == 0xff or dst[:2] == b'\xfe\x80': # Skip link-local and multicast
|
|
continue
|
|
|
|
# Skip RTN_LOCAL-type routes, we don't need to validate them since they're added by
|
|
# the kernel and we can't simply add them to the expected list (the list that we
|
|
# validate against) because they're added a short time after an address (due to DAD?)
|
|
# and would create race conditions
|
|
if int(flags, 16) & (1 << 31):
|
|
continue
|
|
|
|
if dst == b'\0' * 16:
|
|
dst = None
|
|
plen = None
|
|
if gw == b'\0' * 16:
|
|
gw = None
|
|
yield RouteInfo(dst, plen, gw, int(flags, 16) & 0xf, route_ifname)
|