diff options
author | Dolph Mathews <dolph.mathews@gmail.com> | 2013-02-26 19:46:03 -0600 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-03-05 19:36:32 +0000 |
commit | c0f017ce861d1532f790b18a137b491440ffa83b (patch) | |
tree | 84f4372d7184a4034c5d7ef783ead38a05ca4ff8 /tests/test_v3.py | |
parent | 601eeb50b60a2e99041690fe19238202bc203503 (diff) | |
download | keystone-c0f017ce861d1532f790b18a137b491440ffa83b.tar.gz keystone-c0f017ce861d1532f790b18a137b491440ffa83b.tar.xz keystone-c0f017ce861d1532f790b18a137b491440ffa83b.zip |
Expand v3 trust test coverage
Change-Id: Ie697e19c2fa0e4ac203ff24aa9efa23c25ea6ccd
Diffstat (limited to 'tests/test_v3.py')
-rw-r--r-- | tests/test_v3.py | 476 |
1 files changed, 465 insertions, 11 deletions
diff --git a/tests/test_v3.py b/tests/test_v3.py index 2aa2c2b5..39807cec 100644 --- a/tests/test_v3.py +++ b/tests/test_v3.py @@ -1,20 +1,23 @@ +import datetime import uuid from lxml import etree +from keystone import auth from keystone.common import serializer from keystone.common.sql import util as sql_util -from keystone import auth -from keystone import test from keystone import config +from keystone.openstack.common import timeutils from keystone.policy.backends import rules - +from keystone import test import test_content_types CONF = config.CONF +TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' + class RestfulTestCase(test_content_types.RestfulTestCase): def setUp(self): @@ -133,6 +136,38 @@ class RestfulTestCase(test_content_types.RestfulTestCase): ref['type'] = uuid.uuid4().hex return ref + def new_trust_ref(self, trustor_user_id, trustee_user_id, project_id=None, + impersonation=None, expires=None, role_ids=None, + role_names=None): + ref = self.new_ref() + + ref['trustor_user_id'] = trustor_user_id + ref['trustee_user_id'] = trustee_user_id + ref['impersonation'] = impersonation or False + ref['project_id'] = project_id + + if isinstance(expires, basestring): + ref['expires_at'] = expires + elif isinstance(expires, dict): + ref['expires_at'] = timeutils.strtime( + timeutils.utcnow() + datetime.timedelta(**expires), + fmt=TIME_FORMAT) + elif expires is None: + pass + else: + raise NotImplementedError('Unexpected value for "expires"') + + role_ids = role_ids or [] + role_names = role_names or [] + if role_ids or role_names: + ref['roles'] = [] + for role_id in role_ids: + ref['roles'].append({'id': role_id}) + for role_name in role_names: + ref['roles'].append({'name': role_name}) + + return ref + def get_scoped_token(self): """Convenience method so that we can test authenticated requests.""" r = self.admin_request( @@ -214,6 +249,23 @@ class RestfulTestCase(test_content_types.RestfulTestCase): self.assertIsNotNone(resp['error'].get('message')) self.assertEqual(int(resp['error']['code']), r.status) + def assertValidListLinks(self, links): + self.assertIsNotNone(links) + self.assertIsNotNone(links.get('self')) + self.assertIn(CONF.public_endpoint % CONF, links['self']) + + self.assertIn('next', links) + if links['next'] is not None: + self.assertIn( + CONF.public_endpoint % CONF, + links['next']) + + self.assertIn('previous', links) + if links['previous'] is not None: + self.assertIn( + CONF.public_endpoint % CONF, + links['previous']) + def assertValidListResponse(self, resp, key, entity_validator, ref=None, expected_length=None): """Make assertions common to all API list responses. @@ -232,11 +284,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase): self.assertTrue(len(entities)) # collections should have relational links - self.assertIsNotNone(resp.body.get('links')) - self.assertIn('previous', resp.body['links']) - self.assertIn('self', resp.body['links']) - self.assertIn('next', resp.body['links']) - self.assertIn(CONF.public_endpoint % CONF, resp.body['links']['self']) + self.assertValidListLinks(resp.body.get('links')) for entity in entities: self.assertIsNotNone(entity) @@ -248,12 +296,13 @@ class RestfulTestCase(test_content_types.RestfulTestCase): entity_validator(entity, ref) return entities - def assertValidResponse(self, resp, key, entity_validator, ref): + def assertValidResponse(self, resp, key, entity_validator, *args, + **kwargs): """Make assertions common to all API responses.""" entity = resp.body.get(key) self.assertIsNotNone(entity) - self.assertValidEntity(entity, ref) - entity_validator(entity, ref) + self.assertValidEntity(entity, *args, **kwargs) + entity_validator(entity, *args, **kwargs) return entity def assertValidEntity(self, entity, ref=None): @@ -280,6 +329,411 @@ class RestfulTestCase(test_content_types.RestfulTestCase): return entity + # auth validation + + def assertValidISO8601ExtendedFormatDatetime(self, dt): + try: + return timeutils.parse_strtime(dt, fmt=TIME_FORMAT) + except Exception: + msg = '%s is not a valid ISO 8601 extended format date time.' % dt + raise AssertionError(msg) + self.assertTrue(isinstance(dt, datetime.datetime)) + + def assertValidTokenResponse(self, r, user=None): + self.assertTrue(r.getheader('X-Subject-Token')) + token = r.body + if r.getheader('Content-Type') == 'application/xml': + token = serializer.from_xml(etree.tostring(r.body))['token'] + else: + token = r.body['token'] + + self.assertIsNotNone(token.get('expires_at')) + expires_at = self.assertValidISO8601ExtendedFormatDatetime( + token['expires_at']) + self.assertIsNotNone(token.get('issued_at')) + issued_at = self.assertValidISO8601ExtendedFormatDatetime( + token['issued_at']) + self.assertTrue(issued_at < expires_at) + + self.assertIn('user', token) + self.assertIn('id', token['user']) + self.assertIn('name', token['user']) + self.assertIn('domain', token['user']) + self.assertIn('id', token['user']['domain']) + + if user is not None: + self.assertEqual(user['id'], token['user']['id']) + self.assertEqual(user['name'], token['user']['name']) + self.assertEqual(user['domain_id'], token['user']['domain']['id']) + + return token + + def assertValidUnscopedTokenResponse(self, r, *args, **kwargs): + token = self.assertValidTokenResponse(r, *args, **kwargs) + + self.assertNotIn('roles', token) + self.assertNotIn('catalog', token) + self.assertNotIn('project', token) + self.assertNotIn('domain', token) + + return token + + def assertValidScopedTokenResponse(self, r, *args, **kwargs): + token = self.assertValidTokenResponse(r, *args, **kwargs) + + self.assertIn('catalog', token) + self.assertIn('roles', token) + self.assertTrue(token['roles']) + for role in token['roles']: + self.assertIn('id', role) + self.assertIn('name', role) + + return token + + def assertValidProjectScopedTokenResponse(self, r, *args, **kwargs): + token = self.assertValidScopedTokenResponse(r, *args, **kwargs) + + self.assertIn('project', token) + self.assertIn('id', token['project']) + self.assertIn('name', token['project']) + self.assertIn('domain', token['project']) + self.assertIn('id', token['project']['domain']) + self.assertIn('name', token['project']['domain']) + + self.assertEqual(self.role_id, token['roles'][0]['id']) + + return token + + def assertValidProjectTrustScopedTokenResponse(self, r, *args, **kwargs): + token = self.assertValidProjectScopedTokenResponse(r, *args, **kwargs) + + self.assertIsNotNone(token.get('trust')) + self.assertIsNotNone(token['trust'].get('id')) + self.assertTrue(isinstance(token['trust'].get('impersonation'), bool)) + self.assertIsNotNone(token['trust'].get('trustor_user')) + self.assertIsNotNone(token['trust'].get('trustee_user')) + self.assertIsNotNone(token['trust']['trustor_user'].get('id')) + self.assertIsNotNone(token['trust']['trustee_user'].get('id')) + + def assertValidDomainScopedTokenResponse(self, r, *args, **kwargs): + token = self.assertValidScopedTokenResponse(r, *args, **kwargs) + + self.assertIn('domain', token) + self.assertIn('id', token['domain']) + self.assertIn('name', token['domain']) + + return token + + def assertEqualTokens(self, a, b): + """Assert that two tokens are equal. + + Compare two tokens except for their ids. This also truncates + the time in the comparison. + """ + def normalize(token): + del token['token']['expires_at'] + del token['token']['issued_at'] + return token + + a_expires_at = self.assertValidISO8601ExtendedFormatDatetime( + a['token']['expires_at']) + b_expires_at = self.assertValidISO8601ExtendedFormatDatetime( + b['token']['expires_at']) + self.assertCloseEnoughForGovernmentWork(a_expires_at, b_expires_at) + + a_issued_at = self.assertValidISO8601ExtendedFormatDatetime( + a['token']['issued_at']) + b_issued_at = self.assertValidISO8601ExtendedFormatDatetime( + b['token']['issued_at']) + self.assertCloseEnoughForGovernmentWork(a_issued_at, b_issued_at) + + return self.assertDictEqual(normalize(a), normalize(b)) + + # service validation + + def assertValidServiceListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'services', + self.assertValidService, + *args, + **kwargs) + + def assertValidServiceResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'service', + self.assertValidService, + *args, + **kwargs) + + def assertValidService(self, entity, ref=None): + self.assertIsNotNone(entity.get('type')) + if ref: + self.assertEqual(ref['type'], entity['type']) + return entity + + # endpoint validation + + def assertValidEndpointListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'endpoints', + self.assertValidEndpoint, + *args, + **kwargs) + + def assertValidEndpointResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'endpoint', + self.assertValidEndpoint, + *args, + **kwargs) + + def assertValidEndpoint(self, entity, ref=None): + self.assertIsNotNone(entity.get('interface')) + self.assertIsNotNone(entity.get('service_id')) + if ref: + self.assertEqual(ref['interface'], entity['interface']) + self.assertEqual(ref['service_id'], entity['service_id']) + return entity + + # domain validation + + def assertValidDomainListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'domains', + self.assertValidDomain, + *args, + **kwargs) + + def assertValidDomainResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'domain', + self.assertValidDomain, + *args, + **kwargs) + + def assertValidDomain(self, entity, ref=None): + if ref: + pass + return entity + + # project validation + + def assertValidProjectListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'projects', + self.assertValidProject, + *args, + **kwargs) + + def assertValidProjectResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'project', + self.assertValidProject, + *args, + **kwargs) + + def assertValidProject(self, entity, ref=None): + self.assertIsNotNone(entity.get('domain_id')) + if ref: + self.assertEqual(ref['domain_id'], entity['domain_id']) + return entity + + # user validation + + def assertValidUserListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'users', + self.assertValidUser, + *args, + **kwargs) + + def assertValidUserResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'user', + self.assertValidUser, + *args, + **kwargs) + + def assertValidUser(self, entity, ref=None): + self.assertIsNotNone(entity.get('domain_id')) + self.assertIsNotNone(entity.get('email')) + self.assertIsNone(entity.get('password')) + if ref: + self.assertEqual(ref['domain_id'], entity['domain_id']) + self.assertEqual(ref['email'], entity['email']) + return entity + + # group validation + + def assertValidGroupListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'groups', + self.assertValidGroup, + *args, + **kwargs) + + def assertValidGroupResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'group', + self.assertValidGroup, + *args, + **kwargs) + + def assertValidGroup(self, entity, ref=None): + self.assertIsNotNone(entity.get('name')) + if ref: + self.assertEqual(ref['name'], entity['name']) + return entity + + # credential validation + + def assertValidCredentialListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'credentials', + self.assertValidCredential, + *args, + **kwargs) + + def assertValidCredentialResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'credential', + self.assertValidCredential, + *args, + **kwargs) + + def assertValidCredential(self, entity, ref=None): + self.assertIsNotNone(entity.get('user_id')) + self.assertIsNotNone(entity.get('blob')) + self.assertIsNotNone(entity.get('type')) + if ref: + self.assertEqual(ref['user_id'], entity['user_id']) + self.assertEqual(ref['blob'], entity['blob']) + self.assertEqual(ref['type'], entity['type']) + self.assertEqual(ref.get('project_id'), entity.get('project_id')) + return entity + + # role validation + + def assertValidRoleListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'roles', + self.assertValidRole, + *args, + **kwargs) + + def assertValidRoleResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'role', + self.assertValidRole, + *args, + **kwargs) + + def assertValidRole(self, entity, ref=None): + self.assertIsNotNone(entity.get('name')) + if ref: + self.assertEqual(ref['name'], entity['name']) + return entity + + # policy validation + + def assertValidPolicyListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'policies', + self.assertValidPolicy, + *args, + **kwargs) + + def assertValidPolicyResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'policy', + self.assertValidPolicy, + *args, + **kwargs) + + def assertValidPolicy(self, entity, ref=None): + self.assertIsNotNone(entity.get('blob')) + self.assertIsNotNone(entity.get('type')) + if ref: + self.assertEqual(ref['blob'], entity['blob']) + self.assertEqual(ref['type'], entity['type']) + return entity + + # trust validation + + def assertValidTrustListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'trusts', + self.assertValidTrust, + *args, + **kwargs) + + def assertValidTrustResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'trust', + self.assertValidTrust, + *args, + **kwargs) + + def assertValidTrust(self, entity, ref=None): + self.assertIsNotNone(entity.get('trustor_user_id')) + self.assertIsNotNone(entity.get('trustee_user_id')) + + self.assertIn('expires_at', entity) + if entity['expires_at'] is not None: + self.assertValidISO8601ExtendedFormatDatetime(entity['expires_at']) + + # always disallow project xor project_id (neither or both is allowed) + has_roles = bool(entity.get('roles')) + has_project = bool(entity.get('project_id')) + self.assertFalse(has_roles ^ has_project) + + for role in entity['roles']: + self.assertIsNotNone(role) + self.assertValidEntity(role) + self.assertValidRole(role) + + self.assertValidListLinks(entity.get('roles_links')) + + # these were used during dev and shouldn't land in final impl + self.assertNotIn('role_ids', entity) + self.assertNotIn('role_names', entity) + + if ref: + self.assertEqual(ref['trustor_user_id'], entity['trustor_user_id']) + self.assertEqual(ref['trustee_user_id'], entity['trustee_user_id']) + self.assertEqual(ref['project_id'], entity['project_id']) + if entity.get('expires_at') or ref.get('expires_at'): + entity_exp = self.assertValidISO8601ExtendedFormatDatetime( + entity['expires_at']) + ref_exp = self.assertValidISO8601ExtendedFormatDatetime( + ref['expires_at']) + self.assertCloseEnoughForGovernmentWork(entity_exp, ref_exp) + else: + self.assertEqual(ref.get('expires_at'), + entity.get('expires_at')) + + return entity + class VersionTestCase(RestfulTestCase): def test_get_version(self): |