allow renaming rpm signatures

This commit is contained in:
Mike McLean 2025-04-14 23:27:30 -04:00
parent 311dfde77b
commit a3fc36fa85
2 changed files with 435 additions and 62 deletions

View file

@ -7573,7 +7573,7 @@ class CG_Importer(object):
fn = fileinfo['hub.path'] fn = fileinfo['hub.path']
rpminfo = import_rpm(fn, buildinfo, brinfo.id, fileinfo=fileinfo) rpminfo = import_rpm(fn, buildinfo, brinfo.id, fileinfo=fileinfo)
import_rpm_file(fn, buildinfo, rpminfo) import_rpm_file(fn, buildinfo, rpminfo)
add_rpm_sig(rpminfo['id'], koji.rip_rpm_sighdr(fn)) add_rpm_sig(rpminfo['id'], koji.rip_rpm_sighdr(fn), sigkey=fileinfo.get('sigkey'))
def import_log(self, buildinfo, fileinfo): def import_log(self, buildinfo, fileinfo):
if fileinfo.get('metadata_only', False): if fileinfo.get('metadata_only', False):
@ -8251,7 +8251,7 @@ def _generate_maven_metadata(mavendir):
sumobj.write(sum.hexdigest()) sumobj.write(sum.hexdigest())
def add_rpm_sig(an_rpm, sighdr): def add_rpm_sig(an_rpm, sighdr, sigkey=None):
"""Store a signature header for an rpm""" """Store a signature header for an rpm"""
# calling function should perform permission checks, if applicable # calling function should perform permission checks, if applicable
rinfo = get_rpm(an_rpm, strict=True) rinfo = get_rpm(an_rpm, strict=True)
@ -8262,30 +8262,38 @@ def add_rpm_sig(an_rpm, sighdr):
builddir = koji.pathinfo.build(binfo) builddir = koji.pathinfo.build(binfo)
if not os.path.isdir(builddir): if not os.path.isdir(builddir):
raise koji.GenericError("No such directory: %s" % builddir) raise koji.GenericError("No such directory: %s" % builddir)
if sigkey is not None:
verify_name_internal(sigkey)
# verify sigmd5 matches rpm and pick sigkey if needed
rawhdr = koji.RawHeader(sighdr) rawhdr = koji.RawHeader(sighdr)
sigmd5 = koji.hex_string(rawhdr.get(koji.RPM_SIGTAG_MD5)) sigmd5 = koji.hex_string(rawhdr.get(koji.RPM_SIGTAG_MD5))
if sigmd5 == rinfo['payloadhash']: if sigmd5 != rinfo['payloadhash']:
# note: payloadhash is a misnomer, that field is populated with sigmd5. # note: payloadhash is a misnomer, that field is populated with sigmd5.
sigkey = rawhdr.get(koji.RPM_SIGTAG_GPG)
if not sigkey:
sigkey = rawhdr.get(koji.RPM_SIGTAG_PGP)
if not sigkey:
sigkey = rawhdr.get(koji.RPM_SIGTAG_DSA)
if not sigkey:
sigkey = rawhdr.get(koji.RPM_SIGTAG_RSA)
else:
# Double check using rpm in case we have somehow misread # Double check using rpm in case we have somehow misread
rpm_path = "%s/%s" % (builddir, koji.pathinfo.rpm(rinfo)) rpm_path = "%s/%s" % (builddir, koji.pathinfo.rpm(rinfo))
sigmd5, sigkey = _scan_sighdr(sighdr, rpm_path) sigmd5, rawsig = _scan_sighdr(sighdr, rpm_path)
sigmd5 = koji.hex_string(sigmd5) sigmd5 = koji.hex_string(sigmd5)
if sigmd5 != rinfo['payloadhash']: if sigmd5 != rinfo['payloadhash']:
nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo
raise koji.GenericError("wrong md5 for %s: %s" % (nvra, sigmd5)) raise koji.GenericError("wrong md5 for %s: %s" % (nvra, sigmd5))
if not sigkey: elif sigkey is None:
sigkey = '' rawsig = rawhdr.get(koji.RPM_SIGTAG_GPG)
# we use the sigkey='' to represent unsigned in the db (so that uniqueness works) if not rawsig:
else: rawsig = rawhdr.get(koji.RPM_SIGTAG_PGP)
sigkey = koji.get_sigpacket_key_id(sigkey) if not rawsig:
sigkey = rawhdr.get(koji.RPM_SIGTAG_DSA)
if not rawsig:
rawsig = rawhdr.get(koji.RPM_SIGTAG_RSA)
if sigkey is None:
if not rawsig:
sigkey = ''
# we use the sigkey='' to represent unsigned in the db (so that uniqueness works)
else:
sigkey = koji.get_sigpacket_key_id(rawsig)
# do the insert
sighash = md5_constructor(sighdr).hexdigest() sighash = md5_constructor(sighdr).hexdigest()
rpm_id = rinfo['id'] rpm_id = rinfo['id']
koji.plugin.run_callbacks('preRPMSign', sigkey=sigkey, sighash=sighash, build=binfo, rpm=rinfo) koji.plugin.run_callbacks('preRPMSign', sigkey=sigkey, sighash=sighash, build=binfo, rpm=rinfo)
@ -8296,6 +8304,7 @@ def add_rpm_sig(an_rpm, sighdr):
except IntegrityError: except IntegrityError:
nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo
raise koji.GenericError("Signature already exists for package %s, key %s" % (nvra, sigkey)) raise koji.GenericError("Signature already exists for package %s, key %s" % (nvra, sigkey))
# - write to fs # - write to fs
sigpath = "%s/%s" % (builddir, koji.pathinfo.sighdr(rinfo, sigkey)) sigpath = "%s/%s" % (builddir, koji.pathinfo.sighdr(rinfo, sigkey))
koji.ensuredir(os.path.dirname(sigpath)) koji.ensuredir(os.path.dirname(sigpath))
@ -8305,6 +8314,80 @@ def add_rpm_sig(an_rpm, sighdr):
sigkey=sigkey, sighash=sighash, build=binfo, rpm=rinfo) sigkey=sigkey, sighash=sighash, build=binfo, rpm=rinfo)
def rename_rpm_sig(rpminfo, oldkey, newkey):
"""Change the sigkey for an rpm signature"""
verify_name_internal(newkey)
rinfo = get_rpm(rpminfo, strict=True)
nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo
if rinfo['external_repo_id']:
raise koji.GenericError("Not an internal rpm: %s (from %s)"
% (rpminfo, rinfo['external_repo_name']))
# Determine what signature we have
rows = query_rpm_sigs(rpm_id=rinfo['id'], sigkey=oldkey)
if not rows:
raise koji.GenericError(f'No {oldkey} signature for rpm {nvra}')
# Check if newkey exists already
rows = query_rpm_sigs(rpm_id=rinfo['id'], sigkey=newkey)
if rows:
raise koji.GenericError(f'A {newkey} signature already exists for rpm {nvra}')
# Update db
update = UpdateProcessor(
table='rpmsigs',
data={'sigkey': newkey},
clauses=["rpm_id=%(rpm_id)s", "sigkey=%(oldkey)s"],
values={'rpm_id': rinfo['id'], 'oldkey': oldkey},
)
update.execute()
# Get the base build dir for our paths
binfo = get_build(rinfo['build_id'], strict=True)
builddir = koji.pathinfo.build(binfo)
# Check header file
old_path = joinpath(builddir, koji.pathinfo.sighdr(rinfo, oldkey))
if not os.path.exists(old_path):
raise koji.GenericError(f'Missing signature header file: {old_path}')
new_path = joinpath(builddir, koji.pathinfo.sighdr(rinfo, newkey))
if os.path.exists(new_path):
# shouldn't happen, newkey isn't in db
raise koji.GenericError(f'Signature header file already exists: {new_path}')
# Check signed copies
new_signed_path = joinpath(builddir, koji.pathinfo.signed(rinfo, newkey))
if os.path.exists(new_signed_path):
# shouldn't happen, newkey isn't in db
raise koji.GenericError(f'Signed copy already exists: {new_signed_path}')
# rename the signed copy first if present, lowest risk
old_signed_path = joinpath(builddir, koji.pathinfo.signed(rinfo, oldkey))
if os.path.exists(old_signed_path):
# signed copies might not exist
try:
koji.ensuredir(os.path.dirname(new_signed_path))
os.rename(old_signed_path, new_signed_path)
except Exception:
# shouldn't happen and may need cleanup, so log copiously
logger.error(f"Failed to rename {old_signed_path}", exc_info=True)
raise koji.GenericError(f"Failed to rename {old_signed_path}")
# rename the header file next
try:
koji.ensuredir(os.path.dirname(new_path))
os.rename(old_path, new_path)
except Exception:
# shouldn't happen and may need cleanup, so log copiously
logger.error(f"Failed to rename {old_path}", exc_info=True)
raise koji.GenericError(f"Failed to rename {old_path}")
# Note: we do not delete any empty parent dirs
logger.warning("Renamed signature for rpm %s: %s to %s", nvra, oldkey, newkey)
def delete_rpm_sig(rpminfo, sigkey=None, all_sigs=False): def delete_rpm_sig(rpminfo, sigkey=None, all_sigs=False):
"""Delete rpm signature """Delete rpm signature
@ -8447,47 +8530,6 @@ def _scan_sighdr(sighdr, fn):
return koji.get_header_field(hdr, 'sigmd5'), sig return koji.get_header_field(hdr, 'sigmd5'), sig
def check_rpm_sig(an_rpm, sigkey, sighdr):
# verify that the provided signature header matches the key and rpm
rinfo = get_rpm(an_rpm, strict=True)
binfo = get_build(rinfo['build_id'])
builddir = koji.pathinfo.build(binfo)
rpm_path = "%s/%s" % (builddir, koji.pathinfo.rpm(rinfo))
if not os.path.exists(rpm_path):
raise koji.GenericError("No such path: %s" % rpm_path)
if not os.path.isfile(rpm_path):
raise koji.GenericError("Not a regular file: %s" % rpm_path)
fd, temp = tempfile.mkstemp()
os.close(fd)
try:
koji.splice_rpm_sighdr(sighdr, rpm_path, dst=temp)
ts = rpm.TransactionSet()
ts.setVSFlags(0) # full verify
with open(temp, 'rb') as fo:
hdr = ts.hdrFromFdno(fo.fileno())
except Exception:
try:
os.unlink(temp)
except Exception:
pass
raise
raw_key = koji.get_header_field(hdr, 'siggpg')
if not raw_key:
raw_key = koji.get_header_field(hdr, 'sigpgp')
if not raw_key:
raw_key = koji.get_header_field(hdr, 'dsaheader')
if not raw_key:
raw_key = koji.get_header_field(hdr, 'rsaheader')
if not raw_key:
found_key = None
else:
found_key = koji.get_sigpacket_key_id(raw_key)
if sigkey.lower() != found_key:
raise koji.GenericError("Signature key mismatch: got %s, expected %s"
% (found_key, sigkey))
os.unlink(temp)
def query_rpm_sigs(rpm_id=None, sigkey=None, queryOpts=None): def query_rpm_sigs(rpm_id=None, sigkey=None, queryOpts=None):
"""Queries db for rpm signatures """Queries db for rpm signatures
@ -11661,7 +11703,7 @@ class RootExports(object):
reject_draft(build) reject_draft(build)
new_image_build(build) new_image_build(build)
def importRPM(self, path, basename): def importRPM(self, path, basename, sigkey=None):
"""Import an RPM into the database. """Import an RPM into the database.
The file must be uploaded first. The file must be uploaded first.
@ -11673,7 +11715,7 @@ class RootExports(object):
raise koji.GenericError("No such file: %s" % fn) raise koji.GenericError("No such file: %s" % fn)
rpminfo = import_rpm(fn) rpminfo = import_rpm(fn)
import_rpm_file(fn, rpminfo['build'], rpminfo) import_rpm_file(fn, rpminfo['build'], rpminfo)
add_rpm_sig(rpminfo['id'], koji.rip_rpm_sighdr(fn)) add_rpm_sig(rpminfo['id'], koji.rip_rpm_sighdr(fn), sigkey=sigkey)
for tag in list_tags(build=rpminfo['build_id']): for tag in list_tags(build=rpminfo['build_id']):
set_tag_update(tag['id'], 'IMPORT') set_tag_update(tag['id'], 'IMPORT')
return rpminfo return rpminfo
@ -13258,13 +13300,30 @@ class RootExports(object):
# XXX - still not sure if this is the right restriction # XXX - still not sure if this is the right restriction
return write_signed_rpm(an_rpm, sigkey, force) return write_signed_rpm(an_rpm, sigkey, force)
def addRPMSig(self, an_rpm, data): def addRPMSig(self, an_rpm, data, sigkey=None):
"""Store a signature header for an rpm """Store a signature header for an rpm
data: the signature header encoded as base64 data: the signature header encoded as base64
""" """
context.session.assertPerm('sign') context.session.assertPerm('sign')
return add_rpm_sig(an_rpm, base64.b64decode(data)) return add_rpm_sig(an_rpm, base64.b64decode(data), sigkey=sigkey)
def renameRPMSig(self, rpminfo, oldkey, newkey):
"""Rename rpm signature
This changes the 'sigkey' value for a stored rpm signature and renames
the files approriately.
This call requires ``admin`` permission (``sign`` is not sufficient).
:param dict/str/id rpm: map containing 'name', 'version', 'release', and 'arch'
string N-V-R.A
int ID
:param str oldkey: Old signature key
:param str newkey: New signature key
"""
context.session.assertPerm('admin')
return rename_rpm_sig(rpminfo, oldkey, newkey)
def deleteRPMSig(self, rpminfo, sigkey=None, all_sigs=False): def deleteRPMSig(self, rpminfo, sigkey=None, all_sigs=False):
"""Delete rpm signature """Delete rpm signature
@ -13272,6 +13331,8 @@ class RootExports(object):
Only use this method in extreme situations, because it goes against Only use this method in extreme situations, because it goes against
Koji's design of immutable, auditable data. Koji's design of immutable, auditable data.
In most cases, it is preferable to use renameRPMSig instead.
This call requires ``admin`` permission (``sign`` is not sufficient). This call requires ``admin`` permission (``sign`` is not sufficient).
:param dict/str/id rpm: map containing 'name', 'version', 'release', and 'arch' :param dict/str/id rpm: map containing 'name', 'version', 'release', and 'arch'

