The org.osbuild.sysconfig stage is now supported. Config updates can be made to the kernel and network files. Currently, the same values are used for all image types in rhel84. The image-info script is updated to allow testing the sysconfig info.
628 lines
20 KiB
Python
Executable file
628 lines
20 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
import argparse
|
|
import contextlib
|
|
import errno
|
|
import functools
|
|
import glob
|
|
import mimetypes
|
|
import json
|
|
import os
|
|
import platform
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import xml.etree.ElementTree
|
|
|
|
from osbuild import loop
|
|
|
|
|
|
def run_ostree(*args, _input=None, _check=True, **kwargs):
    """Run the ``ostree`` command and return the completed process.

    Positional arguments are passed to ``ostree`` unchanged. Every keyword
    argument ``key=value`` is appended as a ``--key=value`` option. The
    resulting command line is echoed to stderr before it is executed.

    ``_input`` is fed to the process on stdin and ``_check`` is forwarded to
    ``subprocess.run`` as ``check``. stdout is captured as UTF-8 text and is
    available on the returned object.
    """
    options = [f'--{key}={value}' for key, value in kwargs.items()]
    argv = [*args, *options]
    print("ostree " + " ".join(argv), file=sys.stderr)
    return subprocess.run(["ostree", *argv],
                          encoding="utf-8",
                          stdout=subprocess.PIPE,
                          input=_input,
                          check=_check)
|
|
|
|
|
|
@contextlib.contextmanager
def loop_create_device(ctl, fd, offset=None, sizelimit=None):
    """Context manager yielding a loop device bound to the open file ``fd``.

    An unbound device is requested from ``ctl`` and bound to ``fd``. If
    binding fails with ``EBUSY``, or configuring the device raises
    ``BlockingIOError``, the attempt is abandoned and a new device is
    requested. Any other ``OSError`` from binding is re-raised. The device
    is closed when the context exits.
    """
    while True:
        device = loop.Loop(ctl.get_unbound())

        try:
            device.set_fd(fd)
        except OSError as err:
            device.close()
            if err.errno == errno.EBUSY:
                # presumably another process grabbed the device first; retry
                continue
            raise err

        try:
            device.set_status(offset=offset, sizelimit=sizelimit, autoclear=True)
        except BlockingIOError:
            # undo the binding before trying again with a fresh device
            device.clear_fd()
            device.close()
            continue

        break

    try:
        yield device
    finally:
        device.close()
|
|
|
|
|
|
@contextlib.contextmanager
def loop_open(ctl, image, *, offset=None, size=None):
    """Context manager yielding the ``/dev`` path of a loop device for ``image``.

    ``image`` is opened read-only in binary mode and handed to
    ``loop_create_device``; ``offset`` and ``size`` are forwarded as the
    device's offset and size limit. The file stays open for as long as the
    context is active.
    """
    with open(image, "rb") as backing_file:
        with loop_create_device(ctl, backing_file.fileno(),
                                offset=offset, sizelimit=size) as lo:
            yield os.path.join("/dev", lo.devname)
|
|
|
|
|
|
@contextlib.contextmanager
def open_image(ctl, image, fmt):
    """Context manager yielding ``(raw_image_path, loop_device_path)``.

    Images whose format is not ``"raw"`` are first converted with
    ``qemu-img convert`` into a temporary directory under ``/var/tmp``; raw
    images are used in place. The whole raw file is then exposed through a
    loop device. The temporary directory is removed when the context exits.
    """
    with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:
        if fmt == "raw":
            target = image
        else:
            target = os.path.join(tmp, "image.raw")
            convert = ["qemu-img", "convert"]
            # A bug exists in qemu that causes the conversion to raw to fail
            # on aarch64 systems with a LOT of CPUs. A workaround is to use
            # a single coroutine to do the conversion. It doesn't slow down
            # the conversion by much, but it hangs about half the time without
            # the limit set.
            # Bug: https://bugs.launchpad.net/qemu/+bug/1805256
            if platform.machine() == 'aarch64':
                convert += ["-m", "1"]
            convert += ["-O", "raw", image, target]
            subprocess.run(convert, check=True)

        size = os.stat(target).st_size

        with loop_open(ctl, target, offset=0, size=size) as dev:
            yield target, dev
|
|
|
|
|
|
@contextlib.contextmanager
def mount_at(device, mountpoint, options=None, extra=None):
    """Context manager that mounts ``device`` read-only at ``mountpoint``.

    ``options`` is an optional list of additional mount options; they are
    joined after ``ro`` and passed via ``-o``. ``extra`` is an optional list
    of additional ``mount`` command-line arguments (for example
    ``["--bind"]``). The mountpoint is lazily unmounted when the context
    exits.

    Raises ``subprocess.CalledProcessError`` if ``mount`` or ``umount`` fails.
    """
    # None defaults instead of shared mutable list defaults
    opts = ",".join(["ro", *(options or [])])
    subprocess.run(["mount", "-o", opts, *(extra or []), device, mountpoint], check=True)
    try:
        yield mountpoint
    finally:
        subprocess.run(["umount", "--lazy", mountpoint], check=True)
|
|
|
|
|
|
@contextlib.contextmanager
def mount(device):
    """Context manager that mounts ``device`` read-only on a temporary directory.

    Yields the path of the mountpoint. On exit the filesystem is lazily
    unmounted and the temporary directory is removed.
    """
    with tempfile.TemporaryDirectory() as mountpoint:
        mount_cmd = ["mount", "-o", "ro", device, mountpoint]
        umount_cmd = ["umount", "--lazy", mountpoint]

        subprocess.run(mount_cmd, check=True)
        try:
            yield mountpoint
        finally:
            subprocess.run(umount_cmd, check=True)
|
|
|
|
|
|
def parse_environment_vars(s):
    """Parse ``KEY=value`` lines from the string ``s`` into a dict.

    Blank lines and lines starting with ``#`` are ignored. Each remaining
    line is split at the first ``=``; double quotes surrounding the value are
    stripped. Lines that contain no ``=`` at all are skipped instead of
    aborting the whole parse.
    """
    r = {}
    for line in s.split("\n"):
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            # not an assignment; nothing to record
            continue
        r[key] = value.strip('"')
    return r
|
|
|
|
|
|
def parse_unit_files(s, expected_state):
    """Return the units whose state column equals ``expected_state``.

    ``s`` is tabular text with one header line (which is skipped) followed by
    lines whose first two whitespace-separated columns are the unit name and
    its state. Lines with fewer than two columns (for example blank lines)
    are ignored.
    """
    units = []
    for line in s.split("\n")[1:]:
        try:
            unit, state, *_ = line.split()
        except ValueError:
            # Fewer than two columns: skip the line. Falling through here
            # would reuse `unit`/`state` from the previous line (or fail if
            # there was none).
            continue
        if state == expected_state:
            units.append(unit)

    return units
|
|
|
|
|
|
def subprocess_check_output(argv, parse_fn=None):
    """Run ``argv`` and return its stdout, optionally passed through ``parse_fn``.

    The output is decoded as UTF-8. If the command exits with a non-zero
    status, its captured output is written to stderr between marker lines
    and the ``subprocess.CalledProcessError`` is re-raised.
    """
    try:
        raw = subprocess.check_output(argv, encoding="utf-8")
    except subprocess.CalledProcessError as err:
        sys.stderr.write(f"--- Output from {argv}:\n")
        sys.stderr.write(err.stdout)
        sys.stderr.write("\n--- End of the output\n")
        raise

    if parse_fn:
        return parse_fn(raw)
    return raw
|
|
|
|
|
|
def read_image_format(device):
    """Return the ``format`` field reported by ``qemu-img info`` for ``device``."""
    info = subprocess_check_output(["qemu-img", "info", "--output=json", device], json.loads)
    return info["format"]
|
|
|
|
|
|
def read_partition(device, partition):
    """Fill ``partition`` with the label, uuid and fstype blkid reports for ``device``.

    ``partition`` is a dict that is updated in place and also returned. If
    ``blkid`` exits with a non-zero status, all three entries are set to
    ``None``.
    """
    res = subprocess.run(["blkid", "--output", "export", device],
                         check=False, encoding="utf-8",
                         stdout=subprocess.PIPE)
    blkid = parse_environment_vars(res.stdout) if res.returncode == 0 else {}

    # LABEL doesn't exist for mbr
    for key, tag in (("label", "LABEL"), ("uuid", "UUID"), ("fstype", "TYPE")):
        partition[key] = blkid.get(tag)
    return partition
|
|
|
|
|
|
def read_partition_table(device):
    """Describe the partition table of ``device`` using ``sfdisk --json``.

    Returns a dict with the keys ``partition-table`` (the table label),
    ``partition-table-id`` and ``partitions``. Each partition is a dict with
    ``bootable``, ``type``, ``start`` and ``size`` (both in bytes) and
    ``partuuid``.

    If ``sfdisk`` fails, the device is probed as a single filesystem: the
    table fields stay ``None`` and ``partitions`` holds one entry with the
    values found by ``read_partition``.
    """
    partitions = []
    info = {"partition-table": None,
            "partition-table-id": None,
            "partitions": partitions}
    try:
        sfdisk = subprocess_check_output(["sfdisk", "--json", device], json.loads)
    except subprocess.CalledProcessError:
        # read_partition() stores its findings in the mapping it is given,
        # so it needs a dict here (a bool cannot take item assignment).
        partitions.append(read_partition(device, {}))
        return info

    ptable = sfdisk["partitiontable"]
    assert ptable["unit"] == "sectors"
    is_dos = ptable["label"] == "dos"
    ssize = ptable.get("sectorsize", 512)

    for i, p in enumerate(ptable["partitions"]):

        partuuid = p.get("uuid")
        if not partuuid and is_dos:
            # For dos/mbr partition layouts the partition uuid
            # is generated. Normally this would be done by
            # udev+blkid, when the partition table is scanned.
            # 'sfdisk' prefixes the partition id with '0x' but
            # 'blkid' does not; remove it to mimic 'blkid'
            table_id = ptable['id'][2:]
            partuuid = "%.33s-%02x" % (table_id, i+1)

        partitions.append({
            "bootable": p.get("bootable", False),
            "type": p["type"],
            "start": p["start"] * ssize,
            "size": p["size"] * ssize,
            "partuuid": partuuid
        })

    info["partition-table"] = ptable["label"]
    info["partition-table-id"] = ptable["id"]

    return info
|
|
|
|
|
|
def read_bootloader_type(device):
    """Return ``"grub"`` if the first 512 bytes of ``device`` contain ``GRUB``.

    Any other content yields ``"unknown"``.
    """
    with open(device, "rb") as f:
        first_sector = f.read(512)
    return "grub" if b"GRUB" in first_sector else "unknown"
|
|
|
|
|
|
def read_boot_entries(boot_dir):
    """Read the boot loader entries below ``boot_dir``.

    Every ``loader/entries/*.conf`` file becomes one dict that maps the first
    word of each line to the rest of that line. Blank lines and ``#``
    comments are ignored; a line consisting of a key only maps to an empty
    string. The entries are returned sorted by their ``title``.
    """
    entries = []
    for conf in glob.glob(f"{boot_dir}/loader/entries/*.conf"):
        entry = {}
        with open(conf) as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                key, _, value = line.partition(" ")
                entry[key] = value
        entries.append(entry)

    return sorted(entries, key=lambda e: e["title"])
|
|
|
|
|
|
def rpm_verify(tree):
    """Run ``rpm --verify --all`` inside ``tree`` and summarise the result.

    Returns a dict with ``missing`` (sorted list of paths reported as
    missing) and ``changed`` (mapping of path to the 9-character attribute
    string at the start of its output line).
    """
    # cannot use `rpm --root` here, because rpm uses passwd from the host to
    # verify user and group ownership:
    #   https://github.com/rpm-software-management/rpm/issues/882
    changed = {}
    missing = []
    with subprocess.Popen(["chroot", tree, "rpm", "--verify", "--all"],
                          stdout=subprocess.PIPE, encoding="utf-8") as rpm:
        for line in rpm.stdout:
            # format description in rpm(8), under `--verify`
            attrs = line[:9]
            # Compare by prefix: an exact match of the 9-character slice
            # depends on the precise amount of padding after "missing".
            if attrs.startswith("missing"):
                missing.append(line[12:].rstrip())
            else:
                changed[line[13:].rstrip()] = attrs

        # Leaving the `with` block closes the pipe and waits for rpm. The
        # return value is ignored, because rpm returns non-zero when it
        # found changes.

    return {
        "missing": sorted(missing),
        "changed": changed
    }
|
|
|
|
|
|
def rpm_packages(tree, is_ostree):
    """Return the sorted output of ``rpm -qa`` for the tree rooted at ``tree``.

    For ostree trees the rpm database is looked up in ``/usr/share/rpm``.
    """
    cmd = ["rpm", "--root", tree, "-qa"]
    if is_ostree:
        cmd.extend(["--dbpath", "/usr/share/rpm"])
    return sorted(subprocess_check_output(cmd, str.split))
|
|
|
|
|
|
@contextlib.contextmanager
def change_root(root):
    """Context manager that chroots into ``root`` for the duration of the block.

    A file descriptor for the original ``/`` is kept open so that the
    process can return to it on exit: it changes directory to that
    descriptor and chroots to ``.`` before closing it.
    """
    host_root_fd = os.open("/", os.O_RDONLY)
    try:
        os.chroot(root)
        yield
    finally:
        # the order matters: step back onto the saved root, then chroot to it
        os.fchdir(host_root_fd)
        os.chroot(".")
        os.close(host_root_fd)
|
|
|
|
|
|
def read_services(tree, state):
    """Return the sorted, de-duplicated unit files of ``tree`` in ``state``.

    The list is taken from ``systemctl --root=<tree> list-unit-files``. For
    any state other than ``"alias"``, alias units whose link target is
    already in the list are added as well (see the comment below).
    """
    list_unit_files = ["systemctl", f"--root={tree}", "list-unit-files"]
    services_state = subprocess_check_output(list_unit_files,
                                             lambda out: parse_unit_files(out, state))

    # Since systemd v246, some services previously reported as "enabled" /
    # "disabled" are now reported as "alias". There is no systemd command, that
    # would take an "alias" unit and report its state as enabled/disabled
    # and could run on a different tree (with "--root" option).
    # To make the produced list of services in the given state consistent on
    # pre/post v246 systemd versions, check all "alias" units and append them
    # to the list, if their target is also listed in 'services_state'.
    if state != "alias":
        services_alias = subprocess_check_output(list_unit_files,
                                                 lambda out: parse_unit_files(out, "alias"))

        # The service may be in one of the following places (output of
        # "systemd-analyze unit-paths", it should not change too often).
        unit_paths = (
            "/etc/systemd/system.control",
            "/run/systemd/system.control",
            "/run/systemd/transient",
            "/run/systemd/generator.early",
            "/etc/systemd/system",
            "/run/systemd/system",
            "/run/systemd/generator",
            "/usr/local/lib/systemd/system",
            "/usr/lib/systemd/system",
            "/run/systemd/generator.late",
        )

        for alias in services_alias:
            with change_root(tree):
                for path in unit_paths:
                    unit_path = os.path.join(path, alias)
                    if not os.path.exists(unit_path):
                        continue

                    real_unit_path = os.path.realpath(unit_path)
                    # Skip the alias, if there was a symlink cycle.
                    # When symbolic link cycles occur, the returned path will
                    # be one member of the cycle, but no guarantee is made about
                    # which member that will be.
                    if os.path.islink(real_unit_path):
                        continue

                    # Append the alias unit to the list, if its target is
                    # already there.
                    if os.path.basename(real_unit_path) in services_state:
                        services_state.append(alias)

    # deduplicate and sort
    return sorted(set(services_state))
|
|
|
|
|
|
def read_default_target(tree):
    """Return the output of ``systemctl --root=<tree> get-default``, right-stripped."""
    output = subprocess_check_output(["systemctl", f"--root={tree}", "get-default"])
    return output.rstrip()
|
|
|
|
|
|
def read_firewall_zone(tree):
    """Return the service names listed in the default firewalld zone of ``tree``.

    The zone name is the ``DefaultZone`` value of
    ``etc/firewalld/firewalld.conf``, or ``public`` when that file does not
    exist. The zone definition is read from ``etc/firewalld/zones`` and,
    if not found there, from ``usr/lib/firewalld/zones``.

    Raises ``FileNotFoundError`` if neither zone file exists.
    """
    try:
        with open(f"{tree}/etc/firewalld/firewalld.conf") as f:
            default = parse_environment_vars(f.read())["DefaultZone"]
    except FileNotFoundError:
        default = "public"

    try:
        zone = xml.etree.ElementTree.parse(f"{tree}/etc/firewalld/zones/{default}.xml")
    except FileNotFoundError:
        zone = xml.etree.ElementTree.parse(f"{tree}/usr/lib/firewalld/zones/{default}.xml")

    return [service.get("name") for service in zone.getroot().findall("service")]
|
|
|
|
|
|
def read_fstab(tree):
    """Return the entries of ``<tree>/etc/fstab`` as a sorted list of field lists.

    Blank lines and comment lines (``#``, optionally preceded by whitespace)
    are skipped. A missing fstab yields an empty list.
    """
    result = []
    with contextlib.suppress(FileNotFoundError):
        with open(f"{tree}/etc/fstab") as f:
            # Strip first: lines read from a file keep their newline, so an
            # unstripped "blank" line is still truthy.
            stripped = (line.strip() for line in f)
            result = sorted(entry.split() for entry in stripped
                            if entry and not entry.startswith("#"))
    return result
|
|
|
|
# Create a nested dictionary for all supported sysconfigs
def read_sysconfig(tree):
    """Read the supported ``/etc/sysconfig`` files of ``tree``.

    Returns a dict keyed by config name (``kernel``, ``network``). Each value
    maps the keys found in that file to their values, with surrounding
    double quotes removed. Files that do not exist are left out. Blank
    lines, ``#`` comments and lines without ``=`` are ignored.
    """
    result = {}
    sysconfig_paths = {
        "kernel": f"{tree}/etc/sysconfig/kernel",
        "network": f"{tree}/etc/sysconfig/network"
    }
    # iterate through supported configs
    # based on https://github.com/osbuild/osbuild/blob/main/osbuild/util/osrelease.py#L17
    for name, path in sysconfig_paths.items():
        with contextlib.suppress(FileNotFoundError):
            with open(path) as f:
                # an existing file is always reported, even when it is empty
                values = {}
                for line in f:
                    line = line.strip()
                    if not line or line.startswith("#"):
                        continue
                    key, sep, value = line.partition("=")
                    if not sep:
                        # not an assignment; skip instead of failing
                        continue
                    values[key] = value.strip('"')
                result[name] = values
    return result
|
|
|
|
|
|
def append_filesystem(report, tree, *, is_ostree=False):
    """Add everything that can be learned from the filesystem at ``tree`` to ``report``.

    Three kinds of trees are recognised:

    * a root filesystem (has ``etc/os-release``): packages, rpm verification
      (skipped for ostree), os-release, services, default target, hostname,
      timezone, firewall services, fstab, sysconfig, passwd/group and, when
      ``boot`` is populated, the boot environment and boot menu;
    * a separate boot filesystem (has ``vmlinuz-*`` at its top level): boot
      environment and boot menu;
    * a filesystem with an ``EFI`` entry: only a note on stderr.
    """

    def sorted_lines(path):
        # content of `path` as a sorted list of lines
        with open(path) as f:
            return sorted(f.read().strip().split("\n"))

    if os.path.exists(f"{tree}/etc/os-release"):
        report["packages"] = rpm_packages(tree, is_ostree)
        if not is_ostree:
            report["rpm-verify"] = rpm_verify(tree)

        with open(f"{tree}/etc/os-release") as f:
            report["os-release"] = parse_environment_vars(f.read())

        report["services-enabled"] = read_services(tree, "enabled")
        report["services-disabled"] = read_services(tree, "disabled")

        default_target = read_default_target(tree)
        if default_target:
            report["default-target"] = default_target

        # the following three entries are optional
        with contextlib.suppress(FileNotFoundError):
            with open(f"{tree}/etc/hostname") as f:
                report["hostname"] = f.read().strip()

        with contextlib.suppress(FileNotFoundError):
            localtime = os.readlink(f"{tree}/etc/localtime")
            report["timezone"] = os.path.basename(localtime)

        with contextlib.suppress(FileNotFoundError):
            report["firewall-enabled"] = read_firewall_zone(tree)

        fstab = read_fstab(tree)
        if fstab:
            report["fstab"] = fstab

        sysconfig = read_sysconfig(tree)
        if sysconfig:
            report["sysconfig"] = sysconfig

        report["passwd"] = sorted_lines(f"{tree}/etc/passwd")
        report["groups"] = sorted_lines(f"{tree}/etc/group")

        if is_ostree:
            report["passwd-system"] = sorted_lines(f"{tree}/usr/lib/passwd")
            report["groups-system"] = sorted_lines(f"{tree}/usr/lib/group")

        if os.path.exists(f"{tree}/boot") and os.listdir(f"{tree}/boot"):
            assert "bootmenu" not in report
            with contextlib.suppress(FileNotFoundError):
                with open(f"{tree}/boot/grub2/grubenv") as f:
                    report["boot-environment"] = parse_environment_vars(f.read())
            report["bootmenu"] = read_boot_entries(f"{tree}/boot")

    elif glob.glob(f"{tree}/vmlinuz-*"):
        assert "bootmenu" not in report
        with open(f"{tree}/grub2/grubenv") as f:
            report["boot-environment"] = parse_environment_vars(f.read())
        report["bootmenu"] = read_boot_entries(tree)

    elif glob.glob(f"{tree}/EFI"):
        print("EFI partition", file=sys.stderr)
|
|
|
|
|
|
def partition_is_esp(partition):
    """Return True if the ``type`` of ``partition`` is the EFI System Partition GUID."""
    esp_type_guid = "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"
    return partition["type"] == esp_type_guid
|
|
|
|
|
|
def find_esp(partitions):
    """Return ``(partition, index)`` of the first ESP in ``partitions``.

    If there is none, ``(None, 0)`` is returned.
    """
    found = next(
        ((part, index) for index, part in enumerate(partitions) if partition_is_esp(part)),
        None,
    )
    if found is None:
        return None, 0
    return found
|
|
|
|
|
|
def append_partitions(report, device, loctl):
    """Inspect every partition listed in ``report["partitions"]``.

    Each partition is exposed through its own loop device (by offset and
    size within ``device``) and probed with ``read_partition``. Partitions
    with a recognised filesystem are then mounted and passed to
    ``append_filesystem``. If there is an ESP and the mounted tree has a
    ``boot/efi`` directory, the ESP is mounted there first.
    """
    partitions = report["partitions"]
    esp, esp_id = find_esp(partitions)

    with contextlib.ExitStack() as loop_devices:
        # one loop device per partition, kept open until all are inspected
        devices = []
        for part in partitions:
            dev = loop_devices.enter_context(
                loop_open(loctl, device, offset=part["start"], size=part["size"]))
            devices.append(dev)
            read_partition(dev, part)

        for dev, part in zip(devices, partitions):
            if not part["fstype"]:
                continue

            with mount(dev) as tree, contextlib.ExitStack() as esp_mount:
                if esp and os.path.exists(f"{tree}/boot/efi"):
                    esp_mount.enter_context(
                        mount_at(devices[esp_id], f"{tree}/boot/efi", options=['umask=077']))
                append_filesystem(report, tree)
|
|
|
|
|
|
def analyse_image(image):
    """Build and return the report for a disk image file.

    The report contains the image format, the bootloader type, the partition
    table and whatever ``append_filesystem`` finds on each filesystem. An
    image without a partition table is mounted as a single filesystem.
    """
    loctl = loop.LoopControl()

    image_format = read_image_format(image)
    report = {"image-format": image_format}

    with open_image(loctl, image, image_format) as (_, device):
        report["bootloader"] = read_bootloader_type(device)
        report.update(read_partition_table(device))

        if not report["partition-table"]:
            with mount(device) as tree:
                append_filesystem(report, tree)
        else:
            append_partitions(report, device, loctl)

    return report
|
|
|
|
|
|
def append_directory(report, tree):
    """Add the contents of the directory tree at ``tree`` to ``report``.

    If the tree has an ``ostree`` entry, ``usr/etc`` is bind-mounted over
    ``etc`` (created if necessary) while the tree is inspected as an ostree
    tree.
    """
    if not os.path.lexists(f"{tree}/ostree"):
        append_filesystem(report, tree)
        return

    os.makedirs(f"{tree}/etc", exist_ok=True)
    with mount_at(f"{tree}/usr/etc", f"{tree}/etc", extra=["--bind"]):
        append_filesystem(report, tree, is_ostree=True)
|
|
|
|
|
|
def append_ostree_repo(report, repo):
    """Add information about the ostree repository at ``repo`` to ``report``.

    Records the repository's ``core.mode`` and its refs under
    ``report["ostree"]``, then checks out the commit of the first ref into a
    temporary directory under ``/var/tmp`` and inspects it with
    ``append_directory``.
    """
    ostree = functools.partial(run_ostree, repo=repo)

    core_mode = ostree("config", "get", "core.mode").stdout.strip()
    report["ostree"] = {"repo": {"core.mode": core_mode}}

    refs = ostree("refs").stdout.strip().split("\n")
    report["ostree"]["refs"] = refs

    # every ref is resolved, but only the first one is checked out
    resolved = {ref: ostree("rev-parse", ref).stdout.strip() for ref in refs}
    commit = resolved[refs[0]]

    with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
        tree = os.path.join(tmpdir, "tree")
        ostree("checkout", "--force-copy", commit, tree)
        append_directory(report, tree)
|
|
|
|
|
|
def analyse_directory(path):
    """Build and return the report for the directory ``path``.

    * A directory containing ``compose.json`` is treated as an ostree commit
      whose repository lives in its ``repo`` subdirectory.
    * A directory containing a ``refs`` directory is treated as an ostree
      repository itself.
    * Anything else is inspected as a plain filesystem tree.
    """
    report = {}

    if os.path.exists(os.path.join(path, "compose.json")):
        report["type"] = "ostree/commit"
        repo = os.path.join(path, "repo")
        append_ostree_repo(report, repo)
    elif os.path.isdir(os.path.join(path, "refs")):
        report["type"] = "ostree/repo"
        # the directory itself is the repository; `repo` is not defined on
        # this branch
        append_ostree_repo(report, path)
    else:
        append_directory(report, path)

    return report
|
|
|
|
|
|
def is_tarball(path):
    """Return True if the MIME type guessed from the name of ``path`` is a tar archive."""
    guessed_type = mimetypes.guess_type(path)[0]
    return guessed_type == "application/x-tar"
|
|
|
|
|
|
def analyse_tarball(path):
    """Extract the tarball at ``path`` and return the report for its contents.

    The archive is unpacked with ``tar`` into a temporary directory under
    ``/var/tmp`` (tar's stdout is redirected to stderr) and the result is
    inspected with ``analyse_directory``.
    """
    with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
        tree = os.path.join(tmpdir, "root")
        os.makedirs(tree)

        extract = ["tar", "-x", "--auto-compress", "-f", path, "-C", tree]
        subprocess.run(extract, stdout=sys.stderr, check=True)

        return analyse_directory(tree)
|
|
|
|
|
|
def is_compressed(path):
    """Return True if the name of ``path`` indicates xz, gzip or bzip2 compression."""
    encoding = mimetypes.guess_type(path)[1]
    return encoding in ("xz", "gzip", "bzip2")
|
|
|
|
|
|
def analyse_compressed(path):
    """Decompress the image at ``path`` and return the report for it.

    The file is copied into a temporary directory under ``/var/tmp``,
    decompressed there with the tool matching its encoding (xz, gzip or
    bzip2) and the resulting single file is inspected with
    ``analyse_image``.

    Raises ``ValueError`` for any other encoding.
    """
    decompressors = {
        "xz": "unxz",
        "gzip": "gunzip",
        "bzip2": "bunzip2",
    }

    _, encoding = mimetypes.guess_type(path)
    if encoding not in decompressors:
        raise ValueError(f"Unsupported compression: {encoding}")
    command = [decompressors[encoding], "--force"]

    with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
        subprocess.run(["cp", "--reflink=auto", "-a", path, tmpdir],
                       check=True)

        # the copy is the only file in the fresh directory
        archive = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        subprocess.run(command + [archive], check=True)

        # the decompressed file has taken the place of the archive
        files = os.listdir(tmpdir)
        assert len(files) == 1
        image = os.path.join(tmpdir, files[0])
        return analyse_image(image)
|
|
|
|
|
|
def main():
    """Command-line entry point: inspect TARGET and print the report as JSON.

    TARGET may be a directory, a tarball, a compressed image or a disk
    image; the checks are made in that order. The report is written to
    stdout with sorted keys and an indent of 2.
    """
    parser = argparse.ArgumentParser(description="Inspect an image")
    parser.add_argument("target", metavar="TARGET",
                        help="The file or directory to analyse",
                        type=os.path.abspath)
    target = parser.parse_args().target

    if os.path.isdir(target):
        analyse = analyse_directory
    elif is_tarball(target):
        analyse = analyse_tarball
    elif is_compressed(target):
        analyse = analyse_compressed
    else:
        analyse = analyse_image

    json.dump(analyse(target), sys.stdout, sort_keys=True, indent=2)
|
|
|
|
|
|
# Run the inspection only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|
|