For the qemu tests this makes no difference as we are anyway forwarding the ports. But the nspawn tests share the same network namespace between the image and the ssh client running the test without any forwarding. In order for that to work we had to modify the image to use a non-standard port. We don't want this for two reasons: we want to make sure we test our images unmodified, and also this meant that when we changed our pipeline generation we were not verifying that the boot test cases were updated accordingly. As a result they have now drifted. Signed-off-by: Tom Gundersen <teg@jklm.no>
281 lines
9.3 KiB
Python
Executable file
281 lines
9.3 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
import argparse
|
|
import contextlib
|
|
import ctypes
|
|
import glob
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import shutil
|
|
import urllib.request
|
|
|
|
|
|
from typing import Dict, Any, Union, Tuple
|
|
|
|
|
|
|
|
|
|
# Directory containing this script; test cases and the ssh keyring are looked
# up relative to it.  The path is made absolute first because
# os.path.dirname() of a bare file name is "", which would turn
# f"{TEST_DIR}/cases" into the absolute path "/cases".
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
|
def errcheck(ret, func, args):
    """Turn a ``-1`` return value from a libc call into an ``OSError``.

    The ``(result, func, arguments)`` signature is the one ctypes uses for
    ``errcheck`` hooks; ``func`` and ``args`` are accepted but not used.

    Returns:
        ``ret`` unchanged when it is not ``-1``, so that the foreign call
        still evaluates to its real return value instead of ``None``.

    Raises:
        OSError: when ``ret`` is ``-1``; built from the errno value saved
            by ctypes.
    """
    if ret == -1:
        e = ctypes.get_errno()
        raise OSError(e, os.strerror(e))
    return ret
|
|
|
|
|
|
# Flag handed to unshare()/setns() to select the network namespace.
CLONE_NEWNET = 0x40000000
# use_errno=True makes ctypes keep a copy of errno after each call, which is
# what errcheck() reads through ctypes.get_errno().
libc = ctypes.CDLL('libc.so.6', use_errno=True)
# Report a -1 return from either namespace call as an OSError.  Without the
# hook on unshare() a failure to create the namespace would go unnoticed and
# the code would keep running in the original namespace.
libc.setns.errcheck = errcheck
libc.unshare.errcheck = errcheck
|
|
|
|
|
|
@contextlib.contextmanager
def netns():
    """Run the body of the ``with`` statement in a new network namespace.

    A handle on the current namespace is kept open for the duration of the
    block and used to switch back when the block exits, whether it exits
    normally or through an exception.

    Raises:
        subprocess.CalledProcessError: if bringing up the loopback device
            fails; the original namespace is restored before this
            propagates.
    """
    # Grab a reference to the current namespace.
    with open("/proc/self/ns/net") as oldnet:
        # Create a new namespace and enter it.
        libc.unshare(CLONE_NEWNET)
        try:
            # Up the loopback device in the new namespace.  This sits inside
            # the try block so that a failing `ip` call cannot leave the
            # process stranded in the new namespace.
            subprocess.run(["ip", "link", "set", "up", "dev", "lo"], check=True)
            yield
        finally:
            # Revert to the old namespace, dropping our
            # reference to the new one.
            libc.setns(oldnet.fileno(), CLONE_NEWNET)
|
|
|
|
|
|
@contextlib.contextmanager
def osbuild_test_store():
    """Yield the path of the osbuild store the tests should use.

    If the ``OSBUILD_TEST_STORE`` environment variable is set to a non-empty
    value, that value is yielded as-is and nothing is cleaned up.  Otherwise
    a temporary directory is created under ``/var/tmp`` and removed again
    when the ``with`` block ends.
    """
    configured = os.getenv("OSBUILD_TEST_STORE")
    if configured:
        yield configured
        return

    scratch = tempfile.TemporaryDirectory(dir="/var/tmp", prefix="osbuild-composer-test-")
    with scratch as path:
        yield path
|
|
|
|
|
|
@contextlib.contextmanager
def temporary_json_file(obj):
    """Write ``obj`` as indented JSON to a temporary file and yield its path.

    The file is closed before the path is yielded, so other processes can
    read it, and it is deleted when the ``with`` block ends.

    Args:
        obj: any value ``json.dump`` can serialise.

    Raises:
        TypeError: (from ``json.dump``) if ``obj`` is not serialisable; the
            temporary file is closed and removed before this propagates.
    """
    # delete=False: the file has to outlive close(); it is removed by hand
    # in the finally clause below.
    f = tempfile.NamedTemporaryFile("w", delete=False)
    try:
        # Closing via `with` also covers the case where json.dump() raises.
        with f:
            json.dump(obj, f, indent=2)
        yield f.name
    finally:
        os.unlink(f.name)
