From 4be48868ef9b34e90e8e6028201bc3b0ac569c3d Mon Sep 17 00:00:00 2001 From: Adam Young Date: Fri, 28 Jun 2013 18:34:25 -0400 Subject: Mixed LDAP/SQL Backend. Supports the configuration where LDAP is used for identity and SQL is used for assignment. blueprint split-identity Change-Id: Ib91b5d804282b7f78fc2458ff64653bbf2cf5d9e --- keystone/assignment/backends/ldap.py | 44 +--- keystone/assignment/backends/sql.py | 40 +-- keystone/assignment/core.py | 42 ++++ keystone/identity/backends/ldap.py | 10 +- keystone/identity/core.py | 9 - tests/backend_ldap_sql.conf | 36 +++ tests/default_fixtures.py | 11 +- tests/test_backend_ldap.py | 468 ++++++++++++++++++----------------- tests/test_drivers.py | 5 + 9 files changed, 358 insertions(+), 307 deletions(-) create mode 100644 tests/backend_ldap_sql.conf diff --git a/keystone/assignment/backends/ldap.py b/keystone/assignment/backends/ldap.py index 44a479bf..09539c9f 100644 --- a/keystone/assignment/backends/ldap.py +++ b/keystone/assignment/backends/ldap.py @@ -76,41 +76,11 @@ class Assignment(assignment.Driver): return self._set_default_domain(self.project.get_all()) def get_project_by_name(self, tenant_name, domain_id): - self._validate_domain_id(domain_id) + self._validate_default_domain_id(domain_id) return self._set_default_domain(self.project.get_by_name(tenant_name)) - def _validate_domain(self, ref): - """Validate that either the default domain or nothing is specified. - - Also removes the domain from the ref so that LDAP doesn't have to - persist the attribute. - - """ - ref = ref.copy() - domain_id = ref.pop('domain_id', CONF.identity.default_domain_id) - self._validate_domain_id(domain_id) - return ref - - def _validate_domain_id(self, domain_id): - """Validate that the domain ID specified belongs to the default domain. - - """ - if domain_id != CONF.identity.default_domain_id: - raise exception.DomainNotFound(domain_id=domain_id) - - def _set_default_domain(self, ref): - """Overrides any domain reference with the default domain.""" - if isinstance(ref, dict): - ref = ref.copy() - ref['domain_id'] = CONF.identity.default_domain_id - return ref - elif isinstance(ref, list): - return [self._set_default_domain(x) for x in ref] - else: - raise ValueError(_('Expected dict or list: %s') % type(ref)) - def create_project(self, tenant_id, tenant): - tenant = self._validate_domain(tenant) + tenant = self._validate_default_domain(tenant) tenant['name'] = clean.project_name(tenant['name']) data = tenant.copy() if 'id' not in data or data['id'] is None: @@ -120,7 +90,7 @@ class Assignment(assignment.Driver): return self._set_default_domain(self.project.create(data)) def update_project(self, tenant_id, tenant): - tenant = self._validate_domain(tenant) + tenant = self._validate_default_domain(tenant) if 'name' in tenant: tenant['name'] = clean.project_name(tenant['name']) return self._set_default_domain(self.project.update(tenant_id, tenant)) @@ -244,19 +214,19 @@ class Assignment(assignment.Driver): raise exception.Forbidden('Domains are read-only against LDAP') def get_domain(self, domain_id): - self._validate_domain_id(domain_id) + self._validate_default_domain_id(domain_id) return DEFAULT_DOMAIN def update_domain(self, domain_id, domain): - self._validate_domain_id(domain_id) + self._validate_default_domain_id(domain_id) raise exception.Forbidden('Domains are read-only against LDAP') def delete_domain(self, domain_id): - self._validate_domain_id(domain_id) + self._validate_default_domain_id(domain_id) raise exception.Forbidden('Domains are read-only against LDAP') def list_domains(self): - return [DEFAULT_DOMAIN] + return [assignment.DEFAULT_DOMAIN] #Bulk actions on User From identity def delete_user(self, user_id): diff --git a/keystone/assignment/backends/sql.py b/keystone/assignment/backends/sql.py index 57ca7834..237330ce 100644 --- a/keystone/assignment/backends/sql.py +++ b/keystone/assignment/backends/sql.py @@ -97,12 +97,14 @@ class Assignment(sql.Base, assignment.Driver): def create_grant(self, role_id, user_id=None, group_id=None, domain_id=None, project_id=None): - session = self.get_session() - self._get_role(session, role_id) if user_id: - self.identity_api._get_user(session, user_id) + self.identity_api.get_user(user_id) if group_id: - self.identity_api._get_group(session, group_id) + self.identity_api.get_group(group_id) + + session = self.get_session() + self._get_role(session, role_id) + if domain_id: self._get_domain(session, domain_id) if project_id: @@ -127,11 +129,11 @@ class Assignment(sql.Base, assignment.Driver): def list_grants(self, user_id=None, group_id=None, domain_id=None, project_id=None): - session = self.get_session() if user_id: - self.identity_api._get_user(session, user_id) + self.identity_api.get_user(user_id) if group_id: - self.identity_api._get_group(session, group_id) + self.identity_api.get_group(group_id) + session = self.get_session() if domain_id: self._get_domain(session, domain_id) if project_id: @@ -146,12 +148,14 @@ class Assignment(sql.Base, assignment.Driver): def get_grant(self, role_id, user_id=None, group_id=None, domain_id=None, project_id=None): - session = self.get_session() - role_ref = self._get_role(session, role_id) if user_id: - self.identity_api._get_user(session, user_id) + self.identity_api.get_user(user_id) if group_id: - self.identity_api._get_group(session, group_id) + self.identity_api.get_group(group_id) + + session = self.get_session() + role_ref = self._get_role(session, role_id) + if domain_id: self._get_domain(session, domain_id) if project_id: @@ -169,12 +173,14 @@ class Assignment(sql.Base, assignment.Driver): def delete_grant(self, role_id, user_id=None, group_id=None, domain_id=None, project_id=None): - session = self.get_session() - self._get_role(session, role_id) if user_id: - self.identity_api._get_user(session, user_id) + self.identity_api.get_user(user_id) if group_id: - self.identity_api._get_group(session, group_id) + self.identity_api.get_group(group_id) + + session = self.get_session() + self._get_role(session, role_id) + if domain_id: self._get_domain(session, domain_id) if project_id: @@ -206,16 +212,16 @@ class Assignment(sql.Base, assignment.Driver): return [tenant_ref.to_dict() for tenant_ref in tenant_refs] def get_projects_for_user(self, user_id): + self.identity_api.get_user(user_id) session = self.get_session() - self.identity_api._get_user(session, user_id) query = session.query(UserProjectGrant) query = query.filter_by(user_id=user_id) membership_refs = query.all() return [x.project_id for x in membership_refs] def add_role_to_user_and_project(self, user_id, tenant_id, role_id): + self.identity_api.get_user(user_id) session = self.get_session() - self.identity_api._get_user(session, user_id) self._get_project(session, tenant_id) self._get_role(session, role_id) try: diff --git a/keystone/assignment/core.py b/keystone/assignment/core.py index 879fee0b..531da02e 100644 --- a/keystone/assignment/core.py +++ b/keystone/assignment/core.py @@ -26,6 +26,13 @@ from keystone import exception CONF = config.CONF LOG = logging.getLogger(__name__) +DEFAULT_DOMAIN = {'description': + (u'Owns users and tenants (i.e. projects)' + ' available on Identity API v2.'), + 'enabled': True, + 'id': CONF.identity.default_domain_id, + 'name': u'Default'} + @dependency.provider('assignment_api') class Manager(manager.Manager): @@ -424,3 +431,38 @@ class Driver(object): :raises: keystone.exception.RoleNotFound """ + raise exception.NotImplemented() + + #domain management functions for backends that only allow a single domain. + #currently, this is only LDAP, but might be used by PAM or other backends + #as well. This is used by both identity and assignment drivers. + def _set_default_domain(self, ref): + """If the domain ID has not been set, set it to the default.""" + if isinstance(ref, dict): + if 'domain_id' not in ref: + ref = ref.copy() + ref['domain_id'] = CONF.identity.default_domain_id + return ref + elif isinstance(ref, list): + return [self._set_default_domain(x) for x in ref] + else: + raise ValueError(_('Expected dict or list: %s') % type(ref)) + + def _validate_default_domain(self, ref): + """Validate that either the default domain or nothing is specified. + + Also removes the domain from the ref so that LDAP doesn't have to + persist the attribute. + + """ + ref = ref.copy() + domain_id = ref.pop('domain_id', CONF.identity.default_domain_id) + self._validate_default_domain_id(domain_id) + return ref + + def _validate_default_domain_id(self, domain_id): + """Validate that the domain ID specified belongs to the default domain. + + """ + if domain_id != CONF.identity.default_domain_id: + raise exception.DomainNotFound(domain_id=domain_id) diff --git a/keystone/identity/backends/ldap.py b/keystone/identity/backends/ldap.py index de447b60..c06737c8 100644 --- a/keystone/identity/backends/ldap.py +++ b/keystone/identity/backends/ldap.py @@ -88,13 +88,13 @@ class Identity(identity.Driver): return self.assignment._set_default_domain(self.user.get_all()) def get_user_by_name(self, user_name, domain_id): - self.assignment._validate_domain_id(domain_id) + self.assignment._validate_default_domain_id(domain_id) ref = identity.filter_user(self.user.get_by_name(user_name)) return self.assignment._set_default_domain(ref) # CRUD def create_user(self, user_id, user): - user = self.assignment._validate_domain(user) + user = self.assignment._validate_default_domain(user) user_ref = self.user.create(user) tenant_id = user.get('tenant_id') if tenant_id is not None: @@ -103,7 +103,7 @@ class Identity(identity.Driver): (identity.filter_user(user_ref))) def update_user(self, user_id, user): - user = self.assignment._validate_domain(user) + user = self.assignment._validate_default_domain(user) if 'id' in user and user['id'] != user_id: raise exception.ValidationError('Cannot change user ID') old_obj = self.user.get(user_id) @@ -143,7 +143,7 @@ class Identity(identity.Driver): self.user.delete(user_id) def create_group(self, group_id, group): - group = self.assignment._validate_domain(group) + group = self.assignment._validate_default_domain(group) group['name'] = clean.group_name(group['name']) return self.assignment._set_default_domain(self.group.create(group)) @@ -151,7 +151,7 @@ class Identity(identity.Driver): return self.assignment._set_default_domain(self.group.get(group_id)) def update_group(self, group_id, group): - group = self.assignment._validate_domain(group) + group = self.assignment._validate_default_domain(group) if 'name' in group: group['name'] = clean.group_name(group['name']) return (self.assignment._set_default_domain diff --git a/keystone/identity/core.py b/keystone/identity/core.py index f725589f..981d24ed 100644 --- a/keystone/identity/core.py +++ b/keystone/identity/core.py @@ -110,15 +110,6 @@ class Manager(manager.Manager): def list_projects(self): return self.assignment.list_projects() - def _validate_domain(self, ref): - return self.assignment._validate_domain(ref) - - def _validate_domain_id(self, domain_id): - return self.assignment._validate_domain_id(domain_id) - - def _set_default_domain(self, ref): - return self.assignment._set_default_domain(ref) - def get_role(self, role_id): return self.assignment.get_role(role_id) diff --git a/tests/backend_ldap_sql.conf b/tests/backend_ldap_sql.conf new file mode 100644 index 00000000..8dcfa40d --- /dev/null +++ b/tests/backend_ldap_sql.conf @@ -0,0 +1,36 @@ +[sql] +connection = sqlite:// +#For a file based sqlite use +#connection = sqlite:////tmp/keystone.db +#To Test MySQL: +#connection = mysql://keystone:keystone@localhost/keystone?charset=utf8 +#To Test PostgreSQL: +#connection = postgresql://keystone:keystone@localhost/keystone?client_encoding=utf8 +idle_timeout = 200 + +[ldap] +url = fake://memory +user = cn=Admin +password = password +suffix = cn=example,cn=com + +[identity] +driver = keystone.identity.backends.ldap.Identity + +[assignment] +driver = keystone.assignment.backends.sql.Assignment + +[token] +driver = keystone.token.backends.sql.Token + +[ec2] +driver = keystone.contrib.ec2.backends.sql.Ec2 + +[catalog] +driver = keystone.catalog.backends.sql.Catalog + +[policy] +driver = keystone.policy.backends.sql.Policy + +[trust] +driver = keystone.trust.backends.sql.Trust diff --git a/tests/default_fixtures.py b/tests/default_fixtures.py index a1329c22..2695da88 100644 --- a/tests/default_fixtures.py +++ b/tests/default_fixtures.py @@ -17,6 +17,7 @@ # NOTE(dolph): please try to avoid additional fixtures if possible; test suite # performance may be negatively affected. +from keystone import assignment from keystone import config @@ -120,12 +121,4 @@ ROLES = [ } ] -DOMAINS = [ - { - 'id': DEFAULT_DOMAIN_ID, - 'name': 'Default', - 'enabled': True, - 'description': 'Owns users and tenants (i.e. projects) available ' - 'on Identity API v2.' - } -] +DOMAINS = [assignment.DEFAULT_DOMAIN] diff --git a/tests/test_backend_ldap.py b/tests/test_backend_ldap.py index c9a8a4ed..b68399ba 100644 --- a/tests/test_backend_ldap.py +++ b/tests/test_backend_ldap.py @@ -19,12 +19,13 @@ import uuid import nose.exc -from keystone import test - +from keystone import assignment from keystone.common.ldap import fakeldap +from keystone.common import sql from keystone import config from keystone import exception from keystone import identity +from keystone import test import default_fixtures import test_backend @@ -33,7 +34,7 @@ import test_backend CONF = config.CONF -class LDAPIdentity(test.TestCase, test_backend.IdentityTests): +class BaseLDAPIdentity(test_backend.IdentityTests): def _get_domain_fixture(self): """Domains in LDAP are read-only, so just return the static one.""" return self.identity_api.get_domain(CONF.identity.default_domain_id) @@ -47,14 +48,6 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests): test.testsdir('test_overrides.conf'), test.testsdir('backend_ldap.conf')]) - def setUp(self): - super(LDAPIdentity, self).setUp() - self._set_config() - self.clear_database() - - self.load_backends() - self.load_fixtures(default_fixtures) - def test_build_tree(self): """Regression test for building the tree names """ @@ -104,6 +97,202 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests): self.identity_api.delete_user, self.user_foo['id']) + def test_user_filter(self): + user_ref = self.identity_api.get_user(self.user_foo['id']) + self.user_foo.pop('password') + self.assertDictEqual(user_ref, self.user_foo) + + CONF.ldap.user_filter = '(CN=DOES_NOT_MATCH)' + self.load_backends() + self.assertRaises(exception.UserNotFound, + self.identity_api.get_user, + self.user_foo['id']) + + def test_get_role_grant_by_user_and_project(self): + raise nose.exc.SkipTest('Blocked by bug 1101287') + + def test_get_role_grants_for_user_and_project_404(self): + raise nose.exc.SkipTest('Blocked by bug 1101287') + + def test_add_role_grant_to_user_and_project_404(self): + raise nose.exc.SkipTest('Blocked by bug 1101287') + + def test_remove_role_grant_from_user_and_project(self): + raise nose.exc.SkipTest('Blocked by bug 1101287') + + def test_get_and_remove_role_grant_by_group_and_project(self): + raise nose.exc.SkipTest('Blocked by bug 1101287') + + def test_get_and_remove_role_grant_by_group_and_domain(self): + raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') + + def test_get_and_remove_role_grant_by_user_and_domain(self): + raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') + + def test_get_and_remove_correct_role_grant_from_a_mix(self): + raise nose.exc.SkipTest('Blocked by bug 1101287') + + def test_get_and_remove_role_grant_by_group_and_cross_domain(self): + raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') + + def test_get_and_remove_role_grant_by_user_and_cross_domain(self): + raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') + + def test_role_grant_by_group_and_cross_domain_project(self): + raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') + + def test_role_grant_by_user_and_cross_domain_project(self): + raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') + + def test_multi_role_grant_by_user_group_on_project_domain(self): + raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') + + def test_delete_role_with_user_and_group_grants(self): + raise nose.exc.SkipTest('Blocked by bug 1101287') + + def test_delete_user_with_group_project_domain_links(self): + raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') + + def test_delete_group_with_user_project_domain_links(self): + raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') + + def test_list_user_projects(self): + raise nose.exc.SkipTest('Blocked by bug 1101287') + + def test_create_duplicate_user_name_in_different_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_create_duplicate_project_name_in_different_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_create_duplicate_group_name_in_different_domains(self): + raise nose.exc.SkipTest( + 'N/A: LDAP does not support multiple domains') + + def test_move_user_between_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_move_user_between_domains_with_clashing_names_fails(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_move_group_between_domains(self): + raise nose.exc.SkipTest( + 'N/A: LDAP does not support multiple domains') + + def test_move_group_between_domains_with_clashing_names_fails(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_move_project_between_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_move_project_between_domains_with_clashing_names_fails(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_get_roles_for_user_and_domain(self): + raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') + + def test_list_role_assignments_unfiltered(self): + raise nose.exc.SkipTest('Blocked by bug 1195019') + + def test_multi_group_grants_on_project_domain(self): + raise nose.exc.SkipTest('Blocked by bug 1101287') + + def test_list_group_members_missing_entry(self): + """List group members with deleted user. + + If a group has a deleted entry for a member, the non-deleted members + are returned. + + """ + + # Create a group + group_id = None + group = dict(name=uuid.uuid4().hex) + group_id = self.identity_api.create_group(group_id, group)['id'] + + # Create a couple of users and add them to the group. + user_id = None + user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex) + user_1_id = self.identity_api.create_user(user_id, user)['id'] + + self.identity_api.add_user_to_group(user_1_id, group_id) + + user_id = None + user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex) + user_2_id = self.identity_api.create_user(user_id, user)['id'] + + self.identity_api.add_user_to_group(user_2_id, group_id) + + # Delete user 2 + # NOTE(blk-u): need to go directly to user interface to keep from + # updating the group. + self.identity_api.driver.user.delete(user_2_id) + + # List group users and verify only user 1. + res = self.identity_api.list_users_in_group(group_id) + + self.assertEqual(len(res), 1, "Expected 1 entry (user_1)") + self.assertEqual(res[0]['id'], user_1_id, "Expected user 1 id") + + def test_list_domains(self): + domains = self.identity_api.list_domains() + self.assertEquals( + domains, + [assignment.DEFAULT_DOMAIN]) + + def test_authenticate_requires_simple_bind(self): + user = { + 'id': 'no_meta', + 'name': 'NO_META', + 'domain_id': test_backend.DEFAULT_DOMAIN_ID, + 'password': 'no_meta2', + 'enabled': True, + } + self.identity_api.create_user(user['id'], user) + self.identity_api.add_user_to_project(self.tenant_baz['id'], + user['id']) + self.identity_api.driver.user.LDAP_USER = None + self.identity_api.driver.user.LDAP_PASSWORD = None + + self.assertRaises(AssertionError, + self.identity_api.authenticate, + user_id=user['id'], + password=None) + + # (spzala)The group and domain crud tests below override the standard ones + # in test_backend.py so that we can exclude the update name test, since we + # do not yet support the update of either group or domain names with LDAP. + # In the tests below, the update is demonstrated by updating description. + # Refer to bug 1136403 for more detail. + def test_group_crud(self): + group = { + 'id': uuid.uuid4().hex, + 'domain_id': CONF.identity.default_domain_id, + 'name': uuid.uuid4().hex, + 'description': uuid.uuid4().hex} + self.identity_api.create_group(group['id'], group) + group_ref = self.identity_api.get_group(group['id']) + self.assertDictEqual(group_ref, group) + group['description'] = uuid.uuid4().hex + self.identity_api.update_group(group['id'], group) + group_ref = self.identity_api.get_group(group['id']) + self.assertDictEqual(group_ref, group) + + self.identity_api.delete_group(group['id']) + self.assertRaises(exception.GroupNotFound, + self.identity_api.get_group, + group['id']) + + +class LDAPIdentity(test.TestCase, BaseLDAPIdentity): + def setUp(self): + super(LDAPIdentity, self).setUp() + self._set_config() + self.clear_database() + + self.load_backends() + self.load_fixtures(default_fixtures) + def test_configurable_allowed_project_actions(self): tenant = {'id': 'fake1', 'name': 'fake1', 'enabled': True} self.identity_api.create_project('fake1', tenant) @@ -175,17 +364,6 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests): self.identity_api.delete_role, self.role_member['id']) - def test_user_filter(self): - user_ref = self.identity_api.get_user(self.user_foo['id']) - self.user_foo.pop('password') - self.assertDictEqual(user_ref, self.user_foo) - - CONF.ldap.user_filter = '(CN=DOES_NOT_MATCH)' - self.load_backends() - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, - self.user_foo['id']) - def test_project_filter(self): tenant_ref = self.identity_api.get_project(self.tenant_bar['id']) self.assertDictEqual(tenant_ref, self.tenant_bar) @@ -216,40 +394,6 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests): self.identity_api.get_user, 'dumb') - def test_user_attribute_mapping(self): - CONF.ldap.user_name_attribute = 'sn' - CONF.ldap.user_mail_attribute = 'mail' - CONF.ldap.user_enabled_attribute = 'enabled' - self.clear_database() - self.load_backends() - self.load_fixtures(default_fixtures) - user_ref = self.identity_api.get_user(self.user_two['id']) - self.assertEqual(user_ref['id'], self.user_two['id']) - self.assertEqual(user_ref['name'], self.user_two['name']) - self.assertEqual(user_ref['email'], self.user_two['email']) - - CONF.ldap.user_name_attribute = 'mail' - CONF.ldap.user_mail_attribute = 'sn' - self.load_backends() - user_ref = self.identity_api.get_user(self.user_two['id']) - self.assertEqual(user_ref['id'], self.user_two['id']) - self.assertEqual(user_ref['name'], self.user_two['email']) - self.assertEqual(user_ref['email'], self.user_two['name']) - - def test_user_attribute_ignore(self): - CONF.ldap.user_attribute_ignore = ['email', 'password', - 'tenant_id', 'enabled', 'tenants'] - self.clear_database() - self.load_backends() - self.load_fixtures(default_fixtures) - user_ref = self.identity_api.get_user(self.user_two['id']) - self.assertEqual(user_ref['id'], self.user_two['id']) - self.assertNotIn('email', user_ref) - self.assertNotIn('password', user_ref) - self.assertNotIn('tenant_id', user_ref) - self.assertNotIn('enabled', user_ref) - self.assertNotIn('tenants', user_ref) - def test_project_attribute_mapping(self): CONF.ldap.tenant_name_attribute = 'ou' CONF.ldap.tenant_desc_attribute = 'description' @@ -384,30 +528,6 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests): # TODO(henry-nash): These need to be removed when the full LDAP implementation # is submitted - see Bugs 1092187, 1101287, 1101276, 1101289 - # (spzala)The group and domain crud tests below override the standard ones - # in test_backend.py so that we can exclude the update name test, since we - # do not yet support the update of either group or domain names with LDAP. - # In the tests below, the update is demonstrated by updating description. - # Refer to bug 1136403 for more detail. - def test_group_crud(self): - group = { - 'id': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex} - self.identity_api.create_group(group['id'], group) - group_ref = self.identity_api.get_group(group['id']) - self.assertDictEqual(group_ref, group) - group['description'] = uuid.uuid4().hex - self.identity_api.update_group(group['id'], group) - group_ref = self.identity_api.get_group(group['id']) - self.assertDictEqual(group_ref, group) - - self.identity_api.delete_group(group['id']) - self.assertRaises(exception.GroupNotFound, - self.identity_api.get_group, - group['id']) - def test_domain_crud(self): domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'enabled': True, 'description': uuid.uuid4().hex} @@ -434,33 +554,6 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests): self.identity_api.get_domain, domain['id']) - def test_get_role_grant_by_user_and_project(self): - raise nose.exc.SkipTest('Blocked by bug 1101287') - - def test_get_role_grants_for_user_and_project_404(self): - raise nose.exc.SkipTest('Blocked by bug 1101287') - - def test_add_role_grant_to_user_and_project_404(self): - raise nose.exc.SkipTest('Blocked by bug 1101287') - - def test_remove_role_grant_from_user_and_project(self): - raise nose.exc.SkipTest('Blocked by bug 1101287') - - def test_get_and_remove_role_grant_by_group_and_project(self): - raise nose.exc.SkipTest('Blocked by bug 1101287') - - def test_get_and_remove_role_grant_by_group_and_domain(self): - raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') - - def test_get_and_remove_role_grant_by_user_and_domain(self): - raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') - - def test_get_and_remove_correct_role_grant_from_a_mix(self): - raise nose.exc.SkipTest('Blocked by bug 1101287') - - def test_list_role_assignments_unfiltered(self): - raise nose.exc.SkipTest('Blocked by bug 1195019') - def test_project_crud(self): # NOTE(topol): LDAP implementation does not currently support the # updating of a project name so this method override @@ -488,18 +581,6 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests): self.identity_api.get_project, project['id']) - def test_get_and_remove_role_grant_by_group_and_cross_domain(self): - raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') - - def test_get_and_remove_role_grant_by_user_and_cross_domain(self): - raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') - - def test_role_grant_by_group_and_cross_domain_project(self): - raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') - - def test_role_grant_by_user_and_cross_domain_project(self): - raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') - def test_multi_role_grant_by_user_group_on_project_domain(self): # This is a partial implementation of the standard test that # is defined in test_backend.py. It omits both domain and @@ -549,117 +630,6 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests): user1['id'], CONF.identity.default_domain_id) self.assertEquals(len(combined_role_list), 0) - def test_multi_group_grants_on_project_domain(self): - raise nose.exc.SkipTest('Blocked by bug 1101287') - - def test_delete_role_with_user_and_group_grants(self): - raise nose.exc.SkipTest('Blocked by bug 1101287') - - def test_delete_user_with_group_project_domain_links(self): - raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') - - def test_delete_group_with_user_project_domain_links(self): - raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') - - def test_list_user_projects(self): - raise nose.exc.SkipTest('Blocked by bug 1101287') - - def test_create_duplicate_user_name_in_different_domains(self): - raise nose.exc.SkipTest('Blocked by bug 1101276') - - def test_create_duplicate_project_name_in_different_domains(self): - raise nose.exc.SkipTest('Blocked by bug 1101276') - - def test_create_duplicate_group_name_in_different_domains(self): - raise nose.exc.SkipTest( - 'N/A: LDAP does not support multiple domains') - - def test_move_user_between_domains(self): - raise nose.exc.SkipTest('Blocked by bug 1101276') - - def test_move_user_between_domains_with_clashing_names_fails(self): - raise nose.exc.SkipTest('Blocked by bug 1101276') - - def test_move_group_between_domains(self): - raise nose.exc.SkipTest( - 'N/A: LDAP does not support multiple domains') - - def test_move_group_between_domains_with_clashing_names_fails(self): - raise nose.exc.SkipTest('Blocked by bug 1101276') - - def test_move_project_between_domains(self): - raise nose.exc.SkipTest('Blocked by bug 1101276') - - def test_move_project_between_domains_with_clashing_names_fails(self): - raise nose.exc.SkipTest('Blocked by bug 1101276') - - def test_get_roles_for_user_and_domain(self): - raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains') - - def test_list_group_members_missing_entry(self): - """List group members with deleted user. - - If a group has a deleted entry for a member, the non-deleted members - are returned. - - """ - - # Create a group - group_id = None - group = dict(name=uuid.uuid4().hex) - group_id = self.identity_api.create_group(group_id, group)['id'] - - # Create a couple of users and add them to the group. - user_id = None - user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex) - user_1_id = self.identity_api.create_user(user_id, user)['id'] - - self.identity_api.add_user_to_group(user_1_id, group_id) - - user_id = None - user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex) - user_2_id = self.identity_api.create_user(user_id, user)['id'] - - self.identity_api.add_user_to_group(user_2_id, group_id) - - # Delete user 2 - # NOTE(blk-u): need to go directly to user interface to keep from - # updating the group. - self.identity_api.driver.user.delete(user_2_id) - - # List group users and verify only user 1. - res = self.identity_api.list_users_in_group(group_id) - - self.assertEqual(len(res), 1, "Expected 1 entry (user_1)") - self.assertEqual(res[0]['id'], user_1_id, "Expected user 1 id") - - def test_list_domains(self): - domains = self.identity_api.list_domains() - self.assertEquals( - domains, - [{'id': CONF.identity.default_domain_id, - 'name': 'Default', - 'enabled': True}]) - - def test_authenticate_requires_simple_bind(self): - user = { - 'id': 'no_meta', - 'name': 'NO_META', - 'domain_id': test_backend.DEFAULT_DOMAIN_ID, - 'password': 'no_meta2', - 'enabled': True, - } - self.identity_api.create_user(user['id'], user) - self.identity_api.add_user_to_project(self.tenant_baz['id'], - user['id']) - self.identity_api.driver.user.LDAP_USER = None - self.identity_api.driver.user.LDAP_PASSWORD = None - - self.assertRaises(AssertionError, - self.identity_api.authenticate, - user_id=user['id'], - password=None) - class LDAPIdentityEnabledEmulation(LDAPIdentity): def setUp(self): @@ -733,3 +703,41 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity): def test_user_enable_attribute_mask(self): raise nose.exc.SkipTest( "Enabled emulation conflicts with enabled mask") + + +class LdapIdentitySqlAssignment(sql.Base, test.TestCase, BaseLDAPIdentity): + + def _set_config(self): + self.config([test.etcdir('keystone.conf.sample'), + test.testsdir('test_overrides.conf'), + test.testsdir('backend_ldap_sql.conf')]) + + def setUp(self): + self._set_config() + self.clear_database() + self.load_backends() + self.engine = self.get_engine() + sql.ModelBase.metadata.create_all(bind=self.engine) + self.load_fixtures(default_fixtures) + #defaulted by the data load + self.user_foo['enabled'] = True + + def tearDown(self): + sql.ModelBase.metadata.drop_all(bind=self.engine) + self.engine.dispose() + sql.set_global_engine(None) + + def test_domain_crud(self): + pass + + def test_list_domains(self): + domains = self.identity_api.list_domains() + self.assertEquals(domains, [assignment.DEFAULT_DOMAIN]) + + def test_project_filter(self): + raise nose.exc.SkipTest( + 'N/A: Not part of SQL backend') + + def test_role_filter(self): + raise nose.exc.SkipTest( + 'N/A: Not part of SQL backend') diff --git a/tests/test_drivers.py b/tests/test_drivers.py index 439b0d30..c83c1a89 100644 --- a/tests/test_drivers.py +++ b/tests/test_drivers.py @@ -1,6 +1,7 @@ import inspect import unittest2 as unittest +from keystone import assignment from keystone import catalog from keystone import exception from keystone import identity @@ -35,6 +36,10 @@ class TestDrivers(unittest.TestCase): if name[0] != '_' and callable(method): self.assertMethodNotImplemented(method) + def test_assignment_driver_unimplemented(self): + interface = assignment.Driver() + self.assertInterfaceNotImplemented(interface) + def test_catalog_driver_unimplemented(self): interface = catalog.Driver() self.assertInterfaceNotImplemented(interface) -- cgit