From f8281eee5474181685fde0f90f5c757c865936f4 Mon Sep 17 00:00:00 2001
From: Thomas Lavocat
Date: Tue, 23 Nov 2021 16:22:23 +0100
Subject: [PATCH] dnf-json: refactor

Prepare for the multi-cache architecture with some refactoring. The main
change is a new Solver class that embeds all the logic around dnf, so
that the responsibility for communicating on the socket is separated
from depsolving.
---
 dnf-json | 372 +++++++++++++++++++++++++++----------------------------
 1 file changed, 184 insertions(+), 188 deletions(-)

diff --git a/dnf-json b/dnf-json
index 37411dfd4..d046867eb 100755
--- a/dnf-json
+++ b/dnf-json
@@ -8,130 +8,183 @@ import os
 import socket
 import socketserver
 import logging
+import sys
 from http.server import BaseHTTPRequestHandler
 import dnf
 import hawkey
-from systemd.journal import JournalHandler
+formatter = logging.Formatter(
+    fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s",
+    datefmt="%Y.%m.%d %H:%M:%S"
+    )
+handler = logging.StreamHandler(stream=sys.stdout)
+handler.setFormatter(formatter)
 log = logging.getLogger('dnf-json')
-log.addHandler(JournalHandler())
+log.addHandler(handler)
 log.setLevel(logging.INFO)
-DNF_ERROR_EXIT_CODE = 10
+class Solver():
-def timestamp_to_rfc3339(timestamp, package):
-    d = datetime.datetime.utcfromtimestamp(package.buildtime)
-    return d.strftime('%Y-%m-%dT%H:%M:%SZ')
+    def __init__(self, repos, module_platform_id, persistdir, cachedir, arch):
+        self.base = dnf.Base()
-def dnfrepo(desc, parent_conf=None):
-    """Makes a dnf.repo.Repo out of a JSON repository description"""
+        # Enable fastestmirror to ensure we choose the fastest mirrors for
+        # downloading metadata (when depsolving) and downloading packages.
+        self.base.conf.fastestmirror = True
-    repo = dnf.repo.Repo(desc["id"], parent_conf)
+        # We use the same cachedir for multiple architectures. Unfortunately,
+        # this is something that doesn't work well in certain situations
+        # with zchunk:
+        # Imagine that we already have a cache for arch1. Then, we use dnf-json
+        # to depsolve for arch2. If ZChunk is enabled and available (that's
+        # the case for Fedora), dnf will try to download only differences
+        # between arch1 and arch2 metadata. But, as these are completely
+        # different, dnf must basically redownload everything.
+        # For downloading deltas, zchunk uses HTTP range requests. Unfortunately,
+        # if the mirror doesn't support multi-range requests, then zchunk will
+        # download one small segment per request. Because we need to update
+        # the whole metadata (10s of MB), this can be extremely slow in some cases.
+        # I think that we can come up with a better fix but let's just disable
+        # zchunk for now. As we are already downloading a lot of data when
+        # building images, I don't care if we download even more.
+        self.base.conf.zchunk = False
-    if "baseurl" in desc:
-        repo.baseurl = desc["baseurl"]
-    elif "metalink" in desc:
-        repo.metalink = desc["metalink"]
-    elif "mirrorlist" in desc:
-        repo.mirrorlist = desc["mirrorlist"]
-    else:
-        assert False
+        # Try another mirror if it takes longer than 5 seconds to connect.
+        self.base.conf.timeout = 5
-    if desc.get("ignoressl", False):
-        repo.sslverify = False
-    if "sslcacert" in desc:
-        repo.sslcacert = desc["sslcacert"]
-    if "sslclientkey" in desc:
-        repo.sslclientkey = desc["sslclientkey"]
-    if "sslclientcert" in desc:
-        repo.sslclientcert = desc["sslclientcert"]
+        # Set the rest of the dnf configuration.
+        self.base.conf.module_platform_id = module_platform_id
+        self.base.conf.config_file_path = "/dev/null"
+        self.base.conf.persistdir = persistdir
+        self.base.conf.cachedir = cachedir
+        self.base.conf.substitutions['arch'] = arch
+        self.base.conf.substitutions['basearch'] = dnf.rpm.basearch(arch)
-    # In dnf, the default metadata expiration time is 48 hours. However,
-    # some repositories never expire the metadata, and others expire it much
-    # sooner than that. We therefore allow this to be configured. If nothing
-    # is provided we error on the side of checking if we should invalidate
-    # the cache. If cache invalidation is not necessary, the overhead of
-    # checking is in the hundreds of milliseconds. In order to avoid this
-    # overhead accumulating for API calls that consist of several dnf calls,
-    # we set the expiration to a short time period, rather than 0.
-    repo.metadata_expire = desc.get("metadata_expire", "20s")
+        for repo in repos:
+            self.base.repos.add(self._dnfrepo(repo, self.base.conf))
+        self.base.fill_sack(load_system_repo=False)
-    return repo
+    def _dnfrepo(self, desc, parent_conf=None):
+        """Makes a dnf.repo.Repo out of a JSON repository description"""
+        repo = dnf.repo.Repo(desc["id"], parent_conf)
-def create_base(repos, module_platform_id, persistdir, cachedir, arch):
-    base = dnf.Base()
-
-    # Enable fastestmirror to ensure we choose the fastest mirrors for
-    # downloading metadata (when depsolving) and downloading packages.
-    base.conf.fastestmirror = True
-
-    # We use the same cachedir for multiple architectures. Unfortunately,
-    # this is something that doesn't work well in certain situations
-    # with zchunk:
-    # Imagine that we already have cache for arch1. Then, we use dnf-json
-    # to depsolve for arch2. If ZChunk is enabled and available (that's
-    # the case for Fedora), dnf will try to download only differences
-    # between arch1 and arch2 metadata. But, as these are completely
-    # different, dnf must basically redownload everything.
-    # For downloding deltas, zchunk uses HTTP range requests. Unfortunately,
-    # if the mirror doesn't support multi range requests, then zchunk will
-    # download one small segment per a request. Because we need to update
-    # the whole metadata (10s of MB), this can be extremely slow in some cases.
-    # I think that we can come up with a better fix but let's just disable
-    # zchunk for now. As we are already downloading a lot of data when
-    # building images, I don't care if we download even more.
-    base.conf.zchunk = False
-
-    # Try another mirror if it takes longer than 5 seconds to connect.
-    base.conf.timeout = 5
-
-    # Set the rest of the dnf configuration.
-    base.conf.module_platform_id = module_platform_id
-    base.conf.config_file_path = "/dev/null"
-    base.conf.persistdir = persistdir
-    base.conf.cachedir = cachedir
-    base.conf.substitutions['arch'] = arch
-    base.conf.substitutions['basearch'] = dnf.rpm.basearch(arch)
-
-    for repo in repos:
-        base.repos.add(dnfrepo(repo, base.conf))
-
-    base.fill_sack(load_system_repo=False)
-    return base
-
-def repo_checksums(base):
-    checksums = {}
-    for repo in base.repos.iter_enabled():
-        # Uses the same algorithm as libdnf to find cache dir:
-        # https://github.com/rpm-software-management/libdnf/blob/master/libdnf/repo/Repo.cpp#L1288
-        if repo.metalink:
-            url = repo.metalink
-        elif repo.mirrorlist:
-            url = repo.mirrorlist
-        elif repo.baseurl:
-            url = repo.baseurl[0]
+        if "baseurl" in desc:
+            repo.baseurl = desc["baseurl"]
+        elif "metalink" in desc:
+            repo.metalink = desc["metalink"]
+        elif "mirrorlist" in desc:
+            repo.mirrorlist = desc["mirrorlist"]
         else:
             assert False
-        digest = hashlib.sha256(url.encode()).hexdigest()[:16]
+        if desc.get("ignoressl", False):
+            repo.sslverify = False
+        if "sslcacert" in desc:
+            repo.sslcacert = desc["sslcacert"]
+        if "sslclientkey" in desc:
+            repo.sslclientkey = desc["sslclientkey"]
+        if "sslclientcert" in desc:
+            repo.sslclientcert = desc["sslclientcert"]
-        repomd_file = f"{repo.id}-{digest}/repodata/repomd.xml"
-        with open(f"{base.conf.cachedir}/{repomd_file}", "rb") as f:
-            repomd = f.read()
+        # In dnf, the default metadata expiration time is 48 hours. However,
+        # some repositories never expire the metadata, and others expire it much
+        # sooner than that. We therefore allow this to be configured. If nothing
+        # is provided, we err on the side of checking if we should invalidate
+        # the cache. If cache invalidation is not necessary, the overhead of
+        # checking is in the hundreds of milliseconds. In order to avoid this
+        # overhead accumulating for API calls that consist of several dnf calls,
+        # we set the expiration to a short time period, rather than 0.
+        repo.metadata_expire = desc.get("metadata_expire", "20s")
-        checksums[repo.id] = "sha256:" + hashlib.sha256(repomd).hexdigest()
+        return repo
-    return checksums
+    def _repo_checksums(self):
+        checksums = {}
+        for repo in self.base.repos.iter_enabled():
+            # Uses the same algorithm as libdnf to find cache dir:
+            # https://github.com/rpm-software-management/libdnf/blob/master/libdnf/repo/Repo.cpp#L1288
+            if repo.metalink:
+                url = repo.metalink
+            elif repo.mirrorlist:
+                url = repo.mirrorlist
+            elif repo.baseurl:
+                url = repo.baseurl[0]
+            else:
+                assert False
+
+            digest = hashlib.sha256(url.encode()).hexdigest()[:16]
+
+            repomd_file = f"{repo.id}-{digest}/repodata/repomd.xml"
+            with open(f"{self.base.conf.cachedir}/{repomd_file}", "rb") as f:
+                repomd = f.read()
+
+            checksums[repo.id] = "sha256:" + hashlib.sha256(repomd).hexdigest()
+
+        return checksums
+
+    def _timestamp_to_rfc3339(self, timestamp):
+        d = datetime.datetime.utcfromtimestamp(timestamp)
+        return d.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+    def dump(self):
+        packages = []
+        for package in self.base.sack.query().available():
+            packages.append({
+                "name": package.name,
+                "summary": package.summary,
+                "description": package.description,
+                "url": package.url,
+                "epoch": package.epoch,
+                "version": package.version,
+                "release": package.release,
+                "arch": package.arch,
+                "buildtime": self._timestamp_to_rfc3339(package.buildtime),
+                "license": package.license
+            })
+        return {
+            "checksums": self._repo_checksums(),
+            "packages": packages
+        }
+
+    def depsolve(self, package_spec, exclude_spec):
+        self.base.install_specs(package_spec, exclude_spec)
+        self.base.resolve()
+        dependencies = []
+        for tsi in self.base.transaction:
+            # Avoid using the install_set() helper, as it does not guarantee
+            # a stable order
+            if tsi.action not in dnf.transaction.FORWARD_ACTIONS:
+                continue
+            package = tsi.pkg
+
+            dependencies.append({
+                "name": package.name,
+                "epoch": package.epoch,
+                "version": package.version,
+                "release": package.release,
+                "arch": package.arch,
+                "repo_id": package.reponame,
+                "path": package.relativepath,
+                "remote_location": package.remote_location(),
+                "checksum": (
+                    f"{hawkey.chksum_name(package.chksum[0])}:"
+                    f"{package.chksum[1].hex()}"
+                )
+            })
+        return {
+            "checksums": self._repo_checksums(),
+            "dependencies": dependencies
+        }
 class DnfJsonRequestHandler(BaseHTTPRequestHandler):
     def _send(self):
         self.client_address=('',)
-    def exit_with_dnf_error(self, kind: str, reason: str):
+    def response_with_dnf_error(self, kind: str, reason: str):
         self._send()
         self.send_response(500)
         self.send_header("Content-Type", "application/json")
@@ -139,7 +192,7 @@ class DnfJsonRequestHandler(BaseHTTPRequestHandler):
         self.wfile.write(json.dumps({"kind": kind, "reason": reason}).encode("utf-8"))
-    def exit_with_success(self, json_object):
+    def response_success(self, json_object):
         self._send()
         self.send_response(200)
         self.send_header("Content-Type", "application/json")
@@ -159,114 +212,57 @@ class DnfJsonRequestHandler(BaseHTTPRequestHandler):
         with tempfile.TemporaryDirectory() as persistdir:
             try:
-                base = create_base(
+                solver = Solver(
                     repos,
                     module_platform_id,
                     persistdir,
                     cachedir,
                     arch
                 )
-            except dnf.exceptions.Error as e:
-                return self.exit_with_dnf_error(
-                    type(e).__name__,
-                    f"Error occurred when setting up repo: {e}"
+                if command == "dump":
+                    self.response_success(solver.dump())
+                    log.info("dump success")
+                elif command == "depsolve":
+                    self.response_success(
+                        solver.depsolve(
+                            arguments["package-specs"],
+                            arguments.get("exclude-specs", [])
+                        )
+                    )
+                    log.info("depsolve success")
+
+            except dnf.exceptions.MarkingErrors as e:
+                log.info("error install_specs")
+                self.response_with_dnf_error(
+                    "MarkingErrors",
+                    f"Error occurred when marking packages for installation: {e}"
                 )
-
-            if command == "dump":
-                packages = []
-                for package in base.sack.query().available():
-                    packages.append({
-                        "name": package.name,
-                        "summary": package.summary,
-                        "description": package.description,
-                        "url": package.url,
-                        "epoch": package.epoch,
-                        "version": package.version,
-                        "release": package.release,
-                        "arch": package.arch,
-                        "buildtime": timestamp_to_rfc3339(package.buildtime, package),
-                        "license": package.license
-                    })
-                self.exit_with_success({
-                    "checksums": repo_checksums(base),
-                    "packages": packages
-                })
-
-            elif command == "depsolve":
-                log.info("start depsolve")
-                errors = []
-
-                try:
-                    base.install_specs(
-                        arguments["package-specs"],
-                        exclude=arguments.get("exclude-specs", [])
+            except dnf.exceptions.DepsolveError as e:
+                log.info("error depsolve")
+                self.response_with_dnf_error(
+                    "DepsolveError",
+                    (
+                        "There was a problem depsolving "
+                        f"{arguments['package-specs']}: {e}"
                     )
-                except dnf.exceptions.MarkingErrors as e:
-                    log.info("error install_specs")
-                    return self.exit_with_dnf_error(
-                        "MarkingErrors",
-                        f"Error occurred when marking packages for installation: {e}"
-                    )
-
-                try:
-                    base.resolve()
-                except dnf.exceptions.DepsolveError as e:
-                    log.info("error depsolve")
-                    return self.exit_with_dnf_error(
-                        "DepsolveError",
-                        (
-                            "There was a problem depsolving "
-                            f"{arguments['package-specs']}: {e}"
-                        )
-                    )
-
-                dependencies = []
-                for tsi in base.transaction:
-                    # Avoid using the install_set() helper, as it does not guarantee
-                    # a stable order
-                    if tsi.action not in dnf.transaction.FORWARD_ACTIONS:
-                        continue
-                    package = tsi.pkg
-
-                    dependencies.append({
-                        "name": package.name,
-                        "epoch": package.epoch,
-                        "version": package.version,
-                        "release": package.release,
-                        "arch": package.arch,
-                        "repo_id": package.reponame,
-                        "path": package.relativepath,
-                        "remote_location": package.remote_location(),
-                        "checksum": (
-                            f"{hawkey.chksum_name(package.chksum[0])}:"
-                            f"{package.chksum[1].hex()}"
-                        )
-                    })
-                self.exit_with_success({
-                    "checksums": repo_checksums(base),
-                    "dependencies": dependencies
-                })
+                )
+            except dnf.exceptions.Error as e:
+                self.response_with_dnf_error(
+                    type(e).__name__,
+                    f"Error occurred when setting up repo: {e}")
 log.info("Starting the dnf-json server")
-SOCKET_FILE_PATH = "/run/osbuild-dnf-json/api.sock"
-
 LISTEN_FDS = int(os.environ.get("LISTEN_FDS", 0))
-LISTEN_PID = os.environ.get("LISTEN_PID", None) or os.getpid()
-
-log.debug("env %d and %d", LISTEN_FDS, LISTEN_PID)
 class SystemDActivationSocketServer(socketserver.UnixStreamServer):
     def server_bind(self):
         log.debug("service bind")
-        if LISTEN_FDS == 0:
-            log.debug("create new socket")
-            socketserver.UnixStreamServer.server_bind(self)
-        else:
-            log.debug("rebind socket")
-            log.debug("address_family: %d ", self.address_family)
-            log.debug("socket_type: %d ", self.socket_type)
-            self.socket = socket.fromfd(3, self.address_family, self.socket_type)
+        assert LISTEN_FDS == 1
+        log.debug("rebind socket")
+        log.debug("address_family: %d ", self.address_family)
+        log.debug("socket_type: %d ", self.socket_type)
+        self.socket = socket.fromfd(3, self.address_family, self.socket_type)
-server = SystemDActivationSocketServer(SOCKET_FILE_PATH, DnfJsonRequestHandler)
+server = SystemDActivationSocketServer('', DnfJsonRequestHandler)
 server.serve_forever()
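
Illustration (not part of the patch): with this split, the Solver can in
principle be exercised without the HTTP/socket layer, for example from a
small test script. The sketch below assumes the Solver class has been made
importable (e.g. copied into a test module, with python3-dnf installed);
the repository description, platform id, architecture and package spec are
placeholder values, not taken from the patch.

    import tempfile

    # Placeholder repository description -- the keys match what _dnfrepo()
    # accepts ("id" plus one of "baseurl"/"metalink"/"mirrorlist", optional
    # ssl options and "metadata_expire"); the values here are made up.
    repos = [{
        "id": "fedora",
        "metalink": "https://mirrors.fedoraproject.org/metalink?repo=fedora-35&arch=x86_64",
        "metadata_expire": "20s",
    }]

    with tempfile.TemporaryDirectory() as persistdir, \
            tempfile.TemporaryDirectory() as cachedir:
        solver = Solver(repos, "platform:f35", persistdir, cachedir, "x86_64")

        # dump() returns the available packages plus per-repo checksums.
        dump = solver.dump()
        print(len(dump["packages"]), dump["checksums"])

        # depsolve() resolves package specs into a dependency list.
        result = solver.depsolve(["osbuild"], [])
        for pkg in result["dependencies"]:
            print(pkg["name"], pkg["version"], pkg["repo_id"])

The request handler then keeps only request/response handling and maps dnf
exceptions (MarkingErrors, DepsolveError, Error) to JSON error responses.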