mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2024-11-22 14:49:24 +01:00
auto-t: hostapd.py: update to work with test-runner rewrite
Before hostapd was initialized using the wiphy_map which has now gone away. Instead we have a global config module which contains a single 'ctx'. This is the centeral store for all test information. This patch converts hostapd.py to lookup instances by already initialized Hostapd object. The interface parameter was removed since all tests have been converted to use config= instead. In addition HostapdCLI was changed to allow no parameters if there is only a single hostapd instance.
This commit is contained in:
parent
0772d4b61a
commit
edac41b1b3
@ -1,11 +1,11 @@
|
|||||||
#!/usr/bin/python3
|
#!/usr/bin/python3
|
||||||
import os, os.path
|
import os, os.path
|
||||||
from wiphy import wiphy_map
|
|
||||||
import re
|
import re
|
||||||
import socket
|
import socket
|
||||||
import select
|
import select
|
||||||
import time
|
import time
|
||||||
from gi.repository import GLib
|
from gi.repository import GLib
|
||||||
|
from config import ctx
|
||||||
|
|
||||||
chan_freq_map = [
|
chan_freq_map = [
|
||||||
None,
|
None,
|
||||||
@ -28,31 +28,26 @@ chan_freq_map = [
|
|||||||
ctrl_count = 0
|
ctrl_count = 0
|
||||||
mainloop = GLib.MainLoop()
|
mainloop = GLib.MainLoop()
|
||||||
|
|
||||||
hostapd_map = {ifname: intf for wname, wiphy in wiphy_map.items()
|
|
||||||
for ifname, intf in wiphy.interface_map.items()
|
|
||||||
if wiphy.use == 'hostapd'}
|
|
||||||
|
|
||||||
class HostapdCLI:
|
class HostapdCLI:
|
||||||
def _init_hostapd(self, interface=None, config=None):
|
def _init_hostapd(self, config=None):
|
||||||
global ctrl_count
|
global ctrl_count
|
||||||
|
interface = None
|
||||||
|
|
||||||
if not interface and not config:
|
if not config and len(ctx.hostapd.instances) > 1:
|
||||||
raise Exception('interface or config must be provided')
|
raise Exception('config must be provided if more than one hostapd instance exists')
|
||||||
|
|
||||||
if not interface:
|
hapd = ctx.hostapd[config]
|
||||||
for intf in hostapd_map.values():
|
|
||||||
if intf.config == config:
|
|
||||||
interface = intf
|
|
||||||
break
|
|
||||||
|
|
||||||
if not interface:
|
self.interface = hapd.intf
|
||||||
|
self.config = hapd.config
|
||||||
|
|
||||||
|
if not self.interface:
|
||||||
raise Exception('config %s not found' % config)
|
raise Exception('config %s not found' % config)
|
||||||
|
|
||||||
self.ifname = interface.name
|
self.ifname = self.interface.name
|
||||||
self.socket_path = os.path.dirname(interface.ctrl_interface)
|
self.socket_path = os.path.dirname(self.interface.ctrl_interface)
|
||||||
|
|
||||||
self.cmdline = 'hostapd_cli -p"' + self.socket_path + '" -i"' + \
|
self.cmdline = ['hostapd_cli', '-p', self.socket_path, '-i', self.ifname]
|
||||||
self.ifname + '"'
|
|
||||||
|
|
||||||
if not hasattr(self, '_hostapd_restarted'):
|
if not hasattr(self, '_hostapd_restarted'):
|
||||||
self._hostapd_restarted = False
|
self._hostapd_restarted = False
|
||||||
@ -61,6 +56,7 @@ class HostapdCLI:
|
|||||||
str(ctrl_count)
|
str(ctrl_count)
|
||||||
self.ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
self.ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||||
self.ctrl_sock.bind(self.local_ctrl)
|
self.ctrl_sock.bind(self.local_ctrl)
|
||||||
|
|
||||||
self.ctrl_sock.connect(self.socket_path + '/' + self.ifname)
|
self.ctrl_sock.connect(self.socket_path + '/' + self.ifname)
|
||||||
|
|
||||||
if 'OK' not in self._ctrl_request('ATTACH'):
|
if 'OK' not in self._ctrl_request('ATTACH'):
|
||||||
@ -68,8 +64,8 @@ class HostapdCLI:
|
|||||||
|
|
||||||
ctrl_count = ctrl_count + 1
|
ctrl_count = ctrl_count + 1
|
||||||
|
|
||||||
def __init__(self, interface=None, config=None):
|
def __init__(self, config=None):
|
||||||
self._init_hostapd(interface, config)
|
self._init_hostapd(config)
|
||||||
|
|
||||||
def wait_for_event(self, event, timeout=10):
|
def wait_for_event(self, event, timeout=10):
|
||||||
global mainloop
|
global mainloop
|
||||||
@ -115,53 +111,49 @@ class HostapdCLI:
|
|||||||
|
|
||||||
def _del_hostapd(self, force=False):
|
def _del_hostapd(self, force=False):
|
||||||
self.ctrl_sock.close()
|
self.ctrl_sock.close()
|
||||||
|
os.remove(self.local_ctrl)
|
||||||
|
|
||||||
if self._hostapd_restarted:
|
if self._hostapd_restarted:
|
||||||
if force:
|
ctx.stop_process(ctx.hostapd.process, force)
|
||||||
os.system('killall -9 hostapd')
|
|
||||||
else:
|
|
||||||
os.system('killall hostapd')
|
|
||||||
|
|
||||||
os.system('ifconfig %s down' % self.ifname)
|
self.interface.set_interface_state('down')
|
||||||
os.system('ifconfig %s up' % self.ifname)
|
self.interface.set_interface_state('up')
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
self._del_hostapd()
|
self._del_hostapd()
|
||||||
|
|
||||||
def wps_push_button(self):
|
def wps_push_button(self):
|
||||||
os.system(self.cmdline + ' wps_pbc')
|
ctx.start_process(self.cmdline + ['wps_pbc'], wait=True)
|
||||||
|
|
||||||
def wps_pin(self, pin):
|
def wps_pin(self, pin):
|
||||||
os.system(self.cmdline + ' wps_pin any ' + pin)
|
cmd = self.cmdline + ['wps_pin', 'any', pin]
|
||||||
|
ctx.start_process(cmd, wait=True)
|
||||||
|
|
||||||
def deauthenticate(self, client_address):
|
def deauthenticate(self, client_address):
|
||||||
os.system(self.cmdline + ' deauthenticate ' + client_address)
|
cmd = self.cmdline + ['deauthenticate', client_address]
|
||||||
|
ctx.start_process(cmd, wait=True)
|
||||||
|
|
||||||
def eapol_reauth(self, client_address):
|
def eapol_reauth(self, client_address):
|
||||||
cmd = 'IFNAME=' + self.ifname + ' EAPOL_REAUTH ' + client_address
|
cmd = 'IFNAME=' + self.ifname + ' EAPOL_REAUTH ' + client_address
|
||||||
s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
self.ctrl_sock.sendall(cmd.encode('utf-8'))
|
||||||
s.connect(self.socket_path + '/' + self.ifname)
|
|
||||||
s.sendall(cmd.encode('utf-8'))
|
|
||||||
s.close()
|
|
||||||
|
|
||||||
def reload(self):
|
def reload(self):
|
||||||
# Seemingly all three commands needed for the instance to notice
|
# Seemingly all three commands needed for the instance to notice
|
||||||
# interface's address change
|
# interface's address change
|
||||||
cmds = 'reload\ndisable\nenable\n'
|
ctx.start_process(self.cmdline + ['reload'], wait=True)
|
||||||
proc = os.popen(self.cmdline, mode='w')
|
ctx.start_process(self.cmdline + ['disable'], wait=True)
|
||||||
lines = proc.write(cmds)
|
ctx.start_process(self.cmdline + ['enable'], wait=True)
|
||||||
proc.close()
|
|
||||||
|
|
||||||
def list_sta(self):
|
def list_sta(self):
|
||||||
proc = os.popen(self.cmdline + ' list_sta')
|
proc = ctx.start_process(self.cmdline + ['list_sta'])
|
||||||
lines = proc.read()
|
proc.pid.wait()
|
||||||
proc.close()
|
lines = proc.pid.stdout.read().decode('utf-8')
|
||||||
|
|
||||||
return [line for line in lines.split('\n') if line]
|
return [line for line in lines.split('\n') if line]
|
||||||
|
|
||||||
def set_neighbor(self, addr, ssid, nr):
|
def set_neighbor(self, addr, ssid, nr):
|
||||||
os.system(self.cmdline + ' set_neighbor ' + addr + ' ssid=\\""' + ssid +
|
cmd = self.cmdline + ['set_neighbor', addr, 'ssid=\\""%s"\\"' % ssid, 'nr=%s' % nr]
|
||||||
'"\\" nr=' + nr)
|
ctx.start_process(cmd, wait=True)
|
||||||
|
|
||||||
def send_bss_transition(self, device, nr_list):
|
def send_bss_transition(self, device, nr_list):
|
||||||
# Send a BSS transition to a station (device). nr_list should be an
|
# Send a BSS transition to a station (device). nr_list should be an
|
||||||
@ -170,7 +162,7 @@ class HostapdCLI:
|
|||||||
# consistent with the set_neighbor() API, i.e. the same neighbor report
|
# consistent with the set_neighbor() API, i.e. the same neighbor report
|
||||||
# string could be used in both API's.
|
# string could be used in both API's.
|
||||||
pref = 1
|
pref = 1
|
||||||
cmd = self.cmdline + ' bss_tm_req ' + device
|
cmd = self.cmdline + ['bss_tm_req', device]
|
||||||
for i in nr_list:
|
for i in nr_list:
|
||||||
addr = i[0]
|
addr = i[0]
|
||||||
nr = i[1]
|
nr = i[1]
|
||||||
@ -180,19 +172,15 @@ class HostapdCLI:
|
|||||||
chan_num=nr[10:12]
|
chan_num=nr[10:12]
|
||||||
phy_num=nr[14:16]
|
phy_num=nr[14:16]
|
||||||
|
|
||||||
cmd += ' pref=%s neighbor=%s,%s,%s,%s,%s' % \
|
cmd += ['pref=%s' % str(pref), 'neighbor=%s,%s,%s,%s,%s' % \
|
||||||
(str(pref), addr, bss_info, op_class, chan_num, phy_num)
|
(addr, bss_info, op_class, chan_num, phy_num)]
|
||||||
pref += 1
|
pref += 1
|
||||||
|
|
||||||
os.system(cmd)
|
ctx.start_process(cmd, wait=True)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def kill_all():
|
|
||||||
os.system('killall hostapd')
|
|
||||||
|
|
||||||
def get_config_value(self, key):
|
def get_config_value(self, key):
|
||||||
# first find the right config file
|
# first find the right config file
|
||||||
with open(hostapd_map[self.ifname].config, 'r') as f:
|
with open(self.config, 'r') as f:
|
||||||
# read in config file and search for key
|
# read in config file and search for key
|
||||||
cfg = f.read()
|
cfg = f.read()
|
||||||
match = re.search(r'%s=.*' % key, cfg)
|
match = re.search(r'%s=.*' % key, cfg)
|
||||||
@ -200,6 +188,7 @@ class HostapdCLI:
|
|||||||
return match.group(0).split('=')[1]
|
return match.group(0).split('=')[1]
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_freq(self):
|
def get_freq(self):
|
||||||
return chan_freq_map[int(self.get_config_value('channel'))]
|
return chan_freq_map[int(self.get_config_value('channel'))]
|
||||||
|
|
||||||
@ -209,21 +198,20 @@ class HostapdCLI:
|
|||||||
'''
|
'''
|
||||||
# set flag so hostapd can be killed after the test
|
# set flag so hostapd can be killed after the test
|
||||||
self._hostapd_restarted = True
|
self._hostapd_restarted = True
|
||||||
intf = hostapd_map[self.ifname]
|
|
||||||
|
|
||||||
self._del_hostapd(force=True)
|
self._del_hostapd(force=True)
|
||||||
|
|
||||||
os.system('hostapd -g %s -i %s %s &' %
|
ctx.start_hostapd()
|
||||||
(intf.ctrl_interface, intf.name, intf.config))
|
|
||||||
|
|
||||||
# Give hostapd a second to start and initialize the control interface
|
# Give hostapd a second to start and initialize the control interface
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# New hostapd process, so re-init
|
# New hostapd process, so re-init
|
||||||
self._init_hostapd(intf)
|
self._init_hostapd(config=self.config)
|
||||||
|
|
||||||
def req_beacon(self, addr, request):
|
def req_beacon(self, addr, request):
|
||||||
'''
|
'''
|
||||||
Send a RRM Beacon request
|
Send a RRM Beacon request
|
||||||
'''
|
'''
|
||||||
os.system(self.cmdline + ' req_beacon ' + addr + ' ' + request)
|
cmd = self.cmdline + ['req_beacon', addr, request]
|
||||||
|
ctx.start_process(cmd, wait=True)
|
||||||
|
Loading…
Reference in New Issue
Block a user