Optional types were provided in places but were not always correct. Add mypy checking and fix those that fail(ed).
157 lines
4.5 KiB
Python
Executable file
157 lines
4.5 KiB
Python
Executable file
#!/usr/bin/python3
|
|
"""
|
|
Host service for Linux Unified Key Setup (LUKS, format 2) devices.
|
|
|
|
This will open a LUKS container, given the path of the parent
|
|
device and the corresponding passphrase.
|
|
|
|
NB: This will use the custom osbuild udev rule inhibitor mechanism
|
|
to suppress certain udev rules. See `osbuild.util.udev.UdevInhibitor`
|
|
for details.
|
|
|
|
Host commands used: `cryptsetup`, `dmsetup`
|
|
"""
|
|
|
|
import argparse
|
|
import contextlib
|
|
import os
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
import uuid
|
|
|
|
from typing import Dict, Optional
|
|
|
|
from osbuild import devices
|
|
from osbuild.util.udev import UdevInhibitor
|
|
|
|
|
|
# JSON schema fragment (no enclosing braces) for the options handled by this
# module: one required string property `passphrase` with default "", and no
# additional properties allowed. The consumer of SCHEMA is not visible in this
# file; the string content must stay unchanged.
SCHEMA = """
|
|
"additionalProperties": false,
|
|
"required": ["passphrase"],
|
|
"properties": {
|
|
"passphrase": {
|
|
"description": "Passphrase to use",
|
|
"default": "",
|
|
"type": "string"
|
|
}
|
|
}
|
|
|
|
"""
|
|
|
|
|
|
class CryptDeviceService(devices.DeviceService):
    """Device service that opens and closes a LUKS container.

    `open` maps the container with `cryptsetup open` under a random
    device-mapper name and creates a matching block device node below
    the given device directory. `close` releases the udev inhibitor
    and unmaps the container again.
    """

    def __init__(self, args: argparse.Namespace):
        super().__init__(args)
        # Device-mapper name chosen in `open`; None while nothing is open
        self.devname: Optional[str] = None
        # udev rule inhibitor obtained in `open`; None while not held
        self.lock: Optional[UdevInhibitor] = None
        # Whether `close` passes check=True to `cryptsetup close`; only
        # set once `cryptsetup open` has succeeded
        self.check = False

    def dminfo(self, name: Optional[str] = None):
        """Return the major, minor and open count for the device

        Queries `dmsetup info` for `name`, falling back to the name of
        the currently opened device. Returns a `(major, minor, count)`
        tuple of integers.

        Raises `RuntimeError` if no device name is available, if
        `dmsetup` exits non-zero, or if its output cannot be parsed as
        three colon separated integers.
        """
        target = name or self.devname
        if not target:
            # Passing None in the argument list would make subprocess
            # raise a TypeError; report it like any other lookup failure
            raise RuntimeError("Failed to find the device node: no device name given")

        res = subprocess.run(["dmsetup", "info", "-c",
                              "-o", "major,minor,open",
                              "--noheadings",
                              "--separator", ":",
                              target],
                             check=False,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             encoding="UTF-8")

        output = res.stdout.strip()
        if res.returncode != 0:
            msg = f"Failed to find the device node: {output}"
            raise RuntimeError(msg)

        # Explicit validation instead of `assert`, which is stripped
        # under `python -O`. A wrong field count and a non-integer field
        # both surface as ValueError here.
        try:
            major, minor, count = (int(field) for field in output.split(":"))
        except ValueError as err:
            raise RuntimeError(f"Unexpected dmsetup output: {output!r}") from err

        return major, minor, count

    def open_count(self, name: Optional[str] = None) -> int:
        """Return the open count of the device, or 0 if it cannot be queried"""
        count = 0
        with contextlib.suppress(RuntimeError):
            _, _, count = self.dminfo(name)
        return count

    def open(self, devpath: str, parent: str, tree: str, options: Dict) -> Dict:
        """Open the LUKS container on `parent` and create its device node

        `parent` is joined onto `/dev` to locate the container, and
        `options["passphrase"]` (default: empty string) is fed to
        `cryptsetup` on stdin. The node is created at
        `<devpath>/mapper/<name>`.

        Returns a dict with the node's `path` (relative to `devpath`),
        the device-mapper `name` and the `node` major/minor numbers.

        Raises `RuntimeError` if the container cannot be opened or the
        resulting device cannot be found.
        """
        passphrase = options.get("passphrase", "")

        parent_dev = os.path.join("/dev", parent)

        # Generate a random name for it, since this is a temporary name
        # that is not stored in the device at all
        devname = "osbuild-luks-" + str(uuid.uuid4())
        self.devname = devname

        # This employs the custom osbuild udev rule inhibitor mechanism
        self.lock = UdevInhibitor.for_dm_name(devname)

        # Open the LUKS container; the passphrase is passed via stdin
        res = subprocess.run(["cryptsetup", "-q", "open", parent_dev, devname],
                             check=False,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             input=passphrase,
                             encoding="UTF-8")

        if res.returncode != 0:
            data = res.stdout.strip()
            msg = f"Failed to open crypt device: {data}"
            raise RuntimeError(msg)

        print(f"opened as {devname}")

        # Now that device is successfully opened, we check on close
        self.check = True

        # Now that the crypt device is open, find its major/minor numbers
        major, minor, _ = self.dminfo()

        subpath = os.path.join("mapper", devname)
        fullpath = os.path.join(devpath, subpath)
        os.makedirs(os.path.join(devpath, "mapper"), exist_ok=True)
        os.mknod(fullpath, 0o666 | stat.S_IFBLK, os.makedev(major, minor))

        return {
            "path": subpath,
            "name": devname,
            "node": {
                "major": major,
                "minor": minor
            }
        }

    def close(self) -> None:
        """Release the udev inhibitor and close the LUKS container

        Does nothing if no device name has been set by `open`. A failing
        `cryptsetup close` only raises (`subprocess.CalledProcessError`)
        when the container had been opened successfully.
        """
        if not self.devname:
            return

        # Use the tolerant query: if `open` failed after the name was
        # chosen the mapping does not exist, and a raising lookup here
        # would skip releasing the lock and the clean-up below.
        opencount = self.open_count()
        print(f"closing (opencount: {opencount})")

        # The lock is still None if creating the inhibitor itself failed
        if self.lock is not None:
            self.lock.release()
            self.lock = None

        name = self.devname
        self.devname = None

        # finally close the device
        subprocess.run(["cryptsetup", "-q", "close", name],
                       check=self.check,
                       stdout=subprocess.PIPE,
                       stderr=subprocess.STDOUT,
                       encoding="UTF-8")
|
|
|
|
|
|
def main():
    """Create the service from the command line arguments and run it"""
    CryptDeviceService.from_args(sys.argv[1:]).main()
|
|
|
|
|
|
# Script entry point: run the service and exit with whatever `main` returns
if __name__ == '__main__':
    sys.exit(main())
|