debian-koji/plugins/hub/protonmsg.py

328 lines
11 KiB
Python

# Koji callback for sending notifications about events using the
# qpid proton library.
# Copyright (c) 2016 Red Hat, Inc.
#
# Authors:
# Mike Bonnet <mikeb@redhat.com>
import json
import logging
import random
from proton import Message, SSLDomain
from proton.handlers import MessagingHandler
from proton.reactor import Container
import koji
from koji.context import context
from koji.plugin import callback, convert_datetime, ignore_error
CONFIG_FILE = '/etc/koji-hub/plugins/protonmsg.conf'
CONFIG = None
LOG = logging.getLogger('koji.plugin.protonmsg')
class TimeoutHandler(MessagingHandler):
def __init__(self, url, msgs, conf, *args, **kws):
super(TimeoutHandler, self).__init__(*args, **kws)
self.url = url
self.msgs = msgs
self.conf = conf
self.pending = {}
self.senders = {}
self.connect_task = None
self.timeout_task = None
self.log = logging.getLogger('koji.plugin.protonmsg.TimeoutHandler')
def on_start(self, event):
self.log.debug('Container starting')
event.container.connected = False
if self.conf.has_option('broker', 'cert') and self.conf.has_option('broker', 'cacert'):
ssl = SSLDomain(SSLDomain.MODE_CLIENT)
cert = self.conf.get('broker', 'cert')
ssl.set_credentials(cert, cert, None)
ssl.set_trusted_ca_db(self.conf.get('broker', 'cacert'))
ssl.set_peer_authentication(SSLDomain.VERIFY_PEER)
else:
ssl = None
self.log.debug('connecting to %s', self.url)
event.container.connect(url=self.url, reconnect=False, ssl_domain=ssl)
connect_timeout = self.conf.getint('broker', 'connect_timeout')
self.connect_task = event.container.schedule(connect_timeout, self)
send_timeout = self.conf.getint('broker', 'send_timeout')
self.timeout_task = event.container.schedule(send_timeout, self)
def on_timer_task(self, event):
if not event.container.connected:
self.log.error('not connected, stopping container')
if self.timeout_task:
self.timeout_task.cancel()
self.timeout_task = None
event.container.stop()
else:
# This should only run when called from the timeout task
self.log.error('send timeout expired with %s messages unsent, stopping container',
len(self.msgs))
event.container.stop()
def on_connection_opened(self, event):
event.container.connected = True
self.connect_task.cancel()
self.connect_task = None
self.log.debug('connection to %s opened successfully', event.connection.hostname)
self.send_msgs(event)
def send_msgs(self, event):
prefix = self.conf.get('broker', 'topic_prefix')
for msg in self.msgs:
address = 'topic://' + prefix + '.' + msg[0]
if address in self.senders:
sender = self.senders[address]
self.log.debug('retrieved cached sender for %s', address)
else:
sender = event.container.create_sender(event.connection, target=address)
self.log.debug('created new sender for %s', address)
self.senders[address] = sender
pmsg = Message(properties=msg[1], body=msg[2])
delivery = sender.send(pmsg)
self.log.debug('sent message: %s', msg[1])
self.pending[delivery] = msg
def update_pending(self, event):
msg = self.pending[event.delivery]
del self.pending[event.delivery]
self.log.debug('removed message from self.pending: %s', msg[1])
if not self.pending:
if self.msgs:
self.log.error('%s messages unsent (rejected or released)', len(self.msgs))
else:
self.log.debug('all messages sent successfully')
for sender in self.senders.values():
self.log.debug('closing sender for %s', sender.target.address)
sender.close()
if self.timeout_task:
self.log.debug('canceling timeout task')
self.timeout_task.cancel()
self.timeout_task = None
self.log.debug('closing connection to %s', event.connection.hostname)
event.connection.close()
def on_settled(self, event):
msg = self.pending[event.delivery]
self.msgs.remove(msg)
self.log.debug('removed message from self.msgs: %s', msg[1])
self.update_pending(event)
def on_rejected(self, event):
msg = self.pending[event.delivery]
self.log.error('message was rejected: %s', msg[1])
self.update_pending(event)
def on_released(self, event):
msg = self.pending[event.delivery]
self.log.error('message was released: %s', msg[1])
self.update_pending(event)
def on_transport_tail_closed(self, event):
if self.connect_task:
self.log.debug('canceling connect timer')
self.connect_task.cancel()
self.connect_task = None
if self.timeout_task:
self.log.debug('canceling send timer')
self.timeout_task.cancel()
self.timeout_task = None
def _strip_extra(buildinfo):
"""If extra_limit is configured, compare extra's size and drop it,
if it is over"""
global CONFIG
if not CONFIG:
CONFIG = koji.read_config_files([(CONFIG_FILE, True)])
if CONFIG.has_option('message', 'extra_limit'):
extra_limit = abs(CONFIG.getint('message', 'extra_limit'))
if extra_limit == 0:
return buildinfo
extra_size = len(json.dumps(buildinfo.get('extra', {}), default=json_serialize))
if extra_limit and extra_size > extra_limit:
LOG.debug("Dropping 'extra' from build %s (length: %d > %d)" %
(buildinfo['nvr'], extra_size, extra_limit))
buildinfo = buildinfo.copy()
del buildinfo['extra']
return buildinfo
def json_serialize(o):
"""JSON helper to encode otherwise unserializable data types"""
if isinstance(o, set):
return list(o)
LOG.error("Not JSON serializable data: %s" % repr(o))
return {"error": "Can't serialize", "type": str(type(o))}
def queue_msg(address, props, data):
msgs = getattr(context, 'protonmsg_msgs', None)
if msgs is None:
msgs = []
context.protonmsg_msgs = msgs
body = json.dumps(data, default=json_serialize)
msgs.append((address, props, body))
@convert_datetime
@callback('postPackageListChange')
def prep_package_list_change(cbtype, *args, **kws):
address = 'package.' + kws['action']
props = {'type': cbtype[4:],
'tag': kws['tag']['name'],
'package': kws['package']['name'],
'action': kws['action'],
'user': kws['user']['name']}
queue_msg(address, props, kws)
@convert_datetime
@callback('postTaskStateChange')
def prep_task_state_change(cbtype, *args, **kws):
if kws['attribute'] != 'state':
return
address = 'task.' + kws['new'].lower()
props = {'type': cbtype[4:],
'id': kws['info']['id'],
'parent': kws['info']['parent'],
'method': kws['info']['method'],
'attribute': kws['attribute'],
'old': kws['old'],
'new': kws['new']}
queue_msg(address, props, kws)
@convert_datetime
@callback('postBuildStateChange')
def prep_build_state_change(cbtype, *args, **kws):
if kws['attribute'] != 'state':
return
old = kws['old']
if old is not None:
old = koji.BUILD_STATES[old]
new = koji.BUILD_STATES[kws['new']]
address = 'build.' + new.lower()
kws['info'] = _strip_extra(kws['info'])
props = {'type': cbtype[4:],
'name': kws['info']['name'],
'version': kws['info']['version'],
'release': kws['info']['release'],
'attribute': kws['attribute'],
'old': old,
'new': new}
queue_msg(address, props, kws)
@convert_datetime
@callback('postImport')
def prep_import(cbtype, *args, **kws):
kws['build'] = _strip_extra(kws['build'])
address = 'import.' + kws['type']
props = {'type': cbtype[4:],
'importType': kws['type'],
'name': kws['build']['name'],
'version': kws['build']['version'],
'release': kws['build']['release']}
queue_msg(address, props, kws)
@convert_datetime
@callback('postRPMSign')
def prep_rpm_sign(cbtype, *args, **kws):
if not kws['sigkey']:
return
kws['build'] = _strip_extra(kws['build'])
address = 'sign.rpm'
props = {'type': cbtype[4:],
'sigkey': kws['sigkey'],
'name': kws['build']['name'],
'version': kws['build']['version'],
'release': kws['build']['release'],
'rpm_name': kws['rpm']['name'],
'rpm_version': kws['rpm']['version'],
'rpm_release': kws['rpm']['release'],
'rpm_arch': kws['rpm']['arch']}
queue_msg(address, props, kws)
def _prep_tag_msg(address, cbtype, kws):
kws['build'] = _strip_extra(kws['build'])
props = {'type': cbtype[4:],
'tag': kws['tag']['name'],
'name': kws['build']['name'],
'version': kws['build']['version'],
'release': kws['build']['release'],
'user': kws['user']['name']}
queue_msg(address, props, kws)
@convert_datetime
@callback('postTag')
def prep_tag(cbtype, *args, **kws):
_prep_tag_msg('build.tag', cbtype, kws)
@convert_datetime
@callback('postUntag')
def prep_untag(cbtype, *args, **kws):
_prep_tag_msg('build.untag', cbtype, kws)
@convert_datetime
@callback('postRepoInit')
def prep_repo_init(cbtype, *args, **kws):
address = 'repo.init'
props = {'type': cbtype[4:],
'tag': kws['tag']['name'],
'repo_id': kws['repo_id']}
queue_msg(address, props, kws)
@convert_datetime
@callback('postRepoDone')
def prep_repo_done(cbtype, *args, **kws):
address = 'repo.done'
props = {'type': cbtype[4:],
'tag': kws['repo']['tag_name'],
'repo_id': kws['repo']['id'],
'expire': kws['expire']}
queue_msg(address, props, kws)
@ignore_error
@convert_datetime
@callback('postCommit')
def send_queued_msgs(cbtype, *args, **kws):
msgs = getattr(context, 'protonmsg_msgs', None)
if not msgs:
return
global CONFIG
if not CONFIG:
CONFIG = koji.read_config_files([(CONFIG_FILE, True)])
urls = CONFIG.get('broker', 'urls').split()
test_mode = False
if CONFIG.has_option('broker', 'test_mode'):
test_mode = CONFIG.getboolean('broker', 'test_mode')
if test_mode:
LOG.debug('test mode: skipping send to urls: %r', urls)
for msg in msgs:
LOG.debug('test mode: skipped msg: %r', msg)
return
random.shuffle(urls)
for url in urls:
container = Container(TimeoutHandler(url, msgs, CONFIG))
container.run()
if msgs:
LOG.debug('could not send to %s, %s messages remaining',
url, len(msgs))
else:
LOG.debug('all messages sent to %s successfully', url)
break
else:
LOG.error('could not send messages to any destinations')