util/linux: add capability utilities
This commit is contained in:
parent
46fd8958bb
commit
1874c71920
2 changed files with 176 additions and 0 deletions
|
|
@ -13,8 +13,12 @@ reasonable manner.
|
|||
|
||||
|
||||
import array
|
||||
import ctypes
|
||||
import ctypes.util
|
||||
import fcntl
|
||||
import os
|
||||
import platform
|
||||
import threading
|
||||
|
||||
|
||||
__all__ = [
|
||||
|
|
@ -121,3 +125,149 @@ def ioctl_blockdev_flushbuf(fd: int):
|
|||
raise ValueError(f"Invalid file descriptor: '{fd}'")
|
||||
|
||||
fcntl.ioctl(fd, BLK_IOC_FLSBUF, 0)
|
||||
|
||||
|
||||
class LibCap:
|
||||
"""Wrapper for libcap (capabilities commands and library) project"""
|
||||
|
||||
cap_value_t = ctypes.c_int
|
||||
_lock = threading.Lock()
|
||||
_inst = None
|
||||
|
||||
def __init__(self, lib: ctypes.CDLL) -> None:
|
||||
self.lib = lib
|
||||
|
||||
# process-wide bounding set
|
||||
get_bound = lib.cap_get_bound
|
||||
get_bound.argtypes = (self.cap_value_t,)
|
||||
get_bound.restype = ctypes.c_int
|
||||
get_bound.errcheck = self._check_result
|
||||
self._get_bound = get_bound
|
||||
|
||||
from_name = lib.cap_from_name
|
||||
from_name.argtypes = (ctypes.c_char_p, ctypes.POINTER(self.cap_value_t),)
|
||||
from_name.restype = ctypes.c_int
|
||||
from_name.errcheck = self._check_result
|
||||
self._from_name = from_name
|
||||
|
||||
to_name = lib.cap_to_name
|
||||
to_name.argtypes = (ctypes.c_int,)
|
||||
to_name.restype = ctypes.POINTER(ctypes.c_char)
|
||||
to_name.errcheck = self._check_result
|
||||
self._to_name = to_name
|
||||
|
||||
free = lib.cap_free
|
||||
free.argtypes = (ctypes.c_void_p,)
|
||||
free.restype = ctypes.c_int
|
||||
free.errcheck = self._check_result
|
||||
self._free = free
|
||||
|
||||
@staticmethod
|
||||
def _check_result(result, func, args):
|
||||
if result is None or (isinstance(result, int) and result == -1):
|
||||
err = ctypes.get_errno()
|
||||
msg = f"{func.__name__}{args} -> {result}: error ({err}): {os.strerror(err)}"
|
||||
raise OSError(err, msg)
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def make():
|
||||
path = ctypes.util.find_library("cap")
|
||||
if not path:
|
||||
return None
|
||||
|
||||
try:
|
||||
lib = ctypes.CDLL(path, use_errno=True)
|
||||
except (OSError, ImportError):
|
||||
return None
|
||||
|
||||
return LibCap(lib)
|
||||
|
||||
@staticmethod
|
||||
def last_cap() -> int:
|
||||
"""Return the int value of the highest valid capability"""
|
||||
try:
|
||||
with open("/proc/sys/kernel/cap_last_cap", "rb") as f:
|
||||
data = f.read()
|
||||
return int(data)
|
||||
except FileNotFoundError:
|
||||
return 0
|
||||
|
||||
@classmethod
|
||||
def get_default(cls) -> "LibCap":
|
||||
"""Return a singleton instance of the library"""
|
||||
with cls._lock:
|
||||
if cls._inst is None:
|
||||
cls._inst = cls.make()
|
||||
return cls._inst
|
||||
|
||||
def get_bound(self, capability: int) -> bool:
|
||||
"""Return the current value of the capability in the thread's bounding set"""
|
||||
# cap = self.cap_value_t(capability)
|
||||
return self._get_bound(capability) == 1
|
||||
|
||||
def to_name(self, value: int) -> str:
|
||||
"""Translate from the capability's integer value to the its symbolic name"""
|
||||
raw = self._to_name(value)
|
||||
val = ctypes.cast(raw, ctypes.c_char_p).value
|
||||
res = str(val, encoding="utf-8")
|
||||
self._free(raw)
|
||||
return res.upper()
|
||||
|
||||
def from_name(self, value: str) -> int:
|
||||
"""Translate from the symbolic name to its integer value"""
|
||||
cap = self.cap_value_t()
|
||||
self._from_name(value.encode("utf-8"), ctypes.pointer(cap))
|
||||
return int(cap.value)
|
||||
|
||||
|
||||
def cap_is_supported(capability: str = "CAP_CHOWN") -> bool:
|
||||
"""Return whether a given capability is supported by the system"""
|
||||
lib = LibCap.get_default()
|
||||
if not lib:
|
||||
return False
|
||||
|
||||
try:
|
||||
value = lib.from_name(capability)
|
||||
lib.get_bound(value)
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
def cap_bound_set() -> set:
|
||||
"""Return the calling thread's capability bounding set
|
||||
|
||||
If capabilities are not supported this function will return the empty set.
|
||||
"""
|
||||
lib = LibCap.get_default()
|
||||
if not lib:
|
||||
return set()
|
||||
|
||||
res = set(
|
||||
lib.to_name(cap)
|
||||
for cap in range(lib.last_cap() + 1)
|
||||
if lib.get_bound(cap)
|
||||
)
|
||||
|
||||
return res
|
||||
|
||||
|
||||
def cap_mask_to_set(mask: int) -> set:
|
||||
lib = LibCap.get_default()
|
||||
if not lib:
|
||||
return set()
|
||||
|
||||
def bits(n):
|
||||
count = 0
|
||||
while n:
|
||||
if n & 1:
|
||||
yield count
|
||||
count += 1
|
||||
n >>= 1
|
||||
|
||||
res = {
|
||||
lib.to_name(cap) for cap in bits(mask)
|
||||
}
|
||||
|
||||
return res
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue