Using O_DIRECT to open the image partition and then using that fd
for the backing of the loopback device will break the mounting of
the formatted partition, i.e. mount will fail with:
mount: /tmp/looptest-6qrtkp5e/mountpoint-root: wrong fs type,
bad option, bad superblock on /dev/loop0, missing codepage or
helper program, or other error.
Reproducible with the following small-ish python script, executed via
'env PYTHONPATH=$(pwd) python3 looptest.py':
---- 8< ---- 8< ---- [ looptest.py ] ---- 8< ---- 8< ----
import contextlib
import json
import os
import subprocess
import stat
import tempfile
from osbuild import loop
@contextlib.contextmanager
def mount(source, dest):
    """Mount `source` on `dest` for the duration of the with-block.

    Runs `mount source dest`, yields `dest`, and on exit (normal or
    exceptional) runs `umount -R dest`. Both commands raise
    `subprocess.CalledProcessError` on a non-zero exit status.
    """
    with contextlib.ExitStack() as cleanup:
        subprocess.run(["mount", source, dest], check=True)
        # Only registered once the mount succeeded, so a failed mount
        # is never followed by an umount attempt.
        cleanup.callback(subprocess.run, ["umount", "-R", dest], check=True)
        yield dest
@contextlib.contextmanager
def os_open(path, flags):
    """Open `path` with `os.open(path, flags)` and close it on exit.

    Yields the raw integer file descriptor; the descriptor is closed
    when the with-block is left, whether normally or via an exception.
    """
    descriptor = os.open(path, flags)
    with contextlib.ExitStack() as cleanup:
        cleanup.callback(os.close, descriptor)
        yield descriptor
def main():
    """Reproduce the mount failure of a loop device backed by an O_DIRECT fd.

    Steps, all inside one temporary directory that is removed on exit:
    create a sparse 512 MiB image, write an MBR partition table with a
    single bootable Linux partition, read the table back to learn the
    partition's byte offset and size, bind that region to an unbound loop
    device through an image descriptor opened with O_DIRECT, format the
    loop device as ext4 and finally try to mount it.
    """
    sector = 512
    image_size = 512 * 1024 * 1024
    ptuuid = "0x14fc63d2"

    def run(argv, **kwargs):
        # Every external command must succeed for the reproducer to go on.
        return subprocess.run(argv, check=True, **kwargs)

    with contextlib.ExitStack() as stack:
        workdir = stack.enter_context(tempfile.TemporaryDirectory(prefix="looptest-"))
        print(f"Temporary directory at {workdir}")

        # Private directory that receives the device node created below.
        devdir = os.path.join(workdir, "dev")
        os.makedirs(devdir, exist_ok=True)
        dir_fd = stack.enter_context(os_open(devdir, os.O_DIRECTORY))

        image = os.path.join(workdir, "image")
        run(["truncate", "--size", str(image_size), image])

        layout = f"label: mbr\nlabel-id: {ptuuid}\nbootable, type=83"
        run(["sfdisk", image], input=layout, encoding='utf-8')

        # Read the table back to get the real partition geometry.
        dumped = run(["sfdisk", "--json", image],
                     stdout=subprocess.PIPE, encoding='utf-8')
        first = json.loads(dumped.stdout)["partitiontable"]["partitions"][0]
        start = first["start"] * sector
        length = first["size"] * sector

        # The later mount fails when the image is opened with os.O_DIRECT.
        image_fd = stack.enter_context(os_open(image, os.O_RDWR | os.O_DIRECT))

        minor = loop.LoopControl().get_unbound()
        lo = loop.Loop(minor)
        lo.set_fd(image_fd)
        lo.set_status(offset=start, sizelimit=length, autoclear=True)
        lo.mknod(dir_fd)

        # The system-wide node is used on purpose; the alternative would be
        # the private node, os.path.join(devdir, lo.devname), made group
        # readable with os.chmod(..., st_mode | stat.S_IRGRP).
        loopdev = f"/dev/loop{minor}"

        run(["ls", "-la", f"{devdir}"])
        run(["mkfs.ext4", loopdev], input="y", encoding='utf-8')
        run(["blkid", loopdev])

        mountpoint = os.path.join(workdir, "mountpoint-root")
        os.makedirs(mountpoint, exist_ok=True)
        stack.enter_context(mount(loopdev, mountpoint))

        for argv in (["ls", "-la", workdir], ["ls", "-la", mountpoint], ["mount"]):
            run(argv)


if __name__ == '__main__':
    main()
149 lines
4.6 KiB
Python
149 lines
4.6 KiB
Python
import array
|
|
import asyncio
|
|
import contextlib
|
|
import errno
|
|
import json
|
|
import os
|
|
import platform
|
|
import socket
|
|
import threading
|
|
from . import loop
|
|
|
|
|
|
# Names exported by `from <module> import *`.
__all__ = ["LoopClient", "LoopServer"]
|
|
|
|
|
|
def load_fds(sock, msglen):
    """Receive one JSON message plus any file descriptors sent with it.

    Reads up to `msglen` bytes of payload from `sock` with room for up to
    253 descriptors of ancillary data. Only SOL_SOCKET/SCM_RIGHTS control
    messages are considered.

    Returns a tuple `(obj, fds, addr)`: the decoded JSON payload, the
    received descriptors as a list of ints, and the sender address as
    reported by `recvmsg`.
    """
    received = array.array("i")
    width = received.itemsize

    payload, ancillary, _flags, sender = sock.recvmsg(
        msglen, socket.CMSG_LEN(253 * width))

    rights = (
        data
        for level, kind, data in ancillary
        if level == socket.SOL_SOCKET and kind == socket.SCM_RIGHTS
    )
    for data in rights:
        # Drop a trailing partial integer, if the data got truncated.
        usable = len(data) - len(data) % width
        received.frombytes(data[:usable])

    return json.loads(payload), list(received), sender
