summaryrefslogtreecommitdiffstats
path: root/nova/wsgi.py
diff options
context:
space:
mode:
Diffstat (limited to 'nova/wsgi.py')
-rw-r--r--nova/wsgi.py166
1 files changed, 154 insertions, 12 deletions
diff --git a/nova/wsgi.py b/nova/wsgi.py
index 118fd14e1..27e24e210 100644
--- a/nova/wsgi.py
+++ b/nova/wsgi.py
@@ -19,12 +19,16 @@
"""Utility methods for working with WSGI servers."""
-import os.path
+import errno
+import os
+import signal
import sys
import eventlet
+from eventlet.green import socket
import eventlet.wsgi
import greenlet
+import multiprocessing
from paste import deploy
import routes.middleware
import webob.dec
@@ -45,14 +49,15 @@ class Server(object):
default_pool_size = 1000
- def __init__(self, name, app, host=None, port=None, pool_size=None,
- protocol=eventlet.wsgi.HttpProtocol):
+ def __init__(self, name, app, host=None, port=None, workers=None,
+ pool_size=None, protocol=eventlet.wsgi.HttpProtocol):
"""Initialize, but do not start, a WSGI server.
:param name: Pretty name for logging.
:param app: The WSGI application to serve.
:param host: IP address to serve the application.
:param port: Port number to server the application.
+ :param workers: Number of process to spawn concurrently
:param pool_size: Maximum number of eventlets to spawn concurrently.
:returns: None
@@ -61,12 +66,17 @@ class Server(object):
self.app = app
self.host = host or "0.0.0.0"
self.port = port or 0
+ self.workers = workers or 0
self._server = None
self._socket = None
self._protocol = protocol
- self._pool = eventlet.GreenPool(pool_size or self.default_pool_size)
- self._logger = logging.getLogger("eventlet.wsgi.server")
+ self._pool_size = pool_size or self.default_pool_size
+ self._pool = None
+ self._logger = logging.getLogger("nova.%s.wsgi.server" % self.name)
self._wsgi_logger = logging.WritableLogger(self._logger)
+ self.master_worker = None
+ self.children = []
+ self.running = True
def _start(self):
"""Run the blocking eventlet WSGI server.
@@ -90,11 +100,124 @@ class Server(object):
"""
if backlog < 1:
raise exception.InvalidInput(
- reason='The backlog must be more than 1')
- self._socket = eventlet.listen((self.host, self.port), backlog=backlog)
- self._server = eventlet.spawn(self._start)
- (self.host, self.port) = self._socket.getsockname()
- LOG.info(_("Started %(name)s on %(host)s:%(port)s") % self.__dict__)
+ reason='The backlog must be more than 1')
+
+ try:
+ self._socket = eventlet.listen((self.host, self.port),
+ backlog=backlog)
+ (self.host, self.port) = self._socket.getsockname()
+ except socket.error, err:
+ if err[0] != errno.EINVAL:
+ raise
+
+ if self.workers == 0:
+ # single process mode, useful for profiling, test, debug etc.
+ self._pool = eventlet.GreenPool(self._pool_size)
+ self._server = self._pool.spawn(self._start)
+ LOG.info(_("Started %(name)s on %(host)s:%(port)s") %
+ self.__dict__)
+ return None
+
+ # master_worker doesn't actually do work (i.e. handle API request)
+ # but it's a managing process to handle signal/termination for
+ # this type of API service, only needed if workers > 1
+ self.master_worker = multiprocessing.Process(target=self.run_workers,
+ args=())
+ self.master_worker.start()
+ self._logger.info(_("Started %(name)s on %(host)s:%(port)s in process")
+ % self.__dict__)
+ return None
+
+ def run_server_in_process(self):
+ """Run a WSGI server."""
+ eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
+ eventlet.hubs.use_hub('poll')
+ eventlet.patcher.monkey_patch(all=False, socket=True)
+
+ self._pool = eventlet.GreenPool(size=self._pool_size)
+ try:
+ self._pool.spawn_n(self._start)
+ except socket.error, err:
+ if err[0] != errno.EINVAL:
+ raise
+
+ self._pool.waitall()
+
+ def run_workers(self):
+ """Start workers and wait for them to join"""
+ def kill_children(*args):
+ """Kills the entire process group."""
+ #TODO(zhiteng) Gracefully kill all eventlet greenthread
+ self._logger.error(_('SIGTERM or SIGINT received'))
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ self.running = False
+ for pid in self.children:
+ os.kill(pid, signal.SIGTERM)
+
+ def hup(*args):
+ """
+ Shuts down the server, but allows running requests to complete
+ """
+ self._logger.error(_('SIGHUP received'))
+ signal.signal(signal.SIGHUP, signal.SIG_IGN)
+ self.running = False
+
+ signal.signal(signal.SIGTERM, kill_children)
+ signal.signal(signal.SIGINT, kill_children)
+ signal.signal(signal.SIGHUP, hup)
+
+ while len(self.children) < self.workers:
+ self.run_child()
+
+ self._logger.info(_("Started %(children_count)d worker for %(name)s")
+ % {'children_count': len(self.children),
+ 'name': self.name})
+
+ self.wait_on_children()
+
+ def run_child(self):
+ try:
+ pid = os.fork()
+ except KeyboardInterrupt:
+ pass
+ if pid == 0:
+ signal.signal(signal.SIGHUP, signal.SIG_DFL)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ # to avoid race condition that child receive signal before
+ # parent and is respawned
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ try:
+ self.run_server_in_process()
+ except KeyboardInterrupt:
+ pass
+ self._logger.info(_('Child %d exiting normally') % os.getpid())
+ return None
+ else:
+ self._logger.info(_('[%(name)s] Started worker (pid: %(pid)s)') %
+ {'name': self.name,
+ 'pid': pid})
+ self.children.append(pid)
+
+ def wait_on_children(self):
+ while self.running:
+ try:
+ pid, status = os.wait()
+ if os.WIFEXITED(status) or os.WIFSIGNALED(status):
+ self._logger.error(_('Dead worker %(pid)s') % locals())
+ if pid in self.children:
+ self.children.remove(pid)
+ self.run_child()
+ except OSError, err:
+ if err.errno not in (errno.EINTR, errno.ECHILD):
+ raise
+ except KeyboardInterrupt:
+ self._logger.info(_('Caught keyboard interrupt. Exiting.'))
+ self.running = False
+ break
+ eventlet.greenio.shutdown_safe(self._socket)
+ self._socket.close()
+ self._logger.debug(_('Exited'))
def stop(self):
"""Stop this server.
@@ -106,7 +229,19 @@ class Server(object):
"""
LOG.info(_("Stopping WSGI server."))
- self._server.kill()
+ if self.workers > 0:
+ # set running state to false and kill all workers
+ self.running = False
+ for pid in self.children:
+ os.kill(pid, signal.SIGTERM)
+ self.children.remove(pid)
+ # now terminate master_worker
+ if self.master_worker.is_alive():
+ self.master_worker.terminate()
+ else:
+ # Resize Pool to stop accepting new connection
+ self._pool.resize(0)
+ self._server.kill()
def wait(self):
"""Block, until the server has stopped.
@@ -117,9 +252,16 @@ class Server(object):
"""
try:
- self._server.wait()
+ if self.workers and self.master_worker:
+ # for services enabled multi-process,a separate master_worker
+ # is already waiting
+ pass
+ else:
+ self._pool.waitall()
except greenlet.GreenletExit:
LOG.info(_("WSGI server has stopped."))
+ except KeyboardInterrupt:
+ pass
class Request(webob.Request):