diff --git a/tools/image-info b/tools/image-info index e42641458..4a3dc1d45 100755 --- a/tools/image-info +++ b/tools/image-info @@ -2569,133 +2569,133 @@ def partition_is_lvm(part: Dict) -> bool: def append_partitions(report, image): partitions = report["partitions"] - with (tempfile.TemporaryDirectory() as mountpoint, - host.ServiceManager(monitor=monitor.NullMonitor(1)) as mgr): + with tempfile.TemporaryDirectory() as mountpoint: + with host.ServiceManager(monitor=monitor.NullMonitor(1)) as mgr: - devmgr = devices.DeviceManager(mgr, "/dev", os.path.dirname(image)) + devmgr = devices.DeviceManager(mgr, "/dev", os.path.dirname(image)) - # Device map associate a path onto where the device is mounted with its - # corresponding Device object. Mount will require both the path and the - # Device object in order to do its job. - devices_map = {} - filesystems = {} - for part in partitions: - start, size = part["start"], part["size"] - ret = loop_open( - devmgr, - part["partuuid"], - image, - size, - offset=start) - dev = ret["path"] - devices_map[dev] = ret["Device"] - read_partition(dev, part) - if partition_is_lvm(part): - dmap, lvm = discover_lvm(dev, ret["Device"], devmgr) - devices_map.update(dmap) - for vol in lvm["lvm.volumes"].values(): - if vol["fstype"]: - mntopts = [] - # we cannot recover since the underlying loopback device is mounted - # read-only but since we are using the it through the device mapper - # the fact might not be communicated and the kernel attempt a to - # a recovery of the filesystem, which will lead to a kernel panic - if vol["fstype"] in ("ext4", "ext3", "xfs"): - mntopts = ["norecovery"] - filesystems[vol["uuid"].upper()] = { - "device": vol["device"], - "mntops": mntopts - } - del vol["device"] - part.update(lvm) - elif part["uuid"] and part["fstype"]: - filesystems[part["uuid"].upper()] = { - "device": dev - } + # Device map associate a path onto where the device is mounted with its + # corresponding Device object. Mount will require both the path and the + # Device object in order to do its job. + devices_map = {} + filesystems = {} + for part in partitions: + start, size = part["start"], part["size"] + ret = loop_open( + devmgr, + part["partuuid"], + image, + size, + offset=start) + dev = ret["path"] + devices_map[dev] = ret["Device"] + read_partition(dev, part) + if partition_is_lvm(part): + dmap, lvm = discover_lvm(dev, ret["Device"], devmgr) + devices_map.update(dmap) + for vol in lvm["lvm.volumes"].values(): + if vol["fstype"]: + mntopts = [] + # we cannot recover since the underlying loopback device is mounted + # read-only but since we are using the it through the device mapper + # the fact might not be communicated and the kernel attempt a to + # a recovery of the filesystem, which will lead to a kernel panic + if vol["fstype"] in ("ext4", "ext3", "xfs"): + mntopts = ["norecovery"] + filesystems[vol["uuid"].upper()] = { + "device": vol["device"], + "mntops": mntopts + } + del vol["device"] + part.update(lvm) + elif part["uuid"] and part["fstype"]: + filesystems[part["uuid"].upper()] = { + "device": dev + } - # find partition with fstab and read it - fstab = [] - for fs in filesystems.values(): - dev, opts = fs["device"], fs.get("mntops") - with mount(dev, opts) as tree: - if os.path.exists(f"{tree}/etc/fstab"): - fstab.extend(read_fstab(tree)) - break - # sort the fstab entries by the mountpoint - fstab = sorted(fstab, key=operator.itemgetter(1)) + # find partition with fstab and read it + fstab = [] + for fs in filesystems.values(): + dev, opts = fs["device"], fs.get("mntops") + with mount(dev, opts) as tree: + if os.path.exists(f"{tree}/etc/fstab"): + fstab.extend(read_fstab(tree)) + break + # sort the fstab entries by the mountpoint + fstab = sorted(fstab, key=operator.itemgetter(1)) - # mount all partitions to ther respective mount points - root_tree = "" - mmgr = mounts.MountManager(devmgr, mountpoint) - for n, fstab_entry in enumerate(fstab): - part_uuid = fstab_entry[0].split("=")[1].upper() - part_device = filesystems[part_uuid]["device"] - part_mountpoint = fstab_entry[1] - part_fstype = fstab_entry[2] - part_options = fstab_entry[3].split(",") - part_options += filesystems[part_uuid].get("mntops", []) + # mount all partitions to ther respective mount points + root_tree = "" + mmgr = mounts.MountManager(devmgr, mountpoint) + for n, fstab_entry in enumerate(fstab): + part_uuid = fstab_entry[0].split("=")[1].upper() + part_device = filesystems[part_uuid]["device"] + part_mountpoint = fstab_entry[1] + part_fstype = fstab_entry[2] + part_options = fstab_entry[3].split(",") + part_options += filesystems[part_uuid].get("mntops", []) - if "ext4" in part_fstype: - info = index.get_module_info("Mount", "org.osbuild.ext4") - elif "vfat" in part_fstype: - info = index.get_module_info("Mount", "org.osbuild.fat") - elif "btrfs" in part_fstype: - info = index.get_module_info("Mount", "org.osbuild.btrfs") - elif "xfs" in part_fstype: - info = index.get_module_info("Mount", "org.osbuild.xfs") - else: - raise RuntimeError("Unknown file system") - if not info: - raise RuntimeError(f"Can't find org.osbuild.{part_fstype}") - - # the first mount point should be root - if n == 0: - if part_mountpoint != "/": - raise RuntimeError("The first mountpoint in sorted fstab entries is not '/'") - root_tree = mountpoint - - # prepare the options to mount the partition - options = {} - for option in part_options: - if option == "defaults": # defaults is not a supported option - continue - - if "=" in option: - parts = option.split("=") - key = parts[0] - val = parts[1] - - # uid and gid must be integers - if key == "uid" or key == "gid": - val = int(val) - - options[key] = val + if "ext4" in part_fstype: + info = index.get_module_info("Mount", "org.osbuild.ext4") + elif "vfat" in part_fstype: + info = index.get_module_info("Mount", "org.osbuild.fat") + elif "btrfs" in part_fstype: + info = index.get_module_info("Mount", "org.osbuild.btrfs") + elif "xfs" in part_fstype: + info = index.get_module_info("Mount", "org.osbuild.xfs") else: - options[option] = True + raise RuntimeError("Unknown file system") + if not info: + raise RuntimeError(f"Can't find org.osbuild.{part_fstype}") - options["readonly"] = True + # the first mount point should be root + if n == 0: + if part_mountpoint != "/": + raise RuntimeError("The first mountpoint in sorted fstab entries is not '/'") + root_tree = mountpoint - # Validate the options - # - # The mount manager is taking care of opening the file system for us - # so we don't have access to the json objects that'll be used to - # invoke the mounter. However we're only interested at validating the - # options. We can extract these from the schame to validate them - # only. - jsonschema.validate(options, info.get_schema()["properties"]["options"]) + # prepare the options to mount the partition + options = {} + for option in part_options: + if option == "defaults": # defaults is not a supported option + continue - # Finally mount - mmgr.mount(mounts.Mount( - part_device, - info, - devices_map[part_device], # retrieves the associated Device Object - part_mountpoint, - options)) + if "=" in option: + parts = option.split("=") + key = parts[0] + val = parts[1] - if not root_tree: - raise RuntimeError("The root filesystem tree is not mounted") + # uid and gid must be integers + if key == "uid" or key == "gid": + val = int(val) - append_filesystem(report, root_tree) + options[key] = val + else: + options[option] = True + + options["readonly"] = True + + # Validate the options + # + # The mount manager is taking care of opening the file system for us + # so we don't have access to the json objects that'll be used to + # invoke the mounter. However we're only interested at validating the + # options. We can extract these from the schame to validate them + # only. + jsonschema.validate(options, info.get_schema()["properties"]["options"]) + + # Finally mount + mmgr.mount(mounts.Mount( + part_device, + info, + devices_map[part_device], # retrieves the associated Device Object + part_mountpoint, + options)) + + if not root_tree: + raise RuntimeError("The root filesystem tree is not mounted") + + append_filesystem(report, root_tree) def analyse_image(image) -> Dict[str, Any]: