some tests for koji.auth

This commit is contained in:
Tomas Kopecek 2017-10-04 13:41:04 +02:00 committed by Mike McLean
parent 299e0cda67
commit c86f41ebcf
2 changed files with 193 additions and 2 deletions

View file

@ -103,7 +103,8 @@ class Session(object):
'EXTRACT(EPOCH FROM update_time)': 'update_ts',
'user_id': 'user_id',
}
fields, aliases = list(zip(*list(fields.items())))
# sort for stability (unittests)
fields, aliases = list(zip(*list(sorted(fields.items(), key=lambda x: x[1]))))
q = """
SELECT %s FROM sessions
WHERE id = %%(id)i
@ -737,7 +738,7 @@ def sharedSession():
return context.session.makeShared()
if __name__ == '__main__':
if __name__ == '__main__': # pragma: no cover
# XXX - testing defaults
import db
db.setDBopts(database="test", user="test")

View file

@ -0,0 +1,190 @@
from __future__ import absolute_import
import mock
import unittest
import koji
import koji.auth
class TestAuthSession(unittest.TestCase):
def test_instance(self):
"""Simple auth.Session instance"""
s = koji.auth.Session()
# no args in request/environment
self.assertEqual(s.message, 'no session args')
@mock.patch('koji.auth.context')
def get_session(self, context):
"""auth.Session instance"""
# base session from test_basic_instance
context.environ = {
'QUERY_STRING': 'session-id=123&session-key=xyz&callnum=345',
'REMOTE_ADDR': 'remote-addr',
}
cursor = mock.MagicMock(name='cursor')
context.cnx.cursor.return_value = cursor
cursor.fetchone.side_effect = [
# get session
[koji.AUTHTYPE_NORMAL, 344, False, False, 'master', 'start_time', 'start_ts', 'update_time', 'update_ts', 'user_id'],
# get user
['name', koji.USER_STATUS['NORMAL'], koji.USERTYPES['NORMAL']],
# get excl.session
None,
# upd. timestamp
None,
# upd callnum
None,
]
s = koji.auth.Session()
return s, context, cursor
@mock.patch('koji.auth.context')
def test_basic_instance(self, context):
"""auth.Session instance"""
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
self.assertEqual(s.id, 123)
self.assertEqual(s.key, 'xyz')
self.assertEqual(s.hostip, 'remote-addr')
self.assertEqual(s.callnum, 345)
self.assertEqual(s.user_id, 'user_id')
self.assertEqual(s.authtype, koji.AUTHTYPE_NORMAL)
self.assertEqual(s.master, 'master')
self.assertTrue(s.logged_in)
# 5 SQL calls: get session, get user, get excl. session,
# update timestamp, update callnum
self.assertEqual(cursor.execute.call_count, 5)
@mock.patch('koji.auth.context')
def test_getattr(self, context):
"""auth.Session instance"""
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
# test
self.assertEqual(s.perms, {})
self.assertEqual(s.groups, {})
self.assertEqual(s.host_id, None)
# all other names should raise error
with self.assertRaises(AttributeError):
s.non_existing_attribute
@mock.patch('koji.auth.context')
def test_str(self, context):
"""auth.Session string representation"""
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
s.logged_in = False
s.message = 'msg'
self.assertEqual(str(s), 'session: not logged in (msg)')
s.logged_in = True
self.assertNotEqual(str(s), 'session: not logged in')
@mock.patch('koji.auth.context')
def test_validate(self, context):
"""Session.validate"""
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
s.lockerror = True
with self.assertRaises(koji.AuthLockError):
s.validate()
s.lockerror = False
self.assertTrue(s.validate())
@mock.patch('koji.auth.context')
def test_makeShared(self, context):
"""Session.makeShared"""
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
s.makeShared()
c = cursor.execute.call_args[0]
self.assertEqual(c[0], 'UPDATE sessions SET "exclusive"=NULL WHERE id=%(session_id)s')
self.assertEqual(c[1]['session_id'], 123)
@mock.patch('socket.gethostbyname')
@mock.patch('koji.auth.context')
def test_get_remote_ip(self, context, gethostbyname):
"""Session.get_remote_ip"""
s, cntext, cursor = self.get_session()
context.opts = {'CheckClientIP': False}
self.assertEqual(s.get_remote_ip(), '-')
context.opts = {'CheckClientIP': True}
self.assertEqual(s.get_remote_ip(override='xoverride'), 'xoverride')
context.environ = {'REMOTE_ADDR': '123.123.123.123'}
self.assertEqual(s.get_remote_ip(), '123.123.123.123')
gethostbyname.return_value = 'ip'
context.environ = {'REMOTE_ADDR': '127.0.0.1'}
self.assertEqual(s.get_remote_ip(), 'ip')
@mock.patch('koji.auth.context')
def test_login(self, context):
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
# already logged in
with self.assertRaises(koji.GenericError):
s.login('user', 'password')
s.logged_in = False
with self.assertRaises(koji.AuthError):
s.login('user', 123)
with self.assertRaises(koji.AuthError):
s.login('user', '')
# correct
s.get_remote_ip = mock.MagicMock()
s.get_remote_ip.return_value = 'hostip'
s.checkLoginAllowed = mock.MagicMock()
s.checkLoginAllowed.return_value = True
s.createSession = mock.MagicMock()
s.createSession.return_value = {'session-id': 'session-id'}
cursor.fetchone = mock.MagicMock()
cursor.fetchone.return_value = ['user_id']
result = s.login('user', 'password')
self.assertEqual(s.get_remote_ip.call_count, 1)
self.assertEqual(s.checkLoginAllowed.call_args, mock.call('user_id'))
self.assertEqual(result, s.createSession.return_value)
# one more try for non-existing user
cursor.fetchone.return_value = None
with self.assertRaises(koji.AuthError):
s.login('user', 'password')
@mock.patch('koji.auth.context')
def test_krbLogin(self, context):
# TODO
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
with self.assertRaises(koji.AuthError):
s.krbLogin('krb_req', 'proxyuser')
s.logged_in = False
with self.assertRaises(TypeError):
s.krbLogin('krb_req', 'proxyuser')
# functions outside Session object
@mock.patch('koji.auth.context')
def test_get_user_data(self, context):
"""koji.auth.get_user_data"""
cursor = mock.MagicMock(name='cursor')
context.cnx.cursor.return_value = cursor
cursor.fetchone.return_value = ['name', 'status', 'usertype']
self.assertEqual(sorted(koji.auth.get_user_data(1).items()),
sorted({'name': 'name', 'status': 'status', 'usertype': 'usertype'}.items()))
cursor.fetchone.return_value = None
self.assertEqual(koji.auth.get_user_data(1), None)