diff --git a/koji/__init__.py b/koji/__init__.py index a6147e2e..8ff969a1 100644 --- a/koji/__init__.py +++ b/koji/__init__.py @@ -50,7 +50,7 @@ from koji.util import md5_constructor SSL_Error = None try: from OpenSSL.SSL import Error as SSL_Error -except Exception: #pragma: no cover +except Exception: # pragma: no cover # the hub imports koji, and sometimes this import fails there # see: https://cryptography.io/en/latest/faq/#starting-cryptography-using-mod-wsgi-produces-an-internalerror-during-a-call-in-register-osrandom-engine # unfortunately the workaround at the above link does not always work, so @@ -64,9 +64,9 @@ import random import re import requests try: - from requests_kerberos import HTTPKerberosAuth -except ImportError: #pragma: no cover - HTTPKerberosAuth = None + import requests_kerberos +except ImportError: # pragma: no cover + requests_kerberos = None import rpm import shutil import signal @@ -2145,7 +2145,7 @@ class ClientSession(object): sprinc = krbV.Principal(name=self._serverPrincipal(cprinc), context=ctx) ac = krbV.AuthContext(context=ctx) - ac.flags = krbV.KRB5_AUTH_CONTEXT_DO_SEQUENCE|krbV.KRB5_AUTH_CONTEXT_DO_TIME + ac.flags = krbV.KRB5_AUTH_CONTEXT_DO_SEQUENCE | krbV.KRB5_AUTH_CONTEXT_DO_TIME ac.rcache = ctx.default_rcache() # create and encode the authentication request @@ -2156,7 +2156,6 @@ class ClientSession(object): # ask the server to authenticate us (rep_enc, sinfo_enc, addrinfo) = self.callMethod('krbLogin', req_enc, proxyuser) - # Set the addrinfo we received from the server # (necessary before calling rd_priv()) # addrinfo is in (serveraddr, serverport, clientaddr, clientport) @@ -2206,7 +2205,7 @@ class ClientSession(object): return host def gssapi_login(self, principal=None, keytab=None, ccache=None, proxyuser=None): - if not HTTPKerberosAuth: + if not requests_kerberos: raise PythonImportError( "Please install python-requests-kerberos to use GSSAPI." ) @@ -2234,8 +2233,14 @@ class ClientSession(object): old_env['KRB5CCNAME'] = os.environ.get('KRB5CCNAME') os.environ['KRB5CCNAME'] = ccache if principal: - kwargs['principal'] = principal - self.opts['auth'] = HTTPKerberosAuth(**kwargs) + if re.match(r'0[.][1-8]\b', requests_kerberos.__version__): + raise PythonImportError( + 'python-requests-kerberos >= 0.9.0 required for ' + 'keytab auth' + ) + else: + kwargs['principal'] = principal + self.opts['auth'] = requests_kerberos.HTTPKerberosAuth(**kwargs) try: # Depending on the server configuration, we might not be able to # connect without client certificate, which means that the conn diff --git a/tests/test_lib/test_gssapi.py b/tests/test_lib/test_gssapi.py index 4927f969..ebdefe65 100644 --- a/tests/test_lib/test_gssapi.py +++ b/tests/test_lib/test_gssapi.py @@ -1,15 +1,17 @@ from __future__ import absolute_import -import mock + import os import unittest +import mock + import koji class TestGSSAPI(unittest.TestCase): - def setUp(self): - self.session = koji.ClientSession('https://koji.example.com/kojihub', {}) + self.session = koji.ClientSession('https://koji.example.com/kojihub', + {}) self.session._callMethod = mock.MagicMock(name='_callMethod') def tearDown(self): @@ -17,7 +19,7 @@ class TestGSSAPI(unittest.TestCase): maxDiff = None - @mock.patch('koji.HTTPKerberosAuth', new=None) + @mock.patch('koji.requests_kerberos', new=None) def test_gssapi_disabled(self): with self.assertRaises(ImportError): self.session.gssapi_login() @@ -26,18 +28,57 @@ class TestGSSAPI(unittest.TestCase): old_environ = dict(**os.environ) self.session.gssapi_login() self.session._callMethod.assert_called_once_with('sslLogin', [None], - retry=False) + retry=False) self.assertEqual(old_environ, dict(**os.environ)) - def test_gssapi_login_keytab(self): + @mock.patch('requests_kerberos.HTTPKerberosAuth') + def test_gssapi_login_keytab(self, HTTPKerberosAuth_mock): principal = 'user@EXAMPLE.COM' keytab = '/path/to/keytab' ccache = '/path/to/cache' old_environ = dict(**os.environ) - self.session.gssapi_login(principal, keytab, ccache) - self.session._callMethod.assert_called_once_with('sslLogin', [None], - retry=False) - self.assertEqual(old_environ, dict(**os.environ)) + current_version = koji.requests_kerberos.__version__ + accepted_versions = ['0.12.0.beta1', + '0.12.0dev', + '0.12.0a1', + '0.11.0', + '0.10.0', + '0.9.0'] + for accepted_version in accepted_versions: + koji.requests_kerberos.__version__ = accepted_version + rv = self.session.gssapi_login(principal, keytab, ccache) + self.session._callMethod.assert_called_once_with('sslLogin', + [None], + retry=False) + self.assertEqual(old_environ, dict(**os.environ)) + self.assertTrue(rv) + self.session._callMethod.reset_mock() + koji.requests_kerberos.__version__ = current_version + + def test_gssapi_login_keytab_unsupported_requests_kerberos_version(self): + principal = 'user@EXAMPLE.COM' + keytab = '/path/to/keytab' + ccache = '/path/to/cache' + old_environ = dict(**os.environ) + current_version = koji.requests_kerberos.__version__ + old_versions = ['0.8.0', + '0.7.0', + '0.6.1', + '0.6', + '0.5', + '0.3', + '0.2', + '0.1'] + for old_version in old_versions: + koji.requests_kerberos.__version__ = old_version + with self.assertRaises(koji.PythonImportError) as cm: + self.session.gssapi_login(principal, keytab, ccache) + self.assertEqual(cm.exception.args[0], + 'python-requests-kerberos >= 0.9.0 required for ' + 'keytab auth') + self.session._callMethod.assert_not_called() + self.assertEqual(old_environ, dict(**os.environ)) + koji.requests_kerberos.__version__ = current_version def test_gssapi_login_error(self): old_environ = dict(**os.environ) @@ -45,7 +86,7 @@ class TestGSSAPI(unittest.TestCase): with self.assertRaises(koji.AuthError): self.session.gssapi_login() self.session._callMethod.assert_called_once_with('sslLogin', [None], - retry=False) + retry=False) self.assertEqual(old_environ, dict(**os.environ)) def test_gssapi_login_http(self): diff --git a/tests/test_lib_py2only/test_krbv.py b/tests/test_lib_py2only/test_krbv.py index 13e827ed..d5fa4ee7 100644 --- a/tests/test_lib_py2only/test_krbv.py +++ b/tests/test_lib_py2only/test_krbv.py @@ -1,4 +1,7 @@ from __future__ import absolute_import + +import base64 +import six import unittest # This is python-mock, not the rpm mock tool we know and love @@ -8,12 +11,27 @@ import koji class KrbVTestCase(unittest.TestCase): - @mock.patch('koji.krbV', new=None) - @mock.patch('koji.HTTPKerberosAuth', new=None) + @mock.patch('koji.requests_kerberos', new=None) def test_krbv_disabled(self): """Test that when krbV and gssapi are absent, we behave rationally""" self.assertEquals(koji.krbV, None) session = koji.ClientSession('whatever') with self.assertRaises(ImportError): session.krb_login() + + @mock.patch('koji.krbV', create=True) + @mock.patch('requests_kerberos.__version__', new='0.7.0') + @mock.patch('koji.ClientSession._serverPrincipal') + def test_krbv_old_requests_kerberos(self, _serverPrincipal_mock, krbV_mock): + self.assertIsNotNone(koji.krbV) + ctx = koji.krbV.default_context.return_value + ctx.mk_req = mock.MagicMock() + ac = mock.MagicMock() + ctx.mk_req.return_value = (ac, six.b('req')) + ac.rd_priv = mock.MagicMock(return_value='session-id session-key') + session = koji.ClientSession('whatever') + session._callMethod = mock.MagicMock( + return_value=(base64.encodestring(six.b('a')), base64.encodestring(six.b('b')), [0, 1, 2, 3])) + rv = session.krb_login(principal='any@SOMEWHERE.COM', keytab='/path/to/keytab') + self.assertTrue(rv)