From 634fe881223a7ea8e04b3054b39724207153be5b Mon Sep 17 00:00:00 2001 From: Tushar Patil Date: Wed, 3 Aug 2011 15:03:34 -0700 Subject: Initial version --- .../api/openstack/contrib/test_security_groups.py | 761 +++++++++++++++++++++ nova/tests/api/openstack/test_extensions.py | 6 +- 2 files changed, 764 insertions(+), 3 deletions(-) create mode 100644 nova/tests/api/openstack/contrib/test_security_groups.py (limited to 'nova/tests') diff --git a/nova/tests/api/openstack/contrib/test_security_groups.py b/nova/tests/api/openstack/contrib/test_security_groups.py new file mode 100644 index 000000000..eaef413a7 --- /dev/null +++ b/nova/tests/api/openstack/contrib/test_security_groups.py @@ -0,0 +1,761 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import unittest +import webob +from xml.dom import minidom + +from nova import test +from nova.api.openstack.contrib import security_groups +from nova.tests.api.openstack import fakes + + +def _get_create_request_json(body_dict): + req = webob.Request.blank('/v1.1/security_groups') + req.headers['Content-Type'] = 'application/json' + req.method = 'POST' + req.body = json.dumps(body_dict) + return req + + +def _create_security_group_json(security_group): + body_dict = _create_security_group_request_dict(security_group) + request = _get_create_request_json(body_dict) + response = request.get_response(fakes.wsgi_app()) + return response + + +def _create_security_group_request_dict(security_group): + sg = {} + if security_group is not None: + name = security_group.get('name', None) + description = security_group.get('description', None) + if name: + sg['name'] = security_group['name'] + if description: + sg['description'] = security_group['description'] + return {'security_group': sg} + + +class TestSecurityGroups(test.TestCase): + def setUp(self): + super(TestSecurityGroups, self).setUp() + + def tearDown(self): + super(TestSecurityGroups, self).tearDown() + + def _create_security_group_request_dict(self, security_group): + sg = {} + if security_group is not None: + name = security_group.get('name', None) + description = security_group.get('description', None) + if name: + sg['name'] = security_group['name'] + if description: + sg['description'] = security_group['description'] + return {'security_group': sg} + + def _format_create_xml_request_body(self, body_dict): + sg = body_dict['security_group'] + body_parts = [] + body_parts.extend([ + '', + '' % (sg['name'])]) + if 'description' in sg: + body_parts.append('%s' + % sg['description']) + body_parts.append('') + return ''.join(body_parts) + + def _get_create_request_xml(self, body_dict): + req = webob.Request.blank('/v1.1/security_groups') + req.headers['Content-Type'] = 'application/xml' + req.content_type = 'application/xml' + req.accept = 'application/xml' + req.method = 'POST' + req.body = self._format_create_xml_request_body(body_dict) + return req + + def _create_security_group_xml(self, security_group): + body_dict = self._create_security_group_request_dict(security_group) + request = self._get_create_request_xml(body_dict) + response = request.get_response(fakes.wsgi_app()) + return response + + def _delete_security_group(self, id): + request = webob.Request.blank('/v1.1/security_groups/%s' + % id) + request.method = 'DELETE' + response = request.get_response(fakes.wsgi_app()) + return response + + def test_create_security_group_json(self): + security_group = {} + security_group['name'] = "test" + security_group['description'] = "group-description" + response = _create_security_group_json(security_group) + res_dict = json.loads(response.body) + self.assertEqual(res_dict['security_group']['name'], "test") + self.assertEqual(res_dict['security_group']['description'], + "group-description") + self.assertEquals(response.status_int, 200) + + def test_create_security_group_xml(self): + security_group = {} + security_group['name'] = "test" + security_group['description'] = "group-description" + response = \ + self._create_security_group_xml(security_group) + + self.assertEquals(response.status_int, 200) + dom = minidom.parseString(response.body) + sg = dom.childNodes[0] + self.assertEquals(sg.nodeName, 'security_group') + self.assertEqual(security_group['name'], sg.getAttribute('name')) + + def test_create_security_group_with_no_name_json(self): + security_group = {} + security_group['description'] = "group-description" + response = _create_security_group_json(security_group) + self.assertEquals(response.status_int, 400) + + def test_create_security_group_with_no_description_json(self): + security_group = {} + security_group['name'] = "test" + response = _create_security_group_json(security_group) + self.assertEquals(response.status_int, 400) + + def test_create_security_group_with_blank_name_json(self): + security_group = {} + security_group['name'] = "" + security_group['description'] = "group-description" + response = _create_security_group_json(security_group) + self.assertEquals(response.status_int, 400) + + def test_create_security_group_with_whitespace_name_json(self): + security_group = {} + security_group['name'] = " " + security_group['description'] = "group-description" + response = _create_security_group_json(security_group) + self.assertEquals(response.status_int, 400) + + def test_create_security_group_with_blank_description_json(self): + security_group = {} + security_group['name'] = "test" + security_group['description'] = "" + response = _create_security_group_json(security_group) + self.assertEquals(response.status_int, 400) + + def test_create_security_group_with_whitespace_description_json(self): + security_group = {} + security_group['name'] = "name" + security_group['description'] = " " + response = _create_security_group_json(security_group) + self.assertEquals(response.status_int, 400) + + def test_create_security_group_with_duplicate_name_json(self): + security_group = {} + security_group['name'] = "test" + security_group['description'] = "group-description" + response = _create_security_group_json(security_group) + + self.assertEquals(response.status_int, 200) + response = _create_security_group_json(security_group) + self.assertEquals(response.status_int, 400) + + def test_create_security_group_with_no_body_json(self): + request = _get_create_request_json(body_dict=None) + response = request.get_response(fakes.wsgi_app()) + self.assertEquals(response.status_int, 422) + + def test_create_security_group_with_no_security_group(self): + body_dict = {} + body_dict['no-securityGroup'] = None + request = _get_create_request_json(body_dict) + response = request.get_response(fakes.wsgi_app()) + self.assertEquals(response.status_int, 422) + + def test_create_security_group_above_255_characters_name_json(self): + security_group = {} + security_group['name'] = ("1234567890123456" + "1234567890123456789012345678901234567890" + "1234567890123456789012345678901234567890" + "1234567890123456789012345678901234567890" + "1234567890123456789012345678901234567890" + "1234567890123456789012345678901234567890" + "1234567890123456789012345678901234567890") + security_group['description'] = "group-description" + response = _create_security_group_json(security_group) + + self.assertEquals(response.status_int, 400) + + def test_create_security_group_above_255_characters_description_json(self): + security_group = {} + security_group['name'] = "test" + security_group['description'] = ("1234567890123456" + "1234567890123456789012345678901234567890" + "1234567890123456789012345678901234567890" + "1234567890123456789012345678901234567890" + "1234567890123456789012345678901234567890" + "1234567890123456789012345678901234567890" + "1234567890123456789012345678901234567890") + response = _create_security_group_json(security_group) + self.assertEquals(response.status_int, 400) + + def test_create_security_group_non_string_name_json(self): + security_group = {} + security_group['name'] = 12 + security_group['description'] = "group-description" + response = _create_security_group_json(security_group) + self.assertEquals(response.status_int, 400) + + def test_create_security_group_non_string_description_json(self): + security_group = {} + security_group['name'] = "test" + security_group['description'] = 12 + response = _create_security_group_json(security_group) + self.assertEquals(response.status_int, 400) + + def test_get_security_group_list(self): + security_group = {} + security_group['name'] = "test" + security_group['description'] = "group-description" + response = _create_security_group_json(security_group) + + req = webob.Request.blank('/v1.1/security_groups') + req.headers['Content-Type'] = 'application/json' + req.method = 'GET' + response = req.get_response(fakes.wsgi_app()) + res_dict = json.loads(response.body) + + expected = {'security_groups': [ + {'id': 1, + 'name':"default", + 'tenant_id': "fake", + "description":"default", + "rules": [] + }, + ] + } + expected['security_groups'].append( + { + 'id': 2, + 'name': "test", + 'tenant_id': "fake", + "description": "group-description", + "rules": [] + } + ) + self.assertEquals(response.status_int, 200) + self.assertEquals(res_dict, expected) + + def test_get_security_group_by_id(self): + security_group = {} + security_group['name'] = "test" + security_group['description'] = "group-description" + response = _create_security_group_json(security_group) + + res_dict = json.loads(response.body) + req = webob.Request.blank('/v1.1/security_groups/%s' % + res_dict['security_group']['id']) + req.headers['Content-Type'] = 'application/json' + req.method = 'GET' + response = req.get_response(fakes.wsgi_app()) + res_dict = json.loads(response.body) + + expected = { + 'security_group': { + 'id': 2, + 'name': "test", + 'tenant_id': "fake", + 'description': "group-description", + 'rules': [] + } + } + self.assertEquals(response.status_int, 200) + self.assertEquals(res_dict, expected) + + def test_get_security_group_by_invalid_id(self): + req = webob.Request.blank('/v1.1/security_groups/invalid') + req.headers['Content-Type'] = 'application/json' + req.method = 'GET' + response = req.get_response(fakes.wsgi_app()) + self.assertEquals(response.status_int, 400) + + def test_get_security_group_by_non_existing_id(self): + req = webob.Request.blank('/v1.1/security_groups/111111111') + req.headers['Content-Type'] = 'application/json' + req.method = 'GET' + response = req.get_response(fakes.wsgi_app()) + self.assertEquals(response.status_int, 404) + + def test_delete_security_group_by_id(self): + security_group = {} + security_group['name'] = "test" + security_group['description'] = "group-description" + response = _create_security_group_json(security_group) + security_group = json.loads(response.body)['security_group'] + response = self._delete_security_group(security_group['id']) + self.assertEquals(response.status_int, 202) + + response = self._delete_security_group(security_group['id']) + self.assertEquals(response.status_int, 404) + + def test_delete_security_group_by_invalid_id(self): + response = self._delete_security_group('invalid') + self.assertEquals(response.status_int, 400) + + def test_delete_security_group_by_non_existing_id(self): + response = self._delete_security_group(11111111) + self.assertEquals(response.status_int, 404) + + +class TestSecurityGroupRules(test.TestCase): + def setUp(self): + super(TestSecurityGroupRules, self).setUp() + security_group = {} + security_group['name'] = "authorize-revoke" + security_group['description'] = ("Security group created for " + " authorize-revoke testing") + response = _create_security_group_json(security_group) + security_group = json.loads(response.body) + self.parent_security_group = security_group['security_group'] + + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "22", + "parent_group_id": self.parent_security_group['id'], + "cidr": "10.0.0.0/24" + } + } + res = self._create_security_group_rule_json(rules) + self.assertEquals(res.status_int, 200) + self.security_group_rule = json.loads(res.body)['security_group_rule'] + + def tearDown(self): + super(TestSecurityGroupRules, self).tearDown() + + def _create_security_group_rule_json(self, rules): + request = webob.Request.blank('/v1.1/security_group_rules') + request.headers['Content-Type'] = 'application/json' + request.method = 'POST' + request.body = json.dumps(rules) + response = request.get_response(fakes.wsgi_app()) + return response + + def _delete_security_group_rule(self, id): + request = webob.Request.blank('/v1.1/security_group_rules/%s' + % id) + request.method = 'DELETE' + response = request.get_response(fakes.wsgi_app()) + return response + + def test_create_by_cidr_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "22", + "parent_group_id": 2, + "cidr": "10.2.3.124/24" + } + } + + response = self._create_security_group_rule_json(rules) + security_group_rule = json.loads(response.body)['security_group_rule'] + self.assertEquals(response.status_int, 200) + self.assertNotEquals(security_group_rule['id'], 0) + self.assertEquals(security_group_rule['parent_group_id'], 2) + self.assertEquals(security_group_rule['ip_range']['cidr'], + "10.2.3.124/24") + + def test_create_by_group_id_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "22", + "group_id": "1", + "parent_group_id": "%s" + % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 200) + security_group_rule = json.loads(response.body)['security_group_rule'] + self.assertNotEquals(security_group_rule['id'], 0) + self.assertEquals(security_group_rule['parent_group_id'], 2) + + def test_create_add_existing_rules_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "22", + "cidr": "10.0.0.0/24", + "parent_group_id": "%s" % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 400) + + def test_create_with_no_body_json(self): + request = webob.Request.blank('/v1.1/security_group_rules') + request.headers['Content-Type'] = 'application/json' + request.method = 'POST' + request.body = json.dumps(None) + response = request.get_response(fakes.wsgi_app()) + self.assertEquals(response.status_int, 422) + + def test_create_with_no_security_group_rule_in_body_json(self): + request = webob.Request.blank('/v1.1/security_group_rules') + request.headers['Content-Type'] = 'application/json' + request.method = 'POST' + body_dict = {'test': "test"} + request.body = json.dumps(body_dict) + response = request.get_response(fakes.wsgi_app()) + self.assertEquals(response.status_int, 422) + + def test_create_with_invalid_parent_group_id_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "22", + "parent_group_id": "invalid" + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 400) + + def test_create_with_non_existing_parent_group_id_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "22", + "group_id": "invalid", + "parent_group_id": "1111111111111" + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 404) + + def test_create_with_invalid_protocol_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "invalid-protocol", + "from_port": "22", + "to_port": "22", + "cidr": "10.2.2.0/24", + "parent_group_id": "%s" % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 400) + + def test_create_with_no_protocol_json(self): + rules = { + "security_group_rule": { + "from_port": "22", + "to_port": "22", + "cidr": "10.2.2.0/24", + "parent_group_id": "%s" % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 400) + + def test_create_with_invalid_from_port_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "666666", + "to_port": "22", + "cidr": "10.2.2.0/24", + "parent_group_id": "%s" % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 400) + + def test_create_with_invalid_to_port_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "666666", + "cidr": "10.2.2.0/24", + "parent_group_id": "%s" % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 400) + + def test_create_with_non_numerical_from_port_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "invalid", + "to_port": "22", + "cidr": "10.2.2.0/24", + "parent_group_id": "%s" % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 400) + + def test_create_with_non_numerical_to_port_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "invalid", + "cidr": "10.2.2.0/24", + "parent_group_id": "%s" % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 400) + + def test_create_with_no_to_port_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "cidr": "10.2.2.0/24", + "parent_group_id": "%s" % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 400) + + def test_create_with_invalid_cidr_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "22", + "cidr": "10.2.22222.0/24", + "parent_group_id": "%s" % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 400) + + def test_create_with_no_cidr_group_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "22", + "parent_group_id": "%s" % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + security_group_rule = json.loads(response.body)['security_group_rule'] + self.assertEquals(response.status_int, 200) + self.assertNotEquals(security_group_rule['id'], 0) + self.assertEquals(security_group_rule['parent_group_id'], + self.parent_security_group['id']) + self.assertEquals(security_group_rule['ip_range']['cidr'], + "0.0.0.0/0") + + def test_create_with_invalid_group_id_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "22", + "group_id": "invalid", + "parent_group_id": "%s" % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 400) + + def test_create_with_empty_group_id_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "22", + "group_id": "invalid", + "parent_group_id": "%s" % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 400) + + def test_create_with_invalid_group_id_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "22", + "group_id": "222222", + "parent_group_id": "%s" % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 400) + + def test_create_rule_with_same_group_parent_id_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "22", + "group_id": "%s" % self.parent_security_group['id'], + "parent_group_id": "%s" % self.parent_security_group['id'], + } + } + + response = self._create_security_group_rule_json(rules) + self.assertEquals(response.status_int, 400) + + def test_delete(self): + response = self._delete_security_group_rule( + self.security_group_rule['id']) + self.assertEquals(response.status_int, 202) + + response = self._delete_security_group_rule( + self.security_group_rule['id']) + self.assertEquals(response.status_int, 404) + + def test_delete_invalid_rule_id(self): + response = self._delete_security_group_rule('invalid') + self.assertEquals(response.status_int, 400) + + def test_delete_non_existing_rule_id(self): + response = self._delete_security_group_rule(22222222222222) + self.assertEquals(response.status_int, 404) + + +class TestSecurityGroupRulesXMLDeserializer(unittest.TestCase): + + def setUp(self): + self.deserializer = security_groups.SecurityGroupRulesXMLDeserializer() + + def test_create_request(self): + serial_request = """ + + 12 + 22 + 22 + + tcp + 10.0.0.0/24 +""" + request = self.deserializer.deserialize(serial_request, 'create') + expected = { + "security_group_rule": { + "parent_group_id": "12", + "from_port": "22", + "to_port": "22", + "ip_protocol": "tcp", + "group_id": "", + "cidr": "10.0.0.0/24", + }, + } + self.assertEquals(request['body'], expected) + + def test_create_no_protocol_request(self): + serial_request = """ + + 12 + 22 + 22 + + 10.0.0.0/24 +""" + request = self.deserializer.deserialize(serial_request, 'create') + expected = { + "security_group_rule": { + "parent_group_id": "12", + "from_port": "22", + "to_port": "22", + "group_id": "", + "cidr": "10.0.0.0/24", + }, + } + self.assertEquals(request['body'], expected) + + +class TestSecurityGroupXMLDeserializer(unittest.TestCase): + + def setUp(self): + self.deserializer = security_groups.SecurityGroupXMLDeserializer() + + def test_create_request(self): + serial_request = """ + + test +""" + request = self.deserializer.deserialize(serial_request, 'create') + expected = { + "security_group": { + "name": "test", + "description": "test", + }, + } + self.assertEquals(request['body'], expected) + + def test_create_no_description_request(self): + serial_request = """ + +""" + request = self.deserializer.deserialize(serial_request, 'create') + expected = { + "security_group": { + "name": "test", + }, + } + self.assertEquals(request['body'], expected) + + def test_create_no_name_request(self): + serial_request = """ + +test +""" + request = self.deserializer.deserialize(serial_request, 'create') + expected = { + "security_group": { + "description": "test", + }, + } + self.assertEquals(request['body'], expected) diff --git a/nova/tests/api/openstack/test_extensions.py b/nova/tests/api/openstack/test_extensions.py index 47c37225c..98a3a72d4 100644 --- a/nova/tests/api/openstack/test_extensions.py +++ b/nova/tests/api/openstack/test_extensions.py @@ -98,7 +98,7 @@ class ExtensionControllerTest(test.TestCase): names = [x['name'] for x in data['extensions']] names.sort() self.assertEqual(names, ["FlavorExtraSpecs", "Floating_ips", - "Fox In Socks", "Hosts", "Multinic", "Volumes"]) + "Fox In Socks", "Hosts", "Multinic", "SecurityGroups", "Volumes"]) # Make sure that at least Fox in Sox is correct. (fox_ext,) = [ @@ -109,7 +109,7 @@ class ExtensionControllerTest(test.TestCase): 'updated': '2011-01-22T13:25:27-06:00', 'description': 'The Fox In Socks Extension', 'alias': 'FOXNSOX', - 'links': [], + 'links': [] }, ) @@ -145,7 +145,7 @@ class ExtensionControllerTest(test.TestCase): # Make sure we have all the extensions. exts = root.findall('{0}extension'.format(NS)) - self.assertEqual(len(exts), 6) + self.assertEqual(len(exts), 7) # Make sure that at least Fox in Sox is correct. (fox_ext,) = [x for x in exts if x.get('alias') == 'FOXNSOX'] -- cgit From 2fe0c5fe95487df8827db10f38065e3c8ac3800f Mon Sep 17 00:00:00 2001 From: Tushar Patil Date: Fri, 5 Aug 2011 12:09:46 -0700 Subject: Fixed review comments --- nova/tests/api/openstack/contrib/test_security_groups.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'nova/tests') diff --git a/nova/tests/api/openstack/contrib/test_security_groups.py b/nova/tests/api/openstack/contrib/test_security_groups.py index eaef413a7..9a86a60f8 100644 --- a/nova/tests/api/openstack/contrib/test_security_groups.py +++ b/nova/tests/api/openstack/contrib/test_security_groups.py @@ -73,10 +73,10 @@ class TestSecurityGroups(test.TestCase): sg = body_dict['security_group'] body_parts = [] body_parts.extend([ - '', - '' % (sg['name'])]) + '', + '' % (sg['name'])]) if 'description' in sg: body_parts.append('%s' % sg['description']) -- cgit From 8f3ad17b4ee25fedeee98132f22cf1eeb5974a2c Mon Sep 17 00:00:00 2001 From: William Wolf Date: Thu, 11 Aug 2011 16:04:12 -0400 Subject: fixed pep8 issue --- nova/tests/api/openstack/contrib/test_keypairs.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) (limited to 'nova/tests') diff --git a/nova/tests/api/openstack/contrib/test_keypairs.py b/nova/tests/api/openstack/contrib/test_keypairs.py index c9dc34d65..23f19157e 100644 --- a/nova/tests/api/openstack/contrib/test_keypairs.py +++ b/nova/tests/api/openstack/contrib/test_keypairs.py @@ -77,8 +77,21 @@ class KeypairsTest(test.TestCase): self.assertTrue(len(res_dict['keypair']['private_key']) > 0) def test_keypair_import(self): - body = {'keypair': {'name': 'create_test', - 'public_key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDBYIznAx9D7118Q1VKGpXy2HDiKyUTM8XcUuhQpo0srqb9rboUp4a9NmCwpWpeElDLuva707GOUnfaBAvHBwsRXyxHJjRaI6YQj2oLJwqvaSaWUbyT1vtryRqy6J3TecN0WINY71f4uymiMZP0wby4bKBcYnac8KiCIlvkEl0ETjkOGUq8OyWRmn7ljj5SESEUdBP0JnuTFKddWTU/wD6wydeJaUhBTqOlHn0kX1GyqoNTE1UEhcM5ZRWgfUZfTjVyDF2kGj3vJLCJtJ8LoGcj7YaN4uPg1rBle+izwE/tLonRrds+cev8p6krSSrxWOwBbHkXa6OciiJDvkRzJXzf'}} + body = { + 'keypair': { + 'name': 'create_test', + 'public_key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDBYIznA' + 'x9D7118Q1VKGpXy2HDiKyUTM8XcUuhQpo0srqb9rboUp4' + 'a9NmCwpWpeElDLuva707GOUnfaBAvHBwsRXyxHJjRaI6Y' + 'Qj2oLJwqvaSaWUbyT1vtryRqy6J3TecN0WINY71f4uymi' + 'MZP0wby4bKBcYnac8KiCIlvkEl0ETjkOGUq8OyWRmn7lj' + 'j5SESEUdBP0JnuTFKddWTU/wD6wydeJaUhBTqOlHn0kX1' + 'GyqoNTE1UEhcM5ZRWgfUZfTjVyDF2kGj3vJLCJtJ8LoGc' + 'j7YaN4uPg1rBle+izwE/tLonRrds+cev8p6krSSrxWOwB' + 'bHkXa6OciiJDvkRzJXzf', + }, + } + req = webob.Request.blank('/v1.1/os-keypairs') req.method = 'POST' req.body = json.dumps(body) -- cgit From 03cf6551feae597dd71fbf7b52b41415863d1241 Mon Sep 17 00:00:00 2001 From: William Wolf Date: Thu, 11 Aug 2011 16:05:33 -0400 Subject: spacing fixes --- nova/tests/api/openstack/contrib/test_keypairs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'nova/tests') diff --git a/nova/tests/api/openstack/contrib/test_keypairs.py b/nova/tests/api/openstack/contrib/test_keypairs.py index 23f19157e..eb3bc7af0 100644 --- a/nova/tests/api/openstack/contrib/test_keypairs.py +++ b/nova/tests/api/openstack/contrib/test_keypairs.py @@ -28,6 +28,7 @@ def fake_keypair(name): 'fingerprint': 'FAKE_FINGERPRINT', 'name': name} + def db_key_pair_get_all_by_user(self, user_id): return [fake_keypair('FAKE')] @@ -109,4 +110,3 @@ class KeypairsTest(test.TestCase): req.headers['Content-Type'] = 'application/json' res = req.get_response(fakes.wsgi_app()) self.assertEqual(res.status_int, 202) - -- cgit From 68161e3e224ff77e4e93d02e5fabbd9ea17b0d48 Mon Sep 17 00:00:00 2001 From: Tushar Patil Date: Thu, 11 Aug 2011 17:04:33 -0700 Subject: prefixed with os- for the newly added extensions --- .../api/openstack/contrib/test_security_groups.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) (limited to 'nova/tests') diff --git a/nova/tests/api/openstack/contrib/test_security_groups.py b/nova/tests/api/openstack/contrib/test_security_groups.py index 9a86a60f8..4317880ca 100644 --- a/nova/tests/api/openstack/contrib/test_security_groups.py +++ b/nova/tests/api/openstack/contrib/test_security_groups.py @@ -25,7 +25,7 @@ from nova.tests.api.openstack import fakes def _get_create_request_json(body_dict): - req = webob.Request.blank('/v1.1/security_groups') + req = webob.Request.blank('/v1.1/os-security-groups') req.headers['Content-Type'] = 'application/json' req.method = 'POST' req.body = json.dumps(body_dict) @@ -84,7 +84,7 @@ class TestSecurityGroups(test.TestCase): return ''.join(body_parts) def _get_create_request_xml(self, body_dict): - req = webob.Request.blank('/v1.1/security_groups') + req = webob.Request.blank('/v1.1/os-security-groups') req.headers['Content-Type'] = 'application/xml' req.content_type = 'application/xml' req.accept = 'application/xml' @@ -99,7 +99,7 @@ class TestSecurityGroups(test.TestCase): return response def _delete_security_group(self, id): - request = webob.Request.blank('/v1.1/security_groups/%s' + request = webob.Request.blank('/v1.1/os-security-groups/%s' % id) request.method = 'DELETE' response = request.get_response(fakes.wsgi_app()) @@ -238,7 +238,7 @@ class TestSecurityGroups(test.TestCase): security_group['description'] = "group-description" response = _create_security_group_json(security_group) - req = webob.Request.blank('/v1.1/security_groups') + req = webob.Request.blank('/v1.1/os-security-groups') req.headers['Content-Type'] = 'application/json' req.method = 'GET' response = req.get_response(fakes.wsgi_app()) @@ -272,7 +272,7 @@ class TestSecurityGroups(test.TestCase): response = _create_security_group_json(security_group) res_dict = json.loads(response.body) - req = webob.Request.blank('/v1.1/security_groups/%s' % + req = webob.Request.blank('/v1.1/os-security-groups/%s' % res_dict['security_group']['id']) req.headers['Content-Type'] = 'application/json' req.method = 'GET' @@ -292,14 +292,14 @@ class TestSecurityGroups(test.TestCase): self.assertEquals(res_dict, expected) def test_get_security_group_by_invalid_id(self): - req = webob.Request.blank('/v1.1/security_groups/invalid') + req = webob.Request.blank('/v1.1/os-security-groups/invalid') req.headers['Content-Type'] = 'application/json' req.method = 'GET' response = req.get_response(fakes.wsgi_app()) self.assertEquals(response.status_int, 400) def test_get_security_group_by_non_existing_id(self): - req = webob.Request.blank('/v1.1/security_groups/111111111') + req = webob.Request.blank('/v1.1/os-security-groups/111111111') req.headers['Content-Type'] = 'application/json' req.method = 'GET' response = req.get_response(fakes.wsgi_app()) @@ -354,7 +354,7 @@ class TestSecurityGroupRules(test.TestCase): super(TestSecurityGroupRules, self).tearDown() def _create_security_group_rule_json(self, rules): - request = webob.Request.blank('/v1.1/security_group_rules') + request = webob.Request.blank('/v1.1/os-security-group-rules') request.headers['Content-Type'] = 'application/json' request.method = 'POST' request.body = json.dumps(rules) @@ -362,7 +362,7 @@ class TestSecurityGroupRules(test.TestCase): return response def _delete_security_group_rule(self, id): - request = webob.Request.blank('/v1.1/security_group_rules/%s' + request = webob.Request.blank('/v1.1/os-security-group-rules/%s' % id) request.method = 'DELETE' response = request.get_response(fakes.wsgi_app()) @@ -420,7 +420,7 @@ class TestSecurityGroupRules(test.TestCase): self.assertEquals(response.status_int, 400) def test_create_with_no_body_json(self): - request = webob.Request.blank('/v1.1/security_group_rules') + request = webob.Request.blank('/v1.1/os-security-group-rules') request.headers['Content-Type'] = 'application/json' request.method = 'POST' request.body = json.dumps(None) @@ -428,7 +428,7 @@ class TestSecurityGroupRules(test.TestCase): self.assertEquals(response.status_int, 422) def test_create_with_no_security_group_rule_in_body_json(self): - request = webob.Request.blank('/v1.1/security_group_rules') + request = webob.Request.blank('/v1.1/os-security-group-rules') request.headers['Content-Type'] = 'application/json' request.method = 'POST' body_dict = {'test': "test"} -- cgit