use BulkInsertProcessor for hub mass inserts

Fixes: https://pagure.io/koji/issue/1712
This commit is contained in:
Tomas Kopecek 2019-10-22 15:43:12 +02:00 committed by Mike McLean
parent a365c9ad54
commit 6de0700ac8
5 changed files with 211 additions and 98 deletions

View file

@ -6446,16 +6446,16 @@ class CG_Importer(object):
rpmlist = fileinfo['hub.rpmlist']
archives = fileinfo['hub.archives']
insert = InsertProcessor('archive_rpm_components')
insert.set(archive_id=archive_id)
for rpminfo in rpmlist:
insert.set(rpm_id=rpminfo['id'])
if rpmlist:
insert = BulkInsertProcessor('archive_rpm_components')
for rpminfo in rpmlist:
insert.set(archive_id=archive_id, rpm_id=rpminfo['id'])
insert.execute()
insert = InsertProcessor('archive_components')
insert.set(archive_id=archive_id)
for archiveinfo in archives:
insert.set(component_id=archiveinfo['id'])
if archives:
insert = BulkInsertProcessor('archive_components')
for archiveinfo in archives:
insert.set(archive_id=archive_id, component_id=archiveinfo['id'])
insert.execute()
@ -8348,6 +8348,74 @@ def _fix_extra_field(row):
return row
class BulkInsertProcessor(object):
    def __init__(self, table, data=None, columns=None, strict=True):
        """Do bulk inserts - it has some limitations compared to
        InsertProcessor (no rawset, dup_check).
        set() is replaced with set_record() to avoid confusion

        table - name of the table
        data - list of dict per record
        columns - list/set of names of used columns - makes sense
            mainly with strict=True
        strict - all records must contain values for all columns, if
            it is False, missing values will be inserted as NULLs
        """
        self.table = table
        # copy the initial data so that later set_record() calls never
        # mutate the caller's list (previously it was aliased directly)
        self.data = [] if data is None else list(data)
        # maps numbered placeholder names (e.g. "foo0") to values;
        # rebuilt by __str__ and consumed by execute()
        self.prepared_data = {}
        if columns is None:
            self.columns = set()
        else:
            self.columns = set(columns)
        for row in self.data:
            self.columns |= set(row.keys())
        self.strict = strict

    def __str__(self):
        if not self.data:
            return "-- incomplete update: no assigns"
        parts = ['INSERT INTO %s ' % self.table]
        columns = sorted(self.columns)
        parts.append("(%s) " % ', '.join(columns))
        # rebuild the parameter dict alongside the SQL text so the
        # placeholder names always match what execute() passes to the db
        self.prepared_data = {}
        values = []
        for i, row in enumerate(self.data):
            row_values = []
            for key in columns:
                if key in row:
                    # placeholder names are suffixed with the row index to
                    # keep them unique across rows
                    row_key = '%s%d' % (key, i)
                    row_values.append("%%(%s)s" % row_key)
                    self.prepared_data[row_key] = row[key]
                elif self.strict:
                    raise koji.GenericError("Missing value %s in BulkInsert" % key)
                else:
                    row_values.append("NULL")
            values.append("(%s)" % ', '.join(row_values))
        parts.append("VALUES %s" % ', '.join(values))
        return ''.join(parts)

    def __repr__(self):
        return "<BulkInsertProcessor: %r>" % vars(self)

    def set_record(self, **kwargs):
        """Set whole record via keyword args"""
        if not kwargs:
            raise koji.GenericError("Missing values in BulkInsert.set_record")
        self.data.append(kwargs)
        self.columns |= set(kwargs.keys())

    def execute(self):
        # NOTE: str(self) populates self.prepared_data as a side effect,
        # so it must be evaluated before prepared_data is passed on
        return _dml(str(self), self.prepared_data)
class InsertProcessor(object):
"""Build an insert statement
@ -9438,16 +9506,16 @@ def importImageInternal(task_id, build_id, imgdata):
rpm_ids.append(data['id'])
# associate those RPMs with the image
insert = InsertProcessor('archive_rpm_components')
insert = BulkInsertProcessor('archive_rpm_components')
for archive in archives:
logger.info('working on archive %s', archive)
if archive['filename'].endswith('xml'):
continue
insert.set(archive_id = archive['id'])
logger.info('associating installed rpms with %s', archive['id'])
for rpm_id in rpm_ids:
insert.set(rpm_id = rpm_id)
insert.execute()
insert.set_record(archive_id=archive['id'], rpm_id=rpm_id)
if insert.data:
insert.execute()
koji.plugin.run_callbacks('postImport', type='image', image=imgdata,
build=build_info, fullpath=fullpath)
@ -12421,6 +12489,7 @@ class BuildRoot(object):
def _setList(self, rpmlist, update=False):
"""Set or update the list of rpms in a buildroot"""
update = bool(update)
if self.id is None:
raise koji.GenericError("buildroot not specified")
if update:
@ -12441,11 +12510,11 @@ class BuildRoot(object):
#we sort to try to avoid deadlock issues
rpm_ids.sort()
# actually do the inserts
insert = InsertProcessor('buildroot_listing')
insert.set(buildroot_id=self.id, is_update=bool(update))
for rpm_id in rpm_ids:
insert.set(rpm_id=rpm_id)
# actually do the inserts (in bulk)
if rpm_ids:
insert = BulkInsertProcessor(table='buildroot_listing')
for rpm_id in rpm_ids:
insert.set_record(buildroot_id=self.id, rpm_id=rpm_id, is_update=update)
insert.execute()
def setList(self, rpmlist):
@ -12490,6 +12559,7 @@ class BuildRoot(object):
If False, the dependencies required to set up the build environment.
"""
project = bool(project)
if self.is_standard:
if not (context.opts.get('EnableMaven') or context.opts.get('EnableWin')):
raise koji.GenericError("non-rpm support is not enabled")
@ -12498,21 +12568,25 @@ class BuildRoot(object):
archives = set([r['id'] for r in archives])
current = set([r['id'] for r in self.getArchiveList()])
new_archives = archives.difference(current)
insert = InsertProcessor('buildroot_archives')
insert.set(buildroot_id=self.id, project_dep=bool(project))
for archive_id in sorted(new_archives):
insert.set(archive_id=archive_id)
if new_archives:
insert = BulkInsertProcessor('buildroot_archives')
for archive_id in sorted(new_archives):
insert.set_record(buildroot_id=self.id,
project_dep=project,
archive_id=archive_id)
insert.execute()
def setTools(self, tools):
"""Set tools info for buildroot"""
insert = InsertProcessor('buildroot_tools_info')
insert.set(buildroot_id=self.id)
if not tools:
return
insert = BulkInsertProcessor('buildroot_tools_info')
for tool in tools:
insert.set(tool=tool['name'])
insert.set(version=tool['version'])
insert.execute()
insert.set_record(buildroot_id=self.id, tool=tool['name'], version=tool['version'])
insert.execute()
class Host(object):

View file

@ -121,76 +121,25 @@
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1002,
"rpm_id": 1000
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1002,
"rpm_id": 1001
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1002,
"rpm_id": 1002
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1003,
"rpm_id": 1000
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1003,
"rpm_id": 1001
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1003,
"rpm_id": 1002
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1005,
"rpm_id": 1000
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1005,
"rpm_id": 1001
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1005,
"rpm_id": 1002
},
{}
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id0)s, %(rpm_id0)s), (%(archive_id1)s, %(rpm_id1)s), (%(archive_id2)s, %(rpm_id2)s), (%(archive_id3)s, %(rpm_id3)s), (%(archive_id4)s, %(rpm_id4)s), (%(archive_id5)s, %(rpm_id5)s), (%(archive_id6)s, %(rpm_id6)s), (%(archive_id7)s, %(rpm_id7)s), (%(archive_id8)s, %(rpm_id8)s)",
{"archive_id0": 1002,
"archive_id1": 1002,
"archive_id2": 1002,
"archive_id3": 1003,
"archive_id4": 1003,
"archive_id5": 1003,
"archive_id6": 1005,
"archive_id7": 1005,
"archive_id8": 1005,
"rpm_id0": 1000,
"rpm_id1": 1001,
"rpm_id2": 1002,
"rpm_id3": 1000,
"rpm_id4": 1001,
"rpm_id5": 1002,
"rpm_id6": 1000,
"rpm_id7": 1001,
"rpm_id8": 1002}
]
],
"updates": [

View file

@ -28,6 +28,16 @@ def make_insert_grabber(test):
return grab_insert
def make_bulk_insert_grabber(test):
    # "test" is the unittest instance collecting executed inserts
    def grab_insert(insert):
        # stand-in for BulkInsertProcessor.execute(); "insert" is the
        # processor instance (i.e. self).  Record the rendered SQL plus a
        # snapshot of the bind parameters it produced.
        entry = [str(insert), copy.copy(insert.prepared_data)]
        test.inserts.append(entry)
    return grab_insert
def make_update_grabber(test):
# test is the test class instance
def grab_update(update):
@ -68,6 +78,8 @@ class TestCompleteImageBuild(unittest.TestCase):
self.updates = []
mock.patch.object(kojihub.InsertProcessor, 'execute',
new=make_insert_grabber(self)).start()
mock.patch.object(kojihub.BulkInsertProcessor, 'execute',
new=make_bulk_insert_grabber(self)).start()
mock.patch.object(kojihub.UpdateProcessor, 'execute',
new=make_update_grabber(self)).start()
mock.patch('kojihub.nextval', new=self.my_nextval).start()

View file

@ -110,6 +110,6 @@ class TestImportImageInternal(unittest.TestCase):
expression, kwargs = cursor.execute.mock_calls[0][1]
expression = " ".join(expression.split())
expected = 'INSERT INTO archive_rpm_components (archive_id, rpm_id) ' + \
'VALUES (%(archive_id)s, %(rpm_id)s)'
'VALUES (%(archive_id0)s, %(rpm_id0)s)'
self.assertEquals(expression, expected)
self.assertEquals(kwargs, {'archive_id': 9, 'rpm_id': 6})
self.assertEquals(kwargs, {'archive_id0': 9, 'rpm_id0': 6})

View file

@ -5,6 +5,7 @@ try:
except ImportError:
import unittest
import koji
import kojihub
@ -91,3 +92,80 @@ class TestInsertProcessor(unittest.TestCase):
actual = str(proc)
expected = "INSERT INTO sometable (foo) VALUES (('bar'))" # raw data
self.assertEquals(actual, expected)
class TestBulkInsertProcessor(unittest.TestCase):

    def test_basic_instantiation(self):
        # with no rows queued, __str__ yields a placeholder, not SQL
        processor = kojihub.BulkInsertProcessor('sometable')
        self.assertEquals(str(processor), '-- incomplete update: no assigns')

    def test_to_string_with_single_row(self):
        expected = 'INSERT INTO sometable (foo) VALUES (%(foo0)s)'
        # row supplied up front via the data argument
        processor = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}])
        self.assertEquals(str(processor), expected)
        # identical output when the row is added through set_record()
        processor = kojihub.BulkInsertProcessor('sometable')
        processor.set_record(foo='bar')
        self.assertEquals(str(processor), expected)

    @mock.patch('kojihub.context')
    def test_simple_execution(self, context):
        cursor = mock.MagicMock()
        context.cnx.cursor.return_value = cursor
        expected_sql = 'INSERT INTO sometable (foo) VALUES (%(foo0)s)'
        expected_params = {'foo0': 'bar'}
        # single-row insert built from the data argument
        processor = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}])
        processor.execute()
        cursor.execute.assert_called_once_with(expected_sql, expected_params)
        # same statement when the row comes in via set_record()
        cursor.reset_mock()
        processor = kojihub.BulkInsertProcessor('sometable')
        processor.set_record(foo='bar')
        processor.execute()
        cursor.execute.assert_called_once_with(expected_sql, expected_params)

    @mock.patch('kojihub.context')
    def test_bulk_execution(self, context):
        cursor = mock.MagicMock()
        context.cnx.cursor.return_value = cursor
        # rows from the constructor and from set_record() combine into one
        # multi-row VALUES clause with per-row numbered placeholders
        processor = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}])
        processor.set_record(foo='bar2')
        processor.set_record(foo='bar3')
        processor.execute()
        cursor.execute.assert_called_once_with(
            'INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)',
            {'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'},
        )

    def test_missing_values(self):
        # strict mode: the first row has no value for column foo2
        processor = kojihub.BulkInsertProcessor('sometable')
        processor.set_record(foo='bar')
        processor.set_record(foo2='bar2')
        with self.assertRaises(koji.GenericError) as cm:
            str(processor)
        self.assertEquals(cm.exception.args[0], 'Missing value foo2 in BulkInsert')

    def test_missing_values_nostrict(self):
        # non-strict mode fills the gaps with literal NULLs instead
        processor = kojihub.BulkInsertProcessor('sometable', strict=False)
        processor.set_record(foo='bar')
        processor.set_record(foo2='bar2')
        self.assertEquals(
            str(processor),
            'INSERT INTO sometable (foo, foo2) VALUES (%(foo0)s, NULL), (NULL, %(foo21)s)')

    def test_missing_values_explicit_columns(self):
        # explicitly declared columns are enforced even with a single record
        processor = kojihub.BulkInsertProcessor('sometable', strict=True, columns=['foo', 'foo2'])
        processor.set_record(foo='bar')
        with self.assertRaises(koji.GenericError) as cm:
            str(processor)
        self.assertEquals(cm.exception.args[0], 'Missing value foo2 in BulkInsert')