PR#1714: use BulkInsertProcessor for hub mass inserts

Merges #1714
https://pagure.io/koji/pull-request/1714

Fixes: #1712
https://pagure.io/koji/issue/1712
[RFE] Use bulk inserts in hub
This commit is contained in:
Mike McLean 2019-11-19 12:01:19 -05:00
commit aaaabcc99c
5 changed files with 273 additions and 98 deletions

View file

@ -6446,16 +6446,16 @@ class CG_Importer(object):
rpmlist = fileinfo['hub.rpmlist']
archives = fileinfo['hub.archives']
insert = InsertProcessor('archive_rpm_components')
insert.set(archive_id=archive_id)
for rpminfo in rpmlist:
insert.set(rpm_id=rpminfo['id'])
if rpmlist:
insert = BulkInsertProcessor('archive_rpm_components')
for rpminfo in rpmlist:
insert.set(archive_id=archive_id, rpm_id=rpminfo['id'])
insert.execute()
insert = InsertProcessor('archive_components')
insert.set(archive_id=archive_id)
for archiveinfo in archives:
insert.set(component_id=archiveinfo['id'])
if archives:
insert = BulkInsertProcessor('archive_components')
for archiveinfo in archives:
insert.set(archive_id=archive_id, component_id=archiveinfo['id'])
insert.execute()
@ -8348,6 +8348,98 @@ def _fix_extra_field(row):
return row
class BulkInsertProcessor(object):
def __init__(self, table, data=None, columns=None, strict=True, batch=1000):
"""Do bulk inserts - it has some limitations compared to
InsertProcessor (no rawset, dup_check).
set() is replaced with add_record() to avoid confusion
table - name of the table
data - list of dict per record
columns - list/set of names of used columns - makes sense
mainly with strict=True
strict - if True, all records must contain values for all columns.
if False, missing values will be inserted as NULLs
batch - batch size for inserts (one statement per batch)
"""
self.table = table
self.data = []
if columns is None:
self.columns = set()
else:
self.columns = set(columns)
if data is not None:
self.data = data
for row in data:
self.columns |= set(row.keys())
self.strict = strict
self.batch = batch
def __str__(self):
if not self.data:
return "-- incomplete insert: no data"
query, params = self._get_insert(self.data)
return query
def _get_insert(self, data):
"""
Generate one insert statement for the given data
:param list data: list of rows (dict format) to insert
:returns: (query, params)
"""
if not data:
# should not happen
raise ValueError('no data for insert')
parts = ['INSERT INTO %s ' % self.table]
columns = sorted(self.columns)
parts.append("(%s) " % ', '.join(columns))
prepared_data = {}
values = []
i = 0
for row in data:
row_values = []
for key in columns:
if key in row:
row_key = '%s%d' % (key, i)
row_values.append("%%(%s)s" % row_key)
prepared_data[row_key] = row[key]
elif self.strict:
raise koji.GenericError("Missing value %s in BulkInsert" % key)
else:
row_values.append("NULL")
values.append("(%s)" % ', '.join(row_values))
i += 1
parts.append("VALUES %s" % ', '.join(values))
return ''.join(parts), prepared_data
def __repr__(self):
return "<BulkInsertProcessor: %r>" % vars(self)
def add_record(self, **kwargs):
"""Set whole record via keyword args"""
if not kwargs:
raise koji.GenericError("Missing values in BulkInsert.add_record")
self.data.append(kwargs)
self.columns |= set(kwargs.keys())
def execute(self):
if not self.batch:
self._one_insert(self.data)
else:
for i in range(0, len(self.data), self.batch):
data = self.data[i:i+self.batch]
self._one_insert(data)
def _one_insert(self, data):
query, params = self._get_insert(data)
_dml(query, params)
class InsertProcessor(object):
"""Build an insert statement
@ -9438,16 +9530,16 @@ def importImageInternal(task_id, build_id, imgdata):
rpm_ids.append(data['id'])
# associate those RPMs with the image
insert = InsertProcessor('archive_rpm_components')
insert = BulkInsertProcessor('archive_rpm_components')
for archive in archives:
logger.info('working on archive %s', archive)
if archive['filename'].endswith('xml'):
continue
insert.set(archive_id = archive['id'])
logger.info('associating installed rpms with %s', archive['id'])
for rpm_id in rpm_ids:
insert.set(rpm_id = rpm_id)
insert.execute()
insert.add_record(archive_id=archive['id'], rpm_id=rpm_id)
if insert.data:
insert.execute()
koji.plugin.run_callbacks('postImport', type='image', image=imgdata,
build=build_info, fullpath=fullpath)
@ -12421,6 +12513,7 @@ class BuildRoot(object):
def _setList(self, rpmlist, update=False):
"""Set or update the list of rpms in a buildroot"""
update = bool(update)
if self.id is None:
raise koji.GenericError("buildroot not specified")
if update:
@ -12441,11 +12534,11 @@ class BuildRoot(object):
#we sort to try to avoid deadlock issues
rpm_ids.sort()
# actually do the inserts
insert = InsertProcessor('buildroot_listing')
insert.set(buildroot_id=self.id, is_update=bool(update))
for rpm_id in rpm_ids:
insert.set(rpm_id=rpm_id)
# actually do the inserts (in bulk)
if rpm_ids:
insert = BulkInsertProcessor(table='buildroot_listing')
for rpm_id in rpm_ids:
insert.add_record(buildroot_id=self.id, rpm_id=rpm_id, is_update=update)
insert.execute()
def setList(self, rpmlist):
@ -12490,6 +12583,7 @@ class BuildRoot(object):
If False, they dependencies required to setup the build environment.
"""
project = bool(project)
if self.is_standard:
if not (context.opts.get('EnableMaven') or context.opts.get('EnableWin')):
raise koji.GenericError("non-rpm support is not enabled")
@ -12498,21 +12592,25 @@ class BuildRoot(object):
archives = set([r['id'] for r in archives])
current = set([r['id'] for r in self.getArchiveList()])
new_archives = archives.difference(current)
insert = InsertProcessor('buildroot_archives')
insert.set(buildroot_id=self.id, project_dep=bool(project))
for archive_id in sorted(new_archives):
insert.set(archive_id=archive_id)
if new_archives:
insert = BulkInsertProcessor('buildroot_archives')
for archive_id in sorted(new_archives):
insert.add_record(buildroot_id=self.id,
project_dep=project,
archive_id=archive_id)
insert.execute()
def setTools(self, tools):
"""Set tools info for buildroot"""
insert = InsertProcessor('buildroot_tools_info')
insert.set(buildroot_id=self.id)
if not tools:
return
insert = BulkInsertProcessor('buildroot_tools_info')
for tool in tools:
insert.set(tool=tool['name'])
insert.set(version=tool['version'])
insert.execute()
insert.add_record(buildroot_id=self.id, tool=tool['name'], version=tool['version'])
insert.execute()
class Host(object):

