linux: add Libc accessor with renameat2(2)
Add a new utility that wraps ctypes.CDLL() for the self-embedded libc.so. Initially, it only exposes renameat2(2), but more can be added when needed in the future. The Libc class is very similar to the existing LibCap class, with a similar instantiation logic with singleton access. In the future, the Libc class will allow access to other system calls and libc.so functionality, when needed.
This commit is contained in:
parent
ebbedd1e89
commit
efe4ad4b92
2 changed files with 123 additions and 0 deletions
|
|
@ -28,6 +28,7 @@ __all__ = [
|
|||
"fcntl_flock",
|
||||
"ioctl_get_immutable",
|
||||
"ioctl_toggle_immutable",
|
||||
"Libc",
|
||||
"proc_boot_id",
|
||||
]
|
||||
|
||||
|
|
@ -421,6 +422,71 @@ def fcntl_flock(fd: int, lock_type: int, wait: bool = False):
|
|||
fcntl.fcntl(fd, lock_cmd, arg_flock64)
|
||||
|
||||
|
||||
class Libc:
|
||||
"""Safe Access to libc
|
||||
|
||||
This class provides selected safe accessors to libc functionality. It is
|
||||
highly linux-specific and uses `ctypes.CDLL` to access `libc`.
|
||||
"""
|
||||
|
||||
AT_FDCWD = ctypes.c_int(-100)
|
||||
RENAME_EXCHANGE = ctypes.c_uint(2)
|
||||
RENAME_NOREPLACE = ctypes.c_uint(1)
|
||||
RENAME_WHITEOUT = ctypes.c_uint(4)
|
||||
|
||||
_lock = threading.Lock()
|
||||
_inst = None
|
||||
|
||||
def __init__(self, lib: ctypes.CDLL):
|
||||
self._lib = lib
|
||||
|
||||
# prototype: renameat2
|
||||
proto = ctypes.CFUNCTYPE(
|
||||
ctypes.c_int,
|
||||
ctypes.c_int,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_int,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_uint,
|
||||
use_errno=True,
|
||||
)(
|
||||
("renameat2", self._lib),
|
||||
(
|
||||
(1, "olddirfd", self.AT_FDCWD),
|
||||
(1, "oldpath"),
|
||||
(1, "newdirfd", self.AT_FDCWD),
|
||||
(1, "newpath"),
|
||||
(1, "flags", 0),
|
||||
),
|
||||
)
|
||||
setattr(proto, "errcheck", self._errcheck_errno)
|
||||
setattr(proto, "__name__", "renameat2")
|
||||
self.renameat2 = proto
|
||||
|
||||
@staticmethod
|
||||
def make() -> "Libc":
|
||||
"""Create a new instance"""
|
||||
|
||||
return Libc(ctypes.CDLL("", use_errno=True))
|
||||
|
||||
@classmethod
|
||||
def default(cls) -> "Libc":
|
||||
"""Return and possibly create the default singleton instance"""
|
||||
|
||||
with cls._lock:
|
||||
if cls._inst is None:
|
||||
cls._inst = cls.make()
|
||||
return cls._inst
|
||||
|
||||
@staticmethod
|
||||
def _errcheck_errno(result, func, args):
|
||||
if result < 0:
|
||||
err = ctypes.get_errno()
|
||||
msg = f"{func.__name__}{args} -> {result}: error ({err}): {os.strerror(err)}"
|
||||
raise OSError(err, msg)
|
||||
return result
|
||||
|
||||
|
||||
def proc_boot_id(appid: str):
|
||||
"""Acquire Application-specific Boot-ID
|
||||
|
||||
|
|
|
|||
|
|
@ -167,6 +167,63 @@ def test_fcntl_flock():
|
|||
os.close(fd2)
|
||||
|
||||
|
||||
def test_libc():
|
||||
#
|
||||
# Test that the Libc class can be instantiated and provides a suitable
|
||||
# default singleton. Verify the expected interfaces exist (though tests
|
||||
# for them are separate).
|
||||
#
|
||||
|
||||
libc0 = linux.Libc.make()
|
||||
libc1 = linux.Libc.default()
|
||||
|
||||
assert libc0 is not libc1
|
||||
assert libc1 is linux.Libc.default()
|
||||
|
||||
assert libc0.AT_FDCWD
|
||||
assert libc0.RENAME_EXCHANGE
|
||||
assert libc0.RENAME_NOREPLACE
|
||||
assert libc0.RENAME_WHITEOUT
|
||||
assert libc0.renameat2
|
||||
|
||||
|
||||
def test_libc_renameat2_errcheck():
|
||||
#
|
||||
# Verify the `renameat(2)` system call on `Libc` correctly turns errors into
|
||||
# python exceptions.
|
||||
#
|
||||
|
||||
libc = linux.Libc.default()
|
||||
|
||||
with pytest.raises(OSError):
|
||||
libc.renameat2(oldpath=b"", newpath=b"")
|
||||
|
||||
|
||||
def test_libc_renameat2_exchange(tmpdir):
|
||||
#
|
||||
# Verify the `renameat(2)` system call on `Libc` with the
|
||||
# `RENAME_EXCHANGE` flag. This swaps two files atomically.
|
||||
#
|
||||
|
||||
libc = linux.Libc.default()
|
||||
|
||||
with open(f"{tmpdir}/foo", "x", encoding="utf8") as f:
|
||||
f.write("foo")
|
||||
with open(f"{tmpdir}/bar", "x", encoding="utf8") as f:
|
||||
f.write("bar")
|
||||
|
||||
libc.renameat2(
|
||||
oldpath=f"{tmpdir}/foo".encode(),
|
||||
newpath=f"{tmpdir}/bar".encode(),
|
||||
flags=linux.Libc.RENAME_EXCHANGE,
|
||||
)
|
||||
|
||||
with open(f"{tmpdir}/foo", "r", encoding="utf8") as f:
|
||||
assert f.read() == "bar"
|
||||
with open(f"{tmpdir}/bar", "r", encoding="utf8") as f:
|
||||
assert f.read() == "foo"
|
||||
|
||||
|
||||
def test_proc_boot_id():
|
||||
#
|
||||
# Test the `proc_boot_id()` function which reads the current boot-id
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue