debian-forge/osbuild/testutil/__init__.py
Michael Vogt 1b3e956334 testutil: switch mock_command to use bash
This change allows use the more advanced features of bash like
array operations (e.g. `${@:2}` to drop the first two arguments
or similar. On fedora/rhel this is a no-op as it is already using
sh -> bash (afaik).
2024-10-24 10:06:46 +02:00

203 lines
6.4 KiB
Python

"""
Test related utilities
"""
import contextlib
import inspect
import os
import pathlib
import random
import re
import shutil
import socket
import string
import subprocess
import tempfile
import textwrap
from types import ModuleType
from typing import Type
def has_executable(executable: str) -> bool:
return shutil.which(executable) is not None
def assert_dict_has(v, keys, expected_value):
for key in keys.split("."):
assert key in v
v = v[key]
assert v == expected_value
def make_fake_tree(basedir: pathlib.Path, fake_content: dict):
"""Create a directory tree of files with content.
Call it with:
{"filename": "content", "otherfile": "content"}
filename paths will have their parents created as needed, under tmpdir.
"""
for path, content in fake_content.items():
dirp, name = os.path.split(os.path.join(basedir, path.lstrip("/")))
os.makedirs(dirp, exist_ok=True)
with open(os.path.join(dirp, name), "w", encoding="utf-8") as fp:
fp.write(content)
def make_fake_input_tree(tmpdir: pathlib.Path, fake_content: dict) -> str:
"""
Wrapper around make_fake_tree for "input trees"
"""
basedir = tmpdir / "tree"
make_fake_tree(basedir, fake_content)
return os.fspath(basedir)
def assert_jsonschema_error_contains(res, expected_err, expected_num_errs=None):
err_msgs = [e.as_dict()["message"] for e in res.errors]
if expected_num_errs is not None:
assert len(err_msgs) == expected_num_errs, \
f"expected exactly {expected_num_errs} errors in {[e.as_dict() for e in res.errors]}"
re_typ = getattr(re, 'Pattern', None)
# this can be removed once we no longer support py3.6 (re.Pattern is modern)
if not re_typ:
re_typ = getattr(re, '_pattern_type')
if isinstance(expected_err, re_typ):
finder = expected_err.search
else:
def finder(s): return expected_err in s # pylint: disable=C0321
assert any(finder(err_msg)
for err_msg in err_msgs), f"{expected_err} not found in {err_msgs}"
class MockCommandCallArgs:
"""MockCommandCallArgs provides the arguments a mocked command
was called with.
Use :call_args_list: to get a list of calls and each of these calls
will have the argv[1:] from the mocked binary.
"""
def __init__(self, calllog_path):
self._calllog = pathlib.Path(calllog_path)
@property
def call_args_list(self):
call_arg_list = []
for acall in self._calllog.read_text(encoding="utf8").split("\n\n"):
if acall:
call_arg_list.append(acall.split("\n"))
return call_arg_list
@contextlib.contextmanager
def mock_command(cmd_name: str, script: str):
"""
mock_command creates a mocked binary with the given :cmd_name: and :script:
content. This is useful to e.g. mock errors from binaries or validate that
external binaries are called in the right way.
It returns a MockCommandCallArgs class that can be used to inspect the
way the binary was called.
"""
original_path = os.environ["PATH"]
with tempfile.TemporaryDirectory() as tmpdir:
cmd_path = pathlib.Path(tmpdir) / cmd_name
cmd_calllog_path = pathlib.Path(os.fspath(cmd_path) + ".calllog")
# This is a little bit naive right now, if args contains \n things
# will break. easy enough to fix by using \0 as the separator but
# then \n in args is kinda rare
fake_cmd_content = textwrap.dedent(f"""\
#!/bin/bash -e
for arg in "$@"; do
echo "$arg" >> {cmd_calllog_path}
done
# extra separator to differenciate between calls
echo "" >> {cmd_calllog_path}
""") + script
cmd_path.write_text(fake_cmd_content, encoding="utf8")
cmd_path.chmod(0o755)
os.environ["PATH"] = f"{tmpdir}:{original_path}"
try:
yield MockCommandCallArgs(cmd_calllog_path)
finally:
os.environ["PATH"] = original_path
@contextlib.contextmanager
def make_container(tmp_path, fake_content, base="scratch"):
fake_container_tag = "osbuild-test-" + "".join(random.choices(string.digits, k=12))
fake_container_src = tmp_path / "fake-container-src"
fake_container_src.mkdir(exist_ok=True)
make_fake_tree(fake_container_src, fake_content)
fake_containerfile_path = fake_container_src / "Containerfile"
container_file_content = f"""
FROM {base}
COPY . .
"""
fake_containerfile_path.write_text(container_file_content, encoding="utf8")
subprocess.check_call([
"podman", "build",
"--no-cache",
"-t", fake_container_tag,
"-f", os.fspath(fake_containerfile_path),
])
try:
yield fake_container_tag
finally:
subprocess.check_call(["podman", "image", "rm", fake_container_tag])
@contextlib.contextmanager
def pull_oci_archive_container(archive_path, image_name):
subprocess.check_call(["skopeo", "copy", f"oci-archive:{archive_path}", f"containers-storage:{image_name}"])
try:
yield
finally:
subprocess.check_call(["skopeo", "delete", f"containers-storage:{image_name}"])
def make_fake_service_fd() -> int:
"""Create a file descriptor suitable as input for --service-fd for any
host.Service
Note that the service will take over the fd and take care of the
lifecycle so no need to close it.
"""
sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
fd = os.dup(sock.fileno())
return fd
def find_one_subclass_in_module(module: ModuleType, subclass: Type) -> object:
"""Find the class in the given module that is a subclass of the given input
If multiple classes are found an error is raised.
"""
cls = None
for name, memb in inspect.getmembers(
module,
predicate=lambda obj: inspect.isclass(obj) and issubclass(obj, subclass)):
if cls:
raise ValueError(f"already have {cls}, also found {name}:{memb}")
cls = memb
return cls
def make_fake_images_inputs(fake_oci_path, name):
fname = fake_oci_path.name
dirname = fake_oci_path.parent
return {
"images": {
"path": dirname,
"data": {
"archives": {
fname: {
"format": "oci-archive",
"name": name,
},
},
},
},
}