summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRaymond Pekowski <pekowski@gmail.com>2013-06-25 04:24:05 +0000
committerRaymond Pekowski <pekowski@gmail.com>2013-06-25 21:49:50 +0000
commit22ec8ff616a799085239e3e529daeeefea6366c4 (patch)
treef7a0802d0ef438dcbb31ff62c0fbea43d0b269b2
parentbc10b7aeada4ea66e85e10b94b5c219e3c8d2e77 (diff)
downloadoslo-22ec8ff616a799085239e3e529daeeefea6366c4.tar.gz
oslo-22ec8ff616a799085239e3e529daeeefea6366c4.tar.xz
oslo-22ec8ff616a799085239e3e529daeeefea6366c4.zip
Make AMQP based RPC consumer threads more robust
bug 1189711 Should RPC consume_in_thread() be more fault tolerant? There are unprotected holes in the thread kicked off by RPC consume_in_thread such that an exception will kill the thread. This exists for both the service (TopicConsumer) and the new reply proxy (DirectConsumer) consumers. This patch plugs those holes as close to the base of the consumer thread as possible by catching all non-caught exceptions and retrying with sleeps between retries and some pacing of the log output to prevent log flooding. Change-Id: I0d6ec8a5e3a310314da201656ee862bb40b41616
-rw-r--r--openstack/common/excutils.py31
-rw-r--r--openstack/common/rpc/impl_kombu.py2
-rw-r--r--openstack/common/rpc/impl_qpid.py2
-rw-r--r--tests/unit/rpc/test_kombu.py49
-rw-r--r--tests/unit/rpc/test_qpid.py31
-rw-r--r--tests/unit/test_excutils.py111
6 files changed, 222 insertions, 4 deletions
diff --git a/openstack/common/excutils.py b/openstack/common/excutils.py
index 06d6e29..d40d46c 100644
--- a/openstack/common/excutils.py
+++ b/openstack/common/excutils.py
@@ -22,6 +22,7 @@ Exception related utilities.
import contextlib
import logging
import sys
+import time
import traceback
from openstack.common.gettextutils import _
@@ -49,3 +50,33 @@ def save_and_reraise_exception():
traceback.format_exception(type_, value, tb))
raise
raise type_, value, tb
+
+
+def forever_retry_uncaught_exceptions(infunc):
+ def inner_func(*args, **kwargs):
+ last_log_time = 0
+ last_exc_message = None
+ exc_count = 0
+ while True:
+ try:
+ return infunc(*args, **kwargs)
+ except Exception as exc:
+ if exc.message == last_exc_message:
+ exc_count += 1
+ else:
+ exc_count = 1
+ # Do not log any more frequently than once a minute unless
+ # the exception message changes
+ cur_time = int(time.time())
+ if (cur_time - last_log_time > 60 or
+ exc.message != last_exc_message):
+ logging.exception(
+ _('Unexpected exception occurred %d time(s)... '
+ 'retrying.') % exc_count)
+ last_log_time = cur_time
+ last_exc_message = exc.message
+ exc_count = 0
+ # This should be a very rare event. In case it isn't, do
+ # a sleep.
+ time.sleep(1)
+ return inner_func
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 716b120..8fb3504 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -30,6 +30,7 @@ import kombu.entity
import kombu.messaging
from oslo.config import cfg
+from openstack.common import excutils
from openstack.common.gettextutils import _
from openstack.common import network_utils
from openstack.common.rpc import amqp as rpc_amqp
@@ -748,6 +749,7 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread."""
+ @excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 46dcb6a..6fa9be4 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -24,6 +24,7 @@ import eventlet
import greenlet
from oslo.config import cfg
+from openstack.common import excutils
from openstack.common.gettextutils import _
from openstack.common import importutils
from openstack.common import jsonutils
@@ -576,6 +577,7 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread."""
+ @excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index ba88078..2476008 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -29,6 +29,7 @@ import logging
import mock
from oslo.config import cfg
import six
+import time
from openstack.common import exception
from openstack.common.rpc import amqp as rpc_amqp
@@ -650,6 +651,54 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
ack_on_error=False,
)
+ # used to make unexpected exception tests run faster
+ def my_time_sleep(self, sleep_time):
+ return
+
+ def test_service_consume_thread_unexpected_exceptions(self):
+
+ def my_TopicConsumer_consume(myself, *args, **kwargs):
+ self.consume_calls += 1
+ # see if it can sustain three failures
+ if self.consume_calls < 3:
+ raise Exception('unexpected exception')
+ else:
+ self.orig_TopicConsumer_consume(myself, *args, **kwargs)
+
+ self.consume_calls = 0
+ self.orig_TopicConsumer_consume = impl_kombu.TopicConsumer.consume
+ self.stubs.Set(impl_kombu.TopicConsumer, 'consume',
+ my_TopicConsumer_consume)
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
+
+ value = 42
+ result = self.rpc.call(FLAGS, self.context, self.topic,
+ {"method": "echo",
+ "args": {"value": value}})
+ self.assertEqual(value, result)
+
+ def test_replyproxy_consume_thread_unexpected_exceptions(self):
+
+ def my_DirectConsumer_consume(myself, *args, **kwargs):
+ self.consume_calls += 1
+ # see if it can sustain three failures
+ if self.consume_calls < 3:
+ raise Exception('unexpected exception')
+ else:
+ self.orig_DirectConsumer_consume(myself, *args, **kwargs)
+
+ self.consume_calls = 1
+ self.orig_DirectConsumer_consume = impl_kombu.DirectConsumer.consume
+ self.stubs.Set(impl_kombu.DirectConsumer, 'consume',
+ my_DirectConsumer_consume)
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
+
+ value = 42
+ result = self.rpc.call(FLAGS, self.context, self.topic,
+ {"method": "echo",
+ "args": {"value": value}})
+ self.assertEqual(value, result)
+
class RpcKombuHATestCase(utils.BaseTestCase):
def setUp(self):
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index 0bad387..5d51a4b 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -26,6 +26,7 @@ eventlet.monkey_patch()
import fixtures
import mox
from oslo.config import cfg
+import time
import uuid
from openstack.common import context
@@ -218,7 +219,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
)
connection.close()
- def test_topic_consumer(self):
+ def test_topic_consumer(self, consume_thread_exc=False):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
@@ -235,6 +236,9 @@ class RpcQpidTestCase(utils.BaseTestCase):
self.mock_session.receiver(expected_address).AndReturn(
self.mock_receiver)
self.mock_receiver.capacity = 1
+ if consume_thread_exc:
+ self.mock_session.next_receiver(timeout=None).AndRaise(
+ Exception('unexpected exception'))
self.mock_connection.close()
self.mox.ReplayAll()
@@ -244,8 +248,14 @@ class RpcQpidTestCase(utils.BaseTestCase):
lambda *_x, **_y: None,
queue_name='impl.qpid.test.workers',
exchange_name='foobar')
+ if consume_thread_exc:
+ connection.consume_in_thread()
+ time.sleep(0)
connection.close()
+ def test_consume_thread_exception(self):
+ self.test_topic_consumer(consume_thread_exc=True)
+
def _test_cast(self, fanout, server_params=None):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
@@ -338,7 +348,11 @@ class RpcQpidTestCase(utils.BaseTestCase):
self._setup_to_server_tests(server_params)
self._test_cast(fanout=True, server_params=server_params)
+ def my_time_sleep(self, arg):
+ pass
+
def _test_call_mock_common(self):
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_sender = self.mox.CreateMock(self.orig_sender)
@@ -367,9 +381,12 @@ class RpcQpidTestCase(utils.BaseTestCase):
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)
- def _test_call(self, multi):
+ def _test_call(self, multi, reply_proxy_exc):
self._test_call_mock_common()
+ if reply_proxy_exc:
+ self.mock_session.next_receiver(timeout=None).AndRaise(
+ Exception('unexpected exception'))
self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
@@ -393,6 +410,9 @@ class RpcQpidTestCase(utils.BaseTestCase):
"failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
+ if reply_proxy_exc:
+ self.mock_session.next_receiver(timeout=None).AndRaise(
+ Exception('unexpected exception'))
self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
@@ -425,7 +445,10 @@ class RpcQpidTestCase(utils.BaseTestCase):
self.uuid4 = uuid.uuid4()
def test_call(self):
- self._test_call(multi=False)
+ self._test_call(multi=False, reply_proxy_exc=False)
+
+ def test_replyproxy_consume_thread_unexpected_exceptions(self):
+ self._test_call(multi=False, reply_proxy_exc=True)
def _test_call_with_timeout(self, timeout, expect_failure):
self._test_call_mock_common()
@@ -483,7 +506,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
self._test_call_with_timeout(timeout=0.1, expect_failure=True)
def test_multicall(self):
- self._test_call(multi=True)
+ self._test_call(multi=True, reply_proxy_exc=False)
def _test_publisher(self, message=True):
"""Test that messages containing long strings are correctly serialized
diff --git a/tests/unit/test_excutils.py b/tests/unit/test_excutils.py
index 8c8137a..b8f9b96 100644
--- a/tests/unit/test_excutils.py
+++ b/tests/unit/test_excutils.py
@@ -14,6 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
+import logging
+import mox
+import time
+
from openstack.common import excutils
from tests import utils
@@ -47,3 +51,110 @@ class SaveAndReraiseTest(utils.BaseTestCase):
e = _e
self.assertEqual(str(e), msg)
+
+
+class ForeverRetryUncaughtExceptionsTest(utils.BaseTestCase):
+
+ @excutils.forever_retry_uncaught_exceptions
+ def exception_generator(self):
+ exc = self.exception_to_raise()
+ while exc is not None:
+ raise exc
+ exc = self.exception_to_raise()
+
+ def exception_to_raise(self):
+ return None
+
+ def my_time_sleep(self, arg):
+ pass
+
+ def exc_retrier_common_start(self):
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
+ self.mox.StubOutWithMock(logging, 'exception')
+ self.mox.StubOutWithMock(time, 'time')
+ self.mox.StubOutWithMock(self, 'exception_to_raise')
+
+ def exc_retrier_sequence(self, exc_id=None, timestamp=None,
+ exc_count=None):
+ self.exception_to_raise().AndReturn(
+ Exception('unexpected %d' % exc_id))
+ time.time().AndReturn(timestamp)
+ if exc_count != 0:
+ logging.exception(mox.In(
+ 'Unexpected exception occurred %d time(s)' % exc_count))
+
+ def exc_retrier_common_end(self):
+ self.exception_to_raise().AndReturn(None)
+ self.mox.ReplayAll()
+ self.exception_generator()
+ self.addCleanup(self.stubs.UnsetAll)
+
+ def test_exc_retrier_1exc_gives_1log(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_same_10exc_1min_gives_1log(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # By design, the following exception don't get logged because they
+ # are within the same minute.
+ for i in range(2, 11):
+ self.exc_retrier_sequence(exc_id=1, timestamp=i, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_same_2exc_2min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ self.exc_retrier_sequence(exc_id=1, timestamp=65, exc_count=1)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_same_10exc_2min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ self.exc_retrier_sequence(exc_id=1, timestamp=12, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=23, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=34, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=45, exc_count=0)
+ # The previous 4 exceptions are counted here
+ self.exc_retrier_sequence(exc_id=1, timestamp=106, exc_count=5)
+ # Again, the following are not logged due to being within
+ # the same minute
+ self.exc_retrier_sequence(exc_id=1, timestamp=117, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=128, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=139, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=150, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_mixed_4exc_1min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # By design, this second 'unexpected 1' exception is not counted. This
+ # is likely a rare thing and is a sacrifice for code simplicity.
+ self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
+ self.exc_retrier_sequence(exc_id=2, timestamp=20, exc_count=1)
+ # Again, trailing exceptions within a minute are not counted.
+ self.exc_retrier_sequence(exc_id=2, timestamp=30, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_mixed_4exc_2min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # Again, this second exception of the same type is not counted
+ # for the sake of code simplicity.
+ self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
+ # The difference between this and the previous case is the log
+ # is also triggered by more than a minute expiring.
+ self.exc_retrier_sequence(exc_id=2, timestamp=100, exc_count=1)
+ self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_mixed_4exc_2min_gives_3logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # This time the second 'unexpected 1' exception is counted due
+ # to the same exception occurring same when the minute expires.
+ self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=100, exc_count=2)
+ self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=1)
+ self.exc_retrier_common_end()