View file

@ -121,76 +121,25 @@
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1002,
"rpm_id": 1000
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1002,
"rpm_id": 1001
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1002,
"rpm_id": 1002
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1003,
"rpm_id": 1000
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1003,
"rpm_id": 1001
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1003,
"rpm_id": 1002
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1005,
"rpm_id": 1000
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1005,
"rpm_id": 1001
},
{}
],
[
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id)s, %(rpm_id)s)",
{
"archive_id": 1005,
"rpm_id": 1002
},
{}
"INSERT INTO archive_rpm_components (archive_id, rpm_id) VALUES (%(archive_id0)s, %(rpm_id0)s), (%(archive_id1)s, %(rpm_id1)s), (%(archive_id2)s, %(rpm_id2)s), (%(archive_id3)s, %(rpm_id3)s), (%(archive_id4)s, %(rpm_id4)s), (%(archive_id5)s, %(rpm_id5)s), (%(archive_id6)s, %(rpm_id6)s), (%(archive_id7)s, %(rpm_id7)s), (%(archive_id8)s, %(rpm_id8)s)",
{"archive_id0": 1002,
"archive_id1": 1002,
"archive_id2": 1002,
"archive_id3": 1003,
"archive_id4": 1003,
"archive_id5": 1003,
"archive_id6": 1005,
"archive_id7": 1005,
"archive_id8": 1005,
"rpm_id0": 1000,
"rpm_id1": 1001,
"rpm_id2": 1002,
"rpm_id3": 1000,
"rpm_id4": 1001,
"rpm_id5": 1002,
"rpm_id6": 1000,
"rpm_id7": 1001,
"rpm_id8": 1002}
]
],
"updates": [

View file

@ -28,6 +28,17 @@ def make_insert_grabber(test):
return grab_insert
def make_bulk_insert_grabber(test):
# test is the test class instance
def grab_insert(insert, data):
# insert is self for the BulkInsertProcessor instance
# we are replacing _one_insert()
query, params = insert._get_insert(data)
info = [query, copy.copy(params)]
test.inserts.append(info)
return grab_insert
def make_update_grabber(test):
# test is the test class instance
def grab_update(update):
@ -68,6 +79,8 @@ class TestCompleteImageBuild(unittest.TestCase):
self.updates = []
mock.patch.object(kojihub.InsertProcessor, 'execute',
new=make_insert_grabber(self)).start()
mock.patch.object(kojihub.BulkInsertProcessor, '_one_insert',
new=make_bulk_insert_grabber(self)).start()
mock.patch.object(kojihub.UpdateProcessor, 'execute',
new=make_update_grabber(self)).start()
mock.patch('kojihub.nextval', new=self.my_nextval).start()

View file

@ -110,6 +110,6 @@ class TestImportImageInternal(unittest.TestCase):
expression, kwargs = cursor.execute.mock_calls[0][1]
expression = " ".join(expression.split())
expected = 'INSERT INTO archive_rpm_components (archive_id, rpm_id) ' + \
'VALUES (%(archive_id)s, %(rpm_id)s)'
'VALUES (%(archive_id0)s, %(rpm_id0)s)'
self.assertEquals(expression, expected)
self.assertEquals(kwargs, {'archive_id': 9, 'rpm_id': 6})
self.assertEquals(kwargs, {'archive_id0': 9, 'rpm_id0': 6})

View file

@ -5,6 +5,7 @@ try:
except ImportError:
import unittest
import koji
import kojihub
@ -91,3 +92,117 @@ class TestInsertProcessor(unittest.TestCase):
actual = str(proc)
expected = "INSERT INTO sometable (foo) VALUES (('bar'))" # raw data
self.assertEquals(actual, expected)
class TestBulkInsertProcessor(unittest.TestCase):
def test_basic_instantiation(self):
proc = kojihub.BulkInsertProcessor('sometable')
actual = str(proc)
expected = '-- incomplete insert: no data'
self.assertEquals(actual, expected)
def test_to_string_with_single_row(self):
proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}])
actual = str(proc)
expected = 'INSERT INTO sometable (foo) VALUES (%(foo0)s)'
self.assertEquals(actual, expected)
proc = kojihub.BulkInsertProcessor('sometable')
proc.add_record(foo='bar')
actual = str(proc)
self.assertEquals(actual, expected)
@mock.patch('kojihub.context')
def test_simple_execution(self, context):
cursor = mock.MagicMock()
context.cnx.cursor.return_value = cursor
proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}])
proc.execute()
cursor.execute.assert_called_once_with(
'INSERT INTO sometable (foo) VALUES (%(foo0)s)',
{'foo0': 'bar'},
)
cursor.reset_mock()
proc = kojihub.BulkInsertProcessor('sometable')
proc.add_record(foo='bar')
proc.execute()
cursor.execute.assert_called_once_with(
'INSERT INTO sometable (foo) VALUES (%(foo0)s)',
{'foo0': 'bar'},
)
@mock.patch('kojihub.context')
def test_bulk_execution(self, context):
cursor = mock.MagicMock()
context.cnx.cursor.return_value = cursor
proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}])
proc.add_record(foo='bar2')
proc.add_record(foo='bar3')
proc.execute()
cursor.execute.assert_called_once_with(
'INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)',
{'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'},
)
def test_missing_values(self):
proc = kojihub.BulkInsertProcessor('sometable')
proc.add_record(foo='bar')
proc.add_record(foo2='bar2')
with self.assertRaises(koji.GenericError) as cm:
str(proc)
self.assertEquals(cm.exception.args[0], 'Missing value foo2 in BulkInsert')
def test_missing_values_nostrict(self):
proc = kojihub.BulkInsertProcessor('sometable', strict=False)
proc.add_record(foo='bar')
proc.add_record(foo2='bar2')
actual = str(proc)
expected = 'INSERT INTO sometable (foo, foo2) VALUES (%(foo0)s, NULL), (NULL, %(foo21)s)'
self.assertEquals(actual, expected)
def test_missing_values_explicit_columns(self):
proc = kojihub.BulkInsertProcessor('sometable', strict=True, columns=['foo', 'foo2'])
proc.add_record(foo='bar')
with self.assertRaises(koji.GenericError) as cm:
str(proc)
self.assertEquals(cm.exception.args[0], 'Missing value foo2 in BulkInsert')
@mock.patch('kojihub.context')
def test_batch_execution(self, context):
cursor = mock.MagicMock()
context.cnx.cursor.return_value = cursor
proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}], batch=2)
proc.add_record(foo='bar2')
proc.add_record(foo='bar3')
proc.execute()
calls = cursor.execute.mock_calls
# list of (name, positional args, keyword args)
self.assertEquals(len(calls), 2)
self.assertEquals(
calls[0][1],
('INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s)',
{'foo0': 'bar1', 'foo1': 'bar2'}))
self.assertEquals(
calls[1][1],
('INSERT INTO sometable (foo) VALUES (%(foo0)s)',
{'foo0': 'bar3'}))
@mock.patch('kojihub.context')
def test_no_batch_execution(self, context):
cursor = mock.MagicMock()
context.cnx.cursor.return_value = cursor
proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}], batch=None)
proc.add_record(foo='bar2')
proc.add_record(foo='bar3')
proc.execute()
calls = cursor.execute.mock_calls
# list of (name, positional args, keyword args)
self.assertEquals(len(calls), 1)
self.assertEquals(
calls[0][1],
('INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)',
{'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'}))