#!/usr/bin/python3
# pylint: disable=invalid-name

"""
A JSON-based interface for depsolving using DNF.

Reads a request through stdin and prints the result to stdout.
In case of error, a structured error is printed to stdout as well.
"""
import json
import os
import os.path
import sys
import tempfile
import traceback

from osbuild.solver import GPGKeyReadError, MarkingError, DepsolveError, NoReposError, RepoError, InvalidRequestError

# Load the solver configuration
config = {"use_dnf5": False}
config_path = os.environ.get("OSBUILD_SOLVER_CONFIG") or "/usr/lib/osbuild/solver.json"
try:
    with open(config_path, encoding="utf-8") as f:
        loaded_config = json.load(f)
        config.update(loaded_config)
except FileNotFoundError:
    # A missing configuration file is not an error: the defaults above apply.
    pass

if config.get("use_dnf5", False):
    from osbuild.solver.dnf5 import DNF5 as Solver
else:
    from osbuild.solver.dnf import DNF as Solver


def get_string_option(option):
    """Return the value of `option`, or None when `option.empty()` is true."""
    # option.get_value() causes an error if it's unset for string values, so check if it's empty first
    if option.empty():
        return None
    return option.get_value()


def setup_cachedir(request):
    """
    Determine the cache directory for a request.

    The OVERWRITE_CACHE_DIR environment variable takes precedence; when it is
    set, the request's "arch" is appended to it as a sub-directory. Otherwise
    the request's own "cachedir" value is used.

    Returns a `(cache_dir, error)` tuple: `error` is None on success, and
    `cache_dir` is "" together with an error dict when no directory is set.
    """
    arch = request["arch"]
    # If dnf-json is run as a service, we don't want users to be able to set the cache
    cache_dir = os.environ.get("OVERWRITE_CACHE_DIR", "")
    if cache_dir:
        cache_dir = os.path.join(cache_dir, arch)
    else:
        cache_dir = request.get("cachedir", "")

    if not cache_dir:
        return "", {"kind": "Error", "reason": "No cache dir set"}

    return cache_dir, None


def solve(request, cache_dir):
    """
    Run the requested solver command.

    A temporary directory is created for the duration of the call and handed
    to the solver as its persist dir. Every exception raised while creating
    or running the solver is converted into a structured error dict.

    Returns a `(result, error)` tuple in which exactly one element is None.
    """
    command = request["command"]
    arguments = request["arguments"]

    with tempfile.TemporaryDirectory() as persistdir:
        try:
            solver = Solver(request, persistdir, cache_dir, config.get("license_index_path"))
            if command == "dump":
                result = solver.dump()
            elif command == "depsolve":
                result = solver.depsolve(arguments)
            elif command == "search":
                result = solver.search(arguments.get("search", {}))
            else:
                # validate_request() normally rejects this earlier, but guard
                # here as well so `result` can never be returned unbound.
                return None, {
                    "kind": "InvalidRequest",
                    "reason": f"invalid command '{command}'"
                }
        except GPGKeyReadError as e:
            printe("error reading gpgkey")
            return None, {
                "kind": type(e).__name__,
                "reason": str(e)
            }
        except RepoError as e:
            return None, {
                "kind": "RepoError",
                "reason": f"There was a problem reading a repository: {e}"
            }
        except NoReposError as e:
            return None, {
                "kind": "NoReposError",
                "reason": f"There was a problem finding repositories: {e}"
            }
        except MarkingError as e:
            printe("error install_specs")
            # NOTE(review): the kind is "MarkingErrors" (plural) although the
            # exception class is MarkingError; left untouched because it is
            # part of the emitted output.
            return None, {
                "kind": "MarkingErrors",
                "reason": f"Error occurred when marking packages for installation: {e}"
            }
        except DepsolveError as e:
            printe("error depsolve")
            # collect list of packages for error
            pkgs = []
            for t in arguments.get("transactions", []):
                # Tolerate a transaction without "package-specs": an exception
                # raised inside this handler would escape every sibling
                # `except` clause and replace the structured error.
                pkgs.extend(t.get("package-specs") or [])
            return None, {
                "kind": "DepsolveError",
                "reason": f"There was a problem depsolving {', '.join(pkgs)}: {e}"
            }
        except InvalidRequestError as e:
            printe("error invalid request")
            return None, {
                "kind": "InvalidRequest",
                "reason": str(e)
            }
        except Exception as e:  # pylint: disable=broad-exception-caught
            printe("error traceback")
            return None, {
                "kind": type(e).__name__,
                "reason": str(e),
                "traceback": traceback.format_exc()
            }

    return result, None


def printe(*msg):
    """Print `msg` to stderr."""
    print(*msg, file=sys.stderr)


def fail(err):
    """
    Report a structured error and terminate.

    A short "kind: reason" line goes to stderr, the full error dict is printed
    as JSON to stdout, and the process exits with status 1.
    """
    printe(f"{err['kind']}: {err['reason']}")
    print(json.dumps(err))
    sys.exit(1)


def respond(result):
    """Print `result` as JSON to stdout."""
    print(json.dumps(result))


# pylint: disable=too-many-return-statements
def validate_request(request):
    """
    Check the overall shape of a decoded request.

    Returns None when the request is acceptable, otherwise an error dict with
    kind "InvalidRequest" and a "reason" describing the first problem found.
    """
    # The decoded JSON may be any JSON value; everything below needs a dict.
    if not isinstance(request, dict):
        return {
            "kind": "InvalidRequest",
            "reason": "request must be a JSON object"
        }

    command = request.get("command")
    valid_cmds = ("depsolve", "dump", "search")
    if command not in valid_cmds:
        return {
            "kind": "InvalidRequest",
            "reason": f"invalid command '{command}': must be one of {', '.join(valid_cmds)}"
        }

    if not request.get("arch"):
        return {
            "kind": "InvalidRequest",
            "reason": "no 'arch' specified"
        }

    if not request.get("releasever"):
        return {
            "kind": "InvalidRequest",
            "reason": "no 'releasever' specified"
        }

    arguments = request.get("arguments")
    if not arguments:
        return {
            "kind": "InvalidRequest",
            "reason": "empty 'arguments'"
        }

    # A truthy non-dict value (e.g. a list or a string) has no .get().
    if not isinstance(arguments, dict):
        return {
            "kind": "InvalidRequest",
            "reason": "invalid 'arguments' value"
        }

    sbom = arguments.get("sbom")
    if sbom is not None:
        if command != "depsolve":
            return {
                "kind": "InvalidRequest",
                "reason": "SBOM is only supported with 'depsolve' command"
            }
        if not isinstance(sbom, dict):
            return {
                "kind": "InvalidRequest",
                "reason": "invalid 'sbom' value"
            }
        sbom_type = sbom.get("type")
        if sbom_type is None:
            return {
                "kind": "InvalidRequest",
                "reason": "missing 'type' in 'sbom'"
            }
        if not isinstance(sbom_type, str):
            return {
                "kind": "InvalidRequest",
                "reason": "invalid 'type' in 'sbom'"
            }
        if sbom_type != "spdx":
            return {
                "kind": "InvalidRequest",
                "reason": "Unsupported SBOM type"
            }

    if not arguments.get("repos") and not arguments.get("root_dir"):
        return {
            "kind": "InvalidRequest",
            "reason": "no 'repos' or 'root_dir' specified"
        }

    return None


def main():
    """
    Read a JSON request from stdin, process it and print the JSON response.

    Any failure, including input that cannot be decoded as JSON, is reported
    through fail(), which prints a structured error and exits with status 1.
    """
    try:
        request = json.load(sys.stdin)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # Without this, malformed input would only produce a traceback on
        # stderr and nothing on stdout. fail() does not return.
        fail({
            "kind": "InvalidRequest",
            "reason": f"request is not valid JSON: {e}"
        })

    err = validate_request(request)
    if err:
        fail(err)

    cachedir, err = setup_cachedir(request)
    if err:
        fail(err)

    result, err = solve(request, cachedir)
    if err:
        fail(err)
    else:
        respond(result)


if __name__ == "__main__":
    main()