tools/osbuild-depsolve-dnf5: add gpg keys to repos in response

Add the full gpg keys to the repository configs in the response.

On each repository object from dnf, the gpg keys are URLs, either
file:// or http(s)://.  We need to resolve these and return them with
in the response.

When the URL is a file:// path, and it comes from a .repo config file,
we assume that the path is relative to the root_dir, so we prepend it to
the path in the file.  This is so that repo configs in OS root trees can
be used unmodified.  However, when a key is defined in the request, we
should assume that the path is valid, either because it was defined by
the caller as a URL, or because it was defined in-line in the request
and osbuild-depsolve-dnf5 wrote it to the persistdir itself.

A new exception is defined to identify errors during this process.
This commit is contained in:
Achilleas Koutsou 2024-03-28 19:39:05 +01:00 committed by Brian C. Lane
parent 629f171f72
commit 782c0d907d

View file

@ -12,6 +12,9 @@ import os
import sys
import tempfile
import traceback
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime
from typing import List
@ -331,9 +334,13 @@ class Solver():
})
return packages
def depsolve(self, transactions):
def depsolve(self, arguments):
"""depsolve returns a list of the dependencies for the set of transactions
"""
transactions = arguments.get("transactions", [])
# collect repo IDs from the request so we know whether to translate gpg key paths
request_repo_ids = set(repo["id"] for repo in arguments.get("repos", []))
root_dir = arguments.get("root_dir")
last_transaction: List = []
for transaction in transactions:
@ -372,10 +379,10 @@ class Solver():
if len(transactions) > 0 and len(last_transaction) == 0:
raise TransactionError("Empty transaction results")
dependencies = []
packages = []
pkg_repos = {}
for package in last_transaction:
pkg_repo_cfg = package.get_repo().get_config()
dependencies.append({
packages.append({
"name": package.get_name(),
"epoch": int(package.get_epoch()),
"version": package.get_version(),
@ -385,14 +392,76 @@ class Solver():
"path": package.get_location(),
"remote_location": remote_location(package),
"checksum": f"{package.get_checksum().get_type_str()}:{package.get_checksum().get_checksum()}",
"sslverify": pkg_repo_cfg.get_sslverify_option().get_value(),
"gpgcheck": pkg_repo_cfg.get_gpgcheck_option().get_value(),
"sslclientkey": pkg_repo_cfg.get_sslclientkey_option().get_value(),
"sslclientcert": pkg_repo_cfg.get_sslclientcert_option().get_value(),
"sslcacert": pkg_repo_cfg.get_sslcacert_option().get_value(),
})
# collect repository objects by id to create the 'repositories' collection for the response
pkg_repo = package.get_repo()
pkg_repos[pkg_repo.get_id()] = pkg_repo
return sorted(dependencies, key=lambda x: x["path"])
packages = sorted(packages, key=lambda x: x["path"])
def get_string_option(option):
# option.get_value() causes an error if it's unset for string values, so check if it's empty first
if option.empty():
return None
return option.get_value()
repositories = {} # full repository configs for the response
for repo in pkg_repos.values():
repo_cfg = repo.get_config()
repositories[repo.get_id()] = {
"id": repo.get_id(),
"name": repo.get_name(),
"baseurl": list(repo_cfg.get_baseurl_option().get_value()), # resolves to () if unset
"metalink": get_string_option(repo_cfg.get_metalink_option()),
"mirrorlist": get_string_option(repo_cfg.get_mirrorlist_option()),
"gpgcheck": repo_cfg.get_gpgcheck_option().get_value(),
"check_repogpg": repo_cfg.get_repo_gpgcheck_option().get_value(),
"gpgkeys": read_keys(repo_cfg.get_gpgkey_option().get_value(),
root_dir if repo.get_id() not in request_repo_ids else None),
"sslverify": repo_cfg.get_sslverify_option().get_value(),
"sslclientkey": get_string_option(repo_cfg.get_sslclientkey_option()),
"sslclientcert": get_string_option(repo_cfg.get_sslclientcert_option()),
"sslcacert": get_string_option(repo_cfg.get_sslcacert_option()),
}
response = {
"packages": packages,
"repos": repositories,
}
return response
class GPGKeyReadError(Exception):
pass
def modify_rootdir_path(path, root_dir):
if path and root_dir:
# if the root_dir is set, we need to translate the key path to be under this directory
return os.path.join(root_dir, path.lstrip("/"))
return path
def read_keys(paths, root_dir=None):
keys = []
for path in paths:
url = urllib.parse.urlparse(path)
if url.scheme == "file":
path = path.removeprefix("file://")
path = modify_rootdir_path(path, root_dir)
try:
with open(path, mode="r", encoding="utf-8") as keyfile:
keys.append(keyfile.read())
except Exception as e:
raise GPGKeyReadError(f"error loading gpg key from {path}: {e}") from e
elif url.scheme in ["http", "https"]:
try:
resp = urllib.request.urlopen(urllib.request.Request(path))
keys.append(resp.read().decode())
except urllib.error.URLError as e:
raise GPGKeyReadError(f"error reading remote gpg key at {path}: {e}") from e
else:
raise GPGKeyReadError(f"unknown url scheme for gpg key: {url.scheme} ({path})")
return keys
def setup_cachedir(request):
@ -419,7 +488,7 @@ def solve(request, cache_dir):
if command == "dump":
result = solver.dump()
elif command == "depsolve":
result = solver.depsolve(arguments.get("transactions"))
result = solver.depsolve(arguments)
elif command == "search":
result = solver.search(arguments.get("search", {}))
except TransactionError as e: