PR#4044: Update getNextTask for scheduler

Merges #4044
https://pagure.io/koji/pull-request/4044

Fixes: #4028
https://pagure.io/koji/issue/4028
Clean getNextTask
This commit is contained in:
Tomas Kopecek 2024-03-28 15:58:15 +01:00
commit c4b50c65c7
4 changed files with 247 additions and 222 deletions

View file

@ -154,10 +154,11 @@ def main(options, session):
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGUSR1, restart)
exit_code = 0
taken = False
while True:
try:
taken = False
tm.updateBuildroots()
if not taken:
tm.updateBuildroots()
tm.updateTasks()
taken = tm.getNextTask()
except (SystemExit, ServerExit, KeyboardInterrupt):
@ -180,6 +181,7 @@ def main(options, session):
# XXX - this is a little extreme
# log the exception and continue
logger.error(''.join(traceback.format_exception(*sys.exc_info())))
taken = False
try:
if not taken:
# Only sleep if we didn't take a task, otherwise retry immediately.

View file

@ -1037,129 +1037,41 @@ class TaskManager(object):
return data
def getNextTask(self):
"""Take the next task
NOTE(review): this listing comes from a flattened diff view (no +/- markers,
no indentation), so removed and added lines appear interleaved below. Confirm
the effective control flow against the repository before relying on it.
:returns: True if a task was taken, False otherwise
"""
# refresh our readiness flag and push task load + readiness via host.updateHost
self.ready = self.readyForTask()
self.session.host.updateHost(self.task_load, self.ready, data=self._get_host_data())
if not self.ready:
self.logger.info("Not ready for task")
return False
# NOTE(review): the load-data / channel-arch bin handling from here down to the
# FREE-task branch looks like the removed side of the hunk - TODO confirm
hosts, tasks = self.session.host.getLoadData()
self.logger.debug("Load Data:")
self.logger.debug(" hosts: %r" % hosts)
self.logger.debug(" tasks: %r" % tasks)
# now we organize this data into channel-arch bins
bin_hosts = {} # hosts indexed by bin
bins = {} # bins for this host
our_avail = None
for host in hosts:
host['bins'] = []
if host['id'] == self.host_id:
# note: task_load reported by server might differ from what we
# sent due to precision variation
our_avail = host['capacity'] - host['task_load']
for chan in host['channels']:
# every host is also binned under 'noarch' in addition to its own arches
for arch in host['arches'].split() + ['noarch']:
bin = "%s:%s" % (chan, arch)
bin_hosts.setdefault(bin, []).append(host)
if host['id'] == self.host_id:
bins[bin] = 1
self.logger.debug("bins: %r" % bins)
if our_avail is None:
self.logger.info("Server did not report this host. Are we disabled?")
return False
elif not bins:
self.logger.info("No bins for this host. Missing channel/arch config?")
# Note: we may still take an assigned task below
# sort available capacities for each of our bins
avail = {}
for bin in bins:
avail[bin] = [host['capacity'] - host['task_load'] for host in bin_hosts[bin]]
# sort + reverse: largest available capacity first
avail[bin].sort()
avail[bin].reverse()
self.cleanDelayTimes()
# get our assigned tasks
tasks = self.session.host.getTasks()
for task in tasks:
# note: tasks are in priority order
self.logger.debug("task: %r" % task)
if task['method'] not in self.handlers:
self.logger.warning("Skipping task %(id)i, no handler for method %(method)s", task)
continue
if task['id'] in self.tasks:
# we were running this task, but it apparently has been
# freed or reassigned. We can't do anything with it until
# reassigned. We can't do anything with it until
# updateTasks notices this and cleans up.
self.logger.debug("Task %(id)s freed or reassigned", task)
self.logger.info("Task %(id)s reassigned", task)
continue
if task['state'] == koji.TASK_STATES['ASSIGNED']:
self.logger.debug("task is assigned")
if self.host_id == task['host_id']:
# assigned to us, we can take it regardless
if self.takeTask(task):
return True
elif task['state'] == koji.TASK_STATES['FREE']:
bin = "%(channel_id)s:%(arch)s" % task
self.logger.debug("task is free, bin=%r" % bin)
if bin not in bins:
continue
# see where our available capacity is compared to other hosts for this bin
# (note: the hosts in this bin are exactly those that could
# accept this task)
bin_avail = avail.get(bin, [0])
if self.checkAvailDelay(task, bin_avail, our_avail):
# decline for now and give the upper half a chance
continue
# otherwise, we attempt to open the task
if self.takeTask(task):
return True
else:
# should not happen
raise Exception("Invalid task state reported by server")
# NOTE(review): presumably the added side of the hunk - a task that is not
# ASSIGNED, or not assigned to this host, is logged and skipped - TODO confirm
if task['state'] != koji.TASK_STATES['ASSIGNED']:
# shouldn't happen
self.logger.error("Recieved task %(id)s is not assigned, state=%(state)s", task)
continue
if task['host_id'] != self.host_id:
# shouldn't happen
self.logger.error("Recieved task %(id)s is not ours, host=%(host_id)s", task)
continue
# otherwise attempt to take it
if self.takeTask(task):
return True
# nothing was taken this pass
return False
def checkAvailDelay(self, task, bin_avail, our_avail):
"""Check to see if we should still delay taking a task
Returns True if we are still in the delay period and should skip the
task. Otherwise False (delay has expired).
"""
now = time.time()
ts = self.skipped_tasks.get(task['id'])
if not ts:
ts = self.skipped_tasks[task['id']] = now
# determine our normalized bin rank
for pos, cap in enumerate(bin_avail):
if our_avail >= cap:
break
if len(bin_avail) > 1:
rank = float(pos) / (len(bin_avail) - 1)
else:
rank = 0.0
# so, 0.0 for highest available capacity, 1.0 for lowest
delay = getattr(self.options, 'task_avail_delay', 180)
delay *= rank
# return True if we should delay
if now - ts < delay:
self.logger.debug("skipping task %i, age=%s rank=%s"
% (task['id'], int(now - ts), rank))
return True
# otherwise
del self.skipped_tasks[task['id']]
return False
def cleanDelayTimes(self):
    """Drop entries from ``self.skipped_tasks`` that have gone stale.

    An entry is stale once its timestamp is older than ten times the
    ``task_avail_delay`` option (default 180). By then we have had plenty
    of opportunity to take the task, so it has either been taken already
    or we cannot take it. The dict is modified in place.
    """
    max_age = getattr(self.options, 'task_avail_delay', 180) * 10
    oldest_kept = time.time() - max_age
    # collect first so the dict is not mutated while being iterated
    stale_ids = [task_id for task_id, seen in self.skipped_tasks.items()
                 if seen < oldest_kept]
    for task_id in stale_ids:
        del self.skipped_tasks[task_id]
def _waitTask(self, task_id, pid=None):
"""Wait (nohang) on the task, return true if finished"""
if pid is None:
@ -1421,7 +1333,9 @@ class TaskManager(object):
if method in self.handlers:
handlerClass = self.handlers[method]
else:
raise koji.GenericError("No handler found for method '%s'" % method)
self.logger.warning("Refusing task %(id)s, no handler for %(method)s", task)
self.session.host.refuseTask(task['id'], soft=False, msg="no handler for method")
return False
task_info = self.session.getTaskInfo(task['id'], request=True)
if task_info.get('request') is None:
self.logger.warning("Task '%s' has no request" % task['id'])

View file

@ -1,109 +0,0 @@
from __future__ import absolute_import
import mock
import unittest
import koji.daemon
import koji
class TestDelayTimes(unittest.TestCase):
    """Tests for TaskManager.checkAvailDelay and TaskManager.cleanDelayTimes."""

    def setUp(self):
        self.options = mock.MagicMock()
        self.session = mock.MagicMock()
        self.tm = koji.daemon.TaskManager(self.options, self.session)
        # time.time is patched so every check runs at a chosen instant
        self.time = mock.patch('time.time').start()

    def tearDown(self):
        mock.patch.stopall()

    def _delayed(self, task, bin_avail, our_avail, now, skipped):
        """Run checkAvailDelay at time ``now`` with the given skip table."""
        self.tm.skipped_tasks = skipped
        self.time.return_value = now
        return self.tm.checkAvailDelay(task, bin_avail, our_avail)

    def test_check_avail_delay(self):
        self.options.task_avail_delay = 180  # same as default
        start = 10000
        task = {'id': 100}
        four_hosts = [10.0, 9.0, 8.0, 7.0]
        five_hosts = [10.0, 9.0, 8.0, 7.0, 6.0]

        # highest capacity, no skip entry
        self.assertEqual(
            self._delayed(task, four_hosts, 10.1, start, {}), False)

        # not highest, no skip entry
        self.assertEqual(
            self._delayed(task, four_hosts, 9.0, start, {}), True)

        # last, but past full delay
        self.options.task_avail_delay = 500
        self.assertEqual(
            self._delayed(task, four_hosts, 7.0, start + 500, {task['id']: start}),
            False)

        # last, but less than delay
        self.assertEqual(
            self._delayed(task, four_hosts, 7.0, start + 499, {task['id']: start}),
            True)

        # median, but less than scaled delay
        # rank = 2/4 = 0.5, so adjusted delay is 250
        self.assertEqual(
            self._delayed(task, five_hosts, 8.0, start + 249, {task['id']: start}),
            True)

        # median, but past scaled delay
        # rank is still 2/4 = 0.5 (adjusted delay 250); 476 is well past it
        self.assertEqual(
            self._delayed(task, five_hosts, 8.0, start + 476, {task['id']: start}),
            False)

        # only one in bin
        self.assertEqual(
            self._delayed(task, [5.0], 5.0, start, {}), False)

    def test_clean_delay_times(self):
        self.options.task_avail_delay = 180  # same as default
        start = 10000

        # test no skipped entries
        self.time.return_value = start + 100
        self.tm.skipped_tasks = {}
        self.tm.cleanDelayTimes()
        self.assertEqual(self.tm.skipped_tasks, {})

        # test all skipped entries: every timestamp is far older than the
        # cleanup cutoff at this point in time
        self.time.return_value = start + 5000
        self.tm.skipped_tasks = {i: start + i for i in range(25)}
        self.tm.cleanDelayTimes()
        self.assertEqual(self.tm.skipped_tasks, {})

        # test mixed entries: one fresh entry must survive the cleanup
        expected = {100: start + 5000}
        skipped = expected.copy()
        skipped.update({i: start + i for i in range(25)})
        self.tm.skipped_tasks = skipped
        self.tm.cleanDelayTimes()
        self.assertEqual(self.tm.skipped_tasks, expected)

View file

@ -0,0 +1,218 @@
from __future__ import absolute_import
import mock
import unittest
import koji.daemon
import koji
class TestGetNextTask(unittest.TestCase):
    """Tests for TaskManager.getNextTask with readyForTask/takeTask mocked."""

    def setUp(self):
        self.options = mock.MagicMock()
        self.session = mock.MagicMock()
        self.tm = koji.daemon.TaskManager(self.options, self.session)
        self.tm.readyForTask = mock.MagicMock()
        self.tm.takeTask = mock.MagicMock()
        self.time = mock.patch('time.time').start()

    def tearDown(self):
        mock.patch.stopall()

    def test_not_ready(self):
        # a host that is not ready must not even ask for its tasks
        self.tm.readyForTask.return_value = False

        self.assertEqual(self.tm.getNextTask(), False)

        self.session.host.getTasks.assert_not_called()
        self.tm.takeTask.assert_not_called()

    def test_no_tasks(self):
        self.tm.host_id = 999
        self.tm.readyForTask.return_value = True
        self.session.host.getTasks.return_value = []

        self.assertEqual(self.tm.getNextTask(), False)

        self.session.host.getTasks.assert_called_once()
        self.tm.takeTask.assert_not_called()

    def test_one_good_task(self):
        host_id = 999
        assigned = koji.TASK_STATES['ASSIGNED']
        self.tm.host_id = host_id
        self.tm.readyForTask.return_value = True
        self.tm.tasks = {3: 'already running'}
        bad_state = {'id': 1, 'state': koji.TASK_STATES['FREE'], 'host_id': None}
        wrong_host = {'id': 2, 'state': assigned, 'host_id': 666}
        already_running = {'id': 3, 'state': assigned, 'host_id': host_id}
        good = {'id': 4, 'state': assigned, 'host_id': host_id}
        self.session.host.getTasks.return_value = [
            bad_state,
            wrong_host,
            already_running,
            good,
        ]

        self.assertEqual(self.tm.getNextTask(), True)

        self.session.host.getTasks.assert_called_once()
        # only the last entry is eligible to be taken
        self.tm.takeTask.assert_called_once_with(good)
class TestTakeTask(unittest.TestCase):
    """Tests for TaskManager.takeTask with runTask/forkTask mocked out."""

    def setUp(self):
        self.options = mock.MagicMock()
        self.session = mock.MagicMock()
        self.tm = koji.daemon.TaskManager(self.options, self.session)
        self.tm.readyForTask = mock.MagicMock()
        self.tm.runTask = mock.MagicMock()
        self.tm.forkTask = mock.MagicMock()
        self.time = mock.patch('time.time').start()
        # the handler "class" is a mock that always yields self.handler
        self.handler = mock.MagicMock()
        self.tm.handlers = {'fake': mock.MagicMock(return_value=self.handler)}

    def tearDown(self):
        mock.patch.stopall()

    def _task(self, **overrides):
        """Build an ASSIGNED task dict for the 'fake' method, plus overrides."""
        task = {
            'id': 4,
            'state': koji.TASK_STATES['ASSIGNED'],
            'method': 'fake',
        }
        task.update(overrides)
        return task

    def _assert_nothing_ran(self):
        """Neither the foreground nor the forked execution path was used."""
        self.tm.runTask.assert_not_called()
        self.tm.forkTask.assert_not_called()

    def test_simple_fork(self):
        task = self._task()
        self.handler.Foreground = False
        self.session.host.openTask.return_value = task
        self.tm.forkTask.return_value = ['PID', 'SESSION']

        retval = self.tm.takeTask(task)

        self.handler.setManager.assert_not_called()
        self.tm.runTask.assert_not_called()
        self.tm.forkTask.assert_called_once_with(self.handler)
        self.assertEqual(self.tm.pids, {4: 'PID'})
        self.assertEqual(self.tm.subsessions, {4: 'SESSION'})
        self.assertEqual(retval, True)

    def test_simple_foreground(self):
        task = self._task()
        self.handler.Foreground = True
        self.session.host.openTask.return_value = task

        retval = self.tm.takeTask(task)

        self.handler.setManager.assert_called_once_with(self.tm)
        self.tm.runTask.assert_called_once_with(self.handler)
        self.tm.forkTask.assert_not_called()
        self.assertEqual(self.tm.pids, {})
        self.assertEqual(self.tm.subsessions, {})
        self.assertEqual(retval, True)

    def test_refuse_no_handler(self):
        # 'missing' has no entry in self.tm.handlers
        task = self._task(method='missing')

        retval = self.tm.takeTask(task)

        self.assertEqual(retval, False)
        self.session.host.refuseTask.assert_called_once()
        self.session.getTaskInfo.assert_not_called()
        self.session.host.openTask.assert_not_called()
        self._assert_nothing_ran()

    def test_skip_no_request(self):
        task = self._task()
        # task info without a 'request' entry
        self.session.getTaskInfo.return_value = {}

        retval = self.tm.takeTask(task)

        self.assertEqual(retval, False)
        self.session.host.openTask.assert_not_called()
        self._assert_nothing_ran()

    def test_skip_bad_check(self):
        task = self._task()
        self.handler.checkHost.side_effect = Exception('should refuse')

        retval = self.tm.takeTask(task)

        self.assertEqual(retval, False)
        self.session.host.refuseTask.assert_called_once()
        self.session.host.openTask.assert_not_called()
        self._assert_nothing_ran()

    def test_open_fails(self):
        task = self._task()
        self.session.host.openTask.return_value = None

        retval = self.tm.takeTask(task)

        self.assertEqual(retval, False)
        self.session.host.openTask.assert_called_once()
        self._assert_nothing_ran()

    def test_set_weight_fails(self):
        task = self._task(request='...', host_id=999)
        self.session.host.openTask.return_value = task
        self.session.host.setTaskWeight.side_effect = koji.ActionNotAllowed('should skip')
        # second lookup reports the task on a different host
        moved = task.copy()
        moved['host_id'] = 42
        self.session.getTaskInfo.side_effect = [task, moved]

        retval = self.tm.takeTask(task)

        self.assertEqual(retval, False)
        self.session.host.openTask.assert_called_once()
        self._assert_nothing_ran()

    def test_set_weight_fails_state(self):
        self.tm.host_id = 999
        task = self._task(request='...', host_id=self.tm.host_id)
        self.session.host.openTask.return_value = task
        self.session.host.setTaskWeight.side_effect = koji.ActionNotAllowed('should skip')
        # second lookup reports the task back in the FREE state
        freed = task.copy()
        freed['state'] = koji.TASK_STATES['FREE']
        self.session.getTaskInfo.side_effect = [task, freed]

        retval = self.tm.takeTask(task)

        self.assertEqual(retval, False)
        self.session.host.openTask.assert_called_once()
        self._assert_nothing_ran()