debian-forge/tools/osbuild-depsolve-dnf
Tomáš Hozza 562a30ce93 osbuild-depsolve-dnf: add SBOM support for DNF5
Enable generating SBOM documents for depsolved transactions when using
DNF5. Enable SBOM testing with DNF5 in unit tests.

Signed-off-by: Tomáš Hozza <thozza@redhat.com>
2024-12-02 23:24:39 +01:00

217 lines
6.1 KiB
Python
Executable file

#!/usr/bin/python3
# pylint: disable=invalid-name
"""
A JSON-based interface for depsolving using DNF.
Reads a request through stdin and prints the result to stdout.
In case of error, a structured error is printed to stdout as well.
"""
import json
import os
import os.path
import sys
import tempfile
from osbuild.solver import GPGKeyReadError, MarkingError, DepsolveError, RepoError, InvalidRequestError
# Load the solver configuration.
# The path comes from OSBUILD_SOLVER_CONFIG when that variable is set and
# non-empty, otherwise the system-wide default location is used.
config_path = os.environ.get("OSBUILD_SOLVER_CONFIG") or "/usr/lib/osbuild/solver.json"
try:
    with open(config_path, encoding="utf-8") as f:
        config = json.load(f)
except FileNotFoundError:
    # No configuration file present: fall back to the built-in defaults.
    config = {"use_dnf5": False}

# Pick the solver backend; both branches bind the implementation to `Solver`.
if not config.get("use_dnf5", False):
    from osbuild.solver.dnf import DNF as Solver
else:
    from osbuild.solver.dnf5 import DNF5 as Solver
def get_string_option(option):
    """Return the value of `option`, or None when the option is empty.

    `option` must provide `empty()` and `get_value()`.
    """
    # get_value() raises for unset string options, so empty() is consulted
    # first and get_value() is only called when there is something to read.
    return None if option.empty() else option.get_value()
def setup_cachedir(request):
    """Determine the cache directory to use for this request.

    Returns a `(cache_dir, error)` tuple. On success `error` is None; when no
    cache directory can be determined, `cache_dir` is "" and `error` is a dict
    with "kind" and "reason" keys.
    """
    arch = request["arch"]

    # If dnf-json is run as a service, we don't want users to be able to set
    # the cache: the OVERWRITE_CACHE_DIR environment variable takes precedence
    # over the request and gets a per-architecture subdirectory.
    override = os.environ.get("OVERWRITE_CACHE_DIR", "")
    if override:
        return os.path.join(override, arch), None

    requested = request.get("cachedir", "")
    if requested:
        return requested, None

    return "", {"kind": "Error", "reason": "No cache dir set"}
def solve(request, cache_dir):
    """Run the requested solver command and return `(result, error)`.

    `request` must contain "command" and "arguments". A `Solver` is created
    with a temporary persist directory and `cache_dir`, and the command
    ("dump", "depsolve" or "search") is dispatched to it.

    On success returns `(result, None)`. On failure returns `(None, error)`
    where `error` is a dict with "kind" and "reason" keys (plus "traceback"
    for unexpected exceptions).
    """
    command = request["command"]
    arguments = request["arguments"]

    # Reject unknown commands before doing any work. Without this check no
    # branch below would assign `result`, and the final return would raise
    # UnboundLocalError instead of reporting a structured error.
    if command not in ("dump", "depsolve", "search"):
        return None, {
            "kind": "InvalidRequest",
            "reason": f"invalid command '{command}': must be one of depsolve, dump, search"
        }

    with tempfile.TemporaryDirectory() as persistdir:
        try:
            solver = Solver(request, persistdir, cache_dir)
            if command == "dump":
                result = solver.dump()
            elif command == "depsolve":
                result = solver.depsolve(arguments)
            else:  # command == "search"
                result = solver.search(arguments.get("search", {}))
        except GPGKeyReadError as e:
            printe("error reading gpgkey")
            return None, {
                "kind": type(e).__name__,
                "reason": str(e)
            }
        except RepoError as e:
            return None, {
                "kind": "RepoError",
                "reason": f"There was a problem reading a repository: {e}"
            }
        except MarkingError as e:
            printe("error install_specs")
            return None, {
                "kind": "MarkingErrors",
                "reason": f"Error occurred when marking packages for installation: {e}"
            }
        except DepsolveError as e:
            printe("error depsolve")
            # Collect the requested package specs for the error message.
            # Use .get() so that a transaction without "package-specs" cannot
            # raise KeyError here and mask the actual depsolve failure.
            pkgs = []
            for t in arguments.get("transactions", []):
                pkgs.extend(t.get("package-specs", []))
            return None, {
                "kind": "DepsolveError",
                "reason": f"There was a problem depsolving {', '.join(pkgs)}: {e}"
            }
        except InvalidRequestError as e:
            printe("error invalid request")
            return None, {
                "kind": "InvalidRequest",
                "reason": str(e)
            }
        except Exception as e:  # pylint: disable=broad-exception-caught
            # Last-resort boundary: report anything unexpected as a structured
            # error, including the traceback, instead of crashing.
            printe("error traceback")
            import traceback
            return None, {
                "kind": type(e).__name__,
                "reason": str(e),
                "traceback": traceback.format_exc()
            }
    return result, None
def printe(*msg):
    """Write the given values to stderr, space-separated, followed by a newline."""
    line = " ".join(map(str, msg))
    print(line, file=sys.stderr)
def fail(err):
    """Report `err` and terminate the process with exit status 1.

    `err` is a dict with at least "kind" and "reason" keys. A short
    human-readable summary goes to stderr and the full error is printed to
    stdout as JSON.
    """
    kind, reason = err["kind"], err["reason"]
    printe(f"{kind}: {reason}")
    serialized = json.dumps(err)
    print(serialized)
    sys.exit(1)
def respond(result):
    """Print `result` to stdout serialized as a single JSON document."""
    serialized = json.dumps(result)
    print(serialized)
def _invalid_request(reason):
    """Build an "InvalidRequest" error dict carrying the given reason."""
    return {
        "kind": "InvalidRequest",
        "reason": reason
    }


# pylint: disable=too-many-return-statements
def validate_request(request):
    """Check the structure of a decoded request.

    Returns None when the request is acceptable, otherwise an error dict with
    "kind" set to "InvalidRequest" and a "reason" describing the first problem
    found.

    The request comes from decoded JSON, so it may be any JSON value. Both the
    request itself and its "arguments" are checked to be objects (dicts) before
    `.get()` is called on them, so malformed input yields a structured error
    rather than an AttributeError.
    """
    if not isinstance(request, dict):
        return _invalid_request("request must be a JSON object")

    command = request.get("command")
    valid_cmds = ("depsolve", "dump", "search")
    if command not in valid_cmds:
        return _invalid_request(f"invalid command '{command}': must be one of {', '.join(valid_cmds)}")

    if not request.get("arch"):
        return _invalid_request("no 'arch' specified")

    if not request.get("module_platform_id"):
        return _invalid_request("no 'module_platform_id' specified")

    if not request.get("releasever"):
        return _invalid_request("no 'releasever' specified")

    arguments = request.get("arguments")
    if not arguments:
        return _invalid_request("empty 'arguments'")
    if not isinstance(arguments, dict):
        # e.g. a non-empty list or string: truthy, but has no .get()
        return _invalid_request("invalid 'arguments' value")

    sbom = arguments.get("sbom")
    if sbom is not None:
        if command != "depsolve":
            return _invalid_request("SBOM is only supported with 'depsolve' command")

        if not isinstance(sbom, dict):
            return _invalid_request("invalid 'sbom' value")

        sbom_type = sbom.get("type")
        if sbom_type is None:
            return _invalid_request("missing 'type' in 'sbom'")

        if not isinstance(sbom_type, str):
            return _invalid_request("invalid 'type' in 'sbom'")

        if sbom_type != "spdx":
            return _invalid_request("Unsupported SBOM type")

    if not arguments.get("repos") and not arguments.get("root_dir"):
        return _invalid_request("no 'repos' or 'root_dir' specified")

    return None
def main():
    """Read a JSON request from stdin, process it and print the outcome.

    The request is validated, the cache directory is resolved and the command
    is run. The result is printed to stdout as JSON. Any error is reported via
    `fail()`, which prints it as JSON to stdout and exits with status 1.
    """
    try:
        request = json.load(sys.stdin)
    except ValueError as e:
        # json.JSONDecodeError (malformed or empty input) and
        # UnicodeDecodeError are both ValueError subclasses. Report them as a
        # structured error like every other failure instead of a traceback.
        fail({
            "kind": "InvalidRequest",
            "reason": f"request is not valid JSON: {e}"
        })

    err = validate_request(request)
    if err:
        fail(err)

    cachedir, err = setup_cachedir(request)
    if err:
        fail(err)

    result, err = solve(request, cachedir)
    if err:
        fail(err)
    else:
        respond(result)


if __name__ == "__main__":
    main()