From 9bf31de75fa5ec6ce04e0d89ed005eee47bc1bb6 Mon Sep 17 00:00:00 2001 From: Jana Cupova Date: Tue, 29 Nov 2022 09:21:41 +0100 Subject: [PATCH] Add checksum API Fixes: https://pagure.io/koji/issue/3627 --- docs/schema-upgrade-1.31-1.32.sql | 9 + docs/schema.sql | 9 + docs/source/hub_conf.rst | 12 ++ kojihub/app/hub.conf | 3 + kojihub/kojihub.py | 132 +++++++++++++- kojihub/kojixmlrpc.py | 5 +- tests/test_hub/test_create_rpm_checksum.py | 132 ++++++++++++++ .../test_create_rpm_checksums_output.py | 34 ++++ tests/test_hub/test_get_rpm_checksums.py | 167 ++++++++++++++++++ 9 files changed, 501 insertions(+), 2 deletions(-) create mode 100644 tests/test_hub/test_create_rpm_checksum.py create mode 100644 tests/test_hub/test_create_rpm_checksums_output.py create mode 100644 tests/test_hub/test_get_rpm_checksums.py diff --git a/docs/schema-upgrade-1.31-1.32.sql b/docs/schema-upgrade-1.31-1.32.sql index 2c25886f..78f0f5d9 100644 --- a/docs/schema-upgrade-1.31-1.32.sql +++ b/docs/schema-upgrade-1.31-1.32.sql @@ -6,4 +6,13 @@ BEGIN; -- fix duplicate extension in archivetypes UPDATE archivetypes SET extensions = 'vhdx.gz vhdx.xz' WHERE name = 'vhdx-compressed'; + -- track checksum of rpms + CREATE TABLE rpm_checksum ( + rpm_id INTEGER NOT NULL REFERENCES rpminfo(id), + sigkey TEXT NOT NULL, + checksum TEXT NOT NULL UNIQUE, + checksum_type SMALLINT NOT NULL, + UNIQUE(rpm_id, sigkey, checksum_type) + ) WITHOUT OIDS; + CREATE INDEX rpm_checksum_rpm_id ON rpm_checksum(rpm_id); COMMIT; diff --git a/docs/schema.sql b/docs/schema.sql index 007e786e..9fdf6a92 100644 --- a/docs/schema.sql +++ b/docs/schema.sql @@ -966,5 +966,14 @@ CREATE TABLE proton_queue ( body JSON NOT NULL ) WITHOUT OIDS; +-- track checksum of rpms +CREATE TABLE rpm_checksum ( + rpm_id INTEGER NOT NULL REFERENCES rpminfo(id), + sigkey TEXT NOT NULL, + checksum TEXT NOT NULL UNIQUE, + checksum_type SMALLINT NOT NULL, + UNIQUE(rpm_id, sigkey, checksum_type) +) WITHOUT OIDS; +CREATE INDEX rpm_checksum_rpm_id ON rpm_checksum(rpm_id); COMMIT WORK; diff --git a/docs/source/hub_conf.rst b/docs/source/hub_conf.rst index 0870a987..7738fbc8 100644 --- a/docs/source/hub_conf.rst +++ b/docs/source/hub_conf.rst @@ -542,3 +542,15 @@ Host names are listed in both groups because hosts always have an associated use Set regex for verify a user name and kerberos. User name and kerberos have in default set up allowed '@' and '/' chars on top of basic name regex for internal names. When regex string is empty, verifying is disabled. + +Default checksums types +^^^^^^^^^^^^^^^^^^^^^^^ +We have default checksums types for create rpm checksums. + +.. glossary:: + RPMDefaultChecksums + Type: string + + Default: ``md5 sha256`` + + Set RPM default checksums type. Default value is set upt to ``md5 sha256``. diff --git a/kojihub/app/hub.conf b/kojihub/app/hub.conf index deca03ec..9ea7aa61 100644 --- a/kojihub/app/hub.conf +++ b/kojihub/app/hub.conf @@ -141,3 +141,6 @@ NotifyOnSuccess = True # MaxNameLengthInternal = 256 # RegexNameInternal = ^[A-Za-z0-9/_.+-]+$ # RegexUserName = ^[A-Za-z0-9/_.@-]+$ + +## Determines default checksums +# RPMDefaultChecksums = md5 sha256 diff --git a/kojihub/kojihub.py b/kojihub/kojihub.py index ade89043..adf7cd6a 100644 --- a/kojihub/kojihub.py +++ b/kojihub/kojihub.py @@ -7968,7 +7968,7 @@ def query_rpm_sigs(rpm_id=None, sigkey=None, queryOpts=None): return query.execute() -def write_signed_rpm(an_rpm, sigkey, force=False): +def write_signed_rpm(an_rpm, sigkey, force=False, checksum_types=None): """Write a signed copy of the rpm""" sigkey = sigkey.lower() rinfo = get_rpm(an_rpm, strict=True) @@ -8003,6 +8003,9 @@ def write_signed_rpm(an_rpm, sigkey, force=False): sighdr = fo.read() koji.ensuredir(os.path.dirname(signedpath)) koji.splice_rpm_sighdr(sighdr, rpm_path, signedpath) + if not checksum_types: + checksum_types = context.opts.get('RPMDefaultChecksums').split() + create_rpm_checksum(rinfo['id'], sigkey, checksum_types=checksum_types) def query_history(tables=None, **kwargs): @@ -8638,6 +8641,9 @@ def reset_build(build): delete = DeleteProcessor(table='archive_rpm_components', clauses=['rpm_id=%(rpm_id)i'], values={'rpm_id': rpm_id}) delete.execute() + delete = DeleteProcessor(table='rpm_checksum', clauses=['rpm_id=%(rpm_id)i'], + values={'rpm_id': rpm_id}) + delete.execute() delete = DeleteProcessor(table='rpminfo', clauses=['build_id=%(id)i'], values={'id': binfo['build_id']}) delete.execute() @@ -13649,6 +13655,56 @@ class RootExports(object): values=locals(), opts=queryOpts) return query.iterate() + def getRPMChecksums(self, rpm_id, checksum_types=None, cacheonly=False): + """Returns RPM checksums for specific rpm. + + :param int rpm_id: RPM id + :param list checksum_type: List of checksum types. Default sha256 checksum type + :param bool cacheonly: when False, checksum is created for missing checksum type + when True, checksum is returned as None when checsum is missing + for specific checksum type + :returns: A dict of specific checksum types and checksums + """ + if not isinstance(rpm_id, int): + raise koji.GenericError('rpm_id must be an integer') + if not checksum_types: + checksum_types = context.opts.get('RPMDefaultChecksums').split() + if not isinstance(checksum_types, list): + raise koji.GenericError('checksum_type must be a list') + + for ch_type in checksum_types: + if ch_type not in koji.CHECKSUM_TYPES: + raise koji.GenericError(f"Checksum_type {ch_type} isn't supported") + + query = QueryProcessor(tables=['rpmsigs'], columns=['sigkey'], + clauses=['rpm_id=%(rpm_id)i'], values={'rpm_id': rpm_id}) + sigkeys = [r['sigkey'] for r in query.execute()] + if not sigkeys: + raise koji.GenericError(f'No cached signature for rpm ID {rpm_id}.') + list_checksums_sigkeys = {s: set(checksum_types) for s in sigkeys} + + checksum_type_int = [koji.CHECKSUM_TYPES[chsum] for chsum in checksum_types] + query_checksum = QueryProcessor(tables=['rpm_checksum'], + columns=['checksum', 'checksum_type', 'sigkey'], + clauses={'rpm_id=%(rpm_id)i', + 'checksum_type IN %(checksum_type)s'}, + values={'rpm_id': rpm_id, + 'checksum_type': checksum_type_int}) + query_result = query_checksum.execute() + if len(query_result) == (len(checksum_type_int) * len(sigkeys)) or cacheonly: + return create_rpm_checksums_output(query_result, list_checksums_sigkeys) + else: + missing_chsum_sigkeys = list_checksums_sigkeys.copy() + for r in query_result: + if r['checksum_type'] in checksum_type_int and r['sigkey'] in sigkeys: + missing_chsum_sigkeys[r['sigkey']].remove( + koji.CHECKSUM_TYPES[r['checksum_type']]) + + for sigkey, chsums in missing_chsum_sigkeys.items(): + write_signed_rpm(rpm_id, sigkey, force=True, checksum_types=list(chsums)) + query_result = query_checksum.execute() + return create_rpm_checksums_output(query_result, list_checksums_sigkeys) + class BuildRoot(object): @@ -15434,3 +15490,77 @@ def verify_name_user(name=None, krb=None): def verify_host_name(name): verify_name_internal(name) verify_name_user(name) + + +def create_rpm_checksums_output(query_result, list_chsum_sigkeys): + """Creates RPM checksum human-friendly dict. + + :param dict query_result: Result of QueryProcessor + :param list checksum_type: List of checksum types + :return result: Human-friendly dict of checksums + """ + result = {} + for sigkey, chsums in list_chsum_sigkeys.items(): + result.setdefault(sigkey, dict(zip(chsums, [None] * len(chsums)))) + for r in query_result: + result[r['sigkey']][koji.CHECKSUM_TYPES[r['checksum_type']]] = r['checksum'] + return result + + +def create_rpm_checksum(rpm_id, sigkey, checksum_types=None): + """Creates RPM checksum. + + :param int rpm_id: RPM id + :param string sigkey: Sigkey for specific RPM + :param list checksum_type: List of checksum types. + """ + if checksum_types is None: + checksum_types = context.opts.get('RPMDefaultChecksums').split() + else: + if not isinstance(checksum_types, (list, tuple)): + raise koji.ParameterError(f'Invalid type of checksum_types: {type(checksum_types)}') + for ch_type in checksum_types: + if ch_type not in koji.CHECKSUM_TYPES: + raise koji.GenericError(f"Checksum_type {ch_type} isn't supported") + rinfo = get_rpm(rpm_id) + nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo + if rinfo['external_repo_id']: + raise koji.GenericError(f"Not an internal rpm: {nvra} " + f"(from {rinfo['external_repo_name']})") + query = QueryProcessor(tables=['rpmsigs'], clauses=['rpm_id=%(rpm_id)i', 'sigkey=%(sigkey)s'], + values={'rpm_id': rpm_id, 'sigkey': sigkey}, opts={'countOnly': True}) + sighash = query.singleValue(strict=False) + if not sighash: + raise koji.GenericError(f"There is no rpm {nvra} signed with {sigkey}") + checksum_type_int = [koji.CHECKSUM_TYPES[chsum] for chsum in checksum_types] + query = QueryProcessor(tables=['rpm_checksum'], columns=['checksum_type'], + clauses=["checksum_type IN %(checksum_types)s", 'sigkey=%(sigkey)s'], + values={'checksum_types': checksum_type_int, 'sigkey': sigkey}) + rows = query.execute() + if len(rows) == len(checksum_type_int): + return None + else: + for r in rows: + if r['checksum_type'] in checksum_type_int: + checksum_types.remove(koji.CHECKSUM_TYPES[r['checksum_type']]) + buildinfo = get_build(rinfo['build_id']) + rpm_path = joinpath(koji.pathinfo.build(buildinfo), koji.pathinfo.signed(rinfo, sigkey)) + chsum_list = {chsum: getattr(hashlib, chsum)() for chsum in checksum_types} + + try: + with open(rpm_path, 'rb') as f: + while 1: + chunk = f.read(1024**2) + if not chunk: + break + for func, chsum in chsum_list.items(): + chsum.update(chunk) + except IOError: + raise koji.GenericError(f"RPM path {rpm_path} cannot be open.") + + if chsum_list: + insert = BulkInsertProcessor(table='rpm_checksum') + for func, chsum in chsum_list.items(): + insert.add_record(rpm_id=rpm_id, sigkey=sigkey, checksum=chsum.hexdigest(), + checksum_type=koji.CHECKSUM_TYPES[func]) + insert.execute() diff --git a/kojihub/kojixmlrpc.py b/kojihub/kojixmlrpc.py index f598d370..30ca70ca 100644 --- a/kojihub/kojixmlrpc.py +++ b/kojihub/kojixmlrpc.py @@ -497,7 +497,9 @@ def load_config(environ): ['MaxNameLengthInternal', 'integer', 256], ['RegexNameInternal', 'string', r'^[A-Za-z0-9/_.+-]+$'], - ['RegexUserName', 'string', r'^[A-Za-z0-9/_.@-]+$'] + ['RegexUserName', 'string', r'^[A-Za-z0-9/_.@-]+$'], + + ['RPMDefaultChecksums', 'string', 'md5 sha256'] ] opts = {} for name, dtype, default in cfgmap: @@ -532,6 +534,7 @@ def load_config(environ): opts['RegexNameInternal.compiled'] = re.compile(opts['RegexNameInternal']) if opts['RegexUserName'] != '': opts['RegexUserName.compiled'] = re.compile(opts['RegexUserName']) + return opts diff --git a/tests/test_hub/test_create_rpm_checksum.py b/tests/test_hub/test_create_rpm_checksum.py new file mode 100644 index 00000000..b146b12a --- /dev/null +++ b/tests/test_hub/test_create_rpm_checksum.py @@ -0,0 +1,132 @@ +import unittest +import mock +import six + +import koji +import kojihub + +QP = kojihub.QueryProcessor + + +def mock_open(): + """Return the right patch decorator for open""" + if six.PY2: + return mock.patch('__builtin__.open') + else: + return mock.patch('builtins.open') + + +class TestCreateRPMChecksum(unittest.TestCase): + def setUp(self): + self.maxDiff = None + self.QueryProcessor = mock.patch('kojihub.kojihub.QueryProcessor', + side_effect=self.getQuery).start() + self.queries = [] + self.get_rpm = mock.patch('kojihub.kojihub.get_rpm').start() + self.get_build = mock.patch('kojihub.kojihub.get_build').start() + self. rpm_info = {'id': 123, 'name': 'test-rpm', 'external_repo_id': 0, 'version': '11', + 'release': '222', 'arch': 'noarch', 'build_id': 222} + self.rpm_id = 123 + self.sigkey = 'test-sigkey' + self.query_execute = mock.MagicMock() + self.single_value = mock.MagicMock() + self.path_signed = mock.patch('koji.pathinfo.signed').start() + self.path_build = mock.patch('koji.pathinfo.build').start() + self.context = mock.patch('kojihub.kojihub.context').start() + self.context.session.assertPerm = mock.MagicMock() + self.context.opts = {'RPMDefaultChecksums': 'md5 sha256'} + + def tearDown(self): + mock.patch.stopall() + + def getQuery(self, *args, **kwargs): + query = QP(*args, **kwargs) + query.execute = self.query_execute + query.singleValue = self.single_value + self.queries.append(query) + return query + + def test_checksum_type_not_string(self): + checksum_types = 'type-1 type-2' + with self.assertRaises(koji.GenericError) as ex: + kojihub.create_rpm_checksum(self.rpm_id, self.sigkey, checksum_types=checksum_types) + self.assertEqual(f'Invalid type of checksum_types: {type(checksum_types)}', + str(ex.exception)) + + def test_checksum_type_bad_type(self): + checksum_types = ['md5', 'type-1'] + with self.assertRaises(koji.GenericError) as ex: + kojihub.create_rpm_checksum(self.rpm_id, self.sigkey, checksum_types=checksum_types) + self.assertEqual("Checksum_type type-1 isn't supported", str(ex.exception)) + + def test_external_rpm(self): + ext_rpm_info = {'id': 123, 'name': 'test-rpm', 'external_repo_id': 125, + 'external_repo_name': 'test-ext-rpm', 'version': '11', + 'release': '222', 'arch': 'noarch'} + self.get_rpm.return_value = ext_rpm_info + nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % ext_rpm_info + with self.assertRaises(koji.GenericError) as ex: + kojihub.create_rpm_checksum(self.rpm_id, self.sigkey) + self.assertEqual(f'Not an internal rpm: {nvra} (from test-ext-rpm)', str(ex.exception)) + + def test_not_sigkey_related_to_rpm(self): + self.get_rpm.return_value = self.rpm_info + checksum_types = ['md5', 'sha256'] + nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % self.rpm_info + expected_err = f'There is no rpm {nvra} signed with {self.sigkey}' + self.single_value.return_value = None + with self.assertRaises(koji.GenericError) as ex: + kojihub.create_rpm_checksum(self.rpm_id, self.sigkey, checksum_types=checksum_types) + self.assertEqual(expected_err % self.rpm_info, str(ex.exception)) + + self.assertEqual(len(self.queries), 1) + query = self.queries[0] + self.assertEqual(query.tables, ['rpmsigs']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i', 'sigkey=%(sigkey)s']) + + def test_checksum_exists(self): + self.get_rpm.return_value = self.rpm_info + self.single_value.return_value = 'test-sighash' + self.query_execute.return_value = [{'checksum_type': 'md5'}, {'checksum_type': 'sha256'}] + result = kojihub.create_rpm_checksum(self.rpm_id, self.sigkey) + self.assertIsNone(result) + + self.assertEqual(len(self.queries), 2) + query = self.queries[0] + self.assertEqual(query.tables, ['rpmsigs']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i', 'sigkey=%(sigkey)s']) + + query = self.queries[1] + self.assertEqual(query.tables, ['rpm_checksum']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, + ["checksum_type IN %(checksum_types)s", "sigkey=%(sigkey)s"]) + + @mock_open() + def test_cannot_open_file(self, m_open): + self.get_rpm.return_value = self.rpm_info + self.get_build.return_value = {'build_id': 222} + self.single_value.return_value = 'test-sighash' + self.query_execute.return_value = [{'checksum_type': 'md5'}] + self.path_build.return_value = 'fakebuildpath' + self.path_signed.return_value = 'fakesignedpath' + m_open.side_effect = IOError() + + with self.assertRaises(koji.GenericError) as ex: + kojihub.create_rpm_checksum(self.rpm_id, self.sigkey) + self.assertEqual("RPM path fakebuildpath/fakesignedpath cannot be open.", + str(ex.exception)) + + self.assertEqual(len(self.queries), 2) + query = self.queries[0] + self.assertEqual(query.tables, ['rpmsigs']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i', 'sigkey=%(sigkey)s']) + + query = self.queries[1] + self.assertEqual(query.tables, ['rpm_checksum']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, + ["checksum_type IN %(checksum_types)s", "sigkey=%(sigkey)s"]) diff --git a/tests/test_hub/test_create_rpm_checksums_output.py b/tests/test_hub/test_create_rpm_checksums_output.py new file mode 100644 index 00000000..17bc5573 --- /dev/null +++ b/tests/test_hub/test_create_rpm_checksums_output.py @@ -0,0 +1,34 @@ +import unittest + +import kojihub + + +class TestCreateRPMChecksumsOutput(unittest.TestCase): + def setUp(self): + self.maxDiff = None + self.exports = kojihub.RootExports() + + def test_cacheonly_all_exists(self): + expected_result = {'sigkey1': {'md5': 'checksum-md5', 'sha256': 'checksum-sha256'}, + 'sigkey2': {'md5': 'checksum-md5', 'sha256': 'checksum-sha256'}} + query_result = [{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey1'}, + {'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey1'}, + {'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey2'}, + {'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey2'}] + checksum_types = {'sigkey1': {'md5', 'sha256'}, + 'sigkey2': {'md5', 'sha256'} + } + + result = kojihub.create_rpm_checksums_output(query_result, checksum_types) + self.assertEqual(expected_result, result) + + def test_cacheonly_some_exists(self): + expected_result = {'sigkey1': {'md5': 'checksum-md5', 'sha256': None}, + 'sigkey2': {'md5': None, 'sha256': 'checksum-sha256'}} + query_result = [{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey1'}, + {'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey2'}] + checksum_types = {'sigkey1': {'md5', 'sha256'}, + 'sigkey2': {'md5', 'sha256'} + } + result = kojihub.create_rpm_checksums_output(query_result, checksum_types) + self.assertEqual(expected_result, result) diff --git a/tests/test_hub/test_get_rpm_checksums.py b/tests/test_hub/test_get_rpm_checksums.py new file mode 100644 index 00000000..60f30f25 --- /dev/null +++ b/tests/test_hub/test_get_rpm_checksums.py @@ -0,0 +1,167 @@ +import unittest +import mock + +import koji +import kojihub + +QP = kojihub.QueryProcessor + + +class TestGetRpmChecksums(unittest.TestCase): + def setUp(self): + self.maxDiff = None + self.exports = kojihub.RootExports() + self.create_rpm_checksum_output = mock.patch( + 'kojihub.kojihub.create_rpm_checksums_output').start() + self.write_signed_rpm = mock.patch('kojihub.kojihub.write_signed_rpm').start() + self.QueryProcessor = mock.patch('kojihub.kojihub.QueryProcessor', + side_effect=self.getQuery).start() + self.queries = [] + self.query_execute = mock.MagicMock() + + def tearDown(self): + mock.patch.stopall() + + def getQuery(self, *args, **kwargs): + query = QP(*args, **kwargs) + query.execute = self.query_execute + self.queries.append(query) + return query + + def test_rpm_id_not_str(self): + rpm_id = ['123'] + with self.assertRaises(koji.GenericError) as ex: + self.exports.getRPMChecksums(rpm_id) + self.assertEqual('rpm_id must be an integer', str(ex.exception)) + + def test_checksum_types_not_list(self): + rpm_id = 123 + checksum_types = 'type' + with self.assertRaises(koji.GenericError) as ex: + self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types) + self.assertEqual('checksum_type must be a list', str(ex.exception)) + + def test_checksum_types_wrong_type(self): + rpm_id = 123 + checksum_types = ['md5', 'type'] + with self.assertRaises(koji.GenericError) as ex: + self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types) + self.assertEqual("Checksum_type type isn't supported", str(ex.exception)) + + def test_all_checksum_exists(self): + rpm_id = 123 + checksum_types = ['md5', 'sha256'] + expected_result = {'sigkey1': {'md5': 'checksum-md5', 'sha256': 'checksum-sha256'}} + self.query_execute.side_effect = [ + [{'sigkey': 'sigkey-1'}], + [{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'test-sigkey'}, + {'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'test-sigkey'}]] + self.create_rpm_checksum_output.return_value = expected_result + result = self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types) + self.assertEqual(len(self.queries), 2) + query = self.queries[0] + self.assertEqual(query.tables, ['rpmsigs']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i']) + + query = self.queries[1] + self.assertEqual(query.tables, ['rpm_checksum']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, + ['checksum_type IN %(checksum_type)s', 'rpm_id=%(rpm_id)i']) + self.assertEqual(expected_result, result) + + def test_missing_checksum_not_sigkey(self): + rpm_id = 123 + checksum_types = ['md5'] + self.query_execute.side_effect = [[], []] + with self.assertRaises(koji.GenericError) as ex: + self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types) + self.assertEqual(f'No cached signature for rpm ID {rpm_id}.', str(ex.exception)) + + self.assertEqual(len(self.queries), 1) + query = self.queries[0] + self.assertEqual(query.tables, ['rpmsigs']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i']) + + def test_missing_valid_checksum_generated(self): + rpm_id = 123 + checksum_types = ['md5'] + expected_result = {'sigkey1': {'md5': 'checksum-md5'}} + self.query_execute.side_effect = [ + [{'sigkey': 'sigkey-1'}], + [], + [{'checksum': 'checksum-md5', 'checksum_type': 0}]] + self.write_signed_rpm.return_value = None + self.create_rpm_checksum_output.return_value = expected_result + result = self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types) + self.assertEqual(expected_result, result) + + self.assertEqual(len(self.queries), 2) + query = self.queries[0] + self.assertEqual(query.tables, ['rpmsigs']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i']) + + query = self.queries[1] + self.assertEqual(query.tables, ['rpm_checksum']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, + ['checksum_type IN %(checksum_type)s', 'rpm_id=%(rpm_id)i']) + + def test_missing_valid_more_checksum_generated_and_exists(self): + rpm_id = 123 + checksum_types = ['md5', 'sha256'] + expected_result = {'sigkey1': {'md5': 'checksum-md5', 'sha256': 'checksum-sha256'}} + self.query_execute.side_effect = [ + [{'sigkey': 'sigkey-1'}], + [{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'test-sigkey'}], + [{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'test-sigkey'}, + {'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'test-sigkey'}]] + self.write_signed_rpm.return_value = None + self.create_rpm_checksum_output.return_value = expected_result + result = self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types) + self.assertEqual(expected_result, result) + + self.assertEqual(len(self.queries), 2) + query = self.queries[0] + self.assertEqual(query.tables, ['rpmsigs']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i']) + + query = self.queries[1] + self.assertEqual(query.tables, ['rpm_checksum']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, + ['checksum_type IN %(checksum_type)s', 'rpm_id=%(rpm_id)i']) + + def test_missing_valid_more_checksum_generated_and_exists_more_sigkeys(self): + rpm_id = 123 + checksum_types = ['md5', 'sha256'] + expected_result = {'sigkey1': {'md5': 'checksum-md5', 'sha256': 'checksum-sha256'}, + 'sigkey2': {'md5': 'checksum-md5', 'sha256': 'checksum-sha256'}} + self.query_execute.side_effect = [ + [{'sigkey': 'sigkey-1'}, {'sigkey': 'sigkey-2'}], + [{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey-1'}, + {'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey-2'}], + [{'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey-1'}, + {'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey-1'}, + {'checksum': 'checksum-md5', 'checksum_type': 0, 'sigkey': 'sigkey-2'}, + {'checksum': 'checksum-sha256', 'checksum_type': 2, 'sigkey': 'sigkey-2'}]] + self.write_signed_rpm.return_value = None + self.create_rpm_checksum_output.return_value = expected_result + result = self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types) + self.assertEqual(expected_result, result) + + self.assertEqual(len(self.queries), 2) + query = self.queries[0] + self.assertEqual(query.tables, ['rpmsigs']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i']) + + query = self.queries[1] + self.assertEqual(query.tables, ['rpm_checksum']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, + ['checksum_type IN %(checksum_type)s', 'rpm_id=%(rpm_id)i'])