tools/test/depsolve: factor out test case repo servers filtering
Extract the code that filters and composes repo servers for a test case into a separate function. This enables reusing it in all places that did the same thing. The problem would get more prominent as we would separate some test scenarios into separate test cases. Signed-off-by: Tomáš Hozza <thozza@redhat.com>
This commit is contained in:
parent
3743654178
commit
52d027393c
1 changed files with 68 additions and 27 deletions
|
|
@ -1362,6 +1362,65 @@ def config_combos(tmp_path, servers):
|
|||
yield repo_configs, root_dir, opt_metadata
|
||||
|
||||
|
||||
def get_test_case_repo_servers(test_case, repo_servers):
|
||||
"""
|
||||
Return a list of repository servers for the test case.
|
||||
"""
|
||||
repo_servers_copy = repo_servers.copy()
|
||||
# filter to only include enabled repositories
|
||||
repo_servers_copy = [r for r in repo_servers_copy if r["name"] in test_case["enabled_repos"]]
|
||||
repo_servers_copy.extend(test_case.get("additional_servers", []))
|
||||
return repo_servers_copy
|
||||
|
||||
|
||||
@pytest.mark.parametrize("test_case,repo_servers,expected", [
|
||||
(
|
||||
{"enabled_repos": ["baseos", "custom"], "additional_servers": []},
|
||||
[{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}],
|
||||
[{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}]
|
||||
),
|
||||
(
|
||||
{"enabled_repos": ["baseos"], "additional_servers": []},
|
||||
[{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}],
|
||||
[{"name": "baseos", "address": "file:///baseos"}]
|
||||
),
|
||||
(
|
||||
{
|
||||
"enabled_repos": ["baseos", "custom"],
|
||||
"additional_servers": [{"name": "broken", "address": "file:///broken"}]
|
||||
},
|
||||
[{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}],
|
||||
[
|
||||
{"name": "baseos", "address": "file:///baseos"},
|
||||
{"name": "custom", "address": "file:///custom"},
|
||||
{"name": "broken", "address": "file:///broken"},
|
||||
]
|
||||
),
|
||||
(
|
||||
{
|
||||
"enabled_repos": ["baseos"], "additional_servers": [{"name": "broken", "address": "file:///broken"}]
|
||||
},
|
||||
[{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}],
|
||||
[
|
||||
{"name": "baseos", "address": "file:///baseos"},
|
||||
{"name": "broken", "address": "file:///broken"},
|
||||
]
|
||||
),
|
||||
(
|
||||
{
|
||||
"enabled_repos": [],
|
||||
"additional_servers": [{"name": "broken", "address": "file:///broken"}]
|
||||
},
|
||||
[{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}],
|
||||
[
|
||||
{"name": "broken", "address": "file:///broken"},
|
||||
]
|
||||
),
|
||||
])
|
||||
def test_get_test_case_repo_servers(test_case, repo_servers, expected):
|
||||
assert get_test_case_repo_servers(test_case, repo_servers) == expected
|
||||
|
||||
|
||||
# pylint: disable=too-many-branches
|
||||
@pytest.mark.parametrize("test_case", depsolve_test_cases, ids=tcase_idfn)
|
||||
@pytest.mark.parametrize("with_sbom", [False, True])
|
||||
|
|
@ -1390,15 +1449,9 @@ def test_depsolve(tmp_path, repo_servers, dnf_config, detect_fn, with_sbom, test
|
|||
|
||||
transactions = test_case["transactions"]
|
||||
|
||||
repo_servers_copy = repo_servers.copy()
|
||||
tc_repo_servers = get_test_case_repo_servers(test_case, repo_servers)
|
||||
|
||||
# filter to only include enabled repositories
|
||||
repo_servers_copy = [r for r in repo_servers_copy if r["name"] in test_case["enabled_repos"]]
|
||||
|
||||
if "additional_servers" in test_case:
|
||||
repo_servers_copy.extend(test_case["additional_servers"])
|
||||
|
||||
for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, repo_servers_copy):
|
||||
for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, tc_repo_servers):
|
||||
with TemporaryDirectory() as cache_dir:
|
||||
res, exit_code = depsolve(
|
||||
transactions, cache_dir, dnf_config, repo_configs, root_dir, opt_metadata, with_sbom)
|
||||
|
|
@ -1438,7 +1491,7 @@ def test_depsolve(tmp_path, repo_servers, dnf_config, detect_fn, with_sbom, test
|
|||
# *filelists*
|
||||
n_filelist_files = len(glob(f"{cache_dir}/*/repodata/*filelists*"))
|
||||
if "filelists" in opt_metadata:
|
||||
assert n_filelist_files == len(repo_servers_copy)
|
||||
assert n_filelist_files == len(tc_repo_servers)
|
||||
else:
|
||||
assert n_filelist_files == 0
|
||||
|
||||
|
|
@ -1461,15 +1514,9 @@ def test_dump(tmp_path, repo_servers, dnf_config, detect_fn, test_case):
|
|||
except RuntimeError as e:
|
||||
pytest.skip(str(e))
|
||||
|
||||
repo_servers_copy = repo_servers.copy()
|
||||
tc_repo_servers = get_test_case_repo_servers(test_case, repo_servers)
|
||||
|
||||
# filter to only include enabled repositories
|
||||
repo_servers_copy = [r for r in repo_servers_copy if r["name"] in test_case["enabled_repos"]]
|
||||
|
||||
if "additional_servers" in test_case:
|
||||
repo_servers_copy.extend(test_case["additional_servers"])
|
||||
|
||||
for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, repo_servers_copy):
|
||||
for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, tc_repo_servers):
|
||||
with TemporaryDirectory() as cache_dir:
|
||||
res, exit_code = dump(cache_dir, dnf_config, repo_configs, root_dir, opt_metadata)
|
||||
|
||||
|
|
@ -1495,7 +1542,7 @@ def test_dump(tmp_path, repo_servers, dnf_config, detect_fn, test_case):
|
|||
# *filelists*
|
||||
n_filelist_files = len(glob(f"{cache_dir}/*/repodata/*filelists*"))
|
||||
if "filelists" in opt_metadata:
|
||||
assert n_filelist_files == len(repo_servers_copy)
|
||||
assert n_filelist_files == len(tc_repo_servers)
|
||||
else:
|
||||
assert n_filelist_files == 0
|
||||
|
||||
|
|
@ -1512,17 +1559,11 @@ def test_search(tmp_path, repo_servers, dnf_config, detect_fn, test_case):
|
|||
except RuntimeError as e:
|
||||
pytest.skip(str(e))
|
||||
|
||||
repo_servers_copy = repo_servers.copy()
|
||||
|
||||
# filter to only include enabled repositories
|
||||
repo_servers_copy = [r for r in repo_servers_copy if r["name"] in test_case["enabled_repos"]]
|
||||
|
||||
if "additional_servers" in test_case:
|
||||
repo_servers_copy.extend(test_case["additional_servers"])
|
||||
tc_repo_servers = get_test_case_repo_servers(test_case, repo_servers)
|
||||
|
||||
search_args = test_case["search_args"]
|
||||
|
||||
for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, repo_servers_copy):
|
||||
for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, tc_repo_servers):
|
||||
with TemporaryDirectory() as cache_dir:
|
||||
res, exit_code = search(search_args, cache_dir, dnf_config, repo_configs, root_dir, opt_metadata)
|
||||
|
||||
|
|
@ -1539,7 +1580,7 @@ def test_search(tmp_path, repo_servers, dnf_config, detect_fn, test_case):
|
|||
# *filelists*
|
||||
n_filelist_files = len(glob(f"{cache_dir}/*/repodata/*filelists*"))
|
||||
if "filelists" in opt_metadata:
|
||||
assert n_filelist_files == len(repo_servers_copy)
|
||||
assert n_filelist_files == len(tc_repo_servers)
|
||||
else:
|
||||
assert n_filelist_files == 0
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue