diff options
Diffstat (limited to 'keystone/tests/test_content_types.py')
-rw-r--r-- | keystone/tests/test_content_types.py | 1104 |
1 files changed, 1104 insertions, 0 deletions
diff --git a/keystone/tests/test_content_types.py b/keystone/tests/test_content_types.py new file mode 100644 index 00000000..7c874732 --- /dev/null +++ b/keystone/tests/test_content_types.py @@ -0,0 +1,1104 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import io +import uuid + +from lxml import etree +import webtest + +from keystone.tests import core as test + +from keystone.common import extension +from keystone.common import serializer +from keystone.openstack.common import jsonutils + +import default_fixtures + + +class RestfulTestCase(test.TestCase): + """Performs restful tests against the WSGI app over HTTP. + + This class launches public & admin WSGI servers for every test, which can + be accessed by calling ``public_request()`` or ``admin_request()``, + respectfully. + + ``restful_request()`` and ``request()`` methods are also exposed if you + need to bypass restful conventions or access HTTP details in your test + implementation. + + Three new asserts are provided: + + * ``assertResponseSuccessful``: called automatically for every request + unless an ``expected_status`` is provided + * ``assertResponseStatus``: called instead of ``assertResponseSuccessful``, + if an ``expected_status`` is provided + * ``assertValidResponseHeaders``: validates that the response headers + appear as expected + + Requests are automatically serialized according to the defined + ``content_type``. Responses are automatically deserialized as well, and + available in the ``response.body`` attribute. The original body content is + available in the ``response.raw`` attribute. + + """ + + # default content type to test + content_type = 'json' + + def setUp(self): + super(RestfulTestCase, self).setUp() + + self.load_backends() + self.load_fixtures(default_fixtures) + + self.public_app = webtest.TestApp( + self.loadapp('keystone', name='main')) + self.admin_app = webtest.TestApp( + self.loadapp('keystone', name='admin')) + + # TODO(termie): add an admin user to the fixtures and use that user + # override the fixtures, for now + self.metadata_foobar = self.identity_api.add_role_to_user_and_project( + self.user_foo['id'], + self.tenant_bar['id'], + self.role_admin['id']) + + def tearDown(self): + """Kill running servers and release references to avoid leaks.""" + self.public_app = None + self.admin_app = None + super(RestfulTestCase, self).tearDown() + + def request(self, app, path, body=None, headers=None, token=None, + expected_status=None, **kwargs): + if headers: + headers = dict([(str(k), str(v)) for k, v in headers.iteritems()]) + else: + headers = {} + + if token: + headers['X-Auth-Token'] = str(token) + + # setting body this way because of: + # https://github.com/Pylons/webtest/issues/71 + if body: + kwargs['body_file'] = io.BytesIO(body) + + # sets environ['REMOTE_ADDR'] + kwargs.setdefault('remote_addr', 'localhost') + + response = app.request(path, headers=headers, + status=expected_status, **kwargs) + + return response + + def assertResponseSuccessful(self, response): + """Asserts that a status code lies inside the 2xx range. + + :param response: :py:class:`httplib.HTTPResponse` to be + verified to have a status code between 200 and 299. + + example:: + + self.assertResponseSuccessful(response, 203) + """ + self.assertTrue( + response.status_code >= 200 and response.status_code <= 299, + 'Status code %d is outside of the expected range (2xx)\n\n%s' % + (response.status, response.body)) + + def assertResponseStatus(self, response, expected_status): + """Asserts a specific status code on the response. + + :param response: :py:class:`httplib.HTTPResponse` + :param assert_status: The specific ``status`` result expected + + example:: + + self.assertResponseStatus(response, 203) + """ + self.assertEqual( + response.status_code, + expected_status, + 'Status code %s is not %s, as expected)\n\n%s' % + (response.status_code, expected_status, response.body)) + + def assertValidResponseHeaders(self, response): + """Ensures that response headers appear as expected.""" + self.assertIn('X-Auth-Token', response.headers.get('Vary')) + + def _to_content_type(self, body, headers, content_type=None): + """Attempt to encode JSON and XML automatically.""" + content_type = content_type or self.content_type + + if content_type == 'json': + headers['Accept'] = 'application/json' + if body: + headers['Content-Type'] = 'application/json' + return jsonutils.dumps(body) + elif content_type == 'xml': + headers['Accept'] = 'application/xml' + if body: + headers['Content-Type'] = 'application/xml' + return serializer.to_xml(body) + + def _from_content_type(self, response, content_type=None): + """Attempt to decode JSON and XML automatically, if detected.""" + content_type = content_type or self.content_type + + if response.body is not None and response.body.strip(): + # if a body is provided, a Content-Type is also expected + header = response.headers.get('Content-Type', None) + self.assertIn(content_type, header) + + if content_type == 'json': + response.result = jsonutils.loads(response.body) + elif content_type == 'xml': + response.result = etree.fromstring(response.body) + + def restful_request(self, method='GET', headers=None, body=None, + content_type=None, **kwargs): + """Serializes/deserializes json/xml as request/response body. + + .. WARNING:: + + * Existing Accept header will be overwritten. + * Existing Content-Type header will be overwritten. + + """ + # Initialize headers dictionary + headers = {} if not headers else headers + + body = self._to_content_type(body, headers, content_type) + + # Perform the HTTP request/response + response = self.request(method=method, headers=headers, body=body, + **kwargs) + + self._from_content_type(response, content_type) + + # we can save some code & improve coverage by always doing this + if method != 'HEAD' and response.status_code >= 400: + self.assertValidErrorResponse(response) + + # Contains the decoded response.body + return response + + def _request(self, convert=True, **kwargs): + if convert: + response = self.restful_request(**kwargs) + else: + response = self.request(**kwargs) + + self.assertValidResponseHeaders(response) + return response + + def public_request(self, **kwargs): + return self._request(app=self.public_app, **kwargs) + + def admin_request(self, **kwargs): + return self._request(app=self.admin_app, **kwargs) + + def _get_token(self, body): + """Convenience method so that we can test authenticated requests.""" + r = self.public_request(method='POST', path='/v2.0/tokens', body=body) + return self._get_token_id(r) + + def get_unscoped_token(self): + """Convenience method so that we can test authenticated requests.""" + return self._get_token({ + 'auth': { + 'passwordCredentials': { + 'username': self.user_foo['name'], + 'password': self.user_foo['password'], + }, + }, + }) + + def get_scoped_token(self, tenant_id=None): + """Convenience method so that we can test authenticated requests.""" + if not tenant_id: + tenant_id = self.tenant_bar['id'] + return self._get_token({ + 'auth': { + 'passwordCredentials': { + 'username': self.user_foo['name'], + 'password': self.user_foo['password'], + }, + 'tenantId': tenant_id, + }, + }) + + def _get_token_id(self, r): + """Helper method to return a token ID from a response. + + This needs to be overridden by child classes for on their content type. + + """ + raise NotImplementedError() + + +class CoreApiTests(object): + def assertValidError(self, error): + """Applicable to XML and JSON.""" + self.assertIsNotNone(error.get('code')) + self.assertIsNotNone(error.get('title')) + self.assertIsNotNone(error.get('message')) + + def assertValidVersion(self, version): + """Applicable to XML and JSON. + + However, navigating links and media-types differs between content + types so they need to be validated separately. + + """ + self.assertIsNotNone(version) + self.assertIsNotNone(version.get('id')) + self.assertIsNotNone(version.get('status')) + self.assertIsNotNone(version.get('updated')) + + def assertValidExtension(self, extension): + """Applicable to XML and JSON. + + However, navigating extension links differs between content types. + They need to be validated separately with assertValidExtensionLink. + + """ + self.assertIsNotNone(extension) + self.assertIsNotNone(extension.get('name')) + self.assertIsNotNone(extension.get('namespace')) + self.assertIsNotNone(extension.get('alias')) + self.assertIsNotNone(extension.get('updated')) + + def assertValidExtensionLink(self, link): + """Applicable to XML and JSON.""" + self.assertIsNotNone(link.get('rel')) + self.assertIsNotNone(link.get('type')) + self.assertIsNotNone(link.get('href')) + + def assertValidTenant(self, tenant): + """Applicable to XML and JSON.""" + self.assertIsNotNone(tenant.get('id')) + self.assertIsNotNone(tenant.get('name')) + + def assertValidUser(self, user): + """Applicable to XML and JSON.""" + self.assertIsNotNone(user.get('id')) + self.assertIsNotNone(user.get('name')) + + def assertValidRole(self, tenant): + """Applicable to XML and JSON.""" + self.assertIsNotNone(tenant.get('id')) + self.assertIsNotNone(tenant.get('name')) + + def test_public_not_found(self): + r = self.public_request( + path='/%s' % uuid.uuid4().hex, + expected_status=404) + self.assertValidErrorResponse(r) + + def test_admin_not_found(self): + r = self.admin_request( + path='/%s' % uuid.uuid4().hex, + expected_status=404) + self.assertValidErrorResponse(r) + + def test_public_multiple_choice(self): + r = self.public_request(path='/', expected_status=300) + self.assertValidMultipleChoiceResponse(r) + + def test_admin_multiple_choice(self): + r = self.admin_request(path='/', expected_status=300) + self.assertValidMultipleChoiceResponse(r) + + def test_public_version(self): + r = self.public_request(path='/v2.0/') + self.assertValidVersionResponse(r) + + def test_admin_version(self): + r = self.admin_request(path='/v2.0/') + self.assertValidVersionResponse(r) + + def test_public_extensions(self): + r = self.public_request(path='/v2.0/extensions') + self.assertValidExtensionListResponse(r, + extension.PUBLIC_EXTENSIONS) + + def test_admin_extensions(self): + r = self.admin_request(path='/v2.0/extensions') + self.assertValidExtensionListResponse(r, + extension.ADMIN_EXTENSIONS) + + def test_admin_extensions_404(self): + self.admin_request(path='/v2.0/extensions/invalid-extension', + expected_status=404) + + def test_public_osksadm_extension_404(self): + self.public_request(path='/v2.0/extensions/OS-KSADM', + expected_status=404) + + def test_admin_osksadm_extension(self): + r = self.admin_request(path='/v2.0/extensions/OS-KSADM') + self.assertValidExtensionResponse(r, + extension.ADMIN_EXTENSIONS) + + def test_authenticate(self): + r = self.public_request( + method='POST', + path='/v2.0/tokens', + body={ + 'auth': { + 'passwordCredentials': { + 'username': self.user_foo['name'], + 'password': self.user_foo['password'], + }, + 'tenantId': self.tenant_bar['id'], + }, + }, + expected_status=200) + self.assertValidAuthenticationResponse(r, require_service_catalog=True) + + def test_authenticate_unscoped(self): + r = self.public_request( + method='POST', + path='/v2.0/tokens', + body={ + 'auth': { + 'passwordCredentials': { + 'username': self.user_foo['name'], + 'password': self.user_foo['password'], + }, + }, + }, + expected_status=200) + self.assertValidAuthenticationResponse(r) + + def test_get_tenants_for_token(self): + r = self.public_request(path='/v2.0/tenants', + token=self.get_scoped_token()) + self.assertValidTenantListResponse(r) + + def test_validate_token(self): + token = self.get_scoped_token() + r = self.admin_request( + path='/v2.0/tokens/%(token_id)s' % { + 'token_id': token, + }, + token=token) + self.assertValidAuthenticationResponse(r) + + def test_validate_token_service_role(self): + self.metadata_foobar = self.identity_api.add_role_to_user_and_project( + self.user_foo['id'], + self.tenant_service['id'], + self.role_service['id']) + + token = self.get_scoped_token(tenant_id='service') + r = self.admin_request( + path='/v2.0/tokens/%s' % token, + token=token) + self.assertValidAuthenticationResponse(r) + + def test_validate_token_belongs_to(self): + token = self.get_scoped_token() + path = ('/v2.0/tokens/%s?belongsTo=%s' % (token, + self.tenant_bar['id'])) + r = self.admin_request(path=path, token=token) + self.assertValidAuthenticationResponse(r, require_service_catalog=True) + + def test_validate_token_no_belongs_to_still_returns_catalog(self): + token = self.get_scoped_token() + path = ('/v2.0/tokens/%s' % token) + r = self.admin_request(path=path, token=token) + self.assertValidAuthenticationResponse(r, require_service_catalog=True) + + def test_validate_token_head(self): + """The same call as above, except using HEAD. + + There's no response to validate here, but this is included for the + sake of completely covering the core API. + + """ + token = self.get_scoped_token() + self.admin_request( + method='HEAD', + path='/v2.0/tokens/%(token_id)s' % { + 'token_id': token, + }, + token=token, + expected_status=204) + + def test_endpoints(self): + token = self.get_scoped_token() + r = self.admin_request( + path='/v2.0/tokens/%(token_id)s/endpoints' % { + 'token_id': token, + }, + token=token) + self.assertValidEndpointListResponse(r) + + def test_get_tenant(self): + token = self.get_scoped_token() + r = self.admin_request( + path='/v2.0/tenants/%(tenant_id)s' % { + 'tenant_id': self.tenant_bar['id'], + }, + token=token) + self.assertValidTenantResponse(r) + + def test_get_tenant_by_name(self): + token = self.get_scoped_token() + r = self.admin_request( + path='/v2.0/tenants?name=%(tenant_name)s' % { + 'tenant_name': self.tenant_bar['name'], + }, + token=token) + self.assertValidTenantResponse(r) + + def test_get_user_roles(self): + self.skipTest('Blocked by bug 933565') + + token = self.get_scoped_token() + r = self.admin_request( + path='/v2.0/users/%(user_id)s/roles' % { + 'user_id': self.user_foo['id'], + }, + token=token) + self.assertValidRoleListResponse(r) + + def test_get_user_roles_with_tenant(self): + token = self.get_scoped_token() + r = self.admin_request( + path='/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % { + 'tenant_id': self.tenant_bar['id'], + 'user_id': self.user_foo['id'], + }, + token=token) + self.assertValidRoleListResponse(r) + + def test_get_user(self): + token = self.get_scoped_token() + r = self.admin_request( + path='/v2.0/users/%(user_id)s' % { + 'user_id': self.user_foo['id'], + }, + token=token) + self.assertValidUserResponse(r) + + def test_get_user_by_name(self): + token = self.get_scoped_token() + r = self.admin_request( + path='/v2.0/users?name=%(user_name)s' % { + 'user_name': self.user_foo['name'], + }, + token=token) + self.assertValidUserResponse(r) + + def test_create_update_user_invalid_enabled_type(self): + # Enforce usage of boolean for 'enabled' field in JSON and XML + token = self.get_scoped_token() + + # Test CREATE request + r = self.admin_request( + method='POST', + path='/v2.0/users', + body={ + 'user': { + 'name': uuid.uuid4().hex, + 'password': uuid.uuid4().hex, + # In XML, only "true|false" are converted to boolean. + 'enabled': "False", + }, + }, + token=token, + expected_status=400) + self.assertValidErrorResponse(r) + + r = self.admin_request( + method='POST', + path='/v2.0/users', + body={ + 'user': { + 'name': uuid.uuid4().hex, + 'password': uuid.uuid4().hex, + # In JSON, 0|1 are not booleans + 'enabled': 0, + }, + }, + token=token, + expected_status=400) + self.assertValidErrorResponse(r) + + # Test UPDATE request + path = '/v2.0/users/%(user_id)s' % { + 'user_id': self.user_foo['id'], + } + + r = self.admin_request( + method='PUT', + path=path, + body={ + 'user': { + # In XML, only "true|false" are converted to boolean. + 'enabled': "False", + }, + }, + token=token, + expected_status=400) + self.assertValidErrorResponse(r) + + r = self.admin_request( + method='PUT', + path=path, + body={ + 'user': { + # In JSON, 0|1 are not booleans + 'enabled': 1, + }, + }, + token=token, + expected_status=400) + self.assertValidErrorResponse(r) + + def test_error_response(self): + """This triggers assertValidErrorResponse by convention.""" + self.public_request(path='/v2.0/tenants', expected_status=401) + + def test_invalid_parameter_error_response(self): + token = self.get_scoped_token() + bad_body = { + 'OS-KSADM:service%s' % uuid.uuid4().hex: { + 'name': uuid.uuid4().hex, + 'type': uuid.uuid4().hex, + }, + } + res = self.admin_request(method='POST', + path='/v2.0/OS-KSADM/services', + body=bad_body, + token=token, + expected_status=400) + self.assertValidErrorResponse(res) + res = self.admin_request(method='POST', + path='/v2.0/users', + body=bad_body, + token=token, + expected_status=400) + self.assertValidErrorResponse(res) + + +class JsonTestCase(RestfulTestCase, CoreApiTests): + content_type = 'json' + + def _get_token_id(self, r): + """Applicable only to JSON.""" + return r.result['access']['token']['id'] + + def assertValidErrorResponse(self, r): + self.assertIsNotNone(r.result.get('error')) + self.assertValidError(r.result['error']) + self.assertEqual(r.result['error']['code'], r.status_code) + + def assertValidExtension(self, extension, expected): + super(JsonTestCase, self).assertValidExtension(extension) + descriptions = [ext['description'] for ext in expected.itervalues()] + description = extension.get('description') + self.assertIsNotNone(description) + self.assertIn(description, descriptions) + self.assertIsNotNone(extension.get('links')) + self.assertNotEmpty(extension.get('links')) + for link in extension.get('links'): + self.assertValidExtensionLink(link) + + def assertValidExtensionListResponse(self, r, expected): + self.assertIsNotNone(r.result.get('extensions')) + self.assertIsNotNone(r.result['extensions'].get('values')) + self.assertNotEmpty(r.result['extensions'].get('values')) + for extension in r.result['extensions']['values']: + self.assertValidExtension(extension, expected) + + def assertValidExtensionResponse(self, r, expected): + self.assertValidExtension(r.result.get('extension'), expected) + + def assertValidAuthenticationResponse(self, r, + require_service_catalog=False): + self.assertIsNotNone(r.result.get('access')) + self.assertIsNotNone(r.result['access'].get('token')) + self.assertIsNotNone(r.result['access'].get('user')) + + # validate token + self.assertIsNotNone(r.result['access']['token'].get('id')) + self.assertIsNotNone(r.result['access']['token'].get('expires')) + tenant = r.result['access']['token'].get('tenant') + if tenant is not None: + # validate tenant + self.assertIsNotNone(tenant.get('id')) + self.assertIsNotNone(tenant.get('name')) + + # validate user + self.assertIsNotNone(r.result['access']['user'].get('id')) + self.assertIsNotNone(r.result['access']['user'].get('name')) + + if require_service_catalog: + # roles are only provided with a service catalog + roles = r.result['access']['user'].get('roles') + self.assertNotEmpty(roles) + for role in roles: + self.assertIsNotNone(role.get('name')) + + serviceCatalog = r.result['access'].get('serviceCatalog') + # validate service catalog + if require_service_catalog: + self.assertIsNotNone(serviceCatalog) + if serviceCatalog is not None: + self.assertTrue(isinstance(serviceCatalog, list)) + if require_service_catalog: + self.assertNotEmpty(serviceCatalog) + for service in r.result['access']['serviceCatalog']: + # validate service + self.assertIsNotNone(service.get('name')) + self.assertIsNotNone(service.get('type')) + + # services contain at least one endpoint + self.assertIsNotNone(service.get('endpoints')) + self.assertNotEmpty(service['endpoints']) + for endpoint in service['endpoints']: + # validate service endpoint + self.assertIsNotNone(endpoint.get('publicURL')) + + def assertValidTenantListResponse(self, r): + self.assertIsNotNone(r.result.get('tenants')) + self.assertNotEmpty(r.result['tenants']) + for tenant in r.result['tenants']: + self.assertValidTenant(tenant) + self.assertIsNotNone(tenant.get('enabled')) + self.assertIn(tenant.get('enabled'), [True, False]) + + def assertValidUserResponse(self, r): + self.assertIsNotNone(r.result.get('user')) + self.assertValidUser(r.result['user']) + + def assertValidTenantResponse(self, r): + self.assertIsNotNone(r.result.get('tenant')) + self.assertValidTenant(r.result['tenant']) + + def assertValidRoleListResponse(self, r): + self.assertIsNotNone(r.result.get('roles')) + self.assertNotEmpty(r.result['roles']) + for role in r.result['roles']: + self.assertValidRole(role) + + def assertValidVersion(self, version): + super(JsonTestCase, self).assertValidVersion(version) + + self.assertIsNotNone(version.get('links')) + self.assertNotEmpty(version.get('links')) + for link in version.get('links'): + self.assertIsNotNone(link.get('rel')) + self.assertIsNotNone(link.get('href')) + + self.assertIsNotNone(version.get('media-types')) + self.assertNotEmpty(version.get('media-types')) + for media in version.get('media-types'): + self.assertIsNotNone(media.get('base')) + self.assertIsNotNone(media.get('type')) + + def assertValidMultipleChoiceResponse(self, r): + self.assertIsNotNone(r.result.get('versions')) + self.assertIsNotNone(r.result['versions'].get('values')) + self.assertNotEmpty(r.result['versions']['values']) + for version in r.result['versions']['values']: + self.assertValidVersion(version) + + def assertValidVersionResponse(self, r): + self.assertValidVersion(r.result.get('version')) + + def assertValidEndpointListResponse(self, r): + self.assertIsNotNone(r.result.get('endpoints')) + self.assertNotEmpty(r.result['endpoints']) + for endpoint in r.result['endpoints']: + self.assertIsNotNone(endpoint.get('id')) + self.assertIsNotNone(endpoint.get('name')) + self.assertIsNotNone(endpoint.get('type')) + self.assertIsNotNone(endpoint.get('publicURL')) + self.assertIsNotNone(endpoint.get('internalURL')) + self.assertIsNotNone(endpoint.get('adminURL')) + + def test_service_crud_requires_auth(self): + """Service CRUD should 401 without an X-Auth-Token (bug 1006822).""" + # values here don't matter because we should 401 before they're checked + service_path = '/v2.0/OS-KSADM/services/%s' % uuid.uuid4().hex + service_body = { + 'OS-KSADM:service': { + 'name': uuid.uuid4().hex, + 'type': uuid.uuid4().hex, + }, + } + + r = self.admin_request(method='GET', + path='/v2.0/OS-KSADM/services', + expected_status=401) + self.assertValidErrorResponse(r) + + r = self.admin_request(method='POST', + path='/v2.0/OS-KSADM/services', + body=service_body, + expected_status=401) + self.assertValidErrorResponse(r) + + r = self.admin_request(method='GET', + path=service_path, + expected_status=401) + self.assertValidErrorResponse(r) + + r = self.admin_request(method='DELETE', + path=service_path, + expected_status=401) + self.assertValidErrorResponse(r) + + def test_user_role_list_requires_auth(self): + """User role list should 401 without an X-Auth-Token (bug 1006815).""" + # values here don't matter because we should 401 before they're checked + path = '/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % { + 'tenant_id': uuid.uuid4().hex, + 'user_id': uuid.uuid4().hex, + } + + r = self.admin_request(path=path, expected_status=401) + self.assertValidErrorResponse(r) + + def test_fetch_revocation_list_nonadmin_fails(self): + self.admin_request( + method='GET', + path='/v2.0/tokens/revoked', + expected_status=401) + + def test_fetch_revocation_list_admin_200(self): + token = self.get_scoped_token() + r = self.admin_request( + method='GET', + path='/v2.0/tokens/revoked', + token=token, + expected_status=200) + self.assertValidRevocationListResponse(r) + + def assertValidRevocationListResponse(self, response): + self.assertIsNotNone(response.result['signed']) + + def test_create_update_user_json_invalid_enabled_type(self): + # Enforce usage of boolean for 'enabled' field in JSON + token = self.get_scoped_token() + + # Test CREATE request + r = self.admin_request( + method='POST', + path='/v2.0/users', + body={ + 'user': { + 'name': uuid.uuid4().hex, + 'password': uuid.uuid4().hex, + # In JSON, "true|false" are not boolean + 'enabled': "true", + }, + }, + token=token, + expected_status=400) + self.assertValidErrorResponse(r) + + # Test UPDATE request + r = self.admin_request( + method='PUT', + path='/v2.0/users/%(user_id)s' % { + 'user_id': self.user_foo['id'], + }, + body={ + 'user': { + # In JSON, "true|false" are not boolean + 'enabled': "true", + }, + }, + token=token, + expected_status=400) + self.assertValidErrorResponse(r) + + +class XmlTestCase(RestfulTestCase, CoreApiTests): + xmlns = 'http://docs.openstack.org/identity/api/v2.0' + content_type = 'xml' + + def _get_token_id(self, r): + return r.result.find(self._tag('token')).get('id') + + def _tag(self, tag_name, xmlns=None): + """Helper method to build an namespaced element name.""" + return '{%(ns)s}%(tag)s' % {'ns': xmlns or self.xmlns, 'tag': tag_name} + + def assertValidErrorResponse(self, r): + xml = r.result + self.assertEqual(xml.tag, self._tag('error')) + + self.assertValidError(xml) + self.assertEqual(xml.get('code'), str(r.status_code)) + + def assertValidExtension(self, extension, expected): + super(XmlTestCase, self).assertValidExtension(extension) + + self.assertIsNotNone(extension.find(self._tag('description'))) + self.assertTrue(extension.find(self._tag('description')).text) + links = extension.find(self._tag('links')) + self.assertNotEmpty(links.findall(self._tag('link'))) + descriptions = [ext['description'] for ext in expected.itervalues()] + description = extension.find(self._tag('description')).text + self.assertIn(description, descriptions) + for link in links.findall(self._tag('link')): + self.assertValidExtensionLink(link) + + def assertValidExtensionListResponse(self, r, expected): + xml = r.result + self.assertEqual(xml.tag, self._tag('extensions')) + self.assertNotEmpty(xml.findall(self._tag('extension'))) + for ext in xml.findall(self._tag('extension')): + self.assertValidExtension(ext, expected) + + def assertValidExtensionResponse(self, r, expected): + xml = r.result + self.assertEqual(xml.tag, self._tag('extension')) + + self.assertValidExtension(xml, expected) + + def assertValidVersion(self, version): + super(XmlTestCase, self).assertValidVersion(version) + + links = version.find(self._tag('links')) + self.assertIsNotNone(links) + self.assertNotEmpty(links.findall(self._tag('link'))) + for link in links.findall(self._tag('link')): + self.assertIsNotNone(link.get('rel')) + self.assertIsNotNone(link.get('href')) + + media_types = version.find(self._tag('media-types')) + self.assertIsNotNone(media_types) + self.assertNotEmpty(media_types.findall(self._tag('media-type'))) + for media in media_types.findall(self._tag('media-type')): + self.assertIsNotNone(media.get('base')) + self.assertIsNotNone(media.get('type')) + + def assertValidMultipleChoiceResponse(self, r): + xml = r.result + self.assertEqual(xml.tag, self._tag('versions')) + + self.assertNotEmpty(xml.findall(self._tag('version'))) + for version in xml.findall(self._tag('version')): + self.assertValidVersion(version) + + def assertValidVersionResponse(self, r): + xml = r.result + self.assertEqual(xml.tag, self._tag('version')) + + self.assertValidVersion(xml) + + def assertValidEndpointListResponse(self, r): + xml = r.result + self.assertEqual(xml.tag, self._tag('endpoints')) + + self.assertNotEmpty(xml.findall(self._tag('endpoint'))) + for endpoint in xml.findall(self._tag('endpoint')): + self.assertIsNotNone(endpoint.get('id')) + self.assertIsNotNone(endpoint.get('name')) + self.assertIsNotNone(endpoint.get('type')) + self.assertIsNotNone(endpoint.get('publicURL')) + self.assertIsNotNone(endpoint.get('internalURL')) + self.assertIsNotNone(endpoint.get('adminURL')) + + def assertValidTenantResponse(self, r): + xml = r.result + self.assertEqual(xml.tag, self._tag('tenant')) + + self.assertValidTenant(xml) + + def assertValidUserResponse(self, r): + xml = r.result + self.assertEqual(xml.tag, self._tag('user')) + + self.assertValidUser(xml) + + def assertValidRoleListResponse(self, r): + xml = r.result + self.assertEqual(xml.tag, self._tag('roles')) + + self.assertNotEmpty(r.result.findall(self._tag('role'))) + for role in r.result.findall(self._tag('role')): + self.assertValidRole(role) + + def assertValidAuthenticationResponse(self, r, + require_service_catalog=False): + xml = r.result + self.assertEqual(xml.tag, self._tag('access')) + + # validate token + token = xml.find(self._tag('token')) + self.assertIsNotNone(token) + self.assertIsNotNone(token.get('id')) + self.assertIsNotNone(token.get('expires')) + tenant = token.find(self._tag('tenant')) + if tenant is not None: + # validate tenant + self.assertValidTenant(tenant) + self.assertIn(tenant.get('enabled'), ['true', 'false']) + + user = xml.find(self._tag('user')) + self.assertIsNotNone(user) + self.assertIsNotNone(user.get('id')) + self.assertIsNotNone(user.get('name')) + + if require_service_catalog: + # roles are only provided with a service catalog + roles = user.findall(self._tag('role')) + self.assertNotEmpty(roles) + for role in roles: + self.assertIsNotNone(role.get('name')) + + serviceCatalog = xml.find(self._tag('serviceCatalog')) + # validate the serviceCatalog + if require_service_catalog: + self.assertIsNotNone(serviceCatalog) + if serviceCatalog is not None: + services = serviceCatalog.findall(self._tag('service')) + if require_service_catalog: + self.assertNotEmpty(services) + for service in services: + # validate service + self.assertIsNotNone(service.get('name')) + self.assertIsNotNone(service.get('type')) + + # services contain at least one endpoint + endpoints = service.findall(self._tag('endpoint')) + self.assertNotEmpty(endpoints) + for endpoint in endpoints: + # validate service endpoint + self.assertIsNotNone(endpoint.get('publicURL')) + + def assertValidTenantListResponse(self, r): + xml = r.result + self.assertEqual(xml.tag, self._tag('tenants')) + + self.assertNotEmpty(r.result) + for tenant in r.result.findall(self._tag('tenant')): + self.assertValidTenant(tenant) + self.assertIn(tenant.get('enabled'), ['true', 'false']) + + def test_authenticate_with_invalid_xml_in_password(self): + # public_request would auto escape the ampersand + self.public_request( + method='POST', + path='/v2.0/tokens', + headers={ + 'Content-Type': 'application/xml' + }, + body=""" + <?xml version="1.0" encoding="UTF-8"?> + <auth xmlns="http://docs.openstack.org/identity/api/v2.0" + tenantId="bar"> + <passwordCredentials username="FOO" password="&"/> + </auth> + """, + expected_status=400, + convert=False) + + def test_add_tenant_xml(self): + """Create a tenant without providing description field.""" + token = self.get_scoped_token() + r = self.admin_request( + method='POST', + path='/v2.0/tenants', + headers={ + 'Content-Type': 'application/xml', + 'X-Auth-Token': token + }, + body=""" + <?xml version="1.0" encoding="UTF-8"?> + <tenant xmlns="http://docs.openstack.org/identity/api/v2.0" + enabled="true" name="ACME Corp"> + <description></description> + </tenant> + """, + convert=False) + self._from_content_type(r, 'json') + self.assertIsNotNone(r.result.get('tenant')) + self.assertValidTenant(r.result['tenant']) + self.assertEqual(r.result['tenant'].get('description'), "") + + def test_add_tenant_json(self): + """Create a tenant without providing description field.""" + token = self.get_scoped_token() + r = self.admin_request( + method='POST', + path='/v2.0/tenants', + headers={ + 'Content-Type': 'application/json', + 'X-Auth-Token': token + }, + body=""" + {"tenant":{ + "name":"test1", + "description":"", + "enabled":true} + } + """, + convert=False) + self._from_content_type(r, 'json') + self.assertIsNotNone(r.result.get('tenant')) + self.assertValidTenant(r.result['tenant']) + self.assertEqual(r.result['tenant'].get('description'), "") + + def test_create_project_invalid_enabled_type_string(self): + # Forbidden usage of string for 'enabled' field in JSON and XML + token = self.get_scoped_token() + + r = self.admin_request( + method='POST', + path='/v2.0/tenants', + body={ + 'tenant': { + 'name': uuid.uuid4().hex, + # In XML, only "true|false" are converted to boolean. + 'enabled': "False", + }, + }, + token=token, + expected_status=400) + self.assertValidErrorResponse(r) + + def test_update_project_invalid_enabled_type_string(self): + # Forbidden usage of string for 'enabled' field in JSON and XML + token = self.get_scoped_token() + + path = '/v2.0/tenants/%(tenant_id)s' % { + 'tenant_id': self.tenant_bar['id'], + } + + r = self.admin_request( + method='PUT', + path=path, + body={ + 'tenant': { + # In XML, only "true|false" are converted to boolean. + 'enabled': "False", + }, + }, + token=token, + expected_status=400) + self.assertValidErrorResponse(r) |