sources: generalizing download method
Before, the download method was defined in the inherited class of each program. With the same kind of workflow redefined every time. This contribution aims at making the workflow more clear and to generalize what can be in the SourceService class. The download worklow is as follow: Setup -> Filter -> Prepare -> Download The setup mainly step sets up caches. Where the download data will be stored in the end. The filter step is used to discard some of the items to download based on some criterion. By default, it is used to verify if an item is already in the cache using the item's checksum. The Prepare step goes from each element and let the overloading step the ability to alter each item before downloading it. This is used mainly for the curl command which for rhel must generate the subscriptions. Then the download step will call fetch_one for each item. Here the download can be performed sequentially or in parallel depending on the number of workers selected.
This commit is contained in:
parent
0953cf64e0
commit
1de74ce2c9
5 changed files with 129 additions and 141 deletions
|
|
@ -3,7 +3,9 @@ import contextlib
|
|||
import os
|
||||
import json
|
||||
import tempfile
|
||||
import concurrent.futures
|
||||
from abc import abstractmethod
|
||||
from typing import Dict, Tuple
|
||||
|
||||
from . import host
|
||||
from .objectstore import ObjectStore
|
||||
|
|
@ -51,6 +53,8 @@ class Source:
|
|||
class SourceService(host.Service):
|
||||
"""Source host service"""
|
||||
|
||||
max_workers = 1
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.cache = None
|
||||
|
|
@ -58,8 +62,24 @@ class SourceService(host.Service):
|
|||
self.tmpdir = None
|
||||
|
||||
@abc.abstractmethod
|
||||
def download(self, items):
|
||||
pass
|
||||
def fetch_one(self, checksum, desc) -> None:
|
||||
"""Performs the actual fetch of an element described by its checksum and its descriptor"""
|
||||
|
||||
def exists(self, checksum, _desc) -> bool:
|
||||
"""Returns True if the item to download is in cache. """
|
||||
return os.path.isfile(f"{self.cache}/{checksum}")
|
||||
|
||||
# pylint: disable=[no-self-use]
|
||||
def transform(self, checksum, desc) -> Tuple:
|
||||
"""Modify the input data before downloading. By default only transforms an item object to a Tupple."""
|
||||
return checksum, desc
|
||||
|
||||
def download(self, items: Dict) -> None:
|
||||
items = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache
|
||||
items = map(lambda i: self.transform(i[0], i[1]), items) # prepare each item to be downloaded
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
|
||||
for _ in executor.map(self.fetch_one, *zip(*items)):
|
||||
pass
|
||||
|
||||
@property
|
||||
@classmethod
|
||||
|
|
|
|||
|
|
@ -13,8 +13,6 @@ up the download.
|
|||
"""
|
||||
|
||||
|
||||
import concurrent.futures
|
||||
import itertools
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
|
@ -83,92 +81,74 @@ SCHEMA = """
|
|||
"""
|
||||
|
||||
|
||||
def fetch(url, checksum, directory):
|
||||
secrets = url.get("secrets")
|
||||
url_path = url.get("url")
|
||||
# Download to a temporary directory until we have verified the checksum. Use a
|
||||
# subdirectory, so we avoid copying across block devices.
|
||||
with tempfile.TemporaryDirectory(prefix="osbuild-unverified-file-", dir=directory) as tmpdir:
|
||||
# some mirrors are sometimes broken. retry manually, because we could be
|
||||
# redirected to a different, working, one on retry.
|
||||
return_code = 0
|
||||
for _ in range(10):
|
||||
curl_command = [
|
||||
"curl",
|
||||
"--silent",
|
||||
"--speed-limit", "1000",
|
||||
"--connect-timeout", "30",
|
||||
"--fail",
|
||||
"--location",
|
||||
"--output", checksum,
|
||||
]
|
||||
if secrets:
|
||||
if secrets.get('ssl_ca_cert'):
|
||||
curl_command.extend(["--cacert", secrets.get('ssl_ca_cert')])
|
||||
if secrets.get('ssl_client_cert'):
|
||||
curl_command.extend(["--cert", secrets.get('ssl_client_cert')])
|
||||
if secrets.get('ssl_client_key'):
|
||||
curl_command.extend(["--key", secrets.get('ssl_client_key')])
|
||||
# url must follow options
|
||||
curl_command.append(url_path)
|
||||
|
||||
curl = subprocess.run(curl_command, encoding="utf-8", cwd=tmpdir, check=False)
|
||||
return_code = curl.returncode
|
||||
if return_code == 0:
|
||||
break
|
||||
else:
|
||||
raise RuntimeError(f"curl: error downloading {url}: error code {return_code}")
|
||||
|
||||
if not verify_file(f"{tmpdir}/{checksum}", checksum):
|
||||
raise RuntimeError(f"checksum mismatch: {checksum} {url}")
|
||||
|
||||
# The checksum has been verified, move the file into place. in case we race
|
||||
# another download of the same file, we simply ignore the error as their
|
||||
# contents are guaranteed to be the same.
|
||||
try:
|
||||
os.rename(f"{tmpdir}/{checksum}", f"{directory}/{checksum}")
|
||||
except FileExistsError:
|
||||
pass
|
||||
|
||||
|
||||
def download(items, cache):
|
||||
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
|
||||
requested_urls = []
|
||||
requested_checksums = []
|
||||
subscriptions = None
|
||||
|
||||
for (checksum, url) in items.items():
|
||||
|
||||
# Invariant: all files in @directory must be named after their (verified) checksum.
|
||||
# Check this before secrets so that if everything is pre-downloaded we don't need secrets
|
||||
if os.path.isfile(f"{cache}/{checksum}"):
|
||||
continue
|
||||
|
||||
if not isinstance(url, dict):
|
||||
url = {"url": url}
|
||||
|
||||
# check if url needs rhsm secrets
|
||||
if url.get("secrets", {}).get("name") == "org.osbuild.rhsm":
|
||||
# rhsm secrets only need to be retrieved once and can then be reused
|
||||
if subscriptions is None:
|
||||
subscriptions = Subscriptions.from_host_system()
|
||||
url["secrets"] = subscriptions.get_secrets(url.get("url"))
|
||||
|
||||
requested_urls.append(url)
|
||||
requested_checksums.append(checksum)
|
||||
|
||||
results = executor.map(fetch, requested_urls, requested_checksums, itertools.repeat(cache))
|
||||
|
||||
for _ in results:
|
||||
pass
|
||||
|
||||
|
||||
class CurlSource(sources.SourceService):
|
||||
|
||||
content_type = "org.osbuild.files"
|
||||
max_workers = 4
|
||||
|
||||
def download(self, items):
|
||||
download(items, self.cache)
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.subscriptions = None
|
||||
|
||||
def transform(self, checksum, desc):
|
||||
url = desc
|
||||
if not isinstance(url, dict):
|
||||
url = {"url": url}
|
||||
|
||||
# check if url needs rhsm secrets
|
||||
if url.get("secrets", {}).get("name") == "org.osbuild.rhsm":
|
||||
# rhsm secrets only need to be retrieved once and can then be reused
|
||||
if self.subscriptions is None:
|
||||
self.subscriptions = Subscriptions.from_host_system()
|
||||
url["secrets"] = self.subscriptions.get_secrets(url.get("url"))
|
||||
return checksum, url
|
||||
|
||||
def fetch_one(self, checksum, desc):
|
||||
secrets = desc.get("secrets")
|
||||
url = desc.get("url")
|
||||
# Download to a temporary sub cache until we have verified the checksum. Use a
|
||||
# subdirectory, so we avoid copying across block devices.
|
||||
with tempfile.TemporaryDirectory(prefix="osbuild-unverified-file-", dir=self.cache) as tmpdir:
|
||||
# some mirrors are sometimes broken. retry manually, because we could be
|
||||
# redirected to a different, working, one on retry.
|
||||
return_code = 0
|
||||
for _ in range(10):
|
||||
curl_command = [
|
||||
"curl",
|
||||
"--silent",
|
||||
"--speed-limit", "1000",
|
||||
"--connect-timeout", "30",
|
||||
"--fail",
|
||||
"--location",
|
||||
"--output", checksum,
|
||||
]
|
||||
if secrets:
|
||||
if secrets.get('ssl_ca_cert'):
|
||||
curl_command.extend(["--cacert", secrets.get('ssl_ca_cert')])
|
||||
if secrets.get('ssl_client_cert'):
|
||||
curl_command.extend(["--cert", secrets.get('ssl_client_cert')])
|
||||
if secrets.get('ssl_client_key'):
|
||||
curl_command.extend(["--key", secrets.get('ssl_client_key')])
|
||||
# url must follow options
|
||||
curl_command.append(url)
|
||||
|
||||
curl = subprocess.run(curl_command, encoding="utf-8", cwd=tmpdir, check=False)
|
||||
return_code = curl.returncode
|
||||
if return_code == 0:
|
||||
break
|
||||
else:
|
||||
raise RuntimeError(f"curl: error downloading {url}: error code {return_code}")
|
||||
|
||||
if not verify_file(f"{tmpdir}/{checksum}", checksum):
|
||||
raise RuntimeError(f"checksum mismatch: {checksum} {url}")
|
||||
|
||||
# The checksum has been verified, move the file into place. in case we race
|
||||
# another download of the same file, we simply ignore the error as their
|
||||
# contents are guaranteed to be the same.
|
||||
try:
|
||||
os.rename(f"{tmpdir}/{checksum}", f"{self.cache}/{checksum}")
|
||||
except FileExistsError:
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
|
|
|
|||
|
|
@ -15,9 +15,6 @@ import base64
|
|||
import contextlib
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
from typing import Dict
|
||||
|
||||
from osbuild import sources
|
||||
from osbuild.util.checksum import verify_file
|
||||
|
|
@ -56,15 +53,18 @@ SCHEMA = """
|
|||
"""
|
||||
|
||||
|
||||
def process(items: Dict, cache: str, tmpdir):
|
||||
for checksum, item in items.items():
|
||||
target = os.path.join(cache, checksum)
|
||||
floating = os.path.join(tmpdir, checksum)
|
||||
class InlineSource(sources.SourceService):
|
||||
|
||||
content_type = "org.osbuild.files"
|
||||
|
||||
def fetch_one(self, checksum, desc):
|
||||
target = os.path.join(self.cache, checksum)
|
||||
floating = os.path.join(self.tmpdir, checksum)
|
||||
|
||||
if os.path.isfile(target):
|
||||
return
|
||||
|
||||
data = base64.b64decode(item["data"])
|
||||
data = base64.b64decode(desc["data"])
|
||||
|
||||
# Write the bits to disk and then verify the checksum
|
||||
# This ensures that 1) the data is ok and that 2) we
|
||||
|
|
@ -73,20 +73,12 @@ def process(items: Dict, cache: str, tmpdir):
|
|||
f.write(data)
|
||||
|
||||
if not verify_file(floating, checksum):
|
||||
raise RuntimeError("Checksum mismatch for {}".format(checksum))
|
||||
raise RuntimeError(f"Checksum mismatch for {format(checksum)}")
|
||||
|
||||
with contextlib.suppress(FileExistsError):
|
||||
os.rename(floating, target)
|
||||
|
||||
|
||||
class InlineSource(sources.SourceService):
|
||||
|
||||
content_type = "org.osbuild.files"
|
||||
|
||||
def download(self, items):
|
||||
process(items, self.cache, self.tmpdir)
|
||||
|
||||
|
||||
def main():
|
||||
service = InlineSource.from_args(sys.argv[1:])
|
||||
service.main()
|
||||
|
|
|
|||
|
|
@ -74,18 +74,17 @@ def ostree(*args, _input=None, **kwargs):
|
|||
check=True)
|
||||
|
||||
|
||||
def download(items, cache):
|
||||
# Prepare the cache and the output repo
|
||||
repo_cache = os.path.join(cache, "repo")
|
||||
ostree("init", mode="archive", repo=repo_cache)
|
||||
class OSTreeSource(sources.SourceService):
|
||||
|
||||
# Make sure the cache repository uses locks to protect the metadata during
|
||||
# shared access. This is the default since `2018.5`, but lets document this
|
||||
# explicitly here.
|
||||
ostree("config", "set", "repo.locking", "true", repo=repo_cache)
|
||||
content_type = "org.osbuild.ostree"
|
||||
|
||||
for commit, item in items.items():
|
||||
remote = item["remote"]
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.repo = None
|
||||
|
||||
def fetch_one(self, checksum, desc):
|
||||
commit = checksum
|
||||
remote = desc["remote"]
|
||||
url = remote["url"]
|
||||
gpg = remote.get("gpgkeys", [])
|
||||
uid = str(uuid.uuid4())
|
||||
|
|
@ -97,28 +96,34 @@ def download(items, cache):
|
|||
ostree("remote", "add",
|
||||
uid, url,
|
||||
*verify_args,
|
||||
repo=repo_cache)
|
||||
repo=self.repo)
|
||||
|
||||
for key in gpg:
|
||||
ostree("remote", "gpg-import", "--stdin", uid,
|
||||
repo=repo_cache, _input=key)
|
||||
repo=self.repo, _input=key)
|
||||
|
||||
# Transfer the commit: remote → cache
|
||||
print(f"pulling {commit}", file=sys.stderr)
|
||||
ostree("pull", uid, commit, repo=repo_cache)
|
||||
ostree("pull", uid, commit, repo=self.repo)
|
||||
|
||||
# Remove the temporary remotes again
|
||||
ostree("remote", "delete", uid,
|
||||
repo=repo_cache)
|
||||
repo=self.repo)
|
||||
|
||||
def setup(self, args):
|
||||
super().setup(args)
|
||||
# Prepare the cache and the output repo
|
||||
self.repo = os.path.join(self.cache, "repo")
|
||||
ostree("init", mode="archive", repo=self.repo)
|
||||
|
||||
class OSTreeSource(sources.SourceService):
|
||||
# Make sure the cache repository uses locks to protect the metadata during
|
||||
# shared access. This is the default since `2018.5`, but lets document this
|
||||
# explicitly here.
|
||||
ostree("config", "set", "repo.locking", "true", repo=self.repo)
|
||||
|
||||
content_type = "org.osbuild.ostree"
|
||||
|
||||
def download(self, items):
|
||||
|
||||
download(items, self.cache)
|
||||
# pylint: disable=[no-self-use]
|
||||
def exists(self, _checksum, _item):
|
||||
return False
|
||||
|
||||
|
||||
def main():
|
||||
|
|
|
|||
|
|
@ -64,18 +64,18 @@ SCHEMA = """
|
|||
"""
|
||||
|
||||
|
||||
def download(items, cache):
|
||||
for image_id, item in items.items():
|
||||
image = item["image"]
|
||||
class SkopeoSource(sources.SourceService):
|
||||
|
||||
content_type = "org.osbuild.containers"
|
||||
|
||||
def fetch_one(self, checksum, desc):
|
||||
image_id = checksum
|
||||
image = desc["image"]
|
||||
imagename = image["name"]
|
||||
digest = image["digest"]
|
||||
tls_verify = image.get("tls-verify", True)
|
||||
|
||||
path = f"{cache}/{image_id}"
|
||||
if os.path.isdir(path):
|
||||
continue
|
||||
|
||||
with tempfile.TemporaryDirectory(prefix="tmp-download-", dir=cache) as tmpdir:
|
||||
with tempfile.TemporaryDirectory(prefix="tmp-download-", dir=self.cache) as tmpdir:
|
||||
archive_path = os.path.join(tmpdir, "container-image.tar")
|
||||
|
||||
source = f"docker://{imagename}@{digest}"
|
||||
|
|
@ -113,16 +113,7 @@ def download(items, cache):
|
|||
# Atomically move download dir into place on successful download
|
||||
os.chmod(tmpdir, 0o755)
|
||||
with ctx.suppress_oserror(errno.ENOTEMPTY, errno.EEXIST):
|
||||
os.rename(tmpdir, path)
|
||||
|
||||
|
||||
class SkopeoSource(sources.SourceService):
|
||||
|
||||
content_type = "org.osbuild.containers"
|
||||
|
||||
def download(self, items):
|
||||
|
||||
download(items, self.cache)
|
||||
os.rename(tmpdir, f"{self.cache}/{image_id}")
|
||||
|
||||
|
||||
def main():
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue