tools/osbuild-image-info: code quality and style improvements

Run isort for imports.
Pylint: wrong-import-order / C0411
Solves the following linter warnings:
- standard import "pathlib" should be placed before third party import
  "yaml"
- standard import "collections.OrderedDict" should be placed before
  third party imports "yaml", "jsonschema"
- standard import "typing.Dict" should be placed before third party
  imports "yaml", "jsonschema"

Fix default arg values.
Pylint: dangerous-default-value / W0102
- Using mutable default values ([]) for function arguments is considered
  dangerous.

Rename format variable.
Pylint: redefined-builtin / W0622
- 'format' is a built-in function.

Use f-strings instead of formatting where possible.
Pylint: consider-using-f-string / C0209

Remove unnecessary else after returns.
Pylint: no-else-return / R1705

Remove unnecessary else after continue.
Pylint: no-else-continue / R1724

Set the encoding (utf-8) for all calls to open().
Pylint: unspecified-encoding / W1514

Disable the too-many-branches and too-many-statements warnings for
append_partitions() and append_filesystem().  We can refactor the
functions to make them smaller later, but for now we're addressing only
the simpler issues.

Initialise with dict literal instead of call to function.
Pylint: use-dict-literal / R1735

Use implicit truthiness for glob instead of len().
Pylint: use-implicit-booleaness-not-len / C1802

Rename ambiguous variable 'l' to 'line'.
pycodestyle: ambiguous-variable-name (E741)

Merge comparisons with 'in'.
Pylint: consider-using-in / R1714
This commit is contained in:
Achilleas Koutsou 2025-01-06 16:42:59 +01:00 committed by Tomáš Hozza
parent fd19ab41fb
commit ac83e4541c

View file

@ -5,26 +5,26 @@ import configparser
import contextlib
import functools
import glob
import json
import mimetypes
import operator
import json
import os
import pathlib
import platform
import re
import stat
import subprocess
import sys
import time
import tempfile
import time
import xml.etree.ElementTree
import yaml
import pathlib
import jsonschema
from collections import OrderedDict
from typing import Dict, Any
from typing import Any, Dict
from osbuild import devices, host, mounts, meta, monitor
import jsonschema
import yaml
from osbuild import devices, host, meta, monitor, mounts
index = meta.Index("/usr/lib/osbuild/")
SECTOR_SIZE = 512
@ -93,7 +93,11 @@ def convert_image(image, fmt):
@contextlib.contextmanager
def mount_at(device, mountpoint, options=[], extra=[]):
def mount_at(device, mountpoint, options=None, extra=None):
if options is None:
options = []
if extra is None:
extra = []
opts = ",".join(["ro"] + options)
subprocess.run(["mount", "-o", opts] + extra + [device, mountpoint], check=True)
try:
@ -209,9 +213,9 @@ def read_image_format(device) -> Dict[str, str]:
}
"""
qemu = subprocess_check_output(["qemu-img", "info", "--output=json", device], json.loads)
format = qemu["format"]
result = {"type": format}
if format == "qcow2":
image_format = qemu["format"]
result = {"type": image_format}
if image_format == "qcow2":
result["compat"] = qemu["format-specific"]["data"]["compat"]
return result
@ -299,7 +303,7 @@ def read_partition_table(device):
# 'sfdisk' prefixes the partition id with '0x' but
# 'blkid' does not; remove it to mimic 'blkid'
table_id = ptable['id'][2:]
partuuid = "%.33s-%02x" % (table_id, i + 1)
partuuid = f"{table_id:.33s}-{i + 1:02x}"
partitions.append({
"bootable": p.get("bootable", False),
@ -327,8 +331,7 @@ def read_bootloader_type(device) -> str:
with open(device, "rb") as f:
if b"GRUB" in f.read(SECTOR_SIZE):
return "grub"
else:
return "unknown"
return "unknown"
def read_boot_entries(boot_dir):
@ -365,8 +368,8 @@ def read_boot_entries(boot_dir):
"""
entries = []
for conf in glob.glob(f"{boot_dir}/loader/entries/*.conf"):
with open(conf) as f:
entry = dict()
with open(conf, encoding="utf-8") as f:
entry = {}
for line in f:
line = line.strip()
if not line or line.startswith("#"):
@ -613,7 +616,7 @@ def read_firewall_default_zone(tree):
"trusted"
"""
try:
with open(f"{tree}/etc/firewalld/firewalld.conf") as f:
with open(f"{tree}/etc/firewalld/firewalld.conf", encoding="utf-8") as f:
conf = parse_environment_vars(f.read())
return conf["DefaultZone"]
except FileNotFoundError:
@ -672,7 +675,7 @@ def read_fstab(tree):
"""
result = []
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/fstab") as f:
with open(f"{tree}/etc/fstab", encoding="utf-8") as f:
result = sorted([line.split() for line in f if line.strip() and not line.startswith("#")])
return result
@ -754,7 +757,7 @@ def read_rhsm(tree):
for plugin_name, plugin_path in dnf_plugins_config.items():
with contextlib.suppress(FileNotFoundError):
with open(plugin_path) as f:
with open(plugin_path, encoding="utf-8") as f:
parser = configparser.ConfigParser()
parser.read_file(f)
# only read "enabled" option from "main" section
@ -776,7 +779,7 @@ def read_rhsm(tree):
with contextlib.suppress(FileNotFoundError):
rhsm_conf = {}
with open(f"{tree}/etc/rhsm/rhsm.conf") as f:
with open(f"{tree}/etc/rhsm/rhsm.conf", encoding="utf-8") as f:
parser = configparser.ConfigParser()
parser.read_file(f)
for section in parser.sections():
@ -855,7 +858,7 @@ def read_sysconfig(tree):
# iterate through supported configs
for name, path in sysconfig_paths.items():
with contextlib.suppress(FileNotFoundError):
with open(path) as f:
with open(path, encoding="utf-8") as f:
# if file exists start with empty array of values
result[name] = parse_environment_vars(f.read())
@ -864,7 +867,7 @@ def read_sysconfig(tree):
files = glob.glob(f"{tree}/etc/sysconfig/network-scripts/ifcfg-*")
for file in files:
ifname = os.path.basename(file).lstrip("ifcfg-")
with open(file) as f:
with open(file, encoding="utf-8") as f:
network_scripts[ifname] = parse_environment_vars(f.read())
if network_scripts:
@ -889,7 +892,7 @@ def read_hosts(tree):
result = []
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/hosts") as f:
with open(f"{tree}/etc/hosts", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
@ -912,7 +915,7 @@ def read_logind_config(config_path):
"""
result = {}
with open(config_path) as f:
with open(config_path, encoding="utf-8") as f:
parser = configparser.RawConfigParser()
# prevent conversion of the option name to lowercase
parser.optionxform = lambda option: option
@ -964,7 +967,7 @@ def read_locale(tree):
}
"""
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/locale.conf") as f:
with open(f"{tree}/etc/locale.conf", encoding="utf-8") as f:
return parse_environment_vars(f.read())
@ -1026,7 +1029,7 @@ def read_selinux_conf(tree):
}
"""
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/selinux/config") as f:
with open(f"{tree}/etc/selinux/config", encoding="utf-8") as f:
return parse_environment_vars(f.read())
@ -1178,7 +1181,7 @@ def read_modprobe_config(config_path):
BLACKLIST_CMD = "blacklist"
with open(config_path) as f:
with open(config_path, encoding="utf-8") as f:
# The format of files under modprobe.d: one command per line,
# with blank lines and lines starting with '#' ignored.
# A '\' at the end of a line causes it to continue on the next line.
@ -1195,16 +1198,16 @@ def read_modprobe_config(config_path):
line_to_be_continued += line[:-1]
continue
# this line ends here
else:
# is this line continuation of the previous one?
if line_to_be_continued:
line = line_to_be_continued + line
line_to_be_continued = ""
cmd, cmd_args = line.split(' ', 1)
# we care only about blacklist command for now
if cmd == BLACKLIST_CMD:
modules_list = file_result[BLACKLIST_CMD] = []
modules_list.append(cmd_args)
# is this line continuation of the previous one?
if line_to_be_continued:
line = line_to_be_continued + line
line_to_be_continued = ""
cmd, cmd_args = line.split(' ', 1)
# we care only about blacklist command for now
if cmd == BLACKLIST_CMD:
modules_list = file_result[BLACKLIST_CMD] = []
modules_list.append(cmd_args)
return file_result
@ -1335,7 +1338,7 @@ def read_cloud_init_config(config_path):
result = {}
with contextlib.suppress(FileNotFoundError):
with open(config_path) as f:
with open(config_path, encoding="utf-8") as f:
config = yaml.safe_load(f)
result.update(config)
@ -1387,7 +1390,7 @@ def read_dracut_config(config_path):
"""
result = {}
with open(config_path) as f:
with open(config_path, encoding="utf-8") as f:
# dracut configuration key/values delimiter is '=' or '+='
for line in f:
line = line.strip()
@ -1462,7 +1465,7 @@ def read_keyboard_conf(tree):
# read virtual console configuration
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/vconsole.conf") as f:
with open(f"{tree}/etc/vconsole.conf", encoding="utf-8") as f:
values = parse_environment_vars(f.read())
if values:
result["vconsole"] = values
@ -1482,7 +1485,7 @@ def read_keyboard_conf(tree):
"layout": r'Section\s+"InputClass"\s+.*Option\s+"XkbLayout"\s+"([\w,-]+)"\s+.*EndSection',
"variant": r'Section\s+"InputClass"\s+.*Option\s+"XkbVariant"\s+"([\w,-]+)"\s+.*EndSection'
}
with open(f"{tree}/etc/X11/xorg.conf.d/00-keyboard.conf") as f:
with open(f"{tree}/etc/X11/xorg.conf.d/00-keyboard.conf", encoding="utf-8") as f:
config = f.read()
for option, pattern in match_options_dict.items():
match = re.search(pattern, config, re.DOTALL)
@ -1526,7 +1529,7 @@ def read_chrony_conf(tree):
parsed_directives = ["server", "pool", "peer", "leapsectz"]
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/chrony.conf") as f:
with open(f"{tree}/etc/chrony.conf", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
@ -1615,7 +1618,7 @@ def read_config_file_no_comment(config_path):
"""
file_lines = []
with open(config_path) as f:
with open(config_path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
@ -1677,7 +1680,7 @@ def read_tuned_profile(tree):
with contextlib.suppress(FileNotFoundError):
for config_file in config_files:
with open(f"{tree}/etc/tuned/{config_file}") as f:
with open(f"{tree}/etc/tuned/{config_file}", encoding="utf-8") as f:
value = f.read()
value = value.strip()
if value:
@ -1701,7 +1704,7 @@ def read_sysctld_config(config_path):
"""
values = []
with open(config_path) as f:
with open(config_path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
@ -1770,7 +1773,7 @@ def read_security_limits_config(config_path):
"""
values = []
with open(config_path) as f:
with open(config_path, encoding="utf-8") as f:
for line in f:
line = line.strip()
# the '#' character introduces a comment - after which the rest of the line is ignored
@ -1851,7 +1854,7 @@ def read_ssh_config(config_path):
"""
config_lines = []
with open(config_path) as f:
with open(config_path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
@ -2038,14 +2041,14 @@ def read_sudoers(tree):
return lines
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/sudoers") as f:
with open(f"{tree}/etc/sudoers", encoding="utf-8") as f:
lines = _parse_sudoers_file(f)
if lines:
result["/etc/sudoers"] = lines
sudoersd_result = {}
for file in glob.glob(f"{tree}/etc/sudoers.d/*"):
with open(file) as f:
with open(file, encoding="utf-8") as f:
lines = _parse_sudoers_file(f)
if lines:
result[os.path.basename(file)] = lines
@ -2073,7 +2076,7 @@ def read_udev_rules(tree):
result = {}
for file in glob.glob(f"{tree}/etc/udev/rules.d/*.rules"):
with open(file) as f:
with open(file, encoding="utf-8") as f:
lines = []
for line in f:
line = line.strip()
@ -2161,7 +2164,7 @@ def read_dnf_conf(tree):
dnf_vars = {}
for file in glob.glob(f"{tree}/etc/dnf/vars/*"):
with open(file) as f:
with open(file, encoding="utf-8") as f:
dnf_vars[os.path.basename(file)] = f.read().strip()
if dnf_vars:
result["vars"] = dnf_vars
@ -2224,7 +2227,7 @@ def read_authselect_conf(tree):
result = {}
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/authselect/authselect.conf") as f:
with open(f"{tree}/etc/authselect/authselect.conf", encoding="utf-8") as f:
# the first line is always the profile ID
# following lines are listing enabled features
# lines starting with '#' and empty lines are skipped
@ -2259,7 +2262,7 @@ def read_resolv_conf(tree):
result = []
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/resolv.conf") as f:
with open(f"{tree}/resolv.conf", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
@ -2271,6 +2274,7 @@ def read_resolv_conf(tree):
return result
# pylint: disable=too-many-branches disable=too-many-statements
def append_filesystem(report, tree, *, is_ostree=False):
if os.path.exists(f"{tree}/etc/os-release"):
report["packages"] = rpm_packages(tree)
@ -2281,7 +2285,7 @@ def append_filesystem(report, tree, *, is_ostree=False):
if not_installed_docs:
report["rpm_not_installed_docs"] = not_installed_docs
with open(f"{tree}/etc/os-release") as f:
with open(f"{tree}/etc/os-release", encoding="utf-8") as f:
report["os-release"] = parse_environment_vars(f.read())
report["services-enabled"] = read_services(tree, "enabled")
@ -2292,7 +2296,7 @@ def append_filesystem(report, tree, *, is_ostree=False):
report["default-target"] = default_target
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/hostname") as f:
with open(f"{tree}/etc/hostname", encoding="utf-8") as f:
report["hostname"] = f.read().strip()
with contextlib.suppress(FileNotFoundError):
@ -2362,7 +2366,7 @@ def append_filesystem(report, tree, *, is_ostree=False):
report["systemd-logind"] = logind_configs
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/machine-id") as f:
with open(f"{tree}/etc/machine-id", encoding="utf-8") as f:
report["machine-id"] = f.readline()
modprobe_configs = read_modprobe_configs(tree)
@ -2417,32 +2421,32 @@ def append_filesystem(report, tree, *, is_ostree=False):
if udev_rules:
report["/etc/udev/rules.d"] = udev_rules
with open(f"{tree}/etc/passwd") as f:
with open(f"{tree}/etc/passwd", encoding="utf-8") as f:
report["passwd"] = sorted(f.read().strip().split("\n"))
with open(f"{tree}/etc/group") as f:
with open(f"{tree}/etc/group", encoding="utf-8") as f:
report["groups"] = sorted(f.read().strip().split("\n"))
if is_ostree:
with open(f"{tree}/usr/lib/passwd") as f:
with open(f"{tree}/usr/lib/passwd", encoding="utf-8") as f:
report["passwd-system"] = sorted(f.read().strip().split("\n"))
with open(f"{tree}/usr/lib/group") as f:
with open(f"{tree}/usr/lib/group", encoding="utf-8") as f:
report["groups-system"] = sorted(f.read().strip().split("\n"))
if os.path.exists(f"{tree}/boot") and len(os.listdir(f"{tree}/boot")) > 0:
assert "bootmenu" not in report
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/boot/grub2/grubenv") as f:
with open(f"{tree}/boot/grub2/grubenv", encoding="utf-8") as f:
report["boot-environment"] = parse_environment_vars(f.read())
report["bootmenu"] = read_boot_entries(f"{tree}/boot")
elif len(glob.glob(f"{tree}/vmlinuz-*")) > 0:
assert "bootmenu" not in report
with open(f"{tree}/grub2/grubenv") as f:
with open(f"{tree}/grub2/grubenv", encoding="utf-8") as f:
report["boot-environment"] = parse_environment_vars(f.read())
report["bootmenu"] = read_boot_entries(tree)
elif len(glob.glob(f"{tree}/EFI")):
elif glob.glob(f"{tree}/EFI"):
print("EFI partition", file=sys.stderr)
@ -2511,7 +2515,7 @@ def discover_lvm(dev: str, parent: devices.Device, devmgr: devices.DeviceManager
raise RuntimeError(res.stderr.strip())
data = res.stdout.strip()
parsed = list(map(lambda l: l.split(";"), data.split("\n")))
parsed = list(map(lambda line: line.split(";"), data.split("\n")))
volumes = OrderedDict()
# devices_map stores for each device path onto the system the corresponding
@ -2567,6 +2571,7 @@ def partition_is_lvm(part: Dict) -> bool:
return part["type"].upper() in ["E6D6D379-F507-44C2-A23C-238F2A3DF928", "8E"]
# pylint: disable=too-many-branches disable=too-many-statements
def append_partitions(report, image):
partitions = report["partitions"]
with tempfile.TemporaryDirectory() as mountpoint:
@ -2666,7 +2671,7 @@ def append_partitions(report, image):
val = parts[1]
# uid and gid must be integers
if key == "uid" or key == "gid":
if key in ("uid", "gid"):
val = int(val)
options[key] = val
@ -2812,8 +2817,8 @@ def analyse_tarball(path):
# gce image type contains virtual raw disk inside a tarball
if os.path.isfile(f"{tree}/disk.raw"):
return analyse_image(f"{tree}/disk.raw")
else:
return analyse_directory(tree)
return analyse_directory(tree)
def is_compressed(path):