test_lib_py2only dir
This commit is contained in:
parent
efe59699c6
commit
01a0894ee5
4 changed files with 0 additions and 0 deletions
|
|
@ -1,18 +0,0 @@
|
|||
from __future__ import absolute_import
|
||||
import unittest
|
||||
|
||||
# This is python-mock, not the rpm mock tool we know and love
|
||||
import mock
|
||||
|
||||
import koji
|
||||
|
||||
|
||||
class KrbVTestCase(unittest.TestCase):
|
||||
|
||||
@mock.patch('koji.krbV', new=None)
|
||||
def test_krbv_disabled(self):
|
||||
""" Test that when krbV is absent, we behave rationally. """
|
||||
self.assertEquals(koji.krbV, None)
|
||||
session = koji.ClientSession('whatever')
|
||||
with self.assertRaises(ImportError):
|
||||
session.krb_login()
|
||||
|
|
@ -1,299 +0,0 @@
|
|||
from __future__ import absolute_import
|
||||
import unittest
|
||||
|
||||
from nose.tools import raises
|
||||
|
||||
import koji.policy
|
||||
|
||||
|
||||
class MyBoolTest(koji.policy.BoolTest):
|
||||
name = 'bool_check'
|
||||
field = 'bool_field'
|
||||
|
||||
|
||||
class MyMatchTest(koji.policy.MatchTest):
|
||||
name = 'match_check'
|
||||
field = 'match_field'
|
||||
|
||||
|
||||
class myvarTest(koji.policy.CompareTest):
|
||||
name = None
|
||||
field = 'myvar'
|
||||
allow_float = False
|
||||
|
||||
|
||||
class TestBasicTests(unittest.TestCase):
|
||||
|
||||
@raises(NotImplementedError)
|
||||
def test_base_test(self):
|
||||
obj = koji.policy.BaseSimpleTest('something')
|
||||
obj.run({})
|
||||
|
||||
def test_true_test(self):
|
||||
obj = koji.policy.TrueTest('something')
|
||||
self.assertTrue(obj.run({}))
|
||||
|
||||
def test_false_test(self):
|
||||
obj = koji.policy.FalseTest('something')
|
||||
self.assertFalse(obj.run({}))
|
||||
|
||||
def test_all_test(self):
|
||||
obj = koji.policy.AllTest('something')
|
||||
self.assertTrue(obj.run({}))
|
||||
|
||||
def test_none_test(self):
|
||||
obj = koji.policy.NoneTest('something')
|
||||
self.assertFalse(obj.run({}))
|
||||
|
||||
def test_has_test(self):
|
||||
obj = koji.policy.HasTest('some thing')
|
||||
self.assertFalse(obj.run({}))
|
||||
self.assertFalse(obj.run({'blah': 'blah'}))
|
||||
self.assertTrue(obj.run({'thing': 'blah'}))
|
||||
self.assertRaises(koji.GenericError, koji.policy.HasTest, 'something')
|
||||
|
||||
def test_bool_test(self):
|
||||
obj = koji.policy.BoolTest('some thing')
|
||||
self.assertFalse(obj.run({'thing': None}))
|
||||
self.assertFalse(obj.run({'thing': []}))
|
||||
self.assertTrue(obj.run({'thing': 'yes'}))
|
||||
|
||||
def test_match_test(self):
|
||||
obj = koji.policy.MatchTest('some thing else')
|
||||
self.assertFalse(obj.run({'thing': 'elseplus'}))
|
||||
obj = koji.policy.MatchTest('some thing else*')
|
||||
self.assertTrue(obj.run({'thing': 'elseplus'}))
|
||||
|
||||
def test_compare_test(self):
|
||||
obj = koji.policy.CompareTest('compare thing > 2')
|
||||
self.assertFalse(obj.run({'thing': 1}))
|
||||
self.assertFalse(obj.run({'thing': 2}))
|
||||
self.assertTrue(obj.run({'thing': 3}))
|
||||
|
||||
obj = koji.policy.CompareTest('compare thing < 1.5')
|
||||
self.assertFalse(obj.run({'thing': 3.2}))
|
||||
self.assertTrue(obj.run({'thing': 1.0}))
|
||||
|
||||
obj = koji.policy.CompareTest('compare thing = 42')
|
||||
self.assertFalse(obj.run({'thing': 54}))
|
||||
self.assertTrue(obj.run({'thing': 42}))
|
||||
|
||||
obj = koji.policy.CompareTest('compare thing != 99')
|
||||
self.assertFalse(obj.run({'thing': 99}))
|
||||
self.assertTrue(obj.run({'thing': 100}))
|
||||
|
||||
obj = koji.policy.CompareTest('compare thing >= 2')
|
||||
self.assertFalse(obj.run({'thing': 1}))
|
||||
self.assertTrue(obj.run({'thing': 2}))
|
||||
self.assertTrue(obj.run({'thing': 3}))
|
||||
|
||||
obj = koji.policy.CompareTest('compare thing <= 5')
|
||||
self.assertFalse(obj.run({'thing': 23}))
|
||||
self.assertTrue(obj.run({'thing': 5}))
|
||||
self.assertTrue(obj.run({'thing': 0}))
|
||||
|
||||
@raises(koji.GenericError)
|
||||
def test_invalid_compare_test(self):
|
||||
koji.policy.CompareTest('some thing LOL 2')
|
||||
|
||||
|
||||
class TestDiscovery(unittest.TestCase):
|
||||
|
||||
def test_find_simple_tests(self):
|
||||
actual = koji.policy.findSimpleTests(koji.policy.__dict__)
|
||||
expected = {
|
||||
'all': koji.policy.AllTest,
|
||||
'bool': koji.policy.BoolTest,
|
||||
'compare': koji.policy.CompareTest,
|
||||
'false': koji.policy.FalseTest,
|
||||
'has': koji.policy.HasTest,
|
||||
'match': koji.policy.MatchTest,
|
||||
'none': koji.policy.NoneTest,
|
||||
'true': koji.policy.TrueTest,
|
||||
}
|
||||
self.assertDictEqual(expected, actual)
|
||||
|
||||
|
||||
class TestRuleHandling(unittest.TestCase):
|
||||
|
||||
def test_simple_rule_set_instantiation(self):
|
||||
tests = koji.policy.findSimpleTests(koji.policy.__dict__)
|
||||
rules = ['true :: allow']
|
||||
koji.policy.SimpleRuleSet(rules, tests)
|
||||
|
||||
def test_simple_rule_set_all_actions(self):
|
||||
tests = koji.policy.findSimpleTests(koji.policy.__dict__)
|
||||
rules = ['true :: allow']
|
||||
obj = koji.policy.SimpleRuleSet(rules, tests)
|
||||
result = obj.all_actions()
|
||||
self.assertEquals(result, ['allow'])
|
||||
|
||||
def test_simple_rule_set_apply(self):
|
||||
tests = koji.policy.findSimpleTests(koji.policy.__dict__)
|
||||
data = {}
|
||||
|
||||
rules = ['true :: allow']
|
||||
obj = koji.policy.SimpleRuleSet(rules, tests)
|
||||
action = obj.apply(data)
|
||||
self.assertEqual(action, 'allow')
|
||||
|
||||
rules = ['false :: allow']
|
||||
obj = koji.policy.SimpleRuleSet(rules, tests)
|
||||
action = obj.apply(data)
|
||||
self.assertEqual(action, None)
|
||||
|
||||
def test_custom_rules(self):
|
||||
tests = koji.policy.findSimpleTests([globals(), koji.policy.__dict__])
|
||||
|
||||
rules = ['bool_check :: True', 'all :: False']
|
||||
for val in True, False:
|
||||
data = {'bool_field' : val}
|
||||
obj = koji.policy.SimpleRuleSet(rules, tests)
|
||||
action = obj.apply(data)
|
||||
self.assertEqual(action, str(val))
|
||||
|
||||
rules = ['match_check foo* :: foo', 'match_check * :: bar']
|
||||
data = {'match_field' : 'foo1234'}
|
||||
obj = koji.policy.SimpleRuleSet(rules, tests)
|
||||
action = obj.apply(data)
|
||||
self.assertEqual(action, 'foo')
|
||||
|
||||
data = {'match_field' : 'not foo'}
|
||||
obj = koji.policy.SimpleRuleSet(rules, tests)
|
||||
action = obj.apply(data)
|
||||
self.assertEqual(action, 'bar')
|
||||
|
||||
data = {'myvar': 37}
|
||||
rules = ['myvar = 37 :: get back here']
|
||||
obj = koji.policy.SimpleRuleSet(rules, tests)
|
||||
action = obj.apply(data)
|
||||
self.assertEqual(action, 'get back here')
|
||||
|
||||
rules = ['myvar = 2.718281828 :: euler']
|
||||
with self.assertRaises(ValueError):
|
||||
obj = koji.policy.SimpleRuleSet(rules, tests)
|
||||
|
||||
def test_last_rule(self):
|
||||
tests = koji.policy.findSimpleTests(koji.policy.__dict__)
|
||||
data = {}
|
||||
|
||||
# no match
|
||||
rules = ['none :: allow']
|
||||
obj = koji.policy.SimpleRuleSet(rules, tests)
|
||||
self.assertEquals(obj.last_rule(), None)
|
||||
action = obj.apply(data)
|
||||
self.assertEquals(obj.last_rule(), '(no match)')
|
||||
|
||||
# simple rule
|
||||
rules = ['all :: allow']
|
||||
obj = koji.policy.SimpleRuleSet(rules, tests)
|
||||
action = obj.apply(data)
|
||||
self.assertEquals(obj.last_rule(), rules[0])
|
||||
|
||||
# negate rule
|
||||
rules = ['none !! allow']
|
||||
obj = koji.policy.SimpleRuleSet(rules, tests)
|
||||
action = obj.apply(data)
|
||||
self.assertEquals(obj.last_rule(), rules[0])
|
||||
|
||||
# nested rule
|
||||
policy = '''
|
||||
all :: {
|
||||
all :: {
|
||||
all :: allow
|
||||
}
|
||||
}
|
||||
'''
|
||||
rules = policy.splitlines()
|
||||
obj = koji.policy.SimpleRuleSet(rules, tests)
|
||||
action = obj.apply(data)
|
||||
expected = 'all :: ... all :: ... all :: allow'
|
||||
self.assertEquals(obj.last_rule(), expected)
|
||||
|
||||
def test_unclosed_brace(self):
|
||||
tests = koji.policy.findSimpleTests(koji.policy.__dict__)
|
||||
data = {}
|
||||
|
||||
lines = ['true :: {']
|
||||
with self.assertRaises(koji.GenericError):
|
||||
obj = koji.policy.SimpleRuleSet(lines, tests)
|
||||
|
||||
def test_unmatched_brace(self):
|
||||
tests = koji.policy.findSimpleTests(koji.policy.__dict__)
|
||||
data = {}
|
||||
|
||||
lines = ['true :: }']
|
||||
with self.assertRaises(koji.GenericError):
|
||||
obj = koji.policy.SimpleRuleSet(lines, tests)
|
||||
|
||||
def test_no_action(self):
|
||||
tests = koji.policy.findSimpleTests(koji.policy.__dict__)
|
||||
data = {}
|
||||
|
||||
lines = ['true && true']
|
||||
with self.assertRaises(Exception):
|
||||
obj = koji.policy.SimpleRuleSet(lines, tests)
|
||||
|
||||
def test_missing_handler(self):
|
||||
tests = koji.policy.findSimpleTests(koji.policy.__dict__)
|
||||
data = {}
|
||||
|
||||
lines = ['NOSUCHHANDLER && true :: allow']
|
||||
with self.assertRaises(koji.GenericError):
|
||||
obj = koji.policy.SimpleRuleSet(lines, tests)
|
||||
|
||||
def test_complex_policy(self):
|
||||
tests = koji.policy.findSimpleTests(koji.policy.__dict__)
|
||||
data = {}
|
||||
|
||||
policy = '''
|
||||
# This is a comment in the test policy
|
||||
|
||||
#^blank line
|
||||
# commented test && true :: some result
|
||||
|
||||
# First some rules that should never match
|
||||
false :: ERROR
|
||||
none :: ERROR
|
||||
|
||||
true !! ERROR
|
||||
all !! ERROR
|
||||
|
||||
false && true && true :: ERROR
|
||||
none && true && true :: ERROR
|
||||
|
||||
has NOSUCHFIELD :: ERROR
|
||||
|
||||
# nesting
|
||||
has DEPTH :: {
|
||||
match DEPTH 1 :: 1
|
||||
all :: {
|
||||
match DEPTH 2 :: 2
|
||||
all :: {
|
||||
match DEPTH 3 :: 3
|
||||
all :: {
|
||||
match DEPTH 4 :: 4
|
||||
all :: END
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
'''
|
||||
|
||||
lines = policy.splitlines()
|
||||
|
||||
for depth in ['1', '2', '3', '4']:
|
||||
data = {'DEPTH': depth}
|
||||
obj = koji.policy.SimpleRuleSet(lines, tests)
|
||||
action = obj.apply(data)
|
||||
self.assertEqual(action, depth)
|
||||
|
||||
data = {'DEPTH': '99'}
|
||||
obj = koji.policy.SimpleRuleSet(lines, tests)
|
||||
action = obj.apply(data)
|
||||
self.assertEqual(action, 'END')
|
||||
|
||||
actions = set(obj.all_actions())
|
||||
self.assertEquals(actions, set(['1', '2', '3', '4', 'ERROR', 'END']))
|
||||
|
||||
|
||||
|
|
@ -1,715 +0,0 @@
|
|||
from __future__ import absolute_import
|
||||
import random
|
||||
from os import path, makedirs
|
||||
from shutil import rmtree
|
||||
from tempfile import gettempdir
|
||||
from unittest import TestCase
|
||||
from mock import patch, Mock, call
|
||||
|
||||
import koji
|
||||
from koji.tasks import BaseTaskHandler, FakeTask, ForkTask, SleepTask, \
|
||||
WaitTestTask, scan_mounts, umount_all, \
|
||||
safe_rmtree
|
||||
import six
|
||||
|
||||
|
||||
def get_fake_mounts_file():
|
||||
""" Returns contents of /prc/mounts in a file-like object
|
||||
"""
|
||||
return six.StringIO(six.text_type((
|
||||
'sysfs /sys sysfs rw,seclabel,nosuid,nodev,noexec,relatime 0 0\n'
|
||||
'proc /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n'
|
||||
'devtmpfs /dev devtmpfs rw,seclabel,nosuid,size=238836k,nr_inodes=59709,mode=755 0 0\n'
|
||||
'securityfs /sys/kernel/security securityfs rw,nosuid,nodev,noexec,relatime 0 0\n'
|
||||
'tmpfs /dev/shm\040(deleted) tmpfs rw,seclabel,nosuid,nodev 0 0\n'
|
||||
'devpts /dev/pts devpts rw,seclabel,nosuid,noexec,relatime,gid=5,mode=620,ptmxmode=000 0 0\n'
|
||||
'tmpfs /run tmpfs rw,seclabel,nosuid,nodev,mode=755 0 0\n'
|
||||
'tmpfs /sys/fs/cgroup tmpfs ro,seclabel,nosuid,nodev,noexec,mode=755 0 0\n'
|
||||
'pstore /sys/fs/pstore pstore rw,seclabel,nosuid,nodev,noexec,relatime 0 0\n'
|
||||
'cgroup /sys/fs/cgroup/devices cgroup rw,nosuid,nodev,noexec,relatime,devices 0 0\n'
|
||||
'cgroup /sys/fs/cgroup/perf_event cgroup rw,nosuid,nodev,noexec,relatime,perf_event 0 0\n'
|
||||
'cgroup /sys/fs/cgroup/net_cls,net_prio cgroup rw,nosuid,nodev,noexec,relatime,net_cls,net_prio 0 0\n'
|
||||
'cgroup /sys/fs/cgroup/cpu,cpuacct cgroup rw,nosuid,nodev,noexec,relatime,cpu,cpuacct 0 0\n'
|
||||
'cgroup /sys/fs/cgroup/blkio cgroup rw,nosuid,nodev,noexec,relatime,blkio 0 0\n'
|
||||
'cgroup /sys/fs/cgroup/cpuset cgroup rw,nosuid,nodev,noexec,relatime,cpuset 0 0\n'
|
||||
'cgroup /sys/fs/cgroup/freezer cgroup rw,nosuid,nodev,noexec,relatime,freezer 0 0\n'
|
||||
'cgroup /sys/fs/cgroup/memory cgroup rw,nosuid,nodev,noexec,relatime,memory 0 0\n'
|
||||
'cgroup /sys/fs/cgroup/hugetlb cgroup rw,nosuid,nodev,noexec,relatime,hugetlb 0 0\n'
|
||||
'configfs /sys/kernel/config configfs rw,relatime 0 0\n'
|
||||
)))
|
||||
|
||||
|
||||
def get_temp_dir_root():
|
||||
return path.join(gettempdir(), 'koji_tests')
|
||||
|
||||
|
||||
def get_tmp_dir_path(folder_starts_with):
|
||||
return path.join(get_temp_dir_root(), ('{0}{1}'.format(folder_starts_with, random.randint(1, 999999999999))))
|
||||
|
||||
|
||||
class TestTask(BaseTaskHandler):
|
||||
Methods = ['some_method']
|
||||
_taskWeight = 5.2
|
||||
|
||||
def handler(self, *args):
|
||||
return 42
|
||||
|
||||
|
||||
class TestTaskNoWeight(BaseTaskHandler):
|
||||
Methods = ['some_method']
|
||||
|
||||
def handler(self, *args):
|
||||
return 42
|
||||
|
||||
|
||||
class BadTask(BaseTaskHandler):
|
||||
Methods = ['some_method']
|
||||
|
||||
|
||||
class TasksTestCase(TestCase):
|
||||
|
||||
def tearDown(self):
|
||||
temp_dir_root = get_temp_dir_root()
|
||||
|
||||
if path.isdir(temp_dir_root):
|
||||
rmtree(get_temp_dir_root())
|
||||
|
||||
def test_scan_mounts_results(self):
|
||||
""" Tests the scan_mounts function with a mocked /proc/mounts file. A list containing mount points
|
||||
starting with /dev are expected to be returned from the function based on the function input of /dev.
|
||||
"""
|
||||
fake_mounts_file_contents = get_fake_mounts_file()
|
||||
|
||||
with patch('{0}.open'.format(__name__), return_value=fake_mounts_file_contents, create=True):
|
||||
self.assertIn(scan_mounts('/dev'), [['/dev/shm', '/dev/pts', '/dev/mqueue', '/dev/hugepages', '/dev'],
|
||||
['/dev/shm', '/dev/pts', '/dev/mqueue', '/dev/console', '/dev']])
|
||||
|
||||
def test_scan_mounts_no_results(self):
|
||||
""" Tests the scan_mounts function with a mocked /proc/mounts file. An argument of /nonexistent/path
|
||||
to the function should return an empty list.
|
||||
"""
|
||||
fake_mounts_file_contents = get_fake_mounts_file()
|
||||
|
||||
with patch('{0}.open'.format(__name__), return_value=fake_mounts_file_contents, create=True):
|
||||
self.assertEquals(scan_mounts('/nonexistent/path'), [])
|
||||
|
||||
# Patching the scan_mounts function instead of the built-in open function because this is only testing umount_all
|
||||
@patch('koji.tasks.scan_mounts', side_effect=[['/dev/shm', '/dev/pts', '/dev/mqueue'], []])
|
||||
@patch('os.spawnvp', return_value=0)
|
||||
def test_umount_all(self, mocked_spawnvp, mocked_scan_mounts):
|
||||
""" Tests that umount_all returns nothing when successful.
|
||||
"""
|
||||
self.assertEquals(umount_all('/test'), None)
|
||||
|
||||
# Patching the scan_mounts function instead of the built-in open function because this is only testing umount_all
|
||||
@patch('koji.tasks.scan_mounts', return_value=['/dev/shm', '/dev/pts', '/dev/mqueue'])
|
||||
@patch('os.spawnvp', return_value=1)
|
||||
def test_umount_all_failure(self, mocked_spawnvp, mocked_scan_mounts):
|
||||
""" Tests that umount_all raises an exception when a mount point can't be unmounted.
|
||||
"""
|
||||
try:
|
||||
umount_all('/dev')
|
||||
raise Exception('A GenericError was not raised during the test')
|
||||
except koji.GenericError as e:
|
||||
self.assertEquals(e.args[0],
|
||||
'umount failed (exit code 1) for /dev/shm')
|
||||
|
||||
# Patching the scan_mounts function instead of the built-in open function because this is only testing umount_all
|
||||
@patch('koji.tasks.scan_mounts', side_effect=[['/dev/shm', '/dev/pts', '/dev/mqueue'], ['/dev/shm', '/dev/mqueue']])
|
||||
@patch('os.spawnvp', return_value=0)
|
||||
def test_umount_all_unexpected_failure(self, mocked_spawnvp, mocked_scan_mounts):
|
||||
""" Tests that umount_all will fail if the command to unmount the mount points was successful
|
||||
but a second run of scan_mounts still shows some of the unmount mount points still mounted.
|
||||
"""
|
||||
try:
|
||||
umount_all('/dev')
|
||||
raise Exception('A GenericError was not raised during the test')
|
||||
except koji.GenericError as e:
|
||||
self.assertEquals(e.args[0], 'Unmounting incomplete: [\'/dev/shm\', \'/dev/mqueue\']')
|
||||
|
||||
@patch('os.path.isfile', return_value=True)
|
||||
@patch('os.remove')
|
||||
def test_safe_rmtree_file(self, mock_remove, mock_isfile):
|
||||
""" Tests that the safe_rmtree function returns nothing when the path parameter is a file.
|
||||
"""
|
||||
self.assertEquals(safe_rmtree('/mnt/folder/some_file', False, True), None)
|
||||
|
||||
@patch('os.path.isfile', return_value=False)
|
||||
@patch('os.path.islink', return_value=True)
|
||||
@patch('os.remove')
|
||||
def test_safe_rmtree_link(self, mock_remove, mock_islink, mock_isfile):
|
||||
""" Tests that the safe_rmtree function returns nothing when the path parameter is a link.
|
||||
"""
|
||||
self.assertEquals(safe_rmtree('/mnt/folder/some_link', False, True), None)
|
||||
|
||||
@patch('os.path.isfile', return_value=False)
|
||||
@patch('os.path.islink', return_value=False)
|
||||
@patch('os.path.exists', return_value=False)
|
||||
def test_safe_rmtree_does_not_exist(self, mock_exists, mock_islink, mock_isfile):
|
||||
""" Tests that the safe_rmtree function returns nothing if the path does not exist.
|
||||
"""
|
||||
self.assertEquals(safe_rmtree('/mnt/folder/some_file', False, True), None)
|
||||
|
||||
@patch('os.path.isfile', return_value=False)
|
||||
@patch('os.path.islink', return_value=False)
|
||||
@patch('os.path.exists', return_value=True)
|
||||
@patch('os.system', side_effect=[0, 0])
|
||||
def test_safe_rmtree_directory(self, mock_os_system, mock_exists, mock_islink, mock_isfile):
|
||||
""" Tests that the safe_rmtree function returns nothing when the path is a directory.
|
||||
"""
|
||||
self.assertEquals(safe_rmtree('/mnt/folder', False, True), 0)
|
||||
|
||||
@patch('os.path.isfile', return_value=False)
|
||||
@patch('os.path.islink', return_value=False)
|
||||
@patch('os.path.exists', return_value=True)
|
||||
@patch('os.system', side_effect=[1, 0])
|
||||
def test_safe_rmtree_directory_scrub_file_failure(self, mock_os_system, mock_exists, mock_islink, mock_isfile):
|
||||
""" Tests that the safe_rmtree function returns a GeneralException when the path parameter is a directory
|
||||
and the scrub of the files in the directory fails.
|
||||
"""
|
||||
try:
|
||||
safe_rmtree('/mnt/folder', False, True)
|
||||
raise Exception('A GenericError was not raised during the test')
|
||||
except koji.GenericError as e:
|
||||
self.assertEquals(e.args[0], 'file removal failed (code 1) for /mnt/folder')
|
||||
|
||||
@patch('os.path.isfile', return_value=False)
|
||||
@patch('os.path.islink', return_value=False)
|
||||
@patch('os.path.exists', return_value=True)
|
||||
@patch('os.system', side_effect=[0, 1])
|
||||
def test_safe_rmtree_directory_scrub_directory_failure(self, mock_os_system, mock_exists, mock_islink, mock_isfile):
|
||||
""" Tests that the safe_rmtree function returns a GeneralException when the path parameter is a directory
|
||||
and the scrub of the directories in the directory fails.
|
||||
"""
|
||||
try:
|
||||
safe_rmtree('/mnt/folder', False, True)
|
||||
raise Exception('A GenericError was not raised during the test')
|
||||
except koji.GenericError as e:
|
||||
self.assertEquals(e.args[0], 'dir removal failed (code 1) for /mnt/folder')
|
||||
|
||||
def test_BaseTaskHandler_handler_not_set(self):
|
||||
""" Tests that an exception is thrown when the handler function is not overwritten by the child class.
|
||||
"""
|
||||
obj = BadTask(123, 'some_method', ['random_arg'], None, None, (get_tmp_dir_path('BadTask')))
|
||||
try:
|
||||
obj.handler()
|
||||
raise Exception('The NotImplementedError exception was not raised')
|
||||
except NotImplementedError as e:
|
||||
self.assertEquals(e.__class__.__name__, 'NotImplementedError')
|
||||
|
||||
def test_BaseTaskHandler_weight_default(self):
|
||||
""" Tests that the weight function returns 1.0 when _taskWeight is not set in the child class' definition.
|
||||
"""
|
||||
obj = TestTaskNoWeight(123, 'some_method', ['random_arg'], None, None, (get_tmp_dir_path('TestTaskNoWeight')))
|
||||
self.assertEquals(obj.weight(), 1.0)
|
||||
|
||||
def test_BaseTaskHandler_weight_set(self):
|
||||
""" Tests that the weight function returns the value of _taskWeight when it is set in the
|
||||
child class' definition.
|
||||
"""
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, (get_tmp_dir_path('TestTask')))
|
||||
self.assertEquals(obj.weight(), 5.2)
|
||||
|
||||
def test_BaseTaskHandler_createWorkdir_workdir_not_defined(self):
|
||||
""" Tests that the createWorkdir function does nothing when the workdir member variable is set to None.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
obj.workdir = None
|
||||
obj.createWorkdir()
|
||||
self.assertEquals(path.isdir(temp_path), False)
|
||||
|
||||
# This patch removes the dependence on removeWorkdir functioning
|
||||
@patch('{0}.TestTask.removeWorkdir'.format(__name__))
|
||||
def test_BaseTaskHandler_createWorkdir(self, mock_removeWorkDir):
|
||||
""" Tests that the createWorkdir function creates a folder based on the path given to the
|
||||
workdir member variable.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
obj.createWorkdir()
|
||||
self.assertEquals(path.isdir(temp_path), True)
|
||||
rmtree(get_temp_dir_root())
|
||||
|
||||
def test_BaseTaskHandler_removeWorkdir(self):
|
||||
""" Tests that the removeWOrkdir function deletes a folder based on the path given to the
|
||||
workdir member variable.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
makedirs(temp_path)
|
||||
self.assertEquals(path.isdir(temp_path), True)
|
||||
obj.removeWorkdir()
|
||||
self.assertEquals(path.isdir(temp_path), False)
|
||||
|
||||
def test_BaseTaskHandler_wait_all_done(self):
|
||||
""" Tests that the wait function returns the subtask results of when the taskWait function returns only
|
||||
two finished tasks
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
obj = TestTask(12345678, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
makedirs(temp_path)
|
||||
obj.session = Mock()
|
||||
obj.session.host.taskSetWait.return_value = None
|
||||
obj.session.host.taskWait.return_value = [[1551234, 1591234], []]
|
||||
taskWaitResults = [
|
||||
['1551234', {
|
||||
'brootid': 2342345,
|
||||
'logs': ['tasks/5678/12345678/root.log',
|
||||
'tasks/5678/12345678/state.log',
|
||||
'tasks/5678/12345678/build.log'],
|
||||
'srpm': 'tasks/5678/12345678/some_package-1.2.3p5-25.src.rpm'
|
||||
}],
|
||||
|
||||
['1591234', {
|
||||
'brootid': 1231234,
|
||||
'logs': ['tasks/6789/2345678/root.log',
|
||||
'tasks/6789/2345678/state.log',
|
||||
'tasks/6789/2345678/build.log'],
|
||||
'rpms': ['tasks/6789/2345678/some_other_package-doc-1.2.3p5-25.el7.noarch.rpm'],
|
||||
'srpms': ['tasks/6789/2345678/some_other_package-1.2.3p5-25.el7.src.rpm']
|
||||
}]
|
||||
]
|
||||
|
||||
obj.session.host.taskWaitResults.return_value = taskWaitResults
|
||||
self.assertEquals(obj.wait([1551234, 1591234]), dict(taskWaitResults))
|
||||
obj.session.host.taskSetWait.assert_called_once_with(12345678, [1551234, 1591234])
|
||||
obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234, 1591234], canfail=[])
|
||||
|
||||
def test_BaseTaskHandler_wait_some_not_done(self):
|
||||
""" Tests that the wait function returns the one finished subtask results of
|
||||
when the taskWait function returns one finished task and one unfinished
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
obj = TestTask(12345678, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
makedirs(temp_path)
|
||||
obj.session = Mock()
|
||||
obj.session.host.taskSetWait.return_value = None
|
||||
obj.session.host.taskWait.return_value = [[1551234], [1591234]]
|
||||
taskWaitResults = [
|
||||
['1551234', {
|
||||
'brootid': 2342345,
|
||||
'logs': ['tasks/5678/12345678/root.log',
|
||||
'tasks/5678/12345678/state.log',
|
||||
'tasks/5678/12345678/build.log'],
|
||||
'srpm': 'tasks/5678/12345678/some_package-1.2.3p5-25.src.rpm'
|
||||
}]
|
||||
]
|
||||
|
||||
obj.session.host.taskWaitResults.return_value = taskWaitResults
|
||||
self.assertEquals(obj.wait([1551234, 1591234]), dict(taskWaitResults))
|
||||
obj.session.host.taskSetWait.assert_called_once_with(12345678, [1551234, 1591234])
|
||||
obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234], canfail=[])
|
||||
|
||||
@patch('signal.pause', return_value=None)
|
||||
def test_BaseTaskHandler_wait_some_not_done_all_set(self, mock_signal_pause):
|
||||
""" Tests that the wait function returns the two subtask results since the all kwarg is set to True.
|
||||
The taskWait function should first return one finished and one unfinished task, then the second time it should
|
||||
return two finished tasks.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
obj = TestTask(12345678, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
makedirs(temp_path)
|
||||
obj.session = Mock()
|
||||
obj.session.host.taskSetWait.return_value = None
|
||||
obj.session.host.taskWait.side_effect = [[[1551234], [1591234]], [[1551234, 1591234], []]]
|
||||
taskWaitResults = [
|
||||
['1551234', {
|
||||
'brootid': 2342345,
|
||||
'logs': ['tasks/5678/12345678/root.log',
|
||||
'tasks/5678/12345678/state.log',
|
||||
'tasks/5678/12345678/build.log'],
|
||||
'srpm': 'tasks/5678/12345678/some_package-1.2.3p5-25.src.rpm'
|
||||
}],
|
||||
|
||||
['1591234', {
|
||||
'brootid': 1231234,
|
||||
'logs': ['tasks/6789/2345678/root.log',
|
||||
'tasks/6789/2345678/state.log',
|
||||
'tasks/6789/2345678/build.log'],
|
||||
'rpms': ['tasks/6789/2345678/some_other_package-doc-1.2.3p5-25.el7.noarch.rpm'],
|
||||
'srpms': ['tasks/6789/2345678/some_other_package-1.2.3p5-25.el7.src.rpm']
|
||||
}]
|
||||
]
|
||||
|
||||
obj.session.getTaskResult.side_effect
|
||||
|
||||
obj.session.host.taskWaitResults.return_value = taskWaitResults
|
||||
self.assertEquals(obj.wait([1551234, 1591234], all=True), dict(taskWaitResults))
|
||||
obj.session.host.taskSetWait.assert_called_once_with(12345678, [1551234, 1591234])
|
||||
obj.session.host.taskWait.assert_has_calls([call(12345678), call(12345678)])
|
||||
mock_signal_pause.assert_called_once_with()
|
||||
obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234, 1591234], canfail=[])
|
||||
|
||||
def test_BaseTaskHandler_wait_some_not_done_all_set_failany_set_failed_task(self):
|
||||
""" Tests that the wait function raises an exception when one of the subtask fails when the failany flag is set
|
||||
to True.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
obj = TestTask(12345678, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
makedirs(temp_path)
|
||||
obj.session = Mock()
|
||||
obj.session.host.taskSetWait.return_value = None
|
||||
obj.session.host.taskWait.side_effect = [[[1551234], [1591234]], [[1551234, 1591234], []]]
|
||||
obj.session.getTaskResult.side_effect = koji.GenericError('Uh oh, we\'ve got a problem here!')
|
||||
try:
|
||||
obj.wait([1551234, 1591234], all=True, failany=True)
|
||||
raise Exception('A GeneralError was not raised.')
|
||||
except koji.GenericError as e:
|
||||
self.assertEquals(e.args[0], 'Uh oh, we\'ve got a problem here!')
|
||||
obj.session.host.taskSetWait.assert_called_once_with(12345678, [1551234, 1591234])
|
||||
|
||||
def test_BaseTaskHandler_getUploadDir(self):
|
||||
""" Tests that the getUploadDir function returns the appropriate path based on the id of the handler.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
self.assertEquals(obj.getUploadDir(), 'tasks/123/123')
|
||||
|
||||
# This patch removes the dependence on getUploadDir functioning
|
||||
@patch('{0}.TestTask.getUploadDir'.format(__name__), return_value='tasks/123/123')
|
||||
def test_BaseTaskHandler_uploadFile(self, mock_getUploadDir):
|
||||
""" Tests that the uploadFile function calls the uploadWrapper function on the session member variable
|
||||
with the correct input
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
temp_file = path.join(temp_path, 'test.txt')
|
||||
with open(temp_file, 'w') as temp_file_handler:
|
||||
temp_file_handler.write('Test')
|
||||
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
obj.session = Mock()
|
||||
self.assertEquals(obj.uploadFile(temp_file), None)
|
||||
obj.session.uploadWrapper.assert_called_once_with(temp_file, 'tasks/123/123', None, volume=None)
|
||||
|
||||
# This patch removes the dependence on getUploadDir functioning
|
||||
@patch('{0}.TestTask.getUploadDir'.format(__name__), return_value='tasks/123/123')
|
||||
def test_BaseTaskHandler_uploadFile_no_content(self, mock_getUploadDir):
|
||||
""" Tests that the uploadFile function calls the uploadWrapper function on the session member variable
|
||||
without including empty files.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
|
||||
temp_file = path.join(temp_path, 'test.txt')
|
||||
temp_file_handler = open(temp_file, 'w')
|
||||
temp_file_handler.close()
|
||||
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
obj.session = Mock()
|
||||
self.assertEquals(obj.uploadFile(temp_file), None)
|
||||
self.assertEquals(obj.session.uploadWrapper.called, False)
|
||||
|
||||
def test_BaseTaskHandler_uploadTree(self):
|
||||
""" Tests that the uploadTree function calls the uploadFile function with the correct parameters.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
|
||||
dummy_dir = path.join(temp_path, 'some_directory')
|
||||
makedirs(dummy_dir)
|
||||
|
||||
dummy_file = path.join(temp_path, 'test.txt')
|
||||
with open(dummy_file, 'w') as temp_file_handler:
|
||||
temp_file_handler.write('Test')
|
||||
|
||||
dummy_file2 = path.join(dummy_dir, 'test2.txt')
|
||||
with open(dummy_file2, 'w') as temp_file_handler2:
|
||||
temp_file_handler2.write('Test2')
|
||||
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
obj.uploadFile = Mock()
|
||||
obj.uploadFile.return_value = None
|
||||
self.assertEquals(obj.uploadTree(temp_path), None)
|
||||
obj.uploadFile.assert_has_calls([call(dummy_file, '', volume=None), call(dummy_file2, 'some_directory', volume=None)])
|
||||
|
||||
@patch('os.lchown', return_value=None)
|
||||
def test_BaseTaskHandler_chownTree(self, mock_lchown):
|
||||
""" Tests that the chownTree functions as expected on dummy files created in a temp directory
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
|
||||
dummy_file = path.join(temp_path, 'test.txt')
|
||||
dummy_file_handler = open(dummy_file, 'w')
|
||||
dummy_file_handler.close()
|
||||
|
||||
dummy_file2 = path.join(temp_path, 'test2.txt')
|
||||
dummy_file_handler2 = open(dummy_file2, 'w')
|
||||
dummy_file_handler2.close()
|
||||
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
self.assertEquals(obj.chownTree(temp_path, 2, 0), None)
|
||||
mock_lchown.assert_has_calls([call(temp_path, 2, 0), call(dummy_file2, 2, 0), call(dummy_file, 2, 0)], any_order=True)
|
||||
|
||||
def test_BaseTaskHandler_localPath_file_exists(self):
|
||||
""" Tests the localPath function to ensure that when a file exists, it returns that path without
|
||||
trying to download it.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
|
||||
local_folder = path.join(temp_path, 'local')
|
||||
makedirs(local_folder)
|
||||
|
||||
dummy_file = path.join(local_folder, 'test.txt')
|
||||
dummy_file_handler = open(dummy_file, 'w')
|
||||
dummy_file_handler.close()
|
||||
options = Mock()
|
||||
options.topurl = 'https://www.domain.local'
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, options, temp_path)
|
||||
self.assertEquals(obj.localPath('test.txt'), dummy_file)
|
||||
|
||||
@patch('six.moves.urllib.request.urlopen', return_value=six.StringIO(six.text_type('Important things\nSome more important things\n')))
|
||||
def test_BaseTaskHandler_localPath_no_file(self, mock_urlopen):
|
||||
"""
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
|
||||
local_folder = path.join(temp_path, 'local')
|
||||
makedirs(local_folder)
|
||||
|
||||
target_file_path = path.join(local_folder, 'test.txt')
|
||||
|
||||
options = Mock()
|
||||
options.topurl = 'https://www.domain.local'
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, options, temp_path)
|
||||
|
||||
self.assertEquals(obj.localPath('test.txt'), target_file_path)
|
||||
mock_urlopen.assert_called_once_with('https://www.domain.local/test.txt')
|
||||
|
||||
def test_BaseTaskHandler_localPath_no_topurl(self):
|
||||
""" Tests that the localPath function returns a path when options.topurl is not defined.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
|
||||
options = Mock()
|
||||
options.topurl = None
|
||||
options.topdir = get_temp_dir_root()
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, options, temp_path)
|
||||
|
||||
self.assertEquals(obj.localPath('test.txt'), path.join(get_temp_dir_root(), 'test.txt'))
|
||||
|
||||
def test_BaseTaskHandler_find_arch(self):
|
||||
""" Tests that the find_arch function returns the input for arch when the input is not "noarch".
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
self.assertEquals(obj.find_arch('x86_64', None, None), 'x86_64')
|
||||
|
||||
def test_BaseTaskHandler_find_arch_noarch_bad_host(self):
|
||||
""" Tests that the find_arch function raises an exception when the host parameter doesn't contain a
|
||||
value for the arches key.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
host = {'arches': None, 'name': 'test.domain.local'}
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
try:
|
||||
obj.find_arch('noarch', host, None)
|
||||
raise Exception('The BuildError Exception was not raised')
|
||||
except koji.BuildError as e:
|
||||
self.assertEquals(e.args[0], 'No arch list for this host: test.domain.local')
|
||||
|
||||
def test_BaseTaskHandler_find_arch_noarch_bad_tag(self):
|
||||
""" Tests that the find_arch function raises an exception when the tag parameter doesn't contain a
|
||||
value for the arches key.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
host = {'arches': 'x86_64', 'name': 'test.domain.local'}
|
||||
tag = {'arches': None, 'name': 'some_package-1.2-build'}
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
try:
|
||||
obj.find_arch('noarch', host, tag)
|
||||
raise Exception('The BuildError Exception was not raised')
|
||||
except koji.BuildError as e:
|
||||
self.assertEquals(e.args[0], 'No arch list for tag: some_package-1.2-build')
|
||||
|
||||
def test_BaseTaskHandler_find_arch_noarch(self):
|
||||
""" Tests that the find_arch function finds a match of x86_64 when the host only supports x86_64
|
||||
and the tag supports x86_64 and aarch64.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
host = {'arches': 'x86_64', 'name': 'test.domain.local'}
|
||||
tag = {'arches': 'x86_64 aarch64', 'name': 'some_package-1.2-build'}
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
self.assertEquals(obj.find_arch('noarch', host, tag), 'x86_64')
|
||||
|
||||
def test_BaseTaskHandler_find_arch__noarch_no_match(self):
|
||||
""" Tests that the find_arch function raises an exception when there isn't a common arch supported between
|
||||
the host and the tag.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
host = {'arches': 'i386', 'name': 'test.domain.local'}
|
||||
tag = {'arches': 'x86_64 aarch64', 'name': 'some_package-1.2-build'}
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
try:
|
||||
obj.find_arch('noarch', host, tag)
|
||||
raise Exception('The BuildError Exception was not raised')
|
||||
except koji.BuildError as e:
|
||||
self.assertEquals(e.args[0], ('host test.domain.local (i386) does not support '
|
||||
'any arches of tag some_package-1.2-build (aarch64, x86_64)'))
|
||||
|
||||
def test_getRepo_tied_to_session(self):
|
||||
""" Tests that the getRepo function calls session.getRepo(), and returns the result when successful
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
|
||||
repo_dict = {
|
||||
'create_event': 13635166,
|
||||
'create_ts': 1469039671.5743899,
|
||||
'creation_time': '2016-07-20 18:34:31.574386',
|
||||
'id': 1630631,
|
||||
'state': 1
|
||||
}
|
||||
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
obj.session = Mock()
|
||||
obj.session.getRepo.return_value = repo_dict
|
||||
|
||||
self.assertEquals(obj.getRepo(8472), repo_dict)
|
||||
|
||||
@patch('{0}.TestTask.wait'.format(__name__))
|
||||
def test_getRepo_not_tied_to_session(self, mock_wait):
|
||||
""" Tests that the getRepo function waits until the results are available for session.getRepo, when it is
|
||||
not available at the start of the function call.
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
|
||||
repo_dict = {
|
||||
'create_event': 13413120,
|
||||
'create_ts': 1466140834.9119599,
|
||||
'creation_time': '2016-06-17 05:20:34.911962',
|
||||
'id': 1592850,
|
||||
'state': 1
|
||||
}
|
||||
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
obj.session = Mock()
|
||||
obj.session.getRepo.return_value = None
|
||||
obj.session.getTag.return_value = {
|
||||
'arches': 'i386 ia64 x86_64 ppc s390 s390x ppc64',
|
||||
'extra': {},
|
||||
'id': 851,
|
||||
'locked': True,
|
||||
'maven_include_all': False,
|
||||
'maven_support': False,
|
||||
'name': 'dist-3.0E-build',
|
||||
'perm': None,
|
||||
'perm_id': None
|
||||
}
|
||||
obj.session.getBuildTargets.return_value = [{
|
||||
'build_tag': 3093,
|
||||
'build_tag_name': 'dist-6E-dsrv-9-build',
|
||||
'dest_tag': 3092,
|
||||
'dest_tag_name': 'dist-6E-dsrv-9-qu-candidate',
|
||||
'id': 851,
|
||||
'name': 'dist-6E-dsrv-9-qu-candidate'
|
||||
}]
|
||||
|
||||
obj.session.host.subtask.return_value = 123
|
||||
mock_wait.return_value = {123: repo_dict}
|
||||
|
||||
self.assertEquals(obj.getRepo(851), repo_dict)
|
||||
obj.session.getRepo.assert_called_once_with(851)
|
||||
obj.session.getTag.assert_called_once_with(851, strict=True)
|
||||
|
||||
@patch('{0}.TestTask.wait'.format(__name__))
|
||||
def test_getRepo_not_tied_to_session_no_build_targets(self, mock_wait):
|
||||
""" Tests that the getRepo function raises an exception when session.getBuildTargets returns an empty list
|
||||
"""
|
||||
temp_path = get_tmp_dir_path('TestTask')
|
||||
makedirs(temp_path)
|
||||
|
||||
obj = TestTask(123, 'some_method', ['random_arg'], None, None, temp_path)
|
||||
obj.session = Mock()
|
||||
obj.session.getRepo.return_value = None
|
||||
obj.session.getTag.return_value = {
|
||||
'arches': 'i686 x86_64 ppc ppc64 ppc64le s390 s390x aarch64',
|
||||
'extra': {},
|
||||
'id': 8472,
|
||||
'locked': False,
|
||||
'maven_include_all': False,
|
||||
'maven_support': False,
|
||||
'name': 'rhel-7.3-build',
|
||||
'perm': 'admin',
|
||||
'perm_id': 1
|
||||
}
|
||||
obj.session.getBuildTargets.return_value = []
|
||||
|
||||
try:
|
||||
obj.getRepo(8472)
|
||||
raise Exception('The BuildError Exception was not raised')
|
||||
except koji.BuildError as e:
|
||||
obj.session.getRepo.assert_called_once_with(8472)
|
||||
self.assertEquals(e.args[0], 'no repo (and no target) for tag rhel-7.3-build')
|
||||
|
||||
def test_FakeTask_handler(self):
|
||||
""" Tests that the FakeTest handler can be instantiated and returns 42 when run
|
||||
"""
|
||||
obj = FakeTask(123, 'someMethod', ['random_arg'], None, None, (get_tmp_dir_path('FakeTask')))
|
||||
self.assertEquals(obj.run(), 42)
|
||||
|
||||
@patch('time.sleep')
|
||||
def test_SleepTask_handler(self, mock_sleep):
|
||||
""" Tests that the SleepTask handler can be instantiated and runs appropriately based on the input
|
||||
"""
|
||||
obj = SleepTask(123, 'sleep', [5], None, None, (get_tmp_dir_path('SleepTask')))
|
||||
obj.run()
|
||||
mock_sleep.assert_called_once_with(5)
|
||||
|
||||
@patch('os.spawnvp')
|
||||
def test_ForkTask_handler(self, mock_spawnvp):
|
||||
""" Tests that the ForkTask handler can be instantiated and runs appropriately based on the input
|
||||
"""
|
||||
obj = ForkTask(123, 'fork', [1, 20], None, None, (get_tmp_dir_path('ForkTask')))
|
||||
obj.run()
|
||||
mock_spawnvp.assert_called_once_with(1, 'sleep', ['sleep', '20'])
|
||||
|
||||
@patch('signal.pause', return_value=None)
|
||||
@patch('time.sleep')
|
||||
def test_WaitTestTask_handler(self, mock_sleep, mock_signal_pause):
|
||||
""" Tests that the WaitTestTask handler can be instantiated and runs appropriately based on the input
|
||||
Specifically, that forking works and canfail behaves correctly.
|
||||
"""
|
||||
self.mock_subtask_id = 1
|
||||
def mock_subtask(method, arglist, id, **opts):
|
||||
self.assertEqual(method, 'sleep')
|
||||
task_id = self.mock_subtask_id
|
||||
self.mock_subtask_id += 1
|
||||
obj = SleepTask(task_id, 'sleep', arglist, None, None, (get_tmp_dir_path('SleepTask')))
|
||||
obj.run()
|
||||
return task_id
|
||||
|
||||
mock_taskWait = [
|
||||
[[], [1, 2, 3, 4]],
|
||||
[[3, 4], [1, 2]],
|
||||
[[1, 2, 3, 4], []],
|
||||
]
|
||||
def mock_getTaskResult(task_id):
|
||||
if task_id == 4:
|
||||
raise koji.GenericError()
|
||||
|
||||
|
||||
obj = WaitTestTask(123, 'waittest', [3], None, None, (get_tmp_dir_path('WaitTestTask')))
|
||||
obj.session = Mock()
|
||||
obj.session.host.subtask.side_effect = mock_subtask
|
||||
obj.session.getTaskResult.side_effect = mock_getTaskResult
|
||||
obj.session.host.taskWait.side_effect = mock_taskWait
|
||||
obj.session.host.taskWaitResults.return_value = [ ['1', {}], ['2', {}], ['3', {}], ['4', {}], ]
|
||||
obj.run()
|
||||
#self.assertEqual(mock_sleep.call_count, 4)
|
||||
obj.session.host.taskSetWait.assert_called_once()
|
||||
obj.session.host.taskWait.assert_has_calls([call(123), call(123), call(123)])
|
||||
# getTaskResult should be called in 2nd round only for task 3, as 4
|
||||
# will be skipped as 'canfail'
|
||||
obj.session.getTaskResult.assert_has_calls([call(3)])
|
||||
Loading…
Add table
Add a link
Reference in a new issue