summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--nova/compute/api.py212
-rw-r--r--nova/tests/api/openstack/compute/test_server_actions.py2
-rw-r--r--nova/tests/api/openstack/compute/test_servers.py5
-rw-r--r--nova/tests/test_compute.py5
4 files changed, 99 insertions, 125 deletions
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 5254757de..f154103cf 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -760,11 +760,13 @@ class API(base.Base):
self.db.instance_add_security_group(context.elevated(),
instance_uuid,
security_group['id'])
- host = instance['host']
- rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "refresh_security_group_rules",
- "args": {"security_group_id": security_group['id']}})
+ params = {"security_group_id": security_group['id']}
+ # NOTE(comstud): No instance_uuid argument to this compute manager
+ # call
+ self._cast_compute_message('refresh_security_group_rules',
+ context,
+ host=instance['host'],
+ params=params)
@wrap_check_policy
def remove_security_group(self, context, instance, security_group_name):
@@ -789,11 +791,13 @@ class API(base.Base):
self.db.instance_remove_security_group(context.elevated(),
instance_uuid,
security_group['id'])
- host = instance['host']
- rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "refresh_security_group_rules",
- "args": {"security_group_id": security_group['id']}})
+ params = {"security_group_id": security_group['id']}
+ # NOTE(comstud): No instance_uuid argument to this compute manager
+ # call
+ self._cast_compute_message('refresh_security_group_rules',
+ context,
+ host=instance['host'],
+ params=params)
@wrap_check_policy
@scheduler_api.reroute_compute("update")
@@ -835,7 +839,7 @@ class API(base.Base):
deleted_at=utils.utcnow())
self._cast_compute_message('power_off_instance', context,
- instance_uuid, host)
+ instance)
else:
LOG.warning(_("No host for instance %s, deleting immediately"),
instance["uuid"])
@@ -850,7 +854,7 @@ class API(base.Base):
progress=0)
self._cast_compute_message('terminate_instance', context,
- instance['uuid'], host)
+ instance)
else:
self.db.instance_destroy(context, instance['id'])
@@ -888,7 +892,7 @@ class API(base.Base):
instance,
task_state=task_states.POWERING_ON)
self._cast_compute_message('power_on_instance', context,
- instance['uuid'], host)
+ instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.SOFT_DELETE])
@@ -914,15 +918,9 @@ class API(base.Base):
terminated_at=utils.utcnow(),
progress=0)
- host = instance['host']
- if not host:
- return
-
- if do_cast:
- self._cast_compute_message('stop_instance', context,
- instance_uuid, host)
- else:
- self._call_compute_message('stop_instance', context, instance)
+ rpc_method = rpc.cast if do_cast else rpc.call
+ self._cast_or_call_compute_message(rpc_method, 'stop_instance',
+ context, instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.STOPPED, vm_states.SHUTOFF])
@@ -1110,10 +1108,17 @@ class API(base.Base):
return self.db.instance_get_all_by_filters(context, filters)
- def _cast_compute_message(self, method, context, instance_uuid, host=None,
- params=None):
- """Generic handler for RPC casts to compute.
-
+ def _cast_or_call_compute_message(self, rpc_method, compute_method,
+ context, instance=None, host=None, params=None):
+ """Generic handler for RPC casts and calls to compute.
+
+ :param rpc_method: RPC method to use (rpc.call or rpc.cast)
+ :param compute_method: Compute manager method to call
+ :param context: RequestContext of caller
+ :param instance: The instance object to use to find host to send to
+ Can be None to not include instance_uuid in args
+ :param host: Optional host to send to instead of instance['host']
+ Must be specified if 'instance' is None
:param params: Optional dictionary of arguments to be passed to the
compute worker
@@ -1122,49 +1127,30 @@ class API(base.Base):
if not params:
params = {}
if not host:
- instance = self.get(context, instance_uuid)
+ if not instance:
+ raise exception.Error(_("No compute host specified"))
host = instance['host']
+ if not host:
+ raise exception.Error(_("Unable to find host for "
+ "Instance %s") % instance['uuid'])
queue = self.db.queue_get_for(context, FLAGS.compute_topic, host)
- #TODO (ameade): this check should be removed after everything
- #in compute manager expects uuids instead of ids
- if utils.is_uuid_like(instance_uuid):
- params['instance_uuid'] = instance_uuid
- else:
- params['instance_id'] = instance_uuid
- kwargs = {'method': method, 'args': params}
- rpc.cast(context, queue, kwargs)
+ if instance:
+ params['instance_uuid'] = instance['uuid']
+ kwargs = {'method': compute_method, 'args': params}
+ return rpc_method(context, queue, kwargs)
- def _call_compute_message(self, method, context, instance, params=None):
- """Generic handler for RPC calls to compute.
+ def _cast_compute_message(self, *args, **kwargs):
+ """Generic handler for RPC casts to compute."""
+ self._cast_or_call_compute_message(rpc.cast, *args, **kwargs)
- :param params: Optional dictionary of arguments to be passed to the
- compute worker
-
- :returns: Result returned by compute worker
- """
- if not params:
- params = {}
- host = instance['host']
- queue = self.db.queue_get_for(context, FLAGS.compute_topic, host)
- params['instance_uuid'] = instance['uuid']
- kwargs = {'method': method, 'args': params}
- return rpc.call(context, queue, kwargs)
+ def _call_compute_message(self, *args, **kwargs):
+ """Generic handler for RPC calls to compute."""
+ return self._cast_or_call_compute_message(rpc.call, *args, **kwargs)
def _cast_scheduler_message(self, context, args):
"""Generic handler for RPC calls to the scheduler."""
rpc.cast(context, FLAGS.scheduler_topic, args)
- def _find_host(self, context, instance_uuid):
- """Find the host associated with an instance."""
- for attempts in xrange(FLAGS.find_host_timeout):
- instance = self.get(context, instance_uuid)
- host = instance["host"]
- if host:
- return host
- time.sleep(1)
- raise exception.Error(_("Unable to find host for Instance %s")
- % instance_uuid)
-
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF],
task_state=[None, task_states.RESIZE_VERIFY])
@@ -1243,8 +1229,8 @@ class API(base.Base):
recv_meta = self.image_service.create(context, sent_meta)
params = {'image_id': recv_meta['id'], 'image_type': image_type,
'backup_type': backup_type, 'rotation': rotation}
- self._cast_compute_message('snapshot_instance', context, instance_uuid,
- params=params)
+ self._cast_compute_message('snapshot_instance', context,
+ instance, params=params)
return recv_meta
def _get_minram_mindisk_params(self, context, instance):
@@ -1281,7 +1267,7 @@ class API(base.Base):
task_state=state)
self._cast_compute_message('reboot_instance',
context,
- instance['uuid'],
+ instance,
params={'reboot_type': reboot_type})
def _validate_image_href(self, context, image_href):
@@ -1320,7 +1306,7 @@ class API(base.Base):
self._cast_compute_message('rebuild_instance',
context,
- instance["uuid"],
+ instance,
params=rebuild_params)
@wrap_check_policy
@@ -1343,8 +1329,8 @@ class API(base.Base):
params = {'migration_id': migration_ref['id']}
self._cast_compute_message('revert_resize', context,
- instance['uuid'],
- migration_ref['dest_compute'],
+ instance,
+ host=migration_ref['dest_compute'],
params=params)
self.db.migration_update(context, migration_ref['id'],
@@ -1370,8 +1356,8 @@ class API(base.Base):
params = {'migration_id': migration_ref['id']}
self._cast_compute_message('confirm_resize', context,
- instance['uuid'],
- migration_ref['source_compute'],
+ instance,
+ host=migration_ref['source_compute'],
params=params)
self.db.migration_update(context, migration_ref['id'],
@@ -1442,20 +1428,18 @@ class API(base.Base):
@scheduler_api.reroute_compute("add_fixed_ip")
def add_fixed_ip(self, context, instance, network_id):
"""Add fixed_ip from specified network to given instance."""
- instance_uuid = instance['uuid']
self._cast_compute_message('add_fixed_ip_to_instance',
context,
- instance_uuid,
+ instance,
params=dict(network_id=network_id))
@wrap_check_policy
@scheduler_api.reroute_compute("remove_fixed_ip")
def remove_fixed_ip(self, context, instance, address):
"""Remove fixed_ip from specified network to given instance."""
- instance_uuid = instance['uuid']
self._cast_compute_message('remove_fixed_ip_from_instance',
context,
- instance_uuid,
+ instance,
params=dict(address=address))
@wrap_check_policy
@@ -1465,40 +1449,36 @@ class API(base.Base):
@scheduler_api.reroute_compute("pause")
def pause(self, context, instance):
"""Pause the given instance."""
- instance_uuid = instance["uuid"]
self.update(context,
instance,
vm_state=vm_states.ACTIVE,
task_state=task_states.PAUSING)
- self._cast_compute_message('pause_instance', context, instance_uuid)
+ self._cast_compute_message('pause_instance', context, instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.PAUSED])
@scheduler_api.reroute_compute("unpause")
def unpause(self, context, instance):
"""Unpause the given instance."""
- instance_uuid = instance["uuid"]
self.update(context,
instance,
vm_state=vm_states.PAUSED,
task_state=task_states.UNPAUSING)
- self._cast_compute_message('unpause_instance', context, instance_uuid)
-
- def _call_compute_message_for_host(self, action, context, host, params):
- """Call method deliberately designed to make host/service only calls"""
- queue = self.db.queue_get_for(context, FLAGS.compute_topic, host)
- kwargs = {'method': action, 'args': params}
- return rpc.call(context, queue, kwargs)
+ self._cast_compute_message('unpause_instance', context, instance)
def set_host_enabled(self, context, host, enabled):
"""Sets the specified host's ability to accept new instances."""
- return self._call_compute_message_for_host("set_host_enabled", context,
+ # NOTE(comstud): No instance_uuid argument to this compute manager
+ # call
+ return self._call_compute_message("set_host_enabled", context,
host=host, params={"enabled": enabled})
def host_power_action(self, context, host, action):
"""Reboots, shuts down or powers up the host."""
- return self._call_compute_message_for_host("host_power_action",
- context, host=host, params={"action": action})
+ # NOTE(comstud): No instance_uuid argument to this compute manager
+ # call
+ return self._call_compute_message("host_power_action", context,
+ host=host, params={"action": action})
@wrap_check_policy
@scheduler_api.reroute_compute("diagnostics")
@@ -1520,24 +1500,22 @@ class API(base.Base):
@scheduler_api.reroute_compute("suspend")
def suspend(self, context, instance):
"""Suspend the given instance."""
- instance_uuid = instance["uuid"]
self.update(context,
instance,
vm_state=vm_states.ACTIVE,
task_state=task_states.SUSPENDING)
- self._cast_compute_message('suspend_instance', context, instance_uuid)
+ self._cast_compute_message('suspend_instance', context, instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.SUSPENDED])
@scheduler_api.reroute_compute("resume")
def resume(self, context, instance):
"""Resume the given instance."""
- instance_uuid = instance["uuid"]
self.update(context,
instance,
vm_state=vm_states.SUSPENDED,
task_state=task_states.RESUMING)
- self._cast_compute_message('resume_instance', context, instance_uuid)
+ self._cast_compute_message('resume_instance', context, instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF,
@@ -1555,7 +1533,7 @@ class API(base.Base):
"rescue_password": rescue_password
}
self._cast_compute_message('rescue_instance', context,
- instance['uuid'],
+ instance,
params=rescue_params)
@wrap_check_policy
@@ -1568,25 +1546,21 @@ class API(base.Base):
vm_state=vm_states.RESCUED,
task_state=task_states.UNRESCUING)
self._cast_compute_message('unrescue_instance', context,
- instance['uuid'])
+ instance)
@wrap_check_policy
+ @check_instance_state(vm_state=[vm_states.ACTIVE])
@scheduler_api.reroute_compute("set_admin_password")
def set_admin_password(self, context, instance, password=None):
"""Set the root/admin password for the given instance."""
- instance_uuid = instance['uuid']
self.update(context,
instance,
task_state=task_states.UPDATING_PASSWORD)
- host = self._find_host(context, instance['uuid'])
-
- rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, host),
- {
- "method": "set_admin_password",
- "args": {
- "instance_uuid": instance_uuid, "new_pass": password}})
+ params = {"new_pass": password}
+ self._cast_compute_message('set_admin_password', context,
+ instance,
+ params=params)
@wrap_check_policy
@scheduler_api.reroute_compute("inject_file")
@@ -1594,7 +1568,7 @@ class API(base.Base):
"""Write a file to the given instance."""
params = {'path': path, 'file_contents': file_contents}
self._cast_compute_message('inject_file', context,
- instance['uuid'], params=params)
+ instance, params=params)
@wrap_check_policy
def get_ajax_console(self, context, instance):
@@ -1631,22 +1605,23 @@ class API(base.Base):
@wrap_check_policy
def get_console_output(self, context, instance, tail_length=None):
"""Get console output for an an instance."""
+ params = {'tail_length': tail_length}
return self._call_compute_message('get_console_output',
context,
instance,
- {'tail_length': tail_length})
+ params=params)
@wrap_check_policy
def lock(self, context, instance):
"""Lock the given instance."""
- self._cast_compute_message('lock_instance', context, instance['uuid'])
+ self._cast_compute_message('lock_instance', context, instance)
@wrap_check_policy
def unlock(self, context, instance):
"""Unlock the given instance."""
self._cast_compute_message('unlock_instance',
context,
- instance['uuid'])
+ instance)
@wrap_check_policy
def get_lock(self, context, instance):
@@ -1656,13 +1631,13 @@ class API(base.Base):
@wrap_check_policy
def reset_network(self, context, instance):
"""Reset networking on the instance."""
- self._cast_compute_message('reset_network', context, instance['uuid'])
+ self._cast_compute_message('reset_network', context, instance)
@wrap_check_policy
def inject_network_info(self, context, instance):
"""Inject network info for the instance."""
self._cast_compute_message('inject_network_info', context,
- instance['uuid'])
+ instance)
@wrap_check_policy
def attach_volume(self, context, instance, volume_id, device):
@@ -1672,14 +1647,14 @@ class API(base.Base):
"Example device: /dev/vdb") % device)
volume = self.volume_api.get(context, volume_id)
self.volume_api.check_attach(context, volume)
- host = instance['host']
- rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "attach_volume",
- "args": {"volume_id": volume_id,
- "instance_uuid": instance['uuid'],
- "mountpoint": device}})
+ params = {"volume_id": volume_id,
+ "mountpoint": device}
+ self._cast_compute_message('attach_volume', context,
+ instance,
+ params=params)
+ # FIXME(comstud): I wonder if API should pull in the instance from
+ # the volume ID via volume API and pass it and the volume object here
def detach_volume(self, context, volume_id):
"""Detach a volume from an instance."""
instance = self.db.volume_get_instance(context.elevated(), volume_id)
@@ -1690,12 +1665,11 @@ class API(base.Base):
volume = self.volume_api.get(context, volume_id)
self.volume_api.check_detach(context, volume)
- host = instance['host']
- rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "detach_volume",
- "args": {"instance_uuid": instance['uuid'],
- "volume_id": volume_id}})
+
+ params = {'volume_id': volume_id}
+ self._cast_compute_message('detach_volume', context,
+ instance,
+ params=params)
return instance
@wrap_check_policy
diff --git a/nova/tests/api/openstack/compute/test_server_actions.py b/nova/tests/api/openstack/compute/test_server_actions.py
index a0c4c390e..08612adb1 100644
--- a/nova/tests/api/openstack/compute/test_server_actions.py
+++ b/nova/tests/api/openstack/compute/test_server_actions.py
@@ -102,7 +102,7 @@ def stub_instance(id, metadata=None, image_ref="10", flavor_id="1",
"vcpus": 0,
"local_gb": 0,
"hostname": "",
- "host": "",
+ "host": "fake_host",
"instance_type": dict(inst_type),
"user_data": "",
"reservation_id": "",
diff --git a/nova/tests/api/openstack/compute/test_servers.py b/nova/tests/api/openstack/compute/test_servers.py
index 0fafa1e77..f545aeaec 100644
--- a/nova/tests/api/openstack/compute/test_servers.py
+++ b/nova/tests/api/openstack/compute/test_servers.py
@@ -150,10 +150,6 @@ def fake_compute_api(cls, req, id):
return True
-def find_host(self, context, instance_id):
- return "nova"
-
-
class MockSetAdminPassword(object):
def __init__(self):
self.instance_id = None
@@ -1564,7 +1560,6 @@ class ServersControllerCreateTest(test.TestCase):
self.stubs.Set(nova.db, 'queue_get_for', queue_get_for)
self.stubs.Set(nova.network.manager.VlanManager, 'allocate_fixed_ip',
fake_method)
- self.stubs.Set(nova.compute.api.API, "_find_host", find_host)
def _test_create_instance(self):
image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py
index 88d4bf54b..44707fe33 100644
--- a/nova/tests/test_compute.py
+++ b/nova/tests/test_compute.py
@@ -147,6 +147,7 @@ class BaseTestCase(test.TestCase):
inst['launch_time'] = '10'
inst['user_id'] = self.user_id
inst['project_id'] = self.project_id
+ inst['host'] = 'fake_host'
type_id = instance_types.get_instance_type_by_name(type_name)['id']
inst['instance_type_id'] = type_id
inst['ami_launch_index'] = 0
@@ -3024,6 +3025,10 @@ class ComputePolicyTestCase(BaseTestCase):
def test_wrapped_method(self):
instance = self._create_fake_instance()
+ # Reset this to None for this policy check. If it's set, it
+ # tries to do a compute_api.update() and we're not testing for
+ # that here.
+ instance['host'] = None
self.compute.run_instance(self.context, instance['uuid'])
# force delete to fail