debian-forge-composer/test/run
Tom Gundersen 49387604e4 test/run: pass cloud-init data to qemu
This passes the redhat user with ssh key as an ISO image to our
qemu instances, making sure images relying on cloud-init rather than
hardcoded user credentials can be used in our tests.

Signed-off-by: Tom Gundersen <teg@jklm.no>
2019-12-10 02:47:35 +01:00

294 lines
10 KiB
Python
Executable file

#!/usr/bin/python3
import argparse
import contextlib
import ctypes
import glob
import json
import os
import subprocess
import sys
import tempfile
import time
import shutil
import urllib.request
from typing import Dict, Any, Union, Tuple
# Directory containing this script; test cases, the ssh keyring and the
# cloud-init data are looked up relative to it. It is resolved to an absolute
# path because a relative __file__ without a directory part would otherwise
# give an empty string and turn f"{TEST_DIR}/cases" into "/cases".
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
def errcheck(ret, func, args):
    """ctypes ``errcheck`` hook that turns a ``-1`` return into ``OSError``.

    ret is the raw return value of the foreign function; func and args are
    supplied by ctypes and are not used here.

    Returns ret unchanged on success, so the wrapped function keeps its
    result (whatever an errcheck hook returns becomes the result of the
    foreign call). Raises OSError built from the ctypes copy of errno when
    ret is -1.
    """
    if ret == -1:
        e = ctypes.get_errno()
        raise OSError(e, os.strerror(e))
    return ret
# Namespace-type flag handed to unshare() and setns() below.
CLONE_NEWNET = 0x40000000

# use_errno=True makes ctypes keep a copy of errno that errcheck() reads
# through ctypes.get_errno().
libc = ctypes.CDLL('libc.so.6', use_errno=True)
# Route both namespace calls through errcheck() so that a -1 return raises
# OSError instead of being silently ignored.
libc.setns.errcheck = errcheck
libc.unshare.errcheck = errcheck
@contextlib.contextmanager
def netns():
    """Run the ``with`` body inside a freshly created network namespace.

    The current namespace is remembered through an open handle on
    /proc/self/ns/net, a new one is entered with unshare(), the loopback
    device is brought up, and the original namespace is re-entered with
    setns() when the block exits.

    Raises subprocess.CalledProcessError if bringing up ``lo`` fails; the
    original namespace is restored in that case as well.
    """
    # Grab a reference to the current namespace.
    with open("/proc/self/ns/net") as oldnet:
        # Create a new namespace and enter it.
        libc.unshare(CLONE_NEWNET)
        try:
            # Up the loopback device in the new namespace. This happens
            # inside the try block so that a failing `ip` command does not
            # leave the process stranded in the new namespace.
            subprocess.run(["ip", "link", "set", "up", "dev", "lo"], check=True)
            yield
        finally:
            # Revert to the old namespace, dropping our
            # reference to the new one.
            libc.setns(oldnet.fileno(), CLONE_NEWNET)
@contextlib.contextmanager
def osbuild_test_store():
    """Yield the path of the osbuild store to use for this run.

    When the OSBUILD_TEST_STORE environment variable holds a non-empty
    value, that value is yielded as-is and nothing is cleaned up. Otherwise
    a temporary directory is created under /var/tmp and removed again when
    the block exits.
    """
    configured = os.getenv("OSBUILD_TEST_STORE")
    if not configured:
        with tempfile.TemporaryDirectory(dir="/var/tmp", prefix="osbuild-composer-test-") as scratch:
            yield scratch
        return
    yield configured
@contextlib.contextmanager
def temporary_json_file(obj):
    """Write obj as indented JSON to a temporary file and yield its path.

    The file is closed before its name is yielded, so other processes can
    read it by path, and it is deleted when the block exits.

    Raises whatever json.dump raises for an unserializable obj (for example
    TypeError); the temporary file is removed in that case too.
    """
    # delete=False: the file must survive close() so it can be read by name.
    f = tempfile.NamedTemporaryFile("w", delete=False)
    try:
        # Serialize inside the try block so a failing dump cannot leak the
        # file; the inner `with` closes the handle either way.
        with f:
            json.dump(obj, f, indent=2)
        yield f.name
    finally:
        os.unlink(f.name)
@contextlib.contextmanager
def create_ssh_keys():
    """Yield a temporary directory holding a copy of the test ssh key pair.

    id_rsa and id_rsa.pub are copied from TEST_DIR/keyring. The directory
    is owned by this process because this process creates it; the private
    key is restricted to mode 0600. The directory is removed when the block
    exits.
    """
    with tempfile.TemporaryDirectory() as keydir:
        for key_name in ("id_rsa", "id_rsa.pub"):
            shutil.copyfile(f"{TEST_DIR}/keyring/{key_name}", f"{keydir}/{key_name}")
        # Only the private key gets its mode tightened.
        os.chmod(f"{keydir}/id_rsa", 0o600)
        yield keydir
@contextlib.contextmanager
def qemu_boot_image(image_file):
    """Boot image_file in a background qemu VM for the duration of the block.

    A cloud-init ISO (volume id "cidata") is generated with genisoimage from
    TEST_DIR/cloud-init/user-data and meta-data and attached as a cdrom.
    The user-mode network forwards TCP port 22 (hostfwd=tcp::22-:22).
    Yields None; the VM is killed and reaped when the block exits.

    Raises subprocess.CalledProcessError if genisoimage fails.
    """
    with tempfile.TemporaryDirectory() as workdir:
        # Create an ISO that cloud-init can consume with userdata.
        subprocess.run(["genisoimage",
                        "-quiet",
                        "-input-charset", "utf-8",
                        "-output", f"{workdir}/cloudinit.iso",
                        "-volid", "cidata",
                        "-joliet",
                        "-rock",
                        f"{TEST_DIR}/cloud-init/user-data",
                        f"{TEST_DIR}/cloud-init/meta-data"],
                       check=True)
        # run in background
        cmd = ["qemu-system-x86_64",
               "-m", "2048",
               "-snapshot",
               "-accel", "accel=kvm:hvf:tcg",
               "-cdrom", f"{workdir}/cloudinit.iso",
               "-net", "nic,model=rtl8139", "-net", "user,hostfwd=tcp::22-:22",
               "-nographic",
               image_file
               ]
        print(f"running qemu command: {' '.join(cmd)}")
        # The output is never read, so discard it: an undrained PIPE would
        # block the VM once the pipe buffer fills up.
        vm = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        try:
            yield None
        finally:
            vm.kill()
            # Reap the process so no zombie is left and the ISO is no longer
            # in use when the temporary directory is removed.
            vm.wait()
@contextlib.contextmanager
def nspawn_boot_container(image_file):
    """Boot image_file as a systemd-nspawn container while the block runs.

    The container is started in the background with --boot on the given
    disk image under the machine name "boottest". Yields None; the
    systemd-nspawn process is killed and reaped when the block exits.
    """
    cmd = ["systemd-nspawn", "--boot", "--register=no", "-M", "boottest", "--image", image_file]
    print(f"running nspawn command: {' '.join(cmd)}")
    # The output is never read, so discard it: an undrained PIPE would block
    # the container once the pipe buffer fills up.
    container = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    try:
        yield None
    finally:
        container.kill()
        # Reap the child so it does not linger as a zombie.
        container.wait()
@contextlib.contextmanager
def nspawn_extract_container(image_file):
    """Unpack a tarball and boot the resulting tree with systemd-nspawn.

    image_file is extracted with tar into a temporary directory, which is
    then booted in the background under the machine name "boottest".
    Yields None; the systemd-nspawn process is killed and reaped, and the
    directory removed, when the block exits.

    Raises subprocess.CalledProcessError if the extraction fails.
    """
    with tempfile.TemporaryDirectory() as rootdir:
        # tar runs with cwd=rootdir, so a relative image path has to be made
        # absolute first. check=True stops here on a broken archive instead
        # of booting a partial tree.
        subprocess.run(["tar", "xf", os.path.abspath(image_file)], cwd=rootdir, check=True)
        cmd = ["systemd-nspawn", "--boot", "--register=no", "-M", "boottest", "--directory", "."]
        print(f"running nspawn command: {' '.join(cmd)}")
        # The output is never read, so discard it: an undrained PIPE would
        # block the container once the pipe buffer fills up.
        container = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, cwd=rootdir)
        try:
            yield None
        finally:
            container.kill()
            # Reap the child before the directory it runs from is deleted.
            container.wait()
