From 22ec8ff616a799085239e3e529daeeefea6366c4 Mon Sep 17 00:00:00 2001 From: Raymond Pekowski Date: Tue, 25 Jun 2013 04:24:05 +0000 Subject: Make AMQP based RPC consumer threads more robust bug 1189711 Should RPC consume_in_thread() be more fault tolerant? There are unprotected holes in the thread kicked off by RPC consume_in_thread such that an exception will kill the thread. This exists for both the service (TopicConsumer) and the new reply proxy (DirectConsumer) consumers. This patch plugs those holes as close to the base of the consumer thread as possible by catching all non-caught exceptions and retrying with sleeps between retries and some pacing of the log output to prevent log flooding. Change-Id: I0d6ec8a5e3a310314da201656ee862bb40b41616 --- openstack/common/excutils.py | 31 +++++++++++ openstack/common/rpc/impl_kombu.py | 2 + openstack/common/rpc/impl_qpid.py | 2 + tests/unit/rpc/test_kombu.py | 49 ++++++++++++++++ tests/unit/rpc/test_qpid.py | 31 +++++++++-- tests/unit/test_excutils.py | 111 +++++++++++++++++++++++++++++++++++++ 6 files changed, 222 insertions(+), 4 deletions(-) diff --git a/openstack/common/excutils.py b/openstack/common/excutils.py index 06d6e29..d40d46c 100644 --- a/openstack/common/excutils.py +++ b/openstack/common/excutils.py @@ -22,6 +22,7 @@ Exception related utilities. import contextlib import logging import sys +import time import traceback from openstack.common.gettextutils import _ @@ -49,3 +50,33 @@ def save_and_reraise_exception(): traceback.format_exception(type_, value, tb)) raise raise type_, value, tb + + +def forever_retry_uncaught_exceptions(infunc): + def inner_func(*args, **kwargs): + last_log_time = 0 + last_exc_message = None + exc_count = 0 + while True: + try: + return infunc(*args, **kwargs) + except Exception as exc: + if exc.message == last_exc_message: + exc_count += 1 + else: + exc_count = 1 + # Do not log any more frequently than once a minute unless + # the exception message changes + cur_time = int(time.time()) + if (cur_time - last_log_time > 60 or + exc.message != last_exc_message): + logging.exception( + _('Unexpected exception occurred %d time(s)... ' + 'retrying.') % exc_count) + last_log_time = cur_time + last_exc_message = exc.message + exc_count = 0 + # This should be a very rare event. In case it isn't, do + # a sleep. + time.sleep(1) + return inner_func diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 716b120..8fb3504 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -30,6 +30,7 @@ import kombu.entity import kombu.messaging from oslo.config import cfg +from openstack.common import excutils from openstack.common.gettextutils import _ from openstack.common import network_utils from openstack.common.rpc import amqp as rpc_amqp @@ -748,6 +749,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 46dcb6a..6fa9be4 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -24,6 +24,7 @@ import eventlet import greenlet from oslo.config import cfg +from openstack.common import excutils from openstack.common.gettextutils import _ from openstack.common import importutils from openstack.common import jsonutils @@ -576,6 +577,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py index ba88078..2476008 100644 --- a/tests/unit/rpc/test_kombu.py +++ b/tests/unit/rpc/test_kombu.py @@ -29,6 +29,7 @@ import logging import mock from oslo.config import cfg import six +import time from openstack.common import exception from openstack.common.rpc import amqp as rpc_amqp @@ -650,6 +651,54 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): ack_on_error=False, ) + # used to make unexpected exception tests run faster + def my_time_sleep(self, sleep_time): + return + + def test_service_consume_thread_unexpected_exceptions(self): + + def my_TopicConsumer_consume(myself, *args, **kwargs): + self.consume_calls += 1 + # see if it can sustain three failures + if self.consume_calls < 3: + raise Exception('unexpected exception') + else: + self.orig_TopicConsumer_consume(myself, *args, **kwargs) + + self.consume_calls = 0 + self.orig_TopicConsumer_consume = impl_kombu.TopicConsumer.consume + self.stubs.Set(impl_kombu.TopicConsumer, 'consume', + my_TopicConsumer_consume) + self.stubs.Set(time, 'sleep', self.my_time_sleep) + + value = 42 + result = self.rpc.call(FLAGS, self.context, self.topic, + {"method": "echo", + "args": {"value": value}}) + self.assertEqual(value, result) + + def test_replyproxy_consume_thread_unexpected_exceptions(self): + + def my_DirectConsumer_consume(myself, *args, **kwargs): + self.consume_calls += 1 + # see if it can sustain three failures + if self.consume_calls < 3: + raise Exception('unexpected exception') + else: + self.orig_DirectConsumer_consume(myself, *args, **kwargs) + + self.consume_calls = 1 + self.orig_DirectConsumer_consume = impl_kombu.DirectConsumer.consume + self.stubs.Set(impl_kombu.DirectConsumer, 'consume', + my_DirectConsumer_consume) + self.stubs.Set(time, 'sleep', self.my_time_sleep) + + value = 42 + result = self.rpc.call(FLAGS, self.context, self.topic, + {"method": "echo", + "args": {"value": value}}) + self.assertEqual(value, result) + class RpcKombuHATestCase(utils.BaseTestCase): def setUp(self): diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py index 0bad387..5d51a4b 100644 --- a/tests/unit/rpc/test_qpid.py +++ b/tests/unit/rpc/test_qpid.py @@ -26,6 +26,7 @@ eventlet.monkey_patch() import fixtures import mox from oslo.config import cfg +import time import uuid from openstack.common import context @@ -218,7 +219,7 @@ class RpcQpidTestCase(utils.BaseTestCase): ) connection.close() - def test_topic_consumer(self): + def test_topic_consumer(self, consume_thread_exc=False): self.mock_connection = self.mox.CreateMock(self.orig_connection) self.mock_session = self.mox.CreateMock(self.orig_session) self.mock_receiver = self.mox.CreateMock(self.orig_receiver) @@ -235,6 +236,9 @@ class RpcQpidTestCase(utils.BaseTestCase): self.mock_session.receiver(expected_address).AndReturn( self.mock_receiver) self.mock_receiver.capacity = 1 + if consume_thread_exc: + self.mock_session.next_receiver(timeout=None).AndRaise( + Exception('unexpected exception')) self.mock_connection.close() self.mox.ReplayAll() @@ -244,8 +248,14 @@ class RpcQpidTestCase(utils.BaseTestCase): lambda *_x, **_y: None, queue_name='impl.qpid.test.workers', exchange_name='foobar') + if consume_thread_exc: + connection.consume_in_thread() + time.sleep(0) connection.close() + def test_consume_thread_exception(self): + self.test_topic_consumer(consume_thread_exc=True) + def _test_cast(self, fanout, server_params=None): self.mock_connection = self.mox.CreateMock(self.orig_connection) self.mock_session = self.mox.CreateMock(self.orig_session) @@ -338,7 +348,11 @@ class RpcQpidTestCase(utils.BaseTestCase): self._setup_to_server_tests(server_params) self._test_cast(fanout=True, server_params=server_params) + def my_time_sleep(self, arg): + pass + def _test_call_mock_common(self): + self.stubs.Set(time, 'sleep', self.my_time_sleep) self.mock_connection = self.mox.CreateMock(self.orig_connection) self.mock_session = self.mox.CreateMock(self.orig_session) self.mock_sender = self.mox.CreateMock(self.orig_sender) @@ -367,9 +381,12 @@ class RpcQpidTestCase(utils.BaseTestCase): self.mock_session.close() self.mock_connection.session().AndReturn(self.mock_session) - def _test_call(self, multi): + def _test_call(self, multi, reply_proxy_exc): self._test_call_mock_common() + if reply_proxy_exc: + self.mock_session.next_receiver(timeout=None).AndRaise( + Exception('unexpected exception')) self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( @@ -393,6 +410,9 @@ class RpcQpidTestCase(utils.BaseTestCase): "failure": False, "ending": False})) self.mock_session.acknowledge(mox.IgnoreArg()) + if reply_proxy_exc: + self.mock_session.next_receiver(timeout=None).AndRaise( + Exception('unexpected exception')) self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( @@ -425,7 +445,10 @@ class RpcQpidTestCase(utils.BaseTestCase): self.uuid4 = uuid.uuid4() def test_call(self): - self._test_call(multi=False) + self._test_call(multi=False, reply_proxy_exc=False) + + def test_replyproxy_consume_thread_unexpected_exceptions(self): + self._test_call(multi=False, reply_proxy_exc=True) def _test_call_with_timeout(self, timeout, expect_failure): self._test_call_mock_common() @@ -483,7 +506,7 @@ class RpcQpidTestCase(utils.BaseTestCase): self._test_call_with_timeout(timeout=0.1, expect_failure=True) def test_multicall(self): - self._test_call(multi=True) + self._test_call(multi=True, reply_proxy_exc=False) def _test_publisher(self, message=True): """Test that messages containing long strings are correctly serialized diff --git a/tests/unit/test_excutils.py b/tests/unit/test_excutils.py index 8c8137a..b8f9b96 100644 --- a/tests/unit/test_excutils.py +++ b/tests/unit/test_excutils.py @@ -14,6 +14,10 @@ # License for the specific language governing permissions and limitations # under the License. +import logging +import mox +import time + from openstack.common import excutils from tests import utils @@ -47,3 +51,110 @@ class SaveAndReraiseTest(utils.BaseTestCase): e = _e self.assertEqual(str(e), msg) + + +class ForeverRetryUncaughtExceptionsTest(utils.BaseTestCase): + + @excutils.forever_retry_uncaught_exceptions + def exception_generator(self): + exc = self.exception_to_raise() + while exc is not None: + raise exc + exc = self.exception_to_raise() + + def exception_to_raise(self): + return None + + def my_time_sleep(self, arg): + pass + + def exc_retrier_common_start(self): + self.stubs.Set(time, 'sleep', self.my_time_sleep) + self.mox.StubOutWithMock(logging, 'exception') + self.mox.StubOutWithMock(time, 'time') + self.mox.StubOutWithMock(self, 'exception_to_raise') + + def exc_retrier_sequence(self, exc_id=None, timestamp=None, + exc_count=None): + self.exception_to_raise().AndReturn( + Exception('unexpected %d' % exc_id)) + time.time().AndReturn(timestamp) + if exc_count != 0: + logging.exception(mox.In( + 'Unexpected exception occurred %d time(s)' % exc_count)) + + def exc_retrier_common_end(self): + self.exception_to_raise().AndReturn(None) + self.mox.ReplayAll() + self.exception_generator() + self.addCleanup(self.stubs.UnsetAll) + + def test_exc_retrier_1exc_gives_1log(self): + self.exc_retrier_common_start() + self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1) + self.exc_retrier_common_end() + + def test_exc_retrier_same_10exc_1min_gives_1log(self): + self.exc_retrier_common_start() + self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1) + # By design, the following exception don't get logged because they + # are within the same minute. + for i in range(2, 11): + self.exc_retrier_sequence(exc_id=1, timestamp=i, exc_count=0) + self.exc_retrier_common_end() + + def test_exc_retrier_same_2exc_2min_gives_2logs(self): + self.exc_retrier_common_start() + self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1) + self.exc_retrier_sequence(exc_id=1, timestamp=65, exc_count=1) + self.exc_retrier_common_end() + + def test_exc_retrier_same_10exc_2min_gives_2logs(self): + self.exc_retrier_common_start() + self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1) + self.exc_retrier_sequence(exc_id=1, timestamp=12, exc_count=0) + self.exc_retrier_sequence(exc_id=1, timestamp=23, exc_count=0) + self.exc_retrier_sequence(exc_id=1, timestamp=34, exc_count=0) + self.exc_retrier_sequence(exc_id=1, timestamp=45, exc_count=0) + # The previous 4 exceptions are counted here + self.exc_retrier_sequence(exc_id=1, timestamp=106, exc_count=5) + # Again, the following are not logged due to being within + # the same minute + self.exc_retrier_sequence(exc_id=1, timestamp=117, exc_count=0) + self.exc_retrier_sequence(exc_id=1, timestamp=128, exc_count=0) + self.exc_retrier_sequence(exc_id=1, timestamp=139, exc_count=0) + self.exc_retrier_sequence(exc_id=1, timestamp=150, exc_count=0) + self.exc_retrier_common_end() + + def test_exc_retrier_mixed_4exc_1min_gives_2logs(self): + self.exc_retrier_common_start() + self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1) + # By design, this second 'unexpected 1' exception is not counted. This + # is likely a rare thing and is a sacrifice for code simplicity. + self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0) + self.exc_retrier_sequence(exc_id=2, timestamp=20, exc_count=1) + # Again, trailing exceptions within a minute are not counted. + self.exc_retrier_sequence(exc_id=2, timestamp=30, exc_count=0) + self.exc_retrier_common_end() + + def test_exc_retrier_mixed_4exc_2min_gives_2logs(self): + self.exc_retrier_common_start() + self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1) + # Again, this second exception of the same type is not counted + # for the sake of code simplicity. + self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0) + # The difference between this and the previous case is the log + # is also triggered by more than a minute expiring. + self.exc_retrier_sequence(exc_id=2, timestamp=100, exc_count=1) + self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=0) + self.exc_retrier_common_end() + + def test_exc_retrier_mixed_4exc_2min_gives_3logs(self): + self.exc_retrier_common_start() + self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1) + # This time the second 'unexpected 1' exception is counted due + # to the same exception occurring same when the minute expires. + self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0) + self.exc_retrier_sequence(exc_id=1, timestamp=100, exc_count=2) + self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=1) + self.exc_retrier_common_end() -- cgit