View file

@ -0,0 +1,312 @@
import os
import tempfile
import shutil
import unittest
from unittest import mock
import koji
import kojihub
from koji.util import joinpath
UP = kojihub.UpdateProcessor
class TestRenameRPMSig(unittest.TestCase):
def getUpdate(self, *args, **kwargs):
update = UP(*args, **kwargs)
update.execute = mock.MagicMock()
self.updates.append(update)
return update
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.pathinfo = koji.PathInfo(self.tempdir)
mock.patch('koji.pathinfo', new=self.pathinfo).start()
self.updates = []
self.UpdateProcessor = mock.patch('kojihub.kojihub.UpdateProcessor',
side_effect=self.getUpdate).start()
self.get_rpm = mock.patch('kojihub.kojihub.get_rpm').start()
self.query_rpm_sigs = mock.patch('kojihub.kojihub.query_rpm_sigs').start()
self.get_build = mock.patch('kojihub.kojihub.get_build').start()
self.get_user = mock.patch('kojihub.kojihub.get_user').start()
self.verify_name_internal = mock.patch('kojihub.kojihub.verify_name_internal').start()
self.buildinfo = {'build_id': 1,
'epoch': None,
'extra': None,
'id': 1,
'name': 'fs_mark',
'nvr': 'fs_mark-3.3-20.el8',
'owner_id': 1,
'owner_name': 'kojiadmin',
'package_id': 1,
'package_name': 'fs_mark',
'release': '20.el8',
'state': 1,
'task_id': None,
'version': '3.3'}
self.rinfo = {'arch': 'x86_64',
'build_id': 1,
'buildroot_id': None,
'buildtime': 1564782768,
'epoch': None,
'external_repo_id': None,
'extra': None,
'id': 2,
'metadata_only': False,
'name': 'fs_mark',
'payloadhash': 'ed0690ab4b0508f2448d99a08e0a004a',
'release': '20.el8',
'size': 25644,
'version': '3.3'}
self.rpmsigs = {
'': {'rpm_id': 2, 'sighash': 'cb4d01bd3671b41ef51abc9be851e614', 'sigkey': ''},
'2f86d6a1': {'rpm_id': 2, 'sighash': '78c245caa6deb70f0abc8b844c642cd6',
'sigkey': '2f86d6a1'}
}
self.queryrpmsigs = [self.rpmsigs[k] for k in sorted(self.rpmsigs)]
self.userinfo = {'authtype': 2, 'id': 1, 'krb_principal': None, 'krb_principals': [],
'name': 'testuser', 'status': 0, 'usertype': 0}
self.set_up_files()
def set_up_files(self):
builddir = self.pathinfo.build(self.buildinfo)
os.makedirs(builddir)
self.builddir = builddir
self.signed = {}
self.sighdr = {}
for sig in self.queryrpmsigs:
key = sig['sigkey']
signed = joinpath(builddir, self.pathinfo.signed(self.rinfo, key))
self.signed[key] = signed
koji.ensuredir(os.path.dirname(signed))
with open(signed, 'wt') as fo:
fo.write('SIGNED COPY\n')
sighdr = joinpath(builddir, self.pathinfo.sighdr(self.rinfo, key))
self.sighdr[key] = sighdr
koji.ensuredir(os.path.dirname(sighdr))
with open(sighdr, 'wt') as fo:
fo.write('DETACHED SIGHDR\n')
def get_files(self, with_dirs=True):
data = []
for root, dirs, files in os.walk(self.builddir):
for name in files:
fn = os.path.join(root, name)
with open(fn, 'rt') as fp:
contents = fp.read()
# tuple because we will sort later
data.append((fn, contents))
if with_dirs:
for name in dirs:
fn = os.path.join(root, name)
data.append((fn,))
data.sort()
return data
def tearDown(self):
mock.patch.stopall()
shutil.rmtree(self.tempdir)
def test_rpm_missing(self):
rpm_id = 1234
expected_msg = 'No such rpm: %s' % rpm_id
self.get_rpm.side_effect = koji.GenericError("No such rpm: %s" % rpm_id)
before = self.get_files()
with self.assertRaises(koji.GenericError) as ex:
kojihub.rename_rpm_sig(rpm_id, 'foo', 'bar')
self.assertEqual(len(self.updates), 0)
self.assertEqual(ex.exception.args[0], expected_msg)
self.get_rpm.assert_called_once_with(rpm_id, strict=True)
self.query_rpm_sigs.assert_not_called()
self.assertEqual(before, self.get_files())
def test_external_repo(self):
rpminfo = 1234
rinfo = self.rinfo.copy()
rinfo.update({'external_repo_id': 1, 'external_repo_name': 'INTERNAL'})
self.get_rpm.return_value = rinfo
before = self.get_files()
with self.assertRaises(koji.GenericError) as ex:
kojihub.rename_rpm_sig(rpminfo, 'foo', 'bar')
self.assertEqual(len(self.updates), 0)
expected_msg = "Not an internal rpm: %s (from %s)" % (rpminfo, rinfo['external_repo_name'])
self.assertEqual(ex.exception.args[0], expected_msg)
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.query_rpm_sigs.assert_not_called()
self.assertEqual(before, self.get_files())
def test_no_oldsig(self):
rpminfo = 1234
nvra = "%s-%s-%s.%s" % (self.rinfo['name'], self.rinfo['version'], self.rinfo['release'],
self.rinfo['arch'])
expected_msg = "No foo signature for rpm %s" % nvra
self.get_rpm.return_value = self.rinfo
self.query_rpm_sigs.return_value = []
before = self.get_files()
with self.assertRaises(koji.GenericError) as ex:
kojihub.rename_rpm_sig(rpminfo, 'foo', 'bar')
self.assertEqual(len(self.updates), 0)
self.assertEqual(ex.exception.args[0], expected_msg)
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey='foo')
self.assertEqual(before, self.get_files())
def test_already_got_one(self):
rpminfo = 1234
nvra = "%s-%s-%s.%s" % (self.rinfo['name'], self.rinfo['version'], self.rinfo['release'],
self.rinfo['arch'])
expected_msg = "A bar signature already exists for rpm %s" % nvra
self.get_rpm.return_value = self.rinfo
self.query_rpm_sigs.side_effect = [self.query_rpm_sigs, [{'foo':1}]]
before = self.get_files()
with self.assertRaises(koji.GenericError) as ex:
kojihub.rename_rpm_sig(rpminfo, 'foo', 'bar')
self.assertEqual(len(self.updates), 0)
self.assertEqual(ex.exception.args[0], expected_msg)
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.assertEqual(before, self.get_files())
def test_header_missing(self):
rpminfo = self.rinfo['id']
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
self.get_user.return_value = self.userinfo
self.query_rpm_sigs.side_effect = [self.query_rpm_sigs, []]
# a missing header should error
builddir = self.pathinfo.build(self.buildinfo)
sigkey = '2f86d6a1'
os.remove(self.sighdr[sigkey])
before = self.get_files()
with self.assertRaises(koji.GenericError) as ex:
kojihub.rename_rpm_sig(rpminfo, sigkey, 'foobar')
expected_msg = "Missing signature header file: %s" % self.sighdr[sigkey]
self.assertEqual(ex.exception.args[0], expected_msg)
self.assertEqual(before, self.get_files())
def test_valid(self):
rpminfo = 2
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
self.get_user.return_value = self.userinfo
self.query_rpm_sigs.side_effect = [self.query_rpm_sigs, []]
oldkey = '2f86d6a1'
kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')
# the old files should be gone
if os.path.exists(self.signed[oldkey]):
raise Exception('signed copy not deleted')
if os.path.exists(self.sighdr[oldkey]):
raise Exception('header still in place')
# the new file should be there
sighdr = joinpath(self.builddir, self.pathinfo.sighdr(self.rinfo, 'foobar'))
signed = joinpath(self.builddir, self.pathinfo.signed(self.rinfo, 'foobar'))
with open(sighdr, 'rt') as fp:
self.assertEqual(fp.read(), 'DETACHED SIGHDR\n')
with open(signed, 'rt') as fp:
self.assertEqual(fp.read(), 'SIGNED COPY\n')
self.assertEqual(len(self.updates), 1)
update = self.updates[0]
self.assertEqual(update.table, 'rpmsigs')
self.assertEqual(update.clauses, ["rpm_id=%(rpm_id)s", "sigkey=%(oldkey)s"])
self.get_build.assert_called_once_with(self.rinfo['build_id'], strict=True)
def test_already_got_signed_copy(self):
rpminfo = 2
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
self.get_user.return_value = self.userinfo
self.query_rpm_sigs.side_effect = [self.query_rpm_sigs, []]
oldkey = '2f86d6a1'
signed = joinpath(self.builddir, self.pathinfo.signed(self.rinfo, 'foobar'))
koji.ensuredir(os.path.dirname(signed))
with open(signed, 'wt') as fp:
fp.write('STRAY SIGNED COPY\n')
before = self.get_files()
with self.assertRaises(koji.GenericError) as ex:
kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')
expected_msg = f'Signed copy already exists: {signed}'
self.assertEqual(ex.exception.args[0], expected_msg)
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.assertEqual(before, self.get_files())
def test_already_got_sighdr(self):
rpminfo = 2
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
self.get_user.return_value = self.userinfo
self.query_rpm_sigs.side_effect = [self.query_rpm_sigs, []]
oldkey = '2f86d6a1'
sighdr = joinpath(self.builddir, self.pathinfo.sighdr(self.rinfo, 'foobar'))
koji.ensuredir(os.path.dirname(sighdr))
with open(sighdr, 'wt') as fp:
fp.write('STRAY HEADER\n')
before = self.get_files()
with self.assertRaises(koji.GenericError) as ex:
kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')
expected_msg = f'Signature header file already exists: {sighdr}'
self.assertEqual(ex.exception.args[0], expected_msg)
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.assertEqual(before, self.get_files())
@mock.patch('os.rename')
def test_first_rename_fails(self, rename):
rpminfo = 2
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
self.get_user.return_value = self.userinfo
self.query_rpm_sigs.side_effect = [self.query_rpm_sigs, []]
oldkey = '2f86d6a1'
rename.side_effect = FileNotFoundError('...')
before = self.get_files(with_dirs=False)
# this error case will leave a stray dir
with self.assertRaises(koji.GenericError) as ex:
kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')
oldpath = joinpath(self.builddir, self.pathinfo.signed(self.rinfo, oldkey))
expected_msg = f'Failed to rename {oldpath}'
self.assertEqual(ex.exception.args[0], expected_msg)
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
self.assertEqual(before, self.get_files(with_dirs=False))
@mock.patch('os.rename')
def test_second_rename_fails(self, rename):
rpminfo = 2
self.get_rpm.return_value = self.rinfo
self.get_build.return_value = self.buildinfo
self.get_user.return_value = self.userinfo
self.query_rpm_sigs.side_effect = [self.query_rpm_sigs, []]
oldkey = '2f86d6a1'
rename.side_effect = [None, FileNotFoundError('...')]
with self.assertRaises(koji.GenericError) as ex:
kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')
oldpath = joinpath(self.builddir, self.pathinfo.sighdr(self.rinfo, oldkey))
expected_msg = f'Failed to rename {oldpath}'
self.assertEqual(ex.exception.args[0], expected_msg)
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
# the end