From 123b28cd1a4ffa1e972e29963cb0e6be46b0d7c2 Mon Sep 17 00:00:00 2001 From: Eoghan Glynn Date: Tue, 22 May 2012 13:35:06 +0100 Subject: Dedupe native and EC2 security group APIs. Reduce the code duplication in the native openstack and EC2 APIs related to security groups, by factoring commonality into a new internal SecurityGroupAPI. Also fixes bug lp 1005931 Change-Id: Ifb92bf5d0f07d5713818a3eee6175ef03e8c0b7c --- nova/compute/api.py | 648 +++++++++++++++++++++++++++++++++++++------------ nova/compute/rpcapi.py | 171 +++++++------ 2 files changed, 590 insertions(+), 229 deletions(-) (limited to 'nova/compute') diff --git a/nova/compute/api.py b/nova/compute/api.py index 10bcfe457..922a8bace 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -25,6 +25,7 @@ import functools import re import string import time +import urllib from nova import block_device from nova.compute import aggregate_states @@ -43,6 +44,7 @@ from nova import log as logging from nova import network from nova import notifications from nova.openstack.common import excutils +from nova.openstack.common import importutils from nova.openstack.common import jsonutils import nova.policy from nova import quota @@ -92,17 +94,23 @@ def check_instance_state(vm_state=None, task_state=None): return outer -def wrap_check_policy(func): +def policy_decorator(scope): """Check corresponding policy prior of wrapped method to execution""" - @functools.wraps(func) - def wrapped(self, context, target, *args, **kwargs): - check_policy(context, func.__name__, target) - return func(self, context, target, *args, **kwargs) - return wrapped + def outer(func): + @functools.wraps(func) + def wrapped(self, context, target, *args, **kwargs): + check_policy(context, func.__name__, target, scope) + return func(self, context, target, *args, **kwargs) + return wrapped + return outer + +wrap_check_policy = policy_decorator(scope='compute') +wrap_check_security_groups_policy = policy_decorator( + scope='compute:security_groups') -def check_policy(context, action, target): - _action = 'compute:%s' % action +def check_policy(context, action, target, scope='compute'): + _action = '%s:%s' % (scope, action) nova.policy.enforce(context, _action, target) @@ -110,12 +118,13 @@ class API(base.Base): """API for interacting with the compute manager.""" def __init__(self, image_service=None, network_api=None, volume_api=None, - **kwargs): + security_group_api=None, **kwargs): self.image_service = (image_service or nova.image.get_default_image_service()) self.network_api = network_api or network.API() self.volume_api = volume_api or volume.API() + self.security_group_api = security_group_api or SecurityGroupAPI() self.consoleauth_rpcapi = consoleauth_rpcapi.ConsoleAuthAPI() self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI() self.compute_rpcapi = compute_rpcapi.ComputeAPI() @@ -389,7 +398,7 @@ class API(base.Base): kernel_id, ramdisk_id = self._handle_kernel_and_ramdisk( context, kernel_id, ramdisk_id, image, image_service) - self.ensure_default_security_group(context) + self.security_group_api.ensure_default(context) if key_data is None and key_name: key_pair = self.db.key_pair_get(context, context.user_id, key_name) @@ -771,80 +780,6 @@ class API(base.Base): return (inst_ret_list, reservation_id) - def ensure_default_security_group(self, context): - """Ensure that a context has a security group. - - Creates a security group for the security context if it does not - already exist. - - :param context: the security context - """ - try: - self.db.security_group_get_by_name(context, - context.project_id, - 'default') - except exception.NotFound: - values = {'name': 'default', - 'description': 'default', - 'user_id': context.user_id, - 'project_id': context.project_id} - self.db.security_group_create(context, values) - - def trigger_security_group_rules_refresh(self, context, security_group_id): - """Called when a rule is added to or removed from a security_group.""" - - security_group = self.db.security_group_get(context, security_group_id) - - hosts = set() - for instance in security_group['instances']: - if instance['host'] is not None: - hosts.add(instance['host']) - - for host in hosts: - self.compute_rpcapi.refresh_security_group_rules(context, - security_group.id, host=host) - - def trigger_security_group_members_refresh(self, context, group_ids): - """Called when a security group gains a new or loses a member. - - Sends an update request to each compute node for whom this is - relevant. - """ - # First, we get the security group rules that reference these groups as - # the grantee.. - security_group_rules = set() - for group_id in group_ids: - security_group_rules.update( - self.db.security_group_rule_get_by_security_group_grantee( - context, - group_id)) - - # ..then we distill the security groups to which they belong.. - security_groups = set() - for rule in security_group_rules: - security_group = self.db.security_group_get( - context, - rule['parent_group_id']) - security_groups.add(security_group) - - # ..then we find the instances that are members of these groups.. - instances = set() - for security_group in security_groups: - for instance in security_group['instances']: - instances.add(instance) - - # ...then we find the hosts where they live... - hosts = set() - for instance in instances: - if instance['host']: - hosts.add(instance['host']) - - # ...and finally we tell these nodes to refresh their view of this - # particular security group. - for host in hosts: - self.compute_rpcapi.refresh_security_group_members(context, - group_id, host=host) - def trigger_provider_fw_rules_refresh(self, context): """Called when a rule is added/removed from a provider firewall""" @@ -853,81 +788,6 @@ class API(base.Base): for host in hosts: self.compute_rpcapi.refresh_provider_fw_rules(context, host) - def _is_security_group_associated_with_server(self, security_group, - instance_uuid): - """Check if the security group is already associated - with the instance. If Yes, return True. - """ - - if not security_group: - return False - - instances = security_group.get('instances') - if not instances: - return False - - for inst in instances: - if (instance_uuid == inst['uuid']): - return True - - return False - - @wrap_check_policy - def add_security_group(self, context, instance, security_group_name): - """Add security group to the instance""" - security_group = self.db.security_group_get_by_name(context, - context.project_id, - security_group_name) - - instance_uuid = instance['uuid'] - - #check if the security group is associated with the server - if self._is_security_group_associated_with_server(security_group, - instance_uuid): - raise exception.SecurityGroupExistsForInstance( - security_group_id=security_group['id'], - instance_id=instance_uuid) - - #check if the instance is in running state - if instance['power_state'] != power_state.RUNNING: - raise exception.InstanceNotRunning(instance_id=instance_uuid) - - self.db.instance_add_security_group(context.elevated(), - instance_uuid, - security_group['id']) - # NOTE(comstud): No instance_uuid argument to this compute manager - # call - self.compute_rpcapi.refresh_security_group_rules(context, - security_group['id'], host=instance['host']) - - @wrap_check_policy - def remove_security_group(self, context, instance, security_group_name): - """Remove the security group associated with the instance""" - security_group = self.db.security_group_get_by_name(context, - context.project_id, - security_group_name) - - instance_uuid = instance['uuid'] - - #check if the security group is associated with the server - if not self._is_security_group_associated_with_server(security_group, - instance_uuid): - raise exception.SecurityGroupNotExistsForInstance( - security_group_id=security_group['id'], - instance_id=instance_uuid) - - #check if the instance is in running state - if instance['power_state'] != power_state.RUNNING: - raise exception.InstanceNotRunning(instance_id=instance_uuid) - - self.db.instance_remove_security_group(context.elevated(), - instance_uuid, - security_group['id']) - # NOTE(comstud): No instance_uuid argument to this compute manager - # call - self.compute_rpcapi.refresh_security_group_rules(context, - security_group['id'], host=instance['host']) - @wrap_check_policy def update(self, context, instance, **kwargs): """Updates the instance in the datastore. @@ -2065,3 +1925,473 @@ class KeypairAPI(base.Base): 'fingerprint': key_pair['fingerprint'], }) return rval + + +class SecurityGroupAPI(base.Base): + """ + Sub-set of the Compute API related to managing security groups + and security group rules + """ + def __init__(self, **kwargs): + super(SecurityGroupAPI, self).__init__(**kwargs) + self.security_group_rpcapi = compute_rpcapi.SecurityGroupAPI() + self.sgh = importutils.import_object(FLAGS.security_group_handler) + + def validate_property(self, value, property, allowed): + """ + Validate given security group property. + + :param value: the value to validate, as a string or unicode + :param property: the property, either 'name' or 'description' + :param allowed: the range of characters allowed + """ + + try: + val = value.strip() + except AttributeError: + msg = _("Security group %s is not a string or unicode") % property + self.raise_invalid_property(msg) + if not val: + msg = _("Security group %s cannot be empty.") % property + self.raise_invalid_property(msg) + + if allowed and not re.match(allowed, val): + # Some validation to ensure that values match API spec. + # - Alphanumeric characters, spaces, dashes, and underscores. + # TODO(Daviey): LP: #813685 extend beyond group_name checking, and + # probably create a param validator that can be used elsewhere. + msg = (_("Value (%(value)s) for parameter Group%(property)s is " + "invalid. Content limited to '%(allowed)'.") % + dict(value=value, allowed=allowed, + property=property.capitalize())) + self.raise_invalid_property(msg) + if len(val) > 255: + msg = _("Security group %s should not be greater " + "than 255 characters.") % property + self.raise_invalid_property(msg) + + def ensure_default(self, context): + """Ensure that a context has a security group. + + Creates a security group for the security context if it does not + already exist. + + :param context: the security context + """ + try: + self.db.security_group_get_by_name(context, + context.project_id, + 'default') + except exception.NotFound: + values = {'name': 'default', + 'description': 'default', + 'user_id': context.user_id, + 'project_id': context.project_id} + self.db.security_group_create(context, values) + + def create(self, context, name, description): + try: + reservations = QUOTAS.reserve(context, security_groups=1) + except exception.OverQuota: + msg = _("Quota exceeded, too many security groups.") + self.raise_over_quota(msg) + + LOG.audit(_("Create Security Group %s"), name, context=context) + + self.ensure_default(context) + + if self.db.security_group_exists(context, context.project_id, name): + msg = _('Security group %s already exists') % name + self.raise_group_already_exists(msg) + + try: + group = {'user_id': context.user_id, + 'project_id': context.project_id, + 'name': name, + 'description': description} + group_ref = self.db.security_group_create(context, group) + self.sgh.trigger_security_group_create_refresh(context, group) + # Commit the reservation + QUOTAS.commit(context, reservations) + except Exception: + with excutils.save_and_reraise_exception(): + QUOTAS.rollback(context, reservations) + + return group_ref + + def get(self, context, name=None, id=None, map_exception=False): + self.ensure_default(context) + try: + if name: + return self.db.security_group_get_by_name(context, + context.project_id, + name) + elif id: + return self.db.security_group_get(context, id) + except exception.NotFound as exp: + if map_exception: + msg = unicode(exp) + self.raise_not_found(msg) + else: + raise + + def list(self, context, names=None, ids=None, project=None): + self.ensure_default(context) + + groups = [] + if names or ids: + if names: + for name in names: + groups.append(self.db.security_group_get_by_name(context, + project, + name)) + if ids: + for id in ids: + groups.append(self.db.security_group_get(context, id)) + + elif context.is_admin: + groups = self.db.security_group_get_all(context) + + elif project: + groups = self.db.security_group_get_by_project(context, project) + + return groups + + def destroy(self, context, security_group): + if self.db.security_group_in_use(context, security_group.id): + msg = _("Security group is still in use") + self.raise_invalid_group(msg) + + # Get reservations + try: + reservations = QUOTAS.reserve(context, security_groups=-1) + except Exception: + reservations = None + LOG.exception(_("Failed to update usages deallocating " + "security group")) + + LOG.audit(_("Delete security group %s"), security_group.name, + context=context) + self.db.security_group_destroy(context, security_group.id) + + self.sgh.trigger_security_group_destroy_refresh(context, + security_group.id) + + # Commit the reservations + if reservations: + QUOTAS.commit(context, reservations) + + def is_associated_with_server(self, security_group, instance_uuid): + """Check if the security group is already associated + with the instance. If Yes, return True. + """ + + if not security_group: + return False + + instances = security_group.get('instances') + if not instances: + return False + + for inst in instances: + if (instance_uuid == inst['uuid']): + return True + + return False + + @wrap_check_security_groups_policy + def add_to_instance(self, context, instance, security_group_name): + """Add security group to the instance""" + security_group = self.db.security_group_get_by_name(context, + context.project_id, + security_group_name) + + instance_uuid = instance['uuid'] + + #check if the security group is associated with the server + if self.is_associated_with_server(security_group, instance_uuid): + raise exception.SecurityGroupExistsForInstance( + security_group_id=security_group['id'], + instance_id=instance_uuid) + + #check if the instance is in running state + if instance['power_state'] != power_state.RUNNING: + raise exception.InstanceNotRunning(instance_id=instance_uuid) + + self.db.instance_add_security_group(context.elevated(), + instance_uuid, + security_group['id']) + params = {"security_group_id": security_group['id']} + # NOTE(comstud): No instance_uuid argument to this compute manager + # call + self.security_group_rpcapi.refresh_security_group_rules(context, + security_group['id'], host=instance['host']) + + self.trigger_handler('instance_add_security_group', + context, instance, security_group_name) + + @wrap_check_security_groups_policy + def remove_from_instance(self, context, instance, security_group_name): + """Remove the security group associated with the instance""" + security_group = self.db.security_group_get_by_name(context, + context.project_id, + security_group_name) + + instance_uuid = instance['uuid'] + + #check if the security group is associated with the server + if not self.is_associated_with_server(security_group, instance_uuid): + raise exception.SecurityGroupNotExistsForInstance( + security_group_id=security_group['id'], + instance_id=instance_uuid) + + #check if the instance is in running state + if instance['power_state'] != power_state.RUNNING: + raise exception.InstanceNotRunning(instance_id=instance_uuid) + + self.db.instance_remove_security_group(context.elevated(), + instance_uuid, + security_group['id']) + params = {"security_group_id": security_group['id']} + # NOTE(comstud): No instance_uuid argument to this compute manager + # call + self.security_group_rpcapi.refresh_security_group_rules(context, + security_group['id'], host=instance['host']) + + self.trigger_handler('instance_remove_security_group', + context, instance, security_group_name) + + def trigger_handler(self, event, *args): + handle = getattr(self.sgh, 'trigger_%s_refresh' % event) + handle(*args) + + def trigger_rules_refresh(self, context, id): + """Called when a rule is added to or removed from a security_group.""" + + security_group = self.db.security_group_get(context, id) + + hosts = set() + for instance in security_group['instances']: + if instance['host'] is not None: + hosts.add(instance['host']) + + for host in hosts: + self.security_group_rpcapi.refresh_security_group_rules(context, + security_group.id, host=host) + + def trigger_members_refresh(self, context, group_ids): + """Called when a security group gains a new or loses a member. + + Sends an update request to each compute node for whom this is + relevant. + """ + # First, we get the security group rules that reference these groups as + # the grantee.. + security_group_rules = set() + for group_id in group_ids: + security_group_rules.update( + self.db.security_group_rule_get_by_security_group_grantee( + context, + group_id)) + + # ..then we distill the security groups to which they belong.. + security_groups = set() + for rule in security_group_rules: + security_group = self.db.security_group_get( + context, + rule['parent_group_id']) + security_groups.add(security_group) + + # ..then we find the instances that are members of these groups.. + instances = set() + for security_group in security_groups: + for instance in security_group['instances']: + instances.add(instance) + + # ...then we find the hosts where they live... + hosts = set() + for instance in instances: + if instance['host']: + hosts.add(instance['host']) + + # ...and finally we tell these nodes to refresh their view of this + # particular security group. + for host in hosts: + self.security_group_rpcapi.refresh_security_group_members(context, + group_id, host=host) + + def parse_cidr(self, cidr): + if cidr: + try: + cidr = urllib.unquote(cidr).decode() + except Exception as e: + self.raise_invalid_cidr(cidr, e) + + if not utils.is_valid_cidr(cidr): + self.raise_invalid_cidr(cidr) + + return cidr + else: + return '0.0.0.0/0' + + @staticmethod + def new_group_ingress_rule(grantee_group_id, protocol, from_port, + to_port): + return SecurityGroupAPI._new_ingress_rule(protocol, from_port, + to_port, group_id=grantee_group_id) + + @staticmethod + def new_cidr_ingress_rule(grantee_cidr, protocol, from_port, to_port): + return SecurityGroupAPI._new_ingress_rule(protocol, from_port, + to_port, cidr=grantee_cidr) + + @staticmethod + def _new_ingress_rule(ip_protocol, from_port, to_port, + group_id=None, cidr=None): + values = {} + + if group_id: + values['group_id'] = group_id + # Open everything if an explicit port range or type/code are not + # specified, but only if a source group was specified. + ip_proto_upper = ip_protocol.upper() if ip_protocol else '' + if (ip_proto_upper == 'ICMP' and + from_port is None and to_port is None): + from_port = -1 + to_port = -1 + elif (ip_proto_upper in ['TCP', 'UDP'] and from_port is None + and to_port is None): + from_port = 1 + to_port = 65535 + + elif cidr: + values['cidr'] = cidr + + if ip_protocol and from_port is not None and to_port is not None: + + ip_protocol = str(ip_protocol) + try: + # Verify integer conversions + from_port = int(from_port) + to_port = int(to_port) + except ValueError: + if ip_protocol.upper() == 'ICMP': + raise exception.InvalidInput(reason="Type and" + " Code must be integers for ICMP protocol type") + else: + raise exception.InvalidInput(reason="To and From ports " + "must be integers") + + if ip_protocol.upper() not in ['TCP', 'UDP', 'ICMP']: + raise exception.InvalidIpProtocol(protocol=ip_protocol) + + # Verify that from_port must always be less than + # or equal to to_port + if (ip_protocol.upper() in ['TCP', 'UDP'] and + (from_port > to_port)): + raise exception.InvalidPortRange(from_port=from_port, + to_port=to_port, msg="Former value cannot" + " be greater than the later") + + # Verify valid TCP, UDP port ranges + if (ip_protocol.upper() in ['TCP', 'UDP'] and + (from_port < 1 or to_port > 65535)): + raise exception.InvalidPortRange(from_port=from_port, + to_port=to_port, msg="Valid TCP ports should" + " be between 1-65535") + + # Verify ICMP type and code + if (ip_protocol.upper() == "ICMP" and + (from_port < -1 or from_port > 255 or + to_port < -1 or to_port > 255)): + raise exception.InvalidPortRange(from_port=from_port, + to_port=to_port, msg="For ICMP, the" + " type:code must be valid") + + values['protocol'] = ip_protocol + values['from_port'] = from_port + values['to_port'] = to_port + + else: + # If cidr based filtering, protocol and ports are mandatory + if cidr: + return None + + return values + + def rule_exists(self, security_group, values): + """Indicates whether the specified rule values are already + defined in the given security group. + """ + for rule in security_group.rules: + is_duplicate = True + keys = ('group_id', 'cidr', 'from_port', 'to_port', 'protocol') + for key in keys: + if rule.get(key) != values.get(key): + is_duplicate = False + break + if is_duplicate: + return rule.get('id') or True + return False + + def get_rule(self, context, id): + self.ensure_default(context) + try: + return self.db.security_group_rule_get(context, id) + except exception.NotFound: + msg = _("Rule (%s) not found") % id + self.raise_not_found(msg) + + def add_rules(self, context, id, name, vals): + count = QUOTAS.count(context, 'security_group_rules', id) + try: + projected = count + len(vals) + QUOTAS.limit_check(context, security_group_rules=projected) + except exception.OverQuota: + msg = _("Quota exceeded, too many security group rules.") + self.raise_over_quota(msg) + + msg = _("Authorize security group ingress %s") + LOG.audit(msg, name, context=context) + + rules = [self.db.security_group_rule_create(context, v) for v in vals] + + self.trigger_rules_refresh(context, id=id) + self.trigger_handler('security_group_rule_create', context, + [r['id'] for r in rules]) + return rules + + def remove_rules(self, context, security_group, rule_ids): + msg = _("Revoke security group ingress %s") + LOG.audit(msg, security_group['name'], context=context) + + for rule_id in rule_ids: + self.db.security_group_rule_destroy(context, rule_id) + + # NOTE(vish): we removed some rules, so refresh + self.trigger_rules_refresh(context, id=security_group['id']) + self.trigger_handler('security_group_rule_destroy', context, rule_ids) + + @staticmethod + def raise_invalid_property(msg): + raise NotImplementedError() + + @staticmethod + def raise_group_already_exists(msg): + raise NotImplementedError() + + @staticmethod + def raise_invalid_group(msg): + raise NotImplementedError() + + @staticmethod + def raise_invalid_cidr(cidr, decoding_exception=None): + raise NotImplementedError() + + @staticmethod + def raise_over_quota(msg): + raise NotImplementedError() + + @staticmethod + def raise_not_found(msg): + raise NotImplementedError() diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py index 8c25906c9..e7945c7d4 100644 --- a/nova/compute/rpcapi.py +++ b/nova/compute/rpcapi.py @@ -27,6 +27,27 @@ import nova.rpc.proxy FLAGS = flags.FLAGS +def _compute_topic(topic, ctxt, host, instance): + '''Get the topic to use for a message. + + :param topic: the base topic + :param ctxt: request context + :param host: explicit host to send the message to. + :param instance: If an explicit host was not specified, use + instance['host'] + + :returns: A topic string + ''' + if not host: + if not instance: + raise exception.NovaException(_('No compute host specified')) + host = instance['host'] + if not host: + raise exception.NovaException(_('Unable to find host for ' + 'Instance %s') % instance['uuid']) + return rpc.queue_get_for(ctxt, topic, host) + + class ComputeAPI(nova.rpc.proxy.RpcProxy): '''Client side of the compute rpc API. @@ -41,25 +62,6 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy): super(ComputeAPI, self).__init__(topic=FLAGS.compute_topic, default_version=self.RPC_API_VERSION) - def _compute_topic(self, ctxt, host, instance): - '''Get the topic to use for a message. - - :param ctxt: request context - :param host: explicit host to send the message to. - :param instance: If an explicit host was not specified, use - instance['host'] - - :returns: A topic string - ''' - if not host: - if not instance: - raise exception.NovaException(_('No compute host specified')) - host = instance['host'] - if not host: - raise exception.NovaException(_('Unable to find host for ' - 'Instance %s') % instance['uuid']) - return rpc.queue_get_for(ctxt, self.topic, host) - def add_aggregate_host(self, ctxt, aggregate_id, host_param, host): '''Add aggregate host. @@ -71,90 +73,90 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy): ''' self.cast(ctxt, self.make_msg('add_aggregate_host', aggregate_id=aggregate_id, host=host_param), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def add_fixed_ip_to_instance(self, ctxt, instance, network_id): self.cast(ctxt, self.make_msg('add_fixed_ip_to_instance', instance_uuid=instance['uuid'], network_id=network_id), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def attach_volume(self, ctxt, instance, volume_id, mountpoint): self.cast(ctxt, self.make_msg('attach_volume', instance_uuid=instance['uuid'], volume_id=volume_id, mountpoint=mountpoint), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def check_shared_storage_test_file(self, ctxt, filename, host): return self.call(ctxt, self.make_msg('check_shared_storage_test_file', filename=filename), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def cleanup_shared_storage_test_file(self, ctxt, filename, host): self.cast(ctxt, self.make_msg('cleanup_shared_storage_test_file', filename=filename), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def compare_cpu(self, ctxt, cpu_info, host): return self.call(ctxt, self.make_msg('compare_cpu', cpu_info=cpu_info), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def confirm_resize(self, ctxt, instance, migration_id, host, cast=True): rpc_method = self.cast if cast else self.call return rpc_method(ctxt, self.make_msg('confirm_resize', instance_uuid=instance['uuid'], migration_id=migration_id), - topic=self._compute_topic(ctxt, host, instance)) + topic=_compute_topic(self.topic, ctxt, host, instance)) def create_shared_storage_test_file(self, ctxt, host): return self.call(ctxt, self.make_msg('create_shared_storage_test_file'), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def detach_volume(self, ctxt, instance, volume_id): self.cast(ctxt, self.make_msg('detach_volume', instance_uuid=instance['uuid'], volume_id=volume_id), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def finish_resize(self, ctxt, instance, migration_id, image, disk_info, host): self.cast(ctxt, self.make_msg('finish_resize', instance_uuid=instance['uuid'], migration_id=migration_id, image=image, disk_info=disk_info), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def finish_revert_resize(self, ctxt, instance, migration_id, host): self.cast(ctxt, self.make_msg('finish_revert_resize', instance_uuid=instance['uuid'], migration_id=migration_id), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def get_console_output(self, ctxt, instance, tail_length): return self.call(ctxt, self.make_msg('get_console_output', instance_uuid=instance['uuid'], tail_length=tail_length), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def get_console_pool_info(self, ctxt, console_type, host): return self.call(ctxt, self.make_msg('get_console_pool_info', console_type=console_type), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def get_console_topic(self, ctxt, host): return self.call(ctxt, self.make_msg('get_console_topic'), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def get_diagnostics(self, ctxt, instance): return self.call(ctxt, self.make_msg('get_diagnostics', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def get_instance_disk_info(self, ctxt, instance): return self.call(ctxt, self.make_msg('get_instance_disk_info', instance_name=instance['name']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def get_vnc_console(self, ctxt, instance, console_type): return self.call(ctxt, self.make_msg('get_vnc_console', instance_uuid=instance['uuid'], console_type=console_type), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def host_maintenance_mode(self, ctxt, host_param, mode, host): '''Set host maintenance mode @@ -167,60 +169,61 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy): ''' return self.call(ctxt, self.make_msg('host_maintenance_mode', host=host_param, mode=mode), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def host_power_action(self, ctxt, action, host): + topic = _compute_topic(self.topic, ctxt, host, None) return self.call(ctxt, self.make_msg('host_power_action', - action=action), topic=self._compute_topic(ctxt, host, None)) + action=action), topic) def inject_file(self, ctxt, instance, path, file_contents): self.cast(ctxt, self.make_msg('inject_file', instance_uuid=instance['uuid'], path=path, file_contents=file_contents), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def inject_network_info(self, ctxt, instance): self.cast(ctxt, self.make_msg('inject_network_info', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def lock_instance(self, ctxt, instance): self.cast(ctxt, self.make_msg('lock_instance', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def post_live_migration_at_destination(self, ctxt, instance, block_migration, host): return self.call(ctxt, self.make_msg('post_live_migration_at_destination', instance_id=instance['id'], block_migration=block_migration), - self._compute_topic(ctxt, host, None)) + _compute_topic(self.topic, ctxt, host, None)) def pause_instance(self, ctxt, instance): self.cast(ctxt, self.make_msg('pause_instance', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def power_off_instance(self, ctxt, instance): self.cast(ctxt, self.make_msg('power_off_instance', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def power_on_instance(self, ctxt, instance): self.cast(ctxt, self.make_msg('power_on_instance', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def pre_live_migration(self, ctxt, instance, block_migration, disk, host): return self.call(ctxt, self.make_msg('pre_live_migration', instance_id=instance['id'], block_migration=block_migration, - disk=disk), self._compute_topic(ctxt, host, None)) + disk=disk), _compute_topic(self.topic, ctxt, host, None)) def reboot_instance(self, ctxt, instance, reboot_type): self.cast(ctxt, self.make_msg('reboot_instance', instance_uuid=instance['uuid'], reboot_type=reboot_type), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def rebuild_instance(self, ctxt, instance, new_pass, injected_files, image_ref, orig_image_ref): @@ -228,22 +231,22 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy): instance_uuid=instance['uuid'], new_pass=new_pass, injected_files=injected_files, image_ref=image_ref, orig_image_ref=orig_image_ref), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def refresh_provider_fw_rules(self, ctxt, host): self.cast(ctxt, self.make_msg('refresh_provider_fw_rules'), - self._compute_topic(ctxt, host, None)) + _compute_topic(self.topic, ctxt, host, None)) def refresh_security_group_rules(self, ctxt, security_group_id, host): self.cast(ctxt, self.make_msg('refresh_security_group_rules', security_group_id=security_group_id), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def refresh_security_group_members(self, ctxt, security_group_id, host): self.cast(ctxt, self.make_msg('refresh_security_group_members', security_group_id=security_group_id), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def remove_aggregate_host(self, ctxt, aggregate_id, host_param, host): '''Remove aggregate host. @@ -256,57 +259,59 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy): ''' self.cast(ctxt, self.make_msg('remove_aggregate_host', aggregate_id=aggregate_id, host=host_param), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def remove_fixed_ip_from_instance(self, ctxt, instance, address): self.cast(ctxt, self.make_msg('remove_fixed_ip_from_instance', instance_uuid=instance['uuid'], address=address), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def remove_volume_connection(self, ctxt, instance, volume_id, host): return self.call(ctxt, self.make_msg('remove_volume_connection', instance_id=instance['id'], volume_id=volume_id), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def rescue_instance(self, ctxt, instance, rescue_password): self.cast(ctxt, self.make_msg('rescue_instance', instance_uuid=instance['uuid'], rescue_password=rescue_password), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def reset_network(self, ctxt, instance): self.cast(ctxt, self.make_msg('reset_network', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def resize_instance(self, ctxt, instance, migration_id, image): + topic = _compute_topic(self.topic, ctxt, None, instance) self.cast(ctxt, self.make_msg('resize_instance', instance_uuid=instance['uuid'], migration_id=migration_id, - image=image), topic=self._compute_topic(ctxt, None, instance)) + image=image), topic) def resume_instance(self, ctxt, instance): self.cast(ctxt, self.make_msg('resume_instance', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def revert_resize(self, ctxt, instance, migration_id, host): self.cast(ctxt, self.make_msg('revert_resize', instance_uuid=instance['uuid'], migration_id=migration_id), - topic=self._compute_topic(ctxt, host, instance)) + topic=_compute_topic(self.topic, ctxt, host, instance)) def rollback_live_migration_at_destination(self, ctxt, instance, host): self.cast(ctxt, self.make_msg('rollback_live_migration_at_destination', instance_id=instance['id']), - topic=self._compute_topic(ctxt, host, None)) + topic=_compute_topic(self.topic, ctxt, host, None)) def set_admin_password(self, ctxt, instance, new_pass): self.cast(ctxt, self.make_msg('set_admin_password', instance_uuid=instance['uuid'], new_pass=new_pass), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def set_host_enabled(self, ctxt, enabled, host): + topic = _compute_topic(self.topic, ctxt, host, None) return self.call(ctxt, self.make_msg('set_host_enabled', - enabled=enabled), topic=self._compute_topic(ctxt, host, None)) + enabled=enabled), topic) def snapshot_instance(self, ctxt, instance, image_id, image_type, backup_type, rotation): @@ -314,40 +319,66 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy): instance_uuid=instance['uuid'], image_id=image_id, image_type=image_type, backup_type=backup_type, rotation=rotation), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def start_instance(self, ctxt, instance): self.cast(ctxt, self.make_msg('start_instance', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def stop_instance(self, ctxt, instance, cast=True): rpc_method = self.cast if cast else self.call return rpc_method(ctxt, self.make_msg('stop_instance', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def suspend_instance(self, ctxt, instance): self.cast(ctxt, self.make_msg('suspend_instance', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def terminate_instance(self, ctxt, instance): self.cast(ctxt, self.make_msg('terminate_instance', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def unlock_instance(self, ctxt, instance): self.cast(ctxt, self.make_msg('unlock_instance', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def unpause_instance(self, ctxt, instance): self.cast(ctxt, self.make_msg('unpause_instance', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) def unrescue_instance(self, ctxt, instance): self.cast(ctxt, self.make_msg('unrescue_instance', instance_uuid=instance['uuid']), - topic=self._compute_topic(ctxt, None, instance)) + topic=_compute_topic(self.topic, ctxt, None, instance)) + + +class SecurityGroupAPI(nova.rpc.proxy.RpcProxy): + '''Client side of the security group rpc API. + + API version history: + + 1.0 - Initial version. + ''' + + RPC_API_VERSION = '1.0' + + def __init__(self): + super(SecurityGroupAPI, self).__init__(topic=FLAGS.compute_topic, + default_version=self.RPC_API_VERSION) + + def refresh_security_group_rules(self, ctxt, security_group_id, host): + self.cast(ctxt, self.make_msg('refresh_security_group_rules', + security_group_id=security_group_id), + topic=_compute_topic(self.topic, ctxt, host, None)) + + def refresh_security_group_members(self, ctxt, security_group_id, + host): + self.cast(ctxt, self.make_msg('refresh_security_group_members', + security_group_id=security_group_id), + topic=_compute_topic(self.topic, ctxt, host, None)) -- cgit