summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-06-26 11:40:23 +0000
committerGerrit Code Review <review@openstack.org>2013-06-26 11:40:23 +0000
commit891287451754ac0394f1b8fa31a0505a304dccc2 (patch)
tree9fa50b44acbf26731cee1e2c520bc7c5f2451c6c /openstack
parent18dc396748114e5d8af0a9f84ba944e532815b65 (diff)
parent22ec8ff616a799085239e3e529daeeefea6366c4 (diff)
downloadoslo-891287451754ac0394f1b8fa31a0505a304dccc2.tar.gz
oslo-891287451754ac0394f1b8fa31a0505a304dccc2.tar.xz
oslo-891287451754ac0394f1b8fa31a0505a304dccc2.zip
Merge "Make AMQP based RPC consumer threads more robust"
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/excutils.py31
-rw-r--r--openstack/common/rpc/impl_kombu.py2
-rw-r--r--openstack/common/rpc/impl_qpid.py2
3 files changed, 35 insertions, 0 deletions
diff --git a/openstack/common/excutils.py b/openstack/common/excutils.py
index 06d6e29..d40d46c 100644
--- a/openstack/common/excutils.py
+++ b/openstack/common/excutils.py
@@ -22,6 +22,7 @@ Exception related utilities.
import contextlib
import logging
import sys
+import time
import traceback
from openstack.common.gettextutils import _
@@ -49,3 +50,33 @@ def save_and_reraise_exception():
traceback.format_exception(type_, value, tb))
raise
raise type_, value, tb
+
+
+def forever_retry_uncaught_exceptions(infunc):
+ def inner_func(*args, **kwargs):
+ last_log_time = 0
+ last_exc_message = None
+ exc_count = 0
+ while True:
+ try:
+ return infunc(*args, **kwargs)
+ except Exception as exc:
+ if exc.message == last_exc_message:
+ exc_count += 1
+ else:
+ exc_count = 1
+ # Do not log any more frequently than once a minute unless
+ # the exception message changes
+ cur_time = int(time.time())
+ if (cur_time - last_log_time > 60 or
+ exc.message != last_exc_message):
+ logging.exception(
+ _('Unexpected exception occurred %d time(s)... '
+ 'retrying.') % exc_count)
+ last_log_time = cur_time
+ last_exc_message = exc.message
+ exc_count = 0
+ # This should be a very rare event. In case it isn't, do
+ # a sleep.
+ time.sleep(1)
+ return inner_func
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 716b120..8fb3504 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -30,6 +30,7 @@ import kombu.entity
import kombu.messaging
from oslo.config import cfg
+from openstack.common import excutils
from openstack.common.gettextutils import _
from openstack.common import network_utils
from openstack.common.rpc import amqp as rpc_amqp
@@ -748,6 +749,7 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread."""
+ @excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 46dcb6a..6fa9be4 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -24,6 +24,7 @@ import eventlet
import greenlet
from oslo.config import cfg
+from openstack.common import excutils
from openstack.common.gettextutils import _
from openstack.common import importutils
from openstack.common import jsonutils
@@ -576,6 +577,7 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread."""
+ @excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()