diff options
| author | Jenkins <jenkins@review.openstack.org> | 2012-06-15 13:13:36 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2012-06-15 13:13:36 +0000 |
| commit | 925edb3ee8bbd97afaa43b2888ab45d2bca50faf (patch) | |
| tree | 262f3809e2222e321816182066d1b96a7b4a6f96 | |
| parent | c092bd596287a2a66938d01152ea8dbade1926b6 (diff) | |
| parent | 341ae3b475f9aab85c9202995529b1d041a71b2b (diff) | |
| download | oslo-925edb3ee8bbd97afaa43b2888ab45d2bca50faf.tar.gz oslo-925edb3ee8bbd97afaa43b2888ab45d2bca50faf.tar.xz oslo-925edb3ee8bbd97afaa43b2888ab45d2bca50faf.zip | |
Merge "Add missing ack to impl_qpid."
| -rw-r--r-- | openstack/common/rpc/impl_qpid.py | 7 | ||||
| -rw-r--r-- | tests/unit/rpc/test_qpid.py | 4 |
2 files changed, 10 insertions, 1 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 3c46309..78236f2 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -138,7 +138,12 @@ class ConsumerBase(object): def consume(self): """Fetch the message and pass it to the callback object""" message = self.receiver.fetch() - self.callback(message.content) + try: + self.callback(message.content) + except Exception: + logging.exception(_("Failed to process message... skipping it.")) + finally: + self.session.acknowledge(message) def get_receiver(self): return self.receiver diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py index b753c22..a7526c5 100644 --- a/tests/unit/rpc/test_qpid.py +++ b/tests/unit/rpc/test_qpid.py @@ -307,6 +307,7 @@ class RpcQpidTestCase(unittest.TestCase): self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( {"result": "foo", "failure": False, "ending": False})) + self.mock_session.acknowledge(mox.IgnoreArg()) if multi: self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( self.mock_receiver) @@ -314,16 +315,19 @@ class RpcQpidTestCase(unittest.TestCase): qpid.messaging.Message( {"result": "bar", "failure": False, "ending": False})) + self.mock_session.acknowledge(mox.IgnoreArg()) self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn( qpid.messaging.Message( {"result": "baz", "failure": False, "ending": False})) + self.mock_session.acknowledge(mox.IgnoreArg()) self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( {"failure": False, "ending": True})) + self.mock_session.acknowledge(mox.IgnoreArg()) self.mock_session.close() self.mock_connection.session().AndReturn(self.mock_session) |
