From 52d027393cc4e7dc767c5efa638fa5add967daeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Hozza?= Date: Fri, 14 Feb 2025 09:33:55 +0100 Subject: [PATCH] tools/test/depsolve: factor out test case repo servers filtering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract the code that filters and composes repo servers for a test case into a separate function. This enables reusing it in all places that did the same thing. The problem would get more prominent as we would separate some test scenarios into separate test cases. Signed-off-by: Tomáš Hozza --- tools/test/test_depsolve.py | 95 ++++++++++++++++++++++++++----------- 1 file changed, 68 insertions(+), 27 deletions(-) diff --git a/tools/test/test_depsolve.py b/tools/test/test_depsolve.py index 68a8d5fd..e1a18a7b 100644 --- a/tools/test/test_depsolve.py +++ b/tools/test/test_depsolve.py @@ -1362,6 +1362,65 @@ def config_combos(tmp_path, servers): yield repo_configs, root_dir, opt_metadata +def get_test_case_repo_servers(test_case, repo_servers): + """ + Return a list of repository servers for the test case. + """ + repo_servers_copy = repo_servers.copy() + # filter to only include enabled repositories + repo_servers_copy = [r for r in repo_servers_copy if r["name"] in test_case["enabled_repos"]] + repo_servers_copy.extend(test_case.get("additional_servers", [])) + return repo_servers_copy + + +@pytest.mark.parametrize("test_case,repo_servers,expected", [ + ( + {"enabled_repos": ["baseos", "custom"], "additional_servers": []}, + [{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}], + [{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}] + ), + ( + {"enabled_repos": ["baseos"], "additional_servers": []}, + [{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}], + [{"name": "baseos", "address": "file:///baseos"}] + ), + ( + { + "enabled_repos": ["baseos", "custom"], + "additional_servers": [{"name": "broken", "address": "file:///broken"}] + }, + [{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}], + [ + {"name": "baseos", "address": "file:///baseos"}, + {"name": "custom", "address": "file:///custom"}, + {"name": "broken", "address": "file:///broken"}, + ] + ), + ( + { + "enabled_repos": ["baseos"], "additional_servers": [{"name": "broken", "address": "file:///broken"}] + }, + [{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}], + [ + {"name": "baseos", "address": "file:///baseos"}, + {"name": "broken", "address": "file:///broken"}, + ] + ), + ( + { + "enabled_repos": [], + "additional_servers": [{"name": "broken", "address": "file:///broken"}] + }, + [{"name": "baseos", "address": "file:///baseos"}, {"name": "custom", "address": "file:///custom"}], + [ + {"name": "broken", "address": "file:///broken"}, + ] + ), +]) +def test_get_test_case_repo_servers(test_case, repo_servers, expected): + assert get_test_case_repo_servers(test_case, repo_servers) == expected + + # pylint: disable=too-many-branches @pytest.mark.parametrize("test_case", depsolve_test_cases, ids=tcase_idfn) @pytest.mark.parametrize("with_sbom", [False, True]) @@ -1390,15 +1449,9 @@ def test_depsolve(tmp_path, repo_servers, dnf_config, detect_fn, with_sbom, test transactions = test_case["transactions"] - repo_servers_copy = repo_servers.copy() + tc_repo_servers = get_test_case_repo_servers(test_case, repo_servers) - # filter to only include enabled repositories - repo_servers_copy = [r for r in repo_servers_copy if r["name"] in test_case["enabled_repos"]] - - if "additional_servers" in test_case: - repo_servers_copy.extend(test_case["additional_servers"]) - - for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, repo_servers_copy): + for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, tc_repo_servers): with TemporaryDirectory() as cache_dir: res, exit_code = depsolve( transactions, cache_dir, dnf_config, repo_configs, root_dir, opt_metadata, with_sbom) @@ -1438,7 +1491,7 @@ def test_depsolve(tmp_path, repo_servers, dnf_config, detect_fn, with_sbom, test # *filelists* n_filelist_files = len(glob(f"{cache_dir}/*/repodata/*filelists*")) if "filelists" in opt_metadata: - assert n_filelist_files == len(repo_servers_copy) + assert n_filelist_files == len(tc_repo_servers) else: assert n_filelist_files == 0 @@ -1461,15 +1514,9 @@ def test_dump(tmp_path, repo_servers, dnf_config, detect_fn, test_case): except RuntimeError as e: pytest.skip(str(e)) - repo_servers_copy = repo_servers.copy() + tc_repo_servers = get_test_case_repo_servers(test_case, repo_servers) - # filter to only include enabled repositories - repo_servers_copy = [r for r in repo_servers_copy if r["name"] in test_case["enabled_repos"]] - - if "additional_servers" in test_case: - repo_servers_copy.extend(test_case["additional_servers"]) - - for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, repo_servers_copy): + for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, tc_repo_servers): with TemporaryDirectory() as cache_dir: res, exit_code = dump(cache_dir, dnf_config, repo_configs, root_dir, opt_metadata) @@ -1495,7 +1542,7 @@ def test_dump(tmp_path, repo_servers, dnf_config, detect_fn, test_case): # *filelists* n_filelist_files = len(glob(f"{cache_dir}/*/repodata/*filelists*")) if "filelists" in opt_metadata: - assert n_filelist_files == len(repo_servers_copy) + assert n_filelist_files == len(tc_repo_servers) else: assert n_filelist_files == 0 @@ -1512,17 +1559,11 @@ def test_search(tmp_path, repo_servers, dnf_config, detect_fn, test_case): except RuntimeError as e: pytest.skip(str(e)) - repo_servers_copy = repo_servers.copy() - - # filter to only include enabled repositories - repo_servers_copy = [r for r in repo_servers_copy if r["name"] in test_case["enabled_repos"]] - - if "additional_servers" in test_case: - repo_servers_copy.extend(test_case["additional_servers"]) + tc_repo_servers = get_test_case_repo_servers(test_case, repo_servers) search_args = test_case["search_args"] - for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, repo_servers_copy): + for repo_configs, root_dir, opt_metadata in config_combos(tmp_path, tc_repo_servers): with TemporaryDirectory() as cache_dir: res, exit_code = search(search_args, cache_dir, dnf_config, repo_configs, root_dir, opt_metadata) @@ -1539,7 +1580,7 @@ def test_search(tmp_path, repo_servers, dnf_config, detect_fn, test_case): # *filelists* n_filelist_files = len(glob(f"{cache_dir}/*/repodata/*filelists*")) if "filelists" in opt_metadata: - assert n_filelist_files == len(repo_servers_copy) + assert n_filelist_files == len(tc_repo_servers) else: assert n_filelist_files == 0