#!/usr/bin/python3

import datetime
import hashlib
import json
import tempfile
import os
import socket
import socketserver
import logging
import sys
from http.server import BaseHTTPRequestHandler
import pathlib
from multiprocessing import Lock
import queue
import shutil
from datetime import datetime, timedelta

import dnf
import hawkey
import pickle

# Logging setup (to systemd if available)
formatter = logging.Formatter(
    fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s",
    datefmt="%Y.%m.%d %H:%M:%S"
)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(formatter)
log = logging.getLogger('dnf-json')
log.addHandler(handler)
log.setLevel(logging.INFO)

# Synchronisation necessary for the multiprocess request handling.
process_lock = Lock()


class CacheState():
    """
    A CacheState keeps track of the cache folders.

    CacheState objects can update the list of used cache folders, which
    associates a date with each folder path. CacheState objects can also be
    asked to clean the unused cache folders, meaning that folders that have
    reached a certain timeout are deleted.
    """

    def __init__(self, cache_dir, cache_timeout, folder_dict=None):
        """
        cache_dir      the place where the state is stored
        cache_timeout  a timedelta before a cache folder can be deleted
        folder_dict    an optional dict mapping existing cache folder paths
                       to their last-used date
        """
        self.cache_dir = cache_dir
        # Avoid a mutable default argument: each instance gets its own dict.
        self.folder_dict = folder_dict if folder_dict is not None else {}
        self.cache_timeout = cache_timeout

    def update_used(self, folder):
        """
        Associate datetime.now() with the folder given as parameter.
        """
        log.debug("Folder %s was used", folder)
        self.folder_dict[folder] = datetime.now()

    def clean_unused(self):
        """
        Delete the folders that have reached the timeout.
        """
        log.info("clean unused folders")
        now = datetime.now()
        list_folder_to_delete = []
        for folder, then in self.folder_dict.items():
            delta = now - then
            log.debug("delete %s if %s > %s", folder, delta, self.cache_timeout)
            if delta > self.cache_timeout:
                list_folder_to_delete.append(folder)
        for folder in list_folder_to_delete:
            del self.folder_dict[folder]
            shutil.rmtree(folder)

    @staticmethod
    def load_cache_state_from_disk(cache_dir):
        try:
            with open(os.path.join(cache_dir, "cache_state.pkl"), "rb") as inp:
                return pickle.load(inp)
        except FileNotFoundError:
            return CacheState(cache_dir, timedelta(hours=24))

    def store_on_disk(self):
        with open(os.path.join(self.cache_dir, "cache_state.pkl"), "wb") as outp:
            return pickle.dump(self, outp)


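# Illustrative only: the intended CacheState life cycle, mirroring what
# DnfJsonRequestHandler.do_POST() below does after every request (the paths
# here are made-up examples):
#
#   state = CacheState.load_cache_state_from_disk("/var/cache/dnf-json")
#   state.update_used("/var/cache/dnf-json/example-0123456789abcdef")
#   state.clean_unused()   # deletes folders unused for longer than cache_timeout
#   state.store_on_disk()

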
class Solver():

    def __init__(self, repos, module_platform_id, persistdir, cachedir, arch):
        self.base = dnf.Base()

        # Enable fastestmirror to ensure we choose the fastest mirrors for
        # downloading metadata (when depsolving) and downloading packages.
        self.base.conf.fastestmirror = True

        # We use the same cachedir for multiple architectures. Unfortunately,
        # this doesn't work well in certain situations with zchunk:
        # Imagine that we already have a cache for arch1. Then, we use dnf-json
        # to depsolve for arch2. If zchunk is enabled and available (which is
        # the case for Fedora), dnf will try to download only the differences
        # between the arch1 and arch2 metadata. But, as these are completely
        # different, dnf must basically redownload everything.
        # For downloading deltas, zchunk uses HTTP range requests. Unfortunately,
        # if the mirror doesn't support multi-range requests, zchunk downloads
        # one small segment per request. Because we need to update the whole
        # metadata (tens of MB), this can be extremely slow in some cases.
        # We can probably come up with a better fix, but let's just disable
        # zchunk for now. As we are already downloading a lot of data when
        # building images, downloading a bit more doesn't matter much.
        self.base.conf.zchunk = False

        # Try another mirror if it takes longer than 5 seconds to connect.
        self.base.conf.timeout = 5

        # Set the rest of the dnf configuration.
        self.base.conf.module_platform_id = module_platform_id
        self.base.conf.config_file_path = "/dev/null"
        self.base.conf.persistdir = persistdir
        self.base.conf.cachedir = cachedir
        self.base.conf.substitutions['arch'] = arch
        self.base.conf.substitutions['basearch'] = dnf.rpm.basearch(arch)

        for repo in repos:
            self.base.repos.add(self._dnfrepo(repo, self.base.conf))
        self.base.fill_sack(load_system_repo=False)

    def _dnfrepo(self, desc, parent_conf=None):
        """Makes a dnf.repo.Repo out of a JSON repository description"""

        repo = dnf.repo.Repo(desc["id"], parent_conf)

        if "baseurl" in desc:
            repo.baseurl = desc["baseurl"]
        elif "metalink" in desc:
            repo.metalink = desc["metalink"]
        elif "mirrorlist" in desc:
            repo.mirrorlist = desc["mirrorlist"]
        else:
            assert False, "repo description needs baseurl, metalink or mirrorlist"

        if desc.get("ignoressl", False):
            repo.sslverify = False
        if "sslcacert" in desc:
            repo.sslcacert = desc["sslcacert"]
        if "sslclientkey" in desc:
            repo.sslclientkey = desc["sslclientkey"]
        if "sslclientcert" in desc:
            repo.sslclientcert = desc["sslclientcert"]

        # In dnf, the default metadata expiration time is 48 hours. However,
        # some repositories never expire the metadata, and others expire it
        # much sooner than that. We therefore allow this to be configured. If
        # nothing is provided, we err on the side of checking whether we should
        # invalidate the cache. If cache invalidation is not necessary, the
        # overhead of checking is in the hundreds of milliseconds. In order to
        # avoid this overhead accumulating over API calls that consist of
        # several dnf calls, we set the expiration to a short time period
        # rather than 0.
        repo.metadata_expire = desc.get("metadata_expire", "20s")

        return repo

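    # Illustrative repository description accepted by _dnfrepo(). Only "id"
    # plus one of "baseurl", "metalink" or "mirrorlist" is required; the other
    # keys are optional and the values below are made-up examples:
    #
    #   {
    #       "id": "example",
    #       "baseurl": "https://example.com/repo",
    #       "ignoressl": False,
    #       "sslcacert": "/etc/pki/tls/certs/ca-bundle.crt",
    #       "metadata_expire": "20s"
    #   }
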
    def _repo_checksums(self):
        checksums = {}
        for repo in self.base.repos.iter_enabled():
            # Uses the same algorithm as libdnf to find the cache dir:
            # https://github.com/rpm-software-management/libdnf/blob/master/libdnf/repo/Repo.cpp#L1288
            if repo.metalink:
                url = repo.metalink
            elif repo.mirrorlist:
                url = repo.mirrorlist
            elif repo.baseurl:
                url = repo.baseurl[0]
            else:
                assert False, "enabled repo has no metalink, mirrorlist or baseurl"

            digest = hashlib.sha256(url.encode()).hexdigest()[:16]

            repomd_file = f"{repo.id}-{digest}/repodata/repomd.xml"
            with open(f"{self.base.conf.cachedir}/{repomd_file}", "rb") as f:
                repomd = f.read()

            checksums[repo.id] = "sha256:" + hashlib.sha256(repomd).hexdigest()

        return checksums

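    # For example, a repository with id "example" whose first baseurl is U has
    # its metadata cached under "<cachedir>/example-<sha256(U) truncated to 16
    # hex chars>/", which is where the repomd.xml hashed above is read from.
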
    def _timestamp_to_rfc3339(self, timestamp):
        d = datetime.utcfromtimestamp(timestamp)
        return d.strftime('%Y-%m-%dT%H:%M:%SZ')

    def dump(self):
        packages = []
        for package in self.base.sack.query().available():
            packages.append({
                "name": package.name,
                "summary": package.summary,
                "description": package.description,
                "url": package.url,
                "epoch": package.epoch,
                "version": package.version,
                "release": package.release,
                "arch": package.arch,
                "buildtime": self._timestamp_to_rfc3339(package.buildtime),
                "license": package.license
            })
        return {
            "checksums": self._repo_checksums(),
            "packages": packages
        }

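    # Shape of the dict returned by dump() (illustrative): {"checksums":
    # {"<repo id>": "sha256:<hex>"}, "packages": [<one dict per available
    # package, with the fields listed above>]}.
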
    def depsolve(self, package_spec, exclude_spec):
        self.base.install_specs(package_spec, exclude_spec)
        self.base.resolve()
        dependencies = []
        for tsi in self.base.transaction:
            # Avoid using the install_set() helper, as it does not guarantee
            # a stable order.
            if tsi.action not in dnf.transaction.FORWARD_ACTIONS:
                continue
            package = tsi.pkg

            dependencies.append({
                "name": package.name,
                "epoch": package.epoch,
                "version": package.version,
                "release": package.release,
                "arch": package.arch,
                "repo_id": package.reponame,
                "path": package.relativepath,
                "remote_location": package.remote_location(),
                "checksum": (
                    f"{hawkey.chksum_name(package.chksum[0])}:"
                    f"{package.chksum[1].hex()}"
                )
            })
        return {
            "checksums": self._repo_checksums(),
            "dependencies": dependencies
        }


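# Shape of the dict returned by Solver.depsolve() (illustrative):
# {"checksums": {"<repo id>": "sha256:<hex>"}, "dependencies": [<one dict per
# resolved package, with the fields assembled above>]}. This is what the
# request handler below sends back for the "depsolve" command.

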
class DnfJsonRequestHandler(BaseHTTPRequestHandler):
    """
    Answers HTTP requests to depsolve or dump packages.
    """

    def init_cache_folder_list(self, repos):
        cache_folders = []
        for repo in repos:
            if "baseurl" in repo:
                url = repo["baseurl"]
            elif "metalink" in repo:
                url = repo["metalink"]
            elif "mirrorlist" in repo:
                url = repo["mirrorlist"]
            else:
                assert False, "repo description needs baseurl, metalink or mirrorlist"
            digest = hashlib.sha256(url.encode()).hexdigest()[:16]
            repoid = repo["id"]
            cache_folders.append(f"{self.cache_dir}/{repoid}-{digest}")
        return cache_folders

    def _send(self):
        # A Unix stream socket does not provide a (host, port) peer address;
        # reset client_address to a tuple so that the BaseHTTPRequestHandler
        # logging helpers, which index it, keep working.
        self.client_address = ('',)

    def response_with_dnf_error(self, kind: str, reason: str):
        self._send()
        self.send_response(500)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps({"kind": kind, "reason": reason}).encode("utf-8"))

    def response_failure(self, json_object):
        self._send()
        self.send_response(500)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps(json_object).encode("utf-8"))

    def response_success(self, json_object):
        self._send()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps(json_object).encode("utf-8"))

    def do_POST(self):
        """
        do_POST answers a depsolve/dump request.

        Depsolving and dumping require synchronizing caches on disk to perform
        their operations. Cache folders are created for each remote repository.
        Since the user request contains the list of repositories, the space
        taken can grow indefinitely.

        This solution implements a cache cleaning mechanism. It works by
        deleting the directories on a timeout-based rule and by keeping the
        last-used date in a synced file on disk.

        This method is multiprocess safe.
        """
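        # Illustrative request body, using only field names that are parsed
        # below; the concrete values are made-up examples:
        #
        #   {
        #       "command": "depsolve",
        #       "arguments": {
        #           "repos": [{"id": "example", "baseurl": "https://example.com/repo"}],
        #           "arch": "x86_64",
        #           "module_platform_id": "platform:f36",
        #           "package-specs": ["@core"],
        #           "exclude-specs": [],
        #           "cachedir": "/var/cache/dnf-json"
        #       }
        #   }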
        with process_lock:
            cache_state = None
            try:
                content_len = int(self.headers.get('Content-Length'))
                data = self.rfile.read(content_len)
                call = json.loads(data.decode("utf-8"))
                command = call["command"]
                arguments = call["arguments"]
                repos = arguments.get("repos", {})
                arch = arguments["arch"]
                module_platform_id = arguments["module_platform_id"]

                # If dnf-json is run as a service, we don't want users to be
                # able to set the cache directory.
                self.cache_dir = os.environ.get("OVERWRITE_CACHE_DIR", "")
                if self.cache_dir:
                    self.cache_dir = os.path.join(self.cache_dir, arch)
                else:
                    self.cache_dir = arguments.get("cachedir", "")

                if not self.cache_dir:
                    self.response_failure({"kind": "Error", "reason": "No cache dir set"})
                    return
                cache_state = CacheState.load_cache_state_from_disk(self.cache_dir)

                with tempfile.TemporaryDirectory() as persistdir:
                    try:
                        solver = Solver(
                            repos,
                            module_platform_id,
                            persistdir,
                            self.cache_dir,
                            arch
                        )
                        if command == "dump":
                            self.response_success(solver.dump())
                            log.info("dump success")
                        elif command == "depsolve":
                            self.response_success(
                                solver.depsolve(
                                    arguments["package-specs"],
                                    arguments.get("exclude-specs", [])
                                )
                            )
                            log.info("depsolve success")

                    except dnf.exceptions.MarkingErrors as e:
                        log.info("error install_specs")
                        self.response_with_dnf_error(
                            "MarkingErrors",
                            f"Error occurred when marking packages for installation: {e}"
                        )
                    except dnf.exceptions.DepsolveError as e:
                        log.info("error depsolve")
                        self.response_with_dnf_error(
                            "DepsolveError",
                            (
                                "There was a problem depsolving "
                                f"{arguments['package-specs']}: {e}"
                            )
                        )
                    except dnf.exceptions.Error as e:
                        self.response_with_dnf_error(
                            type(e).__name__,
                            f"Error occurred when setting up repo: {e}")
            finally:
                # Record the cache folders touched by this request and drop the
                # ones unused for longer than the timeout. There is nothing to
                # clean up if we never got as far as loading the cache state.
                if cache_state is not None:
                    for cache_folder in self.init_cache_folder_list(repos):
                        cache_state.update_used(cache_folder)
                    cache_state.clean_unused()
                    cache_state.store_on_disk()


log.info("Starting the dnf-json server")
|
|
|
|
LISTEN_FDS = int(os.environ.get("LISTEN_FDS", 0))
|
|
# set from entrypoint if differs from 3
|
|
LISTEN_FD = int(os.environ.get("LISTEN_FD", 3))
|
|
|
|
# The dnf-json web server has to use forks to serve the requests. Because the
|
|
# dnf library is leaking memory in its Cpp side.
|
|
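# server_bind() below swaps the freshly created socket for the one handed over
# by systemd socket activation: LISTEN_FDS carries the number of passed
# sockets, and the first passed socket is always file descriptor 3, which is
# why LISTEN_FD defaults to 3 above.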
class SystemDActivationSocketServer(socketserver.ForkingMixIn, socketserver.UnixStreamServer):
    def server_bind(self):
        log.debug("service bind")
        log.debug("rebind socket")
        log.debug("address_family: %d", self.address_family)
        log.debug("socket_type: %d", self.socket_type)
        if LISTEN_FDS > 1:
            log.warning("More than one LISTEN_FDS")
        self.socket = socket.fromfd(LISTEN_FD, self.address_family, self.socket_type)


# Start the web server.
server = SystemDActivationSocketServer('', DnfJsonRequestHandler)
server.serve_forever()
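

# Illustrative only: a minimal client sketch for exercising this service over
# its Unix socket from another process. The socket path, repository id and
# URLs below are assumptions; they depend on how the systemd socket unit and
# the repositories are configured.
#
#   import http.client
#   import json
#   import socket
#
#   class UnixHTTPConnection(http.client.HTTPConnection):
#       """HTTPConnection that connects to a Unix socket instead of TCP."""
#       def __init__(self, path):
#           super().__init__("localhost")
#           self.unix_path = path
#
#       def connect(self):
#           self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
#           self.sock.connect(self.unix_path)
#
#   conn = UnixHTTPConnection("/run/dnf-json/api.sock")  # assumed socket path
#   conn.request(
#       "POST", "/",
#       body=json.dumps({
#           "command": "dump",
#           "arguments": {
#               "repos": [{"id": "example", "baseurl": "https://example.com/repo"}],
#               "arch": "x86_64",
#               "module_platform_id": "platform:f36",
#               "cachedir": "/var/cache/dnf-json"
#           }
#       }),
#       headers={"Content-Type": "application/json"}
#   )
#   print(json.loads(conn.getresponse().read()))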