Backup signature headers in delete_rpm_sig

This commit is contained in:
Mike McLean 2023-11-21 15:14:41 -05:00 committed by Tomas Kopecek
parent 15018712bc
commit f732aad509
2 changed files with 141 additions and 66 deletions

View file

@ -1,9 +1,13 @@
import os
import tempfile
import shutil
import unittest
import mock
import koji
import kojihub
from koji.util import joinpath
DP = kojihub.DeleteProcessor
@ -17,6 +21,9 @@ class TestDeleteRPMSig(unittest.TestCase):
return delete
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.pathinfo = koji.PathInfo(self.tempdir)
mock.patch('koji.pathinfo', new=self.pathinfo).start()
self.DeleteProcessor = mock.patch('kojihub.kojihub.DeleteProcessor',
side_effect=self.getDelete).start()
self.deletes = []
@ -58,9 +65,31 @@ class TestDeleteRPMSig(unittest.TestCase):
'sigkey': '2f86d6a1'}]
self.userinfo = {'authtype': 2, 'id': 1, 'krb_principal': None, 'krb_principals': [],
'name': 'testuser', 'status': 0, 'usertype': 0}
self.set_up_files()
def set_up_files(self):
builddir = self.pathinfo.build(self.buildinfo)
os.makedirs(builddir)
self.builddir = builddir
self.signed = {}
self.sighdr = {}
for sig in self.queryrpmsigs:
key = sig['sigkey']
signed = joinpath(builddir, self.pathinfo.signed(self.rinfo, key))
self.signed[key] = signed
koji.ensuredir(os.path.dirname(signed))
with open(signed, 'wt') as fo:
fo.write('SIGNED COPY\n')
sighdr = joinpath(builddir, self.pathinfo.sighdr(self.rinfo, key))
self.sighdr[key] = sighdr
koji.ensuredir(os.path.dirname(sighdr))
with open(sighdr, 'wt') as fo:
fo.write('DETACHED SIGHDR\n')
def tearDown(self):
mock.patch.stopall()
shutil.rmtree(self.tempdir)
def test_rpm_not_existing(self):
rpm_id = 1234
@ -84,7 +113,8 @@ class TestDeleteRPMSig(unittest.TestCase):
def test_external_repo(self):
rpminfo = 1234
rinfo = {'external_repo_id': 1, 'external_repo_name': 'INTERNAL'}
rinfo = self.rinfo.copy()
rinfo.update({'external_repo_id': 1, 'external_repo_name': 'INTERNAL'})
self.get_rpm.return_value = rinfo
with self.assertRaises(koji.GenericError) as ex:
kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
@ -108,39 +138,67 @@ class TestDeleteRPMSig(unittest.TestCase):
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
@mock.patch('koji.pathinfo.build', return_value='fakebuildpath')
@mock.patch('os.remove')
def test_file_not_found_error(self, os_remove, pb):
rpminfo = 2
os_remove.side_effect = FileNotFoundError()
def test_file_not_found_error(self):
rpminfo = self.rinfo['id']
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
self.get_user.return_value = self.userinfo
self.query_rpm_sigs.return_value = self.queryrpmsigs
# a missing signed copy or header should not error
builddir = self.pathinfo.build(self.buildinfo)
sigkey = '2f86d6a1'
os.remove(self.signed[sigkey])
os.remove(self.sighdr[sigkey])
r = kojihub.delete_rpm_sig(rpminfo, sigkey='testkey')
self.assertEqual(r, None)
# the files should still be gone
for sigkey in self.signed:
if os.path.exists(self.signed[sigkey]):
raise Exception('signed copy not deleted')
for sigkey in self.sighdr:
if os.path.exists(self.sighdr[sigkey]):
raise Exception('header still in place')
self.assertEqual(len(self.deletes), 2)
delete = self.deletes[0]
self.assertEqual(delete.table, 'rpmsigs')
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i", "sigkey=%(sigkey)s"])
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)s", "sigkey IN %(found_keys)s"])
delete = self.deletes[1]
self.assertEqual(delete.table, 'rpm_checksum')
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i", "sigkey=%(sigkey)s"])
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)s", "sigkey IN %(found_keys)s"])
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey='testkey')
self.get_build.assert_called_once_with(self.rinfo['build_id'])
self.get_build.assert_called_once_with(self.rinfo['build_id'], strict=True)
def test_stray_backup(self):
rpminfo = self.rinfo['id']
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
self.get_user.return_value = self.userinfo
self.query_rpm_sigs.return_value = self.queryrpmsigs
# a missing signed copy or header should not error
siginfo = self.queryrpmsigs[0]
sigkey = siginfo['sigkey']
backup = "%s.%s.save" % (self.sighdr[sigkey], siginfo['sighash'])
with open(backup, 'wt') as fo:
fo.write('STRAY FILE\n')
with self.assertRaises(koji.GenericError) as ex:
r = kojihub.delete_rpm_sig(rpminfo, sigkey='testkey')
expected_msg = "Stray header backup file: %s" % backup
self.assertEqual(ex.exception.args[0], expected_msg)
@mock.patch('koji.pathinfo.build', return_value='fakebuildpath')
@mock.patch('os.remove', side_effect=OSError)
def test_not_valid(self, os_remove, pb):
def test_not_valid(self, os_remove):
rpminfo = 2
filepath = 'fakebuildpath/data/signed/x86_64/fs_mark-3.3-20.el8.x86_64.rpm'
filepath = '%s/packages/fs_mark/3.3/20.el8/data/signed/x86_64/fs_mark-3.3-20.el8.x86_64.rpm' % self.tempdir
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
self.query_rpm_sigs.return_value = self.queryrpmsigs
expected_msg = "File %s cannot be deleted." % filepath
expected_msg = "Failed to delete %s" % filepath
with self.assertRaises(koji.GenericError) as ex:
kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
self.assertEqual(ex.exception.args[0], expected_msg)
@ -148,18 +206,16 @@ class TestDeleteRPMSig(unittest.TestCase):
self.assertEqual(len(self.deletes), 2)
delete = self.deletes[0]
self.assertEqual(delete.table, 'rpmsigs')
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)s", "sigkey IN %(found_keys)s"])
delete = self.deletes[1]
self.assertEqual(delete.table, 'rpm_checksum')
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)s", "sigkey IN %(found_keys)s"])
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
self.get_build.assert_called_once_with(self.rinfo['build_id'])
self.get_build.assert_called_once_with(self.rinfo['build_id'], strict=True)
@mock.patch('koji.pathinfo.build', return_value='fakebuildpath')
@mock.patch('os.remove')
def test_valid(self, os_remove, pb):
def test_valid(self):
rpminfo = 2
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
@ -167,14 +223,29 @@ class TestDeleteRPMSig(unittest.TestCase):
self.query_rpm_sigs.return_value = self.queryrpmsigs
kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
# the files should be gone
for sigkey in self.signed:
if os.path.exists(self.signed[sigkey]):
raise Exception('signed copy not deleted')
for sigkey in self.sighdr:
if os.path.exists(self.sighdr[sigkey]):
raise Exception('header still in place')
# the sighdrs should be saved
for siginfo in self.queryrpmsigs:
sigkey = siginfo['sigkey']
backup = "%s.%s.save" % (self.sighdr[sigkey], siginfo['sighash'])
with open(backup, 'rt') as fo:
self.assertEqual(fo.read(), 'DETACHED SIGHDR\n')
self.assertEqual(len(self.deletes), 2)
delete = self.deletes[0]
self.assertEqual(delete.table, 'rpmsigs')
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)s", "sigkey IN %(found_keys)s"])
delete = self.deletes[1]
self.assertEqual(delete.table, 'rpm_checksum')
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)s", "sigkey IN %(found_keys)s"])
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
self.get_build.assert_called_once_with(self.rinfo['build_id'])
self.get_build.assert_called_once_with(self.rinfo['build_id'], strict=True)