As part of the investigation of the CI failure in https://github.com/osbuild/osbuild-composer/pull/4247 we noticed that curl can return a return_code of `0` even when it did not download all the URLs in a `--config`-provided file. This seems to be curl-version dependent; I had a hard time writing a test case with the real curl (8.6.0) that reproduces this, so I went with mocking it. We definitely saw this failure with the CentOS 9 version (7.76). Our current code is buggy and assumes that the exit status of curl is always non-zero if any download fails, but that is only the case when `--fail-early` is used. The extra paranoia will not hurt even once the reliance on the exit code of curl is fixed.
364 lines
14 KiB
Python
Executable file
364 lines
14 KiB
Python
Executable file
#!/usr/bin/python3
"""
Source for downloading files from URLs.

The files are indexed by their content hash. It can download files
that require secrets. The secret providers currently supported are:

- `org.osbuild.rhsm` for downloading Red Hat content that requires
  a subscription.
- `org.osbuild.mtls` for downloading content that requires client
  certificates. The paths to the key and cert should be set in the
  environment in OSBUILD_SOURCES_CURL_SSL_CLIENT_KEY,
  OSBUILD_SOURCES_CURL_SSL_CLIENT_CERT, and optionally
  OSBUILD_SOURCES_CURL_SSL_CA_CERT.

It uses curl to download the files; the files are cached in an
internal cache. Multiple parallel connections are used to speed
up the download.
"""
|
|
|
|
import concurrent.futures
|
|
import contextlib
|
|
import os
|
|
import pathlib
|
|
import platform
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import textwrap
|
|
import urllib.parse
|
|
from typing import Dict, List, Tuple
|
|
|
|
from osbuild import sources
|
|
from osbuild.util.checksum import verify_file
|
|
from osbuild.util.rhsm import Subscriptions
|
|
|
|
SCHEMA = """
|
|
"additionalProperties": false,
|
|
"definitions": {
|
|
"item": {
|
|
"description": "The files to fetch indexed their content checksum",
|
|
"type": "object",
|
|
"additionalProperties": false,
|
|
"patternProperties": {
|
|
"(md5|sha1|sha256|sha384|sha512):[0-9a-f]{32,128}": {
|
|
"oneOf": [
|
|
{
|
|
"type": "string",
|
|
"description": "URL to download the file from."
|
|
},
|
|
{
|
|
"type": "object",
|
|
"additionalProperties": false,
|
|
"required": [
|
|
"url"
|
|
],
|
|
"properties": {
|
|
"url": {
|
|
"type": "string",
|
|
"description": "URL to download the file from."
|
|
},
|
|
"insecure": {
|
|
"type": "boolean",
|
|
"description": "Skip the verification step for secure connections and proceed without checking",
|
|
"default": false
|
|
},
|
|
"secrets": {
|
|
"type": "object",
|
|
"additionalProperties": false,
|
|
"required": [
|
|
"name"
|
|
],
|
|
"properties": {
|
|
"name": {
|
|
"type": "string",
|
|
"description": "Name of the secrets provider."
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
]
|
|
}
|
|
}
|
|
}
|
|
},
|
|
"properties": {
|
|
"items": {"$ref": "#/definitions/item"},
|
|
"urls": {"$ref": "#/definitions/item"}
|
|
},
|
|
"oneOf": [{
|
|
"required": ["items"]
|
|
}, {
|
|
"required": ["urls"]
|
|
}]
|
|
"""
|
|
|
|
|
|
def curl_has_parallel_downloads():
    """
    Return True if the installed curl supports everything the parallel
    download path needs (--parallel plus --write-out "%{exitcode}").
    """
    # Opt-in only: keep parallel mode disabled until the failures in
    # https://github.com/osbuild/osbuild-composer/pull/4247
    # are understood
    if os.getenv("OSBUILD_SOURCES_CURL_USE_PARALLEL") not in ("1", "yes", "true"):
        return False

    version_output = subprocess.check_output(
        ["curl", "--version"], universal_newlines=True)
    banner = version_output.split("\n", maxsplit=1)[0]
    match = re.match(r'^curl (\d+\.\d+\.\d+)', banner)
    if match is None:
        print(f"WARNING: cannot parse curl version from '{banner}'", file=sys.stderr)
        return False
    major, minor, _ = match.group(1).split(".")
    # --parallel got added in 7.68 and --write-out "%{exitcode}" in 7.75,
    # so any 8.x or 7.75+ is good enough.
    return int(major) > 7 or (int(major) == 7 and int(minor) >= 75)
