diff --git a/osbuild/sources.py b/osbuild/sources.py index fadb58d4..bbeab1b3 100644 --- a/osbuild/sources.py +++ b/osbuild/sources.py @@ -1,5 +1,4 @@ import abc -import concurrent.futures import contextlib import json import os @@ -67,6 +66,10 @@ class SourceService(host.Service): def fetch_one(self, checksum, desc) -> None: """Performs the actual fetch of an element described by its checksum and its descriptor""" + @abc.abstractmethod + def download(self, items: Dict) -> None: + """Download all sources.""" + def exists(self, checksum, _desc) -> bool: """Returns True if the item to download is in cache. """ return os.path.isfile(f"{self.cache}/{checksum}") @@ -76,14 +79,6 @@ class SourceService(host.Service): """Modify the input data before downloading. By default only transforms an item object to a Tupple.""" return checksum, desc - def download(self, items: Dict) -> None: - filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache - transformed = map(lambda i: self.transform(i[0], i[1]), filtered) # prepare each item to be downloaded - - with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: - for _ in executor.map(self.fetch_one, *zip(*transformed)): - pass - @staticmethod def load_items(fds): with os.fdopen(fds.steal(0)) as f: diff --git a/sources/org.osbuild.curl b/sources/org.osbuild.curl index 364df38e..d56bac56 100755 --- a/sources/org.osbuild.curl +++ b/sources/org.osbuild.curl @@ -12,12 +12,13 @@ an internal cache. Multiple parallel connections are used to speed up the download. """ - +import concurrent.futures import os import subprocess import sys import tempfile import urllib.parse +from typing import Dict from osbuild import sources from osbuild.util.checksum import verify_file @@ -116,6 +117,14 @@ class CurlSource(sources.SourceService): quoted = purl._replace(path=path) return quoted.geturl() + def download(self, items: Dict) -> None: + filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache + transformed = map(lambda i: self.transform(i[0], i[1]), filtered) # prepare each item to be downloaded + + with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: + for _ in executor.map(self.fetch_one, *zip(*transformed)): + pass + def fetch_one(self, checksum, desc): secrets = desc.get("secrets") insecure = desc.get("insecure") diff --git a/sources/org.osbuild.inline b/sources/org.osbuild.inline index 85f0f27a..e4e28b8f 100755 --- a/sources/org.osbuild.inline +++ b/sources/org.osbuild.inline @@ -12,9 +12,11 @@ resource is decoded and written to the store. import base64 +import concurrent.futures import contextlib import os import sys +from typing import Dict from osbuild import sources from osbuild.util.checksum import verify_file @@ -56,6 +58,14 @@ class InlineSource(sources.SourceService): content_type = "org.osbuild.files" + def download(self, items: Dict) -> None: + filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache + transformed = map(lambda i: self.transform(i[0], i[1]), filtered) # prepare each item to be downloaded + + with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: + for _ in executor.map(self.fetch_one, *zip(*transformed)): + pass + def fetch_one(self, checksum, desc): target = os.path.join(self.cache, checksum) floating = os.path.join(self.tmpdir, checksum) diff --git a/sources/org.osbuild.ostree b/sources/org.osbuild.ostree index f6b5a378..0adebdd2 100755 --- a/sources/org.osbuild.ostree +++ b/sources/org.osbuild.ostree @@ -7,9 +7,11 @@ gpg keys are provided via `gpgkeys`. """ +import concurrent.futures import os import sys import uuid +from typing import Dict from osbuild import sources from osbuild.util import ostree @@ -87,6 +89,14 @@ class OSTreeSource(sources.SourceService): super().__init__(*args, **kwargs) self.repo = None + def download(self, items: Dict) -> None: + filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache + transformed = map(lambda i: self.transform(i[0], i[1]), filtered) # prepare each item to be downloaded + + with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: + for _ in executor.map(self.fetch_one, *zip(*transformed)): + pass + def fetch_one(self, checksum, desc): commit = checksum remote = desc["remote"] diff --git a/sources/org.osbuild.skopeo b/sources/org.osbuild.skopeo index ec7ce8ce..162eaaf8 100755 --- a/sources/org.osbuild.skopeo +++ b/sources/org.osbuild.skopeo @@ -14,12 +14,14 @@ retaining signatures and manifests. Buildhost commands used: `skopeo`. """ +import concurrent.futures import errno import hashlib import os import subprocess import sys import tempfile +from typing import Dict from osbuild import sources from osbuild.util import ctx @@ -100,6 +102,14 @@ class SkopeoSource(sources.SourceService): return f"containers-storage:{reference}" raise RuntimeError("Unrecognized containers transport") + def download(self, items: Dict) -> None: + filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache + transformed = map(lambda i: self.transform(i[0], i[1]), filtered) # prepare each item to be downloaded + + with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: + for _ in executor.map(self.fetch_one, *zip(*transformed)): + pass + def fetch_one(self, checksum, desc): image_id = checksum image = desc["image"] diff --git a/sources/org.osbuild.skopeo-index b/sources/org.osbuild.skopeo-index index b2ca3062..f00ea5b5 100755 --- a/sources/org.osbuild.skopeo-index +++ b/sources/org.osbuild.skopeo-index @@ -6,12 +6,14 @@ The manifest is stored as a single file indexed by its content hash. Buildhost commands used: `skopeo`. """ +import concurrent.futures import errno import json import os import subprocess import sys import tempfile +from typing import Dict from osbuild import sources from osbuild.util import containers, ctx @@ -85,6 +87,14 @@ class SkopeoIndexSource(sources.SourceService): return f"containers-storage:{reference}" raise RuntimeError("Unrecognized containers transport") + def download(self, items: Dict) -> None: + filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache + transformed = map(lambda i: self.transform(i[0], i[1]), filtered) # prepare each item to be downloaded + + with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: + for _ in executor.map(self.fetch_one, *zip(*transformed)): + pass + def fetch_one(self, checksum, desc): digest = checksum image = desc["image"]