diff options
author | Raymond Pekowski <pekowski@gmail.com> | 2013-06-25 04:24:05 +0000 |
---|---|---|
committer | Raymond Pekowski <pekowski@gmail.com> | 2013-06-25 21:49:50 +0000 |
commit | 22ec8ff616a799085239e3e529daeeefea6366c4 (patch) | |
tree | f7a0802d0ef438dcbb31ff62c0fbea43d0b269b2 /tests/unit/rpc/test_qpid.py | |
parent | bc10b7aeada4ea66e85e10b94b5c219e3c8d2e77 (diff) | |
download | oslo-22ec8ff616a799085239e3e529daeeefea6366c4.tar.gz oslo-22ec8ff616a799085239e3e529daeeefea6366c4.tar.xz oslo-22ec8ff616a799085239e3e529daeeefea6366c4.zip |
Make AMQP based RPC consumer threads more robust
bug 1189711 Should RPC consume_in_thread() be more fault tolerant?
There are unprotected holes in the thread kicked off by RPC
consume_in_thread such that an exception will kill the thread.
This exists for both the service (TopicConsumer) and the new
reply proxy (DirectConsumer) consumers. This patch plugs
those holes as close to the base of the consumer thread as
possible by catching all non-caught exceptions and retrying
with sleeps between retries and some pacing of the log
output to prevent log flooding.
Change-Id: I0d6ec8a5e3a310314da201656ee862bb40b41616
Diffstat (limited to 'tests/unit/rpc/test_qpid.py')
-rw-r--r-- | tests/unit/rpc/test_qpid.py | 31 |
1 files changed, 27 insertions, 4 deletions
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py index 0bad387..5d51a4b 100644 --- a/tests/unit/rpc/test_qpid.py +++ b/tests/unit/rpc/test_qpid.py @@ -26,6 +26,7 @@ eventlet.monkey_patch() import fixtures import mox from oslo.config import cfg +import time import uuid from openstack.common import context @@ -218,7 +219,7 @@ class RpcQpidTestCase(utils.BaseTestCase): ) connection.close() - def test_topic_consumer(self): + def test_topic_consumer(self, consume_thread_exc=False): self.mock_connection = self.mox.CreateMock(self.orig_connection) self.mock_session = self.mox.CreateMock(self.orig_session) self.mock_receiver = self.mox.CreateMock(self.orig_receiver) @@ -235,6 +236,9 @@ class RpcQpidTestCase(utils.BaseTestCase): self.mock_session.receiver(expected_address).AndReturn( self.mock_receiver) self.mock_receiver.capacity = 1 + if consume_thread_exc: + self.mock_session.next_receiver(timeout=None).AndRaise( + Exception('unexpected exception')) self.mock_connection.close() self.mox.ReplayAll() @@ -244,8 +248,14 @@ class RpcQpidTestCase(utils.BaseTestCase): lambda *_x, **_y: None, queue_name='impl.qpid.test.workers', exchange_name='foobar') + if consume_thread_exc: + connection.consume_in_thread() + time.sleep(0) connection.close() + def test_consume_thread_exception(self): + self.test_topic_consumer(consume_thread_exc=True) + def _test_cast(self, fanout, server_params=None): self.mock_connection = self.mox.CreateMock(self.orig_connection) self.mock_session = self.mox.CreateMock(self.orig_session) @@ -338,7 +348,11 @@ class RpcQpidTestCase(utils.BaseTestCase): self._setup_to_server_tests(server_params) self._test_cast(fanout=True, server_params=server_params) + def my_time_sleep(self, arg): + pass + def _test_call_mock_common(self): + self.stubs.Set(time, 'sleep', self.my_time_sleep) self.mock_connection = self.mox.CreateMock(self.orig_connection) self.mock_session = self.mox.CreateMock(self.orig_session) self.mock_sender = self.mox.CreateMock(self.orig_sender) @@ -367,9 +381,12 @@ class RpcQpidTestCase(utils.BaseTestCase): self.mock_session.close() self.mock_connection.session().AndReturn(self.mock_session) - def _test_call(self, multi): + def _test_call(self, multi, reply_proxy_exc): self._test_call_mock_common() + if reply_proxy_exc: + self.mock_session.next_receiver(timeout=None).AndRaise( + Exception('unexpected exception')) self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( @@ -393,6 +410,9 @@ class RpcQpidTestCase(utils.BaseTestCase): "failure": False, "ending": False})) self.mock_session.acknowledge(mox.IgnoreArg()) + if reply_proxy_exc: + self.mock_session.next_receiver(timeout=None).AndRaise( + Exception('unexpected exception')) self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( @@ -425,7 +445,10 @@ class RpcQpidTestCase(utils.BaseTestCase): self.uuid4 = uuid.uuid4() def test_call(self): - self._test_call(multi=False) + self._test_call(multi=False, reply_proxy_exc=False) + + def test_replyproxy_consume_thread_unexpected_exceptions(self): + self._test_call(multi=False, reply_proxy_exc=True) def _test_call_with_timeout(self, timeout, expect_failure): self._test_call_mock_common() @@ -483,7 +506,7 @@ class RpcQpidTestCase(utils.BaseTestCase): self._test_call_with_timeout(timeout=0.1, expect_failure=True) def test_multicall(self): - self._test_call(multi=True) + self._test_call(multi=True, reply_proxy_exc=False) def _test_publisher(self, message=True): """Test that messages containing long strings are correctly serialized |