summaryrefslogtreecommitdiffstats
path: root/keystone/tests/test_v3.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone/tests/test_v3.py')
-rw-r--r--keystone/tests/test_v3.py971
1 files changed, 971 insertions, 0 deletions
diff --git a/keystone/tests/test_v3.py b/keystone/tests/test_v3.py
new file mode 100644
index 00000000..7db14c84
--- /dev/null
+++ b/keystone/tests/test_v3.py
@@ -0,0 +1,971 @@
+import datetime
+import uuid
+
+from lxml import etree
+import webtest
+
+from keystone import auth
+from keystone.common import serializer
+from keystone import config
+from keystone.openstack.common import timeutils
+from keystone.policy.backends import rules
+from keystone.tests import core as test
+
+import test_content_types
+
+
+CONF = config.CONF
+DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
+
+TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
+
+
+class RestfulTestCase(test_content_types.RestfulTestCase):
+ _config_file_list = [test.etcdir('keystone.conf.sample'),
+ test.testsdir('test_overrides.conf'),
+ test.testsdir('backend_sql.conf'),
+ test.testsdir('backend_sql_disk.conf')]
+
+ #override this to sepcify the complete list of configuration files
+ def config_files(self):
+ return self._config_file_list
+
+ def setUp(self, load_sample_data=True):
+ """Setup for v3 Restful Test Cases.
+
+ If a child class wants to create their own sample data
+ and provide their own auth data to obtain tokens, then
+ load_sample_data should be set to false.
+
+ """
+ self.config(self.config_files())
+
+ test.setup_test_database()
+ self.load_backends()
+
+ self.public_app = webtest.TestApp(
+ self.loadapp('keystone', name='main'))
+ self.admin_app = webtest.TestApp(
+ self.loadapp('keystone', name='admin'))
+
+ if load_sample_data:
+ self.domain_id = uuid.uuid4().hex
+ self.domain = self.new_domain_ref()
+ self.domain['id'] = self.domain_id
+ self.identity_api.create_domain(self.domain_id, self.domain)
+
+ self.project_id = uuid.uuid4().hex
+ self.project = self.new_project_ref(
+ domain_id=self.domain_id)
+ self.project['id'] = self.project_id
+ self.identity_api.create_project(self.project_id, self.project)
+
+ self.user_id = uuid.uuid4().hex
+ self.user = self.new_user_ref(
+ domain_id=self.domain_id,
+ project_id=self.project_id)
+ self.user['id'] = self.user_id
+ self.identity_api.create_user(self.user_id, self.user)
+
+ self.default_domain_project_id = uuid.uuid4().hex
+ self.default_domain_project = self.new_project_ref(
+ domain_id=DEFAULT_DOMAIN_ID)
+ self.default_domain_project['id'] = self.default_domain_project_id
+ self.identity_api.create_project(self.default_domain_project_id,
+ self.default_domain_project)
+
+ self.default_domain_user_id = uuid.uuid4().hex
+ self.default_domain_user = self.new_user_ref(
+ domain_id=DEFAULT_DOMAIN_ID,
+ project_id=self.default_domain_project_id)
+ self.default_domain_user['id'] = self.default_domain_user_id
+ self.identity_api.create_user(self.default_domain_user_id,
+ self.default_domain_user)
+
+ # create & grant policy.json's default role for admin_required
+ self.role_id = uuid.uuid4().hex
+ self.role = self.new_role_ref()
+ self.role['id'] = self.role_id
+ self.role['name'] = 'admin'
+ self.identity_api.create_role(self.role_id, self.role)
+ self.identity_api.add_role_to_user_and_project(
+ self.user_id, self.project_id, self.role_id)
+ self.identity_api.add_role_to_user_and_project(
+ self.default_domain_user_id, self.default_domain_project_id,
+ self.role_id)
+ self.identity_api.add_role_to_user_and_project(
+ self.default_domain_user_id, self.project_id,
+ self.role_id)
+
+ self.public_server = self.serveapp('keystone', name='main')
+ self.admin_server = self.serveapp('keystone', name='admin')
+
+ def tearDown(self):
+ self.public_server.kill()
+ self.admin_server.kill()
+ self.public_server = None
+ self.admin_server = None
+ test.teardown_test_database()
+ # need to reset the plug-ins
+ auth.controllers.AUTH_METHODS = {}
+ #drop the policy rules
+ CONF.reset()
+ rules.reset()
+
+ def new_ref(self):
+ """Populates a ref with attributes common to all API entities."""
+ return {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ 'enabled': True}
+
+ def new_service_ref(self):
+ ref = self.new_ref()
+ ref['type'] = uuid.uuid4().hex
+ return ref
+
+ def new_endpoint_ref(self, service_id):
+ ref = self.new_ref()
+ ref['interface'] = uuid.uuid4().hex[:8]
+ ref['service_id'] = service_id
+ ref['url'] = uuid.uuid4().hex
+ ref['region'] = uuid.uuid4().hex
+ return ref
+
+ def new_domain_ref(self):
+ ref = self.new_ref()
+ return ref
+
+ def new_project_ref(self, domain_id):
+ ref = self.new_ref()
+ ref['domain_id'] = domain_id
+ return ref
+
+ def new_user_ref(self, domain_id, project_id=None):
+ ref = self.new_ref()
+ ref['domain_id'] = domain_id
+ ref['email'] = uuid.uuid4().hex
+ ref['password'] = uuid.uuid4().hex
+ if project_id:
+ ref['project_id'] = project_id
+ return ref
+
+ def new_group_ref(self, domain_id):
+ ref = self.new_ref()
+ ref['domain_id'] = domain_id
+ return ref
+
+ def new_credential_ref(self, user_id, project_id=None):
+ ref = self.new_ref()
+ ref['user_id'] = user_id
+ ref['blob'] = uuid.uuid4().hex
+ ref['type'] = uuid.uuid4().hex
+ if project_id:
+ ref['project_id'] = project_id
+ return ref
+
+ def new_role_ref(self):
+ ref = self.new_ref()
+ return ref
+
+ def new_policy_ref(self):
+ ref = self.new_ref()
+ ref['blob'] = uuid.uuid4().hex
+ ref['type'] = uuid.uuid4().hex
+ return ref
+
+ def new_trust_ref(self, trustor_user_id, trustee_user_id, project_id=None,
+ impersonation=None, expires=None, role_ids=None,
+ role_names=None):
+ ref = self.new_ref()
+
+ ref['trustor_user_id'] = trustor_user_id
+ ref['trustee_user_id'] = trustee_user_id
+ ref['impersonation'] = impersonation or False
+ ref['project_id'] = project_id
+
+ if isinstance(expires, basestring):
+ ref['expires_at'] = expires
+ elif isinstance(expires, dict):
+ ref['expires_at'] = timeutils.strtime(
+ timeutils.utcnow() + datetime.timedelta(**expires),
+ fmt=TIME_FORMAT)
+ elif expires is None:
+ pass
+ else:
+ raise NotImplementedError('Unexpected value for "expires"')
+
+ role_ids = role_ids or []
+ role_names = role_names or []
+ if role_ids or role_names:
+ ref['roles'] = []
+ for role_id in role_ids:
+ ref['roles'].append({'id': role_id})
+ for role_name in role_names:
+ ref['roles'].append({'name': role_name})
+
+ return ref
+
+ def admin_request(self, *args, **kwargs):
+ """Translates XML responses to dicts.
+
+ This implies that we only have to write assertions for JSON.
+
+ """
+ r = super(RestfulTestCase, self).admin_request(*args, **kwargs)
+ if r.headers.get('Content-Type') == 'application/xml':
+ r.result = serializer.from_xml(etree.tostring(r.result))
+ return r
+
+ def get_scoped_token(self):
+ """Convenience method so that we can test authenticated requests."""
+ r = self.admin_request(
+ method='POST',
+ path='/v3/auth/tokens',
+ body={
+ 'auth': {
+ 'identity': {
+ 'methods': ['password'],
+ 'password': {
+ 'user': {
+ 'name': self.user['name'],
+ 'password': self.user['password'],
+ 'domain': {
+ 'id': self.user['domain_id']
+ }
+ }
+ }
+ },
+ 'scope': {
+ 'project': {
+ 'id': self.project['id'],
+ }
+ }
+ }
+ })
+ return r.headers.get('X-Subject-Token')
+
+ def get_requested_token(self, auth):
+ """Request the specific token we want."""
+
+ r = self.admin_request(
+ method='POST',
+ path='/v3/auth/tokens',
+ body=auth)
+ return r.headers.get('X-Subject-Token')
+
+ def v3_request(self, path, **kwargs):
+ # Check if the caller has passed in auth details for
+ # use in requesting the token
+ auth = kwargs.pop('auth', None)
+ if auth:
+ token = self.get_requested_token(auth)
+ else:
+ token = kwargs.pop('token', None)
+ if not token:
+ token = self.get_scoped_token()
+ path = '/v3' + path
+
+ return self.admin_request(path=path, token=token, **kwargs)
+
+ def get(self, path, **kwargs):
+ r = self.v3_request(method='GET', path=path, **kwargs)
+ if 'expected_status' not in kwargs:
+ self.assertResponseStatus(r, 200)
+ return r
+
+ def head(self, path, **kwargs):
+ r = self.v3_request(method='HEAD', path=path, **kwargs)
+ if 'expected_status' not in kwargs:
+ self.assertResponseStatus(r, 204)
+ return r
+
+ def post(self, path, **kwargs):
+ r = self.v3_request(method='POST', path=path, **kwargs)
+ if 'expected_status' not in kwargs:
+ self.assertResponseStatus(r, 201)
+ return r
+
+ def put(self, path, **kwargs):
+ r = self.v3_request(method='PUT', path=path, **kwargs)
+ if 'expected_status' not in kwargs:
+ self.assertResponseStatus(r, 204)
+ return r
+
+ def patch(self, path, **kwargs):
+ r = self.v3_request(method='PATCH', path=path, **kwargs)
+ if 'expected_status' not in kwargs:
+ self.assertResponseStatus(r, 200)
+ return r
+
+ def delete(self, path, **kwargs):
+ r = self.v3_request(method='DELETE', path=path, **kwargs)
+ if 'expected_status' not in kwargs:
+ self.assertResponseStatus(r, 204)
+ return r
+
+ def assertValidErrorResponse(self, r):
+ if r.headers.get('Content-Type') == 'application/xml':
+ resp = serializer.from_xml(etree.tostring(r.result))
+ else:
+ resp = r.result
+ self.assertIsNotNone(resp.get('error'))
+ self.assertIsNotNone(resp['error'].get('code'))
+ self.assertIsNotNone(resp['error'].get('title'))
+ self.assertIsNotNone(resp['error'].get('message'))
+ self.assertEqual(int(resp['error']['code']), r.status_code)
+
+ def assertValidListLinks(self, links):
+ self.assertIsNotNone(links)
+ self.assertIsNotNone(links.get('self'))
+ self.assertIn(CONF.public_endpoint % CONF, links['self'])
+
+ self.assertIn('next', links)
+ if links['next'] is not None:
+ self.assertIn(
+ CONF.public_endpoint % CONF,
+ links['next'])
+
+ self.assertIn('previous', links)
+ if links['previous'] is not None:
+ self.assertIn(
+ CONF.public_endpoint % CONF,
+ links['previous'])
+
+ def assertValidListResponse(self, resp, key, entity_validator, ref=None,
+ expected_length=None, keys_to_check=None):
+ """Make assertions common to all API list responses.
+
+ If a reference is provided, it's ID will be searched for in the
+ response, and asserted to be equal.
+
+ """
+ entities = resp.result.get(key)
+ self.assertIsNotNone(entities)
+
+ if expected_length is not None:
+ self.assertEqual(len(entities), expected_length)
+ elif ref is not None:
+ # we're at least expecting the ref
+ self.assertNotEmpty(entities)
+
+ # collections should have relational links
+ self.assertValidListLinks(resp.result.get('links'))
+
+ for entity in entities:
+ self.assertIsNotNone(entity)
+ self.assertValidEntity(entity, keys_to_check=keys_to_check)
+ entity_validator(entity)
+ if ref:
+ entity = [x for x in entities if x['id'] == ref['id']][0]
+ self.assertValidEntity(entity, ref=ref,
+ keys_to_check=keys_to_check)
+ entity_validator(entity, ref)
+ return entities
+
+ def assertValidResponse(self, resp, key, entity_validator, *args,
+ **kwargs):
+ """Make assertions common to all API responses."""
+ entity = resp.result.get(key)
+ self.assertIsNotNone(entity)
+ keys = kwargs.pop('keys_to_check', None)
+ self.assertValidEntity(entity, keys_to_check=keys, *args, **kwargs)
+ entity_validator(entity, *args, **kwargs)
+ return entity
+
+ def assertValidEntity(self, entity, ref=None, keys_to_check=None):
+ """Make assertions common to all API entities.
+
+ If a reference is provided, the entity will also be compared against
+ the reference.
+ """
+ if keys_to_check:
+ keys = keys_to_check
+ else:
+ keys = ['name', 'description', 'enabled']
+
+ for k in ['id'] + keys:
+ msg = '%s unexpectedly None in %s' % (k, entity)
+ self.assertIsNotNone(entity.get(k), msg)
+
+ self.assertIsNotNone(entity.get('links'))
+ self.assertIsNotNone(entity['links'].get('self'))
+ self.assertIn(CONF.public_endpoint % CONF, entity['links']['self'])
+ self.assertIn(entity['id'], entity['links']['self'])
+
+ if ref:
+ for k in keys:
+ msg = '%s not equal: %s != %s' % (k, ref[k], entity[k])
+ self.assertEquals(ref[k], entity[k])
+
+ return entity
+
+ # auth validation
+
+ def assertValidISO8601ExtendedFormatDatetime(self, dt):
+ try:
+ return timeutils.parse_strtime(dt, fmt=TIME_FORMAT)
+ except Exception:
+ msg = '%s is not a valid ISO 8601 extended format date time.' % dt
+ raise AssertionError(msg)
+ self.assertTrue(isinstance(dt, datetime.datetime))
+
+ def assertValidTokenResponse(self, r, user=None):
+ self.assertTrue(r.headers.get('X-Subject-Token'))
+ token = r.result['token']
+
+ self.assertIsNotNone(token.get('expires_at'))
+ expires_at = self.assertValidISO8601ExtendedFormatDatetime(
+ token['expires_at'])
+ self.assertIsNotNone(token.get('issued_at'))
+ issued_at = self.assertValidISO8601ExtendedFormatDatetime(
+ token['issued_at'])
+ self.assertTrue(issued_at < expires_at)
+
+ self.assertIn('user', token)
+ self.assertIn('id', token['user'])
+ self.assertIn('name', token['user'])
+ self.assertIn('domain', token['user'])
+ self.assertIn('id', token['user']['domain'])
+
+ if user is not None:
+ self.assertEqual(user['id'], token['user']['id'])
+ self.assertEqual(user['name'], token['user']['name'])
+ self.assertEqual(user['domain_id'], token['user']['domain']['id'])
+
+ return token
+
+ def assertValidUnscopedTokenResponse(self, r, *args, **kwargs):
+ token = self.assertValidTokenResponse(r, *args, **kwargs)
+
+ self.assertNotIn('roles', token)
+ self.assertNotIn('catalog', token)
+ self.assertNotIn('project', token)
+ self.assertNotIn('domain', token)
+
+ return token
+
+ def assertValidScopedTokenResponse(self, r, *args, **kwargs):
+ require_catalog = kwargs.pop('require_catalog', True)
+ token = self.assertValidTokenResponse(r, *args, **kwargs)
+
+ if require_catalog:
+ self.assertIn('catalog', token)
+ else:
+ self.assertNotIn('catalog', token)
+
+ self.assertIn('roles', token)
+ self.assertTrue(token['roles'])
+ for role in token['roles']:
+ self.assertIn('id', role)
+ self.assertIn('name', role)
+
+ return token
+
+ def assertValidProjectScopedTokenResponse(self, r, *args, **kwargs):
+ token = self.assertValidScopedTokenResponse(r, *args, **kwargs)
+
+ self.assertIn('project', token)
+ self.assertIn('id', token['project'])
+ self.assertIn('name', token['project'])
+ self.assertIn('domain', token['project'])
+ self.assertIn('id', token['project']['domain'])
+ self.assertIn('name', token['project']['domain'])
+
+ self.assertEqual(self.role_id, token['roles'][0]['id'])
+
+ return token
+
+ def assertValidProjectTrustScopedTokenResponse(self, r, *args, **kwargs):
+ token = self.assertValidProjectScopedTokenResponse(r, *args, **kwargs)
+
+ trust = token.get('OS-TRUST:trust')
+ self.assertIsNotNone(trust)
+ self.assertIsNotNone(trust.get('id'))
+ self.assertTrue(isinstance(trust.get('impersonation'), bool))
+ self.assertIsNotNone(trust.get('trustor_user'))
+ self.assertIsNotNone(trust.get('trustee_user'))
+ self.assertIsNotNone(trust['trustor_user'].get('id'))
+ self.assertIsNotNone(trust['trustee_user'].get('id'))
+
+ def assertValidDomainScopedTokenResponse(self, r, *args, **kwargs):
+ token = self.assertValidScopedTokenResponse(r, *args, **kwargs)
+
+ self.assertIn('domain', token)
+ self.assertIn('id', token['domain'])
+ self.assertIn('name', token['domain'])
+
+ return token
+
+ def assertEqualTokens(self, a, b):
+ """Assert that two tokens are equal.
+
+ Compare two tokens except for their ids. This also truncates
+ the time in the comparison.
+ """
+ def normalize(token):
+ del token['token']['expires_at']
+ del token['token']['issued_at']
+ return token
+
+ a_expires_at = self.assertValidISO8601ExtendedFormatDatetime(
+ a['token']['expires_at'])
+ b_expires_at = self.assertValidISO8601ExtendedFormatDatetime(
+ b['token']['expires_at'])
+ self.assertCloseEnoughForGovernmentWork(a_expires_at, b_expires_at)
+
+ a_issued_at = self.assertValidISO8601ExtendedFormatDatetime(
+ a['token']['issued_at'])
+ b_issued_at = self.assertValidISO8601ExtendedFormatDatetime(
+ b['token']['issued_at'])
+ self.assertCloseEnoughForGovernmentWork(a_issued_at, b_issued_at)
+
+ return self.assertDictEqual(normalize(a), normalize(b))
+
+ # service validation
+
+ def assertValidServiceListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'services',
+ self.assertValidService,
+ *args,
+ **kwargs)
+
+ def assertValidServiceResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'service',
+ self.assertValidService,
+ *args,
+ **kwargs)
+
+ def assertValidService(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('type'))
+ if ref:
+ self.assertEqual(ref['type'], entity['type'])
+ return entity
+
+ # endpoint validation
+
+ def assertValidEndpointListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'endpoints',
+ self.assertValidEndpoint,
+ *args,
+ **kwargs)
+
+ def assertValidEndpointResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'endpoint',
+ self.assertValidEndpoint,
+ *args,
+ **kwargs)
+
+ def assertValidEndpoint(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('interface'))
+ self.assertIsNotNone(entity.get('service_id'))
+
+ # this is intended to be an unexposed implementation detail
+ self.assertNotIn('legacy_endpoint_id', entity)
+
+ if ref:
+ self.assertEqual(ref['interface'], entity['interface'])
+ self.assertEqual(ref['service_id'], entity['service_id'])
+ return entity
+
+ # domain validation
+
+ def assertValidDomainListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'domains',
+ self.assertValidDomain,
+ *args,
+ **kwargs)
+
+ def assertValidDomainResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'domain',
+ self.assertValidDomain,
+ *args,
+ **kwargs)
+
+ def assertValidDomain(self, entity, ref=None):
+ if ref:
+ pass
+ return entity
+
+ # project validation
+
+ def assertValidProjectListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'projects',
+ self.assertValidProject,
+ *args,
+ **kwargs)
+
+ def assertValidProjectResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'project',
+ self.assertValidProject,
+ *args,
+ **kwargs)
+
+ def assertValidProject(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('domain_id'))
+ if ref:
+ self.assertEqual(ref['domain_id'], entity['domain_id'])
+ return entity
+
+ # user validation
+
+ def assertValidUserListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'users',
+ self.assertValidUser,
+ *args,
+ **kwargs)
+
+ def assertValidUserResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'user',
+ self.assertValidUser,
+ *args,
+ **kwargs)
+
+ def assertValidUser(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('domain_id'))
+ self.assertIsNotNone(entity.get('email'))
+ self.assertIsNone(entity.get('password'))
+ if ref:
+ self.assertEqual(ref['domain_id'], entity['domain_id'])
+ self.assertEqual(ref['email'], entity['email'])
+ return entity
+
+ # group validation
+
+ def assertValidGroupListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'groups',
+ self.assertValidGroup,
+ *args,
+ **kwargs)
+
+ def assertValidGroupResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'group',
+ self.assertValidGroup,
+ *args,
+ **kwargs)
+
+ def assertValidGroup(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('name'))
+ if ref:
+ self.assertEqual(ref['name'], entity['name'])
+ return entity
+
+ # credential validation
+
+ def assertValidCredentialListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'credentials',
+ self.assertValidCredential,
+ *args,
+ **kwargs)
+
+ def assertValidCredentialResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'credential',
+ self.assertValidCredential,
+ *args,
+ **kwargs)
+
+ def assertValidCredential(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('user_id'))
+ self.assertIsNotNone(entity.get('blob'))
+ self.assertIsNotNone(entity.get('type'))
+ if ref:
+ self.assertEqual(ref['user_id'], entity['user_id'])
+ self.assertEqual(ref['blob'], entity['blob'])
+ self.assertEqual(ref['type'], entity['type'])
+ self.assertEqual(ref.get('project_id'), entity.get('project_id'))
+ return entity
+
+ # role validation
+
+ def assertValidRoleListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'roles',
+ self.assertValidRole,
+ keys_to_check=['name'],
+ *args,
+ **kwargs)
+
+ def assertValidRoleResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'role',
+ self.assertValidRole,
+ keys_to_check=['name'],
+ *args,
+ **kwargs)
+
+ def assertValidRole(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('name'))
+ if ref:
+ self.assertEqual(ref['name'], entity['name'])
+ return entity
+
+ def assertValidRoleAssignmentListResponse(self, resp, ref=None,
+ expected_length=None):
+
+ entities = resp.result.get('role_assignments')
+
+ if expected_length is not None:
+ self.assertEqual(len(entities), expected_length)
+ elif ref is not None:
+ # we're at least expecting the ref
+ self.assertNotEmpty(entities)
+
+ # collections should have relational links
+ self.assertValidListLinks(resp.result.get('links'))
+
+ for entity in entities:
+ self.assertIsNotNone(entity)
+ self.assertValidRoleAssignment(entity)
+ if ref:
+ self.assertValidRoleAssignment(entity, ref)
+ return entities
+
+ def assertValidRoleAssignment(self, entity, ref=None, url=None):
+ self.assertIsNotNone(entity.get('role'))
+ self.assertIsNotNone(entity.get('scope'))
+
+ # Only one of user or group should be present
+ self.assertIsNotNone(entity.get('user') or
+ entity.get('group'))
+ self.assertIsNone(entity.get('user') and
+ entity.get('group'))
+
+ # Only one of domain or project should be present
+ self.assertIsNotNone(entity['scope'].get('project') or
+ entity['scope'].get('domain'))
+ self.assertIsNone(entity['scope'].get('project') and
+ entity['scope'].get('domain'))
+
+ if entity['scope'].get('project'):
+ self.assertIsNotNone(entity['scope']['project'].get('id'))
+ else:
+ self.assertIsNotNone(entity['scope']['domain'].get('id'))
+ self.assertIsNotNone(entity.get('links'))
+ self.assertIsNotNone(entity['links'].get('assignment'))
+
+ if ref:
+ if ref.get('user'):
+ self.assertEqual(ref['user']['id'], entity['user']['id'])
+ if ref.get('group'):
+ self.assertEqual(ref['group']['id'], entity['group']['id'])
+ if ref.get('role'):
+ self.assertEqual(ref['role']['id'], entity['role']['id'])
+ if ref['scope'].get('project'):
+ self.assertEqual(ref['scope']['project']['id'],
+ entity['scope']['project']['id'])
+ if ref['scope'].get('domain'):
+ self.assertEqual(ref['scope']['domain']['id'],
+ entity['scope']['domain']['id'])
+ if url:
+ self.assertIn(url, entity['links']['assignment'])
+
+ def assertRoleAssignmentInListResponse(
+ self, resp, ref, link_url=None, expected=1):
+
+ found_count = 0
+ for entity in resp.result.get('role_assignments'):
+ try:
+ self.assertValidRoleAssignment(
+ entity, ref=ref, url=link_url)
+ except Exception:
+ # It doesn't match, so let's go onto the next one
+ pass
+ else:
+ found_count += 1
+ self.assertEqual(found_count, expected)
+
+ def assertRoleAssignmentNotInListResponse(
+ self, resp, ref, link_url=None):
+
+ self.assertRoleAssignmentInListResponse(
+ resp, ref=ref, link_url=link_url, expected=0)
+
+ # policy validation
+
+ def assertValidPolicyListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'policies',
+ self.assertValidPolicy,
+ *args,
+ **kwargs)
+
+ def assertValidPolicyResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'policy',
+ self.assertValidPolicy,
+ *args,
+ **kwargs)
+
+ def assertValidPolicy(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('blob'))
+ self.assertIsNotNone(entity.get('type'))
+ if ref:
+ self.assertEqual(ref['blob'], entity['blob'])
+ self.assertEqual(ref['type'], entity['type'])
+ return entity
+
+ # trust validation
+
+ def assertValidTrustListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'trusts',
+ self.assertValidTrust,
+ *args,
+ **kwargs)
+
+ def assertValidTrustResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'trust',
+ self.assertValidTrust,
+ *args,
+ **kwargs)
+
+ def assertValidTrust(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('trustor_user_id'))
+ self.assertIsNotNone(entity.get('trustee_user_id'))
+
+ self.assertIn('expires_at', entity)
+ if entity['expires_at'] is not None:
+ self.assertValidISO8601ExtendedFormatDatetime(entity['expires_at'])
+
+ # always disallow project xor project_id (neither or both is allowed)
+ has_roles = bool(entity.get('roles'))
+ has_project = bool(entity.get('project_id'))
+ self.assertFalse(has_roles ^ has_project)
+
+ for role in entity['roles']:
+ self.assertIsNotNone(role)
+ self.assertValidEntity(role)
+ self.assertValidRole(role)
+
+ self.assertValidListLinks(entity.get('roles_links'))
+
+ # these were used during dev and shouldn't land in final impl
+ self.assertNotIn('role_ids', entity)
+ self.assertNotIn('role_names', entity)
+
+ if ref:
+ self.assertEqual(ref['trustor_user_id'], entity['trustor_user_id'])
+ self.assertEqual(ref['trustee_user_id'], entity['trustee_user_id'])
+ self.assertEqual(ref['project_id'], entity['project_id'])
+ if entity.get('expires_at') or ref.get('expires_at'):
+ entity_exp = self.assertValidISO8601ExtendedFormatDatetime(
+ entity['expires_at'])
+ ref_exp = self.assertValidISO8601ExtendedFormatDatetime(
+ ref['expires_at'])
+ self.assertCloseEnoughForGovernmentWork(entity_exp, ref_exp)
+ else:
+ self.assertEqual(ref.get('expires_at'),
+ entity.get('expires_at'))
+
+ return entity
+
+ def build_auth_scope(self, project_id=None, project_name=None,
+ project_domain_id=None, project_domain_name=None,
+ domain_id=None, domain_name=None, trust_id=None):
+ scope_data = {}
+ if project_id or project_name:
+ scope_data['project'] = {}
+ if project_id:
+ scope_data['project']['id'] = project_id
+ else:
+ scope_data['project']['name'] = project_name
+ if project_domain_id or project_domain_name:
+ project_domain_json = {}
+ if project_domain_id:
+ project_domain_json['id'] = project_domain_id
+ else:
+ project_domain_json['name'] = project_domain_name
+ scope_data['project']['domain'] = project_domain_json
+ if domain_id or domain_name:
+ scope_data['domain'] = {}
+ if domain_id:
+ scope_data['domain']['id'] = domain_id
+ else:
+ scope_data['domain']['name'] = domain_name
+ if trust_id:
+ scope_data['OS-TRUST:trust'] = {}
+ scope_data['OS-TRUST:trust']['id'] = trust_id
+ return scope_data
+
+ def build_password_auth(self, user_id=None, username=None,
+ user_domain_id=None, user_domain_name=None,
+ password=None):
+ password_data = {'user': {}}
+ if user_id:
+ password_data['user']['id'] = user_id
+ else:
+ password_data['user']['name'] = username
+ if user_domain_id or user_domain_name:
+ password_data['user']['domain'] = {}
+ if user_domain_id:
+ password_data['user']['domain']['id'] = user_domain_id
+ else:
+ password_data['user']['domain']['name'] = user_domain_name
+ password_data['user']['password'] = password
+ return password_data
+
+ def build_token_auth(self, token):
+ return {'id': token}
+
+ def build_authentication_request(self, token=None, user_id=None,
+ username=None, user_domain_id=None,
+ user_domain_name=None, password=None,
+ **kwargs):
+ """Build auth dictionary.
+
+ It will create an auth dictionary based on all the arguments
+ that it receives.
+ """
+ auth_data = {}
+ auth_data['identity'] = {'methods': []}
+ if token:
+ auth_data['identity']['methods'].append('token')
+ auth_data['identity']['token'] = self.build_token_auth(token)
+ if user_id or username:
+ auth_data['identity']['methods'].append('password')
+ auth_data['identity']['password'] = self.build_password_auth(
+ user_id, username, user_domain_id, user_domain_name, password)
+ if kwargs:
+ auth_data['scope'] = self.build_auth_scope(**kwargs)
+ return {'auth': auth_data}
+
+
+class VersionTestCase(RestfulTestCase):
+ def test_get_version(self):
+ pass