debian-forge-composer/tools/test-case-generators/generate-all-test-cases
Tomas Hozza 94c2a6268c generate-all-test-cases: use SSH keys instead of password for VMs
Previously, passwords were used to log into provisioned QEMU VMs. This is
not practical if one would like to use e.g. rsync to transfer files to
and from the VM. The script now does not use passwords at all, but instead
configures the most recent SSH key on the system matching
'~/.ssh/id*.pub' as an authorized key on the VM. Alternatively, the SSH
key to be used can be provided as an argument to the script.

In addition, the script no longer relies on external files for
cloud-init user-data. If no cloud-init user-data is provided as an
argument, the script creates a default user-data file in a temporary
work directory and uses it. The main reason for this change is that the
default user-data became very short and needs to be extended with the
authorized SSH key anyway. This also makes the script more standalone
by relying on fewer external file paths.

Delete `tools/deploy/gen-test-data`, which held the cloud-init
user-data previously used by default by the script.

Signed-off-by: Tomas Hozza <thozza@redhat.com>
2021-09-22 09:12:57 +02:00


#!/usr/bin/python3
# pylint: disable=line-too-long
"""
generate-all-test-cases
Script to generate all image test cases based on distro x arch x image-type
matrix read from `distro-arch-imagetype-map.json` or passed file. One can
filter the matrix just to a subset using `--distro`, `--arch` or
`--image-types` arguments.
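The matrix file maps distro -> arch -> a list of image types. A minimal
illustrative example (hypothetical values, not the full shipped matrix):
{
"fedora-33": {
"x86_64": ["qcow2", "vmdk"],
"aarch64": ["qcow2"]
}
}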
The script is intended to be run from the osbuild-composer sources directory
root, for which the image test cases should be (re)generated.
Example (builds rhel-8 qcow2 images on aarch64 s390x ppc64le):
tools/test-case-generators/generate-all-test-cases \
--output=test/data/manifests \
--image-x86_64 ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.x86_64.qcow2 \
--image-ppc64le ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.ppc64le.qcow2 \
--image-aarch64 ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.aarch64.qcow2 \
--image-s390x ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.s390x.qcow2 \
--arch aarch64 s390x ppc64le \
--distro rhel-8 \
--image-types qcow2
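To use a specific SSH key instead of the most recent '~/.ssh/id*.pub' key,
pass it via e.g. '--ssh-id-file ~/.ssh/id_ed25519' (an illustrative path;
the '.pub' suffix is appended automatically if missing).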
The script spins up an ephemeral QEMU VM (called Runner) for each required
architecture. The CWD (sources dir root) is attached to the Runner using
virtfs as read-only and later mounted into /mnt/sources on the Runner.
The 'output' directory is also attached to the Runner using virtfs as r/w
and later mounted into /mnt/output on the Runner. Execution on each Runner
then proceeds as follows:
- Wait for the runner to be configured using cloud-init.
- includes installing osbuild, osbuild-composer and golang
- Create /mnt/sources and /mnt/output and mount appropriate devices
- in /mnt/sources execute tools/test-case-generators/generate-test-cases
for each requested distro and image type combination on the particular
architecture. Output manifest is written into /mnt/output
One can use e.g. Fedora Cloud qcow2 images:
x86_64: https://download.fedoraproject.org/pub/fedora/linux/releases/33/Cloud/x86_64/images/Fedora-Cloud-Base-33-1.2.x86_64.qcow2
aarch64: https://download.fedoraproject.org/pub/fedora/linux/releases/33/Cloud/aarch64/images/Fedora-Cloud-Base-33-1.2.aarch64.qcow2
ppc64le: https://download.fedoraproject.org/pub/fedora-secondary/releases/33/Cloud/ppc64le/images/Fedora-Cloud-Base-33-1.2.ppc64le.qcow2
s390x: https://download.fedoraproject.org/pub/fedora-secondary/releases/33/Cloud/s390x/images/Fedora-Cloud-Base-33-1.2.s390x.qcow2
aarch64 special note:
make sure to have the *edk2-aarch64* package installed, which provides UEFI
builds for QEMU and AARCH64 (/usr/share/edk2/aarch64/QEMU_EFI.fd)
https://fedoraproject.org/wiki/Architectures/AArch64/Install_with_QEMU
Images need to have enough disk space to be able to build images using
osbuild. You can resize them using 'qemu-img resize <image> 20G' command.
Known issues:
- The tool does not work with RHEL qcow2 images, because the "9p" filesystem
is not supported on RHEL.
HW requirements:
- The x86_64 VM uses 1 CPU and 1 GB of RAM
- The aarch64, s390x and ppc64le VMs each use 2 CPUs and 2 GB of RAM
- Unless filtered using `--arch` option, the script starts 4 VMs in parallel
Tested with:
- Fedora 32 (x86_64) and QEMU version 4.2.1
Not tested:
- installation of newer 'osbuild-composer' or 'osbuild' packages from the
local 'osbuild' repository, which is configured by cloud-init. Not sure
how dnf will behave if there are packages for multiple architectures.
"""
import argparse
import subprocess
import json
import os
import tempfile
import shutil
import time
import socket
import contextlib
import multiprocessing
import logging
import glob
import yaml
import paramiko
# setup logging
log = logging.getLogger("generate-all-test-cases")
log.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s [%(levelname)s] - %(processName)s: %(message)s")
sh = logging.StreamHandler()
sh.setFormatter(formatter)
log.addHandler(sh)
# suppress all errors logged by paramiko
paramiko.util.log_to_file(os.devnull)
class RunnerMountPoint:
"""
Data structure to represent basic data used by Runners to attach host
directory as virtfs to the guest and then to mount it.
"""
def __init__(self, src_host, dst_guest, mount_tag, security_model, readonly):
self.src_host = src_host
self.dst_guest = dst_guest
self.security_model = security_model
self.readonly = readonly
self.mount_tag = mount_tag
@staticmethod
def get_default_runner_mount_points(output_dir, sources_dir=None):
"""
Returns a list of default mount points used by Runners when generating
image test cases.
"""
sources_dir = os.getcwd() if sources_dir is None else sources_dir
# Use 'passthrough' security policy for /mnt/sources. The reason is that
# we need it to be exported to the VM without attributes, like symlink
# target, being mapped in xattrs. Otherwise copying the directory
# elsewhere produces errors.
mount_points = [
RunnerMountPoint(sources_dir, "/mnt/sources", "sources", "passthrough", True),
RunnerMountPoint(output_dir, "/mnt/output", "output", "mapped-xattr", False)
]
return mount_points
class BaseRunner(contextlib.AbstractContextManager):
"""
Base class representing a generic runner, which is used for generating image
test case definitions.
"""
def __init__(self, hostname, username="root", port=22):
self.hostname = hostname
self.port = port
self.username = username
self.runner_ready = False
def run_command(self, command):
"""
Runs a given command on the Runner over ssh in a blocking fashion.
Calling this method before is_ready() has returned True results in
undefined behavior.
Returns stdout, stderr and the return code of the executed command.
"""
ssh = paramiko.SSHClient()
# don't ask / fail on unknown remote host fingerprint, just accept any
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(self.hostname, self.port, self.username)
ssh_transport = ssh.get_transport()
channel = ssh_transport.open_session()
# don't log commands when the vm is not yet ready for use
if self.runner_ready:
log.debug("Running on runner: '%s'", command)
channel.exec_command(command)
stdout = ""
stderr = ""
# wait for the command to finish
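# drain stdout/stderr while polling for the exit status; reading the
# streams continuously prevents the channel buffer from filling up and
# blocking the remote command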
while True:
while channel.recv_ready():
stdout += channel.recv(1024).decode()
while channel.recv_stderr_ready():
stderr += channel.recv_stderr(1024).decode()
if channel.exit_status_ready():
break
time.sleep(0.01)
returncode = channel.recv_exit_status()
except Exception as e:
# don't log errors when vm is not ready yet, because there are many errors
if self.runner_ready:
log.error("Running command over SSH failed: %s", str(e))
raise e
finally:
# closes the underlying transport
ssh.close()
return stdout, stderr, returncode
def run_command_check_call(self, command):
"""
Runs a command on the runner over SSH in a similar fashion as subprocess.check_call()
"""
_, _, ret = self.run_command(command)
if ret != 0:
raise subprocess.CalledProcessError(ret, command)
def wait_until_ready(self, timeout=None, retry_sec=15):
"""
Waits for the runner until it is ready for use. This is determined by
executing the 'is_ready()' method in a blocking fashion.
This method is blocking, unless 'timeout' is provided.
"""
now = time.time()
while not self.is_ready():
if timeout is not None and time.time() > (now + timeout):
raise subprocess.TimeoutExpired("wait_until_ready()", timeout)
time.sleep(retry_sec)
def is_ready(self, command="id"):
"""
Returns True if the runner is ready to be used, which is determined by
running the provided 'command', which must exit with 0 return value.
"""
if self.runner_ready:
return True
try:
# run command to determine if the host is ready for use
self.run_command_check_call(command)
except (paramiko.ChannelException,
paramiko.ssh_exception.NoValidConnectionsError,
paramiko.ssh_exception.SSHException,
EOFError,
socket.timeout,
subprocess.CalledProcessError) as _:
# ignore all reasonable paramiko exceptions; this is useful when the host is still starting up
pass
else:
log.debug("Runner is ready for use")
self.runner_ready = True
return self.runner_ready
class BaseQEMURunner(BaseRunner):
"""
Base class representing a QEMU VM runner, which is used for generating image
test case definitions.
Each architecture-specific runner should inherit from this class and define
QEMU_BIN, QEMU_CMD class variable. These will be used to successfully boot
VM for the given architecture.
"""
# name of the QEMU binary to use for running the VM
QEMU_BIN = None
# the actual command to use for running QEMU VM
QEMU_CMD = None
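# minimal cloud-init user-data: declares the 'admin' user with passwordless
# sudo; ci_userdata_add_authorized_ssh_key() later appends the authorized
# SSH key to it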
DEFAULT_CI_USER_DATA = {
"user": "admin",
"sudo": "ALL=(ALL) NOPASSWD:ALL"
}
def __init__(self, image, username, cdrom_iso=None, mount_points=None):
super().__init__("localhost", username)
self._check_qemu_bin()
# path to image to run
self.image = image
# path to cdrom iso to attach (for cloud-init)
self.cdrom_iso = cdrom_iso
# host directories to share with the VM as virtfs devices
self.mount_points = mount_points if mount_points else list()
# Popen object of the qemu process
self.vm = None
# following values are set after the VM is terminated
self.vm_return_code = None
self.vm_stdout = None
self.vm_stderr = None
def _check_qemu_bin(self):
"""
Checks whether QEMU binary used for the particular runner is present
on the system.
"""
try:
subprocess.check_call([self.QEMU_BIN, "--version"])
except (subprocess.CalledProcessError, FileNotFoundError) as exc:
# a missing binary raises FileNotFoundError, not CalledProcessError
raise RuntimeError("QEMU binary {} not found or not working".format(self.QEMU_BIN)) from exc
def _get_qemu_cdrom_option(self):
"""
Get the appropriate options for attaching CDROM device to the VM, if
the path to ISO has been provided.
This method may be reimplemented by architecture specific runner class
if needed. Returns a list of strings to be appended to the QEMU command.
"""
if self.cdrom_iso:
return ["-cdrom", self.cdrom_iso]
return list()
def _get_qemu_boot_image_option(self):
"""
Get the appropriate options for specifying the image to boot from.
This method may be reimplemented by architecture specific runner class
if needed.
Returns a list of strings to be appended to the QEMU command.
"""
return [self.image]
def _get_qemu_ssh_fwd_option(self):
"""
Get the appropriate options for forwarding guest's port 22 to host's
random available port.
"""
# get a random free TCP port. This should work in the majority of cases
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.bind(('localhost', 0))
self.port = sock.getsockname()[1]
return ["-net", "user,hostfwd=tcp::{}-:22".format(self.port)]
def _run_qemu_cmd(self, qemu_cmd):
"""
Assembles the QEMU command to run and executes using subprocess.
"""
# handle CDROM
qemu_cmd.extend(self._get_qemu_cdrom_option())
# handle mount points
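# each mount point becomes a -virtfs option, e.g. (with the default mount points):
# -virtfs local,path=<sources_dir>,mount_tag=sources,security_model=passthrough,readonly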
for mount_point in self.mount_points:
src_host = mount_point.src_host
tag = mount_point.mount_tag
security_model = mount_point.security_model
readonly = ",readonly" if mount_point.readonly else ""
qemu_cmd.extend(["-virtfs", f"local,path={src_host},mount_tag={tag},security_model={security_model}{readonly}"])
# handle boot image
qemu_cmd.extend(self._get_qemu_boot_image_option())
# handle forwarding of guest's SSH port to host
qemu_cmd.extend(self._get_qemu_ssh_fwd_option())
log.debug("Starting VM using command: '%s'", " ".join(qemu_cmd))
self.vm = subprocess.Popen(
qemu_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
def start(self):
"""
Starts the QEMU process running the VM
"""
if not self.QEMU_CMD:
raise NotImplementedError("The way to spin up QEMU VM is not implemented")
# don't start the qemu process if there is already one running
if self.vm is None:
self._run_qemu_cmd(list(self.QEMU_CMD))
log.info(
"Runner started. You can SSH to it once it has been configured: " + \
"'ssh %s@localhost -p %d'",
self.username,
self.port
)
def stop(self):
"""
Stops the QEMU process running the VM
"""
if self.vm:
self.vm.terminate()
try:
# give the process some time to terminate
self.vm.wait(timeout=15)
except subprocess.TimeoutExpired as _:
self.vm.kill()
self.vm.wait(timeout=15)
if self.vm.stdout:
self.vm_stdout = self.vm.stdout.read().decode()
if self.vm.stderr:
self.vm_stderr = self.vm.stderr.read().decode()
self.vm_return_code = self.vm.returncode
if self.vm_return_code == 0:
log.debug("%s process ended with return code %d\n\n" + \
"stdout:\n%s\nstderr:\n%s", self.QEMU_BIN,
self.vm_return_code, self.vm_stdout, self.vm_stderr)
else:
log.error("%s process ended with return code %d\n\n" + \
"stdout:\n%s\nstderr:\n%s", self.QEMU_BIN,
self.vm_return_code, self.vm_stdout, self.vm_stderr)
self.vm = None
self.runner_ready = False
def is_ready(self, command="ls /var/lib/cloud/instance/boot-finished"):
"""
Returns True if the VM is ready to be used.
VM is ready after the cloud-init setup is finished.
"""
if self.runner_ready:
return True
# check if the runner didn't terminate unexpectedly before being ready
try:
if self.vm:
self.vm.wait(1)
except subprocess.TimeoutExpired as _:
# process still running
pass
else:
# process not running, call .stop() to log stdout, stderr and retcode
self.stop()
qemu_bin = self.QEMU_BIN
raise RuntimeError(f"'{qemu_bin}' process ended before being ready to use")
return super().is_ready(command)
def mount_mount_points(self):
"""
This method mounts the needed mount points on the VM.
It should be called only after is_ready() returned True. Otherwise it will fail.
"""
for mount_point in self.mount_points:
dst_guest = mount_point.dst_guest
mount_tag = mount_point.mount_tag
self.run_command_check_call(f"sudo mkdir {dst_guest}")
#! FIXME: "9p" filesystem is not supported on RHEL!
out, err, ret = self.run_command(f"sudo mount -t 9p -o trans=virtio {mount_tag} {dst_guest} -oversion=9p2000.L")
if ret != 0:
log.error("Mounting '%s' to '%s' failed with retcode: %d\nstdout: %s\nstderr: %s", mount_tag, dst_guest,
ret, out, err)
raise subprocess.CalledProcessError(
ret,
f"sudo mount -t 9p -o trans=virtio {mount_tag} {dst_guest} -oversion=9p2000.L")
def __enter__(self):
self.start()
return self
def __exit__(self, *exc_details):
self.stop()
@classmethod
def create_default_ci_userdata(cls, workdir):
"""
Creates the default 'user-data.yml' file for cloud-init inside the
'workdir'. The path of the created file is returned.
"""
default_ci_userdata_path = f"{workdir}/user-data.yml"
with open(default_ci_userdata_path, "w") as f:
f.write("#cloud-config\n")
yaml.safe_dump(cls.DEFAULT_CI_USER_DATA, f)
return default_ci_userdata_path
@staticmethod
def ci_userdata_add_authorized_ssh_key(userdata_file, ssh_id_file):
"""
Modifies the provided 'userdata_file' in-place by appending the provided
'ssh_id_file' as authorized SSH key to it.
"""
append_data = {}
with open(ssh_id_file, encoding="utf-8") as f:
append_data["ssh_authorized_keys"] = [f.read().strip()]
with open(userdata_file, "a") as f:
yaml.safe_dump(append_data, f, width=float("inf"))
@staticmethod
def prepare_cloud_init_cdrom(ssh_id_file, workdir, userdata=None):
"""
Generates a CDROM ISO used as a data source for cloud-init.
Returns path to the generated CDROM ISO image and path to the used
cloud-init userdata.
"""
iso_path = os.path.join(workdir, "cloudinit.iso")
cidatadir = os.path.join(workdir, "cidata")
user_data_path = os.path.join(cidatadir, "user-data")
meta_data_path = os.path.join(cidatadir, "meta-data")
os.mkdir(cidatadir)
# If no userdata were provided, use the default one
if not userdata:
userdata = BaseQEMURunner.create_default_ci_userdata(workdir)
log.debug("Using default cloud-init user-data created at: %s", userdata)
if os.path.isdir(userdata):
# create a copy of the provided userdata, since they will be modified
userdata_tmp_dir = f"{workdir}/ci_userdata_copy"
# shutil.copytree() creates the destination directory itself; creating it
# beforehand would make the call fail with FileExistsError
shutil.copytree(userdata, userdata_tmp_dir)
userdata = userdata_tmp_dir
# Add the ssh key to the user-data
userdata_file = f"{userdata}/user-data.yml"
BaseQEMURunner.ci_userdata_add_authorized_ssh_key(userdata_file, ssh_id_file)
with open(user_data_path, "w") as f:
script_dir = os.path.dirname(__file__)
subprocess.check_call(
[os.path.abspath(f"{script_dir}/../gen-user-data"), userdata], stdout=f)
else:
shutil.copy(userdata, user_data_path)
# Add the ssh key to the user-data
BaseQEMURunner.ci_userdata_add_authorized_ssh_key(user_data_path, ssh_id_file)
with open(meta_data_path, "w") as f:
f.write("instance-id: nocloud\nlocal-hostname: vm\n")
sysname = os.uname().sysname
log.debug("Generating CDROM ISO image for cloud-init user data: %s", iso_path)
if sysname == "Linux":
subprocess.check_call(
[
"mkisofs",
"-input-charset", "utf-8",
"-output", iso_path,
"-volid", "cidata",
"-joliet",
"-rock",
"-quiet",
"-graft-points",
user_data_path,
meta_data_path
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
elif sysname == "Darwin":
subprocess.check_call(
[
"hdiutil",
"makehybrid",
"-iso",
"-joliet",
"-o", iso_path,
f"{cidatadir}"
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
else:
raise NotImplementedError(f"Unsupported system '{sysname}' for generating cdrom iso")
return iso_path, userdata
class X86_64QEMURunner(BaseQEMURunner):
"""
VM Runner for x86_64 architecture
"""
QEMU_BIN = "qemu-system-x86_64"
QEMU_CMD = [
QEMU_BIN,
"-M", "accel=kvm:hvf",
"-m", "1024",
"-object", "rng-random,filename=/dev/urandom,id=rng0",
"-device", "virtio-rng-pci,rng=rng0",
"-snapshot",
"-cpu", "max",
"-net", "nic,model=virtio",
]
class Ppc64QEMURunner(BaseQEMURunner):
"""
VM Runner for ppc64le architecture
"""
QEMU_BIN = "qemu-system-ppc64"
QEMU_CMD = [
QEMU_BIN,
"-m", "2048", # RAM
"-smp", "2", # CPUs
"-object", "rng-random,filename=/dev/urandom,id=rng0",
"-device", "virtio-rng-pci,rng=rng0",
"-snapshot",
"-net", "nic,model=virtio",
]
class Aarch64QEMURunner(BaseQEMURunner):
"""
VM Runner for aarch64 architecture
"""
# aarch64 requires UEFI build for QEMU
# https://rwmj.wordpress.com/2015/02/27/how-to-boot-a-fedora-21-aarch64-uefi-guest-on-x86_64/
# https://fedoraproject.org/wiki/Architectures/AArch64/Install_with_QEMU
QEMU_BIN = "qemu-system-aarch64"
QEMU_CMD = [
QEMU_BIN,
"-m", "2048", # RAM
"-smp", "2", # CPUs
"-object", "rng-random,filename=/dev/urandom,id=rng0",
"-device", "virtio-rng-pci,rng=rng0",
"-snapshot",
"-monitor", "none",
"-machine", "virt",
"-cpu", "cortex-a57",
"-bios", "/usr/share/edk2/aarch64/QEMU_EFI.fd", # provided by 'edk2-aarch64' Fedora package
"-net", "nic,model=virtio",
]
class S390xQEMURunner(BaseQEMURunner):
"""
VM Runner for s390x architecture
"""
QEMU_BIN = "qemu-system-s390x"
QEMU_CMD = [
QEMU_BIN,
"-m", "2048", # RAM
"-smp", "2", # CPUs
"-machine", "s390-ccw-virtio",
# disable msa5-base to suppress errors:
# qemu-system-s390x: warning: 'msa5-base' requires 'kimd-sha-512'
# qemu-system-s390x: warning: 'msa5-base' requires 'klmd-sha-512'
"-cpu", "max,msa5-base=no",
"-object", "rng-random,filename=/dev/urandom,id=rng0",
"-device", "virtio-rng-ccw,rng=rng0",
"-monitor", "none",
"-snapshot",
"-net", "nic,model=virtio",
]
def _get_qemu_cdrom_option(self):
"""
Get the appropriate options for attaching CDROM device to the VM, if the path to ISO has been provided.
s390x tries to boot from the CDROM if it is attached the way BaseQEMURunner does it.
"""
if self.cdrom_iso:
iso_path = self.cdrom_iso
return ["-drive", f"file={iso_path},media=cdrom"]
return list()
def _get_qemu_boot_image_option(self):
"""
Get the appropriate options for specifying the image to boot from.
s390x needs to have an explicit 'bootindex' specified.
https://qemu.readthedocs.io/en/latest/system/s390x/bootdevices.html
"""
image_path = self.image
return [
"-drive", f"if=none,id=dr1,file={image_path}",
"-device", "virtio-blk,drive=dr1,bootindex=1"
]
class TestCaseMatrixGenerator(contextlib.AbstractContextManager):
"""
Class representing generation of all test cases based on provided test
cases matrix.
The class should be used as a context manager to ensure that cleanup
of all resources is done (mainly VMs and processes running them).
VM for each architecture is run in a separate process to ensure that
generation is done in parallel.
"""
ARCH_RUNNER_MAP = {
"x86_64": X86_64QEMURunner,
"aarch64": Aarch64QEMURunner,
"ppc64le": Ppc64QEMURunner,
"s390x": S390xQEMURunner
}
# packages to be installed on the Runner before generating test cases
INSTALL_RPMS = [
"osbuild",
"osbuild-selinux",
"osbuild-ostree",
"osbuild-composer",
"golang",
"python3-pyyaml" # needed by image-info
]
def __init__(self, images, arch_gen_matrix, output, keep_image_info, ssh_id_file, ci_userdata=None):
"""
'images' is a dict of qcow2 image paths for each supported architecture,
that should be used for VMs:
{
"arch1": "<image path>",
"arch2": "<image path>",
...
}
'arch_gen_matrix' is a dict of requested distro-image_type matrix per architecture:
{
"arch1": {
"distro1": [
"image-type1",
"image-type2"
],
"distro2": [
"image-type2",
"image-type3"
]
},
"arch2": {
"distro2": [
"image-type2"
]
},
...
}
'output' is a directory path, where the generated test case manifests should be stored.
'keep_image_info' specifies whether to pass the '--keep-image-info' option to the 'generate-test-cases' script.
'ssh_id_file' is path to the SSH ID file to use as the authorized key for the QEMU VMs.
'ci_userdata' is path to file / directory containing cloud-init user-data used
for generating CDROM ISO image, that is attached to each VM as a cloud-init data source.
If the value is not provided, then the default internal cloud-init user-data are used.
"""
self._processes = list()
self.images = images
self.arch_gen_matrix = arch_gen_matrix
self.output = output
self.keep_image_info = keep_image_info
self.ssh_id_file = ssh_id_file
self.ci_userdata = ci_userdata
# check that we have image for each needed architecture
for arch in self.arch_gen_matrix.keys():
if self.images.get(arch, None) is None:
raise RuntimeError(f"architecture '{arch}' is in requested test matrix, but no image was provided")
@staticmethod
def runner_function(arch, runner_cls, image, user, cdrom_iso, generation_matrix, output, keep_image_info):
"""
Generate test cases using VM with appropriate architecture.
'generation_matrix' is expected to be already architecture-specific
dict of 'distro' x 'image-type' matrix.
{
"fedora-32": [
"qcow2",
"vmdk"
],
"rhel-84": [
"qcow2",
"tar"
],
...
}
"""
mount_points = RunnerMountPoint.get_default_runner_mount_points(output)
go_tls_timeout_retries = 3
# spin up appropriate VM represented by 'runner'
with runner_cls(image, user, cdrom_iso, mount_points=mount_points) as runner:
log.info("Waiting for the '%s' runner to be configured by cloud-init", arch)
runner.wait_until_ready()
runner.mount_mount_points()
# don't use /var/tmp for osbuild's store directory to prevent systemd from possibly
# removing some of the downloaded RPMs due to "ageing"
guest_osbuild_store_dir = "/home/admin/osbuild-store"
runner.run_command_check_call(f"sudo mkdir {guest_osbuild_store_dir}")
# install necessary packages
runner.run_command_check_call("sudo dnf install -y " + " ".join(TestCaseMatrixGenerator.INSTALL_RPMS))
# Log installed versions of important RPMs
rpm_versions, _, _ = runner.run_command("rpm -q osbuild osbuild-composer")
log.info("Installed packages: %s", " ".join(rpm_versions.split("\n")))
# Workaround the problem that 'image-info' can not read SELinux labels unknown to the host.
# It is not possible to relabel the 'image-info' in the mounted path, because it is read-only.
# Also bind-mounting copy of image-info with proper SELinux labels to /mnt/sources didn't do the trick.
# For the reason above, make a full copy of sources in /home/admin and operate on it instead.
osbuild_label, stderr, retcode = runner.run_command("matchpathcon -n /usr/bin/osbuild")
if retcode:
raise RuntimeError(f"Running 'matchpathcon' on the guest failed. retcode: {retcode}\n\nstderr: {stderr}")
osbuild_label = osbuild_label.strip()
sources_path = "/home/admin/sources"
image_info_guest_path = f"{sources_path}/tools/image-info"
log.info(f"Making copy of sources in '{sources_path}'.")
# exclude test/data/manifests, because it is mounted from the host into /mnt/output and
# has UID and GID set, which does not allow us to access it. And it is not needed
# to generate a test case!
runner.run_command_check_call(f"rsync -a --exclude=test/data/manifests /mnt/sources/ {sources_path}")
runner.run_command_check_call(f"chcon {osbuild_label} {image_info_guest_path}")
for distro, img_type_list in generation_matrix.items():
for image_type in img_type_list:
log.info("Generating test case for '%s' '%s' image on '%s'", distro, image_type, arch)
# is the image with customizations?
if image_type.endswith("-customize"):
with_customizations = True
image_type = image_type.rstrip("-customize")
else:
with_customizations = False
gen_test_cases_cmd = f"cd {sources_path}; sudo tools/test-case-generators/generate-test-cases" + \
f" --distro {distro} --arch {arch} --image-types {image_type}" + \
f" --store {guest_osbuild_store_dir} --output /mnt/output/"
if with_customizations:
gen_test_cases_cmd += " --with-customizations"
if keep_image_info:
gen_test_cases_cmd += " --keep-image-info"
# allow fixed number of retries if the command fails for a specific reason
for i in range(1, go_tls_timeout_retries+1):
if i > 1:
log.info("Retrying image test case generation (%d of %d)", i, go_tls_timeout_retries)
stdout, stderr, retcode = runner.run_command(gen_test_cases_cmd)
if retcode != 0:
log.error("'%s' retcode: %d\nstdout: %s\nstderr: %s", gen_test_cases_cmd, retcode,
stdout, stderr)
# Retry the command, if there was an error due to TLS handshake timeout
# This is happening on all runners using other than host's arch from time to time.
if stderr.find("net/http: TLS handshake timeout") != -1:
continue
else:
log.info("Generating test case for %s-%s-%s - SUCCEEDED\nstdout: %s\nstderr: %s", distro, arch, image_type, stdout, stderr)
# don't retry if the process ended successfully or if there was a different error
break
log.info("'%s' runner finished its work", arch)
# TODO: Possibly remove after testing / fine tuning the script
log.info("Waiting for 1 hour, before terminating the runner (CTRL + c will terminate all VMs)")
time.sleep(3600)
runner.stop()
def generate(self):
"""
Generates all test cases based on provided data
"""
# use the same CDROM ISO image for all VMs
with tempfile.TemporaryDirectory(prefix="osbuild-composer-test-gen-") as tmpdir:
cdrom_iso, used_userdata = BaseQEMURunner.prepare_cloud_init_cdrom(
self.ssh_id_file, tmpdir, self.ci_userdata
)
# Load user from the cloud-init user-data
if os.path.isdir(used_userdata):
user_data_path = f"{used_userdata}/user-data.yml"
else:
user_data_path = used_userdata
with open(user_data_path, "r") as ud:
user_data = yaml.safe_load(ud)
vm_user = user_data["user"]
# Start a separate runner VM for each required architecture
for arch, generation_matrix in self.arch_gen_matrix.items():
process = multiprocessing.Process(
target=self.runner_function,
args=(arch, self.ARCH_RUNNER_MAP[arch], self.images[arch], vm_user, cdrom_iso,
generation_matrix, self.output, self.keep_image_info))
self._processes.append(process)
process.start()
log.info("Started '%s' runner - %s", arch, process.name)
# wait for all processes to finish
log.info("Waiting for all runner processes to finish")
for process in self._processes:
process.join()
self._processes.clear()
def cleanup(self):
"""
Terminates all running processes of VM runners.
"""
# ensure that all processes running VMs are stopped
for process in self._processes:
process.terminate()
process.join(5)
# kill the process if it didn't terminate yet
if process.exitcode is None:
process.kill()
process.close()
self._processes.clear()
def __exit__(self, *exc_details):
self.cleanup()
def get_default_ssh_id_file():
"""
Returns the path of the default SSH ID file to use.
The default SSH ID file is the most recent file that matches ~/.ssh/id*.pub
(excluding those that match ~/.ssh/*-cert.pub). This mimics the behaviour
of the 'ssh-copy-id' command.
"""
id_files = glob.glob(os.path.expanduser("~/.ssh/id*.pub"))
id_files = [f for f in id_files if not f.endswith("-cert.pub")]
try:
most_recent_file = id_files[0]
except IndexError as e:
raise RuntimeError("Found no files matching '~/.ssh/id*.pub'") from e
most_recent_file_mtime = os.path.getmtime(most_recent_file)
for id_file in id_files[1:]:
id_file_mtime = os.path.getmtime(id_file)
if most_recent_file_mtime < id_file_mtime:
most_recent_file = id_file
most_recent_file_mtime = id_file_mtime
return most_recent_file
def get_args():
"""
Returns ArgumentParser instance specific to this script.
"""
parser = argparse.ArgumentParser(description="(re)generate all image test cases")
parser.add_argument(
"--image-x86_64",
help="Path to x86_64 image to use for QEMU VM",
required=False
)
parser.add_argument(
"--image-ppc64le",
help="Path to ppc64le image to use for QEMU VM",
required=False
)
parser.add_argument(
"--image-aarch64",
help="Path to aarch64 image to use for QEMU VM",
required=False
)
parser.add_argument(
"--image-s390x",
help="Path to s390x image to use for QEMU VM",
required=False
)
parser.add_argument(
"--distro",
help="Filters the matrix for generation only to specified distro",
nargs='*',
required=False
)
parser.add_argument(
"--arch",
help="Filters the matrix for generation only to specified architecture",
nargs='*',
required=False
)
parser.add_argument(
"--image-types",
help="Filters the matrix for generation only to specified image types",
nargs='*',
required=False
)
parser.add_argument(
"--keep-image-info",
action='store_true',
help="Skip image info (re)generation, but keep the one found in the existing test case"
)
parser.add_argument(
"--output",
metavar="OUTPUT_DIRECTORY",
type=os.path.abspath,
help="Path to the output directory, where to store resulting manifests for image test cases",
required=True
)
parser.add_argument(
"--gen-matrix-file",
help="Path to JSON file from which to read the test case generation matrix (distro x arch x image type)." + \
" If not provided, '<script_location_dir>/distro-arch-imagetype-map.json' is read.",
type=os.path.abspath
)
parser.add_argument(
"--ci-userdata",
help="Path to a file/directory with cloud-init user-data, which should be used to configure runner VMs",
type=os.path.abspath
)
parser.add_argument(
"-i", "--ssh-id-file",
help="Path to the SSH ID file to use for authenticating to the runner VMs. If the file does not end with " + \
".pub, it will be appended to it.",
type=os.path.abspath
)
parser.add_argument(
"-d", "--debug",
action='store_true',
default=False,
help="turn on debug logging"
)
return parser.parse_args()
# pylint: disable=too-many-arguments,too-many-locals
def main(vm_images, distros, arches, image_types, ssh_id_file, ci_userdata, gen_matrix_file, output, keep_image_info):
if not os.path.isdir(output):
raise RuntimeError(f"output directory {output} does not exist")
script_dir = os.path.dirname(__file__)
gen_matrix_path = gen_matrix_file if gen_matrix_file else f"{script_dir}/distro-arch-imagetype-map.json"
log.info("Loading generation matrix from file: '%s'", gen_matrix_path)
with open(gen_matrix_path, "r") as gen_matrix_json:
gen_matrix_dict = json.load(gen_matrix_json)
# Filter generation matrix based on passed arguments
for distro in list(gen_matrix_dict.keys()):
# filter the distros list
if distros and distro not in distros:
del gen_matrix_dict[distro]
continue
for arch in list(gen_matrix_dict[distro].keys()):
# filter the arches list of a distro
if arches and arch not in arches:
del gen_matrix_dict[distro][arch]
continue
# filter the image types of a distro and arch
if image_types:
gen_matrix_dict[distro][arch] = list(filter(lambda x: x in image_types, gen_matrix_dict[distro][arch]))
# delete the whole arch if there is no image type left after filtering
if len(gen_matrix_dict[distro][arch]) == 0:
del gen_matrix_dict[distro][arch]
log.debug("gen_matrix_dict:\n%s", json.dumps(gen_matrix_dict, indent=2, sort_keys=True))
# Construct per-architecture matrix dictionary of distro x image type
arch_gen_matrix_dict = dict()
for distro, arches in gen_matrix_dict.items():
for arch, image_types in arches.items():
try:
arch_dict = arch_gen_matrix_dict[arch]
except KeyError as _:
arch_dict = arch_gen_matrix_dict[arch] = dict()
arch_dict[distro] = image_types.copy()
log.debug("arch_gen_matrix_dict:\n%s", json.dumps(arch_gen_matrix_dict, indent=2, sort_keys=True))
# determine the SSH ID file to be used; the 'ssh_id_file' parameter already
# carries the value of the '--ssh-id-file' argument, so don't re-read the
# global 'args' here
if not ssh_id_file:
ssh_id_file = get_default_ssh_id_file()
if not ssh_id_file.endswith(".pub"):
ssh_id_file += ".pub"
log.debug("Using SSH ID file: %s", ssh_id_file)
with TestCaseMatrixGenerator(vm_images, arch_gen_matrix_dict, output, keep_image_info, ssh_id_file, ci_userdata) as generator:
generator.generate()
if __name__ == '__main__':
args = get_args()
if args.debug:
log.setLevel(logging.DEBUG)
vm_images = {
"x86_64": args.image_x86_64,
"aarch64": args.image_aarch64,
"ppc64le": args.image_ppc64le,
"s390x": args.image_s390x
}
try:
main(
vm_images,
args.distro,
args.arch,
args.image_types,
args.ssh_id_file,
args.ci_userdata,
args.gen_matrix_file,
args.output,
args.keep_image_info
)
except KeyboardInterrupt as _:
log.info("Interrupted by user")