summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAdam Young <ayoung@redhat.com>2013-06-28 18:34:25 -0400
committerAdam Young <ayoung@redhat.com>2013-07-12 15:16:47 -0400
commit4be48868ef9b34e90e8e6028201bc3b0ac569c3d (patch)
tree01203677ad0d44c6360bb0cf08e4ed10d8ce504a
parent661cef927e95cf87a96eea7f0f6d840f8bf4adcd (diff)
Mixed LDAP/SQL Backend.
Supports the configuration where LDAP is used for identity and SQL is used for assignment. blueprint split-identity Change-Id: Ib91b5d804282b7f78fc2458ff64653bbf2cf5d9e
-rw-r--r--keystone/assignment/backends/ldap.py44
-rw-r--r--keystone/assignment/backends/sql.py40
-rw-r--r--keystone/assignment/core.py42
-rw-r--r--keystone/identity/backends/ldap.py10
-rw-r--r--keystone/identity/core.py9
-rw-r--r--tests/backend_ldap_sql.conf36
-rw-r--r--tests/default_fixtures.py11
-rw-r--r--tests/test_backend_ldap.py468
-rw-r--r--tests/test_drivers.py5
9 files changed, 358 insertions, 307 deletions
diff --git a/keystone/assignment/backends/ldap.py b/keystone/assignment/backends/ldap.py
index 44a479bf..09539c9f 100644
--- a/keystone/assignment/backends/ldap.py
+++ b/keystone/assignment/backends/ldap.py
@@ -76,41 +76,11 @@ class Assignment(assignment.Driver):
return self._set_default_domain(self.project.get_all())
def get_project_by_name(self, tenant_name, domain_id):
- self._validate_domain_id(domain_id)
+ self._validate_default_domain_id(domain_id)
return self._set_default_domain(self.project.get_by_name(tenant_name))
- def _validate_domain(self, ref):
- """Validate that either the default domain or nothing is specified.
-
- Also removes the domain from the ref so that LDAP doesn't have to
- persist the attribute.
-
- """
- ref = ref.copy()
- domain_id = ref.pop('domain_id', CONF.identity.default_domain_id)
- self._validate_domain_id(domain_id)
- return ref
-
- def _validate_domain_id(self, domain_id):
- """Validate that the domain ID specified belongs to the default domain.
-
- """
- if domain_id != CONF.identity.default_domain_id:
- raise exception.DomainNotFound(domain_id=domain_id)
-
- def _set_default_domain(self, ref):
- """Overrides any domain reference with the default domain."""
- if isinstance(ref, dict):
- ref = ref.copy()
- ref['domain_id'] = CONF.identity.default_domain_id
- return ref
- elif isinstance(ref, list):
- return [self._set_default_domain(x) for x in ref]
- else:
- raise ValueError(_('Expected dict or list: %s') % type(ref))
-
def create_project(self, tenant_id, tenant):
- tenant = self._validate_domain(tenant)
+ tenant = self._validate_default_domain(tenant)
tenant['name'] = clean.project_name(tenant['name'])
data = tenant.copy()
if 'id' not in data or data['id'] is None:
@@ -120,7 +90,7 @@ class Assignment(assignment.Driver):
return self._set_default_domain(self.project.create(data))
def update_project(self, tenant_id, tenant):
- tenant = self._validate_domain(tenant)
+ tenant = self._validate_default_domain(tenant)
if 'name' in tenant:
tenant['name'] = clean.project_name(tenant['name'])
return self._set_default_domain(self.project.update(tenant_id, tenant))
@@ -244,19 +214,19 @@ class Assignment(assignment.Driver):
raise exception.Forbidden('Domains are read-only against LDAP')
def get_domain(self, domain_id):
- self._validate_domain_id(domain_id)
+ self._validate_default_domain_id(domain_id)
return DEFAULT_DOMAIN
def update_domain(self, domain_id, domain):
- self._validate_domain_id(domain_id)
+ self._validate_default_domain_id(domain_id)
raise exception.Forbidden('Domains are read-only against LDAP')
def delete_domain(self, domain_id):
- self._validate_domain_id(domain_id)
+ self._validate_default_domain_id(domain_id)
raise exception.Forbidden('Domains are read-only against LDAP')
def list_domains(self):
- return [DEFAULT_DOMAIN]
+ return [assignment.DEFAULT_DOMAIN]
#Bulk actions on User From identity
def delete_user(self, user_id):
diff --git a/keystone/assignment/backends/sql.py b/keystone/assignment/backends/sql.py
index 57ca7834..237330ce 100644
--- a/keystone/assignment/backends/sql.py
+++ b/keystone/assignment/backends/sql.py
@@ -97,12 +97,14 @@ class Assignment(sql.Base, assignment.Driver):
def create_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
- session = self.get_session()
- self._get_role(session, role_id)
if user_id:
- self.identity_api._get_user(session, user_id)
+ self.identity_api.get_user(user_id)
if group_id:
- self.identity_api._get_group(session, group_id)
+ self.identity_api.get_group(group_id)
+
+ session = self.get_session()
+ self._get_role(session, role_id)
+
if domain_id:
self._get_domain(session, domain_id)
if project_id:
@@ -127,11 +129,11 @@ class Assignment(sql.Base, assignment.Driver):
def list_grants(self, user_id=None, group_id=None,
domain_id=None, project_id=None):
- session = self.get_session()
if user_id:
- self.identity_api._get_user(session, user_id)
+ self.identity_api.get_user(user_id)
if group_id:
- self.identity_api._get_group(session, group_id)
+ self.identity_api.get_group(group_id)
+ session = self.get_session()
if domain_id:
self._get_domain(session, domain_id)
if project_id:
@@ -146,12 +148,14 @@ class Assignment(sql.Base, assignment.Driver):
def get_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
- session = self.get_session()
- role_ref = self._get_role(session, role_id)
if user_id:
- self.identity_api._get_user(session, user_id)
+ self.identity_api.get_user(user_id)
if group_id:
- self.identity_api._get_group(session, group_id)
+ self.identity_api.get_group(group_id)
+
+ session = self.get_session()
+ role_ref = self._get_role(session, role_id)
+
if domain_id:
self._get_domain(session, domain_id)
if project_id:
@@ -169,12 +173,14 @@ class Assignment(sql.Base, assignment.Driver):
def delete_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
- session = self.get_session()
- self._get_role(session, role_id)
if user_id:
- self.identity_api._get_user(session, user_id)
+ self.identity_api.get_user(user_id)
if group_id:
- self.identity_api._get_group(session, group_id)
+ self.identity_api.get_group(group_id)
+
+ session = self.get_session()
+ self._get_role(session, role_id)
+
if domain_id:
self._get_domain(session, domain_id)
if project_id:
@@ -206,16 +212,16 @@ class Assignment(sql.Base, assignment.Driver):
return [tenant_ref.to_dict() for tenant_ref in tenant_refs]
def get_projects_for_user(self, user_id):
+ self.identity_api.get_user(user_id)
session = self.get_session()
- self.identity_api._get_user(session, user_id)
query = session.query(UserProjectGrant)
query = query.filter_by(user_id=user_id)
membership_refs = query.all()
return [x.project_id for x in membership_refs]
def add_role_to_user_and_project(self, user_id, tenant_id, role_id):
+ self.identity_api.get_user(user_id)
session = self.get_session()
- self.identity_api._get_user(session, user_id)
self._get_project(session, tenant_id)
self._get_role(session, role_id)
try:
diff --git a/keystone/assignment/core.py b/keystone/assignment/core.py
index 879fee0b..531da02e 100644
--- a/keystone/assignment/core.py
+++ b/keystone/assignment/core.py
@@ -26,6 +26,13 @@ from keystone import exception
CONF = config.CONF
LOG = logging.getLogger(__name__)
+DEFAULT_DOMAIN = {'description':
+ (u'Owns users and tenants (i.e. projects)'
+ ' available on Identity API v2.'),
+ 'enabled': True,
+ 'id': CONF.identity.default_domain_id,
+ 'name': u'Default'}
+
@dependency.provider('assignment_api')
class Manager(manager.Manager):
@@ -424,3 +431,38 @@ class Driver(object):
:raises: keystone.exception.RoleNotFound
"""
+ raise exception.NotImplemented()
+
+ #domain management functions for backends that only allow a single domain.
+ #currently, this is only LDAP, but might be used by PAM or other backends
+ #as well. This is used by both identity and assignment drivers.
+ def _set_default_domain(self, ref):
+ """If the domain ID has not been set, set it to the default."""
+ if isinstance(ref, dict):
+ if 'domain_id' not in ref:
+ ref = ref.copy()
+ ref['domain_id'] = CONF.identity.default_domain_id
+ return ref
+ elif isinstance(ref, list):
+ return [self._set_default_domain(x) for x in ref]
+ else:
+ raise ValueError(_('Expected dict or list: %s') % type(ref))
+
+ def _validate_default_domain(self, ref):
+ """Validate that either the default domain or nothing is specified.
+
+ Also removes the domain from the ref so that LDAP doesn't have to
+ persist the attribute.
+
+ """
+ ref = ref.copy()
+ domain_id = ref.pop('domain_id', CONF.identity.default_domain_id)
+ self._validate_default_domain_id(domain_id)
+ return ref
+
+ def _validate_default_domain_id(self, domain_id):
+ """Validate that the domain ID specified belongs to the default domain.
+
+ """
+ if domain_id != CONF.identity.default_domain_id:
+ raise exception.DomainNotFound(domain_id=domain_id)
diff --git a/keystone/identity/backends/ldap.py b/keystone/identity/backends/ldap.py
index de447b60..c06737c8 100644
--- a/keystone/identity/backends/ldap.py
+++ b/keystone/identity/backends/ldap.py
@@ -88,13 +88,13 @@ class Identity(identity.Driver):
return self.assignment._set_default_domain(self.user.get_all())
def get_user_by_name(self, user_name, domain_id):
- self.assignment._validate_domain_id(domain_id)
+ self.assignment._validate_default_domain_id(domain_id)
ref = identity.filter_user(self.user.get_by_name(user_name))
return self.assignment._set_default_domain(ref)
# CRUD
def create_user(self, user_id, user):
- user = self.assignment._validate_domain(user)
+ user = self.assignment._validate_default_domain(user)
user_ref = self.user.create(user)
tenant_id = user.get('tenant_id')
if tenant_id is not None:
@@ -103,7 +103,7 @@ class Identity(identity.Driver):
(identity.filter_user(user_ref)))
def update_user(self, user_id, user):
- user = self.assignment._validate_domain(user)
+ user = self.assignment._validate_default_domain(user)
if 'id' in user and user['id'] != user_id:
raise exception.ValidationError('Cannot change user ID')
old_obj = self.user.get(user_id)
@@ -143,7 +143,7 @@ class Identity(identity.Driver):
self.user.delete(user_id)
def create_group(self, group_id, group):
- group = self.assignment._validate_domain(group)
+ group = self.assignment._validate_default_domain(group)
group['name'] = clean.group_name(group['name'])
return self.assignment._set_default_domain(self.group.create(group))
@@ -151,7 +151,7 @@ class Identity(identity.Driver):
return self.assignment._set_default_domain(self.group.get(group_id))
def update_group(self, group_id, group):
- group = self.assignment._validate_domain(group)
+ group = self.assignment._validate_default_domain(group)
if 'name' in group:
group['name'] = clean.group_name(group['name'])
return (self.assignment._set_default_domain
diff --git a/keystone/identity/core.py b/keystone/identity/core.py
index f725589f..981d24ed 100644
--- a/keystone/identity/core.py
+++ b/keystone/identity/core.py
@@ -110,15 +110,6 @@ class Manager(manager.Manager):
def list_projects(self):
return self.assignment.list_projects()
- def _validate_domain(self, ref):
- return self.assignment._validate_domain(ref)
-
- def _validate_domain_id(self, domain_id):
- return self.assignment._validate_domain_id(domain_id)
-
- def _set_default_domain(self, ref):
- return self.assignment._set_default_domain(ref)
-
def get_role(self, role_id):
return self.assignment.get_role(role_id)
diff --git a/tests/backend_ldap_sql.conf b/tests/backend_ldap_sql.conf
new file mode 100644
index 00000000..8dcfa40d
--- /dev/null
+++ b/tests/backend_ldap_sql.conf
@@ -0,0 +1,36 @@
+[sql]
+connection = sqlite://
+#For a file based sqlite use
+#connection = sqlite:////tmp/keystone.db
+#To Test MySQL:
+#connection = mysql://keystone:keystone@localhost/keystone?charset=utf8
+#To Test PostgreSQL:
+#connection = postgresql://keystone:keystone@localhost/keystone?client_encoding=utf8
+idle_timeout = 200
+
+[ldap]
+url = fake://memory
+user = cn=Admin
+password = password
+suffix = cn=example,cn=com
+
+[identity]
+driver = keystone.identity.backends.ldap.Identity
+
+[assignment]
+driver = keystone.assignment.backends.sql.Assignment
+
+[token]
+driver = keystone.token.backends.sql.Token
+
+[ec2]
+driver = keystone.contrib.ec2.backends.sql.Ec2
+
+[catalog]
+driver = keystone.catalog.backends.sql.Catalog
+
+[policy]
+driver = keystone.policy.backends.sql.Policy
+
+[trust]
+driver = keystone.trust.backends.sql.Trust
diff --git a/tests/default_fixtures.py b/tests/default_fixtures.py
index a1329c22..2695da88 100644
--- a/tests/default_fixtures.py
+++ b/tests/default_fixtures.py
@@ -17,6 +17,7 @@
# NOTE(dolph): please try to avoid additional fixtures if possible; test suite
# performance may be negatively affected.
+from keystone import assignment
from keystone import config
@@ -120,12 +121,4 @@ ROLES = [
}
]
-DOMAINS = [
- {
- 'id': DEFAULT_DOMAIN_ID,
- 'name': 'Default',
- 'enabled': True,
- 'description': 'Owns users and tenants (i.e. projects) available '
- 'on Identity API v2.'
- }
-]
+DOMAINS = [assignment.DEFAULT_DOMAIN]
diff --git a/tests/test_backend_ldap.py b/tests/test_backend_ldap.py
index c9a8a4ed..b68399ba 100644
--- a/tests/test_backend_ldap.py
+++ b/tests/test_backend_ldap.py
@@ -19,12 +19,13 @@ import uuid
import nose.exc
-from keystone import test
-
+from keystone import assignment
from keystone.common.ldap import fakeldap
+from keystone.common import sql
from keystone import config
from keystone import exception
from keystone import identity
+from keystone import test
import default_fixtures
import test_backend
@@ -33,7 +34,7 @@ import test_backend
CONF = config.CONF
-class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
+class BaseLDAPIdentity(test_backend.IdentityTests):
def _get_domain_fixture(self):
"""Domains in LDAP are read-only, so just return the static one."""
return self.identity_api.get_domain(CONF.identity.default_domain_id)
@@ -47,14 +48,6 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
test.testsdir('test_overrides.conf'),
test.testsdir('backend_ldap.conf')])
- def setUp(self):
- super(LDAPIdentity, self).setUp()
- self._set_config()
- self.clear_database()
-
- self.load_backends()
- self.load_fixtures(default_fixtures)
-
def test_build_tree(self):
"""Regression test for building the tree names
"""
@@ -104,6 +97,202 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
self.identity_api.delete_user,
self.user_foo['id'])
+ def test_user_filter(self):
+ user_ref = self.identity_api.get_user(self.user_foo['id'])
+ self.user_foo.pop('password')
+ self.assertDictEqual(user_ref, self.user_foo)
+
+ CONF.ldap.user_filter = '(CN=DOES_NOT_MATCH)'
+ self.load_backends()
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.get_user,
+ self.user_foo['id'])
+
+ def test_get_role_grant_by_user_and_project(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101287')
+
+ def test_get_role_grants_for_user_and_project_404(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101287')
+
+ def test_add_role_grant_to_user_and_project_404(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101287')
+
+ def test_remove_role_grant_from_user_and_project(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101287')
+
+ def test_get_and_remove_role_grant_by_group_and_project(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101287')
+
+ def test_get_and_remove_role_grant_by_group_and_domain(self):
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
+
+ def test_get_and_remove_role_grant_by_user_and_domain(self):
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
+
+ def test_get_and_remove_correct_role_grant_from_a_mix(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101287')
+
+ def test_get_and_remove_role_grant_by_group_and_cross_domain(self):
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
+
+ def test_get_and_remove_role_grant_by_user_and_cross_domain(self):
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
+
+ def test_role_grant_by_group_and_cross_domain_project(self):
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
+
+ def test_role_grant_by_user_and_cross_domain_project(self):
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
+
+ def test_multi_role_grant_by_user_group_on_project_domain(self):
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
+
+ def test_delete_role_with_user_and_group_grants(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101287')
+
+ def test_delete_user_with_group_project_domain_links(self):
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
+
+ def test_delete_group_with_user_project_domain_links(self):
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
+
+ def test_list_user_projects(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101287')
+
+ def test_create_duplicate_user_name_in_different_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_create_duplicate_project_name_in_different_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_create_duplicate_group_name_in_different_domains(self):
+ raise nose.exc.SkipTest(
+ 'N/A: LDAP does not support multiple domains')
+
+ def test_move_user_between_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_move_user_between_domains_with_clashing_names_fails(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_move_group_between_domains(self):
+ raise nose.exc.SkipTest(
+ 'N/A: LDAP does not support multiple domains')
+
+ def test_move_group_between_domains_with_clashing_names_fails(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_move_project_between_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_move_project_between_domains_with_clashing_names_fails(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_get_roles_for_user_and_domain(self):
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
+
+ def test_list_role_assignments_unfiltered(self):
+ raise nose.exc.SkipTest('Blocked by bug 1195019')
+
+ def test_multi_group_grants_on_project_domain(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101287')
+
+ def test_list_group_members_missing_entry(self):
+ """List group members with deleted user.
+
+ If a group has a deleted entry for a member, the non-deleted members
+ are returned.
+
+ """
+
+ # Create a group
+ group_id = None
+ group = dict(name=uuid.uuid4().hex)
+ group_id = self.identity_api.create_group(group_id, group)['id']
+
+ # Create a couple of users and add them to the group.
+ user_id = None
+ user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex)
+ user_1_id = self.identity_api.create_user(user_id, user)['id']
+
+ self.identity_api.add_user_to_group(user_1_id, group_id)
+
+ user_id = None
+ user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex)
+ user_2_id = self.identity_api.create_user(user_id, user)['id']
+
+ self.identity_api.add_user_to_group(user_2_id, group_id)
+
+ # Delete user 2
+ # NOTE(blk-u): need to go directly to user interface to keep from
+ # updating the group.
+ self.identity_api.driver.user.delete(user_2_id)
+
+ # List group users and verify only user 1.
+ res = self.identity_api.list_users_in_group(group_id)
+
+ self.assertEqual(len(res), 1, "Expected 1 entry (user_1)")
+ self.assertEqual(res[0]['id'], user_1_id, "Expected user 1 id")
+
+ def test_list_domains(self):
+ domains = self.identity_api.list_domains()
+ self.assertEquals(
+ domains,
+ [assignment.DEFAULT_DOMAIN])
+
+ def test_authenticate_requires_simple_bind(self):
+ user = {
+ 'id': 'no_meta',
+ 'name': 'NO_META',
+ 'domain_id': test_backend.DEFAULT_DOMAIN_ID,
+ 'password': 'no_meta2',
+ 'enabled': True,
+ }
+ self.identity_api.create_user(user['id'], user)
+ self.identity_api.add_user_to_project(self.tenant_baz['id'],
+ user['id'])
+ self.identity_api.driver.user.LDAP_USER = None
+ self.identity_api.driver.user.LDAP_PASSWORD = None
+
+ self.assertRaises(AssertionError,
+ self.identity_api.authenticate,
+ user_id=user['id'],
+ password=None)
+
+ # (spzala)The group and domain crud tests below override the standard ones
+ # in test_backend.py so that we can exclude the update name test, since we
+ # do not yet support the update of either group or domain names with LDAP.
+ # In the tests below, the update is demonstrated by updating description.
+ # Refer to bug 1136403 for more detail.
+ def test_group_crud(self):
+ group = {
+ 'id': uuid.uuid4().hex,
+ 'domain_id': CONF.identity.default_domain_id,
+ 'name': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex}
+ self.identity_api.create_group(group['id'], group)
+ group_ref = self.identity_api.get_group(group['id'])
+ self.assertDictEqual(group_ref, group)
+ group['description'] = uuid.uuid4().hex
+ self.identity_api.update_group(group['id'], group)
+ group_ref = self.identity_api.get_group(group['id'])
+ self.assertDictEqual(group_ref, group)
+
+ self.identity_api.delete_group(group['id'])
+ self.assertRaises(exception.GroupNotFound,
+ self.identity_api.get_group,
+ group['id'])
+
+
+class LDAPIdentity(test.TestCase, BaseLDAPIdentity):
+ def setUp(self):
+ super(LDAPIdentity, self).setUp()
+ self._set_config()
+ self.clear_database()
+
+ self.load_backends()
+ self.load_fixtures(default_fixtures)
+
def test_configurable_allowed_project_actions(self):
tenant = {'id': 'fake1', 'name': 'fake1', 'enabled': True}
self.identity_api.create_project('fake1', tenant)
@@ -175,17 +364,6 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
self.identity_api.delete_role,
self.role_member['id'])
- def test_user_filter(self):
- user_ref = self.identity_api.get_user(self.user_foo['id'])
- self.user_foo.pop('password')
- self.assertDictEqual(user_ref, self.user_foo)
-
- CONF.ldap.user_filter = '(CN=DOES_NOT_MATCH)'
- self.load_backends()
- self.assertRaises(exception.UserNotFound,
- self.identity_api.get_user,
- self.user_foo['id'])
-
def test_project_filter(self):
tenant_ref = self.identity_api.get_project(self.tenant_bar['id'])
self.assertDictEqual(tenant_ref, self.tenant_bar)
@@ -216,40 +394,6 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
self.identity_api.get_user,
'dumb')
- def test_user_attribute_mapping(self):
- CONF.ldap.user_name_attribute = 'sn'
- CONF.ldap.user_mail_attribute = 'mail'
- CONF.ldap.user_enabled_attribute = 'enabled'
- self.clear_database()
- self.load_backends()
- self.load_fixtures(default_fixtures)
- user_ref = self.identity_api.get_user(self.user_two['id'])
- self.assertEqual(user_ref['id'], self.user_two['id'])
- self.assertEqual(user_ref['name'], self.user_two['name'])
- self.assertEqual(user_ref['email'], self.user_two['email'])
-
- CONF.ldap.user_name_attribute = 'mail'
- CONF.ldap.user_mail_attribute = 'sn'
- self.load_backends()
- user_ref = self.identity_api.get_user(self.user_two['id'])
- self.assertEqual(user_ref['id'], self.user_two['id'])
- self.assertEqual(user_ref['name'], self.user_two['email'])
- self.assertEqual(user_ref['email'], self.user_two['name'])
-
- def test_user_attribute_ignore(self):
- CONF.ldap.user_attribute_ignore = ['email', 'password',
- 'tenant_id', 'enabled', 'tenants']
- self.clear_database()
- self.load_backends()
- self.load_fixtures(default_fixtures)
- user_ref = self.identity_api.get_user(self.user_two['id'])
- self.assertEqual(user_ref['id'], self.user_two['id'])
- self.assertNotIn('email', user_ref)
- self.assertNotIn('password', user_ref)
- self.assertNotIn('tenant_id', user_ref)
- self.assertNotIn('enabled', user_ref)
- self.assertNotIn('tenants', user_ref)
-
def test_project_attribute_mapping(self):
CONF.ldap.tenant_name_attribute = 'ou'
CONF.ldap.tenant_desc_attribute = 'description'
@@ -384,30 +528,6 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
# TODO(henry-nash): These need to be removed when the full LDAP implementation
# is submitted - see Bugs 1092187, 1101287, 1101276, 1101289
- # (spzala)The group and domain crud tests below override the standard ones
- # in test_backend.py so that we can exclude the update name test, since we
- # do not yet support the update of either group or domain names with LDAP.
- # In the tests below, the update is demonstrated by updating description.
- # Refer to bug 1136403 for more detail.
- def test_group_crud(self):
- group = {
- 'id': uuid.uuid4().hex,
- 'domain_id': CONF.identity.default_domain_id,
- 'name': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex}
- self.identity_api.create_group(group['id'], group)
- group_ref = self.identity_api.get_group(group['id'])
- self.assertDictEqual(group_ref, group)
- group['description'] = uuid.uuid4().hex
- self.identity_api.update_group(group['id'], group)
- group_ref = self.identity_api.get_group(group['id'])
- self.assertDictEqual(group_ref, group)
-
- self.identity_api.delete_group(group['id'])
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.get_group,
- group['id'])
-
def test_domain_crud(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'enabled': True, 'description': uuid.uuid4().hex}
@@ -434,33 +554,6 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
self.identity_api.get_domain,
domain['id'])
- def test_get_role_grant_by_user_and_project(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
-
- def test_get_role_grants_for_user_and_project_404(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
-
- def test_add_role_grant_to_user_and_project_404(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
-
- def test_remove_role_grant_from_user_and_project(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
-
- def test_get_and_remove_role_grant_by_group_and_project(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
-
- def test_get_and_remove_role_grant_by_group_and_domain(self):
- raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
-
- def test_get_and_remove_role_grant_by_user_and_domain(self):
- raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
-
- def test_get_and_remove_correct_role_grant_from_a_mix(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
-
- def test_list_role_assignments_unfiltered(self):
- raise nose.exc.SkipTest('Blocked by bug 1195019')
-
def test_project_crud(self):
# NOTE(topol): LDAP implementation does not currently support the
# updating of a project name so this method override
@@ -488,18 +581,6 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
self.identity_api.get_project,
project['id'])
- def test_get_and_remove_role_grant_by_group_and_cross_domain(self):
- raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
-
- def test_get_and_remove_role_grant_by_user_and_cross_domain(self):
- raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
-
- def test_role_grant_by_group_and_cross_domain_project(self):
- raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
-
- def test_role_grant_by_user_and_cross_domain_project(self):
- raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
-
def test_multi_role_grant_by_user_group_on_project_domain(self):
# This is a partial implementation of the standard test that
# is defined in test_backend.py. It omits both domain and
@@ -549,117 +630,6 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
user1['id'], CONF.identity.default_domain_id)
self.assertEquals(len(combined_role_list), 0)
- def test_multi_group_grants_on_project_domain(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
-
- def test_delete_role_with_user_and_group_grants(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
-
- def test_delete_user_with_group_project_domain_links(self):
- raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
-
- def test_delete_group_with_user_project_domain_links(self):
- raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
-
- def test_list_user_projects(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
-
- def test_create_duplicate_user_name_in_different_domains(self):
- raise nose.exc.SkipTest('Blocked by bug 1101276')
-
- def test_create_duplicate_project_name_in_different_domains(self):
- raise nose.exc.SkipTest('Blocked by bug 1101276')
-
- def test_create_duplicate_group_name_in_different_domains(self):
- raise nose.exc.SkipTest(
- 'N/A: LDAP does not support multiple domains')
-
- def test_move_user_between_domains(self):
- raise nose.exc.SkipTest('Blocked by bug 1101276')
-
- def test_move_user_between_domains_with_clashing_names_fails(self):
- raise nose.exc.SkipTest('Blocked by bug 1101276')
-
- def test_move_group_between_domains(self):
- raise nose.exc.SkipTest(
- 'N/A: LDAP does not support multiple domains')
-
- def test_move_group_between_domains_with_clashing_names_fails(self):
- raise nose.exc.SkipTest('Blocked by bug 1101276')
-
- def test_move_project_between_domains(self):
- raise nose.exc.SkipTest('Blocked by bug 1101276')
-
- def test_move_project_between_domains_with_clashing_names_fails(self):
- raise nose.exc.SkipTest('Blocked by bug 1101276')
-
- def test_get_roles_for_user_and_domain(self):
- raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
-
- def test_list_group_members_missing_entry(self):
- """List group members with deleted user.
-
- If a group has a deleted entry for a member, the non-deleted members
- are returned.
-
- """
-
- # Create a group
- group_id = None
- group = dict(name=uuid.uuid4().hex)
- group_id = self.identity_api.create_group(group_id, group)['id']
-
- # Create a couple of users and add them to the group.
- user_id = None
- user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex)
- user_1_id = self.identity_api.create_user(user_id, user)['id']
-
- self.identity_api.add_user_to_group(user_1_id, group_id)
-
- user_id = None
- user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex)
- user_2_id = self.identity_api.create_user(user_id, user)['id']
-
- self.identity_api.add_user_to_group(user_2_id, group_id)
-
- # Delete user 2
- # NOTE(blk-u): need to go directly to user interface to keep from
- # updating the group.
- self.identity_api.driver.user.delete(user_2_id)
-
- # List group users and verify only user 1.
- res = self.identity_api.list_users_in_group(group_id)
-
- self.assertEqual(len(res), 1, "Expected 1 entry (user_1)")
- self.assertEqual(res[0]['id'], user_1_id, "Expected user 1 id")
-
- def test_list_domains(self):
- domains = self.identity_api.list_domains()
- self.assertEquals(
- domains,
- [{'id': CONF.identity.default_domain_id,
- 'name': 'Default',
- 'enabled': True}])
-
- def test_authenticate_requires_simple_bind(self):
- user = {
- 'id': 'no_meta',
- 'name': 'NO_META',
- 'domain_id': test_backend.DEFAULT_DOMAIN_ID,
- 'password': 'no_meta2',
- 'enabled': True,
- }
- self.identity_api.create_user(user['id'], user)
- self.identity_api.add_user_to_project(self.tenant_baz['id'],
- user['id'])
- self.identity_api.driver.user.LDAP_USER = None
- self.identity_api.driver.user.LDAP_PASSWORD = None
-
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- user_id=user['id'],
- password=None)
-
class LDAPIdentityEnabledEmulation(LDAPIdentity):
def setUp(self):
@@ -733,3 +703,41 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
def test_user_enable_attribute_mask(self):
raise nose.exc.SkipTest(
"Enabled emulation conflicts with enabled mask")
+
+
+class LdapIdentitySqlAssignment(sql.Base, test.TestCase, BaseLDAPIdentity):
+
+ def _set_config(self):
+ self.config([test.etcdir('keystone.conf.sample'),
+ test.testsdir('test_overrides.conf'),
+ test.testsdir('backend_ldap_sql.conf')])
+
+ def setUp(self):
+ self._set_config()
+ self.clear_database()
+ self.load_backends()
+ self.engine = self.get_engine()
+ sql.ModelBase.metadata.create_all(bind=self.engine)
+ self.load_fixtures(default_fixtures)
+ #defaulted by the data load
+ self.user_foo['enabled'] = True
+
+ def tearDown(self):
+ sql.ModelBase.metadata.drop_all(bind=self.engine)
+ self.engine.dispose()
+ sql.set_global_engine(None)
+
+ def test_domain_crud(self):
+ pass
+
+ def test_list_domains(self):
+ domains = self.identity_api.list_domains()
+ self.assertEquals(domains, [assignment.DEFAULT_DOMAIN])
+
+ def test_project_filter(self):
+ raise nose.exc.SkipTest(
+ 'N/A: Not part of SQL backend')
+
+ def test_role_filter(self):
+ raise nose.exc.SkipTest(
+ 'N/A: Not part of SQL backend')
diff --git a/tests/test_drivers.py b/tests/test_drivers.py
index 439b0d30..c83c1a89 100644
--- a/tests/test_drivers.py
+++ b/tests/test_drivers.py
@@ -1,6 +1,7 @@
import inspect
import unittest2 as unittest
+from keystone import assignment
from keystone import catalog
from keystone import exception
from keystone import identity
@@ -35,6 +36,10 @@ class TestDrivers(unittest.TestCase):
if name[0] != '_' and callable(method):
self.assertMethodNotImplemented(method)
+ def test_assignment_driver_unimplemented(self):
+ interface = assignment.Driver()
+ self.assertInterfaceNotImplemented(interface)
+
def test_catalog_driver_unimplemented(self):
interface = catalog.Driver()
self.assertInterfaceNotImplemented(interface)