tools/image-info: bad syntax on rhel8 python

We can't use:

with (a() as b, c() as d):
    pass

On older versions of python.
This commit is contained in:
Thomas Lavocat 2023-02-10 14:13:06 +01:00 committed by Achilleas Koutsou
parent 15d80a0cd1
commit aba981c508

View file

@ -2569,133 +2569,133 @@ def partition_is_lvm(part: Dict) -> bool:
def append_partitions(report, image):
partitions = report["partitions"]
with (tempfile.TemporaryDirectory() as mountpoint,
host.ServiceManager(monitor=monitor.NullMonitor(1)) as mgr):
with tempfile.TemporaryDirectory() as mountpoint:
with host.ServiceManager(monitor=monitor.NullMonitor(1)) as mgr:
devmgr = devices.DeviceManager(mgr, "/dev", os.path.dirname(image))
devmgr = devices.DeviceManager(mgr, "/dev", os.path.dirname(image))
# Device map associate a path onto where the device is mounted with its
# corresponding Device object. Mount will require both the path and the
# Device object in order to do its job.
devices_map = {}
filesystems = {}
for part in partitions:
start, size = part["start"], part["size"]
ret = loop_open(
devmgr,
part["partuuid"],
image,
size,
offset=start)
dev = ret["path"]
devices_map[dev] = ret["Device"]
read_partition(dev, part)
if partition_is_lvm(part):
dmap, lvm = discover_lvm(dev, ret["Device"], devmgr)
devices_map.update(dmap)
for vol in lvm["lvm.volumes"].values():
if vol["fstype"]:
mntopts = []
# we cannot recover since the underlying loopback device is mounted
# read-only but since we are using the it through the device mapper
# the fact might not be communicated and the kernel attempt a to
# a recovery of the filesystem, which will lead to a kernel panic
if vol["fstype"] in ("ext4", "ext3", "xfs"):
mntopts = ["norecovery"]
filesystems[vol["uuid"].upper()] = {
"device": vol["device"],
"mntops": mntopts
}
del vol["device"]
part.update(lvm)
elif part["uuid"] and part["fstype"]:
filesystems[part["uuid"].upper()] = {
"device": dev
}
# Device map associate a path onto where the device is mounted with its
# corresponding Device object. Mount will require both the path and the
# Device object in order to do its job.
devices_map = {}
filesystems = {}
for part in partitions:
start, size = part["start"], part["size"]
ret = loop_open(
devmgr,
part["partuuid"],
image,
size,
offset=start)
dev = ret["path"]
devices_map[dev] = ret["Device"]
read_partition(dev, part)
if partition_is_lvm(part):
dmap, lvm = discover_lvm(dev, ret["Device"], devmgr)
devices_map.update(dmap)
for vol in lvm["lvm.volumes"].values():
if vol["fstype"]:
mntopts = []
# we cannot recover since the underlying loopback device is mounted
# read-only but since we are using the it through the device mapper
# the fact might not be communicated and the kernel attempt a to
# a recovery of the filesystem, which will lead to a kernel panic
if vol["fstype"] in ("ext4", "ext3", "xfs"):
mntopts = ["norecovery"]
filesystems[vol["uuid"].upper()] = {
"device": vol["device"],
"mntops": mntopts
}
del vol["device"]
part.update(lvm)
elif part["uuid"] and part["fstype"]:
filesystems[part["uuid"].upper()] = {
"device": dev
}
# find partition with fstab and read it
fstab = []
for fs in filesystems.values():
dev, opts = fs["device"], fs.get("mntops")
with mount(dev, opts) as tree:
if os.path.exists(f"{tree}/etc/fstab"):
fstab.extend(read_fstab(tree))
break
# sort the fstab entries by the mountpoint
fstab = sorted(fstab, key=operator.itemgetter(1))
# find partition with fstab and read it
fstab = []
for fs in filesystems.values():
dev, opts = fs["device"], fs.get("mntops")
with mount(dev, opts) as tree:
if os.path.exists(f"{tree}/etc/fstab"):
fstab.extend(read_fstab(tree))
break
# sort the fstab entries by the mountpoint
fstab = sorted(fstab, key=operator.itemgetter(1))
# mount all partitions to ther respective mount points
root_tree = ""
mmgr = mounts.MountManager(devmgr, mountpoint)
for n, fstab_entry in enumerate(fstab):
part_uuid = fstab_entry[0].split("=")[1].upper()
part_device = filesystems[part_uuid]["device"]
part_mountpoint = fstab_entry[1]
part_fstype = fstab_entry[2]
part_options = fstab_entry[3].split(",")
part_options += filesystems[part_uuid].get("mntops", [])
# mount all partitions to ther respective mount points
root_tree = ""
mmgr = mounts.MountManager(devmgr, mountpoint)
for n, fstab_entry in enumerate(fstab):
part_uuid = fstab_entry[0].split("=")[1].upper()
part_device = filesystems[part_uuid]["device"]
part_mountpoint = fstab_entry[1]
part_fstype = fstab_entry[2]
part_options = fstab_entry[3].split(",")
part_options += filesystems[part_uuid].get("mntops", [])
if "ext4" in part_fstype:
info = index.get_module_info("Mount", "org.osbuild.ext4")
elif "vfat" in part_fstype:
info = index.get_module_info("Mount", "org.osbuild.fat")
elif "btrfs" in part_fstype:
info = index.get_module_info("Mount", "org.osbuild.btrfs")
elif "xfs" in part_fstype:
info = index.get_module_info("Mount", "org.osbuild.xfs")
else:
raise RuntimeError("Unknown file system")
if not info:
raise RuntimeError(f"Can't find org.osbuild.{part_fstype}")
# the first mount point should be root
if n == 0:
if part_mountpoint != "/":
raise RuntimeError("The first mountpoint in sorted fstab entries is not '/'")
root_tree = mountpoint
# prepare the options to mount the partition
options = {}
for option in part_options:
if option == "defaults": # defaults is not a supported option
continue
if "=" in option:
parts = option.split("=")
key = parts[0]
val = parts[1]
# uid and gid must be integers
if key == "uid" or key == "gid":
val = int(val)
options[key] = val
if "ext4" in part_fstype:
info = index.get_module_info("Mount", "org.osbuild.ext4")
elif "vfat" in part_fstype:
info = index.get_module_info("Mount", "org.osbuild.fat")
elif "btrfs" in part_fstype:
info = index.get_module_info("Mount", "org.osbuild.btrfs")
elif "xfs" in part_fstype:
info = index.get_module_info("Mount", "org.osbuild.xfs")
else:
options[option] = True
raise RuntimeError("Unknown file system")
if not info:
raise RuntimeError(f"Can't find org.osbuild.{part_fstype}")
options["readonly"] = True
# the first mount point should be root
if n == 0:
if part_mountpoint != "/":
raise RuntimeError("The first mountpoint in sorted fstab entries is not '/'")
root_tree = mountpoint
# Validate the options
#
# The mount manager is taking care of opening the file system for us
# so we don't have access to the json objects that'll be used to
# invoke the mounter. However we're only interested at validating the
# options. We can extract these from the schame to validate them
# only.
jsonschema.validate(options, info.get_schema()["properties"]["options"])
# prepare the options to mount the partition
options = {}
for option in part_options:
if option == "defaults": # defaults is not a supported option
continue
# Finally mount
mmgr.mount(mounts.Mount(
part_device,
info,
devices_map[part_device], # retrieves the associated Device Object
part_mountpoint,
options))
if "=" in option:
parts = option.split("=")
key = parts[0]
val = parts[1]
if not root_tree:
raise RuntimeError("The root filesystem tree is not mounted")
# uid and gid must be integers
if key == "uid" or key == "gid":
val = int(val)
append_filesystem(report, root_tree)
options[key] = val
else:
options[option] = True
options["readonly"] = True
# Validate the options
#
# The mount manager is taking care of opening the file system for us
# so we don't have access to the json objects that'll be used to
# invoke the mounter. However we're only interested at validating the
# options. We can extract these from the schame to validate them
# only.
jsonschema.validate(options, info.get_schema()["properties"]["options"])
# Finally mount
mmgr.mount(mounts.Mount(
part_device,
info,
devices_map[part_device], # retrieves the associated Device Object
part_mountpoint,
options))
if not root_tree:
raise RuntimeError("The root filesystem tree is not mounted")
append_filesystem(report, root_tree)
def analyse_image(image) -> Dict[str, Any]: