summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--openstack/common/rpc/impl_qpid.py7
-rw-r--r--tests/unit/rpc/test_qpid.py4
2 files changed, 10 insertions, 1 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 3c46309..78236f2 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -138,7 +138,12 @@ class ConsumerBase(object):
def consume(self):
"""Fetch the message and pass it to the callback object"""
message = self.receiver.fetch()
- self.callback(message.content)
+ try:
+ self.callback(message.content)
+ except Exception:
+ logging.exception(_("Failed to process message... skipping it."))
+ finally:
+ self.session.acknowledge(message)
def get_receiver(self):
return self.receiver
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index b753c22..a7526c5 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -307,6 +307,7 @@ class RpcQpidTestCase(unittest.TestCase):
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
{"result": "foo", "failure": False, "ending": False}))
+ self.mock_session.acknowledge(mox.IgnoreArg())
if multi:
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
@@ -314,16 +315,19 @@ class RpcQpidTestCase(unittest.TestCase):
qpid.messaging.Message(
{"result": "bar", "failure": False,
"ending": False}))
+ self.mock_session.acknowledge(mox.IgnoreArg())
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
qpid.messaging.Message(
{"result": "baz", "failure": False,
"ending": False}))
+ self.mock_session.acknowledge(mox.IgnoreArg())
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
{"failure": False, "ending": True}))
+ self.mock_session.acknowledge(mox.IgnoreArg())
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)