auto-t: Enable test frwk to wait for the radios

Previously, we had to wait for an arbitrary amount of time after
iwd was started form the python scripts to make sure that the
radio objects are available on the D-bus. This patch allows to
wait inside of list_devices() method and get back a list of the
devices once they are available.
This commit is contained in:
Tim Kourt 2018-05-15 11:43:12 -07:00 committed by Denis Kenzior
parent 79ce3f645f
commit 102a455e00
1 changed files with 51 additions and 12 deletions

View File

@ -8,6 +8,7 @@ import sys
import os
import threading
import time
import collections
from abc import ABCMeta, abstractmethod
from enum import Enum
@ -602,6 +603,39 @@ class PSKAgent(dbus.service.Object):
return passwd
class DeviceList(collections.Mapping):
def __init__(self, iwd, objects):
self._dict = {}
iwd._object_manager.connect_to_signal("InterfacesAdded",
self._interfaces_added_handler, IWD_DEVICE_INTERFACE)
iwd._object_manager.connect_to_signal("InterfacesRemoved",
self._interfaces_removed_handler, IWD_DEVICE_INTERFACE)
for path in objects:
for interface in objects[path]:
if interface == IWD_DEVICE_INTERFACE:
self._dict[path] = Device(path, objects[path][interface])
def __getitem__(self, key):
return self._dict.__getitem__(key)
def __iter__(self):
return self._dict.__iter__()
def __len__(self):
return self._dict.__len__()
def __delitem__(self, key):
self._dict.pop(key).remove()
def _interfaces_added_handler(self, path, interfaces):
self._dict[path] = Device(interfaces[IWD_DEVICE_INTERFACE])
def _interfaces_removed_handler(self, path, interfaces):
del _dict[path]
class IWD(AsyncOpAbstract):
''''''
_bus = dbus.SystemBus()
@ -610,6 +644,7 @@ class IWD(AsyncOpAbstract):
_agent_manager_if = None
_known_network_manager_if = None
_iwd_proc = None
_devices = None
def __init__(self, start_iwd_daemon = False,
iwd_config_dir = IWD_CONFIG_DIR):
@ -648,6 +683,9 @@ class IWD(AsyncOpAbstract):
tries += 1
time.sleep(0.05)
self._devices = DeviceList(self,
self._object_manager.GetManagedObjects())
def __del__(self):
if self._iwd_proc is None:
return
@ -727,18 +765,19 @@ class IWD(AsyncOpAbstract):
assert not os.path.isabs(source)
shutil.copy(source, IWD_STORAGE_DIR)
def list_devices(self):
objects = self._object_manager.GetManagedObjects()
devices = None
for path in objects.keys():
interfaces = objects[path]
for interface in interfaces.keys():
if interface in [IWD_DEVICE_INTERFACE]:
if devices is None:
devices = []
device = Device(path)
devices.append(device)
return devices
def list_devices(self, wait_to_appear = False):
if not wait_to_appear:
return list(self._devices.values())
tries = 0
while len(self._devices) == 0:
if tries > 100:
raise TimeoutError('IWD has no associated devices')
tries += 1
time.sleep(0.2)
return list(self._devices.values())
def list_known_networks(self):
'''Returns a list of KnownNetwork objects.'''