sources: move parallelisation into source

This moves the parallelisation decisions into the sources themselves,
making the `download` method abstract inside `osbuild` itself.
Authored by Simon de Vlieger, 2024-01-23 11:45:13 +01:00; committed by Michael Vogt
parent 18e5481ae8
commit 2c42c46c48
6 changed files with 54 additions and 10 deletions
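The sixth changed file is not shown in this excerpt; per the commit message it makes `download` abstract inside `osbuild` itself. As a rough, non-authoritative sketch of what that base class in `osbuild/sources.py` might look like (the names `exists`, `transform`, `fetch_one` and `max_workers` are taken from the diffs below; everything else here is an assumption):

import abc
from typing import Dict


class SourceService(abc.ABC):
    """Sketch only: illustrates an abstract download(); not the actual osbuild code."""

    max_workers = 1  # assumed default; the sources below rely on this attribute

    @abc.abstractmethod
    def download(self, items: Dict) -> None:
        """Fetch every requested item; each source decides how to parallelise."""

    def exists(self, checksum, desc) -> bool:
        """Assumed helper: is the item already in the local cache?"""
        raise NotImplementedError

    def transform(self, checksum, desc):
        """Assumed helper: prepare an item for fetching."""
        return checksum, desc

    def fetch_one(self, checksum, desc) -> None:
        """Fetch a single item; implemented by each concrete source."""
        raise NotImplementedError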

sources/org.osbuild.curl

@@ -12,12 +12,13 @@ an internal cache. Multiple parallel connections are used to speed
up the download.
"""
import concurrent.futures
import os
import subprocess
import sys
import tempfile
import urllib.parse
from typing import Dict

from osbuild import sources
from osbuild.util.checksum import verify_file
@@ -116,6 +117,14 @@ class CurlSource(sources.SourceService):
        quoted = purl._replace(path=path)
        return quoted.geturl()

    def download(self, items: Dict) -> None:
        filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items())  # discards items already in cache
        transformed = map(lambda i: self.transform(i[0], i[1]), filtered)  # prepare each item to be downloaded

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for _ in executor.map(self.fetch_one, *zip(*transformed)):
                pass

    def fetch_one(self, checksum, desc):
        secrets = desc.get("secrets")
        insecure = desc.get("insecure")

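The `download` method added here (and repeated for the other sources below) relies on the same argument-splitting idiom: `zip(*transformed)` turns the stream of `(checksum, desc)` pairs into two parallel sequences, which `executor.map` then feeds to `fetch_one` as its positional arguments. A small self-contained illustration, with made-up checksums and descriptors:

import concurrent.futures


def fetch_one(checksum, desc):
    # stand-in for a source's fetch_one(); just reports its arguments
    return f"{checksum} <- {desc['url']}"


# hypothetical (checksum, desc) pairs, shaped like the items handled above
transformed = [
    ("sha256:aaaa", {"url": "https://example.com/a"}),
    ("sha256:bbbb", {"url": "https://example.com/b"}),
]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # zip(*transformed) yields the checksums and the descriptors as two tuples,
    # so fetch_one(checksum, desc) runs once per pair, on worker threads
    for result in executor.map(fetch_one, *zip(*transformed)):
        print(result)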
sources/org.osbuild.inline

@@ -12,9 +12,11 @@ resource is decoded and written to the store.
import base64
import concurrent.futures
import contextlib
import os
import sys
from typing import Dict

from osbuild import sources
from osbuild.util.checksum import verify_file
@@ -56,6 +58,14 @@ class InlineSource(sources.SourceService):
    content_type = "org.osbuild.files"

    def download(self, items: Dict) -> None:
        filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items())  # discards items already in cache
        transformed = map(lambda i: self.transform(i[0], i[1]), filtered)  # prepare each item to be downloaded

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for _ in executor.map(self.fetch_one, *zip(*transformed)):
                pass

    def fetch_one(self, checksum, desc):
        target = os.path.join(self.cache, checksum)
        floating = os.path.join(self.tmpdir, checksum)

sources/org.osbuild.ostree

@@ -7,9 +7,11 @@ gpg keys are provided via `gpgkeys`.
"""
import concurrent.futures
import os
import sys
import uuid
from typing import Dict

from osbuild import sources
from osbuild.util import ostree
@@ -87,6 +89,14 @@ class OSTreeSource(sources.SourceService):
        super().__init__(*args, **kwargs)
        self.repo = None

    def download(self, items: Dict) -> None:
        filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items())  # discards items already in cache
        transformed = map(lambda i: self.transform(i[0], i[1]), filtered)  # prepare each item to be downloaded

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for _ in executor.map(self.fetch_one, *zip(*transformed)):
                pass

    def fetch_one(self, checksum, desc):
        commit = checksum
        remote = desc["remote"]

sources/org.osbuild.skopeo

@@ -14,12 +14,14 @@ retaining signatures and manifests.
Buildhost commands used: `skopeo`.
"""
import concurrent.futures
import errno
import hashlib
import os
import subprocess
import sys
import tempfile
from typing import Dict

from osbuild import sources
from osbuild.util import ctx
@@ -100,6 +102,14 @@ class SkopeoSource(sources.SourceService):
            return f"containers-storage:{reference}"
        raise RuntimeError("Unrecognized containers transport")

    def download(self, items: Dict) -> None:
        filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items())  # discards items already in cache
        transformed = map(lambda i: self.transform(i[0], i[1]), filtered)  # prepare each item to be downloaded

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for _ in executor.map(self.fetch_one, *zip(*transformed)):
                pass

    def fetch_one(self, checksum, desc):
        image_id = checksum
        image = desc["image"]

sources/org.osbuild.skopeo-index

@@ -6,12 +6,14 @@ The manifest is stored as a single file indexed by its content hash.
Buildhost commands used: `skopeo`.
"""
import concurrent.futures
import errno
import json
import os
import subprocess
import sys
import tempfile
from typing import Dict

from osbuild import sources
from osbuild.util import containers, ctx
@@ -85,6 +87,14 @@ class SkopeoIndexSource(sources.SourceService):
            return f"containers-storage:{reference}"
        raise RuntimeError("Unrecognized containers transport")

    def download(self, items: Dict) -> None:
        filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items())  # discards items already in cache
        transformed = map(lambda i: self.transform(i[0], i[1]), filtered)  # prepare each item to be downloaded

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for _ in executor.map(self.fetch_one, *zip(*transformed)):
                pass

    def fetch_one(self, checksum, desc):
        digest = checksum
        image = desc["image"]