PR#2441: proton: persistent message queue
Merges #2441 https://pagure.io/koji/pull-request/2441 Fixes: #2230 https://pagure.io/koji/issue/2230 protonmsg persistent message queue
This commit is contained in:
commit
729f84767c
6 changed files with 200 additions and 20 deletions
|
|
@ -6,4 +6,15 @@ BEGIN;
|
|||
|
||||
CREATE INDEX task_by_no_parent_state_method ON task(parent, state, method) WHERE parent IS NULL;
|
||||
|
||||
|
||||
-- Message queue for the protonmsg plugin
CREATE TABLE proton_queue (
        id SERIAL PRIMARY KEY,
        -- DEFAULT NOW() is required: the plugin's insert path does not set
        -- this column, and the max_age cleanup compares against it -- a NULL
        -- timestamp would never match and rows would accumulate forever
        created_ts TIMESTAMPTZ DEFAULT NOW(),
        address TEXT NOT NULL,
        props JSON NOT NULL,
        body JSON NOT NULL
) WITHOUT OIDS;
|
||||
|
||||
|
||||
COMMIT;
|
||||
|
|
|
|||
|
|
@ -937,4 +937,15 @@ CREATE TABLE win_archives (
|
|||
flags TEXT
|
||||
) WITHOUT OIDS;
|
||||
|
||||
|
||||
-- Message queue for the protonmsg plugin
CREATE TABLE proton_queue (
        id SERIAL PRIMARY KEY,
        -- DEFAULT NOW() is required: the plugin's insert path does not set
        -- this column, and the max_age cleanup compares against it -- a NULL
        -- timestamp would never match and rows would accumulate forever
        created_ts TIMESTAMPTZ DEFAULT NOW(),
        address TEXT NOT NULL,
        props JSON NOT NULL,
        body JSON NOT NULL
) WITHOUT OIDS;
|
||||
|
||||
|
||||
COMMIT WORK;
|
||||
|
|
|
|||
|
|
@ -158,3 +158,58 @@ And in scripts, you can use following calls:
|
|||
ks = koji.ClientSession('https://koji.fedoraproject.org/kojihub')
|
||||
ks.gssapi_login()
|
||||
ks.createSideTag('f30-build')
|
||||
|
||||
Proton messaging
|
||||
================
|
||||
|
||||
The ``protonmsg`` plugin for the hub will, if enabled, send a wide range of
|
||||
messages about Koji activity to the configured amqps message brokers.
|
||||
Most callback events on the hub are translated into messages.
|
||||
|
||||
In order to enable this plugin, you must:
|
||||
|
||||
* add ``protonmsg`` to the ``Plugins`` setting in ``/etc/koji-hub/hub.conf``
|
||||
|
||||
* provide a configuration file for the plugin at
|
||||
``/etc/koji-hub/plugins/protonmsg.conf``
|
||||
|
||||
The configuration file is ini-style format with three sections: broker,
|
||||
queue and message.
|
||||
The ``[broker]`` section defines how the plugin connects to the message bus.
|
||||
The following fields are understood:
|
||||
|
||||
* ``urls`` -- a space separated list of amqps urls. Additional urls are
|
||||
treated as fallbacks. The plugin will send to the first one that accepts
|
||||
the message
|
||||
* ``cert`` -- the client cert file for authentication
|
||||
* ``cacert`` -- the ca cert to validate the server
|
||||
* ``topic_prefix`` -- this string will be used as a prefix for all message topics
|
||||
* ``connect_timeout`` -- the number of seconds to wait for a connection before
|
||||
timing out
|
||||
* ``send_timeout`` -- the number of seconds to wait while sending a message
|
||||
before timing out
|
||||
|
||||
The ``[message]`` section sets parameters for how messages are formed.
|
||||
Currently only one field is understood:
|
||||
|
||||
* ``extra_limit`` -- the maximum allowed size for ``build.extra`` fields that
|
||||
appear in messages. If the ``build.extra`` field is longer (in terms of
|
||||
json-encoded length), then it will be omitted. The default value is ``0``
|
||||
which means no limit.
|
||||
|
||||
The ``[queue]`` section controls how (or if) the plugin will use the database
|
||||
to queue messages when they cannot be immediately sent.
|
||||
The following fields are understood:
|
||||
|
||||
* ``enabled`` -- if true, then the feature is enabled
|
||||
* ``batch_size`` -- the maximum number of queued messages to send at one time
|
||||
* ``max_age`` -- the age (in hours) at which old messages in the queue are discarded
|
||||
|
||||
It is important to note that the database queue is only a fallback mechanism.
|
||||
The plugin will always attempt to send messages as they are issued.
|
||||
Messages are only placed in the database queue when they cannot be immediately
|
||||
sent on the bus (e.g. if the amqps server is offline).
|
||||
|
||||
Admins should consider the balance between the ``batch_size`` and
|
||||
``extra_limit`` options, as both can affect the total amount of data that the
|
||||
plugin could attempt to send during a single call.
|
||||
|
|
|
|||
|
|
@ -11,3 +11,13 @@ send_timeout = 60
|
|||
# if field is longer (json.dumps), ignore it
|
||||
# default value is 0 - unlimited size
|
||||
extra_limit = 0
|
||||
|
||||
[queue]
|
||||
# enable persistent database queue
|
||||
enabled = true
|
||||
# how many messages are picked from db in one call
|
||||
# note that a large number can slow requests if there
|
||||
# is a huge message backlog (typically after broker outage)
|
||||
batch_size = 100
|
||||
# maximum age of stored messages (hours)
|
||||
max_age = 24
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import json
|
|||
import logging
|
||||
import random
|
||||
|
||||
import psycopg2
|
||||
from proton import Message, SSLDomain
|
||||
from proton.handlers import MessagingHandler
|
||||
from proton.reactor import Container
|
||||
|
|
@ -16,6 +17,7 @@ from proton.reactor import Container
|
|||
import koji
|
||||
from koji.context import context
|
||||
from koji.plugin import callback, convert_datetime, ignore_error
|
||||
from kojihub import QueryProcessor, InsertProcessor
|
||||
|
||||
CONFIG_FILE = '/etc/koji-hub/plugins/protonmsg.conf'
|
||||
CONFIG = None
|
||||
|
|
@ -295,25 +297,7 @@ def prep_repo_done(cbtype, *args, **kws):
|
|||
queue_msg(address, props, kws)
|
||||
|
||||
|
||||
@ignore_error
|
||||
@convert_datetime
|
||||
@callback('postCommit')
|
||||
def send_queued_msgs(cbtype, *args, **kws):
|
||||
msgs = getattr(context, 'protonmsg_msgs', None)
|
||||
if not msgs:
|
||||
return
|
||||
global CONFIG
|
||||
if not CONFIG:
|
||||
CONFIG = koji.read_config_files([(CONFIG_FILE, True)])
|
||||
urls = CONFIG.get('broker', 'urls').split()
|
||||
test_mode = False
|
||||
if CONFIG.has_option('broker', 'test_mode'):
|
||||
test_mode = CONFIG.getboolean('broker', 'test_mode')
|
||||
if test_mode:
|
||||
LOG.debug('test mode: skipping send to urls: %r', urls)
|
||||
for msg in msgs:
|
||||
LOG.debug('test mode: skipped msg: %r', msg)
|
||||
return
|
||||
def _send_msgs(urls, msgs, CONFIG):
|
||||
random.shuffle(urls)
|
||||
for url in urls:
|
||||
container = Container(TimeoutHandler(url, msgs, CONFIG))
|
||||
|
|
@ -326,3 +310,112 @@ def send_queued_msgs(cbtype, *args, **kws):
|
|||
break
|
||||
else:
|
||||
LOG.error('could not send messages to any destinations')
|
||||
return msgs
|
||||
|
||||
|
||||
def store_to_db(msgs):
    """Store unsent messages in the proton_queue db table.

    :param list msgs: messages to store; each entry is either a freshly
        queued ``(address, props, body)`` tuple, or a dict previously read
        back from the db queue with ``id``/``address``/``props``/``body``
        keys (``body`` already json-serialized)
    """
    c = context.cnx.cursor()
    # we're running in postCommit, so we need to handle new transaction
    c.execute('BEGIN')
    for msg in msgs:
        if isinstance(msg, tuple):
            address = msg[0]
            props = json.dumps(msg[1])
            body = msg[2]
        else:
            address = msg['address']
            body = msg['body']  # already serialized
            props = json.dumps(msg['props'])
        insert = InsertProcessor(table='proton_queue')
        insert.set(address=address, props=props, body=body)
        if not isinstance(msg, tuple) and 'id' in msg:
            # re-queued db row: keep its original id so ordering is preserved
            # (the query in handle_db_msgs returns the column as 'id', so we
            # must read msg['id'] here -- msg['db_id'] would raise KeyError)
            insert.set(id=msg['id'])
        insert.execute()
    c.execute('COMMIT')
