auto-t: add APIs for PKEX

Also added some sanity checks to the existing DPP APIs to make
sure started/role gets set correctly.
This commit is contained in:
James Prestwood 2023-11-08 09:21:54 -08:00 committed by Denis Kenzior
parent 809ddcebbd
commit 6f7384d5c7
1 changed files with 150 additions and 3 deletions

View File

@ -42,6 +42,8 @@ IWD_P2P_SERVICE_MANAGER_INTERFACE = 'net.connman.iwd.p2p.ServiceManager'
IWD_P2P_WFD_INTERFACE = 'net.connman.iwd.p2p.Display'
IWD_STATION_DEBUG_INTERFACE = 'net.connman.iwd.StationDebug'
IWD_DPP_INTERFACE = 'net.connman.iwd.DeviceProvisioning'
IWD_DPP_PKEX_INTERFACE = 'net.connman.iwd.SharedCodeDeviceProvisioning'
IWD_SHARED_CODE_AGENT_INTERFACE = 'net.connman.iwd.SharedCodeAgent'
IWD_AGENT_MANAGER_PATH = '/net/connman/iwd'
IWD_TOP_LEVEL_PATH = '/'
@ -212,6 +214,33 @@ class SignalAgent(dbus.service.Object):
def handle_new_level(self, path, level):
pass
class SharedCodeAgent(dbus.service.Object):
def __init__(self, codes = {}):
self._path = '/test/agent/' + str(int(round(time.time() * 1000)))
self._codes = codes
dbus.service.Object.__init__(self, ctx.get_bus(), self._path)
@property
def path(self):
return self._path
@dbus.service.method(IWD_SHARED_CODE_AGENT_INTERFACE,
in_signature='', out_signature='')
def Release(self):
print("SharedCodeAgent released")
@dbus.service.method(IWD_SHARED_CODE_AGENT_INTERFACE,
in_signature='s', out_signature='s')
def RequestSharedCode(self, identifier):
print("SharedCodeAgent request for %s" % identifier)
code = self._codes.get(identifier, None)
if not code:
return NotFoundEx("No code found for %s" % identifier)
return code
class AdHocDevice(IWDDBusAbstract):
'''
Class represents an AdHoc device object: net.connman.iwd.AdHoc
@ -299,6 +328,49 @@ class DeviceProvisioning(IWDDBusAbstract):
def role(self):
return self._properties['Role']
class SharedCodeDeviceProvisioning(IWDDBusAbstract):
_iface_name = IWD_DPP_PKEX_INTERFACE
def start_enrollee(self, code, identifier=None):
args = {
"Code": code
}
if identifier:
args["Identifier"] = identifier
self._iface.StartEnrollee(args)
def start_configurator(self, path):
self._iface.StartConfigurator(dbus.ObjectPath(path))
def configure_enrollee(self, code, identifier=None):
args = {
"Code": code
}
if identifier:
args["Identifier"] = identifier
self._iface.ConfigureEnrollee(args)
def stop(self):
self._iface.Stop()
def register_agent(self, path):
self._iface.RegisterSharedCodeAgent(path)
def unregister_agent(self):
self._iface.UnregisterSharedCodeAgent()
@property
def started(self):
return self._properties['Started']
@property
def role(self):
return self._properties['Role']
class AccessPointDevice(IWDDBusAbstract):
'''
Class represents net.connman.iwd.AccessPoint
@ -375,6 +447,7 @@ class Device(IWDDBusAbstract):
self._station_props = None
self._station_debug_obj = None
self._dpp_obj = None
self._sc_dpp_obj = None
self._ap_obj = None
IWDDBusAbstract.__init__(self, *args, **kwargs)
@ -407,6 +480,17 @@ class Device(IWDDBusAbstract):
return self._dpp_obj
@property
def _sc_device_provisioning(self):
if self._properties['Mode'] != 'station':
self._prop_proxy.Set(IWD_DEVICE_INTERFACE, 'Mode', 'station')
if self._sc_dpp_obj is None:
self._sc_dpp_obj = SharedCodeDeviceProvisioning(
object_path=self._object_path,
namespace=self._namespace)
return self._sc_dpp_obj
@property
def _station_debug(self):
if self._properties['Mode'] != 'station':
@ -774,13 +858,76 @@ class Device(IWDDBusAbstract):
self._station_debug.wait_for_event(event, timeout)
def dpp_start_enrollee(self):
return self._device_provisioning.start_enrollee()
ret = self._device_provisioning.start_enrollee()
condition = 'obj.started == True'
IWD._wait_for_object_condition(self._device_provisioning, condition)
condition = 'obj.role == "enrollee"'
IWD._wait_for_object_condition(self._device_provisioning, condition)
return ret
def dpp_start_configurator(self, uri=None):
return self._device_provisioning.start_configurator(uri)
ret = self._device_provisioning.start_configurator(uri)
condition = 'obj.started == True'
IWD._wait_for_object_condition(self._device_provisioning, condition)
condition = 'obj.role == "configurator"'
IWD._wait_for_object_condition(self._device_provisioning, condition)
return ret
def dpp_pkex_enroll(self, *args, **kwargs):
ret = self._sc_device_provisioning.start_enrollee(*args, **kwargs)
condition = 'obj.started == True'
IWD._wait_for_object_condition(self._sc_device_provisioning, condition)
condition = 'obj.role == "enrollee"'
IWD._wait_for_object_condition(self._sc_device_provisioning, condition)
return ret
def dpp_pkex_start_configurator(self, *args, **kwargs):
ret = self._sc_device_provisioning.start_configurator(*args, **kwargs)
condition = 'obj.started == True'
IWD._wait_for_object_condition(self._sc_device_provisioning, condition)
condition = 'obj.role == "configurator"'
IWD._wait_for_object_condition(self._sc_device_provisioning, condition)
return ret
def dpp_pkex_configure_enrollee(self, *args, **kwargs):
ret = self._sc_device_provisioning.configure_enrollee(*args, **kwargs)
condition = 'obj.started == True'
IWD._wait_for_object_condition(self._sc_device_provisioning, condition)
condition = 'obj.role == "configurator"'
IWD._wait_for_object_condition(self._sc_device_provisioning, condition)
return ret
def dpp_pkex_stop(self):
ret = self._sc_device_provisioning.stop()
condition = 'obj.started == False'
IWD._wait_for_object_condition(self._sc_device_provisioning, condition)
return ret
def dpp_pkex_register_agent(self, path):
self._sc_device_provisioning.register_agent(path)
def dpp_pkex_unregister_agent(self):
self._sc_device_provisioning.unregister_agent()
def dpp_stop(self):
return self._device_provisioning.stop()
ret = self._device_provisioning.stop()
condition = 'obj.started == False'
IWD._wait_for_object_condition(self._device_provisioning, condition)
return ret
def __str__(self, prefix = ''):
s = prefix + 'Device: ' + self.device_path + '\n'\