mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2024-12-18 17:22:50 +01:00
1e1a6d6754
This sends data over the raw sockets similar to test_ifaces_connected
297 lines
9.8 KiB
Python
297 lines
9.8 KiB
Python
#! /usr/bin/python3
|
|
# Roughly based on wpa_supplicant's mac80211_hwsim/tools/hwsim_test.c utility.
|
|
import socket
|
|
import fcntl
|
|
import struct
|
|
import select
|
|
import codecs
|
|
import collections
|
|
|
|
import iwd
|
|
from config import ctx
|
|
|
|
HWSIM_ETHERTYPE = 0x0800
|
|
HWSIM_PACKETLEN = 250
|
|
|
|
def raw_if_socket(intf):
|
|
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW,
|
|
socket.htons(HWSIM_ETHERTYPE))
|
|
|
|
sock.bind((intf, HWSIM_ETHERTYPE))
|
|
|
|
return (sock, sock.getsockname()[4])
|
|
|
|
def checksum(buf):
|
|
pairs = zip(buf[0::2], buf[1::2])
|
|
s = sum([(h << 8) + l for h, l in pairs])
|
|
|
|
while s >> 16:
|
|
s = (s & 0xffff) + (s >> 16)
|
|
|
|
return s ^ 0xffff
|
|
|
|
def tx(fromsock, tosock, src, dst):
|
|
frame = b''.join([
|
|
dst, # eth.rmac
|
|
src, # eth.lmac
|
|
struct.pack('!H', HWSIM_ETHERTYPE), # eth.type
|
|
b'\x45', # ip.hdr_len
|
|
b'\x00', # ip.dsfield
|
|
struct.pack('!H', HWSIM_PACKETLEN - 14), # ip.len
|
|
b'\x01\x23', # ip.id
|
|
b'\x40\x00', # ip.flags, ip.frag_offset
|
|
b'\x40', # ip.ttl
|
|
b'\x01', # ip.proto
|
|
struct.pack('>H', 0), # ip.checksum
|
|
socket.inet_aton('192.168.1.1'), # ip.src
|
|
socket.inet_aton('192.168.1.2'), # ip.dst
|
|
bytes(range(0, HWSIM_PACKETLEN - 14 - 20))
|
|
])
|
|
frame = frame[:24] + struct.pack('>H', checksum(frame[14:34])) + frame[26:]
|
|
|
|
fromsock.send(frame)
|
|
|
|
return (frame, fromsock, tosock, src, dst)
|
|
|
|
def tx_packets(if0, if1, num):
|
|
sock0, addr0 = raw_if_socket(if0)
|
|
sock1, addr1 = raw_if_socket(if1)
|
|
|
|
for i in range(num):
|
|
tx(sock0, sock1, addr0, addr1)
|
|
|
|
def test_connected(if0=None, if1=None, group=True, expect_fail=False):
|
|
if expect_fail:
|
|
timeout = 0
|
|
else:
|
|
timeout = 10
|
|
|
|
if if0 is None or if1 is None:
|
|
iwd_list = [dev.name for dev in iwd.IWD.get_instance().list_devices()]
|
|
|
|
non_iwd_list = [rad.interface.name for rad in ctx.radios if rad.interface is not None]
|
|
|
|
for intf in iwd_list + non_iwd_list:
|
|
if if0 is None:
|
|
if0 = intf
|
|
elif if1 is None and intf != if0:
|
|
if1 = intf
|
|
|
|
sock0, addr0 = raw_if_socket(if0)
|
|
sock1, addr1 = raw_if_socket(if1)
|
|
bcast = b'\xff\xff\xff\xff\xff\xff'
|
|
|
|
try:
|
|
frames = [
|
|
tx(sock0, sock1, addr0, addr1),
|
|
tx(sock1, sock0, addr1, addr0),
|
|
]
|
|
|
|
rec = [False, False]
|
|
|
|
if group:
|
|
frames.append(tx(sock0, sock1, addr0, bcast))
|
|
frames.append(tx(sock1, sock0, addr1, bcast))
|
|
rec.append(False)
|
|
rec.append(False)
|
|
|
|
while not all(rec):
|
|
r, w, x = select.select([sock0, sock1], [], [], timeout)
|
|
if not r:
|
|
raise Exception('timeout waiting for packets: ' + repr(rec))
|
|
|
|
for s in r:
|
|
data, src = s.recvfrom(HWSIM_PACKETLEN + 1)
|
|
print('received ' + repr(data[:40]) + '... from ' + str(src))
|
|
if len(data) != HWSIM_PACKETLEN:
|
|
continue
|
|
|
|
idx = 0
|
|
for origdata, fromsock, tosock, origsrc, origdst in frames:
|
|
if s is tosock and src[4] == origsrc and data == origdata:
|
|
print('matches frame ' + str(idx))
|
|
break
|
|
idx += 1
|
|
else:
|
|
print('doesn\'t match any of our frames')
|
|
continue
|
|
|
|
if rec[idx]:
|
|
raise Exception('duplicate frame ' + str(idx))
|
|
|
|
rec[idx] = True
|
|
finally:
|
|
sock0.close()
|
|
sock1.close()
|
|
|
|
def test_ifaces_connected(if0=None, if1=None, group=True, expect_fail=False):
|
|
retry = 0
|
|
while True:
|
|
try:
|
|
test_connected(if0, if1, group, expect_fail)
|
|
break
|
|
|
|
except Exception as e:
|
|
if retry < 3 and not expect_fail:
|
|
print('retrying connection test: %i' % retry)
|
|
retry += 1
|
|
continue
|
|
raise e
|
|
|
|
SIOCGIFFLAGS = 0x8913
|
|
SIOCGIFADDR = 0x8915
|
|
SIOCGIFNETMASK = 0x891b
|
|
IFF_UP = 1 << 0
|
|
IFF_RUNNING = 1 << 6
|
|
|
|
def _test_operstate(intf):
|
|
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW)
|
|
|
|
try:
|
|
ifreq = struct.pack('16sh', intf.encode('utf8'), 0)
|
|
flags = struct.unpack('16sh', fcntl.ioctl(sock, SIOCGIFFLAGS, ifreq))[1]
|
|
|
|
# IFF_LOWER_UP and IFF_DORMANT not returned by SIOCGIFFLAGS
|
|
if flags & (IFF_UP | IFF_RUNNING) != IFF_UP | IFF_RUNNING:
|
|
return False
|
|
|
|
return True
|
|
finally:
|
|
sock.close()
|
|
|
|
def test_iface_operstate(intf=None):
|
|
if not intf:
|
|
intf = iwd.IWD.get_instance().list_devices()[0].name
|
|
|
|
ctx.non_block_wait(_test_operstate, 10, intf,
|
|
exception=Exception(intf + ' operstate wrong'))
|
|
|
|
def get_addrs6(ifname):
|
|
f = open('/proc/net/if_inet6', 'r')
|
|
lines = f.readlines()
|
|
f.close()
|
|
for line in lines:
|
|
addr_str, _, plen, _, _, addr_ifname = line.split()
|
|
if ifname is not None and addr_ifname != ifname:
|
|
continue
|
|
|
|
yield (codecs.decode(addr_str, 'hex'), int(plen, 16), addr_ifname)
|
|
|
|
def test_ip_address_match(intf, expected_addr_str, expected_plen=None, match_plen=None):
|
|
def mask_addr(addr, plen):
|
|
if plen is None or len(addr) * 8 <= plen:
|
|
return addr
|
|
bytelen = int(plen / 8)
|
|
return addr[0:bytelen] + bytes([addr[bytelen] & (0xff00 >> (plen & 7))]) + b'\0' * (len(addr) - bytelen - 1)
|
|
if expected_addr_str is not None:
|
|
try:
|
|
expected_addr = socket.inet_pton(socket.AF_INET, expected_addr_str)
|
|
family = socket.AF_INET
|
|
except OSError as e:
|
|
try:
|
|
expected_addr = socket.inet_pton(socket.AF_INET6, expected_addr_str)
|
|
family = socket.AF_INET6
|
|
except OSError as e2:
|
|
raise e2 from None
|
|
expected_addr = mask_addr(expected_addr, match_plen)
|
|
else:
|
|
expected_addr = None
|
|
family = socket.AF_INET
|
|
|
|
if family == socket.AF_INET:
|
|
try:
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
out = fcntl.ioctl(s.fileno(), SIOCGIFADDR, struct.pack('256s', intf.encode('utf-8')))
|
|
actual_addr = mask_addr(out[20:24], match_plen)
|
|
out = fcntl.ioctl(s.fileno(), SIOCGIFNETMASK, struct.pack('256s', intf.encode('utf-8')))
|
|
actual_plen = sum([sum([(byte >> bit) & 1 for bit in range(0, 8)]) for byte in out[20:24]]) # count bits
|
|
except OSError as e:
|
|
if e.errno == 99 and expected_addr is None:
|
|
return
|
|
|
|
raise Exception('SIOCGIFADDR/SIOCGIFNETMASK failed with %d' % e.errno)
|
|
else:
|
|
# The "get" ioctls don't work for IPv6, netdevice(7) recommends reading /proc/net instead,
|
|
# which on the other hand works *only* for IPv6
|
|
actual_addr = None
|
|
actual_plen = None
|
|
for addr, plen, _ in get_addrs6(intf):
|
|
actual_addr = mask_addr(addr, match_plen)
|
|
actual_plen = plen
|
|
if actual_addr == expected_addr:
|
|
break
|
|
|
|
if expected_addr != actual_addr:
|
|
raise Exception('IP for %s did not match %s (was %s)' %
|
|
(intf, expected_addr_str, socket.inet_ntop(family, actual_addr)))
|
|
|
|
if expected_plen is not None and expected_plen != actual_plen:
|
|
raise Exception('Prefix Length for %s did not match %i (was %i)' %
|
|
(intf, expected_plen, actual_plen))
|
|
|
|
def test_ip_connected(tup0, tup1):
|
|
ip0, ns0 = tup0
|
|
ip1, ns1 = tup1
|
|
|
|
try:
|
|
ns0.start_process(['ping', '-c', '5', '-i', '0.2', ip1], check=True)
|
|
ns1.start_process(['ping', '-c', '5', '-i', '0.2', ip0], check=True)
|
|
except:
|
|
raise Exception('Could not ping between %s and %s' % (ip0, ip1))
|
|
|
|
RouteInfo = collections.namedtuple('RouteInfo', 'dst plen gw flags ifname',
|
|
defaults=(None, None, None, 0, ''))
|
|
|
|
def get_routes4(ifname=None):
|
|
f = open('/proc/net/route', 'r')
|
|
lines = f.readlines()
|
|
f.close()
|
|
for line in lines[1:]: # Skip header line
|
|
route_ifname, dst_str, gw_str, flags, ref_cnt, use_cnt, metric, mask_str, \
|
|
mtu = line.strip().split(maxsplit=8)
|
|
if ifname is not None and route_ifname != ifname:
|
|
continue
|
|
|
|
dst = codecs.decode(dst_str, 'hex')[::-1]
|
|
mask = int(mask_str, 16)
|
|
plen = sum([(mask >> bit) & 1 for bit in range(0, 32)]) # count bits
|
|
gw = codecs.decode(gw_str, 'hex')[::-1]
|
|
|
|
if dst == b'\0\0\0\0':
|
|
dst = None
|
|
plen = None
|
|
if gw == b'\0\0\0\0':
|
|
gw = None
|
|
yield RouteInfo(dst, plen, gw, int(flags, 16), route_ifname)
|
|
|
|
def get_routes6(ifname=None):
|
|
f = open('/proc/net/ipv6_route', 'r')
|
|
lines = f.readlines()
|
|
f.close()
|
|
for line in lines:
|
|
dst_str, dst_plen_str, src_str, src_plen_str, gw_str, metric, ref_cnt, \
|
|
use_cnt, flags, route_ifname = line.strip().split(maxsplit=9)
|
|
if ifname is not None and route_ifname != ifname:
|
|
continue
|
|
|
|
dst = codecs.decode(dst_str, 'hex')
|
|
plen = int(dst_plen_str, 16)
|
|
gw = codecs.decode(gw_str, 'hex')
|
|
|
|
if dst[0] == 0xff or dst[:2] == b'\xfe\x80': # Skip link-local and multicast
|
|
continue
|
|
|
|
# Skip RTN_LOCAL-type routes, we don't need to validate them since they're added by
|
|
# the kernel and we can't simply add them to the expected list (the list that we
|
|
# validate against) because they're added a short time after an address (due to DAD?)
|
|
# and would create race conditions
|
|
if int(flags, 16) & (1 << 31):
|
|
continue
|
|
|
|
if dst == b'\0' * 16:
|
|
dst = None
|
|
plen = None
|
|
if gw == b'\0' * 16:
|
|
gw = None
|
|
yield RouteInfo(dst, plen, gw, int(flags, 16) & 0xf, route_ifname)
|