debian-forge/sources/org.osbuild.curl
2024-08-28 07:46:37 +02:00

375 lines
14 KiB
Python
Executable file

#!/usr/bin/python3
"""
Source for downloading files from URLs.
The files are indexed by their content hash. It can download files
that require secrets. The secret providers currently supported are:
- `org.osbuild.rhsm` for downloading Red Hat content that requires
a subscription.
- `org.osbuild.mtls` for downloading content that requires client
certificates. The paths to the key and cert should be set in the
environment in OSBUILD_SOURCES_CURL_SSL_CLIENT_KEY,
OSBUILD_SOURCES_CURL_SSL_CLIENT_CERT, and optionally
OSBUILD_SOURCES_CURL_SSL_CA_CERT.
It uses curl to download the files; the files are cached in an
internal cache. Multiple parallel connections are used to speed
up the download.
"""
import concurrent.futures
import contextlib
import json
import os
import pathlib
import platform
import re
import subprocess
import sys
import tempfile
import textwrap
import urllib.parse
from typing import Dict, List, Tuple
from osbuild import sources
from osbuild.util.checksum import verify_file
from osbuild.util.rhsm import Subscriptions
# JSON schema fragment describing the options accepted by this source.
# Exactly one of "items" or "urls" is required; both map
# "<algorithm>:<hex digest>" keys to either a plain URL string or an
# object with a mandatory "url" and optional "insecure" / "secrets".
# NOTE(review): not referenced elsewhere in this file; presumably read
# by the osbuild module loader -- confirm.
SCHEMA = """
"additionalProperties": false,
"definitions": {
"item": {
"description": "The files to fetch indexed their content checksum",
"type": "object",
"additionalProperties": false,
"patternProperties": {
"(md5|sha1|sha256|sha384|sha512):[0-9a-f]{32,128}": {
"oneOf": [
{
"type": "string",
"description": "URL to download the file from."
},
{
"type": "object",
"additionalProperties": false,
"required": [
"url"
],
"properties": {
"url": {
"type": "string",
"description": "URL to download the file from."
},
"insecure": {
"type": "boolean",
"description": "Skip the verification step for secure connections and proceed without checking",
"default": false
},
"secrets": {
"type": "object",
"additionalProperties": false,
"required": [
"name"
],
"properties": {
"name": {
"type": "string",
"description": "Name of the secrets provider."
}
}
}
}
}
]
}
}
}
},
"properties": {
"items": {"$ref": "#/definitions/item"},
"urls": {"$ref": "#/definitions/item"}
},
"oneOf": [{
"required": ["items"]
}, {
"required": ["urls"]
}]
"""
# We are not just using %{json} here because older curl (7.76) will
# write {"http_connect":000} which python cannot parse, so we write our
# own json subset. One such line is emitted per finished transfer; the
# quotes and braces are backslash-escaped because the value is embedded
# in a double-quoted string of the generated curl config file.
CURL_WRITE_OUT_FMT = r'\{\"url\": \"%{url}\"\, \"filename_effective\": \"%{filename_effective}\", \"exitcode\": %{exitcode}, \"errormsg\": \"%{errormsg}\" \}\n'
# Number of download attempts made before giving up with an error.
NR_RETRYS = 10
def curl_has_parallel_downloads():
    """
    Tell whether the installed curl is new enough for parallel downloading.

    Runs "curl --version" and inspects the version on the first output
    line. Returns True for curl >= 7.75 and False otherwise; an
    unparsable version line prints a warning to stderr and yields False.
    """
    version_output = subprocess.check_output(["curl", "--version"], universal_newlines=True)
    headline = version_output.partition("\n")[0]
    found = re.match(r'^curl (\d+\.\d+\.\d+)', headline)
    if found is None:
        print(f"WARNING: cannot parse curl version from '{headline}'", file=sys.stderr)
        return False
    major, minor, _ = found.group(1).split(".")
    # --parallel got added in 7.68
    # --write-out "%{exitcode} is 7.75
    # so 7.75 is the oldest release that provides everything we need
    return (int(major), int(minor)) >= (7, 75)
def _quote_url(url: str) -> str:
    """
    Return url with its path component percent-encoded.

    Only the path is passed through urllib.parse.quote(); scheme, host,
    query and fragment are taken over unchanged.
    """
    parts = urllib.parse.urlparse(url)
    return parts._replace(path=urllib.parse.quote(parts.path)).geturl()
def gen_curl_download_config(config_path: pathlib.Path, chksum_desc_tuple: List[Tuple[str, Dict]], parallel=False):
    """
    Write a curl config file that downloads all given items.

    For every (checksum, desc) pair in chksum_desc_tuple the file gets
    the quoted url, an output file named after the checksum, the common
    per-url options, optional cacert/cert/key lines taken from
    desc["secrets"] and either "insecure" or "no-insecure". Entries are
    separated by "next". With parallel=True the global "parallel" option
    and a "write-out" line per url are added as well. A proxy is taken
    from the OSBUILD_SOURCES_CURL_PROXY environment variable if set.
    """
    # curl config option written for each key that may appear in desc["secrets"]
    secret_options = (
        ("ssl_ca_cert", "cacert"),
        ("ssl_client_cert", "cert"),
        ("ssl_client_key", "key"),
    )
    with open(config_path, "w", encoding="utf8") as fp:
        # Because we use --next which resets the parser state we need to set
        # these options for each url.
        common_lines = [
            f'user-agent = "osbuild (Linux.{platform.machine()}; https://osbuild.org/)"',
            "silent",
            "speed-limit = 1000",
            "connect-timeout = 30",
            "fail",
            "location",
        ]
        if parallel:
            common_lines.append(f'write-out = "{CURL_WRITE_OUT_FMT}"')
        proxy = os.getenv("OSBUILD_SOURCES_CURL_PROXY")
        if proxy:
            common_lines.append(f'proxy = "{proxy}"')
        per_url_opts = "".join(f"{opt}\n" for opt in common_lines)

        # start with the global option(s)
        if parallel:
            fp.write("# global options\nparallel\n")

        # then generate the per-url config
        fp.write("# per-url options\n")
        last_index = len(chksum_desc_tuple) - 1
        for index, (checksum, desc) in enumerate(chksum_desc_tuple):
            quoted = _quote_url(desc.get("url"))
            fp.write(f'url = "{quoted}"\n')
            fp.write(f'output = "{checksum}"\n')
            fp.write(per_url_opts)
            secrets = desc.get("secrets")
            if secrets:
                for secret_key, curl_option in secret_options:
                    value = secrets.get(secret_key)
                    if value:
                        fp.write(f'{curl_option} = "{value}"\n')
            fp.write('insecure\n' if desc.get("insecure") else 'no-insecure\n')
            # every entry but the last is followed by a separator
            if index < last_index:
                fp.write("next\n\n")
def try_parse_curl_line(line):
    """
    Parse one line of curl output as json.

    Returns the decoded value, or None (after printing a warning to
    stderr) when the line cannot be parsed for any reason.
    """
    try:
        parsed = json.loads(line.strip())
    except Exception as err:  # pylint: disable=broad-exception-caught
        print(f"WARNING: cannot json parse {line} {err}", file=sys.stderr)
        return None
    return parsed
def validate_and_move_to_targetdir(tmpdir, targetdir, checksum, origin):
    """
    Verify a downloaded file and move it from tmpdir into targetdir.

    The file is expected to be named after its "checksum" inside tmpdir.
    "origin" is only used to produce a more helpful error message.

    Raises RuntimeError if the content does not match the checksum.
    """
    downloaded = f"{tmpdir}/{checksum}"
    if not verify_file(downloaded, checksum):
        raise RuntimeError(f"checksum mismatch: {checksum} {origin}")
    # The checksum has been verified, move the file into place. In case we
    # race another download of the same file, we simply ignore the error as
    # their contents are guaranteed to be the same.
    try:
        os.rename(downloaded, f"{targetdir}/{checksum}")
    except FileExistsError:
        pass
def fetch_many_new_curl(tmpdir, targetdir, dl_pairs):
    """
    Download all (checksum, desc) pairs in dl_pairs with one parallel curl run.

    curl is started inside tmpdir and reports every finished transfer
    as one json line on stdout. Each successful transfer is checksum
    verified, moved into targetdir and removed from dl_pairs (the list
    is modified in place). Returns a list of error strings, which is
    empty if everything went fine.
    """
    config_file = f"{tmpdir}/curl-config.txt"
    gen_curl_download_config(config_file, dl_pairs, parallel=True)
    # adding "--show-error" here produces a bunch of noise but might be
    # nice for debug?
    argv = ["curl", "--config", config_file]
    with contextlib.ExitStack() as stack:
        proc = subprocess.Popen(argv, encoding="utf-8", cwd=tmpdir, stdout=subprocess.PIPE)
        # ensure that curl is killed even if an unexpected exit happens
        stack.callback(proc.kill)
        failures = []
        # readline() returns an empty string on eof/process finished
        for raw_line in iter(proc.stdout.readline, ""):
            report = try_parse_curl_line(raw_line)
            if not report:
                continue
            url = report['url']
            # Keep track of individual errors as curl will only report
            # the last download operation success/failure via the global
            # exit code. There is "--fail-early" but the downside of that
            # is that it aborts all in progress downloads too.
            if report["exitcode"] != 0:
                print(f"WARNING: failed to download {url}: {report['errormsg']}", file=sys.stderr)
                failures.append(f'{url}: error code {report["exitcode"]}')
                continue
            # the way downloads are setup the filename is the expected hash
            # so validate now and move into place
            checksum = report["filename_effective"]
            validate_and_move_to_targetdir(tmpdir, targetdir, checksum, url)
            # remove item from download list (in place, the caller keeps
            # using the same list object)
            dl_pairs[:] = [pair for pair in dl_pairs if pair[0] != checksum]
            # Workaround the lack of structured progress reporting from
            # stages/sources. It generates messages of the form
            # "message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f38/f38-x86_64-fedora-20230413/Packages/f/fonts-srpm-macros-2.0.5-11.fc38.noarch.rpm\n
            #
            # Without it just a long pause with no progress while curl
            # downloads.
            print(f"Downloaded {url}")
        # return overall download status (this will be an error if any
        # transfer failed)
        exit_status = proc.wait()
        if exit_status > 0 and not failures:
            failures.append("curl exited non-zero but reported no errors")
        return failures
class CurlSource(sources.SourceService):
    """
    Source service that downloads files with curl.

    Depending on the curl version detected at construction time, either
    a single curl process downloads everything in parallel, or one curl
    process per file is started from a thread pool.
    """

    content_type = "org.osbuild.files"

    # size of the thread pool used when curl cannot download in parallel
    max_workers = 2 * (os.cpu_count() or 1)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # filled by amend_secrets() the first time rhsm secrets are needed
        self.subscriptions = None
        self._curl_has_parallel_downloads = curl_has_parallel_downloads()

    def amend_secrets(self, checksum, desc_or_url):
        """
        Normalize a source item and resolve the secrets it requests.

        desc_or_url is either a plain URL or a description dict with a
        "url" key. If the description names a known secrets provider,
        desc["secrets"] is replaced (in the passed dict) by the resolved
        secrets. Returns the tuple (checksum, desc).

        Raises RuntimeError if mtls secrets are requested but the client
        key or client cert environment variable is not set.
        """
        if not isinstance(desc_or_url, dict):
            desc = {"url": desc_or_url}
        else:
            desc = desc_or_url

        provider = desc.get("secrets", {}).get("name")
        # check if desc needs rhsm secrets
        if provider == "org.osbuild.rhsm":
            # rhsm secrets only need to be retrieved once and can then be reused
            if self.subscriptions is None:
                self.subscriptions = Subscriptions.from_host_system()
            desc["secrets"] = self.subscriptions.get_secrets(desc.get("url"))
        elif provider == "org.osbuild.mtls":
            key = os.getenv("OSBUILD_SOURCES_CURL_SSL_CLIENT_KEY")
            cert = os.getenv("OSBUILD_SOURCES_CURL_SSL_CLIENT_CERT")
            if not (key and cert):
                raise RuntimeError(f"mtls secrets required but key ({key}) or cert ({cert}) not defined")
            desc["secrets"] = {
                'ssl_ca_cert': os.getenv("OSBUILD_SOURCES_CURL_SSL_CA_CERT"),
                'ssl_client_cert': cert,
                'ssl_client_key': key,
            }

        return checksum, desc

    def fetch_all(self, items: Dict) -> None:
        """
        Download every item that is not in the cache yet.

        items maps checksums to a URL or description dict. Secrets are
        resolved via amend_secrets() before downloading.
        """
        # both steps are lazy: items already in the cache are discarded
        # and secrets are only resolved once the download path consumes them
        amended = (
            self.amend_secrets(checksum, desc)
            for checksum, desc in items.items()
            if not self.exists(checksum, desc)
        )

        if self._curl_has_parallel_downloads:
            self._fetch_all_new_curl(amended)
        else:
            self._fetch_all_old_curl(amended)

    def _fetch_all_new_curl(self, dl_pairs):
        """
        Download all (checksum, desc) pairs with a single parallel curl run.

        The run is retried up to NR_RETRYS times; pairs that were
        downloaded successfully are not requested again.

        Raises RuntimeError if the last attempt still reports errors, or
        if curl reports no errors but some pairs were not downloaded.
        """
        dl_pairs = list(dl_pairs)
        if not dl_pairs:
            return

        # Download to a temporary sub cache until we have verified the checksum. Use a
        # subdirectory, so we avoid copying across block devices.
        with tempfile.TemporaryDirectory(prefix="osbuild-unverified-file-", dir=self.cache) as tmpdir:
            # some mirrors are sometimes broken. retry manually, because we could be
            # redirected to a different, working, one on retry.
            for _ in range(NR_RETRYS):
                # fetch_many_new_curl() drops finished downloads from dl_pairs
                errors = fetch_many_new_curl(tmpdir, self.cache, dl_pairs)
                if not errors:
                    break
            else:
                details = ",".join(errors)
                raise RuntimeError(f"curl: error downloading {details}")

            # no per-transfer errors were reported, yet not everything
            # that was requested has been downloaded
            if dl_pairs:
                raise RuntimeError(f"curl: finished without errors but {dl_pairs} left to download")

    def _fetch_all_old_curl(self, amended):
        """
        Download all (checksum, desc) pairs with one curl process per file.

        The downloads run in a thread pool of max_workers threads.
        """
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # consume the results so that an exception raised by a worker
            # is re-raised here
            for _ in executor.map(self.fetch_one, *zip(*amended)):
                pass

    def fetch_one(self, checksum, desc):
        """
        Download a single file, verify it and move it into the cache.

        Raises RuntimeError if curl still fails after NR_RETRYS attempts.
        """
        url = _quote_url(desc.get("url"))
        # Download to a temporary sub cache until we have verified the checksum. Use a
        # subdirectory, so we avoid copying across block devices.
        with tempfile.TemporaryDirectory(prefix="osbuild-unverified-file-", dir=self.cache) as tmpdir:
            # some mirrors are sometimes broken. retry manually, because we could be
            # redirected to a different, working, one on retry.
            return_code = 0
            for _ in range(NR_RETRYS):
                curl_config_path = f"{tmpdir}/curl-config.txt"
                gen_curl_download_config(curl_config_path, [(checksum, desc)])
                curl = subprocess.run(
                    ["curl", "--config", curl_config_path],
                    encoding="utf-8", cwd=tmpdir, check=False)
                return_code = curl.returncode
                if return_code == 0:
                    break
            else:
                raise RuntimeError(f"curl: error downloading {url}: error code {return_code}")

            validate_and_move_to_targetdir(tmpdir, self.cache, checksum, url)
            # Workaround the lack of structured progress reporting from
            # stages/sources. It generates messages of the form
            # "message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f38/f38-x86_64-fedora-20230413/Packages/f/fonts-srpm-macros-2.0.5-11.fc38.noarch.rpm\n
            #
            # Without it just a long pause with no progress while curl
            # downloads.
            print(f"Downloaded {url}")
def main():
    """Create the source service from the command line arguments and run it."""
    CurlSource.from_args(sys.argv[1:]).main()


if __name__ == "__main__":
    main()