summaryrefslogtreecommitdiffstats
path: root/tests/unit/test_service.py
diff options
context:
space:
mode:
authorAngus Salkeld <asalkeld@redhat.com>2012-08-15 08:20:45 +1000
committerAngus Salkeld <asalkeld@redhat.com>2012-08-20 20:32:23 +1000
commit86a3ed7a173c82422270786fc89cf27cbaccc67e (patch)
tree5d024dec0944330b8a437ec83cea83e15b13a1c9 /tests/unit/test_service.py
parent053789a75b7fcde1bde82ee366ad43f3aa7e70b9 (diff)
downloadoslo-86a3ed7a173c82422270786fc89cf27cbaccc67e.tar.gz
oslo-86a3ed7a173c82422270786fc89cf27cbaccc67e.tar.xz
oslo-86a3ed7a173c82422270786fc89cf27cbaccc67e.zip
Add multiprocess service launcher
Part of blueprint service-infrastructure Signed-off-by: Angus Salkeld <asalkeld@redhat.com> Change-Id: I848e8c91d2c331e5c78a176258318457a9041131
Diffstat (limited to 'tests/unit/test_service.py')
-rw-r--r--tests/unit/test_service.py137
1 files changed, 136 insertions, 1 deletions
diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py
index fa79fa8..7e2aa3c 100644
--- a/tests/unit/test_service.py
+++ b/tests/unit/test_service.py
@@ -19,14 +19,24 @@
"""
Unit Tests for remote procedure calls using queue
"""
+import os
+import signal
+import time
+import traceback
+
from eventlet import greenthread
from openstack.common import cfg
-from openstack.common import service
+from openstack.common.gettextutils import _
+from openstack.common import log as logging
from openstack.common import manager
+from openstack.common import service
from tests import utils
+LOG = logging.getLogger(__name__)
+
+
class ExtendedService(service.Service):
def test_method(self):
return 'service'
@@ -38,3 +48,128 @@ class ServiceManagerTestCase(utils.BaseTestCase):
serv = ExtendedService('test', None)
serv.start()
self.assertEqual(serv.test_method(), 'service')
+
+
+class ServiceLauncherTest(utils.BaseTestCase):
+ """
+ Originally from nova/tests/integrated/test_multiprocess_api.py
+ """
+
+ def _spawn(self):
+ self.workers = 2
+ pid = os.fork()
+ if pid == 0:
+ # NOTE(johannes): We can't let the child processes exit back
+ # into the unit test framework since then we'll have multiple
+ # processes running the same tests (and possibly forking more
+ # processes that end up in the same situation). So we need
+ # to catch all exceptions and make sure nothing leaks out, in
+ # particlar SystemExit, which is raised by sys.exit(). We use
+ # os._exit() which doesn't have this problem.
+ status = 0
+ try:
+ launcher = service.ProcessLauncher()
+ serv = ExtendedService('test', None)
+ launcher.launch_service(serv, workers=self.workers)
+ launcher.wait()
+ except SystemExit as exc:
+ status = exc.code
+ except BaseException:
+ # We need to be defensive here too
+ try:
+ traceback.print_exc()
+ except BaseException:
+ print "Couldn't print traceback"
+ status = 2
+
+ # Really exit
+ os._exit(status)
+
+ self.pid = pid
+
+ # Wait for up to a second for workers to get started
+ start = time.time()
+ while time.time() - start < 1:
+ workers = self._get_workers()
+ if len(workers) == self.workers:
+ break
+
+ time.sleep(.1)
+
+ self.assertEqual(len(workers), self.workers)
+ return workers
+
+ def tearDown(self):
+ if self.pid:
+ # Make sure all processes are stopped
+ os.kill(self.pid, signal.SIGTERM)
+
+ # Make sure we reap our test process
+ self._reap_test()
+
+ super(ServiceLauncherTest, self).tearDown()
+
+ def _reap_test(self):
+ pid, status = os.waitpid(self.pid, 0)
+ self.pid = None
+ return status
+
+ def _get_workers(self):
+ f = os.popen('ps ax -o pid,ppid,command')
+ # Skip ps header
+ f.readline()
+
+ processes = [tuple(int(p) for p in l.strip().split()[:2])
+ for l in f.readlines()]
+ return [p for p, pp in processes if pp == self.pid]
+
+ def test_killed_worker_recover(self):
+ start_workers = self._spawn()
+
+ # kill one worker and check if new worker can come up
+ LOG.info('pid of first child is %s' % start_workers[0])
+ os.kill(start_workers[0], signal.SIGTERM)
+
+ # loop and check if new worker is spawned (for 1 second max)
+ start = time.time()
+ while time.time() - start < 1:
+ end_workers = self._get_workers()
+ LOG.info('workers: %r' % end_workers)
+
+ if start_workers != end_workers:
+ break
+
+ time.sleep(.1)
+
+ # Make sure worker pids don't match
+ self.assertNotEqual(start_workers, end_workers)
+
+ def _terminate_with_signal(self, sig):
+ self._spawn()
+
+ os.kill(self.pid, sig)
+
+ # loop and check if all processes are killed (for 1 second max)
+ start = time.time()
+ while time.time() - start < 1:
+ workers = self._get_workers()
+ LOG.info('workers: %r' % workers)
+
+ if not workers:
+ break
+
+ time.sleep(.1)
+
+ self.assertFalse(workers, 'No OS processes left.')
+
+ def test_terminate_sigkill(self):
+ self._terminate_with_signal(signal.SIGKILL)
+ status = self._reap_test()
+ self.assertTrue(os.WIFSIGNALED(status))
+ self.assertEqual(os.WTERMSIG(status), signal.SIGKILL)
+
+ def test_terminate_sigterm(self):
+ self._terminate_with_signal(signal.SIGTERM)
+ status = self._reap_test()
+ self.assertTrue(os.WIFEXITED(status))
+ self.assertEqual(os.WEXITSTATUS(status), 0)