summaryrefslogtreecommitdiffstats
path: root/openstack/common/service.py
diff options
context:
space:
mode:
Diffstat (limited to 'openstack/common/service.py')
-rw-r--r--openstack/common/service.py70
1 files changed, 53 insertions, 17 deletions
diff --git a/openstack/common/service.py b/openstack/common/service.py
index 55e23ed..36cf300 100644
--- a/openstack/common/service.py
+++ b/openstack/common/service.py
@@ -27,6 +27,7 @@ import sys
import time
import eventlet
+from eventlet import event
import logging as std_logging
from oslo.config import cfg
@@ -51,20 +52,9 @@ class Launcher(object):
:returns: None
"""
- self._services = threadgroup.ThreadGroup()
+ self.services = Services()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
- @staticmethod
- def run_service(service):
- """Start and wait for a service to finish.
-
- :param service: service to run and wait for.
- :returns: None
-
- """
- service.start()
- service.wait()
-
def launch_service(self, service):
"""Load and start the given service.
@@ -73,7 +63,7 @@ class Launcher(object):
"""
service.backdoor_port = self.backdoor_port
- self._services.add_thread(self.run_service, service)
+ self.services.add(service)
def stop(self):
"""Stop all services which are currently running.
@@ -81,7 +71,7 @@ class Launcher(object):
:returns: None
"""
- self._services.stop()
+ self.services.stop()
def wait(self):
"""Waits until all services have been stopped, and then returns.
@@ -89,7 +79,7 @@ class Launcher(object):
:returns: None
"""
- self._services.wait()
+ self.services.wait()
class SignalExit(SystemExit):
@@ -124,9 +114,9 @@ class ServiceLauncher(Launcher):
except SystemExit as exc:
status = exc.code
finally:
+ self.stop()
if rpc:
rpc.cleanup()
- self.stop()
return status
@@ -189,7 +179,8 @@ class ProcessLauncher(object):
random.seed()
launcher = Launcher()
- launcher.run_service(service)
+ launcher.launch_service(service)
+ launcher.wait()
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
@@ -313,15 +304,60 @@ class Service(object):
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
+ # signal that the service is done shutting itself down:
+ self._done = event.Event()
+
def start(self):
pass
def stop(self):
self.tg.stop()
+ self.tg.wait()
+ self._done.send()
+
+ def wait(self):
+ self._done.wait()
+
+
+class Services(object):
+
+ def __init__(self):
+ self.services = []
+ self.tg = threadgroup.ThreadGroup()
+ self.done = event.Event()
+
+ def add(self, service):
+ self.services.append(service)
+ self.tg.add_thread(self.run_service, service, self.done)
+
+ def stop(self):
+ # wait for graceful shutdown of services:
+ for service in self.services:
+ service.stop()
+ service.wait()
+
+ # each service has performed cleanup, now signal that the run_service
+ # wrapper threads can now die:
+ self.done.send()
+
+ # reap threads:
+ self.tg.stop()
def wait(self):
self.tg.wait()
+ @staticmethod
+ def run_service(service, done):
+ """Service start wrapper.
+
+ :param service: service to run
+ :param done: event to wait on until a shutdown is triggered
+ :returns: None
+
+ """
+ service.start()
+ done.wait()
+
def launch(service, workers=None):
if workers: