diff --git a/kojihub/kojihub.py b/kojihub/kojihub.py index e1f3fa13..1c75a09f 100644 --- a/kojihub/kojihub.py +++ b/kojihub/kojihub.py @@ -30,6 +30,7 @@ import calendar import copy import datetime import fcntl +import filecmp import fnmatch import functools import hashlib @@ -7976,6 +7977,32 @@ def delete_rpm_sig(rpminfo, sigkey=None, all_sigs=False): binfo = get_build(rinfo['build_id'], strict=True) builddir = koji.pathinfo.build(binfo) + # Check header files + hdr_renames = [] + hdr_deletes = [] + for rpmsig in rpm_query_result: + hdr_path = joinpath(builddir, koji.pathinfo.sighdr(rinfo, rpmsig['sigkey'])) + backup_path = hdr_path + f".{rpmsig['sighash']}.save" + if not os.path.exists(hdr_path): + logger.error(f'Missing signature header file: {hdr_path}') + # this doesn't prevent us from deleting the signature + # it just means we have nothing to back up + continue + if not os.path.isfile(hdr_path): + # this should not happen and requires human intervention + raise koji.GenericError(f"Not a regular file: {hdr_path}") + if os.path.exists(backup_path): + # Likely residue of previous failed deletion + if filecmp.cmp(hdr_path, backup_path, shallow=False): + # same file contents, so we're already backed up + logger.warning(f"Signature header already backed up: {backup_path}") + hdr_deletes.append([rpmsig, hdr_path]) + else: + # this shouldn't happen + raise koji.GenericError(f"Stray header backup file: {backup_path}") + else: + hdr_renames.append([rpmsig, hdr_path, backup_path]) + # Delete signed copies # We do these first since they are the lowest risk for rpmsig in rpm_query_result: @@ -7986,29 +8013,28 @@ def delete_rpm_sig(rpminfo, sigkey=None, all_sigs=False): try: os.remove(signed_path) logger.warning(f"Deleted signed copy {signed_path}") - except FileNotFoundError: - # possibly a race? - # not fatal -- log and continue - logger.error(f"Signed copy disappeared during call: {signed_path}") except Exception: logger.error(f"Failed to delete {signed_path}", exc_info=True) raise koji.GenericError(f"Failed to delete {signed_path}") # Backup header files - for rpmsig in rpm_query_result: - hdr_path = joinpath(builddir, koji.pathinfo.sighdr(rinfo, rpmsig['sigkey'])) - backup_path = hdr_path + f".{rpmsig['sighash']}.save" - if os.path.exists(backup_path): - # Likely residue of previous failed deletion - raise koji.GenericError(f"Stray header backup file: {backup_path}") - if not os.path.exists(hdr_path): - logger.error(f'Missing signature header file: {hdr_path}') - # not fatal - elif not os.path.isfile(hdr_path): - raise koji.GenericError(f"Not a regular file: {hdr_path}") - else: + for rpmsig, hdr_path, backup_path in hdr_renames: + # sanity checked above + try: os.rename(hdr_path, backup_path) - logger.debug(f"Signature header saved to {backup_path}") + logger.warning(f"Signature header saved to {backup_path}") + except Exception: + logger.error(f"Failed to rename {hdr_path} to {backup_path}", exc_info=True) + + # Delete already backed-up headers + for rpmsig, hdr_path in hdr_deletes: + # verified backup above + try: + os.remove(hdr_path) + logger.warning(f"Deleted signature header {hdr_path}") + except Exception: + logger.error(f"Failed to delete {hdr_path}", exc_info=True) + raise koji.GenericError(f"Failed to delete {hdr_path}") # Note: we do not delete any empty parent dirs as the primary use case for deleting these # signatures is to allow the import of new, overlapping ones diff --git a/tests/test_hub/test_delete_rpm_sig.py b/tests/test_hub/test_delete_rpm_sig.py index 88796acd..907c0337 100644 --- a/tests/test_hub/test_delete_rpm_sig.py +++ b/tests/test_hub/test_delete_rpm_sig.py @@ -173,6 +173,35 @@ class TestDeleteRPMSig(unittest.TestCase): self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey='testkey') self.get_build.assert_called_once_with(self.rinfo['build_id'], strict=True) + def test_header_not_a_file(self): + rpminfo = self.rinfo['id'] + self.get_rpm.return_value = self.rinfo + self.get_build.return_value = self.buildinfo + self.get_user.return_value = self.userinfo + self.query_rpm_sigs.return_value = self.queryrpmsigs + + # we should error, without making any changes, if a header is not a regular file + builddir = self.pathinfo.build(self.buildinfo) + bad_sigkey = '2f86d6a1' + bad_hdr= self.sighdr[bad_sigkey] + os.remove(bad_hdr) + os.mkdir(bad_hdr) + with self.assertRaises(koji.GenericError) as ex: + r = kojihub.delete_rpm_sig(rpminfo, sigkey='testkey') + expected_msg = "Not a regular file: %s" % bad_hdr + self.assertEqual(ex.exception.args[0], expected_msg) + + # the files should still be there + for sigkey in self.signed: + if not os.path.exists(self.signed[sigkey]): + raise Exception('signed copy was deleted') + for sigkey in self.sighdr: + if not os.path.exists(self.sighdr[sigkey]): + raise Exception('header was deleted') + if not os.path.isdir(bad_hdr): + # the function should not have touched the invalid path + raise Exception('bad header file was removed') + def test_stray_backup(self): rpminfo = self.rinfo['id'] self.get_rpm.return_value = self.rinfo @@ -180,16 +209,54 @@ class TestDeleteRPMSig(unittest.TestCase): self.get_user.return_value = self.userinfo self.query_rpm_sigs.return_value = self.queryrpmsigs - # a missing signed copy or header should not error siginfo = self.queryrpmsigs[0] sigkey = siginfo['sigkey'] backup = "%s.%s.save" % (self.sighdr[sigkey], siginfo['sighash']) with open(backup, 'wt') as fo: fo.write('STRAY FILE\n') + # different contents with self.assertRaises(koji.GenericError) as ex: r = kojihub.delete_rpm_sig(rpminfo, sigkey='testkey') expected_msg = "Stray header backup file: %s" % backup self.assertEqual(ex.exception.args[0], expected_msg) + # files should not have been removed + for sigkey in self.signed: + if not os.path.exists(self.signed[sigkey]): + raise Exception('signed copy was deleted incorrectly') + for sigkey in self.sighdr: + if not os.path.exists(self.sighdr[sigkey]): + raise Exception('header was deleted incorrectly') + + def test_dup_backup(self): + rpminfo = self.rinfo['id'] + self.get_rpm.return_value = self.rinfo + self.get_build.return_value = self.buildinfo + self.get_user.return_value = self.userinfo + self.query_rpm_sigs.return_value = self.queryrpmsigs + + siginfo = self.queryrpmsigs[0] + sigkey = siginfo['sigkey'] + backup = "%s.%s.save" % (self.sighdr[sigkey], siginfo['sighash']) + with open(backup, 'wt') as fo: + fo.write('DETACHED SIGHDR\n') + # SAME contents + + r = kojihub.delete_rpm_sig(rpminfo, sigkey='testkey') + + # the files should be gone + for sigkey in self.signed: + if os.path.exists(self.signed[sigkey]): + raise Exception('signed copy not deleted') + for sigkey in self.sighdr: + if os.path.exists(self.sighdr[sigkey]): + raise Exception('header still in place') + + # the sighdrs should be saved + for siginfo in self.queryrpmsigs: + sigkey = siginfo['sigkey'] + backup = "%s.%s.save" % (self.sighdr[sigkey], siginfo['sighash']) + with open(backup, 'rt') as fo: + self.assertEqual(fo.read(), 'DETACHED SIGHDR\n') @mock.patch('os.remove', side_effect=OSError) def test_not_valid(self, os_remove):