Enable generating SBOM documents for depsolved transactions when using DNF5. Enable SBOM testing with DNF5 in unit tests. Signed-off-by: Tomáš Hozza <thozza@redhat.com>
217 lines
6.1 KiB
Python
Executable file
217 lines
6.1 KiB
Python
Executable file
#!/usr/bin/python3
|
|
# pylint: disable=invalid-name
|
|
|
|
"""
|
|
A JSON-based interface for depsolving using DNF.
|
|
|
|
Reads a request through stdin and prints the result to stdout.
|
|
In case of error, a structured error is printed to stdout as well.
|
|
"""
|
|
import json
|
|
import os
|
|
import os.path
|
|
import sys
|
|
import tempfile
|
|
|
|
from osbuild.solver import GPGKeyReadError, MarkingError, DepsolveError, RepoError, InvalidRequestError
|
|
|
|
# Load the solver configuration
|
|
config = {"use_dnf5": False}
|
|
config_path = os.environ.get("OSBUILD_SOLVER_CONFIG") or "/usr/lib/osbuild/solver.json"
|
|
try:
|
|
with open(config_path, encoding="utf-8") as f:
|
|
config = json.load(f)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
if config.get("use_dnf5", False):
|
|
from osbuild.solver.dnf5 import DNF5 as Solver
|
|
else:
|
|
from osbuild.solver.dnf import DNF as Solver
|
|
|
|
|
|
def get_string_option(option):
|
|
# option.get_value() causes an error if it's unset for string values, so check if it's empty first
|
|
if option.empty():
|
|
return None
|
|
return option.get_value()
|
|
|
|
|
|
def setup_cachedir(request):
|
|
arch = request["arch"]
|
|
# If dnf-json is run as a service, we don't want users to be able to set the cache
|
|
cache_dir = os.environ.get("OVERWRITE_CACHE_DIR", "")
|
|
if cache_dir:
|
|
cache_dir = os.path.join(cache_dir, arch)
|
|
else:
|
|
cache_dir = request.get("cachedir", "")
|
|
|
|
if not cache_dir:
|
|
return "", {"kind": "Error", "reason": "No cache dir set"}
|
|
|
|
return cache_dir, None
|
|
|
|
|
|
def solve(request, cache_dir):
|
|
command = request["command"]
|
|
arguments = request["arguments"]
|
|
|
|
with tempfile.TemporaryDirectory() as persistdir:
|
|
try:
|
|
solver = Solver(request, persistdir, cache_dir)
|
|
if command == "dump":
|
|
result = solver.dump()
|
|
elif command == "depsolve":
|
|
result = solver.depsolve(arguments)
|
|
elif command == "search":
|
|
result = solver.search(arguments.get("search", {}))
|
|
except GPGKeyReadError as e:
|
|
printe("error reading gpgkey")
|
|
return None, {
|
|
"kind": type(e).__name__,
|
|
"reason": str(e)
|
|
}
|
|
except RepoError as e:
|
|
return None, {
|
|
"kind": "RepoError",
|
|
"reason": f"There was a problem reading a repository: {e}"
|
|
}
|
|
except MarkingError as e:
|
|
printe("error install_specs")
|
|
return None, {
|
|
"kind": "MarkingErrors",
|
|
"reason": f"Error occurred when marking packages for installation: {e}"
|
|
}
|
|
except DepsolveError as e:
|
|
printe("error depsolve")
|
|
# collect list of packages for error
|
|
pkgs = []
|
|
for t in arguments.get("transactions", []):
|
|
pkgs.extend(t["package-specs"])
|
|
return None, {
|
|
"kind": "DepsolveError",
|
|
"reason": f"There was a problem depsolving {', '.join(pkgs)}: {e}"
|
|
}
|
|
except InvalidRequestError as e:
|
|
printe("error invalid request")
|
|
return None, {
|
|
"kind": "InvalidRequest",
|
|
"reason": str(e)
|
|
}
|
|
except Exception as e: # pylint: disable=broad-exception-caught
|
|
printe("error traceback")
|
|
import traceback
|
|
return None, {
|
|
"kind": type(e).__name__,
|
|
"reason": str(e),
|
|
"traceback": traceback.format_exc()
|
|
}
|
|
|
|
return result, None
|
|
|
|
|
|
def printe(*msg):
|
|
print(*msg, file=sys.stderr)
|
|
|
|
|
|
def fail(err):
|
|
printe(f"{err['kind']}: {err['reason']}")
|
|
print(json.dumps(err))
|
|
sys.exit(1)
|
|
|
|
|
|
def respond(result):
|
|
print(json.dumps(result))
|
|
|
|
|
|
# pylint: disable=too-many-return-statements
|
|
def validate_request(request):
|
|
command = request.get("command")
|
|
valid_cmds = ("depsolve", "dump", "search")
|
|
if command not in valid_cmds:
|
|
return {
|
|
"kind": "InvalidRequest",
|
|
"reason": f"invalid command '{command}': must be one of {', '.join(valid_cmds)}"
|
|
}
|
|
|
|
if not request.get("arch"):
|
|
return {
|
|
"kind": "InvalidRequest",
|
|
"reason": "no 'arch' specified"
|
|
}
|
|
|
|
if not request.get("module_platform_id"):
|
|
return {
|
|
"kind": "InvalidRequest",
|
|
"reason": "no 'module_platform_id' specified"
|
|
}
|
|
|
|
if not request.get("releasever"):
|
|
return {
|
|
"kind": "InvalidRequest",
|
|
"reason": "no 'releasever' specified"
|
|
}
|
|
|
|
arguments = request.get("arguments")
|
|
if not arguments:
|
|
return {
|
|
"kind": "InvalidRequest",
|
|
"reason": "empty 'arguments'"
|
|
}
|
|
|
|
sbom = request["arguments"].get("sbom")
|
|
if sbom is not None:
|
|
if command != "depsolve":
|
|
return {
|
|
"kind": "InvalidRequest",
|
|
"reason": "SBOM is only supported with 'depsolve' command"
|
|
}
|
|
if not isinstance(sbom, dict):
|
|
return {
|
|
"kind": "InvalidRequest",
|
|
"reason": "invalid 'sbom' value"
|
|
}
|
|
sbom_type = sbom.get("type")
|
|
if sbom_type is None:
|
|
return {
|
|
"kind": "InvalidRequest",
|
|
"reason": "missing 'type' in 'sbom'"
|
|
}
|
|
if not isinstance(sbom_type, str):
|
|
return {
|
|
"kind": "InvalidRequest",
|
|
"reason": "invalid 'type' in 'sbom'"
|
|
}
|
|
if sbom_type != "spdx":
|
|
return {
|
|
"kind": "InvalidRequest",
|
|
"reason": "Unsupported SBOM type"
|
|
}
|
|
|
|
if not arguments.get("repos") and not arguments.get("root_dir"):
|
|
return {
|
|
"kind": "InvalidRequest",
|
|
"reason": "no 'repos' or 'root_dir' specified"
|
|
}
|
|
|
|
return None
|
|
|
|
|
|
def main():
|
|
request = json.load(sys.stdin)
|
|
err = validate_request(request)
|
|
if err:
|
|
fail(err)
|
|
|
|
cachedir, err = setup_cachedir(request)
|
|
if err:
|
|
fail(err)
|
|
result, err = solve(request, cachedir)
|
|
if err:
|
|
fail(err)
|
|
else:
|
|
respond(result)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|