Rewrite generator to IOStream
This commit is contained in:
parent
35b3e51fcc
commit
a79375789d
5 changed files with 226 additions and 158 deletions
|
|
@ -1,20 +1,11 @@
|
|||
import unittest
|
||||
import mock
|
||||
import six
|
||||
|
||||
import kojihub
|
||||
|
||||
QP = kojihub.QueryProcessor
|
||||
|
||||
|
||||
def mock_open():
|
||||
"""Return the right patch decorator for open"""
|
||||
if six.PY2:
|
||||
return mock.patch('__builtin__.open')
|
||||
else:
|
||||
return mock.patch('builtins.open')
|
||||
|
||||
|
||||
class TestCreateRPMChecksum(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.maxDiff = None
|
||||
|
|
|
|||
|
|
@ -11,9 +11,11 @@ class TestGetRpmChecksums(unittest.TestCase):
|
|||
def setUp(self):
|
||||
self.maxDiff = None
|
||||
self.exports = kojihub.RootExports()
|
||||
self.os_path = mock.patch('os.path.exists').start()
|
||||
self.create_rpm_checksum_output = mock.patch(
|
||||
'kojihub.kojihub.create_rpm_checksums_output').start()
|
||||
self.write_signed_rpm = mock.patch('kojihub.kojihub.write_signed_rpm').start()
|
||||
self.create_rpm_checksum = mock.patch('kojihub.kojihub.create_rpm_checksum').start()
|
||||
self.calculate_chsum = mock.patch('kojihub.kojihub.calculate_chsum').start()
|
||||
self.get_rpm = mock.patch('kojihub.kojihub.get_rpm').start()
|
||||
self.QueryProcessor = mock.patch('kojihub.kojihub.QueryProcessor',
|
||||
side_effect=self.getQuery).start()
|
||||
|
|
@ -37,6 +39,10 @@ class TestGetRpmChecksums(unittest.TestCase):
|
|||
with self.assertRaises(koji.GenericError) as ex:
|
||||
self.exports.getRPMChecksums(rpm_id)
|
||||
self.assertEqual('rpm_id must be an integer', str(ex.exception))
|
||||
self.get_rpm.assert_not_called()
|
||||
self.calculate_chsum.assert_not_called()
|
||||
self.create_rpm_checksum.assert_not_called()
|
||||
self.create_rpm_checksum_output.assert_not_called()
|
||||
|
||||
def test_checksum_types_not_list(self):
|
||||
rpm_id = 123
|
||||
|
|
@ -45,6 +51,10 @@ class TestGetRpmChecksums(unittest.TestCase):
|
|||
with self.assertRaises(koji.GenericError) as ex:
|
||||
self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types)
|
||||
self.assertEqual('checksum_type must be a list', str(ex.exception))
|
||||
self.get_rpm.assert_called_once_with(rpm_id, strict=True)
|
||||
self.calculate_chsum.assert_not_called()
|
||||
self.create_rpm_checksum.assert_not_called()
|
||||
self.create_rpm_checksum_output.assert_not_called()
|
||||
|
||||
def test_checksum_types_wrong_type(self):
|
||||
rpm_id = 123
|
||||
|
|
@ -52,16 +62,21 @@ class TestGetRpmChecksums(unittest.TestCase):
|
|||
with self.assertRaises(koji.GenericError) as ex:
|
||||
self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types)
|
||||
self.assertEqual("Checksum_type type isn't supported", str(ex.exception))
|
||||
self.get_rpm.assert_called_once_with(rpm_id, strict=True)
|
||||
self.calculate_chsum.assert_not_called()
|
||||
self.create_rpm_checksum.assert_not_called()
|
||||
self.create_rpm_checksum_output.assert_not_called()
|
||||
|
||||
def test_all_checksum_exists(self):
|
||||
self.os_path.return_value = True
|
||||
rpm_id = 123
|
||||
self.get_rpm.return_value = self.rpm_info
|
||||
checksum_types = ['md5', 'sha256']
|
||||
expected_result = {'sigkey1': {'md5': 'checksum-md5', 'sha256': 'checksum-sha256'}}
|
||||
expected_result = {'sigkey-1': {'md5': 'checksum-md5', 'sha256': 'checksum-sha256'}}
|
||||
self.query_execute.side_effect = [
|
||||
[{'sigkey': 'sigkey-1'}],
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'test-sigkey'},
|
||||
{'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'test-sigkey'}]]
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey-1'},
|
||||
{'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey-1'}]]
|
||||
self.create_rpm_checksum_output.return_value = expected_result
|
||||
result = self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types)
|
||||
self.assertEqual(len(self.queries), 2)
|
||||
|
|
@ -76,8 +91,16 @@ class TestGetRpmChecksums(unittest.TestCase):
|
|||
self.assertEqual(query.clauses,
|
||||
['checksum_type IN %(checksum_type)s', 'rpm_id=%(rpm_id)i'])
|
||||
self.assertEqual(expected_result, result)
|
||||
self.get_rpm.assert_called_once_with(rpm_id, strict=True)
|
||||
self.calculate_chsum.assert_not_called()
|
||||
self.create_rpm_checksum.assert_not_called()
|
||||
self.create_rpm_checksum_output.assert_called_once_with(
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey-1'},
|
||||
{'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey-1'}],
|
||||
{'sigkey-1': {'sha256', 'md5'}}
|
||||
)
|
||||
|
||||
def test_missing_checksum_not_sigkey_without_strict(self):
|
||||
def test_missing_checksum_not_sigkey(self):
|
||||
rpm_id = 123
|
||||
self.get_rpm.return_value = self.rpm_info
|
||||
checksum_types = ['md5']
|
||||
|
|
@ -85,109 +108,31 @@ class TestGetRpmChecksums(unittest.TestCase):
|
|||
result = self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types)
|
||||
self.assertEqual({}, result)
|
||||
|
||||
self.assertEqual(len(self.queries), 2)
|
||||
self.assertEqual(len(self.queries), 1)
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['rpmsigs'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i'])
|
||||
|
||||
query = self.queries[1]
|
||||
self.assertEqual(query.tables, ['rpm_checksum'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses,
|
||||
['checksum_type IN %(checksum_type)s', 'rpm_id=%(rpm_id)i'])
|
||||
self.write_signed_rpm.assert_not_called()
|
||||
self.get_rpm.assert_called_once_with(rpm_id, strict=True)
|
||||
self.calculate_chsum.assert_not_called()
|
||||
self.create_rpm_checksum.assert_not_called()
|
||||
self.create_rpm_checksum_output.assert_not_called()
|
||||
|
||||
def test_missing_checksum_not_sigkey_with_strict(self):
|
||||
rpm_id = 123
|
||||
self.get_rpm.return_value = self.rpm_info
|
||||
checksum_types = ['md5']
|
||||
self.query_execute.side_effect = [[], []]
|
||||
with self.assertRaises(koji.GenericError) as ex:
|
||||
self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types, strict=True)
|
||||
self.assertEqual(f"Rpm {self.nvra} doesn't have cached checksums or signed copies.",
|
||||
str(ex.exception))
|
||||
|
||||
self.assertEqual(len(self.queries), 2)
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['rpmsigs'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i'])
|
||||
|
||||
query = self.queries[1]
|
||||
self.assertEqual(query.tables, ['rpm_checksum'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses,
|
||||
['checksum_type IN %(checksum_type)s', 'rpm_id=%(rpm_id)i'])
|
||||
self.write_signed_rpm.assert_not_called()
|
||||
self.create_rpm_checksum_output.assert_not_called()
|
||||
|
||||
def test_missing_valid_checksum_generated(self):
|
||||
@mock.patch('kojihub.kojihub.open')
|
||||
def test_missing_valid_all_checksum_generated(self, open):
|
||||
self.os_path.return_value = True
|
||||
rpm_id = 123
|
||||
checksum_types = ['md5']
|
||||
self.get_rpm.return_value = self.rpm_info
|
||||
expected_result = {}
|
||||
expected_result = {'sigkey-1': {'md5': 'checksum-md5'}}
|
||||
calculate_chsum_res = {'sigkey-1': {'md5': 'checksum-md5'}}
|
||||
self.query_execute.side_effect = [
|
||||
[{'sigkey': 'sigkey-1'}],
|
||||
[],
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0}]]
|
||||
result = self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types)
|
||||
self.assertEqual(expected_result, result)
|
||||
|
||||
self.assertEqual(len(self.queries), 2)
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['rpmsigs'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i'])
|
||||
|
||||
query = self.queries[1]
|
||||
self.assertEqual(query.tables, ['rpm_checksum'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses,
|
||||
['checksum_type IN %(checksum_type)s', 'rpm_id=%(rpm_id)i'])
|
||||
self.write_signed_rpm.assert_not_called()
|
||||
self.create_rpm_checksum_output.assert_not_called()
|
||||
|
||||
def test_missing_valid_checksum_generated_with_strict(self):
|
||||
rpm_id = 123
|
||||
checksum_types = ['md5']
|
||||
self.get_rpm.return_value = self.rpm_info
|
||||
self.query_execute.side_effect = [
|
||||
[{'sigkey': 'sigkey-1'}],
|
||||
[],
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0}]]
|
||||
with self.assertRaises(koji.GenericError) as ex:
|
||||
self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types, strict=True)
|
||||
self.assertEqual(f"Rpm {self.nvra} doesn't have cached checksums or signed copies.",
|
||||
str(ex.exception))
|
||||
|
||||
self.assertEqual(len(self.queries), 2)
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['rpmsigs'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i'])
|
||||
|
||||
query = self.queries[1]
|
||||
self.assertEqual(query.tables, ['rpm_checksum'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses,
|
||||
['checksum_type IN %(checksum_type)s', 'rpm_id=%(rpm_id)i'])
|
||||
|
||||
self.write_signed_rpm.assert_not_called()
|
||||
self.create_rpm_checksum_output.assert_not_called()
|
||||
|
||||
def test_missing_valid_more_checksum_generated_and_exists(self):
|
||||
rpm_id = 123
|
||||
self.get_rpm.return_value = self.rpm_info
|
||||
checksum_types = ['md5', 'sha256']
|
||||
expected_result = {'sigkey1': {'md5': 'checksum-md5', 'sha256': 'checksum-sha256'}}
|
||||
self.query_execute.side_effect = [
|
||||
[{'sigkey': 'sigkey-1'}],
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'test-sigkey'}],
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'test-sigkey'},
|
||||
{'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'test-sigkey'}]]
|
||||
self.write_signed_rpm.return_value = None
|
||||
self.calculate_chsum.return_value = calculate_chsum_res
|
||||
self.create_rpm_checksum.return_value = None
|
||||
self.create_rpm_checksum_output.return_value = expected_result
|
||||
result = self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types)
|
||||
self.assertEqual(expected_result, result)
|
||||
|
|
@ -203,23 +148,98 @@ class TestGetRpmChecksums(unittest.TestCase):
|
|||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses,
|
||||
['checksum_type IN %(checksum_type)s', 'rpm_id=%(rpm_id)i'])
|
||||
self.get_rpm.assert_called_once_with(rpm_id, strict=True)
|
||||
self.assertEqual(self.calculate_chsum.call_count, 1)
|
||||
self.create_rpm_checksum.assert_called_once_with(rpm_id, 'sigkey-1', calculate_chsum_res)
|
||||
self.create_rpm_checksum_output.assert_called_once_with(
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0}], {'sigkey-1': {'md5'}}
|
||||
)
|
||||
|
||||
def test_missing_valid_more_checksum_generated_and_exists_more_sigkeys(self):
|
||||
def test_missing_valid_checksum_generated_with_strict(self):
|
||||
self.os_path.return_value = False
|
||||
rpm_id = 123
|
||||
checksum_types = ['md5']
|
||||
self.get_rpm.return_value = self.rpm_info
|
||||
self.query_execute.return_value = [{'sigkey': 'sigkey-1'}]
|
||||
with self.assertRaises(koji.GenericError) as ex:
|
||||
self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types, strict=True)
|
||||
self.assertEqual(f"Rpm {self.nvra} doesn't have cached signed copies.",
|
||||
str(ex.exception))
|
||||
|
||||
self.assertEqual(len(self.queries), 1)
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['rpmsigs'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i'])
|
||||
|
||||
self.get_rpm.assert_called_once_with(rpm_id, strict=True)
|
||||
self.calculate_chsum.assert_not_called()
|
||||
self.create_rpm_checksum.assert_not_called()
|
||||
self.create_rpm_checksum_output.assert_not_called()
|
||||
|
||||
@mock.patch('kojihub.kojihub.open')
|
||||
def test_missing_valid_more_checksum_generated_and_exists(self, open):
|
||||
self.os_path.return_value = True
|
||||
rpm_id = 123
|
||||
self.get_rpm.return_value = self.rpm_info
|
||||
checksum_types = ['md5', 'sha256']
|
||||
expected_result = {'sigkey-1': {'md5': 'checksum-md5', 'sha256': 'checksum-sha256'}}
|
||||
calculate_chsum_res = {'sigkey-1': {'sha256': 'checksum-sha256'}}
|
||||
self.query_execute.side_effect = [
|
||||
[{'sigkey': 'sigkey-1'}],
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey-1'}],
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey-1'},
|
||||
{'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey-1'}]]
|
||||
self.calculate_chsum.return_value = calculate_chsum_res
|
||||
self.create_rpm_checksum_output.return_value = expected_result
|
||||
self.create_rpm_checksum.return_value = None
|
||||
result = self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types)
|
||||
self.assertEqual(expected_result, result)
|
||||
|
||||
self.assertEqual(len(self.queries), 2)
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['rpmsigs'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i'])
|
||||
|
||||
query = self.queries[1]
|
||||
self.assertEqual(query.tables, ['rpm_checksum'])
|
||||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses,
|
||||
['checksum_type IN %(checksum_type)s', 'rpm_id=%(rpm_id)i'])
|
||||
|
||||
self.get_rpm.assert_called_once_with(rpm_id, strict=True)
|
||||
self.assertEqual(self.calculate_chsum.call_count, 1)
|
||||
self.create_rpm_checksum.assert_called_once_with(rpm_id, 'sigkey-1', calculate_chsum_res)
|
||||
self.create_rpm_checksum_output.assert_called_once_with(
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey-1'},
|
||||
{'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey-1'}],
|
||||
{'sigkey-1': {'md5', 'sha256'}}
|
||||
)
|
||||
|
||||
@mock.patch('kojihub.kojihub.open')
|
||||
def test_missing_valid_more_checksum_generated_and_exists_more_sigkeys(self, open):
|
||||
self.os_path.return_value = True
|
||||
rpm_id = 123
|
||||
self.get_rpm.return_value = self.rpm_info
|
||||
checksum_types = ['md5', 'sha256']
|
||||
expected_result = {'sigkey1': {'md5': 'checksum-md5', 'sha256': 'checksum-sha256'},
|
||||
'sigkey2': {'md5': 'checksum-md5', 'sha256': 'checksum-sha256'}}
|
||||
calculate_chsum_res = [
|
||||
{'sigkey1': {'md5': 'checksum-md5', 'sha256': 'checksum-sha256'}},
|
||||
{'sigkey2': {'md5': 'checksum-md5', 'sha256': 'checksum-sha256'}},
|
||||
]
|
||||
self.query_execute.side_effect = [
|
||||
[{'sigkey': 'sigkey-1'}, {'sigkey': 'sigkey-2'}],
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey-1'},
|
||||
{'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey-2'}],
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey-1'},
|
||||
{'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey-1'},
|
||||
{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey-2'},
|
||||
{'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey-2'}]]
|
||||
self.write_signed_rpm.return_value = None
|
||||
[{'sigkey': 'sigkey1'}, {'sigkey': 'sigkey2'}],
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey1'},
|
||||
{'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey2'}],
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey1'},
|
||||
{'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey1'},
|
||||
{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey2'},
|
||||
{'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey2'}]]
|
||||
self.calculate_chsum.side_effect = calculate_chsum_res
|
||||
self.create_rpm_checksum_output.return_value = expected_result
|
||||
self.create_rpm_checksum.side_effect = [None, None]
|
||||
result = self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types)
|
||||
self.assertEqual(expected_result, result)
|
||||
|
||||
|
|
@ -234,3 +254,16 @@ class TestGetRpmChecksums(unittest.TestCase):
|
|||
self.assertEqual(query.joins, None)
|
||||
self.assertEqual(query.clauses,
|
||||
['checksum_type IN %(checksum_type)s', 'rpm_id=%(rpm_id)i'])
|
||||
|
||||
self.get_rpm.assert_called_once_with(rpm_id, strict=True)
|
||||
self.assertEqual(self.calculate_chsum.call_count, 2)
|
||||
self.create_rpm_checksum.assert_has_calls(
|
||||
[mock.call(rpm_id, 'sigkey1', calculate_chsum_res[0]),
|
||||
mock.call(rpm_id, 'sigkey2', calculate_chsum_res[1])])
|
||||
self.create_rpm_checksum_output.assert_called_once_with(
|
||||
[{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey1'},
|
||||
{'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey1'},
|
||||
{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey2'},
|
||||
{'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey2'}],
|
||||
{'sigkey1': {'md5', 'sha256'}, 'sigkey2': {'md5', 'sha256'}}
|
||||
)
|
||||
|
|
|
|||
|
|
@ -104,15 +104,15 @@ class TestResetBuild(unittest.TestCase):
|
|||
self.assertEqual(delete.values, {'rpm_id': 123})
|
||||
|
||||
delete = self.deletes[3]
|
||||
self.assertEqual(delete.table, 'rpminfo')
|
||||
self.assertEqual(delete.clauses, ['build_id=%(id)i'])
|
||||
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
||||
|
||||
delete = self.deletes[4]
|
||||
self.assertEqual(delete.table, 'rpm_checksum')
|
||||
self.assertEqual(delete.clauses, ['rpm_id=%(rpm_id)i'])
|
||||
self.assertEqual(delete.values, {'rpm_id': 123})
|
||||
|
||||
delete = self.deletes[4]
|
||||
self.assertEqual(delete.table, 'rpminfo')
|
||||
self.assertEqual(delete.clauses, ['build_id=%(id)i'])
|
||||
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
||||
|
||||
delete = self.deletes[5]
|
||||
self.assertEqual(delete.table, 'maven_archives')
|
||||
self.assertEqual(delete.clauses, ['archive_id=%(archive_id)i'])
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue