test/run: merge boot and non-boot test handling
We can now select specific cases, but whether or not to check image-info or boot the image is determined purely by the contents of the json test case. We still run the tests as two travis workers just to avoid the timeout, this should clearly be reworked. Signed-off-by: Tom Gundersen <teg@jklm.no>
This commit is contained in:
parent
c090ab3812
commit
3adcac0123
2 changed files with 68 additions and 120 deletions
165
test/run
165
test/run
|
|
@ -70,21 +70,6 @@ def temporary_json_file(obj):
|
|||
os.unlink(f.name)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def create_ssh_keys():
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
# Copy the keys and set correct permissions/ownership on the directory and keys
|
||||
# Proper directory ownership is implied by the fact that this process creates the directory
|
||||
# The mode is adjusted by `chmod`
|
||||
shutil.copyfile(f"{TEST_DIR}/keyring/id_rsa", f"{dir}/id_rsa")
|
||||
shutil.copyfile(f"{TEST_DIR}/keyring/id_rsa.pub", f"{dir}/id_rsa.pub")
|
||||
os.chmod(f"{dir}/id_rsa", 0o600)
|
||||
try:
|
||||
yield dir
|
||||
finally:
|
||||
pass
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def qemu_boot_image(image_file):
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
|
|
@ -118,19 +103,7 @@ def qemu_boot_image(image_file):
|
|||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def qemu_extract_boot_image(image_file):
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
basename = os.path.basename(image_file)
|
||||
uncompressed, _ = os.path.splitext(basename)
|
||||
print(f"uncompressing {image_file} to {dir}/{uncompressed}")
|
||||
subprocess.run(["cp", "-a", image_file, f"{dir}/"], check=True)
|
||||
subprocess.run(["unxz", f"{dir}/{basename}"])
|
||||
with qemu_boot_image(f"{dir}/{uncompressed}"):
|
||||
yield
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def nspawn_boot_container(image_file, name):
|
||||
def nspawn_boot_image(image_file, name):
|
||||
cmd = ["systemd-nspawn", "--boot", "--register=no", "-M", name, "--image", image_file]
|
||||
print(f"running nspawn command: {' '.join(cmd)}")
|
||||
container = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
|
|
@ -141,8 +114,8 @@ def nspawn_boot_container(image_file, name):
|
|||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def nspawn_extract_container(image_file, name):
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
def nspawn_boot_archive(image_file, name):
|
||||
with tempfile.TemporaryDirectory(dir="/var/tmp") as dir:
|
||||
subprocess.run(["tar", "xf", image_file], cwd=dir)
|
||||
cmd = ["systemd-nspawn", "--boot", "--register=no", "-M", name, "--directory", "."]
|
||||
print(f"running nspawn command: {' '.join(cmd)}")
|
||||
|
|
@ -170,42 +143,6 @@ def run_osbuild(pipeline, store):
|
|||
return result["tree_id"], result["output_id"]
|
||||
|
||||
|
||||
def run_test(case, store):
|
||||
try:
|
||||
if "pipeline" not in case:
|
||||
print("skipping this test case, no pipeline given")
|
||||
return True
|
||||
|
||||
_, output_id = run_osbuild(case["pipeline"], store)
|
||||
filename = os.path.join(store, "refs", output_id, case["compose"]["filename"])
|
||||
|
||||
fn, ex = os.path.splitext(filename)
|
||||
if ex == ".xz":
|
||||
with open(fn, "w") as f:
|
||||
subprocess.run(["xz", "--decompress", "--stdout", filename], stdout=f)
|
||||
filename = fn
|
||||
info = json.loads(subprocess.check_output(["tools/image-info", filename]))
|
||||
if info != case["image-info"]:
|
||||
with temporary_json_file(case["image-info"]) as a, temporary_json_file(info) as b:
|
||||
subprocess.run(["diff", "--unified", "--color", "--label", "expected", a, "--label", "got", b], check=False)
|
||||
return False
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def get_local_boot_test_case(fname: str) -> Union[Tuple[str, str, Dict[Any, Any]], None]:
|
||||
with open(fname, "r") as fd:
|
||||
test_case_dict = json.load(fd)
|
||||
|
||||
if "boot" not in test_case_dict:
|
||||
return None
|
||||
|
||||
pipeline_dict = test_case_dict["pipeline"]
|
||||
return test_case_dict["boot"]["type"], test_case_dict["compose"]["filename"], pipeline_dict
|
||||
|
||||
|
||||
def run_ssh_test(private_key):
|
||||
cmd = ["ssh",
|
||||
"-p", "22",
|
||||
|
|
@ -239,61 +176,69 @@ def run_ssh_test(private_key):
|
|||
return 1
|
||||
|
||||
|
||||
def run_test(case, private_key, store):
|
||||
if "pipeline" not in case:
|
||||
print("skipping this test case, no pipeline given")
|
||||
return True
|
||||
|
||||
_, output_id = run_osbuild(case["pipeline"], store)
|
||||
filename = os.path.join(store, "refs", output_id, case["compose"]["filename"])
|
||||
|
||||
fn, ex = os.path.splitext(filename)
|
||||
if ex == ".xz":
|
||||
_, ex = os.path.splitext(fn)
|
||||
if ex != ".tar":
|
||||
with open(fn, "w") as f:
|
||||
subprocess.run(["xz", "--decompress", "--stdout", filename], stdout=f)
|
||||
filename = fn
|
||||
|
||||
if "image-info" in case:
|
||||
info = json.loads(subprocess.check_output(["tools/image-info", filename]))
|
||||
if info != case["image-info"]:
|
||||
with temporary_json_file(case["image-info"]) as a, temporary_json_file(info) as b:
|
||||
subprocess.run(["diff", "--unified", "--color", "--label", "expected", a, "--label", "got", b], check=False)
|
||||
return False
|
||||
|
||||
if "boot" in case:
|
||||
with netns():
|
||||
if case["boot"]["type"] == "qemu":
|
||||
with qemu_boot_image(filename):
|
||||
if run_ssh_test(private_key) == 1:
|
||||
failed = True
|
||||
elif case["boot"]["type"] == "qemu-extract":
|
||||
with qemu_boot_image(filename):
|
||||
if run_ssh_test(private_key) == 1:
|
||||
failed = True
|
||||
elif case["boot"]["type"] == "nspawn":
|
||||
with nspawn_boot_image(filename, output_id):
|
||||
if run_ssh_test(private_key) == 1:
|
||||
failed = True
|
||||
elif case["boot"]["type"] == "nspawn-extract":
|
||||
with nspawn_boot_archive(filename, output_id):
|
||||
if run_ssh_test(private_key) == 1:
|
||||
failed = True
|
||||
else:
|
||||
print("unknown test type")
|
||||
failed = True
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Run test cases.')
|
||||
parser.add_argument('--boot-test', type=str, nargs='*', help='Boot images produced by osbuild')
|
||||
parser.add_argument('--image-info', type=str, nargs='*',
|
||||
help='Build images and run image-info on them (default action)')
|
||||
parser.add_argument('--cases', type=str, nargs='*', help='Limit tests to given cases.')
|
||||
arg = parser.parse_args()
|
||||
|
||||
# Run local boot test
|
||||
if arg.boot_test is not None:
|
||||
failed = False
|
||||
with osbuild_test_store() as store:
|
||||
with create_ssh_keys() as keydir:
|
||||
if arg.boot_test != []:
|
||||
test_cases = list(map(lambda x: f"{TEST_DIR}/cases/{x}_local_boot.json", arg.boot_test))
|
||||
else:
|
||||
test_cases = glob.glob(f"{TEST_DIR}/cases/*.json")
|
||||
|
||||
for test_case in test_cases:
|
||||
test_case_tuple = get_local_boot_test_case(test_case)
|
||||
if test_case_tuple:
|
||||
test_type, image_fname, pl_dict = test_case_tuple
|
||||
print("starting osbuild")
|
||||
_, output_id = run_osbuild(pl_dict, store)
|
||||
print("osbuild success")
|
||||
with netns():
|
||||
if test_type == "qemu":
|
||||
with qemu_boot_image(f"{store}/refs/{output_id}/{image_fname}"):
|
||||
if run_ssh_test(f"{keydir}/id_rsa") == 1:
|
||||
failed = True
|
||||
elif test_type == "qemu-extract":
|
||||
with qemu_extract_boot_image(f"{store}/refs/{output_id}/{image_fname}"):
|
||||
if run_ssh_test(f"{keydir}/id_rsa") == 1:
|
||||
failed = True
|
||||
elif test_type == "nspawn":
|
||||
with nspawn_boot_container(f"{store}/refs/{output_id}/{image_fname}", output_id):
|
||||
if run_ssh_test(f"{keydir}/id_rsa") == 1:
|
||||
failed = True
|
||||
elif test_type == "nspawn-extract":
|
||||
with nspawn_extract_container(f"{store}/refs/{output_id}/{image_fname}", output_id):
|
||||
if run_ssh_test(f"{keydir}/id_rsa") == 1:
|
||||
failed = True
|
||||
else:
|
||||
print("unknown test type")
|
||||
|
||||
return 1 if failed else 0
|
||||
|
||||
failed = False
|
||||
with osbuild_test_store() as store:
|
||||
for filename in arg.image_info if arg.image_info != [] else glob.glob(f"{TEST_DIR}/cases/*.json"):
|
||||
private_key = f"{TEST_DIR}/keyring/id_rsa"
|
||||
for filename in arg.cases if arg.cases else glob.glob(f"{TEST_DIR}/cases/*.json"):
|
||||
name = os.path.basename(filename)[:-5]
|
||||
with open(filename) as f:
|
||||
case = json.load(f)
|
||||
|
||||
print(f"{name}")
|
||||
if not run_test(case, store):
|
||||
if not run_test(case, private_key, store):
|
||||
print(f"FAIL")
|
||||
print()
|
||||
failed = True
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue