debian-forge/tools/mpp-depsolve.py
Christian Kellner aa00e62fed tools/mpp: sort source urls
In both mpp-depsolve and mpp-import-pipeline, sort the packages to
url dictionary before writing the JSON. This makes it easier to
look for packages but more importantly ensures that the resulting
set of packages has the same ordering in the sources section
independently of how it was assembled.
2021-06-21 18:04:13 +02:00

331 lines
8.6 KiB
Python
Executable file

#!/usr/bin/python3
"""Manifest-Pre-Processor - Depsolving
This manifest-pre-processor consumes a manifest on stdin, processes it, and
produces the resulting manifest on stdout.
This tool adjusts the `org.osbuild.rpm` stage. It consumes the `mpp-depsolve`
option and produces a package-list and source-entries.
It supports version "1" and version "2" of the manifest description format.
The parameters for this pre-processor, version "1", look like this:
```
...
{
"name": "org.osbuild.rpm",
...
"options": {
...
"mpp-depsolve": {
"architecture": "x86_64",
"module-platform-id": "f32",
"baseurl": "http://mirrors.kernel.org/fedora/releases/32/Everything/x86_64/os",
"repos": [
{
"id": "default",
"metalink": "https://mirrors.fedoraproject.org/metalink?repo=fedora-32&arch=$basearch"
}
],
"packages": [
"@core",
"dracut-config-generic",
"grub2-pc",
"kernel"
],
"excludes": [
(optional excludes)
]
}
}
}
...
```
The parameters for this pre-processor, version "2", look like this:
```
...
{
"name": "org.osbuild.rpm",
...
"inputs": {
"packages": {
"mpp-depsolve": {
see above
}
}
}
}
...
```
"""
import argparse
import collections
import contextlib
import json
import os
import sys
import tempfile
import urllib.parse
from typing import Dict
import dnf
import hawkey
class State:
    """Mutable processing state shared between the parse and process steps.

    Populated by ``_manifest_parse_v1()`` / ``_manifest_parse_v2()`` and
    consumed by the matching process functions.
    """

    def __init__(self):
        # Fix: `manifest_todo` was a mutable *class* attribute, silently
        # shared by all instances; make all fields per-instance instead.
        self.dnf_cache = None       # DNF cache directory (from --dnf-cache)
        self.manifest = None        # input/working manifest
        self.manifest_urls = None   # link to the manifest's sources URL dict
        self.manifest_todo = []     # links to the RPM stages/inputs to depsolve
def _dnf_repo(conf, desc):
    """Create a `dnf.repo.Repo` from a repository description dict.

    `desc` must contain an "id" and at least one of "baseurl", "metalink",
    or "mirrorlist" (checked in that order of preference).

    Raises:
        ValueError: if none of the three URL keys is present.
    """
    repo = dnf.repo.Repo(desc["id"], conf)
    if "baseurl" in desc:
        repo.baseurl = desc["baseurl"]
    elif "metalink" in desc:
        repo.metalink = desc["metalink"]
    elif "mirrorlist" in desc:
        # Bug fix: the mirrorlist URL was previously assigned to
        # `repo.metalink`; dnf treats metalinks and mirrorlists differently.
        repo.mirrorlist = desc["mirrorlist"]
    else:
        raise ValueError("repo description does not contain baseurl, metalink, or mirrorlist keys")
    return repo
def _dnf_base(repos, module_platform_id, persistdir, cachedir, arch):
    """Build a ready-to-query `dnf.Base` with the given repos loaded."""
    base = dnf.Base()
    conf = base.conf

    if cachedir:
        conf.cachedir = cachedir
    conf.config_file_path = "/dev/null"
    conf.module_platform_id = module_platform_id
    conf.persistdir = persistdir
    conf.substitutions['arch'] = arch
    conf.substitutions['basearch'] = dnf.rpm.basearch(arch)

    for desc in repos:
        base.repos.add(_dnf_repo(conf, desc))

    base.fill_sack(load_system_repo=False)
    return base
def _dnf_resolve(state, mpp_depsolve):
deps = []
arch = mpp_depsolve["architecture"]
mpid = mpp_depsolve["module-platform-id"]
repos = mpp_depsolve.get("repos", [])
packages = mpp_depsolve.get("packages", [])
excludes = mpp_depsolve.get("excludes", [])
baseurl = mpp_depsolve.get("baseurl")
baseurls = {
repo["id"]: repo.get("baseurl", baseurl) for repo in repos
}
if len(packages) > 0:
with tempfile.TemporaryDirectory() as persistdir:
base = _dnf_base(repos, mpid, persistdir, state.dnf_cache, arch)
base.install_specs(packages, exclude=excludes)
base.resolve()
for tsi in base.transaction:
if tsi.action not in dnf.transaction.FORWARD_ACTIONS:
continue
checksum_type = hawkey.chksum_name(tsi.pkg.chksum[0])
checksum_hex = tsi.pkg.chksum[1].hex()
path = tsi.pkg.relativepath
base = baseurls[tsi.pkg.reponame]
# dep["path"] often starts with a "/", even though it's meant to be
# relative to `baseurl`. Strip any leading slashes, but ensure there's
# exactly one between `baseurl` and the path.
url = urllib.parse.urljoin(base + "/", path.lstrip("/"))
pkg = {
"checksum": f"{checksum_type}:{checksum_hex}",
"name": tsi.pkg.name,
"url": url,
}
deps.append(pkg)
return deps
def _manifest_enter(manifest, key, default):
if key not in manifest:
manifest[key] = default
return manifest[key]
def _sort_urls(urls: Dict):
def get_sort_key(item):
key = item[1]
if isinstance(key, dict):
key = key["url"]
return key
if not urls:
return urls
urls_sorted = sorted(urls.items(), key=get_sort_key)
urls.clear()
urls.update(collections.OrderedDict(urls_sorted))
def _manifest_parse_v1(state, data):
    """Locate the v1 manifest's URL dict and the RPM stages to depsolve."""
    manifest = data

    # Ensure "sources"."org.osbuild.files"."urls" exists.
    sources = _manifest_enter(manifest, "sources", {})
    files = _manifest_enter(sources, "org.osbuild.files", {})
    urls = _manifest_enter(files, "urls", {})

    # Ensure "pipeline"."stages" exists.
    pipeline = _manifest_enter(manifest, "pipeline", {})
    stages = _manifest_enter(pipeline, "stages", [])

    # Every org.osbuild.rpm stage carrying an "mpp-depsolve" option is work.
    todo = []
    for stage in stages:
        if stage.get("name", "") != "org.osbuild.rpm":
            continue
        options = _manifest_enter(stage, "options", {})
        if "mpp-depsolve" in options:
            todo.append(stage)

    # Remember the links of interest on the shared state.
    state.manifest = manifest
    state.manifest_urls = urls
    state.manifest_todo = todo
