This fixes pylint warnings on our modules that are currently not part of CI-pylint. The fixes should all be straightforward. Signed-off-by: David Rheinsberg <david.rheinsberg@gmail.com>
162 lines
4.5 KiB
Python
Executable file
162 lines
4.5 KiB
Python
Executable file
#!/usr/bin/python3
|
|
"""
|
|
Loopback device host service
|
|
|
|
This service can be used to expose a file or a subset of it as a
|
|
device node. The file is specified via the `filename`, and the
|
|
subset can be specified via `offset` and `size`.
|
|
The resulting device name is returned together with the device
|
|
node numbers (`major`, `minor`). The device is closed when the
|
|
service is shut down.
|
|
A typical use case is formatting the file or a partition in the
|
|
file with a file system or mounting a previously created file
|
|
system contained in the file.
|
|
NB: This will use the custom osbuild udev rules inhibitor to
|
|
suppress problematic udev rules. For more details see the
|
|
documentation for `osbuil.util.udev.UdevInhibitor`.
|
|
"""
|
|
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
|
|
from typing import Dict
|
|
|
|
from osbuild import devices
|
|
from osbuild import loop
|
|
from osbuild.util.udev import UdevInhibitor
|
|
|
|
|
|
SCHEMA = """
|
|
"additionalProperties": false,
|
|
"required": ["filename"],
|
|
"properties": {
|
|
"filename": {
|
|
"type": "string",
|
|
"description": "File to associate with the loopback device"
|
|
},
|
|
"start": {
|
|
"type": "number",
|
|
"description": "Start of the data segment (in sectors)",
|
|
"default": 0
|
|
},
|
|
"size": {
|
|
"type": "number",
|
|
"description": "Size limit of the data segment (in sectors)"
|
|
},
|
|
"sector-size": {
|
|
"type": "number",
|
|
"description": "Sector size (in bytes)",
|
|
"default": 512
|
|
},
|
|
"lock": {
|
|
"type": "boolean",
|
|
"description": "Lock the device after opening it"
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
class LoopbackService(devices.DeviceService):
|
|
|
|
def __init__(self, args: argparse.Namespace):
|
|
super().__init__(args)
|
|
self.ctl = loop.LoopControl()
|
|
self.fd = None
|
|
self.lo = None
|
|
self.sector_size = None
|
|
|
|
@staticmethod
|
|
def setup_loop(lo: loop.Loop):
|
|
lock = UdevInhibitor.for_device(lo.LOOP_MAJOR, lo.minor)
|
|
lo.on_close = lambda _l: lock.release()
|
|
|
|
def make_loop(self, fd: int, offset, sizelimit, lock):
|
|
if not sizelimit:
|
|
stat = os.fstat(fd)
|
|
sizelimit = stat.st_size - offset
|
|
else:
|
|
sizelimit *= self.sector_size
|
|
|
|
lo = self.ctl.loop_for_fd(fd, lock=lock,
|
|
setup=self.setup_loop,
|
|
offset=offset,
|
|
sizelimit=sizelimit,
|
|
partscan=False,
|
|
autoclear=True)
|
|
|
|
return lo
|
|
|
|
def open(self, devpath: str, parent: str, tree: str, options: Dict):
|
|
filename = options["filename"]
|
|
self.sector_size = options.get("sector-size", 512)
|
|
start = options.get("start", 0) * self.sector_size
|
|
size = options.get("size")
|
|
lock = options.get("lock", False)
|
|
|
|
path = os.path.join(tree, filename.lstrip("/"))
|
|
|
|
self.fd = os.open(path, os.O_RDWR | os.O_CLOEXEC)
|
|
try:
|
|
self.lo = self.make_loop(self.fd, start, size, lock)
|
|
except Exception as error: # pylint: disable: broad-except
|
|
self.close()
|
|
raise error from None
|
|
|
|
print(f"{self.lo.devname} acquired (locked: {lock})")
|
|
|
|
dir_fd = -1
|
|
try:
|
|
dir_fd = os.open(devpath, os.O_CLOEXEC | os.O_PATH)
|
|
self.lo.mknod(dir_fd)
|
|
finally:
|
|
if dir_fd > -1:
|
|
os.close(dir_fd)
|
|
|
|
res = {
|
|
"path": self.lo.devname,
|
|
"node": {
|
|
"major": self.lo.LOOP_MAJOR,
|
|
"minor": self.lo.minor,
|
|
}
|
|
}
|
|
|
|
return res
|
|
|
|
def close(self):
|
|
# Calling `close` is valid on closed
|
|
# `LoopControl` and `Loop` objects
|
|
self.ctl.close()
|
|
|
|
if self.lo:
|
|
# Flush the buffer cache of the loop device. This
|
|
# seems to be required when clearing the fd of the
|
|
# loop device (as of kernel 5.13.8) or otherwise
|
|
# it leads to data loss.
|
|
self.lo.flush_buf()
|
|
|
|
# clear the fd. Since it might not immediately be
|
|
# cleared (due to a race with udev or some other
|
|
# process still having a reference to the loop dev)
|
|
# we give it some time and wait for the clearing
|
|
self.lo.clear_fd_wait(self.fd, 30)
|
|
self.lo.close()
|
|
self.lo = None
|
|
|
|
if self.fd is not None:
|
|
fd = self.fd
|
|
self.fd = None
|
|
try:
|
|
os.fsync(fd)
|
|
finally:
|
|
os.close(fd)
|
|
|
|
|
|
def main():
|
|
service = LoopbackService.from_args(sys.argv[1:])
|
|
service.main()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|