#!/usr/bin/python3 import argparse import configparser import contextlib import functools import glob import json import mimetypes import operator import os import pathlib import platform import re import stat import subprocess import sys import tempfile import time import xml.etree.ElementTree from collections import OrderedDict from typing import Any, Dict, List import jsonschema import yaml from osbuild import devices, host, meta, monitor, mounts index = meta.Index("/usr/lib/osbuild/") SECTOR_SIZE = 512 def run_ostree(*args, _input=None, _check=True, **kwargs): args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()] print("ostree " + " ".join(args), file=sys.stderr) res = subprocess.run(["ostree"] + args, encoding="utf-8", stdout=subprocess.PIPE, input=_input, check=_check) return res class OSBuildDeviceManager(devices.DeviceManager): """ Thin wrapper around the DeviceManager for opening devices """ def open_loopback(self, name, image, size, offset=0) -> Dict[str, Any]: """ Uses a DeviceManager to open the `name` at `offset`. Returns a Device object and the path onto which the image was loopback mounted. 
""" info = index.get_module_info("Device", "org.osbuild.loopback") if not info: raise RuntimeError("Can't load org.osbuild.loopback") fname = os.path.basename(image) options = { "filename": fname, "start": offset // SECTOR_SIZE, "size": size // SECTOR_SIZE } jsonschema.validate(options, info.get_schema()) dev = devices.Device(name, info, None, options) reply = self.open(dev) return { "Device": dev, "path": os.path.join(self.devpath, reply["path"]) } def open_lvm_lv(self, lv_name: str, parent: devices.Device): """ Open a logical volume and return the path to the device node """ info = index.get_module_info("Device", "org.osbuild.lvm2.lv") if not info: raise RuntimeError("can't find org.osbuild.lvm2.lv") options = { "volume": lv_name, } jsonschema.validate(options, info.get_schema()) dev = devices.Device(lv_name, info, parent, options) reply = self.open(dev) return { "Device": dev, "path": os.path.join(self.devpath, reply["path"]) } @contextlib.contextmanager def convert_image(image, fmt): with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp: if fmt["type"] != "raw": target = os.path.join(tmp, "image.raw") # A bug exists in qemu that causes the conversion to raw to fail # on aarch64 systems with a LOT of CPUs. A workaround is to use # a single coroutine to do the conversion. It doesn't slow down # the conversion by much, but it hangs about half the time without # the limit set. 
😢 # Bug: https://bugs.launchpad.net/qemu/+bug/1805256 if platform.machine() == 'aarch64': subprocess.run( ["qemu-img", "convert", "-m", "1", "-O", "raw", image, target], check=True ) else: subprocess.run( ["qemu-img", "convert", "-O", "raw", image, target], check=True ) else: target = image yield target @contextlib.contextmanager def mount_at(device, mountpoint, options=None, extra=None): if options is None: options = [] if extra is None: extra = [] opts = ",".join(["ro"] + options) subprocess.run(["mount", "-o", opts] + extra + [device, mountpoint], check=True) try: yield mountpoint finally: subprocess.run(["umount", "--lazy", mountpoint], check=True) @contextlib.contextmanager def mount(device, options=None): options = options or [] opts = ",".join(["ro"] + options) with tempfile.TemporaryDirectory() as mountpoint: subprocess.run(["mount", "-o", opts, device, mountpoint], check=True) try: yield mountpoint finally: subprocess.run(["umount", "--lazy", mountpoint], check=True) def parse_environment_vars(s): r = {} for line in s.split("\n"): line = line.strip() if not line: continue if line[0] == '#': continue key, value = line.split("=", 1) r[key] = value.strip('"') return r # Parses output of `systemctl list-unit-files` def parse_unit_files(s, expected_state): r = [] for line in s.split("\n")[1:]: state = "" unit = "" try: unit, state, *_ = line.split() except ValueError: pass if state != expected_state: continue r.append(unit) return r def subprocess_check_output(argv, parse_fn=None) -> Any: try: output = subprocess.check_output(argv, encoding="utf-8") except subprocess.CalledProcessError as e: sys.stderr.write(f"--- Output from {argv}:\n") sys.stderr.write(e.stdout) sys.stderr.write("\n--- End of the output\n") raise return parse_fn(output) if parse_fn else output def read_container_images(tree): """ Read installed containers Returns: a dictionary listing the container images in the format like `podman images --format json` but with less information. 
NB: The parsing is done "manually" since running `podman` in the chroot does not work. """ images = [] images_index = os.path.join("overlay-images", "images.json") for d in ("/var/lib/containers/storage", ): path = os.path.join(tree, d.lstrip("/"), images_index) try: with open(path, "r", encoding="utf-8") as f: data = json.load(f) except FileNotFoundError: continue for image in data: img = { "Id": image["id"], "Digest": image["digest"], "Names": image["names"], } created = image.get("created") if created: img["Created"] = created images.append(img) return images def read_image_format(device) -> Dict[str, str]: """ Read image format. Returns: dictionary with at least one key 'type'. 'type' value is a string representing the format of the image. In case the type is 'qcow2', the returned dictionary contains second key 'compat' with a string value representing the compatibility version of the 'qcow2' image. An example return value: { "compat": "1.1", "type": "qcow2" } """ qemu = subprocess_check_output(["qemu-img", "info", "--output=json", device], json.loads) image_format = qemu["format"] result = {"type": image_format} if image_format == "qcow2": result["compat"] = qemu["format-specific"]["data"]["compat"] return result def read_partition(device, partition): """ Read block device attributes using 'blkid' and extend the passed 'partition' dictionary. Returns: the 'partition' dictionary provided as an argument, extended with 'label', 'uuid' and 'fstype' keys and their values. 
""" res = subprocess.run(["blkid", "-c", "/dev/null", "--output", "export", device], check=False, encoding="utf-8", stdout=subprocess.PIPE) if res.returncode == 0: blkid = parse_environment_vars(res.stdout) else: blkid = {} partition["label"] = blkid.get("LABEL") # doesn't exist for mbr partition["uuid"] = blkid.get("UUID") partition["fstype"] = blkid.get("TYPE") return partition def read_partition_table(device): """ Read information related to found partitions and partitioning table from the device. Returns: dictionary with three keys - 'partition-table', 'partition-table-id' and 'partitions'. 'partition-table' value is a string with the type of the partition table or 'None'. 'partition-table-id' value is a string with the ID of the partition table or 'None'. 'partitions' value is a list of dictionaries representing found partitions. An example return value: { "partition-table": "gpt", "partition-table-id": "DA237A6F-F0D4-47DF-BB50-007E00DB347C", "partitions": [ { "bootable": false, "partuuid": "64AF1EC2-0328-406A-8F36-83016E6DD858", "size": 1048576, "start": 1048576, "type": "21686148-6449-6E6F-744E-656564454649", }, { "bootable": false, "partuuid": "D650D523-06F6-4B90-9204-8F998FE9703C", "size": 6442450944, "start": 2097152, "type": "0FC63DAF-8483-4772-8E79-3D69D8477DE4", } ] } """ partitions = [] info = {"partition-table": None, "partition-table-id": None, "partitions": partitions} try: sfdisk = subprocess_check_output(["sfdisk", "--json", device], json.loads) except subprocess.CalledProcessError: # This handles a case, when the device does contain a filesystem, # but there is no partition table. partitions.append(read_partition(device, {})) return info ptable = sfdisk["partitiontable"] assert ptable["unit"] == "sectors" is_dos = ptable["label"] == "dos" ssize = ptable.get("sectorsize", SECTOR_SIZE) for i, p in enumerate(ptable["partitions"]): partuuid = p.get("uuid") if not partuuid and is_dos: # For dos/mbr partition layouts the partition uuid # is generated. 
Normally this would be done by # udev+blkid, when the partition table is scanned. # 'sfdisk' prefixes the partition id with '0x' but # 'blkid' does not; remove it to mimic 'blkid' table_id = ptable['id'][2:] partuuid = f"{table_id:.33s}-{i + 1:02x}" partitions.append({ "bootable": p.get("bootable", False), "type": p["type"], "start": p["start"] * ssize, "size": p["size"] * ssize, "partuuid": partuuid }) info["partitions"] = sorted(info["partitions"], key=operator.itemgetter("partuuid")) info["partition-table"] = ptable["label"] info["partition-table-id"] = ptable["id"] return info def read_bootloader_type(device) -> str: """ Read bootloader type from the provided device. Returns: string representing the found bootloader. Function can return two values: - 'grub' - 'unknown' """ with open(device, "rb") as f: if b"GRUB" in f.read(SECTOR_SIZE): return "grub" return "unknown" def read_boot_entries(boot_dir): """ Read boot entries. Returns: list of dictionaries representing configured boot entries. 
An example return value: [ { "grub_arg": "--unrestricted", "grub_class": "kernel", "grub_users": "$grub_users", "id": "rhel-20210429130346-0-rescue-c116920b13f44c59846f90b1057605bc", "initrd": "/boot/initramfs-0-rescue-c116920b13f44c59846f90b1057605bc.img", "linux": "/boot/vmlinuz-0-rescue-c116920b13f44c59846f90b1057605bc", "options": "$kernelopts", "title": "Red Hat Enterprise Linux (0-rescue-c116920b13f44c59846f90b1057605bc) 8.4 (Ootpa)", "version": "0-rescue-c116920b13f44c59846f90b1057605bc" }, { "grub_arg": "--unrestricted", "grub_class": "kernel", "grub_users": "$grub_users", "id": "rhel-20210429130346-4.18.0-305.el8.x86_64", "initrd": "/boot/initramfs-4.18.0-305.el8.x86_64.img $tuned_initrd", "linux": "/boot/vmlinuz-4.18.0-305.el8.x86_64", "options": "$kernelopts $tuned_params", "title": "Red Hat Enterprise Linux (4.18.0-305.el8.x86_64) 8.4 (Ootpa)", "version": "4.18.0-305.el8.x86_64" } ] """ entries = [] for conf in glob.glob(f"{boot_dir}/loader/entries/*.conf"): with open(conf, encoding="utf-8") as f: entry = {} for line in f: line = line.strip() if not line or line.startswith("#"): continue key, value = line.split(" ", 1) entry[key] = value entries.append(entry) return sorted(entries, key=lambda e: e["title"]) def rpm_verify(tree): """ Read the output of 'rpm --verify'. Returns: dictionary with two keys 'changed' and 'missing'. 'changed' value is a dictionary with the keys representing modified files from installed RPM packages and values representing types of applied modifications. 'missing' value is a list of strings representing missing values owned by installed RPM packages. 
An example return value: { "changed": { "/etc/chrony.conf": "S.5....T.", "/etc/cloud/cloud.cfg": "S.5....T.", "/etc/nsswitch.conf": "....L....", "/etc/openldap/ldap.conf": ".......T.", "/etc/pam.d/fingerprint-auth": "....L....", "/etc/pam.d/password-auth": "....L....", "/etc/pam.d/postlogin": "....L....", "/etc/pam.d/smartcard-auth": "....L....", "/etc/pam.d/system-auth": "....L....", "/etc/rhsm/rhsm.conf": "..5....T.", "/etc/sudoers": "S.5....T.", "/etc/systemd/logind.conf": "S.5....T." }, "missing": [ "/etc/udev/rules.d/70-persistent-ipoib.rules", "/run/cloud-init", "/run/rpcbind", "/run/setrans", "/run/tuned" ] } """ # cannot use `rpm --root` here, because rpm uses passwd from the host to # verify user and group ownership: # https://github.com/rpm-software-management/rpm/issues/882 rpm = subprocess.Popen(["chroot", tree, "rpm", "--verify", "--all"], stdout=subprocess.PIPE, encoding="utf-8") changed = {} missing = [] if rpm.stdout: for line in rpm.stdout: # format description in rpm(8), under `--verify` attrs = line[:9] if attrs == "missing ": missing.append(line[12:].rstrip()) else: changed[line[13:].rstrip()] = attrs # ignore return value, because it returns non-zero when it found changes rpm.wait() return { "missing": sorted(missing), "changed": changed } def rpm_not_installed_docs(tree): """ Gathers information on documentation, which is part of RPM packages, but was not installed. Returns: list of documentation files, which are normally a part of the installed RPM packages, but were not installed (e.g. due to using '--excludedocs' option when executing 'rpm' command). An example return value: [ "/usr/share/man/man1/sdiff.1.gz", "/usr/share/man/man1/seapplet.1.gz", "/usr/share/man/man1/secon.1.gz", "/usr/share/man/man1/secret-tool.1.gz", "/usr/share/man/man1/sed.1.gz", "/usr/share/man/man1/seq.1.gz" ] """ # check not installed Docs (e.g. 
when RPMs are installed with --excludedocs) not_installed_docs = [] cmd = ["rpm", "--root", tree, "-qad", "--state"] if os.path.exists(os.path.join(tree, "usr/share/rpm")): cmd += ["--dbpath", "/usr/share/rpm"] elif os.path.exists(os.path.join(tree, "var/lib/rpm")): cmd += ["--dbpath", "/var/lib/rpm"] output = subprocess_check_output(cmd) for line in output.splitlines(): if line.startswith("not installed"): not_installed_docs.append(line.split()[-1]) return sorted(not_installed_docs) def rpm_packages(tree): """ Read NVRs of RPM packages installed on the system. Returns: sorted list of strings representing RPM packages installed on the system. An example return value: [ "NetworkManager-1.30.0-7.el8.x86_64", "PackageKit-glib-1.1.12-6.el8.x86_64", "PackageKit-gtk3-module-1.1.12-6.el8.x86_64", "abattis-cantarell-fonts-0.0.25-6.el8.noarch", "acl-2.2.53-1.el8.x86_64", "adobe-mappings-cmap-20171205-3.el8.noarch", "adobe-mappings-cmap-deprecated-20171205-3.el8.noarch", "adobe-mappings-pdf-20180407-1.el8.noarch", "adwaita-cursor-theme-3.28.0-2.el8.noarch", "adwaita-icon-theme-3.28.0-2.el8.noarch", "alsa-lib-1.2.4-5.el8.x86_64" ] """ cmd = ["rpm", "--root", tree, "-qa"] if os.path.exists(os.path.join(tree, "usr/share/rpm")): cmd += ["--dbpath", "/usr/share/rpm"] elif os.path.exists(os.path.join(tree, "var/lib/rpm")): cmd += ["--dbpath", "/var/lib/rpm"] subprocess_check_output(cmd) pkgs = subprocess_check_output(cmd, str.split) return list(sorted(pkgs)) @contextlib.contextmanager def change_root(root): real_root = os.open("/", os.O_RDONLY) try: os.chroot(root) yield None finally: os.fchdir(real_root) os.chroot(".") os.close(real_root) def read_services(tree, state): """ Read the list of systemd services on the system in the given state. Returns: alphabetically sorted list of strings representing systemd services in the given state. The returned list may be empty. 
An example return value: [ "arp-ethers.service", "canberra-system-bootup.service", "canberra-system-shutdown-reboot.service", "canberra-system-shutdown.service", "chrony-dnssrv@.timer", "chrony-wait.service" ] """ services_state = subprocess_check_output( ["systemctl", f"--root={tree}", "list-unit-files"], (lambda s: parse_unit_files(s, state))) # Since systemd v246, some services previously reported as "enabled" / # "disabled" are now reported as "alias". There is no systemd command, that # would take an "alias" unit and report its state as enabled/disabled # and could run on a different tree (with "--root" option). # To make the produced list of services in the given state consistent on # pre/post v246 systemd versions, check all "alias" units and append them # to the list, if their target is also listed in 'services_state'. if state != "alias": services_alias = subprocess_check_output( ["systemctl", f"--root={tree}", "list-unit-files"], (lambda s: parse_unit_files(s, "alias"))) for alias in services_alias: # The service may be in one of the following places (output of # "systemd-analyze unit-paths", it should not change too often). unit_paths = [ "/etc/systemd/system.control", "/run/systemd/system.control", "/run/systemd/transient", "/run/systemd/generator.early", "/etc/systemd/system", "/run/systemd/system", "/run/systemd/generator", "/usr/local/lib/systemd/system", "/usr/lib/systemd/system", "/run/systemd/generator.late" ] with change_root(tree): for path in unit_paths: unit_path = os.path.join(path, alias) if os.path.exists(unit_path): real_unit_path = os.path.realpath(unit_path) # Skip the alias, if there was a symlink cycle. # When symbolic link cycles occur, the returned path will # be one member of the cycle, but no guarantee is made about # which member that will be. if os.path.islink(real_unit_path): continue # Append the alias unit to the list, if its target is # already there. 
if os.path.basename(real_unit_path) in services_state: services_state.append(alias) # deduplicate and sort services_state = list(set(services_state)) services_state.sort() return services_state def read_default_target(tree): """ Read the default systemd target. Returns: string representing the default systemd target. An example return value: "multi-user.target" """ try: return subprocess_check_output(["systemctl", f"--root={tree}", "get-default"]).rstrip() except subprocess.CalledProcessError: return "" def read_firewall_default_zone(tree): """ Read the name of the default firewall zone Returns: a string with the zone name. If the firewall configuration doesn't exist, an empty string is returned. An example return value: "trusted" """ try: with open(f"{tree}/etc/firewalld/firewalld.conf", encoding="utf-8") as f: conf = parse_environment_vars(f.read()) return conf["DefaultZone"] except FileNotFoundError: return "" def read_firewall_zone(tree): """ Read enabled services from the configuration of the default firewall zone. Returns: list of strings representing enabled services in the firewall. The returned list may be empty. An example return value: [ "ssh", "dhcpv6-client", "cockpit" ] """ default = read_firewall_default_zone(tree) if default == "": default = "public" r = [] try: root = xml.etree.ElementTree.parse(f"{tree}/etc/firewalld/zones/{default}.xml").getroot() except FileNotFoundError: root = xml.etree.ElementTree.parse(f"{tree}/usr/lib/firewalld/zones/{default}.xml").getroot() for element in root.findall("service"): r.append(element.get("name")) return r def read_fstab(tree): """ Read the content of /etc/fstab. Returns: list of all uncommented lines read from the configuration file represented as a list of values split by whitespaces. The returned list may be empty. 
An example return value: [ [ "UUID=6d066eb4-e4c1-4472-91f9-d167097f48d1", "/", "xfs", "defaults", "0", "0" ] ] """ result = [] with contextlib.suppress(FileNotFoundError): with open(f"{tree}/etc/fstab", encoding="utf-8") as f: result = sorted([line.split() for line in f if line.strip() and not line.startswith("#")]) return result def read_rhsm(tree): """ Read configuration changes possible via org.osbuild.rhsm stage and in addition also the whole content of /etc/rhsm/rhsm.conf. Returns: returns dictionary with two keys - 'dnf-plugins' and 'rhsm.conf'. 'dnf-plugins' value represents configuration of 'product-id' and 'subscription-manager' DNF plugins. 'rhsm.conf' value is a dictionary representing the content of the RHSM configuration file. The returned dictionary may be empty. An example return value: { "dnf-plugins": { "product-id": { "enabled": true }, "subscription-manager": { "enabled": true } }, "rhsm.conf": { "logging": { "default_log_level": "INFO" }, "rhsm": { "auto_enable_yum_plugins": "1", "baseurl": "https://cdn.redhat.com", "ca_cert_dir": "/etc/rhsm/ca/", "consumercertdir": "/etc/pki/consumer", "entitlementcertdir": "/etc/pki/entitlement", "full_refresh_on_yum": "0", "inotify": "1", "manage_repos": "0", "package_profile_on_trans": "0", "pluginconfdir": "/etc/rhsm/pluginconf.d", "plugindir": "/usr/share/rhsm-plugins", "productcertdir": "/etc/pki/product", "repo_ca_cert": "/etc/rhsm/ca/redhat-uep.pem", "repomd_gpg_url": "", "report_package_profile": "1" }, "rhsmcertd": { "auto_registration": "1", "auto_registration_interval": "60", "autoattachinterval": "1440", "certcheckinterval": "240", "disable": "0", "splay": "1" }, "server": { "hostname": "subscription.rhsm.redhat.com", "insecure": "0", "no_proxy": "", "port": "443", "prefix": "/subscription", "proxy_hostname": "", "proxy_password": "", "proxy_port": "", "proxy_scheme": "http", "proxy_user": "", "ssl_verify_depth": "3" } } } """ result = {} # Check RHSM DNF plugins configuration and allowed options 
dnf_plugins_config = { "product-id": f"{tree}/etc/dnf/plugins/product-id.conf", "subscription-manager": f"{tree}/etc/dnf/plugins/subscription-manager.conf" } for plugin_name, plugin_path in dnf_plugins_config.items(): with contextlib.suppress(FileNotFoundError): with open(plugin_path, encoding="utf-8") as f: parser = configparser.ConfigParser() parser.read_file(f) # only read "enabled" option from "main" section with contextlib.suppress(configparser.NoSectionError, configparser.NoOptionError): # get the value as the first thing, in case it raises an exception enabled = parser.getboolean("main", "enabled") try: dnf_plugins_dict = result["dnf-plugins"] except KeyError as _: dnf_plugins_dict = result["dnf-plugins"] = {} try: plugin_dict = dnf_plugins_dict[plugin_name] except KeyError as _: plugin_dict = dnf_plugins_dict[plugin_name] = {} plugin_dict["enabled"] = enabled with contextlib.suppress(FileNotFoundError): rhsm_conf = {} with open(f"{tree}/etc/rhsm/rhsm.conf", encoding="utf-8") as f: parser = configparser.ConfigParser() parser.read_file(f) for section in parser.sections(): section_dict = {} section_dict.update(parser[section]) if section_dict: rhsm_conf[section] = section_dict result["rhsm.conf"] = rhsm_conf return result def read_sysconfig(tree): """ Read selected configuration files from /etc/sysconfig. Currently supported sysconfig files are: - 'kernel' - /etc/sysconfig/kernel - 'network' - /etc/sysconfig/network - 'network-scripts' - /etc/sysconfig/network-scripts/ifcfg-* Returns: dictionary with the keys being the supported types of sysconfig configurations read by the function. Values of 'kernel' and 'network' keys are a dictionaries containing key/values read from the respective configuration files. Value of 'network-scripts' key is a dictionary with the keys corresponding to the suffix of each 'ifcfg-*' configuration file and their values holding dictionaries with all key/values read from the configuration file. The returned dictionary may be empty. 
An example return value: { "kernel": { "DEFAULTKERNEL": "kernel", "UPDATEDEFAULT": "yes" }, "network": { "NETWORKING": "yes", "NOZEROCONF": "yes" }, "network-scripts": { "ens3": { "BOOTPROTO": "dhcp", "BROWSER_ONLY": "no", "DEFROUTE": "yes", "DEVICE": "ens3", "IPV4_FAILURE_FATAL": "no", "IPV6INIT": "yes", "IPV6_AUTOCONF": "yes", "IPV6_DEFROUTE": "yes", "IPV6_FAILURE_FATAL": "no", "NAME": "ens3", "ONBOOT": "yes", "PROXY_METHOD": "none", "TYPE": "Ethernet", "UUID": "106f1b31-7093-41d6-ae47-1201710d0447" }, "eth0": { "BOOTPROTO": "dhcp", "DEVICE": "eth0", "IPV6INIT": "no", "ONBOOT": "yes", "PEERDNS": "yes", "TYPE": "Ethernet", "USERCTL": "yes" } } } """ result = {} sysconfig_paths = { "kernel": f"{tree}/etc/sysconfig/kernel", "network": f"{tree}/etc/sysconfig/network" } # iterate through supported configs for name, path in sysconfig_paths.items(): with contextlib.suppress(FileNotFoundError): with open(path, encoding="utf-8") as f: # if file exists start with empty array of values result[name] = parse_environment_vars(f.read()) # iterate through all files in /etc/sysconfig/network-scripts network_scripts = {} files = glob.glob(f"{tree}/etc/sysconfig/network-scripts/ifcfg-*") for file in files: ifname = os.path.basename(file).lstrip("ifcfg-") with open(file, encoding="utf-8") as f: network_scripts[ifname] = parse_environment_vars(f.read()) if network_scripts: result["network-scripts"] = network_scripts return result def read_hosts(tree): """ Read non-empty lines of /etc/hosts. Returns: list of strings for all uncommented lines in the configuration file. The returned list may be empty. 
An example return value: [ "127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4", "::1 localhost localhost.localdomain localhost6 localhost6.localdomain6" ] """ result = [] with contextlib.suppress(FileNotFoundError): with open(f"{tree}/etc/hosts", encoding="utf-8") as f: for line in f: line = line.strip() if line: result.append(line) return result def read_logind_config(config_path): """ Read all uncommented key/values from the 'Login" section of system-logind configuration file. Returns: dictionary with key/values read from the configuration file. The returned dictionary may be empty. An example return value: { "NAutoVTs": "0" } """ result = {} with open(config_path, encoding="utf-8") as f: parser = configparser.RawConfigParser() # prevent conversion of the option name to lowercase parser.optionxform = lambda option: option parser.read_file(f) with contextlib.suppress(configparser.NoSectionError): result.update(parser["Login"]) return result def read_logind_configs(tree): """ Read all systemd-logind *.conf files from a predefined list of paths and parse them. The searched paths are: - "/etc/systemd/logind.conf" - "/etc/systemd/logind.conf.d/*.conf" - "/usr/lib/systemd/logind.conf.d/*.conf" Returns: dictionary as returned by '_read_glob_paths_with_parser()' with configuration representation as returned by 'read_logind_config()'. An example return value: { "/etc/systemd/logind.conf": { "NAutoVTs": "0" } } """ checked_globs = [ "/etc/systemd/logind.conf", "/etc/systemd/logind.conf.d/*.conf", "/usr/lib/systemd/logind.conf.d/*.conf" ] return _read_glob_paths_with_parser(tree, checked_globs, read_logind_config) def read_locale(tree): """ Read all uncommented key/values set in /etc/locale.conf. Returns: dictionary with key/values read from the configuration file. The returned dictionary may be empty. 
An example return value: { "LANG": "en_US" } """ with contextlib.suppress(FileNotFoundError): with open(f"{tree}/etc/locale.conf", encoding="utf-8") as f: return parse_environment_vars(f.read()) def read_selinux_info(tree, is_ostree): """ Read information related to SELinux. Returns: dictionary with two keys - 'policy' and 'context-mismatch'. 'policy' value corresponds to the value returned by read_selinux_conf(). 'context-mismatch' value corresponds to the value returned by read_selinux_ctx_mismatch(). The returned dictionary may be empty. Keys with empty values are omitted. An example return value: { "context-mismatch": [ { "actual": "system_u:object_r:root_t:s0", "expected": "system_u:object_r:device_t:s0", "filename": "/dev" }, { "actual": "system_u:object_r:root_t:s0", "expected": "system_u:object_r:default_t:s0", "filename": "/proc" } ], "policy": { "SELINUX": "permissive", "SELINUXTYPE": "targeted" } } """ result = {} policy = read_selinux_conf(tree) if policy: result["policy"] = policy with contextlib.suppress(subprocess.CalledProcessError): ctx_mismatch = read_selinux_ctx_mismatch(tree, is_ostree) if ctx_mismatch: result["context-mismatch"] = ctx_mismatch return result def read_selinux_conf(tree): """ Read all uncommented key/values set in /etc/selinux/config. Returns: dictionary with key/values read from the configuration file. An example of returned value: { "SELINUX": "enforcing", "SELINUXTYPE": "targeted" } """ with contextlib.suppress(FileNotFoundError): with open(f"{tree}/etc/selinux/config", encoding="utf-8") as f: return parse_environment_vars(f.read()) def read_selinux_ctx_mismatch(tree, is_ostree): """ Read any mismatch in selinux context of files on the image. Returns: list of dictionaries as described below. If there are no mismatches between used and expected selinux context, then an empty list is returned. If the checked 'tree' is ostree, then the path '/etc' is excluded from the check. 
This is beause it is bind-mounted from /usr/etc and therefore has incorrect selinux context for its filesystem path. An example of returned value: [ { "actual": "system_u:object_r:root_t:s0", "expected": "system_u:object_r:device_t:s0", "filename": "/dev" }, { "actual": "system_u:object_r:root_t:s0", "expected": "system_u:object_r:default_t:s0", "filename": "/proc" } ] """ result = [] # The binary policy that should be used is on the image and has name "policy.X" # where the "X" is a number. There may be more than one policy files. # In the usual case, the policy with the highest number suffix should be used. policy_files = glob.glob(f"{tree}/etc/selinux/targeted/policy/policy.*") policy_files = sorted(policy_files, reverse=True) if policy_files: CMD = [ "setfiles", "-r", f"{tree}", "-nvF", "-c", policy_files[0], # take the policy with the highest number f"{tree}/etc/selinux/targeted/contexts/files/file_contexts", f"{tree}" ] if is_ostree: # exclude /etc from being checked when the tree is ostree, because # it is just bind-mounted from /usr/etc and has incorrect selinux # context for /etc path CMD.extend(["-e", f"{tree}/etc"]) output = subprocess_check_output(CMD) # output are lines such as: # Would relabel /tmp/tmpwrozmb47/dev from system_u:object_r:root_t:s0 to system_u:object_r:device_t:s0\n setfiles_pattern = r"Would\s+relabel\s+(?P.+)\s+from\s+(?P.+)\s+to\s+(?P.+)" setfiles_re = re.compile(setfiles_pattern) # skip messages about old compiled fcontext format binary_fcontext_skip = rf"{tree}/etc/selinux/targeted/contexts/files/file_contexts.*\.bin:\s+Old compiled fcontext format, skipping" binary_fcontext_skip_re = re.compile(binary_fcontext_skip) for line in output.splitlines(): line = line.strip() if not line or binary_fcontext_skip_re.match(line): continue match = setfiles_re.match(line) # do not silently ignore changes of 'setfiles' output if not match: raise RuntimeError(f"could not match line '{line}' with pattern '{setfiles_pattern}'") parsed_line = { 
"filename": match.group("filename")[len(tree):], "actual": match.group("actual"), "expected": match.group("expected") } result.append(parsed_line) # sort the list to make it consistent across runs result.sort(key=lambda x: x.get("filename")) return result def _read_glob_paths_with_parser(tree, glob_paths, parser_func): """ Use 'parser_func' to read all files obtained by using all 'glob_paths' globbing patterns under the 'tree' path. The 'glob_paths' is a list string patterns accepted by glob.glob(). The 'parser_func' function is expected to take a single string argument containing the absolute path to a configuration file which should be parsed. Its return value can be arbitrary representation of the parsed configuration. Returns: dictionary with the keys corresponding to directories, which contain configuration files mathing the provided glob pattern. Value of each key is another dictionary with keys representing each filename and values being the parsed configuration representation as returned by the provided 'parser_func' function. An example return value for dracut configuration paths and parser: { "/etc/dracut.conf.d": { "sgdisk.conf": { "install_items": " sgdisk " }, }, "/usr/lib/dracut/dracut.conf.d": { "xen.conf": { "add_drivers": " xen-netfront xen-blkfront " } } } """ result = {} for glob_path in glob_paths: glob_path_result = {} files = glob.glob(f"{tree}{glob_path}") for file in files: config = parser_func(file) if config: filename = os.path.basename(file) glob_path_result[filename] = config if glob_path_result: checked_path = os.path.dirname(glob_path) result[checked_path] = glob_path_result return result def read_modprobe_config(config_path): """ Read a specific modprobe configuragion file and for now, extract only blacklisted kernel modules. Returns: dictionary with the keys corresponding to specific modprobe commands and values being the values of these commands. 
An example return value: { "blacklist": [ "nouveau" ] } """ file_result = {} BLACKLIST_CMD = "blacklist" with open(config_path, encoding="utf-8") as f: # The format of files under modprobe.d: one command per line, # with blank lines and lines starting with '#' ignored. # A '\' at the end of a line causes it to continue on the next line. line_to_be_continued = "" for line in f: line = line.strip() # line is not blank if line: # comment, skip it if line[0] == "#": continue # this line continues on the following line if line[-1] == "\\": line_to_be_continued += line[:-1] continue # this line ends here # is this line continuation of the previous one? if line_to_be_continued: line = line_to_be_continued + line line_to_be_continued = "" cmd, cmd_args = line.split(' ', 1) # we care only about blacklist command for now if cmd == BLACKLIST_CMD: modules_list = file_result[BLACKLIST_CMD] = [] modules_list.append(cmd_args) return file_result def read_modprobe_configs(tree): """ Read all modprobe *.conf files from a predefined list of paths and extract supported commands. For now, extract only blacklisted kernel modules. The searched paths are: - "/etc/modprobe.d/*.conf" - "/usr/lib/modprobe.d/*.conf" - "/usr/local/lib/modprobe.d/*.conf" Returns: dictionary as returned by '_read_glob_paths_with_parser()' with configuration representation as returned by 'read_modprobe_config()'. An example return value: { "/usr/lib/modprobe.d": { "blacklist-nouveau.conf": { "blacklist": [ "nouveau" ] } } } """ checked_globs = [ "/etc/modprobe.d/*.conf", "/usr/lib/modprobe.d/*.conf", "/usr/local/lib/modprobe.d/*.conf" ] return _read_glob_paths_with_parser(tree, checked_globs, read_modprobe_config) def read_cloud_init_config(config_path): """ Read the specific cloud-init configuration file. Returns: dictionary representing the cloud-init configuration. 
    An example return value:
    {
        "cloud_config_modules": [
            "mounts", "locale", "set-passwords", "rh_subscription",
            "yum-add-repo", "package-update-upgrade-install", "timezone",
            "puppet", "chef", "salt-minion", "mcollective",
            "disable-ec2-metadata", "runcmd"
        ],
        "cloud_final_modules": [
            "rightscale_userdata", "scripts-per-once", "scripts-per-boot",
            "scripts-per-instance", "scripts-user",
            "ssh-authkey-fingerprints", "keys-to-console", "phone-home",
            "final-message", "power-state-change"
        ],
        "cloud_init_modules": [
            "disk_setup", "migrator", "bootcmd", "write-files", "growpart",
            "resizefs", "set_hostname", "update_hostname",
            "update_etc_hosts", "rsyslog", "users-groups", "ssh"
        ],
        "disable_root": 1,
        "disable_vmware_customization": false,
        "mount_default_fields": [
            null, null, "auto",
            "defaults,nofail,x-systemd.requires=cloud-init.service",
            "0", "2"
        ],
        "resize_rootfs_tmp": "/dev",
        "ssh_deletekeys": 1,
        "ssh_genkeytypes": null,
        "ssh_pwauth": 0,
        "syslog_fix_perms": null,
        "system_info": {
            "default_user": {
                "gecos": "Cloud User",
                "groups": ["adm", "systemd-journal"],
                "lock_passwd": true,
                "name": "ec2-user",
                "shell": "/bin/bash",
                "sudo": ["ALL=(ALL) NOPASSWD:ALL"]
            },
            "distro": "rhel",
            "paths": {
                "cloud_dir": "/var/lib/cloud",
                "templates_dir": "/etc/cloud/templates"
            },
            "ssh_svcname": "sshd"
        },
        "users": ["default"]
    }
    """
    result = {}

    # a missing configuration file is tolerated; an empty dict is returned
    with contextlib.suppress(FileNotFoundError):
        with open(config_path, encoding="utf-8") as f:
            config = yaml.safe_load(f)
            result.update(config)

    return result


def read_cloud_init_configs(tree):
    """
    Read all cloud-init *.cfg files from a predefined list of paths and
    parse them.

    The searched paths are:
    - "/etc/cloud/cloud.cfg"
    - "/etc/cloud/cloud.cfg.d/*.cfg"

    Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
    configuration representation as returned by 'read_cloud_init_config()'.

    An example return value:
    {
        "/etc/cloud.cfg.d": {
            "ec2.cfg": {
                "default_user": {
                    "name": "ec2-user"
                }
            }
        }
    }
    """
    checked_globs = [
        "/etc/cloud/cloud.cfg",
        "/etc/cloud/cloud.cfg.d/*.cfg"
    ]

    return _read_glob_paths_with_parser(tree, checked_globs, read_cloud_init_config)


def read_dracut_config(config_path):
    """
    Read specific dracut configuration file.

    Returns: dictionary representing the uncommented configuration options
    read from the file.

    An example return value:
    {
        "install_items": " sgdisk ",
        "add_drivers": " xen-netfront xen-blkfront "
    }
    """
    result = {}

    with open(config_path, encoding="utf-8") as f:
        # dracut configuration key/values delimiter is '=' or '+='
        for line in f:
            line = line.strip()
            # A '#' indicates the beginning of a comment; following
            # characters, up to the end of the line are not interpreted.
            line_comment = line.split("#", 1)
            line = line_comment[0]
            if line:
                key, value = line.split("=", 1)
                # normalize '+=' assignments to the plain option name
                if key[-1] == "+":
                    key = key[:-1]
                result[key] = value.strip('"')

    return result


def read_dracut_configs(tree):
    """
    Read all dracut *.conf files from a predefined list of paths and parse
    them.

    The searched paths are:
    - "/etc/dracut.conf.d/*.conf"
    - "/usr/lib/dracut/dracut.conf.d/*.conf"

    Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
    configuration representation as returned by 'read_dracut_config()'.

    An example return value:
    {
        "/etc/dracut.conf.d": {
            "sgdisk.conf": {
                "install_items": " sgdisk "
            },
        },
        "/usr/lib/dracut/dracut.conf.d": {
            "xen.conf": {
                "add_drivers": " xen-netfront xen-blkfront "
            }
        }
    }
    """
    checked_globs = [
        "/etc/dracut.conf.d/*.conf",
        "/usr/lib/dracut/dracut.conf.d/*.conf"
    ]

    return _read_glob_paths_with_parser(tree, checked_globs, read_dracut_config)


def read_keyboard_conf(tree):
    """
    Read keyboard configuration for vconsole and X11.

    Returns: dictionary with at most two keys 'X11' and 'vconsole'.
    'vconsole' value is a dictionary representing configuration read from
    /etc/vconsole.conf.
    'X11' value is a dictionary with at most two keys 'layout' and 'variant',
    which values are extracted from X11 keyboard configuration.

    An example return value:
    {
        "X11": {
            "layout": "us"
        },
        "vconsole": {
            "FONT": "eurlatgr",
            "KEYMAP": "us"
        }
    }
    """
    result = {}

    # read virtual console configuration
    with contextlib.suppress(FileNotFoundError):
        with open(f"{tree}/etc/vconsole.conf", encoding="utf-8") as f:
            values = parse_environment_vars(f.read())
            if values:
                result["vconsole"] = values

    # read X11 keyboard configuration
    with contextlib.suppress(FileNotFoundError):
        # Example file content:
        #
        # Section "InputClass"
        # Identifier "system-keyboard"
        # MatchIsKeyboard "on"
        # Option "XkbLayout" "us,sk"
        # Option "XkbVariant" ",qwerty"
        # EndSection
        x11_config = {}
        match_options_dict = {
            "layout": r'Section\s+"InputClass"\s+.*Option\s+"XkbLayout"\s+"([\w,-]+)"\s+.*EndSection',
            "variant": r'Section\s+"InputClass"\s+.*Option\s+"XkbVariant"\s+"([\w,-]+)"\s+.*EndSection'
        }
        with open(f"{tree}/etc/X11/xorg.conf.d/00-keyboard.conf", encoding="utf-8") as f:
            config = f.read()
            # re.DOTALL: the section spans multiple lines
            for option, pattern in match_options_dict.items():
                match = re.search(pattern, config, re.DOTALL)
                if match and match.group(1):
                    x11_config[option] = match.group(1)
        if x11_config:
            result["X11"] = x11_config

    return result


def read_chrony_conf(tree):
    """
    Read specific directives from Chrony configuration. Currently parsed
    directives are:
    - 'server'
    - 'pool'
    - 'peer'
    - 'leapsectz'

    Returns: dictionary with the keys representing parsed directives from
    Chrony configuration. Value of each key is a list of strings containing
    arguments provided with each occurrence of the directive in the
    configuration.

    An example return value:
    {
        "leapsectz": [
            "right/UTC"
        ],
        "pool": [
            "2.rhel.pool.ntp.org iburst"
        ],
        "server": [
            "169.254.169.123 prefer iburst minpoll 4 maxpoll 4"
        ]
    }
    """
    result = {}

    parsed_directives = ["server", "pool", "peer", "leapsectz"]

    with contextlib.suppress(FileNotFoundError):
        with open(f"{tree}/etc/chrony.conf", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                # skip comments
                if line[0] in ["!", ";", "#", "%"]:
                    continue
                split_line = line.split()
                if split_line[0] in parsed_directives:
                    # create the per-directive list on first occurrence
                    try:
                        directive_list = result[split_line[0]]
                    except KeyError:
                        directive_list = result[split_line[0]] = []
                    directive_list.append(" ".join(split_line[1:]))

    return result


def read_systemd_service_dropin(dropin_dir_path):
    """
    Read systemd .service unit drop-in configurations.

    Returns: dictionary representing the combined drop-in configurations.

    An example return value:
    {
        "Service": {
            "Environment": "NM_CLOUD_SETUP_EC2=yes"
        }
    }

    NOTE(review): the code maps each drop-in file name to a list of
    uncommented lines, which does not match the INI-section shape shown in
    the example above — confirm which representation is intended.
    """
    # read all unit drop-in configurations
    config_files = glob.glob(f"{dropin_dir_path}/*.conf")
    dropin_config = {}
    for file in config_files:
        dropin_config[os.path.basename(file)] = read_config_file_no_comment(file)
    return dropin_config


def read_systemd_service_dropins(tree):
    """
    Read all systemd .service unit config files from a predefined list of
    paths and parse them.

    The searched paths are:
    - "/etc/systemd/system/*.service.d"
    - "/usr/lib/systemd/system/*.service.d"

    Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
    configuration representation as returned by
    'read_systemd_service_dropin()'.

    An example return value:
    {
        "/etc/systemd/system": {
            "nm-cloud-setup.service.d": {
                "Service": {
                    "Environment": "NM_CLOUD_SETUP_EC2=yes"
                }
            }
        }
    }
    """
    checked_globs = [
        "/etc/systemd/system/*.service.d",
        "/usr/lib/systemd/system/*.service.d"
    ]

    return _read_glob_paths_with_parser(tree, checked_globs, read_systemd_service_dropin)


def read_config_file_no_comment(config_path):
    """
    Read configuration files.
    Returns: list of strings representing uncommented lines read from the
    configuration file.

    An example return value:
    [
        "x /tmp/.sap*",
        "x /tmp/.hdb*lock",
        "x /tmp/.trex*lock"
    ]
    """
    file_lines = []

    with open(config_path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # skip comments
            if line[0] == "#":
                continue
            file_lines.append(line)

    return file_lines


def read_tmpfilesd_configs(tree):
    """
    Read all tmpfiles.d *.conf files from a predefined list of paths and
    parse them.

    The searched paths are:
    - "/etc/tmpfiles.d/*.conf"
    - "/usr/lib/tmpfiles.d/*.conf"

    Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
    configuration representation as returned by
    'read_config_file_no_comment()'.

    An example return value:
    {
        "/etc/tmpfiles.d": {
            "sap.conf": [
                "x /tmp/.sap*",
                "x /tmp/.hdb*lock",
                "x /tmp/.trex*lock"
            ]
        }
    }
    """
    checked_globs = [
        "/etc/tmpfiles.d/*.conf",
        "/usr/lib/tmpfiles.d/*.conf"
    ]

    return _read_glob_paths_with_parser(tree, checked_globs, read_config_file_no_comment)


def read_tuned_profile(tree):
    """
    Read the Tuned active profile and profile mode.

    Returns: dictionary with at most two keys 'active_profile' and
    'profile_mode'. Value of each key is a string representing respective
    tuned configuration value.

    An example return value:
    {
        "active_profile": "sap-hana",
        "profile_mode": "manual"
    }
    """
    result = {}

    config_files = ["active_profile", "profile_mode"]

    # either file may be missing; report only the values that exist
    with contextlib.suppress(FileNotFoundError):
        for config_file in config_files:
            with open(f"{tree}/etc/tuned/{config_file}", encoding="utf-8") as f:
                value = f.read()
                value = value.strip()
                if value:
                    result[config_file] = value

    return result


def read_sysctld_config(config_path):
    """
    Read sysctl configuration file.

    Returns: list of strings representing uncommented lines read from the
    configuration file.

    An example return value:
    [
        "kernel.pid_max = 4194304",
        "vm.max_map_count = 2147483647"
    ]
    """
    values = []

    with open(config_path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # skip comments
            if line[0] in ["#", ";"]:
                continue
            values.append(line)

    return values


def read_sysctld_configs(tree):
    """
    Read all sysctl.d *.conf files from a predefined list of paths and parse
    them.

    The searched paths are:
    - "/etc/sysctl.d/*.conf",
    - "/usr/lib/sysctl.d/*.conf"

    Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
    configuration representation as returned by 'read_sysctld_config()'.

    An example return value:
    {
        "/etc/sysctl.d": {
            "sap.conf": [
                "kernel.pid_max = 4194304",
                "vm.max_map_count = 2147483647"
            ]
        }
    }
    """
    checked_globs = [
        "/etc/sysctl.d/*.conf",
        "/usr/lib/sysctl.d/*.conf"
    ]

    return _read_glob_paths_with_parser(tree, checked_globs, read_sysctld_config)


def read_security_limits_config(config_path):
    """
    Read a configuration file from /etc/security/limits.d.

    Returns: list of dictionaries, one per limits entry, each with the keys
    'domain', 'type', 'item' and 'value' holding the uncommented
    configuration values read from the configuration file.

    An example return value:
    [
        {
            "domain": "@sapsys",
            "item": "nofile",
            "type": "hard",
            "value": "65536"
        },
        {
            "domain": "@sapsys",
            "item": "nofile",
            "type": "soft",
            "value": "65536"
        }
    ]
    """
    values = []

    with open(config_path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            # the '#' character introduces a comment - after which the rest of the line is ignored
            split_line = line.split("#", 1)
            line = split_line[0]
            if not line:
                continue
            # Syntax of a line is "domain type item value"
            domain, limit_type, item, value = line.split()
            values.append({
                "domain": domain,
                "type": limit_type,
                "item": item,
                "value": value
            })

    return values


def read_security_limits_configs(tree):
    """
    Read all security limits *.conf files from a predefined list of paths
    and parse them.
The searched paths are: - "/etc/security/limits.conf" - "/etc/security/limits.d/*.conf" Returns: dictionary as returned by '_read_glob_paths_with_parser()' with configuration representation as returned by 'read_security_limits_config()'. An example return value: { "/etc/security/limits.d": { "99-sap.conf": [ { "domain": "@sapsys", "item": "nofile", "type": "hard", "value": "65536" }, { "domain": "@sapsys", "item": "nofile", "type": "soft", "value": "65536" } ] } } """ checked_globs = [ "/etc/security/limits.conf", "/etc/security/limits.d/*.conf" ] return _read_glob_paths_with_parser(tree, checked_globs, read_config_file_no_comment) def read_ssh_config(config_path): """ Read the content of provided SSH(d) configuration file. Returns: list of uncommented and non-empty lines read from the configuation file. An example return value: [ "Match final all", "Include /etc/crypto-policies/back-ends/openssh.config", "GSSAPIAuthentication yes", "ForwardX11Trusted yes", "SendEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES", "SendEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT", "SendEnv LC_IDENTIFICATION LC_ALL LANGUAGE", "SendEnv XMODIFIERS" ] """ config_lines = [] with open(config_path, encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue if line[0] == "#": continue config_lines.append(line) return config_lines def read_ssh_configs(tree): """ Read all SSH configuration files from a predefined list of paths and parse them. The searched paths are: - "/etc/ssh/ssh_config" - "/etc/ssh/ssh_config.d/*.conf" Returns: dictionary as returned by '_read_glob_paths_with_parser()' with configuration representation as returned by 'read_ssh_config()'. 
An example return value: { "/etc/ssh": { "ssh_config": [ "Include /etc/ssh/ssh_config.d/*.conf" ] }, "/etc/ssh/ssh_config.d": { "05-redhat.conf": [ "Match final all", "Include /etc/crypto-policies/back-ends/openssh.config", "GSSAPIAuthentication yes", "ForwardX11Trusted yes", "SendEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES", "SendEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT", "SendEnv LC_IDENTIFICATION LC_ALL LANGUAGE", "SendEnv XMODIFIERS" ] } } """ checked_globs = [ "/etc/ssh/ssh_config", "/etc/ssh/ssh_config.d/*.conf" ] return _read_glob_paths_with_parser(tree, checked_globs, read_ssh_config) def read_sshd_configs(tree): """ Read all SSHd configuration files from a predefined list of paths and parse them. The searched paths are: - "/etc/ssh/sshd_config" - "/etc/ssh/sshd_config.d/*.conf" Returns: dictionary as returned by '_read_glob_paths_with_parser()' with configuration representation as returned by 'read_ssh_config()'. An example return value: { "/etc/ssh": { "sshd_config": [ "HostKey /etc/ssh/ssh_host_rsa_key", "HostKey /etc/ssh/ssh_host_ecdsa_key", "HostKey /etc/ssh/ssh_host_ed25519_key", "SyslogFacility AUTHPRIV", "PermitRootLogin no", "AuthorizedKeysFile\t.ssh/authorized_keys", "PasswordAuthentication no", "ChallengeResponseAuthentication no", "GSSAPIAuthentication yes", "GSSAPICleanupCredentials no", "UsePAM yes", "X11Forwarding yes", "PrintMotd no", "AcceptEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES", "AcceptEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT", "AcceptEnv LC_IDENTIFICATION LC_ALL LANGUAGE", "AcceptEnv XMODIFIERS", "Subsystem\tsftp\t/usr/libexec/openssh/sftp-server", "ClientAliveInterval 420" ] } } """ checked_globs = [ "/etc/ssh/sshd_config", "/etc/ssh/sshd_config.d/*.conf" ] return _read_glob_paths_with_parser(tree, checked_globs, read_ssh_config) def read_yum_repos(tree): """ Read all YUM/DNF repo files. 
The searched paths are: - "/etc/yum.repos.d/*.repo" Returns: dictionary as returned by '_read_glob_paths_with_parser()' with configuration representation as returned by '_read_inifile_to_dict()'. An example return value: { "/etc/yum.repos.d": { "google-cloud.repo": { "google-cloud-sdk": { "baseurl": "https://packages.cloud.google.com/yum/repos/cloud-sdk-el8-x86_64", "enabled": "1", "gpgcheck": "1", "gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg", "name": "Google Cloud SDK", "repo_gpgcheck": "0" }, "google-compute-engine": { "baseurl": "https://packages.cloud.google.com/yum/repos/google-compute-engine-el8-x86_64-stable", "enabled": "1", "gpgcheck": "1", "gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg", "name": "Google Compute Engine", "repo_gpgcheck": "0" } } } } """ checked_globs = [ "/etc/yum.repos.d/*.repo" ] return _read_glob_paths_with_parser(tree, checked_globs, _read_inifile_to_dict) def read_sudoers(tree): """ Read uncommented lines from sudoers configuration file and /etc/sudoers.d This functions does not actually do much of a parsing, as sudoers file format grammar is a bit too much for our purpose. Any #include or #includedir directives are ignored by this function. Returns: dictionary with the keys representing names of read configuration files, /etc/sudoers and files from /etc/sudoers.d. Value of each key is a list of strings representing uncommented lines read from the configuration file. 
An example return value: { "/etc/sudoers": [ "Defaults !visiblepw", "Defaults always_set_home", "Defaults match_group_by_gid", "Defaults always_query_group_plugin", "Defaults env_reset", "Defaults env_keep = \"COLORS DISPLAY HOSTNAME HISTSIZE KDEDIR LS_COLORS\"", "Defaults env_keep += \"MAIL PS1 PS2 QTDIR USERNAME LANG LC_ADDRESS LC_CTYPE\"", "Defaults env_keep += \"LC_COLLATE LC_IDENTIFICATION LC_MEASUREMENT LC_MESSAGES\"", "Defaults env_keep += \"LC_MONETARY LC_NAME LC_NUMERIC LC_PAPER LC_TELEPHONE\"", "Defaults env_keep += \"LC_TIME LC_ALL LANGUAGE LINGUAS _XKB_CHARSET XAUTHORITY\"", "Defaults secure_path = /sbin:/bin:/usr/sbin:/usr/bin", "root\tALL=(ALL) \tALL", "%wheel\tALL=(ALL)\tALL", "ec2-user\tALL=(ALL)\tNOPASSWD: ALL" ] } """ result = {} def _parse_sudoers_file(f): lines = [] for line in f: line = line.strip() if not line: continue if line[0] == "#": continue lines.append(line) return lines with contextlib.suppress(FileNotFoundError): with open(f"{tree}/etc/sudoers", encoding="utf-8") as f: lines = _parse_sudoers_file(f) if lines: result["/etc/sudoers"] = lines sudoersd_result = {} for file in glob.glob(f"{tree}/etc/sudoers.d/*"): with open(file, encoding="utf-8") as f: lines = _parse_sudoers_file(f) if lines: result[os.path.basename(file)] = lines if sudoersd_result: result["/etc/sudoers.d"] = sudoersd_result return result def read_udev_rules(tree): """ Read udev rules defined in /etc/udev/rules.d. Returns: dictionary with the keys representing names of files with udev rules from /etc/udev/rules.d. Value of each key is a list of strings representing uncommented lines read from the configuration file. If the file is empty (e.g. because of masking udev configuration installed by an RPM), an empty list is returned as the respective value. 
    An example return value:
    {
        "80-net-name-slot.rules": []
    }
    """
    result = {}

    for file in glob.glob(f"{tree}/etc/udev/rules.d/*.rules"):
        with open(file, encoding="utf-8") as f:
            lines = []
            for line in f:
                line = line.strip()
                if not line:
                    continue
                if line[0] == "#":
                    continue
                lines.append(line)
            # include also empty files in the report
            result[os.path.basename(file)] = lines

    return result


def _read_inifile_to_dict(config_path):
    """
    Read INI file from the provided path.

    Returns: a dictionary representing the provided INI file content.

    An example return value:
    {
        "google-cloud-sdk": {
            "baseurl": "https://packages.cloud.google.com/yum/repos/cloud-sdk-el8-x86_64",
            "enabled": "1",
            "gpgcheck": "1",
            "gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg",
            "name": "Google Cloud SDK",
            "repo_gpgcheck": "0"
        },
        "google-compute-engine": {
            "baseurl": "https://packages.cloud.google.com/yum/repos/google-compute-engine-el8-x86_64-stable",
            "enabled": "1",
            "gpgcheck": "1",
            "gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg",
            "name": "Google Compute Engine",
            "repo_gpgcheck": "0"
        }
    }
    """
    result = {}

    # a missing file is tolerated; an empty dict is returned
    with contextlib.suppress(FileNotFoundError):
        with open(config_path, encoding="utf-8") as f:
            parser = configparser.RawConfigParser()
            # prevent conversion of the option name to lowercase
            parser.optionxform = lambda option: option
            parser.read_file(f)
            for section in parser.sections():
                section_config = dict(parser.items(section))
                if section_config:
                    result[section] = section_config

    return result


def read_dnf_conf(tree):
    """
    Read DNF configuration and defined variable files.

    Returns: dictionary with at most two keys 'dnf.conf' and 'vars'.
    'dnf.conf' value is a dictionary representing the DNF configuration
    file content. 'vars' value is a dictionary which keys represent names
    of files from /etc/dnf/vars/ and values are strings representing the
    file content.

    An example return value:
    {
        "dnf.conf": {
            "main": {
                "installonly_limit": "3"
            }
        },
        "vars": {
            "releasever": "8.4"
        }
    }
    """
    result = {}

    dnf_config = _read_inifile_to_dict(f"{tree}/etc/dnf/dnf.conf")
    if dnf_config:
        result["dnf.conf"] = dnf_config

    dnf_vars = {}
    for file in glob.glob(f"{tree}/etc/dnf/vars/*"):
        with open(file, encoding="utf-8") as f:
            dnf_vars[os.path.basename(file)] = f.read().strip()
    if dnf_vars:
        result["vars"] = dnf_vars

    return result


def read_dnf_automatic_conf(tree):
    """
    Read DNF Automatic configuration.

    Returns: dictionary as returned by '_read_inifile_to_dict()'.

    An example return value:
    {
        "base": {
            "debuglevel": "1"
        },
        "command_email": {
            "email_from": "root@example.com",
            "email_to": "root"
        },
        "commands": {
            "apply_updates": "yes",
            "download_updates": "yes",
            "network_online_timeout": "60",
            "random_sleep": "0",
            "upgrade_type": "security"
        },
        "email": {
            "email_from": "root@example.com",
            "email_host": "localhost",
            "email_to": "root"
        },
        "emitters": {
            "emit_via": "stdio"
        }
    }
    """
    return _read_inifile_to_dict(f"{tree}/etc/dnf/automatic.conf")


def read_authselect_conf(tree):
    """
    Read authselect configuration.

    Returns: dictionary with two keys 'profile-id' and 'enabled-features'.
    'profile-id' value is a string representing the configured authselect
    profile. 'enabled-features' value is a list of strings representing
    enabled features of the used authselect profile. In case there are no
    specific features enabled, the list is empty.
