debian-koji/plugins/hub/messagebus.py
Mike McLean 91b306e108 Avoid failing at import time if krbV module is missing
Several parts of Koji already handled this absence gracefully. This change
extends that behavior to all places where krbV is imported.
2018-04-02 16:38:27 -04:00

277 lines
9.3 KiB
Python

# Koji callback for sending notifications about events to a messagebus (amqp broker)
# Copyright (c) 2009-2014 Red Hat, Inc.
#
# Authors:
# Mike Bonnet <mikeb@redhat.com>
from koji import PluginError
from koji.context import context
from koji.plugin import callbacks, callback, ignore_error, convert_datetime
import ConfigParser
import logging
import qpid.messaging
import qpid.messaging.transports
from ssl import wrap_socket
import socket
import os
try:
import krbV
except ImportError: # pragma: no cover
krbV = None
# Upper bound on the length of the subject built by get_message_subject();
# longer subjects are truncated to this many characters.
MAX_KEY_LENGTH = 255
# Path of the plugin configuration file read by get_config().
CONFIG_FILE = '/etc/koji-hub/plugins/messagebus.conf'
# Parsed configuration; populated by get_config() on first use and then reused.
config = None
# Broker session and target address cached by get_sender() so that later calls
# can create a new sender without reconnecting.
session = None
target = None
def connect_timeout(host, port, timeout):
    """Open a TCP connection to host:port, applying *timeout* to each attempt.

    Every address returned by getaddrinfo() is tried in order and the first
    socket that connects is returned (with the timeout still set on it).

    Raises socket.error if no address could be connected to: the error from
    the last failed attempt, or a generic one if name resolution yielded no
    addresses at all.
    """
    last_error = None
    addresses = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
    for af, socktype, proto, _canonname, sa in addresses:
        sock = None
        try:
            # Creating the socket is inside the try as well, so that an
            # unsupported address family does not prevent trying the
            # remaining addresses.
            sock = socket.socket(af, socktype, proto)
            sock.settimeout(timeout)
            sock.connect(sa)
            return sock
        except socket.error as exc:
            last_error = exc
            if sock is not None:
                sock.close()
    # If we got here then we couldn't connect to any address.  Raise an
    # explicit exception: a bare ``raise`` is not valid here because no
    # exception is being handled at this point.
    if last_error is None:
        raise socket.error('no addresses found for %s:%s' % (host, port))
    raise last_error
class tlstimeout(qpid.messaging.transports.tls):
    """TLS transport whose initial TCP connect uses the connection's timeout.

    The socket is opened with connect_timeout() using ``conn._timeout``
    (set by Connection.establish() in this module) and then wrapped with
    ssl.wrap_socket() using the connection's key, certificate and trust
    files.  The base class __init__ is intentionally not called; this method
    sets up the instance itself.
    """

    def __init__(self, conn, host, port):
        self.socket = connect_timeout(host, port, getattr(conn, '_timeout'))
        try:
            if conn.tcp_nodelay:
                self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            self.tls = wrap_socket(self.socket,
                                   keyfile=conn.ssl_keyfile,
                                   certfile=conn.ssl_certfile,
                                   ca_certs=conn.ssl_trustfile)
            self.socket.setblocking(0)
        except Exception:
            # Do not leak the connected socket if socket setup or the TLS
            # handshake fails.
            self.socket.close()
            raise
        # NOTE(review): state/write_retry are presumably the bookkeeping
        # attributes the base transport's read/write methods expect --
        # confirm against the installed qpid version.
        self.state = None
        self.write_retry = None


# Make the transport selectable via transport='tls+timeout'.
qpid.messaging.transports.TRANSPORTS['tls+timeout'] = tlstimeout
class Connection(qpid.messaging.Connection):
    """
    qpid Connection variant that accepts a timeout in establish().

    The timeout is stored on the instance as ``_timeout`` and used as the
    default for _wait() whenever the caller does not pass one.  Only
    necessary until upstream Apache Qpid commit 1487578 is available in a
    supported release.
    """

    @staticmethod
    def establish(url=None, timeout=None, **options):
        """Create a Connection for *url*, remember *timeout*, open and return it."""
        connection = Connection(url, **options)
        connection._timeout = timeout
        connection.open()
        return connection

    def _wait(self, predicate, timeout=None):
        """Delegate to the base _wait(), defaulting to the stored timeout."""
        if timeout is None:
            # Instances not built through establish() have no _timeout;
            # in that case None is passed through unchanged.
            timeout = getattr(self, '_timeout', None)
        return qpid.messaging.Connection._wait(self, predicate, timeout)
def get_config():
    """Return the parsed plugin configuration, loading it on first use.

    The file named by CONFIG_FILE is read once and the resulting parser is
    cached in the module-level ``config``.  The ``[broker]`` options
    ``timeout`` and ``heartbeat`` default to '60' when not set in the file.
    """
    global config
    if config is not None:
        return config
    # Build the parser in a local and publish it only once it is complete,
    # so a failure part-way through never leaves a half-initialised parser
    # cached for later calls.
    parser = ConfigParser.SafeConfigParser()
    parser.read(CONFIG_FILE)
    # set() raises NoSectionError if the section is absent (e.g. the file
    # could not be read), so make sure it exists before applying defaults.
    if not parser.has_section('broker'):
        parser.add_section('broker')
    for option, default in (('timeout', '60'), ('heartbeat', '60')):
        if not parser.has_option('broker', option):
            parser.set('broker', option, default)
    config = parser
    return config
def _init_gssapi_ccache(config):
    """Fill an in-memory Kerberos credential cache from the configured keytab.

    Sets KRB5CCNAME in the process environment to the cache name.  Raises
    PluginError if the krbV module is not available.
    """
    if krbV is None:
        # TODO: port this to python-gssapi
        raise PluginError('krbV module not installed')
    ccname = 'MEMORY:messagebus'
    os.environ['KRB5CCNAME'] = ccname
    ctx = krbV.default_context()
    ccache = krbV.CCache(name=ccname, context=ctx)
    cprinc = krbV.Principal(name=config.get('broker', 'principal'), context=ctx)
    ccache.init(principal=cprinc)
    keytab = krbV.Keytab(name='FILE:' + config.get('broker', 'keytab'), context=ctx)
    ccache.init_creds_keytab(principal=cprinc, keytab=keytab)


def _close_quietly(conn, logger):
    """Close *conn*, logging (rather than raising) any error from close()."""
    try:
        conn.close()
    except Exception:
        logger.warning('Error closing broker connection', exc_info=True)


