Rename the parsing and process functions to have a format version specific suffix. This should make it easy to add support for the format version 2. Logic should be unchanged.
233 lines
6.4 KiB
Python
Executable file
233 lines
6.4 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
"""Manifest-Pre-Processor - Depsolving
|
|
|
|
This manifest-pre-processor consumes a manifest on stdin, processes it, and
|
|
produces the resulting manifest on stdout.
|
|
|
|
This tool adjusts the `org.osbuild.rpm` stage. It consumes the `mpp-depsolve`
|
|
option and produces a package-list and source-entries.
|
|
|
|
The parameters for this pre-processor look like this:
|
|
|
|
```
|
|
...
|
|
{
|
|
"name": "org.osbuild.rpm",
|
|
...
|
|
"options": {
|
|
...
|
|
"mpp-depsolve": {
|
|
"architecture": "x86_64",
|
|
"module-platform-id": "f32",
|
|
"baseurl": "http://mirrors.kernel.org/fedora/releases/32/Everything/x86_64/os",
|
|
"repos": [
|
|
{
|
|
"id": "default",
|
|
"metalink": "https://mirrors.fedoraproject.org/metalink?repo=fedora-32&arch=$basearch"
|
|
}
|
|
],
|
|
"packages": [
|
|
"@core",
|
|
"dracut-config-generic",
|
|
"grub2-pc",
|
|
"kernel"
|
|
],
|
|
"excludes": [
|
|
(optional excludes)
|
|
]
|
|
}
|
|
}
|
|
}
|
|
...
|
|
```
|
|
"""
|
|
|
|
import argparse
|
|
import contextlib
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import urllib.parse
|
|
|
|
import dnf
|
|
import hawkey
|
|
|
|
|
|
class State:
|
|
dnf_cache = None # DNF Cache Directory
|
|
|
|
manifest = None # Input/Working Manifest
|
|
manifest_urls = None # Link to sources URL dict
|
|
manifest_todo = [] # Array of links to RPM stages
|
|
|
|
|
|
def _dnf_repo(conf, desc):
|
|
repo = dnf.repo.Repo(desc["id"], conf)
|
|
if "baseurl" in desc:
|
|
repo.baseurl = desc["baseurl"]
|
|
elif "metalink" in desc:
|
|
repo.metalink = desc["metalink"]
|
|
elif "mirrorlist" in desc:
|
|
repo.metalink = desc["mirrorlist"]
|
|
else:
|
|
raise ValueError("repo description does not contain baseurl, metalink, or mirrorlist keys")
|
|
return repo
|
|
|
|
|
|
def _dnf_base(repos, module_platform_id, persistdir, cachedir, arch):
|
|
base = dnf.Base()
|
|
if cachedir:
|
|
base.conf.cachedir = cachedir
|
|
base.conf.config_file_path = "/dev/null"
|
|
base.conf.module_platform_id = module_platform_id
|
|
base.conf.persistdir = persistdir
|
|
base.conf.substitutions['arch'] = arch
|
|
base.conf.substitutions['basearch'] = dnf.rpm.basearch(arch)
|
|
|
|
for repo in repos:
|
|
base.repos.add(_dnf_repo(base.conf, repo))
|
|
|
|
base.fill_sack(load_system_repo=False)
|
|
return base
|
|
|
|
|
|
def _dnf_resolve(state, mpp_depsolve):
|
|
deps = []
|
|
|
|
arch = mpp_depsolve["architecture"]
|
|
mpid = mpp_depsolve["module-platform-id"]
|
|
repos = mpp_depsolve.get("repos", [])
|
|
packages = mpp_depsolve.get("packages", [])
|
|
excludes = mpp_depsolve.get("excludes", [])
|
|
|
|
if len(packages) > 0:
|
|
with tempfile.TemporaryDirectory() as persistdir:
|
|
base = _dnf_base(repos, mpid, persistdir, state.dnf_cache, arch)
|
|
base.install_specs(packages, exclude=excludes)
|
|
base.resolve()
|
|
|
|
for tsi in base.transaction:
|
|
if tsi.action not in dnf.transaction.FORWARD_ACTIONS:
|
|
continue
|
|
|
|
checksum_type = hawkey.chksum_name(tsi.pkg.chksum[0])
|
|
checksum_hex = tsi.pkg.chksum[1].hex()
|
|
pkg = {
|
|
"checksum": f"{checksum_type}:{checksum_hex}",
|
|
"name": tsi.pkg.name,
|
|
"path": tsi.pkg.relativepath,
|
|
}
|
|
deps.append(pkg)
|
|
|
|
return deps
|
|
|
|
|
|
def _manifest_enter(manifest, key, default):
|
|
if key not in manifest:
|
|
manifest[key] = default
|
|
return manifest[key]
|
|
|
|
|
|
def _manifest_parse_v1(state, data):
|
|
manifest = data
|
|
|
|
# Resolve "sources"."org.osbuild.files"."url".
|
|
manifest_sources = _manifest_enter(manifest, "sources", {})
|
|
manifest_files = _manifest_enter(manifest_sources, "org.osbuild.files", {})
|
|
manifest_urls = _manifest_enter(manifest_files, "urls", {})
|
|
|
|
# Resolve "pipeline"."stages".
|
|
manifest_pipeline = _manifest_enter(manifest, "pipeline", {})
|
|
manifest_stages = _manifest_enter(manifest_pipeline, "stages", [])
|
|
|
|
# Collect all stages of interest in `manifest_todo`.
|
|
manifest_todo = []
|
|
for stage in manifest_stages:
|
|
if stage.get("name", "") != "org.osbuild.rpm":
|
|
continue
|
|
|
|
stage_options = _manifest_enter(stage, "options", {})
|
|
if "mpp-depsolve" not in stage_options:
|
|
continue
|
|
|
|
manifest_todo.append(stage)
|
|
|
|
# Remember links of interest.
|
|
state.manifest = manifest
|
|
state.manifest_urls = manifest_urls
|
|
state.manifest_todo = manifest_todo
|
|
|
|
|
|
def _manifest_process_v1(state, stage):
|
|
options = _manifest_enter(stage, "options", {})
|
|
options_mpp = _manifest_enter(options, "mpp-depsolve", {})
|
|
options_packages = _manifest_enter(options, "packages", [])
|
|
baseurl = options_mpp["baseurl"]
|
|
|
|
del(options["mpp-depsolve"])
|
|
|
|
deps = _dnf_resolve(state, options_mpp)
|
|
for dep in deps:
|
|
options_packages.append(dep["checksum"])
|
|
# dep["path"] often starts with a "/", even though it's meant to be
|
|
# relative to `baseurl`. Strip any leading slashes, but ensure there's
|
|
# exactly one between `baseurl` and the path.
|
|
url = urllib.parse.urljoin(baseurl + "/", dep["path"].lstrip("/"))
|
|
state.manifest_urls[dep["checksum"]] = url
|
|
|
|
|
|
def _manifest_depsolve_v1(state, src):
|
|
_manifest_parse_v1(state, src)
|
|
|
|
for stage in state.manifest_todo:
|
|
_manifest_process_v1(state, stage)
|
|
|
|
|
|
def _main_args(argv):
|
|
parser = argparse.ArgumentParser(description="Generate Test Manifests")
|
|
|
|
parser.add_argument(
|
|
"--dnf-cache",
|
|
metavar="PATH",
|
|
type=os.path.abspath,
|
|
default=None,
|
|
help="Path to DNF cache-directory to use",
|
|
)
|
|
|
|
return parser.parse_args(argv[1:])
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def _main_state(args):
|
|
state = State()
|
|
if args.dnf_cache:
|
|
state.dnf_cache = args.dnf_cache
|
|
yield state
|
|
|
|
|
|
def _main_process(state):
|
|
src = json.load(sys.stdin)
|
|
version = src.get("version", "1")
|
|
if version == "1":
|
|
_manifest_depsolve_v1(state, src)
|
|
else:
|
|
print(f"Unknown manifest version {version}", file=sys.stderr)
|
|
return 1
|
|
|
|
json.dump(state.manifest, sys.stdout, indent=2)
|
|
sys.stdout.write("\n")
|
|
return 0
|
|
|
|
|
|
def main() -> int:
|
|
args = _main_args(sys.argv)
|
|
with _main_state(args) as state:
|
|
_main_process(state)
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|