debian-koji/plugins/hub/protonmsg.py
2017-06-01 09:39:24 +02:00

264 lines
9.5 KiB
Python

# Koji callback for sending notifications about events using the
# qpid proton library.
# Copyright (c) 2016 Red Hat, Inc.
#
# Authors:
# Mike Bonnet <mikeb@redhat.com>
import koji
from koji.plugin import callback, ignore_error
from koji.context import context
import ConfigParser
import logging
import json
import random
from proton import Message, SSLDomain
from proton.reactor import Container
from proton.handlers import MessagingHandler
CONFIG_FILE = '/etc/koji-hub/plugins/protonmsg.conf'
CONFIG = None
class TimeoutHandler(MessagingHandler):
    """Proton handler that sends a batch of messages to one broker URL.

    Two timers are scheduled when the container starts: one bounding how
    long the connection may take to open and one bounding the whole send.
    Messages whose delivery is settled are removed from ``msgs`` in place,
    so whatever remains in that list once the container stops was not
    delivered.
    """

    def __init__(self, url, msgs, conf, *args, **kws):
        """Store the target and set up per-run bookkeeping.

        url  -- broker URL to connect to
        msgs -- list of (address, props, body) tuples; mutated in place
        conf -- parsed configuration providing the [broker] section
        Remaining arguments are passed through to MessagingHandler.
        """
        super(TimeoutHandler, self).__init__(*args, **kws)
        self.url = url
        self.msgs = msgs
        self.conf = conf
        # delivery -> message tuple, for sends still awaiting a response
        self.pending = {}
        # address -> sender, so each address only gets one sender created
        self.senders = {}
        self.connect_task = None
        self.timeout_task = None
        self.log = logging.getLogger('koji.plugin.protonmsg.TimeoutHandler')

    def on_start(self, event):
        """Open the connection and schedule the connect and send timers."""
        self.log.debug('Container starting')
        event.container.connected = False
        # SSL is only configured when both options are present
        if self.conf.has_option('broker', 'cert') and self.conf.has_option('broker', 'cacert'):
            ssl = SSLDomain(SSLDomain.MODE_CLIENT)
            cert = self.conf.get('broker', 'cert')
            # the same file is passed as both the certificate and the key
            ssl.set_credentials(cert, cert, None)
            ssl.set_trusted_ca_db(self.conf.get('broker', 'cacert'))
            ssl.set_peer_authentication(SSLDomain.VERIFY_PEER)
        else:
            ssl = None
        self.log.debug('connecting to %s', self.url)
        event.container.connect(url=self.url, reconnect=False, ssl_domain=ssl)
        connect_timeout = self.conf.getint('broker', 'connect_timeout')
        self.connect_task = event.container.schedule(connect_timeout, self)
        send_timeout = self.conf.getint('broker', 'send_timeout')
        self.timeout_task = event.container.schedule(send_timeout, self)

    def on_timer_task(self, event):
        """Stop the container when either timer fires.

        Both timers are scheduled with this handler, so this one method
        serves both; the container's connected flag tells them apart.
        """
        if not event.container.connected:
            self.log.error('not connected, stopping container')
            if self.timeout_task:
                self.timeout_task.cancel()
                self.timeout_task = None
            event.container.stop()
        else:
            # This should only run when called from the timeout task
            self.log.error('send timeout expired with %s messages unsent, stopping container',
                           len(self.msgs))
            event.container.stop()

    def on_connection_opened(self, event):
        """Mark the container connected, drop the connect timer and send."""
        event.container.connected = True
        # Guarded like every other timer cancellation in this class: the
        # task may already have been cleared by on_transport_tail_closed.
        if self.connect_task:
            self.connect_task.cancel()
            self.connect_task = None
        self.log.debug('connection to %s opened successfully', event.connection.hostname)
        self.send_msgs(event)

    def send_msgs(self, event):
        """Send every queued message, reusing one sender per address."""
        prefix = self.conf.get('broker', 'topic_prefix')
        for msg in self.msgs:
            address = 'topic://' + prefix + '.' + msg[0]
            if address in self.senders:
                sender = self.senders[address]
                self.log.debug('retrieved cached sender for %s', address)
            else:
                sender = event.container.create_sender(event.connection, target=address)
                self.log.debug('created new sender for %s', address)
                self.senders[address] = sender
            pmsg = Message(properties=msg[1], body=msg[2])
            delivery = sender.send(pmsg)
            self.log.debug('sent message: %s', msg[1])
            self.pending[delivery] = msg

    def update_pending(self, event):
        """Forget the event's delivery; close everything once none remain."""
        msg = self.pending[event.delivery]
        del self.pending[event.delivery]
        self.log.debug('removed message from self.pending: %s', msg[1])
        if not self.pending:
            # Every send has been answered; anything still in self.msgs was
            # never removed by on_settled.
            if self.msgs:
                self.log.error('%s messages unsent (rejected or released)', len(self.msgs))
            else:
                self.log.debug('all messages sent successfully')
            for sender in self.senders.values():
                self.log.debug('closing sender for %s', sender.target.address)
                sender.close()
            if self.timeout_task:
                self.log.debug('canceling timeout task')
                self.timeout_task.cancel()
                self.timeout_task = None
            self.log.debug('closing connection to %s', event.connection.hostname)
            event.connection.close()

    def on_settled(self, event):
        """Treat a settled delivery as sent and remove it from self.msgs."""
        msg = self.pending.get(event.delivery)
        if msg is None:
            # A rejected or released delivery has already been dropped from
            # self.pending by update_pending(); if a settle event arrives
            # for it afterwards there is nothing left to do, and the
            # message must stay in self.msgs because it was not sent.
            self.log.debug('ignoring settle event for a delivery that is no longer pending')
            return
        self.msgs.remove(msg)
        self.log.debug('removed message from self.msgs: %s', msg[1])
        self.update_pending(event)

    def on_rejected(self, event):
        """Log a rejected message; it stays in self.msgs as unsent."""
        msg = self.pending[event.delivery]
        self.log.error('message was rejected: %s', msg[1])
        self.update_pending(event)

    def on_released(self, event):
        """Log a released message; it stays in self.msgs as unsent."""
        msg = self.pending[event.delivery]
        self.log.error('message was released: %s', msg[1])
        self.update_pending(event)

    def on_transport_tail_closed(self, event):
        """Cancel whichever timers are still outstanding."""
        if self.connect_task:
            self.log.debug('canceling connect timer')
            self.connect_task.cancel()
            self.connect_task = None
        if self.timeout_task:
            self.log.debug('canceling send timer')
            self.timeout_task.cancel()
            self.timeout_task = None
def queue_msg(address, props, data):
    """Append a message to the list kept on the request context.

    The list is created on first use and stored as
    ``context.protonmsg_msgs``. Each entry is an (address, props, body)
    tuple where body is ``data`` encoded as JSON.
    """
    queue = getattr(context, 'protonmsg_msgs', None)
    if queue is None:
        # first message on this context: create the list and attach it
        queue = context.protonmsg_msgs = []
    queue.append((address, props, json.dumps(data)))
@callback('postPackageListChange')
def prep_package_list_change(cbtype, *args, **kws):
    """Queue a 'package.<action>' message for a package list change.

    The 'type' property is cbtype with its first four characters dropped.
    """
    topic = 'package.' + kws['action']
    headers = {
        'type': cbtype[4:],
        'tag': kws['tag']['name'],
        'package': kws['package']['name'],
        'action': kws['action'],
    }
    queue_msg(topic, headers, kws)
@callback('postTaskStateChange')
def prep_task_state_change(cbtype, *args, **kws):
    """Queue a 'task.<new state>' message when a task's state changes.

    Changes to any attribute other than 'state' are ignored.
    """
    if kws['attribute'] == 'state':
        topic = 'task.' + kws['new'].lower()
        task = kws['info']
        headers = {
            'type': cbtype[4:],
            'id': task['id'],
            'parent': task['parent'],
            'method': task['method'],
            'attribute': kws['attribute'],
            'old': kws['old'],
            'new': kws['new'],
        }
        queue_msg(topic, headers, kws)
@callback('postBuildStateChange')
def prep_build_state_change(cbtype, *args, **kws):
    """Queue a 'build.<new state>' message when a build's state changes.

    Changes to any attribute other than 'state' are ignored. The old and
    new values are looked up in koji.BUILD_STATES before being put in the
    message properties; an old value of None is passed through unchanged.
    """
    if kws['attribute'] != 'state':
        return
    previous = kws['old']
    old_name = None if previous is None else koji.BUILD_STATES[previous]
    new_name = koji.BUILD_STATES[kws['new']]
    topic = 'build.' + new_name.lower()
    build = kws['info']
    headers = {
        'type': cbtype[4:],
        'name': build['name'],
        'version': build['version'],
        'release': build['release'],
        'attribute': kws['attribute'],
        'old': old_name,
        'new': new_name,
    }
    queue_msg(topic, headers, kws)
