diff --git a/tools/osbuild-mpp b/tools/osbuild-mpp index 04e0bc0e..00660c3d 100755 --- a/tools/osbuild-mpp +++ b/tools/osbuild-mpp @@ -119,6 +119,37 @@ Example: ... ``` +Defining partition layouts for disk images: + +It is possbile to define a partition layout via `mpp-define-image`. The defined layout +is actually written to a temporary sparse file and read back via `sfdisk`, so that all +partition data like `size` and `start` include actual padding and such. The `image` +variable will be defined with `size` and `layout` keys, the latter containing the +partition layout data. It can be accessed via the "String expansion" explained above. + +Example: + +``` +... + "mpp-define-image": { + "size": "10737418240", + "table": { + "uuid": "D209C89E-EA5E-4FBD-B161-B461CCE297E0", + "label": "gpt", + "partitions": [ + { + "id": "bios-boot", + "start": 2048, + "size": 2048, + "type": "21686148-6449-6E6F-744E-656564454649", + "bootable": true, + "uuid": "FAC7F1FB-3E8D-4137-A512-961DE09A5549" + }, + ... + } +... +``` + """ @@ -127,6 +158,7 @@ import contextlib import json import os import string +import subprocess import sys import pathlib import tempfile @@ -321,6 +353,163 @@ class DepSolver: return deps +class Partition: + def __init__(self, + uid: str = None, + pttype: str = None, + start: int = None, + size: int = None, + bootable: bool = False, + name: str = None, + uuid: str = None): + self.id = uid + self.type = pttype + self.start = start + self.size = size + self.bootable = bootable + self.name = name + self.uuid = uuid + self.index = None + + @property + def start_in_bytes(self): + return (self.start or 0) * 512 + + @property + def size_in_bytes(self): + return (self.size or 0) * 512 + + @classmethod + def from_dict(cls, js): + p = cls(uid=js.get("id"), + pttype=js.get("type"), + start=js.get("start"), + size=js.get("size"), + bootable=js.get("bootable"), + name=js.get("name"), + uuid=js.get("uuid")) + return p + + def to_dict(self): + data = {} + + if self.start: + data["start"] = self.start + if self.size: + data["size"] = self.size + if self.type: + data["type"] = self.type + if self.bootable: + data["bootable"] = self.bootable + if self.name: + data["name"] = self.name + if self.uuid: + data["uuid"] = self.uuid + + return data + + +class PartitionTable: + def __init__(self, label, uuid, partitions): + self.label = label + self.uuid = uuid + self.partitions = partitions or [] + + def __getitem__(self, key) -> Partition: + if isinstance(key, int): + return self.partitions[key] + elif isinstance(key, str): + for part in self.partitions: + if part.id == key: + return part + else: + raise ValueError("unsupported type") + + def write_to(self, target, sync=True): + """Write the partition table to disk""" + # generate the command for sfdisk to create the table + command = f"label: {self.label}\nlabel-id: {self.uuid}" + for partition in self.partitions: + fields = [] + for field in ["start", "size", "type", "name", "uuid"]: + value = getattr(partition, field) + if value: + fields += [f'{field}="{value}"'] + if partition.bootable: + fields += ["bootable"] + command += "\n" + ", ".join(fields) + + subprocess.run(["sfdisk", "-q", "--no-tell-kernel", target], + input=command, + encoding='utf-8', + check=True) + + if sync: + self.update_from(target) + + def update_from(self, target): + """Update and fill in missing information from disk""" + r = subprocess.run(["sfdisk", "--json", target], + stdout=subprocess.PIPE, + encoding='utf-8', + check=True) + disk_table = json.loads(r.stdout)["partitiontable"] + disk_parts = disk_table["partitions"] + + assert len(disk_parts) == len(self.partitions) + for i, part in enumerate(self.partitions): + part.index = i + part.start = disk_parts[i]["start"] + part.size = disk_parts[i]["size"] + part.type = disk_parts[i].get("type") + part.name = disk_parts[i].get("name") + + @classmethod + def from_dict(cls, js) -> Partition: + ptuuid = js["uuid"] + pttype = js["label"] + partitions = js.get("partitions") + + parts = [Partition.from_dict(p) for p in partitions] + table = cls(pttype, ptuuid, parts) + + return table + + def __str__(self) -> str: + data = {} + + if self.uuid: + data["uuid"] = self.uuid + + data["label"] = self.label + + data["partitions"] = [ + pt.to_dict() for pt in self.partitions + ] + + return json.dumps(data, indent=2) + + +class Image: + def __init__(self, size, layout): + self.size = size + self.layout = layout + + @classmethod + def from_dict(cls, js): + size = js["size"] + data = js["table"] + + with tempfile.TemporaryDirectory() as tmp: + image = os.path.join(tmp, "disk.img") + subprocess.run(["truncate", "--size", size, image], check=True) + + table = PartitionTable.from_dict(data) + table.write_to(image) + + return cls(size, table) + + class ManifestFile: @staticmethod def load(path): @@ -452,21 +641,29 @@ class ManifestFile: def _is_format(node): if not isinstance(node, dict): return False - return "mpp-format-string" in node or "mpp-format-int" in node + for m in ("int", "string", "json"): + if f"mpp-format-{m}" in node: + return True + return False def _eval_format(node, local_vars): if "mpp-format-string" in node: - is_int = False + res_type = "string" format_string = node["mpp-format-string"] + elif "mpp-format-json" in node: + res_type = "json" + format_string = node["mpp-format-json"] else: - is_int = True + res_type = "int" format_string = node["mpp-format-int"] # pylint: disable=eval-used # yolo this is fine! res = eval(f'f\'\'\'{format_string}\'\'\'', local_vars) - if is_int: + if res_type == "int": return int(res) + elif res_type == "json": + return json.loads(res) return res if isinstance(node, dict): @@ -486,6 +683,15 @@ class ManifestFile: def process_format(self): self._process_format(self.root) + def process_partition(self): + desc = self.root.get("mpp-define-image") + if not desc: + return + + del self.root["mpp-define-image"] + + self.vars["image"] = Image.from_dict(desc) + class ManifestFileV1(ManifestFile): def __init__(self, path, data): @@ -691,6 +897,8 @@ def main(): # First resolve all imports m.process_imports(args.searchdirs) + m.process_partition() + # Override variables from the main of imported files if args.vars: m.set_vars(args.vars)