diff options
| author | Jenkins <jenkins@review.openstack.org> | 2013-03-06 03:43:48 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2013-03-06 03:43:48 +0000 |
| commit | 2e8c4d2ff1897d4235dd9718d92cc7c0c6ea30e5 (patch) | |
| tree | 8aeeec5f3bbe31c08f628468d83c144660c0ed62 | |
| parent | cdeda9416b8ea4af5250fb4bd48865f4d87967b3 (diff) | |
| parent | c0f017ce861d1532f790b18a137b491440ffa83b (diff) | |
Merge "Expand v3 trust test coverage"
| -rw-r--r-- | tests/test_v3.py | 476 | ||||
| -rw-r--r-- | tests/test_v3_auth.py | 371 | ||||
| -rw-r--r-- | tests/test_v3_catalog.py | 46 | ||||
| -rw-r--r-- | tests/test_v3_identity.py | 139 | ||||
| -rw-r--r-- | tests/test_v3_policy.py | 24 |
5 files changed, 727 insertions, 329 deletions
diff --git a/tests/test_v3.py b/tests/test_v3.py index 2aa2c2b5..39807cec 100644 --- a/tests/test_v3.py +++ b/tests/test_v3.py @@ -1,20 +1,23 @@ +import datetime import uuid from lxml import etree +from keystone import auth from keystone.common import serializer from keystone.common.sql import util as sql_util -from keystone import auth -from keystone import test from keystone import config +from keystone.openstack.common import timeutils from keystone.policy.backends import rules - +from keystone import test import test_content_types CONF = config.CONF +TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' + class RestfulTestCase(test_content_types.RestfulTestCase): def setUp(self): @@ -133,6 +136,38 @@ class RestfulTestCase(test_content_types.RestfulTestCase): ref['type'] = uuid.uuid4().hex return ref + def new_trust_ref(self, trustor_user_id, trustee_user_id, project_id=None, + impersonation=None, expires=None, role_ids=None, + role_names=None): + ref = self.new_ref() + + ref['trustor_user_id'] = trustor_user_id + ref['trustee_user_id'] = trustee_user_id + ref['impersonation'] = impersonation or False + ref['project_id'] = project_id + + if isinstance(expires, basestring): + ref['expires_at'] = expires + elif isinstance(expires, dict): + ref['expires_at'] = timeutils.strtime( + timeutils.utcnow() + datetime.timedelta(**expires), + fmt=TIME_FORMAT) + elif expires is None: + pass + else: + raise NotImplementedError('Unexpected value for "expires"') + + role_ids = role_ids or [] + role_names = role_names or [] + if role_ids or role_names: + ref['roles'] = [] + for role_id in role_ids: + ref['roles'].append({'id': role_id}) + for role_name in role_names: + ref['roles'].append({'name': role_name}) + + return ref + def get_scoped_token(self): """Convenience method so that we can test authenticated requests.""" r = self.admin_request( @@ -214,6 +249,23 @@ class RestfulTestCase(test_content_types.RestfulTestCase): self.assertIsNotNone(resp['error'].get('message')) self.assertEqual(int(resp['error']['code']), r.status) + def assertValidListLinks(self, links): + self.assertIsNotNone(links) + self.assertIsNotNone(links.get('self')) + self.assertIn(CONF.public_endpoint % CONF, links['self']) + + self.assertIn('next', links) + if links['next'] is not None: + self.assertIn( + CONF.public_endpoint % CONF, + links['next']) + + self.assertIn('previous', links) + if links['previous'] is not None: + self.assertIn( + CONF.public_endpoint % CONF, + links['previous']) + def assertValidListResponse(self, resp, key, entity_validator, ref=None, expected_length=None): """Make assertions common to all API list responses. @@ -232,11 +284,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase): self.assertTrue(len(entities)) # collections should have relational links - self.assertIsNotNone(resp.body.get('links')) - self.assertIn('previous', resp.body['links']) - self.assertIn('self', resp.body['links']) - self.assertIn('next', resp.body['links']) - self.assertIn(CONF.public_endpoint % CONF, resp.body['links']['self']) + self.assertValidListLinks(resp.body.get('links')) for entity in entities: self.assertIsNotNone(entity) @@ -248,12 +296,13 @@ class RestfulTestCase(test_content_types.RestfulTestCase): entity_validator(entity, ref) return entities - def assertValidResponse(self, resp, key, entity_validator, ref): + def assertValidResponse(self, resp, key, entity_validator, *args, + **kwargs): """Make assertions common to all API responses.""" entity = resp.body.get(key) self.assertIsNotNone(entity) - self.assertValidEntity(entity, ref) - entity_validator(entity, ref) + self.assertValidEntity(entity, *args, **kwargs) + entity_validator(entity, *args, **kwargs) return entity def assertValidEntity(self, entity, ref=None): @@ -280,6 +329,411 @@ class RestfulTestCase(test_content_types.RestfulTestCase): return entity + # auth validation + + def assertValidISO8601ExtendedFormatDatetime(self, dt): + try: + return timeutils.parse_strtime(dt, fmt=TIME_FORMAT) + except Exception: + msg = '%s is not a valid ISO 8601 extended format date time.' % dt + raise AssertionError(msg) + self.assertTrue(isinstance(dt, datetime.datetime)) + + def assertValidTokenResponse(self, r, user=None): + self.assertTrue(r.getheader('X-Subject-Token')) + token = r.body + if r.getheader('Content-Type') == 'application/xml': + token = serializer.from_xml(etree.tostring(r.body))['token'] + else: + token = r.body['token'] + + self.assertIsNotNone(token.get('expires_at')) + expires_at = self.assertValidISO8601ExtendedFormatDatetime( + token['expires_at']) + self.assertIsNotNone(token.get('issued_at')) + issued_at = self.assertValidISO8601ExtendedFormatDatetime( + token['issued_at']) + self.assertTrue(issued_at < expires_at) + + self.assertIn('user', token) + self.assertIn('id', token['user']) + self.assertIn('name', token['user']) + self.assertIn('domain', token['user']) + self.assertIn('id', token['user']['domain']) + + if user is not None: + self.assertEqual(user['id'], token['user']['id']) + self.assertEqual(user['name'], token['user']['name']) + self.assertEqual(user['domain_id'], token['user']['domain']['id']) + + return token + + def assertValidUnscopedTokenResponse(self, r, *args, **kwargs): + token = self.assertValidTokenResponse(r, *args, **kwargs) + + self.assertNotIn('roles', token) + self.assertNotIn('catalog', token) + self.assertNotIn('project', token) + self.assertNotIn('domain', token) + + return token + + def assertValidScopedTokenResponse(self, r, *args, **kwargs): + token = self.assertValidTokenResponse(r, *args, **kwargs) + + self.assertIn('catalog', token) + self.assertIn('roles', token) + self.assertTrue(token['roles']) + for role in token['roles']: + self.assertIn('id', role) + self.assertIn('name', role) + + return token + + def assertValidProjectScopedTokenResponse(self, r, *args, **kwargs): + token = self.assertValidScopedTokenResponse(r, *args, **kwargs) + + self.assertIn('project', token) + self.assertIn('id', token['project']) + self.assertIn('name', token['project']) + self.assertIn('domain', token['project']) + self.assertIn('id', token['project']['domain']) + self.assertIn('name', token['project']['domain']) + + self.assertEqual(self.role_id, token['roles'][0]['id']) + + return token + + def assertValidProjectTrustScopedTokenResponse(self, r, *args, **kwargs): + token = self.assertValidProjectScopedTokenResponse(r, *args, **kwargs) + + self.assertIsNotNone(token.get('trust')) + self.assertIsNotNone(token['trust'].get('id')) + self.assertTrue(isinstance(token['trust'].get('impersonation'), bool)) + self.assertIsNotNone(token['trust'].get('trustor_user')) + self.assertIsNotNone(token['trust'].get('trustee_user')) + self.assertIsNotNone(token['trust']['trustor_user'].get('id')) + self.assertIsNotNone(token['trust']['trustee_user'].get('id')) + + def assertValidDomainScopedTokenResponse(self, r, *args, **kwargs): + token = self.assertValidScopedTokenResponse(r, *args, **kwargs) + + self.assertIn('domain', token) + self.assertIn('id', token['domain']) + self.assertIn('name', token['domain']) + + return token + + def assertEqualTokens(self, a, b): + """Assert that two tokens are equal. + + Compare two tokens except for their ids. This also truncates + the time in the comparison. + """ + def normalize(token): + del token['token']['expires_at'] + del token['token']['issued_at'] + return token + + a_expires_at = self.assertValidISO8601ExtendedFormatDatetime( + a['token']['expires_at']) + b_expires_at = self.assertValidISO8601ExtendedFormatDatetime( + b['token']['expires_at']) + self.assertCloseEnoughForGovernmentWork(a_expires_at, b_expires_at) + + a_issued_at = self.assertValidISO8601ExtendedFormatDatetime( + a['token']['issued_at']) + b_issued_at = self.assertValidISO8601ExtendedFormatDatetime( + b['token']['issued_at']) + self.assertCloseEnoughForGovernmentWork(a_issued_at, b_issued_at) + + return self.assertDictEqual(normalize(a), normalize(b)) + + # service validation + + def assertValidServiceListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'services', + self.assertValidService, + *args, + **kwargs) + + def assertValidServiceResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'service', + self.assertValidService, + *args, + **kwargs) + + def assertValidService(self, entity, ref=None): + self.assertIsNotNone(entity.get('type')) + if ref: + self.assertEqual(ref['type'], entity['type']) + return entity + + # endpoint validation + + def assertValidEndpointListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'endpoints', + self.assertValidEndpoint, + *args, + **kwargs) + + def assertValidEndpointResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'endpoint', + self.assertValidEndpoint, + *args, + **kwargs) + + def assertValidEndpoint(self, entity, ref=None): + self.assertIsNotNone(entity.get('interface')) + self.assertIsNotNone(entity.get('service_id')) + if ref: + self.assertEqual(ref['interface'], entity['interface']) + self.assertEqual(ref['service_id'], entity['service_id']) + return entity + + # domain validation + + def assertValidDomainListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'domains', + self.assertValidDomain, + *args, + **kwargs) + + def assertValidDomainResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'domain', + self.assertValidDomain, + *args, + **kwargs) + + def assertValidDomain(self, entity, ref=None): + if ref: + pass + return entity + + # project validation + + def assertValidProjectListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'projects', + self.assertValidProject, + *args, + **kwargs) + + def assertValidProjectResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'project', + self.assertValidProject, + *args, + **kwargs) + + def assertValidProject(self, entity, ref=None): + self.assertIsNotNone(entity.get('domain_id')) + if ref: + self.assertEqual(ref['domain_id'], entity['domain_id']) + return entity + + # user validation + + def assertValidUserListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'users', + self.assertValidUser, + *args, + **kwargs) + + def assertValidUserResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'user', + self.assertValidUser, + *args, + **kwargs) + + def assertValidUser(self, entity, ref=None): + self.assertIsNotNone(entity.get('domain_id')) + self.assertIsNotNone(entity.get('email')) + self.assertIsNone(entity.get('password')) + if ref: + self.assertEqual(ref['domain_id'], entity['domain_id']) + self.assertEqual(ref['email'], entity['email']) + return entity + + # group validation + + def assertValidGroupListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'groups', + self.assertValidGroup, + *args, + **kwargs) + + def assertValidGroupResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'group', + self.assertValidGroup, + *args, + **kwargs) + + def assertValidGroup(self, entity, ref=None): + self.assertIsNotNone(entity.get('name')) + if ref: + self.assertEqual(ref['name'], entity['name']) + return entity + + # credential validation + + def assertValidCredentialListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'credentials', + self.assertValidCredential, + *args, + **kwargs) + + def assertValidCredentialResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'credential', + self.assertValidCredential, + *args, + **kwargs) + + def assertValidCredential(self, entity, ref=None): + self.assertIsNotNone(entity.get('user_id')) + self.assertIsNotNone(entity.get('blob')) + self.assertIsNotNone(entity.get('type')) + if ref: + self.assertEqual(ref['user_id'], entity['user_id']) + self.assertEqual(ref['blob'], entity['blob']) + self.assertEqual(ref['type'], entity['type']) + self.assertEqual(ref.get('project_id'), entity.get('project_id')) + return entity + + # role validation + + def assertValidRoleListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'roles', + self.assertValidRole, + *args, + **kwargs) + + def assertValidRoleResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'role', + self.assertValidRole, + *args, + **kwargs) + + def assertValidRole(self, entity, ref=None): + self.assertIsNotNone(entity.get('name')) + if ref: + self.assertEqual(ref['name'], entity['name']) + return entity + + # policy validation + + def assertValidPolicyListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'policies', + self.assertValidPolicy, + *args, + **kwargs) + + def assertValidPolicyResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'policy', + self.assertValidPolicy, + *args, + **kwargs) + + def assertValidPolicy(self, entity, ref=None): + self.assertIsNotNone(entity.get('blob')) + self.assertIsNotNone(entity.get('type')) + if ref: + self.assertEqual(ref['blob'], entity['blob']) + self.assertEqual(ref['type'], entity['type']) + return entity + + # trust validation + + def assertValidTrustListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'trusts', + self.assertValidTrust, + *args, + **kwargs) + + def assertValidTrustResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'trust', + self.assertValidTrust, + *args, + **kwargs) + + def assertValidTrust(self, entity, ref=None): + self.assertIsNotNone(entity.get('trustor_user_id')) + self.assertIsNotNone(entity.get('trustee_user_id')) + + self.assertIn('expires_at', entity) + if entity['expires_at'] is not None: + self.assertValidISO8601ExtendedFormatDatetime(entity['expires_at']) + + # always disallow project xor project_id (neither or both is allowed) + has_roles = bool(entity.get('roles')) + has_project = bool(entity.get('project_id')) + self.assertFalse(has_roles ^ has_project) + + for role in entity['roles']: + self.assertIsNotNone(role) + self.assertValidEntity(role) + self.assertValidRole(role) + + self.assertValidListLinks(entity.get('roles_links')) + + # these were used during dev and shouldn't land in final impl + self.assertNotIn('role_ids', entity) + self.assertNotIn('role_names', entity) + + if ref: + self.assertEqual(ref['trustor_user_id'], entity['trustor_user_id']) + self.assertEqual(ref['trustee_user_id'], entity['trustee_user_id']) + self.assertEqual(ref['project_id'], entity['project_id']) + if entity.get('expires_at') or ref.get('expires_at'): + entity_exp = self.assertValidISO8601ExtendedFormatDatetime( + entity['expires_at']) + ref_exp = self.assertValidISO8601ExtendedFormatDatetime( + ref['expires_at']) + self.assertCloseEnoughForGovernmentWork(entity_exp, ref_exp) + else: + self.assertEqual(ref.get('expires_at'), + entity.get('expires_at')) + + return entity + class VersionTestCase(RestfulTestCase): def test_get_version(self): diff --git a/tests/test_v3_auth.py b/tests/test_v3_auth.py index 8f66cdf4..40a4c846 100644 --- a/tests/test_v3_auth.py +++ b/tests/test_v3_auth.py @@ -12,17 +12,13 @@ # License for the specific language governing permissions and limitations # under the License. -import re import uuid -from lxml import etree +import nose.exc from keystone import auth from keystone import config from keystone import exception -from keystone.common import serializer -from keystone.openstack.common import jsonutils -from keystone.openstack.common import timeutils from keystone import test import test_v3 @@ -33,7 +29,7 @@ CONF = config.CONF def _build_auth_scope(project_id=None, project_name=None, project_domain_id=None, project_domain_name=None, - domain_id=None, domain_name=None): + domain_id=None, domain_name=None, trust_id=None): scope_data = {} if project_id or project_name: scope_data['project'] = {} @@ -54,6 +50,9 @@ def _build_auth_scope(project_id=None, project_name=None, scope_data['domain']['id'] = domain_id else: scope_data['domain']['name'] = domain_name + if trust_id: + scope_data['trust'] = {} + scope_data['trust']['id'] = trust_id return scope_data @@ -81,10 +80,7 @@ def _build_token_auth(token): def _build_authentication_request(token=None, user_id=None, username=None, user_domain_id=None, user_domain_name=None, - password=None, project_id=None, - project_name=None, project_domain_id=None, - project_domain_name=None, - domain_id=None, domain_name=None): + password=None, **kwargs): """Build auth dictionary. It will create an auth dictionary based on all the arguments @@ -99,106 +95,11 @@ def _build_authentication_request(token=None, user_id=None, username=None, auth_data['identity']['methods'].append('password') auth_data['identity']['password'] = _build_password_auth( user_id, username, user_domain_id, user_domain_name, password) - if project_id or project_name or domain_id or domain_name: - auth_data['scope'] = _build_auth_scope(project_id, - project_name, - project_domain_id, - project_domain_name, - domain_id, - domain_name) + if kwargs: + auth_data['scope'] = _build_auth_scope(**kwargs) return {'auth': auth_data} -class AuthTest(test_v3.RestfulTestCase): - def assertValidTokenResponse(self, r): - self.assertTrue(r.getheader('X-Subject-Token')) - token = r.body - if r.getheader('Content-Type') == 'application/xml': - token = serializer.from_xml(etree.tostring(r.body))['token'] - else: - token = r.body['token'] - self.assertIn('expires_at', token) - self.assertValidISOTimeFormat(token['expires_at']) - if 'issued_at' in token: - self.assertValidISOTimeFormat(token['issued_at']) - self.assertIn('user', token) - self.assertEqual(self.user['id'], token['user']['id']) - self.assertEqual(self.user['name'], token['user']['name']) - self.assertEqual(self.user['domain_id'], token['user']['domain']['id']) - - return token - - def assertValidISOTimeFormat(self, timestr): - # match ISO 8610 time to seconds or fraction of seconds - isotime_re = re.compile( - '^\s*\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z\s*$') - self.assertTrue(isotime_re.match(timestr)) - - def assertValidUnscopedTokenResponse(self, r): - token = self.assertValidTokenResponse(r) - - self.assertNotIn('roles', token) - self.assertNotIn('catalog', token) - self.assertNotIn('project', token) - self.assertNotIn('domain', token) - - return token - - def assertValidScopedTokenResponse(self, r): - token = self.assertValidTokenResponse(r) - - self.assertIn('catalog', token) - self.assertIn('roles', token) - self.assertTrue(token['roles']) - for role in token['roles']: - self.assertIn('id', role) - self.assertIn('name', role) - - return token - - def assertValidProjectScopedTokenResponse(self, r): - token = self.assertValidScopedTokenResponse(r) - - self.assertIn('project', token) - self.assertIn('id', token['project']) - self.assertIn('name', token['project']) - self.assertIn('domain', token['project']) - self.assertIn('id', token['project']['domain']) - self.assertIn('name', token['project']['domain']) - - self.assertEqual(self.role_id, token['roles'][0]['id']) - - return token - - def assertValidDomainScopedTokenResponse(self, r): - token = self.assertValidScopedTokenResponse(r) - - self.assertIn('domain', token) - self.assertIn('id', token['domain']) - self.assertIn('name', token['domain']) - - return token - - def assertEqualTokens(self, a, b): - """Assert that two tokens are equal. - - Compare two tokens except for their ids. This also truncates - the time in the comparison. - """ - def normalize(token): - del token['token']['expires_at'] - del token['token']['issued_at'] - return token - - self.assertCloseEnoughForGovernmentWork( - timeutils.parse_isotime(a['token']['expires_at']), - timeutils.parse_isotime(b['token']['expires_at'])) - self.assertCloseEnoughForGovernmentWork( - timeutils.parse_isotime(a['token']['issued_at']), - timeutils.parse_isotime(b['token']['issued_at'])) - return self.assertDictEqual(normalize(a), normalize(b)) - - class TestAuthInfo(test.TestCase): def test_missing_auth_methods(self): auth_data = {'identity': {}} @@ -245,7 +146,7 @@ class TestAuthInfo(test.TestCase): auth_data) -class TestTokenAPIs(AuthTest): +class TestTokenAPIs(test_v3.RestfulTestCase): def setUp(self): super(TestTokenAPIs, self).setUp() auth_data = _build_authentication_request( @@ -395,7 +296,7 @@ class TestTokenAPIs(AuthTest): self.assertIn('signed', r.body) -class TestAuthJSON(AuthTest): +class TestAuthJSON(test_v3.RestfulTestCase): content_type = 'json' def test_unscoped_token_with_user_id(self): @@ -653,3 +554,255 @@ class TestAuthJSON(AuthTest): class TestAuthXML(TestAuthJSON): content_type = 'xml' + + +class TestTrustAuth(test_v3.RestfulTestCase): + def setUp(self): + super(TestTrustAuth, self).setUp() + + # create a trustee to delegate stuff to + self.trustee_user_id = uuid.uuid4().hex + self.trustee_user = self.new_user_ref(domain_id=self.domain_id) + self.trustee_user['id'] = self.trustee_user_id + self.identity_api.create_user(self.trustee_user_id, self.trustee_user) + + def test_create_trust_400(self): + raise nose.exc.SkipTest('Blocked by bug 1133435') + self.post('/trusts', body={'trust': {}}, expected_status=400) + + def test_create_unscoped_trust(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id) + del ref['id'] + r = self.post('/trusts', body={'trust': ref}) + self.assertValidTrustResponse(r, ref) + + def test_trust_crud(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + role_ids=[self.role_id]) + del ref['id'] + r = self.post('/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r, ref) + + r = self.get( + '/trusts/%(trust_id)s' % {'trust_id': trust['id']}, + expected_status=200) + self.assertValidTrustResponse(r, ref) + + # validate roles on the trust + r = self.get( + '/trusts/%(trust_id)s/roles' % { + 'trust_id': trust['id']}, + expected_status=200) + roles = self.assertValidRoleListResponse(r, self.role) + self.assertIn(self.role['id'], [x['id'] for x in roles]) + self.head( + '/trusts/%(trust_id)s/roles/%(role_id)s' % { + 'trust_id': trust['id'], + 'role_id': self.role['id']}, + expected_status=204) + r = self.get( + '/trusts/%(trust_id)s/roles/%(role_id)s' % { + 'trust_id': trust['id'], + 'role_id': self.role['id']}, + expected_status=200) + self.assertValidRoleResponse(r, self.role) + + r = self.get('/trusts', expected_status=200) + self.assertValidTrustListResponse(r, trust) + + # trusts are immutable + self.patch( + '/trusts/%(trust_id)s' % {'trust_id': trust['id']}, + body={'trust': ref}, + expected_status=404) + + self.delete( + '/trusts/%(trust_id)s' % {'trust_id': trust['id']}, + expected_status=204) + + self.get( + '/trusts/%(trust_id)s' % {'trust_id': trust['id']}, + expected_status=404) + + def test_create_trust_trustee_404(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=uuid.uuid4().hex) + del ref['id'] + self.post('/trusts', body={'trust': ref}, expected_status=404) + + def test_create_trust_trustor_trustee_backwards(self): + ref = self.new_trust_ref( + trustor_user_id=self.trustee_user_id, + trustee_user_id=self.user_id) + del ref['id'] + self.post('/trusts', body={'trust': ref}, expected_status=403) + + def test_create_trust_project_404(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=uuid.uuid4().hex, + role_ids=[self.role_id]) + del ref['id'] + self.post('/trusts', body={'trust': ref}, expected_status=404) + + def test_create_trust_role_id_404(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + role_ids=[uuid.uuid4().hex]) + del ref['id'] + self.post('/trusts', body={'trust': ref}, expected_status=404) + + def test_create_trust_role_name_404(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + role_names=[uuid.uuid4().hex]) + del ref['id'] + self.post('/trusts', body={'trust': ref}, expected_status=404) + + def test_create_expired_trust(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + expires=dict(seconds=-1), + role_ids=[self.role_id]) + del ref['id'] + r = self.post('/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r, ref) + + self.get('/trusts/%(trust_id)s' % { + 'trust_id': trust['id']}, + expected_status=404) + + auth_data = _build_authentication_request( + user_id=self.trustee_user['id'], + password=self.trustee_user['password'], + trust_id=trust['id']) + self.post('/auth/tokens', body=auth_data, expected_status=401) + + def test_exercise_trust_scoped_token_without_impersonation(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=False, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + r = self.post('/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r) + + auth_data = _build_authentication_request( + user_id=self.trustee_user['id'], + password=self.trustee_user['password'], + trust_id=trust['id']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidProjectTrustScopedTokenResponse(r, self.trustee_user) + self.assertEqual(r.body['token']['user']['id'], + self.trustee_user['id']) + self.assertEqual(r.body['token']['user']['name'], + self.trustee_user['name']) + self.assertEqual(r.body['token']['user']['domain']['id'], + self.domain['id']) + self.assertEqual(r.body['token']['user']['domain']['name'], + self.domain['name']) + self.assertEqual(r.body['token']['project']['id'], self.project['id']) + self.assertEqual(r.body['token']['project']['name'], + self.project['name']) + + def test_exercise_trust_scoped_token_with_impersonation(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=True, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + r = self.post('/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r) + + auth_data = _build_authentication_request( + user_id=self.trustee_user['id'], + password=self.trustee_user['password'], + trust_id=trust['id']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidProjectTrustScopedTokenResponse(r, self.user) + self.assertEqual(r.body['token']['user']['id'], self.user['id']) + self.assertEqual(r.body['token']['user']['name'], self.user['name']) + self.assertEqual(r.body['token']['user']['domain']['id'], + self.domain['id']) + self.assertEqual(r.body['token']['user']['domain']['name'], + self.domain['name']) + self.assertEqual(r.body['token']['project']['id'], self.project['id']) + self.assertEqual(r.body['token']['project']['name'], + self.project['name']) + + def test_delete_trust(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=False, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + r = self.post('/trusts', body={'trust': ref}) + + trust = self.assertValidTrustResponse(r, ref) + + self.delete('/trusts/%(trust_id)s' % { + 'trust_id': trust['id']}, + expected_status=204) + + self.get('/trusts/%(trust_id)s' % { + 'trust_id': trust['id']}, + expected_status=404) + + self.get('/trusts/%(trust_id)s' % { + 'trust_id': trust['id']}, + expected_status=404) + + auth_data = _build_authentication_request( + user_id=self.trustee_user['id'], + password=self.trustee_user['password'], + trust_id=trust['id']) + self.post('/auth/tokens', body=auth_data, expected_status=401) + + def test_list_trusts(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=False, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + for i in range(0, 3): + r = self.post('/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r, ref) + + r = self.get('/trusts?trustor_user_id=%s' % + self.user_id, expected_status=200) + trusts = r.body['trusts'] + self.assertEqual(len(trusts), 3) + + r = self.get('/trusts?trustee_user_id=%s' % + self.user_id, expected_status=200) + trusts = r.body['trusts'] + self.assertEqual(len(trusts), 0) diff --git a/tests/test_v3_catalog.py b/tests/test_v3_catalog.py index 9f2f4f01..2d161db5 100644 --- a/tests/test_v3_catalog.py +++ b/tests/test_v3_catalog.py @@ -23,52 +23,6 @@ class CatalogTestCase(test_v3.RestfulTestCase): self.endpoint_id, self.endpoint.copy()) - # service validation - - def assertValidServiceListResponse(self, resp, **kwargs): - return self.assertValidListResponse( - resp, - 'services', - self.assertValidService, - **kwargs) - - def assertValidServiceResponse(self, resp, ref): - return self.assertValidResponse( - resp, - 'service', - self.assertValidService, - ref) - - def assertValidService(self, entity, ref=None): - self.assertIsNotNone(entity.get('type')) - if ref: - self.assertEqual(ref['type'], entity['type']) - return entity - - # endpoint validation - - def assertValidEndpointListResponse(self, resp, **kwargs): - return self.assertValidListResponse( - resp, - 'endpoints', - self.assertValidEndpoint, - **kwargs) - - def assertValidEndpointResponse(self, resp, ref): - return self.assertValidResponse( - resp, - 'endpoint', - self.assertValidEndpoint, - ref) - - def assertValidEndpoint(self, entity, ref=None): - self.assertIsNotNone(entity.get('interface')) - self.assertIsNotNone(entity.get('service_id')) - if ref: - self.assertEqual(ref['interface'], entity['interface']) - self.assertEqual(ref['service_id'], entity['service_id']) - return entity - # service crud tests def test_create_service(self): diff --git a/tests/test_v3_identity.py b/tests/test_v3_identity.py index 47e50281..9ef487c3 100644 --- a/tests/test_v3_identity.py +++ b/tests/test_v3_identity.py @@ -40,145 +40,6 @@ class IdentityTestCase(test_v3.RestfulTestCase): self.credential_id, self.credential) - # domain validation - - def assertValidDomainListResponse(self, resp, **kwargs): - return self.assertValidListResponse( - resp, - 'domains', - self.assertValidDomain, - **kwargs) - - def assertValidDomainResponse(self, resp, ref): - return self.assertValidResponse( - resp, - 'domain', - self.assertValidDomain, - ref) - - def assertValidDomain(self, entity, ref=None): - if ref: - pass - return entity - - # project validation - - def assertValidProjectListResponse(self, resp, **kwargs): - return self.assertValidListResponse( - resp, - 'projects', - self.assertValidProject, - **kwargs) - - def assertValidProjectResponse(self, resp, ref): - return self.assertValidResponse( - resp, - 'project', - self.assertValidProject, - ref) - - def assertValidProject(self, entity, ref=None): - self.assertIsNotNone(entity.get('domain_id')) - if ref: - self.assertEqual(ref['domain_id'], entity['domain_id']) - return entity - - # user validation - - def assertValidUserListResponse(self, resp, **kwargs): - return self.assertValidListResponse( - resp, - 'users', - self.assertValidUser, - **kwargs) - - def assertValidUserResponse(self, resp, ref): - return self.assertValidResponse( - resp, - 'user', - self.assertValidUser, - ref) - - def assertValidUser(self, entity, ref=None): - self.assertIsNotNone(entity.get('domain_id')) - self.assertIsNotNone(entity.get('email')) - self.assertIsNone(entity.get('password')) - if ref: - self.assertEqual(ref['domain_id'], entity['domain_id']) - self.assertEqual(ref['email'], entity['email']) - return entity - - # group validation - - def assertValidGroupListResponse(self, resp, **kwargs): - return self.assertValidListResponse( - resp, - 'groups', - self.assertValidGroup, - **kwargs) - - def assertValidGroupResponse(self, resp, ref): - return self.assertValidResponse( - resp, - 'group', - self.assertValidGroup, - ref) - - def assertValidGroup(self, entity, ref=None): - self.assertIsNotNone(entity.get('name')) - if ref: - self.assertEqual(ref['name'], entity['name']) - return entity - - # credential validation - - def assertValidCredentialListResponse(self, resp, **kwargs): - return self.assertValidListResponse( - resp, - 'credentials', - self.assertValidCredential, - **kwargs) - - def assertValidCredentialResponse(self, resp, ref): - return self.assertValidResponse( - resp, - 'credential', - self.assertValidCredential, - ref) - - def assertValidCredential(self, entity, ref=None): - self.assertIsNotNone(entity.get('user_id')) - self.assertIsNotNone(entity.get('blob')) - self.assertIsNotNone(entity.get('type')) - if ref: - self.assertEqual(ref['user_id'], entity['user_id']) - self.assertEqual(ref['blob'], entity['blob']) - self.assertEqual(ref['type'], entity['type']) - self.assertEqual(ref.get('project_id'), entity.get('project_id')) - return entity - - # role validation - - def assertValidRoleListResponse(self, resp, **kwargs): - return self.assertValidListResponse( - resp, - 'roles', - self.assertValidRole, - **kwargs) - - def assertValidRoleResponse(self, resp, ref): - return self.assertValidResponse( - resp, - 'role', - self.assertValidRole, - ref) - - def assertValidRole(self, entity, ref=None): - self.assertIsNotNone(entity.get('name')) - if ref: - self.assertEqual(ref['name'], entity['name']) - return entity - # domain crud tests def test_create_domain(self): diff --git a/tests/test_v3_policy.py b/tests/test_v3_policy.py index 811eb577..1af68b6e 100644 --- a/tests/test_v3_policy.py +++ b/tests/test_v3_policy.py @@ -15,30 +15,6 @@ class PolicyTestCase(test_v3.RestfulTestCase): self.policy_id, self.policy.copy()) - # policy validation - - def assertValidPolicyListResponse(self, resp, **kwargs): - return self.assertValidListResponse( - resp, - 'policies', - self.assertValidPolicy, - **kwargs) - - def assertValidPolicyResponse(self, resp, ref): - return self.assertValidResponse( - resp, - 'policy', - self.assertValidPolicy, - ref) - - def assertValidPolicy(self, entity, ref=None): - self.assertIsNotNone(entity.get('blob')) - self.assertIsNotNone(entity.get('type')) - if ref: - self.assertEqual(ref['blob'], entity['blob']) - self.assertEqual(ref['type'], entity['type']) - return entity - # policy crud tests def test_create_policy(self): |
