3
0
mirror of https://git.kernel.org/pub/scm/network/wireless/iwd.git synced 2024-12-28 20:32:32 +01:00
iwd/autotests/testP2P/connection_test.py
Andrew Zaborowski 7ecdee6eaa autotests: p2p: Make waiting for dhclient more reliable
Make sure we wipe the leases file both for server and client, so that
dhclient doesn't try to re-use leases from previous tests (should really
happen) and waste time waiting for a reply.  Extend the timeout from 1s
to 5s, sometimes it takes dhclient 1s just to start.  Disable verbose
mode if not needed to avoid dhclient stalling if the pipe is not being
read.
2021-11-30 15:49:08 -06:00

167 lines
6.3 KiB
Python

#!/usr/bin/python3
import unittest
import sys
import os
import time
import iwd
from iwd import IWD
import testutil
from config import ctx
from wpas import Wpas
class Test(unittest.TestCase):
    """P2P connection tests between IWD and a wpa_supplicant peer.

    Each test_* method runs the same scenario (see p2p_connect_test) with a
    different combination of who ends up as Group Owner and whether the
    wpa_supplicant side authorizes the connection in advance.
    """

    # Lease file shared by the DHCP server (dhcpd) and client (dhclient)
    # started by the tests; emptied before use and deleted in tearDown().
    leases_path = '/tmp/dhcp.lease'

    def test_1_client_go_neg_responder(self):
        self.p2p_connect_test(preauthorize=False, go=False)

    def test_2_client_go_neg_initiator(self):
        self.p2p_connect_test(preauthorize=True, go=False)

    def test_3_go_go_neg_responder(self):
        self.p2p_connect_test(preauthorize=False, go=True)

    def test_4_go_go_neg_initiator(self):
        self.p2p_connect_test(preauthorize=True, go=True)

    def p2p_connect_test(self, preauthorize, go):
        """Discover, connect to and disconnect from a wpa_supplicant peer.

        preauthorize -- if true, wpa_supplicant authorizes the IWD peer
            before IWD connects; otherwise the incoming GO Negotiation
            request is checked and accepted once it has arrived.
        go -- if true, wpa_supplicant is expected to take the 'client' role
            in the group (and runs dhclient); otherwise it is expected to
            be the 'GO' (and runs dhcpd).
        """
        wd = IWD()
        wpas = self.wpas = Wpas(p2p=True)
        wpas_go_intent = 1 if go else 10
        # Role wpa_supplicant is expected to report for itself.
        wpas_role = 'client' if go else 'GO'

        # Not strictly necessary but prevents the station interface from queuing its scans
        # in the wiphy radio work queue and delaying P2P scans.
        wd.list_devices(1)[0].disconnect()

        p2p = self.p2p = wd.list_p2p_devices(1)[0]
        p2p.enabled = True
        p2p.name = 'testdev1'

        # Run discovery on both sides until wpa_supplicant sees exactly one peer.
        wpas.p2p_find()
        p2p.discovery_request = True
        wd.wait(5)
        wd.wait_for_object_condition(wpas, 'len(obj.p2p_peers) == 1', max_wait=20)
        p2p.discovery_request = False
        wpas.p2p_listen()

        # Each side must have found the other with the expected properties.
        peers = p2p.get_peers()
        self.assertEqual(len(peers), 1)
        peer = next(iter(peers.values()))
        self.assertEqual(peer.name, wpas.config['device_name'])
        self.assertEqual(peer.category, 'display')
        self.assertEqual(peer.subcategory, 'monitor')

        wpas_peer = next(iter(wpas.p2p_peers.values()))
        self.assertEqual(wpas_peer['name'], p2p.name)
        self.assertEqual(wpas_peer['pri_dev_type'], '1-0050F204-6')  # 1 == Computer, 6 == Desktop
        self.assertEqual(wpas_peer['config_methods'], '0x1080')

        if preauthorize:
            wpas.p2p_authorize(wpas_peer, go_intent=wpas_go_intent)

        peer.connect(wait=False)

        # Nothing may be pending on the wpa_supplicant side until the
        # GO Negotiation request triggered by connect() shows up.
        self.assertEqual(len(wpas.p2p_go_neg_requests), 0)
        self.assertEqual(len(wpas.p2p_clients), 0)
        wd.wait_for_object_condition(wpas, 'len(obj.p2p_go_neg_requests) == 1', max_wait=3)
        request = wpas.p2p_go_neg_requests[wpas_peer['p2p_dev_addr']]

        if not preauthorize:
            self.assertEqual(request['dev_passwd_id'], '4')
            self.assertEqual(request['go_intent'], '2')  # Hardcoded in src/p2p.c

            wpas.p2p_accept_go_neg_request(request, go_intent=wpas_go_intent)

        wd.wait_for_object_condition(request, "'success' in obj", max_wait=3)
        self.assertEqual(request['success'], True)
        self.assertEqual(request['role'], wpas_role)
        self.assertEqual(request['wps_method'], 'PBC')
        self.assertEqual(request['p2p_dev_addr'], wpas_peer['p2p_dev_addr'])

        if go:
            # For some reason wpa_supplicant's newly created P2P-client interface doesn't inherit
            # the settings from the main interface which were loaded from the config file
            # (P2P-device and P2P-GO interfaces do), we need to set config_methods again.
            peer_ifname = 'p2p-' + wpas.interface.name + '-0'
            for key in ('config_methods', 'device_name', 'device_type'):
                wpas.set(key, wpas.config[key], ifname=peer_ifname)

        wd.wait_for_object_condition(wpas, 'obj.p2p_group is not None', max_wait=3)
        peer_ifname = wpas.p2p_group['ifname']
        self.assertEqual(wpas.p2p_group['role'], wpas_role)

        # Start from an empty leases file so neither dhcpd nor dhclient
        # picks up leases written by an earlier test.
        self._truncate_lease_file()

        if not go:
            # wpa_supplicant is the GO: give its interface a static address
            # and serve DHCP on it.
            ctx.start_process(['ip', 'addr', 'add', 'dev', peer_ifname,
                               '192.168.1.20/255.255.255.0']).wait()
            self.dhcp = ctx.start_process(['dhcpd', '-f', '-cf', '/tmp/dhcpd.conf',
                                           '-lf', self.leases_path, peer_ifname])

            wd.wait_for_object_condition(wpas, 'len(obj.p2p_clients) == 1', max_wait=3)
            self.assertEqual(wpas.p2p_clients[request['peer_iface']]['p2p_dev_addr'],
                             wpas_peer['p2p_dev_addr'])
        else:
            # wpa_supplicant is the client: check the addresses reported for
            # the group, then run a DHCP client on its interface.
            self.assertEqual(wpas.p2p_group['ip_addr'], '192.168.1.2')
            self.assertEqual(wpas.p2p_group['ip_mask'], '255.255.255.240')
            self.assertEqual(wpas.p2p_group['go_ip_addr'], '192.168.1.1')

            # Only ask dhclient for verbose output when it was requested.
            extra_params = ['-v'] if ctx.is_verbose('dhclient') else []

            self.dhcp = ctx.start_process(['dhclient', '-d', '--no-pid', '-cf', '/dev/null',
                                           '-lf', self.leases_path,
                                           '-sf', '/tmp/dhclient-script']
                                          + extra_params + [peer_ifname])

        wd.wait_for_object_condition(peer, 'obj.connected', max_wait=15)
        time.sleep(5)  # Give the client time to set the IP
        testutil.test_ip_address_match(peer_ifname, peer.connected_ip)

        if not go:
            testutil.test_ip_address_match(peer.connected_interface, '192.168.1.30')
            self.assertEqual(peer.connected_ip, '192.168.1.20')
        else:
            testutil.test_ip_address_match(peer.connected_interface, '192.168.1.1')
            self.assertEqual(peer.connected_ip, '192.168.1.2')

        testutil.test_iface_operstate(peer.connected_interface)
        testutil.test_ifaces_connected(peer.connected_interface, peer_ifname)

        peer.disconnect()

        if not go:
            wd.wait_for_object_condition(wpas, 'len(obj.p2p_clients) == 0', max_wait=3)
        else:
            wd.wait_for_object_condition(wpas, 'obj.p2p_group is None', max_wait=15)
        self.assertEqual(peer.connected, False)

    def _truncate_lease_file(self):
        """Create the leases file if needed and make it empty."""
        with open(self.leases_path, 'w'):
            pass

    def _remove_lease_files(self):
        """Delete the leases file and its '~' sibling if they exist."""
        for path in (self.leases_path, self.leases_path + '~'):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass

    def setUp(self):
        # Filled in by p2p_connect_test(); tearDown() cleans up whatever is set.
        self.p2p = None
        self.wpas = None
        self.dhcp = None

    def tearDown(self):
        """Undo whatever the test set up, including after a failed test.

        Every step runs even if an earlier one raises, so a failing
        clean-up cannot leave a DHCP process or lease files behind.
        """
        if self.p2p is not None:
            try:
                self.p2p.enabled = False
            except Exception:
                # Best effort: a failure to disable the device must not
                # prevent the remaining clean-up.
                pass

        try:
            if self.wpas is not None:
                wpas, self.wpas = self.wpas, None
                wpas.clean_up()
        finally:
            try:
                if self.dhcp is not None:
                    dhcp, self.dhcp = self.dhcp, None
                    ctx.stop_process(dhcp)
            finally:
                self._remove_lease_files()

    @classmethod
    def tearDownClass(cls):
        IWD.clear_storage()
# Run the test cases in this file when it is executed as a script.
if __name__ == '__main__':
    unittest.main(exit=True)