devices/loopback: enable partition scanning as an option

We can now add an entire device and then get the partitions added
to our environment for use, rather than to have to map each partition
in to a separate loopback device.

This is a prep patch for https://github.com/osbuild/osbuild/issues/1495
This commit is contained in:
Dusty Mabe 2023-12-12 20:41:04 -05:00
parent c3b52ba240
commit f6d0a4a9ac

View file

@ -19,7 +19,9 @@ documentation for `osbuil.util.udev.UdevInhibitor`.
import argparse
import errno
import glob
import os
import stat
import sys
from typing import Dict
@ -35,6 +37,10 @@ SCHEMA = """
"type": "string",
"description": "File to associate with the loopback device"
},
"partscan": {
"type": "boolean",
"description": "Perform partition scanning after device creation"
},
"start": {
"type": "number",
"description": "Start of the data segment (in sectors)",
@ -71,10 +77,9 @@ class LoopbackService(devices.DeviceService):
lock = UdevInhibitor.for_device(lo.LOOP_MAJOR, lo.minor)
lo.on_close = lambda _l: lock.release()
def make_loop(self, fd: int, offset, sizelimit, lock):
def make_loop(self, fd: int, offset, sizelimit, lock, partscan):
if not sizelimit:
stat = os.fstat(fd)
sizelimit = stat.st_size - offset
sizelimit = os.fstat(fd).st_size - offset
else:
sizelimit *= self.sector_size
@ -83,7 +88,7 @@ class LoopbackService(devices.DeviceService):
offset=offset,
sizelimit=sizelimit,
blocksize=self.sector_size,
partscan=False,
partscan=partscan,
autoclear=True)
return lo
@ -94,12 +99,13 @@ class LoopbackService(devices.DeviceService):
start = options.get("start", 0) * self.sector_size
size = options.get("size")
lock = options.get("lock", False)
partscan = options.get("partscan", False)
path = os.path.join(tree, filename.lstrip("/"))
self.fd = os.open(path, os.O_RDWR | os.O_CLOEXEC)
try:
self.lo = self.make_loop(self.fd, start, size, lock)
self.lo = self.make_loop(self.fd, start, size, lock, partscan)
except Exception as error: # pylint: disable: broad-except
self.close()
raise error from None
@ -111,6 +117,15 @@ class LoopbackService(devices.DeviceService):
dir_fd = os.open(devpath, os.O_CLOEXEC | os.O_PATH)
with ctx.suppress_oserror(errno.EEXIST):
self.lo.mknod(dir_fd)
# If partscan was enabled let's find any partition
# based devices that were added (i.e. loop0p1) and
# copy them into our custom /dev/ directory:
if partscan:
for device in glob.glob(os.path.join("/dev/", f"{self.lo.devname}p*")):
os.mknod(os.path.basename(device),
mode=(0o600 | stat.S_IFBLK),
device=os.stat(device).st_rdev,
dir_fd=dir_fd)
finally:
if dir_fd > -1:
os.close(dir_fd)