diff options
author | Jenkins <jenkins@review.openstack.org> | 2013-06-24 21:27:10 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-06-24 21:27:10 +0000 |
commit | bc10b7aeada4ea66e85e10b94b5c219e3c8d2e77 (patch) | |
tree | 401cb3970168f27af98ca5d7f3d2a60bad0b0255 | |
parent | e177a47c8397b6285ffc4ef877280ec550b71e4e (diff) | |
parent | e204885a907c6cca79f8504a942006cdb737bb28 (diff) | |
download | oslo-bc10b7aeada4ea66e85e10b94b5c219e3c8d2e77.tar.gz oslo-bc10b7aeada4ea66e85e10b94b5c219e3c8d2e77.tar.xz oslo-bc10b7aeada4ea66e85e10b94b5c219e3c8d2e77.zip |
Merge "Optionally reject messages on exception."
-rw-r--r-- | openstack/common/rpc/amqp.py | 6 | ||||
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 51 | ||||
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 3 | ||||
-rw-r--r-- | tests/unit/rpc/test_kombu.py | 105 |
4 files changed, 148 insertions, 17 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 22e01d7..1ab7ae4 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -151,11 +151,13 @@ class ConnectionContext(rpc_common.Connection): def create_worker(self, topic, proxy, pool_name): self.connection.create_worker(topic, proxy, pool_name) - def join_consumer_pool(self, callback, pool_name, topic, exchange_name): + def join_consumer_pool(self, callback, pool_name, topic, exchange_name, + ack_on_error=True): self.connection.join_consumer_pool(callback, pool_name, topic, - exchange_name) + exchange_name, + ack_on_error) def consume_in_thread(self): self.connection.consume_in_thread() diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index c062d9a..716b120 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -129,6 +129,7 @@ class ConsumerBase(object): self.tag = str(tag) self.kwargs = kwargs self.queue = None + self.ack_on_error = kwargs.get('ack_on_error', True) self.reconnect(channel) def reconnect(self, channel): @@ -138,6 +139,36 @@ class ConsumerBase(object): self.queue = kombu.entity.Queue(**self.kwargs) self.queue.declare() + def _callback_handler(self, message, callback): + """Call callback with deserialized message. + + Messages that are processed without exception are ack'ed. + + If the message processing generates an exception, it will be + ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed. + Rejection is better than waiting for the message to timeout. + Rejected messages are immediately requeued. + """ + + ack_msg = False + try: + msg = rpc_common.deserialize_msg(message.payload) + callback(msg) + ack_msg = True + except Exception: + if self.ack_on_error: + ack_msg = True + LOG.exception(_("Failed to process message" + " ... skipping it.")) + else: + LOG.exception(_("Failed to process message" + " ... will requeue.")) + finally: + if ack_msg: + message.ack() + else: + message.reject() + def consume(self, *args, **kwargs): """Actually declare the consumer on the amqp channel. This will start the flow of messages from the queue. Using the @@ -150,8 +181,6 @@ class ConsumerBase(object): If kwargs['nowait'] is True, then this call will block until a message is read. - Messages will automatically be acked if the callback doesn't - raise an exception """ options = {'consumer_tag': self.tag} @@ -162,13 +191,7 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) - try: - msg = rpc_common.deserialize_msg(message.payload) - callback(msg) - except Exception: - LOG.exception(_("Failed to process message... skipping it.")) - finally: - message.ack() + self._callback_handler(message, callback) self.queue.consume(*args, callback=_callback, **options) @@ -635,8 +658,8 @@ class Connection(object): def _consume(): if info['do_consume']: - queues_head = self.consumers[:-1] - queues_tail = self.consumers[-1] + queues_head = self.consumers[:-1] # not fanout. + queues_tail = self.consumers[-1] # fanout for queue in queues_head: queue.consume(nowait=True) queues_tail.consume(nowait=False) @@ -685,11 +708,12 @@ class Connection(object): self.declare_consumer(DirectConsumer, topic, callback) def declare_topic_consumer(self, topic, callback=None, queue_name=None, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Create a 'topic' consumer.""" self.declare_consumer(functools.partial(TopicConsumer, name=queue_name, exchange_name=exchange_name, + ack_on_error=ack_on_error, ), topic, callback) @@ -754,7 +778,7 @@ class Connection(object): self.declare_topic_consumer(topic, proxy_cb, pool_name) def join_consumer_pool(self, callback, pool_name, topic, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Register as a member of a group of consumers for a given topic from the specified exchange. @@ -775,6 +799,7 @@ class Connection(object): topic=topic, exchange_name=exchange_name, callback=callback_wrapper, + ack_on_error=ack_on_error, ) diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 7352517..46dcb6a 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -152,6 +152,7 @@ class ConsumerBase(object): except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: + # TODO(sandy): Need support for optional ack_on_error. self.session.acknowledge(message) def get_receiver(self): @@ -615,7 +616,7 @@ class Connection(object): return consumer def join_consumer_pool(self, callback, pool_name, topic, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Register as a member of a group of consumers for a given topic from the specified exchange. diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py index ef5d634..ba88078 100644 --- a/tests/unit/rpc/test_kombu.py +++ b/tests/unit/rpc/test_kombu.py @@ -23,6 +23,7 @@ import eventlet eventlet.monkey_patch() import contextlib +import functools import logging import mock @@ -77,6 +78,20 @@ class KombuStubs: self.rpc = None +class FakeMessage(object): + acked = False + rejected = False + + def __init__(self, payload): + self.payload = payload + + def ack(self): + self.acked = True + + def reject(self): + self.rejected = True + + class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): def setUp(self): KombuStubs.setUp(self) @@ -112,6 +127,74 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): self.assertEqual(self.received_message, message) + def test_callback_handler_ack_on_error(self): + """The default case will ack on error. Same as before. + """ + def _callback(msg): + pass + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertTrue(message.acked) + self.assertFalse(message.rejected) + + def test_callback_handler_ack_on_error_exception(self): + + def _callback(msg): + raise MyException() + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None, + ack_on_error=True), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertTrue(message.acked) + self.assertFalse(message.rejected) + + def test_callback_handler_no_ack_on_error_exception(self): + + def _callback(msg): + raise MyException() + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None, + ack_on_error=False), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertFalse(message.acked) + self.assertTrue(message.rejected) + + def test_callback_handler_no_ack_on_error(self): + + def _callback(msg): + pass + + conn = self.rpc.create_connection(FLAGS) + consumer = conn.declare_consumer(functools.partial( + impl_kombu.TopicConsumer, + name=None, + exchange_name=None, + ack_on_error=False), + "a_topic", _callback) + message = FakeMessage("some message") + consumer._callback_handler(message, _callback) + self.assertTrue(message.acked) + self.assertFalse(message.rejected) + def test_message_ttl_on_timeout(self): """Test message ttl being set by request timeout. The message should die on the vine and never arrive. @@ -530,7 +613,25 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): 'pool.name', ) - def test_join_consumer_pool(self): + def test_join_consumer_pool_default(self): + meth = 'declare_topic_consumer' + with mock.patch.object(self.rpc.Connection, meth) as p: + conn = self.rpc.create_connection(FLAGS) + conn.join_consumer_pool( + callback=lambda *a, **k: (a, k), + pool_name='pool.name', + topic='topic.name', + exchange_name='exchange.name', + ) + p.assert_called_with( + callback=mock.ANY, # the callback wrapper + queue_name='pool.name', + exchange_name='exchange.name', + topic='topic.name', + ack_on_error=True, + ) + + def test_join_consumer_pool_no_ack(self): meth = 'declare_topic_consumer' with mock.patch.object(self.rpc.Connection, meth) as p: conn = self.rpc.create_connection(FLAGS) @@ -539,12 +640,14 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): pool_name='pool.name', topic='topic.name', exchange_name='exchange.name', + ack_on_error=False, ) p.assert_called_with( callback=mock.ANY, # the callback wrapper queue_name='pool.name', exchange_name='exchange.name', topic='topic.name', + ack_on_error=False, ) |