dnf-json: convert to single-use depsolve script
- Removed server class and handlers
The dnf-json Python script will no longer run as a service. In the
future, we will create a service in the Go package that will handle
receiving requests and calling the script accordingly.
- Removed CacheState class
- Added standalone functions for setting up cache and running the
depsolve
- Validate the input before processing
- Print all status and error messages to stderr and only machine-readable
output to stdout; on failure, stdout carries a structured error object
The status messages on stderr are useful for troubleshooting. When
called from the service they will appear in the log/journal.
- Catch RepoError exceptions
This can occur when dnf fails to load the repository configuration.
- Support multiple depsolve jobs per request
The request structure is changed to carry a list of transactions, so a
single request can run multiple depsolves while reusing one dnf.Base
object for chained (incremental) dependency resolution (a concrete
chained example follows the schemas below).
Before:

{
  "command": "depsolve",
  "arguments": {
    "package-specs": [...],
    "exclude-specs": [...],
    "repos": [{...}],
    "cachedir": "...",
    "module_platform_id": "...",
    "arch": "..."
  }
}

After:

{
  "command": "depsolve",
  "cachedir": "...",
  "module_platform_id": "...",
  "arch": "...",
  "arguments": {
    "repos": [{...}],
    "transactions": [
      {
        "package-specs": [...],
        "exclude-specs": [...],
        "repo-ids": [...]
      }
    ]
  }
}
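
For illustration, a chained request under the new schema might look as
follows. All concrete values (cachedir, module_platform_id, repository
URL, package names) are placeholders; per the Solver behavior, each
transaction after the first is resolved incrementally on top of the
previous result, with weak dependencies disabled:

{
  "command": "chain-depsolve",
  "cachedir": "/var/cache/dnf-json",
  "module_platform_id": "platform:f36",
  "arch": "x86_64",
  "arguments": {
    "repos": [{"id": "baseos", "baseurl": "https://example.com/baseos"}],
    "transactions": [
      {
        "package-specs": ["build-tools"],
        "exclude-specs": [],
        "repo-ids": ["baseos"]
      },
      {
        "package-specs": ["os-base"],
        "exclude-specs": [],
        "repo-ids": ["baseos"]
      }
    ]
  }
}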
Signed-off-by: Achilleas Koutsou <achilleas@koutsou.net>
parent 4b289ce861
commit 82007dcf46

1 changed file with 142 additions and 233 deletions

dnf-json
@@ -1,95 +1,25 @@
 #!/usr/bin/python3
 # pylint: disable=invalid-name

+"""
+A JSON-based interface for depsolving using DNF.
+
+Reads a request through stdin and prints the result to stdout.
+Status messages are always printed to stderr.
+In case of error, a structured error is printed to stdout as well.
+"""
+
-import hashlib
 import json
-import logging
 import os
-import pickle
-import shutil
-import socket
-import socketserver
 import sys
 import tempfile

-from datetime import datetime, timedelta
-from http.server import BaseHTTPRequestHandler
-from multiprocessing import Lock
+from datetime import datetime

 import dnf
 import hawkey

-# Logging setup (to systemd if available)
-formatter = logging.Formatter(
-    fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s",
-    datefmt="%Y.%m.%d %H:%M:%S"
-)
-handler = logging.StreamHandler(stream=sys.stdout)
-handler.setFormatter(formatter)
-log = logging.getLogger('dnf-json')
-log.addHandler(handler)
-log.setLevel(logging.INFO)
-
-# Synchronisation necessary for the multiprocess request handling.
-process_lock = Lock()
-
-
-class CacheState():
-    """
-    A CacheState keeps track of the cache folders.
-    CacheState objects can update the list of used cache folders, which
-    will associate a date object to the name of the folder.
-    CacheState objects can ask to clean the unused cache folders.
-    Meaning that the folders having reach a certain timeout will be deleted.
-    """
-
-    def __init__(self, cache_dir, cache_timeout, folder_dict=None):
-        """
-        cache_dir      the place where the state is stored
-        folder_dict    a dict containing the existing list of cache folders
-        cache_timeout  a timedelta before a cache folder can be deleted
-        """
-        if folder_dict is None:
-            folder_dict = {}
-        self.cache_dir = cache_dir
-        self.folder_dict = folder_dict
-        self.cache_timeout = cache_timeout
-
-    def update_used(self, folder):
-        """
-        Associate a datetime.now() to the folders given as parameters
-        """
-        log.debug("Folder %s was used", folder)
-        self.folder_dict[folder] = datetime.now()
-
-    def clean_unused(self):
-        """
-        Delete the folders having reach the timeout
-        """
-        log.info("clean unused folders")
-        now = datetime.now()
-        list_folder_to_delete = []
-        for folder, then in self.folder_dict.items():
-            delta = now - then
-            log.debug("delete %s if %s > than %s", folder, delta, self.cache_timeout)
-            if delta > self.cache_timeout:
-                list_folder_to_delete.append(folder)
-        for folder in list_folder_to_delete:
-            del self.folder_dict[folder]
-            shutil.rmtree(folder)
-
-    @classmethod
-    def load(cls, cache_dir):
-        try:
-            with open(os.path.join(cache_dir, "cache_state.pkl"), "rb") as inp:
-                return pickle.load(inp)
-        except FileNotFoundError:
-            return cls(cache_dir, timedelta(hours=24))
-
-    def store(self):
-        with open(os.path.join(self.cache_dir, "cache_state.pkl"), "wb") as outp:
-            return pickle.dump(self, outp)


 class Solver():
@@ -256,7 +186,7 @@ class Solver():

             # don't install weak-deps for transactions after the 1st transaction
             if idx > 0:
-                self.base.conf.install_weak_deps=False
+                self.base.conf.install_weak_deps = False

             # set the packages from the last transaction as installed
             for installed_pkg in last_transaction:
@@ -266,7 +196,7 @@ class Solver():
             self.base.install_specs(
                 transaction.get("package-specs"),
                 transaction.get("exclude-specs"),
-                reponame=[str(id) for id in transaction.get("repos")])
+                reponame=[str(rid) for rid in transaction.get("repo-ids")])
             self.base.resolve()

             # store the current transaction result
@@ -301,163 +231,142 @@
         }


-class DnfJsonRequestHandler(BaseHTTPRequestHandler):
-    """
-    Answers Http requests to depsolve or dump packages.
-    """
+def setup_cachedir(request):
+    arch = request["arch"]
+    # If dnf-json is run as a service, we don't want users to be able to set the cache
+    cache_dir = os.environ.get("OVERWRITE_CACHE_DIR", "")
+    if cache_dir:
+        cache_dir = os.path.join(cache_dir, arch)
+    else:
+        cache_dir = request.get("cachedir", "")

-    def __init__(self, *args, **kwargs):
-        self.cache_dir = None
-        super().__init__(*args, **kwargs)
+    if not cache_dir:
+        return "", {"kind": "Error", "reason": "No cache dir set"}

-    def init_cache_folder_list(self, repos):
-        cache_folders = []
-        for repo in repos:
-            if "baseurl" in repo:
-                url = repo["baseurl"]
-            elif "metalink" in repo:
-                url = repo["metalink"]
-            elif "mirrorlist" in repo:
-                url = repo["mirrorlist"]
-            else:
-                assert False
-            digest = hashlib.sha256(url.encode()).hexdigest()[:16]
-            repoid = repo["id"]
-            cache_folders.append(f"{self.cache_dir}/{repoid}-{digest}")
-        return cache_folders
-
-    def _send(self):
-        self.client_address = ('', 0)
-
-    def response_with_dnf_error(self, kind: str, reason: str):
-        self._send()
-        self.send_response(500)
-        self.send_header("Content-Type", "application/json")
-        self.end_headers()
-        self.wfile.write(json.dumps({"kind": kind, "reason":
-                                     reason}).encode("utf-8"))
-
-    def response_failure(self, json_object):
-        self._send()
-        self.send_response(500)
-        self.send_header("Content-Type", "application/json")
-        self.end_headers()
-        self.wfile.write(json.dumps(json_object).encode("utf-8"))
-
-    def response_success(self, json_object):
-        self._send()
-        self.send_response(200)
-        self.send_header("Content-Type", "application/json")
-        self.end_headers()
-        self.wfile.write(json.dumps(json_object).encode("utf-8"))
-
-    def do_POST(self):
-        """
-        do_Post answer the request of a depsolving/dump.
-        Depsolving and dump require synchronizing caches on disk to perform their
-        operations. Caches folders are created for each remote repository. Since
-        the user request contains the list of repository the space taken can grow
-        indefinitely.
-
-        This solution implements a cache cleaning mechanism. It works by deleting
-        the directories on a timeout based rule and by keeping the last used date
-        in a synced file on disks.
-
-        This method is multiprocess safe.
-        """
-        with process_lock:
-            try:
-                content_len = int(self.headers.get('Content-Length'))
-                data = self.rfile.read(content_len)
-                call = json.loads(data.decode("utf-8"))
-                command = call["command"]
-                arguments = call["arguments"]
-                repos = arguments.get("repos", {})
-                arch = arguments["arch"]
-                module_platform_id = arguments["module_platform_id"]
-
-                # If dnf-json is run as a service, we don't want users to be able to set the cache
-                self.cache_dir = os.environ.get("OVERWRITE_CACHE_DIR", "")
-                if self.cache_dir:
-                    self.cache_dir = os.path.join(self.cache_dir, arch)
-                else:
-                    self.cache_dir = arguments.get("cachedir", "")
-
-                if not self.cache_dir:
-                    self.response_failure({"kind": "Error", "reason": "No cache dir set"})
-                cache_state = CacheState.load(self.cache_dir)
-
-                with tempfile.TemporaryDirectory() as persistdir:
-                    try:
-                        solver = Solver(
-                            repos,
-                            module_platform_id,
-                            persistdir,
-                            self.cache_dir,
-                            arch
-                        )
-                        if command == "dump":
-                            self.response_success(solver.dump())
-                            log.info("dump success")
-                        elif command == "depsolve":
-                            self.response_success(
-                                solver.depsolve(
-                                    arguments["package-specs"],
-                                    arguments.get("exclude-specs", [])
-                                )
-                            )
-                            log.info("depsolve success")
-                        elif command == "chain-depsolve":
-                            self.response_success(
-                                solver.chain_depsolve(arguments["transactions"])
-                            )
-
-                    except dnf.exceptions.MarkingErrors as e:
-                        log.info("error install_specs")
-                        self.response_with_dnf_error(
-                            "MarkingErrors",
-                            f"Error occurred when marking packages for installation: {e}"
-                        )
-                    except dnf.exceptions.DepsolveError as e:
-                        log.info("error depsolve")
-                        self.response_with_dnf_error(
-                            "DepsolveError",
-                            (
-                                "There was a problem depsolving "
-                                f"{arguments['package-specs']}: {e}"
-                            )
-                        )
-                    except dnf.exceptions.Error as e:
-                        self.response_with_dnf_error(
-                            type(e).__name__,
-                            f"Error occurred when setting up repo: {e}")
-            finally:
-                for cache_folder in self.init_cache_folder_list(repos):
-                    cache_state.update_used(cache_folder)
-                cache_state.clean_unused()
-                cache_state.store()
+    return cache_dir, None


-log.info("Starting the dnf-json server")
+def solve(request, cache_dir):
+    command = request["command"]
+    arch = request["arch"]
+    module_platform_id = request["module_platform_id"]
+    arguments = request["arguments"]

-LISTEN_FDS = int(os.environ.get("LISTEN_FDS", 0))
-# set from entrypoint if differs from 3
-LISTEN_FD = int(os.environ.get("LISTEN_FD", 3))
+    transactions = arguments.get("transactions")
+    with tempfile.TemporaryDirectory() as persistdir:
+        try:
+            solver = Solver(
+                arguments["repos"],
+                module_platform_id,
+                persistdir,
+                cache_dir,
+                arch
+            )
+            if command == "dump":
+                result = solver.dump()
+                printe("dump success")
+            elif command == "depsolve":
+                job = transactions[0]
+                result = solver.depsolve(
+                    job["package-specs"],
+                    job.get("exclude-specs", [])
+                )
+                printe("depsolve success")
+            elif command == "chain-depsolve":
+                result = solver.chain_depsolve(transactions)
+                printe("chain-depsolve success")

+        except dnf.exceptions.MarkingErrors as e:
+            printe("error install_specs")
+            return None, {
+                "kind": "MarkingErrors",
+                "reason": f"Error occurred when marking packages for installation: {e}"
+            }
+        except dnf.exceptions.DepsolveError as e:
+            printe("error depsolve")
+            return None, {
+                "kind": "DepsolveError",
+                "reason": f"There was a problem depsolving {arguments['package-specs']}: {e}"
+            }
+        except dnf.exceptions.RepoError as e:
+            return None, {
+                "kind": "RepoError",
+                "reason": f"There was a problem reading a repository: {e}"
+            }
+        except dnf.exceptions.Error as e:
+            printe("error repository setup")
+            return None, {
+                "kind": type(e).__name__,
+                "reason": str(e)
+            }
+    return result, None

-# The dnf-json web server has to use forks to serve the requests. Because the
-# dnf library is leaking memory in its Cpp side.
-class SystemDActivationSocketServer(socketserver.ForkingMixIn, socketserver.UnixStreamServer):
-    def server_bind(self):
-        log.debug("service bind")
-        log.debug("rebind socket")
-        log.debug("address_family: %d ", self.address_family)
-        log.debug("socket_type: %d ", self.socket_type)
-        if LISTEN_FDS > 1:
-            log.warning("More than one LISTEN_FDS")
-        self.socket = socket.fromfd(LISTEN_FD, self.address_family, self.socket_type)

+def printe(*msg):
+    print(*msg, file=sys.stderr)

-# start the web server
-server = SystemDActivationSocketServer('', DnfJsonRequestHandler)
-server.serve_forever()
+
+def fail(err):
+    printe(f"{err['kind']}: {err['reason']}")
+    print(json.dumps(err))
+    sys.exit(1)
+
+
+def respond(result):
+    print(json.dumps(result))
+
+
+def validate_request(request):
+    command = request.get("command")
+    valid_cmds = ("depsolve", "dump", "chain-depsolve")
+    if command not in valid_cmds:
+        return {
+            "kind": "InvalidRequest",
+            "reason": f"invalid command '{command}': must be one of {', '.join(valid_cmds)}"
+        }
+
+    if not request.get("arch"):
+        return {
+            "kind": "InvalidRequest",
+            "reason": "no 'arch' specified"
+        }
+
+    if not request.get("module_platform_id"):
+        return {
+            "kind": "InvalidRequest",
+            "reason": "no 'module_platform_id' specified"
+        }
+
+    arguments = request.get("arguments")
+    if not arguments:
+        return {
+            "kind": "InvalidRequest",
+            "reason": "empty 'arguments'"
+        }
+
+    if not arguments.get("repos"):
+        return {
+            "kind": "InvalidRequest",
+            "reason": "no 'repos' specified"
+        }
+
+    return None
+
+
+def main():
+    request = json.load(sys.stdin)
+    err = validate_request(request)
+    if err:
+        fail(err)
+
+    cachedir, err = setup_cachedir(request)
+    if err:
+        fail(err)
+    result, err = solve(request, cachedir)
+    if err:
+        fail(err)
+    else:
+        respond(result)
+
+
+if __name__ == "__main__":
+    main()
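
Since the script now consumes exactly one request on stdin and writes one
reply to stdout, a caller such as the planned Go service can run it as a
short-lived child process. Below is a minimal sketch of such a caller in
Python; the ./dnf-json path and all request values are placeholder
assumptions, not part of this commit:

import json
import subprocess

# Placeholder request following the new schema; all values are illustrative.
request = {
    "command": "depsolve",
    "cachedir": "/var/cache/dnf-json",
    "module_platform_id": "platform:f36",
    "arch": "x86_64",
    "arguments": {
        "repos": [{"id": "baseos", "baseurl": "https://example.com/baseos"}],
        "transactions": [
            {"package-specs": ["pkg-a"], "exclude-specs": [], "repo-ids": ["baseos"]},
        ],
    },
}

# stdout carries only the machine-readable reply (result or structured error);
# stderr carries status messages, which a service would forward to its journal.
proc = subprocess.run(
    ["./dnf-json"],
    input=json.dumps(request).encode("utf-8"),
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    check=False,
)
reply = json.loads(proc.stdout)
if proc.returncode != 0:
    # on failure the script prints {"kind": ..., "reason": ...} and exits 1
    raise RuntimeError(f"{reply['kind']}: {reply['reason']}")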