An example return value: { "enabled-features": [], "profile-id": "sssd" } """ result = {} with contextlib.suppress(FileNotFoundError): with open(f"{tree}/etc/authselect/authselect.conf", encoding="utf-8") as f: # the first line is always the profile ID # following lines are listing enabled features # lines starting with '#' and empty lines are skipped authselect_conf_lines = [] for line in f: line = line.strip() if not line: continue if line[0] == "#": continue authselect_conf_lines.append(line) if authselect_conf_lines: result["profile-id"] = authselect_conf_lines[0] result["enabled-features"] = authselect_conf_lines[1:] return result def read_resolv_conf(tree): """ Read /etc/resolv.conf. Returns: a list of uncommented lines from the /etc/resolv.conf. An example return value: [ "search redhat.com", "nameserver 192.168.1.1", "nameserver 192.168.1.2" ] """ result = [] with contextlib.suppress(FileNotFoundError): with open(f"{tree}/resolv.conf", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue if line[0] == "#": continue result.append(line) return result # pylint: disable=too-many-branches disable=too-many-statements def append_filesystem(report, tree, *, is_ostree=False): if os.path.exists(f"{tree}/etc/os-release"): report["packages"] = rpm_packages(tree) if not is_ostree: report["rpm-verify"] = rpm_verify(tree) not_installed_docs = rpm_not_installed_docs(tree) if not_installed_docs: report["rpm_not_installed_docs"] = not_installed_docs with open(f"{tree}/etc/os-release", encoding="utf-8") as f: report["os-release"] = parse_environment_vars(f.read()) report["services-enabled"] = read_services(tree, "enabled") report["services-disabled"] = read_services(tree, "disabled") default_target = read_default_target(tree) if default_target: report["default-target"] = default_target with contextlib.suppress(FileNotFoundError): with open(f"{tree}/etc/hostname", encoding="utf-8") as f: report["hostname"] = f.read().strip() with 
contextlib.suppress(FileNotFoundError): report["timezone"] = os.path.basename(os.readlink(f"{tree}/etc/localtime")) authselect_conf = read_authselect_conf(tree) if authselect_conf: report["authselect"] = authselect_conf chrony_conf = read_chrony_conf(tree) if chrony_conf: report["chrony"] = chrony_conf cloud_init_configs = read_cloud_init_configs(tree) if cloud_init_configs: report["cloud-init"] = cloud_init_configs container_images = read_container_images(tree) if container_images: report["container-images"] = container_images dnf_conf = read_dnf_conf(tree) if dnf_conf: report["dnf"] = dnf_conf dnf_automatic = read_dnf_automatic_conf(tree) if dnf_automatic: report["/etc/dnf/automatic.conf"] = dnf_automatic yum_repos = read_yum_repos(tree) if yum_repos: report["yum_repos"] = yum_repos dracut_configs = read_dracut_configs(tree) if dracut_configs: report["dracut"] = dracut_configs with contextlib.suppress(FileNotFoundError): report["firewall-enabled"] = read_firewall_zone(tree) firewall_default_zone = read_firewall_default_zone(tree) if firewall_default_zone: report["firewall-default-zone"] = firewall_default_zone fstab = read_fstab(tree) if fstab: report["fstab"] = fstab hosts = read_hosts(tree) if hosts: report["hosts"] = hosts keyboard = read_keyboard_conf(tree) if keyboard: report["keyboard"] = keyboard security_limits_configs = read_security_limits_configs(tree) if security_limits_configs: report["security-limits"] = security_limits_configs locale = read_locale(tree) if locale: report["locale"] = locale logind_configs = read_logind_configs(tree) if logind_configs: report["systemd-logind"] = logind_configs with contextlib.suppress(FileNotFoundError): with open(f"{tree}/etc/machine-id", encoding="utf-8") as f: report["machine-id"] = f.readline() modprobe_configs = read_modprobe_configs(tree) if modprobe_configs: report["modprobe"] = modprobe_configs tmpfilesd_configs = read_tmpfilesd_configs(tree) if tmpfilesd_configs: report["tmpfiles.d"] = tmpfilesd_configs rhsm 
= read_rhsm(tree) if rhsm: report["rhsm"] = rhsm selinux = read_selinux_info(tree, is_ostree) if selinux: report["selinux"] = selinux ssh_configs = read_ssh_configs(tree) if ssh_configs: report["ssh_config"] = ssh_configs sshd_configs = read_sshd_configs(tree) if sshd_configs: report["sshd_config"] = sshd_configs sudoers_conf = read_sudoers(tree) if sudoers_conf: report["sudoers"] = sudoers_conf sysconfig = read_sysconfig(tree) if sysconfig: report["sysconfig"] = sysconfig sysctld_configs = read_sysctld_configs(tree) if sysctld_configs: report["sysctl.d"] = sysctld_configs systemd_service_dropins = read_systemd_service_dropins(tree) if systemd_service_dropins: report["systemd-service-dropins"] = systemd_service_dropins tuned_profile = read_tuned_profile(tree) if tuned_profile: report["tuned"] = tuned_profile resolv_conf = read_resolv_conf(tree) # add even empty resolv_conf to the report to express that it is empty or non-existent report["/etc/resolv.conf"] = resolv_conf udev_rules = read_udev_rules(tree) if udev_rules: report["/etc/udev/rules.d"] = udev_rules with open(f"{tree}/etc/passwd", encoding="utf-8") as f: report["passwd"] = sorted(f.read().strip().split("\n")) with open(f"{tree}/etc/group", encoding="utf-8") as f: report["groups"] = sorted(f.read().strip().split("\n")) if is_ostree: with open(f"{tree}/usr/lib/passwd", encoding="utf-8") as f: report["passwd-system"] = sorted(f.read().strip().split("\n")) with open(f"{tree}/usr/lib/group", encoding="utf-8") as f: report["groups-system"] = sorted(f.read().strip().split("\n")) if os.path.exists(f"{tree}/boot") and len(os.listdir(f"{tree}/boot")) > 0: assert "bootmenu" not in report with contextlib.suppress(FileNotFoundError): with open(f"{tree}/boot/grub2/grubenv", encoding="utf-8") as f: report["boot-environment"] = parse_environment_vars(f.read()) report["bootmenu"] = read_boot_entries(f"{tree}/boot") elif len(glob.glob(f"{tree}/vmlinuz-*")) > 0: assert "bootmenu" not in report with 
open(f"{tree}/grub2/grubenv", encoding="utf-8") as f: report["boot-environment"] = parse_environment_vars(f.read()) report["bootmenu"] = read_boot_entries(tree) elif glob.glob(f"{tree}/EFI"): print("EFI partition", file=sys.stderr) def lvm_vg_for_device(device: str) -> str: """ Find the volume group name for the specified device. """ vg_name = None count = 0 cmd = [ "pvdisplay", "-C", "--noheadings", "-o", "vg_name", device ] while True: res = subprocess.run(cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="UTF-8") if res.returncode == 5: if count == 10: raise RuntimeError(f"Could not find parent device: {res.stderr.strip()}") time.sleep(1 * count) count += 1 continue if res.returncode != 0: raise RuntimeError(res.stderr.strip()) vg_name = res.stdout.strip() if vg_name: break return vg_name def lvm_lvs_for_vg(vg_name: str) -> List[str]: """ Get the list of logical volumes for a given volume group. """ cmd = [ "lvdisplay", "-C", "--noheadings", "-o", "lv_name", vg_name ] res = subprocess.run(cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="UTF-8") if res.returncode != 0: raise RuntimeError(res.stderr.strip()) return [lv.strip() for lv in res.stdout.strip().splitlines()] def discover_lvm(dev: str, parent: devices.Device, devmgr: OSBuildDeviceManager): # NB: activating LVM is done by the OSBuild device implementation, # however, the LV name must be passed to the OSBuild device implementation. vg_name = lvm_vg_for_device(dev) lv_names = lvm_lvs_for_vg(vg_name) # NB: the order of the volumes is important, we want to mount the root # volume first, so that we can mount the other volumes on top of it. 
    volumes = OrderedDict()
    # devices_map stores for each device path onto the system the corresponding
    # OSBuild's Device object
    devices_map = {}
    for lv_name in lv_names:
        ret = devmgr.open_lvm_lv(lv_name, parent)
        voldev = ret["path"]
        device = ret["Device"]
        # NB: add the device path to the partition info, so that it can be mounted by the caller.
        part_info = {
            "device": voldev,
        }
        volumes[lv_name] = read_partition(voldev, part_info)
        # keep the root volume first in the ordered dict
        if lv_name.startswith("root"):
            volumes.move_to_end(lv_name, last=False)
        # associate the device path with the Device object, we will need it to
        # mount later on.
        devices_map[voldev] = device

    # get back both the device map and the result that'll go in the JSON report
    return devices_map, {
        "lvm": True,
        "lvm.vg": vg_name,
        "lvm.volumes": volumes
    }


def partition_is_lvm(part: Dict) -> bool:
    # GPT LVM partition type GUID and MBR partition type id '8E'
    return part["type"].upper() in ["E6D6D379-F507-44C2-A23C-238F2A3DF928", "8E"]


def parse_subvol_list(output):
    """
    Parse the output of 'btrfs subvolume list' and return just the subvolume
    names/paths.
    """
    paths = []
    for line in output.strip().split("\n"):
        # subvolume names can have spaces in them, but they are the last field and they are preceded by the word
        # path
        parts = line.partition(" path ")
        # str.partition() always returns a 3-tuple, but will return (str, "", "") if the separator is not found
        if parts[2] == "":
            raise RuntimeError(f"failed to parse output line from 'btrfs subvolume list': {line}")
        paths.append(parts[2])
    return paths


def find_root_subvol(root):
    """
    Given a btrfs volume root, find the subvolume that contains the root OS
    tree.
    """
    subvols = subprocess_check_output(["btrfs", "subvolume", "list", root], parse_fn=parse_subvol_list)

    # look through each subvol for /etc/fstab
    for subvol in subvols:
        path = os.path.join(root, subvol)
        if os.path.exists(os.path.join(path, "etc/fstab")):
            return path

    return None


def find_fstab_root(tree, fstype):
    """
    Find the root volume under tree by searching for /etc/fstab.

    This function first checks if the path /etc/fstab exists and if it
    doesn't and the fstype is btrfs, checks all subvolumes as well.

    Returns None if fstab is not found.
    """
    if os.path.exists(f"{tree}/etc/fstab"):
        return tree
    if fstype == "btrfs":
        root_subvol = find_root_subvol(tree)
        if root_subvol:
            return root_subvol
    return None


# pylint: disable=too-many-branches disable=too-many-statements
def append_partitions(report, image):
    """
    Open all partitions of 'image', mount them according to the discovered
    fstab and append the root filesystem analysis to 'report'.
    """
    partitions = report["partitions"]
    with tempfile.TemporaryDirectory() as mountpoint:
        with host.ServiceManager(monitor=monitor.NullMonitor(1)) as mgr:
            devmgr = OSBuildDeviceManager(mgr, "/dev", os.path.dirname(image))

            # Device map associate a path onto where the device is mounted with its
            # corresponding Device object. Mount will require both the path and the
            # Device object in order to do its job.
            devices_map = {}
            filesystems = {}
            for part in partitions:
                start, size = part["start"], part["size"]
                ret = devmgr.open_loopback(
                    part["partuuid"], image, size, offset=start)
                dev = ret["path"]
                devices_map[dev] = ret["Device"]
                read_partition(dev, part)
                if partition_is_lvm(part):
                    dmap, lvm = discover_lvm(dev, ret["Device"], devmgr)
                    devices_map.update(dmap)
                    for vol in lvm["lvm.volumes"].values():
                        if vol["fstype"]:
                            mntopts = []
                            # we cannot recover since the underlying loopback device is mounted
                            # read-only but since we are using the it through the device mapper
                            # the fact might not be communicated and the kernel attempt a to
                            # a recovery of the filesystem, which will lead to a kernel panic
                            if vol["fstype"] in ("ext4", "ext3", "xfs"):
                                mntopts = ["norecovery"]
                            filesystems[vol["uuid"].upper()] = {
                                "device": vol["device"],
                                "mntops": mntopts,
                                "type": vol["fstype"],
                            }
                            del vol["device"]
                    part.update(lvm)
                elif part["uuid"] and part["fstype"]:
                    filesystems[part["uuid"].upper()] = {
                        "device": dev,
                        "type": part["fstype"],
                    }

            # find partition with fstab and read it
            fstab = []
            for fs in filesystems.values():
                if fs["type"] == "swap":
                    continue
                dev, opts = fs["device"], fs.get("mntops")
                with mount(dev, opts) as tree:
                    root = find_fstab_root(tree, fs["type"])
                    if root:
                        fstab.extend(read_fstab(root))
                        break
            else:
                # for/else: no filesystem contained an fstab
                raise RuntimeError("no fstab file found")

            # sort the fstab entries by the mountpoint
            fstab = sorted(fstab, key=operator.itemgetter(1))

            # mount all partitions to their respective mount points
            root_tree = ""
            mmgr = mounts.MountManager(devmgr, mountpoint)
            for n, fstab_entry in enumerate(fstab):
                part_uuid = fstab_entry[0].split("=")[1].upper()
                part_device = filesystems[part_uuid]["device"]
                part_mountpoint = fstab_entry[1]
                part_fstype = fstab_entry[2]
                part_options = fstab_entry[3].split(",")
                part_options += filesystems[part_uuid].get("mntops", [])

                if "ext4" in part_fstype:
                    info = index.get_module_info("Mount", "org.osbuild.ext4")
                elif "vfat" in part_fstype:
                    info = index.get_module_info("Mount", "org.osbuild.fat")
                elif "btrfs" in part_fstype:
                    info = index.get_module_info("Mount", "org.osbuild.btrfs")
                elif "xfs" in part_fstype:
                    info = index.get_module_info("Mount", "org.osbuild.xfs")
                elif "swap" in part_fstype:
                    # can't mount swap partitions
                    continue
                else:
                    raise RuntimeError(f"Unknown file system: {part_fstype}")
                if not info:
                    raise RuntimeError(f"Can't find org.osbuild.{part_fstype}")

                # the first mount point should be root
                if n == 0:
                    if part_mountpoint != "/":
                        raise RuntimeError("The first mountpoint in sorted fstab entries is not '/'")
                    root_tree = mountpoint

                # prepare the options to mount the partition
                options = {}
                for option in part_options:
                    if option == "defaults":
                        # defaults is not a supported option
                        continue
                    if "=" in option:
                        parts = option.split("=")
                        key = parts[0]
                        val = parts[1]
                        # uid and gid must be integers
                        if key in ("uid", "gid"):
                            val = int(val)
                        options[key] = val
                    else:
                        options[option] = True
                options["readonly"] = True

                # Validate the options
                #
                # The mount manager is taking care of opening the file system for us
                # so we don't have access to the json objects that'll be used to
                # invoke the mounter.
However we're only interested at validating the # options. We can extract these from the schema to validate them # only. jsonschema.validate(options, info.get_schema()["properties"]["options"]) # Finally mount mnt_kwargs = { "name": part_device + part_mountpoint, "info": info, # retrieves the associated Device Object "device": devices_map[part_device], "target": part_mountpoint, "options": options, "partition": None, } mmgr.mount(mounts.Mount(**mnt_kwargs)) if not root_tree: raise RuntimeError("The root filesystem tree is not mounted") append_filesystem(report, root_tree) def analyse_image(image) -> Dict[str, Any]: imgfmt = read_image_format(image) report: Dict[str, Any] = {"image-format": imgfmt} with convert_image(image, imgfmt) as target: size = os.stat(target).st_size with host.ServiceManager(monitor=monitor.NullMonitor(1)) as mgr: device = OSBuildDeviceManager(mgr, "/dev", os.path.dirname(target)).open_loopback( os.path.basename(target), target, size, offset=0)["path"] report["bootloader"] = read_bootloader_type(device) report.update(read_partition_table(device)) if not report["partition-table"]: # no partition table: mount device and treat it as a partition with mount(device) as tree: append_filesystem(report, tree) return report # close loop device and descend into partitions on image file append_partitions(report, target) return report def append_directory(report, tree): with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir: tree_ro = os.path.join(tmpdir, "root_ro") os.makedirs(tree_ro) # Make sure that the tools which analyse the directory in-place # can not modify its content (e.g. create additional files). # mount_at() always mounts the source as read-only! 
with mount_at(tree, tree_ro, ["bind"]) as _: if os.path.lexists(f"{tree}/ostree"): os.makedirs(f"{tree}/etc", exist_ok=True) with mount_at(f"{tree}/usr/etc", f"{tree}/etc", extra=["--bind"]): append_filesystem(report, tree_ro, is_ostree=True) else: append_filesystem(report, tree_ro) def append_ostree_repo(report, repo): ostree = functools.partial(run_ostree, repo=repo) r = ostree("config", "get", "core.mode") report["ostree"] = { "repo": { "core.mode": r.stdout.strip() } } r = ostree("refs") refs = r.stdout.strip().split("\n") report["ostree"]["refs"] = refs resolved = {r: ostree("rev-parse", r).stdout.strip() for r in refs} commit = resolved[refs[0]] with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir: tree = os.path.join(tmpdir, "tree") ostree("checkout", "--force-copy", commit, tree) append_directory(report, tree) def analyse_directory(path): report = {} if os.path.exists(os.path.join(path, "compose.json")): report["type"] = "ostree/commit" repo = os.path.join(path, "repo") append_ostree_repo(report, repo) elif os.path.isdir(os.path.join(path, "refs")): report["type"] = "ostree/repo" repo = os.path.join(path, "repo") append_ostree_repo(report, repo) else: append_directory(report, path) return report def is_tarball(path): mtype, _ = mimetypes.guess_type(path) return mtype == "application/x-tar" def analyse_tarball(path): with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir: tree = os.path.join(tmpdir, "root") os.makedirs(tree) command = [ "tar", "--selinux", "--xattrs", "--acls", "-x", "--auto-compress", "-f", path, "-C", tree ] subprocess.run(command, stdout=sys.stderr, check=True) # gce image type contains virtual raw disk inside a tarball if os.path.isfile(f"{tree}/disk.raw"): return analyse_image(f"{tree}/disk.raw") return analyse_directory(tree) def is_compressed(path): mtype, encoding = mimetypes.guess_type(path) # zst files do not return an "encoding", only a mimetype so they need to be # special cased return encoding in ["xz", "gzip", 
"bzip2"] or mtype == "application/zstd" def analyse_compressed(path): mtype, encoding = mimetypes.guess_type(path) if encoding == "xz": command = ["unxz", "--force"] elif encoding == "gzip": command = ["gunzip", "--force"] elif encoding == "bzip2": command = ["bunzip2", "--force"] elif mtype == "application/zstd": # zst files do not return an "encoding", only a mimetype so they need to be # special cased command = ["unzstd", "--force", "--rm"] else: raise ValueError(f"Unsupported compression: {encoding}") with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir: subprocess.run(["cp", "--reflink=auto", "-a", path, tmpdir], check=True) files = os.listdir(tmpdir) archive = os.path.join(tmpdir, files[0]) subprocess.run(command + [archive], check=True) files = os.listdir(tmpdir) assert len(files) == 1 image = os.path.join(tmpdir, files[0]) return analyse_image(image) def is_iso(path): return "iso" in pathlib.Path(path).suffix def analyse_iso(path): with mount(path, ["loop"]) as tmp: return analyse_tarball(os.path.join(tmp, "liveimg.tar.gz")) def main(): parser = argparse.ArgumentParser(description="Inspect an image") parser.add_argument("target", metavar="TARGET", help="The file or directory to analyse", type=os.path.abspath) args = parser.parse_args() target = args.target if os.path.isdir(target): report = analyse_directory(target) elif is_tarball(target): report = analyse_tarball(target) elif is_compressed(target): report = analyse_compressed(target) elif is_iso(target): report = analyse_iso(target) else: report = analyse_image(target) if not report: print(f"Failed to analyse {target}: no information gathered in the report", file=sys.stderr) sys.exit(1) json.dump(report, sys.stdout, sort_keys=True, indent=2) if __name__ == "__main__": main()