diff options
Diffstat (limited to 'nova/service.py')
-rw-r--r-- | nova/service.py | 318 |
1 files changed, 14 insertions, 304 deletions
diff --git a/nova/service.py b/nova/service.py index 12bab14c3..3731986c4 100644 --- a/nova/service.py +++ b/nova/service.py @@ -19,26 +19,20 @@ """Generic Node base class for all workers that run on hosts.""" -import errno import inspect import os import random -import signal import sys -import time -import eventlet -import greenlet from oslo.config import cfg from nova import conductor from nova import context from nova import exception -from nova.openstack.common import eventlet_backdoor from nova.openstack.common import importutils from nova.openstack.common import log as logging -from nova.openstack.common import loopingcall from nova.openstack.common import rpc +from nova.openstack.common import service from nova import servicegroup from nova import utils from nova import version @@ -119,275 +113,7 @@ CONF.register_opts(service_opts) CONF.import_opt('host', 'nova.netconf') -class SignalExit(SystemExit): - def __init__(self, signo, exccode=1): - super(SignalExit, self).__init__(exccode) - self.signo = signo - - -class Launcher(object): - """Launch one or more services and wait for them to complete.""" - - def __init__(self): - """Initialize the service launcher. - - :returns: None - - """ - self._services = [] - self.backdoor_port = eventlet_backdoor.initialize_if_enabled() - - @staticmethod - def run_server(server): - """Start and wait for a server to finish. - - :param service: Server to run and wait for. - :returns: None - - """ - server.start() - server.wait() - - def launch_server(self, server): - """Load and start the given server. - - :param server: The server you would like to start. - :returns: None - - """ - if self.backdoor_port is not None: - server.backdoor_port = self.backdoor_port - gt = eventlet.spawn(self.run_server, server) - self._services.append(gt) - - def stop(self): - """Stop all services which are currently running. - - :returns: None - - """ - for service in self._services: - service.kill() - - def wait(self): - """Waits until all services have been stopped, and then returns. - - :returns: None - - """ - for service in self._services: - try: - service.wait() - except greenlet.GreenletExit: - pass - - -class ServiceLauncher(Launcher): - def _handle_signal(self, signo, frame): - # Allow the process to be killed again and die from natural causes - signal.signal(signal.SIGTERM, signal.SIG_DFL) - signal.signal(signal.SIGINT, signal.SIG_DFL) - - raise SignalExit(signo) - - def wait(self): - signal.signal(signal.SIGTERM, self._handle_signal) - signal.signal(signal.SIGINT, self._handle_signal) - - LOG.debug(_('Full set of CONF:')) - for flag in CONF: - flag_get = CONF.get(flag, None) - # hide flag contents from log if contains a password - # should use secret flag when switch over to openstack-common - if ("_password" in flag or "_key" in flag or - (flag == "sql_connection" and "mysql:" in flag_get)): - LOG.debug(_('%(flag)s : FLAG SET ') % locals()) - else: - LOG.debug('%(flag)s : %(flag_get)s' % locals()) - - status = None - try: - super(ServiceLauncher, self).wait() - except SignalExit as exc: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[exc.signo] - LOG.info(_('Caught %s, exiting'), signame) - status = exc.code - except SystemExit as exc: - status = exc.code - finally: - self.stop() - rpc.cleanup() - - if status is not None: - sys.exit(status) - - -class ServerWrapper(object): - def __init__(self, server, workers): - self.server = server - self.workers = workers - self.children = set() - self.forktimes = [] - - -class ProcessLauncher(object): - def __init__(self): - self.children = {} - self.sigcaught = None - self.running = True - rfd, self.writepipe = os.pipe() - self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') - - signal.signal(signal.SIGTERM, self._handle_signal) - signal.signal(signal.SIGINT, self._handle_signal) - - def _handle_signal(self, signo, frame): - self.sigcaught = signo - self.running = False - - # Allow the process to be killed again and die from natural causes - signal.signal(signal.SIGTERM, signal.SIG_DFL) - signal.signal(signal.SIGINT, signal.SIG_DFL) - - def _pipe_watcher(self): - # This will block until the write end is closed when the parent - # dies unexpectedly - self.readpipe.read() - - LOG.info(_('Parent process has died unexpectedly, exiting')) - - sys.exit(1) - - def _child_process(self, server): - # Setup child signal handlers differently - def _sigterm(*args): - signal.signal(signal.SIGTERM, signal.SIG_DFL) - raise SignalExit(signal.SIGTERM) - - signal.signal(signal.SIGTERM, _sigterm) - # Block SIGINT and let the parent send us a SIGTERM - signal.signal(signal.SIGINT, signal.SIG_IGN) - - # Reopen the eventlet hub to make sure we don't share an epoll - # fd with parent and/or siblings, which would be bad - eventlet.hubs.use_hub() - - # Close write to ensure only parent has it open - os.close(self.writepipe) - # Create greenthread to watch for parent to close pipe - eventlet.spawn(self._pipe_watcher) - - # Reseed random number generator - random.seed() - - launcher = Launcher() - launcher.run_server(server) - - def _start_child(self, wrap): - if len(wrap.forktimes) > wrap.workers: - # Limit ourselves to one process a second (over the period of - # number of workers * 1 second). This will allow workers to - # start up quickly but ensure we don't fork off children that - # die instantly too quickly. - if time.time() - wrap.forktimes[0] < wrap.workers: - LOG.info(_('Forking too fast, sleeping')) - time.sleep(1) - - wrap.forktimes.pop(0) - - wrap.forktimes.append(time.time()) - - pid = os.fork() - if pid == 0: - # NOTE(johannes): All exceptions are caught to ensure this - # doesn't fallback into the loop spawning children. It would - # be bad for a child to spawn more children. - status = 0 - try: - self._child_process(wrap.server) - except SignalExit as exc: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[exc.signo] - LOG.info(_('Caught %s, exiting'), signame) - status = exc.code - except SystemExit as exc: - status = exc.code - except BaseException: - LOG.exception(_('Unhandled exception')) - status = 2 - finally: - wrap.server.stop() - - os._exit(status) - - LOG.info(_('Started child %d'), pid) - - wrap.children.add(pid) - self.children[pid] = wrap - - return pid - - def launch_server(self, server, workers=1): - wrap = ServerWrapper(server, workers) - - LOG.info(_('Starting %d workers'), wrap.workers) - while self.running and len(wrap.children) < wrap.workers: - self._start_child(wrap) - - def _wait_child(self): - try: - pid, status = os.wait() - except OSError as exc: - if exc.errno not in (errno.EINTR, errno.ECHILD): - raise - return None - - if os.WIFSIGNALED(status): - sig = os.WTERMSIG(status) - LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals()) - else: - code = os.WEXITSTATUS(status) - LOG.info(_('Child %(pid)d exited with status %(code)d'), locals()) - - if pid not in self.children: - LOG.warning(_('pid %d not in child list'), pid) - return None - - wrap = self.children.pop(pid) - wrap.children.remove(pid) - return wrap - - def wait(self): - """Loop waiting on children to die and respawning as necessary.""" - while self.running: - wrap = self._wait_child() - if not wrap: - continue - - while self.running and len(wrap.children) < wrap.workers: - self._start_child(wrap) - - if self.sigcaught: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[self.sigcaught] - LOG.info(_('Caught %s, stopping children'), signame) - - for pid in self.children: - try: - os.kill(pid, signal.SIGTERM) - except OSError as exc: - if exc.errno != errno.ESRCH: - raise - - # Wait for children to die - if self.children: - LOG.info(_('Waiting on %d children to exit'), len(self.children)) - while self.children: - self._wait_child() - - -class Service(object): +class Service(service.Service): """Service object for binaries running on hosts. A service takes a manager and enables rpc by listening to queues based @@ -398,6 +124,7 @@ class Service(object): periodic_enable=None, periodic_fuzzy_delay=None, periodic_interval_max=None, db_allowed=True, *args, **kwargs): + super(Service, self).__init__() self.host = host self.binary = binary self.topic = topic @@ -417,7 +144,6 @@ class Service(object): self.periodic_fuzzy_delay = periodic_fuzzy_delay self.periodic_interval_max = periodic_interval_max self.saved_args, self.saved_kwargs = args, kwargs - self.timers = [] self.backdoor_port = None self.conductor_api = conductor.API(use_local=db_allowed) self.conductor_api.wait_until_ready(context.get_admin_context()) @@ -464,9 +190,7 @@ class Service(object): LOG.debug(_("Join ServiceGroup membership for this service %s") % self.topic) # Add service to the ServiceGroup membership group. - pulse = self.servicegroup_api.join(self.host, self.topic, self) - if pulse: - self.timers.append(pulse) + self.servicegroup_api.join(self.host, self.topic, self) if self.periodic_enable: if self.periodic_fuzzy_delay: @@ -474,10 +198,10 @@ class Service(object): else: initial_delay = None - periodic = loopingcall.DynamicLoopingCall(self.periodic_tasks) - periodic.start(initial_delay=initial_delay, - periodic_interval_max=self.periodic_interval_max) - self.timers.append(periodic) + self.tg.add_dynamic_timer(self.periodic_tasks, + initial_delay=initial_delay, + periodic_interval_max= + self.periodic_interval_max) def _create_service_ref(self, context): svc_values = { @@ -546,25 +270,12 @@ class Service(object): LOG.warn(_('Service killed that has no database entry')) def stop(self): - # Try to shut the connection down, but if we get any sort of - # errors, go ahead and ignore them.. as we're shutting down anyway try: self.conn.close() except Exception: pass - for x in self.timers: - try: - x.stop() - except Exception: - pass - self.timers = [] - def wait(self): - for x in self.timers: - try: - x.wait() - except Exception: - pass + super(Service, self).stop() def periodic_tasks(self, raise_on_error=False): """Tasks to be run at a periodic interval.""" @@ -667,6 +378,10 @@ class WSGIService(object): self.server.wait() +def process_launcher(): + return service.ProcessLauncher() + + # NOTE(vish): the global launcher is to maintain the existing # functionality of calling service.serve + # service.wait @@ -678,12 +393,7 @@ def serve(server, workers=None): if _launcher: raise RuntimeError(_('serve() can only be called once')) - if workers: - _launcher = ProcessLauncher() - _launcher.launch_server(server, workers=workers) - else: - _launcher = ServiceLauncher() - _launcher.launch_server(server) + _launcher = service.launch(server, workers=workers) def wait(): |