summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBrian Elliott <bdelliott@gmail.com>2013-05-08 18:58:39 +0000
committerBrian Elliott <bdelliott@gmail.com>2013-05-11 21:56:38 +0000
commit1298b1880a07596832cd981ee60e4c053efccec8 (patch)
tree42ef79d2e692adf76bf0791cc4b3a67cdf4db40a
parente4f05bada6bba3dfab936bb0e77a673b640b6f9e (diff)
Sync service and threadgroup modules from oslo
blueprint use-oslo-services Change-Id: I8641dbed0a83a6d57c0b6c1a02796eb3e633fafd
-rw-r--r--nova/openstack/common/jsonutils.py8
-rw-r--r--nova/openstack/common/log.py18
-rw-r--r--nova/openstack/common/processutils.py2
-rw-r--r--nova/openstack/common/service.py333
-rw-r--r--nova/openstack/common/threadgroup.py121
-rw-r--r--openstack-common.conf2
6 files changed, 467 insertions, 17 deletions
diff --git a/nova/openstack/common/jsonutils.py b/nova/openstack/common/jsonutils.py
index 0aab537a7..11af66daf 100644
--- a/nova/openstack/common/jsonutils.py
+++ b/nova/openstack/common/jsonutils.py
@@ -41,6 +41,8 @@ import json
import types
import xmlrpclib
+import six
+
from nova.openstack.common import timeutils
@@ -93,7 +95,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
# value of itertools.count doesn't get caught by nasty_type_tests
# and results in infinite loop when list(value) is called.
if type(value) == itertools.count:
- return unicode(value)
+ return six.text_type(value)
# FIXME(vish): Workaround for LP bug 852095. Without this workaround,
# tests that raise an exception in a mocked method that
@@ -137,12 +139,12 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
return recursive(value.__dict__, level=level + 1)
else:
if any(test(value) for test in _nasty_type_tests):
- return unicode(value)
+ return six.text_type(value)
return value
except TypeError:
# Class objects are tricky since they may define something like
# __iter__ defined but it isn't callable as list().
- return unicode(value)
+ return six.text_type(value)
def dumps(value, default=to_primitive, **kwargs):
diff --git a/nova/openstack/common/log.py b/nova/openstack/common/log.py
index e11f666fc..cdaba43ab 100644
--- a/nova/openstack/common/log.py
+++ b/nova/openstack/common/log.py
@@ -43,9 +43,9 @@ import traceback
from oslo.config import cfg
from nova.openstack.common.gettextutils import _
+from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import local
-from nova.openstack.common import notifier
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
@@ -322,17 +322,6 @@ class JSONFormatter(logging.Formatter):
return jsonutils.dumps(message)
-class PublishErrorsHandler(logging.Handler):
- def emit(self, record):
- if ('nova.openstack.common.notifier.log_notifier' in
- CONF.notification_driver):
- return
- notifier.api.notify(None, 'error.publisher',
- 'error_notification',
- notifier.api.ERROR,
- dict(error=record.msg))
-
-
def _create_logging_excepthook(product_name):
def logging_excepthook(type, value, tb):
extra = {}
@@ -428,7 +417,10 @@ def _setup_logging_from_conf():
log_root.addHandler(streamlog)
if CONF.publish_errors:
- log_root.addHandler(PublishErrorsHandler(logging.ERROR))
+ handler = importutils.import_object(
+ "nova.openstack.common.log_handler.PublishErrorsHandler",
+ logging.ERROR)
+ log_root.addHandler(handler)
datefmt = CONF.log_date_format
for handler in log_root.handlers:
diff --git a/nova/openstack/common/processutils.py b/nova/openstack/common/processutils.py
index 6474db4ad..de6179602 100644
--- a/nova/openstack/common/processutils.py
+++ b/nova/openstack/common/processutils.py
@@ -123,7 +123,7 @@ def execute(*cmd, **kwargs):
elif isinstance(check_exit_code, int):
check_exit_code = [check_exit_code]
- if len(kwargs):
+ if kwargs:
raise UnknownArgumentError(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs)
diff --git a/nova/openstack/common/service.py b/nova/openstack/common/service.py
new file mode 100644
index 000000000..a1f430999
--- /dev/null
+++ b/nova/openstack/common/service.py
@@ -0,0 +1,333 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Generic Node base class for all workers that run on hosts."""
+
+import errno
+import os
+import random
+import signal
+import sys
+import time
+
+import eventlet
+import logging as std_logging
+from oslo.config import cfg
+
+from nova.openstack.common import eventlet_backdoor
+from nova.openstack.common.gettextutils import _
+from nova.openstack.common import importutils
+from nova.openstack.common import log as logging
+from nova.openstack.common import threadgroup
+
+
+rpc = importutils.try_import('nova.openstack.common.rpc')
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class Launcher(object):
+ """Launch one or more services and wait for them to complete."""
+
+ def __init__(self):
+ """Initialize the service launcher.
+
+ :returns: None
+
+ """
+ self._services = threadgroup.ThreadGroup()
+ self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
+
+ @staticmethod
+ def run_service(service):
+ """Start and wait for a service to finish.
+
+ :param service: service to run and wait for.
+ :returns: None
+
+ """
+ service.start()
+ service.wait()
+
+ def launch_service(self, service):
+ """Load and start the given service.
+
+ :param service: The service you would like to start.
+ :returns: None
+
+ """
+ service.backdoor_port = self.backdoor_port
+ self._services.add_thread(self.run_service, service)
+
+ def stop(self):
+ """Stop all services which are currently running.
+
+ :returns: None
+
+ """
+ self._services.stop()
+
+ def wait(self):
+ """Waits until all services have been stopped, and then returns.
+
+ :returns: None
+
+ """
+ self._services.wait()
+
+
+class SignalExit(SystemExit):
+ def __init__(self, signo, exccode=1):
+ super(SignalExit, self).__init__(exccode)
+ self.signo = signo
+
+
+class ServiceLauncher(Launcher):
+ def _handle_signal(self, signo, frame):
+ # Allow the process to be killed again and die from natural causes
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+ raise SignalExit(signo)
+
+ def wait(self):
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, self._handle_signal)
+
+ LOG.debug(_('Full set of CONF:'))
+ CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+ status = None
+ try:
+ super(ServiceLauncher, self).wait()
+ except SignalExit as exc:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[exc.signo]
+ LOG.info(_('Caught %s, exiting'), signame)
+ status = exc.code
+ except SystemExit as exc:
+ status = exc.code
+ finally:
+ if rpc:
+ rpc.cleanup()
+ self.stop()
+ return status
+
+
+class ServiceWrapper(object):
+ def __init__(self, service, workers):
+ self.service = service
+ self.workers = workers
+ self.children = set()
+ self.forktimes = []
+
+
+class ProcessLauncher(object):
+ def __init__(self):
+ self.children = {}
+ self.sigcaught = None
+ self.running = True
+ rfd, self.writepipe = os.pipe()
+ self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, self._handle_signal)
+
+ def _handle_signal(self, signo, frame):
+ self.sigcaught = signo
+ self.running = False
+
+ # Allow the process to be killed again and die from natural causes
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+ def _pipe_watcher(self):
+ # This will block until the write end is closed when the parent
+ # dies unexpectedly
+ self.readpipe.read()
+
+ LOG.info(_('Parent process has died unexpectedly, exiting'))
+
+ sys.exit(1)
+
+ def _child_process(self, service):
+ # Setup child signal handlers differently
+ def _sigterm(*args):
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ raise SignalExit(signal.SIGTERM)
+
+ signal.signal(signal.SIGTERM, _sigterm)
+ # Block SIGINT and let the parent send us a SIGTERM
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+ # Reopen the eventlet hub to make sure we don't share an epoll
+ # fd with parent and/or siblings, which would be bad
+ eventlet.hubs.use_hub()
+
+ # Close write to ensure only parent has it open
+ os.close(self.writepipe)
+ # Create greenthread to watch for parent to close pipe
+ eventlet.spawn_n(self._pipe_watcher)
+
+ # Reseed random number generator
+ random.seed()
+
+ launcher = Launcher()
+ launcher.run_service(service)
+
+ def _start_child(self, wrap):
+ if len(wrap.forktimes) > wrap.workers:
+ # Limit ourselves to one process a second (over the period of
+ # number of workers * 1 second). This will allow workers to
+ # start up quickly but ensure we don't fork off children that
+ # die instantly too quickly.
+ if time.time() - wrap.forktimes[0] < wrap.workers:
+ LOG.info(_('Forking too fast, sleeping'))
+ time.sleep(1)
+
+ wrap.forktimes.pop(0)
+
+ wrap.forktimes.append(time.time())
+
+ pid = os.fork()
+ if pid == 0:
+ # NOTE(johannes): All exceptions are caught to ensure this
+ # doesn't fallback into the loop spawning children. It would
+ # be bad for a child to spawn more children.
+ status = 0
+ try:
+ self._child_process(wrap.service)
+ except SignalExit as exc:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[exc.signo]
+ LOG.info(_('Caught %s, exiting'), signame)
+ status = exc.code
+ except SystemExit as exc:
+ status = exc.code
+ except BaseException:
+ LOG.exception(_('Unhandled exception'))
+ status = 2
+ finally:
+ wrap.service.stop()
+
+ os._exit(status)
+
+ LOG.info(_('Started child %d'), pid)
+
+ wrap.children.add(pid)
+ self.children[pid] = wrap
+
+ return pid
+
+ def launch_service(self, service, workers=1):
+ wrap = ServiceWrapper(service, workers)
+
+ LOG.info(_('Starting %d workers'), wrap.workers)
+ while self.running and len(wrap.children) < wrap.workers:
+ self._start_child(wrap)
+
+ def _wait_child(self):
+ try:
+ # Don't block if no child processes have exited
+ pid, status = os.waitpid(0, os.WNOHANG)
+ if not pid:
+ return None
+ except OSError as exc:
+ if exc.errno not in (errno.EINTR, errno.ECHILD):
+ raise
+ return None
+
+ if os.WIFSIGNALED(status):
+ sig = os.WTERMSIG(status)
+ LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
+ dict(pid=pid, sig=sig))
+ else:
+ code = os.WEXITSTATUS(status)
+ LOG.info(_('Child %(pid)s exited with status %(code)d'),
+ dict(pid=pid, code=code))
+
+ if pid not in self.children:
+ LOG.warning(_('pid %d not in child list'), pid)
+ return None
+
+ wrap = self.children.pop(pid)
+ wrap.children.remove(pid)
+ return wrap
+
+ def wait(self):
+ """Loop waiting on children to die and respawning as necessary"""
+
+ LOG.debug(_('Full set of CONF:'))
+ CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+ while self.running:
+ wrap = self._wait_child()
+ if not wrap:
+ # Yield to other threads if no children have exited
+ # Sleep for a short time to avoid excessive CPU usage
+ # (see bug #1095346)
+ eventlet.greenthread.sleep(.01)
+ continue
+
+ while self.running and len(wrap.children) < wrap.workers:
+ self._start_child(wrap)
+
+ if self.sigcaught:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[self.sigcaught]
+ LOG.info(_('Caught %s, stopping children'), signame)
+
+ for pid in self.children:
+ try:
+ os.kill(pid, signal.SIGTERM)
+ except OSError as exc:
+ if exc.errno != errno.ESRCH:
+ raise
+
+ # Wait for children to die
+ if self.children:
+ LOG.info(_('Waiting on %d children to exit'), len(self.children))
+ while self.children:
+ self._wait_child()
+
+
+class Service(object):
+ """Service object for binaries running on hosts."""
+
+ def __init__(self, threads=1000):
+ self.tg = threadgroup.ThreadGroup(threads)
+
+ def start(self):
+ pass
+
+ def stop(self):
+ self.tg.stop()
+
+ def wait(self):
+ self.tg.wait()
+
+
+def launch(service, workers=None):
+ if workers:
+ launcher = ProcessLauncher()
+ launcher.launch_service(service, workers=workers)
+ else:
+ launcher = ServiceLauncher()
+ launcher.launch_service(service)
+ return launcher
diff --git a/nova/openstack/common/threadgroup.py b/nova/openstack/common/threadgroup.py
new file mode 100644
index 000000000..26a391f58
--- /dev/null
+++ b/nova/openstack/common/threadgroup.py
@@ -0,0 +1,121 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from eventlet import greenlet
+from eventlet import greenpool
+from eventlet import greenthread
+
+from nova.openstack.common import log as logging
+from nova.openstack.common import loopingcall
+
+
+LOG = logging.getLogger(__name__)
+
+
+def _thread_done(gt, *args, **kwargs):
+ """ Callback function to be passed to GreenThread.link() when we spawn()
+ Calls the :class:`ThreadGroup` to notify if.
+
+ """
+ kwargs['group'].thread_done(kwargs['thread'])
+
+
+class Thread(object):
+ """ Wrapper around a greenthread, that holds a reference to the
+ :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
+ it has done so it can be removed from the threads list.
+ """
+ def __init__(self, thread, group):
+ self.thread = thread
+ self.thread.link(_thread_done, group=group, thread=self)
+
+ def stop(self):
+ self.thread.kill()
+
+ def wait(self):
+ return self.thread.wait()
+
+
+class ThreadGroup(object):
+ """ The point of the ThreadGroup classis to:
+
+ * keep track of timers and greenthreads (making it easier to stop them
+ when need be).
+ * provide an easy API to add timers.
+ """
+ def __init__(self, thread_pool_size=10):
+ self.pool = greenpool.GreenPool(thread_pool_size)
+ self.threads = []
+ self.timers = []
+
+ def add_dynamic_timer(self, callback, initial_delay=None,
+ periodic_interval_max=None, *args, **kwargs):
+ timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
+ timer.start(initial_delay=initial_delay,
+ periodic_interval_max=periodic_interval_max)
+ self.timers.append(timer)
+
+ def add_timer(self, interval, callback, initial_delay=None,
+ *args, **kwargs):
+ pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
+ pulse.start(interval=interval,
+ initial_delay=initial_delay)
+ self.timers.append(pulse)
+
+ def add_thread(self, callback, *args, **kwargs):
+ gt = self.pool.spawn(callback, *args, **kwargs)
+ th = Thread(gt, self)
+ self.threads.append(th)
+
+ def thread_done(self, thread):
+ self.threads.remove(thread)
+
+ def stop(self):
+ current = greenthread.getcurrent()
+ for x in self.threads:
+ if x is current:
+ # don't kill the current thread.
+ continue
+ try:
+ x.stop()
+ except Exception as ex:
+ LOG.exception(ex)
+
+ for x in self.timers:
+ try:
+ x.stop()
+ except Exception as ex:
+ LOG.exception(ex)
+ self.timers = []
+
+ def wait(self):
+ for x in self.timers:
+ try:
+ x.wait()
+ except greenlet.GreenletExit:
+ pass
+ except Exception as ex:
+ LOG.exception(ex)
+ current = greenthread.getcurrent()
+ for x in self.threads:
+ if x is current:
+ continue
+ try:
+ x.wait()
+ except greenlet.GreenletExit:
+ pass
+ except Exception as ex:
+ LOG.exception(ex)
diff --git a/openstack-common.conf b/openstack-common.conf
index 24c306181..87be18491 100644
--- a/openstack-common.conf
+++ b/openstack-common.conf
@@ -27,7 +27,9 @@ module=processutils
module=redhat-eventlet.patch
module=rootwrap
module=rpc
+module=service
module=strutils
+module=threadgroup
module=timeutils
module=uuidutils