diff options
| author | Gary Kotton <gkotton@redhat.com> | 2012-11-03 03:29:46 +0000 |
|---|---|---|
| committer | Gary Kotton <gkotton@redhat.com> | 2012-11-07 02:47:28 +0000 |
| commit | d93b4223657e7ce3b9b5ec3d1190f3e90e21e9ed (patch) | |
| tree | 18dd8cb568b07a1901248df6f7738b2a50b0878d /openstack | |
| parent | ac4515559fb7bd1a480f7260b7143ebf84e2202a (diff) | |
| download | oslo-d93b4223657e7ce3b9b5ec3d1190f3e90e21e9ed.tar.gz oslo-d93b4223657e7ce3b9b5ec3d1190f3e90e21e9ed.tar.xz oslo-d93b4223657e7ce3b9b5ec3d1190f3e90e21e9ed.zip | |
Fix QPID reconnect issues
The problem results from the eventlet monkey patching and the way in
which qpid performs the session reconnection. The solution is to have
the qpid reconnection disbaled and the common rpc code performing the
reconnection.
The time waiting between reconnections will be a multiple
of 2 starting at 1 and a maximum of 60 seconds.
This works around bug 1073999.
Change-Id: I3c53211d23e4e68ed73046cb3c637662ab73b339
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/rpc/impl_qpid.py | 70 |
1 files changed, 26 insertions, 44 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 1b81cd9..d61cd84 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -50,24 +50,6 @@ qpid_opts = [ cfg.StrOpt('qpid_sasl_mechanisms', default='', help='Space separated list of SASL mechanisms to use for auth'), - cfg.BoolOpt('qpid_reconnect', - default=True, - help='Automatically reconnect'), - cfg.IntOpt('qpid_reconnect_timeout', - default=0, - help='Reconnection timeout in seconds'), - cfg.IntOpt('qpid_reconnect_limit', - default=0, - help='Max reconnections before giving up'), - cfg.IntOpt('qpid_reconnect_interval_min', - default=0, - help='Minimum seconds between reconnection attempts'), - cfg.IntOpt('qpid_reconnect_interval_max', - default=0, - help='Maximum seconds between reconnection attempts'), - cfg.IntOpt('qpid_reconnect_interval', - default=0, - help='Equivalent to setting max and min to the same value'), cfg.IntOpt('qpid_heartbeat', default=60, help='Seconds between connection keepalive heartbeats'), @@ -307,37 +289,27 @@ class Connection(object): params.setdefault(key, default_params[key]) self.broker = params['hostname'] + ":" + str(params['port']) + self.username = params['username'] + self.password = params['password'] + self.connection_create() + self.reconnect() + + def connection_create(self): # Create the connection - this does not open the connection self.connection = qpid.messaging.Connection(self.broker) # Check if flags are set and if so set them for the connection # before we call open - self.connection.username = params['username'] - self.connection.password = params['password'] + self.connection.username = self.username + self.connection.password = self.password + self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms - self.connection.reconnect = self.conf.qpid_reconnect - if self.conf.qpid_reconnect_timeout: - self.connection.reconnect_timeout = ( - self.conf.qpid_reconnect_timeout) - if self.conf.qpid_reconnect_limit: - self.connection.reconnect_limit = self.conf.qpid_reconnect_limit - if self.conf.qpid_reconnect_interval_max: - self.connection.reconnect_interval_max = ( - self.conf.qpid_reconnect_interval_max) - if self.conf.qpid_reconnect_interval_min: - self.connection.reconnect_interval_min = ( - self.conf.qpid_reconnect_interval_min) - if self.conf.qpid_reconnect_interval: - self.connection.reconnect_interval = ( - self.conf.qpid_reconnect_interval) + # Reconnection is done by self.reconnect() + self.connection.reconnect = False self.connection.heartbeat = self.conf.qpid_heartbeat self.connection.protocol = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay - # Open is part of reconnect - - # NOTE(WGH) not sure we need this with the reconnect flags - self.reconnect() - def _register_consumer(self, consumer): self.consumers[str(consumer.get_receiver())] = consumer @@ -352,12 +324,18 @@ class Connection(object): except qpid.messaging.exceptions.ConnectionError: pass + delay = 1 while True: try: + self.connection_create() self.connection.open() except qpid.messaging.exceptions.ConnectionError, e: - LOG.error(_('Unable to connect to AMQP server: %s'), e) - time.sleep(self.conf.qpid_reconnect_interval or 1) + msg_dict = dict(e=e, delay=delay) + msg = _("Unable to connect to AMQP server: %(e)s. " + "Sleeping %(delay)s seconds") % msg_dict + LOG.error(msg) + time.sleep(delay) + delay = min(2 * delay, 60) else: break @@ -365,10 +343,14 @@ class Connection(object): self.session = self.connection.session() - for consumer in self.consumers.itervalues(): - consumer.reconnect(self.session) - if self.consumers: + consumers = self.consumers + self.consumers = {} + + for consumer in consumers.itervalues(): + consumer.reconnect(self.session) + self._register_consumer(consumer) + LOG.debug(_("Re-established AMQP queues")) def ensure(self, error_callback, method, *args, **kwargs): |
