summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-07-04 14:31:12 +0000
committerGerrit Code Review <review@openstack.org>2013-07-04 14:31:12 +0000
commit6a4800d86ae94123875d510f4f9a7967f09d93cf (patch)
treeea63dcc26e90296b150d11cda5b3cf19be670a1d
parent98d8fda44d8580b4d726708a91614114c32bd521 (diff)
parent5518ad3b80afd4990e0c5462c96256d9155b02b7 (diff)
downloadoslo-6a4800d86ae94123875d510f4f9a7967f09d93cf.tar.gz
oslo-6a4800d86ae94123875d510f4f9a7967f09d93cf.tar.xz
oslo-6a4800d86ae94123875d510f4f9a7967f09d93cf.zip
Merge "Add graceful service shutdown support to Launcher"
-rw-r--r--openstack/common/service.py70
-rw-r--r--tests/unit/test_service.py27
2 files changed, 80 insertions, 17 deletions
diff --git a/openstack/common/service.py b/openstack/common/service.py
index 55e23ed..36cf300 100644
--- a/openstack/common/service.py
+++ b/openstack/common/service.py
@@ -27,6 +27,7 @@ import sys
import time
import eventlet
+from eventlet import event
import logging as std_logging
from oslo.config import cfg
@@ -51,20 +52,9 @@ class Launcher(object):
:returns: None
"""
- self._services = threadgroup.ThreadGroup()
+ self.services = Services()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
- @staticmethod
- def run_service(service):
- """Start and wait for a service to finish.
-
- :param service: service to run and wait for.
- :returns: None
-
- """
- service.start()
- service.wait()
-
def launch_service(self, service):
"""Load and start the given service.
@@ -73,7 +63,7 @@ class Launcher(object):
"""
service.backdoor_port = self.backdoor_port
- self._services.add_thread(self.run_service, service)
+ self.services.add(service)
def stop(self):
"""Stop all services which are currently running.
@@ -81,7 +71,7 @@ class Launcher(object):
:returns: None
"""
- self._services.stop()
+ self.services.stop()
def wait(self):
"""Waits until all services have been stopped, and then returns.
@@ -89,7 +79,7 @@ class Launcher(object):
:returns: None
"""
- self._services.wait()
+ self.services.wait()
class SignalExit(SystemExit):
@@ -124,9 +114,9 @@ class ServiceLauncher(Launcher):
except SystemExit as exc:
status = exc.code
finally:
+ self.stop()
if rpc:
rpc.cleanup()
- self.stop()
return status
@@ -189,7 +179,8 @@ class ProcessLauncher(object):
random.seed()
launcher = Launcher()
- launcher.run_service(service)
+ launcher.launch_service(service)
+ launcher.wait()
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
@@ -313,15 +304,60 @@ class Service(object):
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
+ # signal that the service is done shutting itself down:
+ self._done = event.Event()
+
def start(self):
pass
def stop(self):
self.tg.stop()
+ self.tg.wait()
+ self._done.send()
+
+ def wait(self):
+ self._done.wait()
+
+
+class Services(object):
+
+ def __init__(self):
+ self.services = []
+ self.tg = threadgroup.ThreadGroup()
+ self.done = event.Event()
+
+ def add(self, service):
+ self.services.append(service)
+ self.tg.add_thread(self.run_service, service, self.done)
+
+ def stop(self):
+ # wait for graceful shutdown of services:
+ for service in self.services:
+ service.stop()
+ service.wait()
+
+ # each service has performed cleanup, now signal that the run_service
+ # wrapper threads can now die:
+ self.done.send()
+
+ # reap threads:
+ self.tg.stop()
def wait(self):
self.tg.wait()
+ @staticmethod
+ def run_service(service, done):
+ """Service start wrapper.
+
+ :param service: service to run
+ :param done: event to wait on until a shutdown is triggered
+ :returns: None
+
+ """
+ service.start()
+ done.wait()
+
def launch(service, workers=None):
if workers:
diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py
index 0f93830..20007de 100644
--- a/tests/unit/test_service.py
+++ b/tests/unit/test_service.py
@@ -31,6 +31,7 @@ import socket
import time
import traceback
+from eventlet import event
from oslo.config import cfg
from openstack.common import eventlet_backdoor
@@ -195,6 +196,20 @@ class ServiceLauncherTest(utils.BaseTestCase):
self.assertEqual(os.WEXITSTATUS(status), 0)
+class _Service(service.Service):
+ def __init__(self):
+ super(_Service, self).__init__()
+ self.init = event.Event()
+ self.cleaned_up = False
+
+ def start(self):
+ self.init.send()
+
+ def stop(self):
+ self.cleaned_up = True
+ super(_Service, self).stop()
+
+
class LauncherTest(utils.BaseTestCase):
def test_backdoor_port(self):
@@ -252,3 +267,15 @@ class LauncherTest(utils.BaseTestCase):
svc = service.Service()
self.assertRaises(eventlet_backdoor.EventletBackdoorConfigValueError,
service.launch, svc)
+
+ def test_graceful_shutdown(self):
+ # test that services are given a chance to clean up:
+ svc = _Service()
+
+ launcher = service.launch(svc)
+ # wait on 'init' so we know the service had time to start:
+ svc.init.wait()
+
+ launcher.stop()
+ self.assertTrue(svc.cleaned_up)
+ self.assertTrue(svc._done.ready())