summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-11-13 06:49:41 +0000
committerGerrit Code Review <review@openstack.org>2012-11-13 06:49:41 +0000
commitff8f185ff27c45b3463357ef7c2773bd547f1323 (patch)
tree6e5c6e8a0c210aec447919361b5e8b7e333eb3c0 /openstack
parent5e3c7b50398bd79cf4103f062584c12efc685451 (diff)
parentd93b4223657e7ce3b9b5ec3d1190f3e90e21e9ed (diff)
downloadoslo-ff8f185ff27c45b3463357ef7c2773bd547f1323.tar.gz
oslo-ff8f185ff27c45b3463357ef7c2773bd547f1323.tar.xz
oslo-ff8f185ff27c45b3463357ef7c2773bd547f1323.zip
Merge "Fix QPID reconnect issues"
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/impl_qpid.py70
1 files changed, 26 insertions, 44 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index c29da5d..887a54e 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -50,24 +50,6 @@ qpid_opts = [
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
help='Space separated list of SASL mechanisms to use for auth'),
- cfg.BoolOpt('qpid_reconnect',
- default=True,
- help='Automatically reconnect'),
- cfg.IntOpt('qpid_reconnect_timeout',
- default=0,
- help='Reconnection timeout in seconds'),
- cfg.IntOpt('qpid_reconnect_limit',
- default=0,
- help='Max reconnections before giving up'),
- cfg.IntOpt('qpid_reconnect_interval_min',
- default=0,
- help='Minimum seconds between reconnection attempts'),
- cfg.IntOpt('qpid_reconnect_interval_max',
- default=0,
- help='Maximum seconds between reconnection attempts'),
- cfg.IntOpt('qpid_reconnect_interval',
- default=0,
- help='Equivalent to setting max and min to the same value'),
cfg.IntOpt('qpid_heartbeat',
default=60,
help='Seconds between connection keepalive heartbeats'),
@@ -303,37 +285,27 @@ class Connection(object):
params.update(server_params or {})
self.broker = params['hostname'] + ":" + str(params['port'])
+ self.username = params['username']
+ self.password = params['password']
+ self.connection_create()
+ self.reconnect()
+
+ def connection_create(self):
# Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(self.broker)
# Check if flags are set and if so set them for the connection
# before we call open
- self.connection.username = params['username']
- self.connection.password = params['password']
+ self.connection.username = self.username
+ self.connection.password = self.password
+
self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
- self.connection.reconnect = self.conf.qpid_reconnect
- if self.conf.qpid_reconnect_timeout:
- self.connection.reconnect_timeout = (
- self.conf.qpid_reconnect_timeout)
- if self.conf.qpid_reconnect_limit:
- self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
- if self.conf.qpid_reconnect_interval_max:
- self.connection.reconnect_interval_max = (
- self.conf.qpid_reconnect_interval_max)
- if self.conf.qpid_reconnect_interval_min:
- self.connection.reconnect_interval_min = (
- self.conf.qpid_reconnect_interval_min)
- if self.conf.qpid_reconnect_interval:
- self.connection.reconnect_interval = (
- self.conf.qpid_reconnect_interval)
+ # Reconnection is done by self.reconnect()
+ self.connection.reconnect = False
self.connection.heartbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
- # Open is part of reconnect -
- # NOTE(WGH) not sure we need this with the reconnect flags
- self.reconnect()
-
def _register_consumer(self, consumer):
self.consumers[str(consumer.get_receiver())] = consumer
@@ -348,12 +320,18 @@ class Connection(object):
except qpid.messaging.exceptions.ConnectionError:
pass
+ delay = 1
while True:
try:
+ self.connection_create()
self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e:
- LOG.error(_('Unable to connect to AMQP server: %s'), e)
- time.sleep(self.conf.qpid_reconnect_interval or 1)
+ msg_dict = dict(e=e, delay=delay)
+ msg = _("Unable to connect to AMQP server: %(e)s. "
+ "Sleeping %(delay)s seconds") % msg_dict
+ LOG.error(msg)
+ time.sleep(delay)
+ delay = min(2 * delay, 60)
else:
break
@@ -361,10 +339,14 @@ class Connection(object):
self.session = self.connection.session()
- for consumer in self.consumers.itervalues():
- consumer.reconnect(self.session)
-
if self.consumers:
+ consumers = self.consumers
+ self.consumers = {}
+
+ for consumer in consumers.itervalues():
+ consumer.reconnect(self.session)
+ self._register_consumer(consumer)
+
LOG.debug(_("Re-established AMQP queues"))
def ensure(self, error_callback, method, *args, **kwargs):