diff options
Diffstat (limited to 'tests/unit/rpc/test_kombu.py')
-rw-r--r-- | tests/unit/rpc/test_kombu.py | 238 |
1 files changed, 221 insertions, 17 deletions
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py index 159fefb..cbe948d 100644 --- a/tests/unit/rpc/test_kombu.py +++ b/tests/unit/rpc/test_kombu.py @@ -23,11 +23,15 @@ import eventlet eventlet.monkey_patch() import contextlib +import functools import logging +import weakref +import fixtures import mock from oslo.config import cfg import six +import time from openstack.common import exception from openstack.common.rpc import amqp as rpc_amqp @@ -37,6 +41,8 @@ from tests import utils try: import kombu + import kombu.connection + import kombu.entity from openstack.common.rpc import impl_kombu except ImportError: kombu = None @@ -65,25 +71,50 @@ def _raise_exc_stub(stubs, times, obj, method, exc_msg, return info -class KombuStubs: - @staticmethod +class KombuStubs(fixtures.Fixture): + def __init__(self, test): + super(KombuStubs, self).__init__() + + # NOTE(rpodolyaka): use a weak ref here to prevent ref cycles + self.test = weakref.ref(test) + def setUp(self): + super(KombuStubs, self).setUp() + + test = self.test() if kombu: - self.config(fake_rabbit=True) - self.config(rpc_response_timeout=5) - self.rpc = impl_kombu + test.conf = FLAGS + test.config(fake_rabbit=True) + test.config(rpc_response_timeout=5) + test.rpc = impl_kombu self.addCleanup(impl_kombu.cleanup) else: - self.rpc = None + test.rpc = None + + +class FakeMessage(object): + acked = False + rejected = False + + def __init__(self, payload): + self.payload = payload + + def ack(self): + self.acked = True + + def reject(self): + self.rejected = True class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): def setUp(self): - KombuStubs.setUp(self) - super(RpcKombuTestCase, self).setUp() if kombu is None: self.skipTest("Test requires kombu") + self.useFixture(KombuStubs(self)) + + super(RpcKombuTestCase, self).setUp() + def test_reusing_connection(self): """Test that reusing a connection returns same one.""" conn_context = self.rpc.create_connection(FLAGS, new=False) @@ -112,6 +143,74 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): self.assertEqual(self.received_message, message) + def test_callback_handler_ack_on_error(self): + """The default case will ack on error. Same as before. + """ + def _callback(msg): + pass + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertTrue(message.acked) + self.assertFalse(message.rejected) + + def test_callback_handler_ack_on_error_exception(self): + + def _callback(msg): + raise MyException() + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None, + ack_on_error=True), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertTrue(message.acked) + self.assertFalse(message.rejected) + + def test_callback_handler_no_ack_on_error_exception(self): + + def _callback(msg): + raise MyException() + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None, + ack_on_error=False), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertFalse(message.acked) + self.assertTrue(message.rejected) + + def test_callback_handler_no_ack_on_error(self): + + def _callback(msg): + pass + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None, + ack_on_error=False), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertTrue(message.acked) + self.assertFalse(message.rejected) + def test_message_ttl_on_timeout(self): """Test message ttl being set by request timeout. The message should die on the vine and never arrive. @@ -308,6 +407,22 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): impl_kombu.cast_to_server(FLAGS, ctxt, server_params, 'fake_topic', {'msg': 'fake'}) + def test_fanout_success(self): + # Overriding the method in the base class because the test + # seems to fail for the same reason as + # test_fanout_send_receive(). + self.skipTest("kombu memory transport seems buggy with " + "fanout queues as this test passes when " + "you use rabbit (fake_rabbit=False)") + + def test_cast_success_despite_missing_args(self): + # Overriding the method in the base class because the test + # seems to fail for the same reason as + # test_fanout_send_receive(). + self.skipTest("kombu memory transport seems buggy with " + "fanout queues as this test passes when " + "you use rabbit (fake_rabbit=False)") + def test_fanout_send_receive(self): """Test sending to a fanout exchange and consuming from 2 queues.""" @@ -514,7 +629,7 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): 'pool.name', ) - def test_join_consumer_pool(self): + def test_join_consumer_pool_default(self): meth = 'declare_topic_consumer' with mock.patch.object(self.rpc.Connection, meth) as p: conn = self.rpc.create_connection(FLAGS) @@ -529,13 +644,108 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): queue_name='pool.name', exchange_name='exchange.name', topic='topic.name', + ack_on_error=True, ) + def test_join_consumer_pool_no_ack(self): + meth = 'declare_topic_consumer' + with mock.patch.object(self.rpc.Connection, meth) as p: + conn = self.rpc.create_connection(FLAGS) + conn.join_consumer_pool( + callback=lambda *a, **k: (a, k), + pool_name='pool.name', + topic='topic.name', + exchange_name='exchange.name', + ack_on_error=False, + ) + p.assert_called_with( + callback=mock.ANY, # the callback wrapper + queue_name='pool.name', + exchange_name='exchange.name', + topic='topic.name', + ack_on_error=False, + ) + + # used to make unexpected exception tests run faster + def my_time_sleep(self, sleep_time): + return + + def test_service_consume_thread_unexpected_exceptions(self): + + def my_TopicConsumer_consume(myself, *args, **kwargs): + self.consume_calls += 1 + # see if it can sustain three failures + if self.consume_calls < 3: + raise Exception('unexpected exception') + else: + self.orig_TopicConsumer_consume(myself, *args, **kwargs) + + self.consume_calls = 0 + self.orig_TopicConsumer_consume = impl_kombu.TopicConsumer.consume + self.stubs.Set(impl_kombu.TopicConsumer, 'consume', + my_TopicConsumer_consume) + self.stubs.Set(time, 'sleep', self.my_time_sleep) + + value = 42 + result = self.rpc.call(FLAGS, self.context, self.topic, + {"method": "echo", + "args": {"value": value}}) + self.assertEqual(value, result) + + def test_replyproxy_consume_thread_unexpected_exceptions(self): + + def my_DirectConsumer_consume(myself, *args, **kwargs): + self.consume_calls += 1 + # see if it can sustain three failures + if self.consume_calls < 3: + raise Exception('unexpected exception') + else: + self.orig_DirectConsumer_consume(myself, *args, **kwargs) + + self.consume_calls = 1 + self.orig_DirectConsumer_consume = impl_kombu.DirectConsumer.consume + self.stubs.Set(impl_kombu.DirectConsumer, 'consume', + my_DirectConsumer_consume) + self.stubs.Set(time, 'sleep', self.my_time_sleep) + + value = 42 + result = self.rpc.call(FLAGS, self.context, self.topic, + {"method": "echo", + "args": {"value": value}}) + self.assertEqual(value, result) + + def test_reconnect_max_retries(self): + self.config(rabbit_hosts=[ + 'host1:1234', 'host2:5678', '[::1]:2345', + '[2001:0db8:85a3:0042:0000:8a2e:0370:7334]'], + rabbit_max_retries=2, + rabbit_retry_interval=0.1, + rabbit_retry_backoff=0.1) + + info = {'attempt': 0} + + class MyConnection(kombu.connection.BrokerConnection): + def __init__(self, *args, **params): + super(MyConnection, self).__init__(*args, **params) + info['attempt'] += 1 + + def connect(self): + if info['attempt'] < 3: + # the word timeout is important (see impl_kombu.py:486) + raise Exception('connection timeout') + super(kombu.connection.BrokerConnection, self).connect() + + self.stubs.Set(kombu.connection, 'BrokerConnection', MyConnection) + + self.assertRaises(rpc_common.RPCException, self.rpc.Connection, FLAGS) + self.assertEqual(info['attempt'], 2) + class RpcKombuHATestCase(utils.BaseTestCase): def setUp(self): super(RpcKombuHATestCase, self).setUp() - KombuStubs.setUp(self) + + self.useFixture(KombuStubs(self)) self.addCleanup(FLAGS.reset) def test_roundrobin_reconnect(self): @@ -576,15 +786,13 @@ class RpcKombuHATestCase(utils.BaseTestCase): ] } - import kombu.connection - class MyConnection(kombu.connection.BrokerConnection): def __init__(myself, *args, **params): super(MyConnection, myself).__init__(*args, **params) self.assertEqual(params, info['params_list'][info['attempt'] % len(info['params_list'])]) - info['attempt'] = info['attempt'] + 1 + info['attempt'] += 1 def connect(myself): if info['attempt'] < 5: @@ -601,8 +809,6 @@ class RpcKombuHATestCase(utils.BaseTestCase): def test_queue_not_declared_ha_if_ha_off(self): self.config(rabbit_ha_queues=False) - import kombu.entity - def my_declare(myself): self.assertEqual(None, (myself.queue_arguments or {}).get('x-ha-policy')) @@ -615,8 +821,6 @@ class RpcKombuHATestCase(utils.BaseTestCase): def test_queue_declared_ha_if_ha_on(self): self.config(rabbit_ha_queues=True) - import kombu.entity - def my_declare(myself): self.assertEqual('all', (myself.queue_arguments or {}).get('x-ha-policy')) |