test-runner: refactor process output code

The Process class requires the ability to write out any processes
output to stdout, logging, or an explicit file, as well as store
it inside python for processing by test utilities. To accomplish
this each process was given a temporary file to write to, and that
file had an IO watch set on it. Any data that was written was then
read, and re-written out to where it needed to go. This ended up
being very buggy and quite complex due to needing to mess with
read/write pointers inside the file.

Popen already creates pipes to stdout if told, and they are accessable
via the p.stdout. Its then as simple as setting an IO watch on that
pipe and keeping the same code for reading out new data and writing
it to any files we want. This greatly reduces the complexity.
This commit is contained in:
James Prestwood 2021-07-29 15:21:54 -07:00 committed by Denis Kenzior
parent 04d00c5c20
commit e2e625fa18
1 changed files with 27 additions and 50 deletions

View File

@ -175,24 +175,12 @@ class Process:
self.io_watch = None
self.cleanup = cleanup
self.verbose = False
if not namespace:
self.output_name = '/tmp/%s-out' % self.name
else:
self.output_name = '/tmp/%s-%s-out' % (self.name, namespace)
self.out = ''
if namespace:
self.args = ['ip', 'netns', 'exec', namespace]
self.args.extend(args)
#
# For simplicity all processes will write to a temp file
# (/tmp/<name>-out). If any verbose options are required this file
# will get an IO watch and write out any bytes to the needed FDs.
#
self.stdout = open(self.output_name, 'a+')
self.io_position = self.stdout.tell()
if ctx:
# Verbose requested, add stdout/stderr to write FD list.
# This will end up always returning true if logging is
@ -228,18 +216,18 @@ class Process:
os.fchown(f.fileno(), int(ctx.args.log_uid), int(ctx.args.log_gid))
self.write_fds.append(f)
#
# Only add an IO watch for long running processes. If
# the process is being waited for, the log/outfile bits
# will be handled after the process exists.
#
if self.write_fds != [] and not wait and not check or self.verbose:
self.io_watch = GLib.io_add_watch(self.stdout, GLib.IO_IN,
self.io_callback)
self.pid = subprocess.Popen(self.args, stdout=self.stdout, stderr=self.stdout,
self.pid = subprocess.Popen(self.args, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env, cwd=os.getcwd())
# Set as non-blocking so read() in the IO callback doesn't block forever
fl = fcntl.fcntl(self.pid.stdout, fcntl.F_GETFL)
fcntl.fcntl(self.pid.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
if not wait and not check:
self.io_watch = GLib.io_add_watch(self.pid.stdout, GLib.IO_IN |
GLib.IO_HUP | GLib.IO_ERR, self.io_callback)
print("Starting process {}".format(self.pid.args))
if not wait and not check:
@ -249,12 +237,7 @@ class Process:
self.killed = True
self.ret = self.pid.returncode
self.stdout.seek(self.io_position)
self.out = self.stdout.read()
self.stdout.seek(0, 2)
if len(self.write_fds) > 0:
self.process_io(self.stdout)
self.process_io(self.pid.stdout)
self.write_fds = []
@ -263,21 +246,16 @@ class Process:
raise subprocess.CalledProcessError(returncode=self.ret, cmd=self.args)
def process_io(self, source):
#
# The file will have already been written to, meaning the seek
# position points to EOF. This is why the position is saved so
# we can seek to where we were last time, read data, and seek
# back to EOF.
#
source.seek(self.io_position)
data = source.read()
self.io_position += len(data)
source.seek(self.io_position)
if len(data) == 0:
if not data:
return True
data = data.decode('utf-8')
# Save data away in case the caller needs it (e.g. list_sta)
self.out += data
for f in self.write_fds:
f.write(data)
f.flush()
@ -294,10 +272,6 @@ class Process:
def __del__(self):
print("Del process %s" % self.args)
os.remove(self.output_name)
self.stdout.close()
if not self.killed:
self.kill()
@ -312,19 +286,22 @@ class Process:
else:
self.pid.terminate()
self.pid.wait(timeout=15)
self.pid = None
try:
self.pid.wait(timeout=15)
except:
dbg("Process %s did not complete in 15 seconds!" % self.name)
self.pid.kill()
if self.ctx and self in self.ctx.processes:
self.ctx.processes.remove(self)
self.ctx = None
self.process_io(self.stdout)
if self.cleanup:
self.cleanup()
self.process_io(self.pid.stdout)
self.ctx = None
self.pid = None
self.write_fds = []
if self.io_watch: