summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc
diff options
context:
space:
mode:
authorEugene Kirpichov <ekirpichov@gmail.com>2012-11-26 11:51:49 +0000
committerEugene Kirpichov <ekirpichov@gmail.com>2012-11-29 14:30:08 +0000
commit86952852c1dadd54f1294ae0adba01cfe827b8ce (patch)
treec88965d23a65af956367d85c42e08a0154fed595 /openstack/common/rpc
parentcf705c59615fb68cdbfe664e7827e4a275a4a282 (diff)
downloadoslo-86952852c1dadd54f1294ae0adba01cfe827b8ce.tar.gz
oslo-86952852c1dadd54f1294ae0adba01cfe827b8ce.tar.xz
oslo-86952852c1dadd54f1294ae0adba01cfe827b8ce.zip
Qpid H/A cluster support
Implements blueprint qpid-roundrobin-reconnect Change-Id: I01b65e6c33a238663667e9b6023b90172f1b77dd
Diffstat (limited to 'openstack/common/rpc')
-rw-r--r--openstack/common/rpc/impl_qpid.py23
1 files changed, 14 insertions, 9 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index fcf2508..fc8a382 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -41,6 +41,9 @@ qpid_opts = [
cfg.StrOpt('qpid_port',
default='5672',
help='Qpid broker port'),
+ cfg.ListOpt('qpid_hosts',
+ default=['$qpid_hostname:$qpid_port'],
+ help='Qpid HA cluster host:port pairs'),
cfg.StrOpt('qpid_username',
default='',
help='Username for qpid connection'),
@@ -277,22 +280,21 @@ class Connection(object):
self.conf = conf
params = {
- 'hostname': self.conf.qpid_hostname,
- 'port': self.conf.qpid_port,
+ 'qpid_hosts': self.conf.qpid_hosts,
'username': self.conf.qpid_username,
'password': self.conf.qpid_password,
}
params.update(server_params or {})
- self.broker = params['hostname'] + ":" + str(params['port'])
+ self.brokers = params['qpid_hosts']
self.username = params['username']
self.password = params['password']
- self.connection_create()
+ self.connection_create(self.brokers[0])
self.reconnect()
- def connection_create(self):
+ def connection_create(self, broker):
# Create the connection - this does not open the connection
- self.connection = qpid.messaging.Connection(self.broker)
+ self.connection = qpid.messaging.Connection(broker)
# Check if flags are set and if so set them for the connection
# before we call open
@@ -320,10 +322,14 @@ class Connection(object):
except qpid.messaging.exceptions.ConnectionError:
pass
+ attempt = 0
delay = 1
while True:
+ broker = self.brokers[attempt % len(self.brokers)]
+ attempt += 1
+
try:
- self.connection_create()
+ self.connection_create(broker)
self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay)
@@ -333,10 +339,9 @@ class Connection(object):
time.sleep(delay)
delay = min(2 * delay, 60)
else:
+ LOG.info(_('Connected to AMQP server on %s'), broker)
break
- LOG.info(_('Connected to AMQP server on %s'), self.broker)
-
self.session = self.connection.session()
if self.consumers: