mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2025-01-05 04:32:34 +01:00
1ecadc3952
In UML if any process dies while test-runner is waiting for the DBus service or some socket to be available it will block forever. This is due to the way the non_block_wait works. Its not optimal but it essentially polls over some input function until the conditions are met. And, depending on the input function, this can cause UML to hang since it never has a chance to go idle and advance the time clock. This can be fixed, at least for services/sockets, by sleeping in the input function allowing time to pass. This will then allow test-runner to bail out with an exception. This patch adds a new wait_for_service function which handles this automatically, and wait_for_socket was refactored to behave similarly.
1016 lines
26 KiB
Python
Executable File
1016 lines
26 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
import os
|
|
import shutil
|
|
import sys
|
|
import subprocess
|
|
import atexit
|
|
import time
|
|
import unittest
|
|
import importlib
|
|
from unittest.result import TestResult
|
|
import multiprocessing
|
|
import re
|
|
import traceback
|
|
|
|
from configparser import ConfigParser
|
|
from prettytable import PrettyTable
|
|
from termcolor import colored
|
|
from glob import glob
|
|
from collections import namedtuple
|
|
import dbus.mainloop.glib
|
|
from gi.repository import GLib
|
|
|
|
from runner import Runner
|
|
from utils import Process, Namespace, BarChart
|
|
|
|
config = None
|
|
intf_id = 0
|
|
|
|
TEST_MAX_TIMEOUT = 240
|
|
|
|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
|
|
def dbg(*s, **kwargs):
|
|
'''
|
|
Allows prints if stdout has been re-directed
|
|
'''
|
|
print(*s, **kwargs, file=sys.__stdout__)
|
|
|
|
def exit_vm():
|
|
if config:
|
|
for p in Process.get_all():
|
|
print("Process %s still running!" % p.args[0])
|
|
p.kill()
|
|
|
|
if config.ctx and config.ctx.results:
|
|
success = print_results(config.ctx.results)
|
|
else:
|
|
success = False
|
|
|
|
if config.ctx.args.result:
|
|
result = 'PASS' if success else 'FAIL'
|
|
with open(config.ctx.args.result, 'w') as f:
|
|
f.write(result)
|
|
|
|
os.sync()
|
|
|
|
runner.stop()
|
|
|
|
class Interface:
|
|
def __init__(self, name, config):
|
|
self.name = name
|
|
self.ctrl_interface = '/var/run/hostapd/' + name
|
|
self.config = config
|
|
|
|
def __del__(self):
|
|
Process(['iw', 'dev', self.name, 'del']).wait()
|
|
|
|
def set_interface_state(self, state):
|
|
Process(['ip', 'link', 'set', self.name, state]).wait()
|
|
|
|
class Radio:
|
|
def __init__(self, name):
|
|
self.name = name
|
|
# hostapd will reset this if this radio is used by it
|
|
self.use = 'iwd'
|
|
self.interface = None
|
|
|
|
def __del__(self):
|
|
print("Removing radio %s" % self.name)
|
|
self.interface = None
|
|
|
|
def create_interface(self, config, use):
|
|
global intf_id
|
|
|
|
ifname = 'wln%s' % intf_id
|
|
|
|
intf_id += 1
|
|
|
|
self.interface = Interface(ifname, config)
|
|
self.use = use
|
|
|
|
Process(['iw', 'phy', self.name, 'interface', 'add', ifname,
|
|
'type', 'managed']).wait()
|
|
|
|
return self.interface
|
|
|
|
def __str__(self):
|
|
ret = self.name + ':\n'
|
|
ret += '\tUsed By: %s ' % self.use
|
|
if self.interface:
|
|
ret += '(%s)' % self.interface.name
|
|
|
|
ret += '\n'
|
|
|
|
return ret
|
|
|
|
class VirtualRadio(Radio):
|
|
'''
|
|
A subclass of 'Radio' specific to mac80211_hwsim radios.
|
|
|
|
TODO: Using D-Bus to create and destroy radios is more desireable
|
|
than the command line.
|
|
'''
|
|
|
|
def __init__(self, name, cfg=None):
|
|
global config
|
|
|
|
self.disable_cipher = None
|
|
self.disable_iftype = None
|
|
|
|
self.hwsim = config.hwsim.Hwsim()
|
|
|
|
if cfg:
|
|
self.disable_iftype = cfg.get('iftype_disable', None)
|
|
self.disable_cipher = cfg.get('cipher_disable', None)
|
|
|
|
self._radio = self.hwsim.radios.create(name, p2p_device=True,
|
|
iftype_disable=self.disable_iftype,
|
|
cipher_disable=self.disable_cipher)
|
|
|
|
super().__init__(self._radio.name)
|
|
|
|
def __del__(self):
|
|
super().__del__()
|
|
|
|
# If the radio was moved into a namespace this will fail
|
|
try:
|
|
self._radio.remove()
|
|
except:
|
|
pass
|
|
|
|
self._radio = None
|
|
|
|
def __str__(self):
|
|
ret = super().__str__()
|
|
|
|
if self.disable_iftype:
|
|
ret += '\tDisabled interface types: %s\n' % self.disable_iftype
|
|
|
|
if self.disable_cipher:
|
|
ret += '\tDisabled ciphers: %s\n' % self.disable_cipher
|
|
|
|
ret += '\tPath: %s' % self._radio.path
|
|
|
|
ret += '\n'
|
|
|
|
return ret
|
|
|
|
class HostapdInstance:
|
|
'''
|
|
A single instance of hostapd. In reality all hostapd instances
|
|
are started as a single process. This class just makes things
|
|
convenient for communicating with one of the hostapd APs.
|
|
'''
|
|
def __init__(self, config, radio):
|
|
self.radio = radio
|
|
self.config = config
|
|
self.cli = None
|
|
|
|
self.intf = radio.create_interface(self.config, 'hostapd')
|
|
self.intf.set_interface_state('up')
|
|
|
|
def __del__(self):
|
|
print("Removing HostapdInstance %s" % self.config)
|
|
self.intf.set_interface_state('down')
|
|
self.radio = None
|
|
self.intf = None
|
|
|
|
def __str__(self):
|
|
ret = 'Hostapd (%s)\n' % self.intf.name
|
|
ret += '\tConfig: %s\n' % self.config
|
|
|
|
return ret
|
|
|
|
class Hostapd:
|
|
'''
|
|
A set of running hostapd instances. This is really just a single
|
|
process since hostapd can be started with multiple config files.
|
|
'''
|
|
def __init__(self, radios, configs, radius):
|
|
if len(configs) != len(radios):
|
|
raise Exception("Config (%d) and radio (%d) list length not equal" % \
|
|
(len(configs), len(radios)))
|
|
|
|
print("Initializing hostapd instances")
|
|
|
|
Process(['ip', 'link', 'set', 'eth0', 'up']).wait()
|
|
Process(['ip', 'link', 'set', 'eth1', 'up']).wait()
|
|
|
|
self.global_ctrl_iface = '/var/run/hostapd/ctrl'
|
|
|
|
self.instances = [HostapdInstance(c, r) for c, r in zip(configs, radios)]
|
|
|
|
ifaces = [rad.interface.name for rad in radios]
|
|
ifaces = ','.join(ifaces)
|
|
|
|
args = ['hostapd', '-g', self.global_ctrl_iface]
|
|
|
|
if ifaces:
|
|
args.extend(['-i', ifaces])
|
|
|
|
#
|
|
# Config files should already be present in /tmp. This appends
|
|
# ctrl_interface and does any variable replacement. Currently
|
|
# this is just any $ifaceN occurrences.
|
|
#
|
|
for c in configs:
|
|
full_path = '/tmp/%s' % c
|
|
args.append(full_path)
|
|
|
|
self._rewrite_config(full_path)
|
|
|
|
if radius:
|
|
args.append(radius)
|
|
|
|
if Process.is_verbose('hostapd'):
|
|
args.append('-d')
|
|
|
|
self.process = Process(args)
|
|
|
|
self.process.wait_for_socket(self.global_ctrl_iface, 30)
|
|
|
|
for hapd in self.instances:
|
|
self.process.wait_for_socket(hapd.intf.ctrl_interface, 30)
|
|
|
|
def attach_cli(self):
|
|
global config
|
|
|
|
for hapd in self.instances:
|
|
hapd.cli = config.hostapd.HostapdCLI(config=hapd.config)
|
|
|
|
def _rewrite_config(self, config):
|
|
'''
|
|
Replaces any $ifaceN values with the correct interface
|
|
names as well as appends the ctrl_interface path to
|
|
the config file.
|
|
'''
|
|
with open(config, 'r+') as f:
|
|
data = f.read()
|
|
to_replace = []
|
|
for match in re.finditer(r'\$iface[0-9]+', data):
|
|
tag = data[match.start():match.end()]
|
|
idx = tag.split('iface')[1]
|
|
|
|
to_replace.append((tag, self.instances[int(idx)].intf.name))
|
|
|
|
for r in to_replace:
|
|
data = data.replace(r[0], r[1], 1)
|
|
|
|
data += '\nctrl_interface=/var/run/hostapd\n'
|
|
|
|
f.write(data)
|
|
|
|
def __getitem__(self, config):
|
|
if not config:
|
|
return self.instances[0]
|
|
|
|
for hapd in self.instances:
|
|
if hapd.config == config:
|
|
return hapd
|
|
|
|
return None
|
|
|
|
def __del__(self):
|
|
print("Removing Hostapd")
|
|
try:
|
|
os.remove(self.global_ctrl_iface)
|
|
except:
|
|
print("Failed to remove %s" % self.global_ctrl_iface)
|
|
|
|
for hapd in self.instances:
|
|
GLib.source_remove(hapd.cli.io_watch)
|
|
|
|
# Hostapd may have already been stopped
|
|
if self.process:
|
|
self.process.kill()
|
|
|
|
# Hostapd creates simdb sockets for EAP-SIM/AKA tests but does not
|
|
# clean them up.
|
|
for f in glob("/tmp/eap_sim_db*"):
|
|
os.remove(f)
|
|
|
|
class TestContext(Namespace):
|
|
'''
|
|
Contains all information for a given set of tests being run
|
|
such as processes, radios, interfaces and test results.
|
|
'''
|
|
def __init__(self, args):
|
|
self.name = None
|
|
self.args = args
|
|
self.hw_config = None
|
|
self.hostapd = None
|
|
self.wpas_interfaces = None
|
|
self.radios = []
|
|
self.results = {}
|
|
self.namespaces = []
|
|
self._last_mem_available = 0
|
|
self._mem_chart = BarChart()
|
|
|
|
def start_dbus_monitor(self):
|
|
if not Process.is_verbose('dbus-monitor'):
|
|
return
|
|
|
|
self.start_process(['dbus-monitor', '--address', self.dbus_address])
|
|
|
|
def start_haveged(self):
|
|
self.start_process(['haveged', '-F'])
|
|
|
|
def create_radios(self):
|
|
setup = self.hw_config['SETUP']
|
|
nradios = int(setup['num_radios'])
|
|
args = ['hwsim']
|
|
|
|
if self.hw_config['SETUP'].get('hwsim_medium', 'no') in ['no', '0', 'false']:
|
|
# register hwsim as medium
|
|
args.extend(['--no-register'])
|
|
|
|
proc = self.start_process(args)
|
|
proc.wait_for_service(self, 'net.connman.hwsim', 20)
|
|
|
|
for i in range(nradios):
|
|
name = 'rad%u' % i
|
|
|
|
# Get any [radX] sections. These are for configuring
|
|
# any special radios. This no longer requires a
|
|
# radio_conf list, we just assume radios start rad0
|
|
# and increment.
|
|
rad_config = None
|
|
if self.hw_config.has_section(name):
|
|
rad_config = self.hw_config[name]
|
|
|
|
self.radios.append(VirtualRadio(name, rad_config))
|
|
|
|
def discover_radios(self):
|
|
import pyroute2
|
|
|
|
phys = []
|
|
|
|
try:
|
|
iw = pyroute2.iwutil.IW()
|
|
except:
|
|
iw = pyroute2.IW()
|
|
|
|
attrs = [phy['attrs'] for phy in iw.list_wiphy()]
|
|
|
|
for attr in attrs:
|
|
for key, value in attr:
|
|
if key == 'NL80211_ATTR_WIPHY_NAME':
|
|
if value not in phys:
|
|
phys.append(value)
|
|
break
|
|
|
|
print('Discovered radios: %s' % str(phys))
|
|
self.radios = [Radio(name) for name in phys]
|
|
|
|
def start_radios(self):
|
|
reg_domain = self.hw_config['SETUP'].get('reg_domain', None)
|
|
if reg_domain:
|
|
Process(['iw', 'reg', 'set', reg_domain]).wait()
|
|
|
|
if self.args.hw:
|
|
self.discover_radios()
|
|
else:
|
|
self.create_radios()
|
|
|
|
def start_hostapd(self):
|
|
if not 'HOSTAPD' in self.hw_config:
|
|
return
|
|
|
|
settings = self.hw_config['HOSTAPD']
|
|
|
|
if self.args.hw:
|
|
# Just grab the first N radios. It gets rather
|
|
# complicated trying to map radX radios specified in
|
|
# hw.conf so any passed through physical adapters are
|
|
# just given to hostapd/IWD as they appear during
|
|
# discovery.
|
|
#
|
|
# TODO: It may be desireable to map PCI/USB adapters to
|
|
# specific radX radios specified in the config but
|
|
# there are really 2 separate use cases here.
|
|
# 1. You want to test a *specific* radio with IWD
|
|
# or hostapd. For this you would want radX
|
|
# to map to a specific radio
|
|
# 2. You have many adapters in use to run multiple
|
|
# tests. In this case you would not care what
|
|
# was using each radio, just that there was
|
|
# enough to run all tests.
|
|
nradios = 0
|
|
for k, _ in settings.items():
|
|
if k == 'radius_server':
|
|
continue
|
|
nradios += 1
|
|
|
|
hapd_radios = self.radios[:nradios]
|
|
|
|
else:
|
|
hapd_radios = [rad for rad in self.radios if rad.name in settings]
|
|
|
|
hapd_configs = [conf for rad, conf in settings.items() if rad != 'radius_server']
|
|
|
|
radius_config = settings.get('radius_server', None)
|
|
|
|
self.hostapd = Hostapd(hapd_radios, hapd_configs, radius_config)
|
|
self.hostapd.attach_cli()
|
|
|
|
def get_frequencies(self):
|
|
frequencies = []
|
|
|
|
for hapd in self.hostapd.instances:
|
|
frequencies.append(hapd.cli.frequency)
|
|
|
|
return frequencies
|
|
|
|
def start_wpas_interfaces(self):
|
|
if 'WPA_SUPPLICANT' not in self.hw_config:
|
|
return
|
|
|
|
if not shutil.which('wpa_supplicant'):
|
|
print('wpa_supplicant not found, dependent tests will be skipped')
|
|
return
|
|
|
|
settings = self.hw_config['WPA_SUPPLICANT']
|
|
|
|
if self.args.hw:
|
|
nradios = len(settings.items())
|
|
|
|
wpas_radios = self.radios[:nradios]
|
|
self.wpas_interfaces = []
|
|
|
|
#
|
|
# Physical radios most likely will use a different name
|
|
# than 'rad#' but the config file is referenced by these
|
|
# 'rad#' names. Iterate through both the settings and
|
|
# physical radios to create interfaces associated with
|
|
# each config file.
|
|
#
|
|
for vrad, hwrad in zip(settings.items(), wpas_radios):
|
|
self.wpas_interfaces.append(hwrad.create_interface(vrad[1], 'wpas'))
|
|
|
|
else:
|
|
wpas_radios = [rad for rad in self.radios if rad.name in settings]
|
|
self.wpas_interfaces = [rad.create_interface(settings[rad.name], 'wpas') \
|
|
for rad in wpas_radios]
|
|
|
|
def start_ofono(self):
|
|
sim_keys = self.hw_config['SETUP'].get('sim_keys', None)
|
|
if not sim_keys:
|
|
print("Ofono not requred")
|
|
return
|
|
elif sim_keys != 'ofono':
|
|
os.environ['IWD_SIM_KEYS'] = sim_keys
|
|
return
|
|
|
|
if not shutil.which('ofonod') or not shutil.which('phonesim'):
|
|
print("Ofono or Phonesim not found, skipping test")
|
|
return
|
|
|
|
Process(['ip', 'link', 'set', 'lo', 'up']).wait()
|
|
|
|
os.environ['OFONO_PHONESIM_CONFIG'] = '/tmp/phonesim.conf'
|
|
|
|
phonesim_args = ['phonesim', '-p', '12345', '/usr/share/phonesim/default.xml']
|
|
|
|
self.start_process(phonesim_args)
|
|
|
|
#
|
|
# TODO:
|
|
# Is there something to wait for? Without this phonesim rejects
|
|
# connections on all but the fist test.
|
|
#
|
|
time.sleep(3)
|
|
|
|
ofono_args = ['ofonod', '-n', '--plugin=atmodem,phonesim']
|
|
if Process.is_verbose('ofonod'):
|
|
ofono_args.append('-d')
|
|
|
|
self.start_process(ofono_args)
|
|
|
|
print("Ofono started")
|
|
|
|
def create_namespaces(self):
|
|
if not self.hw_config.has_section('NameSpaces'):
|
|
return
|
|
|
|
for key, value in self.hw_config.items('NameSpaces'):
|
|
radio_names = value.split(',')
|
|
# Gather up radio objects for this namespace
|
|
radios = [rad for rad in self.radios if rad.name in radio_names]
|
|
|
|
# Remove radios from 'root' namespace
|
|
self.radios = list(set(self.radios) - set(radios))
|
|
|
|
self.namespaces.append(Namespace(self.args, key, radios))
|
|
|
|
def get_namespace(self, ns):
|
|
for n in self.namespaces:
|
|
if n.name == ns:
|
|
return n
|
|
|
|
return None
|
|
|
|
def stop_test_processes(self):
|
|
for n in self.namespaces:
|
|
n.reset()
|
|
|
|
self.namespaces = []
|
|
self.hostapd = None
|
|
self.wpas_interfaces = None
|
|
|
|
self.reset()
|
|
|
|
def meminfo_to_dict(self):
|
|
def removesuffix(string, suffix):
|
|
if string.endswith(suffix):
|
|
return string[:-len(suffix)]
|
|
return string
|
|
|
|
ret = {}
|
|
|
|
with open('/proc/meminfo', 'r') as f:
|
|
data = f.read().strip().split('\n')
|
|
|
|
for l in data:
|
|
entry = l.split(':')
|
|
ret[entry[0]] = int(removesuffix(entry[1], 'kB'))
|
|
|
|
return ret
|
|
|
|
def __str__(self):
|
|
ret = 'Arguments:\n'
|
|
for arg in vars(self.args):
|
|
ret += '\t --%s %s\n' % (arg, str(getattr(self.args, arg)))
|
|
|
|
ret += 'Hostapd:\n'
|
|
if self.hostapd:
|
|
for h in self.hostapd.instances:
|
|
ret += '\t%s\n' % str(h)
|
|
else:
|
|
ret += '\tNo Hostapd instances\n'
|
|
|
|
info = self.meminfo_to_dict()
|
|
self._mem_chart.add_value(info['MemAvailable'])
|
|
|
|
ret += 'Available Memory: %u kB\n' % info['MemAvailable']
|
|
ret += 'Last Test Delta: %+d kB\n' % (info['MemAvailable'] - self._last_mem_available)
|
|
ret += 'Per-test Usage:\n'
|
|
ret += str(self._mem_chart)
|
|
|
|
self._last_mem_available = info['MemAvailable']
|
|
|
|
ret += super().__str__()
|
|
|
|
for n in self.namespaces:
|
|
ret += n.__str__()
|
|
|
|
return ret
|
|
|
|
def build_unit_list(args):
|
|
'''
|
|
Build list of unit tests based on passed arguments. This first
|
|
checks for literal names provided in the arguments, then if
|
|
no matches were found, checks for a glob match.
|
|
'''
|
|
tests = []
|
|
test_root = args.testhome + '/unit'
|
|
|
|
for unit in args.unit_tests.split(','):
|
|
path = '%s/%s' % (test_root, unit)
|
|
if os.access(unit, os.X_OK):
|
|
tests.append(unit)
|
|
elif os.access(path, os.X_OK):
|
|
tests.append(path)
|
|
else:
|
|
# Full list or glob, first build up valid list of tests
|
|
matches = glob(path)
|
|
if matches == []:
|
|
raise Exception("Could not find test %s" % unit)
|
|
|
|
matches = [exe for exe in matches if os.access(exe, os.X_OK)]
|
|
|
|
tests.extend(matches)
|
|
|
|
return sorted(tests)
|
|
|
|
def build_test_list(args):
|
|
'''
|
|
Build list of auto test directories based on passed arguments.
|
|
First check for absolute paths, then look in <iwd>/autotests,
|
|
then glob match.
|
|
'''
|
|
tests = []
|
|
test_root = args.testhome + '/autotests'
|
|
|
|
# Run all tests
|
|
if not args.autotests:
|
|
# Get list of all autotests (committed in git)
|
|
tests = os.popen('git -C %s ls-files autotests/ | cut -f2 -d"/" \
|
|
| grep "test*" | uniq' % args.testhome).read() \
|
|
.strip().split('\n')
|
|
tests = [test_root + '/' + t for t in tests]
|
|
else:
|
|
print("Generating partial test list")
|
|
|
|
full_list = sorted(os.listdir(test_root))
|
|
|
|
for t in args.autotests.split(','):
|
|
path = '%s/%s' % (test_root, t)
|
|
if t.endswith('+'):
|
|
t = t.split('+')[0]
|
|
i = full_list.index(t)
|
|
|
|
tests = [test_root + '/' + x for x in full_list[i:] \
|
|
if x.startswith('test')]
|
|
elif os.path.exists(t):
|
|
if t not in tests:
|
|
tests.append(t)
|
|
elif os.path.exists(path):
|
|
if path not in tests:
|
|
tests.append(path)
|
|
else:
|
|
matches = glob(path)
|
|
if matches == []:
|
|
raise Exception("Could not find test %s" % t)
|
|
|
|
tests.extend(list(set(matches) - set(tests)))
|
|
|
|
return sorted(tests)
|
|
|
|
SimpleResult = namedtuple('SimpleResult', 'run failures errors skipped time')
|
|
|
|
def start_test(ctx, subtests, rqueue):
|
|
'''
|
|
Run an individual test. 'subtests' are parsed prior to calling
|
|
but these effectively make up a single test. 'rqueue' is the
|
|
results queue which is required since this is using
|
|
multiprocessing.
|
|
'''
|
|
run = 0
|
|
errors = 0
|
|
failures = 0
|
|
skipped = 0
|
|
|
|
start = time.time()
|
|
#
|
|
# Iterate through each individual python test.
|
|
#
|
|
for s in subtests:
|
|
loader = unittest.TestLoader()
|
|
try:
|
|
module = importlib.import_module(os.path.splitext(s)[0])
|
|
except OSError as e:
|
|
dbg(subprocess.check_output("cat /proc/buddyinfo", shell=True).decode('utf-8'))
|
|
dbg(subprocess.check_output("dmesg | tail -80", shell=True).decode('utf-8'))
|
|
print(ctx)
|
|
raise e
|
|
|
|
subtest = loader.loadTestsFromModule(module)
|
|
|
|
# The test suite is being (ab)used to get a bit more granularity
|
|
# with individual tests. The 'normal' way to use unittest is to
|
|
# just create a test suite and run them. The problem here is that
|
|
# test results are queued and printed at the very end so its
|
|
# difficult to know *where* a test failed (python gives a stack
|
|
# trace but printing the exception/failure immediately shows
|
|
# where in the debug logs something failed). Moreso if there are
|
|
# several test functions inside a single python file they run
|
|
# as a single test and it is difficult (again) to know where
|
|
# something failed.
|
|
|
|
# Iterating through each python test file
|
|
for test in subtest:
|
|
limit_funcs = []
|
|
|
|
if ctx.args.sub_tests:
|
|
for i in ctx.args.sub_tests:
|
|
if len(i.split('.')) == 2:
|
|
limit_funcs.append(i.split('.')[1])
|
|
|
|
# Iterating through individual test functions inside a
|
|
# Test() class. Due to the nature of unittest we have
|
|
# to jump through some hoops to set up the test class
|
|
# only once by turning the enumeration into a list, then
|
|
# enumerating (again) to keep track of the index (just
|
|
# enumerating the test class doesn't allow len() because
|
|
# it is not a list).
|
|
tlist = list(enumerate(test))
|
|
for index, t in enumerate(tlist):
|
|
# enumerate is returning a tuple, index 1 is our
|
|
# actual object.
|
|
t = t[1]
|
|
|
|
func, file = str(t).split(' ')
|
|
#
|
|
# TODO: There may be a better way of doing this
|
|
# but strigifying the test class gives us a string:
|
|
# <function> (<file>.<class>)
|
|
#
|
|
file = file.strip('()').split('.')[0] + '.py'
|
|
|
|
# Create an empty result here in case the test fails
|
|
result = TestResult()
|
|
|
|
try:
|
|
skip = len(limit_funcs) > 0 and func not in limit_funcs
|
|
|
|
# Set up class only on first test
|
|
if index == 0:
|
|
if not skip:
|
|
dbg("%s\n\t%s RUNNING" % (file, str(func)), end='')
|
|
t.setUpClass()
|
|
else:
|
|
if not skip:
|
|
dbg("\t%s RUNNING" % str(func), end='')
|
|
|
|
sys.__stdout__.flush()
|
|
|
|
name = os.path.basename(os.getcwd())
|
|
|
|
Process.write_separators(name, "\n====== %s:%s:%s ======\n\n" %
|
|
(name, file, func))
|
|
|
|
if not skip:
|
|
# Run test (setUp/tearDown run automatically)
|
|
result = t()
|
|
|
|
# Tear down class only on last test
|
|
if index == len(tlist) - 1:
|
|
t.tearDownClass()
|
|
|
|
if skip:
|
|
continue
|
|
except unittest.SkipTest as e:
|
|
result.skipped.append(t)
|
|
except Exception as e:
|
|
dbg('\n%s threw an uncaught exception:' % func)
|
|
traceback.print_exc(file=sys.__stdout__)
|
|
|
|
run += result.testsRun
|
|
errors += len(result.errors)
|
|
failures += len(result.failures)
|
|
skipped += len(result.skipped)
|
|
|
|
if len(result.skipped) > 0:
|
|
dbg(colored(" SKIPPED", "cyan"))
|
|
elif run == 0 or len(result.errors) > 0 or len(result.failures) > 0:
|
|
dbg(colored(" FAILED", "red"))
|
|
for e in result.errors:
|
|
dbg(e[1])
|
|
for f in result.failures:
|
|
dbg(f[1])
|
|
else:
|
|
dbg(colored(" PASSED", "green"))
|
|
|
|
# Prevents future test modules with the same name (e.g.
|
|
# connection_test.py) from being loaded from the cache
|
|
sys.modules.pop(module.__name__)
|
|
|
|
#
|
|
# The multiprocessing queue is picky with what objects it will serialize
|
|
# and send between processes. Because of this we put the important bits
|
|
# of the result into our own 'SimpleResult' tuple.
|
|
#
|
|
sresult = SimpleResult(run=run, failures=failures, errors=errors,
|
|
skipped=skipped, time=time.time() - start)
|
|
rqueue.put(sresult)
|
|
|
|
# This may not be required since we are manually popping sys.modules
|
|
importlib.invalidate_caches()
|
|
|
|
def pre_test(ctx, test, copied):
|
|
'''
|
|
Copy test files, start processes, and any other pre test work.
|
|
'''
|
|
os.chdir(test)
|
|
|
|
dbg("\nStarting %s" % colored(os.path.basename(test), "white", attrs=['bold']))
|
|
if not os.path.exists(test + '/hw.conf'):
|
|
raise Exception("No hw.conf found for %s" % test)
|
|
|
|
ctx.hw_config = ConfigParser()
|
|
ctx.hw_config.read(test + '/hw.conf')
|
|
#
|
|
# We have two types of test files: tests and everything else. Rather
|
|
# than require each test to specify the files needing to be copied to
|
|
# /tmp (previously 'tmpfs_extra_stuff'), we just copy everything which
|
|
# isn't a test. There is really no reason not to do this as any file
|
|
# present in a test directory should be needed by the test.
|
|
#
|
|
# All files
|
|
files = os.listdir(test)
|
|
# Tests (starts or ends with 'test')
|
|
subtests = [f for f in files if f.startswith('test') or \
|
|
os.path.splitext(f)[0].endswith('test')]
|
|
# Everything else (except .py files)
|
|
to_copy = [f for f in list(set(files) - set(subtests)) if not f.endswith('.py') \
|
|
and f != '__pycache__']
|
|
for f in to_copy:
|
|
if os.path.isdir(f):
|
|
shutil.copytree(f, '/tmp/' + f)
|
|
else:
|
|
shutil.copy(f, '/tmp')
|
|
copied.append(f)
|
|
|
|
# Prune down any subtests if needed
|
|
if ctx.args.sub_tests:
|
|
ctx.args.sub_tests = ctx.args.sub_tests.split(',')
|
|
|
|
to_run = [x.split('.')[0] for x in ctx.args.sub_tests]
|
|
pruned = []
|
|
|
|
for s in subtests:
|
|
no_ext = s
|
|
# Handle <file>.<test function> format
|
|
if '.' in s:
|
|
no_ext = s.split('.')[0]
|
|
|
|
if no_ext in to_run:
|
|
pruned.append(no_ext + '.py')
|
|
|
|
subtests = pruned
|
|
|
|
if ctx.args.log:
|
|
ctx.start_process(['iwmon', '--nowiphy'])
|
|
elif ctx.args.monitor:
|
|
ctx.start_process(['iwmon'], outfile=ctx.args.monitor)
|
|
|
|
ctx.start_dbus()
|
|
ctx.start_haveged()
|
|
ctx.start_dbus_monitor()
|
|
ctx.start_radios()
|
|
ctx.create_namespaces()
|
|
ctx.start_hostapd()
|
|
ctx.start_wpas_interfaces()
|
|
ctx.start_ofono()
|
|
|
|
if ctx.hw_config.getboolean('SETUP', 'start_iwd', fallback=True):
|
|
ctx.start_iwd()
|
|
|
|
print(ctx)
|
|
|
|
sys.path.insert(1, test)
|
|
|
|
return sorted(subtests)
|
|
|
|
def post_test(ctx, to_copy):
|
|
'''
|
|
Remove copied files, and stop test processes.
|
|
'''
|
|
try:
|
|
for f in to_copy:
|
|
if os.path.isdir('/tmp/' + f):
|
|
shutil.rmtree('/tmp/' + f)
|
|
elif os.path.exists('/tmp/' + f):
|
|
os.remove('/tmp/' + f)
|
|
|
|
Process(['ip', 'link', 'set', 'lo', 'down']).wait()
|
|
except Exception as e:
|
|
print("Exception thrown in post_test")
|
|
finally:
|
|
ctx.stop_test_processes()
|
|
|
|
if ctx.args.valgrind:
|
|
for f in os.listdir('/tmp'):
|
|
if f.startswith("valgrind.log."):
|
|
dbg(f)
|
|
with open('/tmp/' + f, 'r') as v:
|
|
dbg(v.read())
|
|
dbg("\n")
|
|
os.remove('/tmp/' + f)
|
|
|
|
# Special case for when logging is enabled
|
|
if os.path.isfile('/tmp/iwd-tls-debug-server-cert.pem'):
|
|
os.remove('/tmp/iwd-tls-debug-server-cert.pem')
|
|
|
|
allowed = ['phonesim.conf', 'certs', 'secrets', 'iwd']
|
|
for f in [f for f in os.listdir('/tmp') if f not in allowed]:
|
|
dbg("File %s was not cleaned up!" % f)
|
|
try:
|
|
os.remove('/tmp/' + f)
|
|
except:
|
|
pass
|
|
|
|
def print_results(results):
|
|
table = PrettyTable(['Test', colored('Passed', 'green'), colored('Failed', 'red'), \
|
|
colored('Skipped', 'cyan'), colored('Time', 'yellow')])
|
|
|
|
total_pass = 0
|
|
total_fail = 0
|
|
total_skip = 0
|
|
total_time = 0
|
|
|
|
for test, result in results.items():
|
|
|
|
if result.time == TEST_MAX_TIMEOUT:
|
|
failed = "Timed out"
|
|
passed = "Timed out"
|
|
elif result.time == 0:
|
|
failed = "Exception"
|
|
passed = "Exception"
|
|
else:
|
|
failed = result.failures + result.errors
|
|
passed = result.run - failed
|
|
|
|
total_pass += passed
|
|
total_fail += failed
|
|
total_skip += result.skipped
|
|
|
|
total_time += result.time
|
|
|
|
time = '%.2f' % result.time
|
|
|
|
table.add_row([test, colored(passed, 'green'), colored(failed, 'red'), \
|
|
colored(result.skipped, 'cyan'), colored(time, 'yellow')])
|
|
|
|
total_time = '%.2f' % total_time
|
|
|
|
table.add_row(['Total', colored(total_pass, 'green'), colored(total_fail, 'red'), \
|
|
colored(total_skip, 'cyan'), colored(total_time, 'yellow')])
|
|
|
|
dbg(table)
|
|
|
|
return total_fail == 0
|
|
|
|
def run_auto_tests(ctx, args):
|
|
tests = build_test_list(args)
|
|
|
|
for test in tests:
|
|
copied = []
|
|
try:
|
|
subtests = pre_test(ctx, test, copied)
|
|
|
|
if len(subtests) < 1:
|
|
dbg("No tests to run")
|
|
sys.exit()
|
|
|
|
rqueue = multiprocessing.Queue()
|
|
p = multiprocessing.Process(target=start_test, args=(ctx, subtests, rqueue))
|
|
p.start()
|
|
# Rather than time each subtest we just time the total but
|
|
# mutiply the default time by the number of tests being run.
|
|
p.join(TEST_MAX_TIMEOUT * len(subtests))
|
|
|
|
if p.is_alive():
|
|
# Timeout
|
|
p.terminate()
|
|
|
|
ctx.results[os.path.basename(test)] = SimpleResult(run=0,
|
|
failures=0, errors=0,
|
|
skipped=0, time=TEST_MAX_TIMEOUT)
|
|
else:
|
|
ctx.results[os.path.basename(test)] = rqueue.get()
|
|
|
|
except Exception as ex:
|
|
dbg("%s threw an uncaught exception" % test)
|
|
traceback.print_exc(file=sys.__stdout__)
|
|
ctx.results[os.path.basename(test)] = SimpleResult(run=0, failures=0,
|
|
errors=0, skipped=0, time=0)
|
|
finally:
|
|
post_test(ctx, copied)
|
|
|
|
def run_unit_tests(ctx, args):
|
|
os.chdir(args.testhome + '/unit')
|
|
units = build_unit_list(args)
|
|
|
|
for u in units:
|
|
p = ctx.start_process([u]).wait()
|
|
if p.returncode != 0:
|
|
dbg("Unit test %s failed" % os.path.basename(u))
|
|
else:
|
|
dbg("Unit test %s passed" % os.path.basename(u))
|
|
|
|
def run_tests(args):
|
|
global config
|
|
|
|
os.chdir(args.testhome)
|
|
|
|
#
|
|
# This allows all autotest utils (iwd/hostapd/etc) to access the
|
|
# TestContext. Any other module or script (in the same interpreter) can
|
|
# simply import config.ctx and access all live test information,
|
|
# start/stop processes, see active radios etc.
|
|
#
|
|
config = importlib.import_module('config')
|
|
config.ctx = TestContext(args)
|
|
|
|
# Must import these after config so ctx gets set
|
|
config.hwsim = importlib.import_module('hwsim')
|
|
config.hostapd = importlib.import_module('hostapd')
|
|
|
|
# Start writing out kernel log
|
|
config.ctx.start_process(["dmesg", '--follow'])
|
|
|
|
if args.unit_tests is None:
|
|
run_auto_tests(config.ctx, args)
|
|
else:
|
|
run_unit_tests(config.ctx, args)
|
|
|
|
runner = Runner()
|
|
|
|
atexit.register(exit_vm)
|
|
runner.prepare_environment()
|
|
run_tests(runner.args)
|
|
runner.cleanup_environment()
|