By default `timeout` is 30 seconds, but we had it set to 5. Drop the override and use the default. This has two effects: it increases the time before we give up on connecting (as it says on the tin), and it also increases the time a download has to be slow for before we give up. Internally, we were seeing failures in downloading metadata from ODCS, and similar issues have occurred in CI too. The potential downside is that, when there are several mirrors, it takes longer before giving up on a bad one and trying a better one. But slow is better than broken, so for now revert to the default behavior. Signed-off-by: Tom Gundersen <teg@jklm.no>
406 lines
15 KiB
Python
Executable file
406 lines
15 KiB
Python
Executable file
#!/usr/bin/python3
|
|
# pylint: disable=invalid-name
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import pickle
|
|
import shutil
|
|
import socket
|
|
import socketserver
|
|
import sys
|
|
import tempfile
|
|
|
|
from datetime import datetime, timedelta
|
|
from http.server import BaseHTTPRequestHandler
|
|
from multiprocessing import Lock
|
|
|
|
import dnf
|
|
import hawkey
|
|
|
|
# Logging setup (to systemd if available)
# Records are written to stdout as "<timestamp> <logger>.<LEVEL>: <message>".
# NOTE(review): presumably stdout is what ends up in the journal when the
# script runs as a systemd service -- confirm against the unit file.
log = logging.getLogger('dnf-json')
log.setLevel(logging.INFO)

formatter = logging.Formatter(
    "%(asctime)s %(name)s.%(levelname)s: %(message)s",
    "%Y.%m.%d %H:%M:%S",
)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
log.addHandler(handler)

# Synchronisation necessary for the multiprocess request handling.
# Request handlers take this lock for the whole duration of a POST request.
process_lock = Lock()
|
|
|
|
|
|
class CacheState():
    """
    A CacheState keeps track of the cache folders.

    CacheState objects can update the list of used cache folders, which
    will associate a date object to the name of the folder.
    CacheState objects can ask to clean the unused cache folders.
    Meaning that the folders having reached a certain timeout will be deleted.

    The state is persisted with pickle in a file named "cache_state.pkl"
    located directly inside cache_dir (see load() and store()).
    """

    def __init__(self, cache_dir, cache_timeout, folder_dict=None):
        """
        cache_dir       the place where the state is stored
        cache_timeout   a timedelta before a cache folder can be deleted
        folder_dict     a dict containing the existing list of cache folders
                        (folder path -> datetime of last use); a new empty
                        dict is created when omitted
        """
        self.cache_dir = cache_dir
        self.folder_dict = {} if folder_dict is None else folder_dict
        self.cache_timeout = cache_timeout

    def update_used(self, folder):
        """
        Associate a datetime.now() to the folder given as parameter
        """
        log.debug("Folder %s was used", folder)
        self.folder_dict[folder] = datetime.now()

    def clean_unused(self):
        """
        Delete the folders having reached the timeout

        A folder is expired when the time elapsed since its last recorded use
        is strictly greater than cache_timeout. Expired folders are removed
        from folder_dict and from disk.
        """
        log.info("clean unused folders")
        now = datetime.now()
        expired = []
        # Collect first, delete afterwards: folder_dict must not change size
        # while it is being iterated.
        for folder, then in self.folder_dict.items():
            delta = now - then
            log.debug("delete %s if %s > than %s", folder, delta, self.cache_timeout)
            if delta > self.cache_timeout:
                expired.append(folder)
        for folder in expired:
            del self.folder_dict[folder]
            try:
                shutil.rmtree(folder)
            except FileNotFoundError:
                # A folder can be tracked without existing on disk (it is
                # recorded as used even when the request that would have
                # populated it failed) or may have been removed externally.
                # Nothing is left to clean up in that case.
                log.debug("Folder %s does not exist, nothing to delete", folder)

    @classmethod
    def load(cls, cache_dir):
        """
        Load the state stored in cache_dir.

        Returns a fresh state with a 24 hours timeout when no state file
        exists yet, or when the existing file is truncated or not a valid
        pickle (e.g. a previous store() was interrupted).
        """
        path = os.path.join(cache_dir, "cache_state.pkl")
        try:
            with open(path, "rb") as inp:
                # NOTE(review): pickle.load can execute arbitrary code from
                # the file; cache_dir may come from the request when
                # OVERWRITE_CACHE_DIR is unset, so it must be trusted.
                return pickle.load(inp)
        except FileNotFoundError:
            return cls(cache_dir, timedelta(hours=24))
        except (EOFError, pickle.UnpicklingError):
            log.warning("Ignoring unreadable cache state in %s", path)
            return cls(cache_dir, timedelta(hours=24))

    def store(self):
        """
        Pickle this object into "cache_state.pkl" inside cache_dir.

        cache_dir is created first if it does not exist.
        """
        os.makedirs(self.cache_dir, exist_ok=True)
        with open(os.path.join(self.cache_dir, "cache_state.pkl"), "wb") as outp:
            return pickle.dump(self, outp)
