#!/usr/bin/python3
"""
Loopback device host service

This service can be used to expose a file or a subset of it as a
device node. The file is specified via the `filename`, and the
subset can be specified via `start` and `size`.
The resulting device name is returned together with the device
node numbers (`major`, `minor`). The device is closed when the
service is shut down.
A typical use case is formatting the file or a partition in the
file with a file system or mounting a previously created file
system contained in the file.

NB: This will use the custom osbuild udev rules inhibitor to
suppress problematic udev rules. For more details see the
documentation for `osbuild.util.udev.UdevInhibitor`.
"""

import argparse
import errno
import glob
import os
import stat
import sys
from typing import Dict

from osbuild import devices, loop
from osbuild.util import ctx
from osbuild.util.udev import UdevInhibitor

SCHEMA = """
"additionalProperties": false,
"required": ["filename"],
"properties": {
  "filename": {
    "type": "string",
    "description": "File to associate with the loopback device"
  },
  "partscan": {
    "type": "boolean",
    "description": "Perform partition scanning after device creation"
  },
  "start": {
    "type": "number",
    "description": "Start of the data segment (in sectors)",
    "default": 0
  },
  "size": {
    "type": "number",
    "description": "Size limit of the data segment (in sectors)"
  },
  "sector-size": {
    "type": "number",
    "description": "Sector size (in bytes)",
    "default": 512
  },
  "lock": {
    "type": "boolean",
    "description": "Lock the device after opening it"
  },
  "read-only": {
    "type": "boolean",
    "description": "Set up the device as read-only"
  }
}
"""


class LoopbackService(devices.DeviceService):
    """Device service that binds a file (or a slice of it) to a loop device.

    The service keeps the backing file descriptor (`fd`) and the loop
    device handle (`lo`) for the lifetime of the device; both are torn
    down in `close`.
    """

    def __init__(self, args: argparse.Namespace):
        super().__init__(args)
        self.ctl = loop.LoopControl()
        # File descriptor of the backing file, set by `open`
        self.fd = None
        # Loop device handle, set by `open`
        self.lo = None
        # Sector size in bytes, taken from the `sector-size` option
        self.sector_size = None

    @staticmethod
    def setup_loop(lo: loop.Loop):
        """Setup hook passed to `LoopControl.loop_for_fd`.

        Acquires a udev inhibitor for the loop device and arranges for
        it to be released again via the `on_close` callback of `lo`.
        """
        lock = UdevInhibitor.for_device(lo.LOOP_MAJOR, lo.minor)
        lo.on_close = lambda _l: lock.release()

    def make_loop(self, fd: int, offset, sizelimit, lock, partscan, read_only):
        """Bind `fd` to a loop device and return the loop device handle.

        `offset` is given in bytes, `sizelimit` in sectors. If no
        `sizelimit` is given (or it is zero), everything from `offset`
        to the end of the file is exposed.
        """
        if not sizelimit:
            sizelimit = os.fstat(fd).st_size - offset
        else:
            # Convert from sectors to bytes
            sizelimit *= self.sector_size

        lo = self.ctl.loop_for_fd(fd,
                                  lock=lock,
                                  setup=self.setup_loop,
                                  offset=offset,
                                  sizelimit=sizelimit,
                                  blocksize=self.sector_size,
                                  partscan=partscan,
                                  read_only=read_only,
                                  autoclear=True)

        return lo

    def open(self, devpath: str, parent: str, tree: str, options: Dict) -> Dict:
        """Set up the loop device and create its node(s) below `devpath`.

        The backing file is `options["filename"]`, resolved relative to
        `tree`. Returns a dictionary with the device name (`path`) and
        the device numbers (`node.major`, `node.minor`).

        If setting up the loop device fails, the service is closed and
        the original exception is re-raised.
        """
        filename = options["filename"]
        self.sector_size = options.get("sector-size", 512)
        # `start` is given in sectors, the loop offset is in bytes
        start = options.get("start", 0) * self.sector_size
        size = options.get("size")
        lock = options.get("lock", False)
        partscan = options.get("partscan", False)
        read_only = options.get("read-only", False)

        path = os.path.join(tree, filename.lstrip("/"))

        self.fd = os.open(path, os.O_RDWR | os.O_CLOEXEC)
        try:
            self.lo = self.make_loop(self.fd, start, size, lock, partscan, read_only)
        except Exception:  # pylint: disable=broad-except
            # Release the backing file (and loop control) again, then
            # propagate the original error with its traceback intact.
            self.close()
            raise

        print(f"{self.lo.devname} acquired (locked: {lock})")

        dir_fd = -1
        try:
            dir_fd = os.open(devpath, os.O_CLOEXEC | os.O_PATH)
            with ctx.suppress_oserror(errno.EEXIST):
                self.lo.mknod(dir_fd)
            # If partscan was enabled let's find any partition
            # based devices that were added (i.e. loop0p1) and
            # copy them into our custom /dev/ directory:
            if partscan:
                for device in glob.glob(os.path.join("/dev/", f"{self.lo.devname}p*")):
                    # Tolerate already existing nodes, exactly like
                    # for the main device node above
                    with ctx.suppress_oserror(errno.EEXIST):
                        os.mknod(os.path.basename(device),
                                 mode=(0o600 | stat.S_IFBLK),
                                 device=os.stat(device).st_rdev,
                                 dir_fd=dir_fd)
        finally:
            if dir_fd > -1:
                os.close(dir_fd)

        res = {
            "path": self.lo.devname,
            "node": {
                "major": self.lo.LOOP_MAJOR,
                "minor": self.lo.minor,
            }
        }

        return res

    def close(self):
        """Tear down the loop device and close the backing file.

        The loop handle and the backing file descriptor are released
        even if flushing or clearing the loop device raises; the error
        is still propagated to the caller.
        """
        # Calling `close` is valid on closed
        # `LoopControl` and `Loop` objects
        self.ctl.close()

        # Detach the state first so that a failure below can neither
        # leak the resources nor lead to them being released twice
        lo, self.lo = self.lo, None
        fd, self.fd = self.fd, None

        try:
            if lo:
                try:
                    # Flush the buffer cache of the loop device. This
                    # seems to be required when clearing the fd of the
                    # loop device (as of kernel 5.13.8) or otherwise
                    # it leads to data loss.
                    lo.flush_buf()

                    # clear the fd. Since it might not immediately be
                    # cleared (due to a race with udev or some other
                    # process still having a reference to the loop dev)
                    # we give it some time and wait for the clearing
                    lo.clear_fd_wait(fd, 30)
                finally:
                    lo.close()
        finally:
            if fd is not None:
                try:
                    os.fsync(fd)
                finally:
                    os.close(fd)


def main():
    """Entry point: run the loopback service with the command line arguments."""
    service = LoopbackService.from_args(sys.argv[1:])
    service.main()


if __name__ == '__main__':
    main()