summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-11-29 16:50:20 +0000
committerGerrit Code Review <review@openstack.org>2012-11-29 16:50:20 +0000
commit57fd94b00727d38afbc55ebac568238803d4a693 (patch)
tree79e188cf9870ab0118fe3b96f53c5a3ba273702b /openstack
parent93173520d0e7eb8cfba56108e9db35a93cc37ae8 (diff)
parent86952852c1dadd54f1294ae0adba01cfe827b8ce (diff)
downloadoslo-57fd94b00727d38afbc55ebac568238803d4a693.tar.gz
oslo-57fd94b00727d38afbc55ebac568238803d4a693.tar.xz
oslo-57fd94b00727d38afbc55ebac568238803d4a693.zip
Merge "Qpid H/A cluster support"
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/impl_qpid.py23
1 files changed, 14 insertions, 9 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index fcf2508..fc8a382 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -41,6 +41,9 @@ qpid_opts = [
cfg.StrOpt('qpid_port',
default='5672',
help='Qpid broker port'),
+ cfg.ListOpt('qpid_hosts',
+ default=['$qpid_hostname:$qpid_port'],
+ help='Qpid HA cluster host:port pairs'),
cfg.StrOpt('qpid_username',
default='',
help='Username for qpid connection'),
@@ -277,22 +280,21 @@ class Connection(object):
self.conf = conf
params = {
- 'hostname': self.conf.qpid_hostname,
- 'port': self.conf.qpid_port,
+ 'qpid_hosts': self.conf.qpid_hosts,
'username': self.conf.qpid_username,
'password': self.conf.qpid_password,
}
params.update(server_params or {})
- self.broker = params['hostname'] + ":" + str(params['port'])
+ self.brokers = params['qpid_hosts']
self.username = params['username']
self.password = params['password']
- self.connection_create()
+ self.connection_create(self.brokers[0])
self.reconnect()
- def connection_create(self):
+ def connection_create(self, broker):
# Create the connection - this does not open the connection
- self.connection = qpid.messaging.Connection(self.broker)
+ self.connection = qpid.messaging.Connection(broker)
# Check if flags are set and if so set them for the connection
# before we call open
@@ -320,10 +322,14 @@ class Connection(object):
except qpid.messaging.exceptions.ConnectionError:
pass
+ attempt = 0
delay = 1
while True:
+ broker = self.brokers[attempt % len(self.brokers)]
+ attempt += 1
+
try:
- self.connection_create()
+ self.connection_create(broker)
self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay)
@@ -333,10 +339,9 @@ class Connection(object):
time.sleep(delay)
delay = min(2 * delay, 60)
else:
+ LOG.info(_('Connected to AMQP server on %s'), broker)
break
- LOG.info(_('Connected to AMQP server on %s'), self.broker)
-
self.session = self.connection.session()
if self.consumers: