summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--openstack/common/exception.py4
-rw-r--r--openstack/common/extensions.py17
-rw-r--r--openstack/common/wsgi.py248
-rw-r--r--tests/unit/extension_stubs.py2
-rw-r--r--tests/unit/test_extensions.py152
-rw-r--r--tests/unit/test_wsgi.py413
6 files changed, 701 insertions, 135 deletions
diff --git a/openstack/common/exception.py b/openstack/common/exception.py
index a81355e..ba32da5 100644
--- a/openstack/common/exception.py
+++ b/openstack/common/exception.py
@@ -139,5 +139,9 @@ class OpenstackException(Exception):
return self._error_string
+class MalformedRequestBody(OpenstackException):
+ message = "Malformed message body: %(reason)s"
+
+
class InvalidContentType(OpenstackException):
message = "Invalid content type %(content_type)s"
diff --git a/openstack/common/extensions.py b/openstack/common/extensions.py
index 7044227..d2fab36 100644
--- a/openstack/common/extensions.py
+++ b/openstack/common/extensions.py
@@ -27,8 +27,9 @@ from lxml import etree
from openstack.common import exception
from openstack.common import wsgi
-
LOG = logging.getLogger('extensions')
+DEFAULT_XMLNS = "http://docs.openstack.org/"
+XMLNS_ATOM = "http://www.w3.org/2005/Atom"
class ExtensionDescriptor(object):
@@ -166,6 +167,9 @@ class ExtensionsResource(wsgi.Resource):
def __init__(self, extension_manager):
self.extension_manager = extension_manager
+ body_serializers = {'application/xml': ExtensionsXMLSerializer()}
+ serializer = wsgi.ResponseSerializer(body_serializers=body_serializers)
+ super(ExtensionsResource, self).__init__(self, None, serializer)
def _translate(self, ext):
ext_data = {}
@@ -342,8 +346,11 @@ class ExtensionManager(object):
def get_resources(self):
"""Returns a list of ResourceExtension objects."""
resources = []
- resources.append(ResourceExtension('extensions',
- ExtensionsResource(self)))
+ extension_resource = ExtensionsResource(self)
+ res_ext = ResourceExtension('extensions',
+ extension_resource,
+ serializer=extension_resource.serializer)
+ resources.append(res_ext)
for alias, ext in self.extensions.iteritems():
try:
resources.extend(ext.get_resources())
@@ -486,7 +493,7 @@ class ResourceExtension(object):
class ExtensionsXMLSerializer(wsgi.XMLDictSerializer):
-# NSMAP = {None: xmlutil.XMLNS_V11, 'atom': xmlutil.XMLNS_ATOM}
+ NSMAP = {None: DEFAULT_XMLNS, 'atom': XMLNS_ATOM}
def show(self, ext_dict):
ext = etree.Element('extension', nsmap=self.NSMAP)
@@ -511,7 +518,7 @@ class ExtensionsXMLSerializer(wsgi.XMLDictSerializer):
desc.text = ext_dict['description']
ext_elem.append(desc)
for link in ext_dict.get('links', []):
- elem = etree.SubElement(ext_elem, '{%s}link' % xmlutil.XMLNS_ATOM)
+ elem = etree.SubElement(ext_elem, '{%s}link' % XMLNS_ATOM)
elem.set('rel', link['rel'])
elem.set('href', link['href'])
elem.set('type', link['type'])
diff --git a/openstack/common/wsgi.py b/openstack/common/wsgi.py
index 628b3a4..128ae8c 100644
--- a/openstack/common/wsgi.py
+++ b/openstack/common/wsgi.py
@@ -210,31 +210,47 @@ class Router(object):
class Request(webob.Request):
-
"""Add some Openstack API-specific logic to the base webob.Request."""
+ default_request_content_types = ('application/json', 'application/xml')
+ default_accept_types = ('application/json', 'application/xml')
+ default_accept_type = 'application/json'
+
def best_match_content_type(self, supported_content_types=None):
- """Determine the requested response content-type."""
- supported_content_types = (supported_content_types
- or ("application/xml",
- "application/json",))
+ """Determine the requested response content-type.
+
+ Based on the query extension then the Accept header.
+ Defaults to default_accept_type if we don't find a preference
+
+ """
+ supported_content_types = (supported_content_types or
+ self.default_accept_types)
+
+ parts = self.path.rsplit('.', 1)
+ if len(parts) > 1:
+ ctype = 'application/{0}'.format(parts[1])
+ if ctype in supported_content_types:
+ return ctype
+
bm = self.accept.best_match(supported_content_types)
- return bm or 'application/json'
+ return bm or self.default_accept_type
def get_content_type(self, allowed_content_types=None):
- """Determine content type of the request body."""
+ """Determine content type of the request body.
- allowed_content_types = allowed_content_types or ("application/xml",
- "application/json",)
+ Does not do any body introspection, only checks header
+
+ """
if not "Content-Type" in self.headers:
- raise exception.InvalidContentType(content_type=None)
+ return None
content_type = self.content_type
+ allowed_content_types = (allowed_content_types or
+ self.default_request_content_types)
if content_type not in allowed_content_types:
raise exception.InvalidContentType(content_type=content_type)
- else:
- return content_type
+ return content_type
class Resource(object):
@@ -269,11 +285,19 @@ class Resource(object):
@webob.dec.wsgify(RequestClass=Request)
def __call__(self, request):
"""WSGI method that controls (de)serialization and method dispatch."""
- action, action_args, accept = self.deserialize_request(request)
+
+ try:
+ action, action_args, accept = self.deserialize_request(request)
+ except exception.InvalidContentType:
+ msg = _("Unsupported Content-Type")
+ return webob.exc.HTTPUnsupportedMediaType(explanation=msg)
+ except exception.MalformedRequestBody:
+ msg = _("Malformed request body")
+ return webob.exc.HTTPBadRequest(explanation=msg)
+
action_result = self.execute_action(action, request, **action_args)
try:
return self.serialize_response(action, action_result, accept)
-
# return unserializable result (typically a webob exc)
except Exception:
return action_result
@@ -329,107 +353,6 @@ class ActionDispatcher(object):
raise NotImplementedError()
-class TextDeserializer(ActionDispatcher):
- """Default request body deserialization"""
-
- def deserialize(self, datastring, action='default'):
- return self.dispatch(datastring, action=action)
-
- def default(self, datastring):
- return {}
-
-
-class JSONDeserializer(TextDeserializer):
-
- def _from_json(self, datastring):
- try:
- return json.loads(datastring)
- except ValueError:
- msg = _("cannot understand JSON")
- raise exception.MalformedRequestBody(reason=msg)
-
- def default(self, datastring):
- return {'body': self._from_json(datastring)}
-
-
-class XMLDeserializer(TextDeserializer):
-
- def __init__(self, metadata=None):
- """
- :param metadata: information needed to deserialize xml into
- a dictionary.
- """
- super(XMLDeserializer, self).__init__()
- self.metadata = metadata or {}
-
- def _from_xml(self, datastring):
- plurals = set(self.metadata.get('plurals', {}))
-
- try:
- node = minidom.parseString(datastring).childNodes[0]
- return {node.nodeName: self._from_xml_node(node, plurals)}
- except expat.ExpatError:
- msg = _("cannot understand XML")
- raise exception.MalformedRequestBody(reason=msg)
-
- def _from_xml_node(self, node, listnames):
- """Convert a minidom node to a simple Python type.
-
- :param listnames: list of XML node names whose subnodes should
- be considered list items.
-
- """
- if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3:
- return node.childNodes[0].nodeValue
- elif node.nodeName in listnames:
- return [self._from_xml_node(n, listnames) for n in node.childNodes]
- else:
- result = dict()
- for attr in node.attributes.keys():
- result[attr] = node.attributes[attr].nodeValue
- for child in node.childNodes:
- if child.nodeType != node.TEXT_NODE:
- result[child.nodeName] = self._from_xml_node(child,
- listnames)
- return result
-
- def find_first_child_named(self, parent, name):
- """Search a nodes children for the first child with a given name"""
- for node in parent.childNodes:
- if node.nodeName == name:
- return node
- return None
-
- def find_children_named(self, parent, name):
- """Return all of a nodes children who have the given name"""
- for node in parent.childNodes:
- if node.nodeName == name:
- yield node
-
- def extract_text(self, node):
- """Get the text field contained by the given node"""
- if len(node.childNodes) == 1:
- child = node.childNodes[0]
- if child.nodeType == child.TEXT_NODE:
- return child.nodeValue
- return ""
-
- def default(self, datastring):
- return {'body': self._from_xml(datastring)}
-
-
-class MetadataXMLDeserializer(XMLDeserializer):
-
- def extract_metadata(self, metadata_node):
- """Marshal the metadata attribute of a parsed request"""
- metadata = {}
- if metadata_node is not None:
- for meta_node in self.find_children_named(metadata_node, "meta"):
- key = meta_node.getAttribute("key")
- metadata[key] = self.extract_text(meta_node)
- return metadata
-
-
class DictSerializer(ActionDispatcher):
"""Default request body serialization"""
@@ -619,8 +542,7 @@ class RequestDeserializer(object):
def __init__(self, body_deserializers=None, headers_deserializer=None,
supported_content_types=None):
- self.supported_content_types = supported_content_types or \
- ('application/json', 'application/xml')
+ self.supported_content_types = supported_content_types
self.body_deserializers = {
'application/xml': XMLDeserializer(),
@@ -662,7 +584,7 @@ class RequestDeserializer(object):
content_type = request.get_content_type()
except exception.InvalidContentType:
LOG.debug(_("Unrecognized Content-Type provided in request"))
- return {}
+ raise
if content_type is None:
LOG.debug(_("No Content-Type provided in request"))
@@ -703,3 +625,93 @@ class RequestDeserializer(object):
pass
return args
+
+
+class TextDeserializer(ActionDispatcher):
+ """Default request body deserialization"""
+
+ def deserialize(self, datastring, action='default'):
+ return self.dispatch(datastring, action=action)
+
+ def default(self, datastring):
+ return {}
+
+
+class JSONDeserializer(TextDeserializer):
+
+ def _from_json(self, datastring):
+ try:
+ return json.loads(datastring)
+ except ValueError:
+ msg = _("cannot understand JSON")
+ raise exception.MalformedRequestBody(reason=msg)
+
+ def default(self, datastring):
+ return {'body': self._from_json(datastring)}
+
+
+class XMLDeserializer(TextDeserializer):
+
+ def __init__(self, metadata=None):
+ """
+ :param metadata: information needed to deserialize xml into
+ a dictionary.
+ """
+ super(XMLDeserializer, self).__init__()
+ self.metadata = metadata or {}
+
+ def _from_xml(self, datastring):
+ plurals = set(self.metadata.get('plurals', {}))
+
+ try:
+ node = minidom.parseString(datastring).childNodes[0]
+ return {node.nodeName: self._from_xml_node(node, plurals)}
+ except expat.ExpatError:
+ msg = _("cannot understand XML")
+ raise exception.MalformedRequestBody(reason=msg)
+
+ def _from_xml_node(self, node, listnames):
+ """Convert a minidom node to a simple Python type.
+
+ :param listnames: list of XML node names whose subnodes should
+ be considered list items.
+
+ """
+
+ if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3:
+ return node.childNodes[0].nodeValue
+ elif node.nodeName in listnames:
+ return [self._from_xml_node(n, listnames) for n in node.childNodes]
+ else:
+ result = dict()
+ for attr in node.attributes.keys():
+ result[attr] = node.attributes[attr].nodeValue
+ for child in node.childNodes:
+ if child.nodeType != node.TEXT_NODE:
+ result[child.nodeName] = self._from_xml_node(child,
+ listnames)
+ return result
+
+ def find_first_child_named(self, parent, name):
+ """Search a nodes children for the first child with a given name"""
+ for node in parent.childNodes:
+ if node.nodeName == name:
+ return node
+ return None
+
+ def find_children_named(self, parent, name):
+ """Return all of a nodes children who have the given name"""
+ for node in parent.childNodes:
+ if node.nodeName == name:
+ yield node
+
+ def extract_text(self, node):
+ """Get the text field contained by the given node"""
+ if len(node.childNodes) == 1:
+ child = node.childNodes[0]
+ if child.nodeType == child.TEXT_NODE:
+ return child.nodeValue
+ return ""
+
+ def default(self, datastring):
+ return {'body': self._from_xml(datastring)}
diff --git a/tests/unit/extension_stubs.py b/tests/unit/extension_stubs.py
index 0352614..c25f285 100644
--- a/tests/unit/extension_stubs.py
+++ b/tests/unit/extension_stubs.py
@@ -46,7 +46,7 @@ class StubBaseAppController(object):
def show(self, request, id):
return {'fort': 'knox'}
- def update(self, request, id):
+ def update(self, request, id, body=None):
return {'uneditable': 'original_value'}
def create_resource(self):
diff --git a/tests/unit/test_extensions.py b/tests/unit/test_extensions.py
index fe56589..841bf4d 100644
--- a/tests/unit/test_extensions.py
+++ b/tests/unit/test_extensions.py
@@ -15,11 +15,14 @@
# under the License.
import json
+from lxml import etree
import os.path
import routes
import unittest
+
from webtest import TestApp
+
from openstack.common import wsgi
from openstack.common import config
from openstack.common import extensions
@@ -33,6 +36,9 @@ test_conf_file = os.path.join(os.path.dirname(__file__), os.pardir,
os.pardir, 'etc', 'openstack-common.conf.test')
extensions_path = os.path.join(os.path.dirname(__file__), "extensions")
+NS = "{http://docs.openstack.org/}"
+ATOMNS = "{http://www.w3.org/2005/Atom}"
+
class ExtensionsTestApp(wsgi.Router):
@@ -254,19 +260,21 @@ class RequestExtensionTest(unittest.TestCase):
def _update_handler(req, res):
data = json.loads(res.body)
- data['uneditable'] = req.params['uneditable']
+ data['uneditable'] = json.loads(req.body)['uneditable']
res.body = json.dumps(data)
return res
base_app = TestApp(setup_base_app())
response = base_app.put("/dummy_resources/1",
- {'uneditable': "new_value"})
+ json.dumps({'uneditable': "new_value"}),
+ headers={'Content-Type': "application/json"})
self.assertEqual(response.json['uneditable'], "original_value")
ext_app = self._setup_app_with_request_handler(_update_handler,
'PUT')
ext_response = ext_app.put("/dummy_resources/1",
- {'uneditable': "new_value"})
+ json.dumps({'uneditable': "new_value"}),
+ headers={'Content-Type': "application/json"})
self.assertEqual(ext_response.json['uneditable'], "new_value")
def _setup_app_with_request_handler(self, handler, verb):
@@ -306,23 +314,145 @@ class ExtensionControllerTest(unittest.TestCase):
response = self.test_app.get("/extensions")
foxnsox = response.json["extensions"][0]
- self.assertEqual(foxnsox["alias"], "FOXNSOX")
- self.assertEqual(foxnsox["namespace"],
- "http://www.fox.in.socks/api/ext/pie/v1.0")
+ self.assertEqual(foxnsox, {
+ 'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0',
+ 'name': 'Fox In Socks',
+ 'updated': '2011-01-22T13:25:27-06:00',
+ 'description': 'The Fox In Socks Extension',
+ 'alias': 'FOXNSOX',
+ 'links': []
+ }
+ )
def test_extension_can_be_accessed_by_alias(self):
json_response = self.test_app.get("/extensions/FOXNSOX").json
- foxnsox_extension = json_response['extension']
-
- self.assertEqual(foxnsox_extension["alias"], "FOXNSOX")
- self.assertEqual(foxnsox_extension["namespace"],
- "http://www.fox.in.socks/api/ext/pie/v1.0")
+ foxnsox = json_response['extension']
+
+ self.assertEqual(foxnsox, {
+ 'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0',
+ 'name': 'Fox In Socks',
+ 'updated': '2011-01-22T13:25:27-06:00',
+ 'description': 'The Fox In Socks Extension',
+ 'alias': 'FOXNSOX',
+ 'links': []
+ }
+ )
def test_show_returns_not_found_for_non_existant_extension(self):
response = self.test_app.get("/extensions/non_existant", status="*")
self.assertEqual(response.status_int, 404)
+ def test_list_extensions_xml(self):
+ response = self.test_app.get("/extensions.xml")
+
+ self.assertEqual(200, response.status_int)
+ root = etree.XML(response.body)
+ self.assertEqual(root.tag.split('extensions')[0], NS)
+
+ # Make sure that Fox in Sox extension is correct.
+ exts = root.findall('{0}extension'.format(NS))
+ fox_ext = exts[0]
+ self.assertEqual(fox_ext.get('name'), 'Fox In Socks')
+ self.assertEqual(fox_ext.get('namespace'),
+ 'http://www.fox.in.socks/api/ext/pie/v1.0')
+ self.assertEqual(fox_ext.get('updated'), '2011-01-22T13:25:27-06:00')
+ self.assertEqual(fox_ext.findtext('{0}description'.format(NS)),
+ 'The Fox In Socks Extension')
+
+ def test_get_extension_xml(self):
+ response = self.test_app.get("/extensions/FOXNSOX.xml")
+ self.assertEqual(200, response.status_int)
+ xml = response.body
+
+ root = etree.XML(xml)
+ self.assertEqual(root.tag.split('extension')[0], NS)
+ self.assertEqual(root.get('alias'), 'FOXNSOX')
+ self.assertEqual(root.get('name'), 'Fox In Socks')
+ self.assertEqual(root.get('namespace'),
+ 'http://www.fox.in.socks/api/ext/pie/v1.0')
+ self.assertEqual(root.get('updated'), '2011-01-22T13:25:27-06:00')
+ self.assertEqual(root.findtext('{0}description'.format(NS)),
+ 'The Fox In Socks Extension')
+
+
+class ExtensionsXMLSerializerTest(unittest.TestCase):
+
+ def test_serialize_extenstion(self):
+ serializer = extensions.ExtensionsXMLSerializer()
+ data = {'extension': {
+ 'name': 'ext1',
+ 'namespace': 'http://docs.rack.com/servers/api/ext/pie/v1.0',
+ 'alias': 'RS-PIE',
+ 'updated': '2011-01-22T13:25:27-06:00',
+ 'description': 'Adds the capability to share an image.',
+ 'links': [{'rel': 'describedby',
+ 'type': 'application/pdf',
+ 'href': 'http://docs.rack.com/servers/api/ext/cs.pdf'},
+ {'rel': 'describedby',
+ 'type': 'application/vnd.sun.wadl+xml',
+ 'href': 'http://docs.rack.com/servers/api/ext/cs.wadl'}]}}
+
+ xml = serializer.serialize(data, 'show')
+ root = etree.XML(xml)
+ ext_dict = data['extension']
+ self.assertEqual(root.findtext('{0}description'.format(NS)),
+ ext_dict['description'])
+
+ for key in ['name', 'namespace', 'alias', 'updated']:
+ self.assertEqual(root.get(key), ext_dict[key])
+
+ link_nodes = root.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(ext_dict['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
+
+ def test_serialize_extensions(self):
+ serializer = extensions.ExtensionsXMLSerializer()
+ data = {"extensions": [{
+ "name": "Public Image Extension",
+ "namespace": "http://foo.com/api/ext/pie/v1.0",
+ "alias": "RS-PIE",
+ "updated": "2011-01-22T13:25:27-06:00",
+ "description": "Adds the capability to share an image.",
+ "links": [{"rel": "describedby",
+ "type": "application/pdf",
+ "type": "application/vnd.sun.wadl+xml",
+ "href": "http://foo.com/api/ext/cs-pie.pdf"},
+ {"rel": "describedby",
+ "type": "application/vnd.sun.wadl+xml",
+ "href": "http://foo.com/api/ext/cs-pie.wadl"}]},
+ {"name": "Cloud Block Storage",
+ "namespace": "http://foo.com/api/ext/cbs/v1.0",
+ "alias": "RS-CBS",
+ "updated": "2011-01-12T11:22:33-06:00",
+ "description": "Allows mounting cloud block storage.",
+ "links": [{"rel": "describedby",
+ "type": "application/pdf",
+ "href": "http://foo.com/api/ext/cs-cbs.pdf"},
+ {"rel": "describedby",
+ "type": "application/vnd.sun.wadl+xml",
+ "href": "http://foo.com/api/ext/cs-cbs.wadl"}]}]}
+
+ xml = serializer.serialize(data, 'index')
+ root = etree.XML(xml)
+ ext_elems = root.findall('{0}extension'.format(NS))
+ self.assertEqual(len(ext_elems), 2)
+ for i, ext_elem in enumerate(ext_elems):
+ ext_dict = data['extensions'][i]
+ self.assertEqual(ext_elem.findtext('{0}description'.format(NS)),
+ ext_dict['description'])
+
+ for key in ['name', 'namespace', 'alias', 'updated']:
+ self.assertEqual(ext_elem.get(key), ext_dict[key])
+
+ link_nodes = ext_elem.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(ext_dict['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
+
def app_factory(global_conf, **local_conf):
conf = global_conf.copy()
diff --git a/tests/unit/test_wsgi.py b/tests/unit/test_wsgi.py
new file mode 100644
index 0000000..7a5eaa4
--- /dev/null
+++ b/tests/unit/test_wsgi.py
@@ -0,0 +1,413 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import unittest
+import webob
+
+from openstack.common import exception
+from openstack.common import wsgi
+
+
+class RequestTest(unittest.TestCase):
+
+ def test_content_type_missing(self):
+ request = wsgi.Request.blank('/tests/123', method='POST')
+ request.body = "<body />"
+ self.assertEqual(None, request.get_content_type())
+
+ def test_content_type_unsupported(self):
+ request = wsgi.Request.blank('/tests/123', method='POST')
+ request.headers["Content-Type"] = "text/html"
+ request.body = "asdf<br />"
+ self.assertRaises(exception.InvalidContentType,
+ request.get_content_type)
+
+ def test_content_type_with_charset(self):
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Content-Type"] = "application/json; charset=UTF-8"
+ result = request.get_content_type()
+ self.assertEqual(result, "application/json")
+
+ def test_content_type_with_given_content_types(self):
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Content-Type"] = "application/new-type;"
+ result = request.get_content_type(["application/json",
+ "application/new-type"])
+ self.assertEqual(result, "application/new-type")
+
+ def test_content_type_from_accept_xml(self):
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Accept"] = "application/xml"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/xml")
+
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Accept"] = "application/json"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/json")
+
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Accept"] = "application/xml, application/json"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/json")
+
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Accept"] = \
+ "application/json; q=0.3, application/xml; q=0.9"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/xml")
+
+ def test_content_type_from_query_extension(self):
+ request = wsgi.Request.blank('/tests/123.xml')
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/xml")
+
+ request = wsgi.Request.blank('/tests/123.json')
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/json")
+
+ request = wsgi.Request.blank('/tests/123.invalid')
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/json")
+
+ def test_content_type_accept_and_query_extension(self):
+ request = wsgi.Request.blank('/tests/123.xml')
+ request.headers["Accept"] = "application/json"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/xml")
+
+ def test_content_type_accept_default(self):
+ request = wsgi.Request.blank('/tests/123.unsupported')
+ request.headers["Accept"] = "application/unsupported1"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/json")
+
+ def test_content_type_accept_with_given_content_types(self):
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Accept"] = "application/new_type"
+ result = request.best_match_content_type(["application/new_type"])
+ self.assertEqual(result, "application/new_type")
+
+
+class ActionDispatcherTest(unittest.TestCase):
+
+ def test_dispatch(self):
+ serializer = wsgi.ActionDispatcher()
+ serializer.create = lambda x: x
+ self.assertEqual(serializer.dispatch('pants', action='create'),
+ 'pants')
+
+ def test_dispatch_action_None(self):
+ serializer = wsgi.ActionDispatcher()
+ serializer.create = lambda x: x + ' pants'
+ serializer.default = lambda x: x + ' trousers'
+ self.assertEqual(serializer.dispatch('Two', action=None),
+ 'Two trousers')
+
+ def test_dispatch_default(self):
+ serializer = wsgi.ActionDispatcher()
+ serializer.create = lambda x: x + ' pants'
+ serializer.default = lambda x: x + ' trousers'
+ self.assertEqual(serializer.dispatch('Two', action='update'),
+ 'Two trousers')
+
+
+class ResponseHeadersSerializerTest(unittest.TestCase):
+
+ def test_default(self):
+ serializer = wsgi.ResponseHeadersSerializer()
+ response = webob.Response()
+ serializer.serialize(response, {'v': '123'}, 'asdf')
+ self.assertEqual(response.status_int, 200)
+
+ def test_custom(self):
+ class Serializer(wsgi.ResponseHeadersSerializer):
+ def update(self, response, data):
+ response.status_int = 404
+ response.headers['X-Custom-Header'] = data['v']
+ serializer = Serializer()
+ response = webob.Response()
+ serializer.serialize(response, {'v': '123'}, 'update')
+ self.assertEqual(response.status_int, 404)
+ self.assertEqual(response.headers['X-Custom-Header'], '123')
+
+
+class DictSerializerTest(unittest.TestCase):
+
+ def test_dispatch_default(self):
+ serializer = wsgi.DictSerializer()
+ self.assertEqual(serializer.serialize({}, 'NonExistantAction'), '')
+
+
+class XMLDictSerializerTest(unittest.TestCase):
+
+ def test_xml(self):
+ input_dict = dict(servers=dict(a=(2, 3)))
+ expected_xml = """<servers xmlns="asdf">
+ <a>(2,3)</a>
+ </servers>"""
+ serializer = wsgi.XMLDictSerializer(xmlns="asdf")
+ result = serializer.serialize(input_dict)
+ result = result.replace('\n', '').replace(' ', '')
+ expected_xml = expected_xml.replace('\n', '').replace(' ', '')
+ self.assertEqual(result, expected_xml)
+
+
+class JSONDictSerializerTest(unittest.TestCase):
+
+ def test_json(self):
+ input_dict = dict(servers=dict(a=(2, 3)))
+ expected_json = '{"servers":{"a":[2,3]}}'
+ serializer = wsgi.JSONDictSerializer()
+ result = serializer.serialize(input_dict)
+ result = result.replace('\n', '').replace(' ', '')
+ self.assertEqual(result, expected_json)
+
+
+class TextDeserializerTest(unittest.TestCase):
+
+ def test_dispatch_default(self):
+ deserializer = wsgi.TextDeserializer()
+ self.assertEqual(deserializer.deserialize({}, 'update'), {})
+
+
+class JSONDeserializerTest(unittest.TestCase):
+
+ def test_json(self):
+ data = """{"a": {
+ "a1": "1",
+ "a2": "2",
+ "bs": ["1", "2", "3", {"c": {"c1": "1"}}],
+ "d": {"e": "1"},
+ "f": "1"}}"""
+ as_dict = {
+ 'body': {
+ 'a': {
+ 'a1': '1',
+ 'a2': '2',
+ 'bs': ['1', '2', '3', {'c': {'c1': '1'}}],
+ 'd': {'e': '1'},
+ 'f': '1',
+ },
+ },
+ }
+ deserializer = wsgi.JSONDeserializer()
+ self.assertEqual(deserializer.deserialize(data), as_dict)
+
+
+class XMLDeserializerTest(unittest.TestCase):
+
+ def test_xml(self):
+ xml = """
+ <a a1="1" a2="2">
+ <bs><b>1</b><b>2</b><b>3</b><b><c c1="1"/></b></bs>
+ <d><e>1</e></d>
+ <f>1</f>
+ </a>
+ """.strip()
+ as_dict = {
+ 'body': {
+ 'a': {
+ 'a1': '1',
+ 'a2': '2',
+ 'bs': ['1', '2', '3', {'c': {'c1': '1'}}],
+ 'd': {'e': '1'},
+ 'f': '1',
+ },
+ },
+ }
+ metadata = {'plurals': {'bs': 'b', 'ts': 't'}}
+ deserializer = wsgi.XMLDeserializer(metadata=metadata)
+ self.assertEqual(deserializer.deserialize(xml), as_dict)
+
+ def test_xml_empty(self):
+ xml = '<a></a>'
+ as_dict = {"body": {"a": {}}}
+ deserializer = wsgi.XMLDeserializer()
+ self.assertEqual(deserializer.deserialize(xml), as_dict)
+
+
+class RequestHeadersDeserializerTest(unittest.TestCase):
+
+ def test_default(self):
+ deserializer = wsgi.RequestHeadersDeserializer()
+ req = wsgi.Request.blank('/')
+ self.assertEqual(deserializer.deserialize(req, 'nonExistant'), {})
+
+ def test_custom(self):
+ class Deserializer(wsgi.RequestHeadersDeserializer):
+ def update(self, request):
+ return {'a': request.headers['X-Custom-Header']}
+ deserializer = Deserializer()
+ req = wsgi.Request.blank('/')
+ req.headers['X-Custom-Header'] = 'b'
+ self.assertEqual(deserializer.deserialize(req, 'update'), {'a': 'b'})
+
+
+class ResponseSerializerTest(unittest.TestCase):
+
+ def setUp(self):
+ class JSONSerializer(object):
+ def serialize(self, data, action='default'):
+ return 'pew_json'
+
+ class XMLSerializer(object):
+ def serialize(self, data, action='default'):
+ return 'pew_xml'
+
+ class HeadersSerializer(object):
+ def serialize(self, response, data, action):
+ response.status_int = 404
+
+ self.body_serializers = {
+ 'application/json': JSONSerializer(),
+ 'application/XML': XMLSerializer(),
+ }
+
+ self.serializer = wsgi.ResponseSerializer(self.body_serializers,
+ HeadersSerializer())
+
+ def tearDown(self):
+ pass
+
+ def test_get_serializer(self):
+ ctype = 'application/json'
+ self.assertEqual(self.serializer.get_body_serializer(ctype),
+ self.body_serializers[ctype])
+
+ def test_get_serializer_unknown_content_type(self):
+ self.assertRaises(exception.InvalidContentType,
+ self.serializer.get_body_serializer,
+ 'application/unknown')
+
+ def test_serialize_response(self):
+ response = self.serializer.serialize({}, 'application/json')
+ self.assertEqual(response.headers['Content-Type'], 'application/json')
+ self.assertEqual(response.body, 'pew_json')
+ self.assertEqual(response.status_int, 404)
+
+ def test_serialize_response_None(self):
+ response = self.serializer.serialize(None, 'application/json')
+
+ self.assertEqual(response.headers['Content-Type'], 'application/json')
+ self.assertEqual(response.body, '')
+ self.assertEqual(response.status_int, 404)
+
+ def test_serialize_response_dict_to_unknown_content_type(self):
+ self.assertRaises(exception.InvalidContentType,
+ self.serializer.serialize,
+ {}, 'application/unknown')
+
+
+class RequestDeserializerTest(unittest.TestCase):
+
+ def setUp(self):
+ class JSONDeserializer(object):
+ def deserialize(self, data, action='default'):
+ return 'pew_json'
+
+ class XMLDeserializer(object):
+ def deserialize(self, data, action='default'):
+ return 'pew_xml'
+
+ self.body_deserializers = {
+ 'application/json': JSONDeserializer(),
+ 'application/XML': XMLDeserializer(),
+ }
+
+ self.deserializer = wsgi.RequestDeserializer(self.body_deserializers)
+
+ def test_get_deserializer(self):
+ expected = self.deserializer.get_body_deserializer('application/json')
+ self.assertEqual(expected, self.body_deserializers['application/json'])
+
+ def test_get_deserializer_unknown_content_type(self):
+ self.assertRaises(exception.InvalidContentType,
+ self.deserializer.get_body_deserializer,
+ 'application/unknown')
+
+ def test_get_expected_content_type(self):
+ request = wsgi.Request.blank('/')
+ request.headers['Accept'] = 'application/json'
+ self.assertEqual(self.deserializer.get_expected_content_type(request),
+ 'application/json')
+
+ def test_get_action_args(self):
+ env = {
+ 'wsgiorg.routing_args': [None, {
+ 'controller': None,
+ 'format': None,
+ 'action': 'update',
+ 'id': 12,
+ }],
+ }
+
+ expected = {'action': 'update', 'id': 12}
+
+ self.assertEqual(self.deserializer.get_action_args(env), expected)
+
+ def test_deserialize(self):
+ def fake_get_routing_args(request):
+ return {'action': 'create'}
+ self.deserializer.get_action_args = fake_get_routing_args
+
+ request = wsgi.Request.blank('/')
+ request.headers['Accept'] = 'application/xml'
+
+ deserialized = self.deserializer.deserialize(request)
+ expected = ('create', {}, 'application/xml')
+
+ self.assertEqual(expected, deserialized)
+
+
+class ResourceTest(unittest.TestCase):
+
+ def test_dispatch(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ resource = wsgi.Resource(Controller())
+ actual = resource.dispatch(resource.controller,
+ 'index', None, pants='off')
+ expected = 'off'
+ self.assertEqual(actual, expected)
+
+ def test_dispatch_unknown_controller_action(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ resource = wsgi.Resource(Controller())
+ self.assertRaises(AttributeError, resource.dispatch,
+ resource.controller, 'create', None, {})
+
+ def test_malformed_request_body_throws_bad_request(self):
+ resource = wsgi.Resource(None)
+ request = wsgi.Request.blank("/", body="{mal:formed", method='POST',
+ headers={'Content-Type': "application/json"})
+
+ response = resource(request)
+ self.assertEqual(response.status, '400 Bad Request')
+
+ def test_wrong_content_type_throws_unsupported_media_type_error(self):
+ resource = wsgi.Resource(None)
+ request = wsgi.Request.blank("/", body="{some:json}", method='POST',
+ headers={'Content-Type': "xxx"})
+
+ response = resource(request)
+ self.assertEqual(response.status, '415 Unsupported Media Type')