autotests: Add P2P GO role test scenarios

autotests/testP2P/client_test.py is renamed to connection_test.py to
reflect that it now tests both roles.
This commit is contained in:
Andrew Zaborowski 2021-06-04 03:50:51 +02:00 committed by Denis Kenzior
parent 0be475af3c
commit e583bc470f
4 changed files with 115 additions and 41 deletions

View File

@ -4,6 +4,7 @@ import unittest
import sys
import netifaces
import os
import time
import iwd
from iwd import IWD
@ -13,14 +14,21 @@ from wpas import Wpas
class Test(unittest.TestCase):
def test_1_client_go_neg_responder(self):
self.p2p_client_test(False)
self.p2p_connect_test(preauthorize=False, go=False)
def test_2_client_go_neg_initiator(self):
self.p2p_client_test(True)
self.p2p_connect_test(preauthorize=True, go=False)
def p2p_client_test(self, preauthorize):
wpas = Wpas(p2p=True)
def test_3_go_go_neg_responder(self):
self.p2p_connect_test(preauthorize=False, go=True)
def test_4_go_go_neg_initiator(self):
self.p2p_connect_test(preauthorize=True, go=True)
def p2p_connect_test(self, preauthorize, go):
wd = IWD()
wpas = Wpas(p2p=True)
wpas_go_intent = 10 if not go else 1
# Not strictly necessary but prevents the station interface from queuing its scans
# in the wiphy radio work queue and delaying P2P scans.
@ -51,7 +59,7 @@ class Test(unittest.TestCase):
self.assertEqual(wpas_peer['config_methods'], '0x1080')
if preauthorize:
wpas.p2p_authorize(wpas_peer)
wpas.p2p_authorize(wpas_peer, go_intent=wpas_go_intent)
peer.connect(wait=False)
@ -64,38 +72,63 @@ class Test(unittest.TestCase):
self.assertEqual(request['dev_passwd_id'], '4')
self.assertEqual(request['go_intent'], '2') # Hardcoded in src/p2p.c
wpas.p2p_accept_go_neg_request(request)
wpas.p2p_accept_go_neg_request(request, go_intent=wpas_go_intent)
wd.wait_for_object_condition(request, '\'success\' in obj', max_wait=3)
self.assertEqual(request['success'], True)
self.assertEqual(request['role'], 'GO')
self.assertEqual(request['role'], 'GO' if not go else 'client')
self.assertEqual(request['wps_method'], 'PBC')
self.assertEqual(request['p2p_dev_addr'], wpas_peer['p2p_dev_addr'])
wd.wait_for_object_condition(wpas, 'obj.p2p_group is not None', max_wait=3)
go_ifname = wpas.p2p_group['ifname']
ctx.start_process(['ifconfig', go_ifname, '192.168.1.20', 'netmask', '255.255.255.0'], wait=True)
os.system('> /tmp/dhcpd.leases')
dhcpd = ctx.start_process(['dhcpd', '-f', '-cf', '/tmp/dhcpd.conf', '-lf', '/tmp/dhcpd.leases', go_ifname])
if go:
# For some reason wpa_supplicant's newly created P2P-client interface doesn't inherit
# the settings from the main interface which were loaded from the config file
# (P2P-device and P2P-GO interfaces do), we need to set config_methods again.
peer_ifname = 'p2p-' + wpas.interface.name + '-0'
wpas.set('config_methods', wpas.config['config_methods'], ifname=peer_ifname)
wpas.set('device_name', wpas.config['device_name'], ifname=peer_ifname)
wpas.set('device_type', wpas.config['device_type'], ifname=peer_ifname)
wd.wait_for_object_condition(wpas, 'len(obj.p2p_clients) == 1', max_wait=3)
client = wpas.p2p_clients[request['peer_iface']]
self.assertEqual(client['p2p_dev_addr'], wpas_peer['p2p_dev_addr'])
wd.wait_for_object_condition(wpas, 'obj.p2p_group is not None', max_wait=3)
peer_ifname = wpas.p2p_group['ifname']
if not go:
ctx.start_process(['ifconfig', peer_ifname, '192.168.1.20', 'netmask', '255.255.255.0'], wait=True)
os.system('> /tmp/dhcpd.leases')
dhcp = ctx.start_process(['dhcpd', '-f', '-cf', '/tmp/dhcpd.conf', '-lf', '/tmp/dhcpd.leases', peer_ifname])
wd.wait_for_object_condition(wpas, 'len(obj.p2p_clients) == 1', max_wait=3)
client = wpas.p2p_clients[request['peer_iface']]
self.assertEqual(client['p2p_dev_addr'], wpas_peer['p2p_dev_addr'])
else:
dhcp = ctx.start_process(['dhclient', '-v', '-d', '--no-pid', '-cf', '/dev/null', '-lf', '/tmp/dhcpd.leases',
'-sf', '/tmp/dhclient-script', peer_ifname])
wd.wait_for_object_condition(peer, 'obj.connected', max_wait=15)
time.sleep(1) # Give the client time to set the IP
our_ip = netifaces.ifaddresses(peer.connected_interface)[netifaces.AF_INET][0]['addr']
self.assertEqual(peer.connected_ip, '192.168.1.20')
self.assertEqual(our_ip, '192.168.1.30')
peer_ip = netifaces.ifaddresses(peer_ifname)[netifaces.AF_INET][0]['addr']
self.assertEqual(peer.connected_ip, peer_ip)
if not go:
self.assertEqual(our_ip, '192.168.1.30')
self.assertEqual(peer_ip, '192.168.1.20')
else:
self.assertEqual(our_ip, '192.168.1.1')
self.assertEqual(peer_ip, '192.168.1.2')
testutil.test_iface_operstate(peer.connected_interface)
testutil.test_ifaces_connected(peer.connected_interface, go_ifname)
testutil.test_ifaces_connected(peer.connected_interface, peer_ifname)
peer.disconnect()
wd.wait_for_object_condition(wpas, 'len(obj.p2p_clients) == 0', max_wait=3)
if not go:
wd.wait_for_object_condition(wpas, 'len(obj.p2p_clients) == 0', max_wait=3)
else:
wd.wait_for_object_condition(wpas, 'obj.p2p_group is None', max_wait=3)
self.assertEqual(peer.connected, False)
p2p.enabled = False
ctx.stop_process(dhcpd)
ctx.stop_process(dhcp)
wpas.clean_up()
@classmethod

View File

@ -0,0 +1,17 @@
#! /bin/bash
case $reason in
BOUND|RENEW|REBIND|REBOOT)
if [ x$new_ip_address != x ] && \
[ x$new_ip_address != x$old_ip_address -o \
x$reason = xBOUND -o x$reason = xREBOOT ]; then
/sbin/ip addr add $new_ip_address/${new_subnet_mask:-32} \
${new_broadcast_arg} \
dev $interface
fi
;;
EXPIRE|FAIL|RELEASE|STOP)
if [ x$old_ip_address != x ]; then
/sbin/ip -4 addr flush dev $interface
fi
;;
esac

View File

@ -4,3 +4,6 @@ EnableNetworkConfiguration=true
[P2P]
# Something different from the default of 'pc' for validation
DeviceType=desktop
[IPv4]
APAddressPool=192.168.1.0/28

View File

@ -8,13 +8,11 @@ ctrl_count = 0
class Wpas:
def _start_wpas(self, config_name=None, p2p=False):
global ctrl_count
main_interface = None
for interface in ctx.wpas_interfaces:
if config_name is None or interface.config == config_name:
if main_interface is not None:
raise Exception('More than was wpa_supplicant interface matches given config')
raise Exception('More than one wpa_supplicant interface matches given config')
main_interface = interface
if main_interface is None:
@ -37,15 +35,9 @@ class Wpas:
self.wpa_supplicant = ctx.start_process(cmd)
self.local_ctrl = '/tmp/wpas_' + str(os.getpid()) + '_' + str(ctrl_count)
ctrl_count = ctrl_count + 1
self.ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
self.ctrl_sock.bind(self.local_ctrl)
self.remote_ctrl = self.socket_path + '/' + self.ifname
self.wpa_supplicant.wait_for_socket(self.remote_ctrl, 2)
self.ctrl_sock.connect(self.remote_ctrl)
self.io_watch = GLib.io_add_watch(self.ctrl_sock, GLib.IO_IN, self._handle_data_in)
self.sockets = {}
self.cleanup_paths = []
self.io_watch = GLib.io_add_watch(self._get_socket(), GLib.IO_IN, self._handle_data_in)
self.p2p_peers = {}
self.p2p_go_neg_requests = {}
@ -173,11 +165,34 @@ class Wpas:
return True
def _ctrl_request(self, command, timeout=10):
def _ctrl_request(self, command, ifname=None):
if type(command) is str:
command = str.encode(command)
self.ctrl_sock.send(bytes(command))
self._get_socket(ifname).send(bytes(command))
def _get_socket(self, ifname=None):
global ctrl_count
if ifname is None:
ifname = self.ifname
if ifname in self.sockets:
return self.sockets[ifname]
local_path = '/tmp/wpas_' + str(os.getpid()) + '_' + str(ctrl_count)
ctrl_count = ctrl_count + 1
remote_path = self.socket_path + '/' + ifname
self.wpa_supplicant.wait_for_socket(remote_path, 2)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.bind(local_path)
self.cleanup_paths.append(local_path)
sock.connect(remote_path)
self.cleanup_paths.append(remote_path)
self.sockets[ifname] = sock
return sock
# Normal find phase with listen and active scan states
def p2p_find(self):
@ -219,6 +234,12 @@ class Wpas:
('' if go_intent is None else ' go_intent=' + str(go_intent)) + ' auth')
self.wait_for_event('OK')
def p2p_set(self, key, value, **kwargs):
self._ctrl_request('P2P_SET ' + key + ' ' + value, **kwargs)
def set(self, key, value, **kwargs):
self._ctrl_request('SET ' + key + ' ' + value, **kwargs)
# Probably needed: remove references to self so that the GC can call __del__ automatically
def clean_up(self):
if self.io_watch is not None:
@ -227,16 +248,16 @@ class Wpas:
if self.wpa_supplicant is not None:
ctx.stop_process(self.wpa_supplicant)
self.wpa_supplicant = None
for path in self.cleanup_paths:
if os.path.exists(path):
os.remove(path)
self.cleanup_paths = []
def _stop_wpas(self):
self.clean_up()
if self.ctrl_sock:
self.ctrl_sock.close()
self.ctrl_sock = None
if os.path.exists(self.remote_ctrl):
os.remove(self.remote_ctrl)
if os.path.exists(self.local_ctrl):
os.remove(self.local_ctrl)
for ifname in self.sockets:
self.sockets[ifname].close()
self.sockets = {}
def __del__(self):
self._stop_wpas()