summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/source/configuration.rst12
-rw-r--r--etc/keystone.conf.sample5
-rw-r--r--keystone/assignment/backends/kvs.py86
-rw-r--r--keystone/assignment/backends/ldap.py6
-rw-r--r--keystone/assignment/backends/sql.py162
-rw-r--r--keystone/assignment/core.py121
-rw-r--r--keystone/common/config.py3
-rw-r--r--keystone/common/sql/migrate_repo/versions/028_fixup_group_metadata.py190
-rw-r--r--keystone/common/sql/migrate_repo/versions/029_update_assignment_metadata.py102
-rw-r--r--keystone/identity/controllers.py199
-rw-r--r--keystone/identity/core.py29
-rw-r--r--keystone/identity/routers.py42
-rw-r--r--tests/test_backend.py190
-rw-r--r--tests/test_backend_ldap.py4
-rw-r--r--tests/test_backend_sql.py4
-rw-r--r--tests/test_sql_upgrade.py298
-rw-r--r--tests/test_v3.py19
-rw-r--r--tests/test_v3_identity.py575
18 files changed, 1859 insertions, 188 deletions
diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst
index a074c704..c4db1be2 100644
--- a/doc/source/configuration.rst
+++ b/doc/source/configuration.rst
@@ -79,6 +79,7 @@ following sections:
* ``[signing]`` - cryptographic signatures for PKI based tokens
* ``[ssl]`` - SSL configuration
* ``[auth]`` - Authentication plugin configuration
+* ``[os_inherit]`` - Inherited Role Assignment extension
* ``[paste_deploy]`` - Pointer to the PasteDeploy configuration file
The Keystone primary configuration file is expected to be named ``keystone.conf``.
@@ -494,6 +495,17 @@ In addition to changing their password all of the users current tokens will be
deleted (if the backend used is kvs or sql)
+Inherited Role Assignment Extension
+-----------------------------------
+
+Keystone provides an optional extension that adds the capability to assign roles to a domain that, rather than
+affect the domain itself, are instead inherited to all projects owned by theat domain. This extension is disabled by
+default, but can be enabled by including the following in ``keystone.conf``.
+
+ [os_inherit]
+ enabled = True
+
+
Sample Configuration Files
--------------------------
diff --git a/etc/keystone.conf.sample b/etc/keystone.conf.sample
index 671accf1..7fa232f0 100644
--- a/etc/keystone.conf.sample
+++ b/etc/keystone.conf.sample
@@ -109,6 +109,11 @@
# delegation and impersonation features can be optionally disabled
# enabled = True
+[os_inherit]
+# role-assignment inheritance to projects from owning domain can be
+# optionally enabled
+# enabled = False
+
[catalog]
# dynamic, sql-based backend (supports API/CLI-based management commands)
# driver = keystone.catalog.backends.sql.Catalog
diff --git a/keystone/assignment/backends/kvs.py b/keystone/assignment/backends/kvs.py
index 4ed3937b..4dfd908f 100644
--- a/keystone/assignment/backends/kvs.py
+++ b/keystone/assignment/backends/kvs.py
@@ -33,10 +33,16 @@ class Assignment(kvs.Base, assignment.Driver):
except exception.NotFound:
raise exception.ProjectNotFound(project_id=tenant_id)
- def list_projects(self):
- tenant_keys = filter(lambda x: x.startswith("tenant-"),
- self.db.keys())
- return [self.db.get(key) for key in tenant_keys]
+ def list_projects(self, domain_id=None):
+ project_keys = filter(lambda x: x.startswith("tenant-"),
+ self.db.keys())
+ project_refs = [self.db.get(key) for key in project_keys]
+
+ if domain_id:
+ self.get_domain(domain_id)
+ project_refs = filter(lambda x: domain_id in x['domain_id'],
+ project_refs)
+ return project_refs
def get_project_by_name(self, tenant_name, domain_id):
try:
@@ -105,13 +111,16 @@ class Assignment(kvs.Base, assignment.Driver):
metadata_ref = self._get_metadata(user_id, tenant_id)
except exception.MetadataNotFound:
metadata_ref = {}
- roles = set(metadata_ref.get('roles', []))
- if role_id in roles:
+
+ try:
+ metadata_ref['roles'] = self._add_role_to_role_dicts(
+ role_id, False, metadata_ref.get('roles', []),
+ allow_existing=False)
+ except KeyError:
msg = ('User %s already has role %s in tenant %s'
% (user_id, role_id, tenant_id))
raise exception.Conflict(type='role grant', details=msg)
- roles.add(role_id)
- metadata_ref['roles'] = list(roles)
+
self._update_metadata(user_id, tenant_id, metadata_ref)
def remove_role_from_user_and_project(self, user_id, tenant_id, role_id):
@@ -119,23 +128,25 @@ class Assignment(kvs.Base, assignment.Driver):
metadata_ref = self._get_metadata(user_id, tenant_id)
except exception.MetadataNotFound:
metadata_ref = {}
- roles = set(metadata_ref.get('roles', []))
- if role_id not in roles:
- msg = 'Cannot remove role that has not been granted, %s' % role_id
- raise exception.RoleNotFound(message=msg)
- roles.remove(role_id)
- metadata_ref['roles'] = list(roles)
+ try:
+ metadata_ref['roles'] = self._remove_role_from_role_dicts(
+ role_id, False, metadata_ref.get('roles', []))
+ except KeyError:
+ raise exception.RoleNotFound(message=_(
+ 'Cannot remove role that has not been granted, %s') %
+ role_id)
+
+ if len(metadata_ref['roles']):
+ self._update_metadata(user_id, tenant_id, metadata_ref)
+ else:
- if not len(roles):
self.db.delete('metadata-%s-%s' % (tenant_id, user_id))
user_ref = self._get_user(user_id)
tenants = set(user_ref.get('tenants', []))
tenants.remove(tenant_id)
user_ref['tenants'] = list(tenants)
self.identity_api.update_user(user_id, user_ref)
- else:
- self._update_metadata(user_id, tenant_id, metadata_ref)
def list_role_assignments(self):
"""List the role assignments.
@@ -144,7 +155,7 @@ class Assignment(kvs.Base, assignment.Driver):
"metadata-{target}-{actor}", with the value being a role list
- i.e. "metadata-MyProjectID-MyUserID" [role1, role2]
+ i.e. "metadata-MyProjectID-MyUserID" [{'id': role1}, {'id': role2}]
...so we enumerate the list and extract the targets, actors
and roles.
@@ -169,7 +180,8 @@ class Assignment(kvs.Base, assignment.Driver):
template['group_id'] = meta_id2
entry = self.db.get(key)
- for r in entry.get('roles', []):
+ for r in self._roles_from_role_dicts(entry.get('roles', {}),
+ False):
role_assignment = template.copy()
role_assignment['role_id'] = r
assignment_list.append(role_assignment)
@@ -324,7 +336,8 @@ class Assignment(kvs.Base, assignment.Driver):
self.db.set('role_list', list(role_list))
def create_grant(self, role_id, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
self.get_role(role_id)
if user_id:
@@ -341,14 +354,16 @@ class Assignment(kvs.Base, assignment.Driver):
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
- roles = set(metadata_ref.get('roles', []))
- roles.add(role_id)
- metadata_ref['roles'] = list(roles)
+
+ metadata_ref['roles'] = self._add_role_to_role_dicts(
+ role_id, inherited_to_projects, metadata_ref.get('roles', []))
+
self._update_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
def list_grants(self, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
if user_id:
self.identity_api.get_user(user_id)
if group_id:
@@ -363,10 +378,14 @@ class Assignment(kvs.Base, assignment.Driver):
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
- return [self.get_role(x) for x in metadata_ref.get('roles', [])]
+
+ return [self.get_role(x) for x in
+ self._roles_from_role_dicts(metadata_ref.get('roles', []),
+ inherited_to_projects)]
def get_grant(self, role_id, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
self.get_role(role_id)
if user_id:
self.identity_api.get_user(user_id)
@@ -382,13 +401,17 @@ class Assignment(kvs.Base, assignment.Driver):
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
- role_ids = set(metadata_ref.get('roles', []))
+
+ role_ids = set(self._roles_from_role_dicts(
+ metadata_ref.get('roles', []), inherited_to_projects))
+
if role_id not in role_ids:
raise exception.RoleNotFound(role_id=role_id)
return self.get_role(role_id)
def delete_grant(self, role_id, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
self.get_role(role_id)
if user_id:
self.identity_api.get_user(user_id)
@@ -404,12 +427,13 @@ class Assignment(kvs.Base, assignment.Driver):
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
- roles = set(metadata_ref.get('roles', []))
+
try:
- roles.remove(role_id)
+ metadata_ref['roles'] = self._remove_role_from_role_dicts(
+ role_id, inherited_to_projects, metadata_ref.get('roles', []))
except KeyError:
raise exception.RoleNotFound(role_id=role_id)
- metadata_ref['roles'] = list(roles)
+
self._update_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
diff --git a/keystone/assignment/backends/ldap.py b/keystone/assignment/backends/ldap.py
index 09539c9f..b1b3f99f 100644
--- a/keystone/assignment/backends/ldap.py
+++ b/keystone/assignment/backends/ldap.py
@@ -72,7 +72,9 @@ class Assignment(assignment.Driver):
def get_project(self, tenant_id):
return self._set_default_domain(self.project.get(tenant_id))
- def list_projects(self):
+ def list_projects(self, domain_id=None):
+ # We don't support multiple domains within this driver, so ignore
+ # any domain passed.
return self._set_default_domain(self.project.get_all())
def get_project_by_name(self, tenant_name, domain_id):
@@ -117,7 +119,7 @@ class Assignment(assignment.Driver):
metadata_ref = _get_roles_for_just_user_and_project(user_id, tenant_id)
if not metadata_ref:
return {}
- return {'roles': metadata_ref}
+ return {'roles': [self._role_to_dict(r, False) for r in metadata_ref]}
def get_role(self, role_id):
return self.role.get(role_id)
diff --git a/keystone/assignment/backends/sql.py b/keystone/assignment/backends/sql.py
index 237330ce..5ec435ff 100644
--- a/keystone/assignment/backends/sql.py
+++ b/keystone/assignment/backends/sql.py
@@ -96,7 +96,8 @@ class Assignment(sql.Base, assignment.Driver):
raise exception.MetadataNotFound()
def create_grant(self, role_id, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
if user_id:
self.identity_api.get_user(user_id)
if group_id:
@@ -110,6 +111,10 @@ class Assignment(sql.Base, assignment.Driver):
if project_id:
self._get_project(session, project_id)
+ if project_id and inherited_to_projects:
+ msg = _('Inherited roles can only be assigned to domains')
+ raise exception.Conflict(type='role grant', details=msg)
+
try:
metadata_ref = self._get_metadata(user_id, project_id,
domain_id, group_id)
@@ -117,9 +122,10 @@ class Assignment(sql.Base, assignment.Driver):
except exception.MetadataNotFound:
metadata_ref = {}
is_new = True
- roles = set(metadata_ref.get('roles', []))
- roles.add(role_id)
- metadata_ref['roles'] = list(roles)
+
+ metadata_ref['roles'] = self._add_role_to_role_dicts(
+ role_id, inherited_to_projects, metadata_ref.get('roles', []))
+
if is_new:
self._create_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
@@ -128,7 +134,8 @@ class Assignment(sql.Base, assignment.Driver):
domain_id, group_id)
def list_grants(self, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
if user_id:
self.identity_api.get_user(user_id)
if group_id:
@@ -144,10 +151,14 @@ class Assignment(sql.Base, assignment.Driver):
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
- return [self.get_role(x) for x in metadata_ref.get('roles', [])]
+
+ return [self.get_role(x) for x in
+ self._roles_from_role_dicts(metadata_ref.get('roles', []),
+ inherited_to_projects)]
def get_grant(self, role_id, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
if user_id:
self.identity_api.get_user(user_id)
if group_id:
@@ -166,13 +177,15 @@ class Assignment(sql.Base, assignment.Driver):
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
- role_ids = set(metadata_ref.get('roles', []))
+ role_ids = set(self._roles_from_role_dicts(
+ metadata_ref.get('roles', []), inherited_to_projects))
if role_id not in role_ids:
raise exception.RoleNotFound(role_id=role_id)
return role_ref.to_dict()
def delete_grant(self, role_id, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
if user_id:
self.identity_api.get_user(user_id)
if group_id:
@@ -193,25 +206,43 @@ class Assignment(sql.Base, assignment.Driver):
except exception.MetadataNotFound:
metadata_ref = {}
is_new = True
- roles = set(metadata_ref.get('roles', []))
+
try:
- roles.remove(role_id)
+ metadata_ref['roles'] = self._remove_role_from_role_dicts(
+ role_id, inherited_to_projects, metadata_ref.get('roles', []))
except KeyError:
raise exception.RoleNotFound(role_id=role_id)
- metadata_ref['roles'] = list(roles)
+
if is_new:
+ # TODO(henry-nash) It seems odd that you would create a new
+ # entry in response to trying to delete a role that was not
+ # assigned. Although benign, this should probably be removed.
self._create_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
else:
self._update_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
- def list_projects(self):
+ def list_projects(self, domain_id=None):
session = self.get_session()
- tenant_refs = session.query(Project).all()
- return [tenant_ref.to_dict() for tenant_ref in tenant_refs]
+ if domain_id:
+ self._get_domain(session, domain_id)
+
+ query = session.query(Project)
+ if domain_id:
+ query = query.filter_by(domain_id=domain_id)
+ project_refs = query.all()
+ return [project_ref.to_dict() for project_ref in project_refs]
def get_projects_for_user(self, user_id):
+
+ # FIXME(henry-nash) The following should take into account
+ # both group and inherited roles. In fact, I don't see why this
+ # call can't be handled at the controller level like we do
+ # with 'get_roles_for_user_and_project()'. Further, this
+ # call seems essentially the same as 'list_user_projects()'
+ # later in this driver. Both should be removed.
+
self.identity_api.get_user(user_id)
session = self.get_session()
query = session.query(UserProjectGrant)
@@ -230,13 +261,16 @@ class Assignment(sql.Base, assignment.Driver):
except exception.MetadataNotFound:
metadata_ref = {}
is_new = True
- roles = set(metadata_ref.get('roles', []))
- if role_id in roles:
+
+ try:
+ metadata_ref['roles'] = self._add_role_to_role_dicts(
+ role_id, False, metadata_ref.get('roles', []),
+ allow_existing=False)
+ except KeyError:
msg = ('User %s already has role %s in tenant %s'
% (user_id, role_id, tenant_id))
raise exception.Conflict(type='role grant', details=msg)
- roles.add(role_id)
- metadata_ref['roles'] = list(roles)
+
if is_new:
self._create_metadata(user_id, tenant_id, metadata_ref)
else:
@@ -245,14 +279,15 @@ class Assignment(sql.Base, assignment.Driver):
def remove_role_from_user_and_project(self, user_id, tenant_id, role_id):
try:
metadata_ref = self._get_metadata(user_id, tenant_id)
- roles = set(metadata_ref.get('roles', []))
- if role_id not in roles:
+ try:
+ metadata_ref['roles'] = self._remove_role_from_role_dicts(
+ role_id, False, metadata_ref.get('roles', []))
+ except KeyError:
raise exception.RoleNotFound(message=_(
'Cannot remove role that has not been granted, %s') %
role_id)
- roles.remove(role_id)
- metadata_ref['roles'] = list(roles)
- if len(roles):
+
+ if len(metadata_ref['roles']):
self._update_metadata(user_id, tenant_id, metadata_ref)
else:
session = self.get_session()
@@ -277,28 +312,44 @@ class Assignment(sql.Base, assignment.Driver):
assignment_list = []
refs = session.query(UserDomainGrant).all()
for x in refs:
- for r in x.data.get('roles', []):
- assignment_list.append({'user_id': x.user_id,
- 'domain_id': x.domain_id,
- 'role_id': r})
+ for r in self._roles_from_role_dicts(
+ x.data.get('roles', {}), False):
+ assignment_list.append({'user_id': x.user_id,
+ 'domain_id': x.domain_id,
+ 'role_id': r})
+ for r in self._roles_from_role_dicts(
+ x.data.get('roles', {}), True):
+ assignment_list.append({'user_id': x.user_id,
+ 'domain_id': x.domain_id,
+ 'role_id': r,
+ 'inherited_to_projects': True})
refs = session.query(UserProjectGrant).all()
for x in refs:
- for r in x.data.get('roles', []):
- assignment_list.append({'user_id': x.user_id,
- 'project_id': x.project_id,
- 'role_id': r})
+ for r in self._roles_from_role_dicts(
+ x.data.get('roles', {}), False):
+ assignment_list.append({'user_id': x.user_id,
+ 'project_id': x.project_id,
+ 'role_id': r})
refs = session.query(GroupDomainGrant).all()
for x in refs:
- for r in x.data.get('roles', []):
- assignment_list.append({'group_id': x.group_id,
- 'domain_id': x.domain_id,
- 'role_id': r})
+ for r in self._roles_from_role_dicts(
+ x.data.get('roles', {}), False):
+ assignment_list.append({'group_id': x.group_id,
+ 'domain_id': x.domain_id,
+ 'role_id': r})
+ for r in self._roles_from_role_dicts(
+ x.data.get('roles', {}), True):
+ assignment_list.append({'group_id': x.group_id,
+ 'domain_id': x.domain_id,
+ 'role_id': r,
+ 'inherited_to_projects': True})
refs = session.query(GroupProjectGrant).all()
for x in refs:
- for r in x.data.get('roles', []):
- assignment_list.append({'group_id': x.group_id,
- 'project_id': x.project_id,
- 'role_id': r})
+ for r in self._roles_from_role_dicts(
+ x.data.get('roles', {}), False):
+ assignment_list.append({'group_id': x.group_id,
+ 'project_id': x.project_id,
+ 'role_id': r})
return assignment_list
# CRUD
@@ -473,6 +524,14 @@ class Assignment(sql.Base, assignment.Driver):
session.flush()
def list_user_projects(self, user_id):
+
+ # FIXME(henry-nash) The following should take into account
+ # both group and inherited roles. In fact, I don't see why this
+ # call can't be handled at the controller level like we do
+ # with 'get_roles_for_user_and_project()'. Further, this
+ # call seems essentially the same as 'get_projects_for_user()'
+ # earlier in this driver. Both should be removed.
+
session = self.get_session()
user = self.identity_api.get_user(user_id)
metadata_refs = session\
@@ -627,6 +686,29 @@ class Role(sql.ModelBase, sql.DictBase):
class BaseGrant(sql.DictBase):
+ """Base Grant class.
+
+ There are four grant tables in the current implementation, one for
+ each type of grant:
+
+ - User for Project
+ - User for Domain
+ - Group for Project
+ - Group for Domain
+
+ Each is a table with the two attributes above as a combined primary key,
+ with the data field holding all roles for that combination. The data
+ field is a list of dicts. For regular role assignments each dict in
+ the list of of the form:
+
+ {'id': role_id}
+
+ If the OS-INHERIT extension is enabled and the role on a domain is an
+ inherited role, the dict will be of the form:
+
+ {'id': role_id, 'inherited_to': 'projects'}
+
+ """
def to_dict(self):
"""Override parent to_dict() method with a simpler implementation.
diff --git a/keystone/assignment/core.py b/keystone/assignment/core.py
index 531da02e..b71e2a18 100644
--- a/keystone/assignment/core.py
+++ b/keystone/assignment/core.py
@@ -59,33 +59,72 @@ class Manager(manager.Manager):
self.identity_api.assignment_api = self
def get_roles_for_user_and_project(self, user_id, tenant_id):
- def _get_group_project_roles(user_id, tenant_id):
+ """Get the roles associated with a user within given project.
+
+ This includes roles directly assigned to the user on the
+ project, as well as those by virtue of group membership. If
+ the OS-INHERIT extension is enabled, then this will also
+ include roles inherited from the domain.
+
+ :returns: a list of role ids.
+ :raises: keystone.exception.UserNotFound,
+ keystone.exception.ProjectNotFound
+
+ """
+ def _get_group_project_roles(user_id, project_ref):
role_list = []
group_refs = (self.identity_api.list_groups_for_user
(user_id=user_id))
for x in group_refs:
try:
- metadata_ref = self._get_metadata(group_id=x['id'],
- tenant_id=tenant_id)
- role_list += metadata_ref.get('roles', [])
+ metadata_ref = self._get_metadata(
+ group_id=x['id'], tenant_id=project_ref['id'])
+ role_list += self._roles_from_role_dicts(
+ metadata_ref.get('roles', {}), False)
except exception.MetadataNotFound:
# no group grant, skip
pass
+
+ if CONF.os_inherit.enabled:
+ # Now get any inherited group roles for the owning domain
+ try:
+ metadata_ref = self._get_metadata(
+ group_id=x['id'],
+ domain_id=project_ref['domain_id'])
+ role_list += self._roles_from_role_dicts(
+ metadata_ref.get('roles', {}), True)
+ except (exception.MetadataNotFound,
+ exception.NotImplemented):
+ pass
+
return role_list
- def _get_user_project_roles(user_id, tenant_id):
- metadata_ref = {}
+ def _get_user_project_roles(user_id, project_ref):
+ role_list = []
try:
metadata_ref = self._get_metadata(user_id=user_id,
- tenant_id=tenant_id)
+ tenant_id=project_ref['id'])
+ role_list = self._roles_from_role_dicts(
+ metadata_ref.get('roles', {}), False)
except exception.MetadataNotFound:
pass
- return metadata_ref.get('roles', [])
+
+ if CONF.os_inherit.enabled:
+ # Now get any inherited roles for the owning domain
+ try:
+ metadata_ref = self._get_metadata(
+ user_id=user_id, domain_id=project_ref['domain_id'])
+ role_list += self._roles_from_role_dicts(
+ metadata_ref.get('roles', {}), True)
+ except (exception.MetadataNotFound, exception.NotImplemented):
+ pass
+
+ return role_list
self.identity_api.get_user(user_id)
- self.get_project(tenant_id)
- user_role_list = _get_user_project_roles(user_id, tenant_id)
- group_role_list = _get_group_project_roles(user_id, tenant_id)
+ project_ref = self.get_project(tenant_id)
+ user_role_list = _get_user_project_roles(user_id, project_ref)
+ group_role_list = _get_group_project_roles(user_id, project_ref)
# Use set() to process the list to remove any duplicates
return list(set(user_role_list + group_role_list))
@@ -106,11 +145,12 @@ class Manager(manager.Manager):
try:
metadata_ref = self._get_metadata(group_id=x['id'],
domain_id=domain_id)
- role_list += metadata_ref.get('roles', [])
+ role_list += self._roles_from_role_dicts(
+ metadata_ref.get('roles', {}), False)
except (exception.MetadataNotFound, exception.NotImplemented):
# MetadataNotFound implies no group grant, so skip.
# Ignore NotImplemented since not all backends support
- # domains. pass
+ # domains.
pass
return role_list
@@ -124,7 +164,8 @@ class Manager(manager.Manager):
# Ignore NotImplemented since not all backends support
# domains
pass
- return metadata_ref.get('roles', [])
+ return self._roles_from_role_dicts(
+ metadata_ref.get('roles', {}), False)
self.identity_api.get_user(user_id)
self.get_domain(domain_id)
@@ -160,6 +201,40 @@ class Manager(manager.Manager):
class Driver(object):
+ def _role_to_dict(self, role_id, inherited):
+ role_dict = {'id': role_id}
+ if inherited:
+ role_dict['inherited_to'] = 'projects'
+ return role_dict
+
+ def _roles_from_role_dicts(self, dict_list, inherited):
+ role_list = []
+ for d in dict_list:
+ if ((not d.get('inherited_to') and not inherited) or
+ (d.get('inherited_to') == 'projects' and inherited)):
+ role_list.append(d['id'])
+ return role_list
+
+ def _add_role_to_role_dicts(self, role_id, inherited, dict_list,
+ allow_existing=True):
+ # There is a difference in error semantics when trying to
+ # assign a role that already exists between the coded v2 and v3
+ # API calls. v2 will error if the assignment already exists,
+ # while v3 is silent. Setting the 'allow_existing' parameter
+ # appropriately lets this call be used for both.
+ role_set = set([frozenset(r.items()) for r in dict_list])
+ key = frozenset(self._role_to_dict(role_id, inherited).items())
+ if not allow_existing and key in role_set:
+ raise KeyError
+ role_set.add(key)
+ return [dict(r) for r in role_set]
+
+ def _remove_role_from_role_dicts(self, role_id, inherited, dict_list):
+ role_set = set([frozenset(r.items()) for r in dict_list])
+ role_set.remove(frozenset(self._role_to_dict(role_id,
+ inherited).items()))
+ return [dict(r) for r in role_set]
+
def get_project_by_name(self, tenant_name, domain_id):
"""Get a tenant by name.
@@ -209,9 +284,14 @@ class Driver(object):
# assignment/grant crud
def create_grant(self, role_id, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
"""Creates a new assignment/grant.
+ If the assignment is to a domain, then optionally it may be
+ specified as inherited to owned projects (this requires
+ the OS-INHERIT extension to be enabled).
+
:raises: keystone.exception.UserNotFound,
keystone.exception.GroupNotFound,
keystone.exception.ProjectNotFound,
@@ -223,7 +303,8 @@ class Driver(object):
raise exception.NotImplemented()
def list_grants(self, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
"""Lists assignments/grants.
:raises: keystone.exception.UserNotFound,
@@ -237,7 +318,8 @@ class Driver(object):
raise exception.NotImplemented()
def get_grant(self, role_id, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
"""Lists assignments/grants.
:raises: keystone.exception.UserNotFound,
@@ -251,7 +333,8 @@ class Driver(object):
raise exception.NotImplemented()
def delete_grant(self, role_id, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
"""Lists assignments/grants.
:raises: keystone.exception.UserNotFound,
@@ -329,7 +412,7 @@ class Driver(object):
"""
raise exception.NotImplemented()
- def list_projects(self):
+ def list_projects(self, domain_id=None):
"""List all projects in the system.
:returns: a list of project_refs or an empty list.
diff --git a/keystone/common/config.py b/keystone/common/config.py
index fc428f8c..030ead69 100644
--- a/keystone/common/config.py
+++ b/keystone/common/config.py
@@ -214,6 +214,9 @@ def configure():
# trust
register_bool('enabled', group='trust', default=True)
+ # os_inherit
+ register_bool('enabled', group='os_inherit', default=False)
+
# ssl
register_bool('enable', group='ssl', default=False)
register_str('certfile', group='ssl',
diff --git a/keystone/common/sql/migrate_repo/versions/028_fixup_group_metadata.py b/keystone/common/sql/migrate_repo/versions/028_fixup_group_metadata.py
new file mode 100644
index 00000000..66055a99
--- /dev/null
+++ b/keystone/common/sql/migrate_repo/versions/028_fixup_group_metadata.py
@@ -0,0 +1,190 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import sqlalchemy as sql
+
+
+def upgrade(migrate_engine):
+ # The group_project_metadata table was not updated in terms of its
+ # FK to the tenant table when the tenant->project change was made at
+ # the 015 migration for sqlite. This upgrade fixes that.
+ # We need to create a fake tenant table so that we can first load
+ # the group_project_metadata at all, then do a dance of copying tables
+ # to get us to the correct schema.
+ meta = sql.MetaData()
+ meta.bind = migrate_engine
+
+ if migrate_engine.name != 'sqlite':
+ return
+
+ temp_tenant_table = sql.Table(
+ 'tenant',
+ meta,
+ sql.Column('id', sql.String(64), primary_key=True))
+ temp_tenant_table.create(migrate_engine, checkfirst=True)
+
+ sql.Table('user', meta, autoload=True)
+ old_group_metadata_table = sql.Table('group_project_metadata',
+ meta, autoload=True)
+
+ # OK, we now have the table loaded, create a first
+ # temporary table of a different name with the correct FK
+ sql.Table('project', meta, autoload=True)
+ temp_group_project_metadata_table = sql.Table(
+ 'temp_group_project_metadata',
+ meta,
+ sql.Column(
+ 'group_id',
+ sql.String(64),
+ primary_key=True),
+ sql.Column(
+ 'project_id',
+ sql.String(64),
+ sql.ForeignKey('project.id'),
+ primary_key=True),
+ sql.Column('data', sql.Text()))
+ temp_group_project_metadata_table.create(migrate_engine, checkfirst=True)
+
+ # Populate the new temporary table, and then drop the old one
+ session = sql.orm.sessionmaker(bind=migrate_engine)()
+
+ for metadata in session.query(old_group_metadata_table):
+ q = temp_group_project_metadata_table.insert().values(
+ group_id=metadata.group_id,
+ project_id=metadata.project_id,
+ data=metadata.data)
+ session.execute(q)
+ session.commit()
+ old_group_metadata_table.drop()
+ temp_tenant_table.drop()
+
+ # Now do a final table copy to get the table of the right name.
+ # Re-init the metadata so that sqlalchemy does not get confused with
+ # multiple versions of the same named table.
+ meta2 = sql.MetaData()
+ meta2.bind = migrate_engine
+
+ sql.Table('project', meta2, autoload=True)
+ new_group_project_metadata_table = sql.Table(
+ 'group_project_metadata',
+ meta2,
+ sql.Column(
+ 'group_id',
+ sql.String(64),
+ primary_key=True),
+ sql.Column(
+ 'project_id',
+ sql.String(64),
+ sql.ForeignKey('project.id'),
+ primary_key=True),
+ sql.Column('data', sql.Text()))
+ new_group_project_metadata_table.create(migrate_engine, checkfirst=True)
+
+ for metadata in session.query(temp_group_project_metadata_table):
+ q = new_group_project_metadata_table.insert().values(
+ group_id=metadata.group_id,
+ project_id=metadata.project_id,
+ data=metadata.data)
+ session.execute(q)
+ session.commit()
+
+ temp_group_project_metadata_table.drop()
+
+
+def downgrade(migrate_engine):
+ # Put the group_project_metadata table back the way it was in its rather
+ # broken state. We don't try and re-write history, since otherwise people
+ # get out of step.
+ meta = sql.MetaData()
+ meta.bind = migrate_engine
+
+ if migrate_engine.name != 'sqlite':
+ return
+
+ sql.Table('user', meta, autoload=True)
+ sql.Table('project', meta, autoload=True)
+ group_metadata_table = sql.Table('group_project_metadata',
+ meta, autoload=True)
+
+ # We want to create a temp group meta table with the FK
+ # set to the wrong place.
+ temp_tenant_table = sql.Table(
+ 'tenant',
+ meta,
+ sql.Column('id', sql.String(64), primary_key=True))
+ temp_tenant_table.create(migrate_engine, checkfirst=True)
+
+ temp_group_project_metadata_table = sql.Table(
+ 'temp_group_project_metadata',
+ meta,
+ sql.Column(
+ 'group_id',
+ sql.String(64),
+ primary_key=True),
+ sql.Column(
+ 'project_id',
+ sql.String(64),
+ sql.ForeignKey('tenant.id'),
+ primary_key=True),
+ sql.Column('data', sql.Text()))
+ temp_group_project_metadata_table.create(migrate_engine, checkfirst=True)
+
+ # Now populate the temp table and drop the real one
+ session = sql.orm.sessionmaker(bind=migrate_engine)()
+
+ for metadata in session.query(group_metadata_table):
+ q = temp_group_project_metadata_table.insert().values(
+ group_id=metadata.group_id,
+ project_id=metadata.project_id,
+ data=metadata.data)
+ session.execute(q)
+
+ session.commit()
+ group_metadata_table.drop()
+
+ # Now copy again into the correctly named table. Re-init the metadata
+ # so that sqlalchemy does not get confused with multiple versions of the
+ # same named table.
+ meta2 = sql.MetaData()
+ meta2.bind = migrate_engine
+
+ sql.Table('tenant', meta2, autoload=True)
+ new_group_project_metadata_table = sql.Table(
+ 'group_project_metadata',
+ meta2,
+ sql.Column(
+ 'group_id',
+ sql.String(64),
+ primary_key=True),
+ sql.Column(
+ 'project_id',
+ sql.String(64),
+ sql.ForeignKey('tenant.id'),
+ primary_key=True),
+ sql.Column('data', sql.Text()))
+ new_group_project_metadata_table.create(migrate_engine, checkfirst=True)
+
+ for metadata in session.query(temp_group_project_metadata_table):
+ q = new_group_project_metadata_table.insert().values(
+ group_id=metadata.group_id,
+ project_id=metadata.project_id,
+ data=metadata.data)
+ session.execute(q)
+
+ session.commit()
+
+ temp_group_project_metadata_table.drop()
+ temp_tenant_table.drop()
diff --git a/keystone/common/sql/migrate_repo/versions/029_update_assignment_metadata.py b/keystone/common/sql/migrate_repo/versions/029_update_assignment_metadata.py
new file mode 100644
index 00000000..a9276804
--- /dev/null
+++ b/keystone/common/sql/migrate_repo/versions/029_update_assignment_metadata.py
@@ -0,0 +1,102 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+import sqlalchemy as sql
+
+
+def build_update(table_name, upgrade_table, row, values):
+ if table_name == 'user_project_metadata':
+ update = upgrade_table.update().where(
+ upgrade_table.c.user_id == row.user_id).where(
+ upgrade_table.c.project_id == row.project_id).values(values)
+ elif table_name == 'group_project_metadata':
+ update = upgrade_table.update().where(
+ upgrade_table.c.group_id == row.group_id).where(
+ upgrade_table.c.project_id == row.project_id).values(values)
+ elif table_name == 'user_domain_metadata':
+ update = upgrade_table.update().where(
+ upgrade_table.c.user_id == row.user_id).where(
+ upgrade_table.c.domain_id == row.domain_id).values(values)
+ else:
+ update = upgrade_table.update().where(
+ upgrade_table.c.group_id == row.group_id).where(
+ upgrade_table.c.domain_id == row.domain_id).values(values)
+ return update
+
+
+def upgrade_grant_table(meta, migrate_engine, session, table_name):
+
+ # Convert the roles component of the metadata from a list
+ # of ids to a list of dicts
+
+ def list_to_dict_list(metadata):
+ json_metadata = json.loads(metadata)
+ if 'roles' in json_metadata:
+ json_metadata['roles'] = (
+ [{'id': x} for x in json_metadata['roles']])
+ return json.dumps(json_metadata)
+
+ upgrade_table = sql.Table(table_name, meta, autoload=True)
+ for assignment in session.query(upgrade_table):
+ values = {'data': list_to_dict_list(assignment.data)}
+ update = build_update(table_name, upgrade_table, assignment, values)
+ migrate_engine.execute(update)
+
+
+def downgrade_grant_table(meta, migrate_engine, session, table_name):
+
+ # Convert the roles component of the metadata from a list
+ # of dicts to a simple list of ids. Any inherited roles are deleted
+ # since they would have no meaning
+
+ def dict_list_to_list(metadata):
+ json_metadata = json.loads(metadata)
+ if 'roles' in json_metadata:
+ json_metadata['roles'] = ([x['id'] for x in json_metadata['roles']
+ if 'inherited_to' not in x])
+ return json.dumps(json_metadata)
+
+ downgrade_table = sql.Table(table_name, meta, autoload=True)
+ for assignment in session.query(downgrade_table):
+ values = {'data': dict_list_to_list(assignment.data)}
+ update = build_update(table_name, downgrade_table, assignment, values)
+ migrate_engine.execute(update)
+
+
+def upgrade(migrate_engine):
+ meta = sql.MetaData()
+ meta.bind = migrate_engine
+ session = sql.orm.sessionmaker(bind=migrate_engine)()
+
+ for grant_table in ['user_project_metadata', 'user_domain_metadata',
+ 'group_project_metadata', 'group_domain_metadata']:
+ upgrade_grant_table(meta, migrate_engine, session, grant_table)
+ session.commit()
+ session.close()
+
+
+def downgrade(migrate_engine):
+ meta = sql.MetaData()
+ meta.bind = migrate_engine
+ session = sql.orm.sessionmaker(bind=migrate_engine)()
+
+ for grant_table in ['user_project_metadata', 'user_domain_metadata',
+ 'group_project_metadata', 'group_domain_metadata']:
+ downgrade_grant_table(meta, migrate_engine, session, grant_table)
+ session.commit()
+ session.close()
diff --git a/keystone/identity/controllers.py b/keystone/identity/controllers.py
index 9271f3d9..b4e8ea6a 100644
--- a/keystone/identity/controllers.py
+++ b/keystone/identity/controllers.py
@@ -748,6 +748,11 @@ class RoleV3(controller.V3Controller):
msg = 'Specify a user or group, not both'
raise exception.ValidationError(msg)
+ def _check_if_inherited(self, context):
+ return (CONF.os_inherit.enabled and
+ context['path'].startswith('/OS-INHERIT') and
+ context['path'].endswith('/inherited_to_projects'))
+
@controller.protected
def create_grant(self, context, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
@@ -756,7 +761,8 @@ class RoleV3(controller.V3Controller):
self._require_user_xor_group(user_id, group_id)
self.identity_api.create_grant(
- role_id, user_id, group_id, domain_id, project_id)
+ role_id, user_id, group_id, domain_id, project_id,
+ self._check_if_inherited(context))
@controller.protected
def list_grants(self, context, user_id=None, group_id=None,
@@ -766,7 +772,8 @@ class RoleV3(controller.V3Controller):
self._require_user_xor_group(user_id, group_id)
refs = self.identity_api.list_grants(
- user_id, group_id, domain_id, project_id)
+ user_id, group_id, domain_id, project_id,
+ self._check_if_inherited(context))
return RoleV3.wrap_collection(context, refs)
@controller.protected
@@ -777,7 +784,8 @@ class RoleV3(controller.V3Controller):
self._require_user_xor_group(user_id, group_id)
self.identity_api.get_grant(
- role_id, user_id, group_id, domain_id, project_id)
+ role_id, user_id, group_id, domain_id, project_id,
+ self._check_if_inherited(context))
@controller.protected
def revoke_grant(self, context, role_id, user_id=None, group_id=None,
@@ -787,7 +795,8 @@ class RoleV3(controller.V3Controller):
self._require_user_xor_group(user_id, group_id)
self.identity_api.delete_grant(
- role_id, user_id, group_id, domain_id, project_id)
+ role_id, user_id, group_id, domain_id, project_id,
+ self._check_if_inherited(context))
# Now delete any tokens for this user or, in the case of a group,
# tokens from all the uses who are members of this group.
@@ -815,13 +824,55 @@ class RoleAssignmentV3(controller.V3Controller):
pass
def _format_entity(self, entity):
+ """Format an assignment entity for API response.
+
+ The driver layer returns entities as dicts containing the ids of the
+ actor (e.g. user or group), target (e.g. domain or project) and role.
+ If it is an inherited role, then this is also indicated. Examples:
+
+ {'user_id': user_id,
+ 'project_id': domain_id,
+ 'role_id': role_id}
+
+ or, for an inherited role:
+
+ {'user_id': user_id,
+ 'domain_id': domain_id,
+ 'role_id': role_id,
+ 'inherited_to_projects': true}
+
+ This function maps this into the format to be returned via the API,
+ e.g. for the second example above:
+
+ {
+ 'user': {
+ {'id': user_id}
+ },
+ 'scope': {
+ 'domain': {
+ {'id': domain_id}
+ },
+ 'OS-INHERIT:inherited_to': 'projects
+ },
+ 'role': {
+ {'id': role_id}
+ },
+ 'links': {
+ 'assignment': '/domains/domain_id/users/user_id/roles/'
+ 'role_id/inherited_to_projects'
+ }
+ }
+
+ """
+
formatted_entity = {}
+ suffix = ""
if 'user_id' in entity:
formatted_entity['user'] = {'id': entity['user_id']}
- actor_link = '/users/%s' % entity['user_id']
+ actor_link = 'users/%s' % entity['user_id']
if 'group_id' in entity:
formatted_entity['group'] = {'id': entity['group_id']}
- actor_link = '/groups/%s' % entity['group_id']
+ actor_link = 'groups/%s' % entity['group_id']
if 'role_id' in entity:
formatted_entity['role'] = {'id': entity['role_id']}
if 'project_id' in entity:
@@ -831,12 +882,21 @@ class RoleAssignmentV3(controller.V3Controller):
if 'domain_id' in entity:
formatted_entity['scope'] = (
{'domain': {'id': entity['domain_id']}})
- target_link = '/domains/%s' % entity['domain_id']
-
+ if 'inherited_to_projects' in entity:
+ formatted_entity['scope']['OS-INHERIT:inherited_to'] = (
+ 'projects')
+ target_link = '/OS-INHERIT/domains/%s' % entity['domain_id']
+ suffix = '/inherited_to_projects'
+ else:
+ target_link = '/domains/%s' % entity['domain_id']
formatted_entity.setdefault('links', {})
formatted_entity['links']['assignment'] = (
- self.base_url(target_link + actor_link +
- '/roles/%s' % entity['role_id']))
+ self.base_url('%(target)s/%(actor)s/roles/%(role)s%(suffix)s' % {
+ 'target': target_link,
+ 'actor': actor_link,
+ 'role': entity['role_id'],
+ 'suffix': suffix}))
+
return formatted_entity
def _expand_indirect_assignments(self, refs):
@@ -846,6 +906,10 @@ class RoleAssignmentV3(controller.V3Controller):
entity for each member of that group, and then remove the group
assignment entity itself from the list.
+ If the OS-INHERIT extension is enabled, then honor any inherited
+ roles on the domain by creating the equivalent on all projects
+ owned by the domain.
+
For any new entity created by virtue of group membership, add in an
additional link to that membership.
@@ -881,7 +945,8 @@ class RoleAssignmentV3(controller.V3Controller):
'role': ref.get('role_id')})
return members
- def _build_equivalent_user_assignment(user, group_id, template):
+ def _build_user_assignment_equivalent_of_group(
+ user, group_id, template):
"""Create a user assignment equivalent to the group one.
The template has had the 'group' entity removed, so
@@ -900,33 +965,112 @@ class RoleAssignmentV3(controller.V3Controller):
'/projects/%s' % scope['project']['id'])
user_entry['links']['assignment'] = (
self.base_url('%s/users/%s/roles/%s' %
- (target_link, m['id'],
+ (target_link, user['id'],
user_entry['role']['id'])))
user_entry['links']['membership'] = (
self.base_url('/groups/%s/users/%s' %
(group_id, user['id'])))
return user_entry
- # Scan the list of entities for any group assignments, expanding
- # them into equivalent user entities. Due to potential large
- # expansion of group entities, rather than modify the
+ def _build_project_equivalent_of_user_domain_role(
+ project_id, domain_id, template):
+ """Create a user project assignment equivalent to the domain one.
+
+ The template has had the 'domain' entity removed, so
+ substitute a 'project' one, modifying the 'assignment' link
+ to match.
+
+ """
+ project_entry = copy.deepcopy(template)
+ project_entry['scope']['project'] = {'id': project_id}
+ project_entry['links']['assignment'] = (
+ self.base_url(
+ '/OS-INHERIT/domains/%s/users/%s/roles/%s'
+ '/inherited_to_projects' % (
+ domain_id, project_entry['user']['id'],
+ project_entry['role']['id'])))
+ return project_entry
+
+ def _build_project_equivalent_of_group_domain_role(
+ user_id, group_id, project_id, domain_id, template):
+ """Create a user project equivalent to the domain group one.
+
+ The template has had the 'domain' and 'group' entities removed, so
+ substitute a 'user-project' one, modifying the 'assignment' link
+ to match.
+
+ """
+ project_entry = copy.deepcopy(template)
+ project_entry['user'] = {'id': user_id}
+ project_entry['scope']['project'] = {'id': project_id}
+ project_entry['links']['assignment'] = (
+ self.base_url('/OS-INHERIT/domains/%s/groups/%s/roles/%s'
+ '/inherited_to_projects' % (
+ domain_id, group_id,
+ project_entry['role']['id'])))
+ project_entry['links']['membership'] = (
+ self.base_url('/groups/%s/users/%s' %
+ (group_id, user_id)))
+ return project_entry
+
+ # Scan the list of entities for any assignments that need to be
+ # expanded.
+ #
+ # If the OS-INERIT extension is enabled, the refs lists may
+ # contain roles to be inherited from domain to project, so expand
+ # these as well into project equivalents
+ #
+ # For any regular group entries, expand these into user entries based
+ # on membership of that group.
+ #
+ # Due to the potentially large expansions, rather than modify the
# list we are enumerating, we build a new one as we go.
+ #
+
new_refs = []
for r in refs:
- if 'group' in r:
- # As it is a group role assignment, first get the list of
- # members.
-
+ if 'OS-INHERIT:inherited_to' in r['scope']:
+ # It's an inherited domain role - so get the list of projects
+ # owned by this domain. A domain scope is guaranteed since we
+ # checked this when we built the refs list
+ project_ids = (
+ [x['id'] for x in self.assignment_api.list_projects(
+ r['scope']['domain']['id'])])
+ base_entry = copy.deepcopy(r)
+ domain_id = base_entry['scope']['domain']['id']
+ base_entry['scope'].pop('domain')
+ # For each project, create an equivalent role assignment
+ for p in project_ids:
+ # If it's a group assignment, then create equivalent user
+ # roles based on membership of the group
+ if 'group' in base_entry:
+ members = _get_group_members(base_entry)
+ sub_entry = copy.deepcopy(base_entry)
+ group_id = sub_entry['group']['id']
+ sub_entry.pop('group')
+ for m in members:
+ new_entry = (
+ _build_project_equivalent_of_group_domain_role(
+ m['id'], group_id, p,
+ domain_id, sub_entry))
+ new_refs.append(new_entry)
+ else:
+ new_entry = (
+ _build_project_equivalent_of_user_domain_role(
+ p, domain_id, base_entry))
+ new_refs.append(new_entry)
+ elif 'group' in r:
+ # It's a non-inherited group role assignment, so get the list
+ # of members.
members = _get_group_members(r)
# Now replace that group role assignment entry with an
# equivalent user role assignment for each of the group members
-
base_entry = copy.deepcopy(r)
group_id = base_entry['group']['id']
base_entry.pop('group')
for m in members:
- user_entry = _build_equivalent_user_assignment(
+ user_entry = _build_user_assignment_equivalent_of_group(
m, group_id, base_entry)
new_refs.append(user_entry)
else:
@@ -954,9 +1098,16 @@ class RoleAssignmentV3(controller.V3Controller):
val = True
return val
+ def _filter_inherited(self, entry):
+ if ('inherited_to_projects' in entry and
+ not CONF.os_inherit.enabled):
+ return False
+ else:
+ return True
+
@controller.filterprotected('group.id', 'role.id',
'scope.domain.id', 'scope.project.id',
- 'user.id')
+ 'scope.OS-INHERIT:inherited_to', 'user.id')
def list_role_assignments(self, context, filters):
# TODO(henry-nash): This implementation uses the standard filtering
@@ -966,7 +1117,9 @@ class RoleAssignmentV3(controller.V3Controller):
# kept a minimum.
refs = self.identity_api.list_role_assignments()
- formatted_refs = [self._format_entity(x) for x in refs]
+ formatted_refs = (
+ [self._format_entity(x) for x in refs
+ if self._filter_inherited(x)])
if ('effective' in context['query_string'] and
self._query_filter_is_true(
diff --git a/keystone/identity/core.py b/keystone/identity/core.py
index 981d24ed..d04902ae 100644
--- a/keystone/identity/core.py
+++ b/keystone/identity/core.py
@@ -107,8 +107,8 @@ class Manager(manager.Manager):
def get_project(self, tenant_id):
return self.assignment.get_project(tenant_id)
- def list_projects(self):
- return self.assignment.list_projects()
+ def list_projects(self, domain_id=None):
+ return self.assignment.list_projects(domain_id)
def get_role(self, role_id):
return self.assignment.get_role(role_id)
@@ -156,24 +156,32 @@ class Manager(manager.Manager):
return self.assignment.update_role(role_id, role)
def create_grant(self, role_id, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
return (self.assignment.create_grant
- (role_id, user_id, group_id, domain_id, project_id))
+ (role_id, user_id, group_id, domain_id, project_id,
+ inherited_to_projects))
def list_grants(self, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
return (self.assignment.list_grants
- (user_id, group_id, domain_id, project_id))
+ (user_id, group_id, domain_id, project_id,
+ inherited_to_projects))
def get_grant(self, role_id, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
return (self.assignment.get_grant
- (role_id, user_id, group_id, domain_id, project_id))
+ (role_id, user_id, group_id, domain_id, project_id,
+ inherited_to_projects))
def delete_grant(self, role_id, user_id=None, group_id=None,
- domain_id=None, project_id=None):
+ domain_id=None, project_id=None,
+ inherited_to_projects=False):
return (self.assignment.delete_grant
- (role_id, user_id, group_id, domain_id, project_id))
+ (role_id, user_id, group_id, domain_id, project_id,
+ inherited_to_projects))
def create_domain(self, domain_id, domain):
return self.assignment.create_domain(domain_id, domain)
@@ -214,6 +222,7 @@ class Driver(object):
:raises: AssertionError
"""
raise exception.NotImplemented()
+
# user crud
def create_user(self, user_id, user):
diff --git a/keystone/identity/routers.py b/keystone/identity/routers.py
index ab71eb4f..5f236842 100644
--- a/keystone/identity/routers.py
+++ b/keystone/identity/routers.py
@@ -16,6 +16,7 @@
"""WSGI Routers for the Identity service."""
from keystone.common import router
from keystone.common import wsgi
+from keystone import config
from keystone.identity import controllers
@@ -174,6 +175,47 @@ def append_v3_routers(mapper, routers):
action='revoke_grant',
conditions=dict(method=['DELETE']))
+ if config.CONF.os_inherit.enabled:
+ mapper.connect(('/OS-INHERIT/domains/{domain_id}/users/{user_id}'
+ '/roles/{role_id}/inherited_to_projects'),
+ controller=role_controller,
+ action='create_grant',
+ conditions=dict(method=['PUT']))
+ mapper.connect(('/OS-INHERIT/domains/{domain_id}/groups/{group_id}'
+ '/roles/{role_id}/inherited_to_projects'),
+ controller=role_controller,
+ action='create_grant',
+ conditions=dict(method=['PUT']))
+ mapper.connect(('/OS-INHERIT/domains/{domain_id}/users/{user_id}'
+ '/roles/{role_id}/inherited_to_projects'),
+ controller=role_controller,
+ action='check_grant',
+ conditions=dict(method=['HEAD']))
+ mapper.connect(('/OS-INHERIT/domains/{domain_id}/groups/{group_id}'
+ '/roles/{role_id}/inherited_to_projects'),
+ controller=role_controller,
+ action='check_grant',
+ conditions=dict(method=['HEAD']))
+ mapper.connect(('/OS-INHERIT/domains/{domain_id}/users/{user_id}'
+ '/roles/inherited_to_projects'),
+ controller=role_controller,
+ action='list_grants',
+ conditions=dict(method=['GET']))
+ mapper.connect(('/OS-INHERIT/domains/{domain_id}/groups/{group_id}'
+ '/roles/inherited_to_projects'),
+ controller=role_controller,
+ action='list_grants',
+ conditions=dict(method=['GET']))
+ mapper.connect(('/OS-INHERIT/domains/{domain_id}/users/{user_id}'
+ '/roles/{role_id}/inherited_to_projects'),
+ controller=role_controller,
+ action='revoke_grant',
+ conditions=dict(method=['DELETE']))
+ mapper.connect(('/OS-INHERIT/domains/{domain_id}/groups/{group_id}'
+ '/roles/{role_id}/inherited_to_projects'),
+ controller=role_controller,
+ action='revoke_grant',
+ conditions=dict(method=['DELETE']))
routers.append(
router.Router(controllers.RoleAssignmentV3(),
'role_assignments', 'role_assignment'))
diff --git a/tests/test_backend.py b/tests/test_backend.py
index 48e8d3a0..8599ba24 100644
--- a/tests/test_backend.py
+++ b/tests/test_backend.py
@@ -1106,12 +1106,17 @@ class IdentityTests(object):
group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id'], 'enabled': True}
self.identity_api.create_group(group1['id'], group1)
+ group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id'], 'enabled': True}
+ self.identity_api.create_group(group2['id'], group2)
project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id']}
self.identity_api.create_project(project1['id'], project1)
self.identity_api.add_user_to_group(user1['id'],
group1['id'])
+ self.identity_api.add_user_to_group(user1['id'],
+ group2['id'])
roles_ref = self.identity_api.list_grants(
user_id=user1['id'],
@@ -1766,6 +1771,30 @@ class IdentityTests(object):
self.assertIn(self.tenant_bar['id'], project_ids)
self.assertIn(self.tenant_baz['id'], project_ids)
+ def test_list_projects_for_domain(self):
+ project_ids = ([x['id'] for x in
+ self.assignment_api.list_projects(DEFAULT_DOMAIN_ID)])
+ self.assertEquals(len(project_ids), 4)
+ self.assertIn(self.tenant_bar['id'], project_ids)
+ self.assertIn(self.tenant_baz['id'], project_ids)
+ self.assertIn(self.tenant_mtu['id'], project_ids)
+ self.assertIn(self.tenant_service['id'], project_ids)
+
+ def test_list_projects_for_alternate_domain(self):
+ domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.assignment_api.create_domain(domain1['id'], domain1)
+ project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id']}
+ self.assignment_api.create_project(project1['id'], project1)
+ project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id']}
+ self.assignment_api.create_project(project2['id'], project2)
+ project_ids = ([x['id'] for x in
+ self.assignment_api.list_projects(domain1['id'])])
+ self.assertEquals(len(project_ids), 2)
+ self.assertIn(project1['id'], project_ids)
+ self.assertIn(project2['id'], project_ids)
+
def test_list_roles(self):
roles = self.identity_api.list_roles()
for test_role in default_fixtures.ROLES:
@@ -2690,3 +2719,164 @@ class PolicyTests(object):
self.assertRaises(exception.PolicyNotFound,
self.policy_api.delete_policy,
uuid.uuid4().hex)
+
+
+class InheritanceTests(object):
+
+ def test_inherited_role_grants_for_user(self):
+ """Test inherited user roles.
+
+ Test Plan:
+ - Enable OS-INHERIT extension
+ - Create 3 roles
+ - Create a domain, with a project and a user
+ - Check no roles yet exit
+ - Assign a direct user role to the project and a (non-inherited)
+ user role to the domain
+ - Get a list of effective roles - should only get the one direct role
+ - Now add an inherited user role to the domain
+ - Get a list of effective roles - should have two roles, one
+ direct and one by virtue of the inherited user role
+ - Also get effective roles for the domain - the role marked as
+ inherited should not show up
+
+ """
+ self.opt_in_group('os_inherit', enabled=True)
+ role_list = []
+ for _ in range(3):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_role(role['id'], role)
+ role_list.append(role)
+ domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain1['id'], domain1)
+ user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id'], 'password': uuid.uuid4().hex,
+ 'enabled': True}
+ self.identity_api.create_user(user1['id'], user1)
+ project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id']}
+ self.identity_api.create_project(project1['id'], project1)
+
+ roles_ref = self.identity_api.list_grants(
+ user_id=user1['id'],
+ project_id=project1['id'])
+ self.assertEquals(len(roles_ref), 0)
+
+ # Create the first two roles - the domain one is not inherited
+ self.identity_api.create_grant(user_id=user1['id'],
+ project_id=project1['id'],
+ role_id=role_list[0]['id'])
+ self.identity_api.create_grant(user_id=user1['id'],
+ domain_id=domain1['id'],
+ role_id=role_list[1]['id'])
+
+ # Now get the effective roles for the user and project, this
+ # should only include the direct role assignment on the project
+ combined_role_list = self.identity_api.get_roles_for_user_and_project(
+ user1['id'], project1['id'])
+ self.assertEquals(len(combined_role_list), 1)
+ self.assertIn(role_list[0]['id'], combined_role_list)
+
+ # Now add an inherited role on the domain
+ self.identity_api.create_grant(user_id=user1['id'],
+ domain_id=domain1['id'],
+ role_id=role_list[2]['id'],
+ inherited_to_projects=True)
+
+ # Now get the effective roles for the user and project again, this
+ # should now include the inherited role on the domain
+ combined_role_list = self.identity_api.get_roles_for_user_and_project(
+ user1['id'], project1['id'])
+ self.assertEquals(len(combined_role_list), 2)
+ self.assertIn(role_list[0]['id'], combined_role_list)
+ self.assertIn(role_list[2]['id'], combined_role_list)
+
+ # Finally, check that the inherited role does not appear as a valid
+ # directly assigned role on the domain itself
+ combined_role_list = self.identity_api.get_roles_for_user_and_domain(
+ user1['id'], domain1['id'])
+ self.assertEquals(len(combined_role_list), 1)
+ self.assertIn(role_list[1]['id'], combined_role_list)
+
+ def test_inherited_role_grants_for_group(self):
+ """Test inherited group roles.
+
+ Test Plan:
+ - Enable OS-INHERIT extension
+ - Create 4 roles
+ - Create a domain, with a project, user and two groups
+ - Make the user a member of both groups
+ - Check no roles yet exit
+ - Assign a direct user role to the project and a (non-inherited)
+ group role on the domain
+ - Get a list of effective roles - should only get the one direct role
+ - Now add two inherited group roles to the domain
+ - Get a list of effective roles - should have three roles, one
+ direct and two by virtue of inherited group roles
+
+ """
+ self.opt_in_group('os_inherit', enabled=True)
+ role_list = []
+ for _ in range(4):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_role(role['id'], role)
+ role_list.append(role)
+ domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain1['id'], domain1)
+ user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id'], 'password': uuid.uuid4().hex,
+ 'enabled': True}
+ self.identity_api.create_user(user1['id'], user1)
+ group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id'], 'enabled': True}
+ self.identity_api.create_group(group1['id'], group1)
+ group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id'], 'enabled': True}
+ self.identity_api.create_group(group2['id'], group2)
+ project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id']}
+ self.identity_api.create_project(project1['id'], project1)
+
+ self.identity_api.add_user_to_group(user1['id'],
+ group1['id'])
+ self.identity_api.add_user_to_group(user1['id'],
+ group2['id'])
+
+ roles_ref = self.identity_api.list_grants(
+ user_id=user1['id'],
+ project_id=project1['id'])
+ self.assertEquals(len(roles_ref), 0)
+
+ # Create two roles - the domain one is not inherited
+ self.identity_api.create_grant(user_id=user1['id'],
+ project_id=project1['id'],
+ role_id=role_list[0]['id'])
+ self.identity_api.create_grant(group_id=group1['id'],
+ domain_id=domain1['id'],
+ role_id=role_list[1]['id'])
+
+ # Now get the effective roles for the user and project, this
+ # should only include the direct role assignment on the project
+ combined_role_list = self.identity_api.get_roles_for_user_and_project(
+ user1['id'], project1['id'])
+ self.assertEquals(len(combined_role_list), 1)
+ self.assertIn(role_list[0]['id'], combined_role_list)
+
+ # Now add to more group roles, both inherited, to the domain
+ self.identity_api.create_grant(group_id=group2['id'],
+ domain_id=domain1['id'],
+ role_id=role_list[2]['id'],
+ inherited_to_projects=True)
+ self.identity_api.create_grant(group_id=group2['id'],
+ domain_id=domain1['id'],
+ role_id=role_list[3]['id'],
+ inherited_to_projects=True)
+
+ # Now get the effective roles for the user and project again, this
+ # should now include the inherited roles on the domain
+ combined_role_list = self.identity_api.get_roles_for_user_and_project(
+ user1['id'], project1['id'])
+ self.assertEquals(len(combined_role_list), 3)
+ self.assertIn(role_list[0]['id'], combined_role_list)
+ self.assertIn(role_list[2]['id'], combined_role_list)
+ self.assertIn(role_list[3]['id'], combined_role_list)
diff --git a/tests/test_backend_ldap.py b/tests/test_backend_ldap.py
index b68399ba..ce4a297c 100644
--- a/tests/test_backend_ldap.py
+++ b/tests/test_backend_ldap.py
@@ -630,6 +630,10 @@ class LDAPIdentity(test.TestCase, BaseLDAPIdentity):
user1['id'], CONF.identity.default_domain_id)
self.assertEquals(len(combined_role_list), 0)
+ def test_list_projects_for_alternate_domain(self):
+ raise nose.exc.SkipTest(
+ 'N/A: LDAP does not support multiple domains')
+
class LDAPIdentityEnabledEmulation(LDAPIdentity):
def setUp(self):
diff --git a/tests/test_backend_sql.py b/tests/test_backend_sql.py
index 2f1f5e3e..38eddaa4 100644
--- a/tests/test_backend_sql.py
+++ b/tests/test_backend_sql.py
@@ -410,3 +410,7 @@ class SqlCatalog(SqlTests, test_backend.CatalogTests):
class SqlPolicy(SqlTests, test_backend.PolicyTests):
pass
+
+
+class SqlInheritance(SqlTests, test_backend.InheritanceTests):
+ pass
diff --git a/tests/test_sql_upgrade.py b/tests/test_sql_upgrade.py
index e7e34b4b..64bf53c8 100644
--- a/tests/test_sql_upgrade.py
+++ b/tests/test_sql_upgrade.py
@@ -824,6 +824,304 @@ class SqlUpgradeTests(test.TestCase):
self.assertEqual(ref.legacy_endpoint_id, legacy_endpoint_id)
self.assertEqual(ref.extra, '{}')
+ def test_group_project_FK_fixup(self):
+ # To create test data we must start before we broke in the
+ # group_project_metadata table in 015.
+ self.upgrade(14)
+ session = self.Session()
+
+ domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)
+ group_table = sqlalchemy.Table('group', self.metadata, autoload=True)
+ tenant_table = sqlalchemy.Table('tenant', self.metadata, autoload=True)
+ role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
+ group_project_metadata_table = sqlalchemy.Table(
+ 'group_project_metadata', self.metadata, autoload=True)
+
+ # Create a Domain
+ domain = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ session.execute(domain_table.insert().values(domain))
+
+ # Create two Tenants
+ tenant = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'extra': "{}"}
+ session.execute(tenant_table.insert().values(tenant))
+
+ tenant1 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'extra': "{}"}
+ session.execute(tenant_table.insert().values(tenant1))
+
+ # Create a Group
+ group = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': json.dumps({})}
+ session.execute(group_table.insert().values(group))
+
+ # Create roles
+ role_list = []
+ for _ in range(2):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ session.execute(role_table.insert().values(role))
+ role_list.append(role)
+
+ # Grant Role to User on Project
+ role_grant = {'group_id': group['id'],
+ 'project_id': tenant['id'],
+ 'data': json.dumps({'roles': [role_list[0]['id']]})}
+ session.execute(
+ group_project_metadata_table.insert().values(role_grant))
+
+ role_grant = {'group_id': group['id'],
+ 'project_id': tenant1['id'],
+ 'data': json.dumps({'roles': [role_list[1]['id']]})}
+ session.execute(
+ group_project_metadata_table.insert().values(role_grant))
+
+ session.commit()
+
+ # Now upgrade and fix up the FKs
+ self.upgrade(28)
+ self.assertTableExists('group_project_metadata')
+ self.assertTableExists('project')
+ self.assertTableDoesNotExist('tenant')
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == tenant['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[0]['id'], data['roles'])
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == tenant1['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[1]['id'], data['roles'])
+
+ self.downgrade(27)
+ self.assertTableExists('group_project_metadata')
+ self.assertTableExists('project')
+ self.assertTableDoesNotExist('tenant')
+
+ def test_assignment_metadata_migration(self):
+ self.upgrade(28)
+ # Scaffolding
+ session = self.Session()
+
+ domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)
+ user_table = sqlalchemy.Table('user', self.metadata, autoload=True)
+ group_table = sqlalchemy.Table('group', self.metadata, autoload=True)
+ role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
+ project_table = sqlalchemy.Table(
+ 'project', self.metadata, autoload=True)
+ user_project_metadata_table = sqlalchemy.Table(
+ 'user_project_metadata', self.metadata, autoload=True)
+ user_domain_metadata_table = sqlalchemy.Table(
+ 'user_domain_metadata', self.metadata, autoload=True)
+ group_project_metadata_table = sqlalchemy.Table(
+ 'group_project_metadata', self.metadata, autoload=True)
+ group_domain_metadata_table = sqlalchemy.Table(
+ 'group_domain_metadata', self.metadata, autoload=True)
+
+ # Create a Domain
+ domain = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ session.execute(domain_table.insert().values(domain))
+
+ # Create anther Domain
+ domain2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ session.execute(domain_table.insert().values(domain2))
+
+ # Create a Project
+ project = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': "{}"}
+ session.execute(project_table.insert().values(project))
+
+ # Create another Project
+ project2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': "{}"}
+ session.execute(project_table.insert().values(project2))
+
+ # Create a User
+ user = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'password': uuid.uuid4().hex,
+ 'enabled': True,
+ 'extra': json.dumps({})}
+ session.execute(user_table.insert().values(user))
+
+ # Create a Group
+ group = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': json.dumps({})}
+ session.execute(group_table.insert().values(group))
+
+ # Create roles
+ role_list = []
+ for _ in range(7):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ session.execute(role_table.insert().values(role))
+ role_list.append(role)
+
+ # Grant Role to User on Project
+ role_grant = {'user_id': user['id'],
+ 'project_id': project['id'],
+ 'data': json.dumps({'roles': [role_list[0]['id']]})}
+ session.execute(
+ user_project_metadata_table.insert().values(role_grant))
+
+ role_grant = {'user_id': user['id'],
+ 'project_id': project2['id'],
+ 'data': json.dumps({'roles': [role_list[1]['id']]})}
+ session.execute(
+ user_project_metadata_table.insert().values(role_grant))
+
+ # Grant Role to Group on different Project
+ role_grant = {'group_id': group['id'],
+ 'project_id': project2['id'],
+ 'data': json.dumps({'roles': [role_list[2]['id']]})}
+ session.execute(
+ group_project_metadata_table.insert().values(role_grant))
+
+ # Grant Role to User on Domain
+ role_grant = {'user_id': user['id'],
+ 'domain_id': domain['id'],
+ 'data': json.dumps({'roles': [role_list[3]['id']]})}
+ session.execute(user_domain_metadata_table.insert().values(role_grant))
+
+ # Grant Role to Group on Domain
+ role_grant = {'group_id': group['id'],
+ 'domain_id': domain['id'],
+ 'data': json.dumps(
+ {'roles': [role_list[4]['id']],
+ 'other': 'somedata'})}
+ session.execute(
+ group_domain_metadata_table.insert().values(role_grant))
+
+ session.commit()
+
+ self.upgrade(29)
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[0]['id']}, data['roles'])
+
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[1]['id']}, data['roles'])
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[2]['id']}, data['roles'])
+
+ s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
+ (user_domain_metadata_table.c.user_id == user['id']) &
+ (user_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[3]['id']}, data['roles'])
+
+ s = sqlalchemy.select([group_domain_metadata_table.c.data]).where(
+ (group_domain_metadata_table.c.group_id == group['id']) &
+ (group_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[4]['id']}, data['roles'])
+ self.assertIn('other', data)
+
+ # Now add an entry that has one regular and one inherited role
+ role_grant = {'user_id': user['id'],
+ 'domain_id': domain2['id'],
+ 'data': json.dumps(
+ {'roles': [{'id': role_list[5]['id']},
+ {'id': role_list[6]['id'],
+ 'inherited_to': 'projects'}]})}
+ session.execute(user_domain_metadata_table.insert().values(role_grant))
+
+ session.commit()
+ self.downgrade(28)
+
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[0]['id'], data['roles'])
+
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[1]['id'], data['roles'])
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[2]['id'], data['roles'])
+
+ s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
+ (user_domain_metadata_table.c.user_id == user['id']) &
+ (user_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[3]['id'], data['roles'])
+
+ s = sqlalchemy.select([group_domain_metadata_table.c.data]).where(
+ (group_domain_metadata_table.c.group_id == group['id']) &
+ (group_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[4]['id'], data['roles'])
+ self.assertIn('other', data)
+
+ # For user-domain2, where we had one regular and one inherited role,
+ # only the direct role should remain, the inherited role should
+ # have been deleted during the downgrade
+ s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
+ (user_domain_metadata_table.c.user_id == user['id']) &
+ (user_domain_metadata_table.c.domain_id == domain2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[5]['id'], data['roles'])
+
def populate_user_table(self, with_pass_enab=False,
with_pass_enab_domain=False):
# Populate the appropriate fields in the user
diff --git a/tests/test_v3.py b/tests/test_v3.py
index ff9f7596..d3023843 100644
--- a/tests/test_v3.py
+++ b/tests/test_v3.py
@@ -338,7 +338,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
links['previous'])
def assertValidListResponse(self, resp, key, entity_validator, ref=None,
- expected_length=None):
+ expected_length=None, keys_to_check=None):
"""Make assertions common to all API list responses.
If a reference is provided, it's ID will be searched for in the
@@ -359,11 +359,12 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
for entity in entities:
self.assertIsNotNone(entity)
- self.assertValidEntity(entity)
+ self.assertValidEntity(entity, keys_to_check=keys_to_check)
entity_validator(entity)
if ref:
entity = [x for x in entities if x['id'] == ref['id']][0]
- self.assertValidEntity(entity, ref)
+ self.assertValidEntity(entity, ref=ref,
+ keys_to_check=keys_to_check)
entity_validator(entity, ref)
return entities
@@ -372,17 +373,21 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
"""Make assertions common to all API responses."""
entity = resp.result.get(key)
self.assertIsNotNone(entity)
- self.assertValidEntity(entity, *args, **kwargs)
+ keys = kwargs.pop('keys_to_check', None)
+ self.assertValidEntity(entity, keys_to_check=keys, *args, **kwargs)
entity_validator(entity, *args, **kwargs)
return entity
- def assertValidEntity(self, entity, ref=None):
+ def assertValidEntity(self, entity, ref=None, keys_to_check=None):
"""Make assertions common to all API entities.
If a reference is provided, the entity will also be compared against
the reference.
"""
- keys = ['name', 'description', 'enabled']
+ if keys_to_check:
+ keys = keys_to_check
+ else:
+ keys = ['name', 'description', 'enabled']
for k in ['id'] + keys:
msg = '%s unexpectedly None in %s' % (k, entity)
@@ -705,6 +710,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
resp,
'roles',
self.assertValidRole,
+ keys_to_check=['name'],
*args,
**kwargs)
@@ -713,6 +719,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
resp,
'role',
self.assertValidRole,
+ keys_to_check=['name'],
*args,
**kwargs)
diff --git a/tests/test_v3_identity.py b/tests/test_v3_identity.py
index af8890fb..3efc3b35 100644
--- a/tests/test_v3_identity.py
+++ b/tests/test_v3_identity.py
@@ -16,11 +16,64 @@
import uuid
+from keystone import config
from keystone import exception
import test_v3
+def _build_role_assignment_url_and_entity(
+ role_id, user_id=None, group_id=None, domain_id=None,
+ project_id=None, inherited_to_projects=False,
+ effective=False):
+
+ if user_id and domain_id:
+ url = ('/domains/%(domain_id)s/users/%(user_id)s'
+ '/roles/%(role_id)s' % {
+ 'domain_id': domain_id,
+ 'user_id': user_id,
+ 'role_id': role_id})
+ entity = {'role': {'id': role_id},
+ 'user': {'id': user_id},
+ 'scope': {'domain': {'id': domain_id}}}
+ if inherited_to_projects:
+ url = '/OS-INHERIT%s/inherited_to_projects' % url
+ if not effective:
+ entity['OS-INHERIT:inherited_to'] = 'projects'
+ elif user_id and project_id:
+ url = ('/projects/%(project_id)s/users/%(user_id)s'
+ '/roles/%(role_id)s' % {
+ 'project_id': project_id,
+ 'user_id': user_id,
+ 'role_id': role_id})
+ entity = {'role': {'id': role_id},
+ 'user': {'id': user_id},
+ 'scope': {'project': {'id': project_id}}}
+ if group_id and domain_id:
+ url = ('/domains/%(domain_id)s/groups/%(group_id)s'
+ '/roles/%(role_id)s' % {
+ 'domain_id': domain_id,
+ 'group_id': group_id,
+ 'role_id': role_id})
+ entity = {'role': {'id': role_id},
+ 'group': {'id': group_id},
+ 'scope': {'domain': {'id': domain_id}}}
+ if inherited_to_projects:
+ url = '/OS-INHERIT%s/inherited_to_projects' % url
+ if not effective:
+ entity['OS-INHERIT:inherited_to'] = 'projects'
+ elif group_id and project_id:
+ url = ('/projects/%(project_id)s/groups/%(group_id)s'
+ '/roles/%(role_id)s' % {
+ 'project_id': project_id,
+ 'group_id': group_id,
+ 'role_id': role_id})
+ entity = {'role': {'id': role_id},
+ 'group': {'id': group_id},
+ 'scope': {'project': {'id': project_id}}}
+ return (url, entity)
+
+
class IdentityTestCase(test_v3.RestfulTestCase):
"""Test domains, projects, users, groups, & role CRUD."""
@@ -628,48 +681,6 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertValidRoleListResponse(r, expected_length=0)
self.assertIn(collection_url, r.result['links']['self'])
- def _build_role_assignment_url_and_entity(
- self, role_id, user_id=None, group_id=None, domain_id=None,
- project_id=None):
-
- if user_id and domain_id:
- url = ('/domains/%(domain_id)s/users/%(user_id)s'
- '/roles/%(role_id)s' % {
- 'domain_id': domain_id,
- 'user_id': user_id,
- 'role_id': role_id})
- entity = {'role': {'id': role_id},
- 'user': {'id': user_id},
- 'scope': {'domain': {'id': domain_id}}}
- elif user_id and project_id:
- url = ('/projects/%(project_id)s/users/%(user_id)s'
- '/roles/%(role_id)s' % {
- 'project_id': project_id,
- 'user_id': user_id,
- 'role_id': role_id})
- entity = {'role': {'id': role_id},
- 'user': {'id': user_id},
- 'scope': {'project': {'id': project_id}}}
- if group_id and domain_id:
- url = ('/domains/%(domain_id)s/groups/%(group_id)s'
- '/roles/%(role_id)s' % {
- 'domain_id': domain_id,
- 'group_id': group_id,
- 'role_id': role_id})
- entity = {'role': {'id': role_id},
- 'group': {'id': group_id},
- 'scope': {'domain': {'id': domain_id}}}
- elif group_id and project_id:
- url = ('/projects/%(project_id)s/groups/%(group_id)s'
- '/roles/%(role_id)s' % {
- 'project_id': project_id,
- 'group_id': group_id,
- 'role_id': role_id})
- entity = {'role': {'id': role_id},
- 'group': {'id': group_id},
- 'scope': {'project': {'id': project_id}}}
- return (url, entity)
-
def test_get_role_assignments(self):
"""Call ``GET /role_assignments``.
@@ -712,7 +723,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
# Now add one of each of the four types of assignment, making sure
# that we get them all back.
- gd_url, gd_entity = self._build_role_assignment_url_and_entity(
+ gd_url, gd_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, group_id=self.group_id,
role_id=self.role_id)
self.put(gd_url)
@@ -722,7 +733,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
existing_assignments + 1)
self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url)
- ud_url, ud_entity = self._build_role_assignment_url_and_entity(
+ ud_url, ud_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user1['id'],
role_id=self.role_id)
self.put(ud_url)
@@ -732,7 +743,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
existing_assignments + 2)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
- gp_url, gp_entity = self._build_role_assignment_url_and_entity(
+ gp_url, gp_entity = _build_role_assignment_url_and_entity(
project_id=self.project_id, group_id=self.group_id,
role_id=self.role_id)
self.put(gp_url)
@@ -742,7 +753,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
existing_assignments + 3)
self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url)
- up_url, up_entity = self._build_role_assignment_url_and_entity(
+ up_url, up_entity = _build_role_assignment_url_and_entity(
project_id=self.project_id, user_id=self.user1['id'],
role_id=self.role_id)
self.put(up_url)
@@ -798,7 +809,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertIn(collection_url, r.result['links']['self'])
existing_assignments = len(r.result.get('role_assignments'))
- gd_url, gd_entity = self._build_role_assignment_url_and_entity(
+ gd_url, gd_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, group_id=self.group_id,
role_id=self.role_id)
self.put(gd_url)
@@ -816,11 +827,11 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')),
existing_assignments + 2)
- ud_url, ud_entity = self._build_role_assignment_url_and_entity(
+ ud_url, ud_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user1['id'],
role_id=self.role_id)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
- ud_url, ud_entity = self._build_role_assignment_url_and_entity(
+ ud_url, ud_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user2['id'],
role_id=self.role_id)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
@@ -866,7 +877,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertValidRoleAssignmentListResponse(r)
existing_assignments = len(r.result.get('role_assignments'))
- gd_url, gd_entity = self._build_role_assignment_url_and_entity(
+ gd_url, gd_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, group_id=self.group_id,
role_id=self.role_id)
self.put(gd_url)
@@ -954,22 +965,22 @@ class IdentityTestCase(test_v3.RestfulTestCase):
# Now add one of each of the four types of assignment
- gd_url, gd_entity = self._build_role_assignment_url_and_entity(
+ gd_url, gd_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, group_id=self.group1['id'],
role_id=self.role1['id'])
self.put(gd_url)
- ud_url, ud_entity = self._build_role_assignment_url_and_entity(
+ ud_url, ud_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user1['id'],
role_id=self.role2['id'])
self.put(ud_url)
- gp_url, gp_entity = self._build_role_assignment_url_and_entity(
+ gp_url, gp_entity = _build_role_assignment_url_and_entity(
project_id=self.project1['id'], group_id=self.group1['id'],
role_id=self.role1['id'])
self.put(gp_url)
- up_url, up_entity = self._build_role_assignment_url_and_entity(
+ up_url, up_entity = _build_role_assignment_url_and_entity(
project_id=self.project1['id'], user_id=self.user1['id'],
role_id=self.role2['id'])
self.put(up_url)
@@ -1038,10 +1049,10 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
# ...and the two via group membership...
- up1_url, up1_entity = self._build_role_assignment_url_and_entity(
+ up1_url, up1_entity = _build_role_assignment_url_and_entity(
project_id=self.project1['id'], user_id=self.user1['id'],
role_id=self.role1['id'])
- ud1_url, ud1_entity = self._build_role_assignment_url_and_entity(
+ ud1_url, ud1_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user1['id'],
role_id=self.role1['id'])
self.assertRoleAssignmentInListResponse(r, up1_entity,
@@ -1062,9 +1073,459 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 2)
# Should have one direct role and one from group membership...
- up1_url, up1_entity = self._build_role_assignment_url_and_entity(
+ up1_url, up1_entity = _build_role_assignment_url_and_entity(
project_id=self.project1['id'], user_id=self.user1['id'],
role_id=self.role1['id'])
self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url)
self.assertRoleAssignmentInListResponse(r, up1_entity,
link_url=up1_url)
+
+
+class IdentityIneritanceTestCase(test_v3.RestfulTestCase):
+ """Test inheritance crud and its effects."""
+
+ def setUp(self):
+ self.orig_extension_enablement = config.CONF.os_inherit.enabled
+ self.opt_in_group('os_inherit', enabled=True)
+ super(IdentityIneritanceTestCase, self).setUp()
+
+ def tearDown(self):
+ super(IdentityIneritanceTestCase, self).tearDown()
+ self.opt_in_group('os_inherit', enabled=self.orig_extension_enablement)
+
+ def test_crud_user_inherited_domain_role_grants(self):
+ role_list = []
+ for _ in range(2):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.assignment_api.create_role(role['id'], role)
+ role_list.append(role)
+
+ # Create a non-inherited role as a spoiler
+ self.assignment_api.create_grant(
+ role_list[1]['id'], user_id=self.user['id'],
+ domain_id=self.domain_id)
+
+ base_collection_url = (
+ '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': self.domain_id,
+ 'user_id': self.user['id']})
+ member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
+ 'collection_url': base_collection_url,
+ 'role_id': role_list[0]['id']}
+ collection_url = base_collection_url + '/inherited_to_projects'
+
+ self.put(member_url)
+
+ # Check we can read it back
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=role_list[0])
+ self.assertIn(collection_url, r.result['links']['self'])
+
+ # Now delete and check its gone
+ self.delete(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, expected_length=0)
+ self.assertIn(collection_url, r.result['links']['self'])
+
+ def test_crud_inherited_role_grants_failed_if_disabled(self):
+ # Disable the extension and check no API calls can be issued
+ self.opt_in_group('os_inherit', enabled=False)
+ super(IdentityIneritanceTestCase, self).setUp()
+
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.assignment_api.create_role(role['id'], role)
+
+ base_collection_url = (
+ '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': self.domain_id,
+ 'user_id': self.user['id']})
+ member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
+ 'collection_url': base_collection_url,
+ 'role_id': role['id']}
+ collection_url = base_collection_url + '/inherited_to_projects'
+
+ self.put(member_url, expected_status=404)
+ self.head(member_url, expected_status=404)
+ self.get(collection_url, expected_status=404)
+ self.delete(member_url, expected_status=404)
+
+ def test_list_role_assignments_for_inherited_domain_grants(self):
+ """Call ``GET /role_assignments with inherited domain grants``.
+
+ Test Plan:
+ - Create 4 roles
+ - Create a domain with a user and two projects
+ - Assign two direct roles to project1
+ - Assign a spoiler role to project2
+ - Issue the URL to add inherited role to the domain
+ - Issue the URL to check it is indeed on the domain
+ - Issue the URL to check effective roles on project1 - this
+ should return 3 roles.
+
+ """
+ role_list = []
+ for _ in range(4):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.assignment_api.create_role(role['id'], role)
+ role_list.append(role)
+
+ domain = self.new_domain_ref()
+ self.identity_api.create_domain(domain['id'], domain)
+ user1 = self.new_user_ref(
+ domain_id=domain['id'])
+ user1['password'] = uuid.uuid4().hex
+ self.identity_api.create_user(user1['id'], user1)
+ project1 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.assignment_api.create_project(project1['id'], project1)
+ project2 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.assignment_api.create_project(project2['id'], project2)
+ # Add some roles to the project
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project1['id'], role_list[0]['id'])
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project1['id'], role_list[1]['id'])
+ # ..and one on a different project as a spoiler
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project2['id'], role_list[2]['id'])
+
+ # Now create our inherited role on the domain
+ base_collection_url = (
+ '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': domain['id'],
+ 'user_id': user1['id']})
+ member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
+ 'collection_url': base_collection_url,
+ 'role_id': role_list[3]['id']}
+ collection_url = base_collection_url + '/inherited_to_projects'
+
+ self.put(member_url)
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=role_list[3])
+ self.assertIn(collection_url, r.result['links']['self'])
+
+ # Now use the list domain role assignments api to check if this
+ # is included
+ collection_url = (
+ '/role_assignments?user.id=%(user_id)s'
+ '&scope.domain.id=%(domain_id)s' % {
+ 'user_id': user1['id'],
+ 'domain_id': domain['id']})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r)
+ self.assertEqual(len(r.result.get('role_assignments')), 1)
+ ud_url, ud_entity = _build_role_assignment_url_and_entity(
+ domain_id=domain['id'], user_id=user1['id'],
+ role_id=role_list[3]['id'], inherited_to_projects=True)
+ self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
+
+ # Now ask for effective list role assignments - the role should
+ # turn into a project role, along with the two direct roles that are
+ # on the project
+ collection_url = (
+ '/role_assignments?effective&user.id=%(user_id)s'
+ '&scope.project.id=%(project_id)s' % {
+ 'user_id': user1['id'],
+ 'project_id': project1['id']})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r)
+ self.assertEqual(len(r.result.get('role_assignments')), 3)
+ # An effective role for an inherited role will be a project
+ # entity, with a domain link to the inherited assignment
+ unused, up_entity = _build_role_assignment_url_and_entity(
+ project_id=project1['id'], user_id=user1['id'],
+ role_id=role_list[3]['id'])
+ ud_url, unused = _build_role_assignment_url_and_entity(
+ domain_id=domain['id'], user_id=user1['id'],
+ role_id=role_list[3]['id'], inherited_to_projects=True)
+ self.assertRoleAssignmentInListResponse(r, up_entity, link_url=ud_url)
+
+ def test_list_role_assignments_for_disabled_inheritance_extension(self):
+ """Call ``GET /role_assignments with inherited domain grants``.
+
+ Test Plan:
+ - Issue the URL to add inherited role to the domain
+ - Issue the URL to check effective roles on project include the
+ inherited role
+ - Disable the extension
+ - Re-check the effective roles, proving the inherited role no longer
+ shows up.
+
+ """
+
+ role_list = []
+ for _ in range(4):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.assignment_api.create_role(role['id'], role)
+ role_list.append(role)
+
+ domain = self.new_domain_ref()
+ self.identity_api.create_domain(domain['id'], domain)
+ user1 = self.new_user_ref(
+ domain_id=domain['id'])
+ user1['password'] = uuid.uuid4().hex
+ self.identity_api.create_user(user1['id'], user1)
+ project1 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.assignment_api.create_project(project1['id'], project1)
+ project2 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.assignment_api.create_project(project2['id'], project2)
+ # Add some roles to the project
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project1['id'], role_list[0]['id'])
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project1['id'], role_list[1]['id'])
+ # ..and one on a different project as a spoiler
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project2['id'], role_list[2]['id'])
+
+ # Now create our inherited role on the domain
+ base_collection_url = (
+ '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': domain['id'],
+ 'user_id': user1['id']})
+ member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
+ 'collection_url': base_collection_url,
+ 'role_id': role_list[3]['id']}
+ collection_url = base_collection_url + '/inherited_to_projects'
+
+ self.put(member_url)
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=role_list[3])
+ self.assertIn(collection_url, r.result['links']['self'])
+
+ # Get effective list role assignments - the role should
+ # turn into a project role, along with the two direct roles that are
+ # on the project
+ collection_url = (
+ '/role_assignments?effective&user.id=%(user_id)s'
+ '&scope.project.id=%(project_id)s' % {
+ 'user_id': user1['id'],
+ 'project_id': project1['id']})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r)
+ self.assertEqual(len(r.result.get('role_assignments')), 3)
+
+ unused, up_entity = _build_role_assignment_url_and_entity(
+ project_id=project1['id'], user_id=user1['id'],
+ role_id=role_list[3]['id'])
+ ud_url, unused = _build_role_assignment_url_and_entity(
+ domain_id=domain['id'], user_id=user1['id'],
+ role_id=role_list[3]['id'], inherited_to_projects=True)
+ self.assertRoleAssignmentInListResponse(r, up_entity, link_url=ud_url)
+
+ # Disable the extension and re-check the list, the role inherited
+ # from the project should no longer show up
+ self.opt_in_group('os_inherit', enabled=False)
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r)
+ self.assertEqual(len(r.result.get('role_assignments')), 2)
+
+ unused, up_entity = _build_role_assignment_url_and_entity(
+ project_id=project1['id'], user_id=user1['id'],
+ role_id=role_list[3]['id'])
+ ud_url, unused = _build_role_assignment_url_and_entity(
+ domain_id=domain['id'], user_id=user1['id'],
+ role_id=role_list[3]['id'], inherited_to_projects=True)
+ self.assertRoleAssignmentNotInListResponse(r, up_entity,
+ link_url=ud_url)
+
+ def test_list_role_assignments_for_inherited_group_domain_grants(self):
+ """Call ``GET /role_assignments with inherited group domain grants``.
+
+ Test Plan:
+ - Create 4 roles
+ - Create a domain with a user and two projects
+ - Assign two direct roles to project1
+ - Assign a spoiler role to project2
+ - Issue the URL to add inherited role to the domain
+ - Issue the URL to check it is indeed on the domain
+ - Issue the URL to check effective roles on project1 - this
+ should return 3 roles.
+
+ """
+ role_list = []
+ for _ in range(4):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.assignment_api.create_role(role['id'], role)
+ role_list.append(role)
+
+ domain = self.new_domain_ref()
+ self.identity_api.create_domain(domain['id'], domain)
+ user1 = self.new_user_ref(
+ domain_id=domain['id'])
+ user1['password'] = uuid.uuid4().hex
+ self.identity_api.create_user(user1['id'], user1)
+ user2 = self.new_user_ref(
+ domain_id=domain['id'])
+ user2['password'] = uuid.uuid4().hex
+ self.identity_api.create_user(user2['id'], user2)
+ group1 = self.new_group_ref(
+ domain_id=domain['id'])
+ self.identity_api.create_group(group1['id'], group1)
+ self.identity_api.add_user_to_group(user1['id'],
+ group1['id'])
+ self.identity_api.add_user_to_group(user2['id'],
+ group1['id'])
+ project1 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.assignment_api.create_project(project1['id'], project1)
+ project2 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.assignment_api.create_project(project2['id'], project2)
+ # Add some roles to the project
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project1['id'], role_list[0]['id'])
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project1['id'], role_list[1]['id'])
+ # ..and one on a different project as a spoiler
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project2['id'], role_list[2]['id'])
+
+ # Now create our inherited role on the domain
+ base_collection_url = (
+ '/OS-INHERIT/domains/%(domain_id)s/groups/%(group_id)s/roles' % {
+ 'domain_id': domain['id'],
+ 'group_id': group1['id']})
+ member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
+ 'collection_url': base_collection_url,
+ 'role_id': role_list[3]['id']}
+ collection_url = base_collection_url + '/inherited_to_projects'
+
+ self.put(member_url)
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=role_list[3])
+ self.assertIn(collection_url, r.result['links']['self'])
+
+ # Now use the list domain role assignments api to check if this
+ # is included
+ collection_url = (
+ '/role_assignments?group.id=%(group_id)s'
+ '&scope.domain.id=%(domain_id)s' % {
+ 'group_id': group1['id'],
+ 'domain_id': domain['id']})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r)
+ self.assertEqual(len(r.result.get('role_assignments')), 1)
+ gd_url, gd_entity = _build_role_assignment_url_and_entity(
+ domain_id=domain['id'], group_id=group1['id'],
+ role_id=role_list[3]['id'], inherited_to_projects=True)
+ self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url)
+
+ # Now ask for effective list role assignments - the role should
+ # turn into a user project role, along with the two direct roles
+ # that are on the project
+ collection_url = (
+ '/role_assignments?effective&user.id=%(user_id)s'
+ '&scope.project.id=%(project_id)s' % {
+ 'user_id': user1['id'],
+ 'project_id': project1['id']})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r)
+ self.assertEqual(len(r.result.get('role_assignments')), 3)
+ # An effective role for an inherited role will be a project
+ # entity, with a domain link to the inherited assignment
+ unused, up_entity = _build_role_assignment_url_and_entity(
+ project_id=project1['id'], user_id=user1['id'],
+ role_id=role_list[3]['id'])
+ gd_url, unused = _build_role_assignment_url_and_entity(
+ domain_id=domain['id'], group_id=group1['id'],
+ role_id=role_list[3]['id'], inherited_to_projects=True)
+ self.assertRoleAssignmentInListResponse(r, up_entity, link_url=gd_url)
+
+ def test_filtered_role_assignments_for_inherited_grants(self):
+ """Call ``GET /role_assignments?scope.OS-INHERIT:inherited_to``.
+
+ Test Plan:
+ - Create 5 roles
+ - Create a domain with a user, group and two projects
+ - Assign three direct spoiler roles to projects
+ - Issue the URL to add an inherited user role to the domain
+ - Issue the URL to add an inherited group role to the domain
+ - Issue the URL to filter by inherited roles - this should
+ return just the 2 inherited roles.
+
+ """
+ role_list = []
+ for _ in range(5):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.assignment_api.create_role(role['id'], role)
+ role_list.append(role)
+
+ domain = self.new_domain_ref()
+ self.identity_api.create_domain(domain['id'], domain)
+ user1 = self.new_user_ref(
+ domain_id=domain['id'])
+ user1['password'] = uuid.uuid4().hex
+ self.identity_api.create_user(user1['id'], user1)
+ group1 = self.new_group_ref(
+ domain_id=domain['id'])
+ self.identity_api.create_group(group1['id'], group1)
+ project1 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.assignment_api.create_project(project1['id'], project1)
+ project2 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.assignment_api.create_project(project2['id'], project2)
+ # Add some spoiler roles to the projects
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project1['id'], role_list[0]['id'])
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project2['id'], role_list[1]['id'])
+ # Create a non-inherited role as a spoiler
+ self.assignment_api.create_grant(
+ role_list[2]['id'], user_id=user1['id'], domain_id=domain['id'])
+
+ # Now create two inherited roles on the domain, one for a user
+ # and one for a domain
+ base_collection_url = (
+ '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': domain['id'],
+ 'user_id': user1['id']})
+ member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
+ 'collection_url': base_collection_url,
+ 'role_id': role_list[3]['id']}
+ collection_url = base_collection_url + '/inherited_to_projects'
+
+ self.put(member_url)
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=role_list[3])
+ self.assertIn(collection_url, r.result['links']['self'])
+
+ base_collection_url = (
+ '/OS-INHERIT/domains/%(domain_id)s/groups/%(group_id)s/roles' % {
+ 'domain_id': domain['id'],
+ 'group_id': group1['id']})
+ member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
+ 'collection_url': base_collection_url,
+ 'role_id': role_list[4]['id']}
+ collection_url = base_collection_url + '/inherited_to_projects'
+
+ self.put(member_url)
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=role_list[4])
+ self.assertIn(collection_url, r.result['links']['self'])
+
+ # Now use the list role assignments api to get a list of inherited
+ # roles on the domain - should get back the two roles
+ collection_url = (
+ '/role_assignments?scope.OS-INHERIT:inherited_to=projects')
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r)
+ self.assertEqual(len(r.result.get('role_assignments')), 2)
+ ud_url, ud_entity = _build_role_assignment_url_and_entity(
+ domain_id=domain['id'], user_id=user1['id'],
+ role_id=role_list[3]['id'], inherited_to_projects=True)
+ gd_url, gd_entity = _build_role_assignment_url_and_entity(
+ domain_id=domain['id'], group_id=group1['id'],
+ role_id=role_list[4]['id'], inherited_to_projects=True)
+ self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
+ self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url)