From 02c95aeb2ffe112f7b60a1d3c53cdde22bc5db4d Mon Sep 17 00:00:00 2001 From: Rajaram Mallya Date: Thu, 8 Sep 2011 18:29:40 +0530 Subject: Rajaram/Vinkesh | Copied tests for wsgi from nova. Added default content/accept types in Request which can be overridden by projects. Copied tests for XML serialization of Extension Controller's action from nova --- openstack/common/exception.py | 4 + openstack/common/extensions.py | 17 +- openstack/common/wsgi.py | 248 +++++++++++++------------ tests/unit/extension_stubs.py | 2 +- tests/unit/test_extensions.py | 152 +++++++++++++-- tests/unit/test_wsgi.py | 413 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 701 insertions(+), 135 deletions(-) create mode 100644 tests/unit/test_wsgi.py diff --git a/openstack/common/exception.py b/openstack/common/exception.py index a81355e..ba32da5 100644 --- a/openstack/common/exception.py +++ b/openstack/common/exception.py @@ -139,5 +139,9 @@ class OpenstackException(Exception): return self._error_string +class MalformedRequestBody(OpenstackException): + message = "Malformed message body: %(reason)s" + + class InvalidContentType(OpenstackException): message = "Invalid content type %(content_type)s" diff --git a/openstack/common/extensions.py b/openstack/common/extensions.py index 7044227..d2fab36 100644 --- a/openstack/common/extensions.py +++ b/openstack/common/extensions.py @@ -27,8 +27,9 @@ from lxml import etree from openstack.common import exception from openstack.common import wsgi - LOG = logging.getLogger('extensions') +DEFAULT_XMLNS = "http://docs.openstack.org/" +XMLNS_ATOM = "http://www.w3.org/2005/Atom" class ExtensionDescriptor(object): @@ -166,6 +167,9 @@ class ExtensionsResource(wsgi.Resource): def __init__(self, extension_manager): self.extension_manager = extension_manager + body_serializers = {'application/xml': ExtensionsXMLSerializer()} + serializer = wsgi.ResponseSerializer(body_serializers=body_serializers) + super(ExtensionsResource, self).__init__(self, None, serializer) def _translate(self, ext): ext_data = {} @@ -342,8 +346,11 @@ class ExtensionManager(object): def get_resources(self): """Returns a list of ResourceExtension objects.""" resources = [] - resources.append(ResourceExtension('extensions', - ExtensionsResource(self))) + extension_resource = ExtensionsResource(self) + res_ext = ResourceExtension('extensions', + extension_resource, + serializer=extension_resource.serializer) + resources.append(res_ext) for alias, ext in self.extensions.iteritems(): try: resources.extend(ext.get_resources()) @@ -486,7 +493,7 @@ class ResourceExtension(object): class ExtensionsXMLSerializer(wsgi.XMLDictSerializer): -# NSMAP = {None: xmlutil.XMLNS_V11, 'atom': xmlutil.XMLNS_ATOM} + NSMAP = {None: DEFAULT_XMLNS, 'atom': XMLNS_ATOM} def show(self, ext_dict): ext = etree.Element('extension', nsmap=self.NSMAP) @@ -511,7 +518,7 @@ class ExtensionsXMLSerializer(wsgi.XMLDictSerializer): desc.text = ext_dict['description'] ext_elem.append(desc) for link in ext_dict.get('links', []): - elem = etree.SubElement(ext_elem, '{%s}link' % xmlutil.XMLNS_ATOM) + elem = etree.SubElement(ext_elem, '{%s}link' % XMLNS_ATOM) elem.set('rel', link['rel']) elem.set('href', link['href']) elem.set('type', link['type']) diff --git a/openstack/common/wsgi.py b/openstack/common/wsgi.py index 628b3a4..128ae8c 100644 --- a/openstack/common/wsgi.py +++ b/openstack/common/wsgi.py @@ -210,31 +210,47 @@ class Router(object): class Request(webob.Request): - """Add some Openstack API-specific logic to the base webob.Request.""" + default_request_content_types = ('application/json', 'application/xml') + default_accept_types = ('application/json', 'application/xml') + default_accept_type = 'application/json' + def best_match_content_type(self, supported_content_types=None): - """Determine the requested response content-type.""" - supported_content_types = (supported_content_types - or ("application/xml", - "application/json",)) + """Determine the requested response content-type. + + Based on the query extension then the Accept header. + Defaults to default_accept_type if we don't find a preference + + """ + supported_content_types = (supported_content_types or + self.default_accept_types) + + parts = self.path.rsplit('.', 1) + if len(parts) > 1: + ctype = 'application/{0}'.format(parts[1]) + if ctype in supported_content_types: + return ctype + bm = self.accept.best_match(supported_content_types) - return bm or 'application/json' + return bm or self.default_accept_type def get_content_type(self, allowed_content_types=None): - """Determine content type of the request body.""" + """Determine content type of the request body. - allowed_content_types = allowed_content_types or ("application/xml", - "application/json",) + Does not do any body introspection, only checks header + + """ if not "Content-Type" in self.headers: - raise exception.InvalidContentType(content_type=None) + return None content_type = self.content_type + allowed_content_types = (allowed_content_types or + self.default_request_content_types) if content_type not in allowed_content_types: raise exception.InvalidContentType(content_type=content_type) - else: - return content_type + return content_type class Resource(object): @@ -269,11 +285,19 @@ class Resource(object): @webob.dec.wsgify(RequestClass=Request) def __call__(self, request): """WSGI method that controls (de)serialization and method dispatch.""" - action, action_args, accept = self.deserialize_request(request) + + try: + action, action_args, accept = self.deserialize_request(request) + except exception.InvalidContentType: + msg = _("Unsupported Content-Type") + return webob.exc.HTTPUnsupportedMediaType(explanation=msg) + except exception.MalformedRequestBody: + msg = _("Malformed request body") + return webob.exc.HTTPBadRequest(explanation=msg) + action_result = self.execute_action(action, request, **action_args) try: return self.serialize_response(action, action_result, accept) - # return unserializable result (typically a webob exc) except Exception: return action_result @@ -329,107 +353,6 @@ class ActionDispatcher(object): raise NotImplementedError() -class TextDeserializer(ActionDispatcher): - """Default request body deserialization""" - - def deserialize(self, datastring, action='default'): - return self.dispatch(datastring, action=action) - - def default(self, datastring): - return {} - - -class JSONDeserializer(TextDeserializer): - - def _from_json(self, datastring): - try: - return json.loads(datastring) - except ValueError: - msg = _("cannot understand JSON") - raise exception.MalformedRequestBody(reason=msg) - - def default(self, datastring): - return {'body': self._from_json(datastring)} - - -class XMLDeserializer(TextDeserializer): - - def __init__(self, metadata=None): - """ - :param metadata: information needed to deserialize xml into - a dictionary. - """ - super(XMLDeserializer, self).__init__() - self.metadata = metadata or {} - - def _from_xml(self, datastring): - plurals = set(self.metadata.get('plurals', {})) - - try: - node = minidom.parseString(datastring).childNodes[0] - return {node.nodeName: self._from_xml_node(node, plurals)} - except expat.ExpatError: - msg = _("cannot understand XML") - raise exception.MalformedRequestBody(reason=msg) - - def _from_xml_node(self, node, listnames): - """Convert a minidom node to a simple Python type. - - :param listnames: list of XML node names whose subnodes should - be considered list items. - - """ - if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3: - return node.childNodes[0].nodeValue - elif node.nodeName in listnames: - return [self._from_xml_node(n, listnames) for n in node.childNodes] - else: - result = dict() - for attr in node.attributes.keys(): - result[attr] = node.attributes[attr].nodeValue - for child in node.childNodes: - if child.nodeType != node.TEXT_NODE: - result[child.nodeName] = self._from_xml_node(child, - listnames) - return result - - def find_first_child_named(self, parent, name): - """Search a nodes children for the first child with a given name""" - for node in parent.childNodes: - if node.nodeName == name: - return node - return None - - def find_children_named(self, parent, name): - """Return all of a nodes children who have the given name""" - for node in parent.childNodes: - if node.nodeName == name: - yield node - - def extract_text(self, node): - """Get the text field contained by the given node""" - if len(node.childNodes) == 1: - child = node.childNodes[0] - if child.nodeType == child.TEXT_NODE: - return child.nodeValue - return "" - - def default(self, datastring): - return {'body': self._from_xml(datastring)} - - -class MetadataXMLDeserializer(XMLDeserializer): - - def extract_metadata(self, metadata_node): - """Marshal the metadata attribute of a parsed request""" - metadata = {} - if metadata_node is not None: - for meta_node in self.find_children_named(metadata_node, "meta"): - key = meta_node.getAttribute("key") - metadata[key] = self.extract_text(meta_node) - return metadata - - class DictSerializer(ActionDispatcher): """Default request body serialization""" @@ -619,8 +542,7 @@ class RequestDeserializer(object): def __init__(self, body_deserializers=None, headers_deserializer=None, supported_content_types=None): - self.supported_content_types = supported_content_types or \ - ('application/json', 'application/xml') + self.supported_content_types = supported_content_types self.body_deserializers = { 'application/xml': XMLDeserializer(), @@ -662,7 +584,7 @@ class RequestDeserializer(object): content_type = request.get_content_type() except exception.InvalidContentType: LOG.debug(_("Unrecognized Content-Type provided in request")) - return {} + raise if content_type is None: LOG.debug(_("No Content-Type provided in request")) @@ -703,3 +625,93 @@ class RequestDeserializer(object): pass return args + + +class TextDeserializer(ActionDispatcher): + """Default request body deserialization""" + + def deserialize(self, datastring, action='default'): + return self.dispatch(datastring, action=action) + + def default(self, datastring): + return {} + + +class JSONDeserializer(TextDeserializer): + + def _from_json(self, datastring): + try: + return json.loads(datastring) + except ValueError: + msg = _("cannot understand JSON") + raise exception.MalformedRequestBody(reason=msg) + + def default(self, datastring): + return {'body': self._from_json(datastring)} + + +class XMLDeserializer(TextDeserializer): + + def __init__(self, metadata=None): + """ + :param metadata: information needed to deserialize xml into + a dictionary. + """ + super(XMLDeserializer, self).__init__() + self.metadata = metadata or {} + + def _from_xml(self, datastring): + plurals = set(self.metadata.get('plurals', {})) + + try: + node = minidom.parseString(datastring).childNodes[0] + return {node.nodeName: self._from_xml_node(node, plurals)} + except expat.ExpatError: + msg = _("cannot understand XML") + raise exception.MalformedRequestBody(reason=msg) + + def _from_xml_node(self, node, listnames): + """Convert a minidom node to a simple Python type. + + :param listnames: list of XML node names whose subnodes should + be considered list items. + + """ + + if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3: + return node.childNodes[0].nodeValue + elif node.nodeName in listnames: + return [self._from_xml_node(n, listnames) for n in node.childNodes] + else: + result = dict() + for attr in node.attributes.keys(): + result[attr] = node.attributes[attr].nodeValue + for child in node.childNodes: + if child.nodeType != node.TEXT_NODE: + result[child.nodeName] = self._from_xml_node(child, + listnames) + return result + + def find_first_child_named(self, parent, name): + """Search a nodes children for the first child with a given name""" + for node in parent.childNodes: + if node.nodeName == name: + return node + return None + + def find_children_named(self, parent, name): + """Return all of a nodes children who have the given name""" + for node in parent.childNodes: + if node.nodeName == name: + yield node + + def extract_text(self, node): + """Get the text field contained by the given node""" + if len(node.childNodes) == 1: + child = node.childNodes[0] + if child.nodeType == child.TEXT_NODE: + return child.nodeValue + return "" + + def default(self, datastring): + return {'body': self._from_xml(datastring)} diff --git a/tests/unit/extension_stubs.py b/tests/unit/extension_stubs.py index 0352614..c25f285 100644 --- a/tests/unit/extension_stubs.py +++ b/tests/unit/extension_stubs.py @@ -46,7 +46,7 @@ class StubBaseAppController(object): def show(self, request, id): return {'fort': 'knox'} - def update(self, request, id): + def update(self, request, id, body=None): return {'uneditable': 'original_value'} def create_resource(self): diff --git a/tests/unit/test_extensions.py b/tests/unit/test_extensions.py index fe56589..841bf4d 100644 --- a/tests/unit/test_extensions.py +++ b/tests/unit/test_extensions.py @@ -15,11 +15,14 @@ # under the License. import json +from lxml import etree import os.path import routes import unittest + from webtest import TestApp + from openstack.common import wsgi from openstack.common import config from openstack.common import extensions @@ -33,6 +36,9 @@ test_conf_file = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, 'etc', 'openstack-common.conf.test') extensions_path = os.path.join(os.path.dirname(__file__), "extensions") +NS = "{http://docs.openstack.org/}" +ATOMNS = "{http://www.w3.org/2005/Atom}" + class ExtensionsTestApp(wsgi.Router): @@ -254,19 +260,21 @@ class RequestExtensionTest(unittest.TestCase): def _update_handler(req, res): data = json.loads(res.body) - data['uneditable'] = req.params['uneditable'] + data['uneditable'] = json.loads(req.body)['uneditable'] res.body = json.dumps(data) return res base_app = TestApp(setup_base_app()) response = base_app.put("/dummy_resources/1", - {'uneditable': "new_value"}) + json.dumps({'uneditable': "new_value"}), + headers={'Content-Type': "application/json"}) self.assertEqual(response.json['uneditable'], "original_value") ext_app = self._setup_app_with_request_handler(_update_handler, 'PUT') ext_response = ext_app.put("/dummy_resources/1", - {'uneditable': "new_value"}) + json.dumps({'uneditable': "new_value"}), + headers={'Content-Type': "application/json"}) self.assertEqual(ext_response.json['uneditable'], "new_value") def _setup_app_with_request_handler(self, handler, verb): @@ -306,23 +314,145 @@ class ExtensionControllerTest(unittest.TestCase): response = self.test_app.get("/extensions") foxnsox = response.json["extensions"][0] - self.assertEqual(foxnsox["alias"], "FOXNSOX") - self.assertEqual(foxnsox["namespace"], - "http://www.fox.in.socks/api/ext/pie/v1.0") + self.assertEqual(foxnsox, { + 'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0', + 'name': 'Fox In Socks', + 'updated': '2011-01-22T13:25:27-06:00', + 'description': 'The Fox In Socks Extension', + 'alias': 'FOXNSOX', + 'links': [] + } + ) def test_extension_can_be_accessed_by_alias(self): json_response = self.test_app.get("/extensions/FOXNSOX").json - foxnsox_extension = json_response['extension'] - - self.assertEqual(foxnsox_extension["alias"], "FOXNSOX") - self.assertEqual(foxnsox_extension["namespace"], - "http://www.fox.in.socks/api/ext/pie/v1.0") + foxnsox = json_response['extension'] + + self.assertEqual(foxnsox, { + 'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0', + 'name': 'Fox In Socks', + 'updated': '2011-01-22T13:25:27-06:00', + 'description': 'The Fox In Socks Extension', + 'alias': 'FOXNSOX', + 'links': [] + } + ) def test_show_returns_not_found_for_non_existant_extension(self): response = self.test_app.get("/extensions/non_existant", status="*") self.assertEqual(response.status_int, 404) + def test_list_extensions_xml(self): + response = self.test_app.get("/extensions.xml") + + self.assertEqual(200, response.status_int) + root = etree.XML(response.body) + self.assertEqual(root.tag.split('extensions')[0], NS) + + # Make sure that Fox in Sox extension is correct. + exts = root.findall('{0}extension'.format(NS)) + fox_ext = exts[0] + self.assertEqual(fox_ext.get('name'), 'Fox In Socks') + self.assertEqual(fox_ext.get('namespace'), + 'http://www.fox.in.socks/api/ext/pie/v1.0') + self.assertEqual(fox_ext.get('updated'), '2011-01-22T13:25:27-06:00') + self.assertEqual(fox_ext.findtext('{0}description'.format(NS)), + 'The Fox In Socks Extension') + + def test_get_extension_xml(self): + response = self.test_app.get("/extensions/FOXNSOX.xml") + self.assertEqual(200, response.status_int) + xml = response.body + + root = etree.XML(xml) + self.assertEqual(root.tag.split('extension')[0], NS) + self.assertEqual(root.get('alias'), 'FOXNSOX') + self.assertEqual(root.get('name'), 'Fox In Socks') + self.assertEqual(root.get('namespace'), + 'http://www.fox.in.socks/api/ext/pie/v1.0') + self.assertEqual(root.get('updated'), '2011-01-22T13:25:27-06:00') + self.assertEqual(root.findtext('{0}description'.format(NS)), + 'The Fox In Socks Extension') + + +class ExtensionsXMLSerializerTest(unittest.TestCase): + + def test_serialize_extenstion(self): + serializer = extensions.ExtensionsXMLSerializer() + data = {'extension': { + 'name': 'ext1', + 'namespace': 'http://docs.rack.com/servers/api/ext/pie/v1.0', + 'alias': 'RS-PIE', + 'updated': '2011-01-22T13:25:27-06:00', + 'description': 'Adds the capability to share an image.', + 'links': [{'rel': 'describedby', + 'type': 'application/pdf', + 'href': 'http://docs.rack.com/servers/api/ext/cs.pdf'}, + {'rel': 'describedby', + 'type': 'application/vnd.sun.wadl+xml', + 'href': 'http://docs.rack.com/servers/api/ext/cs.wadl'}]}} + + xml = serializer.serialize(data, 'show') + root = etree.XML(xml) + ext_dict = data['extension'] + self.assertEqual(root.findtext('{0}description'.format(NS)), + ext_dict['description']) + + for key in ['name', 'namespace', 'alias', 'updated']: + self.assertEqual(root.get(key), ext_dict[key]) + + link_nodes = root.findall('{0}link'.format(ATOMNS)) + self.assertEqual(len(link_nodes), 2) + for i, link in enumerate(ext_dict['links']): + for key, value in link.items(): + self.assertEqual(link_nodes[i].get(key), value) + + def test_serialize_extensions(self): + serializer = extensions.ExtensionsXMLSerializer() + data = {"extensions": [{ + "name": "Public Image Extension", + "namespace": "http://foo.com/api/ext/pie/v1.0", + "alias": "RS-PIE", + "updated": "2011-01-22T13:25:27-06:00", + "description": "Adds the capability to share an image.", + "links": [{"rel": "describedby", + "type": "application/pdf", + "type": "application/vnd.sun.wadl+xml", + "href": "http://foo.com/api/ext/cs-pie.pdf"}, + {"rel": "describedby", + "type": "application/vnd.sun.wadl+xml", + "href": "http://foo.com/api/ext/cs-pie.wadl"}]}, + {"name": "Cloud Block Storage", + "namespace": "http://foo.com/api/ext/cbs/v1.0", + "alias": "RS-CBS", + "updated": "2011-01-12T11:22:33-06:00", + "description": "Allows mounting cloud block storage.", + "links": [{"rel": "describedby", + "type": "application/pdf", + "href": "http://foo.com/api/ext/cs-cbs.pdf"}, + {"rel": "describedby", + "type": "application/vnd.sun.wadl+xml", + "href": "http://foo.com/api/ext/cs-cbs.wadl"}]}]} + + xml = serializer.serialize(data, 'index') + root = etree.XML(xml) + ext_elems = root.findall('{0}extension'.format(NS)) + self.assertEqual(len(ext_elems), 2) + for i, ext_elem in enumerate(ext_elems): + ext_dict = data['extensions'][i] + self.assertEqual(ext_elem.findtext('{0}description'.format(NS)), + ext_dict['description']) + + for key in ['name', 'namespace', 'alias', 'updated']: + self.assertEqual(ext_elem.get(key), ext_dict[key]) + + link_nodes = ext_elem.findall('{0}link'.format(ATOMNS)) + self.assertEqual(len(link_nodes), 2) + for i, link in enumerate(ext_dict['links']): + for key, value in link.items(): + self.assertEqual(link_nodes[i].get(key), value) + def app_factory(global_conf, **local_conf): conf = global_conf.copy() diff --git a/tests/unit/test_wsgi.py b/tests/unit/test_wsgi.py new file mode 100644 index 0000000..7a5eaa4 --- /dev/null +++ b/tests/unit/test_wsgi.py @@ -0,0 +1,413 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest +import webob + +from openstack.common import exception +from openstack.common import wsgi + + +class RequestTest(unittest.TestCase): + + def test_content_type_missing(self): + request = wsgi.Request.blank('/tests/123', method='POST') + request.body = "" + self.assertEqual(None, request.get_content_type()) + + def test_content_type_unsupported(self): + request = wsgi.Request.blank('/tests/123', method='POST') + request.headers["Content-Type"] = "text/html" + request.body = "asdf
" + self.assertRaises(exception.InvalidContentType, + request.get_content_type) + + def test_content_type_with_charset(self): + request = wsgi.Request.blank('/tests/123') + request.headers["Content-Type"] = "application/json; charset=UTF-8" + result = request.get_content_type() + self.assertEqual(result, "application/json") + + def test_content_type_with_given_content_types(self): + request = wsgi.Request.blank('/tests/123') + request.headers["Content-Type"] = "application/new-type;" + result = request.get_content_type(["application/json", + "application/new-type"]) + self.assertEqual(result, "application/new-type") + + def test_content_type_from_accept_xml(self): + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = "application/xml" + result = request.best_match_content_type() + self.assertEqual(result, "application/xml") + + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = "application/json" + result = request.best_match_content_type() + self.assertEqual(result, "application/json") + + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = "application/xml, application/json" + result = request.best_match_content_type() + self.assertEqual(result, "application/json") + + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = \ + "application/json; q=0.3, application/xml; q=0.9" + result = request.best_match_content_type() + self.assertEqual(result, "application/xml") + + def test_content_type_from_query_extension(self): + request = wsgi.Request.blank('/tests/123.xml') + result = request.best_match_content_type() + self.assertEqual(result, "application/xml") + + request = wsgi.Request.blank('/tests/123.json') + result = request.best_match_content_type() + self.assertEqual(result, "application/json") + + request = wsgi.Request.blank('/tests/123.invalid') + result = request.best_match_content_type() + self.assertEqual(result, "application/json") + + def test_content_type_accept_and_query_extension(self): + request = wsgi.Request.blank('/tests/123.xml') + request.headers["Accept"] = "application/json" + result = request.best_match_content_type() + self.assertEqual(result, "application/xml") + + def test_content_type_accept_default(self): + request = wsgi.Request.blank('/tests/123.unsupported') + request.headers["Accept"] = "application/unsupported1" + result = request.best_match_content_type() + self.assertEqual(result, "application/json") + + def test_content_type_accept_with_given_content_types(self): + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = "application/new_type" + result = request.best_match_content_type(["application/new_type"]) + self.assertEqual(result, "application/new_type") + + +class ActionDispatcherTest(unittest.TestCase): + + def test_dispatch(self): + serializer = wsgi.ActionDispatcher() + serializer.create = lambda x: x + self.assertEqual(serializer.dispatch('pants', action='create'), + 'pants') + + def test_dispatch_action_None(self): + serializer = wsgi.ActionDispatcher() + serializer.create = lambda x: x + ' pants' + serializer.default = lambda x: x + ' trousers' + self.assertEqual(serializer.dispatch('Two', action=None), + 'Two trousers') + + def test_dispatch_default(self): + serializer = wsgi.ActionDispatcher() + serializer.create = lambda x: x + ' pants' + serializer.default = lambda x: x + ' trousers' + self.assertEqual(serializer.dispatch('Two', action='update'), + 'Two trousers') + + +class ResponseHeadersSerializerTest(unittest.TestCase): + + def test_default(self): + serializer = wsgi.ResponseHeadersSerializer() + response = webob.Response() + serializer.serialize(response, {'v': '123'}, 'asdf') + self.assertEqual(response.status_int, 200) + + def test_custom(self): + class Serializer(wsgi.ResponseHeadersSerializer): + def update(self, response, data): + response.status_int = 404 + response.headers['X-Custom-Header'] = data['v'] + serializer = Serializer() + response = webob.Response() + serializer.serialize(response, {'v': '123'}, 'update') + self.assertEqual(response.status_int, 404) + self.assertEqual(response.headers['X-Custom-Header'], '123') + + +class DictSerializerTest(unittest.TestCase): + + def test_dispatch_default(self): + serializer = wsgi.DictSerializer() + self.assertEqual(serializer.serialize({}, 'NonExistantAction'), '') + + +class XMLDictSerializerTest(unittest.TestCase): + + def test_xml(self): + input_dict = dict(servers=dict(a=(2, 3))) + expected_xml = """ + (2,3) + """ + serializer = wsgi.XMLDictSerializer(xmlns="asdf") + result = serializer.serialize(input_dict) + result = result.replace('\n', '').replace(' ', '') + expected_xml = expected_xml.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_xml) + + +class JSONDictSerializerTest(unittest.TestCase): + + def test_json(self): + input_dict = dict(servers=dict(a=(2, 3))) + expected_json = '{"servers":{"a":[2,3]}}' + serializer = wsgi.JSONDictSerializer() + result = serializer.serialize(input_dict) + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_json) + + +class TextDeserializerTest(unittest.TestCase): + + def test_dispatch_default(self): + deserializer = wsgi.TextDeserializer() + self.assertEqual(deserializer.deserialize({}, 'update'), {}) + + +class JSONDeserializerTest(unittest.TestCase): + + def test_json(self): + data = """{"a": { + "a1": "1", + "a2": "2", + "bs": ["1", "2", "3", {"c": {"c1": "1"}}], + "d": {"e": "1"}, + "f": "1"}}""" + as_dict = { + 'body': { + 'a': { + 'a1': '1', + 'a2': '2', + 'bs': ['1', '2', '3', {'c': {'c1': '1'}}], + 'd': {'e': '1'}, + 'f': '1', + }, + }, + } + deserializer = wsgi.JSONDeserializer() + self.assertEqual(deserializer.deserialize(data), as_dict) + + +class XMLDeserializerTest(unittest.TestCase): + + def test_xml(self): + xml = """ + + 123 + 1 + 1 + + """.strip() + as_dict = { + 'body': { + 'a': { + 'a1': '1', + 'a2': '2', + 'bs': ['1', '2', '3', {'c': {'c1': '1'}}], + 'd': {'e': '1'}, + 'f': '1', + }, + }, + } + metadata = {'plurals': {'bs': 'b', 'ts': 't'}} + deserializer = wsgi.XMLDeserializer(metadata=metadata) + self.assertEqual(deserializer.deserialize(xml), as_dict) + + def test_xml_empty(self): + xml = '' + as_dict = {"body": {"a": {}}} + deserializer = wsgi.XMLDeserializer() + self.assertEqual(deserializer.deserialize(xml), as_dict) + + +class RequestHeadersDeserializerTest(unittest.TestCase): + + def test_default(self): + deserializer = wsgi.RequestHeadersDeserializer() + req = wsgi.Request.blank('/') + self.assertEqual(deserializer.deserialize(req, 'nonExistant'), {}) + + def test_custom(self): + class Deserializer(wsgi.RequestHeadersDeserializer): + def update(self, request): + return {'a': request.headers['X-Custom-Header']} + deserializer = Deserializer() + req = wsgi.Request.blank('/') + req.headers['X-Custom-Header'] = 'b' + self.assertEqual(deserializer.deserialize(req, 'update'), {'a': 'b'}) + + +class ResponseSerializerTest(unittest.TestCase): + + def setUp(self): + class JSONSerializer(object): + def serialize(self, data, action='default'): + return 'pew_json' + + class XMLSerializer(object): + def serialize(self, data, action='default'): + return 'pew_xml' + + class HeadersSerializer(object): + def serialize(self, response, data, action): + response.status_int = 404 + + self.body_serializers = { + 'application/json': JSONSerializer(), + 'application/XML': XMLSerializer(), + } + + self.serializer = wsgi.ResponseSerializer(self.body_serializers, + HeadersSerializer()) + + def tearDown(self): + pass + + def test_get_serializer(self): + ctype = 'application/json' + self.assertEqual(self.serializer.get_body_serializer(ctype), + self.body_serializers[ctype]) + + def test_get_serializer_unknown_content_type(self): + self.assertRaises(exception.InvalidContentType, + self.serializer.get_body_serializer, + 'application/unknown') + + def test_serialize_response(self): + response = self.serializer.serialize({}, 'application/json') + self.assertEqual(response.headers['Content-Type'], 'application/json') + self.assertEqual(response.body, 'pew_json') + self.assertEqual(response.status_int, 404) + + def test_serialize_response_None(self): + response = self.serializer.serialize(None, 'application/json') + + self.assertEqual(response.headers['Content-Type'], 'application/json') + self.assertEqual(response.body, '') + self.assertEqual(response.status_int, 404) + + def test_serialize_response_dict_to_unknown_content_type(self): + self.assertRaises(exception.InvalidContentType, + self.serializer.serialize, + {}, 'application/unknown') + + +class RequestDeserializerTest(unittest.TestCase): + + def setUp(self): + class JSONDeserializer(object): + def deserialize(self, data, action='default'): + return 'pew_json' + + class XMLDeserializer(object): + def deserialize(self, data, action='default'): + return 'pew_xml' + + self.body_deserializers = { + 'application/json': JSONDeserializer(), + 'application/XML': XMLDeserializer(), + } + + self.deserializer = wsgi.RequestDeserializer(self.body_deserializers) + + def test_get_deserializer(self): + expected = self.deserializer.get_body_deserializer('application/json') + self.assertEqual(expected, self.body_deserializers['application/json']) + + def test_get_deserializer_unknown_content_type(self): + self.assertRaises(exception.InvalidContentType, + self.deserializer.get_body_deserializer, + 'application/unknown') + + def test_get_expected_content_type(self): + request = wsgi.Request.blank('/') + request.headers['Accept'] = 'application/json' + self.assertEqual(self.deserializer.get_expected_content_type(request), + 'application/json') + + def test_get_action_args(self): + env = { + 'wsgiorg.routing_args': [None, { + 'controller': None, + 'format': None, + 'action': 'update', + 'id': 12, + }], + } + + expected = {'action': 'update', 'id': 12} + + self.assertEqual(self.deserializer.get_action_args(env), expected) + + def test_deserialize(self): + def fake_get_routing_args(request): + return {'action': 'create'} + self.deserializer.get_action_args = fake_get_routing_args + + request = wsgi.Request.blank('/') + request.headers['Accept'] = 'application/xml' + + deserialized = self.deserializer.deserialize(request) + expected = ('create', {}, 'application/xml') + + self.assertEqual(expected, deserialized) + + +class ResourceTest(unittest.TestCase): + + def test_dispatch(self): + class Controller(object): + def index(self, req, pants=None): + return pants + + resource = wsgi.Resource(Controller()) + actual = resource.dispatch(resource.controller, + 'index', None, pants='off') + expected = 'off' + self.assertEqual(actual, expected) + + def test_dispatch_unknown_controller_action(self): + class Controller(object): + def index(self, req, pants=None): + return pants + + resource = wsgi.Resource(Controller()) + self.assertRaises(AttributeError, resource.dispatch, + resource.controller, 'create', None, {}) + + def test_malformed_request_body_throws_bad_request(self): + resource = wsgi.Resource(None) + request = wsgi.Request.blank("/", body="{mal:formed", method='POST', + headers={'Content-Type': "application/json"}) + + response = resource(request) + self.assertEqual(response.status, '400 Bad Request') + + def test_wrong_content_type_throws_unsupported_media_type_error(self): + resource = wsgi.Resource(None) + request = wsgi.Request.blank("/", body="{some:json}", method='POST', + headers={'Content-Type': "xxx"}) + + response = resource(request) + self.assertEqual(response.status, '415 Unsupported Media Type') -- cgit