mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2025-01-05 12:52:37 +01:00
7ecdee6eaa
Make sure we wipe the leases file both for server and client, so that dhclient doesn't try to re-use leases from previous tests (shouldn't really happen) and waste time waiting for a reply. Extend the timeout from 1s to 5s, since sometimes it takes dhclient 1s just to start. Disable verbose mode if not needed to avoid dhclient stalling if the pipe is not being read.
167 lines
6.3 KiB
Python
167 lines
6.3 KiB
Python
#!/usr/bin/python3
|
|
|
|
import unittest
|
|
import sys
|
|
import os
|
|
import time
|
|
|
|
import iwd
|
|
from iwd import IWD
|
|
import testutil
|
|
from config import ctx
|
|
from wpas import Wpas
|
|
|
|
class Test(unittest.TestCase):
    """P2P connection tests between IWD and a wpa_supplicant peer.

    Every test case runs the same scenario (p2p_connect_test) and only varies
    which side ends up as the group owner and whether wpa_supplicant
    authorizes the IWD peer before IWD starts connecting.
    """

    # Lease database handed to dhcpd / dhclient through '-lf'.
    leases_path = '/tmp/dhcp.lease'

    # NOTE: the test methods are documented with comments rather than
    # docstrings so that unittest's verbose test descriptions stay unchanged.

    # wpa_supplicant becomes the GO, no pre-authorization on its side.
    def test_1_client_go_neg_responder(self):
        self.p2p_connect_test(preauthorize=False, go=False)

    # wpa_supplicant becomes the GO and pre-authorizes the IWD peer.
    def test_2_client_go_neg_initiator(self):
        self.p2p_connect_test(preauthorize=True, go=False)

    # wpa_supplicant becomes the client, no pre-authorization on its side.
    def test_3_go_go_neg_responder(self):
        self.p2p_connect_test(preauthorize=False, go=True)

    # wpa_supplicant becomes the client and pre-authorizes the IWD peer.
    def test_4_go_go_neg_initiator(self):
        self.p2p_connect_test(preauthorize=True, go=True)

    def p2p_connect_test(self, preauthorize, go):
        """Discover, connect to and disconnect from the wpa_supplicant peer.

        preauthorize -- when true, wpa_supplicant authorizes the IWD peer
                        before IWD initiates the connection; otherwise it
                        accepts the GO Negotiation request once it shows up.
        go           -- when true, wpa_supplicant is expected to end up as
                        the group's client (and is given a low GO intent);
                        when false it is expected to end up as the GO.
        """
        wd = IWD()
        wpas = self.wpas = Wpas(p2p=True)
        wpas_go_intent = 10 if not go else 1

        # Not strictly necessary but prevents the station interface from queuing its scans
        # in the wiphy radio work queue and delaying P2P scans.
        wd.list_devices(1)[0].disconnect()

        devices = wd.list_p2p_devices(1)
        p2p = self.p2p = devices[0]
        p2p.enabled = True
        p2p.name = 'testdev1'

        # Let both sides discover each other, then leave wpa_supplicant in
        # the listen state.
        wpas.p2p_find()
        p2p.discovery_request = True
        wd.wait(5)
        wd.wait_for_object_condition(wpas, 'len(obj.p2p_peers) == 1', max_wait=20)
        p2p.discovery_request = False
        wpas.p2p_listen()

        # What IWD learned about the wpa_supplicant peer.
        peers = p2p.get_peers()
        self.assertEqual(len(peers), 1)
        peer = next(iter(peers.values()))
        self.assertEqual(peer.name, wpas.config['device_name'])
        self.assertEqual(peer.category, 'display')
        self.assertEqual(peer.subcategory, 'monitor')

        # What wpa_supplicant learned about the IWD device.
        wpas_peer = next(iter(wpas.p2p_peers.values()))
        self.assertEqual(wpas_peer['name'], p2p.name)
        self.assertEqual(wpas_peer['pri_dev_type'], '1-0050F204-6') # 1 == Computer, 6 == Desktop
        self.assertEqual(wpas_peer['config_methods'], '0x1080')

        if preauthorize:
            wpas.p2p_authorize(wpas_peer, go_intent=wpas_go_intent)

        peer.connect(wait=False)

        self.assertEqual(len(wpas.p2p_go_neg_requests), 0)
        self.assertEqual(len(wpas.p2p_clients), 0)
        wd.wait_for_object_condition(wpas, 'len(obj.p2p_go_neg_requests) == 1', max_wait=3)
        request = wpas.p2p_go_neg_requests[wpas_peer['p2p_dev_addr']]

        if not preauthorize:
            self.assertEqual(request['dev_passwd_id'], '4')
            self.assertEqual(request['go_intent'], '2') # Hardcoded in src/p2p.c

            wpas.p2p_accept_go_neg_request(request, go_intent=wpas_go_intent)

        wd.wait_for_object_condition(request, '\'success\' in obj', max_wait=3)
        self.assertEqual(request['success'], True)
        self.assertEqual(request['role'], 'GO' if not go else 'client')
        self.assertEqual(request['wps_method'], 'PBC')
        self.assertEqual(request['p2p_dev_addr'], wpas_peer['p2p_dev_addr'])

        if go:
            # For some reason wpa_supplicant's newly created P2P-client interface doesn't inherit
            # the settings from the main interface which were loaded from the config file
            # (P2P-device and P2P-GO interfaces do), we need to set config_methods again.
            peer_ifname = 'p2p-' + wpas.interface.name + '-0'
            wpas.set('config_methods', wpas.config['config_methods'], ifname=peer_ifname)
            wpas.set('device_name', wpas.config['device_name'], ifname=peer_ifname)
            wpas.set('device_type', wpas.config['device_type'], ifname=peer_ifname)

        wd.wait_for_object_condition(wpas, 'obj.p2p_group is not None', max_wait=3)
        peer_ifname = wpas.p2p_group['ifname']
        self.assertEqual(wpas.p2p_group['role'], 'GO' if not go else 'client')

        # Create/truncate the lease file so neither dhcpd nor dhclient sees
        # leases left over from a previous test.  Done in-process rather
        # than by spawning a shell just for a redirection.
        with open(self.leases_path, 'w'):
            pass

        if not go:
            # wpa_supplicant is the GO: give its group interface a static
            # address and serve DHCP on it.
            ctx.start_process(['ip', 'addr', 'add','dev', peer_ifname, '192.168.1.20/255.255.255.0']).wait()
            dhcp = ctx.start_process(['dhcpd', '-f', '-cf', '/tmp/dhcpd.conf', '-lf', self.leases_path, peer_ifname])
            self.dhcp = dhcp

            wd.wait_for_object_condition(wpas, 'len(obj.p2p_clients) == 1', max_wait=3)
            client = wpas.p2p_clients[request['peer_iface']]
            self.assertEqual(client['p2p_dev_addr'], wpas_peer['p2p_dev_addr'])
        else:
            # wpa_supplicant is the client: check the addresses it reports
            # for the group, then run a DHCP client on its group interface.
            self.assertEqual(wpas.p2p_group['ip_addr'], '192.168.1.2')
            self.assertEqual(wpas.p2p_group['ip_mask'], '255.255.255.240')
            self.assertEqual(wpas.p2p_group['go_ip_addr'], '192.168.1.1')

            # Only ask dhclient for verbose output when the test context
            # has it enabled.
            extra_params = []
            if ctx.is_verbose('dhclient'):
                extra_params.append('-v')

            dhcp = ctx.start_process(['dhclient', '-d', '--no-pid', '-cf', '/dev/null', '-lf', self.leases_path,
                '-sf', '/tmp/dhclient-script'] + extra_params + [peer_ifname])
            self.dhcp = dhcp

        wd.wait_for_object_condition(peer, 'obj.connected', max_wait=15)
        time.sleep(5) # Give the client time to set the IP
        testutil.test_ip_address_match(peer_ifname, peer.connected_ip)

        if not go:
            testutil.test_ip_address_match(peer.connected_interface, '192.168.1.30')
            self.assertEqual(peer.connected_ip, '192.168.1.20')
        else:
            testutil.test_ip_address_match(peer.connected_interface, '192.168.1.1')
            self.assertEqual(peer.connected_ip, '192.168.1.2')

        testutil.test_iface_operstate(peer.connected_interface)
        testutil.test_ifaces_connected(peer.connected_interface, peer_ifname)

        peer.disconnect()
        if not go:
            wd.wait_for_object_condition(wpas, 'len(obj.p2p_clients) == 0', max_wait=3)
        else:
            wd.wait_for_object_condition(wpas, 'obj.p2p_group is None', max_wait=15)
        self.assertEqual(peer.connected, False)

    def setUp(self):
        """Reset the per-test handles that tearDown() cleans up."""
        self.p2p = None
        self.wpas = None
        self.dhcp = None

    def tearDown(self):
        """Release everything a test may have left running.

        The DHCP process is stopped and the lease files are removed even if
        an earlier cleanup step raises, so one failure cannot leak state
        into the next test.
        """
        try:
            if self.p2p is not None:
                try:
                    self.p2p.enabled = False
                except Exception:
                    # Best effort: failing to disable the device must not
                    # block the rest of the cleanup.  Unlike a bare except,
                    # this lets KeyboardInterrupt / SystemExit through.
                    pass

            if self.wpas is not None:
                self.wpas.clean_up()
                self.wpas = None
        finally:
            try:
                if self.dhcp is not None:
                    ctx.stop_process(self.dhcp)
            finally:
                self._remove_lease_files()

    def _remove_lease_files(self):
        """Delete the lease file and its '~' sibling if they exist."""
        # NOTE(review): the '~' file is presumably a backup copy written by
        # the DHCP tools -- confirm.
        for path in (self.leases_path, self.leases_path + '~'):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass

    @classmethod
    def tearDownClass(cls):
        """Clear IWD's storage once all tests in the class have run."""
        IWD.clear_storage()
|
|
|
|
if __name__ == '__main__':
    # Run the tests when this file is executed directly rather than
    # imported; exit=True asks unittest.main() to exit the interpreter with
    # the run's status once it is done.
    unittest.main(exit=True)
|