diff options
| author | Jenkins <jenkins@review.openstack.org> | 2012-09-25 10:56:03 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2012-09-25 10:56:03 +0000 |
| commit | 974989d9b1b67ad40e22034b60f9b2640e9f5824 (patch) | |
| tree | 933f7d1e8b02574b93f781951238256439bac6c6 /openstack | |
| parent | d5095d7effe79c1dcf19a67ff03e00a454662830 (diff) | |
| parent | 1481849e0d55051f97bed7b1183d07ab724ccaa8 (diff) | |
| download | oslo-974989d9b1b67ad40e22034b60f9b2640e9f5824.tar.gz oslo-974989d9b1b67ad40e22034b60f9b2640e9f5824.tar.xz oslo-974989d9b1b67ad40e22034b60f9b2640e9f5824.zip | |
Merge "Support for several HA RabbitMQ servers."
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/rpc/impl_kombu.py | 78 |
1 files changed, 49 insertions, 29 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 8617bfc..9eaa3cd 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -33,6 +33,7 @@ from openstack.common import cfg from openstack.common.gettextutils import _ from openstack.common.rpc import amqp as rpc_amqp from openstack.common.rpc import common as rpc_common +from openstack.common import utils kombu_opts = [ cfg.StrOpt('kombu_ssl_version', @@ -50,10 +51,13 @@ kombu_opts = [ '(valid only if SSL enabled)')), cfg.StrOpt('rabbit_host', default='localhost', - help='the RabbitMQ host'), + help='Deprecated: Use rabbit_hosts instead. The RabbitMQ host'), cfg.IntOpt('rabbit_port', default=5672, - help='the RabbitMQ port'), + help='Deprecated: Use rabbit_hosts instead. The RabbitMQ port'), + cfg.ListOpt('rabbit_hosts', + default=['$rabbit_host:$rabbit_port'], + help='RabbitMQ HA cluster host:port pairs'), cfg.BoolOpt('rabbit_use_ssl', default=False, help='connect over SSL for RabbitMQ'), @@ -80,6 +84,11 @@ kombu_opts = [ cfg.BoolOpt('rabbit_durable_queues', default=False, help='use durable queues in RabbitMQ'), + cfg.BoolOpt('rabbit_ha_queues', + default=False, + help='use H/A queues in RabbitMQ (x-ha-policy: all).' + 'You need to wipe RabbitMQ database when ' + 'changing this option.'), ] @@ -206,7 +215,9 @@ class TopicConsumer(ConsumerBase): Other kombu options may be passed as keyword arguments """ # Default options + args = {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} options = {'durable': conf.rabbit_durable_queues, + 'queue_arguments': args, 'auto_delete': False, 'exclusive': False} options.update(kwargs) @@ -336,6 +347,7 @@ class NotifyPublisher(TopicPublisher): def __init__(self, conf, channel, topic, **kwargs): self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) + self.rabbit_ha_queues = conf.rabbit_ha_queues super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) def reconnect(self, channel): @@ -344,11 +356,13 @@ class NotifyPublisher(TopicPublisher): # NOTE(jerdfelt): Normally the consumer would create the queue, but # we do this to ensure that messages don't get dropped if the # consumer is started after we do + args = {'x-ha-policy': 'all'} if self.rabbit_ha_queues else {} queue = kombu.entity.Queue(channel=channel, exchange=self.exchange, durable=self.durable, name=self.routing_key, - routing_key=self.routing_key) + routing_key=self.routing_key, + queue_arguments=args) queue.declare() @@ -373,31 +387,36 @@ class Connection(object): if server_params is None: server_params = {} - # Keys to translate from server_params to kombu params server_params_to_kombu_params = {'username': 'userid'} - params = {} - for sp_key, value in server_params.iteritems(): - p_key = server_params_to_kombu_params.get(sp_key, sp_key) - params[p_key] = value + ssl_params = self._fetch_ssl_params() + params_list = [] + for adr in self.conf.rabbit_hosts: + hostname, port = utils.parse_host_port(adr, default_port=5672) - params.setdefault('hostname', self.conf.rabbit_host) - params.setdefault('port', self.conf.rabbit_port) - params.setdefault('userid', self.conf.rabbit_userid) - params.setdefault('password', self.conf.rabbit_password) - params.setdefault('virtual_host', self.conf.rabbit_virtual_host) + params = {} - self.params = params + for sp_key, value in server_params.iteritems(): + p_key = server_params_to_kombu_params.get(sp_key, sp_key) + params[p_key] = value - if self.conf.fake_rabbit: - self.params['transport'] = 'memory' - self.memory_transport = True - else: - self.memory_transport = False + params.setdefault('hostname', hostname) + params.setdefault('port', port) + params.setdefault('userid', self.conf.rabbit_userid) + params.setdefault('password', self.conf.rabbit_password) + params.setdefault('virtual_host', self.conf.rabbit_virtual_host) + + if self.conf.fake_rabbit: + params['transport'] = 'memory' + if self.conf.rabbit_use_ssl: + params['ssl'] = ssl_params + + params_list.append(params) + + self.params_list = params_list - if self.conf.rabbit_use_ssl: - self.params['ssl'] = self._fetch_ssl_params() + self.memory_transport = self.conf.fake_rabbit self.connection = None self.reconnect() @@ -427,14 +446,14 @@ class Connection(object): # Return the extended behavior return ssl_params - def _connect(self): + def _connect(self, params): """Connect to rabbit. Re-establish any queues that may have been declared before if we are reconnecting. Exceptions should be handled by the caller. """ if self.connection: LOG.info(_("Reconnecting to AMQP server on " - "%(hostname)s:%(port)d") % self.params) + "%(hostname)s:%(port)d") % params) try: self.connection.close() except self.connection_errors: @@ -442,7 +461,7 @@ class Connection(object): # Setting this in case the next statement fails, though # it shouldn't be doing any network operations, yet. self.connection = None - self.connection = kombu.connection.BrokerConnection(**self.params) + self.connection = kombu.connection.BrokerConnection(**params) self.connection_errors = self.connection.connection_errors if self.memory_transport: # Kludge to speed up tests. @@ -455,8 +474,8 @@ class Connection(object): self.channel._new_queue('ae.undeliver') for consumer in self.consumers: consumer.reconnect(self.channel) - LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), - self.params) + LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') % + params) def reconnect(self): """Handles reconnecting and re-establishing queues. @@ -469,11 +488,12 @@ class Connection(object): attempt = 0 while True: + params = self.params_list[attempt % len(self.params_list)] attempt += 1 try: - self._connect() + self._connect(params) return - except (self.connection_errors, IOError), e: + except (IOError, self.connection_errors) as e: pass except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib @@ -488,7 +508,7 @@ class Connection(object): log_info = {} log_info['err_str'] = str(e) log_info['max_retries'] = self.max_retries - log_info.update(self.params) + log_info.update(params) if self.max_retries and attempt == self.max_retries: LOG.exception(_('Unable to connect to AMQP server on ' |
