debian-forge/osbuild/util/linux.py
Michael Vogt 09e78c52d9 uktil: add libc.memfd_create() wrapper
This is required for python3.6 where there is no `os.memfd_create()`
yet. Can be removed once we move to python3.8+.
2024-09-17 19:27:03 +02:00

572 lines
18 KiB
Python

"""Linux API Access
This module provides access to linux system-calls and other APIs, in particular
those not provided by the python standard library. The idea is to provide
universal wrappers with broad access to linux APIs. Convenience helpers and
higher-level abstractions are beyond the scope of this module.
In some cases it is overly complex to provide universal access to a specific
API. Hence, the API might be restricted to a reduced subset of its
functionality, just to make sure we can actually implement the wrappers in a
reasonable manner.
"""
import array
import ctypes
import ctypes.util
import fcntl
import hashlib
import hmac
import os
import platform
import struct
import threading
import uuid
__all__ = [
"fcntl_flock",
"ioctl_get_immutable",
"ioctl_toggle_immutable",
"Libc",
"proc_boot_id",
]
# NOTE: These are wrong on at least ALPHA and SPARC. They use different
# ioctl number setups. We should fix this, but this is really awkward
# in standard python.
# Our tests will catch this, so we will not accidentally run into this
# on those architectures.
FS_IOC_GETFLAGS = 0x80086601
FS_IOC_SETFLAGS = 0x40086602
FS_IMMUTABLE_FL = 0x00000010
if platform.machine() == "ppc64le":
BLK_IOC_FLSBUF = 0x20001261
else:
BLK_IOC_FLSBUF = 0x00001261
def ioctl_get_immutable(fd: int):
"""Query FS_IMMUTABLE_FL
This queries the `FS_IMMUTABLE_FL` flag on a specified file.
Arguments
---------
fd
File-descriptor to operate on.
Returns
-------
bool
Whether the `FS_IMMUTABLE_FL` flag is set or not.
Raises
------
OSError
If the underlying ioctl fails, a matching `OSError` will be raised.
"""
if not isinstance(fd, int) or fd < 0:
raise ValueError()
flags = array.array('L', [0])
fcntl.ioctl(fd, FS_IOC_GETFLAGS, flags, True)
return bool(flags[0] & FS_IMMUTABLE_FL)
def ioctl_toggle_immutable(fd: int, set_to: bool):
"""Toggle FS_IMMUTABLE_FL
This toggles the `FS_IMMUTABLE_FL` flag on a specified file. It can both set
and clear the flag.
Arguments
---------
fd
File-descriptor to operate on.
set_to
Whether to set the `FS_IMMUTABLE_FL` flag or not.
Raises
------
OSError
If the underlying ioctl fails, a matching `OSError` will be raised.
"""
if not isinstance(fd, int) or fd < 0:
raise ValueError()
flags = array.array('L', [0])
fcntl.ioctl(fd, FS_IOC_GETFLAGS, flags, True)
if set_to:
flags[0] |= FS_IMMUTABLE_FL
else:
flags[0] &= ~FS_IMMUTABLE_FL
fcntl.ioctl(fd, FS_IOC_SETFLAGS, flags, False)
def ioctl_blockdev_flushbuf(fd: int):
"""Flush the block device buffer cache
NB: This function needs the `CAP_SYS_ADMIN` capability.
Arguments
---------
fd
File-descriptor of a block device to operate on.
Raises
------
OSError
If the underlying ioctl fails, a matching `OSError`
will be raised.
"""
if not isinstance(fd, int) or fd < 0:
raise ValueError(f"Invalid file descriptor: '{fd}'")
fcntl.ioctl(fd, BLK_IOC_FLSBUF, 0)
class LibCap:
"""Wrapper for libcap (capabilities commands and library) project"""
cap_value_t = ctypes.c_int
_lock = threading.Lock()
_inst = None
def __init__(self, lib: ctypes.CDLL) -> None:
self.lib = lib
# process-wide bounding set
get_bound = lib.cap_get_bound
get_bound.argtypes = (self.cap_value_t,)
get_bound.restype = ctypes.c_int
get_bound.errcheck = self._check_result # type: ignore
self._get_bound = get_bound
from_name = lib.cap_from_name
from_name.argtypes = (ctypes.c_char_p, ctypes.POINTER(self.cap_value_t),)
from_name.restype = ctypes.c_int
from_name.errcheck = self._check_result # type: ignore
self._from_name = from_name
to_name = lib.cap_to_name
to_name.argtypes = (ctypes.c_int,)
to_name.restype = ctypes.POINTER(ctypes.c_char)
to_name.errcheck = self._check_result # type: ignore
self._to_name = to_name
free = lib.cap_free
free.argtypes = (ctypes.c_void_p,)
free.restype = ctypes.c_int
free.errcheck = self._check_result # type: ignore
self._free = free
@staticmethod
def _check_result(result, func, args):
if result is None or (isinstance(result, int) and result == -1):
err = ctypes.get_errno()
msg = f"{func.__name__}{args} -> {result}: error ({err}): {os.strerror(err)}"
raise OSError(err, msg)
return result
@staticmethod
def make():
path = ctypes.util.find_library("cap")
if not path:
return None
try:
lib = ctypes.CDLL(path, use_errno=True)
except (OSError, ImportError):
return None
return LibCap(lib)
@staticmethod
def last_cap() -> int:
"""Return the int value of the highest valid capability"""
try:
with open("/proc/sys/kernel/cap_last_cap", "rb") as f:
data = f.read()
return int(data)
except FileNotFoundError:
return 0
@classmethod
def get_default(cls) -> "LibCap":
"""Return a singleton instance of the library"""
with cls._lock:
if cls._inst is None:
cls._inst = cls.make()
return cls._inst
def get_bound(self, capability: int) -> bool:
"""Return the current value of the capability in the thread's bounding set"""
# cap = self.cap_value_t(capability)
return self._get_bound(capability) == 1
def to_name(self, value: int) -> str:
"""Translate from the capability's integer value to the its symbolic name"""
raw = self._to_name(value)
val = ctypes.cast(raw, ctypes.c_char_p).value
if val is None:
raise RuntimeError("Failed to cast.")
res = str(val, encoding="utf-8")
self._free(raw)
return res.upper()
def from_name(self, value: str) -> int:
"""Translate from the symbolic name to its integer value"""
cap = self.cap_value_t()
self._from_name(value.encode("utf-8"), ctypes.pointer(cap))
return int(cap.value)
def cap_is_supported(capability: str = "CAP_CHOWN") -> bool:
"""Return whether a given capability is supported by the system"""
lib = LibCap.get_default()
if not lib:
return False
try:
value = lib.from_name(capability)
lib.get_bound(value)
return True
except OSError:
return False
def cap_bound_set() -> set:
"""Return the calling thread's capability bounding set
If capabilities are not supported this function will return the empty set.
"""
lib = LibCap.get_default()
if not lib:
return set()
res = set(
lib.to_name(cap)
for cap in range(lib.last_cap() + 1)
if lib.get_bound(cap)
)
return res
def cap_mask_to_set(mask: int) -> set:
lib = LibCap.get_default()
if not lib:
return set()
def bits(n):
count = 0
while n:
if n & 1:
yield count
count += 1
n >>= 1
res = {
lib.to_name(cap) for cap in bits(mask)
}
return res
def fcntl_flock(fd: int, lock_type: int, wait: bool = False):
"""Perform File-locking Operation
This function performs a linux file-locking operation on the specified
file-descriptor. The specific type of lock must be specified by the caller.
This function does not allow to specify the byte-range of the file to lock.
Instead, it always applies the lock operations to the entire file.
For system-level documentation, see the `fcntl(2)` man-page, especially the
section about `struct flock` and the locking commands.
This function always uses the open-file-description locks provided by
modern linux kernels. This means, locks are tied to the
open-file-description. That is, they are shared between duplicated
file-descriptors. Furthermore, acquiring a lock while already holding a
lock will update the lock to the new specified lock type, rather than
acquiring a new lock.
If `wait` is `False` a non-blocking operation is performed. In case the lock
is contested a `BlockingIOError` is raised by the python standard library.
If `Wait` is `True`, the kernel will suspend execution until the lock is
acquired.
If a synchronous exception is raised, the operation will be canceled and the
exception is forwarded.
Parameters
----------
fd
The file-descriptor to use for the locking operation.
lock_type
The type of lock to use. This can be one of: `fcntl.F_RDLCK`,
`fcntl.F_WRLCK`, `fcntl.F_UNLCK`.
wait
Whether to suspend execution until the lock is acquired in case of
contested locks.
Raises
------
OSError
If the underlying `fcntl(2)` syscall fails, a matching `OSError` is
raised. In particular, `BlockingIOError` signals contested locks. The
POSIX error code is `EAGAIN`.
"""
valid_types = [fcntl.F_RDLCK, fcntl.F_WRLCK, fcntl.F_UNLCK]
if lock_type not in valid_types:
raise ValueError("Unknown lock type")
if not isinstance(fd, int):
raise ValueError("File-descriptor is not an integer")
if fd < 0:
raise ValueError("File-descriptor is negative")
#
# The `OFD` constants are not available through the `fcntl` module, so we
# need to use their integer representations directly. They are the same
# across all linux ABIs:
#
# F_OFD_GETLK = 36
# F_OFD_SETLK = 37
# F_OFD_SETLKW = 38
#
if wait:
lock_cmd = 38
else:
lock_cmd = 37
#
# We use the linux open-file-descriptor (OFD) version of the POSIX file
# locking operations. They attach locks to an open file description, rather
# than to a process. They have clear, useful semantics.
# This means, we need to use the `fcntl(2)` operation with `struct flock`,
# which is rather unfortunate, since it varies depending on compiler
# arguments used for the python library, as well as depends on the host
# architecture, etc.
#
# The structure layout of the locking argument is:
#
# struct flock {
# short int l_type;
# short int l_whence;
# off_t l_start;
# off_t l_len;
# pid_t int l_pid;
# }
#
# The possible options for `l_whence` are `SEEK_SET`, `SEEK_CUR`, and
# `SEEK_END`. All are provided by the `fcntl` module. Same for the possible
# options for `l_type`, which are `L_RDLCK`, `L_WRLCK`, and `L_UNLCK`.
#
# Depending on which architecture you run on, but also depending on whether
# large-file mode was enabled to compile the python library, the values of
# the constants as well as the sizes of `off_t` can change. What we know is
# that `short int` is always 16-bit on linux, and we know that `fcntl(2)`
# does not take a `size` parameter. Therefore, the kernel will just fetch
# the structure from user-space with the correct size. The python wrapper
# `fcntl.fcntl()` always uses a 1024-bytes buffer and thus we can just pad
# our argument with trailing zeros to provide a valid argument to the
# kernel. Note that your libc might also do automatic translation to
# `fcntl64(2)` and `struct flock64` (if you run on 32bit machines with
# large-file support enabled). Also, random architectures change trailing
# padding of the structure (MIPS-ABI32 adds 128-byte trailing padding,
# SPARC adds 16?).
#
# To avoid all this mess, we use the fact that we only care for `l_type`.
# Everything else is always set to 0 in all our needed locking calls.
# Therefore, we simply use the largest possible `struct flock` for your
# libc and set everything to 0. The `l_type` field is guaranteed to be
# 16-bit, so it will have the correct offset, alignment, and endianness
# without us doing anything. Downside of all this is that all our locks
# always affect the entire file. However, we do not need locks for specific
# sub-regions of a file, so we should be fine. Eventually, what we end up
# with passing to libc is:
#
# struct flock {
# uint16_t l_type;
# uint16_t l_whence;
# uint32_t pad0;
# uint64_t pad1;
# uint64_t pad2;
# uint32_t pad3;
# uint32_t pad4;
# }
#
type_flock64 = struct.Struct('=HHIQQII')
arg_flock64 = type_flock64.pack(lock_type, 0, 0, 0, 0, 0, 0)
#
# Since python-3.5 (PEP475) the standard library guards around `EINTR` and
# automatically retries the operation. Hence, there is no need to retry
# waiting calls. If a python signal handler raises an exception, the
# operation is not retried and the exception is forwarded.
#
fcntl.fcntl(fd, lock_cmd, arg_flock64)
class c_timespec(ctypes.Structure):
_fields_ = [('tv_sec', ctypes.c_long), ('tv_nsec', ctypes.c_long)]
class c_timespec_times2(ctypes.Structure):
_fields_ = [('atime', c_timespec), ('mtime', c_timespec)]
class Libc:
"""Safe Access to libc
This class provides selected safe accessors to libc functionality. It is
highly linux-specific and uses `ctypes.CDLL` to access `libc`.
"""
AT_FDCWD = ctypes.c_int(-100)
RENAME_EXCHANGE = ctypes.c_uint(2)
RENAME_NOREPLACE = ctypes.c_uint(1)
RENAME_WHITEOUT = ctypes.c_uint(4)
# see /usr/include/x86_64-linux-gnu/bits/stat.h
UTIME_NOW = ctypes.c_long(((1 << 30) - 1))
UTIME_OMIT = ctypes.c_long(((1 << 30) - 2))
_lock = threading.Lock()
_inst = None
def __init__(self, lib: ctypes.CDLL):
self._lib = lib
# prototype: renameat2
proto = ctypes.CFUNCTYPE(
ctypes.c_int,
ctypes.c_int,
ctypes.c_char_p,
ctypes.c_int,
ctypes.c_char_p,
ctypes.c_uint,
use_errno=True,
)(
("renameat2", self._lib),
(
(1, "olddirfd", self.AT_FDCWD),
(1, "oldpath"),
(1, "newdirfd", self.AT_FDCWD),
(1, "newpath"),
(1, "flags", 0),
),
)
setattr(proto, "errcheck", self._errcheck_errno)
setattr(proto, "__name__", "renameat2")
self.renameat2 = proto
# prototype: futimens
proto = ctypes.CFUNCTYPE(
ctypes.c_int,
ctypes.c_int,
ctypes.POINTER(c_timespec_times2),
use_errno=True,
)(
("futimens", self._lib),
(
(1, "fd"),
(1, "timespec"),
),
)
setattr(proto, "errcheck", self._errcheck_errno)
setattr(proto, "__name__", "futimens")
self.futimens = proto
# prototype: _memfd_create() (takes a byte type name)
# (can be removed once we move to python3.8)
proto = ctypes.CFUNCTYPE(
ctypes.c_int, # restype (return type)
ctypes.c_char_p,
ctypes.c_uint,
use_errno=True,
)(
("memfd_create", self._lib),
(
(1, "name"),
(1, "flags", 0),
),
)
setattr(proto, "errcheck", self._errcheck_errno)
setattr(proto, "__name__", "memfd_create")
self._memfd_create = proto
# (can be removed once we move to python3.8)
def memfd_create(self, name: str, flags: int = 0) -> int:
""" create an anonymous file """
char_p_name = name.encode()
return self._memfd_create(char_p_name, flags)
@staticmethod
def make() -> "Libc":
"""Create a new instance"""
return Libc(ctypes.CDLL("", use_errno=True))
@classmethod
def default(cls) -> "Libc":
"""Return and possibly create the default singleton instance"""
with cls._lock:
if cls._inst is None:
cls._inst = cls.make()
return cls._inst
@staticmethod
def _errcheck_errno(result, func, args):
if result < 0:
err = ctypes.get_errno()
msg = f"{func.__name__}{args} -> {result}: error ({err}): {os.strerror(err)}"
raise OSError(err, msg)
return result
def proc_boot_id(appid: str):
"""Acquire Application-specific Boot-ID
This queries the kernel for the boot-id of the running system. It then
calculates an application-specific boot-id by combining the kernel boot-id
with the provided application-id. This uses a cryptographic HMAC.
Therefore, the kernel boot-id will not be deducable from the output. This
allows the caller to use the resulting application specific boot-id for any
purpose they wish without exposing the confidential kernel boot-id.
This always returns an object of type `uuid.UUID` from the python standard
library. Furthermore, this always produces UUIDs of version 4 variant 1.
Parameters
----------
appid
An arbitrary object (usually a string) that identifies the use-case of
the boot-id.
"""
with open("/proc/sys/kernel/random/boot_id", "r", encoding="utf8") as f:
content = f.read().strip(" \t\r\n")
# Running the boot-id through HMAC-SHA256 guarantees that the original
# boot-id will not be exposed. Thus two IDs generated with this interface
# will not allow to deduce whether they share a common boot-id.
# From the result, we throw away everything but the lower 128bits and then
# turn it into a UUID version 4 variant 1.
h = bytearray(hmac.new(content.encode(), appid.encode(), hashlib.sha256).digest()) # type: ignore
h[6] = (h[6] & 0x0f) | 0x40 # mark as version 4
h[8] = (h[6] & 0x3f) | 0x80 # mark as variant 1
return uuid.UUID(bytes=bytes(h[0:16]))