PR#537: messagebus plugin: deferred sending and test mode

Merges #537
https://pagure.io/koji/pull-request/537
This commit is contained in:
Mike McLean 2017-10-05 16:16:26 -04:00
commit bea9a86916

View file

@ -5,6 +5,7 @@
# Mike Bonnet <mikeb@redhat.com>
from koji import PluginError
from koji.context import context
from koji.plugin import callbacks, callback, ignore_error, convert_datetime
import ConfigParser
import logging
@ -68,8 +69,23 @@ class Connection(qpid.messaging.Connection):
timeout = self._timeout
return qpid.messaging.Connection._wait(self, predicate, timeout)
def get_config():
global config
if config:
return config
config = ConfigParser.SafeConfigParser()
config.read(CONFIG_FILE)
if not config.has_option('broker', 'timeout'):
config.set('broker', 'timeout', '60')
if not config.has_option('broker', 'heartbeat'):
config.set('broker', 'heartbeat', '60')
return config
def get_sender():
global config, session, target
global session, target
if session and target:
try:
return session.sender(target)
@ -78,12 +94,7 @@ def get_sender():
session = None
target = None
config = ConfigParser.SafeConfigParser()
config.read(CONFIG_FILE)
if not config.has_option('broker', 'timeout'):
config.set('broker', 'timeout', '60')
if not config.has_option('broker', 'heartbeat'):
config.set('broker', 'heartbeat', '60')
config = get_config()
if config.getboolean('broker', 'ssl'):
url = 'amqps://'
@ -204,9 +215,7 @@ def get_message_headers(msgtype, *args, **kws):
and c != 'postCommit'])
@ignore_error
@convert_datetime
def send_message(cbtype, *args, **kws):
global config
sender = get_sender()
def prep_message(cbtype, *args, **kws):
if cbtype.startswith('post'):
msgtype = cbtype[4:]
else:
@ -216,6 +225,7 @@ def send_message(cbtype, *args, **kws):
if args:
data['args'] = list(args)
config = get_config()
exchange_type = config.get('exchange', 'type')
if exchange_type == 'topic':
subject = get_message_subject(msgtype, *args, **kws)
@ -226,5 +236,36 @@ def send_message(cbtype, *args, **kws):
else:
raise PluginError('unsupported exchange type: %s' % exchange_type)
sender.send(message, sync=True, timeout=config.getfloat('broker', 'timeout'))
sender.close(timeout=config.getfloat('broker', 'timeout'))
messages = getattr(context, 'messagebus_plugin_messages', None)
if messages is None:
messages = []
context.messagebus_plugin_messages = messages
messages.append(message)
@callback('postCommit')
@ignore_error
def send_messages(cbtype, *args, **kws):
'''Send the messages cached by prep_message'''
logger = logging.getLogger('koji.plugin.messagebus')
config = get_config()
messages = getattr(context, 'messagebus_plugin_messages', [])
if not messages:
return
test_mode = False
if config.has_option('broker', 'test_mode'):
test_mode = config.getboolean('broker', 'test_mode')
if test_mode:
logger.debug('test mode: skipping broker connection')
for message in messages:
logger.debug('test mode: skipping message: %r', message)
else:
sender = get_sender()
for message in messages:
sender.send(message, sync=False,
timeout=config.getfloat('broker', 'timeout'))
sender.close(timeout=config.getfloat('broker', 'timeout'))
# koji should do this for us, but just in case...
del context.messagebus_plugin_messages