|
|
|
|
|
|
@contextlib.contextmanager
def create_ssh_keys():
    """Yield a temporary directory holding a private copy of the test keys.

    ``id_rsa`` and ``id_rsa.pub`` are copied from the ``keyring`` directory
    next to this script.  The directory is owned by this process because
    this process creates it; the private key is restricted to mode 0600.
    The directory is removed when the ``with`` block ends.
    """
    with tempfile.TemporaryDirectory() as keydir:
        for key_name in ("id_rsa", "id_rsa.pub"):
            shutil.copyfile(f"{TEST_DIR}/keyring/{key_name}", f"{keydir}/{key_name}")
        # Only the private key gets its mode tightened.
        os.chmod(f"{keydir}/id_rsa", 0o600)
        yield keydir
|
|
|
|
|
|
@contextlib.contextmanager
def qemu_boot_image(image_file):
    """Boot ``image_file`` in a background qemu process for the ``with`` block.

    The guest's TCP port 22 is forwarded to port 22 on the host side of
    qemu's user-mode network.  Nothing is yielded (``None``); the qemu
    process is killed and reaped when the block ends.

    Args:
        image_file: path of the disk image handed to qemu.
    """
    # run in background
    cmd = ["qemu-system-x86_64",
           "-m", "2048",
           "-snapshot",
           "-accel", "accel=kvm:hvf:tcg",
           "-net", "nic,model=rtl8139", "-net", "user,hostfwd=tcp::22-:22",
           "-nographic",
           image_file
    ]
    print(f"running qemu command: {' '.join(cmd)}")
    # The output is never consumed, so it is discarded rather than sent to a
    # pipe: an unread pipe eventually fills up and blocks the writer.
    vm = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    try:
        yield None
    finally:
        vm.kill()
        # Reap the child so it does not linger as a zombie.
        vm.wait()
|
|
|
|
|
|
@contextlib.contextmanager
def nspawn_boot_container(image_file):
    """Boot ``image_file`` with systemd-nspawn for the ``with`` block.

    The container is started in the background from the image
    (``--image``).  Nothing is yielded (``None``); the systemd-nspawn
    process is killed and reaped when the block ends.

    Args:
        image_file: path of the image passed to ``--image``.
    """
    cmd = ["systemd-nspawn", "--boot", "--register=no", "-M", "boottest", "--image", image_file]
    print(f"running nspawn command: {' '.join(cmd)}")
    # The output is never consumed, so it is discarded rather than sent to a
    # pipe: an unread pipe eventually fills up and blocks the writer.
    container = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    try:
        yield None
    finally:
        container.kill()
        # Reap the child so it does not linger as a zombie.
        container.wait()
|
|
|
|
|
|
@contextlib.contextmanager
def nspawn_extract_container(image_file):
    """Unpack a tarball and boot the result with systemd-nspawn.

    ``image_file`` is extracted with ``tar`` into a temporary directory and
    systemd-nspawn is started in the background on that directory
    (``--directory .``).  Nothing is yielded (``None``).  When the ``with``
    block ends the systemd-nspawn process is killed and reaped, and the
    temporary directory is removed.

    Args:
        image_file: path of the tar archive to extract.
    """
    with tempfile.TemporaryDirectory() as rootdir:
        extract = subprocess.run(["tar", "xf", image_file], cwd=rootdir)
        if extract.returncode != 0:
            # Keep going as before, but make the problem visible: booting a
            # partially extracted tree otherwise only shows up much later as
            # an unexplained ssh failure.
            print(f"warning: tar exited with status {extract.returncode} while extracting {image_file}")
        cmd = ["systemd-nspawn", "--boot", "--register=no", "-M", "boottest", "--directory", "."]
        print(f"running nspawn command: {' '.join(cmd)}")
        # The output is never consumed, so it is discarded rather than sent
        # to a pipe: an unread pipe eventually fills up and blocks the writer.
        container = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, cwd=rootdir)
        try:
            yield None
        finally:
            container.kill()
            # Reap the child before the temporary directory is removed.
            container.wait()
