summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTushar Patil <tushar.vitthal.patil@gmail.com>2011-08-21 00:04:29 +0000
committerTarmac <>2011-08-21 00:04:29 +0000
commit271817cdce37c55f29bb9782429ee8b6ad57364e (patch)
tree40483fbcd36ea864039af10969645e184b7d535d
parent7924fb7899b02d3cb7420c916e035094d5c90194 (diff)
parentbb989133196744779527e36cba22a76bd44e533b (diff)
Added OS APIs to associate/disassociate security groups to/from instances.
I will add views to return list of security groups associated with the servers later after this branch is merged into trunk. The reason I will do this later is because my previous merge proposal (https://code.launchpad.net/~tpatil/nova/add-options-network-create-os-apis/+merge/68292) is dependent on this work. In this merge proposal I have added a new extension which still uses default OS v1.1 controllers and views, but I am going to override views in this extension to send extra information like security groups.
-rw-r--r--nova/api/openstack/contrib/security_groups.py113
-rw-r--r--nova/api/openstack/create_instance_helper.py31
-rw-r--r--nova/compute/api.py72
-rw-r--r--nova/db/api.py6
-rw-r--r--nova/db/sqlalchemy/api.py15
-rw-r--r--nova/exception.py10
-rw-r--r--nova/tests/api/openstack/contrib/test_security_groups.py271
7 files changed, 500 insertions, 18 deletions
diff --git a/nova/api/openstack/contrib/security_groups.py b/nova/api/openstack/contrib/security_groups.py
index 6c57fbb51..1fd64f3b8 100644
--- a/nova/api/openstack/contrib/security_groups.py
+++ b/nova/api/openstack/contrib/security_groups.py
@@ -25,10 +25,11 @@ from nova import db
from nova import exception
from nova import flags
from nova import log as logging
+from nova import rpc
from nova.api.openstack import common
from nova.api.openstack import extensions
from nova.api.openstack import wsgi
-
+from nova.compute import power_state
from xml.dom import minidom
@@ -73,33 +74,28 @@ class SecurityGroupController(object):
context, rule)]
return security_group
- def show(self, req, id):
- """Return data about the given security group."""
- context = req.environ['nova.context']
+ def _get_security_group(self, context, id):
try:
id = int(id)
security_group = db.security_group_get(context, id)
except ValueError:
- msg = _("Security group id is not integer")
- return exc.HTTPBadRequest(explanation=msg)
+ msg = _("Security group id should be integer")
+ raise exc.HTTPBadRequest(explanation=msg)
except exception.NotFound as exp:
- return exc.HTTPNotFound(explanation=unicode(exp))
+ raise exc.HTTPNotFound(explanation=unicode(exp))
+ return security_group
+ def show(self, req, id):
+ """Return data about the given security group."""
+ context = req.environ['nova.context']
+ security_group = self._get_security_group(context, id)
return {'security_group': self._format_security_group(context,
security_group)}
def delete(self, req, id):
"""Delete a security group."""
context = req.environ['nova.context']
- try:
- id = int(id)
- security_group = db.security_group_get(context, id)
- except ValueError:
- msg = _("Security group id is not integer")
- return exc.HTTPBadRequest(explanation=msg)
- except exception.SecurityGroupNotFound as exp:
- return exc.HTTPNotFound(explanation=unicode(exp))
-
+ security_group = self._get_security_group(context, id)
LOG.audit(_("Delete security group %s"), id, context=context)
db.security_group_destroy(context, security_group.id)
@@ -226,9 +222,9 @@ class SecurityGroupRulesController(SecurityGroupController):
security_group_rule = db.security_group_rule_create(context, values)
self.compute_api.trigger_security_group_rules_refresh(context,
- security_group_id=security_group['id'])
+ security_group_id=security_group['id'])
- return {'security_group_rule': self._format_security_group_rule(
+ return {"security_group_rule": self._format_security_group_rule(
context,
security_group_rule)}
@@ -336,6 +332,11 @@ class SecurityGroupRulesController(SecurityGroupController):
class Security_groups(extensions.ExtensionDescriptor):
+
+ def __init__(self):
+ self.compute_api = compute.API()
+ super(Security_groups, self).__init__()
+
def get_name(self):
return "SecurityGroups"
@@ -351,6 +352,82 @@ class Security_groups(extensions.ExtensionDescriptor):
def get_updated(self):
return "2011-07-21T00:00:00+00:00"
+ def _addSecurityGroup(self, input_dict, req, instance_id):
+ context = req.environ['nova.context']
+
+ try:
+ body = input_dict['addSecurityGroup']
+ group_name = body['name']
+ instance_id = int(instance_id)
+ except ValueError:
+ msg = _("Server id should be integer")
+ raise exc.HTTPBadRequest(explanation=msg)
+ except TypeError:
+ msg = _("Missing parameter dict")
+ raise webob.exc.HTTPBadRequest(explanation=msg)
+ except KeyError:
+ msg = _("Security group not specified")
+ raise webob.exc.HTTPBadRequest(explanation=msg)
+
+ if not group_name or group_name.strip() == '':
+ msg = _("Security group name cannot be empty")
+ raise webob.exc.HTTPBadRequest(explanation=msg)
+
+ try:
+ self.compute_api.add_security_group(context, instance_id,
+ group_name)
+ except exception.SecurityGroupNotFound as exp:
+ return exc.HTTPNotFound(explanation=unicode(exp))
+ except exception.InstanceNotFound as exp:
+ return exc.HTTPNotFound(explanation=unicode(exp))
+ except exception.Invalid as exp:
+ return exc.HTTPBadRequest(explanation=unicode(exp))
+
+ return exc.HTTPAccepted()
+
+ def _removeSecurityGroup(self, input_dict, req, instance_id):
+ context = req.environ['nova.context']
+
+ try:
+ body = input_dict['removeSecurityGroup']
+ group_name = body['name']
+ instance_id = int(instance_id)
+ except ValueError:
+ msg = _("Server id should be integer")
+ raise exc.HTTPBadRequest(explanation=msg)
+ except TypeError:
+ msg = _("Missing parameter dict")
+ raise webob.exc.HTTPBadRequest(explanation=msg)
+ except KeyError:
+ msg = _("Security group not specified")
+ raise webob.exc.HTTPBadRequest(explanation=msg)
+
+ if not group_name or group_name.strip() == '':
+ msg = _("Security group name cannot be empty")
+ raise webob.exc.HTTPBadRequest(explanation=msg)
+
+ try:
+ self.compute_api.remove_security_group(context, instance_id,
+ group_name)
+ except exception.SecurityGroupNotFound as exp:
+ return exc.HTTPNotFound(explanation=unicode(exp))
+ except exception.InstanceNotFound as exp:
+ return exc.HTTPNotFound(explanation=unicode(exp))
+ except exception.Invalid as exp:
+ return exc.HTTPBadRequest(explanation=unicode(exp))
+
+ return exc.HTTPAccepted()
+
+ def get_actions(self):
+ """Return the actions the extensions adds"""
+ actions = [
+ extensions.ActionExtension("servers", "addSecurityGroup",
+ self._addSecurityGroup),
+ extensions.ActionExtension("servers", "removeSecurityGroup",
+ self._removeSecurityGroup)
+ ]
+ return actions
+
def get_resources(self):
resources = []
diff --git a/nova/api/openstack/create_instance_helper.py b/nova/api/openstack/create_instance_helper.py
index 978741682..73e48dca5 100644
--- a/nova/api/openstack/create_instance_helper.py
+++ b/nova/api/openstack/create_instance_helper.py
@@ -111,6 +111,15 @@ class CreateInstanceHelper(object):
if personality:
injected_files = self._get_injected_files(personality)
+ sg_names = []
+ security_groups = server_dict.get('security_groups')
+ if security_groups is not None:
+ sg_names = [sg['name'] for sg in security_groups if sg.get('name')]
+ if not sg_names:
+ sg_names.append('default')
+
+ sg_names = list(set(sg_names))
+
try:
flavor_id = self.controller._flavor_id_from_req_data(body)
except ValueError as error:
@@ -164,6 +173,7 @@ class CreateInstanceHelper(object):
reservation_id=reservation_id,
min_count=min_count,
max_count=max_count,
+ security_group=sg_names,
user_data=user_data,
availability_zone=availability_zone))
except quota.QuotaError as error:
@@ -174,6 +184,8 @@ class CreateInstanceHelper(object):
except exception.FlavorNotFound as error:
msg = _("Invalid flavorRef provided.")
raise exc.HTTPBadRequest(explanation=msg)
+ except exception.SecurityGroupNotFound as error:
+ raise exc.HTTPBadRequest(explanation=unicode(error))
# Let the caller deal with unhandled exceptions.
def _handle_quota_error(self, error):
@@ -465,6 +477,10 @@ class ServerXMLDeserializerV11(wsgi.MetadataXMLDeserializer):
if personality is not None:
server["personality"] = personality
+ security_groups = self._extract_security_groups(server_node)
+ if security_groups is not None:
+ server["security_groups"] = security_groups
+
return server
def _extract_personality(self, server_node):
@@ -481,3 +497,18 @@ class ServerXMLDeserializerV11(wsgi.MetadataXMLDeserializer):
return personality
else:
return None
+
+ def _extract_security_groups(self, server_node):
+ """Marshal the security_groups attribute of a parsed request"""
+ node = self.find_first_child_named(server_node, "security_groups")
+ if node is not None:
+ security_groups = []
+ for sg_node in self.find_children_named(node, "security_group"):
+ item = {}
+ name_node = self.find_first_child_named(sg_node, "name")
+ if name_node:
+ item["name"] = self.extract_text(name_node)
+ security_groups.append(item)
+ return security_groups
+ else:
+ return None
diff --git a/nova/compute/api.py b/nova/compute/api.py
index efc9da79b..0c6beacaa 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -613,6 +613,78 @@ class API(base.Base):
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{'method': 'refresh_provider_fw_rules', 'args': {}})
+ def _is_security_group_associated_with_server(self, security_group,
+ instance_id):
+ """Check if the security group is already associated
+ with the instance. If Yes, return True.
+ """
+
+ if not security_group:
+ return False
+
+ instances = security_group.get('instances')
+ if not instances:
+ return False
+
+ inst_id = None
+ for inst_id in (instance['id'] for instance in instances \
+ if instance_id == instance['id']):
+ return True
+
+ return False
+
+ def add_security_group(self, context, instance_id, security_group_name):
+ """Add security group to the instance"""
+ security_group = db.security_group_get_by_name(context,
+ context.project_id,
+ security_group_name)
+ # check if the server exists
+ inst = db.instance_get(context, instance_id)
+ #check if the security group is associated with the server
+ if self._is_security_group_associated_with_server(security_group,
+ instance_id):
+ raise exception.SecurityGroupExistsForInstance(
+ security_group_id=security_group['id'],
+ instance_id=instance_id)
+
+ #check if the instance is in running state
+ if inst['state'] != power_state.RUNNING:
+ raise exception.InstanceNotRunning(instance_id=instance_id)
+
+ db.instance_add_security_group(context.elevated(),
+ instance_id,
+ security_group['id'])
+ rpc.cast(context,
+ db.queue_get_for(context, FLAGS.compute_topic, inst['host']),
+ {"method": "refresh_security_group_rules",
+ "args": {"security_group_id": security_group['id']}})
+
+ def remove_security_group(self, context, instance_id, security_group_name):
+ """Remove the security group associated with the instance"""
+ security_group = db.security_group_get_by_name(context,
+ context.project_id,
+ security_group_name)
+ # check if the server exists
+ inst = db.instance_get(context, instance_id)
+ #check if the security group is associated with the server
+ if not self._is_security_group_associated_with_server(security_group,
+ instance_id):
+ raise exception.SecurityGroupNotExistsForInstance(
+ security_group_id=security_group['id'],
+ instance_id=instance_id)
+
+ #check if the instance is in running state
+ if inst['state'] != power_state.RUNNING:
+ raise exception.InstanceNotRunning(instance_id=instance_id)
+
+ db.instance_remove_security_group(context.elevated(),
+ instance_id,
+ security_group['id'])
+ rpc.cast(context,
+ db.queue_get_for(context, FLAGS.compute_topic, inst['host']),
+ {"method": "refresh_security_group_rules",
+ "args": {"security_group_id": security_group['id']}})
+
@scheduler_api.reroute_compute("update")
def update(self, context, instance_id, **kwargs):
"""Updates the instance in the datastore.
diff --git a/nova/db/api.py b/nova/db/api.py
index b9ea8757c..e946e8436 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -570,6 +570,12 @@ def instance_add_security_group(context, instance_id, security_group_id):
security_group_id)
+def instance_remove_security_group(context, instance_id, security_group_id):
+ """Disassociate the given security group from the given instance."""
+ return IMPL.instance_remove_security_group(context, instance_id,
+ security_group_id)
+
+
def instance_action_create(context, values):
"""Create an instance action from the values dictionary."""
return IMPL.instance_action_create(context, values)
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index fe80056ab..0f747c602 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -1502,6 +1502,19 @@ def instance_add_security_group(context, instance_id, security_group_id):
@require_context
+def instance_remove_security_group(context, instance_id, security_group_id):
+ """Disassociate the given security group from the given instance"""
+ session = get_session()
+
+ session.query(models.SecurityGroupInstanceAssociation).\
+ filter_by(instance_id=instance_id).\
+ filter_by(security_group_id=security_group_id).\
+ update({'deleted': True,
+ 'deleted_at': utils.utcnow(),
+ 'updated_at': literal_column('updated_at')})
+
+
+@require_context
def instance_action_create(context, values):
"""Create an instance action from the values dictionary."""
action_ref = models.InstanceActions()
@@ -2437,6 +2450,7 @@ def security_group_get(context, security_group_id, session=None):
filter_by(deleted=can_read_deleted(context),).\
filter_by(id=security_group_id).\
options(joinedload_all('rules')).\
+ options(joinedload_all('instances')).\
first()
else:
result = session.query(models.SecurityGroup).\
@@ -2444,6 +2458,7 @@ def security_group_get(context, security_group_id, session=None):
filter_by(id=security_group_id).\
filter_by(project_id=context.project_id).\
options(joinedload_all('rules')).\
+ options(joinedload_all('instances')).\
first()
if not result:
raise exception.SecurityGroupNotFound(
diff --git a/nova/exception.py b/nova/exception.py
index b09d50797..e8cb7bcb5 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -541,6 +541,16 @@ class SecurityGroupNotFoundForRule(SecurityGroupNotFound):
message = _("Security group with rule %(rule_id)s not found.")
+class SecurityGroupExistsForInstance(Invalid):
+ message = _("Security group %(security_group_id)s is already associated"
+ " with the instance %(instance_id)s")
+
+
+class SecurityGroupNotExistsForInstance(Invalid):
+ message = _("Security group %(security_group_id)s is not associated with"
+ " the instance %(instance_id)s")
+
+
class MigrationNotFound(NotFound):
message = _("Migration %(migration_id)s could not be found.")
diff --git a/nova/tests/api/openstack/contrib/test_security_groups.py b/nova/tests/api/openstack/contrib/test_security_groups.py
index 4317880ca..b44ebc9fb 100644
--- a/nova/tests/api/openstack/contrib/test_security_groups.py
+++ b/nova/tests/api/openstack/contrib/test_security_groups.py
@@ -15,10 +15,13 @@
# under the License.
import json
+import mox
+import nova
import unittest
import webob
from xml.dom import minidom
+from nova import exception
from nova import test
from nova.api.openstack.contrib import security_groups
from nova.tests.api.openstack import fakes
@@ -51,6 +54,28 @@ def _create_security_group_request_dict(security_group):
return {'security_group': sg}
+def return_server(context, server_id):
+ return {'id': server_id, 'state': 0x01, 'host': "localhost"}
+
+
+def return_non_running_server(context, server_id):
+ return {'id': server_id, 'state': 0x02,
+ 'host': "localhost"}
+
+
+def return_security_group(context, project_id, group_name):
+ return {'id': 1, 'name': group_name, "instances": [
+ {'id': 1}]}
+
+
+def return_security_group_without_instances(context, project_id, group_name):
+ return {'id': 1, 'name': group_name}
+
+
+def return_server_nonexistant(context, server_id):
+ raise exception.InstanceNotFound(instance_id=server_id)
+
+
class TestSecurityGroups(test.TestCase):
def setUp(self):
super(TestSecurityGroups, self).setUp()
@@ -325,6 +350,252 @@ class TestSecurityGroups(test.TestCase):
response = self._delete_security_group(11111111)
self.assertEquals(response.status_int, 404)
+ def test_associate_by_non_existing_security_group_name(self):
+ body = dict(addSecurityGroup=dict(name='non-existing'))
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 404)
+
+ def test_associate_by_invalid_server_id(self):
+ body = dict(addSecurityGroup=dict(name='test'))
+ self.stubs.Set(nova.db, 'security_group_get_by_name',
+ return_security_group)
+ req = webob.Request.blank('/v1.1/servers/invalid/action')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_associate_without_body(self):
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ body = dict(addSecurityGroup=None)
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_associate_no_security_group_name(self):
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ body = dict(addSecurityGroup=dict())
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_associate_security_group_name_with_whitespaces(self):
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ body = dict(addSecurityGroup=dict(name=" "))
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_associate_non_existing_instance(self):
+ self.stubs.Set(nova.db, 'instance_get', return_server_nonexistant)
+ body = dict(addSecurityGroup=dict(name="test"))
+ self.stubs.Set(nova.db, 'security_group_get_by_name',
+ return_security_group)
+ req = webob.Request.blank('/v1.1/servers/10000/action')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 404)
+
+ def test_associate_non_running_instance(self):
+ self.stubs.Set(nova.db, 'instance_get', return_non_running_server)
+ self.stubs.Set(nova.db, 'security_group_get_by_name',
+ return_security_group_without_instances)
+ body = dict(addSecurityGroup=dict(name="test"))
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_associate_already_associated_security_group_to_instance(self):
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ self.stubs.Set(nova.db, 'security_group_get_by_name',
+ return_security_group)
+ body = dict(addSecurityGroup=dict(name="test"))
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_associate(self):
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ self.mox.StubOutWithMock(nova.db, 'instance_add_security_group')
+ nova.db.instance_add_security_group(mox.IgnoreArg(),
+ mox.IgnoreArg(),
+ mox.IgnoreArg())
+ self.stubs.Set(nova.db, 'security_group_get_by_name',
+ return_security_group_without_instances)
+ self.mox.ReplayAll()
+
+ body = dict(addSecurityGroup=dict(name="test"))
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 202)
+
+ def test_associate_xml(self):
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ self.mox.StubOutWithMock(nova.db, 'instance_add_security_group')
+ nova.db.instance_add_security_group(mox.IgnoreArg(),
+ mox.IgnoreArg(),
+ mox.IgnoreArg())
+ self.stubs.Set(nova.db, 'security_group_get_by_name',
+ return_security_group_without_instances)
+ self.mox.ReplayAll()
+
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ req.headers['Content-Type'] = 'application/xml'
+ req.method = 'POST'
+ req.body = """<addSecurityGroup>
+ <name>test</name>
+ </addSecurityGroup>"""
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 202)
+
+ def test_disassociate_by_non_existing_security_group_name(self):
+ body = dict(removeSecurityGroup=dict(name='non-existing'))
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 404)
+
+ def test_disassociate_by_invalid_server_id(self):
+ body = dict(removeSecurityGroup=dict(name='test'))
+ self.stubs.Set(nova.db, 'security_group_get_by_name',
+ return_security_group)
+ req = webob.Request.blank('/v1.1/servers/invalid/action')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_disassociate_without_body(self):
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ body = dict(removeSecurityGroup=None)
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_disassociate_no_security_group_name(self):
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ body = dict(removeSecurityGroup=dict())
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_disassociate_security_group_name_with_whitespaces(self):
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ body = dict(removeSecurityGroup=dict(name=" "))
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_disassociate_non_existing_instance(self):
+ self.stubs.Set(nova.db, 'instance_get', return_server_nonexistant)
+ body = dict(removeSecurityGroup=dict(name="test"))
+ self.stubs.Set(nova.db, 'security_group_get_by_name',
+ return_security_group)
+ req = webob.Request.blank('/v1.1/servers/10000/action')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 404)
+
+ def test_disassociate_non_running_instance(self):
+ self.stubs.Set(nova.db, 'instance_get', return_non_running_server)
+ self.stubs.Set(nova.db, 'security_group_get_by_name',
+ return_security_group)
+ body = dict(removeSecurityGroup=dict(name="test"))
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_disassociate_already_associated_security_group_to_instance(self):
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ self.stubs.Set(nova.db, 'security_group_get_by_name',
+ return_security_group_without_instances)
+ body = dict(removeSecurityGroup=dict(name="test"))
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_disassociate(self):
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ self.mox.StubOutWithMock(nova.db, 'instance_remove_security_group')
+ nova.db.instance_remove_security_group(mox.IgnoreArg(),
+ mox.IgnoreArg(),
+ mox.IgnoreArg())
+ self.stubs.Set(nova.db, 'security_group_get_by_name',
+ return_security_group)
+ self.mox.ReplayAll()
+
+ body = dict(removeSecurityGroup=dict(name="test"))
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 202)
+
+ def test_disassociate_xml(self):
+ self.stubs.Set(nova.db, 'instance_get', return_server)
+ self.mox.StubOutWithMock(nova.db, 'instance_remove_security_group')
+ nova.db.instance_remove_security_group(mox.IgnoreArg(),
+ mox.IgnoreArg(),
+ mox.IgnoreArg())
+ self.stubs.Set(nova.db, 'security_group_get_by_name',
+ return_security_group)
+ self.mox.ReplayAll()
+
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ req.headers['Content-Type'] = 'application/xml'
+ req.method = 'POST'
+ req.body = """<removeSecurityGroup>
+ <name>test</name>
+ </removeSecurityGroup>"""
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 202)
+
class TestSecurityGroupRules(test.TestCase):
def setUp(self):