sources: add new _fetch_all_new_curl() helper

When using a modern curl we can download multiple URLs in parallel,
which avoids connection setup overhead and is generally more
efficient. Use the parallel code path when such a curl is detected.

TODO: ensure both old and new curl code paths are tested automatically
via the testsuite (a sketch of one approach follows below).
Michael Vogt 2024-04-29 13:35:06 +02:00 committed by Simon de Vlieger
parent 974c8adff9
commit 0d3a153c78
2 changed files with 113 additions and 11 deletions
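
One way the testsuite TODO in the commit message could be covered, sketched
with pytest's monkeypatch; the module import name and the test scaffolding
are assumptions, only curl_has_parallel_downloads() comes from this commit:

    import pytest

    @pytest.mark.parametrize("parallel", [True, False])
    def test_fetch_all_covers_both_curl_paths(monkeypatch, parallel):
        curl_source = pytest.importorskip("curl_source")  # assumed import name
        monkeypatch.setattr(curl_source, "curl_has_parallel_downloads",
                            lambda: parallel)
        # ... run a fetch against a local test HTTP server and assert the
        # files land in the cache with matching checksums ...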

@@ -34,6 +34,14 @@ class SilentHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    def log_message(self, *args, **kwargs):
        pass

    def do_GET(self):
        # silence errors when the other side "hangs up" unexpectedly
        # (our tests will do that when downloading in parallel)
        try:
            super().do_GET()
        except (ConnectionResetError, BrokenPipeError):
            pass


class DirHTTPServer(ThreadingHTTPServer):
    def __init__(self, *args, directory=None, simulate_failures=0, **kwargs):
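
A minimal sketch of how such a server might be driven from a test, assuming
DirHTTPServer forwards its positional arguments to ThreadingHTTPServer and
hands `directory` to the handler (as the signature above suggests); the
address and directory are made up:

    import threading

    server = DirHTTPServer(("localhost", 0), SilentHTTPRequestHandler,
                           directory="/srv/test-rpms", simulate_failures=2)
    port = server.server_address[1]  # ephemeral port picked by the OS
    threading.Thread(target=server.serve_forever, daemon=True).start()
    # ... point the curl source at http://localhost:{port}/ ...
    server.shutdown()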


@@ -19,6 +19,8 @@ up the download.
"""
import concurrent.futures
import contextlib
import json
import os
import pathlib
import platform
@@ -105,7 +107,7 @@ def curl_has_parallel_downloads():
    first_line = output.split("\n", maxsplit=1)[0]
    m = re.match(r'^curl (\d+\.\d+\.\d+)', first_line)
    if not m:
        print(f"WARNING: cannot parse curl version from '{first_line}'")
        print(f"WARNING: cannot parse curl version from '{first_line}'", file=sys.stderr)
        return False
    major, minor, _ = m.group(1).split(".")
    if int(major) > 7:
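
The hunk above is cut off before the minor-version comparison, so the exact
cutoff is not visible here. A plausible completion, assuming the gate wants
both --parallel (added in curl 7.66.0) and "%{json}" write-out (added in
curl 7.70.0); the concrete minor version is an assumption:

    if int(major) > 7:
        return True
    # assumed cutoff: 7.70.0 is the first release with "%{json}" write-out
    if int(major) == 7 and int(minor) >= 70:
        return True
    return False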
@@ -164,6 +166,82 @@ def gen_curl_download_config(config_path: pathlib.Path, chksum_desc_tuple: List[
        fp.write("\n")
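
# Each line that fetch_many_new_curl() reads from curl's stdout is one
# record emitted by `--write-out "%{json}\n"`; the code only relies on its
# "url", "exitcode", "errormsg" and "filename_effective" fields.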
def try_parse_curl_json_line(line):
    line = line.strip()
    if line:
        try:
            return json.loads(line)
        except json.decoder.JSONDecodeError:
            print(f"WARNING: cannot decode {line}", file=sys.stderr)
    return None
def validate_and_move_to_targetdir(tmpdir, targetdir, checksum, origin):
    """
    Validate that the checksum of the file with the filename
    "checksum" in tmpdir matches and move it into the target dir. The
    "origin" parameter is purely informational and used to generate
    better errors.
    """
    if not verify_file(f"{tmpdir}/{checksum}", checksum):
        raise RuntimeError(f"checksum mismatch: {checksum} {origin}")
    # The checksum has been verified, move the file into place. In case we race
    # another download of the same file, we simply ignore the error as their
    # contents are guaranteed to be the same.
    with contextlib.suppress(FileExistsError):
        os.rename(f"{tmpdir}/{checksum}", f"{targetdir}/{checksum}")
def fetch_many_new_curl(tmpdir, targetdir, dl_pairs):
    curl_config_path = f"{tmpdir}/curl-config.txt"
    gen_curl_download_config(curl_config_path, dl_pairs)
    curl_command = [
        "curl",
        "--config", curl_config_path,
        # this adds a bunch of noise but might be nice for debug?
        # "--show-error",
        "--parallel",
        # this will write out a json record for each finished download
        "--write-out", "%{json}\n",
    ]
    with contextlib.ExitStack() as cm:
        curl_p = subprocess.Popen(curl_command, encoding="utf-8", cwd=tmpdir, stdout=subprocess.PIPE)
        # ensure that curl is killed even if an unexpected exit happens
        cm.callback(curl_p.kill)
        while True:
            line = curl_p.stdout.readline()
            # an empty line means EOF, i.e. the process finished
            if line == "":
                break
            dl_details = try_parse_curl_json_line(line)
            if not dl_details:
                continue
            url = dl_details["url"]
            # ignore individual download errors, the overall exit status will
            # reflect them and the caller can retry
            if dl_details["exitcode"] != 0:
                print(f"WARNING: failed to download {url}: {dl_details['errormsg']}", file=sys.stderr)
                continue
            # the way downloads are set up, the filename is the expected hash,
            # so validate now and move into place
            checksum = dl_details["filename_effective"]
            validate_and_move_to_targetdir(tmpdir, targetdir, checksum, url)
            # remove the finished item from the download list
            for todo_chksum, desc in dl_pairs[:]:
                if todo_chksum == checksum:
                    dl_pairs.remove((checksum, desc))
            # Workaround the lack of structured progress reporting from
            # stages/sources. It generates messages of the form
            # "message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f38/f38-x86_64-fedora-20230413/Packages/f/fonts-srpm-macros-2.0.5-11.fc38.noarch.rpm\n
            #
            # Without it there is just a long pause with no progress while curl
            # downloads.
            print(f"Downloaded {url}")
        # return the overall download status (this will be an error if any
        # transfer failed)
        return curl_p.wait()


class CurlSource(sources.SourceService):

    content_type = "org.osbuild.files"
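
For orientation, a sketch of the inputs fetch_many_new_curl() expects, based
on how the diff uses them: dl_pairs is a list of (checksum, descriptor)
tuples and each descriptor carries at least a "url" key. All concrete values
below are invented for illustration.

    # hypothetical call; paths, checksums and URLs are made up
    dl_pairs = [
        ("sha256:aaaa...", {"url": "http://example.com/pkg-1.rpm"}),
        ("sha256:bbbb...", {"url": "http://example.com/pkg-2.rpm"}),
    ]
    rc = fetch_many_new_curl("/tmp/unverified", "/var/cache/osbuild", dl_pairs)
    # rc != 0 means at least one transfer failed; failed items stay in
    # dl_pairs, so the caller can retry with the same (shrinking) list.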
@@ -203,6 +281,31 @@ class CurlSource(sources.SourceService):
        filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items())  # discards items already in cache
        amended = map(lambda i: self.amend_secrets(i[0], i[1]), filtered)

        if curl_has_parallel_downloads():
            self._fetch_all_new_curl(amended)
        else:
            self._fetch_all_old_curl(amended)

    def _fetch_all_new_curl(self, dl_pairs):
        dl_pairs = list(dl_pairs)
        if len(dl_pairs) == 0:
            return

        # Download to a temporary sub cache until we have verified the checksum. Use a
        # subdirectory, so we avoid copying across block devices.
        with tempfile.TemporaryDirectory(prefix="osbuild-unverified-file-", dir=self.cache) as tmpdir:
            # some mirrors are sometimes broken. retry manually, because we could be
            # redirected to a different, working one on retry.
            return_code = 0
            for _ in range(10):
                return_code = fetch_many_new_curl(tmpdir, self.cache, dl_pairs)
                if return_code == 0:
                    break
            else:
                failed_urls = ",".join([itm[1]["url"] for itm in dl_pairs])
                raise RuntimeError(f"curl: error downloading {failed_urls}: error code {return_code}")

    def _fetch_all_old_curl(self, amended):
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for _ in executor.map(self.fetch_one, *zip(*amended)):
                pass
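
The retry loop in _fetch_all_new_curl() uses Python's for/else: the else
branch runs only when the loop completes without hitting break, i.e. when
all ten attempts failed. A minimal standalone illustration (do_work() is a
hypothetical helper):

    for _ in range(3):
        if do_work():  # returns True on success
            break
    else:
        raise RuntimeError("all attempts failed")  # reached only if no break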
@@ -227,16 +330,7 @@ class CurlSource(sources.SourceService):
            else:
                raise RuntimeError(f"curl: error downloading {url}: error code {return_code}")

            if not verify_file(f"{tmpdir}/{checksum}", checksum):
                raise RuntimeError(f"checksum mismatch: {checksum} {url}")
            # The checksum has been verified, move the file into place. in case we race
            # another download of the same file, we simply ignore the error as their
            # contents are guaranteed to be the same.
            try:
                os.rename(f"{tmpdir}/{checksum}", f"{self.cache}/{checksum}")
            except FileExistsError:
                pass

            validate_and_move_to_targetdir(tmpdir, self.cache, checksum, url)

            # Workaround the lack of structured progress reporting from
            # stages/sources. It generates messages of the form
            # "message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f38/f38-x86_64-fedora-20230413/Packages/f/fonts-srpm-macros-2.0.5-11.fc38.noarch.rpm\n