dnf-json: delete unused cache folders
Detect cache folders that have not been used for longer than a timeout and delete them. The cache directory must be empty when dnf-json starts, otherwise some folders could never be cleaned up: dnf-json does not inspect the cache directory content, it deduces which folders to keep and which to delete from the information in the requests. Solves #2020
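For illustration, a minimal self-contained sketch of the two ideas the patch combines: a cache folder name is deduced purely from the request's repositories (repo id plus a digest of its URL, as in init_cache_folder_list in the diff below), and a folder is removed once no request has referenced it within the timeout (as in CacheState below). The function and variable names here (cache_folder_for, handle_request, last_used) are illustrative only; the real code also pickles the state to disk and holds a process lock, which this sketch omits.

import hashlib
import shutil
from datetime import datetime, timedelta

CACHE_TIMEOUT = timedelta(hours=24)
last_used = {}  # cache folder path -> datetime of the last request that referenced it

def cache_folder_for(cache_dir, repo):
    # The folder name is deduced from request data alone, never from a directory listing.
    url = repo.get("baseurl") or repo.get("metalink") or repo.get("mirrorlist")
    digest = hashlib.sha256(url.encode()).hexdigest()[:16]
    return f"{cache_dir}/{repo['id']}-{digest}"

def handle_request(cache_dir, repos):
    now = datetime.now()
    # Mark every folder referenced by this request as freshly used.
    for repo in repos:
        last_used[cache_folder_for(cache_dir, repo)] = now
    # Delete folders that no request has referenced within the timeout.
    for folder, then in list(last_used.items()):
        if now - then > CACHE_TIMEOUT:
            del last_used[folder]
            shutil.rmtree(folder, ignore_errors=True)

Because the bookkeeping only ever sees folders mentioned in requests, any folder already present when the service starts would never be tracked, which is why the cache directory has to start out empty.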
This commit is contained in:
parent d543e39fc9
commit bcf34f8c6c
1 changed file with 155 additions and 49 deletions
dnf-json (204 changed lines)
@@ -11,21 +11,83 @@ import logging
import sys
from http.server import BaseHTTPRequestHandler
import pathlib
from multiprocessing import Lock
import queue
import shutil
from datetime import datetime, timedelta

import dnf
import hawkey
import pickle

# Logging setup (to systemd if available)
formatter = logging.Formatter(
    fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s",
    datefmt="%Y.%m.%d %H:%M:%S"
)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(formatter)

log = logging.getLogger('dnf-json')
log.addHandler(handler)
log.setLevel(logging.INFO)

# Synchronisation necessary for the multiprocess request handling.
process_lock = Lock()

class CacheState():
    """
    A CacheState keeps track of the cache folders.
    CacheState objects can update the list of used cache folders, which
    will associate a date object to the name of the folder.
    CacheState objects can ask to clean the unused cache folders.
    Meaning that the folders having reach a certain timeout will be deleted.
    """

    def __init__(self, cache_dir, cache_timeout, folder_dict={}):
        """
        cache_dir the place where the state is stored
        folder_dict a dict containing the existing list of cache folders
        cache_timeout a timedelta before a cache folder can be deleted
        """
        self.cache_dir = cache_dir
        self.folder_dict = folder_dict
        self.cache_timeout = cache_timeout

    def update_used(self, folder):
        """
        Associate a datetime.now() to the folders given as parameters
        """
        log.debug("Folder %s was used", folder)
        self.folder_dict[folder] = datetime.now()

    def clean_unused(self):
        """
        Delete the folders having reach the timeout
        """
        log.info("clean unused folders")
        now = datetime.now()
        list_folder_to_delete = []
        for folder, then in self.folder_dict.items():
            delta = now - then
            log.debug("delete %s if %s > than %s", folder, delta, self.cache_timeout)
            if delta > self.cache_timeout:
                list_folder_to_delete.append(folder)
        for folder in list_folder_to_delete:
            del self.folder_dict[folder]
            shutil.rmtree(folder)

    @staticmethod
    def load_cache_state_from_disk(cache_dir):
        try:
            with open(os.path.join(cache_dir, "cache_state.pkl"), "rb") as inp:
                return pickle.load(inp)
        except FileNotFoundError:
            return CacheState(cache_dir, timedelta(hours=24))

    def store_on_disk(self):
        with open(os.path.join(self.cache_dir, "cache_state.pkl"), "wb") as outp:
            return pickle.dump(self, outp)

class Solver():

    def __init__(self, repos, module_platform_id, persistdir, cachedir, arch):
@@ -127,7 +189,7 @@ class Solver():
         return checksums

     def _timestamp_to_rfc3339(self, timestamp):
-        d = datetime.datetime.utcfromtimestamp(timestamp)
+        d = datetime.utcfromtimestamp(timestamp)
         return d.strftime('%Y-%m-%dT%H:%M:%SZ')

     def dump(self):
@@ -181,6 +243,26 @@ class Solver():
        }

class DnfJsonRequestHandler(BaseHTTPRequestHandler):
    """
    Answers Http requests to depsolve or dump packages.
    """

    def init_cache_folder_list(self, repos):
        cache_folders = []
        for repo in repos:
            if "baseurl" in repo:
                url = repo["baseurl"]
            elif "metalink" in repo:
                url = repo["metalink"]
            elif "mirrorlist" in repo:
                url = repo["mirrorlist"]
            else:
                assert False
            digest = hashlib.sha256(url.encode()).hexdigest()[:16]
            repoid = repo["id"]
            cache_folders.append(f"{self.cache_dir}/{repoid}-{digest}")
        return cache_folders

    def _send(self):
        self.client_address=('',)
@@ -201,56 +283,77 @@ class DnfJsonRequestHandler(BaseHTTPRequestHandler):
         self.wfile.write(json.dumps(json_object).encode("utf-8"))

     def do_POST(self):
-        content_len = int(self.headers.get('Content-Length'))
-        data = self.rfile.read(content_len)
-        call = json.loads(data.decode("utf-8"))
-        command = call["command"]
-        arguments = call["arguments"]
-        repos = arguments.get("repos", {})
-        arch = arguments["arch"]
-        cachedir = arguments["cachedir"]
-        module_platform_id = arguments["module_platform_id"]
-
-        with tempfile.TemporaryDirectory() as persistdir:
+        """
+        do_Post answer the request of a depsolving/dump.
+        Depsolving and dump require synchronizing caches on disk to perform their
+        operations. Caches folders are created for each remote repository. Since
+        the user request contains the list of repository the space taken can grow
+        indefinitely.
+
+        This solution implements a cache cleaning mechanism. It works by deleting
+        the directories on a timeout based rule and by keeping the last used date
+        in a synced file on disks.
+
+        This method is multiprocess safe.
+        """
+        with process_lock:
             try:
-                solver = Solver(
-                    repos,
-                    module_platform_id,
-                    persistdir,
-                    cachedir,
-                    arch
-                )
-                if command == "dump":
-                    self.response_success(solver.dump())
-                    log.info("dump success")
-                elif command == "depsolve":
-                    self.response_success(
-                        solver.depsolve(
-                            arguments["package-specs"],
-                            arguments.get("exclude-specs", [])
-                        )
-                    )
-                    log.info("depsolve success")
-
-            except dnf.exceptions.MarkingErrors as e:
-                log.info("error install_specs")
-                self.response_with_dnf_error(
-                    "MarkingErrors",
-                    f"Error occurred when marking packages for installation: {e}"
-                )
-            except dnf.exceptions.DepsolveError as e:
-                log.info("error depsolve")
-                self.response_with_dnf_error(
-                    "DepsolveError",
-                    (
-                        "There was a problem depsolving "
-                        f"{arguments['package-specs']}: {e}"
-                    )
-                )
-            except dnf.exceptions.Error as e:
-                self.response_with_dnf_error(
-                    type(e).__name__,
-                    f"Error occurred when setting up repo: {e}")
+                content_len = int(self.headers.get('Content-Length'))
+                data = self.rfile.read(content_len)
+                call = json.loads(data.decode("utf-8"))
+                command = call["command"]
+                arguments = call["arguments"]
+                repos = arguments.get("repos", {})
+                arch = arguments["arch"]
+                self.cache_dir = arguments["cachedir"]
+                cache_state = CacheState.load_cache_state_from_disk(self.cache_dir)
+                module_platform_id = arguments["module_platform_id"]
+
+                with tempfile.TemporaryDirectory() as persistdir:
+                    try:
+                        solver = Solver(
+                            repos,
+                            module_platform_id,
+                            persistdir,
+                            self.cache_dir,
+                            arch
+                        )
+                        if command == "dump":
+                            self.response_success(solver.dump())
+                            log.info("dump success")
+                        elif command == "depsolve":
+                            self.response_success(
+                                solver.depsolve(
+                                    arguments["package-specs"],
+                                    arguments.get("exclude-specs", [])
+                                )
+                            )
+                            log.info("depsolve success")
+
+                    except dnf.exceptions.MarkingErrors as e:
+                        log.info("error install_specs")
+                        self.response_with_dnf_error(
+                            "MarkingErrors",
+                            f"Error occurred when marking packages for installation: {e}"
+                        )
+                    except dnf.exceptions.DepsolveError as e:
+                        log.info("error depsolve")
+                        self.response_with_dnf_error(
+                            "DepsolveError",
+                            (
+                                "There was a problem depsolving "
+                                f"{arguments['package-specs']}: {e}"
+                            )
+                        )
+                    except dnf.exceptions.Error as e:
+                        self.response_with_dnf_error(
+                            type(e).__name__,
+                            f"Error occurred when setting up repo: {e}")
+            finally:
+                for cache_folder in self.init_cache_folder_list(repos):
+                    cache_state.update_used(cache_folder)
+                cache_state.clean_unused()
+                cache_state.store_on_disk()

 log.info("Starting the dnf-json server")
@@ -258,6 +361,8 @@ LISTEN_FDS = int(os.environ.get("LISTEN_FDS", 0))
SOCK_PATH = "/run/osbuild-dnf-json/"
SOCK_NAME = "api.sock"

# The dnf-json web server has to use forks to serve the requests. Because the
# dnf library is leaking memory in its Cpp side.
class SystemDActivationSocketServer(socketserver.ForkingMixIn, socketserver.UnixStreamServer):
    def server_bind(self):
        log.debug("service bind")
@@ -270,6 +375,7 @@ class SystemDActivationSocketServer(socketserver.ForkingMixIn, socketserver.UnixStreamServer):
        log.debug("socket_type: %d ", self.socket_type)
        self.socket = socket.fromfd(3, self.address_family, self.socket_type)

# start the web server
pathlib.Path(SOCK_PATH).mkdir(parents=True, exist_ok=True)
server = SystemDActivationSocketServer(f"{SOCK_PATH}{SOCK_NAME}", DnfJsonRequestHandler)
server.serve_forever()