summaryrefslogtreecommitdiffstats
path: root/nova/compute
diff options
context:
space:
mode:
Diffstat (limited to 'nova/compute')
-rw-r--r--nova/compute/manager.py31
-rw-r--r--nova/compute/resource_tracker.py450
-rw-r--r--nova/compute/stats.py118
3 files changed, 591 insertions, 8 deletions
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 6d499f9fb..e45c3333d 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -47,6 +47,7 @@ from nova import block_device
from nova import compute
from nova.compute import instance_types
from nova.compute import power_state
+from nova.compute import resource_tracker
from nova.compute import rpcapi as compute_rpcapi
from nova.compute import task_states
from nova.compute import utils as compute_utils
@@ -278,11 +279,16 @@ class ComputeManager(manager.SchedulerDependentManager):
super(ComputeManager, self).__init__(service_name="compute",
*args, **kwargs)
+ self.resource_tracker = resource_tracker.ResourceTracker(self.host,
+ self.driver)
+
def _instance_update(self, context, instance_uuid, **kwargs):
"""Update an instance in the database using kwargs as value."""
(old_ref, instance_ref) = self.db.instance_update_and_get_original(
context, instance_uuid, kwargs)
+ self.resource_tracker.update_load_stats_for_instance(context, old_ref,
+ instance_ref)
notifications.send_update(context, old_ref, instance_ref)
return instance_ref
@@ -523,10 +529,16 @@ class ComputeManager(manager.SchedulerDependentManager):
network_info = self._allocate_network(context, instance,
requested_networks)
try:
- block_device_info = self._prep_block_device(context, instance)
- instance = self._spawn(context, instance, image_meta,
- network_info, block_device_info,
- injected_files, admin_password)
+ memory_mb_limit = filter_properties.get('memory_mb_limit',
+ None)
+ with self.resource_tracker.instance_resource_claim(context,
+ instance, memory_mb_limit=memory_mb_limit):
+ block_device_info = self._prep_block_device(context,
+ instance)
+ instance = self._spawn(context, instance, image_meta,
+ network_info, block_device_info,
+ injected_files, admin_password)
+
except exception.InstanceNotFound:
raise # the instance got deleted during the spawn
except Exception:
@@ -920,6 +932,8 @@ class ComputeManager(manager.SchedulerDependentManager):
self.db.instance_destroy(context, instance_uuid)
system_meta = self.db.instance_system_metadata_get(context,
instance_uuid)
+ # mark resources free
+ self.resource_tracker.free_resources(context)
self._notify_about_instance_usage(context, instance, "delete.end",
system_metadata=system_meta)
@@ -2902,13 +2916,14 @@ class ComputeManager(manager.SchedulerDependentManager):
@manager.periodic_task
def update_available_resource(self, context):
- """See driver.update_available_resource()
+ """See driver.get_available_resource()
- :param context: security context
- :returns: See driver.update_available_resource()
+ Periodic process that keeps the compute host's understanding of
+ resource availability and usage in sync with the underlying hypervisor.
+ :param context: security context
"""
- self.driver.update_available_resource(context, self.host)
+ self.resource_tracker.update_available_resource(context)
def _add_instance_fault_from_exc(self, context, instance_uuid, fault,
exc_info=None):
diff --git a/nova/compute/resource_tracker.py b/nova/compute/resource_tracker.py
new file mode 100644
index 000000000..3a43390a0
--- /dev/null
+++ b/nova/compute/resource_tracker.py
@@ -0,0 +1,450 @@
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Track resources like memory and disk for a compute host. Provides the
+scheduler with useful information about availability through the ComputeNode
+model.
+"""
+
+from nova import context
+from nova import db
+from nova import exception
+from nova import flags
+from nova.openstack.common import cfg
+from nova.openstack.common import importutils
+from nova.openstack.common import log as logging
+from nova.openstack.common import timeutils
+from nova import utils
+
+resource_tracker_opts = [
+ cfg.IntOpt('claim_timeout_seconds', default=600,
+ help='How long, in seconds, before a resource claim times out'),
+ cfg.StrOpt('compute_stats_class',
+ default='nova.compute.stats.Stats',
+ help='Class that will manage stats for the local compute host')
+]
+
+FLAGS = flags.FLAGS
+FLAGS.register_opts(resource_tracker_opts)
+
+LOG = logging.getLogger(__name__)
+COMPUTE_RESOURCE_SEMAPHORE = "compute_resources"
+
+
+class Claim(object):
+ """A declaration that a compute host operation will require free resources.
+
+ This information will be used to help keep the local compute host's
+ ComputeNode model in sync to aid the scheduler in making efficient / more
+ correct decisions with respect to host selection.
+ """
+
+ def __init__(self, claim_id, memory_mb, disk_gb, timeout, *args, **kwargs):
+ self.claim_id = claim_id
+ self.memory_mb = memory_mb
+ self.disk_gb = disk_gb
+ self.timeout = timeout
+ self.expire_ts = timeutils.utcnow_ts() + timeout
+
+ def apply_claim(self, resources):
+ """Adjust the resources required from available resources.
+
+ :param resources: Should be a dictionary-like object that
+ has fields like a compute node
+ """
+ return self._apply(resources)
+
+ def undo_claim(self, resources):
+ return self._apply(resources, sign=-1)
+
+ def is_expired(self):
+ """Determine if this adjustment is old enough that we can assume it's
+ no longer needed.
+ """
+ return timeutils.utcnow_ts() > self.expire_ts
+
+ def _apply(self, resources, sign=1):
+ values = {}
+ values['memory_mb_used'] = (resources['memory_mb_used'] + sign *
+ self.memory_mb)
+ values['free_ram_mb'] = (resources['free_ram_mb'] - sign *
+ self.memory_mb)
+ values['local_gb_used'] = (resources['local_gb_used'] + sign *
+ self.disk_gb)
+ values['free_disk_gb'] = (resources['free_disk_gb'] - sign *
+ self.disk_gb)
+
+ return values
+
+ def __str__(self):
+ return "[Claim %d: %d MB memory, %d GB disk]" % (self.claim_id,
+ self.memory_mb, self.disk_gb)
+
+
+class ResourceContextManager(object):
+ def __init__(self, context, claim, tracker):
+ self.context = context
+ self.claim = claim
+ self.tracker = tracker
+
+ def __enter__(self):
+ if not self.claim and not self.tracker.disabled:
+ # insufficient resources to complete request
+ raise exception.ComputeResourcesUnavailable()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if not self.claim:
+ return
+
+ if exc_type is None:
+ self.tracker.finish_resource_claim(self.claim)
+ else:
+ self.tracker.abort_resource_claim(self.context, self.claim)
+
+
+class ResourceTracker(object):
+ """Compute helper class for keeping track of resource usage as instances
+ are built and destroyed.
+ """
+
+ def __init__(self, host, driver):
+ self.host = host
+ self.driver = driver
+ self.compute_node = None
+ self.next_claim_id = 1
+ self.claims = {}
+ self.stats = importutils.import_object(FLAGS.compute_stats_class)
+
+ def resource_claim(self, context, *args, **kwargs):
+ claim = self.begin_resource_claim(context, *args, **kwargs)
+ return ResourceContextManager(context, claim, self)
+
+ def instance_resource_claim(self, context, instance_ref, *args, **kwargs):
+ claim = self.begin_instance_resource_claim(context, instance_ref,
+ *args, **kwargs)
+ return ResourceContextManager(context, claim, self)
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def begin_instance_resource_claim(self, context, instance_ref, *args,
+ **kwargs):
+ """Method to begin a resource claim for a new instance."""
+ memory_mb = instance_ref['memory_mb']
+ disk_gb = instance_ref['root_gb'] + instance_ref['ephemeral_gb']
+
+ claim = self._do_begin_resource_claim(context, memory_mb, disk_gb,
+ *args, **kwargs)
+ if claim:
+ # also update load stats related to new instances firing up -
+ values = self._create_load_stats(context, instance_ref)
+ self.compute_node = self._update(context, values)
+ return claim
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def begin_resource_claim(self, context, memory_mb, disk_gb,
+ memory_mb_limit=None, timeout=None, *args, **kwargs):
+ """Indicate that some resources are needed for an upcoming compute
+ host operation.
+
+ This should be called any time the compute node is about to perform
+ an operation that will consume resources.
+
+ :param context: security context
+ :param memory_mb: Memory in MB to be claimed
+ :param disk_gb: Disk in GB to be claimed
+ :param memory_mb_limit: Memory in MB that is the maximum to allocate on
+ this node. May exceed installed physical memory if
+ oversubscription is the desired behavior.
+ :param timeout: How long, in seconds, the operation that requires
+ these resources should take to actually allocate what
+ it needs from the hypervisor. If the timeout is
+ exceeded, the resource tracker will assume the caller
+ is no longer running and release the resources.
+ :returns: A Claim object. This should be used later to
+ finalize a resource claim or free resources after the
+ compute operation is finished. Returns None if the claim
+ failed.
+ """
+
+ return self._do_begin_resource_claim(context, memory_mb, disk_gb,
+ memory_mb_limit, timeout, *args, **kwargs)
+
+ def _do_begin_resource_claim(self, context, memory_mb, disk_gb,
+ memory_mb_limit=None, timeout=None, *args, **kwargs):
+
+ if self.disabled:
+ return
+
+ if not timeout:
+ timeout = FLAGS.claim_timeout_seconds
+
+ memory_mb = abs(memory_mb)
+ disk_gb = abs(disk_gb)
+
+ msg = _("Attempting claim: memory %(memory_mb)d MB, disk %(disk_gb)d "
+ "GB, mem limit %(memory_mb_limit)s") % locals()
+ LOG.audit(msg)
+
+ if not memory_mb_limit:
+ # default to total memory:
+ memory_mb_limit = self.compute_node['memory_mb']
+
+ free_ram_mb = memory_mb_limit - self.compute_node['memory_mb_used']
+
+ # Installed memory and usage info:
+ msg = _("Total memory: %(total_mem)d MB, used: %(used_mem)d MB, free: "
+ "%(free_mem)d") % dict(
+ total_mem=self.compute_node['memory_mb'],
+ used_mem=self.compute_node['memory_mb_used'],
+ free_mem=self.compute_node['free_ram_mb'])
+ LOG.audit(msg)
+
+ # Oversubscribed memory policy info:
+ msg = _("Limit: %(memory_mb_limit)d MB, free: %(free_ram_mb)d") % \
+ locals()
+ LOG.audit(msg)
+
+ if memory_mb > free_ram_mb:
+ msg = _("Unable to claim resources. Free memory %(free_ram_mb)d "
+ "MB < requested memory %(memory_mb)d MB") % locals()
+ LOG.info(msg)
+ return None
+
+ if disk_gb > self.compute_node['free_disk_gb']:
+ msg = _("Unable to claim resources. Free disk %(free_disk_gb)d GB"
+ " < requested disk %(disk_gb)d GB") % dict(
+ free_disk_gb=self.compute_node['free_disk_gb'],
+ disk_gb=disk_gb)
+ LOG.info(msg)
+ return None
+
+ claim_id = self._get_next_id()
+ c = Claim(claim_id, memory_mb, disk_gb, timeout, *args, **kwargs)
+
+ # adjust compute node usage values and save so scheduler will see it:
+ values = c.apply_claim(self.compute_node)
+ self.compute_node = self._update(context, values)
+
+ # keep track of this claim until we know whether the compute operation
+ # was successful/completed:
+ self.claims[claim_id] = c
+ return c
+
+ def finish_resource_claim(self, claim):
+ """Indicate that the compute operation that previously claimed the
+ resources identified by 'claim' has now completed and the resources
+ have been allocated at the virt layer.
+
+ Calling this keeps the available resource data more accurate and
+ timely than letting the claim timeout elapse and waiting for
+ update_available_resource to reflect the changed usage data.
+
+ :param claim: A claim indicating a set of resources that were
+ previously claimed.
+ """
+ if self.disabled:
+ return
+
+ if self.claims.pop(claim.claim_id, None):
+ LOG.info(_("Finishing claim: %s") % claim)
+ else:
+ LOG.info(_("Can't find claim %d. It may have been 'finished' "
+ "twice, or it has already timed out."), claim.claim_id)
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def abort_resource_claim(self, context, claim):
+ """Indicate that the operation that claimed the resources identified by
+ 'claim_id' has either failed or been aborted and the resources are no
+ longer needed.
+
+ :param claim: A claim ticket indicating a set of resources that were
+ previously claimed.
+ """
+ if self.disabled:
+ return
+
+ # un-claim the resources:
+ if self.claims.pop(claim.claim_id, None):
+ LOG.info(_("Aborting claim: %s") % claim)
+ values = claim.undo_claim(self.compute_node)
+ self.compute_node = self._update(context, values)
+ else:
+ # can't find the claim. this may mean the claim already timed
+ # out or it was already explicitly finished/aborted.
+ LOG.info(_("Claim %d not found. It either timed out or was "
+ "already explicitly finished/aborted"), claim.claim_id)
+
+ @property
+ def disabled(self):
+ return self.compute_node is None
+
+ def free_resources(self, context):
+ """A compute operation finished freeing up resources. Update compute
+ model to reflect updated resource usage.
+
+ (The hypervisor may not immediately 'GC' all resources, so ask directly
+ to see what's available to update the compute node model.)
+ """
+ self.update_available_resource(context.elevated())
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def update_available_resource(self, context):
+ """Override in-memory calculations of compute node resource usage based
+ on data audited from the hypervisor layer.
+
+ Add in resource claims in progress to account for operations that have
+ declared a need for resources, but not necessarily retrieved them from
+ the hypervisor layer yet.
+ """
+ # ask hypervisor for its view of resource availability &
+ # usage:
+ resources = self.driver.get_available_resource()
+ if not resources:
+ # The virt driver does not support this function
+ LOG.warn(_("Virt driver does not support "
+ "'get_available_resource' Compute tracking is disabled."))
+ self.compute_node = None
+ self.claims = {}
+ return
+
+ # Confirm resources dictionary contains expected keys:
+ self._verify_resources(resources)
+
+ resources['free_ram_mb'] = resources['memory_mb'] - \
+ resources['memory_mb_used']
+ resources['free_disk_gb'] = resources['local_gb'] - \
+ resources['local_gb_used']
+
+ LOG.audit(_("free_ram_mb: %s") % resources['free_ram_mb'])
+ LOG.audit(_("free_disk_gb: %s") % resources['free_disk_gb'])
+ # Apply resource claims representing in-progress operations to
+ # 'resources'. This may over-estimate the amount of resources in use,
+ # at least until the next time 'update_available_resource' runs.
+ self._apply_claims(resources)
+
+ # also generate all load stats:
+ values = self._create_load_stats(context)
+ resources.update(values)
+
+ if not self.compute_node:
+ # we need a copy of the ComputeNode record:
+ service = self._get_service(context)
+ if not service:
+ # no service record, disable resource tracking
+ return
+
+ compute_node_ref = service['compute_node']
+ if compute_node_ref:
+ self.compute_node = compute_node_ref[0]
+
+ if not self.compute_node:
+ # Need to create the ComputeNode record:
+ resources['service_id'] = service['id']
+ self.compute_node = self._create(context, resources)
+ LOG.info(_('Compute_service record created for %s ') % self.host)
+
+ else:
+ # just update the record:
+ self.compute_node = self._update(context, resources,
+ prune_stats=True)
+ LOG.info(_('Compute_service record updated for %s ') % self.host)
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def update_load_stats_for_instance(self, context, old_ref, instance_ref):
+ """Update workload stats for the local compute host."""
+
+ if self.disabled:
+ return
+
+ values = {}
+ self.stats.update_stats_for_instance(old_ref, instance_ref)
+ values['stats'] = self.stats
+
+ values['current_workload'] = self.stats.calculate_workload()
+ values['running_vms'] = self.stats.num_instances
+ values['vcpus_used'] = self.stats.num_vcpus_used
+
+ self.compute_node = self._update(context.elevated(), values)
+
+ def _apply_claims(self, resources):
+ """Apply in-progress resource claims to the 'resources' dict from the
+ virt layer
+ """
+ for claim_id in self.claims.keys():
+ c = self.claims[claim_id]
+ if c.is_expired():
+ # if this claim is expired, just expunge it
+ LOG.info(_("Expiring resource claim %d"), claim_id)
+ self.claims.pop(claim_id)
+ else:
+ values = c.apply_claim(resources)
+ resources.update(values)
+
+ def _create(self, context, values):
+ """Create the compute node in the DB"""
+ # initialize load stats from existing instances:
+ compute_node = db.compute_node_create(context, values)
+ return compute_node
+
+ def _create_load_stats(self, context, instance=None):
+ """For each existing instance generate load stats for the compute
+ node record.
+ """
+ values = {}
+
+ if instance:
+ instances = [instance]
+ else:
+ self.stats.clear() # re-generating all, so clear old stats
+
+ # grab all instances that are not yet DELETED
+ filters = {'host': self.host, 'deleted': False}
+ instances = db.instance_get_all_by_filters(context,
+ filters)
+
+ for instance in instances:
+ self.stats.add_stats_for_instance(instance)
+
+ values['current_workload'] = self.stats.calculate_workload()
+ values['running_vms'] = self.stats.num_instances
+ values['vcpus_used'] = self.stats.num_vcpus_used
+ values['stats'] = self.stats
+ return values
+
+ def _get_next_id(self):
+ next_id = self.next_claim_id
+ self.next_claim_id += 1
+ return next_id
+
+ def _get_service(self, context):
+ try:
+ return db.service_get_all_compute_by_host(context,
+ self.host)[0]
+ except exception.NotFound:
+ LOG.warn(_("No service record for host %s"), self.host)
+
+ def _update(self, context, values, prune_stats=False):
+ """Persist the compute node updates to the DB"""
+ return db.compute_node_update(context, self.compute_node['id'],
+ values, prune_stats)
+
+ def _verify_resources(self, resources):
+ resource_keys = ["vcpus", "memory_mb", "local_gb", "cpu_info",
+ "vcpus_used", "memory_mb_used", "local_gb_used"]
+
+ missing_keys = [k for k in resource_keys if k not in resources]
+ if missing_keys:
+ reason = _("Missing keys: %s") % missing_keys
+ raise exception.InvalidInput(reason=reason)
diff --git a/nova/compute/stats.py b/nova/compute/stats.py
new file mode 100644
index 000000000..f67615028
--- /dev/null
+++ b/nova/compute/stats.py
@@ -0,0 +1,118 @@
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from nova.compute import task_states
+from nova.compute import vm_states
+from nova import db
+from nova.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class Stats(dict):
+ """Handler for updates to compute node workload stats."""
+
+ def add_stats_for_instance(self, instance):
+ self._increment("num_task_%s" % instance['task_state'])
+
+ self._increment("num_vm_%s" % instance['vm_state'])
+ self._increment("num_instances")
+
+ os_type = instance['os_type']
+ self._increment("num_os_type_%s" % os_type)
+
+ proj_id = instance['project_id']
+ self._increment("num_proj_%s" % proj_id)
+
+ x = self.get("num_vcpus_used", 0)
+ self["num_vcpus_used"] = x + instance["vcpus"]
+
+ def calculate_io_workload(self):
+ """Calculate an I/O based load by counting I/O heavy operations"""
+
+ def _get(state, state_type):
+ key = "num_%s_%s" % (state_type, state)
+ return self.get(key, 0)
+
+ num_builds = _get(vm_states.BUILDING, "vm")
+ num_migrations = _get(task_states.RESIZE_MIGRATING, "task")
+ num_rebuilds = _get(task_states.REBUILDING, "task")
+ num_resizes = _get(task_states.RESIZE_PREP, "task")
+ num_snapshots = _get(task_states.IMAGE_SNAPSHOT, "task")
+ num_backups = _get(task_states.IMAGE_BACKUP, "task")
+
+ return (num_builds + num_rebuilds + num_resizes + num_migrations +
+ num_snapshots + num_backups)
+
+ def calculate_workload(self):
+ """Calculate current load of the compute host based on
+ task states.
+ """
+ current_workload = 0
+ for k in self:
+ if k.startswith("num_task") and not k.endswith("None"):
+ current_workload += self[k]
+ return current_workload
+
+ @property
+ def num_instances(self):
+ return self.get("num_instances", 0)
+
+ def num_instances_for_project(self, project_id):
+ key = "num_proj_%s" % project_id
+ return self.get(key, 0)
+
+ def num_os_type(self, os_type):
+ key = "num_os_type_%s" % os_type
+ return self.get(key, 0)
+
+ @property
+ def num_vcpus_used(self):
+ return self.get("num_vcpus_used", 0)
+
+ def update_stats_for_instance(self, old_instance, instance):
+ """Update stats after an instance is changed."""
+
+ old_vm_state = old_instance['vm_state']
+ new_vm_state = instance['vm_state']
+
+ if old_vm_state != new_vm_state:
+ self._decrement("num_vm_%s" % old_vm_state)
+ self._increment("num_vm_%s" % new_vm_state)
+
+ if new_vm_state == vm_states.DELETED:
+ self._decrement("num_instances")
+
+ self._decrement("num_os_type_%s" % old_instance['os_type'])
+
+ self._decrement("num_proj_%s" % old_instance["project_id"])
+
+ x = self.get("num_vcpus_used", 0)
+ self["num_vcpus_used"] = x - old_instance['vcpus']
+
+ old_task_state = old_instance['task_state']
+ new_task_state = instance['task_state']
+
+ if old_task_state != new_task_state:
+ self._decrement("num_task_%s" % old_task_state)
+ self._increment("num_task_%s" % new_task_state)
+
+ def _decrement(self, key):
+ x = self.get(key, 0)
+ self[key] = x - 1
+
+ def _increment(self, key):
+ x = self.get(key, 0)
+ self[key] = x + 1