PR#753: check python-requests-kerberos version before gssapi login

Merges #753
https://pagure.io/koji/pull-request/753

Fixes: #747
https://pagure.io/koji/issue/747
gssapi keytab login fails on rhel7
This commit is contained in:
Mike McLean 2018-02-07 14:34:37 +01:00
commit becc88cba4
3 changed files with 86 additions and 22 deletions

View file

@ -50,7 +50,7 @@ from koji.util import md5_constructor
SSL_Error = None
try:
from OpenSSL.SSL import Error as SSL_Error
except Exception: #pragma: no cover
except Exception: # pragma: no cover
# the hub imports koji, and sometimes this import fails there
# see: https://cryptography.io/en/latest/faq/#starting-cryptography-using-mod-wsgi-produces-an-internalerror-during-a-call-in-register-osrandom-engine
# unfortunately the workaround at the above link does not always work, so
@ -64,9 +64,9 @@ import random
import re
import requests
try:
from requests_kerberos import HTTPKerberosAuth
except ImportError: #pragma: no cover
HTTPKerberosAuth = None
import requests_kerberos
except ImportError: # pragma: no cover
requests_kerberos = None
import rpm
import shutil
import signal
@ -2145,7 +2145,7 @@ class ClientSession(object):
sprinc = krbV.Principal(name=self._serverPrincipal(cprinc), context=ctx)
ac = krbV.AuthContext(context=ctx)
ac.flags = krbV.KRB5_AUTH_CONTEXT_DO_SEQUENCE|krbV.KRB5_AUTH_CONTEXT_DO_TIME
ac.flags = krbV.KRB5_AUTH_CONTEXT_DO_SEQUENCE | krbV.KRB5_AUTH_CONTEXT_DO_TIME
ac.rcache = ctx.default_rcache()
# create and encode the authentication request
@ -2156,7 +2156,6 @@ class ClientSession(object):
# ask the server to authenticate us
(rep_enc, sinfo_enc, addrinfo) = self.callMethod('krbLogin', req_enc, proxyuser)
# Set the addrinfo we received from the server
# (necessary before calling rd_priv())
# addrinfo is in (serveraddr, serverport, clientaddr, clientport)
@ -2206,7 +2205,7 @@ class ClientSession(object):
return host
def gssapi_login(self, principal=None, keytab=None, ccache=None, proxyuser=None):
if not HTTPKerberosAuth:
if not requests_kerberos:
raise PythonImportError(
"Please install python-requests-kerberos to use GSSAPI."
)
@ -2234,8 +2233,14 @@ class ClientSession(object):
old_env['KRB5CCNAME'] = os.environ.get('KRB5CCNAME')
os.environ['KRB5CCNAME'] = ccache
if principal:
kwargs['principal'] = principal
self.opts['auth'] = HTTPKerberosAuth(**kwargs)
if re.match(r'0[.][1-8]\b', requests_kerberos.__version__):
raise PythonImportError(
'python-requests-kerberos >= 0.9.0 required for '
'keytab auth'
)
else:
kwargs['principal'] = principal
self.opts['auth'] = requests_kerberos.HTTPKerberosAuth(**kwargs)
try:
# Depending on the server configuration, we might not be able to
# connect without client certificate, which means that the conn

View file