def get_sender():
    """Return a qpid sender for the configured exchange.

    If a session and target address are cached from an earlier call, a new
    sender is created on that session.  If that fails (or nothing is cached)
    a new connection is established from the ``[broker]`` and ``[exchange]``
    configuration, and its session and target are cached for next time.

    Raises PluginError for an unsupported ``auth`` value or when GSSAPI is
    requested without the krbV module; configuration and broker errors
    propagate unchanged.
    """
    global session, target
    logger = logging.getLogger('koji.plugin.messagebus')
    if session and target:
        try:
            return session.sender(target)
        except Exception:
            # Was a bare ``except:``, which also swallowed SystemExit and
            # KeyboardInterrupt.
            logger.warning('Error getting session, will retry', exc_info=True)
            session = None
            target = None

    config = get_config()
    if config.getboolean('broker', 'ssl'):
        url = 'amqps://'
    else:
        url = 'amqp://'
    auth = config.get('broker', 'auth')
    if auth == 'PLAIN':
        url += config.get('broker', 'username') + '/'
        url += config.get('broker', 'password') + '@'
    elif auth == 'GSSAPI':
        _init_gssapi_ccache(config)
    else:
        raise PluginError('unsupported auth type: %s' % auth)
    url += config.get('broker', 'host') + ':'
    url += config.get('broker', 'port')

    # NOTE(review): the 'tls+timeout' transport is requested even when
    # broker.ssl is false -- confirm this is intended.
    conn = Connection.establish(url,
                                sasl_mechanisms=auth,
                                transport='tls+timeout',
                                timeout=config.getfloat('broker', 'timeout'),
                                heartbeat=config.getint('broker', 'heartbeat'))

    exchange_name = config.get('exchange', 'name')
    tgt = (
        '%s;\n'
        '{ create: sender,\n'
        'assert: always,\n'
        'node: { type: topic,\n'
        'durable: %s,\n'
        'x-declare: { exchange: "%s",\n'
        'type: %s } } }'
    ) % (exchange_name, config.getboolean('exchange', 'durable'),
         exchange_name, config.get('exchange', 'type'))

    # If the session or sender cannot be created, close the connection we
    # just opened instead of leaking it.  A flag plus ``finally`` is used so
    # the original exception keeps propagating untouched.
    ready = False
    try:
        sess = conn.session()
        sender = sess.sender(tgt)
        ready = True
    finally:
        if not ready:
            _close_quietly(conn, logger)

    session = sess
    target = tgt
    return sender
def _token_append(tokenlist, val):
# Replace any periods with underscores so we have a deterministic number of tokens
val = val.replace('.', '_')
tokenlist.append(val)
def get_message_subject(msgtype, *args, **kws):
    """Build the dot-separated message subject for a topic exchange.

    The subject starts with the configured ``[topic] prefix`` and *msgtype*,
    followed by message-type specific tokens taken from *kws*.  Periods
    inside individual tokens are replaced (see _token_append) and the final
    string is truncated to MAX_KEY_LENGTH characters.

    Raises KeyError if *kws* lacks a field required for *msgtype*.
    """
    # Use get_config() rather than the module-level ``config`` directly, so
    # this also works when the configuration has not been loaded yet.
    key = [get_config().get('topic', 'prefix'), msgtype]

    if msgtype == 'PackageListChange':
        tokens = [kws['tag']['name'], kws['package']['name']]
    elif msgtype == 'TaskStateChange':
        tokens = [kws['info']['method'], kws['attribute']]
    elif msgtype == 'BuildStateChange':
        info = kws['info']
        tokens = [kws['attribute'], info['name']]
    elif msgtype == 'Import':
        tokens = [kws['type']]
    elif msgtype in ('Tag', 'Untag'):
        tag_name = kws['tag']['name']
        build = kws['build']
        tokens = [tag_name, build['name'], kws['user']['name']]
    elif msgtype == 'RepoInit':
        tokens = [kws['tag']['name']]
    elif msgtype == 'RepoDone':
        tokens = [kws['repo']['tag_name']]
    else:
        # Other message types carry only the prefix and the type.
        tokens = []

    for token in tokens:
        _token_append(key, token)
    return '.'.join(key)[:MAX_KEY_LENGTH]
