#!/usr/bin/python3 """ Source for downloading rpms using librepo. Download the list of rpms using a metalink or mirrorlist URL, trying new mirrors if there is an error. The files are written to the osbuild file cache using the hash as the filename. It can download files that require secrets. The support secret providers are: - `org.osbuild.rhsm` for downloading Red Hat content that requires a subscriptions. - `org.osbuild.mtls` """ import os import sys from typing import Dict import librepo from osbuild import sources from osbuild.util.rhsm import Subscriptions # NOTE: The top level schema properties are limited to items and options by the # v2 schema definition # # "mirror" pattern follows dnf repo_id as described in: # https://dnf.readthedocs.io/en/latest/conf_ref.html#description SCHEMA_2 = """ "properties": { "items": { "description": "List of the packages and their hash to download from the mirror", "type": "object", "additionalProperties": false, "patternProperties": { "^(sha256|sha384|sha512):[0-9a-f]{64,128}$": { "required": [ "path", "mirror" ], "properties": { "path": { "description": "Name or path of the package file. Supports bare name or relative paths", "type": "string" }, "mirror": { "description": "The mirror id (from options) to use for this package", "type": "string" } } } } }, "options": { "required": [ "mirrors" ], "properties": { "mirrors": { "description": "List of mirrors to be used for downloading packages", "type": "object", "additionalProperties": false, "patternProperties": { "^[0-9a-zA-Z-._:]+$": { "required": [ "url", "type" ], "properties": { "url": { "description": "URL of the mirrorlist or metalink", "type": "string" }, "type": { "description": "Type of mirror: mirrorlist or metalink", "type": "string", "enum": [ "mirrorlist", "metalink", "baseurl" ] }, "max-parallels": { "description": "Maximum number of parallel downloads.", "type": "number" }, "fastest-mirror": { "description": "When true the mirrorlist is sorted by connection speed.", "type": "boolean", "default": false }, "insecure": { "description": "Skip the verification step for secure connections and proceed without checking", "type": "boolean", "default": false }, "secrets": { "type": "object", "additionalProperties": true, "required": [ "name" ], "properties": { "name": { "description": "Name of the secrets provider.", "type": "string" } } } } } } } } } } """ class LibRepoSource(sources.SourceService): """Use librepo to download rpm files. This will download rpms, in parallel, retrying with a new mirror on errors, and saving them into the store using their hash. It supports org.osbuild.rhsm secrets for downloading RHEL content and org.osbuild.mtls """ content_type = "org.osbuild.files" CHKSUM_TYPE = { "sha256": librepo.SHA256, "sha384": librepo.SHA384, "sha512": librepo.SHA512, } def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.subscriptions = None self.errors = [] def fetch_one(self, checksum, desc) -> None: raise RuntimeError("fetch_one is not used in org.osbuild.librepo") def _setup_rhsm(self, handle, mirror): """Setup the mirror's certificates if the secrets provider is org.osbuild.rhsm""" # rhsm secrets only need to be retrieved once and can then be reused if self.subscriptions is None: self.subscriptions = Subscriptions.from_host_system() secrets = self.subscriptions.get_secrets(mirror["url"]) if secrets: if secrets.get('ssl_ca_cert'): handle.sslcacert = secrets.get('ssl_ca_cert') if secrets.get('ssl_client_cert'): handle.sslclientcert = secrets.get('ssl_client_cert') if secrets.get('ssl_client_key'): handle.sslclientkey = secrets.get('ssl_client_key') def _setup_mtls(self, handle): # this is currently using the CURL names, librepo is using # curl under the hood so this is okay-ish, but maybe we should # generalize the name key = os.getenv("OSBUILD_SOURCES_CURL_SSL_CLIENT_KEY") cert = os.getenv("OSBUILD_SOURCES_CURL_SSL_CLIENT_CERT") if not (key and cert): raise RuntimeError(f"mtls secrets required but key ({key}) or cert ({cert}) not defined") handle.sslcacert = os.getenv("OSBUILD_SOURCES_CURL_SSL_CA_CERT") handle.sslclientcert = cert handle.sslclientkey = key # This gets called when done # data comes from cbdata # status is librepo.TRANSFER_* # librepo.TRANSFER_SUCCESSFUL # librepo.TRANSFER_ALREADYEXISTS # librepo.TRANSFER_ERROR def _endcb(self, data, status, msg): """Callback for librepo transfers data is the name/path of the package status is a librepo TRANSFER_* status code msg is a status message or error TRANSFER_ERROR is returned if all mirrors are tried and it cannot download the file. """ if status == librepo.TRANSFER_ERROR: self.errors.append(f"{data}: {msg}") # Workaround the lack of structured progress reporting from # stages/sources. See curl source for more details, once we # have something better this block can be removed. if status == librepo.TRANSFER_ERROR: print(f"Error downloading {data}") elif status == librepo.TRANSFER_SUCCESSFUL: print(f"Downloaded {data}") elif status == librepo.TRANSFER_ALREADYEXISTS: print(f"Already downloaded {data}") else: print(f"Unknown status {status} for {data}") def make_pkg_target(self, handle, dest, path, checksum): """Return a librepo.PackageTarget populated with the package data This specifies what to download, where to save it, the checksum, etc. """ chksum_type, checksum = checksum.split(":") return librepo.PackageTarget( path, handle=handle, checksum_type=self.CHKSUM_TYPE[chksum_type], checksum=checksum, dest=dest, cbdata=path, endcb=self._endcb) # pylint: disable=too-many-branches def fetch_all(self, items: Dict) -> None: """Use librepo to download the packages""" # Organize the packages by the mirror id packages_from_mirror = {} for item_id, pkg in items.items(): if pkg["mirror"] not in self.options["mirrors"]: raise RuntimeError(f'Missing mirror: {pkg["mirror"]}') packages_from_mirror.setdefault(pkg["mirror"], []).append((pkg["path"], item_id)) # Download packages from each of the mirror ids for mirror_id, packages in packages_from_mirror.items(): mirror = self.options["mirrors"][mirror_id] handle = librepo.Handle() handle.repotype = librepo.YUMREPO if mirror["type"] == "metalink": handle.metalinkurl = mirror["url"] elif mirror["type"] == "mirrorlist": handle.mirrorlisturl = mirror["url"] elif mirror["type"] == "baseurl": handle.urls = [mirror["url"]] if mirror.get("insecure"): # Disable peer certificate verification handle.sslverifypeer = False # Disable host name verification handle.sslverifyhost = False else: handle.sslverifypeer = True handle.sslverifyhost = True if "max-parallels" in mirror: handle.maxparalleldownloads = mirror["max-parallels"] if mirror.get("fastest-mirror", False): handle.fastestmirror = True # If this mirror has secrets, set them up on the librepo handle secrets_name = mirror.get("secrets", {}).get("name") if secrets_name == "org.osbuild.rhsm": self._setup_rhsm(handle, mirror) elif secrets_name == "org.osbuild.mtls": self._setup_mtls(handle) download = [] for path, checksum in packages: download.append(self.make_pkg_target(handle, f"{self.cache}/{checksum}", path, checksum)) # Download everything from this mirror librepo.download_packages(download) if self.errors: raise RuntimeError(",".join(self.errors)) def main(): service = LibRepoSource.from_args(sys.argv[1:]) service.main() if __name__ == '__main__': main()