mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2024-11-29 22:19:23 +01:00
576d8ad123
For the interface connectivity tests obtain the lists of interfaces in use directly from the IWD class, which has the current list from DBus properties.
141 lines
4.1 KiB
Python
141 lines
4.1 KiB
Python
#! /usr/bin/python3
|
|
# Rougly based on wpa_supplicant's mac80211_hwsim/tools/hwsim_test.c utility.
|
|
import socket
|
|
import fcntl
|
|
import struct
|
|
import select
|
|
|
|
import wiphy
|
|
import iwd
|
|
|
|
HWSIM_ETHERTYPE = 0x0800
|
|
HWSIM_PACKETLEN = 250
|
|
|
|
def raw_if_socket(intf):
|
|
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW,
|
|
socket.htons(HWSIM_ETHERTYPE))
|
|
|
|
sock.bind((intf, HWSIM_ETHERTYPE))
|
|
|
|
return (sock, sock.getsockname()[4])
|
|
|
|
def checksum(buf):
|
|
pairs = zip(buf[0::2], buf[1::2])
|
|
s = sum([(h << 8) + l for h, l in pairs])
|
|
|
|
while s >> 16:
|
|
s = (s & 0xffff) + (s >> 16)
|
|
|
|
return s ^ 0xffff
|
|
|
|
def tx(fromsock, tosock, src, dst):
|
|
frame = b''.join([
|
|
dst, # eth.rmac
|
|
src, # eth.lmac
|
|
struct.pack('!H', HWSIM_ETHERTYPE), # eth.type
|
|
b'\x45', # ip.hdr_len
|
|
b'\x00', # ip.dsfield
|
|
struct.pack('!H', HWSIM_PACKETLEN - 14), # ip.len
|
|
b'\x01\x23', # ip.id
|
|
b'\x40\x00', # ip.flags, ip.frag_offset
|
|
b'\x40', # ip.ttl
|
|
b'\x01', # ip.proto
|
|
struct.pack('>H', 0), # ip.checksum
|
|
socket.inet_aton('192.168.1.1'), # ip.src
|
|
socket.inet_aton('192.168.1.2'), # ip.dst
|
|
bytes(range(0, HWSIM_PACKETLEN - 14 - 20))
|
|
])
|
|
frame = frame[:24] + struct.pack('>H', checksum(frame[14:34])) + frame[26:]
|
|
|
|
fromsock.send(frame)
|
|
|
|
return (frame, fromsock, tosock, src, dst)
|
|
|
|
def test_connected(if0=None, if1=None):
|
|
if if0 is None or if1 is None:
|
|
iwd_list = [dev.name for dev in iwd.IWD.get_instance().list_devices()]
|
|
non_iwd_list = [ifname for w in wiphy.wiphy_map.values()
|
|
for ifname in w.interface_map]
|
|
for intf in iwd_list + non_iwd_list:
|
|
if if0 is None:
|
|
if0 = intf
|
|
elif if1 is None and intf != if0:
|
|
if1 = intf
|
|
|
|
sock0, addr0 = raw_if_socket(if0)
|
|
sock1, addr1 = raw_if_socket(if1)
|
|
bcast = b'\xff\xff\xff\xff\xff\xff'
|
|
|
|
try:
|
|
frames = [
|
|
tx(sock0, sock1, addr0, addr1),
|
|
tx(sock0, sock1, addr0, bcast),
|
|
tx(sock1, sock0, addr1, addr0),
|
|
tx(sock1, sock0, addr1, bcast),
|
|
]
|
|
|
|
rec = [False, False, False, False]
|
|
|
|
while not all(rec):
|
|
r, w, x = select.select([sock0, sock1], [], [], 1.0)
|
|
if not r:
|
|
raise Exception('timeout waiting for packets: ' + repr(rec))
|
|
|
|
for s in r:
|
|
data, src = s.recvfrom(HWSIM_PACKETLEN + 1)
|
|
print('received ' + repr(data[:40]) + '... from ' + str(src))
|
|
if len(data) != HWSIM_PACKETLEN:
|
|
continue
|
|
|
|
idx = 0
|
|
for origdata, fromsock, tosock, origsrc, origdst in frames:
|
|
if s is tosock and src[4] == origsrc and data == origdata:
|
|
print('matches frame ' + str(idx))
|
|
break
|
|
idx += 1
|
|
else:
|
|
print('doesn\'t match any of our frames')
|
|
continue
|
|
|
|
if rec[idx]:
|
|
raise Exception('duplicate frame ' + str(idx))
|
|
|
|
rec[idx] = True
|
|
finally:
|
|
sock0.close()
|
|
sock1.close()
|
|
|
|
def test_ifaces_connected(if0=None, if1=None):
|
|
retry = 0
|
|
while True:
|
|
try:
|
|
test_connected(if0, if1)
|
|
break
|
|
|
|
except Exception as e:
|
|
if retry < 3:
|
|
print('retrying connection test: %i' % retry)
|
|
retry += 1
|
|
continue
|
|
raise e
|
|
|
|
SIOCGIFFLAGS = 0x8913
|
|
IFF_UP = 1 << 0
|
|
IFF_RUNNING = 1 << 6
|
|
|
|
def test_iface_operstate(intf=None):
|
|
if not intf:
|
|
intf = iwd.IWD.get_instance().list_devices()[0].name
|
|
|
|
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW)
|
|
|
|
try:
|
|
ifreq = struct.pack('16sh', intf.encode('utf8'), 0)
|
|
flags = struct.unpack('16sh', fcntl.ioctl(sock, SIOCGIFFLAGS, ifreq))[1]
|
|
|
|
# IFF_LOWER_UP and IFF_DORMANT not returned by SIOCGIFFLAGS
|
|
if flags & (IFF_UP | IFF_RUNNING) != IFF_UP | IFF_RUNNING:
|
|
raise Exception(intf + ' operstate wrong')
|
|
finally:
|
|
sock.close()
|