#!/usr/bin/python3
"""Manifest-Pre-Processor - Depsolving

This manifest-pre-processor consumes a manifest on stdin, processes it, and
produces the resulting manifest on stdout.

This tool adjusts the `org.osbuild.rpm` stage. It consumes the `mpp-depsolve`
option and produces a package-list and source-entries.

It supports version "1" and version "2" of the manifest description format.

The parameters for this pre-processor, version "1", look like this:

```
...
{
  "name": "org.osbuild.rpm",
  ...
  "options": {
    ...
    "mpp-depsolve": {
      "architecture": "x86_64",
      "module-platform-id": "f32",
      "baseurl": "http://mirrors.kernel.org/fedora/releases/32/Everything/x86_64/os",
      "repos": [
        {
          "id": "default",
          "metalink": "https://mirrors.fedoraproject.org/metalink?repo=fedora-32&arch=$basearch"
        }
      ],
      "packages": [
        "@core",
        "dracut-config-generic",
        "grub2-pc",
        "kernel"
      ],
      "excludes": [
        (optional excludes)
      ]
    }
  }
}
...
```

The parameters for this pre-processor, version "2", look like this:

```
...
{
  "name": "org.osbuild.rpm",
  ...
  "inputs": {
    "packages": {
      "mpp-depsolve": {
        see above
      }
    }
  }
}
...
```
"""

import argparse
import collections
import contextlib
import json
import os
import sys
import tempfile
import urllib.parse
from typing import Dict

import dnf
import hawkey


class State:
    dnf_cache = None        # DNF Cache Directory
    manifest = None         # Input/Working Manifest
    manifest_urls = None    # Link to sources URL dict
    manifest_todo = []      # Array of links to RPM stages


def _dnf_repo(conf, desc):
    repo = dnf.repo.Repo(desc["id"], conf)
    if "baseurl" in desc:
        repo.baseurl = desc["baseurl"]
    elif "metalink" in desc:
        repo.metalink = desc["metalink"]
    elif "mirrorlist" in desc:
        repo.mirrorlist = desc["mirrorlist"]
    else:
        raise ValueError("repo description does not contain baseurl, metalink, or mirrorlist keys")
    return repo


def _dnf_base(repos, module_platform_id, persistdir, cachedir, arch):
    base = dnf.Base()
    if cachedir:
        base.conf.cachedir = cachedir
    base.conf.config_file_path = "/dev/null"
    base.conf.module_platform_id = module_platform_id
    base.conf.persistdir = persistdir
    base.conf.substitutions['arch'] = arch
    base.conf.substitutions['basearch'] = dnf.rpm.basearch(arch)
    for repo in repos:
        base.repos.add(_dnf_repo(base.conf, repo))
    base.fill_sack(load_system_repo=False)
    return base


def _dnf_resolve(state, mpp_depsolve):
    deps = []
    arch = mpp_depsolve["architecture"]
    mpid = mpp_depsolve["module-platform-id"]
    repos = mpp_depsolve.get("repos", [])
    packages = mpp_depsolve.get("packages", [])
    excludes = mpp_depsolve.get("excludes", [])
    baseurl = mpp_depsolve.get("baseurl")

    baseurls = {
        repo["id"]: repo.get("baseurl", baseurl) for repo in repos
    }

    if len(packages) > 0:
        with tempfile.TemporaryDirectory() as persistdir:
            base = _dnf_base(repos, mpid, persistdir, state.dnf_cache, arch)
            base.install_specs(packages, exclude=excludes)
            base.resolve()

            for tsi in base.transaction:
                if tsi.action not in dnf.transaction.FORWARD_ACTIONS:
                    continue

                checksum_type = hawkey.chksum_name(tsi.pkg.chksum[0])
                checksum_hex = tsi.pkg.chksum[1].hex()

                path = tsi.pkg.relativepath
                repo_baseurl = baseurls[tsi.pkg.reponame]
                # dep["path"] often starts with a "/", even though it's meant to be
                # relative to `baseurl`. Strip any leading slashes, but ensure there's
                # exactly one between `baseurl` and the path.
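                # For example (hypothetical values): urllib.parse.urljoin(
                #     "https://example.com/repo/", "Packages/k/kernel-5.6.6-300.fc32.x86_64.rpm")
                # yields "https://example.com/repo/Packages/k/kernel-5.6.6-300.fc32.x86_64.rpm",
                # whereas joining with the leading "/" left in place would drop the "/repo"
                # component and resolve the path against the host root instead.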
                url = urllib.parse.urljoin(repo_baseurl + "/", path.lstrip("/"))

                pkg = {
                    "checksum": f"{checksum_type}:{checksum_hex}",
                    "name": tsi.pkg.name,
                    "url": url,
                }
                deps.append(pkg)

    return deps


def _manifest_enter(manifest, key, default):
    if key not in manifest:
        manifest[key] = default
    return manifest[key]


def _sort_urls(urls: Dict):
    def get_sort_key(item):
        key = item[1]
        if isinstance(key, dict):
            key = key["url"]
        return key

    if not urls:
        return urls

    urls_sorted = sorted(urls.items(), key=get_sort_key)
    urls.clear()
    urls.update(collections.OrderedDict(urls_sorted))


def _manifest_parse_v1(state, data):
    manifest = data

    # Resolve "sources"."org.osbuild.files"."url".
    manifest_sources = _manifest_enter(manifest, "sources", {})
    manifest_files = _manifest_enter(manifest_sources, "org.osbuild.files", {})
    manifest_urls = _manifest_enter(manifest_files, "urls", {})

    # Resolve "pipeline"."stages".
    manifest_pipeline = _manifest_enter(manifest, "pipeline", {})
    manifest_stages = _manifest_enter(manifest_pipeline, "stages", [])

    # Collect all stages of interest in `manifest_todo`.
    manifest_todo = []
    for stage in manifest_stages:
        if stage.get("name", "") != "org.osbuild.rpm":
            continue

        stage_options = _manifest_enter(stage, "options", {})
        if "mpp-depsolve" not in stage_options:
            continue

        manifest_todo.append(stage)

    # Remember links of interest.
    state.manifest = manifest
    state.manifest_urls = manifest_urls
    state.manifest_todo = manifest_todo


def _manifest_process_v1(state, stage):
    options = _manifest_enter(stage, "options", {})
    options_mpp = _manifest_enter(options, "mpp-depsolve", {})
    options_packages = _manifest_enter(options, "packages", [])

    del options["mpp-depsolve"]

    deps = _dnf_resolve(state, options_mpp)
    for dep in deps:
        options_packages.append(dep["checksum"])
        state.manifest_urls[dep["checksum"]] = dep["url"]


def _manifest_depsolve_v1(state, src):
    _manifest_parse_v1(state, src)
    for stage in state.manifest_todo:
        _manifest_process_v1(state, stage)


def _manifest_parse_v2(state, manifest):
    todo = []

    for pipeline in manifest.get("pipelines", []):
        for stage in pipeline.get("stages", []):
            if stage["type"] != "org.osbuild.rpm":
                continue

            inputs = _manifest_enter(stage, "inputs", {})
            packages = _manifest_enter(inputs, "packages", {})
            if "mpp-depsolve" not in packages:
                continue

            todo.append(packages)

    sources = _manifest_enter(manifest, "sources", {})
    files = _manifest_enter(sources, "org.osbuild.curl", {})
    urls = _manifest_enter(files, "items", {})

    state.manifest = manifest
    state.manifest_todo = todo
    state.manifest_urls = urls


def _manifest_process_v2(state, ip):
    urls = state.manifest_urls
    refs = _manifest_enter(ip, "references", {})
    mpp = ip["mpp-depsolve"]

    deps = _dnf_resolve(state, mpp)
    for dep in deps:
        checksum = dep["checksum"]
        refs[checksum] = {}
        urls[checksum] = dep["url"]

    del ip["mpp-depsolve"]


def _manifest_depsolve_v2(state, src):
    _manifest_parse_v2(state, src)
    for todo in state.manifest_todo:
        _manifest_process_v2(state, todo)


def _main_args(argv):
    parser = argparse.ArgumentParser(description="Generate Test Manifests")

    parser.add_argument(
        "--dnf-cache",
        metavar="PATH",
        type=os.path.abspath,
        default=None,
        help="Path to DNF cache-directory to use",
    )

    return parser.parse_args(argv[1:])


@contextlib.contextmanager
def _main_state(args):
    state = State()

    if args.dnf_cache:
        state.dnf_cache = args.dnf_cache

    yield state


def _main_process(state):
    src = json.load(sys.stdin)

    version = src.get("version", "1")
    if version == "1":
        _manifest_depsolve_v1(state, src)
    elif version == "2":
        _manifest_depsolve_v2(state, src)
    else:
        print(f"Unknown manifest version {version}", file=sys.stderr)
        return 1

    _sort_urls(state.manifest_urls)

    json.dump(state.manifest, sys.stdout, indent=2)
    sys.stdout.write("\n")
    return 0


def main() -> int:
    args = _main_args(sys.argv)

    with _main_state(args) as state:
        return _main_process(state)


if __name__ == "__main__":
    sys.exit(main())
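
# A minimal usage sketch (the script file name, manifest file names, and cache
# path below are hypothetical): the manifest is read from stdin and the
# processed result is written to stdout, e.g.
#
#   ./mpp-depsolve.py --dnf-cache /tmp/dnf-cache < manifest.mpp.json > manifest.json
#
# Every matching org.osbuild.rpm stage then lists the resolved packages as
# "<checksum-type>:<hex>" references, and the manifest's sources section maps
# each checksum to its download URL.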