protonmsg: use consistent type for messages

Previously messages were represented as either tuples or
dictionaries. Now they are always dictionaries.

Fixes: https://pagure.io/koji/issue/2841
This commit is contained in:
Mike McLean 2021-05-01 15:00:20 -04:00 committed by Tomas Kopecek
parent e158b1c81d
commit 1c633dd37b

View file

@ -77,7 +77,7 @@ class TimeoutHandler(MessagingHandler):
def send_msgs(self, event):
prefix = self.conf.get('broker', 'topic_prefix')
for msg in self.msgs:
address = 'topic://' + prefix + '.' + msg[0]
address = 'topic://' + prefix + '.' + msg['address']
if address in self.senders:
sender = self.senders[address]
self.log.debug('retrieved cached sender for %s', address)
@ -85,15 +85,15 @@ class TimeoutHandler(MessagingHandler):
sender = event.container.create_sender(event.connection, target=address)
self.log.debug('created new sender for %s', address)
self.senders[address] = sender
pmsg = Message(properties=msg[1], body=msg[2])
pmsg = Message(properties=msg['props'], body=msg['body'])
delivery = sender.send(pmsg)
self.log.debug('sent message: %s', msg[1])
self.log.debug('sent message: %s', msg['props'])
self.pending[delivery] = msg
def update_pending(self, event):
msg = self.pending[event.delivery]
del self.pending[event.delivery]
self.log.debug('removed message from self.pending: %s', msg[1])
self.log.debug('removed message from self.pending: %s', msg['props'])
if not self.pending:
if self.msgs:
self.log.error('%s messages unsent (rejected or released)', len(self.msgs))
@ -112,17 +112,17 @@ class TimeoutHandler(MessagingHandler):
def on_settled(self, event):
msg = self.pending[event.delivery]
self.msgs.remove(msg)
self.log.debug('removed message from self.msgs: %s', msg[1])
self.log.debug('removed message from self.msgs: %s', msg['props'])
self.update_pending(event)
def on_rejected(self, event):
msg = self.pending[event.delivery]
self.log.error('message was rejected: %s', msg[1])
self.log.error('message was rejected: %s', msg['props'])
self.update_pending(event)
def on_released(self, event):
msg = self.pending[event.delivery]
self.log.error('message was released: %s', msg[1])
self.log.error('message was released: %s', msg['props'])
self.update_pending(event)
def on_transport_tail_closed(self, event):
@ -169,7 +169,7 @@ def queue_msg(address, props, data):
msgs = []
context.protonmsg_msgs = msgs
body = json.dumps(data, default=json_serialize)
msgs.append((address, props, body))
msgs.append({'address': address, 'props': props, 'body': body})
@convert_datetime
@ -322,14 +322,9 @@ def store_to_db(msgs):
# we're running in postCommit, so we need to handle new transaction
c.execute('BEGIN')
for msg in msgs:
if isinstance(msg, tuple):
address = msg[0]
props = json.dumps(msg[1])
body = msg[2]
else:
address = msg['address']
body = msg['body'] # already serialized
props = json.dumps(msg['props'])
address = msg['address']
body = msg['body']
props = json.dumps(msg['props'])
insert = InsertProcessor(table='proton_queue')
insert.set(address=address, props=props, body=body)
if 'id' in msg: