#!/usr/bin/python3
"""
Utility functions to read and write LVM metadata.

This module provides a `Disk` class that can be used
to read in LVM images and explore and manipulate its
metadata directly, i.e. it reads and writes the data
and headers directly. This allows one to rename a
volume group without having to involve the kernel,
which does not like to have two active LVM volume
groups with the same name.

The struct definitions have been taken from upstream
LVM2 sources[1], specifically:
 - `lib/format_text/layout.h`
 - `lib/format_text/format-text.c`

[1] https://github.com/lvmteam/lvm2 (commit 8801a86)
"""

import abc
import binascii
import io
import json
import os
import re
import struct
import sys

from collections import OrderedDict, deque
from typing import BinaryIO, Dict, Union

PathLike = Union[str, bytes, os.PathLike]

# Seed value for all checksums computed by `_calc_crc`
INITIAL_CRC = 0xf597a6cf
# On-disk size of the metadata area header (see `MDAHeader.write`)
MDA_HEADER_SIZE = 512


def _calc_crc(buf, crc=INITIAL_CRC):
    """Return the checksum of `buf`, seeded with `crc`.

    `binascii.crc32` is used for the actual computation; the seed
    and the result are both XORed with 0xFFFFFFFF around that call.
    """
    crc = crc ^ 0xFFFFFFFF
    crc = binascii.crc32(buf, crc)
    return crc ^ 0xFFFFFFFF


class CStruct:
    """A `struct.Struct` whose fields are addressed by name.

    `mapping` maps field names to `struct` format characters, in
    on-disk order; `byte_order` is the `struct` byte-order prefix.
    Packed and unpacked values are exchanged as plain dictionaries
    keyed by field name.
    """

    class Field:
        """Name, format character and index of a single field."""

        def __init__(self, name: str, ctype: str, position: int):
            self.name = name
            self.type = ctype
            self.pos = position

    def __init__(self, mapping: Dict, byte_order="<"):
        fmt = byte_order
        self.fields = []
        for pos, (name, ctype) in enumerate(mapping.items()):
            fmt += ctype
            self.fields.append(self.Field(name, ctype, pos))
        self.struct = struct.Struct(fmt)

    @property
    def size(self):
        """Size in bytes of the packed structure."""
        return self.struct.size

    def unpack(self, data):
        """Decode the leading `size` bytes of `data` into a dict."""
        up = self.struct.unpack_from(data)
        return {
            field.name: up[idx]
            for idx, field in enumerate(self.fields)
        }

    def read(self, fp):
        """Read one structure from `fp` at its current position.

        Returns the decoded dict, extended by a `_position` key that
        holds the offset the structure was read from, or `None` if
        fewer than `size` bytes could be read.
        """
        pos = fp.tell()
        data = fp.read(self.size)

        if len(data) < self.size:
            return None

        res = self.unpack(data)
        res["_position"] = pos
        return res

    def pack(self, data):
        """Encode the dict `data` to bytes; extra keys are ignored."""
        values = [data[field.name] for field in self.fields]
        return self.struct.pack(*values)

    def write(self, fp, data: Dict, *, offset=None):
        """Write the packed `data` to `fp`.

        Without `offset` the data is written at the current position.
        With `offset` (0 is a valid offset) the data is written there
        and the previous file position is restored afterwards.
        """
        packed = self.pack(data)

        # Compare against None: both offset 0 and a saved position
        # of 0 are legitimate values and must not be skipped.
        save = None
        if offset is not None:
            save = fp.tell()
            fp.seek(offset)

        fp.write(packed)

        if save is not None:
            fp.seek(save)

    def __getitem__(self, name):
        """Return the `Field` called `name`; raise `KeyError` if unknown."""
        for field in self.fields:
            if field.name == name:
                return field
        raise KeyError(f"Unknown field '{name}'")

    def __contains__(self, name):
        return any(field.name == name for field in self.fields)


