-rw-r--r--  nova/compute/manager.py  31
-rw-r--r--  nova/compute/resource_tracker.py  450
-rw-r--r--  nova/compute/stats.py  118
-rw-r--r--  nova/db/api.py  17
-rw-r--r--  nova/db/sqlalchemy/api.py  153
-rw-r--r--  nova/db/sqlalchemy/migrate_repo/versions/117_add_compute_node_stats.py  61
-rw-r--r--  nova/db/sqlalchemy/models.py  20
-rw-r--r--  nova/exception.py  4
-rw-r--r--  nova/scheduler/filters/ram_filter.py  10
-rw-r--r--  nova/scheduler/host_manager.py  30
-rw-r--r--  nova/tests/api/openstack/compute/contrib/test_hosts.py  2
-rw-r--r--  nova/tests/compute/fake_resource_tracker.py  35
-rw-r--r--  nova/tests/compute/test_compute.py  91
-rw-r--r--  nova/tests/compute/test_resource_tracker.py  492
-rw-r--r--  nova/tests/compute/test_stats.py  182
-rw-r--r--  nova/tests/scheduler/fakes.py  7
-rw-r--r--  nova/tests/scheduler/test_filter_scheduler.py  3
-rw-r--r--  nova/tests/scheduler/test_host_filters.py  18
-rw-r--r--  nova/tests/scheduler/test_host_manager.py  4
-rw-r--r--  nova/tests/test_db_api.py  134
-rw-r--r--  nova/tests/test_xenapi.py  2
-rw-r--r--  nova/virt/baremetal/driver.py  23
-rw-r--r--  nova/virt/driver.py  10
-rw-r--r--  nova/virt/fake.py  20
-rw-r--r--  nova/virt/libvirt/driver.py  25
-rw-r--r--  nova/virt/vmwareapi/driver.py  2
-rw-r--r--  nova/virt/xenapi/driver.py  27
-rw-r--r--  nova/virt/xenapi/fake.py  2
-rw-r--r--  nova/virt/xenapi/host.py  3
29 files changed, 1675 insertions, 301 deletions
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 6d499f9fb..e45c3333d 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -47,6 +47,7 @@ from nova import block_device
from nova import compute
from nova.compute import instance_types
from nova.compute import power_state
+from nova.compute import resource_tracker
from nova.compute import rpcapi as compute_rpcapi
from nova.compute import task_states
from nova.compute import utils as compute_utils
@@ -278,11 +279,16 @@ class ComputeManager(manager.SchedulerDependentManager):
super(ComputeManager, self).__init__(service_name="compute",
*args, **kwargs)
+ self.resource_tracker = resource_tracker.ResourceTracker(self.host,
+ self.driver)
+
def _instance_update(self, context, instance_uuid, **kwargs):
"""Update an instance in the database using kwargs as value."""
(old_ref, instance_ref) = self.db.instance_update_and_get_original(
context, instance_uuid, kwargs)
+ self.resource_tracker.update_load_stats_for_instance(context, old_ref,
+ instance_ref)
notifications.send_update(context, old_ref, instance_ref)
return instance_ref
@@ -523,10 +529,16 @@ class ComputeManager(manager.SchedulerDependentManager):
network_info = self._allocate_network(context, instance,
requested_networks)
try:
- block_device_info = self._prep_block_device(context, instance)
- instance = self._spawn(context, instance, image_meta,
- network_info, block_device_info,
- injected_files, admin_password)
+ memory_mb_limit = filter_properties.get('memory_mb_limit',
+ None)
+ with self.resource_tracker.instance_resource_claim(context,
+ instance, memory_mb_limit=memory_mb_limit):
+ block_device_info = self._prep_block_device(context,
+ instance)
+ instance = self._spawn(context, instance, image_meta,
+ network_info, block_device_info,
+ injected_files, admin_password)
+
except exception.InstanceNotFound:
raise # the instance got deleted during the spawn
except Exception:
@@ -920,6 +932,8 @@ class ComputeManager(manager.SchedulerDependentManager):
self.db.instance_destroy(context, instance_uuid)
system_meta = self.db.instance_system_metadata_get(context,
instance_uuid)
+ # mark resources free
+ self.resource_tracker.free_resources(context)
self._notify_about_instance_usage(context, instance, "delete.end",
system_metadata=system_meta)
@@ -2902,13 +2916,14 @@ class ComputeManager(manager.SchedulerDependentManager):
@manager.periodic_task
def update_available_resource(self, context):
- """See driver.update_available_resource()
+ """See driver.get_available_resource()
- :param context: security context
- :returns: See driver.update_available_resource()
+        Periodic process that keeps the compute host's understanding of
+ resource availability and usage in sync with the underlying hypervisor.
+ :param context: security context
"""
- self.driver.update_available_resource(context, self.host)
+ self.resource_tracker.update_available_resource(context)
def _add_instance_fault_from_exc(self, context, instance_uuid, fault,
exc_info=None):
diff --git a/nova/compute/resource_tracker.py b/nova/compute/resource_tracker.py
new file mode 100644
index 000000000..3a43390a0
--- /dev/null
+++ b/nova/compute/resource_tracker.py
@@ -0,0 +1,450 @@
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Track resources like memory and disk for a compute host. Provides the
+scheduler with useful information about availability through the ComputeNode
+model.
+"""
+
+from nova import context
+from nova import db
+from nova import exception
+from nova import flags
+from nova.openstack.common import cfg
+from nova.openstack.common import importutils
+from nova.openstack.common import log as logging
+from nova.openstack.common import timeutils
+from nova import utils
+
+resource_tracker_opts = [
+ cfg.IntOpt('claim_timeout_seconds', default=600,
+ help='How long, in seconds, before a resource claim times out'),
+ cfg.StrOpt('compute_stats_class',
+ default='nova.compute.stats.Stats',
+ help='Class that will manage stats for the local compute host')
+]
+
+FLAGS = flags.FLAGS
+FLAGS.register_opts(resource_tracker_opts)
+
+LOG = logging.getLogger(__name__)
+COMPUTE_RESOURCE_SEMAPHORE = "compute_resources"
+
+
+class Claim(object):
+ """A declaration that a compute host operation will require free resources.
+
+    This information will be used to help keep the local compute host's
+ ComputeNode model in sync to aid the scheduler in making efficient / more
+ correct decisions with respect to host selection.
+ """
+
+ def __init__(self, claim_id, memory_mb, disk_gb, timeout, *args, **kwargs):
+ self.claim_id = claim_id
+ self.memory_mb = memory_mb
+ self.disk_gb = disk_gb
+ self.timeout = timeout
+ self.expire_ts = timeutils.utcnow_ts() + timeout
+
+ def apply_claim(self, resources):
+ """Adjust the resources required from available resources.
+
+ :param resources: Should be a dictionary-like object that
+ has fields like a compute node
+ """
+ return self._apply(resources)
+
+ def undo_claim(self, resources):
+ return self._apply(resources, sign=-1)
+
+ def is_expired(self):
+ """Determine if this adjustment is old enough that we can assume it's
+ no longer needed.
+ """
+ return timeutils.utcnow_ts() > self.expire_ts
+
+ def _apply(self, resources, sign=1):
+ values = {}
+ values['memory_mb_used'] = (resources['memory_mb_used'] + sign *
+ self.memory_mb)
+ values['free_ram_mb'] = (resources['free_ram_mb'] - sign *
+ self.memory_mb)
+ values['local_gb_used'] = (resources['local_gb_used'] + sign *
+ self.disk_gb)
+ values['free_disk_gb'] = (resources['free_disk_gb'] - sign *
+ self.disk_gb)
+
+ return values
+
+ def __str__(self):
+ return "[Claim %d: %d MB memory, %d GB disk]" % (self.claim_id,
+ self.memory_mb, self.disk_gb)
+
+
+class ResourceContextManager(object):
+ def __init__(self, context, claim, tracker):
+ self.context = context
+ self.claim = claim
+ self.tracker = tracker
+
+ def __enter__(self):
+ if not self.claim and not self.tracker.disabled:
+ # insufficient resources to complete request
+ raise exception.ComputeResourcesUnavailable()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if not self.claim:
+ return
+
+ if exc_type is None:
+ self.tracker.finish_resource_claim(self.claim)
+ else:
+ self.tracker.abort_resource_claim(self.context, self.claim)
+
+
+class ResourceTracker(object):
+ """Compute helper class for keeping track of resource usage as instances
+ are built and destroyed.
+ """
+
+ def __init__(self, host, driver):
+ self.host = host
+ self.driver = driver
+ self.compute_node = None
+ self.next_claim_id = 1
+ self.claims = {}
+ self.stats = importutils.import_object(FLAGS.compute_stats_class)
+
+ def resource_claim(self, context, *args, **kwargs):
+ claim = self.begin_resource_claim(context, *args, **kwargs)
+ return ResourceContextManager(context, claim, self)
+
+ def instance_resource_claim(self, context, instance_ref, *args, **kwargs):
+ claim = self.begin_instance_resource_claim(context, instance_ref,
+ *args, **kwargs)
+ return ResourceContextManager(context, claim, self)
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def begin_instance_resource_claim(self, context, instance_ref, *args,
+ **kwargs):
+ """Method to begin a resource claim for a new instance."""
+ memory_mb = instance_ref['memory_mb']
+ disk_gb = instance_ref['root_gb'] + instance_ref['ephemeral_gb']
+
+ claim = self._do_begin_resource_claim(context, memory_mb, disk_gb,
+ *args, **kwargs)
+ if claim:
+ # also update load stats related to new instances firing up -
+ values = self._create_load_stats(context, instance_ref)
+ self.compute_node = self._update(context, values)
+ return claim
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def begin_resource_claim(self, context, memory_mb, disk_gb,
+ memory_mb_limit=None, timeout=None, *args, **kwargs):
+ """Indicate that some resources are needed for an upcoming compute
+ host operation.
+
+ This should be called any time the compute node is about to perform
+ an operation that will consume resources.
+
+        :param context: security context
+        :param memory_mb: Memory in MB to be claimed
+        :param disk_gb: Disk in GB to be claimed
+ :param memory_mb_limit: Memory in MB that is the maximum to allocate on
+ this node. May exceed installed physical memory if
+ oversubscription is the desired behavior.
+        :param timeout: How long, in seconds, the operation that requires
+                        these resources should take to actually allocate what
+                        it needs from the hypervisor. If the timeout is
+                        exceeded, the claim is assumed to be abandoned and the
+                        resources are released.
+        :returns: A Claim object. This should be turned in to finalize the
+                  resource claim or free the resources after the compute
+                  operation is finished. Returns None if the claim failed.
+ """
+
+ return self._do_begin_resource_claim(context, memory_mb, disk_gb,
+ memory_mb_limit, timeout, *args, **kwargs)
+
+ def _do_begin_resource_claim(self, context, memory_mb, disk_gb,
+ memory_mb_limit=None, timeout=None, *args, **kwargs):
+
+ if self.disabled:
+ return
+
+ if not timeout:
+ timeout = FLAGS.claim_timeout_seconds
+
+ memory_mb = abs(memory_mb)
+ disk_gb = abs(disk_gb)
+
+ msg = _("Attempting claim: memory %(memory_mb)d MB, disk %(disk_gb)d "
+ "GB, mem limit %(memory_mb_limit)s") % locals()
+ LOG.audit(msg)
+
+ if not memory_mb_limit:
+ # default to total memory:
+ memory_mb_limit = self.compute_node['memory_mb']
+
+ free_ram_mb = memory_mb_limit - self.compute_node['memory_mb_used']
+
+ # Installed memory and usage info:
+ msg = _("Total memory: %(total_mem)d MB, used: %(used_mem)d MB, free: "
+ "%(free_mem)d") % dict(
+ total_mem=self.compute_node['memory_mb'],
+ used_mem=self.compute_node['memory_mb_used'],
+                        free_mem=self.compute_node['free_ram_mb'])
+ LOG.audit(msg)
+
+ # Oversubscribed memory policy info:
+ msg = _("Limit: %(memory_mb_limit)d MB, free: %(free_ram_mb)d") % \
+ locals()
+ LOG.audit(msg)
+
+ if memory_mb > free_ram_mb:
+ msg = _("Unable to claim resources. Free memory %(free_ram_mb)d "
+ "MB < requested memory %(memory_mb)d MB") % locals()
+ LOG.info(msg)
+ return None
+
+ if disk_gb > self.compute_node['free_disk_gb']:
+ msg = _("Unable to claim resources. Free disk %(free_disk_gb)d GB"
+ " < requested disk %(disk_gb)d GB") % dict(
+ free_disk_gb=self.compute_node['free_disk_gb'],
+ disk_gb=disk_gb)
+ LOG.info(msg)
+ return None
+
+ claim_id = self._get_next_id()
+ c = Claim(claim_id, memory_mb, disk_gb, timeout, *args, **kwargs)
+
+ # adjust compute node usage values and save so scheduler will see it:
+ values = c.apply_claim(self.compute_node)
+ self.compute_node = self._update(context, values)
+
+ # keep track of this claim until we know whether the compute operation
+ # was successful/completed:
+ self.claims[claim_id] = c
+ return c
+
+ def finish_resource_claim(self, claim):
+ """Indicate that the compute operation that previously claimed the
+ resources identified by 'claim' has now completed and the resources
+ have been allocated at the virt layer.
+
+ Calling this keeps the available resource data more accurate and
+ timely than letting the claim timeout elapse and waiting for
+ update_available_resource to reflect the changed usage data.
+
+ :param claim: A claim indicating a set of resources that were
+ previously claimed.
+ """
+ if self.disabled:
+ return
+
+ if self.claims.pop(claim.claim_id, None):
+ LOG.info(_("Finishing claim: %s") % claim)
+ else:
+ LOG.info(_("Can't find claim %d. It may have been 'finished' "
+ "twice, or it has already timed out."), claim.claim_id)
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def abort_resource_claim(self, context, claim):
+ """Indicate that the operation that claimed the resources identified by
+ 'claim_id' has either failed or been aborted and the resources are no
+ longer needed.
+
+ :param claim: A claim ticket indicating a set of resources that were
+ previously claimed.
+ """
+ if self.disabled:
+ return
+
+ # un-claim the resources:
+ if self.claims.pop(claim.claim_id, None):
+ LOG.info(_("Aborting claim: %s") % claim)
+ values = claim.undo_claim(self.compute_node)
+ self.compute_node = self._update(context, values)
+ else:
+ # can't find the claim. this may mean the claim already timed
+ # out or it was already explicitly finished/aborted.
+ LOG.info(_("Claim %d not found. It either timed out or was "
+ "already explicitly finished/aborted"), claim.claim_id)
+
+ @property
+ def disabled(self):
+ return self.compute_node is None
+
+ def free_resources(self, context):
+ """A compute operation finished freeing up resources. Update compute
+ model to reflect updated resource usage.
+
+ (The hypervisor may not immediately 'GC' all resources, so ask directly
+ to see what's available to update the compute node model.)
+ """
+ self.update_available_resource(context.elevated())
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def update_available_resource(self, context):
+ """Override in-memory calculations of compute node resource usage based
+ on data audited from the hypervisor layer.
+
+ Add in resource claims in progress to account for operations that have
+ declared a need for resources, but not necessarily retrieved them from
+ the hypervisor layer yet.
+ """
+ # ask hypervisor for its view of resource availability &
+ # usage:
+ resources = self.driver.get_available_resource()
+ if not resources:
+ # The virt driver does not support this function
+ LOG.warn(_("Virt driver does not support "
+                "'get_available_resource'. Compute tracking is disabled."))
+ self.compute_node = None
+ self.claims = {}
+ return
+
+ # Confirm resources dictionary contains expected keys:
+ self._verify_resources(resources)
+
+ resources['free_ram_mb'] = resources['memory_mb'] - \
+ resources['memory_mb_used']
+ resources['free_disk_gb'] = resources['local_gb'] - \
+ resources['local_gb_used']
+
+ LOG.audit(_("free_ram_mb: %s") % resources['free_ram_mb'])
+ LOG.audit(_("free_disk_gb: %s") % resources['free_disk_gb'])
+ # Apply resource claims representing in-progress operations to
+ # 'resources'. This may over-estimate the amount of resources in use,
+ # at least until the next time 'update_available_resource' runs.
+ self._apply_claims(resources)
+
+ # also generate all load stats:
+ values = self._create_load_stats(context)
+ resources.update(values)
+
+ if not self.compute_node:
+ # we need a copy of the ComputeNode record:
+ service = self._get_service(context)
+ if not service:
+                # no service record, disable resource tracking
+ return
+
+ compute_node_ref = service['compute_node']
+ if compute_node_ref:
+ self.compute_node = compute_node_ref[0]
+
+ if not self.compute_node:
+ # Need to create the ComputeNode record:
+ resources['service_id'] = service['id']
+ self.compute_node = self._create(context, resources)
+ LOG.info(_('Compute_service record created for %s ') % self.host)
+
+ else:
+ # just update the record:
+ self.compute_node = self._update(context, resources,
+ prune_stats=True)
+ LOG.info(_('Compute_service record updated for %s ') % self.host)
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def update_load_stats_for_instance(self, context, old_ref, instance_ref):
+ """Update workload stats for the local compute host."""
+
+ if self.disabled:
+ return
+
+ values = {}
+ self.stats.update_stats_for_instance(old_ref, instance_ref)
+ values['stats'] = self.stats
+
+ values['current_workload'] = self.stats.calculate_workload()
+ values['running_vms'] = self.stats.num_instances
+ values['vcpus_used'] = self.stats.num_vcpus_used
+
+ self.compute_node = self._update(context.elevated(), values)
+
+ def _apply_claims(self, resources):
+ """Apply in-progress resource claims to the 'resources' dict from the
+ virt layer
+ """
+ for claim_id in self.claims.keys():
+ c = self.claims[claim_id]
+ if c.is_expired():
+ # if this claim is expired, just expunge it
+ LOG.info(_("Expiring resource claim %d"), claim_id)
+ self.claims.pop(claim_id)
+ else:
+ values = c.apply_claim(resources)
+ resources.update(values)
+
+ def _create(self, context, values):
+ """Create the compute node in the DB"""
+ # initialize load stats from existing instances:
+ compute_node = db.compute_node_create(context, values)
+ return compute_node
+
+ def _create_load_stats(self, context, instance=None):
+ """For each existing instance generate load stats for the compute
+ node record.
+ """
+ values = {}
+
+ if instance:
+ instances = [instance]
+ else:
+ self.stats.clear() # re-generating all, so clear old stats
+
+ # grab all instances that are not yet DELETED
+ filters = {'host': self.host, 'deleted': False}
+            instances = db.instance_get_all_by_filters(context, filters)
+
+ for instance in instances:
+ self.stats.add_stats_for_instance(instance)
+
+ values['current_workload'] = self.stats.calculate_workload()
+ values['running_vms'] = self.stats.num_instances
+ values['vcpus_used'] = self.stats.num_vcpus_used
+ values['stats'] = self.stats
+ return values
+
+ def _get_next_id(self):
+ next_id = self.next_claim_id
+ self.next_claim_id += 1
+ return next_id
+
+ def _get_service(self, context):
+ try:
+ return db.service_get_all_compute_by_host(context,
+ self.host)[0]
+ except exception.NotFound:
+ LOG.warn(_("No service record for host %s"), self.host)
+
+ def _update(self, context, values, prune_stats=False):
+ """Persist the compute node updates to the DB"""
+ return db.compute_node_update(context, self.compute_node['id'],
+ values, prune_stats)
+
+ def _verify_resources(self, resources):
+ resource_keys = ["vcpus", "memory_mb", "local_gb", "cpu_info",
+ "vcpus_used", "memory_mb_used", "local_gb_used"]
+
+ missing_keys = [k for k in resource_keys if k not in resources]
+ if missing_keys:
+ reason = _("Missing keys: %s") % missing_keys
+ raise exception.InvalidInput(reason=reason)
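As a reading aid, the sketch below reproduces the field arithmetic of Claim._apply on a plain dictionary (no database, illustrative numbers) to show how applying and then undoing a claim moves the usage and 'free' counters:

    def apply_claim(resources, memory_mb, disk_gb, sign=1):
        # same field arithmetic as Claim._apply: usage goes up, 'free' goes down
        return {
            'memory_mb_used': resources['memory_mb_used'] + sign * memory_mb,
            'free_ram_mb': resources['free_ram_mb'] - sign * memory_mb,
            'local_gb_used': resources['local_gb_used'] + sign * disk_gb,
            'free_disk_gb': resources['free_disk_gb'] - sign * disk_gb,
        }

    resources = {'memory_mb_used': 0, 'free_ram_mb': 5,
                 'local_gb_used': 0, 'free_disk_gb': 6}

    resources.update(apply_claim(resources, memory_mb=3, disk_gb=2))
    assert resources['free_ram_mb'] == 2 and resources['free_disk_gb'] == 4

    # an aborted claim is undone with sign=-1 (undo_claim in the patch):
    resources.update(apply_claim(resources, memory_mb=3, disk_gb=2, sign=-1))
    assert resources['memory_mb_used'] == 0 and resources['free_ram_mb'] == 5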
diff --git a/nova/compute/stats.py b/nova/compute/stats.py
new file mode 100644
index 000000000..f67615028
--- /dev/null
+++ b/nova/compute/stats.py
@@ -0,0 +1,118 @@
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from nova.compute import task_states
+from nova.compute import vm_states
+from nova import db
+from nova.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class Stats(dict):
+ """Handler for updates to compute node workload stats."""
+
+ def add_stats_for_instance(self, instance):
+ self._increment("num_task_%s" % instance['task_state'])
+
+ self._increment("num_vm_%s" % instance['vm_state'])
+ self._increment("num_instances")
+
+ os_type = instance['os_type']
+ self._increment("num_os_type_%s" % os_type)
+
+ proj_id = instance['project_id']
+ self._increment("num_proj_%s" % proj_id)
+
+ x = self.get("num_vcpus_used", 0)
+ self["num_vcpus_used"] = x + instance["vcpus"]
+
+ def calculate_io_workload(self):
+ """Calculate an I/O based load by counting I/O heavy operations"""
+
+ def _get(state, state_type):
+ key = "num_%s_%s" % (state_type, state)
+ return self.get(key, 0)
+
+ num_builds = _get(vm_states.BUILDING, "vm")
+ num_migrations = _get(task_states.RESIZE_MIGRATING, "task")
+ num_rebuilds = _get(task_states.REBUILDING, "task")
+ num_resizes = _get(task_states.RESIZE_PREP, "task")
+ num_snapshots = _get(task_states.IMAGE_SNAPSHOT, "task")
+ num_backups = _get(task_states.IMAGE_BACKUP, "task")
+
+ return (num_builds + num_rebuilds + num_resizes + num_migrations +
+ num_snapshots + num_backups)
+
+ def calculate_workload(self):
+ """Calculate current load of the compute host based on
+ task states.
+ """
+ current_workload = 0
+ for k in self:
+ if k.startswith("num_task") and not k.endswith("None"):
+ current_workload += self[k]
+ return current_workload
+
+ @property
+ def num_instances(self):
+ return self.get("num_instances", 0)
+
+ def num_instances_for_project(self, project_id):
+ key = "num_proj_%s" % project_id
+ return self.get(key, 0)
+
+ def num_os_type(self, os_type):
+ key = "num_os_type_%s" % os_type
+ return self.get(key, 0)
+
+ @property
+ def num_vcpus_used(self):
+ return self.get("num_vcpus_used", 0)
+
+ def update_stats_for_instance(self, old_instance, instance):
+ """Update stats after an instance is changed."""
+
+ old_vm_state = old_instance['vm_state']
+ new_vm_state = instance['vm_state']
+
+ if old_vm_state != new_vm_state:
+ self._decrement("num_vm_%s" % old_vm_state)
+ self._increment("num_vm_%s" % new_vm_state)
+
+ if new_vm_state == vm_states.DELETED:
+ self._decrement("num_instances")
+
+ self._decrement("num_os_type_%s" % old_instance['os_type'])
+
+ self._decrement("num_proj_%s" % old_instance["project_id"])
+
+ x = self.get("num_vcpus_used", 0)
+ self["num_vcpus_used"] = x - old_instance['vcpus']
+
+ old_task_state = old_instance['task_state']
+ new_task_state = instance['task_state']
+
+ if old_task_state != new_task_state:
+ self._decrement("num_task_%s" % old_task_state)
+ self._increment("num_task_%s" % new_task_state)
+
+ def _decrement(self, key):
+ x = self.get(key, 0)
+ self[key] = x - 1
+
+ def _increment(self, key):
+ x = self.get(key, 0)
+ self[key] = x + 1
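Stats is essentially a dict of counters keyed by task state, vm state, OS type and project. The standalone sketch below (plain strings instead of the nova state constants, names invented for illustration) shows the bookkeeping and how calculate_workload() derives the workload from the task-state counters:

    stats = {}

    def _increment(key):
        stats[key] = stats.get(key, 0) + 1

    def add_stats_for_instance(vm_state, task_state, vcpus):
        _increment("num_vm_%s" % vm_state)
        _increment("num_task_%s" % task_state)
        _increment("num_instances")
        stats["num_vcpus_used"] = stats.get("num_vcpus_used", 0) + vcpus

    add_stats_for_instance("building", "spawning", vcpus=2)
    add_stats_for_instance("active", None, vcpus=1)

    # calculate_workload(): count instances in any non-None task state
    workload = sum(v for k, v in stats.items()
                   if k.startswith("num_task") and not k.endswith("None"))
    assert workload == 1
    assert stats["num_instances"] == 2 and stats["num_vcpus_used"] == 3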
diff --git a/nova/db/api.py b/nova/db/api.py
index 94fdd8ce2..c36606a5c 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -208,31 +208,18 @@ def compute_node_create(context, values):
return IMPL.compute_node_create(context, values)
-def compute_node_update(context, compute_id, values, auto_adjust=True):
+def compute_node_update(context, compute_id, values, prune_stats=False):
"""Set the given properties on a computeNode and update it.
Raises NotFound if computeNode does not exist.
"""
- return IMPL.compute_node_update(context, compute_id, values, auto_adjust)
+ return IMPL.compute_node_update(context, compute_id, values, prune_stats)
def compute_node_get_by_host(context, host):
return IMPL.compute_node_get_by_host(context, host)
-def compute_node_utilization_update(context, host, free_ram_mb_delta=0,
- free_disk_gb_delta=0, work_delta=0, vm_delta=0):
- return IMPL.compute_node_utilization_update(context, host,
- free_ram_mb_delta, free_disk_gb_delta, work_delta,
- vm_delta)
-
-
-def compute_node_utilization_set(context, host, free_ram_mb=None,
- free_disk_gb=None, work=None, vms=None):
- return IMPL.compute_node_utilization_set(context, host, free_ram_mb,
- free_disk_gb, work, vms)
-
-
def compute_node_statistics(context):
return IMPL.compute_node_statistics(context)
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index d15b7b353..c85dfc723 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -485,6 +485,7 @@ def service_update(context, service_id, values):
def compute_node_get(context, compute_id, session=None):
result = model_query(context, models.ComputeNode, session=session).\
filter_by(id=compute_id).\
+ options(joinedload('stats')).\
first()
if not result:
@@ -497,6 +498,7 @@ def compute_node_get(context, compute_id, session=None):
def compute_node_get_all(context, session=None):
return model_query(context, models.ComputeNode, session=session).\
options(joinedload('service')).\
+ options(joinedload('stats')).\
all()
@@ -509,42 +511,27 @@ def compute_node_search_by_hypervisor(context, hypervisor_match):
all()
-def _get_host_utilization(context, host, ram_mb, disk_gb):
- """Compute the current utilization of a given host."""
- instances = instance_get_all_by_host(context, host)
- vms = len(instances)
- free_ram_mb = ram_mb - FLAGS.reserved_host_memory_mb
- free_disk_gb = disk_gb - (FLAGS.reserved_host_disk_mb * 1024)
-
- work = 0
- for instance in instances:
- free_ram_mb -= instance.memory_mb
- free_disk_gb -= instance.root_gb
- free_disk_gb -= instance.ephemeral_gb
- if instance.task_state is not None:
- work += 1
- return dict(free_ram_mb=free_ram_mb,
- free_disk_gb=free_disk_gb,
- current_workload=work,
- running_vms=vms)
-
-
-def _adjust_compute_node_values_for_utilization(context, values, session):
- service_ref = service_get(context, values['service_id'], session=session)
- host = service_ref['host']
- ram_mb = values['memory_mb']
- disk_gb = values['local_gb']
- values.update(_get_host_utilization(context, host, ram_mb, disk_gb))
+def _prep_stats_dict(values):
+ """Make list of ComputeNodeStats"""
+ stats = []
+ d = values.get('stats', {})
+ for k, v in d.iteritems():
+ stat = models.ComputeNodeStat()
+ stat['key'] = k
+ stat['value'] = v
+ stats.append(stat)
+ values['stats'] = stats
@require_admin_context
def compute_node_create(context, values, session=None):
"""Creates a new ComputeNode and populates the capacity fields
with the most recent data."""
+ _prep_stats_dict(values)
+
if not session:
session = get_session()
- _adjust_compute_node_values_for_utilization(context, values, session)
with session.begin(subtransactions=True):
compute_node_ref = models.ComputeNode()
session.add(compute_node_ref)
@@ -552,17 +539,52 @@ def compute_node_create(context, values, session=None):
return compute_node_ref
+def _update_stats(context, new_stats, compute_id, session, prune_stats=False):
+
+ existing = model_query(context, models.ComputeNodeStat, session=session,
+ read_deleted="no").filter_by(compute_node_id=compute_id).all()
+ statmap = {}
+ for stat in existing:
+ key = stat['key']
+ statmap[key] = stat
+
+ stats = []
+ for k, v in new_stats.iteritems():
+ old_stat = statmap.pop(k, None)
+ if old_stat:
+ # update existing value:
+ old_stat.update({'value': v})
+ stats.append(old_stat)
+ else:
+ # add new stat:
+ stat = models.ComputeNodeStat()
+ stat['compute_node_id'] = compute_id
+ stat['key'] = k
+ stat['value'] = v
+ stats.append(stat)
+
+ if prune_stats:
+ # prune un-touched old stats:
+ for stat in statmap.values():
+ session.add(stat)
+ stat.update({'deleted': True})
+
+ # add new and updated stats
+ for stat in stats:
+ session.add(stat)
+
+
@require_admin_context
-def compute_node_update(context, compute_id, values, auto_adjust):
- """Creates a new ComputeNode and populates the capacity fields
- with the most recent data."""
+def compute_node_update(context, compute_id, values, prune_stats=False):
+ """Updates the ComputeNode record with the most recent data"""
+ stats = values.pop('stats', {})
+
session = get_session()
- if auto_adjust:
- _adjust_compute_node_values_for_utilization(context, values, session)
with session.begin(subtransactions=True):
+ _update_stats(context, stats, compute_id, session, prune_stats)
compute_ref = compute_node_get(context, compute_id, session=session)
compute_ref.update(values)
- compute_ref.save(session=session)
+ return compute_ref
def compute_node_get_by_host(context, host):
@@ -576,71 +598,6 @@ def compute_node_get_by_host(context, host):
return node.first()
-def compute_node_utilization_update(context, host, free_ram_mb_delta=0,
- free_disk_gb_delta=0, work_delta=0, vm_delta=0):
- """Update a specific ComputeNode entry by a series of deltas.
- Do this as a single atomic action and lock the row for the
- duration of the operation. Requires that ComputeNode record exist."""
- session = get_session()
- compute_node = None
- with session.begin(subtransactions=True):
- compute_node = session.query(models.ComputeNode).\
- options(joinedload('service')).\
- filter(models.Service.host == host).\
- filter_by(deleted=False).\
- with_lockmode('update').\
- first()
- if compute_node is None:
- raise exception.NotFound(_("No ComputeNode for %(host)s") %
- locals())
-
- # This table thingy is how we get atomic UPDATE x = x + 1
- # semantics.
- table = models.ComputeNode.__table__
- if free_ram_mb_delta != 0:
- compute_node.free_ram_mb = table.c.free_ram_mb + free_ram_mb_delta
- if free_disk_gb_delta != 0:
- compute_node.free_disk_gb = (table.c.free_disk_gb +
- free_disk_gb_delta)
- if work_delta != 0:
- compute_node.current_workload = (table.c.current_workload +
- work_delta)
- if vm_delta != 0:
- compute_node.running_vms = table.c.running_vms + vm_delta
- return compute_node
-
-
-def compute_node_utilization_set(context, host, free_ram_mb=None,
- free_disk_gb=None, work=None, vms=None):
- """Like compute_node_utilization_update() modify a specific host
- entry. But this function will set the metrics absolutely
- (vs. a delta update).
- """
- session = get_session()
- compute_node = None
- with session.begin(subtransactions=True):
- compute_node = session.query(models.ComputeNode).\
- options(joinedload('service')).\
- filter(models.Service.host == host).\
- filter_by(deleted=False).\
- with_lockmode('update').\
- first()
- if compute_node is None:
- raise exception.NotFound(_("No ComputeNode for %(host)s") %
- locals())
-
- if free_ram_mb is not None:
- compute_node.free_ram_mb = free_ram_mb
- if free_disk_gb is not None:
- compute_node.free_disk_gb = free_disk_gb
- if work is not None:
- compute_node.current_workload = work
- if vms is not None:
- compute_node.running_vms = vms
-
- return compute_node
-
-
def compute_node_statistics(context):
"""Compute statistics over all compute nodes."""
result = model_query(context,
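_update_stats above is an upsert-plus-prune over the ComputeNodeStat rows: existing keys are updated, new keys are inserted, and with prune_stats the keys the tracker no longer reports are soft-deleted. A dict-based sketch of that flow (no SQLAlchemy; pruned rows are simply dropped here rather than soft-deleted):

    def update_stats(existing_rows, new_stats, prune_stats=False):
        untouched = dict(existing_rows)
        result = {}
        for key, value in new_stats.items():
            untouched.pop(key, None)      # updated or newly inserted row
            result[key] = value
        if not prune_stats:
            result.update(untouched)      # keep rows the update did not mention
        return result

    rows = {'num_instances': '2', 'num_vm_building': '1'}
    assert update_stats(rows, {'num_instances': '3'}, prune_stats=True) == \
        {'num_instances': '3'}
    assert update_stats(rows, {'num_instances': '3'}) == \
        {'num_instances': '3', 'num_vm_building': '1'}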
diff --git a/nova/db/sqlalchemy/migrate_repo/versions/117_add_compute_node_stats.py b/nova/db/sqlalchemy/migrate_repo/versions/117_add_compute_node_stats.py
new file mode 100644
index 000000000..5b0e19660
--- /dev/null
+++ b/nova/db/sqlalchemy/migrate_repo/versions/117_add_compute_node_stats.py
@@ -0,0 +1,61 @@
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import Boolean, Column, DateTime, Integer
+from sqlalchemy import Index, MetaData, String, Table
+from nova.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+def upgrade(migrate_engine):
+ meta = MetaData()
+ meta.bind = migrate_engine
+
+ # load tables for fk
+ compute_nodes = Table('compute_nodes', meta, autoload=True)
+
+ # create new table
+ compute_node_stats = Table('compute_node_stats', meta,
+ Column('created_at', DateTime(timezone=False)),
+ Column('updated_at', DateTime(timezone=False)),
+ Column('deleted_at', DateTime(timezone=False)),
+ Column('deleted', Boolean(create_constraint=True, name=None)),
+ Column('id', Integer(), primary_key=True, nullable=False,
+ autoincrement=True),
+ Column('compute_node_id', Integer, index=True, nullable=False),
+ Column('key', String(length=255, convert_unicode=True,
+ assert_unicode=None, unicode_error=None,
+ _warn_on_bytestring=False), nullable=False),
+ Column('value', String(length=255, convert_unicode=True,
+ assert_unicode=None, unicode_error=None,
+ _warn_on_bytestring=False)),
+ mysql_engine='InnoDB')
+ try:
+ compute_node_stats.create()
+ except Exception:
+ LOG.exception("Exception while creating table 'compute_node_stats'")
+ raise
+
+
+def downgrade(migrate_engine):
+ meta = MetaData()
+ meta.bind = migrate_engine
+
+ # load tables for fk
+ compute_nodes = Table('compute_nodes', meta, autoload=True)
+
+ compute_node_stats = Table('compute_node_stats', meta, autoload=True)
+ compute_node_stats.drop()
diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py
index 7d4435a7a..a37b1c215 100644
--- a/nova/db/sqlalchemy/models.py
+++ b/nova/db/sqlalchemy/models.py
@@ -144,8 +144,6 @@ class ComputeNode(BASE, NovaBase):
# Free Ram, amount of activity (resize, migration, boot, etc) and
# the number of running VM's are a good starting point for what's
# important when making scheduling decisions.
- #
- # NOTE(sandy): We'll need to make this extensible for other schedulers.
free_ram_mb = Column(Integer)
free_disk_gb = Column(Integer)
current_workload = Column(Integer)
@@ -165,6 +163,24 @@ class ComputeNode(BASE, NovaBase):
disk_available_least = Column(Integer)
+class ComputeNodeStat(BASE, NovaBase):
+ """Stats related to the current workload of a compute host that are
+ intended to aid in making scheduler decisions."""
+ __tablename__ = 'compute_node_stats'
+ id = Column(Integer, primary_key=True)
+ key = Column(String(511))
+ value = Column(String(255))
+ compute_node_id = Column(Integer, ForeignKey('compute_nodes.id'))
+
+ primary_join = ('and_(ComputeNodeStat.compute_node_id == '
+ 'ComputeNode.id, ComputeNodeStat.deleted == False)')
+ stats = relationship("ComputeNode", backref="stats",
+ primaryjoin=primary_join)
+
+ def __str__(self):
+ return "{%d: %s = %s}" % (self.compute_node_id, self.key, self.value)
+
+
class Certificate(BASE, NovaBase):
"""Represents a x509 certificate"""
__tablename__ = 'certificates'
diff --git a/nova/exception.py b/nova/exception.py
index 5d232bdd8..292f8d6ca 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -351,6 +351,10 @@ class ServiceUnavailable(Invalid):
message = _("Service is unavailable at this time.")
+class ComputeResourcesUnavailable(ServiceUnavailable):
+ message = _("Insufficient compute resources.")
+
+
class ComputeServiceUnavailable(ServiceUnavailable):
message = _("Compute service is unavailable at this time.")
diff --git a/nova/scheduler/filters/ram_filter.py b/nova/scheduler/filters/ram_filter.py
index c2bcfbf7f..6ae9c654a 100644
--- a/nova/scheduler/filters/ram_filter.py
+++ b/nova/scheduler/filters/ram_filter.py
@@ -38,12 +38,18 @@ class RamFilter(filters.BaseHostFilter):
requested_ram = instance_type['memory_mb']
free_ram_mb = host_state.free_ram_mb
total_usable_ram_mb = host_state.total_usable_ram_mb
+
+ oversubscribed_ram_limit_mb = (total_usable_ram_mb *
+ FLAGS.ram_allocation_ratio)
used_ram_mb = total_usable_ram_mb - free_ram_mb
- usable_ram = (total_usable_ram_mb * FLAGS.ram_allocation_ratio -
- used_ram_mb)
+ usable_ram = oversubscribed_ram_limit_mb - used_ram_mb
if not usable_ram >= requested_ram:
LOG.debug(_("%(host_state)s does not have %(requested_ram)s MB "
"usable ram, it only has %(usable_ram)s MB usable ram."),
locals())
return False
+
+        # save the oversubscribed ram limit so the compute host can verify
+        # memory availability on builds:
+        filter_properties['memory_mb_limit'] = oversubscribed_ram_limit_mb
return True
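A worked example of the RamFilter arithmetic above, with illustrative numbers:

    total_usable_ram_mb = 1024      # installed RAM reported for the host
    free_ram_mb = 224               # free RAM from the host state
    ram_allocation_ratio = 1.5      # illustrative oversubscription ratio
    requested_ram = 512

    oversubscribed_ram_limit_mb = total_usable_ram_mb * ram_allocation_ratio  # 1536.0
    used_ram_mb = total_usable_ram_mb - free_ram_mb                           # 800
    usable_ram = oversubscribed_ram_limit_mb - used_ram_mb                    # 736.0

    # the 512 MB request passes even though only 224 MB is physically free,
    # and the limit is handed to the compute host (via filter_properties) so
    # the resource tracker can enforce the same bound when claiming.
    assert usable_ram >= requested_ram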
diff --git a/nova/scheduler/host_manager.py b/nova/scheduler/host_manager.py
index fb683d91f..f373a2ef3 100644
--- a/nova/scheduler/host_manager.py
+++ b/nova/scheduler/host_manager.py
@@ -120,19 +120,26 @@ class HostState(object):
"""Update information about a host from its compute_node info."""
all_disk_mb = compute['local_gb'] * 1024
all_ram_mb = compute['memory_mb']
- vcpus_total = compute['vcpus']
+
+ free_disk_mb = compute['free_disk_gb'] * 1024
+ free_ram_mb = compute['free_ram_mb']
+
if FLAGS.reserved_host_disk_mb > 0:
all_disk_mb -= FLAGS.reserved_host_disk_mb
+ free_disk_mb -= FLAGS.reserved_host_disk_mb
if FLAGS.reserved_host_memory_mb > 0:
all_ram_mb -= FLAGS.reserved_host_memory_mb
+ free_ram_mb -= FLAGS.reserved_host_memory_mb
+
#NOTE(jogo) free_ram_mb can be negative
- self.free_ram_mb = all_ram_mb
+ self.free_ram_mb = free_ram_mb
self.total_usable_ram_mb = all_ram_mb
- self.free_disk_mb = all_disk_mb
- self.vcpus_total = vcpus_total
+ self.free_disk_mb = free_disk_mb
+ self.vcpus_total = compute['vcpus']
+ self.vcpus_used = compute['vcpus_used']
def consume_from_instance(self, instance):
- """Update information about a host from instance info."""
+ """Incrementally update host state from an instance"""
disk_mb = (instance['root_gb'] + instance['ephemeral_gb']) * 1024
ram_mb = instance['memory_mb']
vcpus = instance['vcpus']
@@ -252,7 +259,7 @@ class HostManager(object):
host_state_map = {}
- # Make a compute node dict with the bare essential metrics.
+ # Get resource usage across the available compute nodes:
compute_nodes = db.compute_node_get_all(context)
for compute in compute_nodes:
service = compute['service']
@@ -267,15 +274,4 @@ class HostManager(object):
host_state.update_from_compute_node(compute)
host_state_map[host] = host_state
- # "Consume" resources from the host the instance resides on.
- instances = db.instance_get_all(context,
- columns_to_join=['instance_type'])
- for instance in instances:
- host = instance['host']
- if not host:
- continue
- host_state = host_state_map.get(host, None)
- if not host_state:
- continue
- host_state.consume_from_instance(instance)
return host_state_map
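With this change the scheduler stops replaying every instance's usage on each pass and instead trusts the free_ram_mb / free_disk_gb maintained by the resource tracker, subtracting only the reserved-host headroom. A small numeric sketch with illustrative values:

    compute = {'memory_mb': 4096, 'free_ram_mb': 1024,
               'local_gb': 100, 'free_disk_gb': 40}
    reserved_host_memory_mb = 512
    reserved_host_disk_mb = 1024    # note: this flag is in MB, disk sizes in GB

    free_ram_mb = compute['free_ram_mb'] - reserved_host_memory_mb           # 512
    free_disk_mb = compute['free_disk_gb'] * 1024 - reserved_host_disk_mb    # 39936
    total_usable_ram_mb = compute['memory_mb'] - reserved_host_memory_mb     # 3584

    assert (free_ram_mb, free_disk_mb, total_usable_ram_mb) == (512, 39936, 3584)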
diff --git a/nova/tests/api/openstack/compute/contrib/test_hosts.py b/nova/tests/api/openstack/compute/contrib/test_hosts.py
index ea23cd5c2..570b13473 100644
--- a/nova/tests/api/openstack/compute/contrib/test_hosts.py
+++ b/nova/tests/api/openstack/compute/contrib/test_hosts.py
@@ -214,7 +214,7 @@ class HostTestCase(test.TestCase):
'vcpus': 16, 'memory_mb': 32, 'local_gb': 100,
'vcpus_used': 16, 'memory_mb_used': 32, 'local_gb_used': 10,
'hypervisor_type': 'qemu', 'hypervisor_version': 12003,
- 'cpu_info': ''}
+ 'cpu_info': '', 'stats': {}}
db.compute_node_create(ctxt, dic)
return db.service_get(ctxt, s_ref['id'])
diff --git a/nova/tests/compute/fake_resource_tracker.py b/nova/tests/compute/fake_resource_tracker.py
new file mode 100644
index 000000000..faba13bab
--- /dev/null
+++ b/nova/tests/compute/fake_resource_tracker.py
@@ -0,0 +1,35 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from nova.compute import resource_tracker
+
+
+class FakeResourceTracker(resource_tracker.ResourceTracker):
+ """Version without a DB requirement"""
+
+ def _create(self, context, values):
+ return values
+
+ def _update(self, context, values, prune_stats=False):
+ self.compute_node.update(values)
+ return self.compute_node
+
+ def _get_service(self, context):
+ return {
+ "id": 1,
+ "compute_node": None
+ }
diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py
index b6d775bd2..dc2d43394 100644
--- a/nova/tests/compute/test_compute.py
+++ b/nova/tests/compute/test_compute.py
@@ -53,6 +53,7 @@ import nova.policy
from nova import quota
from nova.scheduler import driver as scheduler_driver
from nova import test
+from nova.tests.compute import fake_resource_tracker
from nova.tests.db.fakes import FakeModel
from nova.tests import fake_network
from nova.tests.image import fake as fake_image
@@ -92,6 +93,13 @@ class BaseTestCase(test.TestCase):
network_manager='nova.network.manager.FlatManager')
self.compute = importutils.import_object(FLAGS.compute_manager)
+ # override tracker with a version that doesn't need the database:
+ self.compute.resource_tracker = \
+ fake_resource_tracker.FakeResourceTracker(self.compute.host,
+ self.compute.driver)
+ self.compute.update_available_resource(
+ context.get_admin_context())
+
self.user_id = 'fake'
self.project_id = 'fake'
self.context = context.RequestContext(self.user_id,
@@ -140,6 +148,7 @@ class BaseTestCase(test.TestCase):
inst['root_gb'] = 0
inst['ephemeral_gb'] = 0
inst['architecture'] = 'x86_64'
+ inst['os_type'] = 'Linux'
inst.update(params)
return db.instance_create(self.context, inst)
@@ -261,6 +270,87 @@ class ComputeTestCase(BaseTestCase):
finally:
db.instance_destroy(self.context, instance['uuid'])
+ def test_create_instance_insufficient_memory(self):
+ params = {"memory_mb": 999999999999}
+ instance = self._create_fake_instance(params)
+ self.assertRaises(exception.ComputeResourcesUnavailable,
+ self.compute.run_instance, self.context, instance=instance)
+
+ def test_create_instance_insufficient_disk(self):
+ params = {"root_gb": 999999999999,
+ "ephemeral_gb": 99999999999}
+ instance = self._create_fake_instance(params)
+ self.assertRaises(exception.ComputeResourcesUnavailable,
+ self.compute.run_instance, self.context, instance=instance)
+
+ def test_create_multiple_instances_then_starve(self):
+ params = {"memory_mb": 1024, "root_gb": 128, "ephemeral_gb": 128}
+ instance = self._create_fake_instance(params)
+ self.compute.run_instance(self.context, instance=instance)
+ self.assertEquals(1024,
+ self.compute.resource_tracker.compute_node['memory_mb_used'])
+ self.assertEquals(256,
+ self.compute.resource_tracker.compute_node['local_gb_used'])
+
+ params = {"memory_mb": 2048, "root_gb": 256, "ephemeral_gb": 256}
+ instance = self._create_fake_instance(params)
+ self.compute.run_instance(self.context, instance=instance)
+ self.assertEquals(3072,
+ self.compute.resource_tracker.compute_node['memory_mb_used'])
+ self.assertEquals(768,
+ self.compute.resource_tracker.compute_node['local_gb_used'])
+
+ params = {"memory_mb": 8192, "root_gb": 8192, "ephemeral_gb": 8192}
+ instance = self._create_fake_instance(params)
+ self.assertRaises(exception.ComputeResourcesUnavailable,
+ self.compute.run_instance, self.context, instance=instance)
+
+ def test_create_instance_with_oversubscribed_ram(self):
+ """Test passing of oversubscribed ram policy from the scheduler."""
+
+ # get total memory as reported by virt driver:
+ resources = self.compute.driver.get_available_resource()
+ total_mem_mb = resources['memory_mb']
+
+ oversub_limit_mb = total_mem_mb * 1.5
+ instance_mb = int(total_mem_mb * 1.45)
+
+ # build an instance, specifying an amount of memory that exceeds
+ # total_mem_mb, but is less than the oversubscribed limit:
+ params = {"memory_mb": instance_mb, "root_gb": 128,
+ "ephemeral_gb": 128}
+ instance = self._create_fake_instance(params)
+
+ filter_properties = dict(memory_mb_limit=oversub_limit_mb)
+ self.compute.run_instance(self.context, instance=instance,
+ filter_properties=filter_properties)
+
+ self.assertEqual(instance_mb,
+ self.compute.resource_tracker.compute_node['memory_mb_used'])
+
+ def test_create_instance_with_oversubscribed_ram_fail(self):
+ """Test passing of oversubscribed ram policy from the scheduler, but
+ with insufficient memory.
+ """
+ # get total memory as reported by virt driver:
+ resources = self.compute.driver.get_available_resource()
+ total_mem_mb = resources['memory_mb']
+
+ oversub_limit_mb = total_mem_mb * 1.5
+ instance_mb = int(total_mem_mb * 1.55)
+
+ # build an instance, specifying an amount of memory that exceeds
+ # total_mem_mb, but is less than the oversubscribed limit:
+ params = {"memory_mb": instance_mb, "root_gb": 128,
+ "ephemeral_gb": 128}
+ instance = self._create_fake_instance(params)
+
+ filter_properties = dict(memory_mb_limit=oversub_limit_mb)
+
+ self.assertRaises(exception.ComputeResourcesUnavailable,
+ self.compute.run_instance, self.context, instance=instance,
+ filter_properties=filter_properties)
+
def test_default_access_ip(self):
self.flags(default_access_ip_network_name='test1', stub_network=False)
instance = jsonutils.to_primitive(self._create_fake_instance())
@@ -3986,6 +4076,7 @@ class ComputeAPITestCase(BaseTestCase):
instance=jsonutils.to_primitive(instance))
instance = self.compute_api.get(self.context, instance['uuid'])
security_group_name = self._create_group()['name']
+
self.security_group_api.add_to_instance(self.context,
instance,
security_group_name)
diff --git a/nova/tests/compute/test_resource_tracker.py b/nova/tests/compute/test_resource_tracker.py
new file mode 100644
index 000000000..cba2a6c17
--- /dev/null
+++ b/nova/tests/compute/test_resource_tracker.py
@@ -0,0 +1,492 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Tests for compute resource tracking"""
+
+import copy
+
+from nova.compute import resource_tracker
+from nova.compute import task_states
+from nova.compute import vm_states
+from nova import db
+from nova import exception
+from nova.openstack.common import timeutils
+from nova import test
+from nova.virt import driver
+
+
+class FakeContext(object):
+ def __init__(self, is_admin=False):
+ self.is_admin = is_admin
+
+ def elevated(self):
+ return FakeContext(is_admin=True)
+
+
+class UnsupportedVirtDriver(driver.ComputeDriver):
+ """Pretend version of a lame virt driver"""
+ def get_available_resource(self):
+ # no support for getting resource usage info
+ return {}
+
+
+class FakeVirtDriver(driver.ComputeDriver):
+
+ def __init__(self):
+ self.memory_mb = 5
+ self.local_gb = 6
+ self.vcpus = 1
+
+ self.memory_mb_used = 0
+ self.local_gb_used = 0
+
+ def get_available_resource(self):
+ d = {
+ 'vcpus': self.vcpus,
+ 'memory_mb': self.memory_mb,
+ 'local_gb': self.local_gb,
+ 'vcpus_used': 0,
+ 'memory_mb_used': self.memory_mb_used,
+ 'local_gb_used': self.local_gb_used,
+ 'hypervisor_type': 'fake',
+ 'hypervisor_version': 0,
+ 'hypervisor_hostname': 'fakehost',
+ 'cpu_info': '',
+ }
+ return d
+
+
+class BaseTestCase(test.TestCase):
+
+ def setUp(self):
+ super(BaseTestCase, self).setUp()
+
+ self.context = FakeContext()
+
+ self.instance_ref = {
+ "memory_mb": 1,
+ "root_gb": 1,
+ "ephemeral_gb": 1,
+ "vm_state": vm_states.BUILDING,
+ "task_state": None,
+ "os_type": "Linux",
+ "project_id": "1234",
+ "vcpus": 1,
+ }
+
+ self.stubs.Set(db, 'instance_get_all_by_filters',
+ self._fake_instance_get_all_by_filters)
+
+ def _create_compute_node(self, values=None):
+ compute = {
+ "id": 1,
+ "service_id": 1,
+ "vcpus": 1,
+ "memory_mb": 1,
+ "local_gb": 1,
+ "vcpus_used": 1,
+ "memory_mb_used": 1,
+ "local_gb_used": 1,
+ "free_ram_mb": 1,
+ "free_disk_gb": 1,
+ "current_workload": 1,
+ "running_vms": 0,
+ "cpu_info": None,
+ "stats": [{"key": "num_instances", "value": "1"}]
+ }
+ if values:
+ compute.update(values)
+ return compute
+
+ def _create_service(self, host="fakehost", compute=None):
+ if compute:
+ compute = [compute]
+
+ service = {
+ "id": 1,
+ "host": host,
+ "binary": "nova-compute",
+ "topic": "compute",
+ "compute_node": compute,
+ }
+ return service
+
+ def _fake_instance_get_all_by_filters(self, ctx, filters, **kwargs):
+ return []
+
+ def _tracker(self, unsupported=False):
+ host = "fakehost"
+
+ if unsupported:
+ driver = UnsupportedVirtDriver()
+ else:
+ driver = FakeVirtDriver()
+
+ tracker = resource_tracker.ResourceTracker(host, driver)
+ return tracker
+
+
+class UnsupportedDriverTestCase(BaseTestCase):
+ """Resource tracking should be disabled when the virt driver doesn't
+ support it.
+ """
+ def setUp(self):
+ super(UnsupportedDriverTestCase, self).setUp()
+ self.tracker = self._tracker(unsupported=True)
+ # seed tracker with data:
+ self.tracker.update_available_resource(self.context)
+
+ def testDisabled(self):
+ # disabled = no compute node stats
+ self.assertTrue(self.tracker.disabled)
+ self.assertEqual(None, self.tracker.compute_node)
+
+ def testDisabledClaim(self):
+ # basic claim:
+ claim = self.tracker.begin_resource_claim(self.context, 1, 1)
+ self.assertEqual(None, claim)
+
+ def testDisabledContextClaim(self):
+ # basic context manager variation:
+ with self.tracker.resource_claim(self.context, 1, 1):
+ pass
+ self.assertEqual(0, len(self.tracker.claims))
+
+ def testDisabledInstanceClaim(self):
+ # instance variation:
+ claim = self.tracker.begin_instance_resource_claim(self.context,
+ self.instance_ref)
+ self.assertEqual(None, claim)
+
+ def testDisabledInstanceContextClaim(self):
+ # instance context manager variation:
+ with self.tracker.instance_resource_claim(self.context,
+ self.instance_ref):
+ pass
+ self.assertEqual(0, len(self.tracker.claims))
+
+ def testDisabledFinishClaim(self):
+ self.assertEqual(None, self.tracker.finish_resource_claim(None))
+
+ def testDisabledAbortClaim(self):
+ self.assertEqual(None, self.tracker.abort_resource_claim(self.context,
+ None))
+
+ def testDisabledFreeResources(self):
+ self.tracker.free_resources(self.context)
+ self.assertTrue(self.tracker.disabled)
+ self.assertEqual(None, self.tracker.compute_node)
+
+
+class MissingServiceTestCase(BaseTestCase):
+ def setUp(self):
+ super(MissingServiceTestCase, self).setUp()
+ self.context = FakeContext(is_admin=True)
+ self.tracker = self._tracker()
+
+ def testMissingService(self):
+ """No service record in DB."""
+ self.tracker.update_available_resource(self.context)
+ self.assertTrue(self.tracker.disabled)
+
+
+class MissingComputeNodeTestCase(BaseTestCase):
+ def setUp(self):
+ super(MissingComputeNodeTestCase, self).setUp()
+ self.tracker = self._tracker()
+
+ self.stubs.Set(db, 'service_get_all_compute_by_host',
+ self._fake_service_get_all_compute_by_host)
+ self.stubs.Set(db, 'compute_node_create',
+ self._fake_create_compute_node)
+
+ def _fake_create_compute_node(self, context, values):
+ self.created = True
+ return self._create_compute_node()
+
+ def _fake_service_get_all_compute_by_host(self, ctx, host):
+ # return a service with no joined compute
+ service = self._create_service()
+ return [service]
+
+ def testCreatedComputeNode(self):
+ self.tracker.update_available_resource(self.context)
+ self.assertTrue(self.created)
+
+ def testEnabled(self):
+ self.tracker.update_available_resource(self.context)
+ self.assertFalse(self.tracker.disabled)
+
+
+class ResourceTestCase(BaseTestCase):
+ def setUp(self):
+ super(ResourceTestCase, self).setUp()
+ self.tracker = self._tracker()
+ self.stubs.Set(db, 'service_get_all_compute_by_host',
+ self._fake_service_get_all_compute_by_host)
+ self.stubs.Set(db, 'compute_node_update',
+ self._fake_compute_node_update)
+
+ self.tracker.update_available_resource(self.context)
+
+ def _fake_service_get_all_compute_by_host(self, ctx, host):
+ self.compute = self._create_compute_node()
+ self.service = self._create_service(host, compute=self.compute)
+ return [self.service]
+
+ def _fake_compute_node_update(self, ctx, compute_node_id, values,
+ prune_stats=False):
+ self.updated = True
+ values['stats'] = [{"key": "num_instances", "value": "1"}]
+ self.compute.update(values)
+ return self.compute
+
+ def testFreeRamResourceValue(self):
+ driver = FakeVirtDriver()
+ mem_free = driver.memory_mb - driver.memory_mb_used
+ self.assertEqual(mem_free, self.tracker.compute_node['free_ram_mb'])
+
+ def testFreeDiskResourceValue(self):
+ driver = FakeVirtDriver()
+ mem_free = driver.local_gb - driver.local_gb_used
+ self.assertEqual(mem_free, self.tracker.compute_node['free_disk_gb'])
+
+ def testUpdateComputeNode(self):
+ self.assertFalse(self.tracker.disabled)
+ self.assertTrue(self.updated)
+
+ def testInsufficientMemoryClaim(self):
+ """Exceed memory limit of 5MB"""
+ claim = self.tracker.begin_resource_claim(self.context, memory_mb=2,
+ disk_gb=0)
+ self.assertNotEqual(None, claim)
+
+ claim = self.tracker.begin_resource_claim(self.context, memory_mb=3,
+ disk_gb=0)
+ self.assertNotEqual(None, claim)
+
+ claim = self.tracker.begin_resource_claim(self.context, memory_mb=1,
+ disk_gb=0)
+ self.assertEqual(None, claim)
+
+ def testInsufficientMemoryClaimWithOversubscription(self):
+ """Exceed oversubscribed memory limit of 10MB"""
+ claim = self.tracker.begin_resource_claim(self.context, memory_mb=10,
+ disk_gb=0, memory_mb_limit=10)
+ self.assertNotEqual(None, claim)
+
+ claim = self.tracker.begin_resource_claim(self.context, memory_mb=1,
+ disk_gb=0, memory_mb_limit=10)
+ self.assertEqual(None, claim)
+
+ def testInsufficientDiskClaim(self):
+ """Exceed disk limit of 5GB"""
+ claim = self.tracker.begin_resource_claim(self.context, memory_mb=0,
+ disk_gb=2)
+ self.assertNotEqual(None, claim)
+
+ claim = self.tracker.begin_resource_claim(self.context, memory_mb=0,
+ disk_gb=3)
+ self.assertNotEqual(None, claim)
+
+ claim = self.tracker.begin_resource_claim(self.context, memory_mb=0,
+ disk_gb=5)
+ self.assertEqual(None, claim)
+
+ def testClaimAndFinish(self):
+ self.assertEqual(5, self.tracker.compute_node['memory_mb'])
+ self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
+
+ self.assertEqual(6, self.tracker.compute_node['local_gb'])
+ self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
+
+ claim_mem = 3
+ claim_disk = 2
+ claim = self.tracker.begin_resource_claim(self.context, claim_mem,
+ claim_disk)
+
+ self.assertEqual(5, self.compute["memory_mb"])
+ self.assertEqual(claim_mem, self.compute["memory_mb_used"])
+ self.assertEqual(5 - claim_mem, self.compute["free_ram_mb"])
+
+ self.assertEqual(6, self.compute["local_gb"])
+ self.assertEqual(claim_disk, self.compute["local_gb_used"])
+ self.assertEqual(6 - claim_disk, self.compute["free_disk_gb"])
+
+ # 1st pretend that the compute operation finished and claimed the
+ # desired resources from the virt layer
+ driver = self.tracker.driver
+ driver.memory_mb_used = claim_mem
+ driver.local_gb_used = claim_disk
+
+ # 2nd update compute node from the virt layer. because the claim is
+        # in-progress (unfinished), the audit applies the claim on top of the
+        # hypervisor usage, so the resources are double-counted for now:
+ self.tracker.update_available_resource(self.context)
+
+ self.assertEqual(2 * claim_mem,
+ self.compute['memory_mb_used'])
+ self.assertEqual(5 - (2 * claim_mem),
+ self.compute['free_ram_mb'])
+
+ self.assertEqual(2 * claim_disk,
+ self.compute['local_gb_used'])
+ self.assertEqual(6 - (2 * claim_disk),
+ self.compute['free_disk_gb'])
+
+        # Finally, finish the claim and update from the virt layer again.
+ # Resource usage will be consistent again:
+ self.tracker.finish_resource_claim(claim)
+ self.tracker.update_available_resource(self.context)
+
+ self.assertEqual(claim_mem,
+ self.compute['memory_mb_used'])
+ self.assertEqual(5 - claim_mem,
+ self.compute['free_ram_mb'])
+
+ self.assertEqual(claim_disk,
+ self.compute['local_gb_used'])
+ self.assertEqual(6 - claim_disk,
+ self.compute['free_disk_gb'])
+
+ def testClaimAndAbort(self):
+ self.assertEqual(5, self.tracker.compute_node['memory_mb'])
+ self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
+
+ self.assertEqual(6, self.tracker.compute_node['local_gb'])
+ self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
+
+ claim_mem = 3
+ claim_disk = 2
+ claim = self.tracker.begin_resource_claim(self.context, claim_mem,
+ claim_disk)
+
+ self.assertEqual(5, self.compute["memory_mb"])
+ self.assertEqual(claim_mem, self.compute["memory_mb_used"])
+ self.assertEqual(5 - claim_mem, self.compute["free_ram_mb"])
+
+ self.assertEqual(6, self.compute["local_gb"])
+ self.assertEqual(claim_disk, self.compute["local_gb_used"])
+ self.assertEqual(6 - claim_disk, self.compute["free_disk_gb"])
+
+ self.tracker.abort_resource_claim(self.context, claim)
+
+ self.assertEqual(5, self.compute["memory_mb"])
+ self.assertEqual(0, self.compute["memory_mb_used"])
+ self.assertEqual(5, self.compute["free_ram_mb"])
+
+ self.assertEqual(6, self.compute["local_gb"])
+ self.assertEqual(0, self.compute["local_gb_used"])
+ self.assertEqual(6, self.compute["free_disk_gb"])
+
+ def testExpiredClaims(self):
+ """Test that old claims get cleaned up automatically if not finished
+ or aborted explicitly.
+ """
+ claim = self.tracker.begin_resource_claim(self.context, memory_mb=2,
+ disk_gb=2)
+ claim.expire_ts = timeutils.utcnow_ts() - 1
+ self.assertTrue(claim.is_expired())
+
+ # and an unexpired claim
+ claim2 = self.tracker.begin_resource_claim(self.context, memory_mb=1,
+ disk_gb=1)
+
+ self.assertEqual(2, len(self.tracker.claims))
+ self.assertEqual(2 + 1, self.tracker.compute_node['memory_mb_used'])
+ self.assertEqual(2 + 1, self.tracker.compute_node['local_gb_used'])
+
+ # expired claims get expunged when audit runs:
+ self.tracker.update_available_resource(self.context)
+
+ self.assertEqual(1, len(self.tracker.claims))
+ self.assertEqual(1, self.tracker.compute_node['memory_mb_used'])
+ self.assertEqual(1, self.tracker.compute_node['local_gb_used'])
+
+ # and just call finish & abort to ensure expired claims do not cause
+ # any other explosions:
+ self.tracker.abort_resource_claim(self.context, claim)
+ self.tracker.finish_resource_claim(claim)
+
+ def testInstanceClaim(self):
+ self.tracker.begin_instance_resource_claim(self.context,
+ self.instance_ref)
+ self.assertEqual(1, self.tracker.compute_node['memory_mb_used'])
+ self.assertEqual(2, self.tracker.compute_node['local_gb_used'])
+
+ def testContextClaim(self):
+ with self.tracker.resource_claim(self.context, memory_mb=1, disk_gb=1):
+ # <insert exciting things that utilize resources>
+ self.assertEqual(1, self.tracker.compute_node['memory_mb_used'])
+ self.assertEqual(1, self.tracker.compute_node['local_gb_used'])
+ self.assertEqual(1, self.compute['memory_mb_used'])
+ self.assertEqual(1, self.compute['local_gb_used'])
+
+ self.tracker.update_available_resource(self.context)
+ self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
+ self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
+ self.assertEqual(0, self.compute['memory_mb_used'])
+ self.assertEqual(0, self.compute['local_gb_used'])
+
+ def testContextClaimWithException(self):
+ try:
+ with self.tracker.resource_claim(self.context, memory_mb=1,
+ disk_gb=1):
+ # <insert exciting things that utilize resources>
+ raise Exception("THE SKY IS FALLING")
+ except Exception:
+ pass
+
+ self.tracker.update_available_resource(self.context)
+ self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
+ self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
+ self.assertEqual(0, self.compute['memory_mb_used'])
+ self.assertEqual(0, self.compute['local_gb_used'])
+
+ def testInstanceContextClaim(self):
+ with self.tracker.instance_resource_claim(self.context,
+ self.instance_ref):
+ # <insert exciting things that utilize resources>
+ self.assertEqual(1, self.tracker.compute_node['memory_mb_used'])
+ self.assertEqual(2, self.tracker.compute_node['local_gb_used'])
+ self.assertEqual(1, self.compute['memory_mb_used'])
+ self.assertEqual(2, self.compute['local_gb_used'])
+
+ self.tracker.update_available_resource(self.context)
+ self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
+ self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
+ self.assertEqual(0, self.compute['memory_mb_used'])
+ self.assertEqual(0, self.compute['local_gb_used'])
+
+ def testUpdateLoadStatsForInstance(self):
+ self.assertFalse(self.tracker.disabled)
+ self.assertEqual(0, self.tracker.compute_node['current_workload'])
+
+ old_ref = self.instance_ref
+ old_ref['task_state'] = task_states.SCHEDULING
+ with self.tracker.instance_resource_claim(self.context, old_ref):
+ pass
+
+ self.assertEqual(1, self.tracker.compute_node['current_workload'])
+
+ new_ref = copy.copy(old_ref)
+ new_ref['vm_state'] = vm_states.ACTIVE
+ new_ref['task_state'] = None
+
+ self.tracker.update_load_stats_for_instance(self.context, old_ref,
+ new_ref)
+ self.assertEqual(0, self.tracker.compute_node['current_workload'])
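Note: the claim lifecycle exercised by the tests above is begin_resource_claim (returning None on rejection) followed by finish_resource_claim or abort_resource_claim. A minimal sketch of the context-manager wrapper those tests rely on, assuming only those three calls and not the tracker's actual implementation:

    import contextlib

    @contextlib.contextmanager
    def resource_claim(tracker, context, memory_mb, disk_gb):
        # Reserve resources up front; handling of a rejected (None) claim
        # is omitted in this sketch.
        claim = tracker.begin_resource_claim(context, memory_mb, disk_gb)
        try:
            yield claim
        except Exception:
            # Any failure inside the block releases the reservation.
            tracker.abort_resource_claim(context, claim)
            raise
        else:
            tracker.finish_resource_claim(claim)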
diff --git a/nova/tests/compute/test_stats.py b/nova/tests/compute/test_stats.py
new file mode 100644
index 000000000..0e4ac666d
--- /dev/null
+++ b/nova/tests/compute/test_stats.py
@@ -0,0 +1,182 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Tests for compute node stats"""
+
+from nova.compute import stats
+from nova.compute import task_states
+from nova.compute import vm_states
+from nova import db
+from nova import exception
+from nova import test
+
+
+class StatsTestCase(test.TestCase):
+ def setUp(self):
+ super(StatsTestCase, self).setUp()
+ self.stats = stats.Stats()
+
+ def _create_instance(self, values=None):
+ instance = {
+ "os_type": "Linux",
+ "project_id": "1234",
+ "task_state": None,
+ "vm_state": vm_states.BUILDING,
+ "vcpus": 1,
+ }
+ if values:
+ instance.update(values)
+ return instance
+
+ def testOsTypeCount(self):
+ os_type = "Linux"
+ self.assertEqual(0, self.stats.num_os_type(os_type))
+ self.stats._increment("num_os_type_" + os_type)
+ self.stats._increment("num_os_type_" + os_type)
+ self.stats._increment("num_os_type_Vax")
+ self.assertEqual(2, self.stats.num_os_type(os_type))
+ self.stats["num_os_type_" + os_type] -= 1
+ self.assertEqual(1, self.stats.num_os_type(os_type))
+
+ def testUpdateProjectCount(self):
+ proj_id = "1234"
+
+ def _get():
+ return self.stats.num_instances_for_project(proj_id)
+
+ self.assertEqual(0, _get())
+ self.stats._increment("num_proj_" + proj_id)
+ self.assertEqual(1, _get())
+ self.stats["num_proj_" + proj_id] -= 1
+ self.assertEqual(0, _get())
+
+ def testInstanceCount(self):
+ self.assertEqual(0, self.stats.num_instances)
+ for i in range(5):
+ self.stats._increment("num_instances")
+ self.stats["num_instances"] -= 1
+ self.assertEqual(4, self.stats.num_instances)
+
+ def testAddStatsForInstance(self):
+ instance = {
+ "os_type": "Linux",
+ "project_id": "1234",
+ "task_state": None,
+ "vm_state": vm_states.BUILDING,
+ "vcpus": 3,
+ }
+ self.stats.add_stats_for_instance(instance)
+
+ instance = {
+ "os_type": "FreeBSD",
+ "project_id": "1234",
+ "task_state": task_states.SCHEDULING,
+ "vm_state": None,
+ "vcpus": 1,
+ }
+ self.stats.add_stats_for_instance(instance)
+
+ instance = {
+ "os_type": "Linux",
+ "project_id": "2345",
+ "task_state": task_states.SCHEDULING,
+ "vm_state": vm_states.BUILDING,
+ "vcpus": 2,
+ }
+ self.stats.add_stats_for_instance(instance)
+
+ self.assertEqual(2, self.stats.num_os_type("Linux"))
+ self.assertEqual(1, self.stats.num_os_type("FreeBSD"))
+
+ self.assertEqual(2, self.stats.num_instances_for_project("1234"))
+ self.assertEqual(1, self.stats.num_instances_for_project("2345"))
+
+ self.assertEqual(1, self.stats["num_task_None"])
+ self.assertEqual(2, self.stats["num_task_" + task_states.SCHEDULING])
+
+ self.assertEqual(1, self.stats["num_vm_None"])
+ self.assertEqual(2, self.stats["num_vm_" + vm_states.BUILDING])
+
+ self.assertEqual(6, self.stats.num_vcpus_used)
+
+ def testCalculateWorkload(self):
+ self.stats._increment("num_task_None")
+ self.stats._increment("num_task_" + task_states.SCHEDULING)
+ self.stats._increment("num_task_" + task_states.SCHEDULING)
+ self.assertEqual(2, self.stats.calculate_workload())
+
+ def testUpdateStatsForInstanceNoChange(self):
+ old = self._create_instance()
+ self.stats.add_stats_for_instance(old)
+
+ self.stats.update_stats_for_instance(old, old) # no change
+ self.assertEqual(1, self.stats.num_instances)
+ self.assertEqual(1, self.stats.num_instances_for_project("1234"))
+ self.assertEqual(1, self.stats["num_os_type_Linux"])
+ self.assertEqual(1, self.stats["num_task_None"])
+ self.assertEqual(1, self.stats["num_vm_" + vm_states.BUILDING])
+
+ def testUpdateStatsForInstanceVmChange(self):
+ old = self._create_instance()
+ self.stats.add_stats_for_instance(old)
+
+ new = self._create_instance({"vm_state": vm_states.PAUSED})
+ self.stats.update_stats_for_instance(old, new)
+ self.assertEqual(1, self.stats.num_instances)
+ self.assertEqual(1, self.stats.num_instances_for_project("1234"))
+ self.assertEqual(1, self.stats.num_os_type("Linux"))
+ self.assertEqual(0, self.stats.num_vm_state(vm_states.BUILDING))
+ self.assertEqual(1, self.stats.num_vm_state(vm_states.PAUSED))
+
+ def testUpdateStatsForInstanceTaskChange(self):
+ old = self._create_instance()
+ self.stats.add_stats_for_instance(old)
+
+ new = self._create_instance({"task_state": task_states.REBUILDING})
+ self.stats.update_stats_for_instance(old, new)
+ self.assertEqual(1, self.stats.num_instances)
+ self.assertEqual(1, self.stats.num_instances_for_project("1234"))
+ self.assertEqual(1, self.stats["num_os_type_Linux"])
+ self.assertEqual(0, self.stats["num_task_None"])
+ self.assertEqual(1, self.stats["num_task_" + task_states.REBUILDING])
+
+ def testUpdateStatsForInstanceDeleted(self):
+ old = self._create_instance()
+ self.stats.add_stats_for_instance(old)
+ self.assertEqual(1, self.stats["num_proj_1234"])
+
+ new = self._create_instance({"vm_state": vm_states.DELETED})
+ self.stats.update_stats_for_instance(old, new)
+
+ self.assertEqual(0, self.stats.num_instances)
+ self.assertEqual(0, self.stats.num_instances_for_project("1234"))
+ self.assertEqual(0, self.stats.num_os_type("Linux"))
+ self.assertEqual(0, self.stats["num_vm_" + vm_states.BUILDING])
+ self.assertEqual(0, self.stats.num_vcpus_used)
+
+ def testIoWorkload(self):
+ vms = [vm_states.ACTIVE, vm_states.BUILDING, vm_states.PAUSED]
+ tasks = [task_states.RESIZE_MIGRATING, task_states.REBUILDING,
+ task_states.RESIZE_PREP, task_states.IMAGE_SNAPSHOT,
+ task_states.IMAGE_BACKUP, task_states.RESCUING]
+
+ for state in vms:
+ self.stats._increment("num_vm_" + state)
+ for state in tasks:
+ self.stats._increment("num_task_" + state)
+
+ self.assertEqual(6, self.stats.calculate_io_workload())
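Note: the stats tests above treat Stats as a dict of integer counters keyed by strings such as "num_task_<state>" and "num_proj_<project>". A rough sketch of that pattern (an illustration of the assumed semantics, not the module's exact code):

    class Stats(dict):
        def _increment(self, key):
            # Missing counters start at zero.
            self[key] = self.get(key, 0) + 1

        @property
        def num_instances(self):
            return self.get("num_instances", 0)

        def num_instances_for_project(self, project_id):
            return self.get("num_proj_" + project_id, 0)

        def num_os_type(self, os_type):
            return self.get("num_os_type_" + os_type, 0)

        def calculate_workload(self):
            # Workload counts instances that currently have a task state.
            return sum(v for k, v in self.items()
                       if k.startswith("num_task_") and k != "num_task_None")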
diff --git a/nova/tests/scheduler/fakes.py b/nova/tests/scheduler/fakes.py
index 84424c1c7..e48e987a4 100644
--- a/nova/tests/scheduler/fakes.py
+++ b/nova/tests/scheduler/fakes.py
@@ -27,12 +27,16 @@ from nova.scheduler import host_manager
COMPUTE_NODES = [
dict(id=1, local_gb=1024, memory_mb=1024, vcpus=1,
+ free_disk_gb=512, free_ram_mb=512, vcpus_used=1,
service=dict(host='host1', disabled=False)),
dict(id=2, local_gb=2048, memory_mb=2048, vcpus=2,
+ free_disk_gb=1024, free_ram_mb=1024, vcpus_used=2,
service=dict(host='host2', disabled=True)),
dict(id=3, local_gb=4096, memory_mb=4096, vcpus=4,
+ free_disk_gb=3072, free_ram_mb=3072, vcpus_used=1,
service=dict(host='host3', disabled=False)),
dict(id=4, local_gb=8192, memory_mb=8192, vcpus=8,
+ free_disk_gb=8192, free_ram_mb=8192, vcpus_used=0,
service=dict(host='host4', disabled=False)),
# Broken entry
dict(id=5, local_gb=1024, memory_mb=1024, vcpus=1, service=None),
@@ -128,8 +132,5 @@ class FakeComputeAPI(object):
def mox_host_manager_db_calls(mock, context):
mock.StubOutWithMock(db, 'compute_node_get_all')
- mock.StubOutWithMock(db, 'instance_get_all')
db.compute_node_get_all(mox.IgnoreArg()).AndReturn(COMPUTE_NODES)
- db.instance_get_all(mox.IgnoreArg(),
- columns_to_join=['instance_type']).AndReturn(INSTANCES)
diff --git a/nova/tests/scheduler/test_filter_scheduler.py b/nova/tests/scheduler/test_filter_scheduler.py
index 64bf5415f..1cab6ebbf 100644
--- a/nova/tests/scheduler/test_filter_scheduler.py
+++ b/nova/tests/scheduler/test_filter_scheduler.py
@@ -203,7 +203,8 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
self.assertEquals(weight, -1.0)
hostinfo = host_manager.HostState('host', 'compute')
hostinfo.update_from_compute_node(dict(memory_mb=1000,
- local_gb=0, vcpus=1))
+ local_gb=0, vcpus=1, free_disk_gb=1000, free_ram_mb=1000,
+ vcpus_used=0))
self.assertEquals(1000 - 128, fn(hostinfo, {}))
def test_max_attempts(self):
diff --git a/nova/tests/scheduler/test_host_filters.py b/nova/tests/scheduler/test_host_filters.py
index d6f083576..a25e9afae 100644
--- a/nova/tests/scheduler/test_host_filters.py
+++ b/nova/tests/scheduler/test_host_filters.py
@@ -339,6 +339,24 @@ class HostFiltersTestCase(test.TestCase):
'capabilities': capabilities, 'service': service})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
+ def test_ram_filter_sets_memory_limit(self):
+ """Test that ram filter sets a filter_property denoting the memory
+ ceiling.
+ """
+ self._stub_service_is_up(True)
+ filt_cls = self.class_map['RamFilter']()
+ self.flags(ram_allocation_ratio=2.0)
+ filter_properties = {'instance_type': {'memory_mb': 1024}}
+ capabilities = {'enabled': True}
+ service = {'disabled': False}
+ host = fakes.FakeHostState('host1', 'compute',
+ {'free_ram_mb': -1024, 'total_usable_ram_mb': 2048,
+ 'capabilities': capabilities, 'service': service})
+ filt_cls.host_passes(host, filter_properties)
+
+ self.assertEqual(host.total_usable_ram_mb * 2.0,
+ filter_properties['memory_limit_mb'])
+
def test_compute_filter_fails_on_service_disabled(self):
self._stub_service_is_up(True)
filt_cls = self.class_map['ComputeFilter']()
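Note: the new RamFilter test above assumes the filter records the oversubscribed memory ceiling in filter_properties so it can be enforced later on the compute node. A hedged sketch of that behaviour, using the key and flag names that appear in the test rather than the filter's exact code:

    def ram_filter_host_passes(host_state, filter_properties,
                               ram_allocation_ratio=2.0):
        requested_ram = filter_properties['instance_type']['memory_mb']
        memory_limit = host_state.total_usable_ram_mb * ram_allocation_ratio
        used_ram = host_state.total_usable_ram_mb - host_state.free_ram_mb
        # Record the ceiling so the resource tracker can apply the same
        # oversubscription limit when the claim is made on the host.
        filter_properties['memory_limit_mb'] = memory_limit
        return used_ram + requested_ram <= memory_limit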
diff --git a/nova/tests/scheduler/test_host_manager.py b/nova/tests/scheduler/test_host_manager.py
index 45ad1021b..a2f9fc425 100644
--- a/nova/tests/scheduler/test_host_manager.py
+++ b/nova/tests/scheduler/test_host_manager.py
@@ -130,14 +130,10 @@ class HostManagerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'compute_node_get_all')
self.mox.StubOutWithMock(host_manager.LOG, 'warn')
- self.mox.StubOutWithMock(db, 'instance_get_all')
db.compute_node_get_all(context).AndReturn(fakes.COMPUTE_NODES)
# Invalid service
host_manager.LOG.warn("No service for compute ID 5")
- db.instance_get_all(context,
- columns_to_join=['instance_type']).AndReturn(
- fakes.INSTANCES)
self.mox.ReplayAll()
host_states = self.host_manager.get_all_host_states(context, topic)
diff --git a/nova/tests/test_db_api.py b/nova/tests/test_db_api.py
index 93e0edfbc..e2420b48f 100644
--- a/nova/tests/test_db_api.py
+++ b/nova/tests/test_db_api.py
@@ -887,9 +887,15 @@ class CapacityTestCase(test.TestCase):
self.compute_node_dict = dict(vcpus=2, memory_mb=1024, local_gb=2048,
vcpus_used=0, memory_mb_used=0,
- local_gb_used=0, hypervisor_type="xen",
+ local_gb_used=0, free_ram_mb=1024,
+ free_disk_gb=2048, hypervisor_type="xen",
hypervisor_version=1, cpu_info="",
+ running_vms=0, current_workload=0,
service_id=self.service.id)
+ # add some random stats
+ stats = dict(num_instances=3, num_proj_12345=2,
+ num_proj_23456=2, num_vm_building=3)
+ self.compute_node_dict['stats'] = stats
self.flags(reserved_host_memory_mb=0)
self.flags(reserved_host_disk_mb=0)
@@ -898,6 +904,13 @@ class CapacityTestCase(test.TestCase):
self.compute_node_dict['host'] = host
return db.compute_node_create(self.ctxt, self.compute_node_dict)
+ def _stats_as_dict(self, stats):
+ d = {}
+ for s in stats:
+ key = s['key']
+ d[key] = s['value']
+ return d
+
def test_compute_node_create(self):
item = self._create_helper('host1')
self.assertEquals(item.free_ram_mb, 1024)
@@ -905,70 +918,63 @@ class CapacityTestCase(test.TestCase):
self.assertEquals(item.running_vms, 0)
self.assertEquals(item.current_workload, 0)
- def test_compute_node_create_with_reservations(self):
- self.flags(reserved_host_memory_mb=256)
+ stats = self._stats_as_dict(item['stats'])
+ self.assertEqual(3, stats['num_instances'])
+ self.assertEqual(2, stats['num_proj_12345'])
+ self.assertEqual(3, stats['num_vm_building'])
+
+ def test_compute_node_get_all(self):
+ item = self._create_helper('host1')
+ nodes = db.compute_node_get_all(self.ctxt)
+ self.assertEqual(1, len(nodes))
+
+ node = nodes[0]
+ self.assertEqual(2, node['vcpus'])
+
+ stats = self._stats_as_dict(node['stats'])
+ self.assertEqual(3, int(stats['num_instances']))
+ self.assertEqual(2, int(stats['num_proj_12345']))
+ self.assertEqual(3, int(stats['num_vm_building']))
+
+ def test_compute_node_update(self):
+ item = self._create_helper('host1')
+
+ compute_node_id = item['id']
+ stats = self._stats_as_dict(item['stats'])
+
+ # change some values:
+ stats['num_instances'] = 8
+ stats['num_tribbles'] = 1
+ values = {
+ 'vcpus': 4,
+ 'stats': stats,
+ }
+ item = db.compute_node_update(self.ctxt, compute_node_id, values)
+ stats = self._stats_as_dict(item['stats'])
+
+ self.assertEqual(4, item['vcpus'])
+ self.assertEqual(8, int(stats['num_instances']))
+ self.assertEqual(2, int(stats['num_proj_12345']))
+ self.assertEqual(1, int(stats['num_tribbles']))
+
+ def test_compute_node_stat_prune(self):
item = self._create_helper('host1')
- self.assertEquals(item.free_ram_mb, 1024 - 256)
-
- def test_compute_node_set(self):
- self._create_helper('host1')
-
- x = db.compute_node_utilization_set(self.ctxt, 'host1',
- free_ram_mb=2048, free_disk_gb=4096)
- self.assertEquals(x.free_ram_mb, 2048)
- self.assertEquals(x.free_disk_gb, 4096)
- self.assertEquals(x.running_vms, 0)
- self.assertEquals(x.current_workload, 0)
-
- x = db.compute_node_utilization_set(self.ctxt, 'host1', work=3)
- self.assertEquals(x.free_ram_mb, 2048)
- self.assertEquals(x.free_disk_gb, 4096)
- self.assertEquals(x.current_workload, 3)
- self.assertEquals(x.running_vms, 0)
-
- x = db.compute_node_utilization_set(self.ctxt, 'host1', vms=5)
- self.assertEquals(x.free_ram_mb, 2048)
- self.assertEquals(x.free_disk_gb, 4096)
- self.assertEquals(x.current_workload, 3)
- self.assertEquals(x.running_vms, 5)
-
- def test_compute_node_utilization_update(self):
- self._create_helper('host1')
-
- x = db.compute_node_utilization_update(self.ctxt, 'host1',
- free_ram_mb_delta=-24)
- self.assertEquals(x.free_ram_mb, 1000)
- self.assertEquals(x.free_disk_gb, 2048)
- self.assertEquals(x.running_vms, 0)
- self.assertEquals(x.current_workload, 0)
-
- x = db.compute_node_utilization_update(self.ctxt, 'host1',
- free_disk_gb_delta=-48)
- self.assertEquals(x.free_ram_mb, 1000)
- self.assertEquals(x.free_disk_gb, 2000)
- self.assertEquals(x.running_vms, 0)
- self.assertEquals(x.current_workload, 0)
-
- x = db.compute_node_utilization_update(self.ctxt, 'host1',
- work_delta=3)
- self.assertEquals(x.free_ram_mb, 1000)
- self.assertEquals(x.free_disk_gb, 2000)
- self.assertEquals(x.current_workload, 3)
- self.assertEquals(x.running_vms, 0)
-
- x = db.compute_node_utilization_update(self.ctxt, 'host1',
- work_delta=-1)
- self.assertEquals(x.free_ram_mb, 1000)
- self.assertEquals(x.free_disk_gb, 2000)
- self.assertEquals(x.current_workload, 2)
- self.assertEquals(x.running_vms, 0)
-
- x = db.compute_node_utilization_update(self.ctxt, 'host1',
- vm_delta=5)
- self.assertEquals(x.free_ram_mb, 1000)
- self.assertEquals(x.free_disk_gb, 2000)
- self.assertEquals(x.current_workload, 2)
- self.assertEquals(x.running_vms, 5)
+ for stat in item['stats']:
+ if stat['key'] == 'num_instances':
+ num_instance_stat = stat
+ break
+
+ values = {
+ 'stats': dict(num_instances=1)
+ }
+ db.compute_node_update(self.ctxt, item['id'], values, prune_stats=True)
+ item = db.compute_node_get_all(self.ctxt)[0]
+ self.assertEqual(1, len(item['stats']))
+
+ stat = item['stats'][0]
+ self.assertEqual(num_instance_stat['id'], stat['id'])
+ self.assertEqual(num_instance_stat['key'], stat['key'])
+ self.assertEqual(1, int(stat['value']))
class TestIpAllocation(test.TestCase):
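Note: test_compute_node_stat_prune above relies on compute_node_update(..., prune_stats=True) deleting any existing stat rows whose keys are absent from the supplied stats dict; conceptually something like the following (a sketch of the assumed semantics, not the sqlalchemy layer's code):

    def _prune_stats(session, compute_node_ref, new_stats):
        # Remove stat rows that were not part of this update.
        for stat in compute_node_ref['stats']:
            if stat['key'] not in new_stats:
                session.delete(stat)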
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index c84a924e3..5ab059d07 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -1240,7 +1240,7 @@ class XenAPIHostTestCase(stubs.XenAPITestBase):
def test_host_state(self):
stats = self.conn.get_host_stats()
- self.assertEquals(stats['disk_total'], 10000)
+ self.assertEquals(stats['disk_total'], 40000)
self.assertEquals(stats['disk_used'], 20000)
self.assertEquals(stats['host_memory_total'], 10)
self.assertEquals(stats['host_memory_overhead'], 20)
diff --git a/nova/virt/baremetal/driver.py b/nova/virt/baremetal/driver.py
index 9fcdf4eb2..d92688276 100644
--- a/nova/virt/baremetal/driver.py
+++ b/nova/virt/baremetal/driver.py
@@ -659,22 +659,13 @@ class BareMetalDriver(driver.ComputeDriver):
# Bare metal doesn't currently support security groups
pass
- def update_available_resource(self, ctxt, host):
+ def get_available_resource(self):
"""Updates compute manager resource info on ComputeNode table.
This method is called when nova-coompute launches, and
whenever admin executes "nova-manage service update_resource".
-
- :param ctxt: security context
- :param host: hostname that compute manager is currently running
-
"""
- try:
- service_ref = db.service_get_all_compute_by_host(ctxt, host)[0]
- except exception.NotFound:
- raise exception.ComputeServiceUnavailable(host=host)
-
# Updating host information
dic = {'vcpus': self.get_vcpu_total(),
'memory_mb': self.get_memory_mb_total(),
@@ -685,18 +676,10 @@ class BareMetalDriver(driver.ComputeDriver):
'hypervisor_type': self.get_hypervisor_type(),
'hypervisor_version': self.get_hypervisor_version(),
'cpu_info': self.get_cpu_info(),
- 'cpu_arch': FLAGS.cpu_arch,
- 'service_id': service_ref['id']}
+ 'cpu_arch': FLAGS.cpu_arch}
- compute_node_ref = service_ref['compute_node']
LOG.info(_('#### RLK: cpu_arch = %s ') % FLAGS.cpu_arch)
- if not compute_node_ref:
- LOG.info(_('Compute_service record created for %s ') % host)
- dic['service_id'] = service_ref['id']
- db.compute_node_create(ctxt, dic)
- else:
- LOG.info(_('Compute_service record updated for %s ') % host)
- db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic)
+ return dic
def ensure_filtering_rules_for_instance(self, instance_ref, network_info):
raise NotImplementedError()
diff --git a/nova/virt/driver.py b/nova/virt/driver.py
index dd3646bc9..1d8f94e9d 100644
--- a/nova/virt/driver.py
+++ b/nova/virt/driver.py
@@ -325,15 +325,13 @@ class ComputeDriver(object):
"""Power on the specified instance"""
raise NotImplementedError()
- def update_available_resource(self, ctxt, host):
- """Updates compute manager resource info on ComputeNode table.
+ def get_available_resource(self):
+ """Retrieve resource information.
This method is called when nova-compute launches, and
- whenever admin executes "nova-manage service update_resource".
-
- :param ctxt: security context
- :param host: hostname that compute manager is currently running
+ as part of a periodic task.
+ :returns: Dictionary describing resources
"""
raise NotImplementedError()
diff --git a/nova/virt/fake.py b/nova/virt/fake.py
index dd20b0b15..db7f3a6ff 100644
--- a/nova/virt/fake.py
+++ b/nova/virt/fake.py
@@ -221,19 +221,13 @@ class FakeDriver(driver.ComputeDriver):
def refresh_provider_fw_rules(self):
pass
- def update_available_resource(self, ctxt, host):
+ def get_available_resource(self):
"""Updates compute manager resource info on ComputeNode table.
Since we don't have a real hypervisor, pretend we have lots of
disk and ram.
"""
- try:
- service_ref = db.service_get_all_compute_by_host(ctxt, host)[0]
- except exception.NotFound:
- raise exception.ComputeServiceUnavailable(host=host)
-
- # Updating host information
dic = {'vcpus': 1,
'memory_mb': 4096,
'local_gb': 1028,
@@ -242,16 +236,8 @@ class FakeDriver(driver.ComputeDriver):
'local_gb_used': 0,
'hypervisor_type': 'fake',
'hypervisor_version': '1.0',
- 'service_id': service_ref['id'],
- 'cpu_info': '?'}
-
- compute_node_ref = service_ref['compute_node']
- if not compute_node_ref:
- LOG.info(_('Compute_service record created for %s ') % host)
- db.compute_node_create(ctxt, dic)
- else:
- LOG.info(_('Compute_service record updated for %s ') % host)
- db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic)
+ 'cpu_info': '?'}
+ return dic
def ensure_filtering_rules_for_instance(self, instance_ref, network_info):
"""This method is supported only by libvirt."""
diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py
index 1e2706841..adc0c8c7b 100644
--- a/nova/virt/libvirt/driver.py
+++ b/nova/virt/libvirt/driver.py
@@ -2173,23 +2173,14 @@ class LibvirtDriver(driver.ComputeDriver):
def refresh_provider_fw_rules(self):
self.firewall_driver.refresh_provider_fw_rules()
- def update_available_resource(self, ctxt, host):
- """Updates compute manager resource info on ComputeNode table.
+ def get_available_resource(self):
+ """Retrieve resource info.
This method is called as a periodic task and is used only
in live migration currently.
- :param ctxt: security context
- :param host: hostname that compute manager is currently running
-
+ :returns: dictionary containing resource info
"""
-
- try:
- service_ref = db.service_get_all_compute_by_host(ctxt, host)[0]
- except exception.NotFound:
- raise exception.ComputeServiceUnavailable(host=host)
-
- # Updating host information
dic = {'vcpus': self.get_vcpu_total(),
'memory_mb': self.get_memory_mb_total(),
'local_gb': self.get_local_gb_total(),
@@ -2200,16 +2191,8 @@ class LibvirtDriver(driver.ComputeDriver):
'hypervisor_version': self.get_hypervisor_version(),
'hypervisor_hostname': self.get_hypervisor_hostname(),
'cpu_info': self.get_cpu_info(),
- 'service_id': service_ref['id'],
'disk_available_least': self.get_disk_available_least()}
-
- compute_node_ref = service_ref['compute_node']
- if not compute_node_ref:
- LOG.info(_('Compute_service record created for %s ') % host)
- db.compute_node_create(ctxt, dic)
- else:
- LOG.info(_('Compute_service record updated for %s ') % host)
- db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic)
+ return dic
def check_can_live_migrate_destination(self, ctxt, instance_ref,
block_migration=False,
diff --git a/nova/virt/vmwareapi/driver.py b/nova/virt/vmwareapi/driver.py
index 947bd3422..98d5dc306 100644
--- a/nova/virt/vmwareapi/driver.py
+++ b/nova/virt/vmwareapi/driver.py
@@ -196,7 +196,7 @@ class VMWareESXDriver(driver.ComputeDriver):
'username': FLAGS.vmwareapi_host_username,
'password': FLAGS.vmwareapi_host_password}
- def update_available_resource(self, ctxt, host):
+ def get_available_resource(self):
"""This method is supported only by libvirt."""
return
diff --git a/nova/virt/xenapi/driver.py b/nova/virt/xenapi/driver.py
index 3709c13af..6eb39e2b7 100644
--- a/nova/virt/xenapi/driver.py
+++ b/nova/virt/xenapi/driver.py
@@ -372,26 +372,22 @@ class XenAPIDriver(driver.ComputeDriver):
'username': FLAGS.xenapi_connection_username,
'password': FLAGS.xenapi_connection_password}
- def update_available_resource(self, ctxt, host):
- """Updates compute manager resource info on ComputeNode table.
+ def get_available_resource(self):
+ """Retrieve resource info.
This method is called when nova-compute launches, and
- whenever admin executes "nova-manage service update_resource".
+ as part of a periodic task.
- :param ctxt: security context
- :param host: hostname that compute manager is currently running
+ :returns: dictionary describing resources
"""
- try:
- service_ref = db.service_get_all_compute_by_host(ctxt, host)[0]
- except exception.NotFound:
- raise exception.ComputeServiceUnavailable(host=host)
-
host_stats = self.get_host_stats(refresh=True)
# Updating host information
total_ram_mb = host_stats['host_memory_total'] / (1024 * 1024)
- free_ram_mb = host_stats['host_memory_free'] / (1024 * 1024)
+ # NOTE(belliott) memory-free-computed is a value provided by XenServer
+ # for gauging free memory more conservatively than memory-free.
+ free_ram_mb = host_stats['host_memory_free_computed'] / (1024 * 1024)
total_disk_gb = host_stats['disk_total'] / (1024 * 1024 * 1024)
used_disk_gb = host_stats['disk_used'] / (1024 * 1024 * 1024)
@@ -404,16 +400,9 @@ class XenAPIDriver(driver.ComputeDriver):
'hypervisor_type': 'xen',
'hypervisor_version': 0,
'hypervisor_hostname': host_stats['host_hostname'],
- 'service_id': service_ref['id'],
'cpu_info': host_stats['host_cpu_info']['cpu_count']}
- compute_node_ref = service_ref['compute_node']
- if not compute_node_ref:
- LOG.info(_('Compute_service record created for %s ') % host)
- db.compute_node_create(ctxt, dic)
- else:
- LOG.info(_('Compute_service record updated for %s ') % host)
- db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic)
+ return dic
def ensure_filtering_rules_for_instance(self, instance_ref, network_info):
# NOTE(salvatore-orlando): it enforces security groups on
diff --git a/nova/virt/xenapi/fake.py b/nova/virt/xenapi/fake.py
index 1f8e400c8..7cb6b5f59 100644
--- a/nova/virt/xenapi/fake.py
+++ b/nova/virt/xenapi/fake.py
@@ -254,6 +254,7 @@ def create_local_srs():
other_config={'i18n-original-value-name_label':
'Local storage',
'i18n-key': 'local-storage'},
+ physical_size=40000,
physical_utilisation=20000,
virtual_allocation=10000,
host_ref=host_ref)
@@ -262,6 +263,7 @@ def create_local_srs():
other_config={'i18n-original-value-name_label':
'Local storage ISO',
'i18n-key': 'local-storage-iso'},
+ physical_size=80000,
physical_utilisation=40000,
virtual_allocation=80000,
host_ref=host_ref)
diff --git a/nova/virt/xenapi/host.py b/nova/virt/xenapi/host.py
index b45a9106c..9db708925 100644
--- a/nova/virt/xenapi/host.py
+++ b/nova/virt/xenapi/host.py
@@ -155,8 +155,9 @@ class HostState(object):
# No SR configured
LOG.error(_("Unable to get SR for this host: %s") % e)
return
+ self._session.call_xenapi("SR.scan", sr_ref)
sr_rec = self._session.call_xenapi("SR.get_record", sr_ref)
- total = int(sr_rec["virtual_allocation"])
+ total = int(sr_rec["physical_size"])
used = int(sr_rec["physical_utilisation"])
data["disk_total"] = total
data["disk_used"] = used