summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2011-11-03 03:42:26 +0000
committerGerrit Code Review <review@openstack.org>2011-11-03 03:42:26 +0000
commit286e39ee0836dc5b0ea461446aa838ca878c0ca4 (patch)
tree84f898643148565438c06620ea5ae1a5389efddc
parent4fb1eda581606d804be8da9e8045d7641725e33c (diff)
parent6932d6b94fb6c131c8cff76e28a7d8af342ab827 (diff)
downloadnova-286e39ee0836dc5b0ea461446aa838ca878c0ca4.tar.gz
nova-286e39ee0836dc5b0ea461446aa838ca878c0ca4.tar.xz
nova-286e39ee0836dc5b0ea461446aa838ca878c0ca4.zip
Merge "Speed up tests yet another 45 seconds"
-rw-r--r--nova/tests/api/openstack/contrib/test_security_groups.py1020
1 files changed, 436 insertions, 584 deletions
diff --git a/nova/tests/api/openstack/contrib/test_security_groups.py b/nova/tests/api/openstack/contrib/test_security_groups.py
index 92b721d30..f55ce4a55 100644
--- a/nova/tests/api/openstack/contrib/test_security_groups.py
+++ b/nova/tests/api/openstack/contrib/test_security_groups.py
@@ -30,31 +30,44 @@ from nova.tests.api.openstack import fakes
FAKE_UUID = 'a47ae74e-ab08-447f-8eee-ffd43fc46c16'
-def _get_create_request_json(body_dict):
- req = webob.Request.blank('/v1.1/123/os-security-groups')
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body_dict)
- return req
+class AttrDict(dict):
+ def __getattr__(self, k):
+ return self[k]
-def _create_security_group_json(security_group):
- body_dict = _create_security_group_request_dict(security_group)
- request = _get_create_request_json(body_dict)
- response = request.get_response(fakes.wsgi_app())
- return response
+def security_group_template(**kwargs):
+ sg = kwargs.copy()
+ sg.setdefault('tenant_id', '123')
+ sg.setdefault('name', 'test')
+ sg.setdefault('description', 'test-description')
+ return sg
-def _create_security_group_request_dict(security_group):
- sg = {}
- if security_group is not None:
- name = security_group.get('name', None)
- description = security_group.get('description', None)
- if name:
- sg['name'] = security_group['name']
- if description:
- sg['description'] = security_group['description']
- return {'security_group': sg}
+def security_group_db(security_group, id=None):
+ attrs = security_group.copy()
+ if 'tenant_id' in attrs:
+ attrs['project_id'] = attrs.pop('tenant_id')
+ if id is not None:
+ attrs['id'] = id
+ attrs.setdefault('rules', [])
+ attrs.setdefault('instances', [])
+ return AttrDict(attrs)
+
+
+def security_group_rule_template(**kwargs):
+ rule = kwargs.copy()
+ rule.setdefault('ip_protocol', 'tcp')
+ rule.setdefault('from_port', 22)
+ rule.setdefault('to_port', 22)
+ rule.setdefault('parent_group_id', 2)
+ return rule
+
+
+def security_group_rule_db(rule, id=None):
+ attrs = rule.copy()
+ if 'ip_protocol' in attrs:
+ attrs['protocol'] = attrs.pop('ip_protocol')
+ return AttrDict(attrs)
def return_server(context, server_id):
@@ -72,13 +85,11 @@ def return_server_by_uuid(context, server_uuid):
def return_non_running_server(context, server_id):
- return {'id': server_id, 'state': 0x02,
- 'host': "localhost"}
+ return {'id': server_id, 'state': 0x02, 'host': "localhost"}
-def return_security_group(context, project_id, group_name):
- return {'id': 1, 'name': group_name, "instances": [
- {'id': 1}]}
+def return_security_group_by_name(context, project_id, group_name):
+ return {'id': 1, 'name': group_name, "instances": [{'id': 1}]}
def return_security_group_without_instances(context, project_id, group_name):
@@ -89,286 +100,251 @@ def return_server_nonexistent(context, server_id):
raise exception.InstanceNotFound(instance_id=server_id)
+class StubExtensionManager(object):
+ def register(self, *args):
+ pass
+
+
class TestSecurityGroups(test.TestCase):
def setUp(self):
super(TestSecurityGroups, self).setUp()
+ self.controller = security_groups.SecurityGroupController()
+ self.manager = security_groups.Security_groups(StubExtensionManager())
+
def tearDown(self):
super(TestSecurityGroups, self).tearDown()
- def _create_security_group_request_dict(self, security_group):
- sg = {}
- if security_group is not None:
- name = security_group.get('name', None)
- description = security_group.get('description', None)
- if name:
- sg['name'] = security_group['name']
- if description:
- sg['description'] = security_group['description']
- return {'security_group': sg}
-
- def _delete_security_group(self, id):
- request = webob.Request.blank('/v1.1/123/os-security-groups/%s'
- % id)
- request.method = 'DELETE'
- response = request.get_response(fakes.wsgi_app())
- return response
-
def test_create_security_group(self):
- security_group = {}
- security_group['name'] = "test"
- security_group['description'] = "group-description"
- response = _create_security_group_json(security_group)
- res_dict = json.loads(response.body)
- self.assertEqual(res_dict['security_group']['name'], "test")
+ sg = security_group_template()
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ res_dict = self.controller.create(req, {'security_group': sg})
+ self.assertEqual(res_dict['security_group']['name'], 'test')
self.assertEqual(res_dict['security_group']['description'],
- "group-description")
- self.assertEquals(response.status_int, 200)
+ 'test-description')
def test_create_security_group_with_no_name(self):
- security_group = {}
- security_group['description'] = "group-description"
- response = _create_security_group_json(security_group)
- self.assertEquals(response.status_int, 400)
+ sg = security_group_template()
+ del sg['name']
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ self.assertRaises(webob.exc.HTTPUnprocessableEntity,
+ self.controller.create, req, sg)
def test_create_security_group_with_no_description(self):
- security_group = {}
- security_group['name'] = "test"
- response = _create_security_group_json(security_group)
- self.assertEquals(response.status_int, 400)
+ sg = security_group_template()
+ del sg['description']
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group': sg})
def test_create_security_group_with_blank_name(self):
- security_group = {}
- security_group['name'] = ""
- security_group['description'] = "group-description"
- response = _create_security_group_json(security_group)
- self.assertEquals(response.status_int, 400)
+ sg = security_group_template(name='')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group': sg})
def test_create_security_group_with_whitespace_name(self):
- security_group = {}
- security_group['name'] = " "
- security_group['description'] = "group-description"
- response = _create_security_group_json(security_group)
- self.assertEquals(response.status_int, 400)
+ sg = security_group_template(name=' ')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group': sg})
def test_create_security_group_with_blank_description(self):
- security_group = {}
- security_group['name'] = "test"
- security_group['description'] = ""
- response = _create_security_group_json(security_group)
- self.assertEquals(response.status_int, 400)
+ sg = security_group_template(description='')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group': sg})
def test_create_security_group_with_whitespace_description(self):
- security_group = {}
- security_group['name'] = "name"
- security_group['description'] = " "
- response = _create_security_group_json(security_group)
- self.assertEquals(response.status_int, 400)
+ sg = security_group_template(description=' ')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group': sg})
def test_create_security_group_with_duplicate_name(self):
- security_group = {}
- security_group['name'] = "test"
- security_group['description'] = "group-description"
- response = _create_security_group_json(security_group)
+ sg = security_group_template()
+
+ # FIXME: Stub out _get instead of creating twice
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ self.controller.create(req, {'security_group': sg})
- self.assertEquals(response.status_int, 200)
- response = _create_security_group_json(security_group)
- self.assertEquals(response.status_int, 400)
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group': sg})
def test_create_security_group_with_no_body(self):
- request = _get_create_request_json(body_dict=None)
- response = request.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 422)
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ self.assertRaises(webob.exc.HTTPUnprocessableEntity,
+ self.controller.create, req, None)
def test_create_security_group_with_no_security_group(self):
- body_dict = {}
- body_dict['no-securityGroup'] = None
- request = _get_create_request_json(body_dict)
- response = request.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 422)
+ body = {'no-securityGroup': None}
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ self.assertRaises(webob.exc.HTTPUnprocessableEntity,
+ self.controller.create, req, body)
def test_create_security_group_above_255_characters_name(self):
- security_group = {}
- security_group['name'] = ("1234567890123456"
- "1234567890123456789012345678901234567890"
- "1234567890123456789012345678901234567890"
- "1234567890123456789012345678901234567890"
- "1234567890123456789012345678901234567890"
- "1234567890123456789012345678901234567890"
- "1234567890123456789012345678901234567890")
- security_group['description'] = "group-description"
- response = _create_security_group_json(security_group)
-
- self.assertEquals(response.status_int, 400)
+ sg = security_group_template(name='1234567890' * 26)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group': sg})
def test_create_security_group_above_255_characters_description(self):
- security_group = {}
- security_group['name'] = "test"
- security_group['description'] = ("1234567890123456"
- "1234567890123456789012345678901234567890"
- "1234567890123456789012345678901234567890"
- "1234567890123456789012345678901234567890"
- "1234567890123456789012345678901234567890"
- "1234567890123456789012345678901234567890"
- "1234567890123456789012345678901234567890")
- response = _create_security_group_json(security_group)
- self.assertEquals(response.status_int, 400)
+ sg = security_group_template(description='1234567890' * 26)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group': sg})
def test_create_security_group_non_string_name(self):
- security_group = {}
- security_group['name'] = 12
- security_group['description'] = "group-description"
- response = _create_security_group_json(security_group)
- self.assertEquals(response.status_int, 400)
+ sg = security_group_template(name=12)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group': sg})
def test_create_security_group_non_string_description(self):
- security_group = {}
- security_group['name'] = "test"
- security_group['description'] = 12
- response = _create_security_group_json(security_group)
- self.assertEquals(response.status_int, 400)
+ sg = security_group_template(description=12)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group': sg})
def test_get_security_group_list(self):
- security_group = {}
- security_group['name'] = "test"
- security_group['description'] = "group-description"
- response = _create_security_group_json(security_group)
-
- req = webob.Request.blank('/v1.1/123/os-security-groups')
- req.headers['Content-Type'] = 'application/json'
- req.method = 'GET'
- response = req.get_response(fakes.wsgi_app())
- res_dict = json.loads(response.body)
-
- expected = {'security_groups': [
- {'id': 1,
- 'name':"default",
- 'tenant_id': "123",
- "description":"default",
- "rules": []
- },
- ]
- }
- expected['security_groups'].append(
- {
- 'id': 2,
- 'name': "test",
- 'tenant_id': "123",
- "description": "group-description",
- "rules": []
- }
- )
- self.assertEquals(response.status_int, 200)
+ groups = []
+ for i, name in enumerate(['default', 'test']):
+ sg = security_group_template(id=i + 1,
+ name=name,
+ description=name + '-desc',
+ rules=[])
+ groups.append(sg)
+ expected = {'security_groups': groups}
+
+ def return_security_groups(context, project_id):
+ return [security_group_db(sg) for sg in groups]
+
+ self.stubs.Set(nova.db, 'security_group_get_by_project',
+ return_security_groups)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups')
+ res_dict = self.controller.index(req)
+
self.assertEquals(res_dict, expected)
def test_get_security_group_by_id(self):
- security_group = {}
- security_group['name'] = "test"
- security_group['description'] = "group-description"
- response = _create_security_group_json(security_group)
-
- res_dict = json.loads(response.body)
- req = webob.Request.blank('/v1.1/123/os-security-groups/%s' %
- res_dict['security_group']['id'])
- req.headers['Content-Type'] = 'application/json'
- req.method = 'GET'
- response = req.get_response(fakes.wsgi_app())
- res_dict = json.loads(response.body)
+ sg = security_group_template(id=2, rules=[])
- expected = {
- 'security_group': {
- 'id': 2,
- 'name': "test",
- 'tenant_id': "123",
- 'description': "group-description",
- 'rules': []
- }
- }
+ def return_security_group(context, group_id):
+ self.assertEquals(sg['id'], group_id)
+ return security_group_db(sg)
+
+ self.stubs.Set(nova.db, 'security_group_get',
+ return_security_group)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups/2')
+ res_dict = self.controller.show(req, '2')
+
+ expected = {'security_group': sg}
self.assertEquals(res_dict, expected)
def test_get_security_group_by_invalid_id(self):
- req = webob.Request.blank('/v1.1/123/os-security-groups/invalid')
- req.headers['Content-Type'] = 'application/json'
- req.method = 'GET'
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 400)
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups/invalid')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
+ req, 'invalid')
def test_get_security_group_by_non_existing_id(self):
- req = webob.Request.blank('/v1.1/123/os-security-groups/111111111')
- req.headers['Content-Type'] = 'application/json'
- req.method = 'GET'
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 404)
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups/111111111')
+ self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
+ req, '111111111')
def test_delete_security_group_by_id(self):
- security_group = {}
- security_group['name'] = "test"
- security_group['description'] = "group-description"
- response = _create_security_group_json(security_group)
- security_group = json.loads(response.body)['security_group']
- response = self._delete_security_group(security_group['id'])
- self.assertEquals(response.status_int, 202)
+ sg = security_group_template(id=1, rules=[])
+
+ self.called = False
+
+ def security_group_destroy(context, id):
+ self.called = True
+
+ def return_security_group(context, group_id):
+ self.assertEquals(sg['id'], group_id)
+ return security_group_db(sg)
+
+ self.stubs.Set(nova.db, 'security_group_destroy',
+ security_group_destroy)
+ self.stubs.Set(nova.db, 'security_group_get',
+ return_security_group)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups/1')
+ self.controller.delete(req, '1')
- response = self._delete_security_group(security_group['id'])
- self.assertEquals(response.status_int, 404)
+ self.assertTrue(self.called)
def test_delete_security_group_by_invalid_id(self):
- response = self._delete_security_group('invalid')
- self.assertEquals(response.status_int, 400)
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups/invalid')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
+ req, 'invalid')
def test_delete_security_group_by_non_existing_id(self):
- response = self._delete_security_group(11111111)
- self.assertEquals(response.status_int, 404)
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups/11111111')
+ self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
+ req, '11111111')
def test_associate_by_non_existing_security_group_name(self):
body = dict(addSecurityGroup=dict(name='non-existing'))
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 404)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.manager._addSecurityGroup, body, req, '1')
+
+ def test_associate_by_invalid_server_id(self):
+ body = dict(addSecurityGroup=dict(name='test'))
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/invalid/action')
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.manager._addSecurityGroup, body, req, 'invalid')
def test_associate_without_body(self):
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
+ self.stubs.Set(nova.db, 'instance_get', return_server)
body = dict(addSecurityGroup=None)
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 400)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.manager._addSecurityGroup, body, req, '1')
def test_associate_no_security_group_name(self):
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
+ self.stubs.Set(nova.db, 'instance_get', return_server)
body = dict(addSecurityGroup=dict())
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 400)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.manager._addSecurityGroup, body, req, '1')
def test_associate_security_group_name_with_whitespaces(self):
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
+ self.stubs.Set(nova.db, 'instance_get', return_server)
body = dict(addSecurityGroup=dict(name=" "))
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 400)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.manager._addSecurityGroup, body, req, '1')
def test_associate_non_existing_instance(self):
self.stubs.Set(nova.db, 'instance_get', return_server_nonexistent)
self.stubs.Set(nova.db, 'instance_get_by_uuid',
return_server_nonexistent)
body = dict(addSecurityGroup=dict(name="test"))
- self.stubs.Set(nova.db, 'security_group_get_by_name',
- return_security_group)
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 404)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.manager._addSecurityGroup, body, req, '1')
def test_associate_non_running_instance(self):
self.stubs.Set(nova.db, 'instance_get', return_non_running_server)
@@ -377,26 +353,22 @@ class TestSecurityGroups(test.TestCase):
self.stubs.Set(nova.db, 'security_group_get_by_name',
return_security_group_without_instances)
body = dict(addSecurityGroup=dict(name="test"))
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 400)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.manager._addSecurityGroup, body, req, '1')
def test_associate_already_associated_security_group_to_instance(self):
self.stubs.Set(nova.db, 'instance_get', return_server)
self.stubs.Set(nova.db, 'instance_get_by_uuid',
return_server_by_uuid)
self.stubs.Set(nova.db, 'security_group_get_by_name',
- return_security_group)
+ return_security_group_by_name)
body = dict(addSecurityGroup=dict(name="test"))
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 400)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.manager._addSecurityGroup, body, req, '1')
def test_associate(self):
self.stubs.Set(nova.db, 'instance_get', return_server)
@@ -404,83 +376,79 @@ class TestSecurityGroups(test.TestCase):
return_server_by_uuid)
self.mox.StubOutWithMock(nova.db, 'instance_add_security_group')
nova.db.instance_add_security_group(mox.IgnoreArg(),
- mox.IgnoreArg(),
- mox.IgnoreArg())
+ mox.IgnoreArg(),
+ mox.IgnoreArg())
self.stubs.Set(nova.db, 'security_group_get_by_name',
return_security_group_without_instances)
self.mox.ReplayAll()
body = dict(addSecurityGroup=dict(name="test"))
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 202)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.manager._addSecurityGroup(body, req, '1')
def test_disassociate_by_non_existing_security_group_name(self):
body = dict(removeSecurityGroup=dict(name='non-existing'))
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 404)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.manager._removeSecurityGroup, body, req, '1')
+
+ def test_disassociate_by_invalid_server_id(self):
+ self.stubs.Set(nova.db, 'security_group_get_by_name',
+ return_security_group_by_name)
+ body = dict(removeSecurityGroup=dict(name='test'))
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/invalid/action')
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.manager._removeSecurityGroup, body, req,
+ 'invalid')
def test_disassociate_without_body(self):
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
+ self.stubs.Set(nova.db, 'instance_get', return_server)
body = dict(removeSecurityGroup=None)
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 400)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.manager._removeSecurityGroup, body, req, '1')
def test_disassociate_no_security_group_name(self):
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
+ self.stubs.Set(nova.db, 'instance_get', return_server)
body = dict(removeSecurityGroup=dict())
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 400)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.manager._removeSecurityGroup, body, req, '1')
def test_disassociate_security_group_name_with_whitespaces(self):
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
+ self.stubs.Set(nova.db, 'instance_get', return_server)
body = dict(removeSecurityGroup=dict(name=" "))
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 400)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.manager._removeSecurityGroup, body, req, '1')
def test_disassociate_non_existing_instance(self):
self.stubs.Set(nova.db, 'instance_get', return_server_nonexistent)
- self.stubs.Set(nova.db, 'instance_get_by_uuid',
- return_server_nonexistent)
- body = dict(removeSecurityGroup=dict(name="test"))
self.stubs.Set(nova.db, 'security_group_get_by_name',
- return_security_group)
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 404)
+ return_security_group_by_name)
+ body = dict(removeSecurityGroup=dict(name="test"))
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.manager._removeSecurityGroup, body, req, '1')
def test_disassociate_non_running_instance(self):
self.stubs.Set(nova.db, 'instance_get', return_non_running_server)
self.stubs.Set(nova.db, 'instance_get_by_uuid',
return_non_running_server)
self.stubs.Set(nova.db, 'security_group_get_by_name',
- return_security_group)
+ return_security_group_by_name)
body = dict(removeSecurityGroup=dict(name="test"))
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 400)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.manager._removeSecurityGroup, body, req, '1')
def test_disassociate_already_associated_security_group_to_instance(self):
self.stubs.Set(nova.db, 'instance_get', return_server)
@@ -489,12 +457,10 @@ class TestSecurityGroups(test.TestCase):
self.stubs.Set(nova.db, 'security_group_get_by_name',
return_security_group_without_instances)
body = dict(removeSecurityGroup=dict(name="test"))
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 400)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.manager._removeSecurityGroup, body, req, '1')
def test_disassociate(self):
self.stubs.Set(nova.db, 'instance_get', return_server)
@@ -505,278 +471,180 @@ class TestSecurityGroups(test.TestCase):
mox.IgnoreArg(),
mox.IgnoreArg())
self.stubs.Set(nova.db, 'security_group_get_by_name',
- return_security_group)
+ return_security_group_by_name)
self.mox.ReplayAll()
body = dict(removeSecurityGroup=dict(name="test"))
- req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID)
- req.headers['Content-Type'] = 'application/json'
- req.method = 'POST'
- req.body = json.dumps(body)
- response = req.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 202)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action')
+ self.manager._removeSecurityGroup(body, req, '1')
class TestSecurityGroupRules(test.TestCase):
def setUp(self):
super(TestSecurityGroupRules, self).setUp()
- security_group = {}
- security_group['name'] = "authorize-revoke"
- security_group['description'] = ("Security group created for "
- " authorize-revoke testing")
- response = _create_security_group_json(security_group)
- security_group = json.loads(response.body)
- self.parent_security_group = security_group['security_group']
-
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "to_port": "22",
- "parent_group_id": self.parent_security_group['id'],
- "cidr": "10.0.0.0/24"
- }
- }
- res = self._create_security_group_rule_json(rules)
- self.assertEquals(res.status_int, 200)
- self.security_group_rule = json.loads(res.body)['security_group_rule']
+
+ controller = security_groups.SecurityGroupController()
+
+ sg1 = security_group_template(id=1)
+ sg2 = security_group_template(id=2,
+ name='authorize_revoke',
+ description='authorize-revoke testing')
+ db1 = security_group_db(sg1)
+ db2 = security_group_db(sg2)
+
+ def return_security_group(context, group_id):
+ if group_id == db1['id']:
+ return db1
+ if group_id == db2['id']:
+ return db2
+ raise exception.NotFound()
+
+ self.stubs.Set(nova.db, 'security_group_get',
+ return_security_group)
+
+ self.parent_security_group = db2
+
+ self.controller = security_groups.SecurityGroupRulesController()
def tearDown(self):
super(TestSecurityGroupRules, self).tearDown()
- def _create_security_group_rule_json(self, rules):
- request = webob.Request.blank('/v1.1/123/os-security-group-rules')
- request.headers['Content-Type'] = 'application/json'
- request.method = 'POST'
- request.body = json.dumps(rules)
- response = request.get_response(fakes.wsgi_app())
- return response
-
- def _delete_security_group_rule(self, id):
- request = webob.Request.blank('/v1.1/123/os-security-group-rules/%s'
- % id)
- request.method = 'DELETE'
- response = request.get_response(fakes.wsgi_app())
- return response
-
def test_create_by_cidr(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "to_port": "22",
- "parent_group_id": 2,
- "cidr": "10.2.3.124/24"
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- security_group_rule = json.loads(response.body)['security_group_rule']
- self.assertEquals(response.status_int, 200)
+ rule = security_group_rule_template(cidr='10.2.3.124/24')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ res_dict = self.controller.create(req, {'security_group_rule': rule})
+
+ security_group_rule = res_dict['security_group_rule']
self.assertNotEquals(security_group_rule['id'], 0)
self.assertEquals(security_group_rule['parent_group_id'], 2)
self.assertEquals(security_group_rule['ip_range']['cidr'],
"10.2.3.124/24")
def test_create_by_group_id(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "to_port": "22",
- "group_id": "1",
- "parent_group_id": "%s"
- % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 200)
- security_group_rule = json.loads(response.body)['security_group_rule']
+ rule = security_group_rule_template(group_id='1')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ res_dict = self.controller.create(req, {'security_group_rule': rule})
+
+ security_group_rule = res_dict['security_group_rule']
self.assertNotEquals(security_group_rule['id'], 0)
self.assertEquals(security_group_rule['parent_group_id'], 2)
def test_create_add_existing_rules(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "to_port": "22",
- "cidr": "10.0.0.0/24",
- "parent_group_id": "%s" % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 400)
+ rule = security_group_rule_template(cidr='10.0.0.0/24')
+
+ self.parent_security_group['rules'] = [security_group_rule_db(rule)]
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_create_with_no_body(self):
- request = webob.Request.blank('/v1.1/123/os-security-group-rules')
- request.headers['Content-Type'] = 'application/json'
- request.method = 'POST'
- request.body = json.dumps(None)
- response = request.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 422)
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPUnprocessableEntity,
+ self.controller.create, req, None)
def test_create_with_no_security_group_rule_in_body(self):
- request = webob.Request.blank('/v1.1/123/os-security-group-rules')
- request.headers['Content-Type'] = 'application/json'
- request.method = 'POST'
- body_dict = {'test': "test"}
- request.body = json.dumps(body_dict)
- response = request.get_response(fakes.wsgi_app())
- self.assertEquals(response.status_int, 422)
+ rules = {'test': 'test'}
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPUnprocessableEntity,
+ self.controller.create, req, rules)
def test_create_with_invalid_parent_group_id(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "to_port": "22",
- "parent_group_id": "invalid"
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 400)
+ rule = security_group_rule_template(parent_group_id='invalid')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_create_with_non_existing_parent_group_id(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "to_port": "22",
- "group_id": "invalid",
- "parent_group_id": "1111111111111"
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 404)
+ rule = security_group_rule_template(group_id='invalid',
+ parent_group_id='1111111111111')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPNotFound, self.controller.create,
+ req, {'security_group_rule': rule})
def test_create_with_invalid_protocol(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "invalid-protocol",
- "from_port": "22",
- "to_port": "22",
- "cidr": "10.2.2.0/24",
- "parent_group_id": "%s" % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 400)
+ rule = security_group_rule_template(ip_protocol='invalid-protocol',
+ cidr='10.2.2.0/24')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_create_with_no_protocol(self):
- rules = {
- "security_group_rule": {
- "from_port": "22",
- "to_port": "22",
- "cidr": "10.2.2.0/24",
- "parent_group_id": "%s" % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 400)
+ rule = security_group_rule_template(cidr='10.2.2.0/24')
+ del rule['ip_protocol']
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_create_with_invalid_from_port(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "666666",
- "to_port": "22",
- "cidr": "10.2.2.0/24",
- "parent_group_id": "%s" % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 400)
+ rule = security_group_rule_template(from_port='666666',
+ cidr='10.2.2.0/24')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_create_with_invalid_to_port(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "to_port": "666666",
- "cidr": "10.2.2.0/24",
- "parent_group_id": "%s" % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 400)
+ rule = security_group_rule_template(to_port='666666',
+ cidr='10.2.2.0/24')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_create_with_non_numerical_from_port(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "invalid",
- "to_port": "22",
- "cidr": "10.2.2.0/24",
- "parent_group_id": "%s" % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 400)
+ rule = security_group_rule_template(from_port='invalid',
+ cidr='10.2.2.0/24')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_create_with_non_numerical_to_port(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "to_port": "invalid",
- "cidr": "10.2.2.0/24",
- "parent_group_id": "%s" % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 400)
+ rule = security_group_rule_template(to_port='invalid',
+ cidr='10.2.2.0/24')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
+
+ def test_create_with_no_from_port(self):
+ rule = security_group_rule_template(cidr='10.2.2.0/24')
+ del rule['from_port']
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_create_with_no_to_port(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "cidr": "10.2.2.0/24",
- "parent_group_id": "%s" % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 400)
+ rule = security_group_rule_template(cidr='10.2.2.0/24')
+ del rule['to_port']
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_create_with_invalid_cidr(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "to_port": "22",
- "cidr": "10.2.22222.0/24",
- "parent_group_id": "%s" % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 400)
+ rule = security_group_rule_template(cidr='10.2.2222.0/24')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_create_with_no_cidr_group(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "to_port": "22",
- "parent_group_id": "%s" % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- security_group_rule = json.loads(response.body)['security_group_rule']
- self.assertEquals(response.status_int, 200)
+ rule = security_group_rule_template()
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ res_dict = self.controller.create(req, {'security_group_rule': rule})
+
+ security_group_rule = res_dict['security_group_rule']
self.assertNotEquals(security_group_rule['id'], 0)
self.assertEquals(security_group_rule['parent_group_id'],
self.parent_security_group['id'])
@@ -784,77 +652,61 @@ class TestSecurityGroupRules(test.TestCase):
"0.0.0.0/0")
def test_create_with_invalid_group_id(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "to_port": "22",
- "group_id": "invalid",
- "parent_group_id": "%s" % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 400)
+ rule = security_group_rule_template(group_id='invalid')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_create_with_empty_group_id(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "to_port": "22",
- "group_id": "",
- "parent_group_id": "%s" % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 400)
+ rule = security_group_rule_template(group_id='')
- def test_create_with_invalid_group_id(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "to_port": "22",
- "group_id": "222222",
- "parent_group_id": "%s" % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 400)
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
+
+ def test_create_with_nonexist_group_id(self):
+ rule = security_group_rule_template(group_id='222222')
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_create_rule_with_same_group_parent_id(self):
- rules = {
- "security_group_rule": {
- "ip_protocol": "tcp",
- "from_port": "22",
- "to_port": "22",
- "group_id": "%s" % self.parent_security_group['id'],
- "parent_group_id": "%s" % self.parent_security_group['id'],
- }
- }
-
- response = self._create_security_group_rule_json(rules)
- self.assertEquals(response.status_int, 400)
+ rule = security_group_rule_template(group_id=2)
+
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_delete(self):
- response = self._delete_security_group_rule(
- self.security_group_rule['id'])
- self.assertEquals(response.status_int, 202)
+ rule = security_group_rule_template(id=10)
+
+ def security_group_rule_get(context, id):
+ return security_group_rule_db(rule)
+
+ def security_group_rule_destroy(context, id):
+ pass
+
+ self.stubs.Set(nova.db, 'security_group_rule_get',
+ security_group_rule_get)
+ self.stubs.Set(nova.db, 'security_group_rule_destroy',
+ security_group_rule_destroy)
- response = self._delete_security_group_rule(
- self.security_group_rule['id'])
- self.assertEquals(response.status_int, 404)
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules/10')
+ self.controller.delete(req, '10')
def test_delete_invalid_rule_id(self):
- response = self._delete_security_group_rule('invalid')
- self.assertEquals(response.status_int, 400)
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules' +
+ '/invalid')
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
+ req, 'invalid')
def test_delete_non_existing_rule_id(self):
- response = self._delete_security_group_rule(22222222222222)
- self.assertEquals(response.status_int, 404)
+ req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules' +
+ '/22222222222222')
+ self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
+ req, '22222222222222')
class TestSecurityGroupRulesXMLDeserializer(unittest.TestCase):