Since Object knows its base now, the initialization of the tree with the content of its base can be delayed until the moment someone wants to actually modify the tree, thus implementing copy on write semantics. For this a new `write` method is added that will initialize the base and return the writable tree. It should be used instead of `path` whenever the a client wants to write to the tree of the Object. Adapt the pipeline and the tests to use the new `write` method in all the appropriate places. NB: since the intention can not be inferred when using `path` directly, the Object is still being initialized there.
98 lines
3.4 KiB
Python
98 lines
3.4 KiB
Python
import os
|
|
import shutil
|
|
import tempfile
|
|
import unittest
|
|
|
|
from pathlib import Path
|
|
from osbuild import objectstore
|
|
|
|
|
|
class TestObjectStore(unittest.TestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
cls.store = os.getenv("OSBUILD_TEST_STORE")
|
|
if not cls.store:
|
|
cls.store = tempfile.mkdtemp(prefix="osbuild-test-", dir="/var/tmp")
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
if not os.getenv("OSBUILD_TEST_STORE"):
|
|
shutil.rmtree(cls.store)
|
|
|
|
def test_basic(self):
|
|
# always use a temporary store so item counting works
|
|
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:
|
|
object_store = objectstore.ObjectStore(tmp)
|
|
with object_store.new("a") as tree:
|
|
path = tree.write()
|
|
p = Path(f"{path}/A")
|
|
p.touch()
|
|
|
|
assert os.path.exists(f"{object_store.refs}/a")
|
|
assert os.path.exists(f"{object_store.refs}/a/A")
|
|
assert len(os.listdir(object_store.refs)) == 1
|
|
assert len(os.listdir(object_store.objects)) == 1
|
|
assert len(os.listdir(f"{object_store.refs}/a/")) == 1
|
|
|
|
with object_store.new("b") as tree:
|
|
path = tree.write()
|
|
p = Path(f"{path}/A")
|
|
p.touch()
|
|
p = Path(f"{path}/B")
|
|
p.touch()
|
|
|
|
assert os.path.exists(f"{object_store.refs}/b")
|
|
assert os.path.exists(f"{object_store.refs}/b/B")
|
|
|
|
assert len(os.listdir(object_store.refs)) == 2
|
|
assert len(os.listdir(object_store.objects)) == 2
|
|
assert len(os.listdir(f"{object_store.refs}/b/")) == 2
|
|
|
|
def test_duplicate(self):
|
|
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:
|
|
object_store = objectstore.ObjectStore(tmp)
|
|
with object_store.new("a") as tree:
|
|
path = tree.write()
|
|
p = Path(f"{path}/A")
|
|
p.touch()
|
|
|
|
with object_store.new("b") as tree:
|
|
path = tree.write()
|
|
shutil.copy2(f"{object_store.refs}/a/A",
|
|
f"{path}/A")
|
|
|
|
assert os.path.exists(f"{object_store.refs}/a")
|
|
assert os.path.exists(f"{object_store.refs}/a/A")
|
|
assert os.path.exists(f"{object_store.refs}/b/A")
|
|
|
|
assert len(os.listdir(object_store.refs)) == 2
|
|
assert len(os.listdir(object_store.objects)) == 1
|
|
assert len(os.listdir(f"{object_store.refs}/a/")) == 1
|
|
assert len(os.listdir(f"{object_store.refs}/b/")) == 1
|
|
|
|
def test_snapshot(self):
|
|
object_store = objectstore.ObjectStore(self.store)
|
|
with object_store.new("b") as tree:
|
|
path = tree.write()
|
|
p = Path(f"{path}/A")
|
|
p.touch()
|
|
object_store.snapshot(tree, "a")
|
|
path = tree.write()
|
|
p = Path(f"{path}/B")
|
|
p.touch()
|
|
|
|
# check the references exist
|
|
assert os.path.exists(f"{object_store.refs}/a")
|
|
assert os.path.exists(f"{object_store.refs}/b")
|
|
|
|
# check the contents of the trees
|
|
assert os.path.exists(f"{object_store.refs}/a/A")
|
|
assert not os.path.exists(f"{object_store.refs}/a/B")
|
|
|
|
assert os.path.exists(f"{object_store.refs}/b/A")
|
|
assert os.path.exists(f"{object_store.refs}/b/B")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|