PR#708: Implement support for keytab in gssapi codepaths
Merges #708 https://pagure.io/koji/pull-request/708
This commit is contained in:
commit
b320d2f1d7
2 changed files with 99 additions and 14 deletions
|
|
@ -2126,16 +2126,17 @@ class ClientSession(object):
|
|||
principal. The principal must be in the "ProxyPrincipals" list on
|
||||
the server side."""
|
||||
|
||||
if principal is None and keytab is None and ccache is None:
|
||||
try:
|
||||
# Silently try GSSAPI first
|
||||
if self.gssapi_login(proxyuser=proxyuser):
|
||||
return True
|
||||
except:
|
||||
if krbV:
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
try:
|
||||
# Silently try GSSAPI first
|
||||
if self.gssapi_login(principal, keytab, ccache, proxyuser=proxyuser):
|
||||
return True
|
||||
except Exception as e:
|
||||
if krbV:
|
||||
e_str = ''.join(traceback.format_exception_only(type(e), e))
|
||||
self.logger.debug('gssapi auth failed: %s', e_str)
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
||||
if not krbV:
|
||||
raise PythonImportError(
|
||||
|
|
@ -2224,7 +2225,7 @@ class ClientSession(object):
|
|||
# else
|
||||
return host
|
||||
|
||||
def gssapi_login(self, proxyuser=None):
|
||||
def gssapi_login(self, principal=None, keytab=None, ccache=None, proxyuser=None):
|
||||
if not HTTPKerberosAuth:
|
||||
raise PythonImportError(
|
||||
"Please install python-requests-kerberos to use GSSAPI."
|
||||
|
|
@ -2241,21 +2242,38 @@ class ClientSession(object):
|
|||
|
||||
# 60 second timeout during login
|
||||
sinfo = None
|
||||
old_env = {}
|
||||
old_opts = self.opts
|
||||
self.opts = old_opts.copy()
|
||||
self.opts['timeout'] = 60
|
||||
self.opts['auth'] = HTTPKerberosAuth()
|
||||
try:
|
||||
self.opts['timeout'] = 60
|
||||
kwargs = {}
|
||||
if keytab:
|
||||
old_env['KRB5_CLIENT_KTNAME'] = os.environ.get('KRB5_CLIENT_KTNAME')
|
||||
os.environ['KRB5_CLIENT_KTNAME'] = keytab
|
||||
if ccache:
|
||||
old_env['KRB5CCNAME'] = os.environ.get('KRB5CCNAME')
|
||||
os.environ['KRB5CCNAME'] = ccache
|
||||
if principal:
|
||||
kwargs['principal'] = principal
|
||||
self.opts['auth'] = HTTPKerberosAuth(**kwargs)
|
||||
try:
|
||||
# Depending on the server configuration, we might not be able to
|
||||
# connect without client certificate, which means that the conn
|
||||
# will fail with a handshake failure, which is retried by default.
|
||||
sinfo = self._callMethod('sslLogin', [proxyuser], retry=False)
|
||||
except:
|
||||
except Exception as e:
|
||||
e_str = ''.join(traceback.format_exception_only(type(e), e))
|
||||
self.logger.debug('gssapi auth failed: %s', e_str)
|
||||
# Auth with https didn't work. Restore for the next attempt.
|
||||
self.baseurl = old_baseurl
|
||||
finally:
|
||||
self.opts = old_opts
|
||||
for key in old_env:
|
||||
if old_env[key] is None:
|
||||
del os.environ[key]
|
||||
else:
|
||||
os.environ[key] = old_env[key]
|
||||
if not sinfo:
|
||||
raise AuthError('unable to obtain a session')
|
||||
|
||||
|
|
|
|||
67
tests/test_lib/test_gssapi.py
Normal file
67
tests/test_lib/test_gssapi.py
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
from __future__ import absolute_import
|
||||
import mock
|
||||
import os
|
||||
import unittest
|
||||
|
||||
import koji
|
||||
|
||||
|
||||
class TestGSSAPI(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.session = koji.ClientSession('https://koji.example.com/kojihub', {})
|
||||
self.session._callMethod = mock.MagicMock(name='_callMethod')
|
||||
|
||||
def tearDown(self):
|
||||
mock.patch.stopall()
|
||||
|
||||
maxDiff = None
|
||||
|
||||
@mock.patch('koji.HTTPKerberosAuth', new=None)
|
||||
def test_gssapi_disabled(self):
|
||||
with self.assertRaises(ImportError):
|
||||
self.session.gssapi_login()
|
||||
|
||||
def test_gssapi_login(self):
|
||||
old_environ = dict(**os.environ)
|
||||
self.session.gssapi_login()
|
||||
self.session._callMethod.assert_called_once_with('sslLogin', [None],
|
||||
retry=False)
|
||||
self.assertEqual(old_environ, dict(**os.environ))
|
||||
|
||||
def test_gssapi_login_keytab(self):
|
||||
principal = 'user@EXAMPLE.COM'
|
||||
keytab = '/path/to/keytab'
|
||||
ccache = '/path/to/cache'
|
||||
old_environ = dict(**os.environ)
|
||||
self.session.gssapi_login(principal, keytab, ccache)
|
||||
self.session._callMethod.assert_called_once_with('sslLogin', [None],
|
||||
retry=False)
|
||||
self.assertEqual(old_environ, dict(**os.environ))
|
||||
|
||||
def test_gssapi_login_error(self):
|
||||
old_environ = dict(**os.environ)
|
||||
self.session._callMethod.side_effect = Exception('login failed')
|
||||
with self.assertRaises(koji.AuthError):
|
||||
self.session.gssapi_login()
|
||||
self.session._callMethod.assert_called_once_with('sslLogin', [None],
|
||||
retry=False)
|
||||
self.assertEqual(old_environ, dict(**os.environ))
|
||||
|
||||
def test_gssapi_login_http(self):
|
||||
old_environ = dict(**os.environ)
|
||||
url1 = 'http://koji.example.com/kojihub'
|
||||
url2 = 'https://koji.example.com/kojihub'
|
||||
|
||||
# successful gssapi auth should force https
|
||||
self.session.baseurl = url1
|
||||
self.session.gssapi_login()
|
||||
self.assertEqual(self.session.baseurl, url2)
|
||||
|
||||
# failed gssapi auth should leave the url alone
|
||||
self.session.baseurl = url1
|
||||
self.session._callMethod.side_effect = Exception('login failed')
|
||||
with self.assertRaises(koji.AuthError):
|
||||
self.session.gssapi_login()
|
||||
self.assertEqual(self.session.baseurl, url1)
|
||||
self.assertEqual(old_environ, dict(**os.environ))
|
||||
Loading…
Add table
Add a link
Reference in a new issue