diff --git a/osbuild/util/rhsm.py b/osbuild/util/rhsm.py new file mode 100644 index 00000000..987a175e --- /dev/null +++ b/osbuild/util/rhsm.py @@ -0,0 +1,67 @@ +"""Red Hat Subscription Manager support module + +This module implements utilities that help with interactions +with the subscriptions attached to the host machine. +""" + +import configparser +import re + + +class Subscriptions: + def __init__(self, repositories): + self.repositories = repositories + + @classmethod + def from_host_system(cls): + """Read redhat.repo file and process the list of repositories in there.""" + with open("/etc/yum.repos.d/redhat.repo", "r") as fp: + return cls.parse_repo_file(fp) + + @staticmethod + def _process_baseurl(input_url): + """Create a regex from a baseurl. + + The osbuild manifest format does not contain information about repositories. + It only includes URLs of each RPM. In order to make this RHSM support work, + osbuild needs to find a relation between a "baseurl" in a *.repo file and the + URL given in the manifest. To do so, it creates a regex from all baseurls + found in the *.repo file and matches them against the URL. + """ + # First escape meta characters that might occur in a URL + input_url = re.escape(input_url) + + # Now replace variables with regexes (see man 5 yum.conf for the list) + for variable in ["\\$releasever", "\\$arch", "\\$basearch", "\\$uuid"]: + input_url = input_url.replace(variable, "[^/]*") + + return re.compile(input_url) + + @classmethod + def parse_repo_file(cls, fp): + """Take a file object and reads its content assuming it is a .repo file.""" + parser = configparser.ConfigParser() + parser.read_file(fp) + + repositories = dict() + for section in parser.sections(): + current = { + "matchurl": cls._process_baseurl(parser.get(section, "baseurl")) + } + for parameter in ["sslcacert", "sslclientkey", "sslclientcert"]: + current[parameter] = parser.get(section, parameter) + + repositories[section] = current + + return cls(repositories) + + def get_secrets(self, url): + for parameters in self.repositories.values(): + if parameters["matchurl"].match(url) is not None: + return { + "ssl_ca_cert": parameters["sslcacert"], + "ssl_client_key": parameters["sslclientkey"], + "ssl_client_cert": parameters["sslclientcert"] + } + + raise RuntimeError(f"There are no RHSM secret associated with {url}") diff --git a/sources/org.osbuild.curl b/sources/org.osbuild.curl index 28928448..48b5a2b4 100755 --- a/sources/org.osbuild.curl +++ b/sources/org.osbuild.curl @@ -14,7 +14,6 @@ up the download. import concurrent.futures -import glob import itertools import json import math @@ -27,6 +26,7 @@ import time from typing import Dict from osbuild.util.checksum import verify_file +from osbuild.util.rhsm import Subscriptions SCHEMA = """ @@ -142,30 +142,10 @@ def fetch(url, checksum, directory): pass -def get_rhsm_secrets(): - rhsm_secrets = { - 'ssl_ca_cert': "/etc/rhsm/ca/redhat-uep.pem", - 'ssl_client_key': "", - 'ssl_client_cert': "" - } - - keys = glob.glob("/etc/pki/entitlement/*-key.pem") - for key in keys: - # The key and cert have the same prefix - cert = key.rstrip("-key.pem") + ".pem" - # The key is only valid if it has a matching cert - if os.path.exists(cert): - rhsm_secrets['ssl_client_key'] = key - rhsm_secrets['ssl_client_cert'] = cert - return rhsm_secrets - - raise RuntimeError("no matching rhsm key and cert") - - def download(items, cache): with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: requested_urls = [] - rhsm_secrets = None + subscriptions = None for url in items.values(): @@ -174,14 +154,14 @@ def download(items, cache): # check if url needs rhsm secrets if url.get("secrets", {}).get("name") == "org.osbuild.rhsm": - # rhsm secrets only need to be retrieved once and can then be reused - if rhsm_secrets is None: - try: - rhsm_secrets = get_rhsm_secrets() - except RuntimeError as e: - json.dump({"error": e.args[0]}, sys.stdout) - return 1 - url["secrets"] = rhsm_secrets + try: + # rhsm secrets only need to be retrieved once and can then be reused + if subscriptions is None: + subscriptions = Subscriptions.from_host_system() + url["secrets"] = subscriptions.get_secrets(url.get("url")) + except RuntimeError as e: + json.dump({"error": e.args[0]}, sys.stdout) + return 1 requested_urls.append(url) diff --git a/test/mod/test_util_rhsm.py b/test/mod/test_util_rhsm.py new file mode 100644 index 00000000..f89fab06 --- /dev/null +++ b/test/mod/test_util_rhsm.py @@ -0,0 +1,72 @@ +# +# Tests for the `osbuild.util.rhsm` module. +# + +from io import StringIO + +from osbuild.util.rhsm import Subscriptions + +REPO_FILE="""[jpp] +name = Red Hat JBoss Portal +baseurl = https://cdn.redhat.com/1.0/$basearch/os +enabled = 0 +gpgcheck = 1 +gpgkey = file:// +sslverify = 1 +sslcacert = /etc/rhsm/ca/redhat-uep.pem +sslclientkey = /etc/pki/entitlement/1-key.pem +sslclientcert = /etc/pki/entitlement/1.pem +metadata_expire = 86400 +enabled_metadata = 0 + +[jws] +name = Red Hat JBoss Web +baseurl = https://cdn.redhat.com/$releasever/jws/1.0/$basearch/os +enabled = 0 +gpgcheck = 1 +gpgkey = file:// +sslverify = 1 +sslcacert = /etc/rhsm/ca/redhat-uep.pem +sslclientkey = /etc/pki/entitlement/2-key.pem +sslclientcert = /etc/pki/entitlement/2.pem +metadata_expire = 86400 +enabled_metadata = 0 +""" + + +def test_from_host_system(): + # + # Test the `ioctl_get_immutable()` helper and make sure it works + # as intended. + # + subscriptions = Subscriptions.parse_repo_file(StringIO(REPO_FILE)) + rpm_url_cases = [ + { + "url": "https://cdn.redhat.com/8/jws/1.0/risc_v/os/Packages/fishy-fish-1-1.el8.risc_v.rpm", + "success": True, + "key": "2" + }, + { + "url": "https://cdn.redhat.com/8/jws/1.0/os/Packages/fishy-fish-1-1.el8.risc_v.rpm", + "success": False, + "key": "" + }, + { + "url": "https://cdn.redhat.com/1.0/x86_64/os/Packages/aaa.rpm", + "success": True, + "key": "1" + }, + ] + for test_case in rpm_url_cases: + try: + secrets = subscriptions.get_secrets(test_case["url"]) + except RuntimeError as e: + if not test_case["success"]: + continue + + raise e + + assert test_case["success"] # Verify this test case should pass + assert secrets["ssl_ca_cert"] == "/etc/rhsm/ca/redhat-uep.pem" + assert secrets["ssl_client_key"] == f'/etc/pki/entitlement/{test_case["key"]}-key.pem' + assert secrets["ssl_client_cert"] == f'/etc/pki/entitlement/{test_case["key"]}.pem'