summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorJason Kölker <jason@koelker.net>2011-11-01 13:26:41 -0500
committerJason Kölker <jason@koelker.net>2011-11-01 13:26:41 -0500
commit997c2e8eb0ade364a8920dd085ec0e24f56182fb (patch)
tree5641d99a31163258d25be0af7da3f34b395359fa /tests
parent1a8fa72410cacaf2589544ecc161306de9d13766 (diff)
parent02c95aeb2ffe112f7b60a1d3c53cdde22bc5db4d (diff)
downloadoslo-997c2e8eb0ade364a8920dd085ec0e24f56182fb.tar.gz
oslo-997c2e8eb0ade364a8920dd085ec0e24f56182fb.tar.xz
oslo-997c2e8eb0ade364a8920dd085ec0e24f56182fb.zip
merge in upstream
Diffstat (limited to 'tests')
-rw-r--r--tests/unit/extension_stubs.py53
-rw-r--r--tests/unit/extensions/__init__.py15
-rw-r--r--tests/unit/extensions/foxinsocks.py102
-rw-r--r--tests/unit/test_extensions.py504
-rw-r--r--tests/unit/test_wsgi.py413
5 files changed, 1087 insertions, 0 deletions
diff --git a/tests/unit/extension_stubs.py b/tests/unit/extension_stubs.py
new file mode 100644
index 0000000..c25f285
--- /dev/null
+++ b/tests/unit/extension_stubs.py
@@ -0,0 +1,53 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from openstack.common import wsgi
+
+
+class StubExtension(object):
+
+ def __init__(self, alias="stub_extension"):
+ self.alias = alias
+
+ def get_name(self):
+ return "Stub Extension"
+
+ def get_alias(self):
+ return self.alias
+
+ def get_description(self):
+ return ""
+
+ def get_namespace(self):
+ return ""
+
+ def get_updated(self):
+ return ""
+
+
+class StubBaseAppController(object):
+
+ def index(self, request):
+ return "base app index"
+
+ def show(self, request, id):
+ return {'fort': 'knox'}
+
+ def update(self, request, id, body=None):
+ return {'uneditable': 'original_value'}
+
+ def create_resource(self):
+ return wsgi.Resource(self)
diff --git a/tests/unit/extensions/__init__.py b/tests/unit/extensions/__init__.py
new file mode 100644
index 0000000..848908a
--- /dev/null
+++ b/tests/unit/extensions/__init__.py
@@ -0,0 +1,15 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
diff --git a/tests/unit/extensions/foxinsocks.py b/tests/unit/extensions/foxinsocks.py
new file mode 100644
index 0000000..a0efd7e
--- /dev/null
+++ b/tests/unit/extensions/foxinsocks.py
@@ -0,0 +1,102 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+from openstack.common import extensions
+from openstack.common import wsgi
+
+
+class FoxInSocksController(object):
+
+ def index(self, request):
+ return "Try to say this Mr. Knox, sir..."
+
+ def create_resource(self):
+ return wsgi.Resource(self)
+
+
+class Foxinsocks(object):
+
+ def __init__(self):
+ pass
+
+ def get_name(self):
+ return "Fox In Socks"
+
+ def get_alias(self):
+ return "FOXNSOX"
+
+ def get_description(self):
+ return "The Fox In Socks Extension"
+
+ def get_namespace(self):
+ return "http://www.fox.in.socks/api/ext/pie/v1.0"
+
+ def get_updated(self):
+ return "2011-01-22T13:25:27-06:00"
+
+ def get_resources(self):
+ resources = []
+ resource = extensions.ResourceExtension('foxnsocks',
+ FoxInSocksController())
+ resources.append(resource)
+ return resources
+
+ def get_actions(self):
+ return [extensions.ActionExtension('dummy_resources',
+ 'FOXNSOX:add_tweedle',
+ self._add_tweedle_handler),
+ extensions.ActionExtension('dummy_resources',
+ 'FOXNSOX:delete_tweedle',
+ self._delete_tweedle_handler)]
+
+ def get_request_extensions(self):
+ request_exts = []
+
+ def _goose_handler(req, res):
+ #NOTE: This only handles JSON responses.
+ # You can use content type header to test for XML.
+ data = json.loads(res.body)
+ data['FOXNSOX:googoose'] = req.GET.get('chewing')
+ res.body = json.dumps(data)
+ return res
+
+ req_ext1 = extensions.RequestExtension('GET', '/dummy_resources/:(id)',
+ _goose_handler)
+ request_exts.append(req_ext1)
+
+ def _bands_handler(req, res):
+ #NOTE: This only handles JSON responses.
+ # You can use content type header to test for XML.
+ data = json.loads(res.body)
+ data['FOXNSOX:big_bands'] = 'Pig Bands!'
+ res.body = json.dumps(data)
+ return res
+
+ req_ext2 = extensions.RequestExtension('GET', '/dummy_resources/:(id)',
+ _bands_handler)
+ request_exts.append(req_ext2)
+ return request_exts
+
+ def _add_tweedle_handler(self, input_dict, req, id):
+ return "Tweedle {0} Added.".format(
+ input_dict['FOXNSOX:add_tweedle']['name'])
+
+ def _delete_tweedle_handler(self, input_dict, req, id):
+ return "Tweedle {0} Deleted.".format(
+ input_dict['FOXNSOX:delete_tweedle']['name'])
diff --git a/tests/unit/test_extensions.py b/tests/unit/test_extensions.py
new file mode 100644
index 0000000..841bf4d
--- /dev/null
+++ b/tests/unit/test_extensions.py
@@ -0,0 +1,504 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+from lxml import etree
+import os.path
+import routes
+import unittest
+
+from webtest import TestApp
+
+
+from openstack.common import wsgi
+from openstack.common import config
+from openstack.common import extensions
+from tests.unit.extension_stubs import (StubExtension,
+ StubBaseAppController)
+from openstack.common.extensions import (ExtensionManager,
+ ExtensionMiddleware)
+
+
+test_conf_file = os.path.join(os.path.dirname(__file__), os.pardir,
+ os.pardir, 'etc', 'openstack-common.conf.test')
+extensions_path = os.path.join(os.path.dirname(__file__), "extensions")
+
+NS = "{http://docs.openstack.org/}"
+ATOMNS = "{http://www.w3.org/2005/Atom}"
+
+
+class ExtensionsTestApp(wsgi.Router):
+
+ def __init__(self, options={}):
+ mapper = routes.Mapper()
+ controller = StubBaseAppController()
+ mapper.resource("dummy_resource", "/dummy_resources",
+ controller=controller.create_resource())
+ super(ExtensionsTestApp, self).__init__(mapper)
+
+
+class ResourceExtensionTest(unittest.TestCase):
+
+ class ResourceExtensionController(object):
+
+ def index(self, request):
+ return "resource index"
+
+ def show(self, request, id):
+ return {'data': {'id': id}}
+
+ def custom_member_action(self, request, id):
+ return {'member_action': 'value'}
+
+ def custom_collection_action(self, request, **kwargs):
+ return {'collection': 'value'}
+
+ def test_resource_can_be_added_as_extension(self):
+ res_ext = extensions.ResourceExtension('tweedles',
+ self.ResourceExtensionController())
+ test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext))
+
+ index_response = test_app.get("/tweedles")
+ self.assertEqual(200, index_response.status_int)
+ self.assertEqual("resource index", index_response.json)
+
+ show_response = test_app.get("/tweedles/25266")
+ self.assertEqual({'data': {'id': "25266"}}, show_response.json)
+
+ def test_resource_extension_with_custom_member_action(self):
+ controller = self.ResourceExtensionController()
+ member = {'custom_member_action': "GET"}
+ res_ext = extensions.ResourceExtension('tweedles', controller,
+ member_actions=member)
+ test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext))
+
+ response = test_app.get("/tweedles/some_id/custom_member_action")
+ self.assertEqual(200, response.status_int)
+ self.assertEqual(json.loads(response.body)['member_action'], "value")
+
+ def test_resource_extension_for_get_custom_collection_action(self):
+ controller = self.ResourceExtensionController()
+ collections = {'custom_collection_action': "PUT"}
+ res_ext = extensions.ResourceExtension('tweedles', controller,
+ collection_actions=collections)
+ test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext))
+
+ response = test_app.put("/tweedles/custom_collection_action")
+ self.assertEqual(200, response.status_int)
+ self.assertEqual(json.loads(response.body)['collection'], "value")
+
+ def test_resource_extension_for_put_custom_collection_action(self):
+ controller = self.ResourceExtensionController()
+ collections = {'custom_collection_action': "PUT"}
+ res_ext = extensions.ResourceExtension('tweedles', controller,
+ collection_actions=collections)
+ test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext))
+
+ response = test_app.put("/tweedles/custom_collection_action")
+
+ self.assertEqual(200, response.status_int)
+ self.assertEqual(json.loads(response.body)['collection'], 'value')
+
+ def test_resource_extension_for_post_custom_collection_action(self):
+ controller = self.ResourceExtensionController()
+ collections = {'custom_collection_action': "POST"}
+ res_ext = extensions.ResourceExtension('tweedles', controller,
+ collection_actions=collections)
+ test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext))
+
+ response = test_app.post("/tweedles/custom_collection_action")
+
+ self.assertEqual(200, response.status_int)
+ self.assertEqual(json.loads(response.body)['collection'], 'value')
+
+ def test_resource_extension_for_delete_custom_collection_action(self):
+ controller = self.ResourceExtensionController()
+ collections = {'custom_collection_action': "DELETE"}
+ res_ext = extensions.ResourceExtension('tweedles', controller,
+ collection_actions=collections)
+ test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext))
+
+ response = test_app.delete("/tweedles/custom_collection_action")
+
+ self.assertEqual(200, response.status_int)
+ self.assertEqual(json.loads(response.body)['collection'], 'value')
+
+ def test_resource_ext_for_formatted_req_on_custom_collection_action(self):
+ controller = self.ResourceExtensionController()
+ collections = {'custom_collection_action': "GET"}
+ res_ext = extensions.ResourceExtension('tweedles', controller,
+ collection_actions=collections)
+ test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext))
+
+ response = test_app.get("/tweedles/custom_collection_action.json")
+
+ self.assertEqual(200, response.status_int)
+ self.assertEqual(json.loads(response.body)['collection'], "value")
+
+ def test_resource_ext_for_nested_resource_custom_collection_action(self):
+ controller = self.ResourceExtensionController()
+ collections = {'custom_collection_action': "GET"}
+ parent = dict(collection_name='beetles', member_name='beetle')
+ res_ext = extensions.ResourceExtension('tweedles', controller,
+ collection_actions=collections,
+ parent=parent)
+ test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext))
+
+ response = test_app.get("/beetles/beetle_id"
+ "/tweedles/custom_collection_action")
+
+ self.assertEqual(200, response.status_int)
+ self.assertEqual(json.loads(response.body)['collection'], "value")
+
+ def test_returns_404_for_non_existant_extension(self):
+ test_app = setup_extensions_test_app(SimpleExtensionManager(None))
+
+ response = test_app.get("/non_extistant_extension", status='*')
+
+ self.assertEqual(404, response.status_int)
+
+
+class ActionExtensionTest(unittest.TestCase):
+
+ def setUp(self):
+ super(ActionExtensionTest, self).setUp()
+ self.extension_app = setup_extensions_test_app()
+
+ def test_extended_action_for_adding_extra_data(self):
+ action_name = 'FOXNSOX:add_tweedle'
+ action_params = dict(name='Beetle')
+ req_body = json.dumps({action_name: action_params})
+ response = self.extension_app.post('/dummy_resources/1/action',
+ req_body, content_type='application/json')
+
+ self.assertEqual("Tweedle Beetle Added.", response.json)
+
+ def test_extended_action_for_deleting_extra_data(self):
+ action_name = 'FOXNSOX:delete_tweedle'
+ action_params = dict(name='Bailey')
+ req_body = json.dumps({action_name: action_params})
+ response = self.extension_app.post("/dummy_resources/1/action",
+ req_body, content_type='application/json')
+ self.assertEqual("Tweedle Bailey Deleted.", response.json)
+
+ def test_returns_404_for_non_existant_action(self):
+ non_existant_action = 'blah_action'
+ action_params = dict(name="test")
+ req_body = json.dumps({non_existant_action: action_params})
+
+ response = self.extension_app.post("/dummy_resources/1/action",
+ req_body, content_type='application/json',
+ status='*')
+
+ self.assertEqual(404, response.status_int)
+
+ def test_returns_404_for_non_existant_resource(self):
+ action_name = 'add_tweedle'
+ action_params = dict(name='Beetle')
+ req_body = json.dumps({action_name: action_params})
+
+ response = self.extension_app.post("/asdf/1/action", req_body,
+ content_type='application/json', status='*')
+ self.assertEqual(404, response.status_int)
+
+
+class RequestExtensionTest(unittest.TestCase):
+
+ def test_headers_can_be_extended(self):
+ def extend_headers(req, res):
+ assert req.headers['X-NEW-REQUEST-HEADER'] == "sox"
+ res.headers['X-NEW-RESPONSE-HEADER'] = "response_header_data"
+ return res
+
+ app = self._setup_app_with_request_handler(extend_headers, 'GET')
+ response = app.get("/dummy_resources/1",
+ headers={'X-NEW-REQUEST-HEADER': "sox"})
+
+ self.assertEqual(response.headers['X-NEW-RESPONSE-HEADER'],
+ "response_header_data")
+
+ def test_extend_get_resource_response(self):
+ def extend_response_data(req, res):
+ data = json.loads(res.body)
+ data['FOXNSOX:extended_key'] = req.GET.get('extended_key')
+ res.body = json.dumps(data)
+ return res
+
+ app = self._setup_app_with_request_handler(extend_response_data, 'GET')
+ response = app.get("/dummy_resources/1?extended_key=extended_data")
+
+ self.assertEqual(200, response.status_int)
+
+ response_data = json.loads(response.body)
+ self.assertEqual('extended_data',
+ response_data['FOXNSOX:extended_key'])
+ self.assertEqual('knox', response_data['fort'])
+
+ def test_get_resources(self):
+ app = setup_extensions_test_app()
+
+ response = app.get("/dummy_resources/1?chewing=newblue")
+
+ response_data = json.loads(response.body)
+ self.assertEqual('newblue', response_data['FOXNSOX:googoose'])
+ self.assertEqual("Pig Bands!", response_data['FOXNSOX:big_bands'])
+
+ def test_edit_previously_uneditable_field(self):
+
+ def _update_handler(req, res):
+ data = json.loads(res.body)
+ data['uneditable'] = json.loads(req.body)['uneditable']
+ res.body = json.dumps(data)
+ return res
+
+ base_app = TestApp(setup_base_app())
+ response = base_app.put("/dummy_resources/1",
+ json.dumps({'uneditable': "new_value"}),
+ headers={'Content-Type': "application/json"})
+ self.assertEqual(response.json['uneditable'], "original_value")
+
+ ext_app = self._setup_app_with_request_handler(_update_handler,
+ 'PUT')
+ ext_response = ext_app.put("/dummy_resources/1",
+ json.dumps({'uneditable': "new_value"}),
+ headers={'Content-Type': "application/json"})
+ self.assertEqual(ext_response.json['uneditable'], "new_value")
+
+ def _setup_app_with_request_handler(self, handler, verb):
+ req_ext = extensions.RequestExtension(verb,
+ '/dummy_resources/:(id)', handler)
+ manager = SimpleExtensionManager(None, None, req_ext)
+ return setup_extensions_test_app(manager)
+
+
+class ExtensionManagerTest(unittest.TestCase):
+
+ def test_invalid_extensions_are_not_registered(self):
+
+ class InvalidExtension(object):
+ """
+ This Extension doesn't implement extension methods :
+ get_name, get_description, get_namespace and get_updated
+ """
+ def get_alias(self):
+ return "invalid_extension"
+
+ ext_mgr = ExtensionManager('')
+ ext_mgr.add_extension(InvalidExtension())
+ ext_mgr.add_extension(StubExtension("valid_extension"))
+
+ self.assertTrue('valid_extension' in ext_mgr.extensions)
+ self.assertFalse('invalid_extension' in ext_mgr.extensions)
+
+
+class ExtensionControllerTest(unittest.TestCase):
+
+ def setUp(self):
+ super(ExtensionControllerTest, self).setUp()
+ self.test_app = setup_extensions_test_app()
+
+ def test_index_gets_all_registerd_extensions(self):
+ response = self.test_app.get("/extensions")
+ foxnsox = response.json["extensions"][0]
+
+ self.assertEqual(foxnsox, {
+ 'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0',
+ 'name': 'Fox In Socks',
+ 'updated': '2011-01-22T13:25:27-06:00',
+ 'description': 'The Fox In Socks Extension',
+ 'alias': 'FOXNSOX',
+ 'links': []
+ }
+ )
+
+ def test_extension_can_be_accessed_by_alias(self):
+ json_response = self.test_app.get("/extensions/FOXNSOX").json
+ foxnsox = json_response['extension']
+
+ self.assertEqual(foxnsox, {
+ 'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0',
+ 'name': 'Fox In Socks',
+ 'updated': '2011-01-22T13:25:27-06:00',
+ 'description': 'The Fox In Socks Extension',
+ 'alias': 'FOXNSOX',
+ 'links': []
+ }
+ )
+
+ def test_show_returns_not_found_for_non_existant_extension(self):
+ response = self.test_app.get("/extensions/non_existant", status="*")
+
+ self.assertEqual(response.status_int, 404)
+
+ def test_list_extensions_xml(self):
+ response = self.test_app.get("/extensions.xml")
+
+ self.assertEqual(200, response.status_int)
+ root = etree.XML(response.body)
+ self.assertEqual(root.tag.split('extensions')[0], NS)
+
+ # Make sure that Fox in Sox extension is correct.
+ exts = root.findall('{0}extension'.format(NS))
+ fox_ext = exts[0]
+ self.assertEqual(fox_ext.get('name'), 'Fox In Socks')
+ self.assertEqual(fox_ext.get('namespace'),
+ 'http://www.fox.in.socks/api/ext/pie/v1.0')
+ self.assertEqual(fox_ext.get('updated'), '2011-01-22T13:25:27-06:00')
+ self.assertEqual(fox_ext.findtext('{0}description'.format(NS)),
+ 'The Fox In Socks Extension')
+
+ def test_get_extension_xml(self):
+ response = self.test_app.get("/extensions/FOXNSOX.xml")
+ self.assertEqual(200, response.status_int)
+ xml = response.body
+
+ root = etree.XML(xml)
+ self.assertEqual(root.tag.split('extension')[0], NS)
+ self.assertEqual(root.get('alias'), 'FOXNSOX')
+ self.assertEqual(root.get('name'), 'Fox In Socks')
+ self.assertEqual(root.get('namespace'),
+ 'http://www.fox.in.socks/api/ext/pie/v1.0')
+ self.assertEqual(root.get('updated'), '2011-01-22T13:25:27-06:00')
+ self.assertEqual(root.findtext('{0}description'.format(NS)),
+ 'The Fox In Socks Extension')
+
+
+class ExtensionsXMLSerializerTest(unittest.TestCase):
+
+ def test_serialize_extenstion(self):
+ serializer = extensions.ExtensionsXMLSerializer()
+ data = {'extension': {
+ 'name': 'ext1',
+ 'namespace': 'http://docs.rack.com/servers/api/ext/pie/v1.0',
+ 'alias': 'RS-PIE',
+ 'updated': '2011-01-22T13:25:27-06:00',
+ 'description': 'Adds the capability to share an image.',
+ 'links': [{'rel': 'describedby',
+ 'type': 'application/pdf',
+ 'href': 'http://docs.rack.com/servers/api/ext/cs.pdf'},
+ {'rel': 'describedby',
+ 'type': 'application/vnd.sun.wadl+xml',
+ 'href': 'http://docs.rack.com/servers/api/ext/cs.wadl'}]}}
+
+ xml = serializer.serialize(data, 'show')
+ root = etree.XML(xml)
+ ext_dict = data['extension']
+ self.assertEqual(root.findtext('{0}description'.format(NS)),
+ ext_dict['description'])
+
+ for key in ['name', 'namespace', 'alias', 'updated']:
+ self.assertEqual(root.get(key), ext_dict[key])
+
+ link_nodes = root.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(ext_dict['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
+
+ def test_serialize_extensions(self):
+ serializer = extensions.ExtensionsXMLSerializer()
+ data = {"extensions": [{
+ "name": "Public Image Extension",
+ "namespace": "http://foo.com/api/ext/pie/v1.0",
+ "alias": "RS-PIE",
+ "updated": "2011-01-22T13:25:27-06:00",
+ "description": "Adds the capability to share an image.",
+ "links": [{"rel": "describedby",
+ "type": "application/pdf",
+ "type": "application/vnd.sun.wadl+xml",
+ "href": "http://foo.com/api/ext/cs-pie.pdf"},
+ {"rel": "describedby",
+ "type": "application/vnd.sun.wadl+xml",
+ "href": "http://foo.com/api/ext/cs-pie.wadl"}]},
+ {"name": "Cloud Block Storage",
+ "namespace": "http://foo.com/api/ext/cbs/v1.0",
+ "alias": "RS-CBS",
+ "updated": "2011-01-12T11:22:33-06:00",
+ "description": "Allows mounting cloud block storage.",
+ "links": [{"rel": "describedby",
+ "type": "application/pdf",
+ "href": "http://foo.com/api/ext/cs-cbs.pdf"},
+ {"rel": "describedby",
+ "type": "application/vnd.sun.wadl+xml",
+ "href": "http://foo.com/api/ext/cs-cbs.wadl"}]}]}
+
+ xml = serializer.serialize(data, 'index')
+ root = etree.XML(xml)
+ ext_elems = root.findall('{0}extension'.format(NS))
+ self.assertEqual(len(ext_elems), 2)
+ for i, ext_elem in enumerate(ext_elems):
+ ext_dict = data['extensions'][i]
+ self.assertEqual(ext_elem.findtext('{0}description'.format(NS)),
+ ext_dict['description'])
+
+ for key in ['name', 'namespace', 'alias', 'updated']:
+ self.assertEqual(ext_elem.get(key), ext_dict[key])
+
+ link_nodes = ext_elem.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(ext_dict['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
+
+
+def app_factory(global_conf, **local_conf):
+ conf = global_conf.copy()
+ conf.update(local_conf)
+ return ExtensionsTestApp(conf)
+
+
+def setup_base_app():
+ options = {'config_file': test_conf_file}
+ conf, app = config.load_paste_app('extensions_test_app', options, None)
+ return app
+
+
+def setup_extensions_middleware(extension_manager=None):
+ extension_manager = (extension_manager or
+ ExtensionManager(extensions_path))
+ options = {'config_file': test_conf_file}
+ conf, app = config.load_paste_app('extensions_test_app', options, None)
+ return ExtensionMiddleware(app, extension_manager)
+
+
+def setup_extensions_test_app(extension_manager=None):
+ return TestApp(setup_extensions_middleware(extension_manager))
+
+
+class SimpleExtensionManager(object):
+
+ def __init__(self, resource_ext=None, action_ext=None, request_ext=None):
+ self.resource_ext = resource_ext
+ self.action_ext = action_ext
+ self.request_ext = request_ext
+
+ def get_resources(self):
+ resource_exts = []
+ if self.resource_ext:
+ resource_exts.append(self.resource_ext)
+ return resource_exts
+
+ def get_actions(self):
+ action_exts = []
+ if self.action_ext:
+ action_exts.append(self.action_ext)
+ return action_exts
+
+ def get_request_extensions(self):
+ request_extensions = []
+ if self.request_ext:
+ request_extensions.append(self.request_ext)
+ return request_extensions
diff --git a/tests/unit/test_wsgi.py b/tests/unit/test_wsgi.py
new file mode 100644
index 0000000..7a5eaa4
--- /dev/null
+++ b/tests/unit/test_wsgi.py
@@ -0,0 +1,413 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import unittest
+import webob
+
+from openstack.common import exception
+from openstack.common import wsgi
+
+
+class RequestTest(unittest.TestCase):
+
+ def test_content_type_missing(self):
+ request = wsgi.Request.blank('/tests/123', method='POST')
+ request.body = "<body />"
+ self.assertEqual(None, request.get_content_type())
+
+ def test_content_type_unsupported(self):
+ request = wsgi.Request.blank('/tests/123', method='POST')
+ request.headers["Content-Type"] = "text/html"
+ request.body = "asdf<br />"
+ self.assertRaises(exception.InvalidContentType,
+ request.get_content_type)
+
+ def test_content_type_with_charset(self):
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Content-Type"] = "application/json; charset=UTF-8"
+ result = request.get_content_type()
+ self.assertEqual(result, "application/json")
+
+ def test_content_type_with_given_content_types(self):
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Content-Type"] = "application/new-type;"
+ result = request.get_content_type(["application/json",
+ "application/new-type"])
+ self.assertEqual(result, "application/new-type")
+
+ def test_content_type_from_accept_xml(self):
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Accept"] = "application/xml"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/xml")
+
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Accept"] = "application/json"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/json")
+
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Accept"] = "application/xml, application/json"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/json")
+
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Accept"] = \
+ "application/json; q=0.3, application/xml; q=0.9"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/xml")
+
+ def test_content_type_from_query_extension(self):
+ request = wsgi.Request.blank('/tests/123.xml')
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/xml")
+
+ request = wsgi.Request.blank('/tests/123.json')
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/json")
+
+ request = wsgi.Request.blank('/tests/123.invalid')
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/json")
+
+ def test_content_type_accept_and_query_extension(self):
+ request = wsgi.Request.blank('/tests/123.xml')
+ request.headers["Accept"] = "application/json"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/xml")
+
+ def test_content_type_accept_default(self):
+ request = wsgi.Request.blank('/tests/123.unsupported')
+ request.headers["Accept"] = "application/unsupported1"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/json")
+
+ def test_content_type_accept_with_given_content_types(self):
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Accept"] = "application/new_type"
+ result = request.best_match_content_type(["application/new_type"])
+ self.assertEqual(result, "application/new_type")
+
+
+class ActionDispatcherTest(unittest.TestCase):
+
+ def test_dispatch(self):
+ serializer = wsgi.ActionDispatcher()
+ serializer.create = lambda x: x
+ self.assertEqual(serializer.dispatch('pants', action='create'),
+ 'pants')
+
+ def test_dispatch_action_None(self):
+ serializer = wsgi.ActionDispatcher()
+ serializer.create = lambda x: x + ' pants'
+ serializer.default = lambda x: x + ' trousers'
+ self.assertEqual(serializer.dispatch('Two', action=None),
+ 'Two trousers')
+
+ def test_dispatch_default(self):
+ serializer = wsgi.ActionDispatcher()
+ serializer.create = lambda x: x + ' pants'
+ serializer.default = lambda x: x + ' trousers'
+ self.assertEqual(serializer.dispatch('Two', action='update'),
+ 'Two trousers')
+
+
+class ResponseHeadersSerializerTest(unittest.TestCase):
+
+ def test_default(self):
+ serializer = wsgi.ResponseHeadersSerializer()
+ response = webob.Response()
+ serializer.serialize(response, {'v': '123'}, 'asdf')
+ self.assertEqual(response.status_int, 200)
+
+ def test_custom(self):
+ class Serializer(wsgi.ResponseHeadersSerializer):
+ def update(self, response, data):
+ response.status_int = 404
+ response.headers['X-Custom-Header'] = data['v']
+ serializer = Serializer()
+ response = webob.Response()
+ serializer.serialize(response, {'v': '123'}, 'update')
+ self.assertEqual(response.status_int, 404)
+ self.assertEqual(response.headers['X-Custom-Header'], '123')
+
+
+class DictSerializerTest(unittest.TestCase):
+
+ def test_dispatch_default(self):
+ serializer = wsgi.DictSerializer()
+ self.assertEqual(serializer.serialize({}, 'NonExistantAction'), '')
+
+
+class XMLDictSerializerTest(unittest.TestCase):
+
+ def test_xml(self):
+ input_dict = dict(servers=dict(a=(2, 3)))
+ expected_xml = """<servers xmlns="asdf">
+ <a>(2,3)</a>
+ </servers>"""
+ serializer = wsgi.XMLDictSerializer(xmlns="asdf")
+ result = serializer.serialize(input_dict)
+ result = result.replace('\n', '').replace(' ', '')
+ expected_xml = expected_xml.replace('\n', '').replace(' ', '')
+ self.assertEqual(result, expected_xml)
+
+
+class JSONDictSerializerTest(unittest.TestCase):
+
+ def test_json(self):
+ input_dict = dict(servers=dict(a=(2, 3)))
+ expected_json = '{"servers":{"a":[2,3]}}'
+ serializer = wsgi.JSONDictSerializer()
+ result = serializer.serialize(input_dict)
+ result = result.replace('\n', '').replace(' ', '')
+ self.assertEqual(result, expected_json)
+
+
+class TextDeserializerTest(unittest.TestCase):
+
+ def test_dispatch_default(self):
+ deserializer = wsgi.TextDeserializer()
+ self.assertEqual(deserializer.deserialize({}, 'update'), {})
+
+
+class JSONDeserializerTest(unittest.TestCase):
+
+ def test_json(self):
+ data = """{"a": {
+ "a1": "1",
+ "a2": "2",
+ "bs": ["1", "2", "3", {"c": {"c1": "1"}}],
+ "d": {"e": "1"},
+ "f": "1"}}"""
+ as_dict = {
+ 'body': {
+ 'a': {
+ 'a1': '1',
+ 'a2': '2',
+ 'bs': ['1', '2', '3', {'c': {'c1': '1'}}],
+ 'd': {'e': '1'},
+ 'f': '1',
+ },
+ },
+ }
+ deserializer = wsgi.JSONDeserializer()
+ self.assertEqual(deserializer.deserialize(data), as_dict)
+
+
+class XMLDeserializerTest(unittest.TestCase):
+
+ def test_xml(self):
+ xml = """
+ <a a1="1" a2="2">
+ <bs><b>1</b><b>2</b><b>3</b><b><c c1="1"/></b></bs>
+ <d><e>1</e></d>
+ <f>1</f>
+ </a>
+ """.strip()
+ as_dict = {
+ 'body': {
+ 'a': {
+ 'a1': '1',
+ 'a2': '2',
+ 'bs': ['1', '2', '3', {'c': {'c1': '1'}}],
+ 'd': {'e': '1'},
+ 'f': '1',
+ },
+ },
+ }
+ metadata = {'plurals': {'bs': 'b', 'ts': 't'}}
+ deserializer = wsgi.XMLDeserializer(metadata=metadata)
+ self.assertEqual(deserializer.deserialize(xml), as_dict)
+
+ def test_xml_empty(self):
+ xml = '<a></a>'
+ as_dict = {"body": {"a": {}}}
+ deserializer = wsgi.XMLDeserializer()
+ self.assertEqual(deserializer.deserialize(xml), as_dict)
+
+
+class RequestHeadersDeserializerTest(unittest.TestCase):
+
+ def test_default(self):
+ deserializer = wsgi.RequestHeadersDeserializer()
+ req = wsgi.Request.blank('/')
+ self.assertEqual(deserializer.deserialize(req, 'nonExistant'), {})
+
+ def test_custom(self):
+ class Deserializer(wsgi.RequestHeadersDeserializer):
+ def update(self, request):
+ return {'a': request.headers['X-Custom-Header']}
+ deserializer = Deserializer()
+ req = wsgi.Request.blank('/')
+ req.headers['X-Custom-Header'] = 'b'
+ self.assertEqual(deserializer.deserialize(req, 'update'), {'a': 'b'})
+
+
+class ResponseSerializerTest(unittest.TestCase):
+
+ def setUp(self):
+ class JSONSerializer(object):
+ def serialize(self, data, action='default'):
+ return 'pew_json'
+
+ class XMLSerializer(object):
+ def serialize(self, data, action='default'):
+ return 'pew_xml'
+
+ class HeadersSerializer(object):
+ def serialize(self, response, data, action):
+ response.status_int = 404
+
+ self.body_serializers = {
+ 'application/json': JSONSerializer(),
+ 'application/XML': XMLSerializer(),
+ }
+
+ self.serializer = wsgi.ResponseSerializer(self.body_serializers,
+ HeadersSerializer())
+
+ def tearDown(self):
+ pass
+
+ def test_get_serializer(self):
+ ctype = 'application/json'
+ self.assertEqual(self.serializer.get_body_serializer(ctype),
+ self.body_serializers[ctype])
+
+ def test_get_serializer_unknown_content_type(self):
+ self.assertRaises(exception.InvalidContentType,
+ self.serializer.get_body_serializer,
+ 'application/unknown')
+
+ def test_serialize_response(self):
+ response = self.serializer.serialize({}, 'application/json')
+ self.assertEqual(response.headers['Content-Type'], 'application/json')
+ self.assertEqual(response.body, 'pew_json')
+ self.assertEqual(response.status_int, 404)
+
+ def test_serialize_response_None(self):
+ response = self.serializer.serialize(None, 'application/json')
+
+ self.assertEqual(response.headers['Content-Type'], 'application/json')
+ self.assertEqual(response.body, '')
+ self.assertEqual(response.status_int, 404)
+
+ def test_serialize_response_dict_to_unknown_content_type(self):
+ self.assertRaises(exception.InvalidContentType,
+ self.serializer.serialize,
+ {}, 'application/unknown')
+
+
+class RequestDeserializerTest(unittest.TestCase):
+
+ def setUp(self):
+ class JSONDeserializer(object):
+ def deserialize(self, data, action='default'):
+ return 'pew_json'
+
+ class XMLDeserializer(object):
+ def deserialize(self, data, action='default'):
+ return 'pew_xml'
+
+ self.body_deserializers = {
+ 'application/json': JSONDeserializer(),
+ 'application/XML': XMLDeserializer(),
+ }
+
+ self.deserializer = wsgi.RequestDeserializer(self.body_deserializers)
+
+ def test_get_deserializer(self):
+ expected = self.deserializer.get_body_deserializer('application/json')
+ self.assertEqual(expected, self.body_deserializers['application/json'])
+
+ def test_get_deserializer_unknown_content_type(self):
+ self.assertRaises(exception.InvalidContentType,
+ self.deserializer.get_body_deserializer,
+ 'application/unknown')
+
+ def test_get_expected_content_type(self):
+ request = wsgi.Request.blank('/')
+ request.headers['Accept'] = 'application/json'
+ self.assertEqual(self.deserializer.get_expected_content_type(request),
+ 'application/json')
+
+ def test_get_action_args(self):
+ env = {
+ 'wsgiorg.routing_args': [None, {
+ 'controller': None,
+ 'format': None,
+ 'action': 'update',
+ 'id': 12,
+ }],
+ }
+
+ expected = {'action': 'update', 'id': 12}
+
+ self.assertEqual(self.deserializer.get_action_args(env), expected)
+
+ def test_deserialize(self):
+ def fake_get_routing_args(request):
+ return {'action': 'create'}
+ self.deserializer.get_action_args = fake_get_routing_args
+
+ request = wsgi.Request.blank('/')
+ request.headers['Accept'] = 'application/xml'
+
+ deserialized = self.deserializer.deserialize(request)
+ expected = ('create', {}, 'application/xml')
+
+ self.assertEqual(expected, deserialized)
+
+
+class ResourceTest(unittest.TestCase):
+
+ def test_dispatch(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ resource = wsgi.Resource(Controller())
+ actual = resource.dispatch(resource.controller,
+ 'index', None, pants='off')
+ expected = 'off'
+ self.assertEqual(actual, expected)
+
+ def test_dispatch_unknown_controller_action(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ resource = wsgi.Resource(Controller())
+ self.assertRaises(AttributeError, resource.dispatch,
+ resource.controller, 'create', None, {})
+
+ def test_malformed_request_body_throws_bad_request(self):
+ resource = wsgi.Resource(None)
+ request = wsgi.Request.blank("/", body="{mal:formed", method='POST',
+ headers={'Content-Type': "application/json"})
+
+ response = resource(request)
+ self.assertEqual(response.status, '400 Bad Request')
+
+ def test_wrong_content_type_throws_unsupported_media_type_error(self):
+ resource = wsgi.Resource(None)
+ request = wsgi.Request.blank("/", body="{some:json}", method='POST',
+ headers={'Content-Type': "xxx"})
+
+ response = resource(request)
+ self.assertEqual(response.status, '415 Unsupported Media Type')