@callback('postImport')
def prep_import(cbtype, *args, **kws):
    """Queue an 'import.<type>' message carrying the build's name-version-release."""
    topic = 'import.' + kws['type']
    headers = {
        'type': cbtype[4:],
        'importType': kws['type'],
        'name': kws['build']['name'],
        'version': kws['build']['version'],
        'release': kws['build']['release'],
    }
    queue_msg(topic, headers, kws)
@callback('postRPMSign')
def prep_rpm_sign(cbtype, *args, **kws):
    """Queue a 'sign.rpm' message identifying the signing key, build and rpm."""
    headers = {
        'type': cbtype[4:],
        'sigkey': kws['sigkey'],
        'name': kws['build']['name'],
        'version': kws['build']['version'],
        'release': kws['build']['release'],
        'rpm_name': kws['rpm']['name'],
        'rpm_version': kws['rpm']['version'],
        'rpm_release': kws['rpm']['release'],
    }
    queue_msg('sign.rpm', headers, kws)
def _prep_tag_msg(address, cbtype, kws):
    """Queue a tag or untag message for the build in kws at the given address."""
    tagged = kws['build']
    headers = {
        'type': cbtype[4:],
        'tag': kws['tag']['name'],
        'name': tagged['name'],
        'version': tagged['version'],
        'release': tagged['release'],
        'user': kws['user']['name'],
    }
    queue_msg(address, headers, kws)
@callback('postTag')
def prep_tag(cbtype, *args, **kws):
    """Queue a 'build.tag' message for the tagged build."""
    _prep_tag_msg(address='build.tag', cbtype=cbtype, kws=kws)
@callback('postUntag')
def prep_untag(cbtype, *args, **kws):
    """Queue a 'build.untag' message for the untagged build."""
    _prep_tag_msg(address='build.untag', cbtype=cbtype, kws=kws)
@callback('postRepoInit')
def prep_repo_init(cbtype, *args, **kws):
    """Queue a 'repo.init' message with the tag name and repo id."""
    headers = {
        'type': cbtype[4:],
        'tag': kws['tag']['name'],
        'repo_id': kws['repo_id'],
    }
    queue_msg('repo.init', headers, kws)
@callback('postRepoDone')
def prep_repo_done(cbtype, *args, **kws):
    """Queue a 'repo.done' message with the repo's tag name, id and expire flag."""
    headers = {
        'type': cbtype[4:],
        'tag': kws['repo']['tag_name'],
        'repo_id': kws['repo']['id'],
        'expire': kws['expire'],
    }
    queue_msg('repo.done', headers, kws)
@ignore_error
@callback('postCommit')
def send_queued_msgs(cbtype, *args, **kws):
    """Send the messages queued on the context to the first broker that takes them.

    The config file is parsed on first use and kept in the module-level
    CONFIG. The broker URLs are tried in random order; each attempt runs a
    fresh Container with a TimeoutHandler, which removes messages from the
    list as they are sent. The loop stops as soon as the list is empty.
    """
    global CONFIG
    queued = getattr(context, 'protonmsg_msgs', None)
    if not queued:
        # nothing was queued on this context
        return
    logger = logging.getLogger('koji.plugin.protonmsg')
    if not CONFIG:
        parser = ConfigParser.SafeConfigParser()
        with open(CONFIG_FILE) as cfgfile:
            parser.readfp(cfgfile)
        CONFIG = parser
    candidates = CONFIG.get('broker', 'urls').split()
    # a random sort key per URL yields a random order to try the brokers in
    shuffled = sorted(candidates, key=lambda k: random.random())
    for url in shuffled:
        Container(TimeoutHandler(url, queued, CONFIG)).run()
        if not queued:
            logger.debug('all messages sent to %s successfully', url)
            break
        logger.debug('could not send to %s, %s messages remaining',
                     url, len(queued))
    else:
        # only reached when no URL emptied the list
        logger.error('could not send messages to any destinations')