tools/test/depsolve: don't always pass repos, root_dir and opt_metadata
Modify the `config_combos()` to return `repo_configs` and `root_dir` only if it should be really used. Otherwise, return `None`. Modify all helper functions for dnf-depsolve API calls to add relevant fields to the request JSON, only if the relevant values are set. This makes the test cleaner, since previously, the `root_dir` was always set. The same applies to `dnf_config`, which could be set to `None` already, so let's make it optional. Signed-off-by: Tomáš Hozza <thozza@redhat.com>
This commit is contained in:
parent
a76d3e406b
commit
e635a8a067
1 changed files with 77 additions and 46 deletions
|
|
@ -40,7 +40,11 @@ def assert_dnf():
|
|||
raise RuntimeError("Cannot import dnf")
|
||||
|
||||
|
||||
def depsolve(transactions, repos, root_dir, cache_dir, dnf_config, opt_metadata, with_sbom=False) -> Tuple[dict, int]:
|
||||
def depsolve(transactions, cache_dir, dnf_config=None, repos=None, root_dir=None,
|
||||
opt_metadata=None, with_sbom=False) -> Tuple[dict, int]:
|
||||
if not repos and not root_dir:
|
||||
raise ValueError("At least one of 'repos' or 'root_dir' must be specified")
|
||||
|
||||
req = {
|
||||
"command": "depsolve",
|
||||
"arch": ARCH,
|
||||
|
|
@ -48,13 +52,19 @@ def depsolve(transactions, repos, root_dir, cache_dir, dnf_config, opt_metadata,
|
|||
"releasever": RELEASEVER,
|
||||
"cachedir": cache_dir,
|
||||
"arguments": {
|
||||
"root_dir": root_dir,
|
||||
"repos": repos,
|
||||
"optional-metadata": opt_metadata,
|
||||
"transactions": transactions,
|
||||
}
|
||||
}
|
||||
|
||||
if repos:
|
||||
req["arguments"]["repos"] = repos
|
||||
|
||||
if root_dir:
|
||||
req["arguments"]["root_dir"] = root_dir
|
||||
|
||||
if opt_metadata:
|
||||
req["arguments"]["optional-metadata"] = opt_metadata
|
||||
|
||||
if with_sbom:
|
||||
req["arguments"]["sbom"] = {"type": "spdx"}
|
||||
|
||||
|
|
@ -72,20 +82,28 @@ def depsolve(transactions, repos, root_dir, cache_dir, dnf_config, opt_metadata,
|
|||
return json.loads(p.stdout), p.returncode
|
||||
|
||||
|
||||
def dump(repos, root_dir, cache_dir, dnf_config, opt_metadata) -> Tuple[dict, int]:
|
||||
def dump(cache_dir, dnf_config, repos=None, root_dir=None, opt_metadata=None) -> Tuple[dict, int]:
|
||||
if not repos and not root_dir:
|
||||
raise ValueError("At least one of 'repos' or 'root_dir' must be specified")
|
||||
|
||||
req = {
|
||||
"command": "dump",
|
||||
"arch": ARCH,
|
||||
"module_platform_id": f"platform:el{RELEASEVER}",
|
||||
"releasever": RELEASEVER,
|
||||
"cachedir": cache_dir,
|
||||
"arguments": {
|
||||
"root_dir": root_dir,
|
||||
"repos": repos,
|
||||
"optional-metadata": opt_metadata,
|
||||
}
|
||||
"arguments": {}
|
||||
}
|
||||
|
||||
if repos:
|
||||
req["arguments"]["repos"] = repos
|
||||
|
||||
if root_dir:
|
||||
req["arguments"]["root_dir"] = root_dir
|
||||
|
||||
if opt_metadata:
|
||||
req["arguments"]["optional-metadata"] = opt_metadata
|
||||
|
||||
# If there is a config file, write it to a temporary file and pass it to the depsolver
|
||||
with TemporaryDirectory() as cfg_dir:
|
||||
env = None
|
||||
|
|
@ -100,7 +118,10 @@ def dump(repos, root_dir, cache_dir, dnf_config, opt_metadata) -> Tuple[dict, in
|
|||
return json.loads(p.stdout), p.returncode
|
||||
|
||||
|
||||
def search(search_args, repos, root_dir, cache_dir, dnf_config, opt_metadata) -> Tuple[dict, int]:
|
||||
def search(search_args, cache_dir, dnf_config, repos=None, root_dir=None, opt_metadata=None) -> Tuple[dict, int]:
|
||||
if not repos and not root_dir:
|
||||
raise ValueError("At least one of 'repos' or 'root_dir' must be specified")
|
||||
|
||||
req = {
|
||||
"command": "search",
|
||||
"arch": ARCH,
|
||||
|
|
@ -109,12 +130,18 @@ def search(search_args, repos, root_dir, cache_dir, dnf_config, opt_metadata) ->
|
|||
"cachedir": cache_dir,
|
||||
"arguments": {
|
||||
"search": search_args,
|
||||
"root_dir": root_dir,
|
||||
"repos": repos,
|
||||
"optional-metadata": opt_metadata,
|
||||
}
|
||||
}
|
||||
|
||||
if repos:
|
||||
req["arguments"]["repos"] = repos
|
||||
|
||||
if root_dir:
|
||||
req["arguments"]["root_dir"] = root_dir
|
||||
|
||||
if opt_metadata:
|
||||
req["arguments"]["optional-metadata"] = opt_metadata
|
||||
|
||||
# If there is a config file, write it to a temporary file and pass it to the depsolver
|
||||
with TemporaryDirectory() as cfg_dir:
|
||||
env = None
|
||||
|
|
@ -1296,38 +1323,43 @@ def config_combos(tmp_path, servers):
|
|||
configs in the depsolve request, or a combination of both.
|
||||
"""
|
||||
for combo in gen_config_combos(len(servers)):
|
||||
repo_configs = []
|
||||
for idx in combo[0]: # servers to be configured through request
|
||||
server = servers[idx]
|
||||
repo_configs.append(gen_repo_config(server))
|
||||
repo_configs = None
|
||||
if len(combo[0]):
|
||||
repo_configs = []
|
||||
for idx in combo[0]: # servers to be configured through request
|
||||
server = servers[idx]
|
||||
repo_configs.append(gen_repo_config(server))
|
||||
|
||||
root_dir, repos_dir, keys_dir = make_dnf_scafolding(tmp_path)
|
||||
for idx in combo[1]: # servers to be configured through root_dir
|
||||
server = servers[idx]
|
||||
name = server["name"]
|
||||
# Use the gpgkey to test both the key reading and the variable substitution.
|
||||
# For this test, it doesn't need to be a real key.
|
||||
key_url = f"file:///etc/pki/rpm-gpg/RPM-GPG-KEY-$releasever-$basearch-$customvar-{name}"
|
||||
root_dir = None
|
||||
if len(combo[1]):
|
||||
root_dir, repos_dir, keys_dir = make_dnf_scafolding(tmp_path)
|
||||
for idx in combo[1]: # servers to be configured through root_dir
|
||||
server = servers[idx]
|
||||
name = server["name"]
|
||||
# Use the gpgkey to test both the key reading and the variable substitution.
|
||||
# For this test, it doesn't need to be a real key.
|
||||
key_url = f"file:///etc/pki/rpm-gpg/RPM-GPG-KEY-$releasever-$basearch-$customvar-{name}"
|
||||
|
||||
key_path = keys_dir / f"RPM-GPG-KEY-{RELEASEVER}-{ARCH}-{CUSTOMVAR}-{name}"
|
||||
key_path.write_text(TEST_KEY + name, encoding="utf8")
|
||||
parser = configparser.ConfigParser()
|
||||
parser.add_section(name)
|
||||
# Set some options in a specific order in which they tend to be
|
||||
# written in repo files.
|
||||
parser.set(name, "name", name)
|
||||
parser.set(name, "baseurl", server["address"])
|
||||
parser.set(name, "enabled", "1")
|
||||
parser.set(name, "gpgcheck", "1")
|
||||
parser.set(name, "sslverify", "0")
|
||||
parser.set(name, "gpgkey", key_url)
|
||||
key_path = keys_dir / f"RPM-GPG-KEY-{RELEASEVER}-{ARCH}-{CUSTOMVAR}-{name}"
|
||||
key_path.write_text(TEST_KEY + name, encoding="utf8")
|
||||
parser = configparser.ConfigParser()
|
||||
parser.add_section(name)
|
||||
# Set some options in a specific order in which they tend to be
|
||||
# written in repo files.
|
||||
parser.set(name, "name", name)
|
||||
parser.set(name, "baseurl", server["address"])
|
||||
parser.set(name, "enabled", "1")
|
||||
parser.set(name, "gpgcheck", "1")
|
||||
parser.set(name, "sslverify", "0")
|
||||
parser.set(name, "gpgkey", key_url)
|
||||
|
||||
with (repos_dir / f"{name}.repo").open("w", encoding="utf-8") as fp:
|
||||
parser.write(fp, space_around_delimiters=False)
|
||||
with (repos_dir / f"{name}.repo").open("w", encoding="utf-8") as fp:
|
||||
parser.write(fp, space_around_delimiters=False)
|
||||
root_dir = os.fspath(root_dir)
|
||||
|
||||
# for each combo, let's also enable or disable filelists (optional-metadata)
|
||||
for opt_metadata in ([], ["filelists"]):
|
||||
yield repo_configs, os.fspath(root_dir), opt_metadata
|
||||
yield repo_configs, root_dir, opt_metadata
|
||||
|
||||
|
||||
# pylint: disable=too-many-branches
|
||||
|
|
@ -1368,8 +1400,8 @@ def test_depsolve(tmp_path, repo_servers, dnf_config, detect_fn, with_sbom, test
|
|||
|
||||
for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, repo_servers_copy):
|
||||
with TemporaryDirectory() as cache_dir:
|
||||
res, exit_code = depsolve(transactions, repo_configs, root_dir,
|
||||
cache_dir, dnf_config, opt_metadata, with_sbom)
|
||||
res, exit_code = depsolve(
|
||||
transactions, cache_dir, dnf_config, repo_configs, root_dir, opt_metadata, with_sbom)
|
||||
|
||||
if test_case.get("error", False):
|
||||
assert exit_code != 0
|
||||
|
|
@ -1439,7 +1471,7 @@ def test_dump(tmp_path, repo_servers, dnf_config, detect_fn, test_case):
|
|||
|
||||
for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, repo_servers_copy):
|
||||
with TemporaryDirectory() as cache_dir:
|
||||
res, exit_code = dump(repo_configs, root_dir, cache_dir, dnf_config, opt_metadata)
|
||||
res, exit_code = dump(cache_dir, dnf_config, repo_configs, root_dir, opt_metadata)
|
||||
|
||||
if test_case.get("error", False):
|
||||
assert exit_code != 0
|
||||
|
|
@ -1492,7 +1524,7 @@ def test_search(tmp_path, repo_servers, dnf_config, detect_fn, test_case):
|
|||
|
||||
for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, repo_servers_copy):
|
||||
with TemporaryDirectory() as cache_dir:
|
||||
res, exit_code = search(search_args, repo_configs, root_dir, cache_dir, dnf_config, opt_metadata)
|
||||
res, exit_code = search(search_args, cache_dir, dnf_config, repo_configs, root_dir, opt_metadata)
|
||||
|
||||
if test_case.get("error", False):
|
||||
assert exit_code != 0
|
||||
|
|
@ -1544,8 +1576,7 @@ def test_depsolve_result_api(tmp_path, repo_servers, dnf_config, detect_fn):
|
|||
opt_metadata = []
|
||||
with_sbom = False
|
||||
|
||||
res, exit_code = depsolve(transactions, repo_configs, root_dir,
|
||||
cache_dir, dnf_config, opt_metadata, with_sbom)
|
||||
res, exit_code = depsolve(transactions, cache_dir, dnf_config, repo_configs, root_dir, opt_metadata, with_sbom)
|
||||
|
||||
assert exit_code == 0
|
||||
# If any of this changes, increase:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue