autotest: Improve cleanup in testP2P

Make sure some of the processes and files created are also cleaned up on
failure so that the next chance has a chance to succeed anyway.
This commit is contained in:
Andrew Zaborowski 2021-06-10 23:03:52 +02:00 committed by Denis Kenzior
parent 6d47354e63
commit 2a37dba4bf
2 changed files with 27 additions and 15 deletions

View File

@ -27,7 +27,7 @@ class Test(unittest.TestCase):
def p2p_connect_test(self, preauthorize, go):
wd = IWD()
wpas = Wpas(p2p=True)
wpas = self.wpas = Wpas(p2p=True)
wpas_go_intent = 10 if not go else 1
# Not strictly necessary but prevents the station interface from queuing its scans
@ -35,7 +35,7 @@ class Test(unittest.TestCase):
wd.list_devices(1)[0].disconnect()
devices = wd.list_p2p_devices(1)
p2p = devices[0]
p2p = self.p2p = devices[0]
p2p.enabled = True
p2p.name = 'testdev1'
@ -94,15 +94,17 @@ class Test(unittest.TestCase):
if not go:
ctx.start_process(['ifconfig', peer_ifname, '192.168.1.20', 'netmask', '255.255.255.0'], wait=True)
os.system('> /tmp/dhcpd.leases')
dhcp = ctx.start_process(['dhcpd', '-f', '-cf', '/tmp/dhcpd.conf', '-lf', '/tmp/dhcpd.leases', peer_ifname])
os.system('> /tmp/dhcp.leases')
dhcp = ctx.start_process(['dhcpd', '-f', '-cf', '/tmp/dhcpd.conf', '-lf', '/tmp/dhcp.leases', peer_ifname])
self.dhcp = dhcp
wd.wait_for_object_condition(wpas, 'len(obj.p2p_clients) == 1', max_wait=3)
client = wpas.p2p_clients[request['peer_iface']]
self.assertEqual(client['p2p_dev_addr'], wpas_peer['p2p_dev_addr'])
else:
dhcp = ctx.start_process(['dhclient', '-v', '-d', '--no-pid', '-cf', '/dev/null', '-lf', '/tmp/dhcpd.leases',
dhcp = ctx.start_process(['dhclient', '-v', '-d', '--no-pid', '-cf', '/dev/null', '-lf', '/tmp/dhcp.leases',
'-sf', '/tmp/dhclient-script', peer_ifname])
self.dhcp = dhcp
wd.wait_for_object_condition(peer, 'obj.connected', max_wait=15)
time.sleep(1) # Give the client time to set the IP
@ -127,9 +129,22 @@ class Test(unittest.TestCase):
wd.wait_for_object_condition(wpas, 'obj.p2p_group is None', max_wait=3)
self.assertEqual(peer.connected, False)
p2p.enabled = False
ctx.stop_process(dhcp)
wpas.clean_up()
def setUp(self):
self.p2p = None
self.wpas = None
self.dhcp = None
def tearDown(self):
if self.p2p is not None:
self.p2p.enabled = False
if self.wpas is not None:
self.wpas.clean_up()
self.wpas = None
if self.dhcp is not None:
ctx.stop_process(self.dhcp)
for path in ['/tmp/dhcp.leases']:
if os.path.exists(path):
os.remove(path)
@classmethod
def tearDownClass(cls):

View File

@ -245,6 +245,9 @@ class Wpas:
if self.io_watch is not None:
GLib.source_remove(self.io_watch)
self.io_watch = None
for ifname in self.sockets:
self.sockets[ifname].close()
self.sockets = {}
if self.wpa_supplicant is not None:
ctx.stop_process(self.wpa_supplicant)
self.wpa_supplicant = None
@ -253,11 +256,5 @@ class Wpas:
os.remove(path)
self.cleanup_paths = []
def _stop_wpas(self):
self.clean_up()
for ifname in self.sockets:
self.sockets[ifname].close()
self.sockets = {}
def __del__(self):
self._stop_wpas()
self.clean_up()