Support packages that are head-signed
This supports packages which do not have the RPMv3 signature scheme (over the full RPM header+payload), but instead only have signatures over the header (v4 scheme). For the v4 scheme, the signature is only stored in SIGTAG_RSA (or SIGTAG_DSA). Signed-off-by: Patrick Uiterwijk <patrick@puiterwijk.org>
This commit is contained in:
parent
f358fd7bc4
commit
af25fc2e24
6 changed files with 162 additions and 2 deletions
|
|
@ -1617,12 +1617,17 @@ def handle_import_sig(goptions, session, args):
|
|||
activate_session(session, goptions)
|
||||
for path in args:
|
||||
data = koji.get_header_fields(path, ('name', 'version', 'release', 'arch', 'siggpg',
|
||||
'sigpgp', 'sourcepackage'))
|
||||
'sigpgp', 'dsaheader', 'rsaheader',
|
||||
'sourcepackage'))
|
||||
if data['sourcepackage']:
|
||||
data['arch'] = 'src'
|
||||
sigkey = data['siggpg']
|
||||
if not sigkey:
|
||||
sigkey = data['sigpgp']
|
||||
if not sigkey:
|
||||
sigkey = data['dsaheader']
|
||||
if not sigkey:
|
||||
sigkey = data['rsaheader']
|
||||
if not sigkey:
|
||||
sigkey = ""
|
||||
if not options.with_unsigned:
|
||||
|
|
@ -1632,6 +1637,8 @@ def handle_import_sig(goptions, session, args):
|
|||
sigkey = koji.get_sigpacket_key_id(sigkey)
|
||||
del data['siggpg']
|
||||
del data['sigpgp']
|
||||
del data['dsaheader']
|
||||
del data['rsaheader']
|
||||
rinfo = session.getRPM(data)
|
||||
if not rinfo:
|
||||
print("No such rpm in system: %(name)s-%(version)s-%(release)s.%(arch)s" % data)
|
||||
|
|
|
|||
|
|
@ -7575,6 +7575,10 @@ def add_rpm_sig(an_rpm, sighdr):
|
|||
sigkey = rawhdr.get(koji.RPM_SIGTAG_GPG)
|
||||
if not sigkey:
|
||||
sigkey = rawhdr.get(koji.RPM_SIGTAG_PGP)
|
||||
if not sigkey:
|
||||
sigkey = rawhdr.get(koji.RPM_SIGTAG_DSA)
|
||||
if not sigkey:
|
||||
sigkey = rawhdr.get(koji.RPM_SIGTAG_RSA)
|
||||
else:
|
||||
# In older rpms, this field in the signature header does not actually match
|
||||
# sigmd5 (I think rpmlib pulls it from SIGTAG_GPG). Anyway, this
|
||||
|
|
@ -7721,6 +7725,10 @@ def _scan_sighdr(sighdr, fn):
|
|||
sig = koji.get_header_field(hdr, 'siggpg')
|
||||
if not sig:
|
||||
sig = koji.get_header_field(hdr, 'sigpgp')
|
||||
if not sig:
|
||||
sig = koji.get_header_field(hdr, 'dsaheader')
|
||||
if not sig:
|
||||
sig = koji.get_header_field(hdr, 'rsaheader')
|
||||
return koji.get_header_field(hdr, 'sigmd5'), sig
|
||||
|
||||
|
||||
|
|
@ -7751,6 +7759,10 @@ def check_rpm_sig(an_rpm, sigkey, sighdr):
|
|||
raw_key = koji.get_header_field(hdr, 'siggpg')
|
||||
if not raw_key:
|
||||
raw_key = koji.get_header_field(hdr, 'sigpgp')
|
||||
if not raw_key:
|
||||
raw_key = koji.get_header_field(hdr, 'dsaheader')
|
||||
if not raw_key:
|
||||
raw_key = koji.get_header_field(hdr, 'rsaheader')
|
||||
if not raw_key:
|
||||
found_key = None
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -100,6 +100,8 @@ def _(args):
|
|||
RPM_HEADER_MAGIC = six.b('\x8e\xad\xe8')
|
||||
RPM_TAG_HEADERSIGNATURES = 62
|
||||
RPM_TAG_FILEDIGESTALGO = 5011
|
||||
RPM_SIGTAG_DSA = 267
|
||||
RPM_SIGTAG_RSA = 268
|
||||
RPM_SIGTAG_PGP = 1002
|
||||
RPM_SIGTAG_MD5 = 1004
|
||||
RPM_SIGTAG_GPG = 1005
|
||||
|
|
@ -924,6 +926,10 @@ def get_sighdr_key(sighdr):
|
|||
sig = rh.get(RPM_SIGTAG_GPG)
|
||||
if not sig:
|
||||
sig = rh.get(RPM_SIGTAG_PGP)
|
||||
if not sig:
|
||||
sig = rh.GIT(RPM_SIGTAG_DSA)
|
||||
if not sig:
|
||||
sig = rh.GET(RPM_SIGTAG_RSA)
|
||||
if not sig:
|
||||
return None
|
||||
else:
|
||||
|
|
@ -1016,7 +1022,7 @@ def get_header_field(hdr, name, src_arch=False):
|
|||
pass
|
||||
|
||||
# some string results are binary and should not be decoded
|
||||
if name.startswith('SIG'):
|
||||
if name.startswith('SIG') or name.endswith('HEADER'):
|
||||
return result
|
||||
|
||||
# Some older versions of rpm return string header values as bytes. Newer
|
||||
|
|
|
|||
|
|
@ -43,6 +43,8 @@ class TestImportSIG(utils.CliTestCase):
|
|||
'arch': 'x86_64',
|
||||
'siggpg': None,
|
||||
'sigpgp': None,
|
||||
'dsaheader': None,
|
||||
'rsaheader': None,
|
||||
},
|
||||
{
|
||||
'sourcepackage': 1,
|
||||
|
|
@ -52,6 +54,8 @@ class TestImportSIG(utils.CliTestCase):
|
|||
'arch': 'x86_64',
|
||||
'siggpg': None,
|
||||
'sigpgp': None,
|
||||
'dsaheader': None,
|
||||
'rsaheader': None,
|
||||
},
|
||||
{
|
||||
'sourcepackage': 1,
|
||||
|
|
@ -61,6 +65,8 @@ class TestImportSIG(utils.CliTestCase):
|
|||
'arch': 'x86_64',
|
||||
'siggpg': None,
|
||||
'sigpgp': None,
|
||||
'dsaheader': None,
|
||||
'rsaheader': None,
|
||||
}
|
||||
]
|
||||
|
||||
|
|
@ -121,6 +127,8 @@ class TestImportSIG(utils.CliTestCase):
|
|||
for data in self.rpm_headers:
|
||||
data['siggpg'] = fake_sigkey
|
||||
data['sigpgp'] = fake_sigkey
|
||||
data['dsaheader'] = fake_sigkey
|
||||
data['rsaheader'] = fake_sigkey
|
||||
tmp = data.copy()
|
||||
tmp['arch'] = 'src' if tmp['sourcepackage'] else tmp['arch']
|
||||
expected += "No such rpm in system: %(name)s-%(version)s-%(release)s.%(arch)s" % \
|
||||
|
|
@ -240,6 +248,47 @@ class TestImportSIG(utils.CliTestCase):
|
|||
# restore os.path.exists patch
|
||||
os_path_exists_patch.stop()
|
||||
|
||||
@mock.patch('sys.stderr', new_callable=six.StringIO)
|
||||
@mock.patch('sys.stdout', new_callable=six.StringIO)
|
||||
@mock.patch('koji_cli.commands.activate_session')
|
||||
def test_handle_import_sig_sigkey_from_header_signed(
|
||||
self,
|
||||
activate_session_mock,
|
||||
stdout, stderr):
|
||||
"""Test sigkey computation from header-only signed rpm in handle_import_sig function"""
|
||||
data_path = os.path.abspath("tests/test_hub/data/rpms")
|
||||
arguments = [os.path.join(data_path, 'header-signed.rpm')]
|
||||
sigkey = '15f712be'
|
||||
|
||||
options = mock.MagicMock()
|
||||
session = mock.MagicMock()
|
||||
expected = ''
|
||||
|
||||
for pkg in arguments:
|
||||
expected += "Importing signature [key %s] from %s..." % (sigkey, pkg) + "\n"
|
||||
expected += "Writing signed copy" + "\n"
|
||||
|
||||
session.getRPM.side_effect = [
|
||||
{
|
||||
'sourcepackage': 0,
|
||||
'name': 'testpkg',
|
||||
'version': '1.0.0',
|
||||
'release': '1',
|
||||
'arch': 'x86_64',
|
||||
'external_repo_id': 0,
|
||||
'id': 1,
|
||||
}
|
||||
]
|
||||
session.queryRPMSigs.side_effect = None
|
||||
session.queryRPMSigs.return_value = []
|
||||
|
||||
# Run
|
||||
handle_import_sig(options, session, arguments + ['--test'])
|
||||
|
||||
self.assert_console_message(stdout, expected)
|
||||
session.addRPMSig.assert_not_called()
|
||||
session.writeSignedRPM.assert_not_called()
|
||||
|
||||
def test_handle_import_sig_argument_test(self):
|
||||
"""Test handle_import_sig function without arguments"""
|
||||
options = mock.MagicMock()
|
||||
|
|
|
|||
BIN
tests/test_hub/data/rpms/header-signed.rpm
Normal file
BIN
tests/test_hub/data/rpms/header-signed.rpm
Normal file
Binary file not shown.
86
tests/test_hub/test_add_rpm_sig.py
Normal file
86
tests/test_hub/test_add_rpm_sig.py
Normal file
|
|
@ -0,0 +1,86 @@
|
|||
import base64
|
||||
import mock
|
||||
import os
|
||||
import unittest
|
||||
|
||||
import koji
|
||||
import kojihub
|
||||
|
||||
IP = kojihub.InsertProcessor
|
||||
|
||||
|
||||
class TestAddRPMSig(unittest.TestCase):
|
||||
def getInsert(self, *args, **kwargs):
|
||||
insert = IP(*args, **kwargs)
|
||||
insert.execute = mock.MagicMock()
|
||||
self.inserts.append(insert)
|
||||
return insert
|
||||
|
||||
def setUp(self):
|
||||
self.InsertProcessor = mock.patch('kojihub.InsertProcessor',
|
||||
side_effect=self.getInsert).start()
|
||||
self.inserts = []
|
||||
self.context = mock.patch('kojihub.context').start()
|
||||
# It seems MagicMock will not automatically handle attributes that
|
||||
# start with "assert"
|
||||
self.context.session.assertLogin = mock.MagicMock()
|
||||
self.context.session.assertPerm = mock.MagicMock()
|
||||
self.context.opts = {'HostPrincipalFormat': '-%s-'}
|
||||
self.exports = kojihub.RootExports()
|
||||
self.data_path = os.path.abspath("tests/test_hub/data/rpms")
|
||||
|
||||
def tearDown(self):
|
||||
mock.patch.stopall()
|
||||
|
||||
@mock.patch('kojihub._fetchMulti')
|
||||
@mock.patch('koji.plugin.run_callbacks')
|
||||
@mock.patch('kojihub.get_rpm')
|
||||
@mock.patch('kojihub.get_build')
|
||||
@mock.patch('os.path.isdir')
|
||||
@mock.patch('koji.ensuredir')
|
||||
@mock.patch('kojihub.open')
|
||||
def test_add_rpm_sig_header_signed(
|
||||
self,
|
||||
open,
|
||||
ensuredir,
|
||||
isdir,
|
||||
get_build,
|
||||
get_rpm,
|
||||
run_callbacks,
|
||||
_fetchMulti):
|
||||
"""Test addRPMSig with header-only signed RPM"""
|
||||
_fetchMulti.side_effect = [[]]
|
||||
isdir.side_effect = [True]
|
||||
get_rpm.side_effect = [{
|
||||
'id': 1,
|
||||
'name': 'testpkg',
|
||||
'version': '1.0.0',
|
||||
'release': '1',
|
||||
'arch': 'noarch',
|
||||
'epoch': None,
|
||||
'payloadhash': '1706d0174aa29a5a3e5c60855a778c35',
|
||||
'size': 123,
|
||||
'external_repo_id': None,
|
||||
'build_id': 1,
|
||||
}]
|
||||
open.side_effect = [mock.MagicMock()]
|
||||
|
||||
rpm_path = os.path.join(self.data_path, 'header-signed.rpm')
|
||||
sighdr = koji.rip_rpm_sighdr(rpm_path)
|
||||
|
||||
self.exports.addRPMSig(1, base64.b64encode(sighdr))
|
||||
self.context.session.assertPerm.assert_called_once_with('sign')
|
||||
self.assertEqual(len(self.inserts), 1)
|
||||
insert = self.inserts[0]
|
||||
self.assertEqual(insert.data['rpm_id'], 1)
|
||||
self.assertEqual(insert.data['sigkey'], '15f712be')
|
||||
|
||||
def test_scan_sighdr_header_signed(self):
|
||||
"""Test _scan_sighdr on a header-only signed package"""
|
||||
rpm_path = os.path.join(self.data_path, 'header-signed.rpm')
|
||||
sighdr = koji.rip_rpm_sighdr(rpm_path)
|
||||
|
||||
sigmd5, sig = kojihub._scan_sighdr(sighdr, rpm_path)
|
||||
self.assertEqual(koji.hex_string(sigmd5), '1706d0174aa29a5a3e5c60855a778c35')
|
||||
sigkey = koji.get_sigpacket_key_id(sig)
|
||||
self.assertEqual(sigkey, '15f712be')
|
||||
Loading…
Add table
Add a link
Reference in a new issue