auto-t: Validate netmasks in testNetconfig, add utility

Extend test_ip_address_match to support IPv6 and to test the
netmask/prefix length while it reads the local address since those are
retrieved using the same API.

Modify testNetconfig to validate the prefix lengths, change the prefix
lengths to be less common values (not 24 bits for IPv4 or 64 for IPv6),
minor cleanup.
This commit is contained in:
Andrew Zaborowski 2022-06-16 02:02:24 +02:00 committed by Denis Kenzior
parent 00e41eb0ff
commit 57888632a3
6 changed files with 78 additions and 24 deletions

View File

@ -11,7 +11,7 @@ from iwd import NetworkType
from hostapd import HostapdCLI from hostapd import HostapdCLI
import testutil import testutil
from config import ctx from config import ctx
import os, time import os
import subprocess import subprocess
class Test(unittest.TestCase): class Test(unittest.TestCase):
@ -19,8 +19,9 @@ class Test(unittest.TestCase):
def test_connection_success(self): def test_connection_success(self):
def check_addr(device): def check_addr(device):
try: try:
subprocess.check_output('ip addr show ' + device.name + \ # DHCPv6 addresses always have a prefix length of 128 bits, the actual
' | grep \'inet6 3ffe:501:ffff:100::\'', shell=True) # subnet's prefix length is in the route.
testutil.test_ip_address_match(device.name, '3ffe:501:ffff:100::1', 128, 112)
except: except:
return False return False
@ -49,6 +50,7 @@ class Test(unittest.TestCase):
testutil.test_iface_operstate() testutil.test_iface_operstate()
testutil.test_ifaces_connected() testutil.test_ifaces_connected()
testutil.test_ip_address_match(device.name, '192.168.1.10', 17, 24)
ctx.non_block_wait(check_addr, 10, device, ctx.non_block_wait(check_addr, 10, device,
exception=Exception("IPv6 address was not set")) exception=Exception("IPv6 address was not set"))
@ -78,14 +80,14 @@ class Test(unittest.TestCase):
# TODO: This could be moved into test-runner itself if other tests ever # TODO: This could be moved into test-runner itself if other tests ever
# require this functionality (p2p, FILS, etc.). Since its simple # require this functionality (p2p, FILS, etc.). Since its simple
# enough it can stay here for now. # enough it can stay here for now.
ctx.start_process(['ip', 'addr','add', '192.168.1.1/255.255.255.0', ctx.start_process(['ip', 'addr','add', '192.168.1.1/255.255.128.0',
'dev', hapd.ifname,]).wait() 'dev', hapd.ifname,]).wait()
ctx.start_process(['touch', '/tmp/dhcpd.leases']).wait() ctx.start_process(['touch', '/tmp/dhcpd.leases']).wait()
cls.dhcpd_pid = ctx.start_process(['dhcpd', '-f', '-cf', '/tmp/dhcpd.conf', cls.dhcpd_pid = ctx.start_process(['dhcpd', '-f', '-cf', '/tmp/dhcpd.conf',
'-lf', '/tmp/dhcpd.leases', '-lf', '/tmp/dhcpd.leases',
hapd.ifname], cleanup=remove_lease4) hapd.ifname], cleanup=remove_lease4)
ctx.start_process(['ip', 'addr', 'add', '3ffe:501:ffff:100::1/64', ctx.start_process(['ip', 'addr', 'add', '3ffe:501:ffff:100::1/72',
'dev', hapd.ifname]).wait() 'dev', hapd.ifname]).wait()
ctx.start_process(['touch', '/tmp/dhcpd6.leases']).wait() ctx.start_process(['touch', '/tmp/dhcpd6.leases']).wait()
cls.dhcpd6_pid = ctx.start_process(['dhcpd', '-6', '-f', '-cf', '/tmp/dhcpd-v6.conf', cls.dhcpd6_pid = ctx.start_process(['dhcpd', '-6', '-f', '-cf', '/tmp/dhcpd-v6.conf',

View File

@ -1,4 +1,4 @@
subnet6 3ffe:501:ffff:100::/64 subnet6 3ffe:501:ffff:100::/72
{ {
option dhcp6.name-servers 3ffe:501:ffff:100::1; option dhcp6.name-servers 3ffe:501:ffff:100::1;
range6 3ffe:501:ffff:100::10 3ffe:501:ffff:100::20; range6 3ffe:501:ffff:100::10 3ffe:501:ffff:100::20;

View File

@ -1,14 +1,14 @@
default-lease-time 600; # 10 minutes default-lease-time 600; # 10 minutes
max-lease-time 7200; # 2 hours max-lease-time 7200; # 2 hours
option broadcast-address 192.168.1.255; option broadcast-address 192.168.127.255;
option routers 192.168.1.254; option routers 192.168.1.254;
option subnet-mask 255.255.255.0; option subnet-mask 255.255.128.0;
subnet 192.168.1.0 netmask 255.255.255.0 subnet 192.168.0.0 netmask 255.255.128.0
{ {
option routers 192.168.1.1; option routers 192.168.1.1;
option subnet-mask 255.255.255.0; option subnet-mask 255.255.128.0;
option domain-name-servers 192.168.1.1; option domain-name-servers 192.168.1.1;
range 192.168.1.10 192.168.1.20; range 192.168.1.10 192.168.1.20;
range 192.168.1.100 192.168.1.200; range 192.168.1.100 192.168.1.200;

View File

@ -1,5 +1,6 @@
[IPv4] [IPv4]
Address=192.168.1.10 Address=192.168.1.10
Netmask=255.255.255.128
Gateway=192.168.1.1 Gateway=192.168.1.1
[Settings] [Settings]

View File

@ -45,7 +45,7 @@ class Test(unittest.TestCase):
testutil.test_iface_operstate() testutil.test_iface_operstate()
testutil.test_ifaces_connected() testutil.test_ifaces_connected()
testutil.test_ip_address_match(dev1.name, '192.168.1.10') testutil.test_ip_address_match(dev1.name, '192.168.1.10', 25)
ordered_network = dev2.get_ordered_network('ssidTKIP') ordered_network = dev2.get_ordered_network('ssidTKIP')
@ -79,9 +79,9 @@ class Test(unittest.TestCase):
hapd = HostapdCLI() hapd = HostapdCLI()
# TODO: This could be moved into test-runner itself if other tests ever # TODO: This could be moved into test-runner itself if other tests ever
# require this functionality (p2p, FILS, etc.). Since its simple # require this functionality (p2p, FILS, etc.). Since it's simple
# enough it can stay here for now. # enough it can stay here for now.
ctx.start_process(['ip', 'addr','add', '192.168.1.1/255.255.255.0', ctx.start_process(['ip', 'addr','add', '192.168.1.1/255.255.128.0',
'dev', hapd.ifname]).wait() 'dev', hapd.ifname]).wait()
ctx.start_process(['touch', '/tmp/dhcpd.leases']).wait() ctx.start_process(['touch', '/tmp/dhcpd.leases']).wait()
cls.dhcpd_pid = ctx.start_process(['dhcpd', '-f', '-cf', '/tmp/dhcpd.conf', cls.dhcpd_pid = ctx.start_process(['dhcpd', '-f', '-cf', '/tmp/dhcpd.conf',

View File

@ -4,6 +4,7 @@ import socket
import fcntl import fcntl
import struct import struct
import select import select
import codecs
import iwd import iwd
from config import ctx from config import ctx
@ -131,6 +132,7 @@ def test_ifaces_connected(if0=None, if1=None, group=True, expect_fail=False):
SIOCGIFFLAGS = 0x8913 SIOCGIFFLAGS = 0x8913
SIOCGIFADDR = 0x8915 SIOCGIFADDR = 0x8915
SIOCGIFNETMASK = 0x891b
IFF_UP = 1 << 0 IFF_UP = 1 << 0
IFF_RUNNING = 1 << 6 IFF_RUNNING = 1 << 6
@ -156,19 +158,68 @@ def test_iface_operstate(intf=None):
ctx.non_block_wait(_test_operstate, 10, intf, ctx.non_block_wait(_test_operstate, 10, intf,
exception=Exception(intf + ' operstate wrong')) exception=Exception(intf + ' operstate wrong'))
def test_ip_address_match(intf, ip): def get_addrs6(ifname):
try: f = open('/proc/net/if_inet6', 'r')
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) lines = f.readlines()
addr = fcntl.ioctl(s.fileno(), SIOCGIFADDR, struct.pack('256s', intf.encode('utf-8'))) f.close()
addr = socket.inet_ntoa(addr[20:24]) for line in lines:
except OSError as e: addr_str, _, plen, _, _, addr_ifname = line.split()
if e.errno != 99 or ip != None: if ifname is not None and addr_ifname != ifname:
raise Exception('SIOCGIFADDR failed with %d' % e.errno) continue
return yield (codecs.decode(addr_str, 'hex'), int(plen, 16), addr_ifname)
if ip != addr: def test_ip_address_match(intf, expected_addr_str, expected_plen=None, match_plen=None):
raise Exception('IP for %s did not match %s (was %s)' % (intf, ip, addr)) def mask_addr(addr, plen):
if plen is None or len(addr) * 8 <= plen:
return addr
bytelen = int(plen / 8)
return addr[0:bytelen] + bytes([addr[bytelen] & (0xff00 >> (plen & 7))]) + b'\0' * (len(addr) - bytelen - 1)
if expected_addr_str is not None:
try:
expected_addr = socket.inet_pton(socket.AF_INET, expected_addr_str)
family = socket.AF_INET
except OSError as e:
try:
expected_addr = socket.inet_pton(socket.AF_INET6, expected_addr_str)
family = socket.AF_INET6
except OSError as e2:
raise e2 from None
expected_addr = mask_addr(expected_addr, match_plen)
else:
expected_addr = None
family = socket.AF_INET
if family == socket.AF_INET:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
out = fcntl.ioctl(s.fileno(), SIOCGIFADDR, struct.pack('256s', intf.encode('utf-8')))
actual_addr = mask_addr(out[20:24], match_plen)
out = fcntl.ioctl(s.fileno(), SIOCGIFNETMASK, struct.pack('256s', intf.encode('utf-8')))
actual_plen = sum([sum([(byte >> bit) & 1 for bit in range(0, 8)]) for byte in out[20:24]]) # count bits
except OSError as e:
if e.errno == 99 and expected_addr is None:
return
raise Exception('SIOCGIFADDR/SIOCGIFNETMASK failed with %d' % e.errno)
else:
# The "get" ioctls don't work for IPv6, netdevice(7) recommends reading /proc/net instead,
# which on the other hand works *only* for IPv6
actual_addr = None
actual_plen = None
for addr, plen, _ in get_addrs6(intf):
actual_addr = mask_addr(addr, match_plen)
actual_plen = plen
if actual_addr == expected_addr:
break
if expected_addr != actual_addr:
raise Exception('IP for %s did not match %s (was %s)' %
(intf, expected_addr_str, socket.inet_ntop(family, actual_addr)))
if expected_plen is not None and expected_plen != actual_plen:
raise Exception('Prefix Length for %s did not match %i (was %i)' %
(intf, expected_plen, actual_plen))
def test_ip_connected(tup0, tup1): def test_ip_connected(tup0, tup1):
ip0, ns0 = tup0 ip0, ns0 = tup0