diff options
| author | Todd Willey <todd@ansolabs.com> | 2011-05-26 11:30:07 -0400 |
|---|---|---|
| committer | Todd Willey <todd@ansolabs.com> | 2011-05-26 11:30:07 -0400 |
| commit | eb90339f094da74eafcc6555907e247e25a8902b (patch) | |
| tree | e5ac40e24ead1273497bca3dba6006f51f5a8d73 /nova/compute | |
| parent | 2278f2886d369af285f914a7b5883d3a7a5727cc (diff) | |
| parent | db18a792414240cbdb1221d0e79e8a63313f103e (diff) | |
Merge trunk.
Diffstat (limited to 'nova/compute')
| -rw-r--r-- | nova/compute/api.py | 266 | ||||
| -rw-r--r-- | nova/compute/instance_types.py | 111 | ||||
| -rw-r--r-- | nova/compute/manager.py | 492 | ||||
| -rw-r--r-- | nova/compute/monitor.py | 4 | ||||
| -rw-r--r-- | nova/compute/power_state.py | 21 |
5 files changed, 532 insertions, 362 deletions
diff --git a/nova/compute/api.py b/nova/compute/api.py index bca6863d6..e9e8e10e7 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -16,11 +16,10 @@ # License for the specific language governing permissions and limitations # under the License. -""" -Handles all requests relating to instances (guest vms). -""" +"""Handles all requests relating to instances (guest vms).""" import datetime +import eventlet import re import time @@ -34,6 +33,7 @@ from nova import rpc from nova import utils from nova import volume from nova.compute import instance_types +from nova.compute import power_state from nova.scheduler import api as scheduler_api from nova.db import base @@ -43,6 +43,8 @@ LOG = logging.getLogger('nova.compute.api') FLAGS = flags.FLAGS flags.DECLARE('vncproxy_topic', 'nova.vnc') +flags.DEFINE_integer('find_host_timeout', 30, + 'Timeout after NN seconds when looking for a host.') def generate_default_hostname(instance_id): @@ -86,68 +88,78 @@ class API(base.Base): {"method": "get_network_topic", "args": {'fake': 1}}) def _check_injected_file_quota(self, context, injected_files): - """ - Enforce quota limits on injected files + """Enforce quota limits on injected files. + + Raises a QuotaError if any limit is exceeded. - Raises a QuotaError if any limit is exceeded """ if injected_files is None: return - limit = quota.allowed_injected_files(context) + limit = quota.allowed_injected_files(context, len(injected_files)) if len(injected_files) > limit: raise quota.QuotaError(code="OnsetFileLimitExceeded") path_limit = quota.allowed_injected_file_path_bytes(context) - content_limit = quota.allowed_injected_file_content_bytes(context) for path, content in injected_files: if len(path) > path_limit: raise quota.QuotaError(code="OnsetFilePathLimitExceeded") + content_limit = quota.allowed_injected_file_content_bytes( + context, len(content)) if len(content) > content_limit: raise quota.QuotaError(code="OnsetFileContentLimitExceeded") - def create(self, context, instance_type, - image_id, kernel_id=None, ramdisk_id=None, - min_count=1, max_count=1, - display_name='', display_description='', - key_name=None, key_data=None, security_group='default', - availability_zone=None, user_data=None, metadata=[], - injected_files=None): - """Create the number of instances requested if quota and - other arguments check out ok.""" - - type_data = instance_types.get_instance_type(instance_type) - num_instances = quota.allowed_instances(context, max_count, type_data) - if num_instances < min_count: - pid = context.project_id - LOG.warn(_("Quota exceeeded for %(pid)s," - " tried to run %(min_count)s instances") % locals()) - raise quota.QuotaError(_("Instance quota exceeded. You can only " - "run %s more instances of this type.") % - num_instances, "InstanceLimitExceeded") - + def _check_metadata_properties_quota(self, context, metadata={}): + """Enforce quota limits on metadata properties.""" num_metadata = len(metadata) quota_metadata = quota.allowed_metadata_items(context, num_metadata) if quota_metadata < num_metadata: pid = context.project_id - msg = (_("Quota exceeeded for %(pid)s," - " tried to set %(num_metadata)s metadata properties") - % locals()) + msg = _("Quota exceeeded for %(pid)s, tried to set " + "%(num_metadata)s metadata properties") % locals() LOG.warn(msg) raise quota.QuotaError(msg, "MetadataLimitExceeded") # Because metadata is stored in the DB, we hard-code the size limits # In future, we may support more variable length strings, so we act # as if this is quota-controlled for forwards compatibility - for metadata_item in metadata: - k = metadata_item['key'] - v = metadata_item['value'] + for k, v in metadata.iteritems(): if len(k) > 255 or len(v) > 255: pid = context.project_id - msg = (_("Quota exceeeded for %(pid)s," - " metadata property key or value too long") - % locals()) + msg = _("Quota exceeeded for %(pid)s, metadata property " + "key or value too long") % locals() LOG.warn(msg) raise quota.QuotaError(msg, "MetadataLimitExceeded") + def create(self, context, instance_type, + image_id, kernel_id=None, ramdisk_id=None, + min_count=1, max_count=1, + display_name='', display_description='', + key_name=None, key_data=None, security_group='default', + availability_zone=None, user_data=None, metadata={}, + injected_files=None, + admin_password=None): + """Create the number and type of instances requested. + + Verifies that quota and other arguments are valid. + + """ + if not instance_type: + instance_type = instance_types.get_default_instance_type() + + num_instances = quota.allowed_instances(context, max_count, + instance_type) + if num_instances < min_count: + pid = context.project_id + LOG.warn(_("Quota exceeeded for %(pid)s," + " tried to run %(min_count)s instances") % locals()) + if num_instances <= 0: + message = _("Instance quota exceeded. You cannot run any " + "more instances of this type.") + else: + message = _("Instance quota exceeded. You can only run %s " + "more instances of this type.") % num_instances + raise quota.QuotaError(message, "InstanceLimitExceeded") + + self._check_metadata_properties_quota(context, metadata) self._check_injected_file_quota(context, injected_files) image = self.image_service.show(context, image_id) @@ -201,10 +213,10 @@ class API(base.Base): 'user_id': context.user_id, 'project_id': context.project_id, 'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), - 'instance_type': instance_type, - 'memory_mb': type_data['memory_mb'], - 'vcpus': type_data['vcpus'], - 'local_gb': type_data['local_gb'], + 'instance_type_id': instance_type['id'], + 'memory_mb': instance_type['memory_mb'], + 'vcpus': instance_type['vcpus'], + 'local_gb': instance_type['local_gb'], 'display_name': display_name, 'display_description': display_description, 'user_data': user_data or '', @@ -235,7 +247,7 @@ class API(base.Base): # Set sane defaults if not specified updates = dict(hostname=self.hostname_factory(instance_id)) if (not hasattr(instance, 'display_name') or - instance.display_name == None): + instance.display_name is None): updates['display_name'] = "Server %s" % instance_id instance = self.update(context, instance_id, **updates) @@ -245,13 +257,21 @@ class API(base.Base): uid = context.user_id LOG.debug(_("Casting to scheduler for %(pid)s/%(uid)s's" " instance %(instance_id)s") % locals()) + + # NOTE(sandy): For now we're just going to pass in the + # instance_type record to the scheduler. In a later phase + # we'll be ripping this whole for-loop out and deferring the + # creation of the Instance record. At that point all this will + # change. rpc.cast(context, FLAGS.scheduler_topic, {"method": "run_instance", "args": {"topic": FLAGS.compute_topic, "instance_id": instance_id, + "instance_type": instance_type, "availability_zone": availability_zone, - "injected_files": injected_files}}) + "injected_files": injected_files, + "admin_password": admin_password}}) for group_id in security_groups: self.trigger_security_group_members_refresh(elevated, group_id) @@ -259,8 +279,7 @@ class API(base.Base): return [dict(x.iteritems()) for x in instances] def has_finished_migration(self, context, instance_id): - """Retrieves whether or not a finished migration exists for - an instance""" + """Returns true if an instance has a finished migration.""" try: db.migration_get_by_instance_and_status(context, instance_id, 'finished') @@ -269,8 +288,10 @@ class API(base.Base): return False def ensure_default_security_group(self, context): - """ Create security group for the security context if it - does not already exist + """Ensure that a context has a security group. + + Creates a security group for the security context if it does not + already exist. :param context: the security context @@ -286,7 +307,7 @@ class API(base.Base): db.security_group_create(context, values) def trigger_security_group_rules_refresh(self, context, security_group_id): - """Called when a rule is added to or removed from a security_group""" + """Called when a rule is added to or removed from a security_group.""" security_group = self.db.security_group_get(context, security_group_id) @@ -302,11 +323,12 @@ class API(base.Base): "args": {"security_group_id": security_group.id}}) def trigger_security_group_members_refresh(self, context, group_id): - """Called when a security group gains a new or loses a member + """Called when a security group gains a new or loses a member. Sends an update request to each compute node for whom this is - relevant.""" + relevant. + """ # First, we get the security group rules that reference this group as # the grantee.. security_group_rules = \ @@ -361,7 +383,7 @@ class API(base.Base): as data fields of the instance to be updated - :retval None + :returns: None """ rv = self.db.instance_update(context, instance_id, kwargs) @@ -369,6 +391,7 @@ class API(base.Base): @scheduler_api.reroute_compute("delete") def delete(self, context, instance_id): + """Terminate an instance.""" LOG.debug(_("Going to try to terminate %s"), instance_id) try: instance = self.get(context, instance_id) @@ -377,11 +400,15 @@ class API(base.Base): instance_id) raise - if (instance['state_description'] == 'terminating'): + if instance['state_description'] == 'terminating': LOG.warning(_("Instance %s is already being terminated"), instance_id) return + if instance['state_description'] == 'migrating': + LOG.warning(_("Instance %s is being migrated"), instance_id) + return + self.update(context, instance['id'], state_description='terminating', @@ -396,22 +423,28 @@ class API(base.Base): self.db.instance_destroy(context, instance_id) def get(self, context, instance_id): - """Get a single instance with the given ID.""" + """Get a single instance with the given instance_id.""" rv = self.db.instance_get(context, instance_id) return dict(rv.iteritems()) @scheduler_api.reroute_compute("get") def routing_get(self, context, instance_id): - """Use this method instead of get() if this is the only - operation you intend to to. It will route to novaclient.get - if the instance is not found.""" + """A version of get with special routing characteristics. + + Use this method instead of get() if this is the only operation you + intend to to. It will route to novaclient.get if the instance is not + found. + + """ return self.get(context, instance_id) def get_all(self, context, project_id=None, reservation_id=None, fixed_ip=None): - """Get all instances, possibly filtered by one of the - given parameters. If there is no filter and the context is - an admin, it will retreive all instances in the system. + """Get all instances filtered by one of the given parameters. + + If there is no filter and the context is an admin, it will retreive + all instances in the system. + """ if reservation_id is not None: return self.db.instance_get_all_by_reservation( @@ -440,7 +473,8 @@ class API(base.Base): :param params: Optional dictionary of arguments to be passed to the compute worker - :retval None + :returns: None + """ if not params: params = {} @@ -459,7 +493,7 @@ class API(base.Base): :param params: Optional dictionary of arguments to be passed to the compute worker - :retval: Result returned by compute worker + :returns: Result returned by compute worker """ if not params: params = {} @@ -472,13 +506,25 @@ class API(base.Base): return rpc.call(context, queue, kwargs) def _cast_scheduler_message(self, context, args): - """Generic handler for RPC calls to the scheduler""" + """Generic handler for RPC calls to the scheduler.""" rpc.cast(context, FLAGS.scheduler_topic, args) + def _find_host(self, context, instance_id): + """Find the host associated with an instance.""" + for attempts in xrange(FLAGS.find_host_timeout): + instance = self.get(context, instance_id) + host = instance["host"] + if host: + return host + time.sleep(1) + raise exception.Error(_("Unable to find host for Instance %s") + % instance_id) + def snapshot(self, context, instance_id, name): """Snapshot the given instance. - :retval: A dict containing image metadata + :returns: A dict containing image metadata + """ properties = {'instance_id': str(instance_id), 'user_id': str(context.user_id)} @@ -494,14 +540,41 @@ class API(base.Base): """Reboot the given instance.""" self._cast_compute_message('reboot_instance', context, instance_id) + def rebuild(self, context, instance_id, image_id, metadata=None, + files_to_inject=None): + """Rebuild the given instance with the provided metadata.""" + instance = db.api.instance_get(context, instance_id) + + if instance["state"] == power_state.BUILDING: + msg = _("Instance already building") + raise exception.BuildInProgress(msg) + + metadata = metadata or {} + self._check_metadata_properties_quota(context, metadata) + + files_to_inject = files_to_inject or [] + self._check_injected_file_quota(context, files_to_inject) + + self.db.instance_update(context, instance_id, {"metadata": metadata}) + + rebuild_params = { + "image_id": image_id, + "injected_files": files_to_inject, + } + + self._cast_compute_message('rebuild_instance', + context, + instance_id, + params=rebuild_params) + def revert_resize(self, context, instance_id): - """Reverts a resize, deleting the 'new' instance in the process""" + """Reverts a resize, deleting the 'new' instance in the process.""" context = context.elevated() migration_ref = self.db.migration_get_by_instance_and_status(context, instance_id, 'finished') if not migration_ref: - raise exception.NotFound(_("No finished migrations found for " - "instance")) + raise exception.MigrationNotFoundByStatus(instance_id=instance_id, + status='finished') params = {'migration_id': migration_ref['id']} self._cast_compute_message('revert_resize', context, instance_id, @@ -510,14 +583,13 @@ class API(base.Base): {'status': 'reverted'}) def confirm_resize(self, context, instance_id): - """Confirms a migration/resize, deleting the 'old' instance in the - process.""" + """Confirms a migration/resize and deletes the 'old' instance.""" context = context.elevated() migration_ref = self.db.migration_get_by_instance_and_status(context, instance_id, 'finished') if not migration_ref: - raise exception.NotFound(_("No finished migrations found for " - "instance")) + raise exception.MigrationNotFoundByStatus(instance_id=instance_id, + status='finished') instance_ref = self.db.instance_get(context, instance_id) params = {'migration_id': migration_ref['id']} self._cast_compute_message('confirm_resize', context, instance_id, @@ -531,8 +603,7 @@ class API(base.Base): def resize(self, context, instance_id, flavor_id): """Resize a running instance.""" instance = self.db.instance_get(context, instance_id) - current_instance_type = self.db.instance_type_get_by_name( - context, instance['instance_type']) + current_instance_type = instance['instance_type'] new_instance_type = self.db.instance_type_get_by_flavor_id( context, flavor_id) @@ -572,10 +643,9 @@ class API(base.Base): @scheduler_api.reroute_compute("diagnostics") def get_diagnostics(self, context, instance_id): """Retrieve diagnostics for the given instance.""" - return self._call_compute_message( - "get_diagnostics", - context, - instance_id) + return self._call_compute_message("get_diagnostics", + context, + instance_id) def get_actions(self, context, instance_id): """Retrieve actions for the given instance.""" @@ -583,12 +653,12 @@ class API(base.Base): @scheduler_api.reroute_compute("suspend") def suspend(self, context, instance_id): - """suspend the instance with instance_id""" + """Suspend the given instance.""" self._cast_compute_message('suspend_instance', context, instance_id) @scheduler_api.reroute_compute("resume") def resume(self, context, instance_id): - """resume the instance with instance_id""" + """Resume the given instance.""" self._cast_compute_message('resume_instance', context, instance_id) @scheduler_api.reroute_compute("rescue") @@ -603,15 +673,19 @@ class API(base.Base): def set_admin_password(self, context, instance_id, password=None): """Set the root/admin password for the given instance.""" - self._cast_compute_message('set_admin_password', context, instance_id, - password) + host = self._find_host(context, instance_id) + + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "set_admin_password", + "args": {"instance_id": instance_id, "new_pass": password}}) def inject_file(self, context, instance_id): """Write a file to the given instance.""" self._cast_compute_message('inject_file', context, instance_id) def get_ajax_console(self, context, instance_id): - """Get a url to an AJAX Console""" + """Get a url to an AJAX Console.""" output = self._call_compute_message('get_ajax_console', context, instance_id) @@ -620,7 +694,7 @@ class API(base.Base): 'args': {'token': output['token'], 'host': output['host'], 'port': output['port']}}) return {'url': '%s/?token=%s' % (FLAGS.ajax_console_proxy_url, - output['token'])} + output['token'])} def get_vnc_console(self, context, instance_id): """Get a url to a VNC Console.""" @@ -642,39 +716,34 @@ class API(base.Base): 'portignore')} def get_console_output(self, context, instance_id): - """Get console output for an an instance""" + """Get console output for an an instance.""" return self._call_compute_message('get_console_output', context, instance_id) def lock(self, context, instance_id): - """lock the instance with instance_id""" + """Lock the given instance.""" self._cast_compute_message('lock_instance', context, instance_id) def unlock(self, context, instance_id): - """unlock the instance with instance_id""" + """Unlock the given instance.""" self._cast_compute_message('unlock_instance', context, instance_id) def get_lock(self, context, instance_id): - """return the boolean state of (instance with instance_id)'s lock""" + """Return the boolean state of given instance's lock.""" instance = self.get(context, instance_id) return instance['locked'] def reset_network(self, context, instance_id): - """ - Reset networking on the instance. - - """ + """Reset networking on the instance.""" self._cast_compute_message('reset_network', context, instance_id) def inject_network_info(self, context, instance_id): - """ - Inject network info for the instance. - - """ + """Inject network info for the instance.""" self._cast_compute_message('inject_network_info', context, instance_id) def attach_volume(self, context, instance_id, volume_id, device): + """Attach an existing volume to an existing instance.""" if not re.match("^/dev/[a-z]d[a-z]+$", device): raise exception.ApiError(_("Invalid device specified: %s. " "Example device: /dev/vdb") % device) @@ -689,6 +758,7 @@ class API(base.Base): "mountpoint": device}}) def detach_volume(self, context, volume_id): + """Detach a volume from an instance.""" instance = self.db.volume_get_instance(context.elevated(), volume_id) if not instance: raise exception.ApiError(_("Volume isn't attached to anything!")) @@ -702,6 +772,7 @@ class API(base.Base): return instance def associate_floating_ip(self, context, instance_id, address): + """Associate a floating ip with an instance.""" instance = self.get(context, instance_id) self.network_api.associate_floating_ip(context, floating_ip=address, @@ -713,11 +784,14 @@ class API(base.Base): return dict(rv.iteritems()) def delete_instance_metadata(self, context, instance_id, key): - """Delete the given metadata item""" + """Delete the given metadata item from an instance.""" self.db.instance_metadata_delete(context, instance_id, key) def update_or_create_instance_metadata(self, context, instance_id, metadata): - """Updates or creates instance metadata""" + """Updates or creates instance metadata.""" + combined_metadata = self.get_instance_metadata(context, instance_id) + combined_metadata.update(metadata) + self._check_metadata_properties_quota(context, combined_metadata) self.db.instance_metadata_update_or_create(context, instance_id, metadata) diff --git a/nova/compute/instance_types.py b/nova/compute/instance_types.py index fa02a5dfa..1275a6fdd 100644 --- a/nova/compute/instance_types.py +++ b/nova/compute/instance_types.py @@ -18,9 +18,7 @@ # License for the specific language governing permissions and limitations # under the License. -""" -The built-in instance properties. -""" +"""Built-in instance properties.""" from nova import context from nova import db @@ -34,18 +32,16 @@ LOG = logging.getLogger('nova.instance_types') def create(name, memory, vcpus, local_gb, flavorid, swap=0, rxtx_quota=0, rxtx_cap=0): - """Creates instance types / flavors - arguments: name memory vcpus local_gb flavorid swap rxtx_quota rxtx_cap - """ + """Creates instance types.""" for option in [memory, vcpus, local_gb, flavorid]: try: int(option) except ValueError: - raise exception.InvalidInputException( - _("create arguments must be positive integers")) + raise exception.InvalidInput(reason=_("create arguments must " + "be positive integers")) if (int(memory) <= 0) or (int(vcpus) <= 0) or (int(local_gb) < 0): - raise exception.InvalidInputException( - _("create arguments must be positive integers")) + raise exception.InvalidInput(reason=_("create arguments must " + "be positive integers")) try: db.instance_type_create( @@ -59,83 +55,88 @@ def create(name, memory, vcpus, local_gb, flavorid, swap=0, rxtx_quota=rxtx_quota, rxtx_cap=rxtx_cap)) except exception.DBError, e: - LOG.exception(_('DB error: %s' % e)) - raise exception.ApiError(_("Cannot create instance type: %s" % name)) + LOG.exception(_('DB error: %s') % e) + raise exception.ApiError(_("Cannot create instance_type with " + "name %(name)s and flavorid %(flavorid)s") + % locals()) def destroy(name): - """Marks instance types / flavors as deleted - arguments: name""" - if name == None: - raise exception.InvalidInputException(_("No instance type specified")) + """Marks instance types as deleted.""" + if name is None: + raise exception.InvalidInstanceType(instance_type=name) else: try: db.instance_type_destroy(context.get_admin_context(), name) except exception.NotFound: - LOG.exception(_('Instance type %s not found for deletion' % name)) - raise exception.ApiError(_("Unknown instance type: %s" % name)) + LOG.exception(_('Instance type %s not found for deletion') % name) + raise exception.ApiError(_("Unknown instance type: %s") % name) def purge(name): - """Removes instance types / flavors from database - arguments: name""" - if name == None: - raise exception.InvalidInputException(_("No instance type specified")) + """Removes instance types from database.""" + if name is None: + raise exception.InvalidInstanceType(instance_type=name) else: try: db.instance_type_purge(context.get_admin_context(), name) except exception.NotFound: - LOG.exception(_('Instance type %s not found for purge' % name)) - raise exception.ApiError(_("Unknown instance type: %s" % name)) + LOG.exception(_('Instance type %s not found for purge') % name) + raise exception.ApiError(_("Unknown instance type: %s") % name) def get_all_types(inactive=0): - """Retrieves non-deleted instance_types. - Pass true as argument if you want deleted instance types returned also.""" + """Get all non-deleted instance_types. + + Pass true as argument if you want deleted instance types returned also. + + """ return db.instance_type_get_all(context.get_admin_context(), inactive) -def get_all_flavors(): - """retrieves non-deleted flavors. alias for instance_types.get_all_types(). - Pass true as argument if you want deleted instance types returned also.""" - return get_all_types(context.get_admin_context()) +get_all_flavors = get_all_types -def get_instance_type(name): - """Retrieves single instance type by name""" - if name is None: - return FLAGS.default_instance_type +def get_default_instance_type(): + """Get the default instance type.""" + name = FLAGS.default_instance_type try: - ctxt = context.get_admin_context() - inst_type = db.instance_type_get_by_name(ctxt, name) - return inst_type + return get_instance_type_by_name(name) except exception.DBError: - raise exception.ApiError(_("Unknown instance type: %s" % name)) + raise exception.ApiError(_("Unknown instance type: %s") % name) -def get_by_type(instance_type): - """retrieve instance type name""" - if instance_type is None: - return FLAGS.default_instance_type +def get_instance_type(id): + """Retrieves single instance type by id.""" + if id is None: + return get_default_instance_type() + try: + ctxt = context.get_admin_context() + return db.instance_type_get_by_id(ctxt, id) + except exception.DBError: + raise exception.ApiError(_("Unknown instance type: %s") % name) + +def get_instance_type_by_name(name): + """Retrieves single instance type by name.""" + if name is None: + return get_default_instance_type() try: ctxt = context.get_admin_context() - inst_type = db.instance_type_get_by_name(ctxt, instance_type) - return inst_type['name'] - except exception.DBError, e: - LOG.exception(_('DB error: %s' % e)) - raise exception.ApiError(_("Unknown instance type: %s" %\ - instance_type)) + return db.instance_type_get_by_name(ctxt, name) + except exception.DBError: + raise exception.ApiError(_("Unknown instance type: %s") % name) -def get_by_flavor_id(flavor_id): - """retrieve instance type's name by flavor_id""" +# TODO(termie): flavor-specific code should probably be in the API that uses +# flavors. +def get_instance_type_by_flavor_id(flavor_id): + """Retrieve instance type by flavor_id.""" if flavor_id is None: - return FLAGS.default_instance_type + return get_default_instance_type() try: ctxt = context.get_admin_context() - flavor = db.instance_type_get_by_flavor_id(ctxt, flavor_id) - return flavor['name'] + return db.instance_type_get_by_flavor_id(ctxt, flavor_id) except exception.DBError, e: - LOG.exception(_('DB error: %s' % e)) - raise exception.ApiError(_("Unknown flavor: %s" % flavor_id)) + LOG.exception(_('DB error: %s') % e) + raise exception.ApiError(_("Unknown flavor: %s") % flavor_id) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 916c1db5e..ff7aeb053 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -17,8 +17,7 @@ # License for the specific language governing permissions and limitations # under the License. -""" -Handles all processes relating to instances (guest vms). +"""Handles all processes relating to instances (guest vms). The :py:class:`ComputeManager` class is a :py:class:`nova.manager.Manager` that handles RPC calls relating to creating instances. It is responsible for @@ -33,15 +32,15 @@ terminating it. by :func:`nova.utils.import_object` :volume_manager: Name of class that handles persistent storage, loaded by :func:`nova.utils.import_object` + """ import datetime import os -import random -import string import socket import sys import tempfile +import time import functools from eventlet import greenthread @@ -50,11 +49,14 @@ from nova import exception from nova import flags from nova import log as logging from nova import manager +from nova import network from nova import rpc from nova import utils +from nova import volume from nova.compute import power_state from nova.virt import driver + FLAGS = flags.FLAGS flags.DEFINE_string('instances_path', '$state_path/instances', 'where instances are stored on disk') @@ -73,20 +75,18 @@ flags.DEFINE_integer('live_migration_retry_count', 30, flags.DEFINE_integer("rescue_timeout", 0, "Automatically unrescue an instance after N seconds." " Set to 0 to disable.") +flags.DEFINE_bool('auto_assign_floating_ip', False, + 'Autoassigning floating ip to VM') +flags.DEFINE_integer('host_state_interval', 120, + 'Interval in seconds for querying the host status') LOG = logging.getLogger('nova.compute.manager') def checks_instance_lock(function): - """ - decorator used for preventing action against locked instances - unless, of course, you happen to be admin - - """ - + """Decorator to prevent action against locked instances for non-admins.""" @functools.wraps(function) def decorated_function(self, context, instance_id, *args, **kwargs): - LOG.info(_("check_instance_lock: decorating: |%s|"), function, context=context) LOG.info(_("check_instance_lock: arguments: |%(self)s| |%(context)s|" @@ -112,7 +112,6 @@ def checks_instance_lock(function): class ComputeManager(manager.SchedulerDependentManager): - """Manages the running instances from creation to destruction.""" def __init__(self, compute_driver=None, *args, **kwargs): @@ -132,37 +131,55 @@ class ComputeManager(manager.SchedulerDependentManager): self.network_manager = utils.import_object(FLAGS.network_manager) self.volume_manager = utils.import_object(FLAGS.volume_manager) + self.network_api = network.API() + self._last_host_check = 0 super(ComputeManager, self).__init__(service_name="compute", *args, **kwargs) def init_host(self): - """Do any initialization that needs to be run if this is a - standalone service. - """ + """Initialization for a standalone compute service.""" self.driver.init_host(host=self.host) - def _update_state(self, context, instance_id): + def _update_state(self, context, instance_id, state=None): """Update the state of an instance from the driver info.""" - # FIXME(ja): include other fields from state? instance_ref = self.db.instance_get(context, instance_id) - try: - info = self.driver.get_info(instance_ref['name']) - state = info['state'] - except exception.NotFound: - state = power_state.FAILED + + if state is None: + try: + info = self.driver.get_info(instance_ref['name']) + except exception.NotFound: + info = None + + if info is not None: + state = info['state'] + else: + state = power_state.FAILED + self.db.instance_set_state(context, instance_id, state) + def _update_launched_at(self, context, instance_id, launched_at=None): + """Update the launched_at parameter of the given instance.""" + data = {'launched_at': launched_at or datetime.datetime.utcnow()} + self.db.instance_update(context, instance_id, data) + + def _update_image_id(self, context, instance_id, image_id): + """Update the image_id for the given instance.""" + data = {'image_id': image_id} + self.db.instance_update(context, instance_id, data) + def get_console_topic(self, context, **kwargs): - """Retrieves the console host for a project on this host - Currently this is just set in the flags for each compute - host.""" + """Retrieves the console host for a project on this host. + + Currently this is just set in the flags for each compute host. + + """ #TODO(mdragon): perhaps make this variable by console_type? return self.db.queue_get_for(context, FLAGS.console_topic, FLAGS.console_host) def get_network_topic(self, context, **kwargs): - """Retrieves the network host for a project on this host""" + """Retrieves the network host for a project on this host.""" # TODO(vish): This method should be memoized. This will make # the call to get_network_host cheaper, so that # it can pas messages instead of checking the db @@ -179,15 +196,23 @@ class ComputeManager(manager.SchedulerDependentManager): return self.driver.get_console_pool_info(console_type) @exception.wrap_exception - def refresh_security_group_rules(self, context, - security_group_id, **kwargs): - """This call passes straight through to the virtualization driver.""" + def refresh_security_group_rules(self, context, security_group_id, + **kwargs): + """Tell the virtualization driver to refresh security group rules. + + Passes straight through to the virtualization driver. + + """ return self.driver.refresh_security_group_rules(security_group_id) @exception.wrap_exception def refresh_security_group_members(self, context, security_group_id, **kwargs): - """This call passes straight through to the virtualization driver.""" + """Tell the virtualization driver to refresh security group members. + + Passes straight through to the virtualization driver. + + """ return self.driver.refresh_security_group_members(security_group_id) @exception.wrap_exception @@ -201,6 +226,7 @@ class ComputeManager(manager.SchedulerDependentManager): context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) instance_ref.injected_files = kwargs.get('injected_files', []) + instance_ref.admin_pass = kwargs.get('admin_password', None) if instance_ref['name'] in self.driver.list_instances(): raise exception.Error(_("Instance has already been created")) LOG.audit(_("instance %s: starting..."), instance_id, @@ -214,7 +240,7 @@ class ComputeManager(manager.SchedulerDependentManager): power_state.NOSTATE, 'networking') - is_vpn = instance_ref['image_id'] == FLAGS.vpn_image_id + is_vpn = instance_ref['image_id'] == str(FLAGS.vpn_image_id) # NOTE(vish): This could be a cast because we don't do anything # with the address currently, but I'm leaving it as # a call to ensure that network setup completes. We @@ -230,31 +256,36 @@ class ComputeManager(manager.SchedulerDependentManager): instance_id) # TODO(vish) check to make sure the availability zone matches - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'spawning') + self._update_state(context, instance_id, power_state.BUILDING) try: self.driver.spawn(instance_ref) - now = datetime.datetime.utcnow() - self.db.instance_update(context, - instance_id, - {'launched_at': now}) - except Exception: # pylint: disable=W0702 - LOG.exception(_("Instance '%s' failed to spawn. Is virtualization" - " enabled in the BIOS?"), instance_id, - context=context) - self.db.instance_set_state(context, - instance_id, - power_state.SHUTDOWN) - + except Exception as ex: # pylint: disable=W0702 + msg = _("Instance '%(instance_id)s' failed to spawn. Is " + "virtualization enabled in the BIOS? Details: " + "%(ex)s") % locals() + LOG.exception(msg) + + if not FLAGS.stub_network and FLAGS.auto_assign_floating_ip: + public_ip = self.network_api.allocate_floating_ip(context) + + self.db.floating_ip_set_auto_assigned(context, public_ip) + fixed_ip = self.db.fixed_ip_get_by_address(context, address) + floating_ip = self.db.floating_ip_get_by_address(context, + public_ip) + + self.network_api.associate_floating_ip(context, + floating_ip, + fixed_ip, + affect_auto_assigned=True) + + self._update_launched_at(context, instance_id) self._update_state(context, instance_id) @exception.wrap_exception @checks_instance_lock def terminate_instance(self, context, instance_id): - """Terminate an instance on this machine.""" + """Terminate an instance on this host.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) LOG.audit(_("Terminating instance %s"), instance_id, context=context) @@ -269,13 +300,17 @@ class ComputeManager(manager.SchedulerDependentManager): # NOTE(vish): Right now we don't really care if the ip is # disassociated. We may need to worry about # checking this later. - network_topic = self.db.queue_get_for(context, - FLAGS.network_topic, - floating_ip['host']) - rpc.cast(context, - network_topic, - {"method": "disassociate_floating_ip", - "args": {"floating_address": address}}) + self.network_api.disassociate_floating_ip(context, + address, + True) + if (FLAGS.auto_assign_floating_ip + and floating_ip.get('auto_assigned')): + LOG.debug(_("Deallocating floating ip %s"), + floating_ip['address'], + context=context) + self.network_api.release_floating_ip(context, + address, + True) address = fixed_ip['address'] if address: @@ -301,8 +336,35 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception @checks_instance_lock + def rebuild_instance(self, context, instance_id, image_id): + """Destroy and re-make this instance. + + A 'rebuild' effectively purges all existing data from the system and + remakes the VM with given 'metadata' and 'personalities'. + + :param context: `nova.RequestContext` object + :param instance_id: Instance identifier (integer) + :param image_id: Image identifier (integer) + """ + context = context.elevated() + + instance_ref = self.db.instance_get(context, instance_id) + LOG.audit(_("Rebuilding instance %s"), instance_id, context=context) + + self._update_state(context, instance_id, power_state.BUILDING) + + self.driver.destroy(instance_ref) + instance_ref.image_id = image_id + self.driver.spawn(instance_ref) + + self._update_image_id(context, instance_id, image_id) + self._update_launched_at(context, instance_id) + self._update_state(context, instance_id) + + @exception.wrap_exception + @checks_instance_lock def reboot_instance(self, context, instance_id): - """Reboot an instance on this server.""" + """Reboot an instance on this host.""" context = context.elevated() self._update_state(context, instance_id) instance_ref = self.db.instance_get(context, instance_id) @@ -326,7 +388,7 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception def snapshot_instance(self, context, instance_id, image_id): - """Snapshot an instance on this server.""" + """Snapshot an instance on this host.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) @@ -349,28 +411,55 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception @checks_instance_lock def set_admin_password(self, context, instance_id, new_pass=None): - """Set the root/admin password for an instance on this server.""" + """Set the root/admin password for an instance on this host. + + This is generally only called by API password resets after an + image has been built. + """ + context = context.elevated() - instance_ref = self.db.instance_get(context, instance_id) - instance_id = instance_ref['id'] - instance_state = instance_ref['state'] - expected_state = power_state.RUNNING - if instance_state != expected_state: - LOG.warn(_('trying to reset the password on a non-running ' - 'instance: %(instance_id)s (state: %(instance_state)s ' - 'expected: %(expected_state)s)') % locals()) - LOG.audit(_('instance %s: setting admin password'), - instance_ref['name']) + if new_pass is None: # Generate a random password new_pass = utils.generate_password(FLAGS.password_length) - self.driver.set_admin_password(instance_ref, new_pass) - self._update_state(context, instance_id) + + max_tries = 10 + + for i in xrange(max_tries): + instance_ref = self.db.instance_get(context, instance_id) + instance_id = instance_ref["id"] + instance_state = instance_ref["state"] + expected_state = power_state.RUNNING + + if instance_state != expected_state: + raise exception.Error(_('Instance is not running')) + else: + try: + self.driver.set_admin_password(instance_ref, new_pass) + LOG.audit(_("Instance %s: Root password set"), + instance_ref["name"]) + break + except NotImplementedError: + # NOTE(dprince): if the driver doesn't implement + # set_admin_password we break to avoid a loop + LOG.warn(_('set_admin_password is not implemented ' + 'by this driver.')) + break + except Exception, e: + # Catch all here because this could be anything. + LOG.exception(e) + if i == max_tries - 1: + # At some point this exception may make it back + # to the API caller, and we don't want to reveal + # too much. The real exception is logged above + raise exception.Error(_('Internal error')) + time.sleep(1) + continue @exception.wrap_exception @checks_instance_lock def inject_file(self, context, instance_id, path, file_contents): - """Write a file to the specified path on an instance on this server""" + """Write a file to the specified path in an instance on this host.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) instance_id = instance_ref['id'] @@ -388,44 +477,34 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception @checks_instance_lock def rescue_instance(self, context, instance_id): - """Rescue an instance on this server.""" + """Rescue an instance on this host.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) LOG.audit(_('instance %s: rescuing'), instance_id, context=context) - self.db.instance_set_state( - context, - instance_id, - power_state.NOSTATE, - 'rescuing') + self.db.instance_set_state(context, + instance_id, + power_state.NOSTATE, + 'rescuing') self.network_manager.setup_compute_network(context, instance_id) - self.driver.rescue( - instance_ref, - lambda result: self._update_state_callback( - self, - context, - instance_id, - result)) + _update_state = lambda result: self._update_state_callback( + self, context, instance_id, result) + self.driver.rescue(instance_ref, _update_state) self._update_state(context, instance_id) @exception.wrap_exception @checks_instance_lock def unrescue_instance(self, context, instance_id): - """Rescue an instance on this server.""" + """Rescue an instance on this host.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) LOG.audit(_('instance %s: unrescuing'), instance_id, context=context) - self.db.instance_set_state( - context, - instance_id, - power_state.NOSTATE, - 'unrescuing') - self.driver.unrescue( - instance_ref, - lambda result: self._update_state_callback( - self, - context, - instance_id, - result)) + self.db.instance_set_state(context, + instance_id, + power_state.NOSTATE, + 'unrescuing') + _update_state = lambda result: self._update_state_callback( + self, context, instance_id, result) + self.driver.unrescue(instance_ref, _update_state) self._update_state(context, instance_id) @staticmethod @@ -436,18 +515,20 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception @checks_instance_lock def confirm_resize(self, context, instance_id, migration_id): - """Destroys the source instance""" + """Destroys the source instance.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) - migration_ref = self.db.migration_get(context, migration_id) self.driver.destroy(instance_ref) @exception.wrap_exception @checks_instance_lock def revert_resize(self, context, instance_id, migration_id): - """Destroys the new instance on the destination machine, - reverts the model changes, and powers on the old - instance on the source machine""" + """Destroys the new instance on the destination machine. + + Reverts the model changes, and powers on the old instance on the + source machine. + + """ instance_ref = self.db.instance_get(context, instance_id) migration_ref = self.db.migration_get(context, migration_id) @@ -464,9 +545,12 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception @checks_instance_lock def finish_revert_resize(self, context, instance_id, migration_id): - """Finishes the second half of reverting a resize, powering back on - the source instance and reverting the resized attributes in the - database""" + """Finishes the second half of reverting a resize. + + Power back on the source instance and revert the resized attributes + in the database. + + """ instance_ref = self.db.instance_get(context, instance_id) migration_ref = self.db.migration_get(context, migration_id) instance_type = self.db.instance_type_get_by_flavor_id(context, @@ -486,8 +570,11 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception @checks_instance_lock def prep_resize(self, context, instance_id, flavor_id): - """Initiates the process of moving a running instance to another - host, possibly changing the RAM and disk size in the process""" + """Initiates the process of moving a running instance to another host. + + Possibly changes the RAM and disk size in the process. + + """ context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) if instance_ref['host'] == FLAGS.host: @@ -519,34 +606,38 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception @checks_instance_lock def resize_instance(self, context, instance_id, migration_id): - """Starts the migration of a running instance to another host""" + """Starts the migration of a running instance to another host.""" migration_ref = self.db.migration_get(context, migration_id) instance_ref = self.db.instance_get(context, instance_id) - self.db.migration_update(context, migration_id, - {'status': 'migrating', }) - - disk_info = self.driver.migrate_disk_and_power_off(instance_ref, - migration_ref['dest_host']) - self.db.migration_update(context, migration_id, - {'status': 'post-migrating', }) - - service = self.db.service_get_by_host_and_topic(context, - migration_ref['dest_compute'], FLAGS.compute_topic) - topic = self.db.queue_get_for(context, FLAGS.compute_topic, - migration_ref['dest_compute']) - rpc.cast(context, topic, - {'method': 'finish_resize', - 'args': { - 'migration_id': migration_id, - 'instance_id': instance_id, - 'disk_info': disk_info, }, - }) + self.db.migration_update(context, + migration_id, + {'status': 'migrating'}) + + disk_info = self.driver.migrate_disk_and_power_off( + instance_ref, migration_ref['dest_host']) + self.db.migration_update(context, + migration_id, + {'status': 'post-migrating'}) + + service = self.db.service_get_by_host_and_topic( + context, migration_ref['dest_compute'], FLAGS.compute_topic) + topic = self.db.queue_get_for(context, + FLAGS.compute_topic, + migration_ref['dest_compute']) + rpc.cast(context, topic, {'method': 'finish_resize', + 'args': {'migration_id': migration_id, + 'instance_id': instance_id, + 'disk_info': disk_info}}) @exception.wrap_exception @checks_instance_lock def finish_resize(self, context, instance_id, migration_id, disk_info): - """Completes the migration process by setting up the newly transferred - disk and turning on the instance on its new host machine""" + """Completes the migration process. + + Sets up the newly transferred disk and turns on the instance at its + new host machine. + + """ migration_ref = self.db.migration_get(context, migration_id) instance_ref = self.db.instance_get(context, migration_ref['instance_id']) @@ -555,7 +646,7 @@ class ComputeManager(manager.SchedulerDependentManager): instance_type = self.db.instance_type_get_by_flavor_id(context, migration_ref['new_flavor_id']) self.db.instance_update(context, instance_id, - dict(instance_type=instance_type['name'], + dict(instance_type_id=instance_type['id'], memory_mb=instance_type['memory_mb'], vcpus=instance_type['vcpus'], local_gb=instance_type['local_gb'])) @@ -571,7 +662,7 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception @checks_instance_lock def pause_instance(self, context, instance_id): - """Pause an instance on this server.""" + """Pause an instance on this host.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) LOG.audit(_('instance %s: pausing'), instance_id, context=context) @@ -588,7 +679,7 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception @checks_instance_lock def unpause_instance(self, context, instance_id): - """Unpause a paused instance on this server.""" + """Unpause a paused instance on this host.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) LOG.audit(_('instance %s: unpausing'), instance_id, context=context) @@ -604,7 +695,7 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception def get_diagnostics(self, context, instance_id): - """Retrieve diagnostics for an instance on this server.""" + """Retrieve diagnostics for an instance on this host.""" instance_ref = self.db.instance_get(context, instance_id) if instance_ref["state"] == power_state.RUNNING: @@ -615,10 +706,7 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception @checks_instance_lock def suspend_instance(self, context, instance_id): - """ - suspend the instance with instance_id - - """ + """Suspend the given instance.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) LOG.audit(_('instance %s: suspending'), instance_id, context=context) @@ -634,10 +722,7 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception @checks_instance_lock def resume_instance(self, context, instance_id): - """ - resume the suspended instance with instance_id - - """ + """Resume the given suspended instance.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) LOG.audit(_('instance %s: resuming'), instance_id, context=context) @@ -652,34 +737,23 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception def lock_instance(self, context, instance_id): - """ - lock the instance with instance_id - - """ + """Lock the given instance.""" context = context.elevated() - instance_ref = self.db.instance_get(context, instance_id) LOG.debug(_('instance %s: locking'), instance_id, context=context) self.db.instance_update(context, instance_id, {'locked': True}) @exception.wrap_exception def unlock_instance(self, context, instance_id): - """ - unlock the instance with instance_id - - """ + """Unlock the given instance.""" context = context.elevated() - instance_ref = self.db.instance_get(context, instance_id) LOG.debug(_('instance %s: unlocking'), instance_id, context=context) self.db.instance_update(context, instance_id, {'locked': False}) @exception.wrap_exception def get_lock(self, context, instance_id): - """ - return the boolean state of (instance with instance_id)'s lock - - """ + """Return the boolean state of the given instance's lock.""" context = context.elevated() LOG.debug(_('instance %s: getting locked state'), instance_id, context=context) @@ -688,10 +762,7 @@ class ComputeManager(manager.SchedulerDependentManager): @checks_instance_lock def reset_network(self, context, instance_id): - """ - Reset networking on the instance. - - """ + """Reset networking on the given instance.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) LOG.debug(_('instance %s: reset network'), instance_id, @@ -700,10 +771,7 @@ class ComputeManager(manager.SchedulerDependentManager): @checks_instance_lock def inject_network_info(self, context, instance_id): - """ - Inject network info for the instance. - - """ + """Inject network info for the given instance.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) LOG.debug(_('instance %s: inject network info'), instance_id, @@ -712,29 +780,28 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception def get_console_output(self, context, instance_id): - """Send the console output for an instance.""" + """Send the console output for the given instance.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) LOG.audit(_("Get console output for instance %s"), instance_id, context=context) - return self.driver.get_console_output(instance_ref) + output = self.driver.get_console_output(instance_ref) + return output.decode('utf-8', 'replace').encode('ascii', 'replace') @exception.wrap_exception def get_ajax_console(self, context, instance_id): - """Return connection information for an ajax console""" + """Return connection information for an ajax console.""" context = context.elevated() LOG.debug(_("instance %s: getting ajax console"), instance_id) instance_ref = self.db.instance_get(context, instance_id) - return self.driver.get_ajax_console(instance_ref) @exception.wrap_exception def get_vnc_console(self, context, instance_id): - """Return connection information for an vnc console.""" + """Return connection information for a vnc console.""" context = context.elevated() LOG.debug(_("instance %s: getting vnc console"), instance_id) instance_ref = self.db.instance_get(context, instance_id) - return self.driver.get_vnc_console(instance_ref) @checks_instance_lock @@ -786,9 +853,17 @@ class ComputeManager(manager.SchedulerDependentManager): self.db.volume_detached(context, volume_id) return True + def remove_volume(self, context, volume_id): + """Remove volume on compute host. + + :param context: security context + :param volume_id: volume ID + """ + self.volume_manager.remove_compute_volume(context, volume_id) + @exception.wrap_exception def compare_cpu(self, context, cpu_info): - """Checks the host cpu is compatible to a cpu given by xml. + """Checks that the host cpu is compatible with a cpu given by xml. :param context: security context :param cpu_info: json string obtained from virConnect.getCapabilities @@ -809,7 +884,6 @@ class ComputeManager(manager.SchedulerDependentManager): :returns: tmpfile name(basename) """ - dirpath = FLAGS.instances_path fd, tmp_file = tempfile.mkstemp(dir=dirpath) LOG.debug(_("Creating tmpfile %s to notify to other " @@ -826,10 +900,9 @@ class ComputeManager(manager.SchedulerDependentManager): :param filename: confirm existence of FLAGS.instances_path/thisfile """ - tmp_file = os.path.join(FLAGS.instances_path, filename) if not os.path.exists(tmp_file): - raise exception.NotFound(_('%s not found') % tmp_file) + raise exception.FileNotFound(file_path=tmp_file) @exception.wrap_exception def cleanup_shared_storage_test_file(self, context, filename): @@ -839,7 +912,6 @@ class ComputeManager(manager.SchedulerDependentManager): :param filename: remove existence of FLAGS.instances_path/thisfile """ - tmp_file = os.path.join(FLAGS.instances_path, filename) os.remove(tmp_file) @@ -851,7 +923,6 @@ class ComputeManager(manager.SchedulerDependentManager): :returns: See driver.update_available_resource() """ - return self.driver.update_available_resource(context, self.host) def pre_live_migration(self, context, instance_id, time=None): @@ -861,7 +932,6 @@ class ComputeManager(manager.SchedulerDependentManager): :param instance_id: nova.db.sqlalchemy.models.Instance.Id """ - if not time: time = greenthread @@ -872,8 +942,7 @@ class ComputeManager(manager.SchedulerDependentManager): # Getting fixed ips fixed_ip = self.db.instance_get_fixed_address(context, instance_id) if not fixed_ip: - msg = _("%(instance_id)s(%(ec2_id)s) does not have fixed_ip.") - raise exception.NotFound(msg % locals()) + raise exception.NoFixedIpsFoundForInstance(instance_id=instance_id) # If any volume is mounted, prepare here. if not instance_ref['volumes']: @@ -920,7 +989,6 @@ class ComputeManager(manager.SchedulerDependentManager): :param dest: destination host """ - # Get instance for error handling. instance_ref = self.db.instance_get(context, instance_id) i_name = instance_ref.name @@ -1011,17 +1079,15 @@ class ComputeManager(manager.SchedulerDependentManager): "Domain not found: no domain with matching name.\" " "This error can be safely ignored.")) - def recover_live_migration(self, ctxt, instance_ref, host=None): + def recover_live_migration(self, ctxt, instance_ref, host=None, dest=None): """Recovers Instance/volume state from migrating -> running. :param ctxt: security context :param instance_id: nova.db.sqlalchemy.models.Instance.Id - :param host: - DB column value is updated by this hostname. - if none, the host instance currently running is selected. + :param host: DB column value is updated by this hostname. + If none, the host instance currently running is selected. """ - if not host: host = instance_ref['host'] @@ -1031,8 +1097,13 @@ class ComputeManager(manager.SchedulerDependentManager): 'state': power_state.RUNNING, 'host': host}) - for volume in instance_ref['volumes']: - self.db.volume_update(ctxt, volume['id'], {'status': 'in-use'}) + if dest: + volume_api = volume.API() + for volume_ref in instance_ref['volumes']: + volume_id = volume_ref['id'] + self.db.volume_update(ctxt, volume_id, {'status': 'in-use'}) + if dest: + volume_api.remove_from_compute(ctxt, volume_id, dest) def periodic_tasks(self, context=None): """Tasks to be run at a periodic interval.""" @@ -1049,6 +1120,13 @@ class ComputeManager(manager.SchedulerDependentManager): error_list.append(ex) try: + self._report_driver_status() + except Exception as ex: + LOG.warning(_("Error during report_driver_status(): %s"), + unicode(ex)) + error_list.append(ex) + + try: self._poll_instance_states(context) except Exception as ex: LOG.warning(_("Error during instance poll: %s"), @@ -1057,6 +1135,16 @@ class ComputeManager(manager.SchedulerDependentManager): return error_list + def _report_driver_status(self): + curr_time = time.time() + if curr_time - self._last_host_check > FLAGS.host_state_interval: + self._last_host_check = curr_time + LOG.info(_("Updating host status")) + # This will grab info about the host and queue it + # to be sent to the Schedulers. + self.update_service_capabilities( + self.driver.get_host_stats(refresh=True)) + def _poll_instance_states(self, context): vm_instances = self.driver.list_instances_detail() vm_instances = dict((vm.name, vm) for vm in vm_instances) @@ -1074,8 +1162,7 @@ class ComputeManager(manager.SchedulerDependentManager): if vm_instance is None: # NOTE(justinsb): We have to be very careful here, because a # concurrent operation could be in progress (e.g. a spawn) - if db_state == power_state.NOSTATE: - # Assume that NOSTATE => spawning + if db_state == power_state.BUILDING: # TODO(justinsb): This does mean that if we crash during a # spawn, the machine will never leave the spawning state, # but this is just the way nova is; this function isn't @@ -1095,23 +1182,28 @@ class ComputeManager(manager.SchedulerDependentManager): vm_state = vm_instance.state vms_not_found_in_db.remove(name) + if db_instance['state_description'] == 'migrating': + # A situation which db record exists, but no instance" + # sometimes occurs while live-migration at src compute, + # this case should be ignored. + LOG.debug(_("Ignoring %(name)s, as it's currently being " + "migrated.") % locals()) + continue + if vm_state != db_state: LOG.info(_("DB/VM state mismatch. Changing state from " "'%(db_state)s' to '%(vm_state)s'") % locals()) - self.db.instance_set_state(context, - db_instance['id'], - vm_state) + self._update_state(context, db_instance['id'], vm_state) - if vm_state == power_state.SHUTOFF: - # TODO(soren): This is what the compute manager does when you - # terminate an instance. At some point I figure we'll have a - # "terminated" state and some sort of cleanup job that runs - # occasionally, cleaning them out. - self.db.instance_destroy(context, db_instance['id']) + # NOTE(justinsb): We no longer auto-remove SHUTOFF instances + # It's quite hard to get them back when we do. # Are there VMs not in the DB? for vm_not_found_in_db in vms_not_found_in_db: name = vm_not_found_in_db - # TODO(justinsb): What to do here? Adopt it? Shut it down? - LOG.warning(_("Found VM not in DB: '%(name)s'. Ignoring") - % locals()) + + # We only care about instances that compute *should* know about + if name.startswith("instance-"): + # TODO(justinsb): What to do here? Adopt it? Shut it down? + LOG.warning(_("Found VM not in DB: '%(name)s'. Ignoring") + % locals()) diff --git a/nova/compute/monitor.py b/nova/compute/monitor.py index 04e08a235..3bb54a382 100644 --- a/nova/compute/monitor.py +++ b/nova/compute/monitor.py @@ -260,7 +260,7 @@ class Instance(object): try: data = self.fetch_cpu_stats() - if data != None: + if data is not None: LOG.debug('CPU: %s', data) update_rrd(self, 'cpu', data) @@ -313,7 +313,7 @@ class Instance(object): LOG.debug('CPU: %d', self.cputime) # Skip calculation on first pass. Need delta to get a meaningful value. - if cputime_last_updated == None: + if cputime_last_updated is None: return None # Calculate the number of seconds between samples. diff --git a/nova/compute/power_state.py b/nova/compute/power_state.py index ef013b2ef..c468fe6b3 100644 --- a/nova/compute/power_state.py +++ b/nova/compute/power_state.py @@ -30,20 +30,23 @@ SHUTOFF = 0x05 CRASHED = 0x06 SUSPENDED = 0x07 FAILED = 0x08 +BUILDING = 0x09 # TODO(justinsb): Power state really needs to be a proper class, # so that we're not locked into the libvirt status codes and can put mapping # logic here rather than spread throughout the code _STATE_MAP = { - NOSTATE: 'pending', - RUNNING: 'running', - BLOCKED: 'blocked', - PAUSED: 'paused', - SHUTDOWN: 'shutdown', - SHUTOFF: 'shutdown', - CRASHED: 'crashed', - SUSPENDED: 'suspended', - FAILED: 'failed to spawn'} + NOSTATE: 'pending', + RUNNING: 'running', + BLOCKED: 'blocked', + PAUSED: 'paused', + SHUTDOWN: 'shutdown', + SHUTOFF: 'shutdown', + CRASHED: 'crashed', + SUSPENDED: 'suspended', + FAILED: 'failed to spawn', + BUILDING: 'building', +} def name(code): |