@ -1,15 +1,17 @@
from __future__ import absolute_import
import mock
import os
import unittest
import mock
import koji
class TestGSSAPI(unittest.TestCase):
def setUp(self):
self.session = koji.ClientSession('https://koji.example.com/kojihub', {})
self.session = koji.ClientSession('https://koji.example.com/kojihub',
{})
self.session._callMethod = mock.MagicMock(name='_callMethod')
def tearDown(self):
@ -17,7 +19,7 @@ class TestGSSAPI(unittest.TestCase):
maxDiff = None
@mock.patch('koji.HTTPKerberosAuth', new=None)
@mock.patch('koji.requests_kerberos', new=None)
def test_gssapi_disabled(self):
with self.assertRaises(ImportError):
self.session.gssapi_login()
@ -26,18 +28,57 @@ class TestGSSAPI(unittest.TestCase):
old_environ = dict(**os.environ)
self.session.gssapi_login()
self.session._callMethod.assert_called_once_with('sslLogin', [None],
retry=False)
retry=False)
self.assertEqual(old_environ, dict(**os.environ))
def test_gssapi_login_keytab(self):
@mock.patch('requests_kerberos.HTTPKerberosAuth')
def test_gssapi_login_keytab(self, HTTPKerberosAuth_mock):
principal = 'user@EXAMPLE.COM'
keytab = '/path/to/keytab'
ccache = '/path/to/cache'
old_environ = dict(**os.environ)
self.session.gssapi_login(principal, keytab, ccache)
self.session._callMethod.assert_called_once_with('sslLogin', [None],
retry=False)
self.assertEqual(old_environ, dict(**os.environ))
current_version = koji.requests_kerberos.__version__
accepted_versions = ['0.12.0.beta1',
'0.12.0dev',
'0.12.0a1',
'0.11.0',
'0.10.0',
'0.9.0']
for accepted_version in accepted_versions:
koji.requests_kerberos.__version__ = accepted_version
rv = self.session.gssapi_login(principal, keytab, ccache)
self.session._callMethod.assert_called_once_with('sslLogin',
[None],
retry=False)
self.assertEqual(old_environ, dict(**os.environ))
self.assertTrue(rv)
self.session._callMethod.reset_mock()
koji.requests_kerberos.__version__ = current_version
def test_gssapi_login_keytab_unsupported_requests_kerberos_version(self):
principal = 'user@EXAMPLE.COM'
keytab = '/path/to/keytab'
ccache = '/path/to/cache'
old_environ = dict(**os.environ)
current_version = koji.requests_kerberos.__version__
old_versions = ['0.8.0',
'0.7.0',
'0.6.1',
'0.6',
'0.5',
'0.3',
'0.2',
'0.1']
for old_version in old_versions:
koji.requests_kerberos.__version__ = old_version
with self.assertRaises(koji.PythonImportError) as cm:
self.session.gssapi_login(principal, keytab, ccache)
self.assertEqual(cm.exception.args[0],
'python-requests-kerberos >= 0.9.0 required for '
'keytab auth')
self.session._callMethod.assert_not_called()
self.assertEqual(old_environ, dict(**os.environ))
koji.requests_kerberos.__version__ = current_version
def test_gssapi_login_error(self):
old_environ = dict(**os.environ)
@ -45,7 +86,7 @@ class TestGSSAPI(unittest.TestCase):
with self.assertRaises(koji.AuthError):
self.session.gssapi_login()
self.session._callMethod.assert_called_once_with('sslLogin', [None],
retry=False)
retry=False)
self.assertEqual(old_environ, dict(**os.environ))
def test_gssapi_login_http(self):

View file

@ -1,4 +1,7 @@
from __future__ import absolute_import
import base64
import six
import unittest
# This is python-mock, not the rpm mock tool we know and love
@ -8,12 +11,27 @@ import koji
class KrbVTestCase(unittest.TestCase):
@mock.patch('koji.krbV', new=None)
@mock.patch('koji.HTTPKerberosAuth', new=None)
@mock.patch('koji.requests_kerberos', new=None)
def test_krbv_disabled(self):
"""Test that when krbV and gssapi are absent, we behave rationally"""
self.assertEquals(koji.krbV, None)
session = koji.ClientSession('whatever')
with self.assertRaises(ImportError):
session.krb_login()
@mock.patch('koji.krbV', create=True)
@mock.patch('requests_kerberos.__version__', new='0.7.0')
@mock.patch('koji.ClientSession._serverPrincipal')
def test_krbv_old_requests_kerberos(self, _serverPrincipal_mock, krbV_mock):
self.assertIsNotNone(koji.krbV)
ctx = koji.krbV.default_context.return_value
ctx.mk_req = mock.MagicMock()
ac = mock.MagicMock()
ctx.mk_req.return_value = (ac, six.b('req'))
ac.rd_priv = mock.MagicMock(return_value='session-id session-key')
session = koji.ClientSession('whatever')
session._callMethod = mock.MagicMock(
return_value=(base64.encodestring(six.b('a')), base64.encodestring(six.b('b')), [0, 1, 2, 3]))
rv = session.krb_login(principal='any@SOMEWHERE.COM', keytab='/path/to/keytab')
self.assertTrue(rv)