summaryrefslogtreecommitdiffstats
path: root/tests/unit
diff options
context:
space:
mode:
authorRaymond Pekowski <pekowski@gmail.com>2013-06-25 04:24:05 +0000
committerRaymond Pekowski <pekowski@gmail.com>2013-06-25 21:49:50 +0000
commit22ec8ff616a799085239e3e529daeeefea6366c4 (patch)
treef7a0802d0ef438dcbb31ff62c0fbea43d0b269b2 /tests/unit
parentbc10b7aeada4ea66e85e10b94b5c219e3c8d2e77 (diff)
downloadoslo-22ec8ff616a799085239e3e529daeeefea6366c4.tar.gz
oslo-22ec8ff616a799085239e3e529daeeefea6366c4.tar.xz
oslo-22ec8ff616a799085239e3e529daeeefea6366c4.zip
Make AMQP based RPC consumer threads more robust
bug 1189711 Should RPC consume_in_thread() be more fault tolerant? There are unprotected holes in the thread kicked off by RPC consume_in_thread such that an exception will kill the thread. This exists for both the service (TopicConsumer) and the new reply proxy (DirectConsumer) consumers. This patch plugs those holes as close to the base of the consumer thread as possible by catching all non-caught exceptions and retrying with sleeps between retries and some pacing of the log output to prevent log flooding. Change-Id: I0d6ec8a5e3a310314da201656ee862bb40b41616
Diffstat (limited to 'tests/unit')
-rw-r--r--tests/unit/rpc/test_kombu.py49
-rw-r--r--tests/unit/rpc/test_qpid.py31
-rw-r--r--tests/unit/test_excutils.py111
3 files changed, 187 insertions, 4 deletions
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index ba88078..2476008 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -29,6 +29,7 @@ import logging
import mock
from oslo.config import cfg
import six
+import time
from openstack.common import exception
from openstack.common.rpc import amqp as rpc_amqp
@@ -650,6 +651,54 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
ack_on_error=False,
)
+ # used to make unexpected exception tests run faster
+ def my_time_sleep(self, sleep_time):
+ return
+
+ def test_service_consume_thread_unexpected_exceptions(self):
+
+ def my_TopicConsumer_consume(myself, *args, **kwargs):
+ self.consume_calls += 1
+ # see if it can sustain three failures
+ if self.consume_calls < 3:
+ raise Exception('unexpected exception')
+ else:
+ self.orig_TopicConsumer_consume(myself, *args, **kwargs)
+
+ self.consume_calls = 0
+ self.orig_TopicConsumer_consume = impl_kombu.TopicConsumer.consume
+ self.stubs.Set(impl_kombu.TopicConsumer, 'consume',
+ my_TopicConsumer_consume)
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
+
+ value = 42
+ result = self.rpc.call(FLAGS, self.context, self.topic,
+ {"method": "echo",
+ "args": {"value": value}})
+ self.assertEqual(value, result)
+
+ def test_replyproxy_consume_thread_unexpected_exceptions(self):
+
+ def my_DirectConsumer_consume(myself, *args, **kwargs):
+ self.consume_calls += 1
+ # see if it can sustain three failures
+ if self.consume_calls < 3:
+ raise Exception('unexpected exception')
+ else:
+ self.orig_DirectConsumer_consume(myself, *args, **kwargs)
+
+ self.consume_calls = 1
+ self.orig_DirectConsumer_consume = impl_kombu.DirectConsumer.consume
+ self.stubs.Set(impl_kombu.DirectConsumer, 'consume',
+ my_DirectConsumer_consume)
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
+
+ value = 42
+ result = self.rpc.call(FLAGS, self.context, self.topic,
+ {"method": "echo",
+ "args": {"value": value}})
+ self.assertEqual(value, result)
+
class RpcKombuHATestCase(utils.BaseTestCase):
def setUp(self):
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index 0bad387..5d51a4b 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -26,6 +26,7 @@ eventlet.monkey_patch()
import fixtures
import mox
from oslo.config import cfg
+import time
import uuid
from openstack.common import context
@@ -218,7 +219,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
)
connection.close()
- def test_topic_consumer(self):
+ def test_topic_consumer(self, consume_thread_exc=False):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
@@ -235,6 +236,9 @@ class RpcQpidTestCase(utils.BaseTestCase):
self.mock_session.receiver(expected_address).AndReturn(
self.mock_receiver)
self.mock_receiver.capacity = 1
+ if consume_thread_exc:
+ self.mock_session.next_receiver(timeout=None).AndRaise(
+ Exception('unexpected exception'))
self.mock_connection.close()
self.mox.ReplayAll()
@@ -244,8 +248,14 @@ class RpcQpidTestCase(utils.BaseTestCase):
lambda *_x, **_y: None,
queue_name='impl.qpid.test.workers',
exchange_name='foobar')
+ if consume_thread_exc:
+ connection.consume_in_thread()
+ time.sleep(0)
connection.close()
+ def test_consume_thread_exception(self):
+ self.test_topic_consumer(consume_thread_exc=True)
+
def _test_cast(self, fanout, server_params=None):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
@@ -338,7 +348,11 @@ class RpcQpidTestCase(utils.BaseTestCase):
self._setup_to_server_tests(server_params)
self._test_cast(fanout=True, server_params=server_params)
+ def my_time_sleep(self, arg):
+ pass
+
def _test_call_mock_common(self):
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_sender = self.mox.CreateMock(self.orig_sender)
@@ -367,9 +381,12 @@ class RpcQpidTestCase(utils.BaseTestCase):
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)
- def _test_call(self, multi):
+ def _test_call(self, multi, reply_proxy_exc):
self._test_call_mock_common()
+ if reply_proxy_exc:
+ self.mock_session.next_receiver(timeout=None).AndRaise(
+ Exception('unexpected exception'))
self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
@@ -393,6 +410,9 @@ class RpcQpidTestCase(utils.BaseTestCase):
"failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
+ if reply_proxy_exc:
+ self.mock_session.next_receiver(timeout=None).AndRaise(
+ Exception('unexpected exception'))
self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
@@ -425,7 +445,10 @@ class RpcQpidTestCase(utils.BaseTestCase):
self.uuid4 = uuid.uuid4()
def test_call(self):
- self._test_call(multi=False)
+ self._test_call(multi=False, reply_proxy_exc=False)
+
+ def test_replyproxy_consume_thread_unexpected_exceptions(self):
+ self._test_call(multi=False, reply_proxy_exc=True)
def _test_call_with_timeout(self, timeout, expect_failure):
self._test_call_mock_common()
@@ -483,7 +506,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
self._test_call_with_timeout(timeout=0.1, expect_failure=True)
def test_multicall(self):
- self._test_call(multi=True)
+ self._test_call(multi=True, reply_proxy_exc=False)
def _test_publisher(self, message=True):
"""Test that messages containing long strings are correctly serialized
diff --git a/tests/unit/test_excutils.py b/tests/unit/test_excutils.py
index 8c8137a..b8f9b96 100644
--- a/tests/unit/test_excutils.py
+++ b/tests/unit/test_excutils.py
@@ -14,6 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
+import logging
+import mox
+import time
+
from openstack.common import excutils
from tests import utils
@@ -47,3 +51,110 @@ class SaveAndReraiseTest(utils.BaseTestCase):
e = _e
self.assertEqual(str(e), msg)
+
+
+class ForeverRetryUncaughtExceptionsTest(utils.BaseTestCase):
+
+ @excutils.forever_retry_uncaught_exceptions
+ def exception_generator(self):
+ exc = self.exception_to_raise()
+ while exc is not None:
+ raise exc
+ exc = self.exception_to_raise()
+
+ def exception_to_raise(self):
+ return None
+
+ def my_time_sleep(self, arg):
+ pass
+
+ def exc_retrier_common_start(self):
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
+ self.mox.StubOutWithMock(logging, 'exception')
+ self.mox.StubOutWithMock(time, 'time')
+ self.mox.StubOutWithMock(self, 'exception_to_raise')
+
+ def exc_retrier_sequence(self, exc_id=None, timestamp=None,
+ exc_count=None):
+ self.exception_to_raise().AndReturn(
+ Exception('unexpected %d' % exc_id))
+ time.time().AndReturn(timestamp)
+ if exc_count != 0:
+ logging.exception(mox.In(
+ 'Unexpected exception occurred %d time(s)' % exc_count))
+
+ def exc_retrier_common_end(self):
+ self.exception_to_raise().AndReturn(None)
+ self.mox.ReplayAll()
+ self.exception_generator()
+ self.addCleanup(self.stubs.UnsetAll)
+
+ def test_exc_retrier_1exc_gives_1log(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_same_10exc_1min_gives_1log(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # By design, the following exception don't get logged because they
+ # are within the same minute.
+ for i in range(2, 11):
+ self.exc_retrier_sequence(exc_id=1, timestamp=i, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_same_2exc_2min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ self.exc_retrier_sequence(exc_id=1, timestamp=65, exc_count=1)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_same_10exc_2min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ self.exc_retrier_sequence(exc_id=1, timestamp=12, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=23, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=34, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=45, exc_count=0)
+ # The previous 4 exceptions are counted here
+ self.exc_retrier_sequence(exc_id=1, timestamp=106, exc_count=5)
+ # Again, the following are not logged due to being within
+ # the same minute
+ self.exc_retrier_sequence(exc_id=1, timestamp=117, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=128, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=139, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=150, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_mixed_4exc_1min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # By design, this second 'unexpected 1' exception is not counted. This
+ # is likely a rare thing and is a sacrifice for code simplicity.
+ self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
+ self.exc_retrier_sequence(exc_id=2, timestamp=20, exc_count=1)
+ # Again, trailing exceptions within a minute are not counted.
+ self.exc_retrier_sequence(exc_id=2, timestamp=30, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_mixed_4exc_2min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # Again, this second exception of the same type is not counted
+ # for the sake of code simplicity.
+ self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
+ # The difference between this and the previous case is the log
+ # is also triggered by more than a minute expiring.
+ self.exc_retrier_sequence(exc_id=2, timestamp=100, exc_count=1)
+ self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_mixed_4exc_2min_gives_3logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # This time the second 'unexpected 1' exception is counted due
+ # to the same exception occurring same when the minute expires.
+ self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=100, exc_count=2)
+ self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=1)
+ self.exc_retrier_common_end()