3
0
mirror of https://git.kernel.org/pub/scm/network/wireless/iwd.git synced 2024-11-21 22:09:23 +01:00
iwd/tools/run-tests
James Prestwood 5a14daf9b8 test-runner: use may_block=True for context iteration (and move location)
This allows the caller's condition to be checked immediately without
the mainloop running. In addition, may_block=True allows the mainloop
to poll/sleep rather than immediately returning to the caller. This
handles async IO much better than may_block=False, at least for our
use case.
2022-03-31 18:12:40 -05:00

1534 lines
38 KiB
Python
Executable File

#!/usr/bin/python3
import os
import shutil
import fcntl
import sys
import subprocess
import atexit
import time
import unittest
import importlib
from unittest.result import TestResult
import multiprocessing
import re
import traceback
from runner import Runner
from configparser import ConfigParser
from prettytable import PrettyTable
from termcolor import colored
from glob import glob
from collections import namedtuple
from time import sleep
import dbus.mainloop.glib
from gi.repository import GLib
from weakref import WeakValueDictionary
# Global runner configuration. It is None at import time; code below reads
# config.ctx, config.hwsim and config.hostapd, so it is presumably assigned
# later in the file before any test is started (TODO confirm).
config = None
# Counter used by Radio.create_interface() to generate unique 'wln<N>'
# interface names.
intf_id = 0
# print_results() treats a result whose 'time' equals this value as a
# timed out test. Presumably expressed in seconds (TODO confirm).
TEST_MAX_TIMEOUT = 240
# Make the GLib main loop the default main loop for dbus-python connections.
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
def dbg(*s, **kwargs):
    '''
        Print to the interpreter's original stdout (sys.__stdout__) so the
        message is still shown when sys.stdout has been re-directed.

        All positional and keyword arguments are handed to print() as-is.
    '''
    print(*s, file=sys.__stdout__, **kwargs)
def exit_vm():
    '''
        Final shutdown hook: kill any leftover processes, print the
        collected results, optionally write PASS/FAIL to the result file,
        sync the filesystem and stop the runner.

        Safe to call when 'config' (or config.ctx) was never set up; in
        that case the run is treated as failed and nothing is written.

        NOTE(review): 'runner' is a module level object that is not defined
        in the visible part of this file; it is assumed to exist by the
        time this function runs.
    '''
    ctx = config.ctx if config else None

    if config:
        for p in Process.get_all():
            # Processes which were already reaped are still tracked as
            # long as something references them, skip those.
            if p.killed:
                continue

            print("Process %s still running!" % p.args[0])
            p.kill()

    if ctx and ctx.results:
        success = print_results(ctx.results)
    else:
        success = False

    if ctx and ctx.args.result:
        result = 'PASS' if success else 'FAIL'
        with open(ctx.args.result, 'w') as f:
            f.write(result)

    os.sync()

    runner.stop()
def path_exists(path):
    '''
        Searches PATH as well as absolute paths.

        Returns True if 'path' names an executable found by shutil.which()
        or if os.stat() succeeds on it, False otherwise.
    '''
    if shutil.which(path):
        return True

    try:
        os.stat(path)
    except (OSError, ValueError):
        # OSError: path does not exist or is not accessible
        # ValueError: path is not representable (e.g. embedded NUL)
        return False

    return True
def find_binary(list):
    '''
        Returns the first entry of 'list' which path_exists() accepts,
        i.e. one found in PATH or on a valid absolute path. Returns None
        when no entry qualifies.
    '''
    return next((candidate for candidate in list if path_exists(candidate)), None)
