author    Angus Salkeld <asalkeld@redhat.com>    2012-08-15 08:20:45 +1000
committer Angus Salkeld <asalkeld@redhat.com>    2012-08-20 20:32:23 +1000
commit    86a3ed7a173c82422270786fc89cf27cbaccc67e
tree      5d024dec0944330b8a437ec83cea83e15b13a1c9
parent    053789a75b7fcde1bde82ee366ad43f3aa7e70b9
Add multiprocess service launcher
Part of blueprint service-infrastructure

Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
Change-Id: I848e8c91d2c331e5c78a176258318457a9041131
-rw-r--r--   openstack/common/service.py   190
-rw-r--r--   tests/unit/test_service.py    137
2 files changed, 319 insertions(+), 8 deletions(-)
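The change adds a ProcessLauncher that forks worker processes and a module-level launch() helper. As a usage sketch (illustrative only; the service subclass and worker count below are not part of this change), a caller gets a ProcessLauncher when a worker count is given and the single-process ServiceLauncher otherwise:

    from openstack.common import service

    class MyService(service.Service):
        # Hypothetical service: real services override start() and/or pass
        # a manager whose init_host() is called when the service starts.
        pass

    # Fork two worker processes, each running the service in its own
    # Launcher; with workers=None a ServiceLauncher would be used instead.
    launcher = service.launch(MyService('myhost', None), workers=2)
    launcher.wait()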
diff --git a/openstack/common/service.py b/openstack/common/service.py
index 5141ae4..b464aea 100644
--- a/openstack/common/service.py
+++ b/openstack/common/service.py
@@ -19,8 +19,12 @@
"""Generic Node base class for all workers that run on hosts."""
+import errno
+import os
+import random
import signal
import sys
+import time
import eventlet
import greenlet
@@ -119,6 +123,170 @@ class ServiceLauncher(Launcher):
        return status
+class ServiceWrapper(object):
+ def __init__(self, service, workers):
+ self.service = service
+ self.workers = workers
+ self.children = set()
+ self.forktimes = []
+
+
+class ProcessLauncher(object):
+ def __init__(self):
+ self.children = {}
+ self.sigcaught = None
+ self.running = True
+ rfd, self.writepipe = os.pipe()
+ self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, self._handle_signal)
+
+ def _handle_signal(self, signo, frame):
+ self.sigcaught = signo
+ self.running = False
+
+ # Allow the process to be killed again and die from natural causes
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+ def _pipe_watcher(self):
+ # This will block until the write end is closed when the parent
+ # dies unexpectedly
+ self.readpipe.read()
+
+ LOG.info(_('Parent process has died unexpectedly, exiting'))
+
+ sys.exit(1)
+
+ def _child_process(self, service):
+ # Setup child signal handlers differently
+ def _sigterm(*args):
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ raise SignalExit(signal.SIGTERM)
+
+ signal.signal(signal.SIGTERM, _sigterm)
+ # Block SIGINT and let the parent send us a SIGTERM
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+ # Reopen the eventlet hub to make sure we don't share an epoll
+ # fd with parent and/or siblings, which would be bad
+ eventlet.hubs.use_hub()
+
+ # Close write to ensure only parent has it open
+ os.close(self.writepipe)
+ # Create greenthread to watch for parent to close pipe
+ eventlet.spawn(self._pipe_watcher)
+
+ # Reseed random number generator
+ random.seed()
+
+ launcher = Launcher()
+ launcher.run_service(service)
+
+ def _start_child(self, wrap):
+ if len(wrap.forktimes) > wrap.workers:
+ # Limit ourselves to one process a second (over the period of
+ # number of workers * 1 second). This will allow workers to
+            # start up quickly but ensure we don't keep re-forking
+            # children that die immediately after starting.
+ if time.time() - wrap.forktimes[0] < wrap.workers:
+ LOG.info(_('Forking too fast, sleeping'))
+ time.sleep(1)
+
+ wrap.forktimes.pop(0)
+
+ wrap.forktimes.append(time.time())
+
+ pid = os.fork()
+ if pid == 0:
+ # NOTE(johannes): All exceptions are caught to ensure this
+ # doesn't fallback into the loop spawning children. It would
+ # be bad for a child to spawn more children.
+ status = 0
+ try:
+ self._child_process(wrap.service)
+ except SignalExit as exc:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[exc.signo]
+ LOG.info(_('Caught %s, exiting'), signame)
+ status = exc.code
+ except SystemExit as exc:
+ status = exc.code
+ except BaseException:
+ LOG.exception(_('Unhandled exception'))
+ status = 2
+ finally:
+ wrap.service.stop()
+
+ os._exit(status)
+
+ LOG.info(_('Started child %d'), pid)
+
+ wrap.children.add(pid)
+ self.children[pid] = wrap
+
+ return pid
+
+ def launch_service(self, service, workers=1):
+ wrap = ServiceWrapper(service, workers)
+
+ LOG.info(_('Starting %d workers'), wrap.workers)
+ while self.running and len(wrap.children) < wrap.workers:
+ self._start_child(wrap)
+
+ def _wait_child(self):
+ try:
+ pid, status = os.wait()
+ except OSError as exc:
+ if exc.errno not in (errno.EINTR, errno.ECHILD):
+ raise
+ return None
+
+ if os.WIFSIGNALED(status):
+ sig = os.WTERMSIG(status)
+ LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals())
+ else:
+ code = os.WEXITSTATUS(status)
+ LOG.info(_('Child %(pid)d exited with status %(code)d'), locals())
+
+ if pid not in self.children:
+ LOG.warning(_('pid %d not in child list'), pid)
+ return None
+
+ wrap = self.children.pop(pid)
+ wrap.children.remove(pid)
+ return wrap
+
+ def wait(self):
+ """Loop waiting on children to die and respawning as necessary"""
+ while self.running:
+ wrap = self._wait_child()
+ if not wrap:
+ continue
+
+ while self.running and len(wrap.children) < wrap.workers:
+ self._start_child(wrap)
+
+ if self.sigcaught:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[self.sigcaught]
+ LOG.info(_('Caught %s, stopping children'), signame)
+
+ for pid in self.children:
+ try:
+ os.kill(pid, signal.SIGTERM)
+ except OSError as exc:
+ if exc.errno != errno.ESRCH:
+ raise
+
+ # Wait for children to die
+ if self.children:
+ LOG.info(_('Waiting on %d children to exit'), len(self.children))
+ while self.children:
+ self._wait_child()
+
+
class Service(object):
"""Service object for binaries running on hosts.
@@ -127,18 +295,26 @@ class Service(object):
    def __init__(self, host, manager, *args, **kwargs):
        self.host = host
        self.manager = manager
+ self.running = False
    def start(self):
+ self.running = True
        if self.manager:
            self.manager.init_host()

    def stop(self):
- pass
+ self.running = False
    def wait(self):
- pass
-
-
-def launcher(service):
- l = Launcher()
- l.launch_service(service)
+ while self.running:
+ time.sleep(.1)
+
+
+def launch(service, workers=None):
+ if workers:
+ launcher = ProcessLauncher()
+ launcher.launch_service(service, workers=workers)
+ else:
+ launcher = ServiceLauncher()
+ launcher.launch_service(service)
+ return launcher
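The fork throttling in _start_child above can be read as a sliding-window check; a minimal standalone sketch (the helper name is illustrative, not part of this change):

    import time

    def throttle_fork(forktimes, workers):
        # Keep at most `workers` recent fork timestamps.  If the oldest one
        # is less than `workers` seconds old we are respawning faster than
        # roughly one child per second, so back off briefly before forking.
        if len(forktimes) > workers:
            if time.time() - forktimes[0] < workers:
                time.sleep(1)
            forktimes.pop(0)
        forktimes.append(time.time())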
diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py
index fa79fa8..7e2aa3c 100644
--- a/tests/unit/test_service.py
+++ b/tests/unit/test_service.py
@@ -19,14 +19,24 @@
"""
Unit Tests for remote procedure calls using queue
"""
+import os
+import signal
+import time
+import traceback
+
from eventlet import greenthread
from openstack.common import cfg
-from openstack.common import service
+from openstack.common.gettextutils import _
+from openstack.common import log as logging
from openstack.common import manager
+from openstack.common import service
from tests import utils
+LOG = logging.getLogger(__name__)
+
+
class ExtendedService(service.Service):
    def test_method(self):
        return 'service'
@@ -38,3 +48,128 @@ class ServiceManagerTestCase(utils.BaseTestCase):
        serv = ExtendedService('test', None)
        serv.start()
        self.assertEqual(serv.test_method(), 'service')
+
+
+class ServiceLauncherTest(utils.BaseTestCase):
+ """
+ Originally from nova/tests/integrated/test_multiprocess_api.py
+ """
+
+ def _spawn(self):
+ self.workers = 2
+ pid = os.fork()
+ if pid == 0:
+ # NOTE(johannes): We can't let the child processes exit back
+ # into the unit test framework since then we'll have multiple
+ # processes running the same tests (and possibly forking more
+ # processes that end up in the same situation). So we need
+ # to catch all exceptions and make sure nothing leaks out, in
+            # particular SystemExit, which is raised by sys.exit(). We use
+ # os._exit() which doesn't have this problem.
+ status = 0
+ try:
+ launcher = service.ProcessLauncher()
+ serv = ExtendedService('test', None)
+ launcher.launch_service(serv, workers=self.workers)
+ launcher.wait()
+ except SystemExit as exc:
+ status = exc.code
+ except BaseException:
+ # We need to be defensive here too
+ try:
+ traceback.print_exc()
+ except BaseException:
+ print "Couldn't print traceback"
+ status = 2
+
+ # Really exit
+ os._exit(status)
+
+ self.pid = pid
+
+ # Wait for up to a second for workers to get started
+ start = time.time()
+ while time.time() - start < 1:
+ workers = self._get_workers()
+ if len(workers) == self.workers:
+ break
+
+ time.sleep(.1)
+
+ self.assertEqual(len(workers), self.workers)
+ return workers
+
+ def tearDown(self):
+ if self.pid:
+ # Make sure all processes are stopped
+ os.kill(self.pid, signal.SIGTERM)
+
+ # Make sure we reap our test process
+ self._reap_test()
+
+ super(ServiceLauncherTest, self).tearDown()
+
+ def _reap_test(self):
+ pid, status = os.waitpid(self.pid, 0)
+ self.pid = None
+ return status
+
+ def _get_workers(self):
+ f = os.popen('ps ax -o pid,ppid,command')
+ # Skip ps header
+ f.readline()
+
+ processes = [tuple(int(p) for p in l.strip().split()[:2])
+ for l in f.readlines()]
+ return [p for p, pp in processes if pp == self.pid]
+
+ def test_killed_worker_recover(self):
+ start_workers = self._spawn()
+
+ # kill one worker and check if new worker can come up
+ LOG.info('pid of first child is %s' % start_workers[0])
+ os.kill(start_workers[0], signal.SIGTERM)
+
+ # loop and check if new worker is spawned (for 1 second max)
+ start = time.time()
+ while time.time() - start < 1:
+ end_workers = self._get_workers()
+ LOG.info('workers: %r' % end_workers)
+
+ if start_workers != end_workers:
+ break
+
+ time.sleep(.1)
+
+ # Make sure worker pids don't match
+ self.assertNotEqual(start_workers, end_workers)
+
+ def _terminate_with_signal(self, sig):
+ self._spawn()
+
+ os.kill(self.pid, sig)
+
+ # loop and check if all processes are killed (for 1 second max)
+ start = time.time()
+ while time.time() - start < 1:
+ workers = self._get_workers()
+ LOG.info('workers: %r' % workers)
+
+ if not workers:
+ break
+
+ time.sleep(.1)
+
+        self.assertFalse(workers, 'Processes left running: %r' % workers)
+
+ def test_terminate_sigkill(self):
+ self._terminate_with_signal(signal.SIGKILL)
+ status = self._reap_test()
+ self.assertTrue(os.WIFSIGNALED(status))
+ self.assertEqual(os.WTERMSIG(status), signal.SIGKILL)
+
+ def test_terminate_sigterm(self):
+ self._terminate_with_signal(signal.SIGTERM)
+ status = self._reap_test()
+ self.assertTrue(os.WIFEXITED(status))
+ self.assertEqual(os.WEXITSTATUS(status), 0)
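The dead-parent detection that ProcessLauncher builds on the pipe from its __init__ can also be sketched without eventlet (a minimal sketch, names illustrative): the parent keeps the write end open for its whole lifetime, each child closes its inherited copy and blocks on a read, and that read only returns EOF once the parent exits and the kernel closes the last writer.

    import os
    import sys
    import time

    rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: drop our copy of the write end so the parent is the only
        # writer left; read() then returns '' (EOF) when the parent exits.
        os.close(wfd)
        with os.fdopen(rfd) as readpipe:
            readpipe.read()
        sys.stderr.write('Parent process has died unexpectedly, exiting\n')
        os._exit(1)
    else:
        # Parent: close the unused read end and keep wfd open; it is never
        # written to, only closed implicitly when this process exits.
        os.close(rfd)
        time.sleep(2)  # simulate the parent doing some work, then exiting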