diff options
author | Eugene Kirpichov <ekirpichov@gmail.com> | 2012-11-26 11:51:49 +0000 |
---|---|---|
committer | Eugene Kirpichov <ekirpichov@gmail.com> | 2012-11-29 14:30:08 +0000 |
commit | 86952852c1dadd54f1294ae0adba01cfe827b8ce (patch) | |
tree | c88965d23a65af956367d85c42e08a0154fed595 /openstack/common/rpc | |
parent | cf705c59615fb68cdbfe664e7827e4a275a4a282 (diff) | |
download | oslo-86952852c1dadd54f1294ae0adba01cfe827b8ce.tar.gz oslo-86952852c1dadd54f1294ae0adba01cfe827b8ce.tar.xz oslo-86952852c1dadd54f1294ae0adba01cfe827b8ce.zip |
Qpid H/A cluster support
Implements blueprint qpid-roundrobin-reconnect
Change-Id: I01b65e6c33a238663667e9b6023b90172f1b77dd
Diffstat (limited to 'openstack/common/rpc')
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 23 |
1 files changed, 14 insertions, 9 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index fcf2508..fc8a382 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -41,6 +41,9 @@ qpid_opts = [ cfg.StrOpt('qpid_port', default='5672', help='Qpid broker port'), + cfg.ListOpt('qpid_hosts', + default=['$qpid_hostname:$qpid_port'], + help='Qpid HA cluster host:port pairs'), cfg.StrOpt('qpid_username', default='', help='Username for qpid connection'), @@ -277,22 +280,21 @@ class Connection(object): self.conf = conf params = { - 'hostname': self.conf.qpid_hostname, - 'port': self.conf.qpid_port, + 'qpid_hosts': self.conf.qpid_hosts, 'username': self.conf.qpid_username, 'password': self.conf.qpid_password, } params.update(server_params or {}) - self.broker = params['hostname'] + ":" + str(params['port']) + self.brokers = params['qpid_hosts'] self.username = params['username'] self.password = params['password'] - self.connection_create() + self.connection_create(self.brokers[0]) self.reconnect() - def connection_create(self): + def connection_create(self, broker): # Create the connection - this does not open the connection - self.connection = qpid.messaging.Connection(self.broker) + self.connection = qpid.messaging.Connection(broker) # Check if flags are set and if so set them for the connection # before we call open @@ -320,10 +322,14 @@ class Connection(object): except qpid.messaging.exceptions.ConnectionError: pass + attempt = 0 delay = 1 while True: + broker = self.brokers[attempt % len(self.brokers)] + attempt += 1 + try: - self.connection_create() + self.connection_create(broker) self.connection.open() except qpid.messaging.exceptions.ConnectionError, e: msg_dict = dict(e=e, delay=delay) @@ -333,10 +339,9 @@ class Connection(object): time.sleep(delay) delay = min(2 * delay, 60) else: + LOG.info(_('Connected to AMQP server on %s'), broker) break - LOG.info(_('Connected to AMQP server on %s'), self.broker) - self.session = self.connection.session() if self.consumers: |