fix some unittests

This commit is contained in:
Yu Ming Zhu 2018-12-11 09:04:44 +00:00 committed by Mike McLean
parent 200951c9d6
commit eb36620aa3
4 changed files with 54 additions and 7 deletions

237
tests/test_lib/test_auth.py Normal file
View file

@ -0,0 +1,237 @@
from __future__ import absolute_import
import mock
try:
import unittest2 as unittest
except ImportError:
import unittest
import six
import koji
import koji.auth
class TestAuthSession(unittest.TestCase):
def test_instance(self):
"""Simple auth.Session instance"""
s = koji.auth.Session()
# no args in request/environment
self.assertEqual(s.message, 'no session args')
@mock.patch('koji.auth.context')
def get_session(self, context):
"""auth.Session instance"""
# base session from test_basic_instance
context.environ = {
'QUERY_STRING': 'session-id=123&session-key=xyz&callnum=345',
'REMOTE_ADDR': 'remote-addr',
}
cursor = mock.MagicMock(name='cursor')
context.cnx.cursor.return_value = cursor
cursor.fetchone.side_effect = [
# get session
[koji.AUTHTYPE_NORMAL, 344, False, False, 'master', 'start_time',
'start_ts', 'update_time', 'update_ts', 'user_id'],
# get user
['name', koji.USER_STATUS['NORMAL'], koji.USERTYPES['NORMAL']],
# get excl.session
None,
# upd. timestamp
None,
# upd callnum
None,
]
s = koji.auth.Session()
return s, context, cursor
@mock.patch('koji.auth.context')
def test_basic_instance(self, context):
"""auth.Session instance"""
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
self.assertEqual(s.id, 123)
self.assertEqual(s.key, 'xyz')
self.assertEqual(s.hostip, 'remote-addr')
self.assertEqual(s.callnum, 345)
self.assertEqual(s.user_id, 'user_id')
self.assertEqual(s.authtype, koji.AUTHTYPE_NORMAL)
self.assertEqual(s.master, 'master')
self.assertTrue(s.logged_in)
# 5 SQL calls: get session, get user, get excl. session,
# update timestamp, update callnum
self.assertEqual(cursor.execute.call_count, 5)
@mock.patch('koji.auth.context')
def test_getattr(self, context):
"""auth.Session instance"""
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
# test
self.assertEqual(s.perms, {})
self.assertEqual(s.groups, {})
self.assertEqual(s.host_id, None)
# all other names should raise error
with self.assertRaises(AttributeError):
s.non_existing_attribute
@mock.patch('koji.auth.context')
def test_str(self, context):
"""auth.Session string representation"""
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
s.logged_in = False
s.message = 'msg'
self.assertEqual(str(s), 'session: not logged in (msg)')
s.logged_in = True
self.assertNotEqual(str(s), 'session: not logged in')
@mock.patch('koji.auth.context')
def test_validate(self, context):
"""Session.validate"""
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
s.lockerror = True
with self.assertRaises(koji.AuthLockError):
s.validate()
s.lockerror = False
self.assertTrue(s.validate())
@mock.patch('koji.auth.context')
def test_makeShared(self, context):
"""Session.makeShared"""
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
s.makeShared()
c = cursor.execute.call_args[0]
self.assertEqual(c[0],
'UPDATE sessions SET "exclusive"=NULL WHERE id=%(session_id)s')
self.assertEqual(c[1]['session_id'], 123)
@mock.patch('socket.gethostbyname')
@mock.patch('koji.auth.context')
def test_get_remote_ip(self, context, gethostbyname):
"""Session.get_remote_ip"""
s, cntext, cursor = self.get_session()
context.opts = {'CheckClientIP': False}
self.assertEqual(s.get_remote_ip(), '-')
context.opts = {'CheckClientIP': True}
self.assertEqual(s.get_remote_ip(override='xoverride'), 'xoverride')
context.environ = {'REMOTE_ADDR': '123.123.123.123'}
self.assertEqual(s.get_remote_ip(), '123.123.123.123')
gethostbyname.return_value = 'ip'
context.environ = {'REMOTE_ADDR': '127.0.0.1'}
self.assertEqual(s.get_remote_ip(), 'ip')
@mock.patch('koji.auth.context')
def test_login(self, context):
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
# already logged in
with self.assertRaises(koji.GenericError):
s.login('user', 'password')
s.logged_in = False
with self.assertRaises(koji.AuthError):
s.login('user', 123)
with self.assertRaises(koji.AuthError):
s.login('user', '')
# correct
s.get_remote_ip = mock.MagicMock()
s.get_remote_ip.return_value = 'hostip'
s.checkLoginAllowed = mock.MagicMock()
s.checkLoginAllowed.return_value = True
s.createSession = mock.MagicMock()
s.createSession.return_value = {'session-id': 'session-id'}
cursor.fetchone = mock.MagicMock()
cursor.fetchone.return_value = ['user_id']
result = s.login('user', 'password')
self.assertEqual(s.get_remote_ip.call_count, 1)
self.assertEqual(s.checkLoginAllowed.call_args, mock.call('user_id'))
self.assertEqual(result, s.createSession.return_value)
# one more try for non-existing user
cursor.fetchone.return_value = None
with self.assertRaises(koji.AuthError):
s.login('user', 'password')
@mock.patch('koji.auth.context')
@mock.patch('koji.auth.socket')
@mock.patch('koji.auth.base64')
def test_krbLogin(self, base64, socket, context):
# TODO
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
with self.assertRaises(koji.AuthError) as cm:
s.krbLogin('krb_req', 'proxyuser')
self.assertEqual(cm.exception.args[0], 'Already logged in')
s.logged_in = False
if six.PY3:
with self.assertRaises(koji.AuthError) as cm:
s.krbLogin('krb_req', 'proxyuser')
self.assertEqual(cm.exception.args[0], 'krbV module not installed')
else:
with mock.patch('koji.auth.krbV', create=True) as krbV:
princ = mock.MagicMock()
princ.name = 'princ_name'
krbV.default_context.return_value \
.rd_req.return_value = (mock.MagicMock(), 2, 3,
[1, 2, princ])
with self.assertRaises(koji.AuthError) as cm:
s.krbLogin('krb_req', 'proxyuser')
self.assertEqual(cm.exception.args[0],
'Kerberos principal princ_name is'
' not authorized to log in other users')
context.opts = {'ProxyPrincipals': 'anyothers,' + princ.name,
'AuthPrincipal': 'authprinc',
'AuthKeytab': 'authkeytab',
'LoginCreatesUser': False,
'CheckClientIP': False}
with self.assertRaises(koji.AuthError) as cm:
s.krbLogin('krb_req', 'proxyuser@realm.com')
self.assertEqual(cm.exception.args[0],
'Unknown Kerberos principal:'
' proxyuser@realm.com')
context.opts['LoginCreatesUser'] = True
context.cnx.cursor.return_value. \
fetchone.side_effect = [None,
None,
None,
(1,),
('name', 'type',
koji.USER_STATUS['NORMAL']),
('session-id',)]
s.krbLogin('krb_req', 'proxyuser@realm.com')
# functions outside Session object
@mock.patch('koji.auth.context')
def test_get_user_data(self, context):
"""koji.auth.get_user_data"""
cursor = mock.MagicMock(name='cursor')
context.cnx.cursor.return_value = cursor
cursor.fetchone.return_value = ['name', 'status', 'usertype']
self.assertEqual(sorted(koji.auth.get_user_data(1).items()),
sorted({'name': 'name', 'status': 'status',
'usertype': 'usertype'}.items()))
cursor.fetchone.return_value = None
self.assertEqual(koji.auth.get_user_data(1), None)

View file

@ -2,6 +2,7 @@ from __future__ import absolute_import
import six
import time
import random
from six.moves import range
try:
import unittest2 as unittest
except ImportError:

View file

@ -0,0 +1,41 @@
from __future__ import absolute_import
import base64
# This is python-mock, not the rpm mock tool we know and love
import mock
import six
try:
import unittest2 as unittest
except ImportError:
import unittest
import koji
class KrbVTestCase(unittest.TestCase):
@mock.patch('koji.krbV', new=None)
@mock.patch('koji.requests_kerberos', new=None)
def test_krbv_disabled(self):
"""Test that when krbV and gssapi are absent, we behave rationally"""
self.assertEquals(koji.krbV, None)
session = koji.ClientSession('whatever')
with self.assertRaises(ImportError):
session.krb_login()
# this case should work on python3, but skipped still
@unittest.skipIf(six.PY3, "skipped on python3 since missing of python-krbV")
@mock.patch('koji.krbV', create=True)
@mock.patch('requests_kerberos.__version__', new='0.7.0')
@mock.patch('koji.ClientSession._serverPrincipal')
def test_krbv_old_requests_kerberos(self, _serverPrincipal_mock, krbV_mock):
self.assertIsNotNone(koji.krbV)
ctx = koji.krbV.default_context.return_value
ctx.mk_req = mock.MagicMock()
ac = mock.MagicMock()
ctx.mk_req.return_value = (ac, six.b('req'))
ac.rd_priv = mock.MagicMock(return_value='session-id session-key')
session = koji.ClientSession('whatever')
session._callMethod = mock.MagicMock(
return_value=(base64.encodestring(six.b('a')), base64.encodestring(six.b('b')), [0, 1, 2, 3]))
rv = session.krb_login(principal='any@SOMEWHERE.COM', keytab='/path/to/keytab')
self.assertTrue(rv)

