From 1d1e4cd6d94d1729c86ad52f5d32b630198fe9bf Mon Sep 17 00:00:00 2001 From: Michael Still Date: Tue, 2 Apr 2013 17:26:16 +1100 Subject: Copy recent changes in periodic tasks from nova. The nova periodic task code has moved on. Bring oslo inline with the new code. The next step after this is to update Quantum (the only user of this code I can find) to the new interface, and then move nova across to using this code. Change-Id: I3f041ca9a6ba9f0e5be4b29198d16454c9797fef --- openstack/common/periodic_task.py | 124 ++++++++++++++++++++++++++++++-------- 1 file changed, 98 insertions(+), 26 deletions(-) (limited to 'openstack/common') diff --git a/openstack/common/periodic_task.py b/openstack/common/periodic_task.py index 28ab22e..c48d8b9 100644 --- a/openstack/common/periodic_task.py +++ b/openstack/common/periodic_task.py @@ -13,26 +13,72 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime +import time +from oslo.config import cfg + from openstack.common.gettextutils import _ from openstack.common import log as logging +from openstack.common import timeutils + + +periodic_opts = [ + cfg.BoolOpt('run_external_periodic_tasks', + default=True, + help=('Some periodic tasks can be run in a separate process. ' + 'Should we run them here?')), +] + +CONF = cfg.CONF +CONF.register_opts(periodic_opts) LOG = logging.getLogger(__name__) +DEFAULT_INTERVAL = 60.0 + + +class InvalidPeriodicTaskArg(Exception): + message = _("Unexpected argument for periodic task creation: %(arg)s.") + def periodic_task(*args, **kwargs): """Decorator to indicate that a method is a periodic task. This decorator can be used in two ways: - 1. Without arguments '@periodic_task', this will be run on every tick + 1. Without arguments '@periodic_task', this will be run on every cycle of the periodic scheduler. - 2. With arguments, @periodic_task(ticks_between_runs=N), this will be - run on every N ticks of the periodic scheduler. + 2. With arguments: + @periodic_task(spacing=N [, run_immediately=[True|False]]) + this will be run on approximately every N seconds. If this number is + negative the periodic task will be disabled. If the run_immediately + argument is provided and has a value of 'True', the first run of the + task will be shortly after task scheduler starts. If + run_immediately is omitted or set to 'False', the first time the + task runs will be approximately N seconds after the task scheduler + starts. """ def decorator(f): + # Test for old style invocation + if 'ticks_between_runs' in kwargs: + raise InvalidPeriodicTaskArg(arg='ticks_between_runs') + + # Control if run at all f._periodic_task = True - f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0) + f._periodic_external_ok = kwargs.pop('external_process_ok', False) + if f._periodic_external_ok and not CONF.run_external_periodic_tasks: + f._periodic_enabled = False + else: + f._periodic_enabled = kwargs.pop('enabled', True) + + # Control frequency + f._periodic_spacing = kwargs.pop('spacing', 0) + f._periodic_immediate = kwargs.pop('run_immediately', False) + if f._periodic_immediate: + f._periodic_last_run = None + else: + f._periodic_last_run = timeutils.utcnow() return f # NOTE(sirp): The `if` is necessary to allow the decorator to be used with @@ -59,7 +105,7 @@ class _PeriodicTasksMeta(type): super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_) # NOTE(sirp): if the attribute is not present then we must be the base - # class, so, go ahead and initialize it. If the attribute is present, + # class, so, go ahead an initialize it. If the attribute is present, # then we're a subclass so make a copy of it so we don't step on our # parent's toes. try: @@ -68,20 +114,39 @@ class _PeriodicTasksMeta(type): cls._periodic_tasks = [] try: - cls._ticks_to_skip = cls._ticks_to_skip.copy() + cls._periodic_last_run = cls._periodic_last_run.copy() except AttributeError: - cls._ticks_to_skip = {} + cls._periodic_last_run = {} + + try: + cls._periodic_spacing = cls._periodic_spacing.copy() + except AttributeError: + cls._periodic_spacing = {} - # This uses __dict__ instead of - # inspect.getmembers(cls, inspect.ismethod) so only the methods of the - # current class are added when this class is scanned, and base classes - # are not added redundantly. for value in cls.__dict__.values(): if getattr(value, '_periodic_task', False): task = value name = task.__name__ + + if task._periodic_spacing < 0: + LOG.info(_('Skipping periodic task %(task)s because ' + 'its interval is negative'), + {'task': name}) + continue + if not task._periodic_enabled: + LOG.info(_('Skipping periodic task %(task)s because ' + 'it is disabled'), + {'task': name}) + continue + + # A periodic spacing of zero indicates that this task should + # be run every pass + if task._periodic_spacing == 0: + task._periodic_spacing = None + cls._periodic_tasks.append((name, task)) - cls._ticks_to_skip[name] = task._ticks_between_runs + cls._periodic_spacing[name] = task._periodic_spacing + cls._periodic_last_run[name] = task._periodic_last_run class PeriodicTasks(object): @@ -89,27 +154,34 @@ class PeriodicTasks(object): def run_periodic_tasks(self, context, raise_on_error=False): """Tasks to be run at a periodic interval.""" + idle_for = DEFAULT_INTERVAL for task_name, task in self._periodic_tasks: full_task_name = '.'.join([self.__class__.__name__, task_name]) - ticks_to_skip = self._ticks_to_skip[task_name] - if ticks_to_skip > 0: - LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s" - " ticks left until next run"), - dict(full_task_name=full_task_name, - ticks_to_skip=ticks_to_skip)) - self._ticks_to_skip[task_name] -= 1 - continue + now = timeutils.utcnow() + spacing = self._periodic_spacing[task_name] + last_run = self._periodic_last_run[task_name] + + # If a periodic task is _nearly_ due, then we'll run it early + if spacing is not None and last_run is not None: + due = last_run + datetime.timedelta(seconds=spacing) + if not timeutils.is_soon(due, 0.2): + idle_for = min(idle_for, timeutils.delta_seconds(now, due)) + continue - self._ticks_to_skip[task_name] = task._ticks_between_runs - LOG.debug(_("Running periodic task %(full_task_name)s"), - dict(full_task_name=full_task_name)) + if spacing is not None: + idle_for = min(idle_for, spacing) + + LOG.debug(_("Running periodic task %(full_task_name)s"), locals()) + self._periodic_last_run[task_name] = timeutils.utcnow() try: task(self, context) except Exception as e: if raise_on_error: raise - LOG.exception(_("Error during %(full_task_name)s:" - " %(e)s"), - dict(e=e, full_task_name=full_task_name)) + LOG.exception(_("Error during %(full_task_name)s: %(e)s"), + locals()) + time.sleep(0) + + return idle_for -- cgit