debian-forge/tools/osbuild-depsolve-dnf
Michael Vogt d068c6d91f dnfjson: detect/error if no repositories are defined
This commit adds an error message if no repositories are
defined in the dnfjson query. We had the issue in
https://github.com/osbuild/bootc-image-builder/issues/922
that in a RHEL bootc-container no repositories are defined.

In that case the error is quite confusing: it complains about
an error marking packages, which is technically correct but
hides the root of the problem.

With this detection in place we can construct a more useful
error message in the higher layers.
2025-05-12 20:51:57 +02:00

217 lines
6.2 KiB
Python
Executable file

#!/usr/bin/python3
# pylint: disable=invalid-name
"""
A JSON-based interface for depsolving using DNF.
Reads a request through stdin and prints the result to stdout.
In case of error, a structured error is printed to stdout as well.
"""
import json
import os
import os.path
import sys
import tempfile
from osbuild.solver import GPGKeyReadError, MarkingError, DepsolveError, NoReposError, RepoError, InvalidRequestError
# Load the solver configuration.
# Start from the built-in defaults and overlay whatever the JSON config file provides.
config = {"use_dnf5": False}
# A non-empty OSBUILD_SOLVER_CONFIG environment variable overrides the default config path.
config_path = os.environ.get("OSBUILD_SOLVER_CONFIG") or "/usr/lib/osbuild/solver.json"
try:
    with open(config_path, encoding="utf-8") as f:
        loaded_config = json.load(f)
        config.update(loaded_config)
except FileNotFoundError:
    # A missing config file is not an error: the defaults above are used unchanged.
    pass

# Select the solver implementation once, at import time, based on the "use_dnf5" setting.
# Both implementations are bound to the same name, `Solver`.
if config.get("use_dnf5", False):
    from osbuild.solver.dnf5 import DNF5 as Solver
else:
    from osbuild.solver.dnf import DNF as Solver
def get_string_option(option):
    """
    Return the value of a string option, or None when the option is unset.

    option.get_value() causes an error if it's unset for string values, so
    option.empty() is consulted first and get_value() is only called for
    options that actually hold a value.
    """
    return None if option.empty() else option.get_value()
def setup_cachedir(request):
    """
    Determine the cache directory to use for a request.

    Returns a (cache_dir, error) tuple. On success error is None. When no
    cache directory can be determined, cache_dir is "" and error is a
    structured error dict with "kind" and "reason" keys.
    """
    arch = request["arch"]

    # If dnf-json is run as a service, we don't want users to be able to set the cache,
    # so a non-empty OVERWRITE_CACHE_DIR takes precedence over the request's "cachedir".
    override = os.environ.get("OVERWRITE_CACHE_DIR", "")
    cache_dir = os.path.join(override, arch) if override else request.get("cachedir", "")

    if cache_dir:
        return cache_dir, None
    return "", {"kind": "Error", "reason": "No cache dir set"}
def solve(request, cache_dir):
    """
    Run the requested solver command and return a (result, error) tuple.

    request is the request dict; its "command" and "arguments" entries are
    read here and the whole dict is handed to the solver. cache_dir is passed
    through to the solver unchanged.

    On success the tuple is (result, None). On failure it is (None, error),
    where error is a dict with "kind" and "reason" keys (plus "traceback" for
    unexpected exceptions). Exceptions raised while solving are converted
    into such an error dict instead of propagating.
    """
    command = request["command"]
    arguments = request["arguments"]

    # persistdir is a throw-away directory that is removed once the command finished.
    with tempfile.TemporaryDirectory() as persistdir:
        try:
            solver = Solver(request, persistdir, cache_dir, config.get("license_index_path"))
            if command == "dump":
                result = solver.dump()
            elif command == "depsolve":
                result = solver.depsolve(arguments)
            elif command == "search":
                result = solver.search(arguments.get("search", {}))
            else:
                # validate_request() rejects unknown commands, but without this
                # branch a direct caller would hit an UnboundLocalError on
                # `result` below instead of getting a structured error.
                printe("error invalid request")
                return None, {
                    "kind": "InvalidRequest",
                    "reason": f"invalid command '{command}'"
                }
        except GPGKeyReadError as e:
            printe("error reading gpgkey")
            return None, {
                "kind": type(e).__name__,
                "reason": str(e)
            }
        except RepoError as e:
            return None, {
                "kind": "RepoError",
                "reason": f"There was a problem reading a repository: {e}"
            }
        except NoReposError as e:
            return None, {
                "kind": "NoReposError",
                "reason": f"There was a problem finding repositories: {e}"
            }
        except MarkingError as e:
            printe("error install_specs")
            return None, {
                "kind": "MarkingErrors",
                "reason": f"Error occurred when marking packages for installation: {e}"
            }
        except DepsolveError as e:
            printe("error depsolve")
            # collect list of packages for error; use .get() so that a
            # transaction without "package-specs" cannot raise a KeyError from
            # inside this handler and hide the actual depsolve failure
            pkgs = []
            for t in arguments.get("transactions", []):
                pkgs.extend(t.get("package-specs", []))
            return None, {
                "kind": "DepsolveError",
                "reason": f"There was a problem depsolving {', '.join(pkgs)}: {e}"
            }
        except InvalidRequestError as e:
            printe("error invalid request")
            return None, {
                "kind": "InvalidRequest",
                "reason": str(e)
            }
        except Exception as e:  # pylint: disable=broad-exception-caught
            # Last resort: report anything unexpected together with its traceback.
            printe("error traceback")
            import traceback
            return None, {
                "kind": type(e).__name__,
                "reason": str(e),
                "traceback": traceback.format_exc()
            }

    return result, None
def printe(*msg):
    """Print the given values to stderr, formatted the way print() formats them."""
    stream = sys.stderr
    print(*msg, file=stream)
def fail(err):
    """
    Report a structured error and terminate the process with exit status 1.

    A one-line "<kind>: <reason>" summary is written to stderr and the full
    error dict is printed as JSON on stdout. This function does not return.
    """
    summary = f"{err['kind']}: {err['reason']}"
    printe(summary)

    serialized = json.dumps(err)
    print(serialized)

    sys.exit(1)
def respond(result):
    """Serialize the result as JSON and print it on stdout."""
    payload = json.dumps(result)
    print(payload)
def _invalid_request(reason):
    """Build the structured error dict returned for a malformed request."""
    return {"kind": "InvalidRequest", "reason": reason}


# pylint: disable=too-many-return-statements
def validate_request(request):
    """
    Check that a request is well-formed before it is handed to the solver.

    Returns None when the request is acceptable, otherwise a structured error
    dict of kind "InvalidRequest" whose "reason" names the first problem
    found. The checks run in this order: command, arch, releasever,
    arguments, the optional "sbom" argument, and finally the presence of
    either "repos" or "root_dir".
    """
    # The request is decoded JSON and may therefore be any JSON value; reject
    # non-objects here instead of failing with an AttributeError on .get().
    if not isinstance(request, dict):
        return _invalid_request("request must be a JSON object")

    command = request.get("command")
    valid_cmds = ("depsolve", "dump", "search")
    if command not in valid_cmds:
        return _invalid_request(f"invalid command '{command}': must be one of {', '.join(valid_cmds)}")

    if not request.get("arch"):
        return _invalid_request("no 'arch' specified")

    if not request.get("releasever"):
        return _invalid_request("no 'releasever' specified")

    arguments = request.get("arguments")
    if not arguments:
        return _invalid_request("empty 'arguments'")
    # A non-empty list or string is truthy but has no .get(); report it as an
    # invalid request rather than crashing below.
    if not isinstance(arguments, dict):
        return _invalid_request("invalid 'arguments' value")

    sbom = arguments.get("sbom")
    if sbom is not None:
        if command != "depsolve":
            return _invalid_request("SBOM is only supported with 'depsolve' command")
        if not isinstance(sbom, dict):
            return _invalid_request("invalid 'sbom' value")
        sbom_type = sbom.get("type")
        if sbom_type is None:
            return _invalid_request("missing 'type' in 'sbom'")
        if not isinstance(sbom_type, str):
            return _invalid_request("invalid 'type' in 'sbom'")
        if sbom_type != "spdx":
            return _invalid_request("Unsupported SBOM type")

    # At least one of "repos" or "root_dir" must be given and non-empty.
    if not arguments.get("repos") and not arguments.get("root_dir"):
        return _invalid_request("no 'repos' or 'root_dir' specified")

    return None
def main():
    """
    Entry point: read a JSON request from stdin, validate it and solve it.

    On success the result is printed as JSON on stdout. Every error,
    including input that cannot be parsed as JSON, is reported through
    fail(), which prints a structured error and exits with status 1.
    """
    try:
        request = json.load(sys.stdin)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # The module docstring promises a structured error on stdout for
        # failures; without this, malformed input ends in a bare traceback.
        fail({"kind": "InvalidRequest", "reason": f"cannot parse request as JSON: {e}"})
        return  # fail() exits; the return only makes the control flow explicit

    err = validate_request(request)
    if err:
        fail(err)

    cachedir, err = setup_cachedir(request)
    if err:
        fail(err)

    result, err = solve(request, cachedir)
    if err:
        fail(err)
    else:
        respond(result)
# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()