proton: persistent message queue

Fixes: https://pagure.io/koji/issue/2230
This commit is contained in:
Tomas Kopecek 2020-09-08 15:01:07 +02:00
parent dfccd676b9
commit 8c253396e8
4 changed files with 121 additions and 20 deletions

View file

@ -11,3 +11,11 @@ send_timeout = 60
# if field is longer (json.dumps), ignore it
# default value is 0 - unlimited size
extra_limit = 0
[queue]
# enable persistent database queue
enabled = true
# how many messages are picked from db in one call
# note, that big number can slow requests if there
# is a huge message backlog (typically after broker outage)
batch_size = 100

View file

@ -16,6 +16,7 @@ from proton.reactor import Container
import koji
from koji.context import context
from koji.plugin import callback, convert_datetime, ignore_error
from kojihub import QueryProcessor, InsertProcessor
CONFIG_FILE = '/etc/koji-hub/plugins/protonmsg.conf'
CONFIG = None
@ -295,25 +296,7 @@ def prep_repo_done(cbtype, *args, **kws):
queue_msg(address, props, kws)
@ignore_error
@convert_datetime
@callback('postCommit')
def send_queued_msgs(cbtype, *args, **kws):
msgs = getattr(context, 'protonmsg_msgs', None)
if not msgs:
return
global CONFIG
if not CONFIG:
CONFIG = koji.read_config_files([(CONFIG_FILE, True)])
urls = CONFIG.get('broker', 'urls').split()
test_mode = False
if CONFIG.has_option('broker', 'test_mode'):
test_mode = CONFIG.getboolean('broker', 'test_mode')
if test_mode:
LOG.debug('test mode: skipping send to urls: %r', urls)
for msg in msgs:
LOG.debug('test mode: skipped msg: %r', msg)
return
def _send_msgs(urls, msgs, CONFIG):
random.shuffle(urls)
for url in urls:
container = Container(TimeoutHandler(url, msgs, CONFIG))
@ -326,3 +309,78 @@ def send_queued_msgs(cbtype, *args, **kws):
break
else:
LOG.error('could not send messages to any destinations')
return msgs
def store_to_db(msgs):
for msg in msgs:
if isinstance(msg, tuple):
address = msg[0]
props = json.dumps(msg[1])
body = msg[2]
else:
address = msg['address']
body = msg['body'] # already serialized
props = json.dumps(msg['props'])
insert = InsertProcessor(table='proton_queue')
insert.set(address=address, props=props, body=body)
if 'id' in msg:
# if we've something from db, we should store it in correct order
insert.set(id=msg['db_id'])
insert.execute()
context.cnx.commit()
def query_from_db():
limit = CONFIG.getint('queue', 'batch_size', fallback=100)
c = context.cnx.cursor()
c.execute('BEGIN')
c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')
query = QueryProcessor(tables=('proton_queue',),
columns=('id', 'address', 'props', 'body'),
opts={'order': 'id', 'limit': limit})
msgs = list(query.execute())
if msgs:
c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s',
{'ids': [msg['id'] for msg in msgs]})
c.execute('COMMIT')
@ignore_error
@convert_datetime
@callback('postCommit')
def send_queued_msgs(cbtype, *args, **kws):
global CONFIG
msgs = getattr(context, 'protonmsg_msgs', None)
if not msgs:
return
if not CONFIG:
CONFIG = koji.read_config_files([(CONFIG_FILE, True)])
urls = CONFIG.get('broker', 'urls').split()
test_mode = False
if CONFIG.has_option('broker', 'test_mode'):
test_mode = CONFIG.getboolean('broker', 'test_mode')
db_enabled = False
if CONFIG.has_option('queue', 'enabled'):
db_enabled = CONFIG.getboolean('queue', 'test_mode')
if test_mode:
LOG.debug('test mode: skipping send to urls: %r', urls)
for msg in msgs:
LOG.debug('test mode: skipped msg: %r', msg)
return
msgs = _send_msgs(urls, msgs, CONFIG)
if db_enabled and not test_mode:
if msgs:
# if we still have some messages, store them and leave for another call to pick them up
store_to_db(msgs)
else:
# otherwise we are another call - look to db if there remains something to send
msgs = query_from_db()
msgs = _send_msgs(urls, msgs, CONFIG)
# return unsuccesful data to db
store_to_db(msgs)
if msgs:
LOG.error('could not send messages to any destinations, %s stored to db' % len(msgs))