diff options
author | Jenkins <jenkins@review.openstack.org> | 2013-06-26 11:40:23 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-06-26 11:40:23 +0000 |
commit | 891287451754ac0394f1b8fa31a0505a304dccc2 (patch) | |
tree | 9fa50b44acbf26731cee1e2c520bc7c5f2451c6c /openstack | |
parent | 18dc396748114e5d8af0a9f84ba944e532815b65 (diff) | |
parent | 22ec8ff616a799085239e3e529daeeefea6366c4 (diff) | |
download | oslo-891287451754ac0394f1b8fa31a0505a304dccc2.tar.gz oslo-891287451754ac0394f1b8fa31a0505a304dccc2.tar.xz oslo-891287451754ac0394f1b8fa31a0505a304dccc2.zip |
Merge "Make AMQP based RPC consumer threads more robust"
Diffstat (limited to 'openstack')
-rw-r--r-- | openstack/common/excutils.py | 31 | ||||
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 2 | ||||
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 2 |
3 files changed, 35 insertions, 0 deletions
diff --git a/openstack/common/excutils.py b/openstack/common/excutils.py index 06d6e29..d40d46c 100644 --- a/openstack/common/excutils.py +++ b/openstack/common/excutils.py @@ -22,6 +22,7 @@ Exception related utilities. import contextlib import logging import sys +import time import traceback from openstack.common.gettextutils import _ @@ -49,3 +50,33 @@ def save_and_reraise_exception(): traceback.format_exception(type_, value, tb)) raise raise type_, value, tb + + +def forever_retry_uncaught_exceptions(infunc): + def inner_func(*args, **kwargs): + last_log_time = 0 + last_exc_message = None + exc_count = 0 + while True: + try: + return infunc(*args, **kwargs) + except Exception as exc: + if exc.message == last_exc_message: + exc_count += 1 + else: + exc_count = 1 + # Do not log any more frequently than once a minute unless + # the exception message changes + cur_time = int(time.time()) + if (cur_time - last_log_time > 60 or + exc.message != last_exc_message): + logging.exception( + _('Unexpected exception occurred %d time(s)... ' + 'retrying.') % exc_count) + last_log_time = cur_time + last_exc_message = exc.message + exc_count = 0 + # This should be a very rare event. In case it isn't, do + # a sleep. + time.sleep(1) + return inner_func diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 716b120..8fb3504 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -30,6 +30,7 @@ import kombu.entity import kombu.messaging from oslo.config import cfg +from openstack.common import excutils from openstack.common.gettextutils import _ from openstack.common import network_utils from openstack.common.rpc import amqp as rpc_amqp @@ -748,6 +749,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 46dcb6a..6fa9be4 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -24,6 +24,7 @@ import eventlet import greenlet from oslo.config import cfg +from openstack.common import excutils from openstack.common.gettextutils import _ from openstack.common import importutils from openstack.common import jsonutils @@ -576,6 +577,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() |