Require all the tests that compile a manifest to either specify checkpoints or exports. Convert all the tests that were relying on implicit exports with v1 manifests to use explicit exports.
293 lines
12 KiB
Python
293 lines
12 KiB
Python
#
|
|
# Runtime tests for the individual assemblers.
|
|
#
|
|
|
|
import contextlib
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
import unittest
|
|
|
|
from osbuild import loop
|
|
from .. import test
|
|
|
|
MEBIBYTE = 1024 * 1024
|
|
|
|
|
|
@unittest.skipUnless(test.TestBase.have_test_data(), "no test-data access")
|
|
@unittest.skipUnless(test.TestBase.can_bind_mount(), "root-only")
|
|
class TestAssemblers(test.TestBase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super().setUpClass()
|
|
|
|
def setUp(self):
|
|
self.osbuild = test.OSBuild()
|
|
|
|
@contextlib.contextmanager
|
|
def run_assembler(self, osb, name, options, output_path):
|
|
with open(os.path.join(self.locate_test_data(),
|
|
"manifests/filesystem.json")) as f:
|
|
manifest = json.load(f)
|
|
manifest["pipeline"] = dict(
|
|
manifest["pipeline"],
|
|
assembler={"name": name, "options": options}
|
|
)
|
|
data = json.dumps(manifest)
|
|
|
|
treeid = osb.treeid_from_manifest(data)
|
|
assert treeid
|
|
|
|
with tempfile.TemporaryDirectory(dir="/var/tmp") as output_dir:
|
|
osb.compile(data, output_dir=output_dir, checkpoints=[treeid], exports=["assembler"])
|
|
with osb.map_object(treeid) as tree:
|
|
yield tree, os.path.join(output_dir, "assembler", output_path)
|
|
|
|
def assertImageFile(self, filename, fmt, expected_size=None):
|
|
info = json.loads(subprocess.check_output(["qemu-img", "info", "--output", "json", filename]))
|
|
self.assertEqual(info["format"], fmt)
|
|
self.assertEqual(info["virtual-size"], expected_size)
|
|
|
|
def assertFilesystem(self, device, uuid, fstype, tree):
|
|
output = subprocess.check_output(["blkid", "--output", "export", device], encoding="utf-8")
|
|
blkid = dict(line.split("=") for line in output.strip().split("\n"))
|
|
self.assertEqual(blkid["UUID"], uuid)
|
|
self.assertEqual(blkid["TYPE"], fstype)
|
|
|
|
with mount(device) as target_tree:
|
|
diff = self.tree_diff(tree, target_tree)
|
|
if fstype == 'ext4':
|
|
added_files = ["/lost+found"]
|
|
else:
|
|
added_files = []
|
|
self.assertEqual(diff["added_files"], added_files)
|
|
self.assertEqual(diff["deleted_files"], [])
|
|
self.assertEqual(diff["differences"], {})
|
|
|
|
def assertGRUB2(self, device, l1hash, l2hash, size):
|
|
m1 = hashlib.sha256()
|
|
m2 = hashlib.sha256()
|
|
with open(device, "rb") as d:
|
|
sectors = d.read(size)
|
|
self.assertEqual(len(sectors), size)
|
|
m1.update(sectors[:440])
|
|
m2.update(sectors[512:size])
|
|
self.assertEqual(m1.hexdigest(), l1hash)
|
|
self.assertEqual(m2.hexdigest(), l2hash)
|
|
|
|
def assertPartitionTable(self, ptable, label, uuid, n_partitions, boot_partition=None):
|
|
self.assertEqual(ptable["label"], label)
|
|
self.assertEqual(ptable["id"][2:], uuid[:8])
|
|
self.assertEqual(len(ptable["partitions"]), n_partitions)
|
|
|
|
if boot_partition:
|
|
bootable = [p.get("bootable", False) for p in ptable["partitions"]]
|
|
self.assertEqual(bootable.count(True), 1)
|
|
self.assertEqual(bootable.index(True) + 1, boot_partition)
|
|
|
|
def read_partition_table(self, device):
|
|
sfdisk = json.loads(subprocess.check_output(["sfdisk", "--json", device]))
|
|
ptable = sfdisk["partitiontable"]
|
|
self.assertIsNotNone(ptable)
|
|
return ptable
|
|
|
|
@unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
|
|
def test_rawfs(self):
|
|
for fs_type in ["ext4", "xfs", "btrfs"]:
|
|
with self.subTest(fs_type=fs_type):
|
|
print(f" {fs_type}", flush=True)
|
|
options = {
|
|
"filename": "image.raw",
|
|
"root_fs_uuid": "016a1cda-5182-4ab3-bf97-426b00b74eb0",
|
|
"size": 1024 * MEBIBYTE,
|
|
"fs_type": fs_type,
|
|
}
|
|
with self.osbuild as osb:
|
|
with self.run_assembler(osb, "org.osbuild.rawfs", options, "image.raw") as (tree, image):
|
|
self.assertImageFile(image, "raw", options["size"])
|
|
self.assertFilesystem(image, options["root_fs_uuid"], fs_type, tree)
|
|
|
|
@unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
|
|
def test_ostree(self):
|
|
with self.osbuild as osb:
|
|
with open(os.path.join(self.locate_test_data(),
|
|
"manifests/fedora-ostree-commit.json")) as f:
|
|
manifest = json.load(f)
|
|
|
|
data = json.dumps(manifest)
|
|
with tempfile.TemporaryDirectory(dir="/var/tmp") as output_dir:
|
|
result = osb.compile(data, output_dir=output_dir, exports=["assembler"])
|
|
compose_file = os.path.join(output_dir, "assembler", "compose.json")
|
|
repo = os.path.join(output_dir, "assembler", "repo")
|
|
|
|
with open(compose_file) as f:
|
|
compose = json.load(f)
|
|
commit_id = compose["ostree-commit"]
|
|
ref = compose["ref"]
|
|
rpmostree_inputhash = compose["rpm-ostree-inputhash"]
|
|
os_version = compose["ostree-version"]
|
|
assert commit_id
|
|
assert ref
|
|
assert rpmostree_inputhash
|
|
assert os_version
|
|
self.assertIn("metadata", result["assembler"])
|
|
metadata = result["assembler"]["metadata"]
|
|
self.assertIn("compose", metadata)
|
|
self.assertEqual(compose, metadata["compose"])
|
|
|
|
md = subprocess.check_output(
|
|
[
|
|
"ostree",
|
|
"show",
|
|
"--repo", repo,
|
|
"--print-metadata-key=rpmostree.inputhash",
|
|
commit_id
|
|
], encoding="utf-8").strip()
|
|
self.assertEqual(md, f"'{rpmostree_inputhash}'")
|
|
|
|
md = subprocess.check_output(
|
|
[
|
|
"ostree",
|
|
"show",
|
|
"--repo", repo,
|
|
"--print-metadata-key=version",
|
|
commit_id
|
|
], encoding="utf-8").strip()
|
|
self.assertEqual(md, f"'{os_version}'")
|
|
|
|
@unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
|
|
def test_qemu(self):
|
|
loctl = loop.LoopControl()
|
|
with self.osbuild as osb:
|
|
for fmt in ["raw", "raw.xz", "qcow2", "vmdk", "vdi"]:
|
|
for fs_type in ["ext4", "xfs", "btrfs"]:
|
|
with self.subTest(fmt=fmt, fs_type=fs_type):
|
|
print(f" {fmt} {fs_type}", flush=True)
|
|
options = {
|
|
"format": fmt,
|
|
"filename": f"image.{fmt}",
|
|
"ptuuid": "b2c09a39-db93-44c5-846a-81e06b1dc162",
|
|
"root_fs_uuid": "aff010e9-df95-4f81-be6b-e22317251033",
|
|
"size": 1024 * MEBIBYTE,
|
|
"root_fs_type": fs_type,
|
|
}
|
|
with self.run_assembler(osb,
|
|
"org.osbuild.qemu",
|
|
options,
|
|
f"image.{fmt}") as (tree, image):
|
|
if fmt == "raw.xz":
|
|
subprocess.run(["unxz", "--keep", "--force", image], check=True)
|
|
image = image[:-3]
|
|
fmt = "raw"
|
|
self.assertImageFile(image, fmt, options["size"])
|
|
with open_image(loctl, image, fmt) as (target, device):
|
|
ptable = self.read_partition_table(device)
|
|
self.assertPartitionTable(ptable,
|
|
"dos",
|
|
options["ptuuid"],
|
|
1,
|
|
boot_partition=1)
|
|
if fs_type == "btrfs":
|
|
l2hash = "919aad44d37aa9fdbb8cb1bbd8ce2a44e64aee76f4dceb805eaab041b7f62348"
|
|
elif fs_type == "xfs":
|
|
l2hash = "1729f531281e4c3cbcde2a39b587c9dd5334ea1335bb860905556d5b73603de6"
|
|
else:
|
|
l2hash = "24c3ad6be9a5687d5140e0bf66d25953c4f0c7eeb6aaced4cc64685f5b3cfa9e"
|
|
self.assertGRUB2(device,
|
|
"26e3327c6b5ac9b5e21d8b86f19ff7cb4d12fb2d0406713f936997d9d89de3ee",
|
|
l2hash,
|
|
1024 * 1024)
|
|
|
|
p1 = ptable["partitions"][0]
|
|
ssize = ptable.get("sectorsize", 512)
|
|
start, size = p1["start"] * ssize, p1["size"] * ssize
|
|
with loop_open(loctl, target, offset=start, size=size) as dev:
|
|
self.assertFilesystem(dev, options["root_fs_uuid"], fs_type, tree)
|
|
|
|
@unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
|
|
def test_tar(self):
|
|
cases = [
|
|
("tree.tar.gz", None, ["application/x-tar"]),
|
|
("tree.tar.gz", "gzip", ["application/x-gzip", "application/gzip"])
|
|
]
|
|
with self.osbuild as osb:
|
|
for filename, compression, expected_mimetypes in cases:
|
|
options = {"filename": filename}
|
|
if compression:
|
|
options["compression"] = compression
|
|
with self.run_assembler(osb,
|
|
"org.osbuild.tar",
|
|
options,
|
|
filename) as (tree, image):
|
|
output = subprocess.check_output(["file", "--mime-type", image], encoding="utf-8")
|
|
_, mimetype = output.strip().split(": ") # "filename: mimetype"
|
|
self.assertIn(mimetype, expected_mimetypes)
|
|
|
|
if compression:
|
|
continue
|
|
|
|
# In the non-compression case, we verify the tree's content
|
|
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:
|
|
args = [
|
|
"tar",
|
|
"--numeric-owner",
|
|
"--selinux",
|
|
"--acls",
|
|
"--xattrs", "--xattrs-include", "*",
|
|
"-xaf", image,
|
|
"-C", tmp]
|
|
subprocess.check_output(args, encoding="utf-8")
|
|
diff = self.tree_diff(tree, tmp)
|
|
self.assertEqual(diff["added_files"], [])
|
|
self.assertEqual(diff["deleted_files"], [])
|
|
self.assertEqual(diff["differences"], {})
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def loop_create_device(ctl, fd, offset=None, sizelimit=None):
|
|
lo = None
|
|
try:
|
|
lo = ctl.loop_for_fd(fd,
|
|
offset=offset,
|
|
sizelimit=sizelimit,
|
|
autoclear=True)
|
|
yield lo
|
|
finally:
|
|
if lo:
|
|
lo.close()
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def loop_open(ctl, image, *, offset=None, size=None):
|
|
with open(image, "rb") as f:
|
|
fd = f.fileno()
|
|
with loop_create_device(ctl, fd, offset=offset, sizelimit=size) as lo:
|
|
yield os.path.join("/dev", lo.devname)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def mount(device):
|
|
with tempfile.TemporaryDirectory() as mountpoint:
|
|
subprocess.run(["mount", "-o", "ro", device, mountpoint], check=True)
|
|
try:
|
|
yield mountpoint
|
|
finally:
|
|
subprocess.run(["umount", "--lazy", mountpoint], check=True)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def open_image(ctl, image, fmt):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
if fmt != "raw":
|
|
target = os.path.join(tmp, "image.raw")
|
|
subprocess.run(["qemu-img", "convert", "-O", "raw", image, target],
|
|
check=True)
|
|
else:
|
|
target = image
|
|
|
|
size = os.stat(target).st_size
|
|
|
|
with loop_open(ctl, target, offset=0, size=size) as dev:
|
|
yield target, dev
|