summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--nova/api/openstack/compute/contrib/security_groups.py57
-rw-r--r--nova/compute/api.py4
-rwxr-xr-xnova/compute/manager.py20
-rw-r--r--nova/exception.py10
-rw-r--r--nova/network/api.py2
-rw-r--r--nova/network/quantumv2/api.py51
-rw-r--r--nova/network/security_group/openstack_driver.py13
-rw-r--r--nova/network/security_group/quantum_driver.py398
-rw-r--r--nova/network/security_group/security_group_base.py3
-rw-r--r--nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py639
-rw-r--r--nova/tests/api/openstack/compute/contrib/test_security_groups.py191
-rw-r--r--nova/tests/compute/test_compute.py6
-rw-r--r--nova/utils.py9
13 files changed, 1317 insertions, 86 deletions
diff --git a/nova/api/openstack/compute/contrib/security_groups.py b/nova/api/openstack/compute/contrib/security_groups.py
index 3f48176cc..092e89b9b 100644
--- a/nova/api/openstack/compute/contrib/security_groups.py
+++ b/nova/api/openstack/compute/contrib/security_groups.py
@@ -16,8 +16,10 @@
"""The security groups extension."""
+import json
import webob
from webob import exc
+from xml.dom import minidom
from nova.api.openstack import common
from nova.api.openstack import extensions
@@ -28,6 +30,7 @@ from nova.compute import api as compute_api
from nova import db
from nova import exception
from nova.network.security_group import openstack_driver
+from nova.network.security_group import quantum_driver
from nova.openstack.common import log as logging
from nova import utils
from nova.virt import netutils
@@ -194,8 +197,8 @@ class SecurityGroupControllerBase(object):
if rule['group_id']:
source_group = self.security_group_api.get(context,
id=rule['group_id'])
- sg_rule['group'] = {'name': source_group.name,
- 'tenant_id': source_group.project_id}
+ sg_rule['group'] = {'name': source_group.get('name'),
+ 'tenant_id': source_group.get('project_id')}
else:
sg_rule['ip_range'] = {'cidr': rule['cidr']}
return sg_rule
@@ -466,11 +469,46 @@ class SecurityGroupsOutputController(wsgi.Controller):
def _extend_servers(self, req, servers):
key = "security_groups"
- for server in servers:
- instance = req.get_db_instance(server['id'])
- groups = instance.get(key)
- if groups:
- server[key] = [{"name": group["name"]} for group in groups]
+ if not openstack_driver.is_quantum_security_groups():
+ for server in servers:
+ instance = req.get_db_instance(server['id'])
+ groups = instance.get(key)
+ if groups:
+ server[key] = [{"name": group["name"]} for group in groups]
+ else:
+ # If method is a POST we get the security groups intended for an
+ # instance from the request. The reason for this is if using
+ # quantum security groups the requested security groups for the
+ # instance are not in the db and have not been sent to quantum yet.
+ instance_sgs = []
+ if req.method != 'POST':
+ for server in servers:
+ instance_sgs = (
+ self.security_group_api.get_instance_security_groups(
+ req, server['id']))
+ else:
+ try:
+ # try converting to json
+ req_obj = json.loads(req.body)
+ # Add security group to server, if no security group was in
+ # request add default since that is the group it is part of
+ instance_sgs = req_obj['server'].get(
+ key, [{'name': 'default'}])
+ except ValueError:
+ root = minidom.parseString(req.body)
+ sg_root = root.getElementsByTagName(key)
+ if sg_root:
+ security_groups = sg_root[0].getElementsByTagName(
+ 'security_group')
+ for security_group in security_groups:
+ instance_sgs.append(
+ {'name': security_group.getAttribute('name')})
+ if not instance_sgs:
+ instance_sgs = [{'name': 'default'}]
+
+ if instance_sgs:
+ for server in servers:
+ server[key] = instance_sgs
def _show(self, req, resp_obj):
if not softauth(req.environ['nova.context']):
@@ -587,3 +625,8 @@ class NativeSecurityGroupExceptions(object):
class NativeNovaSecurityGroupAPI(compute_api.SecurityGroupAPI,
NativeSecurityGroupExceptions):
pass
+
+
+class NativeQuantumSecurityGroupAPI(quantum_driver.SecurityGroupAPI,
+ NativeSecurityGroupExceptions):
+ pass
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 90b1e9176..cc07a998a 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -2747,6 +2747,10 @@ class SecurityGroupAPI(base.Base, security_group_base.SecurityGroupBase):
Sub-set of the Compute API related to managing security groups
and security group rules
"""
+
+ # The nova seurity group api does not use a uuid for the id.
+ id_is_uuid = False
+
def __init__(self, **kwargs):
super(SecurityGroupAPI, self).__init__(**kwargs)
self.security_group_rpcapi = compute_rpcapi.SecurityGroupAPI()
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index ed95ff8df..afeb9f02e 100755
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -57,6 +57,7 @@ from nova.image import glance
from nova import manager
from nova import network
from nova.network import model as network_model
+from nova.network.security_group import openstack_driver
from nova.openstack.common import excutils
from nova.openstack.common import jsonutils
from nova.openstack.common import lockutils
@@ -332,7 +333,8 @@ class ComputeManager(manager.SchedulerDependentManager):
self.compute_rpcapi = compute_rpcapi.ComputeAPI()
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
self.conductor_api = conductor.API()
-
+ self.is_quantum_security_groups = (
+ openstack_driver.is_quantum_security_groups())
super(ComputeManager, self).__init__(service_name="compute",
*args, **kwargs)
@@ -716,6 +718,13 @@ class ComputeManager(manager.SchedulerDependentManager):
"""Launch a new instance with specified options."""
context = context.elevated()
+ # If quantum security groups pass requested security
+ # groups to allocate_for_instance()
+ if request_spec and self.is_quantum_security_groups:
+ security_groups = request_spec.get('security_group')
+ else:
+ security_groups = []
+
try:
self._check_instance_exists(context, instance)
image_meta = self._check_image_size(context, instance)
@@ -747,7 +756,7 @@ class ComputeManager(manager.SchedulerDependentManager):
macs = self.driver.macs_for_instance(instance)
network_info = self._allocate_network(context, instance,
- requested_networks, macs)
+ requested_networks, macs, security_groups)
self._instance_update(
context, instance['uuid'],
@@ -982,7 +991,8 @@ class ComputeManager(manager.SchedulerDependentManager):
expected_task_state=(task_states.SCHEDULING,
None))
- def _allocate_network(self, context, instance, requested_networks, macs):
+ def _allocate_network(self, context, instance, requested_networks, macs,
+ security_groups):
"""Allocate networks for an instance and return the network info."""
instance = self._instance_update(context, instance['uuid'],
vm_state=vm_states.BUILDING,
@@ -994,7 +1004,9 @@ class ComputeManager(manager.SchedulerDependentManager):
network_info = self.network_api.allocate_for_instance(
context, instance, vpn=is_vpn,
requested_networks=requested_networks,
- macs=macs, conductor_api=self.conductor_api)
+ macs=macs,
+ conductor_api=self.conductor_api,
+ security_groups=security_groups)
except Exception:
LOG.exception(_('Instance failed network setup'),
instance=instance)
diff --git a/nova/exception.py b/nova/exception.py
index fd0122835..13e007ecf 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -736,6 +736,16 @@ class SecurityGroupDefaultRuleNotFound(Invalid):
message = _("Security group default rule (%rule_id)s not found.")
+class SecurityGroupCannotBeApplied(Invalid):
+ message = _("Network requires port_security_enabled and subnet associated"
+ " in order to apply security groups.")
+
+
+class NoUniqueMatch(NovaException):
+ message = _("No Unique Match Found.")
+ code = 409
+
+
class MigrationNotFound(NotFound):
message = _("Migration %(migration_id)s could not be found.")
diff --git a/nova/network/api.py b/nova/network/api.py
index f89cfb8de..4f0a2bffa 100644
--- a/nova/network/api.py
+++ b/nova/network/api.py
@@ -244,7 +244,7 @@ class API(base.Base):
@refresh_cache
def allocate_for_instance(self, context, instance, vpn,
requested_networks, macs=None,
- conductor_api=None):
+ conductor_api=None, security_groups=None):
"""Allocates all network structures for an instance.
TODO(someone): document the rest of these parameters.
diff --git a/nova/network/quantumv2/api.py b/nova/network/quantumv2/api.py
index 450445063..a177c1ac0 100644
--- a/nova/network/quantumv2/api.py
+++ b/nova/network/quantumv2/api.py
@@ -27,6 +27,7 @@ from nova import exception
from nova.network import api as network_api
from nova.network import model as network_model
from nova.network import quantumv2
+from nova.network.security_group import openstack_driver
from nova.openstack.common import excutils
from nova.openstack.common import log as logging
from nova.openstack.common import uuidutils
@@ -83,6 +84,7 @@ class API(base.Base):
"""API for interacting with the quantum 2.x API."""
conductor_api = conductor.API()
+ security_group_api = openstack_driver.get_openstack_security_group_driver()
def __init__(self):
super(API, self).__init__()
@@ -175,9 +177,55 @@ class API(base.Base):
nets = self._get_available_networks(context, instance['project_id'],
net_ids)
+ security_groups = kwargs.get('security_groups', [])
+ security_group_ids = []
+
+ # TODO(arosen) Should optimize more to do direct query for security
+ # group if len(security_groups) == 1
+ if len(security_groups):
+ search_opts = {'tenant_id': instance['project_id']}
+ user_security_groups = quantum.list_security_groups(
+ **search_opts).get('security_groups')
+
+ for security_group in security_groups:
+ name_match = None
+ uuid_match = None
+ for user_security_group in user_security_groups:
+ if user_security_group['name'] == security_group:
+ if name_match:
+ msg = (_("Multiple security groups found matching"
+ " '%s'. Use an ID to be more specific."),
+ security_group)
+ raise exception.NoUniqueMatch(msg)
+ name_match = user_security_group['id']
+ if user_security_group['id'] == security_group:
+ uuid_match = user_security_group['id']
+
+ # If a user names the security group the same as
+ # another's security groups uuid, the name takes priority.
+ if not name_match and not uuid_match:
+ raise exception.SecurityGroupNotFound(
+ security_group_id=security_group)
+ security_group_ids.append(name_match)
+ elif name_match:
+ security_group_ids.append(name_match)
+ elif uuid_match:
+ security_group_ids.append(uuid_match)
+
touched_port_ids = []
created_port_ids = []
for network in nets:
+ # If security groups are requested on an instance then the
+ # network must has a subnet associated with it. Some plugins
+ # implement the port-security extension which requires
+ # 'port_security_enabled' to be True for security groups.
+ # That is why True is returned if 'port_security_enabled'
+ # is not found.
+ if (security_groups and not (
+ network['subnets']
+ and network.get('port_security_enabled', True))):
+
+ raise exception.SecurityGroupCannotBeApplied()
network_id = network['id']
zone = 'compute:%s' % instance['availability_zone']
port_req_body = {'port': {'device_id': instance['uuid'],
@@ -194,6 +242,9 @@ class API(base.Base):
port_req_body['port']['network_id'] = network_id
port_req_body['port']['admin_state_up'] = True
port_req_body['port']['tenant_id'] = instance['project_id']
+ if security_group_ids:
+ port_req_body['port']['security_groups'] = (
+ security_group_ids)
if available_macs is not None:
if not available_macs:
raise exception.PortNotFree(
diff --git a/nova/network/security_group/openstack_driver.py b/nova/network/security_group/openstack_driver.py
index 91ce69891..46f3f3491 100644
--- a/nova/network/security_group/openstack_driver.py
+++ b/nova/network/security_group/openstack_driver.py
@@ -35,12 +35,25 @@ CONF.register_opts(security_group_opts)
NOVA_DRIVER = ('nova.api.openstack.compute.contrib.security_groups.'
'NativeNovaSecurityGroupAPI')
+QUANTUM_DRIVER = ('nova.api.openstack.compute.contrib.security_groups.'
+ 'NativeQuantumSecurityGroupAPI')
def get_openstack_security_group_driver():
if CONF.security_group_api.lower() == 'nova':
return importutils.import_object(NOVA_DRIVER)
+ elif CONF.security_group_api.lower() == 'quantum':
+ return importutils.import_object(QUANTUM_DRIVER)
+ else:
+ return importutils.import_object(CONF.security_group_api)
def get_security_group_handler():
return importutils.import_object(CONF.security_group_handler)
+
+
+def is_quantum_security_groups():
+ if CONF.security_group_api.lower() == "quantum":
+ return True
+ else:
+ return False
diff --git a/nova/network/security_group/quantum_driver.py b/nova/network/security_group/quantum_driver.py
new file mode 100644
index 000000000..918c839e9
--- /dev/null
+++ b/nova/network/security_group/quantum_driver.py
@@ -0,0 +1,398 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Nicira, Inc.
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Aaron Rosen, Nicira Networks, Inc.
+
+from oslo.config import cfg
+from quantumclient.common import exceptions as q_exc
+from quantumclient.quantum import v2_0 as quantumv20
+from webob import exc
+
+from nova.compute import api as compute_api
+from nova import context
+from nova import exception
+from nova.network import quantumv2
+from nova.network.security_group import security_group_base
+from nova.openstack.common import log as logging
+from nova.openstack.common import uuidutils
+
+
+from nova import utils
+
+
+wrap_check_security_groups_policy = compute_api.policy_decorator(
+ scope='compute:security_groups')
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class SecurityGroupAPI(security_group_base.SecurityGroupBase):
+
+ id_is_uuid = True
+
+ def create_security_group(self, context, name, description):
+ quantum = quantumv2.get_client(context)
+ body = self._make_quantum_security_group_dict(name, description)
+ try:
+ security_group = quantum.create_security_group(
+ body).get('security_group')
+ except q_exc.QuantumClientException as e:
+ LOG.exception(_("Quantum Error creating security group %s"),
+ name)
+ if e.status_code == 401:
+ # TODO(arosen) Cannot raise generic response from quantum here
+ # as this error code could be related to bad input or over
+ # quota
+ raise exc.HTTPBadRequest()
+ raise e
+ return self._convert_to_nova_security_group_format(security_group)
+
+ def _convert_to_nova_security_group_format(self, security_group):
+ nova_group = {}
+ nova_group['id'] = security_group['id']
+ nova_group['description'] = security_group['description']
+ nova_group['name'] = security_group['name']
+ nova_group['project_id'] = security_group['tenant_id']
+ nova_group['rules'] = []
+ for rule in security_group.get('security_group_rules', []):
+ if rule['direction'] == 'ingress':
+ nova_group['rules'].append(
+ self._convert_to_nova_security_group_rule_format(rule))
+
+ return nova_group
+
+ def _convert_to_nova_security_group_rule_format(self, rule):
+ nova_rule = {}
+ nova_rule['id'] = rule['id']
+ nova_rule['parent_group_id'] = rule['security_group_id']
+ nova_rule['protocol'] = rule['protocol']
+ if rule['port_range_min'] is None:
+ nova_rule['from_port'] = -1
+ else:
+ nova_rule['from_port'] = rule['port_range_min']
+
+ if rule['port_range_max'] is None:
+ nova_rule['to_port'] = -1
+ else:
+ nova_rule['to_port'] = rule['port_range_max']
+ nova_rule['group_id'] = rule['source_group_id']
+ nova_rule['cidr'] = rule['source_ip_prefix']
+ return nova_rule
+
+ def get(self, context, name=None, id=None, map_exception=False):
+ quantum = quantumv2.get_client(context)
+ try:
+ group = quantum.show_security_group(id).get('security_group')
+ except q_exc.QuantumClientException as e:
+ if e.status_code == 404:
+ LOG.exception(_("Quantum Error getting security group %s"),
+ name)
+ self.raise_not_found(e.message)
+ else:
+ LOG.error(_("Quantum Error: %s"), e)
+ raise e
+
+ return self._convert_to_nova_security_group_format(group)
+
+ def list(self, context, names=None, ids=None, project=None,
+ search_opts=None):
+ """Returns list of security group rules owned by tenant."""
+ quantum = quantumv2.get_client(context)
+ try:
+ security_groups = quantum.list_security_groups().get(
+ 'security_groups')
+ except q_exc.QuantumClientException as e:
+ LOG.exception(_("Quantum Error getting security groups"))
+ raise e
+ converted_rules = []
+ for security_group in security_groups:
+ converted_rules.append(
+ self._convert_to_nova_security_group_format(security_group))
+ return converted_rules
+
+ def validate_id(self, id):
+ if not uuidutils.is_uuid_like(id):
+ msg = _("Security group id should be uuid")
+ self.raise_invalid_property(msg)
+ return id
+
+ def destroy(self, context, security_group):
+ """This function deletes a security group."""
+
+ quantum = quantumv2.get_client(context)
+ try:
+ quantum.delete_security_group(security_group['id'])
+ except q_exc.QuantumClientException as e:
+ if e.status_code == 404:
+ self.raise_not_found(e.message)
+ elif e.status_code == 409:
+ self.raise_invalid_property(e.message)
+ else:
+ LOG.error(_("Quantum Error: %s"), e)
+ raise e
+
+ def add_rules(self, context, id, name, vals):
+ """Add security group rule(s) to security group.
+
+ Note: the Nova security group API doesn't support adding muliple
+ security group rules at once but the EC2 one does. Therefore,
+ this function is writen to support both. Multiple rules are
+ installed to a security group in quantum using bulk support."""
+
+ quantum = quantumv2.get_client(context)
+ body = self._make_quantum_security_group_rules_list(vals)
+ try:
+ rules = quantum.create_security_group_rule(
+ body).get('security_group_rules')
+ except q_exc.QuantumClientException as e:
+ if e.status_code == 409:
+ LOG.exception(_("Quantum Error getting security group %s"),
+ name)
+ self.raise_not_found(e.message)
+ else:
+ LOG.exception(_("Quantum Error:"))
+ raise e
+ converted_rules = []
+ for rule in rules:
+ converted_rules.append(
+ self._convert_to_nova_security_group_rule_format(rule))
+ return converted_rules
+
+ def _make_quantum_security_group_dict(self, name, description):
+ return {'security_group': {'name': name,
+ 'description': description}}
+
+ def _make_quantum_security_group_rules_list(self, rules):
+ new_rules = []
+ for rule in rules:
+ new_rule = {}
+ # nova only supports ingress rules so all rules are ingress.
+ new_rule['direction'] = "ingress"
+ new_rule['protocol'] = rule.get('protocol')
+
+ # FIXME(arosen) Nova does not expose ethertype on security group
+ # rules. Therefore, in the case of self referential rules we
+ # should probably assume they want to allow both IPv4 and IPv6.
+ # Unfortunately, this would require adding two rules in quantum.
+ # The reason we do not do this is because when the user using the
+ # nova api wants to remove the rule we'd have to have some way to
+ # know that we should delete both of these rules in quantum.
+ # For now, self referential rules only support IPv4.
+ if not rule.get('cidr'):
+ new_rule['ethertype'] = 'IPv4'
+ else:
+ new_rule['ethertype'] = utils.get_ip_version(rule.get('cidr'))
+ new_rule['source_ip_prefix'] = rule.get('cidr')
+ new_rule['security_group_id'] = rule.get('parent_group_id')
+ new_rule['source_group_id'] = rule.get('group_id')
+ if rule['from_port'] != -1:
+ new_rule['port_range_min'] = rule['from_port']
+ if rule['to_port'] != -1:
+ new_rule['port_range_max'] = rule['to_port']
+ new_rules.append(new_rule)
+ return {'security_group_rules': new_rules}
+
+ def create_security_group_rule(self, context, security_group, new_rule):
+ return self.add_rules(context, new_rule['parent_group_id'],
+ security_group['name'], [new_rule])[0]
+
+ def remove_rules(self, context, security_group, rule_ids):
+ quantum = quantumv2.get_client(context)
+ rule_ids = set(rule_ids)
+ try:
+ # The ec2 api allows one to delete multiple security group rules
+ # at once. Since there is no bulk delete for quantum the best
+ # thing we can do is delete the rules one by one and hope this
+ # works.... :/
+ for rule_id in range(0, len(rule_ids)):
+ quantum.delete_security_group_rule(rule_ids.pop())
+ except q_exc.QuantumClientException as e:
+ LOG.exception(_("Quantum Error unable to delete %s"),
+ rule_ids)
+ raise e
+
+ def get_rule(self, context, id):
+ quantum = quantumv2.get_client(context)
+ try:
+ rule = quantum.show_security_group_rule(
+ id).get('security_group_rule')
+ except q_exc.QuantumClientException as e:
+ if e.status_code == 404:
+ LOG.exception(_("Quantum Error getting security group rule "
+ "%s.") % id)
+ self.raise_not_found(e.message)
+ else:
+ LOG.error(_("Quantum Error: %s"), e)
+ raise e
+ return self._convert_to_nova_security_group_rule_format(rule)
+
+ def get_instance_security_groups(self, req, instance_id):
+ dict_security_groups = {}
+ security_group_name_map = {}
+ admin_context = context.get_admin_context()
+
+ quantum = quantumv2.get_client(admin_context)
+ params = {'device_id': instance_id}
+ ports = quantum.list_ports(**params)
+ security_groups = quantum.list_security_groups().get('security_groups')
+
+ for security_group in security_groups:
+ name = security_group.get('name')
+ # Since the name is optional for quantum security groups
+ if not name:
+ name = security_group['id']
+ security_group_name_map[security_group['id']] = name
+
+ for port in ports['ports']:
+ for security_group in port.get('security_groups', []):
+ try:
+ dict_security_groups[security_group] = (
+ security_group_name_map[security_group])
+ except KeyError:
+ # If this should only happen due to a race condition
+ # if the security group on a port was deleted after the
+ # ports were returned. We pass since this security group
+ # is no longer on the port.
+ pass
+ ret = []
+ for security_group in dict_security_groups.values():
+ ret.append({'name': security_group})
+ return ret
+
+ def _has_security_group_requirements(self, port):
+ port_security_enabled = port.get('port_security_enabled')
+ has_ip = port.get('fixed_ips')
+ if port_security_enabled and has_ip:
+ return True
+ else:
+ return False
+
+ @wrap_check_security_groups_policy
+ def add_to_instance(self, context, instance, security_group_name):
+ """Add security group to the instance."""
+
+ quantum = quantumv2.get_client(context)
+ try:
+ security_group_id = quantumv20.find_resourceid_by_name_or_id(
+ quantum, 'security_group', security_group_name)
+ except q_exc.QuantumClientException as e:
+ if e.status_code == 404:
+ msg = ("Security group %s is not found for project %s" %
+ (security_group_name, context.project_id))
+ self.raise_not_found(msg)
+ else:
+ LOG.exception(_("Quantum Error:"))
+ raise e
+ params = {'device_id': instance['uuid']}
+ try:
+ ports = quantum.list_ports(**params).get('ports')
+ except q_exc.QuantumClientException as e:
+ LOG.exception(_("Quantum Error:"))
+ raise e
+
+ if not ports:
+ msg = ("instance_id %s could not be found as device id on"
+ " any ports" % instance['uuid'])
+ self.raise_not_found(msg)
+
+ for port in ports:
+ if not self._has_security_group_requirements(port):
+ LOG.warn(_("Cannot add security group %(name)s to %(instance)s"
+ " since the port %(port_id)s does not meet security"
+ " requirements"), {'name': security_group_name,
+ 'instance': instance['uuid'], 'port_id': port['id']})
+ raise exception.SecurityGroupCannotBeApplied()
+ if 'security_groups' not in port:
+ port['security_groups'] = []
+ port['security_groups'].append(security_group_id)
+ updated_port = {'security_groups': port['security_groups']}
+ try:
+ LOG.info(_("Adding security group %(security_group_id)s to "
+ "port %(port_id)s"),
+ {'security_group_id': security_group_id,
+ 'port_id': port['id']})
+ quantum.update_port(port['id'], {'port': updated_port})
+ except Exception:
+ LOG.exception(_("Quantum Error:"))
+ raise
+
+ @wrap_check_security_groups_policy
+ def remove_from_instance(self, context, instance, security_group_name):
+ """Remove the security group associated with the instance."""
+ quantum = quantumv2.get_client(context)
+ try:
+ security_group_id = quantumv20.find_resourceid_by_name_or_id(
+ quantum, 'security_group', security_group_name)
+ except q_exc.QuantumClientException as e:
+ if e.status_code == 404:
+ msg = ("Security group %s is not found for project %s" %
+ (security_group_name, context.project_id))
+ self.raise_not_found(msg)
+ else:
+ LOG.exception(_("Quantum Error:"))
+ raise e
+ params = {'device_id': instance['uuid']}
+ try:
+ ports = quantum.list_ports(**params).get('ports')
+ except q_exc.QuantumClientException as e:
+ LOG.exception(_("Quantum Error:"))
+ raise e
+
+ if not ports:
+ msg = ("instance_id %s could not be found as device id on"
+ " any ports" % instance['uuid'])
+ self.raise_not_found(msg)
+
+ found_security_group = False
+ for port in ports:
+ try:
+ port.get('security_groups', []).remove(security_group_id)
+ except ValueError:
+ # When removing a security group from an instance the security
+ # group should be on both ports since it was added this way if
+ # done through the nova api. In case it is not a 404 is only
+ # raised if the security group is not found on any of the
+ # ports on the instance.
+ continue
+
+ updated_port = {'security_groups': port['security_groups']}
+ try:
+ LOG.info(_("Adding security group %(security_group_id)s to "
+ "port %(port_id)s"),
+ {'security_group_id': security_group_id,
+ 'port_id': port['id']})
+ quantum.update_port(port['id'], {'port': updated_port})
+ found_security_group = True
+ except Exception:
+ LOG.exception(_("Quantum Error:"))
+ raise e
+ if not found_security_group:
+ msg = (_("Security group %(security_group_name)s not assocaited "
+ "with the instance %(instance)s"),
+ {'security_group_name': security_group_name,
+ 'instance': instance['uuid']})
+ self.raise_not_found(msg)
+
+ def rule_exists(self, security_group, new_rule):
+ # Handled by quantum
+ pass
+
+ def populate_security_groups(self, instance, security_groups):
+ # Setting to emply list since we do not want to populate this field
+ # in the nova database if using the quantum driver
+ instance['security_groups'] = []
diff --git a/nova/network/security_group/security_group_base.py b/nova/network/security_group/security_group_base.py
index e6abf988a..499f808b1 100644
--- a/nova/network/security_group/security_group_base.py
+++ b/nova/network/security_group/security_group_base.py
@@ -191,3 +191,6 @@ class SecurityGroupBase(object):
def remove_from_instance(self, context, instance, security_group_name):
raise NotImplementedError()
+
+ def rule_exists(self, security_group, new_rule):
+ raise NotImplementedError()
diff --git a/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py b/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py
new file mode 100644
index 000000000..e32fadbb8
--- /dev/null
+++ b/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py
@@ -0,0 +1,639 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Nicira, Inc.
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Aaron Rosen, Nicira Networks, Inc.
+
+import uuid
+
+from lxml import etree
+from oslo.config import cfg
+import webob
+
+from nova.api.openstack.compute.contrib import security_groups
+from nova.api.openstack import xmlutil
+from nova import compute
+from nova import context
+import nova.db
+from nova import exception
+from nova.network import quantumv2
+from nova.network.quantumv2 import api as quantum_api
+from nova.openstack.common import jsonutils
+from nova import test
+from nova.tests.api.openstack.compute.contrib import test_security_groups
+from nova.tests.api.openstack import fakes
+from quantumclient.common import exceptions as q_exc
+
+
+class TestQuantumSecurityGroupsTestCase(test.TestCase):
+ def setUp(self):
+ super(TestQuantumSecurityGroupsTestCase, self).setUp()
+ cfg.CONF.set_override('security_group_api', 'quantum')
+ self.original_client = quantumv2.get_client
+ quantumv2.get_client = get_client
+
+ def tearDown(self):
+ quantumv2.get_client = self.original_client
+ get_client()._reset()
+ super(TestQuantumSecurityGroupsTestCase, self).tearDown()
+
+
+class TestQuantumSecurityGroups(
+ test_security_groups.TestSecurityGroups,
+ TestQuantumSecurityGroupsTestCase):
+
+ def _create_sg_template(self, **kwargs):
+ sg = test_security_groups.security_group_template(**kwargs)
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
+ return self.controller.create(req, {'security_group': sg})
+
+ def _create_network(self):
+ body = {'network': {'name': 'net1'}}
+ quantum = get_client()
+ net = quantum.create_network(body)
+ body = {'subnet': {'network_id': net['network']['id'],
+ 'cidr': '10.0.0.0/24'}}
+ quantum.create_subnet(body)
+ return net
+
+ def _create_port(self, **kwargs):
+ body = {'port': {}}
+ fields = ['security_groups', 'device_id', 'network_id',
+ 'port_security_enabled']
+ for field in fields:
+ if field in kwargs:
+ body['port'][field] = kwargs[field]
+ quantum = get_client()
+ return quantum.create_port(body)
+
+ def test_create_security_group_with_no_description(self):
+ # Quantum's security group descirption field is optional.
+ pass
+
+ def test_create_security_group_with_blank_name(self):
+ # Quantum's security group name field is optional.
+ pass
+
+ def test_create_security_group_with_whitespace_name(self):
+ # Quantum allows security group name to be whitespace.
+ pass
+
+ def test_create_security_group_with_blank_description(self):
+ # Quantum's security group descirption field is optional.
+ pass
+
+ def test_create_security_group_with_whitespace_description(self):
+ # Quantum allows description to be whitespace.
+ pass
+
+ def test_create_security_group_with_duplicate_name(self):
+ # Quantum allows duplicate names for security groups.
+ pass
+
+ def test_create_security_group_non_string_name(self):
+ # Quantum allows security group name to be non string.
+ pass
+
+ def test_create_security_group_non_string_description(self):
+ # Quantum allows non string description.
+ pass
+
+ def test_create_security_group_quota_limit(self):
+ # Enforced by Quantum server.
+ pass
+
+ def test_get_security_group_list(self):
+ self._create_sg_template().get('security_group')
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
+ list_dict = self.controller.index(req)
+ self.assertEquals(len(list_dict['security_groups']), 2)
+
+ def test_get_security_group_list_all_tenants(self):
+ pass
+
+ def test_get_security_group_by_instance(self):
+ pass
+
+ def test_get_security_group_by_instance_non_existing(self):
+ pass
+
+ def test_get_security_group_by_id(self):
+ sg = self._create_sg_template().get('security_group')
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s'
+ % sg['id'])
+ res_dict = self.controller.show(req, sg['id'])
+ expected = {'security_group': sg}
+ self.assertEquals(res_dict, expected)
+
+ def test_delete_security_group_by_id(self):
+ sg = self._create_sg_template().get('security_group')
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s' %
+ sg['id'])
+ self.controller.delete(req, sg['id'])
+
+ def test_delete_security_group_in_use(self):
+ sg = self._create_sg_template().get('security_group')
+ self._create_network()
+ fake_instance = {'project_id': 'fake_tenant',
+ 'availability_zone': 'zone_one',
+ 'security_groups': [],
+ 'uuid': str(uuid.uuid4()),
+ 'display_name': 'test_instance'}
+ quantum = quantum_api.API()
+ quantum.allocate_for_instance(context.get_admin_context(),
+ fake_instance,
+ security_groups=[sg['id']])
+
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s'
+ % sg['id'])
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
+ req, sg['id'])
+
+ def test_associate_non_running_instance(self):
+ # Quantum does not care if the instance is running or not. When the
+ # instances is detected by quantum it will push down the security
+ # group policy to it.
+ pass
+
+ def test_associate_already_associated_security_group_to_instance(self):
+ # Quantum security groups does not raise an error if you update a
+ # port adding a security group to it that was already associated
+ # to the port. This is because PUT semantics are used.
+ pass
+
+ def test_associate(self):
+ sg = self._create_sg_template().get('security_group')
+ net = self._create_network()
+ self._create_port(
+ network_id=net['network']['id'], security_groups=[sg['id']],
+ device_id=test_security_groups.FAKE_UUID)
+
+ self.stubs.Set(nova.db, 'instance_get',
+ test_security_groups.return_server)
+ self.stubs.Set(nova.db, 'instance_get_by_uuid',
+ test_security_groups.return_server_by_uuid)
+ body = dict(addSecurityGroup=dict(name="test"))
+
+ req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ self.manager._addSecurityGroup(req, '1', body)
+
+ def test_disassociate_by_non_existing_security_group_name(self):
+ self.stubs.Set(nova.db, 'instance_get',
+ test_security_groups.return_server)
+ body = dict(removeSecurityGroup=dict(name='non-existing'))
+
+ req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.manager._removeSecurityGroup, req, '1', body)
+
+ def test_disassociate_non_running_instance(self):
+ # Quantum does not care if the instance is running or not. When the
+ # instances is detected by quantum it will push down the security
+ # group policy to it.
+ pass
+
+ def test_disassociate_already_associated_security_group_to_instance(self):
+ # Quantum security groups does not raise an error if you update a
+ # port adding a security group to it that was already associated
+ # to the port. This is because PUT semantics are used.
+ pass
+
+ def test_disassociate(self):
+ sg = self._create_sg_template().get('security_group')
+ net = self._create_network()
+ self._create_port(
+ network_id=net['network']['id'], security_groups=[sg['id']],
+ device_id=test_security_groups.FAKE_UUID)
+
+ self.stubs.Set(nova.db, 'instance_get',
+ test_security_groups.return_server)
+ self.stubs.Set(nova.db, 'instance_get_by_uuid',
+ test_security_groups.return_server_by_uuid)
+ body = dict(removeSecurityGroup=dict(name="test"))
+
+ req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ self.manager._removeSecurityGroup(req, '1', body)
+
+
+class TestQuantumSecurityGroupRulesTestCase(TestQuantumSecurityGroupsTestCase):
+ def setUp(self):
+ super(TestQuantumSecurityGroupRulesTestCase, self).setUp()
+ id1 = '11111111-1111-1111-1111-111111111111'
+ sg_template1 = test_security_groups.security_group_template(
+ security_group_rules=[], id=id1)
+ id2 = '22222222-2222-2222-2222-222222222222'
+ sg_template2 = test_security_groups.security_group_template(
+ security_group_rules=[], id=id2)
+ quantum = get_client()
+ quantum._fake_security_groups[id1] = sg_template1
+ quantum._fake_security_groups[id2] = sg_template2
+
+ def tearDown(self):
+ quantumv2.get_client = self.original_client
+ get_client()._reset()
+ super(TestQuantumSecurityGroupsTestCase, self).tearDown()
+
+
+class TestQuantumSecurityGroupRules(
+ test_security_groups.TestSecurityGroupRules,
+ TestQuantumSecurityGroupRulesTestCase):
+
+ def test_create_add_existing_rules_by_cidr(self):
+ # Enforced by quantum
+ pass
+
+ def test_create_add_existing_rules_by_group_id(self):
+ # Enforced by quantum
+ pass
+
+ def test_delete(self):
+ rule = test_security_groups.security_group_rule_template(
+ parent_group_id=self.sg2['id'])
+
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
+ res_dict = self.controller.create(req, {'security_group_rule': rule})
+ security_group_rule = res_dict['security_group_rule']
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules/%s'
+ % security_group_rule['id'])
+ self.controller.delete(req, security_group_rule['id'])
+
+ def test_create_rule_quota_limit(self):
+ # Enforced by quantum
+ pass
+
+
+class TestQuantumSecurityGroupsXMLDeserializer(
+ test_security_groups.TestSecurityGroupXMLDeserializer,
+ TestQuantumSecurityGroupsTestCase):
+ pass
+
+
+class TestQuantumSecurityGroupsXMLSerializer(
+ test_security_groups.TestSecurityGroupXMLSerializer,
+ TestQuantumSecurityGroupsTestCase):
+ pass
+
+
+class TestQuantumSecurityGroupsOutputTest(TestQuantumSecurityGroupsTestCase):
+ content_type = 'application/json'
+
+ def setUp(self):
+ super(TestQuantumSecurityGroupsOutputTest, self).setUp()
+ fakes.stub_out_nw_api(self.stubs)
+ self.controller = security_groups.SecurityGroupController()
+ self.stubs.Set(compute.api.API, 'get',
+ test_security_groups.fake_compute_get)
+ self.stubs.Set(compute.api.API, 'create',
+ test_security_groups.fake_compute_create)
+ self.flags(
+ osapi_compute_extension=[
+ 'nova.api.openstack.compute.contrib.select_extensions'],
+ osapi_compute_ext_list=['Security_groups'])
+
+ def _make_request(self, url, body=None):
+ req = webob.Request.blank(url)
+ if body:
+ req.method = 'POST'
+ req.body = self._encode_body(body)
+ req.content_type = self.content_type
+ req.headers['Accept'] = self.content_type
+ res = req.get_response(fakes.wsgi_app(init_only=('servers',)))
+ return res
+
+ def _encode_body(self, body):
+ return jsonutils.dumps(body)
+
+ def _get_server(self, body):
+ return jsonutils.loads(body).get('server')
+
+ def _get_servers(self, body):
+ return jsonutils.loads(body).get('servers')
+
+ def _get_groups(self, server):
+ return server.get('security_groups')
+
+ def test_create(self):
+ url = '/v2/fake/servers'
+ image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
+ security_groups = [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]
+ for security_group in security_groups:
+ sg = test_security_groups.security_group_template(
+ name=security_group['name'])
+ self.controller.create(req, {'security_group': sg})
+
+ server = dict(name='server_test', imageRef=image_uuid, flavorRef=2,
+ security_groups=security_groups)
+ res = self._make_request(url, {'server': server})
+ self.assertEqual(res.status_int, 202)
+ server = self._get_server(res.body)
+ for i, group in enumerate(self._get_groups(server)):
+ name = 'fake-2-%s' % i
+ self.assertEqual(group.get('name'), name)
+
+ def test_show(self):
+ url = '/v2/fake/servers'
+ image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
+ security_groups = [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]
+ for security_group in security_groups:
+ sg = test_security_groups.security_group_template(
+ name=security_group['name'])
+ self.controller.create(req, {'security_group': sg})
+ server = dict(name='server_test', imageRef=image_uuid, flavorRef=2,
+ security_groups=security_groups)
+
+ res = self._make_request(url, {'server': server})
+ self.assertEqual(res.status_int, 202)
+ server = self._get_server(res.body)
+ for i, group in enumerate(self._get_groups(server)):
+ name = 'fake-2-%s' % i
+ self.assertEqual(group.get('name'), name)
+
+ def test_detail(self):
+ url = '/v2/fake/servers/detail'
+ res = self._make_request(url)
+
+ self.assertEqual(res.status_int, 200)
+ for i, server in enumerate(self._get_servers(res.body)):
+ for j, group in enumerate(self._get_groups(server)):
+ name = 'fake-%s-%s' % (i, j)
+ self.assertEqual(group.get('name'), name)
+
+ def test_no_instance_passthrough_404(self):
+
+ def fake_compute_get(*args, **kwargs):
+ raise exception.InstanceNotFound(instance_id='fake')
+
+ self.stubs.Set(compute.api.API, 'get', fake_compute_get)
+ url = '/v2/fake/servers/70f6db34-de8d-4fbd-aafb-4065bdfa6115'
+ res = self._make_request(url)
+
+ self.assertEqual(res.status_int, 404)
+
+
+class TestQuantumSecurityGroupsOutputXMLTest(
+ TestQuantumSecurityGroupsOutputTest):
+
+ content_type = 'application/xml'
+
+ class MinimalCreateServerTemplate(xmlutil.TemplateBuilder):
+ def construct(self):
+ root = xmlutil.TemplateElement('server', selector='server')
+ root.set('name')
+ root.set('id')
+ root.set('imageRef')
+ root.set('flavorRef')
+ elem = xmlutil.SubTemplateElement(root, 'security_groups')
+ sg = xmlutil.SubTemplateElement(elem, 'security_group',
+ selector='security_groups')
+ sg.set('name')
+ return xmlutil.MasterTemplate(root, 1,
+ nsmap={None: xmlutil.XMLNS_V11})
+
+ def _encode_body(self, body):
+ serializer = self.MinimalCreateServerTemplate()
+ return serializer.serialize(body)
+
+ def _get_server(self, body):
+ return etree.XML(body)
+
+ def _get_servers(self, body):
+ return etree.XML(body).getchildren()
+
+ def _get_groups(self, server):
+ # NOTE(vish): we are adding security groups without an extension
+ # namespace so we don't break people using the existing
+ # functionality, but that means we need to use find with
+ # the existing server namespace.
+ namespace = server.nsmap[None]
+ return server.find('{%s}security_groups' % namespace).getchildren()
+
+
+def get_client(context=None, admin=False):
+ return MockClient()
+
+
+class MockClient(object):
+
+ # Needs to be global to survive multiple calls to get_client.
+ _fake_security_groups = {}
+ _fake_ports = {}
+ _fake_networks = {}
+ _fake_subnets = {}
+ _fake_security_group_rules = {}
+
+ def __init__(self):
+ # add default security group
+ if not len(self._fake_security_groups):
+ ret = {'name': 'default', 'description': 'default',
+ 'tenant_id': 'fake_tenant', 'security_group_rules': [],
+ 'id': str(uuid.uuid4())}
+ self._fake_security_groups[ret['id']] = ret
+
+ def _reset(self):
+ self._fake_security_groups.clear()
+ self._fake_ports.clear()
+ self._fake_networks.clear()
+ self._fake_subnets.clear()
+ self._fake_security_group_rules.clear()
+
+ def create_security_group(self, body=None):
+ s = body.get('security_group')
+ if len(s.get('name')) > 255 or len(s.get('description')) > 255:
+ msg = 'Security Group name great than 255'
+ raise q_exc.QuantumClientException(message=msg, status_code=401)
+ ret = {'name': s.get('name'), 'description': s.get('description'),
+ 'tenant_id': 'fake_tenant', 'security_group_rules': [],
+ 'id': str(uuid.uuid4())}
+
+ self._fake_security_groups[ret['id']] = ret
+ return {'security_group': ret}
+
+ def create_network(self, body):
+ n = body.get('network')
+ ret = {'status': 'ACTIVE', 'subnets': [], 'name': n.get('name'),
+ 'admin_state_up': n.get('admin_state_up', True),
+ 'tenant_id': 'fake_tenant',
+ 'port_security_enabled': n.get('port_security_enabled', True),
+ 'id': str(uuid.uuid4())}
+ self._fake_networks[ret['id']] = ret
+ return {'network': ret}
+
+ def create_subnet(self, body):
+ s = body.get('subnet')
+ try:
+ net = self._fake_networks[s.get('network_id')]
+ except KeyError:
+ msg = 'Network %s not found' % s.get('network_id')
+ raise q_exc.QuantumClientException(message=msg, status_code=404)
+ ret = {'name': s.get('name'), 'network_id': s.get('network_id'),
+ 'tenant_id': 'fake_tenant', 'cidr': s.get('cidr'),
+ 'id': str(uuid.uuid4()), 'gateway_ip': '10.0.0.1'}
+ net['subnets'].append(ret['id'])
+ self._fake_networks[net['id']] = net
+ self._fake_subnets[ret['id']] = ret
+ return {'subnet': ret}
+
+ def create_port(self, body):
+ p = body.get('port')
+ ret = {'status': 'ACTIVE', 'id': str(uuid.uuid4()),
+ 'mac_address': p.get('mac_address', 'fa:16:3e:b8:f5:fb'),
+ 'port_security_enabled': p.get('port_security_enabled'),
+ 'device_owner': str(uuid.uuid4())}
+
+ fields = ['network_id', 'security_groups', 'admin_state_up']
+ for field in fields:
+ ret[field] = p.get(field)
+
+ network = self._fake_networks[p['network_id']]
+ if not ret['port_security_enabled']:
+ ret['port_security_enabled'] = network['port_security_enabled']
+ if network['subnets']:
+ ret['fixed_ips'] = [{'subnet_id': network['subnets'][0],
+ 'ip_address': '10.0.0.1'}]
+ if not ret['security_groups']:
+ for security_group in self._fake_security_groups.values():
+ if security_group['name'] == 'default':
+ ret['security_groups'] = [security_group['id']]
+ break
+ self._fake_ports[ret['id']] = ret
+ return {'port': ret}
+
+ def create_security_group_rule(self, body):
+ # does not handle bulk case so just picks rule[0]
+ r = body.get('security_group_rules')[0]
+ fields = ['direction', 'protocol', 'port_range_min', 'port_range_max',
+ 'ethertype', 'source_ip_prefix', 'tenant_id',
+ 'security_group_id', 'source_group_id']
+ ret = {}
+ for field in fields:
+ ret[field] = r.get(field)
+ ret['id'] = str(uuid.uuid4())
+ self._fake_security_group_rules[ret['id']] = ret
+ return {'security_group_rules': [ret]}
+
+ def show_security_group(self, security_group, **_params):
+ try:
+ return {'security_group':
+ self._fake_security_groups[security_group]}
+ except KeyError:
+ msg = 'Security Group %s not found' % security_group
+ raise q_exc.QuantumClientException(message=msg, status_code=404)
+
+ def show_security_group_rule(self, security_group_rule, **_params):
+ try:
+ return {'security_group_rule':
+ self._fake_security_group_rules[security_group_rule]}
+ except KeyError:
+ msg = 'Security Group rule %s not found' % security_group_rule
+ raise q_exc.QuantumClientException(message=msg, status_code=404)
+
+ def show_network(self, network, **_params):
+ try:
+ return {'network':
+ self._fake_networks[network]}
+ except KeyError:
+ msg = 'Network %s not found' % network
+ raise q_exc.QuantumClientException(message=msg, status_code=404)
+
+ def show_port(self, port, **_params):
+ try:
+ return {'port':
+ self._fake_ports[port]}
+ except KeyError:
+ msg = 'Port %s not found' % port
+ raise q_exc.QuantumClientException(message=msg, status_code=404)
+
+ def show_subnet(self, subnet, **_params):
+ try:
+ return {'subnet':
+ self._fake_subnets[subnet]}
+ except KeyError:
+ msg = 'Port %s not found' % subnet
+ raise q_exc.QuantumClientException(message=msg, status_code=404)
+
+ def list_security_groups(self, **_params):
+ ret = []
+ for security_group in self._fake_security_groups.values():
+ if _params.get('name'):
+ if security_group.get('name') == _params['name']:
+ ret.append(security_group)
+ else:
+ ret.append(security_group)
+ return {'security_groups': ret}
+
+ def list_networks(self, **_params):
+ return {'networks':
+ [network for network in self._fake_networks.values()]}
+
+ def list_ports(self, **_params):
+ return {'ports':
+ [port for port in self._fake_ports.values()]}
+
+ def list_subnets(self, **_params):
+ return {'subnets':
+ [subnet for subnet in self._fake_subnets.values()]}
+
+ def delete_security_group(self, security_group):
+ self.show_security_group(security_group)
+ ports = self.list_ports()
+ for port in ports.get('ports'):
+ for sg_port in port['security_groups']:
+ if sg_port == security_group:
+ msg = ('Unable to delete Security group %s in use'
+ % security_group)
+ raise q_exc.QuantumClientException(message=msg,
+ status_code=409)
+ del self._fake_security_groups[security_group]
+
+ def delete_security_group_rule(self, security_group_rule):
+ self.show_security_group_rule(security_group_rule)
+ del self._fake_security_group_rules[security_group_rule]
+
+ def delete_network(self, network):
+ self.show_network(network)
+ self._check_ports_on_network(network)
+ for subnet in self._fake_subnets.values():
+ if subnet['network_id'] == network:
+ del self._fake_subnets[subnet['id']]
+ del self._fake_networks[network]
+
+ def delete_subnet(self, subnet):
+ subnet = self.show_subnet(subnet).get('subnet')
+ self._check_ports_on_network(subnet['network_id'])
+ del self._fake_subnet[subnet]
+
+ def delete_port(self, port):
+ self.show_port(port)
+ del self._fake_ports[port]
+
+ def update_port(self, port, body=None):
+ self.show_port(port)
+ self._fake_ports[port].update(body['port'])
+ return {'port': self._fake_ports[port]}
+
+ def list_extensions(self, **_parms):
+ return {'extensions': []}
+
+ def _check_ports_on_network(self, network):
+ ports = self.list_ports()
+ for port in ports:
+ if port['network_id'] == network:
+ msg = ('Unable to complete operation on network %s. There is '
+ 'one or more ports still in use on the network'
+ % network)
+ raise q_exc.QuantumClientException(message=msg, status_code=409)
diff --git a/nova/tests/api/openstack/compute/contrib/test_security_groups.py b/nova/tests/api/openstack/compute/contrib/test_security_groups.py
index 1b254c2a7..5058f17ac 100644
--- a/nova/tests/api/openstack/compute/contrib/test_security_groups.py
+++ b/nova/tests/api/openstack/compute/contrib/test_security_groups.py
@@ -118,6 +118,14 @@ class TestSecurityGroups(test.TestCase):
security_groups.ServerSecurityGroupController())
self.manager = security_groups.SecurityGroupActionController()
+ # This needs to be done here to set fake_id because the derived
+ # class needs to be called first if it wants to set
+ # 'security_group_api' and this setUp method needs to be called.
+ if self.controller.security_group_api.id_is_uuid:
+ self.fake_id = '11111111-1111-1111-1111-111111111111'
+ else:
+ self.fake_id = '11111111'
+
def _assert_no_security_groups_reserved(self, context):
"""Check that no reservations are leaked during tests."""
result = quota.QUOTAS.get_project_quotas(context, context.project_id)
@@ -392,9 +400,10 @@ class TestSecurityGroups(test.TestCase):
req, 'invalid')
def test_get_security_group_by_non_existing_id(self):
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/111111111')
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s' %
+ self.fake_id)
self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
- req, '111111111')
+ req, self.fake_id)
def test_delete_security_group_by_id(self):
sg = security_group_template(id=1, rules=[])
@@ -424,9 +433,10 @@ class TestSecurityGroups(test.TestCase):
req, 'invalid')
def test_delete_security_group_by_non_existing_id(self):
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/11111111')
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s'
+ % self.fake_id)
self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
- req, '11111111')
+ req, self.fake_id)
def test_delete_security_group_in_use(self):
sg = security_group_template(id=1, rules=[])
@@ -639,12 +649,23 @@ class TestSecurityGroupRules(test.TestCase):
def setUp(self):
super(TestSecurityGroupRules, self).setUp()
- sg1 = security_group_template(id=1)
- sg2 = security_group_template(id=2,
- name='authorize_revoke',
- description='authorize-revoke testing')
- db1 = security_group_db(sg1)
- db2 = security_group_db(sg2)
+ self.controller = security_groups.SecurityGroupController()
+ if self.controller.security_group_api.id_is_uuid:
+ id1 = '11111111-1111-1111-1111-111111111111'
+ id2 = '22222222-2222-2222-2222-222222222222'
+ self.invalid_id = '33333333-3333-3333-3333-333333333333'
+ else:
+ id1 = 1
+ id2 = 2
+ self.invalid_id = '33333333'
+
+ self.sg1 = security_group_template(id=id1)
+ self.sg2 = security_group_template(
+ id=id2, name='authorize_revoke',
+ description='authorize-revoke testing')
+
+ db1 = security_group_db(self.sg1)
+ db2 = security_group_db(self.sg2)
def return_security_group(context, group_id):
if group_id == db1['id']:
@@ -661,41 +682,47 @@ class TestSecurityGroupRules(test.TestCase):
self.controller = security_groups.SecurityGroupRulesController()
def test_create_by_cidr(self):
- rule = security_group_rule_template(cidr='10.2.3.124/24')
+ rule = security_group_rule_template(cidr='10.2.3.124/24',
+ parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
res_dict = self.controller.create(req, {'security_group_rule': rule})
-
security_group_rule = res_dict['security_group_rule']
self.assertNotEquals(security_group_rule['id'], 0)
- self.assertEquals(security_group_rule['parent_group_id'], 2)
+ self.assertEquals(security_group_rule['parent_group_id'],
+ self.sg2['id'])
self.assertEquals(security_group_rule['ip_range']['cidr'],
"10.2.3.124/24")
def test_create_by_group_id(self):
- rule = security_group_rule_template(group_id=1)
+ rule = security_group_rule_template(group_id=self.sg1['id'],
+ parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
res_dict = self.controller.create(req, {'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
self.assertNotEquals(security_group_rule['id'], 0)
- self.assertEquals(security_group_rule['parent_group_id'], 2)
+ self.assertEquals(security_group_rule['parent_group_id'],
+ self.sg2['id'])
def test_create_by_same_group_id(self):
- rule1 = security_group_rule_template(group_id=1, from_port=80,
- to_port=80)
+ rule1 = security_group_rule_template(group_id=self.sg1['id'],
+ from_port=80, to_port=80,
+ parent_group_id=self.sg2['id'])
self.parent_security_group['rules'] = [security_group_rule_db(rule1)]
- rule2 = security_group_rule_template(group_id=1, from_port=81,
- to_port=81)
+ rule2 = security_group_rule_template(group_id=self.sg1['id'],
+ from_port=81, to_port=81,
+ parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
res_dict = self.controller.create(req, {'security_group_rule': rule2})
security_group_rule = res_dict['security_group_rule']
self.assertNotEquals(security_group_rule['id'], 0)
- self.assertEquals(security_group_rule['parent_group_id'], 2)
+ self.assertEquals(security_group_rule['parent_group_id'],
+ self.sg2['id'])
self.assertEquals(security_group_rule['from_port'], 81)
self.assertEquals(security_group_rule['to_port'], 81)
@@ -705,13 +732,13 @@ class TestSecurityGroupRules(test.TestCase):
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "22",
- "parent_group_id": 2,
+ "parent_group_id": self.sg2['id'],
"cidr": "10.2.3.124/2433"}}
rule = security_group_rule_template(
ip_protocol="tcp",
from_port=22,
to_port=22,
- parent_group_id=2,
+ parent_group_id=self.sg2['id'],
cidr="10.2.3.124/2433")
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
@@ -722,7 +749,7 @@ class TestSecurityGroupRules(test.TestCase):
ip_protocol="tcp",
from_port=75534,
to_port=22,
- parent_group_id=2,
+ parent_group_id=self.sg2['id'],
cidr="10.2.3.124/24")
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
@@ -734,14 +761,15 @@ class TestSecurityGroupRules(test.TestCase):
ip_protocol="icmp",
from_port=1,
to_port=256,
- parent_group_id=2,
+ parent_group_id=self.sg2['id'],
cidr="10.2.3.124/24")
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
req, {'security_group_rule': rule})
def test_create_add_existing_rules_by_cidr(self):
- rule = security_group_rule_template(cidr='10.0.0.0/24')
+ rule = security_group_rule_template(cidr='10.0.0.0/24',
+ parent_group_id=self.sg2['id'])
self.parent_security_group['rules'] = [security_group_rule_db(rule)]
@@ -778,7 +806,7 @@ class TestSecurityGroupRules(test.TestCase):
def test_create_with_non_existing_parent_group_id(self):
rule = security_group_rule_template(group_id='invalid',
- parent_group_id='1111111111111')
+ parent_group_id=self.invalid_id)
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
self.assertRaises(webob.exc.HTTPNotFound, self.controller.create,
@@ -786,14 +814,16 @@ class TestSecurityGroupRules(test.TestCase):
def test_create_with_invalid_protocol(self):
rule = security_group_rule_template(ip_protocol='invalid-protocol',
- cidr='10.2.2.0/24')
+ cidr='10.2.2.0/24',
+ parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
req, {'security_group_rule': rule})
def test_create_with_no_protocol(self):
- rule = security_group_rule_template(cidr='10.2.2.0/24')
+ rule = security_group_rule_template(cidr='10.2.2.0/24',
+ parent_group_id=self.sg2['id'])
del rule['ip_protocol']
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
@@ -802,7 +832,8 @@ class TestSecurityGroupRules(test.TestCase):
def test_create_with_invalid_from_port(self):
rule = security_group_rule_template(from_port='666666',
- cidr='10.2.2.0/24')
+ cidr='10.2.2.0/24',
+ parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
@@ -810,7 +841,8 @@ class TestSecurityGroupRules(test.TestCase):
def test_create_with_invalid_to_port(self):
rule = security_group_rule_template(to_port='666666',
- cidr='10.2.2.0/24')
+ cidr='10.2.2.0/24',
+ parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
@@ -818,7 +850,8 @@ class TestSecurityGroupRules(test.TestCase):
def test_create_with_non_numerical_from_port(self):
rule = security_group_rule_template(from_port='invalid',
- cidr='10.2.2.0/24')
+ cidr='10.2.2.0/24',
+ parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
@@ -826,14 +859,16 @@ class TestSecurityGroupRules(test.TestCase):
def test_create_with_non_numerical_to_port(self):
rule = security_group_rule_template(to_port='invalid',
- cidr='10.2.2.0/24')
+ cidr='10.2.2.0/24',
+ parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
req, {'security_group_rule': rule})
def test_create_with_no_from_port(self):
- rule = security_group_rule_template(cidr='10.2.2.0/24')
+ rule = security_group_rule_template(cidr='10.2.2.0/24',
+ parent_group_id=self.sg2['id'])
del rule['from_port']
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
@@ -841,7 +876,8 @@ class TestSecurityGroupRules(test.TestCase):
req, {'security_group_rule': rule})
def test_create_with_no_to_port(self):
- rule = security_group_rule_template(cidr='10.2.2.0/24')
+ rule = security_group_rule_template(cidr='10.2.2.0/24',
+ parent_group_id=self.sg2['id'])
del rule['to_port']
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
@@ -849,14 +885,15 @@ class TestSecurityGroupRules(test.TestCase):
req, {'security_group_rule': rule})
def test_create_with_invalid_cidr(self):
- rule = security_group_rule_template(cidr='10.2.2222.0/24')
+ rule = security_group_rule_template(cidr='10.2.2222.0/24',
+ parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
req, {'security_group_rule': rule})
def test_create_with_no_cidr_group(self):
- rule = security_group_rule_template()
+ rule = security_group_rule_template(parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
res_dict = self.controller.create(req, {'security_group_rule': rule})
@@ -869,54 +906,59 @@ class TestSecurityGroupRules(test.TestCase):
"0.0.0.0/0")
def test_create_with_invalid_group_id(self):
- rule = security_group_rule_template(group_id='invalid')
+ rule = security_group_rule_template(group_id='invalid',
+ parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
req, {'security_group_rule': rule})
def test_create_with_empty_group_id(self):
- rule = security_group_rule_template(group_id='')
+ rule = security_group_rule_template(group_id='',
+ parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
req, {'security_group_rule': rule})
def test_create_with_nonexist_group_id(self):
- rule = security_group_rule_template(group_id='222222')
+ rule = security_group_rule_template(group_id=self.invalid_id,
+ parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
req, {'security_group_rule': rule})
def test_create_with_same_group_parent_id_and_group_id(self):
- rule = security_group_rule_template(group_id=1, parent_group_id=1)
-
+ rule = security_group_rule_template(group_id=self.sg1['id'],
+ parent_group_id=self.sg1['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
res_dict = self.controller.create(req, {'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
self.assertNotEquals(security_group_rule['id'], 0)
- self.assertEquals(security_group_rule['parent_group_id'], 1)
- self.assertEquals(security_group_rule['id'], 1)
+ self.assertEquals(security_group_rule['parent_group_id'],
+ self.sg1['id'])
+ self.assertEquals(security_group_rule['group']['name'],
+ self.sg1['name'])
def _test_create_with_no_ports_and_no_group(self, proto):
- rule = {'ip_protocol': proto, 'parent_group_id': '2'}
+ rule = {'ip_protocol': proto, 'parent_group_id': self.sg2['id']}
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
req, {'security_group_rule': rule})
def _test_create_with_no_ports(self, proto):
- rule = {'ip_protocol': proto, 'parent_group_id': '2', 'group_id': '1'}
+ rule = {'ip_protocol': proto, 'parent_group_id': self.sg2['id'],
+ 'group_id': self.sg1['id']}
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
res_dict = self.controller.create(req, {'security_group_rule': rule})
-
security_group_rule = res_dict['security_group_rule']
expected_rule = {
'from_port': 1, 'group': {'tenant_id': '123', 'name': 'test'},
- 'ip_protocol': proto, 'to_port': 65535, 'parent_group_id': 2,
- 'ip_range': {}, 'id': 1
+ 'ip_protocol': proto, 'to_port': 65535, 'parent_group_id':
+ self.sg2['id'], 'ip_range': {}, 'id': security_group_rule['id']
}
if proto == 'icmp':
expected_rule['to_port'] = -1
@@ -935,10 +977,10 @@ class TestSecurityGroupRules(test.TestCase):
self._test_create_with_no_ports_and_no_group('udp')
self._test_create_with_no_ports('udp')
- def _test_create_with_ports(self, id_val, proto, from_port, to_port):
+ def _test_create_with_ports(self, proto, from_port, to_port):
rule = {
'ip_protocol': proto, 'from_port': from_port, 'to_port': to_port,
- 'parent_group_id': '2', 'group_id': '1'
+ 'parent_group_id': self.sg2['id'], 'group_id': self.sg1['id']
}
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
res_dict = self.controller.create(req, {'security_group_rule': rule})
@@ -947,32 +989,32 @@ class TestSecurityGroupRules(test.TestCase):
expected_rule = {
'from_port': from_port,
'group': {'tenant_id': '123', 'name': 'test'},
- 'ip_protocol': proto, 'to_port': to_port, 'parent_group_id': 2,
- 'ip_range': {}, 'id': id_val
+ 'ip_protocol': proto, 'to_port': to_port, 'parent_group_id':
+ self.sg2['id'], 'ip_range': {}, 'id': security_group_rule['id']
}
self.assertTrue(security_group_rule['ip_protocol'] == proto)
- self.assertTrue(security_group_rule['id'] == id_val)
self.assertTrue(security_group_rule['from_port'] == from_port)
self.assertTrue(security_group_rule['to_port'] == to_port)
self.assertTrue(security_group_rule == expected_rule)
def test_create_with_ports_icmp(self):
- self._test_create_with_ports(1, 'icmp', 0, 1)
- self._test_create_with_ports(2, 'icmp', 0, 0)
- self._test_create_with_ports(3, 'icmp', 1, 0)
+ self._test_create_with_ports('icmp', 0, 1)
+ self._test_create_with_ports('icmp', 0, 0)
+ self._test_create_with_ports('icmp', 1, 0)
def test_create_with_ports_tcp(self):
- self._test_create_with_ports(1, 'tcp', 1, 1)
- self._test_create_with_ports(2, 'tcp', 1, 65535)
- self._test_create_with_ports(3, 'tcp', 65535, 65535)
+ self._test_create_with_ports('tcp', 1, 1)
+ self._test_create_with_ports('tcp', 1, 65535)
+ self._test_create_with_ports('tcp', 65535, 65535)
def test_create_with_ports_udp(self):
- self._test_create_with_ports(1, 'udp', 1, 1)
- self._test_create_with_ports(2, 'udp', 1, 65535)
- self._test_create_with_ports(3, 'udp', 65535, 65535)
+ self._test_create_with_ports('udp', 1, 1)
+ self._test_create_with_ports('udp', 1, 65535)
+ self._test_create_with_ports('udp', 65535, 65535)
def test_delete(self):
- rule = security_group_rule_template(id=10)
+ rule = security_group_rule_template(id=self.sg2['id'],
+ parent_group_id=self.sg2['id'])
def security_group_rule_get(context, id):
return security_group_rule_db(rule)
@@ -985,8 +1027,9 @@ class TestSecurityGroupRules(test.TestCase):
self.stubs.Set(nova.db, 'security_group_rule_destroy',
security_group_rule_destroy)
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules/10')
- self.controller.delete(req, '10')
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules/%s'
+ % self.sg2['id'])
+ self.controller.delete(req, self.sg2['id'])
def test_delete_invalid_rule_id(self):
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules' +
@@ -995,30 +1038,32 @@ class TestSecurityGroupRules(test.TestCase):
req, 'invalid')
def test_delete_non_existing_rule_id(self):
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules' +
- '/22222222222222')
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules/%s'
+ % self.invalid_id)
self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
- req, '22222222222222')
+ req, self.invalid_id)
def test_create_rule_quota_limit(self):
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
for num in range(100, 100 + CONF.quota_security_group_rules):
rule = {
'ip_protocol': 'tcp', 'from_port': num,
- 'to_port': num, 'parent_group_id': '2', 'group_id': '1'
+ 'to_port': num, 'parent_group_id': self.sg2['id'],
+ 'group_id': self.sg1['id']
}
self.controller.create(req, {'security_group_rule': rule})
rule = {
'ip_protocol': 'tcp', 'from_port': '121', 'to_port': '121',
- 'parent_group_id': '2', 'group_id': '1'
+ 'parent_group_id': self.sg2['id'], 'group_id': self.sg1['id']
}
self.assertRaises(exception.SecurityGroupLimitExceeded,
self.controller.create,
req, {'security_group_rule': rule})
def test_create_rule_cidr_allow_all(self):
- rule = security_group_rule_template(cidr='0.0.0.0/0')
+ rule = security_group_rule_template(cidr='0.0.0.0/0',
+ parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
res_dict = self.controller.create(req, {'security_group_rule': rule})
@@ -1031,7 +1076,8 @@ class TestSecurityGroupRules(test.TestCase):
"0.0.0.0/0")
def test_create_rule_cidr_allow_some(self):
- rule = security_group_rule_template(cidr='15.0.0.0/8')
+ rule = security_group_rule_template(cidr='15.0.0.0/8',
+ parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
res_dict = self.controller.create(req, {'security_group_rule': rule})
@@ -1340,6 +1386,7 @@ class SecurityGroupsOutputTest(test.TestCase):
def setUp(self):
super(SecurityGroupsOutputTest, self).setUp()
+ self.controller = security_groups.SecurityGroupController()
fakes.stub_out_nw_api(self.stubs)
self.stubs.Set(compute.api.API, 'get', fake_compute_get)
self.stubs.Set(compute.api.API, 'get_all', fake_compute_get_all)
diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py
index 542a7e162..e19470db5 100644
--- a/nova/tests/compute/test_compute.py
+++ b/nova/tests/compute/test_compute.py
@@ -1662,7 +1662,8 @@ class ComputeTestCase(BaseTestCase):
mox.IgnoreArg(),
requested_networks=None,
vpn=False, macs=macs,
- conductor_api=self.compute.conductor_api).AndReturn(
+ conductor_api=self.compute.conductor_api,
+ security_groups=[]).AndReturn(
fake_network.fake_get_instance_nw_info(self.stubs, 1, 1,
spectacular=True))
self.mox.StubOutWithMock(self.compute.driver, "macs_for_instance")
@@ -1681,7 +1682,8 @@ class ComputeTestCase(BaseTestCase):
mox.IgnoreArg(),
requested_networks=None,
vpn=False, macs=None,
- conductor_api=self.compute.conductor_api
+ conductor_api=self.compute.conductor_api,
+ security_groups=[]
).AndRaise(rpc_common.RemoteError())
fake_network.unset_stub_network_methods(self.stubs)
diff --git a/nova/utils.py b/nova/utils.py
index 2dcedb5d5..2c7d0b427 100644
--- a/nova/utils.py
+++ b/nova/utils.py
@@ -983,6 +983,15 @@ def is_valid_cidr(address):
return True
+def get_ip_version(network):
+ """Returns the IP version of a network (IPv4 or IPv6). Raises
+ AddrFormatError if invalid network."""
+ if netaddr.IPNetwork(network).version == 6:
+ return "IPv6"
+ elif netaddr.IPNetwork(network).version == 4:
+ return "IPv4"
+
+
def monkey_patch():
"""If the Flags.monkey_patch set as True,
this function patches a decorator