diff --git a/docs/schema-upgrade-1.22-1.23.sql b/docs/schema-upgrade-1.22-1.23.sql
index e5fa4dca..841344d0 100644
--- a/docs/schema-upgrade-1.22-1.23.sql
+++ b/docs/schema-upgrade-1.22-1.23.sql
@@ -6,4 +6,15 @@ BEGIN;
 CREATE INDEX task_by_no_parent_state_method ON task(parent, state, method)
     WHERE parent IS NULL;
 
+
+-- Message queue for the protonmsg plugin
+CREATE TABLE proton_queue (
+        id SERIAL PRIMARY KEY,
+        created_ts TIMESTAMPTZ DEFAULT NOW(),  -- default so age-based pruning always applies; NULL rows would never expire
+        address TEXT NOT NULL,
+        props JSON NOT NULL,
+        body JSON NOT NULL
+) WITHOUT OIDS;
+
+
 COMMIT;
diff --git a/docs/schema.sql b/docs/schema.sql
index ac3b374f..3fbcd27c 100644
--- a/docs/schema.sql
+++ b/docs/schema.sql
@@ -937,4 +937,15 @@ CREATE TABLE win_archives (
         flags TEXT
 ) WITHOUT OIDS;
 
+
+-- Message queue for the protonmsg plugin
+CREATE TABLE proton_queue (
+        id SERIAL PRIMARY KEY,
+        created_ts TIMESTAMPTZ DEFAULT NOW(),  -- default so age-based pruning always applies; NULL rows would never expire
+        address TEXT NOT NULL,
+        props JSON NOT NULL,
+        body JSON NOT NULL
+) WITHOUT OIDS;
+
+
 COMMIT WORK;
diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst
index 2151e87b..2299af89 100644
--- a/docs/source/plugins.rst
+++ b/docs/source/plugins.rst
@@ -162,34 +162,54 @@ And in scripts, you can use following calls:
 Proton messaging
 ================
 
-It is hub-only plugin which can send all the messages produced by koji to amqps
-message brokers.
+The ``protonmsg`` plugin for the hub will, if enabled, send a wide range of
+messages about Koji activity to the configured amqps message brokers.
+Most callback events on the hub are translated into messages.
 
-``Plugins = protonmsg`` needs to be added to ``/etc/koji-hub/hub.conf``.
-Configuration file must be placed in ``/etc/koji-hub/plugins/protonmsg.conf``.
-There are three sections in config file - broker, queue and message.
+In order to enable this plugin, you must:
 
-Broker section allows admin to set up connection options like urls,
-certificates, timeouts and topic prefix.
+* add ``protonmsg`` to the ``Plugins`` setting in ``/etc/koji-hub/hub.conf`` -Normally, only messages in apache process memory are remembered. There are -various reasons, why these messages can be lost if broker is unavailable for -longer time. For more reliability admin can enable persistent database message -queue. For this is section ``queue`` where ``enabled`` boolean enables this -behaviour. Currently you need to create table manually by running the following -SQL: +* provide a configuration file for the plugin at + ``/etc/koji-hub/plugins/protonmsg.conf`` -.. code-block:: plpgsql +The configuration file is ini-style format with three sections: broker, +queue and message. +The ``[broker]`` section defines how the plugin connects to the message bus. +The following fields are understood: - CREATE TABLE proton_queue ( - id SERIAL PRIMARY KEY, - props JSON NOT NULL, - body JSON NOT NULL - ) +* ``urls`` -- a space separated list of amqps urls. Additional urls are + treated as fallbacks. The plugin will send to the first one that accepts + the message +* ``cert`` -- the client cert file for authentication +* ``cacert`` -- the ca cert to validate the server +* ``topic_prefix`` -- this string will be used as a prefix for all message topics +* ``connect_timeout`` -- the number of seconds to wait for a connection before + timing out +* ``send_timeout`` -- the number of seconds to wait while sending a message + before timing out -Last related option is ``batch_size`` - it says how many messages are send -during one request. It should be balanced number. If there is a large queue it -shouldn't block the request itself as user is waiting for it. On the other hand -it is not hardcoded as it plays with ``extra_limit`` - e.g. there could be more small -messages if ``extra_limit`` is set to small number or less bigger messages with -unlimited size. +The ``[message]`` section sets parameters for how messages are formed. 
+Currently only one field is understood:
+
+* ``extra_limit`` -- the maximum allowed size for ``build.extra`` fields that
+  appear in messages. If the ``build.extra`` field is longer (in terms of
+  json-encoded length), then it will be omitted. The default value is ``0``
+  which means no limit.
+
+The ``[queue]`` section controls how (or if) the plugin will use the database
+to queue messages when they cannot be immediately sent.
+The following fields are understood:
+
+* ``enabled`` -- if true, then the feature is enabled
+* ``batch_size`` -- the maximum number of queued messages to send at one time
+* ``age`` -- the age (in hours) at which old messages in the queue are discarded
+
+It is important to note that the database queue is only a fallback mechanism.
+The plugin will always attempt to send messages as they are issued.
+Messages are only placed in the database queue when they cannot be immediately
+sent on the bus (e.g. if the amqps server is offline).
+
+Admins should consider the balance between the ``batch_size`` and
+``extra_limit`` options, as both can affect the total amount of data that the
+plugin could attempt to send during a single call.
diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py
index 23f4dd3a..db57198f 100644
--- a/plugins/hub/protonmsg.py
+++ b/plugins/hub/protonmsg.py
@@ -335,26 +335,39 @@ def store_to_db(msgs):
         c.execute('COMMIT')
 
 
-def query_from_db():
+def handle_db_msgs(urls, CONFIG):
     limit = CONFIG.getint('queue', 'batch_size', fallback=100)
+    c = context.cnx.cursor()
+    # we're running in postCommit, so we need to handle new transaction
+    c.execute('BEGIN')
     try:
-        c = context.cnx.cursor()
-        # we're running in postCommit, so we need to handle new transaction
-        c.execute('BEGIN')
         c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')
+    except psycopg2.OperationalError:
+        LOG.debug('skipping db queue due to lock')
+        return
+    try:
         c.execute("DELETE FROM proton_queue WHERE created_ts < NOW() -'%s hours'::interval" %
                   CONFIG.getint('queue', 'age', fallback=24))
         query = QueryProcessor(tables=('proton_queue',),
                                columns=('id', 'address', 'props', 'body'),
                                opts={'order': 'id', 'limit': limit})
         msgs = list(query.execute())
+        if CONFIG.getboolean('broker', 'test_mode', fallback=False):
+            if msgs:
+                LOG.debug('test mode: skipping send for %i messages from db', len(msgs))
+            unsent = []
+        else:
+            unsent = {m['id'] for m in _send_msgs(urls, msgs, CONFIG)}
+        sent = [m for m in msgs if m['id'] not in unsent]
-        if msgs:
+        if sent:  # guard on sent, not msgs: an empty id list would break the IN clause
             c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s',
-                      {'ids': [msg['id'] for msg in msgs]})
-        c.execute('COMMIT')
-        return msgs
-    except psycopg2.errors.LockNotAvailable:
-        return []
+                      {'ids': [msg['id'] for msg in sent]})
+    finally:
+        # make sure we free the lock
+        try:
+            c.execute('COMMIT')
+        except Exception:
+            c.execute('ROLLBACK')
 
 
 @ignore_error
@@ -374,24 +387,35 @@ def send_queued_msgs(cbtype, *args, **kws):
         db_enabled = False
         if CONFIG.has_option('queue', 'enabled'):
             db_enabled = CONFIG.getboolean('queue', 'enabled')
+
     if test_mode:
         LOG.debug('test mode: skipping send to urls: %r', urls)
-        for msg in msgs:
+        fail_chance = CONFIG.getint('broker', 'test_mode_fail', fallback=0)
+        if fail_chance:
+            # simulate unsent messages in test mode
+            sent = []
+            unsent = []
+            for m in msgs:
+                if random.randint(1, 100) <= fail_chance:
+                    unsent.append(m)
+                else:
+                    sent.append(m)
+            if unsent:
+                LOG.info('simulating %i unsent messages' % len(unsent))
+        else:
+            sent = msgs
+            unsent = []
+        for msg in sent:
             LOG.debug('test mode: skipped msg: %r', msg)
-        return
+    else:
+        unsent = _send_msgs(urls, msgs, CONFIG)
 
-    msgs = _send_msgs(urls, msgs, CONFIG)
-
-    if db_enabled and not test_mode:
-        if msgs:
+    if db_enabled:
+        if unsent:
             # if we still have some messages, store them and leave for another call to pick them up
-            store_to_db(msgs)
+            store_to_db(unsent)  # queue only the failures; sent messages must not be re-queued
         else:
             # otherwise we are another call - look to db if there remains something to send
-            msgs = query_from_db()
-            msgs = _send_msgs(urls, msgs, CONFIG)
-            # return unsuccesful data to db
-            store_to_db(msgs)
-
-        if msgs:
-            LOG.error('could not send messages to any destinations, %s stored to db' % len(msgs))
+            handle_db_msgs(urls, CONFIG)
+    elif unsent:
+        LOG.error('could not send %i messages. db queue disabled' % len(unsent))