From 5ba4d3cf9ea8ca11e815dd21066086745fdb690c Mon Sep 17 00:00:00 2001 From: Achilleas Koutsou Date: Thu, 2 Jan 2025 18:56:53 +0100 Subject: [PATCH] tools: remove image-info The image-info tool now lives in osbuild/osbuild. --- tools/image-info | 2891 ---------------------------------------------- 1 file changed, 2891 deletions(-) delete mode 100755 tools/image-info diff --git a/tools/image-info b/tools/image-info deleted file mode 100755 index ff1f1c286..000000000 --- a/tools/image-info +++ /dev/null @@ -1,2891 +0,0 @@ -#!/usr/bin/python3 - -import argparse -import configparser -import contextlib -import functools -import glob -import mimetypes -import operator -import json -import os -import platform -import re -import stat -import subprocess -import sys -import time -import tempfile -import xml.etree.ElementTree -import yaml -import pathlib -import jsonschema - -from collections import OrderedDict -from typing import Dict, Any - -from osbuild import devices, host, mounts, meta, monitor - -index = meta.Index("/usr/lib/osbuild/") -SECTOR_SIZE = 512 - - -def run_ostree(*args, _input=None, _check=True, **kwargs): - args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()] - print("ostree " + " ".join(args), file=sys.stderr) - res = subprocess.run(["ostree"] + args, - encoding="utf-8", - stdout=subprocess.PIPE, - input=_input, - check=_check) - return res - - -def loop_open(devmgr: devices.DeviceManager, name: str, image, size, offset=0): - """ - Uses a DeviceManager to open the `name` at `offset` - Retuns a Device object and the path onto wich the image was loopback mounted - """ - info = index.get_module_info("Device", "org.osbuild.loopback") - fname = os.path.basename(image) - options = { - "filename": fname, - "start": offset // SECTOR_SIZE, - "size": size // SECTOR_SIZE - } - if not info: - raise RuntimeError("Can't load org.osbuild.loopback") - - jsonschema.validate(options, info.get_schema()) - dev = devices.Device(name, info, None, options) - reply = devmgr.open(dev) - return { - "Device": dev, - "path": os.path.join("/dev", reply["path"]) - } - - -@contextlib.contextmanager -def convert_image(image, fmt): - with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp: - if fmt["type"] != "raw": - target = os.path.join(tmp, "image.raw") - # A bug exists in qemu that causes the conversion to raw to fail - # on aarch64 systems with a LOT of CPUs. A workaround is to use - # a single coroutine to do the conversion. It doesn't slow down - # the conversion by much, but it hangs about half the time without - # the limit set. 😢 - # Bug: https://bugs.launchpad.net/qemu/+bug/1805256 - if platform.machine() == 'aarch64': - subprocess.run( - ["qemu-img", "convert", "-m", "1", "-O", "raw", image, target], - check=True - ) - else: - subprocess.run( - ["qemu-img", "convert", "-O", "raw", image, target], - check=True - ) - else: - target = image - - yield target - - -@contextlib.contextmanager -def mount_at(device, mountpoint, options=[], extra=[]): - opts = ",".join(["ro"] + options) - subprocess.run(["mount", "-o", opts] + extra + [device, mountpoint], check=True) - try: - yield mountpoint - finally: - subprocess.run(["umount", "--lazy", mountpoint], check=True) - - -@contextlib.contextmanager -def mount(device, options=None): - options = options or [] - opts = ",".join(["ro"] + options) - with tempfile.TemporaryDirectory() as mountpoint: - subprocess.run(["mount", "-o", opts, device, mountpoint], check=True) - try: - yield mountpoint - finally: - subprocess.run(["umount", "--lazy", mountpoint], check=True) - - -def parse_environment_vars(s): - r = {} - for line in s.split("\n"): - line = line.strip() - if not line: - continue - if line[0] == '#': - continue - key, value = line.split("=", 1) - r[key] = value.strip('"') - return r - - -# Parses output of `systemctl list-unit-files` -def parse_unit_files(s, expected_state): - r = [] - for line in s.split("\n")[1:]: - state = "" - unit = "" - try: - unit, state, *_ = line.split() - except ValueError: - pass - if state != expected_state: - continue - r.append(unit) - - return r - - -def subprocess_check_output(argv, parse_fn=None) -> Any: - try: - output = subprocess.check_output(argv, encoding="utf-8") - except subprocess.CalledProcessError as e: - sys.stderr.write(f"--- Output from {argv}:\n") - sys.stderr.write(e.stdout) - sys.stderr.write("\n--- End of the output\n") - raise - - return parse_fn(output) if parse_fn else output - - -def read_container_images(tree): - """ - Read installed containers - - Returns: a dictionary listing the container images in the format - like `podman images --format json` but with less information. - - NB: The parsing is done "manually" since running `podman` in the - chroot does not work. - """ - - images = [] - images_index = os.path.join("overlay-images", "images.json") - - for d in ("/var/lib/containers/storage", ): - path = os.path.join(tree, d.lstrip("/"), images_index) - try: - with open(path, "r", encoding="utf-8") as f: - data = json.load(f) - except FileNotFoundError: - continue - - for image in data: - img = { - "Id": image["id"], - "Digest": image["digest"], - "Names": image["names"], - } - created = image.get("created") - if created: - img["Created"] = created - - images.append(img) - - return images - - -def read_image_format(device) -> Dict[str, str]: - """ - Read image format. - - Returns: dictionary with at least one key 'type'. 'type' value is a string - representing the format of the image. In case the type is 'qcow2', the returned - dictionary contains second key 'compat' with a string value representing - the compatibility version of the 'qcow2' image. - - An example return value: - { - "compat": "1.1", - "type": "qcow2" - } - """ - qemu = subprocess_check_output(["qemu-img", "info", "--output=json", device], json.loads) - format = qemu["format"] - result = {"type": format} - if format == "qcow2": - result["compat"] = qemu["format-specific"]["data"]["compat"] - return result - - -def read_partition(device, partition): - """ - Read block device attributes using 'blkid' and extend the passed 'partition' - dictionary. - - Returns: the 'partition' dictionary provided as an argument, extended with - 'label', 'uuid' and 'fstype' keys and their values. - """ - res = subprocess.run(["blkid", "-c", "/dev/null", "--output", "export", - device], - check=False, encoding="utf-8", - stdout=subprocess.PIPE) - if res.returncode == 0: - blkid = parse_environment_vars(res.stdout) - else: - blkid = {} - - partition["label"] = blkid.get("LABEL") # doesn't exist for mbr - partition["uuid"] = blkid.get("UUID") - partition["fstype"] = blkid.get("TYPE") - return partition - - -def read_partition_table(device): - """ - Read information related to found partitions and partitioning table from - the device. - - Returns: dictionary with three keys - 'partition-table', 'partition-table-id' - and 'partitions'. - 'partition-table' value is a string with the type of the partition table or 'None'. - 'partition-table-id' value is a string with the ID of the partition table or 'None'. - 'partitions' value is a list of dictionaries representing found partitions. - - An example return value: - { - "partition-table": "gpt", - "partition-table-id": "DA237A6F-F0D4-47DF-BB50-007E00DB347C", - "partitions": [ - { - "bootable": false, - "partuuid": "64AF1EC2-0328-406A-8F36-83016E6DD858", - "size": 1048576, - "start": 1048576, - "type": "21686148-6449-6E6F-744E-656564454649", - }, - { - "bootable": false, - "partuuid": "D650D523-06F6-4B90-9204-8F998FE9703C", - "size": 6442450944, - "start": 2097152, - "type": "0FC63DAF-8483-4772-8E79-3D69D8477DE4", - } - ] - } - """ - partitions = [] - info = {"partition-table": None, - "partition-table-id": None, - "partitions": partitions} - try: - sfdisk = subprocess_check_output(["sfdisk", "--json", device], json.loads) - except subprocess.CalledProcessError: - # This handles a case, when the device does contain a filesystem, - # but there is no partition table. - partitions.append(read_partition(device, {})) - return info - - ptable = sfdisk["partitiontable"] - assert ptable["unit"] == "sectors" - is_dos = ptable["label"] == "dos" - ssize = ptable.get("sectorsize", SECTOR_SIZE) - - for i, p in enumerate(ptable["partitions"]): - - partuuid = p.get("uuid") - if not partuuid and is_dos: - # For dos/mbr partition layouts the partition uuid - # is generated. Normally this would be done by - # udev+blkid, when the partition table is scanned. - # 'sfdisk' prefixes the partition id with '0x' but - # 'blkid' does not; remove it to mimic 'blkid' - table_id = ptable['id'][2:] - partuuid = "%.33s-%02x" % (table_id, i+1) - - partitions.append({ - "bootable": p.get("bootable", False), - "type": p["type"], - "start": p["start"] * ssize, - "size": p["size"] * ssize, - "partuuid": partuuid - }) - - info["partitions"] = sorted(info["partitions"], key=operator.itemgetter("partuuid")) - info["partition-table"] = ptable["label"] - info["partition-table-id"] = ptable["id"] - - return info - - -def read_bootloader_type(device) -> str: - """ - Read bootloader type from the provided device. - - Returns: string representing the found bootloader. Function can return two values: - - 'grub' - - 'unknown' - """ - with open(device, "rb") as f: - if b"GRUB" in f.read(SECTOR_SIZE): - return "grub" - else: - return "unknown" - - -def read_boot_entries(boot_dir): - """ - Read boot entries. - - Returns: list of dictionaries representing configured boot entries. - - An example return value: - [ - { - "grub_arg": "--unrestricted", - "grub_class": "kernel", - "grub_users": "$grub_users", - "id": "rhel-20210429130346-0-rescue-c116920b13f44c59846f90b1057605bc", - "initrd": "/boot/initramfs-0-rescue-c116920b13f44c59846f90b1057605bc.img", - "linux": "/boot/vmlinuz-0-rescue-c116920b13f44c59846f90b1057605bc", - "options": "$kernelopts", - "title": "Red Hat Enterprise Linux (0-rescue-c116920b13f44c59846f90b1057605bc) 8.4 (Ootpa)", - "version": "0-rescue-c116920b13f44c59846f90b1057605bc" - }, - { - "grub_arg": "--unrestricted", - "grub_class": "kernel", - "grub_users": "$grub_users", - "id": "rhel-20210429130346-4.18.0-305.el8.x86_64", - "initrd": "/boot/initramfs-4.18.0-305.el8.x86_64.img $tuned_initrd", - "linux": "/boot/vmlinuz-4.18.0-305.el8.x86_64", - "options": "$kernelopts $tuned_params", - "title": "Red Hat Enterprise Linux (4.18.0-305.el8.x86_64) 8.4 (Ootpa)", - "version": "4.18.0-305.el8.x86_64" - } - ] - """ - entries = [] - for conf in glob.glob(f"{boot_dir}/loader/entries/*.conf"): - with open(conf) as f: - entries.append(dict(line.strip().split(" ", 1) for line in f)) - - return sorted(entries, key=lambda e: e["title"]) - - -def rpm_verify(tree): - """ - Read the output of 'rpm --verify'. - - Returns: dictionary with two keys 'changed' and 'missing'. - 'changed' value is a dictionary with the keys representing modified files from - installed RPM packages and values representing types of applied modifications. - 'missing' value is a list of strings prepresenting missing values owned by - installed RPM packages. - - An example return value: - { - "changed": { - "/etc/chrony.conf": "S.5....T.", - "/etc/cloud/cloud.cfg": "S.5....T.", - "/etc/nsswitch.conf": "....L....", - "/etc/openldap/ldap.conf": ".......T.", - "/etc/pam.d/fingerprint-auth": "....L....", - "/etc/pam.d/password-auth": "....L....", - "/etc/pam.d/postlogin": "....L....", - "/etc/pam.d/smartcard-auth": "....L....", - "/etc/pam.d/system-auth": "....L....", - "/etc/rhsm/rhsm.conf": "..5....T.", - "/etc/sudoers": "S.5....T.", - "/etc/systemd/logind.conf": "S.5....T." - }, - "missing": [ - "/etc/udev/rules.d/70-persistent-ipoib.rules", - "/run/cloud-init", - "/run/rpcbind", - "/run/setrans", - "/run/tuned" - ] - } - """ - # cannot use `rpm --root` here, because rpm uses passwd from the host to - # verify user and group ownership: - # https://github.com/rpm-software-management/rpm/issues/882 - rpm = subprocess.Popen(["chroot", tree, "rpm", "--verify", "--all"], - stdout=subprocess.PIPE, encoding="utf-8") - - changed = {} - missing = [] - - if rpm.stdout: - for line in rpm.stdout: - # format description in rpm(8), under `--verify` - attrs = line[:9] - if attrs == "missing ": - missing.append(line[12:].rstrip()) - else: - changed[line[13:].rstrip()] = attrs - - # ignore return value, because it returns non-zero when it found changes - rpm.wait() - - return { - "missing": sorted(missing), - "changed": changed - } - - -def rpm_not_installed_docs(tree): - """ - Gathers information on documentation, which is part of RPM packages, - but was not installed. - - Returns: list of documentation files, which are normally a part of - the installed RPM packages, but were not installed (e.g. due to using - '--excludedocs' option when executing 'rpm' command). - - An example return value: - [ - "/usr/share/man/man1/sdiff.1.gz", - "/usr/share/man/man1/seapplet.1.gz", - "/usr/share/man/man1/secon.1.gz", - "/usr/share/man/man1/secret-tool.1.gz", - "/usr/share/man/man1/sed.1.gz", - "/usr/share/man/man1/seq.1.gz" - ] - """ - # check not installed Docs (e.g. when RPMs are installed with --excludedocs) - not_installed_docs = [] - cmd = ["rpm", "--root", tree, "-qad", "--state"] - if os.path.exists(os.path.join(tree, "usr/share/rpm")): - cmd += ["--dbpath", "/usr/share/rpm"] - elif os.path.exists(os.path.join(tree, "var/lib/rpm")): - cmd += ["--dbpath", "/var/lib/rpm"] - output = subprocess_check_output(cmd) - for line in output.splitlines(): - if line.startswith("not installed"): - not_installed_docs.append(line.split()[-1]) - - return sorted(not_installed_docs) - - -def rpm_packages(tree): - """ - Read NVRs of RPM packages installed on the system. - - Returns: sorted list of strings representing RPM packages installed - on the system. - - An example return value: - [ - "NetworkManager-1.30.0-7.el8.x86_64", - "PackageKit-glib-1.1.12-6.el8.x86_64", - "PackageKit-gtk3-module-1.1.12-6.el8.x86_64", - "abattis-cantarell-fonts-0.0.25-6.el8.noarch", - "acl-2.2.53-1.el8.x86_64", - "adobe-mappings-cmap-20171205-3.el8.noarch", - "adobe-mappings-cmap-deprecated-20171205-3.el8.noarch", - "adobe-mappings-pdf-20180407-1.el8.noarch", - "adwaita-cursor-theme-3.28.0-2.el8.noarch", - "adwaita-icon-theme-3.28.0-2.el8.noarch", - "alsa-lib-1.2.4-5.el8.x86_64" - ] - """ - cmd = ["rpm", "--root", tree, "-qa"] - if os.path.exists(os.path.join(tree, "usr/share/rpm")): - cmd += ["--dbpath", "/usr/share/rpm"] - elif os.path.exists(os.path.join(tree, "var/lib/rpm")): - cmd += ["--dbpath", "/var/lib/rpm"] - subprocess_check_output(cmd) - pkgs = subprocess_check_output(cmd, str.split) - return list(sorted(pkgs)) - - -@contextlib.contextmanager -def change_root(root): - real_root = os.open("/", os.O_RDONLY) - try: - os.chroot(root) - yield None - finally: - os.fchdir(real_root) - os.chroot(".") - os.close(real_root) - - -def read_services(tree, state): - """ - Read the list of systemd services on the system in the given state. - - Returns: alphabetically sorted list of strings representing systemd services - in the given state. - The returned list may be empty. - - An example return value: - [ - "arp-ethers.service", - "canberra-system-bootup.service", - "canberra-system-shutdown-reboot.service", - "canberra-system-shutdown.service", - "chrony-dnssrv@.timer", - "chrony-wait.service" - ] - """ - services_state = subprocess_check_output( - ["systemctl", f"--root={tree}", "list-unit-files"], (lambda s: parse_unit_files(s, state))) - - # Since systemd v246, some services previously reported as "enabled" / - # "disabled" are now reported as "alias". There is no systemd command, that - # would take an "alias" unit and report its state as enabled/disabled - # and could run on a different tree (with "--root" option). - # To make the produced list of services in the given state consistent on - # pre/post v246 systemd versions, check all "alias" units and append them - # to the list, if their target is also listed in 'services_state'. - if state != "alias": - services_alias = subprocess_check_output( - ["systemctl", f"--root={tree}", "list-unit-files"], (lambda s: parse_unit_files(s, "alias"))) - - for alias in services_alias: - # The service may be in one of the following places (output of - # "systemd-analyze unit-paths", it should not change too often). - unit_paths = [ - "/etc/systemd/system.control", - "/run/systemd/system.control", - "/run/systemd/transient", - "/run/systemd/generator.early", - "/etc/systemd/system", - "/run/systemd/system", - "/run/systemd/generator", - "/usr/local/lib/systemd/system", - "/usr/lib/systemd/system", - "/run/systemd/generator.late" - ] - - with change_root(tree): - for path in unit_paths: - unit_path = os.path.join(path, alias) - if os.path.exists(unit_path): - real_unit_path = os.path.realpath(unit_path) - # Skip the alias, if there was a symlink cycle. - # When symbolic link cycles occur, the returned path will - # be one member of the cycle, but no guarantee is made about - # which member that will be. - if os.path.islink(real_unit_path): - continue - - # Append the alias unit to the list, if its target is - # already there. - if os.path.basename(real_unit_path) in services_state: - services_state.append(alias) - - # deduplicate and sort - services_state = list(set(services_state)) - services_state.sort() - - return services_state - - -def read_default_target(tree): - """ - Read the default systemd target. - - Returns: string representing the default systemd target. - - An example return value: - "multi-user.target" - """ - return subprocess_check_output(["systemctl", f"--root={tree}", "get-default"]).rstrip() - - -def read_firewall_default_zone(tree): - """ - Read the name of the default firewall zone - - Returns: a string with the zone name. If the firewall configuration doesn't - exist, an empty string is returned. - - An example return value: - "trusted" - """ - try: - with open(f"{tree}/etc/firewalld/firewalld.conf") as f: - conf = parse_environment_vars(f.read()) - return conf["DefaultZone"] - except FileNotFoundError: - return "" - - -def read_firewall_zone(tree): - """ - Read enabled services from the configuration of the default firewall zone. - - Returns: list of strings representing enabled services in the firewall. - The returned list may be empty. - - An example return value: - [ - "ssh", - "dhcpv6-client", - "cockpit" - ] - """ - default = read_firewall_default_zone(tree) - if default == "": - default = "public" - - r = [] - try: - root = xml.etree.ElementTree.parse(f"{tree}/etc/firewalld/zones/{default}.xml").getroot() - except FileNotFoundError: - root = xml.etree.ElementTree.parse(f"{tree}/usr/lib/firewalld/zones/{default}.xml").getroot() - - for element in root.findall("service"): - r.append(element.get("name")) - - return r - - -def read_fstab(tree): - """ - Read the content of /etc/fstab. - - Returns: list of all uncommented lines read from the configuration file - represented as a list of values split by whitespaces. - The returned list may be empty. - - An example return value: - [ - [ - "UUID=6d066eb4-e4c1-4472-91f9-d167097f48d1", - "/", - "xfs", - "defaults", - "0", - "0" - ] - ] - """ - result = [] - with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/fstab") as f: - result = sorted([line.split() for line in f if line.strip() and not line.startswith("#")]) - return result - - -def read_rhsm(tree): - """ - Read configuration changes possible via org.osbuild.rhsm stage - and in addition also the whole content of /etc/rhsm/rhsm.conf. - - Returns: returns dictionary with two keys - 'dnf-plugins' and 'rhsm.conf'. - 'dnf-plugins' value represents configuration of 'product-id' and - 'subscription-manager' DNF plugins. - 'rhsm.conf' value is a dictionary representing the content of the RHSM - configuration file. - The returned dictionary may be empty. - - An example return value: - { - "dnf-plugins": { - "product-id": { - "enabled": true - }, - "subscription-manager": { - "enabled": true - } - }, - "rhsm.conf": { - "logging": { - "default_log_level": "INFO" - }, - "rhsm": { - "auto_enable_yum_plugins": "1", - "baseurl": "https://cdn.redhat.com", - "ca_cert_dir": "/etc/rhsm/ca/", - "consumercertdir": "/etc/pki/consumer", - "entitlementcertdir": "/etc/pki/entitlement", - "full_refresh_on_yum": "0", - "inotify": "1", - "manage_repos": "0", - "package_profile_on_trans": "0", - "pluginconfdir": "/etc/rhsm/pluginconf.d", - "plugindir": "/usr/share/rhsm-plugins", - "productcertdir": "/etc/pki/product", - "repo_ca_cert": "/etc/rhsm/ca/redhat-uep.pem", - "repomd_gpg_url": "", - "report_package_profile": "1" - }, - "rhsmcertd": { - "auto_registration": "1", - "auto_registration_interval": "60", - "autoattachinterval": "1440", - "certcheckinterval": "240", - "disable": "0", - "splay": "1" - }, - "server": { - "hostname": "subscription.rhsm.redhat.com", - "insecure": "0", - "no_proxy": "", - "port": "443", - "prefix": "/subscription", - "proxy_hostname": "", - "proxy_password": "", - "proxy_port": "", - "proxy_scheme": "http", - "proxy_user": "", - "ssl_verify_depth": "3" - } - } - } - """ - result = {} - - # Check RHSM DNF plugins configuration and allowed options - dnf_plugins_config = { - "product-id": f"{tree}/etc/dnf/plugins/product-id.conf", - "subscription-manager": f"{tree}/etc/dnf/plugins/subscription-manager.conf" - } - - for plugin_name, plugin_path in dnf_plugins_config.items(): - with contextlib.suppress(FileNotFoundError): - with open(plugin_path) as f: - parser = configparser.ConfigParser() - parser.read_file(f) - # only read "enabled" option from "main" section - with contextlib.suppress(configparser.NoSectionError, configparser.NoOptionError): - # get the value as the first thing, in case it raises an exception - enabled = parser.getboolean("main", "enabled") - - try: - dnf_plugins_dict = result["dnf-plugins"] - except KeyError as _: - dnf_plugins_dict = result["dnf-plugins"] = {} - - try: - plugin_dict = dnf_plugins_dict[plugin_name] - except KeyError as _: - plugin_dict = dnf_plugins_dict[plugin_name] = {} - - plugin_dict["enabled"] = enabled - - with contextlib.suppress(FileNotFoundError): - rhsm_conf = {} - with open(f"{tree}/etc/rhsm/rhsm.conf") as f: - parser = configparser.ConfigParser() - parser.read_file(f) - for section in parser.sections(): - section_dict = {} - section_dict.update(parser[section]) - if section_dict: - rhsm_conf[section] = section_dict - - result["rhsm.conf"] = rhsm_conf - - return result - - -def read_sysconfig(tree): - """ - Read selected configuration files from /etc/sysconfig. - - Currently supported sysconfig files are: - - 'kernel' - /etc/sysconfig/kernel - - 'network' - /etc/sysconfig/network - - 'network-scripts' - /etc/sysconfig/network-scripts/ifcfg-* - - Returns: dictionary with the keys being the supported types of sysconfig - configurations read by the function. Values of 'kernel' and 'network' keys - are a dictionaries containing key/values read from the respective - configuration files. Value of 'network-scripts' key is a dictionary with - the keys corresponding to the suffix of each 'ifcfg-*' configuration file - and their values holding dictionaries with all key/values read from the - configuration file. - The returned dictionary may be empty. - - An example return value: - { - "kernel": { - "DEFAULTKERNEL": "kernel", - "UPDATEDEFAULT": "yes" - }, - "network": { - "NETWORKING": "yes", - "NOZEROCONF": "yes" - }, - "network-scripts": { - "ens3": { - "BOOTPROTO": "dhcp", - "BROWSER_ONLY": "no", - "DEFROUTE": "yes", - "DEVICE": "ens3", - "IPV4_FAILURE_FATAL": "no", - "IPV6INIT": "yes", - "IPV6_AUTOCONF": "yes", - "IPV6_DEFROUTE": "yes", - "IPV6_FAILURE_FATAL": "no", - "NAME": "ens3", - "ONBOOT": "yes", - "PROXY_METHOD": "none", - "TYPE": "Ethernet", - "UUID": "106f1b31-7093-41d6-ae47-1201710d0447" - }, - "eth0": { - "BOOTPROTO": "dhcp", - "DEVICE": "eth0", - "IPV6INIT": "no", - "ONBOOT": "yes", - "PEERDNS": "yes", - "TYPE": "Ethernet", - "USERCTL": "yes" - } - } - } - """ - result = {} - sysconfig_paths = { - "kernel": f"{tree}/etc/sysconfig/kernel", - "network": f"{tree}/etc/sysconfig/network" - } - # iterate through supported configs - for name, path in sysconfig_paths.items(): - with contextlib.suppress(FileNotFoundError): - with open(path) as f: - # if file exists start with empty array of values - result[name] = parse_environment_vars(f.read()) - - # iterate through all files in /etc/sysconfig/network-scripts - network_scripts = {} - files = glob.glob(f"{tree}/etc/sysconfig/network-scripts/ifcfg-*") - for file in files: - ifname = os.path.basename(file).lstrip("ifcfg-") - with open(file) as f: - network_scripts[ifname] = parse_environment_vars(f.read()) - - if network_scripts: - result["network-scripts"] = network_scripts - - return result - - -def read_hosts(tree): - """ - Read non-empty lines of /etc/hosts. - - Returns: list of strings for all uncommented lines in the configuration file. - The returned list may be empty. - - An example return value: - [ - "127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4", - "::1 localhost localhost.localdomain localhost6 localhost6.localdomain6" - ] - """ - result = [] - - with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/hosts") as f: - for line in f: - line = line.strip() - if line: - result.append(line) - return result - - -def read_logind_config(config_path): - """ - Read all uncommented key/values from the 'Login" section of system-logind - configuration file. - - Returns: dictionary with key/values read from the configuration file. - The returned dictionary may be empty. - - An example return value: - { - "NAutoVTs": "0" - } - """ - result = {} - - with open(config_path) as f: - parser = configparser.RawConfigParser() - # prevent conversion of the option name to lowercase - parser.optionxform = lambda option: option - parser.read_file(f) - with contextlib.suppress(configparser.NoSectionError): - result.update(parser["Login"]) - return result - - -def read_logind_configs(tree): - """ - Read all systemd-logind *.conf files from a predefined list of paths and - parse them. - - The searched paths are: - - "/etc/systemd/logind.conf" - - "/etc/systemd/logind.conf.d/*.conf" - - "/usr/lib/systemd/logind.conf.d/*.conf" - - Returns: dictionary as returned by '_read_glob_paths_with_parser()' with - configuration representation as returned by 'read_logind_config()'. - - An example return value: - { - "/etc/systemd/logind.conf": { - "NAutoVTs": "0" - } - } - """ - checked_globs = [ - "/etc/systemd/logind.conf", - "/etc/systemd/logind.conf.d/*.conf", - "/usr/lib/systemd/logind.conf.d/*.conf" - ] - - return _read_glob_paths_with_parser(tree, checked_globs, read_logind_config) - - -def read_locale(tree): - """ - Read all uncommented key/values set in /etc/locale.conf. - - Returns: dictionary with key/values read from the configuration file. - The returned dictionary may be empty. - - An example return value: - { - "LANG": "en_US" - } - """ - with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/locale.conf") as f: - return parse_environment_vars(f.read()) - - -def read_selinux_info(tree, is_ostree): - """ - Read information related to SELinux. - - Returns: dictionary with two keys - 'policy' and 'context-mismatch'. - 'policy' value corresponds to the value returned by read_selinux_conf(). - 'context-mismatch' value corresponds to the value returned by - read_selinux_ctx_mismatch(). - The returned dictionary may be empty. Keys with empty values are omitted. - - An example return value: - { - "context-mismatch": [ - { - "actual": "system_u:object_r:root_t:s0", - "expected": "system_u:object_r:device_t:s0", - "filename": "/dev" - }, - { - "actual": "system_u:object_r:root_t:s0", - "expected": "system_u:object_r:default_t:s0", - "filename": "/proc" - } - ], - "policy": { - "SELINUX": "permissive", - "SELINUXTYPE": "targeted" - } - } - """ - result = {} - - policy = read_selinux_conf(tree) - if policy: - result["policy"] = policy - - with contextlib.suppress(subprocess.CalledProcessError): - ctx_mismatch = read_selinux_ctx_mismatch(tree, is_ostree) - if ctx_mismatch: - result["context-mismatch"] = ctx_mismatch - - return result - - -def read_selinux_conf(tree): - """ - Read all uncommented key/values set in /etc/selinux/config. - - Returns: dictionary with key/values read from the configuration - file. - - An example of returned value: - { - "SELINUX": "enforcing", - "SELINUXTYPE": "targeted" - } - """ - with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/selinux/config") as f: - return parse_environment_vars(f.read()) - - -def read_selinux_ctx_mismatch(tree, is_ostree): - """ - Read any mismatch in selinux context of files on the image. - - Returns: list of dictionaries as described below. If there - are no mismatches between used and expected selinux context, - then an empty list is returned. - - If the checked 'tree' is ostree, then the path '/etc' is - excluded from the check. This is beause it is bind-mounted - from /usr/etc and therefore has incorrect selinux context - for its filesystem path. - - An example of returned value: - [ - { - "actual": "system_u:object_r:root_t:s0", - "expected": "system_u:object_r:device_t:s0", - "filename": "/dev" - }, - { - "actual": "system_u:object_r:root_t:s0", - "expected": "system_u:object_r:default_t:s0", - "filename": "/proc" - } - ] - """ - result = [] - - # The binary policy that should be used is on the image and has name "policy.X" - # where the "X" is a number. There may be more than one policy files. - # In the usual case, the policy with the highest number suffix should be used. - policy_files = glob.glob(f"{tree}/etc/selinux/targeted/policy/policy.*") - policy_files = sorted(policy_files, reverse=True) - - if policy_files: - CMD = [ - "setfiles", - "-r", f"{tree}", - "-nvF", - "-c", policy_files[0], # take the policy with the highest number - f"{tree}/etc/selinux/targeted/contexts/files/file_contexts", - f"{tree}" - ] - - if is_ostree: - # exclude /etc from being checked when the tree is ostree, because - # it is just bind-mounted from /usr/etc and has incorrect selinux - # context for /etc path - CMD.extend(["-e", f"{tree}/etc"]) - - output = subprocess_check_output(CMD) - - # output are lines such as: - # Would relabel /tmp/tmpwrozmb47/dev from system_u:object_r:root_t:s0 to system_u:object_r:device_t:s0\n - setfiles_pattern = r"Would\s+relabel\s+(?P.+)\s+from\s+(?P.+)\s+to\s+(?P.+)" - setfiles_re = re.compile(setfiles_pattern) - - for line in output.splitlines(): - line = line.strip() - if not line: - continue - match = setfiles_re.match(line) - # do not silently ignore changes of 'setfiles' output - if not match: - raise RuntimeError(f"could not match line '{line}' with pattern '{setfiles_pattern}'") - parsed_line = { - "filename": match.group("filename")[len(tree):], - "actual": match.group("actual"), - "expected": match.group("expected") - } - result.append(parsed_line) - - # sort the list to make it consistent across runs - result.sort(key=lambda x: x.get("filename")) - - return result - - -def _read_glob_paths_with_parser(tree, glob_paths, parser_func): - """ - Use 'parser_func' to read all files obtained by using all 'glob_paths' - globbing patterns under the 'tree' path. - - The 'glob_paths' is a list string patterns accepted by glob.glob(). - The 'parser_func' function is expected to take a single string argument - containing the absolute path to a configuration file which should be parsed. - Its return value can be arbitrary representation of the parsed - configuration. - - Returns: dictionary with the keys corresponding to directories, which - contain configuration files mathing the provided glob pattern. Value of - each key is another dictionary with keys representing each filename and - values being the parsed configuration representation as returned by the - provided 'parser_func' function. - - An example return value for dracut configuration paths and parser: - { - "/etc/dracut.conf.d": { - "sgdisk.conf": { - "install_items": " sgdisk " - }, - }, - "/usr/lib/dracut/dracut.conf.d": { - "xen.conf": { - "add_drivers": " xen-netfront xen-blkfront " - } - } - } - """ - result = {} - - for glob_path in glob_paths: - glob_path_result = {} - - files = glob.glob(f"{tree}{glob_path}") - for file in files: - config = parser_func(file) - if config: - filename = os.path.basename(file) - glob_path_result[filename] = config - - if glob_path_result: - checked_path = os.path.dirname(glob_path) - result[checked_path] = glob_path_result - - return result - - -def read_modprobe_config(config_path): - """ - Read a specific modprobe configuragion file and for now, extract only - blacklisted kernel modules. - - Returns: dictionary with the keys corresponding to specific modprobe - commands and values being the values of these commands. - - An example return value: - { - "blacklist": [ - "nouveau" - ] - } - """ - file_result = {} - - BLACKLIST_CMD = "blacklist" - - with open(config_path) as f: - # The format of files under modprobe.d: one command per line, - # with blank lines and lines starting with '#' ignored. - # A '\' at the end of a line causes it to continue on the next line. - line_to_be_continued = "" - for line in f: - line = line.strip() - # line is not blank - if line: - # comment, skip it - if line[0] == "#": - continue - # this line continues on the following line - if line[-1] == "\\": - line_to_be_continued += line[:-1] - continue - # this line ends here - else: - # is this line continuation of the previous one? - if line_to_be_continued: - line = line_to_be_continued + line - line_to_be_continued = "" - cmd, cmd_args = line.split(' ', 1) - # we care only about blacklist command for now - if cmd == BLACKLIST_CMD: - modules_list = file_result[BLACKLIST_CMD] = [] - modules_list.append(cmd_args) - - return file_result - - -def read_modprobe_configs(tree): - """ - Read all modprobe *.conf files from a predefined list of paths and extract - supported commands. For now, extract only blacklisted kernel modules. - - The searched paths are: - - "/etc/modprobe.d/*.conf" - - "/usr/lib/modprobe.d/*.conf" - - "/usr/local/lib/modprobe.d/*.conf" - - Returns: dictionary as returned by '_read_glob_paths_with_parser()' with - configuration representation as returned by 'read_modprobe_config()'. - - An example return value: - { - "/usr/lib/modprobe.d": { - "blacklist-nouveau.conf": { - "blacklist": [ - "nouveau" - ] - } - } - } - """ - checked_globs = [ - "/etc/modprobe.d/*.conf", - "/usr/lib/modprobe.d/*.conf", - "/usr/local/lib/modprobe.d/*.conf" - ] - - return _read_glob_paths_with_parser(tree, checked_globs, read_modprobe_config) - - -def read_cloud_init_config(config_path): - """ - Read the specific cloud-init configuration file. - - Returns: dictionary representing the cloud-init configuration. - - An example return value: - { - "cloud_config_modules": [ - "mounts", - "locale", - "set-passwords", - "rh_subscription", - "yum-add-repo", - "package-update-upgrade-install", - "timezone", - "puppet", - "chef", - "salt-minion", - "mcollective", - "disable-ec2-metadata", - "runcmd" - ], - "cloud_final_modules": [ - "rightscale_userdata", - "scripts-per-once", - "scripts-per-boot", - "scripts-per-instance", - "scripts-user", - "ssh-authkey-fingerprints", - "keys-to-console", - "phone-home", - "final-message", - "power-state-change" - ], - "cloud_init_modules": [ - "disk_setup", - "migrator", - "bootcmd", - "write-files", - "growpart", - "resizefs", - "set_hostname", - "update_hostname", - "update_etc_hosts", - "rsyslog", - "users-groups", - "ssh" - ], - "disable_root": 1, - "disable_vmware_customization": false, - "mount_default_fields": [ - null, - null, - "auto", - "defaults,nofail,x-systemd.requires=cloud-init.service", - "0", - "2" - ], - "resize_rootfs_tmp": "/dev", - "ssh_deletekeys": 1, - "ssh_genkeytypes": null, - "ssh_pwauth": 0, - "syslog_fix_perms": null, - "system_info": { - "default_user": { - "gecos": "Cloud User", - "groups": [ - "adm", - "systemd-journal" - ], - "lock_passwd": true, - "name": "ec2-user", - "shell": "/bin/bash", - "sudo": [ - "ALL=(ALL) NOPASSWD:ALL" - ] - }, - "distro": "rhel", - "paths": { - "cloud_dir": "/var/lib/cloud", - "templates_dir": "/etc/cloud/templates" - }, - "ssh_svcname": "sshd" - }, - "users": [ - "default" - ] - } - """ - result = {} - - with contextlib.suppress(FileNotFoundError): - with open(config_path) as f: - config = yaml.safe_load(f) - result.update(config) - - return result - - -def read_cloud_init_configs(tree): - """ - Read all cloud-init *.cfg files from a predefined list of paths and parse them. - - The searched paths are: - - "/etc/cloud/cloud.cfg" - - "/etc/cloud/cloud.cfg.d/*.cfg" - - Returns: dictionary as returned by '_read_glob_paths_with_parser()' with - configuration representation as returned by 'read_cloud_init_config()'. - - An example return value: - { - "/etc/cloud.cfg.d": - "ec2.cfg": { - "default_user": { - "name": "ec2-user" - } - } - } - } - """ - checked_globs = [ - "/etc/cloud/cloud.cfg", - "/etc/cloud/cloud.cfg.d/*.cfg" - ] - - return _read_glob_paths_with_parser(tree, checked_globs, read_cloud_init_config) - - -def read_dracut_config(config_path): - """ - Read specific dracut configuration file. - - Returns: dictionary representing the uncommented configuration options read - from the file. - - An example return value: - { - "install_items": " sgdisk " - "add_drivers": " xen-netfront xen-blkfront " - } - """ - result = {} - - with open(config_path) as f: - # dracut configuration key/values delimiter is '=' or '+=' - for line in f: - line = line.strip() - # A '#' indicates the beginning of a comment; following - # characters, up to the end of the line are not interpreted. - line_comment = line.split("#", 1) - line = line_comment[0] - if line: - key, value = line.split("=", 1) - if key[-1] == "+": - key = key[:-1] - result[key] = value.strip('"') - - return result - - -def read_dracut_configs(tree): - """ - Read all dracut *.conf files from a predefined list of paths and parse them. - - The searched paths are: - - "/etc/dracut.conf.d/*.conf" - - "/usr/lib/dracut/dracut.conf.d/*.conf" - - Returns: dictionary as returned by '_read_glob_paths_with_parser()' with - configuration representation as returned by 'read_dracut_config()'. - - An example return value: - { - "/etc/dracut.conf.d": { - "sgdisk.conf": { - "install_items": " sgdisk " - }, - }, - "/usr/lib/dracut/dracut.conf.d": { - "xen.conf": { - "add_drivers": " xen-netfront xen-blkfront " - } - } - } - """ - checked_globs = [ - "/etc/dracut.conf.d/*.conf", - "/usr/lib/dracut/dracut.conf.d/*.conf" - ] - - return _read_glob_paths_with_parser(tree, checked_globs, read_dracut_config) - - -def read_keyboard_conf(tree): - """ - Read keyboard configuration for vconsole and X11. - - Returns: dictionary with at most two keys 'X11' and 'vconsole'. - 'vconsole' value is a dictionary representing configuration read from - /etc/vconsole.conf. - 'X11' value is a dictionary with at most two keys 'layout' and 'variant', - which values are extracted from X11 keyborad configuration. - - An example return value: - { - "X11": { - "layout": "us" - }, - "vconsole": { - "FONT": "eurlatgr", - "KEYMAP": "us" - } - } - """ - result = {} - - # read virtual console configuration - with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/vconsole.conf") as f: - values = parse_environment_vars(f.read()) - if values: - result["vconsole"] = values - - # read X11 keyboard configuration - with contextlib.suppress(FileNotFoundError): - # Example file content: - # - # Section "InputClass" - # Identifier "system-keyboard" - # MatchIsKeyboard "on" - # Option "XkbLayout" "us,sk" - # Option "XkbVariant" ",qwerty" - # EndSection - x11_config = {} - match_options_dict = { - "layout": r'Section\s+"InputClass"\s+.*Option\s+"XkbLayout"\s+"([\w,-]+)"\s+.*EndSection', - "variant": r'Section\s+"InputClass"\s+.*Option\s+"XkbVariant"\s+"([\w,-]+)"\s+.*EndSection' - } - with open(f"{tree}/etc/X11/xorg.conf.d/00-keyboard.conf") as f: - config = f.read() - for option, pattern in match_options_dict.items(): - match = re.search(pattern, config, re.DOTALL) - if match and match.group(1): - x11_config[option] = match.group(1) - - if x11_config: - result["X11"] = x11_config - - return result - - -def read_chrony_conf(tree): - """ - Read specific directives from Chrony configuration. Currently parsed - directives are: - - 'server' - - 'pool' - - 'peer' - - 'leapsectz' - - Returns: dictionary with the keys representing parsed directives from Chrony - configuration. Value of each key is a list of strings containing arguments - provided with each occurance of the directive in the configuration. - - An example return value: - { - "leapsectz": [ - "right/UTC" - ], - "pool": [ - "2.rhel.pool.ntp.org iburst" - ], - "server": [ - "169.254.169.123 prefer iburst minpoll 4 maxpoll 4" - ] - } - """ - result = {} - - parsed_directives = ["server", "pool", "peer", "leapsectz"] - - with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/chrony.conf") as f: - for line in f: - line = line.strip() - if not line: - continue - # skip comments - if line[0] in ["!", ";", "#", "%"]: - continue - split_line = line.split() - if split_line[0] in parsed_directives: - try: - directive_list = result[split_line[0]] - except KeyError: - directive_list = result[split_line[0]] = [] - directive_list.append(" ".join(split_line[1:])) - - return result - - -def read_systemd_service_dropin(dropin_dir_path): - """ - Read systemd .service unit drop-in configurations. - - Returns: dictionary representing the combined drop-in configurations. - - An example return value: - { - "Service": { - "Environment": "NM_CLOUD_SETUP_EC2=yes" - } - } - """ - # read all unit drop-in configurations - config_files = glob.glob(f"{dropin_dir_path}/*.conf") - parser = configparser.RawConfigParser() - # prevent conversion of the opion name to lowercase - parser.optionxform = lambda option: option - parser.read(config_files) - - dropin_config = {} - for section in parser.sections(): - section_config = {} - section_config.update(parser[section]) - if section_config: - dropin_config[section] = section_config - - return dropin_config - - -def read_systemd_service_dropins(tree): - """ - Read all systemd .service unit config files from a predefined list of paths - and parse them. - - The searched paths are: - - "/etc/systemd/system/*.service.d" - - "/usr/lib/systemd/system/*.service.d" - - Returns: dictionary as returned by '_read_glob_paths_with_parser()' with - configuration representation as returned by 'read_systemd_service_dropin()'. - - An example return value: - { - "/etc/systemd/system": { - "nm-cloud-setup.service.d": { - "Service": { - "Environment": "NM_CLOUD_SETUP_EC2=yes" - } - } - } - } - """ - checked_globs = [ - "/etc/systemd/system/*.service.d", - "/usr/lib/systemd/system/*.service.d" - ] - - return _read_glob_paths_with_parser(tree, checked_globs, read_systemd_service_dropin) - - -def read_tmpfilesd_config(config_path): - """ - Read tmpfiles.d configuration files. - - Returns: list of strings representing uncommented lines read from the - configuration file. - - An example return value: - [ - "x /tmp/.sap*", - "x /tmp/.hdb*lock", - "x /tmp/.trex*lock" - ] - """ - file_lines = [] - - with open(config_path) as f: - for line in f: - line = line.strip() - if not line: - continue - if line[0] == "#": - continue - file_lines.append(line) - - return file_lines - - -def read_tmpfilesd_configs(tree): - """ - Read all tmpfiles.d *.conf files from a predefined list of paths and parse - them. - - The searched paths are: - - "/etc/tmpfiles.d/*.conf" - - "/usr/lib/tmpfiles.d/*.conf" - - Returns: dictionary as returned by '_read_glob_paths_with_parser()' with - configuration representation as returned by 'read_tmpfilesd_config()'. - - An example return value: - { - "/etc/tmpfiles.d": { - "sap.conf": [ - "x /tmp/.sap*", - "x /tmp/.hdb*lock", - "x /tmp/.trex*lock" - ] - } - } - """ - checked_globs = [ - "/etc/tmpfiles.d/*.conf", - "/usr/lib/tmpfiles.d/*.conf" - ] - - return _read_glob_paths_with_parser(tree, checked_globs, read_tmpfilesd_config) - - -def read_tuned_profile(tree): - """ - Read the Tuned active profile and profile mode. - - Returns: dictionary with at most two keys 'active_profile' and 'profile_mode'. - Value of each key is a string representing respective tuned configuration - value. - - An example return value: - { - "active_profile": "sap-hana", - "profile_mode": "manual" - } - """ - result = {} - config_files = ["active_profile", "profile_mode"] - - with contextlib.suppress(FileNotFoundError): - for config_file in config_files: - with open(f"{tree}/etc/tuned/{config_file}") as f: - value = f.read() - value = value.strip() - if value: - result[config_file] = value - - return result - - -def read_sysctld_config(config_path): - """ - Read sysctl configuration file. - - Returns: list of strings representing uncommented lines read from the - configuration file. - - An example return value: - [ - "kernel.pid_max = 4194304", - "vm.max_map_count = 2147483647" - ] - """ - values = [] - - with open(config_path) as f: - for line in f: - line = line.strip() - if not line: - continue - # skip comments - if line[0] in ["#", ";"]: - continue - values.append(line) - - return values - - -def read_sysctld_configs(tree): - """ - Read all sysctl.d *.conf files from a predefined list of paths and parse - them. - - The searched paths are: - - "/etc/sysctl.d/*.conf", - - "/usr/lib/sysctl.d/*.conf" - - Returns: dictionary as returned by '_read_glob_paths_with_parser()' with - configuration representation as returned by 'read_sysctld_config()'. - - An example return value: - { - "/etc/sysctl.d": { - "sap.conf": [ - "kernel.pid_max = 4194304", - "vm.max_map_count = 2147483647" - ] - } - } - """ - checked_globs = [ - "/etc/sysctl.d/*.conf", - "/usr/lib/sysctl.d/*.conf" - ] - - return _read_glob_paths_with_parser(tree, checked_globs, read_sysctld_config) - - -def read_security_limits_config(config_path): - """ - Read all configuration files from /etc/security/limits.d. - - Returns: dictionary with the keys representing names of configuration files - from /etc/security/limits.d. Value of each key is a dictionary representing - uncommented configuration values read from the configuration file. - - An example return value: - [ - { - "domain": "@sapsys", - "item": "nofile", - "type": "hard", - "value": "65536" - }, - { - "domain": "@sapsys", - "item": "nofile", - "type": "soft", - "value": "65536" - } - ] - """ - values = [] - - with open(config_path) as f: - for line in f: - line = line.strip() - # the '#' character introduces a comment - after which the rest of the line is ignored - split_line = line.split("#", 1) - line = split_line[0] - if not line: - continue - # Syntax of a line is " " - domain, limit_type, item, value = line.split() - values.append({ - "domain": domain, - "type": limit_type, - "item": item, - "value": value - }) - - return values - - -def read_security_limits_configs(tree): - """ - Read all security limits *.conf files from a predefined list of paths and - parse them. - - The searched paths are: - - "/etc/security/limits.conf" - - "/etc/security/limits.d/*.conf" - - Returns: dictionary as returned by '_read_glob_paths_with_parser()' with - configuration representation as returned by 'read_security_limits_config()'. - - An example return value: - { - "/etc/security/limits.d": { - "99-sap.conf": [ - { - "domain": "@sapsys", - "item": "nofile", - "type": "hard", - "value": "65536" - }, - { - "domain": "@sapsys", - "item": "nofile", - "type": "soft", - "value": "65536" - } - ] - } - } - """ - checked_globs = [ - "/etc/security/limits.conf", - "/etc/security/limits.d/*.conf" - ] - - return _read_glob_paths_with_parser(tree, checked_globs, read_tmpfilesd_config) - - -def read_ssh_config(config_path): - """ - Read the content of provided SSH(d) configuration file. - - Returns: list of uncommented and non-empty lines read from the configuation - file. - - An example return value: - [ - "Match final all", - "Include /etc/crypto-policies/back-ends/openssh.config", - "GSSAPIAuthentication yes", - "ForwardX11Trusted yes", - "SendEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES", - "SendEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT", - "SendEnv LC_IDENTIFICATION LC_ALL LANGUAGE", - "SendEnv XMODIFIERS" - ] - """ - config_lines = [] - - with open(config_path) as f: - for line in f: - line = line.strip() - if not line: - continue - if line[0] == "#": - continue - config_lines.append(line) - - return config_lines - - -def read_ssh_configs(tree): - """ - Read all SSH configuration files from a predefined list of paths and - parse them. - - The searched paths are: - - "/etc/ssh/ssh_config" - - "/etc/ssh/ssh_config.d/*.conf" - - Returns: dictionary as returned by '_read_glob_paths_with_parser()' with - configuration representation as returned by 'read_ssh_config()'. - - An example return value: - { - "/etc/ssh": { - "ssh_config": [ - "Include /etc/ssh/ssh_config.d/*.conf" - ] - }, - "/etc/ssh/ssh_config.d": { - "05-redhat.conf": [ - "Match final all", - "Include /etc/crypto-policies/back-ends/openssh.config", - "GSSAPIAuthentication yes", - "ForwardX11Trusted yes", - "SendEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES", - "SendEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT", - "SendEnv LC_IDENTIFICATION LC_ALL LANGUAGE", - "SendEnv XMODIFIERS" - ] - } - } - """ - checked_globs = [ - "/etc/ssh/ssh_config", - "/etc/ssh/ssh_config.d/*.conf" - ] - - return _read_glob_paths_with_parser(tree, checked_globs, read_ssh_config) - - -def read_sshd_configs(tree): - """ - Read all SSHd configuration files from a predefined list of paths and - parse them. - - The searched paths are: - - "/etc/ssh/sshd_config" - - "/etc/ssh/sshd_config.d/*.conf" - - Returns: dictionary as returned by '_read_glob_paths_with_parser()' with - configuration representation as returned by 'read_ssh_config()'. - - An example return value: - { - "/etc/ssh": { - "sshd_config": [ - "HostKey /etc/ssh/ssh_host_rsa_key", - "HostKey /etc/ssh/ssh_host_ecdsa_key", - "HostKey /etc/ssh/ssh_host_ed25519_key", - "SyslogFacility AUTHPRIV", - "PermitRootLogin no", - "AuthorizedKeysFile\t.ssh/authorized_keys", - "PasswordAuthentication no", - "ChallengeResponseAuthentication no", - "GSSAPIAuthentication yes", - "GSSAPICleanupCredentials no", - "UsePAM yes", - "X11Forwarding yes", - "PrintMotd no", - "AcceptEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES", - "AcceptEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT", - "AcceptEnv LC_IDENTIFICATION LC_ALL LANGUAGE", - "AcceptEnv XMODIFIERS", - "Subsystem\tsftp\t/usr/libexec/openssh/sftp-server", - "ClientAliveInterval 420" - ] - } - } - """ - checked_globs = [ - "/etc/ssh/sshd_config", - "/etc/ssh/sshd_config.d/*.conf" - ] - - return _read_glob_paths_with_parser(tree, checked_globs, read_ssh_config) - - -def read_yum_repos(tree): - """ - Read all YUM/DNF repo files. - - The searched paths are: - - "/etc/yum.repos.d/*.repo" - - Returns: dictionary as returned by '_read_glob_paths_with_parser()' with - configuration representation as returned by '_read_inifile_to_dict()'. - - An example return value: - { - "/etc/yum.repos.d": { - "google-cloud.repo": { - "google-cloud-sdk": { - "baseurl": "https://packages.cloud.google.com/yum/repos/cloud-sdk-el8-x86_64", - "enabled": "1", - "gpgcheck": "1", - "gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg", - "name": "Google Cloud SDK", - "repo_gpgcheck": "0" - }, - "google-compute-engine": { - "baseurl": "https://packages.cloud.google.com/yum/repos/google-compute-engine-el8-x86_64-stable", - "enabled": "1", - "gpgcheck": "1", - "gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg", - "name": "Google Compute Engine", - "repo_gpgcheck": "0" - } - } - } - } - """ - checked_globs = [ - "/etc/yum.repos.d/*.repo" - ] - - return _read_glob_paths_with_parser(tree, checked_globs, _read_inifile_to_dict) - - -def read_sudoers(tree): - """ - Read uncommented lines from sudoers configuration file and /etc/sudoers.d - This functions does not actually do much of a parsing, as sudoers file - format grammar is a bit too much for our purpose. - Any #include or #includedir directives are ignored by this function. - - Returns: dictionary with the keys representing names of read configuration - files, /etc/sudoers and files from /etc/sudoers.d. Value of each key is - a list of strings representing uncommented lines read from the configuration - file. - - An example return value: - { - "/etc/sudoers": [ - "Defaults !visiblepw", - "Defaults always_set_home", - "Defaults match_group_by_gid", - "Defaults always_query_group_plugin", - "Defaults env_reset", - "Defaults env_keep = \"COLORS DISPLAY HOSTNAME HISTSIZE KDEDIR LS_COLORS\"", - "Defaults env_keep += \"MAIL PS1 PS2 QTDIR USERNAME LANG LC_ADDRESS LC_CTYPE\"", - "Defaults env_keep += \"LC_COLLATE LC_IDENTIFICATION LC_MEASUREMENT LC_MESSAGES\"", - "Defaults env_keep += \"LC_MONETARY LC_NAME LC_NUMERIC LC_PAPER LC_TELEPHONE\"", - "Defaults env_keep += \"LC_TIME LC_ALL LANGUAGE LINGUAS _XKB_CHARSET XAUTHORITY\"", - "Defaults secure_path = /sbin:/bin:/usr/sbin:/usr/bin", - "root\tALL=(ALL) \tALL", - "%wheel\tALL=(ALL)\tALL", - "ec2-user\tALL=(ALL)\tNOPASSWD: ALL" - ] - } - """ - result = {} - - def _parse_sudoers_file(f): - lines = [] - for line in f: - line = line.strip() - if not line: - continue - if line[0] == "#": - continue - lines.append(line) - return lines - - with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/sudoers") as f: - lines = _parse_sudoers_file(f) - if lines: - result["/etc/sudoers"] = lines - - sudoersd_result = {} - for file in glob.glob(f"{tree}/etc/sudoers.d/*"): - with open(file) as f: - lines = _parse_sudoers_file(f) - if lines: - result[os.path.basename(file)] = lines - if sudoersd_result: - result["/etc/sudoers.d"] = sudoersd_result - - return result - - -def read_udev_rules(tree): - """ - Read udev rules defined in /etc/udev/rules.d. - - Returns: dictionary with the keys representing names of files with udev - rules from /etc/udev/rules.d. Value of each key is a list of strings - representing uncommented lines read from the configuration file. If - the file is empty (e.g. because of masking udev configuration installed - by an RPM), an emptu list is returned as the respective value. - - An example return value: - { - "80-net-name-slot.rules": [] - } - """ - result = {} - - for file in glob.glob(f"{tree}/etc/udev/rules.d/*.rules"): - with open(file) as f: - lines = [] - for line in f: - line = line.strip() - if not line: - continue - if line[0] == "#": - continue - lines.append(line) - # include also empty files in the report - result[os.path.basename(file)] = lines - - return result - - -def _read_inifile_to_dict(config_path): - """ - Read INI file from the provided path - - Returns: a dictionary representing the provided INI file content. - - An example return value: - { - "google-cloud-sdk": { - "baseurl": "https://packages.cloud.google.com/yum/repos/cloud-sdk-el8-x86_64", - "enabled": "1", - "gpgcheck": "1", - "gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg", - "name": "Google Cloud SDK", - "repo_gpgcheck": "0" - }, - "google-compute-engine": { - "baseurl": "https://packages.cloud.google.com/yum/repos/google-compute-engine-el8-x86_64-stable", - "enabled": "1", - "gpgcheck": "1", - "gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg", - "name": "Google Compute Engine", - "repo_gpgcheck": "0" - } - } - """ - result = {} - - with contextlib.suppress(FileNotFoundError): - with open(config_path) as f: - parser = configparser.RawConfigParser() - # prevent conversion of the opion name to lowercase - parser.optionxform = lambda option: option - parser.read_file(f) - - for section in parser.sections(): - section_config = dict(parser.items(section)) - if section_config: - result[section] = section_config - - return result - - -def read_dnf_conf(tree): - """ - Read DNF configuration and defined variable files. - - Returns: dictionary with at most two keys 'dnf.conf' and 'vars'. - 'dnf.conf' value is a dictionary representing the DNF configuration - file content. - 'vars' value is a dictionary which keys represent names of files from - /etc/dnf/vars/ and values are strings representing the file content. - - An example return value: - { - "dnf.conf": { - "main": { - "installonly_limit": "3" - } - }, - "vars": { - "releasever": "8.4" - } - } - """ - result = {} - - dnf_config = _read_inifile_to_dict(f"{tree}/etc/dnf/dnf.conf") - if dnf_config: - result["dnf.conf"] = dnf_config - - dnf_vars = {} - for file in glob.glob(f"{tree}/etc/dnf/vars/*"): - with open(file) as f: - dnf_vars[os.path.basename(file)] = f.read().strip() - if dnf_vars: - result["vars"] = dnf_vars - - return result - - -def read_dnf_automatic_conf(tree): - """ - Read DNF Automatic configuation. - - Returns: dictionary as returned by '_read_inifile_to_dict()'. - - An example return value: - { - "base": { - "debuglevel": "1" - }, - "command_email": { - "email_from": "root@example.com", - "email_to": "root" - }, - "commands": { - "apply_updates": "yes", - "download_updates": "yes", - "network_online_timeout": "60", - "random_sleep": "0", - "upgrade_type": "security" - }, - "email": { - "email_from": "root@example.com", - "email_host": "localhost", - "email_to": "root" - }, - "emitters": { - "emit_via": "stdio" - } - } - """ - return _read_inifile_to_dict(f"{tree}/etc/dnf/automatic.conf") - - -def read_authselect_conf(tree): - """ - Read authselect configuration. - - Returns: dictionary with two keys 'profile-id' and 'enabled-features'. - 'profile-id' value is a string representing the configured authselect - profile. - 'enabled-features' value is a list of strings representing enabled features - of the used authselect profile. In case there are no specific features - enabled, the list is empty. - - An example return value: - { - "enabled-features": [], - "profile-id": "sssd" - } - """ - result = {} - - with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/authselect/authselect.conf") as f: - # the first line is always the profile ID - # following lines are listing enabled features - # lines starting with '#' and empty lines are skipped - authselect_conf_lines = [] - for line in f: - line = line.strip() - if not line: - continue - if line[0] == "#": - continue - authselect_conf_lines.append(line) - if authselect_conf_lines: - result["profile-id"] = authselect_conf_lines[0] - result["enabled-features"] = authselect_conf_lines[1:] - - return result - - -def read_resolv_conf(tree): - """ - Read /etc/resolv.conf. - - Returns: a list of uncommented lines from the /etc/resolv.conf. - - An example return value: - [ - "search redhat.com", - "nameserver 192.168.1.1", - "nameserver 192.168.1.2" - ] - """ - result = [] - - with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/resolv.conf") as f: - for line in f: - line = line.strip() - if not line: - continue - if line[0] == "#": - continue - result.append(line) - - return result - - -def append_filesystem(report, tree, *, is_ostree=False): - if os.path.exists(f"{tree}/etc/os-release"): - report["packages"] = rpm_packages(tree) - if not is_ostree: - report["rpm-verify"] = rpm_verify(tree) - - not_installed_docs = rpm_not_installed_docs(tree) - if not_installed_docs: - report["rpm_not_installed_docs"] = not_installed_docs - - with open(f"{tree}/etc/os-release") as f: - report["os-release"] = parse_environment_vars(f.read()) - - report["services-enabled"] = read_services(tree, "enabled") - report["services-disabled"] = read_services(tree, "disabled") - - default_target = read_default_target(tree) - if default_target: - report["default-target"] = default_target - - with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/hostname") as f: - report["hostname"] = f.read().strip() - - with contextlib.suppress(FileNotFoundError): - report["timezone"] = os.path.basename(os.readlink(f"{tree}/etc/localtime")) - - authselect_conf = read_authselect_conf(tree) - if authselect_conf: - report["authselect"] = authselect_conf - - chrony_conf = read_chrony_conf(tree) - if chrony_conf: - report["chrony"] = chrony_conf - - cloud_init_configs = read_cloud_init_configs(tree) - if cloud_init_configs: - report["cloud-init"] = cloud_init_configs - - container_images = read_container_images(tree) - if container_images: - report["container-images"] = container_images - - dnf_conf = read_dnf_conf(tree) - if dnf_conf: - report["dnf"] = dnf_conf - - dnf_automatic = read_dnf_automatic_conf(tree) - if dnf_automatic: - report["/etc/dnf/automatic.conf"] = dnf_automatic - - yum_repos = read_yum_repos(tree) - if yum_repos: - report["yum_repos"] = yum_repos - - dracut_configs = read_dracut_configs(tree) - if dracut_configs: - report["dracut"] = dracut_configs - - with contextlib.suppress(FileNotFoundError): - report["firewall-enabled"] = read_firewall_zone(tree) - - firewall_default_zone = read_firewall_default_zone(tree) - if firewall_default_zone: - report["firewall-default-zone"] = firewall_default_zone - - fstab = read_fstab(tree) - if fstab: - report["fstab"] = fstab - - hosts = read_hosts(tree) - if hosts: - report["hosts"] = hosts - - keyboard = read_keyboard_conf(tree) - if keyboard: - report["keyboard"] = keyboard - - security_limits_configs = read_security_limits_configs(tree) - if security_limits_configs: - report["security-limits"] = security_limits_configs - - locale = read_locale(tree) - if locale: - report["locale"] = locale - - logind_configs = read_logind_configs(tree) - if logind_configs: - report["systemd-logind"] = logind_configs - - with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/machine-id") as f: - report["machine-id"] = f.readline() - - modprobe_configs = read_modprobe_configs(tree) - if modprobe_configs: - report["modprobe"] = modprobe_configs - - tmpfilesd_configs = read_tmpfilesd_configs(tree) - if tmpfilesd_configs: - report["tmpfiles.d"] = tmpfilesd_configs - - rhsm = read_rhsm(tree) - if rhsm: - report["rhsm"] = rhsm - - selinux = read_selinux_info(tree, is_ostree) - if selinux: - report["selinux"] = selinux - - ssh_configs = read_ssh_configs(tree) - if ssh_configs: - report["ssh_config"] = ssh_configs - - sshd_configs = read_sshd_configs(tree) - if sshd_configs: - report["sshd_config"] = sshd_configs - - sudoers_conf = read_sudoers(tree) - if sudoers_conf: - report["sudoers"] = sudoers_conf - - sysconfig = read_sysconfig(tree) - if sysconfig: - report["sysconfig"] = sysconfig - - sysctld_configs = read_sysctld_configs(tree) - if sysctld_configs: - report["sysctl.d"] = sysctld_configs - - systemd_service_dropins = read_systemd_service_dropins(tree) - if systemd_service_dropins: - report["systemd-service-dropins"] = systemd_service_dropins - - tuned_profile = read_tuned_profile(tree) - if tuned_profile: - report["tuned"] = tuned_profile - - resolv_conf = read_resolv_conf(tree) - # add even empty resolv_conf to the report to express that it is empty or non-existent - report["/etc/resolv.conf"] = resolv_conf - - udev_rules = read_udev_rules(tree) - if udev_rules: - report["/etc/udev/rules.d"] = udev_rules - - with open(f"{tree}/etc/passwd") as f: - report["passwd"] = sorted(f.read().strip().split("\n")) - - with open(f"{tree}/etc/group") as f: - report["groups"] = sorted(f.read().strip().split("\n")) - - if is_ostree: - with open(f"{tree}/usr/lib/passwd") as f: - report["passwd-system"] = sorted(f.read().strip().split("\n")) - - with open(f"{tree}/usr/lib/group") as f: - report["groups-system"] = sorted(f.read().strip().split("\n")) - - if os.path.exists(f"{tree}/boot") and len(os.listdir(f"{tree}/boot")) > 0: - assert "bootmenu" not in report - with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/boot/grub2/grubenv") as f: - report["boot-environment"] = parse_environment_vars(f.read()) - report["bootmenu"] = read_boot_entries(f"{tree}/boot") - - elif len(glob.glob(f"{tree}/vmlinuz-*")) > 0: - assert "bootmenu" not in report - with open(f"{tree}/grub2/grubenv") as f: - report["boot-environment"] = parse_environment_vars(f.read()) - report["bootmenu"] = read_boot_entries(tree) - elif len(glob.glob(f"{tree}/EFI")): - print("EFI partition", file=sys.stderr) - - -def volume_group_for_device(device: str) -> str: - # Find the volume group that belongs to the device specified via `parent` - vg_name = None - count = 0 - - cmd = [ - "pvdisplay", "-C", "--noheadings", "-o", "vg_name", device - ] - - while True: - res = subprocess.run(cmd, - check=False, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="UTF-8") - - if res.returncode == 5: - if count == 10: - raise RuntimeError("Could not find parent device") - time.sleep(1*count) - count += 1 - continue - - if res.returncode != 0: - raise RuntimeError(res.stderr.strip()) - - vg_name = res.stdout.strip() - if vg_name: - break - - return vg_name - - -def ensure_device_file(path: str, major: int, minor: int): - """Ensure the device file with the given major, minor exists""" - os.makedirs(os.path.dirname(path), exist_ok=True) - if not os.path.exists(path): - os.mknod(path, 0o600 | stat.S_IFBLK, os.makedev(major, minor)) - - -def discover_lvm(dev: str, parent: devices.Device, devmgr: devices.DeviceManager): - # find the volume group name for the device file - vg_name = volume_group_for_device(dev) - - # activating LVM is done OSBuild side. - # However we still have to get OSBuild the name of the VG to open - - # Find all logical volumes in the volume group - cmd = [ - "lvdisplay", "-C", "--noheadings", - "-o", "lv_name,path,lv_kernel_major,lv_kernel_minor", - "--separator", ";", - vg_name - ] - - res = subprocess.run(cmd, - check=False, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="UTF-8") - - if res.returncode != 0: - raise RuntimeError(res.stderr.strip()) - - data = res.stdout.strip() - parsed = list(map(lambda l: l.split(";"), data.split("\n"))) - volumes = OrderedDict() - - # devices_map stores for each device path onto the system the corresponding - # OSBuild's Device object - devices_map = {} - - for vol in parsed: - vol = list(map(lambda v: v.strip(), vol)) - assert len(vol) == 4 - name, _, _, _ = vol - - options = { - "volume": name, - } - - # Create an OSBuild device object for the LVM partition - info = index.get_module_info("Device", "org.osbuild.lvm2.lv") - device = devices.Device( - name, - info, - parent, - options) - if not info: - raise RuntimeError("can't find org.osbuild.lvm2.lv") - jsonschema.validate(options, info.get_schema()) - reply = devmgr.open(device) - voldev = reply["path"] # get the path where is mounted the device - minor = reply["node"]["minor"] - major = reply["node"]["major"] - - info = { - "device": voldev - } - ensure_device_file(voldev, int(major), int(minor)) - - read_partition(voldev, info) - volumes[name] = info - if name.startswith("root"): - volumes.move_to_end(name, last=False) - - # associate the device path with the Device object, we will need it to - # mount later on. - devices_map[voldev] = device - # get back both the device map and the result that'll go in the JSON report - return devices_map, { - "lvm": True, - "lvm.vg": vg_name, - "lvm.volumes": volumes - } - - -def partition_is_lvm(part: Dict) -> bool: - return part["type"].upper() in ["E6D6D379-F507-44C2-A23C-238F2A3DF928", "8E"] - - -def append_partitions(report, image): - partitions = report["partitions"] - with tempfile.TemporaryDirectory() as mountpoint: - with host.ServiceManager(monitor=monitor.NullMonitor(1)) as mgr: - - devmgr = devices.DeviceManager(mgr, "/dev", os.path.dirname(image)) - - # Device map associate a path onto where the device is mounted with its - # corresponding Device object. Mount will require both the path and the - # Device object in order to do its job. - devices_map = {} - filesystems = {} - for part in partitions: - start, size = part["start"], part["size"] - ret = loop_open( - devmgr, - part["partuuid"], - image, - size, - offset=start) - dev = ret["path"] - devices_map[dev] = ret["Device"] - read_partition(dev, part) - if partition_is_lvm(part): - dmap, lvm = discover_lvm(dev, ret["Device"], devmgr) - devices_map.update(dmap) - for vol in lvm["lvm.volumes"].values(): - if vol["fstype"]: - mntopts = [] - # we cannot recover since the underlying loopback device is mounted - # read-only but since we are using the it through the device mapper - # the fact might not be communicated and the kernel attempt a to - # a recovery of the filesystem, which will lead to a kernel panic - if vol["fstype"] in ("ext4", "ext3", "xfs"): - mntopts = ["norecovery"] - filesystems[vol["uuid"].upper()] = { - "device": vol["device"], - "mntops": mntopts - } - del vol["device"] - part.update(lvm) - elif part["uuid"] and part["fstype"]: - filesystems[part["uuid"].upper()] = { - "device": dev - } - - # find partition with fstab and read it - fstab = [] - for fs in filesystems.values(): - dev, opts = fs["device"], fs.get("mntops") - with mount(dev, opts) as tree: - if os.path.exists(f"{tree}/etc/fstab"): - fstab.extend(read_fstab(tree)) - break - # sort the fstab entries by the mountpoint - fstab = sorted(fstab, key=operator.itemgetter(1)) - - # mount all partitions to ther respective mount points - root_tree = "" - mmgr = mounts.MountManager(devmgr, mountpoint) - for n, fstab_entry in enumerate(fstab): - part_uuid = fstab_entry[0].split("=")[1].upper() - part_device = filesystems[part_uuid]["device"] - part_mountpoint = fstab_entry[1] - part_fstype = fstab_entry[2] - part_options = fstab_entry[3].split(",") - part_options += filesystems[part_uuid].get("mntops", []) - - if "ext4" in part_fstype: - info = index.get_module_info("Mount", "org.osbuild.ext4") - elif "vfat" in part_fstype: - info = index.get_module_info("Mount", "org.osbuild.fat") - elif "btrfs" in part_fstype: - info = index.get_module_info("Mount", "org.osbuild.btrfs") - elif "xfs" in part_fstype: - info = index.get_module_info("Mount", "org.osbuild.xfs") - else: - raise RuntimeError("Unknown file system") - if not info: - raise RuntimeError(f"Can't find org.osbuild.{part_fstype}") - - # the first mount point should be root - if n == 0: - if part_mountpoint != "/": - raise RuntimeError("The first mountpoint in sorted fstab entries is not '/'") - root_tree = mountpoint - - # prepare the options to mount the partition - options = {} - for option in part_options: - if option == "defaults": # defaults is not a supported option - continue - - if "=" in option: - parts = option.split("=") - key = parts[0] - val = parts[1] - - # uid and gid must be integers - if key == "uid" or key == "gid": - val = int(val) - - options[key] = val - else: - options[option] = True - - options["readonly"] = True - - # Validate the options - # - # The mount manager is taking care of opening the file system for us - # so we don't have access to the json objects that'll be used to - # invoke the mounter. However we're only interested at validating the - # options. We can extract these from the schame to validate them - # only. - jsonschema.validate(options, info.get_schema()["properties"]["options"]) - - # Finally mount - mnt_kwargs = { - "name": part_device, - "info": info, - # retrieves the associated Device Object - "device": devices_map[part_device], - "target": part_mountpoint, - "options": options - } - # XXX: remove inspect and just add "partition" above - # once osbuild PR#1501 is available everywhere - import inspect - if "partition" in inspect.getfullargspec(mounts.Mount).args: - mnt_kwargs["partition"] = None - mmgr.mount(mounts.Mount(**mnt_kwargs)) - - if not root_tree: - raise RuntimeError("The root filesystem tree is not mounted") - - append_filesystem(report, root_tree) - - -def analyse_image(image) -> Dict[str, Any]: - imgfmt = read_image_format(image) - report: Dict[str, Any] = {"image-format": imgfmt} - - with convert_image(image, imgfmt) as target: - size = os.stat(target).st_size - with host.ServiceManager(monitor=monitor.NullMonitor(1)) as mgr: - device = loop_open( - devices.DeviceManager(mgr, "/dev", os.path.dirname(target)), - os.path.basename(target), - target, - size, - offset=0)["path"] - report["bootloader"] = read_bootloader_type(device) - report.update(read_partition_table(device)) - if not report["partition-table"]: - # no partition table: mount device and treat it as a partition - with mount(device) as tree: - append_filesystem(report, tree) - return report - - # close loop device and descend into partitions on image file - append_partitions(report, target) - return report - - -def append_directory(report, tree): - with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir: - tree_ro = os.path.join(tmpdir, "root_ro") - os.makedirs(tree_ro) - # Make sure that the tools which analyse the directory in-place - # can not modify its content (e.g. create additional files). - # mount_at() always mounts the source as read-only! - with mount_at(tree, tree_ro, ["bind"]) as _: - if os.path.lexists(f"{tree}/ostree"): - os.makedirs(f"{tree}/etc", exist_ok=True) - with mount_at(f"{tree}/usr/etc", f"{tree}/etc", extra=["--bind"]): - append_filesystem(report, tree_ro, is_ostree=True) - else: - append_filesystem(report, tree_ro) - - -def append_ostree_repo(report, repo): - ostree = functools.partial(run_ostree, repo=repo) - - r = ostree("config", "get", "core.mode") - report["ostree"] = { - "repo": { - "core.mode": r.stdout.strip() - } - } - - r = ostree("refs") - refs = r.stdout.strip().split("\n") - report["ostree"]["refs"] = refs - - resolved = {r: ostree("rev-parse", r).stdout.strip() for r in refs} - commit = resolved[refs[0]] - - with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir: - tree = os.path.join(tmpdir, "tree") - ostree("checkout", "--force-copy", commit, tree) - append_directory(report, tree) - - -def analyse_directory(path): - report = {} - - if os.path.exists(os.path.join(path, "compose.json")): - report["type"] = "ostree/commit" - repo = os.path.join(path, "repo") - append_ostree_repo(report, repo) - elif os.path.isdir(os.path.join(path, "refs")): - report["type"] = "ostree/repo" - repo = os.path.join(path, "repo") - append_ostree_repo(report, repo) - else: - append_directory(report, path) - - return report - - -def is_tarball(path): - mtype, _ = mimetypes.guess_type(path) - return mtype == "application/x-tar" - - -def analyse_tarball(path): - with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir: - tree = os.path.join(tmpdir, "root") - os.makedirs(tree) - command = [ - "tar", - "--selinux", - "--xattrs", - "--acls", - "-x", - "--auto-compress", - "-f", path, - "-C", tree - ] - subprocess.run(command, - stdout=sys.stderr, - check=True) - # gce image type contains virtual raw disk inside a tarball - if os.path.isfile(f"{tree}/disk.raw"): - return analyse_image(f"{tree}/disk.raw") - else: - return analyse_directory(tree) - - -def is_compressed(path): - _, encoding = mimetypes.guess_type(path) - return encoding in ["xz", "gzip", "bzip2"] - - -def analyse_compressed(path): - _, encoding = mimetypes.guess_type(path) - - if encoding == "xz": - command = ["unxz", "--force"] - elif encoding == "gzip": - command = ["gunzip", "--force"] - elif encoding == "bzip2": - command = ["bunzip2", "--force"] - else: - raise ValueError(f"Unsupported compression: {encoding}") - - with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir: - subprocess.run(["cp", "--reflink=auto", "-a", path, tmpdir], - check=True) - - files = os.listdir(tmpdir) - archive = os.path.join(tmpdir, files[0]) - subprocess.run(command + [archive], check=True) - - files = os.listdir(tmpdir) - assert len(files) == 1 - image = os.path.join(tmpdir, files[0]) - return analyse_image(image) - - -def is_iso(path): - return "iso" in pathlib.Path(path).suffix - - -def analyse_iso(path): - with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp: - report = None - subprocess.run(["mount", "-o", "loop", path, tmp], check=True) - try: - report = analyse_tarball(os.path.join(tmp, "liveimg.tar.gz")) - except Exception as e: - print(f"{e}", file=sys.stderr) - subprocess.run(["umount", tmp], check=True) - return report - - -def main(): - parser = argparse.ArgumentParser(description="Inspect an image") - parser.add_argument("target", metavar="TARGET", - help="The file or directory to analyse", - type=os.path.abspath) - - args = parser.parse_args() - target = args.target - - if os.path.isdir(target): - report = analyse_directory(target) - elif is_tarball(target): - report = analyse_tarball(target) - elif is_compressed(target): - report = analyse_compressed(target) - elif is_iso(target): - report = analyse_iso(target) - else: - report = analyse_image(target) - - json.dump(report, sys.stdout, sort_keys=True, indent=2) - - -if __name__ == "__main__": - main()