Optional types were provided in places but were not always correct. Add mypy checking and fix those that fail(ed).
160 lines
4.5 KiB
Python
Executable file
160 lines
4.5 KiB
Python
Executable file
#!/usr/bin/python3
|
|
"""
|
|
Loopback device host service
|
|
|
|
This service can be used to expose a file or a subset of it as a
|
|
device node. The file is specified via the `filename`, and the
|
|
subset can be specified via `start` and `size` (both in sectors).
|
|
The resulting device name is returned together with the device
|
|
node numbers (`major`, `minor`). The device is closed when the
|
|
service is shut down.
|
|
A typical use case is formatting the file or a partition in the
|
|
file with a file system or mounting a previously created file
|
|
system contained in the file.
|
|
NB: This will use the custom osbuild udev rules inhibitor to
|
|
suppress problematic udev rules. For more details see the
|
|
documentation for `osbuild.util.udev.UdevInhibitor`.
|
|
"""
|
|
|
|
|
|
import argparse
|
|
import errno
|
|
import os
|
|
import sys
|
|
|
|
from typing import Dict, Optional
|
|
|
|
from osbuild import devices
|
|
from osbuild import loop
|
|
from osbuild.util.udev import UdevInhibitor
|
|
|
|
|
|
SCHEMA = """
|
|
"additionalProperties": false,
|
|
"required": ["filename"],
|
|
"properties": {
|
|
"filename": {
|
|
"type": "string",
|
|
"description": "File to associate with the loopback device"
|
|
},
|
|
"start": {
|
|
"type": "number",
|
|
"description": "Start of the data segment (in sectors)",
|
|
"default": 0
|
|
},
|
|
"size": {
|
|
"type": "number",
|
|
"description": "Size limit of the data segment (in sectors)"
|
|
},
|
|
"sector-size": {
|
|
"type": "number",
|
|
"description": "Sector size (in bytes)",
|
|
"default": 512
|
|
},
|
|
"lock": {
|
|
"type": "boolean",
|
|
"description": "Lock the device after opening it"
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
class LoopbackService(devices.DeviceService):
    """Device service that attaches a file to a loop device.

    `open` binds the file named in the options to a loop device and calls
    `mknod` on it for the `devpath` directory; `close` flushes and detaches
    the loop device again and closes the backing file.
    """

    def __init__(self, args: argparse.Namespace):
        super().__init__(args)
        # Pre-initialize every attribute that `close` and `make_loop` read.
        # `open` calls `close` when `make_loop` fails, i.e. before `self.lo`
        # was ever assigned; without these defaults that cleanup path would
        # raise `AttributeError`, hide the real error and leak the fd.
        self.fd: Optional[int] = None
        self.lo: Optional[loop.Loop] = None
        self.sector_size = 512  # same default as used by `open`
        self.ctl = loop.LoopControl()

    @staticmethod
    def setup_loop(lo: loop.Loop):
        """Setup hook handed to `LoopControl.loop_for_fd`.

        Obtains a `UdevInhibitor` for the loop device (identified by its
        major/minor numbers) and stores a callback in `lo.on_close` that
        releases the inhibitor again.
        """
        inhibitor = UdevInhibitor.for_device(lo.LOOP_MAJOR, lo.minor)
        lo.on_close = lambda _l: inhibitor.release()

    def make_loop(self, fd: int, offset, sizelimit, lock):
        """Bind `fd` to a loop device and return the resulting loop object.

        Args:
            fd: Open file descriptor of the backing file.
            offset: Start of the data segment, in bytes.
            sizelimit: Length of the data segment in sectors. A falsy
                value (`None` or `0`) selects everything from `offset`
                to the end of the file.
            lock: Forwarded unchanged as the `lock` argument of
                `LoopControl.loop_for_fd`.

        Returns:
            Whatever `LoopControl.loop_for_fd` returns for the new device.
        """
        if not sizelimit:
            # No explicit size: use the remainder of the file (in bytes).
            sizelimit = os.fstat(fd).st_size - offset
        else:
            # Convert the size from sectors to bytes.
            sizelimit *= self.sector_size

        return self.ctl.loop_for_fd(
            fd,
            lock=lock,
            setup=self.setup_loop,
            offset=offset,
            sizelimit=sizelimit,
            partscan=False,
            autoclear=True,
        )

    def open(self, devpath: str, parent: str, tree: str, options: Dict):
        """Attach the configured file to a loop device.

        Args:
            devpath: Directory in which the device node is created.
            parent: Unused by this service.
            tree: Root directory that `options["filename"]` is resolved
                against.
            options: Service options; reads `filename` (required) and the
                optional `sector-size`, `start`, `size` and `lock`.

        Returns:
            A dict with the device name under `path` and the device
            numbers under `node` (`major`, `minor`).

        Raises:
            Exception: Any error raised while setting up the loop device
                is re-raised after the service has been closed.
        """
        filename = options["filename"]
        self.sector_size = options.get("sector-size", 512)
        # `start` is given in sectors, the loop offset is in bytes.
        start = options.get("start", 0) * self.sector_size
        size = options.get("size")
        lock = options.get("lock", False)

        # Treat `filename` as relative to `tree`, even if it is absolute.
        path = os.path.join(tree, filename.lstrip("/"))

        self.fd = os.open(path, os.O_RDWR | os.O_CLOEXEC)
        try:
            lo = self.make_loop(self.fd, start, size, lock)
        except Exception as error:  # pylint: disable=broad-except
            # Release the fd (and the loop control) before propagating.
            self.close()
            raise error from None
        self.lo = lo

        print(f"{lo.devname} acquired (locked: {lock})")

        dir_fd = -1
        try:
            dir_fd = os.open(devpath, os.O_CLOEXEC | os.O_PATH)
            lo.mknod(dir_fd)
        finally:
            if dir_fd > -1:
                os.close(dir_fd)

        return {
            "path": lo.devname,
            "node": {
                "major": lo.LOOP_MAJOR,
                "minor": lo.minor,
            },
        }

    def close(self):
        """Detach the loop device and close the backing file.

        Safe to call when `open` never ran or failed part-way through:
        the loop and fd steps are skipped while they are unset.
        """
        # Calling `close` is valid on closed
        # `LoopControl` and `Loop` objects
        self.ctl.close()

        if self.lo:
            # Flush the buffer cache of the loop device. This
            # seems to be required when clearing the fd of the
            # loop device (as of kernel 5.13.8) or otherwise
            # it leads to data loss.
            self.lo.flush_buf()

            # clear the fd. Since it might not immediately be
            # cleared (due to a race with udev or some other
            # process still having a reference to the loop dev)
            # we give it some time and wait for the clearing
            self.lo.clear_fd_wait(self.fd, 30)
            self.lo.close()
            self.lo = None

        if self.fd is not None:
            # Reset the attribute first so a repeated `close` cannot
            # close the same descriptor twice.
            fd = self.fd
            self.fd = None
            try:
                os.fsync(fd)
            finally:
                os.close(fd)
|
|
|
|
|
|
def main():
    """Build the service from the command-line arguments and run it."""
    cli_args = sys.argv[1:]
    LoopbackService.from_args(cli_args).main()
|
|
|
|
|
|
# Start the service only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
|