View file

@ -0,0 +1,245 @@
from __future__ import absolute_import
import mock
import shutil
import tempfile
try:
import unittest2 as unittest
except ImportError:
import unittest
import koji.tasks
class TestRestartTask(unittest.TestCase):
def setUp(self):
self.session = mock.MagicMock()
self.options = mock.MagicMock()
self.manager = mock.MagicMock()
self.workdir = tempfile.mkdtemp()
self.options.workdir = self.workdir
self.safe_rmtree = mock.patch('koji.tasks.safe_rmtree').start()
def tearDown(self):
shutil.rmtree(self.workdir)
mock.patch.stopall()
def get_handler(self, *args, **kwargs):
params = koji.encode_args(*args, **kwargs)
handler = koji.tasks.RestartTask(137, 'restart', params, self.session,
self.options)
# this is a foreground task
handler.setManager(self.manager)
return handler
def test_restart_task(self):
host = {'id': 'HOST ID'}
self.session.host.getID.return_value = "HOST ID"
handler = self.get_handler(host)
self.assertEqual(handler.Foreground, True)
result = handler.run()
self.assertEqual(self.manager.restart_pending, True)
def test_restart_wrong_host(self):
host = {'id': 'HOST ID'}
self.session.host.getID.return_value = "ANOTHER HOST"
handler = self.get_handler(host)
self.assertEqual(handler.Foreground, True)
with self.assertRaises(koji.GenericError):
result = handler.run()
class TestRestartVerifyTask(unittest.TestCase):
def setUp(self):
self.session = mock.MagicMock()
self.options = mock.MagicMock()
self.manager = mock.MagicMock()
self.workdir = tempfile.mkdtemp()
self.options.workdir = self.workdir
self.safe_rmtree = mock.patch('koji.tasks.safe_rmtree').start()
def tearDown(self):
shutil.rmtree(self.workdir)
mock.patch.stopall()
def get_handler(self, *args, **kwargs):
params = koji.encode_args(*args, **kwargs)
handler = koji.tasks.RestartVerifyTask(137, 'restartVerify', params, self.session,
self.options)
# this is a foreground task
handler.setManager(self.manager)
return handler
def test_restart_verify_task(self):
task1 = {
'id': 'TASK ID',
'state': koji.TASK_STATES['CLOSED'],
'completion_ts': 10,
}
host = {'id': 'HOST ID'}
self.session.host.getID.return_value = "HOST ID"
self.session.getTaskInfo.return_value = task1
handler = self.get_handler(task1['id'], host)
self.manager.start_time = 100 # greater than task1['start_time']
self.assertEqual(handler.Foreground, True)
result = handler.run()
def test_restart_verify_not_closed(self):
task1 = {
'id': 'TASK ID',
'state': koji.TASK_STATES['OPEN'],
'completion_ts': 10,
}
host = {'id': 'HOST ID'}
self.session.host.getID.return_value = "HOST ID"
self.session.getTaskInfo.return_value = task1
handler = self.get_handler(task1['id'], host)
try:
result = handler.run()
except koji.GenericError as e:
self.assertEqual(e.args[0], 'Stage one restart task is OPEN')
else:
raise Exception('Error not raised')
def test_restart_verify_wrong_host(self):
task1 = {
'id': 'TASK ID',
'state': koji.TASK_STATES['CLOSED'],
'completion_ts': 10,
}
host = {'id': 'HOST ID'}
self.session.host.getID.return_value = "OTHER HOST"
self.session.getTaskInfo.return_value = task1
handler = self.get_handler(task1['id'], host)
try:
result = handler.run()
except koji.GenericError as e:
self.assertEqual(e.args[0], 'Host mismatch')
else:
raise Exception('Error not raised')
def test_restart_verify_wrong_time(self):
task1 = {
'id': 'TASK ID',
'state': koji.TASK_STATES['CLOSED'],
'completion_ts': 10,
}
host = {'id': 'HOST ID'}
self.session.host.getID.return_value = "HOST ID"
self.session.getTaskInfo.return_value = task1
handler = self.get_handler(task1['id'], host)
self.manager.start_time = 0 # LESS THAN task1['start_time']
try:
result = handler.run()
except koji.GenericError as e:
self.assertEqual(e.args[0][:30], 'Restart failed - start time is')
else:
raise Exception('Error not raised')
class TestRestartHostsTask(unittest.TestCase):
def setUp(self):
self.session = mock.MagicMock()
self.options = mock.MagicMock()
self.manager = mock.MagicMock()
self.workdir = tempfile.mkdtemp()
self.options.workdir = self.workdir
self.safe_rmtree = mock.patch('koji.tasks.safe_rmtree').start()
def tearDown(self):
shutil.rmtree(self.workdir)
mock.patch.stopall()
def get_handler(self, *args, **kwargs):
params = koji.encode_args(*args, **kwargs)
handler = koji.tasks.RestartHostsTask(137, 'restartHosts', params, self.session,
self.options)
handler.wait = mock.MagicMock()
handler.subtask = mock.MagicMock()
return handler
def test_restart_hosts_task(self):
self.session.host.getID.return_value = "THIS HOST"
host = {'id': 99}
self.session.listHosts.return_value = [host]
handler = self.get_handler({})
handler.subtask.side_effect = [101, 102]
result = handler.run()
self.session.listHosts.assert_called_once_with(enabled=True)
self.session.taskFinished.assert_not_called()
handler.wait.assert_called_once_with([101, 102], all=True, timeout=3600*24)
# subtask calls
call1 = mock.call('restart', [host], assign=host['id'], label="restart %i" % host['id'])
call2 = mock.call('restartVerify', [101, host], assign=host['id'], label="sleep %i" % host['id'])
handler.subtask.assert_has_calls([call1, call2])
def test_restart_hosts_no_host(self):
self.session.listHosts.return_value = []
handler = self.get_handler({})
try:
result = handler.run()
except koji.GenericError as e:
self.assertEqual(e.args[0], 'No matching hosts')
else:
raise Exception('Error not raised')
self.session.listHosts.assert_called_once_with(enabled=True)
self.session.taskFinished.assert_not_called()
handler.wait.assert_not_called()
handler.subtask.assert_not_called()
def test_restart_hosts_with_opts(self):
self.session.host.getID.return_value = "THIS HOST"
host = {'id': 99}
self.session.listHosts.return_value = [host]
self.session.getChannel.return_value = {'id': 1, 'name': 'default'}
handler = self.get_handler({'channel': 'default', 'arches': ['x86_64']})
handler.subtask.side_effect = [101, 102]
result = handler.run()
self.session.listHosts.assert_called_once_with(enabled=True, channelID=1, arches=['x86_64'])
self.session.taskFinished.assert_not_called()
handler.wait.assert_called_once_with([101, 102], all=True, timeout=3600*24)
# subtask calls
call1 = mock.call('restart', [host], assign=host['id'], label="restart %i" % host['id'])
call2 = mock.call('restartVerify', [101, host], assign=host['id'], label="sleep %i" % host['id'])
handler.subtask.assert_has_calls([call1, call2])
def test_restart_hosts_self_finished(self):
self.session.host.getID.return_value = 99
host = {'id': 99}
self.session.listHosts.return_value = [host]
handler = self.get_handler({})
self.session.taskFinished.return_value = True
handler.subtask.side_effect = [101, 102]
result = handler.run()
self.session.listHosts.assert_called_once_with(enabled=True)
self.session.taskFinished.assert_called_once()
call1 = mock.call('restart', [host], assign=host['id'], label="restart %i" % host['id'])
call2 = mock.call('restartVerify', [101, host], assign=host['id'], label="sleep %i" % host['id'])
handler.subtask.assert_has_calls([call1, call2])
call1 = mock.call(101, timeout=3600*24)
call2 = mock.call([101, 102], all=True, timeout=3600*24)
handler.wait.assert_has_calls([call1, call2])
def test_restart_hosts_self_unfinished(self):
self.session.host.getID.return_value = 99
host = {'id': 99}
self.session.listHosts.return_value = [host]
handler = self.get_handler({})
self.session.taskFinished.return_value = False
handler.subtask.side_effect = [101, 102]
with self.assertRaises(koji.tasks.ServerRestart):
result = handler.run()
self.session.listHosts.assert_called_once_with(enabled=True)
self.session.taskFinished.assert_called_once()
call1 = mock.call('restart', [host], assign=host['id'], label="restart %i" % host['id'])
call2 = mock.call('restartVerify', [101, host], assign=host['id'], label="sleep %i" % host['id'])
handler.subtask.assert_has_calls([call1, call2])
handler.wait.assert_called_once_with(101, timeout=3600*24)