debian-forge/osbuild/objectstore.py
Christian Kellner ae0680da11 osbuild: integrate FsCache into ObjectStore
Integrate the recently added file system cache `FsCache` into our
object store `ObjectStore`. NB: This changes its semantics:
previously, a call to `ObjectStore.commit` resulted in the object
being in the cache (I/O errors aside). But `FsCache.store`, which
is now the backing store for objects, will only commit objects if
there is enough space left. Thus we cannot rely on objects being
present for reading after a call to `FsCache.store`. To cope with
this we now always copy the object into the cache, even in the
case where we previously moved it: when `commit` is called with
an `object_id` matching `Object.id`, which happens when `commit`
is called for the last stage in the pipeline. We could keep this
optimization, but then we would have to special-case it and only
call `commit` after we have exported all objects; in other words,
after we are sure we will never read from any committed object
again. The extra complexity does not seem worth the small gain
from the optimization.

Convert all the tests to the new semantics and remove those that
make no sense under this new paradigm.

Add a new command-line option, `--cache-max-size`, which sets the
maximum size of the cache, if specified.
2022-12-09 12:03:40 +01:00
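
Illustrative caller sketch (hypothetical ids and paths, not part of this
commit): after `commit`, the object is not guaranteed to be in the cache,
so callers check again before relying on it:

    with ObjectStore("/var/cache/osbuild") as store:
        obj = store.new("sha256:...")       # stage output goes to obj.tree
        obj.finalize()                      # switch the object to READ mode
        store.commit(obj, "sha256:...")     # may be dropped if cache is full

    # in a later run, the object is not guaranteed to be present:
    with ObjectStore("/var/cache/osbuild") as store:
        if not store.contains("sha256:..."):
            ...  # rebuild instead of reading from the cache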


import contextlib
import enum
import json
import os
import subprocess
import tempfile
from typing import Any, Optional, Set, Union

from osbuild.util import jsoncomm
from osbuild.util.fscache import FsCache, FsCacheInfo
from osbuild.util.mnt import mount, umount
from osbuild.util.types import PathLike

from . import api

__all__ = [
    "ObjectStore",
]


class PathAdapter:
    """Expose an object attribute as `os.PathLike`"""

    def __init__(self, obj: Any, attr: str) -> None:
        self.obj = obj
        self.attr = attr

    def __fspath__(self):
        return getattr(self.obj, self.attr)


class Object:
    class Mode(enum.Enum):
        READ = 0
        WRITE = 1

    class Metadata:
        """store and retrieve metadata for an object"""

        def __init__(self, base, folder: Optional[str] = None) -> None:
            self.base = base
            self.folder = folder

            os.makedirs(self.path, exist_ok=True)

        def _path_for_key(self, key) -> str:
            assert key
            name = f"{key}.json"
            return os.path.join(self.path, name)

        @property
        def path(self):
            if not self.folder:
                return self.base
            return os.path.join(self.base, self.folder)

        @contextlib.contextmanager
        def write(self, key):
            tmp = tempfile.NamedTemporaryFile(
                mode="w",
                encoding="utf8",
                dir=self.path,
                prefix=".",
                suffix=".tmp.json",
                delete=True,
            )

            with tmp as f:
                yield f
                f.flush()

                # if nothing was written to the file
                si = os.stat(tmp.name)
                if si.st_size == 0:
                    return

                dest = self._path_for_key(key)
                # ensure it is proper json?
                os.link(tmp.name, dest)

        @contextlib.contextmanager
        def read(self, key):
            dest = self._path_for_key(key)
            try:
                with open(dest, "r", encoding="utf8") as f:
                    yield f
            except FileNotFoundError:
                raise KeyError(f"No metadata for '{key}'") from None

        def set(self, key: str, data):
            if data is None:
                return

            with self.write(key) as f:
                json.dump(data, f, indent=2)

        def get(self, key: str):
            with contextlib.suppress(KeyError):
                with self.read(key) as f:
                    return json.load(f)
            return None

        def __fspath__(self):
            return self.path
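
    # Illustrative use of `Metadata` (hypothetical key and payload, not
    # taken from this module): values are stored as JSON files under the
    # object's `meta/` folder and read back with `get`:
    #
    #     obj.meta.set("build-info", {"stage": "org.osbuild.rpm"})
    #     info = obj.meta.get("build-info")  # -> dict, or None if missing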

    def __init__(self, cache: FsCache, uid: str, mode: Mode):
        self._cache = cache
        self._mode = mode
        self._id = uid
        self._path = None
        self._meta: Optional[Object.Metadata] = None
        self._stack: Optional[contextlib.ExitStack] = None

    def _open_for_reading(self):
        name = self._stack.enter_context(
            self._cache.load(self.id)
        )
        self._path = os.path.join(self._cache, name)

    def _open_for_writing(self):
        name = self._stack.enter_context(
            self._cache.stage()
        )
        self._path = os.path.join(self._cache, name)
        os.makedirs(os.path.join(self._path, "tree"))

    def __enter__(self):
        assert not self.active

        self._stack = contextlib.ExitStack()

        if self.mode == Object.Mode.READ:
            self._open_for_reading()
        else:
            self._open_for_writing()

        # Expose our base path as `os.PathLike` via `PathAdapter`
        # so any changes to it, e.g. via `store_tree`, will be
        # automatically picked up by `Metadata`.
        wrapped = PathAdapter(self, "_path")
        self._meta = self.Metadata(wrapped, folder="meta")

        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        assert self.active
        self.cleanup()

    @property
    def active(self) -> bool:
        return self._stack is not None

    @property
    def id(self) -> Optional[str]:
        return self._id

    @property
    def mode(self) -> Mode:
        return self._mode

    def init(self, base: "Object"):
        """Initialize the object with the base object"""
        self._check_mode(Object.Mode.WRITE)
        assert self.active
        assert self._path

        base.clone(self._path)

    @property
    def tree(self) -> str:
        assert self.active
        assert self._path
        return os.path.join(self._path, "tree")

    @property
    def meta(self) -> Metadata:
        assert self.active
        assert self._meta
        return self._meta

    def finalize(self):
        if self.mode != Object.Mode.WRITE:
            return

        # put the object into the READER state
        self._mode = Object.Mode.READ

    def cleanup(self):
        if self._stack:
            self._stack.close()
            self._stack = None

    def _check_mode(self, want: Mode):
        """Internal: Raise a ValueError if we are not in the desired mode"""
        if self.mode != want:
            raise ValueError(f"Wrong object mode: {self.mode}, want {want}")

    def export(self, to_directory: PathLike):
        """Copy object into an external directory"""
        subprocess.run(
            [
                "cp",
                "--reflink=auto",
                "-a",
                os.fspath(self.tree) + "/.",
                os.fspath(to_directory),
            ],
            check=True,
        )

    def clone(self, to_directory: PathLike):
        """Clone the object to the specified directory"""
        assert self._path

        subprocess.run(
            [
                "cp",
                "--reflink=auto",
                "-a",
                os.fspath(self._path) + "/.",
                os.fspath(to_directory),
            ],
            check=True,
        )

    def __fspath__(self):
        return self.tree


class HostTree:
    """Read-only access to the host file system

    An object that provides the same interface as
    `objectstore.Object` and can be used to read
    the host file system.
    """

    _root: Optional[tempfile.TemporaryDirectory]

    def __init__(self, store):
        self.store = store
        self._root = None
        self.init()

    def init(self):
        if self._root:
            return

        self._root = self.store.tempdir(prefix="host")
        root = self._root.name

        # Create a bare bones root file system
        # with just /usr mounted from the host
        usr = os.path.join(root, "usr")
        os.makedirs(usr)

        # ensure / is read-only
        mount(root, root)
        mount("/usr", usr)

    @property
    def tree(self) -> os.PathLike:
        if not self._root:
            raise AssertionError("HostTree not initialized")
        return self._root.name

    def cleanup(self):
        if self._root:
            umount(self._root.name)
            self._root.cleanup()
            self._root = None

    def __fspath__(self) -> os.PathLike:
        return self.tree


class ObjectStore(contextlib.AbstractContextManager):
    def __init__(self, store: PathLike):
        self.cache = FsCache("osbuild", store)
        self.tmp = os.path.join(store, "tmp")

        os.makedirs(self.store, exist_ok=True)
        os.makedirs(self.objects, exist_ok=True)
        os.makedirs(self.tmp, exist_ok=True)

        self._objs: Set[Object] = set()
        self._host_tree: Optional[HostTree] = None
        self._stack = contextlib.ExitStack()

    def _get_floating(self, object_id: str) -> Optional[Object]:
        """Internal: get a non-committed object"""
        for obj in self._objs:
            if obj.mode == Object.Mode.READ and obj.id == object_id:
                return obj
        return None

    @property
    def maximum_size(self) -> Optional[Union[int, str]]:
        info = self.cache.info
        return info.maximum_size

    @maximum_size.setter
    def maximum_size(self, size: Union[int, str]):
        info = FsCacheInfo(maximum_size=size)
        self.cache.info = info
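
    # NB: the new `--cache-max-size` command-line option mentioned in the
    # commit message is expected to feed into `maximum_size`. A minimal
    # sketch, assuming a size value already parsed from the command line
    # (illustrative names, not part of this module):
    #
    #     store = ObjectStore(cache_dir)
    #     if cache_max_size is not None:
    #         store.maximum_size = cache_max_size  # int or size string
    #                                              # accepted by FsCache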

    @property
    def active(self) -> bool:
        # pylint: disable=protected-access
        return self.cache._is_active()

    @property
    def store(self):
        return os.fspath(self.cache)

    @property
    def objects(self):
        return os.path.join(self.cache, "objects")

    @property
    def host_tree(self) -> HostTree:
        assert self.active

        if not self._host_tree:
            self._host_tree = HostTree(self)
        return self._host_tree

    def contains(self, object_id):
        if not object_id:
            return False

        if self._get_floating(object_id):
            return True

        try:
            with self.cache.load(object_id):
                return True
        except FsCache.MissError:
            return False

    def tempdir(self, prefix=None, suffix=None):
        """Return a tempfile.TemporaryDirectory within the store"""
        return tempfile.TemporaryDirectory(dir=self.tmp,
                                           prefix=prefix,
                                           suffix=suffix)

    def get(self, object_id):
        assert self.active

        obj = self._get_floating(object_id)
        if obj:
            return obj

        try:
            obj = Object(self.cache, object_id, Object.Mode.READ)
            self._stack.enter_context(obj)
            return obj
        except FsCache.MissError:
            return None

    def new(self, object_id: str):
        """Create a new `Object` and open it for writing.

        Returns an instance of `Object` that can be used to
        write the tree and metadata. Use `commit` to attempt
        to store the object in the cache.
        """
        assert self.active

        obj = Object(self.cache, object_id, Object.Mode.WRITE)
        self._stack.enter_context(obj)
        self._objs.add(obj)

        return obj

    def commit(self, obj: Object, object_id: str):
        """Commits the Object to the object cache as `object_id`.

        Attempts to store the contents of `obj` and its metadata
        in the object cache. Whether anything is actually stored
        depends on the configuration of the cache, i.e. its size
        and how much free space is left or can be made available.
        Therefore the caller should not assume that the stored
        object can be retrieved at all.
        """
        assert self.active

        with self.cache.store(object_id) as name:
            path = os.path.join(self.cache, name)
            obj.clone(path)

    def cleanup(self):
        """Cleanup all created Objects that are still alive"""
        if self._host_tree:
            self._host_tree.cleanup()
            self._host_tree = None

        self._stack.close()
        self._objs = set()

    def __fspath__(self):
        return os.fspath(self.store)

    def __enter__(self):
        assert not self.active

        self._stack.enter_context(self.cache)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        assert self.active
        self.cleanup()


class StoreServer(api.BaseAPI):

    endpoint = "store"

    def __init__(self, store: ObjectStore, *, socket_address=None):
        super().__init__(socket_address)
        self.store = store
        self.tmproot = store.tempdir(prefix="store-server-")
        self._stack = contextlib.ExitStack()

    def _cleanup(self):
        self.tmproot.cleanup()
        self.tmproot = None
        self._stack.close()
        self._stack = None

    def _read_tree(self, msg, sock):
        object_id = msg["object-id"]
        obj = self.store.get(object_id)
        if not obj:
            sock.send({"path": None})
            return

        sock.send({"path": obj.tree})

    def _read_tree_at(self, msg, sock):
        object_id = msg["object-id"]
        target = msg["target"]
        subtree = msg["subtree"]

        obj = self.store.get(object_id)
        if not obj:
            sock.send({"path": None})
            return

        try:
            source = os.path.join(obj, subtree.lstrip("/"))
            mount(source, target)
            self._stack.callback(umount, target)
        # pylint: disable=broad-except
        except Exception as e:
            sock.send({"error": str(e)})
            return

        sock.send({"path": target})

    def _mkdtemp(self, msg, sock):
        args = {
            "suffix": msg.get("suffix"),
            "prefix": msg.get("prefix"),
            "dir": self.tmproot.name
        }

        path = tempfile.mkdtemp(**args)
        sock.send({"path": path})

    def _source(self, msg, sock):
        name = msg["name"]
        base = self.store.store
        path = os.path.join(base, "sources", name)
        sock.send({"path": path})

    def _message(self, msg, _fds, sock):
        if msg["method"] == "read-tree":
            self._read_tree(msg, sock)
        elif msg["method"] == "read-tree-at":
            self._read_tree_at(msg, sock)
        elif msg["method"] == "mkdtemp":
            self._mkdtemp(msg, sock)
        elif msg["method"] == "source":
            self._source(msg, sock)
        else:
            raise ValueError("Invalid RPC call", msg)


class StoreClient:
    def __init__(self, connect_to="/run/osbuild/api/store"):
        self.client = jsoncomm.Socket.new_client(connect_to)

    def __del__(self):
        if self.client is not None:
            self.client.close()

    def mkdtemp(self, suffix=None, prefix=None):
        msg = {
            "method": "mkdtemp",
            "suffix": suffix,
            "prefix": prefix
        }

        self.client.send(msg)
        msg, _, _ = self.client.recv()

        return msg["path"]

    def read_tree(self, object_id: str):
        msg = {
            "method": "read-tree",
            "object-id": object_id
        }

        self.client.send(msg)
        msg, _, _ = self.client.recv()

        return msg["path"]

    def read_tree_at(self, object_id: str, target: str, path="/"):
        msg = {
            "method": "read-tree-at",
            "object-id": object_id,
            "target": os.fspath(target),
            "subtree": os.fspath(path)
        }

        self.client.send(msg)
        msg, _, _ = self.client.recv()

        err = msg.get("error")
        if err:
            raise RuntimeError(err)

        return msg["path"]

    def source(self, name: str) -> str:
        msg = {
            "method": "source",
            "name": name
        }

        self.client.send(msg)
        msg, _, _ = self.client.recv()

        return msg["path"]
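
# A minimal, illustrative sketch of how a build stage might talk to the
# StoreServer through StoreClient (the object id and mount target below
# are made-up values, not part of this module):
#
#     client = StoreClient(connect_to="/run/osbuild/api/store")
#     path = client.read_tree_at("sha256:...", "/run/osbuild/mounts/base")
#     # `path` is the mount target on success; errors raise RuntimeError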