diff options
| author | Jason Kölker <jason@koelker.net> | 2011-11-01 13:26:41 -0500 |
|---|---|---|
| committer | Jason Kölker <jason@koelker.net> | 2011-11-01 13:26:41 -0500 |
| commit | 997c2e8eb0ade364a8920dd085ec0e24f56182fb (patch) | |
| tree | 5641d99a31163258d25be0af7da3f34b395359fa /tests | |
| parent | 1a8fa72410cacaf2589544ecc161306de9d13766 (diff) | |
| parent | 02c95aeb2ffe112f7b60a1d3c53cdde22bc5db4d (diff) | |
| download | oslo-997c2e8eb0ade364a8920dd085ec0e24f56182fb.tar.gz oslo-997c2e8eb0ade364a8920dd085ec0e24f56182fb.tar.xz oslo-997c2e8eb0ade364a8920dd085ec0e24f56182fb.zip | |
merge in upstream
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/unit/extension_stubs.py | 53 | ||||
| -rw-r--r-- | tests/unit/extensions/__init__.py | 15 | ||||
| -rw-r--r-- | tests/unit/extensions/foxinsocks.py | 102 | ||||
| -rw-r--r-- | tests/unit/test_extensions.py | 504 | ||||
| -rw-r--r-- | tests/unit/test_wsgi.py | 413 |
5 files changed, 1087 insertions, 0 deletions
diff --git a/tests/unit/extension_stubs.py b/tests/unit/extension_stubs.py new file mode 100644 index 0000000..c25f285 --- /dev/null +++ b/tests/unit/extension_stubs.py @@ -0,0 +1,53 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from openstack.common import wsgi + + +class StubExtension(object): + + def __init__(self, alias="stub_extension"): + self.alias = alias + + def get_name(self): + return "Stub Extension" + + def get_alias(self): + return self.alias + + def get_description(self): + return "" + + def get_namespace(self): + return "" + + def get_updated(self): + return "" + + +class StubBaseAppController(object): + + def index(self, request): + return "base app index" + + def show(self, request, id): + return {'fort': 'knox'} + + def update(self, request, id, body=None): + return {'uneditable': 'original_value'} + + def create_resource(self): + return wsgi.Resource(self) diff --git a/tests/unit/extensions/__init__.py b/tests/unit/extensions/__init__.py new file mode 100644 index 0000000..848908a --- /dev/null +++ b/tests/unit/extensions/__init__.py @@ -0,0 +1,15 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/tests/unit/extensions/foxinsocks.py b/tests/unit/extensions/foxinsocks.py new file mode 100644 index 0000000..a0efd7e --- /dev/null +++ b/tests/unit/extensions/foxinsocks.py @@ -0,0 +1,102 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json + +from openstack.common import extensions +from openstack.common import wsgi + + +class FoxInSocksController(object): + + def index(self, request): + return "Try to say this Mr. Knox, sir..." + + def create_resource(self): + return wsgi.Resource(self) + + +class Foxinsocks(object): + + def __init__(self): + pass + + def get_name(self): + return "Fox In Socks" + + def get_alias(self): + return "FOXNSOX" + + def get_description(self): + return "The Fox In Socks Extension" + + def get_namespace(self): + return "http://www.fox.in.socks/api/ext/pie/v1.0" + + def get_updated(self): + return "2011-01-22T13:25:27-06:00" + + def get_resources(self): + resources = [] + resource = extensions.ResourceExtension('foxnsocks', + FoxInSocksController()) + resources.append(resource) + return resources + + def get_actions(self): + return [extensions.ActionExtension('dummy_resources', + 'FOXNSOX:add_tweedle', + self._add_tweedle_handler), + extensions.ActionExtension('dummy_resources', + 'FOXNSOX:delete_tweedle', + self._delete_tweedle_handler)] + + def get_request_extensions(self): + request_exts = [] + + def _goose_handler(req, res): + #NOTE: This only handles JSON responses. + # You can use content type header to test for XML. + data = json.loads(res.body) + data['FOXNSOX:googoose'] = req.GET.get('chewing') + res.body = json.dumps(data) + return res + + req_ext1 = extensions.RequestExtension('GET', '/dummy_resources/:(id)', + _goose_handler) + request_exts.append(req_ext1) + + def _bands_handler(req, res): + #NOTE: This only handles JSON responses. + # You can use content type header to test for XML. + data = json.loads(res.body) + data['FOXNSOX:big_bands'] = 'Pig Bands!' + res.body = json.dumps(data) + return res + + req_ext2 = extensions.RequestExtension('GET', '/dummy_resources/:(id)', + _bands_handler) + request_exts.append(req_ext2) + return request_exts + + def _add_tweedle_handler(self, input_dict, req, id): + return "Tweedle {0} Added.".format( + input_dict['FOXNSOX:add_tweedle']['name']) + + def _delete_tweedle_handler(self, input_dict, req, id): + return "Tweedle {0} Deleted.".format( + input_dict['FOXNSOX:delete_tweedle']['name']) diff --git a/tests/unit/test_extensions.py b/tests/unit/test_extensions.py new file mode 100644 index 0000000..841bf4d --- /dev/null +++ b/tests/unit/test_extensions.py @@ -0,0 +1,504 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +from lxml import etree +import os.path +import routes +import unittest + +from webtest import TestApp + + +from openstack.common import wsgi +from openstack.common import config +from openstack.common import extensions +from tests.unit.extension_stubs import (StubExtension, + StubBaseAppController) +from openstack.common.extensions import (ExtensionManager, + ExtensionMiddleware) + + +test_conf_file = os.path.join(os.path.dirname(__file__), os.pardir, + os.pardir, 'etc', 'openstack-common.conf.test') +extensions_path = os.path.join(os.path.dirname(__file__), "extensions") + +NS = "{http://docs.openstack.org/}" +ATOMNS = "{http://www.w3.org/2005/Atom}" + + +class ExtensionsTestApp(wsgi.Router): + + def __init__(self, options={}): + mapper = routes.Mapper() + controller = StubBaseAppController() + mapper.resource("dummy_resource", "/dummy_resources", + controller=controller.create_resource()) + super(ExtensionsTestApp, self).__init__(mapper) + + +class ResourceExtensionTest(unittest.TestCase): + + class ResourceExtensionController(object): + + def index(self, request): + return "resource index" + + def show(self, request, id): + return {'data': {'id': id}} + + def custom_member_action(self, request, id): + return {'member_action': 'value'} + + def custom_collection_action(self, request, **kwargs): + return {'collection': 'value'} + + def test_resource_can_be_added_as_extension(self): + res_ext = extensions.ResourceExtension('tweedles', + self.ResourceExtensionController()) + test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext)) + + index_response = test_app.get("/tweedles") + self.assertEqual(200, index_response.status_int) + self.assertEqual("resource index", index_response.json) + + show_response = test_app.get("/tweedles/25266") + self.assertEqual({'data': {'id': "25266"}}, show_response.json) + + def test_resource_extension_with_custom_member_action(self): + controller = self.ResourceExtensionController() + member = {'custom_member_action': "GET"} + res_ext = extensions.ResourceExtension('tweedles', controller, + member_actions=member) + test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext)) + + response = test_app.get("/tweedles/some_id/custom_member_action") + self.assertEqual(200, response.status_int) + self.assertEqual(json.loads(response.body)['member_action'], "value") + + def test_resource_extension_for_get_custom_collection_action(self): + controller = self.ResourceExtensionController() + collections = {'custom_collection_action': "PUT"} + res_ext = extensions.ResourceExtension('tweedles', controller, + collection_actions=collections) + test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext)) + + response = test_app.put("/tweedles/custom_collection_action") + self.assertEqual(200, response.status_int) + self.assertEqual(json.loads(response.body)['collection'], "value") + + def test_resource_extension_for_put_custom_collection_action(self): + controller = self.ResourceExtensionController() + collections = {'custom_collection_action': "PUT"} + res_ext = extensions.ResourceExtension('tweedles', controller, + collection_actions=collections) + test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext)) + + response = test_app.put("/tweedles/custom_collection_action") + + self.assertEqual(200, response.status_int) + self.assertEqual(json.loads(response.body)['collection'], 'value') + + def test_resource_extension_for_post_custom_collection_action(self): + controller = self.ResourceExtensionController() + collections = {'custom_collection_action': "POST"} + res_ext = extensions.ResourceExtension('tweedles', controller, + collection_actions=collections) + test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext)) + + response = test_app.post("/tweedles/custom_collection_action") + + self.assertEqual(200, response.status_int) + self.assertEqual(json.loads(response.body)['collection'], 'value') + + def test_resource_extension_for_delete_custom_collection_action(self): + controller = self.ResourceExtensionController() + collections = {'custom_collection_action': "DELETE"} + res_ext = extensions.ResourceExtension('tweedles', controller, + collection_actions=collections) + test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext)) + + response = test_app.delete("/tweedles/custom_collection_action") + + self.assertEqual(200, response.status_int) + self.assertEqual(json.loads(response.body)['collection'], 'value') + + def test_resource_ext_for_formatted_req_on_custom_collection_action(self): + controller = self.ResourceExtensionController() + collections = {'custom_collection_action': "GET"} + res_ext = extensions.ResourceExtension('tweedles', controller, + collection_actions=collections) + test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext)) + + response = test_app.get("/tweedles/custom_collection_action.json") + + self.assertEqual(200, response.status_int) + self.assertEqual(json.loads(response.body)['collection'], "value") + + def test_resource_ext_for_nested_resource_custom_collection_action(self): + controller = self.ResourceExtensionController() + collections = {'custom_collection_action': "GET"} + parent = dict(collection_name='beetles', member_name='beetle') + res_ext = extensions.ResourceExtension('tweedles', controller, + collection_actions=collections, + parent=parent) + test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext)) + + response = test_app.get("/beetles/beetle_id" + "/tweedles/custom_collection_action") + + self.assertEqual(200, response.status_int) + self.assertEqual(json.loads(response.body)['collection'], "value") + + def test_returns_404_for_non_existant_extension(self): + test_app = setup_extensions_test_app(SimpleExtensionManager(None)) + + response = test_app.get("/non_extistant_extension", status='*') + + self.assertEqual(404, response.status_int) + + +class ActionExtensionTest(unittest.TestCase): + + def setUp(self): + super(ActionExtensionTest, self).setUp() + self.extension_app = setup_extensions_test_app() + + def test_extended_action_for_adding_extra_data(self): + action_name = 'FOXNSOX:add_tweedle' + action_params = dict(name='Beetle') + req_body = json.dumps({action_name: action_params}) + response = self.extension_app.post('/dummy_resources/1/action', + req_body, content_type='application/json') + + self.assertEqual("Tweedle Beetle Added.", response.json) + + def test_extended_action_for_deleting_extra_data(self): + action_name = 'FOXNSOX:delete_tweedle' + action_params = dict(name='Bailey') + req_body = json.dumps({action_name: action_params}) + response = self.extension_app.post("/dummy_resources/1/action", + req_body, content_type='application/json') + self.assertEqual("Tweedle Bailey Deleted.", response.json) + + def test_returns_404_for_non_existant_action(self): + non_existant_action = 'blah_action' + action_params = dict(name="test") + req_body = json.dumps({non_existant_action: action_params}) + + response = self.extension_app.post("/dummy_resources/1/action", + req_body, content_type='application/json', + status='*') + + self.assertEqual(404, response.status_int) + + def test_returns_404_for_non_existant_resource(self): + action_name = 'add_tweedle' + action_params = dict(name='Beetle') + req_body = json.dumps({action_name: action_params}) + + response = self.extension_app.post("/asdf/1/action", req_body, + content_type='application/json', status='*') + self.assertEqual(404, response.status_int) + + +class RequestExtensionTest(unittest.TestCase): + + def test_headers_can_be_extended(self): + def extend_headers(req, res): + assert req.headers['X-NEW-REQUEST-HEADER'] == "sox" + res.headers['X-NEW-RESPONSE-HEADER'] = "response_header_data" + return res + + app = self._setup_app_with_request_handler(extend_headers, 'GET') + response = app.get("/dummy_resources/1", + headers={'X-NEW-REQUEST-HEADER': "sox"}) + + self.assertEqual(response.headers['X-NEW-RESPONSE-HEADER'], + "response_header_data") + + def test_extend_get_resource_response(self): + def extend_response_data(req, res): + data = json.loads(res.body) + data['FOXNSOX:extended_key'] = req.GET.get('extended_key') + res.body = json.dumps(data) + return res + + app = self._setup_app_with_request_handler(extend_response_data, 'GET') + response = app.get("/dummy_resources/1?extended_key=extended_data") + + self.assertEqual(200, response.status_int) + + response_data = json.loads(response.body) + self.assertEqual('extended_data', + response_data['FOXNSOX:extended_key']) + self.assertEqual('knox', response_data['fort']) + + def test_get_resources(self): + app = setup_extensions_test_app() + + response = app.get("/dummy_resources/1?chewing=newblue") + + response_data = json.loads(response.body) + self.assertEqual('newblue', response_data['FOXNSOX:googoose']) + self.assertEqual("Pig Bands!", response_data['FOXNSOX:big_bands']) + + def test_edit_previously_uneditable_field(self): + + def _update_handler(req, res): + data = json.loads(res.body) + data['uneditable'] = json.loads(req.body)['uneditable'] + res.body = json.dumps(data) + return res + + base_app = TestApp(setup_base_app()) + response = base_app.put("/dummy_resources/1", + json.dumps({'uneditable': "new_value"}), + headers={'Content-Type': "application/json"}) + self.assertEqual(response.json['uneditable'], "original_value") + + ext_app = self._setup_app_with_request_handler(_update_handler, + 'PUT') + ext_response = ext_app.put("/dummy_resources/1", + json.dumps({'uneditable': "new_value"}), + headers={'Content-Type': "application/json"}) + self.assertEqual(ext_response.json['uneditable'], "new_value") + + def _setup_app_with_request_handler(self, handler, verb): + req_ext = extensions.RequestExtension(verb, + '/dummy_resources/:(id)', handler) + manager = SimpleExtensionManager(None, None, req_ext) + return setup_extensions_test_app(manager) + + +class ExtensionManagerTest(unittest.TestCase): + + def test_invalid_extensions_are_not_registered(self): + + class InvalidExtension(object): + """ + This Extension doesn't implement extension methods : + get_name, get_description, get_namespace and get_updated + """ + def get_alias(self): + return "invalid_extension" + + ext_mgr = ExtensionManager('') + ext_mgr.add_extension(InvalidExtension()) + ext_mgr.add_extension(StubExtension("valid_extension")) + + self.assertTrue('valid_extension' in ext_mgr.extensions) + self.assertFalse('invalid_extension' in ext_mgr.extensions) + + +class ExtensionControllerTest(unittest.TestCase): + + def setUp(self): + super(ExtensionControllerTest, self).setUp() + self.test_app = setup_extensions_test_app() + + def test_index_gets_all_registerd_extensions(self): + response = self.test_app.get("/extensions") + foxnsox = response.json["extensions"][0] + + self.assertEqual(foxnsox, { + 'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0', + 'name': 'Fox In Socks', + 'updated': '2011-01-22T13:25:27-06:00', + 'description': 'The Fox In Socks Extension', + 'alias': 'FOXNSOX', + 'links': [] + } + ) + + def test_extension_can_be_accessed_by_alias(self): + json_response = self.test_app.get("/extensions/FOXNSOX").json + foxnsox = json_response['extension'] + + self.assertEqual(foxnsox, { + 'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0', + 'name': 'Fox In Socks', + 'updated': '2011-01-22T13:25:27-06:00', + 'description': 'The Fox In Socks Extension', + 'alias': 'FOXNSOX', + 'links': [] + } + ) + + def test_show_returns_not_found_for_non_existant_extension(self): + response = self.test_app.get("/extensions/non_existant", status="*") + + self.assertEqual(response.status_int, 404) + + def test_list_extensions_xml(self): + response = self.test_app.get("/extensions.xml") + + self.assertEqual(200, response.status_int) + root = etree.XML(response.body) + self.assertEqual(root.tag.split('extensions')[0], NS) + + # Make sure that Fox in Sox extension is correct. + exts = root.findall('{0}extension'.format(NS)) + fox_ext = exts[0] + self.assertEqual(fox_ext.get('name'), 'Fox In Socks') + self.assertEqual(fox_ext.get('namespace'), + 'http://www.fox.in.socks/api/ext/pie/v1.0') + self.assertEqual(fox_ext.get('updated'), '2011-01-22T13:25:27-06:00') + self.assertEqual(fox_ext.findtext('{0}description'.format(NS)), + 'The Fox In Socks Extension') + + def test_get_extension_xml(self): + response = self.test_app.get("/extensions/FOXNSOX.xml") + self.assertEqual(200, response.status_int) + xml = response.body + + root = etree.XML(xml) + self.assertEqual(root.tag.split('extension')[0], NS) + self.assertEqual(root.get('alias'), 'FOXNSOX') + self.assertEqual(root.get('name'), 'Fox In Socks') + self.assertEqual(root.get('namespace'), + 'http://www.fox.in.socks/api/ext/pie/v1.0') + self.assertEqual(root.get('updated'), '2011-01-22T13:25:27-06:00') + self.assertEqual(root.findtext('{0}description'.format(NS)), + 'The Fox In Socks Extension') + + +class ExtensionsXMLSerializerTest(unittest.TestCase): + + def test_serialize_extenstion(self): + serializer = extensions.ExtensionsXMLSerializer() + data = {'extension': { + 'name': 'ext1', + 'namespace': 'http://docs.rack.com/servers/api/ext/pie/v1.0', + 'alias': 'RS-PIE', + 'updated': '2011-01-22T13:25:27-06:00', + 'description': 'Adds the capability to share an image.', + 'links': [{'rel': 'describedby', + 'type': 'application/pdf', + 'href': 'http://docs.rack.com/servers/api/ext/cs.pdf'}, + {'rel': 'describedby', + 'type': 'application/vnd.sun.wadl+xml', + 'href': 'http://docs.rack.com/servers/api/ext/cs.wadl'}]}} + + xml = serializer.serialize(data, 'show') + root = etree.XML(xml) + ext_dict = data['extension'] + self.assertEqual(root.findtext('{0}description'.format(NS)), + ext_dict['description']) + + for key in ['name', 'namespace', 'alias', 'updated']: + self.assertEqual(root.get(key), ext_dict[key]) + + link_nodes = root.findall('{0}link'.format(ATOMNS)) + self.assertEqual(len(link_nodes), 2) + for i, link in enumerate(ext_dict['links']): + for key, value in link.items(): + self.assertEqual(link_nodes[i].get(key), value) + + def test_serialize_extensions(self): + serializer = extensions.ExtensionsXMLSerializer() + data = {"extensions": [{ + "name": "Public Image Extension", + "namespace": "http://foo.com/api/ext/pie/v1.0", + "alias": "RS-PIE", + "updated": "2011-01-22T13:25:27-06:00", + "description": "Adds the capability to share an image.", + "links": [{"rel": "describedby", + "type": "application/pdf", + "type": "application/vnd.sun.wadl+xml", + "href": "http://foo.com/api/ext/cs-pie.pdf"}, + {"rel": "describedby", + "type": "application/vnd.sun.wadl+xml", + "href": "http://foo.com/api/ext/cs-pie.wadl"}]}, + {"name": "Cloud Block Storage", + "namespace": "http://foo.com/api/ext/cbs/v1.0", + "alias": "RS-CBS", + "updated": "2011-01-12T11:22:33-06:00", + "description": "Allows mounting cloud block storage.", + "links": [{"rel": "describedby", + "type": "application/pdf", + "href": "http://foo.com/api/ext/cs-cbs.pdf"}, + {"rel": "describedby", + "type": "application/vnd.sun.wadl+xml", + "href": "http://foo.com/api/ext/cs-cbs.wadl"}]}]} + + xml = serializer.serialize(data, 'index') + root = etree.XML(xml) + ext_elems = root.findall('{0}extension'.format(NS)) + self.assertEqual(len(ext_elems), 2) + for i, ext_elem in enumerate(ext_elems): + ext_dict = data['extensions'][i] + self.assertEqual(ext_elem.findtext('{0}description'.format(NS)), + ext_dict['description']) + + for key in ['name', 'namespace', 'alias', 'updated']: + self.assertEqual(ext_elem.get(key), ext_dict[key]) + + link_nodes = ext_elem.findall('{0}link'.format(ATOMNS)) + self.assertEqual(len(link_nodes), 2) + for i, link in enumerate(ext_dict['links']): + for key, value in link.items(): + self.assertEqual(link_nodes[i].get(key), value) + + +def app_factory(global_conf, **local_conf): + conf = global_conf.copy() + conf.update(local_conf) + return ExtensionsTestApp(conf) + + +def setup_base_app(): + options = {'config_file': test_conf_file} + conf, app = config.load_paste_app('extensions_test_app', options, None) + return app + + +def setup_extensions_middleware(extension_manager=None): + extension_manager = (extension_manager or + ExtensionManager(extensions_path)) + options = {'config_file': test_conf_file} + conf, app = config.load_paste_app('extensions_test_app', options, None) + return ExtensionMiddleware(app, extension_manager) + + +def setup_extensions_test_app(extension_manager=None): + return TestApp(setup_extensions_middleware(extension_manager)) + + +class SimpleExtensionManager(object): + + def __init__(self, resource_ext=None, action_ext=None, request_ext=None): + self.resource_ext = resource_ext + self.action_ext = action_ext + self.request_ext = request_ext + + def get_resources(self): + resource_exts = [] + if self.resource_ext: + resource_exts.append(self.resource_ext) + return resource_exts + + def get_actions(self): + action_exts = [] + if self.action_ext: + action_exts.append(self.action_ext) + return action_exts + + def get_request_extensions(self): + request_extensions = [] + if self.request_ext: + request_extensions.append(self.request_ext) + return request_extensions diff --git a/tests/unit/test_wsgi.py b/tests/unit/test_wsgi.py new file mode 100644 index 0000000..7a5eaa4 --- /dev/null +++ b/tests/unit/test_wsgi.py @@ -0,0 +1,413 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest +import webob + +from openstack.common import exception +from openstack.common import wsgi + + +class RequestTest(unittest.TestCase): + + def test_content_type_missing(self): + request = wsgi.Request.blank('/tests/123', method='POST') + request.body = "<body />" + self.assertEqual(None, request.get_content_type()) + + def test_content_type_unsupported(self): + request = wsgi.Request.blank('/tests/123', method='POST') + request.headers["Content-Type"] = "text/html" + request.body = "asdf<br />" + self.assertRaises(exception.InvalidContentType, + request.get_content_type) + + def test_content_type_with_charset(self): + request = wsgi.Request.blank('/tests/123') + request.headers["Content-Type"] = "application/json; charset=UTF-8" + result = request.get_content_type() + self.assertEqual(result, "application/json") + + def test_content_type_with_given_content_types(self): + request = wsgi.Request.blank('/tests/123') + request.headers["Content-Type"] = "application/new-type;" + result = request.get_content_type(["application/json", + "application/new-type"]) + self.assertEqual(result, "application/new-type") + + def test_content_type_from_accept_xml(self): + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = "application/xml" + result = request.best_match_content_type() + self.assertEqual(result, "application/xml") + + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = "application/json" + result = request.best_match_content_type() + self.assertEqual(result, "application/json") + + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = "application/xml, application/json" + result = request.best_match_content_type() + self.assertEqual(result, "application/json") + + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = \ + "application/json; q=0.3, application/xml; q=0.9" + result = request.best_match_content_type() + self.assertEqual(result, "application/xml") + + def test_content_type_from_query_extension(self): + request = wsgi.Request.blank('/tests/123.xml') + result = request.best_match_content_type() + self.assertEqual(result, "application/xml") + + request = wsgi.Request.blank('/tests/123.json') + result = request.best_match_content_type() + self.assertEqual(result, "application/json") + + request = wsgi.Request.blank('/tests/123.invalid') + result = request.best_match_content_type() + self.assertEqual(result, "application/json") + + def test_content_type_accept_and_query_extension(self): + request = wsgi.Request.blank('/tests/123.xml') + request.headers["Accept"] = "application/json" + result = request.best_match_content_type() + self.assertEqual(result, "application/xml") + + def test_content_type_accept_default(self): + request = wsgi.Request.blank('/tests/123.unsupported') + request.headers["Accept"] = "application/unsupported1" + result = request.best_match_content_type() + self.assertEqual(result, "application/json") + + def test_content_type_accept_with_given_content_types(self): + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = "application/new_type" + result = request.best_match_content_type(["application/new_type"]) + self.assertEqual(result, "application/new_type") + + +class ActionDispatcherTest(unittest.TestCase): + + def test_dispatch(self): + serializer = wsgi.ActionDispatcher() + serializer.create = lambda x: x + self.assertEqual(serializer.dispatch('pants', action='create'), + 'pants') + + def test_dispatch_action_None(self): + serializer = wsgi.ActionDispatcher() + serializer.create = lambda x: x + ' pants' + serializer.default = lambda x: x + ' trousers' + self.assertEqual(serializer.dispatch('Two', action=None), + 'Two trousers') + + def test_dispatch_default(self): + serializer = wsgi.ActionDispatcher() + serializer.create = lambda x: x + ' pants' + serializer.default = lambda x: x + ' trousers' + self.assertEqual(serializer.dispatch('Two', action='update'), + 'Two trousers') + + +class ResponseHeadersSerializerTest(unittest.TestCase): + + def test_default(self): + serializer = wsgi.ResponseHeadersSerializer() + response = webob.Response() + serializer.serialize(response, {'v': '123'}, 'asdf') + self.assertEqual(response.status_int, 200) + + def test_custom(self): + class Serializer(wsgi.ResponseHeadersSerializer): + def update(self, response, data): + response.status_int = 404 + response.headers['X-Custom-Header'] = data['v'] + serializer = Serializer() + response = webob.Response() + serializer.serialize(response, {'v': '123'}, 'update') + self.assertEqual(response.status_int, 404) + self.assertEqual(response.headers['X-Custom-Header'], '123') + + +class DictSerializerTest(unittest.TestCase): + + def test_dispatch_default(self): + serializer = wsgi.DictSerializer() + self.assertEqual(serializer.serialize({}, 'NonExistantAction'), '') + + +class XMLDictSerializerTest(unittest.TestCase): + + def test_xml(self): + input_dict = dict(servers=dict(a=(2, 3))) + expected_xml = """<servers xmlns="asdf"> + <a>(2,3)</a> + </servers>""" + serializer = wsgi.XMLDictSerializer(xmlns="asdf") + result = serializer.serialize(input_dict) + result = result.replace('\n', '').replace(' ', '') + expected_xml = expected_xml.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_xml) + + +class JSONDictSerializerTest(unittest.TestCase): + + def test_json(self): + input_dict = dict(servers=dict(a=(2, 3))) + expected_json = '{"servers":{"a":[2,3]}}' + serializer = wsgi.JSONDictSerializer() + result = serializer.serialize(input_dict) + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_json) + + +class TextDeserializerTest(unittest.TestCase): + + def test_dispatch_default(self): + deserializer = wsgi.TextDeserializer() + self.assertEqual(deserializer.deserialize({}, 'update'), {}) + + +class JSONDeserializerTest(unittest.TestCase): + + def test_json(self): + data = """{"a": { + "a1": "1", + "a2": "2", + "bs": ["1", "2", "3", {"c": {"c1": "1"}}], + "d": {"e": "1"}, + "f": "1"}}""" + as_dict = { + 'body': { + 'a': { + 'a1': '1', + 'a2': '2', + 'bs': ['1', '2', '3', {'c': {'c1': '1'}}], + 'd': {'e': '1'}, + 'f': '1', + }, + }, + } + deserializer = wsgi.JSONDeserializer() + self.assertEqual(deserializer.deserialize(data), as_dict) + + +class XMLDeserializerTest(unittest.TestCase): + + def test_xml(self): + xml = """ + <a a1="1" a2="2"> + <bs><b>1</b><b>2</b><b>3</b><b><c c1="1"/></b></bs> + <d><e>1</e></d> + <f>1</f> + </a> + """.strip() + as_dict = { + 'body': { + 'a': { + 'a1': '1', + 'a2': '2', + 'bs': ['1', '2', '3', {'c': {'c1': '1'}}], + 'd': {'e': '1'}, + 'f': '1', + }, + }, + } + metadata = {'plurals': {'bs': 'b', 'ts': 't'}} + deserializer = wsgi.XMLDeserializer(metadata=metadata) + self.assertEqual(deserializer.deserialize(xml), as_dict) + + def test_xml_empty(self): + xml = '<a></a>' + as_dict = {"body": {"a": {}}} + deserializer = wsgi.XMLDeserializer() + self.assertEqual(deserializer.deserialize(xml), as_dict) + + +class RequestHeadersDeserializerTest(unittest.TestCase): + + def test_default(self): + deserializer = wsgi.RequestHeadersDeserializer() + req = wsgi.Request.blank('/') + self.assertEqual(deserializer.deserialize(req, 'nonExistant'), {}) + + def test_custom(self): + class Deserializer(wsgi.RequestHeadersDeserializer): + def update(self, request): + return {'a': request.headers['X-Custom-Header']} + deserializer = Deserializer() + req = wsgi.Request.blank('/') + req.headers['X-Custom-Header'] = 'b' + self.assertEqual(deserializer.deserialize(req, 'update'), {'a': 'b'}) + + +class ResponseSerializerTest(unittest.TestCase): + + def setUp(self): + class JSONSerializer(object): + def serialize(self, data, action='default'): + return 'pew_json' + + class XMLSerializer(object): + def serialize(self, data, action='default'): + return 'pew_xml' + + class HeadersSerializer(object): + def serialize(self, response, data, action): + response.status_int = 404 + + self.body_serializers = { + 'application/json': JSONSerializer(), + 'application/XML': XMLSerializer(), + } + + self.serializer = wsgi.ResponseSerializer(self.body_serializers, + HeadersSerializer()) + + def tearDown(self): + pass + + def test_get_serializer(self): + ctype = 'application/json' + self.assertEqual(self.serializer.get_body_serializer(ctype), + self.body_serializers[ctype]) + + def test_get_serializer_unknown_content_type(self): + self.assertRaises(exception.InvalidContentType, + self.serializer.get_body_serializer, + 'application/unknown') + + def test_serialize_response(self): + response = self.serializer.serialize({}, 'application/json') + self.assertEqual(response.headers['Content-Type'], 'application/json') + self.assertEqual(response.body, 'pew_json') + self.assertEqual(response.status_int, 404) + + def test_serialize_response_None(self): + response = self.serializer.serialize(None, 'application/json') + + self.assertEqual(response.headers['Content-Type'], 'application/json') + self.assertEqual(response.body, '') + self.assertEqual(response.status_int, 404) + + def test_serialize_response_dict_to_unknown_content_type(self): + self.assertRaises(exception.InvalidContentType, + self.serializer.serialize, + {}, 'application/unknown') + + +class RequestDeserializerTest(unittest.TestCase): + + def setUp(self): + class JSONDeserializer(object): + def deserialize(self, data, action='default'): + return 'pew_json' + + class XMLDeserializer(object): + def deserialize(self, data, action='default'): + return 'pew_xml' + + self.body_deserializers = { + 'application/json': JSONDeserializer(), + 'application/XML': XMLDeserializer(), + } + + self.deserializer = wsgi.RequestDeserializer(self.body_deserializers) + + def test_get_deserializer(self): + expected = self.deserializer.get_body_deserializer('application/json') + self.assertEqual(expected, self.body_deserializers['application/json']) + + def test_get_deserializer_unknown_content_type(self): + self.assertRaises(exception.InvalidContentType, + self.deserializer.get_body_deserializer, + 'application/unknown') + + def test_get_expected_content_type(self): + request = wsgi.Request.blank('/') + request.headers['Accept'] = 'application/json' + self.assertEqual(self.deserializer.get_expected_content_type(request), + 'application/json') + + def test_get_action_args(self): + env = { + 'wsgiorg.routing_args': [None, { + 'controller': None, + 'format': None, + 'action': 'update', + 'id': 12, + }], + } + + expected = {'action': 'update', 'id': 12} + + self.assertEqual(self.deserializer.get_action_args(env), expected) + + def test_deserialize(self): + def fake_get_routing_args(request): + return {'action': 'create'} + self.deserializer.get_action_args = fake_get_routing_args + + request = wsgi.Request.blank('/') + request.headers['Accept'] = 'application/xml' + + deserialized = self.deserializer.deserialize(request) + expected = ('create', {}, 'application/xml') + + self.assertEqual(expected, deserialized) + + +class ResourceTest(unittest.TestCase): + + def test_dispatch(self): + class Controller(object): + def index(self, req, pants=None): + return pants + + resource = wsgi.Resource(Controller()) + actual = resource.dispatch(resource.controller, + 'index', None, pants='off') + expected = 'off' + self.assertEqual(actual, expected) + + def test_dispatch_unknown_controller_action(self): + class Controller(object): + def index(self, req, pants=None): + return pants + + resource = wsgi.Resource(Controller()) + self.assertRaises(AttributeError, resource.dispatch, + resource.controller, 'create', None, {}) + + def test_malformed_request_body_throws_bad_request(self): + resource = wsgi.Resource(None) + request = wsgi.Request.blank("/", body="{mal:formed", method='POST', + headers={'Content-Type': "application/json"}) + + response = resource(request) + self.assertEqual(response.status, '400 Bad Request') + + def test_wrong_content_type_throws_unsupported_media_type_error(self): + resource = wsgi.Resource(None) + request = wsgi.Request.blank("/", body="{some:json}", method='POST', + headers={'Content-Type': "xxx"}) + + response = resource(request) + self.assertEqual(response.status, '415 Unsupported Media Type') |
