fscache: add new `FsCache._last_used_objs()' helper

This commit adds a helper that can be used to get a sorted list
of cache entries. The list includes the name and the last_used
information.
This commit is contained in:
Michael Vogt 2023-12-12 21:20:42 +01:00 committed by Achilleas Koutsou
parent 25df4d76a6
commit b2a82beb75
2 changed files with 50 additions and 1 deletions

View file

@ -15,7 +15,7 @@ import json
import os
import subprocess
import uuid
from typing import Any, Dict, NamedTuple, Optional, Tuple, Union
from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union
from osbuild.util import ctx, linux, rmrf
@ -101,6 +101,17 @@ class FsCacheInfo(NamedTuple):
return data
class FsCacheObjectInfo(NamedTuple):
""" File System Cache object information
This type represents information about a single cache object. The
last_used information is only guaranteed to be valid while the cache
is locked.
"""
name: str
last_used: float
class FsCache(contextlib.AbstractContextManager, os.PathLike):
"""File System Cache
@ -1059,6 +1070,22 @@ class FsCache(contextlib.AbstractContextManager, os.PathLike):
raise self.MissError() from None
raise e
def _last_used_objs(self) -> List[FsCacheObjectInfo]:
"""Return a list of FsCacheObjectInfo with name, last_used
information sorted by last_used time.
Note that this function will be racy when used without a lock and
the caller needs to handle this.
"""
objs = []
for name in os.listdir(self._path(self._dirname_objects)):
try:
last_used = self._last_used(name)
except (OSError, FsCache.MissError):
continue
objs.append(FsCacheObjectInfo(name=name, last_used=last_used))
return sorted(objs, key=lambda obj: obj.last_used)
@property
def info(self) -> FsCacheInfo:
"""Query Cache Information

View file

@ -498,3 +498,25 @@ def test_cache_full_behavior(tmp_path):
with pytest.raises(fscache.FsCache.MissError):
with cache.load("o3") as o:
pass
@pytest.mark.skipif(not has_precise_fs_timestamps(), reason="need precise fs timestamps")
def test_cache_last_used_objs(tmpdir):
cache = fscache.FsCache("osbuild-cache-id", tmpdir)
with cache:
# use big sizes to mask the effect of dirs using 4k of space too
cache.info = cache.info._replace(maximum_size=256 * 1024)
# add objs to the store
for obj in ["o3", "o2", "o1"]:
with cache.store(obj):
pass
with cache.load(obj):
pass
sleep_for_fs()
sorted_objs = cache._last_used_objs()
assert [e[0] for e in sorted_objs] == ["o3", "o2", "o1"]
# access o2
with cache.load("o2"):
pass
sorted_objs = cache._last_used_objs()
assert [e[0] for e in sorted_objs] == ["o3", "o1", "o2"]