summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Brian Elliott <bdelliott@gmail.com> 2013-05-09 21:07:27 +0000
committer Brian Elliott <bdelliott@gmail.com> 2013-05-13 13:28:58 +0000
commit d8e4c91ebf589583ac79291de7a5e2c3f033962b (patch)
tree a444f2ae9a3082f3615c7b05a9ac18876bffea57
parent c39687edb9138c6eac675b8781a5d4bf5a51c968 (diff)
download nova-d8e4c91ebf589583ac79291de7a5e2c3f033962b.tar.gz
nova-d8e4c91ebf589583ac79291de7a5e2c3f033962b.tar.xz
nova-d8e4c91ebf589583ac79291de7a5e2c3f033962b.zip
Convert Nova to use Oslo service infrastructure.
Convert Nova to use the Oslo versions of:

* Service
* Launchers

Also add Service timers to the Service ThreadGroup.

blueprint use-oslo-services

Change-Id: Id8ab017f4525afd69fed322311f2d5cc3b6d6f98
-rw-r--r-- nova/cmd/all.py 10
-rw-r--r-- nova/cmd/api.py 4
-rw-r--r-- nova/service.py 318
-rw-r--r-- nova/servicegroup/drivers/db.py 8
-rw-r--r-- nova/servicegroup/drivers/mc.py 8
-rw-r--r-- nova/tests/integrated/test_multiprocess_api.py 4
-rw-r--r-- nova/tests/test_service.py 5
7 files changed, 29 insertions, 328 deletions
diff --git a/nova/cmd/all.py b/nova/cmd/all.py
index f510069b6..517033d05 100644
--- a/nova/cmd/all.py
+++ b/nova/cmd/all.py
@@ -49,19 +49,19 @@ def main():
logging.setup("nova")
LOG = logging.getLogger('nova.all')
utils.monkey_patch()
- launcher = service.ProcessLauncher()
+ launcher = service.process_launcher()
# nova-api
for api in CONF.enabled_apis:
try:
server = service.WSGIService(api)
- launcher.launch_server(server, workers=server.workers or 1)
+ launcher.launch_service(server, workers=server.workers or 1)
except (Exception, SystemExit):
LOG.exception(_('Failed to load %s') % '%s-api' % api)
for mod in [s3server, xvp_proxy]:
try:
- launcher.launch_server(mod.get_wsgi_server())
+ launcher.launch_service(mod.get_wsgi_server())
except (Exception, SystemExit):
LOG.exception(_('Failed to load %s') % mod.__name__)
@@ -82,8 +82,8 @@ def main():
manager = None
try:
- launcher.launch_server(service.Service.create(binary=binary,
- topic=topic,
+ launcher.launch_service(service.Service.create(binary=binary,
+ topic=topic,
manager=manager))
except (Exception, SystemExit):
LOG.exception(_('Failed to load %s'), binary)
diff --git a/nova/cmd/api.py b/nova/cmd/api.py
index a7f6313b0..f838d988f 100644
--- a/nova/cmd/api.py
+++ b/nova/cmd/api.py
@@ -41,7 +41,7 @@ def main():
logging.setup("nova")
utils.monkey_patch()
- launcher = service.ProcessLauncher()
+ launcher = service.process_launcher()
for api in CONF.enabled_apis:
should_use_ssl = api in CONF.enabled_ssl_apis
if api == 'ec2':
@@ -49,5 +49,5 @@ def main():
max_url_len=16384)
else:
server = service.WSGIService(api, use_ssl=should_use_ssl)
- launcher.launch_server(server, workers=server.workers or 1)
+ launcher.launch_service(server, workers=server.workers or 1)
launcher.wait()
diff --git a/nova/service.py b/nova/service.py
index 12bab14c3..3731986c4 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -19,26 +19,20 @@
"""Generic Node base class for all workers that run on hosts."""
-import errno
import inspect
import os
import random
-import signal
import sys
-import time
-import eventlet
-import greenlet
from oslo.config import cfg
from nova import conductor
from nova import context
from nova import exception
-from nova.openstack.common import eventlet_backdoor
from nova.openstack.common import importutils
from nova.openstack.common import log as logging
-from nova.openstack.common import loopingcall
from nova.openstack.common import rpc
+from nova.openstack.common import service
from nova import servicegroup
from nova import utils
from nova import version
@@ -119,275 +113,7 @@ CONF.register_opts(service_opts)
CONF.import_opt('host', 'nova.netconf')
-class SignalExit(SystemExit):
- def __init__(self, signo, exccode=1):
- super(SignalExit, self).__init__(exccode)
- self.signo = signo
-
-
-class Launcher(object):
- """Launch one or more services and wait for them to complete."""
-
- def __init__(self):
- """Initialize the service launcher.
-
- :returns: None
-
- """
- self._services = []
- self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
-
- @staticmethod
- def run_server(server):
- """Start and wait for a server to finish.
-
- :param service: Server to run and wait for.
- :returns: None
-
- """
- server.start()
- server.wait()
-
- def launch_server(self, server):
- """Load and start the given server.
-
- :param server: The server you would like to start.
- :returns: None
-
- """
- if self.backdoor_port is not None:
- server.backdoor_port = self.backdoor_port
- gt = eventlet.spawn(self.run_server, server)
- self._services.append(gt)
-
- def stop(self):
- """Stop all services which are currently running.
-
- :returns: None
-
- """
- for service in self._services:
- service.kill()
-
- def wait(self):
- """Waits until all services have been stopped, and then returns.
-
- :returns: None
-
- """
- for service in self._services:
- try:
- service.wait()
- except greenlet.GreenletExit:
- pass
-
-
-class ServiceLauncher(Launcher):
- def _handle_signal(self, signo, frame):
- # Allow the process to be killed again and die from natural causes
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- signal.signal(signal.SIGINT, signal.SIG_DFL)
-
- raise SignalExit(signo)
-
- def wait(self):
- signal.signal(signal.SIGTERM, self._handle_signal)
- signal.signal(signal.SIGINT, self._handle_signal)
-
- LOG.debug(_('Full set of CONF:'))
- for flag in CONF:
- flag_get = CONF.get(flag, None)
- # hide flag contents from log if contains a password
- # should use secret flag when switch over to openstack-common
- if ("_password" in flag or "_key" in flag or
- (flag == "sql_connection" and "mysql:" in flag_get)):
- LOG.debug(_('%(flag)s : FLAG SET ') % locals())
- else:
- LOG.debug('%(flag)s : %(flag_get)s' % locals())
-
- status = None
- try:
- super(ServiceLauncher, self).wait()
- except SignalExit as exc:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[exc.signo]
- LOG.info(_('Caught %s, exiting'), signame)
- status = exc.code
- except SystemExit as exc:
- status = exc.code
- finally:
- self.stop()
- rpc.cleanup()
-
- if status is not None:
- sys.exit(status)
-
-
-class ServerWrapper(object):
- def __init__(self, server, workers):
- self.server = server
- self.workers = workers
- self.children = set()
- self.forktimes = []
-
-
-class ProcessLauncher(object):
- def __init__(self):
- self.children = {}
- self.sigcaught = None
- self.running = True
- rfd, self.writepipe = os.pipe()
- self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
-
- signal.signal(signal.SIGTERM, self._handle_signal)
- signal.signal(signal.SIGINT, self._handle_signal)
-
- def _handle_signal(self, signo, frame):
- self.sigcaught = signo
- self.running = False
-
- # Allow the process to be killed again and die from natural causes
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- signal.signal(signal.SIGINT, signal.SIG_DFL)
-
- def _pipe_watcher(self):
- # This will block until the write end is closed when the parent
- # dies unexpectedly
- self.readpipe.read()
-
- LOG.info(_('Parent process has died unexpectedly, exiting'))
-
- sys.exit(1)
-
- def _child_process(self, server):
- # Setup child signal handlers differently
- def _sigterm(*args):
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- raise SignalExit(signal.SIGTERM)
-
- signal.signal(signal.SIGTERM, _sigterm)
- # Block SIGINT and let the parent send us a SIGTERM
- signal.signal(signal.SIGINT, signal.SIG_IGN)
-
- # Reopen the eventlet hub to make sure we don't share an epoll
- # fd with parent and/or siblings, which would be bad
- eventlet.hubs.use_hub()
-
- # Close write to ensure only parent has it open
- os.close(self.writepipe)
- # Create greenthread to watch for parent to close pipe
- eventlet.spawn(self._pipe_watcher)
-
- # Reseed random number generator
- random.seed()
-
- launcher = Launcher()
- launcher.run_server(server)
-
- def _start_child(self, wrap):
- if len(wrap.forktimes) > wrap.workers:
- # Limit ourselves to one process a second (over the period of
- # number of workers * 1 second). This will allow workers to
- # start up quickly but ensure we don't fork off children that
- # die instantly too quickly.
- if time.time() - wrap.forktimes[0] < wrap.workers:
- LOG.info(_('Forking too fast, sleeping'))
- time.sleep(1)
-
- wrap.forktimes.pop(0)
-
- wrap.forktimes.append(time.time())
-
- pid = os.fork()
- if pid == 0:
- # NOTE(johannes): All exceptions are caught to ensure this
- # doesn't fallback into the loop spawning children. It would
- # be bad for a child to spawn more children.
- status = 0
- try:
- self._child_process(wrap.server)
- except SignalExit as exc:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[exc.signo]
- LOG.info(_('Caught %s, exiting'), signame)
- status = exc.code
- except SystemExit as exc:
- status = exc.code
- except BaseException:
- LOG.exception(_('Unhandled exception'))
- status = 2
- finally:
- wrap.server.stop()
-
- os._exit(status)
-
- LOG.info(_('Started child %d'), pid)
-
- wrap.children.add(pid)
- self.children[pid] = wrap
-
- return pid
-
- def launch_server(self, server, workers=1):
- wrap = ServerWrapper(server, workers)
-
- LOG.info(_('Starting %d workers'), wrap.workers)
- while self.running and len(wrap.children) < wrap.workers:
- self._start_child(wrap)
-
- def _wait_child(self):
- try:
- pid, status = os.wait()
- except OSError as exc:
- if exc.errno not in (errno.EINTR, errno.ECHILD):
- raise
- return None
-
- if os.WIFSIGNALED(status):
- sig = os.WTERMSIG(status)
- LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals())
- else:
- code = os.WEXITSTATUS(status)
- LOG.info(_('Child %(pid)d exited with status %(code)d'), locals())
-
- if pid not in self.children:
- LOG.warning(_('pid %d not in child list'), pid)
- return None
-
- wrap = self.children.pop(pid)
- wrap.children.remove(pid)
- return wrap
-
- def wait(self):
- """Loop waiting on children to die and respawning as necessary."""
- while self.running:
- wrap = self._wait_child()
- if not wrap:
- continue
-
- while self.running and len(wrap.children) < wrap.workers:
- self._start_child(wrap)
-
- if self.sigcaught:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[self.sigcaught]
- LOG.info(_('Caught %s, stopping children'), signame)
-
- for pid in self.children:
- try:
- os.kill(pid, signal.SIGTERM)
- except OSError as exc:
- if exc.errno != errno.ESRCH:
- raise
-
- # Wait for children to die
- if self.children:
- LOG.info(_('Waiting on %d children to exit'), len(self.children))
- while self.children:
- self._wait_child()
-
-
-class Service(object):
+class Service(service.Service):
"""Service object for binaries running on hosts.
A service takes a manager and enables rpc by listening to queues based
@@ -398,6 +124,7 @@ class Service(object):
periodic_enable=None, periodic_fuzzy_delay=None,
periodic_interval_max=None, db_allowed=True,
*args, **kwargs):
+ super(Service, self).__init__()
self.host = host
self.binary = binary
self.topic = topic
@@ -417,7 +144,6 @@ class Service(object):
self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.periodic_interval_max = periodic_interval_max
self.saved_args, self.saved_kwargs = args, kwargs
- self.timers = []
self.backdoor_port = None
self.conductor_api = conductor.API(use_local=db_allowed)
self.conductor_api.wait_until_ready(context.get_admin_context())
@@ -464,9 +190,7 @@ class Service(object):
LOG.debug(_("Join ServiceGroup membership for this service %s")
% self.topic)
# Add service to the ServiceGroup membership group.
- pulse = self.servicegroup_api.join(self.host, self.topic, self)
- if pulse:
- self.timers.append(pulse)
+ self.servicegroup_api.join(self.host, self.topic, self)
if self.periodic_enable:
if self.periodic_fuzzy_delay:
@@ -474,10 +198,10 @@ class Service(object):
else:
initial_delay = None
- periodic = loopingcall.DynamicLoopingCall(self.periodic_tasks)
- periodic.start(initial_delay=initial_delay,
- periodic_interval_max=self.periodic_interval_max)
- self.timers.append(periodic)
+ self.tg.add_dynamic_timer(self.periodic_tasks,
+ initial_delay=initial_delay,
+ periodic_interval_max=
+ self.periodic_interval_max)
def _create_service_ref(self, context):
svc_values = {
@@ -546,25 +270,12 @@ class Service(object):
LOG.warn(_('Service killed that has no database entry'))
def stop(self):
- # Try to shut the connection down, but if we get any sort of
- # errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.conn.close()
except Exception:
pass
- for x in self.timers:
- try:
- x.stop()
- except Exception:
- pass
- self.timers = []
- def wait(self):
- for x in self.timers:
- try:
- x.wait()
- except Exception:
- pass
+ super(Service, self).stop()
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
@@ -667,6 +378,10 @@ class WSGIService(object):
self.server.wait()
+def process_launcher():
+ return service.ProcessLauncher()
+
+
# NOTE(vish): the global launcher is to maintain the existing
# functionality of calling service.serve +
# service.wait
@@ -678,12 +393,7 @@ def serve(server, workers=None):
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
- if workers:
- _launcher = ProcessLauncher()
- _launcher.launch_server(server, workers=workers)
- else:
- _launcher = ServiceLauncher()
- _launcher.launch_server(server)
+ _launcher = service.launch(server, workers=workers)
def wait():
diff --git a/nova/servicegroup/drivers/db.py b/nova/servicegroup/drivers/db.py
index 24fcfd04f..22b25d61c 100644
--- a/nova/servicegroup/drivers/db.py
+++ b/nova/servicegroup/drivers/db.py
@@ -18,7 +18,6 @@ from oslo.config import cfg
from nova import conductor
from nova import context
from nova.openstack.common import log as logging
-from nova.openstack.common import loopingcall
from nova.openstack.common import timeutils
from nova.servicegroup import api
from nova import utils
@@ -47,11 +46,8 @@ class DbDriver(api.ServiceGroupDriver):
' ServiceGroup driver'))
report_interval = service.report_interval
if report_interval:
- pulse = loopingcall.FixedIntervalLoopingCall(self._report_state,
- service)
- pulse.start(interval=report_interval,
- initial_delay=report_interval)
- return pulse
+ service.tg.add_timer(report_interval, self._report_state,
+ report_interval, service)
def is_up(self, service_ref):
"""Moved from nova.utils
diff --git a/nova/servicegroup/drivers/mc.py b/nova/servicegroup/drivers/mc.py
index 86c27a3aa..6e8cda456 100644
--- a/nova/servicegroup/drivers/mc.py
+++ b/nova/servicegroup/drivers/mc.py
@@ -22,7 +22,6 @@ from oslo.config import cfg
from nova import conductor
from nova import context
from nova.openstack.common import log as logging
-from nova.openstack.common import loopingcall
from nova.openstack.common import memorycache
from nova.openstack.common import timeutils
from nova.servicegroup import api
@@ -58,11 +57,8 @@ class MemcachedDriver(api.ServiceGroupDriver):
'Memcached based ServiceGroup driver'))
report_interval = service.report_interval
if report_interval:
- pulse = loopingcall.FixedIntervalLoopingCall(self._report_state,
- service)
- pulse.start(interval=report_interval,
- initial_delay=report_interval)
- return pulse
+ service.tg.add_timer(report_interval, self._report_state,
+ report_interval, service)
def is_up(self, service_ref):
"""Moved from nova.utils
diff --git a/nova/tests/integrated/test_multiprocess_api.py b/nova/tests/integrated/test_multiprocess_api.py
index c85a43a0c..2610cdcb7 100644
--- a/nova/tests/integrated/test_multiprocess_api.py
+++ b/nova/tests/integrated/test_multiprocess_api.py
@@ -54,8 +54,8 @@ class MultiprocessWSGITest(integrated_helpers._IntegratedTestBase):
# os._exit() which doesn't have this problem.
status = 0
try:
- launcher = service.ProcessLauncher()
- launcher.launch_server(self.osapi, workers=self.osapi.workers)
+ launcher = service.process_launcher()
+ launcher.launch_service(self.osapi, workers=self.osapi.workers)
launcher.wait()
except SystemExit as exc:
status = exc.code
diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py
index 2cbc82fda..3ca6d7bc1 100644
--- a/nova/tests/test_service.py
+++ b/nova/tests/test_service.py
@@ -201,7 +201,6 @@ class TestLauncher(test.TestCase):
self.service = service.WSGIService("test_service")
def test_launch_app(self):
- launcher = service.Launcher()
- launcher.launch_server(self.service)
+ service.serve(self.service)
self.assertNotEquals(0, self.service.port)
- launcher.stop()
+ service._launcher.stop()