tools/image-info: fix mypy errors

This commit is contained in:
Thomas Lavocat 2022-11-10 16:26:00 +01:00 committed by Achilleas Koutsou
parent 71ca1d0e9a
commit aaddca445d

View file

@ -21,7 +21,7 @@ import xml.etree.ElementTree
import yaml
from collections import OrderedDict
from typing import Dict
from typing import Dict, Any, Generator
from osbuild import loop
@ -70,7 +70,7 @@ def loop_open(ctl, image, *, offset=None, size=None):
@contextlib.contextmanager
def convert_image(ctl, image, fmt):
def convert_image(image, fmt):
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:
if fmt["type"] != "raw":
target = os.path.join(tmp, "image.raw")
@ -135,6 +135,8 @@ def parse_environment_vars(s):
def parse_unit_files(s, expected_state):
r = []
for line in s.split("\n")[1:]:
state = ""
unit = ""
try:
unit, state, *_ = line.split()
except ValueError:
@ -146,7 +148,7 @@ def parse_unit_files(s, expected_state):
return r
def subprocess_check_output(argv, parse_fn=None):
def subprocess_check_output(argv, parse_fn=None) -> Any:
try:
output = subprocess.check_output(argv, encoding="utf-8")
except subprocess.CalledProcessError as e:
@ -195,7 +197,7 @@ def read_container_images(tree):
return images
def read_image_format(device):
def read_image_format(device) -> Dict[str, str]:
"""
Read image format.
@ -318,7 +320,7 @@ def read_partition_table(device):
return info
def read_bootloader_type(device):
def read_bootloader_type(device) -> str:
"""
Read bootloader type from the provided device.
@ -416,16 +418,18 @@ def rpm_verify(tree):
changed = {}
missing = []
for line in rpm.stdout:
# format description in rpm(8), under `--verify`
attrs = line[:9]
if attrs == "missing ":
missing.append(line[12:].rstrip())
else:
changed[line[13:].rstrip()] = attrs
# ignore return value, because it returns non-zero when it found changes
rpm.wait()
if rpm.stdout:
for line in rpm.stdout:
# format description in rpm(8), under `--verify`
attrs = line[:9]
if attrs == "missing ":
missing.append(line[12:].rstrip())
else:
changed[line[13:].rstrip()] = attrs
# ignore return value, because it returns non-zero when it found changes
rpm.wait()
return {
"missing": sorted(missing),
@ -433,7 +437,7 @@ def rpm_verify(tree):
}
def rpm_not_installed_docs(tree, is_ostree):
def rpm_not_installed_docs(tree):
"""
Gathers information on documentation, which is part of RPM packages,
but was not installed.
@ -467,7 +471,7 @@ def rpm_not_installed_docs(tree, is_ostree):
return sorted(not_installed_docs)
def rpm_packages(tree, is_ostree):
def rpm_packages(tree):
"""
Read NVRs of RPM packages installed on the system.
@ -494,7 +498,7 @@ def rpm_packages(tree, is_ostree):
cmd += ["--dbpath", "/usr/share/rpm"]
elif os.path.exists(os.path.join(tree, "var/lib/rpm")):
cmd += ["--dbpath", "/var/lib/rpm"]
output = subprocess_check_output(cmd)
subprocess_check_output(cmd)
pkgs = subprocess_check_output(cmd, str.split)
return list(sorted(pkgs))
@ -2271,11 +2275,11 @@ def read_resolv_conf(tree):
def append_filesystem(report, tree, *, is_ostree=False):
if os.path.exists(f"{tree}/etc/os-release"):
report["packages"] = rpm_packages(tree, is_ostree)
report["packages"] = rpm_packages(tree)
if not is_ostree:
report["rpm-verify"] = rpm_verify(tree)
not_installed_docs = rpm_not_installed_docs(tree, is_ostree)
not_installed_docs = rpm_not_installed_docs(tree)
if not_installed_docs:
report["rpm_not_installed_docs"] = not_installed_docs
@ -2485,7 +2489,7 @@ def ensure_device_file(path: str, major: int, minor: int):
@contextlib.contextmanager
def discover_lvm(dev) -> Dict:
def discover_lvm(dev) -> Generator[Dict[str, Any], None, None]:
# find the volume group name for the device file
vg_name = volume_group_for_device(dev)
@ -2532,14 +2536,12 @@ def discover_lvm(dev) -> Dict:
volumes[name] = info
if name.startswith("root"):
volumes.move_to_end(name, last=False)
res = {
yield {
"lvm": True,
"lvm.vg": vg_name,
"lvm.volumes": volumes
}
yield res
finally:
r = subprocess.run(["vgchange", "-an", vg_name],
stdout=subprocess.DEVNULL,
@ -2621,13 +2623,13 @@ def append_partitions(report, image, loctl):
append_filesystem(report, root_tree)
def analyse_image(image):
def analyse_image(image) -> Dict[str, Any]:
loctl = loop.LoopControl()
imgfmt = read_image_format(image)
report = {"image-format": imgfmt}
report: Dict[str, Any] = {"image-format": imgfmt}
with convert_image(loctl, image, imgfmt) as target:
with convert_image(image, imgfmt) as target:
size = os.stat(target).st_size
with loop_open(loctl, target, offset=0, size=size) as device:
report["bootloader"] = read_bootloader_type(device)
@ -2650,7 +2652,7 @@ def append_directory(report, tree):
# Make sure that the tools which analyse the directory in-place
# can not modify its content (e.g. create additional files).
# mount_at() always mounts the source as read-only!
with mount_at(tree, tree_ro, ["bind"]) as mountpoint:
with mount_at(tree, tree_ro, ["bind"]) as _:
if os.path.lexists(f"{tree}/ostree"):
os.makedirs(f"{tree}/etc", exist_ok=True)
with mount_at(f"{tree}/usr/etc", f"{tree}/etc", extra=["--bind"]):
@ -2700,7 +2702,7 @@ def analyse_directory(path):
def is_tarball(path):
mtype, encoding = mimetypes.guess_type(path)
mtype, _ = mimetypes.guess_type(path)
return mtype == "application/x-tar"