summaryrefslogtreecommitdiffstats
path: root/nova/compute
diff options
context:
space:
mode:
authorRussell Bryant <rbryant@redhat.com>2012-05-10 19:28:04 -0400
committerRussell Bryant <rbryant@redhat.com>2012-05-23 07:59:36 -0400
commit7e15d4e28f98e13f0ea7399787c50839139d8492 (patch)
tree174c2d372ec464565c82babc9abe672f667f08c5 /nova/compute
parent21e9d2e2b793241d8264833cfd9a9c94d023df8e (diff)
Add version to compute rpc API.
Part of blueprint versioned-rpc-apis. Change-Id: I5943d1fae2c96cfe519817b59098402481a1026b
Diffstat (limited to 'nova/compute')
-rw-r--r--nova/compute/api.py240
-rw-r--r--nova/compute/manager.py2
-rw-r--r--nova/compute/rpcapi.py276
3 files changed, 361 insertions, 157 deletions
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 04a752ed0..f1410df29 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -30,6 +30,7 @@ from nova import block_device
from nova.compute import aggregate_states
from nova.compute import instance_types
from nova.compute import power_state
+from nova.compute import rpcapi as compute_rpcapi
from nova.compute import task_states
from nova.compute import vm_states
from nova.consoleauth import rpcapi as consoleauth_rpcapi
@@ -103,52 +104,7 @@ def check_policy(context, action, target):
nova.policy.enforce(context, _action, target)
-class BaseAPI(base.Base):
- """Base API class."""
- def __init__(self, **kwargs):
- super(BaseAPI, self).__init__(**kwargs)
-
- def _cast_or_call_compute_message(self, rpc_method, compute_method,
- context, instance=None, host=None, params=None):
- """Generic handler for RPC casts and calls to compute.
-
- :param rpc_method: RPC method to use (rpc.call or rpc.cast)
- :param compute_method: Compute manager method to call
- :param context: RequestContext of caller
- :param instance: The instance object to use to find host to send to
- Can be None to not include instance_uuid in args
- :param host: Optional host to send to instead of instance['host']
- Must be specified if 'instance' is None
- :param params: Optional dictionary of arguments to be passed to the
- compute worker
-
- :returns: None
- """
- if not params:
- params = {}
- if not host:
- if not instance:
- raise exception.NovaException(_("No compute host specified"))
- host = instance['host']
- if not host:
- raise exception.NovaException(_("Unable to find host for "
- "Instance %s") % instance['uuid'])
- queue = self.db.queue_get_for(context, FLAGS.compute_topic, host)
- if instance:
- params['instance_uuid'] = instance['uuid']
- kwargs = {'method': compute_method, 'args': params}
- return rpc_method(context, queue, kwargs)
-
- def _cast_compute_message(self, *args, **kwargs):
- """Generic handler for RPC casts to compute."""
- self._cast_or_call_compute_message(rpc.cast, *args, **kwargs)
-
- def _call_compute_message(self, *args, **kwargs):
- """Generic handler for RPC calls to compute."""
- return self._cast_or_call_compute_message(rpc.call, *args, **kwargs)
-
-
-class API(BaseAPI):
+class API(base.Base):
"""API for interacting with the compute manager."""
def __init__(self, image_service=None, network_api=None, volume_api=None,
@@ -160,6 +116,7 @@ class API(BaseAPI):
self.volume_api = volume_api or volume.API()
self.consoleauth_rpcapi = consoleauth_rpcapi.ConsoleAuthAPI()
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
+ self.compute_rpcapi = compute_rpcapi.ComputeAPI()
super(API, self).__init__(**kwargs)
def _check_injected_file_quota(self, context, injected_files):
@@ -762,10 +719,8 @@ class API(BaseAPI):
hosts.add(instance['host'])
for host in hosts:
- rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "refresh_security_group_rules",
- "args": {"security_group_id": security_group.id}})
+ self.compute_rpcapi.refresh_security_group_rules(context,
+ security_group.id, host=host)
def trigger_security_group_members_refresh(self, context, group_ids):
"""Called when a security group gains a new or loses a member.
@@ -805,10 +760,8 @@ class API(BaseAPI):
# ...and finally we tell these nodes to refresh their view of this
# particular security group.
for host in hosts:
- rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "refresh_security_group_members",
- "args": {"security_group_id": group_id}})
+ self.compute_rpcapi.refresh_security_group_members(context,
+ group_id, host=host)
def trigger_provider_fw_rules_refresh(self, context):
"""Called when a rule is added/removed from a provider firewall"""
@@ -862,11 +815,10 @@ class API(BaseAPI):
self.db.instance_add_security_group(context.elevated(),
instance_uuid,
security_group['id'])
- params = {"security_group_id": security_group['id']}
# NOTE(comstud): No instance_uuid argument to this compute manager
# call
- self._cast_compute_message('refresh_security_group_rules',
- context, host=instance['host'], params=params)
+ self.compute_rpcapi.refresh_security_group_rules(context,
+ security_group['id'], host=instance['host'])
@wrap_check_policy
def remove_security_group(self, context, instance, security_group_name):
@@ -891,11 +843,10 @@ class API(BaseAPI):
self.db.instance_remove_security_group(context.elevated(),
instance_uuid,
security_group['id'])
- params = {"security_group_id": security_group['id']}
# NOTE(comstud): No instance_uuid argument to this compute manager
# call
- self._cast_compute_message('refresh_security_group_rules',
- context, host=instance['host'], params=params)
+ self.compute_rpcapi.refresh_security_group_rules(context,
+ security_group['id'], host=instance['host'])
@wrap_check_policy
def update(self, context, instance, **kwargs):
@@ -926,16 +877,14 @@ class API(BaseAPI):
# NOTE(jerdfelt): The compute daemon handles reclaiming instances
# that are in soft delete. If there is no host assigned, there is
# no daemon to reclaim, so delete it immediately.
- host = instance['host']
- if host:
+ if instance['host']:
self.update(context,
instance,
vm_state=vm_states.SOFT_DELETE,
task_state=task_states.POWERING_OFF,
deleted_at=utils.utcnow())
- self._cast_compute_message('power_off_instance',
- context, instance)
+ self.compute_rpcapi.power_off_instance(context, instance)
else:
LOG.warning(_('No host for instance, deleting immediately'),
instance=instance)
@@ -946,9 +895,8 @@ class API(BaseAPI):
pass
def _delete(self, context, instance):
- host = instance['host']
try:
- if not host:
+ if not instance['host']:
# Just update database, nothing else we can do
return self.db.instance_destroy(context, instance['id'])
@@ -964,14 +912,13 @@ class API(BaseAPI):
context, instance['uuid'], 'finished')
if migration_ref:
src_host = migration_ref['source_compute']
- params = {'migration_id': migration_ref['id']}
# Call since this can race with the terminate_instance
- self._call_compute_message('confirm_resize', context,
- instance, host=src_host,
- params=params)
+ self.compute_rpcapi.confirm_resize(context,
+ instance, migration_ref['id'],
+ host=src_host, cast=False)
+
+ self.compute_rpcapi.terminate_instance(context, instance)
- self._cast_compute_message('terminate_instance',
- context, instance)
except exception.InstanceNotFound:
# NOTE(comstud): Race condition. Instance already gone.
pass
@@ -1002,13 +949,9 @@ class API(BaseAPI):
task_state=None,
deleted_at=None)
- host = instance['host']
- if host:
- self.update(context,
- instance,
- task_state=task_states.POWERING_ON)
- self._cast_compute_message('power_on_instance',
- context, instance)
+ if instance['host']:
+ self.update(context, instance, task_state=task_states.POWERING_ON)
+ self.compute_rpcapi.power_on_instance(context, instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.SOFT_DELETE])
@@ -1032,9 +975,7 @@ class API(BaseAPI):
terminated_at=utils.utcnow(),
progress=0)
- rpc_method = rpc.cast if do_cast else rpc.call
- self._cast_or_call_compute_message(rpc_method, 'stop_instance',
- context, instance)
+ self.compute_rpcapi.stop_instance(context, instance, cast=do_cast)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.STOPPED, vm_states.SHUTOFF])
@@ -1062,7 +1003,7 @@ class API(BaseAPI):
# TODO(yamahata): injected_files isn't supported right now.
# It is used only for osapi. not for ec2 api.
# availability_zone isn't used by run_instance.
- self._cast_compute_message('start_instance', context, instance)
+ self.compute_rpcapi.start_instance(context, instance)
#NOTE(bcwaldon): no policy check here since it should be rolled in to
# search_opts in get_all
@@ -1275,10 +1216,9 @@ class API(BaseAPI):
sent_meta['properties'] = properties
recv_meta = self.image_service.create(context, sent_meta)
- params = {'image_id': recv_meta['id'], 'image_type': image_type,
- 'backup_type': backup_type, 'rotation': rotation}
- self._cast_compute_message('snapshot_instance', context, instance,
- params=params)
+ self.compute_rpcapi.snapshot_instance(context, instance=instance,
+ image_id=recv_meta['id'], image_type=image_type,
+ backup_type=backup_type, rotation=rotation)
return recv_meta
def _get_minram_mindisk_params(self, context, instance):
@@ -1312,8 +1252,8 @@ class API(BaseAPI):
instance,
vm_state=vm_states.ACTIVE,
task_state=state)
- self._cast_compute_message('reboot_instance', context, instance,
- params={'reboot_type': reboot_type})
+ self.compute_rpcapi.reboot_instance(context, instance=instance,
+ reboot_type=reboot_type)
def _get_image(self, context, image_href):
"""Throws an ImageNotFound exception if image_href does not exist."""
@@ -1383,15 +1323,9 @@ class API(BaseAPI):
# system metadata... and copy in the properties for the new image.
_reset_image_metadata()
- rebuild_params = {
- "new_pass": admin_password,
- "injected_files": files_to_inject,
- "image_ref": image_href,
- "orig_image_ref": orig_image_ref,
- }
-
- self._cast_compute_message('rebuild_instance', context, instance,
- params=rebuild_params)
+ self.compute_rpcapi.rebuild_instance(context, instance=instance,
+ new_pass=admin_password, injected_files=files_to_inject,
+ image_ref=image_href, orig_image_ref=orig_image_ref)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF],
@@ -1410,9 +1344,9 @@ class API(BaseAPI):
vm_state=vm_states.RESIZING,
task_state=task_states.RESIZE_REVERTING)
- params = {'migration_id': migration_ref['id']}
- self._cast_compute_message('revert_resize', context, instance,
- host=migration_ref['dest_compute'], params=params)
+ self.compute_rpcapi.revert_resize(context,
+ instance=instance, migration_id=migration_ref['id'],
+ host=migration_ref['dest_compute'])
self.db.migration_update(context, migration_ref['id'],
{'status': 'reverted'})
@@ -1434,9 +1368,9 @@ class API(BaseAPI):
vm_state=vm_states.ACTIVE,
task_state=None)
- params = {'migration_id': migration_ref['id']}
- self._cast_compute_message('confirm_resize', context, instance,
- host=migration_ref['source_compute'], params=params)
+ self.compute_rpcapi.confirm_resize(context,
+ instance=instance, migration_id=migration_ref['id'],
+ host=migration_ref['source_compute'])
self.db.migration_update(context, migration_ref['id'],
{'status': 'confirmed'})
@@ -1510,14 +1444,14 @@ class API(BaseAPI):
@wrap_check_policy
def add_fixed_ip(self, context, instance, network_id):
"""Add fixed_ip from specified network to given instance."""
- self._cast_compute_message('add_fixed_ip_to_instance', context,
- instance, params=dict(network_id=network_id))
+ self.compute_rpcapi.add_fixed_ip_to_instance(context,
+ instance=instance, network_id=network_id)
@wrap_check_policy
def remove_fixed_ip(self, context, instance, address):
"""Remove fixed_ip from specified network to given instance."""
- self._cast_compute_message('remove_fixed_ip_from_instance',
- context, instance, params=dict(address=address))
+ self.compute_rpcapi.remove_fixed_ip_from_instance(context,
+ instance=instance, address=address)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF,
@@ -1529,7 +1463,7 @@ class API(BaseAPI):
instance,
vm_state=vm_states.ACTIVE,
task_state=task_states.PAUSING)
- self._cast_compute_message('pause_instance', context, instance)
+ self.compute_rpcapi.pause_instance(context, instance=instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.PAUSED])
@@ -1539,13 +1473,12 @@ class API(BaseAPI):
instance,
vm_state=vm_states.PAUSED,
task_state=task_states.UNPAUSING)
- self._cast_compute_message('unpause_instance', context, instance)
+ self.compute_rpcapi.unpause_instance(context, instance=instance)
@wrap_check_policy
def get_diagnostics(self, context, instance):
"""Retrieve diagnostics for the given instance."""
- return self._call_compute_message("get_diagnostics", context,
- instance)
+ return self.compute_rpcapi.get_diagnostics(context, instance=instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF,
@@ -1557,7 +1490,7 @@ class API(BaseAPI):
instance,
vm_state=vm_states.ACTIVE,
task_state=task_states.SUSPENDING)
- self._cast_compute_message('suspend_instance', context, instance)
+ self.compute_rpcapi.suspend_instance(context, instance=instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.SUSPENDED])
@@ -1567,7 +1500,7 @@ class API(BaseAPI):
instance,
vm_state=vm_states.SUSPENDED,
task_state=task_states.RESUMING)
- self._cast_compute_message('resume_instance', context, instance)
+ self.compute_rpcapi.resume_instance(context, instance=instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF,
@@ -1580,11 +1513,8 @@ class API(BaseAPI):
vm_state=vm_states.ACTIVE,
task_state=task_states.RESCUING)
- rescue_params = {
- "rescue_password": rescue_password
- }
- self._cast_compute_message('rescue_instance', context, instance,
- params=rescue_params)
+ self.compute_rpcapi.rescue_instance(context, instance=instance,
+ rescue_password=rescue_password)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.RESCUED])
@@ -1594,7 +1524,7 @@ class API(BaseAPI):
instance,
vm_state=vm_states.RESCUED,
task_state=task_states.UNRESCUING)
- self._cast_compute_message('unrescue_instance', context, instance)
+ self.compute_rpcapi.unrescue_instance(context, instance=instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.ACTIVE])
@@ -1604,22 +1534,20 @@ class API(BaseAPI):
instance,
task_state=task_states.UPDATING_PASSWORD)
- params = {"new_pass": password}
- self._cast_compute_message('set_admin_password', context, instance,
- params=params)
+ self.compute_rpcapi.set_admin_password(context, instance=instance,
+ new_pass=password)
@wrap_check_policy
def inject_file(self, context, instance, path, file_contents):
"""Write a file to the given instance."""
- params = {'path': path, 'file_contents': file_contents}
- self._cast_compute_message('inject_file', context, instance,
- params=params)
+ self.compute_rpcapi.inject_file(context, instance=instance, path=path,
+ file_contents=file_contents)
@wrap_check_policy
def get_vnc_console(self, context, instance, console_type):
"""Get a url to an instance Console."""
- connect_info = self._call_compute_message('get_vnc_console',
- context, instance, params={"console_type": console_type})
+ connect_info = self.compute_rpcapi.get_vnc_console(context,
+ instance=instance, console_type=console_type)
self.consoleauth_rpcapi.authorize_console(context,
connect_info['token'], console_type, connect_info['host'],
@@ -1630,19 +1558,18 @@ class API(BaseAPI):
@wrap_check_policy
def get_console_output(self, context, instance, tail_length=None):
"""Get console output for an an instance."""
- params = {'tail_length': tail_length}
- return self._call_compute_message('get_console_output', context,
- instance, params=params)
+ return self.compute_rpcapi.get_console_output(context,
+ instance=instance, tail_length=tail_length)
@wrap_check_policy
def lock(self, context, instance):
"""Lock the given instance."""
- self._cast_compute_message('lock_instance', context, instance)
+ self.compute_rpcapi.lock_instance(context, instance=instance)
@wrap_check_policy
def unlock(self, context, instance):
"""Unlock the given instance."""
- self._cast_compute_message('unlock_instance', context, instance)
+ self.compute_rpcapi.unlock_instance(context, instance=instance)
@wrap_check_policy
def get_lock(self, context, instance):
@@ -1652,12 +1579,12 @@ class API(BaseAPI):
@wrap_check_policy
def reset_network(self, context, instance):
"""Reset networking on the instance."""
- self._cast_compute_message('reset_network', context, instance)
+ self.compute_rpcapi.reset_network(context, instance=instance)
@wrap_check_policy
def inject_network_info(self, context, instance):
"""Inject network info for the instance."""
- self._cast_compute_message('inject_network_info', context, instance)
+ self.compute_rpcapi.inject_network_info(context, instance=instance)
@wrap_check_policy
def attach_volume(self, context, instance, volume_id, device):
@@ -1667,10 +1594,8 @@ class API(BaseAPI):
volume = self.volume_api.get(context, volume_id)
self.volume_api.check_attach(context, volume)
self.volume_api.reserve_volume(context, volume)
- params = {"volume_id": volume_id,
- "mountpoint": device}
- self._cast_compute_message('attach_volume', context, instance,
- params=params)
+ self.compute_rpcapi.attach_volume(context, instance=instance,
+ volume_id=volume_id, mountpoint=device)
# FIXME(comstud): I wonder if API should pull in the instance from
# the volume ID via volume API and pass it and the volume object here
@@ -1688,9 +1613,8 @@ class API(BaseAPI):
volume = self.volume_api.get(context, volume_id)
self.volume_api.check_detach(context, volume)
- params = {'volume_id': volume_id}
- self._cast_compute_message('detach_volume', context, instance,
- params=params)
+ self.compute_rpcapi.detach_volume(context, instance=instance,
+ volume_id=volume_id)
return instance
@wrap_check_policy
@@ -1778,32 +1702,38 @@ class API(BaseAPI):
instance['uuid'])
-class HostAPI(BaseAPI):
+class HostAPI(base.Base):
+ def __init__(self):
+ self.compute_rpcapi = compute_rpcapi.ComputeAPI()
+ super(HostAPI, self).__init__()
+
"""Sub-set of the Compute Manager API for managing host operations."""
def set_host_enabled(self, context, host, enabled):
"""Sets the specified host's ability to accept new instances."""
# NOTE(comstud): No instance_uuid argument to this compute manager
# call
- return self._call_compute_message("set_host_enabled", context,
- host=host, params={"enabled": enabled})
+ return self.compute_rpcapi.set_host_enabled(context, enabled=enabled,
+ host=host)
def host_power_action(self, context, host, action):
"""Reboots, shuts down or powers up the host."""
# NOTE(comstud): No instance_uuid argument to this compute manager
# call
- return self._call_compute_message("host_power_action", context,
- host=host, params={"action": action})
+ topic = self.db.queue_get_for(context, FLAGS.compute_topic, host)
+ return self.compute_rpcapi.host_power_action(context, action=action,
+ host=host)
def set_host_maintenance(self, context, host, mode):
"""Start/Stop host maintenance window. On start, it triggers
guest VMs evacuation."""
- return self._call_compute_message("host_maintenance_mode", context,
- host=host, params={"host": host, "mode": mode})
+ return self.compute_rpcapi.host_maintenance_mode(context,
+ host_param=host, mode=mode, host=host)
class AggregateAPI(base.Base):
"""Sub-set of the Compute Manager API for managing host aggregates."""
def __init__(self, **kwargs):
+ self.compute_rpcapi = compute_rpcapi.ComputeAPI()
super(AggregateAPI, self).__init__(**kwargs)
def create_aggregate(self, context, aggregate_name, availability_zone):
@@ -1883,10 +1813,8 @@ class AggregateAPI(base.Base):
if aggregate.operational_state == aggregate_states.CREATED:
values = {'operational_state': aggregate_states.CHANGING}
self.db.aggregate_update(context, aggregate_id, values)
- queue = self.db.queue_get_for(context, service.topic, host)
- rpc.cast(context, queue, {"method": "add_aggregate_host",
- "args": {"aggregate_id": aggregate_id,
- "host": host}, })
+ self.compute_rpcapi.add_aggregate_host(context,
+ aggregate_id=aggregate_id, host_param=host, host=host)
return self.get_aggregate(context, aggregate_id)
else:
invalid = {aggregate_states.CHANGING: 'setup in progress',
@@ -1906,10 +1834,8 @@ class AggregateAPI(base.Base):
if aggregate.operational_state in [aggregate_states.ACTIVE,
aggregate_states.ERROR]:
self.db.aggregate_host_delete(context, aggregate_id, host)
- queue = self.db.queue_get_for(context, service.topic, host)
- rpc.cast(context, queue, {"method": "remove_aggregate_host",
- "args": {"aggregate_id": aggregate_id,
- "host": host}, })
+ self.compute_rpcapi.remove_aggregate_host(context,
+ aggregate_id=aggregate_id, host_param=host, host=host)
return self.get_aggregate(context, aggregate_id)
else:
invalid = {aggregate_states.CREATED: 'no hosts to remove',
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 8025a46a5..bd3cd02cc 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -224,6 +224,8 @@ def _get_additional_capabilities():
class ComputeManager(manager.SchedulerDependentManager):
"""Manages the running instances from creation to destruction."""
+ RPC_API_VERSION = '1.0'
+
def __init__(self, compute_driver=None, *args, **kwargs):
"""Load configuration options and connect to the hypervisor."""
# TODO(vish): sync driver creation logic with the rest of the system
diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py
new file mode 100644
index 000000000..a2a3b281c
--- /dev/null
+++ b/nova/compute/rpcapi.py
@@ -0,0 +1,276 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Client side of the compute RPC API.
+"""
+
+from nova.db import base
+from nova import exception
+from nova import flags
+import nova.rpc.proxy
+
+
+FLAGS = flags.FLAGS
+
+
+class ComputeAPI(nova.rpc.proxy.RpcProxy, base.Base):
+ '''Client side of the compute rpc API.
+
+ API version history:
+
+ 1.0 - Initial version.
+ '''
+
+ RPC_API_VERSION = '1.0'
+
+ def __init__(self):
+ super(ComputeAPI, self).__init__(topic=FLAGS.compute_topic,
+ default_version=self.RPC_API_VERSION)
+
+ def _compute_topic(self, ctxt, host, instance):
+ '''Get the topic to use for a message.
+
+ :param ctxt: request context
+ :param host: explicit host to send the message to.
+ :param instance: If an explicit host was not specified, use
+ instance['host']
+
+ :returns: A topic string
+ '''
+ if not host:
+ if not instance:
+ raise exception.NovaException(_('No compute host specified'))
+ host = instance['host']
+ if not host:
+ raise exception.NovaException(_('Unable to find host for '
+ 'Instance %s') % instance['uuid'])
+ return self.db.queue_get_for(ctxt, self.topic, host)
+
+ def add_aggregate_host(self, ctxt, aggregate_id, host_param, host):
+ '''Add aggregate host.
+
+ :param ctxt: request context
+ :param aggregate_id:
+ :param host_param: This value is placed in the message to be the 'host'
+ parameter for the remote method.
+ :param host: This is the host to send the message to.
+ '''
+ self.cast(ctxt, self.make_msg('add_aggregate_host',
+ aggregate_id=aggregate_id, host=host_param),
+ topic=self._compute_topic(ctxt, host, None))
+
+ def add_fixed_ip_to_instance(self, ctxt, instance, network_id):
+ self.cast(ctxt, self.make_msg('add_fixed_ip_to_instance',
+ instance_uuid=instance['uuid'], network_id=network_id),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def attach_volume(self, ctxt, instance, volume_id, mountpoint):
+ self.cast(ctxt, self.make_msg('attach_volume',
+ instance_uuid=instance['uuid'], volume_id=volume_id,
+ mountpoint=mountpoint),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def confirm_resize(self, ctxt, instance, migration_id, host,
+ cast=True):
+ rpc_method = self.cast if cast else self.call
+ return rpc_method(ctxt, self.make_msg('confirm_resize',
+ instance_uuid=instance['uuid'], migration_id=migration_id),
+ topic=self._compute_topic(ctxt, host, instance))
+
+ def detach_volume(self, ctxt, instance, volume_id):
+ self.cast(ctxt, self.make_msg('detach_volume',
+ instance_uuid=instance['uuid'], volume_id=volume_id),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def get_console_output(self, ctxt, instance, tail_length):
+ return self.call(ctxt, self.make_msg('get_console_output',
+ instance_uuid=instance['uuid'], tail_length=tail_length),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def get_diagnostics(self, ctxt, instance):
+ return self.call(ctxt, self.make_msg('get_diagnostics',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def get_vnc_console(self, ctxt, instance, console_type):
+ return self.call(ctxt, self.make_msg('get_vnc_console',
+ instance_uuid=instance['uuid'], console_type=console_type),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def host_maintenance_mode(self, ctxt, host_param, mode, host):
+ '''Set host maintenance mode
+
+ :param ctxt: request context
+ :param host_param: This value is placed in the message to be the 'host'
+ parameter for the remote method.
+ :param mode:
+ :param host: This is the host to send the message to.
+ '''
+ return self.call(ctxt, self.make_msg('host_maintenance_mode',
+ host=host_param, mode=mode),
+ topic=self._compute_topic(ctxt, host, None))
+
+ def host_power_action(self, ctxt, action, host):
+ return self.call(ctxt, self.make_msg('host_power_action',
+ action=action), topic=self._compute_topic(ctxt, host, None))
+
+ def inject_file(self, ctxt, instance, path, file_contents):
+ self.cast(ctxt, self.make_msg('inject_file',
+ instance_uuid=instance['uuid'], path=path,
+ file_contents=file_contents),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def inject_network_info(self, ctxt, instance):
+ self.cast(ctxt, self.make_msg('inject_network_info',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def lock_instance(self, ctxt, instance):
+ self.cast(ctxt, self.make_msg('lock_instance',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def pause_instance(self, ctxt, instance):
+ self.cast(ctxt, self.make_msg('pause_instance',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def power_off_instance(self, ctxt, instance):
+ self.cast(ctxt, self.make_msg('power_off_instance',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def power_on_instance(self, ctxt, instance):
+ self.cast(ctxt, self.make_msg('power_on_instance',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def reboot_instance(self, ctxt, instance, reboot_type):
+ self.cast(ctxt, self.make_msg('reboot_instance',
+ instance_uuid=instance['uuid'], reboot_type=reboot_type),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def rebuild_instance(self, ctxt, instance, new_pass, injected_files,
+ image_ref, orig_image_ref):
+ self.cast(ctxt, self.make_msg('rebuild_instance',
+ instance_uuid=instance['uuid'], new_pass=new_pass,
+ injected_files=injected_files, image_ref=image_ref,
+ orig_image_ref=orig_image_ref),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def refresh_security_group_rules(self, ctxt, security_group_id, host):
+ self.cast(ctxt, self.make_msg('refresh_security_group_rules',
+ security_group_id=security_group_id),
+ topic=self._compute_topic(ctxt, host, None))
+
+ def refresh_security_group_members(self, ctxt, security_group_id,
+ host):
+ self.cast(ctxt, self.make_msg('refresh_security_group_members',
+ security_group_id=security_group_id),
+ topic=self._compute_topic(ctxt, host, None))
+
+ def remove_aggregate_host(self, ctxt, aggregate_id, host_param, host):
+ '''Remove aggregate host.
+
+ :param ctxt: request context
+ :param aggregate_id:
+ :param host_param: This value is placed in the message to be the 'host'
+ parameter for the remote method.
+ :param host: This is the host to send the message to.
+ '''
+ self.cast(ctxt, self.make_msg('remove_aggregate_host',
+ aggregate_id=aggregate_id, host=host_param),
+ topic=self._compute_topic(ctxt, host, None))
+
+ def remove_fixed_ip_from_instance(self, ctxt, instance, address):
+ self.cast(ctxt, self.make_msg('remove_fixed_ip_from_instance',
+ instance_uuid=instance['uuid'], address=address),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def rescue_instance(self, ctxt, instance, rescue_password):
+ self.cast(ctxt, self.make_msg('rescue_instance',
+ instance_uuid=instance['uuid'],
+ rescue_password=rescue_password),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def reset_network(self, ctxt, instance):
+ self.cast(ctxt, self.make_msg('reset_network',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def resume_instance(self, ctxt, instance):
+ self.cast(ctxt, self.make_msg('resume_instance',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def revert_resize(self, ctxt, instance, migration_id, host):
+ self.cast(ctxt, self.make_msg('revert_resize',
+ instance_uuid=instance['uuid'], migration_id=migration_id),
+ topic=self._compute_topic(ctxt, host, instance))
+
+ def set_admin_password(self, ctxt, instance, new_pass):
+ self.cast(ctxt, self.make_msg('set_admin_password',
+ instance_uuid=instance['uuid'], new_pass=new_pass),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def set_host_enabled(self, ctxt, enabled, host):
+ return self.call(ctxt, self.make_msg('set_host_enabled',
+ enabled=enabled), topic=self._compute_topic(ctxt, host, None))
+
+ def snapshot_instance(self, ctxt, instance, image_id, image_type,
+ backup_type, rotation):
+ self.cast(ctxt, self.make_msg('snapshot_instance',
+ instance_uuid=instance['uuid'], image_id=image_id,
+ image_type=image_type, backup_type=backup_type,
+ rotation=rotation),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def start_instance(self, ctxt, instance):
+ self.cast(ctxt, self.make_msg('start_instance',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def stop_instance(self, ctxt, instance, cast=True):
+ rpc_method = self.cast if cast else self.call
+ return rpc_method(ctxt, self.make_msg('stop_instance',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def suspend_instance(self, ctxt, instance):
+ self.cast(ctxt, self.make_msg('suspend_instance',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def terminate_instance(self, ctxt, instance):
+ self.cast(ctxt, self.make_msg('terminate_instance',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def unlock_instance(self, ctxt, instance):
+ self.cast(ctxt, self.make_msg('unlock_instance',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def unpause_instance(self, ctxt, instance):
+ self.cast(ctxt, self.make_msg('unpause_instance',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))
+
+ def unrescue_instance(self, ctxt, instance):
+ self.cast(ctxt, self.make_msg('unrescue_instance',
+ instance_uuid=instance['uuid']),
+ topic=self._compute_topic(ctxt, None, instance))