Move db/auth to kojihub module

Related: https://pagure.io/koji/issue/3666
This commit is contained in:
Tomas Kopecek 2023-02-08 15:40:09 +01:00
parent 5ecc788603
commit b0e91e378c
38 changed files with 128 additions and 140 deletions

View file

@ -5,12 +5,11 @@ import mock
import unittest
import koji
import koji.auth
import koji.db
import datetime
import kojihub.auth
UP = koji.auth.UpdateProcessor
QP = koji.auth.QueryProcessor
UP = kojihub.auth.UpdateProcessor
QP = kojihub.auth.QueryProcessor
class TestAuthSession(unittest.TestCase):
@ -29,42 +28,41 @@ class TestAuthSession(unittest.TestCase):
return query
def setUp(self):
self.context = mock.patch('kojihub.context').start()
self.UpdateProcessor = mock.patch('koji.auth.UpdateProcessor',
self.context = mock.patch('kojihub.auth.context').start()
kojihub.db.context = self.context
self.UpdateProcessor = mock.patch('kojihub.auth.UpdateProcessor',
side_effect=self.getUpdate).start()
self.updates = []
self.query_execute = mock.MagicMock()
self.query_executeOne = mock.MagicMock()
self.query_singleValue = mock.MagicMock()
self.QueryProcessor = mock.patch('koji.auth.QueryProcessor',
self.QueryProcessor = mock.patch('kojihub.auth.QueryProcessor',
side_effect=self.getQuery).start()
self.queries = []
# It seems MagicMock will not automatically handle attributes that
# start with "assert"
self.context.session.assertLogin = mock.MagicMock()
@mock.patch('koji.auth.context')
def test_instance(self, context):
"""Simple auth.Session instance"""
context.opts = {
def test_instance(self):
"""Simple kojihub.auth.Session instance"""
self.context.opts = {
'CheckClientIP': True,
'DisableURLSessions': False,
}
with self.assertRaises(koji.GenericError) as cm:
koji.auth.Session()
kojihub.auth.Session()
# no args in request/environment
self.assertEqual(cm.exception.args[0], "'session-id' not specified in session args")
@mock.patch('koji.auth.context')
def get_session_old(self, context):
def get_session_old(self):
"""auth.Session instance"""
# base session from test_basic_instance
# url-based auth - will be dropped in 1.34
context.opts = {
# url-based kojihub.auth - will be dropped in 1.34
self.context.opts = {
'CheckClientIP': True,
'DisableURLSessions': False,
}
context.environ = {
self.context.environ = {
'QUERY_STRING': 'session-id=123&session-key=xyz&callnum=345',
'REMOTE_ADDR': 'remote-addr',
}
@ -80,18 +78,17 @@ class TestAuthSession(unittest.TestCase):
'user_id': 1},
{'name': 'kojiadmin', 'status': 0, 'usertype': 0}]
self.query_singleValue.return_value = 123
s = koji.auth.Session()
return s, context
s = kojihub.auth.Session()
return s, self.context
@mock.patch('koji.auth.context')
def get_session(self, context):
def get_session(self):
# base session from test_basic_instance
# header-based auth
context.opts = {
self.context.opts = {
'CheckClientIP': True,
'DisableURLSessions': True,
}
context.environ = {
self.context.environ = {
'HTTP_KOJI_SESSION_ID': '123',
'HTTP_KOJI_SESSION_KEY': 'xyz',
'HTTP_KOJI_CALLNUM': '345',
@ -109,8 +106,8 @@ class TestAuthSession(unittest.TestCase):
'user_id': 1},
{'name': 'kojiadmin', 'status': 0, 'usertype': 0}]
self.query_singleValue.return_value = 123
s = koji.auth.Session()
return s, context
s = kojihub.auth.Session()
return s, self.context
def test_session_old(self):
self.get_session_old()
@ -220,7 +217,7 @@ class TestAuthSession(unittest.TestCase):
with self.assertRaises(AttributeError):
s.non_existing_attribute
@mock.patch('koji.auth.context')
@mock.patch('auth.context')
def test_str(self, context):
"""auth.Session string representation"""
s, cntext = self.get_session()
@ -232,11 +229,10 @@ class TestAuthSession(unittest.TestCase):
s.logged_in = True
self.assertNotEqual(str(s), 'session: not logged in')
@mock.patch('koji.auth.context')
def test_validate(self, context):
def test_validate(self):
"""Session.validate"""
s, cntext = self.get_session()
context.cnx = cntext.cnx
self.context.cnx = cntext.cnx
s.lockerror = True
with self.assertRaises(koji.AuthLockError):
@ -245,8 +241,7 @@ class TestAuthSession(unittest.TestCase):
s.lockerror = False
self.assertTrue(s.validate())
@mock.patch('koji.auth.context')
def test_makeShared(self, context):
def test_makeShared(self):
"""Session.makeShared"""
s, _ = self.get_session()
s.makeShared()
@ -264,25 +259,24 @@ class TestAuthSession(unittest.TestCase):
# all queries are tested in test_basic_instance
@mock.patch('socket.gethostbyname')
@mock.patch('koji.auth.context')
def test_get_remote_ip(self, context, gethostbyname):
def test_get_remote_ip(self, gethostbyname):
"""Session.get_remote_ip"""
s, cntext = self.get_session()
s, _ = self.get_session()
context.opts = {'CheckClientIP': False}
self.context.opts = {'CheckClientIP': False}
self.assertEqual(s.get_remote_ip(), '-')
context.opts = {'CheckClientIP': True}
self.context.opts = {'CheckClientIP': True}
self.assertEqual(s.get_remote_ip(override='xoverride'), 'xoverride')
context.environ = {'REMOTE_ADDR': '123.123.123.123'}
self.context.environ = {'REMOTE_ADDR': '123.123.123.123'}
self.assertEqual(s.get_remote_ip(), '123.123.123.123')
gethostbyname.return_value = 'ip'
context.environ = {'REMOTE_ADDR': '127.0.0.1'}
self.context.environ = {'REMOTE_ADDR': '127.0.0.1'}
self.assertEqual(s.get_remote_ip(), 'ip')
@mock.patch('koji.auth.context')
@mock.patch('auth.context')
def test_login(self, context):
s, cntext = self.get_session()
@ -329,13 +323,12 @@ class TestAuthSession(unittest.TestCase):
with self.assertRaises(koji.AuthError):
s.login('user', 'password')
@mock.patch('koji.auth.context')
def test_checkKrbPrincipal(self, context):
def test_checkKrbPrincipal(self):
s, cntext = self.get_session()
self.assertIsNone(s.checkKrbPrincipal(None))
context.opts = {'AllowedKrbRealms': '*'}
self.context.opts = {'AllowedKrbRealms': '*'}
self.assertIsNone(s.checkKrbPrincipal('any'))
context.opts = {'AllowedKrbRealms': 'example.com'}
self.context.opts = {'AllowedKrbRealms': 'example.com'}
with self.assertRaises(koji.AuthError) as cm:
s.checkKrbPrincipal('any')
self.assertEqual(cm.exception.args[0],
@ -350,8 +343,7 @@ class TestAuthSession(unittest.TestCase):
"Kerberos principal's realm:"
" bannedrealm is not allowed")
self.assertIsNone(s.checkKrbPrincipal('user@example.com'))
context.opts = {'AllowedKrbRealms': 'example.com,example.net'
' , example.org'}
self.context.opts = {'AllowedKrbRealms': 'example.com,example.net,example.org'}
self.assertIsNone(s.checkKrbPrincipal('user@example.net'))
def test_getUserIdFromKerberos(self):
@ -420,7 +412,7 @@ class TestAuthSession(unittest.TestCase):
s.logout()
self.assertEqual(cm.exception.args[0], 'Not logged in')
@mock.patch('koji.auth.context')
@mock.patch('auth.context')
def test_logout_logged(self, context):
s, cntext = self.get_session()
s.logged_in = True
@ -446,7 +438,7 @@ class TestAuthSession(unittest.TestCase):
s.logoutChild(111)
self.assertEqual(cm.exception.args[0], 'Not logged in')
@mock.patch('koji.auth.context')
@mock.patch('auth.context')
def test_logoutChild_logged(self, context):
s, cntext = self.get_session()
s.logged_in = True
@ -493,7 +485,7 @@ class TestAuthSession(unittest.TestCase):
self.assertEqual(len(self.queries), 5)
self.assertEqual(len(self.updates), 2)
@mock.patch('koji.auth.context')
@mock.patch('auth.context')
def test_makeExclusive(self, context):
s, cntext = self.get_session()
s.master = None
@ -635,13 +627,13 @@ class TestAuthSession(unittest.TestCase):
# functions outside Session object
def test_get_user_data(self):
"""koji.auth.get_user_data"""
"""auth.get_user_data"""
self.query_executeOne.return_value = None
self.assertEqual(len(self.queries), 0)
self.query_executeOne.return_value = {'name': 'name', 'status': 'status',
'usertype': 'usertype'}
koji.auth.get_user_data(1)
kojihub.auth.get_user_data(1)
self.assertEqual(len(self.queries), 1)
query = self.queries[0]
self.assertEqual(query.tables, ['users'])
@ -650,8 +642,8 @@ class TestAuthSession(unittest.TestCase):
self.assertEqual(query.columns, ['name', 'status', 'usertype'])
def test_get_user_groups(self):
"""koji.auth.get_user_groups"""
koji.auth.get_user_groups(1)
"""auth.get_user_groups"""
kojihub.auth.get_user_groups(1)
self.assertEqual(len(self.queries), 1)
query = self.queries[0]
self.assertEqual(query.tables, ['user_groups'])
@ -661,8 +653,8 @@ class TestAuthSession(unittest.TestCase):
self.assertEqual(query.columns, ['group_id', 'name'])
def test_get_user_perms(self):
"""koji.auth.get_user_perms"""
koji.auth.get_user_perms(1)
"""auth.get_user_perms"""
kojihub.auth.get_user_perms(1)
self.assertEqual(len(self.queries), 1)
query = self.queries[0]
self.assertEqual(query.tables, ['user_perms'])
@ -670,14 +662,13 @@ class TestAuthSession(unittest.TestCase):
self.assertEqual(query.clauses, ['active = TRUE', 'user_id=%(user_id)s'])
self.assertEqual(query.columns, ['name'])
@mock.patch('koji.auth.context')
def test_logout_logged_not_owner(self, context):
s, cntext = self.get_session()
def test_logout_logged_not_owner(self):
s, _ = self.get_session()
s.logged_in = True
# session_id without admin perms and not owner
context.session.hasPerm.return_value = False
context.session.user_id.return_value = 123
self.context.session.hasPerm.return_value = False
self.context.session.user_id.return_value = 123
self.query_singleValue.return_value = None
with self.assertRaises(koji.ActionNotAllowed) as ex:
s.logout(session_id=1)