builder: support for sso via oauth2

Implement support for authentication via OAuth2 using the client
credentials "Client Credentials Grant" flow (4.4 of RFC 6749).
For this a new configuration section is added to the config file,
where the client_id, client_secret and token_url have to be
specified.
The impelmention does currently not support "refresh tokens", but
does support refreshing the token if an `expires_in` is present
in the token itself.
Corresponding unit tests have been added.

[1] https://datatracker.ietf.org/doc/html/rfc6749#section-4.4
This commit is contained in:
Christian Kellner 2022-01-31 10:56:12 +00:00
parent ca05cc9f00
commit 940e122ae9
2 changed files with 304 additions and 5 deletions

View file

@ -191,6 +191,76 @@ class ComposeLogs:
return cls(image_logs, import_logs, init_logs)
class OAuth2(requests.auth.AuthBase):
"""Auth provider for requests supporting OAuth2 client credentials
This auth provider supports the obtaining a token via the "Client
Credentials Grant" (RFC 6749 section 4.4[1]). Required properties
are the client id, client secret and the token url.
Automatic refreshing of the token is supported if the token was
acquired specified a `expires_in` field.
Currently, this implementation does not support a actual "refresh
token".
[1] https://datatracker.ietf.org/doc/html/rfc6749#section-4.4
"""
class Token:
def __init__(self, data):
self.data = data["access_token"]
self.type = data["token_type"]
self.expires_in = int(data["expires_in"])
self.scope = data.get("scope")
self.created = time.time()
@property
def expired(self) -> bool:
if not self.expires_in:
return False
now = time.time()
return now > self.created + self.expires_in
def __init__(self, cid: str, secret: str, token_url: str) -> None:
self.id = cid
self.secret = secret
self.token_url = token_url
self.token = None
@property
def token_expired(self) -> bool:
return not self.token or self.token.expired
def fetch_token(self, http: requests.Session):
data = {
"grant_type": "client_credentials",
"client_id": self.id,
"client_secret": self.id
}
res = http.post(self.token_url, data=data)
if res.status_code != 200:
body = res.content.decode("utf-8").strip()
msg = f"Failed to authenticate via SSO/OAuth: {body}"
raise koji.GenericError(msg) from None
token_data = res.json()
self.token = self.Token(token_data)
def __call__(self, r: requests.Request):
"""Called by requests to obtain authorization"""
# don't add the header if we fetch the token
if r.url == self.token_url:
return r
r.headers["authorization"] = "Bearer " + self.token.data
return r
class Client:
def __init__(self, url):
self.server = url
@ -209,8 +279,27 @@ class Client:
return certs
def oauth_init(self, client_id: str, secret: str, token_url: str):
oauth = OAuth2(client_id, secret, token_url)
self.http.auth = oauth
def oauth_check(self) -> bool:
auth = self.http.auth
if auth and auth.token_expired:
auth.fetch_token(self.http)
return True
return False
def request(self, method: str, url: str, js: Optional[Dict] = None):
return self.http.request(method, url, json=js)
self.oauth_check()
res = self.http.request(method, url, json=js)
if res.status_code == 401 and self.oauth_check():
res = self.http.request(method, url, json=js)
return res
def get(self, url: str) -> requests.Response:
return self.request("GET", url)
@ -319,6 +408,13 @@ class OSBuildImage(BaseTaskHandler):
self.client.http.verify = val
self.logger.debug("ssl verify: %s", val)
if "composer:oauth" in cfg:
oa = cfg["composer:oauth"]
client_id, client_secret = oa["client_id"], oa["client_secret"]
token_url = oa["token_url"]
self.logger.debug("Using OAuth2 with token url: %s", token_url)
self.client.oauth_init(client_id, client_secret, token_url)
def upload_json(self, data: Dict, name: str):
fd = io.StringIO()
json.dump(data, fd, indent=4, sort_keys=True)