diff options
| author | Angus Salkeld <asalkeld@redhat.com> | 2012-08-15 17:29:00 +1000 |
|---|---|---|
| committer | Angus Salkeld <asalkeld@redhat.com> | 2012-08-20 20:32:23 +1000 |
| commit | 31495c3a9d31920cea8f98fef23b10958d38e6a5 (patch) | |
| tree | 021b02e4f66549c63bf6a76377ee8d9b0de11150 /openstack | |
| parent | a6c4b02c206120a5815f08ad7112cb68c3e9c3e5 (diff) | |
| download | oslo-31495c3a9d31920cea8f98fef23b10958d38e6a5.tar.gz oslo-31495c3a9d31920cea8f98fef23b10958d38e6a5.tar.xz oslo-31495c3a9d31920cea8f98fef23b10958d38e6a5.zip | |
Add threadgroup to manage timers and greenthreads.
Part of blueprint service-infrastructure
Change-Id: I40593b6ad13d99d9e63c8c04154a0dac5aced02e
Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/service.py | 30 | ||||
| -rw-r--r-- | openstack/common/threadgroup.py | 118 |
2 files changed, 141 insertions, 7 deletions
diff --git a/openstack/common/service.py b/openstack/common/service.py index b464aea..01c38a4 100644 --- a/openstack/common/service.py +++ b/openstack/common/service.py @@ -30,6 +30,7 @@ import eventlet import greenlet from openstack.common import log as logging +from openstack.common import threadgroup from openstack.common.gettextutils import _ @@ -290,24 +291,39 @@ class ProcessLauncher(object): class Service(object): """Service object for binaries running on hosts. - A service takes a manager.""" + A service takes a manager and periodically runs tasks on the manager.""" - def __init__(self, host, manager, *args, **kwargs): + def __init__(self, host, manager, + periodic_interval=None, + periodic_fuzzy_delay=None): self.host = host self.manager = manager - self.running = False + self.periodic_interval = periodic_interval + self.periodic_fuzzy_delay = periodic_fuzzy_delay + self.tg = threadgroup.ThreadGroup('service') + self.periodic_args = [] + self.periodic_kwargs = {} def start(self): - self.running = True if self.manager: self.manager.init_host() + if self.periodic_interval and self.manager: + if self.periodic_fuzzy_delay: + initial_delay = random.randint(0, self.periodic_fuzzy_delay) + else: + initial_delay = 0 + self.tg.add_timer(self.periodic_interval, + self.manager.run_periodic_tasks, + initial_delay, + *self.periodic_args, + **self.periodic_kwargs) + def stop(self): - self.running = False + self.tg.stop() def wait(self): - while self.running: - time.sleep(.1) + self.tg.wait() def launch(service, workers=None): diff --git a/openstack/common/threadgroup.py b/openstack/common/threadgroup.py new file mode 100644 index 0000000..a9dfa5e --- /dev/null +++ b/openstack/common/threadgroup.py @@ -0,0 +1,118 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import os +import sys + +from eventlet import event +from eventlet import greenthread +from eventlet import greenpool + +from openstack.common import loopingcall +from openstack.common.gettextutils import _ +from openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +def _thread_done(gt, *args, **kwargs): + args[0].thread_done(args[1]) + + +class Thread(object): + """ + Wrapper around a greenthread, that holds a reference to + the ThreadGroup. The Thread will notify the ThreadGroup + when it has done so it can be removed from the threads + list. + """ + def __init__(self, name, thread, group): + self.name = name + self.thread = thread + self.thread.link(_thread_done, group, self) + + def stop(self): + self.thread.cancel() + + def wait(self): + return self.thread.wait() + + +class ThreadGroup(): + """ + The point of this class is to: + - keep track of timers and greenthreads (making it easier to stop them + when need be). + - provide an easy API to add timers. + """ + def __init__(self, name, thread_pool_size=10): + self.name = name + self.pool = greenpool.GreenPool(thread_pool_size) + self.threads = [] + self.timers = [] + + def add_timer(self, interval, callback, initial_delay=None, + *args, **kwargs): + pulse = loopingcall.LoopingCall(callback, *args, **kwargs) + pulse.start(interval=interval, + initial_delay=initial_delay) + self.timers.append(pulse) + + def add_thread(self, callback, *args, **kwargs): + gt = self.pool.spawn(callback, *args, **kwargs) + th = Thread(callback.__name__, gt, self) + self.threads.append(th) + + def thread_done(self, thread): + try: + thread.wait() + except Exception as ex: + LOG.exception(ex) + finally: + self.threads.remove(thread) + + def stop(self): + current = greenthread.getcurrent() + for x in self.threads: + if x is current: + # don't kill the current thread. + continue + try: + x.stop() + except Exception as ex: + LOG.exception(ex) + + for x in self.timers: + try: + x.stop() + except Exception as ex: + LOG.exception(ex) + self.timers = [] + + def wait(self): + for x in self.timers: + try: + x.wait() + except Exception as ex: + LOG.exception(ex) + current = greenthread.getcurrent() + for x in self.threads: + if x is current: + continue + try: + x.wait() + except Exception as ex: + LOG.exception(ex) |
