diff --git a/cli/koji_cli/commands.py b/cli/koji_cli/commands.py index a906b269..770083bb 100644 --- a/cli/koji_cli/commands.py +++ b/cli/koji_cli/commands.py @@ -7899,3 +7899,59 @@ def anon_handle_repoinfo(goptions, session, args): # repoID option added in 1.33 if options.buildroots: warn("--buildroots option is available with hub 1.33 or newer") + + +def anon_handle_scheduler_info(goptions, session, args): + """[monitor] Show information about scheduling""" + usage = "usage: %prog schedulerinfo [options]" + parser = OptionParser(usage=get_usage_str(usage)) + parser.add_option("-t", "--task", action="store", type=int, default=None, + help="Limit data to given task id") + parser.add_option("--host", action="store", default=None, + help="Limit data to given builder (name/id)") + parser.add_option("--state", action="store", type='str', default=None, + choices=[x for x in koji.TASK_STATES.keys()], + help="Limit data to task state") + (options, args) = parser.parse_args(args) + if len(args) > 0: + parser.error("This command takes no arguments") + + ensure_connection(session, goptions) + + host_id = None + if options.host: + try: + host_id = int(options.host) + except ValueError: + host_id = session.getHost(options.host, strict=True)['id'] + + if options.state: + state = koji.TASK_STATES[options.state] + else: + state = None + + # get the data + runs = session.scheduler.getTaskRuns(taskID=options.task, hostID=host_id, state=state) + mask = '%(task_id)s\t%(host_id)s\t%(state)s\t%(create_time)s\t%(start_time)s\t%(end_time)s' + if not goptions.quiet: + header = mask % { + 'task_id': 'Task', + 'host_name': 'Host', + 'state': 'State', + 'create_time': 'Created', + 'start_time': 'Started', + 'end_time': 'Ended' + } + print(header) + print('-' * len(header)) + for run in runs: + run['state'] = koji.TASK_STATES[runs['state']] + print(mask % run) + + if host_id: + print('Host data for %s:' % options.host) + host_data = session.scheduler.getHostData(hostID=host_id) + if len(host_data) > 0: + print(host_data[0]['data']) + else: + print('-') diff --git a/koji/__init__.py b/koji/__init__.py index 4d0e7b25..6f1aa837 100644 --- a/koji/__init__.py +++ b/koji/__init__.py @@ -194,6 +194,8 @@ TASK_STATES = Enum(( 'CANCELED', 'ASSIGNED', 'FAILED', + 'SCHEDULED', + 'REFUSED', )) BUILD_STATES = Enum(( diff --git a/kojihub/kojihub.py b/kojihub/kojihub.py index cc266f3a..b4b37588 100644 --- a/kojihub/kojihub.py +++ b/kojihub/kojihub.py @@ -95,6 +95,7 @@ from .db import ( # noqa: F401 logger = logging.getLogger('koji.hub') +sched_logger = scheduler.DBLogger() NUMERIC_TYPES = (int, float) @@ -315,10 +316,12 @@ class Task(object): else: return None - def free(self): + def free(self, newstate=koji.TASK_STATES['FREE']): """Free a task""" + if newstate not in [koji.TASK_STATES['FREE'], koji.TASK_STATES['REFUSED']]: + raise koji.GenericError("Can't be called with other than FREE/REFUSED states") info = self.getInfo(request=True) - self.runCallbacks('preTaskStateChange', info, 'state', koji.TASK_STATES['FREE']) + self.runCallbacks('preTaskStateChange', info, 'state', newstate) self.runCallbacks('preTaskStateChange', info, 'host_id', None) # access checks should be performed by calling function query = QueryProcessor(tables=['task'], columns=['state'], clauses=['id = %(id)i'], @@ -327,14 +330,13 @@ class Task(object): if not oldstate: raise koji.GenericError("No such task: %i" % self.id) if koji.TASK_STATES[oldstate] in ['CLOSED', 'CANCELED', 'FAILED']: - raise koji.GenericError("Cannot free task %i, state is %s" % + raise koji.GenericError("Cannot free/refuse task %i, state is %s" % (self.id, koji.TASK_STATES[oldstate])) - newstate = koji.TASK_STATES['FREE'] newhost = None update = UpdateProcessor('task', clauses=['id=%(task_id)s'], values={'task_id': self.id}, data={'state': newstate, 'host_id': newhost}) update.execute() - self.runCallbacks('postTaskStateChange', info, 'state', koji.TASK_STATES['FREE']) + self.runCallbacks('postTaskStateChange', info, 'state', newstate) self.runCallbacks('postTaskStateChange', info, 'host_id', None) return True @@ -14409,6 +14411,66 @@ class HostExports(object): task.assertHost(host.id) return task.setWeight(weight) + def setHostData(self, hostdata): + """Builder will update all its resources + + Initial implementation contains: + - available task methods + - maxjobs + - host readiness + """ + host = Host() + host.verify() + clauses = ['host_id = %(host_id)i'] + values = {'host_id': host.id} + table = 'scheduler_host_data' + query = QueryProcessor(tables=[table], clauses=clauses, values=values, + opts={'countOnly': True}) + if query.singleValue() > 0: + update = UpdateProcessor(table=table, data={'data': hostdata}, + clauses=clauses, values=values) + update.execute() + else: + insert = InsertProcessor(table=table, data={'data': hostdata}, + clauses=clauses, values=values) + insert.execute() + sched_logger.debug(f"Updating host data with: {hostdata}", + host_id=host.id, location='setHostData') + + def getTasks(self): + host = Host() + host.verify() + + query = QueryProcessor( + tables=['scheduler_task_runs'], + clauses=[ + 'host_id = %(host_id)s', + 'state in %(states)s' + ], + values={ + 'host_id': host.id, + 'states': [ + koji.TASK_STATES['SCHEDULED'], + koji.TASK_STATES['ASSIGNED'], + ], + } + ) + tasks = query.execute() + for task in tasks: + sched_logger.debug("Sending task", host_id=host.id, task_id=task['id'], + location="getTasks") + return tasks + + def refuseTask(self, task_id): + host = Host() + host.verify() + + task = Task(task_id) + task.free(newstate=koji.TASK_STATES['REFUSED']) + sched_logger.warning("Refusing task", host_id=host.id, task_id=task_id, + location="refuseTask") + return True + def getHostTasks(self): host = Host() host.verify() diff --git a/kojihub/kojixmlrpc.py b/kojihub/kojixmlrpc.py index e9e0b3d9..88f2f806 100644 --- a/kojihub/kojixmlrpc.py +++ b/kojihub/kojixmlrpc.py @@ -711,6 +711,7 @@ def setup_logging2(opts): log_handler.setFormatter(HubFormatter(opts['LogFormat'])) + import scheduler def get_memory_usage(): pagesize = resource.getpagesize() statm = [pagesize * int(y) // 1024 @@ -844,8 +845,10 @@ def get_registry(opts, plugins): registry = HandlerRegistry() functions = kojihub.RootExports() hostFunctions = kojihub.HostExports() + schedulerFunctions = scheduler.SchedulerExports() registry.register_instance(functions) registry.register_module(hostFunctions, "host") + registry.register_module(schedulerFunctions, "scheduler") registry.register_function(auth.login) registry.register_function(auth.sslLogin) registry.register_function(auth.logout) diff --git a/schemas/schema.sql b/schemas/schema.sql index 5fa6a22c..cfe39326 100644 --- a/schemas/schema.sql +++ b/schemas/schema.sql @@ -983,6 +983,28 @@ CREATE TABLE rpm_checksum ( ) WITHOUT OIDS; CREATE INDEX rpm_checksum_rpm_id ON rpm_checksum(rpm_id); + +-- scheduler tables +CREATE TABLE scheduler_task_runs ( + id SERIAL NOT NULL PRIMARY KEY, + task_id INTEGER REFERENCES task (id) NOT NULL, + host_id INTEGER REFERENCES host (id) NOT NULL, + state INTEGER NOT NULL, + create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + start_time TIMESTAMPTZ, + end_time TIMESTAMPTZ, +) WITHOUT OIDS; +CREATE INDEX scheduler_task_runs_task ON scheduler_task_runs(task_id); +CREATE INDEX scheduler_task_runs_host ON scheduler_task_runs(host_id); +CREATE INDEX scheduler_task_runs_state ON scheduler_task_runs(state); +CREATE INDEX scheduler_task_runs_create_time ON scheduler_task_runs(create_time); + +CREATE TABLE scheduler_host_data ( + host_id INTEGER REFERENCES host (id) PRIMARY KEY, + data JSONB, +) WITHOUT OIDS; + + -- this table is used for locking, see db_lock() CREATE TABLE locks ( name TEXT NOT NULL PRIMARY KEY