diff --git a/tools/image-info b/tools/image-info index 376ac53ef..db31fa725 100755 --- a/tools/image-info +++ b/tools/image-info @@ -21,7 +21,7 @@ import xml.etree.ElementTree import yaml from collections import OrderedDict -from typing import Dict +from typing import Dict, Any, Generator from osbuild import loop @@ -70,7 +70,7 @@ def loop_open(ctl, image, *, offset=None, size=None): @contextlib.contextmanager -def convert_image(ctl, image, fmt): +def convert_image(image, fmt): with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp: if fmt["type"] != "raw": target = os.path.join(tmp, "image.raw") @@ -135,6 +135,8 @@ def parse_environment_vars(s): def parse_unit_files(s, expected_state): r = [] for line in s.split("\n")[1:]: + state = "" + unit = "" try: unit, state, *_ = line.split() except ValueError: @@ -146,7 +148,7 @@ def parse_unit_files(s, expected_state): return r -def subprocess_check_output(argv, parse_fn=None): +def subprocess_check_output(argv, parse_fn=None) -> Any: try: output = subprocess.check_output(argv, encoding="utf-8") except subprocess.CalledProcessError as e: @@ -195,7 +197,7 @@ def read_container_images(tree): return images -def read_image_format(device): +def read_image_format(device) -> Dict[str, str]: """ Read image format. @@ -318,7 +320,7 @@ def read_partition_table(device): return info -def read_bootloader_type(device): +def read_bootloader_type(device) -> str: """ Read bootloader type from the provided device. @@ -416,16 +418,18 @@ def rpm_verify(tree): changed = {} missing = [] - for line in rpm.stdout: - # format description in rpm(8), under `--verify` - attrs = line[:9] - if attrs == "missing ": - missing.append(line[12:].rstrip()) - else: - changed[line[13:].rstrip()] = attrs - # ignore return value, because it returns non-zero when it found changes - rpm.wait() + if rpm.stdout: + for line in rpm.stdout: + # format description in rpm(8), under `--verify` + attrs = line[:9] + if attrs == "missing ": + missing.append(line[12:].rstrip()) + else: + changed[line[13:].rstrip()] = attrs + + # ignore return value, because it returns non-zero when it found changes + rpm.wait() return { "missing": sorted(missing), @@ -433,7 +437,7 @@ def rpm_verify(tree): } -def rpm_not_installed_docs(tree, is_ostree): +def rpm_not_installed_docs(tree): """ Gathers information on documentation, which is part of RPM packages, but was not installed. @@ -467,7 +471,7 @@ def rpm_not_installed_docs(tree, is_ostree): return sorted(not_installed_docs) -def rpm_packages(tree, is_ostree): +def rpm_packages(tree): """ Read NVRs of RPM packages installed on the system. @@ -494,7 +498,7 @@ def rpm_packages(tree, is_ostree): cmd += ["--dbpath", "/usr/share/rpm"] elif os.path.exists(os.path.join(tree, "var/lib/rpm")): cmd += ["--dbpath", "/var/lib/rpm"] - output = subprocess_check_output(cmd) + subprocess_check_output(cmd) pkgs = subprocess_check_output(cmd, str.split) return list(sorted(pkgs)) @@ -2271,11 +2275,11 @@ def read_resolv_conf(tree): def append_filesystem(report, tree, *, is_ostree=False): if os.path.exists(f"{tree}/etc/os-release"): - report["packages"] = rpm_packages(tree, is_ostree) + report["packages"] = rpm_packages(tree) if not is_ostree: report["rpm-verify"] = rpm_verify(tree) - not_installed_docs = rpm_not_installed_docs(tree, is_ostree) + not_installed_docs = rpm_not_installed_docs(tree) if not_installed_docs: report["rpm_not_installed_docs"] = not_installed_docs @@ -2485,7 +2489,7 @@ def ensure_device_file(path: str, major: int, minor: int): @contextlib.contextmanager -def discover_lvm(dev) -> Dict: +def discover_lvm(dev) -> Generator[Dict[str, Any], None, None]: # find the volume group name for the device file vg_name = volume_group_for_device(dev) @@ -2532,14 +2536,12 @@ def discover_lvm(dev) -> Dict: volumes[name] = info if name.startswith("root"): volumes.move_to_end(name, last=False) - res = { + yield { "lvm": True, "lvm.vg": vg_name, "lvm.volumes": volumes } - yield res - finally: r = subprocess.run(["vgchange", "-an", vg_name], stdout=subprocess.DEVNULL, @@ -2621,13 +2623,13 @@ def append_partitions(report, image, loctl): append_filesystem(report, root_tree) -def analyse_image(image): +def analyse_image(image) -> Dict[str, Any]: loctl = loop.LoopControl() imgfmt = read_image_format(image) - report = {"image-format": imgfmt} + report: Dict[str, Any] = {"image-format": imgfmt} - with convert_image(loctl, image, imgfmt) as target: + with convert_image(image, imgfmt) as target: size = os.stat(target).st_size with loop_open(loctl, target, offset=0, size=size) as device: report["bootloader"] = read_bootloader_type(device) @@ -2650,7 +2652,7 @@ def append_directory(report, tree): # Make sure that the tools which analyse the directory in-place # can not modify its content (e.g. create additional files). # mount_at() always mounts the source as read-only! - with mount_at(tree, tree_ro, ["bind"]) as mountpoint: + with mount_at(tree, tree_ro, ["bind"]) as _: if os.path.lexists(f"{tree}/ostree"): os.makedirs(f"{tree}/etc", exist_ok=True) with mount_at(f"{tree}/usr/etc", f"{tree}/etc", extra=["--bind"]): @@ -2700,7 +2702,7 @@ def analyse_directory(path): def is_tarball(path): - mtype, encoding = mimetypes.guess_type(path) + mtype, _ = mimetypes.guess_type(path) return mtype == "application/x-tar"