#!/usr/bin/python3
"""Inspect a disk image and print a JSON report describing it.

The image given as the first command-line argument is attached read-only to a
network block device with `qemu-nbd`.  The report contains the image format,
the bootloader type, the partition table and, for every partition that can be
mounted, information read from the file system (installed packages, rpm
verification results, os-release, fstab, passwd, group and boot menu entries).
The report is written to stdout as JSON.
"""

import contextlib
import glob
import json
import os
import subprocess
import sys
import tempfile


@contextlib.contextmanager
def nbd_connect(image):
    """Attach `image` read-only to an nbd device and yield the device path.

    Every path matching /dev/nbd* is tried in turn until `qemu-nbd --connect`
    succeeds.  The device is disconnected again when the context exits.

    Raises:
        RuntimeError: if the image could not be connected to any device.
    """
    for device in glob.glob("/dev/nbd*"):
        # A non-zero exit status is taken to mean "try the next device".
        connected = subprocess.run(
            ["qemu-nbd", "--connect", device, "--read-only", image],
            check=False,
        ).returncode == 0
        if not connected:
            continue
        try:
            yield device
        finally:
            subprocess.run(
                ["qemu-nbd", "--disconnect", device],
                check=True,
                stdout=subprocess.DEVNULL,
            )
        break
    else:
        raise RuntimeError("no free network block device")


@contextlib.contextmanager
def mount(device):
    """Mount `device` read-only on a temporary directory and yield its path.

    The file system is lazily unmounted and the directory removed on exit.
    """
    with tempfile.TemporaryDirectory() as mountpoint:
        subprocess.run(["mount", "-o", "ro", device, mountpoint], check=True)
        try:
            yield mountpoint
        finally:
            subprocess.run(["umount", "--lazy", mountpoint], check=True)


def parse_environment_vars(s):
    """Parse `KEY=value` lines into a dict.

    Blank lines and lines starting with `#` are skipped.  Only the first `=`
    separates key from value, so values may themselves contain `=`.
    Surrounding double quotes are stripped from values.

    Raises:
        ValueError: if a non-blank, non-comment line contains no `=`.
    """
    r = {}
    for line in s.split("\n"):
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, value = line.split("=", 1)
        r[key] = value.strip('"')
    return r


def subprocess_check_output(argv, parse_fn=None):
    """Run `argv` and return its stdout, passed through `parse_fn` if given.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    output = subprocess.check_output(argv, encoding="utf-8")
    return parse_fn(output) if parse_fn else output


def read_image_format(device):
    """Return the "format" field reported by `qemu-img info` for `device`."""
    qemu = subprocess_check_output(
        ["qemu-img", "info", "--output=json", device], json.loads
    )
    return qemu["format"]


def read_partition_table(device):
    """Return `(label, id, partitions)` for the partition table on `device`.

    `partitions` is a list of dicts; "start" and "size" are in bytes.
    """
    sfdisk = subprocess_check_output(["sfdisk", "--json", device], json.loads)
    ptable = sfdisk["partitiontable"]
    assert ptable["unit"] == "sectors"

    # Use the sector size from the table when sfdisk reports one; otherwise
    # fall back to 512 bytes.
    sector_size = ptable.get("sectorsize", 512)

    partitions = []
    for p in ptable["partitions"]:
        blkid = subprocess_check_output(
            ["blkid", "--output", "export", p["node"]], parse_environment_vars
        )
        partitions.append({
            "label": blkid.get("LABEL"),  # doesn't exist for mbr
            "type": p["type"],
            "uuid": blkid["UUID"],
            "fstype": blkid["TYPE"],
            "bootable": p.get("bootable", False),
            "start": p["start"] * sector_size,
            "size": p["size"] * sector_size,
        })

    return ptable["label"], ptable["id"], partitions


def read_bootloader_type(device):
    """Return "grub" if the first 512 bytes of `device` contain b"GRUB"."""
    with open(device, "rb") as f:
        if b"GRUB" in f.read(512):
            return "grub"
        return "unknown"


def read_bls_conf(filename):
    """Read a boot loader entry file into a dict of key -> value.

    Each line is split at its first space.  Blank lines and lines starting
    with `#` are skipped; a key without a value maps to the empty string.
    """
    entry = {}
    with open(filename, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            key, _, value = line.partition(" ")
            entry[key] = value
    return entry


def _parse_rpm_verify(lines):
    """Split `rpm --verify` output lines into missing and changed files."""
    changed = {}
    missing = []
    for line in lines:
        if not line.strip():
            continue
        # format description in rpm(8), under `--verify`
        attrs = line[:9]
        # Compare the stripped field so the test does not depend on the exact
        # amount of padding after the word "missing".
        if attrs.strip() == "missing":
            missing.append(line[12:].rstrip())
        else:
            changed[line[13:].rstrip()] = attrs
    return {
        "missing": sorted(missing),
        "changed": changed,
    }


def rpm_verify(tree):
    """Run `rpm --verify --all` against `tree` and summarize the result.

    Returns a dict with "missing" (sorted list of paths) and "changed"
    (path -> attribute string as printed by rpm).
    """
    argv = ["rpm", "--root", tree, "--verify", "--all"]
    # Leaving the `with` block closes the pipe and waits for rpm.  The return
    # value is ignored, because rpm returns non-zero when it found changes.
    with subprocess.Popen(argv, stdout=subprocess.PIPE, encoding="utf-8") as rpm:
        return _parse_rpm_verify(rpm.stdout)


def main():
    """Build the report for the image named on the command line and print it."""
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} IMAGE")
    image = sys.argv[1]

    subprocess.run(["modprobe", "nbd"], check=True)

    report = {}
    with nbd_connect(image) as device:
        report["image-format"] = read_image_format(image)
        report["bootloader"] = read_bootloader_type(device)
        (
            report["partition-table"],
            report["partition-table-id"],
            report["partitions"],
        ) = read_partition_table(device)

        n_partitions = len(report["partitions"])
        for n in range(1, n_partitions + 1):
            with mount(device + f"p{n}") as tree:
                # A partition holding /etc/os-release is treated as the root
                # file system.
                if os.path.exists(f"{tree}/etc/os-release"):
                    report["packages"] = sorted(
                        subprocess_check_output(
                            ["rpm", "--root", tree, "-qa"], str.split
                        )
                    )
                    report["rpm-verify"] = rpm_verify(tree)

                    with open(f"{tree}/etc/os-release", encoding="utf-8") as f:
                        report["os-release"] = parse_environment_vars(f.read())

                    with open(f"{tree}/etc/fstab", encoding="utf-8") as f:
                        report["fstab"] = sorted(
                            line.split()
                            for line in f.read().split("\n")
                            if line and not line.startswith("#")
                        )

                    with open(f"{tree}/etc/passwd", encoding="utf-8") as f:
                        report["passwd"] = sorted(f.read().strip().split("\n"))

                    with open(f"{tree}/etc/group", encoding="utf-8") as f:
                        report["groups"] = sorted(f.read().strip().split("\n"))

                # Boot entries live under /boot on the root file system, or at
                # the top level of a partition that holds vmlinuz-* directly.
                if os.path.exists(f"{tree}/boot") and os.listdir(f"{tree}/boot"):
                    assert "bootmenu" not in report
                    report["bootmenu"] = [
                        read_bls_conf(f)
                        for f in glob.glob(f"{tree}/boot/loader/entries/*.conf")
                    ]
                elif glob.glob(f"{tree}/vmlinuz-*"):
                    assert "bootmenu" not in report
                    report["bootmenu"] = [
                        read_bls_conf(f)
                        for f in glob.glob(f"{tree}/loader/entries/*.conf")
                    ]

    json.dump(report, sys.stdout, sort_keys=True, indent=2)


if __name__ == "__main__":
    main()