From 2da05e9e59444a928fa9a6531ae798bd3e4dc7ca Mon Sep 17 00:00:00 2001 From: Yuming Zhu Date: Thu, 22 Sep 2016 06:14:37 +0000 Subject: [PATCH] ut: cli - test_watch_tasks --- tests/test_cli/test_list_commands.py | 16 +-- tests/test_cli/test_runroot.py | 14 +-- tests/test_cli/test_watch_tasks.py | 173 +++++++++++++++++++++++++++ 3 files changed, 177 insertions(+), 26 deletions(-) create mode 100644 tests/test_cli/test_watch_tasks.py diff --git a/tests/test_cli/test_list_commands.py b/tests/test_cli/test_list_commands.py index a4de0d99..720f6abf 100644 --- a/tests/test_cli/test_list_commands.py +++ b/tests/test_cli/test_list_commands.py @@ -6,20 +6,8 @@ import StringIO as stringio import mock - -# We have to do this craziness because 'import koji' is ambiguous. Is it the -# koji module, or the koji cli module. Jump through hoops accordingly. -# http://stackoverflow.com/questions/67631/how-to-import-a-module-given-the-full-path -CLI_FILENAME = os.path.dirname(__file__) + "/../../cli/koji" -if sys.version_info[0] >= 3: - import importlib.util - spec = importlib.util.spec_from_file_location("koji_cli", CLI_FILENAME) - cli = importlib.util.module_from_spec(spec) - spec.loader.exec_module(cli) -else: - import imp - cli = imp.load_source('koji_cli', CLI_FILENAME) - +import loadcli +cli = loadcli.cli class TestListCommands(unittest.TestCase): def setUp(self): diff --git a/tests/test_cli/test_runroot.py b/tests/test_cli/test_runroot.py index 7fd27776..91c3866b 100644 --- a/tests/test_cli/test_runroot.py +++ b/tests/test_cli/test_runroot.py @@ -6,19 +6,9 @@ import StringIO as stringio import mock +import loadcli +cli = loadcli.cli -# We have to do this craziness because 'import koji' is ambiguous. Is it the -# koji module, or the koji cli module. Jump through hoops accordingly. -# http://stackoverflow.com/questions/67631/how-to-import-a-module-given-the-full-path -CLI_FILENAME = os.path.dirname(__file__) + "/../../cli/koji" -if sys.version_info[0] >= 3: - import importlib.util - spec = importlib.util.spec_from_file_location("koji_cli", CLI_FILENAME) - cli = importlib.util.module_from_spec(spec) - spec.loader.exec_module(cli) -else: - import imp - cli = imp.load_source('koji_cli', CLI_FILENAME) class TestListCommands(unittest.TestCase): diff --git a/tests/test_cli/test_watch_tasks.py b/tests/test_cli/test_watch_tasks.py new file mode 100644 index 00000000..f1c83327 --- /dev/null +++ b/tests/test_cli/test_watch_tasks.py @@ -0,0 +1,173 @@ +import unittest + +import StringIO as stringio + +import mock + +from mock import call + +import loadcli + +cli = loadcli.cli + + +class TestWatchTasks(unittest.TestCase): + + def setUp(self): + self.options = mock.MagicMock() + cli.options = self.options + self.session = mock.MagicMock(name='sessionMock') + self.args = mock.MagicMock() + self.original_parser = cli.OptionParser + cli.OptionParser = mock.MagicMock() + self.parser = cli.OptionParser.return_value + + def tearDown(self): + cli.OptionParser = self.original_parser + + # Show long diffs in error output... + maxDiff = None + + @mock.patch('sys.stdout', new_callable=stringio.StringIO) + def test_watch_tasks_no_tasklist(self, stdout): + returned = cli.watch_tasks(self.session, []) + actual = stdout.getvalue() + expected = "" + self.assertIsNone(returned) + self.assertEqual(actual, expected) + + @mock.patch('sys.stdout', new_callable=stringio.StringIO) + @mock.patch('koji_cli.TaskWatcher') + @mock.patch('koji_cli.display_tasklist_status') + @mock.patch('koji_cli.display_task_results') + def test_watch_tasks(self, dtrMock, dtsMock, twClzMock, stdout): + self.options.poll_interval = 0 + global options + option = self.options + manager = mock.MagicMock() + manager.attach_mock(twClzMock, 'TaskWatcherMock') + manager.attach_mock(dtrMock, 'display_task_results_mock') + manager.attach_mock(dtsMock, 'display_tasklist_status_mock') + tw1 = manager.tw1 + tw1.level = 0 + tw1.is_done.side_effect = [False, True, False, True, True] + tw1.update.side_effect = [False, False, True, True, True] + tw1.is_success.return_value = False + tw2 = manager.tw2 + tw2.level = 0 + tw2.is_done.side_effect = [False, False, False, False, True] + tw2.update.side_effect = [True, False, False, True, True] + tw2.is_success.return_value = False + self.session.getTaskChildren.side_effect = lambda p: [ + {'id': 11}, {'id': 12}] if (0 == p) else [] + manager.attach_mock(self.session, 'sessionMock') + + def side_effect(*args, **kwargs): + rt = None + if args[0] not in range(2): + rt = mock.MagicMock() + rt.level = args[2] + rt.is_done.return_value = True + rt.update.return_value = True + rt.is_success.return_value = True + manager.attach_mock(rt, 'tw' + str(args[0])) + else: + rt = {0: tw1, 1: tw2}.get(args[0]) + return rt + + twClzMock.side_effect = side_effect + rv = cli.watch_tasks(self.session, range(2), quiet=False) + actual = stdout.getvalue() + self.assertMultiLineEqual( + actual, "Watching tasks (this may be safely interrupted)...\n\n") + self.assertEqual(rv, 1) + self.assertEqual(manager.mock_calls, + [call.TaskWatcherMock(0, self.session, quiet=False), + call.TaskWatcherMock(1, self.session, quiet=False), + call.tw1.update(), + call.tw1.is_done(), + call.sessionMock.getTaskChildren(0), + call.TaskWatcherMock(11, self.session, 1, quiet=False), + call.tw11.update(), + call.TaskWatcherMock(12, self.session, 1, quiet=False), + call.tw12.update(), + call.tw2.update(), + call.tw2.is_done(), + call.sessionMock.getTaskChildren(1), + call.tw1.update(), + call.tw1.is_done(), + call.tw1.is_success(), + call.sessionMock.getTaskChildren(0), + call.tw2.update(), + call.tw2.is_done(), + call.sessionMock.getTaskChildren(1), + call.tw11.update(), + call.tw11.is_done(), + call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}), + call.tw11.is_success(), + call.sessionMock.getTaskChildren(11), + call.tw12.update(), + call.tw12.is_done(), + call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}), + call.tw12.is_success(), + call.sessionMock.getTaskChildren(12), + call.tw1.update(), + call.tw1.is_done(), + call.sessionMock.getTaskChildren(0), + call.tw2.update(), + call.tw2.is_done(), + call.sessionMock.getTaskChildren(1), + call.tw11.update(), + call.tw11.is_done(), + call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}), + call.tw11.is_success(), + call.sessionMock.getTaskChildren(11), + call.tw12.update(), + call.tw12.is_done(), + call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}), + call.tw12.is_success(), + call.sessionMock.getTaskChildren(12), + call.tw1.update(), + call.tw1.is_done(), + call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}), + call.tw1.is_success(), + call.sessionMock.getTaskChildren(0), + call.tw2.update(), + call.tw2.is_done(), + call.sessionMock.getTaskChildren(1), + call.tw11.update(), + call.tw11.is_done(), + call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}), + call.tw11.is_success(), + call.sessionMock.getTaskChildren(11), + call.tw12.update(), + call.tw12.is_done(), + call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}), + call.tw12.is_success(), + call.sessionMock.getTaskChildren(12), + call.tw1.update(), + call.tw1.is_done(), + call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}), + call.tw1.is_success(), + call.sessionMock.getTaskChildren(0), + call.tw2.update(), + call.tw2.is_done(), + call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}), + call.tw2.is_success(), + call.sessionMock.getTaskChildren(1), + call.tw11.update(), + call.tw11.is_done(), + call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}), + call.tw11.is_success(), + call.sessionMock.getTaskChildren(11), + call.tw12.update(), + call.tw12.is_done(), + call.display_tasklist_status_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}), + call.tw12.is_success(), + call.sessionMock.getTaskChildren(12), + call.display_task_results_mock({0: tw1, 1: tw2, 11: manager.tw11, 12: manager.tw12}) + ]) + + +if __name__ == '__main__': + unittest.main()