summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-06-26 11:40:23 +0000
committerGerrit Code Review <review@openstack.org>2013-06-26 11:40:23 +0000
commit891287451754ac0394f1b8fa31a0505a304dccc2 (patch)
tree9fa50b44acbf26731cee1e2c520bc7c5f2451c6c
parent18dc396748114e5d8af0a9f84ba944e532815b65 (diff)
parent22ec8ff616a799085239e3e529daeeefea6366c4 (diff)
downloadoslo-891287451754ac0394f1b8fa31a0505a304dccc2.zip
oslo-891287451754ac0394f1b8fa31a0505a304dccc2.tar.gz
oslo-891287451754ac0394f1b8fa31a0505a304dccc2.tar.xz
Merge "Make AMQP based RPC consumer threads more robust"
-rw-r--r--openstack/common/excutils.py31
-rw-r--r--openstack/common/rpc/impl_kombu.py2
-rw-r--r--openstack/common/rpc/impl_qpid.py2
-rw-r--r--tests/unit/rpc/test_kombu.py49
-rw-r--r--tests/unit/rpc/test_qpid.py31
-rw-r--r--tests/unit/test_excutils.py111
6 files changed, 222 insertions, 4 deletions
diff --git a/openstack/common/excutils.py b/openstack/common/excutils.py
index 06d6e29..d40d46c 100644
--- a/openstack/common/excutils.py
+++ b/openstack/common/excutils.py
@@ -22,6 +22,7 @@ Exception related utilities.
import contextlib
import logging
import sys
+import time
import traceback
from openstack.common.gettextutils import _
@@ -49,3 +50,33 @@ def save_and_reraise_exception():
traceback.format_exception(type_, value, tb))
raise
raise type_, value, tb
+
+
+def forever_retry_uncaught_exceptions(infunc):
+ def inner_func(*args, **kwargs):
+ last_log_time = 0
+ last_exc_message = None
+ exc_count = 0
+ while True:
+ try:
+ return infunc(*args, **kwargs)
+ except Exception as exc:
+ if exc.message == last_exc_message:
+ exc_count += 1
+ else:
+ exc_count = 1
+ # Do not log any more frequently than once a minute unless
+ # the exception message changes
+ cur_time = int(time.time())
+ if (cur_time - last_log_time > 60 or
+ exc.message != last_exc_message):
+ logging.exception(
+ _('Unexpected exception occurred %d time(s)... '
+ 'retrying.') % exc_count)
+ last_log_time = cur_time
+ last_exc_message = exc.message
+ exc_count = 0
+ # This should be a very rare event. In case it isn't, do
+ # a sleep.
+ time.sleep(1)
+ return inner_func
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 716b120..8fb3504 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -30,6 +30,7 @@ import kombu.entity
import kombu.messaging
from oslo.config import cfg
+from openstack.common import excutils
from openstack.common.gettextutils import _
from openstack.common import network_utils
from openstack.common.rpc import amqp as rpc_amqp
@@ -748,6 +749,7 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread."""
+ @excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 46dcb6a..6fa9be4 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -24,6 +24,7 @@ import eventlet
import greenlet
from oslo.config import cfg
+from openstack.common import excutils
from openstack.common.gettextutils import _
from openstack.common import importutils
from openstack.common import jsonutils
@@ -576,6 +577,7 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread."""
+ @excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index 3debe52..54f8389 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -29,6 +29,7 @@ import logging
import mock
from oslo.config import cfg
import six
+import time
from openstack.common import exception
from openstack.common.rpc import amqp as rpc_amqp
@@ -651,6 +652,54 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
ack_on_error=False,
)
+ # used to make unexpected exception tests run faster
+ def my_time_sleep(self, sleep_time):
+ return
+
+ def test_service_consume_thread_unexpected_exceptions(self):
+
+ def my_TopicConsumer_consume(myself, *args, **kwargs):
+ self.consume_calls += 1
+ # see if it can sustain three failures
+ if self.consume_calls < 3:
+ raise Exception('unexpected exception')
+ else:
+ self.orig_TopicConsumer_consume(myself, *args, **kwargs)
+
+ self.consume_calls = 0
+ self.orig_TopicConsumer_consume = impl_kombu.TopicConsumer.consume
+ self.stubs.Set(impl_kombu.TopicConsumer, 'consume',
+ my_TopicConsumer_consume)
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
+
+ value = 42
+ result = self.rpc.call(FLAGS, self.context, self.topic,
+ {"method": "echo",
+ "args": {"value": value}})
+ self.assertEqual(value, result)
+
+ def test_replyproxy_consume_thread_unexpected_exceptions(self):
+
+ def my_DirectConsumer_consume(myself, *args, **kwargs):
+ self.consume_calls += 1
+ # see if it can sustain three failures
+ if self.consume_calls < 3:
+ raise Exception('unexpected exception')
+ else:
+ self.orig_DirectConsumer_consume(myself, *args, **kwargs)
+
+ self.consume_calls = 1
+ self.orig_DirectConsumer_consume = impl_kombu.DirectConsumer.consume
+ self.stubs.Set(impl_kombu.DirectConsumer, 'consume',
+ my_DirectConsumer_consume)
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
+
+ value = 42
+ result = self.rpc.call(FLAGS, self.context, self.topic,
+ {"method": "echo",
+ "args": {"value": value}})
+ self.assertEqual(value, result)
+
class RpcKombuHATestCase(utils.BaseTestCase):
def setUp(self):
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index 0bad387..5d51a4b 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -26,6 +26,7 @@ eventlet.monkey_patch()
import fixtures
import mox
from oslo.config import cfg
+import time
import uuid
from openstack.common import context
@@ -218,7 +219,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
)
connection.close()
- def test_topic_consumer(self):
+ def test_topic_consumer(self, consume_thread_exc=False):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
@@ -235,6 +236,9 @@ class RpcQpidTestCase(utils.BaseTestCase):
self.mock_session.receiver(expected_address).AndReturn(
self.mock_receiver)
self.mock_receiver.capacity = 1
+ if consume_thread_exc:
+ self.mock_session.next_receiver(timeout=None).AndRaise(
+ Exception('unexpected exception'))
self.mock_connection.close()
self.mox.ReplayAll()
@@ -244,8 +248,14 @@ class RpcQpidTestCase(utils.BaseTestCase):
lambda *_x, **_y: None,
queue_name='impl.qpid.test.workers',
exchange_name='foobar')
+ if consume_thread_exc:
+ connection.consume_in_thread()
+ time.sleep(0)
connection.close()
+ def test_consume_thread_exception(self):
+ self.test_topic_consumer(consume_thread_exc=True)
+
def _test_cast(self, fanout, server_params=None):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
@@ -338,7 +348,11 @@ class RpcQpidTestCase(utils.BaseTestCase):
self._setup_to_server_tests(server_params)
self._test_cast(fanout=True, server_params=server_params)
+ def my_time_sleep(self, arg):
+ pass
+
def _test_call_mock_common(self):
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_sender = self.mox.CreateMock(self.orig_sender)
@@ -367,9 +381,12 @@ class RpcQpidTestCase(utils.BaseTestCase):
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)
- def _test_call(self, multi):
+ def _test_call(self, multi, reply_proxy_exc):
self._test_call_mock_common()
+ if reply_proxy_exc:
+ self.mock_session.next_receiver(timeout=None).AndRaise(
+ Exception('unexpected exception'))
self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
@@ -393,6 +410,9 @@ class RpcQpidTestCase(utils.BaseTestCase):
"failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
+ if reply_proxy_exc:
+ self.mock_session.next_receiver(timeout=None).AndRaise(
+ Exception('unexpected exception'))
self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
@@ -425,7 +445,10 @@ class RpcQpidTestCase(utils.BaseTestCase):
self.uuid4 = uuid.uuid4()
def test_call(self):
- self._test_call(multi=False)
+ self._test_call(multi=False, reply_proxy_exc=False)
+
+ def test_replyproxy_consume_thread_unexpected_exceptions(self):
+ self._test_call(multi=False, reply_proxy_exc=True)
def _test_call_with_timeout(self, timeout, expect_failure):
self._test_call_mock_common()
@@ -483,7 +506,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
self._test_call_with_timeout(timeout=0.1, expect_failure=True)
def test_multicall(self):
- self._test_call(multi=True)
+ self._test_call(multi=True, reply_proxy_exc=False)
def _test_publisher(self, message=True):
"""Test that messages containing long strings are correctly serialized
diff --git a/tests/unit/test_excutils.py b/tests/unit/test_excutils.py
index 8c8137a..b8f9b96 100644
--- a/tests/unit/test_excutils.py
+++ b/tests/unit/test_excutils.py
@@ -14,6 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
+import logging
+import mox
+import time
+
from openstack.common import excutils
from tests import utils
@@ -47,3 +51,110 @@ class SaveAndReraiseTest(utils.BaseTestCase):
e = _e
self.assertEqual(str(e), msg)
+
+
+class ForeverRetryUncaughtExceptionsTest(utils.BaseTestCase):
+
+ @excutils.forever_retry_uncaught_exceptions
+ def exception_generator(self):
+ exc = self.exception_to_raise()
+ while exc is not None:
+ raise exc
+ exc = self.exception_to_raise()
+
+ def exception_to_raise(self):
+ return None
+
+ def my_time_sleep(self, arg):
+ pass
+
+ def exc_retrier_common_start(self):
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
+ self.mox.StubOutWithMock(logging, 'exception')
+ self.mox.StubOutWithMock(time, 'time')
+ self.mox.StubOutWithMock(self, 'exception_to_raise')
+
+ def exc_retrier_sequence(self, exc_id=None, timestamp=None,
+ exc_count=None):
+ self.exception_to_raise().AndReturn(
+ Exception('unexpected %d' % exc_id))
+ time.time().AndReturn(timestamp)
+ if exc_count != 0:
+ logging.exception(mox.In(
+ 'Unexpected exception occurred %d time(s)' % exc_count))
+
+ def exc_retrier_common_end(self):
+ self.exception_to_raise().AndReturn(None)
+ self.mox.ReplayAll()
+ self.exception_generator()
+ self.addCleanup(self.stubs.UnsetAll)
+
+ def test_exc_retrier_1exc_gives_1log(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_same_10exc_1min_gives_1log(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # By design, the following exception don't get logged because they
+ # are within the same minute.
+ for i in range(2, 11):
+ self.exc_retrier_sequence(exc_id=1, timestamp=i, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_same_2exc_2min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ self.exc_retrier_sequence(exc_id=1, timestamp=65, exc_count=1)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_same_10exc_2min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ self.exc_retrier_sequence(exc_id=1, timestamp=12, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=23, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=34, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=45, exc_count=0)
+ # The previous 4 exceptions are counted here
+ self.exc_retrier_sequence(exc_id=1, timestamp=106, exc_count=5)
+ # Again, the following are not logged due to being within
+ # the same minute
+ self.exc_retrier_sequence(exc_id=1, timestamp=117, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=128, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=139, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=150, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_mixed_4exc_1min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # By design, this second 'unexpected 1' exception is not counted. This
+ # is likely a rare thing and is a sacrifice for code simplicity.
+ self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
+ self.exc_retrier_sequence(exc_id=2, timestamp=20, exc_count=1)
+ # Again, trailing exceptions within a minute are not counted.
+ self.exc_retrier_sequence(exc_id=2, timestamp=30, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_mixed_4exc_2min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # Again, this second exception of the same type is not counted
+ # for the sake of code simplicity.
+ self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
+ # The difference between this and the previous case is the log
+ # is also triggered by more than a minute expiring.
+ self.exc_retrier_sequence(exc_id=2, timestamp=100, exc_count=1)
+ self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_mixed_4exc_2min_gives_3logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # This time the second 'unexpected 1' exception is counted due
+ # to the same exception occurring same when the minute expires.
+ self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=100, exc_count=2)
+ self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=1)
+ self.exc_retrier_common_end()