summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJohannes Erdfelt <johannes.erdfelt@rackspace.com>2012-03-25 02:06:01 +0800
committerJohannes Erdfelt <johannes.erdfelt@rackspace.com>2012-06-28 19:57:37 +0000
commit46c1b6eaee4ca00c256c5c403c6d6bfeaf3b63f8 (patch)
tree365480c3ce477dffa80db3e650d24fc705e52fc2
parent2264c1c0b6ccfe7dc3e4c7e448b4a5eac92758d4 (diff)
downloadnova-46c1b6eaee4ca00c256c5c403c6d6bfeaf3b63f8.tar.gz
nova-46c1b6eaee4ca00c256c5c403c6d6bfeaf3b63f8.tar.xz
nova-46c1b6eaee4ca00c256c5c403c6d6bfeaf3b63f8.zip
Add multi-process support for API services
Implements blueprint multi-process-api-service This is based on Huang Zhiteng's patch. This patch adds support for running services as multiple processes. This is primarily intended to be used with the API service as a way to provide more concurrency than eventlet can sometimes provide. A SIGTERM or SIGINT signal will cause the parent process to gracefully terminate the child processes, allowing them to finish processing the requests currently being processed. The parent will wait for the children to finish before exiting. Change-Id: Ie6d6802626eb42d5e64c4167be363fbf6cea2a1b
-rwxr-xr-xbin/nova-all15
-rwxr-xr-xbin/nova-api10
-rwxr-xr-xbin/nova-api-ec24
-rwxr-xr-xbin/nova-api-metadata4
-rwxr-xr-xbin/nova-api-os-compute4
-rwxr-xr-xbin/nova-api-os-volume4
-rw-r--r--etc/nova/nova.conf.sample12
-rw-r--r--nova/service.py255
-rw-r--r--nova/tests/__init__.py2
-rw-r--r--nova/tests/integrated/test_multiprocess_api.py169
-rw-r--r--nova/tests/test_service.py6
-rw-r--r--nova/tests/test_wsgi.py4
-rw-r--r--nova/wsgi.py52
13 files changed, 451 insertions, 90 deletions
diff --git a/bin/nova-all b/bin/nova-all
index 5c0644b8c..6aee87805 100755
--- a/bin/nova-all
+++ b/bin/nova-all
@@ -28,7 +28,7 @@ continue attempting to launch the rest of the services.
"""
import eventlet
-eventlet.monkey_patch()
+eventlet.monkey_patch(os=False)
import os
import sys
@@ -54,25 +54,26 @@ if __name__ == '__main__':
flags.parse_args(sys.argv)
logging.setup()
utils.monkey_patch()
- servers = []
+ launcher = service.ProcessLauncher()
+
# nova-api
for api in flags.FLAGS.enabled_apis:
try:
- servers.append(service.WSGIService(api))
+ server = service.WSGIService(api)
+ launcher.launch_server(server, workers=server.workers or 1)
except (Exception, SystemExit):
LOG.exception(_('Failed to load %s') % '%s-api' % api)
for mod in [s3server, xvp_proxy]:
try:
- servers.append(mod.get_wsgi_server())
+ launcher.launch_server(mod.get_wsgi_server())
except (Exception, SystemExit):
LOG.exception(_('Failed to load %s') % mod.__name__)
for binary in ['nova-compute', 'nova-volume',
'nova-network', 'nova-scheduler', 'nova-cert']:
try:
- servers.append(service.Service.create(binary=binary))
+ launcher.launch_server(service.Service.create(binary=binary))
except (Exception, SystemExit):
LOG.exception(_('Failed to load %s'), binary)
- service.serve(*servers)
- service.wait()
+ launcher.wait()
diff --git a/bin/nova-api b/bin/nova-api
index e6779df4f..b778854f0 100755
--- a/bin/nova-api
+++ b/bin/nova-api
@@ -24,7 +24,7 @@ Starts both the EC2 and OpenStack APIs in separate greenthreads.
"""
import eventlet
-eventlet.monkey_patch()
+eventlet.monkey_patch(os=False)
import os
import sys
@@ -45,8 +45,8 @@ if __name__ == '__main__':
flags.parse_args(sys.argv)
logging.setup()
utils.monkey_patch()
- servers = []
+ launcher = service.ProcessLauncher()
for api in flags.FLAGS.enabled_apis:
- servers.append(service.WSGIService(api))
- service.serve(*servers)
- service.wait()
+ server = service.WSGIService(api)
+ launcher.launch_server(server, workers=server.workers or 1)
+ launcher.wait()
diff --git a/bin/nova-api-ec2 b/bin/nova-api-ec2
index b53c9158a..f76f3dd7e 100755
--- a/bin/nova-api-ec2
+++ b/bin/nova-api-ec2
@@ -20,7 +20,7 @@
"""Starter script for Nova EC2 API."""
import eventlet
-eventlet.monkey_patch()
+eventlet.monkey_patch(os=False)
import os
import sys
@@ -42,5 +42,5 @@ if __name__ == '__main__':
logging.setup()
utils.monkey_patch()
server = service.WSGIService('ec2')
- service.serve(server)
+ service.serve(server, workers=server.workers)
service.wait()
diff --git a/bin/nova-api-metadata b/bin/nova-api-metadata
index 2f2ef9454..2aad93453 100755
--- a/bin/nova-api-metadata
+++ b/bin/nova-api-metadata
@@ -20,7 +20,7 @@
"""Starter script for Nova Metadata API."""
import eventlet
-eventlet.monkey_patch()
+eventlet.monkey_patch(os=False)
import os
import sys
@@ -42,5 +42,5 @@ if __name__ == '__main__':
logging.setup()
utils.monkey_patch()
server = service.WSGIService('metadata')
- service.serve(server)
+ service.serve(server, workers=server.workers)
service.wait()
diff --git a/bin/nova-api-os-compute b/bin/nova-api-os-compute
index 75c921943..c83855254 100755
--- a/bin/nova-api-os-compute
+++ b/bin/nova-api-os-compute
@@ -20,7 +20,7 @@
"""Starter script for Nova OS API."""
import eventlet
-eventlet.monkey_patch()
+eventlet.monkey_patch(os=False)
import os
import sys
@@ -42,5 +42,5 @@ if __name__ == '__main__':
logging.setup()
utils.monkey_patch()
server = service.WSGIService('osapi_compute')
- service.serve(server)
+ service.serve(server, workers=server.workers)
service.wait()
diff --git a/bin/nova-api-os-volume b/bin/nova-api-os-volume
index b93fd51ae..f08c69384 100755
--- a/bin/nova-api-os-volume
+++ b/bin/nova-api-os-volume
@@ -20,7 +20,7 @@
"""Starter script for Nova OS API."""
import eventlet
-eventlet.monkey_patch()
+eventlet.monkey_patch(os=False)
import os
import sys
@@ -42,5 +42,5 @@ if __name__ == '__main__':
logging.setup()
utils.monkey_patch()
server = service.WSGIService('osapi_volume')
- service.serve(server)
+ service.serve(server, workers=server.workers)
service.wait()
diff --git a/etc/nova/nova.conf.sample b/etc/nova/nova.conf.sample
index a84d212da..879838eb7 100644
--- a/etc/nova/nova.conf.sample
+++ b/etc/nova/nova.conf.sample
@@ -489,12 +489,18 @@
# ec2_listen_port=8773
#### (IntOpt) port for ec2 api to listen
+# ec2_workers=0
+#### (IntOpt) Number of EC2 API workers
+
# osapi_compute_listen=0.0.0.0
#### (StrOpt) IP address for OpenStack API to listen
# osapi_compute_listen_port=8774
#### (IntOpt) list port for osapi compute
+# osapi_compute_workers=0
+#### (IntOpt) Number of workers for OpenStack API
+
# metadata_manager=nova.api.manager.MetadataManager
#### (StrOpt) OpenStack metadata service manager
@@ -504,12 +510,18 @@
# metadata_listen_port=8775
#### (IntOpt) port for metadata api to listen
+# metadata_workers=0
+#### (IntOpt) Number of workers for metadata API
+
# osapi_volume_listen=0.0.0.0
#### (StrOpt) IP address for OpenStack Volume API to listen
# osapi_volume_listen_port=8776
#### (IntOpt) port for os volume api to listen
+# osapi_volume_workers=0
+#### (IntOpt) Number of workers for OpenStack Volume API
+
######## defined in nova.test ########
diff --git a/nova/service.py b/nova/service.py
index c701d3813..d3fb76933 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -19,10 +19,13 @@
"""Generic Node base class for all workers that run on hosts."""
+import errno
import inspect
import os
import random
import signal
+import sys
+import time
import eventlet
import greenlet
@@ -61,12 +64,18 @@ service_opts = [
cfg.IntOpt('ec2_listen_port',
default=8773,
help='port for ec2 api to listen'),
+ cfg.IntOpt('ec2_workers',
+ default=None,
+ help='Number of workers for EC2 API service'),
cfg.StrOpt('osapi_compute_listen',
default="0.0.0.0",
help='IP address for OpenStack API to listen'),
cfg.IntOpt('osapi_compute_listen_port',
default=8774,
help='list port for osapi compute'),
+ cfg.IntOpt('osapi_compute_workers',
+ default=None,
+ help='Number of workers for OpenStack API service'),
cfg.StrOpt('metadata_manager',
default='nova.api.manager.MetadataManager',
help='OpenStack metadata service manager'),
@@ -76,12 +85,18 @@ service_opts = [
cfg.IntOpt('metadata_listen_port',
default=8775,
help='port for metadata api to listen'),
+ cfg.IntOpt('metadata_workers',
+ default=None,
+ help='Number of workers for metadata service'),
cfg.StrOpt('osapi_volume_listen',
default="0.0.0.0",
help='IP address for OpenStack Volume API to listen'),
cfg.IntOpt('osapi_volume_listen_port',
default=8776,
- help='port for os volume api to listen')
+ help='port for os volume api to listen'),
+ cfg.IntOpt('osapi_volume_workers',
+ default=None,
+ help='Number of workers for OpenStack Volume API service'),
]
FLAGS = flags.FLAGS
@@ -98,6 +113,7 @@ class Launcher(object):
"""
self._services = []
+ eventlet_backdoor.initialize_if_enabled()
@staticmethod
def run_server(server):
@@ -135,15 +151,6 @@ class Launcher(object):
:returns: None
"""
- def sigterm(sig, frame):
- LOG.audit(_("SIGTERM received"))
- # NOTE(jk0): Raise a ^C which is caught by the caller and cleanly
- # shuts down the service. This does not yet handle eventlet
- # threads.
- raise KeyboardInterrupt
-
- signal.signal(signal.SIGTERM, sigterm)
-
for service in self._services:
try:
service.wait()
@@ -151,6 +158,198 @@ class Launcher(object):
pass
+class ServiceLauncher(Launcher):
+ def _handle_signal(self, signo, frame):
+ signame = {signal.SIGTERM: 'SIGTERM', signal.SIGINT: 'SIGINT'}[signo]
+ LOG.info(_('Caught %s, exiting'), signame)
+
+ # Allow the process to be killed again and die from natural causes
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+ sys.exit(1)
+
+ def wait(self):
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, self._handle_signal)
+
+ LOG.debug(_('Full set of FLAGS:'))
+ for flag in FLAGS:
+ flag_get = FLAGS.get(flag, None)
+ # hide flag contents from log if contains a password
+ # should use secret flag when switch over to openstack-common
+ if ("_password" in flag or "_key" in flag or
+ (flag == "sql_connection" and "mysql:" in flag_get)):
+ LOG.debug(_('%(flag)s : FLAG SET ') % locals())
+ else:
+ LOG.debug('%(flag)s : %(flag_get)s' % locals())
+
+ status = None
+ try:
+ super(ServiceLauncher, self).wait()
+ except SystemExit as exc:
+ status = exc.code
+ self.stop()
+ rpc.cleanup()
+
+ if status is not None:
+ sys.exit(status)
+
+
+class ServerWrapper(object):
+ def __init__(self, server, workers):
+ self.server = server
+ self.workers = workers
+ self.children = set()
+ self.forktimes = []
+
+
+class ProcessLauncher(object):
+ def __init__(self):
+ self.children = {}
+ self.running = True
+ rfd, self.writepipe = os.pipe()
+ self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, self._handle_signal)
+
+ def _handle_signal(self, signo, frame):
+ signame = {signal.SIGTERM: 'SIGTERM', signal.SIGINT: 'SIGINT'}[signo]
+ LOG.info(_('Caught %s, stopping children'), signame)
+
+ self.running = False
+ for pid in self.children:
+ try:
+ os.kill(pid, signal.SIGTERM)
+ except OSError as exc:
+ if exc.errno != errno.ESRCH:
+ raise
+
+ # Allow the process to be killed again and die from natural causes
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+ def _pipe_watcher(self):
+ # This will block until the write end is closed when the parent
+ # dies unexpectedly
+ self.readpipe.read()
+
+ LOG.info(_('Parent process has died unexpectedly, exiting'))
+
+ sys.exit(1)
+
+ def _child_process(self, server):
+ # Setup child signal handlers differently
+ def _sigterm(*args):
+ LOG.info(_('Received SIGTERM, stopping'))
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ server.stop()
+
+ signal.signal(signal.SIGTERM, _sigterm)
+ # Block SIGINT and let the parent send us a SIGTERM
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+ # Reopen the eventlet hub to make sure we don't share an epoll
+ # fd with parent and/or siblings, which would be bad
+ eventlet.hubs.use_hub()
+
+ # Close write to ensure only parent has it open
+ os.close(self.writepipe)
+ # Create greenthread to watch for parent to close pipe
+ eventlet.spawn(self._pipe_watcher)
+
+ # Reseed random number generator
+ random.seed()
+
+ launcher = Launcher()
+ launcher.run_server(server)
+
+ def _start_child(self, wrap):
+ if len(wrap.forktimes) > wrap.workers:
+ # Limit ourselves to one process a second (over the period of
+ # number of workers * 1 second). This will allow workers to
+ # start up quickly but ensure we don't fork off children that
+ # die instantly too quickly.
+ if time.time() - wrap.forktimes[0] < wrap.workers:
+ LOG.info(_('Forking too fast, sleeping'))
+ time.sleep(1)
+
+ wrap.forktimes.pop(0)
+
+ wrap.forktimes.append(time.time())
+
+ pid = os.fork()
+ if pid == 0:
+ # NOTE(johannes): All exceptions are caught to ensure this
+ # doesn't fallback into the loop spawning children. It would
+ # be bad for a child to spawn more children.
+ status = 0
+ try:
+ self._child_process(wrap.server)
+ except SystemExit as exc:
+ status = exc.code
+ except BaseException:
+ LOG.exception(_('Unhandled exception'))
+ status = 2
+
+ os._exit(status)
+
+ LOG.info(_('Started child %d'), pid)
+
+ wrap.children.add(pid)
+ self.children[pid] = wrap
+
+ return pid
+
+ def launch_server(self, server, workers=1):
+ wrap = ServerWrapper(server, workers)
+
+ LOG.info(_('Starting %d workers'), wrap.workers)
+ while self.running and len(wrap.children) < wrap.workers:
+ self._start_child(wrap)
+
+ def _wait_child(self):
+ try:
+ pid, status = os.wait()
+ except OSError as exc:
+ if exc.errno not in (errno.EINTR, errno.ECHILD):
+ raise
+ return None
+
+ if os.WIFSIGNALED(status):
+ sig = os.WTERMSIG(status)
+ LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals())
+ else:
+ code = os.WEXITSTATUS(status)
+ LOG.info(_('Child %(pid)d exited with status %(code)d'), locals())
+
+ if pid not in self.children:
+ LOG.warning(_('pid %d not in child list'), pid)
+ return None
+
+ wrap = self.children.pop(pid)
+ wrap.children.remove(pid)
+ return wrap
+
+ def wait(self):
+ """Loop waiting on children to die and respawning as necessary"""
+ # Loop calling wait and respawning as necessary
+ while self.running:
+ wrap = self._wait_child()
+ if not wrap:
+ continue
+
+ while self.running and len(wrap.children) < wrap.workers:
+ self._start_child(wrap)
+
+ # Wait for children to die
+ if self.children:
+ LOG.info(_('Waiting on %d children to exit'), len(self.children))
+ while self.children:
+ self._wait_child()
+
+
class Service(object):
"""Service object for binaries running on hosts.
@@ -170,7 +369,6 @@ class Service(object):
self.report_interval = report_interval
self.periodic_interval = periodic_interval
self.periodic_fuzzy_delay = periodic_fuzzy_delay
- super(Service, self).__init__(*args, **kwargs)
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
@@ -361,10 +559,13 @@ class WSGIService(object):
self.app = self.loader.load_app(name)
self.host = getattr(FLAGS, '%s_listen' % name, "0.0.0.0")
self.port = getattr(FLAGS, '%s_listen_port' % name, 0)
+ self.workers = getattr(FLAGS, '%s_workers' % name, None)
self.server = wsgi.Server(name,
self.app,
host=self.host,
port=self.port)
+ # Pull back actual port used
+ self.port = self.server.port
def _get_manager(self):
"""Initialize a Manager object appropriate for this service.
@@ -400,7 +601,6 @@ class WSGIService(object):
if self.manager:
self.manager.init_host()
self.server.start()
- self.port = self.server.port
def stop(self):
"""Stop serving this API.
@@ -425,29 +625,18 @@ class WSGIService(object):
_launcher = None
-def serve(*servers):
+def serve(server, workers=None):
global _launcher
- if not _launcher:
- _launcher = Launcher()
- for server in servers:
+ if _launcher:
+ raise RuntimeError(_('serve() can only be called once'))
+
+ if workers:
+ _launcher = ProcessLauncher()
+ _launcher.launch_server(server, workers=workers)
+ else:
+ _launcher = ServiceLauncher()
_launcher.launch_server(server)
- eventlet_backdoor.initialize_if_enabled()
-
def wait():
- LOG.debug(_('Full set of FLAGS:'))
- for flag in FLAGS:
- flag_get = FLAGS.get(flag, None)
- # hide flag contents from log if contains a password
- # should use secret flag when switch over to openstack-common
- if ("_password" in flag or "_key" in flag or
- (flag == "sql_connection" and "mysql:" in flag_get)):
- LOG.debug(_('%(flag)s : FLAG SET ') % locals())
- else:
- LOG.debug('%(flag)s : %(flag_get)s' % locals())
- try:
- _launcher.wait()
- except KeyboardInterrupt:
- _launcher.stop()
- rpc.cleanup()
+ _launcher.wait()
diff --git a/nova/tests/__init__.py b/nova/tests/__init__.py
index dff2ad649..9ff38e3a9 100644
--- a/nova/tests/__init__.py
+++ b/nova/tests/__init__.py
@@ -44,7 +44,7 @@ from nova import log as logging
import eventlet
-eventlet.monkey_patch()
+eventlet.monkey_patch(os=False)
FLAGS = flags.FLAGS
FLAGS.use_stderr = False
diff --git a/nova/tests/integrated/test_multiprocess_api.py b/nova/tests/integrated/test_multiprocess_api.py
new file mode 100644
index 000000000..6fe1479cd
--- /dev/null
+++ b/nova/tests/integrated/test_multiprocess_api.py
@@ -0,0 +1,169 @@
+# Copyright (c) 2012 Intel, LLC
+# Copyright (c) 2012 OpenStack, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Test multiprocess enabled API service.
+"""
+import os
+import signal
+import time
+import traceback
+
+from nova import flags
+from nova.log import logging
+from nova import service
+from nova.tests.integrated import integrated_helpers
+
+FLAGS = flags.FLAGS
+LOG = logging.getLogger(__name__)
+
+
+class MultiprocessWSGITest(integrated_helpers._IntegratedTestBase):
+ def _start_api_service(self):
+ # Process will be started in _spawn()
+ self.osapi = service.WSGIService("osapi_compute")
+ self.auth_url = 'http://%s:%s/v2' % (self.osapi.host, self.osapi.port)
+ LOG.info('auth_url = %s' % self.auth_url)
+
+ def _get_flags(self):
+ self.workers = 2
+ f = super(MultiprocessWSGITest, self)._get_flags()
+ f['osapi_compute_workers'] = self.workers
+ return f
+
+ def _spawn(self):
+ pid = os.fork()
+ if pid == 0:
+ # NOTE(johannes): We can't let the child processes exit back
+ # into the unit test framework since then we'll have multiple
+ # processes running the same tests (and possibly forking more
+ # processes that end up in the same situation). So we need
+ # to catch all exceptions and make sure nothing leaks out, in
+ # particlar SystemExit, which is raised by sys.exit(). We use
+ # os._exit() which doesn't have this problem.
+ status = 0
+ try:
+ launcher = service.ProcessLauncher()
+ launcher.launch_server(self.osapi, workers=self.osapi.workers)
+ launcher.wait()
+ except SystemExit as exc:
+ status = exc.code
+ except BaseException:
+ # We need to be defensive here too
+ try:
+ traceback.print_exc()
+ except BaseException:
+ print "Couldn't print traceback"
+ status = 2
+
+ # Really exit
+ os._exit(status)
+
+ self.pid = pid
+
+ # Wait for up to a second for workers to get started
+ start = time.time()
+ while time.time() - start < 1:
+ workers = self._get_workers()
+ if len(workers) == self.workers:
+ break
+
+ time.sleep(.1)
+
+ self.assertEqual(len(workers), self.workers)
+ return workers
+
+ def tearDown(self):
+ if self.pid:
+ # Make sure all processes are stopped
+ os.kill(self.pid, signal.SIGTERM)
+
+ # Make sure we reap our test process
+ self._reap_test()
+
+ super(MultiprocessWSGITest, self).tearDown()
+
+ def _reap_test(self):
+ pid, status = os.waitpid(self.pid, 0)
+ self.pid = None
+ return status
+
+ def _get_workers(self):
+ f = os.popen('ps ax -o pid,ppid,command')
+ # Skip ps header
+ f.readline()
+
+ processes = [tuple(int(p) for p in l.strip().split()[:2])
+ for l in f.readlines()]
+ return [p for p, pp in processes if pp == self.pid]
+
+ def test_killed_worker_recover(self):
+ start_workers = self._spawn()
+
+ # kill one worker and check if new worker can come up
+ LOG.info('pid of first child is %s' % start_workers[0])
+ os.kill(start_workers[0], signal.SIGTERM)
+
+ # loop and check if new worker is spawned (for 1 second max)
+ start = time.time()
+ while time.time() - start < 1:
+ end_workers = self._get_workers()
+ LOG.info('workers: %r' % end_workers)
+
+ if start_workers != end_workers:
+ break
+
+ time.sleep(.1)
+
+ # Make sure worker pids don't match
+ self.assertNotEqual(start_workers, end_workers)
+
+ # check if api service still works
+ flavors = self.api.get_flavors()
+ self.assertTrue(len(flavors) > 0, 'Num of flavors > 0.')
+
+ def _terminate_with_signal(self, sig):
+ self._spawn()
+
+ # check if api service is working
+ flavors = self.api.get_flavors()
+ self.assertTrue(len(flavors) > 0, 'Num of flavors > 0.')
+
+ os.kill(self.pid, sig)
+
+ # loop and check if all processes are killed (for 1 second max)
+ start = time.time()
+ while time.time() - start < 1:
+ workers = self._get_workers()
+ LOG.info('workers: %r' % workers)
+
+ if not workers:
+ break
+
+ time.sleep(.1)
+
+ self.assertFalse(workers, 'No OS processes left.')
+
+ def test_terminate_sigkill(self):
+ self._terminate_with_signal(signal.SIGKILL)
+ status = self._reap_test()
+ self.assertTrue(os.WIFSIGNALED(status))
+ self.assertEqual(os.WTERMSIG(status), signal.SIGKILL)
+
+ def test_terminate_sigterm(self):
+ self._terminate_with_signal(signal.SIGTERM)
+ status = self._reap_test()
+ self.assertTrue(os.WIFEXITED(status))
+ self.assertEqual(os.WEXITSTATUS(status), 0)
diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py
index 2b6c83653..6aa390a11 100644
--- a/nova/tests/test_service.py
+++ b/nova/tests/test_service.py
@@ -40,7 +40,7 @@ test_service_opts = [
default="nova.tests.test_service.FakeManager",
help="Manager for testing"),
cfg.StrOpt("test_service_listen",
- default=None,
+ default='127.0.0.1',
help="Host to bind test service to"),
cfg.IntOpt("test_service_listen_port",
default=0,
@@ -202,7 +202,6 @@ class TestWSGIService(test.TestCase):
def test_service_random_port(self):
test_service = service.WSGIService("test_service")
- self.assertEquals(0, test_service.port)
test_service.start()
self.assertNotEqual(0, test_service.port)
test_service.stop()
@@ -216,10 +215,7 @@ class TestLauncher(test.TestCase):
self.service = service.WSGIService("test_service")
def test_launch_app(self):
- self.assertEquals(0, self.service.port)
launcher = service.Launcher()
launcher.launch_server(self.service)
- # Give spawned thread a chance to execute
- greenthread.sleep(0)
self.assertNotEquals(0, self.service.port)
launcher.stop()
diff --git a/nova/tests/test_wsgi.py b/nova/tests/test_wsgi.py
index 20b2bf81e..9f694f4bc 100644
--- a/nova/tests/test_wsgi.py
+++ b/nova/tests/test_wsgi.py
@@ -84,8 +84,8 @@ class TestWSGIServer(unittest.TestCase):
self.assertEquals("test_app", server.name)
def test_start_random_port(self):
- server = nova.wsgi.Server("test_random_port", None, host="127.0.0.1")
- self.assertEqual(0, server.port)
+ server = nova.wsgi.Server("test_random_port", None,
+ host="127.0.0.1", port=0)
server.start()
self.assertNotEqual(0, server.port)
server.stop()
diff --git a/nova/wsgi.py b/nova/wsgi.py
index 436f4df2e..a02fdcc18 100644
--- a/nova/wsgi.py
+++ b/nova/wsgi.py
@@ -44,8 +44,8 @@ class Server(object):
default_pool_size = 1000
- def __init__(self, name, app, host=None, port=None, pool_size=None,
- protocol=eventlet.wsgi.HttpProtocol):
+ def __init__(self, name, app, host='0.0.0.0', port=0, pool_size=None,
+ protocol=eventlet.wsgi.HttpProtocol, backlog=128):
"""Initialize, but do not start, a WSGI server.
:param name: Pretty name for logging.
@@ -53,47 +53,37 @@ class Server(object):
:param host: IP address to serve the application.
:param port: Port number to server the application.
:param pool_size: Maximum number of eventlets to spawn concurrently.
+ :param backlog: Maximum number of queued connections.
:returns: None
-
+ :raises: nova.exception.InvalidInput
"""
self.name = name
self.app = app
- self.host = host or "0.0.0.0"
- self.port = port or 0
self._server = None
- self._socket = None
self._protocol = protocol
self._pool = eventlet.GreenPool(pool_size or self.default_pool_size)
- self._logger = logging.getLogger("eventlet.wsgi.server")
+ self._logger = logging.getLogger("nova.%s.wsgi.server" % self.name)
self._wsgi_logger = logging.WritableLogger(self._logger)
- def _start(self):
- """Run the blocking eventlet WSGI server.
-
- :returns: None
+ if backlog < 1:
+ raise exception.InvalidInput(
+ reason='The backlog must be more than 1')
- """
- eventlet.wsgi.server(self._socket,
- self.app,
- protocol=self._protocol,
- custom_pool=self._pool,
- log=self._wsgi_logger)
+ self._socket = eventlet.listen((host, port), backlog=backlog)
+ (self.host, self.port) = self._socket.getsockname()
+ LOG.info(_("%(name)s listening on %(host)s:%(port)s") % self.__dict__)
- def start(self, backlog=128):
+ def start(self):
"""Start serving a WSGI application.
- :param backlog: Maximum number of queued connections.
:returns: None
- :raises: nova.exception.InvalidInput
-
"""
- if backlog < 1:
- raise exception.InvalidInput(
- reason='The backlog must be more than 1')
- self._socket = eventlet.listen((self.host, self.port), backlog=backlog)
- self._server = eventlet.spawn(self._start)
- (self.host, self.port) = self._socket.getsockname()
- LOG.info(_("Started %(name)s on %(host)s:%(port)s") % self.__dict__)
+ self._server = eventlet.spawn(eventlet.wsgi.server,
+ self._socket,
+ self.app,
+ protocol=self._protocol,
+ custom_pool=self._pool,
+ log=self._wsgi_logger)
def stop(self):
"""Stop this server.
@@ -105,7 +95,11 @@ class Server(object):
"""
LOG.info(_("Stopping WSGI server."))
- self._server.kill()
+
+ if self._server is not None:
+ # Resize pool to stop new requests from being processed
+ self._pool.resize(0)
+ self._server.kill()
def wait(self):
"""Block, until the server has stopped.