objectstore: remove copy on write from object
Remove copy-on-write support from `objectstore.Object`. The main
reason for introducing copy-on-write was to save an additional
copy in the non DAG-pipeline model[1]. With the introduction of
the latter and the explicit `--export` option, we can achieve the
same result without the complexity of copy-on-write semantics.
[1] See commit 39213b7, part of 3b7c87d5..42a365d1 changeset.
This commit is contained in:
parent
afc82ee465
commit
5346025031
3 changed files with 192 additions and 291 deletions
|
|
@ -1,4 +1,5 @@
|
|||
import contextlib
|
||||
import enum
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
|
@ -17,63 +18,51 @@ __all__ = [
|
|||
|
||||
|
||||
class Object:
|
||||
def __init__(self, store: "ObjectStore"):
|
||||
self._init = True
|
||||
self._readers = 0
|
||||
self._writer = False
|
||||
self._base: Optional[str] = None
|
||||
|
||||
class Mode(enum.Enum):
|
||||
READ = 0
|
||||
WRITE = 1
|
||||
|
||||
def __init__(self, store: "ObjectStore", uid: str, mode: Mode):
|
||||
self._mode = mode
|
||||
self._workdir = None
|
||||
self._tree = None
|
||||
self.id: Optional[str] = None
|
||||
self._id = uid
|
||||
self.store = store
|
||||
self.reset()
|
||||
|
||||
def init(self) -> None:
|
||||
"""Initialize the object with content of its base"""
|
||||
self._check_writable()
|
||||
self._check_readers()
|
||||
self._check_writer()
|
||||
if self._init:
|
||||
return
|
||||
|
||||
with self.store.new(self._base) as obj:
|
||||
obj.export(self._tree)
|
||||
self._init = True
|
||||
|
||||
@property
|
||||
def base(self) -> Optional[str]:
|
||||
return self._base
|
||||
|
||||
@base.setter
|
||||
def base(self, base_id: Optional[str]):
|
||||
self._init = not base_id
|
||||
self._base = base_id
|
||||
self.id = base_id
|
||||
|
||||
@property
|
||||
def _path(self) -> Optional[str]:
|
||||
if self._base and not self._init:
|
||||
path = self.store.resolve_ref(self._base)
|
||||
if self.mode == Object.Mode.READ:
|
||||
path = self.store.resolve_ref(uid)
|
||||
assert path is not None
|
||||
self._path = path
|
||||
else:
|
||||
path = self._tree
|
||||
return path
|
||||
workdir = self.tempdir("workdir")
|
||||
self._path = os.path.join(workdir.name, "object")
|
||||
os.makedirs(self._path)
|
||||
self._workdir = workdir
|
||||
|
||||
@property
|
||||
def id(self) -> Optional[str]:
|
||||
return self._id
|
||||
|
||||
@property
|
||||
def mode(self) -> Mode:
|
||||
return self._mode
|
||||
|
||||
def init(self, base: "Object"):
|
||||
"""Initialize the object with the base object"""
|
||||
self._check_mode(Object.Mode.WRITE)
|
||||
base.clone(self._path)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def write(self) -> Iterator[str]:
|
||||
"""Return a path that can be written to"""
|
||||
self._check_writable()
|
||||
self._check_readers()
|
||||
self._check_writer()
|
||||
self.init()
|
||||
self.id = None
|
||||
self._check_mode(Object.Mode.WRITE)
|
||||
|
||||
with self.tempdir("writer") as target:
|
||||
mount(self._path, target, ro=False)
|
||||
try:
|
||||
self._writer = True
|
||||
yield target
|
||||
finally:
|
||||
umount(target)
|
||||
self._writer = False
|
||||
|
||||
@contextlib.contextmanager
|
||||
def read(self) -> Iterator[PathLike]:
|
||||
|
|
@ -88,8 +77,7 @@ class Object:
|
|||
Map the tree or a part of it specified via `path` at the
|
||||
specified path `target`.
|
||||
"""
|
||||
self._check_writable()
|
||||
self._check_writer()
|
||||
self._check_mode(Object.Mode.READ)
|
||||
|
||||
if self._path is None:
|
||||
raise RuntimeError("read_at with no path.")
|
||||
|
|
@ -98,73 +86,62 @@ class Object:
|
|||
|
||||
mount(path, target)
|
||||
try:
|
||||
self._readers += 1
|
||||
yield target
|
||||
finally:
|
||||
umount(target)
|
||||
self._readers -= 1
|
||||
|
||||
def store_tree(self):
|
||||
"""Store the tree with a fresh name and reset itself
|
||||
"""Store the tree with a fresh name and close it
|
||||
|
||||
Moves the tree atomically by using rename(2), to a
|
||||
randomly generated unique name. Afterwards it resets
|
||||
itself and can be used as if it was new.
|
||||
"""
|
||||
self._check_writable()
|
||||
self._check_readers()
|
||||
self._check_writer()
|
||||
self.init()
|
||||
destination = str(uuid.uuid4())
|
||||
os.rename(self._tree, os.path.join(self.store.objects, destination))
|
||||
self.reset()
|
||||
return destination
|
||||
randomly generated unique name.
|
||||
|
||||
def reset(self):
|
||||
This puts the object into the READ state.
|
||||
"""
|
||||
self._check_mode(Object.Mode.WRITE)
|
||||
|
||||
name = str(uuid.uuid4())
|
||||
|
||||
destination = os.path.join(self.store.objects, name)
|
||||
os.rename(self._path, destination)
|
||||
self._path = destination
|
||||
|
||||
self.finalize()
|
||||
self.cleanup()
|
||||
self._workdir = self.store.tempdir(suffix="object")
|
||||
self._tree = os.path.join(self._workdir.name, "tree")
|
||||
os.makedirs(self._tree, mode=0o755, exist_ok=True)
|
||||
self._init = not self._base
|
||||
|
||||
return name
|
||||
|
||||
def finalize(self):
|
||||
if self.mode != Object.Mode.WRITE:
|
||||
return
|
||||
|
||||
# put the object into the READER state
|
||||
self._mode = Object.Mode.READ
|
||||
|
||||
def cleanup(self):
|
||||
self._check_readers()
|
||||
self._check_writer()
|
||||
if self._tree:
|
||||
workdir = self._workdir
|
||||
if workdir:
|
||||
# manually remove the tree, it might contain
|
||||
# files with immutable flag set, which will
|
||||
# throw off standard Python 3 tempdir cleanup
|
||||
rmrf.rmtree(self._tree)
|
||||
self._tree = None
|
||||
if self._workdir:
|
||||
self._workdir.cleanup()
|
||||
rmrf.rmtree(os.path.join(workdir.name, "object"))
|
||||
|
||||
workdir.cleanup()
|
||||
self._workdir = None
|
||||
self.id = None
|
||||
|
||||
def _check_readers(self):
|
||||
"""Internal: Raise a ValueError if there are readers"""
|
||||
if self._readers:
|
||||
raise ValueError("Read operation is ongoing")
|
||||
|
||||
def _check_writable(self):
|
||||
"""Internal: Raise a ValueError if not writable"""
|
||||
if not self._workdir:
|
||||
raise ValueError("Object is not writable")
|
||||
|
||||
def _check_writer(self):
|
||||
"""Internal: Raise a ValueError if there is a writer"""
|
||||
if self._writer:
|
||||
raise ValueError("Write operation is ongoing")
|
||||
def _check_mode(self, want: Mode):
|
||||
"""Internal: Raise a ValueError if we are not in the desired mode"""
|
||||
if self.mode != want:
|
||||
raise ValueError(f"Wrong object mode: {self.mode}, want {want}")
|
||||
|
||||
def tempdir(self, suffix=None):
|
||||
workdir = self._workdir.name
|
||||
if suffix:
|
||||
suffix = "-" + suffix
|
||||
return tempfile.TemporaryDirectory(dir=workdir,
|
||||
suffix=suffix)
|
||||
name = f"object-{self._id[:7]}-"
|
||||
return self.store.tempdir(prefix=name, suffix=suffix)
|
||||
|
||||
def __enter__(self):
|
||||
self._check_writable()
|
||||
self._check_mode(Object.Mode.WRITE)
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
|
|
@ -186,17 +163,19 @@ class Object:
|
|||
|
||||
def clone(self, to_directory: PathLike):
|
||||
"""Clone the object to the specified directory"""
|
||||
with self.read() as from_directory:
|
||||
subprocess.run(
|
||||
[
|
||||
"cp",
|
||||
"--reflink=auto",
|
||||
"-a",
|
||||
os.fspath(from_directory) + "/.",
|
||||
os.fspath(to_directory),
|
||||
],
|
||||
check=True,
|
||||
)
|
||||
|
||||
assert self._path
|
||||
|
||||
subprocess.run(
|
||||
[
|
||||
"cp",
|
||||
"--reflink=auto",
|
||||
"-a",
|
||||
os.fspath(self._path) + "/.",
|
||||
os.fspath(to_directory),
|
||||
],
|
||||
check=True,
|
||||
)
|
||||
|
||||
|
||||
class HostTree:
|
||||
|
|
@ -248,7 +227,7 @@ class ObjectStore(contextlib.AbstractContextManager):
|
|||
def _get_floating(self, object_id: str) -> Optional[Object]:
|
||||
"""Internal: get a non-committed object"""
|
||||
for obj in self._objs:
|
||||
if obj.id == object_id:
|
||||
if obj.mode == Object.Mode.READ and obj.id == object_id:
|
||||
return obj
|
||||
return None
|
||||
|
||||
|
|
@ -281,11 +260,10 @@ class ObjectStore(contextlib.AbstractContextManager):
|
|||
if not self.contains(object_id):
|
||||
return None
|
||||
|
||||
obj = self.new(base_id=object_id)
|
||||
return obj
|
||||
return Object(self, object_id, Object.Mode.READ)
|
||||
|
||||
def new(self, base_id=None):
|
||||
"""Creates a new temporary `Object`.
|
||||
def new(self, object_id: str):
|
||||
"""Creates a new `Object` and open it for writing.
|
||||
|
||||
It returns a temporary instance of `Object`, the base
|
||||
optionally set to `base_id`. It can be used to interact
|
||||
|
|
@ -295,14 +273,7 @@ class ObjectStore(contextlib.AbstractContextManager):
|
|||
store via `commit()`.
|
||||
"""
|
||||
|
||||
obj = Object(self)
|
||||
|
||||
if base_id:
|
||||
# if we were given a base id then this is the base
|
||||
# content for the new object
|
||||
# NB: `Object` has copy-on-write semantics, so no
|
||||
# copying of the data takes places at this point
|
||||
obj.base = base_id
|
||||
obj = Object(self, object_id, Object.Mode.WRITE)
|
||||
|
||||
self._objs.add(obj)
|
||||
|
||||
|
|
@ -344,11 +315,6 @@ class ObjectStore(contextlib.AbstractContextManager):
|
|||
|
||||
os.replace(link, ref)
|
||||
|
||||
# the reference that is pointing to `object_name` is now the base
|
||||
# of `obj`. It is not actively initialized but any subsequent calls
|
||||
# to `obj.write()` will initialize it again
|
||||
obj.base = object_id
|
||||
|
||||
return object_name
|
||||
|
||||
def cleanup(self):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue