From 2894f2e3ebe783ceb9993c4e883ed4f67f976228 Mon Sep 17 00:00:00 2001 From: James Prestwood Date: Thu, 31 Mar 2022 16:16:30 -0700 Subject: [PATCH] test-runner: rename test-runner, add run-tests In order to keep all test-runner dev scripts working and to work with the new runner.py system some file renaming was required. test-runner was renamed to run-tests A new test-runner was added which only creates the Runner() class. --- tools/run-tests | 1525 +++++++++++++++++++++++++++++++++++++++++++++ tools/test-runner | 1525 +-------------------------------------------- 2 files changed, 1526 insertions(+), 1524 deletions(-) create mode 100755 tools/run-tests diff --git a/tools/run-tests b/tools/run-tests new file mode 100755 index 00000000..ed6478ef --- /dev/null +++ b/tools/run-tests @@ -0,0 +1,1525 @@ +#!/usr/bin/python3 + +import os +import shutil +import fcntl +import sys +import subprocess +import atexit +import time +import unittest +import importlib +from unittest.result import TestResult +import multiprocessing +import re +import traceback + +from runner import Runner + +from configparser import ConfigParser +from prettytable import PrettyTable +from termcolor import colored +from glob import glob +from collections import namedtuple +from time import sleep +import dbus.mainloop.glib +from gi.repository import GLib +from weakref import WeakValueDictionary + +config = None +intf_id = 0 + +TEST_MAX_TIMEOUT = 240 + +dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + +def dbg(*s, **kwargs): + ''' + Allows prints if stdout has been re-directed + ''' + print(*s, **kwargs, file=sys.__stdout__) + +def exit_vm(): + if config: + for p in Process.get_all(): + print("Process %s still running!" % p.args[0]) + p.kill() + + if config.ctx and config.ctx.results: + success = print_results(config.ctx.results) + else: + success = False + + if config.ctx.args.result: + result = 'PASS' if success else 'FAIL' + with open(config.ctx.args.result, 'w') as f: + f.write(result) + + os.sync() + + runner.stop() + +def path_exists(path): + ''' + Searches PATH as well as absolute paths. + ''' + if shutil.which(path): + return True + try: + os.stat(path) + except: + return False + return True + +def find_binary(list): + ''' + Returns a binary from 'list' if its found in PATH or on a + valid absolute path. + ''' + for path in list: + if path_exists(path): + return path + return None + +# Partial DBus config. The remainder () will be filled in for each +# namespace that is created so each individual dbus-daemon has its own socket +# and address. +dbus_config = ''' + + +system +2147483647 +ANONYMOUS + + + + + + + + + + + + + + + +''' + +class Process(subprocess.Popen): + processes = WeakValueDictionary() + ctx = None + + def __new__(cls, *args, **kwargs): + obj = super().__new__(cls) + cls.processes[id(obj)] = obj + return obj + + def __init__(self, args, namespace=None, outfile=None, env=None, check=False, cleanup=None): + self.write_fds = [] + self.io_watch = None + self.cleanup = cleanup + self.verbose = False + self.out = '' + self.hup = False + self.killed = False + self.namespace = namespace + + if not self.ctx: + global config + self.ctx = config.ctx + + if self.ctx.is_verbose(args[0], log=False): + self.verbose = True + + if namespace: + args = ['ip', 'netns', 'exec', namespace] + args + + if outfile: + # outfile is only used by iwmon, in which case we don't want + # to append to an existing file. + self._append_outfile(outfile, append=False) + + if self.ctx.args.log: + logfile = '%s/%s/%s' % (self.ctx.args.log, + os.path.basename(os.getcwd()), + args[0]) + self._append_outfile(logfile) + + super().__init__(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + env=env, cwd=os.getcwd()) + + # Set as non-blocking so read() in the IO callback doesn't block forever + fl = fcntl.fcntl(self.stdout, fcntl.F_GETFL) + fcntl.fcntl(self.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) + + self.io_watch = GLib.io_add_watch(self.stdout, GLib.IO_IN | + GLib.IO_HUP | GLib.IO_ERR, self.process_io) + + print("Starting process {}".format(self.args)) + + if check: + self.wait(10) + self.killed = True + if self.returncode != 0: + raise subprocess.CalledProcessError(returncode=self.returncode, + cmd=args) + + @classmethod + def get_all(cls): + return cls.processes.values() + + @classmethod + def kill_all(cls): + for p in cls.processes.values(): + p.kill() + + @staticmethod + def _write_io(instance, data, stdout=True): + for f in instance.write_fds: + f.write(data) + + # Write out a separator so multiple process calls per + # test are easer to read. + if instance.hup: + f.write("Terminated: {}\n\n".format(instance.args)) + + f.flush() + + if instance.verbose and stdout: + sys.__stdout__.write(data) + sys.__stdout__.flush() + + @classmethod + def write_separators(cls, sep): + for proc in cls.processes.values(): + if proc.killed: + continue + + cls._write_io(proc, sep, stdout=False) + + def process_io(self, source, condition): + if condition & GLib.IO_HUP: + self.hup = True + + data = source.read() + + if not data: + return True + + data = data.decode('utf-8') + + # Save data away in case the caller needs it (e.g. list_sta) + self.out += data + + self._write_io(self, data) + + return True + + def _append_outfile(self, file, append=True): + gid = int(os.environ.get('SUDO_GID', os.getgid())) + uid = int(os.environ.get('SUDO_UID', os.getuid())) + dir = os.path.dirname(file) + + if not path_exists(dir): + os.mkdir(dir) + os.chown(dir, uid, gid) + + file = os.path.join(dir,file) + + # If the out file exists, append. Useful for processes like + # hostapd_cli where it is called multiple times independently. + if os.path.isfile(file) and append: + mode = 'a' + else: + mode = 'w' + + try: + f = open(file, mode) + except Exception as e: + traceback.print_exc() + sys.exit(0) + + os.fchown(f.fileno(), uid, gid) + + self.write_fds.append(f) + + def wait_for_socket(self, socket, wait): + Namespace.non_block_wait(os.path.exists, wait, socket) + + # Wait for both process termination and HUP signal + def __wait(self, timeout): + try: + super().wait(timeout) + if not self.hup: + return False + + return True + except: + return False + + # Override wait() so it can do so non-blocking + def wait(self, timeout=10): + Namespace.non_block_wait(self.__wait, timeout, 1) + self._cleanup() + + def _cleanup(self): + if self.cleanup: + self.cleanup() + + self.write_fds = [] + + if self.io_watch: + GLib.source_remove(self.io_watch) + self.io_watch = None + + self.cleanup = None + self.killed = True + + # Override kill() + def kill(self, force=False): + if self.killed: + return + + print("Killing process {}".format(self.args)) + + if force: + super().kill() + else: + self.terminate() + + try: + self.wait(timeout=15) + except: + dbg("Process %s did not complete in 15 seconds!" % self.name) + super().kill() + + self._cleanup() + + def __str__(self): + return str(self.args) + '\n' + +class Interface: + def __init__(self, name, config): + self.name = name + self.ctrl_interface = '/var/run/hostapd/' + name + self.config = config + + def __del__(self): + Process(['iw', 'dev', self.name, 'del']).wait() + + def set_interface_state(self, state): + Process(['ip', 'link', 'set', self.name, state]).wait() + +class Radio: + def __init__(self, name): + self.name = name + # hostapd will reset this if this radio is used by it + self.use = 'iwd' + self.interface = None + + def __del__(self): + print("Removing radio %s" % self.name) + self.interface = None + + def create_interface(self, config, use): + global intf_id + + ifname = 'wln%s' % intf_id + + intf_id += 1 + + self.interface = Interface(ifname, config) + self.use = use + + Process(['iw', 'phy', self.name, 'interface', 'add', ifname, + 'type', 'managed']).wait() + + return self.interface + + def __str__(self): + ret = self.name + ':\n' + ret += '\tUsed By: %s ' % self.use + if self.interface: + ret += '(%s)' % self.interface.name + + ret += '\n' + + return ret + +class VirtualRadio(Radio): + ''' + A subclass of 'Radio' specific to mac80211_hwsim radios. + + TODO: Using D-Bus to create and destroy radios is more desireable + than the command line. + ''' + + def __init__(self, name, cfg=None): + global config + + self.disable_cipher = None + self.disable_iftype = None + + self.hwsim = config.hwsim.Hwsim() + + if cfg: + self.disable_iftype = cfg.get('iftype_disable', None) + self.disable_cipher = cfg.get('cipher_disable', None) + + self._radio = self.hwsim.radios.create(name, p2p_device=True, + iftype_disable=self.disable_iftype, + cipher_disable=self.disable_cipher) + + super().__init__(self._radio.name) + + def __del__(self): + super().__del__() + + # If the radio was moved into a namespace this will fail + try: + self._radio.remove() + except: + pass + + self._radio = None + + def __str__(self): + ret = super().__str__() + + if self.disable_iftype: + ret += '\tDisabled interface types: %s\n' % self.disable_iftype + + if self.disable_cipher: + ret += '\tDisabled ciphers: %s\n' % self.disable_cipher + + ret += '\tPath: %s' % self._radio.path + + ret += '\n' + + return ret + +class HostapdInstance: + ''' + A single instance of hostapd. In reality all hostapd instances + are started as a single process. This class just makes things + convenient for communicating with one of the hostapd APs. + ''' + def __init__(self, config, radio): + self.radio = radio + self.config = config + self.cli = None + + self.intf = radio.create_interface(self.config, 'hostapd') + self.intf.set_interface_state('up') + + def __del__(self): + print("Removing HostapdInstance %s" % self.config) + self.intf.set_interface_state('down') + self.radio = None + self.intf = None + + def __str__(self): + ret = 'Hostapd (%s)\n' % self.intf.name + ret += '\tConfig: %s\n' % self.config + + return ret + +class Hostapd: + ''' + A set of running hostapd instances. This is really just a single + process since hostapd can be started with multiple config files. + ''' + def __init__(self, ctx, radios, configs, radius): + self.ctx = ctx + + if len(configs) != len(radios): + raise Exception("Config (%d) and radio (%d) list length not equal" % \ + (len(configs), len(radios))) + + print("Initializing hostapd instances") + + Process(['ip', 'link', 'set', 'eth0', 'up']).wait() + Process(['ip', 'link', 'set', 'eth1', 'up']).wait() + + self.global_ctrl_iface = '/var/run/hostapd/ctrl' + + self.instances = [HostapdInstance(c, r) for c, r in zip(configs, radios)] + + ifaces = [rad.interface.name for rad in radios] + ifaces = ','.join(ifaces) + + args = ['hostapd', '-g', self.global_ctrl_iface] + + if ifaces: + args.extend(['-i', ifaces]) + + # + # Config files should already be present in /tmp. This appends + # ctrl_interface and does any variable replacement. Currently + # this is just any $ifaceN occurrences. + # + for c in configs: + full_path = '/tmp/%s' % c + args.append(full_path) + + self._rewrite_config(full_path) + + if radius: + args.append(radius) + + if ctx.is_verbose('hostapd'): + args.append('-d') + + self.process = Process(args) + + self.process.wait_for_socket(self.global_ctrl_iface, 30) + + for hapd in self.instances: + self.process.wait_for_socket(hapd.intf.ctrl_interface, 30) + + def attach_cli(self): + global config + + for hapd in self.instances: + hapd.cli = config.hostapd.HostapdCLI(config=hapd.config) + + def _rewrite_config(self, config): + ''' + Replaces any $ifaceN values with the correct interface + names as well as appends the ctrl_interface path to + the config file. + ''' + with open(config, 'r+') as f: + data = f.read() + to_replace = [] + for match in re.finditer(r'\$iface[0-9]+', data): + tag = data[match.start():match.end()] + idx = tag.split('iface')[1] + + to_replace.append((tag, self.instances[int(idx)].intf.name)) + + for r in to_replace: + data = data.replace(r[0], r[1], 1) + + data += '\nctrl_interface=/var/run/hostapd\n' + + f.write(data) + + def __getitem__(self, config): + if not config: + return self.instances[0] + + for hapd in self.instances: + if hapd.config == config: + return hapd + + return None + + def __del__(self): + print("Removing Hostapd") + try: + os.remove(self.global_ctrl_iface) + except: + print("Failed to remove %s" % self.global_ctrl_iface) + + self.instances = None + + # Hostapd may have already been stopped + if self.process: + self.ctx.stop_process(self.process) + + self.ctx = None + + # Hostapd creates simdb sockets for EAP-SIM/AKA tests but does not + # clean them up. + for f in glob("/tmp/eap_sim_db*"): + os.remove(f) + +dbus_count = 0 + +class Namespace: + def __init__(self, args, name, radios): + self.dbus_address = None + self.name = name + self.radios = radios + self.args = args + + Process(['ip', 'netns', 'add', name]).wait() + for r in radios: + Process(['iw', 'phy', r.name, 'set', 'netns', 'name', name]).wait() + + self.start_dbus() + + def reset(self): + self._bus = None + + for r in self.radios: + r._radio = None + + self.radios = [] + + Process.kill_all() + + def __del__(self): + if self.name: + print("Removing namespace %s" % self.name) + + Process(['ip', 'netns', 'del', self.name]).wait() + + def get_bus(self): + return self._bus + + def start_process(self, args, env=None, **kwargs): + if not env: + env = os.environ.copy() + + if hasattr(self, "dbus_address"): + # In case this process needs DBus... + env['DBUS_SYSTEM_BUS_ADDRESS'] = self.dbus_address + + return Process(args, namespace=self.name, env=env, **kwargs) + + def stop_process(self, p, force=False): + p.kill(force) + + def is_process_running(self, process): + for p in Process.get_all(): + if p.namespace == self.name and p.args[0] == process: + return True + return False + + def _cleanup_dbus(self): + try: + os.remove(self.dbus_address.split('=')[1]) + except: + pass + + os.remove(self.dbus_cfg) + + def start_dbus(self): + global dbus_count + + self.dbus_address = 'unix:path=/tmp/dbus%d' % dbus_count + self.dbus_cfg = '/tmp/dbus%d.conf' % dbus_count + dbus_count += 1 + + with open(self.dbus_cfg, 'w+') as f: + f.write(dbus_config) + f.write('%s\n' % self.dbus_address) + f.write('\n') + + p = self.start_process(['dbus-daemon', '--config-file=%s' % self.dbus_cfg], + cleanup=self._cleanup_dbus) + + p.wait_for_socket(self.dbus_address.split('=')[1], 5) + + self._bus = dbus.bus.BusConnection(address_or_type=self.dbus_address) + + def start_iwd(self, config_dir = '/tmp', storage_dir = '/tmp/iwd'): + args = [] + iwd_radios = ','.join([r.name for r in self.radios if r.use == 'iwd']) + + if self.args.valgrind: + args.extend(['valgrind', '--leak-check=full', '--track-origins=yes', + '--show-leak-kinds=all', + '--log-file=/tmp/valgrind.log.%p']) + + args.extend(['iwd', '-E']) + + if iwd_radios != '': + args.extend(['-p', iwd_radios]) + + if self.is_verbose(args[0]): + args.append('-d') + + env = os.environ.copy() + + env['CONFIGURATION_DIRECTORY'] = config_dir + env['STATE_DIRECTORY'] = storage_dir + + if self.is_verbose('iwd-dhcp'): + env['IWD_DHCP_DEBUG'] = '1' + + if self.is_verbose('iwd-tls'): + env['IWD_TLS_DEBUG'] = '1' + + if self.is_verbose('iwd-acd'): + env['IWD_ACD_DEBUG'] = '1' + + return self.start_process(args, env=env) + + def is_verbose(self, process, log=True): + process = os.path.basename(process) + + if self.args is None: + return False + + # every process is verbose when logging is enabled + if log and self.args.log: + return True + + if process in self.args.verbose: + return True + + # Special case here to enable verbose output with valgrind running + if process == 'valgrind' and 'iwd' in self.args.verbose: + return True + + # Handle any glob matches + for item in self.args.verbose: + if process in glob(item): + return True + + return False + + @staticmethod + def non_block_wait(func, timeout, *args, exception=True): + ''' + Convenience function for waiting in a non blocking + manor using GLibs context iteration i.e. does not block + the main loop while waiting. + + 'func' will be called at least once and repeatedly until + either it returns success, throws an exception, or the + 'timeout' expires. + + 'timeout' is the ultimate timeout in seconds + + '*args' will be passed to 'func' + + If 'exception' is an Exception type it will be raised. + If 'exception' is True a generic TimeoutError will be raised. + Any other value will not result in an exception. + ''' + # Simple class for signaling the wait timeout + class Bool: + def __init__(self, value): + self.value = value + + def wait_timeout_cb(done): + done.value = True + return False + + mainloop = GLib.MainLoop() + done = Bool(False) + + timeout = GLib.timeout_add_seconds(timeout, wait_timeout_cb, done) + context = mainloop.get_context() + + while True: + context.iteration(may_block=False) + + try: + ret = func(*args) + if ret: + if not done.value: + GLib.source_remove(timeout) + return ret + except Exception as e: + if not done.value: + GLib.source_remove(timeout) + raise e + + sleep(0.1) + + if done.value == True: + if isinstance(exception, Exception): + raise exception + elif type(exception) == bool and exception: + raise TimeoutError("Timeout on non_block_wait") + else: + return + + def __str__(self): + ret = 'Namespace: %s\n' % self.name + ret += 'Processes:\n' + for p in Process.get_all(): + ret += '\t%s' % str(p) + + ret += 'Radios:\n' + if len(self.radios) > 0: + for r in self.radios: + ret += '\t%s\n' % str(r) + else: + ret += '\tNo Radios\n' + + ret += 'DBus Address: %s\n' % self.dbus_address + ret += '===================================================\n\n' + + return ret + +class BarChart(): + def __init__(self, height=10, max_width=80): + self._height = height + self._max_width = max_width + self._values = [] + self._max_value = 0 + self._min_value = 0 + + def add_value(self, value): + if len(self._values) == 0: + self._max_value = int(1.01 * value) + self._min_value = int(0.99 * value) + elif value > self._max_value: + self._max_value = int(1.01 * value) + elif value < self._min_value: + self._min_value = int(0.99 * value) + + self._values.append(value) + + def _value_to_stars(self, value): + # Need to scale value (range of min_value -> max_value) to + # a range of 0 -> height + # + # Scaled = ((value - min_value) / ( max_value - min_value)) * (Height - 0) + 0 + + return int(((value - self._min_value) / + (self._max_value - self._min_value)) * self._height) + + def __str__(self): + # Need to map value from range 0 - self._height + ret = '' + + for i, value in enumerate(self._values): + stars = self._value_to_stars(value) + ret += '[%3u] ' % i + '%-10s' % ('*' * stars) + '\t\t\t%d\n' % value + + ret += '\n' + + return ret + + +class TestContext(Namespace): + ''' + Contains all information for a given set of tests being run + such as processes, radios, interfaces and test results. + ''' + def __init__(self, args): + self.name = None + self.args = args + self.hw_config = None + self.hostapd = None + self.wpas_interfaces = None + self.cur_radio_id = 0 + self.cur_iface_id = 0 + self.radios = [] + self.loopback_started = False + self.results = {} + self.mainloop = GLib.MainLoop() + self.namespaces = [] + self._last_mem_available = 0 + self._mem_chart = BarChart() + + def start_dbus_monitor(self): + if not self.is_verbose('dbus-monitor'): + return + + self.start_process(['dbus-monitor', '--address', self.dbus_address]) + + def start_haveged(self): + self.start_process(['haveged', '-F']) + + def create_radios(self): + setup = self.hw_config['SETUP'] + nradios = int(setup['num_radios']) + args = ['hwsim'] + + if self.hw_config['SETUP'].get('hwsim_medium', 'no') in ['no', '0', 'false']: + # register hwsim as medium + args.extend(['--no-register']) + + self.start_process(args) + self.non_block_wait(self._bus.name_has_owner, 20, 'net.connman.hwsim', + exception=TimeoutError('net.connman.hwsim did not appear')) + + for i in range(nradios): + name = 'rad%u' % i + + # Get any [radX] sections. These are for configuring + # any special radios. This no longer requires a + # radio_conf list, we just assume radios start rad0 + # and increment. + rad_config = None + if self.hw_config.has_section(name): + rad_config = self.hw_config[name] + + self.radios.append(VirtualRadio(name, rad_config)) + self.cur_radio_id += 1 + + def discover_radios(self): + import pyroute2 + + phys = [] + + try: + iw = pyroute2.iwutil.IW() + except: + iw = pyroute2.IW() + + attrs = [phy['attrs'] for phy in iw.list_wiphy()] + + for attr in attrs: + for key, value in attr: + if key == 'NL80211_ATTR_WIPHY_NAME': + if value not in phys: + phys.append(value) + break + + print('Discovered radios: %s' % str(phys)) + self.radios = [Radio(name) for name in phys] + + def start_radios(self): + reg_domain = self.hw_config['SETUP'].get('reg_domain', None) + if reg_domain: + Process(['iw', 'reg', 'set', reg_domain]).wait() + + if self.args.hw: + self.discover_radios() + else: + self.create_radios() + + def start_hostapd(self): + if not 'HOSTAPD' in self.hw_config: + return + + settings = self.hw_config['HOSTAPD'] + + if self.args.hw: + # Just grab the first N radios. It gets rather + # complicated trying to map radX radios specified in + # hw.conf so any passed through physical adapters are + # just given to hostapd/IWD as they appear during + # discovery. + # + # TODO: It may be desireable to map PCI/USB adapters to + # specific radX radios specified in the config but + # there are really 2 separate use cases here. + # 1. You want to test a *specific* radio with IWD + # or hostapd. For this you would want radX + # to map to a specific radio + # 2. You have many adapters in use to run multiple + # tests. In this case you would not care what + # was using each radio, just that there was + # enough to run all tests. + nradios = 0 + for k, _ in settings.items(): + if k == 'radius_server': + continue + nradios += 1 + + hapd_radios = self.radios[:nradios] + + else: + hapd_radios = [rad for rad in self.radios if rad.name in settings] + + hapd_configs = [conf for rad, conf in settings.items() if rad != 'radius_server'] + + radius_config = settings.get('radius_server', None) + + self.hostapd = Hostapd(self, hapd_radios, hapd_configs, radius_config) + self.hostapd.attach_cli() + + def get_frequencies(self): + frequencies = [] + + for hapd in self.hostapd.instances: + frequencies.append(hapd.cli.frequency) + + return frequencies + + def start_wpas_interfaces(self): + + if 'WPA_SUPPLICANT' not in self.hw_config: + return + + settings = self.hw_config['WPA_SUPPLICANT'] + + if self.args.hw: + nradios = len(settings.items()) + + wpas_radios = self.radios[:nradios] + self.wpas_interfaces = [] + + # + # Physical radios most likely will use a different name + # than 'rad#' but the config file is referenced by these + # 'rad#' names. Iterate through both the settings and + # physical radios to create interfaces associated with + # each config file. + # + for vrad, hwrad in zip(settings.items(), wpas_radios): + self.wpas_interfaces.append(hwrad.create_interface(vrad[1], 'wpas')) + + else: + wpas_radios = [rad for rad in self.radios if rad.name in settings] + self.wpas_interfaces = [rad.create_interface(settings[rad.name], 'wpas') \ + for rad in wpas_radios] + + def start_ofono(self): + sim_keys = self.hw_config['SETUP'].get('sim_keys', None) + if not sim_keys: + print("Ofono not requred") + return + elif sim_keys != 'ofono': + os.environ['IWD_SIM_KEYS'] = sim_keys + return + + if not find_binary(['ofonod']) or not find_binary(['phonesim']): + print("Ofono or Phonesim not found, skipping test") + return + + Process(['ip', 'link', 'set', 'lo', 'up']).wait() + + os.environ['OFONO_PHONESIM_CONFIG'] = '/tmp/phonesim.conf' + + phonesim_args = ['phonesim', '-p', '12345', '/usr/share/phonesim/default.xml'] + + self.start_process(phonesim_args) + + # + # TODO: + # Is there something to wait for? Without this phonesim rejects + # connections on all but the fist test. + # + time.sleep(3) + + ofono_args = ['ofonod', '-n', '--plugin=atmodem,phonesim'] + if self.is_verbose('ofonod'): + ofono_args.append('-d') + + self.start_process(ofono_args) + + print("Ofono started") + + def create_namespaces(self): + if not self.hw_config.has_section('NameSpaces'): + return + + for key, value in self.hw_config.items('NameSpaces'): + radio_names = value.split(',') + # Gather up radio objects for this namespace + radios = [rad for rad in self.radios if rad.name in radio_names] + + # Remove radios from 'root' namespace + self.radios = list(set(self.radios) - set(radios)) + + self.namespaces.append(Namespace(self.args, key, radios)) + + def get_namespace(self, ns): + for n in self.namespaces: + if n.name == ns: + return n + + return None + + def stop_test_processes(self): + for n in self.namespaces: + n.reset() + + self.namespaces = [] + self.hostapd = None + self.wpas_interfaces = None + + self.reset() + + def meminfo_to_dict(self): + def removesuffix(string, suffix): + if string.endswith(suffix): + return string[:-len(suffix)] + return string + + ret = {} + + with open('/proc/meminfo', 'r') as f: + data = f.read().strip().split('\n') + + for l in data: + entry = l.split(':') + ret[entry[0]] = int(removesuffix(entry[1], 'kB')) + + return ret + + def __str__(self): + ret = 'Arguments:\n' + for arg in vars(self.args): + ret += '\t --%s %s\n' % (arg, str(getattr(self.args, arg))) + + ret += 'Hostapd:\n' + if self.hostapd: + for h in self.hostapd.instances: + ret += '\t%s\n' % str(h) + else: + ret += '\tNo Hostapd instances\n' + + info = self.meminfo_to_dict() + self._mem_chart.add_value(info['MemAvailable']) + + ret += 'Available Memory: %u kB\n' % info['MemAvailable'] + ret += 'Last Test Delta: %+d kB\n' % (info['MemAvailable'] - self._last_mem_available) + ret += 'Per-test Usage:\n' + ret += str(self._mem_chart) + + self._last_mem_available = info['MemAvailable'] + + ret += super().__str__() + + for n in self.namespaces: + ret += n.__str__() + + return ret + +def build_unit_list(args): + ''' + Build list of unit tests based on passed arguments. This first + checks for literal names provided in the arguments, then if + no matches were found, checks for a glob match. + ''' + tests = [] + test_root = args.testhome + '/unit' + + for unit in args.unit_tests.split(','): + path = '%s/%s' % (test_root, unit) + if os.access(unit, os.X_OK): + tests.append(unit) + elif os.access(path, os.X_OK): + tests.append(path) + else: + # Full list or glob, first build up valid list of tests + matches = glob(path) + if matches == []: + raise Exception("Could not find test %s" % unit) + + matches = [exe for exe in matches if os.access(exe, os.X_OK)] + + tests.extend(matches) + + return sorted(tests) + +def build_test_list(args): + ''' + Build list of auto test directories based on passed arguments. + First check for absolute paths, then look in /autotests, + then glob match. + ''' + tests = [] + test_root = args.testhome + '/autotests' + + # Run all tests + if not args.autotests: + # Get list of all autotests (committed in git) + tests = os.popen('git -C %s ls-files autotests/ | cut -f2 -d"/" \ + | grep "test*" | uniq' % args.testhome).read() \ + .strip().split('\n') + tests = [test_root + '/' + t for t in tests] + else: + print("Generating partial test list") + + full_list = sorted(os.listdir(test_root)) + + for t in args.autotests.split(','): + path = '%s/%s' % (test_root, t) + if t.endswith('+'): + t = t.split('+')[0] + i = full_list.index(t) + + tests = [test_root + '/' + x for x in full_list[i:] \ + if x.startswith('test')] + elif os.path.exists(t): + if t not in tests: + tests.append(t) + elif os.path.exists(path): + if path not in tests: + tests.append(path) + else: + matches = glob(path) + if matches == []: + raise Exception("Could not find test %s" % t) + + tests.extend(list(set(matches) - set(tests))) + + return sorted(tests) + +SimpleResult = namedtuple('SimpleResult', 'run failures errors skipped time') + +def start_test(ctx, subtests, rqueue): + ''' + Run an individual test. 'subtests' are parsed prior to calling + but these effectively make up a single test. 'rqueue' is the + results queue which is required since this is using + multiprocessing. + ''' + run = 0 + errors = 0 + failures = 0 + skipped = 0 + + start = time.time() + # + # Iterate through each individual python test. + # + for s in subtests: + loader = unittest.TestLoader() + try: + module = importlib.import_module(os.path.splitext(s)[0]) + except OSError as e: + dbg(subprocess.check_output("cat /proc/buddyinfo", shell=True).decode('utf-8')) + dbg(subprocess.check_output("dmesg | tail -80", shell=True).decode('utf-8')) + print(ctx) + raise e + + subtest = loader.loadTestsFromModule(module) + + # The test suite is being (ab)used to get a bit more granularity + # with individual tests. The 'normal' way to use unittest is to + # just create a test suite and run them. The problem here is that + # test results are queued and printed at the very end so its + # difficult to know *where* a test failed (python gives a stack + # trace but printing the exception/failure immediately shows + # where in the debug logs something failed). Moreso if there are + # several test functions inside a single python file they run + # as a single test and it is difficult (again) to know where + # something failed. + + # Iterating through each python test file + for test in subtest: + limit_funcs = [] + + if ctx.args.sub_tests: + for i in ctx.args.sub_tests: + if len(i.split('.')) == 2: + limit_funcs.append(i.split('.')[1]) + + # Iterating through individual test functions inside a + # Test() class. Due to the nature of unittest we have + # to jump through some hoops to set up the test class + # only once by turning the enumeration into a list, then + # enumerating (again) to keep track of the index (just + # enumerating the test class doesn't allow len() because + # it is not a list). + tlist = list(enumerate(test)) + for index, t in enumerate(tlist): + # enumerate is returning a tuple, index 1 is our + # actual object. + t = t[1] + + func, file = str(t).split(' ') + # + # TODO: There may be a better way of doing this + # but strigifying the test class gives us a string: + # (.) + # + file = file.strip('()').split('.')[0] + '.py' + + # Create an empty result here in case the test fails + result = TestResult() + + try: + skip = len(limit_funcs) > 0 and func not in limit_funcs + + # Set up class only on first test + if index == 0: + if not skip: + dbg("%s\n\t%s RUNNING" % (file, str(func)), end='') + t.setUpClass() + else: + if not skip: + dbg("\t%s RUNNING" % str(func), end='') + + sys.__stdout__.flush() + + Process.write_separators("\n====== %s:%s ======\n\n" % (file, func)) + + if not skip: + # Run test (setUp/tearDown run automatically) + result = t() + + # Tear down class only on last test + if index == len(tlist) - 1: + t.tearDownClass() + + if skip: + continue + except unittest.SkipTest as e: + result.skipped.append(t) + except Exception as e: + dbg('\n%s threw an uncaught exception:' % func) + traceback.print_exc(file=sys.__stdout__) + + run += result.testsRun + errors += len(result.errors) + failures += len(result.failures) + skipped += len(result.skipped) + + if len(result.skipped) > 0: + dbg(colored(" SKIPPED", "cyan")) + elif run == 0 or len(result.errors) > 0 or len(result.failures) > 0: + dbg(colored(" FAILED", "red")) + for e in result.errors: + dbg(e[1]) + for f in result.failures: + dbg(f[1]) + else: + dbg(colored(" PASSED", "green")) + + # Prevents future test modules with the same name (e.g. + # connection_test.py) from being loaded from the cache + sys.modules.pop(module.__name__) + + # + # The multiprocessing queue is picky with what objects it will serialize + # and send between processes. Because of this we put the important bits + # of the result into our own 'SimpleResult' tuple. + # + sresult = SimpleResult(run=run, failures=failures, errors=errors, + skipped=skipped, time=time.time() - start) + rqueue.put(sresult) + + # This may not be required since we are manually popping sys.modules + importlib.invalidate_caches() + +def pre_test(ctx, test, copied): + ''' + Copy test files, start processes, and any other pre test work. + ''' + os.chdir(test) + + dbg("\nStarting %s" % colored(os.path.basename(test), "white", attrs=['bold'])) + if not os.path.exists(test + '/hw.conf'): + raise Exception("No hw.conf found for %s" % test) + + ctx.hw_config = ConfigParser() + ctx.hw_config.read(test + '/hw.conf') + # + # We have two types of test files: tests and everything else. Rather + # than require each test to specify the files needing to be copied to + # /tmp (previously 'tmpfs_extra_stuff'), we just copy everything which + # isn't a test. There is really no reason not to do this as any file + # present in a test directory should be needed by the test. + # + # All files + files = os.listdir(test) + # Tests (starts or ends with 'test') + subtests = [f for f in files if f.startswith('test') or \ + os.path.splitext(f)[0].endswith('test')] + # Everything else (except .py files) + to_copy = [f for f in list(set(files) - set(subtests)) if not f.endswith('.py') \ + and f != '__pycache__'] + for f in to_copy: + if os.path.isdir(f): + shutil.copytree(f, '/tmp/' + f) + else: + shutil.copy(f, '/tmp') + copied.append(f) + + # Prune down any subtests if needed + if ctx.args.sub_tests: + ctx.args.sub_tests = ctx.args.sub_tests.split(',') + + to_run = [x.split('.')[0] for x in ctx.args.sub_tests] + pruned = [] + + for s in subtests: + no_ext = s + # Handle . format + if '.' in s: + no_ext = s.split('.')[0] + + if no_ext in to_run: + pruned.append(no_ext + '.py') + + subtests = pruned + + if ctx.args.log: + ctx.start_process(['iwmon', '--nowiphy']) + elif ctx.args.monitor: + ctx.start_process(['iwmon'], outfile=ctx.args.monitor) + + ctx.start_dbus() + ctx.start_haveged() + ctx.start_dbus_monitor() + ctx.start_radios() + ctx.create_namespaces() + ctx.start_hostapd() + ctx.start_wpas_interfaces() + ctx.start_ofono() + + if ctx.hw_config.has_option('SETUP', 'start_iwd'): + start = ctx.hw_config.getboolean('SETUP', 'start_iwd') + else: + start = True + + if start: + ctx.start_iwd() + else: + print("Not starting IWD from test-runner") + + print(ctx) + + sys.path.insert(1, test) + + return sorted(subtests) + +def post_test(ctx, to_copy): + ''' + Remove copied files, and stop test processes. + ''' + try: + for f in to_copy: + if os.path.isdir('/tmp/' + f): + shutil.rmtree('/tmp/' + f) + else: + os.remove('/tmp/' + f) + + Process(['ip', 'link', 'set', 'lo', 'down']).wait() + except Exception as e: + print("Exception thrown in post_test") + finally: + ctx.stop_test_processes() + + if ctx.args.valgrind: + for f in os.listdir('/tmp'): + if f.startswith("valgrind.log."): + dbg(f) + with open('/tmp/' + f, 'r') as v: + dbg(v.read()) + dbg("\n") + os.remove('/tmp/' + f) + + # Special case for when logging is enabled + if os.path.isfile('/tmp/iwd-tls-debug-server-cert.pem'): + os.remove('/tmp/iwd-tls-debug-server-cert.pem') + + allowed = ['phonesim.conf', 'certs', 'secrets', 'iwd'] + for f in [f for f in os.listdir('/tmp') if f not in allowed]: + dbg("File %s was not cleaned up!" % f) + try: + os.remove('/tmp/' + f) + except: + pass + +def print_results(results): + table = PrettyTable(['Test', colored('Passed', 'green'), colored('Failed', 'red'), \ + colored('Skipped', 'cyan'), colored('Time', 'yellow')]) + + total_pass = 0 + total_fail = 0 + total_skip = 0 + total_time = 0 + + for test, result in results.items(): + + if result.time == TEST_MAX_TIMEOUT: + failed = "Timed out" + passed = "Timed out" + elif result.time == 0: + failed = "Exception" + passed = "Exception" + else: + failed = result.failures + result.errors + passed = result.run - failed + + total_pass += passed + total_fail += failed + total_skip += result.skipped + + total_time += result.time + + time = '%.2f' % result.time + + table.add_row([test, colored(passed, 'green'), colored(failed, 'red'), \ + colored(result.skipped, 'cyan'), colored(time, 'yellow')]) + + total_time = '%.2f' % total_time + + table.add_row(['Total', colored(total_pass, 'green'), colored(total_fail, 'red'), \ + colored(total_skip, 'cyan'), colored(total_time, 'yellow')]) + + dbg(table) + + return total_fail == 0 + +def run_auto_tests(ctx, args): + tests = build_test_list(args) + + for test in tests: + copied = [] + try: + subtests = pre_test(ctx, test, copied) + + if len(subtests) < 1: + dbg("No tests to run") + sys.exit() + + rqueue = multiprocessing.Queue() + p = multiprocessing.Process(target=start_test, args=(ctx, subtests, rqueue)) + p.start() + # Rather than time each subtest we just time the total but + # mutiply the default time by the number of tests being run. + p.join(TEST_MAX_TIMEOUT * len(subtests)) + + if p.is_alive(): + # Timeout + p.terminate() + + ctx.results[os.path.basename(test)] = SimpleResult(run=0, + failures=0, errors=0, + skipped=0, time=TEST_MAX_TIMEOUT) + else: + ctx.results[os.path.basename(test)] = rqueue.get() + + except Exception as ex: + dbg("%s threw an uncaught exception" % test) + traceback.print_exc(file=sys.__stdout__) + ctx.results[os.path.basename(test)] = SimpleResult(run=0, failures=0, + errors=0, skipped=0, time=0) + finally: + post_test(ctx, copied) + +def run_unit_tests(ctx, args): + os.chdir(args.testhome + '/unit') + units = build_unit_list(args) + + for u in units: + p = ctx.start_process([u]).wait() + if p.returncode != 0: + dbg("Unit test %s failed" % os.path.basename(u)) + else: + dbg("Unit test %s passed" % os.path.basename(u)) + +def run_tests(args): + global config + + os.chdir(args.testhome) + + # + # This allows all autotest utils (iwd/hostapd/etc) to access the + # TestContext. Any other module or script (in the same interpreter) can + # simply import config.ctx and access all live test information, + # start/stop processes, see active radios etc. + # + config = importlib.import_module('config') + config.ctx = TestContext(args) + + # Must import these after config so ctx gets set + config.hwsim = importlib.import_module('hwsim') + config.hostapd = importlib.import_module('hostapd') + + # Start writing out kernel log + config.ctx.start_process(["dmesg", '--follow']) + + if args.unit_tests is None: + run_auto_tests(config.ctx, args) + else: + run_unit_tests(config.ctx, args) + +runner = Runner() + +atexit.register(exit_vm) +runner.prepare_environment() +run_tests(runner.args) +runner.cleanup_environment() diff --git a/tools/test-runner b/tools/test-runner index c6f7f550..e663108d 100755 --- a/tools/test-runner +++ b/tools/test-runner @@ -1,1528 +1,5 @@ #!/usr/bin/python3 -import os -import shutil -import fcntl -import sys -import subprocess -import atexit -import time -import unittest -import importlib -from unittest.result import TestResult -import multiprocessing -import re -import traceback - from runner import Runner -from configparser import ConfigParser -from prettytable import PrettyTable -from termcolor import colored -from glob import glob -from collections import namedtuple -from time import sleep -import dbus.mainloop.glib -from gi.repository import GLib -from weakref import WeakValueDictionary - -config = None -intf_id = 0 - -TEST_MAX_TIMEOUT = 240 - -dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - -def dbg(*s, **kwargs): - ''' - Allows prints if stdout has been re-directed - ''' - print(*s, **kwargs, file=sys.__stdout__) - -def exit_vm(): - if config: - for p in Process.get_all(): - print("Process %s still running!" % p.args[0]) - p.kill() - - if config.ctx and config.ctx.results: - success = print_results(config.ctx.results) - else: - success = False - - if config.ctx and config.ctx.args.result: - result = 'PASS' if success else 'FAIL' - with open(config.ctx.args.result, 'w') as f: - f.write(result) - - os.sync() - - runner.stop() - -def path_exists(path): - ''' - Searches PATH as well as absolute paths. - ''' - if shutil.which(path): - return True - try: - os.stat(path) - except: - return False - return True - -def find_binary(list): - ''' - Returns a binary from 'list' if its found in PATH or on a - valid absolute path. - ''' - for path in list: - if path_exists(path): - return path - return None - -# Partial DBus config. The remainder () will be filled in for each -# namespace that is created so each individual dbus-daemon has its own socket -# and address. -dbus_config = ''' - - -system -2147483647 -ANONYMOUS - - - - - - - - - - - - - - - -''' - -class Process(subprocess.Popen): - processes = WeakValueDictionary() - ctx = None - - def __new__(cls, *args, **kwargs): - obj = super().__new__(cls) - cls.processes[id(obj)] = obj - return obj - - def __init__(self, args, namespace=None, outfile=None, env=None, check=False, cleanup=None): - self.write_fds = [] - self.io_watch = None - self.cleanup = cleanup - self.verbose = False - self.out = '' - self.hup = False - self.killed = False - self.namespace = namespace - - if not self.ctx: - global config - self.ctx = config.ctx - - if self.ctx.is_verbose(args[0], log=False): - self.verbose = True - - if namespace: - args = ['ip', 'netns', 'exec', namespace] + args - - if outfile: - # outfile is only used by iwmon, in which case we don't want - # to append to an existing file. - self._append_outfile(outfile, append=False) - - if self.ctx.args.log: - logfile = '%s/%s/%s' % (self.ctx.args.log, - os.path.basename(os.getcwd()), - args[0]) - self._append_outfile(logfile) - - super().__init__(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - env=env, cwd=os.getcwd()) - - # Set as non-blocking so read() in the IO callback doesn't block forever - fl = fcntl.fcntl(self.stdout, fcntl.F_GETFL) - fcntl.fcntl(self.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) - - self.io_watch = GLib.io_add_watch(self.stdout, GLib.IO_IN | - GLib.IO_HUP | GLib.IO_ERR, self.process_io) - - print("Starting process {}".format(self.args)) - - if check: - self.wait(10) - self.killed = True - if self.returncode != 0: - raise subprocess.CalledProcessError(returncode=self.returncode, - cmd=args) - - @classmethod - def get_all(cls): - return cls.processes.values() - - @classmethod - def kill_all(cls): - for p in cls.processes.values(): - p.kill() - - @staticmethod - def _write_io(instance, data, stdout=True): - for f in instance.write_fds: - f.write(data) - - # Write out a separator so multiple process calls per - # test are easer to read. - if instance.hup: - f.write("Terminated: {}\n\n".format(instance.args)) - - f.flush() - - if instance.verbose and stdout: - sys.__stdout__.write(data) - sys.__stdout__.flush() - - @classmethod - def write_separators(cls, sep): - for proc in cls.processes.values(): - if proc.killed: - continue - - cls._write_io(proc, sep, stdout=False) - - def process_io(self, source, condition): - if condition & GLib.IO_HUP: - self.hup = True - - data = source.read() - - if not data: - return True - - data = data.decode('utf-8') - - # Save data away in case the caller needs it (e.g. list_sta) - self.out += data - - self._write_io(self, data) - - return True - - def _append_outfile(self, file, append=True): - dir = os.path.dirname(file) - - if self.ctx.args.log_gid: - gid = int(self.ctx.args.log_gid) - uid = int(self.ctx.args.log_uid) - - if not path_exists(dir): - os.mkdir(dir) - if self.ctx.args.log_gid: - os.chown(dir, uid, gid) - - file = os.path.join(dir,file) - - # If the out file exists, append. Useful for processes like - # hostapd_cli where it is called multiple times independently. - if os.path.isfile(file) and append: - mode = 'a' - else: - mode = 'w' - - try: - f = open(os.path.join(dir, file), mode) - except Exception as e: - traceback.print_exc() - sys.exit(0) - - if self.ctx.args.log_gid: - os.fchown(f.fileno(), uid, gid) - - self.write_fds.append(f) - - def wait_for_socket(self, socket, wait): - Namespace.non_block_wait(os.path.exists, wait, socket) - - # Wait for both process termination and HUP signal - def __wait(self, timeout): - try: - super().wait(timeout) - if not self.hup: - return False - - return True - except: - return False - - # Override wait() so it can do so non-blocking - def wait(self, timeout=10): - Namespace.non_block_wait(self.__wait, timeout, 1) - self._cleanup() - - def _cleanup(self): - if self.cleanup: - self.cleanup() - - self.write_fds = [] - - if self.io_watch: - GLib.source_remove(self.io_watch) - self.io_watch = None - - self.cleanup = None - self.killed = True - - # Override kill() - def kill(self, force=False): - if self.killed: - return - - print("Killing process {}".format(self.args)) - - if force: - super().kill() - else: - self.terminate() - - try: - self.wait(timeout=15) - except: - dbg("Process %s did not complete in 15 seconds!" % self.name) - super().kill() - - self._cleanup() - - def __str__(self): - return str(self.args) + '\n' - -class Interface: - def __init__(self, name, config): - self.name = name - self.ctrl_interface = '/var/run/hostapd/' + name - self.config = config - - def __del__(self): - Process(['iw', 'dev', self.name, 'del']).wait() - - def set_interface_state(self, state): - Process(['ip', 'link', 'set', self.name, state]).wait() - -class Radio: - def __init__(self, name): - self.name = name - # hostapd will reset this if this radio is used by it - self.use = 'iwd' - self.interface = None - - def __del__(self): - print("Removing radio %s" % self.name) - self.interface = None - - def create_interface(self, config, use): - global intf_id - - ifname = 'wln%s' % intf_id - - intf_id += 1 - - self.interface = Interface(ifname, config) - self.use = use - - Process(['iw', 'phy', self.name, 'interface', 'add', ifname, - 'type', 'managed']).wait() - - return self.interface - - def __str__(self): - ret = self.name + ':\n' - ret += '\tUsed By: %s ' % self.use - if self.interface: - ret += '(%s)' % self.interface.name - - ret += '\n' - - return ret - -class VirtualRadio(Radio): - ''' - A subclass of 'Radio' specific to mac80211_hwsim radios. - - TODO: Using D-Bus to create and destroy radios is more desireable - than the command line. - ''' - - def __init__(self, name, cfg=None): - global config - - self.disable_cipher = None - self.disable_iftype = None - - self.hwsim = config.hwsim.Hwsim() - - if cfg: - self.disable_iftype = cfg.get('iftype_disable', None) - self.disable_cipher = cfg.get('cipher_disable', None) - - self._radio = self.hwsim.radios.create(name, p2p_device=True, - iftype_disable=self.disable_iftype, - cipher_disable=self.disable_cipher) - - super().__init__(self._radio.name) - - def __del__(self): - super().__del__() - - # If the radio was moved into a namespace this will fail - try: - self._radio.remove() - except: - pass - - self._radio = None - - def __str__(self): - ret = super().__str__() - - if self.disable_iftype: - ret += '\tDisabled interface types: %s\n' % self.disable_iftype - - if self.disable_cipher: - ret += '\tDisabled ciphers: %s\n' % self.disable_cipher - - ret += '\tPath: %s' % self._radio.path - - ret += '\n' - - return ret - -class HostapdInstance: - ''' - A single instance of hostapd. In reality all hostapd instances - are started as a single process. This class just makes things - convenient for communicating with one of the hostapd APs. - ''' - def __init__(self, config, radio): - self.radio = radio - self.config = config - self.cli = None - - self.intf = radio.create_interface(self.config, 'hostapd') - self.intf.set_interface_state('up') - - def __del__(self): - print("Removing HostapdInstance %s" % self.config) - self.intf.set_interface_state('down') - self.radio = None - self.intf = None - - def __str__(self): - ret = 'Hostapd (%s)\n' % self.intf.name - ret += '\tConfig: %s\n' % self.config - - return ret - -class Hostapd: - ''' - A set of running hostapd instances. This is really just a single - process since hostapd can be started with multiple config files. - ''' - def __init__(self, ctx, radios, configs, radius): - self.ctx = ctx - - if len(configs) != len(radios): - raise Exception("Config (%d) and radio (%d) list length not equal" % \ - (len(configs), len(radios))) - - print("Initializing hostapd instances") - - Process(['ip', 'link', 'set', 'eth0', 'up']).wait() - Process(['ip', 'link', 'set', 'eth1', 'up']).wait() - - self.global_ctrl_iface = '/var/run/hostapd/ctrl' - - self.instances = [HostapdInstance(c, r) for c, r in zip(configs, radios)] - - ifaces = [rad.interface.name for rad in radios] - ifaces = ','.join(ifaces) - - args = ['hostapd', '-g', self.global_ctrl_iface] - - if ifaces: - args.extend(['-i', ifaces]) - - # - # Config files should already be present in /tmp. This appends - # ctrl_interface and does any variable replacement. Currently - # this is just any $ifaceN occurrences. - # - for c in configs: - full_path = '/tmp/%s' % c - args.append(full_path) - - self._rewrite_config(full_path) - - if radius: - args.append(radius) - - if ctx.is_verbose('hostapd'): - args.append('-d') - - self.process = Process(args) - - self.process.wait_for_socket(self.global_ctrl_iface, 30) - - for hapd in self.instances: - self.process.wait_for_socket(hapd.intf.ctrl_interface, 30) - - def attach_cli(self): - global config - - for hapd in self.instances: - hapd.cli = config.hostapd.HostapdCLI(config=hapd.config) - - def _rewrite_config(self, config): - ''' - Replaces any $ifaceN values with the correct interface - names as well as appends the ctrl_interface path to - the config file. - ''' - with open(config, 'r+') as f: - data = f.read() - to_replace = [] - for match in re.finditer(r'\$iface[0-9]+', data): - tag = data[match.start():match.end()] - idx = tag.split('iface')[1] - - to_replace.append((tag, self.instances[int(idx)].intf.name)) - - for r in to_replace: - data = data.replace(r[0], r[1], 1) - - data += '\nctrl_interface=/var/run/hostapd\n' - - f.write(data) - - def __getitem__(self, config): - if not config: - return self.instances[0] - - for hapd in self.instances: - if hapd.config == config: - return hapd - - return None - - def __del__(self): - print("Removing Hostapd") - try: - os.remove(self.global_ctrl_iface) - except: - print("Failed to remove %s" % self.global_ctrl_iface) - - self.instances = None - - # Hostapd may have already been stopped - if self.process: - self.ctx.stop_process(self.process) - - self.ctx = None - - # Hostapd creates simdb sockets for EAP-SIM/AKA tests but does not - # clean them up. - for f in glob("/tmp/eap_sim_db*"): - os.remove(f) - -dbus_count = 0 - -class Namespace: - def __init__(self, args, name, radios): - self.dbus_address = None - self.name = name - self.radios = radios - self.args = args - - Process(['ip', 'netns', 'add', name]).wait() - for r in radios: - Process(['iw', 'phy', r.name, 'set', 'netns', 'name', name]).wait() - - self.start_dbus() - - def reset(self): - self._bus = None - - for r in self.radios: - r._radio = None - - self.radios = [] - - Process.kill_all() - - def __del__(self): - print("Removing namespace %s" % self.name) - - Process(['ip', 'netns', 'del', self.name]).wait() - - def get_bus(self): - return self._bus - - def start_process(self, args, env=None, **kwargs): - if not env: - env = os.environ.copy() - - if hasattr(self, "dbus_address"): - # In case this process needs DBus... - env['DBUS_SYSTEM_BUS_ADDRESS'] = self.dbus_address - - return Process(args, namespace=self.name, env=env, **kwargs) - - def stop_process(self, p, force=False): - p.kill(force) - - def is_process_running(self, process): - for p in Process.get_all(): - if p.namespace == self.name and p.args[0] == process: - return True - return False - - def _cleanup_dbus(self): - try: - os.remove(self.dbus_address.split('=')[1]) - except: - pass - - os.remove(self.dbus_cfg) - - def start_dbus(self): - global dbus_count - - self.dbus_address = 'unix:path=/tmp/dbus%d' % dbus_count - self.dbus_cfg = '/tmp/dbus%d.conf' % dbus_count - dbus_count += 1 - - with open(self.dbus_cfg, 'w+') as f: - f.write(dbus_config) - f.write('%s\n' % self.dbus_address) - f.write('\n') - - p = self.start_process(['dbus-daemon', '--config-file=%s' % self.dbus_cfg], - cleanup=self._cleanup_dbus) - - p.wait_for_socket(self.dbus_address.split('=')[1], 5) - - self._bus = dbus.bus.BusConnection(address_or_type=self.dbus_address) - - def start_iwd(self, config_dir = '/tmp', storage_dir = '/tmp/iwd'): - args = [] - iwd_radios = ','.join([r.name for r in self.radios if r.use == 'iwd']) - - if self.args.valgrind: - args.extend(['valgrind', '--leak-check=full', '--track-origins=yes', - '--show-leak-kinds=all', - '--log-file=/tmp/valgrind.log.%p']) - - args.extend(['iwd', '-E']) - - if iwd_radios != '': - args.extend(['-p', iwd_radios]) - - if self.is_verbose(args[0]): - args.append('-d') - - env = os.environ.copy() - - env['CONFIGURATION_DIRECTORY'] = config_dir - env['STATE_DIRECTORY'] = storage_dir - - if self.is_verbose('iwd-dhcp'): - env['IWD_DHCP_DEBUG'] = '1' - - if self.is_verbose('iwd-tls'): - env['IWD_TLS_DEBUG'] = '1' - - if self.is_verbose('iwd-acd'): - env['IWD_ACD_DEBUG'] = '1' - - return self.start_process(args, env=env) - - def is_verbose(self, process, log=True): - process = os.path.basename(process) - - if self.args is None: - return False - - # every process is verbose when logging is enabled - if log and self.args.log: - return True - - if process in self.args.verbose: - return True - - # Special case here to enable verbose output with valgrind running - if process == 'valgrind' and 'iwd' in self.args.verbose: - return True - - # Handle any glob matches - for item in self.args.verbose: - if process in glob(item): - return True - - return False - - @staticmethod - def non_block_wait(func, timeout, *args, exception=True): - ''' - Convenience function for waiting in a non blocking - manor using GLibs context iteration i.e. does not block - the main loop while waiting. - - 'func' will be called at least once and repeatedly until - either it returns success, throws an exception, or the - 'timeout' expires. - - 'timeout' is the ultimate timeout in seconds - - '*args' will be passed to 'func' - - If 'exception' is an Exception type it will be raised. - If 'exception' is True a generic TimeoutError will be raised. - Any other value will not result in an exception. - ''' - # Simple class for signaling the wait timeout - class Bool: - def __init__(self, value): - self.value = value - - def wait_timeout_cb(done): - done.value = True - return False - - mainloop = GLib.MainLoop() - done = Bool(False) - - timeout = GLib.timeout_add_seconds(timeout, wait_timeout_cb, done) - context = mainloop.get_context() - - while True: - context.iteration(may_block=False) - - try: - ret = func(*args) - if ret: - if not done.value: - GLib.source_remove(timeout) - return ret - except Exception as e: - if not done.value: - GLib.source_remove(timeout) - raise e - - sleep(0.1) - - if done.value == True: - if isinstance(exception, Exception): - raise exception - elif type(exception) == bool and exception: - raise TimeoutError("Timeout on non_block_wait") - else: - return - - def __str__(self): - ret = 'Namespace: %s\n' % self.name - ret += 'Processes:\n' - for p in Process.get_all(): - ret += '\t%s' % str(p) - - ret += 'Radios:\n' - if len(self.radios) > 0: - for r in self.radios: - ret += '\t%s\n' % str(r) - else: - ret += '\tNo Radios\n' - - ret += 'DBus Address: %s\n' % self.dbus_address - ret += '===================================================\n\n' - - return ret - -class BarChart(): - def __init__(self, height=10, max_width=80): - self._height = height - self._max_width = max_width - self._values = [] - self._max_value = 0 - self._min_value = 0 - - def add_value(self, value): - if len(self._values) == 0: - self._max_value = int(1.01 * value) - self._min_value = int(0.99 * value) - elif value > self._max_value: - self._max_value = int(1.01 * value) - elif value < self._min_value: - self._min_value = int(0.99 * value) - - self._values.append(value) - - def _value_to_stars(self, value): - # Need to scale value (range of min_value -> max_value) to - # a range of 0 -> height - # - # Scaled = ((value - min_value) / ( max_value - min_value)) * (Height - 0) + 0 - - return int(((value - self._min_value) / - (self._max_value - self._min_value)) * self._height) - - def __str__(self): - # Need to map value from range 0 - self._height - ret = '' - - for i, value in enumerate(self._values): - stars = self._value_to_stars(value) - ret += '[%3u] ' % i + '%-10s' % ('*' * stars) + '\t\t\t%d\n' % value - - ret += '\n' - - return ret - - -class TestContext(Namespace): - ''' - Contains all information for a given set of tests being run - such as processes, radios, interfaces and test results. - ''' - def __init__(self, args): - self.name = None - self.args = args - self.hw_config = None - self.hostapd = None - self.wpas_interfaces = None - self.cur_radio_id = 0 - self.cur_iface_id = 0 - self.radios = [] - self.loopback_started = False - self.results = {} - self.mainloop = GLib.MainLoop() - self.namespaces = [] - self._last_mem_available = 0 - self._mem_chart = BarChart() - - def start_dbus_monitor(self): - if not self.is_verbose('dbus-monitor'): - return - - self.start_process(['dbus-monitor', '--address', self.dbus_address]) - - def start_haveged(self): - self.start_process(['haveged', '-F']) - - def create_radios(self): - setup = self.hw_config['SETUP'] - nradios = int(setup['num_radios']) - args = ['hwsim'] - - if self.hw_config['SETUP'].get('hwsim_medium', 'no') in ['no', '0', 'false']: - # register hwsim as medium - args.extend(['--no-register']) - - self.start_process(args) - self.non_block_wait(self._bus.name_has_owner, 20, 'net.connman.hwsim', - exception=TimeoutError('net.connman.hwsim did not appear')) - - for i in range(nradios): - name = 'rad%u' % i - - # Get any [radX] sections. These are for configuring - # any special radios. This no longer requires a - # radio_conf list, we just assume radios start rad0 - # and increment. - rad_config = None - if self.hw_config.has_section(name): - rad_config = self.hw_config[name] - - self.radios.append(VirtualRadio(name, rad_config)) - self.cur_radio_id += 1 - - def discover_radios(self): - import pyroute2 - - phys = [] - - try: - iw = pyroute2.iwutil.IW() - except: - iw = pyroute2.IW() - - attrs = [phy['attrs'] for phy in iw.list_wiphy()] - - for attr in attrs: - for key, value in attr: - if key == 'NL80211_ATTR_WIPHY_NAME': - if value not in phys: - phys.append(value) - break - - print('Discovered radios: %s' % str(phys)) - self.radios = [Radio(name) for name in phys] - - def start_radios(self): - reg_domain = self.hw_config['SETUP'].get('reg_domain', None) - if reg_domain: - Process(['iw', 'reg', 'set', reg_domain]).wait() - - if self.args.hw: - self.discover_radios() - else: - self.create_radios() - - def start_hostapd(self): - if not 'HOSTAPD' in self.hw_config: - return - - settings = self.hw_config['HOSTAPD'] - - if self.args.hw: - # Just grab the first N radios. It gets rather - # complicated trying to map radX radios specified in - # hw.conf so any passed through physical adapters are - # just given to hostapd/IWD as they appear during - # discovery. - # - # TODO: It may be desireable to map PCI/USB adapters to - # specific radX radios specified in the config but - # there are really 2 separate use cases here. - # 1. You want to test a *specific* radio with IWD - # or hostapd. For this you would want radX - # to map to a specific radio - # 2. You have many adapters in use to run multiple - # tests. In this case you would not care what - # was using each radio, just that there was - # enough to run all tests. - nradios = 0 - for k, _ in settings.items(): - if k == 'radius_server': - continue - nradios += 1 - - hapd_radios = self.radios[:nradios] - - else: - hapd_radios = [rad for rad in self.radios if rad.name in settings] - - hapd_configs = [conf for rad, conf in settings.items() if rad != 'radius_server'] - - radius_config = settings.get('radius_server', None) - - self.hostapd = Hostapd(self, hapd_radios, hapd_configs, radius_config) - self.hostapd.attach_cli() - - def get_frequencies(self): - frequencies = [] - - for hapd in self.hostapd.instances: - frequencies.append(hapd.cli.frequency) - - return frequencies - - def start_wpas_interfaces(self): - - if 'WPA_SUPPLICANT' not in self.hw_config: - return - - settings = self.hw_config['WPA_SUPPLICANT'] - - if self.args.hw: - nradios = len(settings.items()) - - wpas_radios = self.radios[:nradios] - self.wpas_interfaces = [] - - # - # Physical radios most likely will use a different name - # than 'rad#' but the config file is referenced by these - # 'rad#' names. Iterate through both the settings and - # physical radios to create interfaces associated with - # each config file. - # - for vrad, hwrad in zip(settings.items(), wpas_radios): - self.wpas_interfaces.append(hwrad.create_interface(vrad[1], 'wpas')) - - else: - wpas_radios = [rad for rad in self.radios if rad.name in settings] - self.wpas_interfaces = [rad.create_interface(settings[rad.name], 'wpas') \ - for rad in wpas_radios] - - def start_ofono(self): - sim_keys = self.hw_config['SETUP'].get('sim_keys', None) - if not sim_keys: - print("Ofono not requred") - return - elif sim_keys != 'ofono': - os.environ['IWD_SIM_KEYS'] = sim_keys - return - - if not find_binary(['ofonod']) or not find_binary(['phonesim']): - print("Ofono or Phonesim not found, skipping test") - return - - Process(['ip', 'link', 'set', 'lo', 'up']).wait() - - os.environ['OFONO_PHONESIM_CONFIG'] = '/tmp/phonesim.conf' - - phonesim_args = ['phonesim', '-p', '12345', '/usr/share/phonesim/default.xml'] - - self.start_process(phonesim_args) - - # - # TODO: - # Is there something to wait for? Without this phonesim rejects - # connections on all but the fist test. - # - time.sleep(3) - - ofono_args = ['ofonod', '-n', '--plugin=atmodem,phonesim'] - if self.is_verbose('ofonod'): - ofono_args.append('-d') - - self.start_process(ofono_args) - - print("Ofono started") - - def create_namespaces(self): - if not self.hw_config.has_section('NameSpaces'): - return - - for key, value in self.hw_config.items('NameSpaces'): - radio_names = value.split(',') - # Gather up radio objects for this namespace - radios = [rad for rad in self.radios if rad.name in radio_names] - - # Remove radios from 'root' namespace - self.radios = list(set(self.radios) - set(radios)) - - self.namespaces.append(Namespace(self.args, key, radios)) - - def get_namespace(self, ns): - for n in self.namespaces: - if n.name == ns: - return n - - return None - - def stop_test_processes(self): - for n in self.namespaces: - n.reset() - - self.namespaces = [] - self.hostapd = None - self.wpas_interfaces = None - - self.reset() - - def meminfo_to_dict(self): - def removesuffix(string, suffix): - if string.endswith(suffix): - return string[:-len(suffix)] - return string - - ret = {} - - with open('/proc/meminfo', 'r') as f: - data = f.read().strip().split('\n') - - for l in data: - entry = l.split(':') - ret[entry[0]] = int(removesuffix(entry[1], 'kB')) - - return ret - - def __str__(self): - ret = 'Arguments:\n' - for arg in vars(self.args): - ret += '\t --%s %s\n' % (arg, str(getattr(self.args, arg))) - - ret += 'Hostapd:\n' - if self.hostapd: - for h in self.hostapd.instances: - ret += '\t%s\n' % str(h) - else: - ret += '\tNo Hostapd instances\n' - - info = self.meminfo_to_dict() - self._mem_chart.add_value(info['MemAvailable']) - - ret += 'Available Memory: %u kB\n' % info['MemAvailable'] - ret += 'Last Test Delta: %+d kB\n' % (info['MemAvailable'] - self._last_mem_available) - ret += 'Per-test Usage:\n' - ret += str(self._mem_chart) - - self._last_mem_available = info['MemAvailable'] - - ret += super().__str__() - - for n in self.namespaces: - ret += n.__str__() - - return ret - -def build_unit_list(args): - ''' - Build list of unit tests based on passed arguments. This first - checks for literal names provided in the arguments, then if - no matches were found, checks for a glob match. - ''' - tests = [] - test_root = args.testhome + '/unit' - - for unit in args.unit_tests.split(','): - path = '%s/%s' % (test_root, unit) - if os.access(unit, os.X_OK): - tests.append(unit) - elif os.access(path, os.X_OK): - tests.append(path) - else: - # Full list or glob, first build up valid list of tests - matches = glob(path) - if matches == []: - raise Exception("Could not find test %s" % unit) - - matches = [exe for exe in matches if os.access(exe, os.X_OK)] - - tests.extend(matches) - - return sorted(tests) - -def build_test_list(args): - ''' - Build list of auto test directories based on passed arguments. - First check for absolute paths, then look in /autotests, - then glob match. - ''' - tests = [] - test_root = args.testhome + '/autotests' - - # Run all tests - if not args.autotests: - # Get list of all autotests (committed in git) - tests = os.popen('git -C %s ls-files autotests/ | cut -f2 -d"/" \ - | grep "test*" | uniq' % args.testhome).read() \ - .strip().split('\n') - tests = [test_root + '/' + t for t in tests] - else: - print("Generating partial test list") - - full_list = sorted(os.listdir(test_root)) - - for t in args.autotests.split(','): - path = '%s/%s' % (test_root, t) - if t.endswith('+'): - t = t.split('+')[0] - i = full_list.index(t) - - tests = [test_root + '/' + x for x in full_list[i:] \ - if x.startswith('test')] - elif os.path.exists(t): - if t not in tests: - tests.append(t) - elif os.path.exists(path): - if path not in tests: - tests.append(path) - else: - matches = glob(path) - if matches == []: - raise Exception("Could not find test %s" % t) - - tests.extend(list(set(matches) - set(tests))) - - return sorted(tests) - -SimpleResult = namedtuple('SimpleResult', 'run failures errors skipped time') - -def start_test(ctx, subtests, rqueue): - ''' - Run an individual test. 'subtests' are parsed prior to calling - but these effectively make up a single test. 'rqueue' is the - results queue which is required since this is using - multiprocessing. - ''' - run = 0 - errors = 0 - failures = 0 - skipped = 0 - - start = time.time() - # - # Iterate through each individual python test. - # - for s in subtests: - loader = unittest.TestLoader() - try: - module = importlib.import_module(os.path.splitext(s)[0]) - except OSError as e: - dbg(subprocess.check_output("cat /proc/buddyinfo", shell=True).decode('utf-8')) - dbg(subprocess.check_output("dmesg | tail -80", shell=True).decode('utf-8')) - print(ctx) - raise e - - subtest = loader.loadTestsFromModule(module) - - # The test suite is being (ab)used to get a bit more granularity - # with individual tests. The 'normal' way to use unittest is to - # just create a test suite and run them. The problem here is that - # test results are queued and printed at the very end so its - # difficult to know *where* a test failed (python gives a stack - # trace but printing the exception/failure immediately shows - # where in the debug logs something failed). Moreso if there are - # several test functions inside a single python file they run - # as a single test and it is difficult (again) to know where - # something failed. - - # Iterating through each python test file - for test in subtest: - limit_funcs = [] - - if ctx.args.sub_tests: - for i in ctx.args.sub_tests: - if len(i.split('.')) == 2: - limit_funcs.append(i.split('.')[1]) - - # Iterating through individual test functions inside a - # Test() class. Due to the nature of unittest we have - # to jump through some hoops to set up the test class - # only once by turning the enumeration into a list, then - # enumerating (again) to keep track of the index (just - # enumerating the test class doesn't allow len() because - # it is not a list). - tlist = list(enumerate(test)) - for index, t in enumerate(tlist): - # enumerate is returning a tuple, index 1 is our - # actual object. - t = t[1] - - func, file = str(t).split(' ') - # - # TODO: There may be a better way of doing this - # but strigifying the test class gives us a string: - # (.) - # - file = file.strip('()').split('.')[0] + '.py' - - # Create an empty result here in case the test fails - result = TestResult() - - try: - skip = len(limit_funcs) > 0 and func not in limit_funcs - - # Set up class only on first test - if index == 0: - if not skip: - dbg("%s\n\t%s RUNNING" % (file, str(func)), end='') - t.setUpClass() - else: - if not skip: - dbg("\t%s RUNNING" % str(func), end='') - - sys.__stdout__.flush() - - Process.write_separators("\n====== %s:%s ======\n\n" % (file, func)) - - if not skip: - # Run test (setUp/tearDown run automatically) - result = t() - - # Tear down class only on last test - if index == len(tlist) - 1: - t.tearDownClass() - - if skip: - continue - except unittest.SkipTest as e: - result.skipped.append(t) - except Exception as e: - dbg('\n%s threw an uncaught exception:' % func) - traceback.print_exc(file=sys.__stdout__) - - run += result.testsRun - errors += len(result.errors) - failures += len(result.failures) - skipped += len(result.skipped) - - if len(result.skipped) > 0: - dbg(colored(" SKIPPED", "cyan")) - elif run == 0 or len(result.errors) > 0 or len(result.failures) > 0: - dbg(colored(" FAILED", "red")) - for e in result.errors: - dbg(e[1]) - for f in result.failures: - dbg(f[1]) - else: - dbg(colored(" PASSED", "green")) - - # Prevents future test modules with the same name (e.g. - # connection_test.py) from being loaded from the cache - sys.modules.pop(module.__name__) - - # - # The multiprocessing queue is picky with what objects it will serialize - # and send between processes. Because of this we put the important bits - # of the result into our own 'SimpleResult' tuple. - # - sresult = SimpleResult(run=run, failures=failures, errors=errors, - skipped=skipped, time=time.time() - start) - rqueue.put(sresult) - - # This may not be required since we are manually popping sys.modules - importlib.invalidate_caches() - -def pre_test(ctx, test, copied): - ''' - Copy test files, start processes, and any other pre test work. - ''' - os.chdir(test) - - dbg("\nStarting %s" % colored(os.path.basename(test), "white", attrs=['bold'])) - if not os.path.exists(test + '/hw.conf'): - raise Exception("No hw.conf found for %s" % test) - - ctx.hw_config = ConfigParser() - ctx.hw_config.read(test + '/hw.conf') - # - # We have two types of test files: tests and everything else. Rather - # than require each test to specify the files needing to be copied to - # /tmp (previously 'tmpfs_extra_stuff'), we just copy everything which - # isn't a test. There is really no reason not to do this as any file - # present in a test directory should be needed by the test. - # - # All files - files = os.listdir(test) - # Tests (starts or ends with 'test') - subtests = [f for f in files if f.startswith('test') or \ - os.path.splitext(f)[0].endswith('test')] - # Everything else (except .py files) - to_copy = [f for f in list(set(files) - set(subtests)) if not f.endswith('.py') \ - and f != '__pycache__'] - for f in to_copy: - if os.path.isdir(f): - shutil.copytree(f, '/tmp/' + f) - else: - shutil.copy(f, '/tmp') - copied.append(f) - - # Prune down any subtests if needed - if ctx.args.sub_tests: - ctx.args.sub_tests = ctx.args.sub_tests.split(',') - - to_run = [x.split('.')[0] for x in ctx.args.sub_tests] - pruned = [] - - for s in subtests: - no_ext = s - # Handle . format - if '.' in s: - no_ext = s.split('.')[0] - - if no_ext in to_run: - pruned.append(no_ext + '.py') - - subtests = pruned - - if ctx.args.log: - ctx.start_process(['iwmon', '--nowiphy']) - elif ctx.args.monitor: - ctx.start_process(['iwmon'], outfile=ctx.args.monitor) - - ctx.start_dbus() - ctx.start_haveged() - ctx.start_dbus_monitor() - ctx.start_radios() - ctx.create_namespaces() - ctx.start_hostapd() - ctx.start_wpas_interfaces() - ctx.start_ofono() - - if ctx.hw_config.has_option('SETUP', 'start_iwd'): - start = ctx.hw_config.getboolean('SETUP', 'start_iwd') - else: - start = True - - if start: - ctx.start_iwd() - else: - print("Not starting IWD from test-runner") - - print(ctx) - - sys.path.insert(1, test) - - return sorted(subtests) - -def post_test(ctx, to_copy): - ''' - Remove copied files, and stop test processes. - ''' - try: - for f in to_copy: - if os.path.isdir('/tmp/' + f): - shutil.rmtree('/tmp/' + f) - else: - os.remove('/tmp/' + f) - - Process(['ip', 'link', 'set', 'lo', 'down']).wait() - except Exception as e: - print("Exception thrown in post_test") - finally: - ctx.stop_test_processes() - - if ctx.args.valgrind: - for f in os.listdir('/tmp'): - if f.startswith("valgrind.log."): - dbg(f) - with open('/tmp/' + f, 'r') as v: - dbg(v.read()) - dbg("\n") - os.remove('/tmp/' + f) - - # Special case for when logging is enabled - if os.path.isfile('/tmp/iwd-tls-debug-server-cert.pem'): - os.remove('/tmp/iwd-tls-debug-server-cert.pem') - - allowed = ['phonesim.conf', 'certs', 'secrets', 'iwd'] - for f in [f for f in os.listdir('/tmp') if f not in allowed]: - dbg("File %s was not cleaned up!" % f) - try: - os.remove('/tmp/' + f) - except: - pass - -def print_results(results): - table = PrettyTable(['Test', colored('Passed', 'green'), colored('Failed', 'red'), \ - colored('Skipped', 'cyan'), colored('Time', 'yellow')]) - - total_pass = 0 - total_fail = 0 - total_skip = 0 - total_time = 0 - - for test, result in results.items(): - - if result.time == TEST_MAX_TIMEOUT: - failed = "Timed out" - passed = "Timed out" - elif result.time == 0: - failed = "Exception" - passed = "Exception" - else: - failed = result.failures + result.errors - passed = result.run - failed - - total_pass += passed - total_fail += failed - total_skip += result.skipped - - total_time += result.time - - time = '%.2f' % result.time - - table.add_row([test, colored(passed, 'green'), colored(failed, 'red'), \ - colored(result.skipped, 'cyan'), colored(time, 'yellow')]) - - total_time = '%.2f' % total_time - - table.add_row(['Total', colored(total_pass, 'green'), colored(total_fail, 'red'), \ - colored(total_skip, 'cyan'), colored(total_time, 'yellow')]) - - dbg(table) - - return total_fail == 0 - -def run_auto_tests(ctx, args): - tests = build_test_list(args) - - for test in tests: - copied = [] - try: - subtests = pre_test(ctx, test, copied) - - if len(subtests) < 1: - dbg("No tests to run") - sys.exit() - - rqueue = multiprocessing.Queue() - p = multiprocessing.Process(target=start_test, args=(ctx, subtests, rqueue)) - p.start() - # Rather than time each subtest we just time the total but - # mutiply the default time by the number of tests being run. - p.join(TEST_MAX_TIMEOUT * len(subtests)) - - if p.is_alive(): - # Timeout - p.terminate() - - ctx.results[os.path.basename(test)] = SimpleResult(run=0, - failures=0, errors=0, - skipped=0, time=TEST_MAX_TIMEOUT) - else: - ctx.results[os.path.basename(test)] = rqueue.get() - - except Exception as ex: - dbg("%s threw an uncaught exception" % test) - traceback.print_exc(file=sys.__stdout__) - ctx.results[os.path.basename(test)] = SimpleResult(run=0, failures=0, - errors=0, skipped=0, time=0) - finally: - post_test(ctx, copied) - -def run_unit_tests(ctx, args): - os.chdir(args.testhome + '/unit') - units = build_unit_list(args) - - for u in units: - p = ctx.start_process([u]).wait() - if p.returncode != 0: - dbg("Unit test %s failed" % os.path.basename(u)) - else: - dbg("Unit test %s passed" % os.path.basename(u)) - -def run_tests(args): - global config - - os.chdir(args.testhome) - - # - # This allows all autotest utils (iwd/hostapd/etc) to access the - # TestContext. Any other module or script (in the same interpreter) can - # simply import config.ctx and access all live test information, - # start/stop processes, see active radios etc. - # - config = importlib.import_module('config') - config.ctx = TestContext(args) - - # Must import these after config so ctx gets set - config.hwsim = importlib.import_module('hwsim') - config.hostapd = importlib.import_module('hostapd') - - # Start writing out kernel log - config.ctx.start_process(["dmesg", '--follow']) - - if args.unit_tests is None: - run_auto_tests(config.ctx, args) - else: - run_unit_tests(config.ctx, args) - -runner = Runner(from_env=True) - -atexit.register(exit_vm) -runner.prepare_environment() -run_tests(runner.args) -runner.cleanup_environment() +Runner().start()