diff --git a/autotests/util/hostapd.py b/autotests/util/hostapd.py index 209d8c24..49047f38 100644 --- a/autotests/util/hostapd.py +++ b/autotests/util/hostapd.py @@ -3,6 +3,9 @@ import os, os.path from wiphy import wiphy_map import re import socket +import select +import time +from gi.repository import GLib chan_freq_map = [ None, @@ -22,12 +25,17 @@ chan_freq_map = [ 2484 ] +ctrl_count = 0 +mainloop = GLib.MainLoop() + hostapd_map = {ifname: intf for wname, wiphy in wiphy_map.items() for ifname, intf in wiphy.interface_map.items() if wiphy.use == 'hostapd'} class HostapdCLI: def __init__(self, interface=None, config=None): + global ctrl_count + if not interface and not config: raise Exception('interface or config must be provided') @@ -48,10 +56,64 @@ class HostapdCLI: self._hostapd_restarted = False + self.local_ctrl = '/tmp/hostapd_' + str(os.getpid()) + '_' + \ + str(ctrl_count) + self.ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self.ctrl_sock.bind(self.local_ctrl) + self.ctrl_sock.connect(self.socket_path + '/' + self.ifname) + + if 'OK' not in self._ctrl_request('ATTACH'): + raise Exception('ATTACH failed') + + ctrl_count = ctrl_count + 1 + + def wait_for_event(self, event, timeout=10): + global mainloop + self._wait_timed_out = False + + def wait_timeout_cb(): + self._wait_timed_out = True + return False + + timeout = GLib.timeout_add_seconds(timeout, wait_timeout_cb) + context = mainloop.get_context() + + while True: + context.iteration(may_block=False) + + while self._data_available(0.25): + data = self.ctrl_sock.recv(4096).decode('utf-8') + if event in data: + return data + + if self._wait_timed_out: + raise TimeoutError('waiting for hostapd event timed out') + + return None + + def _data_available(self, timeout=2): + [r, w, e] = select.select([self.ctrl_sock], [], [], timeout) + if r: + return True + return False + + def _ctrl_request(self, command, timeout=10): + if type(command) is str: + command = str.encode(command) + + self.ctrl_sock.send(bytes(command)) + + if self._data_available(timeout): + return self.ctrl_sock.recv(4096).decode('utf-8') + + raise Exception('timeout waiting for control response') + def __del__(self): if self._hostapd_restarted: os.system('killall hostapd') + self.ctrl_sock.close() + def wps_push_button(self): os.system(self.cmdline + ' wps_pbc') @@ -140,3 +202,9 @@ class HostapdCLI: # set flag so hostapd can be killed after the test self._hostapd_restarted = True + + def req_beacon(self, addr, request): + ''' + Send a RRM Beacon request + ''' + os.system(self.cmdline + ' req_beacon ' + addr + ' ' + request)