diff options
| author | Eugene Kirpichov <ekirpichov@gmail.com> | 2012-07-24 04:14:51 +0000 |
|---|---|---|
| committer | Mark McLoughlin <markmc@redhat.com> | 2012-09-25 11:52:46 +0100 |
| commit | 1481849e0d55051f97bed7b1183d07ab724ccaa8 (patch) | |
| tree | 4cace36d2f1e6a92391dc173109e4afcd0891bfc | |
| parent | 2265258dad5c6a1e535336867d9cea5c7a2355d3 (diff) | |
| download | oslo-1481849e0d55051f97bed7b1183d07ab724ccaa8.tar.gz oslo-1481849e0d55051f97bed7b1183d07ab724ccaa8.tar.xz oslo-1481849e0d55051f97bed7b1183d07ab724ccaa8.zip | |
Support for several HA RabbitMQ servers.
Use the rabbit_ha_queues boolean option.
It is off by default, and you need to wipe RabbitMQ Mnesia
database (/var/lib/rabbitmq/mnesia) when changing it.
Change-Id: Id06692a142d1e5316602f6ceed4ccad615ce74c1
| -rw-r--r-- | openstack/common/rpc/impl_kombu.py | 78 | ||||
| -rw-r--r-- | tests/unit/rpc/test_kombu.py | 148 |
2 files changed, 180 insertions, 46 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 8617bfc..9eaa3cd 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -33,6 +33,7 @@ from openstack.common import cfg from openstack.common.gettextutils import _ from openstack.common.rpc import amqp as rpc_amqp from openstack.common.rpc import common as rpc_common +from openstack.common import utils kombu_opts = [ cfg.StrOpt('kombu_ssl_version', @@ -50,10 +51,13 @@ kombu_opts = [ '(valid only if SSL enabled)')), cfg.StrOpt('rabbit_host', default='localhost', - help='the RabbitMQ host'), + help='Deprecated: Use rabbit_hosts instead. The RabbitMQ host'), cfg.IntOpt('rabbit_port', default=5672, - help='the RabbitMQ port'), + help='Deprecated: Use rabbit_hosts instead. The RabbitMQ port'), + cfg.ListOpt('rabbit_hosts', + default=['$rabbit_host:$rabbit_port'], + help='RabbitMQ HA cluster host:port pairs'), cfg.BoolOpt('rabbit_use_ssl', default=False, help='connect over SSL for RabbitMQ'), @@ -80,6 +84,11 @@ kombu_opts = [ cfg.BoolOpt('rabbit_durable_queues', default=False, help='use durable queues in RabbitMQ'), + cfg.BoolOpt('rabbit_ha_queues', + default=False, + help='use H/A queues in RabbitMQ (x-ha-policy: all).' + 'You need to wipe RabbitMQ database when ' + 'changing this option.'), ] @@ -206,7 +215,9 @@ class TopicConsumer(ConsumerBase): Other kombu options may be passed as keyword arguments """ # Default options + args = {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} options = {'durable': conf.rabbit_durable_queues, + 'queue_arguments': args, 'auto_delete': False, 'exclusive': False} options.update(kwargs) @@ -336,6 +347,7 @@ class NotifyPublisher(TopicPublisher): def __init__(self, conf, channel, topic, **kwargs): self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) + self.rabbit_ha_queues = conf.rabbit_ha_queues super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) def reconnect(self, channel): @@ -344,11 +356,13 @@ class NotifyPublisher(TopicPublisher): # NOTE(jerdfelt): Normally the consumer would create the queue, but # we do this to ensure that messages don't get dropped if the # consumer is started after we do + args = {'x-ha-policy': 'all'} if self.rabbit_ha_queues else {} queue = kombu.entity.Queue(channel=channel, exchange=self.exchange, durable=self.durable, name=self.routing_key, - routing_key=self.routing_key) + routing_key=self.routing_key, + queue_arguments=args) queue.declare() @@ -373,31 +387,36 @@ class Connection(object): if server_params is None: server_params = {} - # Keys to translate from server_params to kombu params server_params_to_kombu_params = {'username': 'userid'} - params = {} - for sp_key, value in server_params.iteritems(): - p_key = server_params_to_kombu_params.get(sp_key, sp_key) - params[p_key] = value + ssl_params = self._fetch_ssl_params() + params_list = [] + for adr in self.conf.rabbit_hosts: + hostname, port = utils.parse_host_port(adr, default_port=5672) - params.setdefault('hostname', self.conf.rabbit_host) - params.setdefault('port', self.conf.rabbit_port) - params.setdefault('userid', self.conf.rabbit_userid) - params.setdefault('password', self.conf.rabbit_password) - params.setdefault('virtual_host', self.conf.rabbit_virtual_host) + params = {} - self.params = params + for sp_key, value in server_params.iteritems(): + p_key = server_params_to_kombu_params.get(sp_key, sp_key) + params[p_key] = value - if self.conf.fake_rabbit: - self.params['transport'] = 'memory' - self.memory_transport = True - else: - self.memory_transport = False + params.setdefault('hostname', hostname) + params.setdefault('port', port) + params.setdefault('userid', self.conf.rabbit_userid) + params.setdefault('password', self.conf.rabbit_password) + params.setdefault('virtual_host', self.conf.rabbit_virtual_host) + + if self.conf.fake_rabbit: + params['transport'] = 'memory' + if self.conf.rabbit_use_ssl: + params['ssl'] = ssl_params + + params_list.append(params) + + self.params_list = params_list - if self.conf.rabbit_use_ssl: - self.params['ssl'] = self._fetch_ssl_params() + self.memory_transport = self.conf.fake_rabbit self.connection = None self.reconnect() @@ -427,14 +446,14 @@ class Connection(object): # Return the extended behavior return ssl_params - def _connect(self): + def _connect(self, params): """Connect to rabbit. Re-establish any queues that may have been declared before if we are reconnecting. Exceptions should be handled by the caller. """ if self.connection: LOG.info(_("Reconnecting to AMQP server on " - "%(hostname)s:%(port)d") % self.params) + "%(hostname)s:%(port)d") % params) try: self.connection.close() except self.connection_errors: @@ -442,7 +461,7 @@ class Connection(object): # Setting this in case the next statement fails, though # it shouldn't be doing any network operations, yet. self.connection = None - self.connection = kombu.connection.BrokerConnection(**self.params) + self.connection = kombu.connection.BrokerConnection(**params) self.connection_errors = self.connection.connection_errors if self.memory_transport: # Kludge to speed up tests. @@ -455,8 +474,8 @@ class Connection(object): self.channel._new_queue('ae.undeliver') for consumer in self.consumers: consumer.reconnect(self.channel) - LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), - self.params) + LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') % + params) def reconnect(self): """Handles reconnecting and re-establishing queues. @@ -469,11 +488,12 @@ class Connection(object): attempt = 0 while True: + params = self.params_list[attempt % len(self.params_list)] attempt += 1 try: - self._connect() + self._connect(params) return - except (self.connection_errors, IOError), e: + except (IOError, self.connection_errors) as e: pass except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib @@ -488,7 +508,7 @@ class Connection(object): log_info = {} log_info['err_str'] = str(e) log_info['max_retries'] = self.max_retries - log_info.update(self.params) + log_info.update(params) if self.max_retries and attempt == self.max_retries: LOG.exception(_('Unable to connect to AMQP server on ' diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py index 2cdca15..16ee63e 100644 --- a/tests/unit/rpc/test_kombu.py +++ b/tests/unit/rpc/test_kombu.py @@ -22,6 +22,7 @@ Unit Tests for remote procedure calls using kombu import eventlet eventlet.monkey_patch() +import contextlib import logging import unittest @@ -32,6 +33,7 @@ from openstack.common import exception from openstack.common.rpc import amqp as rpc_amqp from openstack.common.rpc import common as rpc_common from openstack.common import testutils +from tests import utils from tests.unit.rpc import common try: @@ -64,7 +66,8 @@ def _raise_exc_stub(stubs, times, obj, method, exc_msg, return info -class RpcKombuTestCase(common.BaseRpcAMQPTestCase): +class KombuStubs: + @staticmethod def setUp(self): self.stubs = stubout.StubOutForTesting() if kombu: @@ -73,14 +76,23 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.rpc = impl_kombu else: self.rpc = None - super(RpcKombuTestCase, self).setUp() + @staticmethod def tearDown(self): self.stubs.UnsetAll() self.stubs.SmartUnsetAll() if kombu: impl_kombu.cleanup() - super(RpcKombuTestCase, self).tearDown() + + +class RpcKombuTestCase(common.BaseRpcAMQPTestCase): + def setUp(self): + KombuStubs.setUp(self) + common.BaseRpcAMQPTestCase.setUp(self) + + def tearDown(self): + common.BaseRpcAMQPTestCase.tearDown(self) + KombuStubs.tearDown(self) @testutils.skip_if(kombu is None, "Test requires kombu") def test_reusing_connection(self): @@ -166,13 +178,13 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): def __init__(myself, *args, **kwargs): super(MyConnection, myself).__init__(*args, **kwargs) self.assertEqual( - myself.params, - {'hostname': FLAGS.rabbit_host, - 'userid': FLAGS.rabbit_userid, - 'password': FLAGS.rabbit_password, - 'port': FLAGS.rabbit_port, - 'virtual_host': FLAGS.rabbit_virtual_host, - 'transport': 'memory'}) + myself.params_list, + [{'hostname': FLAGS.rabbit_host, + 'userid': FLAGS.rabbit_userid, + 'password': FLAGS.rabbit_password, + 'port': FLAGS.rabbit_port, + 'virtual_host': FLAGS.rabbit_virtual_host, + 'transport': 'memory'}]) def topic_send(_context, topic, msg): pass @@ -199,13 +211,13 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): def __init__(myself, *args, **kwargs): super(MyConnection, myself).__init__(*args, **kwargs) self.assertEqual( - myself.params, - {'hostname': server_params['hostname'], - 'userid': server_params['username'], - 'password': server_params['password'], - 'port': server_params['port'], - 'virtual_host': server_params['virtual_host'], - 'transport': 'memory'}) + myself.params_list, + [{'hostname': server_params['hostname'], + 'userid': server_params['username'], + 'password': server_params['password'], + 'port': server_params['port'], + 'virtual_host': server_params['virtual_host'], + 'transport': 'memory'}]) def topic_send(_context, topic, msg): pass @@ -411,3 +423,105 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.assertTrue(value in unicode(exc)) #Traceback should be included in exception message self.assertTrue('exception.ApiError' in unicode(exc)) + + +class RpcKombuHATestCase(utils.BaseTestCase): + def setUp(self): + utils.BaseTestCase.setUp(self) + KombuStubs.setUp(self) + + def tearDown(self): + FLAGS.reset() + KombuStubs.tearDown(self) + utils.BaseTestCase.tearDown(self) + + @testutils.skip_if(kombu is None, "Test requires kombu") + def test_roundrobin_reconnect(self): + """Test that rabbits are tried in roundrobin at connection failures.""" + self.config(rabbit_hosts=[ + 'host1:1234', 'host2:5678', '[::1]:2345', + '[2001:0db8:85a3:0042:0000:8a2e:0370:7334]'], + rabbit_retry_interval=0.1, + rabbit_retry_backoff=0.1) + + info = { + 'attempt': 0, + 'params_list': [ + {'hostname': 'host1', + 'userid': FLAGS.rabbit_userid, + 'password': FLAGS.rabbit_password, + 'port': 1234, + 'virtual_host': FLAGS.rabbit_virtual_host, + 'transport': 'memory'}, + {'hostname': 'host2', + 'userid': FLAGS.rabbit_userid, + 'password': FLAGS.rabbit_password, + 'port': 5678, + 'virtual_host': FLAGS.rabbit_virtual_host, + 'transport': 'memory'}, + {'hostname': '::1', + 'userid': FLAGS.rabbit_userid, + 'password': FLAGS.rabbit_password, + 'port': 2345, + 'virtual_host': FLAGS.rabbit_virtual_host, + 'transport': 'memory'}, + {'hostname': '2001:0db8:85a3:0042:0000:8a2e:0370:7334', + 'userid': FLAGS.rabbit_userid, + 'password': FLAGS.rabbit_password, + 'port': 5672, + 'virtual_host': FLAGS.rabbit_virtual_host, + 'transport': 'memory'}, + ] + } + + import kombu.connection + + class MyConnection(kombu.connection.BrokerConnection): + def __init__(myself, *args, **params): + super(MyConnection, myself).__init__(*args, **params) + self.assertEqual(params, + info['params_list'][info['attempt'] % + len(info['params_list'])]) + info['attempt'] = info['attempt'] + 1 + + def connect(myself): + if info['attempt'] < 5: + # the word timeout is important (see impl_kombu.py:486) + raise Exception('connection timeout') + super(kombu.connection.BrokerConnection, myself).connect() + + self.stubs.Set(kombu.connection, 'BrokerConnection', MyConnection) + + conn = self.rpc.Connection(FLAGS) + + self.assertEqual(info['attempt'], 5) + + @testutils.skip_if(kombu is None, "Test requires kombu") + def test_queue_not_declared_ha_if_ha_off(self): + self.config(rabbit_ha_queues=False) + + import kombu.entity + + def my_declare(myself): + self.assertEqual(None, + (myself.queue_arguments or {}).get('x-ha-policy')) + + self.stubs.Set(kombu.entity.Queue, 'declare', my_declare) + + with contextlib.closing(self.rpc.create_connection(FLAGS)) as conn: + conn.declare_topic_consumer('a_topic', lambda *args: None) + + @testutils.skip_if(kombu is None, "Test requires kombu") + def test_queue_declared_ha_if_ha_on(self): + self.config(rabbit_ha_queues=True) + + import kombu.entity + + def my_declare(myself): + self.assertEqual('all', + (myself.queue_arguments or {}).get('x-ha-policy')) + + self.stubs.Set(kombu.entity.Queue, 'declare', my_declare) + + with contextlib.closing(self.rpc.create_connection(FLAGS)) as conn: + conn.declare_topic_consumer('a_topic', lambda *args: None) |
