From 86952852c1dadd54f1294ae0adba01cfe827b8ce Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Mon, 26 Nov 2012 11:51:49 +0000 Subject: Qpid H/A cluster support Implements blueprint qpid-roundrobin-reconnect Change-Id: I01b65e6c33a238663667e9b6023b90172f1b77dd --- openstack/common/rpc/impl_qpid.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) (limited to 'openstack') diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index fcf2508..fc8a382 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -41,6 +41,9 @@ qpid_opts = [ cfg.StrOpt('qpid_port', default='5672', help='Qpid broker port'), + cfg.ListOpt('qpid_hosts', + default=['$qpid_hostname:$qpid_port'], + help='Qpid HA cluster host:port pairs'), cfg.StrOpt('qpid_username', default='', help='Username for qpid connection'), @@ -277,22 +280,21 @@ class Connection(object): self.conf = conf params = { - 'hostname': self.conf.qpid_hostname, - 'port': self.conf.qpid_port, + 'qpid_hosts': self.conf.qpid_hosts, 'username': self.conf.qpid_username, 'password': self.conf.qpid_password, } params.update(server_params or {}) - self.broker = params['hostname'] + ":" + str(params['port']) + self.brokers = params['qpid_hosts'] self.username = params['username'] self.password = params['password'] - self.connection_create() + self.connection_create(self.brokers[0]) self.reconnect() - def connection_create(self): + def connection_create(self, broker): # Create the connection - this does not open the connection - self.connection = qpid.messaging.Connection(self.broker) + self.connection = qpid.messaging.Connection(broker) # Check if flags are set and if so set them for the connection # before we call open @@ -320,10 +322,14 @@ class Connection(object): except qpid.messaging.exceptions.ConnectionError: pass + attempt = 0 delay = 1 while True: + broker = self.brokers[attempt % len(self.brokers)] + attempt += 1 + try: - self.connection_create() + self.connection_create(broker) self.connection.open() except qpid.messaging.exceptions.ConnectionError, e: msg_dict = dict(e=e, delay=delay) @@ -333,10 +339,9 @@ class Connection(object): time.sleep(delay) delay = min(2 * delay, 60) else: + LOG.info(_('Connected to AMQP server on %s'), broker) break - LOG.info(_('Connected to AMQP server on %s'), self.broker) - self.session = self.connection.session() if self.consumers: -- cgit