|
|
|
|
|
|
class Solver():
    """
    Wraps a dnf.Base configured from a list of JSON repository descriptions
    and answers "dump" (list available packages) and "depsolve" queries.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, repos, module_platform_id, persistdir, cachedir, arch):
        """
        repos               iterable of JSON repository descriptions (dicts),
                            each converted with _dnfrepo()
        module_platform_id  value for dnf's module_platform_id option
        persistdir          value for dnf's persistdir option
        cachedir            value for dnf's cachedir option
        arch                architecture used for the 'arch' substitution and
                            to derive the 'basearch' substitution

        Raises ValueError when a repository description has no baseurl,
        metalink or mirrorlist.
        """
        self.base = dnf.Base()

        # Enable fastestmirror to ensure we choose the fastest mirrors for
        # downloading metadata (when depsolving) and downloading packages.
        self.base.conf.fastestmirror = True

        # We use the same cachedir for multiple architectures. Unfortunately,
        # this is something that doesn't work well in certain situations
        # with zchunk:
        # Imagine that we already have cache for arch1. Then, we use dnf-json
        # to depsolve for arch2. If ZChunk is enabled and available (that's
        # the case for Fedora), dnf will try to download only differences
        # between arch1 and arch2 metadata. But, as these are completely
        # different, dnf must basically redownload everything.
        # For downloading deltas, zchunk uses HTTP range requests. Unfortunately,
        # if the mirror doesn't support multi range requests, then zchunk will
        # download one small segment per a request. Because we need to update
        # the whole metadata (10s of MB), this can be extremely slow in some cases.
        # I think that we can come up with a better fix but let's just disable
        # zchunk for now. As we are already downloading a lot of data when
        # building images, I don't care if we download even more.
        self.base.conf.zchunk = False

        # Set the rest of the dnf configuration.
        self.base.conf.module_platform_id = module_platform_id
        self.base.conf.config_file_path = "/dev/null"
        self.base.conf.persistdir = persistdir
        self.base.conf.cachedir = cachedir
        self.base.conf.substitutions['arch'] = arch
        self.base.conf.substitutions['basearch'] = dnf.rpm.basearch(arch)

        for repo in repos:
            self.base.repos.add(self._dnfrepo(repo, self.base.conf))
        self.base.fill_sack(load_system_repo=False)

    @staticmethod
    def _dnfrepo(desc, parent_conf=None):
        """
        Makes a dnf.repo.Repo out of a JSON repository description

        desc must contain "id" and one of "baseurl", "metalink" or
        "mirrorlist" (checked in that order, the first one present wins).
        Optional keys: "name", "ignoressl", "sslcacert", "sslclientkey",
        "sslclientcert" and "metadata_expire".

        Raises ValueError when none of the three URL keys is present.
        """
        # The description comes from the request, so validate it with a real
        # exception (an assert would be stripped under -O) and do it before
        # any dnf object is built.
        if not any(key in desc for key in ("baseurl", "metalink", "mirrorlist")):
            raise ValueError(
                f"repository {desc.get('id')!r} has none of baseurl, metalink or mirrorlist"
            )

        repo = dnf.repo.Repo(desc["id"], parent_conf)

        if "name" in desc:
            repo.name = desc["name"]
        if "baseurl" in desc:
            repo.baseurl = desc["baseurl"]
        elif "metalink" in desc:
            repo.metalink = desc["metalink"]
        else:
            repo.mirrorlist = desc["mirrorlist"]

        if desc.get("ignoressl", False):
            repo.sslverify = False
        if "sslcacert" in desc:
            repo.sslcacert = desc["sslcacert"]
        if "sslclientkey" in desc:
            repo.sslclientkey = desc["sslclientkey"]
        if "sslclientcert" in desc:
            repo.sslclientcert = desc["sslclientcert"]

        # In dnf, the default metadata expiration time is 48 hours. However,
        # some repositories never expire the metadata, and others expire it much
        # sooner than that. We therefore allow this to be configured. If nothing
        # is provided we error on the side of checking if we should invalidate
        # the cache. If cache invalidation is not necessary, the overhead of
        # checking is in the hundreds of milliseconds. In order to avoid this
        # overhead accumulating for API calls that consist of several dnf calls,
        # we set the expiration to a short time period, rather than 0.
        repo.metadata_expire = desc.get("metadata_expire", "20s")

        return repo

    def _repo_checksums(self):
        """
        Return a dict mapping each enabled repo id to "sha256:<hexdigest>" of
        the repomd.xml file found in that repo's cache folder.

        Raises ValueError when an enabled repo has neither metalink,
        mirrorlist nor baseurl set.
        """
        checksums = {}
        for repo in self.base.repos.iter_enabled():
            # Uses the same algorithm as libdnf to find cache dir:
            #   https://github.com/rpm-software-management/libdnf/blob/master/libdnf/repo/Repo.cpp#L1288
            if repo.metalink:
                url = repo.metalink
            elif repo.mirrorlist:
                url = repo.mirrorlist
            elif repo.baseurl:
                url = repo.baseurl[0]
            else:
                # Raise explicitly: a stripped assert would silently reuse
                # the url of the previous loop iteration.
                raise ValueError(f"repository {repo.id!r} has no URL configured")

            digest = hashlib.sha256(url.encode()).hexdigest()[:16]

            repomd_file = f"{repo.id}-{digest}/repodata/repomd.xml"
            with open(f"{self.base.conf.cachedir}/{repomd_file}", "rb") as f:
                repomd = f.read()

            checksums[repo.id] = "sha256:" + hashlib.sha256(repomd).hexdigest()

        return checksums

    @staticmethod
    def _timestamp_to_rfc3339(timestamp):
        """
        Format a POSIX timestamp as a UTC "YYYY-MM-DDTHH:MM:SSZ" string.
        """
        # datetime.utcfromtimestamp() is deprecated since Python 3.12; the
        # timezone-aware call below produces the same string. Imported here
        # because the file-level import only brings in datetime and timedelta.
        from datetime import timezone  # pylint: disable=import-outside-toplevel

        moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        return moment.strftime('%Y-%m-%dT%H:%M:%SZ')

    def dump(self):
        """
        Return {"checksums": ..., "packages": [...]} describing every package
        returned by the sack's available() query.
        """
        packages = [
            {
                "name": package.name,
                "summary": package.summary,
                "description": package.description,
                "url": package.url,
                "repo_id": package.repoid,
                "epoch": package.epoch,
                "version": package.version,
                "release": package.release,
                "arch": package.arch,
                "buildtime": self._timestamp_to_rfc3339(package.buildtime),
                "license": package.license,
            }
            for package in self.base.sack.query().available()
        ]
        return {
            "checksums": self._repo_checksums(),
            "packages": packages
        }

    def depsolve(self, package_spec, exclude_spec):
        """
        Resolve package_spec (minus exclude_spec) and return
        {"checksums": ..., "dependencies": [...]} with one entry per package
        of the resulting transaction whose action is a forward action.

        Exceptions raised by dnf's install_specs() and resolve() are not
        caught here.
        """
        self.base.install_specs(package_spec, exclude_spec)
        self.base.resolve()
        dependencies = []
        for tsi in self.base.transaction:
            # Avoid using the install_set() helper, as it does not guarantee
            # a stable order
            if tsi.action not in dnf.transaction.FORWARD_ACTIONS:
                continue
            package = tsi.pkg

            dependencies.append({
                "name": package.name,
                "epoch": package.epoch,
                "version": package.version,
                "release": package.release,
                "arch": package.arch,
                "repo_id": package.repoid,
                "path": package.relativepath,
                "remote_location": package.remote_location(),
                "checksum": (
                    f"{hawkey.chksum_name(package.chksum[0])}:"
                    f"{package.chksum[1].hex()}"
                )
            })
        return {
            "checksums": self._repo_checksums(),
            "dependencies": dependencies
        }
|
|
|
|
|
|
class DnfJsonRequestHandler(BaseHTTPRequestHandler):
    """
    Answers Http requests to depsolve or dump packages.

    A request is a POST whose body is a JSON object with a "command"
    ("dump" or "depsolve") and an "arguments" object. Every response is a
    JSON document; failures are sent with status 500 and contain "kind" and
    "reason" keys.
    """

    def __init__(self, *args, **kwargs):
        # cache_dir must exist before the base class constructor runs, since
        # socketserver's BaseRequestHandler handles the request from within
        # its __init__.
        self.cache_dir = None
        super().__init__(*args, **kwargs)

    def init_cache_folder_list(self, repos):
        """
        Return the cache folder path of each repository description in repos.

        The path is "<cache_dir>/<repo id>-<digest>", where digest is the
        first 16 hex characters of the sha256 of the repository URL. The URL
        is taken from the first of "baseurl", "metalink", "mirrorlist" that
        is present.

        Raises ValueError when a description has none of these keys.
        """
        cache_folders = []
        for repo in repos:
            for key in ("baseurl", "metalink", "mirrorlist"):
                if key in repo:
                    url = repo[key]
                    break
            else:
                # Request data: validate with a real exception, an assert
                # would be stripped under -O.
                raise ValueError(
                    f"repository {repo.get('id')!r} has none of baseurl, metalink or mirrorlist"
                )
            digest = hashlib.sha256(url.encode()).hexdigest()[:16]
            repoid = repo["id"]
            cache_folders.append(f"{self.cache_dir}/{repoid}-{digest}")
        return cache_folders

    def _send(self):
        # NOTE(review): presumably needed because the peer address of a Unix
        # socket is not a (host, port) tuple, which the request logging of
        # BaseHTTPRequestHandler indexes -- confirm.
        self.client_address = ('', 0)

    def _respond(self, status, json_object):
        """Send json_object as an application/json response with the given status."""
        self._send()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps(json_object).encode("utf-8"))

    def response_with_dnf_error(self, kind: str, reason: str):
        """Send a 500 response carrying {"kind": kind, "reason": reason}."""
        self._respond(500, {"kind": kind, "reason": reason})

    def response_failure(self, json_object):
        """Send json_object with status 500."""
        self._respond(500, json_object)

    def response_success(self, json_object):
        """Send json_object with status 200."""
        self._respond(200, json_object)

    def do_POST(self):
        """
        do_Post answer the request of a depsolving/dump.
        Depsolving and dump require synchronizing caches on disk to perform their
        operations. Caches folders are created for each remote repository. Since
        the user request contains the list of repository the space taken can grow
        indefinitely.

        This solution implements a cache cleaning mechanism. It works by deleting
        the directories on a timeout based rule and by keeping the last used date
        in a synced file on disks.

        This method is multiprocess safe.

        A 500 JSON failure is sent, and nothing else is done, when the
        request cannot be parsed or when no cache directory is configured.
        """
        with process_lock:
            try:
                content_len = int(self.headers.get('Content-Length'))
                data = self.rfile.read(content_len)
                call = json.loads(data.decode("utf-8"))
                command = call["command"]
                arguments = call["arguments"]
                repos = arguments.get("repos", {})
                arch = arguments["arch"]
                module_platform_id = arguments["module_platform_id"]
            except (TypeError, ValueError, KeyError) as e:
                # Missing Content-Length, malformed JSON or missing keys.
                log.info("error invalid request")
                self.response_failure({"kind": "Error", "reason": f"Invalid request: {e!r}"})
                return

            # If dnf-json is run as a service, we don't want users to be able to set the cache
            self.cache_dir = os.environ.get("OVERWRITE_CACHE_DIR", "")
            if self.cache_dir:
                self.cache_dir = os.path.join(self.cache_dir, arch)
            else:
                self.cache_dir = arguments.get("cachedir", "")

            if not self.cache_dir:
                self.response_failure({"kind": "Error", "reason": "No cache dir set"})
                # Stop here: continuing would run the solver without a cache
                # dir and send a second response.
                return

            cache_state = CacheState.load(self.cache_dir)

            # The cache bookkeeping in `finally` only starts once `repos` and
            # `cache_state` are bound, so it cannot mask an earlier error.
            try:
                with tempfile.TemporaryDirectory() as persistdir:
                    try:
                        solver = Solver(
                            repos,
                            module_platform_id,
                            persistdir,
                            self.cache_dir,
                            arch
                        )
                        if command == "dump":
                            self.response_success(solver.dump())
                            log.info("dump success")
                        elif command == "depsolve":
                            self.response_success(
                                solver.depsolve(
                                    arguments["package-specs"],
                                    arguments.get("exclude-specs", [])
                                )
                            )
                            log.info("depsolve success")
                        else:
                            # Always answer, otherwise the client gets no
                            # response at all.
                            log.info("error unknown command")
                            self.response_failure(
                                {"kind": "Error", "reason": f"Unknown command: {command}"}
                            )

                    except dnf.exceptions.MarkingErrors as e:
                        log.info("error install_specs")
                        self.response_with_dnf_error(
                            "MarkingErrors",
                            f"Error occurred when marking packages for installation: {e}"
                        )
                    except dnf.exceptions.DepsolveError as e:
                        log.info("error depsolve")
                        self.response_with_dnf_error(
                            "DepsolveError",
                            (
                                "There was a problem depsolving "
                                f"{arguments['package-specs']}: {e}"
                            )
                        )
                    except dnf.exceptions.Error as e:
                        self.response_with_dnf_error(
                            type(e).__name__,
                            f"Error occurred when setting up repo: {e}")
            finally:
                # Record the folders of this request as used, then expire the
                # stale ones and persist the state.
                for cache_folder in self.init_cache_folder_list(repos):
                    cache_state.update_used(cache_folder)
                cache_state.clean_unused()
                cache_state.store()
|
|
|
|
|
|
log.info("Starting the dnf-json server")

# Socket hand-over settings, read once from the environment at start-up.
# NOTE(review): these look like the systemd socket-activation variables --
# confirm against the service/socket unit files.
# LISTEN_FDS is only used to warn when it is greater than 1.
LISTEN_FDS = int(os.getenv("LISTEN_FDS", 0))
# Descriptor the server socket is built from.
# set from entrypoint if differs from 3
LISTEN_FD = int(os.getenv("LISTEN_FD", 3))
|
|
|
|
|
|
# The dnf-json web server has to use forks to serve the requests. Because the
# dnf library is leaking memory in its Cpp side.
class SystemDActivationSocketServer(socketserver.ForkingMixIn, socketserver.UnixStreamServer):
    """
    Forking Unix stream server that serves on an already opened descriptor
    (LISTEN_FD) instead of binding a socket of its own.
    """

    def server_bind(self):
        """
        Replace the server socket with one built from the LISTEN_FD
        descriptor, using this server's address family and socket type.

        Only a warning is logged when LISTEN_FDS is greater than 1; the
        descriptor used is LISTEN_FD in every case.
        """
        family = self.address_family
        kind = self.socket_type

        log.debug("service bind")
        log.debug("rebind socket")
        log.debug("address_family: %d ", family)
        log.debug("socket_type: %d ", kind)

        if LISTEN_FDS > 1:
            log.warning("More than one LISTEN_FDS")

        inherited = socket.fromfd(LISTEN_FD, family, kind)
        self.socket = inherited
|
|
|
|
|
|
# start the web server
# The server address is left empty: the listening socket is taken over in
# SystemDActivationSocketServer.server_bind() rather than bound to a path.
server = SystemDActivationSocketServer(
    server_address='',
    RequestHandlerClass=DnfJsonRequestHandler,
)
# Blocks and dispatches incoming requests to DnfJsonRequestHandler.
server.serve_forever()
|