diff options
author | Raymond Pekowski <pekowski@gmail.com> | 2013-06-25 04:24:05 +0000 |
---|---|---|
committer | Raymond Pekowski <pekowski@gmail.com> | 2013-06-25 21:49:50 +0000 |
commit | 22ec8ff616a799085239e3e529daeeefea6366c4 (patch) | |
tree | f7a0802d0ef438dcbb31ff62c0fbea43d0b269b2 /openstack | |
parent | bc10b7aeada4ea66e85e10b94b5c219e3c8d2e77 (diff) | |
download | oslo-22ec8ff616a799085239e3e529daeeefea6366c4.tar.gz oslo-22ec8ff616a799085239e3e529daeeefea6366c4.tar.xz oslo-22ec8ff616a799085239e3e529daeeefea6366c4.zip |
Make AMQP based RPC consumer threads more robust
bug 1189711 Should RPC consume_in_thread() be more fault tolerant?
There are unprotected holes in the thread kicked off by RPC
consume_in_thread such that an exception will kill the thread.
This exists for both the service (TopicConsumer) and the new
reply proxy (DirectConsumer) consumers. This patch plugs
those holes as close to the base of the consumer thread as
possible by catching all non-caught exceptions and retrying
with sleeps between retries and some pacing of the log
output to prevent log flooding.
Change-Id: I0d6ec8a5e3a310314da201656ee862bb40b41616
Diffstat (limited to 'openstack')
-rw-r--r-- | openstack/common/excutils.py | 31 | ||||
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 2 | ||||
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 2 |
3 files changed, 35 insertions, 0 deletions
diff --git a/openstack/common/excutils.py b/openstack/common/excutils.py index 06d6e29..d40d46c 100644 --- a/openstack/common/excutils.py +++ b/openstack/common/excutils.py @@ -22,6 +22,7 @@ Exception related utilities. import contextlib import logging import sys +import time import traceback from openstack.common.gettextutils import _ @@ -49,3 +50,33 @@ def save_and_reraise_exception(): traceback.format_exception(type_, value, tb)) raise raise type_, value, tb + + +def forever_retry_uncaught_exceptions(infunc): + def inner_func(*args, **kwargs): + last_log_time = 0 + last_exc_message = None + exc_count = 0 + while True: + try: + return infunc(*args, **kwargs) + except Exception as exc: + if exc.message == last_exc_message: + exc_count += 1 + else: + exc_count = 1 + # Do not log any more frequently than once a minute unless + # the exception message changes + cur_time = int(time.time()) + if (cur_time - last_log_time > 60 or + exc.message != last_exc_message): + logging.exception( + _('Unexpected exception occurred %d time(s)... ' + 'retrying.') % exc_count) + last_log_time = cur_time + last_exc_message = exc.message + exc_count = 0 + # This should be a very rare event. In case it isn't, do + # a sleep. + time.sleep(1) + return inner_func diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 716b120..8fb3504 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -30,6 +30,7 @@ import kombu.entity import kombu.messaging from oslo.config import cfg +from openstack.common import excutils from openstack.common.gettextutils import _ from openstack.common import network_utils from openstack.common.rpc import amqp as rpc_amqp @@ -748,6 +749,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 46dcb6a..6fa9be4 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -24,6 +24,7 @@ import eventlet import greenlet from oslo.config import cfg +from openstack.common import excutils from openstack.common.gettextutils import _ from openstack.common import importutils from openstack.common import jsonutils @@ -576,6 +577,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() |