diff options
author | Jamie Lennox <jlennox@redhat.com> | 2013-05-14 10:43:42 +1000 |
---|---|---|
committer | Jamie Lennox <jlennox@redhat.com> | 2013-05-22 09:18:26 +1000 |
commit | 09a5c436c5596dd7e767b1c1c89279b8b697eec1 (patch) | |
tree | e07e13854d63938b18238b63abf045de08297934 /tests/test_content_types.py | |
parent | fe57d6de5b82a8b2ae07bfb136a7b17183c04cbb (diff) | |
download | keystone-09a5c436c5596dd7e767b1c1c89279b8b697eec1.tar.gz keystone-09a5c436c5596dd7e767b1c1c89279b8b697eec1.tar.xz keystone-09a5c436c5596dd7e767b1c1c89279b8b697eec1.zip |
Use webtest for v2 and v3 API testing.
The intention is to remain as close as possible to the original
implementation and so leaves a number of easy cleanups and optimisations
until a later patch.
In writing tests their are a number of changes for API tests that are a
result of webob/webtest restrictions:
* response.body is now the string body and response.result is the parsed
dictionary.
* response.status is now a string eg. '200 OK', use
response.status_code to get the integer
* response.getheader no longer exists. response.headers is a dictionary
like object that can be accessed (case independently) with [] or
.get()
Working towards: blueprint extract-eventlet
Change-Id: I393b4bad2fd6eacc0b8ae98fc204d1323014b5e4
Diffstat (limited to 'tests/test_content_types.py')
-rw-r--r-- | tests/test_content_types.py | 251 |
1 files changed, 116 insertions, 135 deletions
diff --git a/tests/test_content_types.py b/tests/test_content_types.py index 14dd1db0..73771569 100644 --- a/tests/test_content_types.py +++ b/tests/test_content_types.py @@ -14,11 +14,12 @@ # License for the specific language governing permissions and limitations # under the License. -import httplib +import io import uuid from lxml import etree import nose.exc +import webtest from keystone.common import serializer from keystone.openstack.common import jsonutils @@ -63,8 +64,10 @@ class RestfulTestCase(test.TestCase): self.load_backends() self.load_fixtures(default_fixtures) - self.public_server = self.serveapp('keystone', name='main') - self.admin_server = self.serveapp('keystone', name='admin') + self.public_app = webtest.TestApp( + self.loadapp('keystone', name='main')) + self.admin_app = webtest.TestApp( + self.loadapp('keystone', name='admin')) # TODO(termie): is_admin is being deprecated once the policy stuff # is all working @@ -77,40 +80,31 @@ class RestfulTestCase(test.TestCase): def tearDown(self): """Kill running servers and release references to avoid leaks.""" - self.public_server.kill() - self.admin_server.kill() - self.public_server = None - self.admin_server = None + self.public_app = None + self.admin_app = None super(RestfulTestCase, self).tearDown() - def request(self, host='0.0.0.0', port=80, method='GET', path='/', - headers=None, body=None, expected_status=None): - """Perform request and fetch httplib.HTTPResponse from the server.""" - - # Initialize headers dictionary - headers = {} if not headers else headers - - connection = httplib.HTTPConnection(host, port, timeout=100000) - - # Perform the request - connection.request(method, path, body, headers) + def request(self, app, path, body=None, headers=None, token=None, + expected_status=None, **kwargs): + if headers: + headers = dict([(str(k), str(v)) for k, v in headers.iteritems()]) + else: + headers = {} - # Retrieve the response so we can close the connection - response = connection.getresponse() + if token: + headers['X-Auth-Token'] = str(token) - response.body = response.read() + # setting body this way because of: + # https://github.com/Pylons/webtest/issues/71 + if body: + kwargs['body_file'] = io.BytesIO(body) - # Close the connection - connection.close() + # sets environ['REMOTE_ADDR'] + kwargs.setdefault('remote_addr', 'localhost') - # Automatically assert HTTP status code - if expected_status: - self.assertResponseStatus(response, expected_status) - else: - self.assertResponseSuccessful(response) - self.assertValidResponseHeaders(response) + response = app.request(path, headers=headers, + status=expected_status, **kwargs) - # Contains the response headers, body, etc return response def assertResponseSuccessful(self, response): @@ -124,7 +118,7 @@ class RestfulTestCase(test.TestCase): self.assertResponseSuccessful(response, 203) """ self.assertTrue( - response.status >= 200 and response.status <= 299, + response.status_code >= 200 and response.status_code <= 299, 'Status code %d is outside of the expected range (2xx)\n\n%s' % (response.status, response.body)) @@ -139,14 +133,14 @@ class RestfulTestCase(test.TestCase): self.assertResponseStatus(response, 203) """ self.assertEqual( - response.status, + response.status_code, expected_status, 'Status code %s is not %s, as expected)\n\n%s' % - (response.status, expected_status, response.body)) + (response.status_code, expected_status, response.body)) def assertValidResponseHeaders(self, response): """Ensures that response headers appear as expected.""" - self.assertIn('X-Auth-Token', response.getheader('Vary')) + self.assertIn('X-Auth-Token', response.headers.get('Vary')) def _to_content_type(self, body, headers, content_type=None): """Attempt to encode JSON and XML automatically.""" @@ -167,21 +161,18 @@ class RestfulTestCase(test.TestCase): """Attempt to decode JSON and XML automatically, if detected.""" content_type = content_type or self.content_type - # make the original response body available, for convenience - response.raw = response.body - if response.body is not None and response.body.strip(): # if a body is provided, a Content-Type is also expected - header = response.getheader('Content-Type', None) + header = response.headers.get('Content-Type', None) self.assertIn(content_type, header) if content_type == 'json': - response.body = jsonutils.loads(response.body) + response.result = jsonutils.loads(response.body) elif content_type == 'xml': - response.body = etree.fromstring(response.body) + response.result = etree.fromstring(response.body) def restful_request(self, method='GET', headers=None, body=None, - token=None, content_type=None, **kwargs): + content_type=None, **kwargs): """Serializes/deserializes json/xml as request/response body. .. WARNING:: @@ -193,9 +184,6 @@ class RestfulTestCase(test.TestCase): # Initialize headers dictionary headers = {} if not headers else headers - if token is not None: - headers['X-Auth-Token'] = token - body = self._to_content_type(body, headers, content_type) # Perform the HTTP request/response @@ -205,32 +193,26 @@ class RestfulTestCase(test.TestCase): self._from_content_type(response, content_type) # we can save some code & improve coverage by always doing this - if method != 'HEAD' and response.status >= 400: + if method != 'HEAD' and response.status_code >= 400: self.assertValidErrorResponse(response) # Contains the decoded response.body return response - def _get_port(self, server): - return server.socket_info['socket'][1] - - def _public_port(self): - return self._get_port(self.public_server) - - def _admin_port(self): - return self._get_port(self.admin_server) + def _request(self, convert=True, **kwargs): + if convert: + response = self.restful_request(**kwargs) + else: + response = self.request(**kwargs) - def public_request(self, port=None, **kwargs): - kwargs['port'] = port or self._public_port() - response = self.restful_request(**kwargs) self.assertValidResponseHeaders(response) return response - def admin_request(self, port=None, **kwargs): - kwargs['port'] = port or self._admin_port() - response = self.restful_request(**kwargs) - self.assertValidResponseHeaders(response) - return response + def public_request(self, **kwargs): + return self._request(app=self.public_app, **kwargs) + + def admin_request(self, **kwargs): + return self._request(app=self.admin_app, **kwargs) def get_scoped_token(self): """Convenience method so that we can test authenticated requests.""" @@ -597,12 +579,12 @@ class JsonTestCase(RestfulTestCase, CoreApiTests): def _get_token_id(self, r): """Applicable only to JSON.""" - return r.body['access']['token']['id'] + return r.result['access']['token']['id'] def assertValidErrorResponse(self, r): - self.assertIsNotNone(r.body.get('error')) - self.assertValidError(r.body['error']) - self.assertEqual(r.body['error']['code'], r.status) + self.assertIsNotNone(r.result.get('error')) + self.assertValidError(r.result['error']) + self.assertEqual(r.result['error']['code'], r.status_code) def assertValidExtension(self, extension): super(JsonTestCase, self).assertValidExtension(extension) @@ -614,42 +596,42 @@ class JsonTestCase(RestfulTestCase, CoreApiTests): self.assertValidExtensionLink(link) def assertValidExtensionListResponse(self, r): - self.assertIsNotNone(r.body.get('extensions')) - self.assertIsNotNone(r.body['extensions'].get('values')) - self.assertNotEmpty(r.body['extensions'].get('values')) - for extension in r.body['extensions']['values']: + self.assertIsNotNone(r.result.get('extensions')) + self.assertIsNotNone(r.result['extensions'].get('values')) + self.assertNotEmpty(r.result['extensions'].get('values')) + for extension in r.result['extensions']['values']: self.assertValidExtension(extension) def assertValidExtensionResponse(self, r): - self.assertValidExtension(r.body.get('extension')) + self.assertValidExtension(r.result.get('extension')) def assertValidAuthenticationResponse(self, r, require_service_catalog=False): - self.assertIsNotNone(r.body.get('access')) - self.assertIsNotNone(r.body['access'].get('token')) - self.assertIsNotNone(r.body['access'].get('user')) + self.assertIsNotNone(r.result.get('access')) + self.assertIsNotNone(r.result['access'].get('token')) + self.assertIsNotNone(r.result['access'].get('user')) # validate token - self.assertIsNotNone(r.body['access']['token'].get('id')) - self.assertIsNotNone(r.body['access']['token'].get('expires')) - tenant = r.body['access']['token'].get('tenant') + self.assertIsNotNone(r.result['access']['token'].get('id')) + self.assertIsNotNone(r.result['access']['token'].get('expires')) + tenant = r.result['access']['token'].get('tenant') if tenant is not None: # validate tenant self.assertIsNotNone(tenant.get('id')) self.assertIsNotNone(tenant.get('name')) # validate user - self.assertIsNotNone(r.body['access']['user'].get('id')) - self.assertIsNotNone(r.body['access']['user'].get('name')) + self.assertIsNotNone(r.result['access']['user'].get('id')) + self.assertIsNotNone(r.result['access']['user'].get('name')) if require_service_catalog: # roles are only provided with a service catalog - roles = r.body['access']['user'].get('roles') + roles = r.result['access']['user'].get('roles') self.assertNotEmpty(roles) for role in roles: self.assertIsNotNone(role.get('name')) - serviceCatalog = r.body['access'].get('serviceCatalog') + serviceCatalog = r.result['access'].get('serviceCatalog') # validate service catalog if require_service_catalog: self.assertIsNotNone(serviceCatalog) @@ -657,7 +639,7 @@ class JsonTestCase(RestfulTestCase, CoreApiTests): self.assertTrue(isinstance(serviceCatalog, list)) if require_service_catalog: self.assertNotEmpty(serviceCatalog) - for service in r.body['access']['serviceCatalog']: + for service in r.result['access']['serviceCatalog']: # validate service self.assertIsNotNone(service.get('name')) self.assertIsNotNone(service.get('type')) @@ -670,25 +652,25 @@ class JsonTestCase(RestfulTestCase, CoreApiTests): self.assertIsNotNone(endpoint.get('publicURL')) def assertValidTenantListResponse(self, r): - self.assertIsNotNone(r.body.get('tenants')) - self.assertNotEmpty(r.body['tenants']) - for tenant in r.body['tenants']: + self.assertIsNotNone(r.result.get('tenants')) + self.assertNotEmpty(r.result['tenants']) + for tenant in r.result['tenants']: self.assertValidTenant(tenant) self.assertIsNotNone(tenant.get('enabled')) self.assertIn(tenant.get('enabled'), [True, False]) def assertValidUserResponse(self, r): - self.assertIsNotNone(r.body.get('user')) - self.assertValidUser(r.body['user']) + self.assertIsNotNone(r.result.get('user')) + self.assertValidUser(r.result['user']) def assertValidTenantResponse(self, r): - self.assertIsNotNone(r.body.get('tenant')) - self.assertValidTenant(r.body['tenant']) + self.assertIsNotNone(r.result.get('tenant')) + self.assertValidTenant(r.result['tenant']) def assertValidRoleListResponse(self, r): - self.assertIsNotNone(r.body.get('roles')) - self.assertNotEmpty(r.body['roles']) - for role in r.body['roles']: + self.assertIsNotNone(r.result.get('roles')) + self.assertNotEmpty(r.result['roles']) + for role in r.result['roles']: self.assertValidRole(role) def assertValidVersion(self, version): @@ -707,19 +689,19 @@ class JsonTestCase(RestfulTestCase, CoreApiTests): self.assertIsNotNone(media.get('type')) def assertValidMultipleChoiceResponse(self, r): - self.assertIsNotNone(r.body.get('versions')) - self.assertIsNotNone(r.body['versions'].get('values')) - self.assertNotEmpty(r.body['versions']['values']) - for version in r.body['versions']['values']: + self.assertIsNotNone(r.result.get('versions')) + self.assertIsNotNone(r.result['versions'].get('values')) + self.assertNotEmpty(r.result['versions']['values']) + for version in r.result['versions']['values']: self.assertValidVersion(version) def assertValidVersionResponse(self, r): - self.assertValidVersion(r.body.get('version')) + self.assertValidVersion(r.result.get('version')) def assertValidEndpointListResponse(self, r): - self.assertIsNotNone(r.body.get('endpoints')) - self.assertNotEmpty(r.body['endpoints']) - for endpoint in r.body['endpoints']: + self.assertIsNotNone(r.result.get('endpoints')) + self.assertNotEmpty(r.result['endpoints']) + for endpoint in r.result['endpoints']: self.assertIsNotNone(endpoint.get('id')) self.assertIsNotNone(endpoint.get('name')) self.assertIsNotNone(endpoint.get('type')) @@ -778,16 +760,15 @@ class JsonTestCase(RestfulTestCase, CoreApiTests): def test_fetch_revocation_list_admin_200(self): token = self.get_scoped_token() - r = self.restful_request( + r = self.admin_request( method='GET', path='/v2.0/tokens/revoked', token=token, - expected_status=200, - port=self._admin_port()) + expected_status=200) self.assertValidRevocationListResponse(r) def assertValidRevocationListResponse(self, response): - self.assertIsNotNone(response.body['signed']) + self.assertIsNotNone(response.result['signed']) def test_create_update_user_json_invalid_enabled_type(self): # Enforce usage of boolean for 'enabled' field in JSON @@ -831,18 +812,18 @@ class XmlTestCase(RestfulTestCase, CoreApiTests): content_type = 'xml' def _get_token_id(self, r): - return r.body.find(self._tag('token')).get('id') + return r.result.find(self._tag('token')).get('id') def _tag(self, tag_name, xmlns=None): """Helper method to build an namespaced element name.""" return '{%(ns)s}%(tag)s' % {'ns': xmlns or self.xmlns, 'tag': tag_name} def assertValidErrorResponse(self, r): - xml = r.body + xml = r.result self.assertEqual(xml.tag, self._tag('error')) self.assertValidError(xml) - self.assertEqual(xml.get('code'), str(r.status)) + self.assertEqual(xml.get('code'), str(r.status_code)) def assertValidExtension(self, extension): super(XmlTestCase, self).assertValidExtension(extension) @@ -855,7 +836,7 @@ class XmlTestCase(RestfulTestCase, CoreApiTests): self.assertValidExtensionLink(link) def assertValidExtensionListResponse(self, r): - xml = r.body + xml = r.result self.assertEqual(xml.tag, self._tag('extensions')) self.assertNotEmpty(xml.findall(self._tag('extension'))) @@ -863,7 +844,7 @@ class XmlTestCase(RestfulTestCase, CoreApiTests): self.assertValidExtension(extension) def assertValidExtensionResponse(self, r): - xml = r.body + xml = r.result self.assertEqual(xml.tag, self._tag('extension')) self.assertValidExtension(xml) @@ -886,7 +867,7 @@ class XmlTestCase(RestfulTestCase, CoreApiTests): self.assertIsNotNone(media.get('type')) def assertValidMultipleChoiceResponse(self, r): - xml = r.body + xml = r.result self.assertEqual(xml.tag, self._tag('versions')) self.assertNotEmpty(xml.findall(self._tag('version'))) @@ -894,13 +875,13 @@ class XmlTestCase(RestfulTestCase, CoreApiTests): self.assertValidVersion(version) def assertValidVersionResponse(self, r): - xml = r.body + xml = r.result self.assertEqual(xml.tag, self._tag('version')) self.assertValidVersion(xml) def assertValidEndpointListResponse(self, r): - xml = r.body + xml = r.result self.assertEqual(xml.tag, self._tag('endpoints')) self.assertNotEmpty(xml.findall(self._tag('endpoint'))) @@ -913,28 +894,28 @@ class XmlTestCase(RestfulTestCase, CoreApiTests): self.assertIsNotNone(endpoint.get('adminURL')) def assertValidTenantResponse(self, r): - xml = r.body + xml = r.result self.assertEqual(xml.tag, self._tag('tenant')) self.assertValidTenant(xml) def assertValidUserResponse(self, r): - xml = r.body + xml = r.result self.assertEqual(xml.tag, self._tag('user')) self.assertValidUser(xml) def assertValidRoleListResponse(self, r): - xml = r.body + xml = r.result self.assertEqual(xml.tag, self._tag('roles')) - self.assertNotEmpty(r.body.findall(self._tag('role'))) - for role in r.body.findall(self._tag('role')): + self.assertNotEmpty(r.result.findall(self._tag('role'))) + for role in r.result.findall(self._tag('role')): self.assertValidRole(role) def assertValidAuthenticationResponse(self, r, require_service_catalog=False): - xml = r.body + xml = r.result self.assertEqual(xml.tag, self._tag('access')) # validate token @@ -981,18 +962,17 @@ class XmlTestCase(RestfulTestCase, CoreApiTests): self.assertIsNotNone(endpoint.get('publicURL')) def assertValidTenantListResponse(self, r): - xml = r.body + xml = r.result self.assertEqual(xml.tag, self._tag('tenants')) - self.assertNotEmpty(r.body) - for tenant in r.body.findall(self._tag('tenant')): + self.assertNotEmpty(r.result) + for tenant in r.result.findall(self._tag('tenant')): self.assertValidTenant(tenant) self.assertIn(tenant.get('enabled'), ['true', 'false']) def test_authenticate_with_invalid_xml_in_password(self): # public_request would auto escape the ampersand - self.request( - port=self._public_port(), + self.public_request( method='POST', path='/v2.0/tokens', headers={ @@ -1005,15 +985,15 @@ class XmlTestCase(RestfulTestCase, CoreApiTests): <passwordCredentials username="FOO" password="&"/> </auth> """, - expected_status=400) + expected_status=400, + convert=False) def test_add_tenant_xml(self): """ verify create a tenant without providing description field """ token = self.get_scoped_token() - r = self.request( - port=self._admin_port(), + r = self.admin_request( method='POST', path='/v2.0/tenants', headers={ @@ -1026,19 +1006,19 @@ class XmlTestCase(RestfulTestCase, CoreApiTests): enabled="true" name="ACME Corp"> <description></description> </tenant> - """) + """, + convert=False) self._from_content_type(r, 'json') - self.assertIsNotNone(r.body.get('tenant')) - self.assertValidTenant(r.body['tenant']) - self.assertEqual(r.body['tenant'].get('description'), "") + self.assertIsNotNone(r.result.get('tenant')) + self.assertValidTenant(r.result['tenant']) + self.assertEqual(r.result['tenant'].get('description'), "") def test_add_tenant_json(self): """ verify create a tenant without providing description field """ token = self.get_scoped_token() - r = self.request( - port=self._admin_port(), + r = self.admin_request( method='POST', path='/v2.0/tenants', headers={ @@ -1051,8 +1031,9 @@ class XmlTestCase(RestfulTestCase, CoreApiTests): "description":"", "enabled":"true"} } - """) + """, + convert=False) self._from_content_type(r, 'json') - self.assertIsNotNone(r.body.get('tenant')) - self.assertValidTenant(r.body['tenant']) - self.assertEqual(r.body['tenant'].get('description'), "") + self.assertIsNotNone(r.result.get('tenant')) + self.assertValidTenant(r.result['tenant']) + self.assertEqual(r.result['tenant'].get('description'), "") |