Fix review
This commit is contained in:
parent
9bf31de75f
commit
21a4cc4d70
9 changed files with 213 additions and 181 deletions
|
|
@ -12,7 +12,7 @@ BEGIN;
|
||||||
sigkey TEXT NOT NULL,
|
sigkey TEXT NOT NULL,
|
||||||
checksum TEXT NOT NULL UNIQUE,
|
checksum TEXT NOT NULL UNIQUE,
|
||||||
checksum_type SMALLINT NOT NULL,
|
checksum_type SMALLINT NOT NULL,
|
||||||
UNIQUE(rpm_id, sigkey, checksum_type)
|
PRIMARY KEY (rpm_id, sigkey, checksum_type)
|
||||||
) WITHOUT OIDS;
|
) WITHOUT OIDS;
|
||||||
CREATE INDEX rpm_checksum_rpm_id ON rpm_checksum(rpm_id);
|
CREATE INDEX rpm_checksum_rpm_id ON rpm_checksum(rpm_id);
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
|
||||||
|
|
@ -972,7 +972,7 @@ CREATE TABLE rpm_checksum (
|
||||||
sigkey TEXT NOT NULL,
|
sigkey TEXT NOT NULL,
|
||||||
checksum TEXT NOT NULL UNIQUE,
|
checksum TEXT NOT NULL UNIQUE,
|
||||||
checksum_type SMALLINT NOT NULL,
|
checksum_type SMALLINT NOT NULL,
|
||||||
UNIQUE(rpm_id, sigkey, checksum_type)
|
PRIMARY KEY (rpm_id, sigkey, checksum_type)
|
||||||
) WITHOUT OIDS;
|
) WITHOUT OIDS;
|
||||||
CREATE INDEX rpm_checksum_rpm_id ON rpm_checksum(rpm_id);
|
CREATE INDEX rpm_checksum_rpm_id ON rpm_checksum(rpm_id);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -943,7 +943,8 @@ def get_sighdr_key(sighdr):
|
||||||
return get_sigpacket_key_id(sig)
|
return get_sigpacket_key_id(sig)
|
||||||
|
|
||||||
|
|
||||||
def splice_rpm_sighdr(sighdr, src, dst=None, bufsize=8192):
|
def splice_rpm_sighdr(sighdr, src, dst=None, bufsize=8192, rpm_id=None, sigkey=None,
|
||||||
|
checksum_types=None, callback=None):
|
||||||
"""Write a copy of an rpm with signature header spliced in"""
|
"""Write a copy of an rpm with signature header spliced in"""
|
||||||
(start, size) = find_rpm_sighdr(src)
|
(start, size) = find_rpm_sighdr(src)
|
||||||
if dst is not None:
|
if dst is not None:
|
||||||
|
|
@ -953,6 +954,7 @@ def splice_rpm_sighdr(sighdr, src, dst=None, bufsize=8192):
|
||||||
else:
|
else:
|
||||||
(fd, dst_temp) = tempfile.mkstemp()
|
(fd, dst_temp) = tempfile.mkstemp()
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
|
chsum_list = {chsum: getattr(hashlib, chsum)() for chsum in checksum_types}
|
||||||
with open(src, 'rb') as src_fo, open(dst_temp, 'wb') as dst_fo:
|
with open(src, 'rb') as src_fo, open(dst_temp, 'wb') as dst_fo:
|
||||||
dst_fo.write(src_fo.read(start))
|
dst_fo.write(src_fo.read(start))
|
||||||
dst_fo.write(sighdr)
|
dst_fo.write(sighdr)
|
||||||
|
|
@ -962,6 +964,8 @@ def splice_rpm_sighdr(sighdr, src, dst=None, bufsize=8192):
|
||||||
if not buf:
|
if not buf:
|
||||||
break
|
break
|
||||||
dst_fo.write(buf)
|
dst_fo.write(buf)
|
||||||
|
for func, chsum in chsum_list.items():
|
||||||
|
chsum.update(buf)
|
||||||
if dst is not None:
|
if dst is not None:
|
||||||
src_stats = os.stat(src)
|
src_stats = os.stat(src)
|
||||||
dst_temp_stats = os.stat(dst_temp)
|
dst_temp_stats = os.stat(dst_temp)
|
||||||
|
|
@ -970,6 +974,9 @@ def splice_rpm_sighdr(sighdr, src, dst=None, bufsize=8192):
|
||||||
os.rename(dst_temp, dst)
|
os.rename(dst_temp, dst)
|
||||||
else:
|
else:
|
||||||
dst = dst_temp
|
dst = dst_temp
|
||||||
|
if callback:
|
||||||
|
callback(src=src, dst=dst, sighdr=sighdr, rpm_id=rpm_id, sigkey=sigkey,
|
||||||
|
chsum_list=chsum_list)
|
||||||
return dst
|
return dst
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7817,6 +7817,8 @@ def delete_rpm_sig(rpminfo, sigkey=None, all_sigs=False):
|
||||||
rpm_id = rinfo['id']
|
rpm_id = rinfo['id']
|
||||||
delete = DeleteProcessor(table='rpmsigs', clauses=clauses, values=locals())
|
delete = DeleteProcessor(table='rpmsigs', clauses=clauses, values=locals())
|
||||||
delete.execute()
|
delete.execute()
|
||||||
|
delete = DeleteProcessor(table='rpm_checksum', clauses=clauses, values=locals())
|
||||||
|
delete.execute()
|
||||||
binfo = get_build(rinfo['build_id'])
|
binfo = get_build(rinfo['build_id'])
|
||||||
builddir = koji.pathinfo.build(binfo)
|
builddir = koji.pathinfo.build(binfo)
|
||||||
list_sigcaches = []
|
list_sigcaches = []
|
||||||
|
|
@ -7968,8 +7970,32 @@ def query_rpm_sigs(rpm_id=None, sigkey=None, queryOpts=None):
|
||||||
return query.execute()
|
return query.execute()
|
||||||
|
|
||||||
|
|
||||||
|
def calculate_chsum(signed_path, checksum_types):
|
||||||
|
chsum_list = {chsum: getattr(hashlib, chsum)() for chsum in checksum_types}
|
||||||
|
try:
|
||||||
|
with open(signed_path, 'rb') as f:
|
||||||
|
while 1:
|
||||||
|
chunk = f.read(1024 ** 2)
|
||||||
|
if not chunk:
|
||||||
|
break
|
||||||
|
for func, chsum in chsum_list.items():
|
||||||
|
chsum.update(chunk)
|
||||||
|
except IOError:
|
||||||
|
raise koji.GenericError(f"RPM path {signed_path} cannot be open.")
|
||||||
|
return chsum_list
|
||||||
|
|
||||||
|
|
||||||
def write_signed_rpm(an_rpm, sigkey, force=False, checksum_types=None):
|
def write_signed_rpm(an_rpm, sigkey, force=False, checksum_types=None):
|
||||||
"""Write a signed copy of the rpm"""
|
"""Write a signed copy of the rpm"""
|
||||||
|
if checksum_types is None:
|
||||||
|
checksum_types = context.opts.get('RPMDefaultChecksums').split()
|
||||||
|
else:
|
||||||
|
if not isinstance(checksum_types, (list, tuple)):
|
||||||
|
raise koji.ParameterError(f'Invalid type of checksum_types: {type(checksum_types)}')
|
||||||
|
for ch_type in checksum_types:
|
||||||
|
if ch_type not in koji.CHECKSUM_TYPES:
|
||||||
|
raise koji.GenericError(f"Checksum_type {ch_type} isn't supported")
|
||||||
|
|
||||||
sigkey = sigkey.lower()
|
sigkey = sigkey.lower()
|
||||||
rinfo = get_rpm(an_rpm, strict=True)
|
rinfo = get_rpm(an_rpm, strict=True)
|
||||||
if rinfo['external_repo_id']:
|
if rinfo['external_repo_id']:
|
||||||
|
|
@ -7995,6 +8021,8 @@ def write_signed_rpm(an_rpm, sigkey, force=False, checksum_types=None):
|
||||||
if os.path.exists(signedpath):
|
if os.path.exists(signedpath):
|
||||||
if not force:
|
if not force:
|
||||||
# already present
|
# already present
|
||||||
|
chsum_list = calculate_chsum(signedpath, checksum_types)
|
||||||
|
create_rpm_checksum(rpm_id=rpm_id, sigkey=sigkey, chsum_list=chsum_list)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
os.unlink(signedpath)
|
os.unlink(signedpath)
|
||||||
|
|
@ -8002,10 +8030,8 @@ def write_signed_rpm(an_rpm, sigkey, force=False, checksum_types=None):
|
||||||
with open(sigpath, 'rb') as fo:
|
with open(sigpath, 'rb') as fo:
|
||||||
sighdr = fo.read()
|
sighdr = fo.read()
|
||||||
koji.ensuredir(os.path.dirname(signedpath))
|
koji.ensuredir(os.path.dirname(signedpath))
|
||||||
koji.splice_rpm_sighdr(sighdr, rpm_path, signedpath)
|
koji.splice_rpm_sighdr(sighdr, rpm_path, signedpath, rpm_id=rpm_id, sigkey=sigkey,
|
||||||
if not checksum_types:
|
checksum_types=checksum_types, callback=create_rpm_checksum)
|
||||||
checksum_types = context.opts.get('RPMDefaultChecksums').split()
|
|
||||||
create_rpm_checksum(rinfo['id'], sigkey, checksum_types=checksum_types)
|
|
||||||
|
|
||||||
|
|
||||||
def query_history(tables=None, **kwargs):
|
def query_history(tables=None, **kwargs):
|
||||||
|
|
@ -8592,6 +8618,9 @@ def _delete_build(binfo):
|
||||||
delete = DeleteProcessor(table='rpmsigs', clauses=['rpm_id=%(rpm_id)i'],
|
delete = DeleteProcessor(table='rpmsigs', clauses=['rpm_id=%(rpm_id)i'],
|
||||||
values={'rpm_id': rpm_id})
|
values={'rpm_id': rpm_id})
|
||||||
delete.execute()
|
delete.execute()
|
||||||
|
delete = DeleteProcessor(table='rpm_checksum', clauses=['rpm_id=%(rpm_id)i'],
|
||||||
|
values={'rpm_id': rpm_id})
|
||||||
|
delete.execute()
|
||||||
values = {'build_id': build_id}
|
values = {'build_id': build_id}
|
||||||
update = UpdateProcessor('tag_listing', clauses=["build_id=%(build_id)i"], values=values)
|
update = UpdateProcessor('tag_listing', clauses=["build_id=%(build_id)i"], values=values)
|
||||||
update.make_revoke()
|
update.make_revoke()
|
||||||
|
|
@ -13655,7 +13684,7 @@ class RootExports(object):
|
||||||
values=locals(), opts=queryOpts)
|
values=locals(), opts=queryOpts)
|
||||||
return query.iterate()
|
return query.iterate()
|
||||||
|
|
||||||
def getRPMChecksums(self, rpm_id, checksum_types=None, cacheonly=False):
|
def getRPMChecksums(self, rpm_id, checksum_types=None, cacheonly=False, strict=False):
|
||||||
"""Returns RPM checksums for specific rpm.
|
"""Returns RPM checksums for specific rpm.
|
||||||
|
|
||||||
:param int rpm_id: RPM id
|
:param int rpm_id: RPM id
|
||||||
|
|
@ -13680,7 +13709,10 @@ class RootExports(object):
|
||||||
clauses=['rpm_id=%(rpm_id)i'], values={'rpm_id': rpm_id})
|
clauses=['rpm_id=%(rpm_id)i'], values={'rpm_id': rpm_id})
|
||||||
sigkeys = [r['sigkey'] for r in query.execute()]
|
sigkeys = [r['sigkey'] for r in query.execute()]
|
||||||
if not sigkeys:
|
if not sigkeys:
|
||||||
|
if strict:
|
||||||
raise koji.GenericError(f'No cached signature for rpm ID {rpm_id}.')
|
raise koji.GenericError(f'No cached signature for rpm ID {rpm_id}.')
|
||||||
|
else:
|
||||||
|
return {}
|
||||||
list_checksums_sigkeys = {s: set(checksum_types) for s in sigkeys}
|
list_checksums_sigkeys = {s: set(checksum_types) for s in sigkeys}
|
||||||
|
|
||||||
checksum_type_int = [koji.CHECKSUM_TYPES[chsum] for chsum in checksum_types]
|
checksum_type_int = [koji.CHECKSUM_TYPES[chsum] for chsum in checksum_types]
|
||||||
|
|
@ -13701,7 +13733,7 @@ class RootExports(object):
|
||||||
koji.CHECKSUM_TYPES[r['checksum_type']])
|
koji.CHECKSUM_TYPES[r['checksum_type']])
|
||||||
|
|
||||||
for sigkey, chsums in missing_chsum_sigkeys.items():
|
for sigkey, chsums in missing_chsum_sigkeys.items():
|
||||||
write_signed_rpm(rpm_id, sigkey, force=True, checksum_types=list(chsums))
|
write_signed_rpm(rpm_id, sigkey, checksum_types=list(chsums))
|
||||||
query_result = query_checksum.execute()
|
query_result = query_checksum.execute()
|
||||||
return create_rpm_checksums_output(query_result, list_checksums_sigkeys)
|
return create_rpm_checksums_output(query_result, list_checksums_sigkeys)
|
||||||
|
|
||||||
|
|
@ -15507,32 +15539,18 @@ def create_rpm_checksums_output(query_result, list_chsum_sigkeys):
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
def create_rpm_checksum(rpm_id, sigkey, checksum_types=None):
|
def create_rpm_checksum(**kwargs):
|
||||||
"""Creates RPM checksum.
|
"""Creates RPM checksum.
|
||||||
|
|
||||||
:param int rpm_id: RPM id
|
:param int rpm_id: RPM id
|
||||||
:param string sigkey: Sigkey for specific RPM
|
:param string sigkey: Sigkey for specific RPM
|
||||||
:param list checksum_type: List of checksum types.
|
:param list checksum_type: List of checksum types.
|
||||||
"""
|
"""
|
||||||
if checksum_types is None:
|
chsum_list = kwargs.get('chsum_list')
|
||||||
checksum_types = context.opts.get('RPMDefaultChecksums').split()
|
sigkey = kwargs.get('sigkey')
|
||||||
else:
|
rpm_id = kwargs.get('rpm_id')
|
||||||
if not isinstance(checksum_types, (list, tuple)):
|
|
||||||
raise koji.ParameterError(f'Invalid type of checksum_types: {type(checksum_types)}')
|
checksum_type_int = [koji.CHECKSUM_TYPES[func] for func, _ in chsum_list.items()]
|
||||||
for ch_type in checksum_types:
|
|
||||||
if ch_type not in koji.CHECKSUM_TYPES:
|
|
||||||
raise koji.GenericError(f"Checksum_type {ch_type} isn't supported")
|
|
||||||
rinfo = get_rpm(rpm_id)
|
|
||||||
nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo
|
|
||||||
if rinfo['external_repo_id']:
|
|
||||||
raise koji.GenericError(f"Not an internal rpm: {nvra} "
|
|
||||||
f"(from {rinfo['external_repo_name']})")
|
|
||||||
query = QueryProcessor(tables=['rpmsigs'], clauses=['rpm_id=%(rpm_id)i', 'sigkey=%(sigkey)s'],
|
|
||||||
values={'rpm_id': rpm_id, 'sigkey': sigkey}, opts={'countOnly': True})
|
|
||||||
sighash = query.singleValue(strict=False)
|
|
||||||
if not sighash:
|
|
||||||
raise koji.GenericError(f"There is no rpm {nvra} signed with {sigkey}")
|
|
||||||
checksum_type_int = [koji.CHECKSUM_TYPES[chsum] for chsum in checksum_types]
|
|
||||||
query = QueryProcessor(tables=['rpm_checksum'], columns=['checksum_type'],
|
query = QueryProcessor(tables=['rpm_checksum'], columns=['checksum_type'],
|
||||||
clauses=["checksum_type IN %(checksum_types)s", 'sigkey=%(sigkey)s'],
|
clauses=["checksum_type IN %(checksum_types)s", 'sigkey=%(sigkey)s'],
|
||||||
values={'checksum_types': checksum_type_int, 'sigkey': sigkey})
|
values={'checksum_types': checksum_type_int, 'sigkey': sigkey})
|
||||||
|
|
@ -15542,22 +15560,7 @@ def create_rpm_checksum(rpm_id, sigkey, checksum_types=None):
|
||||||
else:
|
else:
|
||||||
for r in rows:
|
for r in rows:
|
||||||
if r['checksum_type'] in checksum_type_int:
|
if r['checksum_type'] in checksum_type_int:
|
||||||
checksum_types.remove(koji.CHECKSUM_TYPES[r['checksum_type']])
|
del chsum_list[koji.CHECKSUM_TYPES[r['checksum_type']]]
|
||||||
buildinfo = get_build(rinfo['build_id'])
|
|
||||||
rpm_path = joinpath(koji.pathinfo.build(buildinfo), koji.pathinfo.signed(rinfo, sigkey))
|
|
||||||
chsum_list = {chsum: getattr(hashlib, chsum)() for chsum in checksum_types}
|
|
||||||
|
|
||||||
try:
|
|
||||||
with open(rpm_path, 'rb') as f:
|
|
||||||
while 1:
|
|
||||||
chunk = f.read(1024**2)
|
|
||||||
if not chunk:
|
|
||||||
break
|
|
||||||
for func, chsum in chsum_list.items():
|
|
||||||
chsum.update(chunk)
|
|
||||||
except IOError:
|
|
||||||
raise koji.GenericError(f"RPM path {rpm_path} cannot be open.")
|
|
||||||
|
|
||||||
if chsum_list:
|
if chsum_list:
|
||||||
insert = BulkInsertProcessor(table='rpm_checksum')
|
insert = BulkInsertProcessor(table='rpm_checksum')
|
||||||
for func, chsum in chsum_list.items():
|
for func, chsum in chsum_list.items():
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,6 @@ import unittest
|
||||||
import mock
|
import mock
|
||||||
import six
|
import six
|
||||||
|
|
||||||
import koji
|
|
||||||
import kojihub
|
import kojihub
|
||||||
|
|
||||||
QP = kojihub.QueryProcessor
|
QP = kojihub.QueryProcessor
|
||||||
|
|
@ -22,19 +21,7 @@ class TestCreateRPMChecksum(unittest.TestCase):
|
||||||
self.QueryProcessor = mock.patch('kojihub.kojihub.QueryProcessor',
|
self.QueryProcessor = mock.patch('kojihub.kojihub.QueryProcessor',
|
||||||
side_effect=self.getQuery).start()
|
side_effect=self.getQuery).start()
|
||||||
self.queries = []
|
self.queries = []
|
||||||
self.get_rpm = mock.patch('kojihub.kojihub.get_rpm').start()
|
|
||||||
self.get_build = mock.patch('kojihub.kojihub.get_build').start()
|
|
||||||
self. rpm_info = {'id': 123, 'name': 'test-rpm', 'external_repo_id': 0, 'version': '11',
|
|
||||||
'release': '222', 'arch': 'noarch', 'build_id': 222}
|
|
||||||
self.rpm_id = 123
|
|
||||||
self.sigkey = 'test-sigkey'
|
|
||||||
self.query_execute = mock.MagicMock()
|
self.query_execute = mock.MagicMock()
|
||||||
self.single_value = mock.MagicMock()
|
|
||||||
self.path_signed = mock.patch('koji.pathinfo.signed').start()
|
|
||||||
self.path_build = mock.patch('koji.pathinfo.build').start()
|
|
||||||
self.context = mock.patch('kojihub.kojihub.context').start()
|
|
||||||
self.context.session.assertPerm = mock.MagicMock()
|
|
||||||
self.context.opts = {'RPMDefaultChecksums': 'md5 sha256'}
|
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
mock.patch.stopall()
|
mock.patch.stopall()
|
||||||
|
|
@ -42,90 +29,21 @@ class TestCreateRPMChecksum(unittest.TestCase):
|
||||||
def getQuery(self, *args, **kwargs):
|
def getQuery(self, *args, **kwargs):
|
||||||
query = QP(*args, **kwargs)
|
query = QP(*args, **kwargs)
|
||||||
query.execute = self.query_execute
|
query.execute = self.query_execute
|
||||||
query.singleValue = self.single_value
|
|
||||||
self.queries.append(query)
|
self.queries.append(query)
|
||||||
return query
|
return query
|
||||||
|
|
||||||
def test_checksum_type_not_string(self):
|
def test_checksum_exists(self):
|
||||||
checksum_types = 'type-1 type-2'
|
src = 'test-src'
|
||||||
with self.assertRaises(koji.GenericError) as ex:
|
dst = 'test-dst'
|
||||||
kojihub.create_rpm_checksum(self.rpm_id, self.sigkey, checksum_types=checksum_types)
|
rpm_id = 123
|
||||||
self.assertEqual(f'Invalid type of checksum_types: {type(checksum_types)}',
|
chsum_list = {'md5': 'chsum-1', 'sha256': 'chsum-2'}
|
||||||
str(ex.exception))
|
sigkey = 'test-sigkey'
|
||||||
|
self.query_execute.return_value = [{'checksum_type': 'md5'}, {'checksum_type': 'sha256'}]
|
||||||
def test_checksum_type_bad_type(self):
|
result = kojihub.create_rpm_checksum(src, dst, rpm_id, sigkey, chsum_list)
|
||||||
checksum_types = ['md5', 'type-1']
|
self.assertIsNone(result)
|
||||||
with self.assertRaises(koji.GenericError) as ex:
|
|
||||||
kojihub.create_rpm_checksum(self.rpm_id, self.sigkey, checksum_types=checksum_types)
|
|
||||||
self.assertEqual("Checksum_type type-1 isn't supported", str(ex.exception))
|
|
||||||
|
|
||||||
def test_external_rpm(self):
|
|
||||||
ext_rpm_info = {'id': 123, 'name': 'test-rpm', 'external_repo_id': 125,
|
|
||||||
'external_repo_name': 'test-ext-rpm', 'version': '11',
|
|
||||||
'release': '222', 'arch': 'noarch'}
|
|
||||||
self.get_rpm.return_value = ext_rpm_info
|
|
||||||
nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % ext_rpm_info
|
|
||||||
with self.assertRaises(koji.GenericError) as ex:
|
|
||||||
kojihub.create_rpm_checksum(self.rpm_id, self.sigkey)
|
|
||||||
self.assertEqual(f'Not an internal rpm: {nvra} (from test-ext-rpm)', str(ex.exception))
|
|
||||||
|
|
||||||
def test_not_sigkey_related_to_rpm(self):
|
|
||||||
self.get_rpm.return_value = self.rpm_info
|
|
||||||
checksum_types = ['md5', 'sha256']
|
|
||||||
nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % self.rpm_info
|
|
||||||
expected_err = f'There is no rpm {nvra} signed with {self.sigkey}'
|
|
||||||
self.single_value.return_value = None
|
|
||||||
with self.assertRaises(koji.GenericError) as ex:
|
|
||||||
kojihub.create_rpm_checksum(self.rpm_id, self.sigkey, checksum_types=checksum_types)
|
|
||||||
self.assertEqual(expected_err % self.rpm_info, str(ex.exception))
|
|
||||||
|
|
||||||
self.assertEqual(len(self.queries), 1)
|
self.assertEqual(len(self.queries), 1)
|
||||||
query = self.queries[0]
|
query = self.queries[0]
|
||||||
self.assertEqual(query.tables, ['rpmsigs'])
|
|
||||||
self.assertEqual(query.joins, None)
|
|
||||||
self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i', 'sigkey=%(sigkey)s'])
|
|
||||||
|
|
||||||
def test_checksum_exists(self):
|
|
||||||
self.get_rpm.return_value = self.rpm_info
|
|
||||||
self.single_value.return_value = 'test-sighash'
|
|
||||||
self.query_execute.return_value = [{'checksum_type': 'md5'}, {'checksum_type': 'sha256'}]
|
|
||||||
result = kojihub.create_rpm_checksum(self.rpm_id, self.sigkey)
|
|
||||||
self.assertIsNone(result)
|
|
||||||
|
|
||||||
self.assertEqual(len(self.queries), 2)
|
|
||||||
query = self.queries[0]
|
|
||||||
self.assertEqual(query.tables, ['rpmsigs'])
|
|
||||||
self.assertEqual(query.joins, None)
|
|
||||||
self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i', 'sigkey=%(sigkey)s'])
|
|
||||||
|
|
||||||
query = self.queries[1]
|
|
||||||
self.assertEqual(query.tables, ['rpm_checksum'])
|
|
||||||
self.assertEqual(query.joins, None)
|
|
||||||
self.assertEqual(query.clauses,
|
|
||||||
["checksum_type IN %(checksum_types)s", "sigkey=%(sigkey)s"])
|
|
||||||
|
|
||||||
@mock_open()
|
|
||||||
def test_cannot_open_file(self, m_open):
|
|
||||||
self.get_rpm.return_value = self.rpm_info
|
|
||||||
self.get_build.return_value = {'build_id': 222}
|
|
||||||
self.single_value.return_value = 'test-sighash'
|
|
||||||
self.query_execute.return_value = [{'checksum_type': 'md5'}]
|
|
||||||
self.path_build.return_value = 'fakebuildpath'
|
|
||||||
self.path_signed.return_value = 'fakesignedpath'
|
|
||||||
m_open.side_effect = IOError()
|
|
||||||
|
|
||||||
with self.assertRaises(koji.GenericError) as ex:
|
|
||||||
kojihub.create_rpm_checksum(self.rpm_id, self.sigkey)
|
|
||||||
self.assertEqual("RPM path fakebuildpath/fakesignedpath cannot be open.",
|
|
||||||
str(ex.exception))
|
|
||||||
|
|
||||||
self.assertEqual(len(self.queries), 2)
|
|
||||||
query = self.queries[0]
|
|
||||||
self.assertEqual(query.tables, ['rpmsigs'])
|
|
||||||
self.assertEqual(query.joins, None)
|
|
||||||
self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i', 'sigkey=%(sigkey)s'])
|
|
||||||
|
|
||||||
query = self.queries[1]
|
|
||||||
self.assertEqual(query.tables, ['rpm_checksum'])
|
self.assertEqual(query.tables, ['rpm_checksum'])
|
||||||
self.assertEqual(query.joins, None)
|
self.assertEqual(query.joins, None)
|
||||||
self.assertEqual(query.clauses,
|
self.assertEqual(query.clauses,
|
||||||
|
|
|
||||||
|
|
@ -6,13 +6,58 @@ from collections import defaultdict
|
||||||
import koji
|
import koji
|
||||||
import kojihub
|
import kojihub
|
||||||
|
|
||||||
|
DP = kojihub.DeleteProcessor
|
||||||
|
QP = kojihub.QueryProcessor
|
||||||
|
UP = kojihub.UpdateProcessor
|
||||||
|
|
||||||
|
|
||||||
class TestDeleteBuild(unittest.TestCase):
|
class TestDeleteBuild(unittest.TestCase):
|
||||||
|
|
||||||
@mock.patch('kojihub.kojihub.context')
|
def getDelete(self, *args, **kwargs):
|
||||||
@mock.patch('kojihub.kojihub.get_build')
|
delete = DP(*args, **kwargs)
|
||||||
def test_delete_build_raise_error(self, build, context):
|
delete.execute = mock.MagicMock()
|
||||||
context.session.assertPerm = mock.MagicMock()
|
self.deletes.append(delete)
|
||||||
|
return delete
|
||||||
|
|
||||||
|
def getQuery(self, *args, **kwargs):
|
||||||
|
query = QP(*args, **kwargs)
|
||||||
|
query.execute = self.query_execute
|
||||||
|
self.queries.append(query)
|
||||||
|
return query
|
||||||
|
|
||||||
|
def getUpdate(self, *args, **kwargs):
|
||||||
|
update = UP(*args, **kwargs)
|
||||||
|
update.execute = mock.MagicMock()
|
||||||
|
self.updates.append(update)
|
||||||
|
return update
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.DeleteProcessor = mock.patch('kojihub.kojihub.DeleteProcessor',
|
||||||
|
side_effect=self.getDelete).start()
|
||||||
|
self.deletes = []
|
||||||
|
self.QueryProcessor = mock.patch('kojihub.kojihub.QueryProcessor',
|
||||||
|
side_effect=self.getQuery).start()
|
||||||
|
self.queries = []
|
||||||
|
self.query_execute = mock.MagicMock()
|
||||||
|
self.UpdateProcessor = mock.patch('kojihub.kojihub.UpdateProcessor',
|
||||||
|
side_effect=self.getUpdate).start()
|
||||||
|
self.updates = []
|
||||||
|
self.context_db = mock.patch('koji.db.context').start()
|
||||||
|
self.context_db.session.assertLogin = mock.MagicMock()
|
||||||
|
self.context_db.event_id = 42
|
||||||
|
self.context_db.session.user_id = 24
|
||||||
|
self.get_build = mock.patch('kojihub.kojihub.get_build').start()
|
||||||
|
self._delete_build = mock.patch('kojihub.kojihub._delete_build').start()
|
||||||
|
self.get_user = mock.patch('kojihub.kojihub.get_user').start()
|
||||||
|
self.context = mock.patch('kojihub.kojihub.context').start()
|
||||||
|
self.context.session.assertPerm = mock.MagicMock()
|
||||||
|
self.binfo = {'id': 'BUILD ID', 'state': koji.BUILD_STATES['COMPLETE'], 'name': 'test_nvr',
|
||||||
|
'nvr': 'test_nvr-3.3-20.el8', 'version': '3.3', 'release': '20'}
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
mock.patch.stopall()
|
||||||
|
|
||||||
|
def test_delete_build_raise_error(self):
|
||||||
references = ['tags', 'rpms', 'archives', 'component_of']
|
references = ['tags', 'rpms', 'archives', 'component_of']
|
||||||
for ref in references:
|
for ref in references:
|
||||||
context = mock.MagicMock()
|
context = mock.MagicMock()
|
||||||
|
|
@ -25,10 +70,7 @@ class TestDeleteBuild(unittest.TestCase):
|
||||||
with self.assertRaises(koji.GenericError):
|
with self.assertRaises(koji.GenericError):
|
||||||
kojihub.delete_build(build='', strict=True)
|
kojihub.delete_build(build='', strict=True)
|
||||||
|
|
||||||
@mock.patch('kojihub.kojihub.context')
|
def test_delete_build_return_false(self):
|
||||||
@mock.patch('kojihub.kojihub.get_build')
|
|
||||||
def test_delete_build_return_false(self, build, context):
|
|
||||||
context.session.assertPerm = mock.MagicMock()
|
|
||||||
references = ['tags', 'rpms', 'archives', 'component_of']
|
references = ['tags', 'rpms', 'archives', 'component_of']
|
||||||
for ref in references:
|
for ref in references:
|
||||||
context = mock.MagicMock()
|
context = mock.MagicMock()
|
||||||
|
|
@ -40,10 +82,7 @@ class TestDeleteBuild(unittest.TestCase):
|
||||||
refs.return_value = retval
|
refs.return_value = retval
|
||||||
assert kojihub.delete_build(build='', strict=False) is False
|
assert kojihub.delete_build(build='', strict=False) is False
|
||||||
|
|
||||||
@mock.patch('kojihub.kojihub.context')
|
def test_delete_build_check_last_used_raise_error(self):
|
||||||
@mock.patch('kojihub.kojihub.get_build')
|
|
||||||
def test_delete_build_check_last_used_raise_error(self, build, context):
|
|
||||||
context.session.assertPerm = mock.MagicMock()
|
|
||||||
references = ['tags', 'rpms', 'archives', 'component_of', 'last_used']
|
references = ['tags', 'rpms', 'archives', 'component_of', 'last_used']
|
||||||
for ref in references:
|
for ref in references:
|
||||||
context = mock.MagicMock()
|
context = mock.MagicMock()
|
||||||
|
|
@ -56,22 +95,52 @@ class TestDeleteBuild(unittest.TestCase):
|
||||||
refs.return_value = retval
|
refs.return_value = retval
|
||||||
self.assertFalse(kojihub.delete_build(build='', strict=False))
|
self.assertFalse(kojihub.delete_build(build='', strict=False))
|
||||||
|
|
||||||
@mock.patch('kojihub.kojihub.get_user')
|
|
||||||
@mock.patch('kojihub.kojihub._delete_build')
|
|
||||||
@mock.patch('kojihub.kojihub.build_references')
|
@mock.patch('kojihub.kojihub.build_references')
|
||||||
@mock.patch('kojihub.kojihub.context')
|
def test_delete_build_lazy_refs(self, buildrefs):
|
||||||
@mock.patch('kojihub.kojihub.get_build')
|
|
||||||
def test_delete_build_lazy_refs(self, build, context, buildrefs, _delete, get_user):
|
|
||||||
'''Test that we can handle lazy return from build_references'''
|
'''Test that we can handle lazy return from build_references'''
|
||||||
get_user.return_value = {'authtype': 2, 'id': 1, 'krb_principal': None,
|
self.get_user.return_value = {'authtype': 2, 'id': 1, 'krb_principal': None,
|
||||||
'krb_principals': [], 'name': 'kojiadmin', 'status': 0,
|
'krb_principals': [], 'name': 'kojiadmin', 'status': 0,
|
||||||
'usertype': 0}
|
'usertype': 0}
|
||||||
context.session.assertPerm = mock.MagicMock()
|
|
||||||
buildrefs.return_value = {'tags': []}
|
buildrefs.return_value = {'tags': []}
|
||||||
binfo = {'id': 'BUILD ID', 'state': koji.BUILD_STATES['COMPLETE'],
|
self.get_build.return_value = self.binfo
|
||||||
'nvr': 'test_nvr-3.3-20.el8'}
|
kojihub.delete_build(build=self.binfo, strict=True)
|
||||||
build.return_value = binfo
|
|
||||||
kojihub.delete_build(build=binfo, strict=True)
|
|
||||||
|
|
||||||
# no build refs, so we should have called _delete_build
|
# no build refs, so we should have called _delete_build
|
||||||
_delete.assert_called_with(binfo)
|
self._delete_build.assert_called_with(self.binfo)
|
||||||
|
|
||||||
|
def test_delete_build_queries(self):
|
||||||
|
self.query_execute.return_value = [(123, )]
|
||||||
|
|
||||||
|
kojihub._delete_build(self.binfo)
|
||||||
|
|
||||||
|
self.assertEqual(len(self.queries), 1)
|
||||||
|
query = self.queries[0]
|
||||||
|
self.assertEqual(query.tables, ['rpminfo'])
|
||||||
|
self.assertEqual(query.joins, None)
|
||||||
|
self.assertEqual(query.clauses, ['build_id=%(build_id)i'])
|
||||||
|
self.assertEqual(query.columns, ['id'])
|
||||||
|
|
||||||
|
self.assertEqual(len(self.deletes), 2)
|
||||||
|
delete = self.deletes[0]
|
||||||
|
self.assertEqual(delete.table, 'rpmsigs')
|
||||||
|
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])
|
||||||
|
|
||||||
|
delete = self.deletes[1]
|
||||||
|
self.assertEqual(delete.table, 'rpm_checksum')
|
||||||
|
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])
|
||||||
|
|
||||||
|
self.assertEqual(len(self.updates), 2)
|
||||||
|
update = self.updates[0]
|
||||||
|
self.assertEqual(update.table, 'tag_listing')
|
||||||
|
self.assertEqual(update.values, {'build_id': self.binfo['id']})
|
||||||
|
self.assertEqual(update.data, {'revoke_event': 42, 'revoker_id': 24})
|
||||||
|
self.assertEqual(update.rawdata, {'active': 'NULL'})
|
||||||
|
self.assertEqual(update.clauses, ["build_id=%(build_id)i", 'active = TRUE'])
|
||||||
|
|
||||||
|
update = self.updates[1]
|
||||||
|
self.assertEqual(update.table, 'build')
|
||||||
|
self.assertEqual(update.values, {'build_id': self.binfo['id']})
|
||||||
|
self.assertEqual(update.data, {'state': 2})
|
||||||
|
self.assertEqual(update.rawdata, {})
|
||||||
|
self.assertEqual(update.clauses, ['id=%(build_id)i'])
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -119,9 +119,15 @@ class TestDeleteRPMSig(unittest.TestCase):
|
||||||
self.query_rpm_sigs.return_value = self.queryrpmsigs
|
self.query_rpm_sigs.return_value = self.queryrpmsigs
|
||||||
r = kojihub.delete_rpm_sig(rpminfo, sigkey='testkey')
|
r = kojihub.delete_rpm_sig(rpminfo, sigkey='testkey')
|
||||||
self.assertEqual(r, None)
|
self.assertEqual(r, None)
|
||||||
|
|
||||||
|
self.assertEqual(len(self.deletes), 2)
|
||||||
delete = self.deletes[0]
|
delete = self.deletes[0]
|
||||||
self.assertEqual(delete.table, 'rpmsigs')
|
self.assertEqual(delete.table, 'rpmsigs')
|
||||||
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i", "sigkey=%(sigkey)s"])
|
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i", "sigkey=%(sigkey)s"])
|
||||||
|
|
||||||
|
delete = self.deletes[1]
|
||||||
|
self.assertEqual(delete.table, 'rpm_checksum')
|
||||||
|
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i", "sigkey=%(sigkey)s"])
|
||||||
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
|
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
|
||||||
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey='testkey')
|
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey='testkey')
|
||||||
self.get_build.assert_called_once_with(self.rinfo['build_id'])
|
self.get_build.assert_called_once_with(self.rinfo['build_id'])
|
||||||
|
|
@ -138,9 +144,15 @@ class TestDeleteRPMSig(unittest.TestCase):
|
||||||
with self.assertRaises(koji.GenericError) as ex:
|
with self.assertRaises(koji.GenericError) as ex:
|
||||||
kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
|
kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
|
||||||
self.assertEqual(ex.exception.args[0], expected_msg)
|
self.assertEqual(ex.exception.args[0], expected_msg)
|
||||||
|
|
||||||
|
self.assertEqual(len(self.deletes), 2)
|
||||||
delete = self.deletes[0]
|
delete = self.deletes[0]
|
||||||
self.assertEqual(delete.table, 'rpmsigs')
|
self.assertEqual(delete.table, 'rpmsigs')
|
||||||
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])
|
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])
|
||||||
|
|
||||||
|
delete = self.deletes[1]
|
||||||
|
self.assertEqual(delete.table, 'rpm_checksum')
|
||||||
|
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])
|
||||||
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
|
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
|
||||||
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
|
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
|
||||||
self.get_build.assert_called_once_with(self.rinfo['build_id'])
|
self.get_build.assert_called_once_with(self.rinfo['build_id'])
|
||||||
|
|
@ -154,10 +166,15 @@ class TestDeleteRPMSig(unittest.TestCase):
|
||||||
self.get_user.return_value = self.userinfo
|
self.get_user.return_value = self.userinfo
|
||||||
self.query_rpm_sigs.return_value = self.queryrpmsigs
|
self.query_rpm_sigs.return_value = self.queryrpmsigs
|
||||||
kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
|
kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
|
||||||
self.assertEqual(len(self.deletes), 1)
|
|
||||||
|
self.assertEqual(len(self.deletes), 2)
|
||||||
delete = self.deletes[0]
|
delete = self.deletes[0]
|
||||||
self.assertEqual(delete.table, 'rpmsigs')
|
self.assertEqual(delete.table, 'rpmsigs')
|
||||||
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])
|
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])
|
||||||
|
|
||||||
|
delete = self.deletes[1]
|
||||||
|
self.assertEqual(delete.table, 'rpm_checksum')
|
||||||
|
self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])
|
||||||
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
|
self.get_rpm.assert_called_once_with(rpminfo, strict=True)
|
||||||
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
|
self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)
|
||||||
self.get_build.assert_called_once_with(self.rinfo['build_id'])
|
self.get_build.assert_called_once_with(self.rinfo['build_id'])
|
||||||
|
|
|
||||||
|
|
@ -71,12 +71,25 @@ class TestGetRpmChecksums(unittest.TestCase):
|
||||||
['checksum_type IN %(checksum_type)s', 'rpm_id=%(rpm_id)i'])
|
['checksum_type IN %(checksum_type)s', 'rpm_id=%(rpm_id)i'])
|
||||||
self.assertEqual(expected_result, result)
|
self.assertEqual(expected_result, result)
|
||||||
|
|
||||||
def test_missing_checksum_not_sigkey(self):
|
def test_missing_checksum_not_sigkey_without_strict(self):
|
||||||
|
rpm_id = 123
|
||||||
|
checksum_types = ['md5']
|
||||||
|
self.query_execute.side_effect = [[], []]
|
||||||
|
result = self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types)
|
||||||
|
self.assertEqual({}, result)
|
||||||
|
|
||||||
|
self.assertEqual(len(self.queries), 1)
|
||||||
|
query = self.queries[0]
|
||||||
|
self.assertEqual(query.tables, ['rpmsigs'])
|
||||||
|
self.assertEqual(query.joins, None)
|
||||||
|
self.assertEqual(query.clauses, ['rpm_id=%(rpm_id)i'])
|
||||||
|
|
||||||
|
def test_missing_checksum_not_sigkey_with_strict(self):
|
||||||
rpm_id = 123
|
rpm_id = 123
|
||||||
checksum_types = ['md5']
|
checksum_types = ['md5']
|
||||||
self.query_execute.side_effect = [[], []]
|
self.query_execute.side_effect = [[], []]
|
||||||
with self.assertRaises(koji.GenericError) as ex:
|
with self.assertRaises(koji.GenericError) as ex:
|
||||||
self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types)
|
self.exports.getRPMChecksums(rpm_id, checksum_types=checksum_types, strict=True)
|
||||||
self.assertEqual(f'No cached signature for rpm ID {rpm_id}.', str(ex.exception))
|
self.assertEqual(f'No cached signature for rpm ID {rpm_id}.', str(ex.exception))
|
||||||
|
|
||||||
self.assertEqual(len(self.queries), 1)
|
self.assertEqual(len(self.queries), 1)
|
||||||
|
|
|
||||||
|
|
@ -87,7 +87,7 @@ class TestResetBuild(unittest.TestCase):
|
||||||
self.assertEqual(update.rawdata, {})
|
self.assertEqual(update.rawdata, {})
|
||||||
self.assertEqual(update.clauses, ['id=%(id)s'])
|
self.assertEqual(update.clauses, ['id=%(id)s'])
|
||||||
|
|
||||||
self.assertEqual(len(self.deletes), 17)
|
self.assertEqual(len(self.deletes), 18)
|
||||||
delete = self.deletes[0]
|
delete = self.deletes[0]
|
||||||
self.assertEqual(delete.table, 'rpmsigs')
|
self.assertEqual(delete.table, 'rpmsigs')
|
||||||
self.assertEqual(delete.clauses, ['rpm_id=%(rpm_id)i'])
|
self.assertEqual(delete.clauses, ['rpm_id=%(rpm_id)i'])
|
||||||
|
|
@ -109,66 +109,71 @@ class TestResetBuild(unittest.TestCase):
|
||||||
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
||||||
|
|
||||||
delete = self.deletes[4]
|
delete = self.deletes[4]
|
||||||
|
self.assertEqual(delete.table, 'rpm_checksum')
|
||||||
|
self.assertEqual(delete.clauses, ['rpm_id=%(rpm_id)i'])
|
||||||
|
self.assertEqual(delete.values, {'rpm_id': 123})
|
||||||
|
|
||||||
|
delete = self.deletes[5]
|
||||||
self.assertEqual(delete.table, 'maven_archives')
|
self.assertEqual(delete.table, 'maven_archives')
|
||||||
self.assertEqual(delete.clauses, ['archive_id=%(archive_id)i'])
|
self.assertEqual(delete.clauses, ['archive_id=%(archive_id)i'])
|
||||||
self.assertEqual(delete.values, {'archive_id': 9999})
|
self.assertEqual(delete.values, {'archive_id': 9999})
|
||||||
|
|
||||||
delete = self.deletes[5]
|
delete = self.deletes[6]
|
||||||
self.assertEqual(delete.table, 'win_archives')
|
self.assertEqual(delete.table, 'win_archives')
|
||||||
self.assertEqual(delete.clauses, ['archive_id=%(archive_id)i'])
|
self.assertEqual(delete.clauses, ['archive_id=%(archive_id)i'])
|
||||||
self.assertEqual(delete.values, {'archive_id': 9999})
|
self.assertEqual(delete.values, {'archive_id': 9999})
|
||||||
|
|
||||||
delete = self.deletes[6]
|
delete = self.deletes[7]
|
||||||
self.assertEqual(delete.table, 'image_archives')
|
self.assertEqual(delete.table, 'image_archives')
|
||||||
self.assertEqual(delete.clauses, ['archive_id=%(archive_id)i'])
|
self.assertEqual(delete.clauses, ['archive_id=%(archive_id)i'])
|
||||||
self.assertEqual(delete.values, {'archive_id': 9999})
|
self.assertEqual(delete.values, {'archive_id': 9999})
|
||||||
|
|
||||||
delete = self.deletes[7]
|
delete = self.deletes[8]
|
||||||
self.assertEqual(delete.table, 'buildroot_archives')
|
self.assertEqual(delete.table, 'buildroot_archives')
|
||||||
self.assertEqual(delete.clauses, ['archive_id=%(archive_id)i'])
|
self.assertEqual(delete.clauses, ['archive_id=%(archive_id)i'])
|
||||||
self.assertEqual(delete.values, {'archive_id': 9999})
|
self.assertEqual(delete.values, {'archive_id': 9999})
|
||||||
|
|
||||||
delete = self.deletes[8]
|
|
||||||
self.assertEqual(delete.table, 'archive_rpm_components')
|
|
||||||
self.assertEqual(delete.clauses, ['archive_id=%(archive_id)i'])
|
|
||||||
self.assertEqual(delete.values, {'archive_id': 9999})
|
|
||||||
|
|
||||||
delete = self.deletes[9]
|
delete = self.deletes[9]
|
||||||
self.assertEqual(delete.table, 'archive_components')
|
self.assertEqual(delete.table, 'archive_rpm_components')
|
||||||
self.assertEqual(delete.clauses, ['archive_id=%(archive_id)i'])
|
self.assertEqual(delete.clauses, ['archive_id=%(archive_id)i'])
|
||||||
self.assertEqual(delete.values, {'archive_id': 9999})
|
self.assertEqual(delete.values, {'archive_id': 9999})
|
||||||
|
|
||||||
delete = self.deletes[10]
|
delete = self.deletes[10]
|
||||||
self.assertEqual(delete.table, 'archive_components')
|
self.assertEqual(delete.table, 'archive_components')
|
||||||
self.assertEqual(delete.clauses, ['component_id=%(archive_id)i'])
|
self.assertEqual(delete.clauses, ['archive_id=%(archive_id)i'])
|
||||||
self.assertEqual(delete.values, {'archive_id': 9999})
|
self.assertEqual(delete.values, {'archive_id': 9999})
|
||||||
|
|
||||||
delete = self.deletes[11]
|
delete = self.deletes[11]
|
||||||
|
self.assertEqual(delete.table, 'archive_components')
|
||||||
|
self.assertEqual(delete.clauses, ['component_id=%(archive_id)i'])
|
||||||
|
self.assertEqual(delete.values, {'archive_id': 9999})
|
||||||
|
|
||||||
|
delete = self.deletes[12]
|
||||||
self.assertEqual(delete.table, 'archiveinfo')
|
self.assertEqual(delete.table, 'archiveinfo')
|
||||||
self.assertEqual(delete.clauses, ['build_id=%(id)i'])
|
self.assertEqual(delete.clauses, ['build_id=%(id)i'])
|
||||||
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
||||||
|
|
||||||
delete = self.deletes[12]
|
delete = self.deletes[13]
|
||||||
self.assertEqual(delete.table, 'maven_builds')
|
self.assertEqual(delete.table, 'maven_builds')
|
||||||
self.assertEqual(delete.clauses, ['build_id=%(id)i'])
|
self.assertEqual(delete.clauses, ['build_id=%(id)i'])
|
||||||
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
||||||
|
|
||||||
delete = self.deletes[13]
|
delete = self.deletes[14]
|
||||||
self.assertEqual(delete.table, 'win_builds')
|
self.assertEqual(delete.table, 'win_builds')
|
||||||
self.assertEqual(delete.clauses, ['build_id=%(id)i'])
|
self.assertEqual(delete.clauses, ['build_id=%(id)i'])
|
||||||
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
||||||
|
|
||||||
delete = self.deletes[14]
|
delete = self.deletes[15]
|
||||||
self.assertEqual(delete.table, 'image_builds')
|
self.assertEqual(delete.table, 'image_builds')
|
||||||
self.assertEqual(delete.clauses, ['build_id=%(id)i'])
|
self.assertEqual(delete.clauses, ['build_id=%(id)i'])
|
||||||
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
||||||
|
|
||||||
delete = self.deletes[15]
|
delete = self.deletes[16]
|
||||||
self.assertEqual(delete.table, 'build_types')
|
self.assertEqual(delete.table, 'build_types')
|
||||||
self.assertEqual(delete.clauses, ['build_id=%(id)i'])
|
self.assertEqual(delete.clauses, ['build_id=%(id)i'])
|
||||||
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
||||||
|
|
||||||
delete = self.deletes[16]
|
delete = self.deletes[17]
|
||||||
self.assertEqual(delete.table, 'tag_listing')
|
self.assertEqual(delete.table, 'tag_listing')
|
||||||
self.assertEqual(delete.clauses, ['build_id=%(id)i'])
|
self.assertEqual(delete.clauses, ['build_id=%(id)i'])
|
||||||
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
self.assertEqual(delete.values, {'id': self.binfo['build_id']})
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue