From d93b4223657e7ce3b9b5ec3d1190f3e90e21e9ed Mon Sep 17 00:00:00 2001 From: Gary Kotton Date: Sat, 3 Nov 2012 03:29:46 +0000 Subject: Fix QPID reconnect issues The problem results from the eventlet monkey patching and the way in which qpid performs the session reconnection. The solution is to have the qpid reconnection disbaled and the common rpc code performing the reconnection. The time waiting between reconnections will be a multiple of 2 starting at 1 and a maximum of 60 seconds. This works around bug 1073999. Change-Id: I3c53211d23e4e68ed73046cb3c637662ab73b339 --- openstack/common/rpc/impl_qpid.py | 70 +++++++++++++++------------------------ 1 file changed, 26 insertions(+), 44 deletions(-) (limited to 'openstack') diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 1b81cd9..d61cd84 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -50,24 +50,6 @@ qpid_opts = [ cfg.StrOpt('qpid_sasl_mechanisms', default='', help='Space separated list of SASL mechanisms to use for auth'), - cfg.BoolOpt('qpid_reconnect', - default=True, - help='Automatically reconnect'), - cfg.IntOpt('qpid_reconnect_timeout', - default=0, - help='Reconnection timeout in seconds'), - cfg.IntOpt('qpid_reconnect_limit', - default=0, - help='Max reconnections before giving up'), - cfg.IntOpt('qpid_reconnect_interval_min', - default=0, - help='Minimum seconds between reconnection attempts'), - cfg.IntOpt('qpid_reconnect_interval_max', - default=0, - help='Maximum seconds between reconnection attempts'), - cfg.IntOpt('qpid_reconnect_interval', - default=0, - help='Equivalent to setting max and min to the same value'), cfg.IntOpt('qpid_heartbeat', default=60, help='Seconds between connection keepalive heartbeats'), @@ -307,37 +289,27 @@ class Connection(object): params.setdefault(key, default_params[key]) self.broker = params['hostname'] + ":" + str(params['port']) + self.username = params['username'] + self.password = params['password'] + self.connection_create() + self.reconnect() + + def connection_create(self): # Create the connection - this does not open the connection self.connection = qpid.messaging.Connection(self.broker) # Check if flags are set and if so set them for the connection # before we call open - self.connection.username = params['username'] - self.connection.password = params['password'] + self.connection.username = self.username + self.connection.password = self.password + self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms - self.connection.reconnect = self.conf.qpid_reconnect - if self.conf.qpid_reconnect_timeout: - self.connection.reconnect_timeout = ( - self.conf.qpid_reconnect_timeout) - if self.conf.qpid_reconnect_limit: - self.connection.reconnect_limit = self.conf.qpid_reconnect_limit - if self.conf.qpid_reconnect_interval_max: - self.connection.reconnect_interval_max = ( - self.conf.qpid_reconnect_interval_max) - if self.conf.qpid_reconnect_interval_min: - self.connection.reconnect_interval_min = ( - self.conf.qpid_reconnect_interval_min) - if self.conf.qpid_reconnect_interval: - self.connection.reconnect_interval = ( - self.conf.qpid_reconnect_interval) + # Reconnection is done by self.reconnect() + self.connection.reconnect = False self.connection.heartbeat = self.conf.qpid_heartbeat self.connection.protocol = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay - # Open is part of reconnect - - # NOTE(WGH) not sure we need this with the reconnect flags - self.reconnect() - def _register_consumer(self, consumer): self.consumers[str(consumer.get_receiver())] = consumer @@ -352,12 +324,18 @@ class Connection(object): except qpid.messaging.exceptions.ConnectionError: pass + delay = 1 while True: try: + self.connection_create() self.connection.open() except qpid.messaging.exceptions.ConnectionError, e: - LOG.error(_('Unable to connect to AMQP server: %s'), e) - time.sleep(self.conf.qpid_reconnect_interval or 1) + msg_dict = dict(e=e, delay=delay) + msg = _("Unable to connect to AMQP server: %(e)s. " + "Sleeping %(delay)s seconds") % msg_dict + LOG.error(msg) + time.sleep(delay) + delay = min(2 * delay, 60) else: break @@ -365,10 +343,14 @@ class Connection(object): self.session = self.connection.session() - for consumer in self.consumers.itervalues(): - consumer.reconnect(self.session) - if self.consumers: + consumers = self.consumers + self.consumers = {} + + for consumer in consumers.itervalues(): + consumer.reconnect(self.session) + self._register_consumer(consumer) + LOG.debug(_("Re-established AMQP queues")) def ensure(self, error_callback, method, *args, **kwargs): -- cgit