tools/osbuild-depsolve-dnf: add gpg keys to repos in response

Add the full gpg keys to the repository configs in the response.

On each repository object from dnf, the gpg keys are URLs, either
file:// or http(s)://.  We need to resolve these and return them with
in the response.

When the URL is a file:// path, and it comes from a .repo config file,
we assume that the path is relative to the root_dir, so we prepend it to
the path in the file.  This is so that repo configs in OS root trees can
be used unmodified.  However, when a key is defined in the request, we
should assume that the path is valid, either because it was defined by
the caller as a URL, or because it was defined in-line in the request
and osbuild-depsolve-dnf wrote it to the persistdir itself.

A new exception is defined to identify errors during this process.
This commit is contained in:
Achilleas Koutsou 2024-03-28 15:19:38 +01:00 committed by Brian C. Lane
parent 9552ba0fc1
commit d2ce43ee50

View file

@ -11,6 +11,9 @@ import json
import os
import sys
import tempfile
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime
from typing import List
@ -223,7 +226,11 @@ class Solver():
})
return packages
def depsolve(self, transactions):
def depsolve(self, arguments):
transactions = arguments.get("transactions", [])
# collect repo IDs from the request so we know whether to translate gpg key paths
request_repo_ids = set(repo["id"] for repo in arguments.get("repos", []))
root_dir = arguments.get("root_dir")
last_transaction: List = []
for transaction in transactions:
@ -281,6 +288,7 @@ class Solver():
"mirrorlist": repo.mirrorlist,
"gpgcheck": repo.gpgcheck,
"check_repogpg": repo.repo_gpgcheck,
"gpgkeys": read_keys(repo.gpgkey, root_dir if repo.id not in request_repo_ids else None),
"ignoressl": not bool(repo.sslverify),
"sslcacert": repo.sslcacert,
"sslclientkey": repo.sslclientkey,
@ -293,6 +301,40 @@ class Solver():
return response
class GPGKeyReadError(Exception):
pass
def modify_rootdir_path(path, root_dir):
if path and root_dir:
# if the root_dir is set, we need to translate the key path to be under this directory
return os.path.join(root_dir, path.lstrip("/"))
return path
def read_keys(paths, root_dir=None):
keys = []
for path in paths:
url = urllib.parse.urlparse(path)
if url.scheme == "file":
path = path.removeprefix("file://")
path = modify_rootdir_path(path, root_dir)
try:
with open(path, mode="r", encoding="utf-8") as keyfile:
keys.append(keyfile.read())
except Exception as e:
raise GPGKeyReadError(f"error loading gpg key from {path}: {e}") from e
elif url.scheme in ["http", "https"]:
try:
resp = urllib.request.urlopen(urllib.request.Request(path))
keys.append(resp.read().decode())
except urllib.error.URLError as e:
raise GPGKeyReadError(f"error reading remote gpg key at {path}: {e}") from e
else:
raise GPGKeyReadError(f"unknown url scheme for gpg key: {url.scheme} ({path})")
return keys
def setup_cachedir(request):
arch = request["arch"]
# If dnf-json is run as a service, we don't want users to be able to set the cache
@ -312,14 +354,13 @@ def solve(request, cache_dir):
command = request["command"]
arguments = request["arguments"]
transactions = arguments.get("transactions")
with tempfile.TemporaryDirectory() as persistdir:
try:
solver = Solver(request, persistdir, cache_dir)
if command == "dump":
result = solver.dump()
elif command == "depsolve":
result = solver.depsolve(transactions)
result = solver.depsolve(arguments)
elif command == "search":
result = solver.search(arguments.get("search", {}))
@ -333,7 +374,7 @@ def solve(request, cache_dir):
printe("error depsolve")
# collect list of packages for error
pkgs = []
for t in transactions:
for t in arguments.get("transactions", []):
pkgs.extend(t["package-specs"])
return None, {
"kind": "DepsolveError",
@ -350,6 +391,12 @@ def solve(request, cache_dir):
"kind": type(e).__name__,
"reason": str(e)
}
except GPGKeyReadError as e:
printe("error reading gpgkey")
return None, {
"kind": type(e).__name__,
"reason": str(e)
}
return result, None