debian-forge/devices/org.osbuild.loopback
David Rheinsberg 4b09088661 test/isort: apply diff to full tree
Apply the isort modifications to the entire source tree, not just the
selected python files of test-src.

Signed-off-by: David Rheinsberg <david.rheinsberg@gmail.com>
2022-09-23 12:08:10 +02:00

159 lines
4.5 KiB
Python
Executable file

#!/usr/bin/python3
"""
Loopback device host service
This service can be used to expose a file or a subset of it as a
device node. The file is specified via the `filename`, and the
subset can be specified via `offset` and `size`.
The resulting device name is returned together with the device
node numbers (`major`, `minor`). The device is closed when the
service is shut down.
A typical use case is formatting the file or a partition in the
file with a file system or mounting a previously created file
system contained in the file.
NB: This will use the custom osbuild udev rules inhibitor to
suppress problematic udev rules. For more details see the
documentation for `osbuil.util.udev.UdevInhibitor`.
"""
import argparse
import os
import sys
from typing import Dict
from osbuild import devices, loop
from osbuild.util.udev import UdevInhibitor
SCHEMA = """
"additionalProperties": false,
"required": ["filename"],
"properties": {
"filename": {
"type": "string",
"description": "File to associate with the loopback device"
},
"start": {
"type": "number",
"description": "Start of the data segment (in sectors)",
"default": 0
},
"size": {
"type": "number",
"description": "Size limit of the data segment (in sectors)"
},
"sector-size": {
"type": "number",
"description": "Sector size (in bytes)",
"default": 512
},
"lock": {
"type": "boolean",
"description": "Lock the device after opening it"
}
}
"""
class LoopbackService(devices.DeviceService):
def __init__(self, args: argparse.Namespace):
super().__init__(args)
self.ctl = loop.LoopControl()
self.fd = None
self.lo = None
self.sector_size = None
@staticmethod
def setup_loop(lo: loop.Loop):
lock = UdevInhibitor.for_device(lo.LOOP_MAJOR, lo.minor)
lo.on_close = lambda _l: lock.release()
def make_loop(self, fd: int, offset, sizelimit, lock):
if not sizelimit:
stat = os.fstat(fd)
sizelimit = stat.st_size - offset
else:
sizelimit *= self.sector_size
lo = self.ctl.loop_for_fd(fd, lock=lock,
setup=self.setup_loop,
offset=offset,
sizelimit=sizelimit,
partscan=False,
autoclear=True)
return lo
def open(self, devpath: str, parent: str, tree: str, options: Dict):
filename = options["filename"]
self.sector_size = options.get("sector-size", 512)
start = options.get("start", 0) * self.sector_size
size = options.get("size")
lock = options.get("lock", False)
path = os.path.join(tree, filename.lstrip("/"))
self.fd = os.open(path, os.O_RDWR | os.O_CLOEXEC)
try:
self.lo = self.make_loop(self.fd, start, size, lock)
except Exception as error: # pylint: disable: broad-except
self.close()
raise error from None
print(f"{self.lo.devname} acquired (locked: {lock})")
dir_fd = -1
try:
dir_fd = os.open(devpath, os.O_CLOEXEC | os.O_PATH)
self.lo.mknod(dir_fd)
finally:
if dir_fd > -1:
os.close(dir_fd)
res = {
"path": self.lo.devname,
"node": {
"major": self.lo.LOOP_MAJOR,
"minor": self.lo.minor,
}
}
return res
def close(self):
# Calling `close` is valid on closed
# `LoopControl` and `Loop` objects
self.ctl.close()
if self.lo:
# Flush the buffer cache of the loop device. This
# seems to be required when clearing the fd of the
# loop device (as of kernel 5.13.8) or otherwise
# it leads to data loss.
self.lo.flush_buf()
# clear the fd. Since it might not immediately be
# cleared (due to a race with udev or some other
# process still having a reference to the loop dev)
# we give it some time and wait for the clearing
self.lo.clear_fd_wait(self.fd, 30)
self.lo.close()
self.lo = None
if self.fd is not None:
fd = self.fd
self.fd = None
try:
os.fsync(fd)
finally:
os.close(fd)
def main():
service = LoopbackService.from_args(sys.argv[1:])
service.main()
if __name__ == '__main__':
main()