diff options
-rw-r--r-- | openstack/common/excutils.py | 38 | ||||
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 13 | ||||
-rw-r--r-- | openstack/common/service.py | 70 | ||||
-rw-r--r-- | tests/unit/rpc/test_kombu.py | 36 | ||||
-rw-r--r-- | tests/unit/test_excutils.py | 8 | ||||
-rw-r--r-- | tests/unit/test_service.py | 27 |
6 files changed, 149 insertions, 43 deletions
diff --git a/openstack/common/excutils.py b/openstack/common/excutils.py index d40d46c..336e147 100644 --- a/openstack/common/excutils.py +++ b/openstack/common/excutils.py @@ -19,7 +19,6 @@ Exception related utilities. """ -import contextlib import logging import sys import time @@ -28,8 +27,7 @@ import traceback from openstack.common.gettextutils import _ -@contextlib.contextmanager -def save_and_reraise_exception(): +class save_and_reraise_exception(object): """Save current exception, run some code and then re-raise. In some cases the exception context can be cleared, resulting in None @@ -41,15 +39,33 @@ def save_and_reraise_exception(): To work around this, we save the exception state, run handler code, and then re-raise the original exception. If another exception occurs, the saved exception is logged and the new exception is re-raised. - """ - type_, value, tb = sys.exc_info() - try: - yield + + In some cases the caller may not want to re-raise the exception, and + for those circumstances this context provides a reraise flag that + can be used to suppress the exception. For example: + except Exception: - logging.error(_('Original exception being dropped: %s'), - traceback.format_exception(type_, value, tb)) - raise - raise type_, value, tb + with save_and_reraise_exception() as ctxt: + decide_if_need_reraise() + if not should_be_reraised: + ctxt.reraise = False + """ + def __init__(self): + self.reraise = True + + def __enter__(self): + self.type_, self.value, self.tb, = sys.exc_info() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + logging.error(_('Original exception being dropped: %s'), + traceback.format_exception(self.type_, + self.value, + self.tb)) + return False + if self.reraise: + raise self.type_, self.value, self.tb def forever_retry_uncaught_exceptions(infunc): diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 8fb3504..36d2fc5 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -18,7 +18,6 @@ import functools import itertools import socket import ssl -import sys import time import uuid @@ -561,13 +560,11 @@ class Connection(object): log_info.update(params) if self.max_retries and attempt == self.max_retries: - LOG.error(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) - # NOTE(comstud): Copied from original code. There's - # really no better recourse because if this was a queue we - # need to consume on, we have no way to consume anymore. - sys.exit(1) + msg = _('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info + LOG.error(msg) + raise rpc_common.RPCException(msg) if attempt == 1: sleep_time = self.interval_start or 1 diff --git a/openstack/common/service.py b/openstack/common/service.py index 55e23ed..36cf300 100644 --- a/openstack/common/service.py +++ b/openstack/common/service.py @@ -27,6 +27,7 @@ import sys import time import eventlet +from eventlet import event import logging as std_logging from oslo.config import cfg @@ -51,20 +52,9 @@ class Launcher(object): :returns: None """ - self._services = threadgroup.ThreadGroup() + self.services = Services() self.backdoor_port = eventlet_backdoor.initialize_if_enabled() - @staticmethod - def run_service(service): - """Start and wait for a service to finish. - - :param service: service to run and wait for. - :returns: None - - """ - service.start() - service.wait() - def launch_service(self, service): """Load and start the given service. @@ -73,7 +63,7 @@ class Launcher(object): """ service.backdoor_port = self.backdoor_port - self._services.add_thread(self.run_service, service) + self.services.add(service) def stop(self): """Stop all services which are currently running. @@ -81,7 +71,7 @@ class Launcher(object): :returns: None """ - self._services.stop() + self.services.stop() def wait(self): """Waits until all services have been stopped, and then returns. @@ -89,7 +79,7 @@ class Launcher(object): :returns: None """ - self._services.wait() + self.services.wait() class SignalExit(SystemExit): @@ -124,9 +114,9 @@ class ServiceLauncher(Launcher): except SystemExit as exc: status = exc.code finally: + self.stop() if rpc: rpc.cleanup() - self.stop() return status @@ -189,7 +179,8 @@ class ProcessLauncher(object): random.seed() launcher = Launcher() - launcher.run_service(service) + launcher.launch_service(service) + launcher.wait() def _start_child(self, wrap): if len(wrap.forktimes) > wrap.workers: @@ -313,15 +304,60 @@ class Service(object): def __init__(self, threads=1000): self.tg = threadgroup.ThreadGroup(threads) + # signal that the service is done shutting itself down: + self._done = event.Event() + def start(self): pass def stop(self): self.tg.stop() + self.tg.wait() + self._done.send() + + def wait(self): + self._done.wait() + + +class Services(object): + + def __init__(self): + self.services = [] + self.tg = threadgroup.ThreadGroup() + self.done = event.Event() + + def add(self, service): + self.services.append(service) + self.tg.add_thread(self.run_service, service, self.done) + + def stop(self): + # wait for graceful shutdown of services: + for service in self.services: + service.stop() + service.wait() + + # each service has performed cleanup, now signal that the run_service + # wrapper threads can now die: + self.done.send() + + # reap threads: + self.tg.stop() def wait(self): self.tg.wait() + @staticmethod + def run_service(service, done): + """Service start wrapper. + + :param service: service to run + :param done: event to wait on until a shutdown is triggered + :returns: None + + """ + service.start() + done.wait() + def launch(service, workers=None): if workers: diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py index c8c3f32..cbe948d 100644 --- a/tests/unit/rpc/test_kombu.py +++ b/tests/unit/rpc/test_kombu.py @@ -41,6 +41,8 @@ from tests import utils try: import kombu + import kombu.connection + import kombu.entity from openstack.common.rpc import impl_kombu except ImportError: kombu = None @@ -712,6 +714,32 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): "args": {"value": value}}) self.assertEqual(value, result) + def test_reconnect_max_retries(self): + self.config(rabbit_hosts=[ + 'host1:1234', 'host2:5678', '[::1]:2345', + '[2001:0db8:85a3:0042:0000:8a2e:0370:7334]'], + rabbit_max_retries=2, + rabbit_retry_interval=0.1, + rabbit_retry_backoff=0.1) + + info = {'attempt': 0} + + class MyConnection(kombu.connection.BrokerConnection): + def __init__(self, *args, **params): + super(MyConnection, self).__init__(*args, **params) + info['attempt'] += 1 + + def connect(self): + if info['attempt'] < 3: + # the word timeout is important (see impl_kombu.py:486) + raise Exception('connection timeout') + super(kombu.connection.BrokerConnection, self).connect() + + self.stubs.Set(kombu.connection, 'BrokerConnection', MyConnection) + + self.assertRaises(rpc_common.RPCException, self.rpc.Connection, FLAGS) + self.assertEqual(info['attempt'], 2) + class RpcKombuHATestCase(utils.BaseTestCase): def setUp(self): @@ -758,15 +786,13 @@ class RpcKombuHATestCase(utils.BaseTestCase): ] } - import kombu.connection - class MyConnection(kombu.connection.BrokerConnection): def __init__(myself, *args, **params): super(MyConnection, myself).__init__(*args, **params) self.assertEqual(params, info['params_list'][info['attempt'] % len(info['params_list'])]) - info['attempt'] = info['attempt'] + 1 + info['attempt'] += 1 def connect(myself): if info['attempt'] < 5: @@ -783,8 +809,6 @@ class RpcKombuHATestCase(utils.BaseTestCase): def test_queue_not_declared_ha_if_ha_off(self): self.config(rabbit_ha_queues=False) - import kombu.entity - def my_declare(myself): self.assertEqual(None, (myself.queue_arguments or {}).get('x-ha-policy')) @@ -797,8 +821,6 @@ class RpcKombuHATestCase(utils.BaseTestCase): def test_queue_declared_ha_if_ha_on(self): self.config(rabbit_ha_queues=True) - import kombu.entity - def my_declare(myself): self.assertEqual('all', (myself.queue_arguments or {}).get('x-ha-policy')) diff --git a/tests/unit/test_excutils.py b/tests/unit/test_excutils.py index b8f9b96..1386eaa 100644 --- a/tests/unit/test_excutils.py +++ b/tests/unit/test_excutils.py @@ -52,6 +52,14 @@ class SaveAndReraiseTest(utils.BaseTestCase): self.assertEqual(str(e), msg) + def test_save_and_reraise_exception_no_reraise(self): + """Test that suppressing the reraise works.""" + try: + raise Exception('foo') + except Exception: + with excutils.save_and_reraise_exception() as ctxt: + ctxt.reraise = False + class ForeverRetryUncaughtExceptionsTest(utils.BaseTestCase): diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py index 0f93830..20007de 100644 --- a/tests/unit/test_service.py +++ b/tests/unit/test_service.py @@ -31,6 +31,7 @@ import socket import time import traceback +from eventlet import event from oslo.config import cfg from openstack.common import eventlet_backdoor @@ -195,6 +196,20 @@ class ServiceLauncherTest(utils.BaseTestCase): self.assertEqual(os.WEXITSTATUS(status), 0) +class _Service(service.Service): + def __init__(self): + super(_Service, self).__init__() + self.init = event.Event() + self.cleaned_up = False + + def start(self): + self.init.send() + + def stop(self): + self.cleaned_up = True + super(_Service, self).stop() + + class LauncherTest(utils.BaseTestCase): def test_backdoor_port(self): @@ -252,3 +267,15 @@ class LauncherTest(utils.BaseTestCase): svc = service.Service() self.assertRaises(eventlet_backdoor.EventletBackdoorConfigValueError, service.launch, svc) + + def test_graceful_shutdown(self): + # test that services are given a chance to clean up: + svc = _Service() + + launcher = service.launch(svc) + # wait on 'init' so we know the service had time to start: + svc.init.wait() + + launcher.stop() + self.assertTrue(svc.cleaned_up) + self.assertTrue(svc._done.ready()) |