mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2024-11-25 09:39:25 +01:00
2894f2e3eb
In order to keep all test-runner dev scripts working and to work with the new runner.py system, some file renaming was required. test-runner was renamed to run-tests. A new test-runner was added which only creates the Runner() class.
1526 lines
38 KiB
Python
Executable File
1526 lines
38 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
import os
|
|
import shutil
|
|
import fcntl
|
|
import sys
|
|
import subprocess
|
|
import atexit
|
|
import time
|
|
import unittest
|
|
import importlib
|
|
from unittest.result import TestResult
|
|
import multiprocessing
|
|
import re
|
|
import traceback
|
|
|
|
from runner import Runner
|
|
|
|
from configparser import ConfigParser
|
|
from prettytable import PrettyTable
|
|
from termcolor import colored
|
|
from glob import glob
|
|
from collections import namedtuple
|
|
from time import sleep
|
|
import dbus.mainloop.glib
|
|
from gi.repository import GLib
|
|
from weakref import WeakValueDictionary
|
|
|
|
config = None
|
|
intf_id = 0
|
|
|
|
TEST_MAX_TIMEOUT = 240
|
|
|
|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
|
|
def dbg(*s, **kwargs):
    '''
    Print to the interpreter's original stdout.

    The output goes to sys.__stdout__, so it is still visible when
    sys.stdout has been re-directed. Takes the same arguments as
    print(), except 'file'.
    '''
    original_stdout = sys.__stdout__
    print(*s, file=original_stdout, **kwargs)
|
|
|
|
def exit_vm():
    '''
    Shut the test VM down.

    Kills any process that is still running, prints the collected
    results, writes PASS/FAIL to the result file when one was requested,
    syncs the disks and finally stops the runner.

    The sync and runner stop are always reached, even when 'config' or
    its test context was never set up.
    '''
    ctx = config.ctx if config else None

    if config:
        # Iterate over a snapshot, the process registry is a weak
        # dictionary and can change while processes are being killed.
        for p in list(Process.get_all()):
            print("Process %s still running!" % p.args[0])
            p.kill()

    if ctx and ctx.results:
        success = print_results(ctx.results)
    else:
        success = False

    if ctx and ctx.args.result:
        result = 'PASS' if success else 'FAIL'
        with open(ctx.args.result, 'w') as f:
            f.write(result)

    os.sync()

    runner.stop()
|
|
|
|
def path_exists(path):
    '''
    Searches PATH as well as absolute paths.

    Returns True if 'path' names an executable found by shutil.which()
    or if os.stat() succeeds on it, False otherwise.
    '''
    if shutil.which(path):
        return True

    try:
        os.stat(path)
    except (OSError, ValueError):
        # OSError: the path does not exist or is not accessible.
        # ValueError: the path cannot be represented (e.g. embedded NUL).
        return False

    return True
|
|
|
|
def find_binary(list):
    '''
    Return the first entry of 'list' that path_exists() accepts, i.e.
    one found in PATH or on a valid path. Returns None when no entry
    qualifies.
    '''
    usable = (candidate for candidate in list if path_exists(candidate))
    return next(usable, None)
|
|
|
|
# Partial DBus config. The remainder (<listen>) will be filled in for each
|
|
# namespace that is created so each individual dbus-daemon has its own socket
|
|
# and address.
|
|
dbus_config = '''
|
|
<!DOCTYPE busconfig PUBLIC \
|
|
"-//freedesktop//DTD D-Bus Bus Configuration 1.0//EN" \
|
|
"http://www.freedesktop.org/standards/dbus/1.0/\
|
|
busconfig.dtd\">
|
|
<busconfig>
|
|
<type>system</type>
|
|
<limit name=\"reply_timeout\">2147483647</limit>
|
|
<auth>ANONYMOUS</auth>
|
|
<allow_anonymous/>
|
|
<policy context=\"default\">
|
|
<allow user=\"*\"/>
|
|
<allow own=\"*\"/>
|
|
<allow send_type=\"method_call\"/>
|
|
<allow send_type=\"signal\"/>
|
|
<allow send_type=\"method_return\"/>
|
|
<allow send_type=\"error\"/>
|
|
<allow receive_type=\"method_call\"/>
|
|
<allow receive_type=\"signal\"/>
|
|
<allow receive_type=\"method_return\"/>
|
|
<allow receive_type=\"error\"/>
|
|
<allow send_destination=\"*\" eavesdrop=\"true\"/>
|
|
<allow eavesdrop=\"true\"/>
|
|
</policy>
|
|
'''
|
|
|
|
class Process(subprocess.Popen):
    '''
    A subprocess.Popen whose output is collected through a GLib IO watch.

    stdout and stderr of the child are read from a non-blocking pipe,
    stored in 'self.out', written to any attached log files and, for
    verbose processes, echoed to the original stdout.
    '''
    # Every live Process object keyed by id(); entries disappear once
    # the object itself is garbage collected.
    processes = WeakValueDictionary()
    # Test context shared by all processes, taken from the global
    # 'config' on first use.
    ctx = None

    def __new__(cls, *args, **kwargs):
        obj = super().__new__(cls)
        cls.processes[id(obj)] = obj
        return obj

    def __init__(self, args, namespace=None, outfile=None, env=None, check=False, cleanup=None):
        '''
        Start 'args' as a child process.

        namespace - network namespace name; the command is run through
                    'ip netns exec <namespace>'
        outfile   - extra file receiving the output (always truncated)
        env       - environment passed to the child
        check     - wait for completion and raise CalledProcessError on
                    a non-zero exit status
        cleanup   - callable invoked once when the process is cleaned up
        '''
        self.write_fds = []
        self.io_watch = None
        self.cleanup = cleanup
        self.verbose = False
        self.out = ''
        self.hup = False
        self.killed = False
        self.namespace = namespace

        if not self.ctx:
            self.ctx = config.ctx

        if self.ctx.is_verbose(args[0], log=False):
            self.verbose = True

        if namespace:
            args = ['ip', 'netns', 'exec', namespace] + args

        if outfile:
            # outfile is only used by iwmon, in which case we don't want
            # to append to an existing file.
            self._append_outfile(outfile, append=False)

        if self.ctx.args.log:
            logfile = '%s/%s/%s' % (self.ctx.args.log,
                                    os.path.basename(os.getcwd()),
                                    args[0])
            self._append_outfile(logfile)

        super().__init__(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                         env=env, cwd=os.getcwd())

        # Set as non-blocking so read() in the IO callback doesn't block forever
        fl = fcntl.fcntl(self.stdout, fcntl.F_GETFL)
        fcntl.fcntl(self.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        self.io_watch = GLib.io_add_watch(self.stdout, GLib.IO_IN |
                                          GLib.IO_HUP | GLib.IO_ERR, self.process_io)

        print("Starting process {}".format(self.args))

        if check:
            self.wait(10)
            self.killed = True
            if self.returncode != 0:
                raise subprocess.CalledProcessError(returncode=self.returncode,
                                                    cmd=args)

    @classmethod
    def get_all(cls):
        '''Return a view of all Process objects that are still alive.'''
        return cls.processes.values()

    @classmethod
    def kill_all(cls):
        '''Kill every tracked process.'''
        # Snapshot first: the weak dictionary may change during the loop
        for p in list(cls.processes.values()):
            p.kill()

    @staticmethod
    def _write_io(instance, data, stdout=True):
        '''
        Write 'data' to all log files of 'instance' and, if the process
        is verbose and 'stdout' is set, to the original stdout.
        '''
        for f in instance.write_fds:
            f.write(data)

            # Write out a separator so multiple process calls per
            # test are easier to read.
            if instance.hup:
                f.write("Terminated: {}\n\n".format(instance.args))

            f.flush()

        if instance.verbose and stdout:
            sys.__stdout__.write(data)
            sys.__stdout__.flush()

    @classmethod
    def write_separators(cls, sep):
        '''Write 'sep' to the log files of every process not yet killed.'''
        for proc in list(cls.processes.values()):
            if proc.killed:
                continue

            cls._write_io(proc, sep, stdout=False)

    def process_io(self, source, condition):
        '''
        GLib IO watch callback: drain 'source' and record the output.
        Always returns True so the watch stays installed.
        '''
        if condition & GLib.IO_HUP:
            self.hup = True

        data = source.read()

        if not data:
            return True

        # Output is not guaranteed to be valid UTF-8 (a read may even end
        # in the middle of a character), never let that kill the callback.
        data = data.decode('utf-8', errors='replace')

        # Save data away in case the caller needs it (e.g. list_sta)
        self.out += data

        self._write_io(self, data)

        return True

    def _append_outfile(self, file, append=True):
        '''
        Open 'file' and add it to the files receiving this process'
        output. The file and a newly created parent directory are
        chown'ed to SUDO_UID/SUDO_GID, falling back to the current user.
        '''
        gid = int(os.environ.get('SUDO_GID', os.getgid()))
        uid = int(os.environ.get('SUDO_UID', os.getuid()))
        dirname = os.path.dirname(file)

        # A bare file name has no directory to create. 'file' already
        # contains its directory, so it must not be joined onto it again.
        if dirname and not path_exists(dirname):
            os.mkdir(dirname)
            os.chown(dirname, uid, gid)

        # If the out file exists, append. Useful for processes like
        # hostapd_cli where it is called multiple times independently.
        if os.path.isfile(file) and append:
            mode = 'a'
        else:
            mode = 'w'

        try:
            f = open(file, mode)
        except Exception:
            traceback.print_exc()
            # NOTE(review): exits with status 0 although this is a
            # failure; kept as is, confirm whether this is intended.
            sys.exit(0)

        os.fchown(f.fileno(), uid, gid)

        self.write_fds.append(f)

    def wait_for_socket(self, socket, wait):
        '''Wait up to 'wait' seconds for the path 'socket' to exist.'''
        Namespace.non_block_wait(os.path.exists, wait, socket)

    # Wait for both process termination and HUP signal
    def __wait(self, timeout):
        try:
            super().wait(timeout)
        except Exception:
            # Typically subprocess.TimeoutExpired: not finished yet
            return False

        return bool(self.hup)

    # Override wait() so it can do so non-blocking
    def wait(self, timeout=10):
        '''
        Wait up to 'timeout' seconds for the process to exit and its
        output pipe to hang up, then clean up. Raises TimeoutError if
        that does not happen in time.
        '''
        Namespace.non_block_wait(self.__wait, timeout, 1)
        self._cleanup()

    def _cleanup(self):
        '''
        Run the cleanup callback, remove the IO watch, close the log
        files and mark the process as killed. Safe to call repeatedly.
        '''
        if self.cleanup:
            self.cleanup()

        if self.io_watch:
            GLib.source_remove(self.io_watch)
            self.io_watch = None

        self.cleanup = None
        self.killed = True

        # Close the log files instead of just dropping the references
        files, self.write_fds = self.write_fds, []
        for f in files:
            f.close()

    # Override kill()
    def kill(self, force=False):
        '''
        Stop the process with SIGTERM (SIGKILL if 'force') and wait up
        to 15 seconds for it to exit, escalating to SIGKILL on timeout.
        '''
        if self.killed:
            return

        print("Killing process {}".format(self.args))

        if force:
            super().kill()
        else:
            self.terminate()

        try:
            self.wait(timeout=15)
        except Exception:
            dbg("Process %s did not complete in 15 seconds!" % self.args[0])
            super().kill()

        self._cleanup()

    def __str__(self):
        return str(self.args) + '\n'
|
|
|
|
class Interface:
    '''
    A wireless interface identified by name, with the config it is used
    with. The interface is deleted with 'iw' when the object is destroyed.
    '''
    def __init__(self, name, config):
        self.config = config
        self.name = name
        # Path of the hostapd control socket for this interface
        self.ctrl_interface = '/var/run/hostapd/' + name

    def __del__(self):
        delete_cmd = ['iw', 'dev', self.name, 'del']
        Process(delete_cmd).wait()

    def set_interface_state(self, state):
        '''Run 'ip link set <name> <state>' and wait for it to finish.'''
        link_cmd = ['ip', 'link', 'set', self.name, state]
        Process(link_cmd).wait()
|
|
|
|
class Radio:
    '''
    A wireless phy identified by name, which may own one Interface.
    '''
    def __init__(self, name):
        self.interface = None
        self.name = name
        # hostapd will reset this if this radio is used by it
        self.use = 'iwd'

    def __del__(self):
        print("Removing radio %s" % self.name)
        self.interface = None

    def create_interface(self, config, use):
        '''
        Create a managed interface named 'wln<N>' on this radio, record
        'use' as the radio's user and return the new Interface.
        '''
        global intf_id

        ifname = 'wln%s' % intf_id
        intf_id += 1

        self.use = use
        self.interface = Interface(ifname, config)

        add_cmd = ['iw', 'phy', self.name, 'interface', 'add', ifname,
                   'type', 'managed']
        Process(add_cmd).wait()

        return self.interface

    def __str__(self):
        pieces = [self.name + ':\n', '\tUsed By: %s ' % self.use]

        if self.interface:
            pieces.append('(%s)' % self.interface.name)

        pieces.append('\n')

        return ''.join(pieces)
|
|
|
|
class VirtualRadio(Radio):
    '''
    A subclass of 'Radio' specific to mac80211_hwsim radios.

    TODO: Using D-Bus to create and destroy radios is more desirable
    than the command line.
    '''

    def __init__(self, name, cfg=None):
        '''
        Create a hwsim radio called 'name'. 'cfg', if given, may carry
        the 'iftype_disable' and 'cipher_disable' settings.
        '''
        global config

        self.disable_cipher = None
        self.disable_iftype = None

        self.hwsim = config.hwsim.Hwsim()

        if cfg:
            self.disable_iftype = cfg.get('iftype_disable', None)
            self.disable_cipher = cfg.get('cipher_disable', None)

        self._radio = self.hwsim.radios.create(name, p2p_device=True,
                                               iftype_disable=self.disable_iftype,
                                               cipher_disable=self.disable_cipher)

        super().__init__(self._radio.name)

    def __del__(self):
        super().__del__()

        # The hwsim handle is cleared by Namespace.reset() and is missing
        # altogether if __init__ failed early.
        radio = getattr(self, '_radio', None)

        if radio is not None:
            # If the radio was moved into a namespace this will fail,
            # removal is best effort only.
            try:
                radio.remove()
            except Exception:
                pass

        self._radio = None

    def __str__(self):
        ret = super().__str__()

        if self.disable_iftype:
            ret += '\tDisabled interface types: %s\n' % self.disable_iftype

        if self.disable_cipher:
            ret += '\tDisabled ciphers: %s\n' % self.disable_cipher

        # The handle may already have been dropped, see __del__
        if self._radio is not None:
            ret += '\tPath: %s' % self._radio.path

        ret += '\n'

        return ret
|
|
|
|
class HostapdInstance:
    '''
    One hostapd AP: a config file plus the radio/interface it runs on.
    All instances are actually served by a single hostapd process, this
    class only makes it convenient to address one of the APs.
    '''
    def __init__(self, config, radio):
        self.cli = None
        self.config = config
        self.radio = radio

        # Create the AP interface on the radio and bring it up
        interface = radio.create_interface(self.config, 'hostapd')
        self.intf = interface
        interface.set_interface_state('up')

    def __del__(self):
        print("Removing HostapdInstance %s" % self.config)
        self.intf.set_interface_state('down')
        self.radio = None
        self.intf = None

    def __str__(self):
        header = 'Hostapd (%s)\n' % self.intf.name
        return header + '\tConfig: %s\n' % self.config
|
|
|
|
class Hostapd:
    '''
    A set of running hostapd instances. This is really just a single
    process since hostapd can be started with multiple config files.
    '''
    def __init__(self, ctx, radios, configs, radius):
        '''
        Start hostapd with one config file per radio.

        ctx     - context used to query verbosity and stop the process
        radios  - radios to run the APs on, one per entry in 'configs'
        configs - config file names, expected to be present in /tmp
        radius  - optional extra config file passed to hostapd

        Raises Exception if 'radios' and 'configs' differ in length.
        '''
        self.ctx = ctx

        if len(configs) != len(radios):
            raise Exception("Config (%d) and radio (%d) list length not equal" % \
                            (len(configs), len(radios)))

        print("Initializing hostapd instances")

        Process(['ip', 'link', 'set', 'eth0', 'up']).wait()
        Process(['ip', 'link', 'set', 'eth1', 'up']).wait()

        self.global_ctrl_iface = '/var/run/hostapd/ctrl'

        self.instances = [HostapdInstance(c, r) for c, r in zip(configs, radios)]

        ifaces = ','.join(rad.interface.name for rad in radios)

        args = ['hostapd', '-g', self.global_ctrl_iface]

        if ifaces:
            args.extend(['-i', ifaces])

        #
        # Config files should already be present in /tmp. This appends
        # ctrl_interface and does any variable replacement. Currently
        # this is just any $ifaceN occurrences.
        #
        for c in configs:
            full_path = '/tmp/%s' % c
            args.append(full_path)

            self._rewrite_config(full_path)

        if radius:
            args.append(radius)

        if ctx.is_verbose('hostapd'):
            args.append('-d')

        self.process = Process(args)

        self.process.wait_for_socket(self.global_ctrl_iface, 30)

        for hapd in self.instances:
            self.process.wait_for_socket(hapd.intf.ctrl_interface, 30)

    def attach_cli(self):
        '''Create a HostapdCLI object for every instance.'''
        global config

        for hapd in self.instances:
            hapd.cli = config.hostapd.HostapdCLI(config=hapd.config)

    def _rewrite_config(self, config):
        '''
        Replaces any $ifaceN values with the correct interface
        names as well as appends the ctrl_interface path to
        the config file. The file is rewritten in place.
        '''
        def iface_name(match):
            return self.instances[int(match.group(1))].intf.name

        with open(config, 'r+') as f:
            # Matching the whole number keeps $iface1 from clobbering
            # the prefix of $iface10.
            data = re.sub(r'\$iface([0-9]+)', iface_name, f.read())

            data += '\nctrl_interface=/var/run/hostapd\n'

            # read() left the position at EOF; rewind and truncate so the
            # file holds only the rewritten config, not a second copy
            # appended to the original.
            f.seek(0)
            f.write(data)
            f.truncate()

    def __getitem__(self, config):
        '''
        Return the instance using 'config', the first instance if
        'config' is empty, or None if there is no match.
        '''
        if not config:
            return self.instances[0]

        for hapd in self.instances:
            if hapd.config == config:
                return hapd

        return None

    def __del__(self):
        print("Removing Hostapd")

        # Attributes may be missing if __init__ raised part way through
        ctrl_iface = getattr(self, 'global_ctrl_iface', None)

        if ctrl_iface is not None:
            try:
                os.remove(ctrl_iface)
            except OSError:
                print("Failed to remove %s" % ctrl_iface)

        self.instances = None

        # Hostapd may have already been stopped
        process = getattr(self, 'process', None)

        if process:
            self.ctx.stop_process(process)

        self.ctx = None

        # Hostapd creates simdb sockets for EAP-SIM/AKA tests but does not
        # clean them up.
        for f in glob("/tmp/eap_sim_db*"):
            os.remove(f)
|
|
|
|
dbus_count = 0
|
|
|
|
class Namespace:
    '''
    A network namespace holding a set of radios and its own dbus-daemon.
    Processes started through it run inside the namespace.
    '''
    def __init__(self, args, name, radios):
        '''
        Create namespace 'name', move 'radios' into it and start D-Bus.
        'args' are the parsed command line arguments.
        '''
        self.dbus_address = None
        self.name = name
        self.radios = radios
        self.args = args

        Process(['ip', 'netns', 'add', name]).wait()
        for r in radios:
            Process(['iw', 'phy', r.name, 'set', 'netns', 'name', name]).wait()

        self.start_dbus()

    def reset(self):
        '''Drop the bus and radios and kill all tracked processes.'''
        self._bus = None

        for r in self.radios:
            r._radio = None

        self.radios = []

        Process.kill_all()

    def __del__(self):
        # 'name' is missing if __init__ never ran
        name = getattr(self, 'name', None)

        if name:
            print("Removing namespace %s" % name)

            Process(['ip', 'netns', 'del', name]).wait()

    def get_bus(self):
        return self._bus

    def start_process(self, args, env=None, **kwargs):
        '''
        Start 'args' inside this namespace and return the Process.
        'env' defaults to a copy of os.environ; the namespace's D-Bus
        address, once known, is added to it. Remaining keyword arguments
        are passed to Process.
        '''
        if not env:
            env = os.environ.copy()

        # The address is None (or unset) until start_dbus() has run and
        # None is not a valid environment value.
        address = getattr(self, 'dbus_address', None)

        if address:
            # In case this process needs DBus...
            env['DBUS_SYSTEM_BUS_ADDRESS'] = address

        return Process(args, namespace=self.name, env=env, **kwargs)

    def stop_process(self, p, force=False):
        p.kill(force)

    def is_process_running(self, process):
        '''
        Return True if a tracked Process in this namespace has 'process'
        as its first argument.
        '''
        for p in Process.get_all():
            if p.namespace == self.name and p.args[0] == process:
                return True
        return False

    def _cleanup_dbus(self):
        # The socket may already be gone, that is fine
        try:
            os.remove(self.dbus_address.split('=')[1])
        except OSError:
            pass

        os.remove(self.dbus_cfg)

    def start_dbus(self):
        '''
        Write a dbus-daemon config with a socket unique to this
        namespace, start the daemon and connect to it.
        '''
        global dbus_count

        self.dbus_address = 'unix:path=/tmp/dbus%d' % dbus_count
        self.dbus_cfg = '/tmp/dbus%d.conf' % dbus_count
        dbus_count += 1

        with open(self.dbus_cfg, 'w+') as f:
            f.write(dbus_config)
            f.write('<listen>%s</listen>\n' % self.dbus_address)
            f.write('</busconfig>\n')

        p = self.start_process(['dbus-daemon', '--config-file=%s' % self.dbus_cfg],
                               cleanup=self._cleanup_dbus)

        p.wait_for_socket(self.dbus_address.split('=')[1], 5)

        self._bus = dbus.bus.BusConnection(address_or_type=self.dbus_address)

    def start_iwd(self, config_dir = '/tmp', storage_dir = '/tmp/iwd'):
        '''
        Start iwd (under valgrind if requested) for the radios of this
        namespace that are used by iwd. Returns the Process.
        '''
        args = []
        iwd_radios = ','.join([r.name for r in self.radios if r.use == 'iwd'])

        if self.args.valgrind:
            args.extend(['valgrind', '--leak-check=full', '--track-origins=yes',
                         '--show-leak-kinds=all',
                         '--log-file=/tmp/valgrind.log.%p'])

        args.extend(['iwd', '-E'])

        if iwd_radios != '':
            args.extend(['-p', iwd_radios])

        if self.is_verbose(args[0]):
            args.append('-d')

        env = os.environ.copy()

        env['CONFIGURATION_DIRECTORY'] = config_dir
        env['STATE_DIRECTORY'] = storage_dir

        if self.is_verbose('iwd-dhcp'):
            env['IWD_DHCP_DEBUG'] = '1'

        if self.is_verbose('iwd-tls'):
            env['IWD_TLS_DEBUG'] = '1'

        if self.is_verbose('iwd-acd'):
            env['IWD_ACD_DEBUG'] = '1'

        return self.start_process(args, env=env)

    def is_verbose(self, process, log=True):
        '''
        Return True if output of 'process' (a name or path) should be
        verbose. With 'log' set, enabled logging makes every process
        verbose. Entries of the verbose list may be shell style patterns.
        '''
        # Local import, fnmatch is only needed here
        from fnmatch import fnmatch

        process = os.path.basename(process)

        if self.args is None:
            return False

        # every process is verbose when logging is enabled
        if log and self.args.log:
            return True

        if process in self.args.verbose:
            return True

        # Special case here to enable verbose output with valgrind running
        if process == 'valgrind' and 'iwd' in self.args.verbose:
            return True

        # Handle any glob matches. The pattern is matched against the
        # process name, not against files in the current directory.
        for item in self.args.verbose:
            if fnmatch(process, item):
                return True

        return False

    @staticmethod
    def non_block_wait(func, timeout, *args, exception=True):
        '''
        Convenience function for waiting in a non blocking
        manner using GLibs context iteration i.e. does not block
        the main loop while waiting.

        'func' will be called at least once and repeatedly until
        either it returns success, throws an exception, or the
        'timeout' expires.

        'timeout' is the ultimate timeout in seconds

        '*args' will be passed to 'func'

        If 'exception' is an Exception instance it will be raised.
        If 'exception' is True a generic TimeoutError will be raised.
        Any other value will not result in an exception.

        Returns the (truthy) result of 'func', or None on a timeout
        that does not raise.
        '''
        # Simple class for signaling the wait timeout
        class Bool:
            def __init__(self, value):
                self.value = value

        def wait_timeout_cb(done):
            done.value = True
            return False

        mainloop = GLib.MainLoop()
        done = Bool(False)

        timeout = GLib.timeout_add_seconds(timeout, wait_timeout_cb, done)
        context = mainloop.get_context()

        while True:
            context.iteration(may_block=False)

            try:
                ret = func(*args)
                if ret:
                    # The source removes itself once it has fired
                    if not done.value:
                        GLib.source_remove(timeout)
                    return ret
            except Exception:
                if not done.value:
                    GLib.source_remove(timeout)
                raise

            sleep(0.1)

            if done.value:
                if isinstance(exception, Exception):
                    raise exception
                elif exception is True:
                    raise TimeoutError("Timeout on non_block_wait")
                else:
                    return

    def __str__(self):
        ret = 'Namespace: %s\n' % self.name
        ret += 'Processes:\n'
        for p in Process.get_all():
            ret += '\t%s' % str(p)

        ret += 'Radios:\n'
        if len(self.radios) > 0:
            for r in self.radios:
                ret += '\t%s\n' % str(r)
        else:
            ret += '\tNo Radios\n'

        ret += 'DBus Address: %s\n' % self.dbus_address
        ret += '===================================================\n\n'

        return ret
|
|
|
|
class BarChart():
    '''
    A minimal text bar chart: one row of '*' characters per value,
    scaled between (slightly widened) minimum and maximum values.
    '''
    def __init__(self, height=10, max_width=80):
        '''
        'height' is the number of stars drawn for the maximum value.
        'max_width' is stored but not used by this class.
        '''
        self._height = height
        self._max_width = max_width
        self._values = []
        self._max_value = 0
        self._min_value = 0

    def add_value(self, value):
        '''Append 'value', widening the displayed range if needed.'''
        if not self._values:
            self._max_value = int(1.01 * value)
            self._min_value = int(0.99 * value)
        elif value > self._max_value:
            self._max_value = int(1.01 * value)
        elif value < self._min_value:
            self._min_value = int(0.99 * value)

        self._values.append(value)

    def _value_to_stars(self, value):
        # Need to scale value (range of min_value -> max_value) to
        # a range of 0 -> height
        #
        # Scaled = ((value - min_value) / ( max_value - min_value)) * (Height - 0) + 0
        span = self._max_value - self._min_value

        # An empty range (e.g. a single value of 0) cannot be scaled
        if span == 0:
            return 0

        return int(((value - self._min_value) / span) * self._height)

    def __str__(self):
        # One line per value: index, stars, raw value
        ret = ''

        for i, value in enumerate(self._values):
            stars = self._value_to_stars(value)
            ret += '[%3u] ' % i + '%-10s' % ('*' * stars) + '\t\t\t%d\n' % value

        ret += '\n'

        return ret
|
|
|
|
|
|
class TestContext(Namespace):
    '''
    Contains all information for a given set of tests being run
    such as processes, radios, interfaces and test results.

    This is the 'root' namespace: it has no name and Namespace.__init__
    is deliberately not called.
    '''
    def __init__(self, args):
        self.name = None
        self.args = args
        self.hw_config = None
        self.hostapd = None
        self.wpas_interfaces = None
        self.cur_radio_id = 0
        self.cur_iface_id = 0
        self.radios = []
        self.loopback_started = False
        self.results = {}
        self.mainloop = GLib.MainLoop()
        self.namespaces = []
        self._last_mem_available = 0
        self._mem_chart = BarChart()

    def start_dbus_monitor(self):
        '''Start dbus-monitor, but only if it was made verbose.'''
        if not self.is_verbose('dbus-monitor'):
            return

        self.start_process(['dbus-monitor', '--address', self.dbus_address])

    def start_haveged(self):
        self.start_process(['haveged', '-F'])

    def create_radios(self):
        '''
        Start hwsim and create the 'num_radios' virtual radios named
        rad0, rad1, ... described by the hardware config.
        '''
        setup = self.hw_config['SETUP']
        nradios = int(setup['num_radios'])
        args = ['hwsim']

        if setup.get('hwsim_medium', 'no') in ['no', '0', 'false']:
            # hwsim_medium is disabled (the default), tell hwsim so
            args.extend(['--no-register'])

        self.start_process(args)
        self.non_block_wait(self._bus.name_has_owner, 20, 'net.connman.hwsim',
                            exception=TimeoutError('net.connman.hwsim did not appear'))

        for i in range(nradios):
            name = 'rad%u' % i

            # Get any [radX] sections. These are for configuring
            # any special radios. This no longer requires a
            # radio_conf list, we just assume radios start rad0
            # and increment.
            rad_config = None
            if self.hw_config.has_section(name):
                rad_config = self.hw_config[name]

            self.radios.append(VirtualRadio(name, rad_config))
            self.cur_radio_id += 1

    def discover_radios(self):
        '''Populate self.radios with the physical radios found.'''
        import pyroute2

        phys = []

        # The IW class is looked up in two places; fall back if the
        # first one is not available.
        try:
            iw = pyroute2.iwutil.IW()
        except Exception:
            iw = pyroute2.IW()

        attrs = [phy['attrs'] for phy in iw.list_wiphy()]

        for attr in attrs:
            for key, value in attr:
                if key == 'NL80211_ATTR_WIPHY_NAME':
                    if value not in phys:
                        phys.append(value)
                    break

        print('Discovered radios: %s' % str(phys))
        self.radios = [Radio(name) for name in phys]

    def start_radios(self):
        '''Set the regulatory domain if configured, then set up radios.'''
        reg_domain = self.hw_config['SETUP'].get('reg_domain', None)
        if reg_domain:
            Process(['iw', 'reg', 'set', reg_domain]).wait()

        if self.args.hw:
            self.discover_radios()
        else:
            self.create_radios()

    def start_hostapd(self):
        '''Start hostapd if the config has a [HOSTAPD] section.'''
        if 'HOSTAPD' not in self.hw_config:
            return

        settings = self.hw_config['HOSTAPD']

        if self.args.hw:
            # Just grab the first N radios. It gets rather
            # complicated trying to map radX radios specified in
            # hw.conf so any passed through physical adapters are
            # just given to hostapd/IWD as they appear during
            # discovery.
            #
            # TODO: It may be desirable to map PCI/USB adapters to
            #       specific radX radios specified in the config but
            #       there are really 2 separate use cases here.
            #       1. You want to test a *specific* radio with IWD
            #          or hostapd. For this you would want radX
            #          to map to a specific radio
            #       2. You have many adapters in use to run multiple
            #          tests. In this case you would not care what
            #          was using each radio, just that there was
            #          enough to run all tests.
            nradios = 0
            for k, _ in settings.items():
                if k == 'radius_server':
                    continue
                nradios += 1

            hapd_radios = self.radios[:nradios]

        else:
            hapd_radios = [rad for rad in self.radios if rad.name in settings]

        hapd_configs = [conf for rad, conf in settings.items() if rad != 'radius_server']

        radius_config = settings.get('radius_server', None)

        self.hostapd = Hostapd(self, hapd_radios, hapd_configs, radius_config)
        self.hostapd.attach_cli()

    def get_frequencies(self):
        '''Return the frequency of every hostapd instance.'''
        frequencies = []

        for hapd in self.hostapd.instances:
            frequencies.append(hapd.cli.frequency)

        return frequencies

    def start_wpas_interfaces(self):
        '''Create interfaces for the [WPA_SUPPLICANT] section, if any.'''

        if 'WPA_SUPPLICANT' not in self.hw_config:
            return

        settings = self.hw_config['WPA_SUPPLICANT']

        if self.args.hw:
            nradios = len(settings.items())

            wpas_radios = self.radios[:nradios]
            self.wpas_interfaces = []

            #
            # Physical radios most likely will use a different name
            # than 'rad#' but the config file is referenced by these
            # 'rad#' names. Iterate through both the settings and
            # physical radios to create interfaces associated with
            # each config file.
            #
            for vrad, hwrad in zip(settings.items(), wpas_radios):
                self.wpas_interfaces.append(hwrad.create_interface(vrad[1], 'wpas'))

        else:
            wpas_radios = [rad for rad in self.radios if rad.name in settings]
            self.wpas_interfaces = [rad.create_interface(settings[rad.name], 'wpas') \
                                    for rad in wpas_radios]

    def start_ofono(self):
        '''
        Handle the 'sim_keys' setting: export IWD_SIM_KEYS for any value
        other than 'ofono', otherwise start phonesim and ofonod.
        '''
        sim_keys = self.hw_config['SETUP'].get('sim_keys', None)
        if not sim_keys:
            print("Ofono not requred")
            return
        elif sim_keys != 'ofono':
            os.environ['IWD_SIM_KEYS'] = sim_keys
            return

        if not find_binary(['ofonod']) or not find_binary(['phonesim']):
            print("Ofono or Phonesim not found, skipping test")
            return

        Process(['ip', 'link', 'set', 'lo', 'up']).wait()

        os.environ['OFONO_PHONESIM_CONFIG'] = '/tmp/phonesim.conf'

        phonesim_args = ['phonesim', '-p', '12345', '/usr/share/phonesim/default.xml']

        self.start_process(phonesim_args)

        #
        # TODO:
        # Is there something to wait for? Without this phonesim rejects
        # connections on all but the first test.
        #
        time.sleep(3)

        ofono_args = ['ofonod', '-n', '--plugin=atmodem,phonesim']
        if self.is_verbose('ofonod'):
            ofono_args.append('-d')

        self.start_process(ofono_args)

        print("Ofono started")

    def create_namespaces(self):
        '''
        Create a Namespace for every entry of the [NameSpaces] section,
        moving the listed radios out of the root namespace.
        '''
        if not self.hw_config.has_section('NameSpaces'):
            return

        for key, value in self.hw_config.items('NameSpaces'):
            radio_names = value.split(',')
            # Gather up radio objects for this namespace
            radios = [rad for rad in self.radios if rad.name in radio_names]

            # Remove radios from 'root' namespace. The order of the
            # remaining radios is kept since radios are also selected
            # by position.
            self.radios = [rad for rad in self.radios if rad not in radios]

            self.namespaces.append(Namespace(self.args, key, radios))

    def get_namespace(self, ns):
        '''Return the namespace named 'ns', or None.'''
        for n in self.namespaces:
            if n.name == ns:
                return n

        return None

    def stop_test_processes(self):
        '''Reset all namespaces, including this one, after a test.'''
        for n in self.namespaces:
            n.reset()

        self.namespaces = []
        self.hostapd = None
        self.wpas_interfaces = None

        self.reset()

    def meminfo_to_dict(self):
        '''Parse /proc/meminfo into a dict of name -> integer value.'''
        def removesuffix(string, suffix):
            if string.endswith(suffix):
                return string[:-len(suffix)]
            return string

        ret = {}

        with open('/proc/meminfo', 'r') as f:
            data = f.read().strip().split('\n')

        for l in data:
            entry = l.split(':')
            ret[entry[0]] = int(removesuffix(entry[1], 'kB'))

        return ret

    def __str__(self):
        # Note: this also records a memory sample in the chart
        ret = 'Arguments:\n'
        for arg in vars(self.args):
            ret += '\t --%s %s\n' % (arg, str(getattr(self.args, arg)))

        ret += 'Hostapd:\n'
        if self.hostapd:
            for h in self.hostapd.instances:
                ret += '\t%s\n' % str(h)
        else:
            ret += '\tNo Hostapd instances\n'

        info = self.meminfo_to_dict()
        self._mem_chart.add_value(info['MemAvailable'])

        ret += 'Available Memory: %u kB\n' % info['MemAvailable']
        ret += 'Last Test Delta: %+d kB\n' % (info['MemAvailable'] - self._last_mem_available)
        ret += 'Per-test Usage:\n'
        ret += str(self._mem_chart)

        self._last_mem_available = info['MemAvailable']

        ret += super().__str__()

        for n in self.namespaces:
            ret += n.__str__()

        return ret
|
|
|
|
def build_unit_list(args):
    '''
    Collect the unit tests named in 'args.unit_tests' (comma separated).

    Each entry is tried as an executable path as given, then as an
    executable below <testhome>/unit, and finally as a glob pattern
    below that directory, of which only executable matches are kept.
    Raises Exception when a pattern matches nothing. The result is
    sorted.
    '''
    unit_dir = args.testhome + '/unit'
    selected = []

    for unit in args.unit_tests.split(','):
        candidate = '%s/%s' % (unit_dir, unit)

        if os.access(unit, os.X_OK):
            selected.append(unit)
            continue

        if os.access(candidate, os.X_OK):
            selected.append(candidate)
            continue

        # Full list or glob, first build up valid list of tests
        globbed = glob(candidate)
        if not globbed:
            raise Exception("Could not find test %s" % unit)

        for exe in globbed:
            if os.access(exe, os.X_OK):
                selected.append(exe)

    selected.sort()
    return selected
|
|
|
|
def _committed_autotests(testhome):
    '''
    Return the names of the autotest directories committed in git.
    '''
    # An argument list (no shell) keeps paths with spaces or shell
    # characters in 'testhome' working.
    git = subprocess.run(['git', '-C', testhome, 'ls-files', 'autotests/'],
                         stdout=subprocess.PIPE, universal_newlines=True)

    names = []

    for line in git.stdout.splitlines():
        # Second path component, i.e. the entry directly below autotests/
        parts = line.split('/')
        name = parts[1] if len(parts) > 1 else line

        # Same filter as the former 'grep "test*"', which accepts any
        # name containing "tes"; adjacent duplicates are dropped.
        if 'tes' in name and (not names or names[-1] != name):
            names.append(name)

    return names

def build_test_list(args):
    '''
    Build list of auto test directories based on passed arguments.

    Without 'args.autotests' every autotest committed in git is used.
    Otherwise each comma separated entry is, in this order:
        - '<name>+': <name> and every following 'test*' directory
        - an existing path as given
        - an existing path below <testhome>/autotests
        - a glob pattern below <testhome>/autotests

    Raises Exception if an entry cannot be found. The result is sorted.
    '''
    tests = []
    test_root = args.testhome + '/autotests'

    # Run all tests
    if not args.autotests:
        # Get list of all autotests (committed in git)
        tests = [test_root + '/' + t for t in _committed_autotests(args.testhome)]
        return sorted(tests)

    print("Generating partial test list")

    full_list = sorted(os.listdir(test_root))

    def add(paths):
        # Entries add up, a later entry never discards earlier ones
        for p in paths:
            if p not in tests:
                tests.append(p)

    for t in args.autotests.split(','):
        path = '%s/%s' % (test_root, t)

        if t.endswith('+'):
            t = t.split('+')[0]

            if t not in full_list:
                raise Exception("Could not find test %s" % t)

            i = full_list.index(t)

            add(test_root + '/' + x for x in full_list[i:]
                if x.startswith('test'))
        elif os.path.exists(t):
            add([t])
        elif os.path.exists(path):
            add([path])
        else:
            matches = glob(path)
            if not matches:
                raise Exception("Could not find test %s" % t)

            add(matches)

    return sorted(tests)
|
|
|
|
SimpleResult = namedtuple('SimpleResult', 'run failures errors skipped time')
|
|
|
|
def start_test(ctx, subtests, rqueue):
    '''
    Run an individual test. 'subtests' are parsed prior to calling
    but these effectively make up a single test. 'rqueue' is the
    results queue which is required since this is using
    multiprocessing.

    ctx      - test context, ctx.args.sub_tests optionally limits which
               test functions are run
    subtests - python file names, each imported as a module
    rqueue   - queue which receives a single SimpleResult once all
               subtests have been run

    An OSError raised while importing a subtest is re-raised after
    dumping some kernel state, in which case nothing is queued.
    '''
    run = 0
    errors = 0
    failures = 0
    skipped = 0

    start = time.time()

    # Only '<file>.<function>' entries restrict the functions to run.
    # This does not change between test classes so build it once.
    limit_funcs = []
    if ctx.args.sub_tests:
        for i in ctx.args.sub_tests:
            parts = i.split('.')
            if len(parts) == 2:
                limit_funcs.append(parts[1])

    #
    # Iterate through each individual python test.
    #
    for s in subtests:
        loader = unittest.TestLoader()
        try:
            module = importlib.import_module(os.path.splitext(s)[0])
        except OSError:
            dbg(subprocess.check_output("cat /proc/buddyinfo", shell=True).decode('utf-8'))
            dbg(subprocess.check_output("dmesg | tail -80", shell=True).decode('utf-8'))
            print(ctx)
            # Bare raise keeps the original traceback intact
            raise

        subtest = loader.loadTestsFromModule(module)

        # The test suite is being (ab)used to get a bit more granularity
        # with individual tests. The 'normal' way to use unittest is to
        # just create a test suite and run them. The problem here is that
        # test results are queued and printed at the very end so it's
        # difficult to know *where* a test failed (python gives a stack
        # trace but printing the exception/failure immediately shows
        # where in the debug logs something failed). Moreover if there
        # are several test functions inside a single python file they run
        # as a single test and it is difficult (again) to know where
        # something failed.

        # Iterating through each python test file
        for test in subtest:
            # Iterating through individual test functions inside a
            # Test() class. The suite is turned into a list so that its
            # length is known, which lets the class be set up on the
            # first function and torn down on the last one only.
            tlist = list(test)
            for index, t in enumerate(tlist):
                func, file = str(t).split(' ')
                #
                # TODO: There may be a better way of doing this
                # but stringifying the test class gives us a string:
                # <function> (<file>.<class>)
                #
                file = file.strip('()').split('.')[0] + '.py'

                # Create an empty result here in case the test fails
                result = TestResult()
                crashed = False

                try:
                    skip = len(limit_funcs) > 0 and func not in limit_funcs

                    # Set up class only on first test
                    if index == 0:
                        if not skip:
                            dbg("%s\n\t%s RUNNING" % (file, str(func)), end='')
                        t.setUpClass()
                    else:
                        if not skip:
                            dbg("\t%s RUNNING" % str(func), end='')

                    sys.__stdout__.flush()

                    Process.write_separators("\n====== %s:%s ======\n\n" % (file, func))

                    if not skip:
                        # Run test (setUp/tearDown run automatically)
                        result = t()

                    # Tear down class only on last test
                    if index == len(tlist) - 1:
                        t.tearDownClass()

                    if skip:
                        continue
                except unittest.SkipTest:
                    result.skipped.append(t)
                except Exception:
                    dbg('\n%s threw an uncaught exception:' % func)
                    traceback.print_exc(file=sys.__stdout__)
                    crashed = True

                run += result.testsRun
                errors += len(result.errors)
                failures += len(result.failures)
                skipped += len(result.skipped)

                if crashed:
                    # The exception never reached 'result', so account
                    # for it here. Otherwise the summary would report
                    # this test as neither run nor failed.
                    run += 1
                    errors += 1

                if len(result.skipped) > 0:
                    dbg(colored(" SKIPPED", "cyan"))
                elif crashed or result.testsRun == 0 or \
                        len(result.errors) > 0 or len(result.failures) > 0:
                    # Judge this test by its own result, not by the
                    # running totals of the tests before it.
                    dbg(colored(" FAILED", "red"))
                    for e in result.errors:
                        dbg(e[1])
                    for f in result.failures:
                        dbg(f[1])
                else:
                    dbg(colored(" PASSED", "green"))

        # Prevents future test modules with the same name (e.g.
        # connection_test.py) from being loaded from the cache
        sys.modules.pop(module.__name__)

    #
    # The multiprocessing queue is picky with what objects it will serialize
    # and send between processes. Because of this we put the important bits
    # of the result into our own 'SimpleResult' tuple.
    #
    sresult = SimpleResult(run=run, failures=failures, errors=errors,
                           skipped=skipped, time=time.time() - start)
    rqueue.put(sresult)

    # This may not be required since we are manually popping sys.modules
    importlib.invalidate_caches()
|
|
|
|
def pre_test(ctx, test, copied):
    '''
    Copy test files, start processes, and any other pre test work.

    ctx    - test context used to start the processes needed by the test
    test   - path of the autotest directory, this becomes the cwd
    copied - list which is appended with every entry copied to /tmp.
             It is filled in as the copy progresses so the caller knows
             what was copied even if this function raises part way.

    Returns the sorted list of python test files to run.

    Raises Exception if the test directory has no hw.conf.
    '''
    os.chdir(test)

    dbg("\nStarting %s" % colored(os.path.basename(test), "white", attrs=['bold']))
    if not os.path.exists(test + '/hw.conf'):
        raise Exception("No hw.conf found for %s" % test)

    ctx.hw_config = ConfigParser()
    ctx.hw_config.read(test + '/hw.conf')
    #
    # We have two types of test files: tests and everything else. Rather
    # than require each test to specify the files needing to be copied to
    # /tmp (previously 'tmpfs_extra_stuff'), we just copy everything which
    # isn't a test. There is really no reason not to do this as any file
    # present in a test directory should be needed by the test.
    #
    # All files
    files = os.listdir(test)
    # Tests (starts or ends with 'test')
    subtests = [f for f in files if f.startswith('test') or
                os.path.splitext(f)[0].endswith('test')]
    # Everything else (except .py files)
    to_copy = [f for f in set(files) - set(subtests)
               if not f.endswith('.py') and f != '__pycache__']
    for f in to_copy:
        if os.path.isdir(f):
            shutil.copytree(f, '/tmp/' + f)
        else:
            shutil.copy(f, '/tmp')
        copied.append(f)

    # Prune down any subtests if needed
    if ctx.args.sub_tests:
        # The split list is stored back on ctx.args, so when this runs
        # again for another test directory the value is already a list
        # and must not be split a second time.
        if isinstance(ctx.args.sub_tests, str):
            ctx.args.sub_tests = ctx.args.sub_tests.split(',')

        to_run = [x.split('.')[0] for x in ctx.args.sub_tests]
        pruned = []

        for s in subtests:
            no_ext = s
            # Handle <file>.<test function> format
            if '.' in s:
                no_ext = s.split('.')[0]

            if no_ext in to_run:
                pruned.append(no_ext + '.py')

        subtests = pruned

    if ctx.args.log:
        ctx.start_process(['iwmon', '--nowiphy'])
    elif ctx.args.monitor:
        ctx.start_process(['iwmon'], outfile=ctx.args.monitor)

    ctx.start_dbus()
    ctx.start_haveged()
    ctx.start_dbus_monitor()
    ctx.start_radios()
    ctx.create_namespaces()
    ctx.start_hostapd()
    ctx.start_wpas_interfaces()
    ctx.start_ofono()

    # IWD is started unless hw.conf explicitly sets [SETUP] start_iwd
    if ctx.hw_config.has_option('SETUP', 'start_iwd'):
        start = ctx.hw_config.getboolean('SETUP', 'start_iwd')
    else:
        start = True

    if start:
        ctx.start_iwd()
    else:
        print("Not starting IWD from test-runner")

    print(ctx)

    # Lets the test files in this directory be imported by module name
    sys.path.insert(1, test)

    return sorted(subtests)
|
|
|
|
def post_test(ctx, to_copy):
    '''
    Remove copied files, and stop test processes.

    ctx     - test context whose test processes are stopped
    to_copy - names (relative to /tmp) of the entries which were copied
              for the test and now need to be removed

    Cleanup is best effort: a failure is reported but never raised, and
    the test processes are stopped regardless.
    '''
    try:
        for f in to_copy:
            if os.path.isdir('/tmp/' + f):
                shutil.rmtree('/tmp/' + f)
            else:
                os.remove('/tmp/' + f)

        Process(['ip', 'link', 'set', 'lo', 'down']).wait()
    except Exception:
        print("Exception thrown in post_test")
        # Show what actually went wrong rather than only that it did
        traceback.print_exc(file=sys.__stdout__)
    finally:
        ctx.stop_test_processes()

    if ctx.args.valgrind:
        # Dump and then remove every valgrind log left in /tmp
        for f in os.listdir('/tmp'):
            if f.startswith("valgrind.log."):
                dbg(f)
                with open('/tmp/' + f, 'r') as v:
                    dbg(v.read())
                dbg("\n")
                os.remove('/tmp/' + f)

    # Special case for when logging is enabled
    if os.path.isfile('/tmp/iwd-tls-debug-server-cert.pem'):
        os.remove('/tmp/iwd-tls-debug-server-cert.pem')

    # Anything else still in /tmp was left behind by the test
    allowed = ['phonesim.conf', 'certs', 'secrets', 'iwd']
    for f in os.listdir('/tmp'):
        if f in allowed:
            continue

        dbg("File %s was not cleaned up!" % f)
        try:
            os.remove('/tmp/' + f)
        except OSError:
            # Best effort only (e.g. the entry is a directory)
            pass
|
|
|
|
def print_results(results):
    '''
    Print a table summarizing all test results.

    results - dictionary of test name to SimpleResult

    A result with a time of TEST_MAX_TIMEOUT is shown as "Timed out" and
    one with a time of 0 as "Exception". Neither has any meaningful
    counters so they are left out of the pass/fail/skip totals.

    Returns True only if no test failed, timed out, or threw an
    exception.
    '''
    table = PrettyTable(['Test', colored('Passed', 'green'), colored('Failed', 'red'),
                         colored('Skipped', 'cyan'), colored('Time', 'yellow')])

    total_pass = 0
    total_fail = 0
    total_skip = 0
    total_time = 0
    # Set when a test produced no usable result at all. Such a test has
    # no failure count, but the overall run must still not be a success.
    incomplete = False

    for test, result in results.items():
        if result.time == TEST_MAX_TIMEOUT:
            failed = "Timed out"
            passed = "Timed out"
            incomplete = True
        elif result.time == 0:
            failed = "Exception"
            passed = "Exception"
            incomplete = True
        else:
            failed = result.failures + result.errors
            passed = result.run - failed

            total_pass += passed
            total_fail += failed
            total_skip += result.skipped

        total_time += result.time

        elapsed = '%.2f' % result.time

        table.add_row([test, colored(passed, 'green'), colored(failed, 'red'),
                       colored(result.skipped, 'cyan'), colored(elapsed, 'yellow')])

    total_time = '%.2f' % total_time

    table.add_row(['Total', colored(total_pass, 'green'), colored(total_fail, 'red'),
                   colored(total_skip, 'cyan'), colored(total_time, 'yellow')])

    dbg(table)

    return total_fail == 0 and not incomplete
|
|
|
|
def run_auto_tests(ctx, args):
    '''
    Run every autotest selected by 'args', one after the other.

    Each test runs in its own process and its SimpleResult is stored in
    ctx.results under the name of the test directory. A test which
    exceeds its time limit is recorded with a time of TEST_MAX_TIMEOUT,
    one which fails in any other way is recorded with a time of 0.
    post_test() is always run, whatever the outcome.
    '''
    tests = build_test_list(args)

    for test in tests:
        name = os.path.basename(test)
        copied = []
        try:
            subtests = pre_test(ctx, test, copied)

            if len(subtests) < 1:
                dbg("No tests to run")
                sys.exit()

            rqueue = multiprocessing.Queue()
            p = multiprocessing.Process(target=start_test, args=(ctx, subtests, rqueue))
            p.start()
            # Rather than time each subtest we just time the total but
            # multiply the default time by the number of tests being run.
            p.join(TEST_MAX_TIMEOUT * len(subtests))

            if p.is_alive():
                # Timeout
                p.terminate()
                # Reap the child, bounded so a stuck process cannot
                # hang the whole run.
                p.join(5)

                ctx.results[name] = SimpleResult(run=0, failures=0, errors=0,
                                                 skipped=0, time=TEST_MAX_TIMEOUT)
            else:
                # The child has exited, so its result is either already
                # queued or will never arrive (it died before queueing
                # one). Without a timeout the latter blocks forever;
                # with one, the resulting exception is handled below
                # like any other failure.
                ctx.results[name] = rqueue.get(timeout=5)

        except Exception:
            dbg("%s threw an uncaught exception" % test)
            traceback.print_exc(file=sys.__stdout__)
            ctx.results[name] = SimpleResult(run=0, failures=0, errors=0,
                                             skipped=0, time=0)
        finally:
            post_test(ctx, copied)
|
|
|
|
def run_unit_tests(ctx, args):
    '''
    Run each unit test returned by build_unit_list() from within
    <testhome>/unit, printing whether it passed or failed based on
    its return code.
    '''
    os.chdir(args.testhome + '/unit')

    for unit in build_unit_list(args):
        proc = ctx.start_process([unit]).wait()
        passed = proc.returncode == 0

        if passed:
            dbg("Unit test %s passed" % os.path.basename(unit))
        else:
            dbg("Unit test %s failed" % os.path.basename(unit))
|
|
|
|
def run_tests(args):
    '''
    Create the global test context and run either the unit tests or
    the autotests depending on 'args'.
    '''
    global config

    os.chdir(args.testhome)

    #
    # The 'config' module is how the autotest utils (iwd/hostapd/etc)
    # get hold of the TestContext. Anything running in this interpreter
    # can import config and use config.ctx to look at live test
    # information, start/stop processes, see active radios etc.
    #
    config = importlib.import_module('config')
    ctx = TestContext(args)
    config.ctx = ctx

    # Must import these after config so ctx gets set
    for name in ('hwsim', 'hostapd'):
        setattr(config, name, importlib.import_module(name))

    # Start writing out kernel log
    ctx.start_process(["dmesg", '--follow'])

    if args.unit_tests is not None:
        run_unit_tests(ctx, args)
    else:
        run_auto_tests(ctx, args)
|
|
|
|
# Script entry point. 'runner' has to be a module level name since
# exit_vm() calls runner.stop().
runner = Runner()

# Registered before any work is done so exit_vm() (print results, write the
# result file, stop the runner) runs when the interpreter exits, including
# via sys.exit() from within the test code.
atexit.register(exit_vm)
runner.prepare_environment()
run_tests(runner.args)
runner.cleanup_environment()
|