proton: handling of lock failure

This commit is contained in:
Tomas Kopecek 2020-09-22 11:08:42 +02:00
parent 99c21690c2
commit aff74c4b3a

View file

@ -9,6 +9,7 @@ import json
import logging
import random
import psycopg2
from proton import Message, SSLDomain
from proton.handlers import MessagingHandler
from proton.reactor import Container
@ -313,6 +314,9 @@ def _send_msgs(urls, msgs, CONFIG):
def store_to_db(msgs):
c = context.cnx.cursor()
# we're running in postCommit, so we need to handle new transaction
c.execute('BEGIN')
for msg in msgs:
if isinstance(msg, tuple):
address = msg[0]
@ -328,24 +332,29 @@ def store_to_db(msgs):
# if we've something from db, we should store it in correct order
insert.set(id=msg['db_id'])
insert.execute()
context.cnx.commit()
c.execute('COMMIT')
def query_from_db():
limit = CONFIG.getint('queue', 'batch_size', fallback=100)
c = context.cnx.cursor()
c.execute('BEGIN')
c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')
c.execute("DELETE FROM proton_queue WHERE created_ts < NOW() -'%s hours'::interval" %
CONFIG.getint('queue', 'age', fallback=24))
query = QueryProcessor(tables=('proton_queue',),
columns=('id', 'address', 'props', 'body'),
opts={'order': 'id', 'limit': limit})
msgs = list(query.execute())
if msgs:
c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s',
{'ids': [msg['id'] for msg in msgs]})
c.execute('COMMIT')
try:
c = context.cnx.cursor()
# we're running in postCommit, so we need to handle new transaction
c.execute('BEGIN')
c.execute('LOCK TABLE proton_queue IN ACCESS EXCLUSIVE MODE NOWAIT')
c.execute("DELETE FROM proton_queue WHERE created_ts < NOW() -'%s hours'::interval" %
CONFIG.getint('queue', 'age', fallback=24))
query = QueryProcessor(tables=('proton_queue',),
columns=('id', 'address', 'props', 'body'),
opts={'order': 'id', 'limit': limit})
msgs = list(query.execute())
if msgs:
c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s',
{'ids': [msg['id'] for msg in msgs]})
c.execute('COMMIT')
return msgs
except psycopg2.errors.LockNotAvailable:
return []
@ignore_error
@ -364,7 +373,7 @@ def send_queued_msgs(cbtype, *args, **kws):
test_mode = CONFIG.getboolean('broker', 'test_mode')
db_enabled = False
if CONFIG.has_option('queue', 'enabled'):
db_enabled = CONFIG.getboolean('queue', 'test_mode')
db_enabled = CONFIG.getboolean('queue', 'enabled')
if test_mode:
LOG.debug('test mode: skipping send to urls: %r', urls)
for msg in msgs: