scale task_avail_delay based on bin rank
This commit is contained in:
parent
bfbdf314b3
commit
c4f7724647
1 changed file with 17 additions and 23 deletions
|
|
@@ -899,11 +899,7 @@ class TaskManager(object):
|
|||
#(note: the hosts in this bin are exactly those that could
|
||||
#accept this task)
|
||||
bin_avail = avail.get(bin, [0])
|
||||
self.logger.debug("available capacities for bin: %r" % bin_avail)
|
||||
median = bin_avail[int((len(bin_avail) - 1) // 2)]
|
||||
self.logger.debug("ours: %.2f, median: %.2f" % (our_avail, median))
|
||||
if not self.checkRelAvail(bin_avail, our_avail):
|
||||
if self.checkAvailDelay(task):
|
||||
if self.checkAvailDelay(task, bin_avail, our_avail):
|
||||
# decline for now and give the upper half a chance
|
||||
return False
|
||||
#otherwise, we attempt to open the task
|
||||
|
|
@@ -914,7 +910,7 @@ class TaskManager(object):
|
|||
raise Exception("Invalid task state reported by server")
|
||||
return False
|
||||
|
||||
def checkAvailDelay(self, task):
|
||||
def checkAvailDelay(self, task, bin_avail, our_avail):
|
||||
"""Check to see if we should still delay taking a task
|
||||
|
||||
Returns True if we are still in the delay period and should skip the
|
||||
|
|
@@ -924,14 +920,25 @@ class TaskManager(object):
|
|||
now = time.time()
|
||||
ts = self.skipped_tasks.get(task['id'])
|
||||
if not ts:
|
||||
self.skipped_tasks[task['id']] = now
|
||||
return True
|
||||
# else
|
||||
ts = self.skipped_tasks[task['id']] = now
|
||||
|
||||
# determine our normalized bin rank
|
||||
for pos, cap in enumerate(bin_avail):
|
||||
if our_avail >= cap:
|
||||
break
|
||||
rank = float(pos) / len(bin_avail)
|
||||
# so, 0.0 for highest available capacity, 1.0 for lowest
|
||||
|
||||
delay = getattr(self.options, 'task_avail_delay', 180)
|
||||
delay *= rank
|
||||
|
||||
# return True if we should delay
|
||||
if now - ts < delay:
|
||||
del self.skipped_tasks[task['id']]
|
||||
self.logger.debug("skipping task %i, age=%s rank=%s"
|
||||
% (task['id'], int(now - ts), rank))
|
||||
return True
|
||||
# otherwise
|
||||
del self.skipped_tasks[task['id']]
|
||||
return False
|
||||
|
||||
def cleanDelayTimes(self):
|
||||
|
|
@@ -946,19 +953,6 @@ class TaskManager(object):
|
|||
if ts < cutoff:
|
||||
del self.skipped_tasks[task_id]
|
||||
|
||||
def checkRelAvail(self, bin_avail, avail):
|
||||
"""
|
||||
Check our available capacity against the capacity of other hosts in this bin.
|
||||
Return True if we should take a task, False otherwise.
|
||||
"""
|
||||
median = bin_avail[int((len(bin_avail) - 1) // 2)]
|
||||
self.logger.debug("ours: %.2f, median: %.2f" % (avail, median))
|
||||
if avail >= median:
|
||||
return True
|
||||
else:
|
||||
self.logger.debug("Skipping - available capacity in lower half")
|
||||
return False
|
||||
|
||||
def _waitTask(self, task_id, pid=None):
|
||||
"""Wait (nohang) on the task, return true if finished"""
|
||||
if pid is None:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue