#!/usr/bin/python3
"""Fetch container manifest list from a registry using skopeo

The manifest is stored as a single file indexed by its content hash.

Buildhost commands used: `skopeo`.
"""

import concurrent.futures
import errno
import json
import os
import subprocess
import sys
import tempfile
from typing import Dict

from osbuild import sources
from osbuild.util import containers, ctx

SCHEMA = """
"additionalProperties": false,
"definitions": {
  "item": {
    "description": "The manifest list to fetch",
    "type": "object",
    "additionalProperties": false,
    "patternProperties": {
      "sha256:[0-9a-f]{64}": {
        "type": "object",
        "additionalProperties": false,
        "required": ["image"],
        "properties": {
          "image": {
            "type": "object",
            "additionalProperties": false,
            "required": ["name"],
            "properties": {
              "name": {
                "type": "string",
                "description": "Name of the image (including registry)."
              },
              "tls-verify": {
                "type": "boolean",
                "description": "Require https (default true)."
              },
              "containers-transport": {
                "type": "string",
                "enum": ["docker", "containers-storage"],
                "description": "The containers transport from which to copy the container",
                "default": "docker"
              },
              "storage-location": {
                "type": "string",
                "description": "The location of the local containers storage"
              }
            }
          }
        }
      }
    }
  }
},
"properties": {
  "items": {"$ref": "#/definitions/item"},
  "digests": {"$ref": "#/definitions/item"}
},
"oneOf": [{
  "required": ["items"]
}, {
  "required": ["digests"]
}]
"""

# Values accepted for the "containers-transport" option (see SCHEMA).
DOCKER_TRANSPORT = "docker"
CONTAINERS_STORAGE_TRANSPORT = "containers-storage"


class SkopeoIndexSource(sources.SourceService):
    """Source service that downloads manifest lists with `skopeo`.

    Each requested digest ends up as one file named after that digest
    inside ``self.cache``.
    """

    # Content type identifier for this source.
    content_type = "org.osbuild.files"

    def get_source(self, transport, reference):
        """Return the skopeo source string for *reference*.

        Args:
            transport: DOCKER_TRANSPORT or CONTAINERS_STORAGE_TRANSPORT.
            reference: Image reference appended after the transport prefix.

        Raises:
            RuntimeError: If *transport* is neither of the known values.
        """
        if transport == DOCKER_TRANSPORT:
            prefix = "docker://"
        elif transport == CONTAINERS_STORAGE_TRANSPORT:
            prefix = "containers-storage:"
        else:
            raise RuntimeError("Unrecognized containers transport")
        return f"{prefix}{reference}"

    def fetch_all(self, items: Dict) -> None:
        """Fetch every entry of *items* that `self.exists` does not report.

        Args:
            items: Mapping of checksum to item description.
        """
        # Lazily skip what `self.exists` reports (presumably: already in
        # the cache) and run the remainder through `self.transform`.
        pending = (
            self.transform(checksum, desc)
            for checksum, desc in items.items()
            if not self.exists(checksum, desc)
        )

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            # zip(*pending) transposes the transformed items into one
            # iterable per positional argument of fetch_one. Draining the
            # result iterator re-raises any exception from a worker.
            for _ in pool.map(self.fetch_one, *zip(*pending)):
                pass

    def fetch_one(self, checksum, desc):
        """Download one manifest list, verify it and move it into the cache.

        Args:
            checksum: Expected digest of the manifest list; also used as
                the file name inside ``self.cache``.
            desc: Item description; ``desc["image"]["name"]`` is required,
                "tls-verify", "containers-transport" and "storage-location"
                are optional.

        Raises:
            KeyError: If "image" or its "name" is missing from *desc*.
            RuntimeError: If the transport is unknown, the download is not
                a manifest list, or its digest differs from *checksum*.
            subprocess.CalledProcessError: If a skopeo invocation fails.
        """
        digest = checksum
        image_desc = desc["image"]
        name = image_desc["name"]
        verify_tls = image_desc.get("tls-verify", True)
        transport = image_desc.get("containers-transport", DOCKER_TRANSPORT)
        storage_location = image_desc.get("storage-location", "")

        with tempfile.TemporaryDirectory(prefix="tmp-download-", dir=self.cache) as workdir:
            index_dir = os.path.join(workdir, "index")
            os.makedirs(index_dir)
            # rwxr-xr-x: readable and traversable by everyone.
            os.chmod(index_dir, 0o755)

            # Without an explicit storage-location skopeo falls back to the
            # storage path configured in /etc/containers/storage.conf. See:
            # https://github.com/containers/storage/blob/acbb93bb802702bc171b9987a47a9b713c280d38/types/options.go#L53
            if storage_location == "":
                store_prefix = storage_location
            else:
                store_prefix = f"[overlay@{storage_location}]"

            reference = f"{store_prefix}{name}@{digest}"
            source = self.get_source(transport, reference)
            destination = f"dir:{index_dir}"

            copy_cmd = ["skopeo", "copy", "--multi-arch=index-only"]
            if not verify_tls:
                copy_cmd.append("--src-tls-verify=false")
            copy_cmd += [source, destination]
            subprocess.run(copy_cmd, encoding="utf-8", check=True)

            # The digest must point at a manifest list, so inspect what
            # was actually downloaded.
            raw_manifest = subprocess.check_output(["skopeo", "inspect", "--raw", destination])
            if not containers.is_manifest_list(json.loads(raw_manifest)):
                raise RuntimeError(
                    f"{name}@{digest} is not a manifest-list")

            # Let skopeo compute the digest rather than our own verify
            # utility: it is then right for every manifest type and keeps
            # tracking any future change in how the digest is calculated.
            manifest_path = os.path.join(index_dir, "manifest.json")
            digest_output = subprocess.check_output(["skopeo", "manifest-digest", manifest_path])
            actual_digest = digest_output.decode().strip()
            if actual_digest != checksum:
                raise RuntimeError(
                    f"Downloaded manifest-list {name}@{digest} has a checksum of {actual_digest}, "
                    f"but expected {checksum}"
                )

            # Verified: publish the manifest under its digest. ENOTEMPTY and
            # EEXIST are handed to ctx.suppress_oserror, presumably so an
            # already-present target is not an error (confirm in osbuild.util.ctx).
            downloaded = f"{index_dir}/manifest.json"
            cached = f"{self.cache}/{digest}"
            with ctx.suppress_oserror(errno.ENOTEMPTY, errno.EEXIST):
                os.rename(downloaded, cached)


def main():
    """Build the service from the command-line arguments and run it."""
    SkopeoIndexSource.from_args(sys.argv[1:]).main()


if __name__ == "__main__":
    main()