stages: tweak the skopeo copy test

Small tweaks to the skopeo copy stage integration test to split
the tests into their own test-cases.
This commit is contained in:
Michael Vogt 2024-04-26 12:21:08 +02:00 committed by Achilleas Koutsou
parent 2269554829
commit bd8f361851

View file

@ -2,6 +2,7 @@
import json
import os
import subprocess
import tarfile
import pytest
@ -46,14 +47,7 @@ def test_schema_validation_skopeo(stage_schema, test_data, expected_err):
testutil.assert_jsonschema_error_contains(res, expected_err, expected_num_errs=1)
@pytest.mark.skipif(os.getuid() != 0, reason="needs root")
@pytest.mark.skipif(not has_executable("podman"), reason="no podman executable")
@pytest.mark.parametrize("dest_type,local_output_path", [
("dir", "/tmp/test-output-skopeo-dir"),
("oci-archive", "/tmp/test-output-skopeo.tar"),
("oci", "/tmp/test-output-skopeo-dir"),
])
def test_skopeo_copy(tmp_path, stage_module, dest_type, local_output_path):
def make_fake_oci_archive(tmp_path):
with make_container(tmp_path, {"file1": "file1 from final layer"}) as cont_tag:
# export for the container-deploy stage
fake_container_dst = tmp_path / "fake-container"
@ -63,65 +57,81 @@ def test_skopeo_copy(tmp_path, stage_module, dest_type, local_output_path):
f"--output={fake_container_dst}",
cont_tag,
])
return fake_container_dst
inputs = {
def make_skopeo_copy_inputs(fake_oci_path, name):
return {
"images": {
# seems to be unused with fake_container_path?
"path": fake_container_dst,
"path": fake_oci_path,
"data": {
"archives": {
fake_container_dst: {
fake_oci_path: {
"format": "oci-archive",
"name": cont_tag,
"name": name,
},
},
},
},
}
def assert_manifest_file(manifest_file):
assert manifest_file.exists()
data = json.loads(manifest_file.read_bytes())
assert data.get("config") is not None, \
"'manifest.json' seems corrupt - no 'config' section found"
assert data["config"].get("digest") is not None, \
"'manifest.json' seems corrupt - no 'config.digest' section found"
def _test_skopeo_copy(tmp_path, stage_module, typ, dest_name):
fake_oci_path = make_fake_oci_archive(tmp_path)
inputs = make_skopeo_copy_inputs(fake_oci_path, "some-name")
output_dir = tmp_path / "output"
local_path = f"/some/{dest_name}"
options = {
"destination": {
"type": dest_type,
"path": local_output_path
"type": typ,
"path": local_path,
}
}
stage_module.main(inputs, output_dir, options)
assert output_dir.exists()
result = output_dir / local_output_path.lstrip("/")
result = output_dir / local_path.lstrip("/")
assert result.exists()
return result
if dest_type == "dir":
assert (result / "version").exists()
manifest_file = (result / "manifest.json")
elif dest_type == "oci-archive":
# TBD extract TAR and check content
assert result.exists()
elif dest_type == "oci":
assert (result / "oci-layout").exists()
@pytest.mark.skipif(os.getuid() != 0, reason="needs root")
@pytest.mark.skipif(not has_executable("podman"), reason="no podman executable")
def test_skopeo_copy_dir(tmp_path, stage_module):
result_path = _test_skopeo_copy(tmp_path, stage_module, "dir", "skopeo-dir")
assert (result_path / "version").exists()
assert_manifest_file(result_path / "manifest.json")
index_file = (result / "index.json")
assert index_file.exists()
data = json.loads(index_file.read_bytes())
@pytest.mark.skipif(os.getuid() != 0, reason="needs root")
@pytest.mark.skipif(not has_executable("podman"), reason="no podman executable")
def test_skopeo_copy_oci_archive(tmp_path, stage_module):
result_path = _test_skopeo_copy(tmp_path, stage_module, "oci-archive", "skopeo-archive.tar")
assert result_path.exists()
with tarfile.open(result_path) as tf:
assert tf
assert data.get("manifests") is not None, "'index.json' seems corrupt - no 'manifests' section found"
assert data["manifests"][0].get("digest") is not None, \
"'manifest.json' seems corrupt - no 'manifests[0].digest' section found"
data_digest = data["manifests"][0]["digest"].split(':')
manifest_file = result / "blobs" / data_digest[0] / data_digest[1]
@pytest.mark.skipif(os.getuid() != 0, reason="needs root")
@pytest.mark.skipif(not has_executable("podman"), reason="no podman executable")
def test_skopeo_copy_oci(tmp_path, stage_module):
result_path = _test_skopeo_copy(tmp_path, stage_module, "oci", "skopeo-archive-oci")
index_file = (result_path / "index.json")
assert index_file.exists()
if dest_type in ["dir", "oci"]:
assert manifest_file.exists()
data = json.loads(manifest_file.read_bytes())
assert data.get("config") is not None, "'manifest.json' seems corrupt - no 'config' section found"
assert data["config"].get("digest") is not None, \
"'manifest.json' seems corrupt - no 'config.digest' section found"
data = json.loads(index_file.read_bytes())
assert data.get("manifests") is not None, "'index.json' seems corrupt - no 'manifests' section found"
assert data["manifests"][0].get("digest") is not None, \
"'manifest.json' seems corrupt - no 'manifests[0].digest' section found"
data_digest = data["manifests"][0]["digest"].split(':')
manifest_file = result_path / "blobs" / data_digest[0] / data_digest[1]
assert_manifest_file(manifest_file)