From 0f99200ba118f18c8bbc2386b49ea5c906614e0e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Hozza?=
Date: Tue, 17 Dec 2024 09:45:35 +0100
Subject: [PATCH] tools: add image-info tool as osbuild-image-info
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add the image-info tool as osbuild-image-info from the manifest-db
repository [0]. This is an exact copy without any changes.

[0] https://github.com/osbuild/manifest-db/blob/8e05a898d4a83f31ad24496c8738a3b1724cf937/tools/image-info

Signed-off-by: Tomáš Hozza
---
 tools/osbuild-image-info | 2883 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 2883 insertions(+)
 create mode 100755 tools/osbuild-image-info

diff --git a/tools/osbuild-image-info b/tools/osbuild-image-info
new file mode 100755
index 00000000..fa51d1b5
--- /dev/null
+++ b/tools/osbuild-image-info
@@ -0,0 +1,2883 @@
+#!/usr/bin/python3
+
+import argparse
+import configparser
+import contextlib
+import functools
+import glob
+import mimetypes
+import operator
+import json
+import os
+import platform
+import re
+import stat
+import subprocess
+import sys
+import time
+import tempfile
+import xml.etree.ElementTree
+import yaml
+import pathlib
+import jsonschema
+
+from collections import OrderedDict
+from typing import Dict, Any
+
+from osbuild import devices, host, mounts, meta, monitor
+
+index = meta.Index("/usr/lib/osbuild/")
+SECTOR_SIZE = 512
+
+
+def run_ostree(*args, _input=None, _check=True, **kwargs):
+    args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()]
+    print("ostree " + " ".join(args), file=sys.stderr)
+    res = subprocess.run(["ostree"] + args,
+                         encoding="utf-8",
+                         stdout=subprocess.PIPE,
+                         input=_input,
+                         check=_check)
+    return res
+
+
+def loop_open(devmgr: devices.DeviceManager, name: str, image, size, offset=0):
+    """
+    Uses a DeviceManager to open the `name` at `offset`
+    Returns a Device object and the path onto which the image was loopback mounted
+    """
+    info = index.get_module_info("Device", "org.osbuild.loopback")
+    fname = os.path.basename(image)
+    options = {
+        "filename": fname,
+        "start": offset // SECTOR_SIZE,
+        "size": size // SECTOR_SIZE
+    }
+    if not info:
+        raise RuntimeError("Can't load org.osbuild.loopback")
+
+    jsonschema.validate(options, info.get_schema())
+    dev = devices.Device(name, info, None, options)
+    reply = devmgr.open(dev)
+    return {
+        "Device": dev,
+        "path": os.path.join("/dev", reply["path"])
+    }
+
+
+@contextlib.contextmanager
+def convert_image(image, fmt):
+    with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:
+        if fmt["type"] != "raw":
+            target = os.path.join(tmp, "image.raw")
+            # A bug exists in qemu that causes the conversion to raw to fail
+            # on aarch64 systems with a LOT of CPUs. A workaround is to use
+            # a single coroutine to do the conversion. It doesn't slow down
+            # the conversion by much, but it hangs about half the time without
+            # the limit set.
😢 + # Bug: https://bugs.launchpad.net/qemu/+bug/1805256 + if platform.machine() == 'aarch64': + subprocess.run( + ["qemu-img", "convert", "-m", "1", "-O", "raw", image, target], + check=True + ) + else: + subprocess.run( + ["qemu-img", "convert", "-O", "raw", image, target], + check=True + ) + else: + target = image + + yield target + + +@contextlib.contextmanager +def mount_at(device, mountpoint, options=[], extra=[]): + opts = ",".join(["ro"] + options) + subprocess.run(["mount", "-o", opts] + extra + [device, mountpoint], check=True) + try: + yield mountpoint + finally: + subprocess.run(["umount", "--lazy", mountpoint], check=True) + + +@contextlib.contextmanager +def mount(device, options=None): + options = options or [] + opts = ",".join(["ro"] + options) + with tempfile.TemporaryDirectory() as mountpoint: + subprocess.run(["mount", "-o", opts, device, mountpoint], check=True) + try: + yield mountpoint + finally: + subprocess.run(["umount", "--lazy", mountpoint], check=True) + + +def parse_environment_vars(s): + r = {} + for line in s.split("\n"): + line = line.strip() + if not line: + continue + if line[0] == '#': + continue + key, value = line.split("=", 1) + r[key] = value.strip('"') + return r + + +# Parses output of `systemctl list-unit-files` +def parse_unit_files(s, expected_state): + r = [] + for line in s.split("\n")[1:]: + state = "" + unit = "" + try: + unit, state, *_ = line.split() + except ValueError: + pass + if state != expected_state: + continue + r.append(unit) + + return r + + +def subprocess_check_output(argv, parse_fn=None) -> Any: + try: + output = subprocess.check_output(argv, encoding="utf-8") + except subprocess.CalledProcessError as e: + sys.stderr.write(f"--- Output from {argv}:\n") + sys.stderr.write(e.stdout) + sys.stderr.write("\n--- End of the output\n") + raise + + return parse_fn(output) if parse_fn else output + + +def read_container_images(tree): + """ + Read installed containers + + Returns: a dictionary listing the container images in the format + like `podman images --format json` but with less information. + + NB: The parsing is done "manually" since running `podman` in the + chroot does not work. + """ + + images = [] + images_index = os.path.join("overlay-images", "images.json") + + for d in ("/var/lib/containers/storage", ): + path = os.path.join(tree, d.lstrip("/"), images_index) + try: + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + except FileNotFoundError: + continue + + for image in data: + img = { + "Id": image["id"], + "Digest": image["digest"], + "Names": image["names"], + } + created = image.get("created") + if created: + img["Created"] = created + + images.append(img) + + return images + + +def read_image_format(device) -> Dict[str, str]: + """ + Read image format. + + Returns: dictionary with at least one key 'type'. 'type' value is a string + representing the format of the image. In case the type is 'qcow2', the returned + dictionary contains second key 'compat' with a string value representing + the compatibility version of the 'qcow2' image. + + An example return value: + { + "compat": "1.1", + "type": "qcow2" + } + """ + qemu = subprocess_check_output(["qemu-img", "info", "--output=json", device], json.loads) + format = qemu["format"] + result = {"type": format} + if format == "qcow2": + result["compat"] = qemu["format-specific"]["data"]["compat"] + return result + + +def read_partition(device, partition): + """ + Read block device attributes using 'blkid' and extend the passed 'partition' + dictionary. 
+ + Returns: the 'partition' dictionary provided as an argument, extended with + 'label', 'uuid' and 'fstype' keys and their values. + """ + res = subprocess.run(["blkid", "-c", "/dev/null", "--output", "export", + device], + check=False, encoding="utf-8", + stdout=subprocess.PIPE) + if res.returncode == 0: + blkid = parse_environment_vars(res.stdout) + else: + blkid = {} + + partition["label"] = blkid.get("LABEL") # doesn't exist for mbr + partition["uuid"] = blkid.get("UUID") + partition["fstype"] = blkid.get("TYPE") + return partition + + +def read_partition_table(device): + """ + Read information related to found partitions and partitioning table from + the device. + + Returns: dictionary with three keys - 'partition-table', 'partition-table-id' + and 'partitions'. + 'partition-table' value is a string with the type of the partition table or 'None'. + 'partition-table-id' value is a string with the ID of the partition table or 'None'. + 'partitions' value is a list of dictionaries representing found partitions. + + An example return value: + { + "partition-table": "gpt", + "partition-table-id": "DA237A6F-F0D4-47DF-BB50-007E00DB347C", + "partitions": [ + { + "bootable": false, + "partuuid": "64AF1EC2-0328-406A-8F36-83016E6DD858", + "size": 1048576, + "start": 1048576, + "type": "21686148-6449-6E6F-744E-656564454649", + }, + { + "bootable": false, + "partuuid": "D650D523-06F6-4B90-9204-8F998FE9703C", + "size": 6442450944, + "start": 2097152, + "type": "0FC63DAF-8483-4772-8E79-3D69D8477DE4", + } + ] + } + """ + partitions = [] + info = {"partition-table": None, + "partition-table-id": None, + "partitions": partitions} + try: + sfdisk = subprocess_check_output(["sfdisk", "--json", device], json.loads) + except subprocess.CalledProcessError: + # This handles a case, when the device does contain a filesystem, + # but there is no partition table. + partitions.append(read_partition(device, {})) + return info + + ptable = sfdisk["partitiontable"] + assert ptable["unit"] == "sectors" + is_dos = ptable["label"] == "dos" + ssize = ptable.get("sectorsize", SECTOR_SIZE) + + for i, p in enumerate(ptable["partitions"]): + + partuuid = p.get("uuid") + if not partuuid and is_dos: + # For dos/mbr partition layouts the partition uuid + # is generated. Normally this would be done by + # udev+blkid, when the partition table is scanned. + # 'sfdisk' prefixes the partition id with '0x' but + # 'blkid' does not; remove it to mimic 'blkid' + table_id = ptable['id'][2:] + partuuid = "%.33s-%02x" % (table_id, i+1) + + partitions.append({ + "bootable": p.get("bootable", False), + "type": p["type"], + "start": p["start"] * ssize, + "size": p["size"] * ssize, + "partuuid": partuuid + }) + + info["partitions"] = sorted(info["partitions"], key=operator.itemgetter("partuuid")) + info["partition-table"] = ptable["label"] + info["partition-table-id"] = ptable["id"] + + return info + + +def read_bootloader_type(device) -> str: + """ + Read bootloader type from the provided device. + + Returns: string representing the found bootloader. Function can return two values: + - 'grub' + - 'unknown' + """ + with open(device, "rb") as f: + if b"GRUB" in f.read(SECTOR_SIZE): + return "grub" + else: + return "unknown" + + +def read_boot_entries(boot_dir): + """ + Read boot entries. + + Returns: list of dictionaries representing configured boot entries. 
+ + An example return value: + [ + { + "grub_arg": "--unrestricted", + "grub_class": "kernel", + "grub_users": "$grub_users", + "id": "rhel-20210429130346-0-rescue-c116920b13f44c59846f90b1057605bc", + "initrd": "/boot/initramfs-0-rescue-c116920b13f44c59846f90b1057605bc.img", + "linux": "/boot/vmlinuz-0-rescue-c116920b13f44c59846f90b1057605bc", + "options": "$kernelopts", + "title": "Red Hat Enterprise Linux (0-rescue-c116920b13f44c59846f90b1057605bc) 8.4 (Ootpa)", + "version": "0-rescue-c116920b13f44c59846f90b1057605bc" + }, + { + "grub_arg": "--unrestricted", + "grub_class": "kernel", + "grub_users": "$grub_users", + "id": "rhel-20210429130346-4.18.0-305.el8.x86_64", + "initrd": "/boot/initramfs-4.18.0-305.el8.x86_64.img $tuned_initrd", + "linux": "/boot/vmlinuz-4.18.0-305.el8.x86_64", + "options": "$kernelopts $tuned_params", + "title": "Red Hat Enterprise Linux (4.18.0-305.el8.x86_64) 8.4 (Ootpa)", + "version": "4.18.0-305.el8.x86_64" + } + ] + """ + entries = [] + for conf in glob.glob(f"{boot_dir}/loader/entries/*.conf"): + with open(conf) as f: + entries.append(dict(line.strip().split(" ", 1) for line in f)) + + return sorted(entries, key=lambda e: e["title"]) + + +def rpm_verify(tree): + """ + Read the output of 'rpm --verify'. + + Returns: dictionary with two keys 'changed' and 'missing'. + 'changed' value is a dictionary with the keys representing modified files from + installed RPM packages and values representing types of applied modifications. + 'missing' value is a list of strings prepresenting missing values owned by + installed RPM packages. + + An example return value: + { + "changed": { + "/etc/chrony.conf": "S.5....T.", + "/etc/cloud/cloud.cfg": "S.5....T.", + "/etc/nsswitch.conf": "....L....", + "/etc/openldap/ldap.conf": ".......T.", + "/etc/pam.d/fingerprint-auth": "....L....", + "/etc/pam.d/password-auth": "....L....", + "/etc/pam.d/postlogin": "....L....", + "/etc/pam.d/smartcard-auth": "....L....", + "/etc/pam.d/system-auth": "....L....", + "/etc/rhsm/rhsm.conf": "..5....T.", + "/etc/sudoers": "S.5....T.", + "/etc/systemd/logind.conf": "S.5....T." + }, + "missing": [ + "/etc/udev/rules.d/70-persistent-ipoib.rules", + "/run/cloud-init", + "/run/rpcbind", + "/run/setrans", + "/run/tuned" + ] + } + """ + # cannot use `rpm --root` here, because rpm uses passwd from the host to + # verify user and group ownership: + # https://github.com/rpm-software-management/rpm/issues/882 + rpm = subprocess.Popen(["chroot", tree, "rpm", "--verify", "--all"], + stdout=subprocess.PIPE, encoding="utf-8") + + changed = {} + missing = [] + + if rpm.stdout: + for line in rpm.stdout: + # format description in rpm(8), under `--verify` + attrs = line[:9] + if attrs == "missing ": + missing.append(line[12:].rstrip()) + else: + changed[line[13:].rstrip()] = attrs + + # ignore return value, because it returns non-zero when it found changes + rpm.wait() + + return { + "missing": sorted(missing), + "changed": changed + } + + +def rpm_not_installed_docs(tree): + """ + Gathers information on documentation, which is part of RPM packages, + but was not installed. + + Returns: list of documentation files, which are normally a part of + the installed RPM packages, but were not installed (e.g. due to using + '--excludedocs' option when executing 'rpm' command). 
+ + An example return value: + [ + "/usr/share/man/man1/sdiff.1.gz", + "/usr/share/man/man1/seapplet.1.gz", + "/usr/share/man/man1/secon.1.gz", + "/usr/share/man/man1/secret-tool.1.gz", + "/usr/share/man/man1/sed.1.gz", + "/usr/share/man/man1/seq.1.gz" + ] + """ + # check not installed Docs (e.g. when RPMs are installed with --excludedocs) + not_installed_docs = [] + cmd = ["rpm", "--root", tree, "-qad", "--state"] + if os.path.exists(os.path.join(tree, "usr/share/rpm")): + cmd += ["--dbpath", "/usr/share/rpm"] + elif os.path.exists(os.path.join(tree, "var/lib/rpm")): + cmd += ["--dbpath", "/var/lib/rpm"] + output = subprocess_check_output(cmd) + for line in output.splitlines(): + if line.startswith("not installed"): + not_installed_docs.append(line.split()[-1]) + + return sorted(not_installed_docs) + + +def rpm_packages(tree): + """ + Read NVRs of RPM packages installed on the system. + + Returns: sorted list of strings representing RPM packages installed + on the system. + + An example return value: + [ + "NetworkManager-1.30.0-7.el8.x86_64", + "PackageKit-glib-1.1.12-6.el8.x86_64", + "PackageKit-gtk3-module-1.1.12-6.el8.x86_64", + "abattis-cantarell-fonts-0.0.25-6.el8.noarch", + "acl-2.2.53-1.el8.x86_64", + "adobe-mappings-cmap-20171205-3.el8.noarch", + "adobe-mappings-cmap-deprecated-20171205-3.el8.noarch", + "adobe-mappings-pdf-20180407-1.el8.noarch", + "adwaita-cursor-theme-3.28.0-2.el8.noarch", + "adwaita-icon-theme-3.28.0-2.el8.noarch", + "alsa-lib-1.2.4-5.el8.x86_64" + ] + """ + cmd = ["rpm", "--root", tree, "-qa"] + if os.path.exists(os.path.join(tree, "usr/share/rpm")): + cmd += ["--dbpath", "/usr/share/rpm"] + elif os.path.exists(os.path.join(tree, "var/lib/rpm")): + cmd += ["--dbpath", "/var/lib/rpm"] + subprocess_check_output(cmd) + pkgs = subprocess_check_output(cmd, str.split) + return list(sorted(pkgs)) + + +@contextlib.contextmanager +def change_root(root): + real_root = os.open("/", os.O_RDONLY) + try: + os.chroot(root) + yield None + finally: + os.fchdir(real_root) + os.chroot(".") + os.close(real_root) + + +def read_services(tree, state): + """ + Read the list of systemd services on the system in the given state. + + Returns: alphabetically sorted list of strings representing systemd services + in the given state. + The returned list may be empty. + + An example return value: + [ + "arp-ethers.service", + "canberra-system-bootup.service", + "canberra-system-shutdown-reboot.service", + "canberra-system-shutdown.service", + "chrony-dnssrv@.timer", + "chrony-wait.service" + ] + """ + services_state = subprocess_check_output( + ["systemctl", f"--root={tree}", "list-unit-files"], (lambda s: parse_unit_files(s, state))) + + # Since systemd v246, some services previously reported as "enabled" / + # "disabled" are now reported as "alias". There is no systemd command, that + # would take an "alias" unit and report its state as enabled/disabled + # and could run on a different tree (with "--root" option). + # To make the produced list of services in the given state consistent on + # pre/post v246 systemd versions, check all "alias" units and append them + # to the list, if their target is also listed in 'services_state'. + if state != "alias": + services_alias = subprocess_check_output( + ["systemctl", f"--root={tree}", "list-unit-files"], (lambda s: parse_unit_files(s, "alias"))) + + for alias in services_alias: + # The service may be in one of the following places (output of + # "systemd-analyze unit-paths", it should not change too often). 
+ unit_paths = [ + "/etc/systemd/system.control", + "/run/systemd/system.control", + "/run/systemd/transient", + "/run/systemd/generator.early", + "/etc/systemd/system", + "/run/systemd/system", + "/run/systemd/generator", + "/usr/local/lib/systemd/system", + "/usr/lib/systemd/system", + "/run/systemd/generator.late" + ] + + with change_root(tree): + for path in unit_paths: + unit_path = os.path.join(path, alias) + if os.path.exists(unit_path): + real_unit_path = os.path.realpath(unit_path) + # Skip the alias, if there was a symlink cycle. + # When symbolic link cycles occur, the returned path will + # be one member of the cycle, but no guarantee is made about + # which member that will be. + if os.path.islink(real_unit_path): + continue + + # Append the alias unit to the list, if its target is + # already there. + if os.path.basename(real_unit_path) in services_state: + services_state.append(alias) + + # deduplicate and sort + services_state = list(set(services_state)) + services_state.sort() + + return services_state + + +def read_default_target(tree): + """ + Read the default systemd target. + + Returns: string representing the default systemd target. + + An example return value: + "multi-user.target" + """ + return subprocess_check_output(["systemctl", f"--root={tree}", "get-default"]).rstrip() + + +def read_firewall_default_zone(tree): + """ + Read the name of the default firewall zone + + Returns: a string with the zone name. If the firewall configuration doesn't + exist, an empty string is returned. + + An example return value: + "trusted" + """ + try: + with open(f"{tree}/etc/firewalld/firewalld.conf") as f: + conf = parse_environment_vars(f.read()) + return conf["DefaultZone"] + except FileNotFoundError: + return "" + + +def read_firewall_zone(tree): + """ + Read enabled services from the configuration of the default firewall zone. + + Returns: list of strings representing enabled services in the firewall. + The returned list may be empty. + + An example return value: + [ + "ssh", + "dhcpv6-client", + "cockpit" + ] + """ + default = read_firewall_default_zone(tree) + if default == "": + default = "public" + + r = [] + try: + root = xml.etree.ElementTree.parse(f"{tree}/etc/firewalld/zones/{default}.xml").getroot() + except FileNotFoundError: + root = xml.etree.ElementTree.parse(f"{tree}/usr/lib/firewalld/zones/{default}.xml").getroot() + + for element in root.findall("service"): + r.append(element.get("name")) + + return r + + +def read_fstab(tree): + """ + Read the content of /etc/fstab. + + Returns: list of all uncommented lines read from the configuration file + represented as a list of values split by whitespaces. + The returned list may be empty. + + An example return value: + [ + [ + "UUID=6d066eb4-e4c1-4472-91f9-d167097f48d1", + "/", + "xfs", + "defaults", + "0", + "0" + ] + ] + """ + result = [] + with contextlib.suppress(FileNotFoundError): + with open(f"{tree}/etc/fstab") as f: + result = sorted([line.split() for line in f if line.strip() and not line.startswith("#")]) + return result + + +def read_rhsm(tree): + """ + Read configuration changes possible via org.osbuild.rhsm stage + and in addition also the whole content of /etc/rhsm/rhsm.conf. + + Returns: returns dictionary with two keys - 'dnf-plugins' and 'rhsm.conf'. + 'dnf-plugins' value represents configuration of 'product-id' and + 'subscription-manager' DNF plugins. + 'rhsm.conf' value is a dictionary representing the content of the RHSM + configuration file. + The returned dictionary may be empty. 
+ + An example return value: + { + "dnf-plugins": { + "product-id": { + "enabled": true + }, + "subscription-manager": { + "enabled": true + } + }, + "rhsm.conf": { + "logging": { + "default_log_level": "INFO" + }, + "rhsm": { + "auto_enable_yum_plugins": "1", + "baseurl": "https://cdn.redhat.com", + "ca_cert_dir": "/etc/rhsm/ca/", + "consumercertdir": "/etc/pki/consumer", + "entitlementcertdir": "/etc/pki/entitlement", + "full_refresh_on_yum": "0", + "inotify": "1", + "manage_repos": "0", + "package_profile_on_trans": "0", + "pluginconfdir": "/etc/rhsm/pluginconf.d", + "plugindir": "/usr/share/rhsm-plugins", + "productcertdir": "/etc/pki/product", + "repo_ca_cert": "/etc/rhsm/ca/redhat-uep.pem", + "repomd_gpg_url": "", + "report_package_profile": "1" + }, + "rhsmcertd": { + "auto_registration": "1", + "auto_registration_interval": "60", + "autoattachinterval": "1440", + "certcheckinterval": "240", + "disable": "0", + "splay": "1" + }, + "server": { + "hostname": "subscription.rhsm.redhat.com", + "insecure": "0", + "no_proxy": "", + "port": "443", + "prefix": "/subscription", + "proxy_hostname": "", + "proxy_password": "", + "proxy_port": "", + "proxy_scheme": "http", + "proxy_user": "", + "ssl_verify_depth": "3" + } + } + } + """ + result = {} + + # Check RHSM DNF plugins configuration and allowed options + dnf_plugins_config = { + "product-id": f"{tree}/etc/dnf/plugins/product-id.conf", + "subscription-manager": f"{tree}/etc/dnf/plugins/subscription-manager.conf" + } + + for plugin_name, plugin_path in dnf_plugins_config.items(): + with contextlib.suppress(FileNotFoundError): + with open(plugin_path) as f: + parser = configparser.ConfigParser() + parser.read_file(f) + # only read "enabled" option from "main" section + with contextlib.suppress(configparser.NoSectionError, configparser.NoOptionError): + # get the value as the first thing, in case it raises an exception + enabled = parser.getboolean("main", "enabled") + + try: + dnf_plugins_dict = result["dnf-plugins"] + except KeyError as _: + dnf_plugins_dict = result["dnf-plugins"] = {} + + try: + plugin_dict = dnf_plugins_dict[plugin_name] + except KeyError as _: + plugin_dict = dnf_plugins_dict[plugin_name] = {} + + plugin_dict["enabled"] = enabled + + with contextlib.suppress(FileNotFoundError): + rhsm_conf = {} + with open(f"{tree}/etc/rhsm/rhsm.conf") as f: + parser = configparser.ConfigParser() + parser.read_file(f) + for section in parser.sections(): + section_dict = {} + section_dict.update(parser[section]) + if section_dict: + rhsm_conf[section] = section_dict + + result["rhsm.conf"] = rhsm_conf + + return result + + +def read_sysconfig(tree): + """ + Read selected configuration files from /etc/sysconfig. + + Currently supported sysconfig files are: + - 'kernel' - /etc/sysconfig/kernel + - 'network' - /etc/sysconfig/network + - 'network-scripts' - /etc/sysconfig/network-scripts/ifcfg-* + + Returns: dictionary with the keys being the supported types of sysconfig + configurations read by the function. Values of 'kernel' and 'network' keys + are a dictionaries containing key/values read from the respective + configuration files. Value of 'network-scripts' key is a dictionary with + the keys corresponding to the suffix of each 'ifcfg-*' configuration file + and their values holding dictionaries with all key/values read from the + configuration file. + The returned dictionary may be empty. 
+ + An example return value: + { + "kernel": { + "DEFAULTKERNEL": "kernel", + "UPDATEDEFAULT": "yes" + }, + "network": { + "NETWORKING": "yes", + "NOZEROCONF": "yes" + }, + "network-scripts": { + "ens3": { + "BOOTPROTO": "dhcp", + "BROWSER_ONLY": "no", + "DEFROUTE": "yes", + "DEVICE": "ens3", + "IPV4_FAILURE_FATAL": "no", + "IPV6INIT": "yes", + "IPV6_AUTOCONF": "yes", + "IPV6_DEFROUTE": "yes", + "IPV6_FAILURE_FATAL": "no", + "NAME": "ens3", + "ONBOOT": "yes", + "PROXY_METHOD": "none", + "TYPE": "Ethernet", + "UUID": "106f1b31-7093-41d6-ae47-1201710d0447" + }, + "eth0": { + "BOOTPROTO": "dhcp", + "DEVICE": "eth0", + "IPV6INIT": "no", + "ONBOOT": "yes", + "PEERDNS": "yes", + "TYPE": "Ethernet", + "USERCTL": "yes" + } + } + } + """ + result = {} + sysconfig_paths = { + "kernel": f"{tree}/etc/sysconfig/kernel", + "network": f"{tree}/etc/sysconfig/network" + } + # iterate through supported configs + for name, path in sysconfig_paths.items(): + with contextlib.suppress(FileNotFoundError): + with open(path) as f: + # if file exists start with empty array of values + result[name] = parse_environment_vars(f.read()) + + # iterate through all files in /etc/sysconfig/network-scripts + network_scripts = {} + files = glob.glob(f"{tree}/etc/sysconfig/network-scripts/ifcfg-*") + for file in files: + ifname = os.path.basename(file).lstrip("ifcfg-") + with open(file) as f: + network_scripts[ifname] = parse_environment_vars(f.read()) + + if network_scripts: + result["network-scripts"] = network_scripts + + return result + + +def read_hosts(tree): + """ + Read non-empty lines of /etc/hosts. + + Returns: list of strings for all uncommented lines in the configuration file. + The returned list may be empty. + + An example return value: + [ + "127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4", + "::1 localhost localhost.localdomain localhost6 localhost6.localdomain6" + ] + """ + result = [] + + with contextlib.suppress(FileNotFoundError): + with open(f"{tree}/etc/hosts") as f: + for line in f: + line = line.strip() + if line: + result.append(line) + return result + + +def read_logind_config(config_path): + """ + Read all uncommented key/values from the 'Login" section of system-logind + configuration file. + + Returns: dictionary with key/values read from the configuration file. + The returned dictionary may be empty. + + An example return value: + { + "NAutoVTs": "0" + } + """ + result = {} + + with open(config_path) as f: + parser = configparser.RawConfigParser() + # prevent conversion of the option name to lowercase + parser.optionxform = lambda option: option + parser.read_file(f) + with contextlib.suppress(configparser.NoSectionError): + result.update(parser["Login"]) + return result + + +def read_logind_configs(tree): + """ + Read all systemd-logind *.conf files from a predefined list of paths and + parse them. + + The searched paths are: + - "/etc/systemd/logind.conf" + - "/etc/systemd/logind.conf.d/*.conf" + - "/usr/lib/systemd/logind.conf.d/*.conf" + + Returns: dictionary as returned by '_read_glob_paths_with_parser()' with + configuration representation as returned by 'read_logind_config()'. 
+ + An example return value: + { + "/etc/systemd/logind.conf": { + "NAutoVTs": "0" + } + } + """ + checked_globs = [ + "/etc/systemd/logind.conf", + "/etc/systemd/logind.conf.d/*.conf", + "/usr/lib/systemd/logind.conf.d/*.conf" + ] + + return _read_glob_paths_with_parser(tree, checked_globs, read_logind_config) + + +def read_locale(tree): + """ + Read all uncommented key/values set in /etc/locale.conf. + + Returns: dictionary with key/values read from the configuration file. + The returned dictionary may be empty. + + An example return value: + { + "LANG": "en_US" + } + """ + with contextlib.suppress(FileNotFoundError): + with open(f"{tree}/etc/locale.conf") as f: + return parse_environment_vars(f.read()) + + +def read_selinux_info(tree, is_ostree): + """ + Read information related to SELinux. + + Returns: dictionary with two keys - 'policy' and 'context-mismatch'. + 'policy' value corresponds to the value returned by read_selinux_conf(). + 'context-mismatch' value corresponds to the value returned by + read_selinux_ctx_mismatch(). + The returned dictionary may be empty. Keys with empty values are omitted. + + An example return value: + { + "context-mismatch": [ + { + "actual": "system_u:object_r:root_t:s0", + "expected": "system_u:object_r:device_t:s0", + "filename": "/dev" + }, + { + "actual": "system_u:object_r:root_t:s0", + "expected": "system_u:object_r:default_t:s0", + "filename": "/proc" + } + ], + "policy": { + "SELINUX": "permissive", + "SELINUXTYPE": "targeted" + } + } + """ + result = {} + + policy = read_selinux_conf(tree) + if policy: + result["policy"] = policy + + with contextlib.suppress(subprocess.CalledProcessError): + ctx_mismatch = read_selinux_ctx_mismatch(tree, is_ostree) + if ctx_mismatch: + result["context-mismatch"] = ctx_mismatch + + return result + + +def read_selinux_conf(tree): + """ + Read all uncommented key/values set in /etc/selinux/config. + + Returns: dictionary with key/values read from the configuration + file. + + An example of returned value: + { + "SELINUX": "enforcing", + "SELINUXTYPE": "targeted" + } + """ + with contextlib.suppress(FileNotFoundError): + with open(f"{tree}/etc/selinux/config") as f: + return parse_environment_vars(f.read()) + + +def read_selinux_ctx_mismatch(tree, is_ostree): + """ + Read any mismatch in selinux context of files on the image. + + Returns: list of dictionaries as described below. If there + are no mismatches between used and expected selinux context, + then an empty list is returned. + + If the checked 'tree' is ostree, then the path '/etc' is + excluded from the check. This is beause it is bind-mounted + from /usr/etc and therefore has incorrect selinux context + for its filesystem path. + + An example of returned value: + [ + { + "actual": "system_u:object_r:root_t:s0", + "expected": "system_u:object_r:device_t:s0", + "filename": "/dev" + }, + { + "actual": "system_u:object_r:root_t:s0", + "expected": "system_u:object_r:default_t:s0", + "filename": "/proc" + } + ] + """ + result = [] + + # The binary policy that should be used is on the image and has name "policy.X" + # where the "X" is a number. There may be more than one policy files. + # In the usual case, the policy with the highest number suffix should be used. 
+ policy_files = glob.glob(f"{tree}/etc/selinux/targeted/policy/policy.*") + policy_files = sorted(policy_files, reverse=True) + + if policy_files: + CMD = [ + "setfiles", + "-r", f"{tree}", + "-nvF", + "-c", policy_files[0], # take the policy with the highest number + f"{tree}/etc/selinux/targeted/contexts/files/file_contexts", + f"{tree}" + ] + + if is_ostree: + # exclude /etc from being checked when the tree is ostree, because + # it is just bind-mounted from /usr/etc and has incorrect selinux + # context for /etc path + CMD.extend(["-e", f"{tree}/etc"]) + + output = subprocess_check_output(CMD) + + # output are lines such as: + # Would relabel /tmp/tmpwrozmb47/dev from system_u:object_r:root_t:s0 to system_u:object_r:device_t:s0\n + setfiles_pattern = r"Would\s+relabel\s+(?P.+)\s+from\s+(?P.+)\s+to\s+(?P.+)" + setfiles_re = re.compile(setfiles_pattern) + + for line in output.splitlines(): + line = line.strip() + if not line: + continue + match = setfiles_re.match(line) + # do not silently ignore changes of 'setfiles' output + if not match: + raise RuntimeError(f"could not match line '{line}' with pattern '{setfiles_pattern}'") + parsed_line = { + "filename": match.group("filename")[len(tree):], + "actual": match.group("actual"), + "expected": match.group("expected") + } + result.append(parsed_line) + + # sort the list to make it consistent across runs + result.sort(key=lambda x: x.get("filename")) + + return result + + +def _read_glob_paths_with_parser(tree, glob_paths, parser_func): + """ + Use 'parser_func' to read all files obtained by using all 'glob_paths' + globbing patterns under the 'tree' path. + + The 'glob_paths' is a list string patterns accepted by glob.glob(). + The 'parser_func' function is expected to take a single string argument + containing the absolute path to a configuration file which should be parsed. + Its return value can be arbitrary representation of the parsed + configuration. + + Returns: dictionary with the keys corresponding to directories, which + contain configuration files mathing the provided glob pattern. Value of + each key is another dictionary with keys representing each filename and + values being the parsed configuration representation as returned by the + provided 'parser_func' function. + + An example return value for dracut configuration paths and parser: + { + "/etc/dracut.conf.d": { + "sgdisk.conf": { + "install_items": " sgdisk " + }, + }, + "/usr/lib/dracut/dracut.conf.d": { + "xen.conf": { + "add_drivers": " xen-netfront xen-blkfront " + } + } + } + """ + result = {} + + for glob_path in glob_paths: + glob_path_result = {} + + files = glob.glob(f"{tree}{glob_path}") + for file in files: + config = parser_func(file) + if config: + filename = os.path.basename(file) + glob_path_result[filename] = config + + if glob_path_result: + checked_path = os.path.dirname(glob_path) + result[checked_path] = glob_path_result + + return result + + +def read_modprobe_config(config_path): + """ + Read a specific modprobe configuragion file and for now, extract only + blacklisted kernel modules. + + Returns: dictionary with the keys corresponding to specific modprobe + commands and values being the values of these commands. + + An example return value: + { + "blacklist": [ + "nouveau" + ] + } + """ + file_result = {} + + BLACKLIST_CMD = "blacklist" + + with open(config_path) as f: + # The format of files under modprobe.d: one command per line, + # with blank lines and lines starting with '#' ignored. 
+ # A '\' at the end of a line causes it to continue on the next line. + line_to_be_continued = "" + for line in f: + line = line.strip() + # line is not blank + if line: + # comment, skip it + if line[0] == "#": + continue + # this line continues on the following line + if line[-1] == "\\": + line_to_be_continued += line[:-1] + continue + # this line ends here + else: + # is this line continuation of the previous one? + if line_to_be_continued: + line = line_to_be_continued + line + line_to_be_continued = "" + cmd, cmd_args = line.split(' ', 1) + # we care only about blacklist command for now + if cmd == BLACKLIST_CMD: + modules_list = file_result[BLACKLIST_CMD] = [] + modules_list.append(cmd_args) + + return file_result + + +def read_modprobe_configs(tree): + """ + Read all modprobe *.conf files from a predefined list of paths and extract + supported commands. For now, extract only blacklisted kernel modules. + + The searched paths are: + - "/etc/modprobe.d/*.conf" + - "/usr/lib/modprobe.d/*.conf" + - "/usr/local/lib/modprobe.d/*.conf" + + Returns: dictionary as returned by '_read_glob_paths_with_parser()' with + configuration representation as returned by 'read_modprobe_config()'. + + An example return value: + { + "/usr/lib/modprobe.d": { + "blacklist-nouveau.conf": { + "blacklist": [ + "nouveau" + ] + } + } + } + """ + checked_globs = [ + "/etc/modprobe.d/*.conf", + "/usr/lib/modprobe.d/*.conf", + "/usr/local/lib/modprobe.d/*.conf" + ] + + return _read_glob_paths_with_parser(tree, checked_globs, read_modprobe_config) + + +def read_cloud_init_config(config_path): + """ + Read the specific cloud-init configuration file. + + Returns: dictionary representing the cloud-init configuration. + + An example return value: + { + "cloud_config_modules": [ + "mounts", + "locale", + "set-passwords", + "rh_subscription", + "yum-add-repo", + "package-update-upgrade-install", + "timezone", + "puppet", + "chef", + "salt-minion", + "mcollective", + "disable-ec2-metadata", + "runcmd" + ], + "cloud_final_modules": [ + "rightscale_userdata", + "scripts-per-once", + "scripts-per-boot", + "scripts-per-instance", + "scripts-user", + "ssh-authkey-fingerprints", + "keys-to-console", + "phone-home", + "final-message", + "power-state-change" + ], + "cloud_init_modules": [ + "disk_setup", + "migrator", + "bootcmd", + "write-files", + "growpart", + "resizefs", + "set_hostname", + "update_hostname", + "update_etc_hosts", + "rsyslog", + "users-groups", + "ssh" + ], + "disable_root": 1, + "disable_vmware_customization": false, + "mount_default_fields": [ + null, + null, + "auto", + "defaults,nofail,x-systemd.requires=cloud-init.service", + "0", + "2" + ], + "resize_rootfs_tmp": "/dev", + "ssh_deletekeys": 1, + "ssh_genkeytypes": null, + "ssh_pwauth": 0, + "syslog_fix_perms": null, + "system_info": { + "default_user": { + "gecos": "Cloud User", + "groups": [ + "adm", + "systemd-journal" + ], + "lock_passwd": true, + "name": "ec2-user", + "shell": "/bin/bash", + "sudo": [ + "ALL=(ALL) NOPASSWD:ALL" + ] + }, + "distro": "rhel", + "paths": { + "cloud_dir": "/var/lib/cloud", + "templates_dir": "/etc/cloud/templates" + }, + "ssh_svcname": "sshd" + }, + "users": [ + "default" + ] + } + """ + result = {} + + with contextlib.suppress(FileNotFoundError): + with open(config_path) as f: + config = yaml.safe_load(f) + result.update(config) + + return result + + +def read_cloud_init_configs(tree): + """ + Read all cloud-init *.cfg files from a predefined list of paths and parse them. 
+ + The searched paths are: + - "/etc/cloud/cloud.cfg" + - "/etc/cloud/cloud.cfg.d/*.cfg" + + Returns: dictionary as returned by '_read_glob_paths_with_parser()' with + configuration representation as returned by 'read_cloud_init_config()'. + + An example return value: + { + "/etc/cloud.cfg.d": + "ec2.cfg": { + "default_user": { + "name": "ec2-user" + } + } + } + } + """ + checked_globs = [ + "/etc/cloud/cloud.cfg", + "/etc/cloud/cloud.cfg.d/*.cfg" + ] + + return _read_glob_paths_with_parser(tree, checked_globs, read_cloud_init_config) + + +def read_dracut_config(config_path): + """ + Read specific dracut configuration file. + + Returns: dictionary representing the uncommented configuration options read + from the file. + + An example return value: + { + "install_items": " sgdisk " + "add_drivers": " xen-netfront xen-blkfront " + } + """ + result = {} + + with open(config_path) as f: + # dracut configuration key/values delimiter is '=' or '+=' + for line in f: + line = line.strip() + # A '#' indicates the beginning of a comment; following + # characters, up to the end of the line are not interpreted. + line_comment = line.split("#", 1) + line = line_comment[0] + if line: + key, value = line.split("=", 1) + if key[-1] == "+": + key = key[:-1] + result[key] = value.strip('"') + + return result + + +def read_dracut_configs(tree): + """ + Read all dracut *.conf files from a predefined list of paths and parse them. + + The searched paths are: + - "/etc/dracut.conf.d/*.conf" + - "/usr/lib/dracut/dracut.conf.d/*.conf" + + Returns: dictionary as returned by '_read_glob_paths_with_parser()' with + configuration representation as returned by 'read_dracut_config()'. + + An example return value: + { + "/etc/dracut.conf.d": { + "sgdisk.conf": { + "install_items": " sgdisk " + }, + }, + "/usr/lib/dracut/dracut.conf.d": { + "xen.conf": { + "add_drivers": " xen-netfront xen-blkfront " + } + } + } + """ + checked_globs = [ + "/etc/dracut.conf.d/*.conf", + "/usr/lib/dracut/dracut.conf.d/*.conf" + ] + + return _read_glob_paths_with_parser(tree, checked_globs, read_dracut_config) + + +def read_keyboard_conf(tree): + """ + Read keyboard configuration for vconsole and X11. + + Returns: dictionary with at most two keys 'X11' and 'vconsole'. + 'vconsole' value is a dictionary representing configuration read from + /etc/vconsole.conf. + 'X11' value is a dictionary with at most two keys 'layout' and 'variant', + which values are extracted from X11 keyborad configuration. 
+ + An example return value: + { + "X11": { + "layout": "us" + }, + "vconsole": { + "FONT": "eurlatgr", + "KEYMAP": "us" + } + } + """ + result = {} + + # read virtual console configuration + with contextlib.suppress(FileNotFoundError): + with open(f"{tree}/etc/vconsole.conf") as f: + values = parse_environment_vars(f.read()) + if values: + result["vconsole"] = values + + # read X11 keyboard configuration + with contextlib.suppress(FileNotFoundError): + # Example file content: + # + # Section "InputClass" + # Identifier "system-keyboard" + # MatchIsKeyboard "on" + # Option "XkbLayout" "us,sk" + # Option "XkbVariant" ",qwerty" + # EndSection + x11_config = {} + match_options_dict = { + "layout": r'Section\s+"InputClass"\s+.*Option\s+"XkbLayout"\s+"([\w,-]+)"\s+.*EndSection', + "variant": r'Section\s+"InputClass"\s+.*Option\s+"XkbVariant"\s+"([\w,-]+)"\s+.*EndSection' + } + with open(f"{tree}/etc/X11/xorg.conf.d/00-keyboard.conf") as f: + config = f.read() + for option, pattern in match_options_dict.items(): + match = re.search(pattern, config, re.DOTALL) + if match and match.group(1): + x11_config[option] = match.group(1) + + if x11_config: + result["X11"] = x11_config + + return result + + +def read_chrony_conf(tree): + """ + Read specific directives from Chrony configuration. Currently parsed + directives are: + - 'server' + - 'pool' + - 'peer' + - 'leapsectz' + + Returns: dictionary with the keys representing parsed directives from Chrony + configuration. Value of each key is a list of strings containing arguments + provided with each occurance of the directive in the configuration. + + An example return value: + { + "leapsectz": [ + "right/UTC" + ], + "pool": [ + "2.rhel.pool.ntp.org iburst" + ], + "server": [ + "169.254.169.123 prefer iburst minpoll 4 maxpoll 4" + ] + } + """ + result = {} + + parsed_directives = ["server", "pool", "peer", "leapsectz"] + + with contextlib.suppress(FileNotFoundError): + with open(f"{tree}/etc/chrony.conf") as f: + for line in f: + line = line.strip() + if not line: + continue + # skip comments + if line[0] in ["!", ";", "#", "%"]: + continue + split_line = line.split() + if split_line[0] in parsed_directives: + try: + directive_list = result[split_line[0]] + except KeyError: + directive_list = result[split_line[0]] = [] + directive_list.append(" ".join(split_line[1:])) + + return result + + +def read_systemd_service_dropin(dropin_dir_path): + """ + Read systemd .service unit drop-in configurations. + + Returns: dictionary representing the combined drop-in configurations. + + An example return value: + { + "Service": { + "Environment": "NM_CLOUD_SETUP_EC2=yes" + } + } + """ + # read all unit drop-in configurations + config_files = glob.glob(f"{dropin_dir_path}/*.conf") + + dropin_config = {} + for file in config_files: + dropin_config[os.path.basename(file)] = read_config_file_no_comment(file) + + return dropin_config + + +def read_systemd_service_dropins(tree): + """ + Read all systemd .service unit config files from a predefined list of paths + and parse them. + + The searched paths are: + - "/etc/systemd/system/*.service.d" + - "/usr/lib/systemd/system/*.service.d" + + Returns: dictionary as returned by '_read_glob_paths_with_parser()' with + configuration representation as returned by 'read_systemd_service_dropin()'. 
+ + An example return value: + { + "/etc/systemd/system": { + "nm-cloud-setup.service.d": { + "Service": { + "Environment": "NM_CLOUD_SETUP_EC2=yes" + } + } + } + } + """ + checked_globs = [ + "/etc/systemd/system/*.service.d", + "/usr/lib/systemd/system/*.service.d" + ] + + return _read_glob_paths_with_parser(tree, checked_globs, read_systemd_service_dropin) + + +def read_config_file_no_comment(config_path): + """ + Read configuration files. + + Returns: list of strings representing uncommented lines read from the + configuration file. + + An example return value: + [ + "x /tmp/.sap*", + "x /tmp/.hdb*lock", + "x /tmp/.trex*lock" + ] + """ + file_lines = [] + + with open(config_path) as f: + for line in f: + line = line.strip() + if not line: + continue + if line[0] == "#": + continue + file_lines.append(line) + + return file_lines + + +def read_tmpfilesd_configs(tree): + """ + Read all tmpfiles.d *.conf files from a predefined list of paths and parse + them. + + The searched paths are: + - "/etc/tmpfiles.d/*.conf" + - "/usr/lib/tmpfiles.d/*.conf" + + Returns: dictionary as returned by '_read_glob_paths_with_parser()' with + configuration representation as returned by 'read_config_file_no_comment()'. + + An example return value: + { + "/etc/tmpfiles.d": { + "sap.conf": [ + "x /tmp/.sap*", + "x /tmp/.hdb*lock", + "x /tmp/.trex*lock" + ] + } + } + """ + checked_globs = [ + "/etc/tmpfiles.d/*.conf", + "/usr/lib/tmpfiles.d/*.conf" + ] + + return _read_glob_paths_with_parser(tree, checked_globs, read_config_file_no_comment) + + +def read_tuned_profile(tree): + """ + Read the Tuned active profile and profile mode. + + Returns: dictionary with at most two keys 'active_profile' and 'profile_mode'. + Value of each key is a string representing respective tuned configuration + value. + + An example return value: + { + "active_profile": "sap-hana", + "profile_mode": "manual" + } + """ + result = {} + config_files = ["active_profile", "profile_mode"] + + with contextlib.suppress(FileNotFoundError): + for config_file in config_files: + with open(f"{tree}/etc/tuned/{config_file}") as f: + value = f.read() + value = value.strip() + if value: + result[config_file] = value + + return result + + +def read_sysctld_config(config_path): + """ + Read sysctl configuration file. + + Returns: list of strings representing uncommented lines read from the + configuration file. + + An example return value: + [ + "kernel.pid_max = 4194304", + "vm.max_map_count = 2147483647" + ] + """ + values = [] + + with open(config_path) as f: + for line in f: + line = line.strip() + if not line: + continue + # skip comments + if line[0] in ["#", ";"]: + continue + values.append(line) + + return values + + +def read_sysctld_configs(tree): + """ + Read all sysctl.d *.conf files from a predefined list of paths and parse + them. + + The searched paths are: + - "/etc/sysctl.d/*.conf", + - "/usr/lib/sysctl.d/*.conf" + + Returns: dictionary as returned by '_read_glob_paths_with_parser()' with + configuration representation as returned by 'read_sysctld_config()'. + + An example return value: + { + "/etc/sysctl.d": { + "sap.conf": [ + "kernel.pid_max = 4194304", + "vm.max_map_count = 2147483647" + ] + } + } + """ + checked_globs = [ + "/etc/sysctl.d/*.conf", + "/usr/lib/sysctl.d/*.conf" + ] + + return _read_glob_paths_with_parser(tree, checked_globs, read_sysctld_config) + + +def read_security_limits_config(config_path): + """ + Read all configuration files from /etc/security/limits.d. 
+ + Returns: dictionary with the keys representing names of configuration files + from /etc/security/limits.d. Value of each key is a dictionary representing + uncommented configuration values read from the configuration file. + + An example return value: + [ + { + "domain": "@sapsys", + "item": "nofile", + "type": "hard", + "value": "65536" + }, + { + "domain": "@sapsys", + "item": "nofile", + "type": "soft", + "value": "65536" + } + ] + """ + values = [] + + with open(config_path) as f: + for line in f: + line = line.strip() + # the '#' character introduces a comment - after which the rest of the line is ignored + split_line = line.split("#", 1) + line = split_line[0] + if not line: + continue + # Syntax of a line is " " + domain, limit_type, item, value = line.split() + values.append({ + "domain": domain, + "type": limit_type, + "item": item, + "value": value + }) + + return values + + +def read_security_limits_configs(tree): + """ + Read all security limits *.conf files from a predefined list of paths and + parse them. + + The searched paths are: + - "/etc/security/limits.conf" + - "/etc/security/limits.d/*.conf" + + Returns: dictionary as returned by '_read_glob_paths_with_parser()' with + configuration representation as returned by 'read_security_limits_config()'. + + An example return value: + { + "/etc/security/limits.d": { + "99-sap.conf": [ + { + "domain": "@sapsys", + "item": "nofile", + "type": "hard", + "value": "65536" + }, + { + "domain": "@sapsys", + "item": "nofile", + "type": "soft", + "value": "65536" + } + ] + } + } + """ + checked_globs = [ + "/etc/security/limits.conf", + "/etc/security/limits.d/*.conf" + ] + + return _read_glob_paths_with_parser(tree, checked_globs, read_config_file_no_comment) + + +def read_ssh_config(config_path): + """ + Read the content of provided SSH(d) configuration file. + + Returns: list of uncommented and non-empty lines read from the configuation + file. + + An example return value: + [ + "Match final all", + "Include /etc/crypto-policies/back-ends/openssh.config", + "GSSAPIAuthentication yes", + "ForwardX11Trusted yes", + "SendEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES", + "SendEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT", + "SendEnv LC_IDENTIFICATION LC_ALL LANGUAGE", + "SendEnv XMODIFIERS" + ] + """ + config_lines = [] + + with open(config_path) as f: + for line in f: + line = line.strip() + if not line: + continue + if line[0] == "#": + continue + config_lines.append(line) + + return config_lines + + +def read_ssh_configs(tree): + """ + Read all SSH configuration files from a predefined list of paths and + parse them. + + The searched paths are: + - "/etc/ssh/ssh_config" + - "/etc/ssh/ssh_config.d/*.conf" + + Returns: dictionary as returned by '_read_glob_paths_with_parser()' with + configuration representation as returned by 'read_ssh_config()'. 
+ + An example return value: + { + "/etc/ssh": { + "ssh_config": [ + "Include /etc/ssh/ssh_config.d/*.conf" + ] + }, + "/etc/ssh/ssh_config.d": { + "05-redhat.conf": [ + "Match final all", + "Include /etc/crypto-policies/back-ends/openssh.config", + "GSSAPIAuthentication yes", + "ForwardX11Trusted yes", + "SendEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES", + "SendEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT", + "SendEnv LC_IDENTIFICATION LC_ALL LANGUAGE", + "SendEnv XMODIFIERS" + ] + } + } + """ + checked_globs = [ + "/etc/ssh/ssh_config", + "/etc/ssh/ssh_config.d/*.conf" + ] + + return _read_glob_paths_with_parser(tree, checked_globs, read_ssh_config) + + +def read_sshd_configs(tree): + """ + Read all SSHd configuration files from a predefined list of paths and + parse them. + + The searched paths are: + - "/etc/ssh/sshd_config" + - "/etc/ssh/sshd_config.d/*.conf" + + Returns: dictionary as returned by '_read_glob_paths_with_parser()' with + configuration representation as returned by 'read_ssh_config()'. + + An example return value: + { + "/etc/ssh": { + "sshd_config": [ + "HostKey /etc/ssh/ssh_host_rsa_key", + "HostKey /etc/ssh/ssh_host_ecdsa_key", + "HostKey /etc/ssh/ssh_host_ed25519_key", + "SyslogFacility AUTHPRIV", + "PermitRootLogin no", + "AuthorizedKeysFile\t.ssh/authorized_keys", + "PasswordAuthentication no", + "ChallengeResponseAuthentication no", + "GSSAPIAuthentication yes", + "GSSAPICleanupCredentials no", + "UsePAM yes", + "X11Forwarding yes", + "PrintMotd no", + "AcceptEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES", + "AcceptEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT", + "AcceptEnv LC_IDENTIFICATION LC_ALL LANGUAGE", + "AcceptEnv XMODIFIERS", + "Subsystem\tsftp\t/usr/libexec/openssh/sftp-server", + "ClientAliveInterval 420" + ] + } + } + """ + checked_globs = [ + "/etc/ssh/sshd_config", + "/etc/ssh/sshd_config.d/*.conf" + ] + + return _read_glob_paths_with_parser(tree, checked_globs, read_ssh_config) + + +def read_yum_repos(tree): + """ + Read all YUM/DNF repo files. + + The searched paths are: + - "/etc/yum.repos.d/*.repo" + + Returns: dictionary as returned by '_read_glob_paths_with_parser()' with + configuration representation as returned by '_read_inifile_to_dict()'. + + An example return value: + { + "/etc/yum.repos.d": { + "google-cloud.repo": { + "google-cloud-sdk": { + "baseurl": "https://packages.cloud.google.com/yum/repos/cloud-sdk-el8-x86_64", + "enabled": "1", + "gpgcheck": "1", + "gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg", + "name": "Google Cloud SDK", + "repo_gpgcheck": "0" + }, + "google-compute-engine": { + "baseurl": "https://packages.cloud.google.com/yum/repos/google-compute-engine-el8-x86_64-stable", + "enabled": "1", + "gpgcheck": "1", + "gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg", + "name": "Google Compute Engine", + "repo_gpgcheck": "0" + } + } + } + } + """ + checked_globs = [ + "/etc/yum.repos.d/*.repo" + ] + + return _read_glob_paths_with_parser(tree, checked_globs, _read_inifile_to_dict) + + +def read_sudoers(tree): + """ + Read uncommented lines from sudoers configuration file and /etc/sudoers.d + This functions does not actually do much of a parsing, as sudoers file + format grammar is a bit too much for our purpose. + Any #include or #includedir directives are ignored by this function. 
+ + Returns: dictionary with the keys representing names of read configuration + files, /etc/sudoers and files from /etc/sudoers.d. Value of each key is + a list of strings representing uncommented lines read from the configuration + file. + + An example return value: + { + "/etc/sudoers": [ + "Defaults !visiblepw", + "Defaults always_set_home", + "Defaults match_group_by_gid", + "Defaults always_query_group_plugin", + "Defaults env_reset", + "Defaults env_keep = \"COLORS DISPLAY HOSTNAME HISTSIZE KDEDIR LS_COLORS\"", + "Defaults env_keep += \"MAIL PS1 PS2 QTDIR USERNAME LANG LC_ADDRESS LC_CTYPE\"", + "Defaults env_keep += \"LC_COLLATE LC_IDENTIFICATION LC_MEASUREMENT LC_MESSAGES\"", + "Defaults env_keep += \"LC_MONETARY LC_NAME LC_NUMERIC LC_PAPER LC_TELEPHONE\"", + "Defaults env_keep += \"LC_TIME LC_ALL LANGUAGE LINGUAS _XKB_CHARSET XAUTHORITY\"", + "Defaults secure_path = /sbin:/bin:/usr/sbin:/usr/bin", + "root\tALL=(ALL) \tALL", + "%wheel\tALL=(ALL)\tALL", + "ec2-user\tALL=(ALL)\tNOPASSWD: ALL" + ] + } + """ + result = {} + + def _parse_sudoers_file(f): + lines = [] + for line in f: + line = line.strip() + if not line: + continue + if line[0] == "#": + continue + lines.append(line) + return lines + + with contextlib.suppress(FileNotFoundError): + with open(f"{tree}/etc/sudoers") as f: + lines = _parse_sudoers_file(f) + if lines: + result["/etc/sudoers"] = lines + + sudoersd_result = {} + for file in glob.glob(f"{tree}/etc/sudoers.d/*"): + with open(file) as f: + lines = _parse_sudoers_file(f) + if lines: + result[os.path.basename(file)] = lines + if sudoersd_result: + result["/etc/sudoers.d"] = sudoersd_result + + return result + + +def read_udev_rules(tree): + """ + Read udev rules defined in /etc/udev/rules.d. + + Returns: dictionary with the keys representing names of files with udev + rules from /etc/udev/rules.d. Value of each key is a list of strings + representing uncommented lines read from the configuration file. If + the file is empty (e.g. because of masking udev configuration installed + by an RPM), an emptu list is returned as the respective value. + + An example return value: + { + "80-net-name-slot.rules": [] + } + """ + result = {} + + for file in glob.glob(f"{tree}/etc/udev/rules.d/*.rules"): + with open(file) as f: + lines = [] + for line in f: + line = line.strip() + if not line: + continue + if line[0] == "#": + continue + lines.append(line) + # include also empty files in the report + result[os.path.basename(file)] = lines + + return result + + +def _read_inifile_to_dict(config_path): + """ + Read INI file from the provided path + + Returns: a dictionary representing the provided INI file content. 
+ + An example return value: + { + "google-cloud-sdk": { + "baseurl": "https://packages.cloud.google.com/yum/repos/cloud-sdk-el8-x86_64", + "enabled": "1", + "gpgcheck": "1", + "gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg", + "name": "Google Cloud SDK", + "repo_gpgcheck": "0" + }, + "google-compute-engine": { + "baseurl": "https://packages.cloud.google.com/yum/repos/google-compute-engine-el8-x86_64-stable", + "enabled": "1", + "gpgcheck": "1", + "gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg", + "name": "Google Compute Engine", + "repo_gpgcheck": "0" + } + } + """ + result = {} + + with contextlib.suppress(FileNotFoundError): + with open(config_path, encoding="utf-8") as f: + parser = configparser.RawConfigParser() + # prevent conversion of the opion name to lowercase + parser.optionxform = lambda option: option + parser.read_file(f) + + for section in parser.sections(): + section_config = dict(parser.items(section)) + if section_config: + result[section] = section_config + + return result + + +def read_dnf_conf(tree): + """ + Read DNF configuration and defined variable files. + + Returns: dictionary with at most two keys 'dnf.conf' and 'vars'. + 'dnf.conf' value is a dictionary representing the DNF configuration + file content. + 'vars' value is a dictionary which keys represent names of files from + /etc/dnf/vars/ and values are strings representing the file content. + + An example return value: + { + "dnf.conf": { + "main": { + "installonly_limit": "3" + } + }, + "vars": { + "releasever": "8.4" + } + } + """ + result = {} + + dnf_config = _read_inifile_to_dict(f"{tree}/etc/dnf/dnf.conf") + if dnf_config: + result["dnf.conf"] = dnf_config + + dnf_vars = {} + for file in glob.glob(f"{tree}/etc/dnf/vars/*"): + with open(file) as f: + dnf_vars[os.path.basename(file)] = f.read().strip() + if dnf_vars: + result["vars"] = dnf_vars + + return result + + +def read_dnf_automatic_conf(tree): + """ + Read DNF Automatic configuation. + + Returns: dictionary as returned by '_read_inifile_to_dict()'. + + An example return value: + { + "base": { + "debuglevel": "1" + }, + "command_email": { + "email_from": "root@example.com", + "email_to": "root" + }, + "commands": { + "apply_updates": "yes", + "download_updates": "yes", + "network_online_timeout": "60", + "random_sleep": "0", + "upgrade_type": "security" + }, + "email": { + "email_from": "root@example.com", + "email_host": "localhost", + "email_to": "root" + }, + "emitters": { + "emit_via": "stdio" + } + } + """ + return _read_inifile_to_dict(f"{tree}/etc/dnf/automatic.conf") + + +def read_authselect_conf(tree): + """ + Read authselect configuration. + + Returns: dictionary with two keys 'profile-id' and 'enabled-features'. + 'profile-id' value is a string representing the configured authselect + profile. + 'enabled-features' value is a list of strings representing enabled features + of the used authselect profile. In case there are no specific features + enabled, the list is empty. 
+
+    An example return value:
+    {
+        "enabled-features": [],
+        "profile-id": "sssd"
+    }
+    """
+    result = {}
+
+    with contextlib.suppress(FileNotFoundError):
+        with open(f"{tree}/etc/authselect/authselect.conf") as f:
+            # the first line is always the profile ID
+            # following lines list the enabled features
+            # lines starting with '#' and empty lines are skipped
+            authselect_conf_lines = []
+            for line in f:
+                line = line.strip()
+                if not line:
+                    continue
+                if line[0] == "#":
+                    continue
+                authselect_conf_lines.append(line)
+            if authselect_conf_lines:
+                result["profile-id"] = authselect_conf_lines[0]
+                result["enabled-features"] = authselect_conf_lines[1:]
+
+    return result
+
+
+def read_resolv_conf(tree):
+    """
+    Read /etc/resolv.conf.
+
+    Returns: a list of uncommented lines from /etc/resolv.conf.
+
+    An example return value:
+    [
+        "search redhat.com",
+        "nameserver 192.168.1.1",
+        "nameserver 192.168.1.2"
+    ]
+    """
+    result = []
+
+    with contextlib.suppress(FileNotFoundError):
+        with open(f"{tree}/etc/resolv.conf") as f:
+            for line in f:
+                line = line.strip()
+                if not line:
+                    continue
+                if line[0] == "#":
+                    continue
+                result.append(line)
+
+    return result
+
+
+def append_filesystem(report, tree, *, is_ostree=False):
+    if os.path.exists(f"{tree}/etc/os-release"):
+        report["packages"] = rpm_packages(tree)
+        if not is_ostree:
+            report["rpm-verify"] = rpm_verify(tree)
+
+            not_installed_docs = rpm_not_installed_docs(tree)
+            if not_installed_docs:
+                report["rpm_not_installed_docs"] = not_installed_docs
+
+        with open(f"{tree}/etc/os-release") as f:
+            report["os-release"] = parse_environment_vars(f.read())
+
+        report["services-enabled"] = read_services(tree, "enabled")
+        report["services-disabled"] = read_services(tree, "disabled")
+
+        default_target = read_default_target(tree)
+        if default_target:
+            report["default-target"] = default_target
+
+        with contextlib.suppress(FileNotFoundError):
+            with open(f"{tree}/etc/hostname") as f:
+                report["hostname"] = f.read().strip()
+
+        with contextlib.suppress(FileNotFoundError):
+            report["timezone"] = os.path.basename(os.readlink(f"{tree}/etc/localtime"))
+
+        authselect_conf = read_authselect_conf(tree)
+        if authselect_conf:
+            report["authselect"] = authselect_conf
+
+        chrony_conf = read_chrony_conf(tree)
+        if chrony_conf:
+            report["chrony"] = chrony_conf
+
+        cloud_init_configs = read_cloud_init_configs(tree)
+        if cloud_init_configs:
+            report["cloud-init"] = cloud_init_configs
+
+        container_images = read_container_images(tree)
+        if container_images:
+            report["container-images"] = container_images
+
+        dnf_conf = read_dnf_conf(tree)
+        if dnf_conf:
+            report["dnf"] = dnf_conf
+
+        dnf_automatic = read_dnf_automatic_conf(tree)
+        if dnf_automatic:
+            report["/etc/dnf/automatic.conf"] = dnf_automatic
+
+        yum_repos = read_yum_repos(tree)
+        if yum_repos:
+            report["yum_repos"] = yum_repos
+
+        dracut_configs = read_dracut_configs(tree)
+        if dracut_configs:
+            report["dracut"] = dracut_configs
+
+        with contextlib.suppress(FileNotFoundError):
+            report["firewall-enabled"] = read_firewall_zone(tree)
+
+        firewall_default_zone = read_firewall_default_zone(tree)
+        if firewall_default_zone:
+            report["firewall-default-zone"] = firewall_default_zone
+
+        fstab = read_fstab(tree)
+        if fstab:
+            report["fstab"] = fstab
+
+        hosts = read_hosts(tree)
+        if hosts:
+            report["hosts"] = hosts
+
+        keyboard = read_keyboard_conf(tree)
+        if keyboard:
+            report["keyboard"] = keyboard
+
+        security_limits_configs = read_security_limits_configs(tree)
+        if security_limits_configs:
+
report["security-limits"] = security_limits_configs + + locale = read_locale(tree) + if locale: + report["locale"] = locale + + logind_configs = read_logind_configs(tree) + if logind_configs: + report["systemd-logind"] = logind_configs + + with contextlib.suppress(FileNotFoundError): + with open(f"{tree}/etc/machine-id") as f: + report["machine-id"] = f.readline() + + modprobe_configs = read_modprobe_configs(tree) + if modprobe_configs: + report["modprobe"] = modprobe_configs + + tmpfilesd_configs = read_tmpfilesd_configs(tree) + if tmpfilesd_configs: + report["tmpfiles.d"] = tmpfilesd_configs + + rhsm = read_rhsm(tree) + if rhsm: + report["rhsm"] = rhsm + + selinux = read_selinux_info(tree, is_ostree) + if selinux: + report["selinux"] = selinux + + ssh_configs = read_ssh_configs(tree) + if ssh_configs: + report["ssh_config"] = ssh_configs + + sshd_configs = read_sshd_configs(tree) + if sshd_configs: + report["sshd_config"] = sshd_configs + + sudoers_conf = read_sudoers(tree) + if sudoers_conf: + report["sudoers"] = sudoers_conf + + sysconfig = read_sysconfig(tree) + if sysconfig: + report["sysconfig"] = sysconfig + + sysctld_configs = read_sysctld_configs(tree) + if sysctld_configs: + report["sysctl.d"] = sysctld_configs + + systemd_service_dropins = read_systemd_service_dropins(tree) + if systemd_service_dropins: + report["systemd-service-dropins"] = systemd_service_dropins + + tuned_profile = read_tuned_profile(tree) + if tuned_profile: + report["tuned"] = tuned_profile + + resolv_conf = read_resolv_conf(tree) + # add even empty resolv_conf to the report to express that it is empty or non-existent + report["/etc/resolv.conf"] = resolv_conf + + udev_rules = read_udev_rules(tree) + if udev_rules: + report["/etc/udev/rules.d"] = udev_rules + + with open(f"{tree}/etc/passwd") as f: + report["passwd"] = sorted(f.read().strip().split("\n")) + + with open(f"{tree}/etc/group") as f: + report["groups"] = sorted(f.read().strip().split("\n")) + + if is_ostree: + with open(f"{tree}/usr/lib/passwd") as f: + report["passwd-system"] = sorted(f.read().strip().split("\n")) + + with open(f"{tree}/usr/lib/group") as f: + report["groups-system"] = sorted(f.read().strip().split("\n")) + + if os.path.exists(f"{tree}/boot") and len(os.listdir(f"{tree}/boot")) > 0: + assert "bootmenu" not in report + with contextlib.suppress(FileNotFoundError): + with open(f"{tree}/boot/grub2/grubenv") as f: + report["boot-environment"] = parse_environment_vars(f.read()) + report["bootmenu"] = read_boot_entries(f"{tree}/boot") + + elif len(glob.glob(f"{tree}/vmlinuz-*")) > 0: + assert "bootmenu" not in report + with open(f"{tree}/grub2/grubenv") as f: + report["boot-environment"] = parse_environment_vars(f.read()) + report["bootmenu"] = read_boot_entries(tree) + elif len(glob.glob(f"{tree}/EFI")): + print("EFI partition", file=sys.stderr) + + +def volume_group_for_device(device: str) -> str: + # Find the volume group that belongs to the device specified via `parent` + vg_name = None + count = 0 + + cmd = [ + "pvdisplay", "-C", "--noheadings", "-o", "vg_name", device + ] + + while True: + res = subprocess.run(cmd, + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="UTF-8") + + if res.returncode == 5: + if count == 10: + raise RuntimeError("Could not find parent device") + time.sleep(1*count) + count += 1 + continue + + if res.returncode != 0: + raise RuntimeError(res.stderr.strip()) + + vg_name = res.stdout.strip() + if vg_name: + break + + return vg_name + + +def ensure_device_file(path: str, major: 
int, minor: int): + """Ensure the device file with the given major, minor exists""" + os.makedirs(os.path.dirname(path), exist_ok=True) + if not os.path.exists(path): + os.mknod(path, 0o600 | stat.S_IFBLK, os.makedev(major, minor)) + + +def discover_lvm(dev: str, parent: devices.Device, devmgr: devices.DeviceManager): + # find the volume group name for the device file + vg_name = volume_group_for_device(dev) + + # activating LVM is done OSBuild side. + # However we still have to get OSBuild the name of the VG to open + + # Find all logical volumes in the volume group + cmd = [ + "lvdisplay", "-C", "--noheadings", + "-o", "lv_name,path,lv_kernel_major,lv_kernel_minor", + "--separator", ";", + vg_name + ] + + res = subprocess.run(cmd, + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="UTF-8") + + if res.returncode != 0: + raise RuntimeError(res.stderr.strip()) + + data = res.stdout.strip() + parsed = list(map(lambda l: l.split(";"), data.split("\n"))) + volumes = OrderedDict() + + # devices_map stores for each device path onto the system the corresponding + # OSBuild's Device object + devices_map = {} + + for vol in parsed: + vol = list(map(lambda v: v.strip(), vol)) + assert len(vol) == 4 + name, _, _, _ = vol + + options = { + "volume": name, + } + + # Create an OSBuild device object for the LVM partition + info = index.get_module_info("Device", "org.osbuild.lvm2.lv") + device = devices.Device( + name, + info, + parent, + options) + if not info: + raise RuntimeError("can't find org.osbuild.lvm2.lv") + jsonschema.validate(options, info.get_schema()) + reply = devmgr.open(device) + voldev = reply["path"] # get the path where is mounted the device + minor = reply["node"]["minor"] + major = reply["node"]["major"] + + info = { + "device": voldev + } + ensure_device_file(voldev, int(major), int(minor)) + + read_partition(voldev, info) + volumes[name] = info + if name.startswith("root"): + volumes.move_to_end(name, last=False) + + # associate the device path with the Device object, we will need it to + # mount later on. + devices_map[voldev] = device + # get back both the device map and the result that'll go in the JSON report + return devices_map, { + "lvm": True, + "lvm.vg": vg_name, + "lvm.volumes": volumes + } + + +def partition_is_lvm(part: Dict) -> bool: + return part["type"].upper() in ["E6D6D379-F507-44C2-A23C-238F2A3DF928", "8E"] + + +def append_partitions(report, image): + partitions = report["partitions"] + with tempfile.TemporaryDirectory() as mountpoint: + with host.ServiceManager(monitor=monitor.NullMonitor(1)) as mgr: + + devmgr = devices.DeviceManager(mgr, "/dev", os.path.dirname(image)) + + # Device map associate a path onto where the device is mounted with its + # corresponding Device object. Mount will require both the path and the + # Device object in order to do its job. 
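+            #
+            # Purely illustrative sketch (the device names below are hypothetical):
+            # once the loop below has run, devices_map could look roughly like
+            #     {"/dev/loop2p1": <Device>, "/dev/mapper/rootvg-rootlv": <Device>}
+            # with each value being the osbuild devices.Device used to open that path.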
+
+            devices_map = {}
+            filesystems = {}
+            for part in partitions:
+                start, size = part["start"], part["size"]
+                ret = loop_open(
+                    devmgr,
+                    part["partuuid"],
+                    image,
+                    size,
+                    offset=start)
+                dev = ret["path"]
+                devices_map[dev] = ret["Device"]
+                read_partition(dev, part)
+                if partition_is_lvm(part):
+                    dmap, lvm = discover_lvm(dev, ret["Device"], devmgr)
+                    devices_map.update(dmap)
+                    for vol in lvm["lvm.volumes"].values():
+                        if vol["fstype"]:
+                            mntopts = []
+                            # Recovery cannot succeed, since the underlying loopback
+                            # device is mounted read-only. But because we access it
+                            # through the device mapper, that fact might not be
+                            # propagated, and the kernel may attempt a journal
+                            # recovery of the filesystem, which would lead to a
+                            # kernel panic. Disable recovery explicitly.
+                            if vol["fstype"] in ("ext4", "ext3", "xfs"):
+                                mntopts = ["norecovery"]
+                            filesystems[vol["uuid"].upper()] = {
+                                "device": vol["device"],
+                                "mntops": mntopts
+                            }
+                            del vol["device"]
+                    part.update(lvm)
+                elif part["uuid"] and part["fstype"]:
+                    filesystems[part["uuid"].upper()] = {
+                        "device": dev
+                    }
+
+            # find partition with fstab and read it
+            fstab = []
+            for fs in filesystems.values():
+                dev, opts = fs["device"], fs.get("mntops")
+                with mount(dev, opts) as tree:
+                    if os.path.exists(f"{tree}/etc/fstab"):
+                        fstab.extend(read_fstab(tree))
+                        break
+            # sort the fstab entries by the mountpoint
+            fstab = sorted(fstab, key=operator.itemgetter(1))
+
+            # mount all partitions to their respective mount points
+            root_tree = ""
+            mmgr = mounts.MountManager(devmgr, mountpoint)
+            for n, fstab_entry in enumerate(fstab):
+                part_uuid = fstab_entry[0].split("=")[1].upper()
+                part_device = filesystems[part_uuid]["device"]
+                part_mountpoint = fstab_entry[1]
+                part_fstype = fstab_entry[2]
+                part_options = fstab_entry[3].split(",")
+                part_options += filesystems[part_uuid].get("mntops", [])
+
+                if "ext4" in part_fstype:
+                    info = index.get_module_info("Mount", "org.osbuild.ext4")
+                elif "vfat" in part_fstype:
+                    info = index.get_module_info("Mount", "org.osbuild.fat")
+                elif "btrfs" in part_fstype:
+                    info = index.get_module_info("Mount", "org.osbuild.btrfs")
+                elif "xfs" in part_fstype:
+                    info = index.get_module_info("Mount", "org.osbuild.xfs")
+                else:
+                    raise RuntimeError("Unknown file system")
+                if not info:
+                    raise RuntimeError(f"Can't find org.osbuild.{part_fstype}")
+
+                # the first mount point should be root
+                if n == 0:
+                    if part_mountpoint != "/":
+                        raise RuntimeError("The first mountpoint in sorted fstab entries is not '/'")
+                    root_tree = mountpoint
+
+                # prepare the options to mount the partition
+                options = {}
+                for option in part_options:
+                    if option == "defaults":  # defaults is not a supported option
+                        continue
+
+                    if "=" in option:
+                        parts = option.split("=")
+                        key = parts[0]
+                        val = parts[1]
+
+                        # uid and gid must be integers
+                        if key == "uid" or key == "gid":
+                            val = int(val)
+
+                        options[key] = val
+                    else:
+                        options[option] = True
+
+                options["readonly"] = True
+
+                # Validate the options
+                #
+                # The mount manager takes care of opening the file system for us,
+                # so we don't have access to the json objects that'll be used to
+                # invoke the mounter. However, we're only interested in validating
+                # the options, which we can extract from the schema and validate
+                # on their own.
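+                # Illustrative sketch only (hypothetical fstab options): an entry such
+                # as "defaults,uid=0,gid=0,umask=077,shortname=winnt" would be turned
+                # into
+                #     {"uid": 0, "gid": 0, "umask": "077", "shortname": "winnt", "readonly": True}
+                # and it is this dictionary that gets validated below.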
+ jsonschema.validate(options, info.get_schema()["properties"]["options"]) + + # Finally mount + mnt_kwargs = { + "name": part_device, + "info": info, + # retrieves the associated Device Object + "device": devices_map[part_device], + "target": part_mountpoint, + "options": options, + } + # XXX: remove inspect and just add once osbuild PR#1501 is merged + import inspect + if "partition" in inspect.getfullargspec(mounts.Mount).args: + # Just a filesystem, no partitions on this device + mnt_kwargs["partition"] = None + mmgr.mount(mounts.Mount(**mnt_kwargs)) + if not root_tree: + raise RuntimeError("The root filesystem tree is not mounted") + + append_filesystem(report, root_tree) + + +def analyse_image(image) -> Dict[str, Any]: + imgfmt = read_image_format(image) + report: Dict[str, Any] = {"image-format": imgfmt} + + with convert_image(image, imgfmt) as target: + size = os.stat(target).st_size + with host.ServiceManager(monitor=monitor.NullMonitor(1)) as mgr: + device = loop_open( + devices.DeviceManager(mgr, "/dev", os.path.dirname(target)), + os.path.basename(target), + target, + size, + offset=0)["path"] + report["bootloader"] = read_bootloader_type(device) + report.update(read_partition_table(device)) + if not report["partition-table"]: + # no partition table: mount device and treat it as a partition + with mount(device) as tree: + append_filesystem(report, tree) + return report + + # close loop device and descend into partitions on image file + append_partitions(report, target) + return report + + +def append_directory(report, tree): + with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir: + tree_ro = os.path.join(tmpdir, "root_ro") + os.makedirs(tree_ro) + # Make sure that the tools which analyse the directory in-place + # can not modify its content (e.g. create additional files). + # mount_at() always mounts the source as read-only! 
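+        # Sketch of what this does under the hood: roughly equivalent to running
+        #     mount -o ro,bind <tree> <tmpdir>/root_ro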
+ with mount_at(tree, tree_ro, ["bind"]) as _: + if os.path.lexists(f"{tree}/ostree"): + os.makedirs(f"{tree}/etc", exist_ok=True) + with mount_at(f"{tree}/usr/etc", f"{tree}/etc", extra=["--bind"]): + append_filesystem(report, tree_ro, is_ostree=True) + else: + append_filesystem(report, tree_ro) + + +def append_ostree_repo(report, repo): + ostree = functools.partial(run_ostree, repo=repo) + + r = ostree("config", "get", "core.mode") + report["ostree"] = { + "repo": { + "core.mode": r.stdout.strip() + } + } + + r = ostree("refs") + refs = r.stdout.strip().split("\n") + report["ostree"]["refs"] = refs + + resolved = {r: ostree("rev-parse", r).stdout.strip() for r in refs} + commit = resolved[refs[0]] + + with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir: + tree = os.path.join(tmpdir, "tree") + ostree("checkout", "--force-copy", commit, tree) + append_directory(report, tree) + + +def analyse_directory(path): + report = {} + + if os.path.exists(os.path.join(path, "compose.json")): + report["type"] = "ostree/commit" + repo = os.path.join(path, "repo") + append_ostree_repo(report, repo) + elif os.path.isdir(os.path.join(path, "refs")): + report["type"] = "ostree/repo" + repo = os.path.join(path, "repo") + append_ostree_repo(report, repo) + else: + append_directory(report, path) + + return report + + +def is_tarball(path): + mtype, _ = mimetypes.guess_type(path) + return mtype == "application/x-tar" + + +def analyse_tarball(path): + with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir: + tree = os.path.join(tmpdir, "root") + os.makedirs(tree) + command = [ + "tar", + "--selinux", + "--xattrs", + "--acls", + "-x", + "--auto-compress", + "-f", path, + "-C", tree + ] + subprocess.run(command, + stdout=sys.stderr, + check=True) + # gce image type contains virtual raw disk inside a tarball + if os.path.isfile(f"{tree}/disk.raw"): + return analyse_image(f"{tree}/disk.raw") + else: + return analyse_directory(tree) + + +def is_compressed(path): + _, encoding = mimetypes.guess_type(path) + return encoding in ["xz", "gzip", "bzip2"] + + +def analyse_compressed(path): + _, encoding = mimetypes.guess_type(path) + + if encoding == "xz": + command = ["unxz", "--force"] + elif encoding == "gzip": + command = ["gunzip", "--force"] + elif encoding == "bzip2": + command = ["bunzip2", "--force"] + else: + raise ValueError(f"Unsupported compression: {encoding}") + + with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir: + subprocess.run(["cp", "--reflink=auto", "-a", path, tmpdir], + check=True) + + files = os.listdir(tmpdir) + archive = os.path.join(tmpdir, files[0]) + subprocess.run(command + [archive], check=True) + + files = os.listdir(tmpdir) + assert len(files) == 1 + image = os.path.join(tmpdir, files[0]) + return analyse_image(image) + + +def is_iso(path): + return "iso" in pathlib.Path(path).suffix + + +def analyse_iso(path): + with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp: + report = None + subprocess.run(["mount", "-o", "loop", path, tmp], check=True) + try: + report = analyse_tarball(os.path.join(tmp, "liveimg.tar.gz")) + except Exception as e: + print(f"{e}", file=sys.stderr) + subprocess.run(["umount", tmp], check=True) + return report + + +def main(): + parser = argparse.ArgumentParser(description="Inspect an image") + parser.add_argument("target", metavar="TARGET", + help="The file or directory to analyse", + type=os.path.abspath) + + args = parser.parse_args() + target = args.target + + if os.path.isdir(target): + report = analyse_directory(target) + elif 
is_tarball(target): + report = analyse_tarball(target) + elif is_compressed(target): + report = analyse_compressed(target) + elif is_iso(target): + report = analyse_iso(target) + else: + report = analyse_image(target) + + json.dump(report, sys.stdout, sort_keys=True, indent=2) + + +if __name__ == "__main__": + main()
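+
+
+# Example invocation (illustrative only; the image path and output file are
+# hypothetical). The tool has to run as root, since it sets up loop devices and
+# mounts filesystems; the JSON report is written to stdout:
+#
+#     sudo ./tools/osbuild-image-info /path/to/disk.qcow2 > image-info.json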