summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--openstack/common/excutils.py38
-rw-r--r--openstack/common/rpc/impl_kombu.py13
-rw-r--r--openstack/common/service.py70
-rw-r--r--tests/unit/rpc/test_kombu.py36
-rw-r--r--tests/unit/test_excutils.py8
-rw-r--r--tests/unit/test_service.py27
6 files changed, 149 insertions, 43 deletions
diff --git a/openstack/common/excutils.py b/openstack/common/excutils.py
index d40d46c..336e147 100644
--- a/openstack/common/excutils.py
+++ b/openstack/common/excutils.py
@@ -19,7 +19,6 @@
Exception related utilities.
"""
-import contextlib
import logging
import sys
import time
@@ -28,8 +27,7 @@ import traceback
from openstack.common.gettextutils import _
-@contextlib.contextmanager
-def save_and_reraise_exception():
+class save_and_reraise_exception(object):
"""Save current exception, run some code and then re-raise.
In some cases the exception context can be cleared, resulting in None
@@ -41,15 +39,33 @@ def save_and_reraise_exception():
To work around this, we save the exception state, run handler code, and
then re-raise the original exception. If another exception occurs, the
saved exception is logged and the new exception is re-raised.
- """
- type_, value, tb = sys.exc_info()
- try:
- yield
+
+ In some cases the caller may not want to re-raise the exception, and
+ for those circumstances this context provides a reraise flag that
+ can be used to suppress the exception. For example:
+
except Exception:
- logging.error(_('Original exception being dropped: %s'),
- traceback.format_exception(type_, value, tb))
- raise
- raise type_, value, tb
+ with save_and_reraise_exception() as ctxt:
+ decide_if_need_reraise()
+ if not should_be_reraised:
+ ctxt.reraise = False
+ """
+ def __init__(self):
+ self.reraise = True
+
+ def __enter__(self):
+ self.type_, self.value, self.tb, = sys.exc_info()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if exc_type is not None:
+ logging.error(_('Original exception being dropped: %s'),
+ traceback.format_exception(self.type_,
+ self.value,
+ self.tb))
+ return False
+ if self.reraise:
+ raise self.type_, self.value, self.tb
def forever_retry_uncaught_exceptions(infunc):
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 8fb3504..36d2fc5 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -18,7 +18,6 @@ import functools
import itertools
import socket
import ssl
-import sys
import time
import uuid
@@ -561,13 +560,11 @@ class Connection(object):
log_info.update(params)
if self.max_retries and attempt == self.max_retries:
- LOG.error(_('Unable to connect to AMQP server on '
- '%(hostname)s:%(port)d after %(max_retries)d '
- 'tries: %(err_str)s') % log_info)
- # NOTE(comstud): Copied from original code. There's
- # really no better recourse because if this was a queue we
- # need to consume on, we have no way to consume anymore.
- sys.exit(1)
+ msg = _('Unable to connect to AMQP server on '
+ '%(hostname)s:%(port)d after %(max_retries)d '
+ 'tries: %(err_str)s') % log_info
+ LOG.error(msg)
+ raise rpc_common.RPCException(msg)
if attempt == 1:
sleep_time = self.interval_start or 1
diff --git a/openstack/common/service.py b/openstack/common/service.py
index 55e23ed..36cf300 100644
--- a/openstack/common/service.py
+++ b/openstack/common/service.py
@@ -27,6 +27,7 @@ import sys
import time
import eventlet
+from eventlet import event
import logging as std_logging
from oslo.config import cfg
@@ -51,20 +52,9 @@ class Launcher(object):
:returns: None
"""
- self._services = threadgroup.ThreadGroup()
+ self.services = Services()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
- @staticmethod
- def run_service(service):
- """Start and wait for a service to finish.
-
- :param service: service to run and wait for.
- :returns: None
-
- """
- service.start()
- service.wait()
-
def launch_service(self, service):
"""Load and start the given service.
@@ -73,7 +63,7 @@ class Launcher(object):
"""
service.backdoor_port = self.backdoor_port
- self._services.add_thread(self.run_service, service)
+ self.services.add(service)
def stop(self):
"""Stop all services which are currently running.
@@ -81,7 +71,7 @@ class Launcher(object):
:returns: None
"""
- self._services.stop()
+ self.services.stop()
def wait(self):
"""Waits until all services have been stopped, and then returns.
@@ -89,7 +79,7 @@ class Launcher(object):
:returns: None
"""
- self._services.wait()
+ self.services.wait()
class SignalExit(SystemExit):
@@ -124,9 +114,9 @@ class ServiceLauncher(Launcher):
except SystemExit as exc:
status = exc.code
finally:
+ self.stop()
if rpc:
rpc.cleanup()
- self.stop()
return status
@@ -189,7 +179,8 @@ class ProcessLauncher(object):
random.seed()
launcher = Launcher()
- launcher.run_service(service)
+ launcher.launch_service(service)
+ launcher.wait()
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
@@ -313,15 +304,60 @@ class Service(object):
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
+ # signal that the service is done shutting itself down:
+ self._done = event.Event()
+
def start(self):
pass
def stop(self):
self.tg.stop()
+ self.tg.wait()
+ self._done.send()
+
+ def wait(self):
+ self._done.wait()
+
+
+class Services(object):
+
+ def __init__(self):
+ self.services = []
+ self.tg = threadgroup.ThreadGroup()
+ self.done = event.Event()
+
+ def add(self, service):
+ self.services.append(service)
+ self.tg.add_thread(self.run_service, service, self.done)
+
+ def stop(self):
+ # wait for graceful shutdown of services:
+ for service in self.services:
+ service.stop()
+ service.wait()
+
+ # each service has performed cleanup, now signal that the run_service
+ # wrapper threads can now die:
+ self.done.send()
+
+ # reap threads:
+ self.tg.stop()
def wait(self):
self.tg.wait()
+ @staticmethod
+ def run_service(service, done):
+ """Service start wrapper.
+
+ :param service: service to run
+ :param done: event to wait on until a shutdown is triggered
+ :returns: None
+
+ """
+ service.start()
+ done.wait()
+
def launch(service, workers=None):
if workers:
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index c8c3f32..cbe948d 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -41,6 +41,8 @@ from tests import utils
try:
import kombu
+ import kombu.connection
+ import kombu.entity
from openstack.common.rpc import impl_kombu
except ImportError:
kombu = None
@@ -712,6 +714,32 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
"args": {"value": value}})
self.assertEqual(value, result)
+ def test_reconnect_max_retries(self):
+ self.config(rabbit_hosts=[
+ 'host1:1234', 'host2:5678', '[::1]:2345',
+ '[2001:0db8:85a3:0042:0000:8a2e:0370:7334]'],
+ rabbit_max_retries=2,
+ rabbit_retry_interval=0.1,
+ rabbit_retry_backoff=0.1)
+
+ info = {'attempt': 0}
+
+ class MyConnection(kombu.connection.BrokerConnection):
+ def __init__(self, *args, **params):
+ super(MyConnection, self).__init__(*args, **params)
+ info['attempt'] += 1
+
+ def connect(self):
+ if info['attempt'] < 3:
+ # the word timeout is important (see impl_kombu.py:486)
+ raise Exception('connection timeout')
+ super(kombu.connection.BrokerConnection, self).connect()
+
+ self.stubs.Set(kombu.connection, 'BrokerConnection', MyConnection)
+
+ self.assertRaises(rpc_common.RPCException, self.rpc.Connection, FLAGS)
+ self.assertEqual(info['attempt'], 2)
+
class RpcKombuHATestCase(utils.BaseTestCase):
def setUp(self):
@@ -758,15 +786,13 @@ class RpcKombuHATestCase(utils.BaseTestCase):
]
}
- import kombu.connection
-
class MyConnection(kombu.connection.BrokerConnection):
def __init__(myself, *args, **params):
super(MyConnection, myself).__init__(*args, **params)
self.assertEqual(params,
info['params_list'][info['attempt'] %
len(info['params_list'])])
- info['attempt'] = info['attempt'] + 1
+ info['attempt'] += 1
def connect(myself):
if info['attempt'] < 5:
@@ -783,8 +809,6 @@ class RpcKombuHATestCase(utils.BaseTestCase):
def test_queue_not_declared_ha_if_ha_off(self):
self.config(rabbit_ha_queues=False)
- import kombu.entity
-
def my_declare(myself):
self.assertEqual(None,
(myself.queue_arguments or {}).get('x-ha-policy'))
@@ -797,8 +821,6 @@ class RpcKombuHATestCase(utils.BaseTestCase):
def test_queue_declared_ha_if_ha_on(self):
self.config(rabbit_ha_queues=True)
- import kombu.entity
-
def my_declare(myself):
self.assertEqual('all',
(myself.queue_arguments or {}).get('x-ha-policy'))
diff --git a/tests/unit/test_excutils.py b/tests/unit/test_excutils.py
index b8f9b96..1386eaa 100644
--- a/tests/unit/test_excutils.py
+++ b/tests/unit/test_excutils.py
@@ -52,6 +52,14 @@ class SaveAndReraiseTest(utils.BaseTestCase):
self.assertEqual(str(e), msg)
+ def test_save_and_reraise_exception_no_reraise(self):
+ """Test that suppressing the reraise works."""
+ try:
+ raise Exception('foo')
+ except Exception:
+ with excutils.save_and_reraise_exception() as ctxt:
+ ctxt.reraise = False
+
class ForeverRetryUncaughtExceptionsTest(utils.BaseTestCase):
diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py
index 0f93830..20007de 100644
--- a/tests/unit/test_service.py
+++ b/tests/unit/test_service.py
@@ -31,6 +31,7 @@ import socket
import time
import traceback
+from eventlet import event
from oslo.config import cfg
from openstack.common import eventlet_backdoor
@@ -195,6 +196,20 @@ class ServiceLauncherTest(utils.BaseTestCase):
self.assertEqual(os.WEXITSTATUS(status), 0)
+class _Service(service.Service):
+ def __init__(self):
+ super(_Service, self).__init__()
+ self.init = event.Event()
+ self.cleaned_up = False
+
+ def start(self):
+ self.init.send()
+
+ def stop(self):
+ self.cleaned_up = True
+ super(_Service, self).stop()
+
+
class LauncherTest(utils.BaseTestCase):
def test_backdoor_port(self):
@@ -252,3 +267,15 @@ class LauncherTest(utils.BaseTestCase):
svc = service.Service()
self.assertRaises(eventlet_backdoor.EventletBackdoorConfigValueError,
service.launch, svc)
+
+ def test_graceful_shutdown(self):
+ # test that services are given a chance to clean up:
+ svc = _Service()
+
+ launcher = service.launch(svc)
+ # wait on 'init' so we know the service had time to start:
+ svc.init.wait()
+
+ launcher.stop()
+ self.assertTrue(svc.cleaned_up)
+ self.assertTrue(svc._done.ready())