diff options
-rwxr-xr-x | bin/nova-all | 15 | ||||
-rwxr-xr-x | bin/nova-api | 10 | ||||
-rwxr-xr-x | bin/nova-api-ec2 | 4 | ||||
-rwxr-xr-x | bin/nova-api-metadata | 4 | ||||
-rwxr-xr-x | bin/nova-api-os-compute | 4 | ||||
-rwxr-xr-x | bin/nova-api-os-volume | 4 | ||||
-rw-r--r-- | etc/nova/nova.conf.sample | 12 | ||||
-rw-r--r-- | nova/service.py | 255 | ||||
-rw-r--r-- | nova/tests/__init__.py | 2 | ||||
-rw-r--r-- | nova/tests/integrated/test_multiprocess_api.py | 169 | ||||
-rw-r--r-- | nova/tests/test_service.py | 6 | ||||
-rw-r--r-- | nova/tests/test_wsgi.py | 4 | ||||
-rw-r--r-- | nova/wsgi.py | 52 |
13 files changed, 451 insertions, 90 deletions
diff --git a/bin/nova-all b/bin/nova-all index 5c0644b8c..6aee87805 100755 --- a/bin/nova-all +++ b/bin/nova-all @@ -28,7 +28,7 @@ continue attempting to launch the rest of the services. """ import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys @@ -54,25 +54,26 @@ if __name__ == '__main__': flags.parse_args(sys.argv) logging.setup() utils.monkey_patch() - servers = [] + launcher = service.ProcessLauncher() + # nova-api for api in flags.FLAGS.enabled_apis: try: - servers.append(service.WSGIService(api)) + server = service.WSGIService(api) + launcher.launch_server(server, workers=server.workers or 1) except (Exception, SystemExit): LOG.exception(_('Failed to load %s') % '%s-api' % api) for mod in [s3server, xvp_proxy]: try: - servers.append(mod.get_wsgi_server()) + launcher.launch_server(mod.get_wsgi_server()) except (Exception, SystemExit): LOG.exception(_('Failed to load %s') % mod.__name__) for binary in ['nova-compute', 'nova-volume', 'nova-network', 'nova-scheduler', 'nova-cert']: try: - servers.append(service.Service.create(binary=binary)) + launcher.launch_server(service.Service.create(binary=binary)) except (Exception, SystemExit): LOG.exception(_('Failed to load %s'), binary) - service.serve(*servers) - service.wait() + launcher.wait() diff --git a/bin/nova-api b/bin/nova-api index e6779df4f..b778854f0 100755 --- a/bin/nova-api +++ b/bin/nova-api @@ -24,7 +24,7 @@ Starts both the EC2 and OpenStack APIs in separate greenthreads. """ import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys @@ -45,8 +45,8 @@ if __name__ == '__main__': flags.parse_args(sys.argv) logging.setup() utils.monkey_patch() - servers = [] + launcher = service.ProcessLauncher() for api in flags.FLAGS.enabled_apis: - servers.append(service.WSGIService(api)) - service.serve(*servers) - service.wait() + server = service.WSGIService(api) + launcher.launch_server(server, workers=server.workers or 1) + launcher.wait() diff --git a/bin/nova-api-ec2 b/bin/nova-api-ec2 index b53c9158a..f76f3dd7e 100755 --- a/bin/nova-api-ec2 +++ b/bin/nova-api-ec2 @@ -20,7 +20,7 @@ """Starter script for Nova EC2 API.""" import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys @@ -42,5 +42,5 @@ if __name__ == '__main__': logging.setup() utils.monkey_patch() server = service.WSGIService('ec2') - service.serve(server) + service.serve(server, workers=server.workers) service.wait() diff --git a/bin/nova-api-metadata b/bin/nova-api-metadata index 2f2ef9454..2aad93453 100755 --- a/bin/nova-api-metadata +++ b/bin/nova-api-metadata @@ -20,7 +20,7 @@ """Starter script for Nova Metadata API.""" import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys @@ -42,5 +42,5 @@ if __name__ == '__main__': logging.setup() utils.monkey_patch() server = service.WSGIService('metadata') - service.serve(server) + service.serve(server, workers=server.workers) service.wait() diff --git a/bin/nova-api-os-compute b/bin/nova-api-os-compute index 75c921943..c83855254 100755 --- a/bin/nova-api-os-compute +++ b/bin/nova-api-os-compute @@ -20,7 +20,7 @@ """Starter script for Nova OS API.""" import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys @@ -42,5 +42,5 @@ if __name__ == '__main__': logging.setup() utils.monkey_patch() server = service.WSGIService('osapi_compute') - service.serve(server) + service.serve(server, workers=server.workers) service.wait() diff --git a/bin/nova-api-os-volume b/bin/nova-api-os-volume index b93fd51ae..f08c69384 100755 --- a/bin/nova-api-os-volume +++ b/bin/nova-api-os-volume @@ -20,7 +20,7 @@ """Starter script for Nova OS API.""" import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys @@ -42,5 +42,5 @@ if __name__ == '__main__': logging.setup() utils.monkey_patch() server = service.WSGIService('osapi_volume') - service.serve(server) + service.serve(server, workers=server.workers) service.wait() diff --git a/etc/nova/nova.conf.sample b/etc/nova/nova.conf.sample index a84d212da..879838eb7 100644 --- a/etc/nova/nova.conf.sample +++ b/etc/nova/nova.conf.sample @@ -489,12 +489,18 @@ # ec2_listen_port=8773 #### (IntOpt) port for ec2 api to listen +# ec2_workers=0 +#### (IntOpt) Number of EC2 API workers + # osapi_compute_listen=0.0.0.0 #### (StrOpt) IP address for OpenStack API to listen # osapi_compute_listen_port=8774 #### (IntOpt) list port for osapi compute +# osapi_compute_workers=0 +#### (IntOpt) Number of workers for OpenStack API + # metadata_manager=nova.api.manager.MetadataManager #### (StrOpt) OpenStack metadata service manager @@ -504,12 +510,18 @@ # metadata_listen_port=8775 #### (IntOpt) port for metadata api to listen +# metadata_workers=0 +#### (IntOpt) Number of workers for metadata API + # osapi_volume_listen=0.0.0.0 #### (StrOpt) IP address for OpenStack Volume API to listen # osapi_volume_listen_port=8776 #### (IntOpt) port for os volume api to listen +# osapi_volume_workers=0 +#### (IntOpt) Number of workers for OpenStack Volume API + ######## defined in nova.test ######## diff --git a/nova/service.py b/nova/service.py index c701d3813..d3fb76933 100644 --- a/nova/service.py +++ b/nova/service.py @@ -19,10 +19,13 @@ """Generic Node base class for all workers that run on hosts.""" +import errno import inspect import os import random import signal +import sys +import time import eventlet import greenlet @@ -61,12 +64,18 @@ service_opts = [ cfg.IntOpt('ec2_listen_port', default=8773, help='port for ec2 api to listen'), + cfg.IntOpt('ec2_workers', + default=None, + help='Number of workers for EC2 API service'), cfg.StrOpt('osapi_compute_listen', default="0.0.0.0", help='IP address for OpenStack API to listen'), cfg.IntOpt('osapi_compute_listen_port', default=8774, help='list port for osapi compute'), + cfg.IntOpt('osapi_compute_workers', + default=None, + help='Number of workers for OpenStack API service'), cfg.StrOpt('metadata_manager', default='nova.api.manager.MetadataManager', help='OpenStack metadata service manager'), @@ -76,12 +85,18 @@ service_opts = [ cfg.IntOpt('metadata_listen_port', default=8775, help='port for metadata api to listen'), + cfg.IntOpt('metadata_workers', + default=None, + help='Number of workers for metadata service'), cfg.StrOpt('osapi_volume_listen', default="0.0.0.0", help='IP address for OpenStack Volume API to listen'), cfg.IntOpt('osapi_volume_listen_port', default=8776, - help='port for os volume api to listen') + help='port for os volume api to listen'), + cfg.IntOpt('osapi_volume_workers', + default=None, + help='Number of workers for OpenStack Volume API service'), ] FLAGS = flags.FLAGS @@ -98,6 +113,7 @@ class Launcher(object): """ self._services = [] + eventlet_backdoor.initialize_if_enabled() @staticmethod def run_server(server): @@ -135,15 +151,6 @@ class Launcher(object): :returns: None """ - def sigterm(sig, frame): - LOG.audit(_("SIGTERM received")) - # NOTE(jk0): Raise a ^C which is caught by the caller and cleanly - # shuts down the service. This does not yet handle eventlet - # threads. - raise KeyboardInterrupt - - signal.signal(signal.SIGTERM, sigterm) - for service in self._services: try: service.wait() @@ -151,6 +158,198 @@ class Launcher(object): pass +class ServiceLauncher(Launcher): + def _handle_signal(self, signo, frame): + signame = {signal.SIGTERM: 'SIGTERM', signal.SIGINT: 'SIGINT'}[signo] + LOG.info(_('Caught %s, exiting'), signame) + + # Allow the process to be killed again and die from natural causes + signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGINT, signal.SIG_DFL) + + sys.exit(1) + + def wait(self): + signal.signal(signal.SIGTERM, self._handle_signal) + signal.signal(signal.SIGINT, self._handle_signal) + + LOG.debug(_('Full set of FLAGS:')) + for flag in FLAGS: + flag_get = FLAGS.get(flag, None) + # hide flag contents from log if contains a password + # should use secret flag when switch over to openstack-common + if ("_password" in flag or "_key" in flag or + (flag == "sql_connection" and "mysql:" in flag_get)): + LOG.debug(_('%(flag)s : FLAG SET ') % locals()) + else: + LOG.debug('%(flag)s : %(flag_get)s' % locals()) + + status = None + try: + super(ServiceLauncher, self).wait() + except SystemExit as exc: + status = exc.code + self.stop() + rpc.cleanup() + + if status is not None: + sys.exit(status) + + +class ServerWrapper(object): + def __init__(self, server, workers): + self.server = server + self.workers = workers + self.children = set() + self.forktimes = [] + + +class ProcessLauncher(object): + def __init__(self): + self.children = {} + self.running = True + rfd, self.writepipe = os.pipe() + self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') + + signal.signal(signal.SIGTERM, self._handle_signal) + signal.signal(signal.SIGINT, self._handle_signal) + + def _handle_signal(self, signo, frame): + signame = {signal.SIGTERM: 'SIGTERM', signal.SIGINT: 'SIGINT'}[signo] + LOG.info(_('Caught %s, stopping children'), signame) + + self.running = False + for pid in self.children: + try: + os.kill(pid, signal.SIGTERM) + except OSError as exc: + if exc.errno != errno.ESRCH: + raise + + # Allow the process to be killed again and die from natural causes + signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGINT, signal.SIG_DFL) + + def _pipe_watcher(self): + # This will block until the write end is closed when the parent + # dies unexpectedly + self.readpipe.read() + + LOG.info(_('Parent process has died unexpectedly, exiting')) + + sys.exit(1) + + def _child_process(self, server): + # Setup child signal handlers differently + def _sigterm(*args): + LOG.info(_('Received SIGTERM, stopping')) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + server.stop() + + signal.signal(signal.SIGTERM, _sigterm) + # Block SIGINT and let the parent send us a SIGTERM + signal.signal(signal.SIGINT, signal.SIG_IGN) + + # Reopen the eventlet hub to make sure we don't share an epoll + # fd with parent and/or siblings, which would be bad + eventlet.hubs.use_hub() + + # Close write to ensure only parent has it open + os.close(self.writepipe) + # Create greenthread to watch for parent to close pipe + eventlet.spawn(self._pipe_watcher) + + # Reseed random number generator + random.seed() + + launcher = Launcher() + launcher.run_server(server) + + def _start_child(self, wrap): + if len(wrap.forktimes) > wrap.workers: + # Limit ourselves to one process a second (over the period of + # number of workers * 1 second). This will allow workers to + # start up quickly but ensure we don't fork off children that + # die instantly too quickly. + if time.time() - wrap.forktimes[0] < wrap.workers: + LOG.info(_('Forking too fast, sleeping')) + time.sleep(1) + + wrap.forktimes.pop(0) + + wrap.forktimes.append(time.time()) + + pid = os.fork() + if pid == 0: + # NOTE(johannes): All exceptions are caught to ensure this + # doesn't fallback into the loop spawning children. It would + # be bad for a child to spawn more children. + status = 0 + try: + self._child_process(wrap.server) + except SystemExit as exc: + status = exc.code + except BaseException: + LOG.exception(_('Unhandled exception')) + status = 2 + + os._exit(status) + + LOG.info(_('Started child %d'), pid) + + wrap.children.add(pid) + self.children[pid] = wrap + + return pid + + def launch_server(self, server, workers=1): + wrap = ServerWrapper(server, workers) + + LOG.info(_('Starting %d workers'), wrap.workers) + while self.running and len(wrap.children) < wrap.workers: + self._start_child(wrap) + + def _wait_child(self): + try: + pid, status = os.wait() + except OSError as exc: + if exc.errno not in (errno.EINTR, errno.ECHILD): + raise + return None + + if os.WIFSIGNALED(status): + sig = os.WTERMSIG(status) + LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals()) + else: + code = os.WEXITSTATUS(status) + LOG.info(_('Child %(pid)d exited with status %(code)d'), locals()) + + if pid not in self.children: + LOG.warning(_('pid %d not in child list'), pid) + return None + + wrap = self.children.pop(pid) + wrap.children.remove(pid) + return wrap + + def wait(self): + """Loop waiting on children to die and respawning as necessary""" + # Loop calling wait and respawning as necessary + while self.running: + wrap = self._wait_child() + if not wrap: + continue + + while self.running and len(wrap.children) < wrap.workers: + self._start_child(wrap) + + # Wait for children to die + if self.children: + LOG.info(_('Waiting on %d children to exit'), len(self.children)) + while self.children: + self._wait_child() + + class Service(object): """Service object for binaries running on hosts. @@ -170,7 +369,6 @@ class Service(object): self.report_interval = report_interval self.periodic_interval = periodic_interval self.periodic_fuzzy_delay = periodic_fuzzy_delay - super(Service, self).__init__(*args, **kwargs) self.saved_args, self.saved_kwargs = args, kwargs self.timers = [] @@ -361,10 +559,13 @@ class WSGIService(object): self.app = self.loader.load_app(name) self.host = getattr(FLAGS, '%s_listen' % name, "0.0.0.0") self.port = getattr(FLAGS, '%s_listen_port' % name, 0) + self.workers = getattr(FLAGS, '%s_workers' % name, None) self.server = wsgi.Server(name, self.app, host=self.host, port=self.port) + # Pull back actual port used + self.port = self.server.port def _get_manager(self): """Initialize a Manager object appropriate for this service. @@ -400,7 +601,6 @@ class WSGIService(object): if self.manager: self.manager.init_host() self.server.start() - self.port = self.server.port def stop(self): """Stop serving this API. @@ -425,29 +625,18 @@ class WSGIService(object): _launcher = None -def serve(*servers): +def serve(server, workers=None): global _launcher - if not _launcher: - _launcher = Launcher() - for server in servers: + if _launcher: + raise RuntimeError(_('serve() can only be called once')) + + if workers: + _launcher = ProcessLauncher() + _launcher.launch_server(server, workers=workers) + else: + _launcher = ServiceLauncher() _launcher.launch_server(server) - eventlet_backdoor.initialize_if_enabled() - def wait(): - LOG.debug(_('Full set of FLAGS:')) - for flag in FLAGS: - flag_get = FLAGS.get(flag, None) - # hide flag contents from log if contains a password - # should use secret flag when switch over to openstack-common - if ("_password" in flag or "_key" in flag or - (flag == "sql_connection" and "mysql:" in flag_get)): - LOG.debug(_('%(flag)s : FLAG SET ') % locals()) - else: - LOG.debug('%(flag)s : %(flag_get)s' % locals()) - try: - _launcher.wait() - except KeyboardInterrupt: - _launcher.stop() - rpc.cleanup() + _launcher.wait() diff --git a/nova/tests/__init__.py b/nova/tests/__init__.py index dff2ad649..9ff38e3a9 100644 --- a/nova/tests/__init__.py +++ b/nova/tests/__init__.py @@ -44,7 +44,7 @@ from nova import log as logging import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) FLAGS = flags.FLAGS FLAGS.use_stderr = False diff --git a/nova/tests/integrated/test_multiprocess_api.py b/nova/tests/integrated/test_multiprocess_api.py new file mode 100644 index 000000000..6fe1479cd --- /dev/null +++ b/nova/tests/integrated/test_multiprocess_api.py @@ -0,0 +1,169 @@ +# Copyright (c) 2012 Intel, LLC +# Copyright (c) 2012 OpenStack, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Test multiprocess enabled API service. +""" +import os +import signal +import time +import traceback + +from nova import flags +from nova.log import logging +from nova import service +from nova.tests.integrated import integrated_helpers + +FLAGS = flags.FLAGS +LOG = logging.getLogger(__name__) + + +class MultiprocessWSGITest(integrated_helpers._IntegratedTestBase): + def _start_api_service(self): + # Process will be started in _spawn() + self.osapi = service.WSGIService("osapi_compute") + self.auth_url = 'http://%s:%s/v2' % (self.osapi.host, self.osapi.port) + LOG.info('auth_url = %s' % self.auth_url) + + def _get_flags(self): + self.workers = 2 + f = super(MultiprocessWSGITest, self)._get_flags() + f['osapi_compute_workers'] = self.workers + return f + + def _spawn(self): + pid = os.fork() + if pid == 0: + # NOTE(johannes): We can't let the child processes exit back + # into the unit test framework since then we'll have multiple + # processes running the same tests (and possibly forking more + # processes that end up in the same situation). So we need + # to catch all exceptions and make sure nothing leaks out, in + # particlar SystemExit, which is raised by sys.exit(). We use + # os._exit() which doesn't have this problem. + status = 0 + try: + launcher = service.ProcessLauncher() + launcher.launch_server(self.osapi, workers=self.osapi.workers) + launcher.wait() + except SystemExit as exc: + status = exc.code + except BaseException: + # We need to be defensive here too + try: + traceback.print_exc() + except BaseException: + print "Couldn't print traceback" + status = 2 + + # Really exit + os._exit(status) + + self.pid = pid + + # Wait for up to a second for workers to get started + start = time.time() + while time.time() - start < 1: + workers = self._get_workers() + if len(workers) == self.workers: + break + + time.sleep(.1) + + self.assertEqual(len(workers), self.workers) + return workers + + def tearDown(self): + if self.pid: + # Make sure all processes are stopped + os.kill(self.pid, signal.SIGTERM) + + # Make sure we reap our test process + self._reap_test() + + super(MultiprocessWSGITest, self).tearDown() + + def _reap_test(self): + pid, status = os.waitpid(self.pid, 0) + self.pid = None + return status + + def _get_workers(self): + f = os.popen('ps ax -o pid,ppid,command') + # Skip ps header + f.readline() + + processes = [tuple(int(p) for p in l.strip().split()[:2]) + for l in f.readlines()] + return [p for p, pp in processes if pp == self.pid] + + def test_killed_worker_recover(self): + start_workers = self._spawn() + + # kill one worker and check if new worker can come up + LOG.info('pid of first child is %s' % start_workers[0]) + os.kill(start_workers[0], signal.SIGTERM) + + # loop and check if new worker is spawned (for 1 second max) + start = time.time() + while time.time() - start < 1: + end_workers = self._get_workers() + LOG.info('workers: %r' % end_workers) + + if start_workers != end_workers: + break + + time.sleep(.1) + + # Make sure worker pids don't match + self.assertNotEqual(start_workers, end_workers) + + # check if api service still works + flavors = self.api.get_flavors() + self.assertTrue(len(flavors) > 0, 'Num of flavors > 0.') + + def _terminate_with_signal(self, sig): + self._spawn() + + # check if api service is working + flavors = self.api.get_flavors() + self.assertTrue(len(flavors) > 0, 'Num of flavors > 0.') + + os.kill(self.pid, sig) + + # loop and check if all processes are killed (for 1 second max) + start = time.time() + while time.time() - start < 1: + workers = self._get_workers() + LOG.info('workers: %r' % workers) + + if not workers: + break + + time.sleep(.1) + + self.assertFalse(workers, 'No OS processes left.') + + def test_terminate_sigkill(self): + self._terminate_with_signal(signal.SIGKILL) + status = self._reap_test() + self.assertTrue(os.WIFSIGNALED(status)) + self.assertEqual(os.WTERMSIG(status), signal.SIGKILL) + + def test_terminate_sigterm(self): + self._terminate_with_signal(signal.SIGTERM) + status = self._reap_test() + self.assertTrue(os.WIFEXITED(status)) + self.assertEqual(os.WEXITSTATUS(status), 0) diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py index 2b6c83653..6aa390a11 100644 --- a/nova/tests/test_service.py +++ b/nova/tests/test_service.py @@ -40,7 +40,7 @@ test_service_opts = [ default="nova.tests.test_service.FakeManager", help="Manager for testing"), cfg.StrOpt("test_service_listen", - default=None, + default='127.0.0.1', help="Host to bind test service to"), cfg.IntOpt("test_service_listen_port", default=0, @@ -202,7 +202,6 @@ class TestWSGIService(test.TestCase): def test_service_random_port(self): test_service = service.WSGIService("test_service") - self.assertEquals(0, test_service.port) test_service.start() self.assertNotEqual(0, test_service.port) test_service.stop() @@ -216,10 +215,7 @@ class TestLauncher(test.TestCase): self.service = service.WSGIService("test_service") def test_launch_app(self): - self.assertEquals(0, self.service.port) launcher = service.Launcher() launcher.launch_server(self.service) - # Give spawned thread a chance to execute - greenthread.sleep(0) self.assertNotEquals(0, self.service.port) launcher.stop() diff --git a/nova/tests/test_wsgi.py b/nova/tests/test_wsgi.py index 20b2bf81e..9f694f4bc 100644 --- a/nova/tests/test_wsgi.py +++ b/nova/tests/test_wsgi.py @@ -84,8 +84,8 @@ class TestWSGIServer(unittest.TestCase): self.assertEquals("test_app", server.name) def test_start_random_port(self): - server = nova.wsgi.Server("test_random_port", None, host="127.0.0.1") - self.assertEqual(0, server.port) + server = nova.wsgi.Server("test_random_port", None, + host="127.0.0.1", port=0) server.start() self.assertNotEqual(0, server.port) server.stop() diff --git a/nova/wsgi.py b/nova/wsgi.py index 436f4df2e..a02fdcc18 100644 --- a/nova/wsgi.py +++ b/nova/wsgi.py @@ -44,8 +44,8 @@ class Server(object): default_pool_size = 1000 - def __init__(self, name, app, host=None, port=None, pool_size=None, - protocol=eventlet.wsgi.HttpProtocol): + def __init__(self, name, app, host='0.0.0.0', port=0, pool_size=None, + protocol=eventlet.wsgi.HttpProtocol, backlog=128): """Initialize, but do not start, a WSGI server. :param name: Pretty name for logging. @@ -53,47 +53,37 @@ class Server(object): :param host: IP address to serve the application. :param port: Port number to server the application. :param pool_size: Maximum number of eventlets to spawn concurrently. + :param backlog: Maximum number of queued connections. :returns: None - + :raises: nova.exception.InvalidInput """ self.name = name self.app = app - self.host = host or "0.0.0.0" - self.port = port or 0 self._server = None - self._socket = None self._protocol = protocol self._pool = eventlet.GreenPool(pool_size or self.default_pool_size) - self._logger = logging.getLogger("eventlet.wsgi.server") + self._logger = logging.getLogger("nova.%s.wsgi.server" % self.name) self._wsgi_logger = logging.WritableLogger(self._logger) - def _start(self): - """Run the blocking eventlet WSGI server. - - :returns: None + if backlog < 1: + raise exception.InvalidInput( + reason='The backlog must be more than 1') - """ - eventlet.wsgi.server(self._socket, - self.app, - protocol=self._protocol, - custom_pool=self._pool, - log=self._wsgi_logger) + self._socket = eventlet.listen((host, port), backlog=backlog) + (self.host, self.port) = self._socket.getsockname() + LOG.info(_("%(name)s listening on %(host)s:%(port)s") % self.__dict__) - def start(self, backlog=128): + def start(self): """Start serving a WSGI application. - :param backlog: Maximum number of queued connections. :returns: None - :raises: nova.exception.InvalidInput - """ - if backlog < 1: - raise exception.InvalidInput( - reason='The backlog must be more than 1') - self._socket = eventlet.listen((self.host, self.port), backlog=backlog) - self._server = eventlet.spawn(self._start) - (self.host, self.port) = self._socket.getsockname() - LOG.info(_("Started %(name)s on %(host)s:%(port)s") % self.__dict__) + self._server = eventlet.spawn(eventlet.wsgi.server, + self._socket, + self.app, + protocol=self._protocol, + custom_pool=self._pool, + log=self._wsgi_logger) def stop(self): """Stop this server. @@ -105,7 +95,11 @@ class Server(object): """ LOG.info(_("Stopping WSGI server.")) - self._server.kill() + + if self._server is not None: + # Resize pool to stop new requests from being processed + self._pool.resize(0) + self._server.kill() def wait(self): """Block, until the server has stopped. |