diff --git a/plugins/messagebus.conf b/plugins/messagebus.conf index f1c89a92..fe18a1ce 100644 --- a/plugins/messagebus.conf +++ b/plugins/messagebus.conf @@ -4,6 +4,8 @@ host = amqp.example.com port = 5671 ssl = true +timeout = 10 +heartbeat = 60 # PLAIN options auth = PLAIN username = guest diff --git a/plugins/messagebus.py b/plugins/messagebus.py index aeda7abe..5917056f 100644 --- a/plugins/messagebus.py +++ b/plugins/messagebus.py @@ -8,6 +8,9 @@ from koji.plugin import callbacks, callback, ignore_error import ConfigParser import logging import qpid.messaging +import qpid.messaging.transports +from ssl import wrap_socket +import socket import os import krbV @@ -18,6 +21,51 @@ config = None session = None target = None +def connect_timeout(host, port, timeout): + for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): + af, socktype, proto, canonname, sa = res + sock = socket.socket(af, socktype, proto) + sock.settimeout(timeout) + try: + sock.connect(sa) + break + except socket.error, msg: + sock.close() + else: + # If we got here then we couldn't connect (yet) + raise + return sock + +class tlstimeout(qpid.messaging.transports.tls): + def __init__(self, conn, host, port): + self.socket = connect_timeout(host, port, getattr(conn, '_timeout')) + if conn.tcp_nodelay: + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.tls = wrap_socket(self.socket, keyfile=conn.ssl_keyfile, certfile=conn.ssl_certfile, ca_certs=conn.ssl_trustfile) + self.socket.setblocking(0) + self.state = None + +qpid.messaging.transports.TRANSPORTS['tls+timeout'] = tlstimeout + +class Connection(qpid.messaging.Connection): + """ + A connection class which supports a timeout option + to the establish() method. Only necessary until + upstream Apache Qpid commit 1487578 is available in + a supported release. + """ + @staticmethod + def establish(url=None, timeout=None, **options): + conn = Connection(url, **options) + conn._timeout = timeout + conn.open() + return conn + + def _wait(self, predicate, timeout=None): + if timeout is None and hasattr(self, '_timeout'): + timeout = self._timeout + return qpid.messaging.Connection._wait(self, predicate, timeout) + def get_sender(): global config, session, target if session and target: @@ -30,6 +78,10 @@ def get_sender(): config = ConfigParser.SafeConfigParser() config.read(CONFIG_FILE) + if not config.has_option('broker', 'timeout'): + config.set('broker', 'timeout', '60') + if not config.has_option('broker', 'heartbeat'): + config.set('broker', 'heartbeat', '60') if config.getboolean('broker', 'ssl'): url = 'amqps://' @@ -54,9 +106,11 @@ def get_sender(): url += config.get('broker', 'host') + ':' url += config.get('broker', 'port') - conn = qpid.messaging.Connection.establish(url, - sasl_mechanisms=config.get('broker', 'auth'), - heartbeat=60) + conn = Connection.establish(url, + sasl_mechanisms=config.get('broker', 'auth'), + transport='tls+timeout', + timeout=config.getfloat('broker', 'timeout'), + heartbeat=config.getint('broker', 'heartbeat')) sess = conn.session() tgt = """%s; { create: sender, @@ -166,6 +220,5 @@ def send_message(cbtype, *args, **kws): else: raise koji.PluginError, 'unsupported exchange type: %s' % exchange_type - sender.send(message, sync=False) - sender.sync(timeout=60) - sender.close() + sender.send(message, sync=True, timeout=config.getfloat('broker', 'timeout')) + sender.close(timeout=config.getfloat('broker', 'timeout'))