dnf-json: refactor

Prepare for the multi-cache architecture with some refactoring.
The main change is a new Solver class that encapsulates all the dnf
logic, so that the responsibility of communicating on the socket is
now separated from the responsibility of depsolving.
Thomas Lavocat 2021-11-23 16:22:23 +01:00 committed by Thomas Lavocat
parent ca126e9747
commit f8281eee54
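
The diff below introduces the Solver class described above. As orientation, here is a minimal usage sketch that is not part of the commit: it drives Solver directly with a made-up repository description (the "fedora" id, metalink URL, platform id and cachedir are illustrative values only) and assumes the class is in scope, e.g. when run from inside the dnf-json script itself.

import tempfile

# Hypothetical repository description: the keys are the ones Solver._dnfrepo()
# reads, the values are examples.
repos = [{
    "id": "fedora",
    "metalink": "https://mirrors.fedoraproject.org/metalink?repo=fedora-35&arch=x86_64",
}]

with tempfile.TemporaryDirectory() as persistdir:
    solver = Solver(
        repos,
        "platform:f35",              # module_platform_id (example value)
        persistdir,
        "/var/tmp/dnf-json-cache",   # cachedir (example path)
        "x86_64",
    )
    # Depsolve one package spec with no excludes; the returned dict mirrors
    # the JSON payload the socket handler sends back for "depsolve".
    result = solver.depsolve(["vim-enhanced"], [])
    print(result["checksums"])
    print(len(result["dependencies"]), "packages in the transaction")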

dnf-json (372 changed lines)

@@ -8,130 +8,183 @@ import os
 import socket
 import socketserver
 import logging
+import sys
 from http.server import BaseHTTPRequestHandler
 import dnf
 import hawkey
-from systemd.journal import JournalHandler
+
+formatter = logging.Formatter(
+    fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s",
+    datefmt="%Y.%m.%d %H:%M:%S"
+)
+handler = logging.StreamHandler(stream=sys.stdout)
+handler.setFormatter(formatter)
 
 log = logging.getLogger('dnf-json')
-log.addHandler(JournalHandler())
+log.addHandler(handler)
 log.setLevel(logging.INFO)
 
-DNF_ERROR_EXIT_CODE = 10
-
-
-def timestamp_to_rfc3339(timestamp, package):
-    d = datetime.datetime.utcfromtimestamp(package.buildtime)
-    return d.strftime('%Y-%m-%dT%H:%M:%SZ')
-
-
-def dnfrepo(desc, parent_conf=None):
-    """Makes a dnf.repo.Repo out of a JSON repository description"""
-
-    repo = dnf.repo.Repo(desc["id"], parent_conf)
-
-    if "baseurl" in desc:
-        repo.baseurl = desc["baseurl"]
-    elif "metalink" in desc:
-        repo.metalink = desc["metalink"]
-    elif "mirrorlist" in desc:
-        repo.mirrorlist = desc["mirrorlist"]
-    else:
-        assert False
-
-    if desc.get("ignoressl", False):
-        repo.sslverify = False
-    if "sslcacert" in desc:
-        repo.sslcacert = desc["sslcacert"]
-    if "sslclientkey" in desc:
-        repo.sslclientkey = desc["sslclientkey"]
-    if "sslclientcert" in desc:
-        repo.sslclientcert = desc["sslclientcert"]
-
-    # In dnf, the default metadata expiration time is 48 hours. However,
-    # some repositories never expire the metadata, and others expire it much
-    # sooner than that. We therefore allow this to be configured. If nothing
-    # is provided we error on the side of checking if we should invalidate
-    # the cache. If cache invalidation is not necessary, the overhead of
-    # checking is in the hundreds of milliseconds. In order to avoid this
-    # overhead accumulating for API calls that consist of several dnf calls,
-    # we set the expiration to a short time period, rather than 0.
-    repo.metadata_expire = desc.get("metadata_expire", "20s")
-
-    return repo
-
-
-def create_base(repos, module_platform_id, persistdir, cachedir, arch):
-    base = dnf.Base()
-
-    # Enable fastestmirror to ensure we choose the fastest mirrors for
-    # downloading metadata (when depsolving) and downloading packages.
-    base.conf.fastestmirror = True
-    # We use the same cachedir for multiple architectures. Unfortunately,
-    # this is something that doesn't work well in certain situations
-    # with zchunk:
-    # Imagine that we already have cache for arch1. Then, we use dnf-json
-    # to depsolve for arch2. If ZChunk is enabled and available (that's
-    # the case for Fedora), dnf will try to download only differences
-    # between arch1 and arch2 metadata. But, as these are completely
-    # different, dnf must basically redownload everything.
-    # For downloding deltas, zchunk uses HTTP range requests. Unfortunately,
-    # if the mirror doesn't support multi range requests, then zchunk will
-    # download one small segment per a request. Because we need to update
-    # the whole metadata (10s of MB), this can be extremely slow in some cases.
-    # I think that we can come up with a better fix but let's just disable
-    # zchunk for now. As we are already downloading a lot of data when
-    # building images, I don't care if we download even more.
-    base.conf.zchunk = False
-    # Try another mirror if it takes longer than 5 seconds to connect.
-    base.conf.timeout = 5
-
-    # Set the rest of the dnf configuration.
-    base.conf.module_platform_id = module_platform_id
-    base.conf.config_file_path = "/dev/null"
-    base.conf.persistdir = persistdir
-    base.conf.cachedir = cachedir
-    base.conf.substitutions['arch'] = arch
-    base.conf.substitutions['basearch'] = dnf.rpm.basearch(arch)
-
-    for repo in repos:
-        base.repos.add(dnfrepo(repo, base.conf))
-    base.fill_sack(load_system_repo=False)
-
-    return base
-
-
-def repo_checksums(base):
-    checksums = {}
-    for repo in base.repos.iter_enabled():
-        # Uses the same algorithm as libdnf to find cache dir:
-        # https://github.com/rpm-software-management/libdnf/blob/master/libdnf/repo/Repo.cpp#L1288
-        if repo.metalink:
-            url = repo.metalink
-        elif repo.mirrorlist:
-            url = repo.mirrorlist
-        elif repo.baseurl:
-            url = repo.baseurl[0]
-        else:
-            assert False
-        digest = hashlib.sha256(url.encode()).hexdigest()[:16]
-
-        repomd_file = f"{repo.id}-{digest}/repodata/repomd.xml"
-        with open(f"{base.conf.cachedir}/{repomd_file}", "rb") as f:
-            repomd = f.read()
-
-        checksums[repo.id] = "sha256:" + hashlib.sha256(repomd).hexdigest()
-
-    return checksums
+
+class Solver():
+
+    def __init__(self, repos, module_platform_id, persistdir, cachedir, arch):
+        self.base = dnf.Base()
+
+        # Enable fastestmirror to ensure we choose the fastest mirrors for
+        # downloading metadata (when depsolving) and downloading packages.
+        self.base.conf.fastestmirror = True
+        # We use the same cachedir for multiple architectures. Unfortunately,
+        # this is something that doesn't work well in certain situations
+        # with zchunk:
+        # Imagine that we already have cache for arch1. Then, we use dnf-json
+        # to depsolve for arch2. If ZChunk is enabled and available (that's
+        # the case for Fedora), dnf will try to download only differences
+        # between arch1 and arch2 metadata. But, as these are completely
+        # different, dnf must basically redownload everything.
+        # For downloding deltas, zchunk uses HTTP range requests. Unfortunately,
+        # if the mirror doesn't support multi range requests, then zchunk will
+        # download one small segment per a request. Because we need to update
+        # the whole metadata (10s of MB), this can be extremely slow in some cases.
+        # I think that we can come up with a better fix but let's just disable
+        # zchunk for now. As we are already downloading a lot of data when
+        # building images, I don't care if we download even more.
+        self.base.conf.zchunk = False
+        # Try another mirror if it takes longer than 5 seconds to connect.
+        self.base.conf.timeout = 5
+
+        # Set the rest of the dnf configuration.
+        self.base.conf.module_platform_id = module_platform_id
+        self.base.conf.config_file_path = "/dev/null"
+        self.base.conf.persistdir = persistdir
+        self.base.conf.cachedir = cachedir
+        self.base.conf.substitutions['arch'] = arch
+        self.base.conf.substitutions['basearch'] = dnf.rpm.basearch(arch)
+
+        for repo in repos:
+            self.base.repos.add(self._dnfrepo(repo, self.base.conf))
+        self.base.fill_sack(load_system_repo=False)
+
+    def _dnfrepo(self, desc, parent_conf=None):
+        """Makes a dnf.repo.Repo out of a JSON repository description"""
+        repo = dnf.repo.Repo(desc["id"], parent_conf)
+
+        if "baseurl" in desc:
+            repo.baseurl = desc["baseurl"]
+        elif "metalink" in desc:
+            repo.metalink = desc["metalink"]
+        elif "mirrorlist" in desc:
+            repo.mirrorlist = desc["mirrorlist"]
+        else:
+            assert False
+
+        if desc.get("ignoressl", False):
+            repo.sslverify = False
+        if "sslcacert" in desc:
+            repo.sslcacert = desc["sslcacert"]
+        if "sslclientkey" in desc:
+            repo.sslclientkey = desc["sslclientkey"]
+        if "sslclientcert" in desc:
+            repo.sslclientcert = desc["sslclientcert"]
+
+        # In dnf, the default metadata expiration time is 48 hours. However,
+        # some repositories never expire the metadata, and others expire it much
+        # sooner than that. We therefore allow this to be configured. If nothing
+        # is provided we error on the side of checking if we should invalidate
+        # the cache. If cache invalidation is not necessary, the overhead of
+        # checking is in the hundreds of milliseconds. In order to avoid this
+        # overhead accumulating for API calls that consist of several dnf calls,
+        # we set the expiration to a short time period, rather than 0.
+        repo.metadata_expire = desc.get("metadata_expire", "20s")
+
+        return repo
+
+    def _repo_checksums(self):
+        checksums = {}
+        for repo in self.base.repos.iter_enabled():
+            # Uses the same algorithm as libdnf to find cache dir:
+            # https://github.com/rpm-software-management/libdnf/blob/master/libdnf/repo/Repo.cpp#L1288
+            if repo.metalink:
+                url = repo.metalink
+            elif repo.mirrorlist:
+                url = repo.mirrorlist
+            elif repo.baseurl:
+                url = repo.baseurl[0]
+            else:
+                assert False
+            digest = hashlib.sha256(url.encode()).hexdigest()[:16]
+
+            repomd_file = f"{repo.id}-{digest}/repodata/repomd.xml"
+            with open(f"{self.base.conf.cachedir}/{repomd_file}", "rb") as f:
+                repomd = f.read()
+
+            checksums[repo.id] = "sha256:" + hashlib.sha256(repomd).hexdigest()
+
+        return checksums
+
+    def _timestamp_to_rfc3339(self, timestamp):
+        d = datetime.datetime.utcfromtimestamp(timestamp)
+        return d.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+    def dump(self):
+        packages = []
+        for package in self.base.sack.query().available():
+            packages.append({
+                "name": package.name,
+                "summary": package.summary,
+                "description": package.description,
+                "url": package.url,
+                "epoch": package.epoch,
+                "version": package.version,
+                "release": package.release,
+                "arch": package.arch,
+                "buildtime": self._timestamp_to_rfc3339(package.buildtime),
+                "license": package.license
+            })
+        return {
+            "checksums": self._repo_checksums(),
+            "packages": packages
+        }
+
+    def depsolve(self, package_spec, exclude_spec):
+        self.base.install_specs(package_spec, exclude_spec)
+        self.base.resolve()
+        dependencies = []
+        for tsi in self.base.transaction:
+            # Avoid using the install_set() helper, as it does not guarantee
+            # a stable order
+            if tsi.action not in dnf.transaction.FORWARD_ACTIONS:
+                continue
+            package = tsi.pkg
+            dependencies.append({
+                "name": package.name,
+                "epoch": package.epoch,
+                "version": package.version,
+                "release": package.release,
+                "arch": package.arch,
+                "repo_id": package.reponame,
+                "path": package.relativepath,
+                "remote_location": package.remote_location(),
+                "checksum": (
+                    f"{hawkey.chksum_name(package.chksum[0])}:"
+                    f"{package.chksum[1].hex()}"
+                )
+            })
+        return {
+            "checksums": self._repo_checksums(),
+            "dependencies": dependencies
+        }
+
 
 class DnfJsonRequestHandler(BaseHTTPRequestHandler):
     def _send(self):
         self.client_address=('',)
 
-    def exit_with_dnf_error(self, kind: str, reason: str):
+    def response_with_dnf_error(self, kind: str, reason: str):
         self._send()
         self.send_response(500)
         self.send_header("Content-Type", "application/json")
@@ -139,7 +192,7 @@ class DnfJsonRequestHandler(BaseHTTPRequestHandler):
         self.wfile.write(json.dumps({"kind": kind, "reason":
                                      reason}).encode("utf-8"))
 
-    def exit_with_success(self, json_object):
+    def response_success(self, json_object):
         self._send()
         self.send_response(200)
         self.send_header("Content-Type", "application/json")
@@ -159,114 +212,57 @@ class DnfJsonRequestHandler(BaseHTTPRequestHandler):
         with tempfile.TemporaryDirectory() as persistdir:
             try:
-                base = create_base(
+                solver = Solver(
                     repos,
                     module_platform_id,
                     persistdir,
                     cachedir,
                     arch
                 )
-            except dnf.exceptions.Error as e:
-                return self.exit_with_dnf_error(
-                    type(e).__name__,
-                    f"Error occurred when setting up repo: {e}"
-                )
-
-            if command == "dump":
-                packages = []
-                for package in base.sack.query().available():
-                    packages.append({
-                        "name": package.name,
-                        "summary": package.summary,
-                        "description": package.description,
-                        "url": package.url,
-                        "epoch": package.epoch,
-                        "version": package.version,
-                        "release": package.release,
-                        "arch": package.arch,
-                        "buildtime": timestamp_to_rfc3339(package.buildtime, package),
-                        "license": package.license
-                    })
-                self.exit_with_success({
-                    "checksums": repo_checksums(base),
-                    "packages": packages
-                })
-            elif command == "depsolve":
-                log.info("start depsolve")
-                errors = []
-                try:
-                    base.install_specs(
-                        arguments["package-specs"],
-                        exclude=arguments.get("exclude-specs", [])
-                    )
-                except dnf.exceptions.MarkingErrors as e:
-                    log.info("error install_specs")
-                    return self.exit_with_dnf_error(
-                        "MarkingErrors",
-                        f"Error occurred when marking packages for installation: {e}"
-                    )
-                try:
-                    base.resolve()
-                except dnf.exceptions.DepsolveError as e:
-                    log.info("error depsolve")
-                    return self.exit_with_dnf_error(
-                        "DepsolveError",
-                        (
-                            "There was a problem depsolving "
-                            f"{arguments['package-specs']}: {e}"
-                        )
-                    )
-                dependencies = []
-                for tsi in base.transaction:
-                    # Avoid using the install_set() helper, as it does not guarantee
-                    # a stable order
-                    if tsi.action not in dnf.transaction.FORWARD_ACTIONS:
-                        continue
-                    package = tsi.pkg
-                    dependencies.append({
-                        "name": package.name,
-                        "epoch": package.epoch,
-                        "version": package.version,
-                        "release": package.release,
-                        "arch": package.arch,
-                        "repo_id": package.reponame,
-                        "path": package.relativepath,
-                        "remote_location": package.remote_location(),
-                        "checksum": (
-                            f"{hawkey.chksum_name(package.chksum[0])}:"
-                            f"{package.chksum[1].hex()}"
-                        )
-                    })
-                self.exit_with_success({
-                    "checksums": repo_checksums(base),
-                    "dependencies": dependencies
-                })
+                if command == "dump":
+                    self.response_success(solver.dump())
+                    log.info("dump success")
+                elif command == "depsolve":
+                    self.response_success(
+                        solver.depsolve(
+                            arguments["package-specs"],
+                            arguments.get("exclude-specs", [])
+                        )
+                    )
+                    log.info("depsolve success")
+            except dnf.exceptions.MarkingErrors as e:
+                log.info("error install_specs")
+                self.response_with_dnf_error(
+                    "MarkingErrors",
+                    f"Error occurred when marking packages for installation: {e}"
+                )
+            except dnf.exceptions.DepsolveError as e:
+                log.info("error depsolve")
+                self.response_with_dnf_error(
+                    "DepsolveError",
+                    (
+                        "There was a problem depsolving "
+                        f"{arguments['package-specs']}: {e}"
+                    )
+                )
+            except dnf.exceptions.Error as e:
+                self.response_with_dnf_error(
+                    type(e).__name__,
+                    f"Error occurred when setting up repo: {e}")
 
 log.info("Starting the dnf-json server")
-SOCKET_FILE_PATH = "/run/osbuild-dnf-json/api.sock"
 LISTEN_FDS = int(os.environ.get("LISTEN_FDS", 0))
+LISTEN_PID = os.environ.get("LISTEN_PID", None) or os.getpid()
+log.debug("env %d and %d", LISTEN_FDS, LISTEN_PID)
 
 class SystemDActivationSocketServer(socketserver.UnixStreamServer):
     def server_bind(self):
         log.debug("service bind")
-        if LISTEN_FDS == 0:
-            log.debug("create new socket")
-            socketserver.UnixStreamServer.server_bind(self)
-        else:
-            log.debug("rebind socket")
-            log.debug("address_family: %d ", self.address_family)
-            log.debug("socket_type: %d ", self.socket_type)
-            self.socket = socket.fromfd(3, self.address_family, self.socket_type)
+        assert LISTEN_FDS == 1
+        log.debug("rebind socket")
+        log.debug("address_family: %d ", self.address_family)
+        log.debug("socket_type: %d ", self.socket_type)
+        self.socket = socket.fromfd(3, self.address_family, self.socket_type)
 
-server = SystemDActivationSocketServer(SOCKET_FILE_PATH, DnfJsonRequestHandler)
+server = SystemDActivationSocketServer('', DnfJsonRequestHandler)
 server.serve_forever()
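
A note on the last hunk: server_bind() no longer falls back to binding its own socket; it asserts LISTEN_FDS == 1 and adopts the listening socket that systemd passes in as file descriptor 3, so the server now only runs socket-activated. Below is a minimal sketch of that handoff, independent of this script; the socket path mentioned in the comment is the one the old code bound itself, and whether the accompanying .socket unit uses exactly that path is an assumption.

import os
import socket

# systemd sets LISTEN_FDS (and LISTEN_PID) and passes the already-bound
# listening socket as fd 3 (SD_LISTEN_FDS_START). A socket-activated service
# adopts that fd instead of binding e.g. /run/osbuild-dnf-json/api.sock itself.
if int(os.environ.get("LISTEN_FDS", 0)) == 1:
    listener = socket.fromfd(3, socket.AF_UNIX, socket.SOCK_STREAM)
    conn, _ = listener.accept()   # handle requests on connections from here
else:
    raise RuntimeError("expected to be started by a systemd .socket unit")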