debian-forge/osbuild/solver/dnf5.py
import os
import os.path
import tempfile
from datetime import datetime
from typing import Dict, List

import libdnf5 as dnf5
from libdnf5.base import GoalProblem_NO_PROBLEM as NO_PROBLEM
from libdnf5.base import GoalProblem_NOT_FOUND as NOT_FOUND
from libdnf5.common import QueryCmp_CONTAINS as CONTAINS
from libdnf5.common import QueryCmp_EQ as EQ
from libdnf5.common import QueryCmp_GLOB as GLOB

from osbuild.solver import (
DepsolveError,
MarkingError,
NoReposError,
RepoError,
SolverBase,
modify_rootdir_path,
read_keys,
)
from osbuild.util.sbom.dnf5 import dnf_pkgset_to_sbom_pkgset
from osbuild.util.sbom.spdx import sbom_pkgset_to_spdx2_doc


def remote_location(package, schemes=("http", "ftp", "file", "https")):
"""Return the remote url where a package rpm may be downloaded from
This wraps the get_remote_location() function, returning the first
result or if it cannot find a suitable url it raises a RuntimeError
"""
urls = package.get_remote_locations(schemes)
    if not urls:
raise RuntimeError(f"Cannot determine remote location for {package.get_nevra()}")
return urls[0]
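
# A minimal usage sketch for remote_location(), assuming a configured `base`
# with loaded repositories (hypothetical package name):
#
#     q = dnf5.rpm.PackageQuery(base)
#     q.filter_name(["bash"], EQ)
#     url = remote_location(list(q)[0])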


def get_string_option(option):
# option.get_value() causes an error if it's unset for string values, so check if it's empty first
if option.empty():
return None
return option.get_value()


# XXX - Temporarily lifted from dnf.rpm module  # pylint: disable=fixme
def _invert(dct):
    """Invert a dict of tuples, mapping each tuple member back to its key"""
    return {v: k for k in dct for v in dct[k]}
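# For example, _invert({'x86_64': ('x86_64', 'amd64')}) yields
# {'x86_64': 'x86_64', 'amd64': 'x86_64'}, i.e. each concrete arch maps
# back to its basearch key.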


def any_repos_enabled(base):
"""Return true if any repositories are enabled"""
rq = dnf5.repo.RepoQuery(base)
return rq.begin() != rq.end()


class DNF5(SolverBase):
    """Solver implements package related actions

    These include depsolving a package set, searching for packages, and dumping a list
    of all available packages.
    """

    # pylint: disable=too-many-arguments
def __init__(self, request, persistdir, cachedir, license_index_path=None):
arch = request["arch"]
releasever = request.get("releasever")
module_platform_id = request.get("module_platform_id")
proxy = request.get("proxy")
arguments = request["arguments"]
repos = arguments.get("repos", [])
root_dir = arguments.get("root_dir")
# Gather up all the exclude packages from all the transactions
exclude_pkgs = []
        # Default to an empty list when the 'transactions' key is missing or None
        transactions = arguments.get("transactions") or []
        for t in transactions:
            # Default to an empty list when the 'exclude-specs' key is missing or None
            exclude_pkgs.extend(t.get("exclude-specs") or [])
self.base = dnf5.base.Base()
# Base is the correct place to set substitutions, not per-repo.
# See https://github.com/rpm-software-management/dnf5/issues/1248
self.base.get_vars().set("arch", arch)
self.base.get_vars().set("basearch", self._BASEARCH_MAP[arch])
if releasever:
self.base.get_vars().set('releasever', releasever)
if proxy:
self.base.get_vars().set('proxy', proxy)
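        # With these substitutions set, libdnf5 expands variables in repository
        # options; e.g. a (hypothetical) baseurl of
        #     https://example.com/fedora/$releasever/$basearch/
        # resolves using the values configured above.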
# Enable fastestmirror to ensure we choose the fastest mirrors for
# downloading metadata (when depsolving) and downloading packages.
conf = self.base.get_config()
conf.fastestmirror = True
        # Weak dependencies are installed by default; depsolve() overrides
        # this per transaction via its "install_weak_deps" setting
conf.install_weak_deps = True
        # We use the same cachedir for multiple architectures. Unfortunately,
        # this doesn't work well with zchunk in certain situations:
        # Imagine that we already have a cache for arch1. Then, we use dnf-json
        # to depsolve for arch2. If zchunk is enabled and available (that's
        # the case for Fedora), dnf will try to download only the differences
        # between the arch1 and arch2 metadata. But, as these are completely
        # different, dnf must essentially redownload everything.
        # For downloading deltas, zchunk uses HTTP range requests. Unfortunately,
        # if the mirror doesn't support multi-range requests, zchunk will
        # download one small segment per request. Because we need to update
        # the whole metadata (tens of MB), this can be extremely slow in some cases.
        # We could come up with a better fix, but let's just disable zchunk for
        # now. As we are already downloading a lot of data when building images,
        # downloading a bit more hardly matters.
conf.zchunk = False
# Set the rest of the dnf configuration.
if module_platform_id:
conf.module_platform_id = module_platform_id
conf.config_file_path = "/dev/null"
conf.persistdir = persistdir
conf.cachedir = cachedir
# Include comps metadata by default
metadata_types = ['comps']
metadata_types.extend(arguments.get("optional-metadata", []))
conf.optional_metadata_types = metadata_types
try:
# NOTE: With libdnf5 packages are excluded in the repo setup
for repo in repos:
self._dnfrepo(repo, exclude_pkgs)
if root_dir:
                # This sets the varsdir to ("{root_dir}/etc/dnf/vars", "{root_dir}/usr/share/dnf5/vars.d")
                # for custom variable substitution (e.g. CentOS Stream 9's $stream variable)
conf.installroot = root_dir
conf.varsdir = (os.path.join(root_dir, "etc/dnf/vars"), os.path.join(root_dir, "usr/share/dnf5/vars.d"))
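                # For example (hypothetical layout): a file {root_dir}/etc/dnf/vars/stream
                # containing the single line "9-stream" makes "$stream" expand to
                # "9-stream" in repository definitions loaded from this root.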
            # Cannot modify .conf() values after this.
            # base.setup() should be called before loading repositories, otherwise substitutions might not work.
self.base.setup()
if root_dir:
repos_dir = os.path.join(root_dir, "etc/yum.repos.d")
self.base.get_repo_sack().create_repos_from_dir(repos_dir)
rq = dnf5.repo.RepoQuery(self.base)
rq.filter_enabled(True)
repo_iter = rq.begin()
while repo_iter != rq.end():
repo = repo_iter.value()
config = repo.get_config()
config.sslcacert = modify_rootdir_path(
get_string_option(config.get_sslcacert_option()),
root_dir,
)
config.sslclientcert = modify_rootdir_path(
get_string_option(config.get_sslclientcert_option()),
root_dir,
)
config.sslclientkey = modify_rootdir_path(
get_string_option(config.get_sslclientkey_option()),
root_dir,
)
repo_iter.next()
self.base.get_repo_sack().load_repos(dnf5.repo.Repo.Type_AVAILABLE)
except Exception as e:
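            # libdnf5's exception types no longer inherit from RuntimeError
            # (see dnf5 PR #2124), so catch the common `Exception` base and
            # re-raise it as a RepoError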
raise RepoError(e) from e
if not any_repos_enabled(self.base):
raise NoReposError("There are no enabled repositories")
        # Custom license index file path used for SBOM generation
self.license_index_path = license_index_path

    _BASEARCH_MAP = _invert({
'aarch64': ('aarch64',),
'alpha': ('alpha', 'alphaev4', 'alphaev45', 'alphaev5', 'alphaev56',
'alphaev6', 'alphaev67', 'alphaev68', 'alphaev7', 'alphapca56'),
'arm': ('armv5tejl', 'armv5tel', 'armv5tl', 'armv6l', 'armv7l', 'armv8l'),
'armhfp': ('armv6hl', 'armv7hl', 'armv7hnl', 'armv8hl'),
'i386': ('i386', 'athlon', 'geode', 'i386', 'i486', 'i586', 'i686'),
'ia64': ('ia64',),
'mips': ('mips',),
'mipsel': ('mipsel',),
'mips64': ('mips64',),
'mips64el': ('mips64el',),
'loongarch64': ('loongarch64',),
'noarch': ('noarch',),
'ppc': ('ppc',),
'ppc64': ('ppc64', 'ppc64iseries', 'ppc64p7', 'ppc64pseries'),
'ppc64le': ('ppc64le',),
'riscv32': ('riscv32',),
'riscv64': ('riscv64',),
'riscv128': ('riscv128',),
's390': ('s390',),
's390x': ('s390x',),
'sh3': ('sh3',),
'sh4': ('sh4', 'sh4a'),
'sparc': ('sparc', 'sparc64', 'sparc64v', 'sparcv8', 'sparcv9',
'sparcv9v'),
'x86_64': ('x86_64', 'amd64', 'ia32e'),
})

    # pylint: disable=too-many-branches
def _dnfrepo(self, desc, exclude_pkgs=None):
"""Makes a dnf.repo.Repo out of a JSON repository description"""
if not exclude_pkgs:
exclude_pkgs = []
sack = self.base.get_repo_sack()
repo = sack.create_repo(desc["id"])
conf = repo.get_config()
if "name" in desc:
conf.name = desc["name"]
# At least one is required
if "baseurl" in desc:
conf.baseurl = desc["baseurl"]
elif "metalink" in desc:
conf.metalink = desc["metalink"]
elif "mirrorlist" in desc:
conf.mirrorlist = desc["mirrorlist"]
else:
raise ValueError("missing either `baseurl`, `metalink`, or `mirrorlist` in repo")
conf.sslverify = desc.get("sslverify", True)
if "sslcacert" in desc:
conf.sslcacert = desc["sslcacert"]
if "sslclientkey" in desc:
conf.sslclientkey = desc["sslclientkey"]
if "sslclientcert" in desc:
conf.sslclientcert = desc["sslclientcert"]
if "gpgcheck" in desc:
conf.gpgcheck = desc["gpgcheck"]
if "repo_gpgcheck" in desc:
conf.repo_gpgcheck = desc["repo_gpgcheck"]
if "gpgkey" in desc:
conf.gpgkey = [desc["gpgkey"]]
if "gpgkeys" in desc:
            # Each gpgkeys entry can contain a full key, or it can be a URL.
            # dnf expects urls, so write full keys to a temporary location and add
            # the file:// path to conf.gpgkey
keydir = os.path.join(self.base.get_config().persistdir, "gpgkeys")
if not os.path.exists(keydir):
os.makedirs(keydir, mode=0o700, exist_ok=True)
for key in desc["gpgkeys"]:
if key.startswith("-----BEGIN PGP PUBLIC KEY BLOCK-----"):
# Not using with because it needs to be a valid file for the duration. It
# is inside the temporary persistdir so will be cleaned up on exit.
# pylint: disable=consider-using-with
keyfile = tempfile.NamedTemporaryFile(dir=keydir, delete=False)
keyfile.write(key.encode("utf-8"))
conf.gpgkey += (f"file://{keyfile.name}",)
keyfile.close()
else:
conf.gpgkey += (key,)
        # In dnf, the default metadata expiration time is 48 hours. However,
        # some repositories never expire the metadata, and others expire it much
        # sooner than that. We therefore allow this to be configured. If nothing
        # is provided, we err on the side of checking whether we should invalidate
        # the cache. If cache invalidation is not necessary, the overhead of
        # checking is in the hundreds of milliseconds. In order to avoid this
        # overhead accumulating for API calls that consist of several dnf calls,
        # we set the expiration to a short time period, rather than 0.
conf.metadata_expire = desc.get("metadata_expire", "20s")
        # If True, this option disables modularization filtering, effectively
        # disabling modularity for the given repository.
if "module_hotfixes" in desc:
repo.module_hotfixes = desc["module_hotfixes"]
# Set the packages to exclude
conf.excludepkgs = exclude_pkgs
return repo

    @staticmethod
def _timestamp_to_rfc3339(timestamp):
return datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%dT%H:%M:%SZ')
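    # For example, _timestamp_to_rfc3339(0) returns "1970-01-01T00:00:00Z".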

    def _sbom_for_pkgset(self, pkgset: List[dnf5.rpm.Package]) -> Dict:
"""
Create an SBOM document for the given package set.
For now, only SPDX v2 is supported.
"""
pkgset = dnf_pkgset_to_sbom_pkgset(pkgset)
spdx_doc = sbom_pkgset_to_spdx2_doc(pkgset, self.license_index_path)
return spdx_doc.to_dict()

    def dump(self):
"""dump returns a list of all available packages"""
packages = []
q = dnf5.rpm.PackageQuery(self.base)
q.filter_available()
for package in list(q):
packages.append({
"name": package.get_name(),
"summary": package.get_summary(),
"description": package.get_description(),
"url": package.get_url(),
"repo_id": package.get_repo_id(),
"epoch": int(package.get_epoch()),
"version": package.get_version(),
"release": package.get_release(),
"arch": package.get_arch(),
"buildtime": self._timestamp_to_rfc3339(package.get_build_time()),
"license": package.get_license()
})
return packages

    def search(self, args):
        """Perform a search on the available packages

        args contains a "search" dict with parameters to use for searching.
        "packages" is a list of package name globs to search for.
        "latest" is a boolean that will return only the latest NEVRA instead
        of all matching builds in the metadata.

        e.g.

            "search": {
                "latest": false,
                "packages": ["tmux", "vim*", "*ssh*"]
            },
        """
pkg_globs = args.get("packages", [])
packages = []
# NOTE: Build query one piece at a time, don't pass all to filterm at the same
# time.
for name in pkg_globs:
q = dnf5.rpm.PackageQuery(self.base)
q.filter_available()
            # If the package name glob has * in it, use GLOB.
            # If it is of the form *name*, use CONTAINS on the bare name.
            # If it has neither, use an exact match (EQ).
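            # For example: "tmux" -> EQ, "vim*" -> GLOB, "*ssh*" -> CONTAINS("ssh")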
if "*" in name:
if name[0] != "*" or name[-1] != "*":
q.filter_name([name], GLOB)
else:
q.filter_name([name.replace("*", "")], CONTAINS)
else:
q.filter_name([name], EQ)
if args.get("latest", False):
q.filter_latest_evr()
for package in list(q):
packages.append({
"name": package.get_name(),
"summary": package.get_summary(),
"description": package.get_description(),
"url": package.get_url(),
"repo_id": package.get_repo_id(),
"epoch": int(package.get_epoch()),
"version": package.get_version(),
"release": package.get_release(),
"arch": package.get_arch(),
"buildtime": self._timestamp_to_rfc3339(package.get_build_time()),
"license": package.get_license()
})
return packages

    def depsolve(self, arguments):
"""depsolve returns a list of the dependencies for the set of transactions
"""
        # Default to an empty list when the 'transactions' key is missing or None
transactions = arguments.get("transactions") or []
# collect repo IDs from the request so we know whether to translate gpg key paths
request_repo_ids = set(repo["id"] for repo in arguments.get("repos", []))
root_dir = arguments.get("root_dir")
last_transaction: List = []
for transaction in transactions:
goal = dnf5.base.Goal(self.base)
goal.reset()
sack = self.base.get_rpm_package_sack()
sack.clear_user_excludes()
# weak deps are selected per-transaction
self.base.get_config().install_weak_deps = transaction.get("install_weak_deps", False)
# set the packages from the last transaction as installed
for installed_pkg in last_transaction:
goal.add_rpm_install(installed_pkg)
# Support group/environment names as well as ids
settings = dnf5.base.GoalJobSettings()
settings.group_with_name = True
# Packages are added individually, excludes are handled in the repo setup
for pkg in transaction.get("package-specs"):
goal.add_install(pkg, settings)
            # Resolve the goal into a libdnf5 transaction; use a distinct name
            # so it doesn't shadow the `transaction` request dict above
            resolved = goal.resolve()
            transaction_problems = resolved.get_problems()
            if transaction_problems == NOT_FOUND:
                raise MarkingError("\n".join(resolved.get_resolve_logs_as_strings()))
            if transaction_problems != NO_PROBLEM:
                raise DepsolveError("\n".join(resolved.get_resolve_logs_as_strings()))

            # store the current transaction result
            last_transaction.clear()
            for tsi in resolved.get_transaction_packages():
                # Only add packages being installed, upgraded, downgraded, or reinstalled
                if not dnf5.base.transaction.transaction_item_action_is_inbound(tsi.get_action()):
                    continue
                last_transaction.append(tsi.get_package())
# Something went wrong, but no error was generated by goal.resolve()
if len(transactions) > 0 and len(last_transaction) == 0:
raise DepsolveError("Empty transaction results")
packages = []
pkg_repos = {}
for package in last_transaction:
packages.append({
"name": package.get_name(),
"epoch": int(package.get_epoch()),
"version": package.get_version(),
"release": package.get_release(),
"arch": package.get_arch(),
"repo_id": package.get_repo_id(),
"path": package.get_location(),
"remote_location": remote_location(package),
"checksum": f"{package.get_checksum().get_type_str()}:{package.get_checksum().get_checksum()}",
})
# collect repository objects by id to create the 'repositories' collection for the response
pkg_repo = package.get_repo()
pkg_repos[pkg_repo.get_id()] = pkg_repo
packages = sorted(packages, key=lambda x: x["path"])
repositories = {} # full repository configs for the response
for repo in pkg_repos.values():
repo_cfg = repo.get_config()
repositories[repo.get_id()] = {
"id": repo.get_id(),
"name": repo.get_name(),
"baseurl": list(repo_cfg.get_baseurl_option().get_value()), # resolves to () if unset
"metalink": get_string_option(repo_cfg.get_metalink_option()),
"mirrorlist": get_string_option(repo_cfg.get_mirrorlist_option()),
"gpgcheck": repo_cfg.get_gpgcheck_option().get_value(),
"repo_gpgcheck": repo_cfg.get_repo_gpgcheck_option().get_value(),
"gpgkeys": read_keys(repo_cfg.get_gpgkey_option().get_value(),
root_dir if repo.get_id() not in request_repo_ids else None),
"sslverify": repo_cfg.get_sslverify_option().get_value(),
"sslclientkey": get_string_option(repo_cfg.get_sslclientkey_option()),
"sslclientcert": get_string_option(repo_cfg.get_sslclientcert_option()),
"sslcacert": get_string_option(repo_cfg.get_sslcacert_option()),
}
response = {
"solver": "dnf5",
"packages": packages,
"repos": repositories,
}
if "sbom" in arguments:
response["sbom"] = self._sbom_for_pkgset(last_transaction)
return response
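

# A minimal end-to-end sketch (hypothetical request; repo URL and paths are
# illustrative only):
#
#     request = {
#         "arch": "x86_64",
#         "releasever": "40",
#         "arguments": {
#             "repos": [{"id": "fedora", "metalink": "https://example.com/metalink"}],
#             "transactions": [{"package-specs": ["@core"]}],
#         },
#     }
#     solver = DNF5(request, persistdir="/tmp/persist", cachedir="/tmp/cache")
#     result = solver.depsolve(request["arguments"])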