diff options
| -rw-r--r-- | nova/compute/api.py | 212 | ||||
| -rw-r--r-- | nova/tests/api/openstack/compute/test_server_actions.py | 2 | ||||
| -rw-r--r-- | nova/tests/api/openstack/compute/test_servers.py | 5 | ||||
| -rw-r--r-- | nova/tests/test_compute.py | 5 |
4 files changed, 99 insertions, 125 deletions
diff --git a/nova/compute/api.py b/nova/compute/api.py index 5254757de..f154103cf 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -760,11 +760,13 @@ class API(base.Base): self.db.instance_add_security_group(context.elevated(), instance_uuid, security_group['id']) - host = instance['host'] - rpc.cast(context, - self.db.queue_get_for(context, FLAGS.compute_topic, host), - {"method": "refresh_security_group_rules", - "args": {"security_group_id": security_group['id']}}) + params = {"security_group_id": security_group['id']} + # NOTE(comstud): No instance_uuid argument to this compute manager + # call + self._cast_compute_message('refresh_security_group_rules', + context, + host=instance['host'], + params=params) @wrap_check_policy def remove_security_group(self, context, instance, security_group_name): @@ -789,11 +791,13 @@ class API(base.Base): self.db.instance_remove_security_group(context.elevated(), instance_uuid, security_group['id']) - host = instance['host'] - rpc.cast(context, - self.db.queue_get_for(context, FLAGS.compute_topic, host), - {"method": "refresh_security_group_rules", - "args": {"security_group_id": security_group['id']}}) + params = {"security_group_id": security_group['id']} + # NOTE(comstud): No instance_uuid argument to this compute manager + # call + self._cast_compute_message('refresh_security_group_rules', + context, + host=instance['host'], + params=params) @wrap_check_policy @scheduler_api.reroute_compute("update") @@ -835,7 +839,7 @@ class API(base.Base): deleted_at=utils.utcnow()) self._cast_compute_message('power_off_instance', context, - instance_uuid, host) + instance) else: LOG.warning(_("No host for instance %s, deleting immediately"), instance["uuid"]) @@ -850,7 +854,7 @@ class API(base.Base): progress=0) self._cast_compute_message('terminate_instance', context, - instance['uuid'], host) + instance) else: self.db.instance_destroy(context, instance['id']) @@ -888,7 +892,7 @@ class API(base.Base): instance, task_state=task_states.POWERING_ON) self._cast_compute_message('power_on_instance', context, - instance['uuid'], host) + instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.SOFT_DELETE]) @@ -914,15 +918,9 @@ class API(base.Base): terminated_at=utils.utcnow(), progress=0) - host = instance['host'] - if not host: - return - - if do_cast: - self._cast_compute_message('stop_instance', context, - instance_uuid, host) - else: - self._call_compute_message('stop_instance', context, instance) + rpc_method = rpc.cast if do_cast else rpc.call + self._cast_or_call_compute_message(rpc_method, 'stop_instance', + context, instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.STOPPED, vm_states.SHUTOFF]) @@ -1110,10 +1108,17 @@ class API(base.Base): return self.db.instance_get_all_by_filters(context, filters) - def _cast_compute_message(self, method, context, instance_uuid, host=None, - params=None): - """Generic handler for RPC casts to compute. - + def _cast_or_call_compute_message(self, rpc_method, compute_method, + context, instance=None, host=None, params=None): + """Generic handler for RPC casts and calls to compute. + + :param rpc_method: RPC method to use (rpc.call or rpc.cast) + :param compute_method: Compute manager method to call + :param context: RequestContext of caller + :param instance: The instance object to use to find host to send to + Can be None to not include instance_uuid in args + :param host: Optional host to send to instead of instance['host'] + Must be specified if 'instance' is None :param params: Optional dictionary of arguments to be passed to the compute worker @@ -1122,49 +1127,30 @@ class API(base.Base): if not params: params = {} if not host: - instance = self.get(context, instance_uuid) + if not instance: + raise exception.Error(_("No compute host specified")) host = instance['host'] + if not host: + raise exception.Error(_("Unable to find host for " + "Instance %s") % instance['uuid']) queue = self.db.queue_get_for(context, FLAGS.compute_topic, host) - #TODO (ameade): this check should be removed after everything - #in compute manager expects uuids instead of ids - if utils.is_uuid_like(instance_uuid): - params['instance_uuid'] = instance_uuid - else: - params['instance_id'] = instance_uuid - kwargs = {'method': method, 'args': params} - rpc.cast(context, queue, kwargs) + if instance: + params['instance_uuid'] = instance['uuid'] + kwargs = {'method': compute_method, 'args': params} + return rpc_method(context, queue, kwargs) - def _call_compute_message(self, method, context, instance, params=None): - """Generic handler for RPC calls to compute. + def _cast_compute_message(self, *args, **kwargs): + """Generic handler for RPC casts to compute.""" + self._cast_or_call_compute_message(rpc.cast, *args, **kwargs) - :param params: Optional dictionary of arguments to be passed to the - compute worker - - :returns: Result returned by compute worker - """ - if not params: - params = {} - host = instance['host'] - queue = self.db.queue_get_for(context, FLAGS.compute_topic, host) - params['instance_uuid'] = instance['uuid'] - kwargs = {'method': method, 'args': params} - return rpc.call(context, queue, kwargs) + def _call_compute_message(self, *args, **kwargs): + """Generic handler for RPC calls to compute.""" + return self._cast_or_call_compute_message(rpc.call, *args, **kwargs) def _cast_scheduler_message(self, context, args): """Generic handler for RPC calls to the scheduler.""" rpc.cast(context, FLAGS.scheduler_topic, args) - def _find_host(self, context, instance_uuid): - """Find the host associated with an instance.""" - for attempts in xrange(FLAGS.find_host_timeout): - instance = self.get(context, instance_uuid) - host = instance["host"] - if host: - return host - time.sleep(1) - raise exception.Error(_("Unable to find host for Instance %s") - % instance_uuid) - @wrap_check_policy @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF], task_state=[None, task_states.RESIZE_VERIFY]) @@ -1243,8 +1229,8 @@ class API(base.Base): recv_meta = self.image_service.create(context, sent_meta) params = {'image_id': recv_meta['id'], 'image_type': image_type, 'backup_type': backup_type, 'rotation': rotation} - self._cast_compute_message('snapshot_instance', context, instance_uuid, - params=params) + self._cast_compute_message('snapshot_instance', context, + instance, params=params) return recv_meta def _get_minram_mindisk_params(self, context, instance): @@ -1281,7 +1267,7 @@ class API(base.Base): task_state=state) self._cast_compute_message('reboot_instance', context, - instance['uuid'], + instance, params={'reboot_type': reboot_type}) def _validate_image_href(self, context, image_href): @@ -1320,7 +1306,7 @@ class API(base.Base): self._cast_compute_message('rebuild_instance', context, - instance["uuid"], + instance, params=rebuild_params) @wrap_check_policy @@ -1343,8 +1329,8 @@ class API(base.Base): params = {'migration_id': migration_ref['id']} self._cast_compute_message('revert_resize', context, - instance['uuid'], - migration_ref['dest_compute'], + instance, + host=migration_ref['dest_compute'], params=params) self.db.migration_update(context, migration_ref['id'], @@ -1370,8 +1356,8 @@ class API(base.Base): params = {'migration_id': migration_ref['id']} self._cast_compute_message('confirm_resize', context, - instance['uuid'], - migration_ref['source_compute'], + instance, + host=migration_ref['source_compute'], params=params) self.db.migration_update(context, migration_ref['id'], @@ -1442,20 +1428,18 @@ class API(base.Base): @scheduler_api.reroute_compute("add_fixed_ip") def add_fixed_ip(self, context, instance, network_id): """Add fixed_ip from specified network to given instance.""" - instance_uuid = instance['uuid'] self._cast_compute_message('add_fixed_ip_to_instance', context, - instance_uuid, + instance, params=dict(network_id=network_id)) @wrap_check_policy @scheduler_api.reroute_compute("remove_fixed_ip") def remove_fixed_ip(self, context, instance, address): """Remove fixed_ip from specified network to given instance.""" - instance_uuid = instance['uuid'] self._cast_compute_message('remove_fixed_ip_from_instance', context, - instance_uuid, + instance, params=dict(address=address)) @wrap_check_policy @@ -1465,40 +1449,36 @@ class API(base.Base): @scheduler_api.reroute_compute("pause") def pause(self, context, instance): """Pause the given instance.""" - instance_uuid = instance["uuid"] self.update(context, instance, vm_state=vm_states.ACTIVE, task_state=task_states.PAUSING) - self._cast_compute_message('pause_instance', context, instance_uuid) + self._cast_compute_message('pause_instance', context, instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.PAUSED]) @scheduler_api.reroute_compute("unpause") def unpause(self, context, instance): """Unpause the given instance.""" - instance_uuid = instance["uuid"] self.update(context, instance, vm_state=vm_states.PAUSED, task_state=task_states.UNPAUSING) - self._cast_compute_message('unpause_instance', context, instance_uuid) - - def _call_compute_message_for_host(self, action, context, host, params): - """Call method deliberately designed to make host/service only calls""" - queue = self.db.queue_get_for(context, FLAGS.compute_topic, host) - kwargs = {'method': action, 'args': params} - return rpc.call(context, queue, kwargs) + self._cast_compute_message('unpause_instance', context, instance) def set_host_enabled(self, context, host, enabled): """Sets the specified host's ability to accept new instances.""" - return self._call_compute_message_for_host("set_host_enabled", context, + # NOTE(comstud): No instance_uuid argument to this compute manager + # call + return self._call_compute_message("set_host_enabled", context, host=host, params={"enabled": enabled}) def host_power_action(self, context, host, action): """Reboots, shuts down or powers up the host.""" - return self._call_compute_message_for_host("host_power_action", - context, host=host, params={"action": action}) + # NOTE(comstud): No instance_uuid argument to this compute manager + # call + return self._call_compute_message("host_power_action", context, + host=host, params={"action": action}) @wrap_check_policy @scheduler_api.reroute_compute("diagnostics") @@ -1520,24 +1500,22 @@ class API(base.Base): @scheduler_api.reroute_compute("suspend") def suspend(self, context, instance): """Suspend the given instance.""" - instance_uuid = instance["uuid"] self.update(context, instance, vm_state=vm_states.ACTIVE, task_state=task_states.SUSPENDING) - self._cast_compute_message('suspend_instance', context, instance_uuid) + self._cast_compute_message('suspend_instance', context, instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.SUSPENDED]) @scheduler_api.reroute_compute("resume") def resume(self, context, instance): """Resume the given instance.""" - instance_uuid = instance["uuid"] self.update(context, instance, vm_state=vm_states.SUSPENDED, task_state=task_states.RESUMING) - self._cast_compute_message('resume_instance', context, instance_uuid) + self._cast_compute_message('resume_instance', context, instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF, @@ -1555,7 +1533,7 @@ class API(base.Base): "rescue_password": rescue_password } self._cast_compute_message('rescue_instance', context, - instance['uuid'], + instance, params=rescue_params) @wrap_check_policy @@ -1568,25 +1546,21 @@ class API(base.Base): vm_state=vm_states.RESCUED, task_state=task_states.UNRESCUING) self._cast_compute_message('unrescue_instance', context, - instance['uuid']) + instance) @wrap_check_policy + @check_instance_state(vm_state=[vm_states.ACTIVE]) @scheduler_api.reroute_compute("set_admin_password") def set_admin_password(self, context, instance, password=None): """Set the root/admin password for the given instance.""" - instance_uuid = instance['uuid'] self.update(context, instance, task_state=task_states.UPDATING_PASSWORD) - host = self._find_host(context, instance['uuid']) - - rpc.cast(context, - self.db.queue_get_for(context, FLAGS.compute_topic, host), - { - "method": "set_admin_password", - "args": { - "instance_uuid": instance_uuid, "new_pass": password}}) + params = {"new_pass": password} + self._cast_compute_message('set_admin_password', context, + instance, + params=params) @wrap_check_policy @scheduler_api.reroute_compute("inject_file") @@ -1594,7 +1568,7 @@ class API(base.Base): """Write a file to the given instance.""" params = {'path': path, 'file_contents': file_contents} self._cast_compute_message('inject_file', context, - instance['uuid'], params=params) + instance, params=params) @wrap_check_policy def get_ajax_console(self, context, instance): @@ -1631,22 +1605,23 @@ class API(base.Base): @wrap_check_policy def get_console_output(self, context, instance, tail_length=None): """Get console output for an an instance.""" + params = {'tail_length': tail_length} return self._call_compute_message('get_console_output', context, instance, - {'tail_length': tail_length}) + params=params) @wrap_check_policy def lock(self, context, instance): """Lock the given instance.""" - self._cast_compute_message('lock_instance', context, instance['uuid']) + self._cast_compute_message('lock_instance', context, instance) @wrap_check_policy def unlock(self, context, instance): """Unlock the given instance.""" self._cast_compute_message('unlock_instance', context, - instance['uuid']) + instance) @wrap_check_policy def get_lock(self, context, instance): @@ -1656,13 +1631,13 @@ class API(base.Base): @wrap_check_policy def reset_network(self, context, instance): """Reset networking on the instance.""" - self._cast_compute_message('reset_network', context, instance['uuid']) + self._cast_compute_message('reset_network', context, instance) @wrap_check_policy def inject_network_info(self, context, instance): """Inject network info for the instance.""" self._cast_compute_message('inject_network_info', context, - instance['uuid']) + instance) @wrap_check_policy def attach_volume(self, context, instance, volume_id, device): @@ -1672,14 +1647,14 @@ class API(base.Base): "Example device: /dev/vdb") % device) volume = self.volume_api.get(context, volume_id) self.volume_api.check_attach(context, volume) - host = instance['host'] - rpc.cast(context, - self.db.queue_get_for(context, FLAGS.compute_topic, host), - {"method": "attach_volume", - "args": {"volume_id": volume_id, - "instance_uuid": instance['uuid'], - "mountpoint": device}}) + params = {"volume_id": volume_id, + "mountpoint": device} + self._cast_compute_message('attach_volume', context, + instance, + params=params) + # FIXME(comstud): I wonder if API should pull in the instance from + # the volume ID via volume API and pass it and the volume object here def detach_volume(self, context, volume_id): """Detach a volume from an instance.""" instance = self.db.volume_get_instance(context.elevated(), volume_id) @@ -1690,12 +1665,11 @@ class API(base.Base): volume = self.volume_api.get(context, volume_id) self.volume_api.check_detach(context, volume) - host = instance['host'] - rpc.cast(context, - self.db.queue_get_for(context, FLAGS.compute_topic, host), - {"method": "detach_volume", - "args": {"instance_uuid": instance['uuid'], - "volume_id": volume_id}}) + + params = {'volume_id': volume_id} + self._cast_compute_message('detach_volume', context, + instance, + params=params) return instance @wrap_check_policy diff --git a/nova/tests/api/openstack/compute/test_server_actions.py b/nova/tests/api/openstack/compute/test_server_actions.py index a0c4c390e..08612adb1 100644 --- a/nova/tests/api/openstack/compute/test_server_actions.py +++ b/nova/tests/api/openstack/compute/test_server_actions.py @@ -102,7 +102,7 @@ def stub_instance(id, metadata=None, image_ref="10", flavor_id="1", "vcpus": 0, "local_gb": 0, "hostname": "", - "host": "", + "host": "fake_host", "instance_type": dict(inst_type), "user_data": "", "reservation_id": "", diff --git a/nova/tests/api/openstack/compute/test_servers.py b/nova/tests/api/openstack/compute/test_servers.py index 0fafa1e77..f545aeaec 100644 --- a/nova/tests/api/openstack/compute/test_servers.py +++ b/nova/tests/api/openstack/compute/test_servers.py @@ -150,10 +150,6 @@ def fake_compute_api(cls, req, id): return True -def find_host(self, context, instance_id): - return "nova" - - class MockSetAdminPassword(object): def __init__(self): self.instance_id = None @@ -1564,7 +1560,6 @@ class ServersControllerCreateTest(test.TestCase): self.stubs.Set(nova.db, 'queue_get_for', queue_get_for) self.stubs.Set(nova.network.manager.VlanManager, 'allocate_fixed_ip', fake_method) - self.stubs.Set(nova.compute.api.API, "_find_host", find_host) def _test_create_instance(self): image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77' diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py index 88d4bf54b..44707fe33 100644 --- a/nova/tests/test_compute.py +++ b/nova/tests/test_compute.py @@ -147,6 +147,7 @@ class BaseTestCase(test.TestCase): inst['launch_time'] = '10' inst['user_id'] = self.user_id inst['project_id'] = self.project_id + inst['host'] = 'fake_host' type_id = instance_types.get_instance_type_by_name(type_name)['id'] inst['instance_type_id'] = type_id inst['ami_launch_index'] = 0 @@ -3024,6 +3025,10 @@ class ComputePolicyTestCase(BaseTestCase): def test_wrapped_method(self): instance = self._create_fake_instance() + # Reset this to None for this policy check. If it's set, it + # tries to do a compute_api.update() and we're not testing for + # that here. + instance['host'] = None self.compute.run_instance(self.context, instance['uuid']) # force delete to fail |
