summaryrefslogtreecommitdiffstats
path: root/nova/service.py
diff options
context:
space:
mode:
Diffstat (limited to 'nova/service.py')
-rw-r--r--nova/service.py318
1 files changed, 14 insertions, 304 deletions
diff --git a/nova/service.py b/nova/service.py
index 12bab14c3..3731986c4 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -19,26 +19,20 @@
"""Generic Node base class for all workers that run on hosts."""
-import errno
import inspect
import os
import random
-import signal
import sys
-import time
-import eventlet
-import greenlet
from oslo.config import cfg
from nova import conductor
from nova import context
from nova import exception
-from nova.openstack.common import eventlet_backdoor
from nova.openstack.common import importutils
from nova.openstack.common import log as logging
-from nova.openstack.common import loopingcall
from nova.openstack.common import rpc
+from nova.openstack.common import service
from nova import servicegroup
from nova import utils
from nova import version
@@ -119,275 +113,7 @@ CONF.register_opts(service_opts)
CONF.import_opt('host', 'nova.netconf')
-class SignalExit(SystemExit):
- def __init__(self, signo, exccode=1):
- super(SignalExit, self).__init__(exccode)
- self.signo = signo
-
-
-class Launcher(object):
- """Launch one or more services and wait for them to complete."""
-
- def __init__(self):
- """Initialize the service launcher.
-
- :returns: None
-
- """
- self._services = []
- self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
-
- @staticmethod
- def run_server(server):
- """Start and wait for a server to finish.
-
- :param service: Server to run and wait for.
- :returns: None
-
- """
- server.start()
- server.wait()
-
- def launch_server(self, server):
- """Load and start the given server.
-
- :param server: The server you would like to start.
- :returns: None
-
- """
- if self.backdoor_port is not None:
- server.backdoor_port = self.backdoor_port
- gt = eventlet.spawn(self.run_server, server)
- self._services.append(gt)
-
- def stop(self):
- """Stop all services which are currently running.
-
- :returns: None
-
- """
- for service in self._services:
- service.kill()
-
- def wait(self):
- """Waits until all services have been stopped, and then returns.
-
- :returns: None
-
- """
- for service in self._services:
- try:
- service.wait()
- except greenlet.GreenletExit:
- pass
-
-
-class ServiceLauncher(Launcher):
- def _handle_signal(self, signo, frame):
- # Allow the process to be killed again and die from natural causes
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- signal.signal(signal.SIGINT, signal.SIG_DFL)
-
- raise SignalExit(signo)
-
- def wait(self):
- signal.signal(signal.SIGTERM, self._handle_signal)
- signal.signal(signal.SIGINT, self._handle_signal)
-
- LOG.debug(_('Full set of CONF:'))
- for flag in CONF:
- flag_get = CONF.get(flag, None)
- # hide flag contents from log if contains a password
- # should use secret flag when switch over to openstack-common
- if ("_password" in flag or "_key" in flag or
- (flag == "sql_connection" and "mysql:" in flag_get)):
- LOG.debug(_('%(flag)s : FLAG SET ') % locals())
- else:
- LOG.debug('%(flag)s : %(flag_get)s' % locals())
-
- status = None
- try:
- super(ServiceLauncher, self).wait()
- except SignalExit as exc:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[exc.signo]
- LOG.info(_('Caught %s, exiting'), signame)
- status = exc.code
- except SystemExit as exc:
- status = exc.code
- finally:
- self.stop()
- rpc.cleanup()
-
- if status is not None:
- sys.exit(status)
-
-
-class ServerWrapper(object):
- def __init__(self, server, workers):
- self.server = server
- self.workers = workers
- self.children = set()
- self.forktimes = []
-
-
-class ProcessLauncher(object):
- def __init__(self):
- self.children = {}
- self.sigcaught = None
- self.running = True
- rfd, self.writepipe = os.pipe()
- self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
-
- signal.signal(signal.SIGTERM, self._handle_signal)
- signal.signal(signal.SIGINT, self._handle_signal)
-
- def _handle_signal(self, signo, frame):
- self.sigcaught = signo
- self.running = False
-
- # Allow the process to be killed again and die from natural causes
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- signal.signal(signal.SIGINT, signal.SIG_DFL)
-
- def _pipe_watcher(self):
- # This will block until the write end is closed when the parent
- # dies unexpectedly
- self.readpipe.read()
-
- LOG.info(_('Parent process has died unexpectedly, exiting'))
-
- sys.exit(1)
-
- def _child_process(self, server):
- # Setup child signal handlers differently
- def _sigterm(*args):
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- raise SignalExit(signal.SIGTERM)
-
- signal.signal(signal.SIGTERM, _sigterm)
- # Block SIGINT and let the parent send us a SIGTERM
- signal.signal(signal.SIGINT, signal.SIG_IGN)
-
- # Reopen the eventlet hub to make sure we don't share an epoll
- # fd with parent and/or siblings, which would be bad
- eventlet.hubs.use_hub()
-
- # Close write to ensure only parent has it open
- os.close(self.writepipe)
- # Create greenthread to watch for parent to close pipe
- eventlet.spawn(self._pipe_watcher)
-
- # Reseed random number generator
- random.seed()
-
- launcher = Launcher()
- launcher.run_server(server)
-
- def _start_child(self, wrap):
- if len(wrap.forktimes) > wrap.workers:
- # Limit ourselves to one process a second (over the period of
- # number of workers * 1 second). This will allow workers to
- # start up quickly but ensure we don't fork off children that
- # die instantly too quickly.
- if time.time() - wrap.forktimes[0] < wrap.workers:
- LOG.info(_('Forking too fast, sleeping'))
- time.sleep(1)
-
- wrap.forktimes.pop(0)
-
- wrap.forktimes.append(time.time())
-
- pid = os.fork()
- if pid == 0:
- # NOTE(johannes): All exceptions are caught to ensure this
- # doesn't fallback into the loop spawning children. It would
- # be bad for a child to spawn more children.
- status = 0
- try:
- self._child_process(wrap.server)
- except SignalExit as exc:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[exc.signo]
- LOG.info(_('Caught %s, exiting'), signame)
- status = exc.code
- except SystemExit as exc:
- status = exc.code
- except BaseException:
- LOG.exception(_('Unhandled exception'))
- status = 2
- finally:
- wrap.server.stop()
-
- os._exit(status)
-
- LOG.info(_('Started child %d'), pid)
-
- wrap.children.add(pid)
- self.children[pid] = wrap
-
- return pid
-
- def launch_server(self, server, workers=1):
- wrap = ServerWrapper(server, workers)
-
- LOG.info(_('Starting %d workers'), wrap.workers)
- while self.running and len(wrap.children) < wrap.workers:
- self._start_child(wrap)
-
- def _wait_child(self):
- try:
- pid, status = os.wait()
- except OSError as exc:
- if exc.errno not in (errno.EINTR, errno.ECHILD):
- raise
- return None
-
- if os.WIFSIGNALED(status):
- sig = os.WTERMSIG(status)
- LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals())
- else:
- code = os.WEXITSTATUS(status)
- LOG.info(_('Child %(pid)d exited with status %(code)d'), locals())
-
- if pid not in self.children:
- LOG.warning(_('pid %d not in child list'), pid)
- return None
-
- wrap = self.children.pop(pid)
- wrap.children.remove(pid)
- return wrap
-
- def wait(self):
- """Loop waiting on children to die and respawning as necessary."""
- while self.running:
- wrap = self._wait_child()
- if not wrap:
- continue
-
- while self.running and len(wrap.children) < wrap.workers:
- self._start_child(wrap)
-
- if self.sigcaught:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[self.sigcaught]
- LOG.info(_('Caught %s, stopping children'), signame)
-
- for pid in self.children:
- try:
- os.kill(pid, signal.SIGTERM)
- except OSError as exc:
- if exc.errno != errno.ESRCH:
- raise
-
- # Wait for children to die
- if self.children:
- LOG.info(_('Waiting on %d children to exit'), len(self.children))
- while self.children:
- self._wait_child()
-
-
-class Service(object):
+class Service(service.Service):
"""Service object for binaries running on hosts.
A service takes a manager and enables rpc by listening to queues based
@@ -398,6 +124,7 @@ class Service(object):
periodic_enable=None, periodic_fuzzy_delay=None,
periodic_interval_max=None, db_allowed=True,
*args, **kwargs):
+ super(Service, self).__init__()
self.host = host
self.binary = binary
self.topic = topic
@@ -417,7 +144,6 @@ class Service(object):
self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.periodic_interval_max = periodic_interval_max
self.saved_args, self.saved_kwargs = args, kwargs
- self.timers = []
self.backdoor_port = None
self.conductor_api = conductor.API(use_local=db_allowed)
self.conductor_api.wait_until_ready(context.get_admin_context())
@@ -464,9 +190,7 @@ class Service(object):
LOG.debug(_("Join ServiceGroup membership for this service %s")
% self.topic)
# Add service to the ServiceGroup membership group.
- pulse = self.servicegroup_api.join(self.host, self.topic, self)
- if pulse:
- self.timers.append(pulse)
+ self.servicegroup_api.join(self.host, self.topic, self)
if self.periodic_enable:
if self.periodic_fuzzy_delay:
@@ -474,10 +198,10 @@ class Service(object):
else:
initial_delay = None
- periodic = loopingcall.DynamicLoopingCall(self.periodic_tasks)
- periodic.start(initial_delay=initial_delay,
- periodic_interval_max=self.periodic_interval_max)
- self.timers.append(periodic)
+ self.tg.add_dynamic_timer(self.periodic_tasks,
+ initial_delay=initial_delay,
+ periodic_interval_max=
+ self.periodic_interval_max)
def _create_service_ref(self, context):
svc_values = {
@@ -546,25 +270,12 @@ class Service(object):
LOG.warn(_('Service killed that has no database entry'))
def stop(self):
- # Try to shut the connection down, but if we get any sort of
- # errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.conn.close()
except Exception:
pass
- for x in self.timers:
- try:
- x.stop()
- except Exception:
- pass
- self.timers = []
- def wait(self):
- for x in self.timers:
- try:
- x.wait()
- except Exception:
- pass
+ super(Service, self).stop()
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
@@ -667,6 +378,10 @@ class WSGIService(object):
self.server.wait()
+def process_launcher():
+ return service.ProcessLauncher()
+
+
# NOTE(vish): the global launcher is to maintain the existing
# functionality of calling service.serve +
# service.wait
@@ -678,12 +393,7 @@ def serve(server, workers=None):
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
- if workers:
- _launcher = ProcessLauncher()
- _launcher.launch_server(server, workers=workers)
- else:
- _launcher = ServiceLauncher()
- _launcher.launch_server(server)
+ _launcher = service.launch(server, workers=workers)
def wait():