mirror of
				https://git.kernel.org/pub/scm/network/wireless/iwd.git
				synced 2025-11-02 23:37:28 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			1888 lines
		
	
	
		
			48 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			1888 lines
		
	
	
		
			48 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/python3
 | 
						|
 | 
						|
import argparse
 | 
						|
import os
 | 
						|
import shutil
 | 
						|
import ctypes
 | 
						|
import fcntl
 | 
						|
import shlex
 | 
						|
import sys
 | 
						|
import subprocess
 | 
						|
import atexit
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
import importlib
 | 
						|
import signal
 | 
						|
from unittest.result import TestResult
 | 
						|
import pyroute2
 | 
						|
import multiprocessing
 | 
						|
import re
 | 
						|
import traceback
 | 
						|
 | 
						|
from configparser import ConfigParser
 | 
						|
from prettytable import PrettyTable
 | 
						|
from termcolor import colored
 | 
						|
from glob import glob
 | 
						|
from collections import namedtuple
 | 
						|
from time import sleep
 | 
						|
import dbus.mainloop.glib
 | 
						|
from gi.repository import GLib
 | 
						|
from weakref import WeakValueDictionary
 | 
						|
 | 
						|
libc = ctypes.cdll['libc.so.6']
 | 
						|
libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, \
 | 
						|
			ctypes.c_ulong, ctypes.c_char_p)
 | 
						|
 | 
						|
# Using ctypes to load the libc library is somewhat low level. Because of this
 | 
						|
# we need to define our own flags/options for use with mounting.
 | 
						|
MS_NOSUID = 2
 | 
						|
MS_NODEV = 4
 | 
						|
MS_NOEXEC = 8
 | 
						|
MS_STRICTATIME = 1 << 24
 | 
						|
STDIN_FILENO = 0
 | 
						|
TIOCSTTY = 0x540E
 | 
						|
 | 
						|
config = None
 | 
						|
intf_id = 0
 | 
						|
 | 
						|
TEST_MAX_TIMEOUT = 240
 | 
						|
 | 
						|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
 | 
						|
 | 
						|
def dbg(*s, **kwargs):
 | 
						|
	'''
 | 
						|
		Allows prints if stdout has been re-directed
 | 
						|
	'''
 | 
						|
	print(*s, **kwargs, file=sys.__stdout__)
 | 
						|
 | 
						|
def exit_vm():
 | 
						|
	if config:
 | 
						|
		for p in Process.get_all():
 | 
						|
			print("Process %s still running!" % p.args[0])
 | 
						|
			p.kill()
 | 
						|
 | 
						|
	if config.ctx and config.ctx.results:
 | 
						|
		print_results(config.ctx.results)
 | 
						|
 | 
						|
	os.sync()
 | 
						|
 | 
						|
	RB_AUTOBOOT = 0x01234567
 | 
						|
	#
 | 
						|
	# Calling 'reboot' or 'shutdown' from a shell (e.g. os.system('reboot'))
 | 
						|
	# is not the same the POSIX reboot() and will cause a kernel panic since
 | 
						|
	# we are the init process. The libc.reboot() allows the VM to exit
 | 
						|
	# gracefully.
 | 
						|
	#
 | 
						|
	libc.reboot(RB_AUTOBOOT)
 | 
						|
 | 
						|
def path_exists(path):
 | 
						|
	'''
 | 
						|
		Searches PATH as well as absolute paths.
 | 
						|
	'''
 | 
						|
	if shutil.which(path):
 | 
						|
		return True
 | 
						|
	try:
 | 
						|
		os.stat(path)
 | 
						|
	except:
 | 
						|
		return False
 | 
						|
	return True
 | 
						|
 | 
						|
def find_binary(list):
 | 
						|
	'''
 | 
						|
		Returns a binary from 'list' if its found in PATH or on a
 | 
						|
		valid absolute path.
 | 
						|
	'''
 | 
						|
	for path in list:
 | 
						|
		if path_exists(path):
 | 
						|
			return path
 | 
						|
	return None
 | 
						|
 | 
						|
def mount(source, target, fs, flags, options=''):
 | 
						|
	'''
 | 
						|
		Python wrapper for libc mount()
 | 
						|
	'''
 | 
						|
	ret = libc.mount(source.encode(), target.encode(), fs.encode(), flags,
 | 
						|
				options.encode())
 | 
						|
	if ret < 0:
 | 
						|
		errno = ctypes.get_errno()
 | 
						|
		raise Exception("Could not mount %s (%d)" % (target, errno))
 | 
						|
 | 
						|
MountInfo = namedtuple('MountInfo', 'fstype target options flags')
 | 
						|
 | 
						|
mount_table = [
 | 
						|
	MountInfo('sysfs', '/sys', '', MS_NOSUID|MS_NOEXEC|MS_NODEV),
 | 
						|
	MountInfo('proc', '/proc', '', MS_NOSUID|MS_NOEXEC|MS_NODEV),
 | 
						|
	MountInfo('devpts', '/dev/pts', 'mode=0620', MS_NOSUID|MS_NOEXEC),
 | 
						|
	MountInfo('tmpfs', '/dev/shm', 'mode=1777', MS_NOSUID|MS_NODEV|MS_STRICTATIME),
 | 
						|
	MountInfo('tmpfs', '/run', 'mode=0755', MS_NOSUID|MS_NODEV|MS_STRICTATIME),
 | 
						|
	MountInfo('tmpfs', '/tmp', '', 0),
 | 
						|
	MountInfo('tmpfs', '/usr/share/dbus-1', 'mode=0755', MS_NOSUID|MS_NOEXEC|MS_NODEV|MS_STRICTATIME),
 | 
						|
	MountInfo('debugfs', '/sys/kernel/debug', '', 0)
 | 
						|
]
 | 
						|
 | 
						|
DevInfo = namedtuple('DevInfo', 'target linkpath')
 | 
						|
 | 
						|
dev_table = [
 | 
						|
	DevInfo('/proc/self/fd', '/dev/fd'),
 | 
						|
	DevInfo('/proc/self/fd/0', '/dev/stdin'),
 | 
						|
	DevInfo('/proc/self/fd/1', '/dev/stdout'),
 | 
						|
	DevInfo('/proc/self/fd/2', '/dev/stderr')
 | 
						|
]
 | 
						|
 | 
						|
# Partial DBus config. The remainder (<listen>) will be filled in for each
 | 
						|
# namespace that is created so each individual dbus-daemon has its own socket
 | 
						|
# and address.
 | 
						|
dbus_config = '''
 | 
						|
<!DOCTYPE busconfig PUBLIC \
 | 
						|
"-//freedesktop//DTD D-Bus Bus Configuration 1.0//EN" \
 | 
						|
"http://www.freedesktop.org/standards/dbus/1.0/\
 | 
						|
busconfig.dtd\">
 | 
						|
<busconfig>
 | 
						|
<type>system</type>
 | 
						|
<limit name=\"reply_timeout\">2147483647</limit>
 | 
						|
<auth>ANONYMOUS</auth>
 | 
						|
<allow_anonymous/>
 | 
						|
<policy context=\"default\">
 | 
						|
<allow user=\"*\"/>
 | 
						|
<allow own=\"*\"/>
 | 
						|
<allow send_type=\"method_call\"/>
 | 
						|
<allow send_type=\"signal\"/>
 | 
						|
<allow send_type=\"method_return\"/>
 | 
						|
<allow send_type=\"error\"/>
 | 
						|
<allow receive_type=\"method_call\"/>
 | 
						|
<allow receive_type=\"signal\"/>
 | 
						|
<allow receive_type=\"method_return\"/>
 | 
						|
<allow receive_type=\"error\"/>
 | 
						|
<allow send_destination=\"*\" eavesdrop=\"true\"/>
 | 
						|
<allow eavesdrop=\"true\"/>
 | 
						|
</policy>
 | 
						|
'''
 | 
						|
 | 
						|
class Process(subprocess.Popen):
 | 
						|
	processes = WeakValueDictionary()
 | 
						|
	ctx = None
 | 
						|
 | 
						|
	def __new__(cls, *args, **kwargs):
 | 
						|
		obj = super().__new__(cls)
 | 
						|
		cls.processes[id(obj)] = obj
 | 
						|
		return obj
 | 
						|
 | 
						|
	def __init__(self, args, namespace=None, outfile=None, env=None, check=False, cleanup=None):
 | 
						|
		self.write_fds = []
 | 
						|
		self.io_watch = None
 | 
						|
		self.cleanup = cleanup
 | 
						|
		self.verbose = False
 | 
						|
		self.out = ''
 | 
						|
		self.hup = False
 | 
						|
		self.killed = False
 | 
						|
		self.namespace = namespace
 | 
						|
 | 
						|
		if not self.ctx:
 | 
						|
			global config
 | 
						|
			self.ctx = config.ctx
 | 
						|
 | 
						|
		if self.ctx.is_verbose(args[0], log=False):
 | 
						|
			self.verbose = True
 | 
						|
 | 
						|
		if namespace:
 | 
						|
			args = ['ip', 'netns', 'exec', namespace] + args
 | 
						|
 | 
						|
		if outfile:
 | 
						|
			# outfile is only used by iwmon, in which case we don't want
 | 
						|
			# to append to an existing file.
 | 
						|
			self._append_outfile(outfile, append=False)
 | 
						|
 | 
						|
		if self.ctx.args.log:
 | 
						|
			logfile = '%s/%s/%s' % (self.ctx.args.log,
 | 
						|
						os.path.basename(os.getcwd()),
 | 
						|
						args[0])
 | 
						|
			self._append_outfile(logfile)
 | 
						|
 | 
						|
		super().__init__(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
 | 
						|
					env=env, cwd=os.getcwd())
 | 
						|
 | 
						|
		# Set as non-blocking so read() in the IO callback doesn't block forever
 | 
						|
		fl = fcntl.fcntl(self.stdout, fcntl.F_GETFL)
 | 
						|
		fcntl.fcntl(self.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
 | 
						|
 | 
						|
		self.io_watch = GLib.io_add_watch(self.stdout, GLib.IO_IN |
 | 
						|
						GLib.IO_HUP | GLib.IO_ERR, self.process_io)
 | 
						|
 | 
						|
		print("Starting process {}".format(self.args))
 | 
						|
 | 
						|
		if check:
 | 
						|
			self.wait(10)
 | 
						|
			self.killed = True
 | 
						|
			if self.returncode != 0:
 | 
						|
				raise subprocess.CalledProcessError(returncode=self.returncode,
 | 
						|
									cmd=args)
 | 
						|
 | 
						|
	@classmethod
 | 
						|
	def get_all(cls):
 | 
						|
		return cls.processes.values()
 | 
						|
 | 
						|
	@classmethod
 | 
						|
	def kill_all(cls):
 | 
						|
		for p in cls.processes.values():
 | 
						|
			p.kill()
 | 
						|
 | 
						|
	@staticmethod
 | 
						|
	def _write_io(instance, data, stdout=True):
 | 
						|
		for f in instance.write_fds:
 | 
						|
			f.write(data)
 | 
						|
 | 
						|
			# Write out a separator so multiple process calls per
 | 
						|
			# test are easer to read.
 | 
						|
			if instance.hup:
 | 
						|
				f.write("Terminated: {}\n\n".format(instance.args))
 | 
						|
 | 
						|
			f.flush()
 | 
						|
 | 
						|
		if instance.verbose and stdout:
 | 
						|
			sys.__stdout__.write(data)
 | 
						|
			sys.__stdout__.flush()
 | 
						|
 | 
						|
	@classmethod
 | 
						|
	def write_separators(cls, sep):
 | 
						|
		for proc in cls.processes.values():
 | 
						|
			if proc.killed:
 | 
						|
				continue
 | 
						|
 | 
						|
			cls._write_io(proc, sep, stdout=False)
 | 
						|
 | 
						|
	def process_io(self, source, condition):
 | 
						|
		if condition & GLib.IO_HUP:
 | 
						|
			self.hup = True
 | 
						|
 | 
						|
		data = source.read()
 | 
						|
 | 
						|
		if not data:
 | 
						|
			return True
 | 
						|
 | 
						|
		data = data.decode('utf-8')
 | 
						|
 | 
						|
		# Save data away in case the caller needs it (e.g. list_sta)
 | 
						|
		self.out += data
 | 
						|
 | 
						|
		self._write_io(self, data)
 | 
						|
 | 
						|
		return True
 | 
						|
 | 
						|
	def _append_outfile(self, file, append=True):
 | 
						|
		gid = int(self.ctx.args.log_gid)
 | 
						|
		uid = int(self.ctx.args.log_uid)
 | 
						|
		dir = os.path.dirname(file)
 | 
						|
 | 
						|
		if not path_exists(dir):
 | 
						|
			os.mkdir(dir)
 | 
						|
			os.chown(dir, uid, gid)
 | 
						|
 | 
						|
		file = os.path.join(dir,file)
 | 
						|
 | 
						|
		# If the out file exists, append. Useful for processes like
 | 
						|
		# hostapd_cli where it is called multiple times independently.
 | 
						|
		if os.path.isfile(file) and append:
 | 
						|
			mode = 'a'
 | 
						|
		else:
 | 
						|
			mode = 'w'
 | 
						|
 | 
						|
		try:
 | 
						|
			f = open(os.path.join(dir, file), mode)
 | 
						|
		except Exception as e:
 | 
						|
			traceback.print_exc()
 | 
						|
			exit(0)
 | 
						|
 | 
						|
		os.fchown(f.fileno(), uid, gid)
 | 
						|
 | 
						|
		self.write_fds.append(f)
 | 
						|
 | 
						|
	def wait_for_socket(self, socket, wait):
 | 
						|
		Namespace.non_block_wait(os.path.exists, wait, socket)
 | 
						|
 | 
						|
	# Wait for both process termination and HUP signal
 | 
						|
	def __wait(self, timeout):
 | 
						|
		try:
 | 
						|
			super().wait(timeout)
 | 
						|
			if not self.hup:
 | 
						|
				return False
 | 
						|
 | 
						|
			return True
 | 
						|
		except:
 | 
						|
			return False
 | 
						|
 | 
						|
	# Override wait() so it can do so non-blocking
 | 
						|
	def wait(self, timeout=10):
 | 
						|
		Namespace.non_block_wait(self.__wait, timeout, 1)
 | 
						|
		self._cleanup()
 | 
						|
 | 
						|
	def _cleanup(self):
 | 
						|
		if self.cleanup:
 | 
						|
			self.cleanup()
 | 
						|
 | 
						|
		self.write_fds = []
 | 
						|
 | 
						|
		if self.io_watch:
 | 
						|
			GLib.source_remove(self.io_watch)
 | 
						|
			self.io_watch = None
 | 
						|
 | 
						|
		self.cleanup = None
 | 
						|
		self.killed = True
 | 
						|
 | 
						|
	# Override kill()
 | 
						|
	def kill(self, force=False):
 | 
						|
		if self.killed:
 | 
						|
			return
 | 
						|
 | 
						|
		print("Killing process {}".format(self.args))
 | 
						|
 | 
						|
		if force:
 | 
						|
			super().kill()
 | 
						|
		else:
 | 
						|
			self.terminate()
 | 
						|
 | 
						|
		try:
 | 
						|
			self.wait(timeout=15)
 | 
						|
		except:
 | 
						|
			dbg("Process %s did not complete in 15 seconds!" % self.name)
 | 
						|
			super().kill()
 | 
						|
 | 
						|
		self._cleanup()
 | 
						|
 | 
						|
	def __str__(self):
 | 
						|
		return str(self.args) + '\n'
 | 
						|
 | 
						|
class Interface:
 | 
						|
	def __init__(self, name, config):
 | 
						|
		self.name = name
 | 
						|
		self.ctrl_interface = '/var/run/hostapd/' + name
 | 
						|
		self.config = config
 | 
						|
 | 
						|
	def __del__(self):
 | 
						|
		Process(['iw', 'dev', self.name, 'del']).wait()
 | 
						|
 | 
						|
	def set_interface_state(self, state):
 | 
						|
		Process(['ip', 'link', 'set', self.name, state]).wait()
 | 
						|
 | 
						|
class Radio:
 | 
						|
	def __init__(self, name):
 | 
						|
		self.name = name
 | 
						|
		# hostapd will reset this if this radio is used by it
 | 
						|
		self.use = 'iwd'
 | 
						|
		self.interface = None
 | 
						|
 | 
						|
	def __del__(self):
 | 
						|
		print("Removing radio %s" % self.name)
 | 
						|
		self.interface = None
 | 
						|
 | 
						|
	def create_interface(self, config, use):
 | 
						|
		global intf_id
 | 
						|
 | 
						|
		ifname = 'wln%s' % intf_id
 | 
						|
 | 
						|
		intf_id += 1
 | 
						|
 | 
						|
		self.interface = Interface(ifname, config)
 | 
						|
		self.use = use
 | 
						|
 | 
						|
		Process(['iw', 'phy', self.name, 'interface', 'add', ifname,
 | 
						|
				'type', 'managed']).wait()
 | 
						|
 | 
						|
		return self.interface
 | 
						|
 | 
						|
	def __str__(self):
 | 
						|
		ret = self.name + ':\n'
 | 
						|
		ret += '\tUsed By: %s ' % self.use
 | 
						|
		if self.interface:
 | 
						|
			ret += '(%s)' % self.interface.name
 | 
						|
 | 
						|
		ret += '\n'
 | 
						|
 | 
						|
		return ret
 | 
						|
 | 
						|
class VirtualRadio(Radio):
 | 
						|
	'''
 | 
						|
		A subclass of 'Radio' specific to mac80211_hwsim radios.
 | 
						|
 | 
						|
		TODO: Using D-Bus to create and destroy radios is more desireable
 | 
						|
		than the command line.
 | 
						|
	'''
 | 
						|
 | 
						|
	def __init__(self, name, cfg=None):
 | 
						|
		global config
 | 
						|
 | 
						|
		self.disable_cipher = None
 | 
						|
		self.disable_iftype = None
 | 
						|
 | 
						|
		self.hwsim = config.hwsim.Hwsim()
 | 
						|
 | 
						|
		if cfg:
 | 
						|
			self.disable_iftype = cfg.get('iftype_disable', None)
 | 
						|
			self.disable_cipher = cfg.get('cipher_disable', None)
 | 
						|
 | 
						|
		self._radio = self.hwsim.radios.create(name, p2p_device=True,
 | 
						|
					iftype_disable=self.disable_iftype,
 | 
						|
					cipher_disable=self.disable_cipher)
 | 
						|
 | 
						|
		super().__init__(self._radio.name)
 | 
						|
 | 
						|
	def __del__(self):
 | 
						|
		super().__del__()
 | 
						|
 | 
						|
		# If the radio was moved into a namespace this will fail
 | 
						|
		try:
 | 
						|
			self._radio.remove()
 | 
						|
		except:
 | 
						|
			pass
 | 
						|
 | 
						|
		self._radio = None
 | 
						|
 | 
						|
	def __str__(self):
 | 
						|
		ret = super().__str__()
 | 
						|
 | 
						|
		if self.disable_iftype:
 | 
						|
			ret += '\tDisabled interface types: %s\n' % self.disable_iftype
 | 
						|
 | 
						|
		if self.disable_cipher:
 | 
						|
			ret += '\tDisabled ciphers: %s\n' % self.disable_cipher
 | 
						|
 | 
						|
		ret += '\tPath: %s' % self._radio.path
 | 
						|
 | 
						|
		ret += '\n'
 | 
						|
 | 
						|
		return ret
 | 
						|
 | 
						|
class HostapdInstance:
 | 
						|
	'''
 | 
						|
		A single instance of hostapd. In reality all hostapd instances
 | 
						|
		are started as a single process. This class just makes things
 | 
						|
		convenient for communicating with one of the hostapd APs.
 | 
						|
	'''
 | 
						|
	def __init__(self, config, radio):
 | 
						|
		self.radio = radio
 | 
						|
		self.config = config
 | 
						|
		self.cli = None
 | 
						|
 | 
						|
		self.intf = radio.create_interface(self.config, 'hostapd')
 | 
						|
		self.intf.set_interface_state('up')
 | 
						|
 | 
						|
	def __del__(self):
 | 
						|
		print("Removing HostapdInstance %s" % self.config)
 | 
						|
		self.intf.set_interface_state('down')
 | 
						|
		self.radio = None
 | 
						|
		self.intf = None
 | 
						|
 | 
						|
	def __str__(self):
 | 
						|
		ret = 'Hostapd (%s)\n' % self.intf.name
 | 
						|
		ret += '\tConfig: %s\n' % self.config
 | 
						|
 | 
						|
		return ret
 | 
						|
 | 
						|
class Hostapd:
 | 
						|
	'''
 | 
						|
		A set of running hostapd instances. This is really just a single
 | 
						|
		process since hostapd can be started with multiple config files.
 | 
						|
	'''
 | 
						|
	def __init__(self, ctx, radios, configs, radius):
 | 
						|
		self.ctx = ctx
 | 
						|
 | 
						|
		if len(configs) != len(radios):
 | 
						|
			raise Exception("Config (%d) and radio (%d) list length not equal" % \
 | 
						|
						(len(configs), len(radios)))
 | 
						|
 | 
						|
		print("Initializing hostapd instances")
 | 
						|
 | 
						|
		Process(['ip', 'link', 'set', 'eth0', 'up']).wait()
 | 
						|
		Process(['ip', 'link', 'set', 'eth1', 'up']).wait()
 | 
						|
 | 
						|
		self.global_ctrl_iface = '/var/run/hostapd/ctrl'
 | 
						|
 | 
						|
		self.instances = [HostapdInstance(c, r) for c, r in zip(configs, radios)]
 | 
						|
 | 
						|
		ifaces = [rad.interface.name for rad in radios]
 | 
						|
		ifaces = ','.join(ifaces)
 | 
						|
 | 
						|
		args = ['hostapd', '-g', self.global_ctrl_iface]
 | 
						|
 | 
						|
		if ifaces:
 | 
						|
			args.extend(['-i', ifaces])
 | 
						|
 | 
						|
		#
 | 
						|
		# Config files should already be present in /tmp. This appends
 | 
						|
		# ctrl_interface and does any variable replacement. Currently
 | 
						|
		# this is just any $ifaceN occurrences.
 | 
						|
		#
 | 
						|
		for c in configs:
 | 
						|
			full_path = '/tmp/%s' % c
 | 
						|
			args.append(full_path)
 | 
						|
 | 
						|
			self._rewrite_config(full_path)
 | 
						|
 | 
						|
		if radius:
 | 
						|
			args.append(radius)
 | 
						|
 | 
						|
		if ctx.is_verbose('hostapd'):
 | 
						|
			args.append('-d')
 | 
						|
 | 
						|
		self.process = Process(args)
 | 
						|
 | 
						|
		self.process.wait_for_socket(self.global_ctrl_iface, 30)
 | 
						|
 | 
						|
		for hapd in self.instances:
 | 
						|
			self.process.wait_for_socket(hapd.intf.ctrl_interface, 30)
 | 
						|
 | 
						|
	def attach_cli(self):
 | 
						|
		global config
 | 
						|
 | 
						|
		for hapd in self.instances:
 | 
						|
			hapd.cli = config.hostapd.HostapdCLI(config=hapd.config)
 | 
						|
 | 
						|
	def _rewrite_config(self, config):
 | 
						|
		'''
 | 
						|
			Replaces any $ifaceN values with the correct interface
 | 
						|
			names as well as appends the ctrl_interface path to
 | 
						|
			the config file.
 | 
						|
		'''
 | 
						|
		with open(config, 'r+') as f:
 | 
						|
			data = f.read()
 | 
						|
			to_replace = []
 | 
						|
			for match in re.finditer(r'\$iface[0-9]+', data):
 | 
						|
				tag = data[match.start():match.end()]
 | 
						|
				idx = tag.split('iface')[1]
 | 
						|
 | 
						|
				to_replace.append((tag, self.instances[int(idx)].intf.name))
 | 
						|
 | 
						|
			for r in to_replace:
 | 
						|
				data = data.replace(r[0], r[1], 1)
 | 
						|
 | 
						|
			data += '\nctrl_interface=/var/run/hostapd\n'
 | 
						|
 | 
						|
			f.write(data)
 | 
						|
 | 
						|
	def __getitem__(self, config):
 | 
						|
		if not config:
 | 
						|
			return self.instances[0]
 | 
						|
 | 
						|
		for hapd in self.instances:
 | 
						|
			if hapd.config == config:
 | 
						|
				return hapd
 | 
						|
 | 
						|
		return None
 | 
						|
 | 
						|
	def __del__(self):
 | 
						|
		print("Removing Hostapd")
 | 
						|
		try:
 | 
						|
			os.remove(self.global_ctrl_iface)
 | 
						|
		except:
 | 
						|
			print("Failed to remove %s" % self.global_ctrl_iface)
 | 
						|
 | 
						|
		self.instances = None
 | 
						|
 | 
						|
		# Hostapd may have already been stopped
 | 
						|
		if self.process:
 | 
						|
			self.ctx.stop_process(self.process)
 | 
						|
 | 
						|
		self.ctx = None
 | 
						|
 | 
						|
		# Hostapd creates simdb sockets for EAP-SIM/AKA tests but does not
 | 
						|
		# clean them up.
 | 
						|
		for f in glob("/tmp/eap_sim_db*"):
 | 
						|
			os.remove(f)
 | 
						|
 | 
						|
dbus_count = 0
 | 
						|
 | 
						|
class Namespace:
 | 
						|
	def __init__(self, args, name, radios):
 | 
						|
		self.dbus_address = None
 | 
						|
		self.name = name
 | 
						|
		self.radios = radios
 | 
						|
		self.args = args
 | 
						|
 | 
						|
		Process(['ip', 'netns', 'add', name]).wait()
 | 
						|
		for r in radios:
 | 
						|
			Process(['iw', 'phy', r.name, 'set', 'netns', 'name', name]).wait()
 | 
						|
 | 
						|
		self.start_dbus()
 | 
						|
 | 
						|
	def reset(self):
 | 
						|
		self._bus = None
 | 
						|
 | 
						|
		for r in self.radios:
 | 
						|
			r._radio = None
 | 
						|
 | 
						|
		self.radios = []
 | 
						|
 | 
						|
		Process.kill_all()
 | 
						|
 | 
						|
	def __del__(self):
 | 
						|
		print("Removing namespace %s" % self.name)
 | 
						|
 | 
						|
		Process(['ip', 'netns', 'del', self.name]).wait()
 | 
						|
 | 
						|
	def get_bus(self):
 | 
						|
		return self._bus
 | 
						|
 | 
						|
	def start_process(self, args, env=None, **kwargs):
 | 
						|
		if not env:
 | 
						|
			env = os.environ.copy()
 | 
						|
 | 
						|
		if hasattr(self, "dbus_address"):
 | 
						|
			# In case this process needs DBus...
 | 
						|
			env['DBUS_SYSTEM_BUS_ADDRESS'] = self.dbus_address
 | 
						|
 | 
						|
		return Process(args, namespace=self.name, env=env, **kwargs)
 | 
						|
 | 
						|
	def stop_process(self, p, force=False):
 | 
						|
		p.kill(force)
 | 
						|
 | 
						|
	def is_process_running(self, process):
 | 
						|
		for p in Process.get_all():
 | 
						|
			if p.namespace == self.name and p.args[0] == process:
 | 
						|
				return True
 | 
						|
		return False
 | 
						|
 | 
						|
	def _cleanup_dbus(self):
 | 
						|
		try:
 | 
						|
			os.remove(self.dbus_address.split('=')[1])
 | 
						|
		except:
 | 
						|
			pass
 | 
						|
 | 
						|
		os.remove(self.dbus_cfg)
 | 
						|
 | 
						|
	def start_dbus(self):
 | 
						|
		global dbus_count
 | 
						|
 | 
						|
		self.dbus_address = 'unix:path=/tmp/dbus%d' % dbus_count
 | 
						|
		self.dbus_cfg = '/tmp/dbus%d.conf' % dbus_count
 | 
						|
		dbus_count += 1
 | 
						|
 | 
						|
		with open(self.dbus_cfg, 'w+') as f:
 | 
						|
			f.write(dbus_config)
 | 
						|
			f.write('<listen>%s</listen>\n' % self.dbus_address)
 | 
						|
			f.write('</busconfig>\n')
 | 
						|
 | 
						|
		p = self.start_process(['dbus-daemon', '--config-file=%s' % self.dbus_cfg],
 | 
						|
					cleanup=self._cleanup_dbus)
 | 
						|
 | 
						|
		p.wait_for_socket(self.dbus_address.split('=')[1], 5)
 | 
						|
 | 
						|
		self._bus = dbus.bus.BusConnection(address_or_type=self.dbus_address)
 | 
						|
 | 
						|
	def start_iwd(self, config_dir = '/tmp', storage_dir = '/tmp/iwd'):
 | 
						|
		args = []
 | 
						|
		iwd_radios = ','.join([r.name for r in self.radios if r.use == 'iwd'])
 | 
						|
 | 
						|
		if self.args.valgrind:
 | 
						|
			args.extend(['valgrind', '--leak-check=full', '--track-origins=yes',
 | 
						|
					'--show-leak-kinds=all',
 | 
						|
					'--log-file=/tmp/valgrind.log.%p'])
 | 
						|
 | 
						|
		args.extend(['iwd', '-E'])
 | 
						|
 | 
						|
		if iwd_radios != '':
 | 
						|
			args.extend(['-p', iwd_radios])
 | 
						|
 | 
						|
		if self.is_verbose(args[0]):
 | 
						|
			args.append('-d')
 | 
						|
 | 
						|
		env = os.environ.copy()
 | 
						|
 | 
						|
		env['CONFIGURATION_DIRECTORY'] = config_dir
 | 
						|
		env['STATE_DIRECTORY'] = storage_dir
 | 
						|
 | 
						|
		if self.is_verbose('iwd-dhcp'):
 | 
						|
			env['IWD_DHCP_DEBUG'] = '1'
 | 
						|
 | 
						|
		if self.is_verbose('iwd-tls'):
 | 
						|
			env['IWD_TLS_DEBUG'] = '1'
 | 
						|
 | 
						|
		if self.is_verbose('iwd-acd'):
 | 
						|
			env['IWD_ACD_DEBUG'] = '1'
 | 
						|
 | 
						|
		return self.start_process(args, env=env)
 | 
						|
 | 
						|
	def is_verbose(self, process, log=True):
 | 
						|
		process = os.path.basename(process)
 | 
						|
 | 
						|
		if self.args is None:
 | 
						|
			return False
 | 
						|
 | 
						|
		# every process is verbose when logging is enabled
 | 
						|
		if log and self.args.log:
 | 
						|
			return True
 | 
						|
 | 
						|
		if process in self.args.verbose:
 | 
						|
			return True
 | 
						|
 | 
						|
		# Special case here to enable verbose output with valgrind running
 | 
						|
		if process == 'valgrind' and 'iwd' in self.args.verbose:
 | 
						|
			return True
 | 
						|
 | 
						|
		# Handle any glob matches
 | 
						|
		for item in self.args.verbose:
 | 
						|
			if process in glob(item):
 | 
						|
				return True
 | 
						|
 | 
						|
		return False
 | 
						|
 | 
						|
	@staticmethod
 | 
						|
	def non_block_wait(func, timeout, *args, exception=True):
 | 
						|
		'''
 | 
						|
			Convenience function for waiting in a non blocking
 | 
						|
			manor using GLibs context iteration i.e. does not block
 | 
						|
			the main loop while waiting.
 | 
						|
 | 
						|
			'func' will be called at least once and repeatedly until
 | 
						|
			either it returns success, throws an exception, or the
 | 
						|
			'timeout' expires.
 | 
						|
 | 
						|
			'timeout' is the ultimate timeout in seconds
 | 
						|
 | 
						|
			'*args' will be passed to 'func'
 | 
						|
 | 
						|
			If 'exception' is an Exception type it will be raised.
 | 
						|
			If 'exception' is True a generic TimeoutError will be raised.
 | 
						|
			Any other value will not result in an exception.
 | 
						|
		'''
 | 
						|
		# Simple class for signaling the wait timeout
 | 
						|
		class Bool:
 | 
						|
			def __init__(self, value):
 | 
						|
				self.value = value
 | 
						|
 | 
						|
		def wait_timeout_cb(done):
 | 
						|
			done.value = True
 | 
						|
			return False
 | 
						|
 | 
						|
		mainloop = GLib.MainLoop()
 | 
						|
		done = Bool(False)
 | 
						|
 | 
						|
		timeout = GLib.timeout_add_seconds(timeout, wait_timeout_cb, done)
 | 
						|
		context = mainloop.get_context()
 | 
						|
 | 
						|
		while True:
 | 
						|
			context.iteration(may_block=False)
 | 
						|
 | 
						|
			try:
 | 
						|
				ret = func(*args)
 | 
						|
				if ret:
 | 
						|
					if not done.value:
 | 
						|
						GLib.source_remove(timeout)
 | 
						|
					return ret
 | 
						|
			except Exception as e:
 | 
						|
				if not done.value:
 | 
						|
					GLib.source_remove(timeout)
 | 
						|
				raise e
 | 
						|
 | 
						|
			sleep(0.1)
 | 
						|
 | 
						|
			if done.value == True:
 | 
						|
				if isinstance(exception, Exception):
 | 
						|
					raise exception
 | 
						|
				elif type(exception) == bool and exception:
 | 
						|
					raise TimeoutError("Timeout on non_block_wait")
 | 
						|
				else:
 | 
						|
					return
 | 
						|
 | 
						|
	def __str__(self):
 | 
						|
		ret = 'Namespace: %s\n' % self.name
 | 
						|
		ret += 'Processes:\n'
 | 
						|
		for p in Process.get_all():
 | 
						|
			ret += '\t%s' % str(p)
 | 
						|
 | 
						|
		ret += 'Radios:\n'
 | 
						|
		if len(self.radios) > 0:
 | 
						|
			for r in self.radios:
 | 
						|
				ret += '\t%s\n' % str(r)
 | 
						|
		else:
 | 
						|
			ret += '\tNo Radios\n'
 | 
						|
 | 
						|
		ret += 'DBus Address: %s\n' % self.dbus_address
 | 
						|
		ret += '===================================================\n\n'
 | 
						|
 | 
						|
		return ret
 | 
						|
 | 
						|
class TestContext(Namespace):
 | 
						|
	'''
 | 
						|
		Contains all information for a given set of tests being run
 | 
						|
		such as processes, radios, interfaces and test results.
 | 
						|
	'''
 | 
						|
	def __init__(self, args):
 | 
						|
		self.name = None
 | 
						|
		self.args = args
 | 
						|
		self.hw_config = None
 | 
						|
		self.hostapd = None
 | 
						|
		self.wpas_interfaces = None
 | 
						|
		self.cur_radio_id = 0
 | 
						|
		self.cur_iface_id = 0
 | 
						|
		self.radios = []
 | 
						|
		self.loopback_started = False
 | 
						|
		self.results = {}
 | 
						|
		self.mainloop = GLib.MainLoop()
 | 
						|
		self.namespaces = []
 | 
						|
 | 
						|
	def start_dbus_monitor(self):
 | 
						|
		if not self.is_verbose('dbus-monitor'):
 | 
						|
			return
 | 
						|
 | 
						|
		self.start_process(['dbus-monitor', '--address', self.dbus_address])
 | 
						|
 | 
						|
	def start_haveged(self):
 | 
						|
		self.start_process(['haveged', '-F'])
 | 
						|
 | 
						|
	def create_radios(self):
 | 
						|
		setup = self.hw_config['SETUP']
 | 
						|
		nradios = int(setup['num_radios'])
 | 
						|
		args = ['hwsim']
 | 
						|
 | 
						|
		if self.hw_config['SETUP'].get('hwsim_medium', 'no') in ['no', '0', 'false']:
 | 
						|
			# register hwsim as medium
 | 
						|
			args.extend(['--no-register'])
 | 
						|
 | 
						|
		self.start_process(args)
 | 
						|
		self.non_block_wait(self._bus.name_has_owner, 20, 'net.connman.hwsim',
 | 
						|
					exception=TimeoutError('net.connman.hwsim did not appear'))
 | 
						|
 | 
						|
		for i in range(nradios):
 | 
						|
			name = 'rad%u' % i
 | 
						|
 | 
						|
			# Get any [radX] sections. These are for configuring
 | 
						|
			# any special radios. This no longer requires a
 | 
						|
			# radio_conf list, we just assume radios start rad0
 | 
						|
			# and increment.
 | 
						|
			rad_config = None
 | 
						|
			if self.hw_config.has_section(name):
 | 
						|
				rad_config = self.hw_config[name]
 | 
						|
 | 
						|
			self.radios.append(VirtualRadio(name, rad_config))
 | 
						|
			self.cur_radio_id += 1
 | 
						|
 | 
						|
	def discover_radios(self):
 | 
						|
		phys = []
 | 
						|
 | 
						|
		try:
 | 
						|
			iw = pyroute2.iwutil.IW()
 | 
						|
		except:
 | 
						|
			iw = pyroute2.IW()
 | 
						|
 | 
						|
		attrs = [phy['attrs'] for phy in iw.list_wiphy()]
 | 
						|
 | 
						|
		for attr in attrs:
 | 
						|
			for key, value in attr:
 | 
						|
				if key == 'NL80211_ATTR_WIPHY_NAME':
 | 
						|
					if value not in phys:
 | 
						|
						phys.append(value)
 | 
						|
					break
 | 
						|
 | 
						|
		print('Discovered radios: %s' % str(phys))
 | 
						|
		self.radios = [Radio(name) for name in phys]
 | 
						|
 | 
						|
	def start_radios(self):
 | 
						|
		reg_domain = self.hw_config['SETUP'].get('reg_domain', None)
 | 
						|
		if reg_domain:
 | 
						|
			Process(['iw', 'reg', 'set', reg_domain]).wait()
 | 
						|
 | 
						|
		if self.args.hw:
 | 
						|
			self.discover_radios()
 | 
						|
		else:
 | 
						|
			self.create_radios()
 | 
						|
 | 
						|
	def start_hostapd(self):
 | 
						|
		if not 'HOSTAPD' in self.hw_config:
 | 
						|
			return
 | 
						|
 | 
						|
		settings = self.hw_config['HOSTAPD']
 | 
						|
 | 
						|
		if self.args.hw:
 | 
						|
			# Just grab the first N radios. It gets rather
 | 
						|
			# complicated trying to map radX radios specified in
 | 
						|
			# hw.conf so any passed through physical adapters are
 | 
						|
			# just given to hostapd/IWD as they appear during
 | 
						|
			# discovery.
 | 
						|
			#
 | 
						|
			# TODO: It may be desireable to map PCI/USB adapters to
 | 
						|
			#       specific radX radios specified in the config but
 | 
						|
			#       there are really 2 separate use cases here.
 | 
						|
			#       1. You want to test a *specific* radio with IWD
 | 
						|
			#          or hostapd. For this you would want radX
 | 
						|
			#          to map to a specific radio
 | 
						|
			#       2. You have many adapters in use to run multiple
 | 
						|
			#          tests. In this case you would not care what
 | 
						|
			#          was using each radio, just that there was
 | 
						|
			#          enough to run all tests.
 | 
						|
			nradios = 0
 | 
						|
			for k, _ in settings.items():
 | 
						|
				if k == 'radius_server':
 | 
						|
					continue
 | 
						|
				nradios += 1
 | 
						|
 | 
						|
			hapd_radios = self.radios[:nradios]
 | 
						|
 | 
						|
		else:
 | 
						|
			hapd_radios = [rad for rad in self.radios if rad.name in settings]
 | 
						|
 | 
						|
		hapd_configs = [conf for rad, conf in settings.items() if rad != 'radius_server']
 | 
						|
 | 
						|
		radius_config = settings.get('radius_server', None)
 | 
						|
 | 
						|
		self.hostapd = Hostapd(self, hapd_radios, hapd_configs, radius_config)
 | 
						|
		self.hostapd.attach_cli()
 | 
						|
 | 
						|
	def get_frequencies(self):
 | 
						|
		frequencies = []
 | 
						|
 | 
						|
		for hapd in self.hostapd.instances:
 | 
						|
			frequencies.append(hapd.cli.frequency)
 | 
						|
 | 
						|
		return frequencies
 | 
						|
 | 
						|
	def start_wpas_interfaces(self):
 | 
						|
 | 
						|
		if 'WPA_SUPPLICANT' not in self.hw_config:
 | 
						|
			return
 | 
						|
 | 
						|
		settings = self.hw_config['WPA_SUPPLICANT']
 | 
						|
 | 
						|
		if self.args.hw:
 | 
						|
			nradios = len(settings.items())
 | 
						|
 | 
						|
			wpas_radios = self.radios[:nradios]
 | 
						|
			self.wpas_interfaces = []
 | 
						|
 | 
						|
			#
 | 
						|
			# Physical radios most likely will use a different name
 | 
						|
			# than 'rad#' but the config file is referenced by these
 | 
						|
			# 'rad#' names. Iterate through both the settings and
 | 
						|
			# physical radios to create interfaces associated with
 | 
						|
			# each config file.
 | 
						|
			#
 | 
						|
			for vrad, hwrad in zip(settings.items(), wpas_radios):
 | 
						|
				self.wpas_interfaces.append(hwrad.create_interface(vrad[1], 'wpas'))
 | 
						|
 | 
						|
		else:
 | 
						|
			wpas_radios = [rad for rad in self.radios if rad.name in settings]
 | 
						|
			self.wpas_interfaces = [rad.create_interface(settings[rad.name], 'wpas') \
 | 
						|
						for rad in wpas_radios]
 | 
						|
 | 
						|
	def start_ofono(self):
 | 
						|
		sim_keys = self.hw_config['SETUP'].get('sim_keys', None)
 | 
						|
		if not sim_keys:
 | 
						|
			print("Ofono not requred")
 | 
						|
			return
 | 
						|
		elif sim_keys != 'ofono':
 | 
						|
			os.environ['IWD_SIM_KEYS'] = sim_keys
 | 
						|
			return
 | 
						|
 | 
						|
		if not find_binary(['ofonod']) or not find_binary(['phonesim']):
 | 
						|
			print("Ofono or Phonesim not found, skipping test")
 | 
						|
			return
 | 
						|
 | 
						|
		Process(['ip', 'link', 'set', 'lo', 'up']).wait()
 | 
						|
 | 
						|
		os.environ['OFONO_PHONESIM_CONFIG'] = '/tmp/phonesim.conf'
 | 
						|
 | 
						|
		phonesim_args = ['phonesim', '-p', '12345', '/usr/share/phonesim/default.xml']
 | 
						|
 | 
						|
		self.start_process(phonesim_args)
 | 
						|
 | 
						|
		#
 | 
						|
		# TODO:
 | 
						|
		# Is there something to wait for? Without this phonesim rejects
 | 
						|
		# connections on all but the fist test.
 | 
						|
		#
 | 
						|
		time.sleep(3)
 | 
						|
 | 
						|
		ofono_args = ['ofonod', '-n', '--plugin=atmodem,phonesim']
 | 
						|
		if self.is_verbose('ofonod'):
 | 
						|
			ofono_args.append('-d')
 | 
						|
 | 
						|
		self.start_process(ofono_args)
 | 
						|
 | 
						|
		print("Ofono started")
 | 
						|
 | 
						|
	def create_namespaces(self):
 | 
						|
		if not self.hw_config.has_section('NameSpaces'):
 | 
						|
			return
 | 
						|
 | 
						|
		for key, value in self.hw_config.items('NameSpaces'):
 | 
						|
			radio_names = value.split(',')
 | 
						|
			# Gather up radio objects for this namespace
 | 
						|
			radios = [rad for rad in self.radios if rad.name in radio_names]
 | 
						|
 | 
						|
			# Remove radios from 'root' namespace
 | 
						|
			self.radios = list(set(self.radios) - set(radios))
 | 
						|
 | 
						|
			self.namespaces.append(Namespace(self.args, key, radios))
 | 
						|
 | 
						|
	def get_namespace(self, ns):
 | 
						|
		for n in self.namespaces:
 | 
						|
			if n.name == ns:
 | 
						|
				return n
 | 
						|
 | 
						|
		return None
 | 
						|
 | 
						|
	def stop_test_processes(self):
 | 
						|
		for n in self.namespaces:
 | 
						|
			n.reset()
 | 
						|
 | 
						|
		self.namespaces = []
 | 
						|
		self.hostapd = None
 | 
						|
		self.wpas_interfaces = None
 | 
						|
 | 
						|
		self.reset()
 | 
						|
 | 
						|
	def __str__(self):
 | 
						|
		ret = 'Arguments:\n'
 | 
						|
		for arg in vars(self.args):
 | 
						|
			ret += '\t --%s %s\n' % (arg, str(getattr(self.args, arg)))
 | 
						|
 | 
						|
		ret += 'Hostapd:\n'
 | 
						|
		if self.hostapd:
 | 
						|
			for h in self.hostapd.instances:
 | 
						|
				ret += '\t%s\n' % str(h)
 | 
						|
		else:
 | 
						|
			ret += '\tNo Hostapd instances\n'
 | 
						|
 | 
						|
		ret += super().__str__()
 | 
						|
 | 
						|
		for n in self.namespaces:
 | 
						|
			ret += n.__str__()
 | 
						|
 | 
						|
		return ret
 | 
						|
 | 
						|
def prepare_sandbox():
 | 
						|
	print('Preparing sandbox')
 | 
						|
 | 
						|
	for entry in mount_table:
 | 
						|
		try:
 | 
						|
			os.lstat(entry.target)
 | 
						|
		except:
 | 
						|
			os.mkdir(entry.target, 755)
 | 
						|
 | 
						|
		mount(entry.fstype, entry.target, entry.fstype, entry.flags,
 | 
						|
			entry.options)
 | 
						|
 | 
						|
	for entry in dev_table:
 | 
						|
		os.symlink(entry.target, entry.linkpath)
 | 
						|
 | 
						|
	os.mkdir('/tmp/iwd')
 | 
						|
 | 
						|
	os.setsid()
 | 
						|
 | 
						|
	fcntl.ioctl(STDIN_FILENO, TIOCSTTY, 1)
 | 
						|
 | 
						|
def build_unit_list(args):
 | 
						|
	'''
 | 
						|
		Build list of unit tests based on passed arguments. This first
 | 
						|
		checks for literal names provided in the arguments, then if
 | 
						|
		no matches were found, checks for a glob match.
 | 
						|
	'''
 | 
						|
	tests = []
 | 
						|
	test_root = args.testhome + '/unit'
 | 
						|
 | 
						|
	for unit in args.unit_tests.split(','):
 | 
						|
		path = '%s/%s' % (test_root, unit)
 | 
						|
		if os.access(unit, os.X_OK):
 | 
						|
			tests.append(unit)
 | 
						|
		elif os.access(path, os.X_OK):
 | 
						|
			tests.append(path)
 | 
						|
		else:
 | 
						|
			# Full list or glob, first build up valid list of tests
 | 
						|
			matches = glob(path)
 | 
						|
			if matches == []:
 | 
						|
				raise Exception("Could not find test %s" % unit)
 | 
						|
 | 
						|
			matches = [exe for exe in matches if os.access(exe, os.X_OK)]
 | 
						|
 | 
						|
			tests.extend(matches)
 | 
						|
 | 
						|
	return sorted(tests)
 | 
						|
 | 
						|
def build_test_list(args):
 | 
						|
	'''
 | 
						|
		Build list of auto test directories based on passed arguments.
 | 
						|
		First check for absolute paths, then look in <iwd>/autotests,
 | 
						|
		then glob match.
 | 
						|
	'''
 | 
						|
	tests = []
 | 
						|
	test_root = args.testhome + '/autotests'
 | 
						|
 | 
						|
	# Run all tests
 | 
						|
	if not args.auto_tests:
 | 
						|
		# --shell with no tests implies 'shell' test
 | 
						|
		if args.shell:
 | 
						|
			return [test_root + '/shell']
 | 
						|
 | 
						|
		# Get list of all autotests (committed in git)
 | 
						|
		tests = os.popen('git -C %s ls-files autotests/ | cut -f2 -d"/" \
 | 
						|
					| grep "test*" | uniq' % args.testhome).read() \
 | 
						|
					.strip().split('\n')
 | 
						|
		tests = [test_root + '/' + t for t in tests]
 | 
						|
	else:
 | 
						|
		print("Generating partial test list")
 | 
						|
 | 
						|
		full_list = sorted(os.listdir(test_root))
 | 
						|
 | 
						|
		for t in args.auto_tests.split(','):
 | 
						|
			path = '%s/%s' % (test_root, t)
 | 
						|
			if t.endswith('+'):
 | 
						|
				t = t.split('+')[0]
 | 
						|
				i = full_list.index(t)
 | 
						|
 | 
						|
				tests = [test_root + '/' + x for x in full_list[i:] \
 | 
						|
							if x.startswith('test')]
 | 
						|
			elif os.path.exists(t):
 | 
						|
				if t not in tests:
 | 
						|
					tests.append(t)
 | 
						|
			elif os.path.exists(path):
 | 
						|
				if path not in tests:
 | 
						|
					tests.append(path)
 | 
						|
			else:
 | 
						|
				matches = glob(path)
 | 
						|
				if matches == []:
 | 
						|
					raise Exception("Could not find test %s" % t)
 | 
						|
 | 
						|
				tests.extend(list(set(matches) - set(tests)))
 | 
						|
 | 
						|
	return sorted(tests)
 | 
						|
 | 
						|
SimpleResult = namedtuple('SimpleResult', 'run failures errors skipped time')
 | 
						|
 | 
						|
def start_test(ctx, subtests, rqueue):
 | 
						|
	'''
 | 
						|
		Run an individual test. 'subtests' are parsed prior to calling
 | 
						|
		but these effectively make up a single test. 'rqueue' is the
 | 
						|
		results queue which is required since this is using
 | 
						|
		multiprocessing.
 | 
						|
	'''
 | 
						|
	run = 0
 | 
						|
	errors = 0
 | 
						|
	failures = 0
 | 
						|
	skipped = 0
 | 
						|
 | 
						|
	start = time.time()
 | 
						|
	#
 | 
						|
	# Iterate through each individual python test.
 | 
						|
	#
 | 
						|
	for s in subtests:
 | 
						|
		loader = unittest.TestLoader()
 | 
						|
		module = importlib.import_module(os.path.splitext(s)[0])
 | 
						|
		subtest = loader.loadTestsFromModule(module)
 | 
						|
 | 
						|
		# The test suite is being (ab)used to get a bit more granularity
 | 
						|
		# with individual tests. The 'normal' way to use unittest is to
 | 
						|
		# just create a test suite and run them. The problem here is that
 | 
						|
		# test results are queued and printed at the very end so its
 | 
						|
		# difficult to know *where* a test failed (python gives a stack
 | 
						|
		# trace but printing the exception/failure immediately shows
 | 
						|
		# where in the debug logs something failed). Moreso if there are
 | 
						|
		# several test functions inside a single python file they run
 | 
						|
		# as a single test and it is difficult (again) to know where
 | 
						|
		# something failed.
 | 
						|
 | 
						|
		# Iterating through each python test file
 | 
						|
		for test in subtest:
 | 
						|
			limit_funcs = []
 | 
						|
 | 
						|
			if ctx.args.sub_tests:
 | 
						|
				for i in ctx.args.sub_tests:
 | 
						|
					if len(i.split('.')) == 2:
 | 
						|
						limit_funcs.append(i.split('.')[1])
 | 
						|
 | 
						|
			# Iterating through individual test functions inside a
 | 
						|
			# Test() class. Due to the nature of unittest we have
 | 
						|
			# to jump through some hoops to set up the test class
 | 
						|
			# only once by turning the enumeration into a list, then
 | 
						|
			# enumerating (again) to keep track of the index (just
 | 
						|
			# enumerating the test class doesn't allow len() because
 | 
						|
			# it is not a list).
 | 
						|
			tlist = list(enumerate(test))
 | 
						|
			for index, t in enumerate(tlist):
 | 
						|
				# enumerate is returning a tuple, index 1 is our
 | 
						|
				# actual object.
 | 
						|
				t = t[1]
 | 
						|
 | 
						|
				func, file = str(t).split(' ')
 | 
						|
				#
 | 
						|
				# TODO: There may be a better way of doing this
 | 
						|
				# but strigifying the test class gives us a string:
 | 
						|
				# <function> (<file>.<class>)
 | 
						|
				#
 | 
						|
				file = file.strip('()').split('.')[0] + '.py'
 | 
						|
 | 
						|
				# Create an empty result here in case the test fails
 | 
						|
				result = TestResult()
 | 
						|
 | 
						|
				try:
 | 
						|
					skip = len(limit_funcs) > 0 and func not in limit_funcs
 | 
						|
 | 
						|
					# Set up class only on first test
 | 
						|
					if index == 0:
 | 
						|
						if not skip:
 | 
						|
							dbg("%s\n\t%s RUNNING" % (file, str(func)), end='')
 | 
						|
						t.setUpClass()
 | 
						|
					else:
 | 
						|
						if not skip:
 | 
						|
							dbg("\t%s RUNNING" % str(func), end='')
 | 
						|
 | 
						|
					sys.__stdout__.flush()
 | 
						|
 | 
						|
					Process.write_separators("\n====== %s:%s ======\n\n" % (file, func))
 | 
						|
 | 
						|
					if not skip:
 | 
						|
						# Run test (setUp/tearDown run automatically)
 | 
						|
						result = t()
 | 
						|
 | 
						|
					# Tear down class only on last test
 | 
						|
					if index == len(tlist) - 1:
 | 
						|
						t.tearDownClass()
 | 
						|
 | 
						|
					if skip:
 | 
						|
						continue
 | 
						|
				except unittest.SkipTest as e:
 | 
						|
					result.skipped.append(t)
 | 
						|
				except Exception as e:
 | 
						|
					dbg('\n%s threw an uncaught exception:' % func)
 | 
						|
					traceback.print_exc(file=sys.__stdout__)
 | 
						|
 | 
						|
				run += result.testsRun
 | 
						|
				errors += len(result.errors)
 | 
						|
				failures += len(result.failures)
 | 
						|
				skipped += len(result.skipped)
 | 
						|
 | 
						|
				if len(result.skipped) > 0:
 | 
						|
					dbg(colored(" SKIPPED", "cyan"))
 | 
						|
				elif run == 0 or len(result.errors) > 0 or len(result.failures) > 0:
 | 
						|
					dbg(colored(" FAILED", "red"))
 | 
						|
					for e in result.errors:
 | 
						|
						dbg(e[1])
 | 
						|
					for f in result.failures:
 | 
						|
						dbg(f[1])
 | 
						|
				else:
 | 
						|
					dbg(colored(" PASSED", "green"))
 | 
						|
 | 
						|
		# Prevents future test modules with the same name (e.g.
 | 
						|
		# connection_test.py) from being loaded from the cache
 | 
						|
		sys.modules.pop(module.__name__)
 | 
						|
 | 
						|
	#
 | 
						|
	# The multiprocessing queue is picky with what objects it will serialize
 | 
						|
	# and send between processes. Because of this we put the important bits
 | 
						|
	# of the result into our own 'SimpleResult' tuple.
 | 
						|
	#
 | 
						|
	sresult = SimpleResult(run=run, failures=failures, errors=errors,
 | 
						|
				skipped=skipped, time=time.time() - start)
 | 
						|
	rqueue.put(sresult)
 | 
						|
 | 
						|
	# This may not be required since we are manually popping sys.modules
 | 
						|
	importlib.invalidate_caches()
 | 
						|
 | 
						|
def pre_test(ctx, test, copied):
 | 
						|
	'''
 | 
						|
		Copy test files, start processes, and any other pre test work.
 | 
						|
	'''
 | 
						|
	os.chdir(test)
 | 
						|
 | 
						|
	dbg("\nStarting %s" % colored(os.path.basename(test), "white", attrs=['bold']))
 | 
						|
	if not os.path.exists(test + '/hw.conf'):
 | 
						|
		raise Exception("No hw.conf found for %s" % test)
 | 
						|
 | 
						|
	ctx.hw_config = ConfigParser()
 | 
						|
	ctx.hw_config.read(test + '/hw.conf')
 | 
						|
	#
 | 
						|
	# We have two types of test files: tests and everything else. Rather
 | 
						|
	# than require each test to specify the files needing to be copied to
 | 
						|
	# /tmp (previously 'tmpfs_extra_stuff'), we just copy everything which
 | 
						|
	# isn't a test. There is really no reason not to do this as any file
 | 
						|
	# present in a test directory should be needed by the test.
 | 
						|
	#
 | 
						|
	# All files
 | 
						|
	files = os.listdir(test)
 | 
						|
	# Tests (starts or ends with 'test')
 | 
						|
	subtests = [f for f in files if f.startswith('test') or \
 | 
						|
			os.path.splitext(f)[0].endswith('test')]
 | 
						|
	# Everything else (except .py files)
 | 
						|
	to_copy = [f for f in list(set(files) - set(subtests)) if not f.endswith('.py') \
 | 
						|
								and f != '__pycache__']
 | 
						|
	for f in to_copy:
 | 
						|
		if os.path.isdir(f):
 | 
						|
			shutil.copytree(f, '/tmp/' + f)
 | 
						|
		else:
 | 
						|
			shutil.copy(f, '/tmp')
 | 
						|
		copied.append(f)
 | 
						|
 | 
						|
	# Prune down any subtests if needed
 | 
						|
	if ctx.args.sub_tests:
 | 
						|
		ctx.args.sub_tests = ctx.args.sub_tests.split(',')
 | 
						|
 | 
						|
		to_run = [x.split('.')[0] for x in ctx.args.sub_tests]
 | 
						|
		pruned = []
 | 
						|
 | 
						|
		for s in subtests:
 | 
						|
			no_ext = s
 | 
						|
			# Handle <file>.<test function> format
 | 
						|
			if '.' in s:
 | 
						|
				no_ext = s.split('.')[0]
 | 
						|
 | 
						|
			if no_ext in to_run:
 | 
						|
				pruned.append(no_ext + '.py')
 | 
						|
 | 
						|
		subtests = pruned
 | 
						|
 | 
						|
	ctx.start_dbus()
 | 
						|
	ctx.start_haveged()
 | 
						|
	ctx.start_dbus_monitor()
 | 
						|
	ctx.start_radios()
 | 
						|
	ctx.create_namespaces()
 | 
						|
	ctx.start_hostapd()
 | 
						|
	ctx.start_wpas_interfaces()
 | 
						|
	ctx.start_ofono()
 | 
						|
 | 
						|
	if ctx.args.log:
 | 
						|
		ctx.start_process(['iwmon', '--nowiphy'])
 | 
						|
	elif ctx.args.monitor:
 | 
						|
		ctx.start_process(['iwmon', '--nowiphy'], outfile=ctx.args.monitor)
 | 
						|
 | 
						|
	if ctx.hw_config.has_option('SETUP', 'start_iwd'):
 | 
						|
		start = ctx.hw_config.getboolean('SETUP', 'start_iwd')
 | 
						|
	else:
 | 
						|
		start = True
 | 
						|
 | 
						|
	if start:
 | 
						|
		ctx.start_iwd()
 | 
						|
	else:
 | 
						|
		print("Not starting IWD from test-runner")
 | 
						|
 | 
						|
	print(ctx)
 | 
						|
 | 
						|
	sys.path.insert(1, test)
 | 
						|
 | 
						|
	return sorted(subtests)
 | 
						|
 | 
						|
def post_test(ctx, to_copy):
 | 
						|
	'''
 | 
						|
		Remove copied files, and stop test processes.
 | 
						|
	'''
 | 
						|
	try:
 | 
						|
		for f in to_copy:
 | 
						|
			if os.path.isdir('/tmp/' + f):
 | 
						|
				shutil.rmtree('/tmp/' + f)
 | 
						|
			else:
 | 
						|
				os.remove('/tmp/' + f)
 | 
						|
 | 
						|
		Process(['ip', 'link', 'set', 'lo', 'down']).wait()
 | 
						|
	except Exception as e:
 | 
						|
		print("Exception thrown in post_test")
 | 
						|
	finally:
 | 
						|
		ctx.stop_test_processes()
 | 
						|
 | 
						|
	if ctx.args.valgrind:
 | 
						|
		for f in os.listdir('/tmp'):
 | 
						|
			if f.startswith("valgrind.log."):
 | 
						|
				dbg(f)
 | 
						|
				with open('/tmp/' + f, 'r') as v:
 | 
						|
					dbg(v.read())
 | 
						|
				dbg("\n")
 | 
						|
				os.remove('/tmp/' + f)
 | 
						|
 | 
						|
	# Special case for when logging is enabled
 | 
						|
	if os.path.isfile('/tmp/iwd-tls-debug-server-cert.pem'):
 | 
						|
		os.remove('/tmp/iwd-tls-debug-server-cert.pem')
 | 
						|
 | 
						|
	allowed = ['phonesim.conf', 'certs', 'secrets', 'iwd']
 | 
						|
	for f in [f for f in os.listdir('/tmp') if f not in allowed]:
 | 
						|
		dbg("File %s was not cleaned up!" % f)
 | 
						|
		try:
 | 
						|
			os.remove('/tmp/' + f)
 | 
						|
		except:
 | 
						|
			pass
 | 
						|
 | 
						|
def print_results(results):
 | 
						|
	table = PrettyTable(['Test', colored('Passed', 'green'), colored('Failed', 'red'), \
 | 
						|
				colored('Skipped', 'cyan'), colored('Time', 'yellow')])
 | 
						|
 | 
						|
	total_pass = 0
 | 
						|
	total_fail = 0
 | 
						|
	total_skip = 0
 | 
						|
	total_time = 0
 | 
						|
 | 
						|
	for test, result in results.items():
 | 
						|
 | 
						|
		if result.time == TEST_MAX_TIMEOUT:
 | 
						|
			failed = "Timed out"
 | 
						|
			passed = "Timed out"
 | 
						|
		elif result.time == 0:
 | 
						|
			failed = "Exception"
 | 
						|
			passed = "Exception"
 | 
						|
		else:
 | 
						|
			failed = result.failures + result.errors
 | 
						|
			passed = result.run - failed
 | 
						|
 | 
						|
			total_pass += passed
 | 
						|
			total_fail += failed
 | 
						|
			total_skip += result.skipped
 | 
						|
 | 
						|
		total_time += result.time
 | 
						|
 | 
						|
		time = '%.2f' % result.time
 | 
						|
 | 
						|
		table.add_row([test, colored(passed, 'green'), colored(failed, 'red'), \
 | 
						|
				colored(result.skipped, 'cyan'), colored(time, 'yellow')])
 | 
						|
 | 
						|
	total_time = '%.2f' % total_time
 | 
						|
 | 
						|
	table.add_row(['Total', colored(total_pass, 'green'), colored(total_fail, 'red'), \
 | 
						|
			colored(total_skip, 'cyan'), colored(total_time, 'yellow')])
 | 
						|
 | 
						|
	dbg(table)
 | 
						|
 | 
						|
def run_auto_tests(ctx, args):
 | 
						|
	tests = build_test_list(args)
 | 
						|
 | 
						|
	# Copy autotests/misc/{certs,secrets,phonesim} so any test can refer to them
 | 
						|
	shutil.copytree(args.testhome + '/autotests/misc/certs', '/tmp/certs')
 | 
						|
	shutil.copytree(args.testhome + '/autotests/misc/secrets', '/tmp/secrets')
 | 
						|
	shutil.copy(args.testhome + '/autotests/misc/phonesim/phonesim.conf', '/tmp')
 | 
						|
 | 
						|
	for test in tests:
 | 
						|
		copied = []
 | 
						|
		try:
 | 
						|
			subtests = pre_test(ctx, test, copied)
 | 
						|
 | 
						|
			if args.shell:
 | 
						|
				#
 | 
						|
				# Shell really isn't meant to be used with multiple tests. If
 | 
						|
				# a set of tests was passed in just start out in the first.
 | 
						|
				#
 | 
						|
				os.chdir(tests[0])
 | 
						|
				os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = ctx.dbus_address
 | 
						|
				os.system('/bin/bash')
 | 
						|
				exit()
 | 
						|
 | 
						|
			if len(subtests) < 1:
 | 
						|
				dbg("No tests to run")
 | 
						|
				exit()
 | 
						|
 | 
						|
			rqueue = multiprocessing.Queue()
 | 
						|
			p = multiprocessing.Process(target=start_test, args=(ctx, subtests, rqueue))
 | 
						|
			p.start()
 | 
						|
			# Rather than time each subtest we just time the total but
 | 
						|
			# mutiply the default time by the number of tests being run.
 | 
						|
			p.join(TEST_MAX_TIMEOUT * len(subtests))
 | 
						|
 | 
						|
			if p.is_alive():
 | 
						|
				# Timeout
 | 
						|
				p.terminate()
 | 
						|
 | 
						|
				ctx.results[os.path.basename(test)] = SimpleResult(run=0,
 | 
						|
								failures=0, errors=0,
 | 
						|
								skipped=0, time=TEST_MAX_TIMEOUT)
 | 
						|
			else:
 | 
						|
				ctx.results[os.path.basename(test)] = rqueue.get()
 | 
						|
 | 
						|
		except Exception as ex:
 | 
						|
			dbg("%s threw an uncaught exception" % test)
 | 
						|
			traceback.print_exc(file=sys.__stdout__)
 | 
						|
			ctx.results[os.path.basename(test)] = SimpleResult(run=0, failures=0,
 | 
						|
								errors=0, skipped=0, time=0)
 | 
						|
		finally:
 | 
						|
			post_test(ctx, copied)
 | 
						|
 | 
						|
	shutil.rmtree('/tmp/iwd')
 | 
						|
	shutil.rmtree('/tmp/certs')
 | 
						|
	shutil.rmtree('/tmp/secrets')
 | 
						|
	os.remove('/tmp/phonesim.conf')
 | 
						|
 | 
						|
def run_unit_tests(ctx, args):
 | 
						|
	os.chdir(args.testhome + '/unit')
 | 
						|
	units = build_unit_list(args)
 | 
						|
 | 
						|
	for u in units:
 | 
						|
		p = ctx.start_process([u]).wait()
 | 
						|
		if p.returncode != 0:
 | 
						|
			dbg("Unit test %s failed" % os.path.basename(u))
 | 
						|
		else:
 | 
						|
			dbg("Unit test %s passed" % os.path.basename(u))
 | 
						|
 | 
						|
def run_tests():
 | 
						|
	global config
 | 
						|
 | 
						|
	with open('/proc/cmdline', 'r') as f:
 | 
						|
		cmdline = f.read()
 | 
						|
 | 
						|
	start = cmdline.find('--testhome')
 | 
						|
 | 
						|
	options = shlex.split(cmdline[start:])
 | 
						|
 | 
						|
	parser = argparse.ArgumentParser()
 | 
						|
	parser.add_argument('--testhome')
 | 
						|
	parser.add_argument('--auto_tests')
 | 
						|
	parser.add_argument('--unit_tests')
 | 
						|
	parser.add_argument('--verbose', default=[])
 | 
						|
	parser.add_argument('--debug')
 | 
						|
	parser.add_argument('--path')
 | 
						|
	parser.add_argument('--valgrind')
 | 
						|
	parser.add_argument('--gdb')
 | 
						|
	parser.add_argument('--shell')
 | 
						|
	parser.add_argument('--log')
 | 
						|
	parser.add_argument('--log-gid')
 | 
						|
	parser.add_argument('--log-uid')
 | 
						|
	parser.add_argument('--hw')
 | 
						|
	parser.add_argument('--monitor')
 | 
						|
	parser.add_argument('--sub_tests')
 | 
						|
 | 
						|
	args = parser.parse_args(options)
 | 
						|
 | 
						|
	#
 | 
						|
	# This prevents any print() calls in this script from printing unless
 | 
						|
	# --debug is passed. For an 'always print' option use dbg()
 | 
						|
	#
 | 
						|
	if not args.debug:
 | 
						|
		sys.stdout = open(os.devnull, 'w')
 | 
						|
 | 
						|
	if args.verbose != []:
 | 
						|
		args.verbose = args.verbose.split(',')
 | 
						|
 | 
						|
	os.environ['PATH'] = '%s/src' % args.testhome
 | 
						|
	os.environ['PATH'] += ':%s/tools' % args.testhome
 | 
						|
	os.environ['PATH'] += ':%s/client' % args.testhome
 | 
						|
	os.environ['PATH'] += ':%s/monitor' % args.testhome
 | 
						|
	os.environ['PATH'] += ':%s/wired' % args.testhome
 | 
						|
	os.environ['PATH'] += ':' + args.path
 | 
						|
 | 
						|
	sys.path.append(args.testhome + '/autotests/util')
 | 
						|
 | 
						|
	#
 | 
						|
	# This allows all autotest utils (iwd/hostapd/etc) to access the
 | 
						|
	# TestContext. Any other module or script (in the same interpreter) can
 | 
						|
	# simply import config.ctx and access all live test information,
 | 
						|
	# start/stop processes, see active radios etc.
 | 
						|
	#
 | 
						|
	config = importlib.import_module('config')
 | 
						|
	config.ctx = TestContext(args)
 | 
						|
 | 
						|
	# Must import these after config so ctx gets set
 | 
						|
	config.hwsim = importlib.import_module('hwsim')
 | 
						|
	config.hostapd = importlib.import_module('hostapd')
 | 
						|
 | 
						|
	if args.log:
 | 
						|
		mount('logdir', args.log, '9p', 0, 'trans=virtio,version=9p2000.L,msize=10240')
 | 
						|
		# Clear out any log files from other test runs
 | 
						|
		for f in glob('%s/*' % args.log):
 | 
						|
			print("removing %s" % f)
 | 
						|
 | 
						|
			if os.path.isdir(f):
 | 
						|
				shutil.rmtree(f)
 | 
						|
			else:
 | 
						|
				os.remove(f)
 | 
						|
 | 
						|
		# Start writing out kernel log
 | 
						|
		config.ctx.start_process(["dmesg", '--follow'])
 | 
						|
	elif args.monitor:
 | 
						|
		parent = os.path.abspath(os.path.join(args.monitor, os.pardir))
 | 
						|
		mount('mondir', parent, '9p', 0, 'trans=virtio,version=9p2000.L,msize=10240')
 | 
						|
 | 
						|
	if config.ctx.args.unit_tests is None:
 | 
						|
		run_auto_tests(config.ctx, args)
 | 
						|
	else:
 | 
						|
		run_unit_tests(config.ctx, args)
 | 
						|
 | 
						|
class Main:
 | 
						|
	def __init__(self):
 | 
						|
		self.parser = argparse.ArgumentParser(
 | 
						|
				description='IWD Test Runner')
 | 
						|
 | 
						|
		self.parser.add_argument('--qemu', '-q',
 | 
						|
				metavar='<QEMU binary>', type=str,
 | 
						|
				help='QEMU binary to use',
 | 
						|
				dest='qemu',
 | 
						|
				default=None)
 | 
						|
		self.parser.add_argument('--kernel', '-k', metavar='<kernel>',
 | 
						|
				type=str,
 | 
						|
				help='Path to kernel image',
 | 
						|
				dest='kernel',
 | 
						|
				default=None)
 | 
						|
		self.parser.add_argument('--verbose', '-v', metavar='<list>',
 | 
						|
				type=str,
 | 
						|
				help='Comma separated list of applications',
 | 
						|
				dest='verbose',
 | 
						|
				default=[])
 | 
						|
		self.parser.add_argument('--debug', '-d',
 | 
						|
				action='store_true',
 | 
						|
				help='Enable test-runner debugging',
 | 
						|
				dest='debug')
 | 
						|
		self.parser.add_argument('--shell', '-s', action='store_true',
 | 
						|
				help='Boot into shell', dest='shell')
 | 
						|
		self.parser.add_argument('--log', '-l', type=str,
 | 
						|
				help='Directory for log files')
 | 
						|
		self.parser.add_argument('--hw', '-w', type=str, nargs=1,
 | 
						|
				help='Use physical adapters for tests (passthrough)')
 | 
						|
		self.parser.add_argument('--monitor', '-m', type=str,
 | 
						|
				help='Enables iwmon output to file')
 | 
						|
		self.parser.add_argument('--sub-tests', '-S', metavar='<subtests>',
 | 
						|
				type=str, nargs=1, help='List of subtests to run',
 | 
						|
				default=None, dest='sub_tests')
 | 
						|
 | 
						|
		# Prevent --autotest/--unittest from being used together
 | 
						|
		auto_unit_group = self.parser.add_mutually_exclusive_group()
 | 
						|
		auto_unit_group.add_argument('--auto-tests', '-A',
 | 
						|
				metavar='<tests>', type=str, nargs=1,
 | 
						|
				help='List of tests to run',
 | 
						|
				default=None,
 | 
						|
				dest='auto_tests')
 | 
						|
		auto_unit_group.add_argument('--unit-tests', '-U',
 | 
						|
				metavar='<tests>', type=str, nargs='?',
 | 
						|
				const='*',
 | 
						|
				help='List of unit tests to run',
 | 
						|
				dest='unit_tests')
 | 
						|
 | 
						|
		# Prevent --valgrind/--gdb from being used together
 | 
						|
		valgrind_gdb_group = self.parser.add_mutually_exclusive_group()
 | 
						|
		valgrind_gdb_group.add_argument('--gdb', '-g', metavar='<exec>',
 | 
						|
				type=str, nargs=1,
 | 
						|
				help='Run gdb on specified executable',
 | 
						|
				dest='gdb')
 | 
						|
		valgrind_gdb_group.add_argument('--valgrind', '-V', action='store_true',
 | 
						|
				help='Run valgrind on IWD', dest='valgrind')
 | 
						|
 | 
						|
		self.args = self.parser.parse_args()
 | 
						|
 | 
						|
		if self.args.auto_tests:
 | 
						|
			self.args.auto_tests = self.args.auto_tests[0].split(',')
 | 
						|
 | 
						|
		if self.args.sub_tests:
 | 
						|
			self.args.sub_tests = self.args.sub_tests[0].split(',')
 | 
						|
 | 
						|
		if self.args.log and self.args.unit_tests:
 | 
						|
			dbg("Cannot use --log with --unit-tests")
 | 
						|
			quit()
 | 
						|
 | 
						|
		if self.args.sub_tests:
 | 
						|
			if not self.args.auto_tests:
 | 
						|
				dbg("--sub-tests must be used with --auto-tests")
 | 
						|
				quit()
 | 
						|
 | 
						|
			if len(self.args.auto_tests) > 1:
 | 
						|
				dbg("--sub-tests must be used with a single auto test")
 | 
						|
				quit()
 | 
						|
 | 
						|
	def start(self):
 | 
						|
		usb_adapters = None
 | 
						|
		pci_adapters = None
 | 
						|
 | 
						|
		qemu_table = [
 | 
						|
			'qemu-system-x86_64',
 | 
						|
			'/usr/bin/qemu-system-x86_64'
 | 
						|
		]
 | 
						|
 | 
						|
		kernel_table = [
 | 
						|
			'bzImage',
 | 
						|
			'arch/x86/boot/bzImage',
 | 
						|
			'vmlinux',
 | 
						|
			'arch/x86/boot/vmlinux'
 | 
						|
		]
 | 
						|
 | 
						|
		if self.args.qemu is None:
 | 
						|
			qemu_binary = find_binary(qemu_table)
 | 
						|
			if not qemu_binary:
 | 
						|
				print("Could not find qemu binary")
 | 
						|
				quit()
 | 
						|
		else:
 | 
						|
			if path_exists(self.args.qemu):
 | 
						|
				qemu_binary = self.args.qemu
 | 
						|
			else:
 | 
						|
				print("QEMU binary %s does not exist" % \
 | 
						|
						self.args.qemu)
 | 
						|
				quit()
 | 
						|
 | 
						|
		if self.args.kernel is None:
 | 
						|
			kernel_binary = find_binary(kernel_table)
 | 
						|
			if not kernel_binary:
 | 
						|
				print("Could not find kernel image")
 | 
						|
				quit()
 | 
						|
		else:
 | 
						|
			if path_exists(self.args.kernel):
 | 
						|
				kernel_binary = self.args.kernel
 | 
						|
			else:
 | 
						|
				print("Kernel image %s does not exist" % \
 | 
						|
						self.args.kernel)
 | 
						|
				quit()
 | 
						|
 | 
						|
		if self.args.hw:
 | 
						|
			hw_conf = ConfigParser()
 | 
						|
			hw_conf.read(self.args.hw)
 | 
						|
 | 
						|
			if hw_conf.has_section('USBAdapters'):
 | 
						|
				# The actual key name of the adapter
 | 
						|
				# doesn't matter since all we need is the
 | 
						|
				# bus/address. This gets named by the kernel
 | 
						|
				# anyways once in the VM.
 | 
						|
				usb_adapters = [v for v in hw_conf['USBAdapters'].values()]
 | 
						|
 | 
						|
			if hw_conf.has_section('PCIAdapters'):
 | 
						|
				pci_adapters = [v for v in hw_conf['PCIAdapters'].values()]
 | 
						|
 | 
						|
		#
 | 
						|
		# Additional arguments not provided to test-runner which are
 | 
						|
		# needed once booted into the kernel.
 | 
						|
		#
 | 
						|
		options = 'init=%s' % os.path.realpath(sys.argv[0])
 | 
						|
 | 
						|
		# Support running from top level as well as tools
 | 
						|
		if os.getcwd().endswith('tools'):
 | 
						|
			options += ' --testhome %s/../' % os.getcwd()
 | 
						|
		else:
 | 
						|
			options += ' --testhome %s' % os.getcwd()
 | 
						|
 | 
						|
		options += ' --path "%s"' % os.environ['PATH']
 | 
						|
 | 
						|
		if self.args.auto_tests:
 | 
						|
			options += ' --auto_tests %s' % ','.join(self.args.auto_tests)
 | 
						|
 | 
						|
		if self.args.sub_tests:
 | 
						|
			options += ' --sub_tests %s' % ','.join(self.args.sub_tests)
 | 
						|
 | 
						|
		if self.args.log:
 | 
						|
			if os.environ.get('SUDO_GID', None) is None:
 | 
						|
				print("--log can only be used as root user")
 | 
						|
				quit()
 | 
						|
 | 
						|
			self.args.log = os.path.abspath(self.args.log)
 | 
						|
			uid = int(os.environ['SUDO_UID'])
 | 
						|
			gid = int(os.environ['SUDO_GID'])
 | 
						|
 | 
						|
			if not path_exists(self.args.log):
 | 
						|
				os.mkdir(self.args.log)
 | 
						|
				os.chown(self.args.log, uid, gid)
 | 
						|
 | 
						|
			options += ' --log-gid %u' % gid
 | 
						|
			options += ' --log-uid %u' % uid
 | 
						|
 | 
						|
		if self.args.monitor:
 | 
						|
			if os.environ.get('SUDO_GID', None) is None:
 | 
						|
				print("--monitor can only be used as root user")
 | 
						|
				quit()
 | 
						|
 | 
						|
			self.args.monitor = os.path.abspath(self.args.monitor)
 | 
						|
			mon_parent_dir = os.path.abspath(os.path.join(self.args.monitor, os.pardir))
 | 
						|
 | 
						|
			options += ' --log-gid %u' % int(os.environ['SUDO_GID'])
 | 
						|
			options += ' --log-uid %u' % int(os.environ['SUDO_UID'])
 | 
						|
 | 
						|
		denylist = [
 | 
						|
			'auto_tests',
 | 
						|
			'sub_tests',
 | 
						|
			'qemu',
 | 
						|
			'kernel'
 | 
						|
		]
 | 
						|
 | 
						|
		nproc = multiprocessing.cpu_count()
 | 
						|
 | 
						|
		#
 | 
						|
		# Specially handle CPU systems with minimal cores, otherwise
 | 
						|
		# use half the host cores.
 | 
						|
		#
 | 
						|
		if nproc < 2:
 | 
						|
			smp = 1
 | 
						|
		else:
 | 
						|
			smp = int(nproc / 2)
 | 
						|
 | 
						|
		#
 | 
						|
		# Increase RAM if valgrind is being used
 | 
						|
		#
 | 
						|
		if self.args.valgrind:
 | 
						|
			ram = 512
 | 
						|
		else:
 | 
						|
			ram = 384
 | 
						|
 | 
						|
		print("Using %d cores, %d RAM for VM" % (smp, ram))
 | 
						|
 | 
						|
		#
 | 
						|
		# This passes through most of the command line options to
 | 
						|
		# the kernel command line. Some are not relevant (e.g. qemu)
 | 
						|
		# so similar options are added in the denylist above. This excludes
 | 
						|
		# any unset options which are assumed to be None or False. This
 | 
						|
		# is done so default arguments can be filled once in the VM. If
 | 
						|
		# we pass and basic types (None, False etc.) they are turned into
 | 
						|
		# a string representation ('None', 'False', etc.) which is not
 | 
						|
		# desirable.
 | 
						|
		#
 | 
						|
		for arg in vars(self.args):
 | 
						|
			if arg in denylist or getattr(self.args, arg) in [None, False, []]:
 | 
						|
				continue
 | 
						|
			options += ' --%s %s' % (arg, str(getattr(self.args, arg)))
 | 
						|
 | 
						|
		kern_log = "ignore_loglevel" if "kernel" in self.args.verbose else "quiet"
 | 
						|
 | 
						|
		qemu_cmdline = [
 | 
						|
			qemu_binary,
 | 
						|
			'-machine', 'type=q35,accel=kvm:tcg',
 | 
						|
			'-nodefaults', '-no-user-config', '-monitor', 'none',
 | 
						|
			'-display', 'none', '-m', '%dM' % ram, '-nographic', '-vga',
 | 
						|
			'none', '-no-acpi', '-no-hpet',
 | 
						|
			'-no-reboot', '-fsdev',
 | 
						|
			'local,id=fsdev-root,path=/,readonly=on,security_model=none,multidevs=remap',
 | 
						|
			'-device',
 | 
						|
			'virtio-9p-pci,fsdev=fsdev-root,mount_tag=/dev/root',
 | 
						|
			'-chardev', 'stdio,id=chardev-serial0,signal=off',
 | 
						|
			'-device', 'pci-serial,chardev=chardev-serial0',
 | 
						|
			'-device', 'virtio-rng-pci',
 | 
						|
			'-kernel',
 | 
						|
			kernel_binary,
 | 
						|
			'-append',
 | 
						|
			'console=ttyS0,115200n8 earlyprintk=serial \
 | 
						|
				rootfstype=9p root=/dev/root \
 | 
						|
				rootflags=trans=virtio,msize=1048576,version=9p2000.u \
 | 
						|
				acpi=off pci=noacpi %s ro \
 | 
						|
				mac80211_hwsim.radios=0 %s' % (kern_log, options),
 | 
						|
			'-smp', str(smp)
 | 
						|
		]
 | 
						|
 | 
						|
		# Add two ethernet devices for testing EAD
 | 
						|
		qemu_cmdline.extend([
 | 
						|
				'-net', 'nic,model=virtio',
 | 
						|
				'-net', 'nic,model=virtio',
 | 
						|
				'-net', 'user'
 | 
						|
		])
 | 
						|
 | 
						|
		if usb_adapters:
 | 
						|
			for bus, addr in [s.split(',') for s in usb_adapters]:
 | 
						|
				qemu_cmdline.extend(['-usb',
 | 
						|
							'-device',
 | 
						|
							'usb-host,hostbus=%s,hostaddr=%s' % \
 | 
						|
							(bus, addr)])
 | 
						|
		if pci_adapters:
 | 
						|
			qemu_cmdline.extend(['-enable-kvm'])
 | 
						|
			for addr in pci_adapters:
 | 
						|
				qemu_cmdline.extend(['-device', 'vfio-pci,host=%s' % addr])
 | 
						|
 | 
						|
		if self.args.log:
 | 
						|
			#
 | 
						|
			# Creates a virtfs device that can be mounted. This mount
 | 
						|
			# will point back to the provided log directory and is
 | 
						|
			# writable unlike the rest of the mounted file system.
 | 
						|
			#
 | 
						|
			qemu_cmdline.extend([
 | 
						|
				'-virtfs',
 | 
						|
				'local,path=%s,mount_tag=logdir,security_model=passthrough,id=logdir' \
 | 
						|
						% self.args.log
 | 
						|
			])
 | 
						|
 | 
						|
		if self.args.monitor:
 | 
						|
			qemu_cmdline.extend([
 | 
						|
				'-virtfs',
 | 
						|
				'local,path=%s,mount_tag=mondir,security_model=passthrough,id=mondir' \
 | 
						|
						% mon_parent_dir
 | 
						|
			])
 | 
						|
 | 
						|
		os.execlp(qemu_cmdline[0], *qemu_cmdline)
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
	if os.getpid() == 1 and os.getppid() == 0:
 | 
						|
		atexit.register(exit_vm)
 | 
						|
		prepare_sandbox()
 | 
						|
		run_tests()
 | 
						|
 | 
						|
		exit()
 | 
						|
 | 
						|
	main = Main()
 | 
						|
	main.start()
 |