From 5ee68aef305fa8153cafaf7247a4dea115209225 Mon Sep 17 00:00:00 2001 From: Christian Kellner Date: Mon, 16 Dec 2019 16:45:09 +0100 Subject: [PATCH] assembler/qemu: Partition{Table} & Filesystem objs Instead of having dictionaries representing the partition table, partitions and filesystems together with some functions operating on them, have proper python objects with methods. In the future these objects could be extract and properly tested as well. --- assemblers/org.osbuild.qemu | 238 ++++++++++++++++++++++++------------ 1 file changed, 162 insertions(+), 76 deletions(-) diff --git a/assemblers/org.osbuild.qemu b/assemblers/org.osbuild.qemu index 26635aa7..3830f67d 100755 --- a/assemblers/org.osbuild.qemu +++ b/assemblers/org.osbuild.qemu @@ -8,6 +8,7 @@ import shutil import subprocess import sys import tempfile +from typing import List import osbuild.remoteloop as remoteloop STAGE_DESC = "Assemble a bootable partitioned disk image with qemu-img" @@ -130,6 +131,18 @@ def mount(source, dest): subprocess.run(["umount", "-R", dest], check=True) +class Filesystem: + def __init__(self, + fstype: str, + uuid: str, + mountpoint: str, + label: str = None): + self.type = fstype + self.uuid = uuid + self.mountpoint = mountpoint + self.label = label + + def mkfs_ext4(device, uuid, label): opts = [] if label: @@ -154,7 +167,8 @@ def mkfs_vfat(device, uuid, label): subprocess.run(["mkfs.vfat", "-i", volid] + opts + [device], encoding='utf-8', check=True) -def mkfs_for_type(device, uuid, fs_type, label): +def mkfs_for_type(device: str, fs: Filesystem): + fs_type = fs.type if fs_type == "ext4": maker = mkfs_ext4 elif fs_type == "xfs": @@ -163,80 +177,151 @@ def mkfs_for_type(device, uuid, fs_type, label): maker = mkfs_vfat else: raise ValueError(f"Unknown filesystem type '{fs_type}'") - maker(device, uuid, label) + maker(device, fs.uuid, fs.label) -def create_partition_table_legacy(image, options): +class Partition: + def __init__(self, + pttype: str = None, + start: int = None, + size: int = None, + bootable: bool = False, + filesystem: Filesystem = None): + self.type = pttype + self.start = start + self.size = size + self.bootable = bootable + self.filesystem = filesystem + + @property + def start_in_bytes(self): + return (self.start or 0) * 512 + + @property + def size_in_bytes(self): + return (self.size or 0) * 512 + + @property + def mountpoint(self): + if self.filesystem is None: + return None + return self.filesystem.mountpoint + + @property + def fs_type(self): + if self.filesystem is None: + return None + return self.filesystem.type + + @property + def fs_uuid(self): + if self.filesystem is None: + return None + return self.filesystem.uuid + + +class PartitionTable: + def __init__(self, label, uuid, partitions): + self.label = label + self.uuid = uuid + self.partitions = partitions or [] + + def __getitem__(self, key) -> Partition: + return self.partitions[key] + + def partitions_with_filesystems(self) -> List[Partition]: + """Return partitions with filesystems sorted by hierarchy""" + def mountpoint_len(p): + return len(p.mountpoint) + parts_fs = filter(lambda p: p.filesystem is not None, self.partitions) + return sorted(parts_fs, key=mountpoint_len) + + def write_to(self, target, sync=True): + """Write the partition table to disk""" + # generate the command for sfdisk to create the table + command = f"label: {self.label}\nlabel-id: {self.uuid}" + for partition in self.partitions: + fields = [] + for field in ["start", "size", "type"]: + value = getattr(partition, field) + if value: + fields += [f'{field}="{value}"'] + if partition.bootable: + fields += ["bootable"] + command += "\n" + ", ".join(fields) + + subprocess.run(["sfdisk", "-q", target], + input=command, + encoding='utf-8', + check=True) + + if sync: + self.update_from(target) + + def update_from(self, target): + """Update and fill in missing information from disk""" + r = subprocess.run(["sfdisk", "--json", target], + stdout=subprocess.PIPE, + encoding='utf-8', + check=True) + disk_table = json.loads(r.stdout)["partitiontable"] + disk_parts = disk_table["partitions"] + + assert len(disk_parts) == len(self.partitions) + for i, part in enumerate(self.partitions): + part.start = disk_parts[i]["start"] + part.size = disk_parts[i]["size"] + part.type = disk_parts[i].get("type") + + +def filesystem_from_json(js) -> Filesystem: + return Filesystem(js["type"], js["uuid"], js["mountpoint"], js.get("label")) + + +def partition_from_json(js) -> Partition: + p = Partition(pttype=js.get("type"), + start=js.get("start"), + size=js.get("size"), + bootable=js.get("bootable")) + fs = js.get("filesystem") + if fs: + p.filesystem = filesystem_from_json(fs) + return p + + +def partition_table_from_options(options) -> PartitionTable: ptuuid = options["ptuuid"] - root_fs_uuid = options["root_fs_uuid"] - root_fs_type = options.get("root_fs_type", "ext4") + pttype = options.get("pttype", "dos") + partitions = options.get("partitions") - partition_table = f"label: mbr\nlabel-id: {ptuuid}\nbootable, type=83" - subprocess.run(["sfdisk", "-q", image], input=partition_table, encoding='utf-8', check=True) + if pttype == "mbr": + pttype = "dos" - r = subprocess.run(["sfdisk", "--json", image], stdout=subprocess.PIPE, encoding='utf-8', check=True) - partition_table = json.loads(r.stdout) - - partition = partition_table["partitiontable"]["partitions"][0] - partitions = [{ - "start": partition["start"] * 512, - "size": partition["size"] * 512, - "filesystem": { - "type": root_fs_type, - "uuid": root_fs_uuid, - "mountpoint": "/" - } - }] - - return "mbr", partitions + if partitions is None: + # legacy mode, create a correct + root_fs_uuid = options["root_fs_uuid"] + root_fs_type = options.get("root_fs_type", "ext4") + partitions = [{ + "bootable": True, + "type": "83", + "filesystem": { + "type": root_fs_type, + "uuid": root_fs_uuid, + "mountpoint": "/" + } + }] + parts = [partition_from_json(p) for p in partitions] + return PartitionTable(pttype, ptuuid, parts) -def create_partition_table(image, options): - """Set up the partition table of the image""" - ptuuid = options["ptuuid"] - pttype = options.get("pttype") - - # if 'pttype' is missing, we are in legacy mode - if pttype is None: - return create_partition_table_legacy(image, options) - - # new mode - partitions = options["partitions"] - - # generate the command for sfdisk to create the table - command = f"label: {pttype}\nlabel-id: {ptuuid}" - for partition in partitions: - fields = [] - for field in ["start", "size", "type"]: - if field in partition: - fields += [f'{field}="{partition[field]}"'] - if "bootable" in partition and partition["bootable"]: - fields += ["bootable"] - command += "\n" + ", ".join(fields) - - subprocess.run(["sfdisk", "-q", image], input=command, encoding='utf-8', check=True) - - # read the actual dimensions back - r = subprocess.run(["sfdisk", "--json", image], stdout=subprocess.PIPE, encoding='utf-8', check=True) - disk_table = json.loads(r.stdout)["partitiontable"] - disk_partitions = disk_table["partitions"] - - assert len(disk_partitions) == len(partitions) - for i, partition in enumerate(partitions): - partition["start"] = disk_partitions[i]["start"] * 512 - partition["size"] = disk_partitions[i]["size"] * 512 - - return pttype, partitions - - -def install_grub2(image, partitions): +def install_grub2(image: str, pt: PartitionTable): """Install grub2 to image""" grub2_core = "/var/tmp/grub2-core.img" root_fs_type = "unknown" - for p in partitions: - if p["filesystem"]["mountpoint"] == "/": - root_fs_type = p["filesystem"]["type"] + for p in pt: + if p.mountpoint and p.mountpoint == "/": + root_fs_type = p.fs_type break if root_fs_type == "ext4": @@ -260,7 +345,7 @@ def install_grub2(image, partitions): "part_msdos", fs_module, "biosdisk"], check=True) - partition_offset = partitions[0]["start"] + partition_offset = pt[0].start_in_bytes assert os.path.getsize(grub2_core) < partition_offset - 512 with open(image, "rb+") as image_f: @@ -298,25 +383,26 @@ def main(tree, output_dir, options, loop_client): subprocess.run(["truncate", "--size", str(size), image], check=True) # The partition table - pttype, partitions = create_partition_table(image, options) + pt = partition_table_from_options(options) + pt.write_to(image) # Create the level-2 bootloader - if pttype == "mbr": - install_grub2(image, partitions) + if pt.label == "dos": + install_grub2(image, pt) # Now assemble the filesystem hierarchy and copy the tree into the image with contextlib.ExitStack() as cm: root = cm.enter_context(tempfile.TemporaryDirectory(prefix="osbuild-mnt")) - # sort the partition according to their position in the filesystem tree - for partition in sorted(partitions, key=lambda p: len(p["filesystem"]["mountpoint"])): - offset, size = partition["start"], partition["size"] - filesystem = partition["filesystem"] + # iterate the partition according to their position in the filesystem tree + for partition in pt.partitions_with_filesystems(): + offset, size = partition.start_in_bytes, partition.size_in_bytes loop = cm.enter_context(loop_client.device(image, offset, size)) - # make the specified filesystem - fs_label = filesystem.get("label") - mkfs_for_type(loop, filesystem["uuid"], filesystem["type"], fs_label) + # make the specified filesystem, if any + if partition.filesystem is None: + continue + mkfs_for_type(loop, partition.filesystem) # now mount it - mountpoint = os.path.normpath(f"{root}/{filesystem['mountpoint']}") + mountpoint = os.path.normpath(f"{root}/{partition.mountpoint}") os.makedirs(mountpoint, exist_ok=True) cm.enter_context(mount(loop, mountpoint)) # the filesystem tree should now be properly setup,