def _manifest_process_v1(state, stage):
    """Depsolve one v1 RPM stage in place, replacing its mpp-depsolve option
    with the resolved package checksums and recording their source URLs."""
    options = _manifest_enter(stage, "options", {})
    mpp = _manifest_enter(options, "mpp-depsolve", {})
    packages = _manifest_enter(options, "packages", [])
    options.pop("mpp-depsolve")

    for dep in _dnf_resolve(state, mpp):
        checksum = dep["checksum"]
        packages.append(checksum)
        state.manifest_urls[checksum] = dep["url"]
def _manifest_depsolve_v1(state, src):
    """Run the full v1 depsolve pass over the manifest `src`."""
    _manifest_parse_v1(state, src)

    for rpm_stage in state.manifest_todo:
        _manifest_process_v1(state, rpm_stage)
def _manifest_parse_v2(state, manifest):
    """Locate the v2 manifest's source items and the RPM inputs to depsolve."""
    todo = []

    # A v2 manifest carries a list of pipelines, each with a list of stages.
    # Fix: the default for a missing "pipelines" key was `{}` (wrong type
    # for a list-valued field, and inconsistent with the `[]` default used
    # for "stages"); use an empty list.
    for pipeline in manifest.get("pipelines", []):
        for stage in pipeline.get("stages", []):
            if stage["type"] != "org.osbuild.rpm":
                continue

            inputs = _manifest_enter(stage, "inputs", {})
            packages = _manifest_enter(inputs, "packages", {})
            if "mpp-depsolve" in packages:
                todo.append(packages)

    # Ensure "sources"."org.osbuild.curl"."items" exists.
    sources = _manifest_enter(manifest, "sources", {})
    files = _manifest_enter(sources, "org.osbuild.curl", {})
    urls = _manifest_enter(files, "items", {})

    state.manifest = manifest
    state.manifest_todo = todo
    state.manifest_urls = urls
def _manifest_process_v2(state, ip):
    """Depsolve one v2 "packages" input in place: fill in its references
    and record the source URLs, then drop the mpp-depsolve directive."""
    references = _manifest_enter(ip, "references", {})

    for dep in _dnf_resolve(state, ip["mpp-depsolve"]):
        checksum = dep["checksum"]
        references[checksum] = {}
        state.manifest_urls[checksum] = dep["url"]

    del ip["mpp-depsolve"]
def _manifest_depsolve_v2(state, src):
    """Run the full v2 depsolve pass over the manifest `src`."""
    _manifest_parse_v2(state, src)

    for packages_input in state.manifest_todo:
        _manifest_process_v2(state, packages_input)
def _main_args(argv):
parser = argparse.ArgumentParser(description="Generate Test Manifests")
parser.add_argument(
"--dnf-cache",
metavar="PATH",
type=os.path.abspath,
default=None,
help="Path to DNF cache-directory to use",
)
return parser.parse_args(argv[1:])
@contextlib.contextmanager
def _main_state(args):
    """Yield a fresh `State` configured from the parsed arguments."""
    state = State()
    state.dnf_cache = args.dnf_cache or state.dnf_cache
    yield state
def _main_process(state):
    """Read a manifest from stdin, depsolve it, and write it to stdout.

    Returns 0 on success, 1 for an unsupported manifest version.
    """
    src = json.load(sys.stdin)

    # Dispatch on the (stringly-typed) manifest version; "1" is the default.
    handlers = {
        "1": _manifest_depsolve_v1,
        "2": _manifest_depsolve_v2,
    }
    version = src.get("version", "1")
    handler = handlers.get(version)
    if handler is None:
        print(f"Unknown manifest version {version}", file=sys.stderr)
        return 1

    handler(state, src)

    # Sort the sources so the output is stable regardless of resolve order.
    _sort_urls(state.manifest_urls)

    json.dump(state.manifest, sys.stdout, indent=2)
    sys.stdout.write("\n")
    return 0
def main() -> int:
    """Program entry point; returns the process exit status."""
    args = _main_args(sys.argv)
    with _main_state(args) as state:
        # Bug fix: propagate the processing status instead of discarding it —
        # an unknown manifest version previously still exited with 0.
        return _main_process(state)
# Run as a script: exit with the status code returned by main().
if __name__ == "__main__":
    sys.exit(main())