summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJamie Lennox <jlennox@redhat.com>2013-05-14 10:43:42 +1000
committerJamie Lennox <jlennox@redhat.com>2013-05-22 09:18:26 +1000
commit09a5c436c5596dd7e767b1c1c89279b8b697eec1 (patch)
treee07e13854d63938b18238b63abf045de08297934
parentfe57d6de5b82a8b2ae07bfb136a7b17183c04cbb (diff)
downloadkeystone-09a5c436c5596dd7e767b1c1c89279b8b697eec1.tar.gz
keystone-09a5c436c5596dd7e767b1c1c89279b8b697eec1.tar.xz
keystone-09a5c436c5596dd7e767b1c1c89279b8b697eec1.zip
Use webtest for v2 and v3 API testing.
The intention is to remain as close as possible to the original implementation and so leaves a number of easy cleanups and optimisations until a later patch. In writing tests their are a number of changes for API tests that are a result of webob/webtest restrictions: * response.body is now the string body and response.result is the parsed dictionary. * response.status is now a string eg. '200 OK', use response.status_code to get the integer * response.getheader no longer exists. response.headers is a dictionary like object that can be accessed (case independently) with [] or .get() Working towards: blueprint extract-eventlet Change-Id: I393b4bad2fd6eacc0b8ae98fc204d1323014b5e4
-rw-r--r--tests/test_catalog.py14
-rw-r--r--tests/test_content_types.py251
-rw-r--r--tests/test_v3.py32
-rw-r--r--tests/test_v3_auth.py134
-rw-r--r--tests/test_v3_catalog.py4
-rw-r--r--tests/test_v3_identity.py20
-rw-r--r--tests/test_v3_protection.py21
7 files changed, 232 insertions, 244 deletions
diff --git a/tests/test_catalog.py b/tests/test_catalog.py
index 8a46be80..3c00b1e8 100644
--- a/tests/test_catalog.py
+++ b/tests/test_catalog.py
@@ -31,10 +31,10 @@ class V2CatalogTestCase(test_content_types.RestfulTestCase):
def _get_token_id(self, r):
"""Applicable only to JSON."""
- return r.body['access']['token']['id']
+ return r.result['access']['token']['id']
def assertValidErrorResponse(self, response):
- self.assertEqual(response.status, 400)
+ self.assertEqual(response.status_code, 400)
def _endpoint_create(self, expected_status=200, missing_param=None):
path = '/v2.0/endpoints'
@@ -56,20 +56,20 @@ class V2CatalogTestCase(test_content_types.RestfulTestCase):
def test_endpoint_create(self):
req_body, response = self._endpoint_create(expected_status=200)
- self.assertTrue('endpoint' in response.body)
- self.assertTrue('id' in response.body['endpoint'])
+ self.assertTrue('endpoint' in response.result)
+ self.assertTrue('id' in response.result['endpoint'])
for field, value in req_body['endpoint'].iteritems():
- self.assertEqual(response.body['endpoint'][field], value)
+ self.assertEqual(response.result['endpoint'][field], value)
def test_endpoint_create_with_missing_adminurl(self):
req_body, response = self._endpoint_create(expected_status=200,
missing_param='adminurl')
- self.assertEqual(response.status, 200)
+ self.assertEqual(response.status_code, 200)
def test_endpoint_create_with_missing_internalurl(self):
req_body, response = self._endpoint_create(expected_status=200,
missing_param='internalurl')
- self.assertEqual(response.status, 200)
+ self.assertEqual(response.status_code, 200)
def test_endpoint_create_with_missing_publicurl(self):
req_body, response = self._endpoint_create(expected_status=400,
diff --git a/tests/test_content_types.py b/tests/test_content_types.py
index 14dd1db0..73771569 100644
--- a/tests/test_content_types.py
+++ b/tests/test_content_types.py
@@ -14,11 +14,12 @@
# License for the specific language governing permissions and limitations
# under the License.
-import httplib
+import io
import uuid
from lxml import etree
import nose.exc
+import webtest
from keystone.common import serializer
from keystone.openstack.common import jsonutils
@@ -63,8 +64,10 @@ class RestfulTestCase(test.TestCase):
self.load_backends()
self.load_fixtures(default_fixtures)
- self.public_server = self.serveapp('keystone', name='main')
- self.admin_server = self.serveapp('keystone', name='admin')
+ self.public_app = webtest.TestApp(
+ self.loadapp('keystone', name='main'))
+ self.admin_app = webtest.TestApp(
+ self.loadapp('keystone', name='admin'))
# TODO(termie): is_admin is being deprecated once the policy stuff
# is all working
@@ -77,40 +80,31 @@ class RestfulTestCase(test.TestCase):
def tearDown(self):
"""Kill running servers and release references to avoid leaks."""
- self.public_server.kill()
- self.admin_server.kill()
- self.public_server = None
- self.admin_server = None
+ self.public_app = None
+ self.admin_app = None
super(RestfulTestCase, self).tearDown()
- def request(self, host='0.0.0.0', port=80, method='GET', path='/',
- headers=None, body=None, expected_status=None):
- """Perform request and fetch httplib.HTTPResponse from the server."""
-
- # Initialize headers dictionary
- headers = {} if not headers else headers
-
- connection = httplib.HTTPConnection(host, port, timeout=100000)
-
- # Perform the request
- connection.request(method, path, body, headers)
+ def request(self, app, path, body=None, headers=None, token=None,
+ expected_status=None, **kwargs):
+ if headers:
+ headers = dict([(str(k), str(v)) for k, v in headers.iteritems()])
+ else:
+ headers = {}
- # Retrieve the response so we can close the connection
- response = connection.getresponse()
+ if token:
+ headers['X-Auth-Token'] = str(token)
- response.body = response.read()
+ # setting body this way because of:
+ # https://github.com/Pylons/webtest/issues/71
+ if body:
+ kwargs['body_file'] = io.BytesIO(body)
- # Close the connection
- connection.close()
+ # sets environ['REMOTE_ADDR']
+ kwargs.setdefault('remote_addr', 'localhost')
- # Automatically assert HTTP status code
- if expected_status:
- self.assertResponseStatus(response, expected_status)
- else:
- self.assertResponseSuccessful(response)
- self.assertValidResponseHeaders(response)
+ response = app.request(path, headers=headers,
+ status=expected_status, **kwargs)
- # Contains the response headers, body, etc
return response
def assertResponseSuccessful(self, response):
@@ -124,7 +118,7 @@ class RestfulTestCase(test.TestCase):
self.assertResponseSuccessful(response, 203)
"""
self.assertTrue(
- response.status >= 200 and response.status <= 299,
+ response.status_code >= 200 and response.status_code <= 299,
'Status code %d is outside of the expected range (2xx)\n\n%s' %
(response.status, response.body))
@@ -139,14 +133,14 @@ class RestfulTestCase(test.TestCase):
self.assertResponseStatus(response, 203)
"""
self.assertEqual(
- response.status,
+ response.status_code,
expected_status,
'Status code %s is not %s, as expected)\n\n%s' %
- (response.status, expected_status, response.body))
+ (response.status_code, expected_status, response.body))
def assertValidResponseHeaders(self, response):
"""Ensures that response headers appear as expected."""
- self.assertIn('X-Auth-Token', response.getheader('Vary'))
+ self.assertIn('X-Auth-Token', response.headers.get('Vary'))
def _to_content_type(self, body, headers, content_type=None):
"""Attempt to encode JSON and XML automatically."""
@@ -167,21 +161,18 @@ class RestfulTestCase(test.TestCase):
"""Attempt to decode JSON and XML automatically, if detected."""
content_type = content_type or self.content_type
- # make the original response body available, for convenience
- response.raw = response.body
-
if response.body is not None and response.body.strip():
# if a body is provided, a Content-Type is also expected
- header = response.getheader('Content-Type', None)
+ header = response.headers.get('Content-Type', None)
self.assertIn(content_type, header)
if content_type == 'json':
- response.body = jsonutils.loads(response.body)
+ response.result = jsonutils.loads(response.body)
elif content_type == 'xml':
- response.body = etree.fromstring(response.body)
+ response.result = etree.fromstring(response.body)
def restful_request(self, method='GET', headers=None, body=None,
- token=None, content_type=None, **kwargs):
+ content_type=None, **kwargs):
"""Serializes/deserializes json/xml as request/response body.
.. WARNING::
@@ -193,9 +184,6 @@ class RestfulTestCase(test.TestCase):
# Initialize headers dictionary
headers = {} if not headers else headers
- if token is not None:
- headers['X-Auth-Token'] = token
-
body = self._to_content_type(body, headers, content_type)
# Perform the HTTP request/response
@@ -205,32 +193,26 @@ class RestfulTestCase(test.TestCase):
self._from_content_type(response, content_type)
# we can save some code & improve coverage by always doing this
- if method != 'HEAD' and response.status >= 400:
+ if method != 'HEAD' and response.status_code >= 400:
self.assertValidErrorResponse(response)
# Contains the decoded response.body
return response
- def _get_port(self, server):
- return server.socket_info['socket'][1]
-
- def _public_port(self):
- return self._get_port(self.public_server)
-
- def _admin_port(self):
- return self._get_port(self.admin_server)
+ def _request(self, convert=True, **kwargs):
+ if convert:
+ response = self.restful_request(**kwargs)
+ else:
+ response = self.request(**kwargs)
- def public_request(self, port=None, **kwargs):
- kwargs['port'] = port or self._public_port()
- response = self.restful_request(**kwargs)
self.assertValidResponseHeaders(response)
return response
- def admin_request(self, port=None, **kwargs):
- kwargs['port'] = port or self._admin_port()
- response = self.restful_request(**kwargs)
- self.assertValidResponseHeaders(response)
- return response
+ def public_request(self, **kwargs):
+ return self._request(app=self.public_app, **kwargs)
+
+ def admin_request(self, **kwargs):
+ return self._request(app=self.admin_app, **kwargs)
def get_scoped_token(self):
"""Convenience method so that we can test authenticated requests."""
@@ -597,12 +579,12 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
def _get_token_id(self, r):
"""Applicable only to JSON."""
- return r.body['access']['token']['id']
+ return r.result['access']['token']['id']
def assertValidErrorResponse(self, r):
- self.assertIsNotNone(r.body.get('error'))
- self.assertValidError(r.body['error'])
- self.assertEqual(r.body['error']['code'], r.status)
+ self.assertIsNotNone(r.result.get('error'))
+ self.assertValidError(r.result['error'])
+ self.assertEqual(r.result['error']['code'], r.status_code)
def assertValidExtension(self, extension):
super(JsonTestCase, self).assertValidExtension(extension)
@@ -614,42 +596,42 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
self.assertValidExtensionLink(link)
def assertValidExtensionListResponse(self, r):
- self.assertIsNotNone(r.body.get('extensions'))
- self.assertIsNotNone(r.body['extensions'].get('values'))
- self.assertNotEmpty(r.body['extensions'].get('values'))
- for extension in r.body['extensions']['values']:
+ self.assertIsNotNone(r.result.get('extensions'))
+ self.assertIsNotNone(r.result['extensions'].get('values'))
+ self.assertNotEmpty(r.result['extensions'].get('values'))
+ for extension in r.result['extensions']['values']:
self.assertValidExtension(extension)
def assertValidExtensionResponse(self, r):
- self.assertValidExtension(r.body.get('extension'))
+ self.assertValidExtension(r.result.get('extension'))
def assertValidAuthenticationResponse(self, r,
require_service_catalog=False):
- self.assertIsNotNone(r.body.get('access'))
- self.assertIsNotNone(r.body['access'].get('token'))
- self.assertIsNotNone(r.body['access'].get('user'))
+ self.assertIsNotNone(r.result.get('access'))
+ self.assertIsNotNone(r.result['access'].get('token'))
+ self.assertIsNotNone(r.result['access'].get('user'))
# validate token
- self.assertIsNotNone(r.body['access']['token'].get('id'))
- self.assertIsNotNone(r.body['access']['token'].get('expires'))
- tenant = r.body['access']['token'].get('tenant')
+ self.assertIsNotNone(r.result['access']['token'].get('id'))
+ self.assertIsNotNone(r.result['access']['token'].get('expires'))
+ tenant = r.result['access']['token'].get('tenant')
if tenant is not None:
# validate tenant
self.assertIsNotNone(tenant.get('id'))
self.assertIsNotNone(tenant.get('name'))
# validate user
- self.assertIsNotNone(r.body['access']['user'].get('id'))
- self.assertIsNotNone(r.body['access']['user'].get('name'))
+ self.assertIsNotNone(r.result['access']['user'].get('id'))
+ self.assertIsNotNone(r.result['access']['user'].get('name'))
if require_service_catalog:
# roles are only provided with a service catalog
- roles = r.body['access']['user'].get('roles')
+ roles = r.result['access']['user'].get('roles')
self.assertNotEmpty(roles)
for role in roles:
self.assertIsNotNone(role.get('name'))
- serviceCatalog = r.body['access'].get('serviceCatalog')
+ serviceCatalog = r.result['access'].get('serviceCatalog')
# validate service catalog
if require_service_catalog:
self.assertIsNotNone(serviceCatalog)
@@ -657,7 +639,7 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
self.assertTrue(isinstance(serviceCatalog, list))
if require_service_catalog:
self.assertNotEmpty(serviceCatalog)
- for service in r.body['access']['serviceCatalog']:
+ for service in r.result['access']['serviceCatalog']:
# validate service
self.assertIsNotNone(service.get('name'))
self.assertIsNotNone(service.get('type'))
@@ -670,25 +652,25 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
self.assertIsNotNone(endpoint.get('publicURL'))
def assertValidTenantListResponse(self, r):
- self.assertIsNotNone(r.body.get('tenants'))
- self.assertNotEmpty(r.body['tenants'])
- for tenant in r.body['tenants']:
+ self.assertIsNotNone(r.result.get('tenants'))
+ self.assertNotEmpty(r.result['tenants'])
+ for tenant in r.result['tenants']:
self.assertValidTenant(tenant)
self.assertIsNotNone(tenant.get('enabled'))
self.assertIn(tenant.get('enabled'), [True, False])
def assertValidUserResponse(self, r):
- self.assertIsNotNone(r.body.get('user'))
- self.assertValidUser(r.body['user'])
+ self.assertIsNotNone(r.result.get('user'))
+ self.assertValidUser(r.result['user'])
def assertValidTenantResponse(self, r):
- self.assertIsNotNone(r.body.get('tenant'))
- self.assertValidTenant(r.body['tenant'])
+ self.assertIsNotNone(r.result.get('tenant'))
+ self.assertValidTenant(r.result['tenant'])
def assertValidRoleListResponse(self, r):
- self.assertIsNotNone(r.body.get('roles'))
- self.assertNotEmpty(r.body['roles'])
- for role in r.body['roles']:
+ self.assertIsNotNone(r.result.get('roles'))
+ self.assertNotEmpty(r.result['roles'])
+ for role in r.result['roles']:
self.assertValidRole(role)
def assertValidVersion(self, version):
@@ -707,19 +689,19 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
self.assertIsNotNone(media.get('type'))
def assertValidMultipleChoiceResponse(self, r):
- self.assertIsNotNone(r.body.get('versions'))
- self.assertIsNotNone(r.body['versions'].get('values'))
- self.assertNotEmpty(r.body['versions']['values'])
- for version in r.body['versions']['values']:
+ self.assertIsNotNone(r.result.get('versions'))
+ self.assertIsNotNone(r.result['versions'].get('values'))
+ self.assertNotEmpty(r.result['versions']['values'])
+ for version in r.result['versions']['values']:
self.assertValidVersion(version)
def assertValidVersionResponse(self, r):
- self.assertValidVersion(r.body.get('version'))
+ self.assertValidVersion(r.result.get('version'))
def assertValidEndpointListResponse(self, r):
- self.assertIsNotNone(r.body.get('endpoints'))
- self.assertNotEmpty(r.body['endpoints'])
- for endpoint in r.body['endpoints']:
+ self.assertIsNotNone(r.result.get('endpoints'))
+ self.assertNotEmpty(r.result['endpoints'])
+ for endpoint in r.result['endpoints']:
self.assertIsNotNone(endpoint.get('id'))
self.assertIsNotNone(endpoint.get('name'))
self.assertIsNotNone(endpoint.get('type'))
@@ -778,16 +760,15 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
def test_fetch_revocation_list_admin_200(self):
token = self.get_scoped_token()
- r = self.restful_request(
+ r = self.admin_request(
method='GET',
path='/v2.0/tokens/revoked',
token=token,
- expected_status=200,
- port=self._admin_port())
+ expected_status=200)
self.assertValidRevocationListResponse(r)
def assertValidRevocationListResponse(self, response):
- self.assertIsNotNone(response.body['signed'])
+ self.assertIsNotNone(response.result['signed'])
def test_create_update_user_json_invalid_enabled_type(self):
# Enforce usage of boolean for 'enabled' field in JSON
@@ -831,18 +812,18 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
content_type = 'xml'
def _get_token_id(self, r):
- return r.body.find(self._tag('token')).get('id')
+ return r.result.find(self._tag('token')).get('id')
def _tag(self, tag_name, xmlns=None):
"""Helper method to build an namespaced element name."""
return '{%(ns)s}%(tag)s' % {'ns': xmlns or self.xmlns, 'tag': tag_name}
def assertValidErrorResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('error'))
self.assertValidError(xml)
- self.assertEqual(xml.get('code'), str(r.status))
+ self.assertEqual(xml.get('code'), str(r.status_code))
def assertValidExtension(self, extension):
super(XmlTestCase, self).assertValidExtension(extension)
@@ -855,7 +836,7 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
self.assertValidExtensionLink(link)
def assertValidExtensionListResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('extensions'))
self.assertNotEmpty(xml.findall(self._tag('extension')))
@@ -863,7 +844,7 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
self.assertValidExtension(extension)
def assertValidExtensionResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('extension'))
self.assertValidExtension(xml)
@@ -886,7 +867,7 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
self.assertIsNotNone(media.get('type'))
def assertValidMultipleChoiceResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('versions'))
self.assertNotEmpty(xml.findall(self._tag('version')))
@@ -894,13 +875,13 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
self.assertValidVersion(version)
def assertValidVersionResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('version'))
self.assertValidVersion(xml)
def assertValidEndpointListResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('endpoints'))
self.assertNotEmpty(xml.findall(self._tag('endpoint')))
@@ -913,28 +894,28 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
self.assertIsNotNone(endpoint.get('adminURL'))
def assertValidTenantResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('tenant'))
self.assertValidTenant(xml)
def assertValidUserResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('user'))
self.assertValidUser(xml)
def assertValidRoleListResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('roles'))
- self.assertNotEmpty(r.body.findall(self._tag('role')))
- for role in r.body.findall(self._tag('role')):
+ self.assertNotEmpty(r.result.findall(self._tag('role')))
+ for role in r.result.findall(self._tag('role')):
self.assertValidRole(role)
def assertValidAuthenticationResponse(self, r,
require_service_catalog=False):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('access'))
# validate token
@@ -981,18 +962,17 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
self.assertIsNotNone(endpoint.get('publicURL'))
def assertValidTenantListResponse(self, r):
- xml = r.body
+ xml = r.result
self.assertEqual(xml.tag, self._tag('tenants'))
- self.assertNotEmpty(r.body)
- for tenant in r.body.findall(self._tag('tenant')):
+ self.assertNotEmpty(r.result)
+ for tenant in r.result.findall(self._tag('tenant')):
self.assertValidTenant(tenant)
self.assertIn(tenant.get('enabled'), ['true', 'false'])
def test_authenticate_with_invalid_xml_in_password(self):
# public_request would auto escape the ampersand
- self.request(
- port=self._public_port(),
+ self.public_request(
method='POST',
path='/v2.0/tokens',
headers={
@@ -1005,15 +985,15 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
<passwordCredentials username="FOO" password="&"/>
</auth>
""",
- expected_status=400)
+ expected_status=400,
+ convert=False)
def test_add_tenant_xml(self):
"""
verify create a tenant without providing description field
"""
token = self.get_scoped_token()
- r = self.request(
- port=self._admin_port(),
+ r = self.admin_request(
method='POST',
path='/v2.0/tenants',
headers={
@@ -1026,19 +1006,19 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
enabled="true" name="ACME Corp">
<description></description>
</tenant>
- """)
+ """,
+ convert=False)
self._from_content_type(r, 'json')
- self.assertIsNotNone(r.body.get('tenant'))
- self.assertValidTenant(r.body['tenant'])
- self.assertEqual(r.body['tenant'].get('description'), "")
+ self.assertIsNotNone(r.result.get('tenant'))
+ self.assertValidTenant(r.result['tenant'])
+ self.assertEqual(r.result['tenant'].get('description'), "")
def test_add_tenant_json(self):
"""
verify create a tenant without providing description field
"""
token = self.get_scoped_token()
- r = self.request(
- port=self._admin_port(),
+ r = self.admin_request(
method='POST',
path='/v2.0/tenants',
headers={
@@ -1051,8 +1031,9 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
"description":"",
"enabled":"true"}
}
- """)
+ """,
+ convert=False)
self._from_content_type(r, 'json')
- self.assertIsNotNone(r.body.get('tenant'))
- self.assertValidTenant(r.body['tenant'])
- self.assertEqual(r.body['tenant'].get('description'), "")
+ self.assertIsNotNone(r.result.get('tenant'))
+ self.assertValidTenant(r.result['tenant'])
+ self.assertEqual(r.result['tenant'].get('description'), "")
diff --git a/tests/test_v3.py b/tests/test_v3.py
index 0acfbf81..8373ae98 100644
--- a/tests/test_v3.py
+++ b/tests/test_v3.py
@@ -2,6 +2,7 @@ import datetime
import uuid
from lxml import etree
+import webtest
from keystone import auth
from keystone.common import serializer
@@ -38,6 +39,11 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
sql_util.setup_test_database()
self.load_backends()
+ self.public_app = webtest.TestApp(
+ self.loadapp('keystone', name='main'))
+ self.admin_app = webtest.TestApp(
+ self.loadapp('keystone', name='admin'))
+
if load_sample_data:
self.domain_id = uuid.uuid4().hex
self.domain = self.new_domain_ref()
@@ -204,8 +210,8 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
"""
r = super(RestfulTestCase, self).admin_request(*args, **kwargs)
- if r.getheader('Content-Type') == 'application/xml':
- r.body = serializer.from_xml(etree.tostring(r.body))
+ if r.headers.get('Content-Type') == 'application/xml':
+ r.result = serializer.from_xml(etree.tostring(r.result))
return r
def get_scoped_token(self):
@@ -234,7 +240,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
}
}
})
- return r.getheader('X-Subject-Token')
+ return r.headers.get('X-Subject-Token')
def get_requested_token(self, auth):
"""Request the specific token we want."""
@@ -243,7 +249,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
method='POST',
path='/v3/auth/tokens',
body=auth)
- return r.getheader('X-Subject-Token')
+ return r.headers.get('X-Subject-Token')
def v3_request(self, path, **kwargs):
# Check if the caller has passed in auth details for
@@ -296,15 +302,15 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
return r
def assertValidErrorResponse(self, r):
- if r.getheader('Content-Type') == 'application/xml':
- resp = serializer.from_xml(etree.tostring(r.body))
+ if r.headers.get('Content-Type') == 'application/xml':
+ resp = serializer.from_xml(etree.tostring(r.result))
else:
- resp = r.body
+ resp = r.result
self.assertIsNotNone(resp.get('error'))
self.assertIsNotNone(resp['error'].get('code'))
self.assertIsNotNone(resp['error'].get('title'))
self.assertIsNotNone(resp['error'].get('message'))
- self.assertEqual(int(resp['error']['code']), r.status)
+ self.assertEqual(int(resp['error']['code']), r.status_code)
def assertValidListLinks(self, links):
self.assertIsNotNone(links)
@@ -331,7 +337,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
response, and asserted to be equal.
"""
- entities = resp.body.get(key)
+ entities = resp.result.get(key)
self.assertIsNotNone(entities)
if expected_length is not None:
@@ -341,7 +347,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
self.assertNotEmpty(entities)
# collections should have relational links
- self.assertValidListLinks(resp.body.get('links'))
+ self.assertValidListLinks(resp.result.get('links'))
for entity in entities:
self.assertIsNotNone(entity)
@@ -356,7 +362,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
def assertValidResponse(self, resp, key, entity_validator, *args,
**kwargs):
"""Make assertions common to all API responses."""
- entity = resp.body.get(key)
+ entity = resp.result.get(key)
self.assertIsNotNone(entity)
self.assertValidEntity(entity, *args, **kwargs)
entity_validator(entity, *args, **kwargs)
@@ -397,8 +403,8 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
self.assertTrue(isinstance(dt, datetime.datetime))
def assertValidTokenResponse(self, r, user=None):
- self.assertTrue(r.getheader('X-Subject-Token'))
- token = r.body['token']
+ self.assertTrue(r.headers.get('X-Subject-Token'))
+ token = r.result['token']
self.assertIsNotNone(token.get('expires_at'))
expires_at = self.assertValidISO8601ExtendedFormatDatetime(
diff --git a/tests/test_v3_auth.py b/tests/test_v3_auth.py
index 997165ec..b52b26e0 100644
--- a/tests/test_v3_auth.py
+++ b/tests/test_v3_auth.py
@@ -104,9 +104,9 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
user_domain_id=self.domain_id,
password=self.user['password'])
resp = self.post('/auth/tokens', body=auth_data)
- self.token_data = resp.body
- self.token = resp.getheader('X-Subject-Token')
- self.headers = {'X-Subject-Token': resp.getheader('X-Subject-Token')}
+ self.token_data = resp.result
+ self.token = resp.headers.get('X-Subject-Token')
+ self.headers = {'X-Subject-Token': resp.headers.get('X-Subject-Token')}
def test_default_fixture_scope_token(self):
self.assertIsNotNone(self.get_scoped_token())
@@ -117,8 +117,8 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'])
resp = self.post('/auth/tokens', body=auth_data)
- token_data = resp.body
- token_id = resp.getheader('X-Subject-Token')
+ token_data = resp.result
+ token_id = resp.headers.get('X-Subject-Token')
self.assertIn('expires_at', token_data['token'])
token_signed = cms.cms_sign_token(json.dumps(token_data),
CONF.signing.certfile,
@@ -131,8 +131,8 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'])
resp = self.post('/auth/tokens', body=auth_data)
- token_data = resp.body
- token = resp.getheader('X-Subject-Token')
+ token_data = resp.result
+ token = resp.headers.get('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
@@ -152,8 +152,8 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
password=self.user['password'],
domain_id=self.domain['id'])
resp = self.post('/auth/tokens', body=auth_data)
- token_data = resp.body
- token = resp.getheader('X-Subject-Token')
+ token_data = resp.result
+ token = resp.headers.get('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
@@ -168,8 +168,8 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
password=self.default_domain_user['password'],
project_id=self.project['id'])
resp = self.post('/auth/tokens', body=auth_data)
- token_data = resp.body
- token = resp.getheader('X-Subject-Token')
+ token_data = resp.result
+ token = resp.headers.get('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
@@ -184,15 +184,15 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'])
resp = self.post('/auth/tokens', body=auth_data)
- token_data = resp.body
- token = resp.getheader('X-Subject-Token')
+ token_data = resp.result
+ token = resp.headers.get('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
token='ADMIN',
method='GET')
- v2_token = resp.body
+ v2_token = resp.result
self.assertEqual(v2_token['access']['user']['id'],
token_data['token']['user']['id'])
# v2 token time has not fraction of second precision so
@@ -206,15 +206,15 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'])
resp = self.post('/auth/tokens', body=auth_data)
- token_data = resp.body
- token = resp.getheader('X-Subject-Token')
+ token_data = resp.result
+ token = resp.headers.get('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
token='ADMIN',
method='GET')
- v2_token = resp.body
+ v2_token = resp.result
self.assertEqual(v2_token['access']['user']['id'],
token_data['token']['user']['id'])
# v2 token time has not fraction of second precision so
@@ -231,15 +231,15 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
password=self.default_domain_user['password'],
project_id=self.default_domain_project['id'])
resp = self.post('/auth/tokens', body=auth_data)
- token_data = resp.body
- token = resp.getheader('X-Subject-Token')
+ token_data = resp.result
+ token = resp.headers.get('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
token='ADMIN',
method='GET')
- v2_token = resp.body
+ v2_token = resp.result
self.assertEqual(v2_token['access']['user']['id'],
token_data['token']['user']['id'])
# v2 token time has not fraction of second precision so
@@ -258,15 +258,15 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
password=self.default_domain_user['password'],
project_id=self.default_domain_project['id'])
resp = self.post('/auth/tokens', body=auth_data)
- token_data = resp.body
- token = resp.getheader('X-Subject-Token')
+ token_data = resp.result
+ token = resp.headers.get('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
token='ADMIN',
method='GET')
- v2_token = resp.body
+ v2_token = resp.result
self.assertEqual(v2_token['access']['user']['id'],
token_data['token']['user']['id'])
# v2 token time has not fraction of second precision so
@@ -288,11 +288,11 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
resp = self.admin_request(path='/v2.0/tokens',
method='POST',
body=body)
- v2_token_data = resp.body
+ v2_token_data = resp.result
v2_token = v2_token_data['access']['token']['id']
headers = {'X-Subject-Token': v2_token}
resp = self.get('/auth/tokens', headers=headers)
- token_data = resp.body
+ token_data = resp.result
self.assertEqual(v2_token_data['access']['user']['id'],
token_data['token']['user']['id'])
# v2 token time has not fraction of second precision so
@@ -312,11 +312,11 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
resp = self.admin_request(path='/v2.0/tokens',
method='POST',
body=body)
- v2_token_data = resp.body
+ v2_token_data = resp.result
v2_token = v2_token_data['access']['token']['id']
headers = {'X-Subject-Token': v2_token}
resp = self.get('/auth/tokens', headers=headers)
- token_data = resp.body
+ token_data = resp.result
self.assertEqual(v2_token_data['access']['user']['id'],
token_data['token']['user']['id'])
# v2 token time has not fraction of second precision so
@@ -337,11 +337,11 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
resp = self.admin_request(path='/v2.0/tokens',
method='POST',
body=body)
- v2_token_data = resp.body
+ v2_token_data = resp.result
v2_token = v2_token_data['access']['token']['id']
headers = {'X-Subject-Token': v2_token}
resp = self.get('/auth/tokens', headers=headers)
- token_data = resp.body
+ token_data = resp.result
self.assertEqual(v2_token_data['access']['user']['id'],
token_data['token']['user']['id'])
# v2 token time has not fraction of second precision so
@@ -364,11 +364,11 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
resp = self.admin_request(path='/v2.0/tokens',
method='POST',
body=body)
- v2_token_data = resp.body
+ v2_token_data = resp.result
v2_token = v2_token_data['access']['token']['id']
headers = {'X-Subject-Token': v2_token}
resp = self.get('/auth/tokens', headers=headers)
- token_data = resp.body
+ token_data = resp.result
self.assertEqual(v2_token_data['access']['user']['id'],
token_data['token']['user']['id'])
# v2 token time has not fraction of second precision so
@@ -386,7 +386,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectScopedTokenResponse(r)
# make sure expires stayed the same
- self.assertEqual(expires, r.body['token']['expires_at'])
+ self.assertEqual(expires, r.result['token']['expires_at'])
def test_check_token(self):
self.head('/auth/tokens', headers=self.headers, expected_status=204)
@@ -402,7 +402,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
# make sure we have a CRL
r = self.get('/auth/tokens/OS-PKI/revoked')
- self.assertIn('signed', r.body)
+ self.assertIn('signed', r.result)
class TestTokenRevoking(test_v3.RestfulTestCase):
@@ -517,7 +517,7 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
password=self.user1['password'],
project_id=self.projectA['id'])
resp = self.post('/auth/tokens', body=auth_data)
- token = resp.getheader('X-Subject-Token')
+ token = resp.headers.get('X-Subject-Token')
# Confirm token is valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
@@ -548,7 +548,7 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
password=self.user1['password'],
project_id=self.projectA['id'])
resp = self.post('/auth/tokens', body=auth_data)
- token = resp.getheader('X-Subject-Token')
+ token = resp.headers.get('X-Subject-Token')
# Confirm token is valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
@@ -583,19 +583,19 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
password=self.user1['password'],
project_id=self.projectA['id'])
resp = self.post('/auth/tokens', body=auth_data)
- token1 = resp.getheader('X-Subject-Token')
+ token1 = resp.headers.get('X-Subject-Token')
auth_data = self.build_authentication_request(
user_id=self.user2['id'],
password=self.user2['password'],
project_id=self.projectA['id'])
resp = self.post('/auth/tokens', body=auth_data)
- token2 = resp.getheader('X-Subject-Token')
+ token2 = resp.headers.get('X-Subject-Token')
auth_data = self.build_authentication_request(
user_id=self.user3['id'],
password=self.user3['password'],
project_id=self.projectA['id'])
resp = self.post('/auth/tokens', body=auth_data)
- token3 = resp.getheader('X-Subject-Token')
+ token3 = resp.headers.get('X-Subject-Token')
# Confirm tokens are valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token1},
@@ -640,7 +640,7 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
password=self.user1['password'],
project_id=self.projectA['id'])
resp = self.post('/auth/tokens', body=auth_data)
- token = resp.getheader('X-Subject-Token')
+ token = resp.headers.get('X-Subject-Token')
# Confirm token is valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
@@ -676,13 +676,13 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
password=self.user1['password'],
project_id=self.projectA['id'])
resp = self.post('/auth/tokens', body=auth_data)
- token1 = resp.getheader('X-Subject-Token')
+ token1 = resp.headers.get('X-Subject-Token')
auth_data = self.build_authentication_request(
user_id=self.user2['id'],
password=self.user2['password'],
project_id=self.projectA['id'])
resp = self.post('/auth/tokens', body=auth_data)
- token2 = resp.getheader('X-Subject-Token')
+ token2 = resp.headers.get('X-Subject-Token')
# Confirm tokens are valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token1},
@@ -771,7 +771,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
password=self.user['password'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectScopedTokenResponse(r)
- self.assertEqual(r.body['token']['project']['id'], project['id'])
+ self.assertEqual(r.result['token']['project']['id'], project['id'])
def test_default_project_id_scoped_token_with_user_id_401(self):
# create a second project to work with
@@ -950,8 +950,8 @@ class TestAuthJSON(test_v3.RestfulTestCase):
r = self.post('/auth/tokens', body=auth_data)
self.assertValidUnscopedTokenResponse(r)
- token = r.getheader('X-Subject-Token')
- headers = {'X-Subject-Token': r.getheader('X-Subject-Token')}
+ token = r.headers.get('X-Subject-Token')
+ headers = {'X-Subject-Token': r.headers.get('X-Subject-Token')}
# test token auth
auth_data = self.build_authentication_request(token=token)
@@ -1195,7 +1195,7 @@ class TestTrustAuth(TestAuthInfo):
self.assertValidProjectTrustScopedTokenResponse(
r, self.default_domain_user)
- token = r.getheader('X-Subject-Token')
+ token = r.headers.get('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
@@ -1219,7 +1219,7 @@ class TestTrustAuth(TestAuthInfo):
password=self.default_domain_user['password'],
project_id=self.default_domain_project_id)
r = self.post('/auth/tokens', body=auth_data)
- token = r.getheader('X-Subject-Token')
+ token = r.headers.get('X-Subject-Token')
r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token)
trust = self.assertValidTrustResponse(r)
@@ -1231,7 +1231,7 @@ class TestTrustAuth(TestAuthInfo):
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectTrustScopedTokenResponse(
r, self.trustee_user)
- token = r.getheader('X-Subject-Token')
+ token = r.headers.get('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
@@ -1261,7 +1261,7 @@ class TestTrustAuth(TestAuthInfo):
password=self.default_domain_user['password'],
project_id=self.default_domain_project_id)
r = self.post('/auth/tokens', body=auth_data)
- token = r.getheader('X-Subject-Token')
+ token = r.headers.get('X-Subject-Token')
r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token)
trust = self.assertValidTrustResponse(r)
@@ -1273,7 +1273,7 @@ class TestTrustAuth(TestAuthInfo):
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectTrustScopedTokenResponse(
r, trustee_user)
- token = r.getheader('X-Subject-Token')
+ token = r.headers.get('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
@@ -1302,7 +1302,7 @@ class TestTrustAuth(TestAuthInfo):
password=self.default_domain_user['password'],
project_id=self.default_domain_project_id)
r = self.post('/auth/tokens', body=auth_data)
- token = r.getheader('X-Subject-Token')
+ token = r.headers.get('X-Subject-Token')
r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token)
trust = self.assertValidTrustResponse(r)
@@ -1314,7 +1314,7 @@ class TestTrustAuth(TestAuthInfo):
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectTrustScopedTokenResponse(
r, trustee_user)
- token = r.getheader('X-Subject-Token')
+ token = r.headers.get('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
@@ -1342,16 +1342,17 @@ class TestTrustAuth(TestAuthInfo):
trust_id=trust['id'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectTrustScopedTokenResponse(r, self.trustee_user)
- self.assertEqual(r.body['token']['user']['id'],
+ self.assertEqual(r.result['token']['user']['id'],
self.trustee_user['id'])
- self.assertEqual(r.body['token']['user']['name'],
+ self.assertEqual(r.result['token']['user']['name'],
self.trustee_user['name'])
- self.assertEqual(r.body['token']['user']['domain']['id'],
+ self.assertEqual(r.result['token']['user']['domain']['id'],
self.domain['id'])
- self.assertEqual(r.body['token']['user']['domain']['name'],
+ self.assertEqual(r.result['token']['user']['domain']['name'],
self.domain['name'])
- self.assertEqual(r.body['token']['project']['id'], self.project['id'])
- self.assertEqual(r.body['token']['project']['name'],
+ self.assertEqual(r.result['token']['project']['id'],
+ self.project['id'])
+ self.assertEqual(r.result['token']['project']['name'],
self.project['name'])
def test_exercise_trust_scoped_token_with_impersonation(self):
@@ -1373,14 +1374,15 @@ class TestTrustAuth(TestAuthInfo):
trust_id=trust['id'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectTrustScopedTokenResponse(r, self.user)
- self.assertEqual(r.body['token']['user']['id'], self.user['id'])
- self.assertEqual(r.body['token']['user']['name'], self.user['name'])
- self.assertEqual(r.body['token']['user']['domain']['id'],
+ self.assertEqual(r.result['token']['user']['id'], self.user['id'])
+ self.assertEqual(r.result['token']['user']['name'], self.user['name'])
+ self.assertEqual(r.result['token']['user']['domain']['id'],
self.domain['id'])
- self.assertEqual(r.body['token']['user']['domain']['name'],
+ self.assertEqual(r.result['token']['user']['domain']['name'],
self.domain['name'])
- self.assertEqual(r.body['token']['project']['id'], self.project['id'])
- self.assertEqual(r.body['token']['project']['name'],
+ self.assertEqual(r.result['token']['project']['id'],
+ self.project['id'])
+ self.assertEqual(r.result['token']['project']['name'],
self.project['name'])
def test_delete_trust(self):
@@ -1431,12 +1433,12 @@ class TestTrustAuth(TestAuthInfo):
r = self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
self.user_id, expected_status=200)
- trusts = r.body['trusts']
+ trusts = r.result['trusts']
self.assertEqual(len(trusts), 3)
r = self.get('/OS-TRUST/trusts?trustee_user_id=%s' %
self.user_id, expected_status=200)
- trusts = r.body['trusts']
+ trusts = r.result['trusts']
self.assertEqual(len(trusts), 0)
def test_change_password_invalidates_trust_tokens(self):
@@ -1459,7 +1461,7 @@ class TestTrustAuth(TestAuthInfo):
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectTrustScopedTokenResponse(r, self.user)
- trust_token = r.getheader('X-Subject-Token')
+ trust_token = r.headers.get('X-Subject-Token')
self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
self.user_id, expected_status=200,
diff --git a/tests/test_v3_catalog.py b/tests/test_v3_catalog.py
index e81308fc..91e20454 100644
--- a/tests/test_v3_catalog.py
+++ b/tests/test_v3_catalog.py
@@ -84,7 +84,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
self.assertValidEndpointResponse(r, ref)
def assertValidErrorResponse(self, response):
- self.assertTrue(response.status in [400])
+ self.assertTrue(response.status_code in [400])
def test_create_endpoint_400(self):
"""POST /endpoints"""
@@ -135,7 +135,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
path='/v2.0/endpoints',
token=self.get_scoped_token(),
body={'endpoint': ref})
- endpoint_v2 = r.body['endpoint']
+ endpoint_v2 = r.result['endpoint']
# test the endpoint on v3
r = self.get('/endpoints')
diff --git a/tests/test_v3_identity.py b/tests/test_v3_identity.py
index 2e7ac242..95d48a20 100644
--- a/tests/test_v3_identity.py
+++ b/tests/test_v3_identity.py
@@ -402,7 +402,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
'group_id': self.group_id})
self.assertValidUserListResponse(r, ref=self.user)
self.assertIn('/groups/%(group_id)s/users' % {
- 'group_id': self.group_id}, r.body['links']['self'])
+ 'group_id': self.group_id}, r.result['links']['self'])
def test_remove_user_from_group(self):
"""DELETE /groups/{group_id}/users/{user_id}"""
@@ -450,7 +450,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
password=self.user['password'],
project_id=self.project['id'])
resp = self.post('/auth/tokens', body=auth_data)
- token = resp.getheader('X-Subject-Token')
+ token = resp.headers.get('X-Subject-Token')
# Confirm token is valid for now
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
@@ -565,14 +565,14 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=self.role)
- self.assertIn(collection_url, r.body['links']['self'])
+ self.assertIn(collection_url, r.result['links']['self'])
# FIXME(gyee): this test is no longer valid as user
# have no role in the project. Can't get a scoped token
#self.delete(member_url)
#r = self.get(collection_url)
#self.assertValidRoleListResponse(r, expected_length=0)
- #self.assertIn(collection_url, r.body['links']['self'])
+ #self.assertIn(collection_url, r.result['links']['self'])
def test_crud_user_domain_role_grants(self):
collection_url = (
@@ -587,12 +587,12 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=self.role)
- self.assertIn(collection_url, r.body['links']['self'])
+ self.assertIn(collection_url, r.result['links']['self'])
self.delete(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, expected_length=0)
- self.assertIn(collection_url, r.body['links']['self'])
+ self.assertIn(collection_url, r.result['links']['self'])
def test_crud_group_project_role_grants(self):
collection_url = (
@@ -607,12 +607,12 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=self.role)
- self.assertIn(collection_url, r.body['links']['self'])
+ self.assertIn(collection_url, r.result['links']['self'])
self.delete(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, expected_length=0)
- self.assertIn(collection_url, r.body['links']['self'])
+ self.assertIn(collection_url, r.result['links']['self'])
def test_crud_group_domain_role_grants(self):
collection_url = (
@@ -627,9 +627,9 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=self.role)
- self.assertIn(collection_url, r.body['links']['self'])
+ self.assertIn(collection_url, r.result['links']['self'])
self.delete(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, expected_length=0)
- self.assertIn(collection_url, r.body['links']['self'])
+ self.assertIn(collection_url, r.result['links']['self'])
diff --git a/tests/test_v3_protection.py b/tests/test_v3_protection.py
index 0bf26eb8..246b56dd 100644
--- a/tests/test_v3_protection.py
+++ b/tests/test_v3_protection.py
@@ -142,7 +142,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
"""
self._set_policy({"identity:list_users": []})
r = self.get('/users', auth=self.auth)
- id_list = self._get_id_list_from_ref_list(r.body.get('users'))
+ id_list = self._get_id_list_from_ref_list(r.result.get('users'))
self.assertIn(self.user1['id'], id_list)
self.assertIn(self.user2['id'], id_list)
self.assertIn(self.user3['id'], id_list)
@@ -160,7 +160,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
url_by_name = '/users?domain_id=%s' % self.domainB['id']
r = self.get(url_by_name, auth=self.auth)
# We should get back two users, those in DomainB
- id_list = self._get_id_list_from_ref_list(r.body.get('users'))
+ id_list = self._get_id_list_from_ref_list(r.result.get('users'))
self.assertIn(self.user2['id'], id_list)
self.assertIn(self.user3['id'], id_list)
@@ -181,8 +181,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
self._set_policy(new_policy)
url_by_name = '/users/%s' % self.user1['id']
r = self.get(url_by_name, auth=self.auth)
- body = r.body
- self.assertEquals(self.user1['id'], body['user']['id'])
+ self.assertEquals(self.user1['id'], r.result['user']['id'])
def test_list_users_protected_by_domain(self):
"""GET /users?domain_id=mydomain (protected)
@@ -205,7 +204,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
url_by_name = '/users?domain_id=%s' % self.domainA['id']
r = self.get(url_by_name, auth=self.auth)
# We should only get back one user, the one in DomainA
- id_list = self._get_id_list_from_ref_list(r.body.get('users'))
+ id_list = self._get_id_list_from_ref_list(r.result.get('users'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.user1['id'], id_list)
@@ -234,7 +233,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
url_by_name = '/groups?domain_id=%s' % self.domainA['id']
r = self.get(url_by_name, auth=self.auth)
# We should only get back two groups, the ones in DomainA
- id_list = self._get_id_list_from_ref_list(r.body.get('groups'))
+ id_list = self._get_id_list_from_ref_list(r.result.get('groups'))
self.assertEqual(len(id_list), 2)
self.assertIn(self.group1['id'], id_list)
self.assertIn(self.group2['id'], id_list)
@@ -266,7 +265,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
r = self.get(url_by_name, auth=self.auth)
# We should only get back one user, the one in DomainA that matches
# the name supplied
- id_list = self._get_id_list_from_ref_list(r.body.get('groups'))
+ id_list = self._get_id_list_from_ref_list(r.result.get('groups'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.group2['id'], id_list)
@@ -285,21 +284,21 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
new_policy = {"identity:list_domains": []}
self._set_policy(new_policy)
r = self.get('/domains?enabled=0', auth=self.auth)
- id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
+ id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.domainC['id'], id_list)
# Now try a few ways of specifying 'true' when we should get back
# the other two domains, plus the default domain
r = self.get('/domains?enabled=1', auth=self.auth)
- id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
+ id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
self.assertEqual(len(id_list), 3)
self.assertIn(self.domainA['id'], id_list)
self.assertIn(self.domainB['id'], id_list)
self.assertIn(DEFAULT_DOMAIN_ID, id_list)
r = self.get('/domains?enabled', auth=self.auth)
- id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
+ id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
self.assertEqual(len(id_list), 3)
self.assertIn(self.domainA['id'], id_list)
self.assertIn(self.domainB['id'], id_list)
@@ -319,6 +318,6 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
my_url = '/domains?enableds&name=%s' % self.domainA['name']
r = self.get(my_url, auth=self.auth)
- id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
+ id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.domainA['id'], id_list)