`tox` is a standard testing tool for Python projects. It lets you test locally against all of your installed Python versions. To run the tests in parallel for all supported Python versions:

```
tox -m test -p all
```

To run linters or type analysis:

```
tox -m lint -p all
tox -m type -p all
```

This commit *also* disables the `import-error` warning from `pylint`: not all Python versions have the system-installed Python libraries available, and those cannot be fetched from PyPI. Some linters have been added, and the general order the linters run in has been changed, which allows for quicker failure when running `tox -m lint`. As a consequence, the `test_pylint` test has been removed, as its role can now be fulfilled by `tox`.

Other assorted linter fixes due to newer versions:

- use the `str.join` method (`consider-using-join`)
- fix various (newer) mypy and pylint issues
- fix comments starting with `#` with no space after it, as flagged by `autopep8`

This also changes our CI to use the new `tox` setup and, on top of that, pins the versions of the linters used. These pins might move into separate requirements.txt files later on, to allow for easier updating of those dependencies.
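For reference, labels such as `test`, `lint`, and `type` are declared in `tox.ini`. The sketch below shows one way this can look with tox 4; the environment names, Python versions, and commands are illustrative placeholders, not the contents of this commit's actual configuration:

```
# Illustrative sketch only: names and versions are placeholders,
# not taken from this commit's actual tox.ini.
[tox]
env_list = py39, py310, py311
labels =
    test = py39, py310, py311
    lint = lint
    type = type

[testenv]
deps = pytest
commands = pytest {posargs}

[testenv:lint]
deps = pylint
commands = pylint osbuild

[testenv:type]
deps = mypy
commands = mypy osbuild
```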
584 lines
16 KiB
Python
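"""On-disk store for osbuild file-system trees ("objects").

`ObjectStore` keeps objects in an `FsCache`. Each `Object` is a
file-system tree plus JSON metadata, either staged for writing or
loaded read-only from the cache. `HostTree` provides a read-only
view of the host with `/usr` mounted in, and `StoreServer` /
`StoreClient` expose the store over the osbuild API socket.
"""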
import contextlib
import enum
import json
import os
import subprocess
import tempfile
import time
from typing import Any, Optional, Set, Union

from osbuild.util import jsoncomm
from osbuild.util.fscache import FsCache, FsCacheInfo
from osbuild.util.mnt import mount, umount
from osbuild.util.path import clamp_mtime
from osbuild.util.types import PathLike

from . import api

__all__ = [
    "ObjectStore",
]


class PathAdapter:
    """Expose an object attribute as `os.PathLike`"""

    def __init__(self, obj: Any, attr: str) -> None:
        self.obj = obj
        self.attr = attr

    def __fspath__(self):
        return getattr(self.obj, self.attr)


class Object:
    class Mode(enum.Enum):
        READ = 0
        WRITE = 1

    class Metadata:
        """store and retrieve metadata for an object"""

        def __init__(self, base, folder: Optional[str] = None) -> None:
            self.base = base
            self.folder = folder
            os.makedirs(self.path, exist_ok=True)

        def _path_for_key(self, key) -> str:
            assert key
            name = f"{key}.json"
            return os.path.join(self.path, name)

        @property
        def path(self):
            if not self.folder:
                return self.base
            return os.path.join(self.base, self.folder)

        @contextlib.contextmanager
        def write(self, key):

            tmp = tempfile.NamedTemporaryFile(
                mode="w",
                encoding="utf8",
                dir=self.path,
                prefix=".",
                suffix=".tmp.json",
                delete=True,
            )

            with tmp as f:
                yield f

                f.flush()

                # if nothing was written to the file
                si = os.stat(tmp.name)
                if si.st_size == 0:
                    return

                dest = self._path_for_key(key)
                # ensure it is proper json?
                os.link(tmp.name, dest)

        @contextlib.contextmanager
        def read(self, key):
            dest = self._path_for_key(key)
            try:
                with open(dest, "r", encoding="utf8") as f:
                    yield f
            except FileNotFoundError:
                raise KeyError(f"No metadata for '{key}'") from None

        def set(self, key: str, data):

            if data is None:
                return

            with self.write(key) as f:
                json.dump(data, f, indent=2)

        def get(self, key: str):
            with contextlib.suppress(KeyError):
                with self.read(key) as f:
                    return json.load(f)
            return None

        def __fspath__(self):
            return self.path

    def __init__(self, cache: FsCache, uid: str, mode: Mode):
        self._cache = cache
        self._mode = mode
        self._id = uid
        self._path = None
        self._meta: Optional[Object.Metadata] = None
        self._stack: Optional[contextlib.ExitStack] = None
        self.source_epoch = None  # see finalize()

    def _open_for_reading(self):
        name = self._stack.enter_context(
            self._cache.load(self.id)
        )
        self._path = os.path.join(self._cache, name)

    def _open_for_writing(self):
        name = self._stack.enter_context(
            self._cache.stage()
        )
        self._path = os.path.join(self._cache, name)
        os.makedirs(os.path.join(self._path, "tree"))

    def __enter__(self):
        assert not self.active
        self._stack = contextlib.ExitStack()
        if self.mode == Object.Mode.READ:
            self._open_for_reading()
        else:
            self._open_for_writing()

        # Expose our base path as `os.PathLike` via `PathAdapter`
        # so any changes to it, e.g. via `store_tree`, will be
        # automatically picked up by `Metadata`.
        wrapped = PathAdapter(self, "_path")
        self._meta = self.Metadata(wrapped, folder="meta")

        if self.mode == Object.Mode.WRITE:
            self.meta.set("info", {
                "created": int(time.time()),
            })

        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        assert self.active
        self.cleanup()

    @property
    def active(self) -> bool:
        return self._stack is not None

    @property
    def id(self) -> Optional[str]:
        return self._id

    @property
    def mode(self) -> Mode:
        return self._mode

    def init(self, base: "Object"):
        """Initialize the object with the base object"""
        self._check_mode(Object.Mode.WRITE)
        assert self.active
        assert self._path

        subprocess.run(
            [
                "cp",
                "--reflink=auto",
                "-a",
                os.fspath(base.path) + "/.",
                os.fspath(self.path),
            ],
            check=True,
        )

    @property
    def path(self) -> str:
        assert self.active
        assert self._path
        return self._path

    @property
    def tree(self) -> str:
        return os.path.join(self.path, "tree")

    @property
    def meta(self) -> Metadata:
        assert self.active
        assert self._meta
        return self._meta

    @property
    def created(self) -> int:
        """When was the object created

        It is stored as `created` in the `info` metadata entry,
        and thus will also get overwritten if the metadata gets
        overwritten via `init()`.
        NB: only valid to access when the object is active.
        """
        info = self.meta.get("info")
        assert info, "info metadata missing"
        return info["created"]

    def clamp_mtime(self):
        """Clamp mtime of files and dirs to source_epoch

        If a source epoch is specified, we clamp all files that
        are newer than our own creation timestamp to the given
        source epoch. As a result, all files created during the
        build should receive the source epoch modification time.
        """
        if self.source_epoch is None:
            return

        clamp_mtime(self.tree, self.created, self.source_epoch)

    def finalize(self):
        if self.mode != Object.Mode.WRITE:
            return

        self.clamp_mtime()

        # put the object into the READ state
        self._mode = Object.Mode.READ

    def cleanup(self):
        if self._stack:
            self._stack.close()
            self._stack = None

    def _check_mode(self, want: Mode):
        """Internal: Raise a ValueError if we are not in the desired mode"""
        if self.mode != want:
            raise ValueError(f"Wrong object mode: {self.mode}, want {want}")

    def export(self, to_directory: PathLike):
        """Copy object into an external directory"""
        subprocess.run(
            [
                "cp",
                "--reflink=auto",
                "-a",
                os.fspath(self.tree) + "/.",
                os.fspath(to_directory),
            ],
            check=True,
        )

    def __fspath__(self):
        return self.tree


class HostTree:
    """Read-only access to the host file system

    An object that provides the same interface as
    `objectstore.Object` and can be used to read
    the host file system.
    """

    _root: Optional[tempfile.TemporaryDirectory]

    def __init__(self, store):
        self.store = store
        self._root = None
        self.init()

    def init(self):
        if self._root:
            return

        self._root = self.store.tempdir(prefix="host")

        root = self._root.name
        # Create a bare bones root file system
        # with just /usr mounted from the host
        usr = os.path.join(root, "usr")
        os.makedirs(usr)

        # ensure / is read-only
        mount(root, root)
        mount("/usr", usr)

    @property
    def tree(self) -> os.PathLike:
        if not self._root:
            raise AssertionError("HostTree not initialized")
        return self._root.name

    def cleanup(self):
        if self._root:
            umount(self._root.name)
            self._root.cleanup()
            self._root = None

    def __fspath__(self) -> os.PathLike:
        return self.tree


class ObjectStore(contextlib.AbstractContextManager):
    def __init__(self, store: PathLike):
        self.cache = FsCache("osbuild", store)
        self.tmp = os.path.join(store, "tmp")
        os.makedirs(self.store, exist_ok=True)
        os.makedirs(self.objects, exist_ok=True)
        os.makedirs(self.tmp, exist_ok=True)
        self._objs: Set[Object] = set()
        self._host_tree: Optional[HostTree] = None
        self._stack = contextlib.ExitStack()

    def _get_floating(self, object_id: str) -> Optional[Object]:
        """Internal: get a non-committed object"""
        for obj in self._objs:
            if obj.mode == Object.Mode.READ and obj.id == object_id:
                return obj
        return None

    @property
    def maximum_size(self) -> Optional[Union[int, str]]:
        info = self.cache.info
        return info.maximum_size

    @maximum_size.setter
    def maximum_size(self, size: Union[int, str]):
        info = FsCacheInfo(maximum_size=size)
        self.cache.info = info

    @property
    def active(self) -> bool:
        # pylint: disable=protected-access
        return self.cache._is_active()

    @property
    def store(self):
        return os.fspath(self.cache)

    @property
    def objects(self):
        return os.path.join(self.cache, "objects")

    @property
    def host_tree(self) -> HostTree:
        assert self.active

        if not self._host_tree:
            self._host_tree = HostTree(self)
        return self._host_tree

    def contains(self, object_id):
        if not object_id:
            return False

        if self._get_floating(object_id):
            return True

        try:
            with self.cache.load(object_id):
                return True
        except FsCache.MissError:
            return False

    def tempdir(self, prefix=None, suffix=None):
        """Return a tempfile.TemporaryDirectory within the store"""
        return tempfile.TemporaryDirectory(dir=self.tmp,
                                           prefix=prefix,
                                           suffix=suffix)

    def get(self, object_id):
        assert self.active

        obj = self._get_floating(object_id)
        if obj:
            return obj

        try:
            obj = Object(self.cache, object_id, Object.Mode.READ)
            self._stack.enter_context(obj)
            return obj
        except FsCache.MissError:
            return None

    def new(self, object_id: str):
        """Creates a new `Object` and opens it for writing.

        It returns an instance of `Object` that can be used to
        write tree and metadata. Use `commit` to attempt to
        store the object in the cache.
        """
        assert self.active

        obj = Object(self.cache, object_id, Object.Mode.WRITE)
        self._stack.enter_context(obj)

        self._objs.add(obj)

        return obj

    def commit(self, obj: Object, object_id: str):
        """Commits the Object to the object cache as `object_id`.

        Attempts to store the contents of `obj` and its metadata
        in the object cache. Whether anything is actually stored
        depends on the configuration of the cache, i.e. its size
        and how much free space is left or can be made available.
        Therefore the caller should not assume that the stored
        object can be retrieved at all.
        """

        assert self.active

        # we clamp the mtime of `obj` itself so that resuming from
        # a snapshot and building with a snapshot go through the
        # same code path
        obj.clamp_mtime()

        self.cache.store_tree(object_id, obj.path + "/.")

    def cleanup(self):
        """Cleanup all created Objects that are still alive"""
        if self._host_tree:
            self._host_tree.cleanup()
            self._host_tree = None

        self._stack.close()
        self._objs = set()

    def __fspath__(self):
        return os.fspath(self.store)

    def __enter__(self):
        assert not self.active
        self._stack.enter_context(self.cache)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        assert self.active
        self.cleanup()


class StoreServer(api.BaseAPI):

    endpoint = "store"

    def __init__(self, store: ObjectStore, *, socket_address=None):
        super().__init__(socket_address)
        self.store = store
        self.tmproot = store.tempdir(prefix="store-server-")
        self._stack = contextlib.ExitStack()

    def _cleanup(self):
        self.tmproot.cleanup()
        self.tmproot = None
        self._stack.close()
        self._stack = None

    def _read_tree(self, msg, sock):
        object_id = msg["object-id"]
        obj = self.store.get(object_id)
        if not obj:
            sock.send({"path": None})
            return

        sock.send({"path": obj.tree})

    def _read_tree_at(self, msg, sock):
        object_id = msg["object-id"]
        target = msg["target"]
        subtree = msg["subtree"]

        obj = self.store.get(object_id)
        if not obj:
            sock.send({"path": None})
            return

        try:
            source = os.path.join(obj, subtree.lstrip("/"))
            mount(source, target)
            self._stack.callback(umount, target)

        # pylint: disable=broad-except
        except Exception as e:
            sock.send({"error": str(e)})
            return

        sock.send({"path": target})

    def _mkdtemp(self, msg, sock):
        args = {
            "suffix": msg.get("suffix"),
            "prefix": msg.get("prefix"),
            "dir": self.tmproot.name
        }

        path = tempfile.mkdtemp(**args)
        sock.send({"path": path})

    def _source(self, msg, sock):
        name = msg["name"]
        base = self.store.store
        path = os.path.join(base, "sources", name)
        sock.send({"path": path})

    def _message(self, msg, _fds, sock):
        if msg["method"] == "read-tree":
            self._read_tree(msg, sock)
        elif msg["method"] == "read-tree-at":
            self._read_tree_at(msg, sock)
        elif msg["method"] == "mkdtemp":
            self._mkdtemp(msg, sock)
        elif msg["method"] == "source":
            self._source(msg, sock)
        else:
            raise ValueError("Invalid RPC call", msg)


class StoreClient:
    def __init__(self, connect_to="/run/osbuild/api/store"):
        self.client = jsoncomm.Socket.new_client(connect_to)

    def __del__(self):
        if self.client is not None:
            self.client.close()

    def mkdtemp(self, suffix=None, prefix=None):
        msg = {
            "method": "mkdtemp",
            "suffix": suffix,
            "prefix": prefix
        }

        self.client.send(msg)
        msg, _, _ = self.client.recv()

        return msg["path"]

    def read_tree(self, object_id: str):
        msg = {
            "method": "read-tree",
            "object-id": object_id
        }

        self.client.send(msg)
        msg, _, _ = self.client.recv()

        return msg["path"]

    def read_tree_at(self, object_id: str, target: str, path="/"):
        msg = {
            "method": "read-tree-at",
            "object-id": object_id,
            "target": os.fspath(target),
            "subtree": os.fspath(path)
        }

        self.client.send(msg)
        msg, _, _ = self.client.recv()

        err = msg.get("error")
        if err:
            raise RuntimeError(err)

        return msg["path"]

    def source(self, name: str) -> str:
        msg = {
            "method": "source",
            "name": name
        }

        self.client.send(msg)
        msg, _, _ = self.client.recv()

        return msg["path"]
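Taken together, `ObjectStore.new()`, `Object.finalize()`, `ObjectStore.commit()` and `ObjectStore.get()` form the check, build, commit cycle described in the docstrings above. The following is a minimal usage sketch, assuming the module is importable as `osbuild.objectstore`; the store path and object id are made-up placeholders and the build step is elided, so this is not an excerpt from osbuild's actual pipeline code:

```
import os

from osbuild.objectstore import ObjectStore

STORE_PATH = "/var/cache/osbuild/store"  # placeholder location
OBJECT_ID = "sha256:placeholder"         # placeholder content hash

with ObjectStore(STORE_PATH) as store:
    if not store.contains(OBJECT_ID):
        obj = store.new(OBJECT_ID)      # staged Object, opened for writing
        # ... build steps would populate obj.tree here ...
        obj.finalize()                  # clamp mtimes, switch to READ mode
        store.commit(obj, OBJECT_ID)    # the cache may or may not keep it

    obj = store.get(OBJECT_ID)          # floating or cached object, or None
    if obj:
        os.makedirs("/tmp/exported-tree", exist_ok=True)
        obj.export("/tmp/exported-tree")
```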