diff options
| author | Chris Behrens <cbehrens@codestud.com> | 2012-03-12 20:13:39 +0000 |
|---|---|---|
| committer | Chris Behrens <cbehrens@codestud.com> | 2012-03-12 21:45:59 +0000 |
| commit | 1747cc366dd93534db78ca393ec8e39a7652dc27 (patch) | |
| tree | ca2667fd979de9c644f1bd5c54786a40cad65624 /nova | |
| parent | 625c5e45db62c59691b16fadde464a192af3ac3f (diff) | |
| download | nova-1747cc366dd93534db78ca393ec8e39a7652dc27.tar.gz nova-1747cc366dd93534db78ca393ec8e39a7652dc27.tar.xz nova-1747cc366dd93534db78ca393ec8e39a7652dc27.zip | |
Move (cast|call)_compute_message methods back into compute API class
Those calls need to be able to be overridden in a subclass to support
compute cells, at least initially. A prior patch that moved them to
module functions broke the cells work.
Change-Id: I6c35fa1ac646fe686bbc2c5eed2da7e286d32a15
Diffstat (limited to 'nova')
| -rw-r--r-- | nova/compute/api.py | 213 | ||||
| -rw-r--r-- | nova/tests/test_compute.py | 6 |
2 files changed, 106 insertions, 113 deletions
diff --git a/nova/compute/api.py b/nova/compute/api.py index 73a4ef1b6..e1bebfc90 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -118,6 +118,49 @@ class API(base.Base): self.volume_api = volume_api or volume.API() super(API, self).__init__(**kwargs) + def _cast_or_call_compute_message(self, rpc_method, compute_method, + context, instance=None, host=None, params=None): + """Generic handler for RPC casts and calls to compute. + + :param rpc_method: RPC method to use (rpc.call or rpc.cast) + :param compute_method: Compute manager method to call + :param context: RequestContext of caller + :param instance: The instance object to use to find host to send to + Can be None to not include instance_uuid in args + :param host: Optional host to send to instead of instance['host'] + Must be specified if 'instance' is None + :param params: Optional dictionary of arguments to be passed to the + compute worker + + :returns: None + """ + if not params: + params = {} + if not host: + if not instance: + raise exception.Error(_("No compute host specified")) + host = instance['host'] + if not host: + raise exception.Error(_("Unable to find host for " + "Instance %s") % instance['uuid']) + queue = self.db.queue_get_for(context, FLAGS.compute_topic, host) + if instance: + params['instance_uuid'] = instance['uuid'] + kwargs = {'method': compute_method, 'args': params} + return rpc_method(context, queue, kwargs) + + def _cast_compute_message(self, *args, **kwargs): + """Generic handler for RPC casts to compute.""" + self._cast_or_call_compute_message(rpc.cast, *args, **kwargs) + + def _call_compute_message(self, *args, **kwargs): + """Generic handler for RPC calls to compute.""" + return self._cast_or_call_compute_message(rpc.call, *args, **kwargs) + + def _cast_scheduler_message(self, context, args): + """Generic handler for RPC calls to the scheduler.""" + rpc.cast(context, FLAGS.scheduler_topic, args) + def _check_injected_file_quota(self, context, injected_files): """Enforce quota limits on injected files. @@ -773,8 +816,8 @@ class API(base.Base): params = {"security_group_id": security_group['id']} # NOTE(comstud): No instance_uuid argument to this compute manager # call - _cast_compute_message(self.db, 'refresh_security_group_rules', context, - host=instance['host'], params=params) + self._cast_compute_message('refresh_security_group_rules', + context, host=instance['host'], params=params) @wrap_check_policy def remove_security_group(self, context, instance, security_group_name): @@ -802,8 +845,8 @@ class API(base.Base): params = {"security_group_id": security_group['id']} # NOTE(comstud): No instance_uuid argument to this compute manager # call - _cast_compute_message(self.db, 'refresh_security_group_rules', - context, host=instance['host'], params=params) + self._cast_compute_message('refresh_security_group_rules', + context, host=instance['host'], params=params) @wrap_check_policy def update(self, context, instance, **kwargs): @@ -842,8 +885,8 @@ class API(base.Base): task_state=task_states.POWERING_OFF, deleted_at=utils.utcnow()) - _cast_compute_message(self.db, 'power_off_instance', context, - instance) + self._cast_compute_message('power_off_instance', + context, instance) else: LOG.warning(_('No host for instance, deleting immediately'), instance=instance) @@ -862,8 +905,8 @@ class API(base.Base): task_state=task_states.DELETING, progress=0) - _cast_compute_message(self.db, 'terminate_instance', context, - instance) + self._cast_compute_message('terminate_instance', + context, instance) else: self.db.instance_destroy(context, instance['id']) except exception.InstanceNotFound: @@ -901,8 +944,8 @@ class API(base.Base): self.update(context, instance, task_state=task_states.POWERING_ON) - _cast_compute_message(self.db, 'power_on_instance', - context, instance) + self._cast_compute_message('power_on_instance', + context, instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.SOFT_DELETE]) @@ -927,8 +970,8 @@ class API(base.Base): progress=0) rpc_method = rpc.cast if do_cast else rpc.call - _cast_or_call_compute_message(self.db, rpc_method, 'stop_instance', - context, instance) + self._cast_or_call_compute_message(rpc_method, 'stop_instance', + context, instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.STOPPED, vm_states.SHUTOFF]) @@ -956,7 +999,7 @@ class API(base.Base): # TODO(yamahata): injected_files isn't supported right now. # It is used only for osapi. not for ec2 api. # availability_zone isn't used by run_instance. - _cast_compute_message(self.db, 'start_instance', context, instance) + self._cast_compute_message('start_instance', context, instance) #NOTE(bcwaldon): no policy check here since it should be rolled in to # search_opts in get_all @@ -1164,8 +1207,8 @@ class API(base.Base): recv_meta = self.image_service.create(context, sent_meta) params = {'image_id': recv_meta['id'], 'image_type': image_type, 'backup_type': backup_type, 'rotation': rotation} - _cast_compute_message(self.db, 'snapshot_instance', context, - instance, params=params) + self._cast_compute_message('snapshot_instance', context, instance, + params=params) return recv_meta def _get_minram_mindisk_params(self, context, instance): @@ -1199,8 +1242,8 @@ class API(base.Base): instance, vm_state=vm_states.ACTIVE, task_state=state) - _cast_compute_message(self.db, 'reboot_instance', context, - instance, params={'reboot_type': reboot_type}) + self._cast_compute_message('reboot_instance', context, instance, + params={'reboot_type': reboot_type}) def _validate_image_href(self, context, image_href): """Throws an ImageNotFound exception if image_href does not exist.""" @@ -1235,8 +1278,8 @@ class API(base.Base): "injected_files": files_to_inject, } - _cast_compute_message(self.db, 'rebuild_instance', context, - instance, params=rebuild_params) + self._cast_compute_message('rebuild_instance', context, instance, + params=rebuild_params) @wrap_check_policy @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF], @@ -1256,9 +1299,8 @@ class API(base.Base): task_state=task_states.RESIZE_REVERTING) params = {'migration_id': migration_ref['id']} - _cast_compute_message(self.db, 'revert_resize', context, instance, - host=migration_ref['dest_compute'], - params=params) + self._cast_compute_message('revert_resize', context, instance, + host=migration_ref['dest_compute'], params=params) self.db.migration_update(context, migration_ref['id'], {'status': 'reverted'}) @@ -1281,9 +1323,8 @@ class API(base.Base): task_state=None) params = {'migration_id': migration_ref['id']} - _cast_compute_message(self.db, 'confirm_resize', context, instance, - host=migration_ref['source_compute'], - params=params) + self._cast_compute_message('confirm_resize', context, instance, + host=migration_ref['source_compute'], params=params) self.db.migration_update(context, migration_ref['id'], {'status': 'confirmed'}) @@ -1352,21 +1393,21 @@ class API(base.Base): "request_spec": utils.to_primitive(request_spec), "filter_properties": filter_properties, } - _cast_scheduler_message(context, + self._cast_scheduler_message(context, {"method": "prep_resize", "args": args}) @wrap_check_policy def add_fixed_ip(self, context, instance, network_id): """Add fixed_ip from specified network to given instance.""" - _cast_compute_message(self.db, 'add_fixed_ip_to_instance', context, - instance, params=dict(network_id=network_id)) + self._cast_compute_message('add_fixed_ip_to_instance', context, + instance, params=dict(network_id=network_id)) @wrap_check_policy def remove_fixed_ip(self, context, instance, address): """Remove fixed_ip from specified network to given instance.""" - _cast_compute_message(self.db, 'remove_fixed_ip_from_instance', - context, instance, params=dict(address=address)) + self._cast_compute_message('remove_fixed_ip_from_instance', + context, instance, params=dict(address=address)) @wrap_check_policy @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF, @@ -1378,7 +1419,7 @@ class API(base.Base): instance, vm_state=vm_states.ACTIVE, task_state=task_states.PAUSING) - _cast_compute_message(self.db, 'pause_instance', context, instance) + self._cast_compute_message('pause_instance', context, instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.PAUSED]) @@ -1388,13 +1429,13 @@ class API(base.Base): instance, vm_state=vm_states.PAUSED, task_state=task_states.UNPAUSING) - _cast_compute_message(self.db, 'unpause_instance', context, instance) + self._cast_compute_message('unpause_instance', context, instance) @wrap_check_policy def get_diagnostics(self, context, instance): """Retrieve diagnostics for the given instance.""" - return _call_compute_message(self.db, "get_diagnostics", context, - instance) + return self._call_compute_message("get_diagnostics", context, + instance) @wrap_check_policy def get_actions(self, context, instance): @@ -1411,7 +1452,7 @@ class API(base.Base): instance, vm_state=vm_states.ACTIVE, task_state=task_states.SUSPENDING) - _cast_compute_message(self.db, 'suspend_instance', context, instance) + self._cast_compute_message('suspend_instance', context, instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.SUSPENDED]) @@ -1421,7 +1462,7 @@ class API(base.Base): instance, vm_state=vm_states.SUSPENDED, task_state=task_states.RESUMING) - _cast_compute_message(self.db, 'resume_instance', context, instance) + self._cast_compute_message('resume_instance', context, instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF, @@ -1437,8 +1478,8 @@ class API(base.Base): rescue_params = { "rescue_password": rescue_password } - _cast_compute_message(self.db, 'rescue_instance', context, - instance, params=rescue_params) + self._cast_compute_message('rescue_instance', context, instance, + params=rescue_params) @wrap_check_policy @check_instance_state(vm_state=[vm_states.RESCUED]) @@ -1448,7 +1489,7 @@ class API(base.Base): instance, vm_state=vm_states.RESCUED, task_state=task_states.UNRESCUING) - _cast_compute_message(self.db, 'unrescue_instance', context, instance) + self._cast_compute_message('unrescue_instance', context, instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.ACTIVE]) @@ -1459,22 +1500,21 @@ class API(base.Base): task_state=task_states.UPDATING_PASSWORD) params = {"new_pass": password} - _cast_compute_message(self.db, 'set_admin_password', context, - instance, params=params) + self._cast_compute_message('set_admin_password', context, instance, + params=params) @wrap_check_policy def inject_file(self, context, instance, path, file_contents): """Write a file to the given instance.""" params = {'path': path, 'file_contents': file_contents} - _cast_compute_message(self.db, 'inject_file', context, - instance, params=params) + self._cast_compute_message('inject_file', context, instance, + params=params) @wrap_check_policy def get_vnc_console(self, context, instance, console_type): """Get a url to an instance Console.""" - connect_info = _call_compute_message(self.db, 'get_vnc_console', - context, instance, - params={"console_type": console_type}) + connect_info = self._call_compute_message('get_vnc_console', + context, instance, params={"console_type": console_type}) rpc.call(context, '%s' % FLAGS.consoleauth_topic, {'method': 'authorize_console', @@ -1491,18 +1531,18 @@ class API(base.Base): def get_console_output(self, context, instance, tail_length=None): """Get console output for an an instance.""" params = {'tail_length': tail_length} - return _call_compute_message(self.db, 'get_console_output', context, - instance, params=params) + return self._call_compute_message('get_console_output', context, + instance, params=params) @wrap_check_policy def lock(self, context, instance): """Lock the given instance.""" - _cast_compute_message(self.db, 'lock_instance', context, instance) + self._cast_compute_message('lock_instance', context, instance) @wrap_check_policy def unlock(self, context, instance): """Unlock the given instance.""" - _cast_compute_message(self.db, 'unlock_instance', context, instance) + self._cast_compute_message('unlock_instance', context, instance) @wrap_check_policy def get_lock(self, context, instance): @@ -1512,13 +1552,12 @@ class API(base.Base): @wrap_check_policy def reset_network(self, context, instance): """Reset networking on the instance.""" - _cast_compute_message(self.db, 'reset_network', context, instance) + self._cast_compute_message('reset_network', context, instance) @wrap_check_policy def inject_network_info(self, context, instance): """Inject network info for the instance.""" - _cast_compute_message(self.db, 'inject_network_info', - context, instance) + self._cast_compute_message('inject_network_info', context, instance) @wrap_check_policy def attach_volume(self, context, instance, volume_id, device): @@ -1530,8 +1569,8 @@ class API(base.Base): self.volume_api.reserve_volume(context, volume) params = {"volume_id": volume_id, "mountpoint": device} - _cast_compute_message(self.db, 'attach_volume', context, instance, - params=params) + self._cast_compute_message('attach_volume', context, instance, + params=params) # FIXME(comstud): I wonder if API should pull in the instance from # the volume ID via volume API and pass it and the volume object here @@ -1547,8 +1586,8 @@ class API(base.Base): self.volume_api.check_detach(context, volume) params = {'volume_id': volume_id} - _cast_compute_message(self.db, 'detach_volume', context, instance, - params=params) + self._cast_compute_message('detach_volume', context, instance, + params=params) return instance @wrap_check_policy @@ -1640,22 +1679,21 @@ class HostAPI(base.Base): """Sets the specified host's ability to accept new instances.""" # NOTE(comstud): No instance_uuid argument to this compute manager # call - return _call_compute_message(self.db, "set_host_enabled", context, - host=host, params={"enabled": enabled}) + return self._call_compute_message("set_host_enabled", context, + host=host, params={"enabled": enabled}) def host_power_action(self, context, host, action): """Reboots, shuts down or powers up the host.""" # NOTE(comstud): No instance_uuid argument to this compute manager # call - return _call_compute_message(self.db, "host_power_action", context, - host=host, params={"action": action}) + return self._call_compute_message("host_power_action", context, + host=host, params={"action": action}) def set_host_maintenance(self, context, host, mode): """Start/Stop host maintenance window. On start, it triggers guest VMs evacuation.""" - return _call_compute_message(self.db, "host_maintenance_mode", context, - host=host, params={"host": host, - "mode": mode}) + return self._call_compute_message("host_maintenance_mode", context, + host=host, params={"host": host, "mode": mode}) class AggregateAPI(base.Base): @@ -1791,50 +1829,3 @@ class AggregateAPI(base.Base): result["metadata"] = metadata result["hosts"] = hosts return result - - -def _cast_or_call_compute_message(db, rpc_method, compute_method, - context, instance=None, host=None, params=None): - """Generic handler for RPC casts and calls to compute. - - :param rpc_method: RPC method to use (rpc.call or rpc.cast) - :param compute_method: Compute manager method to call - :param context: RequestContext of caller - :param instance: The instance object to use to find host to send to - Can be None to not include instance_uuid in args - :param host: Optional host to send to instead of instance['host'] - Must be specified if 'instance' is None - :param params: Optional dictionary of arguments to be passed to the - compute worker - - :returns: None - """ - if not params: - params = {} - if not host: - if not instance: - raise exception.Error(_("No compute host specified")) - host = instance['host'] - if not host: - raise exception.Error(_("Unable to find host for " - "Instance %s") % instance['uuid']) - queue = db.queue_get_for(context, FLAGS.compute_topic, host) - if instance: - params['instance_uuid'] = instance['uuid'] - kwargs = {'method': compute_method, 'args': params} - return rpc_method(context, queue, kwargs) - - -def _cast_compute_message(db, *args, **kwargs): - """Generic handler for RPC casts to compute.""" - _cast_or_call_compute_message(db, rpc.cast, *args, **kwargs) - - -def _call_compute_message(db, *args, **kwargs): - """Generic handler for RPC calls to compute.""" - return _cast_or_call_compute_message(db, rpc.call, *args, **kwargs) - - -def _cast_scheduler_message(context, args): - """Generic handler for RPC calls to the scheduler.""" - rpc.cast(context, FLAGS.scheduler_topic, args) diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py index f30fefeba..a291b2c80 100644 --- a/nova/tests/test_compute.py +++ b/nova/tests/test_compute.py @@ -2434,7 +2434,8 @@ class ComputeAPITestCase(BaseTestCase): self.assertEqual(instance_properties['host'], 'host2') self.assertIn('host2', filter_properties['ignore_hosts']) - self.stubs.Set(compute.api, '_cast_scheduler_message', _fake_cast) + self.stubs.Set(self.compute_api, '_cast_scheduler_message', + _fake_cast) context = self.context.elevated() instance = self._create_fake_instance(dict(host='host2')) @@ -2453,7 +2454,8 @@ class ComputeAPITestCase(BaseTestCase): self.assertEqual(instance_properties['host'], 'host2') self.assertNotIn('host2', filter_properties['ignore_hosts']) - self.stubs.Set(compute.api, '_cast_scheduler_message', _fake_cast) + self.stubs.Set(self.compute_api, '_cast_scheduler_message', + _fake_cast) self.flags(allow_resize_to_same_host=True) context = self.context.elevated() |
