diff options
Diffstat (limited to 'nova/service.py')
-rw-r--r-- | nova/service.py | 255 |
1 files changed, 222 insertions, 33 deletions
diff --git a/nova/service.py b/nova/service.py index c701d3813..d3fb76933 100644 --- a/nova/service.py +++ b/nova/service.py @@ -19,10 +19,13 @@ """Generic Node base class for all workers that run on hosts.""" +import errno import inspect import os import random import signal +import sys +import time import eventlet import greenlet @@ -61,12 +64,18 @@ service_opts = [ cfg.IntOpt('ec2_listen_port', default=8773, help='port for ec2 api to listen'), + cfg.IntOpt('ec2_workers', + default=None, + help='Number of workers for EC2 API service'), cfg.StrOpt('osapi_compute_listen', default="0.0.0.0", help='IP address for OpenStack API to listen'), cfg.IntOpt('osapi_compute_listen_port', default=8774, help='list port for osapi compute'), + cfg.IntOpt('osapi_compute_workers', + default=None, + help='Number of workers for OpenStack API service'), cfg.StrOpt('metadata_manager', default='nova.api.manager.MetadataManager', help='OpenStack metadata service manager'), @@ -76,12 +85,18 @@ service_opts = [ cfg.IntOpt('metadata_listen_port', default=8775, help='port for metadata api to listen'), + cfg.IntOpt('metadata_workers', + default=None, + help='Number of workers for metadata service'), cfg.StrOpt('osapi_volume_listen', default="0.0.0.0", help='IP address for OpenStack Volume API to listen'), cfg.IntOpt('osapi_volume_listen_port', default=8776, - help='port for os volume api to listen') + help='port for os volume api to listen'), + cfg.IntOpt('osapi_volume_workers', + default=None, + help='Number of workers for OpenStack Volume API service'), ] FLAGS = flags.FLAGS @@ -98,6 +113,7 @@ class Launcher(object): """ self._services = [] + eventlet_backdoor.initialize_if_enabled() @staticmethod def run_server(server): @@ -135,15 +151,6 @@ class Launcher(object): :returns: None """ - def sigterm(sig, frame): - LOG.audit(_("SIGTERM received")) - # NOTE(jk0): Raise a ^C which is caught by the caller and cleanly - # shuts down the service. This does not yet handle eventlet - # threads. - raise KeyboardInterrupt - - signal.signal(signal.SIGTERM, sigterm) - for service in self._services: try: service.wait() @@ -151,6 +158,198 @@ class Launcher(object): pass +class ServiceLauncher(Launcher): + def _handle_signal(self, signo, frame): + signame = {signal.SIGTERM: 'SIGTERM', signal.SIGINT: 'SIGINT'}[signo] + LOG.info(_('Caught %s, exiting'), signame) + + # Allow the process to be killed again and die from natural causes + signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGINT, signal.SIG_DFL) + + sys.exit(1) + + def wait(self): + signal.signal(signal.SIGTERM, self._handle_signal) + signal.signal(signal.SIGINT, self._handle_signal) + + LOG.debug(_('Full set of FLAGS:')) + for flag in FLAGS: + flag_get = FLAGS.get(flag, None) + # hide flag contents from log if contains a password + # should use secret flag when switch over to openstack-common + if ("_password" in flag or "_key" in flag or + (flag == "sql_connection" and "mysql:" in flag_get)): + LOG.debug(_('%(flag)s : FLAG SET ') % locals()) + else: + LOG.debug('%(flag)s : %(flag_get)s' % locals()) + + status = None + try: + super(ServiceLauncher, self).wait() + except SystemExit as exc: + status = exc.code + self.stop() + rpc.cleanup() + + if status is not None: + sys.exit(status) + + +class ServerWrapper(object): + def __init__(self, server, workers): + self.server = server + self.workers = workers + self.children = set() + self.forktimes = [] + + +class ProcessLauncher(object): + def __init__(self): + self.children = {} + self.running = True + rfd, self.writepipe = os.pipe() + self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') + + signal.signal(signal.SIGTERM, self._handle_signal) + signal.signal(signal.SIGINT, self._handle_signal) + + def _handle_signal(self, signo, frame): + signame = {signal.SIGTERM: 'SIGTERM', signal.SIGINT: 'SIGINT'}[signo] + LOG.info(_('Caught %s, stopping children'), signame) + + self.running = False + for pid in self.children: + try: + os.kill(pid, signal.SIGTERM) + except OSError as exc: + if exc.errno != errno.ESRCH: + raise + + # Allow the process to be killed again and die from natural causes + signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGINT, signal.SIG_DFL) + + def _pipe_watcher(self): + # This will block until the write end is closed when the parent + # dies unexpectedly + self.readpipe.read() + + LOG.info(_('Parent process has died unexpectedly, exiting')) + + sys.exit(1) + + def _child_process(self, server): + # Setup child signal handlers differently + def _sigterm(*args): + LOG.info(_('Received SIGTERM, stopping')) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + server.stop() + + signal.signal(signal.SIGTERM, _sigterm) + # Block SIGINT and let the parent send us a SIGTERM + signal.signal(signal.SIGINT, signal.SIG_IGN) + + # Reopen the eventlet hub to make sure we don't share an epoll + # fd with parent and/or siblings, which would be bad + eventlet.hubs.use_hub() + + # Close write to ensure only parent has it open + os.close(self.writepipe) + # Create greenthread to watch for parent to close pipe + eventlet.spawn(self._pipe_watcher) + + # Reseed random number generator + random.seed() + + launcher = Launcher() + launcher.run_server(server) + + def _start_child(self, wrap): + if len(wrap.forktimes) > wrap.workers: + # Limit ourselves to one process a second (over the period of + # number of workers * 1 second). This will allow workers to + # start up quickly but ensure we don't fork off children that + # die instantly too quickly. + if time.time() - wrap.forktimes[0] < wrap.workers: + LOG.info(_('Forking too fast, sleeping')) + time.sleep(1) + + wrap.forktimes.pop(0) + + wrap.forktimes.append(time.time()) + + pid = os.fork() + if pid == 0: + # NOTE(johannes): All exceptions are caught to ensure this + # doesn't fallback into the loop spawning children. It would + # be bad for a child to spawn more children. + status = 0 + try: + self._child_process(wrap.server) + except SystemExit as exc: + status = exc.code + except BaseException: + LOG.exception(_('Unhandled exception')) + status = 2 + + os._exit(status) + + LOG.info(_('Started child %d'), pid) + + wrap.children.add(pid) + self.children[pid] = wrap + + return pid + + def launch_server(self, server, workers=1): + wrap = ServerWrapper(server, workers) + + LOG.info(_('Starting %d workers'), wrap.workers) + while self.running and len(wrap.children) < wrap.workers: + self._start_child(wrap) + + def _wait_child(self): + try: + pid, status = os.wait() + except OSError as exc: + if exc.errno not in (errno.EINTR, errno.ECHILD): + raise + return None + + if os.WIFSIGNALED(status): + sig = os.WTERMSIG(status) + LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals()) + else: + code = os.WEXITSTATUS(status) + LOG.info(_('Child %(pid)d exited with status %(code)d'), locals()) + + if pid not in self.children: + LOG.warning(_('pid %d not in child list'), pid) + return None + + wrap = self.children.pop(pid) + wrap.children.remove(pid) + return wrap + + def wait(self): + """Loop waiting on children to die and respawning as necessary""" + # Loop calling wait and respawning as necessary + while self.running: + wrap = self._wait_child() + if not wrap: + continue + + while self.running and len(wrap.children) < wrap.workers: + self._start_child(wrap) + + # Wait for children to die + if self.children: + LOG.info(_('Waiting on %d children to exit'), len(self.children)) + while self.children: + self._wait_child() + + class Service(object): """Service object for binaries running on hosts. @@ -170,7 +369,6 @@ class Service(object): self.report_interval = report_interval self.periodic_interval = periodic_interval self.periodic_fuzzy_delay = periodic_fuzzy_delay - super(Service, self).__init__(*args, **kwargs) self.saved_args, self.saved_kwargs = args, kwargs self.timers = [] @@ -361,10 +559,13 @@ class WSGIService(object): self.app = self.loader.load_app(name) self.host = getattr(FLAGS, '%s_listen' % name, "0.0.0.0") self.port = getattr(FLAGS, '%s_listen_port' % name, 0) + self.workers = getattr(FLAGS, '%s_workers' % name, None) self.server = wsgi.Server(name, self.app, host=self.host, port=self.port) + # Pull back actual port used + self.port = self.server.port def _get_manager(self): """Initialize a Manager object appropriate for this service. @@ -400,7 +601,6 @@ class WSGIService(object): if self.manager: self.manager.init_host() self.server.start() - self.port = self.server.port def stop(self): """Stop serving this API. @@ -425,29 +625,18 @@ class WSGIService(object): _launcher = None -def serve(*servers): +def serve(server, workers=None): global _launcher - if not _launcher: - _launcher = Launcher() - for server in servers: + if _launcher: + raise RuntimeError(_('serve() can only be called once')) + + if workers: + _launcher = ProcessLauncher() + _launcher.launch_server(server, workers=workers) + else: + _launcher = ServiceLauncher() _launcher.launch_server(server) - eventlet_backdoor.initialize_if_enabled() - def wait(): - LOG.debug(_('Full set of FLAGS:')) - for flag in FLAGS: - flag_get = FLAGS.get(flag, None) - # hide flag contents from log if contains a password - # should use secret flag when switch over to openstack-common - if ("_password" in flag or "_key" in flag or - (flag == "sql_connection" and "mysql:" in flag_get)): - LOG.debug(_('%(flag)s : FLAG SET ') % locals()) - else: - LOG.debug('%(flag)s : %(flag_get)s' % locals()) - try: - _launcher.wait() - except KeyboardInterrupt: - _launcher.stop() - rpc.cleanup() + _launcher.wait() |