Instead of checkpointing the tree and then accessing the generated tree inside the store via the `map_object` function, we now just export the tree and use that. This better hides the internals of the store and also allows us to activate on-demand building that does not rely on checkpoints being implicitly built like exports.
293 lines
12 KiB
Python
293 lines
12 KiB
Python
#
|
|
# Runtime tests for the individual assemblers.
|
|
#
|
|
|
|
import contextlib
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
import unittest
|
|
|
|
from osbuild import loop
|
|
from .. import test
|
|
|
|
# Number of bytes in one mebibyte (2**20); used to express image sizes.
MEBIBYTE = 1 << 20
|
|
|
|
|
|
@unittest.skipUnless(test.TestBase.have_test_data(), "no test-data access")
@unittest.skipUnless(test.TestBase.can_bind_mount(), "root-only")
class TestAssemblers(test.TestBase):
    """Runtime tests for the individual assemblers.

    Each test builds a manifest with one assembler, exports the result into
    a temporary directory and inspects the produced artifact with external
    tools (``qemu-img``, ``blkid``, ``sfdisk``, ``ostree``, ``file``,
    ``tar``).  The whole class is skipped when the test data is not
    available or when bind mounts are not possible.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    def setUp(self):
        self.osbuild = test.OSBuild()

    @contextlib.contextmanager
    def run_assembler(self, osb, name, options, output_path):
        """Build ``manifests/filesystem.json`` with the given assembler.

        The manifest's pipeline is copied and its ``assembler`` entry is
        replaced by ``{"name": name, "options": options}``.  Both the
        ``assembler`` and the ``tree`` are exported into a temporary
        directory below ``/var/tmp``.

        Yields a ``(tree, output)`` tuple: the path of the exported tree and
        the path of ``output_path`` inside the exported assembler directory.
        Both paths are only valid while the context is active, because the
        temporary directory is removed on exit.
        """
        manifest_file = os.path.join(self.locate_test_data(),
                                     "manifests/filesystem.json")
        with open(manifest_file) as f:
            manifest = json.load(f)
        manifest["pipeline"] = dict(
            manifest["pipeline"],
            assembler={"name": name, "options": options}
        )
        data = json.dumps(manifest)

        treeid = osb.treeid_from_manifest(data)
        self.assertTrue(treeid)

        with tempfile.TemporaryDirectory(dir="/var/tmp") as output_dir:
            osb.compile(data, output_dir=output_dir, exports=["assembler", "tree"])
            tree = os.path.join(output_dir, "tree")
            yield tree, os.path.join(output_dir, "assembler", output_path)

    def assertImageFile(self, filename, fmt, expected_size=None):
        """Check the format (and optionally virtual size) reported by ``qemu-img info``.

        ``expected_size`` is compared against the ``virtual-size`` field; when
        it is ``None`` (the default) the size is not checked.
        """
        info = json.loads(subprocess.check_output(["qemu-img", "info", "--output", "json", filename]))
        self.assertEqual(info["format"], fmt)
        # Only compare sizes when the caller asked for it; comparing against
        # the ``None`` default could never succeed.
        if expected_size is not None:
            self.assertEqual(info["virtual-size"], expected_size)

    def assertFilesystem(self, device, uuid, fstype, tree):
        """Check the filesystem on ``device`` and compare its content with ``tree``.

        ``blkid`` must report the given ``uuid`` and ``fstype``.  The device
        is then mounted read-only and diffed against ``tree``; the only
        tolerated difference is ``/lost+found`` on ext4.
        """
        output = subprocess.check_output(["blkid", "--output", "export", device], encoding="utf-8")
        # Split on the first "=" only so that values containing "=" do not
        # break the key/value parsing; lines without "=" are ignored.
        blkid = dict(
            line.split("=", 1)
            for line in output.strip().split("\n")
            if "=" in line
        )
        self.assertEqual(blkid["UUID"], uuid)
        self.assertEqual(blkid["TYPE"], fstype)

        with mount(device) as target_tree:
            diff = self.tree_diff(tree, target_tree)
            added_files = ["/lost+found"] if fstype == 'ext4' else []
            self.assertEqual(diff["added_files"], added_files)
            self.assertEqual(diff["deleted_files"], [])
            self.assertEqual(diff["differences"], {})

    def assertGRUB2(self, device, l1hash, l2hash, size):
        """Check SHA-256 digests of the first ``size`` bytes of ``device``.

        ``l1hash`` must match bytes ``[0, 440)`` and ``l2hash`` must match
        bytes ``[512, size)``.  Bytes 440-511 are deliberately excluded
        (presumably the disk identifier and partition table of the MBR,
        which vary per image -- confirm against the assembler).
        """
        with open(device, "rb") as d:
            sectors = d.read(size)
        self.assertEqual(len(sectors), size)

        level1 = hashlib.sha256(sectors[:440]).hexdigest()
        level2 = hashlib.sha256(sectors[512:size]).hexdigest()
        self.assertEqual(level1, l1hash)
        self.assertEqual(level2, l2hash)

    def assertPartitionTable(self, ptable, label, uuid, n_partitions, boot_partition=None):
        """Check label, id and partition count of an ``sfdisk`` partition table.

        The table id, with its first two characters dropped, is compared
        to the first eight characters of ``uuid``.  When ``boot_partition``
        is given, exactly one partition must be flagged bootable and its
        1-based index must equal ``boot_partition``.
        """
        self.assertEqual(ptable["label"], label)
        self.assertEqual(ptable["id"][2:], uuid[:8])
        self.assertEqual(len(ptable["partitions"]), n_partitions)

        if boot_partition:
            bootable = [p.get("bootable", False) for p in ptable["partitions"]]
            self.assertEqual(bootable.count(True), 1)
            self.assertEqual(bootable.index(True) + 1, boot_partition)

    def read_partition_table(self, device):
        """Return the ``partitiontable`` object of ``sfdisk --json device``."""
        sfdisk = json.loads(subprocess.check_output(["sfdisk", "--json", device]))
        ptable = sfdisk["partitiontable"]
        self.assertIsNotNone(ptable)
        return ptable

    def _ostree_metadata(self, repo, commit_id, key):
        """Return the stripped output of ``ostree show --print-metadata-key=<key>``."""
        return subprocess.check_output(
            [
                "ostree",
                "show",
                "--repo", repo,
                f"--print-metadata-key={key}",
                commit_id
            ], encoding="utf-8").strip()

    @unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
    def test_rawfs(self):
        """Build a raw filesystem image for each filesystem type and verify it."""
        for fs_type in ["ext4", "xfs", "btrfs"]:
            with self.subTest(fs_type=fs_type):
                print(f" {fs_type}", flush=True)
                options = {
                    "filename": "image.raw",
                    "root_fs_uuid": "016a1cda-5182-4ab3-bf97-426b00b74eb0",
                    "size": 1024 * MEBIBYTE,
                    "fs_type": fs_type,
                }
                with self.osbuild as osb:
                    with self.run_assembler(osb, "org.osbuild.rawfs", options, "image.raw") as (tree, image):
                        self.assertImageFile(image, "raw", options["size"])
                        self.assertFilesystem(image, options["root_fs_uuid"], fs_type, tree)

    @unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
    def test_ostree(self):
        """Build the ostree commit manifest and cross-check its metadata.

        The exported ``compose.json`` must agree with the assembler metadata
        returned by the build and with the metadata stored in the commit.
        """
        with self.osbuild as osb:
            manifest_file = os.path.join(self.locate_test_data(),
                                         "manifests/fedora-ostree-commit.json")
            with open(manifest_file) as f:
                manifest = json.load(f)

            data = json.dumps(manifest)
            with tempfile.TemporaryDirectory(dir="/var/tmp") as output_dir:
                result = osb.compile(data, output_dir=output_dir, exports=["assembler"])
                compose_file = os.path.join(output_dir, "assembler", "compose.json")
                repo = os.path.join(output_dir, "assembler", "repo")

                with open(compose_file) as f:
                    compose = json.load(f)
                commit_id = compose["ostree-commit"]
                ref = compose["ref"]
                rpmostree_inputhash = compose["rpm-ostree-inputhash"]
                os_version = compose["ostree-version"]
                self.assertTrue(commit_id)
                self.assertTrue(ref)
                self.assertTrue(rpmostree_inputhash)
                self.assertTrue(os_version)
                self.assertIn("metadata", result["assembler"])
                metadata = result["assembler"]["metadata"]
                self.assertIn("compose", metadata)
                self.assertEqual(compose, metadata["compose"])

                # ostree prints the values wrapped in single quotes
                md = self._ostree_metadata(repo, commit_id, "rpmostree.inputhash")
                self.assertEqual(md, f"'{rpmostree_inputhash}'")

                md = self._ostree_metadata(repo, commit_id, "version")
                self.assertEqual(md, f"'{os_version}'")

    @unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
    def test_qemu(self):
        """Build disk images in every format / filesystem combination and verify them.

        For each image the format, partition table, boot sectors and root
        filesystem content are checked.
        """
        # Expected digest of bytes [512, 1 MiB) per root filesystem type;
        # any type not listed falls back to the ext4 digest.
        l2hashes = {
            "btrfs": "919aad44d37aa9fdbb8cb1bbd8ce2a44e64aee76f4dceb805eaab041b7f62348",
            "xfs": "1729f531281e4c3cbcde2a39b587c9dd5334ea1335bb860905556d5b73603de6",
        }
        ext4_l2hash = "24c3ad6be9a5687d5140e0bf66d25953c4f0c7eeb6aaced4cc64685f5b3cfa9e"

        loctl = loop.LoopControl()
        with self.osbuild as osb:
            for fmt in ["raw", "raw.xz", "qcow2", "vmdk", "vdi"]:
                for fs_type in ["ext4", "xfs", "btrfs"]:
                    with self.subTest(fmt=fmt, fs_type=fs_type):
                        print(f" {fmt} {fs_type}", flush=True)
                        options = {
                            "format": fmt,
                            "filename": f"image.{fmt}",
                            "ptuuid": "b2c09a39-db93-44c5-846a-81e06b1dc162",
                            "root_fs_uuid": "aff010e9-df95-4f81-be6b-e22317251033",
                            "size": 1024 * MEBIBYTE,
                            "root_fs_type": fs_type,
                        }
                        with self.run_assembler(osb,
                                                "org.osbuild.qemu",
                                                options,
                                                f"image.{fmt}") as (tree, image):
                            # Track the on-disk format separately: rebinding
                            # the loop variable `fmt` here would make the
                            # remaining fs_type iterations build plain "raw"
                            # images instead of "raw.xz".
                            image_fmt = fmt
                            if fmt == "raw.xz":
                                subprocess.run(["unxz", "--keep", "--force", image], check=True)
                                image = image[:-3]  # drop the ".xz" suffix
                                image_fmt = "raw"
                            self.assertImageFile(image, image_fmt, options["size"])
                            with open_image(loctl, image, image_fmt) as (target, device):
                                ptable = self.read_partition_table(device)
                                self.assertPartitionTable(ptable,
                                                          "dos",
                                                          options["ptuuid"],
                                                          1,
                                                          boot_partition=1)
                                self.assertGRUB2(device,
                                                 "26e3327c6b5ac9b5e21d8b86f19ff7cb4d12fb2d0406713f936997d9d89de3ee",
                                                 l2hashes.get(fs_type, ext4_l2hash),
                                                 MEBIBYTE)

                                # sfdisk reports start/size in sectors;
                                # convert them to byte offsets.
                                p1 = ptable["partitions"][0]
                                ssize = ptable.get("sectorsize", 512)
                                start, size = p1["start"] * ssize, p1["size"] * ssize
                                with loop_open(loctl, target, offset=start, size=size) as dev:
                                    self.assertFilesystem(dev, options["root_fs_uuid"], fs_type, tree)

    @unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
    def test_tar(self):
        """Build tar archives with and without compression and verify them.

        The MIME type reported by ``file`` is checked for every case; for the
        uncompressed case the archive is also extracted and diffed against
        the exported tree.
        """
        cases = [
            ("tree.tar.gz", None, ["application/x-tar"]),
            ("tree.tar.gz", "gzip", ["application/x-gzip", "application/gzip"])
        ]
        with self.osbuild as osb:
            for filename, compression, expected_mimetypes in cases:
                options = {"filename": filename}
                if compression:
                    options["compression"] = compression
                with self.run_assembler(osb,
                                        "org.osbuild.tar",
                                        options,
                                        filename) as (tree, image):
                    output = subprocess.check_output(["file", "--mime-type", image], encoding="utf-8")
                    # "filename: mimetype" -- split on the last separator so
                    # a ": " inside the path cannot break the unpacking.
                    _, mimetype = output.strip().rsplit(": ", 1)
                    self.assertIn(mimetype, expected_mimetypes)

                    if compression:
                        continue

                    # In the non-compression case, we verify the tree's content
                    with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:
                        args = [
                            "tar",
                            "--numeric-owner",
                            "--selinux",
                            "--acls",
                            "--xattrs", "--xattrs-include", "*",
                            "-xaf", image,
                            "-C", tmp]
                        subprocess.check_output(args, encoding="utf-8")
                        diff = self.tree_diff(tree, tmp)
                        self.assertEqual(diff["added_files"], [])
                        self.assertEqual(diff["deleted_files"], [])
                        self.assertEqual(diff["differences"], {})
|
|
|
|
|
|
@contextlib.contextmanager
def loop_create_device(ctl, fd, offset=None, sizelimit=None):
    """Yield a loop device for ``fd`` and close it when the context exits.

    The device is obtained from ``ctl.loop_for_fd`` with ``autoclear=True``;
    ``offset`` and ``sizelimit`` are passed through unchanged.
    """
    device = ctl.loop_for_fd(fd, offset=offset, sizelimit=sizelimit, autoclear=True)
    try:
        yield device
    finally:
        if device:
            device.close()
|
|
|
|
|
|
@contextlib.contextmanager
def loop_open(ctl, image, *, offset=None, size=None):
    """Attach ``image`` to a loop device and yield the device's ``/dev`` path.

    The image is opened read-only in binary mode; ``offset`` and ``size``
    select the exposed window (``size`` is passed on as the size limit).
    The file and the loop device are released when the context exits.
    """
    with open(image, "rb") as handle, \
            loop_create_device(ctl, handle.fileno(), offset=offset, sizelimit=size) as device:
        yield os.path.join("/dev", device.devname)
|
|
|
|
|
|
@contextlib.contextmanager
def mount(device):
    """Mount ``device`` read-only on a temporary directory and yield that path.

    The mount is lazily unmounted and the directory removed on exit.  A
    failing ``mount`` or ``umount`` command raises
    ``subprocess.CalledProcessError``.
    """
    with tempfile.TemporaryDirectory() as mountpoint:
        mount_cmd = ["mount", "-o", "ro", device, mountpoint]
        umount_cmd = ["umount", "--lazy", mountpoint]

        subprocess.run(mount_cmd, check=True)
        try:
            yield mountpoint
        finally:
            subprocess.run(umount_cmd, check=True)
|
|
|
|
|
|
@contextlib.contextmanager
def open_image(ctl, image, fmt):
    """Expose ``image`` as a loop device, converting it to raw first if needed.

    Images whose ``fmt`` is not ``"raw"`` are converted with ``qemu-img
    convert`` into ``image.raw`` inside a temporary directory; raw images
    are used in place.

    Yields a ``(target, dev)`` tuple: the path of the raw image file and the
    loop device path covering the whole file.
    """
    with tempfile.TemporaryDirectory() as scratch:
        raw_image = image
        if fmt != "raw":
            raw_image = os.path.join(scratch, "image.raw")
            convert_cmd = ["qemu-img", "convert", "-O", "raw", image, raw_image]
            subprocess.run(convert_cmd, check=True)

        # Map the complete raw file, from offset 0 up to its size on disk.
        nbytes = os.stat(raw_image).st_size

        with loop_open(ctl, raw_image, offset=0, size=nbytes) as dev:
            yield raw_image, dev
|