mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2024-12-27 19:22:34 +01:00
8aac527e29
There was a bug with process output where the last bit of data would never make it into stdout or log files. This was due to the IO watch being cleaned up when the process was killed and never allowing it to finish writing any pending data. Now the IO watch implementation has been moved out into its own function (io_process) which is now used to write the final bits of data out on process exit.
1657 lines
42 KiB
Python
Executable File
1657 lines
42 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
import argparse
|
|
import os
|
|
import shutil
|
|
import ctypes
|
|
import fcntl
|
|
import shlex
|
|
import sys
|
|
import subprocess
|
|
import atexit
|
|
import time
|
|
import unittest
|
|
import importlib
|
|
import signal
|
|
import pyroute2
|
|
import multiprocessing
|
|
import re
|
|
|
|
from configparser import ConfigParser
|
|
from prettytable import PrettyTable
|
|
from termcolor import colored
|
|
from glob import glob
|
|
from collections import namedtuple
|
|
from time import sleep
|
|
import dbus.mainloop.glib
|
|
from gi.repository import GLib
|
|
|
|
libc = ctypes.cdll['libc.so.6']
|
|
libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, \
|
|
ctypes.c_ulong, ctypes.c_char_p)
|
|
|
|
# Using ctypes to load the libc library is somewhat low level. Because of this
|
|
# we need to define our own flags/options for use with mounting.
|
|
MS_NOSUID = 2
|
|
MS_NODEV = 4
|
|
MS_NOEXEC = 8
|
|
MS_STRICTATIME = 1 << 24
|
|
STDIN_FILENO = 0
|
|
TIOCSTTY = 0x540E
|
|
|
|
config = None
|
|
intf_id = 0
|
|
|
|
TEST_MAX_TIMEOUT = 120
|
|
|
|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
|
|
def dbg(*s):
|
|
'''
|
|
Allows prints if stdout has been re-directed
|
|
'''
|
|
print(*s, file=sys.__stdout__)
|
|
|
|
def exit_vm():
|
|
if config:
|
|
for p in config.ctx.processes:
|
|
print("Process %s still running!" % p.name)
|
|
p.kill()
|
|
p = None
|
|
|
|
config.ctx.processes = []
|
|
|
|
if config.ctx and config.ctx.results:
|
|
print_results(config.ctx.results)
|
|
|
|
os.sync()
|
|
|
|
RB_AUTOBOOT = 0x01234567
|
|
#
|
|
# Calling 'reboot' or 'shutdown' from a shell (e.g. os.system('reboot'))
|
|
# is not the same the POSIX reboot() and will cause a kernel panic since
|
|
# we are the init process. The libc.reboot() allows the VM to exit
|
|
# gracefully.
|
|
#
|
|
libc.reboot(RB_AUTOBOOT)
|
|
|
|
def path_exists(path):
|
|
'''
|
|
Searches PATH as well as absolute paths.
|
|
'''
|
|
if shutil.which(path):
|
|
return True
|
|
try:
|
|
os.stat(path)
|
|
except:
|
|
return False
|
|
return True
|
|
|
|
def find_binary(list):
|
|
'''
|
|
Returns a binary from 'list' if its found in PATH or on a
|
|
valid absolute path.
|
|
'''
|
|
for path in list:
|
|
if path_exists(path):
|
|
return path
|
|
return None
|
|
|
|
def mount(source, target, fs, flags, options=''):
|
|
'''
|
|
Python wrapper for libc mount()
|
|
'''
|
|
ret = libc.mount(source.encode(), target.encode(), fs.encode(), flags,
|
|
options.encode())
|
|
if ret < 0:
|
|
errno = ctypes.get_errno()
|
|
raise Exception("Could not mount %s (%d)" % (target, errno))
|
|
|
|
MountInfo = namedtuple('MountInfo', 'fstype target options flags')
|
|
|
|
mount_table = [
|
|
MountInfo('sysfs', '/sys', '', MS_NOSUID|MS_NOEXEC|MS_NODEV),
|
|
MountInfo('proc', '/proc', '', MS_NOSUID|MS_NOEXEC|MS_NODEV),
|
|
MountInfo('devpts', '/dev/pts', 'mode=0620', MS_NOSUID|MS_NOEXEC),
|
|
MountInfo('tmpfs', '/dev/shm', 'mode=1777', MS_NOSUID|MS_NODEV|MS_STRICTATIME),
|
|
MountInfo('tmpfs', '/run', 'mode=0755', MS_NOSUID|MS_NODEV|MS_STRICTATIME),
|
|
MountInfo('tmpfs', '/tmp', '', 0),
|
|
MountInfo('tmpfs', '/usr/share/dbus-1', 'mode=0755', MS_NOSUID|MS_NOEXEC|MS_NODEV|MS_STRICTATIME),
|
|
MountInfo('debugfs', '/sys/kernel/debug', '', 0)
|
|
]
|
|
|
|
DevInfo = namedtuple('DevInfo', 'target linkpath')
|
|
|
|
dev_table = [
|
|
DevInfo('/proc/self/fd', '/dev/fd'),
|
|
DevInfo('/proc/self/fd/0', '/dev/stdin'),
|
|
DevInfo('/proc/self/fd/1', '/dev/stdout'),
|
|
DevInfo('/proc/self/fd/2', '/dev/stderr')
|
|
]
|
|
|
|
# Partial DBus config. The remainder (<listen>) will be filled in for each
|
|
# namespace that is created so each individual dbus-daemon has its own socket
|
|
# and address.
|
|
dbus_config = '''
|
|
<!DOCTYPE busconfig PUBLIC \
|
|
"-//freedesktop//DTD D-Bus Bus Configuration 1.0//EN" \
|
|
"http://www.freedesktop.org/standards/dbus/1.0/\
|
|
busconfig.dtd\">
|
|
<busconfig>
|
|
<type>system</type>
|
|
<limit name=\"reply_timeout\">2147483647</limit>
|
|
<auth>ANONYMOUS</auth>
|
|
<allow_anonymous/>
|
|
<policy context=\"default\">
|
|
<allow user=\"*\"/>
|
|
<allow own=\"*\"/>
|
|
<allow send_type=\"method_call\"/>
|
|
<allow send_type=\"signal\"/>
|
|
<allow send_type=\"method_return\"/>
|
|
<allow send_type=\"error\"/>
|
|
<allow receive_type=\"method_call\"/>
|
|
<allow receive_type=\"signal\"/>
|
|
<allow receive_type=\"method_return\"/>
|
|
<allow receive_type=\"error\"/>
|
|
<allow send_destination=\"*\" eavesdrop=\"true\"/>
|
|
<allow eavesdrop=\"true\"/>
|
|
</policy>
|
|
'''
|
|
class Process:
|
|
'''
|
|
Start a process. If 'wait' is True the constructor will start
|
|
the process and wait for it to exit. No PID is tracked in this
|
|
case.
|
|
'''
|
|
def __init__(self, args, wait=False, env=None, ctx=None, check=False,
|
|
outfile=None, namespace=None, need_out=False, cleanup=None):
|
|
self.killed = False
|
|
self.args = args
|
|
self.wait = wait
|
|
self.name = args[0]
|
|
self.ret = None
|
|
self.ctx = ctx
|
|
self.write_fds = []
|
|
self.io_watch = None
|
|
self.cleanup = cleanup
|
|
self.verbose = False
|
|
|
|
if not namespace:
|
|
self.output_name = '/tmp/%s-out' % self.name
|
|
else:
|
|
self.output_name = '/tmp/%s-%s-out' % (self.name, namespace)
|
|
|
|
if namespace:
|
|
self.args = ['ip', 'netns', 'exec', namespace]
|
|
self.args.extend(args)
|
|
|
|
#
|
|
# For simplicity all processes will write to a temp file
|
|
# (/tmp/<name>-out). If any verbose options are required this file
|
|
# will get an IO watch and write out any bytes to the needed FDs.
|
|
#
|
|
self.stdout = open(self.output_name, 'a+')
|
|
self.io_position = self.stdout.tell()
|
|
|
|
if ctx:
|
|
# Verbose requested, add stdout/stderr to write FD list
|
|
if self.name in ctx.args.verbose:
|
|
self.verbose = True
|
|
|
|
# Add output file to FD list
|
|
if outfile:
|
|
try:
|
|
f = open(outfile, 'w')
|
|
except Exception as e:
|
|
dbg(e)
|
|
exit(0)
|
|
|
|
if ctx.args.log_uid:
|
|
os.fchown(f.fileno(), int(ctx.args.log_uid), int(ctx.args.log_gid))
|
|
|
|
self.write_fds.append(f)
|
|
|
|
# Add log file to FD list
|
|
if ctx.args.log:
|
|
test = os.path.basename(os.getcwd())
|
|
test_dir = '%s/%s' % (ctx.args.log, test)
|
|
|
|
if not path_exists(test_dir):
|
|
os.mkdir(test_dir)
|
|
os.chown(test_dir, int(ctx.args.log_uid),
|
|
int(ctx.args.log_gid))
|
|
|
|
f = open('%s/%s' % (test_dir, args[0]), 'a+')
|
|
os.fchown(f.fileno(), int(ctx.args.log_uid), int(ctx.args.log_gid))
|
|
self.write_fds.append(f)
|
|
|
|
#
|
|
# Only add an IO watch for long running processes. If
|
|
# the process is being waited for, the log/outfile bits
|
|
# will be handled after the process exists.
|
|
#
|
|
if self.write_fds != [] and not wait and not check or self.verbose:
|
|
self.io_watch = GLib.io_add_watch(self.stdout, GLib.IO_IN,
|
|
self.io_callback)
|
|
|
|
self.pid = subprocess.Popen(self.args, stdout=self.stdout, stderr=self.stdout,
|
|
env=env, cwd=os.getcwd())
|
|
|
|
print("Starting process {}".format(self.pid.args))
|
|
|
|
if not wait and not check:
|
|
return
|
|
|
|
self.pid.wait(timeout=5)
|
|
self.killed = True
|
|
self.ret = self.pid.returncode
|
|
|
|
self.stdout.seek(self.io_position)
|
|
self.out = self.stdout.read()
|
|
self.stdout.seek(0, 2)
|
|
|
|
if len(self.write_fds) > 0:
|
|
self.process_io(self.stdout)
|
|
|
|
self.write_fds = []
|
|
|
|
print("%s returned %d" % (args[0], self.ret))
|
|
if check and self.ret != 0:
|
|
raise subprocess.CalledProcessError(returncode=self.ret, cmd=self.args)
|
|
|
|
def process_io(self, source):
|
|
#
|
|
# The file will have already been written to, meaning the seek
|
|
# position points to EOF. This is why the position is saved so
|
|
# we can seek to where we were last time, read data, and seek
|
|
# back to EOF.
|
|
#
|
|
source.seek(self.io_position)
|
|
data = source.read()
|
|
|
|
self.io_position += len(data)
|
|
source.seek(self.io_position)
|
|
|
|
if len(data) == 0:
|
|
return True
|
|
|
|
for f in self.write_fds:
|
|
f.write(data)
|
|
f.flush()
|
|
|
|
if self.verbose:
|
|
sys.__stdout__.write(data)
|
|
sys.__stdout__.flush()
|
|
|
|
return True
|
|
|
|
def io_callback(self, source, cb_condition):
|
|
return self.process_io(source)
|
|
|
|
def __del__(self):
|
|
print("Del process %s" % self.args)
|
|
|
|
os.remove(self.output_name)
|
|
|
|
self.stdout.close()
|
|
|
|
if not self.killed:
|
|
self.kill()
|
|
|
|
def kill(self, force=False):
|
|
print("Killing process %s" % self.args)
|
|
|
|
if self.killed:
|
|
return
|
|
|
|
if force:
|
|
self.pid.kill()
|
|
else:
|
|
self.pid.terminate()
|
|
|
|
self.pid.wait(timeout=15)
|
|
self.pid = None
|
|
|
|
if self.ctx and self in self.ctx.processes:
|
|
self.ctx.processes.remove(self)
|
|
|
|
self.ctx = None
|
|
|
|
self.process_io(self.stdout)
|
|
|
|
if self.cleanup:
|
|
self.cleanup()
|
|
|
|
self.write_fds = []
|
|
|
|
if self.io_watch:
|
|
GLib.source_remove(self.io_watch)
|
|
self.io_watch = None
|
|
|
|
self.cleanup = None
|
|
self.killed = True
|
|
|
|
def wait_for_socket(self, socket, wait):
|
|
waited = 0
|
|
while not os.path.exists(socket):
|
|
sleep(0.5)
|
|
waited += 0.5
|
|
if waited > wait:
|
|
raise Exception("Timed out waiting for socket")
|
|
|
|
def __str__(self):
|
|
return str(self.args) + '\n'
|
|
|
|
class Interface:
|
|
def __init__(self, name, config):
|
|
self.name = name
|
|
self.ctrl_interface = '/var/run/hostapd/' + name
|
|
self.config = config
|
|
|
|
def __del__(self):
|
|
Process(['iw', 'dev', self.name, 'del'], True)
|
|
|
|
def set_interface_state(self, state):
|
|
Process(['ifconfig', self.name, state], True)
|
|
|
|
class Radio:
|
|
def __init__(self, name):
|
|
self.name = name
|
|
# hostapd will reset this if this radio is used by it
|
|
self.use = 'iwd'
|
|
self.interface = None
|
|
|
|
def __del__(self):
|
|
print("Removing radio %s" % self.name)
|
|
self.interface = None
|
|
|
|
def create_interface(self, config, use):
|
|
global intf_id
|
|
|
|
ifname = 'wln%s' % intf_id
|
|
|
|
intf_id += 1
|
|
|
|
self.interface = Interface(ifname, config)
|
|
self.use = use
|
|
|
|
Process(['iw', 'phy', self.name, 'interface', 'add', ifname,
|
|
'type', 'managed'], True)
|
|
|
|
return self.interface
|
|
|
|
def __str__(self):
|
|
ret = self.name + ':\n'
|
|
ret += '\tUsed By: %s ' % self.use
|
|
if self.interface:
|
|
ret += '(%s)' % self.interface.name
|
|
|
|
ret += '\n'
|
|
|
|
return ret
|
|
|
|
class VirtualRadio(Radio):
|
|
'''
|
|
A subclass of 'Radio' specific to mac80211_hwsim radios.
|
|
|
|
TODO: Using D-Bus to create and destroy radios is more desireable
|
|
than the command line.
|
|
'''
|
|
def __init__(self, name, config=None):
|
|
self.disable_cipher = None
|
|
self.disable_iftype = None
|
|
|
|
hwsim = importlib.import_module('hwsim').Hwsim()
|
|
|
|
if config:
|
|
self.disable_iftype = config.get('iftype_disable', None)
|
|
self.disable_cipher = config.get('cipher_disable', None)
|
|
|
|
self._radio = hwsim.radios.create(name, p2p_device=True,
|
|
iftype_disable=self.disable_iftype,
|
|
cipher_disable=self.disable_cipher)
|
|
|
|
super().__init__(self._radio.name)
|
|
|
|
def __del__(self):
|
|
super().__del__()
|
|
|
|
# If the radio was moved into a namespace this will fail
|
|
try:
|
|
self._radio.remove()
|
|
except:
|
|
pass
|
|
|
|
self._radio = None
|
|
|
|
def __str__(self):
|
|
ret = super().__str__()
|
|
|
|
if self.disable_iftype:
|
|
ret += '\tDisabled interface types: %s\n' % self.disable_iftype
|
|
|
|
if self.disable_cipher:
|
|
ret += '\tDisabled ciphers: %s\n' % self.disable_cipher
|
|
|
|
ret += '\tPath: %s' % self._radio.path
|
|
|
|
ret += '\n'
|
|
|
|
return ret
|
|
|
|
class HostapdInstance:
|
|
'''
|
|
A single instance of hostapd. In reality all hostapd instances
|
|
are started as a single process. This class just makes things
|
|
convenient for communicating with one of the hostapd APs.
|
|
'''
|
|
def __init__(self, config, radio):
|
|
self.radio = radio
|
|
self.config = config
|
|
|
|
self.intf = radio.create_interface(self.config, 'hostapd')
|
|
self.intf.set_interface_state('up')
|
|
|
|
def __del__(self):
|
|
print("Removing HostapdInstance %s" % self.config)
|
|
self.intf.set_interface_state('down')
|
|
self.radio = None
|
|
self.intf = None
|
|
|
|
def __str__(self):
|
|
ret = 'Hostapd (%s)\n' % self.intf.name
|
|
ret += '\tConfig: %s\n' % self.config
|
|
|
|
return ret
|
|
|
|
class Hostapd:
|
|
'''
|
|
A set of running hostapd instances. This is really just a single
|
|
process since hostapd can be started with multiple config files.
|
|
'''
|
|
def __init__(self, ctx, radios, configs, radius):
|
|
self.ctx = ctx
|
|
|
|
if len(configs) != len(radios):
|
|
raise Exception("Config (%d) and radio (%d) list length not equal" % \
|
|
(len(configs), len(radios)))
|
|
|
|
print("Initializing hostapd instances")
|
|
|
|
ctx.start_process(['ip', 'link', 'set', 'eth0', 'up'], wait=True)
|
|
ctx.start_process(['ip', 'link', 'set', 'eth1', 'up'], wait=True)
|
|
|
|
self.global_ctrl_iface = '/var/run/hostapd/ctrl'
|
|
|
|
self.instances = [HostapdInstance(c, r) for c, r in zip(configs, radios)]
|
|
|
|
ifaces = [rad.interface.name for rad in radios]
|
|
ifaces = ','.join(ifaces)
|
|
|
|
args = ['hostapd', '-g', self.global_ctrl_iface]
|
|
|
|
if ifaces:
|
|
args.extend(['-i', ifaces])
|
|
|
|
#
|
|
# Config files should already be present in /tmp. This appends
|
|
# ctrl_interface and does any variable replacement. Currently
|
|
# this is just any $ifaceN occurrences.
|
|
#
|
|
for c in configs:
|
|
full_path = '/tmp/%s' % c
|
|
args.append(full_path)
|
|
|
|
self._rewrite_config(full_path)
|
|
|
|
if radius:
|
|
args.append(radius)
|
|
|
|
if ctx.is_verbose('hostapd'):
|
|
args.append('-d')
|
|
|
|
self.process = ctx.start_process(args)
|
|
|
|
self.process.wait_for_socket(self.global_ctrl_iface, 30)
|
|
|
|
def _rewrite_config(self, config):
|
|
'''
|
|
Replaces any $ifaceN values with the correct interface
|
|
names as well as appends the ctrl_interface path to
|
|
the config file.
|
|
'''
|
|
with open(config, 'r+') as f:
|
|
data = f.read()
|
|
to_replace = []
|
|
for match in re.finditer(r'\$iface[0-9]+', data):
|
|
tag = data[match.start():match.end()]
|
|
idx = tag.split('iface')[1]
|
|
|
|
to_replace.append((tag, self.instances[int(idx)].intf.name))
|
|
|
|
for r in to_replace:
|
|
data = data.replace(r[0], r[1], 1)
|
|
|
|
data += '\nctrl_interface=/var/run/hostapd\n'
|
|
|
|
f.write(data)
|
|
|
|
def __getitem__(self, config):
|
|
if not config:
|
|
return self.instances[0]
|
|
|
|
for hapd in self.instances:
|
|
if hapd.config == config:
|
|
return hapd
|
|
|
|
return None
|
|
|
|
def __del__(self):
|
|
print("Removing Hostapd")
|
|
try:
|
|
os.remove(self.global_ctrl_iface)
|
|
except:
|
|
dbg("Failed to remove %s" % self.global_ctrl_iface)
|
|
|
|
self.instances = None
|
|
|
|
# Hostapd may have already been stopped
|
|
if self.process:
|
|
self.ctx.stop_process(self.process)
|
|
|
|
self.ctx = None
|
|
|
|
# Hostapd creates simdb sockets for EAP-SIM/AKA tests but does not
|
|
# clean them up.
|
|
for f in glob("/tmp/eap_sim_db*"):
|
|
os.remove(f)
|
|
|
|
dbus_count = 0
|
|
|
|
class Namespace:
|
|
def __init__(self, args, name, radios):
|
|
self.dbus_address = None
|
|
self.processes = []
|
|
self.name = name
|
|
self.radios = radios
|
|
self.args = args
|
|
|
|
Process(['ip', 'netns', 'add', name], wait=True)
|
|
for r in radios:
|
|
Process(['iw', 'phy', r.name, 'set', 'netns', 'name', name], wait=True)
|
|
|
|
self.start_dbus()
|
|
|
|
def reset(self):
|
|
self._bus = None
|
|
|
|
for r in self.radios:
|
|
r._radio = None
|
|
|
|
self.radios = []
|
|
|
|
if self.name == "root":
|
|
self._bus = dbus.bus.BusConnection(address_or_type=self.dbus_address)
|
|
|
|
for p in list(self.processes):
|
|
print("Killing process %s" % p.name)
|
|
p.kill()
|
|
|
|
self.processes = []
|
|
|
|
def __del__(self):
|
|
print("Removing namespace %s" % self.name)
|
|
|
|
Process(['ip', 'netns', 'del', self.name], wait=True)
|
|
|
|
def get_bus(self):
|
|
return self._bus
|
|
|
|
def start_process(self, args, env=None, **kwargs):
|
|
# Special case for 'root' namespace (aka TestContext)
|
|
if self.name == "root":
|
|
ns = None
|
|
else:
|
|
ns = self.name
|
|
|
|
if not env:
|
|
env = os.environ.copy()
|
|
|
|
# In case this process needs DBus...
|
|
env['DBUS_SYSTEM_BUS_ADDRESS'] = self.dbus_address
|
|
|
|
p = Process(args, ctx=self, namespace=ns, env=env, **kwargs)
|
|
|
|
if not kwargs.get('wait', False):
|
|
self.processes.append(p)
|
|
|
|
return p
|
|
|
|
def stop_process(self, p, force=False):
|
|
p.kill(force)
|
|
|
|
def is_process_running(self, process):
|
|
for p in self.processes:
|
|
if p.name == process:
|
|
return True
|
|
return False
|
|
|
|
def _cleanup_dbus(self):
|
|
try:
|
|
os.remove(self.dbus_address.split('=')[1])
|
|
except:
|
|
pass
|
|
|
|
os.remove(self.dbus_cfg)
|
|
|
|
def start_dbus(self):
|
|
global dbus_count
|
|
|
|
self.dbus_address = 'unix:path=/tmp/dbus%d' % dbus_count
|
|
self.dbus_cfg = '/tmp/dbus%d.conf' % dbus_count
|
|
dbus_count += 1
|
|
|
|
with open(self.dbus_cfg, 'w+') as f:
|
|
f.write(dbus_config)
|
|
f.write('<listen>%s</listen>\n' % self.dbus_address)
|
|
f.write('</busconfig>\n')
|
|
|
|
p = self.start_process(['dbus-daemon', '--config-file=%s' % self.dbus_cfg],
|
|
wait=False, cleanup=self._cleanup_dbus)
|
|
|
|
p.wait_for_socket(self.dbus_address.split('=')[1], wait=5)
|
|
|
|
self._bus = dbus.bus.BusConnection(address_or_type=self.dbus_address)
|
|
|
|
def start_iwd(self, config_dir = '/tmp', storage_dir = '/tmp/iwd'):
|
|
args = []
|
|
iwd_radios = ','.join([r.name for r in self.radios if r.use == 'iwd'])
|
|
|
|
if self.args.valgrind:
|
|
args.extend(['valgrind', '--leak-check=full', '--track-origins=yes',
|
|
'--log-file=/tmp/valgrind.log'])
|
|
|
|
args.extend(['iwd', '-p', iwd_radios])
|
|
|
|
if self.is_verbose(args[0]):
|
|
args.append('-d')
|
|
|
|
env = os.environ.copy()
|
|
|
|
env['CONFIGURATION_DIRECTORY'] = config_dir
|
|
env['STATE_DIRECTORY'] = storage_dir
|
|
|
|
if self.is_verbose('iwd-dhcp'):
|
|
env['IWD_DHCP_DEBUG'] = '1'
|
|
|
|
if self.is_verbose('iwd-tls'):
|
|
env['IWD_TLS_DEBUG'] = '1'
|
|
|
|
if self.is_verbose('iwd-acd'):
|
|
env['IWD_ACD_DEBUG'] = '1'
|
|
|
|
pid = self.start_process(args, env=env)
|
|
return pid
|
|
|
|
def is_verbose(self, process):
|
|
process = os.path.basename(process)
|
|
|
|
if self.args is None:
|
|
return False
|
|
|
|
# every process is verbose when logging is enabled
|
|
if self.args.log:
|
|
return True
|
|
|
|
if process in self.args.verbose:
|
|
return True
|
|
|
|
# Special case here to enable verbose output with valgrind running
|
|
if process == 'valgrind' and 'iwd' in self.args.verbose:
|
|
return True
|
|
|
|
# Handle any glob matches
|
|
for item in self.args.verbose:
|
|
if process in glob(item):
|
|
return True
|
|
|
|
return False
|
|
|
|
def wait_for_dbus_service(self, service):
|
|
tries = 0
|
|
|
|
while not self._bus.name_has_owner(service):
|
|
if tries > 200:
|
|
raise TimeoutError('DBus service %s did not appear', service)
|
|
tries += 1
|
|
sleep(0.1)
|
|
|
|
def __str__(self):
|
|
ret = 'Namespace: %s\n' % self.name
|
|
ret += 'Processes:\n'
|
|
for p in self.processes:
|
|
ret += '\t%s' % str(p)
|
|
|
|
ret += 'Radios:\n'
|
|
if len(self.radios) > 0:
|
|
for r in self.radios:
|
|
ret += '\t%s\n' % str(r)
|
|
else:
|
|
ret += '\tNo Radios\n'
|
|
|
|
ret += 'DBus Address: %s\n' % self.dbus_address
|
|
ret += '===================================================\n\n'
|
|
|
|
return ret
|
|
|
|
class TestContext(Namespace):
|
|
'''
|
|
Contains all information for a given set of tests being run
|
|
such as processes, radios, interfaces and test results.
|
|
'''
|
|
def __init__(self, args):
|
|
self.name = "root"
|
|
self.processes = []
|
|
self.args = args
|
|
self.hw_config = None
|
|
self.hostapd = None
|
|
self.wpas_interfaces = None
|
|
self.cur_radio_id = 0
|
|
self.cur_iface_id = 0
|
|
self.radios = []
|
|
self.loopback_started = False
|
|
self.results = {}
|
|
self.mainloop = GLib.MainLoop()
|
|
self.namespaces = []
|
|
|
|
def start_dbus_monitor(self):
|
|
if not self.is_verbose('dbus-monitor'):
|
|
return
|
|
|
|
self.start_process(['dbus-monitor', '--address', self.dbus_address])
|
|
|
|
def start_haveged(self):
|
|
self.start_process(['haveged', '-F'])
|
|
|
|
def create_radios(self):
|
|
setup = self.hw_config['SETUP']
|
|
nradios = int(setup['num_radios'])
|
|
args = ['hwsim']
|
|
|
|
if not self.hw_config['SETUP'].get('hwsim_medium', 'yes') in ['yes', '1', 'true']:
|
|
# register hwsim as medium
|
|
args.extend(['--no-register'])
|
|
|
|
self.start_process(args)
|
|
self.wait_for_dbus_service('net.connman.hwsim')
|
|
|
|
for i in range(nradios):
|
|
name = 'rad%u' % i
|
|
|
|
# Get any [radX] sections. These are for configuring
|
|
# any special radios. This no longer requires a
|
|
# radio_conf list, we just assume radios start rad0
|
|
# and increment.
|
|
rad_config = None
|
|
if self.hw_config.has_section(name):
|
|
rad_config = self.hw_config[name]
|
|
|
|
self.radios.append(VirtualRadio(name, rad_config))
|
|
self.cur_radio_id += 1
|
|
|
|
def discover_radios(self):
|
|
phys = []
|
|
iw = pyroute2.iwutil.IW()
|
|
|
|
attrs = [phy['attrs'] for phy in iw.list_wiphy()]
|
|
|
|
for attr in attrs:
|
|
for key, value in attr:
|
|
if key == 'NL80211_ATTR_WIPHY_NAME':
|
|
if value not in phys:
|
|
phys.append(value)
|
|
break
|
|
|
|
print('Discovered radios: %s' % str(phys))
|
|
self.radios = [Radio(name) for name in phys]
|
|
|
|
def start_radios(self):
|
|
reg_domain = self.hw_config['SETUP'].get('reg_domain', None)
|
|
if reg_domain:
|
|
Process(['iw', 'reg', 'set', reg_domain], True)
|
|
|
|
if self.args.hw:
|
|
self.discover_radios()
|
|
else:
|
|
self.create_radios()
|
|
|
|
def start_hostapd(self):
|
|
if not 'HOSTAPD' in self.hw_config:
|
|
return
|
|
|
|
settings = self.hw_config['HOSTAPD']
|
|
|
|
if self.args.hw:
|
|
# Just grab the first N radios. It gets rather
|
|
# complicated trying to map radX radios specified in
|
|
# hw.conf so any passed through physical adapters are
|
|
# just given to hostapd/IWD as they appear during
|
|
# discovery.
|
|
#
|
|
# TODO: It may be desireable to map PCI/USB adapters to
|
|
# specific radX radios specified in the config but
|
|
# there are really 2 separate use cases here.
|
|
# 1. You want to test a *specific* radio with IWD
|
|
# or hostapd. For this you would want radX
|
|
# to map to a specific radio
|
|
# 2. You have many adapters in use to run multiple
|
|
# tests. In this case you would not care what
|
|
# was using each radio, just that there was
|
|
# enough to run all tests.
|
|
nradios = 0
|
|
for k, _ in settings.items():
|
|
if k == 'radius_server':
|
|
continue
|
|
nradios += 1
|
|
|
|
hapd_radios = self.radios[:nradios]
|
|
|
|
else:
|
|
hapd_radios = [rad for rad in self.radios if rad.name in settings]
|
|
|
|
hapd_configs = [conf for rad, conf in settings.items() if rad != 'radius_server']
|
|
|
|
radius_config = settings.get('radius_server', None)
|
|
|
|
self.hostapd = Hostapd(self, hapd_radios, hapd_configs, radius_config)
|
|
|
|
def start_wpas_interfaces(self):
|
|
if 'WPA_SUPPLICANT' not in self.hw_config:
|
|
return
|
|
|
|
settings = self.hw_config['WPA_SUPPLICANT']
|
|
wpas_radios = [rad for rad in self.radios if rad.name in settings]
|
|
self.wpas_interfaces = [rad.create_interface(settings[rad.name], 'wpas') for rad in wpas_radios]
|
|
|
|
def start_ofono(self):
|
|
sim_keys = self.hw_config['SETUP'].get('sim_keys', None)
|
|
if not sim_keys:
|
|
print("Ofono not requred")
|
|
return
|
|
elif sim_keys != 'ofono':
|
|
os.environ['IWD_SIM_KEYS'] = sim_keys
|
|
return
|
|
|
|
if not find_binary(['ofonod']) or not find_binary(['phonesim']):
|
|
print("Ofono or Phonesim not found, skipping test")
|
|
return
|
|
|
|
Process(['ifconfig', 'lo', 'up'], wait=True)
|
|
|
|
os.environ['OFONO_PHONESIM_CONFIG'] = '/tmp/phonesim.conf'
|
|
|
|
phonesim_args = ['phonesim', '-p', '12345', '/usr/share/phonesim/default.xml']
|
|
|
|
self.start_process(phonesim_args)
|
|
|
|
#
|
|
# TODO:
|
|
# Is there something to wait for? Without this phonesim rejects
|
|
# connections on all but the fist test.
|
|
#
|
|
time.sleep(3)
|
|
|
|
ofono_args = ['ofonod', '-n', '--plugin=atmodem,phonesim']
|
|
if self.is_verbose('ofonod'):
|
|
ofono_args.append('-d')
|
|
|
|
self.start_process(ofono_args)
|
|
|
|
print("Ofono started")
|
|
|
|
def create_namespaces(self):
|
|
if not self.hw_config.has_section('NameSpaces'):
|
|
return
|
|
|
|
for key, value in self.hw_config.items('NameSpaces'):
|
|
radio_names = value.split(',')
|
|
# Gather up radio objects for this namespace
|
|
radios = [rad for rad in self.radios if rad.name in radio_names]
|
|
|
|
# Remove radios from 'root' namespace
|
|
self.radios = list(set(self.radios) - set(radios))
|
|
|
|
self.namespaces.append(Namespace(self.args, key, radios))
|
|
|
|
def get_namespace(self, ns):
|
|
for n in self.namespaces:
|
|
if n.name == ns:
|
|
return n
|
|
|
|
return None
|
|
|
|
def stop_test_processes(self):
|
|
for n in self.namespaces:
|
|
n.reset()
|
|
|
|
self.namespaces = []
|
|
self.hostapd = None
|
|
self.wpas_interfaces = None
|
|
|
|
self.reset()
|
|
|
|
def __str__(self):
|
|
ret = 'Arguments:\n'
|
|
for arg in vars(self.args):
|
|
ret += '\t --%s %s\n' % (arg, str(getattr(self.args, arg)))
|
|
|
|
ret += 'Hostapd:\n'
|
|
if self.hostapd:
|
|
for h in self.hostapd.instances:
|
|
ret += '\t%s\n' % str(h)
|
|
else:
|
|
ret += '\tNo Hostapd instances\n'
|
|
|
|
ret += super().__str__()
|
|
|
|
for n in self.namespaces:
|
|
ret += n.__str__()
|
|
|
|
return ret
|
|
|
|
def prepare_sandbox():
|
|
print('Preparing sandbox')
|
|
|
|
for entry in mount_table:
|
|
try:
|
|
os.lstat(entry.target)
|
|
except:
|
|
os.mkdir(entry.target, 755)
|
|
|
|
mount(entry.fstype, entry.target, entry.fstype, entry.flags,
|
|
entry.options)
|
|
|
|
for entry in dev_table:
|
|
os.symlink(entry.target, entry.linkpath)
|
|
|
|
os.mkdir('/tmp/iwd')
|
|
|
|
os.setsid()
|
|
|
|
fcntl.ioctl(STDIN_FILENO, TIOCSTTY, 1)
|
|
|
|
def build_unit_list(args):
|
|
'''
|
|
Build list of unit tests based on passed arguments. This first
|
|
checks for literal names provided in the arguments, then if
|
|
no matches were found, checks for a glob match.
|
|
'''
|
|
tests = []
|
|
test_root = args.testhome + '/unit'
|
|
|
|
for unit in args.unit_tests.split(','):
|
|
path = '%s/%s' % (test_root, unit)
|
|
if os.access(unit, os.X_OK):
|
|
tests.append(unit)
|
|
elif os.access(path, os.X_OK):
|
|
tests.append(path)
|
|
else:
|
|
# Full list or glob, first build up valid list of tests
|
|
matches = glob(path)
|
|
if matches == []:
|
|
raise Exception("Could not find test %s" % unit)
|
|
|
|
matches = [exe for exe in matches if os.access(exe, os.X_OK)]
|
|
|
|
tests.extend(matches)
|
|
|
|
return sorted(tests)
|
|
|
|
def build_test_list(args):
|
|
'''
|
|
Build list of auto test directories based on passed arguments.
|
|
First check for absolute paths, then look in <iwd>/autotests,
|
|
then glob match.
|
|
'''
|
|
tests = []
|
|
test_root = args.testhome + '/autotests'
|
|
full_list = sorted(os.listdir(test_root))
|
|
|
|
# Run all tests
|
|
if not args.auto_tests:
|
|
# --shell with no tests implies 'shell' test
|
|
if args.shell:
|
|
return [test_root + '/shell']
|
|
|
|
# Pair down any non-tests and append full path
|
|
tests = [test_root + '/' + t for t in full_list if t.startswith('test')]
|
|
else:
|
|
print("Generating partial test list")
|
|
for t in args.auto_tests.split(','):
|
|
path = '%s/%s' % (test_root, t)
|
|
if t.endswith('+'):
|
|
t = t.split('+')[0]
|
|
i = full_list.index(t)
|
|
|
|
tests = [test_root + '/' + x for x in full_list[i:] \
|
|
if x.startswith('test')]
|
|
elif os.path.exists(t):
|
|
tests.append(t)
|
|
elif os.path.exists(path):
|
|
tests.append(path)
|
|
else:
|
|
matches = glob(path)
|
|
if matches == []:
|
|
raise Exception("Could not find test %s" % t)
|
|
|
|
tests.extend(matches)
|
|
|
|
return sorted(tests)
|
|
|
|
SimpleResult = namedtuple('SimpleResult', 'run failures errors skipped time')
|
|
|
|
def start_test(ctx, subtests, rqueue):
|
|
'''
|
|
Run an individual test. 'subtests' are parsed prior to calling
|
|
but these effectively make up a single test. 'rqueue' is the
|
|
results queue which is required since this is using
|
|
multiprocessing.
|
|
'''
|
|
suite = unittest.TestSuite()
|
|
|
|
#
|
|
# Iterate through each individual python test.
|
|
#
|
|
for s in subtests:
|
|
loader = unittest.TestLoader()
|
|
subtest = importlib.import_module(os.path.splitext(s)[0])
|
|
suite.addTests(loader.loadTestsFromModule(subtest))
|
|
|
|
# Prevents future test modules with the same name (e.g.
|
|
# connection_test.py) from being loaded from the cache
|
|
sys.modules.pop(subtest.__name__)
|
|
|
|
start = time.time()
|
|
runner = unittest.TextTestRunner()
|
|
result = runner.run(suite)
|
|
#
|
|
# The multiprocessing queue is picky with what objects it will serialize
|
|
# and send between processes. Because of this we put the important bits
|
|
# of the result into our own 'SimpleResult' tuple.
|
|
#
|
|
sresult = SimpleResult(run=result.testsRun, failures=len(result.failures),
|
|
errors=len(result.errors), skipped=len(result.skipped),
|
|
time=time.time() - start)
|
|
rqueue.put(sresult)
|
|
|
|
# This may not be required since we are manually popping sys.modules
|
|
importlib.invalidate_caches()
|
|
|
|
def pre_test(ctx, test, copied):
|
|
'''
|
|
Copy test files, start processes, and any other pre test work.
|
|
'''
|
|
os.chdir(test)
|
|
|
|
dbg("Starting test %s" % test)
|
|
if not os.path.exists(test + '/hw.conf'):
|
|
print("No hw.conf found for %s" % test)
|
|
exit()
|
|
|
|
ctx.hw_config = ConfigParser()
|
|
ctx.hw_config.read(test + '/hw.conf')
|
|
#
|
|
# We have two types of test files: tests and everything else. Rather
|
|
# than require each test to specify the files needing to be copied to
|
|
# /tmp (previously 'tmpfs_extra_stuff'), we just copy everything which
|
|
# isn't a test. There is really no reason not to do this as any file
|
|
# present in a test directory should be needed by the test.
|
|
#
|
|
# All files
|
|
files = os.listdir(test)
|
|
# Tests (starts or ends with 'test')
|
|
subtests = [f for f in files if f.startswith('test') or \
|
|
os.path.splitext(f)[0].endswith('test')]
|
|
# Everything else (except .py files)
|
|
to_copy = [f for f in list(set(files) - set(subtests)) if not f.endswith('.py')]
|
|
for f in to_copy:
|
|
if os.path.isdir(f):
|
|
shutil.copytree(f, '/tmp/' + f)
|
|
else:
|
|
shutil.copy(f, '/tmp')
|
|
copied.append(f)
|
|
|
|
# Prune down any subtests if needed
|
|
if ctx.args.sub_tests:
|
|
ctx.args.sub_tests = ctx.args.sub_tests.split(',')
|
|
pruned = []
|
|
|
|
for s in subtests:
|
|
# Allow test name both with and without the extension
|
|
if s in ctx.args.sub_tests or os.path.splitext(s)[0] in ctx.args.sub_tests:
|
|
pruned.append(s)
|
|
|
|
subtests = pruned
|
|
|
|
ctx.start_dbus()
|
|
ctx.start_haveged()
|
|
ctx.start_dbus_monitor()
|
|
ctx.start_radios()
|
|
ctx.create_namespaces()
|
|
ctx.start_hostapd()
|
|
ctx.start_wpas_interfaces()
|
|
ctx.start_ofono()
|
|
|
|
if ctx.args.log:
|
|
ctx.start_process(['iwmon'])
|
|
elif ctx.args.monitor:
|
|
ctx.start_process(['iwmon'], outfile=ctx.args.monitor)
|
|
|
|
if ctx.hw_config.has_option('SETUP', 'start_iwd'):
|
|
start = ctx.hw_config.getboolean('SETUP', 'start_iwd')
|
|
else:
|
|
start = True
|
|
|
|
if start:
|
|
ctx.start_iwd()
|
|
else:
|
|
print("Not starting IWD from test-runner")
|
|
|
|
print(ctx)
|
|
|
|
sys.path.insert(1, test)
|
|
|
|
return sorted(subtests)
|
|
|
|
def post_test(ctx, to_copy):
|
|
'''
|
|
Remove copied files, and stop test processes.
|
|
'''
|
|
try:
|
|
for f in to_copy:
|
|
if os.path.isdir('/tmp/' + f):
|
|
shutil.rmtree('/tmp/' + f)
|
|
else:
|
|
os.remove('/tmp/' + f)
|
|
|
|
Process(['ifconfig', 'lo', 'down'], wait=True)
|
|
except Exception as e:
|
|
print("Exception thrown in post_test")
|
|
finally:
|
|
ctx.stop_test_processes()
|
|
|
|
allowed = ['phonesim.conf', 'certs', 'secrets', 'iwd']
|
|
for f in [f for f in os.listdir('/tmp') if f not in allowed]:
|
|
dbg("File %s was not cleaned up!" % f)
|
|
|
|
if ctx.args.valgrind:
|
|
with open('/tmp/valgrind.log', 'r') as f:
|
|
dbg(f.read())
|
|
dbg("\n")
|
|
|
|
def print_results(results):
|
|
table = PrettyTable(['Test', colored('Passed', 'green'), colored('Failed', 'red'), \
|
|
colored('Skipped', 'cyan'), colored('Time', 'yellow')])
|
|
|
|
total_pass = 0
|
|
total_fail = 0
|
|
total_skip = 0
|
|
total_time = 0
|
|
|
|
for test, result in results.items():
|
|
|
|
if result.time == TEST_MAX_TIMEOUT:
|
|
failed = "Timed out"
|
|
passed = "Timed out"
|
|
elif result.time == 0:
|
|
failed = "Exception"
|
|
passed = "Exception"
|
|
else:
|
|
failed = result.failures + result.errors
|
|
passed = result.run - failed
|
|
|
|
total_pass += passed
|
|
total_fail += failed
|
|
total_skip += result.skipped
|
|
|
|
total_time += result.time
|
|
|
|
time = '%.2f' % result.time
|
|
|
|
table.add_row([test, colored(passed, 'green'), colored(failed, 'red'), \
|
|
colored(result.skipped, 'cyan'), colored(time, 'yellow')])
|
|
|
|
total_time = '%.2f' % total_time
|
|
|
|
table.add_row(['Total', colored(total_pass, 'green'), colored(total_fail, 'red'), \
|
|
colored(total_skip, 'cyan'), colored(total_time, 'yellow')])
|
|
|
|
dbg(table)
|
|
|
|
def run_auto_tests(ctx, args):
|
|
tests = build_test_list(args)
|
|
|
|
# Copy autotests/misc/{certs,secrets,phonesim} so any test can refer to them
|
|
shutil.copytree(args.testhome + '/autotests/misc/certs', '/tmp/certs')
|
|
shutil.copytree(args.testhome + '/autotests/misc/secrets', '/tmp/secrets')
|
|
shutil.copy(args.testhome + '/autotests/misc/phonesim/phonesim.conf', '/tmp')
|
|
|
|
for test in tests:
|
|
copied = []
|
|
try:
|
|
subtests = pre_test(ctx, test, copied)
|
|
|
|
if args.shell:
|
|
#
|
|
# Shell really isn't meant to be used with multiple tests. If
|
|
# a set of tests was passed in just start out in the first.
|
|
#
|
|
os.chdir(tests[0])
|
|
os.system('/bin/bash')
|
|
exit()
|
|
|
|
if len(subtests) < 1:
|
|
dbg("No tests to run")
|
|
exit()
|
|
|
|
rqueue = multiprocessing.Queue()
|
|
p = multiprocessing.Process(target=start_test, args=(ctx, subtests, rqueue))
|
|
p.start()
|
|
# Rather than time each subtest we just time the total but
|
|
# mutiply the default time by the number of tests being run.
|
|
p.join(TEST_MAX_TIMEOUT * len(subtests))
|
|
|
|
if p.is_alive():
|
|
# Timeout
|
|
p.terminate()
|
|
|
|
ctx.results[os.path.basename(test)] = SimpleResult(run=0,
|
|
failures=0, errors=0,
|
|
skipped=0, time=TEST_MAX_TIMEOUT)
|
|
else:
|
|
ctx.results[os.path.basename(test)] = rqueue.get()
|
|
|
|
except Exception as ex:
|
|
print(ex)
|
|
print("Uncaught exception thrown for %s" % test)
|
|
ctx.results[os.path.basename(test)] = SimpleResult(run=0, failures=0,
|
|
errors=0, skipped=0, time=0)
|
|
finally:
|
|
post_test(ctx, copied)
|
|
|
|
shutil.rmtree('/tmp/iwd')
|
|
shutil.rmtree('/tmp/certs')
|
|
shutil.rmtree('/tmp/secrets')
|
|
os.remove('/tmp/phonesim.conf')
|
|
|
|
# Write out kernel log
|
|
if ctx.args.log:
|
|
ctx.start_process(["dmesg"], wait=True)
|
|
|
|
def run_unit_tests(ctx, args):
|
|
os.chdir(args.testhome + '/unit')
|
|
units = build_unit_list(args)
|
|
|
|
for u in units:
|
|
if ctx.start_process([u], wait=True).ret != 0:
|
|
dbg("Unit test %s failed" % os.path.basename(u))
|
|
else:
|
|
dbg("Unit test %s passed" % os.path.basename(u))
|
|
|
|
def run_tests():
|
|
global config
|
|
|
|
with open('/proc/cmdline', 'r') as f:
|
|
cmdline = f.read()
|
|
|
|
start = cmdline.find('--testhome')
|
|
|
|
options = shlex.split(cmdline[start:])
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('--testhome')
|
|
parser.add_argument('--auto_tests')
|
|
parser.add_argument('--unit_tests')
|
|
parser.add_argument('--verbose', default=[])
|
|
parser.add_argument('--debug')
|
|
parser.add_argument('--path')
|
|
parser.add_argument('--valgrind')
|
|
parser.add_argument('--gdb')
|
|
parser.add_argument('--shell')
|
|
parser.add_argument('--log')
|
|
parser.add_argument('--log-gid')
|
|
parser.add_argument('--log-uid')
|
|
parser.add_argument('--hw')
|
|
parser.add_argument('--monitor')
|
|
parser.add_argument('--sub_tests')
|
|
|
|
args = parser.parse_args(options)
|
|
|
|
#
|
|
# This prevents any print() calls in this script from printing unless
|
|
# --debug is passed. For an 'always print' option use dbg()
|
|
#
|
|
if not args.debug:
|
|
sys.stdout = open(os.devnull, 'w')
|
|
|
|
if args.verbose != []:
|
|
args.verbose = args.verbose.split(',')
|
|
|
|
os.environ['PATH'] = '%s/src' % args.testhome
|
|
os.environ['PATH'] += ':%s/tools' % args.testhome
|
|
os.environ['PATH'] += ':%s/client' % args.testhome
|
|
os.environ['PATH'] += ':%s/monitor' % args.testhome
|
|
os.environ['PATH'] += ':%s/wired' % args.testhome
|
|
os.environ['PATH'] += ':' + args.path
|
|
|
|
sys.path.append(args.testhome + '/autotests/util')
|
|
|
|
#
|
|
# This allows all autotest utils (iwd/hostapd/etc) to access the
|
|
# TestContext. Any other module or script (in the same interpreter) can
|
|
# simply import config.ctx and access all live test information,
|
|
# start/stop processes, see active radios etc.
|
|
#
|
|
config = importlib.import_module('config')
|
|
config.ctx = TestContext(args)
|
|
|
|
if args.log:
|
|
mount('logdir', args.log, '9p', 0, 'trans=virtio,version=9p2000.L')
|
|
# Clear out any log files from other test runs
|
|
for f in glob('%s/*' % args.log):
|
|
print("removing %s" % f)
|
|
|
|
if os.path.isdir(f):
|
|
shutil.rmtree(f)
|
|
else:
|
|
os.remove(f)
|
|
elif args.monitor:
|
|
parent = os.path.abspath(os.path.join(args.monitor, os.pardir))
|
|
mount('mondir', parent, '9p', 0, 'trans=virtio,version=9p2000.L')
|
|
|
|
if config.ctx.args.unit_tests is None:
|
|
run_auto_tests(config.ctx, args)
|
|
else:
|
|
run_unit_tests(config.ctx, args)
|
|
|
|
class Main:
|
|
def __init__(self):
|
|
self.parser = argparse.ArgumentParser(
|
|
description='IWD Test Runner')
|
|
|
|
self.parser.add_argument('--qemu', '-q',
|
|
metavar='<QEMU binary>', type=str,
|
|
help='QEMU binary to use',
|
|
dest='qemu',
|
|
default=None)
|
|
self.parser.add_argument('--kernel', '-k', metavar='<kernel>',
|
|
type=str,
|
|
help='Path to kernel image',
|
|
dest='kernel',
|
|
default=None)
|
|
self.parser.add_argument('--verbose', '-v', metavar='<list>',
|
|
type=str,
|
|
help='Comma separated list of applications',
|
|
dest='verbose',
|
|
default=[])
|
|
self.parser.add_argument('--debug', '-d',
|
|
action='store_true',
|
|
help='Enable test-runner debugging',
|
|
dest='debug')
|
|
self.parser.add_argument('--shell', '-s', action='store_true',
|
|
help='Boot into shell', dest='shell')
|
|
self.parser.add_argument('--log', '-l', type=str,
|
|
help='Directory for log files')
|
|
self.parser.add_argument('--hw', '-w', type=str, nargs=1,
|
|
help='Use physical adapters for tests (passthrough)')
|
|
self.parser.add_argument('--monitor', '-m', type=str,
|
|
help='Enables iwmon output to file')
|
|
self.parser.add_argument('--sub-tests', '-S', metavar='<subtests>',
|
|
type=str, nargs=1, help='List of subtests to run',
|
|
default=None, dest='sub_tests')
|
|
|
|
# Prevent --autotest/--unittest from being used together
|
|
auto_unit_group = self.parser.add_mutually_exclusive_group()
|
|
auto_unit_group.add_argument('--auto-tests', '-A',
|
|
metavar='<tests>', type=str, nargs=1,
|
|
help='List of tests to run',
|
|
default=None,
|
|
dest='auto_tests')
|
|
auto_unit_group.add_argument('--unit-tests', '-U',
|
|
metavar='<tests>', type=str, nargs='?',
|
|
const='*',
|
|
help='List of unit tests to run',
|
|
dest='unit_tests')
|
|
|
|
# Prevent --valgrind/--gdb from being used together
|
|
valgrind_gdb_group = self.parser.add_mutually_exclusive_group()
|
|
valgrind_gdb_group.add_argument('--gdb', '-g', metavar='<exec>',
|
|
type=str, nargs=1,
|
|
help='Run gdb on specified executable',
|
|
dest='gdb')
|
|
valgrind_gdb_group.add_argument('--valgrind', '-V', action='store_true',
|
|
help='Run valgrind on IWD', dest='valgrind')
|
|
|
|
self.args = self.parser.parse_args()
|
|
|
|
if self.args.auto_tests:
|
|
self.args.auto_tests = self.args.auto_tests[0].split(',')
|
|
|
|
if self.args.sub_tests:
|
|
self.args.sub_tests = self.args.sub_tests[0].split(',')
|
|
|
|
if self.args.log and self.args.unit_tests:
|
|
dbg("Cannot use --log with --unit-tests")
|
|
quit()
|
|
|
|
if self.args.sub_tests:
|
|
if not self.args.auto_tests:
|
|
dbg("--sub-tests must be used with --auto-tests")
|
|
quit()
|
|
|
|
if len(self.args.auto_tests) > 1:
|
|
dbg("--sub-tests must be used with a single auto test")
|
|
quit()
|
|
|
|
def start(self):
|
|
usb_adapters = None
|
|
|
|
qemu_table = [
|
|
'qemu-system-x86_64',
|
|
'/usr/bin/qemu-system-x86_64'
|
|
]
|
|
|
|
kernel_table = [
|
|
'bzImage',
|
|
'arch/x86/boot/bzImage',
|
|
'vmlinux',
|
|
'arch/x86/boot/vmlinux'
|
|
]
|
|
|
|
if self.args.qemu is None:
|
|
qemu_binary = find_binary(qemu_table)
|
|
if not qemu_binary:
|
|
print("Could not find qemu binary")
|
|
quit()
|
|
else:
|
|
if path_exists(self.args.qemu):
|
|
qemu_binary = self.args.qemu
|
|
else:
|
|
print("QEMU binary %s does not exist" % \
|
|
self.args.qemu)
|
|
quit()
|
|
|
|
if self.args.kernel is None:
|
|
kernel_binary = find_binary(kernel_table)
|
|
if not kernel_binary:
|
|
print("Could not find kernel image")
|
|
quit()
|
|
else:
|
|
if path_exists(self.args.kernel):
|
|
kernel_binary = self.args.kernel
|
|
else:
|
|
print("Kernel image %s does not exist" % \
|
|
self.args.kernel)
|
|
quit()
|
|
|
|
if self.args.hw:
|
|
hw_conf = ConfigParser()
|
|
hw_conf.read(self.args.hw)
|
|
# TODO: Parse PCI adapters
|
|
if hw_conf.has_section('USBAdapters'):
|
|
# The actual key name of the adapter
|
|
# doesn't matter since all we need is the
|
|
# bus/address. This gets named by the kernel
|
|
# anyways once in the VM.
|
|
usb_adapters = [v for v in hw_conf['USBAdapters'].values()]
|
|
|
|
#
|
|
# Additional arguments not provided to test-runner which are
|
|
# needed once booted into the kernel.
|
|
#
|
|
options = 'init=%s' % os.path.realpath(sys.argv[0])
|
|
|
|
# Support running from top level as well as tools
|
|
if os.getcwd().endswith('tools'):
|
|
options += ' --testhome %s/../' % os.getcwd()
|
|
else:
|
|
options += ' --testhome %s' % os.getcwd()
|
|
|
|
options += ' --path "%s"' % os.environ['PATH']
|
|
|
|
if self.args.auto_tests:
|
|
options += ' --auto_tests %s' % ','.join(self.args.auto_tests)
|
|
|
|
if self.args.sub_tests:
|
|
options += ' --sub_tests %s' % ','.join(self.args.sub_tests)
|
|
|
|
if self.args.log:
|
|
if os.environ.get('SUDO_GID', None) is None:
|
|
print("--log can only be used as root user")
|
|
quit()
|
|
|
|
self.args.log = os.path.abspath(self.args.log)
|
|
uid = int(os.environ['SUDO_UID'])
|
|
gid = int(os.environ['SUDO_GID'])
|
|
|
|
if not path_exists(self.args.log):
|
|
os.mkdir(self.args.log)
|
|
os.chown(self.args.log, uid, gid)
|
|
|
|
options += ' --log-gid %u' % gid
|
|
options += ' --log-uid %u' % uid
|
|
|
|
if self.args.monitor:
|
|
if os.environ.get('SUDO_GID', None) is None:
|
|
print("--monitor can only be used as root user")
|
|
quit()
|
|
|
|
self.args.monitor = os.path.abspath(self.args.monitor)
|
|
mon_parent_dir = os.path.abspath(os.path.join(self.args.monitor, os.pardir))
|
|
|
|
denylist = [
|
|
'auto_tests',
|
|
'sub_tests',
|
|
'qemu',
|
|
'kernel'
|
|
]
|
|
|
|
nproc = multiprocessing.cpu_count()
|
|
|
|
#
|
|
# Specially handle CPU systems with minimal cores, otherwise
|
|
# use half the host cores.
|
|
#
|
|
if nproc < 2:
|
|
smp = 1
|
|
else:
|
|
smp = int(nproc / 2)
|
|
|
|
print("Using %d cores for VM" % smp)
|
|
|
|
#
|
|
# This passes through most of the command line options to
|
|
# the kernel command line. Some are not relevant (e.g. qemu)
|
|
# so similar options are added in the denylist above. This excludes
|
|
# any unset options which are assumed to be None or False. This
|
|
# is done so default arguments can be filled once in the VM. If
|
|
# we pass and basic types (None, False etc.) they are turned into
|
|
# a string representation ('None', 'False', etc.) which is not
|
|
# desirable.
|
|
#
|
|
for arg in vars(self.args):
|
|
if arg in denylist or getattr(self.args, arg) in [None, False, []]:
|
|
continue
|
|
options += ' --%s %s' % (arg, str(getattr(self.args, arg)))
|
|
|
|
kern_log = "ignore_loglevel" if "kernel" in self.args.verbose else "quiet"
|
|
|
|
qemu_cmdline = [
|
|
qemu_binary,
|
|
'-machine', 'type=q35,accel=kvm:tcg',
|
|
'-nodefaults', '-no-user-config', '-monitor', 'none',
|
|
'-display', 'none', '-m', '256M', '-nographic', '-vga',
|
|
'none', '-no-acpi', '-no-hpet',
|
|
'-no-reboot', '-fsdev',
|
|
'local,id=fsdev-root,path=/,readonly,security_model=none,multidevs=remap',
|
|
'-device',
|
|
'virtio-9p-pci,fsdev=fsdev-root,mount_tag=/dev/root',
|
|
'-chardev', 'stdio,id=chardev-serial0,signal=off',
|
|
'-device', 'pci-serial,chardev=chardev-serial0',
|
|
'-device', 'virtio-rng-pci',
|
|
'-kernel',
|
|
kernel_binary,
|
|
'-append',
|
|
'console=ttyS0,115200n8 earlyprintk=serial \
|
|
rootfstype=9p root=/dev/root \
|
|
rootflags=trans=virtio,version=9p2000.u \
|
|
acpi=off pci=noacpi %s ro \
|
|
mac80211_hwsim.radios=0 %s' % (kern_log, options),
|
|
'-cpu', 'host', '-smp', str(smp)
|
|
]
|
|
|
|
# Add two ethernet devices for testing EAD
|
|
qemu_cmdline.extend([
|
|
'-net', 'nic,model=virtio',
|
|
'-net', 'nic,model=virtio',
|
|
'-net', 'user'
|
|
])
|
|
|
|
if usb_adapters:
|
|
for bus, addr in [s.split(',') for s in usb_adapters]:
|
|
qemu_cmdline.extend(['-usb',
|
|
'-device',
|
|
'usb-host,hostbus=%s,hostaddr=%s' % \
|
|
(bus, addr)])
|
|
if self.args.log:
|
|
#
|
|
# Creates a virtfs device that can be mounted. This mount
|
|
# will point back to the provided log directory and is
|
|
# writable unlike the rest of the mounted file system.
|
|
#
|
|
qemu_cmdline.extend([
|
|
'-virtfs',
|
|
'local,path=%s,mount_tag=logdir,security_model=passthrough,id=logdir' \
|
|
% self.args.log
|
|
])
|
|
|
|
if self.args.monitor:
|
|
qemu_cmdline.extend([
|
|
'-virtfs',
|
|
'local,path=%s,mount_tag=mondir,security_model=passthrough,id=mondir' \
|
|
% mon_parent_dir
|
|
])
|
|
|
|
os.execlp(qemu_cmdline[0], *qemu_cmdline)
|
|
|
|
if __name__ == '__main__':
|
|
if os.getpid() == 1 and os.getppid() == 0:
|
|
atexit.register(exit_vm)
|
|
prepare_sandbox()
|
|
run_tests()
|
|
|
|
exit()
|
|
|
|
main = Main()
|
|
main.start()
|