3
0
mirror of https://git.kernel.org/pub/scm/network/wireless/iwd.git synced 2025-01-04 11:42:33 +01:00
iwd/autotests/util/hostapd.py
James Prestwood 35f06ef87a auto-t: hostapd: allow CLI to lookup by config file
There is a common interface lookup in many tests in order to initialize
the HostapdCLI object e.g.:

for intf in hostapd_map.values():
    if intf.config == 'ssidOWE.conf':
        hapd = HostapdCLI(intf)
        break

Instead of having to do this in every test, HostapdCLI will now
optionally take a config file (config=<file>). The interface object
will still be prefered (i.e. supplying an interface will not even
check the config file) as to not break existing tests. But if only
a config file is supplied the lookup is done internally.

There are some tests that do still need the interface, as they do
an interface lookup to initialize both hostapd and hwsim at the
same time.
2019-06-05 16:18:26 -05:00

140 lines
4.1 KiB
Python

#!/usr/bin/python3
import os, os.path
from wiphy import wiphy_map
import re
import socket
chan_freq_map = [
None,
2412,
2417,
2422,
2427,
2432,
2437,
2442,
2447,
2452,
2457,
2462,
2467,
2472,
2484
]
hostapd_map = {ifname: intf for wname, wiphy in wiphy_map.items()
for ifname, intf in wiphy.interface_map.items()
if wiphy.use == 'hostapd'}
class HostapdCLI:
def __init__(self, interface=None, config=None):
if not interface and not config:
raise Exception('interface or config must be provided')
if not interface:
for intf in hostapd_map.values():
if intf.config == config:
interface = intf
break
self.ifname = interface.name
self.socket_path = os.path.dirname(interface.ctrl_interface)
self.cmdline = 'hostapd_cli -p"' + self.socket_path + '" -i"' + \
self.ifname + '"'
self._hostapd_restarted = False
def __del__(self):
if self._hostapd_restarted:
os.system('killall hostapd')
def wps_push_button(self):
os.system(self.cmdline + ' wps_pbc')
def wps_pin(self, pin):
os.system(self.cmdline + ' wps_pin any ' + pin)
def deauthenticate(self, client_address):
os.system(self.cmdline + ' deauthenticate ' + client_address)
def eapol_reauth(self, client_address):
cmd = 'IFNAME=' + self.ifname + ' EAPOL_REAUTH ' + client_address
s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
s.connect(self.socket_path + '/' + self.ifname)
s.sendall(cmd.encode('utf-8'))
s.close()
def reload(self):
# Seemingly all three commands needed for the instance to notice
# interface's address change
cmds = 'reload\ndisable\nenable\n'
proc = os.popen(self.cmdline, mode='w')
lines = proc.write(cmds)
proc.close()
def list_sta(self):
proc = os.popen(self.cmdline + ' list_sta')
lines = proc.read()
proc.close()
return [line for line in lines.split('\n') if line]
def set_neighbor(self, addr, ssid, nr):
os.system(self.cmdline + ' set_neighbor ' + addr + ' ssid=\\""' + ssid +
'"\\" nr=' + nr)
def send_bss_transition(self, device, nr_list):
# Send a BSS transition to a station (device). nr_list should be an
# array of tuples containing the BSS address and neighbor report.
# Parsing the neighbor report is a bit ugly but it makes it more
# consistent with the set_neighbor() API, i.e. the same neighbor report
# string could be used in both API's.
pref = 1
cmd = self.cmdline + ' bss_tm_req ' + device
for i in nr_list:
addr = i[0]
nr = i[1]
bss_info=str(int(nr[0:8], 16))
op_class=str(int(nr[8:10], 16))
chan_num=nr[10:12]
phy_num=nr[14:16]
cmd += ' pref=%s neighbor=%s,%s,%s,%s,%s' % \
(str(pref), addr, bss_info, op_class, chan_num, phy_num)
pref += 1
os.system(cmd)
@staticmethod
def kill_all():
os.system('killall hostapd')
def get_config_value(self, key):
# first find the right config file
with open(hostapd_map[self.ifname].config, 'r') as f:
# read in config file and search for key
cfg = f.read()
match = re.search(r'%s=.*' % key, cfg)
if match:
return match.group(0).split('=')[1]
return None
def get_freq(self):
return chan_freq_map[int(self.get_config_value('channel'))]
def ungraceful_restart(self):
'''
Ungracefully kill and restart hostapd
'''
intf = hostapd_map[self.ifname]
os.system('killall -9 hostapd')
os.system('ifconfig %s down' % intf.name)
os.system('ifconfig %s up' % intf.name)
os.system('hostapd -g %s -i %s %s &' %
(intf.ctrl_interface, intf.name, intf.config))
# set flag so hostapd can be killed after the test
self._hostapd_restarted = True