diff --git a/osbuild/util/ostree.py b/osbuild/util/ostree.py new file mode 100644 index 00000000..f9082212 --- /dev/null +++ b/osbuild/util/ostree.py @@ -0,0 +1,104 @@ +import contextlib +import json +import os +import tempfile + +from typing import List + + +class Param: + """rpm-ostree Treefile parameter""" + + def __init__(self, value_type, mandatory=False): + self.type = value_type + self.mandatory = mandatory + + def check(self, value): + origin = getattr(self.type, "__origin__", None) + if origin: + self.typecheck(value, origin) + if origin is list: + self.check_list(value, self.type) + else: + raise NotImplementedError(origin) + else: + self.typecheck(value, self.type) + + @staticmethod + def check_list(value, tp): + inner = tp.__args__ + for x in value: + Param.typecheck(x, inner) + + @staticmethod + def typecheck(value, tp): + if isinstance(value, tp): + return + raise ValueError(f"{value} is not of {tp}") + + +class Treefile: + """Representation of an rpm-ostree Treefile + + The following parameters are currently supported, + presented together with the rpm-ostree compose + phase that they are used in. + - ref: commit + - repos: install + - selinux: install, postprocess, commit + - boot-location: postprocess + - etc-group-members: postprocess + - machineid-compat + + NB: 'ref' and 'repos' are mandatory and must be + present, even if they are not used in the given + phase; they therefore have defaults preset. + """ + + parameters = { + "ref": Param(str, True), + "repos": Param(List[str], True), + "selinux": Param(bool), + "boot-location": Param(str), + "etc-group-members": Param(List[str]), + "machineid-compat": Param(bool) + } + + def __init__(self): + self._data = {} + self["ref"] = "osbuild/devel" + self["repos"] = ["osbuild"] + + def __getitem__(self, key): + param = self.parameters.get(key) + if not param: + raise ValueError(f"Unknown param: {key}") + return self._data[key] + + def __setitem__(self, key, value): + param = self.parameters.get(key) + if not param: + raise ValueError(f"Unknown param: {key}") + param.check(value) + self._data[key] = value + + def dumps(self): + return json.dumps(self._data) + + def dump(self, fp): + return json.dump(self._data, fp) + + @contextlib.contextmanager + def as_tmp_file(self): + name = None + try: + fd, name = tempfile.mkstemp(suffix=".json", + text=True) + + with os.fdopen(fd, "w+") as f: + self.dump(f) + + yield name + finally: + if name: + os.unlink(name) diff --git a/test/test_util_ostree.py b/test/test_util_ostree.py new file mode 100644 index 00000000..1ffea818 --- /dev/null +++ b/test/test_util_ostree.py @@ -0,0 +1,92 @@ +import json +import unittest +import subprocess + +from osbuild.util import ostree + + +def run(*args, check=True, encoding="utf-8", **kwargs): + res = subprocess.run(*args, + encoding=encoding, + check=check, + **kwargs) + return res + + +def have_rpm_ostree(): + try: + r = run(["rpm-ostree", "--version"], + capture_output=True, check=False) + except FileNotFoundError: + return False + return r.returncode == 0 and "compose" in r.stdout + + +class TestObjectStore(unittest.TestCase): + + @unittest.skipIf(not have_rpm_ostree(), "rpm-ostree missing") + def test_treefile_empty(self): + # check we produce a valid treefile from an empty object + tf = ostree.Treefile() + + with tf.as_tmp_file() as f: + run(["rpm-ostree", "compose", "tree", "--print-only", f]) + + def test_treefile_types(self): + tf = ostree.Treefile() + + tf["repos"] = ["a", "b", "c"] # valid list of strings + tf["selinux"] = True # valid boolean + tf["ref"] = "ref/sample/tip" # valid string + + with self.assertRaises(ValueError): + tf["repos"] = "not a list" # not a list + + with self.assertRaises(ValueError): + tf["repos"] = [1, 2, 3] # not a string list + + with self.assertRaises(ValueError): + tf["selinux"] = "not a bool" # not a boolean + + def test_treefile_dump(self): + tf = ostree.Treefile() + test_ref = "a/sample/ref" + tf["ref"] = test_ref + + with tf.as_tmp_file() as path: + with open(path, "r") as f: + js = json.load(f) + self.assertEqual(js["ref"], test_ref) + self.assertEqual(tf["ref"], test_ref) + + @unittest.skipIf(not have_rpm_ostree(), "rpm-ostree missing") + def test_treefile_full(self): + params = { + "ref": "osbuild/ostree/devel", + "repos": ["fedora", "osbuild"], + "selinux": True, + "boot-location": "new", + "etc-group-members": ["wheel"], + "machineid-compat": True + } + + tf = ostree.Treefile() + for p, v in params.items(): + tf[p] = v + + with tf.as_tmp_file() as path: + r = run(["rpm-ostree", + "compose", + "tree", + "--print-only", + path], + capture_output=True) + self.assertEqual(r.returncode, 0) + js = json.loads(r.stdout) + + for p, v in params.items(): + self.assertEqual(v, js[p]) + + +if __name__ == "__main__": + unittest.main()