summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorSandy Walsh <sandy.walsh@rackspace.com>2013-06-19 17:30:06 -0300
committerSandy Walsh <sandy.walsh@rackspace.com>2013-06-20 14:35:25 -0300
commite204885a907c6cca79f8504a942006cdb737bb28 (patch)
treee28105d857c079035eae1371209cc02e3e6e6855 /openstack
parent49552a75bbe14c4fbce1de40efe985fe5832816e (diff)
downloadoslo-e204885a907c6cca79f8504a942006cdb737bb28.tar.gz
oslo-e204885a907c6cca79f8504a942006cdb737bb28.tar.xz
oslo-e204885a907c6cca79f8504a942006cdb737bb28.zip
Optionally reject messages on exception.
Rather than blindly ack() all messages, consumers can be created that reject() the message if an exception occurs in the callback. Currently only implemented in kombu driver. Change-Id: I308dd794b50966e3a189d31b2b51fdac36cd7d3a
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/amqp.py6
-rw-r--r--openstack/common/rpc/impl_kombu.py51
-rw-r--r--openstack/common/rpc/impl_qpid.py3
3 files changed, 44 insertions, 16 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 22e01d7..1ab7ae4 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -151,11 +151,13 @@ class ConnectionContext(rpc_common.Connection):
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
- def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+ def join_consumer_pool(self, callback, pool_name, topic, exchange_name,
+ ack_on_error=True):
self.connection.join_consumer_pool(callback,
pool_name,
topic,
- exchange_name)
+ exchange_name,
+ ack_on_error)
def consume_in_thread(self):
self.connection.consume_in_thread()
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index c062d9a..716b120 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -129,6 +129,7 @@ class ConsumerBase(object):
self.tag = str(tag)
self.kwargs = kwargs
self.queue = None
+ self.ack_on_error = kwargs.get('ack_on_error', True)
self.reconnect(channel)
def reconnect(self, channel):
@@ -138,6 +139,36 @@ class ConsumerBase(object):
self.queue = kombu.entity.Queue(**self.kwargs)
self.queue.declare()
+ def _callback_handler(self, message, callback):
+ """Call callback with deserialized message.
+
+ Messages that are processed without exception are ack'ed.
+
+ If the message processing generates an exception, it will be
+ ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
+ Rejection is better than waiting for the message to timeout.
+ Rejected messages are immediately requeued.
+ """
+
+ ack_msg = False
+ try:
+ msg = rpc_common.deserialize_msg(message.payload)
+ callback(msg)
+ ack_msg = True
+ except Exception:
+ if self.ack_on_error:
+ ack_msg = True
+ LOG.exception(_("Failed to process message"
+ " ... skipping it."))
+ else:
+ LOG.exception(_("Failed to process message"
+ " ... will requeue."))
+ finally:
+ if ack_msg:
+ message.ack()
+ else:
+ message.reject()
+
def consume(self, *args, **kwargs):
"""Actually declare the consumer on the amqp channel. This will
start the flow of messages from the queue. Using the
@@ -150,8 +181,6 @@ class ConsumerBase(object):
If kwargs['nowait'] is True, then this call will block until
a message is read.
- Messages will automatically be acked if the callback doesn't
- raise an exception
"""
options = {'consumer_tag': self.tag}
@@ -162,13 +191,7 @@ class ConsumerBase(object):
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
- try:
- msg = rpc_common.deserialize_msg(message.payload)
- callback(msg)
- except Exception:
- LOG.exception(_("Failed to process message... skipping it."))
- finally:
- message.ack()
+ self._callback_handler(message, callback)
self.queue.consume(*args, callback=_callback, **options)
@@ -635,8 +658,8 @@ class Connection(object):
def _consume():
if info['do_consume']:
- queues_head = self.consumers[:-1]
- queues_tail = self.consumers[-1]
+ queues_head = self.consumers[:-1] # not fanout.
+ queues_tail = self.consumers[-1] # fanout
for queue in queues_head:
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
@@ -685,11 +708,12 @@ class Connection(object):
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
exchange_name=exchange_name,
+ ack_on_error=ack_on_error,
),
topic, callback)
@@ -754,7 +778,7 @@ class Connection(object):
self.declare_topic_consumer(topic, proxy_cb, pool_name)
def join_consumer_pool(self, callback, pool_name, topic,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
@@ -775,6 +799,7 @@ class Connection(object):
topic=topic,
exchange_name=exchange_name,
callback=callback_wrapper,
+ ack_on_error=ack_on_error,
)
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 7352517..46dcb6a 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -152,6 +152,7 @@ class ConsumerBase(object):
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
+ # TODO(sandy): Need support for optional ack_on_error.
self.session.acknowledge(message)
def get_receiver(self):
@@ -615,7 +616,7 @@ class Connection(object):
return consumer
def join_consumer_pool(self, callback, pool_name, topic,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.