summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-05-01 16:23:57 +0000
committerGerrit Code Review <review@openstack.org>2013-05-01 16:23:57 +0000
commit3266bff670f6ab25ab8d850917898bf4ee2a705c (patch)
treed63aed7ae0d25503c48ea68ef6cba5ed4da21144
parent2caf27936eb3b5ba8eef4b80aa04fec3d36a0b4e (diff)
parent01b0f6554be87a0664e74c63817c3034ee40a6ac (diff)
Merge "Update to using oslo periodic tasks implementation."
-rw-r--r--nova/cells/manager.py5
-rwxr-xr-xnova/compute/manager.py34
-rw-r--r--nova/exception.py4
-rw-r--r--nova/manager.py161
-rw-r--r--nova/network/manager.py5
-rw-r--r--nova/scheduler/manager.py3
-rw-r--r--nova/tests/test_periodic_tasks.py182
7 files changed, 30 insertions, 364 deletions
diff --git a/nova/cells/manager.py b/nova/cells/manager.py
index 6c44d0cba..15ee2224c 100644
--- a/nova/cells/manager.py
+++ b/nova/cells/manager.py
@@ -28,6 +28,7 @@ from nova import context
from nova import exception
from nova import manager
from nova.openstack.common import importutils
+from nova.openstack.common import periodic_task
from nova.openstack.common import timeutils
cell_manager_opts = [
@@ -97,7 +98,7 @@ class CellsManager(manager.Manager):
else:
self._update_our_parents(ctxt)
- @manager.periodic_task
+ @periodic_task.periodic_task
def _update_our_parents(self, ctxt):
"""Update our parent cells with our capabilities and capacity
if we're at the bottom of the tree.
@@ -105,7 +106,7 @@ class CellsManager(manager.Manager):
self.msg_runner.tell_parents_our_capabilities(ctxt)
self.msg_runner.tell_parents_our_capacities(ctxt)
- @manager.periodic_task
+ @periodic_task.periodic_task
def _heal_instances(self, ctxt):
"""Periodic task to send updates for a number of instances to
parent cells.
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 3f6d828e1..86079a661 100755
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -65,6 +65,7 @@ from nova.openstack.common import jsonutils
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
from nova.openstack.common.notifier import api as notifier
+from nova.openstack.common import periodic_task
from nova.openstack.common import rpc
from nova.openstack.common.rpc import common as rpc_common
from nova.openstack.common import timeutils
@@ -1059,7 +1060,7 @@ class ComputeManager(manager.SchedulerDependentManager):
scheduler_method(context, *method_args)
return True
- @manager.periodic_task
+ @periodic_task.periodic_task
def _check_instance_build_time(self, context):
"""Ensure that instances are not stuck in build."""
timeout = CONF.instance_build_timeout
@@ -3458,7 +3459,7 @@ class ComputeManager(manager.SchedulerDependentManager):
self.driver.destroy(instance, self._legacy_nw_info(network_info),
block_device_info)
- @manager.periodic_task
+ @periodic_task.periodic_task
def _heal_instance_info_cache(self, context):
"""Called periodically. On every call, try to update the
info_cache's network information for another instance by
@@ -3511,7 +3512,7 @@ class ComputeManager(manager.SchedulerDependentManager):
# We don't care about any failures
pass
- @manager.periodic_task
+ @periodic_task.periodic_task
def _poll_rebooting_instances(self, context):
if CONF.reboot_timeout > 0:
instances = self.conductor_api.instance_get_all_hung_in_rebooting(
@@ -3519,7 +3520,7 @@ class ComputeManager(manager.SchedulerDependentManager):
self.driver.poll_rebooting_instances(CONF.reboot_timeout,
instances)
- @manager.periodic_task
+ @periodic_task.periodic_task
def _poll_rescued_instances(self, context):
if CONF.rescue_timeout > 0:
instances = self.conductor_api.instance_get_all_by_host(
@@ -3539,7 +3540,7 @@ class ComputeManager(manager.SchedulerDependentManager):
for instance in to_unrescue:
self.conductor_api.compute_unrescue(context, instance)
- @manager.periodic_task
+ @periodic_task.periodic_task
def _poll_unconfirmed_resizes(self, context):
if CONF.resize_confirm_window > 0:
capi = self.conductor_api
@@ -3596,7 +3597,7 @@ class ComputeManager(manager.SchedulerDependentManager):
"Will retry later.")
LOG.error(msg % locals(), instance=instance)
- @manager.periodic_task
+ @periodic_task.periodic_task
def _instance_usage_audit(self, context):
if CONF.instance_usage_audit:
if not compute_utils.has_audit_been_run(context,
@@ -3644,7 +3645,7 @@ class ComputeManager(manager.SchedulerDependentManager):
num_instances,
time.time() - start_time))
- @manager.periodic_task
+ @periodic_task.periodic_task
def _poll_bandwidth_usage(self, context):
prev_time, start_time = utils.last_completed_audit_period()
@@ -3751,7 +3752,7 @@ class ComputeManager(manager.SchedulerDependentManager):
notifier.INFO,
compute_utils.usage_volume_info(vol_usage))
- @manager.periodic_task
+ @periodic_task.periodic_task
def _poll_volume_usage(self, context, start_time=None):
if CONF.volume_usage_poll_interval == 0:
return
@@ -3781,7 +3782,7 @@ class ComputeManager(manager.SchedulerDependentManager):
self._send_volume_usage_notifications(context, start_time)
- @manager.periodic_task
+ @periodic_task.periodic_task
def _report_driver_status(self, context):
curr_time = time.time()
if curr_time - self._last_host_check > CONF.host_state_interval:
@@ -3795,8 +3796,8 @@ class ComputeManager(manager.SchedulerDependentManager):
capability['host_ip'] = CONF.my_ip
self.update_service_capabilities(capabilities)
- @manager.periodic_task(spacing=CONF.sync_power_state_interval,
- run_immediately=True)
+ @periodic_task.periodic_task(spacing=CONF.sync_power_state_interval,
+ run_immediately=True)
def _sync_power_states(self, context):
"""Align power states between the database and the hypervisor.
@@ -3956,7 +3957,7 @@ class ComputeManager(manager.SchedulerDependentManager):
LOG.warn(_("Instance is not (soft-)deleted."),
instance=db_instance)
- @manager.periodic_task
+ @periodic_task.periodic_task
def _reclaim_queued_deletes(self, context):
"""Reclaim instances that are queued for deletion."""
interval = CONF.reclaim_instance_interval
@@ -3987,7 +3988,7 @@ class ComputeManager(manager.SchedulerDependentManager):
# pass reservations here.
self._delete_instance(context, instance, bdms)
- @manager.periodic_task
+ @periodic_task.periodic_task
def update_available_resource(self, context):
"""See driver.get_available_resource()
@@ -4023,7 +4024,8 @@ class ComputeManager(manager.SchedulerDependentManager):
return service_ref['compute_node']
- @manager.periodic_task(spacing=CONF.running_deleted_instance_poll_interval)
+ @periodic_task.periodic_task(
+ spacing=CONF.running_deleted_instance_poll_interval)
def _cleanup_running_deleted_instances(self, context):
"""Cleanup any instances which are erroneously still running after
having been deleted.
@@ -4143,8 +4145,8 @@ class ComputeManager(manager.SchedulerDependentManager):
aggregate, host,
isinstance(e, exception.AggregateError))
- @manager.periodic_task(spacing=CONF.image_cache_manager_interval,
- external_process_ok=True)
+ @periodic_task.periodic_task(spacing=CONF.image_cache_manager_interval,
+ external_process_ok=True)
def _run_image_cache_manager_pass(self, context):
"""Run a single pass of the image cache manager."""
diff --git a/nova/exception.py b/nova/exception.py
index fd3899cff..698b49c2b 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -452,10 +452,6 @@ class InvalidID(Invalid):
message = _("Invalid ID received %(id)s.")
-class InvalidPeriodicTaskArg(Invalid):
- message = _("Unexpected argument for periodic task creation: %(arg)s.")
-
-
class ConstraintNotMet(NovaException):
message = _("Constraint not met.")
code = 412
diff --git a/nova/manager.py b/nova/manager.py
index 50f26a1b4..2a151827d 100644
--- a/nova/manager.py
+++ b/nova/manager.py
@@ -53,146 +53,23 @@ This module provides Manager, a base class for managers.
"""
-import datetime
-import eventlet
from oslo.config import cfg
from nova import baserpc
from nova.db import base
-from nova import exception
from nova.openstack.common import log as logging
+from nova.openstack.common import periodic_task
from nova.openstack.common.plugin import pluginmanager
from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
-from nova.openstack.common import timeutils
from nova.scheduler import rpcapi as scheduler_rpcapi
-periodic_opts = [
- cfg.BoolOpt('run_external_periodic_tasks',
- default=True,
- help=('Some periodic tasks can be run in a separate process. '
- 'Should we run them here?')),
- ]
-
CONF = cfg.CONF
-CONF.register_opts(periodic_opts)
CONF.import_opt('host', 'nova.netconf')
LOG = logging.getLogger(__name__)
-DEFAULT_INTERVAL = 60.0
-
-
-def periodic_task(*args, **kwargs):
- """Decorator to indicate that a method is a periodic task.
-
- This decorator can be used in two ways:
-
- 1. Without arguments '@periodic_task', this will be run on every cycle
- of the periodic scheduler.
-
- 2. With arguments:
- @periodic_task(spacing=N [, run_immediately=[True|False]])
- this will be run on approximately every N seconds. If this number is
- negative the periodic task will be disabled. If the run_immediately
- argument is provided and has a value of 'True', the first run of the
- task will be shortly after task scheduler starts. If
- run_immediately is omitted or set to 'False', the first time the
- task runs will be approximately N seconds after the task scheduler
- starts.
- """
- def decorator(f):
- # Test for old style invocation
- if 'ticks_between_runs' in kwargs:
- raise exception.InvalidPeriodicTaskArg(arg='ticks_between_runs')
-
- # Control if run at all
- f._periodic_task = True
- f._periodic_external_ok = kwargs.pop('external_process_ok', False)
- if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
- f._periodic_enabled = False
- else:
- f._periodic_enabled = kwargs.pop('enabled', True)
-
- # Control frequency
- f._periodic_spacing = kwargs.pop('spacing', 0)
- f._periodic_immediate = kwargs.pop('run_immediately', False)
- if f._periodic_immediate:
- f._periodic_last_run = None
- else:
- f._periodic_last_run = timeutils.utcnow()
- return f
-
- # NOTE(sirp): The `if` is necessary to allow the decorator to be used with
- # and without parens.
- #
- # In the 'with-parens' case (with kwargs present), this function needs to
- # return a decorator function since the interpreter will invoke it like:
- #
- # periodic_task(*args, **kwargs)(f)
- #
- # In the 'without-parens' case, the original function will be passed
- # in as the first argument, like:
- #
- # periodic_task(f)
- if kwargs:
- return decorator
- else:
- return decorator(args[0])
-
-
-class ManagerMeta(type):
- def __init__(cls, names, bases, dict_):
- """Metaclass that allows us to collect decorated periodic tasks."""
- super(ManagerMeta, cls).__init__(names, bases, dict_)
-
- # NOTE(sirp): if the attribute is not present then we must be the base
- # class, so, go ahead an initialize it. If the attribute is present,
- # then we're a subclass so make a copy of it so we don't step on our
- # parent's toes.
- try:
- cls._periodic_tasks = cls._periodic_tasks[:]
- except AttributeError:
- cls._periodic_tasks = []
-
- try:
- cls._periodic_last_run = cls._periodic_last_run.copy()
- except AttributeError:
- cls._periodic_last_run = {}
-
- try:
- cls._periodic_spacing = cls._periodic_spacing.copy()
- except AttributeError:
- cls._periodic_spacing = {}
-
- for value in cls.__dict__.values():
- if getattr(value, '_periodic_task', False):
- task = value
- name = task.__name__
-
- if task._periodic_spacing < 0:
- LOG.info(_('Skipping periodic task %(task)s because '
- 'its interval is negative'),
- {'task': name})
- continue
- if not task._periodic_enabled:
- LOG.info(_('Skipping periodic task %(task)s because '
- 'it is disabled'),
- {'task': name})
- continue
-
- # A periodic spacing of zero indicates that this task should
- # be run every pass
- if task._periodic_spacing == 0:
- task._periodic_spacing = None
-
- cls._periodic_tasks.append((name, task))
- cls._periodic_spacing[name] = task._periodic_spacing
- cls._periodic_last_run[name] = task._periodic_last_run
-
-
-class Manager(base.Base):
- __metaclass__ = ManagerMeta
+class Manager(base.Base, periodic_task.PeriodicTasks):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
@@ -220,37 +97,7 @@ class Manager(base.Base):
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
- idle_for = DEFAULT_INTERVAL
- for task_name, task in self._periodic_tasks:
- full_task_name = '.'.join([self.__class__.__name__, task_name])
-
- now = timeutils.utcnow()
- spacing = self._periodic_spacing[task_name]
- last_run = self._periodic_last_run[task_name]
-
- # If a periodic task is _nearly_ due, then we'll run it early
- if spacing is not None and last_run is not None:
- due = last_run + datetime.timedelta(seconds=spacing)
- if not timeutils.is_soon(due, 0.2):
- idle_for = min(idle_for, timeutils.delta_seconds(now, due))
- continue
-
- if spacing is not None:
- idle_for = min(idle_for, spacing)
-
- LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
- self._periodic_last_run[task_name] = timeutils.utcnow()
-
- try:
- task(self, context)
- except Exception as e:
- if raise_on_error:
- raise
- LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
- locals())
- eventlet.sleep(0)
-
- return idle_for
+ return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def init_host(self):
"""Hook to do additional manager initialization when one requests
@@ -308,7 +155,7 @@ class SchedulerDependentManager(Manager):
capabilities = [capabilities]
self.last_capabilities = capabilities
- @periodic_task
+ @periodic_task.periodic_task
def publish_service_capabilities(self, context):
"""Pass data back to the scheduler.
diff --git a/nova/network/manager.py b/nova/network/manager.py
index 3c736e1ce..776df16ec 100644
--- a/nova/network/manager.py
+++ b/nova/network/manager.py
@@ -68,6 +68,7 @@ from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
+from nova.openstack.common import periodic_task
from nova.openstack.common.rpc import common as rpc_common
from nova.openstack.common import timeutils
from nova.openstack.common import uuidutils
@@ -350,7 +351,7 @@ class NetworkManager(manager.Manager):
dev = self.driver.get_dev(network)
self.driver.update_dns(ctxt, dev, network)
- @manager.periodic_task
+ @periodic_task.periodic_task
def _disassociate_stale_fixed_ips(self, context):
if self.timeout_fixed_ips:
now = timeutils.utcnow()
@@ -1408,7 +1409,7 @@ class NetworkManager(manager.Manager):
vif['net_uuid'] = network['uuid']
return vif
- @manager.periodic_task(
+ @periodic_task.periodic_task(
spacing=CONF.dns_update_periodic_interval)
def _periodic_update_dns(self, context):
"""Update local DNS entries of all networks on this host."""
diff --git a/nova/scheduler/manager.py b/nova/scheduler/manager.py
index 8fd89858d..c71b34963 100644
--- a/nova/scheduler/manager.py
+++ b/nova/scheduler/manager.py
@@ -39,6 +39,7 @@ from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
from nova.openstack.common.notifier import api as notifier
+from nova.openstack.common import periodic_task
from nova import quota
@@ -287,7 +288,7 @@ class SchedulerManager(manager.Manager):
return {'resource': resource, 'usage': usage}
- @manager.periodic_task
+ @periodic_task.periodic_task
def _expire_reservations(self, context):
QUOTAS.expire(context)
diff --git a/nova/tests/test_periodic_tasks.py b/nova/tests/test_periodic_tasks.py
deleted file mode 100644
index a4f594b3a..000000000
--- a/nova/tests/test_periodic_tasks.py
+++ /dev/null
@@ -1,182 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2012 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import datetime
-
-from testtools import matchers
-
-from nova import manager
-from nova.openstack.common import timeutils
-from nova import test
-
-
-class ManagerMetaTestCase(test.TestCase):
- """Tests for the meta class which manages the creation of periodic tasks.
- """
-
- def test_meta(self):
- class Manager(object):
- __metaclass__ = manager.ManagerMeta
-
- @manager.periodic_task
- def foo(self):
- return 'foo'
-
- @manager.periodic_task(spacing=4)
- def bar(self):
- return 'bar'
-
- @manager.periodic_task(enabled=False)
- def baz(self):
- return 'baz'
-
- m = Manager()
- self.assertThat(m._periodic_tasks, matchers.HasLength(2))
- self.assertEqual(None, m._periodic_spacing['foo'])
- self.assertEqual(4, m._periodic_spacing['bar'])
- self.assertThat(
- m._periodic_spacing, matchers.Not(matchers.Contains('baz')))
-
-
-class Manager(test.TestCase):
- """Tests the periodic tasks portion of the manager class."""
-
- def test_periodic_tasks_with_idle(self):
- class Manager(manager.Manager):
- @manager.periodic_task(spacing=200)
- def bar(self):
- return 'bar'
-
- m = Manager()
- self.assertThat(m._periodic_tasks, matchers.HasLength(1))
- self.assertEqual(200, m._periodic_spacing['bar'])
-
- # Now a single pass of the periodic tasks
- idle = m.periodic_tasks(None)
- self.assertAlmostEqual(60, idle, 1)
-
- def test_periodic_tasks_constant(self):
- class Manager(manager.Manager):
- @manager.periodic_task(spacing=0)
- def bar(self):
- return 'bar'
-
- m = Manager()
- idle = m.periodic_tasks(None)
- self.assertAlmostEqual(60, idle, 1)
-
- def test_periodic_tasks_idle_calculation(self):
- fake_time = datetime.datetime(3000, 1, 1)
- timeutils.set_time_override(fake_time)
-
- class Manager(manager.Manager):
- @manager.periodic_task(spacing=10)
- def bar(self, context):
- return 'bar'
-
- m = Manager()
-
- # Ensure initial values are correct
- self.assertEqual(1, len(m._periodic_tasks))
- task_name, task = m._periodic_tasks[0]
-
- # Test task values
- self.assertEqual('bar', task_name)
- self.assertEqual(10, task._periodic_spacing)
- self.assertEqual(True, task._periodic_enabled)
- self.assertEqual(False, task._periodic_external_ok)
- self.assertEqual(False, task._periodic_immediate)
- self.assertNotEqual(None, task._periodic_last_run)
-
- # Test the manager's representation of those values
- self.assertEqual(10, m._periodic_spacing[task_name])
- self.assertNotEqual(None, m._periodic_last_run[task_name])
-
- timeutils.advance_time_delta(datetime.timedelta(seconds=5))
- m.periodic_tasks(None)
-
- timeutils.advance_time_delta(datetime.timedelta(seconds=5))
- idle = m.periodic_tasks(None)
- self.assertAlmostEqual(10, idle, 1)
-
- def test_periodic_tasks_immediate_runs_now(self):
- fake_time = datetime.datetime(3000, 1, 1)
- timeutils.set_time_override(fake_time)
-
- class Manager(manager.Manager):
- @manager.periodic_task(spacing=10, run_immediately=True)
- def bar(self, context):
- return 'bar'
-
- m = Manager()
-
- # Ensure initial values are correct
- self.assertEqual(1, len(m._periodic_tasks))
- task_name, task = m._periodic_tasks[0]
-
- # Test task values
- self.assertEqual('bar', task_name)
- self.assertEqual(10, task._periodic_spacing)
- self.assertEqual(True, task._periodic_enabled)
- self.assertEqual(False, task._periodic_external_ok)
- self.assertEqual(True, task._periodic_immediate)
- self.assertEqual(None, task._periodic_last_run)
-
- # Test the manager's representation of those values
- self.assertEqual(10, m._periodic_spacing[task_name])
- self.assertEqual(None, m._periodic_last_run[task_name])
-
- idle = m.periodic_tasks(None)
- self.assertEqual(datetime.datetime(3000, 1, 1, 0, 0),
- m._periodic_last_run[task_name])
- self.assertAlmostEqual(10, idle, 1)
-
- timeutils.advance_time_delta(datetime.timedelta(seconds=5))
- idle = m.periodic_tasks(None)
- self.assertAlmostEqual(5, idle, 1)
-
- def test_periodic_tasks_disabled(self):
- class Manager(manager.Manager):
- @manager.periodic_task(spacing=-1)
- def bar(self):
- return 'bar'
-
- m = Manager()
- idle = m.periodic_tasks(None)
- self.assertAlmostEqual(60, idle, 1)
-
- def test_external_running_here(self):
- self.flags(run_external_periodic_tasks=True)
-
- class Manager(manager.Manager):
- @manager.periodic_task(spacing=200, external_process_ok=True)
- def bar(self):
- return 'bar'
-
- m = Manager()
- self.assertThat(m._periodic_tasks, matchers.HasLength(1))
-
- def test_external_running_elsewhere(self):
- self.flags(run_external_periodic_tasks=False)
-
- class Manager(manager.Manager):
- @manager.periodic_task(spacing=200, external_process_ok=True)
- def bar(self):
- return 'bar'
-
- m = Manager()
- self.assertEqual([], m._periodic_tasks)