test-runner: inherit Popen by Process class

The Procss class was doing quite a bit of what Popen already does like
storing the return code and process arguments. In addition the Process
class ended up storing a Popen object which was frequently accessed.

For both simplicity and memory savings have Process inherit Popen and
add the additional functionality test-runner needs like stdout
processing to output files and the console.

To do this Popen.wait() needed to be overridden to to prevent blocking
as well as wait for the HUP signal so we are sure all the process
output was written. kill() was also overritten to perform cleanup.

The most intrusive change was removing wait as a kwarg, and instead
requiring the caller to call wait(). This doesn't change much in
terms of complexity to the caller, but simplifies the __init__
routine of Process.

Some convenient improvements:
 - Separate multiple process instance output (Terminate: <args> will
   be written to outfiles each time a process dies.)
 - Append to outfile if the same process is started again
 - Wait for HUP before returning from wait(). This allows any remaining
   output to be written without the need to manually call process_io.
 - Store ctx as a class variable so callers don't need to pass it in
   (e.g. when using Process directly rather than start_process)
This commit is contained in:
James Prestwood 2021-08-25 15:17:23 -07:00 committed by Denis Kenzior
parent a3db60b7b1
commit 7d94aee5b6
1 changed files with 112 additions and 116 deletions

View File

@ -57,7 +57,7 @@ def dbg(*s, **kwargs):
def exit_vm():
if config:
for p in config.ctx.processes:
print("Process %s still running!" % p.name)
print("Process %s still running!" % p.args[0])
p.kill()
p = None
@ -159,102 +159,61 @@ busconfig.dtd\">
<allow eavesdrop=\"true\"/>
</policy>
'''
class Process:
'''
Start a process. If 'wait' is True the constructor will start
the process and wait for it to exit. No PID is tracked in this
case.
'''
def __init__(self, args, wait=False, env=None, ctx=None, check=False,
outfile=None, namespace=None, need_out=False, cleanup=None):
self.killed = False
self.args = args
self.wait = wait
self.name = args[0]
self.ret = None
self.ctx = ctx
class Process(subprocess.Popen):
ctx = None
def __init__(self, args, namespace=None, outfile=None, env=None, check=False, cleanup=None):
self.write_fds = []
self.io_watch = None
self.cleanup = cleanup
self.verbose = False
self.out = ''
self.hup = False
self.killed = False
if not self.ctx:
global config
self.ctx = config.ctx
if namespace:
self.args = ['ip', 'netns', 'exec', namespace]
self.args.extend(args)
args = ['ip', 'netns', 'exec', namespace] + args
if ctx:
# Verbose requested, add stdout/stderr to write FD list.
# This will end up always returning true if logging is
# on which isn't desired so pass log=False as to only
# allow stdout to processes listed in --verbose.
if ctx.is_verbose(self.name, log=False):
self.verbose = True
if self.ctx.is_verbose(args[0], log=False):
self.verbose = True
# Add output file to FD list
if outfile:
try:
f = open(outfile, 'w')
except Exception as e:
dbg(e)
exit(0)
if outfile:
self._append_outfile(outfile)
if ctx.args.log_uid:
os.fchown(f.fileno(), int(ctx.args.log_uid), int(ctx.args.log_gid))
if self.ctx.args.log:
logfile = '%s/%s/%s' % (self.ctx.args.log,
os.path.basename(os.getcwd()),
args[0])
self._append_outfile(logfile)
self.write_fds.append(f)
# Add log file to FD list
if ctx.args.log:
test = os.path.basename(os.getcwd())
test_dir = '%s/%s' % (ctx.args.log, test)
if not path_exists(test_dir):
os.mkdir(test_dir)
os.chown(test_dir, int(ctx.args.log_uid),
int(ctx.args.log_gid))
f = open('%s/%s' % (test_dir, args[0]), 'a+')
os.fchown(f.fileno(), int(ctx.args.log_uid), int(ctx.args.log_gid))
self.write_fds.append(f)
self.pid = subprocess.Popen(self.args, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env, cwd=os.getcwd())
super().__init__(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
env=env, cwd=os.getcwd())
# Set as non-blocking so read() in the IO callback doesn't block forever
fl = fcntl.fcntl(self.pid.stdout, fcntl.F_GETFL)
fcntl.fcntl(self.pid.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
fl = fcntl.fcntl(self.stdout, fcntl.F_GETFL)
fcntl.fcntl(self.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
if not wait and not check:
self.io_watch = GLib.io_add_watch(self.pid.stdout, GLib.IO_IN |
GLib.IO_HUP | GLib.IO_ERR, self.io_callback)
self.io_watch = GLib.io_add_watch(self.stdout, GLib.IO_IN |
GLib.IO_HUP | GLib.IO_ERR, self.process_io)
print("Starting process {}".format(self.pid.args))
print("Starting process {}".format(self.args))
if not wait and not check:
return
if check:
self.wait(10)
self.killed = True
if self.returncode != 0:
raise subprocess.CalledProcessError(returncode=self.returncode,
cmd=args)
Namespace.non_block_wait(self.wait_for_process, 10, 1)
self.killed = True
self.ret = self.pid.returncode
def process_io(self, source, condition):
if condition & GLib.IO_HUP:
self.hup = True
self.process_io(self.pid.stdout)
self.write_fds = []
print("%s returned %d" % (args[0], self.ret))
if check and self.ret != 0:
raise subprocess.CalledProcessError(returncode=self.ret, cmd=self.args)
def wait_for_process(self, timeout):
try:
self.pid.wait(timeout)
return True
except:
return False
def process_io(self, source):
data = source.read()
if not data:
@ -267,6 +226,12 @@ class Process:
for f in self.write_fds:
f.write(data)
# Write out a separator so multiple process calls per
# test are easer to read.
if self.hup:
f.write("Terminated: {}\n\n".format(self.args))
f.flush()
if self.verbose:
@ -275,42 +240,73 @@ class Process:
return True
def io_callback(self, source, cb_condition):
return self.process_io(source)
def _append_outfile(self, file):
gid = int(self.ctx.args.log_gid)
uid = int(self.ctx.args.log_uid)
dir = os.path.dirname(file)
def __del__(self):
print("Del process %s" % self.args)
if not path_exists(dir):
os.mkdir(dir)
os.chown(dir, uid, gid)
if not self.killed:
self.kill()
file = os.path.join(dir,file)
# If the out file exists, append. Useful for processes like
# hostapd_cli where it is called multiple times independently.
if os.path.isfile(file):
mode = 'a'
else:
mode = 'w'
try:
f = open(os.path.join(dir, file), mode)
except Exception as e:
traceback.print_exc()
exit(0)
os.fchown(f.fileno(), uid, gid)
self.write_fds.append(f)
def wait_for_socket(self, socket, wait):
Namespace.non_block_wait(os.path.exists, wait, socket)
# Wait for both process termination and HUP signal
def __wait(self, timeout):
try:
super().wait(timeout)
if not self.hup:
return False
return True
except:
return False
# Override wait() so it can do so non-blocking
def wait(self, timeout=10):
Namespace.non_block_wait(self.__wait, timeout, 1)
# Override kill()
def kill(self, force=False):
print("Killing process %s" % self.args)
if self.killed:
return
print("Killing process {}".format(self.args))
if force:
self.pid.kill()
super().kill()
else:
self.pid.terminate()
self.terminate()
try:
self.pid.wait(timeout=15)
self.wait(timeout=15)
except:
dbg("Process %s did not complete in 15 seconds!" % self.name)
self.pid.kill()
if self.ctx and self in self.ctx.processes:
self.ctx.processes.remove(self)
super().kill()
if self.cleanup:
self.cleanup()
self.process_io(self.pid.stdout)
self.ctx = None
self.pid = None
self.write_fds = []
if self.io_watch:
@ -320,8 +316,8 @@ class Process:
self.cleanup = None
self.killed = True
def wait_for_socket(self, socket, wait):
Namespace.non_block_wait(os.path.exists, wait, socket)
if self in self.ctx.processes:
self.ctx.processes.remove(self)
def __str__(self):
return str(self.args) + '\n'
@ -333,10 +329,10 @@ class Interface:
self.config = config
def __del__(self):
Process(['iw', 'dev', self.name, 'del'], True)
Process(['iw', 'dev', self.name, 'del']).wait()
def set_interface_state(self, state):
Process(['ifconfig', self.name, state], True)
Process(['ifconfig', self.name, state]).wait()
class Radio:
def __init__(self, name):
@ -360,7 +356,7 @@ class Radio:
self.use = use
Process(['iw', 'phy', self.name, 'interface', 'add', ifname,
'type', 'managed'], True)
'type', 'managed']).wait()
return self.interface
@ -463,8 +459,8 @@ class Hostapd:
print("Initializing hostapd instances")
ctx.start_process(['ip', 'link', 'set', 'eth0', 'up'], wait=True)
ctx.start_process(['ip', 'link', 'set', 'eth1', 'up'], wait=True)
ctx.start_process(['ip', 'link', 'set', 'eth0', 'up']).wait()
ctx.start_process(['ip', 'link', 'set', 'eth1', 'up']).wait()
self.global_ctrl_iface = '/var/run/hostapd/ctrl'
@ -575,9 +571,9 @@ class Namespace:
self.radios = radios
self.args = args
Process(['ip', 'netns', 'add', name], wait=True)
Process(['ip', 'netns', 'add', name]).wait()
for r in radios:
Process(['iw', 'phy', r.name, 'set', 'netns', 'name', name], wait=True)
Process(['iw', 'phy', r.name, 'set', 'netns', 'name', name]).wait()
self.start_dbus()
@ -590,7 +586,6 @@ class Namespace:
self.radios = []
for p in list(self.processes):
print("Killing process %s" % p.name)
p.kill()
self.processes = []
@ -598,7 +593,7 @@ class Namespace:
def __del__(self):
print("Removing namespace %s" % self.name)
Process(['ip', 'netns', 'del', self.name], wait=True)
Process(['ip', 'netns', 'del', self.name]).wait()
def get_bus(self):
return self._bus
@ -617,7 +612,7 @@ class Namespace:
# In case this process needs DBus...
env['DBUS_SYSTEM_BUS_ADDRESS'] = self.dbus_address
p = Process(args, ctx=self, namespace=ns, env=env, **kwargs)
p = Process(args, namespace=ns, env=env, **kwargs)
if not kwargs.get('wait', False):
self.processes.append(p)
@ -629,7 +624,7 @@ class Namespace:
def is_process_running(self, process):
for p in self.processes:
if p.name == process:
if p.args[0] == process:
return True
return False
@ -654,7 +649,7 @@ class Namespace:
f.write('</busconfig>\n')
p = self.start_process(['dbus-daemon', '--config-file=%s' % self.dbus_cfg],
wait=False, cleanup=self._cleanup_dbus)
cleanup=self._cleanup_dbus)
p.wait_for_socket(self.dbus_address.split('=')[1], 5)
@ -865,7 +860,7 @@ class TestContext(Namespace):
def start_radios(self):
reg_domain = self.hw_config['SETUP'].get('reg_domain', None)
if reg_domain:
Process(['iw', 'reg', 'set', reg_domain], True)
Process(['iw', 'reg', 'set', reg_domain]).wait()
if self.args.hw:
self.discover_radios()
@ -942,7 +937,7 @@ class TestContext(Namespace):
print("Ofono or Phonesim not found, skipping test")
return
Process(['ifconfig', 'lo', 'up'], wait=True)
Process(['ifconfig', 'lo', 'up']).wait()
os.environ['OFONO_PHONESIM_CONFIG'] = '/tmp/phonesim.conf'
@ -1327,7 +1322,7 @@ def post_test(ctx, to_copy):
else:
os.remove('/tmp/' + f)
Process(['ifconfig', 'lo', 'down'], wait=True)
Process(['ifconfig', 'lo', 'down']).wait()
except Exception as e:
print("Exception thrown in post_test")
finally:
@ -1451,7 +1446,8 @@ def run_unit_tests(ctx, args):
units = build_unit_list(args)
for u in units:
if ctx.start_process([u], wait=True).ret != 0:
p = ctx.start_process([u]).wait()
if p.returncode != 0:
dbg("Unit test %s failed" % os.path.basename(u))
else:
dbg("Unit test %s passed" % os.path.basename(u))