From 5518ad3b80afd4990e0c5462c96256d9155b02b7 Mon Sep 17 00:00:00 2001 From: Brian Elliott Date: Thu, 16 May 2013 13:00:24 +0000 Subject: Add graceful service shutdown support to Launcher Provides support for letting each running service finish in-progress requests and cleanup gracefully before stopping. blueprint graceful-shutdown Change-Id: Idf137620b87417de89eba23f05daf08780dc9dff --- openstack/common/service.py | 70 ++++++++++++++++++++++++++++++++++----------- tests/unit/test_service.py | 27 +++++++++++++++++ 2 files changed, 80 insertions(+), 17 deletions(-) diff --git a/openstack/common/service.py b/openstack/common/service.py index 55e23ed..36cf300 100644 --- a/openstack/common/service.py +++ b/openstack/common/service.py @@ -27,6 +27,7 @@ import sys import time import eventlet +from eventlet import event import logging as std_logging from oslo.config import cfg @@ -51,20 +52,9 @@ class Launcher(object): :returns: None """ - self._services = threadgroup.ThreadGroup() + self.services = Services() self.backdoor_port = eventlet_backdoor.initialize_if_enabled() - @staticmethod - def run_service(service): - """Start and wait for a service to finish. - - :param service: service to run and wait for. - :returns: None - - """ - service.start() - service.wait() - def launch_service(self, service): """Load and start the given service. @@ -73,7 +63,7 @@ class Launcher(object): """ service.backdoor_port = self.backdoor_port - self._services.add_thread(self.run_service, service) + self.services.add(service) def stop(self): """Stop all services which are currently running. @@ -81,7 +71,7 @@ class Launcher(object): :returns: None """ - self._services.stop() + self.services.stop() def wait(self): """Waits until all services have been stopped, and then returns. @@ -89,7 +79,7 @@ class Launcher(object): :returns: None """ - self._services.wait() + self.services.wait() class SignalExit(SystemExit): @@ -124,9 +114,9 @@ class ServiceLauncher(Launcher): except SystemExit as exc: status = exc.code finally: + self.stop() if rpc: rpc.cleanup() - self.stop() return status @@ -189,7 +179,8 @@ class ProcessLauncher(object): random.seed() launcher = Launcher() - launcher.run_service(service) + launcher.launch_service(service) + launcher.wait() def _start_child(self, wrap): if len(wrap.forktimes) > wrap.workers: @@ -313,15 +304,60 @@ class Service(object): def __init__(self, threads=1000): self.tg = threadgroup.ThreadGroup(threads) + # signal that the service is done shutting itself down: + self._done = event.Event() + def start(self): pass def stop(self): self.tg.stop() + self.tg.wait() + self._done.send() + + def wait(self): + self._done.wait() + + +class Services(object): + + def __init__(self): + self.services = [] + self.tg = threadgroup.ThreadGroup() + self.done = event.Event() + + def add(self, service): + self.services.append(service) + self.tg.add_thread(self.run_service, service, self.done) + + def stop(self): + # wait for graceful shutdown of services: + for service in self.services: + service.stop() + service.wait() + + # each service has performed cleanup, now signal that the run_service + # wrapper threads can now die: + self.done.send() + + # reap threads: + self.tg.stop() def wait(self): self.tg.wait() + @staticmethod + def run_service(service, done): + """Service start wrapper. + + :param service: service to run + :param done: event to wait on until a shutdown is triggered + :returns: None + + """ + service.start() + done.wait() + def launch(service, workers=None): if workers: diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py index 0f93830..20007de 100644 --- a/tests/unit/test_service.py +++ b/tests/unit/test_service.py @@ -31,6 +31,7 @@ import socket import time import traceback +from eventlet import event from oslo.config import cfg from openstack.common import eventlet_backdoor @@ -195,6 +196,20 @@ class ServiceLauncherTest(utils.BaseTestCase): self.assertEqual(os.WEXITSTATUS(status), 0) +class _Service(service.Service): + def __init__(self): + super(_Service, self).__init__() + self.init = event.Event() + self.cleaned_up = False + + def start(self): + self.init.send() + + def stop(self): + self.cleaned_up = True + super(_Service, self).stop() + + class LauncherTest(utils.BaseTestCase): def test_backdoor_port(self): @@ -252,3 +267,15 @@ class LauncherTest(utils.BaseTestCase): svc = service.Service() self.assertRaises(eventlet_backdoor.EventletBackdoorConfigValueError, service.launch, svc) + + def test_graceful_shutdown(self): + # test that services are given a chance to clean up: + svc = _Service() + + launcher = service.launch(svc) + # wait on 'init' so we know the service had time to start: + svc.init.wait() + + launcher.stop() + self.assertTrue(svc.cleaned_up) + self.assertTrue(svc._done.ready()) -- cgit