|
||||
|
||||
|
||||
def handle_db_msgs(urls, CONFIG):
    """Send a batch of messages from the db queue, if we can get the lock.

    Runs in postCommit, so it manages its own transaction.  If another
    request already holds the queue lock, this call is a no-op.

    :param list urls: amqps broker urls, tried in order by _send_msgs
    :param CONFIG: parsed plugin configuration (ConfigParser)
    """
    limit = CONFIG.getint('queue', 'batch_size', fallback=100)
    c = context.cnx.cursor()
    # we're running in postCommit, so we need to handle new transaction
    c.execute('BEGIN')
    try:
        # NOWAIT: if another request is draining the queue, skip this round
        c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')
    except psycopg2.OperationalError:
        LOG.debug('skipping db queue due to lock')
        # the failed LOCK aborted the transaction; clean it up so the
        # connection is usable afterwards
        c.execute('ROLLBACK')
        return
    try:
        # the config option is 'max_age' (hours), matching the docs and the
        # sample config -- the value is an int, so %-interpolation is safe
        c.execute("DELETE FROM proton_queue WHERE created_ts < NOW() -'%s hours'::interval" %
                  CONFIG.getint('queue', 'max_age', fallback=24))
        query = QueryProcessor(tables=('proton_queue',),
                               columns=('id', 'address', 'props', 'body'),
                               opts={'order': 'id', 'limit': limit})
        msgs = list(query.execute())
        if CONFIG.getboolean('broker', 'test_mode', fallback=False):
            if msgs:
                LOG.debug('test mode: skipping send for %i messages from db', len(msgs))
            unsent = []
        else:
            unsent = {m['id'] for m in _send_msgs(urls, msgs, CONFIG)}
        sent = [m for m in msgs if m['id'] not in unsent]
        if sent:
            # must be a tuple -- psycopg2 renders a tuple as an IN (...) list,
            # while a Python list becomes an ARRAY literal, which is invalid
            # with IN; guarding on 'sent' also avoids the invalid 'IN ()'
            c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s',
                      {'ids': tuple(msg['id'] for msg in sent)})
    finally:
        # make sure we free the lock
        try:
            c.execute('COMMIT')
        except Exception:
            c.execute('ROLLBACK')
