1) additional qemu tests for ami, vmdk, vhd, and openstack image types 2) new type of systemd-nspawn tests for tar, ext4, and parititioned disk types the systemd-nspawn tests use loopback network interface directly from the host so it is necessary to tweak the settings of its SSH server. This is done in a "script" stage using simple "sed" command.
244 lines
8.3 KiB
Python
Executable file
244 lines
8.3 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
import argparse
|
|
import contextlib
|
|
import glob
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import shutil
|
|
import urllib.request
|
|
|
|
|
|
from typing import Dict, Any, Union, Tuple
|
|
|
|
TEST_DIR = os.path.dirname(__file__)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def osbuild_test_store():
|
|
store = os.getenv("OSBUILD_TEST_STORE")
|
|
if store:
|
|
yield store
|
|
else:
|
|
with tempfile.TemporaryDirectory(dir="/var/tmp", prefix="osbuild-composer-test-") as store:
|
|
yield store
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def temporary_json_file(obj):
|
|
f = tempfile.NamedTemporaryFile("w", delete=False)
|
|
json.dump(obj, f, indent=2)
|
|
f.close()
|
|
try:
|
|
yield f.name
|
|
finally:
|
|
os.unlink(f.name)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def create_ssh_keys():
|
|
with tempfile.TemporaryDirectory() as dir:
|
|
# Copy the keys and set correct permissions/ownership on the directory and keys
|
|
# Proper directory ownership is implied by the fact that this process creates the directory
|
|
# The mode is adjusted by `chmod`
|
|
shutil.copyfile(f"{TEST_DIR}/keyring/id_rsa", f"{dir}/id_rsa")
|
|
shutil.copyfile(f"{TEST_DIR}/keyring/id_rsa.pub", f"{dir}/id_rsa.pub")
|
|
os.chmod(f"{dir}/id_rsa", 0o600)
|
|
try:
|
|
yield dir
|
|
finally:
|
|
pass
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def qemu_boot_image(image_file):
|
|
# run in background
|
|
cmd = ["qemu-system-x86_64",
|
|
"-m", "2048",
|
|
"-snapshot",
|
|
"-accel", "accel=kvm:hvf:tcg",
|
|
"-net", "nic,model=rtl8139", "-net", "user,hostfwd=tcp::1022-:22",
|
|
"-nographic",
|
|
image_file
|
|
]
|
|
print(f"running qemu command: {' '.join(cmd)}")
|
|
vm = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
try:
|
|
yield None
|
|
finally:
|
|
vm.kill()
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def nspawn_boot_container(image_file):
|
|
cmd = ["systemd-nspawn", "--boot", "-M", "boottest", "--image", image_file]
|
|
print(f"running nspawn command: {' '.join(cmd)}")
|
|
container = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
try:
|
|
yield None
|
|
finally:
|
|
container.kill()
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def nspawn_extract_container(image_file):
|
|
with tempfile.TemporaryDirectory() as dir:
|
|
subprocess.run(["tar", "xf", image_file], cwd=dir)
|
|
cmd = ["systemd-nspawn", "--boot", "-M", "boottest", "--directory", "."]
|
|
print(f"running nspawn command: {' '.join(cmd)}")
|
|
container = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=dir)
|
|
try:
|
|
yield None
|
|
finally:
|
|
container.kill()
|
|
|
|
|
|
def run_osbuild(pipeline, store):
|
|
osbuild_cmd = ["python3", "-m", "osbuild", "--json", "--libdir", ".", "--store", store, "-"]
|
|
|
|
build_pipeline = os.getenv("OSBUILD_TEST_BUILD_PIPELINE", None)
|
|
if build_pipeline:
|
|
osbuild_cmd.append("--build-pipeline")
|
|
osbuild_cmd.append(os.path.abspath(build_pipeline))
|
|
|
|
result = dict()
|
|
try:
|
|
result = json.loads(subprocess.check_output(osbuild_cmd, cwd="./osbuild", encoding="utf-8", input=json.dumps(pipeline)))
|
|
except subprocess.CalledProcessError as err:
|
|
print(err.output)
|
|
|
|
return result["tree_id"], result["output_id"]
|
|
|
|
|
|
def run_test(case, store):
|
|
try:
|
|
if "compose" not in case:
|
|
print("skipping this test case")
|
|
return True
|
|
|
|
if "pipeline" in case:
|
|
_, output_id = run_osbuild(case["pipeline"], store)
|
|
filename = os.path.join(store, "refs", output_id, case["compose"]["filename"])
|
|
else:
|
|
filename, _ = urllib.request.urlretrieve(case["url"])
|
|
|
|
info = json.loads(subprocess.check_output(["tools/image-info", filename]))
|
|
if info != case["expected"]:
|
|
with temporary_json_file(case["expected"]) as a, temporary_json_file(info) as b:
|
|
subprocess.run(["diff", "--unified", "--color", "--label", "expected", a, "--label", "got", b], check=False)
|
|
return False
|
|
except KeyError:
|
|
pass
|
|
|
|
return True
|
|
|
|
|
|
def get_local_boot_test_case(fname: str) -> Union[Tuple[str, str, Dict[Any, Any]], None]:
|
|
with open(fname, "r") as fd:
|
|
test_case_dict = json.load(fd)
|
|
|
|
if "boot-test" not in test_case_dict:
|
|
return None
|
|
|
|
pipeline_dict = test_case_dict["pipeline"]
|
|
return test_case_dict["boot-test"]["boot-type"], test_case_dict["boot-test"]["filename"], pipeline_dict
|
|
|
|
|
|
def run_ssh_test(private_key):
|
|
cmd = ["ssh",
|
|
"-p", "1022",
|
|
"-i", private_key,
|
|
"-o", "StrictHostKeyChecking=no",
|
|
"redhat@localhost",
|
|
"systemctl is-system-running"]
|
|
for _ in range(40):
|
|
try:
|
|
# Run the process with check=False because it returns non-zero return code for "degraded"
|
|
sp = subprocess.run(cmd, timeout=120, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
if sp.returncode == 255:
|
|
raise subprocess.CalledProcessError(sp.returncode, cmd)
|
|
output = sp.stdout.decode('utf-8').strip()
|
|
print(output)
|
|
if output == "running":
|
|
print("ssh test success")
|
|
return 0
|
|
elif output == "degraded":
|
|
print("ssh test passed, but the system is degraded")
|
|
return 0
|
|
else:
|
|
print(f"ssh test failed, system status is: {output}")
|
|
return 1
|
|
except subprocess.TimeoutExpired:
|
|
print("ssh timeout expired")
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"ssh error: {e}")
|
|
time.sleep(20)
|
|
|
|
print("ssh test failure")
|
|
return 1
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Run test cases.')
|
|
parser.add_argument('--boot-test', type=str, nargs='*', help='Boot images produced by osbuild')
|
|
parser.add_argument('--image-info', type=str, nargs='*',
|
|
help='Build images and run image-info on them (default action)')
|
|
arg = parser.parse_args()
|
|
|
|
# Run local boot test
|
|
if arg.boot_test is not None:
|
|
failed = False
|
|
with osbuild_test_store() as store:
|
|
with create_ssh_keys() as keydir:
|
|
if arg.boot_test != []:
|
|
test_cases = list(map(lambda x: f"{TEST_DIR}/cases/{x}_local_boot.json", arg.boot_test))
|
|
else:
|
|
test_cases = glob.glob(f"{TEST_DIR}/cases/*.json")
|
|
|
|
for test_case in test_cases:
|
|
test_case_tuple = get_local_boot_test_case(test_case)
|
|
if test_case_tuple:
|
|
test_type, image_fname, pl_dict = test_case_tuple
|
|
print("starting osbuild")
|
|
_, output_id = run_osbuild(pl_dict, store)
|
|
print("osbuild success")
|
|
if test_type == "qemu":
|
|
with qemu_boot_image(f"{store}/refs/{output_id}/{image_fname}"):
|
|
if run_ssh_test(f"{keydir}/id_rsa") == 1:
|
|
failed = True
|
|
elif test_type == "nspawn":
|
|
with nspawn_boot_container(f"{store}/refs/{output_id}/{image_fname}"):
|
|
if run_ssh_test(f"{keydir}/id_rsa") == 1:
|
|
failed = True
|
|
elif test_type == "nspawn-extract":
|
|
with nspawn_extract_container(f"{store}/refs/{output_id}/{image_fname}"):
|
|
if run_ssh_test(f"{keydir}/id_rsa") == 1:
|
|
failed = True
|
|
else:
|
|
print("unknown test type")
|
|
|
|
return 1 if failed else 0
|
|
|
|
failed = False
|
|
with osbuild_test_store() as store:
|
|
for filename in arg.image_info if arg.image_info != [] else glob.glob(f"{TEST_DIR}/cases/*.json"):
|
|
name = os.path.basename(filename)[:-5]
|
|
with open(filename) as f:
|
|
case = json.load(f)
|
|
|
|
print(f"{name}")
|
|
if not run_test(case, store):
|
|
print(f"FAIL")
|
|
print()
|
|
failed = True
|
|
|
|
return 1 if failed else 0
|
|
|
|
|
|
r = main()
|
|
if r:
|
|
sys.exit(r)
|