This is to avoid any confusion with the Compose struct in the store, which contains the pinned rpmmd data and the pipeline, among other things. The struct in the test cases represents the user input to the compose route, so rename it to 'compose-request' to make that clearer. Signed-off-by: Tom Gundersen <teg@jklm.no>
272 lines
8.7 KiB
Python
Executable file
272 lines
8.7 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
import argparse
|
|
import contextlib
|
|
import ctypes
|
|
import glob
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import shutil
|
|
import urllib.request
|
|
|
|
|
|
from typing import Dict, Any, Union, Tuple
|
|
|
|
|
|
|
|
|
|
# Directory containing this script; test cases, cloud-init data and the ssh
# keyring are looked up relative to it. The path is made absolute first so
# that a bare relative __file__ (no directory component) cannot produce an
# empty TEST_DIR, which would turn f"{TEST_DIR}/cases" into "/cases".
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
|
def errcheck(ret, func, args):
    """ctypes ``errcheck`` hook that turns a ``-1`` result into an OSError.

    Args:
        ret: the value returned by the foreign function.
        func: the foreign function object (unused).
        args: the arguments the call was made with (unused).

    Returns:
        ``ret`` unchanged, so the foreign call still reports its real result
        to the caller when it succeeds.

    Raises:
        OSError: when ``ret`` is -1, built from the errno recorded by ctypes.
    """
    if ret == -1:
        e = ctypes.get_errno()
        raise OSError(e, os.strerror(e))
    return ret
|
|
|
|
|
|
# Flag passed to both unshare() and setns() below to select the network
# namespace.
CLONE_NEWNET = 0x40000000
# use_errno=True makes ctypes record errno so errcheck() can read it back
# through ctypes.get_errno().
libc = ctypes.CDLL('libc.so.6', use_errno=True)
# Report failures of either namespace call as OSError instead of silently
# carrying on with a -1 return value.
libc.setns.errcheck = errcheck
libc.unshare.errcheck = errcheck
|
|
|
|
|
|
@contextlib.contextmanager
def netns():
    """Run the enclosed block inside a freshly created network namespace.

    A file handle on the current namespace is held open for the duration of
    the block and used to switch back when the block exits, whether it
    finished normally or raised.

    Raises:
        subprocess.CalledProcessError: if bringing up the loopback device in
            the new namespace fails.
    """
    # Grab a reference to the current namespace.
    with open("/proc/self/ns/net") as oldnet:
        # Create a new namespace and enter it.
        libc.unshare(CLONE_NEWNET)
        try:
            # Up the loopback device in the new namespace. This happens
            # inside the try block so that a failing `ip` invocation still
            # returns the process to its original namespace.
            subprocess.run(["ip", "link", "set", "up", "dev", "lo"], check=True)
            yield
        finally:
            # Revert to the old namespace, dropping our
            # reference to the new one.
            libc.setns(oldnet.fileno(), CLONE_NEWNET)
|
|
|
|
|
|
@contextlib.contextmanager
def osbuild_test_store():
    """Yield the path of the osbuild store to use for this test run.

    When the OSBUILD_TEST_STORE environment variable is set to a non-empty
    value, its absolute path is yielded and nothing is cleaned up afterwards.
    Otherwise a temporary directory is created under /var/tmp, yielded, and
    removed again when the block exits.
    """
    configured = os.getenv("OSBUILD_TEST_STORE")
    if not configured:
        with tempfile.TemporaryDirectory(prefix="osbuild-composer-test-", dir="/var/tmp") as scratch:
            yield scratch
        return

    yield os.path.abspath(configured)
|
|
|
|
|
|
@contextlib.contextmanager
def temporary_json_file(obj):
    """Serialise ``obj`` as JSON into a temporary file and yield its path.

    The file is written with two-space indentation and closed before the
    path is handed out, so other processes can read it by name. It is removed
    when the block exits, and also when serialising ``obj`` fails.

    Args:
        obj: any value accepted by ``json.dump``.

    Yields:
        The path of the temporary file.
    """
    # delete=False: the file must survive being closed, since it is consumed
    # by name; removal is handled explicitly below.
    f = tempfile.NamedTemporaryFile("w", delete=False)
    try:
        # Close the handle on every path, including a failing json.dump().
        with f:
            json.dump(obj, f, indent=2)
        yield f.name
    finally:
        os.unlink(f.name)
|
|
|
|
|
|
@contextlib.contextmanager
def qemu_boot_image(image_file):
    """Boot ``image_file`` in a background qemu process for the enclosed block.

    A cloud-init ISO is generated from the user-data and meta-data files
    under TEST_DIR and attached as a CD-ROM. The guest's port 22 is forwarded
    to port 22 on the host side. When the block exits the qemu process is
    killed and reaped before the temporary directory holding the ISO is
    removed.

    Args:
        image_file: path of the disk image to boot.

    Yields:
        None.

    Raises:
        subprocess.CalledProcessError: if genisoimage fails.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create an ISO that cloud-init can consume with userdata.
        subprocess.run(["genisoimage",
                        "-quiet",
                        "-input-charset", "utf-8",
                        "-output", f"{tmpdir}/cloudinit.iso",
                        "-volid", "cidata",
                        "-joliet",
                        "-rock",
                        f"{TEST_DIR}/cloud-init/user-data",
                        f"{TEST_DIR}/cloud-init/meta-data"],
                       check=True)

        # run in background
        cmd = ["qemu-system-x86_64",
               "-m", "2048",
               "-snapshot",
               "-accel", "accel=kvm:hvf:tcg",
               "-cdrom", f"{tmpdir}/cloudinit.iso",
               "-net", "nic,model=rtl8139", "-net", "user,hostfwd=tcp::22-:22",
               "-nographic",
               image_file
               ]
        print(f"running qemu command: {' '.join(cmd)}")
        # NOTE(review): stdout/stderr are piped but never read; confirm that
        # the guest console output cannot fill the pipe and stall qemu.
        # Using Popen as a context manager closes the pipes and waits for the
        # process on exit, so the killed VM does not linger as a zombie.
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as vm:
            try:
                yield None
            finally:
                vm.kill()
|
|
|
|
|
|
@contextlib.contextmanager
def nspawn_boot_image(image_file, name):
    """Boot ``image_file`` in a background systemd-nspawn container.

    The container runs for the duration of the enclosed block and is killed
    and reaped when the block exits.

    Args:
        image_file: path of the disk image passed to ``--image``.
        name: machine name passed to ``-M``.

    Yields:
        None.
    """
    cmd = ["systemd-nspawn", "--boot", "--register=no", "-M", name, "--image", image_file]
    print(f"running nspawn command: {' '.join(cmd)}")
    # Using Popen as a context manager closes the pipes and waits for the
    # process on exit, so the killed container process is reaped.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as container:
        try:
            yield None
        finally:
            container.kill()
|
|
|
|
|
|
@contextlib.contextmanager
def nspawn_boot_archive(image_file, name):
    """Extract a tar archive and boot it in a systemd-nspawn container.

    The archive is unpacked into a temporary directory under /var/tmp, which
    is then booted with ``--directory``. When the block exits the container
    is killed and reaped before the extracted tree is removed.

    Args:
        image_file: path of the tar archive to extract.
        name: machine name passed to ``-M``.

    Yields:
        None.

    Raises:
        subprocess.CalledProcessError: if extracting the archive fails.
    """
    with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
        cmd = ["tar", "xf", image_file]
        print(f"extracting image to {tmpdir}: {' '.join(cmd)}")
        # check=True: do not go on to boot a partially extracted tree.
        subprocess.run(cmd, cwd=tmpdir, check=True)

        cmd = ["systemd-nspawn", "--boot", "--register=no", "-M", name, "--directory", tmpdir]
        print(f"running nspawn command: {' '.join(cmd)}")
        # Using Popen as a context manager closes the pipes and waits for the
        # process on exit, so the container is gone before the directory it
        # runs from is deleted.
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as container:
            try:
                yield None
            finally:
                container.kill()
|
|
|
|
|
|
def run_osbuild(pipeline, store):
    """Build ``pipeline`` with osbuild and return the resulting output id.

    osbuild is invoked as ``python3 -m osbuild`` from the ./osbuild directory
    with the pipeline fed as JSON on standard input. If the
    OSBUILD_TEST_BUILD_ENV environment variable is set, its absolute path is
    passed along via ``--build-env``.

    Args:
        pipeline: JSON-serialisable pipeline description.
        store: path of the osbuild store.

    Returns:
        The ``output_id`` field of osbuild's JSON result, or None when the
        result has no such field.

    Raises:
        subprocess.CalledProcessError: if osbuild exits with a non-zero code.
    """
    osbuild_cmd = ["python3", "-m", "osbuild", "--json", "--libdir", ".", "--store", store, "-"]

    build_env = os.getenv("OSBUILD_TEST_BUILD_ENV", None)
    if build_env:
        osbuild_cmd += ["--build-env", os.path.abspath(build_env)]

    raw_output = subprocess.check_output(
        osbuild_cmd,
        cwd="./osbuild",
        encoding="utf-8",
        input=json.dumps(pipeline),
    )
    return json.loads(raw_output).get("output_id")
|
|
|
|
|
|
def run_ssh_test(private_key):
    """Check over ssh that the booted system has finished starting up.

    Connects to redhat@localhost on port 22 and runs
    ``systemctl --wait is-system-running``, trying up to 20 times.

    Args:
        private_key: path of the ssh identity file to authenticate with.

    Returns:
        True when the system reports "running" or "degraded"; False when it
        reports any state other than "starting", or when all attempts are
        used up.
    """
    ssh_cmd = ["ssh",
               "-p", "22",
               "-i", private_key,
               "-o", "StrictHostKeyChecking=no",
               "redhat@localhost",
               "systemctl --wait is-system-running"]

    for _ in range(20):
        print("attempting ssh connection")
        try:
            # check=False because the command exits non-zero for "degraded".
            proc = subprocess.run(ssh_cmd, timeout=120, check=False,
                                  stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except subprocess.TimeoutExpired:
            print("ssh timeout expired")
            continue

        if proc.returncode == 255:
            # Exit status 255 is treated as "could not connect yet": pause
            # and try again.
            time.sleep(10)
            continue

        state = proc.stdout.decode('utf-8').strip()
        if state == "running":
            print("ssh test passed")
            return True
        if state == "degraded":
            print("ssh test passed, but the system is degraded")
            return True
        if state != "starting":
            print(f"ssh test failed, system status is: {state}")
            return False

        # Still starting: give the system more time before the next attempt.
        time.sleep(10)

    print("ssh test failure")
    return False
|
|
|
|
|
|
def run_test(case, private_key, store):
    """Run a single test case and report whether it passed.

    The case's pipeline is built with osbuild. Depending on which keys the
    case contains, the resulting image is then compared against the expected
    ``image-info`` output and/or booted and probed over ssh.

    Args:
        case: the decoded test case. Keys read here are "pipeline",
            "compose-request" (for its "filename"), "image-info" and "boot"
            (for its "type").
        private_key: path of the ssh identity file used by the boot test.
        store: path of the osbuild store.

    Returns:
        True if the case passed or was skipped because it has no pipeline,
        False otherwise.
    """
    if "pipeline" not in case:
        print("skipping this test case, no pipeline given")
        return True

    try:
        output_id = run_osbuild(case["pipeline"], store)
    except subprocess.CalledProcessError as err:
        print(err.output)
        return False

    if output_id is None:
        print("osbuild did not produce an image")
        return False

    print(f"osbuild successfully built pipeline {output_id}")

    filename = os.path.join(store, "refs", output_id, case["compose-request"]["filename"])

    # Decompress xz-compressed images next to the original, except for
    # .tar.xz archives, which are used as they are.
    fn, ex = os.path.splitext(filename)
    if ex == ".xz":
        _, ex = os.path.splitext(fn)
        if ex != ".tar":
            # The decompressed image is binary data, so open in binary mode.
            with open(fn, "wb") as f:
                xz = subprocess.run(["xz", "--decompress", "--stdout", filename], stdout=f, check=False)
            # A failed decompression would leave a truncated image behind;
            # fail the case here rather than testing a broken file.
            if xz.returncode != 0:
                print(f"failed to decompress {filename}")
                return False
            filename = fn

    if "image-info" in case:
        info = json.loads(subprocess.check_output(["tools/image-info", filename]))
        if info != case["image-info"]:
            # Show a unified diff of expected vs. actual to aid debugging.
            with temporary_json_file(case["image-info"]) as a, temporary_json_file(info) as b:
                subprocess.run(["diff", "--unified", "--color", "--label", "expected", a, "--label", "got", b], check=False)
            return False

    if "boot" in case:
        boot_type = case["boot"]["type"]
        with netns():
            # "qemu" and "qemu-extract" are booted the same way.
            if boot_type in ("qemu", "qemu-extract"):
                with qemu_boot_image(filename):
                    return run_ssh_test(private_key)
            elif boot_type == "nspawn":
                with nspawn_boot_image(filename, output_id):
                    return run_ssh_test(private_key)
            elif boot_type == "nspawn-extract":
                with nspawn_boot_archive(filename, output_id):
                    return run_ssh_test(private_key)
            else:
                print("unknown test type")
                return False

    return True
|
|
|
|
|
|
def main():
    """Run the selected test cases and return a process exit status.

    The cases default to every ``*.json`` file in the ``cases`` directory
    under TEST_DIR and can be narrowed down with --cases, --arches and
    --distros.

    Returns:
        1 if at least one case failed, 0 otherwise.
    """
    parser = argparse.ArgumentParser(description='Run test cases.')
    parser.add_argument('--cases', type=str, nargs='*', help='Limit tests to given cases.')
    parser.add_argument('--arches', type=str, nargs='*', help='Limit tests to given architectures.')
    parser.add_argument('--distros', type=str, nargs='*', help='Limit tests to given distros.')
    arg = parser.parse_args()

    any_failed = False
    with osbuild_test_store() as store:
        print(f"OSBUILD_TEST_STORE={store}")
        private_key = f"{TEST_DIR}/keyring/id_rsa"

        # Explicitly listed cases win; otherwise pick up everything on disk.
        case_files = arg.cases or glob.glob(f"{TEST_DIR}/cases/*.json")
        for filename in case_files:
            with open(filename) as f:
                case = json.load(f)

            # Skip cases excluded by the distro or architecture filters.
            if arg.distros and case["compose-request"]["distro"] not in arg.distros:
                continue
            if arg.arches and case["compose-request"]["arch"] not in arg.arches:
                continue

            print(f"RUNNING: {filename}")
            passed = run_test(case, private_key, store)
            print("SUCCESS" if passed else "FAIL")
            print()
            if not passed:
                any_failed = True

    return 1 if any_failed else 0
|
|
|
|
|
|
# Script entry point: run the tests and propagate a failing status to the
# caller; a zero status falls through to a normal interpreter exit.
r = main()
if r != 0:
    sys.exit(r)
|