summaryrefslogtreecommitdiffstats
path: root/nova/compute
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-06-14 15:43:34 +0000
committerGerrit Code Review <review@openstack.org>2012-06-14 15:43:34 +0000
commit2adeb5a76d8376d3506f4c63ec73211bfa1e5cc0 (patch)
tree83264d1db655bfa3cd91f4f9f146a5d549e0523b /nova/compute
parent81fef25e96b20f69f58044fa341b108edea67d93 (diff)
parent123b28cd1a4ffa1e972e29963cb0e6be46b0d7c2 (diff)
Merge "Dedupe native and EC2 security group APIs."
Diffstat (limited to 'nova/compute')
-rw-r--r--nova/compute/api.py648
-rw-r--r--nova/compute/rpcapi.py171
2 files changed, 590 insertions, 229 deletions
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 10bcfe457..922a8bace 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -25,6 +25,7 @@ import functools
import re
import string
import time
+import urllib
from nova import block_device
from nova.compute import aggregate_states
@@ -43,6 +44,7 @@ from nova import log as logging
from nova import network
from nova import notifications
from nova.openstack.common import excutils
+from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
import nova.policy
from nova import quota
@@ -92,17 +94,23 @@ def check_instance_state(vm_state=None, task_state=None):
return outer
-def wrap_check_policy(func):
+def policy_decorator(scope):
"""Check corresponding policy prior of wrapped method to execution"""
- @functools.wraps(func)
- def wrapped(self, context, target, *args, **kwargs):
- check_policy(context, func.__name__, target)
- return func(self, context, target, *args, **kwargs)
- return wrapped
+ def outer(func):
+ @functools.wraps(func)
+ def wrapped(self, context, target, *args, **kwargs):
+ check_policy(context, func.__name__, target, scope)
+ return func(self, context, target, *args, **kwargs)
+ return wrapped
+ return outer
+
+wrap_check_policy = policy_decorator(scope='compute')
+wrap_check_security_groups_policy = policy_decorator(
+ scope='compute:security_groups')
-def check_policy(context, action, target):
- _action = 'compute:%s' % action
+def check_policy(context, action, target, scope='compute'):
+ _action = '%s:%s' % (scope, action)
nova.policy.enforce(context, _action, target)
@@ -110,12 +118,13 @@ class API(base.Base):
"""API for interacting with the compute manager."""
def __init__(self, image_service=None, network_api=None, volume_api=None,
- **kwargs):
+ security_group_api=None, **kwargs):
self.image_service = (image_service or
nova.image.get_default_image_service())
self.network_api = network_api or network.API()
self.volume_api = volume_api or volume.API()
+ self.security_group_api = security_group_api or SecurityGroupAPI()
self.consoleauth_rpcapi = consoleauth_rpcapi.ConsoleAuthAPI()
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
self.compute_rpcapi = compute_rpcapi.ComputeAPI()
@@ -389,7 +398,7 @@ class API(base.Base):
kernel_id, ramdisk_id = self._handle_kernel_and_ramdisk(
context, kernel_id, ramdisk_id, image, image_service)
- self.ensure_default_security_group(context)
+ self.security_group_api.ensure_default(context)
if key_data is None and key_name:
key_pair = self.db.key_pair_get(context, context.user_id, key_name)
@@ -771,80 +780,6 @@ class API(base.Base):
return (inst_ret_list, reservation_id)
- def ensure_default_security_group(self, context):
- """Ensure that a context has a security group.
-
- Creates a security group for the security context if it does not
- already exist.
-
- :param context: the security context
- """
- try:
- self.db.security_group_get_by_name(context,
- context.project_id,
- 'default')
- except exception.NotFound:
- values = {'name': 'default',
- 'description': 'default',
- 'user_id': context.user_id,
- 'project_id': context.project_id}
- self.db.security_group_create(context, values)
-
- def trigger_security_group_rules_refresh(self, context, security_group_id):
- """Called when a rule is added to or removed from a security_group."""
-
- security_group = self.db.security_group_get(context, security_group_id)
-
- hosts = set()
- for instance in security_group['instances']:
- if instance['host'] is not None:
- hosts.add(instance['host'])
-
- for host in hosts:
- self.compute_rpcapi.refresh_security_group_rules(context,
- security_group.id, host=host)
-
- def trigger_security_group_members_refresh(self, context, group_ids):
- """Called when a security group gains a new or loses a member.
-
- Sends an update request to each compute node for whom this is
- relevant.
- """
- # First, we get the security group rules that reference these groups as
- # the grantee..
- security_group_rules = set()
- for group_id in group_ids:
- security_group_rules.update(
- self.db.security_group_rule_get_by_security_group_grantee(
- context,
- group_id))
-
- # ..then we distill the security groups to which they belong..
- security_groups = set()
- for rule in security_group_rules:
- security_group = self.db.security_group_get(
- context,
- rule['parent_group_id'])
- security_groups.add(security_group)
-
- # ..then we find the instances that are members of these groups..
- instances = set()
- for security_group in security_groups:
- for instance in security_group['instances']:
- instances.add(instance)
-
- # ...then we find the hosts where they live...
- hosts = set()
- for instance in instances:
- if instance['host']:
- hosts.add(instance['host'])
-
- # ...and finally we tell these nodes to refresh their view of this
- # particular security group.
- for host in hosts:
- self.compute_rpcapi.refresh_security_group_members(context,
- group_id, host=host)
-
def trigger_provider_fw_rules_refresh(self, context):
"""Called when a rule is added/removed from a provider firewall"""
@@ -853,81 +788,6 @@ class API(base.Base):
for host in hosts:
self.compute_rpcapi.refresh_provider_fw_rules(context, host)
- def _is_security_group_associated_with_server(self, security_group,
- instance_uuid):
- """Check if the security group is already associated
- with the instance. If Yes, return True.
- """
-
- if not security_group:
- return False
-
- instances = security_group.get('instances')
- if not instances:
- return False
-
- for inst in instances:
- if (instance_uuid == inst['uuid']):
- return True
-
- return False
-
- @wrap_check_policy
- def add_security_group(self, context, instance, security_group_name):
- """Add security group to the instance"""
- security_group = self.db.security_group_get_by_name(context,
- context.project_id,
- security_group_name)
-
- instance_uuid = instance['uuid']
-
- #check if the security group is associated with the server
- if self._is_security_group_associated_with_server(security_group,
- instance_uuid):
- raise exception.SecurityGroupExistsForInstance(
- security_group_id=security_group['id'],
- instance_id=instance_uuid)
-
- #check if the instance is in running state
- if instance['power_state'] != power_state.RUNNING:
- raise exception.InstanceNotRunning(instance_id=instance_uuid)
-
- self.db.instance_add_security_group(context.elevated(),
- instance_uuid,
- security_group['id'])
- # NOTE(comstud): No instance_uuid argument to this compute manager
- # call
- self.compute_rpcapi.refresh_security_group_rules(context,
- security_group['id'], host=instance['host'])
-
- @wrap_check_policy
- def remove_security_group(self, context, instance, security_group_name):
- """Remove the security group associated with the instance"""
- security_group = self.db.security_group_get_by_name(context,
- context.project_id,
- security_group_name)
-
- instance_uuid = instance['uuid']
-
- #check if the security group is associated with the server
- if not self._is_security_group_associated_with_server(security_group,
- instance_uuid):
- raise exception.SecurityGroupNotExistsForInstance(
- security_group_id=security_group['id'],
- instance_id=instance_uuid)
-
- #check if the instance is in running state
- if instance['power_state'] != power_state.RUNNING:
- raise exception.InstanceNotRunning(instance_id=instance_uuid)
-
- self.db.instance_remove_security_group(context.elevated(),
- instance_uuid,
- security_group['id'])
- # NOTE(comstud): No instance_uuid argument to this compute manager
- # call
- self.compute_rpcapi.refresh_security_group_rules(context,
- security_group['id'], host=instance['host'])
-
@wrap_check_policy
def update(self, context, instance, **kwargs):
"""Updates the instance in the datastore.
@@ -2065,3 +1925,473 @@ class KeypairAPI(base.Base):
'fingerprint': key_pair['fingerprint'],
})
return rval
+
+
+class SecurityGroupAPI(base.Base):
+ """
+ Sub-set of the Compute API related to managing security groups
+ and security group rules
+ """
+ def __init__(self, **kwargs):
+ super(SecurityGroupAPI, self).__init__(**kwargs)
+ self.security_group_rpcapi = compute_rpcapi.SecurityGroupAPI()
+ self.sgh = importutils.import_object(FLAGS.security_group_handler)
+
+ def validate_property(self, value, property, allowed):
+ """
+ Validate given security group property.
+
+ :param value: the value to validate, as a string or unicode
+ :param property: the property, either 'name' or 'description'
+ :param allowed: the range of characters allowed
+ """
+
+ try:
+ val = value.strip()
+ except AttributeError:
+ msg = _("Security group %s is not a string or unicode") % property
+ self.raise_invalid_property(msg)
+ if not val:
+ msg = _("Security group %s cannot be empty.") % property
+ self.raise_invalid_property(msg)
+
+ if allowed and not re.match(allowed, val):
+ # Some validation to ensure that values match API spec.
+ # - Alphanumeric characters, spaces, dashes, and underscores.
+ # TODO(Daviey): LP: #813685 extend beyond group_name checking, and
+ # probably create a param validator that can be used elsewhere.
+ msg = (_("Value (%(value)s) for parameter Group%(property)s is "
+ "invalid. Content limited to '%(allowed)'.") %
+ dict(value=value, allowed=allowed,
+ property=property.capitalize()))
+ self.raise_invalid_property(msg)
+ if len(val) > 255:
+ msg = _("Security group %s should not be greater "
+ "than 255 characters.") % property
+ self.raise_invalid_property(msg)
+
+ def ensure_default(self, context):
+ """Ensure that a context has a security group.
+
+ Creates a security group for the security context if it does not
+ already exist.
+
+ :param context: the security context
+ """
+ try:
+ self.db.security_group_get_by_name(context,
+ context.project_id,
+ 'default')
+ except exception.NotFound:
+ values = {'name': 'default',
+ 'description': 'default',
+ 'user_id': context.user_id,
+ 'project_id': context.project_id}
+ self.db.security_group_create(context, values)
+
+ def create(self, context, name, description):
+ try:
+ reservations = QUOTAS.reserve(context, security_groups=1)
+ except exception.OverQuota:
+ msg = _("Quota exceeded, too many security groups.")
+ self.raise_over_quota(msg)
+
+ LOG.audit(_("Create Security Group %s"), name, context=context)
+
+ self.ensure_default(context)
+
+ if self.db.security_group_exists(context, context.project_id, name):
+ msg = _('Security group %s already exists') % name
+ self.raise_group_already_exists(msg)
+
+ try:
+ group = {'user_id': context.user_id,
+ 'project_id': context.project_id,
+ 'name': name,
+ 'description': description}
+ group_ref = self.db.security_group_create(context, group)
+ self.sgh.trigger_security_group_create_refresh(context, group)
+ # Commit the reservation
+ QUOTAS.commit(context, reservations)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ QUOTAS.rollback(context, reservations)
+
+ return group_ref
+
+ def get(self, context, name=None, id=None, map_exception=False):
+ self.ensure_default(context)
+ try:
+ if name:
+ return self.db.security_group_get_by_name(context,
+ context.project_id,
+ name)
+ elif id:
+ return self.db.security_group_get(context, id)
+ except exception.NotFound as exp:
+ if map_exception:
+ msg = unicode(exp)
+ self.raise_not_found(msg)
+ else:
+ raise
+
+ def list(self, context, names=None, ids=None, project=None):
+ self.ensure_default(context)
+
+ groups = []
+ if names or ids:
+ if names:
+ for name in names:
+ groups.append(self.db.security_group_get_by_name(context,
+ project,
+ name))
+ if ids:
+ for id in ids:
+ groups.append(self.db.security_group_get(context, id))
+
+ elif context.is_admin:
+ groups = self.db.security_group_get_all(context)
+
+ elif project:
+ groups = self.db.security_group_get_by_project(context, project)
+
+ return groups
+
+ def destroy(self, context, security_group):
+ if self.db.security_group_in_use(context, security_group.id):
+ msg = _("Security group is still in use")
+ self.raise_invalid_group(msg)
+
+ # Get reservations
+ try:
+ reservations = QUOTAS.reserve(context, security_groups=-1)
+ except Exception:
+ reservations = None
+ LOG.exception(_("Failed to update usages deallocating "
+ "security group"))
+
+ LOG.audit(_("Delete security group %s"), security_group.name,
+ context=context)
+ self.db.security_group_destroy(context, security_group.id)
+
+ self.sgh.trigger_security_group_destroy_refresh(context,
+ security_group.id)
+
+ # Commit the reservations
+ if reservations:
+ QUOTAS.commit(context, reservations)
+
+ def is_associated_with_server(self, security_group, instance_uuid):
+ """Check if the security group is already associated
+ with the instance. If Yes, return True.
+ """
+
+ if not security_group:
+ return False
+
+ instances = security_group.get('instances')
+ if not instances:
+ return False
+
+ for inst in instances:
+ if (instance_uuid == inst['uuid']):
+ return True
+
+ return False
+
+ @wrap_check_security_groups_policy
+ def add_to_instance(self, context, instance, security_group_name):
+ """Add security group to the instance"""
+ security_group = self.db.security_group_get_by_name(context,
+ context.project_id,
+ security_group_name)
+
+ instance_uuid = instance['uuid']
+
+ #check if the security group is associated with the server
+ if self.is_associated_with_server(security_group, instance_uuid):
+ raise exception.SecurityGroupExistsForInstance(
+ security_group_id=security_group['id'],
+ instance_id=instance_uuid)
+
+ #check if the instance is in running state
+ if instance['power_state'] != power_state.RUNNING:
+ raise exception.InstanceNotRunning(instance_id=instance_uuid)
+
+ self.db.instance_add_security_group(context.elevated(),
+ instance_uuid,
+ security_group['id'])
+ params = {"security_group_id": security_group['id']}
+ # NOTE(comstud): No instance_uuid argument to this compute manager
+ # call
+ self.security_group_rpcapi.refresh_security_group_rules(context,
+ security_group['id'], host=instance['host'])
+
+ self.trigger_handler('instance_add_security_group',
+ context, instance, security_group_name)
+
+ @wrap_check_security_groups_policy
+ def remove_from_instance(self, context, instance, security_group_name):
+ """Remove the security group associated with the instance"""
+ security_group = self.db.security_group_get_by_name(context,
+ context.project_id,
+ security_group_name)
+
+ instance_uuid = instance['uuid']
+
+ #check if the security group is associated with the server
+ if not self.is_associated_with_server(security_group, instance_uuid):
+ raise exception.SecurityGroupNotExistsForInstance(
+ security_group_id=security_group['id'],
+ instance_id=instance_uuid)
+
+ #check if the instance is in running state
+ if instance['power_state'] != power_state.RUNNING:
+ raise exception.InstanceNotRunning(instance_id=instance_uuid)
+
+ self.db.instance_remove_security_group(context.elevated(),
+ instance_uuid,
+ security_group['id'])
+ params = {"security_group_id": security_group['id']}
+ # NOTE(comstud): No instance_uuid argument to this compute manager
+ # call
+ self.security_group_rpcapi.refresh_security_group_rules(context,
+ security_group['id'], host=instance['host'])
+
+ self.trigger_handler('instance_remove_security_group',
+ context, instance, security_group_name)
+
+ def trigger_handler(self, event, *args):
+ handle = getattr(self.sgh, 'trigger_%s_refresh' % event)
+ handle(*args)
+
+ def trigger_rules_refresh(self, context, id):
+ """Called when a rule is added to or removed from a security_group."""
+
+ security_group = self.db.security_group_get(context, id)
+
+ hosts = set()
+ for instance in security_group['instances']:
+ if instance['host'] is not None:
+ hosts.add(instance['host'])
+
+ for host in hosts:
+ self.security_group_rpcapi.refresh_security_group_rules(context,
+ security_group.id, host=host)
+
+ def trigger_members_refresh(self, context, group_ids):
+ """Called when a security group gains a new or loses a member.
+
+ Sends an update request to each compute node for whom this is
+ relevant.
+ """
+ # First, we get the security group rules that reference these groups as
+ # the grantee..
+ security_group_rules = set()
+ for group_id in group_ids:
+ security_group_rules.update(
+ self.db.security_group_rule_get_by_security_group_grantee(
+ context,
+ group_id))
+
+ # ..then we distill the security groups to which they belong..
+ security_groups = set()
+ for rule in security_group_rules:
+ security_group = self.db.security_group_get(
+ context,
+ rule['parent_group_id'])
+ security_groups.add(security_group)
+
+ # ..then we find the instances that are members of these groups..
+ instances = set()
+ for security_group in security_groups:
+ for instance in security_group['instances']:
+ instances.add(instance)
+
+ # ...then we find the hosts where they live...
+ hosts = set()
+ for instance in instances:
+ if instance['host']:
+ hosts.add(instance['host'])
+
+ # ...and finally we tell these nodes to refresh their view of this
+ # particular security group.
+ for host in hosts:
+ self.security_group_rpcapi.refresh_security_group_members(context,
+ group_id, host=host)
+
+ def parse_cidr(self, cidr):
+ if cidr:
+ try:
+ cidr = urllib.unquote(cidr).decode()
+ except Exception as e:
+ self.raise_invalid_cidr(cidr, e)
+
+ if not utils.is_valid_cidr(cidr):
+ self.raise_invalid_cidr(cidr)
+
+ return cidr
+ else:
+ return '0.0.0.0/0'
+
+ @staticmethod
+ def new_group_ingress_rule(grantee_group_id, protocol, from_port,
+ to_port):
+ return SecurityGroupAPI._new_ingress_rule(protocol, from_port,
+ to_port, group_id=grantee_group_id)
+
+ @staticmethod
+ def new_cidr_ingress_rule(grantee_cidr, protocol, from_port, to_port):
+ return SecurityGroupAPI._new_ingress_rule(protocol, from_port,
+ to_port, cidr=grantee_cidr)
+
+ @staticmethod
+ def _new_ingress_rule(ip_protocol, from_port, to_port,
+ group_id=None, cidr=None):
+ values = {}
+
+ if group_id:
+ values['group_id'] = group_id
+ # Open everything if an explicit port range or type/code are not
+ # specified, but only if a source group was specified.
+ ip_proto_upper = ip_protocol.upper() if ip_protocol else ''
+ if (ip_proto_upper == 'ICMP' and
+ from_port is None and to_port is None):
+ from_port = -1
+ to_port = -1
+ elif (ip_proto_upper in ['TCP', 'UDP'] and from_port is None
+ and to_port is None):
+ from_port = 1
+ to_port = 65535
+
+ elif cidr:
+ values['cidr'] = cidr
+
+ if ip_protocol and from_port is not None and to_port is not None:
+
+ ip_protocol = str(ip_protocol)
+ try:
+ # Verify integer conversions
+ from_port = int(from_port)
+ to_port = int(to_port)
+ except ValueError:
+ if ip_protocol.upper() == 'ICMP':
+ raise exception.InvalidInput(reason="Type and"
+ " Code must be integers for ICMP protocol type")
+ else:
+ raise exception.InvalidInput(reason="To and From ports "
+ "must be integers")
+
+ if ip_protocol.upper() not in ['TCP', 'UDP', 'ICMP']:
+ raise exception.InvalidIpProtocol(protocol=ip_protocol)
+
+ # Verify that from_port must always be less than
+ # or equal to to_port
+ if (ip_protocol.upper() in ['TCP', 'UDP'] and
+ (from_port > to_port)):
+ raise exception.InvalidPortRange(from_port=from_port,
+ to_port=to_port, msg="Former value cannot"
+ " be greater than the later")
+
+ # Verify valid TCP, UDP port ranges
+ if (ip_protocol.upper() in ['TCP', 'UDP'] and
+ (from_port < 1 or to_port > 65535)):
+ raise exception.InvalidPortRange(from_port=from_port,
+ to_port=to_port, msg="Valid TCP ports should"
+ " be between 1-65535")
+
+ # Verify ICMP type and code
+ if (ip_protocol.upper() == "ICMP" and
+ (from_port < -1 or from_port > 255 or
+ to_port < -1 or to_port > 255)):
+ raise exception.InvalidPortRange(from_port=from_port,
+ to_port=to_port, msg="For ICMP, the"
+ " type:code must be valid")
+
+ values['protocol'] = ip_protocol
+ values['from_port'] = from_port
+ values['to_port'] = to_port
+
+ else:
+ # If cidr based filtering, protocol and ports are mandatory
+ if cidr:
+ return None
+
+ return values
+
+ def rule_exists(self, security_group, values):
+ """Indicates whether the specified rule values are already
+ defined in the given security group.
+ """
+ for rule in security_group.rules:
+ is_duplicate = True
+ keys = ('group_id', 'cidr', 'from_port', 'to_port', 'protocol')
+ for key in keys:
+ if rule.get(key) != values.get(key):
+ is_duplicate = False
+ break
+ if is_duplicate:
+ return rule.get('id') or True
+ return False
+
+ def get_rule(self, context, id):
+ self.ensure_default(context)
+ try:
+ return self.db.security_group_rule_get(context, id)
+ except exception.NotFound:
+ msg = _("Rule (%s) not found") % id
+ self.raise_not_found(msg)
+
+ def add_rules(self, context, id, name, vals):
+ count = QUOTAS.count(context, 'security_group_rules', id)
+ try:
+ projected = count + len(vals)
+ QUOTAS.limit_check(context, security_group_rules=projected)
+ except exception.OverQuota:
+ msg = _("Quota exceeded, too many security group rules.")
+ self.raise_over_quota(msg)
+
+ msg = _("Authorize security group ingress %s")
+ LOG.audit(msg, name, context=context)
+
+ rules = [self.db.security_group_rule_create(context, v) for v in vals]
+
+ self.trigger_rules_refresh(context, id=id)
+ self.trigger_handler('security_group_rule_create', context,
+ [r['id'] for r in rules])
+ return rules
+
+ def remove_rules(self, context, security_group, rule_ids):
+ msg = _("Revoke security group ingress %s")
+ LOG.audit(msg, security_group['name'], context=context)
+
+ for rule_id in rule_ids:
+ self.db.security_group_rule_destroy(context, rule_id)
+
+ # NOTE(vish): we removed some rules, so refresh
+ self.trigger_rules_refresh(context, id=security_group['id'])
+ self.trigger_handler('security_group_rule_destroy', context, rule_ids)
+
+ @staticmethod
+ def raise_invalid_property(msg):
+ raise NotImplementedError()
+
+ @staticmethod
+ def raise_group_already_exists(msg):
+ raise NotImplementedError()
+
+ @staticmethod
+ def raise_invalid_group(msg):
+ raise NotImplementedError()
+
+ @staticmethod
+ def raise_invalid_cidr(cidr, decoding_exception=None):
+ raise NotImplementedError()
+
+ @staticmethod
+ def raise_over_quota(msg):
+ raise NotImplementedError()
+
+ @staticmethod
+ def raise_not_found(msg):
+ raise NotImplementedError()
diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py
index 8c25906c9..e7945c7d4 100644
--- a/nova/compute/rpcapi.py
+++ b/nova/compute/rpcapi.py
@@ -27,6 +27,27 @@ import nova.rpc.proxy
FLAGS = flags.FLAGS
+def _compute_topic(topic, ctxt, host, instance):
+ '''Get the topic to use for a message.
+
+ :param topic: the base topic
+ :param ctxt: request context
+ :param host: explicit host to send the message to.
+ :param instance: If an explicit host was not specified, use
+ instance['host']
+
+ :returns: A topic string
+ '''
+ if not host:
+ if not instance:
+ raise exception.NovaException(_('No compute host specified'))
+ host = instance['host']
+ if not host:
+ raise exception.NovaException(_('Unable to find host for '
+ 'Instance %s') % instance['uuid'])
+ return rpc.queue_get_for(ctxt, topic, host)
+
+
class ComputeAPI(nova.rpc.proxy.RpcProxy):
'''Client side of the compute rpc API.
@@ -41,25 +62,6 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy):
super(ComputeAPI, self).__init__(topic=FLAGS.compute_topic,
default_version=self.RPC_API_VERSION)
- def _compute_topic(self, ctxt, host, instance):
- '''Get the topic to use for a message.
-
- :param ctxt: request context
- :param host: explicit host to send the message to.
- :param instance: If an explicit host was not specified, use
- instance['host']
-
- :returns: A topic string
- '''
- if not host:
- if not instance:
- raise exception.NovaException(_('No compute host specified'))
- host = instance['host']
- if not host:
- raise exception.NovaException(_('Unable to find host for '
- 'Instance %s') % instance['uuid'])
- return rpc.queue_get_for(ctxt, self.topic, host)
-
def add_aggregate_host(self, ctxt, aggregate_id, host_param, host):
'''Add aggregate host.
@@ -71,90 +73,90 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy):
'''
self.cast(ctxt, self.make_msg('add_aggregate_host',
aggregate_id=aggregate_id, host=host_param),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def add_fixed_ip_to_instance(self, ctxt, instance, network_id):
self.cast(ctxt, self.make_msg('add_fixed_ip_to_instance',
instance_uuid=instance['uuid'], network_id=network_id),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def attach_volume(self, ctxt, instance, volume_id, mountpoint):
self.cast(ctxt, self.make_msg('attach_volume',
instance_uuid=instance['uuid'], volume_id=volume_id,
mountpoint=mountpoint),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def check_shared_storage_test_file(self, ctxt, filename, host):
return self.call(ctxt, self.make_msg('check_shared_storage_test_file',
filename=filename),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def cleanup_shared_storage_test_file(self, ctxt, filename, host):
self.cast(ctxt, self.make_msg('cleanup_shared_storage_test_file',
filename=filename),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def compare_cpu(self, ctxt, cpu_info, host):
return self.call(ctxt, self.make_msg('compare_cpu', cpu_info=cpu_info),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def confirm_resize(self, ctxt, instance, migration_id, host,
cast=True):
rpc_method = self.cast if cast else self.call
return rpc_method(ctxt, self.make_msg('confirm_resize',
instance_uuid=instance['uuid'], migration_id=migration_id),
- topic=self._compute_topic(ctxt, host, instance))
+ topic=_compute_topic(self.topic, ctxt, host, instance))
def create_shared_storage_test_file(self, ctxt, host):
return self.call(ctxt,
self.make_msg('create_shared_storage_test_file'),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def detach_volume(self, ctxt, instance, volume_id):
self.cast(ctxt, self.make_msg('detach_volume',
instance_uuid=instance['uuid'], volume_id=volume_id),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def finish_resize(self, ctxt, instance, migration_id, image, disk_info,
host):
self.cast(ctxt, self.make_msg('finish_resize',
instance_uuid=instance['uuid'], migration_id=migration_id,
image=image, disk_info=disk_info),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def finish_revert_resize(self, ctxt, instance, migration_id, host):
self.cast(ctxt, self.make_msg('finish_revert_resize',
instance_uuid=instance['uuid'], migration_id=migration_id),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def get_console_output(self, ctxt, instance, tail_length):
return self.call(ctxt, self.make_msg('get_console_output',
instance_uuid=instance['uuid'], tail_length=tail_length),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def get_console_pool_info(self, ctxt, console_type, host):
return self.call(ctxt, self.make_msg('get_console_pool_info',
console_type=console_type),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def get_console_topic(self, ctxt, host):
return self.call(ctxt, self.make_msg('get_console_topic'),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def get_diagnostics(self, ctxt, instance):
return self.call(ctxt, self.make_msg('get_diagnostics',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def get_instance_disk_info(self, ctxt, instance):
return self.call(ctxt, self.make_msg('get_instance_disk_info',
instance_name=instance['name']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def get_vnc_console(self, ctxt, instance, console_type):
return self.call(ctxt, self.make_msg('get_vnc_console',
instance_uuid=instance['uuid'], console_type=console_type),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def host_maintenance_mode(self, ctxt, host_param, mode, host):
'''Set host maintenance mode
@@ -167,60 +169,61 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy):
'''
return self.call(ctxt, self.make_msg('host_maintenance_mode',
host=host_param, mode=mode),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def host_power_action(self, ctxt, action, host):
+ topic = _compute_topic(self.topic, ctxt, host, None)
return self.call(ctxt, self.make_msg('host_power_action',
- action=action), topic=self._compute_topic(ctxt, host, None))
+ action=action), topic)
def inject_file(self, ctxt, instance, path, file_contents):
self.cast(ctxt, self.make_msg('inject_file',
instance_uuid=instance['uuid'], path=path,
file_contents=file_contents),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def inject_network_info(self, ctxt, instance):
self.cast(ctxt, self.make_msg('inject_network_info',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def lock_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('lock_instance',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def post_live_migration_at_destination(self, ctxt, instance,
block_migration, host):
return self.call(ctxt,
self.make_msg('post_live_migration_at_destination',
instance_id=instance['id'], block_migration=block_migration),
- self._compute_topic(ctxt, host, None))
+ _compute_topic(self.topic, ctxt, host, None))
def pause_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('pause_instance',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def power_off_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('power_off_instance',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def power_on_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('power_on_instance',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def pre_live_migration(self, ctxt, instance, block_migration, disk,
host):
return self.call(ctxt, self.make_msg('pre_live_migration',
instance_id=instance['id'], block_migration=block_migration,
- disk=disk), self._compute_topic(ctxt, host, None))
+ disk=disk), _compute_topic(self.topic, ctxt, host, None))
def reboot_instance(self, ctxt, instance, reboot_type):
self.cast(ctxt, self.make_msg('reboot_instance',
instance_uuid=instance['uuid'], reboot_type=reboot_type),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def rebuild_instance(self, ctxt, instance, new_pass, injected_files,
image_ref, orig_image_ref):
@@ -228,22 +231,22 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy):
instance_uuid=instance['uuid'], new_pass=new_pass,
injected_files=injected_files, image_ref=image_ref,
orig_image_ref=orig_image_ref),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def refresh_provider_fw_rules(self, ctxt, host):
self.cast(ctxt, self.make_msg('refresh_provider_fw_rules'),
- self._compute_topic(ctxt, host, None))
+ _compute_topic(self.topic, ctxt, host, None))
def refresh_security_group_rules(self, ctxt, security_group_id, host):
self.cast(ctxt, self.make_msg('refresh_security_group_rules',
security_group_id=security_group_id),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def refresh_security_group_members(self, ctxt, security_group_id,
host):
self.cast(ctxt, self.make_msg('refresh_security_group_members',
security_group_id=security_group_id),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def remove_aggregate_host(self, ctxt, aggregate_id, host_param, host):
'''Remove aggregate host.
@@ -256,57 +259,59 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy):
'''
self.cast(ctxt, self.make_msg('remove_aggregate_host',
aggregate_id=aggregate_id, host=host_param),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def remove_fixed_ip_from_instance(self, ctxt, instance, address):
self.cast(ctxt, self.make_msg('remove_fixed_ip_from_instance',
instance_uuid=instance['uuid'], address=address),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def remove_volume_connection(self, ctxt, instance, volume_id, host):
return self.call(ctxt, self.make_msg('remove_volume_connection',
instance_id=instance['id'], volume_id=volume_id),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def rescue_instance(self, ctxt, instance, rescue_password):
self.cast(ctxt, self.make_msg('rescue_instance',
instance_uuid=instance['uuid'],
rescue_password=rescue_password),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def reset_network(self, ctxt, instance):
self.cast(ctxt, self.make_msg('reset_network',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def resize_instance(self, ctxt, instance, migration_id, image):
+ topic = _compute_topic(self.topic, ctxt, None, instance)
self.cast(ctxt, self.make_msg('resize_instance',
instance_uuid=instance['uuid'], migration_id=migration_id,
- image=image), topic=self._compute_topic(ctxt, None, instance))
+ image=image), topic)
def resume_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('resume_instance',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def revert_resize(self, ctxt, instance, migration_id, host):
self.cast(ctxt, self.make_msg('revert_resize',
instance_uuid=instance['uuid'], migration_id=migration_id),
- topic=self._compute_topic(ctxt, host, instance))
+ topic=_compute_topic(self.topic, ctxt, host, instance))
def rollback_live_migration_at_destination(self, ctxt, instance, host):
self.cast(ctxt, self.make_msg('rollback_live_migration_at_destination',
instance_id=instance['id']),
- topic=self._compute_topic(ctxt, host, None))
+ topic=_compute_topic(self.topic, ctxt, host, None))
def set_admin_password(self, ctxt, instance, new_pass):
self.cast(ctxt, self.make_msg('set_admin_password',
instance_uuid=instance['uuid'], new_pass=new_pass),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def set_host_enabled(self, ctxt, enabled, host):
+ topic = _compute_topic(self.topic, ctxt, host, None)
return self.call(ctxt, self.make_msg('set_host_enabled',
- enabled=enabled), topic=self._compute_topic(ctxt, host, None))
+ enabled=enabled), topic)
def snapshot_instance(self, ctxt, instance, image_id, image_type,
backup_type, rotation):
@@ -314,40 +319,66 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy):
instance_uuid=instance['uuid'], image_id=image_id,
image_type=image_type, backup_type=backup_type,
rotation=rotation),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def start_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('start_instance',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def stop_instance(self, ctxt, instance, cast=True):
rpc_method = self.cast if cast else self.call
return rpc_method(ctxt, self.make_msg('stop_instance',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def suspend_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('suspend_instance',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def terminate_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('terminate_instance',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def unlock_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('unlock_instance',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def unpause_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('unpause_instance',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
def unrescue_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('unrescue_instance',
instance_uuid=instance['uuid']),
- topic=self._compute_topic(ctxt, None, instance))
+ topic=_compute_topic(self.topic, ctxt, None, instance))
+
+
+class SecurityGroupAPI(nova.rpc.proxy.RpcProxy):
+ '''Client side of the security group rpc API.
+
+ API version history:
+
+ 1.0 - Initial version.
+ '''
+
+ RPC_API_VERSION = '1.0'
+
+ def __init__(self):
+ super(SecurityGroupAPI, self).__init__(topic=FLAGS.compute_topic,
+ default_version=self.RPC_API_VERSION)
+
+ def refresh_security_group_rules(self, ctxt, security_group_id, host):
+ self.cast(ctxt, self.make_msg('refresh_security_group_rules',
+ security_group_id=security_group_id),
+ topic=_compute_topic(self.topic, ctxt, host, None))
+
+ def refresh_security_group_members(self, ctxt, security_group_id,
+ host):
+ self.cast(ctxt, self.make_msg('refresh_security_group_members',
+ security_group_id=security_group_id),
+ topic=_compute_topic(self.topic, ctxt, host, None))