summaryrefslogtreecommitdiffstats
path: root/openstack/common
diff options
context:
space:
mode:
authorAngus Salkeld <asalkeld@redhat.com>2012-08-15 17:29:00 +1000
committerAngus Salkeld <asalkeld@redhat.com>2012-08-20 20:32:23 +1000
commit31495c3a9d31920cea8f98fef23b10958d38e6a5 (patch)
tree021b02e4f66549c63bf6a76377ee8d9b0de11150 /openstack/common
parenta6c4b02c206120a5815f08ad7112cb68c3e9c3e5 (diff)
downloadoslo-31495c3a9d31920cea8f98fef23b10958d38e6a5.tar.gz
oslo-31495c3a9d31920cea8f98fef23b10958d38e6a5.tar.xz
oslo-31495c3a9d31920cea8f98fef23b10958d38e6a5.zip
Add threadgroup to manage timers and greenthreads.
Part of blueprint service-infrastructure Change-Id: I40593b6ad13d99d9e63c8c04154a0dac5aced02e Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
Diffstat (limited to 'openstack/common')
-rw-r--r--openstack/common/service.py30
-rw-r--r--openstack/common/threadgroup.py118
2 files changed, 141 insertions, 7 deletions
diff --git a/openstack/common/service.py b/openstack/common/service.py
index b464aea..01c38a4 100644
--- a/openstack/common/service.py
+++ b/openstack/common/service.py
@@ -30,6 +30,7 @@ import eventlet
import greenlet
from openstack.common import log as logging
+from openstack.common import threadgroup
from openstack.common.gettextutils import _
@@ -290,24 +291,39 @@ class ProcessLauncher(object):
class Service(object):
"""Service object for binaries running on hosts.
- A service takes a manager."""
+ A service takes a manager and periodically runs tasks on the manager."""
- def __init__(self, host, manager, *args, **kwargs):
+ def __init__(self, host, manager,
+ periodic_interval=None,
+ periodic_fuzzy_delay=None):
self.host = host
self.manager = manager
- self.running = False
+ self.periodic_interval = periodic_interval
+ self.periodic_fuzzy_delay = periodic_fuzzy_delay
+ self.tg = threadgroup.ThreadGroup('service')
+ self.periodic_args = []
+ self.periodic_kwargs = {}
def start(self):
- self.running = True
if self.manager:
self.manager.init_host()
+ if self.periodic_interval and self.manager:
+ if self.periodic_fuzzy_delay:
+ initial_delay = random.randint(0, self.periodic_fuzzy_delay)
+ else:
+ initial_delay = 0
+ self.tg.add_timer(self.periodic_interval,
+ self.manager.run_periodic_tasks,
+ initial_delay,
+ *self.periodic_args,
+ **self.periodic_kwargs)
+
def stop(self):
- self.running = False
+ self.tg.stop()
def wait(self):
- while self.running:
- time.sleep(.1)
+ self.tg.wait()
def launch(service, workers=None):
diff --git a/openstack/common/threadgroup.py b/openstack/common/threadgroup.py
new file mode 100644
index 0000000..a9dfa5e
--- /dev/null
+++ b/openstack/common/threadgroup.py
@@ -0,0 +1,118 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import os
+import sys
+
+from eventlet import event
+from eventlet import greenthread
+from eventlet import greenpool
+
+from openstack.common import loopingcall
+from openstack.common.gettextutils import _
+from openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+def _thread_done(gt, *args, **kwargs):
+ args[0].thread_done(args[1])
+
+
+class Thread(object):
+ """
+ Wrapper around a greenthread, that holds a reference to
+ the ThreadGroup. The Thread will notify the ThreadGroup
+ when it has done so it can be removed from the threads
+ list.
+ """
+ def __init__(self, name, thread, group):
+ self.name = name
+ self.thread = thread
+ self.thread.link(_thread_done, group, self)
+
+ def stop(self):
+ self.thread.cancel()
+
+ def wait(self):
+ return self.thread.wait()
+
+
+class ThreadGroup():
+ """
+ The point of this class is to:
+ - keep track of timers and greenthreads (making it easier to stop them
+ when need be).
+ - provide an easy API to add timers.
+ """
+ def __init__(self, name, thread_pool_size=10):
+ self.name = name
+ self.pool = greenpool.GreenPool(thread_pool_size)
+ self.threads = []
+ self.timers = []
+
+ def add_timer(self, interval, callback, initial_delay=None,
+ *args, **kwargs):
+ pulse = loopingcall.LoopingCall(callback, *args, **kwargs)
+ pulse.start(interval=interval,
+ initial_delay=initial_delay)
+ self.timers.append(pulse)
+
+ def add_thread(self, callback, *args, **kwargs):
+ gt = self.pool.spawn(callback, *args, **kwargs)
+ th = Thread(callback.__name__, gt, self)
+ self.threads.append(th)
+
+ def thread_done(self, thread):
+ try:
+ thread.wait()
+ except Exception as ex:
+ LOG.exception(ex)
+ finally:
+ self.threads.remove(thread)
+
+ def stop(self):
+ current = greenthread.getcurrent()
+ for x in self.threads:
+ if x is current:
+ # don't kill the current thread.
+ continue
+ try:
+ x.stop()
+ except Exception as ex:
+ LOG.exception(ex)
+
+ for x in self.timers:
+ try:
+ x.stop()
+ except Exception as ex:
+ LOG.exception(ex)
+ self.timers = []
+
+ def wait(self):
+ for x in self.timers:
+ try:
+ x.wait()
+ except Exception as ex:
+ LOG.exception(ex)
+ current = greenthread.getcurrent()
+ for x in self.threads:
+ if x is current:
+ continue
+ try:
+ x.wait()
+ except Exception as ex:
+ LOG.exception(ex)