From e204885a907c6cca79f8504a942006cdb737bb28 Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Wed, 19 Jun 2013 17:30:06 -0300 Subject: Optionally reject messages on exception. Rather than blindly ack() all messages, consumers can be created that reject() the message if an exception occurs in the callback. Currently only implemented in kombu driver. Change-Id: I308dd794b50966e3a189d31b2b51fdac36cd7d3a --- tests/unit/rpc/test_kombu.py | 105 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 104 insertions(+), 1 deletion(-) (limited to 'tests') diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py index 159fefb..cafb98b 100644 --- a/tests/unit/rpc/test_kombu.py +++ b/tests/unit/rpc/test_kombu.py @@ -23,6 +23,7 @@ import eventlet eventlet.monkey_patch() import contextlib +import functools import logging import mock @@ -77,6 +78,20 @@ class KombuStubs: self.rpc = None +class FakeMessage(object): + acked = False + rejected = False + + def __init__(self, payload): + self.payload = payload + + def ack(self): + self.acked = True + + def reject(self): + self.rejected = True + + class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): def setUp(self): KombuStubs.setUp(self) @@ -112,6 +127,74 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): self.assertEqual(self.received_message, message) + def test_callback_handler_ack_on_error(self): + """The default case will ack on error. Same as before. + """ + def _callback(msg): + pass + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertTrue(message.acked) + self.assertFalse(message.rejected) + + def test_callback_handler_ack_on_error_exception(self): + + def _callback(msg): + raise MyException() + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None, + ack_on_error=True), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertTrue(message.acked) + self.assertFalse(message.rejected) + + def test_callback_handler_no_ack_on_error_exception(self): + + def _callback(msg): + raise MyException() + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None, + ack_on_error=False), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertFalse(message.acked) + self.assertTrue(message.rejected) + + def test_callback_handler_no_ack_on_error(self): + + def _callback(msg): + pass + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None, + ack_on_error=False), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertTrue(message.acked) + self.assertFalse(message.rejected) + def test_message_ttl_on_timeout(self): """Test message ttl being set by request timeout. The message should die on the vine and never arrive. @@ -514,7 +597,25 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): 'pool.name', ) - def test_join_consumer_pool(self): + def test_join_consumer_pool_default(self): + meth = 'declare_topic_consumer' + with mock.patch.object(self.rpc.Connection, meth) as p: + conn = self.rpc.create_connection(FLAGS) + conn.join_consumer_pool( + callback=lambda *a, **k: (a, k), + pool_name='pool.name', + topic='topic.name', + exchange_name='exchange.name', + ) + p.assert_called_with( + callback=mock.ANY, # the callback wrapper + queue_name='pool.name', + exchange_name='exchange.name', + topic='topic.name', + ack_on_error=True, + ) + + def test_join_consumer_pool_no_ack(self): meth = 'declare_topic_consumer' with mock.patch.object(self.rpc.Connection, meth) as p: conn = self.rpc.create_connection(FLAGS) @@ -523,12 +624,14 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): pool_name='pool.name', topic='topic.name', exchange_name='exchange.name', + ack_on_error=False, ) p.assert_called_with( callback=mock.ANY, # the callback wrapper queue_name='pool.name', exchange_name='exchange.name', topic='topic.name', + ack_on_error=False, ) -- cgit