PR#3437: Authtype as enum and getSessionInfo prints authtype name

Merges #3437
https://pagure.io/koji/pull-request/3437

Fixes: #3436
https://pagure.io/koji/issue/3436
AUTHTYPEs as enum
This commit is contained in:
Tomas Kopecek 2022-07-26 14:36:15 +02:00
commit 2db60a04b9
7 changed files with 42 additions and 35 deletions

View file

@ -7371,13 +7371,13 @@ def handle_moshimoshi(options, session, args):
print("")
print("You are using the hub at %s" % session.baseurl)
authtype = u.get('authtype', getattr(session, 'authtype', None))
if authtype == koji.AUTHTYPE_NORMAL:
if authtype == koji.AUTHTYPES['NORMAL']:
print("Authenticated via password")
elif authtype == koji.AUTHTYPE_GSSAPI:
elif authtype == koji.AUTHTYPES['GSSAPI']:
print("Authenticated via GSSAPI")
elif authtype == koji.AUTHTYPE_KERB:
elif authtype == koji.AUTHTYPES['KERBEROS']:
print("Authenticated via Kerberos principal %s" % session.krb_principal)
elif authtype == koji.AUTHTYPE_SSL:
elif authtype == koji.AUTHTYPES['SSL']:
print("Authenticated via client certificate %s" % options.cert)

View file

@ -217,10 +217,19 @@ USER_STATUS = Enum((
# authtype values
# normal == username/password
AUTHTYPE_NORMAL = 0
AUTHTYPE_KERB = 1
AUTHTYPE_SSL = 2
AUTHTYPE_GSSAPI = 3
AUTHTYPES = Enum((
'NORMAL',
'KERBEROS',
'SSL',
'GSSAPI',
))
# authtype values - BACKWARD COMPATIBILITY (could be dropped in Koji 1.34)
# normal == username/password
AUTHTYPE_NORMAL = AUTHTYPES['NORMAL']
AUTHTYPE_KERB = AUTHTYPES['KERBEROS']
AUTHTYPE_SSL = AUTHTYPES['SSL']
AUTHTYPE_GSSAPI = AUTHTYPES['GSSAPI']
# dependency types
DEP_REQUIRE = 0
@ -2473,7 +2482,7 @@ class ClientSession(object):
if not sinfo:
return False
self.setSession(sinfo)
self.authtype = AUTHTYPE_NORMAL
self.authtype = AUTHTYPES['NORMAL']
return True
def subsession(self):
@ -2577,7 +2586,7 @@ class ClientSession(object):
self.setSession(sinfo)
self.authtype = AUTHTYPE_GSSAPI
self.authtype = AUTHTYPES['GSSAPI']
return True
def ssl_login(self, cert=None, ca=None, serverca=None, proxyuser=None, proxyauthtype=None):
@ -2631,7 +2640,7 @@ class ClientSession(object):
self.opts['serverca'] = serverca
self.setSession(sinfo)
self.authtype = AUTHTYPE_SSL
self.authtype = AUTHTYPES['SSL']
return True
def logout(self):

View file

@ -289,7 +289,7 @@ class Session(object):
self.checkLoginAllowed(user_id)
# create session and return
sinfo = self.createSession(user_id, hostip, koji.AUTHTYPE_NORMAL)
sinfo = self.createSession(user_id, hostip, koji.AUTHTYPES['NORMAL'])
session_id = sinfo['session-id']
context.cnx.commit()
return sinfo
@ -320,7 +320,7 @@ class Session(object):
"""Login into brew via SSL. proxyuser name can be specified and if it is
allowed in the configuration file then connection is allowed to login as
that user. By default we assume that proxyuser is coming via same
authentication mechanism but proxyauthtype can be set to koji.AUTHTYPE_*
authentication mechanism but proxyauthtype can be set to koji.AUTHTYPE['*']
value for different handling. Typical case is proxying kerberos user via
web ui which itself is authenticated via SSL certificate. (See kojiweb
for usage).
@ -336,7 +336,7 @@ class Session(object):
# it is kerberos principal rather than user's name.
username = context.environ.get('REMOTE_USER')
client_dn = username
authtype = koji.AUTHTYPE_GSSAPI
authtype = koji.AUTHTYPES['GSSAPI']
else:
if context.environ.get('SSL_CLIENT_VERIFY') != 'SUCCESS':
raise koji.AuthError('could not verify client: %s' %
@ -349,10 +349,10 @@ class Session(object):
'unable to get user information (%s) from client certificate' %
name_dn_component)
client_dn = context.environ.get('SSL_CLIENT_S_DN')
authtype = koji.AUTHTYPE_SSL
authtype = koji.AUTHTYPES['SSL']
if proxyuser:
if authtype == koji.AUTHTYPE_GSSAPI:
if authtype == koji.AUTHTYPES['GSSAPI']:
delimiter = ','
proxy_opt = 'ProxyPrincipals'
else:
@ -363,7 +363,7 @@ class Session(object):
# backwards compatible for GSSAPI.
# in old way, proxy user whitelist is ProxyDNs.
# TODO: this should be removed in future release
if authtype == koji.AUTHTYPE_GSSAPI and not context.opts.get(
if authtype == koji.AUTHTYPES['GSSAPI'] and not context.opts.get(
'DisableGSSAPIProxyDNFallback', False):
proxy_dns += [dn.strip() for dn in
context.opts.get('ProxyDNs', '').split('|')]
@ -379,18 +379,18 @@ class Session(object):
if not context.opts['AllowProxyAuthType'] and authtype != proxyauthtype:
raise koji.AuthError("Proxy must use same auth mechanism as hub (behaviour "
"can be overriden via AllowProxyAuthType hub option)")
if proxyauthtype not in (koji.AUTHTYPE_GSSAPI, koji.AUTHTYPE_SSL):
if proxyauthtype not in (koji.AUTHTYPES['GSSAPI'], koji.AUTHTYPES['SSL']):
raise koji.AuthError(
"Proxied authtype %s is not valid for sslLogin" % proxyauthtype)
authtype = proxyauthtype
if authtype == koji.AUTHTYPE_GSSAPI and '@' in username:
if authtype == koji.AUTHTYPES['GSSAPI'] and '@' in username:
user_id = self.getUserIdFromKerberos(username)
else:
user_id = self.getUserId(username)
if not user_id:
if context.opts.get('LoginCreatesUser'):
if authtype == koji.AUTHTYPE_GSSAPI and '@' in username:
if authtype == koji.AUTHTYPES['GSSAPI'] and '@' in username:
user_id = self.createUserFromKerberos(username)
else:
user_id = self.createUser(username)

View file

@ -73,12 +73,11 @@ class TestHello(utils.CliTestCase):
# valid authentication
auth_tests = {
koji.AUTHTYPE_NORMAL: 'Authenticated via password',
koji.AUTHTYPE_GSSAPI: 'Authenticated via GSSAPI',
koji.AUTHTYPE_KERB: 'Authenticated via Kerberos principal %s' %
user['krb_principal'],
koji.AUTHTYPE_SSL: 'Authenticated via client certificate %s' %
cert
koji.AUTHTYPES['NORMAL']: 'Authenticated via password',
koji.AUTHTYPES['GSSAPI']: 'Authenticated via GSSAPI',
koji.AUTHTYPES['KERBEROS']: 'Authenticated via Kerberos principal %s' %
user['krb_principal'],
koji.AUTHTYPES['SSL']: 'Authenticated via client certificate %s' % cert
}
hubinfo = "You are using the hub at %s" % self.huburl
session.getLoggedInUser.return_value = user

View file

@ -3,7 +3,6 @@ from __future__ import absolute_import
import mock
import unittest
import six
import koji
import koji.auth
@ -28,7 +27,7 @@ class TestAuthSession(unittest.TestCase):
context.cnx.cursor.return_value = cursor
cursor.fetchone.side_effect = [
# get session
[koji.AUTHTYPE_NORMAL, 344, False, False, 'master', 'start_time',
[koji.AUTHTYPES['NORMAL'], 344, False, False, 'master', 'start_time',
'start_ts', 'update_time', 'update_ts', 'user_id'],
# get user
['name', koji.USER_STATUS['NORMAL'], koji.USERTYPES['NORMAL']],
@ -54,7 +53,7 @@ class TestAuthSession(unittest.TestCase):
self.assertEqual(s.hostip, 'remote-addr')
self.assertEqual(s.callnum, 345)
self.assertEqual(s.user_id, 'user_id')
self.assertEqual(s.authtype, koji.AUTHTYPE_NORMAL)
self.assertEqual(s.authtype, koji.AUTHTYPES['NORMAL'])
self.assertEqual(s.master, 'master')
self.assertTrue(s.logged_in)

View file

@ -266,7 +266,7 @@ def login(environ, page=None):
session = _getServer(environ)
options = environ['koji.options']
if options['WebAuthType'] == koji.AUTHTYPE_SSL:
if options['WebAuthType'] == koji.AUTHTYPES['SSL']:
## Clients authenticate to KojiWeb by SSL, so extract
## the username via the (verified) client certificate
if environ['wsgi.url_scheme'] != 'https':
@ -283,7 +283,7 @@ def login(environ, page=None):
username = environ.get('SSL_CLIENT_S_DN_CN')
if not username:
raise koji.AuthError('unable to get user information from client certificate')
elif options['WebAuthType'] == koji.AUTHTYPE_GSSAPI:
elif options['WebAuthType'] == koji.AUTHTYPES['GSSAPI']:
## Clients authenticate to KojiWeb by Kerberos, so extract
## the username via the REMOTE_USER which will be the
## Kerberos principal

View file

@ -155,14 +155,14 @@ class Dispatcher(object):
raise koji.ConfigurationError(f"Invalid value {opts['WebAuthType']} for "
"WebAuthType (ssl/gssapi)")
if opts['WebAuthType'] == 'gssapi':
opts['WebAuthType'] = koji.AUTHTYPE_GSSAPI
opts['WebAuthType'] = koji.AUTHTYPES['GSSAPI']
elif opts['WebAuthType'] == 'ssl':
opts['WebAuthType'] = koji.AUTHTYPE_SSL
opts['WebAuthType'] = koji.AUTHTYPES['SSL']
# if there is no explicit request, use same authtype as web has
elif opts['WebPrincipal']:
opts['WebAuthType'] = koji.AUTHTYPE_GSSAPI
opts['WebAuthType'] = koji.AUTHTYPES['GSSAPI']
elif opts['WebCert']:
opts['WebAuthType'] = koji.AUTHTYPE_SSL
opts['WebAuthType'] = koji.AUTHTYPES['SSL']
self.options = opts
return opts