# Koji callback for sending notifications about events using the
# qpid proton library.
# Copyright (c) 2016 Red Hat, Inc.
#
# Authors:
#     Mike Bonnet

import koji
from koji.plugin import callback, ignore_error
from koji.context import context
import ConfigParser
import logging
import json
import random
from proton import Message, SSLDomain
from proton.reactor import Container
from proton.handlers import MessagingHandler

# Location of the plugin configuration; parsed lazily on first send and
# cached in CONFIG for subsequent calls.
CONFIG_FILE = '/etc/koji-hub/plugins/protonmsg.conf'
CONFIG = None


class TimeoutHandler(MessagingHandler):
    """Proton handler that sends a batch of messages to one broker URL.

    Two timers are scheduled on start: one bounding the time allowed to
    establish the connection and one bounding the time allowed to send
    everything.  Messages that get settled are removed from ``msgs`` (the
    caller's list is mutated in place), so whatever remains in ``msgs``
    after the container stops was not delivered.
    """

    def __init__(self, url, msgs, conf, *args, **kws):
        """Store the target url, the message list and the parsed config.

        :param url: broker URL to connect to
        :param msgs: list of (address, props, body) tuples; mutated in place
        :param conf: config parser object providing a 'broker' section
        """
        super(TimeoutHandler, self).__init__(*args, **kws)
        self.url = url
        self.msgs = msgs
        self.conf = conf
        # delivery -> message tuple, for messages sent but not yet resolved
        self.pending = {}
        # address -> sender, so each topic only gets one sender
        self.senders = {}
        self.connect_task = None
        self.timeout_task = None
        self.log = logging.getLogger('koji.plugin.protonmsg.TimeoutHandler')

    def on_start(self, event):
        """Open the connection and schedule the connect and send timers."""
        self.log.debug('Container starting')
        event.container.connected = False
        # SSL is only configured when both a client cert and a CA cert
        # are present in the config.
        if self.conf.has_option('broker', 'cert') and \
                self.conf.has_option('broker', 'cacert'):
            ssl = SSLDomain(SSLDomain.MODE_CLIENT)
            cert = self.conf.get('broker', 'cert')
            # the same file is passed as both certificate and key
            ssl.set_credentials(cert, cert, None)
            ssl.set_trusted_ca_db(self.conf.get('broker', 'cacert'))
            ssl.set_peer_authentication(SSLDomain.VERIFY_PEER)
        else:
            ssl = None
        self.log.debug('connecting to %s', self.url)
        event.container.connect(url=self.url, reconnect=False, ssl_domain=ssl)
        connect_timeout = self.conf.getint('broker', 'connect_timeout')
        self.connect_task = event.container.schedule(connect_timeout, self)
        send_timeout = self.conf.getint('broker', 'send_timeout')
        self.timeout_task = event.container.schedule(send_timeout, self)

    def on_timer_task(self, event):
        """Stop the container when either timer fires."""
        if not event.container.connected:
            self.log.error('not connected, stopping container')
            if self.timeout_task:
                self.timeout_task.cancel()
                self.timeout_task = None
            event.container.stop()
        else:
            # This should only run when called from the timeout task
            self.log.error('send timeout expired with %s messages unsent, stopping container',
                           len(self.msgs))
            event.container.stop()

    def on_connection_opened(self, event):
        """Mark the container connected, drop the connect timer and send."""
        event.container.connected = True
        # Guarded like every other timer cancellation in this class, so a
        # timer that was already cleared cannot raise AttributeError here.
        if self.connect_task:
            self.connect_task.cancel()
            self.connect_task = None
        self.log.debug('connection to %s opened successfully', event.connection.hostname)
        self.send_msgs(event)

    def send_msgs(self, event):
        """Send every queued message, reusing one sender per topic address."""
        prefix = self.conf.get('broker', 'topic_prefix')
        for msg in self.msgs:
            address = 'topic://' + prefix + '.' + msg[0]
            if address in self.senders:
                sender = self.senders[address]
                self.log.debug('retrieved cached sender for %s', address)
            else:
                sender = event.container.create_sender(event.connection, target=address)
                self.log.debug('created new sender for %s', address)
                self.senders[address] = sender
            pmsg = Message(properties=msg[1], body=msg[2])
            delivery = sender.send(pmsg)
            self.log.debug('sent message: %s', msg[1])
            self.pending[delivery] = msg

    def update_pending(self, event):
        """Forget the event's delivery; shut down once nothing is pending.

        Raises KeyError if the delivery is not currently pending.
        """
        msg = self.pending.pop(event.delivery)
        self.log.debug('removed message from self.pending: %s', msg[1])
        if self.pending:
            return
        # Every delivery has been resolved one way or another: report the
        # outcome, then close senders, the send timer and the connection.
        if self.msgs:
            self.log.error('%s messages unsent (rejected or released)', len(self.msgs))
        else:
            self.log.debug('all messages sent successfully')
        for sender in self.senders.values():
            self.log.debug('closing sender for %s', sender.target.address)
            sender.close()
        if self.timeout_task:
            self.log.debug('canceling timeout task')
            self.timeout_task.cancel()
            self.timeout_task = None
        self.log.debug('closing connection to %s', event.connection.hostname)
        event.connection.close()

    def on_settled(self, event):
        """Treat a settled delivery as sent and remove it from the queue."""
        msg = self.pending.get(event.delivery)
        if msg is None:
            # on_rejected/on_released already removed this delivery from
            # self.pending.  A settlement notification for the same delivery
            # may still arrive afterwards (NOTE(review): confirm against the
            # proton handler dispatch order); it must neither raise KeyError
            # nor count the rejected/released message as sent.
            self.log.debug('ignoring settlement of a delivery that is no longer pending')
            return
        self.msgs.remove(msg)
        self.log.debug('removed message from self.msgs: %s', msg[1])
        self.update_pending(event)

    def on_rejected(self, event):
        """Log a rejected delivery; the message stays in self.msgs."""
        msg = self.pending[event.delivery]
        self.log.error('message was rejected: %s', msg[1])
        self.update_pending(event)

    def on_released(self, event):
        """Log a released delivery; the message stays in self.msgs."""
        msg = self.pending[event.delivery]
        self.log.error('message was released: %s', msg[1])
        self.update_pending(event)

    def on_transport_tail_closed(self, event):
        """Cancel any timers that are still scheduled."""
        if self.connect_task:
            self.log.debug('canceling connect timer')
            self.connect_task.cancel()
            self.connect_task = None
        if self.timeout_task:
            self.log.debug('canceling send timer')
            self.timeout_task.cancel()
            self.timeout_task = None


def queue_msg(address, props, data):
    """Append a message to the list kept on the koji context.

    :param address: topic suffix appended to the configured topic prefix
    :param props: dict used as the message properties
    :param data: object serialized with json.dumps to form the body
    """
    msgs = getattr(context, 'protonmsg_msgs', None)
    if msgs is None:
        msgs = []
        context.protonmsg_msgs = msgs
    body = json.dumps(data)
    msgs.append((address, props, body))


# In the callbacks below, cbtype[4:] drops the first four characters of the
# callback name (the 'post' prefix of e.g. 'postTag').

@callback('postPackageListChange')
def prep_package_list_change(cbtype, *args, **kws):
    """Queue a 'package.<action>' message for a package list change."""
    address = 'package.' + kws['action']
    props = {'type': cbtype[4:],
             'tag': kws['tag']['name'],
             'package': kws['package']['name'],
             'action': kws['action']}
    queue_msg(address, props, kws)


@callback('postTaskStateChange')
def prep_task_state_change(cbtype, *args, **kws):
    """Queue a 'task.<new state>' message; non-state changes are ignored."""
    if kws['attribute'] != 'state':
        return
    address = 'task.' + kws['new'].lower()
    props = {'type': cbtype[4:],
             'id': kws['info']['id'],
             'parent': kws['info']['parent'],
             'method': kws['info']['method'],
             'attribute': kws['attribute'],
             'old': kws['old'],
             'new': kws['new']}
    queue_msg(address, props, kws)


@callback('postBuildStateChange')
def prep_build_state_change(cbtype, *args, **kws):
    """Queue a 'build.<new state>' message; non-state changes are ignored."""
    if kws['attribute'] != 'state':
        return
    # states are translated through koji.BUILD_STATES for the properties;
    # 'old' may be None and is then passed through as-is
    old = kws['old']
    if old is not None:
        old = koji.BUILD_STATES[old]
    new = koji.BUILD_STATES[kws['new']]
    address = 'build.' + new.lower()
    props = {'type': cbtype[4:],
             'name': kws['info']['name'],
             'version': kws['info']['version'],
             'release': kws['info']['release'],
             'attribute': kws['attribute'],
             'old': old,
             'new': new}
    queue_msg(address, props, kws)


@callback('postImport')
def prep_import(cbtype, *args, **kws):
    """Queue an 'import.<type>' message for an import."""
    address = 'import.' + kws['type']
    props = {'type': cbtype[4:],
             'importType': kws['type'],
             'name': kws['build']['name'],
             'version': kws['build']['version'],
             'release': kws['build']['release']}
    queue_msg(address, props, kws)


@callback('postRPMSign')
def prep_rpm_sign(cbtype, *args, **kws):
    """Queue a 'sign.rpm' message for a signed rpm."""
    address = 'sign.rpm'
    props = {'type': cbtype[4:],
             'sigkey': kws['sigkey'],
             'name': kws['build']['name'],
             'version': kws['build']['version'],
             'release': kws['build']['release'],
             'rpm_name': kws['rpm']['name'],
             'rpm_version': kws['rpm']['version'],
             'rpm_release': kws['rpm']['release']}
    queue_msg(address, props, kws)


def _prep_tag_msg(address, cbtype, kws):
    """Shared implementation for the tag and untag callbacks."""
    build = kws['build']
    props = {'type': cbtype[4:],
             'tag': kws['tag']['name'],
             'name': build['name'],
             'version': build['version'],
             'release': build['release'],
             'user': kws['user']['name']}
    queue_msg(address, props, kws)


@callback('postTag')
def prep_tag(cbtype, *args, **kws):
    """Queue a 'build.tag' message."""
    _prep_tag_msg('build.tag', cbtype, kws)


@callback('postUntag')
def prep_untag(cbtype, *args, **kws):
    """Queue a 'build.untag' message."""
    _prep_tag_msg('build.untag', cbtype, kws)


@callback('postRepoInit')
def prep_repo_init(cbtype, *args, **kws):
    """Queue a 'repo.init' message."""
    address = 'repo.init'
    props = {'type': cbtype[4:],
             'tag': kws['tag']['name'],
             'repo_id': kws['repo_id']}
    queue_msg(address, props, kws)


@callback('postRepoDone')
def prep_repo_done(cbtype, *args, **kws):
    """Queue a 'repo.done' message."""
    address = 'repo.done'
    props = {'type': cbtype[4:],
             'tag': kws['repo']['tag_name'],
             'repo_id': kws['repo']['id'],
             'expire': kws['expire']}
    queue_msg(address, props, kws)


@ignore_error
@callback('postCommit')
def send_queued_msgs(cbtype, *args, **kws):
    """Send the messages queued on the context, trying brokers in random order.

    Each configured broker URL is tried in turn until one run leaves the
    message list empty.  The handler removes delivered messages from the
    list in place, so a later broker only receives what is still unsent.
    """
    msgs = getattr(context, 'protonmsg_msgs', None)
    if not msgs:
        return
    log = logging.getLogger('koji.plugin.protonmsg')
    global CONFIG
    if not CONFIG:
        conf = ConfigParser.SafeConfigParser()
        with open(CONFIG_FILE) as conffile:
            conf.readfp(conffile)
        CONFIG = conf
    urls = CONFIG.get('broker', 'urls').split()
    # split() returned a fresh list, so shuffling in place is safe
    random.shuffle(urls)
    for url in urls:
        container = Container(TimeoutHandler(url, msgs, CONFIG))
        container.run()
        if msgs:
            log.debug('could not send to %s, %s messages remaining',
                      url, len(msgs))
        else:
            log.debug('all messages sent to %s successfully', url)
            break
    else:
        # the loop ran out of brokers without emptying the queue
        log.error('could not send messages to any destinations')