debian-forge/devices/org.osbuild.luks2
Christian Kellner 84fcf66596 devices/luks2: use new ensure_device_node helper
Use the new `DeviceService.ensure_device_node` helper; as a side
effect we won't fail if the device node already exists.
2022-11-22 18:28:38 +01:00

154 lines
4.4 KiB
Python
Executable file

#!/usr/bin/python3
"""
Host service for Linux Unified Key Setup (LUKS, format 2) devices.
This will open a LUKS container, given the path of the parent
device and the corresponding passphrase.
NB: This will use the custom osbuild udev rule inhibitor mechanism
to suppress certain udev rules. See `osbuild.util.udev.UdevInhibitor`
for details.
Host commands used: `cryptsetup`, `dmsetup`
"""
import argparse
import contextlib
import os
import subprocess
import sys
import uuid
from typing import Dict, Optional
from osbuild import devices
from osbuild.util.udev import UdevInhibitor
# JSON-schema style fragment describing the options this device accepts:
# a single string property `passphrase` (listed as required, with a
# default of ""); no other properties are allowed.
# NOTE(review): the fragment has no enclosing braces -- presumably the
# module loader wraps it before parsing; confirm against the loader.
SCHEMA = """
"additionalProperties": false,
"required": ["passphrase"],
"properties": {
"passphrase": {
"description": "Passphrase to use",
"default": "",
"type": "string"
}
}
"""
class CryptDeviceService(devices.DeviceService):
    """Device service that opens and closes a LUKS2 container.

    `open` maps the container under a random device-mapper name via
    `cryptsetup open` and exposes a device node for it; `close` releases
    the udev inhibitor lock and tears the mapping down again.
    """

    def __init__(self, args: argparse.Namespace):
        super().__init__(args)
        # Device-mapper name of the container; None while nothing is open.
        self.devname: Optional[str] = None
        # Udev inhibitor lock taken in `open` and released in `close`.
        self.lock = None
        # Whether a failing `cryptsetup close` should raise. Only enabled
        # once the container was opened successfully.
        self.check = False

    def dminfo(self, name=None):
        """Return the major, minor and open count for the device

        Queries `dmsetup info` for `name`, falling back to `self.devname`
        when `name` is not given.

        Raises:
            RuntimeError: if `dmsetup` exits non-zero or its output is not
                three colon separated integers.
        """
        res = subprocess.run(["dmsetup", "info", "-c",
                              "-o", "major,minor,open",
                              "--noheadings",
                              "--separator", ":",
                              name or self.devname],
                             check=False,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             encoding="UTF-8")

        data = res.stdout.strip()

        if res.returncode != 0:
            msg = f"Failed to find the device node: {data}"
            raise RuntimeError(msg)

        # stderr is merged into stdout above, so validate explicitly
        # instead of relying on `assert` (which is stripped under -O).
        try:
            fields = [int(field) for field in data.split(":")]
        except ValueError as err:
            msg = f"Unexpected dmsetup output: {data!r}"
            raise RuntimeError(msg) from err

        if len(fields) != 3:
            msg = f"Unexpected dmsetup output: {data!r}"
            raise RuntimeError(msg)

        major, minor, count = fields
        return major, minor, count

    def open_count(self, name=None):
        """Return the open count of the device, or 0 if it cannot be queried"""
        count = 0
        with contextlib.suppress(RuntimeError):
            _, _, count = self.dminfo(name)
        return count

    def open(self, devpath: str, parent: str, tree: str, options: Dict):
        """Open the LUKS container on `parent` and create its device node

        Args:
            devpath: directory in which the `mapper/<name>` node is created.
            parent: name of the parent device, relative to `/dev`.
            tree: not used by this service.
            options: device options; `passphrase` is read from it and
                defaults to the empty string.

        Returns:
            A dict with the node `path` (relative to `devpath`), the
            device-mapper `name` and the `node` major/minor numbers.

        Raises:
            RuntimeError: if `cryptsetup open` fails or the opened device
                cannot be found via `dmsetup`.
        """
        passphrase = options.get("passphrase", "")

        parent_dev = os.path.join("/dev", parent)

        # Generate a random name for it, since this is a temporary name
        # that is not stored in the device at all
        devname = "osbuild-luks-" + str(uuid.uuid4())
        self.devname = devname

        # This employs the custom osbuild udev rule inhibitor mechanism
        self.lock = UdevInhibitor.for_dm_name(devname)

        # Open the container; the passphrase is supplied on stdin
        res = subprocess.run(["cryptsetup", "-q", "open", parent_dev, devname],
                             check=False,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             input=passphrase,
                             encoding="UTF-8")

        if res.returncode != 0:
            data = res.stdout.strip()
            msg = f"Failed to open crypt device: {data}"
            raise RuntimeError(msg)

        print(f"opened as {devname}")

        # Now that device is successfully opened, we check on close
        self.check = True

        # Now that the crypt device is open, find its major/minor numbers
        major, minor, _ = self.dminfo()

        subpath = os.path.join("mapper", devname)
        fullpath = os.path.join(devpath, subpath)
        os.makedirs(os.path.join(devpath, "mapper"), exist_ok=True)
        self.ensure_device_node(fullpath, major, minor)

        data = {
            "path": subpath,
            "name": devname,
            "node": {
                "major": major,
                "minor": minor
            }
        }
        return data

    def close(self):
        """Release the udev inhibitor lock and close the container

        Does nothing if no device name was ever assigned. If `open` failed
        before the container was mapped, closing is best effort: the lock
        is still released and a failing `cryptsetup close` is ignored.

        Raises:
            subprocess.CalledProcessError: if the container had been opened
                successfully and `cryptsetup close` fails.
        """
        if not self.devname:
            return

        name = self.devname

        # Use the tolerant helper: the mapping does not exist when `open`
        # failed, and raising here would leak the inhibitor lock.
        opencount = self.open_count(name)
        print(f"closing (opencount: {opencount})")

        # The lock is unset if creating the inhibitor itself failed
        if self.lock is not None:
            self.lock.release()
            self.lock = None

        self.devname = None

        # finally close the device
        subprocess.run(["cryptsetup", "-q", "close", name],
                       check=self.check,
                       stdout=subprocess.PIPE,
                       stderr=subprocess.STDOUT,
                       encoding="UTF-8")
def main():
    """Build the service from the command line arguments and run it."""
    CryptDeviceService.from_args(sys.argv[1:]).main()
# Script entry point: run the service and exit with whatever main() returns.
if __name__ == '__main__':
    sys.exit(main())