230 lines
10 KiB
Python
230 lines
10 KiB
Python
import unittest
|
|
|
|
import os
|
|
import sys
|
|
import StringIO as stringio
|
|
|
|
import mock
|
|
|
|
from mock import call
|
|
|
|
import loadcli
|
|
|
|
cli = loadcli.cli
|
|
|
|
|
|
class TestWatchTasks(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.options = mock.MagicMock()
|
|
cli.options = self.options
|
|
self.session = mock.MagicMock(name='sessionMock')
|
|
self.args = mock.MagicMock()
|
|
self.original_parser = cli.OptionParser
|
|
cli.OptionParser = mock.MagicMock()
|
|
self.parser = cli.OptionParser.return_value
|
|
|
|
def tearDown(self):
|
|
cli.OptionParser = self.original_parser
|
|
|
|
# Show long diffs in error output...
|
|
maxDiff = None
|
|
|
|
@mock.patch('sys.stdout', new_callable=stringio.StringIO)
|
|
def test_watch_tasks_no_tasklist(self, stdout):
|
|
returned = cli.watch_tasks(self.session, [])
|
|
actual = stdout.getvalue()
|
|
expected = ""
|
|
self.assertIsNone(returned)
|
|
self.assertEqual(actual, expected)
|
|
|
|
@mock.patch('sys.stdout', new_callable=stringio.StringIO)
|
|
@mock.patch('koji_cli.TaskWatcher')
|
|
@mock.patch('koji_cli.display_tasklist_status')
|
|
@mock.patch('koji_cli.display_task_results')
|
|
def test_watch_tasks(self, dtrMock, dtsMock, twClzMock, stdout):
|
|
self.options.poll_interval = 0
|
|
manager = mock.MagicMock()
|
|
manager.attach_mock(twClzMock, 'TaskWatcherMock')
|
|
manager.attach_mock(dtrMock, 'display_task_results_mock')
|
|
manager.attach_mock(dtsMock, 'display_tasklist_status_mock')
|
|
tw1 = manager.tw1
|
|
tw1.level = 0
|
|
tw1.is_done.side_effect = [False, True, False, True, True]
|
|
tw1.update.side_effect = [False, False, True, True, True]
|
|
tw1.is_success.return_value = False
|
|
tw2 = manager.tw2
|
|
tw2.level = 0
|
|
tw2.is_done.side_effect = [False, False, False, False, True]
|
|
tw2.update.side_effect = [True, False, False, True, True]
|
|
tw2.is_success.return_value = False
|
|
self.session.getTaskChildren.side_effect = lambda p: [
|
|
{'id': 11}, {'id': 12}] if (0 == p) else []
|
|
manager.attach_mock(self.session, 'sessionMock')
|
|
|
|
def side_effect(*args, **kwargs):
|
|
rt = None
|
|
if args[0] not in range(2):
|
|
rt = mock.MagicMock()
|
|
rt.level = args[2]
|
|
rt.is_done.return_value = True
|
|
rt.update.return_value = True
|
|
rt.is_success.return_value = True
|
|
manager.attach_mock(rt, 'tw' + str(args[0]))
|
|
else:
|
|
rt = {0: tw1, 1: tw2}.get(args[0])
|
|
return rt
|
|
|
|
twClzMock.side_effect = side_effect
|
|
rv = cli.watch_tasks(self.session, range(2), quiet=False)
|
|
actual = stdout.getvalue()
|
|
self.assertMultiLineEqual(
|
|
actual, "Watching tasks (this may be safely interrupted)...\n\n")
|
|
self.assertEqual(rv, 1)
|
|
self.assertEqual(manager.mock_calls,
|
|
[call.TaskWatcherMock(0, self.session, quiet=False),
|
|
call.TaskWatcherMock(1, self.session, quiet=False),
|
|
call.tw1.update(),
|
|
call.tw1.is_done(),
|
|
call.sessionMock.getTaskChildren(0),
|
|
call.TaskWatcherMock(11, self.session, 1, quiet=False),
|
|
call.tw11.update(),
|
|
call.TaskWatcherMock(12, self.session, 1, quiet=False),
|
|
call.tw12.update(),
|
|
call.tw2.update(),
|
|
call.tw2.is_done(),
|
|
call.sessionMock.getTaskChildren(1),
|
|
call.tw1.update(),
|
|
call.tw1.is_done(),
|
|
call.tw1.is_success(),
|
|
call.sessionMock.getTaskChildren(0),
|
|
call.tw2.update(),
|
|
call.tw2.is_done(),
|
|
call.sessionMock.getTaskChildren(1),
|
|
call.tw11.update(),
|
|
call.tw11.is_done(),
|
|
call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}),
|
|
call.tw11.is_success(),
|
|
call.sessionMock.getTaskChildren(11),
|
|
call.tw12.update(),
|
|
call.tw12.is_done(),
|
|
call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}),
|
|
call.tw12.is_success(),
|
|
call.sessionMock.getTaskChildren(12),
|
|
call.tw1.update(),
|
|
call.tw1.is_done(),
|
|
call.sessionMock.getTaskChildren(0),
|
|
call.tw2.update(),
|
|
call.tw2.is_done(),
|
|
call.sessionMock.getTaskChildren(1),
|
|
call.tw11.update(),
|
|
call.tw11.is_done(),
|
|
call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}),
|
|
call.tw11.is_success(),
|
|
call.sessionMock.getTaskChildren(11),
|
|
call.tw12.update(),
|
|
call.tw12.is_done(),
|
|
call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}),
|
|
call.tw12.is_success(),
|
|
call.sessionMock.getTaskChildren(12),
|
|
call.tw1.update(),
|
|
call.tw1.is_done(),
|
|
call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}),
|
|
call.tw1.is_success(),
|
|
call.sessionMock.getTaskChildren(0),
|
|
call.tw2.update(),
|
|
call.tw2.is_done(),
|
|
call.sessionMock.getTaskChildren(1),
|
|
call.tw11.update(),
|
|
call.tw11.is_done(),
|
|
call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}),
|
|
call.tw11.is_success(),
|
|
call.sessionMock.getTaskChildren(11),
|
|
call.tw12.update(),
|
|
call.tw12.is_done(),
|
|
call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}),
|
|
call.tw12.is_success(),
|
|
call.sessionMock.getTaskChildren(12),
|
|
call.tw1.update(),
|
|
call.tw1.is_done(),
|
|
call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}),
|
|
call.tw1.is_success(),
|
|
call.sessionMock.getTaskChildren(0),
|
|
call.tw2.update(),
|
|
call.tw2.is_done(),
|
|
call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}),
|
|
call.tw2.is_success(),
|
|
call.sessionMock.getTaskChildren(1),
|
|
call.tw11.update(),
|
|
call.tw11.is_done(),
|
|
call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}),
|
|
call.tw11.is_success(),
|
|
call.sessionMock.getTaskChildren(11),
|
|
call.tw12.update(),
|
|
call.tw12.is_done(),
|
|
call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}),
|
|
call.tw12.is_success(),
|
|
call.sessionMock.getTaskChildren(12),
|
|
call.display_task_results_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12})
|
|
])
|
|
|
|
@mock.patch('sys.stdout', new_callable=stringio.StringIO)
|
|
@mock.patch('koji_cli.TaskWatcher')
|
|
@mock.patch('koji_cli.display_tasklist_status')
|
|
@mock.patch('koji_cli.display_task_results')
|
|
def test_watch_tasks_with_keyboardinterrupt(
|
|
self, dtrMock, dtsMock, twClzMock, stdout):
|
|
"""Raise KeyboardInterrupt inner watch_tasks.
|
|
Raising it by SIGNAL might be better"""
|
|
self.options.poll_interval = 0
|
|
manager = mock.MagicMock()
|
|
manager.attach_mock(twClzMock, 'TaskWatcherMock')
|
|
manager.attach_mock(dtrMock, 'display_task_results_mock')
|
|
manager.attach_mock(dtsMock, 'display_tasklist_status_mock')
|
|
tw1 = manager.tw1
|
|
tw1.level = 0
|
|
tw1.is_done.side_effect = [False, KeyboardInterrupt, False]
|
|
tw1.update.side_effect = [False, False]
|
|
tw1.is_success.return_value = False
|
|
tw1.str.return_value = 'tw1'
|
|
tw1.display_state.return_value = 'tw1.display_state'
|
|
tw2 = manager.tw2
|
|
tw2.level = 0
|
|
tw2.is_done.side_effect = [False, False, False, False, True]
|
|
tw2.update.side_effect = [True, False, False, True, True]
|
|
tw2.is_success.return_value = False
|
|
tw2.str.return_value = 'tw2'
|
|
tw2.display_state.return_value = 'tw2.display_state'
|
|
self.session.getTaskChildren.side_effect = lambda p: [
|
|
{'id': 11}, {'id': 12}] if (0 == p) else []
|
|
manager.attach_mock(self.session, 'sessionMock')
|
|
|
|
def side_effect(*args, **kwargs):
|
|
rt = None
|
|
if args[0] not in range(2):
|
|
rt = mock.MagicMock()
|
|
rt.level = args[2]
|
|
rt.is_done.return_value = True
|
|
rt.update.return_value = True
|
|
rt.is_success.return_value = True
|
|
manager.attach_mock(rt, 'tw' + str(args[0]))
|
|
else:
|
|
rt = {0: tw1, 1: tw2}.get(args[0])
|
|
return rt
|
|
|
|
twClzMock.side_effect = side_effect
|
|
|
|
with self.assertRaises(KeyboardInterrupt):
|
|
cli.watch_tasks(self.session, range(2), quiet=False)
|
|
|
|
actual = stdout.getvalue()
|
|
self.assertMultiLineEqual(
|
|
actual, """Watching tasks (this may be safely interrupted)...
|
|
Tasks still running. You can continue to watch with the '%s watch-task' command.
|
|
Running Tasks:
|
|
tw1: tw1.display_state
|
|
tw2: tw2.display_state
|
|
""" % (os.path.basename(sys.argv[0]) or 'koji'))
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|