tests: remove test/run script
The script is not needed anymore, because Travis now uses the Go implementation of image tests. Hurray!
This commit is contained in:
parent
d83908f0cf
commit
5311b3a45c
1 changed files with 0 additions and 268 deletions
268
test/run
268
test/run
|
|
@ -1,268 +0,0 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import argparse
|
||||
import contextlib
|
||||
import ctypes
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import shutil
|
||||
import urllib.request
|
||||
|
||||
|
||||
from typing import Dict, Any, Union, Tuple
|
||||
|
||||
|
||||
|
||||
|
||||
TEST_DIR = os.path.dirname(__file__)
|
||||
|
||||
|
||||
def errcheck(ret, func, args):
|
||||
if ret == -1:
|
||||
e = ctypes.get_errno()
|
||||
raise OSError(e, os.strerror(e))
|
||||
|
||||
|
||||
CLONE_NEWNET = 0x40000000
|
||||
libc = ctypes.CDLL('libc.so.6', use_errno=True)
|
||||
libc.setns.errcheck = errcheck
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def netns():
|
||||
# Grab a reference to the current namespace.
|
||||
with open("/proc/self/ns/net") as oldnet:
|
||||
# Create a new namespace and enter it.
|
||||
libc.unshare(CLONE_NEWNET)
|
||||
# Up the loopback device in the new namespace.
|
||||
subprocess.run(["ip", "link", "set", "up", "dev", "lo"], check=True)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
# Revert to the old namespace, dropping our
|
||||
# reference to the new one.
|
||||
libc.setns(oldnet.fileno(), CLONE_NEWNET)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def osbuild_test_store():
|
||||
store = os.getenv("OSBUILD_TEST_STORE")
|
||||
if store:
|
||||
yield os.path.abspath(store)
|
||||
else:
|
||||
with tempfile.TemporaryDirectory(dir="/var/tmp", prefix="osbuild-composer-test-") as store:
|
||||
yield store
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def temporary_json_file(obj):
|
||||
f = tempfile.NamedTemporaryFile("w", delete=False)
|
||||
json.dump(obj, f, indent=2)
|
||||
f.close()
|
||||
try:
|
||||
yield f.name
|
||||
finally:
|
||||
os.unlink(f.name)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def qemu_boot_image(image_file):
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
# Create an ISO that cloud-init can consume with userdata.
|
||||
subprocess.run(["genisoimage",
|
||||
"-quiet",
|
||||
"-input-charset", "utf-8",
|
||||
"-output", f"{dir}/cloudinit.iso",
|
||||
"-volid", "cidata",
|
||||
"-joliet",
|
||||
"-rock",
|
||||
f"{TEST_DIR}/cloud-init/user-data",
|
||||
f"{TEST_DIR}/cloud-init/meta-data"],
|
||||
check=True)
|
||||
# run in background
|
||||
cmd = ["qemu-system-x86_64",
|
||||
"-m", "2048",
|
||||
"-snapshot",
|
||||
"-accel", "accel=kvm:hvf:tcg",
|
||||
"-cdrom", f"{dir}/cloudinit.iso",
|
||||
"-net", "nic,model=rtl8139", "-net", "user,hostfwd=tcp::22-:22",
|
||||
"-nographic",
|
||||
image_file
|
||||
]
|
||||
print(f"running qemu command: {' '.join(cmd)}")
|
||||
vm = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
try:
|
||||
yield None
|
||||
finally:
|
||||
vm.kill()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def nspawn_boot_image(image_file, name):
|
||||
cmd = ["systemd-nspawn", "--boot", "--register=no", "-M", name, "--image", image_file]
|
||||
print(f"running nspawn command: {' '.join(cmd)}")
|
||||
container = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
try:
|
||||
yield None
|
||||
finally:
|
||||
container.kill()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def nspawn_boot_archive(image_file, name):
|
||||
with tempfile.TemporaryDirectory(dir="/var/tmp") as dir:
|
||||
cmd = ["tar", "xf", image_file]
|
||||
print(f"extracting image to {dir}: {' '.join(cmd)}")
|
||||
subprocess.run(cmd, cwd=dir)
|
||||
cmd = ["systemd-nspawn", "--boot", "--register=no", "-M", name, "--directory", dir]
|
||||
print(f"running nspawn command: {' '.join(cmd)}")
|
||||
container = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
try:
|
||||
yield None
|
||||
finally:
|
||||
container.kill()
|
||||
|
||||
|
||||
def run_osbuild(manifest, store):
|
||||
osbuild_cmd = ["python3", "-m", "osbuild", "--json", "--libdir", ".", "--store", store, "-"]
|
||||
|
||||
build_env = os.getenv("OSBUILD_TEST_BUILD_ENV", None)
|
||||
if build_env:
|
||||
osbuild_cmd.append("--build-env")
|
||||
osbuild_cmd.append(os.path.abspath(build_env))
|
||||
|
||||
result = dict()
|
||||
result = json.loads(subprocess.check_output(osbuild_cmd, cwd="./osbuild", encoding="utf-8", input=json.dumps(manifest)))
|
||||
|
||||
return result.get("output_id")
|
||||
|
||||
|
||||
def run_ssh_test(private_key):
|
||||
cmd = ["ssh",
|
||||
"-p", "22",
|
||||
"-i", private_key,
|
||||
"-o", "StrictHostKeyChecking=no",
|
||||
"redhat@localhost",
|
||||
"systemctl --wait is-system-running"]
|
||||
for _ in range(20):
|
||||
try:
|
||||
print("attempting ssh connection")
|
||||
# Run the process with check=False because it returns non-zero return code for "degraded"
|
||||
sp = subprocess.run(cmd, timeout=120, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
if sp.returncode == 255:
|
||||
raise subprocess.CalledProcessError(sp.returncode, cmd)
|
||||
output = sp.stdout.decode('utf-8').strip()
|
||||
if output == "running":
|
||||
print("ssh test passed")
|
||||
return True
|
||||
elif output == "degraded":
|
||||
print("ssh test passed, but the system is degraded")
|
||||
return True
|
||||
elif output == "starting":
|
||||
time.sleep(10)
|
||||
else:
|
||||
print(f"ssh test failed, system status is: {output}")
|
||||
return False
|
||||
except subprocess.TimeoutExpired:
|
||||
print("ssh timeout expired")
|
||||
except subprocess.CalledProcessError as e:
|
||||
time.sleep(10)
|
||||
|
||||
print("ssh test failure")
|
||||
return False
|
||||
|
||||
|
||||
def run_test(case, private_key, store):
|
||||
try:
|
||||
output_id = run_osbuild(case["manifest"], store)
|
||||
except subprocess.CalledProcessError as err:
|
||||
print(err.output)
|
||||
return False
|
||||
|
||||
if output_id == None:
|
||||
print(f"osbuild did not produce an image")
|
||||
return False
|
||||
|
||||
print(f"osbuild successfully built pipeline {output_id}")
|
||||
|
||||
filename = os.path.join(store, "refs", output_id, case["compose-request"]["filename"])
|
||||
|
||||
fn, ex = os.path.splitext(filename)
|
||||
if ex == ".xz":
|
||||
_, ex = os.path.splitext(fn)
|
||||
if ex != ".tar":
|
||||
with open(fn, "w") as f:
|
||||
subprocess.run(["xz", "--decompress", "--stdout", filename], stdout=f)
|
||||
filename = fn
|
||||
|
||||
if "image-info" in case:
|
||||
info = json.loads(subprocess.check_output(["tools/image-info", filename]))
|
||||
if info != case["image-info"]:
|
||||
with temporary_json_file(case["image-info"]) as a, temporary_json_file(info) as b:
|
||||
subprocess.run(["diff", "--unified", "--color", "--label", "expected", a, "--label", "got", b], check=False)
|
||||
return False
|
||||
|
||||
if "boot" in case:
|
||||
with netns():
|
||||
if case["boot"]["type"] == "qemu":
|
||||
with qemu_boot_image(filename):
|
||||
return run_ssh_test(private_key)
|
||||
elif case["boot"]["type"] == "qemu-extract":
|
||||
with qemu_boot_image(filename):
|
||||
return run_ssh_test(private_key)
|
||||
elif case["boot"]["type"] == "nspawn":
|
||||
with nspawn_boot_image(filename, output_id):
|
||||
return run_ssh_test(private_key)
|
||||
elif case["boot"]["type"] == "nspawn-extract":
|
||||
with nspawn_boot_archive(filename, output_id):
|
||||
return run_ssh_test(private_key)
|
||||
else:
|
||||
print("unknown test type")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Run test cases.')
|
||||
parser.add_argument('--cases', type=str, nargs='*', help='Limit tests to given cases.')
|
||||
parser.add_argument('--arches', type=str, nargs='*', help='Limit tests to given architectures.')
|
||||
parser.add_argument('--distros', type=str, nargs='*', help='Limit tests to given distros.')
|
||||
arg = parser.parse_args()
|
||||
|
||||
failed = False
|
||||
with osbuild_test_store() as store:
|
||||
print(f"OSBUILD_TEST_STORE={store}")
|
||||
private_key = f"{TEST_DIR}/keyring/id_rsa"
|
||||
for filename in arg.cases if arg.cases else glob.glob(f"{TEST_DIR}/cases/*.json"):
|
||||
with open(filename) as f:
|
||||
case = json.load(f)
|
||||
|
||||
if arg.distros:
|
||||
if case["compose-request"]["distro"] not in arg.distros:
|
||||
continue
|
||||
|
||||
if arg.arches:
|
||||
if case["compose-request"]["arch"] not in arg.arches:
|
||||
continue
|
||||
|
||||
print(f"RUNNING: {filename}")
|
||||
if run_test(case, private_key, store):
|
||||
print(f"SUCCESS")
|
||||
print()
|
||||
else:
|
||||
print(f"FAIL")
|
||||
print()
|
||||
failed = True
|
||||
|
||||
return 1 if failed else 0
|
||||
|
||||
|
||||
r = main()
|
||||
if r:
|
||||
sys.exit(r)
|
||||
Loading…
Add table
Add a link
Reference in a new issue