diff --git a/osbuild/api.py b/osbuild/api.py index 4674aee9..4d77d2b2 100644 --- a/osbuild/api.py +++ b/osbuild/api.py @@ -2,19 +2,18 @@ import asyncio import io import json import os -import sys import tempfile import threading from .util import jsoncomm class API: - def __init__(self, socket_address, args, interactive): + def __init__(self, socket_address, args, monitor): self.socket_address = socket_address self.input = args - self.interactive = interactive self._output_data = io.StringIO() self._output_pipe = None + self.monitor = monitor self.event_loop = asyncio.new_event_loop() self.thread = threading.Thread(target=self._run_event_loop) self.barrier = threading.Barrier(2) @@ -41,9 +40,7 @@ class API: raw = os.read(self._output_pipe, 4096) data = raw.decode("utf-8") self._output_data.write(data) - - if self.interactive: - sys.stdout.write(data) + self.monitor.log(data) def _setup_stdio(self, server, addr): with self._prepare_input() as stdin, \ diff --git a/osbuild/main_cli.py b/osbuild/main_cli.py index e7104409..5a34ec57 100644 --- a/osbuild/main_cli.py +++ b/osbuild/main_cli.py @@ -13,6 +13,7 @@ import sys import osbuild import osbuild.meta +import osbuild.monitor RESET = "\033[0m" @@ -145,10 +146,13 @@ def osbuild_cli(): print("No output directory or checkpoints specified, exited without building.") return 0 + monitor_name = "NullMonitor" if args.json else "LogMonitor" + monitor = osbuild.monitor.make(monitor_name, sys.stdout.fileno()) + try: r = pipeline.run( args.store, - interactive=not args.json, + monitor, libdir=args.libdir, output_directory=args.output_directory ) diff --git a/osbuild/monitor.py b/osbuild/monitor.py new file mode 100644 index 00000000..4d3c9d06 --- /dev/null +++ b/osbuild/monitor.py @@ -0,0 +1,126 @@ +""" +Monitor pipeline activity + +The osbuild `Pipeline` class supports monitoring of its activities +by providing a monitor object that implements the `BaseMonitor` +interface. During the execution of the pipeline various functions +are called on the monitor object at certain events. Consult the +`BaseMonitor` class for the description of all available events. +""" + +import abc +import json +import os +import sys + +from typing import Dict + +import osbuild + + +RESET = "\033[0m" +BOLD = "\033[1m" + + +class TextWriter: + """Helper class for writing text to file descriptors""" + def __init__(self, fd: int): + self.fd = fd + self.isatty = os.isatty(fd) + + def term(self, text, *, clear=False): + """Write text if attached to a terminal.""" + if not self.isatty: + return + + if clear: + self.write(RESET) + + self.write(text) + + def write(self, text: str): + """Write all of text to the log file descriptor""" + data = text.encode("utf-8") + n = len(data) + while n: + k = os.write(self.fd, data) + n -= k + if n: + data = data[n:] + + +class BaseMonitor(abc.ABC): + """Base class for all pipeline monitors""" + + def __init__(self, fd: int): + """Logging will be done to file descriptor `fd`""" + self.out = TextWriter(fd) + + def begin(self, pipeline: osbuild.Pipeline): + """Called once at the beginning of a build""" + + def finish(self, result: Dict): + """Called at the very end of the build""" + + def stage(self, stage: osbuild.Stage): + """Called when a stage is being built""" + + def assembler(self, assembler: osbuild.Assembler): + """Called when an assembler is being built""" + + def result(self, result: osbuild.pipeline.BuildResult): + """Called when a module is done with its result""" + + def log(self, message: str): + """Called for all module log outputs""" + + +class NullMonitor(BaseMonitor): + """Monitor class that does not report anything""" + + +class LogMonitor(BaseMonitor): + """Monitor that follows show the log output of modules + + This monitor will print a header with `name: id` followed + by the options for each module as it is being built. The + full log messages of the modules will be print as soon as + they become available. + The constructor argument `fd` is a file descriptor, where + the log will get written to. If `fd` is a `TTY`, escape + sequences will be used to highlight sections of the log. + """ + def stage(self, stage): + self.module(stage) + + def assembler(self, assembler): + self.out.term(BOLD, clear=True) + self.out.write("Assembler ") + self.out.term(RESET) + + self.module(assembler) + + def module(self, module): + options = module.options or {} + title = f"{module.name}: {module.id}" + + self.out.term(BOLD, clear=True) + self.out.write(title) + self.out.term(RESET) + self.out.write(" ") + + json.dump(options, self.out, indent=2) + self.out.write("\n") + + def log(self, message): + self.out.write(message) + + +def make(name, fd): + module = sys.modules[__name__] + monitor = getattr(module, name, None) + if not monitor: + raise ValueError(f"Unknown monitor: {name}") + if not issubclass(monitor, BaseMonitor): + raise ValueError(f"Invalid monitor: {name}") + return monitor(fd) diff --git a/osbuild/pipeline.py b/osbuild/pipeline.py index 78126786..d80e561c 100644 --- a/osbuild/pipeline.py +++ b/osbuild/pipeline.py @@ -13,10 +13,6 @@ from . import sources from .util import osrelease -RESET = "\033[0m" -BOLD = "\033[1m" - - def cleanup(*objs): """Call cleanup method for all objects, filters None values out""" _ = map(lambda o: o.cleanup(), filter(None, objs)) @@ -34,12 +30,6 @@ class BuildResult: return vars(self) -def print_header(title, options): - print() - print(f"{RESET}{BOLD}{title}{RESET} " + json.dumps(options or {}, indent=2)) - print() - - class Stage: def __init__(self, name, source_options, build, base, options): self.name = name @@ -71,13 +61,11 @@ class Stage: runner, build_tree, cache, - interactive=False, + monitor, libdir=None, var="/var/tmp"): with buildroot.BuildRoot(build_tree, runner, libdir=libdir, var=var) as build_root, \ tempfile.TemporaryDirectory(prefix="osbuild-sources-output-", dir=var) as sources_output: - if interactive: - print_header(f"{self.name}: {self.id}", self.options) args = { "tree": "/run/osbuild/tree", @@ -90,7 +78,7 @@ class Stage: ro_binds = [f"{sources_output}:/run/osbuild/sources"] - with API(f"{build_root.api}/osbuild", args, interactive) as api, \ + with API(f"{build_root.api}/osbuild", args, monitor) as api, \ sources.SourcesServer(f"{build_root.api}/sources", libdir or "/usr/lib/osbuild", self.sources, @@ -129,10 +117,8 @@ class Assembler: description["id"] = self.id return description - def run(self, tree, runner, build_tree, output_dir=None, interactive=False, libdir=None, var="/var/tmp"): + def run(self, tree, runner, build_tree, monitor, output_dir=None, libdir=None, var="/var/tmp"): with buildroot.BuildRoot(build_tree, runner, libdir=libdir, var=var) as build_root: - if interactive: - print_header(f"Assembler {self.name}: {self.id}", self.options) args = { "tree": "/run/osbuild/tree", @@ -151,7 +137,7 @@ class Assembler: ro_binds = [f"{tree}:/run/osbuild/tree"] with remoteloop.LoopServer(f"{build_root.api}/remoteloop"), \ - API(f"{build_root.api}/osbuild", args, interactive) as api: + API(f"{build_root.api}/osbuild", args, monitor) as api: r = build_root.run( [f"/run/osbuild/lib/assemblers/{self.name}"], binds=binds, @@ -202,7 +188,7 @@ class Pipeline: return description - def build_stages(self, object_store, interactive, libdir): + def build_stages(self, object_store, monitor, libdir): results = {"success": True} # We need a build tree for the stages below, which is either @@ -217,7 +203,7 @@ class Pipeline: build = self.build r, t, tree = build.build_stages(object_store, - interactive, + monitor, libdir) results["build"] = r @@ -266,14 +252,19 @@ class Pipeline: for stage in self.stages[base_idx + 1:]: with build_tree.read() as build_path, tree.write() as path: + + monitor.stage(stage) + r = stage.run(path, self.runner, build_path, object_store.store, - interactive=interactive, + monitor, libdir=libdir, var=object_store.store) + monitor.result(r) + results["stages"].append(r.as_dict()) if not r.success: cleanup(build_tree, tree) @@ -285,7 +276,7 @@ class Pipeline: return results, build_tree, tree - def assemble(self, object_store, build_tree, tree, interactive, libdir, output_directory: Optional[str]): + def assemble(self, object_store, build_tree, tree, monitor, libdir, output_directory: Optional[str]): results = {"success": True} if not self.assembler: @@ -297,14 +288,18 @@ class Pipeline: tree.read() as input_dir, \ output.write() as output_dir: + monitor.assembler(self.assembler) + r = self.assembler.run(input_dir, self.runner, build_dir, + monitor, output_dir=output_dir, - interactive=interactive, libdir=libdir, var=object_store.store) + monitor.result(r) + results["assembler"] = r.as_dict() if not r.success: output.cleanup() @@ -319,10 +314,12 @@ class Pipeline: return results - def run(self, store, interactive=False, libdir=None, output_directory=None): + def run(self, store, monitor, libdir=None, output_directory=None): os.makedirs("/run/osbuild", exist_ok=True) results = {} + monitor.begin(self) + with objectstore.ObjectStore(store) as object_store: # If the final result is already in the store, no need to attempt # building it. Just fetch the cached information. If the associated @@ -336,7 +333,7 @@ class Pipeline: output.export(output_directory) else: results, build_tree, tree = self.build_stages(object_store, - interactive, + monitor, libdir) if not results["success"]: @@ -345,12 +342,14 @@ class Pipeline: r = self.assemble(object_store, build_tree, tree, - interactive, + monitor, libdir, output_directory) results.update(r) # This will also update 'success' + monitor.finish(results) + return results