#!/usr/bin/python3 import argparse import subprocess import json import os import sys import tempfile def is_subprocess_succeeding(*args, **kwargs): sp = subprocess.run(*args, **kwargs, stdout=subprocess.PIPE) return sp.returncode == 0 def get_subprocess_stdout(*args, **kwargs): sp = subprocess.run(*args, **kwargs, stdout=subprocess.PIPE) if sp.returncode != 0: sys.stderr.write(sp.stdout) sys.exit(1) return sp.stdout def run_osbuild(manifest, store, output, export): with tempfile.TemporaryFile(dir="/tmp", prefix="osbuild-test-case-generator-", suffix=".log") as log: try: subprocess.run(["osbuild", "--store", store, "--output-directory", output, "--checkpoint", "build", "--export", export, "-"], stdout=log, stderr=subprocess.STDOUT, check=True, encoding="utf-8", input=json.dumps(manifest)) except: log.seek(0) print(log.read()) raise class TestCaseGenerator: ''' This class generates a json test case. It accepts a test_case_request as input to the constructor: { "boot": { "type": "qemu" }, "compose-request": { "distro": "fedora-30", "arch": "x86_64", "image-type": "qcow2", "filename": "disk.qcow2", "blueprint": {} } } It then outputs a json test case from the get_test_case() method. ''' def __init__(self, test_case_request): self.test_case = test_case_request def get_test_case(self, no_image_info, store): compose_request = json.dumps(self.test_case["compose-request"]) pipeline_command = ["go", "run", "./cmd/osbuild-pipeline", "-"] self.test_case["manifest"] = json.loads(get_subprocess_stdout(pipeline_command, input=compose_request, encoding="utf-8")) pipeline_command = ["go", "run", "./cmd/osbuild-pipeline", "-rpmmd", "-"] self.test_case["rpmmd"] = json.loads(get_subprocess_stdout(pipeline_command, input=compose_request, encoding="utf-8")) if no_image_info == False: with tempfile.TemporaryDirectory(dir=store, prefix="test-case-output-") as output: manifest = self.test_case["manifest"] version = manifest.get("version", "1") if version == "1": export = "assembler" elif version == "2": export = manifest["pipelines"][-1]["name"] else: print(f"Unknown manifest format version {version}") sys.exit(1) run_osbuild(manifest, store, output, export) image_file = os.path.join(output, export, self.test_case["compose-request"]["filename"]) image_info = get_subprocess_stdout(["tools/image-info", image_file], encoding="utf-8") self.test_case["image-info"] = json.loads(image_info) return self.test_case def generate_test_case(test_type, distro, arch, output_format, test_case_request, keep_image_info, store, output): print(f"generating test case for {output_format}", flush=True) generator = TestCaseGenerator(test_case_request) test_case = generator.get_test_case(keep_image_info, store) name = distro.replace("-", "_") + "-" + arch + "-" + output_format.replace("-", "_") + "-" + test_type + ".json" file_name = output + "/" + name if keep_image_info: try: with open(file_name, 'r') as case_file: old_test_case = json.load(case_file) image_info = old_test_case.get("image-info") if image_info: test_case["image-info"] = image_info except: pass with open(file_name, 'w') as case_file: json.dump(test_case, case_file, indent=2) case_file.write("\n") def filter_repos_by_image_type_tags(image_type: str, repos: list) -> list: filtered_repos = [] for repo in repos: image_type_tags = repo.get("image_type_tags") # if image type tags are defined for the repository, add it to the list only # if the provided image type is in the list of image type tags. if image_type_tags is not None: if image_type in image_type_tags: filtered_repos.append(repo) continue filtered_repos.append(repo) return filtered_repos def main(distro, arch, image_types, keep_image_info, store, output): with open("tools/test-case-generators/format-request-map.json") as format_request_json: format_request_dict = json.load(format_request_json) with open("tools/test-case-generators/repos.json") as repos_json: repos_dict = json.load(repos_json) filtered_test_case_request_items = [ "overrides", "supported_arches" ] for output_format, test_case_request in format_request_dict.items(): filtered_request = dict(filter(lambda i: i[0] not in filtered_test_case_request_items, test_case_request.items())) if filtered_request["compose-request"]["image-type"] not in image_types: continue filtered_request["compose-request"]["distro"] = distro # if the compose-request has specified supported arches, then generate # the test case only if the requested arch is in the list supported_arches = test_case_request.get("supported_arches") if supported_arches is not None and arch not in supported_arches: continue filtered_request["compose-request"]["arch"] = arch request_repos = filter_repos_by_image_type_tags(filtered_request["compose-request"]["image-type"], repos_dict[distro][arch]) filtered_request["compose-request"]["repositories"] = request_repos if distro in test_case_request["overrides"]: filtered_request["compose-request"].update(test_case_request["overrides"][distro]) # Some image types can not be analyzed by 'image-info'. To be able to # regenerate all test cases for a distro, this must be request definition # and not a cli option. Default to the 'keep_image_info' value if not # set explicitly for the image type. no_image_info = filtered_request.get("no-image-info", keep_image_info) generate_test_case("boot", distro, arch, output_format, filtered_request, no_image_info, store, output) return if __name__ == '__main__': parser = argparse.ArgumentParser(description="Generate test cases") parser.add_argument("--distro", help="distribution for test cases", required=True) parser.add_argument("--arch", help="architecture for test cases", required=True) parser.add_argument("--image-types", help="image types for test cases", required=True, nargs='+') parser.add_argument("--keep-image-info", action='store_true', help="skip image info (re)generation, but keep the one found in the existing test case") parser.add_argument("--store", metavar="STORE_DIRECTORY", type=os.path.abspath, help="path to the osbuild store", required=True) parser.add_argument("--output", metavar="OUTPUT_DIRECTORY", type=os.path.abspath, help="path to the output directory", required=True) args = parser.parse_args() main(args.distro, args.arch, args.image_types, args.keep_image_info, args.store, args.output) sys.exit()