diff --git a/osbuild/loop.py b/osbuild/loop.py index 77334c3f..675ea510 100644 --- a/osbuild/loop.py +++ b/osbuild/loop.py @@ -15,6 +15,9 @@ __all__ = [ "UnexpectedDevice" ] +# can be mocked in tests +DEV_PATH = "/dev" + class UnexpectedDevice(Exception): def __init__(self, expected_minor, rdev, mode): @@ -124,8 +127,14 @@ class Loop: with contextlib.ExitStack() as stack: if not dir_fd: - dir_fd = os.open("/dev", os.O_DIRECTORY) + dir_fd = os.open(DEV_PATH, os.O_DIRECTORY) stack.callback(lambda: os.close(dir_fd)) + # ensure the device node is availale, in containers it may + # not get dynamically created + try: + self.mknod(dir_fd) + except FileExistsError: + pass self.fd = os.open(self.devname, os.O_RDWR, dir_fd=dir_fd) info = os.stat(self.fd) @@ -534,7 +543,7 @@ class LoopControl: with contextlib.ExitStack() as stack: if not dir_fd: - dir_fd = os.open("/dev", os.O_DIRECTORY) + dir_fd = os.open(DEV_PATH, os.O_DIRECTORY) stack.callback(lambda: os.close(dir_fd)) self.fd = os.open("loop-control", os.O_RDWR, dir_fd=dir_fd) diff --git a/test/mod/test_loop.py b/test/mod/test_loop.py index 0b5ba6bb..2bd3e6ce 100644 --- a/test/mod/test_loop.py +++ b/test/mod/test_loop.py @@ -5,9 +5,11 @@ import contextlib import fcntl import os +import pathlib import threading import time from tempfile import TemporaryDirectory, TemporaryFile +from unittest.mock import patch import pytest @@ -255,6 +257,17 @@ def test_on_close(tempdir): ctl.close() -def test_loop_handles_error_in_init(): +@patch("os.open", side_effect=FileNotFoundError) +def test_loop_handles_error_in_init(mocked_open): with pytest.raises(FileNotFoundError): - lopo = loop.Loop("non-existing") + lopo = loop.Loop(999) + + +@pytest.mark.skipif(os.getuid() != 0, reason="root only") +def test_loop_create_mknod(): + # tmpdir must be /var/tmp because /tmp is usually mounted with "nodev" + with TemporaryDirectory(dir="/var/tmp") as tmpdir: + with patch.object(loop, "DEV_PATH", new=tmpdir) as mocked_dev_path: + lopo = loop.Loop(1337) + assert lopo.devname == "loop1337" + assert pathlib.Path(f"{tmpdir}/loop1337").is_block_device()