3
0
mirror of https://git.kernel.org/pub/scm/network/wireless/iwd.git synced 2025-01-12 19:32:36 +01:00
iwd/test/connect-p2p
2020-07-13 14:15:24 -05:00

156 lines
4.3 KiB
Python
Executable File

#!/usr/bin/python3
import sys
import dbus
import dbus.mainloop.glib
from gi.repository import GLib
import fnmatch
# The script takes at most one optional argument: a pattern that peer names
# are matched against with fnmatch.
if len(sys.argv) not in (1, 2):
    print("Usage: %s [<name_mask>]" % (sys.argv[0],))
    sys.exit(1)

# None means no pattern was given on the command line.
name_mask = sys.argv[1] if len(sys.argv) >= 2 else None
# D-Bus interface names used by this script.
DEVICE_IF = 'net.connman.iwd.p2p.Device'
PEER_IF = 'net.connman.iwd.p2p.Peer'
WSC_IF = 'net.connman.iwd.SimpleConfiguration'

# Make the GLib main loop the default for dbus-python, then open the bus.
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()

# Snapshot of the objects currently exported by net.connman.iwd.
manager = dbus.Interface(bus.get_object('net.connman.iwd', '/'),
                         'org.freedesktop.DBus.ObjectManager')
objects = manager.GetManagedObjects()

# Module-level state read and updated by the functions in this script.
p2p_path = None
peers = {}
connecting = False
mainloop = None

# Keep the first object that exposes the P2P device interface.
for path in objects:
    if p2p_path is None and DEVICE_IF in objects[path]:
        p2p_path = path

if p2p_path is None:
    print('No P2P interfaces')
    sys.exit(-1)

# Properties of the chosen device as reported by the snapshot above.
values = objects[p2p_path][DEVICE_IF]

# Two views of the same device object: generic property access and the
# P2P device interface itself.
properties = dbus.Interface(bus.get_object('net.connman.iwd', p2p_path),
                            'org.freedesktop.DBus.Properties')
p2p = dbus.Interface(bus.get_object('net.connman.iwd', p2p_path), DEVICE_IF)
def list_peers():
    """Print the name and object path of every peer held in `peers`."""
    print('We have the following P2P peers')
    for peer_path, props in peers.items():
        print(' ' + props['Name'] + ' at ' + peer_path)
def connect(path):
    """Attempt a push-button (WSC) connection to the peer at `path`.

    Lists the cached peers, marks a connection as in progress and calls
    PushButton on the peer's SimpleConfiguration interface with a 120 second
    D-Bus reply timeout.  On failure the error is printed and `connecting`
    is cleared.  Afterwards the main loop is stopped if one exists,
    otherwise the process exits with status 0.

    `path` must be a key of the module-level `peers` dict.
    """
    global connecting

    list_peers()

    # Only the SimpleConfiguration interface is called here, so no proxy for
    # the Peer interface is created.
    peer_wsc = dbus.Interface(bus.get_object('net.connman.iwd', path), WSC_IF)

    print('Connecting to ' + peers[path]['Name'])
    connecting = True

    try:
        peer_wsc.PushButton(timeout=120)
        print('New connection')
    except Exception as e:
        connecting = False
        print('Connection error: ' + repr(e))

    # NOTE(review): this step is assumed to run after both the success and
    # the failure path -- confirm against the upstream script.
    if mainloop:
        mainloop.quit()
    else:
        sys.exit(0)
def properties_changed(interface, changed, invalidated, path):
    """Apply a PropertiesChanged signal to the cached peer properties.

    interface   -- name of the interface whose properties changed
    changed     -- mapping of property names to their new values
    invalidated -- iterable of property names that are no longer valid
    path        -- object path of the emitting object

    Signals for other interfaces, or for paths that are not in `peers`,
    are ignored.
    """
    path = str(path)
    if interface != PEER_IF or path not in peers:
        return

    cached = peers[path]
    cached.update(changed)
    for key in invalidated:
        # An invalidated property may never have been cached, so a missing
        # key must not raise inside the signal handler.
        cached.pop(key, None)
def interfaces_added(path, interfaces):
    """Record a newly announced peer and connect to it if it matches.

    path       -- object path of the object that gained interfaces
    interfaces -- mapping of interface names to their property dicts

    Objects without the peer interface are ignored.  A connection attempt is
    only started when none is in progress, a name mask was given, and the
    peer's name matches that mask.
    """
    if PEER_IF not in interfaces:
        return

    path = str(path)
    props = interfaces[PEER_IF]
    peers[path] = props

    peer_name = props['Name']
    print('Added peer ' + str(peer_name))

    if connecting or name_mask is None:
        return
    if fnmatch.fnmatch(peer_name, name_mask):
        connect(path)
def interfaces_removed(path, interfaces):
    """Drop a peer from the cache when its peer interface is removed.

    path       -- object path of the object that lost interfaces
    interfaces -- collection of the interface names that were removed

    Removals that do not include the peer interface, or that concern a peer
    that is not in `peers`, are ignored.
    """
    if PEER_IF not in interfaces:
        return

    # Keys in `peers` are plain strings (the other handlers apply str() to
    # the path), so look the peer up the same way.  pop() tolerates a peer
    # that was never cached instead of raising KeyError in the handler.
    removed = peers.pop(str(path), None)
    if removed is None:
        return

    print('Removed peer ' + str(removed['Name']))
# Main script flow: enable the P2P device if needed, free up a connection
# slot if none is available, subscribe to object/property signals, feed the
# already-known peers through interfaces_added(), start discovery and run
# the main loop.
if not values['Enabled']:
    properties.Set(DEVICE_IF, 'Enabled', True)

if not values['AvailableConnections']:
    # See if we have peers we can disconnect or where we can cancel an ongoing
    # connection that may be in the 120s GO Negotiation timeout for example.
    # Should we have a Connecting property on the WSC interface so we can know
    # whether a connection is in progress?
    for path in objects:
        if PEER_IF not in objects[path]:
            continue

        peer_values = objects[path][PEER_IF]
        print('Trying to disconnect from peer ' + str(peer_values['Name']))
        peer = dbus.Interface(bus.get_object('net.connman.iwd', path), PEER_IF)
        try:
            peer.Disconnect()
        except dbus.DBusException:
            # Best effort: Disconnect is tried on every known peer, so a
            # D-Bus error reply is ignored here.  Only D-Bus errors are
            # ignored; KeyboardInterrupt and SystemExit still propagate.
            pass

# Install the signal receivers before discovery is requested so that peers
# announced right after discovery starts are not missed.  The handlers only
# run once the main loop below is dispatching.
bus.add_signal_receiver(properties_changed,
                        bus_name="net.connman.iwd",
                        dbus_interface="org.freedesktop.DBus.Properties",
                        signal_name="PropertiesChanged",
                        path_keyword="path")
bus.add_signal_receiver(interfaces_added,
                        bus_name="net.connman.iwd",
                        dbus_interface="org.freedesktop.DBus.ObjectManager",
                        signal_name="InterfacesAdded")
bus.add_signal_receiver(interfaces_removed,
                        bus_name="net.connman.iwd",
                        dbus_interface="org.freedesktop.DBus.ObjectManager",
                        signal_name="InterfacesRemoved")

# Treat the peers from the initial snapshot as if they had just been added;
# this may already start a connection attempt.
for path in objects:
    if PEER_IF in objects[path]:
        interfaces_added(path, objects[path])

p2p.RequestDiscovery()

mainloop = GLib.MainLoop()
mainloop.run()