diff options
author | Sandy Walsh <sandy.walsh@rackspace.com> | 2013-06-19 17:30:06 -0300 |
---|---|---|
committer | Sandy Walsh <sandy.walsh@rackspace.com> | 2013-06-20 14:35:25 -0300 |
commit | e204885a907c6cca79f8504a942006cdb737bb28 (patch) | |
tree | e28105d857c079035eae1371209cc02e3e6e6855 /tests | |
parent | 49552a75bbe14c4fbce1de40efe985fe5832816e (diff) | |
download | oslo-e204885a907c6cca79f8504a942006cdb737bb28.tar.gz oslo-e204885a907c6cca79f8504a942006cdb737bb28.tar.xz oslo-e204885a907c6cca79f8504a942006cdb737bb28.zip |
Optionally reject messages on exception.
Rather than blindly ack() all messages, consumers can be created
that reject() the message if an exception occurs in the callback.
Currently only implemented in kombu driver.
Change-Id: I308dd794b50966e3a189d31b2b51fdac36cd7d3a
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unit/rpc/test_kombu.py | 105 |
1 files changed, 104 insertions, 1 deletions
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py index 159fefb..cafb98b 100644 --- a/tests/unit/rpc/test_kombu.py +++ b/tests/unit/rpc/test_kombu.py @@ -23,6 +23,7 @@ import eventlet eventlet.monkey_patch() import contextlib +import functools import logging import mock @@ -77,6 +78,20 @@ class KombuStubs: self.rpc = None +class FakeMessage(object): + acked = False + rejected = False + + def __init__(self, payload): + self.payload = payload + + def ack(self): + self.acked = True + + def reject(self): + self.rejected = True + + class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): def setUp(self): KombuStubs.setUp(self) @@ -112,6 +127,74 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): self.assertEqual(self.received_message, message) + def test_callback_handler_ack_on_error(self): + """The default case will ack on error. Same as before. + """ + def _callback(msg): + pass + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertTrue(message.acked) + self.assertFalse(message.rejected) + + def test_callback_handler_ack_on_error_exception(self): + + def _callback(msg): + raise MyException() + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None, + ack_on_error=True), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertTrue(message.acked) + self.assertFalse(message.rejected) + + def test_callback_handler_no_ack_on_error_exception(self): + + def _callback(msg): + raise MyException() + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None, + ack_on_error=False), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertFalse(message.acked) + self.assertTrue(message.rejected) + + def test_callback_handler_no_ack_on_error(self): + + def _callback(msg): + pass + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None, + ack_on_error=False), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertTrue(message.acked) + self.assertFalse(message.rejected) + def test_message_ttl_on_timeout(self): """Test message ttl being set by request timeout. The message should die on the vine and never arrive. @@ -514,7 +597,25 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): 'pool.name', ) - def test_join_consumer_pool(self): + def test_join_consumer_pool_default(self): + meth = 'declare_topic_consumer' + with mock.patch.object(self.rpc.Connection, meth) as p: + conn = self.rpc.create_connection(FLAGS) + conn.join_consumer_pool( + callback=lambda *a, **k: (a, k), + pool_name='pool.name', + topic='topic.name', + exchange_name='exchange.name', + ) + p.assert_called_with( + callback=mock.ANY, # the callback wrapper + queue_name='pool.name', + exchange_name='exchange.name', + topic='topic.name', + ack_on_error=True, + ) + + def test_join_consumer_pool_no_ack(self): meth = 'declare_topic_consumer' with mock.patch.object(self.rpc.Connection, meth) as p: conn = self.rpc.create_connection(FLAGS) @@ -523,12 +624,14 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): pool_name='pool.name', topic='topic.name', exchange_name='exchange.name', + ack_on_error=False, ) p.assert_called_with( callback=mock.ANY, # the callback wrapper queue_name='pool.name', exchange_name='exchange.name', topic='topic.name', + ack_on_error=False, ) |