def run_osbuild(pipeline, store):
    """Build pipeline with osbuild and return ``(tree_id, output_id)``.

    osbuild is run as ``python3 -m osbuild --json`` from the ./osbuild
    directory, with the pipeline passed as JSON on stdin and store as the
    object store. If OSBUILD_TEST_BUILD_ENV is set, its absolute path is
    passed via --build-env.

    Raises subprocess.CalledProcessError when osbuild exits non-zero; its
    captured output is printed first. Re-raising keeps the real failure
    visible instead of an unrelated KeyError from an empty result.
    """
    osbuild_cmd = ["python3", "-m", "osbuild", "--json", "--libdir", ".", "--store", store, "-"]
    build_env = os.getenv("OSBUILD_TEST_BUILD_ENV", None)
    if build_env:
        osbuild_cmd.append("--build-env")
        osbuild_cmd.append(os.path.abspath(build_env))

    try:
        output = subprocess.check_output(osbuild_cmd, cwd="./osbuild", encoding="utf-8", input=json.dumps(pipeline))
    except subprocess.CalledProcessError as err:
        print(err.output)
        raise

    result = json.loads(output)
    return result["tree_id"], result["output_id"]
def run_test(case, store):
    """Build or download the image for case and compare its image-info.

    case is a loaded test-case dict. Cases without a "compose" key are
    skipped. With a "pipeline" key the image is built via run_osbuild() and
    taken from the store; otherwise it is downloaded from case["url"]. A
    ".xz" file is decompressed next to the original. The output of
    tools/image-info is compared with case["expected"], and a unified diff
    is printed on mismatch.

    Returns False only when image-info differs from the expected value,
    True otherwise (including skipped cases and cases missing a key).
    """
    try:
        if "compose" not in case:
            print("skipping this test case")
            return True

        if "pipeline" in case:
            _, output_id = run_osbuild(case["pipeline"], store)
            filename = os.path.join(store, "refs", output_id, case["compose"]["filename"])
        else:
            filename, _ = urllib.request.urlretrieve(case["url"])

        fn, ex = os.path.splitext(filename)
        if ex == ".xz":
            # xz writes raw bytes to the descriptor, so open in binary mode.
            # check=True keeps a failed decompression from handing a
            # truncated image to image-info.
            with open(fn, "wb") as f:
                subprocess.run(["xz", "--decompress", "--stdout", filename], stdout=f, check=True)
            filename = fn

        info = json.loads(subprocess.check_output(["tools/image-info", filename]))
        if info != case["expected"]:
            with temporary_json_file(case["expected"]) as a, temporary_json_file(info) as b:
                subprocess.run(["diff", "--unified", "--color", "--label", "expected", a, "--label", "got", b], check=False)
            return False
    except KeyError as err:
        # NOTE(review): a missing key has always counted as "not failed";
        # that result is kept, but the key is now reported instead of being
        # swallowed silently. Confirm whether this should be a failure.
        print(f"skipping image-info comparison: missing key {err}")
    return True
def get_local_boot_test_case(fname: str) -> Union[Tuple[str, str, Dict[Any, Any]], None]:
    """Load the boot-test description from the test-case file fname.

    Returns None when the JSON document has no "boot-test" entry, otherwise
    the tuple (boot type, image file name, pipeline) taken from the
    "boot-test" and "pipeline" entries.
    """
    with open(fname, "r") as fd:
        case = json.load(fd)

    if "boot-test" not in case:
        return None

    # Look up "pipeline" first so a missing key is reported in the same
    # order as before.
    pipeline = case["pipeline"]
    boot_test = case["boot-test"]
    return boot_test["boot-type"], boot_test["filename"], pipeline
def run_ssh_test(private_key):
    """Poll the booted system over ssh until it reports its final state.

    Connects as redhat@localhost on port 22 with private_key and runs
    ``systemctl is-system-running``, trying up to 40 times with a 20 second
    pause between attempts. ssh timeouts, ssh connection errors (exit
    status 255) and the transitional states "initializing" and "starting"
    are retried.

    Returns 0 when the system reports "running" or "degraded", 1 for any
    other final state or when all attempts are used up.
    """
    cmd = ["ssh",
           "-p", "22",
           "-i", private_key,
           "-o", "StrictHostKeyChecking=no",
           "redhat@localhost",
           "systemctl is-system-running"]
    for _ in range(40):
        try:
            # Run the process with check=False because it returns non-zero return code for "degraded"
            sp = subprocess.run(cmd, timeout=120, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # 255 is how this loop recognises an ssh-level failure, which is
            # retried like a timeout.
            if sp.returncode == 255:
                raise subprocess.CalledProcessError(sp.returncode, cmd)
            output = sp.stdout.decode('utf-8').strip()
            print(output)
            if output == "running":
                print("ssh test success")
                return 0
            if output == "degraded":
                print("ssh test passed, but the system is degraded")
                return 0
            if output not in ("initializing", "starting"):
                print(f"ssh test failed, system status is: {output}")
                return 1
            # sshd can accept logins before boot has finished; keep polling
            # rather than failing on a transitional state.
            print("system is still starting up, retrying")
        except subprocess.TimeoutExpired:
            print("ssh timeout expired")
        except subprocess.CalledProcessError as e:
            print(f"ssh error: {e}")
        time.sleep(20)
    print("ssh test failure")
    return 1
def main():
    """Parse the command line and run the requested tests.

    With --boot-test, each selected case that has a boot-test description
    is built with osbuild, booted inside a fresh network namespace (qemu,
    nspawn or nspawn-extract) and probed over ssh. Without names, every
    cases/*.json file is considered.

    Otherwise (the default action, or --image-info) every given case file,
    or every cases/*.json file when none is given, goes through run_test().

    Returns 1 if any test failed, 0 otherwise.
    """
    parser = argparse.ArgumentParser(description='Run test cases.')
    parser.add_argument('--boot-test', type=str, nargs='*', help='Boot images produced by osbuild')
    parser.add_argument('--image-info', type=str, nargs='*',
                        help='Build images and run image-info on them (default action)')
    arg = parser.parse_args()

    # Run local boot test
    if arg.boot_test is not None:
        # Context manager that boots an image, keyed by boot type.
        boot_managers = {
            "qemu": qemu_boot_image,
            "nspawn": nspawn_boot_container,
            "nspawn-extract": nspawn_extract_container,
        }
        failed = False
        with osbuild_test_store() as store, create_ssh_keys() as keydir:
            if arg.boot_test:
                test_cases = [f"{TEST_DIR}/cases/{x}_local_boot.json" for x in arg.boot_test]
            else:
                test_cases = glob.glob(f"{TEST_DIR}/cases/*.json")
            for test_case in test_cases:
                test_case_tuple = get_local_boot_test_case(test_case)
                if not test_case_tuple:
                    continue
                test_type, image_fname, pl_dict = test_case_tuple
                print("starting osbuild")
                _, output_id = run_osbuild(pl_dict, store)
                print("osbuild success")
                boot = boot_managers.get(test_type)
                if boot is None:
                    # A case that cannot be booted must not count as passed.
                    print("unknown test type")
                    failed = True
                    continue
                with netns(), boot(f"{store}/refs/{output_id}/{image_fname}"):
                    if run_ssh_test(f"{keydir}/id_rsa") == 1:
                        failed = True
        return 1 if failed else 0

    failed = False
    with osbuild_test_store() as store:
        # arg.image_info is None when the option is absent and [] when it is
        # given without values; both fall back to every case file.
        for filename in arg.image_info or glob.glob(f"{TEST_DIR}/cases/*.json"):
            name = os.path.basename(filename)[:-5]
            with open(filename) as f:
                case = json.load(f)
            print(f"{name}")
            if not run_test(case, store):
                print("FAIL")
                print()
                failed = True
    return 1 if failed else 0
# Run the tests; a non-zero result becomes the process exit status.
r = main()
if r:
    raise SystemExit(r)