#!/usr/bin/python3 import argparse import json import os import osbuild import subprocess import sys RESET = "\033[0m" BOLD = "\033[1m" RED = "\033[31m" def run_interactive(pipeline_path, input_dir, output_dir): with open(pipeline_path) as f: pipeline = json.load(f) with osbuild.BuildRoot() as buildroot, osbuild.tmpfs() as tree: for i, stage in enumerate(pipeline["stages"], start=1): name = stage["name"] options = stage.get("options", {}) print() print(f"{RESET}{BOLD}{i}. {name}{RESET} " + json.dumps(options or {}, indent=2)) print("Inspect with:") print(f"\t# nsenter -a --wd=/root -t `machinectl show {buildroot.machine_name} -p Leader --value`") print() buildroot.run_stage(stage, tree, input_dir) assembler = pipeline.get("assembler") if assembler: name = assembler["name"] options = assembler.get("options", {}) print() print(f"{RESET}{BOLD}Assembling: {name}{RESET} " + json.dumps(options or {}, indent=2)) print("Inspect with:") print(f"\t# nsenter -a --wd=/root -t `machinectl show {buildroot.machine_name} -p Leader --value`") print() buildroot.run_assembler(assembler, tree, input_dir, output_dir) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Build operating system images") parser.add_argument("pipeline_path", metavar="PIPELINE", help="json file containing the pipeline that should be built") requiredNamed = parser.add_argument_group('required named arguments') requiredNamed.add_argument("-i", "--input", dest="input_dir", metavar="DIRECTORY", type=os.path.abspath, help="provide the contents of DIRECTORY to the first stage", required=True) requiredNamed.add_argument("-o", "--output", dest="output_dir", metavar="DIRECTORY", type=os.path.abspath, help="provide the empty DIRECTORY as output argument to the last stage", required=True) args = parser.parse_args() os.makedirs("/run/osbuild", exist_ok=True) try: run_interactive(args.pipeline_path, args.input_dir, args.output_dir) except KeyboardInterrupt: print() print(f"{RESET}{BOLD}{RED}Aborted{RESET}") sys.exit(130) except osbuild.StageFailed as error: print() print(f"{RESET}{BOLD}{RED}{error.stage} failed with code {error.returncode}{RESET}") sys.exit(1)