diff --git a/osbuild/util/fscache.py b/osbuild/util/fscache.py new file mode 100644 index 00000000..d08da631 --- /dev/null +++ b/osbuild/util/fscache.py @@ -0,0 +1,1011 @@ +"""File System Cache + +This module implements a data cache that uses the file system to store data +as well as protect parallel access. It implements automatic cache management +and allows purging the cache during runtime, pruning old entries and keeping +the cache under a given limit. +""" + +# pylint: disable=too-many-lines + +import contextlib +import ctypes +import errno +import json +import os +import uuid +from typing import Any, Dict, NamedTuple, Optional, Tuple, Union + +from osbuild.util import ctx, linux, rmrf + +__all__ = [ + "FsCache", + "FsCacheInfo", +] + + +MaximumSizeType = Optional[Union[int, str]] + + +class FsCacheInfo(NamedTuple): + """File System Cache Information + + This type represents static cache information. It is an immutable named + tuple and used to query or set the configuration of a cache. + + creation_boot_id - Hashed linux boot-id at the time of cache-creation + maximum_size - Maximum cache size in bytes, or "unlimited" + """ + + creation_boot_id: Optional[str] = None + maximum_size: MaximumSizeType = None + + @classmethod + def from_json(cls, data: Any) -> "FsCacheInfo": + """Create tuple from parsed JSON + + This takes a parsed JSON value and converts it into a tuple with the + same information. Unknown fields in the input are ignored. The input + is usually taken from `json.load()` and similar. + """ + + if not isinstance(data, dict): + return cls() + + creation_boot_id = None + maximum_size: MaximumSizeType = None + + # parse "creation-boot-id" + _creation_boot_id = data.get("creation-boot-id", None) + if isinstance(_creation_boot_id, str) and len(_creation_boot_id) == 32: + creation_boot_id = _creation_boot_id + + # parse "maximum-size" + _maximum_size = data.get("maximum-size", None) + if isinstance(_maximum_size, int): + maximum_size = _maximum_size + elif isinstance(_maximum_size, str) and _maximum_size == "unlimited": + maximum_size = "unlimited" + + # create immutable tuple + return cls( + creation_boot_id, + maximum_size, + ) + + def to_json(self) -> Dict[str, Any]: + """Convert tuple into parsed JSON + + Return a parsed JSON value that represents the same values as this + tuple does. Unset values are skipped. The returned value can be + converted into formatted JSON via `json.dump()` and similar. + """ + + data: Dict[str, Any] = {} + if self.creation_boot_id is not None: + data["creation-boot-id"] = self.creation_boot_id + if self.maximum_size is not None: + data["maximum-size"] = self.maximum_size + return data + + +class FsCache(contextlib.AbstractContextManager, os.PathLike): + """File System Cache + + This file system cache context represents an on-disk cache. That is, it + allows storing information on the file system, and retrieving it from other + contexts. + + A single cache directory can be shared between many processes at the same + time. The cache protects access to the cached data. The cache must not be + shared over non-coherent network storage, but is designed for system-local + linux file-systems. + + The file-system layout is as follows: + + [cache]/ + ├── cache.info + ├── cache.lock + ├── cache.size + ├── objects/ + │ ├── [id0] + │ ├── [id1]/ + │ │ ├── data/ + │ │ │ └── ... + │ │ ├── object.info + │ │ └── object.lock + │ └── ... + └── stage/ + ├── uuid-[uuid0] + ├── uuid-[uuid1]/ + │ ├── data/ + │ │ └── ... + │ ├── object.info + │ └── object.lock + └── ... 
+ + The central data store is in the `objects` subdirectory. Every cache entry + has a separate subdirectory there. To guard access, a read-lock on + `object.lock` is required for all readers, and a write-lock is required for all + writers. Static information about the object is available in the + `object.info` file. + + As an optimization, entries in the object store consisting of a single + file can be stored directly underneath `objects` without a separate + subdirectory hierarchy. Their guarding lock is directly taken on this file + and no metadata is available, other than the file information itself. This + is used extensively by the cache management to prepare objects for atomic + replacements. Due to lack of metadata, they are volatile and can be + deleted as soon as they are unlocked. + + Generally, access to the cache is non-blocking. That is, if a read-lock + cannot be acquired, an entry is considered non-existent. Thus, unless + treated as a `write-once` cache, cache efficiency will decrease when taking + write-locks. + + The `data/` directory contains the content of a cache entry. Its content + is solely defined by the creator of the entry and the cache makes no + assumptions about its layout. Note that the `data/` directory itself can be + modified (e.g., permission-changes) if an unnamed top-level directory is + desired (e.g., to store a directory tree). + + In addition to the `objects/` directory, a similar `stage/` directory is + provided. This directory is `write-only` and used to prepare entries for + the object store before committing them. The staging area is optional. It + is completely safe to do the same directly in the object store. However, + the separation allows putting the staging area on a different file-system + (e.g., symlinking to a tmpfs), and thus improving performance for larger + operations. Otherwise, the staging area follows the same rules as the + object store, except that only writers are expected. Hence, staging entries + always use a unique UUID as name. To commit a staging entry, a user is + expected to create an entry in the object store and copy/move the `data/` + directory over. + + To guard against parallel accesses, a set of locks is utilized. Generally, + a `*.lock`-file locks the directory it is in, while a lock on any other + file just locks that file (unfortunately, we cannot acquire write-locks on + directories directly, since it would require opening them for writing, + which is not possible on linux). `cache.lock` can be used to guard the + entire cache. A write-lock will keep any other parallel operation out, + while a read-lock merely acquires cache access (you are still allowed to + modify the cache, but need fine-grained locking). Hence, a write-lock on the + global `cache.lock` file is only required for operations that cannot use + fine-grained locking. The latter requires individual locking for each file + or each object store entry you modify. In all those cases you must account + for parallel modifications, since lock acquisition on file-systems can only + be done after opening a file. + """ + + class MissError(Exception): + """Cache Miss Exception + + This error is raised when a cache entry is not found. Due to the + shared nature of the cache, a caller must be aware that any entry can + be created or deleted by other concurrent operations, at any point in + time. Hence, a cache miss only reflects the state of the cache at a + particular time under a particular lock.
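+
+        A minimal illustration of the intended caller pattern (a sketch;
+        `cache` is an entered `FsCache` instance and "example" is a
+        hypothetical entry name) falls back to re-creating the entry on a
+        miss:
+
+            try:
+                with cache.load("example") as rpath:
+                    ...  # read the cached data below `rpath`
+            except FsCache.MissError:
+                with cache.store("example") as rpath:
+                    ...  # populate the new entry below `rpath`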
+ """ + + # static parameters + _dirname_data = "data" + _dirname_objects = "objects" + _dirname_stage = "stage" + _filename_cache_info = "cache.info" + _filename_cache_lock = "cache.lock" + _filename_cache_size = "cache.size" + _filename_object_info = "object.info" + _filename_object_lock = "object.lock" + + # constant properties + _appid: str + _libc: linux.Libc + _path_cache: Any + + # context-manager properties + _active: bool + _bootid: Optional[str] + _lock: Optional[int] + _info: FsCacheInfo + _info_maximum_size: int + + def __init__(self, appid: str, path_cache: Any): + """Create File System Cache + + This creates a new file-system cache. It does not create the cache, nor + access any of its content. You must enter its context-manager to prepare + the cache for access. Any access outside of a context-manager will raise + an assertion error, unless explicitly stated otherwise. + + Parameters: + ----------- + appid + The application-ID of the caller. This can be any random string. It + is used to initialize the application-specific boot-ID used to tag + caches and detect whether an entry was created during the same boot. + path_cache + The path to the cache directory. The directory (and the path to it) + is created if it does not exist. + """ + + self._appid = appid + self._libc = linux.Libc.default() + self._path_cache = os.fspath(path_cache) + + self._active = False + self._bootid = None + self._lock = None + self._info = FsCacheInfo() + self._info_maximum_size = 0 + + @staticmethod + def _calculate_size(path_target: str) -> int: + """Calculate total size of a directory tree + + Calculate the total amount of storage required for a directory tree in + bytes. This does not account for metadata, but only for stored file + content. + + Parameters: + ----------- + path_target + File-system path to the directory to operate on. + """ + + return sum( + os.lstat( + os.path.join(path, f) + ).st_size for path, dirs, files in os.walk( + path_target + ) for f in files + ) + + def __fspath__(self) -> Any: + """Return cache path + + Return the path to this cache as provided to the constructor of the + cache. No conversions are applied, so the path is absolute if the + path as provided by the caller was absolute, and vice-versa. + + This is part of the `os.PathLike` interface. See its documentation. + """ + + return self._path_cache + + def _path(self, *rpaths): + """Return absolute path into cache location + + Take the relative path from the caller and turn it into an absolute + path. Since most operations take a relative path from the cache root + to a cache location, this function can be used to make those paths + absolute. + + Parameters: + ----------- + rpaths + Relative paths from cache root to the desired cache location. + """ + + return os.path.join(self, *rpaths) + + @contextlib.contextmanager + def _atomic_open( + self, + rpath: str, + *, + wait: bool, + write: bool, + closefd: bool = True, + oflags: int = 0, + ): + """Atomically open and lock file + + Open the cache-file at the specified relative path and acquire a + lock on it. Yield the file-descriptor to the caller. Once control + returns, all locks are released (if not already done so by the + caller) and the file-descriptor is closed. + + Note that this operation involves a retry-loop in case the file is + replaced or moved before the lock is acquired. + + Parameters: + ----------- + rpath + Relative path from the cache-root to the file to open. + wait + Whether to wait for locks to be acquired. 
+ write + If false, the file is opened for reading and a read lock is + acquired. If true, it is opened for read and write and a write + lock is acquired. + closefd + If false, retain file-descriptor (and lock) on success. + oflags + Additional open-flags to pass to `os.open()`. + """ + + fd = None + path = self._path(rpath) + + try: + while True: + # Open the file and acquire a lock. Make sure not to modify the + # file in any way, ever. If non-blocking operation was requested + # the lock call will raise `EAGAIN` if contended. + flags = os.O_RDONLY | os.O_CLOEXEC | oflags + lock = linux.fcntl.F_RDLCK + if write: + flags = flags | os.O_RDWR + lock = linux.fcntl.F_WRLCK + fd = os.open(path, flags, 0o644) + linux.fcntl_flock(fd, lock, wait=wait) + + # The file might have been replaced between opening it and + # acquiring the lock. Hence, run `stat(2)` on the path again + # and compare it to `fstat(2)` of the open file. If they differ + # simply retry. + st_fd = os.stat(fd) + st_path = os.stat(path) + if st_fd.st_dev != st_path.st_dev or st_fd.st_ino != st_path.st_ino: + linux.fcntl_flock(fd, linux.fcntl.F_UNLCK) + os.close(fd) + fd = None + continue + + # Yield control to the caller to make use of the FD. If the FD + # is to be retained, clear it before returning to the cleanup + # handlers. + yield fd + + if not closefd: + fd = None + + return + finally: + if fd is not None: + linux.fcntl_flock(fd, linux.fcntl.F_UNLCK) + os.close(fd) + + @contextlib.contextmanager + def _atomic_file( + self, + rpath: str, + rpath_store: str, + closefd: bool = True, + ignore_exist: bool = False, + replace: bool = False, + ): + """Create and link temporary file + + Create a new temporary file and yield control to the caller to fill in + data and metadata. Once control is returned, the file is linked at the + specified location. If an exception is raised, the temporary file is + discarded. + + This function emulates the behavior of `O_TMPFILE` for systems and + file-systems where it is not available. + + Parameters: + ----------- + rpath + Relative path from cache-root to the location where to link the + file on success. + rpath_store + Relative path from cache-root to the store to use for temporary + files. This must share the same mount-instance as the final path. + closefd + If false, retain file-descriptor (and lock) on success. + ignore_exist + If true, an existing file at the desired location during a + replacement will not cause an error. + replace + If true, replace a previous file at the specified location. If + false, no replacement takes place and the temporary file is + discarded. + """ + + assert not replace or not ignore_exist + + rpath_tmp = None + + try: + # First create a random file in the selected store. This file will + # have a UUID as name and thus we can safely use `O_CREAT|O_EXCL` + # to create it and guarantee its uniqueness. + name = "uuid-" + uuid.uuid4().hex + rpath_tmp = os.path.join(rpath_store, name) + with self._atomic_open( + rpath_tmp, + wait=True, + write=True, + closefd=closefd, + oflags=os.O_CREAT | os.O_EXCL, + ) as fd: + # Yield control to the caller to fill in data and metadata. + with os.fdopen(fd, "r+", closefd=False, encoding="utf8") as file: + yield file + + if replace: + flags = ctypes.c_uint(0) + else: + flags = self._libc.RENAME_NOREPLACE + + suppress = [] + if ignore_exist: + suppress.append(errno.EEXIST) + + # As a last step, move the file to the desired location. 
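+            # Note: with `RENAME_NOREPLACE` set, `renameat2()` fails with
+            # `EEXIST` if the target name is already taken, while `flags=0`
+            # (the `replace=True` case) atomically replaces an existing
+            # target. For `ignore_exist=True`, the `EEXIST` failure is
+            # swallowed by the `suppress_oserror()` guard below, so a racing
+            # creator wins and our temporary file is discarded by the cleanup
+            # handler.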
+ with ctx.suppress_oserror(*suppress): + self._libc.renameat2( + oldpath=self._path(rpath_tmp).encode(), + newpath=self._path(rpath).encode(), + flags=flags, + ) + finally: + if rpath_tmp is not None: + # If the temporary file exists, we delete it on error. If we + # haven't created it, or if we already moved it, this will be a + # no-op. Due to the unique name, we will never delete a file we + # do not own. + # On fatal errors, we leak the file into the object store. Due + # to the released lock and UUID name, cache management will + # clean it up. + with ctx.suppress_oserror(errno.ENOENT): + os.unlink(self._path(rpath_tmp)) + + def _atomic_dir(self, rpath_store: str) -> Tuple[str, int]: + """Atomically create and lock an anonymous directory + + Create an anonymous directory in the specified storage directory + relative to the cache-root. The directory will have a UUID as name. On + success, the name of the directory and the open file-descriptor to its + acquired lock file (write-locked) are returned. + + The lock-file logic follows the cache-logic for objects. Hence, the + cache scaffolding for the specified store must exist. No other cache + infrastructure is required, though. + + Parameters: + ----------- + rpath_store + Relative path from the cache-root to the storage directory to create + the new anonymous directory in. Most likely, this is either the + object-store or the staging-area. + """ + + rpath_dir = None + rpath_lock = None + + try: + while True: + # Allocate a UUID for the new directory and prepare the paths + # to the directory and lock-file inside. + name = "uuid-" + uuid.uuid4().hex + rpath_dir = os.path.join(rpath_store, name) + rpath_lock = os.path.join(rpath_dir, self._filename_object_lock) + + # Create an anonymous lock-file, but before linking it create + # the target directory to link the file in. Use an ExitStack + # to control exactly where to catch exceptions. + with contextlib.ExitStack() as es: + f = es.enter_context( + self._atomic_file( + rpath_lock, + rpath_store, + closefd=False, + ) + ) + lockfd = f.fileno() + os.mkdir(self._path(rpath_dir)) + + # Exit the `_atomic_file()` context, thus triggering a link + # of the anonymous lock-file into the new directory. A + # parallel cleanup might have deleted the empty directory, + # so catch `ENOENT` and retry. + try: + es.close() + except OSError as e: + if e.errno == errno.ENOENT: + continue + raise + + return (name, lockfd) + except: + # On error, we might have already created the directory or even + # linked the lock-file. Try unlinking both, but ignore errors if + # they do not exist. Due to using UUIDs as names we cannot conflict + # with entries created by some-one else. + if rpath_lock is not None: + with ctx.suppress_oserror(errno.ENOENT, errno.ENOTDIR): + os.unlink(self._path(rpath_lock)) + if rpath_dir is not None: + with ctx.suppress_oserror(errno.ENOENT, errno.ENOTDIR): + os.rmdir(self._path(rpath_dir)) + raise + + def _create_scaffolding(self): + """Create cache scaffolding + + Create the directories leading to the cache, as well as the internal + scaffolding directories and files. This ensures that an existing cache + is not interrupted or rewritten. Hence, this can safely be called in + parallel, even on live caches. + + If this happens to create a new cache, it is initialized with its + default configuration and constraints. By default, this means the cache + has a maximum size of 0 and thus is only used as staging area with no + long-time storage. 
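+
+        A caller that wants a newly created cache to actually retain entries
+        can raise the limit after entering the cache, for example (a sketch;
+        the application-ID, path and size are placeholder values):
+
+            with FsCache("my-app", "/path/to/cache") as cache:
+                cache.info = cache.info._replace(maximum_size=1024 * 1024)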
+ + This call requires no cache-infrastructure to be in place, and can be + called repeatedly at any time. + """ + + # Create the directory-scaffolding of the cache. Make sure to ignore + # errors when they already exist, to allow for parallel setups. + dirs = [ + self._path(self._dirname_objects), + self._path(self._dirname_stage), + ] + for i in dirs: + os.makedirs(i, exist_ok=True) + + # Create the file-scaffolding of the cache. We fill in the default + # information and ignore racing operations. + with self._atomic_file(self._filename_cache_info, self._dirname_objects, ignore_exist=True) as f: + f.write("{}") + with self._atomic_file(self._filename_cache_lock, self._dirname_objects, ignore_exist=True) as f: + pass + with self._atomic_file(self._filename_cache_size, self._dirname_objects, ignore_exist=True) as f: + f.write("0") + + def _load_cache_info(self, info: Optional[FsCacheInfo] = None): + """Load cache information + + This loads information about the cache into this cache-instance. The + cache-information is itself cached on this instance and only updated + on request. If the underlying file in the cache changes at runtime it + is not automatically re-loaded. Only when this function is called the + information is reloaded. + + By default this function reads the cache-information from the + respective file in the cache and then caches it on this instance. If + the `info` argument is not `None`, then no information is read from the + file-system, but instead the information is taken from the `info` + argument. This allows changing the cache-information of this instance + without necessarily modifying the underlying file. + + This call requires the cache scaffolding to be fully created. + + Parameters: + ----------- + info + If `None`, the cache info file is read. Otherwise, the information + is taken from this tuple. + """ + + # Parse the JSON data into python. + if info is None: + with open(self._path(self._filename_cache_info), "r", encoding="utf8") as f: + info_raw = json.load(f) + + info = FsCacheInfo.from_json(info_raw) + + # Retain information. + self._info = info + + # Parse `maximum-size` into internal representation. + if info.maximum_size == "unlimited": + self._info_maximum_size = -1 + elif isinstance(info.maximum_size, int): + self._info_maximum_size = info.maximum_size + else: + self._info_maximum_size = 0 + + def _is_active(self): + # Internal helper to verify we are in an active context-manager. + return self._active + + def __enter__(self): + assert not self._active + + try: + # Acquire the current boot-id so we can tag entries accordingly, and + # judge entries that are from previous boots. + self._bootid = linux.proc_boot_id(self._appid).hex + + # Create the scaffolding for the entire cache. + self._create_scaffolding() + + # Acquire a shared cache lock. + self._lock = os.open( + self._path(self._filename_cache_lock), + os.O_RDONLY | os.O_CLOEXEC, + ) + linux.fcntl_flock(self._lock, linux.fcntl.F_RDLCK, wait=True) + + # Read the cache configuration. + self._load_cache_info() + + self._active = True + return self + except: + self.__exit__(None, None, None) + raise + + def __exit__(self, exc_type, exc_value, exc_tb): + # Discard any state of this context and reset to original state. + if self._lock is not None: + linux.fcntl_flock(self._lock, linux.fcntl.F_UNLCK) + os.close(self._lock) + self._lock = None + self._active = False + self._bootid = None + self._info = FsCacheInfo() + # We always have to leave the file-system scaffolding around. 
Even if + # the cache is entirely empty, we cannot know whether there are other + # parallel accesses (without unreasonable effort). + + def _update_cache_size(self, diff: int) -> bool: + """Update cache size + + Update the total cache size by the specified amount, unless it exceeds + the cache limits. + + This carefully updates the stored cache size to allow for parallel + updates by other cache users. If the cache limits are exceeded, the + operation is canceled and `False` is returned. Otherwise, `True` is + returned. + + If the specified amount is negative, the operation always succeeds. If + the cache size would end up negative, it is capped at 0. + + This operation requires an active context. + """ + + assert self._is_active() + + # Open the cache-size file and lock it for writing. But instead of writing + # directly to it, we replace it with a new file. This guarantees that + # we cannot crash while writing a partial size, but always atomically + # update the content. + with self._atomic_open(self._filename_cache_size, write=True, wait=True) as fd: + with os.fdopen(fd, "r", closefd=False, encoding="utf8") as f: + size = json.load(f) + + if size + diff < 0: + size = 0 + elif (self._info_maximum_size < 0) or (size + diff <= self._info_maximum_size): + size = size + diff + else: + return False + + with self._atomic_file(self._filename_cache_size, self._dirname_objects, replace=True) as f: + json.dump(size, f) + + return True + + def _rm_r_object(self, rpath_dir: str): + """Remove object + + Recursively remove all traces of a stored object. This either requires + the caller to hold a write-lock on the entry, or otherwise guarantee + that no cache lookups can acquire the entry concurrently. + + This carefully deletes any traces of the entry, making sure to first + mark the object as invalid, and dropping the lock-file last. This can + safely be called on partially constructed or non-existent entries. + + Parameters: + ----------- + rpath_dir + Relative path from the cache-root to the object directory. + """ + + path_dir = self._path(rpath_dir) + path_info = os.path.join(path_dir, self._filename_object_info) + path_lock = os.path.join(path_dir, self._filename_object_lock) + + # Optimization: Bail out early if the entry is non-existent + if not os.path.lexists(path_dir): + return + + # As a first step, we unlink the info-file. This will mark the entry as + # volatile and thus it will get cleaned up by cache management in case + # we crash while deleting it. Furthermore, no cache lookups will ever + # consider the entry again if the info-file is missing. + with ctx.suppress_oserror(errno.ENOENT, errno.ENOTDIR): + os.unlink(path_info) + + # Now iterate the directory and drop everything _except_ the lock file. + # This makes sure no parallel operation will needlessly race with us. In + # case no lock is acquired, we still allow for parallel racing cleanups. + # + # Note that racing cleanups might delete the entire directory at any + # time during this iteration. Furthermore, `scandir()` is not atomic but + # repeatedly calls into the kernel. Hence, we carefully bail out once + # it reports a non-existent directory. + with ctx.suppress_oserror(errno.ENOENT, errno.ENOTDIR): + for entry in os.scandir(path_dir): + if entry.name == self._filename_object_lock: + continue + with ctx.suppress_oserror(errno.ENOENT, errno.ENOTDIR): + if entry.is_dir(): + rmrf.rmtree(entry.path) + else: + os.unlink(entry.path) + + # With everything gone, we unlink the lock-file and eventually delete + # the directory.
Again, cleanup routines might have raced us, so avoid + # failing in case the entries are already gone. + with ctx.suppress_oserror(errno.ENOENT, errno.ENOTDIR): + os.unlink(path_lock) + with ctx.suppress_oserror(errno.ENOENT, errno.ENOTDIR): + os.rmdir(path_dir) + + @contextlib.contextmanager + def stage(self): + """Create staging entry + + Create a new entry in the staging area and yield control to the caller + with the relative path to the entry. Once control returns, the staging + entry is completely discarded. + + If the application crashes while holding a staging entry, it will be + left behind in the staging directory, but unlocked and marked as stale. + Hence, any cache management routine will discard it. + """ + + assert self._is_active() + + uuidname = None + lockfd = None + + try: + # Create and lock a new anonymous object in the staging area. + uuidname, lockfd = self._atomic_dir(self._dirname_stage) + + rpath_data = os.path.join( + self._dirname_stage, + uuidname, + self._dirname_data, + ) + + # Prepare an empty data directory and yield it to the caller. + os.mkdir(self._path(rpath_data)) + yield rpath_data + finally: + if lockfd is not None: + self._rm_r_object(os.path.join(self._dirname_stage, uuidname)) + linux.fcntl_flock(lockfd, linux.fcntl.F_UNLCK) + os.close(lockfd) + + @contextlib.contextmanager + def store(self, name: str): + """Store object in cache + + Create a new entry and store it in the cache with the specified name. + The entry is first created with an anonymous name and control is yielded + to the caller to fill in data. Once control returns, the entry is + committed with the specified name. + + The final commit is skipped if an entry with the given name already + exists, or its name is claimed for other reasons. Furthermore, the + commit is skipped if cache limits are exceeded, or if cache maintenance + refuses the commit. Hence, a commit can never be relied upon and the + entry might be deleted from the cache as soon as the commit was invoked. + + Parameters: + ----------- + name + Name to store the object under. + """ + + assert self._is_active() + assert self._bootid is not None + + if not name: + raise ValueError() + + uuidname = None + lockfd = None + + try: + # Create and lock a new anonymous object in the staging area. + uuidname, lockfd = self._atomic_dir(self._dirname_objects) + + rpath_uuid = os.path.join( + self._dirname_objects, + uuidname, + ) + rpath_data = os.path.join( + rpath_uuid, + self._dirname_data, + ) + rpath_info = os.path.join( + rpath_uuid, + self._filename_object_info, + ) + path_uuid = self._path(rpath_uuid) + path_data = self._path(rpath_data) + path_info = self._path(rpath_info) + + # Prepare an empty data directory and yield it to the caller. + os.mkdir(path_data) + yield rpath_data + + # Collect metadata about the new entry. + info: Dict[str, Any] = {} + info["creation-boot-id"] = self._bootid + info["size"] = self._calculate_size(path_data) + + # Update the total cache-size. If it exceeds the limits, bail out + # but do not trigger an error. It behaves as if the entry was + # committed and immediately deleted by racing cache management. No + # need to tell the caller about it (if that is ever needed, we can + # provide for it). + # + # Note that if we crash after updating the total cache size, but + # before committing the object information, the total cache size + # will be out of sync. However, it is never overcommitted, so we + # will never violate any cache invariants. 
The cache-size will be + # re-synchronized by any full cache-management operation. + if not self._update_cache_size(info["size"]): + return + + try: + # Commit the object-information, thus marking it as fully + # committed and accounted in the cache. + with open(path_info, "x", encoding="utf8") as f: + json.dump(info, f) + + # As last step move the entry to the desired location. If the + # target name is already taken, we bail out and pretend the + # entry was immediately overwritten by another one. + path_name = self._path(self._dirname_objects, name) + try: + self._libc.renameat2( + oldpath=path_uuid.encode(), + newpath=path_name.encode(), + flags=self._libc.RENAME_NOREPLACE, + ) + except OSError as e: + ignore = [errno.EEXIST, errno.ENOTDIR, errno.ENOTEMPTY] + if e.errno not in ignore: + raise + + uuidname = None + finally: + # If the anonymous entry still exists, it will be cleaned up by + # the outer handler. Hence, make sure to drop the info file + # again and de-account it, so we don't overcommit. + if os.path.lexists(path_uuid): + with ctx.suppress_oserror(errno.ENOENT, errno.ENOTDIR): + os.unlink(path_info) + self._update_cache_size(-info["size"]) + finally: + if lockfd is not None: + if uuidname is not None: + # In case this runs after the object was renamed, but before + # `uuidname` was cleared, then `_rm_r_object()` will be a + # no-op. + self._rm_r_object(os.path.join(self._dirname_objects, uuidname)) + linux.fcntl_flock(lockfd, linux.fcntl.F_UNLCK) + os.close(lockfd) + + @contextlib.contextmanager + def load(self, name: str): + """Load a cache entry + + Find the cache entry with the given name, acquire a read-lock and + yield its path back to the caller. Once control returns, the entry + is released. + + The returned path is the relative path between the cache and the top + level directory of the cache entry. + + Parameters: + ----------- + name + Name of the cache entry to find. + """ + + assert self._is_active() + + if not name: + raise ValueError() + + with contextlib.ExitStack() as es: + # Use an ExitStack so we can catch exceptions raised by the + # `__enter__()` call on the context-manager. We want to catch + # `OSError` exceptions and convert them to cache-misses. + try: + es.enter_context( + self._atomic_open( + os.path.join( + self._dirname_objects, + name, + self._filename_object_lock, + ), + write=False, + wait=False, + ) + ) + except OSError as e: + if e.errno in [errno.EAGAIN, errno.ENOENT, errno.ENOTDIR]: + raise self.MissError() from None + raise e + + yield os.path.join( + self._dirname_objects, + name, + self._dirname_data, + ) + + @property + def info(self) -> FsCacheInfo: + """Query Cache Information + + Return the parsed cache information which is currently cached on this + cache-instance. The cache information has all unknown fields stripped. + + Unset values are represented by `None`, and the cache will interpret + it as the default value for the respective field. + """ + + assert self._is_active() + + return self._info + + @info.setter + def info(self, info: FsCacheInfo): + """Write Cache Information + + Update and write the cache-information onto the file-system. This first + locks the cache-information file, reads it in, updates the newly read + information with the data from `info`, writes the result back to disk + and finally unlocks the file. + + There are a few caveats to take into account: + + * The locking guarantees that simultaneous updates will be properly + ordered and never discard any information. 
+ * Since this reads in the newest cache-information, this function can + update cache-information values other than the ones from `info`. Any + value unset in `info` will be re-read from disk and thus might + change (in the future, if required, this can be adjusted to allow a + caller to hook into the operation while the lock is held). + * You cannot strip known values from the cache-information. Any value + not present in `info` is left unchanged. You must explicitly set a + value to its default to reset it. + * Cache-information fields that are not known to this implementation + are never exposed to the caller, but are left unchanged on-disk. + This guarantees that future extensions are left alone and are not + accidentally stripped. + + The cached information of this instance is updated to reflect the + changes. + + Parameters: + ----------- + info + Cache information object to consume and write. + """ + + assert self._is_active() + + with self._atomic_open(self._filename_cache_info, write=True, wait=True) as fd: + with os.fdopen(fd, "r", closefd=False, encoding="utf8") as f: + info_raw = json.load(f) + + # If the on-disk data is in an unexpected format, we never touch + # it. If it is a JSON-object, we update it with the new values and + # then re-parse it into a full `FsCacheInfo` with all known fields + # populated. + if isinstance(info_raw, dict): + info_raw.update(info.to_json()) + info = FsCacheInfo.from_json(info_raw) + + # Replace the file with the new values. This releases the lock. + with self._atomic_file(self._filename_cache_info, self._dirname_objects, replace=True) as f: + json.dump(info_raw, f) + + self._load_cache_info(info) diff --git a/test/mod/test_util_fscache.py b/test/mod/test_util_fscache.py new file mode 100644 index 00000000..4a3141b1 --- /dev/null +++ b/test/mod/test_util_fscache.py @@ -0,0 +1,332 @@ +# +# Tests for the 'osbuild.util.fscache' module. +# + +# pylint: disable=protected-access + +import os +import tempfile + +import pytest + +from osbuild.util import fscache + + +@pytest.fixture(name="tmpdir") +def tmpdir_fixture(): + with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp: + yield tmp + + +def test_calculate_size(tmpdir): + # + # Test the `_calculate_size()` helper and verify it only includes file + # content in its calculation. + # + + os.mkdir(os.path.join(tmpdir, "dir")) + + assert fscache.FsCache._calculate_size(os.path.join(tmpdir, "dir")) == 0 + + with open(os.path.join(tmpdir, "dir", "file"), "x", encoding="utf8") as f: + pass + + assert fscache.FsCache._calculate_size(os.path.join(tmpdir, "dir")) == 0 + + with open(os.path.join(tmpdir, "dir", "file"), "w", encoding="utf8") as f: + f.write("foobar") + + assert fscache.FsCache._calculate_size(os.path.join(tmpdir, "dir")) == 6 + + +def test_pathlike(tmpdir: os.PathLike[str]): + # + # Verify behavior of `__fspath__()`. 
+ # + + class Wrapper: + def __init__(self, path: str): + self._path = path + + def __fspath__(self) -> str: + return self._path + + # Test with a plain string as argument + dir_str: str = os.fspath(tmpdir) + cache1 = fscache.FsCache("osbuild-test-appid", dir_str) + assert os.fspath(cache1) == tmpdir + assert os.path.join(cache1, "foobar") == os.path.join(tmpdir, "foobar") + + # Test with a wrapper-type as argument + dir_pathlike: Wrapper = Wrapper(os.fspath(tmpdir)) + cache2 = fscache.FsCache("osbuild-test-appid", dir_pathlike) + assert os.fspath(cache2) == tmpdir + assert os.path.join(cache2, "foobar") == os.path.join(tmpdir, "foobar") + + +def test_path(tmpdir): + # + # Verify behavior of `_path()`. + # + + cache = fscache.FsCache("osbuild-test-appid", tmpdir) + with cache: + assert cache._path() == cache._path_cache + assert cache._path("dir") == os.path.join(cache._path_cache, "dir") + assert cache._path("dir", "file") == os.path.join(cache._path_cache, "dir", "file") + + +def test_atomic_open(tmpdir): + # + # Verify the `_atomic_open()` helper correctly opens existing files and + # takes a lock. + # + + cache = fscache.FsCache("osbuild-test-appid", tmpdir) + with cache: + # Must never create files. + with pytest.raises(OSError): + with cache._atomic_open("file", write=False, wait=False) as f: + pass + + # Create the file with "foo" as content. + with open(os.path.join(tmpdir, "file"), "x", encoding="utf8") as f: + f.write("foo") + + # Open and acquire a write-lock. Then verify a read-lock fails. + with cache._atomic_open("file", write=True, wait=False): + with pytest.raises(BlockingIOError): + with cache._atomic_open("file", write=False, wait=False): + pass + + +def test_atomic_file(tmpdir): + # + # Verify behavior of `_atomic_file()` as replacement for `O_TMPFILE`. + # + + cache = fscache.FsCache("osbuild-test-appid", tmpdir) + with cache: + rpath_store = cache._dirname_objects + path_store = os.path.join(cache._path_cache, rpath_store) + + # Initially the store is empty. + assert len(list(os.scandir(path_store))) == 0 + + # Create a file and verify there is almost exactly 1 file in the store. + with cache._atomic_file(os.path.join(rpath_store, "file"), rpath_store) as f: + assert len(list(os.scandir(path_store))) == 1 + f.write("foo") + assert len(list(os.scandir(path_store))) == 1 + + # Verify `ignore_exist=False` works as expected. + with pytest.raises(OSError): + with cache._atomic_file(os.path.join(rpath_store, "file"), rpath_store) as f: + # Temporarily, there will be 2 files. + assert len(list(os.scandir(path_store))) == 2 + f.write("bar") + assert len(list(os.scandir(path_store))) == 1 + with open(os.path.join(path_store, "file"), "r", encoding="utf8") as f: + assert f.read() == "foo" + + # Verify `ignore_exist=True` works as expected. + with cache._atomic_file(os.path.join(rpath_store, "file"), rpath_store, ignore_exist=True) as f: + f.write("bar") + assert len(list(os.scandir(path_store))) == 1 + with open(os.path.join(path_store, "file"), "r", encoding="utf8") as f: + assert f.read() == "foo" + + # Verify `replace=True`. + with cache._atomic_file(os.path.join(rpath_store, "file"), rpath_store, replace=True) as f: + f.write("bar") + assert len(list(os.scandir(path_store))) == 1 + with open(os.path.join(path_store, "file"), "r", encoding="utf8") as f: + assert f.read() == "bar" + + # Combining `replace` and `ignore_exist` is not allowed. 
+ with pytest.raises(AssertionError): + with cache._atomic_file( + os.path.join(rpath_store, "file"), + rpath_store, + replace=True, + ignore_exist=True, + ) as f: + pass + + +def test_atomic_dir(tmpdir): + # + # Verify the `_atomic_dir()` helper correctly creates anonymous files + # and yields the name and lock-file. + # + + cache = fscache.FsCache("osbuild-test-appid", tmpdir) + with cache: + # The relative-path must exist, so expect an error if it does not. + with pytest.raises(OSError): + cache._atomic_dir("dir") + + assert len(list(os.scandir(os.path.join(tmpdir, cache._dirname_objects)))) == 0 + + (name, lockfd) = cache._atomic_dir(cache._dirname_objects) + assert name.startswith("uuid-") + assert len(name) == 37 + assert lockfd >= 0 + os.close(lockfd) + + assert len(list(os.scandir(os.path.join(tmpdir, cache._dirname_objects)))) == 1 + + +def test_scaffolding(tmpdir): + # + # Verify that the cache creates scaffolding when entered. + # + + cache = fscache.FsCache("osbuild-test-appid", tmpdir) + + assert len(list(os.scandir(tmpdir))) == 0 + + with cache: + pass + + assert len(list(os.scandir(tmpdir))) == 5 + assert len(list(os.scandir(os.path.join(tmpdir, cache._dirname_objects)))) == 0 + assert len(list(os.scandir(os.path.join(tmpdir, cache._dirname_stage)))) == 0 + + with open(os.path.join(tmpdir, cache._filename_cache_info), "r", encoding="utf8") as f: + assert f.read() == "{}" + with open(os.path.join(tmpdir, cache._filename_cache_lock), "r", encoding="utf8") as f: + assert f.read() == "" + with open(os.path.join(tmpdir, cache._filename_cache_size), "r", encoding="utf8") as f: + assert f.read() == "0" + + +def test_cache_info(tmpdir): + # + # Verify that the cache reads and augments cache information. Also verify + # the default values. + # + + cache = fscache.FsCache("osbuild-test-appid", tmpdir) + + with cache: + assert cache._info == fscache.FsCacheInfo() + assert cache.info == cache._info + + assert cache.info.maximum_size is None + assert cache.info.creation_boot_id is None + cache.info = fscache.FsCacheInfo(maximum_size=1024) + assert cache.info.maximum_size == 1024 + assert cache.info.creation_boot_id is None + cache.info = fscache.FsCacheInfo(creation_boot_id="0"*32) + assert cache.info.maximum_size == 1024 + assert cache.info.creation_boot_id == "0"*32 + cache.info = fscache.FsCacheInfo(maximum_size=2048, creation_boot_id="1"*32) + assert cache.info.maximum_size == 2048 + assert cache.info.creation_boot_id == "1"*32 + + assert not fscache.FsCacheInfo().to_json() + assert fscache.FsCacheInfo(creation_boot_id="0"*32).to_json() == { + "creation-boot-id": "0"*32, + } + assert fscache.FsCacheInfo(creation_boot_id="0"*32, maximum_size=1024).to_json() == { + "creation-boot-id": "0"*32, + "maximum-size": 1024, + } + + assert fscache.FsCacheInfo.from_json({}) == fscache.FsCacheInfo() + assert fscache.FsCacheInfo.from_json(None) == fscache.FsCacheInfo() + assert fscache.FsCacheInfo.from_json("foobar") == fscache.FsCacheInfo() + assert fscache.FsCacheInfo.from_json({ + "creation-boot-id": "0"*32, + }) == fscache.FsCacheInfo(creation_boot_id="0"*32) + assert fscache.FsCacheInfo.from_json({ + "creation-boot-id": "0"*32, + "maximum-size": 1024, + }) == fscache.FsCacheInfo(creation_boot_id="0"*32, maximum_size=1024) + assert fscache.FsCacheInfo.from_json({ + "creation-boot-id": "0"*32, + "maximum-size": 1024, + }) == fscache.FsCacheInfo(creation_boot_id="0"*32, maximum_size=1024) + assert fscache.FsCacheInfo.from_json({ + "creation-boot-id": "0"*32, + "unknown0": "foobar", + "unknown1": 
["foo", "bar"], + }) == fscache.FsCacheInfo(creation_boot_id="0"*32) + + +def test_store(tmpdir): + # + # API tests for the `store()` method. + # + + cache = fscache.FsCache("osbuild-test-appid", tmpdir) + + with pytest.raises(AssertionError): + with cache.store("foobar"): + pass + + with cache: + with pytest.raises(ValueError): + with cache.store(""): + pass + + +def test_load(tmpdir): + # + # API tests for the `load()` method. + # + + cache = fscache.FsCache("osbuild-test-appid", tmpdir) + + with pytest.raises(AssertionError): + with cache.load("foobar"): + pass + + with cache: + with pytest.raises(ValueError): + with cache.load(""): + pass + + +def test_basic(tmpdir): + # + # A basic cache store+load test. + # + + cache = fscache.FsCache("osbuild-test-appid", tmpdir) + with cache: + cache.info = cache.info._replace(maximum_size=1024) + + with cache.stage() as rpath: + with open(os.path.join(tmpdir, rpath, "bar"), "x", encoding="utf8") as f: + f.write("foobar") + + with pytest.raises(fscache.FsCache.MissError): + with cache.load("foo") as rpath: + pass + + with cache.store("foo") as rpath: + with open(os.path.join(tmpdir, rpath, "bar"), "x", encoding="utf8") as f: + f.write("foobar") + + with cache.load("foo") as rpath: + with open(os.path.join(tmpdir, rpath, "bar"), "r", encoding="utf8") as f: + assert f.read() == "foobar" + + +def test_size_discard(tmpdir): + # + # Verify that a cache with no maximum-size configured can never store any + # entries, but discards them immediately. + # + + cache = fscache.FsCache("osbuild-test-appid", tmpdir) + with cache: + with cache.store("foo") as rpath: + with open(os.path.join(tmpdir, rpath, "bar"), "x", encoding="utf8") as f: + f.write("foobar") + + with pytest.raises(fscache.FsCache.MissError): + with cache.load("foo") as rpath: + pass