update the messagebus plugin to use the new qpid.messaging API

This commit is contained in:
Mike Bonnet 2010-10-27 11:57:38 -04:00
parent 574f9bcd5b
commit afe05a0263
2 changed files with 72 additions and 67 deletions

View file

@ -7,10 +7,7 @@
from koji.plugin import callbacks, callback, ignore_error
import ConfigParser
import logging
import qpid
import qpid.util
import qpid.connection
import qpid.datatypes
import qpid.messaging
try:
import json
except ImportError:
@ -20,95 +17,96 @@ MAX_KEY_LENGTH = 255
CONFIG_FILE = '/etc/koji-hub/plugins/messagebus.conf'
config = None
connection = None
session = None
target = None
def get_session():
global connection, config
if connection:
def get_sender():
global config, session, target
if session and target:
try:
return connection.session('koji-' + str(qpid.datatypes.uuid4()))
return session.sender(target)
except:
logging.getLogger('koji.plugin.messagebus').warning('Error getting session, will retry', exc_info=True)
connection = None
session = None
target = None
config = ConfigParser.SafeConfigParser()
config.read(CONFIG_FILE)
sock = qpid.util.connect(config.get('broker', 'host'),
int(config.get('broker', 'port')))
if config.getboolean('broker', 'ssl'):
sock = qpid.util.ssl(sock)
conn_opts = {'sock': sock, 'mechanism': config.get('broker', 'auth')}
if conn_opts['mechanism'] == 'PLAIN':
conn_opts['username'] = config.get('broker', 'username')
conn_opts['password'] = config.get('broker', 'password')
conn = qpid.connection.Connection(**conn_opts)
conn.start()
session = conn.session('koji-' + str(qpid.datatypes.uuid4()))
url = 'amqps://'
else:
url = 'amqp://'
if config.get('broker', 'auth').lower() == 'plain':
url += config.get('broker', 'username') + '/'
url += config.get('broker', 'password') + '@'
url += config.get('broker', 'host') + ':'
url += config.get('broker', 'port')
session.exchange_declare(exchange=config.get('exchange', 'name'),
type=config.get('exchange', 'type'),
durable=config.getboolean('exchange', 'durable'))
conn = qpid.messaging.Connection.establish(url,
sasl_mechanisms=config.get('broker', 'auth'))
sess = conn.session()
tgt = """%s;
{ create: sender,
assert: always,
node: { type: topic,
durable: %s,
x-declare: { exchange: "%s",
type: %s } } }""" % \
(config.get('exchange', 'name'), config.getboolean('exchange', 'durable'),
config.get('exchange', 'name'), config.get('exchange', 'type'))
sender = sess.sender(tgt)
session = sess
target = tgt
connection = conn
return session
return sender
def _token_append(tokenlist, val):
# Replace any periods with underscores so we have a deterministic number of tokens
val = val.replace('.', '_')
tokenlist.append(val)
def get_routing_key(cbtype, *args, **kws):
global config
# We're only registering for post callbacks, so strip
# off the redundant "post" prefix
key = [config.get('topic', 'prefix'), cbtype[4:]]
def get_message_subject(msgtype, *args, **kws):
key = [config.get('topic', 'prefix'), msgtype]
if cbtype in ('prePackageListChange', 'postPackageListChange'):
if msgtype == 'PackageListChange':
_token_append(key, kws['tag']['name'])
_token_append(key, kws['package']['name'])
elif cbtype in ('preTaskStateChange', 'postTaskStateChange'):
elif msgtype == 'TaskStateChange':
_token_append(key, kws['info']['method'])
_token_append(key, kws['attribute'])
elif cbtype in ('preBuildStateChange', 'postBuildStateChange'):
elif msgtype == 'BuildStateChange':
info = kws['info']
_token_append(key, kws['attribute'])
_token_append(key, info['name'])
elif cbtype in ('preImport', 'postImport'):
elif msgtype == 'Import':
_token_append(key, kws['type'])
elif cbtype in ('preTag', 'postTag', 'preUntag', 'postUntag'):
elif msgtype in ('Tag', 'Untag'):
_token_append(key, kws['tag']['name'])
build = kws['build']
_token_append(key, build['name'])
_token_append(key, kws['user']['name'])
elif cbtype in ('preRepoInit', 'postRepoInit'):
elif msgtype == 'RepoInit':
_token_append(key, kws['tag']['name'])
elif cbtype in ('preRepoDone', 'postRepoDone'):
elif msgtype == 'RepoDone':
_token_append(key, kws['repo']['tag_name'])
# ensure the routing key is an ascii string with a maximum
# length of 255 characters
key = '.'.join(key)
key = key.encode('ascii', 'xmlcharrefreplace')
key = key[:MAX_KEY_LENGTH]
return key
def get_message_headers(cbtype, *args, **kws):
if cbtype.startswith('pre'):
headers = {'type': cbtype[3:]}
else:
headers = {'type': cbtype[4:]}
def get_message_headers(msgtype, *args, **kws):
headers = {'type': msgtype}
if cbtype in ('prePackageListChange', 'postPackageListChange'):
if msgtype == 'PackageListChange':
headers['tag'] = kws['tag']['name']
headers['package'] = kws['package']['name']
elif cbtype in ('preTaskStateChange', 'postTaskStateChange'):
elif msgtype == 'TaskStateChange':
headers['method'] = kws['info']['method']
headers['attribute'] = kws['attribute']
headers['old'] = kws['old']
headers['new'] = kws['new']
elif cbtype in ('preBuildStateChange', 'postBuildStateChange'):
elif msgtype == 'BuildStateChange':
info = kws['info']
headers['name'] = info['name']
headers['version'] = info['version']
@ -116,18 +114,18 @@ def get_message_headers(cbtype, *args, **kws):
headers['attribute'] = kws['attribute']
headers['old'] = kws['old']
headers['new'] = kws['new']
elif cbtype in ('preImport', 'postImport'):
elif msgtype == 'Import':
headers['importType'] = kws['type']
elif cbtype in ('preTag', 'postTag', 'preUntag', 'postUntag'):
elif msgtype in ('Tag', 'Untag'):
headers['tag'] = kws['tag']['name']
build = kws['build']
headers['name'] = build['name']
headers['version'] = build['version']
headers['release'] = build['release']
headers['user'] = kws['user']['name']
elif cbtype in ('preRepoInit', 'postRepoInit'):
elif msgtype == 'RepoInit':
headers['tag'] = kws['tag']['name']
elif cbtype in ('preRepoDone', 'postRepoDone'):
elif msgtype == 'RepoDone':
headers['tag'] = kws['repo']['tag_name']
return headers
@ -144,22 +142,26 @@ def encode_data(data):
@ignore_error
def send_message(cbtype, *args, **kws):
global config
session = get_session()
exchange_type = config.get('exchange', 'type')
if exchange_type == 'topic':
routing_key = get_routing_key(cbtype, *args, **kws)
props = session.delivery_properties(routing_key=routing_key)
elif exchange_type == 'headers':
headers = get_message_headers(cbtype, *args, **kws)
props = session.message_properties(application_headers=headers)
sender = get_sender()
if cbtype.startswith('post'):
msgtype = cbtype[4:]
else:
raise koji.PluginError, 'unsupported exchange type: %s' % exchange_type
msgtype = cbtype[3:]
data = kws.copy()
if args:
data['args'] = list(args)
payload = encode_data(data)
message = qpid.datatypes.Message(props, payload)
session.message_transfer(destination=config.get('exchange', 'name'), message=message)
session.close()
exchange_type = config.get('exchange', 'type')
if exchange_type == 'topic':
subject = get_message_subject(msgtype, *args, **kws)
message = qpid.messaging.Message(subject=subject, content=payload)
elif exchange_type == 'headers':
headers = get_message_headers(msgtype, *args, **kws)
message = qpid.messaging.Message(properties=headers, content=payload)
else:
raise koji.PluginError, 'unsupported exchange type: %s' % exchange_type
sender.send(message)
sender.close()