class Header:
    """Abstract base class for all headers

    Sub-classes provide a class attribute `struct` (a `CStruct`)
    describing the on-disk layout; the decoded field values are
    kept in `self.data` and exposed via item access.
    """

    @property
    @classmethod
    @abc.abstractmethod
    def struct(cls) -> "CStruct":
        """Definition of the underlying struct data"""

    def __init__(self, data):
        self.data = data

    def __getitem__(self, name):
        assert name in self.struct
        return self.data[name]

    def __setitem__(self, name, value):
        assert name in self.struct
        self.data[name] = value

    def pack(self):
        """Return the header fields encoded as bytes."""
        return self.struct.pack(self.data)

    @classmethod
    def read(cls, fp):
        """Read the header from the current position of `fp`."""
        data = cls.struct.read(fp)  # pylint: disable=no-member
        return cls(data)

    def write(self, fp):
        """Write the packed header at the current position of `fp`."""
        raw = self.pack()
        fp.write(raw)

    def __str__(self) -> str:
        msg = f"{self.__class__.__name__}:"
        for f in self.struct.fields:
            msg += f"\n\t{f.name}: {self[f.name]}"
        return msg


class LabelHeader(Header):
    """The label found in one of the first sectors of a device."""

    struct = CStruct({  # 32 bytes on disk
        "id": "8s",     # int8_t[8] // LABELONE
        "sector": "Q",  # uint64_t  // Sector number of this label
        "crc": "L",     # uint32_t  // From next field to end of sector
        "offset": "L",  # uint32_t  // Offset from start of struct to contents
        "type": "8s"    # int8_t[8] // LVM2 00
    })

    LABELID = b"LABELONE"

    # scan sector 0 to 3 inclusive
    LABEL_SCAN_SECTORS = 4

    def __init__(self, data):
        super().__init__(data)
        self.sector_size = 512

    @classmethod
    def search(cls, fp, *, sector_size=512):
        """Scan the first `LABEL_SCAN_SECTORS` sectors for the label.

        Returns the header of the first sector starting with
        `LABELID`, or `None` if there is none. The returned header
        remembers `sector_size` so that `read_pv_header` computes
        offsets with the same sector size that was used to find it.
        """
        fp.seek(0, io.SEEK_SET)
        for _ in range(cls.LABEL_SCAN_SECTORS):
            raw = fp.read(sector_size)
            if len(raw) < cls.struct.size:
                # Ran off the end of the file; a (partial) sector
                # this short cannot contain a complete label.
                break
            if raw.startswith(cls.LABELID):
                hdr = cls(cls.struct.unpack(raw))
                hdr.sector_size = sector_size
                return hdr
        return None

    def read_pv_header(self, fp):
        """Read the `PVHeader` this label points to.

        The header is located `offset` bytes after the start of the
        sector recorded in the label.
        """
        sector = self.data["sector"]
        offset = self.data["offset"]
        offset = sector * self.sector_size + offset
        fp.seek(offset)
        return PVHeader.read(fp)


class DiskLocN(Header):
    """Location (offset and size) of an area on the disk."""

    struct = CStruct({
        "offset": "Q",  # uint64_t // Offset in bytes to start sector
        "size": "Q"     # uint64_t // Size in bytes
    })

    @property
    def offset(self):
        return self.data["offset"]

    @property
    def size(self):
        return self.data["size"]

    def read_data(self, fp: BinaryIO):
        """Return the whole area as an in-memory `io.BytesIO`."""
        fp.seek(self.offset)
        data = fp.read(self.size)
        return io.BytesIO(data)

    @classmethod
    def read_array(cls, fp):
        """Yield locations from `fp` until a zero offset or EOF."""
        while True:
            data = cls.struct.read(fp)

            if not data or data["offset"] == 0:
                break

            yield cls(data)


class PVHeader(Header):
    """Physical volume header plus its data and metadata areas."""

    ID_LEN = 32
    struct = CStruct({
        "uuid": "32s",    # int8_t[ID_LEN]
        "disk_size": "Q"  # uint64_t // size in bytes
    })
    # followed by two NULL terminated list of data areas
    # and metadata areas of type `DiskLocN`

    def __init__(self, data, data_areas, meta_areas):
        super().__init__(data)
        self.data_areas = data_areas
        self.meta_areas = meta_areas

    @property
    def uuid(self):
        return self.data["uuid"]

    @property
    def disk_size(self):
        return self.data["disk_size"]

    @classmethod
    def read(cls, fp):
        """Read the header and the two area lists following it."""
        data = cls.struct.read(fp)

        data_areas = list(DiskLocN.read_array(fp))
        meta_areas = list(DiskLocN.read_array(fp))

        return cls(data, data_areas, meta_areas)

    def __str__(self):
        msg = super().__str__()
        if self.data_areas:
            msg += "\nData: \n\t" + "\n\t".join(map(str, self.data_areas))
        if self.meta_areas:
            msg += "\nMeta: \n\t" + "\n\t".join(map(str, self.meta_areas))
        return msg


class RawLocN(Header):
    """Location of a metadata text block inside a metadata area."""

    struct = CStruct({
        "offset": "Q",    # uint64_t // Offset in bytes to start sector
        "size": "Q",      # uint64_t // Size in bytes
        "checksum": "L",  # uint32_t // Checksum of data
        "flags": "L",     # uint32_t // Flags
    })

    IGNORED = 0x00000001

    @classmethod
    def read_array(cls, fp: BinaryIO):
        """Yield locations from `fp` until a zero offset or EOF."""
        while True:
            loc = cls.struct.read(fp)

            if not loc or loc["offset"] == 0:
                break

            yield cls(loc)


class MDAHeader(Header):
    """Header of a metadata area, followed by its `RawLocN` list."""

    struct = CStruct({
        "checksum": "L",  # uint32_t   // Checksum of data
        "magic": "16s",   # int8_t[16] // Allows to scan for metadata
        "version": "L",   # uint32_t
        "start": "Q",     # uint64_t   // Absolute start byte of itself
        "size": "Q"       # uint64_t   // Size of metadata area
    })
    # followed by a null terminated list of type `RawLocN`

    LOC_COMMITTED = 0
    LOC_PRECOMMITTED = 1

    HEADER_SIZE = MDA_HEADER_SIZE

    def __init__(self, data, raw_locns):
        super().__init__(data)
        self.raw_locns = raw_locns

    @property
    def checksum(self):
        return self.data["checksum"]

    @property
    def magic(self):
        return self.data["magic"]

    @property
    def version(self):
        return self.data["version"]

    @property
    def start(self):
        return self.data["start"]

    @property
    def size(self):
        return self.data["size"]

    @classmethod
    def read(cls, fp):
        """Read the header and the `RawLocN` list following it."""
        data = cls.struct.read(fp)
        raw_locns = list(RawLocN.read_array(fp))
        return cls(data, raw_locns)

    def read_metadata(self, fp) -> "Metadata":
        """Read and decode the committed metadata text from `fp`.

        The location offset is relative to the start of the
        metadata area, i.e. to `self.start`.
        """
        loc = self.raw_locns[self.LOC_COMMITTED]
        offset = self.start + loc["offset"]
        fp.seek(offset)
        data = fp.read(loc["size"])
        return Metadata.decode(data)

    def write_metadata(self, fp, data: "Metadata"):
        """Write `data` over the committed metadata and update the header.

        The encoded text is written at the location of the committed
        metadata; the size and checksum of that location are updated
        and the header is then re-written via `write`.
        """
        raw = data.encode()

        loc = self.raw_locns[self.LOC_COMMITTED]
        offset = self.start + loc["offset"]
        fp.seek(offset)

        n = fp.write(raw)
        loc["size"] = n
        loc["checksum"] = _calc_crc(raw)
        self.write(fp)

    def write(self, fp):
        """Write the header and its `RawLocN` list to `fp`.

        The header is serialized into a zero-padded buffer of
        `HEADER_SIZE` bytes, its checksum is refreshed and the
        buffer is written at `self.start`.
        """
        header = io.BytesIO()
        header.write(self.struct.pack(self.data))

        for loc in self.raw_locns:
            loc.write(header)

        # The zero padding also acts as terminator of the list
        used = header.tell()
        header.write(b"\0" * (self.HEADER_SIZE - used))

        raw = header.getvalue()

        # NOTE(review): the source text of this method was lost from
        # the `struct.Struct(` call onwards. What follows has been
        # reconstructed: checksum over everything after the checksum
        # field itself, then write the block at `self.start` (needed
        # because `write_metadata` leaves `fp` positioned after the
        # metadata text). Confirm against the upstream file.
        cs = struct.Struct("<L")
        self.data["checksum"] = _calc_crc(raw[cs.size:])

        header.seek(0)
        header.write(self.struct.pack(self.data))

        fp.seek(self.start)
        return fp.write(header.getvalue())


class Metadata:
    """Decoded LVM metadata text of one volume group.

    `data` is the parsed configuration tree; its first key is the
    name of the volume group.
    """

    # NOTE(review): the class statement and the parameter list of
    # `__init__` were lost in the source text; they are reconstructed
    # from the `cls(name, md)` call in `decode` and the attribute
    # assignments below. Confirm against the upstream file.
    def __init__(self, vg_name, data: OrderedDict) -> None:
        self._vg_name = vg_name
        self.data = data

    @property
    def vg_name(self) -> str:
        return self._vg_name

    @vg_name.setter
    def vg_name(self, vg_name: str) -> None:
        self.rename_vg(vg_name)

    def rename_vg(self, new_name):
        """Rename the volume group to `new_name`."""
        # Replace the corresponding key in the dict and
        # ensure it is always the first key
        name = self.vg_name
        d = self.data[name]
        del self.data[name]
        self.data[new_name] = d
        self.data.move_to_end(new_name, last=False)
        # Keep the cached name in sync, otherwise `vg_name` would
        # report the old name and a second rename would fail with
        # a KeyError.
        self._vg_name = new_name

    @classmethod
    def decode(cls, data: bytes) -> "Metadata":
        """Create a `Metadata` object from raw on-disk bytes."""
        data = data.decode("utf-8")
        name, md = Metadata.decode_data(data)
        return cls(name, md)

    def encode(self) -> bytes:
        """Return the metadata as on-disk bytes (NUL terminated)."""
        data = Metadata.encode_data(self.data)
        return data.encode("utf-8")

    def __str__(self) -> str:
        return json.dumps(self.data, indent=2)

    @staticmethod
    def decode_data(raw):
        """Parse metadata text; return `(vg_name, tree)`.

        `tree` is an `OrderedDict` of the top-level entries, with
        sections as nested `OrderedDict`, arrays as lists and scalars
        as `int`, `float` or `str`. `vg_name` is the very first token
        of the text.

        The text is tokenized on whitespace after a few regular
        expression substitutions, so whitespace inside of strings is
        normalized to single spaces and the characters handled by the
        substitutions (`#`, `=`, `,`, `[`, `]`) are not preserved when
        they occur inside of strings.

        Raises `ValueError` if the text ends where a value or the end
        of a string is expected.
        """
        # Applied in order; the order is significant
        substitutions = (
            (r"#.*\n", ""),
            (r"\[", "[ "),
            (r"\]", " ]"),
            (r'"', ' " '),
            (r"[=,]", ""),
            (r"\s+", " "),
            (r"\0$", ""),
        )

        text = raw
        for pattern, repl in substitutions:
            text = re.sub(pattern, repl, text)

        # deque: tokens are consumed from the front, which is O(n)
        # per token for a plain list
        tokens = deque(text.split())

        DICT_START = '{'
        DICT_END = '}'
        ARRAY_START = '['
        ARRAY_END = ']'
        STRING_START = '"'
        STRING_END = '"'

        def next_token():
            if not tokens:
                return None
            return tokens.popleft()

        def parse_str(val):
            parts = []

            while val != STRING_END:
                if val is None:
                    # without this check the loop would never end
                    raise ValueError("Unterminated string in metadata")
                parts.append(val)
                val = next_token()

            return " ".join(parts)

        def parse_type(val):
            # type = integer | float | string
            # integer = [0-9]*
            # float = [0-9]*'.'[0-9]*
            # string = '"'.*'"'

            if val is None:
                raise ValueError("Unexpected end of metadata")
            if val == STRING_START:
                return parse_str(next_token())
            if "." in val:
                return float(val)
            return int(val)

        def parse_array(val):
            result = []

            while val != ARRAY_END:
                result.append(parse_type(val))
                val = next_token()

            return result

        def parse_section(val):
            result = OrderedDict()

            while val and val != DICT_END:
                result[val] = parse_value()
                val = next_token()

            return result

        def parse_value():
            val = next_token()

            if val == DICT_START:
                return parse_section(next_token())
            if val == ARRAY_START:
                return parse_array(next_token())

            return parse_type(val)

        name = next_token()
        obj = parse_section(name)

        return name, obj

    @staticmethod
    def encode_data(data):
        """Serialize a metadata tree to NUL terminated text.

        This is the inverse of `decode_data`. Supported values are
        `int`, `float`, `str`, `list` and `dict`; anything else
        raises a `TypeError`.
        """

        def encode_dict(d):
            lines = []
            for k, v in d.items():
                # sections are written as `name {`, values as `name = `
                sep = " " if isinstance(v, dict) else " = "
                lines.append(k + sep + encode_val(v) + "\n")
            return "".join(lines)

        def encode_val(v):
            # floats are produced by `decode_data`, so they have to
            # be accepted here as well
            if isinstance(v, (int, float)):
                return str(v)
            if isinstance(v, str):
                return f'"{v}"'
            if isinstance(v, list):
                return "[" + ", ".join(encode_val(x) for x in v) + "]"
            if isinstance(v, dict):
                return "{\n" + encode_dict(v) + "}\n"
            raise TypeError(
                f"Cannot encode value of type '{type(v).__name__}'"
            )

        return encode_dict(data) + "\0"


