From ac83e4541c3ae750e02ad9c367ed4099511c49e2 Mon Sep 17 00:00:00 2001 From: Achilleas Koutsou Date: Mon, 6 Jan 2025 16:42:59 +0100 Subject: [PATCH] tools/osbuild-image-info: code quality and style improvements Run isort for imports. Pylint: wrong-import-order / C0411 Solves the following linter warnings: - standard import "pathlib" should be placed before third party import "yaml" - standard import "collections.OrderedDict" should be placed before third party imports "yaml", "jsonschema" - standard import "typing.Dict" should be placed before third party imports "yaml", "jsonschema" Fix default arg values. Pylint: dangerous-default-value / W0102 - Using mutable default values ([]) for function arguments is considered dangerous. Rename format variable. Pylint: redefined-builtin / W0622 - 'format' is a built-in function. Use f-strings instead of formatting where possible. Pylint: consider-using-f-string / C0209 Remove unnecessary else after returns. Pylint: no-else-return / R1705 Remove unnecessary else after continue. Pylint: no-else-continue / R1724 Set the encoding (utf-8) for all calls to open(). Pylint: unspecified-encoding / W1514 Disable the too-many-branches and too-many-statements warnings for append_partitions() and append_filesystem(). We can refactor the functions to make them smaller later, but for now we're addressing only the simpler issues. Initialise with dict literal instead of call to function. Pylint: use-dict-literal / R1735 Use implicit truthiness for glob instead of len(). Pylint: use-implicit-booleaness-not-len / C1802 Rename ambiguous variable 'l' to 'line'. pycodestyle: ambiguous-variable-name (E741) Merge comparisons with 'in'. Pylint: consider-using-in / R1714 --- tools/osbuild-image-info | 141 ++++++++++++++++++++------------------- 1 file changed, 73 insertions(+), 68 deletions(-) diff --git a/tools/osbuild-image-info b/tools/osbuild-image-info index 7cb9aaf4..ee4e70f1 100755 --- a/tools/osbuild-image-info +++ b/tools/osbuild-image-info @@ -5,26 +5,26 @@ import configparser import contextlib import functools import glob +import json import mimetypes import operator -import json import os +import pathlib import platform import re import stat import subprocess import sys -import time import tempfile +import time import xml.etree.ElementTree -import yaml -import pathlib -import jsonschema - from collections import OrderedDict -from typing import Dict, Any +from typing import Any, Dict -from osbuild import devices, host, mounts, meta, monitor +import jsonschema +import yaml + +from osbuild import devices, host, meta, monitor, mounts index = meta.Index("/usr/lib/osbuild/") SECTOR_SIZE = 512 @@ -93,7 +93,11 @@ def convert_image(image, fmt): @contextlib.contextmanager -def mount_at(device, mountpoint, options=[], extra=[]): +def mount_at(device, mountpoint, options=None, extra=None): + if options is None: + options = [] + if extra is None: + extra = [] opts = ",".join(["ro"] + options) subprocess.run(["mount", "-o", opts] + extra + [device, mountpoint], check=True) try: @@ -209,9 +213,9 @@ def read_image_format(device) -> Dict[str, str]: } """ qemu = subprocess_check_output(["qemu-img", "info", "--output=json", device], json.loads) - format = qemu["format"] - result = {"type": format} - if format == "qcow2": + image_format = qemu["format"] + result = {"type": image_format} + if image_format == "qcow2": result["compat"] = qemu["format-specific"]["data"]["compat"] return result @@ -299,7 +303,7 @@ def read_partition_table(device): # 'sfdisk' prefixes the partition id with '0x' but # 'blkid' does not; remove it to mimic 'blkid' table_id = ptable['id'][2:] - partuuid = "%.33s-%02x" % (table_id, i + 1) + partuuid = f"{table_id:.33s}-{i + 1:02x}" partitions.append({ "bootable": p.get("bootable", False), @@ -327,8 +331,7 @@ def read_bootloader_type(device) -> str: with open(device, "rb") as f: if b"GRUB" in f.read(SECTOR_SIZE): return "grub" - else: - return "unknown" + return "unknown" def read_boot_entries(boot_dir): @@ -365,8 +368,8 @@ def read_boot_entries(boot_dir): """ entries = [] for conf in glob.glob(f"{boot_dir}/loader/entries/*.conf"): - with open(conf) as f: - entry = dict() + with open(conf, encoding="utf-8") as f: + entry = {} for line in f: line = line.strip() if not line or line.startswith("#"): @@ -613,7 +616,7 @@ def read_firewall_default_zone(tree): "trusted" """ try: - with open(f"{tree}/etc/firewalld/firewalld.conf") as f: + with open(f"{tree}/etc/firewalld/firewalld.conf", encoding="utf-8") as f: conf = parse_environment_vars(f.read()) return conf["DefaultZone"] except FileNotFoundError: @@ -672,7 +675,7 @@ def read_fstab(tree): """ result = [] with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/fstab") as f: + with open(f"{tree}/etc/fstab", encoding="utf-8") as f: result = sorted([line.split() for line in f if line.strip() and not line.startswith("#")]) return result @@ -754,7 +757,7 @@ def read_rhsm(tree): for plugin_name, plugin_path in dnf_plugins_config.items(): with contextlib.suppress(FileNotFoundError): - with open(plugin_path) as f: + with open(plugin_path, encoding="utf-8") as f: parser = configparser.ConfigParser() parser.read_file(f) # only read "enabled" option from "main" section @@ -776,7 +779,7 @@ def read_rhsm(tree): with contextlib.suppress(FileNotFoundError): rhsm_conf = {} - with open(f"{tree}/etc/rhsm/rhsm.conf") as f: + with open(f"{tree}/etc/rhsm/rhsm.conf", encoding="utf-8") as f: parser = configparser.ConfigParser() parser.read_file(f) for section in parser.sections(): @@ -855,7 +858,7 @@ def read_sysconfig(tree): # iterate through supported configs for name, path in sysconfig_paths.items(): with contextlib.suppress(FileNotFoundError): - with open(path) as f: + with open(path, encoding="utf-8") as f: # if file exists start with empty array of values result[name] = parse_environment_vars(f.read()) @@ -864,7 +867,7 @@ def read_sysconfig(tree): files = glob.glob(f"{tree}/etc/sysconfig/network-scripts/ifcfg-*") for file in files: ifname = os.path.basename(file).lstrip("ifcfg-") - with open(file) as f: + with open(file, encoding="utf-8") as f: network_scripts[ifname] = parse_environment_vars(f.read()) if network_scripts: @@ -889,7 +892,7 @@ def read_hosts(tree): result = [] with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/hosts") as f: + with open(f"{tree}/etc/hosts", encoding="utf-8") as f: for line in f: line = line.strip() if line: @@ -912,7 +915,7 @@ def read_logind_config(config_path): """ result = {} - with open(config_path) as f: + with open(config_path, encoding="utf-8") as f: parser = configparser.RawConfigParser() # prevent conversion of the option name to lowercase parser.optionxform = lambda option: option @@ -964,7 +967,7 @@ def read_locale(tree): } """ with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/locale.conf") as f: + with open(f"{tree}/etc/locale.conf", encoding="utf-8") as f: return parse_environment_vars(f.read()) @@ -1026,7 +1029,7 @@ def read_selinux_conf(tree): } """ with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/selinux/config") as f: + with open(f"{tree}/etc/selinux/config", encoding="utf-8") as f: return parse_environment_vars(f.read()) @@ -1178,7 +1181,7 @@ def read_modprobe_config(config_path): BLACKLIST_CMD = "blacklist" - with open(config_path) as f: + with open(config_path, encoding="utf-8") as f: # The format of files under modprobe.d: one command per line, # with blank lines and lines starting with '#' ignored. # A '\' at the end of a line causes it to continue on the next line. @@ -1195,16 +1198,16 @@ def read_modprobe_config(config_path): line_to_be_continued += line[:-1] continue # this line ends here - else: - # is this line continuation of the previous one? - if line_to_be_continued: - line = line_to_be_continued + line - line_to_be_continued = "" - cmd, cmd_args = line.split(' ', 1) - # we care only about blacklist command for now - if cmd == BLACKLIST_CMD: - modules_list = file_result[BLACKLIST_CMD] = [] - modules_list.append(cmd_args) + + # is this line continuation of the previous one? + if line_to_be_continued: + line = line_to_be_continued + line + line_to_be_continued = "" + cmd, cmd_args = line.split(' ', 1) + # we care only about blacklist command for now + if cmd == BLACKLIST_CMD: + modules_list = file_result[BLACKLIST_CMD] = [] + modules_list.append(cmd_args) return file_result @@ -1335,7 +1338,7 @@ def read_cloud_init_config(config_path): result = {} with contextlib.suppress(FileNotFoundError): - with open(config_path) as f: + with open(config_path, encoding="utf-8") as f: config = yaml.safe_load(f) result.update(config) @@ -1387,7 +1390,7 @@ def read_dracut_config(config_path): """ result = {} - with open(config_path) as f: + with open(config_path, encoding="utf-8") as f: # dracut configuration key/values delimiter is '=' or '+=' for line in f: line = line.strip() @@ -1462,7 +1465,7 @@ def read_keyboard_conf(tree): # read virtual console configuration with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/vconsole.conf") as f: + with open(f"{tree}/etc/vconsole.conf", encoding="utf-8") as f: values = parse_environment_vars(f.read()) if values: result["vconsole"] = values @@ -1482,7 +1485,7 @@ def read_keyboard_conf(tree): "layout": r'Section\s+"InputClass"\s+.*Option\s+"XkbLayout"\s+"([\w,-]+)"\s+.*EndSection', "variant": r'Section\s+"InputClass"\s+.*Option\s+"XkbVariant"\s+"([\w,-]+)"\s+.*EndSection' } - with open(f"{tree}/etc/X11/xorg.conf.d/00-keyboard.conf") as f: + with open(f"{tree}/etc/X11/xorg.conf.d/00-keyboard.conf", encoding="utf-8") as f: config = f.read() for option, pattern in match_options_dict.items(): match = re.search(pattern, config, re.DOTALL) @@ -1526,7 +1529,7 @@ def read_chrony_conf(tree): parsed_directives = ["server", "pool", "peer", "leapsectz"] with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/chrony.conf") as f: + with open(f"{tree}/etc/chrony.conf", encoding="utf-8") as f: for line in f: line = line.strip() if not line: @@ -1615,7 +1618,7 @@ def read_config_file_no_comment(config_path): """ file_lines = [] - with open(config_path) as f: + with open(config_path, encoding="utf-8") as f: for line in f: line = line.strip() if not line: @@ -1677,7 +1680,7 @@ def read_tuned_profile(tree): with contextlib.suppress(FileNotFoundError): for config_file in config_files: - with open(f"{tree}/etc/tuned/{config_file}") as f: + with open(f"{tree}/etc/tuned/{config_file}", encoding="utf-8") as f: value = f.read() value = value.strip() if value: @@ -1701,7 +1704,7 @@ def read_sysctld_config(config_path): """ values = [] - with open(config_path) as f: + with open(config_path, encoding="utf-8") as f: for line in f: line = line.strip() if not line: @@ -1770,7 +1773,7 @@ def read_security_limits_config(config_path): """ values = [] - with open(config_path) as f: + with open(config_path, encoding="utf-8") as f: for line in f: line = line.strip() # the '#' character introduces a comment - after which the rest of the line is ignored @@ -1851,7 +1854,7 @@ def read_ssh_config(config_path): """ config_lines = [] - with open(config_path) as f: + with open(config_path, encoding="utf-8") as f: for line in f: line = line.strip() if not line: @@ -2038,14 +2041,14 @@ def read_sudoers(tree): return lines with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/sudoers") as f: + with open(f"{tree}/etc/sudoers", encoding="utf-8") as f: lines = _parse_sudoers_file(f) if lines: result["/etc/sudoers"] = lines sudoersd_result = {} for file in glob.glob(f"{tree}/etc/sudoers.d/*"): - with open(file) as f: + with open(file, encoding="utf-8") as f: lines = _parse_sudoers_file(f) if lines: result[os.path.basename(file)] = lines @@ -2073,7 +2076,7 @@ def read_udev_rules(tree): result = {} for file in glob.glob(f"{tree}/etc/udev/rules.d/*.rules"): - with open(file) as f: + with open(file, encoding="utf-8") as f: lines = [] for line in f: line = line.strip() @@ -2161,7 +2164,7 @@ def read_dnf_conf(tree): dnf_vars = {} for file in glob.glob(f"{tree}/etc/dnf/vars/*"): - with open(file) as f: + with open(file, encoding="utf-8") as f: dnf_vars[os.path.basename(file)] = f.read().strip() if dnf_vars: result["vars"] = dnf_vars @@ -2224,7 +2227,7 @@ def read_authselect_conf(tree): result = {} with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/authselect/authselect.conf") as f: + with open(f"{tree}/etc/authselect/authselect.conf", encoding="utf-8") as f: # the first line is always the profile ID # following lines are listing enabled features # lines starting with '#' and empty lines are skipped @@ -2259,7 +2262,7 @@ def read_resolv_conf(tree): result = [] with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/resolv.conf") as f: + with open(f"{tree}/resolv.conf", encoding="utf-8") as f: for line in f: line = line.strip() if not line: @@ -2271,6 +2274,7 @@ def read_resolv_conf(tree): return result +# pylint: disable=too-many-branches disable=too-many-statements def append_filesystem(report, tree, *, is_ostree=False): if os.path.exists(f"{tree}/etc/os-release"): report["packages"] = rpm_packages(tree) @@ -2281,7 +2285,7 @@ def append_filesystem(report, tree, *, is_ostree=False): if not_installed_docs: report["rpm_not_installed_docs"] = not_installed_docs - with open(f"{tree}/etc/os-release") as f: + with open(f"{tree}/etc/os-release", encoding="utf-8") as f: report["os-release"] = parse_environment_vars(f.read()) report["services-enabled"] = read_services(tree, "enabled") @@ -2292,7 +2296,7 @@ def append_filesystem(report, tree, *, is_ostree=False): report["default-target"] = default_target with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/hostname") as f: + with open(f"{tree}/etc/hostname", encoding="utf-8") as f: report["hostname"] = f.read().strip() with contextlib.suppress(FileNotFoundError): @@ -2362,7 +2366,7 @@ def append_filesystem(report, tree, *, is_ostree=False): report["systemd-logind"] = logind_configs with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/etc/machine-id") as f: + with open(f"{tree}/etc/machine-id", encoding="utf-8") as f: report["machine-id"] = f.readline() modprobe_configs = read_modprobe_configs(tree) @@ -2417,32 +2421,32 @@ def append_filesystem(report, tree, *, is_ostree=False): if udev_rules: report["/etc/udev/rules.d"] = udev_rules - with open(f"{tree}/etc/passwd") as f: + with open(f"{tree}/etc/passwd", encoding="utf-8") as f: report["passwd"] = sorted(f.read().strip().split("\n")) - with open(f"{tree}/etc/group") as f: + with open(f"{tree}/etc/group", encoding="utf-8") as f: report["groups"] = sorted(f.read().strip().split("\n")) if is_ostree: - with open(f"{tree}/usr/lib/passwd") as f: + with open(f"{tree}/usr/lib/passwd", encoding="utf-8") as f: report["passwd-system"] = sorted(f.read().strip().split("\n")) - with open(f"{tree}/usr/lib/group") as f: + with open(f"{tree}/usr/lib/group", encoding="utf-8") as f: report["groups-system"] = sorted(f.read().strip().split("\n")) if os.path.exists(f"{tree}/boot") and len(os.listdir(f"{tree}/boot")) > 0: assert "bootmenu" not in report with contextlib.suppress(FileNotFoundError): - with open(f"{tree}/boot/grub2/grubenv") as f: + with open(f"{tree}/boot/grub2/grubenv", encoding="utf-8") as f: report["boot-environment"] = parse_environment_vars(f.read()) report["bootmenu"] = read_boot_entries(f"{tree}/boot") elif len(glob.glob(f"{tree}/vmlinuz-*")) > 0: assert "bootmenu" not in report - with open(f"{tree}/grub2/grubenv") as f: + with open(f"{tree}/grub2/grubenv", encoding="utf-8") as f: report["boot-environment"] = parse_environment_vars(f.read()) report["bootmenu"] = read_boot_entries(tree) - elif len(glob.glob(f"{tree}/EFI")): + elif glob.glob(f"{tree}/EFI"): print("EFI partition", file=sys.stderr) @@ -2511,7 +2515,7 @@ def discover_lvm(dev: str, parent: devices.Device, devmgr: devices.DeviceManager raise RuntimeError(res.stderr.strip()) data = res.stdout.strip() - parsed = list(map(lambda l: l.split(";"), data.split("\n"))) + parsed = list(map(lambda line: line.split(";"), data.split("\n"))) volumes = OrderedDict() # devices_map stores for each device path onto the system the corresponding @@ -2567,6 +2571,7 @@ def partition_is_lvm(part: Dict) -> bool: return part["type"].upper() in ["E6D6D379-F507-44C2-A23C-238F2A3DF928", "8E"] +# pylint: disable=too-many-branches disable=too-many-statements def append_partitions(report, image): partitions = report["partitions"] with tempfile.TemporaryDirectory() as mountpoint: @@ -2666,7 +2671,7 @@ def append_partitions(report, image): val = parts[1] # uid and gid must be integers - if key == "uid" or key == "gid": + if key in ("uid", "gid"): val = int(val) options[key] = val @@ -2812,8 +2817,8 @@ def analyse_tarball(path): # gce image type contains virtual raw disk inside a tarball if os.path.isfile(f"{tree}/disk.raw"): return analyse_image(f"{tree}/disk.raw") - else: - return analyse_directory(tree) + + return analyse_directory(tree) def is_compressed(path):