diff options
-rw-r--r-- | doc/source/configuration.rst | 12 | ||||
-rw-r--r-- | etc/keystone.conf.sample | 5 | ||||
-rw-r--r-- | keystone/assignment/backends/kvs.py | 86 | ||||
-rw-r--r-- | keystone/assignment/backends/ldap.py | 6 | ||||
-rw-r--r-- | keystone/assignment/backends/sql.py | 162 | ||||
-rw-r--r-- | keystone/assignment/core.py | 121 | ||||
-rw-r--r-- | keystone/common/config.py | 3 | ||||
-rw-r--r-- | keystone/common/sql/migrate_repo/versions/028_fixup_group_metadata.py | 190 | ||||
-rw-r--r-- | keystone/common/sql/migrate_repo/versions/029_update_assignment_metadata.py | 102 | ||||
-rw-r--r-- | keystone/identity/controllers.py | 199 | ||||
-rw-r--r-- | keystone/identity/core.py | 29 | ||||
-rw-r--r-- | keystone/identity/routers.py | 42 | ||||
-rw-r--r-- | tests/test_backend.py | 190 | ||||
-rw-r--r-- | tests/test_backend_ldap.py | 4 | ||||
-rw-r--r-- | tests/test_backend_sql.py | 4 | ||||
-rw-r--r-- | tests/test_sql_upgrade.py | 298 | ||||
-rw-r--r-- | tests/test_v3.py | 19 | ||||
-rw-r--r-- | tests/test_v3_identity.py | 575 |
18 files changed, 1859 insertions, 188 deletions
diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index a074c704..c4db1be2 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -79,6 +79,7 @@ following sections: * ``[signing]`` - cryptographic signatures for PKI based tokens * ``[ssl]`` - SSL configuration * ``[auth]`` - Authentication plugin configuration +* ``[os_inherit]`` - Inherited Role Assignment extension * ``[paste_deploy]`` - Pointer to the PasteDeploy configuration file The Keystone primary configuration file is expected to be named ``keystone.conf``. @@ -494,6 +495,17 @@ In addition to changing their password all of the users current tokens will be deleted (if the backend used is kvs or sql) +Inherited Role Assignment Extension +----------------------------------- + +Keystone provides an optional extension that adds the capability to assign roles to a domain that, rather than +affect the domain itself, are instead inherited to all projects owned by theat domain. This extension is disabled by +default, but can be enabled by including the following in ``keystone.conf``. + + [os_inherit] + enabled = True + + Sample Configuration Files -------------------------- diff --git a/etc/keystone.conf.sample b/etc/keystone.conf.sample index 671accf1..7fa232f0 100644 --- a/etc/keystone.conf.sample +++ b/etc/keystone.conf.sample @@ -109,6 +109,11 @@ # delegation and impersonation features can be optionally disabled # enabled = True +[os_inherit] +# role-assignment inheritance to projects from owning domain can be +# optionally enabled +# enabled = False + [catalog] # dynamic, sql-based backend (supports API/CLI-based management commands) # driver = keystone.catalog.backends.sql.Catalog diff --git a/keystone/assignment/backends/kvs.py b/keystone/assignment/backends/kvs.py index 4ed3937b..4dfd908f 100644 --- a/keystone/assignment/backends/kvs.py +++ b/keystone/assignment/backends/kvs.py @@ -33,10 +33,16 @@ class Assignment(kvs.Base, assignment.Driver): except exception.NotFound: raise exception.ProjectNotFound(project_id=tenant_id) - def list_projects(self): - tenant_keys = filter(lambda x: x.startswith("tenant-"), - self.db.keys()) - return [self.db.get(key) for key in tenant_keys] + def list_projects(self, domain_id=None): + project_keys = filter(lambda x: x.startswith("tenant-"), + self.db.keys()) + project_refs = [self.db.get(key) for key in project_keys] + + if domain_id: + self.get_domain(domain_id) + project_refs = filter(lambda x: domain_id in x['domain_id'], + project_refs) + return project_refs def get_project_by_name(self, tenant_name, domain_id): try: @@ -105,13 +111,16 @@ class Assignment(kvs.Base, assignment.Driver): metadata_ref = self._get_metadata(user_id, tenant_id) except exception.MetadataNotFound: metadata_ref = {} - roles = set(metadata_ref.get('roles', [])) - if role_id in roles: + + try: + metadata_ref['roles'] = self._add_role_to_role_dicts( + role_id, False, metadata_ref.get('roles', []), + allow_existing=False) + except KeyError: msg = ('User %s already has role %s in tenant %s' % (user_id, role_id, tenant_id)) raise exception.Conflict(type='role grant', details=msg) - roles.add(role_id) - metadata_ref['roles'] = list(roles) + self._update_metadata(user_id, tenant_id, metadata_ref) def remove_role_from_user_and_project(self, user_id, tenant_id, role_id): @@ -119,23 +128,25 @@ class Assignment(kvs.Base, assignment.Driver): metadata_ref = self._get_metadata(user_id, tenant_id) except exception.MetadataNotFound: metadata_ref = {} - roles = set(metadata_ref.get('roles', [])) - if role_id not in roles: - msg = 'Cannot remove role that has not been granted, %s' % role_id - raise exception.RoleNotFound(message=msg) - roles.remove(role_id) - metadata_ref['roles'] = list(roles) + try: + metadata_ref['roles'] = self._remove_role_from_role_dicts( + role_id, False, metadata_ref.get('roles', [])) + except KeyError: + raise exception.RoleNotFound(message=_( + 'Cannot remove role that has not been granted, %s') % + role_id) + + if len(metadata_ref['roles']): + self._update_metadata(user_id, tenant_id, metadata_ref) + else: - if not len(roles): self.db.delete('metadata-%s-%s' % (tenant_id, user_id)) user_ref = self._get_user(user_id) tenants = set(user_ref.get('tenants', [])) tenants.remove(tenant_id) user_ref['tenants'] = list(tenants) self.identity_api.update_user(user_id, user_ref) - else: - self._update_metadata(user_id, tenant_id, metadata_ref) def list_role_assignments(self): """List the role assignments. @@ -144,7 +155,7 @@ class Assignment(kvs.Base, assignment.Driver): "metadata-{target}-{actor}", with the value being a role list - i.e. "metadata-MyProjectID-MyUserID" [role1, role2] + i.e. "metadata-MyProjectID-MyUserID" [{'id': role1}, {'id': role2}] ...so we enumerate the list and extract the targets, actors and roles. @@ -169,7 +180,8 @@ class Assignment(kvs.Base, assignment.Driver): template['group_id'] = meta_id2 entry = self.db.get(key) - for r in entry.get('roles', []): + for r in self._roles_from_role_dicts(entry.get('roles', {}), + False): role_assignment = template.copy() role_assignment['role_id'] = r assignment_list.append(role_assignment) @@ -324,7 +336,8 @@ class Assignment(kvs.Base, assignment.Driver): self.db.set('role_list', list(role_list)) def create_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): self.get_role(role_id) if user_id: @@ -341,14 +354,16 @@ class Assignment(kvs.Base, assignment.Driver): domain_id, group_id) except exception.MetadataNotFound: metadata_ref = {} - roles = set(metadata_ref.get('roles', [])) - roles.add(role_id) - metadata_ref['roles'] = list(roles) + + metadata_ref['roles'] = self._add_role_to_role_dicts( + role_id, inherited_to_projects, metadata_ref.get('roles', [])) + self._update_metadata(user_id, project_id, metadata_ref, domain_id, group_id) def list_grants(self, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): if user_id: self.identity_api.get_user(user_id) if group_id: @@ -363,10 +378,14 @@ class Assignment(kvs.Base, assignment.Driver): domain_id, group_id) except exception.MetadataNotFound: metadata_ref = {} - return [self.get_role(x) for x in metadata_ref.get('roles', [])] + + return [self.get_role(x) for x in + self._roles_from_role_dicts(metadata_ref.get('roles', []), + inherited_to_projects)] def get_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): self.get_role(role_id) if user_id: self.identity_api.get_user(user_id) @@ -382,13 +401,17 @@ class Assignment(kvs.Base, assignment.Driver): domain_id, group_id) except exception.MetadataNotFound: metadata_ref = {} - role_ids = set(metadata_ref.get('roles', [])) + + role_ids = set(self._roles_from_role_dicts( + metadata_ref.get('roles', []), inherited_to_projects)) + if role_id not in role_ids: raise exception.RoleNotFound(role_id=role_id) return self.get_role(role_id) def delete_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): self.get_role(role_id) if user_id: self.identity_api.get_user(user_id) @@ -404,12 +427,13 @@ class Assignment(kvs.Base, assignment.Driver): domain_id, group_id) except exception.MetadataNotFound: metadata_ref = {} - roles = set(metadata_ref.get('roles', [])) + try: - roles.remove(role_id) + metadata_ref['roles'] = self._remove_role_from_role_dicts( + role_id, inherited_to_projects, metadata_ref.get('roles', [])) except KeyError: raise exception.RoleNotFound(role_id=role_id) - metadata_ref['roles'] = list(roles) + self._update_metadata(user_id, project_id, metadata_ref, domain_id, group_id) diff --git a/keystone/assignment/backends/ldap.py b/keystone/assignment/backends/ldap.py index 09539c9f..b1b3f99f 100644 --- a/keystone/assignment/backends/ldap.py +++ b/keystone/assignment/backends/ldap.py @@ -72,7 +72,9 @@ class Assignment(assignment.Driver): def get_project(self, tenant_id): return self._set_default_domain(self.project.get(tenant_id)) - def list_projects(self): + def list_projects(self, domain_id=None): + # We don't support multiple domains within this driver, so ignore + # any domain passed. return self._set_default_domain(self.project.get_all()) def get_project_by_name(self, tenant_name, domain_id): @@ -117,7 +119,7 @@ class Assignment(assignment.Driver): metadata_ref = _get_roles_for_just_user_and_project(user_id, tenant_id) if not metadata_ref: return {} - return {'roles': metadata_ref} + return {'roles': [self._role_to_dict(r, False) for r in metadata_ref]} def get_role(self, role_id): return self.role.get(role_id) diff --git a/keystone/assignment/backends/sql.py b/keystone/assignment/backends/sql.py index 237330ce..5ec435ff 100644 --- a/keystone/assignment/backends/sql.py +++ b/keystone/assignment/backends/sql.py @@ -96,7 +96,8 @@ class Assignment(sql.Base, assignment.Driver): raise exception.MetadataNotFound() def create_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): if user_id: self.identity_api.get_user(user_id) if group_id: @@ -110,6 +111,10 @@ class Assignment(sql.Base, assignment.Driver): if project_id: self._get_project(session, project_id) + if project_id and inherited_to_projects: + msg = _('Inherited roles can only be assigned to domains') + raise exception.Conflict(type='role grant', details=msg) + try: metadata_ref = self._get_metadata(user_id, project_id, domain_id, group_id) @@ -117,9 +122,10 @@ class Assignment(sql.Base, assignment.Driver): except exception.MetadataNotFound: metadata_ref = {} is_new = True - roles = set(metadata_ref.get('roles', [])) - roles.add(role_id) - metadata_ref['roles'] = list(roles) + + metadata_ref['roles'] = self._add_role_to_role_dicts( + role_id, inherited_to_projects, metadata_ref.get('roles', [])) + if is_new: self._create_metadata(user_id, project_id, metadata_ref, domain_id, group_id) @@ -128,7 +134,8 @@ class Assignment(sql.Base, assignment.Driver): domain_id, group_id) def list_grants(self, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): if user_id: self.identity_api.get_user(user_id) if group_id: @@ -144,10 +151,14 @@ class Assignment(sql.Base, assignment.Driver): domain_id, group_id) except exception.MetadataNotFound: metadata_ref = {} - return [self.get_role(x) for x in metadata_ref.get('roles', [])] + + return [self.get_role(x) for x in + self._roles_from_role_dicts(metadata_ref.get('roles', []), + inherited_to_projects)] def get_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): if user_id: self.identity_api.get_user(user_id) if group_id: @@ -166,13 +177,15 @@ class Assignment(sql.Base, assignment.Driver): domain_id, group_id) except exception.MetadataNotFound: metadata_ref = {} - role_ids = set(metadata_ref.get('roles', [])) + role_ids = set(self._roles_from_role_dicts( + metadata_ref.get('roles', []), inherited_to_projects)) if role_id not in role_ids: raise exception.RoleNotFound(role_id=role_id) return role_ref.to_dict() def delete_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): if user_id: self.identity_api.get_user(user_id) if group_id: @@ -193,25 +206,43 @@ class Assignment(sql.Base, assignment.Driver): except exception.MetadataNotFound: metadata_ref = {} is_new = True - roles = set(metadata_ref.get('roles', [])) + try: - roles.remove(role_id) + metadata_ref['roles'] = self._remove_role_from_role_dicts( + role_id, inherited_to_projects, metadata_ref.get('roles', [])) except KeyError: raise exception.RoleNotFound(role_id=role_id) - metadata_ref['roles'] = list(roles) + if is_new: + # TODO(henry-nash) It seems odd that you would create a new + # entry in response to trying to delete a role that was not + # assigned. Although benign, this should probably be removed. self._create_metadata(user_id, project_id, metadata_ref, domain_id, group_id) else: self._update_metadata(user_id, project_id, metadata_ref, domain_id, group_id) - def list_projects(self): + def list_projects(self, domain_id=None): session = self.get_session() - tenant_refs = session.query(Project).all() - return [tenant_ref.to_dict() for tenant_ref in tenant_refs] + if domain_id: + self._get_domain(session, domain_id) + + query = session.query(Project) + if domain_id: + query = query.filter_by(domain_id=domain_id) + project_refs = query.all() + return [project_ref.to_dict() for project_ref in project_refs] def get_projects_for_user(self, user_id): + + # FIXME(henry-nash) The following should take into account + # both group and inherited roles. In fact, I don't see why this + # call can't be handled at the controller level like we do + # with 'get_roles_for_user_and_project()'. Further, this + # call seems essentially the same as 'list_user_projects()' + # later in this driver. Both should be removed. + self.identity_api.get_user(user_id) session = self.get_session() query = session.query(UserProjectGrant) @@ -230,13 +261,16 @@ class Assignment(sql.Base, assignment.Driver): except exception.MetadataNotFound: metadata_ref = {} is_new = True - roles = set(metadata_ref.get('roles', [])) - if role_id in roles: + + try: + metadata_ref['roles'] = self._add_role_to_role_dicts( + role_id, False, metadata_ref.get('roles', []), + allow_existing=False) + except KeyError: msg = ('User %s already has role %s in tenant %s' % (user_id, role_id, tenant_id)) raise exception.Conflict(type='role grant', details=msg) - roles.add(role_id) - metadata_ref['roles'] = list(roles) + if is_new: self._create_metadata(user_id, tenant_id, metadata_ref) else: @@ -245,14 +279,15 @@ class Assignment(sql.Base, assignment.Driver): def remove_role_from_user_and_project(self, user_id, tenant_id, role_id): try: metadata_ref = self._get_metadata(user_id, tenant_id) - roles = set(metadata_ref.get('roles', [])) - if role_id not in roles: + try: + metadata_ref['roles'] = self._remove_role_from_role_dicts( + role_id, False, metadata_ref.get('roles', [])) + except KeyError: raise exception.RoleNotFound(message=_( 'Cannot remove role that has not been granted, %s') % role_id) - roles.remove(role_id) - metadata_ref['roles'] = list(roles) - if len(roles): + + if len(metadata_ref['roles']): self._update_metadata(user_id, tenant_id, metadata_ref) else: session = self.get_session() @@ -277,28 +312,44 @@ class Assignment(sql.Base, assignment.Driver): assignment_list = [] refs = session.query(UserDomainGrant).all() for x in refs: - for r in x.data.get('roles', []): - assignment_list.append({'user_id': x.user_id, - 'domain_id': x.domain_id, - 'role_id': r}) + for r in self._roles_from_role_dicts( + x.data.get('roles', {}), False): + assignment_list.append({'user_id': x.user_id, + 'domain_id': x.domain_id, + 'role_id': r}) + for r in self._roles_from_role_dicts( + x.data.get('roles', {}), True): + assignment_list.append({'user_id': x.user_id, + 'domain_id': x.domain_id, + 'role_id': r, + 'inherited_to_projects': True}) refs = session.query(UserProjectGrant).all() for x in refs: - for r in x.data.get('roles', []): - assignment_list.append({'user_id': x.user_id, - 'project_id': x.project_id, - 'role_id': r}) + for r in self._roles_from_role_dicts( + x.data.get('roles', {}), False): + assignment_list.append({'user_id': x.user_id, + 'project_id': x.project_id, + 'role_id': r}) refs = session.query(GroupDomainGrant).all() for x in refs: - for r in x.data.get('roles', []): - assignment_list.append({'group_id': x.group_id, - 'domain_id': x.domain_id, - 'role_id': r}) + for r in self._roles_from_role_dicts( + x.data.get('roles', {}), False): + assignment_list.append({'group_id': x.group_id, + 'domain_id': x.domain_id, + 'role_id': r}) + for r in self._roles_from_role_dicts( + x.data.get('roles', {}), True): + assignment_list.append({'group_id': x.group_id, + 'domain_id': x.domain_id, + 'role_id': r, + 'inherited_to_projects': True}) refs = session.query(GroupProjectGrant).all() for x in refs: - for r in x.data.get('roles', []): - assignment_list.append({'group_id': x.group_id, - 'project_id': x.project_id, - 'role_id': r}) + for r in self._roles_from_role_dicts( + x.data.get('roles', {}), False): + assignment_list.append({'group_id': x.group_id, + 'project_id': x.project_id, + 'role_id': r}) return assignment_list # CRUD @@ -473,6 +524,14 @@ class Assignment(sql.Base, assignment.Driver): session.flush() def list_user_projects(self, user_id): + + # FIXME(henry-nash) The following should take into account + # both group and inherited roles. In fact, I don't see why this + # call can't be handled at the controller level like we do + # with 'get_roles_for_user_and_project()'. Further, this + # call seems essentially the same as 'get_projects_for_user()' + # earlier in this driver. Both should be removed. + session = self.get_session() user = self.identity_api.get_user(user_id) metadata_refs = session\ @@ -627,6 +686,29 @@ class Role(sql.ModelBase, sql.DictBase): class BaseGrant(sql.DictBase): + """Base Grant class. + + There are four grant tables in the current implementation, one for + each type of grant: + + - User for Project + - User for Domain + - Group for Project + - Group for Domain + + Each is a table with the two attributes above as a combined primary key, + with the data field holding all roles for that combination. The data + field is a list of dicts. For regular role assignments each dict in + the list of of the form: + + {'id': role_id} + + If the OS-INHERIT extension is enabled and the role on a domain is an + inherited role, the dict will be of the form: + + {'id': role_id, 'inherited_to': 'projects'} + + """ def to_dict(self): """Override parent to_dict() method with a simpler implementation. diff --git a/keystone/assignment/core.py b/keystone/assignment/core.py index 531da02e..b71e2a18 100644 --- a/keystone/assignment/core.py +++ b/keystone/assignment/core.py @@ -59,33 +59,72 @@ class Manager(manager.Manager): self.identity_api.assignment_api = self def get_roles_for_user_and_project(self, user_id, tenant_id): - def _get_group_project_roles(user_id, tenant_id): + """Get the roles associated with a user within given project. + + This includes roles directly assigned to the user on the + project, as well as those by virtue of group membership. If + the OS-INHERIT extension is enabled, then this will also + include roles inherited from the domain. + + :returns: a list of role ids. + :raises: keystone.exception.UserNotFound, + keystone.exception.ProjectNotFound + + """ + def _get_group_project_roles(user_id, project_ref): role_list = [] group_refs = (self.identity_api.list_groups_for_user (user_id=user_id)) for x in group_refs: try: - metadata_ref = self._get_metadata(group_id=x['id'], - tenant_id=tenant_id) - role_list += metadata_ref.get('roles', []) + metadata_ref = self._get_metadata( + group_id=x['id'], tenant_id=project_ref['id']) + role_list += self._roles_from_role_dicts( + metadata_ref.get('roles', {}), False) except exception.MetadataNotFound: # no group grant, skip pass + + if CONF.os_inherit.enabled: + # Now get any inherited group roles for the owning domain + try: + metadata_ref = self._get_metadata( + group_id=x['id'], + domain_id=project_ref['domain_id']) + role_list += self._roles_from_role_dicts( + metadata_ref.get('roles', {}), True) + except (exception.MetadataNotFound, + exception.NotImplemented): + pass + return role_list - def _get_user_project_roles(user_id, tenant_id): - metadata_ref = {} + def _get_user_project_roles(user_id, project_ref): + role_list = [] try: metadata_ref = self._get_metadata(user_id=user_id, - tenant_id=tenant_id) + tenant_id=project_ref['id']) + role_list = self._roles_from_role_dicts( + metadata_ref.get('roles', {}), False) except exception.MetadataNotFound: pass - return metadata_ref.get('roles', []) + + if CONF.os_inherit.enabled: + # Now get any inherited roles for the owning domain + try: + metadata_ref = self._get_metadata( + user_id=user_id, domain_id=project_ref['domain_id']) + role_list += self._roles_from_role_dicts( + metadata_ref.get('roles', {}), True) + except (exception.MetadataNotFound, exception.NotImplemented): + pass + + return role_list self.identity_api.get_user(user_id) - self.get_project(tenant_id) - user_role_list = _get_user_project_roles(user_id, tenant_id) - group_role_list = _get_group_project_roles(user_id, tenant_id) + project_ref = self.get_project(tenant_id) + user_role_list = _get_user_project_roles(user_id, project_ref) + group_role_list = _get_group_project_roles(user_id, project_ref) # Use set() to process the list to remove any duplicates return list(set(user_role_list + group_role_list)) @@ -106,11 +145,12 @@ class Manager(manager.Manager): try: metadata_ref = self._get_metadata(group_id=x['id'], domain_id=domain_id) - role_list += metadata_ref.get('roles', []) + role_list += self._roles_from_role_dicts( + metadata_ref.get('roles', {}), False) except (exception.MetadataNotFound, exception.NotImplemented): # MetadataNotFound implies no group grant, so skip. # Ignore NotImplemented since not all backends support - # domains. pass + # domains. pass return role_list @@ -124,7 +164,8 @@ class Manager(manager.Manager): # Ignore NotImplemented since not all backends support # domains pass - return metadata_ref.get('roles', []) + return self._roles_from_role_dicts( + metadata_ref.get('roles', {}), False) self.identity_api.get_user(user_id) self.get_domain(domain_id) @@ -160,6 +201,40 @@ class Manager(manager.Manager): class Driver(object): + def _role_to_dict(self, role_id, inherited): + role_dict = {'id': role_id} + if inherited: + role_dict['inherited_to'] = 'projects' + return role_dict + + def _roles_from_role_dicts(self, dict_list, inherited): + role_list = [] + for d in dict_list: + if ((not d.get('inherited_to') and not inherited) or + (d.get('inherited_to') == 'projects' and inherited)): + role_list.append(d['id']) + return role_list + + def _add_role_to_role_dicts(self, role_id, inherited, dict_list, + allow_existing=True): + # There is a difference in error semantics when trying to + # assign a role that already exists between the coded v2 and v3 + # API calls. v2 will error if the assignment already exists, + # while v3 is silent. Setting the 'allow_existing' parameter + # appropriately lets this call be used for both. + role_set = set([frozenset(r.items()) for r in dict_list]) + key = frozenset(self._role_to_dict(role_id, inherited).items()) + if not allow_existing and key in role_set: + raise KeyError + role_set.add(key) + return [dict(r) for r in role_set] + + def _remove_role_from_role_dicts(self, role_id, inherited, dict_list): + role_set = set([frozenset(r.items()) for r in dict_list]) + role_set.remove(frozenset(self._role_to_dict(role_id, + inherited).items())) + return [dict(r) for r in role_set] + def get_project_by_name(self, tenant_name, domain_id): """Get a tenant by name. @@ -209,9 +284,14 @@ class Driver(object): # assignment/grant crud def create_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): """Creates a new assignment/grant. + If the assignment is to a domain, then optionally it may be + specified as inherited to owned projects (this requires + the OS-INHERIT extension to be enabled). + :raises: keystone.exception.UserNotFound, keystone.exception.GroupNotFound, keystone.exception.ProjectNotFound, @@ -223,7 +303,8 @@ class Driver(object): raise exception.NotImplemented() def list_grants(self, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): """Lists assignments/grants. :raises: keystone.exception.UserNotFound, @@ -237,7 +318,8 @@ class Driver(object): raise exception.NotImplemented() def get_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): """Lists assignments/grants. :raises: keystone.exception.UserNotFound, @@ -251,7 +333,8 @@ class Driver(object): raise exception.NotImplemented() def delete_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): """Lists assignments/grants. :raises: keystone.exception.UserNotFound, @@ -329,7 +412,7 @@ class Driver(object): """ raise exception.NotImplemented() - def list_projects(self): + def list_projects(self, domain_id=None): """List all projects in the system. :returns: a list of project_refs or an empty list. diff --git a/keystone/common/config.py b/keystone/common/config.py index fc428f8c..030ead69 100644 --- a/keystone/common/config.py +++ b/keystone/common/config.py @@ -214,6 +214,9 @@ def configure(): # trust register_bool('enabled', group='trust', default=True) + # os_inherit + register_bool('enabled', group='os_inherit', default=False) + # ssl register_bool('enable', group='ssl', default=False) register_str('certfile', group='ssl', diff --git a/keystone/common/sql/migrate_repo/versions/028_fixup_group_metadata.py b/keystone/common/sql/migrate_repo/versions/028_fixup_group_metadata.py new file mode 100644 index 00000000..66055a99 --- /dev/null +++ b/keystone/common/sql/migrate_repo/versions/028_fixup_group_metadata.py @@ -0,0 +1,190 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sqlalchemy as sql + + +def upgrade(migrate_engine): + # The group_project_metadata table was not updated in terms of its + # FK to the tenant table when the tenant->project change was made at + # the 015 migration for sqlite. This upgrade fixes that. + # We need to create a fake tenant table so that we can first load + # the group_project_metadata at all, then do a dance of copying tables + # to get us to the correct schema. + meta = sql.MetaData() + meta.bind = migrate_engine + + if migrate_engine.name != 'sqlite': + return + + temp_tenant_table = sql.Table( + 'tenant', + meta, + sql.Column('id', sql.String(64), primary_key=True)) + temp_tenant_table.create(migrate_engine, checkfirst=True) + + sql.Table('user', meta, autoload=True) + old_group_metadata_table = sql.Table('group_project_metadata', + meta, autoload=True) + + # OK, we now have the table loaded, create a first + # temporary table of a different name with the correct FK + sql.Table('project', meta, autoload=True) + temp_group_project_metadata_table = sql.Table( + 'temp_group_project_metadata', + meta, + sql.Column( + 'group_id', + sql.String(64), + primary_key=True), + sql.Column( + 'project_id', + sql.String(64), + sql.ForeignKey('project.id'), + primary_key=True), + sql.Column('data', sql.Text())) + temp_group_project_metadata_table.create(migrate_engine, checkfirst=True) + + # Populate the new temporary table, and then drop the old one + session = sql.orm.sessionmaker(bind=migrate_engine)() + + for metadata in session.query(old_group_metadata_table): + q = temp_group_project_metadata_table.insert().values( + group_id=metadata.group_id, + project_id=metadata.project_id, + data=metadata.data) + session.execute(q) + session.commit() + old_group_metadata_table.drop() + temp_tenant_table.drop() + + # Now do a final table copy to get the table of the right name. + # Re-init the metadata so that sqlalchemy does not get confused with + # multiple versions of the same named table. + meta2 = sql.MetaData() + meta2.bind = migrate_engine + + sql.Table('project', meta2, autoload=True) + new_group_project_metadata_table = sql.Table( + 'group_project_metadata', + meta2, + sql.Column( + 'group_id', + sql.String(64), + primary_key=True), + sql.Column( + 'project_id', + sql.String(64), + sql.ForeignKey('project.id'), + primary_key=True), + sql.Column('data', sql.Text())) + new_group_project_metadata_table.create(migrate_engine, checkfirst=True) + + for metadata in session.query(temp_group_project_metadata_table): + q = new_group_project_metadata_table.insert().values( + group_id=metadata.group_id, + project_id=metadata.project_id, + data=metadata.data) + session.execute(q) + session.commit() + + temp_group_project_metadata_table.drop() + + +def downgrade(migrate_engine): + # Put the group_project_metadata table back the way it was in its rather + # broken state. We don't try and re-write history, since otherwise people + # get out of step. + meta = sql.MetaData() + meta.bind = migrate_engine + + if migrate_engine.name != 'sqlite': + return + + sql.Table('user', meta, autoload=True) + sql.Table('project', meta, autoload=True) + group_metadata_table = sql.Table('group_project_metadata', + meta, autoload=True) + + # We want to create a temp group meta table with the FK + # set to the wrong place. + temp_tenant_table = sql.Table( + 'tenant', + meta, + sql.Column('id', sql.String(64), primary_key=True)) + temp_tenant_table.create(migrate_engine, checkfirst=True) + + temp_group_project_metadata_table = sql.Table( + 'temp_group_project_metadata', + meta, + sql.Column( + 'group_id', + sql.String(64), + primary_key=True), + sql.Column( + 'project_id', + sql.String(64), + sql.ForeignKey('tenant.id'), + primary_key=True), + sql.Column('data', sql.Text())) + temp_group_project_metadata_table.create(migrate_engine, checkfirst=True) + + # Now populate the temp table and drop the real one + session = sql.orm.sessionmaker(bind=migrate_engine)() + + for metadata in session.query(group_metadata_table): + q = temp_group_project_metadata_table.insert().values( + group_id=metadata.group_id, + project_id=metadata.project_id, + data=metadata.data) + session.execute(q) + + session.commit() + group_metadata_table.drop() + + # Now copy again into the correctly named table. Re-init the metadata + # so that sqlalchemy does not get confused with multiple versions of the + # same named table. + meta2 = sql.MetaData() + meta2.bind = migrate_engine + + sql.Table('tenant', meta2, autoload=True) + new_group_project_metadata_table = sql.Table( + 'group_project_metadata', + meta2, + sql.Column( + 'group_id', + sql.String(64), + primary_key=True), + sql.Column( + 'project_id', + sql.String(64), + sql.ForeignKey('tenant.id'), + primary_key=True), + sql.Column('data', sql.Text())) + new_group_project_metadata_table.create(migrate_engine, checkfirst=True) + + for metadata in session.query(temp_group_project_metadata_table): + q = new_group_project_metadata_table.insert().values( + group_id=metadata.group_id, + project_id=metadata.project_id, + data=metadata.data) + session.execute(q) + + session.commit() + + temp_group_project_metadata_table.drop() + temp_tenant_table.drop() diff --git a/keystone/common/sql/migrate_repo/versions/029_update_assignment_metadata.py b/keystone/common/sql/migrate_repo/versions/029_update_assignment_metadata.py new file mode 100644 index 00000000..a9276804 --- /dev/null +++ b/keystone/common/sql/migrate_repo/versions/029_update_assignment_metadata.py @@ -0,0 +1,102 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json + +import sqlalchemy as sql + + +def build_update(table_name, upgrade_table, row, values): + if table_name == 'user_project_metadata': + update = upgrade_table.update().where( + upgrade_table.c.user_id == row.user_id).where( + upgrade_table.c.project_id == row.project_id).values(values) + elif table_name == 'group_project_metadata': + update = upgrade_table.update().where( + upgrade_table.c.group_id == row.group_id).where( + upgrade_table.c.project_id == row.project_id).values(values) + elif table_name == 'user_domain_metadata': + update = upgrade_table.update().where( + upgrade_table.c.user_id == row.user_id).where( + upgrade_table.c.domain_id == row.domain_id).values(values) + else: + update = upgrade_table.update().where( + upgrade_table.c.group_id == row.group_id).where( + upgrade_table.c.domain_id == row.domain_id).values(values) + return update + + +def upgrade_grant_table(meta, migrate_engine, session, table_name): + + # Convert the roles component of the metadata from a list + # of ids to a list of dicts + + def list_to_dict_list(metadata): + json_metadata = json.loads(metadata) + if 'roles' in json_metadata: + json_metadata['roles'] = ( + [{'id': x} for x in json_metadata['roles']]) + return json.dumps(json_metadata) + + upgrade_table = sql.Table(table_name, meta, autoload=True) + for assignment in session.query(upgrade_table): + values = {'data': list_to_dict_list(assignment.data)} + update = build_update(table_name, upgrade_table, assignment, values) + migrate_engine.execute(update) + + +def downgrade_grant_table(meta, migrate_engine, session, table_name): + + # Convert the roles component of the metadata from a list + # of dicts to a simple list of ids. Any inherited roles are deleted + # since they would have no meaning + + def dict_list_to_list(metadata): + json_metadata = json.loads(metadata) + if 'roles' in json_metadata: + json_metadata['roles'] = ([x['id'] for x in json_metadata['roles'] + if 'inherited_to' not in x]) + return json.dumps(json_metadata) + + downgrade_table = sql.Table(table_name, meta, autoload=True) + for assignment in session.query(downgrade_table): + values = {'data': dict_list_to_list(assignment.data)} + update = build_update(table_name, downgrade_table, assignment, values) + migrate_engine.execute(update) + + +def upgrade(migrate_engine): + meta = sql.MetaData() + meta.bind = migrate_engine + session = sql.orm.sessionmaker(bind=migrate_engine)() + + for grant_table in ['user_project_metadata', 'user_domain_metadata', + 'group_project_metadata', 'group_domain_metadata']: + upgrade_grant_table(meta, migrate_engine, session, grant_table) + session.commit() + session.close() + + +def downgrade(migrate_engine): + meta = sql.MetaData() + meta.bind = migrate_engine + session = sql.orm.sessionmaker(bind=migrate_engine)() + + for grant_table in ['user_project_metadata', 'user_domain_metadata', + 'group_project_metadata', 'group_domain_metadata']: + downgrade_grant_table(meta, migrate_engine, session, grant_table) + session.commit() + session.close() diff --git a/keystone/identity/controllers.py b/keystone/identity/controllers.py index 9271f3d9..b4e8ea6a 100644 --- a/keystone/identity/controllers.py +++ b/keystone/identity/controllers.py @@ -748,6 +748,11 @@ class RoleV3(controller.V3Controller): msg = 'Specify a user or group, not both' raise exception.ValidationError(msg) + def _check_if_inherited(self, context): + return (CONF.os_inherit.enabled and + context['path'].startswith('/OS-INHERIT') and + context['path'].endswith('/inherited_to_projects')) + @controller.protected def create_grant(self, context, role_id, user_id=None, group_id=None, domain_id=None, project_id=None): @@ -756,7 +761,8 @@ class RoleV3(controller.V3Controller): self._require_user_xor_group(user_id, group_id) self.identity_api.create_grant( - role_id, user_id, group_id, domain_id, project_id) + role_id, user_id, group_id, domain_id, project_id, + self._check_if_inherited(context)) @controller.protected def list_grants(self, context, user_id=None, group_id=None, @@ -766,7 +772,8 @@ class RoleV3(controller.V3Controller): self._require_user_xor_group(user_id, group_id) refs = self.identity_api.list_grants( - user_id, group_id, domain_id, project_id) + user_id, group_id, domain_id, project_id, + self._check_if_inherited(context)) return RoleV3.wrap_collection(context, refs) @controller.protected @@ -777,7 +784,8 @@ class RoleV3(controller.V3Controller): self._require_user_xor_group(user_id, group_id) self.identity_api.get_grant( - role_id, user_id, group_id, domain_id, project_id) + role_id, user_id, group_id, domain_id, project_id, + self._check_if_inherited(context)) @controller.protected def revoke_grant(self, context, role_id, user_id=None, group_id=None, @@ -787,7 +795,8 @@ class RoleV3(controller.V3Controller): self._require_user_xor_group(user_id, group_id) self.identity_api.delete_grant( - role_id, user_id, group_id, domain_id, project_id) + role_id, user_id, group_id, domain_id, project_id, + self._check_if_inherited(context)) # Now delete any tokens for this user or, in the case of a group, # tokens from all the uses who are members of this group. @@ -815,13 +824,55 @@ class RoleAssignmentV3(controller.V3Controller): pass def _format_entity(self, entity): + """Format an assignment entity for API response. + + The driver layer returns entities as dicts containing the ids of the + actor (e.g. user or group), target (e.g. domain or project) and role. + If it is an inherited role, then this is also indicated. Examples: + + {'user_id': user_id, + 'project_id': domain_id, + 'role_id': role_id} + + or, for an inherited role: + + {'user_id': user_id, + 'domain_id': domain_id, + 'role_id': role_id, + 'inherited_to_projects': true} + + This function maps this into the format to be returned via the API, + e.g. for the second example above: + + { + 'user': { + {'id': user_id} + }, + 'scope': { + 'domain': { + {'id': domain_id} + }, + 'OS-INHERIT:inherited_to': 'projects + }, + 'role': { + {'id': role_id} + }, + 'links': { + 'assignment': '/domains/domain_id/users/user_id/roles/' + 'role_id/inherited_to_projects' + } + } + + """ + formatted_entity = {} + suffix = "" if 'user_id' in entity: formatted_entity['user'] = {'id': entity['user_id']} - actor_link = '/users/%s' % entity['user_id'] + actor_link = 'users/%s' % entity['user_id'] if 'group_id' in entity: formatted_entity['group'] = {'id': entity['group_id']} - actor_link = '/groups/%s' % entity['group_id'] + actor_link = 'groups/%s' % entity['group_id'] if 'role_id' in entity: formatted_entity['role'] = {'id': entity['role_id']} if 'project_id' in entity: @@ -831,12 +882,21 @@ class RoleAssignmentV3(controller.V3Controller): if 'domain_id' in entity: formatted_entity['scope'] = ( {'domain': {'id': entity['domain_id']}}) - target_link = '/domains/%s' % entity['domain_id'] - + if 'inherited_to_projects' in entity: + formatted_entity['scope']['OS-INHERIT:inherited_to'] = ( + 'projects') + target_link = '/OS-INHERIT/domains/%s' % entity['domain_id'] + suffix = '/inherited_to_projects' + else: + target_link = '/domains/%s' % entity['domain_id'] formatted_entity.setdefault('links', {}) formatted_entity['links']['assignment'] = ( - self.base_url(target_link + actor_link + - '/roles/%s' % entity['role_id'])) + self.base_url('%(target)s/%(actor)s/roles/%(role)s%(suffix)s' % { + 'target': target_link, + 'actor': actor_link, + 'role': entity['role_id'], + 'suffix': suffix})) + return formatted_entity def _expand_indirect_assignments(self, refs): @@ -846,6 +906,10 @@ class RoleAssignmentV3(controller.V3Controller): entity for each member of that group, and then remove the group assignment entity itself from the list. + If the OS-INHERIT extension is enabled, then honor any inherited + roles on the domain by creating the equivalent on all projects + owned by the domain. + For any new entity created by virtue of group membership, add in an additional link to that membership. @@ -881,7 +945,8 @@ class RoleAssignmentV3(controller.V3Controller): 'role': ref.get('role_id')}) return members - def _build_equivalent_user_assignment(user, group_id, template): + def _build_user_assignment_equivalent_of_group( + user, group_id, template): """Create a user assignment equivalent to the group one. The template has had the 'group' entity removed, so @@ -900,33 +965,112 @@ class RoleAssignmentV3(controller.V3Controller): '/projects/%s' % scope['project']['id']) user_entry['links']['assignment'] = ( self.base_url('%s/users/%s/roles/%s' % - (target_link, m['id'], + (target_link, user['id'], user_entry['role']['id']))) user_entry['links']['membership'] = ( self.base_url('/groups/%s/users/%s' % (group_id, user['id']))) return user_entry - # Scan the list of entities for any group assignments, expanding - # them into equivalent user entities. Due to potential large - # expansion of group entities, rather than modify the + def _build_project_equivalent_of_user_domain_role( + project_id, domain_id, template): + """Create a user project assignment equivalent to the domain one. + + The template has had the 'domain' entity removed, so + substitute a 'project' one, modifying the 'assignment' link + to match. + + """ + project_entry = copy.deepcopy(template) + project_entry['scope']['project'] = {'id': project_id} + project_entry['links']['assignment'] = ( + self.base_url( + '/OS-INHERIT/domains/%s/users/%s/roles/%s' + '/inherited_to_projects' % ( + domain_id, project_entry['user']['id'], + project_entry['role']['id']))) + return project_entry + + def _build_project_equivalent_of_group_domain_role( + user_id, group_id, project_id, domain_id, template): + """Create a user project equivalent to the domain group one. + + The template has had the 'domain' and 'group' entities removed, so + substitute a 'user-project' one, modifying the 'assignment' link + to match. + + """ + project_entry = copy.deepcopy(template) + project_entry['user'] = {'id': user_id} + project_entry['scope']['project'] = {'id': project_id} + project_entry['links']['assignment'] = ( + self.base_url('/OS-INHERIT/domains/%s/groups/%s/roles/%s' + '/inherited_to_projects' % ( + domain_id, group_id, + project_entry['role']['id']))) + project_entry['links']['membership'] = ( + self.base_url('/groups/%s/users/%s' % + (group_id, user_id))) + return project_entry + + # Scan the list of entities for any assignments that need to be + # expanded. + # + # If the OS-INERIT extension is enabled, the refs lists may + # contain roles to be inherited from domain to project, so expand + # these as well into project equivalents + # + # For any regular group entries, expand these into user entries based + # on membership of that group. + # + # Due to the potentially large expansions, rather than modify the # list we are enumerating, we build a new one as we go. + # + new_refs = [] for r in refs: - if 'group' in r: - # As it is a group role assignment, first get the list of - # members. - + if 'OS-INHERIT:inherited_to' in r['scope']: + # It's an inherited domain role - so get the list of projects + # owned by this domain. A domain scope is guaranteed since we + # checked this when we built the refs list + project_ids = ( + [x['id'] for x in self.assignment_api.list_projects( + r['scope']['domain']['id'])]) + base_entry = copy.deepcopy(r) + domain_id = base_entry['scope']['domain']['id'] + base_entry['scope'].pop('domain') + # For each project, create an equivalent role assignment + for p in project_ids: + # If it's a group assignment, then create equivalent user + # roles based on membership of the group + if 'group' in base_entry: + members = _get_group_members(base_entry) + sub_entry = copy.deepcopy(base_entry) + group_id = sub_entry['group']['id'] + sub_entry.pop('group') + for m in members: + new_entry = ( + _build_project_equivalent_of_group_domain_role( + m['id'], group_id, p, + domain_id, sub_entry)) + new_refs.append(new_entry) + else: + new_entry = ( + _build_project_equivalent_of_user_domain_role( + p, domain_id, base_entry)) + new_refs.append(new_entry) + elif 'group' in r: + # It's a non-inherited group role assignment, so get the list + # of members. members = _get_group_members(r) # Now replace that group role assignment entry with an # equivalent user role assignment for each of the group members - base_entry = copy.deepcopy(r) group_id = base_entry['group']['id'] base_entry.pop('group') for m in members: - user_entry = _build_equivalent_user_assignment( + user_entry = _build_user_assignment_equivalent_of_group( m, group_id, base_entry) new_refs.append(user_entry) else: @@ -954,9 +1098,16 @@ class RoleAssignmentV3(controller.V3Controller): val = True return val + def _filter_inherited(self, entry): + if ('inherited_to_projects' in entry and + not CONF.os_inherit.enabled): + return False + else: + return True + @controller.filterprotected('group.id', 'role.id', 'scope.domain.id', 'scope.project.id', - 'user.id') + 'scope.OS-INHERIT:inherited_to', 'user.id') def list_role_assignments(self, context, filters): # TODO(henry-nash): This implementation uses the standard filtering @@ -966,7 +1117,9 @@ class RoleAssignmentV3(controller.V3Controller): # kept a minimum. refs = self.identity_api.list_role_assignments() - formatted_refs = [self._format_entity(x) for x in refs] + formatted_refs = ( + [self._format_entity(x) for x in refs + if self._filter_inherited(x)]) if ('effective' in context['query_string'] and self._query_filter_is_true( diff --git a/keystone/identity/core.py b/keystone/identity/core.py index 981d24ed..d04902ae 100644 --- a/keystone/identity/core.py +++ b/keystone/identity/core.py @@ -107,8 +107,8 @@ class Manager(manager.Manager): def get_project(self, tenant_id): return self.assignment.get_project(tenant_id) - def list_projects(self): - return self.assignment.list_projects() + def list_projects(self, domain_id=None): + return self.assignment.list_projects(domain_id) def get_role(self, role_id): return self.assignment.get_role(role_id) @@ -156,24 +156,32 @@ class Manager(manager.Manager): return self.assignment.update_role(role_id, role) def create_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): return (self.assignment.create_grant - (role_id, user_id, group_id, domain_id, project_id)) + (role_id, user_id, group_id, domain_id, project_id, + inherited_to_projects)) def list_grants(self, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): return (self.assignment.list_grants - (user_id, group_id, domain_id, project_id)) + (user_id, group_id, domain_id, project_id, + inherited_to_projects)) def get_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): return (self.assignment.get_grant - (role_id, user_id, group_id, domain_id, project_id)) + (role_id, user_id, group_id, domain_id, project_id, + inherited_to_projects)) def delete_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None): + domain_id=None, project_id=None, + inherited_to_projects=False): return (self.assignment.delete_grant - (role_id, user_id, group_id, domain_id, project_id)) + (role_id, user_id, group_id, domain_id, project_id, + inherited_to_projects)) def create_domain(self, domain_id, domain): return self.assignment.create_domain(domain_id, domain) @@ -214,6 +222,7 @@ class Driver(object): :raises: AssertionError """ raise exception.NotImplemented() + # user crud def create_user(self, user_id, user): diff --git a/keystone/identity/routers.py b/keystone/identity/routers.py index ab71eb4f..5f236842 100644 --- a/keystone/identity/routers.py +++ b/keystone/identity/routers.py @@ -16,6 +16,7 @@ """WSGI Routers for the Identity service.""" from keystone.common import router from keystone.common import wsgi +from keystone import config from keystone.identity import controllers @@ -174,6 +175,47 @@ def append_v3_routers(mapper, routers): action='revoke_grant', conditions=dict(method=['DELETE'])) + if config.CONF.os_inherit.enabled: + mapper.connect(('/OS-INHERIT/domains/{domain_id}/users/{user_id}' + '/roles/{role_id}/inherited_to_projects'), + controller=role_controller, + action='create_grant', + conditions=dict(method=['PUT'])) + mapper.connect(('/OS-INHERIT/domains/{domain_id}/groups/{group_id}' + '/roles/{role_id}/inherited_to_projects'), + controller=role_controller, + action='create_grant', + conditions=dict(method=['PUT'])) + mapper.connect(('/OS-INHERIT/domains/{domain_id}/users/{user_id}' + '/roles/{role_id}/inherited_to_projects'), + controller=role_controller, + action='check_grant', + conditions=dict(method=['HEAD'])) + mapper.connect(('/OS-INHERIT/domains/{domain_id}/groups/{group_id}' + '/roles/{role_id}/inherited_to_projects'), + controller=role_controller, + action='check_grant', + conditions=dict(method=['HEAD'])) + mapper.connect(('/OS-INHERIT/domains/{domain_id}/users/{user_id}' + '/roles/inherited_to_projects'), + controller=role_controller, + action='list_grants', + conditions=dict(method=['GET'])) + mapper.connect(('/OS-INHERIT/domains/{domain_id}/groups/{group_id}' + '/roles/inherited_to_projects'), + controller=role_controller, + action='list_grants', + conditions=dict(method=['GET'])) + mapper.connect(('/OS-INHERIT/domains/{domain_id}/users/{user_id}' + '/roles/{role_id}/inherited_to_projects'), + controller=role_controller, + action='revoke_grant', + conditions=dict(method=['DELETE'])) + mapper.connect(('/OS-INHERIT/domains/{domain_id}/groups/{group_id}' + '/roles/{role_id}/inherited_to_projects'), + controller=role_controller, + action='revoke_grant', + conditions=dict(method=['DELETE'])) routers.append( router.Router(controllers.RoleAssignmentV3(), 'role_assignments', 'role_assignment')) diff --git a/tests/test_backend.py b/tests/test_backend.py index 48e8d3a0..8599ba24 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -1106,12 +1106,17 @@ class IdentityTests(object): group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'enabled': True} self.identity_api.create_group(group1['id'], group1) + group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain1['id'], 'enabled': True} + self.identity_api.create_group(group2['id'], group2) project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id']} self.identity_api.create_project(project1['id'], project1) self.identity_api.add_user_to_group(user1['id'], group1['id']) + self.identity_api.add_user_to_group(user1['id'], + group2['id']) roles_ref = self.identity_api.list_grants( user_id=user1['id'], @@ -1766,6 +1771,30 @@ class IdentityTests(object): self.assertIn(self.tenant_bar['id'], project_ids) self.assertIn(self.tenant_baz['id'], project_ids) + def test_list_projects_for_domain(self): + project_ids = ([x['id'] for x in + self.assignment_api.list_projects(DEFAULT_DOMAIN_ID)]) + self.assertEquals(len(project_ids), 4) + self.assertIn(self.tenant_bar['id'], project_ids) + self.assertIn(self.tenant_baz['id'], project_ids) + self.assertIn(self.tenant_mtu['id'], project_ids) + self.assertIn(self.tenant_service['id'], project_ids) + + def test_list_projects_for_alternate_domain(self): + domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.assignment_api.create_domain(domain1['id'], domain1) + project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain1['id']} + self.assignment_api.create_project(project1['id'], project1) + project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain1['id']} + self.assignment_api.create_project(project2['id'], project2) + project_ids = ([x['id'] for x in + self.assignment_api.list_projects(domain1['id'])]) + self.assertEquals(len(project_ids), 2) + self.assertIn(project1['id'], project_ids) + self.assertIn(project2['id'], project_ids) + def test_list_roles(self): roles = self.identity_api.list_roles() for test_role in default_fixtures.ROLES: @@ -2690,3 +2719,164 @@ class PolicyTests(object): self.assertRaises(exception.PolicyNotFound, self.policy_api.delete_policy, uuid.uuid4().hex) + + +class InheritanceTests(object): + + def test_inherited_role_grants_for_user(self): + """Test inherited user roles. + + Test Plan: + - Enable OS-INHERIT extension + - Create 3 roles + - Create a domain, with a project and a user + - Check no roles yet exit + - Assign a direct user role to the project and a (non-inherited) + user role to the domain + - Get a list of effective roles - should only get the one direct role + - Now add an inherited user role to the domain + - Get a list of effective roles - should have two roles, one + direct and one by virtue of the inherited user role + - Also get effective roles for the domain - the role marked as + inherited should not show up + + """ + self.opt_in_group('os_inherit', enabled=True) + role_list = [] + for _ in range(3): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_role(role['id'], role) + role_list.append(role) + domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain1['id'], domain1) + user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain1['id'], 'password': uuid.uuid4().hex, + 'enabled': True} + self.identity_api.create_user(user1['id'], user1) + project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain1['id']} + self.identity_api.create_project(project1['id'], project1) + + roles_ref = self.identity_api.list_grants( + user_id=user1['id'], + project_id=project1['id']) + self.assertEquals(len(roles_ref), 0) + + # Create the first two roles - the domain one is not inherited + self.identity_api.create_grant(user_id=user1['id'], + project_id=project1['id'], + role_id=role_list[0]['id']) + self.identity_api.create_grant(user_id=user1['id'], + domain_id=domain1['id'], + role_id=role_list[1]['id']) + + # Now get the effective roles for the user and project, this + # should only include the direct role assignment on the project + combined_role_list = self.identity_api.get_roles_for_user_and_project( + user1['id'], project1['id']) + self.assertEquals(len(combined_role_list), 1) + self.assertIn(role_list[0]['id'], combined_role_list) + + # Now add an inherited role on the domain + self.identity_api.create_grant(user_id=user1['id'], + domain_id=domain1['id'], + role_id=role_list[2]['id'], + inherited_to_projects=True) + + # Now get the effective roles for the user and project again, this + # should now include the inherited role on the domain + combined_role_list = self.identity_api.get_roles_for_user_and_project( + user1['id'], project1['id']) + self.assertEquals(len(combined_role_list), 2) + self.assertIn(role_list[0]['id'], combined_role_list) + self.assertIn(role_list[2]['id'], combined_role_list) + + # Finally, check that the inherited role does not appear as a valid + # directly assigned role on the domain itself + combined_role_list = self.identity_api.get_roles_for_user_and_domain( + user1['id'], domain1['id']) + self.assertEquals(len(combined_role_list), 1) + self.assertIn(role_list[1]['id'], combined_role_list) + + def test_inherited_role_grants_for_group(self): + """Test inherited group roles. + + Test Plan: + - Enable OS-INHERIT extension + - Create 4 roles + - Create a domain, with a project, user and two groups + - Make the user a member of both groups + - Check no roles yet exit + - Assign a direct user role to the project and a (non-inherited) + group role on the domain + - Get a list of effective roles - should only get the one direct role + - Now add two inherited group roles to the domain + - Get a list of effective roles - should have three roles, one + direct and two by virtue of inherited group roles + + """ + self.opt_in_group('os_inherit', enabled=True) + role_list = [] + for _ in range(4): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_role(role['id'], role) + role_list.append(role) + domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain1['id'], domain1) + user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain1['id'], 'password': uuid.uuid4().hex, + 'enabled': True} + self.identity_api.create_user(user1['id'], user1) + group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain1['id'], 'enabled': True} + self.identity_api.create_group(group1['id'], group1) + group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain1['id'], 'enabled': True} + self.identity_api.create_group(group2['id'], group2) + project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain1['id']} + self.identity_api.create_project(project1['id'], project1) + + self.identity_api.add_user_to_group(user1['id'], + group1['id']) + self.identity_api.add_user_to_group(user1['id'], + group2['id']) + + roles_ref = self.identity_api.list_grants( + user_id=user1['id'], + project_id=project1['id']) + self.assertEquals(len(roles_ref), 0) + + # Create two roles - the domain one is not inherited + self.identity_api.create_grant(user_id=user1['id'], + project_id=project1['id'], + role_id=role_list[0]['id']) + self.identity_api.create_grant(group_id=group1['id'], + domain_id=domain1['id'], + role_id=role_list[1]['id']) + + # Now get the effective roles for the user and project, this + # should only include the direct role assignment on the project + combined_role_list = self.identity_api.get_roles_for_user_and_project( + user1['id'], project1['id']) + self.assertEquals(len(combined_role_list), 1) + self.assertIn(role_list[0]['id'], combined_role_list) + + # Now add to more group roles, both inherited, to the domain + self.identity_api.create_grant(group_id=group2['id'], + domain_id=domain1['id'], + role_id=role_list[2]['id'], + inherited_to_projects=True) + self.identity_api.create_grant(group_id=group2['id'], + domain_id=domain1['id'], + role_id=role_list[3]['id'], + inherited_to_projects=True) + + # Now get the effective roles for the user and project again, this + # should now include the inherited roles on the domain + combined_role_list = self.identity_api.get_roles_for_user_and_project( + user1['id'], project1['id']) + self.assertEquals(len(combined_role_list), 3) + self.assertIn(role_list[0]['id'], combined_role_list) + self.assertIn(role_list[2]['id'], combined_role_list) + self.assertIn(role_list[3]['id'], combined_role_list) diff --git a/tests/test_backend_ldap.py b/tests/test_backend_ldap.py index b68399ba..ce4a297c 100644 --- a/tests/test_backend_ldap.py +++ b/tests/test_backend_ldap.py @@ -630,6 +630,10 @@ class LDAPIdentity(test.TestCase, BaseLDAPIdentity): user1['id'], CONF.identity.default_domain_id) self.assertEquals(len(combined_role_list), 0) + def test_list_projects_for_alternate_domain(self): + raise nose.exc.SkipTest( + 'N/A: LDAP does not support multiple domains') + class LDAPIdentityEnabledEmulation(LDAPIdentity): def setUp(self): diff --git a/tests/test_backend_sql.py b/tests/test_backend_sql.py index 2f1f5e3e..38eddaa4 100644 --- a/tests/test_backend_sql.py +++ b/tests/test_backend_sql.py @@ -410,3 +410,7 @@ class SqlCatalog(SqlTests, test_backend.CatalogTests): class SqlPolicy(SqlTests, test_backend.PolicyTests): pass + + +class SqlInheritance(SqlTests, test_backend.InheritanceTests): + pass diff --git a/tests/test_sql_upgrade.py b/tests/test_sql_upgrade.py index e7e34b4b..64bf53c8 100644 --- a/tests/test_sql_upgrade.py +++ b/tests/test_sql_upgrade.py @@ -824,6 +824,304 @@ class SqlUpgradeTests(test.TestCase): self.assertEqual(ref.legacy_endpoint_id, legacy_endpoint_id) self.assertEqual(ref.extra, '{}') + def test_group_project_FK_fixup(self): + # To create test data we must start before we broke in the + # group_project_metadata table in 015. + self.upgrade(14) + session = self.Session() + + domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True) + group_table = sqlalchemy.Table('group', self.metadata, autoload=True) + tenant_table = sqlalchemy.Table('tenant', self.metadata, autoload=True) + role_table = sqlalchemy.Table('role', self.metadata, autoload=True) + group_project_metadata_table = sqlalchemy.Table( + 'group_project_metadata', self.metadata, autoload=True) + + # Create a Domain + domain = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'enabled': True} + session.execute(domain_table.insert().values(domain)) + + # Create two Tenants + tenant = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'extra': "{}"} + session.execute(tenant_table.insert().values(tenant)) + + tenant1 = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'extra': "{}"} + session.execute(tenant_table.insert().values(tenant1)) + + # Create a Group + group = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain['id'], + 'extra': json.dumps({})} + session.execute(group_table.insert().values(group)) + + # Create roles + role_list = [] + for _ in range(2): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + session.execute(role_table.insert().values(role)) + role_list.append(role) + + # Grant Role to User on Project + role_grant = {'group_id': group['id'], + 'project_id': tenant['id'], + 'data': json.dumps({'roles': [role_list[0]['id']]})} + session.execute( + group_project_metadata_table.insert().values(role_grant)) + + role_grant = {'group_id': group['id'], + 'project_id': tenant1['id'], + 'data': json.dumps({'roles': [role_list[1]['id']]})} + session.execute( + group_project_metadata_table.insert().values(role_grant)) + + session.commit() + + # Now upgrade and fix up the FKs + self.upgrade(28) + self.assertTableExists('group_project_metadata') + self.assertTableExists('project') + self.assertTableDoesNotExist('tenant') + + s = sqlalchemy.select([group_project_metadata_table.c.data]).where( + (group_project_metadata_table.c.group_id == group['id']) & + (group_project_metadata_table.c.project_id == tenant['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[0]['id'], data['roles']) + + s = sqlalchemy.select([group_project_metadata_table.c.data]).where( + (group_project_metadata_table.c.group_id == group['id']) & + (group_project_metadata_table.c.project_id == tenant1['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[1]['id'], data['roles']) + + self.downgrade(27) + self.assertTableExists('group_project_metadata') + self.assertTableExists('project') + self.assertTableDoesNotExist('tenant') + + def test_assignment_metadata_migration(self): + self.upgrade(28) + # Scaffolding + session = self.Session() + + domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True) + user_table = sqlalchemy.Table('user', self.metadata, autoload=True) + group_table = sqlalchemy.Table('group', self.metadata, autoload=True) + role_table = sqlalchemy.Table('role', self.metadata, autoload=True) + project_table = sqlalchemy.Table( + 'project', self.metadata, autoload=True) + user_project_metadata_table = sqlalchemy.Table( + 'user_project_metadata', self.metadata, autoload=True) + user_domain_metadata_table = sqlalchemy.Table( + 'user_domain_metadata', self.metadata, autoload=True) + group_project_metadata_table = sqlalchemy.Table( + 'group_project_metadata', self.metadata, autoload=True) + group_domain_metadata_table = sqlalchemy.Table( + 'group_domain_metadata', self.metadata, autoload=True) + + # Create a Domain + domain = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'enabled': True} + session.execute(domain_table.insert().values(domain)) + + # Create anther Domain + domain2 = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'enabled': True} + session.execute(domain_table.insert().values(domain2)) + + # Create a Project + project = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain['id'], + 'extra': "{}"} + session.execute(project_table.insert().values(project)) + + # Create another Project + project2 = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain['id'], + 'extra': "{}"} + session.execute(project_table.insert().values(project2)) + + # Create a User + user = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain['id'], + 'password': uuid.uuid4().hex, + 'enabled': True, + 'extra': json.dumps({})} + session.execute(user_table.insert().values(user)) + + # Create a Group + group = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain['id'], + 'extra': json.dumps({})} + session.execute(group_table.insert().values(group)) + + # Create roles + role_list = [] + for _ in range(7): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + session.execute(role_table.insert().values(role)) + role_list.append(role) + + # Grant Role to User on Project + role_grant = {'user_id': user['id'], + 'project_id': project['id'], + 'data': json.dumps({'roles': [role_list[0]['id']]})} + session.execute( + user_project_metadata_table.insert().values(role_grant)) + + role_grant = {'user_id': user['id'], + 'project_id': project2['id'], + 'data': json.dumps({'roles': [role_list[1]['id']]})} + session.execute( + user_project_metadata_table.insert().values(role_grant)) + + # Grant Role to Group on different Project + role_grant = {'group_id': group['id'], + 'project_id': project2['id'], + 'data': json.dumps({'roles': [role_list[2]['id']]})} + session.execute( + group_project_metadata_table.insert().values(role_grant)) + + # Grant Role to User on Domain + role_grant = {'user_id': user['id'], + 'domain_id': domain['id'], + 'data': json.dumps({'roles': [role_list[3]['id']]})} + session.execute(user_domain_metadata_table.insert().values(role_grant)) + + # Grant Role to Group on Domain + role_grant = {'group_id': group['id'], + 'domain_id': domain['id'], + 'data': json.dumps( + {'roles': [role_list[4]['id']], + 'other': 'somedata'})} + session.execute( + group_domain_metadata_table.insert().values(role_grant)) + + session.commit() + + self.upgrade(29) + s = sqlalchemy.select([user_project_metadata_table.c.data]).where( + (user_project_metadata_table.c.user_id == user['id']) & + (user_project_metadata_table.c.project_id == project['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn({'id': role_list[0]['id']}, data['roles']) + + s = sqlalchemy.select([user_project_metadata_table.c.data]).where( + (user_project_metadata_table.c.user_id == user['id']) & + (user_project_metadata_table.c.project_id == project2['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn({'id': role_list[1]['id']}, data['roles']) + + s = sqlalchemy.select([group_project_metadata_table.c.data]).where( + (group_project_metadata_table.c.group_id == group['id']) & + (group_project_metadata_table.c.project_id == project2['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn({'id': role_list[2]['id']}, data['roles']) + + s = sqlalchemy.select([user_domain_metadata_table.c.data]).where( + (user_domain_metadata_table.c.user_id == user['id']) & + (user_domain_metadata_table.c.domain_id == domain['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn({'id': role_list[3]['id']}, data['roles']) + + s = sqlalchemy.select([group_domain_metadata_table.c.data]).where( + (group_domain_metadata_table.c.group_id == group['id']) & + (group_domain_metadata_table.c.domain_id == domain['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn({'id': role_list[4]['id']}, data['roles']) + self.assertIn('other', data) + + # Now add an entry that has one regular and one inherited role + role_grant = {'user_id': user['id'], + 'domain_id': domain2['id'], + 'data': json.dumps( + {'roles': [{'id': role_list[5]['id']}, + {'id': role_list[6]['id'], + 'inherited_to': 'projects'}]})} + session.execute(user_domain_metadata_table.insert().values(role_grant)) + + session.commit() + self.downgrade(28) + + s = sqlalchemy.select([user_project_metadata_table.c.data]).where( + (user_project_metadata_table.c.user_id == user['id']) & + (user_project_metadata_table.c.project_id == project['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[0]['id'], data['roles']) + + s = sqlalchemy.select([user_project_metadata_table.c.data]).where( + (user_project_metadata_table.c.user_id == user['id']) & + (user_project_metadata_table.c.project_id == project2['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[1]['id'], data['roles']) + + s = sqlalchemy.select([group_project_metadata_table.c.data]).where( + (group_project_metadata_table.c.group_id == group['id']) & + (group_project_metadata_table.c.project_id == project2['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[2]['id'], data['roles']) + + s = sqlalchemy.select([user_domain_metadata_table.c.data]).where( + (user_domain_metadata_table.c.user_id == user['id']) & + (user_domain_metadata_table.c.domain_id == domain['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[3]['id'], data['roles']) + + s = sqlalchemy.select([group_domain_metadata_table.c.data]).where( + (group_domain_metadata_table.c.group_id == group['id']) & + (group_domain_metadata_table.c.domain_id == domain['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[4]['id'], data['roles']) + self.assertIn('other', data) + + # For user-domain2, where we had one regular and one inherited role, + # only the direct role should remain, the inherited role should + # have been deleted during the downgrade + s = sqlalchemy.select([user_domain_metadata_table.c.data]).where( + (user_domain_metadata_table.c.user_id == user['id']) & + (user_domain_metadata_table.c.domain_id == domain2['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[5]['id'], data['roles']) + def populate_user_table(self, with_pass_enab=False, with_pass_enab_domain=False): # Populate the appropriate fields in the user diff --git a/tests/test_v3.py b/tests/test_v3.py index ff9f7596..d3023843 100644 --- a/tests/test_v3.py +++ b/tests/test_v3.py @@ -338,7 +338,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase): links['previous']) def assertValidListResponse(self, resp, key, entity_validator, ref=None, - expected_length=None): + expected_length=None, keys_to_check=None): """Make assertions common to all API list responses. If a reference is provided, it's ID will be searched for in the @@ -359,11 +359,12 @@ class RestfulTestCase(test_content_types.RestfulTestCase): for entity in entities: self.assertIsNotNone(entity) - self.assertValidEntity(entity) + self.assertValidEntity(entity, keys_to_check=keys_to_check) entity_validator(entity) if ref: entity = [x for x in entities if x['id'] == ref['id']][0] - self.assertValidEntity(entity, ref) + self.assertValidEntity(entity, ref=ref, + keys_to_check=keys_to_check) entity_validator(entity, ref) return entities @@ -372,17 +373,21 @@ class RestfulTestCase(test_content_types.RestfulTestCase): """Make assertions common to all API responses.""" entity = resp.result.get(key) self.assertIsNotNone(entity) - self.assertValidEntity(entity, *args, **kwargs) + keys = kwargs.pop('keys_to_check', None) + self.assertValidEntity(entity, keys_to_check=keys, *args, **kwargs) entity_validator(entity, *args, **kwargs) return entity - def assertValidEntity(self, entity, ref=None): + def assertValidEntity(self, entity, ref=None, keys_to_check=None): """Make assertions common to all API entities. If a reference is provided, the entity will also be compared against the reference. """ - keys = ['name', 'description', 'enabled'] + if keys_to_check: + keys = keys_to_check + else: + keys = ['name', 'description', 'enabled'] for k in ['id'] + keys: msg = '%s unexpectedly None in %s' % (k, entity) @@ -705,6 +710,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase): resp, 'roles', self.assertValidRole, + keys_to_check=['name'], *args, **kwargs) @@ -713,6 +719,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase): resp, 'role', self.assertValidRole, + keys_to_check=['name'], *args, **kwargs) diff --git a/tests/test_v3_identity.py b/tests/test_v3_identity.py index af8890fb..3efc3b35 100644 --- a/tests/test_v3_identity.py +++ b/tests/test_v3_identity.py @@ -16,11 +16,64 @@ import uuid +from keystone import config from keystone import exception import test_v3 +def _build_role_assignment_url_and_entity( + role_id, user_id=None, group_id=None, domain_id=None, + project_id=None, inherited_to_projects=False, + effective=False): + + if user_id and domain_id: + url = ('/domains/%(domain_id)s/users/%(user_id)s' + '/roles/%(role_id)s' % { + 'domain_id': domain_id, + 'user_id': user_id, + 'role_id': role_id}) + entity = {'role': {'id': role_id}, + 'user': {'id': user_id}, + 'scope': {'domain': {'id': domain_id}}} + if inherited_to_projects: + url = '/OS-INHERIT%s/inherited_to_projects' % url + if not effective: + entity['OS-INHERIT:inherited_to'] = 'projects' + elif user_id and project_id: + url = ('/projects/%(project_id)s/users/%(user_id)s' + '/roles/%(role_id)s' % { + 'project_id': project_id, + 'user_id': user_id, + 'role_id': role_id}) + entity = {'role': {'id': role_id}, + 'user': {'id': user_id}, + 'scope': {'project': {'id': project_id}}} + if group_id and domain_id: + url = ('/domains/%(domain_id)s/groups/%(group_id)s' + '/roles/%(role_id)s' % { + 'domain_id': domain_id, + 'group_id': group_id, + 'role_id': role_id}) + entity = {'role': {'id': role_id}, + 'group': {'id': group_id}, + 'scope': {'domain': {'id': domain_id}}} + if inherited_to_projects: + url = '/OS-INHERIT%s/inherited_to_projects' % url + if not effective: + entity['OS-INHERIT:inherited_to'] = 'projects' + elif group_id and project_id: + url = ('/projects/%(project_id)s/groups/%(group_id)s' + '/roles/%(role_id)s' % { + 'project_id': project_id, + 'group_id': group_id, + 'role_id': role_id}) + entity = {'role': {'id': role_id}, + 'group': {'id': group_id}, + 'scope': {'project': {'id': project_id}}} + return (url, entity) + + class IdentityTestCase(test_v3.RestfulTestCase): """Test domains, projects, users, groups, & role CRUD.""" @@ -628,48 +681,6 @@ class IdentityTestCase(test_v3.RestfulTestCase): self.assertValidRoleListResponse(r, expected_length=0) self.assertIn(collection_url, r.result['links']['self']) - def _build_role_assignment_url_and_entity( - self, role_id, user_id=None, group_id=None, domain_id=None, - project_id=None): - - if user_id and domain_id: - url = ('/domains/%(domain_id)s/users/%(user_id)s' - '/roles/%(role_id)s' % { - 'domain_id': domain_id, - 'user_id': user_id, - 'role_id': role_id}) - entity = {'role': {'id': role_id}, - 'user': {'id': user_id}, - 'scope': {'domain': {'id': domain_id}}} - elif user_id and project_id: - url = ('/projects/%(project_id)s/users/%(user_id)s' - '/roles/%(role_id)s' % { - 'project_id': project_id, - 'user_id': user_id, - 'role_id': role_id}) - entity = {'role': {'id': role_id}, - 'user': {'id': user_id}, - 'scope': {'project': {'id': project_id}}} - if group_id and domain_id: - url = ('/domains/%(domain_id)s/groups/%(group_id)s' - '/roles/%(role_id)s' % { - 'domain_id': domain_id, - 'group_id': group_id, - 'role_id': role_id}) - entity = {'role': {'id': role_id}, - 'group': {'id': group_id}, - 'scope': {'domain': {'id': domain_id}}} - elif group_id and project_id: - url = ('/projects/%(project_id)s/groups/%(group_id)s' - '/roles/%(role_id)s' % { - 'project_id': project_id, - 'group_id': group_id, - 'role_id': role_id}) - entity = {'role': {'id': role_id}, - 'group': {'id': group_id}, - 'scope': {'project': {'id': project_id}}} - return (url, entity) - def test_get_role_assignments(self): """Call ``GET /role_assignments``. @@ -712,7 +723,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): # Now add one of each of the four types of assignment, making sure # that we get them all back. - gd_url, gd_entity = self._build_role_assignment_url_and_entity( + gd_url, gd_entity = _build_role_assignment_url_and_entity( domain_id=self.domain_id, group_id=self.group_id, role_id=self.role_id) self.put(gd_url) @@ -722,7 +733,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): existing_assignments + 1) self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) - ud_url, ud_entity = self._build_role_assignment_url_and_entity( + ud_url, ud_entity = _build_role_assignment_url_and_entity( domain_id=self.domain_id, user_id=self.user1['id'], role_id=self.role_id) self.put(ud_url) @@ -732,7 +743,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): existing_assignments + 2) self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) - gp_url, gp_entity = self._build_role_assignment_url_and_entity( + gp_url, gp_entity = _build_role_assignment_url_and_entity( project_id=self.project_id, group_id=self.group_id, role_id=self.role_id) self.put(gp_url) @@ -742,7 +753,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): existing_assignments + 3) self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url) - up_url, up_entity = self._build_role_assignment_url_and_entity( + up_url, up_entity = _build_role_assignment_url_and_entity( project_id=self.project_id, user_id=self.user1['id'], role_id=self.role_id) self.put(up_url) @@ -798,7 +809,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): self.assertIn(collection_url, r.result['links']['self']) existing_assignments = len(r.result.get('role_assignments')) - gd_url, gd_entity = self._build_role_assignment_url_and_entity( + gd_url, gd_entity = _build_role_assignment_url_and_entity( domain_id=self.domain_id, group_id=self.group_id, role_id=self.role_id) self.put(gd_url) @@ -816,11 +827,11 @@ class IdentityTestCase(test_v3.RestfulTestCase): self.assertValidRoleAssignmentListResponse(r) self.assertEqual(len(r.result.get('role_assignments')), existing_assignments + 2) - ud_url, ud_entity = self._build_role_assignment_url_and_entity( + ud_url, ud_entity = _build_role_assignment_url_and_entity( domain_id=self.domain_id, user_id=self.user1['id'], role_id=self.role_id) self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) - ud_url, ud_entity = self._build_role_assignment_url_and_entity( + ud_url, ud_entity = _build_role_assignment_url_and_entity( domain_id=self.domain_id, user_id=self.user2['id'], role_id=self.role_id) self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) @@ -866,7 +877,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): self.assertValidRoleAssignmentListResponse(r) existing_assignments = len(r.result.get('role_assignments')) - gd_url, gd_entity = self._build_role_assignment_url_and_entity( + gd_url, gd_entity = _build_role_assignment_url_and_entity( domain_id=self.domain_id, group_id=self.group_id, role_id=self.role_id) self.put(gd_url) @@ -954,22 +965,22 @@ class IdentityTestCase(test_v3.RestfulTestCase): # Now add one of each of the four types of assignment - gd_url, gd_entity = self._build_role_assignment_url_and_entity( + gd_url, gd_entity = _build_role_assignment_url_and_entity( domain_id=self.domain_id, group_id=self.group1['id'], role_id=self.role1['id']) self.put(gd_url) - ud_url, ud_entity = self._build_role_assignment_url_and_entity( + ud_url, ud_entity = _build_role_assignment_url_and_entity( domain_id=self.domain_id, user_id=self.user1['id'], role_id=self.role2['id']) self.put(ud_url) - gp_url, gp_entity = self._build_role_assignment_url_and_entity( + gp_url, gp_entity = _build_role_assignment_url_and_entity( project_id=self.project1['id'], group_id=self.group1['id'], role_id=self.role1['id']) self.put(gp_url) - up_url, up_entity = self._build_role_assignment_url_and_entity( + up_url, up_entity = _build_role_assignment_url_and_entity( project_id=self.project1['id'], user_id=self.user1['id'], role_id=self.role2['id']) self.put(up_url) @@ -1038,10 +1049,10 @@ class IdentityTestCase(test_v3.RestfulTestCase): self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url) self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) # ...and the two via group membership... - up1_url, up1_entity = self._build_role_assignment_url_and_entity( + up1_url, up1_entity = _build_role_assignment_url_and_entity( project_id=self.project1['id'], user_id=self.user1['id'], role_id=self.role1['id']) - ud1_url, ud1_entity = self._build_role_assignment_url_and_entity( + ud1_url, ud1_entity = _build_role_assignment_url_and_entity( domain_id=self.domain_id, user_id=self.user1['id'], role_id=self.role1['id']) self.assertRoleAssignmentInListResponse(r, up1_entity, @@ -1062,9 +1073,459 @@ class IdentityTestCase(test_v3.RestfulTestCase): self.assertValidRoleAssignmentListResponse(r) self.assertEqual(len(r.result.get('role_assignments')), 2) # Should have one direct role and one from group membership... - up1_url, up1_entity = self._build_role_assignment_url_and_entity( + up1_url, up1_entity = _build_role_assignment_url_and_entity( project_id=self.project1['id'], user_id=self.user1['id'], role_id=self.role1['id']) self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url) self.assertRoleAssignmentInListResponse(r, up1_entity, link_url=up1_url) + + +class IdentityIneritanceTestCase(test_v3.RestfulTestCase): + """Test inheritance crud and its effects.""" + + def setUp(self): + self.orig_extension_enablement = config.CONF.os_inherit.enabled + self.opt_in_group('os_inherit', enabled=True) + super(IdentityIneritanceTestCase, self).setUp() + + def tearDown(self): + super(IdentityIneritanceTestCase, self).tearDown() + self.opt_in_group('os_inherit', enabled=self.orig_extension_enablement) + + def test_crud_user_inherited_domain_role_grants(self): + role_list = [] + for _ in range(2): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.assignment_api.create_role(role['id'], role) + role_list.append(role) + + # Create a non-inherited role as a spoiler + self.assignment_api.create_grant( + role_list[1]['id'], user_id=self.user['id'], + domain_id=self.domain_id) + + base_collection_url = ( + '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % { + 'domain_id': self.domain_id, + 'user_id': self.user['id']}) + member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % { + 'collection_url': base_collection_url, + 'role_id': role_list[0]['id']} + collection_url = base_collection_url + '/inherited_to_projects' + + self.put(member_url) + + # Check we can read it back + self.head(member_url) + r = self.get(collection_url) + self.assertValidRoleListResponse(r, ref=role_list[0]) + self.assertIn(collection_url, r.result['links']['self']) + + # Now delete and check its gone + self.delete(member_url) + r = self.get(collection_url) + self.assertValidRoleListResponse(r, expected_length=0) + self.assertIn(collection_url, r.result['links']['self']) + + def test_crud_inherited_role_grants_failed_if_disabled(self): + # Disable the extension and check no API calls can be issued + self.opt_in_group('os_inherit', enabled=False) + super(IdentityIneritanceTestCase, self).setUp() + + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.assignment_api.create_role(role['id'], role) + + base_collection_url = ( + '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % { + 'domain_id': self.domain_id, + 'user_id': self.user['id']}) + member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % { + 'collection_url': base_collection_url, + 'role_id': role['id']} + collection_url = base_collection_url + '/inherited_to_projects' + + self.put(member_url, expected_status=404) + self.head(member_url, expected_status=404) + self.get(collection_url, expected_status=404) + self.delete(member_url, expected_status=404) + + def test_list_role_assignments_for_inherited_domain_grants(self): + """Call ``GET /role_assignments with inherited domain grants``. + + Test Plan: + - Create 4 roles + - Create a domain with a user and two projects + - Assign two direct roles to project1 + - Assign a spoiler role to project2 + - Issue the URL to add inherited role to the domain + - Issue the URL to check it is indeed on the domain + - Issue the URL to check effective roles on project1 - this + should return 3 roles. + + """ + role_list = [] + for _ in range(4): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.assignment_api.create_role(role['id'], role) + role_list.append(role) + + domain = self.new_domain_ref() + self.identity_api.create_domain(domain['id'], domain) + user1 = self.new_user_ref( + domain_id=domain['id']) + user1['password'] = uuid.uuid4().hex + self.identity_api.create_user(user1['id'], user1) + project1 = self.new_project_ref( + domain_id=domain['id']) + self.assignment_api.create_project(project1['id'], project1) + project2 = self.new_project_ref( + domain_id=domain['id']) + self.assignment_api.create_project(project2['id'], project2) + # Add some roles to the project + self.assignment_api.add_role_to_user_and_project( + user1['id'], project1['id'], role_list[0]['id']) + self.assignment_api.add_role_to_user_and_project( + user1['id'], project1['id'], role_list[1]['id']) + # ..and one on a different project as a spoiler + self.assignment_api.add_role_to_user_and_project( + user1['id'], project2['id'], role_list[2]['id']) + + # Now create our inherited role on the domain + base_collection_url = ( + '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % { + 'domain_id': domain['id'], + 'user_id': user1['id']}) + member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % { + 'collection_url': base_collection_url, + 'role_id': role_list[3]['id']} + collection_url = base_collection_url + '/inherited_to_projects' + + self.put(member_url) + self.head(member_url) + r = self.get(collection_url) + self.assertValidRoleListResponse(r, ref=role_list[3]) + self.assertIn(collection_url, r.result['links']['self']) + + # Now use the list domain role assignments api to check if this + # is included + collection_url = ( + '/role_assignments?user.id=%(user_id)s' + '&scope.domain.id=%(domain_id)s' % { + 'user_id': user1['id'], + 'domain_id': domain['id']}) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 1) + ud_url, ud_entity = _build_role_assignment_url_and_entity( + domain_id=domain['id'], user_id=user1['id'], + role_id=role_list[3]['id'], inherited_to_projects=True) + self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) + + # Now ask for effective list role assignments - the role should + # turn into a project role, along with the two direct roles that are + # on the project + collection_url = ( + '/role_assignments?effective&user.id=%(user_id)s' + '&scope.project.id=%(project_id)s' % { + 'user_id': user1['id'], + 'project_id': project1['id']}) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 3) + # An effective role for an inherited role will be a project + # entity, with a domain link to the inherited assignment + unused, up_entity = _build_role_assignment_url_and_entity( + project_id=project1['id'], user_id=user1['id'], + role_id=role_list[3]['id']) + ud_url, unused = _build_role_assignment_url_and_entity( + domain_id=domain['id'], user_id=user1['id'], + role_id=role_list[3]['id'], inherited_to_projects=True) + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=ud_url) + + def test_list_role_assignments_for_disabled_inheritance_extension(self): + """Call ``GET /role_assignments with inherited domain grants``. + + Test Plan: + - Issue the URL to add inherited role to the domain + - Issue the URL to check effective roles on project include the + inherited role + - Disable the extension + - Re-check the effective roles, proving the inherited role no longer + shows up. + + """ + + role_list = [] + for _ in range(4): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.assignment_api.create_role(role['id'], role) + role_list.append(role) + + domain = self.new_domain_ref() + self.identity_api.create_domain(domain['id'], domain) + user1 = self.new_user_ref( + domain_id=domain['id']) + user1['password'] = uuid.uuid4().hex + self.identity_api.create_user(user1['id'], user1) + project1 = self.new_project_ref( + domain_id=domain['id']) + self.assignment_api.create_project(project1['id'], project1) + project2 = self.new_project_ref( + domain_id=domain['id']) + self.assignment_api.create_project(project2['id'], project2) + # Add some roles to the project + self.assignment_api.add_role_to_user_and_project( + user1['id'], project1['id'], role_list[0]['id']) + self.assignment_api.add_role_to_user_and_project( + user1['id'], project1['id'], role_list[1]['id']) + # ..and one on a different project as a spoiler + self.assignment_api.add_role_to_user_and_project( + user1['id'], project2['id'], role_list[2]['id']) + + # Now create our inherited role on the domain + base_collection_url = ( + '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % { + 'domain_id': domain['id'], + 'user_id': user1['id']}) + member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % { + 'collection_url': base_collection_url, + 'role_id': role_list[3]['id']} + collection_url = base_collection_url + '/inherited_to_projects' + + self.put(member_url) + self.head(member_url) + r = self.get(collection_url) + self.assertValidRoleListResponse(r, ref=role_list[3]) + self.assertIn(collection_url, r.result['links']['self']) + + # Get effective list role assignments - the role should + # turn into a project role, along with the two direct roles that are + # on the project + collection_url = ( + '/role_assignments?effective&user.id=%(user_id)s' + '&scope.project.id=%(project_id)s' % { + 'user_id': user1['id'], + 'project_id': project1['id']}) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 3) + + unused, up_entity = _build_role_assignment_url_and_entity( + project_id=project1['id'], user_id=user1['id'], + role_id=role_list[3]['id']) + ud_url, unused = _build_role_assignment_url_and_entity( + domain_id=domain['id'], user_id=user1['id'], + role_id=role_list[3]['id'], inherited_to_projects=True) + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=ud_url) + + # Disable the extension and re-check the list, the role inherited + # from the project should no longer show up + self.opt_in_group('os_inherit', enabled=False) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 2) + + unused, up_entity = _build_role_assignment_url_and_entity( + project_id=project1['id'], user_id=user1['id'], + role_id=role_list[3]['id']) + ud_url, unused = _build_role_assignment_url_and_entity( + domain_id=domain['id'], user_id=user1['id'], + role_id=role_list[3]['id'], inherited_to_projects=True) + self.assertRoleAssignmentNotInListResponse(r, up_entity, + link_url=ud_url) + + def test_list_role_assignments_for_inherited_group_domain_grants(self): + """Call ``GET /role_assignments with inherited group domain grants``. + + Test Plan: + - Create 4 roles + - Create a domain with a user and two projects + - Assign two direct roles to project1 + - Assign a spoiler role to project2 + - Issue the URL to add inherited role to the domain + - Issue the URL to check it is indeed on the domain + - Issue the URL to check effective roles on project1 - this + should return 3 roles. + + """ + role_list = [] + for _ in range(4): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.assignment_api.create_role(role['id'], role) + role_list.append(role) + + domain = self.new_domain_ref() + self.identity_api.create_domain(domain['id'], domain) + user1 = self.new_user_ref( + domain_id=domain['id']) + user1['password'] = uuid.uuid4().hex + self.identity_api.create_user(user1['id'], user1) + user2 = self.new_user_ref( + domain_id=domain['id']) + user2['password'] = uuid.uuid4().hex + self.identity_api.create_user(user2['id'], user2) + group1 = self.new_group_ref( + domain_id=domain['id']) + self.identity_api.create_group(group1['id'], group1) + self.identity_api.add_user_to_group(user1['id'], + group1['id']) + self.identity_api.add_user_to_group(user2['id'], + group1['id']) + project1 = self.new_project_ref( + domain_id=domain['id']) + self.assignment_api.create_project(project1['id'], project1) + project2 = self.new_project_ref( + domain_id=domain['id']) + self.assignment_api.create_project(project2['id'], project2) + # Add some roles to the project + self.assignment_api.add_role_to_user_and_project( + user1['id'], project1['id'], role_list[0]['id']) + self.assignment_api.add_role_to_user_and_project( + user1['id'], project1['id'], role_list[1]['id']) + # ..and one on a different project as a spoiler + self.assignment_api.add_role_to_user_and_project( + user1['id'], project2['id'], role_list[2]['id']) + + # Now create our inherited role on the domain + base_collection_url = ( + '/OS-INHERIT/domains/%(domain_id)s/groups/%(group_id)s/roles' % { + 'domain_id': domain['id'], + 'group_id': group1['id']}) + member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % { + 'collection_url': base_collection_url, + 'role_id': role_list[3]['id']} + collection_url = base_collection_url + '/inherited_to_projects' + + self.put(member_url) + self.head(member_url) + r = self.get(collection_url) + self.assertValidRoleListResponse(r, ref=role_list[3]) + self.assertIn(collection_url, r.result['links']['self']) + + # Now use the list domain role assignments api to check if this + # is included + collection_url = ( + '/role_assignments?group.id=%(group_id)s' + '&scope.domain.id=%(domain_id)s' % { + 'group_id': group1['id'], + 'domain_id': domain['id']}) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 1) + gd_url, gd_entity = _build_role_assignment_url_and_entity( + domain_id=domain['id'], group_id=group1['id'], + role_id=role_list[3]['id'], inherited_to_projects=True) + self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) + + # Now ask for effective list role assignments - the role should + # turn into a user project role, along with the two direct roles + # that are on the project + collection_url = ( + '/role_assignments?effective&user.id=%(user_id)s' + '&scope.project.id=%(project_id)s' % { + 'user_id': user1['id'], + 'project_id': project1['id']}) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 3) + # An effective role for an inherited role will be a project + # entity, with a domain link to the inherited assignment + unused, up_entity = _build_role_assignment_url_and_entity( + project_id=project1['id'], user_id=user1['id'], + role_id=role_list[3]['id']) + gd_url, unused = _build_role_assignment_url_and_entity( + domain_id=domain['id'], group_id=group1['id'], + role_id=role_list[3]['id'], inherited_to_projects=True) + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=gd_url) + + def test_filtered_role_assignments_for_inherited_grants(self): + """Call ``GET /role_assignments?scope.OS-INHERIT:inherited_to``. + + Test Plan: + - Create 5 roles + - Create a domain with a user, group and two projects + - Assign three direct spoiler roles to projects + - Issue the URL to add an inherited user role to the domain + - Issue the URL to add an inherited group role to the domain + - Issue the URL to filter by inherited roles - this should + return just the 2 inherited roles. + + """ + role_list = [] + for _ in range(5): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.assignment_api.create_role(role['id'], role) + role_list.append(role) + + domain = self.new_domain_ref() + self.identity_api.create_domain(domain['id'], domain) + user1 = self.new_user_ref( + domain_id=domain['id']) + user1['password'] = uuid.uuid4().hex + self.identity_api.create_user(user1['id'], user1) + group1 = self.new_group_ref( + domain_id=domain['id']) + self.identity_api.create_group(group1['id'], group1) + project1 = self.new_project_ref( + domain_id=domain['id']) + self.assignment_api.create_project(project1['id'], project1) + project2 = self.new_project_ref( + domain_id=domain['id']) + self.assignment_api.create_project(project2['id'], project2) + # Add some spoiler roles to the projects + self.assignment_api.add_role_to_user_and_project( + user1['id'], project1['id'], role_list[0]['id']) + self.assignment_api.add_role_to_user_and_project( + user1['id'], project2['id'], role_list[1]['id']) + # Create a non-inherited role as a spoiler + self.assignment_api.create_grant( + role_list[2]['id'], user_id=user1['id'], domain_id=domain['id']) + + # Now create two inherited roles on the domain, one for a user + # and one for a domain + base_collection_url = ( + '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % { + 'domain_id': domain['id'], + 'user_id': user1['id']}) + member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % { + 'collection_url': base_collection_url, + 'role_id': role_list[3]['id']} + collection_url = base_collection_url + '/inherited_to_projects' + + self.put(member_url) + self.head(member_url) + r = self.get(collection_url) + self.assertValidRoleListResponse(r, ref=role_list[3]) + self.assertIn(collection_url, r.result['links']['self']) + + base_collection_url = ( + '/OS-INHERIT/domains/%(domain_id)s/groups/%(group_id)s/roles' % { + 'domain_id': domain['id'], + 'group_id': group1['id']}) + member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % { + 'collection_url': base_collection_url, + 'role_id': role_list[4]['id']} + collection_url = base_collection_url + '/inherited_to_projects' + + self.put(member_url) + self.head(member_url) + r = self.get(collection_url) + self.assertValidRoleListResponse(r, ref=role_list[4]) + self.assertIn(collection_url, r.result['links']['self']) + + # Now use the list role assignments api to get a list of inherited + # roles on the domain - should get back the two roles + collection_url = ( + '/role_assignments?scope.OS-INHERIT:inherited_to=projects') + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 2) + ud_url, ud_entity = _build_role_assignment_url_and_entity( + domain_id=domain['id'], user_id=user1['id'], + role_id=role_list[3]['id'], inherited_to_projects=True) + gd_url, gd_entity = _build_role_assignment_url_and_entity( + domain_id=domain['id'], group_id=group1['id'], + role_id=role_list[4]['id'], inherited_to_projects=True) + self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) + self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) |