debian-forge-composer/tools/image-info
Tom Gundersen 18934d4249 tools/image-info: add support for naked partitions
In case there is no partition table, we assume the whole image is
one big partition and treat it as such.

Signed-off-by: Tom Gundersen <teg@jklm.no>
2019-10-25 11:46:25 +02:00

178 lines
5.5 KiB
Python
Executable file

#!/usr/bin/python3
import contextlib
import glob
import json
import os
import subprocess
import sys
import tempfile
# Path of the image to inspect, taken from the first command-line argument.
if len(sys.argv) < 2:
    # Exit with a usage hint (written to stderr, exit status 1) instead of
    # an IndexError traceback when the argument is missing.
    sys.exit(f"usage: {sys.argv[0]} IMAGE")
image = sys.argv[1]

# Load the nbd kernel module before anything tries to use the /dev/nbd*
# device nodes; a failing modprobe aborts the script (check=True).
subprocess.run(["modprobe", "nbd"], check=True)
@contextlib.contextmanager
def nbd_connect(image):
    """Attach *image* read-only to a network block device and yield its path.

    Every path matching ``/dev/nbd*`` is tried in the order ``glob`` returns
    it; the first one for which ``qemu-nbd --connect`` exits with status 0
    is yielded.  When the ``with`` body finishes (normally or through an
    exception) the device is disconnected again.

    Raises:
        RuntimeError: if ``qemu-nbd`` could not connect on any device.
        subprocess.CalledProcessError: if disconnecting the device fails.
    """
    for candidate in glob.glob("/dev/nbd*"):
        connect_cmd = ["qemu-nbd", "--connect", candidate, "--read-only", image]
        status = subprocess.run(connect_cmd, check=False).returncode
        if status != 0:
            # this node could not be used, move on to the next one
            continue

        try:
            yield candidate
        finally:
            subprocess.run(
                ["qemu-nbd", "--disconnect", candidate],
                check=True,
                stdout=subprocess.DEVNULL,
            )
        # only ever hand out a single device
        return

    raise RuntimeError("no free network block device")
@contextlib.contextmanager
def mount(device):
    """Mount *device* read-only on a temporary directory and yield that path.

    The filesystem is unmounted with ``umount --lazy`` when the ``with``
    body ends, and the temporary directory is removed afterwards.

    Raises:
        subprocess.CalledProcessError: if ``mount`` or ``umount`` fails.
    """
    with tempfile.TemporaryDirectory() as target:
        mount_cmd = ["mount", "-o", "ro", device, target]
        umount_cmd = ["umount", "--lazy", target]

        subprocess.run(mount_cmd, check=True)
        try:
            yield target
        finally:
            subprocess.run(umount_cmd, check=True)
def parse_environment_vars(s):
    """Parse ``KEY=value`` lines into a dict.

    In this file the input is either the contents of ``/etc/os-release`` or
    the output of ``blkid --output export``.

    Each line is stripped of surrounding whitespace.  Blank lines and lines
    starting with ``#`` are skipped.  The line is split at the *first* ``=``
    only, so values may themselves contain ``=`` (e.g. URLs with a query
    string).  Double quotes surrounding the value are removed.

    Args:
        s: the text to parse.

    Returns:
        A dict mapping each key to its (unquoted) string value.  If a key
        occurs more than once the last occurrence wins.

    Raises:
        ValueError: if a non-blank, non-comment line contains no ``=``.
    """
    r = {}
    for line in s.split("\n"):
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # maxsplit=1: everything after the first "=" belongs to the value
        key, value = line.split("=", 1)
        r[key] = value.strip('"')
    return r
def subprocess_check_output(argv, parse_fn=None):
    """Run *argv* and return its standard output, optionally parsed.

    Args:
        argv: the command and its arguments, as a list.
        parse_fn: optional callable applied to the UTF-8 decoded output;
            when omitted (or falsy) the raw text is returned.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    text = subprocess.check_output(argv, encoding="utf-8")
    if parse_fn:
        return parse_fn(text)
    return text
def read_image_format(device):
    """Return the ``format`` field of ``qemu-img info --output=json``.

    Args:
        device: path handed to ``qemu-img info``.

    Raises:
        subprocess.CalledProcessError: if ``qemu-img`` exits non-zero.
    """
    command = ["qemu-img", "info", "--output=json", device]
    return subprocess_check_output(command, json.loads)["format"]
def read_partition(device, bootable, typ=None, start=0, size=0, type=None):
    """Build the report entry for the partition at *device*.

    Label, UUID and filesystem type are taken from
    ``blkid --output export``; the remaining fields are passed through
    from the arguments unchanged.

    Args:
        device: block device node of the partition.
        bootable: value stored under ``"bootable"``.
        typ: value stored under ``"type"``.
        start: value stored under ``"start"``.
        size: value stored under ``"size"``.
        type: accepted but not used by this function.

    Raises:
        subprocess.CalledProcessError: if ``blkid`` exits non-zero.
        KeyError: if ``blkid`` reports no ``UUID`` or ``TYPE``.
    """
    probe = subprocess_check_output(
        ["blkid", "--output", "export", device],
        parse_environment_vars,
    )

    entry = {}
    entry["label"] = probe.get("LABEL")  # doesn't exist for mbr
    entry["type"] = typ
    entry["uuid"] = probe["UUID"]
    entry["fstype"] = probe["TYPE"]
    entry["bootable"] = bootable
    entry["start"] = start
    entry["size"] = size
    return entry
def read_partition_table(device):
    """Read the partition table of *device* with ``sfdisk --json``.

    If ``sfdisk`` fails, the device is assumed to carry no partition table
    and is described as one single, non-bootable partition.

    Partition offsets and sizes are converted from sectors to bytes using
    the ``sectorsize`` reported by ``sfdisk``; 512 is assumed when that
    field is absent.

    Returns:
        A tuple ``(label, id, partitions)``.  ``label`` and ``id`` are
        ``None`` when there is no partition table; ``partitions`` is a list
        of dicts as returned by ``read_partition``.

    Raises:
        AssertionError: if ``sfdisk`` reports a unit other than sectors.
    """
    try:
        sfdisk = subprocess_check_output(["sfdisk", "--json", device], json.loads)
    except subprocess.CalledProcessError:
        # naked partition: treat the whole device as one big partition
        return None, None, [read_partition(device, False)]

    ptable = sfdisk["partitiontable"]
    assert ptable["unit"] == "sectors"

    # not every sfdisk output carries "sectorsize"; fall back to 512
    sector_size = ptable.get("sectorsize", 512)

    partitions = [
        read_partition(
            p["node"],
            p.get("bootable", False),
            p["type"],
            p["start"] * sector_size,
            p["size"] * sector_size,
        )
        # a table without any partition may have no "partitions" key
        for p in ptable.get("partitions", [])
    ]
    return ptable["label"], ptable["id"], partitions
def read_bootloader_type(device):
    """Guess the bootloader from the first 512 bytes of *device*.

    Returns:
        ``"grub"`` if the byte string ``GRUB`` occurs in the first 512
        bytes, ``"unknown"`` otherwise.
    """
    with open(device, "rb") as f:
        first_sector = f.read(512)
    return "grub" if b"GRUB" in first_sector else "unknown"
def read_boot_entries(boot_dir):
    """Read the boot loader entries below ``{boot_dir}/loader/entries``.

    Every ``*.conf`` file is parsed into a dict: each line is split at the
    first run of whitespace into a key and a value.  Blank lines and lines
    starting with ``#`` are ignored.

    Args:
        boot_dir: directory containing the ``loader/entries`` subdirectory.

    Returns:
        A list with one dict per entry file, sorted by the entry's
        ``title`` (entries without a title sort first).  The list is empty
        when no entry file exists.

    Raises:
        ValueError: if a line consists of a key without a value.
    """
    entries = []
    for conf in glob.glob(f"{boot_dir}/loader/entries/*.conf"):
        entry = {}
        with open(conf) as f:
            for line in f:
                line = line.strip()
                # blank lines and comments carry no key/value pair
                if not line or line.startswith("#"):
                    continue
                key, value = line.split(maxsplit=1)
                entry[key] = value
        entries.append(entry)

    return sorted(entries, key=lambda e: e.get("title", ""))
def rpm_verify(tree):
    """Run ``rpm --verify --all`` inside *tree* and summarise the result.

    Args:
        tree: root directory of the installed system; rpm is executed in a
            chroot of this directory.

    Returns:
        A dict with two keys: ``"missing"``, a sorted list of paths rpm
        reported as missing, and ``"changed"``, a dict mapping each other
        reported path to the 9-character attribute field at the start of
        its output line.
    """
    # cannot use `rpm --root` here, because rpm uses passwd from the host to
    # verify user and group ownership:
    #   https://github.com/rpm-software-management/rpm/issues/882
    changed = {}
    missing = []

    # the context manager closes the pipe and waits for rpm to exit
    with subprocess.Popen(["chroot", tree, "rpm", "--verify", "--all"],
                          stdout=subprocess.PIPE, encoding="utf-8") as rpm:
        for line in rpm.stdout:
            # format description in rpm(8), under `--verify`
            if line.startswith("missing"):
                # for missing files the path starts at column 12
                missing.append(line[12:].rstrip())
            else:
                # 9 attribute characters first, path from column 13 on
                changed[line[13:].rstrip()] = line[:9]

    # ignore return value, because it returns non-zero when it found changes
    return {
        "missing": sorted(missing),
        "changed": changed
    }
def append_filesystem(report, tree):
    """Add what can be learned from the filesystem mounted at *tree*.

    If the tree contains ``/etc/os-release`` it is treated as an installed
    system: the package list, the rpm verification result, os-release,
    fstab (when present), passwd, groups and (when ``/boot`` is non-empty)
    the boot menu are stored in *report*.

    Otherwise, if files matching ``vmlinuz-*`` sit directly in the tree,
    the boot menu is read from the tree itself.

    Args:
        report: dict that is updated in place.
        tree: path of the mounted filesystem.

    Raises:
        AssertionError: if a boot menu would be recorded a second time.
    """
    if not os.path.exists(f"{tree}/etc/os-release"):
        # no operating system here; kernels at the top level mean the boot
        # entries live directly in this tree
        if len(glob.glob(f"{tree}/vmlinuz-*")) > 0:
            assert "bootmenu" not in report
            report["bootmenu"] = read_boot_entries(tree)
        return

    installed = subprocess_check_output(["rpm", "--root", tree, "-qa"], str.split)
    report["packages"] = sorted(installed)
    report["rpm-verify"] = rpm_verify(tree)

    with open(f"{tree}/etc/os-release") as f:
        report["os-release"] = parse_environment_vars(f.read())

    # fstab is optional
    try:
        with open(f"{tree}/etc/fstab") as f:
            fstab_lines = f.read().split("\n")
    except FileNotFoundError:
        pass
    else:
        report["fstab"] = sorted(
            entry.split()
            for entry in fstab_lines
            if entry and not entry.startswith("#")
        )

    for key, name in (("passwd", "passwd"), ("groups", "group")):
        with open(f"{tree}/etc/{name}") as f:
            report[key] = sorted(f.read().strip().split("\n"))

    boot_dir = f"{tree}/boot"
    if os.path.exists(boot_dir) and os.listdir(boot_dir):
        assert "bootmenu" not in report
        report["bootmenu"] = read_boot_entries(boot_dir)
# Collect everything into one dict and print it as JSON on stdout.
report = {}

with nbd_connect(image) as device:
    report["image-format"] = read_image_format(image)
    report["bootloader"] = read_bootloader_type(device)

    label, table_id, partitions = read_partition_table(device)
    report["partition-table"] = label
    report["partition-table-id"] = table_id
    report["partitions"] = partitions

    if not report["partition-table"]:
        # no partition table: the device itself holds the filesystem
        targets = [device]
    else:
        # partition nodes are named <device>p1 ... <device>pN
        targets = [f"{device}p{n}" for n in range(1, len(partitions) + 1)]

    for target in targets:
        with mount(target) as tree:
            append_filesystem(report, tree)

json.dump(report, sys.stdout, sort_keys=True, indent=2)