#!/usr/bin/python3 import datetime import dnf import hashlib import json import sys DNF_ERROR_EXIT_CODE = 10 def timestamp_to_rfc3339(timestamp): d = datetime.datetime.utcfromtimestamp(package.buildtime) return d.strftime('%Y-%m-%dT%H:%M:%SZ') def dnfrepo(desc, parent_conf=None): """Makes a dnf.repo.Repo out of a JSON repository description""" repo = dnf.repo.Repo(desc["id"], parent_conf) repo.name = desc["name"] if "baseurl" in desc: repo.baseurl = desc["baseurl"] elif "metalink" in desc: repo.metalink = desc["metalink"] elif "mirrorlist" in desc: repo.mirrorlist = desc["mirrorlist"] else: assert False if desc.get("ignoressl", False): repo.sslverify = False return repo def create_base(repos): base = dnf.Base() for repo in repos: base.repos.add(dnfrepo(repo, base.conf)) base.fill_sack(load_system_repo=False) return base def exit_with_dnf_error(kind: str, reason: str): json.dump({"kind": kind, "reason": reason}, sys.stdout) sys.exit(DNF_ERROR_EXIT_CODE) def repo_checksums(base): checksums = {} for repo in base.repos.iter_enabled(): # Uses the same algorithm as libdnf to find cache dir: # https://github.com/rpm-software-management/libdnf/blob/master/libdnf/repo/Repo.cpp#L1288 if repo.metalink: url = repo.metalink elif repo.mirrorlist: url = repo.mirrorlist elif repo.baseurl: url = repo.baseurl[0] else: assert False digest = hashlib.sha256(url.encode()).hexdigest()[:16] with open(f"{base.conf.cachedir}/{repo.id}-{digest}/repodata/repomd.xml", "rb") as f: repomd = f.read() checksums[repo.id] = "sha256:" + hashlib.sha256(repomd).hexdigest() return checksums call = json.load(sys.stdin) command = call["command"] arguments = call.get("arguments", {}) if command == "dump": base = create_base(arguments.get("repos", {})) packages = [] for package in base.sack.query().available(): packages.append({ "name": package.name, "summary": package.summary, "description": package.description, "url": package.url, "epoch": package.epoch, "version": package.version, "release": package.release, "arch": package.arch, "buildtime": timestamp_to_rfc3339(package.buildtime), "license": package.license }) json.dump({ "checksums": repo_checksums(base), "packages": packages }, sys.stdout) elif command == "depsolve": base = create_base(arguments.get("repos", {})) errors = [] try: base.install_specs(arguments["package-specs"]) except dnf.exceptions.MarkingErrors as e: exit_with_dnf_error("MarkingErrors", f"Error occurred when marking packages for installation: {e}") try: base.resolve() except dnf.exceptions.DepsolveError as e: exit_with_dnf_error("DepsolveError", f"There was a problem depsolving {arguments['package-specs']}: {e}") dependencies = [] for package in base.transaction.install_set: dependencies.append({ "name": package.name, "epoch": package.epoch, "version": package.version, "release": package.release, "arch": package.arch }) json.dump({ "checksums": repo_checksums(base), "dependencies": dependencies }, sys.stdout)