test/stages: add helper function for testing partitioning stages

Refactor unit test implementation for `parted`, `sfdisk` and `sgdisk`
stages by extracting the common parts into a helper function. Each stage
now implements only its own function for filtering `sfdisk --json`
output and calls the common helper function.

In addition, flip the order when comparing the expected and actual output
from `sfdisk --json`, to make it easier to comprehend.

Signed-off-by: Tomáš Hozza <thozza@redhat.com>
This commit is contained in:
Tomáš Hozza 2023-05-29 11:39:01 +02:00 committed by Simon de Vlieger
parent 17ebae23dc
commit 15b7b6fab3

View file

@ -15,7 +15,7 @@ import tempfile
import unittest import unittest
import xml import xml
from collections.abc import Mapping from collections.abc import Mapping
from typing import Dict from typing import Callable, Dict, Optional
from osbuild.util import checksum, selinux from osbuild.util import checksum, selinux
@ -366,9 +366,16 @@ class TestStages(test.TestBase):
osb.copy_source_data(self.store, "org.osbuild.files") osb.copy_source_data(self.store, "org.osbuild.files")
@unittest.skipUnless(have_sfdisk_with_json(), "Need sfdisk with JSON support") @unittest.skipUnless(have_sfdisk_with_json(), "Need sfdisk with JSON support")
def test_parted(self): def _test_partitioning_stage(self, stage_name, sfdisk_out_filter_fn: Optional[Callable[[Dict], Dict]] = None):
"""
Helper function for testing partitioning stages.
:param stage_name: Name of the partitioning stage to test
:param sfdisk_out_filter_fn: Optional function to filter the output of sfdisk
before comparing it with the expected output.
"""
datadir = self.locate_test_data() datadir = self.locate_test_data()
testdir = os.path.join(datadir, "stages", "parted") testdir = os.path.join(datadir, "stages", stage_name)
imgname = "disk.img" imgname = "disk.img"
@ -377,7 +384,7 @@ class TestStages(test.TestBase):
with self.osbuild as osb, tempfile.TemporaryDirectory(dir="/var/tmp") as outdir: with self.osbuild as osb, tempfile.TemporaryDirectory(dir="/var/tmp") as outdir:
osb.compile_file(os.path.join(testdir, "parted.json"), osb.compile_file(os.path.join(testdir, f"{stage_name}.json"),
checkpoints=["tree"], checkpoints=["tree"],
exports=["tree"], exports=["tree"],
output_dir=outdir) output_dir=outdir)
@ -394,130 +401,62 @@ class TestStages(test.TestBase):
have = json.loads(r.stdout) have = json.loads(r.stdout)
table = have["partitiontable"] if sfdisk_out_filter_fn is not None:
have = sfdisk_out_filter_fn(have)
# Old versions of sfdisk (e.g. on RHEL-8), do not include
# the 'sectorsize' in the output, so we delete it from the
# expected output if it is not present in the actual output
if "sectorsize" not in have["partitiontable"]:
del want["partitiontable"]["sectorsize"]
self.assertEqual(want, have)
# cache the downloaded data for the files source
osb.copy_source_data(self.store, "org.osbuild.files")
def test_parted(self):
def filter_sfdisk_output(sfdisk_output: Dict) -> Dict:
table = sfdisk_output["partitiontable"]
# remove entries that are not stable across `parted` # remove entries that are not stable across `parted`
# invocations: "device", "id" and uuids in general # invocations: "device", "id" and uuids in general
if "device" in table: if "device" in table:
del table["device"] del table["device"]
if "id" in table: if "id" in table:
del table["id"] del table["id"]
for p in table["partitions"]: for p in table["partitions"]:
if "uuid" in p: if "uuid" in p:
del p["uuid"] del p["uuid"]
p["node"] = os.path.basename(p["node"]) p["node"] = os.path.basename(p["node"])
return sfdisk_output
# Old versions of sfdisk (e.g. on RHEL-8), do not include self._test_partitioning_stage("parted", filter_sfdisk_output)
# the 'sectorsize' in the output, so we delete it from the
# expected output if it is not present in the actual output
if "sectorsize" not in table:
del want["partitiontable"]["sectorsize"]
self.assertEqual(have, want)
# cache the downloaded data for the files source
osb.copy_source_data(self.store, "org.osbuild.files")
@unittest.skipUnless(have_sfdisk_with_json(), "Need sfdisk with JSON support")
def test_sgdisk(self): def test_sgdisk(self):
datadir = self.locate_test_data() def filter_sfdisk_output(sfdisk_output: Dict) -> Dict:
testdir = os.path.join(datadir, "stages", "sgdisk") table = sfdisk_output["partitiontable"]
imgname = "disk.img"
with open(os.path.join(testdir, f"{imgname}.json"), "r", encoding="utf8") as f:
want = json.load(f)
with self.osbuild as osb, tempfile.TemporaryDirectory(dir="/var/tmp") as outdir:
osb.compile_file(os.path.join(testdir, "sgdisk.json"),
checkpoints=["tree"],
exports=["tree"],
output_dir=outdir)
target = os.path.join(outdir, "tree", imgname)
assert os.path.exists(target)
r = subprocess.run(["sfdisk", "--json", target],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf8",
check=False)
have = json.loads(r.stdout)
table = have["partitiontable"]
# remove entries that are not stable across `parted` # remove entries that are not stable across `parted`
# invocations: "device", "id" # invocations: "device", "id"
if "device" in table: if "device" in table:
del table["device"] del table["device"]
for p in table["partitions"]: for p in table["partitions"]:
p["node"] = os.path.basename(p["node"]) p["node"] = os.path.basename(p["node"])
return sfdisk_output
# Old versions of sfdisk (e.g. on RHEL-8), do not include self._test_partitioning_stage("sgdisk", filter_sfdisk_output)
# the 'sectorsize' in the output, so we delete it from the
# expected output if it is not present in the actual output
if "sectorsize" not in table:
del want["partitiontable"]["sectorsize"]
self.assertEqual(have, want)
# cache the downloaded data for the files source
osb.copy_source_data(self.store, "org.osbuild.files")
@unittest.skipUnless(have_sfdisk_with_json(), "Need sfdisk with JSON support")
def test_sfdisk(self): def test_sfdisk(self):
datadir = self.locate_test_data() def filter_sfdisk_output(sfdisk_output: Dict) -> Dict:
testdir = os.path.join(datadir, "stages", "sfdisk") table = sfdisk_output["partitiontable"]
imgname = "disk.img"
with open(os.path.join(testdir, f"{imgname}.json"), "r", encoding="utf8") as f:
want = json.load(f)
with self.osbuild as osb, tempfile.TemporaryDirectory(dir="/var/tmp") as outdir:
osb.compile_file(os.path.join(testdir, "sfdisk.json"),
checkpoints=["tree"],
exports=["tree"],
output_dir=outdir)
target = os.path.join(outdir, "tree", imgname)
assert os.path.exists(target)
r = subprocess.run(["sfdisk", "--json", target],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf8",
check=False)
have = json.loads(r.stdout)
table = have["partitiontable"]
# remove entries that are not stable across `sfdisk` # remove entries that are not stable across `sfdisk`
# invocations: "device" # invocations: "device"
if "device" in table: if "device" in table:
del table["device"] del table["device"]
for p in table["partitions"]: for p in table["partitions"]:
p["node"] = os.path.basename(p["node"]) p["node"] = os.path.basename(p["node"])
return sfdisk_output
# Old versions of sfdisk (e.g. on RHEL-8), do not include self._test_partitioning_stage("sfdisk", filter_sfdisk_output)
# the 'sectorsize' in the output, so we delete it from the
# expected output if it is not present in the actual output
if "sectorsize" not in table:
del want["partitiontable"]["sectorsize"]
self.assertEqual(have, want)
# cache the downloaded data for the files source
osb.copy_source_data(self.store, "org.osbuild.files")
def test_ovf(self): def test_ovf(self):
datadir = self.locate_test_data() datadir = self.locate_test_data()