Move the last remaining test into the correct subdir. With this done,
all our tests run in one of the 3 groups:
* `make test-src`
Run tests against the source-code, including linters.
* `make test-mod`
Run unit-tests on the individual python modules. This needs no
special permissions (unless noted in each test) or runtime
environments. It is meant to be fast and easy to run in all
circumstances.
* `make test-run`
Run tests that execute the osbuild pipeline. This requires
superuser privileges and will likely take a while. Furthermore,
this might produce large artifacts.
177 lines
7.3 KiB
Python
177 lines
7.3 KiB
Python
#
|
|
# Runtime tests for the individual assemblers.
|
|
#
|
|
|
|
import contextlib
|
|
import glob
|
|
import hashlib
|
|
import json
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
import unittest
|
|
|
|
from .. import test
|
|
|
|
|
|
class TestAssemblers(test.TestBase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super().setUpClass()
|
|
subprocess.run(["modprobe", "nbd"], check=False)
|
|
|
|
def setUp(self):
|
|
self.osbuild = test.OSBuild(self)
|
|
|
|
@contextlib.contextmanager
|
|
def run_assembler(self, name, options, output_path):
|
|
with self.osbuild as osb:
|
|
with open("test/pipelines/f30-base.json") as f:
|
|
manifest = json.load(f)
|
|
manifest["pipeline"] = dict(
|
|
manifest["pipeline"],
|
|
assembler={"name": name, "options": options}
|
|
)
|
|
data = json.dumps(manifest)
|
|
|
|
treeid = osb.treeid_from_manifest(data)
|
|
assert treeid
|
|
|
|
osb.compile(data, checkpoints=[treeid])
|
|
with osb.map_object(treeid) as tree, \
|
|
osb.map_output(output_path) as output:
|
|
yield tree, output
|
|
|
|
def assertImageFile(self, filename, fmt, expected_size=None):
|
|
info = json.loads(subprocess.check_output(["qemu-img", "info", "--output", "json", filename]))
|
|
self.assertEqual(info["format"], fmt)
|
|
self.assertEqual(info["virtual-size"], expected_size)
|
|
|
|
def assertFilesystem(self, device, uuid, fstype, tree):
|
|
output = subprocess.check_output(["blkid", "--output", "export", device], encoding="utf-8")
|
|
blkid = dict(line.split("=") for line in output.strip().split("\n"))
|
|
self.assertEqual(blkid["UUID"], uuid)
|
|
self.assertEqual(blkid["TYPE"], fstype)
|
|
|
|
with mount(device) as target_tree:
|
|
diff = self.tree_diff(tree, target_tree)
|
|
self.assertEqual(diff["added_files"], ["/lost+found"])
|
|
self.assertEqual(diff["deleted_files"], [])
|
|
self.assertEqual(diff["differences"], {})
|
|
|
|
def assertGRUB2(self, device, l1hash, l2hash, size):
|
|
m1 = hashlib.sha256()
|
|
m2 = hashlib.sha256()
|
|
with open(device, "rb") as d:
|
|
sectors = d.read(size)
|
|
self.assertEqual(len(sectors), size)
|
|
m1.update(sectors[:440])
|
|
m2.update(sectors[512:size])
|
|
self.assertEqual(m1.hexdigest(), l1hash)
|
|
self.assertEqual(m2.hexdigest(), l2hash)
|
|
|
|
def assertPartitionTable(self, device, label, uuid, n_partitions, boot_partition=None):
|
|
sfdisk = json.loads(subprocess.check_output(["sfdisk", "--json", device]))
|
|
ptable = sfdisk["partitiontable"]
|
|
|
|
self.assertEqual(ptable["label"], label)
|
|
self.assertEqual(ptable["id"][2:], uuid[:8])
|
|
self.assertEqual(len(ptable["partitions"]), n_partitions)
|
|
|
|
if boot_partition:
|
|
bootable = [p.get("bootable", False) for p in ptable["partitions"]]
|
|
self.assertEqual(bootable.count(True), 1)
|
|
self.assertEqual(bootable.index(True) + 1, boot_partition)
|
|
|
|
@unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
|
|
def test_rawfs(self):
|
|
options = {
|
|
"filename": "image.raw",
|
|
"root_fs_uuid": "016a1cda-5182-4ab3-bf97-426b00b74eb0",
|
|
"size": 2 * 1024 * 1024 * 1024
|
|
}
|
|
with self.run_assembler("org.osbuild.rawfs", options, "image.raw") as (tree, image):
|
|
self.assertImageFile(image, "raw", options["size"])
|
|
self.assertFilesystem(image, options["root_fs_uuid"], "ext4", tree)
|
|
|
|
@unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
|
|
def test_qemu(self):
|
|
for fmt in ["raw", "raw.xz", "qcow2", "vmdk", "vdi"]:
|
|
with self.subTest(fmt=fmt):
|
|
print(f" {fmt}", flush=True)
|
|
options = {
|
|
"format": fmt,
|
|
"filename": f"image.{fmt}",
|
|
"ptuuid": "b2c09a39-db93-44c5-846a-81e06b1dc162",
|
|
"root_fs_uuid": "aff010e9-df95-4f81-be6b-e22317251033",
|
|
"size": 2 * 1024 * 1024 * 1024
|
|
}
|
|
with self.run_assembler("org.osbuild.qemu",
|
|
options,
|
|
f"image.{fmt}") as (tree, image):
|
|
if fmt == "raw.xz":
|
|
subprocess.run(["unxz", "--keep", "--force", image], check=True)
|
|
image = image[:-3]
|
|
fmt = "raw"
|
|
self.assertImageFile(image, fmt, options["size"])
|
|
with nbd_connect(image) as device:
|
|
self.assertPartitionTable(device,
|
|
"dos",
|
|
options["ptuuid"],
|
|
1,
|
|
boot_partition=1)
|
|
self.assertGRUB2(device,
|
|
"26e3327c6b5ac9b5e21d8b86f19ff7cb4d12fb2d0406713f936997d9d89de3ee",
|
|
"18031c9465e3f9ccb9aeb9c8e59dec6b82e91376e2628c8100b5461af62ad67c",
|
|
1024 * 1024)
|
|
self.assertFilesystem(device + "p1", options["root_fs_uuid"], "ext4", tree)
|
|
|
|
def test_tar(self):
|
|
cases = [
|
|
("tree.tar.gz", None, ["application/x-tar"]),
|
|
("tree.tar.gz", "gzip", ["application/x-gzip", "application/gzip"])
|
|
]
|
|
for filename, compression, expected_mimetypes in cases:
|
|
options = {"filename": filename}
|
|
if compression:
|
|
options["compression"] = compression
|
|
with self.run_assembler("org.osbuild.tar",
|
|
options,
|
|
filename) as (_, image):
|
|
output = subprocess.check_output(["file", "--mime-type", image], encoding="utf-8")
|
|
_, mimetype = output.strip().split(": ") # "filename: mimetype"
|
|
self.assertIn(mimetype, expected_mimetypes)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def mount(device):
|
|
with tempfile.TemporaryDirectory() as mountpoint:
|
|
subprocess.run(["mount", "-o", "ro", device, mountpoint], check=True)
|
|
try:
|
|
yield mountpoint
|
|
finally:
|
|
subprocess.run(["umount", "--lazy", mountpoint], check=True)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def nbd_connect(image):
|
|
for device in glob.glob("/dev/nbd*"):
|
|
r = subprocess.run(["qemu-nbd", "--connect", device, "--read-only", image], check=False).returncode
|
|
if r == 0:
|
|
try:
|
|
# qemu-nbd doesn't wait for the device to be ready
|
|
for _ in range(100):
|
|
if subprocess.run(["nbd-client", "--check", device],
|
|
check=False,
|
|
stdout=subprocess.DEVNULL).returncode == 0:
|
|
break
|
|
time.sleep(0.2)
|
|
|
|
yield device
|
|
finally:
|
|
# qemu-nbd doesn't wait until the device is released. nbd-client does
|
|
subprocess.run(["qemu-nbd", "--disconnect", device], check=True, stdout=subprocess.DEVNULL)
|
|
subprocess.run(["nbd-client", "--disconnect", device], check=False, stdout=subprocess.DEVNULL)
|
|
break
|
|
else:
|
|
raise RuntimeError("no free network block device")
|