From 0d3a153c782e395ce226e07a652f7a2c4315e72d Mon Sep 17 00:00:00 2001
From: Michael Vogt
Date: Mon, 29 Apr 2024 13:35:06 +0200
Subject: [PATCH] sources: add new _fetch_all_new_curl() helper

When using a modern curl we can download multiple URLs in parallel,
which avoids connection-setup overhead and is generally more
efficient. The new helper is used whenever a capable curl is detected.

TODO: ensure both the old and the new curl code paths are tested
automatically via the testsuite.
---
 osbuild/testutil/net.py  |   8 +++
 sources/org.osbuild.curl | 116 +++++++++++++++++++++++++++++++++++----
 2 files changed, 113 insertions(+), 11 deletions(-)

diff --git a/osbuild/testutil/net.py b/osbuild/testutil/net.py
index 6dc8acca..092b0965 100644
--- a/osbuild/testutil/net.py
+++ b/osbuild/testutil/net.py
@@ -34,6 +34,14 @@ class SilentHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
     def log_message(self, *args, **kwargs):
         pass
 
+    def do_GET(self):
+        # silence errors when the other side "hangs up" unexpectedly
+        # (our tests will do that when downloading in parallel)
+        try:
+            super().do_GET()
+        except (ConnectionResetError, BrokenPipeError):
+            pass
+
 
 class DirHTTPServer(ThreadingHTTPServer):
     def __init__(self, *args, directory=None, simulate_failures=0, **kwargs):
diff --git a/sources/org.osbuild.curl b/sources/org.osbuild.curl
index 45db3723..d24a7cc3 100755
--- a/sources/org.osbuild.curl
+++ b/sources/org.osbuild.curl
@@ -19,6 +19,8 @@ up the download.
 """
 import concurrent.futures
+import contextlib
+import json
 import os
 import pathlib
 import platform
@@ -105,7 +107,7 @@ def curl_has_parallel_downloads():
     first_line = output.split("\n", maxsplit=1)[0]
     m = re.match(r'^curl (\d+\.\d+\.\d+)', first_line)
     if not m:
-        print(f"WARNING: cannot parse curl version from '{first_line}'")
+        print(f"WARNING: cannot parse curl version from '{first_line}'", file=sys.stderr)
         return False
     major, minor, _ = m.group(1).split(".")
     if int(major) > 7:
@@ -164,6 +166,82 @@ def gen_curl_download_config(config_path: pathlib.Path, chksum_desc_tuple: List[
         fp.write("\n")
 
 
+def try_parse_curl_json_line(line):
+    line = line.strip()
+    if line:
+        try:
+            return json.loads(line)
+        except json.decoder.JSONDecodeError:
+            print(f"WARNING: cannot decode '{line}'", file=sys.stderr)
+    return None
+
+
+def validate_and_move_to_targetdir(tmpdir, targetdir, checksum, origin):
+    """
+    Validate that the file named "checksum" in tmpdir matches that
+    checksum and move it into targetdir. The "origin" parameter is
+    purely informational and is used to generate better error messages.
+    """
+    if not verify_file(f"{tmpdir}/{checksum}", checksum):
+        raise RuntimeError(f"checksum mismatch: {checksum} {origin}")
+    # The checksum has been verified, so move the file into place. If we race
+    # another download of the same file, we simply ignore the error as their
+    # contents are guaranteed to be the same.
+    with contextlib.suppress(FileExistsError):
+        os.rename(f"{tmpdir}/{checksum}", f"{targetdir}/{checksum}")
+
+
+def fetch_many_new_curl(tmpdir, targetdir, dl_pairs):
+    curl_config_path = f"{tmpdir}/curl-config.txt"
+    gen_curl_download_config(curl_config_path, dl_pairs)
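+    # A rough sketch of the per-transfer record that "--write-out %{json}"
+    # (used below) makes curl print on stdout, reduced to the fields this
+    # helper actually reads (real records contain many more fields):
+    #   {"url": "http://...", "exitcode": 0, "errormsg": "",
+    #    "filename_effective": "<checksum>"}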
+    curl_command = [
+        "curl",
+        "--config", curl_config_path,
+        # this adds a bunch of noise but might be nice for debugging:
+        # "--show-error",
+        "--parallel",
+        # this will write out a json record for each finished download
+        "--write-out", "%{json}\n",
+    ]
+    with contextlib.ExitStack() as cm:
+        curl_p = subprocess.Popen(curl_command, encoding="utf-8", cwd=tmpdir, stdout=subprocess.PIPE)
+        # ensure that curl is killed even if an unexpected exit happens
+        cm.callback(curl_p.kill)
+        while True:
+            line = curl_p.stdout.readline()
+            # an empty line means EOF, i.e. the process has finished
+            if line == "":
+                break
+            dl_details = try_parse_curl_json_line(line)
+            if not dl_details:
+                continue
+            url = dl_details["url"]
+            # ignore individual download errors; the overall exit status will
+            # reflect them and the caller can retry
+            if dl_details["exitcode"] != 0:
+                print(f"WARNING: failed to download {url}: {dl_details['errormsg']}", file=sys.stderr)
+                continue
+            # the way the downloads are set up, the filename is the expected
+            # hash, so validate now and move the file into place
+            checksum = dl_details["filename_effective"]
+            validate_and_move_to_targetdir(tmpdir, targetdir, checksum, url)
+            # remove the finished item (iterate over a copy, we mutate the list)
+            for todo_chksum, desc in dl_pairs[:]:
+                if todo_chksum == checksum:
+                    dl_pairs.remove((checksum, desc))
+            # Workaround for the lack of structured progress reporting from
+            # stages/sources. It generates messages of the form
+            # "message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f38/f38-x86_64-fedora-20230413/Packages/f/fonts-srpm-macros-2.0.5-11.fc38.noarch.rpm\n
+            #
+            # Without it there is just a long pause with no progress while
+            # curl downloads.
+            print(f"Downloaded {url}")
+        # return the overall download status (this will be non-zero if any
+        # transfer failed)
+        return curl_p.wait()
+
+
 class CurlSource(sources.SourceService):
 
     content_type = "org.osbuild.files"
 
@@ -203,6 +281,31 @@ class CurlSource(sources.SourceService):
         filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items())  # discards items already in cache
         amended = map(lambda i: self.amend_secrets(i[0], i[1]), filtered)
+        if curl_has_parallel_downloads():
+            self._fetch_all_new_curl(amended)
+        else:
+            self._fetch_all_old_curl(amended)
+
+    def _fetch_all_new_curl(self, dl_pairs):
+        dl_pairs = list(dl_pairs)
+        if len(dl_pairs) == 0:
+            return
+
+        # Download to a temporary dir inside the cache until we have verified
+        # the checksum; a subdirectory avoids copying across block devices.
+        with tempfile.TemporaryDirectory(prefix="osbuild-unverified-file-", dir=self.cache) as tmpdir:
+            # Some mirrors are sometimes broken. Retry manually, because on a
+            # retry we could be redirected to a different, working mirror.
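+            # Note that fetch_many_new_curl() removes every successfully
+            # downloaded item from "dl_pairs", so each retry only fetches
+            # what is still missing.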
+            return_code = 0
+            for _ in range(10):
+                return_code = fetch_many_new_curl(tmpdir, self.cache, dl_pairs)
+                if return_code == 0:
+                    break
+            else:
+                failed_urls = ",".join([itm[1]["url"] for itm in dl_pairs])
+                raise RuntimeError(f"curl: error downloading {failed_urls}: error code {return_code}")
+
+    def _fetch_all_old_curl(self, amended):
         with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
             for _ in executor.map(self.fetch_one, *zip(*amended)):
                 pass
@@ -227,16 +330,7 @@ class CurlSource(sources.SourceService):
             else:
                 raise RuntimeError(f"curl: error downloading {url}: error code {return_code}")
 
-            if not verify_file(f"{tmpdir}/{checksum}", checksum):
-                raise RuntimeError(f"checksum mismatch: {checksum} {url}")
-
-            # The checksum has been verified, move the file into place. in case we race
-            # another download of the same file, we simply ignore the error as their
-            # contents are guaranteed to be the same.
-            try:
-                os.rename(f"{tmpdir}/{checksum}", f"{self.cache}/{checksum}")
-            except FileExistsError:
-                pass
+            validate_and_move_to_targetdir(tmpdir, self.cache, checksum, url)
             # Workaround the lack of structured progress reporting from
             # stages/sources. It generates messages of the form
             # "message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f38/f38-x86_64-fedora-20230413/Packages/f/fonts-srpm-macros-2.0.5-11.fc38.noarch.rpm\n