diff options
author | Russell Bryant <rbryant@redhat.com> | 2012-06-12 17:07:18 -0400 |
---|---|---|
committer | Russell Bryant <rbryant@redhat.com> | 2012-06-13 09:11:45 -0400 |
commit | d15f3034dfab05ef0ec02b4e353b3dfa1512db35 (patch) | |
tree | f40b18262abac99fd888f000ced91bf7407e9d3b | |
parent | 9e0c6845562dcd8e294841ac06aad559247de0cf (diff) | |
download | nova-d15f3034dfab05ef0ec02b4e353b3dfa1512db35.tar.gz nova-d15f3034dfab05ef0ec02b4e353b3dfa1512db35.tar.xz nova-d15f3034dfab05ef0ec02b4e353b3dfa1512db35.zip |
Add missing ack to impl_qpid.
Fix bug 1012374.
Johannes Erdfelt pointed out that impl_qpid wasn't acking messages that
it received. This turned out to be a nasty oversight, resulting in
unbounded message queue growth inside of the python-qpid library. This
fixes it.
Change-Id: I0370293807f0282e1dbdd59246f70be031e888a9
-rw-r--r-- | nova/rpc/impl_qpid.py | 7 | ||||
-rw-r--r-- | nova/tests/rpc/test_qpid.py | 4 |
2 files changed, 10 insertions, 1 deletions
diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py index 4044ec03a..c5ab4a1d5 100644 --- a/nova/rpc/impl_qpid.py +++ b/nova/rpc/impl_qpid.py @@ -137,7 +137,12 @@ class ConsumerBase(object): def consume(self): """Fetch the message and pass it to the callback object""" message = self.receiver.fetch() - self.callback(message.content) + try: + self.callback(message.content) + except Exception: + LOG.exception(_("Failed to process message... skipping it.")) + finally: + self.session.acknowledge(message) def get_receiver(self): return self.receiver diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py index 1b21158f8..b8553873b 100644 --- a/nova/tests/rpc/test_qpid.py +++ b/nova/tests/rpc/test_qpid.py @@ -296,6 +296,7 @@ class RpcQpidTestCase(test.TestCase): self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( {"result": "foo", "failure": False, "ending": False})) + self.mock_session.acknowledge(mox.IgnoreArg()) if multi: self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( self.mock_receiver) @@ -303,16 +304,19 @@ class RpcQpidTestCase(test.TestCase): qpid.messaging.Message( {"result": "bar", "failure": False, "ending": False})) + self.mock_session.acknowledge(mox.IgnoreArg()) self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn( qpid.messaging.Message( {"result": "baz", "failure": False, "ending": False})) + self.mock_session.acknowledge(mox.IgnoreArg()) self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( {"failure": False, "ending": True})) + self.mock_session.acknowledge(mox.IgnoreArg()) self.mock_session.close() self.mock_connection.session().AndReturn(self.mock_session) |