From dbacf1f9855d5b808c943004d2c2856fa5f83a46 Mon Sep 17 00:00:00 2001 From: Ken Dreyer Date: Mon, 29 Nov 2021 16:06:33 -0500 Subject: [PATCH] protonmsg: allow users to specify router-specific topic prefixes Prior to this change, Koji would always send messages with a hard-coded topic:// prefix. This works fine for ActiveMQ 5 and Artemis, but RabbitMQ does not support that syntax. Instead, RabbitMQ brokers expect clients to use topic addresses with a "/topic/" prefix. The RFE for RabbitMQ to support "topic://" prefixes is https://github.com/rabbitmq/rabbitmq-server/issues/2583 In the meantime, allow users to specify "topic://" or "/topic/" explicitly in their configuration. For backwards-compatibility, if the user chose neither "topic://" nor "/topic/", prepend the "topic://" string, preserving the plugin's existing behavior. (Note: ActiveMQ 5 advertises its expected topic:// prefix in the initial connection properties, so we could read that value dynamically, but RabbitMQ and Artemis do not send an expected topic prefix connection properties, so we just make the user choose explicitly here.) --- docs/source/plugins.rst | 6 +++++- plugins/hub/protonmsg.conf | 2 +- plugins/hub/protonmsg.py | 22 ++++++++++++++++++++-- tests/test_plugins/test_protonmsg.py | 15 +++++++++++++++ 4 files changed, 41 insertions(+), 4 deletions(-) diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index 3a9919b1..9019382c 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -197,7 +197,11 @@ The following fields are understood: * ``cert`` -- the combined client cert and key file for authenticating koji to the broker. * ``cacert`` -- the CA certificate to verify the broker server TLS connection -* ``topic_prefix`` -- this string will be used as a prefix for all message topics +* ``topic_prefix`` -- Koji uses this string as a prefix for all message + topics. For example, if you choose ``topic://koji``, then Koji + will publish messages on ``topic://koji.package.add`` when an user runs + ``kojidev add-pkg`` etc. Use ``topic://`` prefixes for ActiveMQ brokers, + ``/topic/`` for RabbitMQ brokers. * ``connect_timeout`` -- the number of seconds to wait for a connection before timing out * ``send_timeout`` -- the number of seconds to wait while sending a message diff --git a/plugins/hub/protonmsg.conf b/plugins/hub/protonmsg.conf index 33e98b78..45f43c48 100644 --- a/plugins/hub/protonmsg.conf +++ b/plugins/hub/protonmsg.conf @@ -2,7 +2,7 @@ urls = amqps://broker1.example.com:5671 amqps://broker2.example.com:5671 cert = /etc/koji-hub/plugins/client.pem cacert = /etc/koji-hub/plugins/ca.pem -topic_prefix = koji +topic_prefix = topic://koji connect_timeout = 10 send_timeout = 60 diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py index 148ed2da..e415bf0b 100644 --- a/plugins/hub/protonmsg.py +++ b/plugins/hub/protonmsg.py @@ -74,10 +74,28 @@ class TimeoutHandler(MessagingHandler): self.log.debug('connection to %s opened successfully', event.connection.hostname) self.send_msgs(event) + @property + def topic_prefix(self): + """Normalize topic_prefix value that the user configured. + + RabbitMQ brokers require that topics start with "/topic/" + ActiveMQ brokers require that topics start with "topic://" + + If the user specified a prefix that begins with one or the other, use + that. For backwards compatibility, if the user chose neither, prepend + "topic://". + """ + koji_topic_prefix = self.conf.get('broker', 'topic_prefix') + if koji_topic_prefix.startswith('/topic/'): + return koji_topic_prefix + if koji_topic_prefix.startswith('topic://'): + return koji_topic_prefix + return 'topic://' + koji_topic_prefix + def send_msgs(self, event): - prefix = self.conf.get('broker', 'topic_prefix') for msg in self.msgs: - address = 'topic://' + prefix + '.' + msg['address'] + # address is like "topic://koji.package.add" + address = self.topic_prefix + '.' + msg['address'] if address in self.senders: sender = self.senders[address] self.log.debug('retrieved cached sender for %s', address) diff --git a/tests/test_plugins/test_protonmsg.py b/tests/test_plugins/test_protonmsg.py index 47020f68..66543c1f 100644 --- a/tests/test_plugins/test_protonmsg.py +++ b/tests/test_plugins/test_protonmsg.py @@ -3,6 +3,7 @@ from __future__ import absolute_import import json import tempfile import unittest +import pytest import protonmsg import mock @@ -488,3 +489,17 @@ send_timeout = 60 self.assertEqual(event.container.schedule.return_value.cancel.call_count, 2) self.assertTrue(self.handler.connect_task is None) self.assertTrue(self.handler.timeout_task is None) + + +@pytest.mark.parametrize('topic_prefix,expected', ( + ('koji', 'topic://koji'), + ('brew', 'topic://brew'), + ('topic://koji', 'topic://koji'), + ('/topic/koji', '/topic/koji'), +)) +def test_topic_prefix(topic_prefix, expected): + conf = ConfigParser() + conf.add_section('broker') + conf.set('broker', 'topic_prefix', topic_prefix) + handler = protonmsg.TimeoutHandler('amqp://broker1.example.com:5672', [], conf) + assert handler.topic_prefix == expected