resort tests from lib to hub for py2
This commit is contained in:
parent
aa54d705bc
commit
951cee051f
8 changed files with 0 additions and 0 deletions
|
|
@ -1,770 +0,0 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
try:
|
||||
from unittest import mock
|
||||
except ImportError:
|
||||
import mock
|
||||
|
||||
import unittest
|
||||
|
||||
import koji
|
||||
import datetime
|
||||
import kojihub.auth
|
||||
|
||||
UP = kojihub.auth.UpdateProcessor
|
||||
QP = kojihub.auth.QueryProcessor
|
||||
|
||||
|
||||
class TestAuthSession(unittest.TestCase):
|
||||
def getUpdate(self, *args, **kwargs):
|
||||
update = UP(*args, **kwargs)
|
||||
update.execute = mock.MagicMock()
|
||||
self.updates.append(update)
|
||||
return update
|
||||
|
||||
def getQuery(self, *args, **kwargs):
|
||||
query = QP(*args, **kwargs)
|
||||
query.execute = self.query_execute
|
||||
query.executeOne = self.query_executeOne
|
||||
query.singleValue = self.query_singleValue
|
||||
self.queries.append(query)
|
||||
return query
|
||||
|
||||
def setUp(self):
|
||||
self.context = mock.patch('kojihub.auth.context').start()
|
||||
kojihub.db.context = self.context
|
||||
kojihub.auth.context = self.context
|
||||
self.UpdateProcessor = mock.patch('kojihub.auth.UpdateProcessor',
|
||||
side_effect=self.getUpdate).start()
|
||||
self.updates = []
|
||||
self.query_execute = mock.MagicMock()
|
||||
self.query_executeOne = mock.MagicMock()
|
||||
self.query_singleValue = mock.MagicMock()
|
||||
self.QueryProcessor = mock.patch('kojihub.auth.QueryProcessor',
|
||||
side_effect=self.getQuery).start()
|
||||
self.queries = []
|
||||
# It seems MagicMock will not automatically handle attributes that
|
||||
# start with "assert"
|
||||
self.context.session.assertLogin = mock.MagicMock()
|
||||
|
||||
def tearDown(self):
|
||||
mock.patch.stopall()
|
||||
|
||||
def test_instance(self):
|
||||
"""Simple kojihub.auth.Session instance"""
|
||||
self.context.opts = {
|
||||
'CheckClientIP': True,
|
||||
'DisableURLSessions': False,
|
||||
'SessionRenewalTimeout': 0,
|
||||
}
|
||||
self.context.environ = {'QUERY_STRING': 'non_session_key=1'}
|
||||
with self.assertRaises(koji.GenericError) as cm:
|
||||
kojihub.auth.Session()
|
||||
# no args in request/environment
|
||||
self.assertEqual(cm.exception.args[0], "'session-id' not specified in session args")
|
||||
|
||||
def get_session_old(self):
|
||||
"""auth.Session instance"""
|
||||
# base session from test_basic_instance
|
||||
# url-based kojihub.auth - will be dropped in 1.34
|
||||
self.context.opts = {
|
||||
'CheckClientIP': True,
|
||||
'DisableURLSessions': False,
|
||||
'SessionRenewalTimeout': 0,
|
||||
}
|
||||
self.context.environ = {
|
||||
'QUERY_STRING': 'session-id=123&session-key=xyz&callnum=345',
|
||||
'REMOTE_ADDR': 'remote-addr',
|
||||
}
|
||||
|
||||
self.query_executeOne.side_effect = [
|
||||
{'authtype': 2, 'callnum': 1, "date_part('epoch', start_time)": 1666599426.227002,
|
||||
"date_part('epoch', update_time)": 1666599426.254308, 'exclusive': None,
|
||||
'expired': False, 'master': None,
|
||||
'start_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 227002,
|
||||
tzinfo=datetime.timezone.utc),
|
||||
'update_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 254308,
|
||||
tzinfo=datetime.timezone.utc),
|
||||
'user_id': 1},
|
||||
{'name': 'kojiadmin', 'status': 0, 'usertype': 0}]
|
||||
self.query_singleValue.return_value = 123
|
||||
s = kojihub.auth.Session()
|
||||
return s, self.context
|
||||
|
||||
def get_session(self):
|
||||
# base session from test_basic_instance
|
||||
# header-based auth
|
||||
self.context.opts = {
|
||||
'CheckClientIP': True,
|
||||
'DisableURLSessions': True,
|
||||
'SessionRenewalTimeout': 0,
|
||||
}
|
||||
self.context.environ = {
|
||||
'HTTP_KOJI_SESSION_ID': '123',
|
||||
'HTTP_KOJI_SESSION_KEY': 'xyz',
|
||||
'HTTP_KOJI_CALLNUM': '345',
|
||||
'REMOTE_ADDR': 'remote-addr',
|
||||
}
|
||||
|
||||
self.query_executeOne.side_effect = [
|
||||
{'authtype': 2, 'callnum': 1, "date_part('epoch', start_time)": 1666599426.227002,
|
||||
"date_part('epoch', update_time)": 1666599426.254308, 'exclusive': None,
|
||||
'expired': False, 'master': None,
|
||||
'start_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 227002,
|
||||
tzinfo=datetime.timezone.utc),
|
||||
'update_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 254308,
|
||||
tzinfo=datetime.timezone.utc),
|
||||
'user_id': 1},
|
||||
{'name': 'kojiadmin', 'status': 0, 'usertype': 0}]
|
||||
self.query_singleValue.return_value = 123
|
||||
s = kojihub.auth.Session()
|
||||
return s, self.context
|
||||
|
||||
def test_session_old(self):
|
||||
self.get_session_old()
|
||||
|
||||
def test_renewal_timeout(self):
|
||||
"""Simple kojihub.auth.Session instance"""
|
||||
self.context.opts = {
|
||||
'CheckClientIP': True,
|
||||
'DisableURLSessions': False,
|
||||
'SessionRenewalTimeout': 1440,
|
||||
}
|
||||
self.context.environ = {
|
||||
'QUERY_STRING': 'session-id=123&session-key=xyz&callnum=345',
|
||||
'REMOTE_ADDR': 'remote-addr',
|
||||
}
|
||||
|
||||
self.query_executeOne.side_effect = [
|
||||
{'authtype': 2, 'callnum': 1, "start_ts": 1666599426.227002,
|
||||
"update_ts": 1666599426.254308, 'exclusive': None,
|
||||
'expired': False, 'master': None,
|
||||
'start_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 227002,
|
||||
tzinfo=datetime.timezone.utc),
|
||||
'update_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 254308,
|
||||
tzinfo=datetime.timezone.utc),
|
||||
'renew_ts': None,
|
||||
'user_id': 1},
|
||||
{'name': 'kojiadmin', 'status': 0, 'usertype': 0}]
|
||||
with self.assertRaises(koji.GenericError) as cm:
|
||||
kojihub.auth.Session()
|
||||
# no args in request/environment
|
||||
self.assertEqual(cm.exception.args[0], 'session "123" has expired')
|
||||
|
||||
self.assertEqual(len(self.updates), 1)
|
||||
self.assertEqual(len(self.queries), 1)
|
||||
|
||||
update = self.updates[0]
|
||||
|
||||
self.assertEqual(update.table, 'sessions')
|
||||
self.assertEqual(update.values['id'], 123)
|
||||
self.assertEqual(update.clauses, ['id = %(id)s OR master = %(id)s'])
|
||||
self.assertEqual(update.data, {'expired': True})
|
||||
self.assertEqual(update.rawdata, {})
|
||||
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['sessions'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['closed IS FALSE', 'hostip = %(hostip)s', 'id = %(id)i',
|
||||
'key = %(key)s'])
|
||||
self.assertEqual(query.columns, ['authtype', 'callnum', 'exclusive', 'expired', 'master',
|
||||
'renew_time', "date_part('epoch', renew_time)",
|
||||
'start_time', "date_part('epoch', start_time)",
|
||||
'update_time', "date_part('epoch', update_time)",
|
||||
'user_id'])
|
||||
self.assertEqual(query.aliases, ['authtype', 'callnum', 'exclusive', 'expired', 'master',
|
||||
'renew_time', 'renew_ts', 'start_time', 'start_ts',
|
||||
'update_time', 'update_ts', 'user_id'])
|
||||
self.assertEqual(query.values, {'id': 123, 'key': 'xyz', 'hostip': 'remote-addr'})
|
||||
|
||||
def test_basic_instance(self):
|
||||
"""auth.Session instance"""
|
||||
s, cntext = self.get_session()
|
||||
self.assertEqual(len(self.updates), 2)
|
||||
update = self.updates[0]
|
||||
|
||||
self.assertEqual(update.table, 'sessions')
|
||||
self.assertEqual(update.values['id'], 123)
|
||||
self.assertEqual(update.clauses, ['id = %(id)i'])
|
||||
self.assertEqual(update.data, {})
|
||||
self.assertEqual(update.rawdata, {'update_time': 'NOW()'})
|
||||
|
||||
update = self.updates[1]
|
||||
|
||||
self.assertEqual(update.table, 'sessions')
|
||||
self.assertEqual(update.values['id'], 123)
|
||||
self.assertEqual(update.clauses, ['id = %(id)i'])
|
||||
self.assertEqual(update.data, {'callnum': 345})
|
||||
self.assertEqual(update.rawdata, {})
|
||||
|
||||
self.assertEqual(len(self.queries), 3)
|
||||
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['sessions'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['closed IS FALSE', 'hostip = %(hostip)s', 'id = %(id)i',
|
||||
'key = %(key)s'])
|
||||
self.assertEqual(query.columns, ['authtype', 'callnum', 'exclusive', 'expired', 'master',
|
||||
'renew_time', "date_part('epoch', renew_time)",
|
||||
'start_time', "date_part('epoch', start_time)",
|
||||
'update_time', "date_part('epoch', update_time)",
|
||||
'user_id'])
|
||||
self.assertEqual(query.aliases, ['authtype', 'callnum', 'exclusive', 'expired', 'master',
|
||||
'renew_time', 'renew_ts', 'start_time', 'start_ts',
|
||||
'update_time', 'update_ts', 'user_id'])
|
||||
self.assertEqual(query.values, {'id': 123, 'key': 'xyz', 'hostip': 'remote-addr'})
|
||||
|
||||
query = self.queries[1]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['id=%(user_id)s'])
|
||||
self.assertEqual(query.columns, ['name', 'status', 'usertype'])
|
||||
self.assertEqual(query.values, {'user_id': 1})
|
||||
|
||||
query = self.queries[2]
|
||||
self.assertEqual(query.tables, ['sessions'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['closed = FALSE', 'exclusive = TRUE',
|
||||
'user_id=%(user_id)s'])
|
||||
self.assertEqual(query.columns, ['id'])
|
||||
|
||||
def test_getattr(self):
|
||||
"""auth.Session instance"""
|
||||
s, cntext = self.get_session()
|
||||
|
||||
self.assertEqual(len(self.updates), 2)
|
||||
update = self.updates[0]
|
||||
|
||||
self.assertEqual(update.table, 'sessions')
|
||||
self.assertEqual(update.values['id'], 123)
|
||||
self.assertEqual(update.clauses, ['id = %(id)i'])
|
||||
self.assertEqual(update.data, {})
|
||||
self.assertEqual(update.rawdata, {'update_time': 'NOW()'})
|
||||
|
||||
update = self.updates[1]
|
||||
|
||||
self.assertEqual(update.table, 'sessions')
|
||||
self.assertEqual(update.values['id'], 123)
|
||||
self.assertEqual(update.clauses, ['id = %(id)i'])
|
||||
self.assertEqual(update.data, {'callnum': 345})
|
||||
self.assertEqual(update.rawdata, {})
|
||||
|
||||
self.assertEqual(len(self.queries), 3)
|
||||
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['sessions'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['closed IS FALSE', 'hostip = %(hostip)s', 'id = %(id)i',
|
||||
'key = %(key)s'])
|
||||
self.assertEqual(query.columns, ['authtype', 'callnum', 'exclusive', 'expired', 'master',
|
||||
'renew_time', "date_part('epoch', renew_time)",
|
||||
'start_time', "date_part('epoch', start_time)",
|
||||
'update_time', "date_part('epoch', update_time)",
|
||||
'user_id'])
|
||||
self.assertEqual(query.aliases, ['authtype', 'callnum', 'exclusive', 'expired', 'master',
|
||||
'renew_time', 'renew_ts', 'start_time', 'start_ts',
|
||||
'update_time', 'update_ts', 'user_id'])
|
||||
self.assertEqual(query.values, {'id': 123, 'key': 'xyz', 'hostip': 'remote-addr'})
|
||||
|
||||
query = self.queries[1]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['id=%(user_id)s'])
|
||||
self.assertEqual(query.columns, ['name', 'status', 'usertype'])
|
||||
self.assertEqual(query.values, {'user_id': 1})
|
||||
|
||||
query = self.queries[2]
|
||||
self.assertEqual(query.tables, ['sessions'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['closed = FALSE', 'exclusive = TRUE',
|
||||
'user_id=%(user_id)s'])
|
||||
self.assertEqual(query.columns, ['id'])
|
||||
|
||||
# all other names should raise error
|
||||
with self.assertRaises(AttributeError):
|
||||
s.non_existing_attribute
|
||||
|
||||
def test_str(self):
|
||||
"""auth.Session string representation"""
|
||||
s, cntext = self.get_session()
|
||||
self.context.cnx = cntext.cnx
|
||||
|
||||
s.logged_in = False
|
||||
s.message = 'msg'
|
||||
self.assertEqual(str(s), 'session: not logged in (msg)')
|
||||
s.logged_in = True
|
||||
self.assertNotEqual(str(s), 'session: not logged in')
|
||||
|
||||
def test_validate(self):
|
||||
"""Session.validate"""
|
||||
s, cntext = self.get_session()
|
||||
self.context.cnx = cntext.cnx
|
||||
|
||||
s.lockerror = True
|
||||
with self.assertRaises(koji.AuthLockError):
|
||||
s.validate()
|
||||
|
||||
s.lockerror = False
|
||||
self.assertTrue(s.validate())
|
||||
|
||||
def test_makeShared(self):
|
||||
"""Session.makeShared"""
|
||||
s, _ = self.get_session()
|
||||
s.makeShared()
|
||||
self.assertEqual(len(self.updates), 3)
|
||||
# check only last update query, first two are tested in test_basic_instance
|
||||
update = self.updates[2]
|
||||
|
||||
self.assertEqual(update.table, 'sessions')
|
||||
self.assertEqual(update.values['session_id'], 123)
|
||||
self.assertEqual(update.clauses, ['id=%(session_id)s'])
|
||||
self.assertEqual(update.data, {'exclusive': None})
|
||||
self.assertEqual(update.rawdata, {})
|
||||
|
||||
self.assertEqual(len(self.queries), 3)
|
||||
# all queries are tested in test_basic_instance
|
||||
|
||||
@mock.patch('socket.gethostbyname')
|
||||
def test_get_remote_ip(self, gethostbyname):
|
||||
"""Session.get_remote_ip"""
|
||||
s, _ = self.get_session()
|
||||
|
||||
self.context.opts = {'CheckClientIP': False}
|
||||
self.assertEqual(s.get_remote_ip(), '-')
|
||||
|
||||
self.context.opts = {'CheckClientIP': True}
|
||||
self.assertEqual(s.get_remote_ip(override='xoverride'), 'xoverride')
|
||||
|
||||
self.context.environ = {'REMOTE_ADDR': '123.123.123.123'}
|
||||
self.assertEqual(s.get_remote_ip(), '123.123.123.123')
|
||||
|
||||
gethostbyname.return_value = 'ip'
|
||||
self.context.environ = {'REMOTE_ADDR': '127.0.0.1'}
|
||||
self.assertEqual(s.get_remote_ip(), 'ip')
|
||||
|
||||
def test_login(self):
|
||||
s, _ = self.get_session()
|
||||
|
||||
# already logged in
|
||||
with self.assertRaises(koji.GenericError):
|
||||
s.login('user', 'password')
|
||||
|
||||
s.logged_in = False
|
||||
with self.assertRaises(koji.AuthError):
|
||||
s.login('user', 123)
|
||||
with self.assertRaises(koji.AuthError):
|
||||
s.login('user', '')
|
||||
|
||||
# correct
|
||||
s.get_remote_ip = mock.MagicMock()
|
||||
s.get_remote_ip.return_value = 'hostip'
|
||||
s.checkLoginAllowed = mock.MagicMock()
|
||||
s.checkLoginAllowed.return_value = True
|
||||
s.createSession = mock.MagicMock()
|
||||
s.createSession.return_value = {'session-id': 'session-id'}
|
||||
self.query_singleValue.return_value = 123
|
||||
|
||||
result = s.login('user', 'password')
|
||||
|
||||
self.assertEqual(len(self.queries), 4)
|
||||
|
||||
# check only last update query, first three are tested in test_basic_instance
|
||||
query = self.queries[3]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['name = %(user)s', 'password = %(password)s'])
|
||||
self.assertEqual(query.columns, ['id'])
|
||||
self.assertEqual(query.values, {'user': 'user', 'password': 'password'})
|
||||
|
||||
self.assertEqual(len(self.updates), 2)
|
||||
# all updates are tested in test_basic_instance
|
||||
|
||||
self.assertEqual(s.get_remote_ip.call_count, 1)
|
||||
self.assertEqual(s.checkLoginAllowed.call_args, mock.call(123))
|
||||
self.assertEqual(result, s.createSession.return_value)
|
||||
|
||||
# one more try for non-existing user
|
||||
self.query_singleValue.return_value = None
|
||||
with self.assertRaises(koji.AuthError):
|
||||
s.login('user', 'password')
|
||||
|
||||
def test_checkKrbPrincipal(self):
|
||||
s, cntext = self.get_session()
|
||||
self.assertIsNone(s.checkKrbPrincipal(None))
|
||||
self.context.opts = {'AllowedKrbRealms': '*'}
|
||||
self.assertIsNone(s.checkKrbPrincipal('any'))
|
||||
self.context.opts = {'AllowedKrbRealms': 'example.com'}
|
||||
with self.assertRaises(koji.AuthError) as cm:
|
||||
s.checkKrbPrincipal('any')
|
||||
self.assertEqual(cm.exception.args[0],
|
||||
'invalid Kerberos principal: any')
|
||||
with self.assertRaises(koji.AuthError) as cm:
|
||||
s.checkKrbPrincipal('any@')
|
||||
self.assertEqual(cm.exception.args[0],
|
||||
'invalid Kerberos principal: any@')
|
||||
with self.assertRaises(koji.AuthError) as cm:
|
||||
s.checkKrbPrincipal('any@bannedrealm')
|
||||
self.assertEqual(cm.exception.args[0],
|
||||
"Kerberos principal's realm:"
|
||||
" bannedrealm is not allowed")
|
||||
self.assertIsNone(s.checkKrbPrincipal('user@example.com'))
|
||||
self.context.opts = {'AllowedKrbRealms': 'example.com,example.net,example.org'}
|
||||
self.assertIsNone(s.checkKrbPrincipal('user@example.net'))
|
||||
|
||||
def test_getUserIdFromKerberos(self):
|
||||
krb_principal = 'test-krb-principal'
|
||||
self.query_singleValue.return_value = 135
|
||||
s, cntext = self.get_session()
|
||||
s.checkKrbPrincipal = mock.MagicMock()
|
||||
s.checkKrbPrincipal.return_value = True
|
||||
|
||||
s.getUserIdFromKerberos(krb_principal)
|
||||
|
||||
self.assertEqual(len(self.queries), 4)
|
||||
# check only last update query, first three are tested in test_basic_instance
|
||||
query = self.queries[3]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, ['user_krb_principals ON '
|
||||
'users.id = user_krb_principals.user_id'])
|
||||
self.assertEqual(query.clauses, ['krb_principal = %(krb_principal)s'])
|
||||
self.assertEqual(query.columns, ['id'])
|
||||
self.assertEqual(query.values, {'krb_principal': krb_principal})
|
||||
|
||||
self.assertEqual(len(self.updates), 2)
|
||||
# all updates are tested in test_basic_instance
|
||||
|
||||
def test_getUserId(self):
|
||||
self.query_singleValue.return_value = 135
|
||||
s, cntext = self.get_session()
|
||||
username = 'test-user'
|
||||
|
||||
s.getUserId(username)
|
||||
|
||||
self.assertEqual(len(self.queries), 4)
|
||||
# check only last update query, first three are tested in test_basic_instance
|
||||
query = self.queries[3]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['name = %(username)s'])
|
||||
self.assertEqual(query.columns, ['id'])
|
||||
self.assertEqual(query.values, {'username': username})
|
||||
|
||||
self.assertEqual(len(self.updates), 2)
|
||||
# all updates are tested in test_basic_instance
|
||||
|
||||
def test_getHostId(self):
|
||||
self.query_singleValue.return_value = 199
|
||||
s, cntext = self.get_session()
|
||||
|
||||
s._getHostId()
|
||||
|
||||
self.assertEqual(len(self.queries), 4)
|
||||
# check only last update query, first three are tested in test_basic_instance
|
||||
query = self.queries[3]
|
||||
self.assertEqual(query.tables, ['host'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['user_id = %(uid)d'])
|
||||
self.assertEqual(query.columns, ['id'])
|
||||
self.assertEqual(query.values, {'uid': 1})
|
||||
|
||||
self.assertEqual(len(self.updates), 2)
|
||||
# all updates are tested in test_basic_instance
|
||||
|
||||
def test_logout_not_logged(self):
|
||||
s, cntext = self.get_session()
|
||||
s.logged_in = False
|
||||
with self.assertRaises(koji.AuthError) as cm:
|
||||
s.logout()
|
||||
self.assertEqual(cm.exception.args[0], 'Not logged in')
|
||||
|
||||
def test_logout_logged(self):
|
||||
s, _ = self.get_session()
|
||||
s.logged_in = True
|
||||
s.logout()
|
||||
|
||||
self.assertEqual(len(self.queries), 3)
|
||||
# all queries are tested in test_basic_instance
|
||||
|
||||
self.assertEqual(len(self.updates), 3)
|
||||
# check only last update query, first two are tested in test_basic_instance
|
||||
update = self.updates[2]
|
||||
|
||||
self.assertEqual(update.table, 'sessions')
|
||||
self.assertEqual(update.values, {'id': 123, 'id': 123})
|
||||
self.assertEqual(update.clauses, ['id = %(id)i OR master = %(id)i'])
|
||||
self.assertEqual(update.data, {'closed': True, 'expired': True, 'exclusive': None})
|
||||
self.assertEqual(update.rawdata, {})
|
||||
|
||||
def test_logoutChild_not_logged(self):
|
||||
s, cntext = self.get_session()
|
||||
s.logged_in = False
|
||||
with self.assertRaises(koji.AuthError) as cm:
|
||||
s.logoutChild(111)
|
||||
self.assertEqual(cm.exception.args[0], 'Not logged in')
|
||||
|
||||
def test_logoutChild_logged(self):
|
||||
s, _ = self.get_session()
|
||||
s.logged_in = True
|
||||
s.logoutChild(111)
|
||||
|
||||
self.assertEqual(len(self.queries), 3)
|
||||
# all queries are tested in test_basic_instance
|
||||
|
||||
self.assertEqual(len(self.updates), 3)
|
||||
# check only last update query, first two are tested in test_basic_instance
|
||||
update = self.updates[2]
|
||||
|
||||
self.assertEqual(update.table, 'sessions')
|
||||
self.assertEqual(update.values, {'session_id': 111, 'master': 123})
|
||||
self.assertEqual(update.clauses, ['id = %(session_id)i', 'master = %(master)i'])
|
||||
self.assertEqual(update.data, {'expired': True, 'exclusive': None, 'closed': True})
|
||||
self.assertEqual(update.rawdata, {})
|
||||
|
||||
def test_makeExclusive_not_master(self):
|
||||
s, cntext = self.get_session()
|
||||
s.master = 333
|
||||
with self.assertRaises(koji.GenericError) as cm:
|
||||
s.makeExclusive()
|
||||
self.assertEqual(cm.exception.args[0], 'subsessions cannot become exclusive')
|
||||
|
||||
def test_makeExclusive_already_exclusive(self):
|
||||
s, cntext = self.get_session()
|
||||
s.master = None
|
||||
s.exclusive = True
|
||||
with self.assertRaises(koji.GenericError) as cm:
|
||||
s.makeExclusive()
|
||||
self.assertEqual(cm.exception.args[0], 'session is already exclusive')
|
||||
|
||||
def test_makeExclusive_without_force(self):
|
||||
s, cntext = self.get_session()
|
||||
s.master = None
|
||||
s.exclusive = False
|
||||
self.query_singleValue.return_value = 123
|
||||
|
||||
with self.assertRaises(koji.AuthLockError) as cm:
|
||||
s.makeExclusive()
|
||||
self.assertEqual(cm.exception.args[0], 'Cannot get exclusive session')
|
||||
|
||||
self.assertEqual(len(self.queries), 5)
|
||||
self.assertEqual(len(self.updates), 2)
|
||||
|
||||
def test_makeExclusive(self):
|
||||
s, _ = self.get_session()
|
||||
s.master = None
|
||||
s.exclusive = False
|
||||
self.query_singleValue.return_value = 123
|
||||
|
||||
s.makeExclusive(force=True)
|
||||
|
||||
self.assertEqual(len(self.queries), 5)
|
||||
# check only last two queries, first two are tested in test_basic_instance
|
||||
|
||||
query = self.queries[3]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['id=%(user_id)s'])
|
||||
self.assertEqual(query.columns, ['id'])
|
||||
self.assertEqual(query.values, {'user_id': 1})
|
||||
|
||||
query = self.queries[4]
|
||||
self.assertEqual(query.tables, ['sessions'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['closed = FALSE', 'exclusive = TRUE',
|
||||
'user_id=%(user_id)s'])
|
||||
self.assertEqual(query.columns, ['id'])
|
||||
self.assertEqual(query.values, {'user_id': 1})
|
||||
|
||||
self.assertEqual(len(self.updates), 4)
|
||||
# check only last two update queries, first two are tested in test_basic_instance
|
||||
|
||||
update = self.updates[2]
|
||||
self.assertEqual(update.table, 'sessions')
|
||||
self.assertEqual(update.values, {'excl_id': 123})
|
||||
self.assertEqual(update.clauses, ['id=%(excl_id)s'])
|
||||
self.assertEqual(update.data, {'expired': True, 'exclusive': None, 'closed': True})
|
||||
self.assertEqual(update.rawdata, {})
|
||||
|
||||
update = self.updates[3]
|
||||
self.assertEqual(update.table, 'sessions')
|
||||
self.assertEqual(update.values, {'session_id': 123})
|
||||
self.assertEqual(update.clauses, ['id=%(session_id)s'])
|
||||
self.assertEqual(update.data, {'exclusive': True})
|
||||
self.assertEqual(update.rawdata, {})
|
||||
|
||||
def test_checkLoginAllowed(self):
|
||||
s, cntext = self.get_session()
|
||||
self.query_executeOne.side_effect = [{'name': 'testuser', 'status': 0, 'usertype': 0}]
|
||||
s.checkLoginAllowed(2)
|
||||
|
||||
self.assertEqual(len(self.queries), 4)
|
||||
query = self.queries[3]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['id = %(user_id)i'])
|
||||
self.assertEqual(query.columns, ['name', 'status', 'usertype'])
|
||||
self.assertEqual(query.values, {'user_id': 2})
|
||||
|
||||
self.assertEqual(len(self.updates), 2)
|
||||
|
||||
def test_checkLoginAllowed_not_normal_status(self):
|
||||
s, cntext = self.get_session()
|
||||
self.query_executeOne.side_effect = [{'name': 'testuser', 'status': 1, 'usertype': 0}]
|
||||
|
||||
with self.assertRaises(koji.AuthError) as cm:
|
||||
s.checkLoginAllowed(2)
|
||||
self.assertEqual(cm.exception.args[0], 'logins by testuser are not allowed')
|
||||
|
||||
self.assertEqual(len(self.queries), 4)
|
||||
query = self.queries[3]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['id = %(user_id)i'])
|
||||
self.assertEqual(query.columns, ['name', 'status', 'usertype'])
|
||||
self.assertEqual(query.values, {'user_id': 2})
|
||||
|
||||
self.assertEqual(len(self.updates), 2)
|
||||
|
||||
def test_checkLoginAllowed_not_exist_user(self):
|
||||
s, cntext = self.get_session()
|
||||
self.query_executeOne.side_effect = [None]
|
||||
|
||||
with self.assertRaises(koji.AuthError) as cm:
|
||||
s.checkLoginAllowed(2)
|
||||
self.assertEqual(cm.exception.args[0], 'invalid user_id: 2')
|
||||
|
||||
self.assertEqual(len(self.queries), 4)
|
||||
query = self.queries[3]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['id = %(user_id)i'])
|
||||
self.assertEqual(query.columns, ['name', 'status', 'usertype'])
|
||||
self.assertEqual(query.values, {'user_id': 2})
|
||||
|
||||
self.assertEqual(len(self.updates), 2)
|
||||
|
||||
def test_createUserFromKerberos_invalid_krb(self):
|
||||
s, cntext = self.get_session()
|
||||
krb_principal = 'test-krb-princ'
|
||||
with self.assertRaises(koji.AuthError) as cm:
|
||||
s.createUserFromKerberos(krb_principal)
|
||||
self.assertEqual(cm.exception.args[0], 'invalid Kerberos principal: %s' % krb_principal)
|
||||
|
||||
def test_createUserFromKerberos_user_not_exists(self):
|
||||
self.query_execute.return_value = None
|
||||
s, cntext = self.get_session()
|
||||
krb_principal = 'test-krb-princ@redhat.com'
|
||||
s.createUser = mock.MagicMock()
|
||||
s.createUser.return_value = 3
|
||||
s.createUserFromKerberos(krb_principal)
|
||||
self.assertEqual(len(self.queries), 4)
|
||||
self.assertEqual(len(self.updates), 2)
|
||||
|
||||
query = self.queries[3]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, ['LEFT JOIN user_krb_principals ON '
|
||||
'users.id = user_krb_principals.user_id'])
|
||||
self.assertEqual(query.clauses, ['name = %(user_name)s'])
|
||||
self.assertEqual(query.columns, ['id', 'krb_principal'])
|
||||
self.assertEqual(query.values, {'user_name': 'test-krb-princ'})
|
||||
|
||||
def test_createUserFromKerberos_valid(self):
|
||||
self.query_execute.return_value = [{'id': 1, 'krb_principal': 'krb-user-1@redhat.com'},
|
||||
{'id': 1, 'krb_principal': 'krb-user-2@redhat.com'}]
|
||||
s, cntext = self.get_session()
|
||||
krb_principal = 'test-krb-princ@redhat.com'
|
||||
s.setKrbPrincipal = mock.MagicMock()
|
||||
s.setKrbPrincipal.return_value = 1
|
||||
s.createUserFromKerberos(krb_principal)
|
||||
self.assertEqual(len(self.queries), 4)
|
||||
self.assertEqual(len(self.updates), 2)
|
||||
|
||||
query = self.queries[3]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, ['LEFT JOIN user_krb_principals ON '
|
||||
'users.id = user_krb_principals.user_id'])
|
||||
self.assertEqual(query.clauses, ['name = %(user_name)s'])
|
||||
self.assertEqual(query.columns, ['id', 'krb_principal'])
|
||||
self.assertEqual(query.values, {'user_name': 'test-krb-princ'})
|
||||
|
||||
# functions outside Session object
|
||||
|
||||
def test_get_user_data(self):
|
||||
"""auth.get_user_data"""
|
||||
self.query_executeOne.return_value = None
|
||||
self.assertEqual(len(self.queries), 0)
|
||||
|
||||
self.query_executeOne.return_value = {'name': 'name', 'status': 'status',
|
||||
'usertype': 'usertype'}
|
||||
kojihub.auth.get_user_data(1)
|
||||
self.assertEqual(len(self.queries), 1)
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['id=%(user_id)s'])
|
||||
self.assertEqual(query.columns, ['name', 'status', 'usertype'])
|
||||
|
||||
def test_get_user_groups(self):
|
||||
"""auth.get_user_groups"""
|
||||
kojihub.auth.get_user_groups(1)
|
||||
self.assertEqual(len(self.queries), 1)
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['user_groups'])
|
||||
self.assertEqual(query.joins, ['users ON group_id = users.id'])
|
||||
self.assertEqual(query.clauses, ['active IS TRUE', 'user_id=%(user_id)i',
|
||||
'users.usertype=%(t_group)i'])
|
||||
self.assertEqual(query.columns, ['group_id', 'name'])
|
||||
|
||||
def test_get_user_perms(self):
|
||||
"""auth.get_user_perms"""
|
||||
kojihub.auth.get_user_perms(1)
|
||||
self.assertEqual(len(self.queries), 2)
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['user_perms'])
|
||||
self.assertEqual(query.joins, ['permissions ON perm_id = permissions.id'])
|
||||
self.assertEqual(query.clauses, ['active IS TRUE', 'user_id=%(user_id)s'])
|
||||
self.assertEqual(query.columns, ['name'])
|
||||
query = self.queries[1]
|
||||
self.assertEqual(query.tables, ['user_groups'])
|
||||
self.assertEqual(query.joins, [
|
||||
'user_perms ON user_perms.user_id = user_groups.group_id',
|
||||
'permissions ON perm_id = permissions.id'])
|
||||
self.assertEqual(sorted(query.clauses), sorted([
|
||||
'user_groups.active IS TRUE',
|
||||
'user_perms.active IS TRUE',
|
||||
'user_groups.user_id=%(user_id)s']))
|
||||
self.assertEqual(query.columns, ['permissions.name'])
|
||||
|
||||
def test_get_user_perms_inherited(self):
|
||||
self.query_execute.side_effect = [
|
||||
[{'id': 1, 'name': 'perm1'}, {'id': 2, 'name': 'perm2'}],
|
||||
[{'name': 'perm3'}]
|
||||
]
|
||||
result = kojihub.auth.get_user_perms(1)
|
||||
self.assertEqual(set(result), {'perm1', 'perm2', 'perm3'})
|
||||
|
||||
def test_get_user_perms_inherited_data(self):
|
||||
self.query_execute.side_effect = [
|
||||
[{'id': 1, 'name': 'perm1'}, {'id': 2, 'name': 'perm2'}],
|
||||
[{'name': 'perm3', 'group': 'group_a'},
|
||||
{'name': 'perm4', 'group': 'group_b'},
|
||||
{'name': 'perm4', 'group': 'group_c'}]
|
||||
]
|
||||
result = kojihub.auth.get_user_perms(1, inheritance_data=True)
|
||||
self.assertEqual(result, {
|
||||
'perm1': [None],
|
||||
'perm2': [None],
|
||||
'perm3': ['group_a'],
|
||||
'perm4': ['group_b', 'group_c'],
|
||||
})
|
||||
|
||||
def test_logout_logged_not_owner(self):
|
||||
s, _ = self.get_session()
|
||||
|
||||
s.logged_in = True
|
||||
# session_id without admin perms and not owner
|
||||
self.context.session.hasPerm.return_value = False
|
||||
self.context.session.user_id.return_value = 123
|
||||
self.query_singleValue.return_value = None
|
||||
with self.assertRaises(koji.ActionNotAllowed) as ex:
|
||||
s.logout(session_id=1)
|
||||
self.assertEqual("only admins or owner may logout other session", str(ex.exception))
|
||||
|
|
@ -1,211 +0,0 @@
|
|||
try:
|
||||
from unittest import mock
|
||||
except ImportError:
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
import koji
|
||||
import kojihub
|
||||
|
||||
|
||||
class TestInsertProcessor(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.context_db = mock.patch('kojihub.db.context').start()
|
||||
|
||||
def tearDown(self):
|
||||
mock.patch.stopall()
|
||||
|
||||
def test_basic_instantiation(self):
|
||||
proc = kojihub.InsertProcessor('sometable')
|
||||
actual = str(proc)
|
||||
expected = '-- incomplete update: no assigns'
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_to_string_with_data(self):
|
||||
proc = kojihub.InsertProcessor('sometable', data={'foo': 'bar'})
|
||||
actual = str(proc)
|
||||
expected = 'INSERT INTO sometable (foo) VALUES (%(foo)s)'
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_simple_execution_with_iterate(self):
|
||||
cursor = mock.MagicMock()
|
||||
self.context_db.cnx.cursor.return_value = cursor
|
||||
proc = kojihub.InsertProcessor('sometable', data={'foo': 'bar'})
|
||||
proc.execute()
|
||||
cursor.execute.assert_called_once_with(
|
||||
'INSERT INTO sometable (foo) VALUES (%(foo)s)',
|
||||
{'foo': 'bar'}, log_errors=True)
|
||||
|
||||
def test_make_create(self,):
|
||||
cursor = mock.MagicMock()
|
||||
self.context_db.cnx.cursor.return_value = cursor
|
||||
self.context_db.session.assertLogin = mock.MagicMock()
|
||||
proc = kojihub.InsertProcessor('sometable', data={'foo': 'bar'})
|
||||
proc.make_create(event_id=1, user_id=2)
|
||||
self.assertEqual(proc.data['create_event'], 1)
|
||||
self.assertEqual(proc.data['creator_id'], 2)
|
||||
|
||||
proc.make_create(user_id=2)
|
||||
self.assertEqual(proc.data['create_event'], self.context_db.event_id)
|
||||
self.assertEqual(proc.data['creator_id'], 2)
|
||||
|
||||
proc.make_create(event_id=1)
|
||||
self.assertEqual(proc.data['create_event'], 1)
|
||||
self.assertEqual(proc.data['creator_id'], self.context_db.session.user_id)
|
||||
|
||||
proc.make_create()
|
||||
self.assertEqual(proc.data['create_event'], self.context_db.event_id)
|
||||
self.assertEqual(proc.data['creator_id'], self.context_db.session.user_id)
|
||||
|
||||
def test_dup_check(self):
|
||||
cursor = mock.MagicMock()
|
||||
self.context_db.cnx.cursor.return_value = cursor
|
||||
self.context_db.session.assertLogin = mock.MagicMock()
|
||||
proc = kojihub.InsertProcessor('sometable', data={'foo': 'bar'})
|
||||
proc.dup_check()
|
||||
|
||||
args = cursor.execute.call_args
|
||||
actual = ' '.join(args[0][0].split())
|
||||
expected = 'SELECT foo FROM sometable WHERE (foo = %(foo)s)'
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
proc.make_create()
|
||||
proc.dup_check()
|
||||
args = cursor.execute.call_args
|
||||
actual = ' '.join(args[0][0].split())
|
||||
expected = 'SELECT active, foo FROM sometable WHERE ' + \
|
||||
'(active = %(active)s) AND (foo = %(foo)s)'
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
proc.set(onething='another')
|
||||
proc.rawset(something='something else')
|
||||
result = proc.dup_check()
|
||||
self.assertEqual(result, None)
|
||||
|
||||
def test_raw_data(self):
|
||||
cursor = mock.MagicMock()
|
||||
self.context_db.cnx.cursor.return_value = cursor
|
||||
proc = kojihub.InsertProcessor('sometable', rawdata={'foo': '\'bar\''})
|
||||
result = proc.dup_check()
|
||||
self.assertEqual(result, None)
|
||||
actual = str(proc)
|
||||
expected = "INSERT INTO sometable (foo) VALUES (('bar'))" # raw data
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
|
||||
class TestBulkInsertProcessor(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.context_db = mock.patch('kojihub.db.context').start()
|
||||
|
||||
def tearDown(self):
|
||||
mock.patch.stopall()
|
||||
|
||||
def test_basic_instantiation(self):
|
||||
proc = kojihub.BulkInsertProcessor('sometable')
|
||||
actual = str(proc)
|
||||
expected = '-- incomplete insert: no data'
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_to_string_with_single_row(self):
|
||||
proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}])
|
||||
actual = str(proc)
|
||||
expected = 'INSERT INTO sometable (foo) VALUES (%(foo0)s)'
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
proc = kojihub.BulkInsertProcessor('sometable')
|
||||
proc.add_record(foo='bar')
|
||||
actual = str(proc)
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_simple_execution(self):
|
||||
cursor = mock.MagicMock()
|
||||
self.context_db.cnx.cursor.return_value = cursor
|
||||
proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}])
|
||||
proc.execute()
|
||||
cursor.execute.assert_called_once_with(
|
||||
'INSERT INTO sometable (foo) VALUES (%(foo0)s)',
|
||||
{'foo0': 'bar'},
|
||||
log_errors=True
|
||||
)
|
||||
|
||||
cursor.reset_mock()
|
||||
proc = kojihub.BulkInsertProcessor('sometable')
|
||||
proc.add_record(foo='bar')
|
||||
proc.execute()
|
||||
cursor.execute.assert_called_once_with(
|
||||
'INSERT INTO sometable (foo) VALUES (%(foo0)s)',
|
||||
{'foo0': 'bar'},
|
||||
log_errors=True
|
||||
)
|
||||
|
||||
def test_bulk_execution(self):
|
||||
cursor = mock.MagicMock()
|
||||
self.context_db.cnx.cursor.return_value = cursor
|
||||
|
||||
proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}])
|
||||
proc.add_record(foo='bar2')
|
||||
proc.add_record(foo='bar3')
|
||||
proc.execute()
|
||||
cursor.execute.assert_called_once_with(
|
||||
'INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)',
|
||||
{'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'},
|
||||
log_errors=True
|
||||
)
|
||||
|
||||
def test_missing_values(self):
|
||||
proc = kojihub.BulkInsertProcessor('sometable')
|
||||
proc.add_record(foo='bar')
|
||||
proc.add_record(foo2='bar2')
|
||||
with self.assertRaises(koji.GenericError) as cm:
|
||||
str(proc)
|
||||
self.assertEqual(cm.exception.args[0], 'Missing value foo2 in BulkInsert')
|
||||
|
||||
def test_missing_values_nostrict(self):
|
||||
proc = kojihub.BulkInsertProcessor('sometable', strict=False)
|
||||
proc.add_record(foo='bar')
|
||||
proc.add_record(foo2='bar2')
|
||||
actual = str(proc)
|
||||
expected = 'INSERT INTO sometable (foo, foo2) VALUES (%(foo0)s, NULL), (NULL, %(foo21)s)'
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_missing_values_explicit_columns(self):
|
||||
proc = kojihub.BulkInsertProcessor('sometable', strict=True, columns=['foo', 'foo2'])
|
||||
proc.add_record(foo='bar')
|
||||
with self.assertRaises(koji.GenericError) as cm:
|
||||
str(proc)
|
||||
self.assertEqual(cm.exception.args[0], 'Missing value foo2 in BulkInsert')
|
||||
|
||||
def test_batch_execution(self):
|
||||
cursor = mock.MagicMock()
|
||||
self.context_db.cnx.cursor.return_value = cursor
|
||||
|
||||
proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}], batch=2)
|
||||
proc.add_record(foo='bar2')
|
||||
proc.add_record(foo='bar3')
|
||||
proc.execute()
|
||||
calls = cursor.execute.mock_calls
|
||||
# list of (name, positional args, keyword args)
|
||||
self.assertEqual(len(calls), 2)
|
||||
self.assertEqual(calls[0],
|
||||
mock.call('INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s)',
|
||||
{'foo0': 'bar1', 'foo1': 'bar2'}, log_errors=True))
|
||||
self.assertEqual(calls[1],
|
||||
mock.call('INSERT INTO sometable (foo) VALUES (%(foo0)s)',
|
||||
{'foo0': 'bar3'}, log_errors=True))
|
||||
|
||||
def test_no_batch_execution(self):
|
||||
cursor = mock.MagicMock()
|
||||
self.context_db.cnx.cursor.return_value = cursor
|
||||
|
||||
proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}], batch=0)
|
||||
proc.add_record(foo='bar2')
|
||||
proc.add_record(foo='bar3')
|
||||
proc.execute()
|
||||
calls = cursor.execute.mock_calls
|
||||
# list of (name, positional args, keyword args)
|
||||
self.assertEqual(len(calls), 1)
|
||||
self.assertEqual(
|
||||
calls[0],
|
||||
mock.call('INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)',
|
||||
{'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'}, log_errors=True)
|
||||
)
|
||||
|
|
@ -1,145 +0,0 @@
|
|||
try:
|
||||
from unittest import mock
|
||||
except ImportError:
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
import kojihub
|
||||
|
||||
|
||||
class TestQueryProcessor(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.simple_arguments = dict(
|
||||
columns=['something'],
|
||||
tables=['awesome'],
|
||||
)
|
||||
self.complex_arguments = dict(
|
||||
columns=['something'],
|
||||
aliases=['other'],
|
||||
tables=['awesome'],
|
||||
joins=['morestuff'],
|
||||
# values=...
|
||||
# transform=...
|
||||
opts={
|
||||
# 'countOnly': True,
|
||||
'order': 'other',
|
||||
'offset': 10,
|
||||
'limit': 3,
|
||||
'group': 'awesome.aha'
|
||||
# 'rowlock': True,
|
||||
},
|
||||
enable_group=True
|
||||
)
|
||||
self.original_chunksize = kojihub.QueryProcessor.iterchunksize
|
||||
kojihub.QueryProcessor.iterchunksize = 2
|
||||
self.context_db = mock.patch('kojihub.db.context').start()
|
||||
|
||||
def tearDown(self):
|
||||
kojihub.QueryProcessor.iterchunksize = self.original_chunksize
|
||||
mock.patch.stopall()
|
||||
|
||||
def test_basic_instantiation(self):
|
||||
kojihub.QueryProcessor() # No exception!
|
||||
|
||||
def test_instantiation_with_cols_and_aliases(self):
|
||||
proc = kojihub.QueryProcessor(columns=['wat'], aliases=['zap'])
|
||||
assert 'zap' in proc.colsByAlias
|
||||
assert proc.colsByAlias['zap'] == 'wat'
|
||||
assert len(proc.colsByAlias) == 1
|
||||
|
||||
def test_empty_as_string(self):
|
||||
proc = kojihub.QueryProcessor()
|
||||
actual = str(proc)
|
||||
self.assertIn("SELECT", actual)
|
||||
self.assertIn("FROM", actual)
|
||||
|
||||
def test_simple_as_string(self):
|
||||
proc = kojihub.QueryProcessor(
|
||||
columns=['something'],
|
||||
tables=['awesome'],
|
||||
)
|
||||
actual = " ".join([token for token in str(proc).split() if token])
|
||||
expected = "SELECT something FROM awesome"
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_complex_as_string(self):
|
||||
proc = kojihub.QueryProcessor(**self.complex_arguments)
|
||||
actual = " ".join([token for token in str(proc).split() if token])
|
||||
expected = "SELECT something FROM awesome JOIN morestuff" \
|
||||
" GROUP BY awesome.aha ORDER BY something OFFSET 10 LIMIT 3"
|
||||
self.assertEqual(actual, expected)
|
||||
args2 = self.complex_arguments.copy()
|
||||
args2['enable_group'] = False
|
||||
proc = kojihub.QueryProcessor(**args2)
|
||||
actual = " ".join([token for token in str(proc).split() if token])
|
||||
expected = "SELECT something FROM awesome JOIN morestuff" \
|
||||
" ORDER BY something OFFSET 10 LIMIT 3"
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_simple_with_execution(self):
|
||||
cursor = mock.MagicMock()
|
||||
self.context_db.cnx.cursor.return_value = cursor
|
||||
proc = kojihub.QueryProcessor(**self.simple_arguments)
|
||||
proc.execute()
|
||||
cursor.execute.assert_called_once_with(
|
||||
'\nSELECT something\n FROM awesome\n\n\n \n \n\n \n', {})
|
||||
|
||||
def test_simple_count_with_execution(self):
|
||||
cursor = mock.MagicMock()
|
||||
self.context_db.cnx.cursor.return_value = cursor
|
||||
cursor.fetchall.return_value = [('some count',)]
|
||||
args = self.simple_arguments.copy()
|
||||
args['opts'] = {'countOnly': True}
|
||||
proc = kojihub.QueryProcessor(**args)
|
||||
results = proc.execute()
|
||||
cursor.execute.assert_called_once_with(
|
||||
'\nSELECT count(*)\n FROM awesome\n\n\n \n \n\n \n', {})
|
||||
self.assertEqual(results, 'some count')
|
||||
|
||||
cursor.reset_mock()
|
||||
args['opts']['group'] = 'id'
|
||||
args['enable_group'] = True
|
||||
proc = kojihub.QueryProcessor(**args)
|
||||
results = proc.execute()
|
||||
cursor.execute.assert_called_once_with(
|
||||
'SELECT count(*)\nFROM (\nSELECT 1\n'
|
||||
' FROM awesome\n\n\n GROUP BY id\n \n\n \n) numrows', {})
|
||||
self.assertEqual(results, 'some count')
|
||||
|
||||
def test_simple_execution_with_iterate(self):
|
||||
cursor = mock.MagicMock()
|
||||
self.context_db.cnx.cursor.return_value = cursor
|
||||
cursor.fetchall.return_value = [
|
||||
('value number 1',),
|
||||
('value number 2',),
|
||||
('value number 3',),
|
||||
]
|
||||
proc = kojihub.QueryProcessor(**self.simple_arguments)
|
||||
generator = proc.iterate()
|
||||
calls = cursor.execute.mock_calls
|
||||
result = next(generator)
|
||||
# two calls so far..
|
||||
self.assertEqual(result, {'something': 'value number 1'})
|
||||
self.assertEqual(len(calls), 2)
|
||||
result = next(generator)
|
||||
# still two.
|
||||
self.assertEqual(result, {'something': 'value number 2'})
|
||||
self.assertEqual(len(calls), 2)
|
||||
# now three.
|
||||
result = next(generator)
|
||||
self.assertEqual(result, {'something': 'value number 3'})
|
||||
|
||||
@mock.patch('kojihub.db._multiRow')
|
||||
def test_execution_as_list_transform(self, multirow):
|
||||
multirow.return_value = [{'col1': 'result_1_col_1', 'col2': 'result_1_col_2'},
|
||||
{'col1': 'result_2_col_1', 'col2': 'result_2_col_2'}]
|
||||
args = dict(
|
||||
columns=['col1', 'col2'],
|
||||
tables=['table'],
|
||||
opts={'asList': True},
|
||||
transform=lambda x: x,
|
||||
)
|
||||
proc = kojihub.QueryProcessor(**args)
|
||||
results = proc.execute()
|
||||
self.assertEqual(
|
||||
results, [['result_1_col_1', 'result_1_col_2'], ['result_2_col_1', 'result_2_col_2']])
|
||||
|
|
@ -1,27 +0,0 @@
|
|||
try:
|
||||
from unittest import mock
|
||||
except ImportError:
|
||||
import mock
|
||||
|
||||
import unittest
|
||||
|
||||
import kojihub
|
||||
|
||||
|
||||
class TestSavepoint(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.dml = mock.patch('kojihub.db._dml').start()
|
||||
self.context_db = mock.patch('kojihub.db.context').start()
|
||||
|
||||
def tearDown(self):
|
||||
mock.patch.stopall()
|
||||
|
||||
def test_savepoint(self):
|
||||
sp = kojihub.Savepoint('some_name')
|
||||
self.assertEqual(sp.name, 'some_name')
|
||||
self.dml.assert_called_once_with('SAVEPOINT some_name', {})
|
||||
|
||||
self.dml.reset_mock()
|
||||
sp.rollback()
|
||||
self.dml.assert_called_once_with('ROLLBACK TO SAVEPOINT some_name', {})
|
||||
|
|
@ -1,37 +0,0 @@
|
|||
try:
|
||||
from unittest import mock
|
||||
except ImportError:
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
import kojihub
|
||||
|
||||
|
||||
class TestUpdateProcessor(unittest.TestCase):
|
||||
|
||||
def test_basic_instantiation(self):
|
||||
kojihub.UpdateProcessor('sometable') # No exception!
|
||||
|
||||
def test_to_string_with_data(self):
|
||||
proc = kojihub.UpdateProcessor('sometable', data={'foo': 'bar'})
|
||||
actual = str(proc)
|
||||
expected = 'UPDATE sometable SET foo = %(data.foo)s'
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_to_values_from_data(self):
|
||||
proc = kojihub.UpdateProcessor('sometable', data={'foo': 'bar'})
|
||||
actual = proc.get_values()
|
||||
expected = {'data.foo': 'bar'}
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
@mock.patch('kojihub.db.context')
|
||||
def test_simple_execution_with_iterate(self, context_db):
|
||||
cursor = mock.MagicMock()
|
||||
context_db.cnx.cursor.return_value = cursor
|
||||
proc = kojihub.UpdateProcessor('sometable', data={'foo': 'bar'})
|
||||
proc.execute()
|
||||
cursor.execute.assert_called_once_with(
|
||||
'UPDATE sometable SET foo = %(data.foo)s',
|
||||
{'data.foo': 'bar'},
|
||||
log_errors=True,
|
||||
)
|
||||
Loading…
Add table
Add a link
Reference in a new issue