#!/usr/bin/env python3 """`osbuild-dev` provides helper functionality for `osbuild` development mostly centered around manifest reading.""" # pylint: disable=unsupported-membership-test,unsupported-delete-operation # pylint: disable=unsubscriptable-object # pylint: disable=raise-missing-from # pylint: disable=subprocess-run-check import json import os import secrets import subprocess import sys import tempfile from typing import Any, Iterator, Optional try: import attrs import rich import typer from rich.tree import Tree except ImportError: print( "You are missing dependencies, please install `python3-attrs`, `python3-rich`, `python3-typer` or their `pip` equivalents." ) raise cli = typer.Typer() # Main command. man = typer.Typer() # Manifest subcommand. cli.add_typer(man, name="manifest") con = rich.console.Console() @cli.callback() def main() -> int: return 0 def detect_and_parse_inputs(inputs) -> Iterator[str]: """There are three valid formats for inputs to exist in, see: https://github.com/osbuild/osbuild/pull/1003. This function detects which one is in use and yields its hashes.""" if isinstance(inputs, dict): yield from inputs.keys() elif isinstance(inputs, list): if isinstance(inputs[0], str): yield from inputs elif isinstance(inputs[0], dict): for value in inputs: yield value["id"] else: con.print("[bold][red]Could not understand inputs format[/red][/bold]") sys.exit(1) else: con.print("[bold][red]Could not understand inputs format[/red][/bold]") sys.exit(1) def json_as_terminal_tree(tree: Optional[Tree], data: Any, name: str) -> Tree: """Convert JSON into a `rich` tree.""" if tree is None: tree = Tree("") if isinstance(data, (int, str)): subtree = tree.add(f"{name}: [bold]{data}[/bold]") elif isinstance(data, dict): subtree = tree.add(str(name)) for key, val in data.items(): json_as_terminal_tree(subtree, val, key) elif isinstance(data, list): name = f"{name} [italic]({len(data)})[/italic]" subtree = tree.add(name) for index, item in enumerate(data): json_as_terminal_tree(subtree, item, index) else: raise ValueError( f"json_as_terminal_tree does not know how to deal with {type(data)}" ) return subtree @attrs.define() class Manifest: name: str data: dict[str, Any] = attrs.Factory(dict) def ignore_a_stage(self, name: str) -> None: """Remove a stage from the data we represent.""" for pipeline in self.data["pipelines"]: to_pop = [] for index, stage in enumerate(pipeline["stages"]): if stage["type"] == name: to_pop.append(index) for index in to_pop: pipeline["stages"].pop(index) def ignore_sources(self) -> None: """Remove the `sources` section from the manifest.""" if "sources" in self.data: del self.data["sources"] def resolve_content_hashes(self) -> None: # If we're resolving content hashes back to names we adjust the data structure # in-place. sources = {} # We can't handle all source types but some we can if "org.osbuild.curl" in self.data["sources"]: for hasj, url in self.data["sources"]["org.osbuild.curl"][ "items" ].items(): sources[hasj] = url for pipeline in self.data["pipelines"]: for stage in pipeline["stages"]: if stage["type"] == "org.osbuild.rpm": stage["inputs"]["packages"]["references"] = { reference: sources[reference].split("/")[-1] for reference in detect_and_parse_inputs(stage["inputs"]["packages"]["references"]) } def print_for_terminal(self, path: Optional[str] = None) -> None: if path is None: con.print(json_as_terminal_tree(None, self.data, self.name)) else: with open(path, "w", encoding="utf8") as f: rich.print( json_as_terminal_tree(None, self.data, self.name), file=f ) def print_for_html(self) -> None: pass @classmethod def from_path(cls, path: str) -> "Manifest": try: with open(path, encoding="utf8") as f: data = json.load(f) except FileNotFoundError: con.print(f"[bold][red]Could not open file {path!r}[/red][/bold]") sys.exit(1) # We deal with this possibly being a 'wrapped' manifest, one produced # by `osbuild-composer`. if "manifest" in data: data = data["manifest"] if data.get("version") != "2": con.print(f"[bold][red]Could not parse file {path!r}, wrong manifest version.[/red][/bold]") sys.exit(1) return cls(os.path.basename(path), data) @man.command(name="print") def pretty_print( manifest_path: str, ignore_stage: list[str] = typer.Option([]), resolve_sources: bool = typer.Option( True, help="Resolve content hashes of sources to their names." ), skip_sources: bool = typer.Option( True, help="Skips display of the sources in the manifest." ), ) -> int: """Pretty print an `osbuild` manifest file.""" manifest = Manifest.from_path(manifest_path) for name in ignore_stage: manifest.ignore_a_stage(name) if resolve_sources: manifest.resolve_content_hashes() if skip_sources: manifest.ignore_sources() manifest.print_for_terminal() return 0 @man.command(name="diff") def pretty_diff( manifest_paths: list[str], simple: bool = typer.Option(False, help="Use `diff` instead of `vimdiff`"), ignore_stage: list[str] = typer.Option([]), resolve_sources: bool = typer.Option( True, help="Resolve content hashes of sources to their names." ), skip_sources: bool = typer.Option( True, help="Skips display of the sources in the manifest." ), ) -> int: """Pretty print a diff of `osbuild` manifest files.""" with tempfile.TemporaryDirectory() as temporary: paths = [] for manifest_path in manifest_paths: manifest = Manifest.from_path(manifest_path) for name in ignore_stage: manifest.ignore_a_stage(name) if resolve_sources: manifest.resolve_content_hashes() if skip_sources: manifest.ignore_sources() path = f"{temporary}/{os.path.basename(manifest_path)}-{secrets.token_hex(2)}" manifest.print_for_terminal(path) paths.append(path) subprocess.run((["diff", "-u"] if simple else ["vimdiff"]) + paths, check=True) return 0 if __name__ == "__main__": cli()