# Partial DBus config. The remainder (<listen>) will be filled in for each
# namespace that is created so each individual dbus-daemon has its own socket
# and address.
#
# Note the closing </busconfig> tag is intentionally missing here:
# Namespace.start_dbus() writes this text, then the <listen> element, then
# the closing tag.
dbus_config = '''
<!DOCTYPE busconfig PUBLIC \
"-//freedesktop//DTD D-Bus Bus Configuration 1.0//EN" \
"http://www.freedesktop.org/standards/dbus/1.0/\
busconfig.dtd\">
<busconfig>
<type>system</type>
<limit name=\"reply_timeout\">2147483647</limit>
<auth>ANONYMOUS</auth>
<allow_anonymous/>
<policy context=\"default\">
<allow user=\"*\"/>
<allow own=\"*\"/>
<allow send_type=\"method_call\"/>
<allow send_type=\"signal\"/>
<allow send_type=\"method_return\"/>
<allow send_type=\"error\"/>
<allow receive_type=\"method_call\"/>
<allow receive_type=\"signal\"/>
<allow receive_type=\"method_return\"/>
<allow receive_type=\"error\"/>
<allow send_destination=\"*\" eavesdrop=\"true\"/>
<allow eavesdrop=\"true\"/>
</policy>
'''
class Process(subprocess.Popen):
    '''
        A subprocess.Popen which is tracked in a class wide registry and
        whose combined stdout/stderr is read from a GLib IO watch.

        Output is accumulated in 'out', written to any configured log
        files and, when the process is verbose, echoed to the original
        stdout.
    '''
    # Weak registry of every Process object that is still referenced
    processes = WeakValueDictionary()
    # Test context, taken from the global 'config' on first use
    ctx = None

    def __new__(cls, *args, **kwargs):
        obj = super().__new__(cls)
        cls.processes[id(obj)] = obj
        return obj

    def __init__(self, args, namespace=None, outfile=None, env=None, check=False, cleanup=None):
        '''
            Start 'args' (a list, args[0] being the program).

            'namespace' runs the command via 'ip netns exec <namespace>'.
            'outfile' is an extra file which receives the output (truncated).
            'env' is the environment passed to the child.
            'check' waits (10 seconds) for completion and raises
                subprocess.CalledProcessError on a non-zero exit status.
            'cleanup' is a callable invoked once when the process is
                cleaned up.
        '''
        self.write_fds = []
        self.io_watch = None
        self.cleanup = cleanup
        self.verbose = False
        self.out = ''
        self.hup = False
        self.killed = False
        self.namespace = namespace

        logfile = args[0]

        if not self.ctx:
            self.ctx = config.ctx

        if self.ctx.is_verbose(args[0], log=False):
            self.verbose = True

        if namespace:
            args = ['ip', 'netns', 'exec', namespace] + args
            logfile += '-%s' % namespace

        if outfile:
            # outfile is only used by iwmon, in which case we don't want
            # to append to an existing file.
            self._append_outfile(outfile, append=False)

        if self.ctx.args.log:
            testdir = os.getcwd()

            # Special case any processes started prior to a test
            # (i.e. from testhome). Put these in the root log directory
            if testdir == self.ctx.args.testhome:
                testdir = '.'
            else:
                testdir = os.path.basename(testdir)

            logfile = '%s/%s/%s' % (self.ctx.args.log, testdir, logfile)
            self._append_outfile(logfile)

        super().__init__(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                            env=env, cwd=os.getcwd())

        # Set as non-blocking so read() in the IO callback doesn't block forever
        fl = fcntl.fcntl(self.stdout, fcntl.F_GETFL)
        fcntl.fcntl(self.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        self.io_watch = GLib.io_add_watch(self.stdout, GLib.IO_IN |
                            GLib.IO_HUP | GLib.IO_ERR, self.process_io)

        print("Starting process {}".format(self.args))

        if check:
            self.wait(10)
            self.killed = True
            if self.returncode != 0:
                raise subprocess.CalledProcessError(returncode=self.returncode,
                                    cmd=args)

    @classmethod
    def get_all(cls):
        '''
            Returns an iterable of all tracked Process objects
        '''
        return cls.processes.values()

    @classmethod
    def kill_all(cls):
        '''
            Kills every tracked process
        '''
        for p in cls.processes.values():
            p.kill()

    @staticmethod
    def _write_io(instance, data, stdout=True):
        '''
            Writes 'data' to all log files of 'instance' and, if the
            process is verbose and 'stdout' is set, to the original stdout.
        '''
        for f in instance.write_fds:
            f.write(data)

            # Write out a separator so multiple process calls per
            # test are easier to read.
            if instance.hup:
                f.write("Terminated: {}\n\n".format(instance.args))

            f.flush()

        if instance.verbose and stdout:
            sys.__stdout__.write(data)
            sys.__stdout__.flush()

    @classmethod
    def write_separators(cls, sep):
        '''
            Writes 'sep' into the log files of every process which has not
            been cleaned up yet.
        '''
        for proc in cls.processes.values():
            if proc.killed:
                continue

            cls._write_io(proc, sep, stdout=False)

    def process_io(self, source, condition):
        '''
            GLib IO watch callback. Records a hang-up, then reads and
            distributes whatever output is available. Always returns True
            so the watch stays installed until _cleanup() removes it.
        '''
        if condition & GLib.IO_HUP:
            self.hup = True

        data = source.read()

        if not data:
            return True

        data = data.decode('utf-8')

        # Save data away in case the caller needs it (e.g. list_sta)
        self.out += data

        self._write_io(self, data)

        return True

    def _append_outfile(self, file, append=True):
        '''
            Opens 'file' and adds it to the list of files receiving this
            process' output. The containing directory is created if needed.
            Ownership is given to the user that invoked sudo (if any).
        '''
        gid = int(os.environ.get('SUDO_GID', os.getgid()))
        uid = int(os.environ.get('SUDO_UID', os.getuid()))
        dirname = os.path.dirname(file)

        # A bare file name has no directory component, nothing to create
        if dirname and not path_exists(dirname):
            os.mkdir(dirname)
            os.chown(dirname, uid, gid)

        # If the out file exists, append. Useful for processes like
        # hostapd_cli where it is called multiple times independently.
        if os.path.isfile(file) and append:
            mode = 'a'
        else:
            mode = 'w'

        try:
            f = open(file, mode)
        except Exception:
            traceback.print_exc()
            sys.exit(0)

        os.fchown(f.fileno(), uid, gid)

        self.write_fds.append(f)

    def wait_for_socket(self, socket, wait):
        '''
            Waits up to 'wait' seconds for the path 'socket' to exist
        '''
        Namespace.non_block_wait(os.path.exists, wait, socket)

    # Wait for both process termination and HUP signal
    def __wait(self, timeout):
        try:
            super().wait(timeout)
        except subprocess.TimeoutExpired:
            return False

        # Only done once the IO watch has also seen the hang-up, i.e. all
        # output has been consumed.
        return self.hup

    # Override wait() so it can do so non-blocking
    def wait(self, timeout=10):
        '''
            Waits for termination while iterating the main loop, then
            cleans up. Raises TimeoutError if 'timeout' seconds pass.
        '''
        Namespace.non_block_wait(self.__wait, timeout, 1)
        self._cleanup()

    def _cleanup(self):
        '''
            Runs the cleanup callback, closes the log files and removes
            the IO watch. Calling this more than once is harmless.
        '''
        if self.cleanup:
            self.cleanup()

        # Close the log files rather than leaving that to the garbage
        # collector.
        for f in self.write_fds:
            f.close()

        self.write_fds = []

        if self.io_watch:
            GLib.source_remove(self.io_watch)
            self.io_watch = None

        self.cleanup = None
        self.killed = True

    # Override kill()
    def kill(self, force=False):
        '''
            Terminates the process (SIGKILL if 'force', SIGTERM otherwise)
            and waits up to 15 seconds for it to exit. If it has not
            exited by then it is killed forcefully.
        '''
        if self.killed:
            return

        print("Killing process {}".format(self.args))

        if force:
            super().kill()
        else:
            self.terminate()

        try:
            self.wait(timeout=15)
        except TimeoutError:
            dbg("Process %s did not complete in 15 seconds!" % self.args[0])
            super().kill()

        self._cleanup()

    def __str__(self):
        return str(self.args) + '\n'
class Interface:
    '''
        A wireless interface. 'ctrl_interface' is the path below
        /var/run/hostapd named after the interface. The interface is
        deleted (iw dev <name> del) when the object is destroyed.
    '''
    def __init__(self, name, config):
        self.name = name
        self.ctrl_interface = '/var/run/hostapd/' + name
        self.config = config

    def __del__(self):
        delete_cmd = ['iw', 'dev', self.name, 'del']
        Process(delete_cmd).wait()

    def set_interface_state(self, state):
        '''
            Runs 'ip link set <name> <state>' and waits for it to finish
        '''
        link_cmd = ['ip', 'link', 'set', self.name, state]
        Process(link_cmd).wait()
class Radio:
    '''
        A radio (phy) identified by name, the component using it and,
        once created, the single interface on top of it.
    '''
    def __init__(self, name):
        self.name = name
        # hostapd will reset this if this radio is used by it
        self.use = 'iwd'
        self.interface = None

    def __del__(self):
        print("Removing radio %s" % self.name)
        self.interface = None

    def create_interface(self, config, use):
        '''
            Creates a managed interface named wln<N> on this radio, records
            'use' as the radio's user and returns the Interface object.
        '''
        global intf_id

        ifname = 'wln%s' % intf_id
        intf_id += 1

        self.interface = Interface(ifname, config)
        self.use = use

        add_cmd = ['iw', 'phy', self.name, 'interface', 'add', ifname,
                    'type', 'managed']
        Process(add_cmd).wait()

        return self.interface

    def __str__(self):
        text = self.name + ':\n' + '\tUsed By: %s ' % self.use

        if self.interface:
            text += '(%s)' % self.interface.name

        return text + '\n'
class VirtualRadio(Radio):
    '''
        A subclass of 'Radio' specific to mac80211_hwsim radios.

        The radio is created through config.hwsim when the object is
        constructed and removed again when it is destroyed. 'cfg' may
        provide 'iftype_disable' and 'cipher_disable' values.

        TODO: Using D-Bus to create and destroy radios is more desirable
        than the command line.
    '''
    def __init__(self, name, cfg=None):
        self.disable_cipher = None
        self.disable_iftype = None

        self.hwsim = config.hwsim.Hwsim()

        if cfg:
            self.disable_iftype = cfg.get('iftype_disable', None)
            self.disable_cipher = cfg.get('cipher_disable', None)

        self._radio = self.hwsim.radios.create(name, p2p_device=True,
                            iftype_disable=self.disable_iftype,
                            cipher_disable=self.disable_cipher)

        super().__init__(self._radio.name)

    def __del__(self):
        super().__del__()

        # _radio is missing if creation failed and is reset to None by
        # Namespace.reset()
        radio = getattr(self, '_radio', None)

        if radio is not None:
            try:
                radio.remove()
            except Exception:
                # If the radio was moved into a namespace this will fail,
                # removal is best effort only.
                pass

        self._radio = None

    def __str__(self):
        ret = super().__str__()

        if self.disable_iftype:
            ret += '\tDisabled interface types: %s\n' % self.disable_iftype

        if self.disable_cipher:
            ret += '\tDisabled ciphers: %s\n' % self.disable_cipher

        # The hwsim object is dropped when the owning namespace is reset
        if self._radio is not None:
            ret += '\tPath: %s' % self._radio.path

        ret += '\n'

        return ret
class HostapdInstance:
    '''
        One access point served by hostapd. All instances share a single
        hostapd process; this object only bundles the config name, the
        radio, its interface and (once attached) the CLI object used to
        talk to that particular AP.
    '''
    def __init__(self, config, radio):
        self.radio = radio
        self.config = config
        self.cli = None

        # Creating the interface marks the radio as used by hostapd
        interface = radio.create_interface(self.config, 'hostapd')
        self.intf = interface
        interface.set_interface_state('up')

    def __del__(self):
        print("Removing HostapdInstance %s" % self.config)

        self.intf.set_interface_state('down')
        self.radio = None
        self.intf = None

    def __str__(self):
        lines = ['Hostapd (%s)\n' % self.intf.name,
                    '\tConfig: %s\n' % self.config]
        return ''.join(lines)
class Hostapd:
    '''
        A set of running hostapd instances. This is really just a single
        process since hostapd can be started with multiple config files.
    '''
    def __init__(self, ctx, radios, configs, radius):
        '''
            'ctx' is the test context (used for verbosity and to stop the
                process).
            'radios' and 'configs' are equally long lists; configs[i] is
                used on radios[i]. Config files are expected in /tmp.
            'radius' is an optional extra config file passed to hostapd.

            Raises an Exception if the list lengths differ.
        '''
        # Set everything __del__ relies on first, so a failure further
        # down does not cause a second error during destruction.
        self.ctx = ctx
        self.process = None
        self.instances = []
        self.global_ctrl_iface = '/var/run/hostapd/ctrl'

        if len(configs) != len(radios):
            raise Exception("Config (%d) and radio (%d) list length not equal" % \
                        (len(configs), len(radios)))

        print("Initializing hostapd instances")

        Process(['ip', 'link', 'set', 'eth0', 'up']).wait()
        Process(['ip', 'link', 'set', 'eth1', 'up']).wait()

        self.instances = [HostapdInstance(c, r) for c, r in zip(configs, radios)]

        ifaces = ','.join([rad.interface.name for rad in radios])

        args = ['hostapd', '-g', self.global_ctrl_iface]

        if ifaces:
            args.extend(['-i', ifaces])

        #
        # Config files should already be present in /tmp. This appends
        # ctrl_interface and does any variable replacement. Currently
        # this is just any $ifaceN occurrences.
        #
        for c in configs:
            full_path = '/tmp/%s' % c
            args.append(full_path)

            self._rewrite_config(full_path)

        if radius:
            args.append(radius)

        if ctx.is_verbose('hostapd'):
            args.append('-d')

        self.process = Process(args)

        self.process.wait_for_socket(self.global_ctrl_iface, 30)

        for hapd in self.instances:
            self.process.wait_for_socket(hapd.intf.ctrl_interface, 30)

    def attach_cli(self):
        '''
            Creates a HostapdCLI object (from config.hostapd) for every
            instance.
        '''
        for hapd in self.instances:
            hapd.cli = config.hostapd.HostapdCLI(config=hapd.config)

    def _rewrite_config(self, config):
        '''
            Replaces any $ifaceN values with the correct interface
            names as well as appends the ctrl_interface path to
            the config file. The file is rewritten in place.
        '''
        def iface_name(match):
            return self.instances[int(match.group(1))].intf.name

        with open(config, 'r+') as f:
            data = f.read()

            data = re.sub(r'\$iface([0-9]+)', iface_name, data)
            data += '\nctrl_interface=/var/run/hostapd\n'

            # The read left the position at the end of the file. Go back
            # to the start so the new content replaces the old content
            # instead of being appended after it.
            f.seek(0)
            f.write(data)
            f.truncate()

    def __getitem__(self, config):
        '''
            Returns the instance using 'config', the first instance if
            'config' is empty, or None if there is no match.
        '''
        if not config:
            return self.instances[0]

        for hapd in self.instances:
            if hapd.config == config:
                return hapd

        return None

    def __del__(self):
        print("Removing Hostapd")
        try:
            os.remove(self.global_ctrl_iface)
        except OSError:
            print("Failed to remove %s" % self.global_ctrl_iface)

        self.instances = None

        # Hostapd may have already been stopped
        if self.process:
            self.ctx.stop_process(self.process)

        self.ctx = None

        # Hostapd creates simdb sockets for EAP-SIM/AKA tests but does not
        # clean them up.
        for f in glob("/tmp/eap_sim_db*"):
            os.remove(f)
# Counter used by Namespace.start_dbus() so every dbus-daemon gets its own
# socket (/tmp/dbus<N>) and config file (/tmp/dbus<N>.conf).
dbus_count = 0
class Namespace:
    '''
        A network namespace, the radios moved into it and the dbus-daemon
        serving the processes started inside of it.
    '''
    def __init__(self, args, name, radios):
        '''
            Creates the namespace 'name', moves 'radios' into it and
            starts a dbus-daemon for it. 'args' are the parsed runner
            arguments.
        '''
        self.dbus_address = None
        self.name = name
        self.radios = radios
        self.args = args

        Process(['ip', 'netns', 'add', name]).wait()

        for r in radios:
            Process(['iw', 'phy', r.name, 'set', 'netns', 'name', name]).wait()

        self.start_dbus()

    def reset(self):
        '''
            Drops the bus connection and the radios and kills all tracked
            processes (not only the ones of this namespace).
        '''
        self._bus = None

        for r in self.radios:
            r._radio = None

        self.radios = []

        Process.kill_all()

    def __del__(self):
        if self.name:
            print("Removing namespace %s" % self.name)

            Process(['ip', 'netns', 'del', self.name]).wait()

    def get_bus(self):
        return self._bus

    def start_process(self, args, env=None, **kwargs):
        '''
            Starts 'args' inside this namespace and returns the Process.
            If a dbus-daemon address is known it is exported to the child
            as DBUS_SYSTEM_BUS_ADDRESS. Remaining keyword arguments are
            passed on to Process.
        '''
        if not env:
            env = os.environ.copy()

        # The address may be missing (dbus not started yet) or None.
        # Environment values must be strings so only export a real one.
        dbus_address = getattr(self, 'dbus_address', None)

        if dbus_address:
            # In case this process needs DBus...
            env['DBUS_SYSTEM_BUS_ADDRESS'] = dbus_address

        return Process(args, namespace=self.name, env=env, **kwargs)

    def stop_process(self, p, force=False):
        p.kill(force)

    def is_process_running(self, process):
        '''
            Returns True if a tracked process of this namespace was
            started with 'process' as its first argument.
        '''
        for p in Process.get_all():
            if p.namespace == self.name and p.args[0] == process:
                return True

        return False

    def _cleanup_dbus(self):
        '''
            Removes the dbus-daemon socket (if still present) and config
        '''
        try:
            os.remove(self.dbus_address.split('=')[1])
        except OSError:
            # The daemon usually removes its own socket
            pass

        os.remove(self.dbus_cfg)

    def start_dbus(self):
        '''
            Writes a config with a unique listen address, starts a
            dbus-daemon with it, waits (5 seconds) for the socket and
            connects to it.
        '''
        global dbus_count

        self.dbus_address = 'unix:path=/tmp/dbus%d' % dbus_count
        self.dbus_cfg = '/tmp/dbus%d.conf' % dbus_count
        dbus_count += 1

        with open(self.dbus_cfg, 'w+') as f:
            f.write(dbus_config)
            f.write('<listen>%s</listen>\n' % self.dbus_address)
            f.write('</busconfig>\n')

        p = self.start_process(['dbus-daemon', '--config-file=%s' % self.dbus_cfg],
                        cleanup=self._cleanup_dbus)

        p.wait_for_socket(self.dbus_address.split('=')[1], 5)

        self._bus = dbus.bus.BusConnection(address_or_type=self.dbus_address)

    def start_iwd(self, config_dir = '/tmp', storage_dir = '/tmp/iwd'):
        '''
            Starts iwd (under valgrind if requested) limited to the radios
            marked for use by iwd and returns the Process.
        '''
        args = []
        iwd_radios = ','.join([r.name for r in self.radios if r.use == 'iwd'])

        if self.args.valgrind:
            args.extend(['valgrind', '--leak-check=full', '--track-origins=yes',
                    '--show-leak-kinds=all',
                    '--log-file=/tmp/valgrind.log.%p'])

        args.extend(['iwd', '-E'])

        if iwd_radios != '':
            args.extend(['-p', iwd_radios])

        if self.is_verbose(args[0]):
            args.append('-d')

        env = os.environ.copy()

        env['CONFIGURATION_DIRECTORY'] = config_dir
        env['STATE_DIRECTORY'] = storage_dir

        if self.is_verbose('iwd-dhcp'):
            env['IWD_DHCP_DEBUG'] = '1'

        if self.is_verbose('iwd-tls'):
            env['IWD_TLS_DEBUG'] = '1'

        if self.is_verbose('iwd-acd'):
            env['IWD_ACD_DEBUG'] = '1'

        return self.start_process(args, env=env)

    def is_verbose(self, process, log=True):
        '''
            Returns True if output of 'process' (a name or path) should be
            verbose. With 'log' set, enabled logging makes every process
            verbose.
        '''
        from fnmatch import fnmatch

        process = os.path.basename(process)

        if self.args is None:
            return False

        # every process is verbose when logging is enabled
        if log and self.args.log:
            return True

        verbose = self.args.verbose or []

        if process in verbose:
            return True

        # Special case here to enable verbose output with valgrind running
        if process == 'valgrind' and 'iwd' in verbose:
            return True

        # Handle any glob matches. The pattern is matched against the
        # process name; the previous lookup of files matching the pattern
        # is kept for compatibility.
        for item in verbose:
            if fnmatch(process, item) or process in glob(item):
                return True

        return False

    @staticmethod
    def non_block_wait(func, timeout, *args, exception=True):
        '''
            Convenience function for waiting in a non blocking
            manner using GLibs context iteration i.e. does not block
            the main loop while waiting.

            'func' will be called at least once and repeatedly until
            either it returns success, throws an exception, or the
            'timeout' expires.

            'timeout' is the ultimate timeout in seconds

            '*args' will be passed to 'func'

            If 'exception' is an Exception instance it will be raised.
            If 'exception' is True a generic TimeoutError will be raised.
            Any other value will not result in an exception.
        '''
        # Simple class for signaling the wait timeout
        class Bool:
            def __init__(self, value):
                self.value = value

        def wait_timeout_cb(done):
            done.value = True
            # One shot, GLib drops the source once this returns False
            return False

        mainloop = GLib.MainLoop()
        done = Bool(False)

        timeout = GLib.timeout_add_seconds(timeout, wait_timeout_cb, done)
        context = mainloop.get_context()

        while True:
            try:
                ret = func(*args)
            except Exception:
                # Only remove the source if it has not fired already
                if not done.value:
                    GLib.source_remove(timeout)
                raise

            if ret:
                if not done.value:
                    GLib.source_remove(timeout)
                return ret

            if done.value:
                if isinstance(exception, Exception):
                    raise exception
                elif exception is True:
                    raise TimeoutError("Timeout on non_block_wait")
                else:
                    return

            context.iteration(may_block=True)

    def __str__(self):
        ret = 'Namespace: %s\n' % self.name
        ret += 'Processes:\n'
        for p in Process.get_all():
            ret += '\t%s' % str(p)

        ret += 'Radios:\n'
        if len(self.radios) > 0:
            for r in self.radios:
                ret += '\t%s\n' % str(r)
        else:
            ret += '\tNo Radios\n'

        ret += 'DBus Address: %s\n' % self.dbus_address
        ret += '===================================================\n\n'

        return ret
class BarChart():
    '''
        Minimal text bar chart. Every added value becomes one row whose
        number of stars reflects where the value lies between the tracked
        minimum and maximum.

        NOTE(review): 'max_width' is stored but not used by the visible code.
    '''
    def __init__(self, height=10, max_width=80):
        self._height = height
        self._max_width = max_width
        self._values = []
        self._max_value = 0
        self._min_value = 0

    def add_value(self, value):
        '''
            Adds 'value' and widens the tracked range (with a 1% margin)
            if the value lies outside of it.
        '''
        if len(self._values) == 0:
            self._max_value = int(1.01 * value)
            self._min_value = int(0.99 * value)
        elif value > self._max_value:
            self._max_value = int(1.01 * value)
        elif value < self._min_value:
            self._min_value = int(0.99 * value)

        self._values.append(value)

    def _value_to_stars(self, value):
        # Need to scale value (range of min_value -> max_value) to
        # a range of 0 -> height
        #
        # Scaled = ((value - min_value) / ( max_value - min_value)) * (Height - 0) + 0
        span = self._max_value - self._min_value

        # The range collapses for very small values (e.g. a single 0),
        # there is nothing to scale in that case.
        if span <= 0:
            return 0

        return int(((value - self._min_value) / span) * self._height)

    def __str__(self):
        # Need to map value from range 0 - self._height
        ret = ''

        for i, value in enumerate(self._values):
            stars = self._value_to_stars(value)
            ret += '[%3u] ' % i + '%-10s' % ('*' * stars) + '\t\t\t%d\n' % value

        ret += '\n'

        return ret
class TestContext(Namespace):
    '''
        Contains all information for a given set of tests being run
        such as processes, radios, interfaces and test results.

        This is the 'root' namespace: 'name' is None so processes are not
        wrapped in 'ip netns exec'. Namespace.__init__ is intentionally
        not called since it would create a namespace.
    '''
    def __init__(self, args):
        self.name = None
        self.args = args
        self.hw_config = None
        self.hostapd = None
        self.wpas_interfaces = None
        self.cur_radio_id = 0
        self.cur_iface_id = 0
        self.radios = []
        self.loopback_started = False
        self.results = {}
        self.mainloop = GLib.MainLoop()
        self.namespaces = []
        self._last_mem_available = 0
        self._mem_chart = BarChart()

    def start_dbus_monitor(self):
        '''
            Starts dbus-monitor, but only if it was requested as verbose
        '''
        if not self.is_verbose('dbus-monitor'):
            return

        self.start_process(['dbus-monitor', '--address', self.dbus_address])

    def start_haveged(self):
        self.start_process(['haveged', '-F'])

    def create_radios(self):
        '''
            Starts hwsim, waits for it to appear on D-Bus and creates the
            'num_radios' virtual radios (rad0, rad1, ...) from hw.conf.
        '''
        setup = self.hw_config['SETUP']
        nradios = int(setup['num_radios'])
        args = ['hwsim']

        if setup.get('hwsim_medium', 'no') in ['no', '0', 'false']:
            # hwsim is not wanted as the medium, do not register it
            args.extend(['--no-register'])

        self.start_process(args)
        self.non_block_wait(self._bus.name_has_owner, 20, 'net.connman.hwsim',
                    exception=TimeoutError('net.connman.hwsim did not appear'))

        for i in range(nradios):
            name = 'rad%u' % i

            # Get any [radX] sections. These are for configuring
            # any special radios. This no longer requires a
            # radio_conf list, we just assume radios start rad0
            # and increment.
            rad_config = None
            if self.hw_config.has_section(name):
                rad_config = self.hw_config[name]

            self.radios.append(VirtualRadio(name, rad_config))
            self.cur_radio_id += 1

    def discover_radios(self):
        '''
            Populates 'radios' with the physical radios reported by nl80211
        '''
        import pyroute2

        phys = []

        # The location of IW differs between pyroute2 versions
        try:
            iw = pyroute2.iwutil.IW()
        except Exception:
            iw = pyroute2.IW()

        attrs = [phy['attrs'] for phy in iw.list_wiphy()]

        for attr in attrs:
            for key, value in attr:
                if key == 'NL80211_ATTR_WIPHY_NAME':
                    if value not in phys:
                        phys.append(value)
                    break

        print('Discovered radios: %s' % str(phys))
        self.radios = [Radio(name) for name in phys]

    def start_radios(self):
        '''
            Sets the regulatory domain (if configured) and then either
            discovers physical radios or creates virtual ones.
        '''
        reg_domain = self.hw_config['SETUP'].get('reg_domain', None)
        if reg_domain:
            Process(['iw', 'reg', 'set', reg_domain]).wait()

        if self.args.hw:
            self.discover_radios()
        else:
            self.create_radios()

    def start_hostapd(self):
        '''
            Starts hostapd for the radios listed in the [HOSTAPD] section
            of hw.conf. Does nothing if there is no such section.
        '''
        if 'HOSTAPD' not in self.hw_config:
            return

        settings = self.hw_config['HOSTAPD']

        if self.args.hw:
            # Just grab the first N radios. It gets rather
            # complicated trying to map radX radios specified in
            # hw.conf so any passed through physical adapters are
            # just given to hostapd/IWD as they appear during
            # discovery.
            #
            # TODO: It may be desirable to map PCI/USB adapters to
            #       specific radX radios specified in the config but
            #       there are really 2 separate use cases here.
            #       1. You want to test a *specific* radio with IWD
            #          or hostapd. For this you would want radX
            #          to map to a specific radio
            #       2. You have many adapters in use to run multiple
            #          tests. In this case you would not care what
            #          was using each radio, just that there was
            #          enough to run all tests.
            nradios = sum(1 for k in settings if k != 'radius_server')

            hapd_radios = self.radios[:nradios]
        else:
            hapd_radios = [rad for rad in self.radios if rad.name in settings]

        hapd_configs = [conf for rad, conf in settings.items() if rad != 'radius_server']

        radius_config = settings.get('radius_server', None)

        self.hostapd = Hostapd(self, hapd_radios, hapd_configs, radius_config)
        self.hostapd.attach_cli()

    def get_frequencies(self):
        '''
            Returns the frequency reported by the CLI of every hostapd
            instance.
        '''
        return [hapd.cli.frequency for hapd in self.hostapd.instances]

    def start_wpas_interfaces(self):
        '''
            Creates an interface for every radio listed in the
            [WPA_SUPPLICANT] section of hw.conf (if present).
        '''
        if 'WPA_SUPPLICANT' not in self.hw_config:
            return

        settings = self.hw_config['WPA_SUPPLICANT']

        if self.args.hw:
            nradios = len(settings.items())

            wpas_radios = self.radios[:nradios]
            self.wpas_interfaces = []

            #
            # Physical radios most likely will use a different name
            # than 'rad#' but the config file is referenced by these
            # 'rad#' names. Iterate through both the settings and
            # physical radios to create interfaces associated with
            # each config file.
            #
            for vrad, hwrad in zip(settings.items(), wpas_radios):
                self.wpas_interfaces.append(hwrad.create_interface(vrad[1], 'wpas'))

        else:
            wpas_radios = [rad for rad in self.radios if rad.name in settings]
            self.wpas_interfaces = [rad.create_interface(settings[rad.name], 'wpas') \
                            for rad in wpas_radios]

    def start_ofono(self):
        '''
            Handles the 'sim_keys' setting: unset means nothing to do, any
            value other than 'ofono' is exported as IWD_SIM_KEYS, 'ofono'
            starts phonesim and ofonod (if both are installed).
        '''
        sim_keys = self.hw_config['SETUP'].get('sim_keys', None)
        if not sim_keys:
            print("Ofono not requred")
            return
        elif sim_keys != 'ofono':
            os.environ['IWD_SIM_KEYS'] = sim_keys
            return

        if not find_binary(['ofonod']) or not find_binary(['phonesim']):
            print("Ofono or Phonesim not found, skipping test")
            return

        Process(['ip', 'link', 'set', 'lo', 'up']).wait()

        os.environ['OFONO_PHONESIM_CONFIG'] = '/tmp/phonesim.conf'

        phonesim_args = ['phonesim', '-p', '12345', '/usr/share/phonesim/default.xml']

        self.start_process(phonesim_args)

        #
        # TODO:
        # Is there something to wait for? Without this phonesim rejects
        # connections on all but the first test.
        #
        time.sleep(3)

        ofono_args = ['ofonod', '-n', '--plugin=atmodem,phonesim']
        if self.is_verbose('ofonod'):
            ofono_args.append('-d')

        self.start_process(ofono_args)

        print("Ofono started")

    def create_namespaces(self):
        '''
            Creates a Namespace for every entry of the [NameSpaces]
            section of hw.conf and moves the listed radios into it.
        '''
        if not self.hw_config.has_section('NameSpaces'):
            return

        for key, value in self.hw_config.items('NameSpaces'):
            radio_names = value.split(',')
            # Gather up radio objects for this namespace
            radios = [rad for rad in self.radios if rad.name in radio_names]

            # Remove radios from 'root' namespace. The order of the
            # remaining radios is kept as later steps match radios and
            # configs by position.
            self.radios = [rad for rad in self.radios if rad not in radios]

            self.namespaces.append(Namespace(self.args, key, radios))

    def get_namespace(self, ns):
        '''
            Returns the namespace named 'ns' or None
        '''
        for n in self.namespaces:
            if n.name == ns:
                return n

        return None

    def stop_test_processes(self):
        '''
            Resets all namespaces (including this one) and drops the
            hostapd and wpa_supplicant state of the finished test.
        '''
        for n in self.namespaces:
            n.reset()

        self.namespaces = []
        self.hostapd = None
        self.wpas_interfaces = None

        self.reset()

    def meminfo_to_dict(self):
        '''
            Parses /proc/meminfo into a dict of name -> integer value
            (the 'kB' suffix is dropped).
        '''
        def removesuffix(string, suffix):
            if string.endswith(suffix):
                return string[:-len(suffix)]

            return string

        ret = {}

        with open('/proc/meminfo', 'r') as f:
            data = f.read().strip().split('\n')

        for l in data:
            entry = l.split(':')
            ret[entry[0]] = int(removesuffix(entry[1], 'kB'))

        return ret

    def __str__(self):
        # Note: besides building the text this records the currently
        # available memory in the per-test chart.
        ret = 'Arguments:\n'
        for arg in vars(self.args):
            ret += '\t --%s %s\n' % (arg, str(getattr(self.args, arg)))

        ret += 'Hostapd:\n'
        if self.hostapd:
            for h in self.hostapd.instances:
                ret += '\t%s\n' % str(h)
        else:
            ret += '\tNo Hostapd instances\n'

        info = self.meminfo_to_dict()
        self._mem_chart.add_value(info['MemAvailable'])

        ret += 'Available Memory: %u kB\n' % info['MemAvailable']
        ret += 'Last Test Delta: %+d kB\n' % (info['MemAvailable'] - self._last_mem_available)
        ret += 'Per-test Usage:\n'
        ret += str(self._mem_chart)

        self._last_mem_available = info['MemAvailable']

        ret += super().__str__()

        for n in self.namespaces:
            ret += n.__str__()

        return ret
def build_unit_list(args):
    '''
        Build the sorted list of unit test executables selected by the
        comma separated 'args.unit_tests'.

        Each entry is used as-is if it is executable, else looked up below
        <testhome>/unit, and finally treated as a glob below that
        directory (only executable matches are kept). A glob without any
        match raises an Exception.
    '''
    unit_dir = args.testhome + '/unit'
    selected = []

    for unit in args.unit_tests.split(','):
        candidate = '%s/%s' % (unit_dir, unit)

        if os.access(unit, os.X_OK):
            selected.append(unit)
            continue

        if os.access(candidate, os.X_OK):
            selected.append(candidate)
            continue

        # Full list or glob, first build up valid list of tests
        found = glob(candidate)
        if found == []:
            raise Exception("Could not find test %s" % unit)

        selected.extend(filter(lambda exe: os.access(exe, os.X_OK), found))

    selected.sort()
    return selected
def build_test_list(args):
    '''
        Build list of auto test directories based on passed arguments.

        Without 'args.autotests' every test directory committed to git
        below <testhome>/autotests is returned. Otherwise each comma
        separated entry is resolved in this order:
            '<name>+'       <name> and every test sorted after it
            existing path   used as-is
            <root>/<entry>  used if it exists
            glob            matched below <root>

        Returns the sorted list of test paths. Raises an Exception if an
        entry cannot be resolved.
    '''
    tests = []
    test_root = args.testhome + '/autotests'

    # Run all tests
    if not args.autotests:
        # Get list of all autotests (committed in git). git is run
        # without a shell so paths containing spaces or shell
        # metacharacters are handled correctly.
        try:
            listing = subprocess.run(
                    ['git', '-C', args.testhome, 'ls-files', 'autotests/'],
                    stdout=subprocess.PIPE, universal_newlines=True).stdout
        except OSError:
            # git is not available
            listing = ''

        names = []
        for line in listing.splitlines():
            parts = line.split('/')
            if len(parts) < 2:
                continue

            # Second path component is the test directory. Same filter
            # as used for '<name>+' below.
            name = parts[1]
            if name.startswith('test') and name not in names:
                names.append(name)

        tests = [test_root + '/' + t for t in names]
    else:
        print("Generating partial test list")

        full_list = sorted(os.listdir(test_root))

        for t in args.autotests.split(','):
            path = '%s/%s' % (test_root, t)

            if t.endswith('+'):
                t = t.split('+')[0]
                i = full_list.index(t)

                found = [test_root + '/' + x for x in full_list[i:] \
                            if x.startswith('test')]
            elif os.path.exists(t):
                found = [t]
            elif os.path.exists(path):
                found = [path]
            else:
                found = glob(path)
                if not found:
                    raise Exception("Could not find test %s" % t)

            # Merge without duplicates, entries selected earlier are kept
            for f in found:
                if f not in tests:
                    tests.append(f)

    return sorted(tests)
# Plain tuple carrying the totals of one test (tests run, failures, errors,
# skipped and elapsed time). start_test() puts one of these on the results
# queue instead of a unittest TestResult.
SimpleResult = namedtuple('SimpleResult', 'run failures errors skipped time')
def start_test(ctx, subtests, rqueue):
    '''
        Run an individual test. 'subtests' are parsed prior to calling
        but these effectively make up a single test. 'rqueue' is the
        results queue which is required since this is using
        multiprocessing.

        'ctx' is the test context, 'subtests' a list of python test file
        names importable from sys.path. A single SimpleResult holding the
        totals is put on 'rqueue' once all sub-tests have finished.

        A test function which throws an uncaught exception (including one
        from setUpClass/tearDownClass) is reported as FAILED and counted
        as an error.
    '''
    run = 0
    errors = 0
    failures = 0
    skipped = 0

    start = time.time()

    # Function names from any <file>.<function> entries of --sub-tests.
    # These do not change while running so only build the list once.
    limit_funcs = []

    if ctx.args.sub_tests:
        for i in ctx.args.sub_tests:
            parts = i.split('.')
            if len(parts) == 2:
                limit_funcs.append(parts[1])

    #
    # Iterate through each individual python test.
    #
    for s in subtests:
        loader = unittest.TestLoader()
        try:
            module = importlib.import_module(os.path.splitext(s)[0])
        except OSError:
            # Dump some memory/kernel information to help debugging
            dbg(subprocess.check_output("cat /proc/buddyinfo", shell=True).decode('utf-8'))
            dbg(subprocess.check_output("dmesg | tail -80", shell=True).decode('utf-8'))
            print(ctx)
            raise

        subtest = loader.loadTestsFromModule(module)

        # The test suite is being (ab)used to get a bit more granularity
        # with individual tests. The 'normal' way to use unittest is to
        # just create a test suite and run them. The problem here is that
        # test results are queued and printed at the very end so its
        # difficult to know *where* a test failed (python gives a stack
        # trace but printing the exception/failure immediately shows
        # where in the debug logs something failed). Moreso if there are
        # several test functions inside a single python file they run
        # as a single test and it is difficult (again) to know where
        # something failed.

        # Iterating through each python test file
        for test in subtest:
            # Iterating through individual test functions inside a
            # Test() class. The suite is turned into a list since the
            # number of tests is needed to know which one is last.
            tlist = list(test)

            for index, t in enumerate(tlist):
                func, file = str(t).split(' ')
                #
                # TODO: There may be a better way of doing this
                # but stringifying the test class gives us a string:
                # <function> (<file>.<class>)
                #
                file = file.strip('()').split('.')[0] + '.py'

                # Create an empty result here in case the test fails
                result = TestResult()
                uncaught = False

                try:
                    skip = len(limit_funcs) > 0 and func not in limit_funcs

                    # Set up class only on first test
                    if index == 0:
                        if not skip:
                            dbg("%s\n\t%s RUNNING" % (file, str(func)), end='')
                        t.setUpClass()
                    else:
                        if not skip:
                            dbg("\t%s RUNNING" % str(func), end='')

                    sys.__stdout__.flush()

                    Process.write_separators("\n====== %s:%s ======\n\n" % (file, func))

                    if not skip:
                        # Run test (setUp/tearDown run automatically)
                        result = t()

                    # Tear down class only on last test
                    if index == len(tlist) - 1:
                        t.tearDownClass()

                    if skip:
                        continue
                except unittest.SkipTest:
                    result.skipped.append(t)
                except Exception:
                    uncaught = True
                    dbg('\n%s threw an uncaught exception:' % func)
                    traceback.print_exc(file=sys.__stdout__)

                run += result.testsRun
                errors += len(result.errors)
                failures += len(result.failures)
                skipped += len(result.skipped)

                if uncaught:
                    # Make the totals reflect the exception, otherwise
                    # the overall result could still be a pass.
                    errors += 1
                    if result.testsRun == 0:
                        run += 1

                if len(result.skipped) > 0:
                    dbg(colored(" SKIPPED", "cyan"))
                elif uncaught or result.testsRun == 0 or \
                        len(result.errors) > 0 or len(result.failures) > 0:
                    # Decided per test. Using the running total here would
                    # report a test that threw as passed as soon as any
                    # earlier test had run.
                    dbg(colored(" FAILED", "red"))
                    for e in result.errors:
                        dbg(e[1])
                    for f in result.failures:
                        dbg(f[1])
                else:
                    dbg(colored(" PASSED", "green"))

        # Prevents future test modules with the same name (e.g.
        # connection_test.py) from being loaded from the cache
        sys.modules.pop(module.__name__)

    #
    # The multiprocessing queue is picky with what objects it will serialize
    # and send between processes. Because of this we put the important bits
    # of the result into our own 'SimpleResult' tuple.
    #
    sresult = SimpleResult(run=run, failures=failures, errors=errors,
                skipped=skipped, time=time.time() - start)
    rqueue.put(sresult)

    # This may not be required since we are manually popping sys.modules
    importlib.invalidate_caches()
def pre_test(ctx, test, copied):
    '''
        Copy test files, start processes, and any other pre test work.

        'ctx' is the test context, 'test' the path of the test directory.
        Every file or directory copied to /tmp is appended to 'copied' so
        it can be removed after the test.

        Returns the sorted list of python test files to run. Raises an
        Exception if the test directory has no hw.conf.
    '''
    os.chdir(test)

    dbg("\nStarting %s" % colored(os.path.basename(test), "white", attrs=['bold']))
    if not os.path.exists(test + '/hw.conf'):
        raise Exception("No hw.conf found for %s" % test)

    ctx.hw_config = ConfigParser()
    ctx.hw_config.read(test + '/hw.conf')
    #
    # We have two types of test files: tests and everything else. Rather
    # than require each test to specify the files needing to be copied to
    # /tmp (previously 'tmpfs_extra_stuff'), we just copy everything which
    # isn't a test. There is really no reason not to do this as any file
    # present in a test directory should be needed by the test.
    #
    # All files
    files = os.listdir(test)
    # Tests (starts or ends with 'test')
    subtests = [f for f in files if f.startswith('test') or \
                os.path.splitext(f)[0].endswith('test')]
    # Everything else (except .py files)
    to_copy = [f for f in list(set(files) - set(subtests)) if not f.endswith('.py') \
                and f != '__pycache__']
    for f in to_copy:
        if os.path.isdir(f):
            shutil.copytree(f, '/tmp/' + f)
        else:
            shutil.copy(f, '/tmp')

        copied.append(f)

    # Prune down any subtests if needed
    if ctx.args.sub_tests:
        # The split list is stored back since start_test() expects a
        # list. Only split once, this function runs for every test.
        if isinstance(ctx.args.sub_tests, str):
            ctx.args.sub_tests = ctx.args.sub_tests.split(',')

        # Handle <file>.<test function> format
        to_run = [x.split('.')[0] for x in ctx.args.sub_tests]
        pruned = []

        for s in subtests:
            no_ext = s
            if '.' in s:
                no_ext = s.split('.')[0]

            if no_ext in to_run:
                pruned.append(no_ext + '.py')

        subtests = pruned

    if ctx.args.log:
        ctx.start_process(['iwmon', '--nowiphy'])
    elif ctx.args.monitor:
        ctx.start_process(['iwmon'], outfile=ctx.args.monitor)

    ctx.start_dbus()
    ctx.start_haveged()
    ctx.start_dbus_monitor()
    ctx.start_radios()
    ctx.create_namespaces()
    ctx.start_hostapd()
    ctx.start_wpas_interfaces()
    ctx.start_ofono()

    if ctx.hw_config.has_option('SETUP', 'start_iwd'):
        start = ctx.hw_config.getboolean('SETUP', 'start_iwd')
    else:
        start = True

    if start:
        ctx.start_iwd()
    else:
        print("Not starting IWD from test-runner")

    print(ctx)

    # Make the test files importable by start_test()
    sys.path.insert(1, test)

    return sorted(subtests)
def post_test(ctx, to_copy):
    '''
    Remove copied files, and stop test processes.

    ctx is the test context whose processes are stopped; to_copy is the
    list of file/directory names (relative to /tmp) that were copied for
    the test and now need to be removed again.

    Cleanup is best effort: a failure while removing the copied files is
    reported but never propagated, and the test processes are always
    stopped.
    '''
    try:
        for f in to_copy:
            path = '/tmp/' + f

            if os.path.isdir(path):
                shutil.rmtree(path)
            else:
                os.remove(path)

        Process(['ip', 'link', 'set', 'lo', 'down']).wait()
    except Exception:
        print("Exception thrown in post_test")
        # Print the actual failure to the real stdout (it may have been
        # re-directed), otherwise the cause of the error is lost.
        traceback.print_exc(file=sys.__stdout__)
    finally:
        ctx.stop_test_processes()

    if ctx.args.valgrind:
        # Dump and then delete every valgrind log left in /tmp
        for f in os.listdir('/tmp'):
            if not f.startswith("valgrind.log."):
                continue

            dbg(f)
            with open('/tmp/' + f, 'r') as v:
                dbg(v.read())
            dbg("\n")
            os.remove('/tmp/' + f)

    # Special case for when logging is enabled
    if os.path.isfile('/tmp/iwd-tls-debug-server-cert.pem'):
        os.remove('/tmp/iwd-tls-debug-server-cert.pem')

    # Anything else remaining in /tmp was left behind by the test
    allowed = ['phonesim.conf', 'certs', 'secrets', 'iwd']

    for f in os.listdir('/tmp'):
        if f in allowed:
            continue

        dbg("File %s was not cleaned up!" % f)

        try:
            os.remove('/tmp/' + f)
        except OSError:
            # Best effort only (e.g. a directory or permission problem),
            # but do not swallow KeyboardInterrupt/SystemExit.
            pass
def print_results(results):
    '''
    Print a table summarizing all test results.

    results maps a test name to a result object providing the 'run',
    'failures', 'errors', 'skipped' and 'time' attributes. A time equal to
    TEST_MAX_TIMEOUT marks a test which timed out and a time of zero marks
    a test which threw an exception.

    Returns True only if no subtest failed and no test timed out or threw
    an exception.
    '''
    table = PrettyTable(['Test', colored('Passed', 'green'),
                         colored('Failed', 'red'),
                         colored('Skipped', 'cyan'),
                         colored('Time', 'yellow')])

    total_pass = 0
    total_fail = 0
    total_skip = 0
    total_time = 0

    # Timed out/exception results carry no pass/fail counts, so they are
    # tracked separately. Without this the run would be reported as
    # successful even though a test never completed.
    abnormal = False

    for test, result in results.items():
        if result.time == TEST_MAX_TIMEOUT:
            failed = "Timed out"
            passed = "Timed out"
            abnormal = True
        elif result.time == 0:
            failed = "Exception"
            passed = "Exception"
            abnormal = True
        else:
            failed = result.failures + result.errors
            passed = result.run - failed

            total_pass += passed
            total_fail += failed
            total_skip += result.skipped

        total_time += result.time

        # Named 'elapsed' to avoid shadowing the 'time' module
        elapsed = '%.2f' % result.time

        table.add_row([test, colored(passed, 'green'),
                       colored(failed, 'red'),
                       colored(result.skipped, 'cyan'),
                       colored(elapsed, 'yellow')])

    total_time = '%.2f' % total_time

    table.add_row(['Total', colored(total_pass, 'green'),
                   colored(total_fail, 'red'),
                   colored(total_skip, 'cyan'),
                   colored(total_time, 'yellow')])

    dbg(table)

    return total_fail == 0 and not abnormal
def run_auto_tests(ctx, args):
    '''
    Run every autotest selected by args.

    Each test is set up with pre_test(), its subtests are run in a separate
    process, and post_test() is always called afterwards. The outcome is
    stored in ctx.results keyed by the test's base name. A test which times
    out is recorded with time=TEST_MAX_TIMEOUT, and a test which raised an
    exception (or whose process exited without reporting a result) is
    recorded with time=0.
    '''
    # Once the test process has exited its result is already queued, so
    # only a short wait is needed. This bounds the wait if the process
    # died without ever reporting a result.
    result_wait = 5

    tests = build_test_list(args)

    for test in tests:
        copied = []
        name = os.path.basename(test)

        try:
            subtests = pre_test(ctx, test, copied)

            if not subtests:
                dbg("No tests to run")
                sys.exit()

            rqueue = multiprocessing.Queue()
            p = multiprocessing.Process(target=start_test,
                                        args=(ctx, subtests, rqueue))
            p.start()

            # Rather than time each subtest we just time the total but
            # multiply the default time by the number of tests being run.
            p.join(TEST_MAX_TIMEOUT * len(subtests))

            if p.is_alive():
                # Timeout
                p.terminate()

                ctx.results[name] = SimpleResult(run=0, failures=0,
                                                 errors=0, skipped=0,
                                                 time=TEST_MAX_TIMEOUT)
            else:
                # Raises queue.Empty (handled below) rather than blocking
                # forever if the process never queued a result.
                ctx.results[name] = rqueue.get(timeout=result_wait)
        except Exception:
            dbg("%s threw an uncaught exception" % test)
            traceback.print_exc(file=sys.__stdout__)

            ctx.results[name] = SimpleResult(run=0, failures=0, errors=0,
                                             skipped=0, time=0)
        finally:
            post_test(ctx, copied)
def run_unit_tests(ctx, args):
    '''
    Run each unit test binary selected by args and report pass/fail.

    The working directory is changed to '<testhome>/unit'. A unit test
    passes if its process exits with a return code of zero.
    '''
    os.chdir(args.testhome + '/unit')

    units = build_unit_list(args)

    for u in units:
        # Keep the process object itself rather than whatever wait()
        # returns, so reading returncode does not depend on wait()
        # handing back the process.
        proc = ctx.start_process([u])
        proc.wait()

        if proc.returncode != 0:
            dbg("Unit test %s failed" % os.path.basename(u))
        else:
            dbg("Unit test %s passed" % os.path.basename(u))
def run_tests(args):
    '''
    Create the shared TestContext and run either the unit tests or the
    autotests, depending on args.unit_tests.
    '''
    global config

    os.chdir(args.testhome)

    #
    # The 'config' module is how all autotest utils (iwd/hostapd/etc) reach
    # the TestContext. Any other module or script in the same interpreter
    # can import config and use config.ctx to access live test information,
    # start/stop processes, see active radios etc.
    #
    config = importlib.import_module('config')
    config.ctx = TestContext(args)

    # These must be imported after config.ctx has been set
    for helper in ('hwsim', 'hostapd'):
        setattr(config, helper, importlib.import_module(helper))

    # Start writing out kernel log
    config.ctx.start_process(["dmesg", '--follow'])

    if args.unit_tests is not None:
        run_unit_tests(config.ctx, args)
        return

    run_auto_tests(config.ctx, args)
# Script entry point. 'runner' has to be a module level name since exit_vm()
# calls runner.stop().
runner = Runner()
# Register exit_vm() before any test is started so that, at interpreter exit,
# remaining processes are killed, the results are printed and the runner is
# stopped.
atexit.register(exit_vm)
runner.prepare_environment()
# The parsed arguments come from the runner
run_tests(runner.args)
runner.cleanup_environment()