UpsertProcessor

This commit is contained in:
Mike McLean 2023-05-07 17:06:26 -04:00 committed by Tomas Kopecek
parent a577984d06
commit fba5abe2ed
2 changed files with 71 additions and 40 deletions

View file

@ -444,6 +444,42 @@ class InsertProcessor(object):
return _dml(str(self), self.data)
class UpsertProcessor(InsertProcessor):
    """Build a basic upsert (INSERT ... ON CONFLICT) statement

    table - the table to insert into
    data - a dictionary of data to insert (keys = column names)
    rawdata - data to insert specified as sql expressions rather than python values
    keys - the columns that are the unique keys (used as the conflict target)
    skip_dup - if set to true, do nothing on conflict (takes precedence over keys)

    Raises ValueError if neither keys nor skip_dup is set.
    """

    def __init__(self, table, data=None, rawdata=None, keys=None, skip_dup=False):
        super(UpsertProcessor, self).__init__(table, data=data, rawdata=rawdata)
        self.keys = keys
        self.skip_dup = skip_dup
        # DO UPDATE needs a conflict target, so keys may only be omitted
        # when we are going to emit DO NOTHING
        if not keys and not skip_dup:
            raise ValueError('either keys or skip_dup must be set')

    def __repr__(self):
        return "<UpsertProcessor: %r>" % vars(self)

    def __str__(self):
        """Return the parent's insert statement followed by an ON CONFLICT clause"""
        insert = super(UpsertProcessor, self).__str__()
        parts = [insert]
        if self.skip_dup:
            parts.append(' ON CONFLICT DO NOTHING')
        else:
            parts.append(f' ON CONFLICT ({",".join(self.keys)}) DO UPDATE SET ')
            # filter out conflict keys from data
            data = {k: self.data[k] for k in self.data if k not in self.keys}
            rawdata = {k: self.rawdata[k] for k in self.rawdata if k not in self.keys}
            assigns = [f"{key} = %({key})s" for key in data]
            # iterate the *filtered* dict: looping over self.rawdata here would
            # raise KeyError for any rawdata column that is also a conflict key
            assigns.extend([f"{key} = ({rawdata[key]})" for key in rawdata])
            # sorted so the generated statement is deterministic
            # NOTE: if every supplied column is a conflict key, the SET list is empty
            parts.append(', '.join(sorted(assigns)))
        return ''.join(parts)
class UpdateProcessor(object):
"""Build an update statement

View file

@ -4,7 +4,7 @@ import time
import koji
from . import kojihub
from .db import QueryProcessor, InsertProcessor, UpdateProcessor, QueryView, db_lock
from .db import QueryProcessor, InsertProcessor, UpsertProcessor, UpdateProcessor, QueryView, db_lock
logger = logging.getLogger('koji.scheduler')
@ -72,36 +72,34 @@ def set_refusal(hostID, taskID, soft=True, by_host=False, msg=''):
# make very many
def get_task_refusals(taskID=None, hostID=None):
taskID = kojihub.convert_value(taskID, cast=int, none_allowed=True)
hostID = kojihub.convert_value(hostID, cast=int, none_allowed=True)
class TaskRefusalsQuery(QueryView):
fields = (
('scheduler_task_refusals.id', 'id'),
('scheduler_task_refusals.task_id', 'task_id'),
('scheduler_task_refusals.host_id', 'host_id'),
('scheduler_task_refusals.by_host', 'by_host'),
('scheduler_task_refusals.soft', 'soft'),
('scheduler_task_refusals.msg', 'msg'),
# ('host.name', 'host_name'),
("date_part('epoch', scheduler_task_refusals.time)", 'ts'),
)
fields, aliases = zip(*fields)
tables = ['scheduler_task_refusals']
joinmap = {
'task': 'task ON scheduler_task_refusals.task_id = task.id',
'host': 'host ON scheduler_task_refusals.host_id = host.id',
}
fieldmap = {
'id': ['scheduler_task_refusals.id', None],
'task_id': ['scheduler_task_refusals.task_id', None],
'host_id': ['scheduler_task_refusals.host_id', None],
'by_host': ['scheduler_task_refusals.by_host', None],
'soft': ['scheduler_task_refusals.soft', None],
'msg': ['scheduler_task_refusals.msg', None],
'ts': ["date_part('epoch', scheduler_task_refusals.time)", None],
'method': ['task.method', 'task'],
'state': ['task.state', 'task'],
'owner': ['task.owner', 'task'],
'arch': ['task.arch', 'task'],
'channel_id': ['task.channel_id', 'task'],
'host_name': ['host.name', 'host'],
'host_ready': ['host.ready', 'host'],
}
default_fields = ('id', 'task_id', 'host_id', 'by_host', 'soft', 'msg', 'ts')
clauses = []
if taskID is not None:
clauses.append('task_id = %(taskID)s')
if hostID is not None:
clauses.append('host_id = %(hostID)s')
query = QueryProcessor(
columns=fields, aliases=aliases, tables=['scheduler_task_refusals'],
# joins=['host ON host_id=host.id', 'task ON task_id=task.id'],
clauses=clauses, values=locals(),
opts={'order': '-id'}
)
return query.execute()
def get_task_refusals(clauses=None, fields=None):
    """Run a TaskRefusalsQuery and return its result

    clauses - passed through as the first argument to TaskRefusalsQuery
    fields - passed through as the second argument to TaskRefusalsQuery

    Returns whatever the query's execute() returns.
    """
    view = TaskRefusalsQuery(clauses, fields)
    return view.execute()
def get_host_data(hostID=None):
@ -217,20 +215,13 @@ class TaskScheduler(object):
ret = True
# save current ts
update = UpdateProcessor(
upsert = UpsertProcessor(
'scheduler_sys_data',
clauses=['name = %(name)s'],
values={'name': 'last_run_ts'},
data={'data': json.dumps(now)},
data={'name': 'last_run_ts',
'data': json.dumps(now)},
keys=['name'],
)
chk = update.execute()
if not chk:
# hasn't been defined yet
insert = InsertProcessor(
'scheduler_sys_data',
data={'name': 'last_run_ts', 'data': json.dumps(now)},
)
insert.execute()
upsert.execute()
return ret
@ -455,6 +446,9 @@ class TaskScheduler(object):
self.free_tasks = free_tasks
self.active_tasks = active_tasks
def get_refusals(self):
    """Placeholder; not implemented yet, so this does nothing and returns None"""
    return None
def get_hosts(self):
# get hosts and bin them
hosts_by_bin = {}
@ -544,4 +538,5 @@ class TaskScheduler(object):
class SchedulerExports:
    """Namespace exposing the scheduler query functions under camelCase names"""

    # each attribute wraps a module-level function as a static method, so it
    # is called without an instance being passed in
    getTaskRuns = staticmethod(get_task_runs)
    getTaskRefusals = staticmethod(get_task_refusals)
    getHostData = staticmethod(get_host_data)