diff options
| author | Dan Wendlandt <dan@nicira.com> | 2011-09-07 14:27:06 -0700 |
|---|---|---|
| committer | Dan Wendlandt <dan@nicira.com> | 2011-09-07 14:27:06 -0700 |
| commit | 0f5eb3f888de5f6eb23f968fa5a2270d2a350bcc (patch) | |
| tree | 0ee3f8a60147da042b0b14db350a320228876fa6 /nova/compute | |
| parent | e5e3b306985a3b1fdd8a971f48b76eaf8f923f21 (diff) | |
| parent | 3e0698e3b57c9f73a359340f51c2797d8adc669a (diff) | |
merge trunk
Diffstat (limited to 'nova/compute')
| -rw-r--r-- | nova/compute/api.py | 196 | ||||
| -rw-r--r-- | nova/compute/manager.py | 453 | ||||
| -rw-r--r-- | nova/compute/task_states.py | 59 | ||||
| -rw-r--r-- | nova/compute/vm_states.py | 39 |
4 files changed, 471 insertions, 276 deletions
diff --git a/nova/compute/api.py b/nova/compute/api.py index 3b4bde8ea..4e2944bb7 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -19,13 +19,11 @@ """Handles all requests relating to instances (guest vms).""" -import eventlet import novaclient import re import time from nova import block_device -from nova import db from nova import exception from nova import flags import nova.image @@ -37,6 +35,8 @@ from nova import utils from nova import volume from nova.compute import instance_types from nova.compute import power_state +from nova.compute import task_states +from nova.compute import vm_states from nova.compute.utils import terminate_volumes from nova.scheduler import api as scheduler_api from nova.db import base @@ -75,12 +75,18 @@ def generate_default_hostname(instance): def _is_able_to_shutdown(instance, instance_id): - states = {'terminating': "Instance %s is already being terminated", - 'migrating': "Instance %s is being migrated", - 'stopping': "Instance %s is being stopped"} - msg = states.get(instance['state_description']) - if msg: - LOG.warning(_(msg), instance_id) + vm_state = instance["vm_state"] + task_state = instance["task_state"] + + valid_shutdown_states = [ + vm_states.ACTIVE, + vm_states.REBUILDING, + vm_states.BUILDING, + ] + + if vm_state not in valid_shutdown_states: + LOG.warn(_("Instance %(instance_id)s is not in an 'active' state. It " + "is currently %(vm_state)s. Shutdown aborted.") % locals()) return False return True @@ -237,7 +243,7 @@ class API(base.Base): self.ensure_default_security_group(context) if key_data is None and key_name: - key_pair = db.key_pair_get(context, context.user_id, key_name) + key_pair = self.db.key_pair_get(context, context.user_id, key_name) key_data = key_pair['public_key'] if reservation_id is None: @@ -251,10 +257,10 @@ class API(base.Base): 'image_ref': image_href, 'kernel_id': kernel_id or '', 'ramdisk_id': ramdisk_id or '', + 'power_state': power_state.NOSTATE, + 'vm_state': vm_states.BUILDING, 'config_drive_id': config_drive_id or '', 'config_drive': config_drive or '', - 'state': 0, - 'state_description': 'scheduling', 'user_id': context.user_id, 'project_id': context.project_id, 'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), @@ -377,10 +383,6 @@ class API(base.Base): If you are changing this method, be sure to update both call paths. """ - instance = dict(launch_index=num, **base_options) - instance = self.db.instance_create(context, instance) - instance_id = instance['id'] - elevated = context.elevated() if security_group is None: security_group = ['default'] @@ -389,11 +391,15 @@ class API(base.Base): security_groups = [] for security_group_name in security_group: - group = db.security_group_get_by_name(context, - context.project_id, - security_group_name) + group = self.db.security_group_get_by_name(context, + context.project_id, + security_group_name) security_groups.append(group['id']) + instance = dict(launch_index=num, **base_options) + instance = self.db.instance_create(context, instance) + instance_id = instance['id'] + for security_group_id in security_groups: self.db.instance_add_security_group(elevated, instance_id, @@ -415,6 +421,8 @@ class API(base.Base): updates['display_name'] = "Server %s" % instance_id instance['display_name'] = updates['display_name'] updates['hostname'] = self.hostname_factory(instance) + updates['vm_state'] = vm_states.BUILDING + updates['task_state'] = task_states.SCHEDULING instance = self.update(context, instance_id, **updates) return instance @@ -551,8 +559,9 @@ class API(base.Base): def has_finished_migration(self, context, instance_uuid): """Returns true if an instance has a finished migration.""" try: - db.migration_get_by_instance_and_status(context, instance_uuid, - 'finished') + self.db.migration_get_by_instance_and_status(context, + instance_uuid, + 'finished') return True except exception.NotFound: return False @@ -566,14 +575,15 @@ class API(base.Base): :param context: the security context """ try: - db.security_group_get_by_name(context, context.project_id, - 'default') + self.db.security_group_get_by_name(context, + context.project_id, + 'default') except exception.NotFound: values = {'name': 'default', 'description': 'default', 'user_id': context.user_id, 'project_id': context.project_id} - db.security_group_create(context, values) + self.db.security_group_create(context, values) def trigger_security_group_rules_refresh(self, context, security_group_id): """Called when a rule is added to or removed from a security_group.""" @@ -638,7 +648,7 @@ class API(base.Base): """Called when a rule is added to or removed from a security_group""" hosts = [x['host'] for (x, idx) - in db.service_get_all_compute_sorted(context)] + in self.db.service_get_all_compute_sorted(context)] for host in hosts: rpc.cast(context, self.db.queue_get_for(context, FLAGS.compute_topic, host), @@ -666,11 +676,11 @@ class API(base.Base): def add_security_group(self, context, instance_id, security_group_name): """Add security group to the instance""" - security_group = db.security_group_get_by_name(context, - context.project_id, - security_group_name) + security_group = self.db.security_group_get_by_name(context, + context.project_id, + security_group_name) # check if the server exists - inst = db.instance_get(context, instance_id) + inst = self.db.instance_get(context, instance_id) #check if the security group is associated with the server if self._is_security_group_associated_with_server(security_group, instance_id): @@ -682,21 +692,21 @@ class API(base.Base): if inst['state'] != power_state.RUNNING: raise exception.InstanceNotRunning(instance_id=instance_id) - db.instance_add_security_group(context.elevated(), - instance_id, - security_group['id']) + self.db.instance_add_security_group(context.elevated(), + instance_id, + security_group['id']) rpc.cast(context, - db.queue_get_for(context, FLAGS.compute_topic, inst['host']), + self.db.queue_get_for(context, FLAGS.compute_topic, inst['host']), {"method": "refresh_security_group_rules", "args": {"security_group_id": security_group['id']}}) def remove_security_group(self, context, instance_id, security_group_name): """Remove the security group associated with the instance""" - security_group = db.security_group_get_by_name(context, - context.project_id, - security_group_name) + security_group = self.db.security_group_get_by_name(context, + context.project_id, + security_group_name) # check if the server exists - inst = db.instance_get(context, instance_id) + inst = self.db.instance_get(context, instance_id) #check if the security group is associated with the server if not self._is_security_group_associated_with_server(security_group, instance_id): @@ -708,11 +718,11 @@ class API(base.Base): if inst['state'] != power_state.RUNNING: raise exception.InstanceNotRunning(instance_id=instance_id) - db.instance_remove_security_group(context.elevated(), - instance_id, - security_group['id']) + self.db.instance_remove_security_group(context.elevated(), + instance_id, + security_group['id']) rpc.cast(context, - db.queue_get_for(context, FLAGS.compute_topic, inst['host']), + self.db.queue_get_for(context, FLAGS.compute_topic, inst['host']), {"method": "refresh_security_group_rules", "args": {"security_group_id": security_group['id']}}) @@ -750,10 +760,8 @@ class API(base.Base): return self.update(context, - instance['id'], - state_description='terminating', - state=0, - terminated_at=utils.utcnow()) + instance_id, + task_state=task_states.DELETING) host = instance['host'] if host: @@ -773,9 +781,9 @@ class API(base.Base): return self.update(context, - instance['id'], - state_description='stopping', - state=power_state.NOSTATE, + instance_id, + vm_state=vm_states.ACTIVE, + task_state=task_states.STOPPING, terminated_at=utils.utcnow()) host = instance['host'] @@ -787,12 +795,18 @@ class API(base.Base): """Start an instance.""" LOG.debug(_("Going to try to start %s"), instance_id) instance = self._get_instance(context, instance_id, 'starting') - if instance['state_description'] != 'stopped': - _state_description = instance['state_description'] + vm_state = instance["vm_state"] + + if vm_state != vm_states.STOPPED: LOG.warning(_("Instance %(instance_id)s is not " - "stopped(%(_state_description)s)") % locals()) + "stopped. (%(vm_state)s)") % locals()) return + self.update(context, + instance_id, + vm_state=vm_states.STOPPED, + task_state=task_states.STARTING) + # TODO(yamahata): injected_files isn't supported right now. # It is used only for osapi. not for ec2 api. # availability_zone isn't used by run_instance. @@ -802,6 +816,15 @@ class API(base.Base): "args": {"topic": FLAGS.compute_topic, "instance_id": instance_id}}) + def get_active_by_window(self, context, begin, end=None, project_id=None): + """Get instances that were continuously active over a window.""" + return self.db.instance_get_active_by_window(context, begin, end, + project_id) + + def get_instance_type(self, context, instance_type_id): + """Get an instance type by instance type id.""" + return self.db.instance_type_get(context, instance_type_id) + def get(self, context, instance_id): """Get a single instance with the given instance_id.""" # NOTE(sirp): id used to be exclusively integer IDs; now we're @@ -854,6 +877,7 @@ class API(base.Base): 'image': 'image_ref', 'name': 'display_name', 'instance_name': 'name', + 'tenant_id': 'project_id', 'recurse_zones': None, 'flavor': _remap_flavor_filter, 'fixed_ip': _remap_fixed_ip_filter} @@ -1001,7 +1025,7 @@ class API(base.Base): :param extra_properties: dict of extra image properties to include """ - instance = db.api.instance_get(context, instance_id) + instance = self.db.instance_get(context, instance_id) properties = {'instance_uuid': instance['uuid'], 'user_id': str(context.user_id), 'image_state': 'creating', @@ -1020,28 +1044,36 @@ class API(base.Base): @scheduler_api.reroute_compute("reboot") def reboot(self, context, instance_id): """Reboot the given instance.""" + self.update(context, + instance_id, + vm_state=vm_states.ACTIVE, + task_state=task_states.REBOOTING) self._cast_compute_message('reboot_instance', context, instance_id) @scheduler_api.reroute_compute("rebuild") def rebuild(self, context, instance_id, image_href, admin_password, name=None, metadata=None, files_to_inject=None): """Rebuild the given instance with the provided metadata.""" - instance = db.api.instance_get(context, instance_id) + instance = self.db.instance_get(context, instance_id) + name = name or instance["display_name"] - if instance["state"] == power_state.BUILDING: - msg = _("Instance already building") - raise exception.BuildInProgress(msg) + if instance["vm_state"] != vm_states.ACTIVE: + msg = _("Instance must be active to rebuild.") + raise exception.RebuildRequiresActiveInstance(msg) files_to_inject = files_to_inject or [] + metadata = metadata or {} + self._check_injected_file_quota(context, files_to_inject) + self._check_metadata_properties_quota(context, metadata) - values = {"image_ref": image_href} - if metadata is not None: - self._check_metadata_properties_quota(context, metadata) - values['metadata'] = metadata - if name is not None: - values['display_name'] = name - self.db.instance_update(context, instance_id, values) + self.update(context, + instance_id, + metadata=metadata, + display_name=name, + image_ref=image_href, + vm_state=vm_states.ACTIVE, + task_state=task_states.REBUILDING) rebuild_params = { "new_pass": admin_password, @@ -1065,6 +1097,11 @@ class API(base.Base): raise exception.MigrationNotFoundByStatus(instance_id=instance_id, status='finished') + self.update(context, + instance_id, + vm_state=vm_states.ACTIVE, + task_state=None) + params = {'migration_id': migration_ref['id']} self._cast_compute_message('revert_resize', context, instance_ref['uuid'], @@ -1085,6 +1122,12 @@ class API(base.Base): if not migration_ref: raise exception.MigrationNotFoundByStatus(instance_id=instance_id, status='finished') + + self.update(context, + instance_id, + vm_state=vm_states.ACTIVE, + task_state=None) + params = {'migration_id': migration_ref['id']} self._cast_compute_message('confirm_resize', context, instance_ref['uuid'], @@ -1130,6 +1173,11 @@ class API(base.Base): if (current_memory_mb == new_memory_mb) and flavor_id: raise exception.CannotResizeToSameSize() + self.update(context, + instance_id, + vm_state=vm_states.RESIZING, + task_state=task_states.RESIZE_PREP) + instance_ref = self._get_instance(context, instance_id, 'resize') self._cast_scheduler_message(context, {"method": "prep_resize", @@ -1163,11 +1211,19 @@ class API(base.Base): @scheduler_api.reroute_compute("pause") def pause(self, context, instance_id): """Pause the given instance.""" + self.update(context, + instance_id, + vm_state=vm_states.ACTIVE, + task_state=task_states.PAUSING) self._cast_compute_message('pause_instance', context, instance_id) @scheduler_api.reroute_compute("unpause") def unpause(self, context, instance_id): """Unpause the given instance.""" + self.update(context, + instance_id, + vm_state=vm_states.PAUSED, + task_state=task_states.UNPAUSING) self._cast_compute_message('unpause_instance', context, instance_id) def _call_compute_message_for_host(self, action, context, host, params): @@ -1200,21 +1256,37 @@ class API(base.Base): @scheduler_api.reroute_compute("suspend") def suspend(self, context, instance_id): """Suspend the given instance.""" + self.update(context, + instance_id, + vm_state=vm_states.ACTIVE, + task_state=task_states.SUSPENDING) self._cast_compute_message('suspend_instance', context, instance_id) @scheduler_api.reroute_compute("resume") def resume(self, context, instance_id): """Resume the given instance.""" + self.update(context, + instance_id, + vm_state=vm_states.SUSPENDED, + task_state=task_states.RESUMING) self._cast_compute_message('resume_instance', context, instance_id) @scheduler_api.reroute_compute("rescue") def rescue(self, context, instance_id): """Rescue the given instance.""" + self.update(context, + instance_id, + vm_state=vm_states.ACTIVE, + task_state=task_states.RESCUING) self._cast_compute_message('rescue_instance', context, instance_id) @scheduler_api.reroute_compute("unrescue") def unrescue(self, context, instance_id): """Unrescue the given instance.""" + self.update(context, + instance_id, + vm_state=vm_states.RESCUED, + task_state=task_states.UNRESCUING) self._cast_compute_message('unrescue_instance', context, instance_id) @scheduler_api.reroute_compute("set_admin_password") diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 6fcb3786c..0477db745 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -56,6 +56,8 @@ from nova import rpc from nova import utils from nova import volume from nova.compute import power_state +from nova.compute import task_states +from nova.compute import vm_states from nova.notifier import api as notifier from nova.compute.utils import terminate_volumes from nova.virt import driver @@ -146,6 +148,10 @@ class ComputeManager(manager.SchedulerDependentManager): super(ComputeManager, self).__init__(service_name="compute", *args, **kwargs) + def _instance_update(self, context, instance_id, **kwargs): + """Update an instance in the database using kwargs as value.""" + return self.db.instance_update(context, instance_id, kwargs) + def init_host(self): """Initialization for a standalone compute service.""" self.driver.init_host(host=self.host) @@ -153,8 +159,8 @@ class ComputeManager(manager.SchedulerDependentManager): instances = self.db.instance_get_all_by_host(context, self.host) for instance in instances: inst_name = instance['name'] - db_state = instance['state'] - drv_state = self._update_state(context, instance['id']) + db_state = instance['power_state'] + drv_state = self._get_power_state(context, instance) expect_running = db_state == power_state.RUNNING \ and drv_state != db_state @@ -177,29 +183,13 @@ class ComputeManager(manager.SchedulerDependentManager): LOG.warning(_('Hypervisor driver does not ' 'support firewall rules')) - def _update_state(self, context, instance_id, state=None): - """Update the state of an instance from the driver info.""" - instance_ref = self.db.instance_get(context, instance_id) - - if state is None: - try: - LOG.debug(_('Checking state of %s'), instance_ref['name']) - info = self.driver.get_info(instance_ref['name']) - except exception.NotFound: - info = None - - if info is not None: - state = info['state'] - else: - state = power_state.FAILED - - self.db.instance_set_state(context, instance_id, state) - return state - - def _update_launched_at(self, context, instance_id, launched_at=None): - """Update the launched_at parameter of the given instance.""" - data = {'launched_at': launched_at or utils.utcnow()} - self.db.instance_update(context, instance_id, data) + def _get_power_state(self, context, instance): + """Retrieve the power state for the given instance.""" + LOG.debug(_('Checking state of %s'), instance['name']) + try: + return self.driver.get_info(instance['name'])["state"] + except exception.NotFound: + return power_state.FAILED def get_console_topic(self, context, **kwargs): """Retrieves the console host for a project on this host. @@ -251,11 +241,6 @@ class ComputeManager(manager.SchedulerDependentManager): def _setup_block_device_mapping(self, context, instance_id): """setup volumes for block device mapping""" - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'block_device_mapping') - volume_api = volume.API() block_device_mapping = [] swap = None @@ -389,17 +374,12 @@ class ComputeManager(manager.SchedulerDependentManager): updates = {} updates['host'] = self.host updates['launched_on'] = self.host - instance = self.db.instance_update(context, - instance_id, - updates) + updates['vm_state'] = vm_states.BUILDING + updates['task_state'] = task_states.NETWORKING + instance = self.db.instance_update(context, instance_id, updates) instance['injected_files'] = kwargs.get('injected_files', []) instance['admin_pass'] = kwargs.get('admin_password', None) - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'networking') - is_vpn = instance['image_ref'] == str(FLAGS.vpn_image_id) try: # NOTE(vish): This could be a cast because we don't do anything @@ -418,6 +398,11 @@ class ComputeManager(manager.SchedulerDependentManager): # all vif creation and network injection, maybe this is correct network_info = [] + self._instance_update(context, + instance_id, + vm_state=vm_states.BUILDING, + task_state=task_states.BLOCK_DEVICE_MAPPING) + (swap, ephemerals, block_device_mapping) = self._setup_block_device_mapping( context, instance_id) @@ -427,9 +412,12 @@ class ComputeManager(manager.SchedulerDependentManager): 'ephemerals': ephemerals, 'block_device_mapping': block_device_mapping} - # TODO(vish) check to make sure the availability zone matches - self._update_state(context, instance_id, power_state.BUILDING) + self._instance_update(context, + instance_id, + vm_state=vm_states.BUILDING, + task_state=task_states.SPAWNING) + # TODO(vish) check to make sure the availability zone matches try: self.driver.spawn(context, instance, network_info, block_device_info) @@ -438,13 +426,21 @@ class ComputeManager(manager.SchedulerDependentManager): "virtualization enabled in the BIOS? Details: " "%(ex)s") % locals() LOG.exception(msg) + return + + current_power_state = self._get_power_state(context, instance) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=None, + launched_at=utils.utcnow()) - self._update_launched_at(context, instance_id) - self._update_state(context, instance_id) usage_info = utils.usage_from_instance(instance) notifier.notify('compute.%s' % self.host, 'compute.instance.create', notifier.INFO, usage_info) + except exception.InstanceNotFound: # FIXME(wwolf): We are just ignoring InstanceNotFound # exceptions here in case the instance was immediately @@ -480,8 +476,7 @@ class ComputeManager(manager.SchedulerDependentManager): for volume in volumes: self._detach_volume(context, instance_id, volume['id'], False) - if (instance['state'] == power_state.SHUTOFF and - instance['state_description'] != 'stopped'): + if instance['power_state'] == power_state.SHUTOFF: self.db.instance_destroy(context, instance_id) raise exception.Error(_('trying to destroy already destroyed' ' instance: %s') % instance_id) @@ -496,9 +491,14 @@ class ComputeManager(manager.SchedulerDependentManager): """Terminate an instance on this host.""" self._shutdown_instance(context, instance_id, 'Terminating') instance = self.db.instance_get(context.elevated(), instance_id) + self._instance_update(context, + instance_id, + vm_state=vm_states.DELETED, + task_state=None, + terminated_at=utils.utcnow()) - # TODO(ja): should we keep it in a terminated state for a bit? self.db.instance_destroy(context, instance_id) + usage_info = utils.usage_from_instance(instance) notifier.notify('compute.%s' % self.host, 'compute.instance.delete', @@ -509,7 +509,10 @@ class ComputeManager(manager.SchedulerDependentManager): def stop_instance(self, context, instance_id): """Stopping an instance on this host.""" self._shutdown_instance(context, instance_id, 'Stopping') - # instance state will be updated to stopped by _poll_instance_states() + self._instance_update(context, + instance_id, + vm_state=vm_states.STOPPED, + task_state=None) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) @checks_instance_lock @@ -529,26 +532,46 @@ class ComputeManager(manager.SchedulerDependentManager): instance_ref = self.db.instance_get(context, instance_id) LOG.audit(_("Rebuilding instance %s"), instance_id, context=context) - self._update_state(context, instance_id, power_state.BUILDING) + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.REBUILDING, + task_state=None) network_info = self._get_instance_nw_info(context, instance_ref) - self.driver.destroy(instance_ref, network_info) + + self._instance_update(context, + instance_id, + vm_state=vm_states.REBUILDING, + task_state=task_states.BLOCK_DEVICE_MAPPING) + instance_ref.injected_files = kwargs.get('injected_files', []) network_info = self.network_api.get_instance_nw_info(context, instance_ref) bd_mapping = self._setup_block_device_mapping(context, instance_id) + self._instance_update(context, + instance_id, + vm_state=vm_states.REBUILDING, + task_state=task_states.SPAWNING) + # pull in new password here since the original password isn't in the db instance_ref.admin_pass = kwargs.get('new_pass', utils.generate_password(FLAGS.password_length)) self.driver.spawn(context, instance_ref, network_info, bd_mapping) - self._update_launched_at(context, instance_id) - self._update_state(context, instance_id) - usage_info = utils.usage_from_instance(instance_ref) + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=None, + launched_at=utils.utcnow()) + usage_info = utils.usage_from_instance(instance_ref) notifier.notify('compute.%s' % self.host, 'compute.instance.rebuild', notifier.INFO, @@ -558,26 +581,34 @@ class ComputeManager(manager.SchedulerDependentManager): @checks_instance_lock def reboot_instance(self, context, instance_id): """Reboot an instance on this host.""" + LOG.audit(_("Rebooting instance %s"), instance_id, context=context) context = context.elevated() - self._update_state(context, instance_id) instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_("Rebooting instance %s"), instance_id, context=context) - if instance_ref['state'] != power_state.RUNNING: - state = instance_ref['state'] + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=task_states.REBOOTING) + + if instance_ref['power_state'] != power_state.RUNNING: + state = instance_ref['power_state'] running = power_state.RUNNING LOG.warn(_('trying to reboot a non-running ' 'instance: %(instance_id)s (state: %(state)s ' 'expected: %(running)s)') % locals(), context=context) - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'rebooting') network_info = self._get_instance_nw_info(context, instance_ref) self.driver.reboot(instance_ref, network_info) - self._update_state(context, instance_id) + + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=None) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) def snapshot_instance(self, context, instance_id, image_id, @@ -593,37 +624,45 @@ class ComputeManager(manager.SchedulerDependentManager): :param rotation: int representing how many backups to keep around; None if rotation shouldn't be used (as in the case of snapshots) """ + if image_type == "snapshot": + task_state = task_states.IMAGE_SNAPSHOT + elif image_type == "backup": + task_state = task_states.IMAGE_BACKUP + else: + raise Exception(_('Image type not recognized %s') % image_type) + context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) - #NOTE(sirp): update_state currently only refreshes the state field - # if we add is_snapshotting, we will need this refreshed too, - # potentially? - self._update_state(context, instance_id) + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=task_state) LOG.audit(_('instance %s: snapshotting'), instance_id, context=context) - if instance_ref['state'] != power_state.RUNNING: - state = instance_ref['state'] + + if instance_ref['power_state'] != power_state.RUNNING: + state = instance_ref['power_state'] running = power_state.RUNNING LOG.warn(_('trying to snapshot a non-running ' 'instance: %(instance_id)s (state: %(state)s ' 'expected: %(running)s)') % locals()) self.driver.snapshot(context, instance_ref, image_id) + self._instance_update(context, instance_id, task_state=None) + + if image_type == 'snapshot' and rotation: + raise exception.ImageRotationNotAllowed() + + elif image_type == 'backup' and rotation: + instance_uuid = instance_ref['uuid'] + self.rotate_backups(context, instance_uuid, backup_type, rotation) - if image_type == 'snapshot': - if rotation: - raise exception.ImageRotationNotAllowed() elif image_type == 'backup': - if rotation: - instance_uuid = instance_ref['uuid'] - self.rotate_backups(context, instance_uuid, backup_type, - rotation) - else: - raise exception.RotationRequiredForBackup() - else: - raise Exception(_('Image type not recognized %s') % image_type) + raise exception.RotationRequiredForBackup() def rotate_backups(self, context, instance_uuid, backup_type, rotation): """Delete excess backups associated to an instance. @@ -691,7 +730,7 @@ class ComputeManager(manager.SchedulerDependentManager): for i in xrange(max_tries): instance_ref = self.db.instance_get(context, instance_id) instance_id = instance_ref["id"] - instance_state = instance_ref["state"] + instance_state = instance_ref["power_state"] expected_state = power_state.RUNNING if instance_state != expected_state: @@ -726,7 +765,7 @@ class ComputeManager(manager.SchedulerDependentManager): context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) instance_id = instance_ref['id'] - instance_state = instance_ref['state'] + instance_state = instance_ref['power_state'] expected_state = power_state.RUNNING if instance_state != expected_state: LOG.warn(_('trying to inject a file into a non-running ' @@ -744,7 +783,7 @@ class ComputeManager(manager.SchedulerDependentManager): context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) instance_id = instance_ref['id'] - instance_state = instance_ref['state'] + instance_state = instance_ref['power_state'] expected_state = power_state.RUNNING if instance_state != expected_state: LOG.warn(_('trying to update agent on a non-running ' @@ -759,40 +798,41 @@ class ComputeManager(manager.SchedulerDependentManager): @checks_instance_lock def rescue_instance(self, context, instance_id): """Rescue an instance on this host.""" + LOG.audit(_('instance %s: rescuing'), instance_id, context=context) context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_('instance %s: rescuing'), instance_id, context=context) - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'rescuing') - _update_state = lambda result: self._update_state_callback( - self, context, instance_id, result) network_info = self._get_instance_nw_info(context, instance_ref) - self.driver.rescue(context, instance_ref, _update_state, network_info) - self._update_state(context, instance_id) + + # NOTE(blamar): None of the virt drivers use the 'callback' param + self.driver.rescue(context, instance_ref, None, network_info) + + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + vm_state=vm_states.RESCUED, + task_state=None, + power_state=current_power_state) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) @checks_instance_lock def unrescue_instance(self, context, instance_id): """Rescue an instance on this host.""" + LOG.audit(_('instance %s: unrescuing'), instance_id, context=context) context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_('instance %s: unrescuing'), instance_id, context=context) - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'unrescuing') - _update_state = lambda result: self._update_state_callback( - self, context, instance_id, result) network_info = self._get_instance_nw_info(context, instance_ref) - self.driver.unrescue(instance_ref, _update_state, network_info) - self._update_state(context, instance_id) - @staticmethod - def _update_state_callback(self, context, instance_id, result): - """Update instance state when async task completes.""" - self._update_state(context, instance_id) + # NOTE(blamar): None of the virt drivers use the 'callback' param + self.driver.unrescue(instance_ref, None, network_info) + + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + vm_state=vm_states.ACTIVE, + task_state=None, + power_state=current_power_state) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) @checks_instance_lock @@ -851,11 +891,12 @@ class ComputeManager(manager.SchedulerDependentManager): # Just roll back the record. There's no need to resize down since # the 'old' VM already has the preferred attributes - self.db.instance_update(context, instance_ref['uuid'], - dict(memory_mb=instance_type['memory_mb'], - vcpus=instance_type['vcpus'], - local_gb=instance_type['local_gb'], - instance_type_id=instance_type['id'])) + self._instance_update(context, + instance_ref["uuid"], + memory_mb=instance_type['memory_mb'], + vcpus=instance_type['vcpus'], + local_gb=instance_type['local_gb'], + instance_type_id=instance_type['id']) self.driver.revert_migration(instance_ref) self.db.migration_update(context, migration_id, @@ -882,8 +923,11 @@ class ComputeManager(manager.SchedulerDependentManager): instance_ref = self.db.instance_get_by_uuid(context, instance_id) if instance_ref['host'] == FLAGS.host: - raise exception.Error(_( - 'Migration error: destination same as source!')) + self._instance_update(context, + instance_id, + vm_state=vm_states.ERROR) + msg = _('Migration error: destination same as source!') + raise exception.Error(msg) old_instance_type = self.db.instance_type_get(context, instance_ref['instance_type_id']) @@ -977,6 +1021,11 @@ class ComputeManager(manager.SchedulerDependentManager): self.driver.finish_migration(context, instance_ref, disk_info, network_info, resize_instance) + self._instance_update(context, + instance_id, + vm_state=vm_states.ACTIVE, + task_state=task_states.RESIZE_VERIFY) + self.db.migration_update(context, migration_id, {'status': 'finished', }) @@ -1008,35 +1057,35 @@ class ComputeManager(manager.SchedulerDependentManager): @checks_instance_lock def pause_instance(self, context, instance_id): """Pause an instance on this host.""" + LOG.audit(_('instance %s: pausing'), instance_id, context=context) context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_('instance %s: pausing'), instance_id, context=context) - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'pausing') - self.driver.pause(instance_ref, - lambda result: self._update_state_callback(self, - context, - instance_id, - result)) + self.driver.pause(instance_ref, lambda result: None) + + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.PAUSED, + task_state=None) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) @checks_instance_lock def unpause_instance(self, context, instance_id): """Unpause a paused instance on this host.""" + LOG.audit(_('instance %s: unpausing'), instance_id, context=context) context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_('instance %s: unpausing'), instance_id, context=context) - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'unpausing') - self.driver.unpause(instance_ref, - lambda result: self._update_state_callback(self, - context, - instance_id, - result)) + self.driver.unpause(instance_ref, lambda result: None) + + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=None) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) def host_power_action(self, context, host=None, action=None): @@ -1052,7 +1101,7 @@ class ComputeManager(manager.SchedulerDependentManager): def get_diagnostics(self, context, instance_id): """Retrieve diagnostics for an instance on this host.""" instance_ref = self.db.instance_get(context, instance_id) - if instance_ref["state"] == power_state.RUNNING: + if instance_ref["power_state"] == power_state.RUNNING: LOG.audit(_("instance %s: retrieving diagnostics"), instance_id, context=context) return self.driver.get_diagnostics(instance_ref) @@ -1061,33 +1110,35 @@ class ComputeManager(manager.SchedulerDependentManager): @checks_instance_lock def suspend_instance(self, context, instance_id): """Suspend the given instance.""" + LOG.audit(_('instance %s: suspending'), instance_id, context=context) context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_('instance %s: suspending'), instance_id, context=context) - self.db.instance_set_state(context, instance_id, - power_state.NOSTATE, - 'suspending') - self.driver.suspend(instance_ref, - lambda result: self._update_state_callback(self, - context, - instance_id, - result)) + self.driver.suspend(instance_ref, lambda result: None) + + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.SUSPENDED, + task_state=None) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) @checks_instance_lock def resume_instance(self, context, instance_id): """Resume the given suspended instance.""" + LOG.audit(_('instance %s: resuming'), instance_id, context=context) context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_('instance %s: resuming'), instance_id, context=context) - self.db.instance_set_state(context, instance_id, - power_state.NOSTATE, - 'resuming') - self.driver.resume(instance_ref, - lambda result: self._update_state_callback(self, - context, - instance_id, - result)) + self.driver.resume(instance_ref, lambda result: None) + + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=None) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) def lock_instance(self, context, instance_id): @@ -1498,11 +1549,14 @@ class ComputeManager(manager.SchedulerDependentManager): 'block_migration': block_migration}}) # Restore instance state - self.db.instance_update(ctxt, - instance_ref['id'], - {'state_description': 'running', - 'state': power_state.RUNNING, - 'host': dest}) + current_power_state = self._get_power_state(ctxt, instance_ref) + self._instance_update(ctxt, + instance_ref["id"], + host=dest, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=None) + # Restore volume state for volume_ref in instance_ref['volumes']: volume_id = volume_ref['id'] @@ -1548,11 +1602,11 @@ class ComputeManager(manager.SchedulerDependentManager): This param specifies destination host. """ host = instance_ref['host'] - self.db.instance_update(context, - instance_ref['id'], - {'state_description': 'running', - 'state': power_state.RUNNING, - 'host': host}) + self._instance_update(context, + instance_ref['id'], + host=host, + vm_state=vm_states.ACTIVE, + task_state=None) for volume_ref in instance_ref['volumes']: volume_id = volume_ref['id'] @@ -1600,10 +1654,9 @@ class ComputeManager(manager.SchedulerDependentManager): error_list.append(ex) try: - self._poll_instance_states(context) + self._sync_power_states(context) except Exception as ex: - LOG.warning(_("Error during instance poll: %s"), - unicode(ex)) + LOG.warning(_("Error during power_state sync: %s"), unicode(ex)) error_list.append(ex) return error_list @@ -1618,68 +1671,40 @@ class ComputeManager(manager.SchedulerDependentManager): self.update_service_capabilities( self.driver.get_host_stats(refresh=True)) - def _poll_instance_states(self, context): + def _sync_power_states(self, context): + """Align power states between the database and the hypervisor. + + The hypervisor is authoritative for the power_state data, so we + simply loop over all known instances for this host and update the + power_state according to the hypervisor. If the instance is not found + then it will be set to power_state.NOSTATE, because it doesn't exist + on the hypervisor. + + """ vm_instances = self.driver.list_instances_detail() vm_instances = dict((vm.name, vm) for vm in vm_instances) + db_instances = self.db.instance_get_all_by_host(context, self.host) - # Keep a list of VMs not in the DB, cross them off as we find them - vms_not_found_in_db = list(vm_instances.keys()) + num_vm_instances = len(vm_instances) + num_db_instances = len(db_instances) - db_instances = self.db.instance_get_all_by_host(context, self.host) + if num_vm_instances != num_db_instances: + LOG.info(_("Found %(num_db_instances)s in the database and " + "%(num_vm_instances)s on the hypervisor.") % locals()) for db_instance in db_instances: - name = db_instance['name'] - db_state = db_instance['state'] + name = db_instance["name"] + db_power_state = db_instance['power_state'] vm_instance = vm_instances.get(name) if vm_instance is None: - # NOTE(justinsb): We have to be very careful here, because a - # concurrent operation could be in progress (e.g. a spawn) - if db_state == power_state.BUILDING: - # TODO(justinsb): This does mean that if we crash during a - # spawn, the machine will never leave the spawning state, - # but this is just the way nova is; this function isn't - # trying to correct that problem. - # We could have a separate task to correct this error. - # TODO(justinsb): What happens during a live migration? - LOG.info(_("Found instance '%(name)s' in DB but no VM. " - "State=%(db_state)s, so assuming spawn is in " - "progress.") % locals()) - vm_state = db_state - else: - LOG.info(_("Found instance '%(name)s' in DB but no VM. " - "State=%(db_state)s, so setting state to " - "shutoff.") % locals()) - vm_state = power_state.SHUTOFF - if db_instance['state_description'] == 'stopping': - self.db.instance_stop(context, db_instance['id']) - continue + vm_power_state = power_state.NOSTATE else: - vm_state = vm_instance.state - vms_not_found_in_db.remove(name) - - if (db_instance['state_description'] in ['migrating', 'stopping']): - # A situation which db record exists, but no instance" - # sometimes occurs while live-migration at src compute, - # this case should be ignored. - LOG.debug(_("Ignoring %(name)s, as it's currently being " - "migrated.") % locals()) - continue - - if vm_state != db_state: - LOG.info(_("DB/VM state mismatch. Changing state from " - "'%(db_state)s' to '%(vm_state)s'") % locals()) - self._update_state(context, db_instance['id'], vm_state) + vm_power_state = vm_instance.state - # NOTE(justinsb): We no longer auto-remove SHUTOFF instances - # It's quite hard to get them back when we do. - - # Are there VMs not in the DB? - for vm_not_found_in_db in vms_not_found_in_db: - name = vm_not_found_in_db + if vm_power_state == db_power_state: + continue - # We only care about instances that compute *should* know about - if name.startswith("instance-"): - # TODO(justinsb): What to do here? Adopt it? Shut it down? - LOG.warning(_("Found VM not in DB: '%(name)s'. Ignoring") - % locals()) + self._instance_update(context, + db_instance["id"], + power_state=vm_power_state) diff --git a/nova/compute/task_states.py b/nova/compute/task_states.py new file mode 100644 index 000000000..e3315a542 --- /dev/null +++ b/nova/compute/task_states.py @@ -0,0 +1,59 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Possible task states for instances. + +Compute instance task states represent what is happening to the instance at the +current moment. These tasks can be generic, such as 'spawning', or specific, +such as 'block_device_mapping'. These task states allow for a better view into +what an instance is doing and should be displayed to users/administrators as +necessary. + +""" + +SCHEDULING = 'scheduling' +BLOCK_DEVICE_MAPPING = 'block_device_mapping' +NETWORKING = 'networking' +SPAWNING = 'spawning' + +IMAGE_SNAPSHOT = 'image_snapshot' +IMAGE_BACKUP = 'image_backup' + +UPDATING_PASSWORD = 'updating_password' + +RESIZE_PREP = 'resize_prep' +RESIZE_MIGRATING = 'resize_migrating' +RESIZE_MIGRATED = 'resize_migrated' +RESIZE_FINISH = 'resize_finish' +RESIZE_REVERTING = 'resize_reverting' +RESIZE_CONFIRMING = 'resize_confirming' +RESIZE_VERIFY = 'resize_verify' + +REBUILDING = 'rebuilding' + +REBOOTING = 'rebooting' +PAUSING = 'pausing' +UNPAUSING = 'unpausing' +SUSPENDING = 'suspending' +RESUMING = 'resuming' + +RESCUING = 'rescuing' +UNRESCUING = 'unrescuing' + +DELETING = 'deleting' +STOPPING = 'stopping' +STARTING = 'starting' diff --git a/nova/compute/vm_states.py b/nova/compute/vm_states.py new file mode 100644 index 000000000..6f16c1f09 --- /dev/null +++ b/nova/compute/vm_states.py @@ -0,0 +1,39 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Possible vm states for instances. + +Compute instance vm states represent the state of an instance as it pertains to +a user or administrator. When combined with task states (task_states.py), a +better picture can be formed regarding the instance's health. + +""" + +ACTIVE = 'active' +BUILDING = 'building' +REBUILDING = 'rebuilding' + +PAUSED = 'paused' +SUSPENDED = 'suspended' +RESCUED = 'rescued' +DELETED = 'deleted' +STOPPED = 'stopped' + +MIGRATING = 'migrating' +RESIZING = 'resizing' + +ERROR = 'error' |
