actually pick a host and add a run

This commit is contained in:
Mike McLean 2023-04-05 16:58:31 -04:00 committed by Tomas Kopecek
parent 49b66e9ec7
commit ae8607b261
2 changed files with 42 additions and 30 deletions

View file

@ -14275,6 +14275,7 @@ class Host(object):
# for builders using this old api, we fake some of this data to get them to take the
# task runs assigned to them
for task in tasks:
task['id'] = task['task_id'] # builder expects task id not run id
task['state'] = koji.TASK_STATES['ASSIGNED']
task['host_id'] = self.id
return [[host], tasks]

View file

@ -2,7 +2,7 @@ import logging
import psycopg2
import koji
from .db import QueryProcessor, db_lock
from .db import QueryProcessor, InsertProcessor, db_lock
from .util import convert_value
from koji.context import context
@ -37,6 +37,7 @@ def getTaskRuns(taskID=None, hostID=None, state=None):
('task_id', 'task_id'),
('host_id', 'host_id'),
('host.name', 'host_name'),
('task.method', 'method'),
('state', 'state'),
("date_part('epoch', create_time)", 'create_ts'),
("date_part('epoch', start_time)", 'start_ts'),
@ -54,7 +55,7 @@ def getTaskRuns(taskID=None, hostID=None, state=None):
query = QueryProcessor(
columns=fields, aliases=aliases, tables=['scheduler_task_runs'],
joins=['LEFT OUTER JOIN host on host_id=host.id'],
joins=['host ON host_id=host.id', 'task ON task_id=task.id'],
clauses=clauses, values=locals())
data = query.execute()
@ -77,7 +78,9 @@ class TaskScheduler(object):
self.hosts = None
self.tasks_by_bin = None
self.active_tasks = None
self.free_tasks = None
self.maxjobs = 15 # XXX
self.capacity_overcommit = 5 # TODO config
def run(self):
if not db_lock('scheduler', wait=False):
@ -101,6 +104,11 @@ class TaskScheduler(object):
self.get_tasks()
self.get_hosts()
# debug
logger.info(f'Hosts: {len(self.hosts)}')
logger.info(f'Free tasks: {len(self.free_tasks)}')
logger.info(f'Active tasks: {len(self.active_tasks)}')
# calculate host load and task count
for task in self.active_tasks:
# for now, we mirror what kojid updateTasks has been doing
@ -117,6 +125,7 @@ class TaskScheduler(object):
for host in self.hosts.values():
host.setdefault('_load', 0.0)
host.setdefault('_ntasks', 0)
# temporary test code
logger.info(f'Host: {host}')
ldiff = host['task_load'] - host['_load']
@ -124,35 +133,36 @@ class TaskScheduler(object):
# this is expected in a number of cases, just observing
logger.info(f'Host load differs by {ldiff:.2f}: {host}')
# order bins by available host capacity
order = []
for _bin in self.hosts_by_bin:
hosts = self.hosts_by_bin.get(_bin, [])
avail = sum([min(0, h['capacity'] - h['_load']) for h in hosts])
order.append((avail, _bin))
order.sort()
# note bin demand for each host
for n, (avail, _bin) in enumerate(order):
rank = float(n) / len(order)
for host in self.hosts_by_bin.get(_bin, []):
host.setdefault('_rank', rank)
# so host rank is set by the most contentious bin it covers
# TODO - we could be smarter here, but it's a start
# sort binned hosts by rank
for _bin in self.hosts_by_bin:
hosts = self.hosts_by_bin[_bin]
hosts.sort(key=lambda h: h['_rank'], reverse=True)
# hosts with least contention first
# figure out which hosts *can* take each task
# at the moment this is mostly just bin, but in the future it will be more complex
for task in self.free_tasks:
task['_hosts'] = []
min_avail = task['weight'] + self.capacity_overcommit
for host in self.hosts_by_bin.get(task['_bin'], []):
if (host['capacity'] > host['_load'] and
host['_ntasks'] < self.maxjobs and
host['capacity'] - host['_load'] > min_avail):
task['_hosts'].append(host)
logger.info(f'Task {task["task_id"]}: {len(task["_hosts"])} options')
for host in task['_hosts']:
# demand gives us a rough measure of how much overall load is pending for the host
host.setdefault('_demand', 0.0)
host['_demand'] += task['weight'] / len(task['_hosts'])
# tasks are already in priority order
for task in self.free_tasks:
hosts = self.hosts_by_bin.get(task['_bin'], [])
# these are the hosts that _can_ take this task
# TODO - update host ranks as we go
# TODO - pick a host and assign
# pick the host with least demand
task['_hosts'].sort(key=lambda h: h['_demand'])
min_avail = task['weight'] + self.capacity_overcommit
for host in task['_hosts']:
if (host['capacity'] > host['_load'] and
host['_ntasks'] < self.maxjobs and
host['capacity'] - host['_load'] > min_avail):
# add run entry
self.add_run(task, host)
# update our totals
host['_load'] += task['weight']
host['_ntasks'] += 1
def get_tasks(self):
"""Get the task data that we need for scheduling"""
@ -268,6 +278,7 @@ class TaskScheduler(object):
return hosts
def add_run(self, task, host):
insert = InsertProcessor('scheduler_runs')
insert.set(task_id=task['id'], host_id=host['id'], state=koji.TASK_STATES['ASSIGNED'])
insert = InsertProcessor('scheduler_task_runs')
insert.set(task_id=task['task_id'], host_id=host['id'], state=koji.TASK_STATES['ASSIGNED'])
insert.execute()
# TODO actually assign the task entry too