diff options
| author | Jenkins <jenkins@review.openstack.org> | 2011-12-07 18:29:57 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2011-12-07 18:29:57 +0000 |
| commit | 8a8dd22aea07dcbbfa80ea5519dcb7bfb92b0217 (patch) | |
| tree | 85c6042044fda83c3b7954a6cf5fab3c69615e22 | |
| parent | 42b571181b5f2793e03a4958071ef4c8067578e5 (diff) | |
| parent | 2b42968a53c70f2fd759a22e8e5aeadc18ae65f6 (diff) | |
Merge "Refactors periodic tasks to use a decorator."
| -rw-r--r-- | nova/compute/manager.py | 118 | ||||
| -rw-r--r-- | nova/manager.py | 91 | ||||
| -rw-r--r-- | nova/network/manager.py | 5 | ||||
| -rw-r--r-- | nova/scheduler/manager.py | 3 | ||||
| -rw-r--r-- | nova/scheduler/zone_manager.py | 2 | ||||
| -rw-r--r-- | nova/service.py | 5 | ||||
| -rw-r--r-- | nova/tests/test_compute.py | 4 | ||||
| -rw-r--r-- | nova/virt/xenapi/vm_utils.py | 12 | ||||
| -rw-r--r-- | nova/virt/xenapi/vmops.py | 32 | ||||
| -rw-r--r-- | nova/volume/manager.py | 18 |
10 files changed, 169 insertions, 121 deletions
diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 625d2c94b..be48e253e 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -1806,77 +1806,39 @@ class ComputeManager(manager.SchedulerDependentManager): self.driver.destroy(instance_ref, network_info, block_device_info, True) - def periodic_tasks(self, context=None): - """Tasks to be run at a periodic interval.""" - error_list = super(ComputeManager, self).periodic_tasks(context) - if error_list is None: - error_list = [] + @manager.periodic_task + def _poll_rebooting_instances(self, context): + if FLAGS.reboot_timeout > 0: + self.driver.poll_rebooting_instances(FLAGS.reboot_timeout) + + @manager.periodic_task + def _poll_rescued_instances(self, context): + if FLAGS.rescue_timeout > 0: + self.driver.poll_rescued_instances(FLAGS.rescue_timeout) + + @manager.periodic_task + def _poll_unconfirmed_resizes(self, context): + if FLAGS.resize_confirm_window > 0: + self.driver.poll_unconfirmed_resizes(FLAGS.resize_confirm_window) + + @manager.periodic_task + def _poll_bandwidth_usage(self, context, start_time=None, stop_time=None): + if not start_time: + start_time = utils.current_audit_period()[1] - try: - if FLAGS.reboot_timeout > 0: - self.driver.poll_rebooting_instances(FLAGS.reboot_timeout) - except Exception as ex: - LOG.warning(_("Error during poll_rebooting_instances: %s"), - unicode(ex)) - error_list.append(ex) - - try: - if FLAGS.rescue_timeout > 0: - self.driver.poll_rescued_instances(FLAGS.rescue_timeout) - except Exception as ex: - LOG.warning(_("Error during poll_rescued_instances: %s"), - unicode(ex)) - error_list.append(ex) - - try: - if FLAGS.resize_confirm_window > 0: - self.driver.poll_unconfirmed_resizes( - FLAGS.resize_confirm_window) - except Exception as ex: - LOG.warning(_("Error during poll_unconfirmed_resizes: %s"), - unicode(ex)) - error_list.append(ex) - - try: - self._report_driver_status() - except Exception as ex: - LOG.warning(_("Error during report_driver_status(): %s"), - unicode(ex)) - error_list.append(ex) - - try: - self._sync_power_states(context) - except Exception as ex: - LOG.warning(_("Error during power_state sync: %s"), unicode(ex)) - error_list.append(ex) - - try: - self._reclaim_queued_deletes(context) - except Exception as ex: - LOG.warning(_("Error during reclamation of queued deletes: %s"), - unicode(ex)) - error_list.append(ex) - try: - start = utils.current_audit_period()[1] - self._update_bandwidth_usage(context, start) - except NotImplementedError: - # Not all hypervisors have bandwidth polling implemented yet. - # If they don't id doesn't break anything, they just don't get the - # info in the usage events. (mdragon) - pass - except Exception as ex: - LOG.warning(_("Error updating bandwidth usage: %s"), - unicode(ex)) - error_list.append(ex) - - return error_list - - def _update_bandwidth_usage(self, context, start_time, stop_time=None): curr_time = time.time() if curr_time - self._last_bw_usage_poll > FLAGS.bandwith_poll_interval: self._last_bw_usage_poll = curr_time LOG.info(_("Updating bandwidth usage cache")) - bw_usage = self.driver.get_all_bw_usage(start_time, stop_time) + + try: + bw_usage = self.driver.get_all_bw_usage(start_time, stop_time) + except NotImplementedError: + # NOTE(mdragon): Not all hypervisors have bandwidth polling + # implemented yet. If they don't it doesn't break anything, + # they just don't get the info in the usage events. + return + for usage in bw_usage: vif = usage['virtual_interface'] self.db.bw_usage_update(context, @@ -1885,7 +1847,8 @@ class ComputeManager(manager.SchedulerDependentManager): start_time, usage['bw_in'], usage['bw_out']) - def _report_driver_status(self): + @manager.periodic_task + def _report_driver_status(self, context): curr_time = time.time() if curr_time - self._last_host_check > FLAGS.host_state_interval: self._last_host_check = curr_time @@ -1895,6 +1858,7 @@ class ComputeManager(manager.SchedulerDependentManager): self.update_service_capabilities( self.driver.get_host_stats(refresh=True)) + @manager.periodic_task def _sync_power_states(self, context): """Align power states between the database and the hypervisor. @@ -1933,16 +1897,22 @@ class ComputeManager(manager.SchedulerDependentManager): db_instance["id"], power_state=vm_power_state) + @manager.periodic_task def _reclaim_queued_deletes(self, context): """Reclaim instances that are queued for deletion.""" + if FLAGS.reclaim_instance_interval <= 0: + LOG.debug(_("FLAGS.reclaim_instance_interval <= 0, skipping...")) + return instances = self.db.instance_get_all_by_host(context, self.host) - - queue_time = datetime.timedelta( - seconds=FLAGS.reclaim_instance_interval) - curtime = utils.utcnow() for instance in instances: - if instance['vm_state'] == vm_states.SOFT_DELETE and \ - (curtime - instance['deleted_at']) >= queue_time: - LOG.info('Deleting %s' % instance['name']) + old_enough = (not instance.deleted_at or utils.is_older_than( + instance.deleted_at, + FLAGS.reclaim_instance_interval)) + soft_deleted = instance.vm_state == vm_states.SOFT_DELETE + + if soft_deleted and old_enough: + instance_id = instance.id + LOG.info(_("Reclaiming deleted instance %(instance_id)s"), + locals()) self._delete_instance(context, instance) diff --git a/nova/manager.py b/nova/manager.py index 34338ac04..b153f1f42 100644 --- a/nova/manager.py +++ b/nova/manager.py @@ -66,16 +66,98 @@ FLAGS = flags.FLAGS LOG = logging.getLogger('nova.manager') +def periodic_task(*args, **kwargs): + """Decorator to indicate that a method is a periodic task. + + This decorator can be used in two ways: + + 1. Without arguments '@periodic_task', this will be run on every tick + of the periodic scheduler. + + 2. With arguments, @periodic_task(ticks_between_runs=N), this will be + run on every N ticks of the periodic scheduler. + """ + def decorator(f): + f._periodic_task = True + f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0) + return f + + # NOTE(sirp): The `if` is necessary to allow the decorator to be used with + # and without parens. + # + # In the 'with-parens' case (with kwargs present), this function needs to + # return a decorator function since the interpreter will invoke it like: + # + # periodic_task(*args, **kwargs)(f) + # + # In the 'without-parens' case, the original function will be passed + # in as the first argument, like: + # + # periodic_task(f) + if kwargs: + return decorator + else: + return decorator(args[0]) + + +class ManagerMeta(type): + def __init__(cls, names, bases, dict_): + """Metaclass that allows us to collect decorated periodic tasks.""" + super(ManagerMeta, cls).__init__(names, bases, dict_) + + # NOTE(sirp): if the attribute is not present then we must be the base + # class, so, go ahead an initialize it. If the attribute is present, + # then we're a subclass so make a copy of it so we don't step on our + # parent's toes. + try: + cls._periodic_tasks = cls._periodic_tasks[:] + except AttributeError: + cls._periodic_tasks = [] + + try: + cls._ticks_to_skip = cls._ticks_to_skip.copy() + except AttributeError: + cls._ticks_to_skip = {} + + for value in cls.__dict__.values(): + if getattr(value, '_periodic_task', False): + task = value + name = task.__name__ + cls._periodic_tasks.append((name, task)) + cls._ticks_to_skip[name] = task._ticks_between_runs + + class Manager(base.Base): + __metaclass__ = ManagerMeta + def __init__(self, host=None, db_driver=None): if not host: host = FLAGS.host self.host = host super(Manager, self).__init__(db_driver) - def periodic_tasks(self, context=None): + def periodic_tasks(self, context, raise_on_error=False): """Tasks to be run at a periodic interval.""" - pass + for task_name, task in self._periodic_tasks: + full_task_name = '.'.join([self.__class__.__name__, task_name]) + + ticks_to_skip = self._ticks_to_skip[task_name] + if ticks_to_skip > 0: + LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s" + " ticks left until next run"), locals()) + self._ticks_to_skip[task_name] -= 1 + continue + + self._ticks_to_skip[task_name] = task._ticks_between_runs + LOG.debug(_("Running periodic task %(full_task_name)s"), locals()) + + try: + task(self, context) + except Exception as e: + if raise_on_error: + raise + LOG.exception(_("Error during %(full_task_name)s: %(e)s"), + locals()) def init_host(self): """Handle initialization if this is a standalone service. @@ -105,11 +187,10 @@ class SchedulerDependentManager(Manager): """Remember these capabilities to send on next periodic update.""" self.last_capabilities = capabilities - def periodic_tasks(self, context=None): + @periodic_task + def _publish_service_capabilities(self, context): """Pass data back to the scheduler at a periodic interval.""" if self.last_capabilities: LOG.debug(_('Notifying Schedulers of capabilities ...')) api.update_service_capabilities(context, self.service_name, self.host, self.last_capabilities) - - super(SchedulerDependentManager, self).periodic_tasks(context) diff --git a/nova/network/manager.py b/nova/network/manager.py index 20f42dc01..b3e2e55f9 100644 --- a/nova/network/manager.py +++ b/nova/network/manager.py @@ -506,9 +506,8 @@ class NetworkManager(manager.SchedulerDependentManager): for network in self.db.network_get_all_by_host(ctxt, self.host): self._setup_network(ctxt, network) - def periodic_tasks(self, context=None): - """Tasks to be run at a periodic interval.""" - super(NetworkManager, self).periodic_tasks(context) + @manager.periodic_task + def _disassociate_stale_fixed_ips(self, context): if self.timeout_fixed_ips: now = utils.utcnow() timeout = FLAGS.fixed_ip_disassociate_timeout diff --git a/nova/scheduler/manager.py b/nova/scheduler/manager.py index 9d4d03b13..5aadc6e82 100644 --- a/nova/scheduler/manager.py +++ b/nova/scheduler/manager.py @@ -54,7 +54,8 @@ class SchedulerManager(manager.Manager): """Converts all method calls to use the schedule method""" return functools.partial(self._schedule, key) - def periodic_tasks(self, context=None): + @manager.periodic_task + def _poll_child_zones(self, context): """Poll child zones periodically to get status.""" self.zone_manager.ping(context) diff --git a/nova/scheduler/zone_manager.py b/nova/scheduler/zone_manager.py index 0410cfb1d..543e8567e 100644 --- a/nova/scheduler/zone_manager.py +++ b/nova/scheduler/zone_manager.py @@ -318,7 +318,7 @@ class ZoneManager(object): """Try to connect to each child zone and get update.""" self.green_pool.imap(_poll_zone, self.zone_states.values()) - def ping(self, context=None): + def ping(self, context): """Ping should be called periodically to update zone status.""" diff = utils.utcnow() - self.last_zone_db_check if diff.seconds >= FLAGS.zone_db_check_interval: diff --git a/nova/service.py b/nova/service.py index fe6bf8e59..c56973907 100644 --- a/nova/service.py +++ b/nova/service.py @@ -252,9 +252,10 @@ class Service(object): except Exception: pass - def periodic_tasks(self): + def periodic_tasks(self, raise_on_error=False): """Tasks to be run at a periodic interval.""" - self.manager.periodic_tasks(context.get_admin_context()) + ctxt = context.get_admin_context() + self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error) def report_state(self): """Update the state of this service in the datastore.""" diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py index 7a3fceef1..6c4540094 100644 --- a/nova/tests/test_compute.py +++ b/nova/tests/test_compute.py @@ -1054,8 +1054,8 @@ class ComputeTestCase(BaseTestCase): self.compute.driver.test_remove_vm(instance_name) # Force the compute manager to do its periodic poll - error_list = self.compute.periodic_tasks(context.get_admin_context()) - self.assertFalse(error_list) + ctxt = context.get_admin_context() + self.compute.periodic_tasks(ctxt, raise_on_error=True) instances = db.instance_get_all(context.get_admin_context()) LOG.info(_("After force-killing instances: %s"), instances) diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py index ca06a9450..e467b8cb4 100644 --- a/nova/virt/xenapi/vm_utils.py +++ b/nova/virt/xenapi/vm_utils.py @@ -786,6 +786,18 @@ class VMHelper(HelperBase): session.call_xenapi("VM.set_name_label", vm_ref, name_label) @classmethod + def list_vms(cls, session): + vm_refs = session.call_xenapi("VM.get_all") + for vm_ref in vm_refs: + vm_rec = session.call_xenapi("VM.get_record", vm_ref) + if vm_rec["is_a_template"]: + continue + elif vm_rec["is_control_domain"]: + continue + else: + yield vm_ref, vm_rec + + @classmethod def lookup(cls, session, name_label): """Look the instance up and return it if available""" vm_refs = session.call_xenapi("VM.get_by_name_label", name_label) diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index 1a5cd8d33..375af567d 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -100,28 +100,26 @@ class VMOps(object): """List VM instances.""" # TODO(justinsb): Should we just always use the details method? # Seems to be the same number of API calls.. - vm_refs = [] - for vm_ref in self._session.call_xenapi("VM.get_all"): - vm_rec = self._session.call_xenapi("VM.get_record", vm_ref) - if not vm_rec["is_a_template"] and not vm_rec["is_control_domain"]: - vm_refs.append(vm_rec["name_label"]) - return vm_refs + name_labels = [] + for vm_ref, vm_rec in VMHelper.list_vms(self._session): + name_labels.append(vm_rec["name_label"]) + + return name_labels def list_instances_detail(self): """List VM instances, returning InstanceInfo objects.""" - instance_infos = [] - for vm_ref in self._session.call_xenapi("VM.get_all"): - vm_rec = self._session.call_xenapi("VM.get_record", vm_ref) - if not vm_rec["is_a_template"] and not vm_rec["is_control_domain"]: - name = vm_rec["name_label"] + details = [] + for vm_ref, vm_rec in VMHelper.list_vms(self._session): + name = vm_rec["name_label"] + + # TODO(justinsb): This a roundabout way to map the state + openstack_format = VMHelper.compile_info(vm_rec) + state = openstack_format['state'] - # TODO(justinsb): This a roundabout way to map the state - openstack_format = VMHelper.compile_info(vm_rec) - state = openstack_format['state'] + instance_info = driver.InstanceInfo(name, state) + details.append(instance_info) - instance_info = driver.InstanceInfo(name, state) - instance_infos.append(instance_info) - return instance_infos + return details def confirm_migration(self, migration, instance, network_info): name_label = self._get_orig_vm_name_label(instance) diff --git a/nova/volume/manager.py b/nova/volume/manager.py index 3e1eac92e..5332f9d1e 100644 --- a/nova/volume/manager.py +++ b/nova/volume/manager.py @@ -279,21 +279,6 @@ class VolumeManager(manager.SchedulerDependentManager): for volume in instance_ref['volumes']: self.driver.check_for_export(context, volume['id']) - def periodic_tasks(self, context=None): - """Tasks to be run at a periodic interval.""" - - error_list = [] - try: - self._report_driver_status() - except Exception as ex: - LOG.warning(_("Error during report_driver_status(): %s"), - unicode(ex)) - error_list.append(ex) - - super(VolumeManager, self).periodic_tasks(context) - - return error_list - def _volume_stats_changed(self, stat1, stat2): if FLAGS.volume_force_update_capabilities: return True @@ -304,7 +289,8 @@ class VolumeManager(manager.SchedulerDependentManager): return True return False - def _report_driver_status(self): + @manager.periodic_task + def _report_driver_status(self, context): volume_stats = self.driver.get_volume_stats(refresh=True) if volume_stats: LOG.info(_("Checking volume capabilities")) |
