diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index 3a9919b1..9019382c 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -197,7 +197,11 @@ The following fields are understood: * ``cert`` -- the combined client cert and key file for authenticating koji to the broker. * ``cacert`` -- the CA certificate to verify the broker server TLS connection -* ``topic_prefix`` -- this string will be used as a prefix for all message topics +* ``topic_prefix`` -- Koji uses this string as a prefix for all message + topics. For example, if you choose ``topic://koji``, then Koji + will publish messages on ``topic://koji.package.add`` when an user runs + ``kojidev add-pkg`` etc. Use ``topic://`` prefixes for ActiveMQ brokers, + ``/topic/`` for RabbitMQ brokers. * ``connect_timeout`` -- the number of seconds to wait for a connection before timing out * ``send_timeout`` -- the number of seconds to wait while sending a message diff --git a/plugins/hub/protonmsg.conf b/plugins/hub/protonmsg.conf index 33e98b78..45f43c48 100644 --- a/plugins/hub/protonmsg.conf +++ b/plugins/hub/protonmsg.conf @@ -2,7 +2,7 @@ urls = amqps://broker1.example.com:5671 amqps://broker2.example.com:5671 cert = /etc/koji-hub/plugins/client.pem cacert = /etc/koji-hub/plugins/ca.pem -topic_prefix = koji +topic_prefix = topic://koji connect_timeout = 10 send_timeout = 60 diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py index 148ed2da..e415bf0b 100644 --- a/plugins/hub/protonmsg.py +++ b/plugins/hub/protonmsg.py @@ -74,10 +74,28 @@ class TimeoutHandler(MessagingHandler): self.log.debug('connection to %s opened successfully', event.connection.hostname) self.send_msgs(event) + @property + def topic_prefix(self): + """Normalize topic_prefix value that the user configured. + + RabbitMQ brokers require that topics start with "/topic/" + ActiveMQ brokers require that topics start with "topic://" + + If the user specified a prefix that begins with one or the other, use + that. For backwards compatibility, if the user chose neither, prepend + "topic://". + """ + koji_topic_prefix = self.conf.get('broker', 'topic_prefix') + if koji_topic_prefix.startswith('/topic/'): + return koji_topic_prefix + if koji_topic_prefix.startswith('topic://'): + return koji_topic_prefix + return 'topic://' + koji_topic_prefix + def send_msgs(self, event): - prefix = self.conf.get('broker', 'topic_prefix') for msg in self.msgs: - address = 'topic://' + prefix + '.' + msg['address'] + # address is like "topic://koji.package.add" + address = self.topic_prefix + '.' + msg['address'] if address in self.senders: sender = self.senders[address] self.log.debug('retrieved cached sender for %s', address) diff --git a/tests/test_plugins/test_protonmsg.py b/tests/test_plugins/test_protonmsg.py index 47020f68..66543c1f 100644 --- a/tests/test_plugins/test_protonmsg.py +++ b/tests/test_plugins/test_protonmsg.py @@ -3,6 +3,7 @@ from __future__ import absolute_import import json import tempfile import unittest +import pytest import protonmsg import mock @@ -488,3 +489,17 @@ send_timeout = 60 self.assertEqual(event.container.schedule.return_value.cancel.call_count, 2) self.assertTrue(self.handler.connect_task is None) self.assertTrue(self.handler.timeout_task is None) + + +@pytest.mark.parametrize('topic_prefix,expected', ( + ('koji', 'topic://koji'), + ('brew', 'topic://brew'), + ('topic://koji', 'topic://koji'), + ('/topic/koji', '/topic/koji'), +)) +def test_topic_prefix(topic_prefix, expected): + conf = ConfigParser() + conf.add_section('broker') + conf.set('broker', 'topic_prefix', topic_prefix) + handler = protonmsg.TimeoutHandler('amqp://broker1.example.com:5672', [], conf) + assert handler.topic_prefix == expected