tools: extract make_dnf_scafolding() from config_combos()

Extract make_dnf_scafolding as a helper, mostly so that the config_combos()
function is easier to read. It seems one core concept here is the iteration
of "combo[0]" and "combo[1]" so having them symetrical at the same indent
level feel easier to read to me.
This commit is contained in:
Michael Vogt 2024-04-11 09:59:48 +02:00 committed by Achilleas Koutsou
parent a4dfd2614f
commit 0ef3459155

View file

@ -2,6 +2,7 @@ import configparser
import importlib
import json
import os
import pathlib
import socket
import subprocess as sp
from tempfile import TemporaryDirectory
@ -182,7 +183,23 @@ test_cases = [
]
def config_combos(servers):
def make_dnf_scafolding(base_dir):
root_dir = pathlib.Path(TemporaryDirectory(dir=base_dir).name)
repos_dir = root_dir / "etc/yum.repos.d"
repos_dir.mkdir(parents=True)
keys_dir = root_dir / "etc/pki/rpm-gpg"
keys_dir.mkdir(parents=True)
vars_dir = root_dir / "etc/dnf/vars"
vars_dir.mkdir(parents=True)
vars_path = vars_dir / "customvar"
vars_path.write_text(CUSTOMVAR, encoding="utf8")
return root_dir, repos_dir, keys_dir
def config_combos(tmp_path, servers):
"""
Return all configurations for the provided repositories, either as config files in a directory or as repository
configs in the depsolve request, or a combination of both.
@ -207,43 +224,31 @@ def config_combos(servers):
"rhsm": False,
"gpgkeys": [TEST_KEY + server["name"]],
})
with TemporaryDirectory() as root_dir:
repos_dir = os.path.join(root_dir, "etc/yum.repos.d")
os.makedirs(repos_dir)
keys_dir = os.path.join(root_dir, "etc/pki/rpm-gpg")
os.makedirs(keys_dir)
vars_dir = os.path.join(root_dir, "etc/dnf/vars")
os.makedirs(vars_dir)
root_dir, repos_dir, keys_dir = make_dnf_scafolding(tmp_path)
for idx in combo[1]: # servers to be configured through root_dir
server = servers[idx]
name = server["name"]
# Use the gpgkey to test both the key reading and the variable substitution.
# For this test, it doesn't need to be a real key.
key_url = f"file:///etc/pki/rpm-gpg/RPM-GPG-KEY-$releasever-$basearch-$customvar-{name}"
vars_path = os.path.join(vars_dir, "customvar")
with open(vars_path, "w", encoding="utf-8") as vars_file:
vars_file.write(CUSTOMVAR)
key_path = keys_dir / f"RPM-GPG-KEY-{RELEASEVER}-{ARCH}-{CUSTOMVAR}-{name}"
key_path.write_text(TEST_KEY + name, encoding="utf8")
parser = configparser.ConfigParser()
parser.add_section(name)
# Set some options in a specific order in which they tend to be
# written in repo files.
parser.set(name, "name", name)
parser.set(name, "baseurl", server["address"])
parser.set(name, "enabled", "1")
parser.set(name, "gpgcheck", "1")
parser.set(name, "sslverify", "0")
parser.set(name, "gpgkey", key_url)
for idx in combo[1]: # servers to be configured through root_dir
server = servers[idx]
name = server["name"]
# Use the gpgkey to test both the key reading and the variable substitution.
# For this test, it doesn't need to be a real key.
key_url = f"file:///etc/pki/rpm-gpg/RPM-GPG-KEY-$releasever-$basearch-$customvar-{name}"
with (repos_dir / f"{name}.repo").open("w", encoding="utf-8") as fp:
parser.write(fp, space_around_delimiters=False)
key_path = os.path.join(keys_dir, f"RPM-GPG-KEY-{RELEASEVER}-{ARCH}-{CUSTOMVAR}-{name}")
with open(key_path, "w", encoding="utf-8") as key_file:
key_file.write(TEST_KEY + name)
parser = configparser.ConfigParser()
parser.add_section(name)
# Set some options in a specific order in which they tend to be
# written in repo files.
parser.set(name, "name", name)
parser.set(name, "baseurl", server["address"])
parser.set(name, "enabled", "1")
parser.set(name, "gpgcheck", "1")
parser.set(name, "sslverify", "0")
parser.set(name, "gpgkey", key_url)
with open(f"{repos_dir}/{name}.repo", "w", encoding="utf-8") as repo_file:
parser.write(repo_file, space_around_delimiters=False)
yield repo_configs, root_dir
yield repo_configs, os.fspath(root_dir)
@pytest.fixture(name="cache_dir", scope="session")
@ -252,10 +257,10 @@ def cache_dir_fixture(tmpdir_factory):
@pytest.mark.parametrize("test_case", test_cases)
def test_depsolve(repo_servers, test_case, cache_dir):
def test_depsolve(tmp_path, repo_servers, test_case, cache_dir):
pks = test_case["packages"]
for repo_configs, root_dir in config_combos(repo_servers):
for repo_configs, root_dir in config_combos(tmp_path, repo_servers):
res = depsolve(pks, repo_configs, root_dir, cache_dir, "./tools/osbuild-depsolve-dnf")
assert {pkg["name"] for pkg in res["packages"]} == test_case["results"]["packages"]
assert res["repos"].keys() == test_case["results"]["reponames"]
@ -265,10 +270,10 @@ def test_depsolve(repo_servers, test_case, cache_dir):
@pytest.mark.skipif(not has_dnf5(), reason="libdnf5 not available")
@pytest.mark.parametrize("test_case", test_cases)
def test_depsolve_dnf5(repo_servers, test_case, cache_dir):
def test_depsolve_dnf5(tmp_path, repo_servers, test_case, cache_dir):
pks = test_case["packages"]
for repo_configs, repos_dir in config_combos(repo_servers):
for repo_configs, repos_dir in config_combos(tmp_path, repo_servers):
res = depsolve(pks, repo_configs, repos_dir, cache_dir, "./tools/osbuild-depsolve-dnf5")
assert {pkg["name"] for pkg in res["packages"]} == test_case["results"]["packages"]
assert res["repos"].keys() == test_case["results"]["reponames"]