- extend the plugin system with an event-based callback infrastructure

- implement callback hooks for task state transitions
 - example echo plugin that logs all callback invocations
This commit is contained in:
Mike Bonnet 2009-10-08 12:04:22 -04:00
parent f7d990f8d7
commit ae953a7072
5 changed files with 153 additions and 10 deletions

View file

@ -26,6 +26,7 @@ import calendar
import koji import koji
import koji.auth import koji.auth
import koji.db import koji.db
import koji.plugin
import koji.policy import koji.policy
import datetime import datetime
import errno import errno
@ -130,6 +131,9 @@ class Task(object):
"""Attempt to associate the task for host, either to assign or open """Attempt to associate the task for host, either to assign or open
returns True if successful, False otherwise""" returns True if successful, False otherwise"""
info = self.getInfo(request=True)
self.runCallbacks('preTaskStateChange', info, 'state', koji.TASK_STATES[newstate])
self.runCallbacks('preTaskStateChange', info, 'host_id', host_id)
#we use row-level locks to keep things sane #we use row-level locks to keep things sane
#note the SELECT...FOR UPDATE #note the SELECT...FOR UPDATE
task_id = self.id task_id = self.id
@ -166,6 +170,8 @@ class Task(object):
q = """UPDATE task SET state=%(state)s,host_id=%(host_id)s q = """UPDATE task SET state=%(state)s,host_id=%(host_id)s
WHERE id=%(task_id)s""" WHERE id=%(task_id)s"""
_dml(q,locals()) _dml(q,locals())
self.runCallbacks('postTaskStateChange', info, 'state', koji.TASK_STATES[newstate])
self.runCallbacks('postTaskStateChange', info, 'host_id', host_id)
return True return True
def assign(self,host_id,force=False): def assign(self,host_id,force=False):
@ -192,6 +198,9 @@ class Task(object):
def free(self): def free(self):
"""Free a task""" """Free a task"""
info = self.getInfo(request=True)
self.runCallbacks('preTaskStateChange', info, 'state', koji.TASK_STATES['FREE'])
self.runCallbacks('preTaskStateChange', info, 'host_id', None)
task_id = self.id task_id = self.id
# access checks should be performed by calling function # access checks should be performed by calling function
query = """SELECT state FROM task WHERE id = %(id)i FOR UPDATE""" query = """SELECT state FROM task WHERE id = %(id)i FOR UPDATE"""
@ -207,23 +216,31 @@ class Task(object):
q = """UPDATE task SET state=%(newstate)s,host_id=%(newhost)s q = """UPDATE task SET state=%(newstate)s,host_id=%(newhost)s
WHERE id=%(task_id)s""" WHERE id=%(task_id)s"""
_dml(q,locals()) _dml(q,locals())
self.runCallbacks('postTaskStateChange', info, 'state', koji.TASK_STATES['FREE'])
self.runCallbacks('postTaskStateChange', info, 'host_id', None)
return True return True
def setWeight(self,weight): def setWeight(self,weight):
"""Set weight for task""" """Set weight for task"""
task_id = self.id task_id = self.id
weight = float(weight)
info = self.getInfo(request=True)
self.runCallbacks('preTaskStateChange', info, 'weight', weight)
# access checks should be performed by calling function # access checks should be performed by calling function
q = """UPDATE task SET weight=%(weight)s WHERE id = %(task_id)s""" q = """UPDATE task SET weight=%(weight)s WHERE id = %(task_id)s"""
_dml(q,locals()) _dml(q,locals())
self.runCallbacks('postTaskStateChange', info, 'weight', weight)
def setPriority(self, priority, recurse=False): def setPriority(self, priority, recurse=False):
"""Set priority for task""" """Set priority for task"""
task_id = self.id task_id = self.id
priority = int(priority) priority = int(priority)
info = self.getInfo(request=True)
self.runCallbacks('preTaskStateChange', info, 'priority', priority)
# access checks should be performed by calling function # access checks should be performed by calling function
q = """UPDATE task SET priority=%(priority)s WHERE id = %(task_id)s""" q = """UPDATE task SET priority=%(priority)s WHERE id = %(task_id)s"""
_dml(q,locals()) _dml(q,locals())
self.runCallbacks('postTaskStateChange', info, 'priority', priority)
if recurse: if recurse:
"""Change priority of child tasks""" """Change priority of child tasks"""
@ -237,11 +254,18 @@ class Task(object):
Returns True if successful, False if not""" Returns True if successful, False if not"""
task_id = self.id task_id = self.id
# access checks should be performed by calling function # access checks should be performed by calling function
st_closed = koji.TASK_STATES['CLOSED'] # this is an approximation, and will be different than what is in the database
# the actual value should be retrieved from the 'new' value of the post callback
now = time.time()
info = self.getInfo(request=True)
self.runCallbacks('preTaskStateChange', info, 'state', state)
self.runCallbacks('preTaskStateChange', info, 'completion_ts', now)
update = """UPDATE task SET result = %(result)s, state = %(state)s, completion_time = NOW() update = """UPDATE task SET result = %(result)s, state = %(state)s, completion_time = NOW()
WHERE id = %(task_id)d WHERE id = %(task_id)d
""" """
_dml(update,locals()) _dml(update,locals())
self.runCallbacks('postTaskStateChange', info, 'state', state)
self.runCallbacks('postTaskStateChange', info, 'completion_ts', now)
def close(self,result): def close(self,result):
# access checks should be performed by calling function # access checks should be performed by calling function
@ -272,6 +296,10 @@ class Task(object):
successfully canceled, or if it was already canceled, False if it is successfully canceled, or if it was already canceled, False if it is
closed.""" closed."""
# access checks should be performed by calling function # access checks should be performed by calling function
now = time.time()
info = self.getInfo(request=True)
self.runCallbacks('preTaskStateChange', info, 'state', koji.TASK_STATES['CANCELED'])
self.runCallbacks('preTaskStateChange', info, 'completion_ts', now)
task_id = self.id task_id = self.id
q = """SELECT state FROM task WHERE id = %(task_id)s FOR UPDATE""" q = """SELECT state FROM task WHERE id = %(task_id)s FOR UPDATE"""
state = _singleValue(q,locals()) state = _singleValue(q,locals())
@ -285,6 +313,8 @@ class Task(object):
update = """UPDATE task SET state = %(st_canceled)i, completion_time = NOW() update = """UPDATE task SET state = %(st_canceled)i, completion_time = NOW()
WHERE id = %(task_id)i""" WHERE id = %(task_id)i"""
_dml(update, locals()) _dml(update, locals())
self.runCallbacks('postTaskStateChange', info, 'state', koji.TASK_STATES['CANCELED'])
self.runCallbacks('postTaskStateChange', info, 'completion_ts', now)
#cancel associated builds (only if state is 'BUILDING') #cancel associated builds (only if state is 'BUILDING')
#since we check build state, we avoid loops with cancel_build on our end #since we check build state, we avoid loops with cancel_build on our end
b_building = koji.BUILD_STATES['BUILDING'] b_building = koji.BUILD_STATES['BUILDING']
@ -399,6 +429,22 @@ class Task(object):
task['request'] = xmlrpclib.loads(task['request'])[0] task['request'] = xmlrpclib.loads(task['request'])[0]
return results return results
def runCallbacks(self, cbtype, old_info, attr, new_val):
if cbtype.startswith('pre'):
info = old_info
elif cbtype.startswith('post'):
info = self.getInfo(request=True)
new_val = info[attr]
else:
raise koji.GenericError, 'unknown callback type: %s' % cbtype
old_val = old_info[attr]
if attr == 'state':
# state is passed in as an integer, but we want to use the string
old_val = koji.TASK_STATES[old_val]
new_val = koji.TASK_STATES[new_val]
koji.plugin.run_callbacks(cbtype, attribute=attr, old=old_val, new=new_val,
info=info)
def make_task(method,arglist,**opts): def make_task(method,arglist,**opts):
"""Create a task """Create a task
@ -460,6 +506,7 @@ def make_task(method,arglist,**opts):
allow_none=1) allow_none=1)
opts['state'] = koji.TASK_STATES['FREE'] opts['state'] = koji.TASK_STATES['FREE']
opts['method'] = method opts['method'] = method
koji.plugin.run_callbacks('preTaskStateChange', attribute='state', old=None, new='FREE', info=opts)
# stick it in the database # stick it in the database
q = """ q = """
INSERT INTO task (state,owner,method,request,priority, INSERT INTO task (state,owner,method,request,priority,
@ -470,6 +517,8 @@ def make_task(method,arglist,**opts):
_dml(q,opts) _dml(q,opts)
q = """SELECT currval('task_id_seq')""" q = """SELECT currval('task_id_seq')"""
task_id = _singleValue(q, {}) task_id = _singleValue(q, {})
opts['id'] = task_id
koji.plugin.run_callbacks('postTaskStateChange', attribute='state', old=None, new='FREE', info=opts)
return task_id return task_id
def mktask(__taskopts,__method,*args,**opts): def mktask(__taskopts,__method,*args,**opts):

View file

@ -102,12 +102,16 @@ class HandlerRegistry(object):
if isinstance(v, (types.ClassType, types.TypeType)): if isinstance(v, (types.ClassType, types.TypeType)):
#skip classes #skip classes
continue continue
if callable(v) and getattr(v, 'exported', False): if callable(v):
if hasattr(v, 'export_alias'): if getattr(v, 'exported', False):
name = getattr(v, 'export_alias') if hasattr(v, 'export_alias'):
else: name = getattr(v, 'export_alias')
name = v.__name__ else:
self.register_function(v, name=name) name = v.__name__
self.register_function(v, name=name)
if getattr(v, 'callbacks', None):
for cbtype in v.callbacks:
koji.plugin.register_callback(cbtype, v)
def list_api(self): def list_api(self):
funcs = [] funcs = []

View file

@ -281,6 +281,14 @@ class LiveCDError(GenericError):
"""Raised when LiveCD Image creation fails""" """Raised when LiveCD Image creation fails"""
faultCode = 1015 faultCode = 1015
class PluginError(GenericError):
"""Raised when there is an error with a plugin"""
faultCode = 1016
class CallbackError(PluginError):
"""Raised when there is an error executing a callback"""
faultCode = 1017
class MultiCallInProgress(object): class MultiCallInProgress(object):
""" """
Placeholder class to be returned by method calls when in the process of Placeholder class to be returned by method calls when in the process of

View file

@ -20,8 +20,34 @@
import imp import imp
import koji import koji
import logging
import sys import sys
import traceback
# set this up for use by the plugins
# we want log output to go to Apache's error_log
logger = logging.getLogger('koji.plugin')
logger.addHandler(logging.StreamHandler(sys.stderr))
logger.setLevel(logging.INFO)
# the available callback hooks and a list
# of functions to be called for each event
callbacks = {
'prePackageAdd': [],
'postPackageAdd': [],
'preTaskStateChange': [],
'postTaskStateChange': [],
'preBuildStateChange': [],
'postBuildStateChange': [],
'preImport': [],
'portImport': [],
'preTag': [],
'postTag': [],
'preUntag': [],
'postUntag': [],
'preDelete': [],
'postDelete': []
}
class PluginTracker(object): class PluginTracker(object):
@ -41,11 +67,11 @@ class PluginTracker(object):
#(no '.' -- it causes problems) #(no '.' -- it causes problems)
mod_name = self.prefix + name mod_name = self.prefix + name
if sys.modules.has_key(mod_name) and not reload: if sys.modules.has_key(mod_name) and not reload:
raise koji.GenericError, 'module name conflict: %s' % mod_name raise koji.PluginError, 'module name conflict: %s' % mod_name
if path is None: if path is None:
path = self.searchpath path = self.searchpath
if path is None: if path is None:
raise koji.GenericError, "empty module search path" raise koji.PluginError, "empty module search path"
file, pathname, description = imp.find_module(name, self.pathlist(path)) file, pathname, description = imp.find_module(name, self.pathlist(path))
try: try:
plugin = imp.load_module(mod_name, file, pathname, description) plugin = imp.load_module(mod_name, file, pathname, description)
@ -101,3 +127,44 @@ def export_in(module, alias=None):
setattr(f, 'export_alias', alias) setattr(f, 'export_alias', alias)
return f return f
return dec return dec
def callback(*cbtypes):
"""A decorator that indicates a function is a callback.
cbtypes is a list of callback types to register for. Valid
callback types are listed in the plugin module.
Intended to be used by plugins.
"""
def dec(f):
setattr(f, 'callbacks', cbtypes)
return f
return dec
def ignore_error(f):
"""a decorator that marks a callback as ok to fail
intended to be used by plugins
"""
setattr(f, 'failure_is_an_option', True)
return f
def register_callback(cbtype, func):
if not cbtype in callbacks:
raise koji.PluginError, '"%s" is not a valid callback type' % cbtype
if not callable(func):
raise koji.PluginError, '%s is not callable' % getattr(func, '__name__', 'function')
callbacks[cbtype].append(func)
def run_callbacks(cbtype, *args, **kws):
if not cbtype in callbacks:
raise koji.PluginError, '"%s" is not a valid callback type' % cbtype
for func in callbacks[cbtype]:
try:
func(cbtype, *args, **kws)
except:
tb = ''.join(traceback.format_exception(*sys.exc_info()))
msg = 'Error running %s callback from %s: %s' % (cbtype, func.__module__, tb)
if getattr(func, 'failure_is_an_option', False):
logging.getLogger('koji.plugin').warn('%s: %s' % (msg, tb))
else:
raise koji.CallbackError, msg

15
plugins/echo.py Normal file
View file

@ -0,0 +1,15 @@
# Example Koji callback
# Copyright (c) 2009 Red Hat, Inc.
# This callback simply logs all of its args using the logging module
#
# Authors:
# Mike Bonnet <mikeb@redhat.com>
from koji.plugin import callbacks, callback, ignore_error
import logging
@callback(*callbacks.keys())
@ignore_error
def echo(cbtype, *args, **kws):
logging.getLogger('koji.plugin.echo').info('Called the %s callback, args: %s; kws: %s',
cbtype, str(args), str(kws))