PR#3772: Scheduler part 1

Merges #3772
https://pagure.io/koji/pull-request/3772

Fixes #3877
https://pagure.io/koji/issue/3877
This commit is contained in:
Tomas Kopecek 2023-10-03 15:23:04 +02:00
commit b872c0dd5e
13 changed files with 1055 additions and 143 deletions

View file

@ -7899,3 +7899,125 @@ def anon_handle_repoinfo(goptions, session, args):
# repoID option added in 1.33
if options.buildroots:
warn("--buildroots option is available with hub 1.33 or newer")
def _format_ts(ts):
if ts:
return time.strftime("%y-%m-%d %H:%M:%S", time.localtime(ts))
else:
return ''
def anon_handle_scheduler_info(goptions, session, args):
    """[monitor] Show information about scheduling"""
    # Prints one row per scheduler task run (optionally filtered by task,
    # host and task state). When a host filter is given, the data stored by
    # the scheduler for that host is printed afterwards.
    usage = "usage: %prog schedulerinfo [options]"
    parser = OptionParser(usage=get_usage_str(usage))
    parser.add_option("-t", "--task", action="store", type=int, default=None,
                      help="Limit data to given task id")
    parser.add_option("--host", action="store", default=None,
                      help="Limit data to given builder id")
    parser.add_option("--state", action="store", type='choice', default=None,
                      choices=[x for x in koji.TASK_STATES.keys()],
                      help="Limit data to task state")
    (options, args) = parser.parse_args(args)
    if len(args) > 0:
        parser.error("This command takes no arguments")

    ensure_connection(session, goptions)

    # --host accepts either a numeric id or a host name; resolve it to an id
    host_id = None
    if options.host:
        try:
            host_id = int(options.host)
        except ValueError:
            host_id = session.getHost(options.host, strict=True)['id']

    # get the data
    clauses = []
    if options.task:
        clauses.append(('task_id', options.task))
    if host_id is not None:
        # filter on the resolved numeric id, never on the raw option string
        # (which may be a host name and would not match the host_id column)
        clauses.append(('host_id', host_id))
    if options.state:
        clauses.append(('state', koji.TASK_STATES[options.state]))

    runs = session.scheduler.getTaskRuns(
        clauses=clauses,
        fields=('task_id', 'host_name', 'state', 'create_ts', 'start_ts', 'completion_ts')
    )
    mask = '%(task_id)-9s %(host_name)-20s %(state)-7s ' \
           '%(create_ts)-17s %(start_ts)-17s %(completion_ts)-17s'
    if not goptions.quiet:
        header = mask % {
            'task_id': 'Task',
            'host_name': 'Host',
            'state': 'State',
            'create_ts': 'Created',
            'start_ts': 'Started',
            'completion_ts': 'Ended',
        }
        print(header)
        print('-' * len(header))
    for run in runs:
        # translate the numeric state and epoch timestamps for display
        run['state'] = koji.TASK_STATES[run['state']]
        for ts in ('create_ts', 'start_ts', 'completion_ts'):
            run[ts] = _format_ts(run[ts])
        print(mask % run)

    if host_id is not None:
        print('Host data for %s:' % options.host)
        host_data = session.scheduler.getHostData(hostID=host_id)
        if len(host_data) > 0:
            print(host_data[0]['data'])
        else:
            print('-')
def handle_scheduler_logs(goptions, session, args):
    "[monitor] Query scheduler logs"
    # Prints scheduler log messages as tab separated rows, optionally
    # filtered by task, host and a timestamp window.
    usage = "usage: %prog scheduler-logs <options>"
    parser = OptionParser(usage=get_usage_str(usage))
    parser.add_option("--task", type="int", action="store",
                      help="Filter by task ID")
    parser.add_option("--host", type="str", action="store",
                      help="Filter by host (name/ID)")
    parser.add_option("--from", type="float", action="store", dest="from_ts",
                      help="Logs from given timestamp")
    parser.add_option("--to", type="float", action="store", dest="to_ts",
                      help="Logs until given timestamp (included)")
    (options, args) = parser.parse_args(args)
    if len(args) != 0:
        parser.error("There are no arguments for this command")

    clauses = []
    if options.task:
        clauses.append(['task_id', options.task])
    if options.host:
        try:
            host_id = int(options.host)
        except ValueError:
            # strict lookup, as in the schedulerinfo command: without it an
            # unknown name gives None and the subscript fails with TypeError
            host_id = session.getHost(options.host, strict=True)['id']
        clauses.append(['host_id', host_id])
    # compare against None so that a timestamp of 0 is still honoured
    if options.from_ts is not None:
        clauses.append(['msg_ts', '>=', options.from_ts])
    if options.to_ts is not None:
        # the option help promises an inclusive upper bound
        clauses.append(['msg_ts', '<=', options.to_ts])

    logs = session.scheduler.getLogMessages(clauses, fields=('task_id', 'host_id', 'host_name',
                                                             'msg_ts', 'msg'))

    mask = ("%(task_id)s\t%(host_name)s\t%(time)s\t%(msg)s")
    if not goptions.quiet:
        h = mask % {
            'task_id': 'Task',
            'host_name': 'Host',
            'time': 'Time',
            'msg': 'Message',
        }
        print(h)
        print('-' * len(h))

    for log in logs:
        # msg_ts is an epoch value; show it as local time
        log['time'] = time.asctime(time.localtime(log['msg_ts']))
        print(mask % log)

View file

@ -261,12 +261,13 @@ class TaskWatcher(object):
# not finished either. info would be none.
if not info:
return 'unknown'
if info['state'] == koji.TASK_STATES['OPEN']:
if koji.TASK_STATES[info['state']] in ['OPEN', 'ASSIGNED']:
state = koji.TASK_STATES[info['state']].lower()
if info['host_id']:
host = self.session.getHost(info['host_id'])
return 'open (%s)' % host['name']
return '%s (%s)' % (state, host['name'])
else:
return 'open'
return state
elif info['state'] == koji.TASK_STATES['FAILED']:
s = 'FAILED: %s' % self.get_failure()

View file

@ -0,0 +1,51 @@
-- upgrade script to migrate the Koji database schema
-- from version 1.33 to 1.34
-- All statements run in a single transaction.
BEGIN;
-- scheduler tables
-- One row per assignment of a task to a host. The scheduler inserts a row
-- when it assigns a task and clears "active" on older or stale rows.
CREATE TABLE scheduler_task_runs (
id SERIAL NOT NULL PRIMARY KEY,
task_id INTEGER REFERENCES task (id) NOT NULL,
host_id INTEGER REFERENCES host (id) NOT NULL,
active BOOLEAN NOT NULL DEFAULT TRUE,
create_time TIMESTAMPTZ NOT NULL DEFAULT NOW()
) WITHOUT OIDS;
CREATE INDEX scheduler_task_runs_task ON scheduler_task_runs(task_id);
CREATE INDEX scheduler_task_runs_host ON scheduler_task_runs(host_id);
CREATE INDEX scheduler_task_runs_create_time ON scheduler_task_runs(create_time);
-- Per-host data reported by the builder (host.setHostData upserts on host_id).
CREATE TABLE scheduler_host_data (
host_id INTEGER REFERENCES host (id) PRIMARY KEY,
data JSONB
) WITHOUT OIDS;
-- Named scheduler state; the scheduler stores its 'last_run_ts' entry here.
CREATE TABLE scheduler_sys_data (
name TEXT NOT NULL PRIMARY KEY,
data JSONB
) WITHOUT OIDS;
-- Task refusals. The (task_id, host_id) uniqueness is the conflict target
-- of the upsert that records a refusal, so there is one row per pair.
CREATE TABLE scheduler_task_refusals (
id SERIAL NOT NULL PRIMARY KEY,
task_id INTEGER REFERENCES task (id) NOT NULL,
host_id INTEGER REFERENCES host (id) NOT NULL,
by_host BOOLEAN NOT NULL,
soft BOOLEAN NOT NULL DEFAULT FALSE,
msg TEXT,
time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (task_id, host_id)
) WITHOUT OIDS;
-- Scheduler log messages; task_id and host_id are optional (nullable).
CREATE TABLE scheduler_log_messages (
id SERIAL NOT NULL PRIMARY KEY,
task_id INTEGER REFERENCES task (id),
host_id INTEGER REFERENCES host (id),
msg_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
msg TEXT NOT NULL
) WITHOUT OIDS;
-- Named lock taken by the scheduler (db_lock('scheduler')) before each run.
INSERT INTO locks(name) VALUES('scheduler');
COMMIT;

View file

@ -1429,6 +1429,8 @@ class TaskManager(object):
if not valid_host:
self.logger.info(
'Skipping task %s (%s) due to host check', task['id'], task['method'])
if task['state'] == koji.TASK_STATES['ASSIGNED']:
self.session.host.refuseTask(task['id'], soft=False, msg='failed host check')
return False
data = self.session.host.openTask(task['id'])
if data is None:

View file

@ -444,6 +444,42 @@ class InsertProcessor(object):
return _dml(str(self), self.data)
class UpsertProcessor(InsertProcessor):
    """Build a basic upsert statement

    table - the table to insert into
    data - a dictionary of data to insert (keys = column names)
    rawdata - data to insert specified as sql expressions rather than python values
    keys - the columns that are the unique keys (the ON CONFLICT target)
    skip_dup - if set to true, do nothing on conflict

    Either keys or skip_dup must be given; ValueError is raised otherwise.
    """

    def __init__(self, table, data=None, rawdata=None, keys=None, skip_dup=False):
        super(UpsertProcessor, self).__init__(table, data=data, rawdata=rawdata)
        self.keys = keys
        self.skip_dup = skip_dup
        if not keys and not skip_dup:
            raise ValueError('either keys or skip_dup must be set')

    def __repr__(self):
        return "<UpsertProcessor: %r>" % vars(self)

    def __str__(self):
        """Return the insert statement followed by its ON CONFLICT clause"""
        insert = super(UpsertProcessor, self).__str__()
        parts = [insert]
        if self.skip_dup:
            parts.append(' ON CONFLICT DO NOTHING')
        else:
            parts.append(f' ON CONFLICT ({",".join(self.keys)}) DO UPDATE SET ')
            # filter out conflict keys from data
            data = {k: self.data[k] for k in self.data if k not in self.keys}
            rawdata = {k: self.rawdata[k] for k in self.rawdata if k not in self.keys}
            assigns = [f"{key} = %({key})s" for key in data]
            # iterate the filtered mapping: iterating self.rawdata here would
            # raise KeyError for any conflict key passed via rawdata
            assigns.extend([f"{key} = ({rawdata[key]})" for key in rawdata])
            # sorted so that the generated statement is deterministic
            parts.append(', '.join(sorted(assigns)))
        return ''.join(parts)
class UpdateProcessor(object):
"""Build an update statement
@ -834,6 +870,99 @@ SELECT %(col_str)s
return results
class QueryView:
    """Abstract base for query "views" built from field names and simple clauses.

    Subclasses describe the view through the class attributes below; the
    constructor translates the requested fields and clauses into a
    QueryProcessor, which the execute-style methods delegate to.
    """

    # abstract base class
    # subclasses should provide...
    tables = []          # base table(s) of the query
    joins = []           # joins that are always applied
    joinmap = {}         # join key -> join expression, applied only on demand
    fieldmap = {}        # field name -> [sql expression or None, join key or None]
    default_fields = ()  # fields returned when the caller requests none

    def __init__(self, clauses=None, fields=None, opts=None):
        self.extra_joins = []
        self.values = {}
        base_tables = list(self.tables)  # copy
        # fields are mapped before clauses, which fixes the join order
        field_map = self.get_fields(fields)
        columns, aliases = zip(*field_map.items())
        where = self.get_clauses(clauses)
        self.query = QueryProcessor(
            columns=columns, aliases=aliases,
            tables=base_tables, joins=self.get_joins(),
            clauses=where, values=self.values,
            opts=opts)

    def get_fields(self, fields):
        """Return a dict of sql expression -> alias for the requested fields

        No fields (and no defaults), or the string '*', selects every known field.
        """
        wanted = fields or self.default_fields
        if not wanted or wanted == '*':
            wanted = sorted(self.fieldmap)
        mapping = {}
        for name in wanted:
            mapping[self.map_field(name)] = name
        return mapping

    def map_field(self, field):
        """Return the sql expression for a field, noting any join it needs"""
        entry = self.fieldmap.get(field)
        if entry is None:
            raise koji.ParameterError(f'Invalid field for query {field}')
        expression, joinkey = entry
        if joinkey:
            self.extra_joins.append(joinkey)
            # duplicates removed later
        return expression if expression else field

    def get_clauses(self, clauses):
        """Translate (field, value) / (field, op, value) clauses into sql

        The values are stored in self.values under generated placeholder names.
        """
        # for now, just a very simple implementation
        allowed_ops = ('IN', '=', '!=', '>', '<', '>=', '<=')
        sql_clauses = []
        for n, clause in enumerate(clauses or []):
            # TODO checks check checks
            size = len(clause)
            if size not in (2, 3):
                raise koji.ParameterError(f'Invalid clause: {clause}')
            if size == 2:
                # implicit operator
                field, value = clause
                op = 'IN' if isinstance(value, (list, tuple)) else '='
            else:
                field, op, value = clause
                op = op.upper()
                if op not in allowed_ops:
                    raise koji.ParameterError(f'Invalid operator: {op}')
            fullname = self.map_field(field)
            key = f'v_{field}_{n}'
            self.values[key] = value
            sql_clauses.append(f'{fullname} {op} %({key})s')
        return sql_clauses

    def get_joins(self):
        """Return the fixed joins followed by each requested extra join once"""
        joins = list(self.joins)
        added = set()
        # note we preserve the order that extra joins were added
        for joinkey in self.extra_joins:
            if joinkey not in added:
                added.add(joinkey)
                joins.append(self.joinmap[joinkey])
        return joins

    def execute(self):
        return self.query.execute()

    def executeOne(self, strict=False):
        return self.query.executeOne(strict=strict)

    def iterate(self):
        return self.query.iterate()

    def singleValue(self, strict=True):
        return self.query.singleValue(strict=strict)
class BulkInsertProcessor(object):
def __init__(self, table, data=None, columns=None, strict=True, batch=1000):
"""Do bulk inserts - it has some limitations compared to

