diff --git a/plugins/messagebus.py b/plugins/messagebus.py index 89a14347..e5028912 100644 --- a/plugins/messagebus.py +++ b/plugins/messagebus.py @@ -93,6 +93,40 @@ def get_routing_key(cbtype, *args, **kws): key = key[:MAX_KEY_LENGTH] return key +def get_message_headers(cbtype, *args, **kws): + headers = {'type': cbtype} + + if cbtype in ('prePackageListChange', 'postPackageListChange'): + headers['tag'] = kws['tag']['name'] + headers['package'] = kws['package']['name'] + elif cbtype in ('preTaskStateChange', 'postTaskStateChange'): + headers['attribute'] = kws['attribute'] + headers['old'] = kws['old'] + headers['new'] = kws['new'] + elif cbtype in ('preBuildStateChange', 'postBuildStateChange'): + info = kws['info'] + headers['name'] = info['name'] + headers['version'] = info['version'] + headers['release'] = info['release'] + headers['attribute'] = kws['attribute'] + headers['old'] = kws['old'] + headers['new'] = kws['new'] + elif cbtype in ('preImport', 'postImport'): + headers['importType'] = kws['type'] + elif cbtype in ('preTag', 'postTag', 'preUntag', 'postUntag'): + headers['tag'] = kws['tag']['name'] + build = kws['build'] + headers['name'] = build['name'] + headers['version'] = build['version'] + headers['release'] = build['release'] + headers['user'] = kws['user']['name'] + elif cbtype in ('preRepoInit', 'postRepoInit'): + headers['tag'] = kws['tag']['name'] + elif cbtype in ('preRepoDone', 'postRepoDone'): + headers['tag'] = kws['repo']['tag_name'] + + return headers + def encode_data(data): global config format = config.get('format', 'encoding') @@ -106,8 +140,17 @@ def encode_data(data): def send_message(cbtype, *args, **kws): global config session = get_session() - routing_key = get_routing_key(cbtype, *args, **kws) - props = session.delivery_properties(routing_key=routing_key) + exchange_type = config.get('exchange', 'type') + + if exchange_type == 'topic': + routing_key = get_routing_key(cbtype, *args, **kws) + props = session.delivery_properties(routing_key=routing_key) + elif exchange_type == 'headers': + headers = get_message_headers(cbtype, *args, **kws) + props = session.message_properties(application_headers=headers) + else: + raise koji.PluginError, 'unsupported exchange type: %s' % exchange_type + data = kws.copy() if args: data['args'] = list(args)