Diffstat (limited to 'nova/compute')
-rw-r--r--  nova/compute/api.py       |  71
-rw-r--r--  nova/compute/cells_api.py |   4
-rw-r--r--  nova/compute/flavors.py   |  14
-rwxr-xr-x  nova/compute/manager.py   | 363
-rw-r--r--  nova/compute/rpcapi.py    |  19
-rw-r--r--  nova/compute/utils.py     |   3
6 files changed, 287 insertions(+), 187 deletions(-)
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 1e24e8ce5..48d404148 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -110,10 +110,12 @@ RO_SECURITY_GROUPS = ['default']
 SM_IMAGE_PROP_PREFIX = "image_"


-def check_instance_state(vm_state=None, task_state=(None,)):
+def check_instance_state(vm_state=None, task_state=(None,),
+                         must_have_launched=True):
     """Decorator to check VM and/or task state before entry to API functions.

-    If the instance is in the wrong state, the wrapper will raise an exception.
+    If the instance is in the wrong state, or has not been successfully
+    started at least once, the wrapper will raise an exception.
     """

     if vm_state is not None and not isinstance(vm_state, set):
@@ -137,6 +139,13 @@ def check_instance_state(vm_state=None, task_state=(None,)):
                     instance_uuid=instance['uuid'],
                     state=instance['task_state'],
                     method=f.__name__)
+            if must_have_launched and not instance['launched_at']:
+                raise exception.InstanceInvalidState(
+                    attr=None,
+                    not_launched=True,
+                    instance_uuid=instance['uuid'],
+                    state=instance['vm_state'],
+                    method=f.__name__)

             return f(self, context, instance, *args, **kw)
         return inner
@@ -487,21 +496,20 @@ class API(base.Base):
                                             instance['uuid'], updates)
         return instance

-    def _check_config_drive(self, context, config_drive):
-        try:
-            bool_like = strutils.bool_from_string(config_drive, strict=True)
-        except ValueError:
-            bool_like = False
-
-        if config_drive is None:
-            return None, None
-        elif bool_like:
-            return None, bool_like
+    def _check_config_drive(self, config_drive):
+        if config_drive:
+            try:
+                bool_val = strutils.bool_from_string(config_drive,
+                                                     strict=True)
+            except ValueError:
+                raise exception.ConfigDriveInvalidValue(option=config_drive)
         else:
-            cd_image_service, config_drive_id = \
-                glance.get_remote_image_service(context, config_drive)
-            cd_image_service.show(context, config_drive_id)
-            return config_drive_id, None
+            bool_val = False
+        # FIXME(comstud): Bug ID 1193438 filed for this. This looks silly,
+        # but this is because the config drive column is a String. False
+        # is represented by using an empty string. And for whatever
+        # reason, we rely on the DB to cast True to a String.
+        return True if bool_val else ''

     def _check_requested_image(self, context, image_id, image, instance_type):
         if not image:
@@ -589,8 +597,7 @@ class API(base.Base):
         kernel_id, ramdisk_id = self._handle_kernel_and_ramdisk(
                 context, kernel_id, ramdisk_id, image)

-        config_drive_id, config_drive = self._check_config_drive(
-            context, config_drive)
+        config_drive = self._check_config_drive(config_drive)

         if key_data is None and key_name:
             key_pair = self.db.key_pair_get(context, context.user_id,
@@ -610,8 +617,7 @@ class API(base.Base):
             'ramdisk_id': ramdisk_id or '',
             'power_state': power_state.NOSTATE,
             'vm_state': vm_states.BUILDING,
-            'config_drive_id': config_drive_id or '',
-            'config_drive': config_drive or '',
+            'config_drive': config_drive,
             'user_id': context.user_id,
             'project_id': context.project_id,
             'instance_type_id': instance_type['id'],
@@ -1305,7 +1311,8 @@ class API(base.Base):
     # NOTE(maoy): we allow delete to be called no matter what vm_state says.
     @wrap_check_policy
     @check_instance_lock
-    @check_instance_state(vm_state=None, task_state=None)
+    @check_instance_state(vm_state=None, task_state=None,
+                          must_have_launched=True)
     def soft_delete(self, context, instance):
         """Terminate an instance."""
         LOG.debug(_('Going to try to soft delete instance'),
@@ -1329,7 +1336,8 @@ class API(base.Base):

     @wrap_check_policy
     @check_instance_lock
-    @check_instance_state(vm_state=None, task_state=None)
+    @check_instance_state(vm_state=None, task_state=None,
+                          must_have_launched=False)
     def delete(self, context, instance):
         """Terminate an instance."""
         LOG.debug(_("Going to try to terminate instance"), instance=instance)
@@ -1369,7 +1377,8 @@ class API(base.Base):

     @wrap_check_policy
     @check_instance_lock
-    @check_instance_state(vm_state=[vm_states.SOFT_DELETED])
+    @check_instance_state(vm_state=[vm_states.SOFT_DELETED],
+                          must_have_launched=False)
     def force_delete(self, context, instance):
         """Force delete a previously deleted (but not reclaimed) instance."""
         self._delete_instance(context, instance)
@@ -1790,7 +1799,8 @@ class API(base.Base):
     @wrap_check_policy
     @check_instance_lock
     @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED,
-                                    vm_states.PAUSED, vm_states.SUSPENDED],
+                                    vm_states.PAUSED, vm_states.SUSPENDED,
+                                    vm_states.ERROR],
                           task_state=[None, task_states.REBOOTING,
                                       task_states.REBOOTING_HARD,
                                       task_states.RESUMING,
@@ -1826,7 +1836,8 @@ class API(base.Base):

     @wrap_check_policy
     @check_instance_lock
-    @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED],
+    @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED,
+                                    vm_states.ERROR],
                           task_state=[None])
     def rebuild(self, context, instance, image_href, admin_password,
                 **kwargs):
         """Rebuild the given instance with the provided attributes."""
@@ -2224,7 +2235,8 @@ class API(base.Base):

     @wrap_check_policy
     @check_instance_lock
-    @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED])
+    @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED,
+                                    vm_states.ERROR])
     def rescue(self, context, instance, rescue_password=None):
         """Rescue the given instance."""
@@ -3201,7 +3213,8 @@ class SecurityGroupAPI(base.Base, security_group_base.SecurityGroupBase):

     def trigger_rules_refresh(self, context, id):
         """Called when a rule is added to or removed from a security_group."""

-        security_group = self.db.security_group_get(context, id)
+        security_group = self.db.security_group_get(
+            context, id, columns_to_join=['instances'])

         for instance in security_group['instances']:
             if instance['host'] is not None:
@@ -3227,8 +3240,8 @@ class SecurityGroupAPI(base.Base, security_group_base.SecurityGroupBase):
         security_groups = set()
         for rule in security_group_rules:
             security_group = self.db.security_group_get(
-                context,
-                rule['parent_group_id'])
+                context, rule['parent_group_id'],
+                columns_to_join=['instances'])
             security_groups.add(security_group)

         # ..then we find the instances that are members of these groups..
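A quick reduction of what the new must_have_launched gate does, for readers skimming the api.py hunks above. The helper name, dict literal, and exception type below are illustrative stand-ins, not the patch's own code:

    def check_launched(instance, must_have_launched=True):
        # An instance that never reached a hypervisor has no launched_at
        # timestamp. The real code raises exception.InstanceInvalidState.
        if must_have_launched and not instance['launched_at']:
            raise ValueError("instance %s was never launched"
                             % instance['uuid'])

    stub = {'uuid': 'fake-uuid', 'launched_at': None}
    check_launched(stub, must_have_launched=False)  # delete/force_delete: ok
    # check_launched(stub)                          # soft_delete path: raises

This is why delete and force_delete pass must_have_launched=False: an instance that failed before ever launching must still be deletable, while soft_delete keeps the stricter default.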
diff --git a/nova/compute/cells_api.py b/nova/compute/cells_api.py
index 6f1e12480..ddf959ef3 100644
--- a/nova/compute/cells_api.py
+++ b/nova/compute/cells_api.py
@@ -520,8 +520,6 @@ class ComputeCellsAPI(compute_api.API):
         """Attach an existing volume to an existing instance."""
         if device and not block_device.match_device(device):
             raise exception.InvalidDevicePath(path=device)
-        device = self.compute_rpcapi.reserve_block_device_name(
-            context, device=device, instance=instance, volume_id=volume_id)
         try:
             volume = self.volume_api.get(context, volume_id)
             self.volume_api.check_attach(context, volume, instance=instance)
@@ -529,7 +527,7 @@ class ComputeCellsAPI(compute_api.API):
             with excutils.save_and_reraise_exception():
                 self.db.block_device_mapping_destroy_by_instance_and_device(
                         context, instance['uuid'], device)
-        self._cast_to_cells(context, instance, 'attach_volume',
+        return self._call_to_cells(context, instance, 'attach_volume',
                 volume_id, device)

     @validate_cell
diff --git a/nova/compute/flavors.py b/nova/compute/flavors.py
index 59d5d5715..a18b375d8 100644
--- a/nova/compute/flavors.py
+++ b/nova/compute/flavors.py
@@ -95,20 +95,20 @@ def create(name, memory, vcpus, root_gb, ephemeral_gb=0, flavorid=None,
     # Some attributes are positive ( > 0) integers
     for option in ['memory_mb', 'vcpus']:
         try:
+            assert int(str(kwargs[option])) > 0
             kwargs[option] = int(kwargs[option])
-            assert kwargs[option] > 0
-        except (ValueError, AssertionError):
-            msg = _("'%s' argument must be greater than 0") % option
+        except (ValueError, AssertionError, TypeError):
+            msg = _("'%s' argument must be a positive integer") % option
             raise exception.InvalidInput(reason=msg)

     # Some attributes are non-negative ( >= 0) integers
     for option in ['root_gb', 'ephemeral_gb', 'swap']:
         try:
+            assert int(str(kwargs[option])) >= 0
             kwargs[option] = int(kwargs[option])
-            assert kwargs[option] >= 0
-        except (ValueError, AssertionError):
-            msg = _("'%s' argument must be greater than or equal"
-                    " to 0") % option
+        except (ValueError, AssertionError, TypeError):
+            msg = _("'%s' argument must be an integer greater than or"
+                    " equal to 0") % option
             raise exception.InvalidInput(reason=msg)

     # rxtx_factor should be a positive float
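The flavors.py reordering is subtle: asserting on int(str(value)) before the int() coercion is what lets the loop reject inputs that bare int() would silently coerce. A standalone sketch (the helper name is mine):

    def validate_positive_int(value):
        # Round-tripping through str() makes int() strict: int(True) is 1
        # and int(50.5) truncates, but int('True') and int('50.5') raise.
        try:
            assert int(str(value)) > 0
            return int(value)
        except (ValueError, AssertionError, TypeError):
            raise ValueError("argument must be a positive integer")

    print(validate_positive_int('4'))  # 4
    # validate_positive_int(50.5)      # raises: int('50.5') is a ValueError
    # validate_positive_int(True)      # raises: int('True') is a ValueError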
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 11ec70e44..6d18952bf 100755
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -106,6 +106,9 @@ compute_opts = [
                 default=False,
                 help='Whether to start guests that were running before the '
                      'host rebooted'),
+    cfg.IntOpt('network_allocate_retries',
+               default=0,
+               help="Number of times to retry network allocation on failures"),
 ]

 interval_opts = [
@@ -282,7 +285,8 @@ def object_compat(function):
         instance = kwargs['instance']
         if isinstance(instance, dict):
             kwargs['instance'] = instance_obj.Instance._from_db_object(
-                instance_obj.Instance(), instance, expected_attrs=metas)
+                context, instance_obj.Instance(), instance,
+                expected_attrs=metas)
             kwargs['instance']._context = context
         return function(self, context, **kwargs)

@@ -408,7 +412,8 @@ class ComputeManager(manager.SchedulerDependentManager):
                         'trying to set it to ERROR'),
                       instance_uuid=instance_uuid)

-    def _get_instances_on_driver(self, context, filters=None):
+    def _get_instances_on_driver(self, context, filters=None,
+                                 columns_to_join=None):
         """Return a list of instance records for the instances found
         on the hypervisor which satisfy the specified filters. If filters=None
         return a list of instance records for all the instances found on the
@@ -420,7 +425,7 @@ class ComputeManager(manager.SchedulerDependentManager):
             driver_uuids = self.driver.list_instance_uuids()
             filters['uuid'] = driver_uuids
             local_instances = self.conductor_api.instance_get_all_by_filters(
-                context, filters, columns_to_join=[])
+                context, filters, columns_to_join=columns_to_join)
             return local_instances
         except NotImplementedError:
             pass
@@ -429,7 +434,7 @@ class ComputeManager(manager.SchedulerDependentManager):
         # to brute force.
         driver_instances = self.driver.list_instances()
         instances = self.conductor_api.instance_get_all_by_filters(
-            context, filters, columns_to_join=[])
+            context, filters, columns_to_join=columns_to_join)
         name_map = dict((instance['name'], instance) for instance in instances)
         local_instances = []
         for driver_instance in driver_instances:
@@ -495,7 +500,7 @@ class ComputeManager(manager.SchedulerDependentManager):
                           'assuming it\'s not on shared storage'),
                         instance=instance)
             shared_storage = False
-        except Exception as e:
+        except Exception:
             LOG.exception(_('Failed to check if instance shared'),
                           instance=instance)
         finally:
@@ -643,7 +648,11 @@ class ComputeManager(manager.SchedulerDependentManager):

     def handle_events(self, event):
         if isinstance(event, virtevent.LifecycleEvent):
-            self.handle_lifecycle_event(event)
+            try:
+                self.handle_lifecycle_event(event)
+            except exception.InstanceNotFound:
+                LOG.debug(_("Event %s arrived for non-existent instance. The "
+                            "instance was probably deleted.") % event)
         else:
             LOG.debug(_("Ignoring event %s") % event)
@@ -878,7 +887,7 @@ class ComputeManager(manager.SchedulerDependentManager):

         extra_usage_info = {}

-        def notify(status, msg=None):
+        def notify(status, msg=None, **kwargs):
             """Send a create.{start,error,end} notification."""
             type_ = "create.%(status)s" % dict(status=status)
             info = extra_usage_info.copy()
@@ -886,7 +895,7 @@ class ComputeManager(manager.SchedulerDependentManager):
                 msg = ""
             info['message'] = msg
             self._notify_about_instance_usage(context, instance, type_,
-                    extra_usage_info=info)
+                    extra_usage_info=info, **kwargs)

         try:
             image_meta = self._prebuild_instance(context, instance)
@@ -895,10 +904,11 @@ class ComputeManager(manager.SchedulerDependentManager):

             notify("start")  # notify that build is starting

-            instance = self._build_instance(context, request_spec,
-                    filter_properties, requested_networks, injected_files,
-                    admin_password, is_first_time, node, instance, image_meta)
-            notify("end", msg=_("Success"))  # notify that build is done
+            instance, network_info = self._build_instance(context,
+                    request_spec, filter_properties, requested_networks,
+                    injected_files, admin_password, is_first_time, node,
+                    instance, image_meta)
+            notify("end", msg=_("Success"), network_info=network_info)

         except exception.RescheduledException as e:
             # Instance build encountered an error, and has been rescheduled.
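The run_instance change above boils down to threading the network_info that _build_instance allocated into the create.end notification via the new **kwargs on notify. Reduced to its shape (both callables are hypothetical stand-ins):

    def run_instance_sketch(build_fn, notify_fn):
        notify_fn("start")
        instance, network_info = build_fn()  # now returns both values
        notify_fn("end", msg="Success", network_info=network_info)
        return instance

    inst = run_instance_sketch(
        lambda: ({'uuid': 'fake-uuid'}, ['fake-vif']),
        lambda status, msg=None, **kw: None)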
@@ -985,15 +995,19 @@ class ComputeManager(manager.SchedulerDependentManager):
                     set_access_ip=set_access_ip)
         except exception.InstanceNotFound:
             # the instance got deleted during the spawn
-            with excutils.save_and_reraise_exception():
-                # Make sure the async call finishes
-                if network_info is not None:
-                    network_info.wait(do_raise=False)
-                try:
-                    self._deallocate_network(context, instance)
-                except Exception:
-                    LOG.exception(_('Failed to dealloc network for '
-                                    'deleted instance'), instance=instance)
+            # Make sure the async call finishes
+            msg = _("Instance disappeared during build")
+            if network_info is not None:
+                network_info.wait(do_raise=False)
+            try:
+                self._deallocate_network(context, instance)
+            except Exception:
+                msg = _('Failed to dealloc network '
+                        'for deleted instance')
+                LOG.exception(msg, instance=instance)
+            raise exception.BuildAbortException(
+                instance_uuid=instance['uuid'],
+                reason=msg)
         except exception.UnexpectedTaskStateError as e:
             exc_info = sys.exc_info()
             # Make sure the async call finishes
@@ -1028,7 +1042,7 @@ class ComputeManager(manager.SchedulerDependentManager):
             raise exc_info[0], exc_info[1], exc_info[2]

         # spawn success
-        return instance
+        return instance, network_info

     def _log_original_error(self, exc_info, instance_uuid):
         type_, value, tb = exc_info
@@ -1145,6 +1159,50 @@ class ComputeManager(manager.SchedulerDependentManager):
                 expected_task_state=(task_states.SCHEDULING, None))

+    def _allocate_network_async(self, context, instance, requested_networks,
+                                macs, security_groups, is_vpn):
+        """Method used to allocate networks in the background.
+
+        Broken out for testing.
+        """
+        LOG.debug(_("Allocating IP information in the background."),
+                  instance=instance)
+        retries = CONF.network_allocate_retries
+        if retries < 0:
+            LOG.warn(_("Treating negative config value (%(retries)s) for "
+                       "'network_allocate_retries' as 0."),
+                     {'retries': retries})
+        attempts = retries > 1 and retries + 1 or 1
+        retry_time = 1
+        for attempt in range(1, attempts + 1):
+            try:
+                nwinfo = self.network_api.allocate_for_instance(
+                        context, instance, vpn=is_vpn,
+                        requested_networks=requested_networks,
+                        macs=macs,
+                        conductor_api=self.conductor_api,
+                        security_groups=security_groups)
+                LOG.debug(_('Instance network_info: |%s|'), nwinfo,
+                          instance=instance)
+                return nwinfo
+            except Exception:
+                exc_info = sys.exc_info()
+                log_info = {'attempt': attempt,
+                            'attempts': attempts}
+                if attempt == attempts:
+                    LOG.exception(_('Instance failed network setup '
+                                    'after %(attempts)d attempt(s)'),
+                                  log_info)
+                    raise exc_info[0], exc_info[1], exc_info[2]
+                LOG.warn(_('Instance failed network setup '
+                           '(attempt %(attempt)d of %(attempts)d)'),
+                         log_info, instance=instance)
+                time.sleep(retry_time)
+                retry_time *= 2
+                if retry_time > 30:
+                    retry_time = 30
+        # Not reached.
+
     def _allocate_network(self, context, instance, requested_networks, macs,
                           security_groups):
         """Start network allocation asynchronously.
         Return an instance
@@ -1159,25 +1217,9 @@ class ComputeManager(manager.SchedulerDependentManager):
                 task_state=task_states.NETWORKING,
                 expected_task_state=None)
         is_vpn = pipelib.is_vpn_image(instance['image_ref'])
-
-        def async_alloc():
-            LOG.debug(_("Allocating IP information in the background."),
-                      instance=instance)
-            try:
-                nwinfo = self.network_api.allocate_for_instance(
-                        context, instance, vpn=is_vpn,
-                        requested_networks=requested_networks,
-                        macs=macs,
-                        conductor_api=self.conductor_api,
-                        security_groups=security_groups)
-            except Exception:
-                with excutils.save_and_reraise_exception():
-                    LOG.exception(_('Instance failed network setup'),
-                                  instance=instance)
-            LOG.debug(_('Instance network_info: |%s|'), nwinfo,
-                      instance=instance)
-            return nwinfo
-        return network_model.NetworkInfoAsyncWrapper(async_alloc)
+        return network_model.NetworkInfoAsyncWrapper(
+                self._allocate_network_async, context, instance,
+                requested_networks, macs, security_groups, is_vpn)

     def _prep_block_device(self, context, instance, bdms):
         """Set up the block device for an instance with error logging."""
@@ -1344,7 +1386,7 @@ class ComputeManager(manager.SchedulerDependentManager):
         # get network info before tearing down
         try:
             network_info = self._get_instance_nw_info(context, instance)
-        except exception.NetworkNotFound:
+        except (exception.NetworkNotFound, exception.NoMoreFixedIps):
             network_info = network_model.NetworkInfo()

         # NOTE(vish) get bdms before destroying the instance
@@ -1446,7 +1488,7 @@ class ComputeManager(manager.SchedulerDependentManager):
                     vm_state=vm_states.DELETED,
                     task_state=None,
                     terminated_at=timeutils.utcnow())
-            system_meta = utils.metadata_to_dict(instance['system_metadata'])
+            system_meta = utils.instance_sys_meta(instance)
             self.conductor_api.instance_destroy(context, instance)
         except Exception:
             with excutils.save_and_reraise_exception():
@@ -1538,7 +1580,14 @@ class ComputeManager(manager.SchedulerDependentManager):
     def start_instance(self, context, instance):
         """Starting an instance on this host."""
         self._notify_about_instance_usage(context, instance, "power_on.start")
-        self.driver.power_on(instance)
+
+        network_info = self._get_instance_nw_info(context, instance)
+        block_device_info = self._get_instance_volume_block_device_info(
+                context, instance)
+        self.driver.power_on(context, instance,
+                             self._legacy_nw_info(network_info),
+                             block_device_info)
+
         current_power_state = self._get_power_state(context, instance)
         instance.power_state = current_power_state
         instance.vm_state = vm_states.ACTIVE
@@ -1593,7 +1642,12 @@ class ComputeManager(manager.SchedulerDependentManager):
         except NotImplementedError:
             # Fallback to just powering on the instance if the hypervisor
             # doesn't implement the restore method
-            self.driver.power_on(instance)
+            network_info = self._get_instance_nw_info(context, instance)
+            block_device_info = self._get_instance_volume_block_device_info(
+                    context, instance)
+            self.driver.power_on(context, instance,
+                                 self._legacy_nw_info(network_info),
+                                 block_device_info)
         current_power_state = self._get_power_state(context, instance)
         instance = self._instance_update(context, instance['uuid'],
                 power_state=current_power_state,
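The retry loop added in _allocate_network_async above doubles its sleep between attempts and caps it at 30 seconds; note that, as written, network_allocate_retries=1 still yields a single attempt because of the `retries > 1` test. A sketch of just the schedule (equivalent cap arithmetic, not the patch's code):

    def backoff_schedule(retries):
        # Mirrors the and/or attempts expression; yields the sleep taken
        # before each retry (the final attempt is not followed by a sleep).
        attempts = retries > 1 and retries + 1 or 1
        retry_time = 1
        for attempt in range(1, attempts + 1):
            if attempt == attempts:
                break
            yield retry_time
            retry_time = min(retry_time * 2, 30)

    print(list(backoff_schedule(7)))  # [1, 2, 4, 8, 16, 30, 30]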
@@ -1900,53 +1954,70 @@ class ComputeManager(manager.SchedulerDependentManager):
         context = context.elevated()

         current_power_state = self._get_power_state(context, instance)
-        instance = self._instance_update(context, instance['uuid'],
-                power_state=current_power_state)
-
-        LOG.audit(_('instance snapshotting'), context=context,
+        try:
+            instance = self._instance_update(context, instance['uuid'],
+                    power_state=current_power_state)
+            LOG.audit(_('instance snapshotting'), context=context,
                       instance=instance)

-        if instance['power_state'] != power_state.RUNNING:
-            state = instance['power_state']
-            running = power_state.RUNNING
-            LOG.warn(_('trying to snapshot a non-running instance: '
+            if instance['power_state'] != power_state.RUNNING:
+                state = instance['power_state']
+                running = power_state.RUNNING
+                LOG.warn(_('trying to snapshot a non-running instance: '
                        '(state: %(state)s expected: %(running)s)'),
                      {'state': state, 'running': running},
                      instance=instance)

-        self._notify_about_instance_usage(
+            self._notify_about_instance_usage(
                 context, instance, "snapshot.start")

-        if image_type == 'snapshot':
-            expected_task_state = task_states.IMAGE_SNAPSHOT
+            if image_type == 'snapshot':
+                expected_task_state = task_states.IMAGE_SNAPSHOT

-        elif image_type == 'backup':
-            expected_task_state = task_states.IMAGE_BACKUP
+            elif image_type == 'backup':
+                expected_task_state = task_states.IMAGE_BACKUP

-        def update_task_state(task_state, expected_state=expected_task_state):
-            return self._instance_update(context, instance['uuid'],
-                    task_state=task_state,
-                    expected_task_state=expected_state)
+            def update_task_state(task_state,
+                                  expected_state=expected_task_state):
+                return self._instance_update(context, instance['uuid'],
+                        task_state=task_state,
+                        expected_task_state=expected_state
+                        )

-        self.driver.snapshot(context, instance, image_id, update_task_state)
-        # The instance could have changed from the driver. But since
-        # we're doing a fresh update here, we'll grab the changes.
+            self.driver.snapshot(context, instance, image_id,
+                                 update_task_state)
+            # The instance could have changed from the driver. But since
+            # we're doing a fresh update here, we'll grab the changes.

-        instance = self._instance_update(context, instance['uuid'],
-                task_state=None,
-                expected_task_state=task_states.IMAGE_UPLOADING)
+            instance = self._instance_update(context, instance['uuid'],
+                    task_state=None,
+                    expected_task_state=
+                    task_states.IMAGE_UPLOADING)

-        if image_type == 'snapshot' and rotation:
-            raise exception.ImageRotationNotAllowed()
+            if image_type == 'snapshot' and rotation:
+                raise exception.ImageRotationNotAllowed()

-        elif image_type == 'backup' and rotation >= 0:
-            self._rotate_backups(context, instance, backup_type, rotation)
+            elif image_type == 'backup' and rotation >= 0:
+                self._rotate_backups(context, instance, backup_type, rotation)

-        elif image_type == 'backup':
-            raise exception.RotationRequiredForBackup()
+            elif image_type == 'backup':
+                raise exception.RotationRequiredForBackup()

-        self._notify_about_instance_usage(
-                context, instance, "snapshot.end")
+            self._notify_about_instance_usage(context, instance,
+                                              "snapshot.end")
+
+        except exception.InstanceNotFound:
+            # the instance got deleted during the snapshot
+            # Quickly bail out of here
+            msg = _("Instance disappeared during snapshot")
+            LOG.debug(msg, instance=instance)
+        except exception.UnexpectedTaskStateError as e:
+            actual_task_state = e.kwargs.get('actual', None)
+            if actual_task_state == 'deleting':
+                msg = _('Instance was deleted during snapshot.')
+                LOG.debug(msg, instance=instance)
+            else:
+                raise

     @wrap_instance_fault
     def _rotate_backups(self, context, instance, backup_type, rotation):
@@ -2071,7 +2142,7 @@ class ComputeManager(manager.SchedulerDependentManager):

     def _get_rescue_image_ref(self, context, instance):
         """Determine what image should be used to boot the rescue VM."""
-        system_meta = utils.metadata_to_dict(instance['system_metadata'])
+        system_meta = utils.instance_sys_meta(instance)

         rescue_image_ref = system_meta.get('image_base_image_ref')

@@ -2173,7 +2244,7 @@ class ComputeManager(manager.SchedulerDependentManager):
         Returns the updated system_metadata as a dict, as well as the
         post-cleanup current instance type.
         """
-        sys_meta = utils.metadata_to_dict(instance['system_metadata'])
+        sys_meta = utils.instance_sys_meta(instance)
         if restore_old:
             instance_type = flavors.extract_flavor(instance, 'old_')
             sys_meta = flavors.save_flavor_info(sys_meta, instance_type)
@@ -2239,7 +2310,8 @@ class ComputeManager(manager.SchedulerDependentManager):
         instance = self._instance_update(context, instance['uuid'],
                                          vm_state=vm_state,
                                          task_state=None,
-                                         expected_task_state=None)
+                                         expected_task_state=[None,
+                                             task_states.DELETING])

         self._notify_about_instance_usage(
             context, instance, "resize.confirm.end",
@@ -2426,9 +2498,8 @@ class ComputeManager(manager.SchedulerDependentManager):

         # NOTE(danms): Stash the new instance_type to avoid having to
         # look it up in the database later
-        sys_meta = utils.metadata_to_dict(instance['system_metadata'])
-        flavors.save_flavor_info(sys_meta, instance_type,
-                                 prefix='new_')
+        sys_meta = utils.instance_sys_meta(instance)
+        flavors.save_flavor_info(sys_meta, instance_type, prefix='new_')

         # NOTE(mriedem): Stash the old vm_state so we can set the
         # resized/reverted instance back to the same state later.
         vm_state = instance['vm_state']
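The snapshot rework above wraps the whole body in one try so that two specific races can be absorbed: the instance vanishing entirely (InstanceNotFound) and a concurrent delete surfacing as UnexpectedTaskStateError with actual='deleting'; everything else still propagates. Isolated sketch with simplified exception classes:

    class InstanceNotFound(Exception):
        pass

    class UnexpectedTaskStateError(Exception):
        def __init__(self, **kwargs):
            super(UnexpectedTaskStateError, self).__init__(kwargs)
            self.kwargs = kwargs

    def guarded_snapshot(do_snapshot, log_debug):
        try:
            do_snapshot()
        except InstanceNotFound:
            # instance deleted mid-snapshot: log and bail quietly
            log_debug("Instance disappeared during snapshot")
        except UnexpectedTaskStateError as e:
            # only swallow the race with a delete; anything else re-raises
            if e.kwargs.get('actual') == 'deleting':
                log_debug("Instance was deleted during snapshot.")
            else:
                raise

    def racing_delete():
        raise UnexpectedTaskStateError(expected='image_snapshot',
                                       actual='deleting')

    guarded_snapshot(racing_delete, log_debug=print)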
@@ -2598,7 +2669,7 @@ class ComputeManager(manager.SchedulerDependentManager):
         old_instance_type_id = migration['old_instance_type_id']
         new_instance_type_id = migration['new_instance_type_id']
         old_instance_type = flavors.extract_flavor(instance)
-        sys_meta = utils.metadata_to_dict(instance['system_metadata'])
+        sys_meta = utils.instance_sys_meta(instance)
         # NOTE(mriedem): Get the old_vm_state so we know if we should
         # power on the instance. If old_vm_state is not set we need to default
         # to ACTIVE for backwards compatibility
@@ -3613,20 +3684,20 @@ class ComputeManager(manager.SchedulerDependentManager):
         while not instance or instance['host'] != self.host:
             if instance_uuids:
                 try:
-                    instance = self.conductor_api.instance_get_by_uuid(context,
-                        instance_uuids.pop(0))
+                    instance = instance_obj.Instance.get_by_uuid(
+                        context, instance_uuids.pop(0))
                 except exception.InstanceNotFound:
                     # Instance is gone. Try to grab another.
                     continue
             else:
                 # No more in our copy of uuids. Pull from the DB.
-                db_instances = self.conductor_api.instance_get_all_by_host(
-                        context, self.host, columns_to_join=[])
+                db_instances = instance_obj.InstanceList.get_by_host(
+                        context, self.host, expected_attrs=[])
                 if not db_instances:
                     # None.. just return.
                     return
-                instance = db_instances.pop(0)
-                instance_uuids = [inst['uuid'] for inst in db_instances]
+                instance = db_instances[0]
+                instance_uuids = [inst['uuid'] for inst in db_instances[1:]]
                 self._instance_uuids_to_heal = instance_uuids

         # We have an instance now and it's ours
@@ -3675,62 +3746,64 @@ class ComputeManager(manager.SchedulerDependentManager):

     @periodic_task.periodic_task
     def _poll_unconfirmed_resizes(self, context):
-        if CONF.resize_confirm_window > 0:
-            capi = self.conductor_api
-            migrations = capi.migration_get_unconfirmed_by_dest_compute(
-                    context, CONF.resize_confirm_window, self.host)
-
-            migrations_info = dict(migration_count=len(migrations),
-                    confirm_window=CONF.resize_confirm_window)
-
-            if migrations_info["migration_count"] > 0:
-                LOG.info(_("Found %(migration_count)d unconfirmed migrations "
-                           "older than %(confirm_window)d seconds"),
-                         migrations_info)
-
-            def _set_migration_to_error(migration, reason, **kwargs):
-                LOG.warn(_("Setting migration %(migration_id)s to error: "
-                           "%(reason)s"),
-                         {'migration_id': migration['id'], 'reason': reason},
-                         **kwargs)
-                self.conductor_api.migration_update(context, migration,
-                                                    'error')
-
-            for migration in migrations:
-                instance_uuid = migration['instance_uuid']
-                LOG.info(_("Automatically confirming migration "
-                           "%(migration_id)s for instance %(instance_uuid)s"),
-                         {'migration_id': migration['id'],
-                          'instance_uuid': instance_uuid})
-                try:
-                    instance = self.conductor_api.instance_get_by_uuid(
-                        context, instance_uuid)
-                except exception.InstanceNotFound:
-                    reason = (_("Instance %s not found") %
-                              instance_uuid)
-                    _set_migration_to_error(migration, reason)
-                    continue
-                if instance['vm_state'] == vm_states.ERROR:
-                    reason = _("In ERROR state")
-                    _set_migration_to_error(migration, reason,
-                                            instance=instance)
-                    continue
-                vm_state = instance['vm_state']
-                task_state = instance['task_state']
-                if vm_state != vm_states.RESIZED or task_state is not None:
-                    reason = (_("In states %(vm_state)s/%(task_state)s, not "
-                                "RESIZED/None") %
-                              {'vm_state': vm_state,
-                               'task_state': task_state})
-                    _set_migration_to_error(migration, reason,
-                                            instance=instance)
-                    continue
-                try:
-                    self.conductor_api.compute_confirm_resize(
-                        context, instance, migration_ref=migration)
-                except Exception as e:
-                    LOG.error(_("Error auto-confirming resize: %s. "
-                                "Will retry later.") % e, instance=instance)
+        if CONF.resize_confirm_window == 0:
+            return
+
+        capi = self.conductor_api
+        migrations = capi.migration_get_unconfirmed_by_dest_compute(
+                context, CONF.resize_confirm_window, self.host)
+
+        migrations_info = dict(migration_count=len(migrations),
+                confirm_window=CONF.resize_confirm_window)
+
+        if migrations_info["migration_count"] > 0:
+            LOG.info(_("Found %(migration_count)d unconfirmed migrations "
+                       "older than %(confirm_window)d seconds"),
+                     migrations_info)
+
+        def _set_migration_to_error(migration, reason, **kwargs):
+            LOG.warn(_("Setting migration %(migration_id)s to error: "
+                       "%(reason)s"),
+                     {'migration_id': migration['id'], 'reason': reason},
+                     **kwargs)
+            self.conductor_api.migration_update(context, migration,
+                                                'error')
+
+        for migration in migrations:
+            instance_uuid = migration['instance_uuid']
+            LOG.info(_("Automatically confirming migration "
+                       "%(migration_id)s for instance %(instance_uuid)s"),
+                     {'migration_id': migration['id'],
+                      'instance_uuid': instance_uuid})
+            try:
+                instance = instance_obj.Instance.get_by_uuid(context,
+                                                             instance_uuid)
+            except exception.InstanceNotFound:
+                reason = (_("Instance %s not found") %
+                          instance_uuid)
+                _set_migration_to_error(migration, reason)
+                continue
+            if instance['vm_state'] == vm_states.ERROR:
+                reason = _("In ERROR state")
+                _set_migration_to_error(migration, reason,
+                                        instance=instance)
+                continue
+            vm_state = instance['vm_state']
+            task_state = instance['task_state']
+            if vm_state != vm_states.RESIZED or task_state is not None:
+                reason = (_("In states %(vm_state)s/%(task_state)s, not "
+                            "RESIZED/None") %
+                          {'vm_state': vm_state,
+                           'task_state': task_state})
+                _set_migration_to_error(migration, reason,
+                                        instance=instance)
+                continue
+            try:
+                self.conductor_api.compute_confirm_resize(
+                    context, instance, migration_ref=migration)
+            except Exception as e:
+                LOG.error(_("Error auto-confirming resize: %s. "
+                            "Will retry later.") % e, instance=instance)

     @periodic_task.periodic_task
     def _instance_usage_audit(self, context):
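Several manager.py hunks above (and the utils.py hunk below) swap utils.metadata_to_dict(instance['system_metadata']) for utils.instance_sys_meta(instance). A plausible reduction of what such a helper centralizes — handling both the raw row-list form and an already-flattened dict — though the real helper's exact behavior isn't shown in this diff:

    def metadata_to_dict(metadata):
        # old spelling: flatten [{'key': k, 'value': v}, ...] rows
        return dict((item['key'], item['value']) for item in metadata)

    def instance_sys_meta(instance):
        # tolerate a missing column and accept either representation
        sys_meta = instance.get('system_metadata') or {}
        if isinstance(sys_meta, dict):
            return sys_meta
        return metadata_to_dict(sys_meta)

    rows = [{'key': 'image_base_image_ref', 'value': 'fake-image'}]
    print(instance_sys_meta({'system_metadata': rows}))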
diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py
index 98e90cf24..30db6010f 100644
--- a/nova/compute/rpcapi.py
+++ b/nova/compute/rpcapi.py
@@ -36,6 +36,11 @@ rpcapi_opts = [
 CONF = cfg.CONF
 CONF.register_opts(rpcapi_opts)

+rpcapi_cap_opt = cfg.StrOpt('compute',
+        default=None,
+        help='Set a version cap for messages sent to compute services')
+CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
+

 def _compute_topic(topic, ctxt, host, instance):
     '''Get the topic to use for a message.
@@ -167,6 +172,11 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
                vnc on the correct port
         2.27 - Adds 'reservations' to terminate_instance() and
                soft_delete_instance()
+
+        ... Grizzly supports message version 2.27. So, any changes to
+        existing methods in 2.x after that point should be done such that
+        they can handle the version_cap being set to 2.27.
+
         2.28 - Adds check_instance_shared_storage()
         2.29 - Made start_instance() and stop_instance() take new-world
                instance objects
@@ -182,11 +192,18 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
     #
     BASE_RPC_API_VERSION = '2.0'

+    VERSION_ALIASES = {
+        'grizzly': '2.27',
+    }
+
     def __init__(self):
+        version_cap = self.VERSION_ALIASES.get(CONF.upgrade_levels.compute,
+                                               CONF.upgrade_levels.compute)
         super(ComputeAPI, self).__init__(
                 topic=CONF.compute_topic,
                 default_version=self.BASE_RPC_API_VERSION,
-                serializer=objects_base.NovaObjectSerializer())
+                serializer=objects_base.NovaObjectSerializer(),
+                version_cap=version_cap)

     def add_aggregate_host(self, ctxt, aggregate, host_param, host,
                            slave_info=None):
diff --git a/nova/compute/utils.py b/nova/compute/utils.py
index 9637d8773..3db4a14f2 100644
--- a/nova/compute/utils.py
+++ b/nova/compute/utils.py
@@ -203,8 +203,7 @@ def notify_usage_exists(context, instance_ref, current_period=False,
                                           ignore_missing_network_data)

     if system_metadata is None:
-        system_metadata = utils.metadata_to_dict(
-            instance_ref['system_metadata'])
+        system_metadata = utils.instance_sys_meta(instance_ref)

     # add image metadata to the notification:
     image_meta = notifications.image_meta(system_metadata)
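Closing note on the rpcapi.py change: the version cap resolves with a single dict lookup that passes unknown values through, so [upgrade_levels] compute may be a release alias, an explicit version string, or unset. A sketch of the resolution (the assertion lines are illustrative):

    VERSION_ALIASES = {'grizzly': '2.27'}

    def resolve_version_cap(configured):
        # alias -> concrete version; anything else (explicit version
        # string, or None meaning "no cap") falls through unchanged
        return VERSION_ALIASES.get(configured, configured)

    assert resolve_version_cap('grizzly') == '2.27'
    assert resolve_version_cap('2.25') == '2.25'
    assert resolve_version_cap(None) is None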