diff --git a/osbuild/sources.py b/osbuild/sources.py index 7c572a6b..115c9b57 100644 --- a/osbuild/sources.py +++ b/osbuild/sources.py @@ -3,7 +3,9 @@ import contextlib import os import json import tempfile +import concurrent.futures from abc import abstractmethod +from typing import Dict, Tuple from . import host from .objectstore import ObjectStore @@ -51,6 +53,8 @@ class Source: class SourceService(host.Service): """Source host service""" + max_workers = 1 + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.cache = None @@ -58,8 +62,24 @@ class SourceService(host.Service): self.tmpdir = None @abc.abstractmethod - def download(self, items): - pass + def fetch_one(self, checksum, desc) -> None: + """Performs the actual fetch of an element described by its checksum and its descriptor""" + + def exists(self, checksum, _desc) -> bool: + """Returns True if the item to download is in cache. """ + return os.path.isfile(f"{self.cache}/{checksum}") + + # pylint: disable=[no-self-use] + def transform(self, checksum, desc) -> Tuple: + """Modify the input data before downloading. By default only transforms an item object to a Tuple.""" + return checksum, desc + + def download(self, items: Dict) -> None: + items = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache + items = map(lambda i: self.transform(i[0], i[1]), items) # prepare each item to be downloaded + with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: + for _ in executor.map(self.fetch_one, *zip(*items)): + pass @property @classmethod diff --git a/sources/org.osbuild.curl b/sources/org.osbuild.curl index daf05510..6a7dfadc 100755 --- a/sources/org.osbuild.curl +++ b/sources/org.osbuild.curl @@ -13,8 +13,6 @@ up the download. 
""" -import concurrent.futures -import itertools import os import subprocess import sys @@ -83,92 +81,74 @@ SCHEMA = """ """ -def fetch(url, checksum, directory): - secrets = url.get("secrets") - url_path = url.get("url") - # Download to a temporary directory until we have verified the checksum. Use a - # subdirectory, so we avoid copying across block devices. - with tempfile.TemporaryDirectory(prefix="osbuild-unverified-file-", dir=directory) as tmpdir: - # some mirrors are sometimes broken. retry manually, because we could be - # redirected to a different, working, one on retry. - return_code = 0 - for _ in range(10): - curl_command = [ - "curl", - "--silent", - "--speed-limit", "1000", - "--connect-timeout", "30", - "--fail", - "--location", - "--output", checksum, - ] - if secrets: - if secrets.get('ssl_ca_cert'): - curl_command.extend(["--cacert", secrets.get('ssl_ca_cert')]) - if secrets.get('ssl_client_cert'): - curl_command.extend(["--cert", secrets.get('ssl_client_cert')]) - if secrets.get('ssl_client_key'): - curl_command.extend(["--key", secrets.get('ssl_client_key')]) - # url must follow options - curl_command.append(url_path) - - curl = subprocess.run(curl_command, encoding="utf-8", cwd=tmpdir, check=False) - return_code = curl.returncode - if return_code == 0: - break - else: - raise RuntimeError(f"curl: error downloading {url}: error code {return_code}") - - if not verify_file(f"{tmpdir}/{checksum}", checksum): - raise RuntimeError(f"checksum mismatch: {checksum} {url}") - - # The checksum has been verified, move the file into place. in case we race - # another download of the same file, we simply ignore the error as their - # contents are guaranteed to be the same. 
- try: - os.rename(f"{tmpdir}/{checksum}", f"{directory}/{checksum}") - except FileExistsError: - pass - - -def download(items, cache): - with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: - requested_urls = [] - requested_checksums = [] - subscriptions = None - - for (checksum, url) in items.items(): - - # Invariant: all files in @directory must be named after their (verified) checksum. - # Check this before secrets so that if everything is pre-downloaded we don't need secrets - if os.path.isfile(f"{cache}/{checksum}"): - continue - - if not isinstance(url, dict): - url = {"url": url} - - # check if url needs rhsm secrets - if url.get("secrets", {}).get("name") == "org.osbuild.rhsm": - # rhsm secrets only need to be retrieved once and can then be reused - if subscriptions is None: - subscriptions = Subscriptions.from_host_system() - url["secrets"] = subscriptions.get_secrets(url.get("url")) - - requested_urls.append(url) - requested_checksums.append(checksum) - - results = executor.map(fetch, requested_urls, requested_checksums, itertools.repeat(cache)) - - for _ in results: - pass - - class CurlSource(sources.SourceService): content_type = "org.osbuild.files" + max_workers = 4 - def download(self, items): - download(items, self.cache) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.subscriptions = None + + def transform(self, checksum, desc): + url = desc + if not isinstance(url, dict): + url = {"url": url} + + # check if url needs rhsm secrets + if url.get("secrets", {}).get("name") == "org.osbuild.rhsm": + # rhsm secrets only need to be retrieved once and can then be reused + if self.subscriptions is None: + self.subscriptions = Subscriptions.from_host_system() + url["secrets"] = self.subscriptions.get_secrets(url.get("url")) + return checksum, url + + def fetch_one(self, checksum, desc): + secrets = desc.get("secrets") + url = desc.get("url") + # Download to a temporary sub cache until we have verified the 
checksum. Use a + # subdirectory, so we avoid copying across block devices. + with tempfile.TemporaryDirectory(prefix="osbuild-unverified-file-", dir=self.cache) as tmpdir: + # some mirrors are sometimes broken. retry manually, because we could be + # redirected to a different, working, one on retry. + return_code = 0 + for _ in range(10): + curl_command = [ + "curl", + "--silent", + "--speed-limit", "1000", + "--connect-timeout", "30", + "--fail", + "--location", + "--output", checksum, + ] + if secrets: + if secrets.get('ssl_ca_cert'): + curl_command.extend(["--cacert", secrets.get('ssl_ca_cert')]) + if secrets.get('ssl_client_cert'): + curl_command.extend(["--cert", secrets.get('ssl_client_cert')]) + if secrets.get('ssl_client_key'): + curl_command.extend(["--key", secrets.get('ssl_client_key')]) + # url must follow options + curl_command.append(url) + + curl = subprocess.run(curl_command, encoding="utf-8", cwd=tmpdir, check=False) + return_code = curl.returncode + if return_code == 0: + break + else: + raise RuntimeError(f"curl: error downloading {url}: error code {return_code}") + + if not verify_file(f"{tmpdir}/{checksum}", checksum): + raise RuntimeError(f"checksum mismatch: {checksum} {url}") + + # The checksum has been verified, move the file into place. in case we race + # another download of the same file, we simply ignore the error as their + # contents are guaranteed to be the same. 
+ try: + os.rename(f"{tmpdir}/{checksum}", f"{self.cache}/{checksum}") + except FileExistsError: + pass def main(): diff --git a/sources/org.osbuild.inline b/sources/org.osbuild.inline index e13116f0..3ebc3864 100755 --- a/sources/org.osbuild.inline +++ b/sources/org.osbuild.inline @@ -15,9 +15,6 @@ import base64 import contextlib import os import sys -import tempfile - -from typing import Dict from osbuild import sources from osbuild.util.checksum import verify_file @@ -56,15 +53,18 @@ SCHEMA = """ """ -def process(items: Dict, cache: str, tmpdir): - for checksum, item in items.items(): - target = os.path.join(cache, checksum) - floating = os.path.join(tmpdir, checksum) +class InlineSource(sources.SourceService): + + content_type = "org.osbuild.files" + + def fetch_one(self, checksum, desc): + target = os.path.join(self.cache, checksum) + floating = os.path.join(self.tmpdir, checksum) if os.path.isfile(target): return - data = base64.b64decode(item["data"]) + data = base64.b64decode(desc["data"]) # Write the bits to disk and then verify the checksum # This ensures that 1) the data is ok and that 2) we @@ -73,20 +73,12 @@ def process(items: Dict, cache: str, tmpdir): f.write(data) if not verify_file(floating, checksum): - raise RuntimeError("Checksum mismatch for {}".format(checksum)) + raise RuntimeError(f"Checksum mismatch for {checksum}") with contextlib.suppress(FileExistsError): os.rename(floating, target) -class InlineSource(sources.SourceService): - - content_type = "org.osbuild.files" - - def download(self, items): - process(items, self.cache, self.tmpdir) - - def main(): service = InlineSource.from_args(sys.argv[1:]) service.main() diff --git a/sources/org.osbuild.ostree b/sources/org.osbuild.ostree index 099781ba..fddb914a 100755 --- a/sources/org.osbuild.ostree +++ b/sources/org.osbuild.ostree @@ -74,18 +74,17 @@ def ostree(*args, _input=None, **kwargs): check=True) -def download(items, cache): - # Prepare the cache and the output repo - 
repo_cache = os.path.join(cache, "repo") - ostree("init", mode="archive", repo=repo_cache) +class OSTreeSource(sources.SourceService): - # Make sure the cache repository uses locks to protect the metadata during - # shared access. This is the default since `2018.5`, but lets document this - # explicitly here. - ostree("config", "set", "repo.locking", "true", repo=repo_cache) + content_type = "org.osbuild.ostree" - for commit, item in items.items(): - remote = item["remote"] + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.repo = None + + def fetch_one(self, checksum, desc): + commit = checksum + remote = desc["remote"] url = remote["url"] gpg = remote.get("gpgkeys", []) uid = str(uuid.uuid4()) @@ -97,28 +96,34 @@ def download(items, cache): ostree("remote", "add", uid, url, *verify_args, - repo=repo_cache) + repo=self.repo) for key in gpg: ostree("remote", "gpg-import", "--stdin", uid, - repo=repo_cache, _input=key) + repo=self.repo, _input=key) # Transfer the commit: remote → cache print(f"pulling {commit}", file=sys.stderr) - ostree("pull", uid, commit, repo=repo_cache) + ostree("pull", uid, commit, repo=self.repo) # Remove the temporary remotes again ostree("remote", "delete", uid, - repo=repo_cache) + repo=self.repo) + def setup(self, args): + super().setup(args) + # Prepare the cache and the output repo + self.repo = os.path.join(self.cache, "repo") + ostree("init", mode="archive", repo=self.repo) -class OSTreeSource(sources.SourceService): + # Make sure the cache repository uses locks to protect the metadata during + # shared access. This is the default since `2018.5`, but lets document this + # explicitly here. 
+ ostree("config", "set", "repo.locking", "true", repo=self.repo) - content_type = "org.osbuild.ostree" - - def download(self, items): - - download(items, self.cache) + # pylint: disable=[no-self-use] + def exists(self, _checksum, _item): + return False def main(): diff --git a/sources/org.osbuild.skopeo b/sources/org.osbuild.skopeo index a61e9f65..72c2fef7 100755 --- a/sources/org.osbuild.skopeo +++ b/sources/org.osbuild.skopeo @@ -64,18 +64,18 @@ SCHEMA = """ """ -def download(items, cache): - for image_id, item in items.items(): - image = item["image"] +class SkopeoSource(sources.SourceService): + + content_type = "org.osbuild.containers" + + def fetch_one(self, checksum, desc): + image_id = checksum + image = desc["image"] imagename = image["name"] digest = image["digest"] tls_verify = image.get("tls-verify", True) - path = f"{cache}/{image_id}" - if os.path.isdir(path): - continue - - with tempfile.TemporaryDirectory(prefix="tmp-download-", dir=cache) as tmpdir: + with tempfile.TemporaryDirectory(prefix="tmp-download-", dir=self.cache) as tmpdir: archive_path = os.path.join(tmpdir, "container-image.tar") source = f"docker://{imagename}@{digest}" @@ -113,16 +113,7 @@ def download(items, cache): # Atomically move download dir into place on successful download os.chmod(tmpdir, 0o755) with ctx.suppress_oserror(errno.ENOTEMPTY, errno.EEXIST): - os.rename(tmpdir, path) - - -class SkopeoSource(sources.SourceService): - - content_type = "org.osbuild.containers" - - def download(self, items): - - download(items, self.cache) + os.rename(tmpdir, f"{self.cache}/{image_id}") def main():