diff options
| author | Brian Elliott <brian.elliott@rackspace.com> | 2012-06-21 04:25:24 +0000 |
|---|---|---|
| committer | Brian Elliott <brian.elliott@rackspace.com> | 2012-08-20 15:46:53 +0000 |
| commit | c7d812a35bf4ef42907366c3f674fd623cd46905 (patch) | |
| tree | cc0cb8ac745d1fae51dafe7c24f499d5b0b27ea8 /nova/compute | |
| parent | 740e93aae891d6c20f38b091ad9f54d71db0d7f7 (diff) | |
Keep the ComputeNode model updated with usage
Keep the compute host's ComputeNode model in sync with the
level of resource usage. This enables the ComputeNode
model to be used as a basis for scheduling decisions
rather than forcing the scheduler to calculate free
resources from an instance_get_all on each request.
Resources like memory and disk are claimed as instances are built
or deleted. There is also support for configurable compute node
stats (a generic key/value store) for extensible advertising of other
usage stats that may be useful for a particular scheduler
implementation. Additionally, there is a periodic task on the
compute host that audits actual resource consumption at the virt
layer to ensure that the database stays in sync.
This change partially implements blueprint: scheduler-resource-race
This patch complements:
https://review.openstack.org/#/c/9540/ (build re-scheduling support)
Change-Id: Ibbe3839a054f8b80664b413d47f766ca8d68e3f2
Diffstat (limited to 'nova/compute')
| -rw-r--r-- | nova/compute/manager.py | 31 | ||||
| -rw-r--r-- | nova/compute/resource_tracker.py | 450 | ||||
| -rw-r--r-- | nova/compute/stats.py | 118 |
3 files changed, 591 insertions, 8 deletions
diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 6d499f9fb..e45c3333d 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -47,6 +47,7 @@ from nova import block_device from nova import compute from nova.compute import instance_types from nova.compute import power_state +from nova.compute import resource_tracker from nova.compute import rpcapi as compute_rpcapi from nova.compute import task_states from nova.compute import utils as compute_utils @@ -278,11 +279,16 @@ class ComputeManager(manager.SchedulerDependentManager): super(ComputeManager, self).__init__(service_name="compute", *args, **kwargs) + self.resource_tracker = resource_tracker.ResourceTracker(self.host, + self.driver) + def _instance_update(self, context, instance_uuid, **kwargs): """Update an instance in the database using kwargs as value.""" (old_ref, instance_ref) = self.db.instance_update_and_get_original( context, instance_uuid, kwargs) + self.resource_tracker.update_load_stats_for_instance(context, old_ref, + instance_ref) notifications.send_update(context, old_ref, instance_ref) return instance_ref @@ -523,10 +529,16 @@ class ComputeManager(manager.SchedulerDependentManager): network_info = self._allocate_network(context, instance, requested_networks) try: - block_device_info = self._prep_block_device(context, instance) - instance = self._spawn(context, instance, image_meta, - network_info, block_device_info, - injected_files, admin_password) + memory_mb_limit = filter_properties.get('memory_mb_limit', + None) + with self.resource_tracker.instance_resource_claim(context, + instance, memory_mb_limit=memory_mb_limit): + block_device_info = self._prep_block_device(context, + instance) + instance = self._spawn(context, instance, image_meta, + network_info, block_device_info, + injected_files, admin_password) + except exception.InstanceNotFound: raise # the instance got deleted during the spawn except Exception: @@ -920,6 +932,8 @@ class 
ComputeManager(manager.SchedulerDependentManager): self.db.instance_destroy(context, instance_uuid) system_meta = self.db.instance_system_metadata_get(context, instance_uuid) + # mark resources free + self.resource_tracker.free_resources(context) self._notify_about_instance_usage(context, instance, "delete.end", system_metadata=system_meta) @@ -2902,13 +2916,14 @@ class ComputeManager(manager.SchedulerDependentManager): @manager.periodic_task def update_available_resource(self, context): - """See driver.update_available_resource() + """See driver.get_available_resource() - :param context: security context - :returns: See driver.update_available_resource() + Periodic process that keeps that the compute host's understanding of + resource availability and usage in sync with the underlying hypervisor. + :param context: security context """ - self.driver.update_available_resource(context, self.host) + self.resource_tracker.update_available_resource(context) def _add_instance_fault_from_exc(self, context, instance_uuid, fault, exc_info=None): diff --git a/nova/compute/resource_tracker.py b/nova/compute/resource_tracker.py new file mode 100644 index 000000000..3a43390a0 --- /dev/null +++ b/nova/compute/resource_tracker.py @@ -0,0 +1,450 @@ +# Copyright (c) 2012 OpenStack, LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Track resources like memory and disk for a compute host. 
Provides the +scheduler with useful information about availability through the ComputeNode +model. +""" + +from nova import context +from nova import db +from nova import exception +from nova import flags +from nova.openstack.common import cfg +from nova.openstack.common import importutils +from nova.openstack.common import log as logging +from nova.openstack.common import timeutils +from nova import utils + +resource_tracker_opts = [ + cfg.IntOpt('claim_timeout_seconds', default=600, + help='How long, in seconds, before a resource claim times out'), + cfg.StrOpt('compute_stats_class', + default='nova.compute.stats.Stats', + help='Class that will manage stats for the local compute host') +] + +FLAGS = flags.FLAGS +FLAGS.register_opts(resource_tracker_opts) + +LOG = logging.getLogger(__name__) +COMPUTE_RESOURCE_SEMAPHORE = "compute_resources" + + +class Claim(object): + """A declaration that a compute host operation will require free resources. + + This information will be used to help keep the local compute hosts's + ComputeNode model in sync to aid the scheduler in making efficient / more + correct decisions with respect to host selection. + """ + + def __init__(self, claim_id, memory_mb, disk_gb, timeout, *args, **kwargs): + self.claim_id = claim_id + self.memory_mb = memory_mb + self.disk_gb = disk_gb + self.timeout = timeout + self.expire_ts = timeutils.utcnow_ts() + timeout + + def apply_claim(self, resources): + """Adjust the resources required from available resources. + + :param resources: Should be a dictionary-like object that + has fields like a compute node + """ + return self._apply(resources) + + def undo_claim(self, resources): + return self._apply(resources, sign=-1) + + def is_expired(self): + """Determine if this adjustment is old enough that we can assume it's + no longer needed. 
+ """ + return timeutils.utcnow_ts() > self.expire_ts + + def _apply(self, resources, sign=1): + values = {} + values['memory_mb_used'] = (resources['memory_mb_used'] + sign * + self.memory_mb) + values['free_ram_mb'] = (resources['free_ram_mb'] - sign * + self.memory_mb) + values['local_gb_used'] = (resources['local_gb_used'] + sign * + self.disk_gb) + values['free_disk_gb'] = (resources['free_disk_gb'] - sign * + self.disk_gb) + + return values + + def __str__(self): + return "[Claim %d: %d MB memory, %d GB disk]" % (self.claim_id, + self.memory_mb, self.disk_gb) + + +class ResourceContextManager(object): + def __init__(self, context, claim, tracker): + self.context = context + self.claim = claim + self.tracker = tracker + + def __enter__(self): + if not self.claim and not self.tracker.disabled: + # insufficient resources to complete request + raise exception.ComputeResourcesUnavailable() + + def __exit__(self, exc_type, exc_val, exc_tb): + if not self.claim: + return + + if exc_type is None: + self.tracker.finish_resource_claim(self.claim) + else: + self.tracker.abort_resource_claim(self.context, self.claim) + + +class ResourceTracker(object): + """Compute helper class for keeping track of resource usage as instances + are built and destroyed. 
+ """ + + def __init__(self, host, driver): + self.host = host + self.driver = driver + self.compute_node = None + self.next_claim_id = 1 + self.claims = {} + self.stats = importutils.import_object(FLAGS.compute_stats_class) + + def resource_claim(self, context, *args, **kwargs): + claim = self.begin_resource_claim(context, *args, **kwargs) + return ResourceContextManager(context, claim, self) + + def instance_resource_claim(self, context, instance_ref, *args, **kwargs): + claim = self.begin_instance_resource_claim(context, instance_ref, + *args, **kwargs) + return ResourceContextManager(context, claim, self) + + @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) + def begin_instance_resource_claim(self, context, instance_ref, *args, + **kwargs): + """Method to begin a resource claim for a new instance.""" + memory_mb = instance_ref['memory_mb'] + disk_gb = instance_ref['root_gb'] + instance_ref['ephemeral_gb'] + + claim = self._do_begin_resource_claim(context, memory_mb, disk_gb, + *args, **kwargs) + if claim: + # also update load stats related to new instances firing up - + values = self._create_load_stats(context, instance_ref) + self.compute_node = self._update(context, values) + return claim + + @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) + def begin_resource_claim(self, context, memory_mb, disk_gb, + memory_mb_limit=None, timeout=None, *args, **kwargs): + """Indicate that some resources are needed for an upcoming compute + host operation. + + This should be called any time the compute node is about to perform + an operation that will consume resources. + + :param memory_mb: security context + :param memory_mb: Memory in MB to be claimed + :param root_gb: Disk in GB to be claimed + :param memory_mb_limit: Memory in MB that is the maximum to allocate on + this node. May exceed installed physical memory if + oversubscription is the desired behavior. 
+ :param timeout: How long, in seconds, the operation that requires + these resources should take to actually allocate what + it needs from the hypervisor. If the timeout is + exceeded, the new resource claim will assume caller + before releasing the resources. + :returns: An integer 'claim ticket'. This should be turned into + finalize a resource claim or free resources after the + compute operation is finished. Returns None if the claim + failed. + """ + + return self._do_begin_resource_claim(context, memory_mb, disk_gb, + memory_mb_limit, timeout, *args, **kwargs) + + def _do_begin_resource_claim(self, context, memory_mb, disk_gb, + memory_mb_limit=None, timeout=None, *args, **kwargs): + + if self.disabled: + return + + if not timeout: + timeout = FLAGS.claim_timeout_seconds + + memory_mb = abs(memory_mb) + disk_gb = abs(disk_gb) + + msg = _("Attempting claim: memory %(memory_mb)d MB, disk %(disk_gb)d " + "GB, mem limit %(memory_mb_limit)s") % locals() + LOG.audit(msg) + + if not memory_mb_limit: + # default to total memory: + memory_mb_limit = self.compute_node['memory_mb'] + + free_ram_mb = memory_mb_limit - self.compute_node['memory_mb_used'] + + # Installed memory and usage info: + msg = _("Total memory: %(total_mem)d MB, used: %(used_mem)d MB, free: " + "%(free_mem)d") % dict( + total_mem=self.compute_node['memory_mb'], + used_mem=self.compute_node['memory_mb_used'], + free_mem=self.compute_node['local_gb_used']) + LOG.audit(msg) + + # Oversubscribed memory policy info: + msg = _("Limit: %(memory_mb_limit)d MB, free: %(free_ram_mb)d") % \ + locals() + LOG.audit(msg) + + if memory_mb > free_ram_mb: + msg = _("Unable to claim resources. Free memory %(free_ram_mb)d " + "MB < requested memory %(memory_mb)d MB") % locals() + LOG.info(msg) + return None + + if disk_gb > self.compute_node['free_disk_gb']: + msg = _("Unable to claim resources. 
Free disk %(free_disk_gb)d GB" + " < requested disk %(disk_gb)d GB") % dict( + free_disk_gb=self.compute_node['free_disk_gb'], + disk_gb=disk_gb) + LOG.info(msg) + return None + + claim_id = self._get_next_id() + c = Claim(claim_id, memory_mb, disk_gb, timeout, *args, **kwargs) + + # adjust compute node usage values and save so scheduler will see it: + values = c.apply_claim(self.compute_node) + self.compute_node = self._update(context, values) + + # keep track of this claim until we know whether the compute operation + # was successful/completed: + self.claims[claim_id] = c + return c + + def finish_resource_claim(self, claim): + """Indicate that the compute operation that previously claimed the + resources identified by 'claim' has now completed and the resources + have been allocated at the virt layer. + + Calling this keeps the available resource data more accurate and + timely than letting the claim timeout elapse and waiting for + update_available_resource to reflect the changed usage data. + + :param claim: A claim indicating a set of resources that were + previously claimed. + """ + if self.disabled: + return + + if self.claims.pop(claim.claim_id, None): + LOG.info(_("Finishing claim: %s") % claim) + else: + LOG.info(_("Can't find claim %d. It may have been 'finished' " + "twice, or it has already timed out."), claim.claim_id) + + @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) + def abort_resource_claim(self, context, claim): + """Indicate that the operation that claimed the resources identified by + 'claim_id' has either failed or been aborted and the resources are no + longer needed. + + :param claim: A claim ticket indicating a set of resources that were + previously claimed. 
+ """ + if self.disabled: + return + + # un-claim the resources: + if self.claims.pop(claim.claim_id, None): + LOG.info(_("Aborting claim: %s") % claim) + values = claim.undo_claim(self.compute_node) + self.compute_node = self._update(context, values) + else: + # can't find the claim. this may mean the claim already timed + # out or it was already explicitly finished/aborted. + LOG.info(_("Claim %d not found. It either timed out or was " + "already explicitly finished/aborted"), claim.claim_id) + + @property + def disabled(self): + return self.compute_node is None + + def free_resources(self, context): + """A compute operation finished freeing up resources. Update compute + model to reflect updated resource usage. + + (The hypervisor may not immediately 'GC' all resources, so ask directly + to see what's available to update the compute node model.) + """ + self.update_available_resource(context.elevated()) + + @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) + def update_available_resource(self, context): + """Override in-memory calculations of compute node resource usage based + on data audited from the hypervisor layer. + + Add in resource claims in progress to account for operations that have + declared a need for resources, but not necessarily retrieved them from + the hypervisor layer yet. 
+ """ + # ask hypervisor for its view of resource availability & + # usage: + resources = self.driver.get_available_resource() + if not resources: + # The virt driver does not support this function + LOG.warn(_("Virt driver does not support " + "'get_available_resource' Compute tracking is disabled.")) + self.compute_node = None + self.claims = {} + return + + # Confirm resources dictionary contains expected keys: + self._verify_resources(resources) + + resources['free_ram_mb'] = resources['memory_mb'] - \ + resources['memory_mb_used'] + resources['free_disk_gb'] = resources['local_gb'] - \ + resources['local_gb_used'] + + LOG.audit(_("free_ram_mb: %s") % resources['free_ram_mb']) + LOG.audit(_("free_disk_gb: %s") % resources['free_disk_gb']) + # Apply resource claims representing in-progress operations to + # 'resources'. This may over-estimate the amount of resources in use, + # at least until the next time 'update_available_resource' runs. + self._apply_claims(resources) + + # also generate all load stats: + values = self._create_load_stats(context) + resources.update(values) + + if not self.compute_node: + # we need a copy of the ComputeNode record: + service = self._get_service(context) + if not service: + # no service record, disable resource + return + + compute_node_ref = service['compute_node'] + if compute_node_ref: + self.compute_node = compute_node_ref[0] + + if not self.compute_node: + # Need to create the ComputeNode record: + resources['service_id'] = service['id'] + self.compute_node = self._create(context, resources) + LOG.info(_('Compute_service record created for %s ') % self.host) + + else: + # just update the record: + self.compute_node = self._update(context, resources, + prune_stats=True) + LOG.info(_('Compute_service record updated for %s ') % self.host) + + @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) + def update_load_stats_for_instance(self, context, old_ref, instance_ref): + """Update workload stats for the local compute host.""" + + 
if self.disabled: + return + + values = {} + self.stats.update_stats_for_instance(old_ref, instance_ref) + values['stats'] = self.stats + + values['current_workload'] = self.stats.calculate_workload() + values['running_vms'] = self.stats.num_instances + values['vcpus_used'] = self.stats.num_vcpus_used + + self.compute_node = self._update(context.elevated(), values) + + def _apply_claims(self, resources): + """Apply in-progress resource claims to the 'resources' dict from the + virt layer + """ + for claim_id in self.claims.keys(): + c = self.claims[claim_id] + if c.is_expired(): + # if this claim is expired, just expunge it + LOG.info(_("Expiring resource claim %d"), claim_id) + self.claims.pop(claim_id) + else: + values = c.apply_claim(resources) + resources.update(values) + + def _create(self, context, values): + """Create the compute node in the DB""" + # initialize load stats from existing instances: + compute_node = db.compute_node_create(context, values) + return compute_node + + def _create_load_stats(self, context, instance=None): + """For each existing instance generate load stats for the compute + node record. 
+ """ + values = {} + + if instance: + instances = [instance] + else: + self.stats.clear() # re-generating all, so clear old stats + + # grab all instances that are not yet DELETED + filters = {'host': self.host, 'deleted': False} + instances = db.instance_get_all_by_filters(context, + {'host': self.host}) + + for instance in instances: + self.stats.add_stats_for_instance(instance) + + values['current_workload'] = self.stats.calculate_workload() + values['running_vms'] = self.stats.num_instances + values['vcpus_used'] = self.stats.num_vcpus_used + values['stats'] = self.stats + return values + + def _get_next_id(self): + next_id = self.next_claim_id + self.next_claim_id += 1 + return next_id + + def _get_service(self, context): + try: + return db.service_get_all_compute_by_host(context, + self.host)[0] + except exception.NotFound: + LOG.warn(_("No service record for host %s"), self.host) + + def _update(self, context, values, prune_stats=False): + """Persist the compute node updates to the DB""" + return db.compute_node_update(context, self.compute_node['id'], + values, prune_stats) + + def _verify_resources(self, resources): + resource_keys = ["vcpus", "memory_mb", "local_gb", "cpu_info", + "vcpus_used", "memory_mb_used", "local_gb_used"] + + missing_keys = [k for k in resource_keys if k not in resources] + if missing_keys: + reason = _("Missing keys: %s") % missing_keys + raise exception.InvalidInput(reason=reason) diff --git a/nova/compute/stats.py b/nova/compute/stats.py new file mode 100644 index 000000000..f67615028 --- /dev/null +++ b/nova/compute/stats.py @@ -0,0 +1,118 @@ +# Copyright (c) 2012 OpenStack, LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from nova.compute import task_states +from nova.compute import vm_states +from nova import db +from nova.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +class Stats(dict): + """Handler for updates to compute node workload stats.""" + + def add_stats_for_instance(self, instance): + self._increment("num_task_%s" % instance['task_state']) + + self._increment("num_vm_%s" % instance['vm_state']) + self._increment("num_instances") + + os_type = instance['os_type'] + self._increment("num_os_type_%s" % os_type) + + proj_id = instance['project_id'] + self._increment("num_proj_%s" % proj_id) + + x = self.get("num_vcpus_used", 0) + self["num_vcpus_used"] = x + instance["vcpus"] + + def calculate_io_workload(self): + """Calculate an I/O based load by counting I/O heavy operations""" + + def _get(state, state_type): + key = "num_%s_%s" % (state_type, state) + return self.get(key, 0) + + num_builds = _get(vm_states.BUILDING, "vm") + num_migrations = _get(task_states.RESIZE_MIGRATING, "task") + num_rebuilds = _get(task_states.REBUILDING, "task") + num_resizes = _get(task_states.RESIZE_PREP, "task") + num_snapshots = _get(task_states.IMAGE_SNAPSHOT, "task") + num_backups = _get(task_states.IMAGE_BACKUP, "task") + + return (num_builds + num_rebuilds + num_resizes + num_migrations + + num_snapshots + num_backups) + + def calculate_workload(self): + """Calculate current load of the compute host based on + task states. 
+ """ + current_workload = 0 + for k in self: + if k.startswith("num_task") and not k.endswith("None"): + current_workload += self[k] + return current_workload + + @property + def num_instances(self): + return self.get("num_instances", 0) + + def num_instances_for_project(self, project_id): + key = "num_proj_%s" % project_id + return self.get(key, 0) + + def num_os_type(self, os_type): + key = "num_os_type_%s" % os_type + return self.get(key, 0) + + @property + def num_vcpus_used(self): + return self.get("num_vcpus_used", 0) + + def update_stats_for_instance(self, old_instance, instance): + """Update stats after an instance is changed.""" + + old_vm_state = old_instance['vm_state'] + new_vm_state = instance['vm_state'] + + if old_vm_state != new_vm_state: + self._decrement("num_vm_%s" % old_vm_state) + self._increment("num_vm_%s" % new_vm_state) + + if new_vm_state == vm_states.DELETED: + self._decrement("num_instances") + + self._decrement("num_os_type_%s" % old_instance['os_type']) + + self._decrement("num_proj_%s" % old_instance["project_id"]) + + x = self.get("num_vcpus_used", 0) + self["num_vcpus_used"] = x - old_instance['vcpus'] + + old_task_state = old_instance['task_state'] + new_task_state = instance['task_state'] + + if old_task_state != new_task_state: + self._decrement("num_task_%s" % old_task_state) + self._increment("num_task_%s" % new_task_state) + + def _decrement(self, key): + x = self.get(key, 0) + self[key] = x - 1 + + def _increment(self, key): + x = self.get(key, 0) + self[key] = x + 1 |
