This commit is contained in:
Tomas Kopecek 2022-12-06 16:34:25 +01:00
parent c3c0a6a139
commit 6a23a12fd0
5 changed files with 150 additions and 5 deletions

View file

@ -7899,3 +7899,59 @@ def anon_handle_repoinfo(goptions, session, args):
# repoID option added in 1.33
if options.buildroots:
warn("--buildroots option is available with hub 1.33 or newer")
def anon_handle_scheduler_info(goptions, session, args):
"""[monitor] Show information about scheduling"""
usage = "usage: %prog schedulerinfo [options]"
parser = OptionParser(usage=get_usage_str(usage))
parser.add_option("-t", "--task", action="store", type=int, default=None,
help="Limit data to given task id")
parser.add_option("--host", action="store", default=None,
help="Limit data to given builder (name/id)")
parser.add_option("--state", action="store", type='str', default=None,
choices=[x for x in koji.TASK_STATES.keys()],
help="Limit data to task state")
(options, args) = parser.parse_args(args)
if len(args) > 0:
parser.error("This command takes no arguments")
ensure_connection(session, goptions)
host_id = None
if options.host:
try:
host_id = int(options.host)
except ValueError:
host_id = session.getHost(options.host, strict=True)['id']
if options.state:
state = koji.TASK_STATES[options.state]
else:
state = None
# get the data
runs = session.scheduler.getTaskRuns(taskID=options.task, hostID=host_id, state=state)
mask = '%(task_id)s\t%(host_id)s\t%(state)s\t%(create_time)s\t%(start_time)s\t%(end_time)s'
if not goptions.quiet:
header = mask % {
'task_id': 'Task',
'host_name': 'Host',
'state': 'State',
'create_time': 'Created',
'start_time': 'Started',
'end_time': 'Ended'
}
print(header)
print('-' * len(header))
for run in runs:
run['state'] = koji.TASK_STATES[runs['state']]
print(mask % run)
if host_id:
print('Host data for %s:' % options.host)
host_data = session.scheduler.getHostData(hostID=host_id)
if len(host_data) > 0:
print(host_data[0]['data'])
else:
print('-')

View file

@ -194,6 +194,8 @@ TASK_STATES = Enum((
'CANCELED',
'ASSIGNED',
'FAILED',
'SCHEDULED',
'REFUSED',
))
BUILD_STATES = Enum((

View file

@ -95,6 +95,7 @@ from .db import ( # noqa: F401
logger = logging.getLogger('koji.hub')
sched_logger = scheduler.DBLogger()
NUMERIC_TYPES = (int, float)
@ -315,10 +316,12 @@ class Task(object):
else:
return None
def free(self):
def free(self, newstate=koji.TASK_STATES['FREE']):
"""Free a task"""
if newstate not in [koji.TASK_STATES['FREE'], koji.TASK_STATES['REFUSED']]:
raise koji.GenericError("Can't be called with other than FREE/REFUSED states")
info = self.getInfo(request=True)
self.runCallbacks('preTaskStateChange', info, 'state', koji.TASK_STATES['FREE'])
self.runCallbacks('preTaskStateChange', info, 'state', newstate)
self.runCallbacks('preTaskStateChange', info, 'host_id', None)
# access checks should be performed by calling function
query = QueryProcessor(tables=['task'], columns=['state'], clauses=['id = %(id)i'],
@ -327,14 +330,13 @@ class Task(object):
if not oldstate:
raise koji.GenericError("No such task: %i" % self.id)
if koji.TASK_STATES[oldstate] in ['CLOSED', 'CANCELED', 'FAILED']:
raise koji.GenericError("Cannot free task %i, state is %s" %
raise koji.GenericError("Cannot free/refuse task %i, state is %s" %
(self.id, koji.TASK_STATES[oldstate]))
newstate = koji.TASK_STATES['FREE']
newhost = None
update = UpdateProcessor('task', clauses=['id=%(task_id)s'], values={'task_id': self.id},
data={'state': newstate, 'host_id': newhost})
update.execute()
self.runCallbacks('postTaskStateChange', info, 'state', koji.TASK_STATES['FREE'])
self.runCallbacks('postTaskStateChange', info, 'state', newstate)
self.runCallbacks('postTaskStateChange', info, 'host_id', None)
return True
@ -14409,6 +14411,66 @@ class HostExports(object):
task.assertHost(host.id)
return task.setWeight(weight)
def setHostData(self, hostdata):
"""Builder will update all its resources
Initial implementation contains:
- available task methods
- maxjobs
- host readiness
"""
host = Host()
host.verify()
clauses = ['host_id = %(host_id)i']
values = {'host_id': host.id}
table = 'scheduler_host_data'
query = QueryProcessor(tables=[table], clauses=clauses, values=values,
opts={'countOnly': True})
if query.singleValue() > 0:
update = UpdateProcessor(table=table, data={'data': hostdata},
clauses=clauses, values=values)
update.execute()
else:
insert = InsertProcessor(table=table, data={'data': hostdata},
clauses=clauses, values=values)
insert.execute()
sched_logger.debug(f"Updating host data with: {hostdata}",
host_id=host.id, location='setHostData')
def getTasks(self):
host = Host()
host.verify()
query = QueryProcessor(
tables=['scheduler_task_runs'],
clauses=[
'host_id = %(host_id)s',
'state in %(states)s'
],
values={
'host_id': host.id,
'states': [
koji.TASK_STATES['SCHEDULED'],
koji.TASK_STATES['ASSIGNED'],
],
}
)
tasks = query.execute()
for task in tasks:
sched_logger.debug("Sending task", host_id=host.id, task_id=task['id'],
location="getTasks")
return tasks
def refuseTask(self, task_id):
host = Host()
host.verify()
task = Task(task_id)
task.free(newstate=koji.TASK_STATES['REFUSED'])
sched_logger.warning("Refusing task", host_id=host.id, task_id=task_id,
location="refuseTask")
return True
def getHostTasks(self):
host = Host()
host.verify()

View file

@ -711,6 +711,7 @@ def setup_logging2(opts):
log_handler.setFormatter(HubFormatter(opts['LogFormat']))
import scheduler
def get_memory_usage():
pagesize = resource.getpagesize()
statm = [pagesize * int(y) // 1024
@ -844,8 +845,10 @@ def get_registry(opts, plugins):
registry = HandlerRegistry()
functions = kojihub.RootExports()
hostFunctions = kojihub.HostExports()
schedulerFunctions = scheduler.SchedulerExports()
registry.register_instance(functions)
registry.register_module(hostFunctions, "host")
registry.register_module(schedulerFunctions, "scheduler")
registry.register_function(auth.login)
registry.register_function(auth.sslLogin)
registry.register_function(auth.logout)

View file

@ -983,6 +983,28 @@ CREATE TABLE rpm_checksum (
) WITHOUT OIDS;
CREATE INDEX rpm_checksum_rpm_id ON rpm_checksum(rpm_id);
-- scheduler tables
CREATE TABLE scheduler_task_runs (
id SERIAL NOT NULL PRIMARY KEY,
task_id INTEGER REFERENCES task (id) NOT NULL,
host_id INTEGER REFERENCES host (id) NOT NULL,
state INTEGER NOT NULL,
create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
start_time TIMESTAMPTZ,
end_time TIMESTAMPTZ,
) WITHOUT OIDS;
CREATE INDEX scheduler_task_runs_task ON scheduler_task_runs(task_id);
CREATE INDEX scheduler_task_runs_host ON scheduler_task_runs(host_id);
CREATE INDEX scheduler_task_runs_state ON scheduler_task_runs(state);
CREATE INDEX scheduler_task_runs_create_time ON scheduler_task_runs(create_time);
CREATE TABLE scheduler_host_data (
host_id INTEGER REFERENCES host (id) PRIMARY KEY,
data JSONB,
) WITHOUT OIDS;
-- this table is used for locking, see db_lock()
CREATE TABLE locks (
name TEXT NOT NULL PRIMARY KEY