diff options
| author | Eugene Kirpichov <ekirpichov@gmail.com> | 2012-07-24 04:14:51 +0000 |
|---|---|---|
| committer | Mark McLoughlin <markmc@redhat.com> | 2012-09-25 11:52:46 +0100 |
| commit | 1481849e0d55051f97bed7b1183d07ab724ccaa8 (patch) | |
| tree | 4cace36d2f1e6a92391dc173109e4afcd0891bfc /openstack/common | |
| parent | 2265258dad5c6a1e535336867d9cea5c7a2355d3 (diff) | |
| download | oslo-1481849e0d55051f97bed7b1183d07ab724ccaa8.tar.gz oslo-1481849e0d55051f97bed7b1183d07ab724ccaa8.tar.xz oslo-1481849e0d55051f97bed7b1183d07ab724ccaa8.zip | |
Support for several HA RabbitMQ servers.
Use the rabbit_ha_queues boolean option.
It is off by default, and you need to wipe RabbitMQ Mnesia
database (/var/lib/rabbitmq/mnesia) when changing it.
Change-Id: Id06692a142d1e5316602f6ceed4ccad615ce74c1
Diffstat (limited to 'openstack/common')
| -rw-r--r-- | openstack/common/rpc/impl_kombu.py | 78 |
1 files changed, 49 insertions, 29 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 8617bfc..9eaa3cd 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -33,6 +33,7 @@ from openstack.common import cfg from openstack.common.gettextutils import _ from openstack.common.rpc import amqp as rpc_amqp from openstack.common.rpc import common as rpc_common +from openstack.common import utils kombu_opts = [ cfg.StrOpt('kombu_ssl_version', @@ -50,10 +51,13 @@ kombu_opts = [ '(valid only if SSL enabled)')), cfg.StrOpt('rabbit_host', default='localhost', - help='the RabbitMQ host'), + help='Deprecated: Use rabbit_hosts instead. The RabbitMQ host'), cfg.IntOpt('rabbit_port', default=5672, - help='the RabbitMQ port'), + help='Deprecated: Use rabbit_hosts instead. The RabbitMQ port'), + cfg.ListOpt('rabbit_hosts', + default=['$rabbit_host:$rabbit_port'], + help='RabbitMQ HA cluster host:port pairs'), cfg.BoolOpt('rabbit_use_ssl', default=False, help='connect over SSL for RabbitMQ'), @@ -80,6 +84,11 @@ kombu_opts = [ cfg.BoolOpt('rabbit_durable_queues', default=False, help='use durable queues in RabbitMQ'), + cfg.BoolOpt('rabbit_ha_queues', + default=False, + help='use H/A queues in RabbitMQ (x-ha-policy: all).' + 'You need to wipe RabbitMQ database when ' + 'changing this option.'), ] @@ -206,7 +215,9 @@ class TopicConsumer(ConsumerBase): Other kombu options may be passed as keyword arguments """ # Default options + args = {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} options = {'durable': conf.rabbit_durable_queues, + 'queue_arguments': args, 'auto_delete': False, 'exclusive': False} options.update(kwargs) @@ -336,6 +347,7 @@ class NotifyPublisher(TopicPublisher): def __init__(self, conf, channel, topic, **kwargs): self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) + self.rabbit_ha_queues = conf.rabbit_ha_queues super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) def reconnect(self, channel): @@ -344,11 +356,13 @@ class NotifyPublisher(TopicPublisher): # NOTE(jerdfelt): Normally the consumer would create the queue, but # we do this to ensure that messages don't get dropped if the # consumer is started after we do + args = {'x-ha-policy': 'all'} if self.rabbit_ha_queues else {} queue = kombu.entity.Queue(channel=channel, exchange=self.exchange, durable=self.durable, name=self.routing_key, - routing_key=self.routing_key) + routing_key=self.routing_key, + queue_arguments=args) queue.declare() @@ -373,31 +387,36 @@ class Connection(object): if server_params is None: server_params = {} - # Keys to translate from server_params to kombu params server_params_to_kombu_params = {'username': 'userid'} - params = {} - for sp_key, value in server_params.iteritems(): - p_key = server_params_to_kombu_params.get(sp_key, sp_key) - params[p_key] = value + ssl_params = self._fetch_ssl_params() + params_list = [] + for adr in self.conf.rabbit_hosts: + hostname, port = utils.parse_host_port(adr, default_port=5672) - params.setdefault('hostname', self.conf.rabbit_host) - params.setdefault('port', self.conf.rabbit_port) - params.setdefault('userid', self.conf.rabbit_userid) - params.setdefault('password', self.conf.rabbit_password) - params.setdefault('virtual_host', self.conf.rabbit_virtual_host) + params = {} - self.params = params + for sp_key, value in server_params.iteritems(): + p_key = server_params_to_kombu_params.get(sp_key, sp_key) + params[p_key] = value - if self.conf.fake_rabbit: - self.params['transport'] = 'memory' - self.memory_transport = True - else: - self.memory_transport = False + params.setdefault('hostname', hostname) + params.setdefault('port', port) + params.setdefault('userid', self.conf.rabbit_userid) + params.setdefault('password', self.conf.rabbit_password) + params.setdefault('virtual_host', self.conf.rabbit_virtual_host) + + if self.conf.fake_rabbit: + params['transport'] = 'memory' + if self.conf.rabbit_use_ssl: + params['ssl'] = ssl_params + + params_list.append(params) + + self.params_list = params_list - if self.conf.rabbit_use_ssl: - self.params['ssl'] = self._fetch_ssl_params() + self.memory_transport = self.conf.fake_rabbit self.connection = None self.reconnect() @@ -427,14 +446,14 @@ class Connection(object): # Return the extended behavior return ssl_params - def _connect(self): + def _connect(self, params): """Connect to rabbit. Re-establish any queues that may have been declared before if we are reconnecting. Exceptions should be handled by the caller. """ if self.connection: LOG.info(_("Reconnecting to AMQP server on " - "%(hostname)s:%(port)d") % self.params) + "%(hostname)s:%(port)d") % params) try: self.connection.close() except self.connection_errors: @@ -442,7 +461,7 @@ class Connection(object): # Setting this in case the next statement fails, though # it shouldn't be doing any network operations, yet. self.connection = None - self.connection = kombu.connection.BrokerConnection(**self.params) + self.connection = kombu.connection.BrokerConnection(**params) self.connection_errors = self.connection.connection_errors if self.memory_transport: # Kludge to speed up tests. @@ -455,8 +474,8 @@ class Connection(object): self.channel._new_queue('ae.undeliver') for consumer in self.consumers: consumer.reconnect(self.channel) - LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), - self.params) + LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') % + params) def reconnect(self): """Handles reconnecting and re-establishing queues. @@ -469,11 +488,12 @@ class Connection(object): attempt = 0 while True: + params = self.params_list[attempt % len(self.params_list)] attempt += 1 try: - self._connect() + self._connect(params) return - except (self.connection_errors, IOError), e: + except (IOError, self.connection_errors) as e: pass except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib @@ -488,7 +508,7 @@ class Connection(object): log_info = {} log_info['err_str'] = str(e) log_info['max_retries'] = self.max_retries - log_info.update(self.params) + log_info.update(params) if self.max_retries and attempt == self.max_retries: LOG.exception(_('Unable to connect to AMQP server on ' |
