avoid message re-insertion, extend test mode, schema and docs updates

include new table in main schema, since this plugin is part of Koji itself

clean up and expand the docs for this plugin

refactor query_from_db() into handle_db_msgs()
* fix lock error cases
* only delete messages from queue if we successfully send them
* handle test_mode

allow test_mode to exercise db queue via test_mode_fail setting
Mike McLean 2020-09-24 15:54:27 -04:00 committed by Tomas Kopecek
parent aff74c4b3a
commit 4ddc48e72a
4 changed files with 113 additions and 47 deletions


@@ -6,4 +6,15 @@ BEGIN;
 CREATE INDEX task_by_no_parent_state_method ON task(parent, state, method) WHERE parent IS NULL;
+
+-- Message queue for the protonmsg plugin
+CREATE TABLE proton_queue (
+        id SERIAL PRIMARY KEY,
+        created_ts TIMESTAMPTZ,
+        address TEXT NOT NULL,
+        props JSON NOT NULL,
+        body JSON NOT NULL
+) WITHOUT OIDS;
+
 COMMIT;


@@ -937,4 +937,15 @@ CREATE TABLE win_archives (
         flags TEXT
 ) WITHOUT OIDS;
+
+-- Message queue for the protonmsg plugin
+CREATE TABLE proton_queue (
+        id SERIAL PRIMARY KEY,
+        created_ts TIMESTAMPTZ,
+        address TEXT NOT NULL,
+        props JSON NOT NULL,
+        body JSON NOT NULL
+) WITHOUT OIDS;
+
 COMMIT WORK;
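The ``props`` and ``body`` columns of the new table hold json-encoded message data. As a rough sketch of how the values for one queue row might be assembled (the helper name and the sample address/payload here are hypothetical illustrations, not code from the plugin):

```python
import json
from datetime import datetime, timezone

def queue_row(address, props, body):
    """Build column values for a proton_queue row (illustrative only)."""
    return {
        'created_ts': datetime.now(timezone.utc).isoformat(),
        'address': address,
        # props and body are stored json-encoded, matching the JSON columns
        'props': json.dumps(props),
        'body': json.dumps(body),
    }

row = queue_row('build.state.change',
                {'type': 'BuildStateChange'},
                {'name': 'foo', 'version': '1.0'})
```

The `id` column is left to the database (`SERIAL`), so only the four remaining columns need values at insert time.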


@@ -162,34 +162,54 @@ And in scripts, you can use following calls:
 Proton messaging
 ================
 
-It is hub-only plugin which can send all the messages produced by koji to amqps
-message brokers.
+The ``protonmsg`` plugin for the hub will, if enabled, send a wide range of
+messages about Koji activity to the configured amqps message brokers.
+Most callback events on the hub are translated into messages.
 
-``Plugins = protonmsg`` needs to be added to ``/etc/koji-hub/hub.conf``.
-Configuration file must be placed in ``/etc/koji-hub/plugins/protonmsg.conf``.
-There are three sections in config file - broker, queue and message.
-Broker section allows admin to set up connection options like urls,
-certificates, timeouts and topic prefix.
+In order to enable this plugin, you must:
 
-Normally, only messages in apache process memory are remembered. There are
-various reasons, why these messages can be lost if broker is unavailable for
-longer time. For more reliability admin can enable persistent database message
-queue. For this is section ``queue`` where ``enabled`` boolean enables this
-behaviour. Currently you need to create table manually by running the following
-SQL:
+* add ``protonmsg`` to the ``Plugins`` setting in ``/etc/koji-hub/hub.conf``
+* provide a configuration file for the plugin at
+  ``/etc/koji-hub/plugins/protonmsg.conf``
 
-.. code-block:: plpgsql
-
-  CREATE TABLE proton_queue (
-    id SERIAL PRIMARY KEY,
-    props JSON NOT NULL,
-    body JSON NOT NULL
-  )
+The configuration file is ini-style format with three sections: broker,
+queue and message.
 
-Last related option is ``batch_size`` - it says how many messages are send
-during one request. It should be balanced number. If there is a large queue it
-shouldn't block the request itself as user is waiting for it. On the other hand
-it is not hardcoded as it plays with ``extra_limit`` - e.g. there could be more small
-messages if ``extra_limit`` is set to small number or less bigger messages with
-unlimited size.
+The ``[broker]`` section defines how the plugin connects to the message bus.
+The following fields are understood:
+
+* ``urls`` -- a space separated list of amqps urls. Additional urls are
+  treated as fallbacks. The plugin will send to the first one that accepts
+  the message
+* ``cert`` -- the client cert file for authentication
+* ``cacert`` -- the ca cert to validate the server
+* ``topic_prefix`` -- this string will be used as a prefix for all message topics
+* ``connect_timeout`` -- the number of seconds to wait for a connection before
+  timing out
+* ``send_timeout`` -- the number of seconds to wait while sending a message
+  before timing out
+
+The ``[message]`` section sets parameters for how messages are formed.
+Currently only one field is understood:
+
+* ``extra_limit`` -- the maximum allowed size for ``build.extra`` fields that
+  appear in messages. If the ``build.extra`` field is longer (in terms of
+  json-encoded length), then it will be omitted. The default value is ``0``
+  which means no limit.
+
+The ``[queue]`` section controls how (or if) the plugin will use the database
+to queue messages when they cannot be immediately sent.
+The following fields are understood:
+
+* ``enabled`` -- if true, then the feature is enabled
+* ``batch_size`` -- the maximum number of queued messages to send at one time
+* ``max_age`` -- the age (in hours) at which old messages in the queue are discarded
+
+It is important to note that the database queue is only a fallback mechanism.
+The plugin will always attempt to send messages as they are issued.
+Messages are only placed in the database queue when they cannot be immediately
+sent on the bus (e.g. if the amqps server is offline).
+
+Admins should consider the balance between the ``batch_size`` and
+``extra_limit`` options, as both can affect the total amount of data that the
+plugin could attempt to send during a single call.
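A configuration file following the documented sections might parse like the sketch below. All option values here are illustrative assumptions, not shipped defaults; the `fallback` pattern mirrors the `configparser` calls the plugin uses:

```python
from configparser import ConfigParser

# Hypothetical protonmsg.conf content; every value is an example, not a default
SAMPLE_CONF = """
[broker]
urls = amqps://broker1.example.com:5671 amqps://broker2.example.com:5671
cert = /etc/koji-hub/plugins/client.pem
cacert = /etc/koji-hub/plugins/ca.pem
topic_prefix = koji
connect_timeout = 10
send_timeout = 60

[queue]
enabled = true
batch_size = 100

[message]
extra_limit = 0
"""

CONFIG = ConfigParser()
CONFIG.read_string(SAMPLE_CONF)

# space separated url list; additional urls act as fallbacks
urls = CONFIG.get('broker', 'urls').split()
# options are read with fallbacks, so missing keys do not raise
batch_size = CONFIG.getint('queue', 'batch_size', fallback=100)
db_enabled = CONFIG.getboolean('queue', 'enabled', fallback=False)
```

Omitting the `[queue]` section entirely would leave `db_enabled` at its `False` fallback, i.e. the database queue stays off unless explicitly enabled.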


@@ -335,26 +335,39 @@ def store_to_db(msgs):
     c.execute('COMMIT')
 
 
-def query_from_db():
+def handle_db_msgs(urls, CONFIG):
     limit = CONFIG.getint('queue', 'batch_size', fallback=100)
+    c = context.cnx.cursor()
+    # we're running in postCommit, so we need to handle new transaction
+    c.execute('BEGIN')
     try:
-        c = context.cnx.cursor()
-        # we're running in postCommit, so we need to handle new transaction
-        c.execute('BEGIN')
         c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')
+    except psycopg2.OperationalError:
+        LOG.debug('skipping db queue due to lock')
+        return
+    try:
         c.execute("DELETE FROM proton_queue WHERE created_ts < NOW() -'%s hours'::interval" %
                   CONFIG.getint('queue', 'age', fallback=24))
         query = QueryProcessor(tables=('proton_queue',),
                                columns=('id', 'address', 'props', 'body'),
                                opts={'order': 'id', 'limit': limit})
        msgs = list(query.execute())
+        if CONFIG.getboolean('broker', 'test_mode', fallback=False):
+            if msgs:
+                LOG.debug('test mode: skipping send for %i messages from db', len(msgs))
+            unsent = []
+        else:
+            unsent = {m['id'] for m in _send_msgs(urls, msgs, CONFIG)}
+        sent = [m for m in msgs if m['id'] not in unsent]
         if msgs:
             c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s',
-                      {'ids': [msg['id'] for msg in msgs]})
-        c.execute('COMMIT')
-        return msgs
-    except psycopg2.errors.LockNotAvailable:
-        return []
+                      {'ids': [msg['id'] for msg in sent]})
+    finally:
+        # make sure we free the lock
+        try:
+            c.execute('COMMIT')
+        except Exception:
+            c.execute('ROLLBACK')
 
 
 @ignore_error
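The key behavioral change in `handle_db_msgs()` is that queue rows are deleted only when their ids are absent from the unsent set, so broker failures no longer drop messages. That selection logic, sketched standalone with the send step stubbed out (the helper name `pick_sent` is an illustration, not the plugin's API):

```python
def pick_sent(msgs, failed_ids):
    """Return the messages that can safely be deleted from the queue.

    msgs: rows from proton_queue, each a dict with an 'id' key
    failed_ids: ids that the send step reported as unsent
    """
    unsent = set(failed_ids)
    # keep only rows whose send succeeded; failed rows stay queued
    return [m for m in msgs if m['id'] not in unsent]

queued = [{'id': 1}, {'id': 2}, {'id': 3}]
# suppose the broker accepted messages 1 and 3 but rejected 2
deletable = pick_sent(queued, failed_ids={2})
```

Only the ids in `deletable` go into the `DELETE FROM proton_queue` statement; row 2 remains in the table for a later call to retry.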
@@ -374,24 +387,35 @@ def send_queued_msgs(cbtype, *args, **kws):
     db_enabled = False
     if CONFIG.has_option('queue', 'enabled'):
         db_enabled = CONFIG.getboolean('queue', 'enabled')
 
     if test_mode:
         LOG.debug('test mode: skipping send to urls: %r', urls)
-        for msg in msgs:
+        fail_chance = CONFIG.getint('broker', 'test_mode_fail', fallback=0)
+        if fail_chance:
+            # simulate unsent messages in test mode
+            sent = []
+            unsent = []
+            for m in msgs:
+                if random.randint(1, 100) <= fail_chance:
+                    unsent.append(m)
+                else:
+                    sent.append(m)
+            if unsent:
+                LOG.info('simulating %i unsent messages' % len(unsent))
+        else:
+            sent = msgs
+            unsent = []
+        for msg in sent:
             LOG.debug('test mode: skipped msg: %r', msg)
-        return
-
-    msgs = _send_msgs(urls, msgs, CONFIG)
+    else:
+        unsent = _send_msgs(urls, msgs, CONFIG)
 
-    if db_enabled and not test_mode:
-        if msgs:
+    if db_enabled:
+        if unsent:
             # if we still have some messages, store them and leave for another call to pick them up
             store_to_db(msgs)
         else:
             # otherwise we are another call - look to db if there remains something to send
-            msgs = query_from_db()
-            msgs = _send_msgs(urls, msgs, CONFIG)
-            # return unsuccesful data to db
-            store_to_db(msgs)
-            if msgs:
-                LOG.error('could not send messages to any destinations, %s stored to db' % len(msgs))
+            handle_db_msgs(urls, CONFIG)
+    elif unsent:
+        LOG.error('could not send %i messages. db queue disabled' % len(msgs))
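The new `test_mode_fail` setting makes test mode probabilistically mark messages unsent, so the db queue path can be exercised without a real broker outage. A self-contained sketch of that partitioning, which is deterministic at the extreme settings even though individual rolls are random:

```python
import random

def simulate_send(msgs, fail_chance):
    """Split msgs into (sent, unsent), failing each message with
    fail_chance percent probability, as the test mode branch does."""
    sent, unsent = [], []
    for m in msgs:
        if random.randint(1, 100) <= fail_chance:
            unsent.append(m)
        else:
            sent.append(m)
    return sent, unsent

msgs = list(range(50))
# fail_chance=0: randint(1, 100) <= 0 never holds, so nothing fails
all_sent, none_failed = simulate_send(msgs, 0)
# fail_chance=100: randint(1, 100) <= 100 always holds, so everything fails
none_sent, all_failed = simulate_send(msgs, 100)
```

Setting `test_mode_fail = 100` in the `[broker]` section would thus route every message through `store_to_db()` and the db queue on each call.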