View file

@ -74,6 +74,7 @@ from koji.util import (
multi_fnmatch,
safer_move,
)
from . import scheduler
from .auth import get_user_perms, get_user_groups
from .db import ( # noqa: F401
BulkInsertProcessor,
@ -82,6 +83,7 @@ from .db import ( # noqa: F401
QueryProcessor,
Savepoint,
UpdateProcessor,
UpsertProcessor,
_applyQueryOpts,
_dml,
_fetchSingle,
@ -387,6 +389,7 @@ class Task(object):
data={'result': info['result'], 'state': state},
rawdata={'completion_time': 'NOW()'})
update.execute()
self.runCallbacks('postTaskStateChange', info, 'state', state)
self.runCallbacks('postTaskStateChange', info, 'completion_ts', now)
@ -2534,44 +2537,6 @@ def set_channel_enabled(channelname, enabled=True, comment=None):
update.execute()
def get_ready_hosts():
    """Return information about hosts that are ready to build.

    Hosts set the ready flag themselves
    Note: We ignore hosts that are late checking in (even if a host
          is busy with tasks, it should be checking in quite often).

    Each returned host dict also carries a 'channels' list holding the ids of
    its active, enabled channels (one extra query per host).
    """
    ready_clauses = [
        'enabled IS TRUE',
        'ready IS TRUE',
        'expired IS FALSE',
        'master IS NULL',
        'active IS TRUE',
        "sessions.update_time > NOW() - '5 minutes'::interval"
    ]
    host_joins = [
        'sessions USING (user_id)',
        'host_config ON host.id = host_config.host_id'
    ]
    hosts = QueryProcessor(
        tables=['host'],
        columns=['host.id', 'name', 'arches', 'task_load', 'capacity'],
        aliases=['id', 'name', 'arches', 'task_load', 'capacity'],
        clauses=ready_clauses,
        joins=host_joins,
    ).execute()
    for host in hosts:
        # the host row itself supplies the %(id)s value
        channel_query = QueryProcessor(
            tables=['host_channels'],
            columns=['channel_id'],
            clauses=['host_id=%(id)s', 'active IS TRUE', 'enabled IS TRUE'],
            joins=['channels ON host_channels.channel_id = channels.id'],
            values=host
        )
        host['channels'] = [row['channel_id'] for row in channel_query.execute()]
    return hosts
def get_all_arches():
"""Return a list of all (canonical) arches available from hosts"""
ret = {}
@ -2587,27 +2552,6 @@ def get_all_arches():
return list(ret.keys())
def get_active_tasks(host=None):
    """Return data on tasks that are yet to be run

    Without a host, all FREE and ASSIGNED tasks are considered. With a host
    dict (needs 'id', 'arches' and 'channels'), the result is limited to tasks
    assigned to that host plus, if it has channels, free tasks matching its
    arches and channels. At most 100 rows, ordered by priority, create_time.
    """
    values = dslice(koji.TASK_STATES, ('FREE', 'ASSIGNED'))
    if not host:
        clauses = ['state IN (%(FREE)i,%(ASSIGNED)i)']
    else:
        values['arches'] = host['arches'].split() + ['noarch']
        values['channels'] = host['channels']
        values['host_id'] = host['id']
        clause = '(state = %(ASSIGNED)i AND host_id = %(host_id)i)'
        if values['channels']:
            clause += (' OR (state = %(FREE)i AND arch IN %(arches)s '
                       'AND channel_id IN %(channels)s)')
        clauses = [clause]
    query = QueryProcessor(
        columns=['id', 'state', 'channel_id', 'host_id', 'arch', 'method', 'priority',
                 'create_time'],
        tables=['task'],
        clauses=clauses,
        values=values,
        opts={'limit': 100, 'order': 'priority,create_time'})
    return query.execute()
def get_task_descendents(task, childMap=None, request=False):
if childMap is None:
childMap = {}
@ -14324,18 +14268,17 @@ class Host(object):
def getLoadData(self):
"""Get load balancing data
This data is relatively small and the necessary load analysis is
relatively complex, so we let the host machines crunch it."""
hosts = get_ready_hosts()
for host in hosts:
if host['id'] == self.id:
break
else:
# this host not in ready list
return [[], []]
# host is the host making the call
tasks = get_active_tasks(host)
return [hosts, tasks]
This call is here for backwards compatibility.
Originally, it returned broad information about all hosts and tasks so that individual
hosts could make informed decisions about which task to take.
Now it presents only data for the calling host and the tasks that have been assigned to
it"""
host = get_host(self.id)
host['channels'] = [c['id'] for c in list_channels(hostID=self.id)]
tasks = scheduler.get_tasks_for_host(hostID=self.id, retry=True)
return [[host], tasks]
def isEnabled(self):
"""Return whether this host is enabled or not."""
@ -14409,6 +14352,43 @@ class HostExports(object):
task.assertHost(host.id)
return task.setWeight(weight)
def setHostData(self, hostdata):
    """Builder will update all its resources

    Initial implementation contains:
      - available task methods
      - maxjobs
      - host readiness

    The data is stored in scheduler_host_data, one row per host (upsert on
    host_id), and replaces whatever the host reported before.

    :param hostdata: JSON-serializable data describing the calling host
    """
    import json

    host = Host()
    host.verify()
    upsert = UpsertProcessor(
        table='scheduler_host_data',
        keys=['host_id'],
        # the data column is JSONB; serialize explicitly, as the scheduler
        # does for scheduler_sys_data, rather than passing a raw object
        data={'host_id': host.id, 'data': json.dumps(hostdata)},
    )
    upsert.execute()
def getTasks(self):
    """Return the tasks assigned to the calling host

    The caller is verified as a host first. The lookup is done with
    retry=True, so the scheduler may be run if nothing is assigned yet.
    """
    caller = Host()
    caller.verify()
    assigned = scheduler.get_tasks_for_host(hostID=caller.id, retry=True)
    return assigned
def refuseTask(self, task_id, soft=True, msg=''):
    """Record that the calling host refuses a task and free the task

    :param task_id: id of the task being refused
    :param bool soft: stored as the refusal's "soft" flag
    :param str msg: message stored with the refusal

    A refusal for a task whose host_id is not the calling host is only
    logged as a warning; nothing is recorded and the task is left alone.
    """
    soft = convert_value(soft, cast=bool)
    msg = convert_value(msg, cast=str)
    caller = Host()
    caller.verify()

    task = Task(task_id)
    tinfo = task.getInfo(strict=True)

    if tinfo['host_id'] == caller.id:
        scheduler.set_refusal(caller.id, tinfo['id'], soft=soft, msg=msg, by_host=True)
        # also free the task
        task.free()
    else:
        logger.warning('Host %s refused unrelated task: %s', caller.id, tinfo['id'])
def getHostTasks(self):
host = Host()
host.verify()

View file

@ -41,6 +41,7 @@ from koji.server import ServerError, BadRequest, RequestTimeout
from koji.xmlrpcplus import ExtendedMarshaller, Fault, dumps, getparser
from . import auth
from . import db
from . import scheduler
class Marshaller(ExtendedMarshaller):
@ -502,6 +503,15 @@ def load_config(environ):
['RPMDefaultChecksums', 'string', 'md5 sha256'],
['SessionRenewalTimeout', 'integer', 1440],
# scheduler options
['MaxJobs', 'integer', 15],
['CapacityOvercommit', 'integer', 5],
['ReadyTimeout', 'integer', 180],
['AssignTimeout', 'integer', 300],
['SoftRefusalTimeout', 'integer', 900],
['HostTimeout', 'integer', 900],
['RunInterval', 'integer', 60],
]
opts = {}
for name, dtype, default in cfgmap:
@ -844,8 +854,10 @@ def get_registry(opts, plugins):
registry = HandlerRegistry()
functions = kojihub.RootExports()
hostFunctions = kojihub.HostExports()
schedulerFunctions = scheduler.SchedulerExports()
registry.register_instance(functions)
registry.register_module(hostFunctions, "host")
registry.register_module(schedulerFunctions, "scheduler")
registry.register_function(auth.login)
registry.register_function(auth.sslLogin)
registry.register_function(auth.logout)

605
kojihub/scheduler.py Normal file
View file

@ -0,0 +1,605 @@
import json
import logging
import time
import koji
from koji.context import context
from . import kojihub
from .db import QueryProcessor, InsertProcessor, UpsertProcessor, UpdateProcessor, \
DeleteProcessor, QueryView, db_lock
logger = logging.getLogger('koji.scheduler')
def log_db(msg, task_id=None, host_id=None):
    """Store a scheduler log message in scheduler_log_messages

    :param str msg: the message text
    :param task_id: related task id, if any
    :param host_id: related host id, if any
    """
    row = {'msg': msg, 'task_id': task_id, 'host_id': host_id}
    InsertProcessor('scheduler_log_messages', data=row).execute()
def log_both(msg, task_id=None, host_id=None, level=logging.INFO):
    """Send a message to the scheduler logger and to the database log

    The logger entry is prefixed with the task and host ids when they are
    set (truthy); the database entry stores the bare message plus the ids.
    """
    task_prefix = ""
    if task_id:
        task_prefix = f"[task_id={task_id}] "
    host_prefix = ""
    if host_id:
        host_prefix = f"[host_id={host_id}] "
    logger.log(level, '%s%s%s', task_prefix, host_prefix, msg)
    log_db(msg, task_id, host_id)
class LogMessagesQuery(QueryView):
    """Query view over scheduler_log_messages

    Task and host details are available as optional fields; the matching
    join is only added when one of those fields is requested or filtered on.
    """

    tables = ['scheduler_log_messages']
    joinmap = {
        # outer joins because these fields can be null
        'task': 'LEFT JOIN task ON scheduler_log_messages.task_id = task.id',
        'host': 'LEFT JOIN host ON scheduler_log_messages.host_id = host.id',
    }
    # field name -> [sql expression, join key needed for it (or None)]
    fieldmap = {
        'id': ['scheduler_log_messages.id', None],
        'task_id': ['scheduler_log_messages.task_id', None],
        'host_id': ['scheduler_log_messages.host_id', None],
        # message time as seconds since the epoch
        'msg_ts': ["date_part('epoch', scheduler_log_messages.msg_time)", None],
        'msg': ['scheduler_log_messages.msg', None],
        'method': ['task.method', 'task'],
        'state': ['task.state', 'task'],
        'owner': ['task.owner', 'task'],
        'arch': ['task.arch', 'task'],
        'channel_id': ['task.channel_id', 'task'],
        'host_name': ['host.name', 'host'],
        'host_ready': ['host.ready', 'host'],
    }
    # the defaults need no joins
    default_fields = ('id', 'task_id', 'host_id', 'msg', 'msg_ts')
def get_log_messages(clauses=None, fields=None):
    """Return scheduler log messages

    :param clauses: optional list of (field, value) or (field, op, value) filters
    :param fields: optional list of LogMessagesQuery field names to return
    :returns: the rows produced by the query
    """
    view = LogMessagesQuery(clauses, fields)
    return view.execute()
def get_tasks_for_host(hostID, retry=True):
    """Get the tasks assigned to a given host

    :param int hostID: id of the host
    :param bool retry: if nothing is assigned, run the scheduler and query once more
    :returns: rows with id, state, channel_id, host_id, arch, method, priority, create_ts
    """
    hostID = kojihub.convert_value(hostID, cast=int, none_allowed=True)

    # (sql expression, alias) pairs
    column_aliases = (
        ('task.id', 'id'),
        ('task.state', 'state'),
        ('task.channel_id', 'channel_id'),
        ('task.host_id', 'host_id'),
        ('task.arch', 'arch'),
        ('task.method', 'method'),
        ('task.priority', 'priority'),
        ("date_part('epoch', create_time)", 'create_ts'),
    )
    columns = tuple(column for column, _alias in column_aliases)
    aliases = tuple(alias for _column, alias in column_aliases)

    query = QueryProcessor(
        columns=columns, aliases=aliases, tables=['task'],
        clauses=['host_id = %(hostID)s', 'state=%(assigned)s'],
        values={'hostID': hostID, 'assigned': koji.TASK_STATES['ASSIGNED']},
    )

    tasks = query.execute()
    if tasks or not retry:
        return tasks
    # run scheduler and try again
    TaskScheduler().run()
    return query.execute()
def set_refusal(hostID, taskID, soft=True, by_host=False, msg=''):
    """Record that a host refused a task

    One row is kept per (task, host) pair; a repeated refusal updates it.

    :param int hostID: the refusing host
    :param int taskID: the refused task
    :param bool soft: stored as the "soft" flag (the scheduler drops soft
                      refusals once they are older than SoftRefusalTimeout)
    :param bool by_host: stored as the "by_host" flag
    :param str msg: message stored with the refusal
    """
    data = {
        # each id goes into its own column: the original stored hostID as
        # task_id and taskID as host_id
        'task_id': kojihub.convert_value(taskID, cast=int),
        'host_id': kojihub.convert_value(hostID, cast=int),
        'soft': kojihub.convert_value(soft, cast=bool),
        'by_host': kojihub.convert_value(by_host, cast=bool),
        'msg': kojihub.convert_value(msg, cast=str),
    }
    upsert = UpsertProcessor('scheduler_task_refusals', data=data, keys=('task_id', 'host_id'))
    upsert.execute()
    log_both('Host refused task', task_id=taskID, host_id=hostID)
class TaskRefusalsQuery(QueryView):
    """Query view over scheduler_task_refusals

    Task and host details are optional fields that pull in the matching join.
    """

    tables = ['scheduler_task_refusals']
    joinmap = {
        'task': 'task ON scheduler_task_refusals.task_id = task.id',
        'host': 'host ON scheduler_task_refusals.host_id = host.id',
    }
    # field name -> [sql expression, join key needed for it (or None)]
    fieldmap = {
        'id': ['scheduler_task_refusals.id', None],
        'task_id': ['scheduler_task_refusals.task_id', None],
        'host_id': ['scheduler_task_refusals.host_id', None],
        'by_host': ['scheduler_task_refusals.by_host', None],
        'soft': ['scheduler_task_refusals.soft', None],
        'msg': ['scheduler_task_refusals.msg', None],
        # refusal time as seconds since the epoch
        'ts': ["date_part('epoch', scheduler_task_refusals.time)", None],
        'method': ['task.method', 'task'],
        'state': ['task.state', 'task'],
        'owner': ['task.owner', 'task'],
        'arch': ['task.arch', 'task'],
        'channel_id': ['task.channel_id', 'task'],
        'host_name': ['host.name', 'host'],
        'host_ready': ['host.ready', 'host'],
    }
    # the defaults need no joins
    default_fields = ('id', 'task_id', 'host_id', 'by_host', 'soft', 'msg', 'ts')
def get_task_refusals(clauses=None, fields=None):
    """Return task refusal entries

    :param clauses: optional list of (field, value) or (field, op, value) filters
    :param fields: optional list of TaskRefusalsQuery field names to return
    :returns: the rows produced by the query
    """
    view = TaskRefusalsQuery(clauses, fields)
    return view.execute()
def get_host_data(hostID=None):
    """Return actual builder data

    :param int hostID: Return data for given host (otherwise for all)
    :returns list[dict]: list of host_id/data dicts
    """
    # This function is exported over RPC, so check the argument the same way
    # get_tasks_for_host does before it reaches the integer placeholder.
    # presumably convert_value rejects non-integer input -- TODO confirm
    hostID = kojihub.convert_value(hostID, cast=int, none_allowed=True)
    clauses = []
    columns = ['host_id', 'data']
    if hostID is not None:
        clauses.append('host_id = %(hostID)i')
    query = QueryProcessor(
        tables=['scheduler_host_data'],
        clauses=clauses,
        columns=columns,
        # pass only the value the query uses instead of every local name
        values={'hostID': hostID},
        opts={'order': 'host_id'}
    )
    return query.execute()
class TaskRunsQuery(QueryView):
    """Query view over scheduler_task_runs

    Task and host details are optional fields that pull in the matching join.
    """

    tables = ['scheduler_task_runs']
    joinmap = {
        'task': 'task ON scheduler_task_runs.task_id = task.id',
        'host': 'host ON scheduler_task_runs.host_id = host.id',
    }
    # field name -> [sql expression, join key needed for it (or None)]
    fieldmap = {
        'id': ['scheduler_task_runs.id', None],
        'task_id': ['scheduler_task_runs.task_id', None],
        'method': ['task.method', 'task'],
        'state': ['task.state', 'task'],
        'owner': ['task.owner', 'task'],
        'arch': ['task.arch', 'task'],
        'channel_id': ['task.channel_id', 'task'],
        'host_name': ['host.name', 'host'],
        'host_ready': ['host.ready', 'host'],
        'host_id': ['scheduler_task_runs.host_id', None],
        'active': ['scheduler_task_runs.active', None],
        # times are returned as seconds since the epoch
        'create_ts': ["date_part('epoch', scheduler_task_runs.create_time)", None],
        # start and completion times come from the task row, not the run row
        'start_ts': ["date_part('epoch', task.start_time)", 'task'],
        'completion_ts': ["date_part('epoch', task.completion_time)", 'task'],
    }
    # the defaults need no joins
    default_fields = ('id', 'task_id', 'host_id', 'active', 'create_ts')
def get_task_runs(clauses=None, fields=None):
    """Return scheduler task run entries

    :param clauses: optional list of (field, value) or (field, op, value) filters
    :param fields: optional list of TaskRunsQuery field names to return
    :returns: the rows produced by the query
    """
    view = TaskRunsQuery(clauses, fields)
    return view.execute()
class TaskScheduler(object):
def __init__(self):
self.hosts_by_bin = {}
self.hosts = {}
self.active_tasks = []
self.free_tasks = []
# TODO these things need proper config
self.maxjobs = context.opts['MaxJobs']
self.capacity_overcommit = context.opts['CapacityOvercommit']
self.ready_timeout = context.opts['ReadyTimeout']
self.assign_timeout = context.opts['AssignTimeout']
self.soft_refusal_timeout = context.opts['SoftRefusalTimeout']
self.host_timeout = context.opts['HostTimeout']
self.run_interval = context.opts['RunInterval']
def run(self, force=False):
if not db_lock('scheduler', wait=force):
# already running elsewhere
return False
if not force and not self.check_ts():
# already ran too recently
return False
logger.info('Running task scheduler')
self.get_tasks()
self.get_hosts()
self.check_hosts()
self.do_schedule()
self.check_active_tasks()
return True
def check_ts(self):
"""Check the last run timestamp
Returns True if the scheduler should run, False otherwise
"""
# get last ts
query = QueryProcessor(
tables=['scheduler_sys_data'],
columns=['data'],
clauses=['name = %(name)s'],
values={'name': 'last_run_ts'},
)
last = query.singleValue(strict=False) or 0
now = time.time()
delta = now - last
if delta < 0:
logger.error('Last run in the future by %i seconds', -delta)
ret = False
# update the ts so that a system time rollback doesn't keep us from running
elif delta < self.run_interval:
logger.debug('Skipping run due to run_interval setting')
# return now without updating ts
return False
else:
ret = True
# save current ts
upsert = UpsertProcessor(
'scheduler_sys_data',
data={'name': 'last_run_ts',
'data': json.dumps(now)},
keys=['name'],
)
upsert.execute()
return ret
def do_schedule(self):
# debug
logger.info(f'Hosts: {len(self.hosts)}')
logger.info(f'Free tasks: {len(self.free_tasks)}')
logger.info(f'Active tasks: {len(self.active_tasks)}')
# calculate host load and task count
for task in self.active_tasks:
# for now, we mirror what kojid updateTasks has been doing
host = self.hosts.get(task['host_id'])
if not host:
# not showing as ready
# TODO log and deal with this condition
continue
host.setdefault('_load', 0.0)
if not task['waiting']:
host['_load'] += task['weight']
host.setdefault('_ntasks', 0)
host['_ntasks'] += 1
for host in self.hosts.values():
host.setdefault('_load', 0.0)
host.setdefault('_ntasks', 0)
host.setdefault('_demand', 0.0)
# temporary test code
logger.info(f'Host: {host}')
ldiff = host['task_load'] - host['_load']
if abs(ldiff) > 0.01:
# this is expected in a number of cases, just observing
logger.info(f'Host load differs by {ldiff:.2f}: {host}')
# figure out which hosts *can* take each task
# at the moment this is mostly just bin, but in the future it will be more complex
refusals = self.get_refusals()
for task in self.free_tasks:
task['_hosts'] = []
min_avail = min(0, task['weight'] - self.capacity_overcommit)
h_refused = refusals.get(task['task_id'], {})
for host in self.hosts_by_bin.get(task['_bin'], []):
if (host['ready'] and host['_ntasks'] < self.maxjobs and
host['capacity'] - host['_load'] > min_avail and
host['id'] not in h_refused):
task['_hosts'].append(host)
logger.info(f'Task {task["task_id"]}: {len(task["_hosts"])} options')
for host in task['_hosts']:
# demand gives us a rough measure of how much overall load is pending for the host
host.setdefault('_demand', 0.0)
host['_demand'] += task['weight'] / len(task['_hosts'])
# normalize demand to 1
max_demand = sum([h['_demand'] for h in self.hosts.values()])
if max_demand > 0.0:
for h in self.hosts.values():
h['_demand'] = (h['_demand'] / max_demand)
for h in self.hosts.values():
self._rank_host(h)
# tasks are already in priority order
for task in self.free_tasks:
min_avail = task['weight'] - self.capacity_overcommit
task['_hosts'].sort(key=lambda h: h['_rank'])
logger.debug('Task %i choices: %s', task['task_id'],
[(h['name'], "%(_rank).2f" % h) for h in task['_hosts']])
for host in task['_hosts']:
if (host['capacity'] - host['_load'] > min_avail and
host['_ntasks'] < self.maxjobs):
# add run entry
self.add_run(task, host)
# update our totals and rank
host['_load'] += task['weight']
host['_ntasks'] += 1
self._rank_host(host)
break
else:
logger.debug('Could not assign task %s', task['task_id'])
def _rank_host(self, host):
host['_rank'] = host['_load'] + host['_ntasks'] + host['_demand']
def check_active_tasks(self):
"""Check on active tasks"""
runs = self.get_active_runs()
logger.info('Found %i active runs', len(runs))
logger.info('Checking on %i active tasks', len(self.active_tasks))
for task in self.active_tasks:
if not task['host_id']:
log_both('Active task with no host', task_id=task['task_id'], level=logging.ERROR)
kojihub.Task(task['task_id']).free()
continue
host = self.hosts.get(task['host_id'])
if not host:
# host disabled?
# TODO
continue
taskruns = runs.get(task['task_id'], [])
if not taskruns:
log_both('Assigned task with no active run entry', task_id=task['task_id'],
host_id=host['id'], level=logging.ERROR)
kojihub.Task(task['task_id']).free()
continue
if len(taskruns) > 1:
logger.error('Multiple active run entries for assigned task %(task_id)s',
task)
# TODO fix
if task['state'] == koji.TASK_STATES['ASSIGNED']:
# TODO check time since assigned
# if not taken within a timeout
# - if host not checking in, then make sure host marked unavail and free
# - if host *is* checking in, then treat as refusal and free
age = time.time() - min([r['create_ts'] for r in taskruns])
if age > self.assign_timeout:
log_both('Task assignment timeout', task_id=task['task_id'],
host_id=host['id'])
kojihub.Task(task['task_id']).free()
elif task['state'] == koji.TASK_STATES['OPEN']:
if host['update_ts'] is None:
# shouldn't happen?
# fall back to task_run time
age = time.time() - min([r['create_ts'] for r in taskruns])
else:
age = time.time() - host['update_ts']
if age > self.host_timeout:
log_both('Freeing task from unresponsive host', task_id=task['task_id'],
host_id=host['id'])
kojihub.Task(task['task_id']).free()
# end stale runs
update = UpdateProcessor(
'scheduler_task_runs',
data={'active': False},
clauses=['active = TRUE',
'(SELECT id FROM task WHERE task.id=task_id AND '
'state IN %(states)s) IS NULL'],
values={'states': [koji.TASK_STATES[s] for s in ('OPEN', 'ASSIGNED')]},
)
update.execute()
def check_hosts(self):
# sanity check ready status
hosts_to_mark = []
for host in self.hosts.values():
if not host['ready']:
continue
if (host['update_ts'] is None or time.time() - host['update_ts'] > self.ready_timeout):
hosts_to_mark.append(host)
log_both('Marking host not ready', host_id=host['id'])
if hosts_to_mark:
update = UpdateProcessor(
'host',
data={'ready': False},
clauses=['id IN %(host_ids)s'],
values={'host_ids': [h['id'] for h in hosts_to_mark]},
)
update.execute()
def get_active_runs(self):
runs = get_task_runs([["active", True]])
runs_by_task = {}
for run in runs:
runs_by_task.setdefault(run['task_id'], [])
runs_by_task[run['task_id']].append(run)
return runs_by_task
def get_tasks(self):
"""Get the task data that we need for scheduling"""
fields = (
('task.id', 'task_id'),
('task.state', 'state'),
('task.waiting', 'waiting'),
('task.weight', 'weight'),
('channel_id', 'channel_id'),
('task.host_id', 'host_id'),
('arch', 'arch'),
('method', 'method'),
('priority', 'priority'),
("date_part('epoch', task.create_time)", 'create_ts'),
# ('scheduler_task_runs.id', 'run_id'),
)
fields, aliases = zip(*fields)
values = {'states': [koji.TASK_STATES[n] for n in ('ASSIGNED', 'OPEN')]}
query = QueryProcessor(
columns=fields, aliases=aliases, tables=['task'],
clauses=('task.state IN %(states)s',
'task.host_id IS NOT NULL', # should always be set, but...
),
values=values,
)
active_tasks = query.execute()
values = {'state': koji.TASK_STATES['FREE']}
query = QueryProcessor(
columns=fields, aliases=aliases, tables=['task'],
clauses=('task.state = %(state)s',),
values=values,
opts={'order': 'priority,create_ts', 'limit': 1000}, # TODO config
# scheduler order
# lower priority numbers take precedence, like posix process priority
# at a given priority, earlier creation times take precedence
)
free_tasks = query.execute()
for task in free_tasks:
tbin = '%(channel_id)s:%(arch)s' % task
task['_bin'] = tbin
for task in active_tasks:
tbin = '%(channel_id)s:%(arch)s' % task
task['_bin'] = tbin
self.free_tasks = free_tasks
self.active_tasks = active_tasks
def get_refusals(self):
"""Get task refusals and clean stale entries"""
refusals = {}
cutoff_ts = time.time() - self.soft_refusal_timeout
to_drop = []
for row in get_task_refusals(fields=('id', 'task_id', 'host_id', 'soft', 'ts', 'state')):
if ((row['soft'] and row['ts'] < cutoff_ts) or
koji.TASK_STATES[row['state']] not in ('FREE', 'OPEN', 'ASSIGNED')):
to_drop.append(row['id'])
else:
# index by task and host
refusals.setdefault(row['task_id'], {})[row['host_id']] = row
if to_drop:
# drop stale entries
delete = DeleteProcessor(
'scheduler_task_refusals',
clauses=['id IN %(to_drop)s'],
values=locals(),
)
delete.execute()
return refusals
def get_hosts(self):
# get hosts and bin them
hosts_by_bin = {}
hosts_by_id = {}
for host in self._get_hosts():
host['_bins'] = []
hosts_by_id[host['id']] = host
for chan in host['channels']:
for arch in host['arches'].split() + ['noarch']:
host_bin = "%s:%s" % (chan, arch)
hosts_by_bin.setdefault(host_bin, []).append(host)
host['_bins'].append(host_bin)
self.hosts_by_bin = hosts_by_bin
self.hosts = hosts_by_id
def _get_hosts(self):
"""Query enabled hosts"""
fields = (
('host.id', 'id'),
('host.name', 'name'),
("date_part('epoch', host.update_time)", 'update_ts'),
('host.task_load', 'task_load'),
('host.ready', 'ready'),
('host_config.arches', 'arches'),
('host_config.capacity', 'capacity'),
)
fields, aliases = zip(*fields)
query = QueryProcessor(
tables=['host'],
columns=fields,
aliases=aliases,
clauses=[
'host_config.enabled IS TRUE',
'host_config.active IS TRUE',
],
joins=[
'host_config ON host.id = host_config.host_id'
]
)
hosts = query.execute()
# also get channel info
query = QueryProcessor(
tables=['host_channels'],
columns=['host_id', 'channel_id'],
clauses=['active IS TRUE', 'channels.enabled IS TRUE'],
joins=['channels ON host_channels.channel_id = channels.id'],
)
chan_idx = {}
for row in query.execute():
chan_idx.setdefault(row['host_id'], []).append(row['channel_id'])
for host in hosts:
host['channels'] = chan_idx.get(host['id'], [])
return hosts
def add_run(self, task, host):
    """Record a new run of ``task`` on ``host`` and assign the task.

    ``task`` is a dict with a ``task_id`` key and ``host`` a dict with an
    ``id`` key. Any run of the task currently flagged active in
    scheduler_task_runs is flagged inactive before the new row is added.
    """
    task_id = task['task_id']
    host_id = host['id']
    log_both('Assigning task', task_id=task_id, host_id=host_id)

    # retire whichever run was previously active for this task
    UpdateProcessor(
        'scheduler_task_runs',
        data={'active': False},
        clauses=['task_id=%(task_id)s', 'active = TRUE'],
        values={'task_id': task_id},
    ).execute()

    # record the fresh run
    new_run = InsertProcessor('scheduler_task_runs')
    new_run.set(task_id=task_id, host_id=host_id)
    new_run.execute()

    # finally hand the task itself over to the host
    kojihub.Task(task_id).assign(host_id)
class SchedulerExports:
    """Scheduler calls exposed to clients."""

    # read-only query helpers, exported unchanged
    getTaskRuns = staticmethod(get_task_runs)
    getTaskRefusals = staticmethod(get_task_refusals)
    getHostData = staticmethod(get_host_data)
    getLogMessages = staticmethod(get_log_messages)

    def doRun(self, force=False):
        """Trigger a scheduler run (requires the admin permission)

        Intended for debugging only; in normal operation scheduler runs
        are kicked off regularly by builder checkins.
        """
        # normalize the argument first, then check permissions
        force = kojihub.convert_value(force, cast=bool)
        context.session.assertPerm('admin')
        scheduler = TaskScheduler()
        return scheduler.run(force=force)

View file

@ -983,10 +983,58 @@ CREATE TABLE rpm_checksum (
) WITHOUT OIDS;
CREATE INDEX rpm_checksum_rpm_id ON rpm_checksum(rpm_id);
-- scheduler tables
-- scheduler_task_runs: links a task to a host, with an `active` flag
-- (defaults to TRUE) and the creation time of the row.
CREATE TABLE scheduler_task_runs (
id SERIAL NOT NULL PRIMARY KEY,
task_id INTEGER REFERENCES task (id) NOT NULL,
host_id INTEGER REFERENCES host (id) NOT NULL,
active BOOLEAN NOT NULL DEFAULT TRUE,
create_time TIMESTAMPTZ NOT NULL DEFAULT NOW()
) WITHOUT OIDS;
-- lookup indexes for runs by task, by host and by creation time
CREATE INDEX scheduler_task_runs_task ON scheduler_task_runs(task_id);
CREATE INDEX scheduler_task_runs_host ON scheduler_task_runs(host_id);
CREATE INDEX scheduler_task_runs_create_time ON scheduler_task_runs(create_time);
-- scheduler_host_data: at most one JSONB document per host (host_id is the
-- primary key). The document layout is not constrained by the schema.
CREATE TABLE scheduler_host_data (
host_id INTEGER REFERENCES host (id) PRIMARY KEY,
data JSONB
) WITHOUT OIDS;
-- scheduler_sys_data: JSONB documents keyed by a unique name.
CREATE TABLE scheduler_sys_data (
name TEXT NOT NULL PRIMARY KEY,
data JSONB
) WITHOUT OIDS;
-- scheduler_task_refusals: at most one row per (task, host) pair, enforced
-- by the UNIQUE constraint. `soft` defaults to FALSE; `msg` is optional.
-- NOTE(review): by_host presumably records whether the host itself issued
-- the refusal -- confirm against the hub code.
CREATE TABLE scheduler_task_refusals (
id SERIAL NOT NULL PRIMARY KEY,
task_id INTEGER REFERENCES task (id) NOT NULL,
host_id INTEGER REFERENCES host (id) NOT NULL,
by_host BOOLEAN NOT NULL,
soft BOOLEAN NOT NULL DEFAULT FALSE,
msg TEXT,
time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (task_id, host_id)
) WITHOUT OIDS;
-- scheduler_log_messages: timestamped text messages. task_id and host_id
-- are nullable, so a message may reference a task, a host, both or neither.
CREATE TABLE scheduler_log_messages (
id SERIAL NOT NULL PRIMARY KEY,
task_id INTEGER REFERENCES task (id),
host_id INTEGER REFERENCES host (id),
msg_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
msg TEXT NOT NULL
) WITHOUT OIDS;
-- this table is used for locking, see db_lock()
-- each row is one named lock; the names are seeded below
CREATE TABLE locks (
name TEXT NOT NULL PRIMARY KEY
) WITHOUT OIDS;
INSERT INTO locks(name) VALUES('protonmsg-plugin');
INSERT INTO locks(name) VALUES('scheduler');
COMMIT WORK;

View file

@ -136,6 +136,8 @@ monitor commands:
edit-notification Edit user's notification
list-notifications List user's notifications and blocks
remove-notification Remove user's notifications
scheduler-info Show information about scheduling
scheduler-logs Query scheduler logs
unblock-notification Unblock user's notification
wait-repo Wait for a repo to be regenerated
watch-logs Watch logs in realtime

View file

@ -1,64 +0,0 @@
import kojihub
import mock
import unittest
QP = kojihub.QueryProcessor
class TestGetReadyHosts(unittest.TestCase):
    """Checks kojihub.get_ready_hosts against a stubbed QueryProcessor."""

    def setUp(self):
        self.maxDiff = None
        self.queries = []
        self.query_execute = mock.MagicMock()
        patcher = mock.patch('kojihub.kojihub.QueryProcessor',
                             side_effect=self.getQuery)
        self.QueryProcessor = patcher.start()

    def tearDown(self):
        mock.patch.stopall()

    def getQuery(self, *args, **kwargs):
        """Build a real QueryProcessor, stub its execute() and record it"""
        processor = QP(*args, **kwargs)
        processor.execute = self.query_execute
        self.queries.append(processor)
        return processor

    def test_valid(self):
        hosts = [
            {'host.id': 1, 'name': 'hostname', 'arches': 'arch123', 'task_load': 0,
             'capacity': 3},
            {'host.id': 2, 'name': 'hostname-2', 'arches': 'arch123', 'task_load': 0,
             'capacity': 3},
        ]
        # expected output is the host rows plus their channel lists
        expected_res = [
            dict(hosts[0], channels=[1]),
            dict(hosts[1], channels=[2, 3]),
        ]
        # first execute() returns the hosts, then one channel result per host
        self.query_execute.side_effect = [
            hosts,
            [{'channel_id': 1}],
            [{'channel_id': 2}, {'channel_id': 3}],
        ]

        result = kojihub.get_ready_hosts()

        self.assertEqual(result, expected_res)
        self.assertEqual(len(self.queries), 3)

        host_query = self.queries[0]
        self.assertEqual(host_query.tables, ['host'])
        self.assertEqual(host_query.joins, ['sessions USING (user_id)',
                                            'host_config ON host.id = host_config.host_id'])
        self.assertEqual(host_query.clauses,
                         ['active IS TRUE', 'enabled IS TRUE', 'expired IS FALSE',
                          'master IS NULL', 'ready IS TRUE',
                          "sessions.update_time > NOW() - '5 minutes'::interval"])
        self.assertEqual(host_query.values, {})
        self.assertEqual(host_query.columns,
                         ['arches', 'capacity', 'host.id', 'name', 'task_load'])

        # one channel query per host, issued in host order
        for host, channel_query in zip(hosts, self.queries[1:]):
            self.assertEqual(channel_query.tables, ['host_channels'])
            self.assertEqual(channel_query.joins,
                             ['channels ON host_channels.channel_id = channels.id'])
            self.assertEqual(channel_query.clauses,
                             ['active IS TRUE', 'enabled IS TRUE', 'host_id=%(id)s'])
            self.assertEqual(channel_query.values, host)
            self.assertEqual(channel_query.columns, ['channel_id'])

View file

@ -11,17 +11,17 @@ from kojihub.db import DeleteProcessor, QueryProcessor, BulkInsertProcessor
def clean_sessions(cursor, vacuum, test, age, absolute):
    """Delete stale rows from the sessions table.

    :param cursor: database cursor, used only for the optional VACUUM
    :param vacuum: if true, run VACUUM ANALYZE on sessions after deleting
    :param test: if true, do a dry run and delete nothing
    :param int age: delete sessions not updated for this many days
    :param absolute: if not None, also delete sessions started more than
                     this many days ago (int)

    When the global ``options.verbose`` is set, the number of matching
    sessions is printed first.
    """
    # the :d format spec only accepts integers, so anything else raises
    # here instead of being interpolated into the SQL
    clause = f"(update_time < NOW() - '{age:d} days'::interval)"
    if absolute is not None:
        clause += f"OR (start_time < NOW() - '{absolute:d} days'::interval)"

    if options.verbose:
        query = QueryProcessor(tables=['sessions'], clauses=[clause], opts={'countOnly': True})
        rows = query.execute()
        print(f"Deleting {rows} sessions")

    if not test:
        delete = DeleteProcessor(table='sessions', clauses=[clause])
        delete.execute()
        # only worth vacuuming after rows were really removed
        if vacuum:
            cursor.execute("VACUUM ANALYZE sessions")
@ -147,6 +147,21 @@ def clean_buildroots(cursor, vacuum, test):
cursor.execute("VACUUM ANALYZE buildroot")
def clean_scheduler_logs(cursor, vacuum, test, age):
    """Delete scheduler log messages older than ``age`` days.

    :param cursor: database cursor, used only for the optional VACUUM
    :param vacuum: if true, run VACUUM ANALYZE on the table after deleting
    :param test: if true, do a dry run and delete nothing
    :param int age: retention period in days, compared against msg_time

    When the global ``options.verbose`` is set, the number of matching
    messages is printed first.
    """
    table = "scheduler_log_messages"
    clauses = [f"(msg_time < NOW() - '{age:d} days'::interval)"]

    if options.verbose:
        counter = QueryProcessor(tables=[table],
                                 clauses=clauses,
                                 opts={'countOnly': True})
        rows = counter.execute()
        print(f"Deleting {rows} scheduler log messages")

    if test:
        # dry run: nothing to delete, nothing to vacuum
        return

    DeleteProcessor(table=table, clauses=clauses).execute()
    if vacuum:
        cursor.execute("VACUUM ANALYZE scheduler_log_messages")
if __name__ == "__main__":
global options
parser = OptionParser("%prog cleans koji database")
@ -180,6 +195,9 @@ if __name__ == "__main__":
parser.add_option('--scratch-builds-age', type=int, dest="scratch_age",
action="store", default=730, metavar="DAYS",
help="Delete scratch builds' tasks older than this (default: 2 years")
parser.add_option('--logs-age', type=int,
action="store", default=7, metavar="DAYS",
help="Delete scheduler log messages older than this (default: 7 days)")
parser.add_option('--buildroots', action="store_true",
help="Delete unreferenced buildroots")
parser.add_option('-f', '--force', action="store_true",
@ -240,6 +258,7 @@ if __name__ == "__main__":
clean_sessions(cursor, options.vacuum, options.test, options.sessions_age,
options.sessions_absolute_age)
clean_reservations(cursor, options.vacuum, options.test, options.reservations_age)
clean_scheduler_logs(cursor, options.vacuum, options.test, options.logs_age)
if options.tag_notifications:
clean_notification_tasks(cursor, options.vacuum, options.test,
age=options.tag_notifications_age)

View file

@ -1780,7 +1780,12 @@ def hostinfo(environ, hostID=None, userID=None):
values['host'] = host
values['channels'] = channels
values['buildroots'] = buildroots
values['lastUpdate'] = server.getLastHostUpdate(host['id'], ts=True)
if 'update_ts' not in host:
# be nice with older hub
# TODO remove this compat workaround after a release
values['lastUpdate'] = server.getLastHostUpdate(host['id'], ts=True)
else:
values['lastUpdate'] = koji.formatTimeLong(host['update_ts'])
if environ['koji.currentUser']:
values['perms'] = server.getUserPerms(environ['koji.currentUser']['id'])
else: