diff options
-rw-r--r-- | Authors | 1 | ||||
-rwxr-xr-x | bin/nova-api | 2 | ||||
-rwxr-xr-x | bin/nova-api-ec2 | 2 | ||||
-rwxr-xr-x | bin/nova-api-metadata | 2 | ||||
-rwxr-xr-x | bin/nova-api-os-compute | 2 | ||||
-rwxr-xr-x | bin/nova-api-os-volume | 2 | ||||
-rw-r--r-- | etc/nova/nova.conf.sample | 12 | ||||
-rw-r--r-- | nova/service.py | 26 | ||||
-rw-r--r-- | nova/testing/runner.py | 2 | ||||
-rw-r--r-- | nova/tests/integrated/test_multiprocess_api.py | 186 | ||||
-rw-r--r-- | nova/utils.py | 8 | ||||
-rw-r--r-- | nova/wsgi.py | 166 |
12 files changed, 383 insertions, 28 deletions
@@ -213,6 +213,7 @@ Yun Mao <yunmao@gmail.com> Yun Shen <Yun.Shen@hp.com> Yuriy Taraday <yorik.sar@gmail.com> Zed Shaw <zedshaw@zedshaw.com> +Zhiteng Huang <zhiteng.huang@intel.com> Zhixue Wu <Zhixue.Wu@citrix.com> Zhongyue Luo <lzyeval@gmail.com> Ziad Sawalha <github@highbridgellc.com> diff --git a/bin/nova-api b/bin/nova-api index e6779df4f..5d15fc7c4 100755 --- a/bin/nova-api +++ b/bin/nova-api @@ -24,7 +24,7 @@ Starts both the EC2 and OpenStack APIs in separate greenthreads. """ import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys diff --git a/bin/nova-api-ec2 b/bin/nova-api-ec2 index b53c9158a..6d94466d6 100755 --- a/bin/nova-api-ec2 +++ b/bin/nova-api-ec2 @@ -20,7 +20,7 @@ """Starter script for Nova EC2 API.""" import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys diff --git a/bin/nova-api-metadata b/bin/nova-api-metadata index 2f2ef9454..3d78cfcee 100755 --- a/bin/nova-api-metadata +++ b/bin/nova-api-metadata @@ -20,7 +20,7 @@ """Starter script for Nova Metadata API.""" import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys diff --git a/bin/nova-api-os-compute b/bin/nova-api-os-compute index 75c921943..978a14868 100755 --- a/bin/nova-api-os-compute +++ b/bin/nova-api-os-compute @@ -20,7 +20,7 @@ """Starter script for Nova OS API.""" import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys diff --git a/bin/nova-api-os-volume b/bin/nova-api-os-volume index b93fd51ae..d891d0754 100755 --- a/bin/nova-api-os-volume +++ b/bin/nova-api-os-volume @@ -20,7 +20,7 @@ """Starter script for Nova OS API.""" import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys diff --git a/etc/nova/nova.conf.sample b/etc/nova/nova.conf.sample index 8726f5dcb..bc283c5e3 100644 --- a/etc/nova/nova.conf.sample +++ b/etc/nova/nova.conf.sample @@ -489,12 +489,18 @@ # ec2_listen_port=8773 #### (IntOpt) port for ec2 api to listen +# ec2_workers=0 +#### (IntOpt) Number of EC2 API workers + # osapi_compute_listen=0.0.0.0 #### (StrOpt) IP address for OpenStack API to listen # osapi_compute_listen_port=8774 #### (IntOpt) list port for osapi compute +# osapi_compute_workers=0 +#### (IntOpt) Number of workers for OpenStack API + # metadata_manager=nova.api.manager.MetadataManager #### (StrOpt) OpenStack metadata service manager @@ -504,12 +510,18 @@ # metadata_listen_port=8775 #### (IntOpt) port for metadata api to listen +# metadata_workers=0 +#### (IntOpt) Number of workers for metadata API + # osapi_volume_listen=0.0.0.0 #### (StrOpt) IP address for OpenStack Volume API to listen # osapi_volume_listen_port=8776 #### (IntOpt) port for os volume api to listen +# osapi_volume_workers=0 +#### (IntOpt) Number of workers for OpenStack Volume API + ######## defined in nova.test ######## diff --git a/nova/service.py b/nova/service.py index b179cda6c..136668311 100644 --- a/nova/service.py +++ b/nova/service.py @@ -61,12 +61,18 @@ service_opts = [ cfg.IntOpt('ec2_listen_port', default=8773, help='port for ec2 api to listen'), + cfg.IntOpt('ec2_workers', + default=0, + help='Number of workers for EC2 API service'), cfg.StrOpt('osapi_compute_listen', default="0.0.0.0", help='IP address for OpenStack API to listen'), cfg.IntOpt('osapi_compute_listen_port', default=8774, help='list port for osapi compute'), + cfg.IntOpt('osapi_compute_workers', + default=0, + help='Number of workers for OpenStack API service'), cfg.StrOpt('metadata_manager', default='nova.api.manager.MetadataManager', help='OpenStack metadata service manager'), @@ -76,12 +82,18 @@ service_opts = [ cfg.IntOpt('metadata_listen_port', default=8775, help='port for metadata api to listen'), + cfg.IntOpt('metadata_workers', + default=0, + help='Number of workers for metadata service'), cfg.StrOpt('osapi_volume_listen', default="0.0.0.0", help='IP address for OpenStack Volume API to listen'), cfg.IntOpt('osapi_volume_listen_port', default=8776, - help='port for os volume api to listen') + help='port for os volume api to listen'), + cfg.IntOpt('osapi_volume_workers', + default=0, + help='Number of workers for OpenStack Volume API service') ] FLAGS = flags.FLAGS @@ -135,14 +147,6 @@ class Launcher(object): :returns: None """ - def sigterm(sig, frame): - LOG.audit(_("SIGTERM received")) - # NOTE(jk0): Raise a ^C which is caught by the caller and cleanly - # shuts down the service. This does not yet handle eventlet - # threads. - raise KeyboardInterrupt - - signal.signal(signal.SIGTERM, sigterm) for service in self._services: try: @@ -362,10 +366,12 @@ class WSGIService(object): self.app = self.loader.load_app(name) self.host = getattr(FLAGS, '%s_listen' % name, "0.0.0.0") self.port = getattr(FLAGS, '%s_listen_port' % name, 0) + self.workers = getattr(FLAGS, '%s_workers' % name, 0) self.server = wsgi.Server(name, self.app, host=self.host, - port=self.port) + port=self.port, + workers=self.workers) def _get_manager(self): """Initialize a Manager object appropriate for this service. diff --git a/nova/testing/runner.py b/nova/testing/runner.py index 2b5004394..cdba02acb 100644 --- a/nova/testing/runner.py +++ b/nova/testing/runner.py @@ -366,5 +366,5 @@ def run(): if __name__ == '__main__': - eventlet.monkey_patch() + eventlet.monkey_patch(os=False) run() diff --git a/nova/tests/integrated/test_multiprocess_api.py b/nova/tests/integrated/test_multiprocess_api.py new file mode 100644 index 000000000..e8d174993 --- /dev/null +++ b/nova/tests/integrated/test_multiprocess_api.py @@ -0,0 +1,186 @@ +# Copyright (c) 2012 Intel, LLC +# Copyright (c) 2012 OpenStack, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Test multiprocess enabled EC2/OSAPI_Compute/OSAPI_Volume/Metadata API service. +""" +import boto +from boto.ec2 import regioninfo +import os +import signal +import sys +import time + +from nova import flags +from nova.log import logging +from nova import service +from nova.tests.integrated import integrated_helpers + +FLAGS = flags.FLAGS +LOG = logging.getLogger(__name__) + + +class MultiprocessEC2Test(integrated_helpers._IntegratedTestBase): + def _start_api_service(self): + self.osapi = service.WSGIService("ec2") + self.osapi.start() + self.auth_url = 'http://%s:%s/services/Cloud' % \ + (self.osapi.host, self.osapi.port) + LOG.warn(self.auth_url) + + def _get_flags(self): + f = super(MultiprocessEC2Test, self)._get_flags() + f['ec2_workers'] = 2 + return f + + def test_ec2(self): + region = regioninfo.RegionInfo(None, 'test', self.osapi.host) + self.ec2 = boto.connect_ec2( + aws_access_key_id='fake', + aws_secret_access_key='fake', + is_secure=False, + region=region, + host=self.osapi.host, + port=self.osapi.port, + path='/services/Cloud') + result = self.ec2.get_all_regions() + self.assertEqual(len(result), 1) + + +class MultiprocessMetadataTest(integrated_helpers._IntegratedTestBase): + def _start_api_service(self): + self.osapi = service.WSGIService("metadata") + self.osapi.start() + self.auth_url = 'http://%s:%s/' % (self.osapi.host, self.osapi.port) + LOG.warn(self.auth_url) + + def _get_flags(self): + f = super(MultiprocessMetadataTest, self)._get_flags() + f['metadata_workers'] = 2 + return f + + def request(self, relative_url): + return self.api.api_get(relative_url) + + def test_meta(self): + userdata_url = self.auth_url + '/user-data' + resp = self.api.request(userdata_url) + self.assertEqual(resp.status, 200) + + +class MultiprocessOSAPIComputeTest(integrated_helpers._IntegratedTestBase): + def _start_api_service(self): + self.osapi = service.WSGIService("osapi_compute") + self.osapi.start() + self.auth_url = 'http://%s:%s/v2' % (self.osapi.host, self.osapi.port) + LOG.warn(self.auth_url) + + def _get_flags(self): + f = super(MultiprocessOSAPIComputeTest, self)._get_flags() + f['osapi_compute_workers'] = 2 + return f + + def test_osapi_compute(self): + flavors = self.api.get_flavors() + self.assertTrue(len(flavors) > 0, 'Num of flavors > 0.') + + +class MultiprocessOSAPIVolumesTest(integrated_helpers._IntegratedTestBase): + def _start_api_service(self): + self.osapi = service.WSGIService("osapi_volume") + self.osapi.start() + self.auth_url = 'http://%s:%s/v1' % (self.osapi.host, self.osapi.port) + LOG.warn(self.auth_url) + + def _get_flags(self): + f = super(MultiprocessOSAPIVolumesTest, self)._get_flags() + f['osapi_volume_workers'] = 2 + f['use_local_volumes'] = False # Avoids calling local_path + f['volume_driver'] = 'nova.volume.driver.LoggingVolumeDriver' + return f + + def test_create_volumes(self): + """Create Volume with API""" + body = {'volume': {'size': 1, + 'snapshot_id': None, + 'display_name': None, + 'display_description': None, + 'volume_type': None}} + created_volume = self.api.post_volume(body) + self.assertTrue(created_volume['id']) + + +class MultiprocessWSGITest(integrated_helpers._IntegratedTestBase): + def setUp(self): + self.workers = 4 + super(MultiprocessWSGITest, self).setUp() + + def _start_api_service(self): + self.osapi = service.WSGIService("osapi_compute") + self.osapi.start() + self.master_worker_pid = self.osapi.server.master_worker.pid + LOG.warn('Master_work pid is: %d' % self.master_worker_pid) + self.auth_url = 'http://%s:%s/v2' % (self.osapi.host, self.osapi.port) + LOG.warn(self.auth_url) + + def _get_flags(self): + f = super(MultiprocessWSGITest, self)._get_flags() + f['osapi_compute_workers'] = self.workers + return f + + def test_killed_worker_recover(self): + # kill one worker and check if new worker can come up + f = os.popen('ps --no-headers --ppid %d' % self.master_worker_pid) + children_pid = f.readline().split() + for pid in children_pid: + LOG.warn('pid of first child is %s' % pid) + if pid.isdigit(): + os.kill(int(pid), signal.SIGTERM) + break + else: + continue + # wait for new worker + time.sleep(1.5) + f = os.popen('ps --no-headers --ppid %d|wc -l' % + self.master_worker_pid) + workers = f.readline() + LOG.warn('# of workers: %s' % workers) + self.assertEqual(int(workers), self.workers, + 'Num of children = %d.' % self.workers) + flavors = self.api.get_flavors() + # check if api service works + self.assertTrue(len(flavors) > 0, 'Num of flavors > 0.') + + def test_terminate_api_with_signal(self): + # check if api service is working + flavors = self.api.get_flavors() + self.assertTrue(len(flavors) > 0, 'Num of flavors > 0.') + # send SIGTERM to master_worker will terminate service + os.kill(self.master_worker_pid, signal.SIGTERM) + time.sleep(1.5) + # check if service is still available (shouldn't be) + #""" + try: + self.api.get_flavors() + self.fail('API service should have been terminated') + except Exception as ex: + exc_value = sys.exc_info()[1] + self.assertTrue('Connection refused' in exc_value or + 'ECONNREFUSED' in exc_value) + #""" + #self.api.get_flavors() + # check there is no OS processes left over + f = os.popen('ps --no-headers --ppid %d' % self.master_worker_pid) + self.assertEqual(f.readline(), '', 'No OS processes left.') diff --git a/nova/utils.py b/nova/utils.py index ae99212e9..403a6d960 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -63,6 +63,7 @@ LOG = logging.getLogger(__name__) ISO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S" PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" FLAGS = flags.FLAGS +RESEED = True FLAGS.register_opt( cfg.BoolOpt('disable_process_locking', default=False, @@ -290,6 +291,13 @@ def debug(arg): def generate_uid(topic, size=8): + global RESEED + if RESEED: + random.seed("%d%s%s" % (os.getpid(), + socket.gethostname(), + time.time())) + RESEED = False + characters = '01234567890abcdefghijklmnopqrstuvwxyz' choices = [random.choice(characters) for _x in xrange(size)] return '%s-%s' % (topic, ''.join(choices)) diff --git a/nova/wsgi.py b/nova/wsgi.py index 118fd14e1..27e24e210 100644 --- a/nova/wsgi.py +++ b/nova/wsgi.py @@ -19,12 +19,16 @@ """Utility methods for working with WSGI servers.""" -import os.path +import errno +import os +import signal import sys import eventlet +from eventlet.green import socket import eventlet.wsgi import greenlet +import multiprocessing from paste import deploy import routes.middleware import webob.dec @@ -45,14 +49,15 @@ class Server(object): default_pool_size = 1000 - def __init__(self, name, app, host=None, port=None, pool_size=None, - protocol=eventlet.wsgi.HttpProtocol): + def __init__(self, name, app, host=None, port=None, workers=None, + pool_size=None, protocol=eventlet.wsgi.HttpProtocol): """Initialize, but do not start, a WSGI server. :param name: Pretty name for logging. :param app: The WSGI application to serve. :param host: IP address to serve the application. :param port: Port number to server the application. + :param workers: Number of process to spawn concurrently :param pool_size: Maximum number of eventlets to spawn concurrently. :returns: None @@ -61,12 +66,17 @@ class Server(object): self.app = app self.host = host or "0.0.0.0" self.port = port or 0 + self.workers = workers or 0 self._server = None self._socket = None self._protocol = protocol - self._pool = eventlet.GreenPool(pool_size or self.default_pool_size) - self._logger = logging.getLogger("eventlet.wsgi.server") + self._pool_size = pool_size or self.default_pool_size + self._pool = None + self._logger = logging.getLogger("nova.%s.wsgi.server" % self.name) self._wsgi_logger = logging.WritableLogger(self._logger) + self.master_worker = None + self.children = [] + self.running = True def _start(self): """Run the blocking eventlet WSGI server. @@ -90,11 +100,124 @@ class Server(object): """ if backlog < 1: raise exception.InvalidInput( - reason='The backlog must be more than 1') - self._socket = eventlet.listen((self.host, self.port), backlog=backlog) - self._server = eventlet.spawn(self._start) - (self.host, self.port) = self._socket.getsockname() - LOG.info(_("Started %(name)s on %(host)s:%(port)s") % self.__dict__) + reason='The backlog must be more than 1') + + try: + self._socket = eventlet.listen((self.host, self.port), + backlog=backlog) + (self.host, self.port) = self._socket.getsockname() + except socket.error, err: + if err[0] != errno.EINVAL: + raise + + if self.workers == 0: + # single process mode, useful for profiling, test, debug etc. + self._pool = eventlet.GreenPool(self._pool_size) + self._server = self._pool.spawn(self._start) + LOG.info(_("Started %(name)s on %(host)s:%(port)s") % + self.__dict__) + return None + + # master_worker doesn't actually do work (i.e. handle API request) + # but it's a managing process to handle signal/termination for + # this type of API service, only needed if workers > 1 + self.master_worker = multiprocessing.Process(target=self.run_workers, + args=()) + self.master_worker.start() + self._logger.info(_("Started %(name)s on %(host)s:%(port)s in process") + % self.__dict__) + return None + + def run_server_in_process(self): + """Run a WSGI server.""" + eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0" + eventlet.hubs.use_hub('poll') + eventlet.patcher.monkey_patch(all=False, socket=True) + + self._pool = eventlet.GreenPool(size=self._pool_size) + try: + self._pool.spawn_n(self._start) + except socket.error, err: + if err[0] != errno.EINVAL: + raise + + self._pool.waitall() + + def run_workers(self): + """Start workers and wait for them to join""" + def kill_children(*args): + """Kills the entire process group.""" + #TODO(zhiteng) Gracefully kill all eventlet greenthread + self._logger.error(_('SIGTERM or SIGINT received')) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + signal.signal(signal.SIGINT, signal.SIG_IGN) + self.running = False + for pid in self.children: + os.kill(pid, signal.SIGTERM) + + def hup(*args): + """ + Shuts down the server, but allows running requests to complete + """ + self._logger.error(_('SIGHUP received')) + signal.signal(signal.SIGHUP, signal.SIG_IGN) + self.running = False + + signal.signal(signal.SIGTERM, kill_children) + signal.signal(signal.SIGINT, kill_children) + signal.signal(signal.SIGHUP, hup) + + while len(self.children) < self.workers: + self.run_child() + + self._logger.info(_("Started %(children_count)d worker for %(name)s") + % {'children_count': len(self.children), + 'name': self.name}) + + self.wait_on_children() + + def run_child(self): + try: + pid = os.fork() + except KeyboardInterrupt: + pass + if pid == 0: + signal.signal(signal.SIGHUP, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + # to avoid race condition that child receive signal before + # parent and is respawned + signal.signal(signal.SIGINT, signal.SIG_IGN) + try: + self.run_server_in_process() + except KeyboardInterrupt: + pass + self._logger.info(_('Child %d exiting normally') % os.getpid()) + return None + else: + self._logger.info(_('[%(name)s] Started worker (pid: %(pid)s)') % + {'name': self.name, + 'pid': pid}) + self.children.append(pid) + + def wait_on_children(self): + while self.running: + try: + pid, status = os.wait() + if os.WIFEXITED(status) or os.WIFSIGNALED(status): + self._logger.error(_('Dead worker %(pid)s') % locals()) + if pid in self.children: + self.children.remove(pid) + self.run_child() + except OSError, err: + if err.errno not in (errno.EINTR, errno.ECHILD): + raise + except KeyboardInterrupt: + self._logger.info(_('Caught keyboard interrupt. Exiting.')) + self.running = False + break + eventlet.greenio.shutdown_safe(self._socket) + self._socket.close() + self._logger.debug(_('Exited')) def stop(self): """Stop this server. @@ -106,7 +229,19 @@ class Server(object): """ LOG.info(_("Stopping WSGI server.")) - self._server.kill() + if self.workers > 0: + # set running state to false and kill all workers + self.running = False + for pid in self.children: + os.kill(pid, signal.SIGTERM) + self.children.remove(pid) + # now terminate master_worker + if self.master_worker.is_alive(): + self.master_worker.terminate() + else: + # Resize Pool to stop accepting new connection + self._pool.resize(0) + self._server.kill() def wait(self): """Block, until the server has stopped. @@ -117,9 +252,16 @@ class Server(object): """ try: - self._server.wait() + if self.workers and self.master_worker: + # for services enabled multi-process,a separate master_worker + # is already waiting + pass + else: + self._pool.waitall() except greenlet.GreenletExit: LOG.info(_("WSGI server has stopped.")) + except KeyboardInterrupt: + pass class Request(webob.Request): |