29 files changed, 1675 insertions, 301 deletions
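The heart of this change is the new ResourceTracker: the compute manager wraps instance builds in a resource claim so the scheduler-visible ComputeNode record reflects in-flight builds before the hypervisor reports them. A minimal sketch of the consumer-side pattern introduced below (names are from this patch; the surrounding setup is assumed):

    # Sketch only, not part of the patch. 'tracker', 'ctx' and 'instance'
    # are assumed to exist. instance_resource_claim() raises
    # ComputeResourcesUnavailable on __enter__ if the host cannot fit the
    # instance, finishes the claim on success, and aborts it on exception.
    with tracker.instance_resource_claim(ctx, instance):
        spawn_instance()    # hypothetical stand-in for the actual build steps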
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 6d499f9fb..e45c3333d 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -47,6 +47,7 @@ from nova import block_device
 from nova import compute
 from nova.compute import instance_types
 from nova.compute import power_state
+from nova.compute import resource_tracker
 from nova.compute import rpcapi as compute_rpcapi
 from nova.compute import task_states
 from nova.compute import utils as compute_utils
@@ -278,11 +279,16 @@ class ComputeManager(manager.SchedulerDependentManager):
         super(ComputeManager, self).__init__(service_name="compute",
                                              *args, **kwargs)
 
+        self.resource_tracker = resource_tracker.ResourceTracker(self.host,
+                self.driver)
+
     def _instance_update(self, context, instance_uuid, **kwargs):
         """Update an instance in the database using kwargs as value."""
         (old_ref, instance_ref) = self.db.instance_update_and_get_original(
                 context, instance_uuid, kwargs)
+        self.resource_tracker.update_load_stats_for_instance(context, old_ref,
+                instance_ref)
         notifications.send_update(context, old_ref, instance_ref)
 
         return instance_ref
@@ -523,10 +529,16 @@ class ComputeManager(manager.SchedulerDependentManager):
             network_info = self._allocate_network(context, instance,
                     requested_networks)
         try:
-            block_device_info = self._prep_block_device(context, instance)
-            instance = self._spawn(context, instance, image_meta,
-                    network_info, block_device_info,
-                    injected_files, admin_password)
+            memory_mb_limit = filter_properties.get('memory_mb_limit',
+                    None)
+            with self.resource_tracker.instance_resource_claim(context,
+                    instance, memory_mb_limit=memory_mb_limit):
+                block_device_info = self._prep_block_device(context,
+                        instance)
+                instance = self._spawn(context, instance, image_meta,
+                        network_info, block_device_info,
+                        injected_files, admin_password)
+
         except exception.InstanceNotFound:
             raise  # the instance got deleted during the spawn
         except Exception:
@@ -920,6 +932,8 @@ class ComputeManager(manager.SchedulerDependentManager):
         self.db.instance_destroy(context, instance_uuid)
         system_meta = self.db.instance_system_metadata_get(context,
                 instance_uuid)
+        # mark resources free
+        self.resource_tracker.free_resources(context)
         self._notify_about_instance_usage(context, instance, "delete.end",
                 system_metadata=system_meta)
@@ -2902,13 +2916,14 @@ class ComputeManager(manager.SchedulerDependentManager):
 
     @manager.periodic_task
     def update_available_resource(self, context):
-        """See driver.update_available_resource()
+        """See driver.get_available_resource()
 
-        :param context: security context
-        :returns: See driver.update_available_resource()
+        Periodic process that keeps the compute host's understanding of
+        resource availability and usage in sync with the underlying
+        hypervisor.
+
+        :param context: security context
         """
-        self.driver.update_available_resource(context, self.host)
+        self.resource_tracker.update_available_resource(context)
 
     def _add_instance_fault_from_exc(self, context, instance_uuid, fault,
                                      exc_info=None):
diff --git a/nova/compute/resource_tracker.py b/nova/compute/resource_tracker.py
new file mode 100644
index 000000000..3a43390a0
--- /dev/null
+++ b/nova/compute/resource_tracker.py
@@ -0,0 +1,450 @@
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Track resources like memory and disk for a compute host. Provides the
+scheduler with useful information about availability through the ComputeNode
+model.
+"""
+
+from nova import context
+from nova import db
+from nova import exception
+from nova import flags
+from nova.openstack.common import cfg
+from nova.openstack.common import importutils
+from nova.openstack.common import log as logging
+from nova.openstack.common import timeutils
+from nova import utils
+
+resource_tracker_opts = [
+    cfg.IntOpt('claim_timeout_seconds', default=600,
+               help='How long, in seconds, before a resource claim times out'),
+    cfg.StrOpt('compute_stats_class',
+               default='nova.compute.stats.Stats',
+               help='Class that will manage stats for the local compute host')
+]
+
+FLAGS = flags.FLAGS
+FLAGS.register_opts(resource_tracker_opts)
+
+LOG = logging.getLogger(__name__)
+COMPUTE_RESOURCE_SEMAPHORE = "compute_resources"
+
+
+class Claim(object):
+    """A declaration that a compute host operation will require free
+    resources.
+
+    This information will be used to help keep the local compute host's
+    ComputeNode model in sync to aid the scheduler in making efficient / more
+    correct decisions with respect to host selection.
+    """
+
+    def __init__(self, claim_id, memory_mb, disk_gb, timeout, *args, **kwargs):
+        self.claim_id = claim_id
+        self.memory_mb = memory_mb
+        self.disk_gb = disk_gb
+        self.timeout = timeout
+        self.expire_ts = timeutils.utcnow_ts() + timeout
+
+    def apply_claim(self, resources):
+        """Adjust the resources required from available resources.
+
+        :param resources: Should be a dictionary-like object that
+                          has fields like a compute node
+        """
+        return self._apply(resources)
+
+    def undo_claim(self, resources):
+        return self._apply(resources, sign=-1)
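Since apply_claim()/undo_claim() only compute a delta dict, a quick illustration with assumed numbers may help; the caller merges the returned values back into the compute node record:

    # Illustration only, with made-up starting values:
    resources = {'memory_mb_used': 512, 'free_ram_mb': 1536,
                 'local_gb_used': 10, 'free_disk_gb': 90}
    claim = Claim(claim_id=1, memory_mb=256, disk_gb=5, timeout=600)
    resources.update(claim.apply_claim(resources))
    # -> memory_mb_used=768, free_ram_mb=1280, local_gb_used=15, free_disk_gb=85
    resources.update(claim.undo_claim(resources))
    # -> back to the original values (sign=-1 reverses the adjustment)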
+ """ + return timeutils.utcnow_ts() > self.expire_ts + + def _apply(self, resources, sign=1): + values = {} + values['memory_mb_used'] = (resources['memory_mb_used'] + sign * + self.memory_mb) + values['free_ram_mb'] = (resources['free_ram_mb'] - sign * + self.memory_mb) + values['local_gb_used'] = (resources['local_gb_used'] + sign * + self.disk_gb) + values['free_disk_gb'] = (resources['free_disk_gb'] - sign * + self.disk_gb) + + return values + + def __str__(self): + return "[Claim %d: %d MB memory, %d GB disk]" % (self.claim_id, + self.memory_mb, self.disk_gb) + + +class ResourceContextManager(object): + def __init__(self, context, claim, tracker): + self.context = context + self.claim = claim + self.tracker = tracker + + def __enter__(self): + if not self.claim and not self.tracker.disabled: + # insufficient resources to complete request + raise exception.ComputeResourcesUnavailable() + + def __exit__(self, exc_type, exc_val, exc_tb): + if not self.claim: + return + + if exc_type is None: + self.tracker.finish_resource_claim(self.claim) + else: + self.tracker.abort_resource_claim(self.context, self.claim) + + +class ResourceTracker(object): + """Compute helper class for keeping track of resource usage as instances + are built and destroyed. + """ + + def __init__(self, host, driver): + self.host = host + self.driver = driver + self.compute_node = None + self.next_claim_id = 1 + self.claims = {} + self.stats = importutils.import_object(FLAGS.compute_stats_class) + + def resource_claim(self, context, *args, **kwargs): + claim = self.begin_resource_claim(context, *args, **kwargs) + return ResourceContextManager(context, claim, self) + + def instance_resource_claim(self, context, instance_ref, *args, **kwargs): + claim = self.begin_instance_resource_claim(context, instance_ref, + *args, **kwargs) + return ResourceContextManager(context, claim, self) + + @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) + def begin_instance_resource_claim(self, context, instance_ref, *args, + **kwargs): + """Method to begin a resource claim for a new instance.""" + memory_mb = instance_ref['memory_mb'] + disk_gb = instance_ref['root_gb'] + instance_ref['ephemeral_gb'] + + claim = self._do_begin_resource_claim(context, memory_mb, disk_gb, + *args, **kwargs) + if claim: + # also update load stats related to new instances firing up - + values = self._create_load_stats(context, instance_ref) + self.compute_node = self._update(context, values) + return claim + + @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) + def begin_resource_claim(self, context, memory_mb, disk_gb, + memory_mb_limit=None, timeout=None, *args, **kwargs): + """Indicate that some resources are needed for an upcoming compute + host operation. + + This should be called any time the compute node is about to perform + an operation that will consume resources. + + :param memory_mb: security context + :param memory_mb: Memory in MB to be claimed + :param root_gb: Disk in GB to be claimed + :param memory_mb_limit: Memory in MB that is the maximum to allocate on + this node. May exceed installed physical memory if + oversubscription is the desired behavior. + :param timeout: How long, in seconds, the operation that requires + these resources should take to actually allocate what + it needs from the hypervisor. If the timeout is + exceeded, the new resource claim will assume caller + before releasing the resources. + :returns: An integer 'claim ticket'. 
+
+        return self._do_begin_resource_claim(context, memory_mb, disk_gb,
+                memory_mb_limit, timeout, *args, **kwargs)
+
+    def _do_begin_resource_claim(self, context, memory_mb, disk_gb,
+            memory_mb_limit=None, timeout=None, *args, **kwargs):
+
+        if self.disabled:
+            return
+
+        if not timeout:
+            timeout = FLAGS.claim_timeout_seconds
+
+        memory_mb = abs(memory_mb)
+        disk_gb = abs(disk_gb)
+
+        msg = _("Attempting claim: memory %(memory_mb)d MB, disk %(disk_gb)d "
+                "GB, mem limit %(memory_mb_limit)s") % locals()
+        LOG.audit(msg)
+
+        if not memory_mb_limit:
+            # default to total memory:
+            memory_mb_limit = self.compute_node['memory_mb']
+
+        free_ram_mb = memory_mb_limit - self.compute_node['memory_mb_used']
+
+        # Installed memory and usage info:
+        msg = _("Total memory: %(total_mem)d MB, used: %(used_mem)d MB, free: "
+                "%(free_mem)d") % dict(
+                        total_mem=self.compute_node['memory_mb'],
+                        used_mem=self.compute_node['memory_mb_used'],
+                        free_mem=self.compute_node['free_ram_mb'])
+        LOG.audit(msg)
+
+        # Oversubscribed memory policy info:
+        msg = _("Limit: %(memory_mb_limit)d MB, free: %(free_ram_mb)d") % \
+                locals()
+        LOG.audit(msg)
+
+        if memory_mb > free_ram_mb:
+            msg = _("Unable to claim resources. Free memory %(free_ram_mb)d "
+                    "MB < requested memory %(memory_mb)d MB") % locals()
+            LOG.info(msg)
+            return None
+
+        if disk_gb > self.compute_node['free_disk_gb']:
+            msg = _("Unable to claim resources. Free disk %(free_disk_gb)d GB"
+                    " < requested disk %(disk_gb)d GB") % dict(
+                            free_disk_gb=self.compute_node['free_disk_gb'],
+                            disk_gb=disk_gb)
+            LOG.info(msg)
+            return None
+
+        claim_id = self._get_next_id()
+        c = Claim(claim_id, memory_mb, disk_gb, timeout, *args, **kwargs)
+
+        # adjust compute node usage values and save so scheduler will see it:
+        values = c.apply_claim(self.compute_node)
+        self.compute_node = self._update(context, values)
+
+        # keep track of this claim until we know whether the compute operation
+        # was successful/completed:
+        self.claims[claim_id] = c
+        return c
+
+    def finish_resource_claim(self, claim):
+        """Indicate that the compute operation that previously claimed the
+        resources identified by 'claim' has now completed and the resources
+        have been allocated at the virt layer.
+
+        Calling this keeps the available resource data more accurate and
+        timely than letting the claim timeout elapse and waiting for
+        update_available_resource to reflect the changed usage data.
+
+        :param claim: A claim indicating a set of resources that were
+                previously claimed.
+        """
+        if self.disabled:
+            return
+
+        if self.claims.pop(claim.claim_id, None):
+            LOG.info(_("Finishing claim: %s") % claim)
+        else:
+            LOG.info(_("Can't find claim %d. It may have been 'finished' "
+                    "twice, or it has already timed out."), claim.claim_id)
+
+    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+    def abort_resource_claim(self, context, claim):
+        """Indicate that the operation that claimed the resources identified
+        by 'claim' has either failed or been aborted and the resources are no
+        longer needed.
+
+        :param claim: A claim ticket indicating a set of resources that were
+                previously claimed.
+        """
+        if self.disabled:
+            return
+
+        # un-claim the resources:
+        if self.claims.pop(claim.claim_id, None):
+            LOG.info(_("Aborting claim: %s") % claim)
+            values = claim.undo_claim(self.compute_node)
+            self.compute_node = self._update(context, values)
+        else:
+            # can't find the claim. this may mean the claim already timed
+            # out or it was already explicitly finished/aborted.
+            LOG.info(_("Claim %d not found. It either timed out or was "
+                    "already explicitly finished/aborted"), claim.claim_id)
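Taken together, begin/finish/abort give the non-context-manager form of the claim protocol. A sketch of how a caller could drive it by hand, including an oversubscribed memory limit; everything besides the tracker methods and the exception class is assumed:

    # Hypothetical caller-side flow; do_build() is a stand-in operation.
    claim = tracker.begin_resource_claim(ctx, memory_mb=512, disk_gb=10,
                                         memory_mb_limit=1024)
    if claim is None:
        raise exception.ComputeResourcesUnavailable()
    try:
        do_build()
        tracker.finish_resource_claim(claim)   # usage is now real at virt layer
    except Exception:
        tracker.abort_resource_claim(ctx, claim)   # roll the usage back
        raise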
+ """ + if self.disabled: + return + + # un-claim the resources: + if self.claims.pop(claim.claim_id, None): + LOG.info(_("Aborting claim: %s") % claim) + values = claim.undo_claim(self.compute_node) + self.compute_node = self._update(context, values) + else: + # can't find the claim. this may mean the claim already timed + # out or it was already explicitly finished/aborted. + LOG.info(_("Claim %d not found. It either timed out or was " + "already explicitly finished/aborted"), claim.claim_id) + + @property + def disabled(self): + return self.compute_node is None + + def free_resources(self, context): + """A compute operation finished freeing up resources. Update compute + model to reflect updated resource usage. + + (The hypervisor may not immediately 'GC' all resources, so ask directly + to see what's available to update the compute node model.) + """ + self.update_available_resource(context.elevated()) + + @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) + def update_available_resource(self, context): + """Override in-memory calculations of compute node resource usage based + on data audited from the hypervisor layer. + + Add in resource claims in progress to account for operations that have + declared a need for resources, but not necessarily retrieved them from + the hypervisor layer yet. + """ + # ask hypervisor for its view of resource availability & + # usage: + resources = self.driver.get_available_resource() + if not resources: + # The virt driver does not support this function + LOG.warn(_("Virt driver does not support " + "'get_available_resource' Compute tracking is disabled.")) + self.compute_node = None + self.claims = {} + return + + # Confirm resources dictionary contains expected keys: + self._verify_resources(resources) + + resources['free_ram_mb'] = resources['memory_mb'] - \ + resources['memory_mb_used'] + resources['free_disk_gb'] = resources['local_gb'] - \ + resources['local_gb_used'] + + LOG.audit(_("free_ram_mb: %s") % resources['free_ram_mb']) + LOG.audit(_("free_disk_gb: %s") % resources['free_disk_gb']) + # Apply resource claims representing in-progress operations to + # 'resources'. This may over-estimate the amount of resources in use, + # at least until the next time 'update_available_resource' runs. 
+        self._apply_claims(resources)
+
+        # also generate all load stats:
+        values = self._create_load_stats(context)
+        resources.update(values)
+
+        if not self.compute_node:
+            # we need a copy of the ComputeNode record:
+            service = self._get_service(context)
+            if not service:
+                # no service record, disable resource tracking
+                return
+
+            compute_node_ref = service['compute_node']
+            if compute_node_ref:
+                self.compute_node = compute_node_ref[0]
+
+        if not self.compute_node:
+            # Need to create the ComputeNode record:
+            resources['service_id'] = service['id']
+            self.compute_node = self._create(context, resources)
+            LOG.info(_('Compute_service record created for %s ') % self.host)
+
+        else:
+            # just update the record:
+            self.compute_node = self._update(context, resources,
+                    prune_stats=True)
+            LOG.info(_('Compute_service record updated for %s ') % self.host)
+
+    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+    def update_load_stats_for_instance(self, context, old_ref, instance_ref):
+        """Update workload stats for the local compute host."""
+
+        if self.disabled:
+            return
+
+        values = {}
+        self.stats.update_stats_for_instance(old_ref, instance_ref)
+        values['stats'] = self.stats
+
+        values['current_workload'] = self.stats.calculate_workload()
+        values['running_vms'] = self.stats.num_instances
+        values['vcpus_used'] = self.stats.num_vcpus_used
+
+        self.compute_node = self._update(context.elevated(), values)
+
+    def _apply_claims(self, resources):
+        """Apply in-progress resource claims to the 'resources' dict from the
+        virt layer.
+        """
+        for claim_id in self.claims.keys():
+            c = self.claims[claim_id]
+            if c.is_expired():
+                # if this claim is expired, just expunge it
+                LOG.info(_("Expiring resource claim %d"), claim_id)
+                self.claims.pop(claim_id)
+            else:
+                values = c.apply_claim(resources)
+                resources.update(values)
+
+    def _create(self, context, values):
+        """Create the compute node in the DB."""
+        compute_node = db.compute_node_create(context, values)
+        return compute_node
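_create_load_stats() (next) leans on the Stats helper added in nova/compute/stats.py further down. A small sketch of what one instance contributes, using assumed field values ('building' stands in for vm_states.BUILDING):

    # Sketch with assumed values; Stats is a dict of counters (see stats.py
    # below in this change).
    s = stats.Stats()
    s.add_stats_for_instance({'task_state': None, 'vm_state': 'building',
                              'os_type': 'Linux', 'project_id': 'p1',
                              'vcpus': 2})
    # s == {'num_task_None': 1, 'num_vm_building': 1, 'num_instances': 1,
    #       'num_os_type_Linux': 1, 'num_proj_p1': 1, 'num_vcpus_used': 2}
    # calculate_workload() counts only num_task_* keys not ending in 'None',
    # so this instance adds no workload until it enters a task state.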
+ """ + values = {} + + if instance: + instances = [instance] + else: + self.stats.clear() # re-generating all, so clear old stats + + # grab all instances that are not yet DELETED + filters = {'host': self.host, 'deleted': False} + instances = db.instance_get_all_by_filters(context, + {'host': self.host}) + + for instance in instances: + self.stats.add_stats_for_instance(instance) + + values['current_workload'] = self.stats.calculate_workload() + values['running_vms'] = self.stats.num_instances + values['vcpus_used'] = self.stats.num_vcpus_used + values['stats'] = self.stats + return values + + def _get_next_id(self): + next_id = self.next_claim_id + self.next_claim_id += 1 + return next_id + + def _get_service(self, context): + try: + return db.service_get_all_compute_by_host(context, + self.host)[0] + except exception.NotFound: + LOG.warn(_("No service record for host %s"), self.host) + + def _update(self, context, values, prune_stats=False): + """Persist the compute node updates to the DB""" + return db.compute_node_update(context, self.compute_node['id'], + values, prune_stats) + + def _verify_resources(self, resources): + resource_keys = ["vcpus", "memory_mb", "local_gb", "cpu_info", + "vcpus_used", "memory_mb_used", "local_gb_used"] + + missing_keys = [k for k in resource_keys if k not in resources] + if missing_keys: + reason = _("Missing keys: %s") % missing_keys + raise exception.InvalidInput(reason=reason) diff --git a/nova/compute/stats.py b/nova/compute/stats.py new file mode 100644 index 000000000..f67615028 --- /dev/null +++ b/nova/compute/stats.py @@ -0,0 +1,118 @@ +# Copyright (c) 2012 OpenStack, LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +from nova.compute import task_states +from nova.compute import vm_states +from nova import db +from nova.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +class Stats(dict): + """Handler for updates to compute node workload stats.""" + + def add_stats_for_instance(self, instance): + self._increment("num_task_%s" % instance['task_state']) + + self._increment("num_vm_%s" % instance['vm_state']) + self._increment("num_instances") + + os_type = instance['os_type'] + self._increment("num_os_type_%s" % os_type) + + proj_id = instance['project_id'] + self._increment("num_proj_%s" % proj_id) + + x = self.get("num_vcpus_used", 0) + self["num_vcpus_used"] = x + instance["vcpus"] + + def calculate_io_workload(self): + """Calculate an I/O based load by counting I/O heavy operations""" + + def _get(state, state_type): + key = "num_%s_%s" % (state_type, state) + return self.get(key, 0) + + num_builds = _get(vm_states.BUILDING, "vm") + num_migrations = _get(task_states.RESIZE_MIGRATING, "task") + num_rebuilds = _get(task_states.REBUILDING, "task") + num_resizes = _get(task_states.RESIZE_PREP, "task") + num_snapshots = _get(task_states.IMAGE_SNAPSHOT, "task") + num_backups = _get(task_states.IMAGE_BACKUP, "task") + + return (num_builds + num_rebuilds + num_resizes + num_migrations + + num_snapshots + num_backups) + + def calculate_workload(self): + """Calculate current load of the compute host based on + task states. + """ + current_workload = 0 + for k in self: + if k.startswith("num_task") and not k.endswith("None"): + current_workload += self[k] + return current_workload + + @property + def num_instances(self): + return self.get("num_instances", 0) + + def num_instances_for_project(self, project_id): + key = "num_proj_%s" % project_id + return self.get(key, 0) + + def num_os_type(self, os_type): + key = "num_os_type_%s" % os_type + return self.get(key, 0) + + @property + def num_vcpus_used(self): + return self.get("num_vcpus_used", 0) + + def update_stats_for_instance(self, old_instance, instance): + """Update stats after an instance is changed.""" + + old_vm_state = old_instance['vm_state'] + new_vm_state = instance['vm_state'] + + if old_vm_state != new_vm_state: + self._decrement("num_vm_%s" % old_vm_state) + self._increment("num_vm_%s" % new_vm_state) + + if new_vm_state == vm_states.DELETED: + self._decrement("num_instances") + + self._decrement("num_os_type_%s" % old_instance['os_type']) + + self._decrement("num_proj_%s" % old_instance["project_id"]) + + x = self.get("num_vcpus_used", 0) + self["num_vcpus_used"] = x - old_instance['vcpus'] + + old_task_state = old_instance['task_state'] + new_task_state = instance['task_state'] + + if old_task_state != new_task_state: + self._decrement("num_task_%s" % old_task_state) + self._increment("num_task_%s" % new_task_state) + + def _decrement(self, key): + x = self.get(key, 0) + self[key] = x - 1 + + def _increment(self, key): + x = self.get(key, 0) + self[key] = x + 1 diff --git a/nova/db/api.py b/nova/db/api.py index 94fdd8ce2..c36606a5c 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -208,31 +208,18 @@ def compute_node_create(context, values): return IMPL.compute_node_create(context, values) -def compute_node_update(context, compute_id, values, auto_adjust=True): +def compute_node_update(context, compute_id, values, prune_stats=False): """Set the given properties on a computeNode and update it. Raises NotFound if computeNode does not exist. 
""" - return IMPL.compute_node_update(context, compute_id, values, auto_adjust) + return IMPL.compute_node_update(context, compute_id, values, prune_stats) def compute_node_get_by_host(context, host): return IMPL.compute_node_get_by_host(context, host) -def compute_node_utilization_update(context, host, free_ram_mb_delta=0, - free_disk_gb_delta=0, work_delta=0, vm_delta=0): - return IMPL.compute_node_utilization_update(context, host, - free_ram_mb_delta, free_disk_gb_delta, work_delta, - vm_delta) - - -def compute_node_utilization_set(context, host, free_ram_mb=None, - free_disk_gb=None, work=None, vms=None): - return IMPL.compute_node_utilization_set(context, host, free_ram_mb, - free_disk_gb, work, vms) - - def compute_node_statistics(context): return IMPL.compute_node_statistics(context) diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index d15b7b353..c85dfc723 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -485,6 +485,7 @@ def service_update(context, service_id, values): def compute_node_get(context, compute_id, session=None): result = model_query(context, models.ComputeNode, session=session).\ filter_by(id=compute_id).\ + options(joinedload('stats')).\ first() if not result: @@ -497,6 +498,7 @@ def compute_node_get(context, compute_id, session=None): def compute_node_get_all(context, session=None): return model_query(context, models.ComputeNode, session=session).\ options(joinedload('service')).\ + options(joinedload('stats')).\ all() @@ -509,42 +511,27 @@ def compute_node_search_by_hypervisor(context, hypervisor_match): all() -def _get_host_utilization(context, host, ram_mb, disk_gb): - """Compute the current utilization of a given host.""" - instances = instance_get_all_by_host(context, host) - vms = len(instances) - free_ram_mb = ram_mb - FLAGS.reserved_host_memory_mb - free_disk_gb = disk_gb - (FLAGS.reserved_host_disk_mb * 1024) - - work = 0 - for instance in instances: - free_ram_mb -= instance.memory_mb - free_disk_gb -= instance.root_gb - free_disk_gb -= instance.ephemeral_gb - if instance.task_state is not None: - work += 1 - return dict(free_ram_mb=free_ram_mb, - free_disk_gb=free_disk_gb, - current_workload=work, - running_vms=vms) - - -def _adjust_compute_node_values_for_utilization(context, values, session): - service_ref = service_get(context, values['service_id'], session=session) - host = service_ref['host'] - ram_mb = values['memory_mb'] - disk_gb = values['local_gb'] - values.update(_get_host_utilization(context, host, ram_mb, disk_gb)) +def _prep_stats_dict(values): + """Make list of ComputeNodeStats""" + stats = [] + d = values.get('stats', {}) + for k, v in d.iteritems(): + stat = models.ComputeNodeStat() + stat['key'] = k + stat['value'] = v + stats.append(stat) + values['stats'] = stats @require_admin_context def compute_node_create(context, values, session=None): """Creates a new ComputeNode and populates the capacity fields with the most recent data.""" + _prep_stats_dict(values) + if not session: session = get_session() - _adjust_compute_node_values_for_utilization(context, values, session) with session.begin(subtransactions=True): compute_node_ref = models.ComputeNode() session.add(compute_node_ref) @@ -552,17 +539,52 @@ def compute_node_create(context, values, session=None): return compute_node_ref +def _update_stats(context, new_stats, compute_id, session, prune_stats=False): + + existing = model_query(context, models.ComputeNodeStat, session=session, + 
read_deleted="no").filter_by(compute_node_id=compute_id).all() + statmap = {} + for stat in existing: + key = stat['key'] + statmap[key] = stat + + stats = [] + for k, v in new_stats.iteritems(): + old_stat = statmap.pop(k, None) + if old_stat: + # update existing value: + old_stat.update({'value': v}) + stats.append(old_stat) + else: + # add new stat: + stat = models.ComputeNodeStat() + stat['compute_node_id'] = compute_id + stat['key'] = k + stat['value'] = v + stats.append(stat) + + if prune_stats: + # prune un-touched old stats: + for stat in statmap.values(): + session.add(stat) + stat.update({'deleted': True}) + + # add new and updated stats + for stat in stats: + session.add(stat) + + @require_admin_context -def compute_node_update(context, compute_id, values, auto_adjust): - """Creates a new ComputeNode and populates the capacity fields - with the most recent data.""" +def compute_node_update(context, compute_id, values, prune_stats=False): + """Updates the ComputeNode record with the most recent data""" + stats = values.pop('stats', {}) + session = get_session() - if auto_adjust: - _adjust_compute_node_values_for_utilization(context, values, session) with session.begin(subtransactions=True): + _update_stats(context, stats, compute_id, session, prune_stats) compute_ref = compute_node_get(context, compute_id, session=session) compute_ref.update(values) - compute_ref.save(session=session) + return compute_ref def compute_node_get_by_host(context, host): @@ -576,71 +598,6 @@ def compute_node_get_by_host(context, host): return node.first() -def compute_node_utilization_update(context, host, free_ram_mb_delta=0, - free_disk_gb_delta=0, work_delta=0, vm_delta=0): - """Update a specific ComputeNode entry by a series of deltas. - Do this as a single atomic action and lock the row for the - duration of the operation. Requires that ComputeNode record exist.""" - session = get_session() - compute_node = None - with session.begin(subtransactions=True): - compute_node = session.query(models.ComputeNode).\ - options(joinedload('service')).\ - filter(models.Service.host == host).\ - filter_by(deleted=False).\ - with_lockmode('update').\ - first() - if compute_node is None: - raise exception.NotFound(_("No ComputeNode for %(host)s") % - locals()) - - # This table thingy is how we get atomic UPDATE x = x + 1 - # semantics. - table = models.ComputeNode.__table__ - if free_ram_mb_delta != 0: - compute_node.free_ram_mb = table.c.free_ram_mb + free_ram_mb_delta - if free_disk_gb_delta != 0: - compute_node.free_disk_gb = (table.c.free_disk_gb + - free_disk_gb_delta) - if work_delta != 0: - compute_node.current_workload = (table.c.current_workload + - work_delta) - if vm_delta != 0: - compute_node.running_vms = table.c.running_vms + vm_delta - return compute_node - - -def compute_node_utilization_set(context, host, free_ram_mb=None, - free_disk_gb=None, work=None, vms=None): - """Like compute_node_utilization_update() modify a specific host - entry. But this function will set the metrics absolutely - (vs. a delta update). 
- """ - session = get_session() - compute_node = None - with session.begin(subtransactions=True): - compute_node = session.query(models.ComputeNode).\ - options(joinedload('service')).\ - filter(models.Service.host == host).\ - filter_by(deleted=False).\ - with_lockmode('update').\ - first() - if compute_node is None: - raise exception.NotFound(_("No ComputeNode for %(host)s") % - locals()) - - if free_ram_mb is not None: - compute_node.free_ram_mb = free_ram_mb - if free_disk_gb is not None: - compute_node.free_disk_gb = free_disk_gb - if work is not None: - compute_node.current_workload = work - if vms is not None: - compute_node.running_vms = vms - - return compute_node - - def compute_node_statistics(context): """Compute statistics over all compute nodes.""" result = model_query(context, diff --git a/nova/db/sqlalchemy/migrate_repo/versions/117_add_compute_node_stats.py b/nova/db/sqlalchemy/migrate_repo/versions/117_add_compute_node_stats.py new file mode 100644 index 000000000..5b0e19660 --- /dev/null +++ b/nova/db/sqlalchemy/migrate_repo/versions/117_add_compute_node_stats.py @@ -0,0 +1,61 @@ +# Copyright (c) 2012 OpenStack, LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sqlalchemy import Boolean, Column, DateTime, Integer +from sqlalchemy import Index, MetaData, String, Table +from nova.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +def upgrade(migrate_engine): + meta = MetaData() + meta.bind = migrate_engine + + # load tables for fk + compute_nodes = Table('compute_nodes', meta, autoload=True) + + # create new table + compute_node_stats = Table('compute_node_stats', meta, + Column('created_at', DateTime(timezone=False)), + Column('updated_at', DateTime(timezone=False)), + Column('deleted_at', DateTime(timezone=False)), + Column('deleted', Boolean(create_constraint=True, name=None)), + Column('id', Integer(), primary_key=True, nullable=False, + autoincrement=True), + Column('compute_node_id', Integer, index=True, nullable=False), + Column('key', String(length=255, convert_unicode=True, + assert_unicode=None, unicode_error=None, + _warn_on_bytestring=False), nullable=False), + Column('value', String(length=255, convert_unicode=True, + assert_unicode=None, unicode_error=None, + _warn_on_bytestring=False)), + mysql_engine='InnoDB') + try: + compute_node_stats.create() + except Exception: + LOG.exception("Exception while creating table 'compute_node_stats'") + raise + + +def downgrade(migrate_engine): + meta = MetaData() + meta.bind = migrate_engine + + # load tables for fk + compute_nodes = Table('compute_nodes', meta, autoload=True) + + compute_node_stats = Table('compute_node_stats', meta, autoload=True) + compute_node_stats.drop() diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py index 7d4435a7a..a37b1c215 100644 --- a/nova/db/sqlalchemy/models.py +++ b/nova/db/sqlalchemy/models.py @@ -144,8 +144,6 @@ class ComputeNode(BASE, NovaBase): # Free Ram, amount of activity (resize, migration, boot, 
etc) and
 # the number of running VM's are a good starting point for what's
 # important when making scheduling decisions.
-    #
-    # NOTE(sandy): We'll need to make this extensible for other schedulers.
     free_ram_mb = Column(Integer)
     free_disk_gb = Column(Integer)
     current_workload = Column(Integer)
@@ -165,6 +163,24 @@ class ComputeNode(BASE, NovaBase):
     disk_available_least = Column(Integer)
 
 
+class ComputeNodeStat(BASE, NovaBase):
+    """Stats related to the current workload of a compute host that are
+    intended to aid in making scheduler decisions."""
+    __tablename__ = 'compute_node_stats'
+    id = Column(Integer, primary_key=True)
+    key = Column(String(511))
+    value = Column(String(255))
+    compute_node_id = Column(Integer, ForeignKey('compute_nodes.id'))
+
+    primary_join = ('and_(ComputeNodeStat.compute_node_id == '
+            'ComputeNode.id, ComputeNodeStat.deleted == False)')
+    stats = relationship("ComputeNode", backref="stats",
+            primaryjoin=primary_join)
+
+    def __str__(self):
+        return "{%d: %s = %s}" % (self.compute_node_id, self.key, self.value)
+
+
 class Certificate(BASE, NovaBase):
     """Represents a x509 certificate"""
     __tablename__ = 'certificates'
diff --git a/nova/exception.py b/nova/exception.py
index 5d232bdd8..292f8d6ca 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -351,6 +351,10 @@ class ServiceUnavailable(Invalid):
     message = _("Service is unavailable at this time.")
 
 
+class ComputeResourcesUnavailable(ServiceUnavailable):
+    message = _("Insufficient compute resources.")
+
+
 class ComputeServiceUnavailable(ServiceUnavailable):
     message = _("Compute service is unavailable at this time.")
 
diff --git a/nova/scheduler/filters/ram_filter.py b/nova/scheduler/filters/ram_filter.py
index c2bcfbf7f..6ae9c654a 100644
--- a/nova/scheduler/filters/ram_filter.py
+++ b/nova/scheduler/filters/ram_filter.py
@@ -38,12 +38,18 @@ class RamFilter(filters.BaseHostFilter):
         requested_ram = instance_type['memory_mb']
         free_ram_mb = host_state.free_ram_mb
         total_usable_ram_mb = host_state.total_usable_ram_mb
+
+        oversubscribed_ram_limit_mb = (total_usable_ram_mb *
+                                       FLAGS.ram_allocation_ratio)
         used_ram_mb = total_usable_ram_mb - free_ram_mb
-        usable_ram = (total_usable_ram_mb * FLAGS.ram_allocation_ratio -
-                      used_ram_mb)
+        usable_ram = oversubscribed_ram_limit_mb - used_ram_mb
         if not usable_ram >= requested_ram:
             LOG.debug(_("%(host_state)s does not have %(requested_ram)s MB "
                     "usable ram, it only has %(usable_ram)s MB usable ram."),
                     locals())
             return False
+
+        # save the oversubscribed ram limit so the compute host can verify
+        # memory availability on builds:
+        filter_properties['memory_mb_limit'] = oversubscribed_ram_limit_mb
         return True
diff --git a/nova/scheduler/host_manager.py b/nova/scheduler/host_manager.py
index fb683d91f..f373a2ef3 100644
--- a/nova/scheduler/host_manager.py
+++ b/nova/scheduler/host_manager.py
@@ -120,19 +120,26 @@ class HostState(object):
         """Update information about a host from its compute_node info."""
         all_disk_mb = compute['local_gb'] * 1024
         all_ram_mb = compute['memory_mb']
-        vcpus_total = compute['vcpus']
+
+        free_disk_mb = compute['free_disk_gb'] * 1024
+        free_ram_mb = compute['free_ram_mb']
+
         if FLAGS.reserved_host_disk_mb > 0:
             all_disk_mb -= FLAGS.reserved_host_disk_mb
+            free_disk_mb -= FLAGS.reserved_host_disk_mb
         if FLAGS.reserved_host_memory_mb > 0:
             all_ram_mb -= FLAGS.reserved_host_memory_mb
+            free_ram_mb -= FLAGS.reserved_host_memory_mb
+
         #NOTE(jogo) free_ram_mb can be negative
-        self.free_ram_mb = all_ram_mb
+        self.free_ram_mb = free_ram_mb
         self.total_usable_ram_mb =
all_ram_mb - self.free_disk_mb = all_disk_mb - self.vcpus_total = vcpus_total + self.free_disk_mb = free_disk_mb + self.vcpus_total = compute['vcpus'] + self.vcpus_used = compute['vcpus_used'] def consume_from_instance(self, instance): - """Update information about a host from instance info.""" + """Incrementally update host state from an instance""" disk_mb = (instance['root_gb'] + instance['ephemeral_gb']) * 1024 ram_mb = instance['memory_mb'] vcpus = instance['vcpus'] @@ -252,7 +259,7 @@ class HostManager(object): host_state_map = {} - # Make a compute node dict with the bare essential metrics. + # Get resource usage across the available compute nodes: compute_nodes = db.compute_node_get_all(context) for compute in compute_nodes: service = compute['service'] @@ -267,15 +274,4 @@ class HostManager(object): host_state.update_from_compute_node(compute) host_state_map[host] = host_state - # "Consume" resources from the host the instance resides on. - instances = db.instance_get_all(context, - columns_to_join=['instance_type']) - for instance in instances: - host = instance['host'] - if not host: - continue - host_state = host_state_map.get(host, None) - if not host_state: - continue - host_state.consume_from_instance(instance) return host_state_map diff --git a/nova/tests/api/openstack/compute/contrib/test_hosts.py b/nova/tests/api/openstack/compute/contrib/test_hosts.py index ea23cd5c2..570b13473 100644 --- a/nova/tests/api/openstack/compute/contrib/test_hosts.py +++ b/nova/tests/api/openstack/compute/contrib/test_hosts.py @@ -214,7 +214,7 @@ class HostTestCase(test.TestCase): 'vcpus': 16, 'memory_mb': 32, 'local_gb': 100, 'vcpus_used': 16, 'memory_mb_used': 32, 'local_gb_used': 10, 'hypervisor_type': 'qemu', 'hypervisor_version': 12003, - 'cpu_info': ''} + 'cpu_info': '', 'stats': {}} db.compute_node_create(ctxt, dic) return db.service_get(ctxt, s_ref['id']) diff --git a/nova/tests/compute/fake_resource_tracker.py b/nova/tests/compute/fake_resource_tracker.py new file mode 100644 index 000000000..faba13bab --- /dev/null +++ b/nova/tests/compute/fake_resource_tracker.py @@ -0,0 +1,35 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2012 OpenStack, LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +from nova.compute import resource_tracker + + +class FakeResourceTracker(resource_tracker.ResourceTracker): + """Version without a DB requirement""" + + def _create(self, context, values): + return values + + def _update(self, context, values, prune_stats=False): + self.compute_node.update(values) + return self.compute_node + + def _get_service(self, context): + return { + "id": 1, + "compute_node": None + } diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py index b6d775bd2..dc2d43394 100644 --- a/nova/tests/compute/test_compute.py +++ b/nova/tests/compute/test_compute.py @@ -53,6 +53,7 @@ import nova.policy from nova import quota from nova.scheduler import driver as scheduler_driver from nova import test +from nova.tests.compute import fake_resource_tracker from nova.tests.db.fakes import FakeModel from nova.tests import fake_network from nova.tests.image import fake as fake_image @@ -92,6 +93,13 @@ class BaseTestCase(test.TestCase): network_manager='nova.network.manager.FlatManager') self.compute = importutils.import_object(FLAGS.compute_manager) + # override tracker with a version that doesn't need the database: + self.compute.resource_tracker = \ + fake_resource_tracker.FakeResourceTracker(self.compute.host, + self.compute.driver) + self.compute.update_available_resource( + context.get_admin_context()) + self.user_id = 'fake' self.project_id = 'fake' self.context = context.RequestContext(self.user_id, @@ -140,6 +148,7 @@ class BaseTestCase(test.TestCase): inst['root_gb'] = 0 inst['ephemeral_gb'] = 0 inst['architecture'] = 'x86_64' + inst['os_type'] = 'Linux' inst.update(params) return db.instance_create(self.context, inst) @@ -261,6 +270,87 @@ class ComputeTestCase(BaseTestCase): finally: db.instance_destroy(self.context, instance['uuid']) + def test_create_instance_insufficient_memory(self): + params = {"memory_mb": 999999999999} + instance = self._create_fake_instance(params) + self.assertRaises(exception.ComputeResourcesUnavailable, + self.compute.run_instance, self.context, instance=instance) + + def test_create_instance_insufficient_disk(self): + params = {"root_gb": 999999999999, + "ephemeral_gb": 99999999999} + instance = self._create_fake_instance(params) + self.assertRaises(exception.ComputeResourcesUnavailable, + self.compute.run_instance, self.context, instance=instance) + + def test_create_multiple_instances_then_starve(self): + params = {"memory_mb": 1024, "root_gb": 128, "ephemeral_gb": 128} + instance = self._create_fake_instance(params) + self.compute.run_instance(self.context, instance=instance) + self.assertEquals(1024, + self.compute.resource_tracker.compute_node['memory_mb_used']) + self.assertEquals(256, + self.compute.resource_tracker.compute_node['local_gb_used']) + + params = {"memory_mb": 2048, "root_gb": 256, "ephemeral_gb": 256} + instance = self._create_fake_instance(params) + self.compute.run_instance(self.context, instance=instance) + self.assertEquals(3072, + self.compute.resource_tracker.compute_node['memory_mb_used']) + self.assertEquals(768, + self.compute.resource_tracker.compute_node['local_gb_used']) + + params = {"memory_mb": 8192, "root_gb": 8192, "ephemeral_gb": 8192} + instance = self._create_fake_instance(params) + self.assertRaises(exception.ComputeResourcesUnavailable, + self.compute.run_instance, self.context, instance=instance) + + def test_create_instance_with_oversubscribed_ram(self): + """Test passing of oversubscribed ram policy from the scheduler.""" + + # get total memory as reported by virt 
driver:
+        resources = self.compute.driver.get_available_resource()
+        total_mem_mb = resources['memory_mb']
+
+        oversub_limit_mb = total_mem_mb * 1.5
+        instance_mb = int(total_mem_mb * 1.45)
+
+        # build an instance, specifying an amount of memory that exceeds
+        # total_mem_mb, but is less than the oversubscribed limit:
+        params = {"memory_mb": instance_mb, "root_gb": 128,
+                  "ephemeral_gb": 128}
+        instance = self._create_fake_instance(params)
+
+        filter_properties = dict(memory_mb_limit=oversub_limit_mb)
+        self.compute.run_instance(self.context, instance=instance,
+                filter_properties=filter_properties)
+
+        self.assertEqual(instance_mb,
+                self.compute.resource_tracker.compute_node['memory_mb_used'])
+
+    def test_create_instance_with_oversubscribed_ram_fail(self):
+        """Test passing of oversubscribed ram policy from the scheduler, but
+        with insufficient memory.
+        """
+        # get total memory as reported by virt driver:
+        resources = self.compute.driver.get_available_resource()
+        total_mem_mb = resources['memory_mb']
+
+        oversub_limit_mb = total_mem_mb * 1.5
+        instance_mb = int(total_mem_mb * 1.55)
+
+        # build an instance, specifying an amount of memory that exceeds
+        # both total_mem_mb and the oversubscribed limit:
+        params = {"memory_mb": instance_mb, "root_gb": 128,
+                  "ephemeral_gb": 128}
+        instance = self._create_fake_instance(params)
+
+        filter_properties = dict(memory_mb_limit=oversub_limit_mb)
+
+        self.assertRaises(exception.ComputeResourcesUnavailable,
+                self.compute.run_instance, self.context, instance=instance,
+                filter_properties=filter_properties)
+
     def test_default_access_ip(self):
         self.flags(default_access_ip_network_name='test1',
                    stub_network=False)
         instance = jsonutils.to_primitive(self._create_fake_instance())
@@ -3986,6 +4076,7 @@ class ComputeAPITestCase(BaseTestCase):
                 instance=jsonutils.to_primitive(instance))
         instance = self.compute_api.get(self.context, instance['uuid'])
         security_group_name = self._create_group()['name']
+
         self.security_group_api.add_to_instance(self.context,
                                                 instance,
                                                 security_group_name)
diff --git a/nova/tests/compute/test_resource_tracker.py b/nova/tests/compute/test_resource_tracker.py
new file mode 100644
index 000000000..cba2a6c17
--- /dev/null
+++ b/nova/tests/compute/test_resource_tracker.py
@@ -0,0 +1,492 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
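The two oversubscribed-ram tests in test_compute.py above reduce to simple arithmetic; a worked version with assumed numbers:

    # Worked example with assumed values (not from the patch):
    total_mem_mb = 2048                    # reported by the virt driver
    oversub_limit_mb = total_mem_mb * 1.5  # 3072 MB allowed by RamFilter
    ok_mb = int(total_mem_mb * 1.45)       # 2969 MB: over physical, under limit
    bad_mb = int(total_mem_mb * 1.55)      # 3174 MB: exceeds the limit
    # With nothing used yet, a 2969 MB claim fits under the 3072 MB limit
    # and succeeds, while a 3174 MB claim returns None and the build fails.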
+ +"""Tests for compute resource tracking""" + +import copy + +from nova.compute import resource_tracker +from nova.compute import task_states +from nova.compute import vm_states +from nova import db +from nova import exception +from nova.openstack.common import timeutils +from nova import test +from nova.virt import driver + + +class FakeContext(object): + def __init__(self, is_admin=False): + self.is_admin = is_admin + + def elevated(self): + return FakeContext(is_admin=True) + + +class UnsupportedVirtDriver(driver.ComputeDriver): + """Pretend version of a lame virt driver""" + def get_available_resource(self): + # no support for getting resource usage info + return {} + + +class FakeVirtDriver(driver.ComputeDriver): + + def __init__(self): + self.memory_mb = 5 + self.local_gb = 6 + self.vcpus = 1 + + self.memory_mb_used = 0 + self.local_gb_used = 0 + + def get_available_resource(self): + d = { + 'vcpus': self.vcpus, + 'memory_mb': self.memory_mb, + 'local_gb': self.local_gb, + 'vcpus_used': 0, + 'memory_mb_used': self.memory_mb_used, + 'local_gb_used': self.local_gb_used, + 'hypervisor_type': 'fake', + 'hypervisor_version': 0, + 'hypervisor_hostname': 'fakehost', + 'cpu_info': '', + } + return d + + +class BaseTestCase(test.TestCase): + + def setUp(self): + super(BaseTestCase, self).setUp() + + self.context = FakeContext() + + self.instance_ref = { + "memory_mb": 1, + "root_gb": 1, + "ephemeral_gb": 1, + "vm_state": vm_states.BUILDING, + "task_state": None, + "os_type": "Linux", + "project_id": "1234", + "vcpus": 1, + } + + self.stubs.Set(db, 'instance_get_all_by_filters', + self._fake_instance_get_all_by_filters) + + def _create_compute_node(self, values=None): + compute = { + "id": 1, + "service_id": 1, + "vcpus": 1, + "memory_mb": 1, + "local_gb": 1, + "vcpus_used": 1, + "memory_mb_used": 1, + "local_gb_used": 1, + "free_ram_mb": 1, + "free_disk_gb": 1, + "current_workload": 1, + "running_vms": 0, + "cpu_info": None, + "stats": [{"key": "num_instances", "value": "1"}] + } + if values: + compute.update(values) + return compute + + def _create_service(self, host="fakehost", compute=None): + if compute: + compute = [compute] + + service = { + "id": 1, + "host": host, + "binary": "nova-compute", + "topic": "compute", + "compute_node": compute, + } + return service + + def _fake_instance_get_all_by_filters(self, ctx, filters, **kwargs): + return [] + + def _tracker(self, unsupported=False): + host = "fakehost" + + if unsupported: + driver = UnsupportedVirtDriver() + else: + driver = FakeVirtDriver() + + tracker = resource_tracker.ResourceTracker(host, driver) + return tracker + + +class UnsupportedDriverTestCase(BaseTestCase): + """Resource tracking should be disabled when the virt driver doesn't + support it. 
+ """ + def setUp(self): + super(UnsupportedDriverTestCase, self).setUp() + self.tracker = self._tracker(unsupported=True) + # seed tracker with data: + self.tracker.update_available_resource(self.context) + + def testDisabled(self): + # disabled = no compute node stats + self.assertTrue(self.tracker.disabled) + self.assertEqual(None, self.tracker.compute_node) + + def testDisabledClaim(self): + # basic claim: + claim = self.tracker.begin_resource_claim(self.context, 1, 1) + self.assertEqual(None, claim) + + def testDisabledContextClaim(self): + # basic context manager variation: + with self.tracker.resource_claim(self.context, 1, 1): + pass + self.assertEqual(0, len(self.tracker.claims)) + + def testDisabledInstanceClaim(self): + # instance variation: + claim = self.tracker.begin_instance_resource_claim(self.context, + self.instance_ref) + self.assertEqual(None, claim) + + def testDisabledInstanceContextClaim(self): + # instance context manager variation: + with self.tracker.instance_resource_claim(self.context, + self.instance_ref): + pass + self.assertEqual(0, len(self.tracker.claims)) + + def testDisabledFinishClaim(self): + self.assertEqual(None, self.tracker.finish_resource_claim(None)) + + def testDisabledAbortClaim(self): + self.assertEqual(None, self.tracker.abort_resource_claim(self.context, + None)) + + def testDisabledFreeResources(self): + self.tracker.free_resources(self.context) + self.assertTrue(self.tracker.disabled) + self.assertEqual(None, self.tracker.compute_node) + + +class MissingServiceTestCase(BaseTestCase): + def setUp(self): + super(MissingServiceTestCase, self).setUp() + self.context = FakeContext(is_admin=True) + self.tracker = self._tracker() + + def testMissingService(self): + """No service record in DB.""" + self.tracker.update_available_resource(self.context) + self.assertTrue(self.tracker.disabled) + + +class MissingComputeNodeTestCase(BaseTestCase): + def setUp(self): + super(MissingComputeNodeTestCase, self).setUp() + self.tracker = self._tracker() + + self.stubs.Set(db, 'service_get_all_compute_by_host', + self._fake_service_get_all_compute_by_host) + self.stubs.Set(db, 'compute_node_create', + self._fake_create_compute_node) + + def _fake_create_compute_node(self, context, values): + self.created = True + return self._create_compute_node() + + def _fake_service_get_all_compute_by_host(self, ctx, host): + # return a service with no joined compute + service = self._create_service() + return [service] + + def testCreatedComputeNode(self): + self.tracker.update_available_resource(self.context) + self.assertTrue(self.created) + + def testEnabled(self): + self.tracker.update_available_resource(self.context) + self.assertFalse(self.tracker.disabled) + + +class ResourceTestCase(BaseTestCase): + def setUp(self): + super(ResourceTestCase, self).setUp() + self.tracker = self._tracker() + self.stubs.Set(db, 'service_get_all_compute_by_host', + self._fake_service_get_all_compute_by_host) + self.stubs.Set(db, 'compute_node_update', + self._fake_compute_node_update) + + self.tracker.update_available_resource(self.context) + + def _fake_service_get_all_compute_by_host(self, ctx, host): + self.compute = self._create_compute_node() + self.service = self._create_service(host, compute=self.compute) + return [self.service] + + def _fake_compute_node_update(self, ctx, compute_node_id, values, + prune_stats=False): + self.updated = True + values['stats'] = [{"key": "num_instances", "value": "1"}] + self.compute.update(values) + return self.compute + + def 
testFreeRamResourceValue(self):
+        driver = FakeVirtDriver()
+        mem_free = driver.memory_mb - driver.memory_mb_used
+        self.assertEqual(mem_free, self.tracker.compute_node['free_ram_mb'])
+
+    def testFreeDiskResourceValue(self):
+        driver = FakeVirtDriver()
+        disk_free = driver.local_gb - driver.local_gb_used
+        self.assertEqual(disk_free, self.tracker.compute_node['free_disk_gb'])
+
+    def testUpdateComputeNode(self):
+        self.assertFalse(self.tracker.disabled)
+        self.assertTrue(self.updated)
+
+    def testInsufficientMemoryClaim(self):
+        """Exceed memory limit of 5MB"""
+        claim = self.tracker.begin_resource_claim(self.context, memory_mb=2,
+                disk_gb=0)
+        self.assertNotEqual(None, claim)
+
+        claim = self.tracker.begin_resource_claim(self.context, memory_mb=3,
+                disk_gb=0)
+        self.assertNotEqual(None, claim)
+
+        claim = self.tracker.begin_resource_claim(self.context, memory_mb=1,
+                disk_gb=0)
+        self.assertEqual(None, claim)
+
+    def testInsufficientMemoryClaimWithOversubscription(self):
+        """Exceed oversubscribed memory limit of 10MB"""
+        claim = self.tracker.begin_resource_claim(self.context, memory_mb=10,
+                disk_gb=0, memory_mb_limit=10)
+        self.assertNotEqual(None, claim)
+
+        claim = self.tracker.begin_resource_claim(self.context, memory_mb=1,
+                disk_gb=0, memory_mb_limit=10)
+        self.assertEqual(None, claim)
+
+    def testInsufficientDiskClaim(self):
+        """Exceed disk limit of 5GB"""
+        claim = self.tracker.begin_resource_claim(self.context, memory_mb=0,
+                disk_gb=2)
+        self.assertNotEqual(None, claim)
+
+        claim = self.tracker.begin_resource_claim(self.context, memory_mb=0,
+                disk_gb=3)
+        self.assertNotEqual(None, claim)
+
+        claim = self.tracker.begin_resource_claim(self.context, memory_mb=0,
+                disk_gb=5)
+        self.assertEqual(None, claim)
+
+    def testClaimAndFinish(self):
+        self.assertEqual(5, self.tracker.compute_node['memory_mb'])
+        self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
+
+        self.assertEqual(6, self.tracker.compute_node['local_gb'])
+        self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
+
+        claim_mem = 3
+        claim_disk = 2
+        claim = self.tracker.begin_resource_claim(self.context, claim_mem,
+                claim_disk)
+
+        self.assertEqual(5, self.compute["memory_mb"])
+        self.assertEqual(claim_mem, self.compute["memory_mb_used"])
+        self.assertEqual(5 - claim_mem, self.compute["free_ram_mb"])
+
+        self.assertEqual(6, self.compute["local_gb"])
+        self.assertEqual(claim_disk, self.compute["local_gb_used"])
+        self.assertEqual(6 - claim_disk, self.compute["free_disk_gb"])
+
+        # 1st pretend that the compute operation finished and claimed the
+        # desired resources from the virt layer
+        driver = self.tracker.driver
+        driver.memory_mb_used = claim_mem
+        driver.local_gb_used = claim_disk
+
+        # 2nd update compute node from the virt layer. because the claim is
+        # still in-progress (unfinished), the audit will double-count the
+        # claimed resources:
+        self.tracker.update_available_resource(self.context)
+
+        self.assertEqual(2 * claim_mem,
+                self.compute['memory_mb_used'])
+        self.assertEqual(5 - (2 * claim_mem),
+                self.compute['free_ram_mb'])
+
+        self.assertEqual(2 * claim_disk,
+                self.compute['local_gb_used'])
+        self.assertEqual(6 - (2 * claim_disk),
+                self.compute['free_disk_gb'])
+
+        # Finally, finish the claim and update from the virt layer again.
+ # Resource usage will be consistent again: + self.tracker.finish_resource_claim(claim) + self.tracker.update_available_resource(self.context) + + self.assertEqual(claim_mem, + self.compute['memory_mb_used']) + self.assertEqual(5 - claim_mem, + self.compute['free_ram_mb']) + + self.assertEqual(claim_disk, + self.compute['local_gb_used']) + self.assertEqual(6 - claim_disk, + self.compute['free_disk_gb']) + + def testClaimAndAbort(self): + self.assertEqual(5, self.tracker.compute_node['memory_mb']) + self.assertEqual(0, self.tracker.compute_node['memory_mb_used']) + + self.assertEqual(6, self.tracker.compute_node['local_gb']) + self.assertEqual(0, self.tracker.compute_node['local_gb_used']) + + claim_mem = 3 + claim_disk = 2 + claim = self.tracker.begin_resource_claim(self.context, claim_mem, + claim_disk) + + self.assertEqual(5, self.compute["memory_mb"]) + self.assertEqual(claim_mem, self.compute["memory_mb_used"]) + self.assertEqual(5 - claim_mem, self.compute["free_ram_mb"]) + + self.assertEqual(6, self.compute["local_gb"]) + self.assertEqual(claim_disk, self.compute["local_gb_used"]) + self.assertEqual(6 - claim_disk, self.compute["free_disk_gb"]) + + self.tracker.abort_resource_claim(self.context, claim) + + self.assertEqual(5, self.compute["memory_mb"]) + self.assertEqual(0, self.compute["memory_mb_used"]) + self.assertEqual(5, self.compute["free_ram_mb"]) + + self.assertEqual(6, self.compute["local_gb"]) + self.assertEqual(0, self.compute["local_gb_used"]) + self.assertEqual(6, self.compute["free_disk_gb"]) + + def testExpiredClaims(self): + """Test that old claims get cleaned up automatically if not finished + or aborted explicitly. + """ + claim = self.tracker.begin_resource_claim(self.context, memory_mb=2, + disk_gb=2) + claim.expire_ts = timeutils.utcnow_ts() - 1 + self.assertTrue(claim.is_expired()) + + # and an unexpired claim + claim2 = self.tracker.begin_resource_claim(self.context, memory_mb=1, + disk_gb=1) + + self.assertEqual(2, len(self.tracker.claims)) + self.assertEqual(2 + 1, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(2 + 1, self.tracker.compute_node['local_gb_used']) + + # expired claims get expunged when audit runs: + self.tracker.update_available_resource(self.context) + + self.assertEqual(1, len(self.tracker.claims)) + self.assertEqual(1, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(1, self.tracker.compute_node['local_gb_used']) + + # and just call finish & abort to ensure expired claims do not cause + # any other explosions: + self.tracker.abort_resource_claim(self.context, claim) + self.tracker.finish_resource_claim(claim) + + def testInstanceClaim(self): + self.tracker.begin_instance_resource_claim(self.context, + self.instance_ref) + self.assertEqual(1, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(2, self.tracker.compute_node['local_gb_used']) + + def testContextClaim(self): + with self.tracker.resource_claim(self.context, memory_mb=1, disk_gb=1): + # <insert exciting things that utilize resources> + self.assertEqual(1, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(1, self.tracker.compute_node['local_gb_used']) + self.assertEqual(1, self.compute['memory_mb_used']) + self.assertEqual(1, self.compute['local_gb_used']) + + self.tracker.update_available_resource(self.context) + self.assertEqual(0, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(0, self.tracker.compute_node['local_gb_used']) + self.assertEqual(0, self.compute['memory_mb_used']) + self.assertEqual(0, 
self.compute['local_gb_used']) + + def testContextClaimWithException(self): + try: + with self.tracker.resource_claim(self.context, memory_mb=1, + disk_gb=1): + # <insert exciting things that utilize resources> + raise Exception("THE SKY IS FALLING") + except Exception: + pass + + self.tracker.update_available_resource(self.context) + self.assertEqual(0, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(0, self.tracker.compute_node['local_gb_used']) + self.assertEqual(0, self.compute['memory_mb_used']) + self.assertEqual(0, self.compute['local_gb_used']) + + def testInstanceContextClaim(self): + with self.tracker.instance_resource_claim(self.context, + self.instance_ref): + # <insert exciting things that utilize resources> + self.assertEqual(1, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(2, self.tracker.compute_node['local_gb_used']) + self.assertEqual(1, self.compute['memory_mb_used']) + self.assertEqual(2, self.compute['local_gb_used']) + + self.tracker.update_available_resource(self.context) + self.assertEqual(0, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(0, self.tracker.compute_node['local_gb_used']) + self.assertEqual(0, self.compute['memory_mb_used']) + self.assertEqual(0, self.compute['local_gb_used']) + + def testUpdateLoadStatsForInstance(self): + self.assertFalse(self.tracker.disabled) + self.assertEqual(0, self.tracker.compute_node['current_workload']) + + old_ref = self.instance_ref + old_ref['task_state'] = task_states.SCHEDULING + with self.tracker.instance_resource_claim(self.context, old_ref): + pass + + self.assertEqual(1, self.tracker.compute_node['current_workload']) + + new_ref = copy.copy(old_ref) + new_ref['vm_state'] = vm_states.ACTIVE + new_ref['task_state'] = None + + self.tracker.update_load_stats_for_instance(self.context, old_ref, + new_ref) + self.assertEqual(0, self.tracker.compute_node['current_workload']) diff --git a/nova/tests/compute/test_stats.py b/nova/tests/compute/test_stats.py new file mode 100644 index 000000000..0e4ac666d --- /dev/null +++ b/nova/tests/compute/test_stats.py @@ -0,0 +1,182 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2012 OpenStack, LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +"""Tests for compute node stats""" + +from nova.compute import stats +from nova.compute import task_states +from nova.compute import vm_states +from nova import db +from nova import exception +from nova import test + + +class StatsTestCase(test.TestCase): + def setUp(self): + super(StatsTestCase, self).setUp() + self.stats = stats.Stats() + + def _create_instance(self, values=None): + instance = { + "os_type": "Linux", + "project_id": "1234", + "task_state": None, + "vm_state": vm_states.BUILDING, + "vcpus": 1, + } + if values: + instance.update(values) + return instance + + def testOsTypeCount(self): + os_type = "Linux" + self.assertEqual(0, self.stats.num_os_type(os_type)) + self.stats._increment("num_os_type_" + os_type) + self.stats._increment("num_os_type_" + os_type) + self.stats._increment("num_os_type_Vax") + self.assertEqual(2, self.stats.num_os_type(os_type)) + self.stats["num_os_type_" + os_type] -= 1 + self.assertEqual(1, self.stats.num_os_type(os_type)) + + def testUpdateProjectCount(self): + proj_id = "1234" + + def _get(): + return self.stats.num_instances_for_project(proj_id) + + self.assertEqual(0, _get()) + self.stats._increment("num_proj_" + proj_id) + self.assertEqual(1, _get()) + self.stats["num_proj_" + proj_id] -= 1 + self.assertEqual(0, _get()) + + def testInstanceCount(self): + self.assertEqual(0, self.stats.num_instances) + for i in range(5): + self.stats._increment("num_instances") + self.stats["num_instances"] -= 1 + self.assertEqual(4, self.stats.num_instances) + + def testAddStatsForInstance(self): + instance = { + "os_type": "Linux", + "project_id": "1234", + "task_state": None, + "vm_state": vm_states.BUILDING, + "vcpus": 3, + } + self.stats.add_stats_for_instance(instance) + + instance = { + "os_type": "FreeBSD", + "project_id": "1234", + "task_state": task_states.SCHEDULING, + "vm_state": None, + "vcpus": 1, + } + self.stats.add_stats_for_instance(instance) + + instance = { + "os_type": "Linux", + "project_id": "2345", + "task_state": task_states.SCHEDULING, + "vm_state": vm_states.BUILDING, + "vcpus": 2, + } + self.stats.add_stats_for_instance(instance) + + self.assertEqual(2, self.stats.num_os_type("Linux")) + self.assertEqual(1, self.stats.num_os_type("FreeBSD")) + + self.assertEquals(2, self.stats.num_instances_for_project("1234")) + self.assertEquals(1, self.stats.num_instances_for_project("2345")) + + self.assertEqual(1, self.stats["num_task_None"]) + self.assertEqual(2, self.stats["num_task_" + task_states.SCHEDULING]) + + self.assertEqual(1, self.stats["num_vm_None"]) + self.assertEqual(2, self.stats["num_vm_" + vm_states.BUILDING]) + + self.assertEqual(6, self.stats.num_vcpus_used) + + def testCalculateWorkload(self): + self.stats._increment("num_task_None") + self.stats._increment("num_task_" + task_states.SCHEDULING) + self.stats._increment("num_task_" + task_states.SCHEDULING) + self.assertEqual(2, self.stats.calculate_workload()) + + def testUpdateStatsForInstanceNoChange(self): + old = self._create_instance() + self.stats.add_stats_for_instance(old) + + self.stats.update_stats_for_instance(old, old) # no change + self.assertEqual(1, self.stats.num_instances) + self.assertEqual(1, self.stats.num_instances_for_project("1234")) + self.assertEqual(1, self.stats["num_os_type_Linux"]) + self.assertEqual(1, self.stats["num_task_None"]) + self.assertEqual(1, self.stats["num_vm_" + vm_states.BUILDING]) + + def testUpdateStatsForInstanceVmChange(self): + old = self._create_instance() + self.stats.add_stats_for_instance(old) + + new = 
self._create_instance({"vm_state": vm_states.PAUSED}) + self.stats.update_stats_for_instance(old, new) + self.assertEqual(1, self.stats.num_instances) + self.assertEqual(1, self.stats.num_instances_for_project("1234")) + self.assertEqual(1, self.stats.num_os_type("Linux")) + self.assertEqual(0, self.stats.num_vm_state(vm_states.BUILDING)) + self.assertEqual(1, self.stats.num_vm_state(vm_states.PAUSED)) + + def testUpdateStatsForInstanceTaskChange(self): + old = self._create_instance() + self.stats.add_stats_for_instance(old) + + new = self._create_instance({"task_state": task_states.REBUILDING}) + self.stats.update_stats_for_instance(old, new) + self.assertEqual(1, self.stats.num_instances) + self.assertEqual(1, self.stats.num_instances_for_project("1234")) + self.assertEqual(1, self.stats["num_os_type_Linux"]) + self.assertEqual(0, self.stats["num_task_None"]) + self.assertEqual(1, self.stats["num_task_" + task_states.REBUILDING]) + + def testUpdateStatsForInstanceDeleted(self): + old = self._create_instance() + self.stats.add_stats_for_instance(old) + self.assertEqual(1, self.stats["num_proj_1234"]) + + new = self._create_instance({"vm_state": vm_states.DELETED}) + self.stats.update_stats_for_instance(old, new) + + self.assertEqual(0, self.stats.num_instances) + self.assertEqual(0, self.stats.num_instances_for_project("1234")) + self.assertEqual(0, self.stats.num_os_type("Linux")) + self.assertEqual(0, self.stats["num_vm_" + vm_states.BUILDING]) + self.assertEqual(0, self.stats.num_vcpus_used) + + def testIoWorkload(self): + vms = [vm_states.ACTIVE, vm_states.BUILDING, vm_states.PAUSED] + tasks = [task_states.RESIZE_MIGRATING, task_states.REBUILDING, + task_states.RESIZE_PREP, task_states.IMAGE_SNAPSHOT, + task_states.IMAGE_BACKUP, task_states.RESCUING] + + for state in vms: + self.stats._increment("num_vm_" + state) + for state in tasks: + self.stats._increment("num_task_" + state) + + self.assertEqual(6, self.stats.calculate_io_workload()) diff --git a/nova/tests/scheduler/fakes.py b/nova/tests/scheduler/fakes.py index 84424c1c7..e48e987a4 100644 --- a/nova/tests/scheduler/fakes.py +++ b/nova/tests/scheduler/fakes.py @@ -27,12 +27,16 @@ from nova.scheduler import host_manager COMPUTE_NODES = [ dict(id=1, local_gb=1024, memory_mb=1024, vcpus=1, + free_disk_gb=512, free_ram_mb=512, vcpus_used=1, service=dict(host='host1', disabled=False)), dict(id=2, local_gb=2048, memory_mb=2048, vcpus=2, + free_disk_gb=1024, free_ram_mb=1024, vcpus_used=2, service=dict(host='host2', disabled=True)), dict(id=3, local_gb=4096, memory_mb=4096, vcpus=4, + free_disk_gb=3072, free_ram_mb=3072, vcpus_used=1, service=dict(host='host3', disabled=False)), dict(id=4, local_gb=8192, memory_mb=8192, vcpus=8, + free_disk_gb=8192, free_ram_mb=8192, vcpus_used=0, service=dict(host='host4', disabled=False)), # Broken entry dict(id=5, local_gb=1024, memory_mb=1024, vcpus=1, service=None), @@ -128,8 +132,5 @@ class FakeComputeAPI(object): def mox_host_manager_db_calls(mock, context): mock.StubOutWithMock(db, 'compute_node_get_all') - mock.StubOutWithMock(db, 'instance_get_all') db.compute_node_get_all(mox.IgnoreArg()).AndReturn(COMPUTE_NODES) - db.instance_get_all(mox.IgnoreArg(), - columns_to_join=['instance_type']).AndReturn(INSTANCES) diff --git a/nova/tests/scheduler/test_filter_scheduler.py b/nova/tests/scheduler/test_filter_scheduler.py index 64bf5415f..1cab6ebbf 100644 --- a/nova/tests/scheduler/test_filter_scheduler.py +++ b/nova/tests/scheduler/test_filter_scheduler.py @@ -203,7 +203,8 @@ class 
FilterSchedulerTestCase(test_scheduler.SchedulerTestCase): self.assertEquals(weight, -1.0) hostinfo = host_manager.HostState('host', 'compute') hostinfo.update_from_compute_node(dict(memory_mb=1000, - local_gb=0, vcpus=1)) + local_gb=0, vcpus=1, free_disk_gb=1000, free_ram_mb=1000, + vcpus_used=0)) self.assertEquals(1000 - 128, fn(hostinfo, {})) def test_max_attempts(self): diff --git a/nova/tests/scheduler/test_host_filters.py b/nova/tests/scheduler/test_host_filters.py index d6f083576..a25e9afae 100644 --- a/nova/tests/scheduler/test_host_filters.py +++ b/nova/tests/scheduler/test_host_filters.py @@ -339,6 +339,24 @@ class HostFiltersTestCase(test.TestCase): 'capabilities': capabilities, 'service': service}) self.assertTrue(filt_cls.host_passes(host, filter_properties)) + def test_ram_filter_sets_memory_limit(self): + """Test that ram filter sets a filter_property denoting the memory + ceiling. + """ + self._stub_service_is_up(True) + filt_cls = self.class_map['RamFilter']() + self.flags(ram_allocation_ratio=2.0) + filter_properties = {'instance_type': {'memory_mb': 1024}} + capabilities = {'enabled': True} + service = {'disabled': False} + host = fakes.FakeHostState('host1', 'compute', + {'free_ram_mb': -1024, 'total_usable_ram_mb': 2048, + 'capabilities': capabilities, 'service': service}) + filt_cls.host_passes(host, filter_properties) + + self.assertEqual(host.total_usable_ram_mb * 2.0, + filter_properties['memory_mb_limit']) + def test_compute_filter_fails_on_service_disabled(self): self._stub_service_is_up(True) filt_cls = self.class_map['ComputeFilter']() diff --git a/nova/tests/scheduler/test_host_manager.py b/nova/tests/scheduler/test_host_manager.py index 45ad1021b..a2f9fc425 100644 --- a/nova/tests/scheduler/test_host_manager.py +++ b/nova/tests/scheduler/test_host_manager.py @@ -130,14 +130,10 @@ class HostManagerTestCase(test.TestCase): self.mox.StubOutWithMock(db, 'compute_node_get_all') self.mox.StubOutWithMock(host_manager.LOG, 'warn') - self.mox.StubOutWithMock(db, 'instance_get_all') db.compute_node_get_all(context).AndReturn(fakes.COMPUTE_NODES) # Invalid service host_manager.LOG.warn("No service for compute ID 5") - db.instance_get_all(context, - columns_to_join=['instance_type']).AndReturn( - fakes.INSTANCES) self.mox.ReplayAll() host_states = self.host_manager.get_all_host_states(context, topic) diff --git a/nova/tests/test_db_api.py b/nova/tests/test_db_api.py index 93e0edfbc..e2420b48f 100644 --- a/nova/tests/test_db_api.py +++ b/nova/tests/test_db_api.py @@ -887,9 +887,15 @@ class CapacityTestCase(test.TestCase): self.compute_node_dict = dict(vcpus=2, memory_mb=1024, local_gb=2048, vcpus_used=0, memory_mb_used=0, - local_gb_used=0, hypervisor_type="xen", + local_gb_used=0, free_ram_mb=1024, + free_disk_gb=2048, hypervisor_type="xen", hypervisor_version=1, cpu_info="", + running_vms=0, current_workload=0, service_id=self.service.id) + # add some random stats + stats = dict(num_instances=3, num_proj_12345=2, + num_proj_23456=2, num_vm_building=3) + self.compute_node_dict['stats'] = stats self.flags(reserved_host_memory_mb=0) self.flags(reserved_host_disk_mb=0) @@ -898,6 +904,13 @@ class CapacityTestCase(test.TestCase): self.compute_node_dict['host'] = host return db.compute_node_create(self.ctxt, self.compute_node_dict) + def _stats_as_dict(self, stats): + d = {} + for s in stats: + key = s['key'] + d[key] = s['value'] + return d + def test_compute_node_create(self): item = self._create_helper('host1') self.assertEquals(item.free_ram_mb, 1024) @@ -905,70 +918,63 @@ 
class CapacityTestCase(test.TestCase): self.assertEquals(item.running_vms, 0) self.assertEquals(item.current_workload, 0) - def test_compute_node_create_with_reservations(self): - self.flags(reserved_host_memory_mb=256) + stats = self._stats_as_dict(item['stats']) + self.assertEqual(3, stats['num_instances']) + self.assertEqual(2, stats['num_proj_12345']) + self.assertEqual(3, stats['num_vm_building']) + + def test_compute_node_get_all(self): + item = self._create_helper('host1') + nodes = db.compute_node_get_all(self.ctxt) + self.assertEqual(1, len(nodes)) + + node = nodes[0] + self.assertEqual(2, node['vcpus']) + + stats = self._stats_as_dict(node['stats']) + self.assertEqual(3, int(stats['num_instances'])) + self.assertEqual(2, int(stats['num_proj_12345'])) + self.assertEqual(3, int(stats['num_vm_building'])) + + def test_compute_node_update(self): + item = self._create_helper('host1') + + compute_node_id = item['id'] + stats = self._stats_as_dict(item['stats']) + + # change some values: + stats['num_instances'] = 8 + stats['num_tribbles'] = 1 + values = { + 'vcpus': 4, + 'stats': stats, + } + item = db.compute_node_update(self.ctxt, compute_node_id, values) + stats = self._stats_as_dict(item['stats']) + + self.assertEqual(4, item['vcpus']) + self.assertEqual(8, int(stats['num_instances'])) + self.assertEqual(2, int(stats['num_proj_12345'])) + self.assertEqual(1, int(stats['num_tribbles'])) + + def test_compute_node_stat_prune(self): item = self._create_helper('host1') - self.assertEquals(item.free_ram_mb, 1024 - 256) - - def test_compute_node_set(self): - self._create_helper('host1') - - x = db.compute_node_utilization_set(self.ctxt, 'host1', - free_ram_mb=2048, free_disk_gb=4096) - self.assertEquals(x.free_ram_mb, 2048) - self.assertEquals(x.free_disk_gb, 4096) - self.assertEquals(x.running_vms, 0) - self.assertEquals(x.current_workload, 0) - - x = db.compute_node_utilization_set(self.ctxt, 'host1', work=3) - self.assertEquals(x.free_ram_mb, 2048) - self.assertEquals(x.free_disk_gb, 4096) - self.assertEquals(x.current_workload, 3) - self.assertEquals(x.running_vms, 0) - - x = db.compute_node_utilization_set(self.ctxt, 'host1', vms=5) - self.assertEquals(x.free_ram_mb, 2048) - self.assertEquals(x.free_disk_gb, 4096) - self.assertEquals(x.current_workload, 3) - self.assertEquals(x.running_vms, 5) - - def test_compute_node_utilization_update(self): - self._create_helper('host1') - - x = db.compute_node_utilization_update(self.ctxt, 'host1', - free_ram_mb_delta=-24) - self.assertEquals(x.free_ram_mb, 1000) - self.assertEquals(x.free_disk_gb, 2048) - self.assertEquals(x.running_vms, 0) - self.assertEquals(x.current_workload, 0) - - x = db.compute_node_utilization_update(self.ctxt, 'host1', - free_disk_gb_delta=-48) - self.assertEquals(x.free_ram_mb, 1000) - self.assertEquals(x.free_disk_gb, 2000) - self.assertEquals(x.running_vms, 0) - self.assertEquals(x.current_workload, 0) - - x = db.compute_node_utilization_update(self.ctxt, 'host1', - work_delta=3) - self.assertEquals(x.free_ram_mb, 1000) - self.assertEquals(x.free_disk_gb, 2000) - self.assertEquals(x.current_workload, 3) - self.assertEquals(x.running_vms, 0) - - x = db.compute_node_utilization_update(self.ctxt, 'host1', - work_delta=-1) - self.assertEquals(x.free_ram_mb, 1000) - self.assertEquals(x.free_disk_gb, 2000) - self.assertEquals(x.current_workload, 2) - self.assertEquals(x.running_vms, 0) - - x = db.compute_node_utilization_update(self.ctxt, 'host1', - vm_delta=5) - self.assertEquals(x.free_ram_mb, 1000) - 
self.assertEquals(x.free_disk_gb, 2000) - self.assertEquals(x.current_workload, 2) - self.assertEquals(x.running_vms, 5) + for stat in item['stats']: + if stat['key'] == 'num_instances': + num_instance_stat = stat + break + + values = { + 'stats': dict(num_instances=1) + } + db.compute_node_update(self.ctxt, item['id'], values, prune_stats=True) + item = db.compute_node_get_all(self.ctxt)[0] + self.assertEqual(1, len(item['stats'])) + + stat = item['stats'][0] + self.assertEqual(num_instance_stat['id'], stat['id']) + self.assertEqual(num_instance_stat['key'], stat['key']) + self.assertEqual(1, int(stat['value'])) class TestIpAllocation(test.TestCase): diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index c84a924e3..5ab059d07 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -1240,7 +1240,7 @@ class XenAPIHostTestCase(stubs.XenAPITestBase): def test_host_state(self): stats = self.conn.get_host_stats() - self.assertEquals(stats['disk_total'], 10000) + self.assertEquals(stats['disk_total'], 40000) self.assertEquals(stats['disk_used'], 20000) self.assertEquals(stats['host_memory_total'], 10) self.assertEquals(stats['host_memory_overhead'], 20) diff --git a/nova/virt/baremetal/driver.py b/nova/virt/baremetal/driver.py index 9fcdf4eb2..d92688276 100644 --- a/nova/virt/baremetal/driver.py +++ b/nova/virt/baremetal/driver.py @@ -659,22 +659,13 @@ class BareMetalDriver(driver.ComputeDriver): # Bare metal doesn't currently support security groups pass - def update_available_resource(self, ctxt, host): + def get_available_resource(self): """Updates compute manager resource info on ComputeNode table. This method is called when nova-compute launches, and whenever admin executes "nova-manage service update_resource". 
- - :param ctxt: security context - :param host: hostname that compute manager is currently running + as part of a periodic task + :returns: Dictionary describing resources """ raise NotImplementedError() diff --git a/nova/virt/fake.py b/nova/virt/fake.py index dd20b0b15..db7f3a6ff 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -221,19 +221,13 @@ class FakeDriver(driver.ComputeDriver): def refresh_provider_fw_rules(self): pass - def update_available_resource(self, ctxt, host): + def get_available_resource(self): """Updates compute manager resource info on ComputeNode table. Since we don't have a real hypervisor, pretend we have lots of disk and ram. """ - try: - service_ref = db.service_get_all_compute_by_host(ctxt, host)[0] - except exception.NotFound: - raise exception.ComputeServiceUnavailable(host=host) - - # Updating host information dic = {'vcpus': 1, 'memory_mb': 4096, 'local_gb': 1028, @@ -242,16 +236,8 @@ class FakeDriver(driver.ComputeDriver): 'local_gb_used': 0, 'hypervisor_type': 'fake', 'hypervisor_version': '1.0', - 'service_id': service_ref['id'], - 'cpu_info': '?'} - - compute_node_ref = service_ref['compute_node'] - if not compute_node_ref: - LOG.info(_('Compute_service record created for %s ') % host) - db.compute_node_create(ctxt, dic) - else: - LOG.info(_('Compute_service record updated for %s ') % host) - db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic) + 'cpu_info': '?'} + return dic def ensure_filtering_rules_for_instance(self, instance_ref, network_info): """This method is supported only by libvirt.""" diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index 1e2706841..adc0c8c7b 100644 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -2173,23 +2173,14 @@ class LibvirtDriver(driver.ComputeDriver): def refresh_provider_fw_rules(self): self.firewall_driver.refresh_provider_fw_rules() - def update_available_resource(self, ctxt, host): - """Updates compute manager resource info on ComputeNode table. + def get_available_resource(self): + """Retrieve resource info. This method is called as a periodic task and is used only in live migration currently. 
- :param ctxt: security context - :param host: hostname that compute manager is currently running - + :returns: dictionary containing resource info """ - - try: - service_ref = db.service_get_all_compute_by_host(ctxt, host)[0] - except exception.NotFound: - raise exception.ComputeServiceUnavailable(host=host) - - # Updating host information dic = {'vcpus': self.get_vcpu_total(), 'memory_mb': self.get_memory_mb_total(), 'local_gb': self.get_local_gb_total(), @@ -2200,16 +2191,8 @@ class LibvirtDriver(driver.ComputeDriver): 'hypervisor_version': self.get_hypervisor_version(), 'hypervisor_hostname': self.get_hypervisor_hostname(), 'cpu_info': self.get_cpu_info(), - 'service_id': service_ref['id'], 'disk_available_least': self.get_disk_available_least()} - - compute_node_ref = service_ref['compute_node'] - if not compute_node_ref: - LOG.info(_('Compute_service record created for %s ') % host) - db.compute_node_create(ctxt, dic) - else: - LOG.info(_('Compute_service record updated for %s ') % host) - db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic) + return dic def check_can_live_migrate_destination(self, ctxt, instance_ref, block_migration=False, diff --git a/nova/virt/vmwareapi/driver.py b/nova/virt/vmwareapi/driver.py index 947bd3422..98d5dc306 100644 --- a/nova/virt/vmwareapi/driver.py +++ b/nova/virt/vmwareapi/driver.py @@ -196,7 +196,7 @@ class VMWareESXDriver(driver.ComputeDriver): 'username': FLAGS.vmwareapi_host_username, 'password': FLAGS.vmwareapi_host_password} - def update_available_resource(self, ctxt, host): + def get_available_resource(self): """This method is supported only by libvirt.""" return diff --git a/nova/virt/xenapi/driver.py b/nova/virt/xenapi/driver.py index 3709c13af..6eb39e2b7 100644 --- a/nova/virt/xenapi/driver.py +++ b/nova/virt/xenapi/driver.py @@ -372,26 +372,22 @@ class XenAPIDriver(driver.ComputeDriver): 'username': FLAGS.xenapi_connection_username, 'password': FLAGS.xenapi_connection_password} - def update_available_resource(self, ctxt, host): - """Updates compute manager resource info on ComputeNode table. + def get_available_resource(self): + """Retrieve resource info. This method is called when nova-compute launches, and - whenever admin executes "nova-manage service update_resource". + as part of a periodic task. - :param ctxt: security context - :param host: hostname that compute manager is currently running + :returns: dictionary describing resources """ - try: - service_ref = db.service_get_all_compute_by_host(ctxt, host)[0] - except exception.NotFound: - raise exception.ComputeServiceUnavailable(host=host) - host_stats = self.get_host_stats(refresh=True) # Updating host information total_ram_mb = host_stats['host_memory_total'] / (1024 * 1024) - free_ram_mb = host_stats['host_memory_free'] / (1024 * 1024) + # NOTE(belliott) memory-free-computed is a value provided by XenServer + # for gauging free memory more conservatively than memory-free. 
+ free_ram_mb = host_stats['host_memory_free_computed'] / (1024 * 1024) total_disk_gb = host_stats['disk_total'] / (1024 * 1024 * 1024) used_disk_gb = host_stats['disk_used'] / (1024 * 1024 * 1024) @@ -404,16 +400,9 @@ class XenAPIDriver(driver.ComputeDriver): 'hypervisor_type': 'xen', 'hypervisor_version': 0, 'hypervisor_hostname': host_stats['host_hostname'], - 'service_id': service_ref['id'], 'cpu_info': host_stats['host_cpu_info']['cpu_count']} - compute_node_ref = service_ref['compute_node'] - if not compute_node_ref: - LOG.info(_('Compute_service record created for %s ') % host) - db.compute_node_create(ctxt, dic) - else: - LOG.info(_('Compute_service record updated for %s ') % host) - db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic) + return dic def ensure_filtering_rules_for_instance(self, instance_ref, network_info): # NOTE(salvatore-orlando): it enforces security groups on diff --git a/nova/virt/xenapi/fake.py b/nova/virt/xenapi/fake.py index 1f8e400c8..7cb6b5f59 100644 --- a/nova/virt/xenapi/fake.py +++ b/nova/virt/xenapi/fake.py @@ -254,6 +254,7 @@ def create_local_srs(): other_config={'i18n-original-value-name_label': 'Local storage', 'i18n-key': 'local-storage'}, + physical_size=40000, physical_utilisation=20000, virtual_allocation=10000, host_ref=host_ref) @@ -262,6 +263,7 @@ def create_local_srs(): other_config={'i18n-original-value-name_label': 'Local storage ISO', 'i18n-key': 'local-storage-iso'}, + physical_size=80000, physical_utilisation=40000, virtual_allocation=80000, host_ref=host_ref) diff --git a/nova/virt/xenapi/host.py b/nova/virt/xenapi/host.py index b45a9106c..9db708925 100644 --- a/nova/virt/xenapi/host.py +++ b/nova/virt/xenapi/host.py @@ -155,8 +155,9 @@ class HostState(object): # No SR configured LOG.error(_("Unable to get SR for this host: %s") % e) return + self._session.call_xenapi("SR.scan", sr_ref) sr_rec = self._session.call_xenapi("SR.get_record", sr_ref) - total = int(sr_rec["virtual_allocation"]) + total = int(sr_rec["physical_size"]) used = int(sr_rec["physical_utilisation"]) data["disk_total"] = total data["disk_used"] = used
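
The double-counting that testClaimAndFinish asserts is the subtle part of the claim mechanism: while a claim is open, the audit adds the claim on top of whatever usage the hypervisor reports, so a guest that has already landed on the hypervisor is briefly counted twice until the claim is finished. The following is a minimal, self-contained sketch of that arithmetic only; ToyClaim and ToyTracker are hypothetical stand-ins written for illustration, not the classes from this patch (the real logic lives in nova/compute/resource_tracker.py):

    class ToyClaim(object):
        """Stand-in for a resource claim (illustrative only)."""
        def __init__(self, memory_mb, disk_gb):
            self.memory_mb = memory_mb
            self.disk_gb = disk_gb
            self.finished = False


    class ToyTracker(object):
        """Stand-in for the tracker's usage accounting."""
        def __init__(self, memory_mb, local_gb):
            self.memory_mb = memory_mb      # total host RAM (MB)
            self.local_gb = local_gb        # total host disk (GB)
            self.hv_memory_mb_used = 0      # usage reported by the hypervisor
            self.hv_local_gb_used = 0
            self.claims = []

        def begin_claim(self, memory_mb, disk_gb):
            claim = ToyClaim(memory_mb, disk_gb)
            self.claims.append(claim)
            return claim

        def finish_claim(self, claim):
            claim.finished = True

        def audit(self):
            # Usage = hypervisor-reported usage + every unfinished claim.
            # An in-flight claim stands in for a guest that may not exist
            # on the hypervisor yet; once the guest does show up, usage is
            # doubled until finish_claim() retires the claim -- exactly
            # the 2 * claim_mem state the test asserts.
            mem = self.hv_memory_mb_used + sum(
                c.memory_mb for c in self.claims if not c.finished)
            disk = self.hv_local_gb_used + sum(
                c.disk_gb for c in self.claims if not c.finished)
            return {'memory_mb_used': mem,
                    'free_ram_mb': self.memory_mb - mem,
                    'local_gb_used': disk,
                    'free_disk_gb': self.local_gb - disk}


    tracker = ToyTracker(memory_mb=5, local_gb=6)
    claim = tracker.begin_claim(memory_mb=3, disk_gb=2)
    tracker.hv_memory_mb_used = 3   # the spawned guest is now visible
    tracker.hv_local_gb_used = 2
    assert tracker.audit()['memory_mb_used'] == 2 * 3   # double-counted
    tracker.finish_claim(claim)
    assert tracker.audit()['memory_mb_used'] == 3       # consistent again

The aborted and expired cases in the tests reduce to the same rule: a claim that is undone or expunged simply drops out of the unfinished set, so the next audit converges back to the hypervisor's numbers.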
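
The Stats object exercised in test_stats.py behaves like a dictionary of named counters ("num_proj_<id>", "num_task_<state>", "num_vm_<state>", ...) that the tracker keeps per host and persists as key/value ComputeNodeStat rows. A rough sketch of that scheme, with ToyStats as a stand-in whose behaviour is inferred from the tests above rather than copied from the patch:

    class ToyStats(dict):
        """Stand-in for the per-host counter dict (illustrative only)."""

        def _increment(self, key):
            self[key] = self.get(key, 0) + 1

        def num_instances_for_project(self, project_id):
            return self.get('num_proj_' + project_id, 0)

        def calculate_workload(self):
            # Instances with any task state in flight count as workload;
            # the 'num_task_None' bucket holds idle instances.
            return sum(v for k, v in self.items()
                       if k.startswith('num_task_') and k != 'num_task_None')


    s = ToyStats()
    s._increment('num_proj_1234')
    s._increment('num_task_None')
    s._increment('num_task_scheduling')
    assert s.num_instances_for_project('1234') == 1
    assert s.calculate_workload() == 1

    # Persisted form: each counter becomes one key/value row, which is
    # why test_db_api rebuilds a plain dict with _stats_as_dict().
    rows = [{'key': k, 'value': str(v)} for k, v in s.items()]

The prune_stats=True path in test_compute_node_stat_prune fits the same model: any key absent from the stats dict passed to compute_node_update is deleted from the table rather than left stale.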
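
Every virt driver in this diff is narrowed the same way: update_available_resource(ctxt, host), which looked up the service record and wrote the ComputeNode row itself, becomes get_available_resource(), which only reports numbers and leaves persistence to the resource tracker. A sketch of a driver written against that narrowed contract; ToyDriver and its values are illustrative assumptions, not a real Nova driver:

    class ToyDriver(object):
        """Hypothetical driver implementing the new contract."""

        def get_available_resource(self):
            # No ctxt/host arguments and no db calls any more: just
            # return raw capacity and usage for the tracker to
            # reconcile against open claims and save.
            return {'vcpus': 4,
                    'memory_mb': 8192,
                    'memory_mb_used': 512,
                    'local_gb': 100,
                    'local_gb_used': 10,
                    'vcpus_used': 0,
                    'hypervisor_type': 'toy',
                    'hypervisor_version': 1,
                    'cpu_info': '?'}

Concentrating the create-or-update ComputeNode logic in one place is what lets the duplicated service_get_all_compute_by_host / compute_node_create / compute_node_update blocks be deleted from the baremetal, fake, libvirt, and xenapi drivers above.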
