| author | Jenkins <jenkins@review.openstack.org> | 2012-09-20 07:48:34 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2012-09-20 07:48:34 +0000 |
| commit | 59ad151a296c22d8be053e408d9377acebb5ad15 (patch) | |
| tree | 16ed60798e43ba98c93516f05bac2ccff25b901b /nova/compute | |
| parent | 62435929e287495626a515975221b1d506999ab0 (diff) | |
| parent | 8e851409f3a8a345ec954a880c81232fbf9e27b4 (diff) | |
Merge "Fix bugs in resource tracker and cleanup"
Diffstat (limited to 'nova/compute')
| -rw-r--r-- | nova/compute/manager.py | 18 |
| -rw-r--r-- | nova/compute/resource_tracker.py | 463 |
2 files changed, 289 insertions, 192 deletions
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 7ee84abd8..42a2fd447 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -251,8 +251,7 @@ class ComputeManager(manager.SchedulerDependentManager):
         (old_ref, instance_ref) = self.db.instance_update_and_get_original(
                 context, instance_uuid, kwargs)
 
-        self.resource_tracker.update_load_stats_for_instance(context,
-                instance_ref)
+        self.resource_tracker.update_usage(context, instance_ref)
 
         notifications.send_update(context, old_ref, instance_ref)
         return instance_ref
@@ -480,10 +479,14 @@ class ComputeManager(manager.SchedulerDependentManager):
             network_info = self._allocate_network(context, instance,
                     requested_networks)
             try:
-                memory_mb_limit = filter_properties.get('memory_mb_limit',
-                        None)
-                with self.resource_tracker.instance_resource_claim(context,
-                        instance, memory_mb_limit=memory_mb_limit):
+                limits = filter_properties.get('limits', {})
+                with self.resource_tracker.resource_claim(context, instance,
+                        limits):
+                    # Resources are available to build this instance here,
+                    # mark it as belonging to this host:
+                    self._instance_update(context, instance['uuid'],
+                            host=self.host, launched_on=self.host)
+
                     block_device_info = self._prep_block_device(context,
                             instance)
                     instance = self._spawn(context, instance, image_meta,
@@ -684,7 +687,6 @@ class ComputeManager(manager.SchedulerDependentManager):
         LOG.audit(_('Starting instance...'), context=context,
                   instance=instance)
         self._instance_update(context, instance['uuid'],
-                              host=self.host, launched_on=self.host,
                               vm_state=vm_states.BUILDING,
                               task_state=None,
                               expected_task_state=(task_states.SCHEDULING,
@@ -889,8 +891,6 @@ class ComputeManager(manager.SchedulerDependentManager):
         self.db.instance_destroy(context, instance_uuid)
         system_meta = self.db.instance_system_metadata_get(context,
                 instance_uuid)
-        # mark resources free
-        self.resource_tracker.free_resources(context)
 
         self._notify_about_instance_usage(context, instance, "delete.end",
                 system_metadata=system_meta)
diff --git a/nova/compute/resource_tracker.py b/nova/compute/resource_tracker.py
index cc088655b..eb0d302b0 100644
--- a/nova/compute/resource_tracker.py
+++ b/nova/compute/resource_tracker.py
@@ -19,17 +19,22 @@ scheduler with useful information about availability through the ComputeNode
 model.
 """
 
-from nova import context
+from nova.compute import vm_states
 from nova import db
 from nova import exception
 from nova import flags
 from nova.openstack.common import cfg
 from nova.openstack.common import importutils
+from nova.openstack.common import jsonutils
 from nova.openstack.common import log as logging
 from nova.openstack.common import timeutils
 from nova import utils
 
 resource_tracker_opts = [
+    cfg.IntOpt('reserved_host_disk_mb', default=0,
+               help='Amount of disk in MB to reserve for the host'),
+    cfg.IntOpt('reserved_host_memory_mb', default=512,
+               help='Amount of memory in MB to reserve for the host'),
     cfg.IntOpt('claim_timeout_seconds', default=600,
                help='How long, in seconds, before a resource claim times out'),
     cfg.StrOpt('compute_stats_class',
@@ -52,46 +57,36 @@ class Claim(object):
     correct decisions with respect to host selection.
     """
 
-    def __init__(self, claim_id, memory_mb, disk_gb, timeout, *args, **kwargs):
-        self.claim_id = claim_id
-        self.memory_mb = memory_mb
-        self.disk_gb = disk_gb
+    def __init__(self, instance, timeout):
+        self.instance = jsonutils.to_primitive(instance)
         self.timeout = timeout
         self.expire_ts = timeutils.utcnow_ts() + timeout
 
-    def apply_claim(self, resources):
-        """Adjust the resources required from available resources.
-
-        :param resources: Should be a dictionary-like object that
-                          has fields like a compute node
-        """
-        return self._apply(resources)
-
-    def undo_claim(self, resources):
-        return self._apply(resources, sign=-1)
-
     def is_expired(self):
         """Determine if this adjustment is old enough that we can assume it's
         no longer needed.
         """
         return timeutils.utcnow_ts() > self.expire_ts
 
-    def _apply(self, resources, sign=1):
-        values = {}
-        values['memory_mb_used'] = (resources['memory_mb_used'] + sign *
-                self.memory_mb)
-        values['free_ram_mb'] = (resources['free_ram_mb'] - sign *
-                self.memory_mb)
-        values['local_gb_used'] = (resources['local_gb_used'] + sign *
-                self.disk_gb)
-        values['free_disk_gb'] = (resources['free_disk_gb'] - sign *
-                self.disk_gb)
+    @property
+    def claim_id(self):
+        return self.instance['uuid']
 
-        return values
+    @property
+    def disk_gb(self):
+        return self.instance['root_gb'] + self.instance['ephemeral_gb']
+
+    @property
+    def memory_mb(self):
+        return self.instance['memory_mb']
+
+    @property
+    def vcpus(self):
+        return self.instance['vcpus']
 
     def __str__(self):
-        return "[Claim %d: %d MB memory, %d GB disk]" % (self.claim_id,
-                self.memory_mb, self.disk_gb)
+        return "[Claim %s: %d MB memory, %d GB disk, %d VCPUS]" % \
+                (self.claim_id, self.memory_mb, self.disk_gb, self.vcpus)
 
 
 class ResourceContextManager(object):
@@ -127,46 +122,25 @@ class ResourceTracker(object):
         self.next_claim_id = 1
         self.claims = {}
         self.stats = importutils.import_object(FLAGS.compute_stats_class)
+        self.tracked_instances = {}
 
-    def resource_claim(self, context, *args, **kwargs):
-        claim = self.begin_resource_claim(context, *args, **kwargs)
-        return ResourceContextManager(context, claim, self)
-
-    def instance_resource_claim(self, context, instance_ref, *args, **kwargs):
-        claim = self.begin_instance_resource_claim(context, instance_ref,
-                *args, **kwargs)
+    def resource_claim(self, context, instance_ref, limits=None):
+        claim = self.begin_resource_claim(context, instance_ref, limits)
         return ResourceContextManager(context, claim, self)
 
     @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
-    def begin_instance_resource_claim(self, context, instance_ref, *args,
-            **kwargs):
-        """Method to begin a resource claim for a new instance."""
-        memory_mb = instance_ref['memory_mb']
-        disk_gb = instance_ref['root_gb'] + instance_ref['ephemeral_gb']
-
-        claim = self._do_begin_resource_claim(context, memory_mb, disk_gb,
-                *args, **kwargs)
-        if claim:
-            # also update load stats related to new instances firing up
-            values = self._create_load_stats(context, instance_ref)
-            self.compute_node = self._update(context, values)
-        return claim
-
-    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
-    def begin_resource_claim(self, context, memory_mb, disk_gb,
-            memory_mb_limit=None, timeout=None, *args, **kwargs):
+    def begin_resource_claim(self, context, instance_ref, limits=None,
+            timeout=None):
         """Indicate that some resources are needed for an upcoming compute
-        host operation.
+        instance build operation.
 
-        This should be called any time the compute node is about to perform
-        an operation that will consume resources.
+        This should be called before the compute node is about to perform
+        an instance build operation that will consume additional resources.
 
-        :param memory_mb: security context
-        :param memory_mb: Memory in MB to be claimed
-        :param root_gb: Disk in GB to be claimed
-        :param memory_mb_limit: Memory in MB that is the maximum to allocate on
-                this node.  May exceed installed physical memory if
-                oversubscription is the desired behavior.
+        :param context: security context
+        :param instance_ref: instance to reserve resources for
+        :param limits: Dict of oversubscription limits for memory, disk,
+                and CPUs.
         :param timeout: How long, in seconds, the operation that requires
                 these resources should take to actually allocate what
                 it needs from the hypervisor.  If the timeout is
@@ -177,71 +151,142 @@ class ResourceTracker(object):
                 compute operation is finished.
 
        Returns None if the claim failed.
        """
-
-        return self._do_begin_resource_claim(context, memory_mb, disk_gb,
-                memory_mb_limit, timeout, *args, **kwargs)
-
-    def _do_begin_resource_claim(self, context, memory_mb, disk_gb,
-            memory_mb_limit=None, timeout=None, *args, **kwargs):
-
         if self.disabled:
             return
 
+        if not limits:
+            limits = {}
+
         if not timeout:
             timeout = FLAGS.claim_timeout_seconds
 
-        memory_mb = abs(memory_mb)
-        disk_gb = abs(disk_gb)
+        # If an individual limit is None, the resource will be considered
+        # unlimited:
+        memory_mb_limit = limits.get('memory_mb')
+        disk_gb_limit = limits.get('disk_gb')
+        vcpu_limit = limits.get('vcpu')
+
+        memory_mb = instance_ref['memory_mb']
+        disk_gb = instance_ref['root_gb'] + instance_ref['ephemeral_gb']
+        vcpus = instance_ref['vcpus']
 
         msg = _("Attempting claim: memory %(memory_mb)d MB, disk %(disk_gb)d "
-                "GB, mem limit %(memory_mb_limit)s") % locals()
+                "GB, VCPUs %(vcpus)d") % locals()
         LOG.audit(msg)
 
-        if not memory_mb_limit:
-            # default to total memory:
-            memory_mb_limit = self.compute_node['memory_mb']
+        # Test for resources:
+        if not self._can_claim_memory(memory_mb, memory_mb_limit):
+            return
 
-        free_ram_mb = memory_mb_limit - self.compute_node['memory_mb_used']
+        if not self._can_claim_disk(disk_gb, disk_gb_limit):
+            return
 
+        if not self._can_claim_cpu(vcpus, vcpu_limit):
+            return
+
+        # keep track of this claim until we know whether the compute operation
+        # was successful/completed:
+        claim = Claim(instance_ref, timeout)
+        self.claims[claim.claim_id] = claim
+
+        # Mark resources in-use and update stats
+        self._update_usage_from_instance(self.compute_node, instance_ref)
+
+        # persist changes to the compute node:
+        self._update(context, self.compute_node)
+
+        return claim
+
+    def _can_claim_memory(self, memory_mb, memory_mb_limit):
+        """Test if memory needed for a claim can be safely allocated"""
         # Installed memory and usage info:
         msg = _("Total memory: %(total_mem)d MB, used: %(used_mem)d MB, free: "
-                "%(free_mem)d") % dict(
+                "%(free_mem)d MB") % dict(
                     total_mem=self.compute_node['memory_mb'],
                     used_mem=self.compute_node['memory_mb_used'],
                     free_mem=self.compute_node['local_gb_used'])
         LOG.audit(msg)
 
+        if memory_mb_limit is None:
+            # treat memory as unlimited:
+            LOG.audit(_("Memory limit not specified, defaulting to unlimited"))
+            return True
+
+        free_ram_mb = memory_mb_limit - self.compute_node['memory_mb_used']
+
         # Oversubscribed memory policy info:
-        msg = _("Limit: %(memory_mb_limit)d MB, free: %(free_ram_mb)d") % \
-                locals()
+        msg = _("Memory limit: %(memory_mb_limit)d MB, free: "
+                "%(free_ram_mb)d MB") % locals()
         LOG.audit(msg)
 
-        if memory_mb > free_ram_mb:
+        can_claim_mem = memory_mb <= free_ram_mb
+
+        if not can_claim_mem:
             msg = _("Unable to claim resources.  Free memory %(free_ram_mb)d "
                     "MB < requested memory %(memory_mb)d MB") % locals()
             LOG.info(msg)
-            return None
 
-        if disk_gb > self.compute_node['free_disk_gb']:
+        return can_claim_mem
+
+    def _can_claim_disk(self, disk_gb, disk_gb_limit):
+        """Test if disk space needed can be safely allocated"""
+        # Installed disk and usage info:
+        msg = _("Total disk: %(total_disk)d GB, used: %(used_disk)d GB, free: "
+                "%(free_disk)d GB") % dict(
+                    total_disk=self.compute_node['local_gb'],
+                    used_disk=self.compute_node['local_gb_used'],
+                    free_disk=self.compute_node['free_disk_gb'])
+        LOG.audit(msg)
+
+        if disk_gb_limit is None:
+            # treat disk as unlimited:
+            LOG.audit(_("Disk limit not specified, defaulting to unlimited"))
+            return True
+
+        free_disk_gb = disk_gb_limit - self.compute_node['local_gb_used']
+
+        # Oversubscribed disk policy info:
+        msg = _("Disk limit: %(disk_gb_limit)d GB, free: "
+                "%(free_disk_gb)d GB") % locals()
+        LOG.audit(msg)
+
+        can_claim_disk = disk_gb <= free_disk_gb
+        if not can_claim_disk:
             msg = _("Unable to claim resources.  Free disk %(free_disk_gb)d GB"
                     " < requested disk %(disk_gb)d GB") % dict(
                            free_disk_gb=self.compute_node['free_disk_gb'],
                            disk_gb=disk_gb)
             LOG.info(msg)
-            return None
 
-        claim_id = self._get_next_id()
-        c = Claim(claim_id, memory_mb, disk_gb, timeout, *args, **kwargs)
+        return can_claim_disk
 
-        # adjust compute node usage values and save so scheduler will see it:
-        values = c.apply_claim(self.compute_node)
-        self.compute_node = self._update(context, values)
+    def _can_claim_cpu(self, vcpus, vcpu_limit):
+        """Test if CPUs can be safely allocated according to given policy."""
 
-        # keep track of this claim until we know whether the compute operation
-        # was successful/completed:
-        self.claims[claim_id] = c
-        return c
+        msg = _("Total VCPUs: %(total_vcpus)d, used: %(used_vcpus)d") \
+                % dict(total_vcpus=self.compute_node['vcpus'],
+                       used_vcpus=self.compute_node['vcpus_used'])
+        LOG.audit(msg)
 
+        if vcpu_limit is None:
+            # treat cpu as unlimited:
+            LOG.audit(_("VCPU limit not specified, defaulting to unlimited"))
+            return True
+
+        # Oversubscribed disk policy info:
+        msg = _("CPU limit: %(vcpu_limit)d") % locals()
+        LOG.audit(msg)
+
+        free_vcpus = vcpu_limit - self.compute_node['vcpus_used']
+        can_claim_cpu = vcpus <= free_vcpus
+
+        if not can_claim_cpu:
+            msg = _("Unable to claim resources.  Free CPU %(free_vcpus)d < "
                    "requested CPU %(vcpus)d") % locals()
+            LOG.info(msg)
+
+        return can_claim_cpu
+
+    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
     def finish_resource_claim(self, claim):
         """Indicate that the compute operation that previously claimed the
         resources identified by 'claim' has now completed and the resources
@@ -260,7 +305,7 @@ class ResourceTracker(object):
         if self.claims.pop(claim.claim_id, None):
             LOG.info(_("Finishing claim: %s") % claim)
         else:
-            LOG.info(_("Can't find claim %d.  It may have been 'finished' "
+            LOG.info(_("Can't find claim %s.  It may have been 'finished' "
                     "twice, or it has already timed out."), claim.claim_id)
 
     @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
@@ -278,27 +323,37 @@ class ResourceTracker(object):
         # un-claim the resources:
         if self.claims.pop(claim.claim_id, None):
             LOG.info(_("Aborting claim: %s") % claim)
-            values = claim.undo_claim(self.compute_node)
-            self.compute_node = self._update(context, values)
+            # flag the instance as deleted to revert the resource usage
+            # and associated stats:
+            claim.instance['vm_state'] = vm_states.DELETED
+            self._update_usage_from_instance(self.compute_node, claim.instance)
+            self._update(context, self.compute_node)
+
         else:
             # can't find the claim.  this may mean the claim already timed
             # out or it was already explicitly finished/aborted.
-            LOG.info(_("Claim %d not found.  It either timed out or was "
+            LOG.audit(_("Claim %s not found.  It either timed out or was "
                     "already explicitly finished/aborted"), claim.claim_id)
 
+    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+    def update_usage(self, context, instance):
+        """Update the resource usage and stats after a change in an
+        instance
+        """
+        if self.disabled:
+            return
+
+        # don't update usage for this instance unless it submitted a resource
+        # claim first:
+        uuid = instance['uuid']
+        if uuid in self.tracked_instances:
+            self._update_usage_from_instance(self.compute_node, instance)
+            self._update(context.elevated(), self.compute_node)
+
     @property
     def disabled(self):
         return self.compute_node is None
 
-    def free_resources(self, context):
-        """A compute operation finished freeing up resources.  Update compute
-        model to reflect updated resource usage.
-
-        (The hypervisor may not immediately 'GC' all resources, so ask directly
-        to see what's available to update the compute node model.)
-        """
-        self.update_available_resource(context.elevated())
-
     @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
     def update_available_resource(self, context):
         """Override in-memory calculations of compute node resource usage based
@@ -308,36 +363,33 @@ class ResourceTracker(object):
         declared a need for resources, but not necessarily retrieved them from
         the hypervisor layer yet.
         """
-        # ask hypervisor for its view of resource availability &
-        # usage:
         resources = self.driver.get_available_resource()
         if not resources:
             # The virt driver does not support this function
-            LOG.warn(_("Virt driver does not support "
+            LOG.audit(_("Virt driver does not support "
                 "'get_available_resource'  Compute tracking is disabled."))
             self.compute_node = None
             self.claims = {}
             return
 
-        # Confirm resources dictionary contains expected keys:
         self._verify_resources(resources)
 
-        resources['free_ram_mb'] = resources['memory_mb'] - \
-                resources['memory_mb_used']
-        resources['free_disk_gb'] = resources['local_gb'] - \
-                resources['local_gb_used']
+        self._report_hypervisor_resource_view(resources)
 
-        LOG.audit(_("free_ram_mb: %s") % resources['free_ram_mb'])
-        LOG.audit(_("free_disk_gb: %s") % resources['free_disk_gb'])
+        self._purge_expired_claims()
 
-        # Apply resource claims representing in-progress operations to
-        # 'resources'.  This may over-estimate the amount of resources in use,
-        # at least until the next time 'update_available_resource' runs.
-        self._apply_claims(resources)
+        # Grab all instances assigned to this host:
+        filters = {'host': self.host, 'deleted': False}
+        instances = db.instance_get_all_by_filters(context, filters)
 
-        # also generate all load stats:
-        values = self._create_load_stats(context)
-        resources.update(values)
+        # Now calculate usage based on instance utilization:
+        self._update_usage_from_instances(resources, instances)
 
+        self._report_final_resource_view(resources)
+
+        self._sync_compute_node(context, resources)
+
+    def _sync_compute_node(self, context, resources):
+        """Create or update the compute node DB record"""
         if not self.compute_node:
             # we need a copy of the ComputeNode record:
             service = self._get_service(context)
@@ -352,80 +404,30 @@ class ResourceTracker(object):
         if not self.compute_node:
             # Need to create the ComputeNode record:
             resources['service_id'] = service['id']
-            self.compute_node = self._create(context, resources)
+            self._create(context, resources)
             LOG.info(_('Compute_service record created for %s ') % self.host)
 
         else:
             # just update the record:
-            self.compute_node = self._update(context, resources,
-                    prune_stats=True)
+            self._update(context, resources, prune_stats=True)
             LOG.info(_('Compute_service record updated for %s ') % self.host)
 
-    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
-    def update_load_stats_for_instance(self, context, instance_ref):
-        """Update workload stats for the local compute host."""
-
-        if self.disabled:
-            return
-
-        values = {}
-        self.stats.update_stats_for_instance(instance_ref)
-        values['stats'] = self.stats
-
-        values['current_workload'] = self.stats.calculate_workload()
-        values['running_vms'] = self.stats.num_instances
-        values['vcpus_used'] = self.stats.num_vcpus_used
-
-        self.compute_node = self._update(context.elevated(), values)
-
-    def _apply_claims(self, resources):
-        """Apply in-progress resource claims to the 'resources' dict from the
-        virt layer
-        """
+    def _purge_expired_claims(self):
+        """Purge expired resource claims"""
        for claim_id in self.claims.keys():
            c = self.claims[claim_id]
            if c.is_expired():
-                # if this claim is expired, just expunge it
-                LOG.info(_("Expiring resource claim %d"), claim_id)
+                # if this claim is expired, just expunge it.
+                # it is assumed that the instance will eventually get built
+                # successfully.
+                LOG.audit(_("Expiring resource claim %s"), claim_id)
                self.claims.pop(claim_id)
-            else:
-                values = c.apply_claim(resources)
-                resources.update(values)
 
     def _create(self, context, values):
         """Create the compute node in the DB"""
         # initialize load stats from existing instances:
         compute_node = db.compute_node_create(context, values)
-        return compute_node
-
-    def _create_load_stats(self, context, instance=None):
-        """For each existing instance generate load stats for the compute
-        node record.
-        """
-        values = {}
-
-        if instance:
-            instances = [instance]
-        else:
-            self.stats.clear()  # re-generating all, so clear old stats
-
-            # grab all instances that are not yet DELETED
-            filters = {'host': self.host, 'deleted': False}
-            instances = db.instance_get_all_by_filters(context, filters)
-
-        for instance in instances:
-            self.stats.update_stats_for_instance(instance)
-
-        values['current_workload'] = self.stats.calculate_workload()
-        values['running_vms'] = self.stats.num_instances
-        values['vcpus_used'] = self.stats.num_vcpus_used
-        values['stats'] = self.stats
-        return values
-
-    def _get_next_id(self):
-        next_id = self.next_claim_id
-        self.next_claim_id += 1
-        return next_id
+        self.compute_node = dict(compute_node)
 
     def _get_service(self, context):
         try:
@@ -434,10 +436,105 @@ class ResourceTracker(object):
         except exception.NotFound:
             LOG.warn(_("No service record for host %s"), self.host)
 
+    def _report_hypervisor_resource_view(self, resources):
+        """Log the hypervisor's view of free memory in and free disk.
+        This is just a snapshot of resource usage recorded by the
+        virt driver.
+        """
+        free_ram_mb = resources['memory_mb'] - resources['memory_mb_used']
+        free_disk_gb = resources['local_gb'] - resources['local_gb_used']
+
+        LOG.debug(_("Hypervisor: free ram (MB): %s") % free_ram_mb)
+        LOG.debug(_("Hypervisor: free disk (GB): %s") % free_disk_gb)
+
+        vcpus = resources['vcpus']
+        if vcpus:
+            free_vcpus = vcpus - resources['vcpus_used']
+            LOG.debug(_("Hypervisor: free VCPUs: %s") % free_vcpus)
+        else:
+            LOG.debug(_("Hypervisor: VCPU information unavailable"))
+
+    def _report_final_resource_view(self, resources):
+        """Report final calculate of free memory and free disk including
+        instance calculations and in-progress resource claims.  These
+        values will be exposed via the compute node table to the scheduler.
+        """
+        LOG.audit(_("Free ram (MB): %s") % resources['free_ram_mb'])
+        LOG.audit(_("Free disk (GB): %s") % resources['free_disk_gb'])
+
+        vcpus = resources['vcpus']
+        if vcpus:
+            free_vcpus = vcpus - resources['vcpus_used']
+            LOG.audit(_("Free VCPUS: %s") % free_vcpus)
+        else:
+            LOG.audit(_("Free VCPU information unavailable"))
+
     def _update(self, context, values, prune_stats=False):
         """Persist the compute node updates to the DB"""
-        return db.compute_node_update(context, self.compute_node['id'],
-                values, prune_stats)
+        compute_node = db.compute_node_update(context,
+                self.compute_node['id'], values, prune_stats)
+        self.compute_node = dict(compute_node)
+
+    def _update_usage_from_instance(self, resources, instance):
+        """Update usage for a single instance."""
+
+        uuid = instance['uuid']
+        is_new_instance = uuid not in self.tracked_instances
+        is_deleted_instance = instance['vm_state'] == vm_states.DELETED
+
+        if is_new_instance:
+            self.tracked_instances[uuid] = 1
+            sign = 1
+
+        if instance['vm_state'] == vm_states.DELETED:
+            self.tracked_instances.pop(uuid)
+            sign = -1
+
+        self.stats.update_stats_for_instance(instance)
+
+        # if it's a new or deleted instance:
+        if is_new_instance or is_deleted_instance:
+            # new instance, update compute node resource usage:
+            resources['memory_mb_used'] += sign * instance['memory_mb']
+            resources['local_gb_used'] += sign * instance['root_gb']
+            resources['local_gb_used'] += sign * instance['ephemeral_gb']
+
+            # free ram and disk may be negative, depending on policy:
+            resources['free_ram_mb'] = (resources['memory_mb'] -
+                    resources['memory_mb_used'])
+            resources['free_disk_gb'] = (resources['local_gb'] -
+                    resources['local_gb_used'])
+
+            resources['running_vms'] = self.stats.num_instances
+            resources['vcpus_used'] = self.stats.num_vcpus_used
+
+        resources['current_workload'] = self.stats.calculate_workload()
+        resources['stats'] = self.stats
+
+    def _update_usage_from_instances(self, resources, instances):
+        """Calculate resource usage based on instance utilization.  This is
+        different than the hypervisor's view as it will account for all
+        instances assigned to the local compute host, even if they are not
+        currently powered on.
+        """
+        self.tracked_instances.clear()
+
+        # purge old stats
+        self.stats.clear()
+
+        # set some intiial values, reserve room for host/hypervisor:
+        resources['local_gb_used'] = FLAGS.reserved_host_disk_mb / 1024
+        resources['memory_mb_used'] = FLAGS.reserved_host_memory_mb
+        resources['vcpus_used'] = 0
+        resources['free_ram_mb'] = (resources['memory_mb'] -
+                resources['memory_mb_used'])
+        resources['free_disk_gb'] = (resources['local_gb'] -
+                resources['local_gb_used'])
+        resources['current_workload'] = 0
+        resources['running_vms'] = 0
+
+        for instance in instances:
+            self._update_usage_from_instance(resources, instance)
+
     def _verify_resources(self, resources):
         resource_keys = ["vcpus", "memory_mb", "local_gb", "cpu_info",
