assembler/qemu: Partition{Table} & Filesystem objs

Instead of having dictionaries representing the partition table,
partitions and filesystems together with some functions operating
on them, have proper python objects with methods. In the future
these objects could be extract and properly tested as well.
This commit is contained in:
Christian Kellner 2019-12-16 16:45:09 +01:00 committed by Tom Gundersen
parent 83c3f9608d
commit 5ee68aef30

View file

@ -8,6 +8,7 @@ import shutil
import subprocess
import sys
import tempfile
from typing import List
import osbuild.remoteloop as remoteloop
STAGE_DESC = "Assemble a bootable partitioned disk image with qemu-img"
@ -130,6 +131,18 @@ def mount(source, dest):
subprocess.run(["umount", "-R", dest], check=True)
class Filesystem:
def __init__(self,
fstype: str,
uuid: str,
mountpoint: str,
label: str = None):
self.type = fstype
self.uuid = uuid
self.mountpoint = mountpoint
self.label = label
def mkfs_ext4(device, uuid, label):
opts = []
if label:
@ -154,7 +167,8 @@ def mkfs_vfat(device, uuid, label):
subprocess.run(["mkfs.vfat", "-i", volid] + opts + [device], encoding='utf-8', check=True)
def mkfs_for_type(device, uuid, fs_type, label):
def mkfs_for_type(device: str, fs: Filesystem):
fs_type = fs.type
if fs_type == "ext4":
maker = mkfs_ext4
elif fs_type == "xfs":
@ -163,80 +177,151 @@ def mkfs_for_type(device, uuid, fs_type, label):
maker = mkfs_vfat
else:
raise ValueError(f"Unknown filesystem type '{fs_type}'")
maker(device, uuid, label)
maker(device, fs.uuid, fs.label)
def create_partition_table_legacy(image, options):
class Partition:
def __init__(self,
pttype: str = None,
start: int = None,
size: int = None,
bootable: bool = False,
filesystem: Filesystem = None):
self.type = pttype
self.start = start
self.size = size
self.bootable = bootable
self.filesystem = filesystem
@property
def start_in_bytes(self):
return (self.start or 0) * 512
@property
def size_in_bytes(self):
return (self.size or 0) * 512
@property
def mountpoint(self):
if self.filesystem is None:
return None
return self.filesystem.mountpoint
@property
def fs_type(self):
if self.filesystem is None:
return None
return self.filesystem.type
@property
def fs_uuid(self):
if self.filesystem is None:
return None
return self.filesystem.uuid
class PartitionTable:
def __init__(self, label, uuid, partitions):
self.label = label
self.uuid = uuid
self.partitions = partitions or []
def __getitem__(self, key) -> Partition:
return self.partitions[key]
def partitions_with_filesystems(self) -> List[Partition]:
"""Return partitions with filesystems sorted by hierarchy"""
def mountpoint_len(p):
return len(p.mountpoint)
parts_fs = filter(lambda p: p.filesystem is not None, self.partitions)
return sorted(parts_fs, key=mountpoint_len)
def write_to(self, target, sync=True):
"""Write the partition table to disk"""
# generate the command for sfdisk to create the table
command = f"label: {self.label}\nlabel-id: {self.uuid}"
for partition in self.partitions:
fields = []
for field in ["start", "size", "type"]:
value = getattr(partition, field)
if value:
fields += [f'{field}="{value}"']
if partition.bootable:
fields += ["bootable"]
command += "\n" + ", ".join(fields)
subprocess.run(["sfdisk", "-q", target],
input=command,
encoding='utf-8',
check=True)
if sync:
self.update_from(target)
def update_from(self, target):
"""Update and fill in missing information from disk"""
r = subprocess.run(["sfdisk", "--json", target],
stdout=subprocess.PIPE,
encoding='utf-8',
check=True)
disk_table = json.loads(r.stdout)["partitiontable"]
disk_parts = disk_table["partitions"]
assert len(disk_parts) == len(self.partitions)
for i, part in enumerate(self.partitions):
part.start = disk_parts[i]["start"]
part.size = disk_parts[i]["size"]
part.type = disk_parts[i].get("type")
def filesystem_from_json(js) -> Filesystem:
return Filesystem(js["type"], js["uuid"], js["mountpoint"], js.get("label"))
def partition_from_json(js) -> Partition:
p = Partition(pttype=js.get("type"),
start=js.get("start"),
size=js.get("size"),
bootable=js.get("bootable"))
fs = js.get("filesystem")
if fs:
p.filesystem = filesystem_from_json(fs)
return p
def partition_table_from_options(options) -> PartitionTable:
ptuuid = options["ptuuid"]
root_fs_uuid = options["root_fs_uuid"]
root_fs_type = options.get("root_fs_type", "ext4")
pttype = options.get("pttype", "dos")
partitions = options.get("partitions")
partition_table = f"label: mbr\nlabel-id: {ptuuid}\nbootable, type=83"
subprocess.run(["sfdisk", "-q", image], input=partition_table, encoding='utf-8', check=True)
if pttype == "mbr":
pttype = "dos"
r = subprocess.run(["sfdisk", "--json", image], stdout=subprocess.PIPE, encoding='utf-8', check=True)
partition_table = json.loads(r.stdout)
partition = partition_table["partitiontable"]["partitions"][0]
partitions = [{
"start": partition["start"] * 512,
"size": partition["size"] * 512,
"filesystem": {
"type": root_fs_type,
"uuid": root_fs_uuid,
"mountpoint": "/"
}
}]
return "mbr", partitions
if partitions is None:
# legacy mode, create a correct
root_fs_uuid = options["root_fs_uuid"]
root_fs_type = options.get("root_fs_type", "ext4")
partitions = [{
"bootable": True,
"type": "83",
"filesystem": {
"type": root_fs_type,
"uuid": root_fs_uuid,
"mountpoint": "/"
}
}]
parts = [partition_from_json(p) for p in partitions]
return PartitionTable(pttype, ptuuid, parts)
def create_partition_table(image, options):
"""Set up the partition table of the image"""
ptuuid = options["ptuuid"]
pttype = options.get("pttype")
# if 'pttype' is missing, we are in legacy mode
if pttype is None:
return create_partition_table_legacy(image, options)
# new mode
partitions = options["partitions"]
# generate the command for sfdisk to create the table
command = f"label: {pttype}\nlabel-id: {ptuuid}"
for partition in partitions:
fields = []
for field in ["start", "size", "type"]:
if field in partition:
fields += [f'{field}="{partition[field]}"']
if "bootable" in partition and partition["bootable"]:
fields += ["bootable"]
command += "\n" + ", ".join(fields)
subprocess.run(["sfdisk", "-q", image], input=command, encoding='utf-8', check=True)
# read the actual dimensions back
r = subprocess.run(["sfdisk", "--json", image], stdout=subprocess.PIPE, encoding='utf-8', check=True)
disk_table = json.loads(r.stdout)["partitiontable"]
disk_partitions = disk_table["partitions"]
assert len(disk_partitions) == len(partitions)
for i, partition in enumerate(partitions):
partition["start"] = disk_partitions[i]["start"] * 512
partition["size"] = disk_partitions[i]["size"] * 512
return pttype, partitions
def install_grub2(image, partitions):
def install_grub2(image: str, pt: PartitionTable):
"""Install grub2 to image"""
grub2_core = "/var/tmp/grub2-core.img"
root_fs_type = "unknown"
for p in partitions:
if p["filesystem"]["mountpoint"] == "/":
root_fs_type = p["filesystem"]["type"]
for p in pt:
if p.mountpoint and p.mountpoint == "/":
root_fs_type = p.fs_type
break
if root_fs_type == "ext4":
@ -260,7 +345,7 @@ def install_grub2(image, partitions):
"part_msdos", fs_module, "biosdisk"],
check=True)
partition_offset = partitions[0]["start"]
partition_offset = pt[0].start_in_bytes
assert os.path.getsize(grub2_core) < partition_offset - 512
with open(image, "rb+") as image_f:
@ -298,25 +383,26 @@ def main(tree, output_dir, options, loop_client):
subprocess.run(["truncate", "--size", str(size), image], check=True)
# The partition table
pttype, partitions = create_partition_table(image, options)
pt = partition_table_from_options(options)
pt.write_to(image)
# Create the level-2 bootloader
if pttype == "mbr":
install_grub2(image, partitions)
if pt.label == "dos":
install_grub2(image, pt)
# Now assemble the filesystem hierarchy and copy the tree into the image
with contextlib.ExitStack() as cm:
root = cm.enter_context(tempfile.TemporaryDirectory(prefix="osbuild-mnt"))
# sort the partition according to their position in the filesystem tree
for partition in sorted(partitions, key=lambda p: len(p["filesystem"]["mountpoint"])):
offset, size = partition["start"], partition["size"]
filesystem = partition["filesystem"]
# iterate the partition according to their position in the filesystem tree
for partition in pt.partitions_with_filesystems():
offset, size = partition.start_in_bytes, partition.size_in_bytes
loop = cm.enter_context(loop_client.device(image, offset, size))
# make the specified filesystem
fs_label = filesystem.get("label")
mkfs_for_type(loop, filesystem["uuid"], filesystem["type"], fs_label)
# make the specified filesystem, if any
if partition.filesystem is None:
continue
mkfs_for_type(loop, partition.filesystem)
# now mount it
mountpoint = os.path.normpath(f"{root}/{filesystem['mountpoint']}")
mountpoint = os.path.normpath(f"{root}/{partition.mountpoint}")
os.makedirs(mountpoint, exist_ok=True)
cm.enter_context(mount(loop, mountpoint))
# the filesystem tree should now be properly setup,