summaryrefslogtreecommitdiffstats
path: root/tests/test_v3.py
diff options
context:
space:
mode:
authorDolph Mathews <dolph.mathews@gmail.com>2013-02-26 19:46:03 -0600
committerGerrit Code Review <review@openstack.org>2013-03-05 19:36:32 +0000
commitc0f017ce861d1532f790b18a137b491440ffa83b (patch)
tree84f4372d7184a4034c5d7ef783ead38a05ca4ff8 /tests/test_v3.py
parent601eeb50b60a2e99041690fe19238202bc203503 (diff)
downloadkeystone-c0f017ce861d1532f790b18a137b491440ffa83b.tar.gz
keystone-c0f017ce861d1532f790b18a137b491440ffa83b.tar.xz
keystone-c0f017ce861d1532f790b18a137b491440ffa83b.zip
Expand v3 trust test coverage
Change-Id: Ie697e19c2fa0e4ac203ff24aa9efa23c25ea6ccd
Diffstat (limited to 'tests/test_v3.py')
-rw-r--r--tests/test_v3.py476
1 files changed, 465 insertions, 11 deletions
diff --git a/tests/test_v3.py b/tests/test_v3.py
index 2aa2c2b5..39807cec 100644
--- a/tests/test_v3.py
+++ b/tests/test_v3.py
@@ -1,20 +1,23 @@
+import datetime
import uuid
from lxml import etree
+from keystone import auth
from keystone.common import serializer
from keystone.common.sql import util as sql_util
-from keystone import auth
-from keystone import test
from keystone import config
+from keystone.openstack.common import timeutils
from keystone.policy.backends import rules
-
+from keystone import test
import test_content_types
CONF = config.CONF
+TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
+
class RestfulTestCase(test_content_types.RestfulTestCase):
def setUp(self):
@@ -133,6 +136,38 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
ref['type'] = uuid.uuid4().hex
return ref
+ def new_trust_ref(self, trustor_user_id, trustee_user_id, project_id=None,
+ impersonation=None, expires=None, role_ids=None,
+ role_names=None):
+ ref = self.new_ref()
+
+ ref['trustor_user_id'] = trustor_user_id
+ ref['trustee_user_id'] = trustee_user_id
+ ref['impersonation'] = impersonation or False
+ ref['project_id'] = project_id
+
+ if isinstance(expires, basestring):
+ ref['expires_at'] = expires
+ elif isinstance(expires, dict):
+ ref['expires_at'] = timeutils.strtime(
+ timeutils.utcnow() + datetime.timedelta(**expires),
+ fmt=TIME_FORMAT)
+ elif expires is None:
+ pass
+ else:
+ raise NotImplementedError('Unexpected value for "expires"')
+
+ role_ids = role_ids or []
+ role_names = role_names or []
+ if role_ids or role_names:
+ ref['roles'] = []
+ for role_id in role_ids:
+ ref['roles'].append({'id': role_id})
+ for role_name in role_names:
+ ref['roles'].append({'name': role_name})
+
+ return ref
+
def get_scoped_token(self):
"""Convenience method so that we can test authenticated requests."""
r = self.admin_request(
@@ -214,6 +249,23 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
self.assertIsNotNone(resp['error'].get('message'))
self.assertEqual(int(resp['error']['code']), r.status)
+ def assertValidListLinks(self, links):
+ self.assertIsNotNone(links)
+ self.assertIsNotNone(links.get('self'))
+ self.assertIn(CONF.public_endpoint % CONF, links['self'])
+
+ self.assertIn('next', links)
+ if links['next'] is not None:
+ self.assertIn(
+ CONF.public_endpoint % CONF,
+ links['next'])
+
+ self.assertIn('previous', links)
+ if links['previous'] is not None:
+ self.assertIn(
+ CONF.public_endpoint % CONF,
+ links['previous'])
+
def assertValidListResponse(self, resp, key, entity_validator, ref=None,
expected_length=None):
"""Make assertions common to all API list responses.
@@ -232,11 +284,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
self.assertTrue(len(entities))
# collections should have relational links
- self.assertIsNotNone(resp.body.get('links'))
- self.assertIn('previous', resp.body['links'])
- self.assertIn('self', resp.body['links'])
- self.assertIn('next', resp.body['links'])
- self.assertIn(CONF.public_endpoint % CONF, resp.body['links']['self'])
+ self.assertValidListLinks(resp.body.get('links'))
for entity in entities:
self.assertIsNotNone(entity)
@@ -248,12 +296,13 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
entity_validator(entity, ref)
return entities
- def assertValidResponse(self, resp, key, entity_validator, ref):
+ def assertValidResponse(self, resp, key, entity_validator, *args,
+ **kwargs):
"""Make assertions common to all API responses."""
entity = resp.body.get(key)
self.assertIsNotNone(entity)
- self.assertValidEntity(entity, ref)
- entity_validator(entity, ref)
+ self.assertValidEntity(entity, *args, **kwargs)
+ entity_validator(entity, *args, **kwargs)
return entity
def assertValidEntity(self, entity, ref=None):
@@ -280,6 +329,411 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
return entity
+ # auth validation
+
+ def assertValidISO8601ExtendedFormatDatetime(self, dt):
+ try:
+ return timeutils.parse_strtime(dt, fmt=TIME_FORMAT)
+ except Exception:
+ msg = '%s is not a valid ISO 8601 extended format date time.' % dt
+ raise AssertionError(msg)
+ self.assertTrue(isinstance(dt, datetime.datetime))
+
+ def assertValidTokenResponse(self, r, user=None):
+ self.assertTrue(r.getheader('X-Subject-Token'))
+ token = r.body
+ if r.getheader('Content-Type') == 'application/xml':
+ token = serializer.from_xml(etree.tostring(r.body))['token']
+ else:
+ token = r.body['token']
+
+ self.assertIsNotNone(token.get('expires_at'))
+ expires_at = self.assertValidISO8601ExtendedFormatDatetime(
+ token['expires_at'])
+ self.assertIsNotNone(token.get('issued_at'))
+ issued_at = self.assertValidISO8601ExtendedFormatDatetime(
+ token['issued_at'])
+ self.assertTrue(issued_at < expires_at)
+
+ self.assertIn('user', token)
+ self.assertIn('id', token['user'])
+ self.assertIn('name', token['user'])
+ self.assertIn('domain', token['user'])
+ self.assertIn('id', token['user']['domain'])
+
+ if user is not None:
+ self.assertEqual(user['id'], token['user']['id'])
+ self.assertEqual(user['name'], token['user']['name'])
+ self.assertEqual(user['domain_id'], token['user']['domain']['id'])
+
+ return token
+
+ def assertValidUnscopedTokenResponse(self, r, *args, **kwargs):
+ token = self.assertValidTokenResponse(r, *args, **kwargs)
+
+ self.assertNotIn('roles', token)
+ self.assertNotIn('catalog', token)
+ self.assertNotIn('project', token)
+ self.assertNotIn('domain', token)
+
+ return token
+
+ def assertValidScopedTokenResponse(self, r, *args, **kwargs):
+ token = self.assertValidTokenResponse(r, *args, **kwargs)
+
+ self.assertIn('catalog', token)
+ self.assertIn('roles', token)
+ self.assertTrue(token['roles'])
+ for role in token['roles']:
+ self.assertIn('id', role)
+ self.assertIn('name', role)
+
+ return token
+
+ def assertValidProjectScopedTokenResponse(self, r, *args, **kwargs):
+ token = self.assertValidScopedTokenResponse(r, *args, **kwargs)
+
+ self.assertIn('project', token)
+ self.assertIn('id', token['project'])
+ self.assertIn('name', token['project'])
+ self.assertIn('domain', token['project'])
+ self.assertIn('id', token['project']['domain'])
+ self.assertIn('name', token['project']['domain'])
+
+ self.assertEqual(self.role_id, token['roles'][0]['id'])
+
+ return token
+
+ def assertValidProjectTrustScopedTokenResponse(self, r, *args, **kwargs):
+ token = self.assertValidProjectScopedTokenResponse(r, *args, **kwargs)
+
+ self.assertIsNotNone(token.get('trust'))
+ self.assertIsNotNone(token['trust'].get('id'))
+ self.assertTrue(isinstance(token['trust'].get('impersonation'), bool))
+ self.assertIsNotNone(token['trust'].get('trustor_user'))
+ self.assertIsNotNone(token['trust'].get('trustee_user'))
+ self.assertIsNotNone(token['trust']['trustor_user'].get('id'))
+ self.assertIsNotNone(token['trust']['trustee_user'].get('id'))
+
+ def assertValidDomainScopedTokenResponse(self, r, *args, **kwargs):
+ token = self.assertValidScopedTokenResponse(r, *args, **kwargs)
+
+ self.assertIn('domain', token)
+ self.assertIn('id', token['domain'])
+ self.assertIn('name', token['domain'])
+
+ return token
+
+ def assertEqualTokens(self, a, b):
+ """Assert that two tokens are equal.
+
+ Compare two tokens except for their ids. This also truncates
+ the time in the comparison.
+ """
+ def normalize(token):
+ del token['token']['expires_at']
+ del token['token']['issued_at']
+ return token
+
+ a_expires_at = self.assertValidISO8601ExtendedFormatDatetime(
+ a['token']['expires_at'])
+ b_expires_at = self.assertValidISO8601ExtendedFormatDatetime(
+ b['token']['expires_at'])
+ self.assertCloseEnoughForGovernmentWork(a_expires_at, b_expires_at)
+
+ a_issued_at = self.assertValidISO8601ExtendedFormatDatetime(
+ a['token']['issued_at'])
+ b_issued_at = self.assertValidISO8601ExtendedFormatDatetime(
+ b['token']['issued_at'])
+ self.assertCloseEnoughForGovernmentWork(a_issued_at, b_issued_at)
+
+ return self.assertDictEqual(normalize(a), normalize(b))
+
+ # service validation
+
+ def assertValidServiceListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'services',
+ self.assertValidService,
+ *args,
+ **kwargs)
+
+ def assertValidServiceResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'service',
+ self.assertValidService,
+ *args,
+ **kwargs)
+
+ def assertValidService(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('type'))
+ if ref:
+ self.assertEqual(ref['type'], entity['type'])
+ return entity
+
+ # endpoint validation
+
+ def assertValidEndpointListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'endpoints',
+ self.assertValidEndpoint,
+ *args,
+ **kwargs)
+
+ def assertValidEndpointResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'endpoint',
+ self.assertValidEndpoint,
+ *args,
+ **kwargs)
+
+ def assertValidEndpoint(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('interface'))
+ self.assertIsNotNone(entity.get('service_id'))
+ if ref:
+ self.assertEqual(ref['interface'], entity['interface'])
+ self.assertEqual(ref['service_id'], entity['service_id'])
+ return entity
+
+ # domain validation
+
+ def assertValidDomainListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'domains',
+ self.assertValidDomain,
+ *args,
+ **kwargs)
+
+ def assertValidDomainResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'domain',
+ self.assertValidDomain,
+ *args,
+ **kwargs)
+
+ def assertValidDomain(self, entity, ref=None):
+ if ref:
+ pass
+ return entity
+
+ # project validation
+
+ def assertValidProjectListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'projects',
+ self.assertValidProject,
+ *args,
+ **kwargs)
+
+ def assertValidProjectResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'project',
+ self.assertValidProject,
+ *args,
+ **kwargs)
+
+ def assertValidProject(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('domain_id'))
+ if ref:
+ self.assertEqual(ref['domain_id'], entity['domain_id'])
+ return entity
+
+ # user validation
+
+ def assertValidUserListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'users',
+ self.assertValidUser,
+ *args,
+ **kwargs)
+
+ def assertValidUserResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'user',
+ self.assertValidUser,
+ *args,
+ **kwargs)
+
+ def assertValidUser(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('domain_id'))
+ self.assertIsNotNone(entity.get('email'))
+ self.assertIsNone(entity.get('password'))
+ if ref:
+ self.assertEqual(ref['domain_id'], entity['domain_id'])
+ self.assertEqual(ref['email'], entity['email'])
+ return entity
+
+ # group validation
+
+ def assertValidGroupListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'groups',
+ self.assertValidGroup,
+ *args,
+ **kwargs)
+
+ def assertValidGroupResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'group',
+ self.assertValidGroup,
+ *args,
+ **kwargs)
+
+ def assertValidGroup(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('name'))
+ if ref:
+ self.assertEqual(ref['name'], entity['name'])
+ return entity
+
+ # credential validation
+
+ def assertValidCredentialListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'credentials',
+ self.assertValidCredential,
+ *args,
+ **kwargs)
+
+ def assertValidCredentialResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'credential',
+ self.assertValidCredential,
+ *args,
+ **kwargs)
+
+ def assertValidCredential(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('user_id'))
+ self.assertIsNotNone(entity.get('blob'))
+ self.assertIsNotNone(entity.get('type'))
+ if ref:
+ self.assertEqual(ref['user_id'], entity['user_id'])
+ self.assertEqual(ref['blob'], entity['blob'])
+ self.assertEqual(ref['type'], entity['type'])
+ self.assertEqual(ref.get('project_id'), entity.get('project_id'))
+ return entity
+
+ # role validation
+
+ def assertValidRoleListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'roles',
+ self.assertValidRole,
+ *args,
+ **kwargs)
+
+ def assertValidRoleResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'role',
+ self.assertValidRole,
+ *args,
+ **kwargs)
+
+ def assertValidRole(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('name'))
+ if ref:
+ self.assertEqual(ref['name'], entity['name'])
+ return entity
+
+ # policy validation
+
+ def assertValidPolicyListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'policies',
+ self.assertValidPolicy,
+ *args,
+ **kwargs)
+
+ def assertValidPolicyResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'policy',
+ self.assertValidPolicy,
+ *args,
+ **kwargs)
+
+ def assertValidPolicy(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('blob'))
+ self.assertIsNotNone(entity.get('type'))
+ if ref:
+ self.assertEqual(ref['blob'], entity['blob'])
+ self.assertEqual(ref['type'], entity['type'])
+ return entity
+
+ # trust validation
+
+ def assertValidTrustListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'trusts',
+ self.assertValidTrust,
+ *args,
+ **kwargs)
+
+ def assertValidTrustResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'trust',
+ self.assertValidTrust,
+ *args,
+ **kwargs)
+
+ def assertValidTrust(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('trustor_user_id'))
+ self.assertIsNotNone(entity.get('trustee_user_id'))
+
+ self.assertIn('expires_at', entity)
+ if entity['expires_at'] is not None:
+ self.assertValidISO8601ExtendedFormatDatetime(entity['expires_at'])
+
+ # always disallow project xor project_id (neither or both is allowed)
+ has_roles = bool(entity.get('roles'))
+ has_project = bool(entity.get('project_id'))
+ self.assertFalse(has_roles ^ has_project)
+
+ for role in entity['roles']:
+ self.assertIsNotNone(role)
+ self.assertValidEntity(role)
+ self.assertValidRole(role)
+
+ self.assertValidListLinks(entity.get('roles_links'))
+
+ # these were used during dev and shouldn't land in final impl
+ self.assertNotIn('role_ids', entity)
+ self.assertNotIn('role_names', entity)
+
+ if ref:
+ self.assertEqual(ref['trustor_user_id'], entity['trustor_user_id'])
+ self.assertEqual(ref['trustee_user_id'], entity['trustee_user_id'])
+ self.assertEqual(ref['project_id'], entity['project_id'])
+ if entity.get('expires_at') or ref.get('expires_at'):
+ entity_exp = self.assertValidISO8601ExtendedFormatDatetime(
+ entity['expires_at'])
+ ref_exp = self.assertValidISO8601ExtendedFormatDatetime(
+ ref['expires_at'])
+ self.assertCloseEnoughForGovernmentWork(entity_exp, ref_exp)
+ else:
+ self.assertEqual(ref.get('expires_at'),
+ entity.get('expires_at'))
+
+ return entity
+
class VersionTestCase(RestfulTestCase):
def test_get_version(self):