def get_message_headers(msgtype, *args, **kws):
    """Build the header dict for a message of type *msgtype*.

    The result always contains ``'type'``; the remaining entries depend on
    the message type and are copied from *kws*.  Unrecognised types get only
    the ``'type'`` entry.  Raises KeyError if *kws* lacks a field required
    for *msgtype*.
    """
    headers = {'type': msgtype}

    if msgtype == 'PackageListChange':
        headers.update({
            'tag': kws['tag']['name'],
            'package': kws['package']['name'],
        })
    elif msgtype == 'TaskStateChange':
        task = kws['info']
        headers.update({
            'id': task['id'],
            'parent': task['parent'],
            'method': task['method'],
            'attribute': kws['attribute'],
            'old': kws['old'],
            'new': kws['new'],
        })
    elif msgtype == 'BuildStateChange':
        build_info = kws['info']
        headers.update({
            'name': build_info['name'],
            'version': build_info['version'],
            'release': build_info['release'],
            'attribute': kws['attribute'],
            'old': kws['old'],
            'new': kws['new'],
        })
    elif msgtype == 'Import':
        headers['importType'] = kws['type']
    elif msgtype in ('Tag', 'Untag'):
        tag_name = kws['tag']['name']
        tagged_build = kws['build']
        headers.update({
            'tag': tag_name,
            'name': tagged_build['name'],
            'version': tagged_build['version'],
            'release': tagged_build['release'],
            'user': kws['user']['name'],
        })
    elif msgtype == 'RepoInit':
        headers['tag'] = kws['tag']['name']
    elif msgtype == 'RepoDone':
        headers['tag'] = kws['repo']['tag_name']

    return headers
@callback(*[c for c in callbacks.keys() if c.startswith('post')
            and c != 'postCommit'])
@ignore_error
@convert_datetime
def prep_message(cbtype, *args, **kws):
    """Build a message for this callback and queue it on the request context.

    Registered for every callback whose name starts with 'post', except
    'postCommit'.  The message is not sent here; it is appended to
    ``context.messagebus_plugin_messages`` for send_messages() to deliver.

    Raises PluginError if the configured exchange type is neither 'topic'
    nor 'headers'.
    """
    # The message type is the callback name without its prefix: four
    # characters for 'post...', otherwise three.
    prefix_len = 4 if cbtype.startswith('post') else 3
    msgtype = cbtype[prefix_len:]

    # Message content is the keyword arguments, plus any positional
    # arguments under the 'args' key.
    data = dict(kws)
    if args:
        data['args'] = list(args)

    exchange_type = get_config().get('exchange', 'type')
    if exchange_type == 'topic':
        message = qpid.messaging.Message(
            subject=get_message_subject(msgtype, *args, **kws),
            content=data)
    elif exchange_type == 'headers':
        message = qpid.messaging.Message(
            properties=get_message_headers(msgtype, *args, **kws),
            content=data)
    else:
        raise PluginError('unsupported exchange type: %s' % exchange_type)

    # Create the per-context queue on first use, then add the message.
    if getattr(context, 'messagebus_plugin_messages', None) is None:
        context.messagebus_plugin_messages = []
    context.messagebus_plugin_messages.append(message)
@callback('postCommit')
@ignore_error
def send_messages(cbtype, *args, **kws):
    '''Send the messages cached by prep_message.

    Does nothing when no messages are queued on the context.  When the
    optional ``[broker] test_mode`` option is true, the messages are only
    logged at debug level and no broker connection is made.  The queued
    messages are removed from the context afterwards, whether or not
    sending succeeded.
    '''
    logger = logging.getLogger('koji.plugin.messagebus')
    config = get_config()
    messages = getattr(context, 'messagebus_plugin_messages', [])
    if not messages:
        return
    try:
        test_mode = False
        if config.has_option('broker', 'test_mode'):
            test_mode = config.getboolean('broker', 'test_mode')
        if test_mode:
            logger.debug('test mode: skipping broker connection')
            for message in messages:
                logger.debug('test mode: skipping message: %r', message)
            return

        # Read once instead of once per message.
        timeout = config.getfloat('broker', 'timeout')
        sender = get_sender()
        delivered = False
        try:
            for message in messages:
                sender.send(message, sync=False, timeout=timeout)
            delivered = True
        finally:
            if delivered:
                sender.close(timeout=timeout)
            else:
                # A send failed: still release the sender, but do not let
                # an error from close() hide the original failure.
                try:
                    sender.close(timeout=timeout)
                except Exception:
                    logger.warning('Error closing sender after failed send',
                                   exc_info=True)
    finally:
        # koji should do this for us, but just in case...
        del context.messagebus_plugin_messages