delete_rpm_sig: check header files first

also fixup some error logging and update unit tests
This commit is contained in:
Mike McLean 2023-12-08 16:34:56 -05:00 committed by Tomas Kopecek
parent f732aad509
commit e0fea6a3b0
2 changed files with 111 additions and 18 deletions

View file

@ -30,6 +30,7 @@ import calendar
import copy
import datetime
import fcntl
import filecmp
import fnmatch
import functools
import hashlib
@ -7976,6 +7977,32 @@ def delete_rpm_sig(rpminfo, sigkey=None, all_sigs=False):
binfo = get_build(rinfo['build_id'], strict=True)
builddir = koji.pathinfo.build(binfo)
# Check header files
hdr_renames = []
hdr_deletes = []
for rpmsig in rpm_query_result:
hdr_path = joinpath(builddir, koji.pathinfo.sighdr(rinfo, rpmsig['sigkey']))
backup_path = hdr_path + f".{rpmsig['sighash']}.save"
if not os.path.exists(hdr_path):
logger.error(f'Missing signature header file: {hdr_path}')
# this doesn't prevent us from deleting the signature
# it just means we have nothing to back up
continue
if not os.path.isfile(hdr_path):
# this should not happen and requires human intervention
raise koji.GenericError(f"Not a regular file: {hdr_path}")
if os.path.exists(backup_path):
# Likely residue of previous failed deletion
if filecmp.cmp(hdr_path, backup_path, shallow=False):
# same file contents, so we're already backed up
logger.warning(f"Signature header already backed up: {backup_path}")
hdr_deletes.append([rpmsig, hdr_path])
else:
# this shouldn't happen
raise koji.GenericError(f"Stray header backup file: {backup_path}")
else:
hdr_renames.append([rpmsig, hdr_path, backup_path])
# Delete signed copies
# We do these first since they are the lowest risk
for rpmsig in rpm_query_result:
@ -7986,29 +8013,28 @@ def delete_rpm_sig(rpminfo, sigkey=None, all_sigs=False):
try:
os.remove(signed_path)
logger.warning(f"Deleted signed copy {signed_path}")
except FileNotFoundError:
# possibly a race?
# not fatal -- log and continue
logger.error(f"Signed copy disappeared during call: {signed_path}")
except Exception:
logger.error(f"Failed to delete {signed_path}", exc_info=True)
raise koji.GenericError(f"Failed to delete {signed_path}")
# Backup header files
for rpmsig in rpm_query_result:
hdr_path = joinpath(builddir, koji.pathinfo.sighdr(rinfo, rpmsig['sigkey']))
backup_path = hdr_path + f".{rpmsig['sighash']}.save"
if os.path.exists(backup_path):
# Likely residue of previous failed deletion
raise koji.GenericError(f"Stray header backup file: {backup_path}")
if not os.path.exists(hdr_path):
logger.error(f'Missing signature header file: {hdr_path}')
# not fatal
elif not os.path.isfile(hdr_path):
raise koji.GenericError(f"Not a regular file: {hdr_path}")
else:
for rpmsig, hdr_path, backup_path in hdr_renames:
# sanity checked above
try:
os.rename(hdr_path, backup_path)
logger.debug(f"Signature header saved to {backup_path}")
logger.warning(f"Signature header saved to {backup_path}")
except Exception:
logger.error(f"Failed to rename {hdr_path} to {backup_path}", exc_info=True)
# Delete already backed-up headers
for rpmsig, hdr_path in hdr_deletes:
# verified backup above
try:
os.remove(hdr_path)
logger.warning(f"Deleted signature header {hdr_path}")
except Exception:
logger.error(f"Failed to delete {hdr_path}", exc_info=True)
raise koji.GenericError(f"Failed to delete {hdr_path}")
# Note: we do not delete any empty parent dirs as the primary use case for deleting these
# signatures is to allow the import of new, overlapping ones

View file

@ -173,6 +173,35 @@ class TestDeleteRPMSig(unittest.TestCase):
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey='testkey')
self.get_build.assert_called_once_with(self.rinfo['build_id'], strict=True)
def test_header_not_a_file(self):
rpminfo = self.rinfo['id']
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
self.get_user.return_value = self.userinfo
self.query_rpm_sigs.return_value = self.queryrpmsigs
# we should error, without making any changes, if a header is not a regular file
builddir = self.pathinfo.build(self.buildinfo)
bad_sigkey = '2f86d6a1'
bad_hdr= self.sighdr[bad_sigkey]
os.remove(bad_hdr)
os.mkdir(bad_hdr)
with self.assertRaises(koji.GenericError) as ex:
r = kojihub.delete_rpm_sig(rpminfo, sigkey='testkey')
expected_msg = "Not a regular file: %s" % bad_hdr
self.assertEqual(ex.exception.args[0], expected_msg)
# the files should still be there
for sigkey in self.signed:
if not os.path.exists(self.signed[sigkey]):
raise Exception('signed copy was deleted')
for sigkey in self.sighdr:
if not os.path.exists(self.sighdr[sigkey]):
raise Exception('header was deleted')
if not os.path.isdir(bad_hdr):
# the function should not have touched the invalid path
raise Exception('bad header file was removed')
def test_stray_backup(self):
rpminfo = self.rinfo['id']
self.get_rpm.return_value = self.rinfo
@ -180,16 +209,54 @@ class TestDeleteRPMSig(unittest.TestCase):
self.get_user.return_value = self.userinfo
self.query_rpm_sigs.return_value = self.queryrpmsigs
# a missing signed copy or header should not error
siginfo = self.queryrpmsigs[0]
sigkey = siginfo['sigkey']
backup = "%s.%s.save" % (self.sighdr[sigkey], siginfo['sighash'])
with open(backup, 'wt') as fo:
fo.write('STRAY FILE\n')
# different contents
with self.assertRaises(koji.GenericError) as ex:
r = kojihub.delete_rpm_sig(rpminfo, sigkey='testkey')
expected_msg = "Stray header backup file: %s" % backup
self.assertEqual(ex.exception.args[0], expected_msg)
# files should not have been removed
for sigkey in self.signed:
if not os.path.exists(self.signed[sigkey]):
raise Exception('signed copy was deleted incorrectly')
for sigkey in self.sighdr:
if not os.path.exists(self.sighdr[sigkey]):
raise Exception('header was deleted incorrectly')
def test_dup_backup(self):
rpminfo = self.rinfo['id']
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
self.get_user.return_value = self.userinfo
self.query_rpm_sigs.return_value = self.queryrpmsigs
siginfo = self.queryrpmsigs[0]
sigkey = siginfo['sigkey']
backup = "%s.%s.save" % (self.sighdr[sigkey], siginfo['sighash'])
with open(backup, 'wt') as fo:
fo.write('DETACHED SIGHDR\n')
# SAME contents
r = kojihub.delete_rpm_sig(rpminfo, sigkey='testkey')
# the files should be gone
for sigkey in self.signed:
if os.path.exists(self.signed[sigkey]):
raise Exception('signed copy not deleted')
for sigkey in self.sighdr:
if os.path.exists(self.sighdr[sigkey]):
raise Exception('header still in place')
# the sighdrs should be saved
for siginfo in self.queryrpmsigs:
sigkey = siginfo['sigkey']
backup = "%s.%s.save" % (self.sighdr[sigkey], siginfo['sighash'])
with open(backup, 'rt') as fo:
self.assertEqual(fo.read(), 'DETACHED SIGHDR\n')
@mock.patch('os.remove', side_effect=OSError)
def test_not_valid(self, os_remove):