debian-forge-composer/tools/test-case-generators/generate-all-test-cases
Tomas Hozza 9ec2788ac8 generate-all-test-cases: allow specifying additional DNF repos
Add a new option `--repofrompath` allowing to specify additional DNF
repositories, which will be used on the Runner when installing any
packages (such as osbuild).

Extend the `test/README.md` to mention the new option. In addition,
document some aspects of the script in more detail, because some of
them were not easy for users to figure out.

Signed-off-by: Tomas Hozza <thozza@redhat.com>
2021-10-06 15:17:06 +02:00


#!/usr/bin/python3
# pylint: disable=line-too-long
"""
generate-all-test-cases
Script to generate all image test cases based on distro x arch x image-type
matrix read from `distro-arch-imagetype-map.json` or passed file. One can
filter the matrix just to a subset using `--distro`, `--arch` or
`--image-types` arguments.
The script is intended to be run from the osbuild-composer sources directory
root, for which the image test cases should be (re)generated. Alternatively,
one can specify the path to the sources using the `--sources` option.
The script generates image test cases on a so-called Runner, which is a system
of a specific architecture. The Runner type used depends on the used command,
but in general it is a system accessible via an SSH connection.
As a simplified overview, the script does the following:
1. Provisions Runners if needed.
2. Waits for the Runner to be ready for use by running a specific command
on it.
3. Installs RPMs necessary for the test case generation on the Runner.
- In case you need to install packages from a specific external repository,
you can specify each such repository using the --repofrompath option.
e.g. --repofrompath 'osbuild,https://download.copr.fedorainfracloud.org/results/@osbuild/osbuild/fedora-$releasever-$basearch/'
4. Copies the 'sources' using rsync to the Runner.
5. Executes the 'tools/test-case-generators/generate-test-cases' on the
runner for each requested distro and image type.
6. After each image test case is generated successfully, the result is
copied using rsync from the Runner to 'output' directory.
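The handling of each '--repofrompath' value in step 3 can be sketched as follows
(the repository name and URL below are illustrative, not real repositories; this
mirrors the option handling in BaseRunner.dnf_install):

```python
# Sketch: turning one "name,url" entry (as passed to --repofrompath) into
# DNF options, with GPG checking disabled for the ad-hoc repository.
repos = ["osbuild,https://example.com/osbuild-repo/"]  # hypothetical entry

cmd = ["dnf", "-y"]
for repo in repos:
    cmd.append(f"--repofrompath='{repo}'")
    # the part before the first comma is the repository name
    repo_name, _ = repo.split(",", 1)
    cmd.append(f"--setopt={repo_name}.gpgcheck=0")

print(" ".join(cmd + ["--refresh", "install", "osbuild"]))
```

The script builds this option list once and reuses it for both the 'install'
and the subsequent 'upgrade' DNF invocation on the Runner.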
The script supports the following commands:
- 'qemu' - generates image test cases locally using QEMU VMs.
- 'remote' - generates image test cases on existing remote hosts.
'qemu' command
==============
Example (builds rhel-8 qcow2 images on aarch64, s390x and ppc64le):
tools/test-case-generators/generate-all-test-cases \
--output test/data/manifests \
--arch aarch64 \
--arch s390x \
--arch ppc64le \
--distro rhel-8 \
--image-type qcow2 \
qemu \
--image-x86_64 ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.x86_64.qcow2 \
--image-ppc64le ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.ppc64le.qcow2 \
--image-aarch64 ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.aarch64.qcow2 \
--image-s390x ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.s390x.qcow2
When using this command, the script spins up an ephemeral QEMU VM for each
required architecture.
One can use e.g. the Fedora Cloud qcow2 images:
x86_64: https://download.fedoraproject.org/pub/fedora/linux/releases/33/Cloud/x86_64/images/Fedora-Cloud-Base-33-1.2.x86_64.qcow2
aarch64: https://download.fedoraproject.org/pub/fedora/linux/releases/33/Cloud/aarch64/images/Fedora-Cloud-Base-33-1.2.aarch64.qcow2
ppc64le: https://download.fedoraproject.org/pub/fedora-secondary/releases/33/Cloud/ppc64le/images/Fedora-Cloud-Base-33-1.2.ppc64le.qcow2
s390x: https://download.fedoraproject.org/pub/fedora-secondary/releases/33/Cloud/s390x/images/Fedora-Cloud-Base-33-1.2.s390x.qcow2
aarch64 special note:
make sure to have the *edk2-aarch64* package installed, which provides UEFI
builds for QEMU and AARCH64 (/usr/share/edk2/aarch64/QEMU_EFI.fd)
https://fedoraproject.org/wiki/Architectures/AArch64/Install_with_QEMU
The images need enough free disk space to build images using osbuild.
You can resize them using the 'qemu-img resize <image> 20G' command.
HW requirements:
- The x86_64 VM uses 1 CPU and 1 GB of RAM
- The aarch64, s390x and ppc64le VMs each use 2 CPUs and 2 GB of RAM
- Unless filtered using `--arch` option, the script starts 4 VMs in parallel
Tested with:
- Fedora 32 (x86_64) and QEMU version 4.2.1
'remote' command
================
Example (builds rhel-8 qcow2 images on aarch64, s390x and ppc64le):
tools/test-case-generators/generate-all-test-cases \
--output test/data/manifests \
--arch aarch64 \
--arch s390x \
--arch ppc64le \
--distro rhel-8 \
--image-type qcow2 \
remote \
--host-ppc64le 192.168.1.10 \
--host-aarch64 192.168.1.20 \
--host-s390x 192.168.1.30
When using this command, the script uses existing remote hosts accessible
via SSH for each architecture.
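With both commands, every step executed on a Runner goes through a
non-interactive SSH invocation. A minimal sketch of the invocation built by
BaseRunner.run_command (the hostname and port are hypothetical placeholders):

```python
# Sketch of the ssh command used to run a single command on a Runner.
# The hostname and port are hypothetical placeholders.
username, hostname, port = "root", "192.168.1.10", 22
remote_command = "id"  # any shell command to execute on the Runner

ssh_command = [
    "ssh",
    "-oStrictHostKeyChecking=no",      # don't verify the remote host's key
    "-oUserKnownHostsFile=/dev/null",  # don't record the remote host's key
    "-oLogLevel=ERROR",                # silence the 'host key added' warning
    "-p", f"{port}",
    f"{username}@{hostname}",
    remote_command,
]
print(ssh_command[-2])
```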
"""
import argparse
import subprocess
import json
import os
import tempfile
import shutil
import time
import socket
import contextlib
import multiprocessing
import logging
import glob
import yaml
# setup logging
log = logging.getLogger("generate-all-test-cases")
log.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s [%(levelname)s] - %(processName)s: %(message)s")
sh = logging.StreamHandler()
sh.setFormatter(formatter)
log.addHandler(sh)
# list holding all supported generator classes
SUPPORTED_GENERATORS = []
# decorator to register new generator classes
def register_generator_cls(cls):
SUPPORTED_GENERATORS.append(cls)
return cls
class BaseRunner(contextlib.AbstractContextManager):
"""
Base class representing a generic runner, which is used for generating image
test case definitions.
'repos' is a list of strings such as "<repo>,<path/url>", specifying additional
DNF repositories to use when installing packages.
"""
def __init__(self, hostname, username="root", repos=None, port=22):
self.hostname = hostname
self.port = port
self.username = username
# use 'None' as the default to avoid sharing a mutable default list
self.repos = repos if repos is not None else []
self.runner_ready = False
def run_command(self, command):
"""
Runs a given command on the Runner over ssh in a blocking fashion.
Calling this method before is_ready() returned True has undefined
behavior.
Returns stdout, stderr and the return code of the executed command.
"""
ssh_command = [
"ssh",
"-oStrictHostKeyChecking=no", # don't verify the remote host's key
"-oUserKnownHostsFile=/dev/null", # don't add the remote host's key as trusted
"-oLogLevel=ERROR", # don't log warning that the host's key has been added as trusted
"-p", f"{self.port}",
f"{self.username}@{self.hostname}",
command
]
try:
# don't log commands when the vm is not yet ready for use
if self.runner_ready:
log.debug("Running on runner: '%s'", command)
completed_process = subprocess.run(ssh_command, capture_output=True, text=True)
except Exception as e:
# don't log errors when the vm is not ready yet, because there are many of them
if self.runner_ready:
log.error("Running command over SSH failed: %s", str(e))
raise
stdout = completed_process.stdout if completed_process.stdout else ""
stderr = completed_process.stderr if completed_process.stderr else ""
return stdout, stderr, completed_process.returncode
def run_command_check_call(self, command):
"""
Runs a command on the runner over SSH in a similar fashion as subprocess.check_call()
"""
stdout, stderr, ret = self.run_command(command)
if ret != 0:
raise subprocess.CalledProcessError(ret, command, stdout, stderr)
def run_command_check_output(self, command):
"""
Runs a command on the runner over SSH in a similar fashion as subprocess.check_output()
"""
stdout, stderr, ret = self.run_command(command)
if ret != 0:
raise subprocess.CalledProcessError(ret, command, stdout, stderr)
return stdout
@contextlib.contextmanager
def get_managed_workdir(self, basedir="~", cleanup=True):
"""
Context manager which creates a random workdir under the specified
'basedir' on the runner. The 'basedir' defaults to user's home ('~').
The created workdir is by default deleted on context manager exit,
unless 'cleanup' is set to False.
"""
workdir = self.run_command_check_output(f"TMPDIR={basedir} mktemp -d").strip()
try:
yield workdir
finally:
if cleanup:
self.run_command_check_output(f"sudo rm -rf {workdir}")
def copytree_to_runner(self, host_path, runner_path):
"""
Copies the content of 'host_path' directory from the host to the
'runner_path' directory on the runner using rsync.
"""
if not host_path.endswith("/"):
host_path += "/"
rsync_command = [
"rsync",
"-az",
"-e", f"ssh -p {self.port} -oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null -oLogLevel=ERROR",
host_path,
f"{self.username}@{self.hostname}:{runner_path}"
]
subprocess.check_call(rsync_command)
def copytree_from_runner(self, runner_path, host_path):
"""
Copies the content of 'runner_path' directory from the runner to the
'host_path' directory on the host using rsync.
"""
if not runner_path.endswith("/"):
runner_path += "/"
rsync_command = [
"rsync",
"-az",
"-e", f"ssh -p {self.port} -oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null -oLogLevel=ERROR",
f"{self.username}@{self.hostname}:{runner_path}",
host_path
]
subprocess.check_call(rsync_command)
def wait_until_ready(self, timeout=None, retry_sec=15):
"""
Waits for the runner until it is ready for use. This is determined by
executing the 'is_ready()' method in a blocking fashion.
This method blocks indefinitely, unless 'timeout' is provided.
"""
now = time.time()
while not self.is_ready():
if timeout is not None and time.time() > (now + timeout):
raise subprocess.TimeoutExpired("wait_until_ready()", timeout)
time.sleep(retry_sec)
# make sure that rsync is installed to be able to transfer the data
self.dnf_install(["rsync"])
def dnf_install(self, packages):
"""
Always installs the latest version of the provided packages using DNF.
If the packages are already installed and there is a newer version in
the repos, the packages are upgraded.
If the runner was instantiated with a list of repositories, these will
be added to the DNF command.
"""
cmd = ["dnf", "-y"]
for repo in self.repos:
cmd.append(f"--repofrompath='{repo}'")
repo_name, _ = repo.split(',', 1)
cmd.append(f"--setopt={repo_name}.gpgcheck=0")
self.run_command_check_call(" ".join(cmd + ["--refresh", "install"] + packages))
self.run_command_check_call(" ".join(cmd + ["upgrade"] + packages))
def is_ready(self, command="id"):
"""
Returns True if the runner is ready to be used, which is determined by
running the provided 'command', which must exit with 0 return value.
"""
if self.runner_ready:
return True
try:
# run command to determine if the host is ready for use
self.run_command_check_call(command)
except subprocess.CalledProcessError:
# ignore exceptions, this is useful when the host is still starting up
pass
else:
log.debug("Runner is ready for use")
self.runner_ready = True
return self.runner_ready
class RemoteRunner(BaseRunner):
"""
Runner class representing existing remote host accessible via SSH.
"""
def __exit__(self, *exc_details):
pass
class BaseQEMURunner(BaseRunner):
"""
Base class representing a QEMU VM runner, which is used for generating image
test case definitions.
Each architecture-specific runner should inherit from this class and define
the QEMU_BIN and QEMU_CMD class variables. These are used to successfully
boot a VM for the given architecture.
"""
# name of the QEMU binary to use for running the VM
QEMU_BIN = None
# the actual command to use for running QEMU VM
QEMU_CMD = None
DEFAULT_CI_USER_DATA = {
"user": "admin",
"sudo": "ALL=(ALL) NOPASSWD:ALL"
}
def __init__(self, image, username, repos=None, cdrom_iso=None):
# use 'None' as the default to avoid sharing a mutable default list
super().__init__("localhost", username, repos if repos is not None else [])
self._check_qemu_bin()
# path to image to run
self.image = image
# path to cdrom iso to attach (for cloud-init)
self.cdrom_iso = cdrom_iso
# Popen object of the qemu process
self.vm = None
# following values are set after the VM is terminated
self.vm_return_code = None
self.vm_stdout = None
self.vm_stderr = None
def _check_qemu_bin(self):
"""
Checks whether QEMU binary used for the particular runner is present
on the system.
"""
try:
subprocess.check_call([self.QEMU_BIN, "--version"])
except (OSError, subprocess.CalledProcessError):
# a missing binary raises FileNotFoundError, a subclass of OSError
raise RuntimeError(f"QEMU binary '{self.QEMU_BIN}' not found or not working")
def _get_qemu_cdrom_option(self):
"""
Get the appropriate options for attaching CDROM device to the VM, if
the path to ISO has been provided.
This method may be reimplemented by architecture specific runner class
if needed. Returns a list of strings to be appended to the QEMU command.
"""
if self.cdrom_iso:
return ["-cdrom", self.cdrom_iso]
return list()
def _get_qemu_boot_image_option(self):
"""
Get the appropriate options for specifying the image to boot from.
This method may be reimplemented by architecture specific runner class
if needed.
Returns a list of strings to be appended to the QEMU command.
"""
return [self.image]
def _get_qemu_ssh_fwd_option(self):
"""
Get the appropriate options for forwarding guest's port 22 to host's
random available port.
"""
# get a random free TCP port. This should work in the majority of cases
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.bind(('localhost', 0))
self.port = sock.getsockname()[1]
return ["-net", "user,hostfwd=tcp::{}-:22".format(self.port)]
def _run_qemu_cmd(self, qemu_cmd):
"""
Assembles the QEMU command to run and executes using subprocess.
"""
# handle CDROM
qemu_cmd.extend(self._get_qemu_cdrom_option())
# handle boot image
qemu_cmd.extend(self._get_qemu_boot_image_option())
# handle forwarding of guest's SSH port to host
qemu_cmd.extend(self._get_qemu_ssh_fwd_option())
log.debug("Starting VM using command: '%s'", " ".join(qemu_cmd))
self.vm = subprocess.Popen(
qemu_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
def start(self):
"""
Starts the QEMU process running the VM
"""
if not self.QEMU_CMD:
raise NotImplementedError("The way to spin up QEMU VM is not implemented")
# don't start the qemu process if there is already one running
if self.vm is None:
self._run_qemu_cmd(list(self.QEMU_CMD))
log.info(
"Runner started. You can SSH to it once it has been configured: " + \
"'ssh %s@localhost -p %d'",
self.username,
self.port
)
def stop(self):
"""
Stops the QEMU process running the VM
"""
if self.vm:
self.vm.terminate()
try:
# give the process some time to terminate
self.vm.wait(timeout=15)
except subprocess.TimeoutExpired as _:
self.vm.kill()
self.vm.wait(timeout=15)
if self.vm.stdout:
self.vm_stdout = self.vm.stdout.read().decode()
if self.vm.stderr:
self.vm_stderr = self.vm.stderr.read().decode()
self.vm_return_code = self.vm.returncode
if self.vm_return_code == 0:
log.debug("%s process ended with return code %d\n\n" + \
"stdout:\n%s\nstderr:\n%s", self.QEMU_BIN,
self.vm_return_code, self.vm_stdout, self.vm_stderr)
else:
log.error("%s process ended with return code %d\n\n" + \
"stdout:\n%s\nstderr:\n%s", self.QEMU_BIN,
self.vm_return_code, self.vm_stdout, self.vm_stderr)
self.vm = None
self.runner_ready = False
def is_ready(self, command="ls /var/lib/cloud/instance/boot-finished"):
"""
Returns True if the VM is ready to be used.
VM is ready after the cloud-init setup is finished.
"""
if self.runner_ready:
return True
# check if the runner didn't terminate unexpectedly before being ready
try:
if self.vm:
self.vm.wait(1)
except subprocess.TimeoutExpired as _:
# process still running
pass
else:
# process not running, call .stop() to log stdout, stderr and retcode
self.stop()
qemu_bin = self.QEMU_BIN
raise RuntimeError(f"'{qemu_bin}' process ended before being ready to use")
return super().is_ready(command)
def __enter__(self):
self.start()
return self
def __exit__(self, *exc_details):
self.stop()
@classmethod
def create_default_ci_userdata(cls, workdir):
"""
Creates the default 'user-data.yml' file for cloud-init inside the
'workdir'. The path of the created file is returned.
"""
default_ci_userdata_path = f"{workdir}/user-data.yml"
with open(default_ci_userdata_path, "w") as f:
f.write("#cloud-config\n")
yaml.safe_dump(cls.DEFAULT_CI_USER_DATA, f)
return default_ci_userdata_path
@staticmethod
def ci_userdata_add_authorized_ssh_key(userdata_file, ssh_id_file):
"""
Modifies the provided 'userdata_file' in-place by appending the provided
'ssh_id_file' as authorized SSH key to it.
"""
append_data = {}
with open(ssh_id_file, encoding="utf-8") as f:
append_data["ssh_authorized_keys"] = [f.read().strip()]
with open(userdata_file, "a") as f:
yaml.safe_dump(append_data, f, width=float("inf"))
@staticmethod
def prepare_cloud_init_cdrom(ssh_id_file, workdir, userdata=None):
"""
Generates a CDROM ISO used as a data source for cloud-init.
Returns path to the generated CDROM ISO image and path to the used
cloud-init userdata.
"""
iso_path = os.path.join(workdir, "cloudinit.iso")
cidatadir = os.path.join(workdir, "cidata")
user_data_path = os.path.join(cidatadir, "user-data")
meta_data_path = os.path.join(cidatadir, "meta-data")
os.mkdir(cidatadir)
# If no userdata was provided, use the default one
if not userdata:
userdata = BaseQEMURunner.create_default_ci_userdata(workdir)
log.debug("Using default cloud-init user-data created at: %s", userdata)
if os.path.isdir(userdata):
# create a copy of the provided userdata, since it will be modified;
# note that copytree() creates the destination directory itself
userdata_tmp_dir = f"{workdir}/ci_userdata_copy"
shutil.copytree(userdata, userdata_tmp_dir)
userdata = userdata_tmp_dir
# Add the ssh key to the user-data
userdata_file = f"{userdata}/user-data.yml"
BaseQEMURunner.ci_userdata_add_authorized_ssh_key(userdata_file, ssh_id_file)
with open(user_data_path, "w") as f:
script_dir = os.path.dirname(__file__)
subprocess.check_call(
[os.path.abspath(f"{script_dir}/../gen-user-data"), userdata], stdout=f)
else:
shutil.copy(userdata, user_data_path)
# Add the ssh key to the user-data
BaseQEMURunner.ci_userdata_add_authorized_ssh_key(user_data_path, ssh_id_file)
with open(meta_data_path, "w") as f:
f.write("instance-id: nocloud\nlocal-hostname: vm\n")
sysname = os.uname().sysname
log.debug("Generating CDROM ISO image for cloud-init user data: %s", iso_path)
if sysname == "Linux":
subprocess.check_call(
[
"mkisofs",
"-input-charset", "utf-8",
"-output", iso_path,
"-volid", "cidata",
"-joliet",
"-rock",
"-quiet",
"-graft-points",
user_data_path,
meta_data_path
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
elif sysname == "Darwin":
subprocess.check_call(
[
"hdiutil",
"makehybrid",
"-iso",
"-joliet",
"-o", iso_path,
f"{cidatadir}"
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
else:
raise NotImplementedError(f"Unsupported system '{sysname}' for generating cdrom iso")
return iso_path, userdata
class X86_64QEMURunner(BaseQEMURunner):
"""
VM Runner for x86_64 architecture
"""
QEMU_BIN = "qemu-system-x86_64"
QEMU_CMD = [
QEMU_BIN,
"-M", "accel=kvm:hvf",
"-m", "1024",
"-object", "rng-random,filename=/dev/urandom,id=rng0",
"-device", "virtio-rng-pci,rng=rng0",
"-snapshot",
"-cpu", "max",
"-net", "nic,model=virtio",
]
class Ppc64QEMURunner(BaseQEMURunner):
"""
VM Runner for ppc64le architecture
"""
QEMU_BIN = "qemu-system-ppc64"
QEMU_CMD = [
QEMU_BIN,
"-m", "2048", # RAM
"-smp", "2", # CPUs
"-object", "rng-random,filename=/dev/urandom,id=rng0",
"-device", "virtio-rng-pci,rng=rng0",
"-snapshot",
"-net", "nic,model=virtio",
]
class Aarch64QEMURunner(BaseQEMURunner):
"""
VM Runner for aarch64 architecture
"""
# aarch64 requires UEFI build for QEMU
# https://rwmj.wordpress.com/2015/02/27/how-to-boot-a-fedora-21-aarch64-uefi-guest-on-x86_64/
# https://fedoraproject.org/wiki/Architectures/AArch64/Install_with_QEMU
QEMU_BIN = "qemu-system-aarch64"
QEMU_CMD = [
QEMU_BIN,
"-m", "2048", # RAM
"-smp", "2", # CPUs
"-object", "rng-random,filename=/dev/urandom,id=rng0",
"-device", "virtio-rng-pci,rng=rng0",
"-snapshot",
"-monitor", "none",
"-machine", "virt",
"-cpu", "cortex-a57",
"-bios", "/usr/share/edk2/aarch64/QEMU_EFI.fd", # provided by 'edk2-aarch64' Fedora package
"-net", "nic,model=virtio",
]
class S390xQEMURunner(BaseQEMURunner):
"""
VM Runner for s390x architecture
"""
QEMU_BIN = "qemu-system-s390x"
QEMU_CMD = [
QEMU_BIN,
"-m", "2048", # RAM
"-smp", "2", # CPUs
"-machine", "s390-ccw-virtio",
# disable msa5-base to suppress errors:
# qemu-system-s390x: warning: 'msa5-base' requires 'kimd-sha-512'
# qemu-system-s390x: warning: 'msa5-base' requires 'klmd-sha-512'
"-cpu", "max,msa5-base=no",
"-object", "rng-random,filename=/dev/urandom,id=rng0",
"-device", "virtio-rng-ccw,rng=rng0",
"-monitor", "none",
"-snapshot",
"-net", "nic,model=virtio",
]
def _get_qemu_cdrom_option(self):
"""
Get the appropriate options for attaching a CDROM device to the VM, if
the path to an ISO has been provided.
s390x tries to boot from the CDROM if it is attached the way
BaseQEMURunner does it.
"""
if self.cdrom_iso:
return ["-drive", f"file={self.cdrom_iso},media=cdrom"]
return []
def _get_qemu_boot_image_option(self):
"""
Get the appropriate options for specifying the image to boot from.
s390x needs to have an explicit 'bootindex' specified.
https://qemu.readthedocs.io/en/latest/system/s390x/bootdevices.html
"""
image_path = self.image
return [
"-drive", f"if=none,id=dr1,file={image_path}",
"-device", "virtio-blk,drive=dr1,bootindex=1"
]
class BaseTestCaseMatrixGenerator(contextlib.AbstractContextManager):
"""
Base class representing generation of all test cases based on provided test
cases matrix using any runner.
"""
# Define an appropriate Runner class for each supported architecture name
# in the child class.
# Example:
# arch_runner_map = {
# "x86_64": MyX86_64Runner,
# "aarch64": MyAarch64Runner,
# "ppc64le": MyPpc64Runner,
# "s390x": MyS390xRunner
# }
arch_runner_map = {}
# packages to be installed on the Runner before generating test cases
install_rpms_list = [
"osbuild",
"osbuild-selinux",
"osbuild-ostree",
"osbuild-composer",
"golang",
"python3-pyyaml", # needed by image-info
]
def __init__(self, arch_gen_matrix, sources, output, ssh_id_file, repos=[], keep_workdir=False, log_level=logging.INFO):
"""
'arch_gen_matrix' is a dict of the requested distro-image_type matrix per architecture:
{
"arch1": {
"distro1": [
"image-type1",
"image-type2"
],
"distro2": [
"image-type2",
"image-type3"
]
},
"arch2": {
"distro2": [
"image-type2"
]
},
...
}
'sources' is a directory path with the osbuild-composer sources, which will be used to generate image test
cases.
'output' is a directory path, where the generated test case manifests should be stored.
'ssh_id_file' is path to the SSH ID file to use as the authorized key for the QEMU VMs.
'repos' is a list of strings such as "<repo>,<path/url>", specifying additional
DNF repositories to use when installing packages.
'keep_workdir' is a boolean specifying whether the workdir created on the remote host should be kept
after the runner finishes its work.
'log_level' is the desired log level to be used by new processes created for each runner.
"""
self._processes = list()
self.arch_gen_matrix = arch_gen_matrix
self.sources = sources
self.output = output
self.ssh_id_file = ssh_id_file
self.repos = repos
self.keep_workdir = keep_workdir
self.log_level = log_level
# check that the generator class supports each needed architecture
for arch in self.arch_gen_matrix.keys():
if self.arch_runner_map.get(arch) is None:
raise RuntimeError(f"architecture '{arch}' is not supported by {self.__class__.__name__}")
def generate(self):
"""
Generates all test cases based on provided data in a blocking manner.
The method must be implemented in the child class and call '_generate()' method.
"""
# In a child class:
# 1. Construct a dictionary of architecture-specific runner class arguments in 'arch_runner_cls_args'
# 2. call 'self._generate(arch_runner_cls_args)'
raise NotImplementedError()
def _generate(self, arch_runner_cls_args_map):
"""
Generates all test cases based on provided data in a blocking manner.
The method runs a separate Runner for each architecture. All runners
are run in parallel in a new process. The method blocks until all runners
finish their work.
"""
# Start a separate runner for each required architecture
for arch in self.arch_gen_matrix.keys():
process = multiprocessing.Process(
target=self._runner_process_main,
args=(self.arch_runner_map[arch], arch_runner_cls_args_map[arch], arch))
process.name = f"{arch}-Runner"
self._processes.append(process)
process.start()
log.info("Started '%s'", process.name)
# wait for all processes to finish
log.info("Waiting for all runner processes to finish")
for process in self._processes:
process.join()
self._processes.clear()
def _runner_process_main(self, runner_cls, runner_cls_args, arch):
"""
Main function of a process generating test cases for a single architecture
using the provided Runner class
"""
# set the expected log level in the new process
log.setLevel(self.log_level)
# spin up appropriate VM represented by 'runner'
with runner_cls(*runner_cls_args) as runner:
self._generate_arch_with_runner(runner, arch)
def _generate_arch_with_runner(self, runner, arch):
"""
Generate test cases for one architecture using the provided Runner.
'runner' is a specific Runner class instance, which can be used to generate
image test cases.
'arch' is the architecture of the Runner class instance. This information
is used to determine which image test cases should be generated.
"""
current_process_name = multiprocessing.current_process().name
generation_matrix = self.arch_gen_matrix[arch]
go_tls_timeout_retries = 3
log.info("Waiting for '%s' to become ready", current_process_name)
runner.wait_until_ready()
# First create a workdir, which will be deleted after everything is finished
with runner.get_managed_workdir(cleanup=not self.keep_workdir) as runner_workdir:
log.debug("Using '%s' as a workdir", runner_workdir)
# don't use /var/tmp for osbuild's store directory to prevent systemd from possibly
# removing some of the downloaded RPMs due to "ageing"
runner_osbuild_store_dir = f"{runner_workdir}/osbuild-store"
runner.run_command_check_call(f"mkdir {runner_osbuild_store_dir}")
# install necessary packages
runner.dnf_install(self.install_rpms_list)
# Log installed versions of important RPMs
rpm_versions, _, _ = runner.run_command("rpm -q osbuild osbuild-composer")
log.info("Installed packages: %s", " ".join(rpm_versions.split("\n")))
# copy sources from the host to the runner
log.info("Copying sources to the runner")
runner_sources_dir = f"{runner_workdir}/sources"
runner.copytree_to_runner(self.sources, runner_sources_dir)
# create output directory for the results on the runner
runner_output_dir = f"{runner_workdir}/output"
runner.run_command_check_call(f"mkdir {runner_output_dir}")
# Workaround the problem that 'image-info' cannot read SELinux labels unknown to the host.
# It is not possible to relabel the 'image-info' in the mounted path, because it is read-only.
# Also bind-mounting copy of image-info with proper SELinux labels to /mnt/sources didn't do the trick.
# For the reason above, make a full copy of sources in /home/admin and operate on it instead.
osbuild_label = runner.run_command_check_output("matchpathcon -n /usr/bin/osbuild")
osbuild_label = osbuild_label.strip()
image_info_runner_path = f"{runner_sources_dir}/tools/image-info"
runner.run_command_check_call(f"chcon {osbuild_label} {image_info_runner_path}")
results = {}
for distro, img_type_list in generation_matrix.items():
results[distro] = distro_results = {"SUCCESS": [], "FAIL": []}
for image_type in img_type_list:
log.info("Generating test case for '%s' '%s' image on '%s'", distro, image_type, arch)
gen_test_cases_cmd = f"cd {runner_sources_dir}; sudo tools/test-case-generators/generate-test-cases" + \
f" --distro {distro} --arch {arch} --image-types {image_type}" + \
f" --store {runner_osbuild_store_dir} --output {runner_output_dir}"
# allow fixed number of retries if the command fails for a specific reason
for i in range(1, go_tls_timeout_retries+1):
if i > 1:
log.info("Retrying image test case generation (%d of %d)", i, go_tls_timeout_retries)
stdout, stderr, retcode = runner.run_command(gen_test_cases_cmd)
if retcode != 0:
log.error("Generating test case for %s-%s-%s - FAIL\nretcode: %d\nstdout: %s\nstderr: %s",
distro, arch, image_type, retcode, stdout, stderr)
# Retry the command if there was an error due to a TLS handshake timeout.
# This happens from time to time on all runners using an architecture other than the host's.
if stderr.find("net/http: TLS handshake timeout") != -1:
continue
distro_results["FAIL"].append(image_type)
else:
log.info("Generating test case for %s-%s-%s - SUCCESS", distro, arch, image_type)
distro_results["SUCCESS"].append(image_type)
# don't retry if the process ended successfully or if there was a different error
break
# copy partial results back to the host
runner.copytree_from_runner(runner_output_dir, self.output)
# clean up the store directory after each distro, to prevent running out of space
runner.run_command_check_call(f"sudo rm -rf {runner_osbuild_store_dir}/*")
log.info("'%s' finished its work", current_process_name)
log.info("Results: %s", results)
def _cleanup(self):
"""
Terminates all running Runner processes.
"""
# ensure that all Runner processes are stopped
for process in self._processes:
process.terminate()
process.join(5)
# kill the process if it didn't terminate yet
if process.exitcode is None:
process.kill()
process.close()
self._processes.clear()
def __exit__(self, *exc_details):
self._cleanup()
def __getstate__(self):
# references to already spawned processes are problematic for pickle
state = self.__dict__.copy()
# remove problematic variable
state.pop("_processes")
return state
@staticmethod
def add_subparser(subparsers):
raise NotImplementedError()
@staticmethod
def main(arch_gen_matrix_dict, sources, output, ssh_id_file, repos, keep_workdir, parser_args):
raise NotImplementedError()
@register_generator_cls
class QEMUTestCaseMatrixGenerator(BaseTestCaseMatrixGenerator):
"""
Class representing generation of all test cases based on provided test
cases matrix using QEMU runners.
The class should be used as a context manager to ensure that cleanup
of all resources is done (mainly VMs and processes running them).
VM for each architecture is run in a separate process to ensure that
generation is done in parallel.
"""
arch_runner_map = {
"x86_64": X86_64QEMURunner,
"aarch64": Aarch64QEMURunner,
"ppc64le": Ppc64QEMURunner,
"s390x": S390xQEMURunner
}
def __init__(self, images, arch_gen_matrix, sources, output, ssh_id_file, repos=[], ci_userdata=None, keep_workdir=False, log_level=logging.INFO):
"""
'images' is a dict of qcow2 image paths for each supported architecture,
that should be used for VMs:
{
"arch1": "<image path>",
"arch2": "<image path>",
...
}
'arch_gen_matrix' is a dict of the requested distro-image_type matrix per architecture:
{
"arch1": {
"distro1": [
"image-type1",
"image-type2"
],
"distro2": [
"image-type2",
"image-type3"
]
},
"arch2": {
"distro2": [
"image-type2"
]
},
...
}
'sources' is a directory path with the osbuild-composer sources, which will be used to generate image test
cases.
'output' is a directory path, where the generated test case manifests should be stored.
'ssh_id_file' is path to the SSH ID file to use as the authorized key for the QEMU VMs.
'ci_userdata' is path to file / directory containing cloud-init user-data used
for generating CDROM ISO image, that is attached to each VM as a cloud-init data source.
If the value is not provided, then the default internal cloud-init user-data are used.
"""
super().__init__(arch_gen_matrix, sources, output, ssh_id_file, repos, keep_workdir, log_level)
self.images = images
self.ci_userdata = ci_userdata
# check that we have an image for each needed architecture
for arch in self.arch_gen_matrix.keys():
if self.images.get(arch) is None:
raise RuntimeError(f"architecture '{arch}' is in requested test matrix, but no image was provided")
def generate(self):
"""
Generates all test cases based on provided data in a blocking manner.
"""
# use the same CDROM ISO image for all VMs
with tempfile.TemporaryDirectory(prefix="osbuild-composer-test-gen-") as tmpdir:
cdrom_iso, used_userdata = BaseQEMURunner.prepare_cloud_init_cdrom(
self.ssh_id_file, tmpdir, self.ci_userdata
)
# Load user from the cloud-init user-data
if os.path.isdir(used_userdata):
user_data_path = f"{used_userdata}/user-data.yml"
else:
user_data_path = used_userdata
with open(user_data_path, "r") as ud:
user_data = yaml.safe_load(ud)
vm_user = user_data["user"]
            # Create an architecture-specific map of runner class arguments and start the test case generation.
arch_runner_cls_args_map = {}
for arch in self.arch_gen_matrix.keys():
arch_runner_cls_args_map[arch] = (self.images[arch], vm_user, self.repos, cdrom_iso)
self._generate(arch_runner_cls_args_map)
@staticmethod
def add_subparser(subparsers):
"""
Adds subparser for the 'qemu' command
"""
parser_qemu = subparsers.add_parser(
"qemu",
description="generate test cases locally using QEMU",
help="generate test cases locally using QEMU"
)
parser_qemu.add_argument(
"--image-x86_64",
metavar="PATH",
help="x86_64 image to use for QEMU VM",
required=False
)
parser_qemu.add_argument(
"--image-ppc64le",
metavar="PATH",
help="ppc64le image to use for QEMU VM",
required=False
)
parser_qemu.add_argument(
"--image-aarch64",
metavar="PATH",
help="aarch64 image to use for QEMU VM",
required=False
)
parser_qemu.add_argument(
"--image-s390x",
metavar="PATH",
help="s390x image to use for QEMU VM",
required=False
)
parser_qemu.add_argument(
"--ci-userdata",
metavar="PATH",
        help="file or directory with cloud-init user-data used to configure the runner VMs",
type=os.path.abspath
)
parser_qemu.set_defaults(func=QEMUTestCaseMatrixGenerator.main)
@staticmethod
def main(arch_gen_matrix_dict, sources, output, ssh_id_file, repos, keep_workdir, parser_args):
"""
The main function of the 'qemu' command
"""
vm_images = {
"x86_64": parser_args.image_x86_64,
"aarch64": parser_args.image_aarch64,
"ppc64le": parser_args.image_ppc64le,
"s390x": parser_args.image_s390x
}
ci_userdata = parser_args.ci_userdata
with QEMUTestCaseMatrixGenerator(
vm_images, arch_gen_matrix_dict, sources, output,
ssh_id_file, repos, ci_userdata, keep_workdir, log.level) as generator:
generator.generate()
@register_generator_cls
class RemoteTestCaseMatrixGenerator(BaseTestCaseMatrixGenerator):
"""
Class representing generation of all test cases based on provided test
cases matrix using existing remote runners.
"""
arch_runner_map = {
"x86_64": RemoteRunner,
"aarch64": RemoteRunner,
"ppc64le": RemoteRunner,
"s390x": RemoteRunner
}
def __init__(self, hosts, username, arch_gen_matrix, sources, output, ssh_id_file, repos, keep_workdir, log_level=logging.INFO):
"""
'hosts' is a dict of a remote system hostnames or IP addresses for each supported architecture,
that should be used to generate image test cases:
{
"arch1": "<hostname/IP>",
"arch2": "<hostname/IP>",
...
}
'username' is a username to be used to SSH to the remote hosts. The same username is used for all remote
hosts.
        'arch_gen_matrix' is a dict of requested distro-image_type matrix per architecture:
{
"arch1": {
"distro1": [
"image-type1",
"image-type2"
],
"distro2": [
"image-type2",
"image-type3"
]
},
"arch2": {
"distro2": [
"image-type2"
]
},
...
}
'sources' is a directory path with the osbuild-composer sources, which will be used to generate image test
cases.
'output' is a directory path, where the generated test case manifests should be stored.
        'ssh_id_file' is path to the SSH ID file to use as the authorized key on the remote hosts.
"""
super().__init__(arch_gen_matrix, sources, output, ssh_id_file, repos, keep_workdir, log_level)
self.hosts = hosts
self.username = username
        # check that we have a host for each needed architecture
for arch in self.arch_gen_matrix.keys():
if self.hosts.get(arch) is None:
raise RuntimeError(f"architecture '{arch}' is in requested test matrix, but no host was provided")
def generate(self):
"""
Generates all test cases based on provided data in a blocking manner.
"""
        # Create an architecture-specific map of runner class arguments and start the test case generation.
arch_runner_cls_args_map = {}
for arch in self.arch_gen_matrix.keys():
arch_runner_cls_args_map[arch] = (self.hosts[arch], self.username, self.repos)
self._generate(arch_runner_cls_args_map)
@staticmethod
def add_subparser(subparsers):
"""
Adds subparser for the 'remote' command
"""
parser_remote = subparsers.add_parser(
"remote",
description="generate test cases on existing remote systems",
help="generate test cases on existing remote systems"
)
parser_remote.add_argument(
"--host-x86_64",
metavar="HOSTNAME",
help="hostname or an IP address of the remote x86_64 host",
required=False
)
parser_remote.add_argument(
"--host-ppc64le",
metavar="HOSTNAME",
help="hostname or an IP address of the remote ppc64le host",
required=False
)
parser_remote.add_argument(
"--host-aarch64",
metavar="HOSTNAME",
help="hostname or an IP address of the remote aarch64 host",
required=False
)
parser_remote.add_argument(
"--host-s390x",
metavar="HOSTNAME",
help="hostname or an IP address of the remote s390x host",
required=False
)
parser_remote.add_argument(
"-u", "--username",
metavar="USER",
help="username to use to SSH to the remote systems. The same username " + \
"is used to connect to all remote hosts. (default 'root')",
default="root"
)
parser_remote.set_defaults(func=RemoteTestCaseMatrixGenerator.main)
@staticmethod
def main(arch_gen_matrix_dict, sources, output, ssh_id_file, repos, keep_workdir, parser_args):
"""
The main function of the 'remote' command
"""
hosts = {
"x86_64": parser_args.host_x86_64,
"aarch64": parser_args.host_aarch64,
"ppc64le": parser_args.host_ppc64le,
"s390x": parser_args.host_s390x
}
username = parser_args.username
with RemoteTestCaseMatrixGenerator(
hosts, username, arch_gen_matrix_dict, sources, output,
ssh_id_file, repos, keep_workdir, log.level) as generator:
generator.generate()
def get_default_ssh_id_file():
"""
Returns the path of the default SSH ID file to use.
    The default SSH ID file is the most recent file matching ~/.ssh/id*.pub
    (excluding those matching ~/.ssh/*-cert.pub). This mimics the behaviour
    of the 'ssh-copy-id' command.
"""
id_files = glob.glob(os.path.expanduser("~/.ssh/id*.pub"))
id_files = [f for f in id_files if not f.endswith("-cert.pub")]
    if not id_files:
        raise RuntimeError("Found no files matching '~/.ssh/id*.pub'")
    return max(id_files, key=os.path.getmtime)
def get_args():
"""
Returns ArgumentParser instance specific to this script.
"""
    parser = argparse.ArgumentParser(description="(re)generate all image test cases")
parser.add_argument(
"--output",
metavar="DIR",
type=os.path.abspath,
help="directory for storing generated image test cases",
required=True
)
parser.add_argument(
"--sources",
metavar="DIR",
type=os.path.abspath,
        help="osbuild-composer sources directory used to generate test cases. " + \
             "If not provided, the current working directory is used."
)
parser.add_argument(
"--distro",
        help="reduce the generation matrix to only the specified distribution. " + \
             "Can be specified multiple times.",
action="append",
default=[]
)
parser.add_argument(
"--arch",
        help="reduce the generation matrix to only the specified architecture. " + \
             "Can be specified multiple times.",
action="append",
default=[]
)
parser.add_argument(
"--image-type",
metavar="TYPE",
        help="reduce the generation matrix to only the specified image type. " + \
             "Can be specified multiple times.",
action="append",
default=[]
)
parser.add_argument(
"--gen-matrix-file",
metavar="PATH",
help="JSON file with test case generation matrix (distro x arch x image type)." + \
" If not provided, '<script_location_dir>/distro-arch-imagetype-map.json' is read.",
type=os.path.abspath
)
parser.add_argument(
"-i", "--ssh-id-file",
metavar="PATH",
        help="SSH ID file to use for authenticating to the runner VMs. If the path does not " + \
             "end with '.pub', the suffix is appended.",
type=os.path.abspath
)
parser.add_argument(
"--keep-workdir",
action="store_true",
help="Don't delete the workdir created on the remote host after finishing.",
default=False
)
parser.add_argument(
"--repofrompath",
metavar="<repo>,<path/url>",
action="append",
help="Specify a repository to add to the repositories used when installing packages on the runner. " + \
"Can be specified multiple times.",
default=[]
)
parser.add_argument(
"-d", "--debug",
action='store_true',
default=False,
help="turn on debug logging."
)
subparsers = parser.add_subparsers(dest="command")
subparsers.required = True
for supported_generator_cls in SUPPORTED_GENERATORS:
supported_generator_cls.add_subparser(subparsers)
return parser.parse_args()
def main(args):
output = args.output
sources = args.sources if args.sources else os.getcwd()
gen_matrix_file = args.gen_matrix_file
distros = args.distro
arches = args.arch
image_types = args.image_type
repos = args.repofrompath
keep_workdir = args.keep_workdir
# determine the SSH ID file to be used
ssh_id_file = args.ssh_id_file
if not ssh_id_file:
ssh_id_file = get_default_ssh_id_file()
if not ssh_id_file.endswith(".pub"):
ssh_id_file += ".pub"
log.debug("Using SSH ID file: %s", ssh_id_file)
if not os.path.isdir(output):
raise RuntimeError(f"output directory {output} does not exist")
script_dir = os.path.dirname(__file__)
gen_matrix_path = gen_matrix_file if gen_matrix_file else f"{script_dir}/distro-arch-imagetype-map.json"
log.info("Loading generation matrix from file: '%s'", gen_matrix_path)
with open(gen_matrix_path, "r") as gen_matrix_json:
gen_matrix_dict = json.load(gen_matrix_json)
# Filter generation matrix based on passed arguments
for distro in list(gen_matrix_dict.keys()):
# filter the distros list
if distros and distro not in distros:
del gen_matrix_dict[distro]
continue
for arch in list(gen_matrix_dict[distro].keys()):
# filter the arches list of a distro
if arches and arch not in arches:
del gen_matrix_dict[distro][arch]
continue
# filter the image types of a distro and arch
if image_types:
gen_matrix_dict[distro][arch] = list(filter(lambda x: x in image_types, gen_matrix_dict[distro][arch]))
# delete the whole arch if there is no image type left after filtering
if len(gen_matrix_dict[distro][arch]) == 0:
del gen_matrix_dict[distro][arch]
log.debug("gen_matrix_dict:\n%s", json.dumps(gen_matrix_dict, indent=2, sort_keys=True))
# Construct per-architecture matrix dictionary of distro x image type
arch_gen_matrix_dict = dict()
    for distro, distro_arches in gen_matrix_dict.items():
        for arch, arch_image_types in distro_arches.items():
            arch_dict = arch_gen_matrix_dict.setdefault(arch, dict())
            arch_dict[distro] = arch_image_types.copy()
log.debug("arch_gen_matrix_dict:\n%s", json.dumps(arch_gen_matrix_dict, indent=2, sort_keys=True))
args.func(arch_gen_matrix_dict, sources, output, ssh_id_file, repos, keep_workdir, args)
if __name__ == '__main__':
args = get_args()
if args.debug:
log.setLevel(logging.DEBUG)
try:
main(args)
    except KeyboardInterrupt:
log.info("Interrupted by user")