diff options
author | Zhiteng Huang <zhiteng.huang@intel.com> | 2012-03-25 02:06:01 +0800 |
---|---|---|
committer | Zhiteng Huang <zhiteng.huang@intel.com> | 2012-06-02 00:34:55 +0800 |
commit | e599636d09755f635604f64f17e9f56cac14575e (patch) | |
tree | dc729c51267d8ab976422463a3a95b070ddd6135 /nova/wsgi.py | |
parent | 31108020fc237624e244f08658646e2f119506db (diff) | |
download | nova-e599636d09755f635604f64f17e9f56cac14575e.tar.gz nova-e599636d09755f635604f64f17e9f56cac14575e.tar.xz nova-e599636d09755f635604f64f17e9f56cac14575e.zip |
blueprint <multi-process-api-service>
Add multiprocess support for API serivces (EC2/OSAPI_Compute/OSAPI_Volume/Metadata).
2012-06-1 v7:
* Add unittest to cover worker recovery, service termination functionality
in wsgi.py, fix python 2.6 compatibility issue.
* Modify generate_uid() to introduce per-process seeds in utils.py to avoid
collisions.
* Add worker session to nova.conf.sample.
2012-05-21 v6:
* Fix 'test_wsgi' unittest error.
2012-04-28 v5:
* Add SIGINT handler and fix child-parent race condition when Ctrl+C is
pressed.
2012-03-31 v4:
* Fixed typo, removed debug code.
2012-03-30 v3:
* Fixed localization/pep8 error in unittest, add metadata test.
* nova/wsgi.py:Server: use the greenthread pool created for each process.
* nova/service.py: remove debug code
2012-03-27 v2:
* Fixed unittest error.
* nova/wsgi.py:Server: Use self._logger to do logging in multiprocess mode.
* nova/wsgi.py:Server: Move self._pool creation into proper place.
* code style fix.
2012-03-25 v1:
* Modification to nova/service.py and nova/wsgi.py in order to support
multiprocess (a.k.a. workers) for various API services. If multiprocess
mode is enabled, (i.e. flags 'APINAME_workers' set to positive numbers),
corresponding API service will run in target number of process(es). There
is also a master_worker process spawned for managing all workers (handling
signal/termination).
* Add unittest for multiprocess API service, also alter testing/runner.py
to adopt new unittest.
Change-Id: Ia045e595543ddfd192894b2a05801cc4b7ca90cb
Diffstat (limited to 'nova/wsgi.py')
-rw-r--r-- | nova/wsgi.py | 166 |
1 files changed, 154 insertions, 12 deletions
diff --git a/nova/wsgi.py b/nova/wsgi.py index 118fd14e1..27e24e210 100644 --- a/nova/wsgi.py +++ b/nova/wsgi.py @@ -19,12 +19,16 @@ """Utility methods for working with WSGI servers.""" -import os.path +import errno +import os +import signal import sys import eventlet +from eventlet.green import socket import eventlet.wsgi import greenlet +import multiprocessing from paste import deploy import routes.middleware import webob.dec @@ -45,14 +49,15 @@ class Server(object): default_pool_size = 1000 - def __init__(self, name, app, host=None, port=None, pool_size=None, - protocol=eventlet.wsgi.HttpProtocol): + def __init__(self, name, app, host=None, port=None, workers=None, + pool_size=None, protocol=eventlet.wsgi.HttpProtocol): """Initialize, but do not start, a WSGI server. :param name: Pretty name for logging. :param app: The WSGI application to serve. :param host: IP address to serve the application. :param port: Port number to server the application. + :param workers: Number of process to spawn concurrently :param pool_size: Maximum number of eventlets to spawn concurrently. :returns: None @@ -61,12 +66,17 @@ class Server(object): self.app = app self.host = host or "0.0.0.0" self.port = port or 0 + self.workers = workers or 0 self._server = None self._socket = None self._protocol = protocol - self._pool = eventlet.GreenPool(pool_size or self.default_pool_size) - self._logger = logging.getLogger("eventlet.wsgi.server") + self._pool_size = pool_size or self.default_pool_size + self._pool = None + self._logger = logging.getLogger("nova.%s.wsgi.server" % self.name) self._wsgi_logger = logging.WritableLogger(self._logger) + self.master_worker = None + self.children = [] + self.running = True def _start(self): """Run the blocking eventlet WSGI server. @@ -90,11 +100,124 @@ class Server(object): """ if backlog < 1: raise exception.InvalidInput( - reason='The backlog must be more than 1') - self._socket = eventlet.listen((self.host, self.port), backlog=backlog) - self._server = eventlet.spawn(self._start) - (self.host, self.port) = self._socket.getsockname() - LOG.info(_("Started %(name)s on %(host)s:%(port)s") % self.__dict__) + reason='The backlog must be more than 1') + + try: + self._socket = eventlet.listen((self.host, self.port), + backlog=backlog) + (self.host, self.port) = self._socket.getsockname() + except socket.error, err: + if err[0] != errno.EINVAL: + raise + + if self.workers == 0: + # single process mode, useful for profiling, test, debug etc. + self._pool = eventlet.GreenPool(self._pool_size) + self._server = self._pool.spawn(self._start) + LOG.info(_("Started %(name)s on %(host)s:%(port)s") % + self.__dict__) + return None + + # master_worker doesn't actually do work (i.e. handle API request) + # but it's a managing process to handle signal/termination for + # this type of API service, only needed if workers > 1 + self.master_worker = multiprocessing.Process(target=self.run_workers, + args=()) + self.master_worker.start() + self._logger.info(_("Started %(name)s on %(host)s:%(port)s in process") + % self.__dict__) + return None + + def run_server_in_process(self): + """Run a WSGI server.""" + eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0" + eventlet.hubs.use_hub('poll') + eventlet.patcher.monkey_patch(all=False, socket=True) + + self._pool = eventlet.GreenPool(size=self._pool_size) + try: + self._pool.spawn_n(self._start) + except socket.error, err: + if err[0] != errno.EINVAL: + raise + + self._pool.waitall() + + def run_workers(self): + """Start workers and wait for them to join""" + def kill_children(*args): + """Kills the entire process group.""" + #TODO(zhiteng) Gracefully kill all eventlet greenthread + self._logger.error(_('SIGTERM or SIGINT received')) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + signal.signal(signal.SIGINT, signal.SIG_IGN) + self.running = False + for pid in self.children: + os.kill(pid, signal.SIGTERM) + + def hup(*args): + """ + Shuts down the server, but allows running requests to complete + """ + self._logger.error(_('SIGHUP received')) + signal.signal(signal.SIGHUP, signal.SIG_IGN) + self.running = False + + signal.signal(signal.SIGTERM, kill_children) + signal.signal(signal.SIGINT, kill_children) + signal.signal(signal.SIGHUP, hup) + + while len(self.children) < self.workers: + self.run_child() + + self._logger.info(_("Started %(children_count)d worker for %(name)s") + % {'children_count': len(self.children), + 'name': self.name}) + + self.wait_on_children() + + def run_child(self): + try: + pid = os.fork() + except KeyboardInterrupt: + pass + if pid == 0: + signal.signal(signal.SIGHUP, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + # to avoid race condition that child receive signal before + # parent and is respawned + signal.signal(signal.SIGINT, signal.SIG_IGN) + try: + self.run_server_in_process() + except KeyboardInterrupt: + pass + self._logger.info(_('Child %d exiting normally') % os.getpid()) + return None + else: + self._logger.info(_('[%(name)s] Started worker (pid: %(pid)s)') % + {'name': self.name, + 'pid': pid}) + self.children.append(pid) + + def wait_on_children(self): + while self.running: + try: + pid, status = os.wait() + if os.WIFEXITED(status) or os.WIFSIGNALED(status): + self._logger.error(_('Dead worker %(pid)s') % locals()) + if pid in self.children: + self.children.remove(pid) + self.run_child() + except OSError, err: + if err.errno not in (errno.EINTR, errno.ECHILD): + raise + except KeyboardInterrupt: + self._logger.info(_('Caught keyboard interrupt. Exiting.')) + self.running = False + break + eventlet.greenio.shutdown_safe(self._socket) + self._socket.close() + self._logger.debug(_('Exited')) def stop(self): """Stop this server. @@ -106,7 +229,19 @@ class Server(object): """ LOG.info(_("Stopping WSGI server.")) - self._server.kill() + if self.workers > 0: + # set running state to false and kill all workers + self.running = False + for pid in self.children: + os.kill(pid, signal.SIGTERM) + self.children.remove(pid) + # now terminate master_worker + if self.master_worker.is_alive(): + self.master_worker.terminate() + else: + # Resize Pool to stop accepting new connection + self._pool.resize(0) + self._server.kill() def wait(self): """Block, until the server has stopped. @@ -117,9 +252,16 @@ class Server(object): """ try: - self._server.wait() + if self.workers and self.master_worker: + # for services enabled multi-process,a separate master_worker + # is already waiting + pass + else: + self._pool.waitall() except greenlet.GreenletExit: LOG.info(_("WSGI server has stopped.")) + except KeyboardInterrupt: + pass class Request(webob.Request): |