diff options
Diffstat (limited to 'nova/wsgi.py')
-rw-r--r-- | nova/wsgi.py | 166 |
1 files changed, 154 insertions, 12 deletions
diff --git a/nova/wsgi.py b/nova/wsgi.py index 118fd14e1..27e24e210 100644 --- a/nova/wsgi.py +++ b/nova/wsgi.py @@ -19,12 +19,16 @@ """Utility methods for working with WSGI servers.""" -import os.path +import errno +import os +import signal import sys import eventlet +from eventlet.green import socket import eventlet.wsgi import greenlet +import multiprocessing from paste import deploy import routes.middleware import webob.dec @@ -45,14 +49,15 @@ class Server(object): default_pool_size = 1000 - def __init__(self, name, app, host=None, port=None, pool_size=None, - protocol=eventlet.wsgi.HttpProtocol): + def __init__(self, name, app, host=None, port=None, workers=None, + pool_size=None, protocol=eventlet.wsgi.HttpProtocol): """Initialize, but do not start, a WSGI server. :param name: Pretty name for logging. :param app: The WSGI application to serve. :param host: IP address to serve the application. :param port: Port number to server the application. + :param workers: Number of process to spawn concurrently :param pool_size: Maximum number of eventlets to spawn concurrently. :returns: None @@ -61,12 +66,17 @@ class Server(object): self.app = app self.host = host or "0.0.0.0" self.port = port or 0 + self.workers = workers or 0 self._server = None self._socket = None self._protocol = protocol - self._pool = eventlet.GreenPool(pool_size or self.default_pool_size) - self._logger = logging.getLogger("eventlet.wsgi.server") + self._pool_size = pool_size or self.default_pool_size + self._pool = None + self._logger = logging.getLogger("nova.%s.wsgi.server" % self.name) self._wsgi_logger = logging.WritableLogger(self._logger) + self.master_worker = None + self.children = [] + self.running = True def _start(self): """Run the blocking eventlet WSGI server. @@ -90,11 +100,124 @@ class Server(object): """ if backlog < 1: raise exception.InvalidInput( - reason='The backlog must be more than 1') - self._socket = eventlet.listen((self.host, self.port), backlog=backlog) - self._server = eventlet.spawn(self._start) - (self.host, self.port) = self._socket.getsockname() - LOG.info(_("Started %(name)s on %(host)s:%(port)s") % self.__dict__) + reason='The backlog must be more than 1') + + try: + self._socket = eventlet.listen((self.host, self.port), + backlog=backlog) + (self.host, self.port) = self._socket.getsockname() + except socket.error, err: + if err[0] != errno.EINVAL: + raise + + if self.workers == 0: + # single process mode, useful for profiling, test, debug etc. + self._pool = eventlet.GreenPool(self._pool_size) + self._server = self._pool.spawn(self._start) + LOG.info(_("Started %(name)s on %(host)s:%(port)s") % + self.__dict__) + return None + + # master_worker doesn't actually do work (i.e. handle API request) + # but it's a managing process to handle signal/termination for + # this type of API service, only needed if workers > 1 + self.master_worker = multiprocessing.Process(target=self.run_workers, + args=()) + self.master_worker.start() + self._logger.info(_("Started %(name)s on %(host)s:%(port)s in process") + % self.__dict__) + return None + + def run_server_in_process(self): + """Run a WSGI server.""" + eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0" + eventlet.hubs.use_hub('poll') + eventlet.patcher.monkey_patch(all=False, socket=True) + + self._pool = eventlet.GreenPool(size=self._pool_size) + try: + self._pool.spawn_n(self._start) + except socket.error, err: + if err[0] != errno.EINVAL: + raise + + self._pool.waitall() + + def run_workers(self): + """Start workers and wait for them to join""" + def kill_children(*args): + """Kills the entire process group.""" + #TODO(zhiteng) Gracefully kill all eventlet greenthread + self._logger.error(_('SIGTERM or SIGINT received')) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + signal.signal(signal.SIGINT, signal.SIG_IGN) + self.running = False + for pid in self.children: + os.kill(pid, signal.SIGTERM) + + def hup(*args): + """ + Shuts down the server, but allows running requests to complete + """ + self._logger.error(_('SIGHUP received')) + signal.signal(signal.SIGHUP, signal.SIG_IGN) + self.running = False + + signal.signal(signal.SIGTERM, kill_children) + signal.signal(signal.SIGINT, kill_children) + signal.signal(signal.SIGHUP, hup) + + while len(self.children) < self.workers: + self.run_child() + + self._logger.info(_("Started %(children_count)d worker for %(name)s") + % {'children_count': len(self.children), + 'name': self.name}) + + self.wait_on_children() + + def run_child(self): + try: + pid = os.fork() + except KeyboardInterrupt: + pass + if pid == 0: + signal.signal(signal.SIGHUP, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + # to avoid race condition that child receive signal before + # parent and is respawned + signal.signal(signal.SIGINT, signal.SIG_IGN) + try: + self.run_server_in_process() + except KeyboardInterrupt: + pass + self._logger.info(_('Child %d exiting normally') % os.getpid()) + return None + else: + self._logger.info(_('[%(name)s] Started worker (pid: %(pid)s)') % + {'name': self.name, + 'pid': pid}) + self.children.append(pid) + + def wait_on_children(self): + while self.running: + try: + pid, status = os.wait() + if os.WIFEXITED(status) or os.WIFSIGNALED(status): + self._logger.error(_('Dead worker %(pid)s') % locals()) + if pid in self.children: + self.children.remove(pid) + self.run_child() + except OSError, err: + if err.errno not in (errno.EINTR, errno.ECHILD): + raise + except KeyboardInterrupt: + self._logger.info(_('Caught keyboard interrupt. Exiting.')) + self.running = False + break + eventlet.greenio.shutdown_safe(self._socket) + self._socket.close() + self._logger.debug(_('Exited')) def stop(self): """Stop this server. @@ -106,7 +229,19 @@ class Server(object): """ LOG.info(_("Stopping WSGI server.")) - self._server.kill() + if self.workers > 0: + # set running state to false and kill all workers + self.running = False + for pid in self.children: + os.kill(pid, signal.SIGTERM) + self.children.remove(pid) + # now terminate master_worker + if self.master_worker.is_alive(): + self.master_worker.terminate() + else: + # Resize Pool to stop accepting new connection + self._pool.resize(0) + self._server.kill() def wait(self): """Block, until the server has stopped. @@ -117,9 +252,16 @@ class Server(object): """ try: - self._server.wait() + if self.workers and self.master_worker: + # for services enabled multi-process,a separate master_worker + # is already waiting + pass + else: + self._pool.waitall() except greenlet.GreenletExit: LOG.info(_("WSGI server has stopped.")) + except KeyboardInterrupt: + pass class Request(webob.Request): |