summaryrefslogtreecommitdiffstats
path: root/tests/test_content_types.py
diff options
context:
space:
mode:
authorJamie Lennox <jlennox@redhat.com>2013-05-14 10:43:42 +1000
committerJamie Lennox <jlennox@redhat.com>2013-05-22 09:18:26 +1000
commit09a5c436c5596dd7e767b1c1c89279b8b697eec1 (patch)
treee07e13854d63938b18238b63abf045de08297934 /tests/test_content_types.py
parentfe57d6de5b82a8b2ae07bfb136a7b17183c04cbb (diff)
downloadkeystone-09a5c436c5596dd7e767b1c1c89279b8b697eec1.tar.gz
keystone-09a5c436c5596dd7e767b1c1c89279b8b697eec1.tar.xz
keystone-09a5c436c5596dd7e767b1c1c89279b8b697eec1.zip
Use webtest for v2 and v3 API testing.
The intention is to remain as close as possible to the original implementation and so leaves a number of easy cleanups and optimisations until a later patch. In writing tests their are a number of changes for API tests that are a result of webob/webtest restrictions: * response.body is now the string body and response.result is the parsed dictionary. * response.status is now a string eg. '200 OK', use response.status_code to get the integer * response.getheader no longer exists. response.headers is a dictionary like object that can be accessed (case independently) with [] or .get() Working towards: blueprint extract-eventlet Change-Id: I393b4bad2fd6eacc0b8ae98fc204d1323014b5e4
Diffstat (limited to 'tests/test_content_types.py')
-rw-r--r--tests/test_content_types.py251
1 files changed, 116 insertions, 135 deletions
diff --git a/tests/test_content_types.py b/tests/test_content_types.py
index 14dd1db0..73771569 100644
--- a/tests/test_content_types.py
+++ b/tests/test_content_types.py
@@ -14,11 +14,12 @@
# License for the specific language governing permissions and limitations
# under the License.
-import httplib
+import io
import uuid
from lxml import etree
import nose.exc
+import webtest
from keystone.common import serializer
from keystone.openstack.common import jsonutils
@@ -63,8 +64,10 @@ class RestfulTestCase(test.TestCase):
self.load_backends()
self.load_fixtures(default_fixtures)
- self.public_server = self.serveapp('keystone', name='main')
- self.admin_server = self.serveapp('keystone', name='admin')
+ self.public_app = webtest.TestApp(
+ self.loadapp('keystone', name='main'))
+ self.admin_app = webtest.TestApp(
+ self.loadapp('keystone', name='admin'))
# TODO(termie): is_admin is being deprecated once the policy stuff
# is all working
@@ -77,40 +80,31 @@ class RestfulTestCase(test.TestCase):
def tearDown(self):
"""Kill running servers and release references to avoid leaks."""
- self.public_server.kill()
- self.admin_server.kill()
- self.public_server = None
- self.admin_server = None
+ self.public_app = None
+ self.admin_app = None
super(RestfulTestCase, self).tearDown()
- def request(self, host='0.0.0.0', port=80, method='GET', path='/',
- headers=None, body=None, expected_status=None):
- """Perform request and fetch httplib.HTTPResponse from the server."""
-
- # Initialize headers dictionary
- headers = {} if not headers else headers
-
- connection = httplib.HTTPConnection(host, port, timeout=100000)
-
- # Perform the request
- connection.request(method, path, body, headers)
+ def request(self, app, path, body=None, headers=None, token=None,
+ expected_status=None, **kwargs):
+ if headers:
+ headers = dict([(str(k), str(v)) for k, v in headers.iteritems()])
+ else:
+ headers = {}
- # Retrieve the response so we can close the connection
- response = connection.getresponse()
+ if token:
+ headers['X-Auth-Token'] = str(token)
- response.body = response.read()
+ # setting body this way because of:
+ # https://github.com/Pylons/webtest/issues/71
+ if body:
+ kwargs['body_file'] = io.BytesIO(body)
- # Close the connection
- connection.close()
+ # sets environ['REMOTE_ADDR']
+ kwargs.setdefault('remote_addr', 'localhost')
- # Automatically assert HTTP status code
- if expected_status:
- self.assertResponseStatus(response, expected_status)
- else:
- self.assertResponseSuccessful(response)
- self.assertValidResponseHeaders(response)
+ response = app.request(path, headers=headers,
+ status=expected_status, **kwargs)
- # Contains the response headers, body, etc
return response
def assertResponseSuccessful(self, response):
@@ -124,7 +118,7 @@ class RestfulTestCase(test.TestCase):
self.assertResponseSuccessful(response, 203)
"""
self.assertTrue(
- response.status >= 200 and response.status <= 299,
+ response.status_code >= 200 and response.status_code <= 299,
'Status code %d is outside of the expected range (2xx)\n\n%s' %
(response.status, response.body))
@@ -139,14 +133,14 @@ class RestfulTestCase(test.TestCase):
self.assertResponseStatus(response, 203)
"""
self.assertEqual(
- response.status,
+ response.status_code,
expected_status,
'Status code %s is not %s, as expected)\n\n%s' %
- (response.status, expected_status, response.body))
+ (response.status_code, expected_status, response.body))
def assertValidResponseHeaders(self, response):
"""Ensures that response headers appear as expected."""
- self.assertIn('X-Auth-Token', response.getheader('Vary'))
+ self.assertIn('X-Auth-Token', response.headers.get('Vary'))
def _to_content_type(self, body, headers, content_type=None):
"""Attempt to encode JSON and XML automatically."""
@@ -167,21 +161,18 @@ class RestfulTestCase(test.TestCase):
"""Attempt to decode JSON and XML automatically, if detected."""
content_type = content_type or self.content_type
- # make the original response body available, for convenience
- response.raw = response.body
-
if response.body is not None and response.body.strip():
# if a body is provided, a Content-Type is also expected
- header = response.getheader('Content-Type', None)
+ header = response.headers.get('Content-Type', None)
self.assertIn(content_type, header)
if content_type == 'json':
- response.body = jsonutils.loads(response.body)
+ response.result = jsonutils.loads(response.body)
elif content_type == 'xml':
- response.body = etree.fromstring(response.body)
+ response.result = etree.fromstring(response.body)
def restful_request(self, method='GET', headers=None, body=None,
- token=None, content_type=None, **kwargs):
+ content_type=None, **kwargs):
"""Serializes/deserializes json/xml as request/response body.
.. WARNING::
@@ -193,9 +184,6 @@ class RestfulTestCase(test.TestCase):
# Initialize headers dictionary
headers = {} if not headers else headers
- if token is not None:
- headers['X-Auth-Token'] = token
-
body = self._to_content_type(body, headers, content_type)
# Perform the HTTP request/response
@@ -205,32 +193,26 @@ class RestfulTestCase(test.TestCase):
self._from_content_type(response, content_type)
# we can save some code & improve coverage by always doing this
- if method != 'HEAD' and response.status >= 400:
+ if method != 'HEAD' and response.status_code >= 400:
self.assertValidErrorResponse(response)
# Contains the decoded response.body
return response
- def _get_port(self, server):
- return server.socket_info['socket'][1]
-
- def _public_port(self):
- return self._get_port(self.public_server)
-
- def _admin_port(self):
- return self._get_port(self.admin_server)
+ def _request(self, convert=True, **kwargs):
+ if convert:
+ response = self.restful_request(**kwargs)
+ else:
+ response = self.request(**kwargs)
- def public_request(self, port=None, **kwargs):
- kwargs['port'] = port or self._public_port()
- response = self.restful_request(**kwargs)
self.assertValidResponseHeaders(response)
return response
- def admin_request(self, port=None, **kwargs):
- kwargs['port'] = port or self._admin_port()
- response = self.restful_request(**kwargs)
- self.assertValidResponseHeaders(response)
- return response
+ def public_request(self, **kwargs):
+ return self._request(app=self.public_app, **kwargs)
+
+ def admin_request(self, **kwargs):
+ return self._request(app=self.admin_app, **kwargs)
def get_scoped_token(self):
"""Convenience method so that we can test authenticated requests."""
@@ -597,12 +579,12 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
def _get_token_id(self, r):
"""Applicable only to JSON."""
- return r.body['access']['token']['id']
+ return r.result['access']['token']['id']
def assertValidErrorResponse(self, r):
- self.assertIsNotNone(r.body.get('error'))
- self.assertValidError(r.body['error'])
- self.assertEqual(r.body['error']['code'], r.status)
+ self.assertIsNotNone(r.result.get('error'))
+ self.assertValidError(r.result['error'])
+ self.assertEqual(r.result['error']['code'], r.status_code)
def assertValidExtension(self, extension):
super(JsonTestCase, self).assertValidExtension(extension)
@@ -614,42 +596,42 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
self.assertValidExtensionLink(link)
def assertValidExtensionListResponse(self, r):
- self.assertIsNotNone(r.body.get('extensions'))
- self.assertIsNotNone(r.body['extensions'].get('values'))
- self.assertNotEmpty(r.body['extensions'].get('values'))
- for extension in r.body['extensions']['values']:
+ self.assertIsNotNone(r.result.get('extensions'))
+ self.assertIsNotNone(r.result['extensions'].get('values'))
+ self.assertNotEmpty(r.result['extensions'].get('values'))
+ for extension in r.result['extensions']['values']:
self.assertValidExtension(extension)
def assertValidExtensionResponse(self, r):
- self.assertValidExtension(r.body.get('extension'))
+ self.assertValidExtension(r.result.get('extension'))
def assertValidAuthenticationResponse(self, r,
require_service_catalog=False):
- self.assertIsNotNone(r.body.get('access'))
- self.assertIsNotNone(r.body['access'].get('token'))
- self.assertIsNotNone(r.body['access'].get('user'))
+ self.assertIsNotNone(r.result.get('access'))
+ self.assertIsNotNone(r.result['access'].get('token'))
+ self.assertIsNotNone(r.result['access'].get('user'))
# validate token
- self.assertIsNotNone(r.body['access']['token'].get('id'))
- self.assertIsNotNone(r.body['access']['token'].get('expires'))
- tenant = r.body['access']['token'].get('tenant')
+ self.assertIsNotNone(r.result['access']['token'].get('id'))
+ self.assertIsNotNone(r.result['access']['token'].get('expires'))
+ tenant = r.result['access']['token'].get('tenant')
if tenant is not None:
# validate tenant
self.assertIsNotNone(tenant.get('id'))
self.assertIsNotNone(tenant.get('name'))
# validate user
- self.assertIsNotNone(r.body['access']['user'].get('id'))
- self.assertIsNotNone(r.body['access']['user'].get('name'))
+ self.assertIsNotNone(r.result['access']['user'].get('id'))
+ self.assertIsNotNone(r.result['access']['user'].get('name'))
if require_service_catalog:
# roles are only provided with a service catalog
- roles = r.body['access']['user'].get('roles')
+ roles = r.result['access']['user'].get('roles')
self.assertNotEmpty(roles)
for role in roles:
self.assertIsNotNone(role.get('name'))
- serviceCatalog = r.body['access'].get('serviceCatalog')
+ serviceCatalog = r.result['access'].get('serviceCatalog')
# validate service catalog
if require_service_catalog:
self.assertIsNotNone(serviceCatalog)
@@ -657,7 +639,7 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
self.assertTrue(isinstance(serviceCatalog, list))
if require_service_catalog:
self.assertNotEmpty(serviceCatalog)
- for service in r.body['access']['serviceCatalog']:
+ for service in r.result['access']['serviceCatalog']:
# validate service
self.assertIsNotNone(service.get('name'))
self.assertIsNotNone(service.get('type'))
@@ -670,25 +652,25 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
self.assertIsNotNone(endpoint.get('publicURL'))
def assertValidTenantListResponse(self, r):
- self.assertIsNotNone(r.body.get('tenants'))
- self.assertNotEmpty(r.body['tenants'])
- for tenant in r.body['tenants']:
+ self.assertIsNotNone(r.result.get('tenants'))
+ self.assertNotEmpty(r.result['tenants'])
+ for tenant in r.result['tenants']:
self.assertValidTenant(tenant)
self.assertIsNotNone(tenant.get('enabled'))
self.assertIn(tenant.get('enabled'), [True, False])
def assertValidUserResponse(self, r):
- self.assertIsNotNone(r.body.get('user'))
- self.assertValidUser(r.body['user'])
+ self.assertIsNotNone(r.result.get('user'))
+ self.assertValidUser(r.result['user'])
def assertValidTenantResponse(self, r):
- self.assertIsNotNone(r.body.get('tenant'))
- self.assertValidTenant(r.body['tenant'])
+ self.assertIsNotNone(r.result.get('tenant'))
+ self.assertValidTenant(r.result['tenant'])
def assertValidRoleListResponse(self, r):
- self.assertIsNotNone(r.body.get('roles'))
- self.assertNotEmpty(r.body['roles'])
- for role in r.body['roles']:
+ self.assertIsNotNone(r.result.get('roles'))
+ self.assertNotEmpty(r.result['roles'])
+ for role in r.result['roles']:
self.assertValidRole(role)
def assertValidVersion(self, version):
@@ -707,19 +689,19 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
self.assertIsNotNone(media.get('type'))
def assertValidMultipleChoiceResponse(self, r):
- self.assertIsNotNone(r.body.get('versions'))
- self.assertIsNotNone(r.body['versions'].get('values'))
- self.assertNotEmpty(r.body['versions']['values'])
- for version in r.body['versions']['values']:
+ self.assertIsNotNone(r.result.get('versions'))
+ self.assertIsNotNone(r.result['versions'].get('values'))
+ self.assertNotEmpty(r.result['versions']['values'])
+ for version in r.result['versions']['values']:
self.assertValidVersion(version)
def assertValidVersionResponse(self, r):
- self.assertValidVersion(r.body.get('version'))
+ self.assertValidVersion(r.result.get('version'))
def assertValidEndpointListResponse(self, r):
- self.assertIsNotNone(r.body.get('endpoints'))
- self.assertNotEmpty(r.body['endpoints'])
- for endpoint in r.body['endpoints']:
+ self.assertIsNotNone(r.result.get('endpoints'))
+ self.assertNotEmpty(r.result['endpoints'])
+ for endpoint in r.result['endpoints']:
self.assertIsNotNone(endpoint.get('id'))
self.assertIsNotNone(endpoint.get('name'))
self.assertIsNotNone(endpoint.get('type'))
@@ -778,16 +760,15 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
def test_fetch_revocation_list_admin_200(self):
token = self.get_scoped_token()
- r = self.restful_request(
+ r = self.admin_request(
method='GET',
path='/v2.0/tokens/revoked',
token=token,
- expected_status=200,
- port=self._admin_port())
+ expected_status=200)
self.assertValidRevocationListResponse(r)
def assertValidRevocationListResponse(self, response):
- self.assertIsNotNone(response.body['signed'])
+ self.assertIsNotNone(response.result['signed'])
def test_create_update_user_json_invalid_enabled_type(self):
# Enforce usage of boolean for 'enabled' field in JSON
@@ -831,18 +812,18 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
content_type = 'xml'
def _get_token_id(self, r):
- return r.body.find(self._tag('token')).get('id')
+ return r.result.find(self._tag('token')).get('id')
def _tag(self, tag_name, xmlns=None):
"""Helper method to build an namespaced element name."""
return '{%(ns)s}%(tag)s' % {'ns': xmlns or self.xmlns, 'tag': tag_name}
def assertValidErrorResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('error'))
self.assertValidError(xml)
- self.assertEqual(xml.get('code'), str(r.status))
+ self.assertEqual(xml.get('code'), str(r.status_code))
def assertValidExtension(self, extension):
super(XmlTestCase, self).assertValidExtension(extension)
@@ -855,7 +836,7 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
self.assertValidExtensionLink(link)
def assertValidExtensionListResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('extensions'))
self.assertNotEmpty(xml.findall(self._tag('extension')))
@@ -863,7 +844,7 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
self.assertValidExtension(extension)
def assertValidExtensionResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('extension'))
self.assertValidExtension(xml)
@@ -886,7 +867,7 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
self.assertIsNotNone(media.get('type'))
def assertValidMultipleChoiceResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('versions'))
self.assertNotEmpty(xml.findall(self._tag('version')))
@@ -894,13 +875,13 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
self.assertValidVersion(version)
def assertValidVersionResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('version'))
self.assertValidVersion(xml)
def assertValidEndpointListResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('endpoints'))
self.assertNotEmpty(xml.findall(self._tag('endpoint')))
@@ -913,28 +894,28 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
self.assertIsNotNone(endpoint.get('adminURL'))
def assertValidTenantResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('tenant'))
self.assertValidTenant(xml)
def assertValidUserResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('user'))
self.assertValidUser(xml)
def assertValidRoleListResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('roles'))
- self.assertNotEmpty(r.body.findall(self._tag('role')))
- for role in r.body.findall(self._tag('role')):
+ self.assertNotEmpty(r.result.findall(self._tag('role')))
+ for role in r.result.findall(self._tag('role')):
self.assertValidRole(role)
def assertValidAuthenticationResponse(self, r,
require_service_catalog=False):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('access'))
# validate token
@@ -981,18 +962,17 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
self.assertIsNotNone(endpoint.get('publicURL'))
def assertValidTenantListResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('tenants'))
- self.assertNotEmpty(r.body)
- for tenant in r.body.findall(self._tag('tenant')):
+ self.assertNotEmpty(r.result)
+ for tenant in r.result.findall(self._tag('tenant')):
self.assertValidTenant(tenant)
self.assertIn(tenant.get('enabled'), ['true', 'false'])
def test_authenticate_with_invalid_xml_in_password(self):
# public_request would auto escape the ampersand
- self.request(
- port=self._public_port(),
+ self.public_request(
method='POST',
path='/v2.0/tokens',
headers={
@@ -1005,15 +985,15 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
<passwordCredentials username="FOO" password="&"/>
</auth>
""",
- expected_status=400)
+ expected_status=400,
+ convert=False)
def test_add_tenant_xml(self):
"""
verify create a tenant without providing description field
"""
token = self.get_scoped_token()
- r = self.request(
- port=self._admin_port(),
+ r = self.admin_request(
method='POST',
path='/v2.0/tenants',
headers={
@@ -1026,19 +1006,19 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
enabled="true" name="ACME Corp">
<description></description>
</tenant>
- """)
+ """,
+ convert=False)
self._from_content_type(r, 'json')
- self.assertIsNotNone(r.body.get('tenant'))
- self.assertValidTenant(r.body['tenant'])
- self.assertEqual(r.body['tenant'].get('description'), "")
+ self.assertIsNotNone(r.result.get('tenant'))
+ self.assertValidTenant(r.result['tenant'])
+ self.assertEqual(r.result['tenant'].get('description'), "")
def test_add_tenant_json(self):
"""
verify create a tenant without providing description field
"""
token = self.get_scoped_token()
- r = self.request(
- port=self._admin_port(),
+ r = self.admin_request(
method='POST',
path='/v2.0/tenants',
headers={
@@ -1051,8 +1031,9 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
"description":"",
"enabled":"true"}
}
- """)
+ """,
+ convert=False)
self._from_content_type(r, 'json')
- self.assertIsNotNone(r.body.get('tenant'))
- self.assertValidTenant(r.body['tenant'])
- self.assertEqual(r.body['tenant'].get('description'), "")
+ self.assertIsNotNone(r.result.get('tenant'))
+ self.assertValidTenant(r.result['tenant'])
+ self.assertEqual(r.result['tenant'].get('description'), "")