protonmsg: allow users to specify router-specific topic prefixes

Prior to this change, Koji would always send messages with a hard-coded
topic:// prefix.

This works fine for ActiveMQ 5 and Artemis, but RabbitMQ does not
support that syntax. Instead, RabbitMQ brokers expect clients to use
topic addresses with a "/topic/" prefix.

The RFE for RabbitMQ to support "topic://" prefixes is
https://github.com/rabbitmq/rabbitmq-server/issues/2583

In the meantime, allow users to specify "topic://" or "/topic/"
explicitly in their configuration.

For backwards-compatibility, if the user chose neither "topic://" nor
"/topic/", prepend the "topic://" string, preserving the plugin's
existing behavior.

(Note: ActiveMQ 5 advertises its expected topic:// prefix in the initial
connection properties, so we could read that value dynamically, but
RabbitMQ and Artemis do not send an expected topic prefix connection
properties, so we just make the user choose explicitly here.)
This commit is contained in:
Ken Dreyer 2021-11-29 16:06:33 -05:00 committed by Tomas Kopecek
parent dc47bcc8cc
commit dbacf1f985
4 changed files with 41 additions and 4 deletions

View file

@ -197,7 +197,11 @@ The following fields are understood:
* ``cert`` -- the combined client cert and key file for authenticating koji to
the broker.
* ``cacert`` -- the CA certificate to verify the broker server TLS connection
* ``topic_prefix`` -- this string will be used as a prefix for all message topics
* ``topic_prefix`` -- Koji uses this string as a prefix for all message
topics. For example, if you choose ``topic://koji``, then Koji
will publish messages on ``topic://koji.package.add`` when an user runs
``kojidev add-pkg`` etc. Use ``topic://`` prefixes for ActiveMQ brokers,
``/topic/`` for RabbitMQ brokers.
* ``connect_timeout`` -- the number of seconds to wait for a connection before
timing out
* ``send_timeout`` -- the number of seconds to wait while sending a message

View file

@ -2,7 +2,7 @@
urls = amqps://broker1.example.com:5671 amqps://broker2.example.com:5671
cert = /etc/koji-hub/plugins/client.pem
cacert = /etc/koji-hub/plugins/ca.pem
topic_prefix = koji
topic_prefix = topic://koji
connect_timeout = 10
send_timeout = 60

View file

@ -74,10 +74,28 @@ class TimeoutHandler(MessagingHandler):
self.log.debug('connection to %s opened successfully', event.connection.hostname)
self.send_msgs(event)
@property
def topic_prefix(self):
"""Normalize topic_prefix value that the user configured.
RabbitMQ brokers require that topics start with "/topic/"
ActiveMQ brokers require that topics start with "topic://"
If the user specified a prefix that begins with one or the other, use
that. For backwards compatibility, if the user chose neither, prepend
"topic://".
"""
koji_topic_prefix = self.conf.get('broker', 'topic_prefix')
if koji_topic_prefix.startswith('/topic/'):
return koji_topic_prefix
if koji_topic_prefix.startswith('topic://'):
return koji_topic_prefix
return 'topic://' + koji_topic_prefix
def send_msgs(self, event):
prefix = self.conf.get('broker', 'topic_prefix')
for msg in self.msgs:
address = 'topic://' + prefix + '.' + msg['address']
# address is like "topic://koji.package.add"
address = self.topic_prefix + '.' + msg['address']
if address in self.senders:
sender = self.senders[address]
self.log.debug('retrieved cached sender for %s', address)

View file

@ -3,6 +3,7 @@ from __future__ import absolute_import
import json
import tempfile
import unittest
import pytest
import protonmsg
import mock
@ -488,3 +489,17 @@ send_timeout = 60
self.assertEqual(event.container.schedule.return_value.cancel.call_count, 2)
self.assertTrue(self.handler.connect_task is None)
self.assertTrue(self.handler.timeout_task is None)
@pytest.mark.parametrize('topic_prefix,expected', (
('koji', 'topic://koji'),
('brew', 'topic://brew'),
('topic://koji', 'topic://koji'),
('/topic/koji', '/topic/koji'),
))
def test_topic_prefix(topic_prefix, expected):
conf = ConfigParser()
conf.add_section('broker')
conf.set('broker', 'topic_prefix', topic_prefix)
handler = protonmsg.TimeoutHandler('amqp://broker1.example.com:5672', [], conf)
assert handler.topic_prefix == expected