summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorSandy Walsh <sandy.walsh@rackspace.com>2013-06-19 17:30:06 -0300
committerSandy Walsh <sandy.walsh@rackspace.com>2013-06-20 14:35:25 -0300
commite204885a907c6cca79f8504a942006cdb737bb28 (patch)
treee28105d857c079035eae1371209cc02e3e6e6855 /tests
parent49552a75bbe14c4fbce1de40efe985fe5832816e (diff)
downloadoslo-e204885a907c6cca79f8504a942006cdb737bb28.tar.gz
oslo-e204885a907c6cca79f8504a942006cdb737bb28.tar.xz
oslo-e204885a907c6cca79f8504a942006cdb737bb28.zip
Optionally reject messages on exception.
Rather than blindly ack() all messages, consumers can be created that reject() the message if an exception occurs in the callback. Currently only implemented in kombu driver. Change-Id: I308dd794b50966e3a189d31b2b51fdac36cd7d3a
Diffstat (limited to 'tests')
-rw-r--r--tests/unit/rpc/test_kombu.py105
1 files changed, 104 insertions, 1 deletions
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index 159fefb..cafb98b 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -23,6 +23,7 @@ import eventlet
eventlet.monkey_patch()
import contextlib
+import functools
import logging
import mock
@@ -77,6 +78,20 @@ class KombuStubs:
self.rpc = None
+class FakeMessage(object):
+ acked = False
+ rejected = False
+
+ def __init__(self, payload):
+ self.payload = payload
+
+ def ack(self):
+ self.acked = True
+
+ def reject(self):
+ self.rejected = True
+
+
class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
def setUp(self):
KombuStubs.setUp(self)
@@ -112,6 +127,74 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
self.assertEqual(self.received_message, message)
+ def test_callback_handler_ack_on_error(self):
+ """The default case will ack on error. Same as before.
+ """
+ def _callback(msg):
+ pass
+
+ conn = self.rpc.create_connection(FLAGS)
+ consumer = conn.declare_consumer(functools.partial(
+ impl_kombu.TopicConsumer,
+ name=None,
+ exchange_name=None),
+ "a_topic", _callback)
+ message = FakeMessage("some message")
+ consumer._callback_handler(message, _callback)
+ self.assertTrue(message.acked)
+ self.assertFalse(message.rejected)
+
+ def test_callback_handler_ack_on_error_exception(self):
+
+ def _callback(msg):
+ raise MyException()
+
+ conn = self.rpc.create_connection(FLAGS)
+ consumer = conn.declare_consumer(functools.partial(
+ impl_kombu.TopicConsumer,
+ name=None,
+ exchange_name=None,
+ ack_on_error=True),
+ "a_topic", _callback)
+ message = FakeMessage("some message")
+ consumer._callback_handler(message, _callback)
+ self.assertTrue(message.acked)
+ self.assertFalse(message.rejected)
+
+ def test_callback_handler_no_ack_on_error_exception(self):
+
+ def _callback(msg):
+ raise MyException()
+
+ conn = self.rpc.create_connection(FLAGS)
+ consumer = conn.declare_consumer(functools.partial(
+ impl_kombu.TopicConsumer,
+ name=None,
+ exchange_name=None,
+ ack_on_error=False),
+ "a_topic", _callback)
+ message = FakeMessage("some message")
+ consumer._callback_handler(message, _callback)
+ self.assertFalse(message.acked)
+ self.assertTrue(message.rejected)
+
+ def test_callback_handler_no_ack_on_error(self):
+
+ def _callback(msg):
+ pass
+
+ conn = self.rpc.create_connection(FLAGS)
+ consumer = conn.declare_consumer(functools.partial(
+ impl_kombu.TopicConsumer,
+ name=None,
+ exchange_name=None,
+ ack_on_error=False),
+ "a_topic", _callback)
+ message = FakeMessage("some message")
+ consumer._callback_handler(message, _callback)
+ self.assertTrue(message.acked)
+ self.assertFalse(message.rejected)
+
def test_message_ttl_on_timeout(self):
"""Test message ttl being set by request timeout. The message
should die on the vine and never arrive.
@@ -514,7 +597,25 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
'pool.name',
)
- def test_join_consumer_pool(self):
+ def test_join_consumer_pool_default(self):
+ meth = 'declare_topic_consumer'
+ with mock.patch.object(self.rpc.Connection, meth) as p:
+ conn = self.rpc.create_connection(FLAGS)
+ conn.join_consumer_pool(
+ callback=lambda *a, **k: (a, k),
+ pool_name='pool.name',
+ topic='topic.name',
+ exchange_name='exchange.name',
+ )
+ p.assert_called_with(
+ callback=mock.ANY, # the callback wrapper
+ queue_name='pool.name',
+ exchange_name='exchange.name',
+ topic='topic.name',
+ ack_on_error=True,
+ )
+
+ def test_join_consumer_pool_no_ack(self):
meth = 'declare_topic_consumer'
with mock.patch.object(self.rpc.Connection, meth) as p:
conn = self.rpc.create_connection(FLAGS)
@@ -523,12 +624,14 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
pool_name='pool.name',
topic='topic.name',
exchange_name='exchange.name',
+ ack_on_error=False,
)
p.assert_called_with(
callback=mock.ANY, # the callback wrapper
queue_name='pool.name',
exchange_name='exchange.name',
topic='topic.name',
+ ack_on_error=False,
)