diff --git a/tools/test/test_depsolve.py b/tools/test/test_depsolve.py index 68a8d5fd..e1a18a7b 100644 --- a/tools/test/test_depsolve.py +++ b/tools/test/test_depsolve.py @@ -1362,6 +1362,65 @@ def config_combos(tmp_path, servers): yield repo_configs, root_dir, opt_metadata +def get_test_case_repo_servers(test_case, repo_servers): + """ + Return a list of repository servers for the test case. + """ + repo_servers_copy = repo_servers.copy() + # filter to only include enabled repositories + repo_servers_copy = [r for r in repo_servers_copy if r["name"] in test_case["enabled_repos"]] + repo_servers_copy.extend(test_case.get("additional_servers", [])) + return repo_servers_copy + + +@pytest.mark.parametrize("test_case,repo_servers,expected", [ + ( + {"enabled_repos": ["baseos", "custom"], "additional_servers": []}, + [{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}], + [{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}] + ), + ( + {"enabled_repos": ["baseos"], "additional_servers": []}, + [{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}], + [{"name": "baseos", "address": "file:///baseos"}] + ), + ( + { + "enabled_repos": ["baseos", "custom"], + "additional_servers": [{"name": "broken", "address": "file:///broken"}] + }, + [{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}], + [ + {"name": "baseos", "address": "file:///baseos"}, + {"name": "custom", "address": "file:///custom"}, + {"name": "broken", "address": "file:///broken"}, + ] + ), + ( + { + "enabled_repos": ["baseos"], "additional_servers": [{"name": "broken", "address": "file:///broken"}] + }, + [{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}], + [ + {"name": "baseos", "address": "file:///baseos"}, + {"name": "broken", "address": "file:///broken"}, + ] + ), + ( + { + "enabled_repos": [], + "additional_servers": [{"name": "broken", "address": "file:///broken"}] + }, + [{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}], + [ + {"name": "broken", "address": "file:///broken"}, + ] + ), +]) +def test_get_test_case_repo_servers(test_case, repo_servers, expected): + assert get_test_case_repo_servers(test_case, repo_servers) == expected + + # pylint: disable=too-many-branches @pytest.mark.parametrize("test_case", depsolve_test_cases, ids=tcase_idfn) @pytest.mark.parametrize("with_sbom", [False, True]) @@ -1390,15 +1449,9 @@ def test_depsolve(tmp_path, repo_servers, dnf_config, detect_fn, with_sbom, test transactions = test_case["transactions"] - repo_servers_copy = repo_servers.copy() + tc_repo_servers = get_test_case_repo_servers(test_case, repo_servers) - # filter to only include enabled repositories - repo_servers_copy = [r for r in repo_servers_copy if r["name"] in test_case["enabled_repos"]] - - if "additional_servers" in test_case: - repo_servers_copy.extend(test_case["additional_servers"]) - - for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, repo_servers_copy): + for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, tc_repo_servers): with TemporaryDirectory() as cache_dir: res, exit_code = depsolve( transactions, cache_dir, dnf_config, repo_configs, root_dir, opt_metadata, with_sbom) @@ -1438,7 +1491,7 @@ def test_depsolve(tmp_path, repo_servers, dnf_config, detect_fn, with_sbom, test # *filelists* n_filelist_files = len(glob(f"{cache_dir}/*/repodata/*filelists*")) if "filelists" in opt_metadata: - assert n_filelist_files == len(repo_servers_copy) + assert n_filelist_files == len(tc_repo_servers) else: assert n_filelist_files == 0 @@ -1461,15 +1514,9 @@ def test_dump(tmp_path, repo_servers, dnf_config, detect_fn, test_case): except RuntimeError as e: pytest.skip(str(e)) - repo_servers_copy = repo_servers.copy() + tc_repo_servers = get_test_case_repo_servers(test_case, repo_servers) - # filter to only include enabled repositories - repo_servers_copy = [r for r in repo_servers_copy if r["name"] in test_case["enabled_repos"]] - - if "additional_servers" in test_case: - repo_servers_copy.extend(test_case["additional_servers"]) - - for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, repo_servers_copy): + for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, tc_repo_servers): with TemporaryDirectory() as cache_dir: res, exit_code = dump(cache_dir, dnf_config, repo_configs, root_dir, opt_metadata) @@ -1495,7 +1542,7 @@ def test_dump(tmp_path, repo_servers, dnf_config, detect_fn, test_case): # *filelists* n_filelist_files = len(glob(f"{cache_dir}/*/repodata/*filelists*")) if "filelists" in opt_metadata: - assert n_filelist_files == len(repo_servers_copy) + assert n_filelist_files == len(tc_repo_servers) else: assert n_filelist_files == 0 @@ -1512,17 +1559,11 @@ def test_search(tmp_path, repo_servers, dnf_config, detect_fn, test_case): except RuntimeError as e: pytest.skip(str(e)) - repo_servers_copy = repo_servers.copy() - - # filter to only include enabled repositories - repo_servers_copy = [r for r in repo_servers_copy if r["name"] in test_case["enabled_repos"]] - - if "additional_servers" in test_case: - repo_servers_copy.extend(test_case["additional_servers"]) + tc_repo_servers = get_test_case_repo_servers(test_case, repo_servers) search_args = test_case["search_args"] - for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, repo_servers_copy): + for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, tc_repo_servers): with TemporaryDirectory() as cache_dir: res, exit_code = search(search_args, cache_dir, dnf_config, repo_configs, root_dir, opt_metadata) @@ -1539,7 +1580,7 @@ def test_search(tmp_path, repo_servers, dnf_config, detect_fn, test_case): # *filelists* n_filelist_files = len(glob(f"{cache_dir}/*/repodata/*filelists*")) if "filelists" in opt_metadata: - assert n_filelist_files == len(repo_servers_copy) + assert n_filelist_files == len(tc_repo_servers) else: assert n_filelist_files == 0