This is required for python3.6 where there is no `os.memfd_create()` yet. Can be removed once we move to python3.8+.
284 lines
8.8 KiB
Python
284 lines
8.8 KiB
Python
#
|
|
# Tests for the `osbuild.util.linux` module.
|
|
#
|
|
|
|
import ctypes
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from osbuild.util import linux
|
|
|
|
from .. import test
|
|
|
|
|
|
@pytest.fixture(name="tmpdir")
|
|
def tmpdir_fixture():
|
|
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:
|
|
yield tmp
|
|
|
|
|
|
@pytest.mark.skipif(not test.TestBase.can_modify_immutable("/var/tmp"), reason="root-only")
|
|
def test_ioctl_get_immutable(tmpdir):
|
|
#
|
|
# Test the `ioctl_get_immutable()` helper and make sure it works
|
|
# as intended.
|
|
#
|
|
|
|
with open(f"{tmpdir}/immutable", "x", encoding="utf8") as f:
|
|
assert not linux.ioctl_get_immutable(f.fileno())
|
|
|
|
|
|
@pytest.mark.skipif(not test.TestBase.can_modify_immutable("/var/tmp"), reason="root-only")
|
|
def test_ioctl_toggle_immutable(tmpdir):
|
|
#
|
|
# Test the `ioctl_toggle_immutable()` helper and make sure it works
|
|
# as intended.
|
|
#
|
|
|
|
with open(f"{tmpdir}/immutable", "x", encoding="utf8") as f:
|
|
# Check the file is mutable by default and if we clear it again.
|
|
assert not linux.ioctl_get_immutable(f.fileno())
|
|
linux.ioctl_toggle_immutable(f.fileno(), False)
|
|
assert not linux.ioctl_get_immutable(f.fileno())
|
|
|
|
# Set immutable and check for it. Try again to verify with flag set.
|
|
linux.ioctl_toggle_immutable(f.fileno(), True)
|
|
assert linux.ioctl_get_immutable(f.fileno())
|
|
linux.ioctl_toggle_immutable(f.fileno(), True)
|
|
assert linux.ioctl_get_immutable(f.fileno())
|
|
|
|
# Verify immutable files cannot be unlinked.
|
|
with pytest.raises(OSError):
|
|
os.unlink(f"{tmpdir}/immutable")
|
|
|
|
# Check again that clearing the flag works.
|
|
linux.ioctl_toggle_immutable(f.fileno(), False)
|
|
assert not linux.ioctl_get_immutable(f.fileno())
|
|
|
|
# This time, check that we actually set the same flag as `chattr`.
|
|
subprocess.run(["chattr", "+i",
|
|
f"{tmpdir}/immutable"], check=True)
|
|
assert linux.ioctl_get_immutable(f.fileno())
|
|
|
|
# Same for clearing it.
|
|
subprocess.run(["chattr", "-i",
|
|
f"{tmpdir}/immutable"], check=True)
|
|
assert not linux.ioctl_get_immutable(f.fileno())
|
|
|
|
# Verify we can unlink the file again, once the flag is cleared.
|
|
os.unlink(f"{tmpdir}/immutable")
|
|
|
|
|
|
@pytest.mark.skipif(not linux.cap_is_supported(), reason="no support for capabilities")
|
|
def test_capabilities():
|
|
#
|
|
# Test the capability related utility functions
|
|
#
|
|
|
|
lib = linux.LibCap.get_default()
|
|
assert lib
|
|
l2 = linux.LibCap.get_default()
|
|
assert lib is l2
|
|
|
|
assert linux.cap_is_supported()
|
|
|
|
assert linux.cap_is_supported("CAP_MAC_ADMIN")
|
|
|
|
val = lib.from_name("CAP_MAC_ADMIN")
|
|
assert val >= 0
|
|
|
|
name = lib.to_name(val)
|
|
assert name == "CAP_MAC_ADMIN"
|
|
|
|
assert not linux.cap_is_supported("CAP_GICMO")
|
|
with pytest.raises(OSError):
|
|
lib.from_name("CAP_GICMO")
|
|
|
|
|
|
def test_fcntl_flock():
|
|
#
|
|
# This tests the `linux.fcntl_flock()` file-locking helper. Note
|
|
# that file-locks are on the open-file-description, so they are shared
|
|
# between dupped file-descriptors. We explicitly create a separate
|
|
# file-description via `/proc/self/fd/`.
|
|
#
|
|
|
|
with tempfile.TemporaryFile() as f:
|
|
fd1 = f.fileno()
|
|
fd2 = os.open(os.path.join("/proc/self/fd/", str(fd1)), os.O_RDWR | os.O_CLOEXEC)
|
|
|
|
# Test: unlock
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_UNLCK)
|
|
|
|
# Test: write-lock + unlock
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_WRLCK)
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_UNLCK)
|
|
|
|
# Test: read-lock1 + read-lock2 + unlock1 + unlock2
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_RDLCK)
|
|
linux.fcntl_flock(fd2, linux.fcntl.F_RDLCK)
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_UNLCK)
|
|
linux.fcntl_flock(fd2, linux.fcntl.F_UNLCK)
|
|
|
|
# Test: write-lock1 + write-lock2 + unlock
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_WRLCK)
|
|
with pytest.raises(BlockingIOError):
|
|
linux.fcntl_flock(fd2, linux.fcntl.F_WRLCK)
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_UNLCK)
|
|
|
|
# Test: write-lock1 + read-lock2 + unlock
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_WRLCK)
|
|
with pytest.raises(BlockingIOError):
|
|
linux.fcntl_flock(fd2, linux.fcntl.F_RDLCK)
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_UNLCK)
|
|
|
|
# Test: read-lock1 + write-lock2 + unlock
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_RDLCK)
|
|
with pytest.raises(BlockingIOError):
|
|
linux.fcntl_flock(fd2, linux.fcntl.F_WRLCK)
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_UNLCK)
|
|
|
|
# Test: write-lock1 + read-lock1 + read-lock2 + unlock
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_WRLCK)
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_RDLCK)
|
|
linux.fcntl_flock(fd2, linux.fcntl.F_RDLCK)
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_UNLCK)
|
|
|
|
# Test: read-lock1 + read-lock2 + write-lock1 + unlock1 + unlock2
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_RDLCK)
|
|
linux.fcntl_flock(fd2, linux.fcntl.F_RDLCK)
|
|
with pytest.raises(BlockingIOError):
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_WRLCK)
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_UNLCK)
|
|
linux.fcntl_flock(fd2, linux.fcntl.F_UNLCK)
|
|
|
|
# Test: write-lock3 + write-lock1 + close3 + write-lock1 + unlock1
|
|
fd3 = os.open(os.path.join("/proc/self/fd/", str(fd1)), os.O_RDWR | os.O_CLOEXEC)
|
|
linux.fcntl_flock(fd3, linux.fcntl.F_WRLCK)
|
|
with pytest.raises(BlockingIOError):
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_WRLCK)
|
|
os.close(fd3)
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_WRLCK)
|
|
linux.fcntl_flock(fd1, linux.fcntl.F_UNLCK)
|
|
|
|
# Cleanup
|
|
os.close(fd2)
|
|
|
|
|
|
def test_libc():
|
|
#
|
|
# Test that the Libc class can be instantiated and provides a suitable
|
|
# default singleton. Verify the expected interfaces exist (though tests
|
|
# for them are separate).
|
|
#
|
|
|
|
libc0 = linux.Libc.make()
|
|
libc1 = linux.Libc.default()
|
|
|
|
assert libc0 is not libc1
|
|
assert libc1 is linux.Libc.default()
|
|
|
|
assert libc0.AT_FDCWD
|
|
assert libc0.RENAME_EXCHANGE
|
|
assert libc0.RENAME_NOREPLACE
|
|
assert libc0.RENAME_WHITEOUT
|
|
assert libc0.renameat2
|
|
assert libc0.memfd_create
|
|
|
|
|
|
def test_libc_renameat2_errcheck():
|
|
#
|
|
# Verify the `renameat(2)` system call on `Libc` correctly turns errors into
|
|
# python exceptions.
|
|
#
|
|
|
|
libc = linux.Libc.default()
|
|
|
|
with pytest.raises(OSError):
|
|
libc.renameat2(oldpath=b"", newpath=b"")
|
|
|
|
|
|
def test_libc_renameat2_exchange(tmpdir):
|
|
#
|
|
# Verify the `renameat(2)` system call on `Libc` with the
|
|
# `RENAME_EXCHANGE` flag. This swaps two files atomically.
|
|
#
|
|
|
|
libc = linux.Libc.default()
|
|
|
|
with open(f"{tmpdir}/foo", "x", encoding="utf8") as f:
|
|
f.write("foo")
|
|
with open(f"{tmpdir}/bar", "x", encoding="utf8") as f:
|
|
f.write("bar")
|
|
|
|
libc.renameat2(
|
|
oldpath=f"{tmpdir}/foo".encode(),
|
|
newpath=f"{tmpdir}/bar".encode(),
|
|
flags=linux.Libc.RENAME_EXCHANGE,
|
|
)
|
|
|
|
with open(f"{tmpdir}/foo", "r", encoding="utf8") as f:
|
|
assert f.read() == "bar"
|
|
with open(f"{tmpdir}/bar", "r", encoding="utf8") as f:
|
|
assert f.read() == "foo"
|
|
|
|
|
|
def test_libc_memfd_create_errcheck():
|
|
libc = linux.Libc.default()
|
|
with pytest.raises(OSError) as exc:
|
|
libc.memfd_create("foo", -1)
|
|
assert "Invalid argument" in str(exc.value)
|
|
|
|
|
|
def test_libc_memfd_create():
|
|
libc = linux.Libc.default()
|
|
fd = libc.memfd_create("foo", 0)
|
|
os.write(fd, b"file content")
|
|
fd2 = os.dup(fd)
|
|
os.close(fd)
|
|
os.lseek(fd2, 0, 0)
|
|
assert os.read(fd2, 4096) == b"file content"
|
|
|
|
|
|
def test_proc_boot_id():
|
|
#
|
|
# Test the `proc_boot_id()` function which reads the current boot-id
|
|
# from the kernel. Make sure it is a valid UUID and also consistent on
|
|
# repeated queries.
|
|
#
|
|
|
|
bootid = linux.proc_boot_id("test")
|
|
assert len(bootid.hex) == 32
|
|
assert bootid.version == 4
|
|
|
|
bootid2 = linux.proc_boot_id("test")
|
|
assert bootid.int == bootid2.int
|
|
|
|
bootid3 = linux.proc_boot_id("foobar")
|
|
assert bootid.int != bootid3.int
|
|
|
|
|
|
def test_libc_futimens_errcheck():
|
|
libc = linux.Libc.default()
|
|
with pytest.raises(OSError):
|
|
libc.futimens(-1, None)
|
|
|
|
|
|
def test_libc_futimes_works(tmpdir):
|
|
libc = linux.Libc.default()
|
|
stamp_file = os.path.join(tmpdir, "foo")
|
|
with open(stamp_file, "wb") as fp:
|
|
fp.write(b"meep")
|
|
mtime1 = os.stat(stamp_file).st_mtime
|
|
time.sleep(0.1)
|
|
with open(stamp_file, "rb") as fp:
|
|
libc.futimens(fp.fileno(), ctypes.byref(linux.c_timespec_times2(
|
|
atime=linux.c_timespec(tv_sec=3, tv_nsec=300 * 1000 * 1000),
|
|
mtime=linux.c_timespec(tv_sec=0, tv_nsec=libc.UTIME_OMIT),
|
|
)))
|
|
assert os.stat(stamp_file).st_atime == 3.3
|
|
assert round(os.stat(stamp_file).st_mtime, 3) == round(mtime1, 3)
|