mirror of
				https://git.kernel.org/pub/scm/network/wireless/iwd.git
				synced 2025-10-31 13:17:25 +01:00 
			
		
		
		
	 e70d7e0857
			
		
	
	
		e70d7e0857
		
	
	
	
	
		
			
			Many processes are not long running (e.g. hostapd_cli, ip, iw, etc) and the separators written to log files don't show up for these which makes debugging difficult. This is even true for IWD/Hostapd for tests with start_iwd=0. After writing separators for long running processes write them out for any additional log files too.
		
			
				
	
	
		
			1014 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			1014 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/python3
 | |
| 
 | |
| import os
 | |
| import shutil
 | |
| import sys
 | |
| import subprocess
 | |
| import atexit
 | |
| import time
 | |
| import unittest
 | |
| import importlib
 | |
| from unittest.result import TestResult
 | |
| import multiprocessing
 | |
| import re
 | |
| import traceback
 | |
| 
 | |
| from configparser import ConfigParser
 | |
| from prettytable import PrettyTable
 | |
| from termcolor import colored
 | |
| from glob import glob
 | |
| from collections import namedtuple
 | |
| import dbus.mainloop.glib
 | |
| from gi.repository import GLib
 | |
| 
 | |
| from runner import Runner
 | |
| from utils import Process, Namespace, BarChart
 | |
| 
 | |
# Holds the imported 'config' module once run_tests() has run; None before
config = None
# Counter used by Radio.create_interface() to name interfaces 'wln<N>'
intf_id = 0

# Recorded as the 'time' of a test that timed out; run_auto_tests() also
# multiplies it by the number of subtests to get the join() timeout.
TEST_MAX_TIMEOUT = 240

# Install the GLib main loop as the default main loop for dbus-python
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
 | |
| 
 | |
def dbg(*s, **kwargs):
	'''
		Print to the interpreter's original stdout so the message is
		still visible when sys.stdout has been re-directed. Accepts the
		same arguments as print() except 'file'.
	'''
	original_stdout = sys.__stdout__
	print(*s, file=original_stdout, **kwargs)
 | |
| 
 | |
def exit_vm():
	'''
		atexit handler. Kills any processes still tracked by Process,
		prints the collected results, optionally writes PASS/FAIL to the
		file given by the 'result' argument, then syncs and stops the
		runner.
	'''
	if config:
		# The test context may not exist if start up failed early
		ctx = getattr(config, 'ctx', None)

		for p in Process.get_all():
			print("Process %s still running!" % p.args[0])
			p.kill()

		if ctx and ctx.results:
			success = print_results(ctx.results)
		else:
			success = False

		# Without a context there are no arguments to consult
		if ctx and ctx.args.result:
			result = 'PASS' if success else 'FAIL'
			with open(ctx.args.result, 'w') as f:
				f.write(result)

	os.sync()

	runner.stop()
 | |
| 
 | |
class Interface:
	'''
		A wireless interface and the config associated with it. The
		interface is deleted with 'iw dev <name> del' when the object
		is destroyed.
	'''
	def __init__(self, name, config):
		self.name = name
		self.config = config
		self.ctrl_interface = '/var/run/hostapd/' + name

	def __del__(self):
		delete_cmd = ['iw', 'dev', self.name, 'del']
		Process(delete_cmd).wait()

	def set_interface_state(self, state):
		'''
			Runs 'ip link set <name> <state>' and waits for it.
		'''
		link_cmd = ['ip', 'link', 'set', self.name, state]
		Process(link_cmd).wait()
 | |
| 
 | |
class Radio:
	'''
		A wireless phy, identified by name, which can have a single
		Interface created on top of it.
	'''
	def __init__(self, name):
		self.name = name
		self.interface = None
		# hostapd will reset this if this radio is used by it
		self.use = 'iwd'

	def __del__(self):
		print("Removing radio %s" % self.name)
		self.interface = None

	def create_interface(self, config, use):
		'''
			Creates a managed interface named 'wln<N>' on this radio,
			records who uses it and returns the Interface object.
		'''
		global intf_id

		ifname = 'wln%s' % intf_id
		intf_id += 1

		self.interface = Interface(ifname, config)
		self.use = use

		add_cmd = ['iw', 'phy', self.name, 'interface', 'add', ifname,
				'type', 'managed']
		Process(add_cmd).wait()

		return self.interface

	def __str__(self):
		# The interface name is only shown once one has been created
		suffix = '(%s)' % self.interface.name if self.interface else ''

		return ''.join([self.name + ':\n',
				'\tUsed By: %s ' % self.use,
				suffix,
				'\n'])
 | |
| 
 | |
class VirtualRadio(Radio):
	'''
		A subclass of 'Radio' specific to mac80211_hwsim radios.

		TODO: Using D-Bus to create and destroy radios is more desirable
		than the command line.
	'''

	def __init__(self, name, cfg=None):
		'''
			Creates a hwsim radio called 'name'. 'cfg' is an optional
			mapping which may contain 'iftype_disable' and
			'cipher_disable' values passed on to radio creation.
		'''
		global config

		self.disable_cipher = None
		self.disable_iftype = None

		self.hwsim = config.hwsim.Hwsim()

		if cfg:
			self.disable_iftype = cfg.get('iftype_disable', None)
			self.disable_cipher = cfg.get('cipher_disable', None)

		self._radio = self.hwsim.radios.create(name, p2p_device=True,
					iftype_disable=self.disable_iftype,
					cipher_disable=self.disable_cipher)

		# Use the name reported by the created radio object
		super().__init__(self._radio.name)

	def __del__(self):
		super().__del__()

		# If the radio was moved into a namespace this will fail. Only
		# ordinary errors are ignored; KeyboardInterrupt/SystemExit are
		# no longer swallowed.
		try:
			self._radio.remove()
		except Exception:
			pass

		self._radio = None

	def __str__(self):
		ret = super().__str__()

		if self.disable_iftype:
			ret += '\tDisabled interface types: %s\n' % self.disable_iftype

		if self.disable_cipher:
			ret += '\tDisabled ciphers: %s\n' % self.disable_cipher

		ret += '\tPath: %s' % self._radio.path

		ret += '\n'

		return ret
 | |
| 
 | |
class HostapdInstance:
	'''
		One hostapd AP. All APs are actually served by a single hostapd
		process (see Hostapd); this object only bundles the config,
		radio, interface and CLI handle belonging to one of them.
	'''
	def __init__(self, config, radio):
		self.cli = None
		self.config = config
		self.radio = radio

		# Create the interface hostapd will use and bring it up
		intf = radio.create_interface(self.config, 'hostapd')
		self.intf = intf
		intf.set_interface_state('up')

	def __del__(self):
		print("Removing HostapdInstance %s" % self.config)
		self.intf.set_interface_state('down')
		self.radio = None
		self.intf = None

	def __str__(self):
		header = 'Hostapd (%s)\n' % self.intf.name
		body = '\tConfig: %s\n' % self.config

		return header + body
 | |
| 
 | |
class Hostapd:
	'''
		A set of running hostapd instances. This is really just a single
		process since hostapd can be started with multiple config files.
	'''
	def __init__(self, radios, configs, radius):
		'''
			Starts one hostapd process serving every config in
			'configs', pairing configs[i] with radios[i]. 'radius' is an
			optional extra config appended to the command line. Raises
			Exception if the two lists differ in length.
		'''
		if len(configs) != len(radios):
			raise Exception("Config (%d) and radio (%d) list length not equal" % \
						(len(configs), len(radios)))

		print("Initializing hostapd instances")

		Process(['ip', 'link', 'set', 'eth0', 'up']).wait()
		Process(['ip', 'link', 'set', 'eth1', 'up']).wait()

		self.global_ctrl_iface = '/var/run/hostapd/ctrl'

		self.instances = [HostapdInstance(c, r) for c, r in zip(configs, radios)]

		ifaces = [rad.interface.name for rad in radios]
		ifaces = ','.join(ifaces)

		args = ['hostapd', '-g', self.global_ctrl_iface]

		if ifaces:
			args.extend(['-i', ifaces])

		#
		# Config files should already be present in /tmp. This appends
		# ctrl_interface and does any variable replacement. Currently
		# this is just any $ifaceN occurrences.
		#
		for c in configs:
			full_path = '/tmp/%s' % c
			args.append(full_path)

			self._rewrite_config(full_path)

		if radius:
			args.append(radius)

		if Process.is_verbose('hostapd'):
			args.append('-d')

		self.process = Process(args)

		self.process.wait_for_socket(self.global_ctrl_iface, 30)

		for hapd in self.instances:
			self.process.wait_for_socket(hapd.intf.ctrl_interface, 30)

	def attach_cli(self):
		'''
			Creates a HostapdCLI object for every instance.
		'''
		global config

		for hapd in self.instances:
			hapd.cli = config.hostapd.HostapdCLI(config=hapd.config)

	def _rewrite_config(self, config):
		'''
			Replaces any $ifaceN values with the correct interface
			names as well as appends the ctrl_interface path to
			the config file. The file is rewritten in place.
		'''
		def iface_name(match):
			# N in $ifaceN indexes into self.instances
			return self.instances[int(match.group(1))].intf.name

		with open(config, 'r+') as f:
			data = f.read()

			# A regex substitution replaces each whole tag, so $iface1
			# can never clobber the prefix of $iface10.
			data = re.sub(r'\$iface([0-9]+)', iface_name, data)

			data += '\nctrl_interface=/var/run/hostapd\n'

			# After read() the position is at EOF; rewind and truncate
			# so the new content replaces the old instead of being
			# appended after it.
			f.seek(0)
			f.write(data)
			f.truncate()

	def __getitem__(self, config):
		'''
			Returns the instance using 'config', the first instance if
			'config' is empty, or None if there is no match.
		'''
		if not config:
			return self.instances[0]

		for hapd in self.instances:
			if hapd.config == config:
				return hapd

		return None

	def __del__(self):
		print("Removing Hostapd")

		# __init__ may have raised before any attribute was set, so
		# every attribute is looked up defensively here.
		ctrl = getattr(self, 'global_ctrl_iface', None)
		if ctrl:
			try:
				os.remove(ctrl)
			except OSError:
				print("Failed to remove %s" % ctrl)

		for hapd in getattr(self, 'instances', []):
			# cli stays None until attach_cli() has been called
			if hapd.cli:
				GLib.source_remove(hapd.cli.io_watch)

		# Hostapd may have already been stopped
		process = getattr(self, 'process', None)
		if process:
			process.kill()

		# Hostapd creates simdb sockets for EAP-SIM/AKA tests but does not
		# clean them up.
		for f in glob("/tmp/eap_sim_db*"):
			os.remove(f)
 | |
| 
 | |
class TestContext(Namespace):
	'''
		Contains all information for a given set of tests being run
		such as processes, radios, interfaces and test results.
	'''
	def __init__(self, args):
		self.name = None
		self.args = args
		self.hw_config = None
		self.hostapd = None
		self.wpas_interfaces = None
		self.radios = []
		self.results = {}
		self.namespaces = []
		self._last_mem_available = 0
		self._mem_chart = BarChart()

	def start_dbus_monitor(self):
		'''
			Starts dbus-monitor, but only if it was requested verbose.
		'''
		if not Process.is_verbose('dbus-monitor'):
			return

		self.start_process(['dbus-monitor', '--address', self.dbus_address])

	def start_haveged(self):
		self.start_process(['haveged', '-F'])

	def create_radios(self):
		'''
			Starts hwsim and creates SETUP.num_radios virtual radios
			named rad0, rad1, ... Raises TimeoutError if the hwsim
			D-Bus name does not appear.
		'''
		setup = self.hw_config['SETUP']
		nradios = int(setup['num_radios'])
		args = ['hwsim']

		if setup.get('hwsim_medium', 'no') in ['no', '0', 'false']:
			# hwsim was not requested as the medium
			args.extend(['--no-register'])

		self.start_process(args)
		self.non_block_wait(self._bus.name_has_owner, 20, 'net.connman.hwsim',
					exception=TimeoutError('net.connman.hwsim did not appear'))

		for i in range(nradios):
			name = 'rad%u' % i

			# Get any [radX] sections. These are for configuring
			# any special radios. This no longer requires a
			# radio_conf list, we just assume radios start rad0
			# and increment.
			rad_config = None
			if self.hw_config.has_section(name):
				rad_config = self.hw_config[name]

			self.radios.append(VirtualRadio(name, rad_config))

	def discover_radios(self):
		'''
			Populates self.radios with one Radio per wiphy name reported
			by pyroute2.
		'''
		import pyroute2

		phys = []

		# The IW class is looked up in two places; fall back to the
		# top-level name if the iwutil lookup fails.
		try:
			iw = pyroute2.iwutil.IW()
		except Exception:
			iw = pyroute2.IW()

		attrs = [phy['attrs'] for phy in iw.list_wiphy()]

		for attr in attrs:
			for key, value in attr:
				if key == 'NL80211_ATTR_WIPHY_NAME':
					if value not in phys:
						phys.append(value)
					break

		print('Discovered radios: %s' % str(phys))
		self.radios = [Radio(name) for name in phys]

	def start_radios(self):
		'''
			Sets the regulatory domain if configured, then discovers
			physical radios (args.hw) or creates virtual ones.
		'''
		reg_domain = self.hw_config['SETUP'].get('reg_domain', None)
		if reg_domain:
			Process(['iw', 'reg', 'set', reg_domain]).wait()

		if self.args.hw:
			self.discover_radios()
		else:
			self.create_radios()

	def start_hostapd(self):
		'''
			Starts hostapd for the radios/configs listed in the
			[HOSTAPD] section, if there is one.
		'''
		if not 'HOSTAPD' in self.hw_config:
			return

		settings = self.hw_config['HOSTAPD']

		if self.args.hw:
			# Just grab the first N radios. It gets rather
			# complicated trying to map radX radios specified in
			# hw.conf so any passed through physical adapters are
			# just given to hostapd/IWD as they appear during
			# discovery.
			#
			# TODO: It may be desirable to map PCI/USB adapters to
			#       specific radX radios specified in the config but
			#       there are really 2 separate use cases here.
			#       1. You want to test a *specific* radio with IWD
			#          or hostapd. For this you would want radX
			#          to map to a specific radio
			#       2. You have many adapters in use to run multiple
			#          tests. In this case you would not care what
			#          was using each radio, just that there was
			#          enough to run all tests.
			nradios = 0
			for k, _ in settings.items():
				if k == 'radius_server':
					continue
				nradios += 1

			hapd_radios = self.radios[:nradios]

		else:
			hapd_radios = [rad for rad in self.radios if rad.name in settings]

		hapd_configs = [conf for rad, conf in settings.items() if rad != 'radius_server']

		radius_config = settings.get('radius_server', None)

		self.hostapd = Hostapd(hapd_radios, hapd_configs, radius_config)
		self.hostapd.attach_cli()

	def get_frequencies(self):
		'''
			Returns the frequency reported by each hostapd instance.
		'''
		return [hapd.cli.frequency for hapd in self.hostapd.instances]

	def start_wpas_interfaces(self):
		'''
			Creates an interface for each radio listed in the
			[WPA_SUPPLICANT] section, if there is one.
		'''
		if 'WPA_SUPPLICANT' not in self.hw_config:
			return

		settings = self.hw_config['WPA_SUPPLICANT']

		if self.args.hw:
			nradios = len(settings.items())

			wpas_radios = self.radios[:nradios]
			self.wpas_interfaces = []

			#
			# Physical radios most likely will use a different name
			# than 'rad#' but the config file is referenced by these
			# 'rad#' names. Iterate through both the settings and
			# physical radios to create interfaces associated with
			# each config file.
			#
			for vrad, hwrad in zip(settings.items(), wpas_radios):
				self.wpas_interfaces.append(hwrad.create_interface(vrad[1], 'wpas'))

		else:
			wpas_radios = [rad for rad in self.radios if rad.name in settings]
			self.wpas_interfaces = [rad.create_interface(settings[rad.name], 'wpas') \
						for rad in wpas_radios]

	def start_ofono(self):
		'''
			Handles the SETUP.sim_keys option: unset means nothing to
			do, any value other than 'ofono' is exported as
			IWD_SIM_KEYS, and 'ofono' starts phonesim and ofonod when
			both executables are available.
		'''
		sim_keys = self.hw_config['SETUP'].get('sim_keys', None)
		if not sim_keys:
			print("Ofono not requred")
			return
		elif sim_keys != 'ofono':
			os.environ['IWD_SIM_KEYS'] = sim_keys
			return

		# shutil.which() takes a command name string, not a list
		if not shutil.which('ofonod') or not shutil.which('phonesim'):
			print("Ofono or Phonesim not found, skipping test")
			return

		Process(['ip', 'link', 'set', 'lo', 'up']).wait()

		os.environ['OFONO_PHONESIM_CONFIG'] = '/tmp/phonesim.conf'

		phonesim_args = ['phonesim', '-p', '12345', '/usr/share/phonesim/default.xml']

		self.start_process(phonesim_args)

		#
		# TODO:
		# Is there something to wait for? Without this phonesim rejects
		# connections on all but the first test.
		#
		time.sleep(3)

		ofono_args = ['ofonod', '-n', '--plugin=atmodem,phonesim']
		if Process.is_verbose('ofonod'):
			ofono_args.append('-d')

		self.start_process(ofono_args)

		print("Ofono started")

	def create_namespaces(self):
		'''
			Creates a Namespace for every entry of the [NameSpaces]
			section and moves the listed radios out of self.radios.
		'''
		if not self.hw_config.has_section('NameSpaces'):
			return

		for key, value in self.hw_config.items('NameSpaces'):
			radio_names = value.split(',')
			# Gather up radio objects for this namespace
			radios = [rad for rad in self.radios if rad.name in radio_names]

			# Remove radios from 'root' namespace. The remaining radios
			# keep their order, since start_hostapd() pairs radios
			# with configs by position.
			self.radios = [rad for rad in self.radios if rad not in radios]

			self.namespaces.append(Namespace(self.args, key, radios))

	def get_namespace(self, ns):
		'''
			Returns the namespace named 'ns' or None.
		'''
		for n in self.namespaces:
			if n.name == ns:
				return n

		return None

	def stop_test_processes(self):
		'''
			Resets every namespace and this context, dropping hostapd
			and the wpa_supplicant interfaces.
		'''
		for n in self.namespaces:
			n.reset()

		self.namespaces = []
		self.hostapd = None
		self.wpas_interfaces = None

		self.reset()

	def meminfo_to_dict(self):
		'''
			Parses /proc/meminfo into a dict of name -> integer value,
			with any trailing 'kB' removed.
		'''
		def removesuffix(string, suffix):
			if string.endswith(suffix):
				return string[:-len(suffix)]
			return string

		ret = {}

		with open('/proc/meminfo', 'r') as f:
			data = f.read().strip().split('\n')

		for l in data:
			entry = l.split(':')
			# int() tolerates the surrounding whitespace
			ret[entry[0]] = int(removesuffix(entry[1], 'kB'))

		return ret

	def __str__(self):
		'''
			Describes the arguments, hostapd instances, memory usage
			and namespaces. Note this also records a new sample in the
			memory chart and updates the last-available value.
		'''
		ret = 'Arguments:\n'
		for arg in vars(self.args):
			ret += '\t --%s %s\n' % (arg, str(getattr(self.args, arg)))

		ret += 'Hostapd:\n'
		if self.hostapd:
			for h in self.hostapd.instances:
				ret += '\t%s\n' % str(h)
		else:
			ret += '\tNo Hostapd instances\n'

		info = self.meminfo_to_dict()
		self._mem_chart.add_value(info['MemAvailable'])

		ret += 'Available Memory: %u kB\n' % info['MemAvailable']
		ret += 'Last Test Delta: %+d kB\n' % (info['MemAvailable'] - self._last_mem_available)
		ret += 'Per-test Usage:\n'
		ret += str(self._mem_chart)

		self._last_mem_available = info['MemAvailable']

		ret += super().__str__()

		for n in self.namespaces:
			ret += n.__str__()

		return ret
 | |
| 
 | |
def build_unit_list(args):
	'''
		Return the sorted list of unit test executables selected by
		args.unit_tests (comma separated). Each entry is tried as a
		literal executable path, then as a name under <testhome>/unit,
		and finally as a glob under that directory. Raises Exception
		if an entry matches nothing at all.
	'''
	unit_dir = args.testhome + '/unit'
	found = []

	for name in args.unit_tests.split(','):
		candidate = '%s/%s' % (unit_dir, name)

		if os.access(name, os.X_OK):
			found.append(name)
			continue

		if os.access(candidate, os.X_OK):
			found.append(candidate)
			continue

		# Full list or glob, first build up valid list of tests
		globbed = glob(candidate)
		if not globbed:
			raise Exception("Could not find test %s" % name)

		# Only executables count as tests
		for exe in globbed:
			if os.access(exe, os.X_OK):
				found.append(exe)

	return sorted(found)
 | |
| 
 | |
def build_test_list(args):
	'''
		Return the sorted list of autotest directories to run. With no
		args.autotests, every test directory tracked by git is used.
		Otherwise each comma separated entry is treated as '<name>+'
		(that test and all following ones), an existing path, a name
		under <testhome>/autotests, or a glob under that directory.
		Raises Exception when an entry matches nothing.
	'''
	test_root = args.testhome + '/autotests'

	# Run all tests
	if not args.autotests:
		# Get list of all autotests (committed in git)
		listing = os.popen('git -C %s ls-files autotests/ | cut -f2 -d"/" \
					| grep "test*" | uniq' % args.testhome).read() \
					.strip().split('\n')
		return sorted([test_root + '/' + t for t in listing])

	tests = []

	print("Generating partial test list")

	full_list = sorted(os.listdir(test_root))

	for t in args.autotests.split(','):
		path = '%s/%s' % (test_root, t)

		if t.endswith('+'):
			# Everything from this test onwards
			t = t.split('+')[0]
			first = full_list.index(t)

			tests = [test_root + '/' + x for x in full_list[first:] \
						if x.startswith('test')]
			continue

		if os.path.exists(t):
			chosen = t
		elif os.path.exists(path):
			chosen = path
		else:
			matches = glob(path)
			if not matches:
				raise Exception("Could not find test %s" % t)

			tests.extend(set(matches) - set(tests))
			continue

		if chosen not in tests:
			tests.append(chosen)

	return sorted(tests)
 | |
| 
 | |
# Plain summary of one test's outcome (counts plus elapsed time) which is
# sent from the test process to the parent through a multiprocessing queue.
SimpleResult = namedtuple('SimpleResult', 'run failures errors skipped time')
 | |
| 
 | |
def start_test(ctx, subtests, rqueue):
	'''
		Run an individual test. 'subtests' are parsed prior to calling
		but these effectively make up a single test. 'rqueue' is the
		results queue which is required since this is using
		multiprocessing. A single SimpleResult with the accumulated
		counts is put on 'rqueue' when all subtests have run.
	'''
	run = 0
	errors = 0
	failures = 0
	skipped = 0

	# Function names from any <file>.<function> sub-test filters. This
	# does not change between tests so build it once.
	limit_funcs = []
	if ctx.args.sub_tests:
		for i in ctx.args.sub_tests:
			if len(i.split('.')) == 2:
				limit_funcs.append(i.split('.')[1])

	start = time.time()
	#
	# Iterate through each individual python test.
	#
	for s in subtests:
		loader = unittest.TestLoader()
		try:
			module = importlib.import_module(os.path.splitext(s)[0])
		except OSError:
			# Dump some kernel/memory state to help debug the failure
			dbg(subprocess.check_output("cat /proc/buddyinfo", shell=True).decode('utf-8'))
			dbg(subprocess.check_output("dmesg | tail -80", shell=True).decode('utf-8'))
			print(ctx)
			raise

		subtest = loader.loadTestsFromModule(module)

		# The test suite is being (ab)used to get a bit more granularity
		# with individual tests. The 'normal' way to use unittest is to
		# just create a test suite and run them. The problem here is that
		# test results are queued and printed at the very end so its
		# difficult to know *where* a test failed (python gives a stack
		# trace but printing the exception/failure immediately shows
		# where in the debug logs something failed). Moreso if there are
		# several test functions inside a single python file they run
		# as a single test and it is difficult (again) to know where
		# something failed.

		# Iterating through each python test file
		for test in subtest:
			# Iterating through individual test functions inside a
			# Test() class. The suite is turned into a list so its
			# length is known, which lets the class be set up on the
			# first test and torn down on the last one only.
			tlist = list(test)
			for index, t in enumerate(tlist):
				func, file = str(t).split(' ')
				#
				# TODO: There may be a better way of doing this
				# but stringifying the test class gives us a string:
				# <function> (<file>.<class>)
				#
				file = file.strip('()').split('.')[0] + '.py'

				# Create an empty result here in case the test fails
				result = TestResult()

				try:
					skip = len(limit_funcs) > 0 and func not in limit_funcs

					# Set up class only on first test
					if index == 0:
						if not skip:
							dbg("%s\n\t%s RUNNING" % (file, str(func)), end='')
						t.setUpClass()
					else:
						if not skip:
							dbg("\t%s RUNNING" % str(func), end='')

					sys.__stdout__.flush()

					name = os.path.basename(os.getcwd())

					Process.write_separators(name, "\n====== %s:%s:%s ======\n\n" %
									(name, file, func))

					if not skip:
						# Run test (setUp/tearDown run automatically)
						result = t()

					# Tear down class only on last test
					if index == len(tlist) - 1:
						t.tearDownClass()

					if skip:
						continue
				except unittest.SkipTest:
					result.skipped.append(t)
				except Exception:
					dbg('\n%s threw an uncaught exception:' % func)
					traceback.print_exc(file=sys.__stdout__)

				run += result.testsRun
				errors += len(result.errors)
				failures += len(result.failures)
				skipped += len(result.skipped)

				if len(result.skipped) > 0:
					dbg(colored(" SKIPPED", "cyan"))
				# A result that ran nothing means this test threw before
				# it could run. The check must use this test's own
				# result; the cumulative 'run' is only zero for the
				# very first test.
				elif result.testsRun == 0 or len(result.errors) > 0 or \
						len(result.failures) > 0:
					dbg(colored(" FAILED", "red"))
					for e in result.errors:
						dbg(e[1])
					for f in result.failures:
						dbg(f[1])
				else:
					dbg(colored(" PASSED", "green"))

		# Prevents future test modules with the same name (e.g.
		# connection_test.py) from being loaded from the cache
		sys.modules.pop(module.__name__)

	#
	# The multiprocessing queue is picky with what objects it will serialize
	# and send between processes. Because of this we put the important bits
	# of the result into our own 'SimpleResult' tuple.
	#
	sresult = SimpleResult(run=run, failures=failures, errors=errors,
				skipped=skipped, time=time.time() - start)
	rqueue.put(sresult)

	# This may not be required since we are manually popping sys.modules
	importlib.invalidate_caches()
 | |
| 
 | |
def pre_test(ctx, test, copied):
	'''
		Copy test files, start processes, and any other pre test work.

		'test' is the test directory, which must contain hw.conf
		(otherwise Exception is raised). Every file copied to /tmp is
		appended to 'copied' so the caller can remove it afterwards.
		Returns the sorted list of subtest files to run.
	'''
	os.chdir(test)

	dbg("\nStarting %s" % colored(os.path.basename(test), "white", attrs=['bold']))
	if not os.path.exists(test + '/hw.conf'):
		raise Exception("No hw.conf found for %s" % test)

	ctx.hw_config = ConfigParser()
	ctx.hw_config.read(test + '/hw.conf')
	#
	# We have two types of test files: tests and everything else. Rather
	# than require each test to specify the files needing to be copied to
	# /tmp (previously 'tmpfs_extra_stuff'), we just copy everything which
	# isn't a test. There is really no reason not to do this as any file
	# present in a test directory should be needed by the test.
	#
	# All files
	files = os.listdir(test)
	# Tests (starts or ends with 'test')
	subtests = [f for f in files if f.startswith('test') or \
			os.path.splitext(f)[0].endswith('test')]
	# Everything else (except .py files)
	to_copy = [f for f in list(set(files) - set(subtests)) if not f.endswith('.py') \
								and f != '__pycache__']
	for f in to_copy:
		if os.path.isdir(f):
			shutil.copytree(f, '/tmp/' + f)
		else:
			shutil.copy(f, '/tmp')
		copied.append(f)

	# Prune down any subtests if needed
	if ctx.args.sub_tests:
		# The arguments are shared by every test in this run, so the
		# raw string must only be split the first time through. Without
		# this check the second test would call split() on a list.
		if isinstance(ctx.args.sub_tests, str):
			ctx.args.sub_tests = ctx.args.sub_tests.split(',')

		to_run = [x.split('.')[0] for x in ctx.args.sub_tests]
		pruned = []

		for s in subtests:
			no_ext = s
			# Handle <file>.<test function> format
			if '.' in s:
				no_ext = s.split('.')[0]

			if no_ext in to_run:
				pruned.append(no_ext + '.py')

		subtests = pruned

	if ctx.args.log:
		ctx.start_process(['iwmon', '--nowiphy'])
	elif ctx.args.monitor:
		ctx.start_process(['iwmon'], outfile=ctx.args.monitor)

	ctx.start_dbus()
	ctx.start_haveged()
	ctx.start_dbus_monitor()
	ctx.start_radios()
	ctx.create_namespaces()
	ctx.start_hostapd()
	ctx.start_wpas_interfaces()
	ctx.start_ofono()

	if ctx.hw_config.getboolean('SETUP', 'start_iwd', fallback=True):
		ctx.start_iwd()

	print(ctx)

	# Lets the subtest modules in this directory be imported by name
	sys.path.insert(1, test)

	return sorted(subtests)
 | |
| 
 | |
def post_test(ctx, to_copy):
	'''
		Remove copied files, and stop test processes.

		'to_copy' is the list of names pre_test() copied into /tmp.
		Anything else left in /tmp which is not expected to persist is
		reported and removed as well.
	'''
	try:
		for f in to_copy:
			if os.path.isdir('/tmp/' + f):
				shutil.rmtree('/tmp/' + f)
			else:
				os.remove('/tmp/' + f)

		Process(['ip', 'link', 'set', 'lo', 'down']).wait()
	except Exception:
		print("Exception thrown in post_test")
	finally:
		# Processes must be stopped even if the clean up above failed
		ctx.stop_test_processes()

	if ctx.args.valgrind:
		for f in os.listdir('/tmp'):
			if f.startswith("valgrind.log."):
				dbg(f)
				with open('/tmp/' + f, 'r') as v:
					dbg(v.read())
				dbg("\n")
				os.remove('/tmp/' + f)

	# Special case for when logging is enabled
	if os.path.isfile('/tmp/iwd-tls-debug-server-cert.pem'):
		os.remove('/tmp/iwd-tls-debug-server-cert.pem')

	allowed = ['phonesim.conf', 'certs', 'secrets', 'iwd']
	for f in [f for f in os.listdir('/tmp') if f not in allowed]:
		dbg("File %s was not cleaned up!" % f)
		leftover = '/tmp/' + f
		try:
			# os.remove() cannot delete a directory, so leftover
			# directories need rmtree() to actually go away.
			if os.path.isdir(leftover) and not os.path.islink(leftover):
				shutil.rmtree(leftover)
			else:
				os.remove(leftover)
		except OSError:
			# Best effort only, the leftover was already reported
			pass
 | |
| 
 | |
def print_results(results):
	'''
		Print a table with one row per test plus a totals row.

		'results' maps a test name to its SimpleResult. A result whose
		time equals TEST_MAX_TIMEOUT is shown as timed out and one whose
		time is 0 is shown as having thrown an exception.

		Returns True only if no test failed, timed out or threw.
	'''
	table = PrettyTable(['Test', colored('Passed', 'green'), colored('Failed', 'red'), \
				colored('Skipped', 'cyan'), colored('Time', 'yellow')])

	total_pass = 0
	total_fail = 0
	total_skip = 0
	total_time = 0
	# Timed out/exception tests have no counts to add to total_fail but
	# must still make the overall run unsuccessful.
	abnormal = False

	for test, result in results.items():

		if result.time == TEST_MAX_TIMEOUT:
			failed = "Timed out"
			passed = "Timed out"
			abnormal = True
		elif result.time == 0:
			failed = "Exception"
			passed = "Exception"
			abnormal = True
		else:
			failed = result.failures + result.errors
			passed = result.run - failed

			total_pass += passed
			total_fail += failed
			total_skip += result.skipped

		total_time += result.time

		# Named 'elapsed' so the 'time' module is not shadowed
		elapsed = '%.2f' % result.time

		table.add_row([test, colored(passed, 'green'), colored(failed, 'red'), \
				colored(result.skipped, 'cyan'), colored(elapsed, 'yellow')])

	total_time = '%.2f' % total_time

	table.add_row(['Total', colored(total_pass, 'green'), colored(total_fail, 'red'), \
			colored(total_skip, 'cyan'), colored(total_time, 'yellow')])

	dbg(table)

	return total_fail == 0 and not abnormal
 | |
| 
 | |
def run_auto_tests(ctx, args):
	'''
		Run every autotest selected by 'args'. Each test runs in its own
		process and its SimpleResult is stored in ctx.results under the
		test directory's base name. A timeout or an exception is stored
		as a result with time TEST_MAX_TIMEOUT or 0 respectively.
	'''
	tests = build_test_list(args)

	for test in tests:
		copied = []
		try:
			subtests = pre_test(ctx, test, copied)

			if len(subtests) < 1:
				dbg("No tests to run")
				sys.exit()

			rqueue = multiprocessing.Queue()
			p = multiprocessing.Process(target=start_test, args=(ctx, subtests, rqueue))
			p.start()
			# Rather than time each subtest we just time the total but
			# multiply the default time by the number of tests being run.
			p.join(TEST_MAX_TIMEOUT * len(subtests))

			if p.is_alive():
				# Timeout
				p.terminate()

				ctx.results[os.path.basename(test)] = SimpleResult(run=0,
								failures=0, errors=0,
								skipped=0, time=TEST_MAX_TIMEOUT)
			else:
				# The child has exited. If it died before posting a
				# result, a plain get() would block forever. The
				# timeout raises queue.Empty instead, and the handler
				# below records that as an exception.
				ctx.results[os.path.basename(test)] = rqueue.get(timeout=5)

		except Exception:
			dbg("%s threw an uncaught exception" % test)
			traceback.print_exc(file=sys.__stdout__)
			ctx.results[os.path.basename(test)] = SimpleResult(run=0, failures=0,
								errors=0, skipped=0, time=0)
		finally:
			post_test(ctx, copied)
 | |
| 
 | |
def run_unit_tests(ctx, args):
	'''
		Run each selected unit test executable from <testhome>/unit and
		report whether it exited with status 0.
	'''
	os.chdir(args.testhome + '/unit')
	units = build_unit_list(args)

	for u in units:
		# Keep the process object itself. The return code is read from
		# it after waiting, so this does not depend on what wait()
		# returns.
		p = ctx.start_process([u])
		p.wait()

		if p.returncode != 0:
			dbg("Unit test %s failed" % os.path.basename(u))
		else:
			dbg("Unit test %s passed" % os.path.basename(u))
 | |
| 
 | |
def run_tests(args):
	'''
		Set up the shared 'config' module and TestContext, start the
		kernel log capture, then run either the auto tests or, when
		args.unit_tests is given, the unit tests.
	'''
	global config

	os.chdir(args.testhome)

	#
	# The 'config' module is how all autotest utils (iwd/hostapd/etc)
	# reach the TestContext: anything in this interpreter can import
	# config.ctx to see live test information, start/stop processes,
	# look at active radios etc.
	#
	config = importlib.import_module('config')
	config.ctx = TestContext(args)

	# These must be imported after config.ctx has been set
	for module_name in ('hwsim', 'hostapd'):
		setattr(config, module_name, importlib.import_module(module_name))

	# Start writing out kernel log
	config.ctx.start_process(["dmesg", '--follow'])

	if args.unit_tests is not None:
		run_unit_tests(config.ctx, args)
	else:
		run_auto_tests(config.ctx, args)
 | |
| 
 | |
# The Runner instance provides the arguments (runner.args) and the
# environment set up/tear down used below; exit_vm() also calls runner.stop()
runner = Runner()

# exit_vm() reports results and stops the runner however the script exits
atexit.register(exit_vm)
runner.prepare_environment()
run_tests(runner.args)
runner.cleanup_environment()
 |