|
|
|
|
|
|
def _quote_url(url: str) -> str:
|
|
purl = urllib.parse.urlparse(url)
|
|
path = urllib.parse.quote(purl.path)
|
|
quoted = purl._replace(path=path)
|
|
return quoted.geturl()
|
|
|
|
|
|
def gen_curl_download_config(config_path: pathlib.Path, chksum_desc_tuple: List[Tuple[str, Dict]]):
    """
    Write a curl --config file to *config_path* covering all downloads
    in *chksum_desc_tuple* (a list of (checksum, description) pairs).

    Each download is written with its output filename set to the
    checksum so the result can be verified and moved into place later.
    An optional OSBUILD_SOURCES_CURL_PROXY environment variable adds a
    global proxy setting.
    """
    with open(config_path, "w", encoding="utf8") as cfg:
        # options shared by every transfer
        cfg.write(textwrap.dedent(f"""\
        user-agent = "osbuild (Linux.{platform.machine()}; https://osbuild.org/)"
        silent
        speed-limit = 1000
        connect-timeout = 30
        fail
        location
        """))
        proxy = os.getenv("OSBUILD_SOURCES_CURL_PROXY")
        if proxy:
            cfg.write(f'proxy = "{proxy}"\n')
        cfg.write("\n")

        # one stanza per URL
        for checksum, desc in chksum_desc_tuple:
            url = _quote_url(desc.get("url"))
            cfg.write(f'url = "{url}"\n')
            cfg.write(f'output = "{checksum}"\n')
            secrets = desc.get("secrets") or {}
            # map secret entries onto their curl config options
            for option, secret_key in (
                ("cacert", "ssl_ca_cert"),
                ("cert", "ssl_client_cert"),
                ("key", "ssl_client_key"),
            ):
                value = secrets.get(secret_key)
                if value:
                    cfg.write(f'{option} = "{value}"\n')
            # be explicit in both directions so a per-URL setting can
            # never leak into the next stanza
            cfg.write('insecure\n' if desc.get("insecure") else 'no-insecure\n')
            cfg.write("\n")
|
|
|
|
|
|
def try_parse_curl_line(line):
    """
    Parse one curl --write-out record produced by fetch_many_new_curl().

    The record format is "osbuild-dl" plus four \\x1c-separated fields:
    url, effective filename, exit code, and error message.

    Returns a dict with keys "url", "filename_effective", "exitcode"
    (int) and "errormsg", or None when *line* is not a record (curl may
    interleave other output on stdout).
    """
    line = line.strip()
    if not line.startswith("osbuild-dl\x1c"):
        print(f"WARNING: unexpected prefix in {line}", file=sys.stderr)
        return None
    # maxsplit keeps any stray field separator inside the error message
    # instead of breaking the unpack below
    _, url, filename, exitcode, errormsg = line.split("\x1c", maxsplit=4)
    return {
        "url": url,
        "filename_effective": filename,
        "exitcode": int(exitcode),
        "errormsg": errormsg,
    }
|
|
|
|
|
|
def validate_and_move_to_targetdir(tmpdir, targetdir, checksum, origin):
    """
    Verify the downloaded file named *checksum* inside *tmpdir* and, on
    success, move it into *targetdir*. The *origin* parameter is purely
    informational and only used to produce a better error message.

    Raises RuntimeError when the file content does not match *checksum*.
    """
    candidate = f"{tmpdir}/{checksum}"
    if not verify_file(candidate, checksum):
        raise RuntimeError(f"checksum mismatch: {checksum} {origin}")
    # The checksum has been verified, so two racing downloads of the
    # same file are guaranteed to have identical content; losing the
    # rename race is therefore harmless and can be ignored.
    with contextlib.suppress(FileExistsError):
        os.rename(candidate, f"{targetdir}/{checksum}")
|
|
|
|
|
|
def fetch_many_new_curl(tmpdir, targetdir, dl_pairs) -> int:
    """
    Download all (checksum, desc) pairs in *dl_pairs* with a single
    parallel curl invocation.

    Files land in *tmpdir*, are verified against their checksum and then
    moved into *targetdir*. Each successfully verified item is removed
    from *dl_pairs* in place, so after the call the list contains
    exactly the items that still need downloading.

    Returns curl's overall exit status; the caller retries on non-zero
    and must also re-check *dl_pairs* (older curl may exit 0 even when
    not all transfers finished).
    """
    curl_config_path = f"{tmpdir}/curl-config.txt"
    gen_curl_download_config(curl_config_path, dl_pairs)
    curl_command = [
        "curl",
        "--config", curl_config_path,
        # this adds a bunch of noise but might be nice for debug?
        # "--show-error",
        "--parallel",
        # this will write out a "record" for each finished download
        # Not using %{json} here because older curl (7.76) will write
        # {"http_connect":000} which python cannot parse
        "--write-out", "osbuild-dl\x1c%{url}\x1c%{filename_effective}\x1c%{exitcode}\x1cerror: %{errormsg}\n",
    ]
    with contextlib.ExitStack() as cm:
        curl_p = subprocess.Popen(curl_command, encoding="utf-8", cwd=tmpdir, stdout=subprocess.PIPE)
        # ensure that curl is killed even if an unexpected exit happens
        cm.callback(curl_p.kill)
        # stream curl's stdout and handle each finished download as its
        # write-out record appears
        while True:
            line = curl_p.stdout.readline()
            # empty line means eof/process finished
            if line == "":
                break
            dl_details = try_parse_curl_line(line)
            if not dl_details:
                continue
            url = dl_details['url']
            # ignore individual download errors, the overall exit status will
            # reflect them and the caller can retry
            if dl_details["exitcode"] != 0:
                print(f"WARNING: failed to download {url}: {dl_details['errormsg']}", file=sys.stderr)
                continue
            # the way downloads are setup the filename is the expected hash
            # so validate now and move into place
            checksum = dl_details["filename_effective"]
            validate_and_move_to_targetdir(tmpdir, targetdir, checksum, url)
            # remove item from download list (iterate over a copy since
            # we mutate the list while scanning it)
            for todo_chksum, desc in dl_pairs[:]:
                if todo_chksum == checksum:
                    dl_pairs.remove((checksum, desc))
            # Workaround the lack of structured progress reporting from
            # stages/sources. It generates messages of the form
            # "message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f38/f38-x86_64-fedora-20230413/Packages/f/fonts-srpm-macros-2.0.5-11.fc38.noarch.rpm\n
            #
            # Without it just a long pause with no progress while curl
            # downloads.
            print(f"Downloaded {url}")
        # return overall download status (this will be an error if any
        # transfer failed)
        return curl_p.wait()
|
|
|
|
|
|
class CurlSource(sources.SourceService):
    """Source service that downloads `org.osbuild.files` items via curl."""

    content_type = "org.osbuild.files"

    # thread-pool size for the one-curl-process-per-file fallback path
    max_workers = 2 * (os.cpu_count() or 1)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # RHSM subscriptions, fetched lazily on first use and reused
        self.subscriptions = None
        # probe once whether the installed curl supports parallel mode
        self._curl_has_parallel_downloads = curl_has_parallel_downloads()

    def amend_secrets(self, checksum, desc_or_url):
        """
        Normalize *desc_or_url* into a description dict and resolve the
        secrets provider it references (if any).

        Returns the (checksum, desc) pair with desc["secrets"] filled in
        for the `org.osbuild.rhsm` and `org.osbuild.mtls` providers.

        Raises RuntimeError when mtls secrets are requested but the
        client key/cert environment variables are not both set.
        """
        if not isinstance(desc_or_url, dict):
            # a plain URL string is shorthand for {"url": ...}
            desc = {"url": desc_or_url}
        else:
            desc = desc_or_url

        # check if desc needs rhsm secrets
        if desc.get("secrets", {}).get("name") == "org.osbuild.rhsm":
            # rhsm secrets only need to be retrieved once and can then be reused
            if self.subscriptions is None:
                self.subscriptions = Subscriptions.from_host_system()
            desc["secrets"] = self.subscriptions.get_secrets(desc.get("url"))
        elif desc.get("secrets", {}).get("name") == "org.osbuild.mtls":
            key = os.getenv("OSBUILD_SOURCES_CURL_SSL_CLIENT_KEY")
            cert = os.getenv("OSBUILD_SOURCES_CURL_SSL_CLIENT_CERT")
            if not (key and cert):
                raise RuntimeError(f"mtls secrets required but key ({key}) or cert ({cert}) not defined")
            desc["secrets"] = {
                # the CA cert is optional and may legitimately be None
                'ssl_ca_cert': os.getenv("OSBUILD_SOURCES_CURL_SSL_CA_CERT"),
                'ssl_client_cert': cert,
                'ssl_client_key': key,
            }

        return checksum, desc

    def fetch_all(self, items: Dict) -> None:
        """Fetch every item not already in the cache, resolving secrets
        first and picking the parallel or sequential curl strategy based
        on the detected curl capabilities."""
        filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items())  # discards items already in cache
        amended = map(lambda i: self.amend_secrets(i[0], i[1]), filtered)

        if self._curl_has_parallel_downloads:
            self._fetch_all_new_curl(amended)
        else:
            self._fetch_all_old_curl(amended)

    def _fetch_all_new_curl(self, dl_pairs):
        """
        Download all (checksum, desc) pairs with one parallel curl run,
        retrying up to 10 times for flaky mirrors.

        Raises RuntimeError when downloads still fail after all retries,
        or when curl reports success but items remain undownloaded.
        """
        dl_pairs = list(dl_pairs)
        if len(dl_pairs) == 0:
            return

        # Download to a temporary sub cache until we have verified the checksum. Use a
        # subdirectory, so we avoid copying across block devices.
        with tempfile.TemporaryDirectory(prefix="osbuild-unverified-file-", dir=self.cache) as tmpdir:
            # some mirrors are sometimes broken. retry manually, because we could be
            # redirected to a different, working, one on retry.
            return_code = 0
            for _ in range(10):
                # fetch_many_new_curl removes finished items from
                # dl_pairs, so each retry only covers what is left
                return_code = fetch_many_new_curl(tmpdir, self.cache, dl_pairs)
                if return_code == 0:
                    break
            else:
                failed_urls = ",".join([itm[1]["url"] for itm in dl_pairs])
                raise RuntimeError(f"curl: error downloading {failed_urls}: error code {return_code}")

            # Extra paranoia: older curl (e.g. 7.76) may exit 0 even
            # though not every transfer finished, so double-check that
            # nothing is left to download.
            if len(dl_pairs) > 0:
                raise RuntimeError(f"curl: finished with return_code {return_code} but {dl_pairs} left to download")

    def _fetch_all_old_curl(self, amended):
        """Fallback for old curl: fetch each item with its own curl
        process, fanned out over a thread pool."""
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # drain the iterator so worker exceptions propagate here
            for _ in executor.map(self.fetch_one, *zip(*amended)):
                pass

    def fetch_one(self, checksum, desc):
        """
        Download a single file described by *desc*, verify it against
        *checksum* and move it into the cache. Retries up to 10 times.

        Raises RuntimeError when curl keeps failing or the checksum does
        not match.
        """
        url = _quote_url(desc.get("url"))
        # Download to a temporary sub cache until we have verified the checksum. Use a
        # subdirectory, so we avoid copying across block devices.
        with tempfile.TemporaryDirectory(prefix="osbuild-unverified-file-", dir=self.cache) as tmpdir:
            # some mirrors are sometimes broken. retry manually, because we could be
            # redirected to a different, working, one on retry.
            return_code = 0
            for _ in range(10):
                curl_config_path = f"{tmpdir}/curl-config.txt"
                gen_curl_download_config(curl_config_path, [(checksum, desc)])
                curl = subprocess.run(
                    ["curl", "--config", curl_config_path],
                    encoding="utf-8", cwd=tmpdir, check=False)
                return_code = curl.returncode
                if return_code == 0:
                    break
            else:
                raise RuntimeError(f"curl: error downloading {url}: error code {return_code}")

            validate_and_move_to_targetdir(tmpdir, self.cache, checksum, url)
            # Workaround the lack of structured progress reporting from
            # stages/sources. It generates messages of the form
            # "message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f38/f38-x86_64-fedora-20230413/Packages/f/fonts-srpm-macros-2.0.5-11.fc38.noarch.rpm\n
            #
            # Without it just a long pause with no progress while curl
            # downloads.
            print(f"Downloaded {url}")
|
|
|
|
|
|
def main():
    """Build the curl source service from the CLI arguments and run it."""
    CurlSource.from_args(sys.argv[1:]).main()
|
|
|
|
|
|
# run the source service when invoked directly by osbuild
if __name__ == '__main__':
    main()
|