summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTushar Patil <tushar.vitthal.patil@gmail.com>2011-08-12 16:48:13 -0700
committerTushar Patil <tushar.vitthal.patil@gmail.com>2011-08-12 16:48:13 -0700
commit19a4ddaf157ebb388cce37ddc142dfad304b8cf0 (patch)
tree792a16f922c71a6fa11d41bb3c5824fb834c965c
parent7aef19a8757dc9558b1c0d83cb1fb08ac976cf5b (diff)
downloadnova-19a4ddaf157ebb388cce37ddc142dfad304b8cf0.tar.gz
nova-19a4ddaf157ebb388cce37ddc142dfad304b8cf0.tar.xz
nova-19a4ddaf157ebb388cce37ddc142dfad304b8cf0.zip
Added add securitygroup to instance and remove securitygroup from instance functionality
-rw-r--r--nova/api/openstack/contrib/security_groups.py199
-rw-r--r--nova/api/openstack/create_instance_helper.py30
-rw-r--r--nova/db/api.py6
-rw-r--r--nova/db/sqlalchemy/api.py15
-rw-r--r--nova/tests/api/openstack/contrib/test_security_groups.py299
5 files changed, 530 insertions, 19 deletions
diff --git a/nova/api/openstack/contrib/security_groups.py b/nova/api/openstack/contrib/security_groups.py
index 6c57fbb51..a104a42e4 100644
--- a/nova/api/openstack/contrib/security_groups.py
+++ b/nova/api/openstack/contrib/security_groups.py
@@ -25,10 +25,11 @@ from nova import db
from nova import exception
from nova import flags
from nova import log as logging
+from nova import rpc
from nova.api.openstack import common
from nova.api.openstack import extensions
from nova.api.openstack import wsgi
-
+from nova.compute import power_state
from xml.dom import minidom
@@ -73,33 +74,28 @@ class SecurityGroupController(object):
context, rule)]
return security_group
- def show(self, req, id):
- """Return data about the given security group."""
- context = req.environ['nova.context']
+ def _get_security_group(self, context, id):
try:
id = int(id)
security_group = db.security_group_get(context, id)
except ValueError:
- msg = _("Security group id is not integer")
- return exc.HTTPBadRequest(explanation=msg)
+ msg = _("Security group id should be integer")
+ raise exc.HTTPBadRequest(explanation=msg)
except exception.NotFound as exp:
- return exc.HTTPNotFound(explanation=unicode(exp))
+ raise exc.HTTPNotFound(explanation=unicode(exp))
+ return security_group
+ def show(self, req, id):
+ """Return data about the given security group."""
+ context = req.environ['nova.context']
+ security_group = self._get_security_group(context, id)
return {'security_group': self._format_security_group(context,
security_group)}
def delete(self, req, id):
"""Delete a security group."""
context = req.environ['nova.context']
- try:
- id = int(id)
- security_group = db.security_group_get(context, id)
- except ValueError:
- msg = _("Security group id is not integer")
- return exc.HTTPBadRequest(explanation=msg)
- except exception.SecurityGroupNotFound as exp:
- return exc.HTTPNotFound(explanation=unicode(exp))
-
+ security_group = self._get_security_group(context, id)
LOG.audit(_("Delete security group %s"), id, context=context)
db.security_group_destroy(context, security_group.id)
@@ -172,6 +168,135 @@ class SecurityGroupController(object):
"than 255 characters.") % typ
raise exc.HTTPBadRequest(explanation=msg)
+ def associate(self, req, id, body):
+ context = req.environ['nova.context']
+
+ if not body:
+ raise exc.HTTPUnprocessableEntity()
+
+ if not 'security_group_associate' in body:
+ raise exc.HTTPUnprocessableEntity()
+
+ security_group = self._get_security_group(context, id)
+
+ servers = body['security_group_associate'].get('servers')
+
+ if not servers:
+ msg = _("No servers found")
+ return exc.HTTPBadRequest(explanation=msg)
+
+ hosts = set()
+ for server in servers:
+ if server['id']:
+ try:
+ # check if the server exists
+ inst = db.instance_get(context, server['id'])
+ #check if the security group is assigned to the server
+ if self._is_security_group_associated_to_server(
+ security_group, inst['id']):
+ msg = _("Security group %s is already associated with"
+ " the instance %s") % (security_group['id'],
+ server['id'])
+ raise exc.HTTPBadRequest(explanation=msg)
+
+ #check if the instance is in running state
+ if inst['state'] != power_state.RUNNING:
+ msg = _("Server %s is not in the running state")\
+ % server['id']
+ raise exc.HTTPBadRequest(explanation=msg)
+
+ hosts.add(inst['host'])
+ except exception.InstanceNotFound as exp:
+ return exc.HTTPNotFound(explanation=unicode(exp))
+
+ # Associate security group with the server in the db
+ for server in servers:
+ if server['id']:
+ db.instance_add_security_group(context.elevated(),
+ server['id'],
+ security_group['id'])
+
+ for host in hosts:
+ rpc.cast(context,
+ db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "refresh_security_group_rules",
+ "args": {"security_group_id": security_group['id']}})
+
+ return exc.HTTPAccepted()
+
+ def _is_security_group_associated_to_server(self, security_group,
+ instance_id):
+ if not security_group:
+ return False
+
+ instances = security_group.get('instances')
+ if not instances:
+ return False
+
+ inst_id = None
+ for inst_id in (instance['id'] for instance in instances \
+ if instance_id == instance['id']):
+ return True
+
+ return False
+
+ def disassociate(self, req, id, body):
+ context = req.environ['nova.context']
+
+ if not body:
+ raise exc.HTTPUnprocessableEntity()
+
+ if not 'security_group_disassociate' in body:
+ raise exc.HTTPUnprocessableEntity()
+
+ security_group = self._get_security_group(context, id)
+
+ servers = body['security_group_disassociate'].get('servers')
+
+ if not servers:
+ msg = _("No servers found")
+ return exc.HTTPBadRequest(explanation=msg)
+
+ hosts = set()
+ for server in servers:
+ if server['id']:
+ try:
+ # check if the instance exists
+ inst = db.instance_get(context, server['id'])
+ # Check if the security group is not associated
+ # with the instance
+ if not self._is_security_group_associated_to_server(
+ security_group, inst['id']):
+ msg = _("Security group %s is not associated with the"
+ "instance %s") % (security_group['id'],
+ server['id'])
+ raise exc.HTTPBadRequest(explanation=msg)
+
+ #check if the instance is in running state
+ if inst['state'] != power_state.RUNNING:
+ msg = _("Server %s is not in the running state")\
+ % server['id']
+ raise exp.HTTPBadRequest(explanation=msg)
+
+ hosts.add(inst['host'])
+ except exception.InstanceNotFound as exp:
+ return exc.HTTPNotFound(explanation=unicode(exp))
+
+ # Disassociate security group from the server
+ for server in servers:
+ if server['id']:
+ db.instance_remove_security_group(context.elevated(),
+ server['id'],
+ security_group['id'])
+
+ for host in hosts:
+ rpc.cast(context,
+ db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "refresh_security_group_rules",
+ "args": {"security_group_id": security_group['id']}})
+
+ return exc.HTTPAccepted()
+
class SecurityGroupRulesController(SecurityGroupController):
@@ -226,9 +351,9 @@ class SecurityGroupRulesController(SecurityGroupController):
security_group_rule = db.security_group_rule_create(context, values)
self.compute_api.trigger_security_group_rules_refresh(context,
- security_group_id=security_group['id'])
+ security_group_id=security_group['id'])
- return {'security_group_rule': self._format_security_group_rule(
+ return {"security_group_rule": self._format_security_group_rule(
context,
security_group_rule)}
@@ -368,6 +493,10 @@ class Security_groups(extensions.ExtensionDescriptor):
res = extensions.ResourceExtension('os-security-groups',
controller=SecurityGroupController(),
+ member_actions={
+ 'associate': 'POST',
+ 'disassociate': 'POST'
+ },
deserializer=deserializer,
serializer=serializer)
@@ -405,6 +534,40 @@ class SecurityGroupXMLDeserializer(wsgi.MetadataXMLDeserializer):
security_group['description'] = self.extract_text(desc_node)
return {'body': {'security_group': security_group}}
+ def _get_servers(self, node):
+ servers_dict = {'servers': []}
+ if node is not None:
+ servers_node = self.find_first_child_named(node,
+ 'servers')
+ if servers_node is not None:
+ for server_node in self.find_children_named(servers_node,
+ "server"):
+ servers_dict['servers'].append(
+ {"id": self.extract_text(server_node)})
+ return servers_dict
+
+ def associate(self, string):
+ """Deserialize an xml-formatted security group associate request"""
+ dom = minidom.parseString(string)
+ node = self.find_first_child_named(dom,
+ 'security_group_associate')
+ result = {'body': {}}
+ if node:
+ result['body']['security_group_associate'] = \
+ self._get_servers(node)
+ return result
+
+ def disassociate(self, string):
+ """Deserialize an xml-formatted security group disassociate request"""
+ dom = minidom.parseString(string)
+ node = self.find_first_child_named(dom,
+ 'security_group_disassociate')
+ result = {'body': {}}
+ if node:
+ result['body']['security_group_disassociate'] = \
+ self._get_servers(node)
+ return result
+
class SecurityGroupRulesXMLDeserializer(wsgi.MetadataXMLDeserializer):
"""
diff --git a/nova/api/openstack/create_instance_helper.py b/nova/api/openstack/create_instance_helper.py
index 1425521a9..4ceb972c0 100644
--- a/nova/api/openstack/create_instance_helper.py
+++ b/nova/api/openstack/create_instance_helper.py
@@ -111,6 +111,16 @@ class CreateInstanceHelper(object):
if personality:
injected_files = self._get_injected_files(personality)
+ sg_names = []
+ security_groups = server_dict.get('security_groups')
+ if security_groups:
+ sg_names = [sg['name'] for sg in security_groups if sg.get('name')]
+ if not sg_names:
+ sg_names.append('default')
+
+ sg_names = list(set(sg_names))
+ LOG.debug(sg_names)
+
try:
flavor_id = self.controller._flavor_id_from_req_data(body)
except ValueError as error:
@@ -161,7 +171,8 @@ class CreateInstanceHelper(object):
zone_blob=zone_blob,
reservation_id=reservation_id,
min_count=min_count,
- max_count=max_count))
+ max_count=max_count,
+ security_group=sg_names))
except quota.QuotaError as error:
self._handle_quota_error(error)
except exception.ImageNotFound as error:
@@ -170,6 +181,8 @@ class CreateInstanceHelper(object):
except exception.FlavorNotFound as error:
msg = _("Invalid flavorRef provided.")
raise exc.HTTPBadRequest(explanation=msg)
+ except exception.SecurityGroupNotFound as error:
+ raise exc.HTTPBadRequest(explanation=unicode(error))
# Let the caller deal with unhandled exceptions.
def _handle_quota_error(self, error):
@@ -454,6 +467,8 @@ class ServerXMLDeserializerV11(wsgi.MetadataXMLDeserializer):
if personality is not None:
server["personality"] = personality
+ server["security_groups"] = self._extract_security_groups(server_node)
+
return server
def _extract_personality(self, server_node):
@@ -470,3 +485,16 @@ class ServerXMLDeserializerV11(wsgi.MetadataXMLDeserializer):
return personality
else:
return None
+
+ def _extract_security_groups(self, server_node):
+ """Marshal the security_groups attribute of a parsed request"""
+ node = self.find_first_child_named(server_node, "security_groups")
+ security_groups = []
+ if node is not None:
+ for sg_node in self.find_children_named(node, "security_group"):
+ item = {}
+ name_node = self.find_first_child_named(sg_node, "name")
+ if name_node:
+ item["name"] = self.extract_text(name_node)
+ security_groups.append(item)
+ return security_groups
diff --git a/nova/db/api.py b/nova/db/api.py
index 0f2218752..cf814d43e 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -570,6 +570,12 @@ def instance_add_security_group(context, instance_id, security_group_id):
security_group_id)
+def instance_remove_security_group(context, instance_id, security_group_id):
+ """Disassociate the given security group from the given instance."""
+ return IMPL.instance_remove_security_group(context, instance_id,
+ security_group_id)
+
+
def instance_get_vcpu_sum_by_host_and_project(context, hostname, proj_id):
"""Get instances.vcpus by host and project."""
return IMPL.instance_get_vcpu_sum_by_host_and_project(context,
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index e5d35a20b..ba16f9109 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -1483,6 +1483,19 @@ def instance_add_security_group(context, instance_id, security_group_id):
@require_context
+def instance_remove_security_group(context, instance_id, security_group_id):
+ """Disassociate the given security group from the given instance"""
+ session = get_session()
+
+ session.query(models.SecurityGroupInstanceAssociation).\
+ filter_by(instance_id=instance_id).\
+ filter_by(security_group_id=security_group_id).\
+ update({'deleted': True,
+ 'deleted_at': utils.utcnow(),
+ 'updated_at': literal_column('updated_at')})
+
+
+@require_context
def instance_get_vcpu_sum_by_host_and_project(context, hostname, proj_id):
session = get_session()
result = session.query(models.Instance).\
@@ -2456,6 +2469,7 @@ def security_group_get(context, security_group_id, session=None):
filter_by(deleted=can_read_deleted(context),).\
filter_by(id=security_group_id).\
options(joinedload_all('rules')).\
+ options(joinedload_all('instances')).\
first()
else:
result = session.query(models.SecurityGroup).\
@@ -2463,6 +2477,7 @@ def security_group_get(context, security_group_id, session=None):
filter_by(id=security_group_id).\
filter_by(project_id=context.project_id).\
options(joinedload_all('rules')).\
+ options(joinedload_all('instances')).\
first()
if not result:
raise exception.SecurityGroupNotFound(
diff --git a/nova/tests/api/openstack/contrib/test_security_groups.py b/nova/tests/api/openstack/contrib/test_security_groups.py
index 4317880ca..894b0c591 100644
--- a/nova/tests/api/openstack/contrib/test_security_groups.py
+++ b/nova/tests/api/openstack/contrib/test_security_groups.py
@@ -15,10 +15,13 @@
# under the License.
import json
+import mox
+import nova
import unittest
import webob
from xml.dom import minidom
+from nova import exception
from nova import test
from nova.api.openstack.contrib import security_groups
from nova.tests.api.openstack import fakes
@@ -51,6 +54,28 @@ def _create_security_group_request_dict(security_group):
return {'security_group': sg}
+def return_server(context, server_id):
+ return {'id': server_id, 'state': 0x01, 'host': "localhost"}
+
+
+def return_non_running_server(context, server_id):
+ return {'id': server_id, 'state': 0x02,
+ 'host': "localhost"}
+
+
+def return_security_group(context, group_id):
+ return {'id': group_id, "instances": [
+ {'id': 1}]}
+
+
+def return_security_group_without_instances(context, group_id):
+ return {'id': group_id}
+
+
+def return_server_nonexistant(context, server_id):
+ raise exception.InstanceNotFound()
+
+
class TestSecurityGroups(test.TestCase):
def setUp(self):
super(TestSecurityGroups, self).setUp()
@@ -325,6 +350,280 @@ class TestSecurityGroups(test.TestCase):
response = self._delete_security_group(11111111)
self.assertEquals(response.status_int, 404)
+ def test_associate_by_non_existing_security_group_id(self):
+ req = webob.Request.blank('/v1.1/os-security-groups/111111/associate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_associate": {
+ "servers": [
+ {"id": '2'}
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 404)
+
+ def test_associate_by_invalid_security_group_id(self):
+ req = webob.Request.blank('/v1.1/os-security-groups/invalid/associate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_associate": {
+ "servers": [
+ {"id": "2"}
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_associate_without_body(self):
+ req = webob.Request.blank('/v1.1/os-security-groups/1/associate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(None)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 422)
+
+ def test_associate_no_security_group_element(self):
+ req = webob.Request.blank('/v1.1/os-security-groups/1/associate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_associate_invalid": {
+ "servers": [
+ {"id": "2"}
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 422)
+
+ def test_associate_no_instances(self):
+ #self.stubs.Set(nova.db.api, 'instance_get', return_server)
+ self.stubs.Set(nova.db, 'security_group_get', return_security_group)
+ req = webob.Request.blank('/v1.1/os-security-groups/1/associate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_associate": {
+ "servers": [
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_associate_non_existing_instance(self):
+ self.stubs.Set(nova.db, 'instance_get', return_server_nonexistant)
+ self.stubs.Set(nova.db, 'security_group_get', return_security_group)
+ req = webob.Request.blank('/v1.1/os-security-groups/1/associate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_associate": {
+ "servers": [
+ {'id': 2}
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 404)
+
+ def test_associate_non_running_instance(self):
+ self.stubs.Set(nova.db, 'instance_get', return_non_running_server)
+ self.stubs.Set(nova.db, 'security_group_get',
+ return_security_group_without_instances)
+ req = webob.Request.blank('/v1.1/os-security-groups/1/associate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_associate": {
+ "servers": [
+ {'id': 1}
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_associate_already_associated_security_group_to_instance(self):
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ self.stubs.Set(nova.db, 'security_group_get', return_security_group)
+ req = webob.Request.blank('/v1.1/os-security-groups/1/associate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_associate": {
+ "servers": [
+ {'id': 1}
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_associate(self):
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ self.mox.StubOutWithMock(nova.db, 'instance_add_security_group')
+ nova.db.instance_add_security_group(mox.IgnoreArg(),
+ mox.IgnoreArg(),
+ mox.IgnoreArg())
+ self.stubs.Set(nova.db, 'security_group_get',
+ return_security_group_without_instances)
+ self.mox.ReplayAll()
+
+ req = webob.Request.blank('/v1.1/os-security-groups/1/associate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_associate": {
+ "servers": [
+ {'id': 1}
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 202)
+
+ def test_disassociate_by_non_existing_security_group_id(self):
+ req = webob.Request.blank('/v1.1/os-security-groups/1111/disassociate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_disassociate": {
+ "servers": [
+ {"id": "2"}
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 404)
+
+ def test_disassociate_by_invalid_security_group_id(self):
+ req = webob.Request.blank('/v1.1/os-security-groups/id/disassociate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_disassociate": {
+ "servers": [
+ {"id": "2"}
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_disassociate_without_body(self):
+ req = webob.Request.blank('/v1.1/os-security-groups/1/disassociate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(None)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 422)
+
+ def test_disassociate_no_security_group_element(self):
+ req = webob.Request.blank('/v1.1/os-security-groups/1/disassociate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_disassociate_invalid": {
+ "servers": [
+ {"id": "2"}
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 422)
+
+ def test_disassociate_no_instances(self):
+ #self.stubs.Set(nova.db.api, 'instance_get', return_server)
+ self.stubs.Set(nova.db, 'security_group_get', return_security_group)
+ req = webob.Request.blank('/v1.1/os-security-groups/1/disassociate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_disassociate": {
+ "servers": [
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_disassociate_non_existing_instance(self):
+ self.stubs.Set(nova.db, 'instance_get', return_server_nonexistant)
+ self.stubs.Set(nova.db, 'security_group_get', return_security_group)
+ req = webob.Request.blank('/v1.1/os-security-groups/1/disassociate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_disassociate": {
+ "servers": [
+ {'id': 2}
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 404)
+
+ def test_disassociate_non_running_instance(self):
+ self.stubs.Set(nova.db, 'instance_get', return_non_running_server)
+ self.stubs.Set(nova.db, 'security_group_get',
+ return_security_group_without_instances)
+ req = webob.Request.blank('/v1.1/os-security-groups/1/disassociate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_disassociate": {
+ "servers": [
+ {'id': 1}
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_disassociate_not_associated_security_group_to_instance(self):
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ self.stubs.Set(nova.db, 'security_group_get', return_security_group)
+ req = webob.Request.blank('/v1.1/os-security-groups/1/disassociate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_disassociate": {
+ "servers": [
+ {'id': 2}
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_disassociate(self):
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ self.mox.StubOutWithMock(nova.db, 'instance_remove_security_group')
+ nova.db.instance_remove_security_group(mox.IgnoreArg(),
+ mox.IgnoreArg(),
+ mox.IgnoreArg())
+ self.stubs.Set(nova.db, 'security_group_get',
+ return_security_group)
+ self.mox.ReplayAll()
+
+ req = webob.Request.blank('/v1.1/os-security-groups/1/disassociate')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ body_dict = {"security_group_disassociate": {
+ "servers": [
+ {'id': 1}
+ ]
+ }
+ }
+ req.body = json.dumps(body_dict)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 202)
+
class TestSecurityGroupRules(test.TestCase):
def setUp(self):