summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-06-24 21:27:10 +0000
committerGerrit Code Review <review@openstack.org>2013-06-24 21:27:10 +0000
commitbc10b7aeada4ea66e85e10b94b5c219e3c8d2e77 (patch)
tree401cb3970168f27af98ca5d7f3d2a60bad0b0255 /tests
parente177a47c8397b6285ffc4ef877280ec550b71e4e (diff)
parente204885a907c6cca79f8504a942006cdb737bb28 (diff)
downloadoslo-bc10b7aeada4ea66e85e10b94b5c219e3c8d2e77.tar.gz
oslo-bc10b7aeada4ea66e85e10b94b5c219e3c8d2e77.tar.xz
oslo-bc10b7aeada4ea66e85e10b94b5c219e3c8d2e77.zip
Merge "Optionally reject messages on exception."
Diffstat (limited to 'tests')
-rw-r--r--tests/unit/rpc/test_kombu.py105
1 files changed, 104 insertions, 1 deletions
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index ef5d634..ba88078 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -23,6 +23,7 @@ import eventlet
eventlet.monkey_patch()
import contextlib
+import functools
import logging
import mock
@@ -77,6 +78,20 @@ class KombuStubs:
self.rpc = None
+class FakeMessage(object):
+ acked = False
+ rejected = False
+
+ def __init__(self, payload):
+ self.payload = payload
+
+ def ack(self):
+ self.acked = True
+
+ def reject(self):
+ self.rejected = True
+
+
class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
def setUp(self):
KombuStubs.setUp(self)
@@ -112,6 +127,74 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
self.assertEqual(self.received_message, message)
+ def test_callback_handler_ack_on_error(self):
+ """The default case will ack on error. Same as before.
+ """
+ def _callback(msg):
+ pass
+
+ conn = self.rpc.create_connection(FLAGS)
+ consumer = conn.declare_consumer(functools.partial(
+ impl_kombu.TopicConsumer,
+ name=None,
+ exchange_name=None),
+ "a_topic", _callback)
+ message = FakeMessage("some message")
+ consumer._callback_handler(message, _callback)
+ self.assertTrue(message.acked)
+ self.assertFalse(message.rejected)
+
+ def test_callback_handler_ack_on_error_exception(self):
+
+ def _callback(msg):
+ raise MyException()
+
+ conn = self.rpc.create_connection(FLAGS)
+ consumer = conn.declare_consumer(functools.partial(
+ impl_kombu.TopicConsumer,
+ name=None,
+ exchange_name=None,
+ ack_on_error=True),
+ "a_topic", _callback)
+ message = FakeMessage("some message")
+ consumer._callback_handler(message, _callback)
+ self.assertTrue(message.acked)
+ self.assertFalse(message.rejected)
+
+ def test_callback_handler_no_ack_on_error_exception(self):
+
+ def _callback(msg):
+ raise MyException()
+
+ conn = self.rpc.create_connection(FLAGS)
+ consumer = conn.declare_consumer(functools.partial(
+ impl_kombu.TopicConsumer,
+ name=None,
+ exchange_name=None,
+ ack_on_error=False),
+ "a_topic", _callback)
+ message = FakeMessage("some message")
+ consumer._callback_handler(message, _callback)
+ self.assertFalse(message.acked)
+ self.assertTrue(message.rejected)
+
+ def test_callback_handler_no_ack_on_error(self):
+
+ def _callback(msg):
+ pass
+
+ conn = self.rpc.create_connection(FLAGS)
+ consumer = conn.declare_consumer(functools.partial(
+ impl_kombu.TopicConsumer,
+ name=None,
+ exchange_name=None,
+ ack_on_error=False),
+ "a_topic", _callback)
+ message = FakeMessage("some message")
+ consumer._callback_handler(message, _callback)
+ self.assertTrue(message.acked)
+ self.assertFalse(message.rejected)
+
def test_message_ttl_on_timeout(self):
"""Test message ttl being set by request timeout. The message
should die on the vine and never arrive.
@@ -530,7 +613,25 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
'pool.name',
)
- def test_join_consumer_pool(self):
+ def test_join_consumer_pool_default(self):
+ meth = 'declare_topic_consumer'
+ with mock.patch.object(self.rpc.Connection, meth) as p:
+ conn = self.rpc.create_connection(FLAGS)
+ conn.join_consumer_pool(
+ callback=lambda *a, **k: (a, k),
+ pool_name='pool.name',
+ topic='topic.name',
+ exchange_name='exchange.name',
+ )
+ p.assert_called_with(
+ callback=mock.ANY, # the callback wrapper
+ queue_name='pool.name',
+ exchange_name='exchange.name',
+ topic='topic.name',
+ ack_on_error=True,
+ )
+
+ def test_join_consumer_pool_no_ack(self):
meth = 'declare_topic_consumer'
with mock.patch.object(self.rpc.Connection, meth) as p:
conn = self.rpc.create_connection(FLAGS)
@@ -539,12 +640,14 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
pool_name='pool.name',
topic='topic.name',
exchange_name='exchange.name',
+ ack_on_error=False,
)
p.assert_called_with(
callback=mock.ANY, # the callback wrapper
queue_name='pool.name',
exchange_name='exchange.name',
topic='topic.name',
+ ack_on_error=False,
)