Add delete-rpm-sig CLI and deleteRPMSig hub call

Fixes: https://pagure.io/koji/issue/2665
This commit is contained in:
Jana Cupova 2021-07-07 07:32:57 +02:00 committed by Tomas Kopecek
parent 8d1a94c8d4
commit fb8cfc5678
6 changed files with 347 additions and 0 deletions

View file

@ -1640,6 +1640,36 @@ def handle_import_sig(goptions, session, args):
session.writeSignedRPM(rinfo['id'], sigkey)
def handle_remove_sig(goptions, session, args):
"[admin] Remove signed RPMs from db and disk"
usage = _("usage: %prog remove-sig [options] <rpm-id/n-v-r.a/rpminfo>")
parser = OptionParser(usage=get_usage_str(usage))
parser.add_option("--sigkey", action="store", default=None, help=_("Specify signature key"))
parser.add_option("--all", action="store_true",
help=_("Remove all signed copies for specified RPM"))
(options, args) = parser.parse_args(args)
if len(args) < 1:
parser.error(_("Please specify an RPM"))
if not options.all and not options.sigkey:
error("Either --sigkey or --all options must be given")
if options.all and options.sigkey:
error("Conflicting options specified")
activate_session(session, goptions)
rpminfo = args[0]
rinfo = session.getRPM(rpminfo)
if not rinfo:
print("No such rpm in system: %s" % rpminfo)
else:
try:
session.deleteRPMSig(rpminfo, sigkey=options.sigkey)
except koji.GenericError:
print("Signature %s for rpm %s does not existing" % (options.sigkey, rpminfo))
def handle_write_signed_rpm(goptions, session, args):
"[admin] Write signed RPMs to disk"
usage = _("usage: %prog write-signed-rpm [options] <signature-key> <n-v-r> [<n-v-r> ...]")

View file

@ -7585,6 +7585,75 @@ def add_rpm_sig(an_rpm, sighdr):
sigkey=sigkey, sighash=sighash, build=binfo, rpm=rinfo)
def delete_rpm_sig(rpminfo, sigkey=None, all_sigs=False):
"""Delete rpm signature
:param dict/str/id rpm: map containing 'name', 'version', 'release', and 'arch'
string N-V-R.A
int ID
:param str sigkey: Signature key.
:param bool all_sigs: Delete all signed copies for specified RPM.
"""
if all_sigs:
sigkey = None
elif not sigkey:
raise koji.GenericError("No signature specified")
rinfo = get_rpm(rpminfo, strict=True)
if rinfo['external_repo_id']:
raise koji.GenericError("Not an internal rpm: %s (from %s)"
% (rpminfo, rinfo['external_repo_name']))
rpm_query_result = query_rpm_sigs(rpm_id=rinfo['id'], sigkey=sigkey)
if not rpm_query_result:
nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo
raise koji.GenericError("%s has no matching signatures to delete" % nvra)
clauses = ["rpm_id=%(rpm_id)i"]
if sigkey is not None:
clauses.append("sigkey=%(sigkey)s")
clauses_str = " AND ".join(clauses)
delete = """DELETE FROM rpmsigs WHERE %s""" % clauses_str
rpm_id = rinfo['id']
_dml(delete, locals())
binfo = get_build(rinfo['build_id'])
builddir = koji.pathinfo.build(binfo)
list_sigcaches = []
list_sighdrs = []
for rpmsig in rpm_query_result:
list_sigcaches.append(joinpath(builddir, koji.pathinfo.sighdr(rinfo, rpmsig['sigkey'])))
list_sighdrs.append(joinpath(builddir, koji.pathinfo.signed(rinfo, rpmsig['sigkey'])))
list_paths = list_sighdrs + list_sigcaches
count = 0
for file_path in list_paths:
try:
os.remove(file_path)
count += 1
except FileNotFoundError:
logger.info("File: %s has been deleted", file_path)
except Exception:
logger.error("An error happens when deleting %s, %s deleting are deleted, "
"%s deleting are skipped, the original request is %s rpm "
"and %s sigkey", file_path,
list_paths[:count], list_paths[count:], rpminfo, sigkey, exc_info=True)
raise koji.GenericError("File %s cannot be deleted." % file_path)
for path in list_paths:
basedir = joinpath(builddir, os.path.dirname(path))
if os.path.isdir(basedir) and not os.listdir(basedir):
try:
os.rmdir(basedir)
except OSError:
logger.warning("An error happens when deleting %s directory",
basedir, exc_info=True)
sigdir = os.path.dirname(basedir)
if os.path.isdir(sigdir) and not os.listdir(sigdir):
try:
os.rmdir(sigdir)
except OSError:
logger.warning("An error happens when deleting %s directory",
sigdir, exc_info=True)
def _scan_sighdr(sighdr, fn):
"""Splices sighdr with other headers from fn and queries (no payload)"""
# This is hackish, but it works
@ -11941,6 +12010,18 @@ class RootExports(object):
context.session.assertPerm('sign')
return add_rpm_sig(an_rpm, base64.b64decode(data))
def deleteRPMSig(self, rpminfo, sigkey=None, all_sigs=False):
"""Delete rpm signature
:param dict/str/id rpm: map containing 'name', 'version', 'release', and 'arch'
string N-V-R.A
int ID
:param str sigkey: Signature key.
:param bool all_sigs: Delete all signed copies for specified RPM.
"""
context.session.assertPerm('admin')
return delete_rpm_sig(rpminfo, sigkey=sigkey, all_sigs=all_sigs)
findBuildID = staticmethod(find_build_id)
getTagID = staticmethod(get_tag_id)
getTag = staticmethod(get_tag)

View file

@ -50,6 +50,7 @@ admin commands:
remove-group Remove group from tag
remove-host-from-channel Remove a host from a channel
remove-pkg Remove a package from the listing for tag
remove-sig Remove signed RPMs from db and disk
remove-tag Remove a tag
remove-tag-inheritance Remove a tag inheritance link
remove-target Remove a build target

View file

@ -50,6 +50,7 @@ admin commands:
remove-group Remove group from tag
remove-host-from-channel Remove a host from a channel
remove-pkg Remove a package from the listing for tag
remove-sig Remove signed RPMs from db and disk
remove-tag Remove a tag
remove-tag-inheritance Remove a tag inheritance link
remove-target Remove a build target

View file

@ -0,0 +1,73 @@
from __future__ import absolute_import
import mock
from six.moves import StringIO
import koji
from koji_cli.commands import handle_remove_sig
from . import utils
class TestRemoveSig(utils.CliTestCase):
maxDiff = None
def setUp(self):
self.options = mock.MagicMock()
self.options.debug = False
self.session = mock.MagicMock()
self.session.getAPIVersion.return_value = koji.API_VERSION
def test_delete_sig_help(self):
self.assert_help(
handle_remove_sig,
"""Usage: %s remove-sig [options] <rpm-id/n-v-r.a/rpminfo>
(Specify the --help global option for a list of other help options)
Options:
-h, --help show this help message and exit
--sigkey=SIGKEY Specify signature key
--all Remove all signed copies for specified RPM
""" % self.progname)
@mock.patch('sys.stderr', new_callable=StringIO)
def test_delete_sig_without_option(self, stderr):
expected = "Usage: %s remove-sig [options] <rpm-id/n-v-r.a/rpminfo>\n" \
"(Specify the --help global option for a list of other help options)\n\n" \
"%s: error: Please specify an RPM\n" % (self.progname, self.progname)
with self.assertRaises(SystemExit) as ex:
handle_remove_sig(self.options, self.session, [])
self.assertExitCode(ex, 2)
self.assert_console_message(stderr, expected)
@mock.patch('sys.stdout', new_callable=StringIO)
def test_delete_sig_non_exist_rpm(self, stdout):
rpm = '1234'
expected = "No such rpm in system: %s\n" % rpm
self.session.getRPM.return_value = None
handle_remove_sig(self.options, self.session, [rpm, '--all'])
self.assert_console_message(stdout, expected)
self.session.getRPM.assert_called_with('1234')
self.session.deleteRPMSig.assert_not_called()
def test_delete_sig_valid(self):
rpm = '1'
rpminfo = {'arch': 'src',
'build_id': 10,
'buildroot_id': None,
'buildtime': 1618361584,
'epoch': None,
'external_repo_id': 0,
'external_repo_name': 'INTERNAL',
'extra': None,
'id': 1,
'metadata_only': False,
'name': 'koji',
'payloadhash': 'c2b13f978c45e274c856e0a4599842a4',
'release': '1.fc34',
'size': 1178794,
'version': '1.24.1'}
self.session.getRPM.return_value = rpminfo
self.session.deleteRPMSig.return_value = None
handle_remove_sig(self.options, self.session, [rpm, '--sigkey', 'testkey'])
self.session.getRPM.assert_called_with('1')
self.session.deleteRPMSig.assert_called_with('1', sigkey='testkey')

View file

@ -0,0 +1,161 @@
import unittest
import mock
import koji
import kojihub
QP = kojihub.QueryProcessor
class TestDeleteRPMSig(unittest.TestCase):
def getQuery(self, *args, **kwargs):
query = QP(*args, **kwargs)
query.execute = mock.MagicMock()
self.queries.append(query)
return query
def setUp(self):
self.QueryProcessor = mock.patch('kojihub.QueryProcessor',
side_effect=self.getQuery).start()
self.queries = []
self.get_rpm = mock.patch('kojihub.get_rpm').start()
self.query_rpm_sigs = mock.patch('kojihub.query_rpm_sigs').start()
self.get_build = mock.patch('kojihub.get_build').start()
self.buildinfo = {'build_id': 1,
'epoch': None,
'extra': None,
'id': 1,
'name': 'fs_mark',
'nvr': 'fs_mark-3.3-20.el8',
'owner_id': 1,
'owner_name': 'kojiadmin',
'package_id': 1,
'package_name': 'fs_mark',
'release': '20.el8',
'state': 1,
'task_id': None,
'version': '3.3'}
self.rinfo = {'arch': 'x86_64',
'build_id': 1,
'buildroot_id': None,
'buildtime': 1564782768,
'epoch': None,
'external_repo_id': None,
'extra': None,
'id': 2,
'metadata_only': False,
'name': 'fs_mark',
'payloadhash': 'ed0690ab4b0508f2448d99a08e0a004a',
'release': '20.el8',
'size': 25644,
'version': '3.3'}
self.queryrpmsigs = [{'rpm_id': 2, 'sighash': 'cb4d01bd3671b41ef51abc9be851e614',
'sigkey': ''},
{'rpm_id': 2, 'sighash': '78c245caa6deb70f0abc8b844c642cd6',
'sigkey': '2f86d6a1'}]
def tearDown(self):
mock.patch.stopall()
@mock.patch('kojihub._dml')
def test_rpm_not_existing(self, dml):
rpm_id = 1234
expected_msg = 'No such rpm: %s' % rpm_id
self.get_rpm.side_effect = koji.GenericError("No such rpm: %s" % rpm_id)
with self.assertRaises(koji.GenericError) as ex:
kojihub.delete_rpm_sig(rpm_id, all_sigs=True)
self.assertEqual(len(self.queries), 0)
self.assertEqual(ex.exception.args[0], expected_msg)
self.get_rpm.assert_called_once_with(rpm_id, strict=True)
self.query_rpm_sigs.assert_not_called()
dml.assert_not_called()
@mock.patch('kojihub._dml')
def test_not_all_sig_and_not_sigkey(self, dml):
expected_msg = 'No signature specified'
with self.assertRaises(koji.GenericError) as ex:
kojihub.delete_rpm_sig(1234)
self.assertEqual(len(self.queries), 0)
self.assertEqual(ex.exception.args[0], expected_msg)
self.get_rpm.assert_not_called()
self.query_rpm_sigs.assert_not_called()
dml.assert_not_called()
@mock.patch('kojihub._dml')
def test_external_repo(self, dml):
rpminfo = 1234
rinfo = {'external_repo_id': 1, 'external_repo_name': 'INTERNAL'}
self.get_rpm.return_value = rinfo
with self.assertRaises(koji.GenericError) as ex:
kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
self.assertEqual(len(self.queries), 0)
expected_msg = "Not an internal rpm: %s (from %s)" % (rpminfo, rinfo['external_repo_name'])
self.assertEqual(ex.exception.args[0], expected_msg)
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.query_rpm_sigs.assert_not_called()
dml.assert_not_called()
@mock.patch('kojihub._dml')
def test_empty_query_sign(self, dml):
rpminfo = 1234
nvra = "%s-%s-%s.%s" % (self.rinfo['name'], self.rinfo['version'], self.rinfo['release'],
self.rinfo['arch'])
expected_msg = "%s has no matching signatures to delete" % nvra
self.get_rpm.return_value = self.rinfo
self.query_rpm_sigs.return_value = []
with self.assertRaises(koji.GenericError) as ex:
kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
self.assertEqual(len(self.queries), 0)
self.assertEqual(ex.exception.args[0], expected_msg)
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
dml.assert_not_called()
@mock.patch('kojihub._dml')
@mock.patch('koji.pathinfo.build', return_value='fakebuildpath')
@mock.patch('os.remove')
def test_file_not_found_error(self, os_remove, pb, dml):
rpminfo = 2
os_remove.side_effect = FileNotFoundError()
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
self.query_rpm_sigs.return_value = self.queryrpmsigs
r = kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
self.assertEqual(r, None)
self.assertEqual(len(self.queries), 0)
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
self.get_build.assert_called_once_with(self.rinfo['build_id'])
@mock.patch('kojihub._dml')
@mock.patch('koji.pathinfo.build', return_value='fakebuildpath')
@mock.patch('os.remove', side_effect=OSError)
def test_not_valid(self, os_remove, pb, dml):
rpminfo = 2
filepath = 'fakebuildpath/data/signed/x86_64/fs_mark-3.3-20.el8.x86_64.rpm'
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
self.query_rpm_sigs.return_value = self.queryrpmsigs
expected_msg = "File %s cannot be deleted." % filepath
with self.assertRaises(koji.GenericError) as ex:
kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
self.assertEqual(ex.exception.args[0], expected_msg)
self.assertEqual(len(self.queries), 0)
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
self.get_build.assert_called_once_with(self.rinfo['build_id'])
@mock.patch('kojihub._dml')
@mock.patch('koji.pathinfo.build', return_value='fakebuildpath')
@mock.patch('os.remove')
def test_valid(self, os_remove, pb, dml):
rpminfo = 2
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
self.query_rpm_sigs.return_value = self.queryrpmsigs
kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
self.get_build.assert_called_once_with(self.rinfo['build_id'])