summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--keystone/identity/backends/kvs.py9
-rw-r--r--keystone/identity/backends/ldap.py24
-rw-r--r--keystone/identity/backends/sql.py26
-rw-r--r--keystone/identity/core.py78
-rw-r--r--tests/test_backend.py101
-rw-r--r--tests/test_backend_ldap.py51
-rw-r--r--tests/test_v3_auth.py131
7 files changed, 352 insertions, 68 deletions
diff --git a/keystone/identity/backends/kvs.py b/keystone/identity/backends/kvs.py
index 2eea08cf..565a5587 100644
--- a/keystone/identity/backends/kvs.py
+++ b/keystone/identity/backends/kvs.py
@@ -131,15 +131,6 @@ class Identity(kvs.Base, identity.Driver):
user_ref = self._get_user(user_id)
return user_ref.get('tenants', [])
- def get_roles_for_user_and_project(self, user_id, tenant_id):
- self.get_user(user_id)
- self.get_project(tenant_id)
- try:
- metadata_ref = self.get_metadata(user_id, tenant_id)
- except exception.MetadataNotFound:
- metadata_ref = {}
- return metadata_ref.get('roles', [])
-
def add_role_to_user_and_project(self, user_id, tenant_id, role_id):
self.get_user(user_id)
self.get_project(tenant_id)
diff --git a/keystone/identity/backends/ldap.py b/keystone/identity/backends/ldap.py
index dffbf835..2387c84e 100644
--- a/keystone/identity/backends/ldap.py
+++ b/keystone/identity/backends/ldap.py
@@ -149,12 +149,23 @@ class Identity(identity.Driver):
def get_metadata(self, user_id=None, tenant_id=None,
domain_id=None, group_id=None):
+
+ def _get_roles_for_just_user_and_project(user_id, tenant_id):
+ self.get_user(user_id)
+ self.get_project(tenant_id)
+ user_dn = self.user._id_to_dn(user_id)
+ return [self.role._dn_to_id(a.role_dn)
+ for a in self.role.get_role_assignments
+ (self.project._id_to_dn(tenant_id))
+ if a.user_dn == user_dn]
+
if domain_id is not None:
- raise NotImplemented('Domain metadata not supported by LDAP.')
+ msg = 'Domain metadata not supported by LDAP'
+ raise exception.NotImplemented(message=msg)
if not self.get_project(tenant_id) or not self.get_user(user_id):
return {}
- metadata_ref = self.get_roles_for_user_and_project(user_id, tenant_id)
+ metadata_ref = _get_roles_for_just_user_and_project(user_id, tenant_id)
if not metadata_ref:
return {}
return {'roles': metadata_ref}
@@ -182,15 +193,6 @@ class Identity(identity.Driver):
self.project.get_user_dns(tenant_id, rolegrants)]
return self._set_default_domain(users)
- def get_roles_for_user_and_project(self, user_id, tenant_id):
- self.get_user(user_id)
- self.get_project(tenant_id)
- user_dn = self.user._id_to_dn(user_id)
- return [self.role._dn_to_id(a.role_dn)
- for a in self.role.get_role_assignments
- (self.project._id_to_dn(tenant_id))
- if a.user_dn == user_dn]
-
def _subrole_id_to_dn(self, role_id, tenant_id):
if tenant_id is None:
return self.role._id_to_dn(role_id)
diff --git a/keystone/identity/backends/sql.py b/keystone/identity/backends/sql.py
index f81feb1d..ba318c73 100644
--- a/keystone/identity/backends/sql.py
+++ b/keystone/identity/backends/sql.py
@@ -368,32 +368,6 @@ class Identity(sql.Base, identity.Driver):
membership_refs = query.all()
return [x.project_id for x in membership_refs]
- def _get_user_group_project_roles(self, metadata_ref, user_id, project_id):
- group_refs = self.list_groups_for_user(user_id=user_id)
- for x in group_refs:
- try:
- metadata_ref.update(
- self.get_metadata(group_id=x['id'],
- tenant_id=project_id))
- except exception.MetadataNotFound:
- # no group grant, skip
- pass
-
- def _get_user_project_roles(self, metadata_ref, user_id, project_id):
- try:
- metadata_ref.update(self.get_metadata(user_id, project_id))
- except exception.MetadataNotFound:
- pass
-
- def get_roles_for_user_and_project(self, user_id, tenant_id):
- session = self.get_session()
- self._get_user(session, user_id)
- self._get_project(session, tenant_id)
- metadata_ref = {}
- self._get_user_project_roles(metadata_ref, user_id, tenant_id)
- self._get_user_group_project_roles(metadata_ref, user_id, tenant_id)
- return list(set(metadata_ref.get('roles', [])))
-
def add_role_to_user_and_project(self, user_id, tenant_id, role_id):
session = self.get_session()
self._get_user(session, user_id)
diff --git a/keystone/identity/core.py b/keystone/identity/core.py
index a254470e..09324ea3 100644
--- a/keystone/identity/core.py
+++ b/keystone/identity/core.py
@@ -186,50 +186,88 @@ class Driver(object):
def get_roles_for_user_and_project(self, user_id, tenant_id):
"""Get the roles associated with a user within given tenant.
+ This includes roles directly assigned to the user on the
+ project, as well as those by virtue of group membership.
+
:returns: a list of role ids.
:raises: keystone.exception.UserNotFound,
keystone.exception.ProjectNotFound
"""
- raise exception.NotImplemented()
+ def _get_group_project_roles(user_id, tenant_id):
+ role_list = []
+ group_refs = self.list_groups_for_user(user_id=user_id)
+ for x in group_refs:
+ try:
+ metadata_ref = self.get_metadata(group_id=x['id'],
+ tenant_id=tenant_id)
+ role_list += metadata_ref.get('roles', [])
+ except exception.MetadataNotFound:
+ # no group grant, skip
+ pass
+ return role_list
+
+ def _get_user_project_roles(user_id, tenant_id):
+ metadata_ref = {}
+ try:
+ metadata_ref = self.get_metadata(user_id=user_id,
+ tenant_id=tenant_id)
+ except exception.MetadataNotFound:
+ pass
+ return metadata_ref.get('roles', [])
+
+ self.get_user(user_id)
+ self.get_project(tenant_id)
+ user_role_list = _get_user_project_roles(user_id, tenant_id)
+ group_role_list = _get_group_project_roles(user_id, tenant_id)
+ # Use set() to process the list to remove any duplicates
+ return list(set(user_role_list + group_role_list))
def get_roles_for_user_and_domain(self, user_id, domain_id):
"""Get the roles associated with a user within given domain.
+ This includes roles directly assigned to the user on the
+ domain, as well as those by virtue of group membership.
+
:returns: a list of role ids.
:raises: keystone.exception.UserNotFound,
- keystone.exception.ProjectNotFound
+ keystone.exception.DomainNotFound
"""
- def update_metadata_for_group_domain_roles(self, metadata_ref,
- user_id, domain_id):
+ def _get_group_domain_roles(user_id, domain_id):
+ role_list = []
group_refs = self.list_groups_for_user(user_id=user_id)
for x in group_refs:
try:
- metadata_ref.update(
- self.get_metadata(group_id=x['id'],
- domain_id=domain_id))
- except exception.MetadataNotFound:
- # no group grant, skip
+ metadata_ref = self.get_metadata(group_id=x['id'],
+ domain_id=domain_id)
+ role_list += metadata_ref.get('roles', [])
+ except (exception.MetadataNotFound, exception.NotImplemented):
+ # MetadataNotFound implies no group grant, so skip.
+ # Ignore NotImplemented since not all backends support
+ # domains.
pass
+ return role_list
- def update_metadata_for_user_domain_roles(self, metadata_ref,
- user_id, domain_id):
+ def _get_user_domain_roles(user_id, domain_id):
+ metadata_ref = {}
try:
- metadata_ref.update(self.get_metadata(user_id=user_id,
- domain_id=domain_id))
- except exception.MetadataNotFound:
+ metadata_ref = self.get_metadata(user_id=user_id,
+ domain_id=domain_id)
+ except (exception.MetadataNotFound, exception.NotImplemented):
+ # MetadataNotFound implies no user grants.
+ # Ignore NotImplemented since not all backends support
+ # domains.
pass
+ return metadata_ref.get('roles', [])
self.get_user(user_id)
self.get_domain(domain_id)
- metadata_ref = {}
- update_metadata_for_user_domain_roles(self, metadata_ref,
- user_id, domain_id)
- update_metadata_for_group_domain_roles(self, metadata_ref,
- user_id, domain_id)
- return list(set(metadata_ref.get('roles', [])))
+ user_role_list = _get_user_domain_roles(user_id, domain_id)
+ group_role_list = _get_group_domain_roles(user_id, domain_id)
+ # Use set() to process the list to remove any duplicates
+ return list(set(user_role_list + group_role_list))
def add_role_to_user_and_project(self, user_id, tenant_id, role_id):
"""Add a role to a user within given tenant.
diff --git a/tests/test_backend.py b/tests/test_backend.py
index ea40cd8b..ebf94924 100644
--- a/tests/test_backend.py
+++ b/tests/test_backend.py
@@ -1081,7 +1081,7 @@ class IdentityTests(object):
def test_multi_role_grant_by_user_group_on_project_domain(self):
role_list = []
- for _ in range(8):
+ for _ in range(10):
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_role(role['id'], role)
role_list.append(role)
@@ -1150,6 +1150,105 @@ class IdentityTests(object):
self.assertIn(role_list[6], roles_ref)
self.assertIn(role_list[7], roles_ref)
+ # Now test the alternate way of getting back lists of grants,
+ # where user and group roles are combined. These should match
+ # the above results.
+ combined_role_list = self.identity_api.get_roles_for_user_and_project(
+ user1['id'], project1['id'])
+ self.assertEquals(len(combined_role_list), 4)
+ self.assertIn(role_list[4]['id'], combined_role_list)
+ self.assertIn(role_list[5]['id'], combined_role_list)
+ self.assertIn(role_list[6]['id'], combined_role_list)
+ self.assertIn(role_list[7]['id'], combined_role_list)
+
+ combined_role_list = self.identity_api.get_roles_for_user_and_domain(
+ user1['id'], domain1['id'])
+ self.assertEquals(len(combined_role_list), 4)
+ self.assertIn(role_list[0]['id'], combined_role_list)
+ self.assertIn(role_list[1]['id'], combined_role_list)
+ self.assertIn(role_list[2]['id'], combined_role_list)
+ self.assertIn(role_list[3]['id'], combined_role_list)
+
+ def test_multi_group_grants_on_project_domain(self):
+ """Test multiple group roles for user on project and domain.
+
+ Test Plan:
+ - Create 6 roles
+ - Create a domain, with a project, user and two groups
+ - Make the user a member of both groups
+ - Check no roles yet exit
+ - Assign a role to each user and both groups on both the
+ project and domain
+ - Get a list of effective roles for the user on both the
+ project and domain, checking we get back the correct three
+ roles
+
+ """
+ role_list = []
+ for _ in range(6):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_role(role['id'], role)
+ role_list.append(role)
+ domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain1['id'], domain1)
+ user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id'], 'password': uuid.uuid4().hex,
+ 'enabled': True}
+ self.identity_api.create_user(user1['id'], user1)
+ group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id'], 'enabled': True}
+ self.identity_api.create_group(group1['id'], group1)
+ group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id'], 'enabled': True}
+ self.identity_api.create_group(group2['id'], group2)
+ project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id']}
+ self.identity_api.create_project(project1['id'], project1)
+
+ self.identity_api.add_user_to_group(user1['id'],
+ group1['id'])
+ self.identity_api.add_user_to_group(user1['id'],
+ group2['id'])
+
+ roles_ref = self.identity_api.list_grants(
+ user_id=user1['id'],
+ project_id=project1['id'])
+ self.assertEquals(len(roles_ref), 0)
+ self.identity_api.create_grant(user_id=user1['id'],
+ domain_id=domain1['id'],
+ role_id=role_list[0]['id'])
+ self.identity_api.create_grant(group_id=group1['id'],
+ domain_id=domain1['id'],
+ role_id=role_list[1]['id'])
+ self.identity_api.create_grant(group_id=group2['id'],
+ domain_id=domain1['id'],
+ role_id=role_list[2]['id'])
+ self.identity_api.create_grant(user_id=user1['id'],
+ project_id=project1['id'],
+ role_id=role_list[3]['id'])
+ self.identity_api.create_grant(group_id=group1['id'],
+ project_id=project1['id'],
+ role_id=role_list[4]['id'])
+ self.identity_api.create_grant(group_id=group2['id'],
+ project_id=project1['id'],
+ role_id=role_list[5]['id'])
+
+ # Read by the roles, ensuring we get the correct 3 roles for
+ # both project and domain
+ combined_role_list = self.identity_api.get_roles_for_user_and_project(
+ user1['id'], project1['id'])
+ self.assertEquals(len(combined_role_list), 3)
+ self.assertIn(role_list[3]['id'], combined_role_list)
+ self.assertIn(role_list[4]['id'], combined_role_list)
+ self.assertIn(role_list[5]['id'], combined_role_list)
+
+ combined_role_list = self.identity_api.get_roles_for_user_and_domain(
+ user1['id'], domain1['id'])
+ self.assertEquals(len(combined_role_list), 3)
+ self.assertIn(role_list[0]['id'], combined_role_list)
+ self.assertIn(role_list[1]['id'], combined_role_list)
+ self.assertIn(role_list[2]['id'], combined_role_list)
+
def test_delete_role_with_user_and_group_grants(self):
role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_role(role1['id'], role1)
diff --git a/tests/test_backend_ldap.py b/tests/test_backend_ldap.py
index 577a6ef0..7d44ebe4 100644
--- a/tests/test_backend_ldap.py
+++ b/tests/test_backend_ldap.py
@@ -498,7 +498,56 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_multi_role_grant_by_user_group_on_project_domain(self):
- raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
+ # This is a partial implementation of the standard test that
+ # is defined in test_backend.py. It omits both domain and
+ # group grants. since neither of these are yet supported by
+ # the ldap backend.
+
+ role_list = []
+ for _ in range(2):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_role(role['id'], role)
+ role_list.append(role)
+
+ user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': CONF.identity.default_domain_id,
+ 'password': uuid.uuid4().hex,
+ 'enabled': True}
+ self.identity_api.create_user(user1['id'], user1)
+ project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': CONF.identity.default_domain_id}
+ self.identity_api.create_project(project1['id'], project1)
+
+ self.identity_api.add_role_to_user_and_project(
+ user_id=user1['id'],
+ tenant_id=project1['id'],
+ role_id=role_list[0]['id'])
+ self.identity_api.add_role_to_user_and_project(
+ user_id=user1['id'],
+ tenant_id=project1['id'],
+ role_id=role_list[1]['id'])
+
+ # Although list_grants are not yet supported, we can test the
+ # alternate way of getting back lists of grants, where user
+ # and group roles are combined. Only directly assigned user
+ # roles are available, since group grants are not yet supported
+
+ combined_role_list = self.identity_api.get_roles_for_user_and_project(
+ user1['id'], project1['id'])
+ self.assertEquals(len(combined_role_list), 2)
+ self.assertIn(role_list[0]['id'], combined_role_list)
+ self.assertIn(role_list[1]['id'], combined_role_list)
+
+ # Finally, although domain roles are not implemented, check we can
+ # issue the combined get roles call with benign results, since thus is
+ # used in token generation
+
+ combined_role_list = self.identity_api.get_roles_for_user_and_domain(
+ user1['id'], CONF.identity.default_domain_id)
+ self.assertEquals(len(combined_role_list), 0)
+
+ def test_multi_group_grants_on_project_domain(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101287')
def test_delete_role_with_user_and_group_grants(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
diff --git a/tests/test_v3_auth.py b/tests/test_v3_auth.py
index a2bee8b8..c38d13c9 100644
--- a/tests/test_v3_auth.py
+++ b/tests/test_v3_auth.py
@@ -888,6 +888,137 @@ class TestAuthJSON(test_v3.RestfulTestCase):
project_id=project['id'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
+ def test_user_and_group_roles_scoped_token(self):
+ """Test correct roles are returned in scoped token.
+
+ Test Plan:
+ - Create a domain, with 1 project, 2 users (user1 and user2)
+ and 2 groups (group1 and group2)
+ - Make user1 a member of group1, user2 a member of group2
+ - Create 8 roles, assigning them to each of the 8 combinations
+ of users/groups on domain/project
+ - Get a project scoped token for user1, checking that the right
+ two roles are returned (one directly assigned, one by virtue
+ of group membership)
+ - Repeat this for a domain scoped token
+ - Make user1 also a member of group2
+ - Get another scoped token making sure the additional role
+ shows up
+ - User2 is just here as a spoiler, to make sure we don't get
+ any roles uniquely assigned to it returned in any of our
+ tokens
+
+ """
+
+ domainA = self.new_domain_ref()
+ self.identity_api.create_domain(domainA['id'], domainA)
+ projectA = self.new_project_ref(domain_id=domainA['id'])
+ self.identity_api.create_project(projectA['id'], projectA)
+
+ user1 = self.new_user_ref(
+ domain_id=domainA['id'])
+ user1['password'] = uuid.uuid4().hex
+ self.identity_api.create_user(user1['id'], user1)
+
+ user2 = self.new_user_ref(
+ domain_id=domainA['id'])
+ user2['password'] = uuid.uuid4().hex
+ self.identity_api.create_user(user2['id'], user2)
+
+ group1 = self.new_group_ref(
+ domain_id=domainA['id'])
+ self.identity_api.create_group(group1['id'], group1)
+
+ group2 = self.new_group_ref(
+ domain_id=domainA['id'])
+ self.identity_api.create_group(group2['id'], group2)
+
+ self.identity_api.add_user_to_group(user1['id'],
+ group1['id'])
+ self.identity_api.add_user_to_group(user2['id'],
+ group2['id'])
+
+ # Now create all the roles and assign them
+ role_list = []
+ for _ in range(8):
+ role = self.new_role_ref()
+ self.identity_api.create_role(role['id'], role)
+ role_list.append(role)
+
+ self.identity_api.create_grant(role_list[0]['id'],
+ user_id=user1['id'],
+ domain_id=domainA['id'])
+ self.identity_api.create_grant(role_list[1]['id'],
+ user_id=user1['id'],
+ project_id=projectA['id'])
+ self.identity_api.create_grant(role_list[2]['id'],
+ user_id=user2['id'],
+ domain_id=domainA['id'])
+ self.identity_api.create_grant(role_list[3]['id'],
+ user_id=user2['id'],
+ project_id=projectA['id'])
+ self.identity_api.create_grant(role_list[4]['id'],
+ group_id=group1['id'],
+ domain_id=domainA['id'])
+ self.identity_api.create_grant(role_list[5]['id'],
+ group_id=group1['id'],
+ project_id=projectA['id'])
+ self.identity_api.create_grant(role_list[6]['id'],
+ group_id=group2['id'],
+ domain_id=domainA['id'])
+ self.identity_api.create_grant(role_list[7]['id'],
+ group_id=group2['id'],
+ project_id=projectA['id'])
+
+ # First, get a project scoped token - which should
+ # contain the direct user role and the one by virtue
+ # of group membership
+ auth_data = self.build_authentication_request(
+ user_id=user1['id'],
+ password=user1['password'],
+ project_id=projectA['id'])
+ r = self.post('/auth/tokens', body=auth_data)
+ token = self.assertValidScopedTokenResponse(r)
+ roles_ids = []
+ for i, ref in enumerate(token['roles']):
+ roles_ids.append(ref['id'])
+ self.assertEqual(len(token['roles']), 2)
+ self.assertIn(role_list[1]['id'], roles_ids)
+ self.assertIn(role_list[5]['id'], roles_ids)
+
+ # Now the same thing for a domain scoped token
+ auth_data = self.build_authentication_request(
+ user_id=user1['id'],
+ password=user1['password'],
+ domain_id=domainA['id'])
+ r = self.post('/auth/tokens', body=auth_data)
+ token = self.assertValidScopedTokenResponse(r)
+ roles_ids = []
+ for i, ref in enumerate(token['roles']):
+ roles_ids.append(ref['id'])
+ self.assertEqual(len(token['roles']), 2)
+ self.assertIn(role_list[0]['id'], roles_ids)
+ self.assertIn(role_list[4]['id'], roles_ids)
+
+ # Finally, add user1 to the 2nd group, and get a new
+ # scoped token - the extra role should now be included
+ # by virtue of the 2nd group
+ self.identity_api.add_user_to_group(user1['id'],
+ group2['id'])
+ auth_data = self.build_authentication_request(
+ user_id=user1['id'],
+ password=user1['password'],
+ project_id=projectA['id'])
+ r = self.post('/auth/tokens', body=auth_data)
+ token = self.assertValidScopedTokenResponse(r)
+ roles_ids = []
+ for i, ref in enumerate(token['roles']):
+ roles_ids.append(ref['id'])
+ self.assertEqual(len(token['roles']), 3)
+ self.assertIn(role_list[1]['id'], roles_ids)
+ self.assertIn(role_list[5]['id'], roles_ids)
+ self.assertIn(role_list[7]['id'], roles_ids)
+
def test_project_id_scoped_token_with_user_domain_id(self):
auth_data = self.build_authentication_request(
username=self.user['name'],