class Disk:
    """An LVM2 physical volume image with direct metadata access.

    `fp` is a seekable binary file object for the image; it is owned
    by the `Disk` and closed when the context manager exits or when
    reading the headers fails. `path` is only used for display.
    """

    def __init__(self, fp, path: PathLike) -> None:
        self.fp = fp
        self.path = path

        self.lbl_hdr = None
        self.pv_hdr = None
        self.ma_headers = []
        self.metadata = None

        try:
            self._init_headers()
        except BaseException:  # pylint: disable=broad-except
            # We own `fp`: close it on any failure, then propagate
            self.fp.close()
            raise

    def _init_headers(self):
        """Read label, PV header, metadata area headers and metadata.

        Raises `RuntimeError` if the label or all metadata area
        headers are missing.
        """
        fp = self.fp
        lbl = LabelHeader.search(fp)

        if not lbl:
            raise RuntimeError("Could not find label header")

        self.lbl_hdr = lbl
        self.pv_hdr = lbl.read_pv_header(fp)

        for ma in self.pv_hdr.meta_areas:
            data = ma.read_data(fp)
            hdr = MDAHeader.read(data)
            self.ma_headers.append(hdr)

        if not self.ma_headers:
            raise RuntimeError("Could not find metadata header")

        # The metadata is taken from the first metadata area only
        self.metadata = self.ma_headers[0].read_metadata(fp)

    @classmethod
    def open(cls, path: PathLike, *, read_only=False) -> "Disk":
        """Open the image at `path`, read-write unless `read_only`."""
        mode = "rb" if read_only else "rb+"

        fp = open(path, mode)  # pylint: disable=consider-using-with

        return cls(fp, path)

    def flush_metadata(self):
        """Write the in-memory metadata to all metadata areas."""
        for ma in self.ma_headers:
            ma.write_metadata(self.fp, self.metadata)

    def rename_vg(self, new_name):
        """Rename the volume group"""
        self.metadata.rename_vg(new_name)

    def set_description(self, desc: str) -> None:
        """Set the description in the metadata block"""
        self.metadata.data["description"] = desc

    def set_creation_time(self, t: int) -> None:
        """Set the creation time of the volume group"""
        self.metadata.data["creation_time"] = t

    def set_creation_host(self, host: str) -> None:
        """Set the host that created the volume group"""
        self.metadata.data["creation_host"] = host

    def dump(self):
        """Print path, headers and metadata to standard output."""
        print(self.path)
        print(self.lbl_hdr)
        print(self.pv_hdr)
        print(self.metadata)

    def __enter__(self):
        assert self.fp, "Disk not open"
        return self

    def __exit__(self, *exc_details):
        if self.fp:
            self.fp.flush()
            self.fp.close()
            self.fp = None


def main():
    """Dump the LVM headers and metadata of the image given as argument."""

    if len(sys.argv) != 2:
        print(f"usage: {sys.argv[0]} DISK")
        sys.exit(1)

    # Dumping never modifies the image, so do not require write access
    with Disk.open(sys.argv[1], read_only=True) as disk:
        disk.dump()


if __name__ == "__main__":
    main()