summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-03-06 03:43:48 +0000
committerGerrit Code Review <review@openstack.org>2013-03-06 03:43:48 +0000
commit2e8c4d2ff1897d4235dd9718d92cc7c0c6ea30e5 (patch)
tree8aeeec5f3bbe31c08f628468d83c144660c0ed62
parentcdeda9416b8ea4af5250fb4bd48865f4d87967b3 (diff)
parentc0f017ce861d1532f790b18a137b491440ffa83b (diff)
Merge "Expand v3 trust test coverage"
-rw-r--r--tests/test_v3.py476
-rw-r--r--tests/test_v3_auth.py371
-rw-r--r--tests/test_v3_catalog.py46
-rw-r--r--tests/test_v3_identity.py139
-rw-r--r--tests/test_v3_policy.py24
5 files changed, 727 insertions, 329 deletions
diff --git a/tests/test_v3.py b/tests/test_v3.py
index 2aa2c2b5..39807cec 100644
--- a/tests/test_v3.py
+++ b/tests/test_v3.py
@@ -1,20 +1,23 @@
+import datetime
import uuid
from lxml import etree
+from keystone import auth
from keystone.common import serializer
from keystone.common.sql import util as sql_util
-from keystone import auth
-from keystone import test
from keystone import config
+from keystone.openstack.common import timeutils
from keystone.policy.backends import rules
-
+from keystone import test
import test_content_types
CONF = config.CONF
+TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
+
class RestfulTestCase(test_content_types.RestfulTestCase):
def setUp(self):
@@ -133,6 +136,38 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
ref['type'] = uuid.uuid4().hex
return ref
+ def new_trust_ref(self, trustor_user_id, trustee_user_id, project_id=None,
+ impersonation=None, expires=None, role_ids=None,
+ role_names=None):
+ ref = self.new_ref()
+
+ ref['trustor_user_id'] = trustor_user_id
+ ref['trustee_user_id'] = trustee_user_id
+ ref['impersonation'] = impersonation or False
+ ref['project_id'] = project_id
+
+ if isinstance(expires, basestring):
+ ref['expires_at'] = expires
+ elif isinstance(expires, dict):
+ ref['expires_at'] = timeutils.strtime(
+ timeutils.utcnow() + datetime.timedelta(**expires),
+ fmt=TIME_FORMAT)
+ elif expires is None:
+ pass
+ else:
+ raise NotImplementedError('Unexpected value for "expires"')
+
+ role_ids = role_ids or []
+ role_names = role_names or []
+ if role_ids or role_names:
+ ref['roles'] = []
+ for role_id in role_ids:
+ ref['roles'].append({'id': role_id})
+ for role_name in role_names:
+ ref['roles'].append({'name': role_name})
+
+ return ref
+
def get_scoped_token(self):
"""Convenience method so that we can test authenticated requests."""
r = self.admin_request(
@@ -214,6 +249,23 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
self.assertIsNotNone(resp['error'].get('message'))
self.assertEqual(int(resp['error']['code']), r.status)
+ def assertValidListLinks(self, links):
+ self.assertIsNotNone(links)
+ self.assertIsNotNone(links.get('self'))
+ self.assertIn(CONF.public_endpoint % CONF, links['self'])
+
+ self.assertIn('next', links)
+ if links['next'] is not None:
+ self.assertIn(
+ CONF.public_endpoint % CONF,
+ links['next'])
+
+ self.assertIn('previous', links)
+ if links['previous'] is not None:
+ self.assertIn(
+ CONF.public_endpoint % CONF,
+ links['previous'])
+
def assertValidListResponse(self, resp, key, entity_validator, ref=None,
expected_length=None):
"""Make assertions common to all API list responses.
@@ -232,11 +284,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
self.assertTrue(len(entities))
# collections should have relational links
- self.assertIsNotNone(resp.body.get('links'))
- self.assertIn('previous', resp.body['links'])
- self.assertIn('self', resp.body['links'])
- self.assertIn('next', resp.body['links'])
- self.assertIn(CONF.public_endpoint % CONF, resp.body['links']['self'])
+ self.assertValidListLinks(resp.body.get('links'))
for entity in entities:
self.assertIsNotNone(entity)
@@ -248,12 +296,13 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
entity_validator(entity, ref)
return entities
- def assertValidResponse(self, resp, key, entity_validator, ref):
+ def assertValidResponse(self, resp, key, entity_validator, *args,
+ **kwargs):
"""Make assertions common to all API responses."""
entity = resp.body.get(key)
self.assertIsNotNone(entity)
- self.assertValidEntity(entity, ref)
- entity_validator(entity, ref)
+ self.assertValidEntity(entity, *args, **kwargs)
+ entity_validator(entity, *args, **kwargs)
return entity
def assertValidEntity(self, entity, ref=None):
@@ -280,6 +329,411 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
return entity
+ # auth validation
+
+ def assertValidISO8601ExtendedFormatDatetime(self, dt):
+ try:
+ return timeutils.parse_strtime(dt, fmt=TIME_FORMAT)
+ except Exception:
+ msg = '%s is not a valid ISO 8601 extended format date time.' % dt
+ raise AssertionError(msg)
+ self.assertTrue(isinstance(dt, datetime.datetime))
+
+ def assertValidTokenResponse(self, r, user=None):
+ self.assertTrue(r.getheader('X-Subject-Token'))
+ token = r.body
+ if r.getheader('Content-Type') == 'application/xml':
+ token = serializer.from_xml(etree.tostring(r.body))['token']
+ else:
+ token = r.body['token']
+
+ self.assertIsNotNone(token.get('expires_at'))
+ expires_at = self.assertValidISO8601ExtendedFormatDatetime(
+ token['expires_at'])
+ self.assertIsNotNone(token.get('issued_at'))
+ issued_at = self.assertValidISO8601ExtendedFormatDatetime(
+ token['issued_at'])
+ self.assertTrue(issued_at < expires_at)
+
+ self.assertIn('user', token)
+ self.assertIn('id', token['user'])
+ self.assertIn('name', token['user'])
+ self.assertIn('domain', token['user'])
+ self.assertIn('id', token['user']['domain'])
+
+ if user is not None:
+ self.assertEqual(user['id'], token['user']['id'])
+ self.assertEqual(user['name'], token['user']['name'])
+ self.assertEqual(user['domain_id'], token['user']['domain']['id'])
+
+ return token
+
+ def assertValidUnscopedTokenResponse(self, r, *args, **kwargs):
+ token = self.assertValidTokenResponse(r, *args, **kwargs)
+
+ self.assertNotIn('roles', token)
+ self.assertNotIn('catalog', token)
+ self.assertNotIn('project', token)
+ self.assertNotIn('domain', token)
+
+ return token
+
+ def assertValidScopedTokenResponse(self, r, *args, **kwargs):
+ token = self.assertValidTokenResponse(r, *args, **kwargs)
+
+ self.assertIn('catalog', token)
+ self.assertIn('roles', token)
+ self.assertTrue(token['roles'])
+ for role in token['roles']:
+ self.assertIn('id', role)
+ self.assertIn('name', role)
+
+ return token
+
+ def assertValidProjectScopedTokenResponse(self, r, *args, **kwargs):
+ token = self.assertValidScopedTokenResponse(r, *args, **kwargs)
+
+ self.assertIn('project', token)
+ self.assertIn('id', token['project'])
+ self.assertIn('name', token['project'])
+ self.assertIn('domain', token['project'])
+ self.assertIn('id', token['project']['domain'])
+ self.assertIn('name', token['project']['domain'])
+
+ self.assertEqual(self.role_id, token['roles'][0]['id'])
+
+ return token
+
+ def assertValidProjectTrustScopedTokenResponse(self, r, *args, **kwargs):
+ token = self.assertValidProjectScopedTokenResponse(r, *args, **kwargs)
+
+ self.assertIsNotNone(token.get('trust'))
+ self.assertIsNotNone(token['trust'].get('id'))
+ self.assertTrue(isinstance(token['trust'].get('impersonation'), bool))
+ self.assertIsNotNone(token['trust'].get('trustor_user'))
+ self.assertIsNotNone(token['trust'].get('trustee_user'))
+ self.assertIsNotNone(token['trust']['trustor_user'].get('id'))
+ self.assertIsNotNone(token['trust']['trustee_user'].get('id'))
+
+ def assertValidDomainScopedTokenResponse(self, r, *args, **kwargs):
+ token = self.assertValidScopedTokenResponse(r, *args, **kwargs)
+
+ self.assertIn('domain', token)
+ self.assertIn('id', token['domain'])
+ self.assertIn('name', token['domain'])
+
+ return token
+
+ def assertEqualTokens(self, a, b):
+ """Assert that two tokens are equal.
+
+ Compare two tokens except for their ids. This also truncates
+ the time in the comparison.
+ """
+ def normalize(token):
+ del token['token']['expires_at']
+ del token['token']['issued_at']
+ return token
+
+ a_expires_at = self.assertValidISO8601ExtendedFormatDatetime(
+ a['token']['expires_at'])
+ b_expires_at = self.assertValidISO8601ExtendedFormatDatetime(
+ b['token']['expires_at'])
+ self.assertCloseEnoughForGovernmentWork(a_expires_at, b_expires_at)
+
+ a_issued_at = self.assertValidISO8601ExtendedFormatDatetime(
+ a['token']['issued_at'])
+ b_issued_at = self.assertValidISO8601ExtendedFormatDatetime(
+ b['token']['issued_at'])
+ self.assertCloseEnoughForGovernmentWork(a_issued_at, b_issued_at)
+
+ return self.assertDictEqual(normalize(a), normalize(b))
+
+ # service validation
+
+ def assertValidServiceListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'services',
+ self.assertValidService,
+ *args,
+ **kwargs)
+
+ def assertValidServiceResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'service',
+ self.assertValidService,
+ *args,
+ **kwargs)
+
+ def assertValidService(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('type'))
+ if ref:
+ self.assertEqual(ref['type'], entity['type'])
+ return entity
+
+ # endpoint validation
+
+ def assertValidEndpointListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'endpoints',
+ self.assertValidEndpoint,
+ *args,
+ **kwargs)
+
+ def assertValidEndpointResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'endpoint',
+ self.assertValidEndpoint,
+ *args,
+ **kwargs)
+
+ def assertValidEndpoint(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('interface'))
+ self.assertIsNotNone(entity.get('service_id'))
+ if ref:
+ self.assertEqual(ref['interface'], entity['interface'])
+ self.assertEqual(ref['service_id'], entity['service_id'])
+ return entity
+
+ # domain validation
+
+ def assertValidDomainListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'domains',
+ self.assertValidDomain,
+ *args,
+ **kwargs)
+
+ def assertValidDomainResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'domain',
+ self.assertValidDomain,
+ *args,
+ **kwargs)
+
+ def assertValidDomain(self, entity, ref=None):
+ if ref:
+ pass
+ return entity
+
+ # project validation
+
+ def assertValidProjectListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'projects',
+ self.assertValidProject,
+ *args,
+ **kwargs)
+
+ def assertValidProjectResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'project',
+ self.assertValidProject,
+ *args,
+ **kwargs)
+
+ def assertValidProject(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('domain_id'))
+ if ref:
+ self.assertEqual(ref['domain_id'], entity['domain_id'])
+ return entity
+
+ # user validation
+
+ def assertValidUserListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'users',
+ self.assertValidUser,
+ *args,
+ **kwargs)
+
+ def assertValidUserResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'user',
+ self.assertValidUser,
+ *args,
+ **kwargs)
+
+ def assertValidUser(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('domain_id'))
+ self.assertIsNotNone(entity.get('email'))
+ self.assertIsNone(entity.get('password'))
+ if ref:
+ self.assertEqual(ref['domain_id'], entity['domain_id'])
+ self.assertEqual(ref['email'], entity['email'])
+ return entity
+
+ # group validation
+
+ def assertValidGroupListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'groups',
+ self.assertValidGroup,
+ *args,
+ **kwargs)
+
+ def assertValidGroupResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'group',
+ self.assertValidGroup,
+ *args,
+ **kwargs)
+
+ def assertValidGroup(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('name'))
+ if ref:
+ self.assertEqual(ref['name'], entity['name'])
+ return entity
+
+ # credential validation
+
+ def assertValidCredentialListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'credentials',
+ self.assertValidCredential,
+ *args,
+ **kwargs)
+
+ def assertValidCredentialResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'credential',
+ self.assertValidCredential,
+ *args,
+ **kwargs)
+
+ def assertValidCredential(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('user_id'))
+ self.assertIsNotNone(entity.get('blob'))
+ self.assertIsNotNone(entity.get('type'))
+ if ref:
+ self.assertEqual(ref['user_id'], entity['user_id'])
+ self.assertEqual(ref['blob'], entity['blob'])
+ self.assertEqual(ref['type'], entity['type'])
+ self.assertEqual(ref.get('project_id'), entity.get('project_id'))
+ return entity
+
+ # role validation
+
+ def assertValidRoleListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'roles',
+ self.assertValidRole,
+ *args,
+ **kwargs)
+
+ def assertValidRoleResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'role',
+ self.assertValidRole,
+ *args,
+ **kwargs)
+
+ def assertValidRole(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('name'))
+ if ref:
+ self.assertEqual(ref['name'], entity['name'])
+ return entity
+
+ # policy validation
+
+ def assertValidPolicyListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'policies',
+ self.assertValidPolicy,
+ *args,
+ **kwargs)
+
+ def assertValidPolicyResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'policy',
+ self.assertValidPolicy,
+ *args,
+ **kwargs)
+
+ def assertValidPolicy(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('blob'))
+ self.assertIsNotNone(entity.get('type'))
+ if ref:
+ self.assertEqual(ref['blob'], entity['blob'])
+ self.assertEqual(ref['type'], entity['type'])
+ return entity
+
+ # trust validation
+
+ def assertValidTrustListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'trusts',
+ self.assertValidTrust,
+ *args,
+ **kwargs)
+
+ def assertValidTrustResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'trust',
+ self.assertValidTrust,
+ *args,
+ **kwargs)
+
+ def assertValidTrust(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('trustor_user_id'))
+ self.assertIsNotNone(entity.get('trustee_user_id'))
+
+ self.assertIn('expires_at', entity)
+ if entity['expires_at'] is not None:
+ self.assertValidISO8601ExtendedFormatDatetime(entity['expires_at'])
+
+ # always disallow project xor project_id (neither or both is allowed)
+ has_roles = bool(entity.get('roles'))
+ has_project = bool(entity.get('project_id'))
+ self.assertFalse(has_roles ^ has_project)
+
+ for role in entity['roles']:
+ self.assertIsNotNone(role)
+ self.assertValidEntity(role)
+ self.assertValidRole(role)
+
+ self.assertValidListLinks(entity.get('roles_links'))
+
+ # these were used during dev and shouldn't land in final impl
+ self.assertNotIn('role_ids', entity)
+ self.assertNotIn('role_names', entity)
+
+ if ref:
+ self.assertEqual(ref['trustor_user_id'], entity['trustor_user_id'])
+ self.assertEqual(ref['trustee_user_id'], entity['trustee_user_id'])
+ self.assertEqual(ref['project_id'], entity['project_id'])
+ if entity.get('expires_at') or ref.get('expires_at'):
+ entity_exp = self.assertValidISO8601ExtendedFormatDatetime(
+ entity['expires_at'])
+ ref_exp = self.assertValidISO8601ExtendedFormatDatetime(
+ ref['expires_at'])
+ self.assertCloseEnoughForGovernmentWork(entity_exp, ref_exp)
+ else:
+ self.assertEqual(ref.get('expires_at'),
+ entity.get('expires_at'))
+
+ return entity
+
class VersionTestCase(RestfulTestCase):
def test_get_version(self):
diff --git a/tests/test_v3_auth.py b/tests/test_v3_auth.py
index 8f66cdf4..40a4c846 100644
--- a/tests/test_v3_auth.py
+++ b/tests/test_v3_auth.py
@@ -12,17 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
-import re
import uuid
-from lxml import etree
+import nose.exc
from keystone import auth
from keystone import config
from keystone import exception
-from keystone.common import serializer
-from keystone.openstack.common import jsonutils
-from keystone.openstack.common import timeutils
from keystone import test
import test_v3
@@ -33,7 +29,7 @@ CONF = config.CONF
def _build_auth_scope(project_id=None, project_name=None,
project_domain_id=None, project_domain_name=None,
- domain_id=None, domain_name=None):
+ domain_id=None, domain_name=None, trust_id=None):
scope_data = {}
if project_id or project_name:
scope_data['project'] = {}
@@ -54,6 +50,9 @@ def _build_auth_scope(project_id=None, project_name=None,
scope_data['domain']['id'] = domain_id
else:
scope_data['domain']['name'] = domain_name
+ if trust_id:
+ scope_data['trust'] = {}
+ scope_data['trust']['id'] = trust_id
return scope_data
@@ -81,10 +80,7 @@ def _build_token_auth(token):
def _build_authentication_request(token=None, user_id=None, username=None,
user_domain_id=None, user_domain_name=None,
- password=None, project_id=None,
- project_name=None, project_domain_id=None,
- project_domain_name=None,
- domain_id=None, domain_name=None):
+ password=None, **kwargs):
"""Build auth dictionary.
It will create an auth dictionary based on all the arguments
@@ -99,106 +95,11 @@ def _build_authentication_request(token=None, user_id=None, username=None,
auth_data['identity']['methods'].append('password')
auth_data['identity']['password'] = _build_password_auth(
user_id, username, user_domain_id, user_domain_name, password)
- if project_id or project_name or domain_id or domain_name:
- auth_data['scope'] = _build_auth_scope(project_id,
- project_name,
- project_domain_id,
- project_domain_name,
- domain_id,
- domain_name)
+ if kwargs:
+ auth_data['scope'] = _build_auth_scope(**kwargs)
return {'auth': auth_data}
-class AuthTest(test_v3.RestfulTestCase):
- def assertValidTokenResponse(self, r):
- self.assertTrue(r.getheader('X-Subject-Token'))
- token = r.body
- if r.getheader('Content-Type') == 'application/xml':
- token = serializer.from_xml(etree.tostring(r.body))['token']
- else:
- token = r.body['token']
- self.assertIn('expires_at', token)
- self.assertValidISOTimeFormat(token['expires_at'])
- if 'issued_at' in token:
- self.assertValidISOTimeFormat(token['issued_at'])
- self.assertIn('user', token)
- self.assertEqual(self.user['id'], token['user']['id'])
- self.assertEqual(self.user['name'], token['user']['name'])
- self.assertEqual(self.user['domain_id'], token['user']['domain']['id'])
-
- return token
-
- def assertValidISOTimeFormat(self, timestr):
- # match ISO 8610 time to seconds or fraction of seconds
- isotime_re = re.compile(
- '^\s*\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z\s*$')
- self.assertTrue(isotime_re.match(timestr))
-
- def assertValidUnscopedTokenResponse(self, r):
- token = self.assertValidTokenResponse(r)
-
- self.assertNotIn('roles', token)
- self.assertNotIn('catalog', token)
- self.assertNotIn('project', token)
- self.assertNotIn('domain', token)
-
- return token
-
- def assertValidScopedTokenResponse(self, r):
- token = self.assertValidTokenResponse(r)
-
- self.assertIn('catalog', token)
- self.assertIn('roles', token)
- self.assertTrue(token['roles'])
- for role in token['roles']:
- self.assertIn('id', role)
- self.assertIn('name', role)
-
- return token
-
- def assertValidProjectScopedTokenResponse(self, r):
- token = self.assertValidScopedTokenResponse(r)
-
- self.assertIn('project', token)
- self.assertIn('id', token['project'])
- self.assertIn('name', token['project'])
- self.assertIn('domain', token['project'])
- self.assertIn('id', token['project']['domain'])
- self.assertIn('name', token['project']['domain'])
-
- self.assertEqual(self.role_id, token['roles'][0]['id'])
-
- return token
-
- def assertValidDomainScopedTokenResponse(self, r):
- token = self.assertValidScopedTokenResponse(r)
-
- self.assertIn('domain', token)
- self.assertIn('id', token['domain'])
- self.assertIn('name', token['domain'])
-
- return token
-
- def assertEqualTokens(self, a, b):
- """Assert that two tokens are equal.
-
- Compare two tokens except for their ids. This also truncates
- the time in the comparison.
- """
- def normalize(token):
- del token['token']['expires_at']
- del token['token']['issued_at']
- return token
-
- self.assertCloseEnoughForGovernmentWork(
- timeutils.parse_isotime(a['token']['expires_at']),
- timeutils.parse_isotime(b['token']['expires_at']))
- self.assertCloseEnoughForGovernmentWork(
- timeutils.parse_isotime(a['token']['issued_at']),
- timeutils.parse_isotime(b['token']['issued_at']))
- return self.assertDictEqual(normalize(a), normalize(b))
-
-
class TestAuthInfo(test.TestCase):
def test_missing_auth_methods(self):
auth_data = {'identity': {}}
@@ -245,7 +146,7 @@ class TestAuthInfo(test.TestCase):
auth_data)
-class TestTokenAPIs(AuthTest):
+class TestTokenAPIs(test_v3.RestfulTestCase):
def setUp(self):
super(TestTokenAPIs, self).setUp()
auth_data = _build_authentication_request(
@@ -395,7 +296,7 @@ class TestTokenAPIs(AuthTest):
self.assertIn('signed', r.body)
-class TestAuthJSON(AuthTest):
+class TestAuthJSON(test_v3.RestfulTestCase):
content_type = 'json'
def test_unscoped_token_with_user_id(self):
@@ -653,3 +554,255 @@ class TestAuthJSON(AuthTest):
class TestAuthXML(TestAuthJSON):
content_type = 'xml'
+
+
+class TestTrustAuth(test_v3.RestfulTestCase):
+ def setUp(self):
+ super(TestTrustAuth, self).setUp()
+
+ # create a trustee to delegate stuff to
+ self.trustee_user_id = uuid.uuid4().hex
+ self.trustee_user = self.new_user_ref(domain_id=self.domain_id)
+ self.trustee_user['id'] = self.trustee_user_id
+ self.identity_api.create_user(self.trustee_user_id, self.trustee_user)
+
+ def test_create_trust_400(self):
+ raise nose.exc.SkipTest('Blocked by bug 1133435')
+ self.post('/trusts', body={'trust': {}}, expected_status=400)
+
+ def test_create_unscoped_trust(self):
+ ref = self.new_trust_ref(
+ trustor_user_id=self.user_id,
+ trustee_user_id=self.trustee_user_id)
+ del ref['id']
+ r = self.post('/trusts', body={'trust': ref})
+ self.assertValidTrustResponse(r, ref)
+
+ def test_trust_crud(self):
+ ref = self.new_trust_ref(
+ trustor_user_id=self.user_id,
+ trustee_user_id=self.trustee_user_id,
+ project_id=self.project_id,
+ role_ids=[self.role_id])
+ del ref['id']
+ r = self.post('/trusts', body={'trust': ref})
+ trust = self.assertValidTrustResponse(r, ref)
+
+ r = self.get(
+ '/trusts/%(trust_id)s' % {'trust_id': trust['id']},
+ expected_status=200)
+ self.assertValidTrustResponse(r, ref)
+
+ # validate roles on the trust
+ r = self.get(
+ '/trusts/%(trust_id)s/roles' % {
+ 'trust_id': trust['id']},
+ expected_status=200)
+ roles = self.assertValidRoleListResponse(r, self.role)
+ self.assertIn(self.role['id'], [x['id'] for x in roles])
+ self.head(
+ '/trusts/%(trust_id)s/roles/%(role_id)s' % {
+ 'trust_id': trust['id'],
+ 'role_id': self.role['id']},
+ expected_status=204)
+ r = self.get(
+ '/trusts/%(trust_id)s/roles/%(role_id)s' % {
+ 'trust_id': trust['id'],
+ 'role_id': self.role['id']},
+ expected_status=200)
+ self.assertValidRoleResponse(r, self.role)
+
+ r = self.get('/trusts', expected_status=200)
+ self.assertValidTrustListResponse(r, trust)
+
+ # trusts are immutable
+ self.patch(
+ '/trusts/%(trust_id)s' % {'trust_id': trust['id']},
+ body={'trust': ref},
+ expected_status=404)
+
+ self.delete(
+ '/trusts/%(trust_id)s' % {'trust_id': trust['id']},
+ expected_status=204)
+
+ self.get(
+ '/trusts/%(trust_id)s' % {'trust_id': trust['id']},
+ expected_status=404)
+
+ def test_create_trust_trustee_404(self):
+ ref = self.new_trust_ref(
+ trustor_user_id=self.user_id,
+ trustee_user_id=uuid.uuid4().hex)
+ del ref['id']
+ self.post('/trusts', body={'trust': ref}, expected_status=404)
+
+ def test_create_trust_trustor_trustee_backwards(self):
+ ref = self.new_trust_ref(
+ trustor_user_id=self.trustee_user_id,
+ trustee_user_id=self.user_id)
+ del ref['id']
+ self.post('/trusts', body={'trust': ref}, expected_status=403)
+
+ def test_create_trust_project_404(self):
+ ref = self.new_trust_ref(
+ trustor_user_id=self.user_id,
+ trustee_user_id=self.trustee_user_id,
+ project_id=uuid.uuid4().hex,
+ role_ids=[self.role_id])
+ del ref['id']
+ self.post('/trusts', body={'trust': ref}, expected_status=404)
+
+ def test_create_trust_role_id_404(self):
+ ref = self.new_trust_ref(
+ trustor_user_id=self.user_id,
+ trustee_user_id=self.trustee_user_id,
+ project_id=self.project_id,
+ role_ids=[uuid.uuid4().hex])
+ del ref['id']
+ self.post('/trusts', body={'trust': ref}, expected_status=404)
+
+ def test_create_trust_role_name_404(self):
+ ref = self.new_trust_ref(
+ trustor_user_id=self.user_id,
+ trustee_user_id=self.trustee_user_id,
+ project_id=self.project_id,
+ role_names=[uuid.uuid4().hex])
+ del ref['id']
+ self.post('/trusts', body={'trust': ref}, expected_status=404)
+
+ def test_create_expired_trust(self):
+ ref = self.new_trust_ref(
+ trustor_user_id=self.user_id,
+ trustee_user_id=self.trustee_user_id,
+ project_id=self.project_id,
+ expires=dict(seconds=-1),
+ role_ids=[self.role_id])
+ del ref['id']
+ r = self.post('/trusts', body={'trust': ref})
+ trust = self.assertValidTrustResponse(r, ref)
+
+ self.get('/trusts/%(trust_id)s' % {
+ 'trust_id': trust['id']},
+ expected_status=404)
+
+ auth_data = _build_authentication_request(
+ user_id=self.trustee_user['id'],
+ password=self.trustee_user['password'],
+ trust_id=trust['id'])
+ self.post('/auth/tokens', body=auth_data, expected_status=401)
+
+ def test_exercise_trust_scoped_token_without_impersonation(self):
+ ref = self.new_trust_ref(
+ trustor_user_id=self.user_id,
+ trustee_user_id=self.trustee_user_id,
+ project_id=self.project_id,
+ impersonation=False,
+ expires=dict(minutes=1),
+ role_ids=[self.role_id])
+ del ref['id']
+
+ r = self.post('/trusts', body={'trust': ref})
+ trust = self.assertValidTrustResponse(r)
+
+ auth_data = _build_authentication_request(
+ user_id=self.trustee_user['id'],
+ password=self.trustee_user['password'],
+ trust_id=trust['id'])
+ r = self.post('/auth/tokens', body=auth_data)
+ self.assertValidProjectTrustScopedTokenResponse(r, self.trustee_user)
+ self.assertEqual(r.body['token']['user']['id'],
+ self.trustee_user['id'])
+ self.assertEqual(r.body['token']['user']['name'],
+ self.trustee_user['name'])
+ self.assertEqual(r.body['token']['user']['domain']['id'],
+ self.domain['id'])
+ self.assertEqual(r.body['token']['user']['domain']['name'],
+ self.domain['name'])
+ self.assertEqual(r.body['token']['project']['id'], self.project['id'])
+ self.assertEqual(r.body['token']['project']['name'],
+ self.project['name'])
+
+ def test_exercise_trust_scoped_token_with_impersonation(self):
+ ref = self.new_trust_ref(
+ trustor_user_id=self.user_id,
+ trustee_user_id=self.trustee_user_id,
+ project_id=self.project_id,
+ impersonation=True,
+ expires=dict(minutes=1),
+ role_ids=[self.role_id])
+ del ref['id']
+
+ r = self.post('/trusts', body={'trust': ref})
+ trust = self.assertValidTrustResponse(r)
+
+ auth_data = _build_authentication_request(
+ user_id=self.trustee_user['id'],
+ password=self.trustee_user['password'],
+ trust_id=trust['id'])
+ r = self.post('/auth/tokens', body=auth_data)
+ self.assertValidProjectTrustScopedTokenResponse(r, self.user)
+ self.assertEqual(r.body['token']['user']['id'], self.user['id'])
+ self.assertEqual(r.body['token']['user']['name'], self.user['name'])
+ self.assertEqual(r.body['token']['user']['domain']['id'],
+ self.domain['id'])
+ self.assertEqual(r.body['token']['user']['domain']['name'],
+ self.domain['name'])
+ self.assertEqual(r.body['token']['project']['id'], self.project['id'])
+ self.assertEqual(r.body['token']['project']['name'],
+ self.project['name'])
+
+ def test_delete_trust(self):
+ ref = self.new_trust_ref(
+ trustor_user_id=self.user_id,
+ trustee_user_id=self.trustee_user_id,
+ project_id=self.project_id,
+ impersonation=False,
+ expires=dict(minutes=1),
+ role_ids=[self.role_id])
+ del ref['id']
+
+ r = self.post('/trusts', body={'trust': ref})
+
+ trust = self.assertValidTrustResponse(r, ref)
+
+ self.delete('/trusts/%(trust_id)s' % {
+ 'trust_id': trust['id']},
+ expected_status=204)
+
+ self.get('/trusts/%(trust_id)s' % {
+ 'trust_id': trust['id']},
+ expected_status=404)
+
+ self.get('/trusts/%(trust_id)s' % {
+ 'trust_id': trust['id']},
+ expected_status=404)
+
+ auth_data = _build_authentication_request(
+ user_id=self.trustee_user['id'],
+ password=self.trustee_user['password'],
+ trust_id=trust['id'])
+ self.post('/auth/tokens', body=auth_data, expected_status=401)
+
+ def test_list_trusts(self):
+ ref = self.new_trust_ref(
+ trustor_user_id=self.user_id,
+ trustee_user_id=self.trustee_user_id,
+ project_id=self.project_id,
+ impersonation=False,
+ expires=dict(minutes=1),
+ role_ids=[self.role_id])
+ del ref['id']
+
+ for i in range(0, 3):
+ r = self.post('/trusts', body={'trust': ref})
+ trust = self.assertValidTrustResponse(r, ref)
+
+ r = self.get('/trusts?trustor_user_id=%s' %
+ self.user_id, expected_status=200)
+ trusts = r.body['trusts']
+ self.assertEqual(len(trusts), 3)
+
+ r = self.get('/trusts?trustee_user_id=%s' %
+ self.user_id, expected_status=200)
+ trusts = r.body['trusts']
+ self.assertEqual(len(trusts), 0)
diff --git a/tests/test_v3_catalog.py b/tests/test_v3_catalog.py
index 9f2f4f01..2d161db5 100644
--- a/tests/test_v3_catalog.py
+++ b/tests/test_v3_catalog.py
@@ -23,52 +23,6 @@ class CatalogTestCase(test_v3.RestfulTestCase):
self.endpoint_id,
self.endpoint.copy())
- # service validation
-
- def assertValidServiceListResponse(self, resp, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'services',
- self.assertValidService,
- **kwargs)
-
- def assertValidServiceResponse(self, resp, ref):
- return self.assertValidResponse(
- resp,
- 'service',
- self.assertValidService,
- ref)
-
- def assertValidService(self, entity, ref=None):
- self.assertIsNotNone(entity.get('type'))
- if ref:
- self.assertEqual(ref['type'], entity['type'])
- return entity
-
- # endpoint validation
-
- def assertValidEndpointListResponse(self, resp, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'endpoints',
- self.assertValidEndpoint,
- **kwargs)
-
- def assertValidEndpointResponse(self, resp, ref):
- return self.assertValidResponse(
- resp,
- 'endpoint',
- self.assertValidEndpoint,
- ref)
-
- def assertValidEndpoint(self, entity, ref=None):
- self.assertIsNotNone(entity.get('interface'))
- self.assertIsNotNone(entity.get('service_id'))
- if ref:
- self.assertEqual(ref['interface'], entity['interface'])
- self.assertEqual(ref['service_id'], entity['service_id'])
- return entity
-
# service crud tests
def test_create_service(self):
diff --git a/tests/test_v3_identity.py b/tests/test_v3_identity.py
index 47e50281..9ef487c3 100644
--- a/tests/test_v3_identity.py
+++ b/tests/test_v3_identity.py
@@ -40,145 +40,6 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.credential_id,
self.credential)
- # domain validation
-
- def assertValidDomainListResponse(self, resp, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'domains',
- self.assertValidDomain,
- **kwargs)
-
- def assertValidDomainResponse(self, resp, ref):
- return self.assertValidResponse(
- resp,
- 'domain',
- self.assertValidDomain,
- ref)
-
- def assertValidDomain(self, entity, ref=None):
- if ref:
- pass
- return entity
-
- # project validation
-
- def assertValidProjectListResponse(self, resp, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'projects',
- self.assertValidProject,
- **kwargs)
-
- def assertValidProjectResponse(self, resp, ref):
- return self.assertValidResponse(
- resp,
- 'project',
- self.assertValidProject,
- ref)
-
- def assertValidProject(self, entity, ref=None):
- self.assertIsNotNone(entity.get('domain_id'))
- if ref:
- self.assertEqual(ref['domain_id'], entity['domain_id'])
- return entity
-
- # user validation
-
- def assertValidUserListResponse(self, resp, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'users',
- self.assertValidUser,
- **kwargs)
-
- def assertValidUserResponse(self, resp, ref):
- return self.assertValidResponse(
- resp,
- 'user',
- self.assertValidUser,
- ref)
-
- def assertValidUser(self, entity, ref=None):
- self.assertIsNotNone(entity.get('domain_id'))
- self.assertIsNotNone(entity.get('email'))
- self.assertIsNone(entity.get('password'))
- if ref:
- self.assertEqual(ref['domain_id'], entity['domain_id'])
- self.assertEqual(ref['email'], entity['email'])
- return entity
-
- # group validation
-
- def assertValidGroupListResponse(self, resp, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'groups',
- self.assertValidGroup,
- **kwargs)
-
- def assertValidGroupResponse(self, resp, ref):
- return self.assertValidResponse(
- resp,
- 'group',
- self.assertValidGroup,
- ref)
-
- def assertValidGroup(self, entity, ref=None):
- self.assertIsNotNone(entity.get('name'))
- if ref:
- self.assertEqual(ref['name'], entity['name'])
- return entity
-
- # credential validation
-
- def assertValidCredentialListResponse(self, resp, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'credentials',
- self.assertValidCredential,
- **kwargs)
-
- def assertValidCredentialResponse(self, resp, ref):
- return self.assertValidResponse(
- resp,
- 'credential',
- self.assertValidCredential,
- ref)
-
- def assertValidCredential(self, entity, ref=None):
- self.assertIsNotNone(entity.get('user_id'))
- self.assertIsNotNone(entity.get('blob'))
- self.assertIsNotNone(entity.get('type'))
- if ref:
- self.assertEqual(ref['user_id'], entity['user_id'])
- self.assertEqual(ref['blob'], entity['blob'])
- self.assertEqual(ref['type'], entity['type'])
- self.assertEqual(ref.get('project_id'), entity.get('project_id'))
- return entity
-
- # role validation
-
- def assertValidRoleListResponse(self, resp, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'roles',
- self.assertValidRole,
- **kwargs)
-
- def assertValidRoleResponse(self, resp, ref):
- return self.assertValidResponse(
- resp,
- 'role',
- self.assertValidRole,
- ref)
-
- def assertValidRole(self, entity, ref=None):
- self.assertIsNotNone(entity.get('name'))
- if ref:
- self.assertEqual(ref['name'], entity['name'])
- return entity
-
# domain crud tests
def test_create_domain(self):
diff --git a/tests/test_v3_policy.py b/tests/test_v3_policy.py
index 811eb577..1af68b6e 100644
--- a/tests/test_v3_policy.py
+++ b/tests/test_v3_policy.py
@@ -15,30 +15,6 @@ class PolicyTestCase(test_v3.RestfulTestCase):
self.policy_id,
self.policy.copy())
- # policy validation
-
- def assertValidPolicyListResponse(self, resp, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'policies',
- self.assertValidPolicy,
- **kwargs)
-
- def assertValidPolicyResponse(self, resp, ref):
- return self.assertValidResponse(
- resp,
- 'policy',
- self.assertValidPolicy,
- ref)
-
- def assertValidPolicy(self, entity, ref=None):
- self.assertIsNotNone(entity.get('blob'))
- self.assertIsNotNone(entity.get('type'))
- if ref:
- self.assertEqual(ref['blob'], entity['blob'])
- self.assertEqual(ref['type'], entity['type'])
- return entity
-
# policy crud tests
def test_create_policy(self):