check on tasks, partially implement assign timeout

This commit is contained in:
Mike McLean 2023-04-07 16:00:56 -04:00 committed by Tomas Kopecek
parent ddca9d133b
commit 6737cb5fa5
3 changed files with 82 additions and 29 deletions

View file

@ -10018,8 +10018,10 @@ def _delete_event_id():
class RootExports(object):
'''Contains functions that are made available via XMLRPC'''
def TEST(self):
def TEST(self, fail=False):
scheduler.TaskScheduler().run()
if fail:
raise Exception('DEBUG')
def TEST2(self, hostID=1):
return Host(hostID).getLoadData()

View file

@ -1,5 +1,6 @@
import logging
import psycopg2
import time
import koji
from .db import QueryProcessor, InsertProcessor, UpdateProcessor, db_lock
@ -52,21 +53,19 @@ def get_tasks_for_host(hostID):
return query.execute()
def getTaskRuns(taskID=None, hostID=None, state=None):
def getTaskRuns(taskID=None, hostID=None, active=None):
taskID = convert_value(taskID, cast=int, none_allowed=True)
hostID = convert_value(hostID, cast=int, none_allowed=True)
state = convert_value(state, cast=intlist, none_allowed=True)
active = convert_value(active, cast=bool, none_allowed=True)
fields = (
('scheduler_task_runs.id', 'id'),
('scheduler_task_runs.task_id', 'task_id'),
('scheduler_task_runs.host_id', 'host_id'),
('host.name', 'host_name'),
('task.method', 'method'),
('scheduler_task_runs.state', 'state'),
("date_part('epoch', create_time)", 'create_ts'),
("date_part('epoch', start_time)", 'start_ts'),
("date_part('epoch', end_time)", 'end_ts'),
# ('host.name', 'host_name'),
# ('task.method', 'method'),
('scheduler_task_runs.active', 'active'),
("date_part('epoch', scheduler_task_runs.create_time)", 'create_ts'),
)
fields, aliases = zip(*fields)
@ -75,16 +74,15 @@ def getTaskRuns(taskID=None, hostID=None, state=None):
clauses.append('task_id = %(taskID)s')
if hostID is not None:
clauses.append('host_id = %(hostID)s')
if state is not None:
clauses.append('host_id IN %(state)s')
if active is not None:
clauses.append('active = %(active)s')
query = QueryProcessor(
columns=fields, aliases=aliases, tables=['scheduler_task_runs'],
joins=['host ON host_id=host.id', 'task ON task_id=task.id'],
# joins=['host ON host_id=host.id', 'task ON task_id=task.id'],
clauses=clauses, values=locals())
data = query.execute()
return data
return query.execute()
def scheduler_map_task(taskinfo):
@ -104,8 +102,11 @@ class TaskScheduler(object):
self.tasks_by_bin = None
self.active_tasks = None
self.free_tasks = None
# TODO these things need proper config
self.maxjobs = 15 # XXX
self.capacity_overcommit = 5 # TODO config
self.capacity_overcommit = 5
self.assign_timeout = 300
def run(self):
if not db_lock('scheduler', wait=False):
@ -114,22 +115,16 @@ class TaskScheduler(object):
self.do_schedule()
# TODO clean up bad data (e.g. active tasks with no host)
# TODO check for runs that aren't getting picked up
self.check_active_tasks()
return True
def get_runs(self):
runs = getTaskRuns()
runs_by_task = {}
for run in runs:
runs_by_task.setdefault(run['task_id'], [])
runs_by_task[run['task_id']].append(run)
def do_schedule(self):
self.get_tasks()
self.get_hosts()
# debug
logger.info('Running task scheduler')
logger.info(f'Hosts: {len(self.hosts)}')
logger.info(f'Free tasks: {len(self.free_tasks)}')
logger.info(f'Active tasks: {len(self.active_tasks)}')
@ -205,6 +200,52 @@ class TaskScheduler(object):
def _rank_host(self, host):
host['_rank'] = host['_load'] + host['_ntasks'] + host['_demand']
def check_active_tasks(self):
"""Check on active tasks"""
runs = self.get_active_runs()
logger.info('Found %i active runs', len(runs))
logger.info('Checking on %i active tasks', len(self.active_tasks))
for task in self.active_tasks:
if task['state'] == koji.TASK_STATES['ASSIGNED']:
# TODO check time since assigned
# if not taken within a timeout
# - if host not checking in, then make sure host marked unavail and free
# - if host *is* checking in, then treat as refusal and free
taskruns = runs.get(task['task_id'], [])
if not taskruns:
logger.error('No active run for assigned task %(task_id)s', task)
# TODO free
continue
else:
if len(taskruns) > 1:
logger.error('Multiple active run entries for assigned task %(task_id)s',
task)
# TODO fix
age = time.time() - min([r['create_ts'] for r in taskruns])
if age > self.assign_timeout:
# TODO free
# TODO check host too
logger.info('Task assignment timeout for %(task_id)s', task)
pass
elif task['state'] == koji.TASK_STATES['OPEN']:
# TODO sanity check host
if not task['host_id']:
# shouldn't happen
# TODO
continue
host = self.hosts.get(task['host_id'])
if not host:
logger.error('Host for task is not available')
def get_active_runs(self):
runs = getTaskRuns(active=True)
runs_by_task = {}
for run in runs:
runs_by_task.setdefault(run['task_id'], [])
runs_by_task[run['task_id']].append(run)
return runs_by_task
def get_tasks(self):
"""Get the task data that we need for scheduling"""
@ -324,9 +365,22 @@ class TaskScheduler(object):
def add_run(self, task, host):
logger.info('Assigning task %s (%s) to host %s',
task['task_id'], task['method'], host['name'])
# mark any older runs inactive
update = UpdateProcessor(
'scheduler_task_runs',
data={'active': False},
clauses=['task_id=%(task_id)s', 'active = TRUE'],
values={'task_id': task['task_id']},
)
update.execute()
# add the new run
insert = InsertProcessor('scheduler_task_runs')
insert.set(task_id=task['task_id'], host_id=host['id'], state=koji.TASK_STATES['ASSIGNED'])
insert.set(task_id=task['task_id'], host_id=host['id'])
insert.execute()
# mark the task assigned
update = UpdateProcessor(
'task',
data={'host_id': host['id'], 'state': koji.TASK_STATES['ASSIGNED']},

View file

@ -989,14 +989,11 @@ CREATE TABLE scheduler_task_runs (
id SERIAL NOT NULL PRIMARY KEY,
task_id INTEGER REFERENCES task (id) NOT NULL,
host_id INTEGER REFERENCES host (id) NOT NULL,
state INTEGER NOT NULL,
create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
start_time TIMESTAMPTZ,
end_time TIMESTAMPTZ
active BOOLEAN NOT NULL DEFAULT TRUE,
create_time TIMESTAMPTZ NOT NULL DEFAULT NOW()
) WITHOUT OIDS;
CREATE INDEX scheduler_task_runs_task ON scheduler_task_runs(task_id);
CREATE INDEX scheduler_task_runs_host ON scheduler_task_runs(host_id);
CREATE INDEX scheduler_task_runs_state ON scheduler_task_runs(state);
CREATE INDEX scheduler_task_runs_create_time ON scheduler_task_runs(create_time);