iwd/autotests/testP2P/connection_test.py

174 lines
6.7 KiB
Python

#!/usr/bin/python3
import unittest
import sys
import os
import time
import iwd
from iwd import IWD
import testutil
from config import ctx
from wpas import Wpas
from utils import Process
class Test(unittest.TestCase):
    """P2P connection tests between IWD and a wpa_supplicant peer.

    Each test case runs the same scenario (see ``p2p_connect_test``) with a
    different combination of the ``preauthorize`` and ``go`` switches.
    """

    # Lease file handed to dhcpd / dhclient; removed again in tearDown().
    leases_path = '/tmp/dhcp.lease'

    def test_1_client_go_neg_responder(self):
        """Run the scenario with preauthorize=False, go=False."""
        self.p2p_connect_test(preauthorize=False, go=False)

    def test_2_client_go_neg_initiator(self):
        """Run the scenario with preauthorize=True, go=False."""
        self.p2p_connect_test(preauthorize=True, go=False)

    def test_3_go_go_neg_responder(self):
        """Run the scenario with preauthorize=False, go=True."""
        self.p2p_connect_test(preauthorize=False, go=True)

    def test_4_go_go_neg_initiator(self):
        """Run the scenario with preauthorize=True, go=True."""
        self.p2p_connect_test(preauthorize=True, go=True)

    def p2p_connect_test(self, preauthorize, go):
        """Discover, connect to and disconnect from a wpa_supplicant P2P peer.

        Args:
            preauthorize: When true, wpa_supplicant authorizes the IWD peer
                before IWD starts connecting.  When false, the test waits for
                the GO Negotiation request to show up on the wpa_supplicant
                side, checks its parameters and accepts it explicitly.
            go: When true, wpa_supplicant is given a GO intent of 1 and is
                expected to end up in the 'client' role; when false it is
                given a GO intent of 10 and is expected to become 'GO'.
        """
        wpas = self.wpas = Wpas(p2p=True)
        wd = IWD()
        wpas_go_intent = 10 if not go else 1
        # Role wpa_supplicant is expected to report once negotiation is done.
        wpas_role = 'GO' if not go else 'client'

        # Not strictly necessary but prevents the station interface from queuing its scans
        # in the wiphy radio work queue and delaying P2P scans.
        wd.list_devices(1)[0].disconnect()

        devices = wd.list_p2p_devices(1)
        p2p = self.p2p = devices[0]
        p2p.enabled = True
        p2p.name = 'testdev1'

        # Mutual discovery: both sides search until wpa_supplicant sees one peer.
        wpas.p2p_find()
        p2p.discovery_request = True
        wd.wait(5)
        wd.wait_for_object_condition(wpas, 'len(obj.p2p_peers) == 1', max_wait=20)
        p2p.discovery_request = False
        wpas.p2p_listen()

        # IWD's view of the wpa_supplicant peer.
        peers = p2p.get_peers()
        self.assertEqual(len(peers), 1)
        peer = next(iter(peers.values()))
        self.assertEqual(peer.name, wpas.config['device_name'])
        self.assertEqual(peer.category, 'display')
        self.assertEqual(peer.subcategory, 'monitor')

        # wpa_supplicant's view of the IWD peer.
        wpas_peer = next(iter(wpas.p2p_peers.values()))
        self.assertEqual(wpas_peer['name'], p2p.name)
        self.assertEqual(wpas_peer['pri_dev_type'], '1-0050F204-6') # 1 == Computer, 6 == Desktop
        self.assertEqual(wpas_peer['config_methods'], '0x1080')

        if preauthorize:
            wpas.p2p_authorize(wpas_peer, go_intent=wpas_go_intent)

        peer.connect(wait=False)

        self.assertEqual(len(wpas.p2p_go_neg_requests), 0)
        self.assertEqual(len(wpas.p2p_clients), 0)
        wd.wait_for_object_condition(wpas, 'len(obj.p2p_go_neg_requests) == 1', max_wait=3)
        request = wpas.p2p_go_neg_requests[wpas_peer['p2p_dev_addr']]

        if not preauthorize:
            self.assertEqual(request['dev_passwd_id'], '4')
            self.assertEqual(request['go_intent'], '2') # Hardcoded in src/p2p.c

            wpas.p2p_accept_go_neg_request(request, go_intent=wpas_go_intent)

        wd.wait_for_object_condition(request, "'success' in obj", max_wait=3)
        self.assertEqual(request['success'], True)
        self.assertEqual(request['role'], wpas_role)
        self.assertEqual(request['wps_method'], 'PBC')
        self.assertEqual(request['p2p_dev_addr'], wpas_peer['p2p_dev_addr'])

        if go:
            # For some reason wpa_supplicant's newly created P2P-client interface doesn't inherit
            # the settings from the main interface which were loaded from the config file
            # (P2P-device and P2P-GO interfaces do), we need to set config_methods again.
            #
            # Additionally since hostap commit 922fa099721903b106a7bc1ccd1ffe8c4a7bce69,
            # just setting config_methods to the same value we used last time is detected as
            # a NOOP so we need to set it to a different value first for it to take effect.
            peer_ifname = 'p2p-' + wpas.interface.name + '-0'
            wpas.set('config_methods', '', ifname=peer_ifname)
            wpas.set('config_methods', wpas.config['config_methods'], ifname=peer_ifname)
            wpas.set('device_name', wpas.config['device_name'], ifname=peer_ifname)
            wpas.set('device_type', wpas.config['device_type'], ifname=peer_ifname)

        wd.wait_for_object_condition(wpas, 'obj.p2p_group is not None', max_wait=3)
        peer_ifname = wpas.p2p_group['ifname']
        self.assertEqual(wpas.p2p_group['role'], wpas_role)

        # Create/truncate the lease file without spawning a shell; unlike
        # os.system() this raises if the file cannot be written.
        with open(self.leases_path, 'w'):
            pass

        if not go:
            # wpa_supplicant side gets a static address and serves DHCP on
            # the group interface.
            ctx.start_process(['ip', 'addr', 'add','dev', peer_ifname, '192.168.1.20/255.255.255.0']).wait()
            dhcp = ctx.start_process(['dhcpd', '-f', '-cf', '/tmp/dhcpd.conf', '-lf', self.leases_path, peer_ifname])
            self.dhcp = dhcp

            wd.wait_for_object_condition(wpas, 'len(obj.p2p_clients) == 1', max_wait=3)
            client = wpas.p2p_clients[request['peer_iface']]
            self.assertEqual(client['p2p_dev_addr'], wpas_peer['p2p_dev_addr'])
        else:
            self.assertEqual(wpas.p2p_group['ip_addr'], '192.168.1.2')
            self.assertEqual(wpas.p2p_group['ip_mask'], '255.255.255.240')
            self.assertEqual(wpas.p2p_group['go_ip_addr'], '192.168.1.1')

            extra_params = []
            if Process.is_verbose('dhclient'):
                extra_params.append('-v')

            dhcp = ctx.start_process(['dhclient', '-d', '--no-pid', '-cf', '/dev/null', '-lf', self.leases_path,
                                      '-sf', '/tmp/dhclient-script'] + extra_params + [peer_ifname])
            self.dhcp = dhcp

        wd.wait_for_object_condition(peer, 'obj.connected', max_wait=15)
        time.sleep(5) # Give the client time to set the IP
        testutil.test_ip_address_match(peer_ifname, peer.connected_ip)

        if not go:
            testutil.test_ip_address_match(peer.connected_interface, '192.168.1.30')
            self.assertEqual(peer.connected_ip, '192.168.1.20')
        else:
            testutil.test_ip_address_match(peer.connected_interface, '192.168.1.1')
            self.assertEqual(peer.connected_ip, '192.168.1.2')

        testutil.test_iface_operstate(peer.connected_interface)
        testutil.test_ifaces_connected(peer.connected_interface, peer_ifname)

        peer.disconnect()
        if not go:
            wd.wait_for_object_condition(wpas, 'len(obj.p2p_clients) == 0', max_wait=3)
        else:
            wd.wait_for_object_condition(wpas, 'obj.p2p_group is None', max_wait=15)
        self.assertEqual(peer.connected, False)

    def setUp(self):
        """Reset the per-test handles that tearDown() inspects."""
        self.p2p = None
        self.wpas = None
        self.dhcp = None

    def tearDown(self):
        """Release everything a test may have started.

        The steps are chained with ``finally`` so that a failure while
        cleaning up wpa_supplicant cannot leave the DHCP process running or
        the lease files behind.
        """
        try:
            if self.p2p is not None:
                try:
                    self.p2p.enabled = False
                except Exception:
                    # Best-effort: disabling the device must never mask the
                    # real test result.  Only Exception is caught so that
                    # KeyboardInterrupt / SystemExit still propagate.
                    pass

            if self.wpas is not None:
                self.wpas.clean_up()
                self.wpas = None
        finally:
            try:
                if self.dhcp is not None:
                    ctx.stop_process(self.dhcp)
                    self.dhcp = None
            finally:
                for path in (self.leases_path, self.leases_path + '~'):
                    try:
                        os.remove(path)
                    except FileNotFoundError:
                        pass

    @classmethod
    def tearDownClass(cls):
        """Clear IWD's storage once all tests in this class have run."""
        IWD.clear_storage()
# Run the test cases when this file is executed directly as a script.
# exit=True (the unittest default) ends the process via sys.exit() with a
# status reflecting the test result.
if __name__ == '__main__':
    unittest.main(exit=True)