From bcf34f8c6c045a4eb665627106a2ee9787b0e481 Mon Sep 17 00:00:00 2001
From: Thomas Lavocat
Date: Thu, 9 Dec 2021 17:44:00 +0100
Subject: [PATCH] dnf-json: delete unused cache folders

Detect folders that have not been used within a timeout and delete
them. The cache folder must be empty when dnf-json is started, in order
to avoid the situation where some folders can never be cleaned up:
dnf-json does not look at the cache directory content, but uses
information from the requests to deduce which folders to keep and which
to delete.

Solves #2020
---
 dnf-json | 204 ++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 155 insertions(+), 49 deletions(-)
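
Note: each repository is mapped to a cache folder named after its repo id
and a digest of its URL, so the state file tracks exactly the folders that
requests reference. A minimal sketch of that mapping (illustrative only; it
mirrors init_cache_folder_list() in the patch below, and the cache directory
and repository values are made up):

    import hashlib

    cache_dir = "/var/cache/dnf-json"  # assumed location, for illustration
    repo = {"id": "fedora", "baseurl": "https://example.com/fedora/35/x86_64/"}
    # repo id plus the first 16 hex chars of the sha256 of the URL
    digest = hashlib.sha256(repo["baseurl"].encode()).hexdigest()[:16]
    print(f"{cache_dir}/{repo['id']}-{digest}")
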
+ """ + + def __init__(self, cache_dir, cache_timeout, folder_dict={}): + """ + cache_dir the place where the state is stored + folder_dict a dict containing the existing list of cache folders + cache_timeout a timedelta before a cache folder can be deleted + """ + self.cache_dir = cache_dir + self.folder_dict = folder_dict + self.cache_timeout = cache_timeout + + def update_used(self, folder): + """ + Associate a datetime.now() to the folders given as parameters + """ + log.debug("Folder %s was used", folder) + self.folder_dict[folder] = datetime.now() + + def clean_unused(self): + """ + Delete the folders having reach the timeout + """ + log.info("clean unused folders") + now = datetime.now() + list_folder_to_delete = [] + for folder, then in self.folder_dict.items(): + delta = now - then + log.debug("delete %s if %s > than %s", folder, delta, self.cache_timeout) + if delta > self.cache_timeout: + list_folder_to_delete.append(folder) + for folder in list_folder_to_delete: + del self.folder_dict[folder] + shutil.rmtree(folder) + + @staticmethod + def load_cache_state_from_disk(cache_dir): + try: + with open(os.path.join(cache_dir,"cache_state.pkl"), "rb") as inp: + return pickle.load(inp) + except FileNotFoundError: + return CacheState(cache_dir, timedelta(hours=24)) + + def store_on_disk(self): + with open(os.path.join(self.cache_dir, "cache_state.pkl"), "wb") as outp: + return pickle.dump(self, outp) + class Solver(): def __init__(self, repos, module_platform_id, persistdir, cachedir, arch): @@ -127,7 +189,7 @@ class Solver(): return checksums def _timestamp_to_rfc3339(self, timestamp): - d = datetime.datetime.utcfromtimestamp(timestamp) + d = datetime.utcfromtimestamp(timestamp) return d.strftime('%Y-%m-%dT%H:%M:%SZ') def dump(self): @@ -181,6 +243,26 @@ class Solver(): } class DnfJsonRequestHandler(BaseHTTPRequestHandler): + """ + Answers Http requests to depsolve or dump packages. + """ + + + def init_cache_folder_list(self, repos): + cache_folders = [] + for repo in repos: + if "baseurl" in repo: + url = repo["baseurl"] + elif "metalink" in repo: + url = repo["metalink"] + elif "mirrorlist" in repo: + url = repo["mirrorlist"] + else: + assert False + digest = hashlib.sha256(url.encode()).hexdigest()[:16] + repoid = repo["id"] + cache_folders.append(f"{self.cache_dir}/{repoid}-{digest}") + return cache_folders def _send(self): self.client_address=('',) @@ -201,56 +283,77 @@ class DnfJsonRequestHandler(BaseHTTPRequestHandler): self.wfile.write(json.dumps(json_object).encode("utf-8")) def do_POST(self): - content_len = int(self.headers.get('Content-Length')) - data = self.rfile.read(content_len) - call = json.loads(data.decode("utf-8")) - command = call["command"] - arguments = call["arguments"] - repos = arguments.get("repos", {}) - arch = arguments["arch"] - cachedir = arguments["cachedir"] - module_platform_id = arguments["module_platform_id"] + """ + do_Post answer the request of a depsolving/dump. + Depsolving and dump require synchronizing caches on disk to perform their + operations. Caches folders are created for each remote repository. Since + the user request contains the list of repository the space taken can grow + indefinitely. - with tempfile.TemporaryDirectory() as persistdir: + This solution implements a cache cleaning mechanism. It works by deleting + the directories on a timeout based rule and by keeping the last used date + in a synced file on disks. + + This method is multiprocess safe. 
+ """ + with process_lock: try: - solver = Solver( - repos, - module_platform_id, - persistdir, - cachedir, - arch - ) - if command == "dump": - self.response_success(solver.dump()) - log.info("dump success") - elif command == "depsolve": - self.response_success( - solver.depsolve( - arguments["package-specs"], - arguments.get("exclude-specs", []) - ) - ) - log.info("depsolve success") + content_len = int(self.headers.get('Content-Length')) + data = self.rfile.read(content_len) + call = json.loads(data.decode("utf-8")) + command = call["command"] + arguments = call["arguments"] + repos = arguments.get("repos", {}) + arch = arguments["arch"] + self.cache_dir = arguments["cachedir"] + cache_state = CacheState.load_cache_state_from_disk(self.cache_dir) + module_platform_id = arguments["module_platform_id"] - except dnf.exceptions.MarkingErrors as e: - log.info("error install_specs") - self.response_with_dnf_error( - "MarkingErrors", - f"Error occurred when marking packages for installation: {e}" - ) - except dnf.exceptions.DepsolveError as e: - log.info("error depsolve") - self.response_with_dnf_error( - "DepsolveError", - ( - "There was a problem depsolving " - f"{arguments['package-specs']}: {e}" - ) - ) - except dnf.exceptions.Error as e: - self.response_with_dnf_error( - type(e).__name__, - f"Error occurred when setting up repo: {e}") + with tempfile.TemporaryDirectory() as persistdir: + try: + solver = Solver( + repos, + module_platform_id, + persistdir, + self.cache_dir, + arch + ) + if command == "dump": + self.response_success(solver.dump()) + log.info("dump success") + elif command == "depsolve": + self.response_success( + solver.depsolve( + arguments["package-specs"], + arguments.get("exclude-specs", []) + ) + ) + log.info("depsolve success") + + except dnf.exceptions.MarkingErrors as e: + log.info("error install_specs") + self.response_with_dnf_error( + "MarkingErrors", + f"Error occurred when marking packages for installation: {e}" + ) + except dnf.exceptions.DepsolveError as e: + log.info("error depsolve") + self.response_with_dnf_error( + "DepsolveError", + ( + "There was a problem depsolving " + f"{arguments['package-specs']}: {e}" + ) + ) + except dnf.exceptions.Error as e: + self.response_with_dnf_error( + type(e).__name__, + f"Error occurred when setting up repo: {e}") + finally: + for cache_folder in self.init_cache_folder_list(repos): + cache_state.update_used(cache_folder) + cache_state.clean_unused() + cache_state.store_on_disk() log.info("Starting the dnf-json server") @@ -258,6 +361,8 @@ LISTEN_FDS = int(os.environ.get("LISTEN_FDS", 0)) SOCK_PATH = "/run/osbuild-dnf-json/" SOCK_NAME = "api.sock" +# The dnf-json web server has to use forks to serve the requests. Because the +# dnf library is leaking memory in its Cpp side. class SystemDActivationSocketServer(socketserver.ForkingMixIn, socketserver.UnixStreamServer): def server_bind(self): log.debug("service bind") @@ -270,6 +375,7 @@ class SystemDActivationSocketServer(socketserver.ForkingMixIn, socketserver.Unix log.debug("socket_type: %d ", self.socket_type) self.socket = socket.fromfd(3, self.address_family, self.socket_type) +# start the web server pathlib.Path(SOCK_PATH).mkdir(parents=True, exist_ok=True) server = SystemDActivationSocketServer(f"{SOCK_PATH}{SOCK_NAME}", DnfJsonRequestHandler) server.serve_forever()