From 86a3ed7a173c82422270786fc89cf27cbaccc67e Mon Sep 17 00:00:00 2001 From: Angus Salkeld Date: Wed, 15 Aug 2012 08:20:45 +1000 Subject: Add multiprocess service launcher Part of blueprint service-infrastructure Signed-off-by: Angus Salkeld Change-Id: I848e8c91d2c331e5c78a176258318457a9041131 --- openstack/common/service.py | 190 ++++++++++++++++++++++++++++++++++++++++++-- tests/unit/test_service.py | 137 +++++++++++++++++++++++++++++++- 2 files changed, 319 insertions(+), 8 deletions(-) diff --git a/openstack/common/service.py b/openstack/common/service.py index 5141ae4..b464aea 100644 --- a/openstack/common/service.py +++ b/openstack/common/service.py @@ -19,8 +19,12 @@ """Generic Node base class for all workers that run on hosts.""" +import errno +import os +import random import signal import sys +import time import eventlet import greenlet @@ -119,6 +123,170 @@ class ServiceLauncher(Launcher): return status +class ServiceWrapper(object): + def __init__(self, service, workers): + self.service = service + self.workers = workers + self.children = set() + self.forktimes = [] + + +class ProcessLauncher(object): + def __init__(self): + self.children = {} + self.sigcaught = None + self.running = True + rfd, self.writepipe = os.pipe() + self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') + + signal.signal(signal.SIGTERM, self._handle_signal) + signal.signal(signal.SIGINT, self._handle_signal) + + def _handle_signal(self, signo, frame): + self.sigcaught = signo + self.running = False + + # Allow the process to be killed again and die from natural causes + signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGINT, signal.SIG_DFL) + + def _pipe_watcher(self): + # This will block until the write end is closed when the parent + # dies unexpectedly + self.readpipe.read() + + LOG.info(_('Parent process has died unexpectedly, exiting')) + + sys.exit(1) + + def _child_process(self, service): + # Setup child signal handlers differently + def _sigterm(*args): 
+ signal.signal(signal.SIGTERM, signal.SIG_DFL) + raise SignalExit(signal.SIGTERM) + + signal.signal(signal.SIGTERM, _sigterm) + # Block SIGINT and let the parent send us a SIGTERM + signal.signal(signal.SIGINT, signal.SIG_IGN) + + # Reopen the eventlet hub to make sure we don't share an epoll + # fd with parent and/or siblings, which would be bad + eventlet.hubs.use_hub() + + # Close write to ensure only parent has it open + os.close(self.writepipe) + # Create greenthread to watch for parent to close pipe + eventlet.spawn(self._pipe_watcher) + + # Reseed random number generator + random.seed() + + launcher = Launcher() + launcher.run_service(service) + + def _start_child(self, wrap): + if len(wrap.forktimes) > wrap.workers: + # Limit ourselves to one process a second (over the period of + # number of workers * 1 second). This will allow workers to + # start up quickly but ensure we don't too quickly fork off + # children that die instantly. + if time.time() - wrap.forktimes[0] < wrap.workers: + LOG.info(_('Forking too fast, sleeping')) + time.sleep(1) + + wrap.forktimes.pop(0) + + wrap.forktimes.append(time.time()) + + pid = os.fork() + if pid == 0: + # NOTE(johannes): All exceptions are caught to ensure this + # doesn't fall back into the loop spawning children. It would + # be bad for a child to spawn more children. 
+ status = 0 + try: + self._child_process(wrap.service) + except SignalExit as exc: + signame = {signal.SIGTERM: 'SIGTERM', + signal.SIGINT: 'SIGINT'}[exc.signo] + LOG.info(_('Caught %s, exiting'), signame) + status = exc.code + except SystemExit as exc: + status = exc.code + except BaseException: + LOG.exception(_('Unhandled exception')) + status = 2 + finally: + wrap.service.stop() + + os._exit(status) + + LOG.info(_('Started child %d'), pid) + + wrap.children.add(pid) + self.children[pid] = wrap + + return pid + + def launch_service(self, service, workers=1): + wrap = ServiceWrapper(service, workers) + + LOG.info(_('Starting %d workers'), wrap.workers) + while self.running and len(wrap.children) < wrap.workers: + self._start_child(wrap) + + def _wait_child(self): + try: + pid, status = os.wait() + except OSError as exc: + if exc.errno not in (errno.EINTR, errno.ECHILD): + raise + return None + + if os.WIFSIGNALED(status): + sig = os.WTERMSIG(status) + LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals()) + else: + code = os.WEXITSTATUS(status) + LOG.info(_('Child %(pid)d exited with status %(code)d'), locals()) + + if pid not in self.children: + LOG.warning(_('pid %d not in child list'), pid) + return None + + wrap = self.children.pop(pid) + wrap.children.remove(pid) + return wrap + + def wait(self): + """Loop waiting on children to die and respawning as necessary""" + while self.running: + wrap = self._wait_child() + if not wrap: + continue + + while self.running and len(wrap.children) < wrap.workers: + self._start_child(wrap) + + if self.sigcaught: + signame = {signal.SIGTERM: 'SIGTERM', + signal.SIGINT: 'SIGINT'}[self.sigcaught] + LOG.info(_('Caught %s, stopping children'), signame) + + for pid in self.children: + try: + os.kill(pid, signal.SIGTERM) + except OSError as exc: + if exc.errno != errno.ESRCH: + raise + + # Wait for children to die + if self.children: + LOG.info(_('Waiting on %d children to exit'), len(self.children)) + while 
self.children: + self._wait_child() + + class Service(object): """Service object for binaries running on hosts. @@ -127,18 +295,26 @@ class Service(object): def __init__(self, host, manager, *args, **kwargs): self.host = host self.manager = manager + self.running = False def start(self): + self.running = True if self.manager: self.manager.init_host() def stop(self): - pass + self.running = False def wait(self): - pass - - -def launcher(service): - l = Launcher() - l.launch_service(service) + while self.running: + time.sleep(.1) + + +def launch(service, workers=None): + if workers: + launcher = ProcessLauncher() + launcher.launch_service(service, workers=workers) + else: + launcher = ServiceLauncher() + launcher.launch_service(service) + return launcher diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py index fa79fa8..7e2aa3c 100644 --- a/tests/unit/test_service.py +++ b/tests/unit/test_service.py @@ -19,14 +19,24 @@ """ Unit Tests for remote procedure calls using queue """ +import os +import signal +import time +import traceback + from eventlet import greenthread from openstack.common import cfg -from openstack.common import service +from openstack.common.gettextutils import _ +from openstack.common import log as logging from openstack.common import manager +from openstack.common import service from tests import utils +LOG = logging.getLogger(__name__) + + class ExtendedService(service.Service): def test_method(self): return 'service' @@ -38,3 +48,128 @@ class ServiceManagerTestCase(utils.BaseTestCase): serv = ExtendedService('test', None) serv.start() self.assertEqual(serv.test_method(), 'service') + + +class ServiceLauncherTest(utils.BaseTestCase): + """ + Originally from nova/tests/integrated/test_multiprocess_api.py + """ + + def _spawn(self): + self.workers = 2 + pid = os.fork() + if pid == 0: + # NOTE(johannes): We can't let the child processes exit back + # into the unit test framework since then we'll have multiple + # processes running 
the same tests (and possibly forking more + # processes that end up in the same situation). So we need + # to catch all exceptions and make sure nothing leaks out, in + # particular SystemExit, which is raised by sys.exit(). We use + # os._exit() which doesn't have this problem. + status = 0 + try: + launcher = service.ProcessLauncher() + serv = ExtendedService('test', None) + launcher.launch_service(serv, workers=self.workers) + launcher.wait() + except SystemExit as exc: + status = exc.code + except BaseException: + # We need to be defensive here too + try: + traceback.print_exc() + except BaseException: + print "Couldn't print traceback" + status = 2 + + # Really exit + os._exit(status) + + self.pid = pid + + # Wait for up to a second for workers to get started + start = time.time() + while time.time() - start < 1: + workers = self._get_workers() + if len(workers) == self.workers: + break + + time.sleep(.1) + + self.assertEqual(len(workers), self.workers) + return workers + + def tearDown(self): + if self.pid: + # Make sure all processes are stopped + os.kill(self.pid, signal.SIGTERM) + + # Make sure we reap our test process + self._reap_test() + + super(ServiceLauncherTest, self).tearDown() + + def _reap_test(self): + pid, status = os.waitpid(self.pid, 0) + self.pid = None + return status + + def _get_workers(self): + f = os.popen('ps ax -o pid,ppid,command') + # Skip ps header + f.readline() + + processes = [tuple(int(p) for p in l.strip().split()[:2]) + for l in f.readlines()] + return [p for p, pp in processes if pp == self.pid] + + def test_killed_worker_recover(self): + start_workers = self._spawn() + + # kill one worker and check if new worker can come up + LOG.info('pid of first child is %s' % start_workers[0]) + os.kill(start_workers[0], signal.SIGTERM) + + # loop and check if new worker is spawned (for 1 second max) + start = time.time() + while time.time() - start < 1: + end_workers = self._get_workers() + LOG.info('workers: %r' % end_workers) + + if 
start_workers != end_workers: + break + + time.sleep(.1) + + # Make sure worker pids don't match + self.assertNotEqual(start_workers, end_workers) + + def _terminate_with_signal(self, sig): + self._spawn() + + os.kill(self.pid, sig) + + # loop and check if all processes are killed (for 1 second max) + start = time.time() + while time.time() - start < 1: + workers = self._get_workers() + LOG.info('workers: %r' % workers) + + if not workers: + break + + time.sleep(.1) + + self.assertFalse(workers, 'No OS processes left.') + + def test_terminate_sigkill(self): + self._terminate_with_signal(signal.SIGKILL) + status = self._reap_test() + self.assertTrue(os.WIFSIGNALED(status)) + self.assertEqual(os.WTERMSIG(status), signal.SIGKILL) + + def test_terminate_sigterm(self): + self._terminate_with_signal(signal.SIGTERM) + status = self._reap_test() + self.assertTrue(os.WIFEXITED(status)) + self.assertEqual(os.WEXITSTATUS(status), 0) -- cgit