diff options
author | Eugene Kirpichov <ekirpichov@gmail.com> | 2012-07-24 04:14:51 +0000 |
---|---|---|
committer | Mark McLoughlin <markmc@redhat.com> | 2012-09-25 11:52:46 +0100 |
commit | 1481849e0d55051f97bed7b1183d07ab724ccaa8 (patch) | |
tree | 4cace36d2f1e6a92391dc173109e4afcd0891bfc /tests/unit/rpc/test_kombu.py | |
parent | 2265258dad5c6a1e535336867d9cea5c7a2355d3 (diff) | |
download | oslo-1481849e0d55051f97bed7b1183d07ab724ccaa8.tar.gz oslo-1481849e0d55051f97bed7b1183d07ab724ccaa8.tar.xz oslo-1481849e0d55051f97bed7b1183d07ab724ccaa8.zip |
Support for several HA RabbitMQ servers.
Use the rabbit_ha_queues boolean option.
It is off by default, and you need to wipe RabbitMQ Mnesia
database (/var/lib/rabbitmq/mnesia) when changing it.
Change-Id: Id06692a142d1e5316602f6ceed4ccad615ce74c1
Diffstat (limited to 'tests/unit/rpc/test_kombu.py')
-rw-r--r-- | tests/unit/rpc/test_kombu.py | 148 |
1 files changed, 131 insertions, 17 deletions
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py index 2cdca15..16ee63e 100644 --- a/tests/unit/rpc/test_kombu.py +++ b/tests/unit/rpc/test_kombu.py @@ -22,6 +22,7 @@ Unit Tests for remote procedure calls using kombu import eventlet eventlet.monkey_patch() +import contextlib import logging import unittest @@ -32,6 +33,7 @@ from openstack.common import exception from openstack.common.rpc import amqp as rpc_amqp from openstack.common.rpc import common as rpc_common from openstack.common import testutils +from tests import utils from tests.unit.rpc import common try: @@ -64,7 +66,8 @@ def _raise_exc_stub(stubs, times, obj, method, exc_msg, return info -class RpcKombuTestCase(common.BaseRpcAMQPTestCase): +class KombuStubs: + @staticmethod def setUp(self): self.stubs = stubout.StubOutForTesting() if kombu: @@ -73,14 +76,23 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.rpc = impl_kombu else: self.rpc = None - super(RpcKombuTestCase, self).setUp() + @staticmethod def tearDown(self): self.stubs.UnsetAll() self.stubs.SmartUnsetAll() if kombu: impl_kombu.cleanup() - super(RpcKombuTestCase, self).tearDown() + + +class RpcKombuTestCase(common.BaseRpcAMQPTestCase): + def setUp(self): + KombuStubs.setUp(self) + common.BaseRpcAMQPTestCase.setUp(self) + + def tearDown(self): + common.BaseRpcAMQPTestCase.tearDown(self) + KombuStubs.tearDown(self) @testutils.skip_if(kombu is None, "Test requires kombu") def test_reusing_connection(self): @@ -166,13 +178,13 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): def __init__(myself, *args, **kwargs): super(MyConnection, myself).__init__(*args, **kwargs) self.assertEqual( - myself.params, - {'hostname': FLAGS.rabbit_host, - 'userid': FLAGS.rabbit_userid, - 'password': FLAGS.rabbit_password, - 'port': FLAGS.rabbit_port, - 'virtual_host': FLAGS.rabbit_virtual_host, - 'transport': 'memory'}) + myself.params_list, + [{'hostname': FLAGS.rabbit_host, + 'userid': FLAGS.rabbit_userid, + 'password': FLAGS.rabbit_password, + 'port': FLAGS.rabbit_port, + 'virtual_host': FLAGS.rabbit_virtual_host, + 'transport': 'memory'}]) def topic_send(_context, topic, msg): pass @@ -199,13 +211,13 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): def __init__(myself, *args, **kwargs): super(MyConnection, myself).__init__(*args, **kwargs) self.assertEqual( - myself.params, - {'hostname': server_params['hostname'], - 'userid': server_params['username'], - 'password': server_params['password'], - 'port': server_params['port'], - 'virtual_host': server_params['virtual_host'], - 'transport': 'memory'}) + myself.params_list, + [{'hostname': server_params['hostname'], + 'userid': server_params['username'], + 'password': server_params['password'], + 'port': server_params['port'], + 'virtual_host': server_params['virtual_host'], + 'transport': 'memory'}]) def topic_send(_context, topic, msg): pass @@ -411,3 +423,105 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.assertTrue(value in unicode(exc)) #Traceback should be included in exception message self.assertTrue('exception.ApiError' in unicode(exc)) + + +class RpcKombuHATestCase(utils.BaseTestCase): + def setUp(self): + utils.BaseTestCase.setUp(self) + KombuStubs.setUp(self) + + def tearDown(self): + FLAGS.reset() + KombuStubs.tearDown(self) + utils.BaseTestCase.tearDown(self) + + @testutils.skip_if(kombu is None, "Test requires kombu") + def test_roundrobin_reconnect(self): + """Test that rabbits are tried in roundrobin at connection failures.""" + self.config(rabbit_hosts=[ + 'host1:1234', 'host2:5678', '[::1]:2345', + '[2001:0db8:85a3:0042:0000:8a2e:0370:7334]'], + rabbit_retry_interval=0.1, + rabbit_retry_backoff=0.1) + + info = { + 'attempt': 0, + 'params_list': [ + {'hostname': 'host1', + 'userid': FLAGS.rabbit_userid, + 'password': FLAGS.rabbit_password, + 'port': 1234, + 'virtual_host': FLAGS.rabbit_virtual_host, + 'transport': 'memory'}, + {'hostname': 'host2', + 'userid': FLAGS.rabbit_userid, + 'password': FLAGS.rabbit_password, + 'port': 5678, + 'virtual_host': FLAGS.rabbit_virtual_host, + 'transport': 'memory'}, + {'hostname': '::1', + 'userid': FLAGS.rabbit_userid, + 'password': FLAGS.rabbit_password, + 'port': 2345, + 'virtual_host': FLAGS.rabbit_virtual_host, + 'transport': 'memory'}, + {'hostname': '2001:0db8:85a3:0042:0000:8a2e:0370:7334', + 'userid': FLAGS.rabbit_userid, + 'password': FLAGS.rabbit_password, + 'port': 5672, + 'virtual_host': FLAGS.rabbit_virtual_host, + 'transport': 'memory'}, + ] + } + + import kombu.connection + + class MyConnection(kombu.connection.BrokerConnection): + def __init__(myself, *args, **params): + super(MyConnection, myself).__init__(*args, **params) + self.assertEqual(params, + info['params_list'][info['attempt'] % + len(info['params_list'])]) + info['attempt'] = info['attempt'] + 1 + + def connect(myself): + if info['attempt'] < 5: + # the word timeout is important (see impl_kombu.py:486) + raise Exception('connection timeout') + super(kombu.connection.BrokerConnection, myself).connect() + + self.stubs.Set(kombu.connection, 'BrokerConnection', MyConnection) + + conn = self.rpc.Connection(FLAGS) + + self.assertEqual(info['attempt'], 5) + + @testutils.skip_if(kombu is None, "Test requires kombu") + def test_queue_not_declared_ha_if_ha_off(self): + self.config(rabbit_ha_queues=False) + + import kombu.entity + + def my_declare(myself): + self.assertEqual(None, + (myself.queue_arguments or {}).get('x-ha-policy')) + + self.stubs.Set(kombu.entity.Queue, 'declare', my_declare) + + with contextlib.closing(self.rpc.create_connection(FLAGS)) as conn: + conn.declare_topic_consumer('a_topic', lambda *args: None) + + @testutils.skip_if(kombu is None, "Test requires kombu") + def test_queue_declared_ha_if_ha_on(self): + self.config(rabbit_ha_queues=True) + + import kombu.entity + + def my_declare(myself): + self.assertEqual('all', + (myself.queue_arguments or {}).get('x-ha-policy')) + + self.stubs.Set(kombu.entity.Queue, 'declare', my_declare) + + with contextlib.closing(self.rpc.create_connection(FLAGS)) as conn: + conn.declare_topic_consumer('a_topic', lambda *args: None) |