devices: add new org.osbuild.luks2
This is the corresponding device for LUKS containers created via the new `org.osbuild.luks2.format` stage. Needed information are the parent device and the passphrase used to create the container. NB: this device always uses the new custom block device udev rule inhibitor facility.
This commit is contained in:
parent
dbd8035881
commit
4676c701b7
1 changed files with 157 additions and 0 deletions
157
devices/org.osbuild.luks2
Executable file
157
devices/org.osbuild.luks2
Executable file
|
|
@ -0,0 +1,157 @@
|
|||
#!/usr/bin/python3
|
||||
"""
|
||||
Host service for Linux Unified Key Setup (LUKS, format 2) devices.
|
||||
|
||||
This will open a LUKS container, given the path of the parent
|
||||
device and the corresponding passphrase.
|
||||
|
||||
NB: This will use the custom osbuild udev rule inhibitor mechanism
|
||||
to suppress certain udev rules. See `osbuil.util.udev.UdevInhibitor`
|
||||
for details.
|
||||
|
||||
Host commands used: `cryptsetup`, `dmsetup`
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import contextlib
|
||||
import os
|
||||
import stat
|
||||
import subprocess
|
||||
import sys
|
||||
import uuid
|
||||
|
||||
from typing import Dict
|
||||
|
||||
from osbuild import devices
|
||||
from osbuild.util.udev import UdevInhibitor
|
||||
|
||||
|
||||
SCHEMA = """
|
||||
"additionalProperties": false,
|
||||
"required": ["passphrase"],
|
||||
"properties": {
|
||||
"passphrase": {
|
||||
"description": "Passphrase to use",
|
||||
"default": "",
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class CryptDeviceService(devices.DeviceService):
|
||||
|
||||
def __init__(self, args: argparse.Namespace):
|
||||
super().__init__(args)
|
||||
self.devname = None
|
||||
self.lock = None
|
||||
self.check = False
|
||||
|
||||
def dminfo(self, name=None):
|
||||
"""Return the major, minor and open count for the device"""
|
||||
res = subprocess.run(["dmsetup", "info", "-c",
|
||||
"-o", "major,minor,open",
|
||||
"--noheadings",
|
||||
"--separator", ":",
|
||||
name or self.devname],
|
||||
check=False,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
encoding="UTF-8")
|
||||
|
||||
if res.returncode != 0:
|
||||
data = res.stdout.strip()
|
||||
msg = f"Failed to find the device node: {data}"
|
||||
raise RuntimeError(msg)
|
||||
|
||||
data = res.stdout.strip()
|
||||
data = list(map(int, data.split(":")))
|
||||
assert len(data) == 3
|
||||
major, minor, count = data[0], data[1], data[2]
|
||||
return major, minor, count
|
||||
|
||||
def open_count(self, name=None):
|
||||
count = 0
|
||||
with contextlib.suppress(RuntimeError):
|
||||
_, _, count = self.dminfo(name)
|
||||
return count
|
||||
|
||||
def open(self, devpath: str, parent: str, tree: str, options: Dict):
|
||||
passphrase = options.get("passphrase", "")
|
||||
|
||||
parent_dev = os.path.join("/dev", parent)
|
||||
|
||||
# Generate a random name for it, since this is a temporary name
|
||||
# that is not store in the device at all
|
||||
devname = "osbuild-luks-" + str(uuid.uuid4())
|
||||
self.devname = devname
|
||||
|
||||
# This employs the custom osbuild udev rule inhibitor mechanism
|
||||
self.lock = UdevInhibitor.for_dm_name(devname)
|
||||
|
||||
# Make sure the logical volume is activated
|
||||
res = subprocess.run(["cryptsetup", "-q", "open", parent_dev, devname],
|
||||
check=False,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
input=passphrase,
|
||||
encoding="UTF-8")
|
||||
|
||||
if res.returncode != 0:
|
||||
data = res.stdout.strip()
|
||||
msg = f"Failed to open crypt device: {data}"
|
||||
raise RuntimeError(msg)
|
||||
|
||||
print(f"opened as {devname}")
|
||||
|
||||
# Now that device is successfully opened, we check on close
|
||||
self.check = True
|
||||
|
||||
# Now that the crypt device is open, find its major/minor numbers
|
||||
major, minor, _ = self.dminfo()
|
||||
|
||||
subpath = os.path.join("mapper", devname)
|
||||
fullpath = os.path.join(devpath, subpath)
|
||||
os.makedirs(os.path.join(devpath, "mapper"), exist_ok=True)
|
||||
os.mknod(fullpath, 0o666 | stat.S_IFBLK, os.makedev(major, minor))
|
||||
|
||||
data = {
|
||||
"path": subpath,
|
||||
"name": devname,
|
||||
"node": {
|
||||
"major": major,
|
||||
"minor": minor
|
||||
}
|
||||
}
|
||||
return data
|
||||
|
||||
def close(self):
|
||||
if not self.devname:
|
||||
return
|
||||
|
||||
_, _, opencount = self.dminfo()
|
||||
print(f"closing (opencount: {opencount})")
|
||||
|
||||
self.lock.release()
|
||||
self.lock = None
|
||||
|
||||
name = self.devname
|
||||
self.devname = None
|
||||
|
||||
# finally close the device
|
||||
res = subprocess.run(["cryptsetup", "-q", "close", name],
|
||||
check=self.check,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
encoding="UTF-8")
|
||||
|
||||
|
||||
def main():
|
||||
service = CryptDeviceService.from_args(sys.argv[1:])
|
||||
service.main()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
r = main()
|
||||
sys.exit(r)
|
||||
Loading…
Add table
Add a link
Reference in a new issue