|
||||
|
||||
|
||||
@ignore_error
@convert_datetime
@callback('postCommit')
def send_queued_msgs(cbtype, *args, **kws):
    """Send the messages queued up during this request.

    Tries to deliver every queued message directly to one of the configured
    brokers.  If the db queue is enabled, messages that could not be sent
    are stored for a later request to retry; when everything was delivered,
    we instead try to drain any backlog already sitting in the db.
    """
    global CONFIG
    msgs = getattr(context, 'protonmsg_msgs', None)
    if not msgs:
        return
    if not CONFIG:
        CONFIG = koji.read_config_files([(CONFIG_FILE, True)])
    urls = CONFIG.get('broker', 'urls').split()
    test_mode = False
    if CONFIG.has_option('broker', 'test_mode'):
        test_mode = CONFIG.getboolean('broker', 'test_mode')
    db_enabled = False
    if CONFIG.has_option('queue', 'enabled'):
        db_enabled = CONFIG.getboolean('queue', 'enabled')

    if test_mode:
        LOG.debug('test mode: skipping send to urls: %r', urls)
        fail_chance = CONFIG.getint('broker', 'test_mode_fail', fallback=0)
        if fail_chance:
            # simulate unsent messages in test mode
            sent = []
            unsent = []
            for m in msgs:
                if random.randint(1, 100) <= fail_chance:
                    unsent.append(m)
                else:
                    sent.append(m)
            if unsent:
                LOG.info('simulating %i unsent messages', len(unsent))
        else:
            sent = msgs
            unsent = []
        for msg in sent:
            LOG.debug('test mode: skipped msg: %r', msg)
    else:
        unsent = _send_msgs(urls, msgs, CONFIG)

    if db_enabled:
        if unsent:
            # store only the failed messages and leave them for a later call
            # to pick up -- storing all of msgs would re-queue messages the
            # test-mode simulation already counted as sent
            store_to_db(unsent)
        else:
            # nothing failed this time - check the db for messages left over
            # from earlier failures
            handle_db_msgs(urls, CONFIG)
    elif unsent:
        # report the actual number of failures, not the total queued
        LOG.error('could not send %i messages. db queue disabled', len(unsent))
|
||||
|
|
|
|||
|
|
@ -215,7 +215,7 @@ extra_limit = 2048
|
|||
self.assertEqual(log.debug.call_count, 2)
|
||||
for args in log.debug.call_args_list:
|
||||
self.assertTrue(args[0][0].startswith('could not send'))
|
||||
self.assertEqual(log.error.call_count, 1)
|
||||
self.assertEqual(log.error.call_count, 2)
|
||||
self.assertTrue(log.error.call_args[0][0].startswith('could not send'))
|
||||
|
||||
@patch('protonmsg.Container')
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue