diff options
| -rw-r--r-- | nova/tests/api/openstack/contrib/test_security_groups.py | 1020 |
1 files changed, 436 insertions, 584 deletions
diff --git a/nova/tests/api/openstack/contrib/test_security_groups.py b/nova/tests/api/openstack/contrib/test_security_groups.py index 92b721d30..f55ce4a55 100644 --- a/nova/tests/api/openstack/contrib/test_security_groups.py +++ b/nova/tests/api/openstack/contrib/test_security_groups.py @@ -30,31 +30,44 @@ from nova.tests.api.openstack import fakes FAKE_UUID = 'a47ae74e-ab08-447f-8eee-ffd43fc46c16' -def _get_create_request_json(body_dict): - req = webob.Request.blank('/v1.1/123/os-security-groups') - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body_dict) - return req +class AttrDict(dict): + def __getattr__(self, k): + return self[k] -def _create_security_group_json(security_group): - body_dict = _create_security_group_request_dict(security_group) - request = _get_create_request_json(body_dict) - response = request.get_response(fakes.wsgi_app()) - return response +def security_group_template(**kwargs): + sg = kwargs.copy() + sg.setdefault('tenant_id', '123') + sg.setdefault('name', 'test') + sg.setdefault('description', 'test-description') + return sg -def _create_security_group_request_dict(security_group): - sg = {} - if security_group is not None: - name = security_group.get('name', None) - description = security_group.get('description', None) - if name: - sg['name'] = security_group['name'] - if description: - sg['description'] = security_group['description'] - return {'security_group': sg} +def security_group_db(security_group, id=None): + attrs = security_group.copy() + if 'tenant_id' in attrs: + attrs['project_id'] = attrs.pop('tenant_id') + if id is not None: + attrs['id'] = id + attrs.setdefault('rules', []) + attrs.setdefault('instances', []) + return AttrDict(attrs) + + +def security_group_rule_template(**kwargs): + rule = kwargs.copy() + rule.setdefault('ip_protocol', 'tcp') + rule.setdefault('from_port', 22) + rule.setdefault('to_port', 22) + rule.setdefault('parent_group_id', 2) + return rule + + +def security_group_rule_db(rule, id=None): + attrs = rule.copy() + if 'ip_protocol' in attrs: + attrs['protocol'] = attrs.pop('ip_protocol') + return AttrDict(attrs) def return_server(context, server_id): @@ -72,13 +85,11 @@ def return_server_by_uuid(context, server_uuid): def return_non_running_server(context, server_id): - return {'id': server_id, 'state': 0x02, - 'host': "localhost"} + return {'id': server_id, 'state': 0x02, 'host': "localhost"} -def return_security_group(context, project_id, group_name): - return {'id': 1, 'name': group_name, "instances": [ - {'id': 1}]} +def return_security_group_by_name(context, project_id, group_name): + return {'id': 1, 'name': group_name, "instances": [{'id': 1}]} def return_security_group_without_instances(context, project_id, group_name): @@ -89,286 +100,251 @@ def return_server_nonexistent(context, server_id): raise exception.InstanceNotFound(instance_id=server_id) +class StubExtensionManager(object): + def register(self, *args): + pass + + class TestSecurityGroups(test.TestCase): def setUp(self): super(TestSecurityGroups, self).setUp() + self.controller = security_groups.SecurityGroupController() + self.manager = security_groups.Security_groups(StubExtensionManager()) + def tearDown(self): super(TestSecurityGroups, self).tearDown() - def _create_security_group_request_dict(self, security_group): - sg = {} - if security_group is not None: - name = security_group.get('name', None) - description = security_group.get('description', None) - if name: - sg['name'] = security_group['name'] - if description: - sg['description'] = security_group['description'] - return {'security_group': sg} - - def _delete_security_group(self, id): - request = webob.Request.blank('/v1.1/123/os-security-groups/%s' - % id) - request.method = 'DELETE' - response = request.get_response(fakes.wsgi_app()) - return response - def test_create_security_group(self): - security_group = {} - security_group['name'] = "test" - security_group['description'] = "group-description" - response = _create_security_group_json(security_group) - res_dict = json.loads(response.body) - self.assertEqual(res_dict['security_group']['name'], "test") + sg = security_group_template() + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + res_dict = self.controller.create(req, {'security_group': sg}) + self.assertEqual(res_dict['security_group']['name'], 'test') self.assertEqual(res_dict['security_group']['description'], - "group-description") - self.assertEquals(response.status_int, 200) + 'test-description') def test_create_security_group_with_no_name(self): - security_group = {} - security_group['description'] = "group-description" - response = _create_security_group_json(security_group) - self.assertEquals(response.status_int, 400) + sg = security_group_template() + del sg['name'] + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + self.assertRaises(webob.exc.HTTPUnprocessableEntity, + self.controller.create, req, sg) def test_create_security_group_with_no_description(self): - security_group = {} - security_group['name'] = "test" - response = _create_security_group_json(security_group) - self.assertEquals(response.status_int, 400) + sg = security_group_template() + del sg['description'] + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) def test_create_security_group_with_blank_name(self): - security_group = {} - security_group['name'] = "" - security_group['description'] = "group-description" - response = _create_security_group_json(security_group) - self.assertEquals(response.status_int, 400) + sg = security_group_template(name='') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) def test_create_security_group_with_whitespace_name(self): - security_group = {} - security_group['name'] = " " - security_group['description'] = "group-description" - response = _create_security_group_json(security_group) - self.assertEquals(response.status_int, 400) + sg = security_group_template(name=' ') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) def test_create_security_group_with_blank_description(self): - security_group = {} - security_group['name'] = "test" - security_group['description'] = "" - response = _create_security_group_json(security_group) - self.assertEquals(response.status_int, 400) + sg = security_group_template(description='') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) def test_create_security_group_with_whitespace_description(self): - security_group = {} - security_group['name'] = "name" - security_group['description'] = " " - response = _create_security_group_json(security_group) - self.assertEquals(response.status_int, 400) + sg = security_group_template(description=' ') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) def test_create_security_group_with_duplicate_name(self): - security_group = {} - security_group['name'] = "test" - security_group['description'] = "group-description" - response = _create_security_group_json(security_group) + sg = security_group_template() + + # FIXME: Stub out _get instead of creating twice + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + self.controller.create(req, {'security_group': sg}) - self.assertEquals(response.status_int, 200) - response = _create_security_group_json(security_group) - self.assertEquals(response.status_int, 400) + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) def test_create_security_group_with_no_body(self): - request = _get_create_request_json(body_dict=None) - response = request.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 422) + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + self.assertRaises(webob.exc.HTTPUnprocessableEntity, + self.controller.create, req, None) def test_create_security_group_with_no_security_group(self): - body_dict = {} - body_dict['no-securityGroup'] = None - request = _get_create_request_json(body_dict) - response = request.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 422) + body = {'no-securityGroup': None} + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + self.assertRaises(webob.exc.HTTPUnprocessableEntity, + self.controller.create, req, body) def test_create_security_group_above_255_characters_name(self): - security_group = {} - security_group['name'] = ("1234567890123456" - "1234567890123456789012345678901234567890" - "1234567890123456789012345678901234567890" - "1234567890123456789012345678901234567890" - "1234567890123456789012345678901234567890" - "1234567890123456789012345678901234567890" - "1234567890123456789012345678901234567890") - security_group['description'] = "group-description" - response = _create_security_group_json(security_group) - - self.assertEquals(response.status_int, 400) + sg = security_group_template(name='1234567890' * 26) + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) def test_create_security_group_above_255_characters_description(self): - security_group = {} - security_group['name'] = "test" - security_group['description'] = ("1234567890123456" - "1234567890123456789012345678901234567890" - "1234567890123456789012345678901234567890" - "1234567890123456789012345678901234567890" - "1234567890123456789012345678901234567890" - "1234567890123456789012345678901234567890" - "1234567890123456789012345678901234567890") - response = _create_security_group_json(security_group) - self.assertEquals(response.status_int, 400) + sg = security_group_template(description='1234567890' * 26) + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) def test_create_security_group_non_string_name(self): - security_group = {} - security_group['name'] = 12 - security_group['description'] = "group-description" - response = _create_security_group_json(security_group) - self.assertEquals(response.status_int, 400) + sg = security_group_template(name=12) + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) def test_create_security_group_non_string_description(self): - security_group = {} - security_group['name'] = "test" - security_group['description'] = 12 - response = _create_security_group_json(security_group) - self.assertEquals(response.status_int, 400) + sg = security_group_template(description=12) + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) def test_get_security_group_list(self): - security_group = {} - security_group['name'] = "test" - security_group['description'] = "group-description" - response = _create_security_group_json(security_group) - - req = webob.Request.blank('/v1.1/123/os-security-groups') - req.headers['Content-Type'] = 'application/json' - req.method = 'GET' - response = req.get_response(fakes.wsgi_app()) - res_dict = json.loads(response.body) - - expected = {'security_groups': [ - {'id': 1, - 'name':"default", - 'tenant_id': "123", - "description":"default", - "rules": [] - }, - ] - } - expected['security_groups'].append( - { - 'id': 2, - 'name': "test", - 'tenant_id': "123", - "description": "group-description", - "rules": [] - } - ) - self.assertEquals(response.status_int, 200) + groups = [] + for i, name in enumerate(['default', 'test']): + sg = security_group_template(id=i + 1, + name=name, + description=name + '-desc', + rules=[]) + groups.append(sg) + expected = {'security_groups': groups} + + def return_security_groups(context, project_id): + return [security_group_db(sg) for sg in groups] + + self.stubs.Set(nova.db, 'security_group_get_by_project', + return_security_groups) + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups') + res_dict = self.controller.index(req) + self.assertEquals(res_dict, expected) def test_get_security_group_by_id(self): - security_group = {} - security_group['name'] = "test" - security_group['description'] = "group-description" - response = _create_security_group_json(security_group) - - res_dict = json.loads(response.body) - req = webob.Request.blank('/v1.1/123/os-security-groups/%s' % - res_dict['security_group']['id']) - req.headers['Content-Type'] = 'application/json' - req.method = 'GET' - response = req.get_response(fakes.wsgi_app()) - res_dict = json.loads(response.body) + sg = security_group_template(id=2, rules=[]) - expected = { - 'security_group': { - 'id': 2, - 'name': "test", - 'tenant_id': "123", - 'description': "group-description", - 'rules': [] - } - } + def return_security_group(context, group_id): + self.assertEquals(sg['id'], group_id) + return security_group_db(sg) + + self.stubs.Set(nova.db, 'security_group_get', + return_security_group) + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups/2') + res_dict = self.controller.show(req, '2') + + expected = {'security_group': sg} self.assertEquals(res_dict, expected) def test_get_security_group_by_invalid_id(self): - req = webob.Request.blank('/v1.1/123/os-security-groups/invalid') - req.headers['Content-Type'] = 'application/json' - req.method = 'GET' - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 400) + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups/invalid') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete, + req, 'invalid') def test_get_security_group_by_non_existing_id(self): - req = webob.Request.blank('/v1.1/123/os-security-groups/111111111') - req.headers['Content-Type'] = 'application/json' - req.method = 'GET' - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 404) + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups/111111111') + self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete, + req, '111111111') def test_delete_security_group_by_id(self): - security_group = {} - security_group['name'] = "test" - security_group['description'] = "group-description" - response = _create_security_group_json(security_group) - security_group = json.loads(response.body)['security_group'] - response = self._delete_security_group(security_group['id']) - self.assertEquals(response.status_int, 202) + sg = security_group_template(id=1, rules=[]) + + self.called = False + + def security_group_destroy(context, id): + self.called = True + + def return_security_group(context, group_id): + self.assertEquals(sg['id'], group_id) + return security_group_db(sg) + + self.stubs.Set(nova.db, 'security_group_destroy', + security_group_destroy) + self.stubs.Set(nova.db, 'security_group_get', + return_security_group) + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups/1') + self.controller.delete(req, '1') - response = self._delete_security_group(security_group['id']) - self.assertEquals(response.status_int, 404) + self.assertTrue(self.called) def test_delete_security_group_by_invalid_id(self): - response = self._delete_security_group('invalid') - self.assertEquals(response.status_int, 400) + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups/invalid') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete, + req, 'invalid') def test_delete_security_group_by_non_existing_id(self): - response = self._delete_security_group(11111111) - self.assertEquals(response.status_int, 404) + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-groups/11111111') + self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete, + req, '11111111') def test_associate_by_non_existing_security_group_name(self): body = dict(addSecurityGroup=dict(name='non-existing')) - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 404) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.assertRaises(webob.exc.HTTPNotFound, + self.manager._addSecurityGroup, body, req, '1') + + def test_associate_by_invalid_server_id(self): + body = dict(addSecurityGroup=dict(name='test')) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/invalid/action') + self.assertRaises(webob.exc.HTTPNotFound, + self.manager._addSecurityGroup, body, req, 'invalid') def test_associate_without_body(self): - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) + self.stubs.Set(nova.db, 'instance_get', return_server) body = dict(addSecurityGroup=None) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 400) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._addSecurityGroup, body, req, '1') def test_associate_no_security_group_name(self): - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) + self.stubs.Set(nova.db, 'instance_get', return_server) body = dict(addSecurityGroup=dict()) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 400) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._addSecurityGroup, body, req, '1') def test_associate_security_group_name_with_whitespaces(self): - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) + self.stubs.Set(nova.db, 'instance_get', return_server) body = dict(addSecurityGroup=dict(name=" ")) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 400) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._addSecurityGroup, body, req, '1') def test_associate_non_existing_instance(self): self.stubs.Set(nova.db, 'instance_get', return_server_nonexistent) self.stubs.Set(nova.db, 'instance_get_by_uuid', return_server_nonexistent) body = dict(addSecurityGroup=dict(name="test")) - self.stubs.Set(nova.db, 'security_group_get_by_name', - return_security_group) - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 404) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.assertRaises(webob.exc.HTTPNotFound, + self.manager._addSecurityGroup, body, req, '1') def test_associate_non_running_instance(self): self.stubs.Set(nova.db, 'instance_get', return_non_running_server) @@ -377,26 +353,22 @@ class TestSecurityGroups(test.TestCase): self.stubs.Set(nova.db, 'security_group_get_by_name', return_security_group_without_instances) body = dict(addSecurityGroup=dict(name="test")) - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 400) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._addSecurityGroup, body, req, '1') def test_associate_already_associated_security_group_to_instance(self): self.stubs.Set(nova.db, 'instance_get', return_server) self.stubs.Set(nova.db, 'instance_get_by_uuid', return_server_by_uuid) self.stubs.Set(nova.db, 'security_group_get_by_name', - return_security_group) + return_security_group_by_name) body = dict(addSecurityGroup=dict(name="test")) - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 400) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._addSecurityGroup, body, req, '1') def test_associate(self): self.stubs.Set(nova.db, 'instance_get', return_server) @@ -404,83 +376,79 @@ class TestSecurityGroups(test.TestCase): return_server_by_uuid) self.mox.StubOutWithMock(nova.db, 'instance_add_security_group') nova.db.instance_add_security_group(mox.IgnoreArg(), - mox.IgnoreArg(), - mox.IgnoreArg()) + mox.IgnoreArg(), + mox.IgnoreArg()) self.stubs.Set(nova.db, 'security_group_get_by_name', return_security_group_without_instances) self.mox.ReplayAll() body = dict(addSecurityGroup=dict(name="test")) - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 202) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.manager._addSecurityGroup(body, req, '1') def test_disassociate_by_non_existing_security_group_name(self): body = dict(removeSecurityGroup=dict(name='non-existing')) - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 404) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.assertRaises(webob.exc.HTTPNotFound, + self.manager._removeSecurityGroup, body, req, '1') + + def test_disassociate_by_invalid_server_id(self): + self.stubs.Set(nova.db, 'security_group_get_by_name', + return_security_group_by_name) + body = dict(removeSecurityGroup=dict(name='test')) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/invalid/action') + self.assertRaises(webob.exc.HTTPNotFound, + self.manager._removeSecurityGroup, body, req, + 'invalid') def test_disassociate_without_body(self): - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) + self.stubs.Set(nova.db, 'instance_get', return_server) body = dict(removeSecurityGroup=None) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 400) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._removeSecurityGroup, body, req, '1') def test_disassociate_no_security_group_name(self): - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) + self.stubs.Set(nova.db, 'instance_get', return_server) body = dict(removeSecurityGroup=dict()) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 400) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._removeSecurityGroup, body, req, '1') def test_disassociate_security_group_name_with_whitespaces(self): - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) + self.stubs.Set(nova.db, 'instance_get', return_server) body = dict(removeSecurityGroup=dict(name=" ")) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 400) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._removeSecurityGroup, body, req, '1') def test_disassociate_non_existing_instance(self): self.stubs.Set(nova.db, 'instance_get', return_server_nonexistent) - self.stubs.Set(nova.db, 'instance_get_by_uuid', - return_server_nonexistent) - body = dict(removeSecurityGroup=dict(name="test")) self.stubs.Set(nova.db, 'security_group_get_by_name', - return_security_group) - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 404) + return_security_group_by_name) + body = dict(removeSecurityGroup=dict(name="test")) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.assertRaises(webob.exc.HTTPNotFound, + self.manager._removeSecurityGroup, body, req, '1') def test_disassociate_non_running_instance(self): self.stubs.Set(nova.db, 'instance_get', return_non_running_server) self.stubs.Set(nova.db, 'instance_get_by_uuid', return_non_running_server) self.stubs.Set(nova.db, 'security_group_get_by_name', - return_security_group) + return_security_group_by_name) body = dict(removeSecurityGroup=dict(name="test")) - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 400) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._removeSecurityGroup, body, req, '1') def test_disassociate_already_associated_security_group_to_instance(self): self.stubs.Set(nova.db, 'instance_get', return_server) @@ -489,12 +457,10 @@ class TestSecurityGroups(test.TestCase): self.stubs.Set(nova.db, 'security_group_get_by_name', return_security_group_without_instances) body = dict(removeSecurityGroup=dict(name="test")) - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 400) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._removeSecurityGroup, body, req, '1') def test_disassociate(self): self.stubs.Set(nova.db, 'instance_get', return_server) @@ -505,278 +471,180 @@ class TestSecurityGroups(test.TestCase): mox.IgnoreArg(), mox.IgnoreArg()) self.stubs.Set(nova.db, 'security_group_get_by_name', - return_security_group) + return_security_group_by_name) self.mox.ReplayAll() body = dict(removeSecurityGroup=dict(name="test")) - req = webob.Request.blank('/v1.1/123/servers/%s/action' % FAKE_UUID) - req.headers['Content-Type'] = 'application/json' - req.method = 'POST' - req.body = json.dumps(body) - response = req.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 202) + + req = fakes.HTTPRequest.blank('/v1.1/123/servers/1/action') + self.manager._removeSecurityGroup(body, req, '1') class TestSecurityGroupRules(test.TestCase): def setUp(self): super(TestSecurityGroupRules, self).setUp() - security_group = {} - security_group['name'] = "authorize-revoke" - security_group['description'] = ("Security group created for " - " authorize-revoke testing") - response = _create_security_group_json(security_group) - security_group = json.loads(response.body) - self.parent_security_group = security_group['security_group'] - - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "to_port": "22", - "parent_group_id": self.parent_security_group['id'], - "cidr": "10.0.0.0/24" - } - } - res = self._create_security_group_rule_json(rules) - self.assertEquals(res.status_int, 200) - self.security_group_rule = json.loads(res.body)['security_group_rule'] + + controller = security_groups.SecurityGroupController() + + sg1 = security_group_template(id=1) + sg2 = security_group_template(id=2, + name='authorize_revoke', + description='authorize-revoke testing') + db1 = security_group_db(sg1) + db2 = security_group_db(sg2) + + def return_security_group(context, group_id): + if group_id == db1['id']: + return db1 + if group_id == db2['id']: + return db2 + raise exception.NotFound() + + self.stubs.Set(nova.db, 'security_group_get', + return_security_group) + + self.parent_security_group = db2 + + self.controller = security_groups.SecurityGroupRulesController() def tearDown(self): super(TestSecurityGroupRules, self).tearDown() - def _create_security_group_rule_json(self, rules): - request = webob.Request.blank('/v1.1/123/os-security-group-rules') - request.headers['Content-Type'] = 'application/json' - request.method = 'POST' - request.body = json.dumps(rules) - response = request.get_response(fakes.wsgi_app()) - return response - - def _delete_security_group_rule(self, id): - request = webob.Request.blank('/v1.1/123/os-security-group-rules/%s' - % id) - request.method = 'DELETE' - response = request.get_response(fakes.wsgi_app()) - return response - def test_create_by_cidr(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "to_port": "22", - "parent_group_id": 2, - "cidr": "10.2.3.124/24" - } - } - - response = self._create_security_group_rule_json(rules) - security_group_rule = json.loads(response.body)['security_group_rule'] - self.assertEquals(response.status_int, 200) + rule = security_group_rule_template(cidr='10.2.3.124/24') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + res_dict = self.controller.create(req, {'security_group_rule': rule}) + + security_group_rule = res_dict['security_group_rule'] self.assertNotEquals(security_group_rule['id'], 0) self.assertEquals(security_group_rule['parent_group_id'], 2) self.assertEquals(security_group_rule['ip_range']['cidr'], "10.2.3.124/24") def test_create_by_group_id(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "to_port": "22", - "group_id": "1", - "parent_group_id": "%s" - % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 200) - security_group_rule = json.loads(response.body)['security_group_rule'] + rule = security_group_rule_template(group_id='1') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + res_dict = self.controller.create(req, {'security_group_rule': rule}) + + security_group_rule = res_dict['security_group_rule'] self.assertNotEquals(security_group_rule['id'], 0) self.assertEquals(security_group_rule['parent_group_id'], 2) def test_create_add_existing_rules(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "to_port": "22", - "cidr": "10.0.0.0/24", - "parent_group_id": "%s" % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 400) + rule = security_group_rule_template(cidr='10.0.0.0/24') + + self.parent_security_group['rules'] = [security_group_rule_db(rule)] + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) def test_create_with_no_body(self): - request = webob.Request.blank('/v1.1/123/os-security-group-rules') - request.headers['Content-Type'] = 'application/json' - request.method = 'POST' - request.body = json.dumps(None) - response = request.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 422) + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPUnprocessableEntity, + self.controller.create, req, None) def test_create_with_no_security_group_rule_in_body(self): - request = webob.Request.blank('/v1.1/123/os-security-group-rules') - request.headers['Content-Type'] = 'application/json' - request.method = 'POST' - body_dict = {'test': "test"} - request.body = json.dumps(body_dict) - response = request.get_response(fakes.wsgi_app()) - self.assertEquals(response.status_int, 422) + rules = {'test': 'test'} + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPUnprocessableEntity, + self.controller.create, req, rules) def test_create_with_invalid_parent_group_id(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "to_port": "22", - "parent_group_id": "invalid" - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 400) + rule = security_group_rule_template(parent_group_id='invalid') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) def test_create_with_non_existing_parent_group_id(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "to_port": "22", - "group_id": "invalid", - "parent_group_id": "1111111111111" - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 404) + rule = security_group_rule_template(group_id='invalid', + parent_group_id='1111111111111') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPNotFound, self.controller.create, + req, {'security_group_rule': rule}) def test_create_with_invalid_protocol(self): - rules = { - "security_group_rule": { - "ip_protocol": "invalid-protocol", - "from_port": "22", - "to_port": "22", - "cidr": "10.2.2.0/24", - "parent_group_id": "%s" % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 400) + rule = security_group_rule_template(ip_protocol='invalid-protocol', + cidr='10.2.2.0/24') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) def test_create_with_no_protocol(self): - rules = { - "security_group_rule": { - "from_port": "22", - "to_port": "22", - "cidr": "10.2.2.0/24", - "parent_group_id": "%s" % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 400) + rule = security_group_rule_template(cidr='10.2.2.0/24') + del rule['ip_protocol'] + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) def test_create_with_invalid_from_port(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "666666", - "to_port": "22", - "cidr": "10.2.2.0/24", - "parent_group_id": "%s" % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 400) + rule = security_group_rule_template(from_port='666666', + cidr='10.2.2.0/24') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) def test_create_with_invalid_to_port(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "to_port": "666666", - "cidr": "10.2.2.0/24", - "parent_group_id": "%s" % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 400) + rule = security_group_rule_template(to_port='666666', + cidr='10.2.2.0/24') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) def test_create_with_non_numerical_from_port(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "invalid", - "to_port": "22", - "cidr": "10.2.2.0/24", - "parent_group_id": "%s" % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 400) + rule = security_group_rule_template(from_port='invalid', + cidr='10.2.2.0/24') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) def test_create_with_non_numerical_to_port(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "to_port": "invalid", - "cidr": "10.2.2.0/24", - "parent_group_id": "%s" % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 400) + rule = security_group_rule_template(to_port='invalid', + cidr='10.2.2.0/24') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_no_from_port(self): + rule = security_group_rule_template(cidr='10.2.2.0/24') + del rule['from_port'] + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) def test_create_with_no_to_port(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "cidr": "10.2.2.0/24", - "parent_group_id": "%s" % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 400) + rule = security_group_rule_template(cidr='10.2.2.0/24') + del rule['to_port'] + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) def test_create_with_invalid_cidr(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "to_port": "22", - "cidr": "10.2.22222.0/24", - "parent_group_id": "%s" % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 400) + rule = security_group_rule_template(cidr='10.2.2222.0/24') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) def test_create_with_no_cidr_group(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "to_port": "22", - "parent_group_id": "%s" % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - security_group_rule = json.loads(response.body)['security_group_rule'] - self.assertEquals(response.status_int, 200) + rule = security_group_rule_template() + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + res_dict = self.controller.create(req, {'security_group_rule': rule}) + + security_group_rule = res_dict['security_group_rule'] self.assertNotEquals(security_group_rule['id'], 0) self.assertEquals(security_group_rule['parent_group_id'], self.parent_security_group['id']) @@ -784,77 +652,61 @@ class TestSecurityGroupRules(test.TestCase): "0.0.0.0/0") def test_create_with_invalid_group_id(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "to_port": "22", - "group_id": "invalid", - "parent_group_id": "%s" % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 400) + rule = security_group_rule_template(group_id='invalid') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) def test_create_with_empty_group_id(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "to_port": "22", - "group_id": "", - "parent_group_id": "%s" % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 400) + rule = security_group_rule_template(group_id='') - def test_create_with_invalid_group_id(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "to_port": "22", - "group_id": "222222", - "parent_group_id": "%s" % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 400) + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_nonexist_group_id(self): + rule = security_group_rule_template(group_id='222222') + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) def test_create_rule_with_same_group_parent_id(self): - rules = { - "security_group_rule": { - "ip_protocol": "tcp", - "from_port": "22", - "to_port": "22", - "group_id": "%s" % self.parent_security_group['id'], - "parent_group_id": "%s" % self.parent_security_group['id'], - } - } - - response = self._create_security_group_rule_json(rules) - self.assertEquals(response.status_int, 400) + rule = security_group_rule_template(group_id=2) + + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) def test_delete(self): - response = self._delete_security_group_rule( - self.security_group_rule['id']) - self.assertEquals(response.status_int, 202) + rule = security_group_rule_template(id=10) + + def security_group_rule_get(context, id): + return security_group_rule_db(rule) + + def security_group_rule_destroy(context, id): + pass + + self.stubs.Set(nova.db, 'security_group_rule_get', + security_group_rule_get) + self.stubs.Set(nova.db, 'security_group_rule_destroy', + security_group_rule_destroy) - response = self._delete_security_group_rule( - self.security_group_rule['id']) - self.assertEquals(response.status_int, 404) + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules/10') + self.controller.delete(req, '10') def test_delete_invalid_rule_id(self): - response = self._delete_security_group_rule('invalid') - self.assertEquals(response.status_int, 400) + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules' + + '/invalid') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete, + req, 'invalid') def test_delete_non_existing_rule_id(self): - response = self._delete_security_group_rule(22222222222222) - self.assertEquals(response.status_int, 404) + req = fakes.HTTPRequest.blank('/v1.1/123/os-security-group-rules' + + '/22222222222222') + self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete, + req, '22222222222222') class TestSecurityGroupRulesXMLDeserializer(unittest.TestCase): |
