diff --git a/osbuild-composer.spec b/osbuild-composer.spec index 544f7614c..c8dcedf4e 100644 --- a/osbuild-composer.spec +++ b/osbuild-composer.spec @@ -199,7 +199,6 @@ install -m 0755 -vp tools/generic_s3_test.sh %{buildroot}% install -m 0755 -vp tools/generic_s3_https_test.sh %{buildroot}%{_libexecdir}/osbuild-composer-test/ install -m 0755 -vp tools/run-mock-auth-servers.sh %{buildroot}%{_libexecdir}/osbuild-composer-test/ install -m 0755 -vp tools/set-env-variables.sh %{buildroot}%{_libexecdir}/osbuild-composer-test/ -install -m 0755 -vp tools/test-case-generators/generate-test-cases %{buildroot}%{_libexecdir}/osbuild-composer-test/ install -m 0755 -vd %{buildroot}%{_libexecdir}/tests/osbuild-composer install -m 0755 -vp test/cases/*.sh %{buildroot}%{_libexecdir}/tests/osbuild-composer/ diff --git a/tools/test-case-generators/generate-all-test-cases b/tools/test-case-generators/generate-all-test-cases deleted file mode 100755 index 27f92e743..000000000 --- a/tools/test-case-generators/generate-all-test-cases +++ /dev/null @@ -1,1517 +0,0 @@ -#!/usr/bin/python3 - -# pylint: disable=line-too-long -""" - generate-all-test-cases - - Script to generate all image test cases based on distro x arch x image-type - matrix read from `distro-arch-imagetype-map.json` or passed file. One can - filter the matrix just to a subset using `--distro`, `--arch` or - `--image-types` arguments. - - The script is intended to be run from the osbuild-composer sources directory - root, for which the image test cases should be (re)generated. Alternatively, - one can specify the path to the sources using the `--sources` option. - - The script generates image test cases in so-called Runner, which is a system - of a specific architecture. The used Runner type is specific to the used - command, but in general it is a system accessible via SSH connection. - - In simplified example, the script does the following: - 1. Provisions Runners if needed. - 2. Waits for the Runner to be ready for use by running a specific command - on it. - 3. Installs RPMs necessary for the test case generation on the Runner. - - In case you need to install packages from a specific external repository, - you can specify each such repository using --repofrompath option. - e.g. --repofrompath 'osbuild,https://download.copr.fedorainfracloud.org/results/@osbuild/osbuild/fedora-$releasever-$basearch/' - - In case you need to install osbuild-composer RPMs, which were built - from the sources copied over to the runner, use the `--build-rpms` - option. The script will build osbuild-composer RPMs on the remote - runner and install them. - 4. Copies the 'sources' using rsync to the Runner. - 5. Executes the 'tools/test-case-generators/generate-test-cases' on the - runner for each requested distro and image type. - 6. After each image test case is generated successfully, the result is - copied using rsync from the Runner to 'output' directory. - - The script supports the following commands: - - 'qemu' - generates image test cases locally using QEMU VMs. - - 'remote' - generates image test cases on existing remote hosts. - - 'qemu' command - ============== - Example (builds rhel-8 qcow2 images on aarch64 s390x ppc64le): - tools/test-case-generators/generate-all-test-cases \ - --output test/data/manifests \ - --arch aarch64 \ - --arch s390x \ - --arch ppc64le \ - --distro rhel-8 \ - --image-type qcow2 \ - qemu \ - --image-x86_64 ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.x86_64.qcow2 \ - --image-ppc64le ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.ppc64le.qcow2 \ - --image-aarch64 ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.aarch64.qcow2 \ - --image-s390x ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.s390x.qcow2 - - When using this command, the script spins up an ephemeral QEMU VM per each - required architecture. - - One can use e.q. Fedora cloud qcow2 images: - x86_64: https://download.fedoraproject.org/pub/fedora/linux/releases/33/Cloud/x86_64/images/Fedora-Cloud-Base-33-1.2.x86_64.qcow2 - aarch64: https://download.fedoraproject.org/pub/fedora/linux/releases/33/Cloud/aarch64/images/Fedora-Cloud-Base-33-1.2.aarch64.qcow2 - ppc64le: https://download.fedoraproject.org/pub/fedora-secondary/releases/33/Cloud/ppc64le/images/Fedora-Cloud-Base-33-1.2.ppc64le.qcow2 - s390x: https://download.fedoraproject.org/pub/fedora-secondary/releases/33/Cloud/s390x/images/Fedora-Cloud-Base-33-1.2.s390x.qcow2 - - aarch64 special note: - make sure to have the *edk2-aarch64* package installed, which provides UEFI - builds for QEMU and AARCH64 (/usr/share/edk2/aarch64/QEMU_EFI.fd) - https://fedoraproject.org/wiki/Architectures/AArch64/Install_with_QEMU - - Images need to have enough disk space to be able to build images using - osbuild. You can resize them using 'qemu-img resize 20G' command. - - HW requirements: - - The x86_64 VM uses 1 CPU and 1GB of RAM - - The aarch64, s390x and ppc64le VMs each uses 2CPU and 2GB of RAM - - Unless filtered using `--arch` option, the script starts 4 VMs in parallel - - Tested with: - - Fedora 32 (x86_64) and QEMU version 4.2.1 - - 'remote' command - ================ - Example (builds rhel-8 qcow2 images on aarch64 s390x ppc64le): - tools/test-case-generators/generate-all-test-cases \ - --output test/data/manifests \ - --arch aarch64 \ - --arch s390x \ - --arch ppc64le \ - --distro rhel-8 \ - --image-type qcow2 \ - remote \ - --host-ppc64le 192.168.1.10 \ - --host-aarch64 192.168.1.20 \ - --host-s390x 192.168.1.30 - - When using this command, the script uses existing remote hosts accessible - via SSH for each architecture. -""" - - -import argparse -import subprocess -import json -import os -import tempfile -import shutil -import time -import socket -import contextlib -import multiprocessing -import logging -import glob - -import yaml - - -# setup logging -log = logging.getLogger("generate-all-test-cases") -log.setLevel(logging.INFO) -formatter = logging.Formatter("%(asctime)s [%(levelname)s] - %(processName)s: %(message)s") -sh = logging.StreamHandler() -sh.setFormatter(formatter) -log.addHandler(sh) - - -# list holding all supported generator classes -SUPPORTED_GENERATORS = [] - - -# decorator to register new generator classes -def register_generator_cls(cls): - SUPPORTED_GENERATORS.append(cls) - return cls - - -class BaseRunner(contextlib.AbstractContextManager): - """ - Base class representing a generic runner, which is used for generating image - test case definitions. - - 'repos' is a list of strings such as ",", specifying additional - DNF repositories to use when installing packages. - """ - - def __init__(self, hostname, username="root", repos=[], port=22): - self.hostname = hostname - self.port = port - self.username = username - self.repos = repos - self.runner_ready = False - - def run_command(self, command): - """ - Runs a given command on the Runner over ssh in a blocking fashion. - - Calling this method before is_ready() returned True has undefined - behavior. - - Returns stdin, stdout, stderr from the run command. - """ - ssh_command = [ - "ssh", - "-oStrictHostKeyChecking=no", # don't verify the remote host's key - "-oUserKnownHostsFile=/dev/null", # don't add the remote host's key as trusted - "-oLogLevel=ERROR", # don't log warning that the host's key has been added as trusted - "-p", f"{self.port}", - f"{self.username}@{self.hostname}", - command - ] - - try: - # don't log commands when the vm is not yet ready for use - if self.runner_ready: - log.debug("Running on runner: '%s'", command) - completed_process = subprocess.run(ssh_command, capture_output=True, text=True) - except Exception as e: - # don't log errors when vm is not ready yet, because there are many errors - if self.runner_ready: - log.error("Running command over SSH failed: %s", str(e)) - raise e - - stdout = completed_process.stdout if completed_process.stdout else "" - stderr = completed_process.stderr if completed_process.stderr else "" - - return stdout, stderr, completed_process.returncode - - def run_command_check_call(self, command): - """ - Runs a command on the runner over SSH in a similar fashion as subprocess.check_call() - """ - stdout, stderr, ret = self.run_command(command) - if ret != 0: - raise subprocess.CalledProcessError(ret, command, stdout, stderr) - - def run_command_check_output(self, command): - """ - Runs a command on the runner over SSH in a similar fashion as subprocess.check_output() - """ - stdout, stderr, ret = self.run_command(command) - if ret != 0: - raise subprocess.CalledProcessError(ret, command, stdout, stderr) - return stdout - - @contextlib.contextmanager - def get_managed_workdir(self, basedir="~", cleanup=True): - """ - Context manager which creates a random workdir under the specified - 'basedir' on the runner. The 'basedir' defaults to user's home ('~'). - The created workdir is by default deleted on context manager exit, - unless 'cleanup' is set to False. - """ - workdir = self.run_command_check_output(f"TMPDIR={basedir} mktemp -d").strip() - try: - yield workdir - finally: - if cleanup: - self.run_command_check_output(f"sudo rm -rf {workdir}") - - def copytree_to_runner(self, host_path, runner_path): - """ - Copies the content of 'host_path' directory from the host to the - 'runner_path' directory on the runner using rsync. - """ - if not host_path[-1] == "/": - host_path += "/" - rsync_command = [ - "rsync", - "-az", - "-e", f"ssh -p {self.port} -oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null -oLogLevel=ERROR", - host_path, - f"{self.username}@{self.hostname}:{runner_path}" - ] - subprocess.check_call(rsync_command) - - def copytree_from_runner(self, runner_path, host_path): - """ - Copies the content of 'runner_path' directory from the runner to the - 'host_path' directory on the host using rsync. - """ - if not runner_path[-1] == "/": - runner_path += "/" - rsync_command = [ - "rsync", - "-az", - "-e", f"ssh -p {self.port} -oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null -oLogLevel=ERROR", - f"{self.username}@{self.hostname}:{runner_path}", - host_path - ] - subprocess.check_call(rsync_command) - - def wait_until_ready(self, timeout=None, retry_sec=15): - """ - Waits for the runner until it is ready for use. This is determined by - executing the 'is_ready()' method in a blocking fashion. - - This method is blocking, unless 'timeout' is provided. - """ - now = time.time() - while not self.is_ready(): - if timeout is not None and time.time() > (now + timeout): - raise subprocess.TimeoutExpired("wait_until_ready()", timeout) - time.sleep(retry_sec) - # make sure that rsync is installed to be able to transfer the data - self.dnf_install(["rsync"]) - - def dnf_install(self, packages): - """ - Installs always the latest version of provided packages using DNF. - If the packages are already installed and there is a newer version in - tne repos, packages are upgraded. - - If the runner was instantiated with a list of repositories, these will - be added to the DNF command. - """ - cmd = ["dnf", "-y"] - for repo in self.repos: - cmd.append(f"--repofrompath='{repo}'") - repo_name, _ = repo.split(',', 1) - cmd.append(f"--setopt={repo_name}.gpgcheck=0") - - self.run_command_check_call(" ".join(cmd + ["--refresh", "install"] + packages)) - self.run_command_check_call(" ".join(cmd + ["upgrade"] + packages)) - - def is_ready(self, command="id"): - """ - Returns True if the runner is ready to be used, which is determined by - running the provided 'command', which must exit with 0 return value. - """ - if self.runner_ready: - return True - - try: - # run command to determine if the host is ready for use - self.run_command_check_call(command) - except (subprocess.CalledProcessError) as _: - # ignore exceptions, this is useful when the host is still stating up - pass - else: - log.debug("Runner is ready for use") - self.runner_ready = True - - return self.runner_ready - - -class RemoteRunner(BaseRunner): - """ - Runner class representing existing remote host accessible via SSH. - """ - - def __exit__(self, *exc_details): - pass - - -class BaseQEMURunner(BaseRunner): - """ - Base class representing a QEMU VM runner, which is used for generating image - test case definitions. - - Each architecture-specific runner should inherit from this class and define - QEMU_BIN, QEMU_CMD class variable. These will be used to successfully boot - VM for the given architecture. - """ - - # name of the QEMU binary to use for running the VM - QEMU_BIN = None - # the actual command to use for running QEMU VM - QEMU_CMD = None - - DEFAULT_CI_USER_DATA = { - "user": "admin", - "sudo": "ALL=(ALL) NOPASSWD:ALL" - } - - def __init__(self, image, username, repos=[], cdrom_iso=None): - super().__init__("localhost", username, repos) - self._check_qemu_bin() - - # path to image to run - self.image = image - # path to cdrom iso to attach (for cloud-init) - self.cdrom_iso = cdrom_iso - # Popen object of the qemu process - self.vm = None - # following values are set after the VM is terminated - self.vm_return_code = None - self.vm_stdout = None - self.vm_stderr = None - - def _check_qemu_bin(self): - """ - Checks whether QEMU binary used for the particular runner is present - on the system. - """ - try: - subprocess.check_call([self.QEMU_BIN, "--version"]) - except subprocess.CalledProcessError as _: - raise RuntimeError("QEMU binary {} not found".format(self.QEMU_BIN)) - - def _get_qemu_cdrom_option(self): - """ - Get the appropriate options for attaching CDROM device to the VM, if - the path to ISO has been provided. - - This method may be reimplemented by architecture specific runner class - if needed. Returns a list of strings to be appended to the QEMU command. - """ - if self.cdrom_iso: - return ["-cdrom", self.cdrom_iso] - - return list() - - def _get_qemu_boot_image_option(self): - """ - Get the appropriate options for specifying the image to boot from. - - This method may be reimplemented by architecture specific runner class - if needed. - - Returns a list of strings to be appended to the QEMU command. - """ - return [self.image] - - def _get_qemu_ssh_fwd_option(self): - """ - Get the appropriate options for forwarding guest's port 22 to host's - random available port. - """ - # get a random free TCP port. This should work in majority of cases - with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: - sock.bind(('localhost', 0)) - self.port = sock.getsockname()[1] - - return ["-net", "user,hostfwd=tcp::{}-:22".format(self.port)] - - def _run_qemu_cmd(self, qemu_cmd): - """ - Assembles the QEMU command to run and executes using subprocess. - """ - # handle CDROM - qemu_cmd.extend(self._get_qemu_cdrom_option()) - - # handle boot image - qemu_cmd.extend(self._get_qemu_boot_image_option()) - - # handle forwarding of guest's SSH port to host - qemu_cmd.extend(self._get_qemu_ssh_fwd_option()) - - log.debug("Starting VM using command: '%s'", " ".join(qemu_cmd)) - self.vm = subprocess.Popen( - qemu_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) - - def start(self): - """ - Starts the QEMU process running the VM - """ - if not self.QEMU_CMD: - raise NotImplementedError("The way to spin up QEMU VM is not implemented") - - # don't start the qemu process if there is already one running - if self.vm is None: - self._run_qemu_cmd(list(self.QEMU_CMD)) - log.info( - "Runner started. You can SSH to it once it has been configured:" + \ - "'ssh %s@localhost -p %d'", - self.username, - self.port - ) - - def stop(self): - """ - Stops the QEMU process running the VM - """ - if self.vm: - self.vm.terminate() - try: - # give the process some time to terminate - self.vm.wait(timeout=15) - except subprocess.TimeoutExpired as _: - self.vm.kill() - self.vm.wait(timeout=15) - - if self.vm.stdout: - self.vm_stdout = self.vm.stdout.read().decode() - if self.vm.stderr: - self.vm_stderr = self.vm.stderr.read().decode() - self.vm_return_code = self.vm.returncode - - if self.vm_return_code == 0: - log.debug("%s process ended with return code %d\n\n" + \ - "stdout:\n%s\nstderr:\n%s", self.QEMU_BIN, - self.vm_return_code, self.vm_stdout, self.vm_stderr) - else: - log.error("%s process ended with return code %d\n\n" + \ - "stdout:\n%s\nstderr:\n%s", self.QEMU_BIN, - self.vm_return_code, self.vm_stdout, self.vm_stderr) - - self.vm = None - self.runner_ready = False - - def is_ready(self, command="ls /var/lib/cloud/instance/boot-finished"): - """ - Returns True if the VM is ready to be used. - VM is ready after the cloud-init setup is finished. - """ - if self.runner_ready: - return True - - # check if the runner didn't terminate unexpectedly before being ready - try: - if self.vm: - self.vm.wait(1) - except subprocess.TimeoutExpired as _: - # process still running - pass - else: - # process not running, call .stop() to log stdout, stderr and retcode - self.stop() - qemu_bin = self.QEMU_BIN - raise RuntimeError(f"'{qemu_bin}' process ended before being ready to use") - - return super().is_ready(command) - - def __enter__(self): - self.start() - return self - - def __exit__(self, *exc_details): - self.stop() - - @classmethod - def create_default_ci_userdata(cls, workdir): - """ - Creates the default 'user-data.yml' file for cloud-init inside the - 'workdir'. The path of the created file is returned. - """ - default_ci_userdata_path = f"{workdir}/user-data.yml" - - with open(default_ci_userdata_path, "w") as f: - f.write("#cloud-config\n") - yaml.safe_dump(cls.DEFAULT_CI_USER_DATA, f) - - return default_ci_userdata_path - - @staticmethod - def ci_userdata_add_authorized_ssh_key(userdata_file, ssh_id_file): - """ - Modifies the provided 'userdata_file' in-place by appending the provided - 'ssh_id_file' as authorized SSH key to it. - """ - append_data = {} - with open(ssh_id_file, encoding="utf-8") as f: - append_data["ssh_authorized_keys"] = [f.read().strip()] - - with open(userdata_file, "a") as f: - yaml.safe_dump(append_data, f, width=float("inf")) - - @staticmethod - def prepare_cloud_init_cdrom(ssh_id_file, workdir, userdata=None): - """ - Generates a CDROM ISO used as a data source for cloud-init. - - Returns path to the generated CDROM ISO image and path to the used - cloud-init userdata. - """ - iso_path = os.path.join(workdir, "cloudinit.iso") - cidatadir = os.path.join(workdir, "cidata") - user_data_path = os.path.join(cidatadir, "user-data") - meta_data_path = os.path.join(cidatadir, "meta-data") - - os.mkdir(cidatadir) - - # If no userdata were provided, use the default one - if not userdata: - userdata = BaseQEMURunner.create_default_ci_userdata(workdir) - log.debug("Using default cloud-init user-data created at: %s", userdata) - - if os.path.isdir(userdata): - # create a copy of the provided userdata, since they will be modified - userdata_tmp_dir = f"{workdir}/ci_userdata_copy" - os.makedirs(userdata_tmp_dir) - shutil.copytree(userdata, userdata_tmp_dir) - userdata = userdata_tmp_dir - - # Add the ssh key to the user-data - userdata_file = f"{userdata}/user-data.yml" - BaseQEMURunner.ci_userdata_add_authorized_ssh_key(userdata_file, ssh_id_file) - - with open(user_data_path, "w") as f: - script_dir = os.path.dirname(__file__) - subprocess.check_call( - [os.path.abspath(f"{script_dir}/../gen-user-data"), userdata], stdout=f) - else: - shutil.copy(userdata, user_data_path) - # Add the ssh key to the user-data - BaseQEMURunner.ci_userdata_add_authorized_ssh_key(user_data_path, ssh_id_file) - - with open(meta_data_path, "w") as f: - f.write("instance-id: nocloud\nlocal-hostname: vm\n") - - sysname = os.uname().sysname - log.debug("Generating CDROM ISO image for cloud-init user data: %s", iso_path) - if sysname == "Linux": - subprocess.check_call( - [ - "mkisofs", - "-input-charset", "utf-8", - "-output", iso_path, - "-volid", "cidata", - "-joliet", - "-rock", - "-quiet", - "-graft-points", - user_data_path, - meta_data_path - ], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL - ) - elif sysname == "Darwin": - subprocess.check_call( - [ - "hdiutil", - "makehybrid", - "-iso", - "-joliet", - "-o", iso_path, - f"{cidatadir}" - ], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL - ) - else: - raise NotImplementedError(f"Unsupported system '{sysname}' for generating cdrom iso") - - return iso_path, userdata - - -class X86_64QEMURunner(BaseQEMURunner): - """ - VM Runner for x86_64 architecture - """ - - QEMU_BIN = "qemu-system-x86_64" - QEMU_CMD = [ - QEMU_BIN, - "-M", "accel=kvm:hvf", - "-m", "1024", - "-object", "rng-random,filename=/dev/urandom,id=rng0", - "-device", "virtio-rng-pci,rng=rng0", - "-snapshot", - "-cpu", "max", - "-net", "nic,model=virtio", - ] - - -class Ppc64QEMURunner(BaseQEMURunner): - """ - VM Runner for ppc64le architecture - """ - - QEMU_BIN = "qemu-system-ppc64" - QEMU_CMD = [ - QEMU_BIN, - "-m", "2048", # RAM - "-smp", "2", # CPUs - "-object", "rng-random,filename=/dev/urandom,id=rng0", - "-device", "virtio-rng-pci,rng=rng0", - "-snapshot", - "-net", "nic,model=virtio", - ] - - -class Aarch64QEMURunner(BaseQEMURunner): - """ - VM Runner for aarch64 architecture - """ - - # aarch64 requires UEFI build for QEMU - # https://rwmj.wordpress.com/2015/02/27/how-to-boot-a-fedora-21-aarch64-uefi-guest-on-x86_64/ - # https://fedoraproject.org/wiki/Architectures/AArch64/Install_with_QEMU - QEMU_BIN = "qemu-system-aarch64" - QEMU_CMD = [ - QEMU_BIN, - "-m", "2048", # RAM - "-smp", "2", # CPUs - "-object", "rng-random,filename=/dev/urandom,id=rng0", - "-device", "virtio-rng-pci,rng=rng0", - "-snapshot", - "-monitor", "none", - "-machine", "virt", - "-cpu", "cortex-a57", - "-bios", "/usr/share/edk2/aarch64/QEMU_EFI.fd", # provided by 'edk2-aarch64' Fedora package - "-net", "nic,model=virtio", - ] - - -class S390xQEMURunner(BaseQEMURunner): - """ - VM Runner for s390x architecture - """ - - QEMU_BIN = "qemu-system-s390x" - QEMU_CMD = [ - QEMU_BIN, - "-m", "2048", # RAM - "-smp", "2", # CPUs - "-machine", "s390-ccw-virtio", - # disable msa5-base to suppress errors: - # qemu-system-s390x: warning: 'msa5-base' requires 'kimd-sha-512' - # qemu-system-s390x: warning: 'msa5-base' requires 'klmd-sha-512' - "-cpu", "max,msa5-base=no", - "-object", "rng-random,filename=/dev/urandom,id=rng0", - "-device", "virtio-rng-ccw,rng=rng0", - "-monitor", "none", - "-snapshot", - "-net", "nic,model=virtio", - ] - - def _get_qemu_cdrom_option(self): - """ - Get the appropriate options for attaching CDROM device to the VM, if the path to ISO has been provided. - - s390x tries to boot from the CDROM if attached the way as BaseRunner does it. - """ - if self.cdrom_iso: - iso_path = self.cdrom_iso - return list(["-drive", f"file={iso_path},media=cdrom"]) - else: - return list() - - def _get_qemu_boot_image_option(self): - """ - Get the appropriate options for specifying the image to boot from. - - s390x needs to have an explicit 'bootindex' specified. - https://qemu.readthedocs.io/en/latest/system/s390x/bootdevices.html - """ - image_path = self.image - return [ - "-drive", f"if=none,id=dr1,file={image_path}", - "-device", "virtio-blk,drive=dr1,bootindex=1" - ] - - -class BaseTestCaseMatrixGenerator(contextlib.AbstractContextManager): - """ - Base class representing generation of all test cases based on provided test - cases matrix using any runner. - """ - - # Define an appropriate Runner class for each supported architecture name - # in the child class. - # Example: - # arch_runner_map = { - # "x86_64": MyX86_64Runner, - # "aarch64": MyAarch64Runner, - # "ppc64le": MyPpc64Runner, - # "s390x": MyS390xRunner - # } - arch_runner_map = {} - - # packages to be installed on the Runner before generating test cases - install_rpms_list = [ - "osbuild", - "osbuild-selinux", - "osbuild-ostree", - "osbuild-composer", - "golang", - "python3-pyyaml", # needed by image-info - ] - - def __init__(self, arch_gen_matrix, sources, output, ssh_id_file, repos=[], build_rpms=False, keep_workdir=False, log_level=logging.INFO): - """ - 'arch_get_matrix' is a dict of requested distro-image_type matrix per architecture: - { - "arch1": { - "distro1": [ - "image-type1", - "image-type2" - ], - "distro2": [ - "image-type2", - "image-type3" - ] - }, - "arch2": { - "distro2": [ - "image-type2" - ] - }, - ... - } - 'sources' is a directory path with the osbuild-composer sources, which will be used to generate image test - cases. - 'output' is a directory path, where the generated test case manifests should be stored. - 'ssh_id_file' is path to the SSH ID file to use as the authorized key for the QEMU VMs. - 'repos' is a list of strings such as ",", specifying additional - DNF repositories to use when installing packages. - 'keep_workdir' is a boolean specifying if the workdir created on the remote host should be deleted - after the runner finishes its work. - 'log_level' is the desired log level to be used by new processes created for each runner. - """ - self._processes = list() - self.arch_gen_matrix = arch_gen_matrix - self.sources = sources - self.output = output - self.ssh_id_file = ssh_id_file - self.repos = repos - self.build_rpms = build_rpms - self.keep_workdir = keep_workdir - self.log_level = log_level - - # check that the generator class supports each needed architecture - for arch in self.arch_gen_matrix.keys(): - if self.arch_runner_map.get(arch) is None: - raise RuntimeError(f"architecture '{arch}' is not supported by {self.__class__.__name__}") - - def generate(self): - """ - Generates all test cases based on provided data in a blocking manner. - - The method must be implemented in the child class and call '_generate()' method. - """ - # In a child class: - # 1. Construct a dictionary of architecture-specific runner class arguments in 'arch_runner_cls_args' - # 2. call 'self._generate(arch_runner_cls_args)' - raise NotImplementedError() - - def _generate(self, arch_runner_cls_args_map): - """ - Generates all test cases based on provided data in a blocking manner. - - The method runs a separate Runner for each architecture. All runners - are run in parallel in a new process. The method blocks until all runners - finish their work. - """ - # Start a separate runner for each required architecture - for arch in self.arch_gen_matrix.keys(): - process = multiprocessing.Process( - target=self._runner_process_main, - args=(self.arch_runner_map[arch], arch_runner_cls_args_map[arch], arch)) - process.name = f"{arch}-Runner" - self._processes.append(process) - process.start() - log.info("Started '%s'", process.name) - - # wait for all processes to finish - log.info("Waiting for all runner processes to finish") - for process in self._processes: - process.join() - self._processes.clear() - - def _runner_process_main(self, runner_cls, runner_cls_args, arch): - """ - Main function of a process generating test cases for a single architecture - using the provided Runner class - """ - # set the expected log level in the new process - log.setLevel(self.log_level) - - # spin up appropriate VM represented by 'runner' - with runner_cls(*runner_cls_args) as runner: - self._generate_arch_with_runner(runner, arch) - - def _generate_arch_with_runner(self, runner, arch): - """ - Generate test cases for one architecture using the provided Runner. - - 'runner' is a specific Runner class instance, which can be used to generate - image test cases. - 'arch' is the architecture of the Runner class instance. This information - is used to determine which image test cases should be generated. - """ - current_process_name = multiprocessing.current_process().name - generation_matrix = self.arch_gen_matrix[arch] - go_tls_timeout_retries = 3 - - log.info("Waiting for '%s' to become ready", current_process_name) - runner.wait_until_ready() - - # First create a workdir, which will be deleted after everything is finished - with runner.get_managed_workdir(cleanup=not self.keep_workdir) as runner_workdir: - log.debug("Using '%s' as a workdir", runner_workdir) - - # don't use /var/tmp for osbuild's store directory to prevent systemd from possibly - # removing some of the downloaded RPMs due to "ageing" - runner_osbuild_store_dir = f"{runner_workdir}/osbuild-store" - runner.run_command_check_call(f"mkdir {runner_osbuild_store_dir}") - - # install necessary packages - runner.dnf_install(self.install_rpms_list) - - # copy sources from the host to the runner - log.info("Copying sources to the runner") - runner_sources_dir = f"{runner_workdir}/sources" - runner.copytree_to_runner(self.sources, runner_sources_dir) - - if self.build_rpms: - # build composer RPMs from the copied over sources and install them - self._build_install_rpms_from_source(runner, runner_sources_dir) - - # Log installed versions of important RPMs - rpm_versions, _, _ = runner.run_command("rpm -q osbuild osbuild-composer") - log.info("Installed packages: %s", " ".join(rpm_versions.split("\n"))) - - # create output directory for the results on the runner - runner_output_dir = f"{runner_workdir}/output" - runner.run_command_check_call(f"mkdir {runner_output_dir}") - - # Workaround the problem that 'image-info' can not read SELinux labels unknown to the host. - # It is not possible to relabel the 'image-info' in the mounted path, because it is read-only. - # Also bind-mounting copy of image-info with proper SELinux labels to /mnt/sources didn't do the trick. - # For the reason above, make a full copy of sources in /home/admin and operate on it instead. - osbuild_label = runner.run_command_check_output("matchpathcon -n /usr/bin/osbuild") - osbuild_label = osbuild_label.strip() - image_info_runner_path = f"{runner_sources_dir}/tools/image-info" - runner.run_command_check_call(f"chcon {osbuild_label} {image_info_runner_path}") - - results = {} - for distro, img_type_list in generation_matrix.items(): - results[distro] = distro_results = {"SUCCESS": [], "FAIL": []} - for image_type in img_type_list: - log.info("Generating test case for '%s' '%s' image on '%s'", distro, image_type, arch) - - gen_test_cases_cmd = f"cd {runner_sources_dir}; sudo tools/test-case-generators/generate-test-cases" + \ - f" --distro {distro} --arch {arch} --image-types {image_type}" + \ - f" --store {runner_osbuild_store_dir} --output {runner_output_dir}" - - # allow fixed number of retries if the command fails for a specific reason - for i in range(1, go_tls_timeout_retries+1): - if i > 1: - log.info("Retrying image test case generation (%d of %d)", i, go_tls_timeout_retries) - - stdout, stderr, retcode = runner.run_command(gen_test_cases_cmd) - - if retcode != 0: - log.error("Generating test case for %s-%s-%s - FAIL\nretcode: %d\nstdout: %s\nstderr: %s", - distro, arch, image_type, retcode, stdout, stderr) - # Retry the command, if there was an error due to TLS handshake timeout - # This is happening on all runners using other than host's arch from time to time. - if stderr.find("net/http: TLS handshake timeout") != -1: - continue - distro_results["FAIL"].append(image_type) - else: - log.info("Generating test case for %s-%s-%s - SUCCESS", distro, arch, image_type) - distro_results["SUCCESS"].append(image_type) - # don't retry if the process ended successfully or if there was a different error - break - - # copy partial results back to the host - runner.copytree_from_runner(runner_output_dir, self.output) - - # clean up the store direcotry after each distro, to prevent running out of space - runner.run_command_check_call(f"sudo rm -rf {runner_osbuild_store_dir}/*") - - log.info("'%s' finished its work", current_process_name) - log.info("Results: %s", results) - - @staticmethod - def _build_install_rpms_from_source(runner: BaseRunner, sources_path: os.PathLike): - """ - Builds the (presumably osbuild-composer's) RPMs from sources on the runner - and installs them on the runner. - """ - log.info("Building RPMs from source and installing them") - enter_src_dir = f"cd {sources_path};" - - runner.dnf_install(["'@RPM Development Tools'"]) - runner.run_command_check_call(f"{enter_src_dir} sudo dnf -y builddep *.spec") - # Build RPMs without tests to prevent failing the unit test that checks - # image manifests. Since we are regenerating image test cases, it is - # assumed that the unit test would not pass until the respective tests - # are regenerated. - runner.run_command_check_call(f"{enter_src_dir} make scratch") - # do not install debuginfo and debugsource RPMs - runner.run_command_check_call(f"{enter_src_dir} sudo dnf install -y $(ls rpmbuild/RPMS/*/*.rpm | grep -Ev 'debugsource|debuginfo')") - - def _cleanup(self): - """ - Terminates all running Runner processes. - """ - # ensure that all Runner processes are stopped - for process in self._processes: - process.terminate() - process.join(5) - # kill the process if it didn't terminate yet - if process.exitcode is None: - process.kill() - process.close() - self._processes.clear() - - def __exit__(self, *exc_details): - self._cleanup() - - def __getstate__(self): - # references to already spawned processes are problematic for pickle - state = self.__dict__.copy() - # remove problematic variable - state.pop("_processes") - return state - - @staticmethod - def add_subparser(subparsers): - raise NotImplementedError() - - @staticmethod - def main(arch_gen_matrix_dict, sources, output, ssh_id_file, repos, build_rpms, keep_workdir, parser_args): - raise NotImplementedError() - - -@register_generator_cls -class QEMUTestCaseMatrixGenerator(BaseTestCaseMatrixGenerator): - """ - Class representing generation of all test cases based on provided test - cases matrix using QEMU runners. - - The class should be used as a context manager to ensure that cleanup - of all resources is done (mainly VMs and processes running them). - - VM for each architecture is run in a separate process to ensure that - generation is done in parallel. - """ - - arch_runner_map = { - "x86_64": X86_64QEMURunner, - "aarch64": Aarch64QEMURunner, - "ppc64le": Ppc64QEMURunner, - "s390x": S390xQEMURunner - } - - def __init__(self, images, arch_gen_matrix, sources, output, ssh_id_file, repos=[], build_rpms=False, ci_userdata=None, keep_workdir=False, log_level=logging.INFO): - """ - 'images' is a dict of qcow2 image paths for each supported architecture, - that should be used for VMs: - { - "arch1": "", - "arch2": "", - ... - } - 'arch_get_matrix' is a dict of requested distro-image_type matrix per architecture: - { - "arch1": { - "distro1": [ - "image-type1", - "image-type2" - ], - "distro2": [ - "image-type2", - "image-type3" - ] - }, - "arch2": { - "distro2": [ - "image-type2" - ] - }, - ... - } - 'sources' is a directory path with the osbuild-composer sources, which will be used to generate image test - cases. - 'output' is a directory path, where the generated test case manifests should be stored. - 'ssh_id_file' is path to the SSH ID file to use as the authorized key for the QEMU VMs. - 'ci_userdata' is path to file / directory containing cloud-init user-data used - for generating CDROM ISO image, that is attached to each VM as a cloud-init data source. - If the value is not provided, then the default internal cloud-init user-data are used. - """ - super().__init__(arch_gen_matrix, sources, output, ssh_id_file, repos, build_rpms, keep_workdir, log_level) - self.images = images - self.ci_userdata = ci_userdata - - # check that we have image for each needed architecture - for arch in self.arch_gen_matrix.keys(): - if self.images.get(arch) is None: - raise RuntimeError(f"architecture '{arch}' is in requested test matrix, but no image was provided") - - def generate(self): - """ - Generates all test cases based on provided data in a blocking manner. - """ - # use the same CDROM ISO image for all VMs - with tempfile.TemporaryDirectory(prefix="osbuild-composer-test-gen-") as tmpdir: - cdrom_iso, used_userdata = BaseQEMURunner.prepare_cloud_init_cdrom( - self.ssh_id_file, tmpdir, self.ci_userdata - ) - - # Load user from the cloud-init user-data - if os.path.isdir(used_userdata): - user_data_path = f"{used_userdata}/user-data.yml" - else: - user_data_path = used_userdata - with open(user_data_path, "r") as ud: - user_data = yaml.safe_load(ud) - vm_user = user_data["user"] - - # Create architecture-specific map or runner class arguments and start the test case generation. - arch_runner_cls_args_map = {} - for arch in self.arch_gen_matrix.keys(): - arch_runner_cls_args_map[arch] = (self.images[arch], vm_user, self.repos, cdrom_iso) - - self._generate(arch_runner_cls_args_map) - - @staticmethod - def add_subparser(subparsers): - """ - Adds subparser for the 'qemu' command - """ - parser_qemu = subparsers.add_parser( - "qemu", - description="generate test cases locally using QEMU", - help="generate test cases locally using QEMU" - ) - - parser_qemu.add_argument( - "--image-x86_64", - metavar="PATH", - help="x86_64 image to use for QEMU VM", - required=False - ) - parser_qemu.add_argument( - "--image-ppc64le", - metavar="PATH", - help="ppc64le image to use for QEMU VM", - required=False - ) - parser_qemu.add_argument( - "--image-aarch64", - metavar="PATH", - help="aarch64 image to use for QEMU VM", - required=False - ) - parser_qemu.add_argument( - "--image-s390x", - metavar="PATH", - help="s390x image to use for QEMU VM", - required=False - ) - parser_qemu.add_argument( - "--ci-userdata", - metavar="PATH", - help="file or directory with cloud-init user-data, to use to configure runner VMs", - type=os.path.abspath - ) - parser_qemu.set_defaults(func=QEMUTestCaseMatrixGenerator.main) - - @staticmethod - def main(arch_gen_matrix_dict, sources, output, ssh_id_file, repos, build_rpms, keep_workdir, parser_args): - """ - The main function of the 'qemu' command - """ - vm_images = { - "x86_64": parser_args.image_x86_64, - "aarch64": parser_args.image_aarch64, - "ppc64le": parser_args.image_ppc64le, - "s390x": parser_args.image_s390x - } - ci_userdata = parser_args.ci_userdata - - with QEMUTestCaseMatrixGenerator( - vm_images, arch_gen_matrix_dict, sources, output, - ssh_id_file, repos, build_rpms, ci_userdata, keep_workdir, - log.level) as generator: - generator.generate() - - -@register_generator_cls -class RemoteTestCaseMatrixGenerator(BaseTestCaseMatrixGenerator): - """ - Class representing generation of all test cases based on provided test - cases matrix using existing remote runners. - """ - - arch_runner_map = { - "x86_64": RemoteRunner, - "aarch64": RemoteRunner, - "ppc64le": RemoteRunner, - "s390x": RemoteRunner - } - - def __init__(self, hosts, username, arch_gen_matrix, sources, output, ssh_id_file, repos, build_rpms, keep_workdir, log_level=logging.INFO): - """ - 'hosts' is a dict of a remote system hostnames or IP addresses for each supported architecture, - that should be used to generate image test cases: - { - "arch1": "", - "arch2": "", - ... - } - 'username' is a username to be used to SSH to the remote hosts. The same username is used for all remote - hosts. - 'arch_get_matrix' is a dict of requested distro-image_type matrix per architecture: - { - "arch1": { - "distro1": [ - "image-type1", - "image-type2" - ], - "distro2": [ - "image-type2", - "image-type3" - ] - }, - "arch2": { - "distro2": [ - "image-type2" - ] - }, - ... - } - 'sources' is a directory path with the osbuild-composer sources, which will be used to generate image test - cases. - 'output' is a directory path, where the generated test case manifests should be stored. - 'ssh_id_file' is path to the SSH ID file to use as the authorized key for the QEMU VMs. - """ - super().__init__(arch_gen_matrix, sources, output, ssh_id_file, repos, build_rpms, keep_workdir, log_level) - self.hosts = hosts - self.username = username - - # check that we have image for each needed architecture - for arch in self.arch_gen_matrix.keys(): - if self.hosts.get(arch) is None: - raise RuntimeError(f"architecture '{arch}' is in requested test matrix, but no host was provided") - - def generate(self): - """ - Generates all test cases based on provided data in a blocking manner. - """ - # Create architecture-specific map or runner class arguments and start the test case generation. - arch_runner_cls_args_map = {} - for arch in self.arch_gen_matrix.keys(): - arch_runner_cls_args_map[arch] = (self.hosts[arch], self.username, self.repos) - - self._generate(arch_runner_cls_args_map) - - @staticmethod - def add_subparser(subparsers): - """ - Adds subparser for the 'remote' command - """ - parser_remote = subparsers.add_parser( - "remote", - description="generate test cases on existing remote systems", - help="generate test cases on existing remote systems" - ) - - parser_remote.add_argument( - "--host-x86_64", - metavar="HOSTNAME", - help="hostname or an IP address of the remote x86_64 host", - required=False - ) - parser_remote.add_argument( - "--host-ppc64le", - metavar="HOSTNAME", - help="hostname or an IP address of the remote ppc64le host", - required=False - ) - parser_remote.add_argument( - "--host-aarch64", - metavar="HOSTNAME", - help="hostname or an IP address of the remote aarch64 host", - required=False - ) - parser_remote.add_argument( - "--host-s390x", - metavar="HOSTNAME", - help="hostname or an IP address of the remote s390x host", - required=False - ) - parser_remote.add_argument( - "-u", "--username", - metavar="USER", - help="username to use to SSH to the remote systems. The same username " + \ - "is used to connect to all remote hosts. (default 'root')", - default="root" - ) - parser_remote.set_defaults(func=RemoteTestCaseMatrixGenerator.main) - - @staticmethod - def main(arch_gen_matrix_dict, sources, output, ssh_id_file, repos, build_rpms, keep_workdir, parser_args): - """ - The main function of the 'remote' command - """ - hosts = { - "x86_64": parser_args.host_x86_64, - "aarch64": parser_args.host_aarch64, - "ppc64le": parser_args.host_ppc64le, - "s390x": parser_args.host_s390x - } - username = parser_args.username - - with RemoteTestCaseMatrixGenerator( - hosts, username, arch_gen_matrix_dict, sources, output, - ssh_id_file, repos, build_rpms, keep_workdir, log.level) as generator: - generator.generate() - - -@register_generator_cls -class ManifestTestCaseMatrixGenerator(BaseTestCaseMatrixGenerator): - """ - Class representing generation of all test case manifests based on provided test - cases matrix locally. - """ - - arch_runner_map = { - "x86_64": True, - "aarch64": True, - "ppc64le": True, - "s390x": True - } - - def __init__(self, arch_gen_matrix, sources, output, repos, keep_workdir, log_level=logging.INFO): - """ - Constructor - """ - super().__init__(arch_gen_matrix=arch_gen_matrix, sources=sources, output=output, ssh_id_file="/dev/null", - repos=repos, build_rpms=False, keep_workdir=keep_workdir, log_level=log_level) - - def generate(self): - """ - Generates all test cases based on provided data in a blocking manner. - """ - for arch_name, distros in self.arch_gen_matrix.items(): - for distro_name, image_types in distros.items(): - for image_type in image_types: - script_dir = os.path.dirname(__file__) - log.info("Generating manifest for %s-%s-%s", distro_name, arch_name, image_type) - try: - subprocess.run([f"{script_dir}/generate-test-cases", "--distro", f"{distro_name}", "--arch", - f"{arch_name}", "--image-types", f"{image_type}", "--store", "/dev/null", "--output", f"{self.output}", - "--keep-image-info"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, encoding="utf-8") - except subprocess.CalledProcessError as e: - log.error("Generating manifest for %s-%s-%s FAILED: %s", distro_name, arch_name, image_type, e) - - @staticmethod - def add_subparser(subparsers): - """ - Adds subparser for the 'manifests' command - """ - parser_remote = subparsers.add_parser( - "manifests", - description="locally generate test case manifests only", - help="locally generate test case manifests only" - ) - parser_remote.set_defaults(func=ManifestTestCaseMatrixGenerator.main) - - @staticmethod - def main(arch_gen_matrix_dict, sources, output, ssh_id_file, repos, build_rpms, keep_workdir, parser_args): - """ - The main function of the 'manifests' command - """ - with ManifestTestCaseMatrixGenerator( - arch_gen_matrix_dict, sources, output, repos, log.level) as generator: - generator.generate() - - -def get_default_ssh_id_file(): - """ - Returns the path of the default SSH ID file to use. - - The defalt SSH ID file is the most recent file that matches: ~/.ssh/id*.pub, - (excluding those that match ~/.ssh/*-cert.pub). This mimics the bahaviour - of the 'ssh-copy-id' command. - """ - id_files = glob.glob(os.path.expanduser("~/.ssh/id*.pub")) - id_files = [f for f in id_files if not f.endswith("-cert.pub")] - - try: - most_recent_file = id_files[0] - except IndexError as e: - raise RuntimeError("Found no files matching '~/.ssh/id*.pub'") from e - most_recent_file_mtime = os.path.getmtime(most_recent_file) - - for id_file in id_files[1:]: - id_file_mtime = os.path.getmtime(id_file) - if most_recent_file_mtime < id_file_mtime: - most_recent_file = id_file - most_recent_file_mtime = id_file_mtime - - return most_recent_file - - -def get_args(): - """ - Returns ArgumentParser instance specific to this script. - """ - parser = argparse.ArgumentParser(description="(re)generate image all test cases") - - parser.add_argument( - "--output", - metavar="DIR", - type=os.path.abspath, - default="test/data/manifests", - help="directory for storing generated image test cases (default 'test/data/manifests')" - ) - parser.add_argument( - "--sources", - metavar="DIR", - type=os.path.abspath, - help="osbuild-composer sources directory used when generate test cases. " + \ - "If not provided, the current working directory is used." - ) - parser.add_argument( - "--distro", - help="reduce the generation matrix only to specified distribution. " + \ - "Can be specified multiple times.", - action="append", - default=[] - ) - parser.add_argument( - "--arch", - help="reduce the generation matrix only to specified architecture. " + \ - "Can be specified multiple times.", - action="append", - default=[] - ) - parser.add_argument( - "--image-type", - metavar="TYPE", - help="reduce the generation matrix only to specified image type." + \ - "Can be specified multiple times.", - action="append", - default=[] - ) - parser.add_argument( - "--gen-matrix-file", - metavar="PATH", - help="JSON file with test case generation matrix (distro x arch x image type)." + \ - " If not provided, the matrix is fetched directly from composer.", - type=os.path.abspath - ) - parser.add_argument( - "-i", "--ssh-id-file", - metavar="PATH", - help="SSH ID file to use for authenticating to the runner VMs. If the file does not end with " + \ - ".pub, it will be appended to it.", - type=os.path.abspath - ) - parser.add_argument( - "--keep-workdir", - action="store_true", - help="Don't delete the workdir created on the remote host after finishing.", - default=False - ) - parser.add_argument( - "--build-rpms", - action="store_true", - help="Build RPMs from sources copied to the runner and install them.", - default=False - ) - parser.add_argument( - "--repofrompath", - metavar=",", - action="append", - help="Specify a repository to add to the repositories used when installing packages on the runner. " + \ - "Can be specified multiple times.", - default=[] - ) - parser.add_argument( - "-d", "--debug", - action='store_true', - default=False, - help="turn on debug logging." - ) - - subparsers = parser.add_subparsers(dest="command") - subparsers.required = True - for supported_generator_cls in SUPPORTED_GENERATORS: - supported_generator_cls.add_subparser(subparsers) - - return parser.parse_args() - - -def main(args): - output = args.output - sources = args.sources if args.sources else os.getcwd() - gen_matrix_file = args.gen_matrix_file - - distros = args.distro - arches = args.arch - image_types = args.image_type - repos = args.repofrompath - build_rpms = args.build_rpms - keep_workdir = args.keep_workdir - - # determine the SSH ID file to be used - ssh_id_file = args.ssh_id_file - if not ssh_id_file: - ssh_id_file = get_default_ssh_id_file() - if not ssh_id_file.endswith(".pub"): - ssh_id_file += ".pub" - log.debug("Using SSH ID file: %s", ssh_id_file) - - if not os.path.isdir(output): - raise RuntimeError(f"output directory {output} does not exist") - - if gen_matrix_file is None: - # load the matrix directly from composer - script_dir = os.path.dirname(__file__) - matrix_generator_path = os.path.join(script_dir, "../../cmd/osbuild-composer-image-definitions/main.go") - raw_output = subprocess.check_output(["go", "run", matrix_generator_path], encoding="UTF-8") - gen_matrix_dict = json.loads(raw_output) - else: - # read the matrix from a file - log.info("Loading generation matrix from file: '%s'", gen_matrix_file) - with open(gen_matrix_file, "r") as gen_matrix_json: - gen_matrix_dict = json.load(gen_matrix_json) - - # Filter generation matrix based on passed arguments - for distro in list(gen_matrix_dict.keys()): - # filter the distros list - if distros and distro not in distros: - del gen_matrix_dict[distro] - continue - for arch in list(gen_matrix_dict[distro].keys()): - # filter the arches list of a distro - if arches and arch not in arches: - del gen_matrix_dict[distro][arch] - continue - # filter the image types of a distro and arch - if image_types: - gen_matrix_dict[distro][arch] = list(filter(lambda x: x in image_types, gen_matrix_dict[distro][arch])) - # delete the whole arch if there is no image type left after filtering - if len(gen_matrix_dict[distro][arch]) == 0: - del gen_matrix_dict[distro][arch] - - log.debug("gen_matrix_dict:\n%s", json.dumps(gen_matrix_dict, indent=2, sort_keys=True)) - - # Construct per-architecture matrix dictionary of distro x image type - arch_gen_matrix_dict = dict() - for distro, arches in gen_matrix_dict.items(): - for arch, image_types in arches.items(): - try: - arch_dict = arch_gen_matrix_dict[arch] - except KeyError as _: - arch_dict = arch_gen_matrix_dict[arch] = dict() - arch_dict[distro] = image_types.copy() - - log.debug("arch_gen_matrix_dict:\n%s", json.dumps(arch_gen_matrix_dict, indent=2, sort_keys=True)) - - args.func(arch_gen_matrix_dict, sources, output, ssh_id_file, repos, build_rpms, keep_workdir, args) - - -if __name__ == '__main__': - args = get_args() - - if args.debug: - log.setLevel(logging.DEBUG) - - try: - main(args) - except KeyboardInterrupt as _: - log.info("Interrupted by user") diff --git a/tools/test-case-generators/generate-test-cases b/tools/test-case-generators/generate-test-cases deleted file mode 100755 index d34b17a7f..000000000 --- a/tools/test-case-generators/generate-test-cases +++ /dev/null @@ -1,177 +0,0 @@ -#!/usr/bin/python3 - -import argparse -import subprocess -import json -import os -import sys -import tempfile - -def is_subprocess_succeeding(*args, **kwargs): - sp = subprocess.run(*args, **kwargs, stdout=subprocess.PIPE) - return sp.returncode == 0 - -def get_subprocess_stdout(*args, **kwargs): - sp = subprocess.run(*args, **kwargs, stdout=subprocess.PIPE) - if sp.returncode != 0: - sys.stderr.write(sp.stdout) - sys.exit(1) - - return sp.stdout - - -def run_osbuild(manifest, store, output, export): - with tempfile.TemporaryFile(dir="/tmp", prefix="osbuild-test-case-generator-", suffix=".log") as log: - try: - subprocess.run(["osbuild", - "--store", store, - "--output-directory", output, - "--checkpoint", "build", - "--export", export, - "-"], - stdout=log, - stderr=subprocess.STDOUT, - check=True, - encoding="utf-8", - input=json.dumps(manifest)) - except: - log.seek(0) - print(log.read()) - raise - - -class TestCaseGenerator: - ''' - This class generates a json test case. It accepts a test_case_request as input to the constructor: - - { - "boot": { - "type": "qemu" - }, - "compose-request": { - "distro": "fedora-30", - "arch": "x86_64", - "image-type": "qcow2", - "filename": "disk.qcow2", - "blueprint": {} - } - } - - It then outputs a json test case from the get_test_case() method. - ''' - - def __init__(self, test_case_request): - self.test_case = test_case_request - - def get_test_case(self, no_image_info, store): - compose_request = json.dumps(self.test_case["compose-request"]) - - pipeline_command = ["go", "run", "./cmd/osbuild-pipeline", "-"] - self.test_case["manifest"] = json.loads(get_subprocess_stdout(pipeline_command, input=compose_request, encoding="utf-8")) - - pipeline_command = ["go", "run", "./cmd/osbuild-pipeline", "-rpmmd", "-"] - self.test_case["rpmmd"] = json.loads(get_subprocess_stdout(pipeline_command, input=compose_request, encoding="utf-8")) - - if no_image_info == False: - with tempfile.TemporaryDirectory(dir=store, prefix="test-case-output-") as output: - manifest = self.test_case["manifest"] - version = manifest.get("version", "1") - if version == "1": - export = "assembler" - elif version == "2": - export = manifest["pipelines"][-1]["name"] - else: - print(f"Unknown manifest format version {version}") - sys.exit(1) - run_osbuild(manifest, store, output, export) - image_file = os.path.join(output, export, self.test_case["compose-request"]["filename"]) - image_info = get_subprocess_stdout(["tools/image-info", image_file], encoding="utf-8") - self.test_case["image-info"] = json.loads(image_info) - - return self.test_case - - -def generate_test_case(test_type, distro, arch, output_format, test_case_request, keep_image_info, store, output): - print(f"generating test case for {output_format}", flush=True) - generator = TestCaseGenerator(test_case_request) - test_case = generator.get_test_case(keep_image_info, store) - name = distro.replace("-", "_") + "-" + arch + "-" + output_format.replace("-", "_") + "-" + test_type + ".json" - file_name = output + "/" + name - if keep_image_info: - try: - with open(file_name, 'r') as case_file: - old_test_case = json.load(case_file) - image_info = old_test_case.get("image-info") - if image_info: - test_case["image-info"] = image_info - except: - pass - with open(file_name, 'w') as case_file: - json.dump(test_case, case_file, indent=2) - case_file.write("\n") - - -def filter_repos_by_image_type_tags(image_type: str, repos: list) -> list: - filtered_repos = [] - for repo in repos: - image_type_tags = repo.get("image_type_tags") - # if image type tags are defined for the repository, add it to the list only - # if the provided image type is in the list of image type tags. - if image_type_tags is not None: - if image_type in image_type_tags: - filtered_repos.append(repo) - continue - filtered_repos.append(repo) - return filtered_repos - - -def main(distro, arch, image_types, keep_image_info, store, output): - with open("tools/test-case-generators/format-request-map.json") as format_request_json: - format_request_dict = json.load(format_request_json) - with open("tools/test-case-generators/repos.json") as repos_json: - repos_dict = json.load(repos_json) - - filtered_test_case_request_items = [ - "overrides", - "supported_arches" - ] - for output_format, test_case_request in format_request_dict.items(): - filtered_request = dict(filter(lambda i: i[0] not in filtered_test_case_request_items, test_case_request.items())) - if filtered_request["compose-request"]["image-type"] not in image_types: - continue - filtered_request["compose-request"]["distro"] = distro - # if the compose-request has specified supported arches, then generate - # the test case only if the requested arch is in the list - supported_arches = test_case_request.get("supported_arches") - if supported_arches is not None and arch not in supported_arches: - continue - filtered_request["compose-request"]["arch"] = arch - - request_repos = filter_repos_by_image_type_tags(filtered_request["compose-request"]["image-type"], repos_dict[distro][arch]) - filtered_request["compose-request"]["repositories"] = request_repos - - if distro in test_case_request["overrides"]: - filtered_request["compose-request"].update(test_case_request["overrides"][distro]) - - # Some image types can not be analyzed by 'image-info'. To be able to - # regenerate all test cases for a distro, this must be request definition - # and not a cli option. Default to the 'keep_image_info' value if not - # set explicitly for the image type. - no_image_info = filtered_request.get("no-image-info", keep_image_info) - generate_test_case("boot", distro, arch, output_format, filtered_request, no_image_info, store, output) - - return - - -if __name__ == '__main__': - parser = argparse.ArgumentParser(description="Generate test cases") - parser.add_argument("--distro", help="distribution for test cases", required=True) - parser.add_argument("--arch", help="architecture for test cases", required=True) - parser.add_argument("--image-types", help="image types for test cases", required=True, nargs='+') - parser.add_argument("--keep-image-info", action='store_true', help="skip image info (re)generation, but keep the one found in the existing test case") - parser.add_argument("--store", metavar="STORE_DIRECTORY", type=os.path.abspath, help="path to the osbuild store", required=True) - parser.add_argument("--output", metavar="OUTPUT_DIRECTORY", type=os.path.abspath, help="path to the output directory", required=True) - args = parser.parse_args() - - main(args.distro, args.arch, args.image_types, args.keep_image_info, args.store, args.output) - sys.exit()