Use the new `DeviceService.ensure_device_node` helper; as a side effect we won't fail if the device node already exists.
154 lines
4.4 KiB
Python
Executable file
154 lines
4.4 KiB
Python
Executable file
#!/usr/bin/python3
|
|
"""
|
|
Host service for Linux Unified Key Setup (LUKS, format 2) devices.
|
|
|
|
This will open a LUKS container, given the path of the parent
|
|
device and the corresponding passphrase.
|
|
|
|
NB: This will use the custom osbuild udev rule inhibitor mechanism
|
|
to suppress certain udev rules. See `osbuil.util.udev.UdevInhibitor`
|
|
for details.
|
|
|
|
Host commands used: `cryptsetup`, `dmsetup`
|
|
"""
|
|
|
|
import argparse
|
|
import contextlib
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import uuid
|
|
from typing import Dict, Optional
|
|
|
|
from osbuild import devices
|
|
from osbuild.util.udev import UdevInhibitor
|
|
|
|
SCHEMA = """
|
|
"additionalProperties": false,
|
|
"required": ["passphrase"],
|
|
"properties": {
|
|
"passphrase": {
|
|
"description": "Passphrase to use",
|
|
"default": "",
|
|
"type": "string"
|
|
}
|
|
}
|
|
|
|
"""
|
|
|
|
|
|
class CryptDeviceService(devices.DeviceService):
|
|
|
|
def __init__(self, args: argparse.Namespace):
|
|
super().__init__(args)
|
|
self.devname: Optional[str] = None
|
|
self.lock = None
|
|
self.check = False
|
|
|
|
def dminfo(self, name=None):
|
|
"""Return the major, minor and open count for the device"""
|
|
res = subprocess.run(["dmsetup", "info", "-c",
|
|
"-o", "major,minor,open",
|
|
"--noheadings",
|
|
"--separator", ":",
|
|
name or self.devname],
|
|
check=False,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
encoding="UTF-8")
|
|
|
|
if res.returncode != 0:
|
|
data = res.stdout.strip()
|
|
msg = f"Failed to find the device node: {data}"
|
|
raise RuntimeError(msg)
|
|
|
|
data = res.stdout.strip()
|
|
data = list(map(int, data.split(":")))
|
|
assert len(data) == 3
|
|
major, minor, count = data[0], data[1], data[2]
|
|
return major, minor, count
|
|
|
|
def open_count(self, name=None):
|
|
count = 0
|
|
with contextlib.suppress(RuntimeError):
|
|
_, _, count = self.dminfo(name)
|
|
return count
|
|
|
|
def open(self, devpath: str, parent: str, tree: str, options: Dict):
|
|
passphrase = options.get("passphrase", "")
|
|
|
|
parent_dev = os.path.join("/dev", parent)
|
|
|
|
# Generate a random name for it, since this is a temporary name
|
|
# that is not store in the device at all
|
|
devname = "osbuild-luks-" + str(uuid.uuid4())
|
|
self.devname = devname
|
|
|
|
# This employs the custom osbuild udev rule inhibitor mechanism
|
|
self.lock = UdevInhibitor.for_dm_name(devname)
|
|
|
|
# Make sure the logical volume is activated
|
|
res = subprocess.run(["cryptsetup", "-q", "open", parent_dev, devname],
|
|
check=False,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
input=passphrase,
|
|
encoding="UTF-8")
|
|
|
|
if res.returncode != 0:
|
|
data = res.stdout.strip()
|
|
msg = f"Failed to open crypt device: {data}"
|
|
raise RuntimeError(msg)
|
|
|
|
print(f"opened as {devname}")
|
|
|
|
# Now that device is successfully opened, we check on close
|
|
self.check = True
|
|
|
|
# Now that the crypt device is open, find its major/minor numbers
|
|
major, minor, _ = self.dminfo()
|
|
|
|
subpath = os.path.join("mapper", devname)
|
|
fullpath = os.path.join(devpath, subpath)
|
|
os.makedirs(os.path.join(devpath, "mapper"), exist_ok=True)
|
|
self.ensure_device_node(fullpath, major, minor)
|
|
|
|
data = {
|
|
"path": subpath,
|
|
"name": devname,
|
|
"node": {
|
|
"major": major,
|
|
"minor": minor
|
|
}
|
|
}
|
|
return data
|
|
|
|
def close(self):
|
|
if not self.devname:
|
|
return
|
|
|
|
_, _, opencount = self.dminfo()
|
|
print(f"closing (opencount: {opencount})")
|
|
|
|
self.lock.release()
|
|
self.lock = None
|
|
|
|
name = self.devname
|
|
self.devname = None
|
|
|
|
# finally close the device
|
|
subprocess.run(["cryptsetup", "-q", "close", name],
|
|
check=self.check,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
encoding="UTF-8")
|
|
|
|
|
|
def main():
|
|
service = CryptDeviceService.from_args(sys.argv[1:])
|
|
service.main()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
r = main()
|
|
sys.exit(r)
|