Add timeout and heartbeat options to the messagebus plugin.

The plugin will block for timeout seconds at most when trying to send a message to the broker.  After that amount of time, the send will fail and an error will be logged.
The plugin will check that the broker is reachable every heartbeat seconds.  If it is not reachable, the connection will be aborted and a new connection will be attempted the next time a message is sent.
This commit is contained in:
Mike Bonnet 2013-06-12 10:11:03 -04:00
parent b5da4d0587
commit a71ae2711b
2 changed files with 61 additions and 6 deletions

View file

@ -4,6 +4,8 @@
host = amqp.example.com
port = 5671
ssl = true
timeout = 10
heartbeat = 60
# PLAIN options
auth = PLAIN
username = guest

View file

@ -8,6 +8,9 @@ from koji.plugin import callbacks, callback, ignore_error
import ConfigParser
import logging
import qpid.messaging
import qpid.messaging.transports
from ssl import wrap_socket
import socket
import os
import krbV
@ -18,6 +21,51 @@ config = None
session = None
target = None
def connect_timeout(host, port, timeout):
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
sock = socket.socket(af, socktype, proto)
sock.settimeout(timeout)
try:
sock.connect(sa)
break
except socket.error, msg:
sock.close()
else:
# If we got here then we couldn't connect (yet)
raise
return sock
class tlstimeout(qpid.messaging.transports.tls):
def __init__(self, conn, host, port):
self.socket = connect_timeout(host, port, getattr(conn, '_timeout'))
if conn.tcp_nodelay:
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.tls = wrap_socket(self.socket, keyfile=conn.ssl_keyfile, certfile=conn.ssl_certfile, ca_certs=conn.ssl_trustfile)
self.socket.setblocking(0)
self.state = None
qpid.messaging.transports.TRANSPORTS['tls+timeout'] = tlstimeout
class Connection(qpid.messaging.Connection):
"""
A connection class which supports a timeout option
to the establish() method. Only necessary until
upstream Apache Qpid commit 1487578 is available in
a supported release.
"""
@staticmethod
def establish(url=None, timeout=None, **options):
conn = Connection(url, **options)
conn._timeout = timeout
conn.open()
return conn
def _wait(self, predicate, timeout=None):
if timeout is None and hasattr(self, '_timeout'):
timeout = self._timeout
return qpid.messaging.Connection._wait(self, predicate, timeout)
def get_sender():
global config, session, target
if session and target:
@ -30,6 +78,10 @@ def get_sender():
config = ConfigParser.SafeConfigParser()
config.read(CONFIG_FILE)
if not config.has_option('broker', 'timeout'):
config.set('broker', 'timeout', '60')
if not config.has_option('broker', 'heartbeat'):
config.set('broker', 'heartbeat', '60')
if config.getboolean('broker', 'ssl'):
url = 'amqps://'
@ -54,9 +106,11 @@ def get_sender():
url += config.get('broker', 'host') + ':'
url += config.get('broker', 'port')
conn = qpid.messaging.Connection.establish(url,
sasl_mechanisms=config.get('broker', 'auth'),
heartbeat=60)
conn = Connection.establish(url,
sasl_mechanisms=config.get('broker', 'auth'),
transport='tls+timeout',
timeout=config.getfloat('broker', 'timeout'),
heartbeat=config.getint('broker', 'heartbeat'))
sess = conn.session()
tgt = """%s;
{ create: sender,
@ -166,6 +220,5 @@ def send_message(cbtype, *args, **kws):
else:
raise koji.PluginError, 'unsupported exchange type: %s' % exchange_type
sender.send(message, sync=False)
sender.sync(timeout=60)
sender.close()
sender.send(message, sync=True, timeout=config.getfloat('broker', 'timeout'))
sender.close(timeout=config.getfloat('broker', 'timeout'))