We must avoid depending on the host's state in any way. This achieves isolation in the following ways: - rather than the default config file /dev/null is used - rather than sharing the host persistent state dir a temporary one is used and thrown away for each call - the module_platform_id is set explicitly per supported distro, rather than taken from /etc/os-release. Optionally, the cache directory can be configured, as we may want to keep this separate from the host, if for no other reason than accounting. However, the cache appears to be well-behaved, so we can keep sharing it between calls (or even with the host). This speeds up things considerably, so this is definitely what we want. Signed-off-by: Tom Gundersen <teg@jklm.no>
146 lines
4.4 KiB
Python
146 lines
4.4 KiB
Python
#!/usr/bin/python3
|
|
|
|
import datetime
|
|
import dnf
|
|
import hashlib
|
|
import hawkey
|
|
import json
|
|
import shutil
|
|
import sys
|
|
import tempfile
|
|
|
|
DNF_ERROR_EXIT_CODE = 10
|
|
|
|
|
|
def timestamp_to_rfc3339(timestamp):
|
|
d = datetime.datetime.utcfromtimestamp(package.buildtime)
|
|
return d.strftime('%Y-%m-%dT%H:%M:%SZ')
|
|
|
|
|
|
def dnfrepo(desc, parent_conf=None):
|
|
"""Makes a dnf.repo.Repo out of a JSON repository description"""
|
|
|
|
repo = dnf.repo.Repo(desc["id"], parent_conf)
|
|
repo.name = desc["name"]
|
|
|
|
if "baseurl" in desc:
|
|
repo.baseurl = desc["baseurl"]
|
|
elif "metalink" in desc:
|
|
repo.metalink = desc["metalink"]
|
|
elif "mirrorlist" in desc:
|
|
repo.mirrorlist = desc["mirrorlist"]
|
|
else:
|
|
assert False
|
|
|
|
if desc.get("ignoressl", False):
|
|
repo.sslverify = False
|
|
|
|
return repo
|
|
|
|
|
|
def create_base(repos, module_platform_id, persistdir, cachedir, clean=False):
|
|
base = dnf.Base()
|
|
base.conf.module_platform_id = module_platform_id
|
|
base.conf.config_file_path = "/dev/null"
|
|
base.conf.persistdir = persistdir
|
|
if cachedir:
|
|
base.conf.cachedir = cachedir
|
|
|
|
if clean:
|
|
shutil.rmtree(base.conf.cachedir, ignore_errors=True)
|
|
|
|
for repo in repos:
|
|
base.repos.add(dnfrepo(repo, base.conf))
|
|
|
|
base.fill_sack(load_system_repo=False)
|
|
return base
|
|
|
|
|
|
def exit_with_dnf_error(kind: str, reason: str):
|
|
json.dump({"kind": kind, "reason": reason}, sys.stdout)
|
|
sys.exit(DNF_ERROR_EXIT_CODE)
|
|
|
|
|
|
def repo_checksums(base):
|
|
checksums = {}
|
|
for repo in base.repos.iter_enabled():
|
|
# Uses the same algorithm as libdnf to find cache dir:
|
|
# https://github.com/rpm-software-management/libdnf/blob/master/libdnf/repo/Repo.cpp#L1288
|
|
if repo.metalink:
|
|
url = repo.metalink
|
|
elif repo.mirrorlist:
|
|
url = repo.mirrorlist
|
|
elif repo.baseurl:
|
|
url = repo.baseurl[0]
|
|
else:
|
|
assert False
|
|
|
|
digest = hashlib.sha256(url.encode()).hexdigest()[:16]
|
|
|
|
with open(f"{base.conf.cachedir}/{repo.id}-{digest}/repodata/repomd.xml", "rb") as f:
|
|
repomd = f.read()
|
|
|
|
checksums[repo.id] = "sha256:" + hashlib.sha256(repomd).hexdigest()
|
|
|
|
return checksums
|
|
|
|
|
|
call = json.load(sys.stdin)
|
|
command = call["command"]
|
|
arguments = call["arguments"]
|
|
repos = arguments.get("repos", {})
|
|
clean = arguments.get("clean", False)
|
|
cachedir = arguments.get("cachedir", None)
|
|
module_platform_id = arguments["module_platform_id"]
|
|
|
|
with tempfile.TemporaryDirectory() as persistdir:
|
|
base = create_base(repos, module_platform_id, persistdir, cachedir, clean)
|
|
|
|
if command == "dump":
|
|
packages = []
|
|
for package in base.sack.query().available():
|
|
packages.append({
|
|
"name": package.name,
|
|
"summary": package.summary,
|
|
"description": package.description,
|
|
"url": package.url,
|
|
"epoch": package.epoch,
|
|
"version": package.version,
|
|
"release": package.release,
|
|
"arch": package.arch,
|
|
"buildtime": timestamp_to_rfc3339(package.buildtime),
|
|
"license": package.license
|
|
})
|
|
json.dump({
|
|
"checksums": repo_checksums(base),
|
|
"packages": packages
|
|
}, sys.stdout)
|
|
|
|
elif command == "depsolve":
|
|
errors = []
|
|
|
|
try:
|
|
base.install_specs(arguments["package-specs"], exclude=arguments.get("exclude-specs", []))
|
|
except dnf.exceptions.MarkingErrors as e:
|
|
exit_with_dnf_error("MarkingErrors", f"Error occurred when marking packages for installation: {e}")
|
|
|
|
try:
|
|
base.resolve()
|
|
except dnf.exceptions.DepsolveError as e:
|
|
exit_with_dnf_error("DepsolveError", f"There was a problem depsolving {arguments['package-specs']}: {e}")
|
|
|
|
dependencies = []
|
|
for package in base.transaction.install_set:
|
|
dependencies.append({
|
|
"name": package.name,
|
|
"epoch": package.epoch,
|
|
"version": package.version,
|
|
"release": package.release,
|
|
"arch": package.arch,
|
|
"remote_location": package.remote_location(),
|
|
"checksum": f"{hawkey.chksum_name(package.chksum[0])}:{package.chksum[1].hex()}",
|
|
})
|
|
json.dump({
|
|
"checksums": repo_checksums(base),
|
|
"dependencies": dependencies
|
|
}, sys.stdout)
|