diff options
| author | Jenkins <jenkins@review.openstack.org> | 2012-11-29 16:50:20 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2012-11-29 16:50:20 +0000 |
| commit | 57fd94b00727d38afbc55ebac568238803d4a693 (patch) | |
| tree | 79e188cf9870ab0118fe3b96f53c5a3ba273702b /openstack/common | |
| parent | 93173520d0e7eb8cfba56108e9db35a93cc37ae8 (diff) | |
| parent | 86952852c1dadd54f1294ae0adba01cfe827b8ce (diff) | |
| download | oslo-57fd94b00727d38afbc55ebac568238803d4a693.tar.gz oslo-57fd94b00727d38afbc55ebac568238803d4a693.tar.xz oslo-57fd94b00727d38afbc55ebac568238803d4a693.zip | |
Merge "Qpid H/A cluster support"
Diffstat (limited to 'openstack/common')
| -rw-r--r-- | openstack/common/rpc/impl_qpid.py | 23 |
1 files changed, 14 insertions, 9 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index fcf2508..fc8a382 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -41,6 +41,9 @@ qpid_opts = [ cfg.StrOpt('qpid_port', default='5672', help='Qpid broker port'), + cfg.ListOpt('qpid_hosts', + default=['$qpid_hostname:$qpid_port'], + help='Qpid HA cluster host:port pairs'), cfg.StrOpt('qpid_username', default='', help='Username for qpid connection'), @@ -277,22 +280,21 @@ class Connection(object): self.conf = conf params = { - 'hostname': self.conf.qpid_hostname, - 'port': self.conf.qpid_port, + 'qpid_hosts': self.conf.qpid_hosts, 'username': self.conf.qpid_username, 'password': self.conf.qpid_password, } params.update(server_params or {}) - self.broker = params['hostname'] + ":" + str(params['port']) + self.brokers = params['qpid_hosts'] self.username = params['username'] self.password = params['password'] - self.connection_create() + self.connection_create(self.brokers[0]) self.reconnect() - def connection_create(self): + def connection_create(self, broker): # Create the connection - this does not open the connection - self.connection = qpid.messaging.Connection(self.broker) + self.connection = qpid.messaging.Connection(broker) # Check if flags are set and if so set them for the connection # before we call open @@ -320,10 +322,14 @@ class Connection(object): except qpid.messaging.exceptions.ConnectionError: pass + attempt = 0 delay = 1 while True: + broker = self.brokers[attempt % len(self.brokers)] + attempt += 1 + try: - self.connection_create() + self.connection_create(broker) self.connection.open() except qpid.messaging.exceptions.ConnectionError, e: msg_dict = dict(e=e, delay=delay) @@ -333,10 +339,9 @@ class Connection(object): time.sleep(delay) delay = min(2 * delay, 60) else: + LOG.info(_('Connected to AMQP server on %s'), broker) break - LOG.info(_('Connected to AMQP server on %s'), self.broker) - self.session = self.connection.session() if self.consumers: |
