mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2024-11-22 14:49:24 +01:00
test: Add a connect-p2p test script
This commit is contained in:
parent
7a38085bf8
commit
270bbbf25a
155
test/connect-p2p
Executable file
155
test/connect-p2p
Executable file
@ -0,0 +1,155 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import sys
|
||||
import dbus
|
||||
import dbus.mainloop.glib
|
||||
from gi.repository import GLib
|
||||
import fnmatch
|
||||
|
||||
if len(sys.argv) not in [1, 2]:
|
||||
print("Usage: %s [<name_mask>]" % (sys.argv[0],))
|
||||
sys.exit(1)
|
||||
|
||||
name_mask = None
|
||||
if len(sys.argv) >= 2:
|
||||
name_mask = sys.argv[1]
|
||||
|
||||
DEVICE_IF = 'net.connman.iwd.p2p.Device'
|
||||
PEER_IF = 'net.connman.iwd.p2p.Peer'
|
||||
WSC_IF = 'net.connman.iwd.SimpleConfiguration'
|
||||
|
||||
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
||||
|
||||
bus = dbus.SystemBus()
|
||||
manager = dbus.Interface(bus.get_object('net.connman.iwd', '/'),
|
||||
'org.freedesktop.DBus.ObjectManager')
|
||||
objects = manager.GetManagedObjects()
|
||||
|
||||
p2p_path = None
|
||||
peers = {}
|
||||
connecting = False
|
||||
mainloop = None
|
||||
|
||||
for path in objects:
|
||||
if DEVICE_IF in objects[path] and p2p_path is None:
|
||||
p2p_path = path
|
||||
|
||||
if p2p_path is None:
|
||||
print('No P2P interfaces')
|
||||
sys.exit(-1)
|
||||
|
||||
values = objects[p2p_path][DEVICE_IF]
|
||||
properties = dbus.Interface(bus.get_object('net.connman.iwd', p2p_path),
|
||||
'org.freedesktop.DBus.Properties')
|
||||
p2p = dbus.Interface(bus.get_object('net.connman.iwd', p2p_path), DEVICE_IF)
|
||||
|
||||
def list_peers():
|
||||
global peers
|
||||
|
||||
print('We have the following P2P peers')
|
||||
for path in peers:
|
||||
result = peers[path]
|
||||
|
||||
print(' ' + result['Name'] + ' at ' + path)
|
||||
|
||||
def connect(path):
|
||||
global connecting, peers, mainloop
|
||||
|
||||
list_peers()
|
||||
|
||||
peer = dbus.Interface(bus.get_object('net.connman.iwd', path), PEER_IF)
|
||||
peer_wsc = dbus.Interface(bus.get_object('net.connman.iwd', path), WSC_IF)
|
||||
peer_props = peers[path]
|
||||
|
||||
print('Connecting to ' + peer_props['Name'])
|
||||
connecting = True
|
||||
try:
|
||||
peer_wsc.PushButton()
|
||||
print('New connection')
|
||||
except Exception as e:
|
||||
connecting = False
|
||||
print('Connection error: ' + repr(e))
|
||||
|
||||
if mainloop:
|
||||
mainloop.quit()
|
||||
else:
|
||||
sys.exit(0)
|
||||
|
||||
def properties_changed(interface, changed, invalidated, path):
|
||||
global peers
|
||||
|
||||
path = str(path)
|
||||
if interface != PEER_IF or path not in peers:
|
||||
return
|
||||
|
||||
peers[path].update(changed)
|
||||
for k in invalidated:
|
||||
del peers[path][k]
|
||||
|
||||
def interfaces_added(path, interfaces):
|
||||
global connecting, peers
|
||||
|
||||
if PEER_IF not in interfaces:
|
||||
return
|
||||
|
||||
path = str(path)
|
||||
peers[path] = interfaces[PEER_IF]
|
||||
print('Added peer ' + str(peers[path]['Name']))
|
||||
|
||||
if connecting or name_mask is None or not fnmatch.fnmatch(peers[path]['Name'], name_mask):
|
||||
return
|
||||
|
||||
connect(path)
|
||||
|
||||
def interfaces_removed(path, interfaces):
|
||||
global peers
|
||||
|
||||
if PEER_IF not in interfaces:
|
||||
return
|
||||
|
||||
print('Removed peer ' + str(peers[path]['Name']))
|
||||
del peers[path]
|
||||
|
||||
if not values['Enabled']:
|
||||
properties.Set(DEVICE_IF, 'Enabled', True)
|
||||
|
||||
if not values['AvailableConnections']:
|
||||
# See if we have peers we can disconnect or where we can cancel an ongoing
|
||||
# connection that may be in the 120s GO Negotiation timeout for example.
|
||||
# Should we have a Connecting property on the WSC interface so we can know
|
||||
# whether a connection is in progress?
|
||||
for path in objects:
|
||||
if PEER_IF not in objects[path]:
|
||||
continue
|
||||
peer_values = objects[path][PEER_IF]
|
||||
print('Trying to disconnect from peer ' + str(peer_values['Name']))
|
||||
peer = dbus.Interface(bus.get_object('net.connman.iwd', path), PEER_IF)
|
||||
try:
|
||||
peer.Disconnect()
|
||||
except:
|
||||
pass
|
||||
|
||||
for path in objects:
|
||||
if PEER_IF in objects[path]:
|
||||
interfaces_added(path, objects[path])
|
||||
|
||||
p2p.RequestDiscovery()
|
||||
|
||||
bus.add_signal_receiver(properties_changed,
|
||||
bus_name="net.connman.iwd",
|
||||
dbus_interface="org.freedesktop.DBus.Properties",
|
||||
signal_name="PropertiesChanged",
|
||||
path_keyword="path")
|
||||
|
||||
bus.add_signal_receiver(interfaces_added,
|
||||
bus_name="net.connman.iwd",
|
||||
dbus_interface="org.freedesktop.DBus.ObjectManager",
|
||||
signal_name="InterfacesAdded")
|
||||
|
||||
bus.add_signal_receiver(interfaces_removed,
|
||||
bus_name="net.connman.iwd",
|
||||
dbus_interface="org.freedesktop.DBus.ObjectManager",
|
||||
signal_name="InterfacesRemoved")
|
||||
|
||||
mainloop = GLib.MainLoop()
|
||||
mainloop.run()
|
Loading…
Reference in New Issue
Block a user