|
|
|
|
|
|
def run_osbuild(pipeline, store):
    """Run osbuild on ``pipeline`` and return the ids of what it produced.

    osbuild is started as ``python3 -m osbuild`` with ``./osbuild`` as its
    working directory; the pipeline is passed as JSON on standard input and
    the result is read as JSON from standard output.  If the
    ``OSBUILD_TEST_BUILD_ENV`` environment variable is set, its value is
    passed on with ``--build-env``.

    Args:
        pipeline: JSON-serialisable pipeline description.
        store: path of the osbuild store directory.

    Returns:
        The ``(tree_id, output_id)`` pair taken from osbuild's JSON result.

    Raises:
        subprocess.CalledProcessError: if osbuild exits with a non-zero
            status; its captured output is printed first.
        KeyError: if the result lacks ``tree_id`` or ``output_id``.
    """
    # The child runs in ./osbuild, so paths are made absolute here to keep
    # them pointing at the same place the caller means.
    osbuild_cmd = ["python3", "-m", "osbuild", "--json", "--libdir", ".",
                   "--store", os.path.abspath(store), "-"]

    build_env = os.getenv("OSBUILD_TEST_BUILD_ENV", None)
    if build_env:
        osbuild_cmd.append("--build-env")
        osbuild_cmd.append(os.path.abspath(build_env))

    try:
        output = subprocess.check_output(osbuild_cmd, cwd="./osbuild", encoding="utf-8",
                                         input=json.dumps(pipeline))
    except subprocess.CalledProcessError as err:
        print(err.output)
        # Propagate the real failure instead of tripping over a missing key
        # in an empty result further down.
        raise

    result = json.loads(output)
    return result["tree_id"], result["output_id"]
|
|
|
|
|
|
def run_test(case, store):
    """Build or download the image for ``case`` and check its image-info.

    When the case has a ``pipeline`` the image is built with osbuild and
    looked up in ``store``; otherwise it is downloaded from ``case["url"]``.
    An ``.xz`` image is decompressed next to the original.  The output of
    ``tools/image-info`` is then compared with ``case["expected"]`` and a
    unified diff is printed on mismatch.

    Args:
        case: test case dictionary loaded from a JSON case file.
        store: path of the osbuild store directory.

    Returns:
        ``True`` if the case passed or was skipped because it lacks a
        required key; ``False`` if the osbuild run failed or the image
        information differs from the expectation.

    Raises:
        subprocess.CalledProcessError: if ``xz`` or ``tools/image-info``
            exits with a non-zero status.
    """
    if "compose" not in case:
        print("skipping this test case")
        return True

    # Look up everything the case must provide before doing any work.  A
    # case that lacks one of these keys is skipped, not failed.
    try:
        expected = case["expected"]
        if "pipeline" in case:
            image_name = case["compose"]["filename"]
        else:
            url = case["url"]
    except KeyError as missing:
        print(f"skipping this test case: missing key {missing}")
        return True

    if "pipeline" in case:
        try:
            _, output_id = run_osbuild(case["pipeline"], store)
        except (KeyError, subprocess.CalledProcessError) as err:
            # A failed build is a failed test, never a skipped one.
            print(f"osbuild failed: {err!r}")
            return False
        filename = os.path.join(store, "refs", output_id, image_name)
    else:
        filename, _ = urllib.request.urlretrieve(url)

    fn, ex = os.path.splitext(filename)
    if ex == ".xz":
        # The decompressed image is binary data.
        with open(fn, "wb") as f:
            subprocess.run(["xz", "--decompress", "--stdout", filename], stdout=f, check=True)
        filename = fn

    info = json.loads(subprocess.check_output(["tools/image-info", filename]))
    if info != expected:
        with temporary_json_file(expected) as a, temporary_json_file(info) as b:
            # diff exits non-zero whenever the inputs differ, hence check=False.
            subprocess.run(["diff", "--unified", "--color", "--label", "expected", a, "--label", "got", b], check=False)
        return False

    return True
|
|
|
|
|
|
def get_local_boot_test_case(fname: str) -> Union[Tuple[str, str, Dict[Any, Any]], None]:
    """Load the boot-test description from the JSON case file ``fname``.

    Returns:
        ``None`` when the file has no ``boot-test`` entry; otherwise the
        tuple ``(boot type, image file name, pipeline)`` taken from the
        ``boot-test`` and ``pipeline`` entries.

    Raises:
        KeyError: if ``boot-test`` is present but ``pipeline``,
            ``boot-type`` or ``filename`` is missing.
    """
    with open(fname, "r") as case_file:
        case = json.load(case_file)

    if "boot-test" in case:
        pipeline = case["pipeline"]
        boot_type = case["boot-test"]["boot-type"]
        image_name = case["boot-test"]["filename"]
        return boot_type, image_name, pipeline

    return None
|
|
|
|
|
|
def run_ssh_test(private_key):
    """Check over ssh that the booted system reached a usable state.

    Connects to ``redhat@localhost`` on port 22 with ``private_key`` and runs
    ``systemctl is-system-running``.  Up to 40 attempts are made, 20 seconds
    apart, each limited to 120 seconds.  An attempt is retried when ssh
    times out, when ssh itself fails (exit status 255), or when the system
    reports that it is still booting.

    Args:
        private_key: path of the ssh private key to authenticate with.

    Returns:
        ``0`` if the system reports ``running`` or ``degraded``; ``1`` if it
        reports any other final state or no attempt succeeded.
    """
    cmd = ["ssh",
           "-p", "22",
           "-i", private_key,
           "-o", "StrictHostKeyChecking=no",
           "redhat@localhost",
           "systemctl is-system-running"]
    for _ in range(40):
        try:
            # Run the process with check=False because it returns non-zero return code for "degraded"
            sp = subprocess.run(cmd, timeout=120, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # 255 is the status ssh uses for its own errors, as opposed to
            # the status of the remote command.
            if sp.returncode == 255:
                raise subprocess.CalledProcessError(sp.returncode, cmd)
            output = sp.stdout.decode('utf-8').strip()
            print(output)
            if output == "running":
                print("ssh test success")
                return 0
            elif output == "degraded":
                print("ssh test passed, but the system is degraded")
                return 0
            elif output in ("initializing", "starting"):
                # sshd can accept logins before boot has finished; these are
                # transitional states, so ask again later instead of failing.
                print("system is still booting, retrying")
            else:
                print(f"ssh test failed, system status is: {output}")
                return 1
        except subprocess.TimeoutExpired:
            print("ssh timeout expired")
        except subprocess.CalledProcessError as e:
            print(f"ssh error: {e}")
        time.sleep(20)

    print("ssh test failure")
    return 1
|
|
|
|
|
|
def main():
    """Parse the command line and run the requested tests.

    With ``--boot-test`` the selected (or, if none are named, all) case
    files are built with osbuild, booted in a fresh network namespace and
    probed over ssh.  Otherwise the image-info tests are run on the case
    files given with ``--image-info``, or on all case files when none are
    given or the option is absent.

    Returns:
        ``1`` if any test failed, ``0`` otherwise.
    """
    parser = argparse.ArgumentParser(description='Run test cases.')
    parser.add_argument('--boot-test', type=str, nargs='*', help='Boot images produced by osbuild')
    parser.add_argument('--image-info', type=str, nargs='*',
                        help='Build images and run image-info on them (default action)')
    arg = parser.parse_args()

    # Run local boot test
    if arg.boot_test is not None:
        # Context manager that boots an image, keyed by the case's boot type.
        boot_contexts = {
            "qemu": qemu_boot_image,
            "nspawn": nspawn_boot_container,
            "nspawn-extract": nspawn_extract_container,
        }
        failed = False
        with osbuild_test_store() as store, create_ssh_keys() as keydir:
            if arg.boot_test:
                test_cases = [f"{TEST_DIR}/cases/{x}_local_boot.json" for x in arg.boot_test]
            else:
                test_cases = glob.glob(f"{TEST_DIR}/cases/*.json")

            for test_case in test_cases:
                test_case_tuple = get_local_boot_test_case(test_case)
                if not test_case_tuple:
                    # Case file without a boot-test section.
                    continue

                test_type, image_fname, pl_dict = test_case_tuple
                print("starting osbuild")
                _, output_id = run_osbuild(pl_dict, store)
                print("osbuild success")
                with netns():
                    boot = boot_contexts.get(test_type)
                    if boot is None:
                        print("unknown test type")
                        continue
                    with boot(f"{store}/refs/{output_id}/{image_fname}"):
                        if run_ssh_test(f"{keydir}/id_rsa") == 1:
                            failed = True

        return 1 if failed else 0

    failed = False
    with osbuild_test_store() as store:
        # arg.image_info is None when the option is absent and [] when it is
        # given without values; both mean "run every case file".
        for filename in arg.image_info or glob.glob(f"{TEST_DIR}/cases/*.json"):
            # Strip the ".json" suffix for display.
            name = os.path.basename(filename)[:-5]
            with open(filename) as f:
                case = json.load(f)

            print(f"{name}")
            if not run_test(case, store):
                print("FAIL")
                print()
                failed = True

    return 1 if failed else 0
|
|
|
|
|
|
# Only run the tests when executed as a script, not when the module is
# imported.  The exit status is main()'s return value (0 on success).
if __name__ == "__main__":
    sys.exit(main())
|