PR#1176: kojid: implement task_avail_delay check

Merges #1176
https://pagure.io/koji/pull-request/1176

Relates: #340
https://pagure.io/koji/issue/340
koji doesn't take plugins into account when median capacity is calculated
This commit is contained in:
Mike McLean 2018-12-03 15:54:20 -05:00
commit d424fb085b
3 changed files with 112 additions and 4 deletions

View file

@ -5829,6 +5829,7 @@ def get_options():
'failed_buildroot_lifetime': 3600 * 4,
'rpmbuild_timeout': 3600 * 24,
'oz_install_timeout': 0,
'task_avail_delay': 300,
'cert': None,
'ca': '', # FIXME: Unused, remove in next major release
'serverca': None}
@ -5836,7 +5837,8 @@ def get_options():
for name, value in config.items('kojid'):
if name in ['sleeptime', 'maxjobs', 'minspace', 'retry_interval',
'max_retries', 'offline_retry_interval', 'failed_buildroot_lifetime',
'timeout', 'rpmbuild_timeout', 'oz_install_timeout']:
'timeout', 'rpmbuild_timeout', 'oz_install_timeout',
'task_avail_delay']:
try:
defaults[name] = int(value)
except ValueError:

View file

@ -546,6 +546,7 @@ class TaskManager(object):
self.options = options
self.session = session
self.tasks = {}
self.skipped_tasks = {}
self.pids = {}
self.subsessions = {}
self.handlers = {}
@ -863,6 +864,7 @@ class TaskManager(object):
avail[bin] = [host['capacity'] - host['task_load'] for host in bin_hosts[bin]]
avail[bin].sort()
avail[bin].reverse()
self.cleanDelayTimes()
for task in tasks:
# note: tasks are in priority order
self.logger.debug("task: %r" % task)
@ -891,11 +893,12 @@ class TaskManager(object):
#accept this task)
bin_avail = avail.get(bin, [0])
self.logger.debug("available capacities for bin: %r" % bin_avail)
median = bin_avail[(len(bin_avail)-1)//2]
median = bin_avail[(len(bin_avail) - 1) // 2]
self.logger.debug("ours: %.2f, median: %.2f" % (our_avail, median))
if not self.checkRelAvail(bin_avail, our_avail):
#decline for now and give the upper half a chance
return False
if self.checkAvailDelay(task):
# decline for now and give the upper half a chance
return False
#otherwise, we attempt to open the task
if self.takeTask(task):
return True
@ -904,6 +907,38 @@ class TaskManager(object):
raise Exception("Invalid task state reported by server")
return False
def checkAvailDelay(self, task):
"""Check to see if we should still delay taking a task
Returns True if we are still in the delay period and should skip the
task. Otherwise False (delay has expired).
"""
now = time.time()
ts = self.skipped_tasks.get(task['id'])
if not ts:
self.skipped_tasks[task['id']] = now
return True
# else
delay = getattr(self.options, 'task_avail_delay', 180)
if now - ts < delay:
del self.skipped_tasks[task['id']]
return True
# otherwise
return False
def cleanDelayTimes(self):
"""Remove old entries from skipped_tasks"""
now = time.time()
delay = getattr(self.options, 'task_avail_delay', 180)
cutoff = now - delay * 10
# After 10x the delay, we've had plenty of opportunity to take the
# task, so either it has already been taken or we can't take it.
for task_id in list(self.skipped_tasks):
ts = self.skipped_tasks[task_id]
if ts < cutoff:
del self.skipped_tasks[task_id]
def checkRelAvail(self, bin_avail, avail):
"""
Check our available capacity against the capacity of other hosts in this bin.

View file

@ -0,0 +1,71 @@
from __future__ import absolute_import
import mock
try:
import unittest2 as unittest
except ImportError:
import unittest
import koji.daemon
import koji
class TestDelayTimes(unittest.TestCase):
def setUp(self):
self.options = mock.MagicMock()
self.session = mock.MagicMock()
self.tm = koji.daemon.TaskManager(self.options, self.session)
self.time = mock.patch('time.time').start()
def tearDown(self):
mock.patch.stopall()
def test_check_avail_delay(self):
self.options.task_avail_delay = 180 # same as default
# test skipped entry less than delay
start = 10000
task = {'id': 100}
self.tm.skipped_tasks = {task['id']: start}
self.time.return_value = start + 100
self.assertEqual(self.tm.checkAvailDelay(task), True)
# and greater than delay
self.time.return_value = start + 200
self.tm.skipped_tasks = {task['id']: start}
self.assertEqual(self.tm.checkAvailDelay(task), False)
# and no entry
self.time.return_value = start
self.tm.skipped_tasks = {}
self.assertEqual(self.tm.checkAvailDelay(task), True)
def test_clean_delay_times(self):
self.options.task_avail_delay = 180 # same as default
# test no skipped entries
start = 10000
self.time.return_value = start + 100
self.tm.skipped_tasks = {}
self.tm.cleanDelayTimes()
self.assertEqual(self.tm.skipped_tasks, {})
# test all skipped entries
self.time.return_value = start + 5000
skipped = {}
for i in range(25):
skipped[i] = start + i
# all older than 180 in age
self.tm.skipped_tasks = skipped
self.tm.cleanDelayTimes()
self.assertEqual(self.tm.skipped_tasks, {})
# test mixed entries
skipped = {100: start + 5000}
expected = skipped.copy()
for i in range(25):
skipped[i] = start + i
# all older than 180 in age
self.tm.skipped_tasks = skipped
self.tm.cleanDelayTimes()
self.assertEqual(self.tm.skipped_tasks, expected)