diff --git a/osbuild/util/rhsm.py b/osbuild/util/rhsm.py index 987a175e..21a2d502 100644 --- a/osbuild/util/rhsm.py +++ b/osbuild/util/rhsm.py @@ -5,18 +5,54 @@ with the subscriptions attached to the host machine. """ import configparser +import contextlib +import glob +import os import re class Subscriptions: def __init__(self, repositories): self.repositories = repositories + # These are used as a fallback if the repositories don't + # contain secrets for a requested URL. + self.secrets = None + + def get_fallback_rhsm_secrets(self): + rhsm_secrets = { + 'ssl_ca_cert': "/etc/rhsm/ca/redhat-uep.pem", + 'ssl_client_key': "", + 'ssl_client_cert': "" + } + + keys = glob.glob("/etc/pki/entitlement/*-key.pem") + for key in keys: + # The key and cert have the same prefix + cert = key.rstrip("-key.pem") + ".pem" + # The key is only valid if it has a matching cert + if os.path.exists(cert): + rhsm_secrets['ssl_client_key'] = key + rhsm_secrets['ssl_client_cert'] = cert + # Once the dictionary is complete, assign it to the object + self.secrets = rhsm_secrets + + raise RuntimeError("no matching rhsm key and cert") @classmethod def from_host_system(cls): """Read redhat.repo file and process the list of repositories in there.""" - with open("/etc/yum.repos.d/redhat.repo", "r") as fp: - return cls.parse_repo_file(fp) + ret = cls(None) + with contextlib.suppress(FileNotFoundError): + with open("/etc/yum.repos.d/redhat.repo", "r") as fp: + ret = cls.parse_repo_file(fp) + + with contextlib.suppress(RuntimeError): + ret.get_fallback_rhsm_secrets() + + if not ret.repositories and not ret.secrets: + raise RuntimeError("No RHSM secrets found on this host.") + + return ret @staticmethod def _process_baseurl(input_url): @@ -56,6 +92,7 @@ class Subscriptions: return cls(repositories) def get_secrets(self, url): + # Try to find a matching URL from redhat.repo file first for parameters in self.repositories.values(): if parameters["matchurl"].match(url) is not None: return { @@ -64,4 +101,8 @@ class Subscriptions: "ssl_client_cert": parameters["sslclientcert"] } + # In case there is no matching URL, try the fallback + if self.secrets: + return self.secrets + raise RuntimeError(f"There are no RHSM secret associated with {url}")