|
|
|
|
|
|
def dump_fds(sock, obj, fds, flags=0, addr=None):
    """Send `obj` as UTF-8 JSON over `sock`, passing `fds` along with it.

    The descriptors in `fds` travel as a single SOL_SOCKET/SCM_RIGHTS
    control message. `flags` and `addr` are handed to `sendmsg` unchanged.
    """
    rights = (socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", fds))
    payload = json.dumps(obj).encode('utf-8')
    sock.sendmsg([payload], [rights], flags, addr)
|
|
|
|
|
|
class LoopServer:
    """Server for creating loopback devices

    The server listens for requests on an AF_UNIX/SOCK_DGRAM socket.

    A request should contain SCM_RIGHTS of two file descriptors, one
    that should be the backing file for the new loop device, and a
    second that should be a directory file descriptor where the new
    device node will be created.

    The payload should be a JSON object with the mandatory arguments
    @fd which is the offset in the SCM_RIGHTS array for the backing
    file descriptor and @dir_fd which is the offset for the output
    directory. Optionally, @offset and @sizelimit in bytes may also
    be specified.

    The server responds with a JSON object containing the device name
    of the new device node created in the output directory.

    The created loopback device is guaranteed to be bound to the
    given backing file for the lifetime of the LoopServer object.

    The server is used as a context manager: entering starts the
    serving thread, leaving stops the event loop and joins the thread.
    """

    def __init__(self, socket_address):
        """Prepare a server that will bind to `socket_address`.

        Nothing is bound or started here; that happens on `__enter__`.
        """
        self.socket_address = socket_address
        # Loop objects created so far; kept referenced so they are not
        # released while the server is alive.
        self.devs = []
        self.ctl = loop.LoopControl()
        self.event_loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self._run_event_loop)

    def _create_device(self, fd, dir_fd, offset=None, sizelimit=None):
        """Bind `fd` to a free loop device and create its node in `dir_fd`.

        Returns the device name of the new loop device. Any OSError other
        than EBUSY raised while attaching the backing file is propagated.
        """
        while True:
            # Getting an unbound loopback device and attaching a backing
            # file descriptor to it is racy, so we must use a retry loop
            lo = loop.Loop(self.ctl.get_unbound())
            try:
                lo.set_fd(fd)
            except OSError as e:
                if e.errno == errno.EBUSY:
                    continue
                raise
            break

        lo.set_status(offset=offset, sizelimit=sizelimit, autoclear=True)
        lo.mknod(dir_fd)

        # Pin the Loop objects so they are only released when the LoopServer
        # is destroyed.
        self.devs.append(lo)
        return lo.devname

    def _dispatch(self, sock):
        """Handle one request read from `sock` and send back the reply.

        The reply is a JSON object `{"devname": ...}` sent to the address
        the request came from.
        """
        args, fds, addr = load_fds(sock, 1024)

        try:
            fd = fds[args["fd"]]
            dir_fd = fds[args["dir_fd"]]
            offset = args.get("offset")
            sizelimit = args.get("sizelimit")

            devname = self._create_device(fd, dir_fd, offset, sizelimit)
        finally:
            # Every descriptor received via SCM_RIGHTS is a new descriptor
            # in this process; without closing them each request leaks two.
            # The kernel keeps its own reference to the backing file once
            # it is attached to the loop device.
            # NOTE(review): assumes loop.Loop does not keep using the raw
            # descriptor numbers after set_fd()/mknod() - confirm.
            for received in fds:
                os.close(received)

        ret = json.dumps({"devname": devname})
        sock.sendto(ret.encode('utf-8'), addr)

    def _run_event_loop(self):
        """Thread body: bind the socket and serve requests until stopped."""
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            sock.bind(self.socket_address)
            self.event_loop.add_reader(sock, self._dispatch, sock)
            asyncio.set_event_loop(self.event_loop)
            try:
                self.event_loop.run_forever()
            finally:
                self.event_loop.remove_reader(sock)
        finally:
            # Close the socket even if binding or the loop itself failed.
            sock.close()

    def __enter__(self):
        """Start the serving thread and return the server itself."""
        self.thread.start()
        # Returning self makes `with LoopServer(...) as server:` usable.
        return self

    def __exit__(self, *args):
        """Stop the event loop and wait for the serving thread to finish."""
        self.event_loop.call_soon_threadsafe(self.event_loop.stop)
        self.thread.join()
|
|
|
|
|
|
class LoopClient:
    """Client side of the loop device protocol spoken over `sock`.

    `sock` is used as-is for sending the request and receiving the reply;
    it is neither connected nor closed by this class.
    """

    def __init__(self, sock):
        """Remember the socket used to talk to the server."""
        self.sock = sock

    @contextlib.contextmanager
    def device(self, filename, offset=None, sizelimit=None):
        """Request a loop device backed by `filename`.

        Opens `filename` read-write and `/dev` as the output directory,
        sends both descriptors with the request, and yields the path of
        the resulting device node under `/dev`. The node is unlinked when
        the with-block is left.

        A falsy `offset` or `sizelimit` (None or 0) is left out of the
        request.

        Raises OSError if `filename` or `/dev` cannot be opened.
        """
        oflags = os.O_RDWR

        if platform.machine() != "s390x":
            # O_DIRECT will break s390x currently
            oflags |= os.O_DIRECT

        # The stack closes whatever was opened so far, also when the second
        # open or the send fails; previously those paths leaked descriptors.
        with contextlib.ExitStack() as opened:
            fd = os.open(filename, oflags)
            opened.callback(os.close, fd)
            dir_fd = os.open("/dev", os.O_DIRECTORY)
            opened.callback(os.close, dir_fd)

            # "fd" and "dir_fd" are indices into the descriptor array.
            fds = array.array("i", [fd, dir_fd])
            req = {"fd": 0, "dir_fd": 1}

            if offset:
                req["offset"] = offset
            if sizelimit:
                req["sizelimit"] = sizelimit

            dump_fds(self.sock, req, fds)

        ret = json.loads(self.sock.recv(1024))
        path = os.path.join("/dev", ret["devname"])
        try:
            yield path
        finally:
            os.unlink(path)
|