From 15b7b6fab30cbf5ab9bba8cfd2ce5b0e6b233735 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Hozza?= Date: Mon, 29 May 2023 11:39:01 +0200 Subject: [PATCH] test/stages: add helper function for testing partitioning stages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactor unit test implementation for `parted`, `sfdisk` and `sgdisk` stages by extracting the common parts into a helper function. Each stage now implements only its own function for filtering `sfdisk --json` output and calls the common helper function. In addition, flip the oder when comparing the expected and actual output from `sfdisk --json`, to make it more easier to comprehend. Signed-off-by: Tomáš Hozza --- test/run/test_stages.py | 135 +++++++++++----------------------------- 1 file changed, 37 insertions(+), 98 deletions(-) diff --git a/test/run/test_stages.py b/test/run/test_stages.py index 588f9828..df00b64c 100644 --- a/test/run/test_stages.py +++ b/test/run/test_stages.py @@ -15,7 +15,7 @@ import tempfile import unittest import xml from collections.abc import Mapping -from typing import Dict +from typing import Callable, Dict, Optional from osbuild.util import checksum, selinux @@ -366,9 +366,16 @@ class TestStages(test.TestBase): osb.copy_source_data(self.store, "org.osbuild.files") @unittest.skipUnless(have_sfdisk_with_json(), "Need sfdisk with JSON support") - def test_parted(self): + def _test_partitioning_stage(self, stage_name, sfdisk_out_filter_fn: Optional[Callable[[Dict], Dict]] = None): + """ + Helper function for testing partitioning stages. + + :param stage_name: Name of the partitioning stage to test + :param sfdisk_out_filter_fn: Optional function to filter the output of sfdisk + before comparing it with the expected output. + """ datadir = self.locate_test_data() - testdir = os.path.join(datadir, "stages", "parted") + testdir = os.path.join(datadir, "stages", stage_name) imgname = "disk.img" @@ -377,7 +384,7 @@ class TestStages(test.TestBase): with self.osbuild as osb, tempfile.TemporaryDirectory(dir="/var/tmp") as outdir: - osb.compile_file(os.path.join(testdir, "parted.json"), + osb.compile_file(os.path.join(testdir, f"{stage_name}.json"), checkpoints=["tree"], exports=["tree"], output_dir=outdir) @@ -394,130 +401,62 @@ class TestStages(test.TestBase): have = json.loads(r.stdout) - table = have["partitiontable"] + if sfdisk_out_filter_fn is not None: + have = sfdisk_out_filter_fn(have) + # Old versions of sfdisk (e.g. on RHEL-8), do not include + # the 'sectorsize' in the output, so we delete it from the + # expected output if it is not present in the actual output + if "sectorsize" not in have["partitiontable"]: + del want["partitiontable"]["sectorsize"] + + self.assertEqual(want, have) + + # cache the downloaded data for the files source + osb.copy_source_data(self.store, "org.osbuild.files") + + def test_parted(self): + def filter_sfdisk_output(sfdisk_output: Dict) -> Dict: + table = sfdisk_output["partitiontable"] # remove entries that are not stable across `parted` # invocations: "device", "id" and uuids in general if "device" in table: del table["device"] if "id" in table: del table["id"] - for p in table["partitions"]: if "uuid" in p: del p["uuid"] p["node"] = os.path.basename(p["node"]) + return sfdisk_output - # Old versions of sfdisk (e.g. on RHEL-8), do not include - # the 'sectorsize' in the output, so we delete it from the - # expected output if it is not present in the actual output - if "sectorsize" not in table: - del want["partitiontable"]["sectorsize"] + self._test_partitioning_stage("parted", filter_sfdisk_output) - self.assertEqual(have, want) - - # cache the downloaded data for the files source - osb.copy_source_data(self.store, "org.osbuild.files") - - @unittest.skipUnless(have_sfdisk_with_json(), "Need sfdisk with JSON support") def test_sgdisk(self): - datadir = self.locate_test_data() - testdir = os.path.join(datadir, "stages", "sgdisk") - - imgname = "disk.img" - - with open(os.path.join(testdir, f"{imgname}.json"), "r", encoding="utf8") as f: - want = json.load(f) - - with self.osbuild as osb, tempfile.TemporaryDirectory(dir="/var/tmp") as outdir: - - osb.compile_file(os.path.join(testdir, "sgdisk.json"), - checkpoints=["tree"], - exports=["tree"], - output_dir=outdir) - - target = os.path.join(outdir, "tree", imgname) - - assert os.path.exists(target) - - r = subprocess.run(["sfdisk", "--json", target], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf8", - check=False) - - have = json.loads(r.stdout) - - table = have["partitiontable"] - + def filter_sfdisk_output(sfdisk_output: Dict) -> Dict: + table = sfdisk_output["partitiontable"] # remove entries that are not stable across `parted` # invocations: "device", "id" if "device" in table: del table["device"] - for p in table["partitions"]: p["node"] = os.path.basename(p["node"]) + return sfdisk_output - # Old versions of sfdisk (e.g. on RHEL-8), do not include - # the 'sectorsize' in the output, so we delete it from the - # expected output if it is not present in the actual output - if "sectorsize" not in table: - del want["partitiontable"]["sectorsize"] + self._test_partitioning_stage("sgdisk", filter_sfdisk_output) - self.assertEqual(have, want) - - # cache the downloaded data for the files source - osb.copy_source_data(self.store, "org.osbuild.files") - - @unittest.skipUnless(have_sfdisk_with_json(), "Need sfdisk with JSON support") def test_sfdisk(self): - datadir = self.locate_test_data() - testdir = os.path.join(datadir, "stages", "sfdisk") - - imgname = "disk.img" - - with open(os.path.join(testdir, f"{imgname}.json"), "r", encoding="utf8") as f: - want = json.load(f) - - with self.osbuild as osb, tempfile.TemporaryDirectory(dir="/var/tmp") as outdir: - - osb.compile_file(os.path.join(testdir, "sfdisk.json"), - checkpoints=["tree"], - exports=["tree"], - output_dir=outdir) - - target = os.path.join(outdir, "tree", imgname) - - assert os.path.exists(target) - - r = subprocess.run(["sfdisk", "--json", target], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf8", - check=False) - - have = json.loads(r.stdout) - - table = have["partitiontable"] - + def filter_sfdisk_output(sfdisk_output: Dict) -> Dict: + table = sfdisk_output["partitiontable"] # remove entries that are not stable across `sfdisk` # invocations: "device" if "device" in table: del table["device"] - for p in table["partitions"]: p["node"] = os.path.basename(p["node"]) + return sfdisk_output - # Old versions of sfdisk (e.g. on RHEL-8), do not include - # the 'sectorsize' in the output, so we delete it from the - # expected output if it is not present in the actual output - if "sectorsize" not in table: - del want["partitiontable"]["sectorsize"] - - self.assertEqual(have, want) - - # cache the downloaded data for the files source - osb.copy_source_data(self.store, "org.osbuild.files") + self._test_partitioning_stage("sfdisk", filter_sfdisk_output) def test_ovf(self): datadir = self.locate_test_data()