summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-06-24 21:27:10 +0000
committerGerrit Code Review <review@openstack.org>2013-06-24 21:27:10 +0000
commitbc10b7aeada4ea66e85e10b94b5c219e3c8d2e77 (patch)
tree401cb3970168f27af98ca5d7f3d2a60bad0b0255 /openstack
parente177a47c8397b6285ffc4ef877280ec550b71e4e (diff)
parente204885a907c6cca79f8504a942006cdb737bb28 (diff)
downloadoslo-bc10b7aeada4ea66e85e10b94b5c219e3c8d2e77.tar.gz
oslo-bc10b7aeada4ea66e85e10b94b5c219e3c8d2e77.tar.xz
oslo-bc10b7aeada4ea66e85e10b94b5c219e3c8d2e77.zip
Merge "Optionally reject messages on exception."
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/amqp.py6
-rw-r--r--openstack/common/rpc/impl_kombu.py51
-rw-r--r--openstack/common/rpc/impl_qpid.py3
3 files changed, 44 insertions, 16 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 22e01d7..1ab7ae4 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -151,11 +151,13 @@ class ConnectionContext(rpc_common.Connection):
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
- def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+ def join_consumer_pool(self, callback, pool_name, topic, exchange_name,
+ ack_on_error=True):
self.connection.join_consumer_pool(callback,
pool_name,
topic,
- exchange_name)
+ exchange_name,
+ ack_on_error)
def consume_in_thread(self):
self.connection.consume_in_thread()
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index c062d9a..716b120 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -129,6 +129,7 @@ class ConsumerBase(object):
self.tag = str(tag)
self.kwargs = kwargs
self.queue = None
+ self.ack_on_error = kwargs.get('ack_on_error', True)
self.reconnect(channel)
def reconnect(self, channel):
@@ -138,6 +139,36 @@ class ConsumerBase(object):
self.queue = kombu.entity.Queue(**self.kwargs)
self.queue.declare()
+ def _callback_handler(self, message, callback):
+ """Call callback with deserialized message.
+
+ Messages that are processed without exception are ack'ed.
+
+ If the message processing generates an exception, it will be
+ ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
+ Rejection is better than waiting for the message to timeout.
+ Rejected messages are immediately requeued.
+ """
+
+ ack_msg = False
+ try:
+ msg = rpc_common.deserialize_msg(message.payload)
+ callback(msg)
+ ack_msg = True
+ except Exception:
+ if self.ack_on_error:
+ ack_msg = True
+ LOG.exception(_("Failed to process message"
+ " ... skipping it."))
+ else:
+ LOG.exception(_("Failed to process message"
+ " ... will requeue."))
+ finally:
+ if ack_msg:
+ message.ack()
+ else:
+ message.reject()
+
def consume(self, *args, **kwargs):
"""Actually declare the consumer on the amqp channel. This will
start the flow of messages from the queue. Using the
@@ -150,8 +181,6 @@ class ConsumerBase(object):
If kwargs['nowait'] is True, then this call will block until
a message is read.
- Messages will automatically be acked if the callback doesn't
- raise an exception
"""
options = {'consumer_tag': self.tag}
@@ -162,13 +191,7 @@ class ConsumerBase(object):
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
- try:
- msg = rpc_common.deserialize_msg(message.payload)
- callback(msg)
- except Exception:
- LOG.exception(_("Failed to process message... skipping it."))
- finally:
- message.ack()
+ self._callback_handler(message, callback)
self.queue.consume(*args, callback=_callback, **options)
@@ -635,8 +658,8 @@ class Connection(object):
def _consume():
if info['do_consume']:
- queues_head = self.consumers[:-1]
- queues_tail = self.consumers[-1]
+ queues_head = self.consumers[:-1] # not fanout.
+ queues_tail = self.consumers[-1] # fanout
for queue in queues_head:
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
@@ -685,11 +708,12 @@ class Connection(object):
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
exchange_name=exchange_name,
+ ack_on_error=ack_on_error,
),
topic, callback)
@@ -754,7 +778,7 @@ class Connection(object):
self.declare_topic_consumer(topic, proxy_cb, pool_name)
def join_consumer_pool(self, callback, pool_name, topic,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
@@ -775,6 +799,7 @@ class Connection(object):
topic=topic,
exchange_name=exchange_name,
callback=callback_wrapper,
+ ack_on_error=ack_on_error,
)
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 7352517..46dcb6a 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -152,6 +152,7 @@ class ConsumerBase(object):
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
+ # TODO(sandy): Need support for optional ack_on_error.
self.session.acknowledge(message)
def get_receiver(self):
@@ -615,7 +616,7 @@ class Connection(object):
return consumer
def join_consumer_pool(self, callback, pool_name, topic,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.