api: introduce pipeline monitoring
Introduce the concept of pipeline monitoring: A new monitor class is passed to the pipeline.run() function. The main idea is to separate the monitoring from the code that builds pipeline. Through the build process various methods will be called on that object, representing the different steps and their targets during the build process. This can be used to fully stream the output of the various stages or just indicate the start and finish of the individual stages. This replaces the 'interactive' argument throughout the pipeline code. The old interactive behavior is replicated via the new `LogMonitor` class that logs the beginning of stages/assembler, but also streams all the output of them to stdout. The non-interactive behavior of not reporting anything is done by using the `NullMonitor` class, which in turn outputs nothing.
This commit is contained in:
parent
5d55bc9aca
commit
3e18d8118c
4 changed files with 159 additions and 33 deletions
|
|
@ -2,19 +2,18 @@ import asyncio
|
|||
import io
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
from .util import jsoncomm
|
||||
|
||||
|
||||
class API:
|
||||
def __init__(self, socket_address, args, interactive):
|
||||
def __init__(self, socket_address, args, monitor):
|
||||
self.socket_address = socket_address
|
||||
self.input = args
|
||||
self.interactive = interactive
|
||||
self._output_data = io.StringIO()
|
||||
self._output_pipe = None
|
||||
self.monitor = monitor
|
||||
self.event_loop = asyncio.new_event_loop()
|
||||
self.thread = threading.Thread(target=self._run_event_loop)
|
||||
self.barrier = threading.Barrier(2)
|
||||
|
|
@ -41,9 +40,7 @@ class API:
|
|||
raw = os.read(self._output_pipe, 4096)
|
||||
data = raw.decode("utf-8")
|
||||
self._output_data.write(data)
|
||||
|
||||
if self.interactive:
|
||||
sys.stdout.write(data)
|
||||
self.monitor.log(data)
|
||||
|
||||
def _setup_stdio(self, server, addr):
|
||||
with self._prepare_input() as stdin, \
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import sys
|
|||
|
||||
import osbuild
|
||||
import osbuild.meta
|
||||
import osbuild.monitor
|
||||
|
||||
|
||||
RESET = "\033[0m"
|
||||
|
|
@ -145,10 +146,13 @@ def osbuild_cli():
|
|||
print("No output directory or checkpoints specified, exited without building.")
|
||||
return 0
|
||||
|
||||
monitor_name = "NullMonitor" if args.json else "LogMonitor"
|
||||
monitor = osbuild.monitor.make(monitor_name, sys.stdout.fileno())
|
||||
|
||||
try:
|
||||
r = pipeline.run(
|
||||
args.store,
|
||||
interactive=not args.json,
|
||||
monitor,
|
||||
libdir=args.libdir,
|
||||
output_directory=args.output_directory
|
||||
)
|
||||
|
|
|
|||
126
osbuild/monitor.py
Normal file
126
osbuild/monitor.py
Normal file
|
|
@ -0,0 +1,126 @@
|
|||
"""
|
||||
Monitor pipeline activity
|
||||
|
||||
The osbuild `Pipeline` class supports monitoring of its activities
|
||||
by providing a monitor object that implements the `BaseMonitor`
|
||||
interface. During the execution of the pipeline various functions
|
||||
are called on the monitor object at certain events. Consult the
|
||||
`BaseMonitor` class for the description of all available events.
|
||||
"""
|
||||
|
||||
import abc
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
from typing import Dict
|
||||
|
||||
import osbuild
|
||||
|
||||
|
||||
RESET = "\033[0m"
|
||||
BOLD = "\033[1m"
|
||||
|
||||
|
||||
class TextWriter:
|
||||
"""Helper class for writing text to file descriptors"""
|
||||
def __init__(self, fd: int):
|
||||
self.fd = fd
|
||||
self.isatty = os.isatty(fd)
|
||||
|
||||
def term(self, text, *, clear=False):
|
||||
"""Write text if attached to a terminal."""
|
||||
if not self.isatty:
|
||||
return
|
||||
|
||||
if clear:
|
||||
self.write(RESET)
|
||||
|
||||
self.write(text)
|
||||
|
||||
def write(self, text: str):
|
||||
"""Write all of text to the log file descriptor"""
|
||||
data = text.encode("utf-8")
|
||||
n = len(data)
|
||||
while n:
|
||||
k = os.write(self.fd, data)
|
||||
n -= k
|
||||
if n:
|
||||
data = data[n:]
|
||||
|
||||
|
||||
class BaseMonitor(abc.ABC):
|
||||
"""Base class for all pipeline monitors"""
|
||||
|
||||
def __init__(self, fd: int):
|
||||
"""Logging will be done to file descriptor `fd`"""
|
||||
self.out = TextWriter(fd)
|
||||
|
||||
def begin(self, pipeline: osbuild.Pipeline):
|
||||
"""Called once at the beginning of a build"""
|
||||
|
||||
def finish(self, result: Dict):
|
||||
"""Called at the very end of the build"""
|
||||
|
||||
def stage(self, stage: osbuild.Stage):
|
||||
"""Called when a stage is being built"""
|
||||
|
||||
def assembler(self, assembler: osbuild.Assembler):
|
||||
"""Called when an assembler is being built"""
|
||||
|
||||
def result(self, result: osbuild.pipeline.BuildResult):
|
||||
"""Called when a module is done with its result"""
|
||||
|
||||
def log(self, message: str):
|
||||
"""Called for all module log outputs"""
|
||||
|
||||
|
||||
class NullMonitor(BaseMonitor):
|
||||
"""Monitor class that does not report anything"""
|
||||
|
||||
|
||||
class LogMonitor(BaseMonitor):
|
||||
"""Monitor that follows show the log output of modules
|
||||
|
||||
This monitor will print a header with `name: id` followed
|
||||
by the options for each module as it is being built. The
|
||||
full log messages of the modules will be print as soon as
|
||||
they become available.
|
||||
The constructor argument `fd` is a file descriptor, where
|
||||
the log will get written to. If `fd` is a `TTY`, escape
|
||||
sequences will be used to highlight sections of the log.
|
||||
"""
|
||||
def stage(self, stage):
|
||||
self.module(stage)
|
||||
|
||||
def assembler(self, assembler):
|
||||
self.out.term(BOLD, clear=True)
|
||||
self.out.write("Assembler ")
|
||||
self.out.term(RESET)
|
||||
|
||||
self.module(assembler)
|
||||
|
||||
def module(self, module):
|
||||
options = module.options or {}
|
||||
title = f"{module.name}: {module.id}"
|
||||
|
||||
self.out.term(BOLD, clear=True)
|
||||
self.out.write(title)
|
||||
self.out.term(RESET)
|
||||
self.out.write(" ")
|
||||
|
||||
json.dump(options, self.out, indent=2)
|
||||
self.out.write("\n")
|
||||
|
||||
def log(self, message):
|
||||
self.out.write(message)
|
||||
|
||||
|
||||
def make(name, fd):
|
||||
module = sys.modules[__name__]
|
||||
monitor = getattr(module, name, None)
|
||||
if not monitor:
|
||||
raise ValueError(f"Unknown monitor: {name}")
|
||||
if not issubclass(monitor, BaseMonitor):
|
||||
raise ValueError(f"Invalid monitor: {name}")
|
||||
return monitor(fd)
|
||||
|
|
@ -13,10 +13,6 @@ from . import sources
|
|||
from .util import osrelease
|
||||
|
||||
|
||||
RESET = "\033[0m"
|
||||
BOLD = "\033[1m"
|
||||
|
||||
|
||||
def cleanup(*objs):
|
||||
"""Call cleanup method for all objects, filters None values out"""
|
||||
_ = map(lambda o: o.cleanup(), filter(None, objs))
|
||||
|
|
@ -34,12 +30,6 @@ class BuildResult:
|
|||
return vars(self)
|
||||
|
||||
|
||||
def print_header(title, options):
|
||||
print()
|
||||
print(f"{RESET}{BOLD}{title}{RESET} " + json.dumps(options or {}, indent=2))
|
||||
print()
|
||||
|
||||
|
||||
class Stage:
|
||||
def __init__(self, name, source_options, build, base, options):
|
||||
self.name = name
|
||||
|
|
@ -71,13 +61,11 @@ class Stage:
|
|||
runner,
|
||||
build_tree,
|
||||
cache,
|
||||
interactive=False,
|
||||
monitor,
|
||||
libdir=None,
|
||||
var="/var/tmp"):
|
||||
with buildroot.BuildRoot(build_tree, runner, libdir=libdir, var=var) as build_root, \
|
||||
tempfile.TemporaryDirectory(prefix="osbuild-sources-output-", dir=var) as sources_output:
|
||||
if interactive:
|
||||
print_header(f"{self.name}: {self.id}", self.options)
|
||||
|
||||
args = {
|
||||
"tree": "/run/osbuild/tree",
|
||||
|
|
@ -90,7 +78,7 @@ class Stage:
|
|||
|
||||
ro_binds = [f"{sources_output}:/run/osbuild/sources"]
|
||||
|
||||
with API(f"{build_root.api}/osbuild", args, interactive) as api, \
|
||||
with API(f"{build_root.api}/osbuild", args, monitor) as api, \
|
||||
sources.SourcesServer(f"{build_root.api}/sources",
|
||||
libdir or "/usr/lib/osbuild",
|
||||
self.sources,
|
||||
|
|
@ -129,10 +117,8 @@ class Assembler:
|
|||
description["id"] = self.id
|
||||
return description
|
||||
|
||||
def run(self, tree, runner, build_tree, output_dir=None, interactive=False, libdir=None, var="/var/tmp"):
|
||||
def run(self, tree, runner, build_tree, monitor, output_dir=None, libdir=None, var="/var/tmp"):
|
||||
with buildroot.BuildRoot(build_tree, runner, libdir=libdir, var=var) as build_root:
|
||||
if interactive:
|
||||
print_header(f"Assembler {self.name}: {self.id}", self.options)
|
||||
|
||||
args = {
|
||||
"tree": "/run/osbuild/tree",
|
||||
|
|
@ -151,7 +137,7 @@ class Assembler:
|
|||
ro_binds = [f"{tree}:/run/osbuild/tree"]
|
||||
|
||||
with remoteloop.LoopServer(f"{build_root.api}/remoteloop"), \
|
||||
API(f"{build_root.api}/osbuild", args, interactive) as api:
|
||||
API(f"{build_root.api}/osbuild", args, monitor) as api:
|
||||
r = build_root.run(
|
||||
[f"/run/osbuild/lib/assemblers/{self.name}"],
|
||||
binds=binds,
|
||||
|
|
@ -202,7 +188,7 @@ class Pipeline:
|
|||
|
||||
return description
|
||||
|
||||
def build_stages(self, object_store, interactive, libdir):
|
||||
def build_stages(self, object_store, monitor, libdir):
|
||||
results = {"success": True}
|
||||
|
||||
# We need a build tree for the stages below, which is either
|
||||
|
|
@ -217,7 +203,7 @@ class Pipeline:
|
|||
build = self.build
|
||||
|
||||
r, t, tree = build.build_stages(object_store,
|
||||
interactive,
|
||||
monitor,
|
||||
libdir)
|
||||
|
||||
results["build"] = r
|
||||
|
|
@ -266,14 +252,19 @@ class Pipeline:
|
|||
|
||||
for stage in self.stages[base_idx + 1:]:
|
||||
with build_tree.read() as build_path, tree.write() as path:
|
||||
|
||||
monitor.stage(stage)
|
||||
|
||||
r = stage.run(path,
|
||||
self.runner,
|
||||
build_path,
|
||||
object_store.store,
|
||||
interactive=interactive,
|
||||
monitor,
|
||||
libdir=libdir,
|
||||
var=object_store.store)
|
||||
|
||||
monitor.result(r)
|
||||
|
||||
results["stages"].append(r.as_dict())
|
||||
if not r.success:
|
||||
cleanup(build_tree, tree)
|
||||
|
|
@ -285,7 +276,7 @@ class Pipeline:
|
|||
|
||||
return results, build_tree, tree
|
||||
|
||||
def assemble(self, object_store, build_tree, tree, interactive, libdir, output_directory: Optional[str]):
|
||||
def assemble(self, object_store, build_tree, tree, monitor, libdir, output_directory: Optional[str]):
|
||||
results = {"success": True}
|
||||
|
||||
if not self.assembler:
|
||||
|
|
@ -297,14 +288,18 @@ class Pipeline:
|
|||
tree.read() as input_dir, \
|
||||
output.write() as output_dir:
|
||||
|
||||
monitor.assembler(self.assembler)
|
||||
|
||||
r = self.assembler.run(input_dir,
|
||||
self.runner,
|
||||
build_dir,
|
||||
monitor,
|
||||
output_dir=output_dir,
|
||||
interactive=interactive,
|
||||
libdir=libdir,
|
||||
var=object_store.store)
|
||||
|
||||
monitor.result(r)
|
||||
|
||||
results["assembler"] = r.as_dict()
|
||||
if not r.success:
|
||||
output.cleanup()
|
||||
|
|
@ -319,10 +314,12 @@ class Pipeline:
|
|||
|
||||
return results
|
||||
|
||||
def run(self, store, interactive=False, libdir=None, output_directory=None):
|
||||
def run(self, store, monitor, libdir=None, output_directory=None):
|
||||
os.makedirs("/run/osbuild", exist_ok=True)
|
||||
results = {}
|
||||
|
||||
monitor.begin(self)
|
||||
|
||||
with objectstore.ObjectStore(store) as object_store:
|
||||
# If the final result is already in the store, no need to attempt
|
||||
# building it. Just fetch the cached information. If the associated
|
||||
|
|
@ -336,7 +333,7 @@ class Pipeline:
|
|||
output.export(output_directory)
|
||||
else:
|
||||
results, build_tree, tree = self.build_stages(object_store,
|
||||
interactive,
|
||||
monitor,
|
||||
libdir)
|
||||
|
||||
if not results["success"]:
|
||||
|
|
@ -345,12 +342,14 @@ class Pipeline:
|
|||
r = self.assemble(object_store,
|
||||
build_tree,
|
||||
tree,
|
||||
interactive,
|
||||
monitor,
|
||||
libdir,
|
||||
output_directory)
|
||||
|
||||
results.update(r) # This will also update 'success'
|
||||
|
||||
monitor.finish(results)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue