From 22ec8ff616a799085239e3e529daeeefea6366c4 Mon Sep 17 00:00:00 2001 From: Raymond Pekowski Date: Tue, 25 Jun 2013 04:24:05 +0000 Subject: Make AMQP based RPC consumer threads more robust bug 1189711 Should RPC consume_in_thread() be more fault tolerant? There are unprotected holes in the thread kicked off by RPC consume_in_thread such that an exception will kill the thread. This exists for both the service (TopicConsumer) and the new reply proxy (DirectConsumer) consumers. This patch plugs those holes as close to the base of the consumer thread as possible by catching all non-caught exceptions and retrying with sleeps between retries and some pacing of the log output to prevent log flooding. Change-Id: I0d6ec8a5e3a310314da201656ee862bb40b41616 --- openstack/common/rpc/impl_kombu.py | 2 ++ openstack/common/rpc/impl_qpid.py | 2 ++ 2 files changed, 4 insertions(+) (limited to 'openstack/common/rpc') diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 716b120..8fb3504 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -30,6 +30,7 @@ import kombu.entity import kombu.messaging from oslo.config import cfg +from openstack.common import excutils from openstack.common.gettextutils import _ from openstack.common import network_utils from openstack.common.rpc import amqp as rpc_amqp @@ -748,6 +749,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 46dcb6a..6fa9be4 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -24,6 +24,7 @@ import eventlet import greenlet from oslo.config import cfg +from openstack.common import excutils from openstack.common.gettextutils import _ from openstack.common import importutils from openstack.common import jsonutils @@ -576,6 +577,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() -- cgit