summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-05-21 23:45:36 +0000
committerGerrit Code Review <review@openstack.org>2013-05-21 23:45:36 +0000
commit7db13c58bc8fe0aac35b68ac7296cabee2b5004d (patch)
treea4a368566a8ef21180f78acd466c40efb993e975
parent6d776b779df29905505b2a27eb1bc1f832eaf0a3 (diff)
parent5d26d091e128c474739848cd987ccacb326c5cc8 (diff)
downloadkeystone-7db13c58bc8fe0aac35b68ac7296cabee2b5004d.tar.gz
keystone-7db13c58bc8fe0aac35b68ac7296cabee2b5004d.tar.xz
keystone-7db13c58bc8fe0aac35b68ac7296cabee2b5004d.zip
Merge "Read-only default domain for LDAP (bug 1168726)"
-rw-r--r--keystone/common/ldap/core.py8
-rw-r--r--keystone/identity/backends/ldap/core.py191
-rw-r--r--tests/test_backend.py53
-rw-r--r--tests/test_backend_ldap.py92
4 files changed, 176 insertions, 168 deletions
diff --git a/keystone/common/ldap/core.py b/keystone/common/ldap/core.py
index 7113fbbb..da3be1df 100644
--- a/keystone/common/ldap/core.py
+++ b/keystone/common/ldap/core.py
@@ -393,6 +393,8 @@ class BaseLdap(object):
except ldap.NO_SUCH_OBJECT:
raise self._not_found(id)
+ return self.get(id)
+
def delete(self, id):
if not self.allow_delete:
action = _('LDAP %s delete') % self.options_name
@@ -688,14 +690,16 @@ class EnabledEmuMixIn(BaseLdap):
if 'enabled' not in self.attribute_ignore and self.enabled_emulation:
data = values.copy()
enabled_value = data.pop('enabled', None)
- super(EnabledEmuMixIn, self).update(object_id, data, old_obj)
+ ref = super(EnabledEmuMixIn, self).update(object_id, data, old_obj)
if enabled_value is not None:
if enabled_value:
self._add_enabled(object_id)
else:
self._remove_enabled(object_id)
+ return ref
else:
- super(EnabledEmuMixIn, self).update(object_id, values, old_obj)
+ return super(EnabledEmuMixIn, self).update(
+ object_id, values, old_obj)
def delete(self, object_id):
if self.enabled_emulation:
diff --git a/keystone/identity/backends/ldap/core.py b/keystone/identity/backends/ldap/core.py
index 03d97700..1fad1120 100644
--- a/keystone/identity/backends/ldap/core.py
+++ b/keystone/identity/backends/ldap/core.py
@@ -28,10 +28,16 @@ from keystone import config
from keystone import exception
from keystone import identity
-CONF = config.CONF
+CONF = config.CONF
LOG = logging.getLogger(__name__)
+DEFAULT_DOMAIN = {
+ 'id': CONF.identity.default_domain_id,
+ 'name': 'Default',
+ 'enabled': True
+}
+
class Identity(identity.Driver):
def __init__(self):
@@ -45,7 +51,6 @@ class Identity(identity.Driver):
self.project = ProjectApi(CONF)
self.role = RoleApi(CONF)
self.group = GroupApi(CONF)
- self.domain = DomainApi(CONF)
def get_connection(self, user=None, password=None):
if self.LDAP_URL.startswith('fake://'):
@@ -59,6 +64,36 @@ class Identity(identity.Driver):
conn.simple_bind_s(user, password)
return conn
+ def _validate_domain(self, ref):
+ """Validate that either the default domain or nothing is specified.
+
+ Also removes the domain from the ref so that LDAP doesn't have to
+ persist the attribute.
+
+ """
+ ref = ref.copy()
+ domain_id = ref.pop('domain_id', CONF.identity.default_domain_id)
+ self._validate_domain_id(domain_id)
+ return ref
+
+ def _validate_domain_id(self, domain_id):
+ """Validate that the domain ID specified belongs to the default domain.
+
+ """
+ if domain_id != CONF.identity.default_domain_id:
+ raise exception.DomainNotFound(domain_id=domain_id)
+
+ def _set_default_domain(self, ref):
+ """Overrides any domain reference with the default domain."""
+ if isinstance(ref, dict):
+ ref = ref.copy()
+ ref['domain_id'] = CONF.identity.default_domain_id
+ return ref
+ elif isinstance(ref, list):
+ return [self._set_default_domain(x) for x in ref]
+ else:
+ raise ValueError(_('Expected dict or list: %s') % type(ref))
+
# Identity interface
def authenticate(self, user_id=None, tenant_id=None, password=None):
"""Authenticate based on a user, tenant and password.
@@ -97,38 +132,38 @@ class Identity(identity.Driver):
except exception.MetadataNotFound:
metadata_ref = {}
- return (identity.filter_user(user_ref), tenant_ref, metadata_ref)
+ user_ref = self._set_default_domain(identity.filter_user(user_ref))
+ return (user_ref, tenant_ref, metadata_ref)
def get_project(self, tenant_id):
- return self.project.get(tenant_id)
+ return self._set_default_domain(self.project.get(tenant_id))
def list_projects(self):
- return self.project.get_all()
+ return self._set_default_domain(self.project.get_all())
def get_project_by_name(self, tenant_name, domain_id):
- # TODO(henry-nash): Use domain_id once domains are implemented
- # in LDAP backend
- return self.project.get_by_name(tenant_name)
+ self._validate_domain_id(domain_id)
+ return self._set_default_domain(self.project.get_by_name(tenant_name))
def _get_user(self, user_id):
return self.user.get(user_id)
def get_user(self, user_id):
- return identity.filter_user(self._get_user(user_id))
+ ref = identity.filter_user(self._get_user(user_id))
+ return self._set_default_domain(ref)
def list_users(self):
- return self.user.get_all()
+ return self._set_default_domain(self.user.get_all())
def get_user_by_name(self, user_name, domain_id):
- # TODO(henry-nash): Use domain_id once domains are implemented
- # in LDAP backend
- return identity.filter_user(self.user.get_by_name(user_name))
+ self._validate_domain_id(domain_id)
+ ref = identity.filter_user(self.user.get_by_name(user_name))
+ return self._set_default_domain(ref)
def get_metadata(self, user_id=None, tenant_id=None,
domain_id=None, group_id=None):
- # FIXME(henry-nash): Use domain_id and group_id once domains
- # and groups are implemented in LDAP backend
-
+ if domain_id is not None:
+ raise NotImplemented('Domain metadata not supported by LDAP.')
if not self.get_project(tenant_id) or not self.get_user(user_id):
return {}
@@ -149,7 +184,7 @@ class Identity(identity.Driver):
def get_project_users(self, tenant_id):
self.get_project(tenant_id)
- return self.project.get_users(tenant_id)
+ return self._set_default_domain(self.project.get_users(tenant_id))
def get_roles_for_user_and_project(self, user_id, tenant_id):
self.get_user(user_id)
@@ -165,30 +200,35 @@ class Identity(identity.Driver):
# CRUD
def create_user(self, user_id, user):
+ user = self._validate_domain(user)
user['name'] = clean.user_name(user['name'])
user['enabled'] = clean.user_enabled(user.get('enabled', True))
- return identity.filter_user(self.user.create(user))
+ user_ref = self.user.create(user)
+ return self._set_default_domain(identity.filter_user(user_ref))
def update_user(self, user_id, user):
+ user = self._validate_domain(user)
if 'name' in user:
user['name'] = clean.user_name(user['name'])
if 'enabled' in user:
user['enabled'] = clean.user_enabled(user['enabled'])
- return self.user.update(user_id, user)
+ return self._set_default_domain(self.user.update(user_id, user))
def create_project(self, tenant_id, tenant):
+ tenant = self._validate_domain(tenant)
tenant['name'] = clean.project_name(tenant['name'])
data = tenant.copy()
if 'id' not in data or data['id'] is None:
data['id'] = str(uuid.uuid4().hex)
if 'description' in data and data['description'] in ['', None]:
data.pop('description')
- return self.project.create(data)
+ return self._set_default_domain(self.project.create(data))
def update_project(self, tenant_id, tenant):
+ tenant = self._validate_domain(tenant)
if 'name' in tenant:
tenant['name'] = clean.project_name(tenant['name'])
- return self.project.update(tenant_id, tenant)
+ return self._set_default_domain(self.project.update(tenant_id, tenant))
def create_metadata(self, user_id, tenant_id, metadata):
return {}
@@ -229,16 +269,18 @@ class Identity(identity.Driver):
self.role.update(role_id, role)
def create_group(self, group_id, group):
+ group = self._validate_domain(group)
group['name'] = clean.group_name(group['name'])
- return self.group.create(group)
+ return self._set_default_domain(self.group.create(group))
def get_group(self, group_id):
- return self.group.get(group_id)
+ return self._set_default_domain(self.group.get(group_id))
def update_group(self, group_id, group):
+ group = self._validate_domain(group)
if 'name' in group:
group['name'] = clean.group_name(group['name'])
- return self.group.update(group_id, group)
+ return self._set_default_domain(self.group.update(group_id, group))
def delete_group(self, group_id):
return self.group.delete(group_id)
@@ -255,14 +297,14 @@ class Identity(identity.Driver):
def list_groups_for_user(self, user_id):
self.get_user(user_id)
- return self.group.list_user_groups(user_id)
+ return self._set_default_domain(self.group.list_user_groups(user_id))
def list_groups(self):
- return self.group.get_all()
+ return self._set_default_domain(self.group.get_all())
def list_users_in_group(self, group_id):
self.get_group(group_id)
- return self.group.list_group_users(group_id)
+ return self._set_default_domain(self.group.list_group_users(group_id))
def check_user_in_group(self, user_id, group_id):
self.get_user(user_id)
@@ -276,28 +318,25 @@ class Identity(identity.Driver):
return found
def create_domain(self, domain_id, domain):
- domain['name'] = clean.domain_name(domain['name'])
- return self.domain.create(domain)
+ if domain_id == CONF.identity.default_domain_id:
+ msg = 'Duplicate ID, %s.' % domain_id
+ raise exception.Conflict(type='domain', details=msg)
+ raise exception.Forbidden('Domains are read-only against LDAP')
def get_domain(self, domain_id):
- try:
- return self.domain.get(domain_id)
- except exception.NotFound:
- raise exception.DomainNotFound(domain_id=domain_id)
+ self._validate_domain_id(domain_id)
+ return DEFAULT_DOMAIN
def update_domain(self, domain_id, domain):
- if 'name' in domain:
- domain['name'] = clean.domain_name(domain['name'])
- return self.domain.update(domain_id, domain)
+ self._validate_domain_id(domain_id)
+ raise exception.Forbidden('Domains are read-only against LDAP')
def delete_domain(self, domain_id):
- try:
- return self.domain.delete(domain_id)
- except ldap.NO_SUCH_OBJECT:
- raise exception.DomainNotFound(domain_id=domain_id)
+ self._validate_domain_id(domain_id)
+ raise exception.Forbidden('Domains are read-only against LDAP')
def list_domains(self):
- return self.domain.get_all()
+ return [DEFAULT_DOMAIN]
# TODO(termie): remove this and move cross-api calls into driver
@@ -341,12 +380,6 @@ class ApiShim(object):
self._group = GroupApi(self.conf)
return self._group
- @property
- def domain(self):
- if not self._domain:
- self._domain = DomainApi(self.conf)
- return self._domain
-
# TODO(termie): remove this and move cross-api calls into driver
class ApiShimMixin(object):
@@ -580,7 +613,7 @@ class ProjectApi(common_ldap.EnabledEmuMixIn, common_ldap.BaseLdap,
if old_obj['name'] != values['name']:
msg = 'Changing Name not supported by LDAP'
raise exception.NotImplemented(message=msg)
- super(ProjectApi, self).update(id, values, old_obj)
+ return super(ProjectApi, self).update(id, values, old_obj)
class UserRoleAssociation(object):
@@ -791,7 +824,7 @@ class RoleApi(common_ldap.BaseLdap, ApiShimMixin):
raise exception.Conflict('Cannot duplicate name %s' % old_name)
except exception.NotFound:
pass
- super(RoleApi, self).update(role_id, role)
+ return super(RoleApi, self).update(role_id, role)
def delete(self, id):
conn = self.get_connection()
@@ -878,7 +911,7 @@ class GroupApi(common_ldap.BaseLdap, ApiShimMixin):
if old_obj['name'] != values['name']:
msg = _('Changing Name not supported by LDAP')
raise exception.NotImplemented(message=msg)
- super(GroupApi, self).update(id, values, old_obj)
+ return super(GroupApi, self).update(id, values, old_obj)
def add_user(self, user_id, group_id):
conn = self.get_connection()
@@ -937,61 +970,3 @@ class GroupApi(common_ldap.BaseLdap, ApiShimMixin):
" from the group. The user will be ignored.") %
dict(user_dn=user_dn, group_dn=group_dn))
return users
-
-
-class DomainApi(common_ldap.EnabledEmuMixIn, common_ldap.BaseLdap,
- ApiShimMixin):
- DEFAULT_OU = 'ou=Domains'
- DEFAULT_STRUCTURAL_CLASSES = []
- DEFAULT_OBJECTCLASS = 'groupOfNames'
- DEFAULT_ID_ATTR = 'cn'
- DEFAULT_MEMBER_ATTRIBUTE = 'member'
- DEFAULT_ATTRIBUTE_IGNORE = []
- options_name = 'domain'
- attribute_mapping = {'name': 'ou',
- 'description': 'description',
- 'domainId': 'cn',
- 'enabled': 'enabled'}
- model = models.Domain
-
- def __init__(self, conf):
- super(DomainApi, self).__init__(conf)
- self.api = ApiShim(conf)
- self.attribute_mapping['name'] = conf.ldap.domain_name_attribute
- self.attribute_mapping['description'] = conf.ldap.domain_desc_attribute
- self.attribute_mapping['enabled'] = conf.ldap.tenant_enabled_attribute
- self.member_attribute = (getattr(conf.ldap, 'domain_member_attribute')
- or self.DEFAULT_MEMBER_ATTRIBUTE)
- self.attribute_ignore = (getattr(conf.ldap, 'domain_attribute_ignore')
- or self.DEFAULT_ATTRIBUTE_IGNORE)
-
- def get(self, id, filter=None):
- """Replaces exception.NotFound with exception.DomainNotFound."""
- try:
- return super(DomainApi, self).get(id, filter)
- except exception.NotFound:
- raise exception.DomainNotFound(domain_id=id)
-
- def create(self, values):
- self.affirm_unique(values)
- data = values.copy()
- if data.get('id') is None:
- data['id'] = uuid.uuid4().hex
- return super(DomainApi, self).create(data)
-
- def delete(self, id):
- if self.subtree_delete_enabled:
- super(DomainApi, self).deleteTree(id)
- else:
- self.role_api.roles_delete_subtree_by_type(id, 'Domain')
- super(DomainApi, self).delete(id)
-
- def update(self, id, values):
- try:
- old_obj = self.get(id)
- except exception.NotFound:
- raise exception.DomainNotFound(domain_id=id)
- if old_obj['name'] != values['name']:
- msg = _('Changing Name not supported by LDAP')
- raise exception.NotImplemented(message=msg)
- super(DomainApi, self).update(id, values, old_obj)
diff --git a/tests/test_backend.py b/tests/test_backend.py
index dd9bc91b..c465d128 100644
--- a/tests/test_backend.py
+++ b/tests/test_backend.py
@@ -32,6 +32,11 @@ NULL_OBJECT = object()
class IdentityTests(object):
+ def _get_domain_fixture(self):
+ domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain['id'], domain)
+ return domain
+
def test_project_add_and_remove_user_role(self):
user_refs = self.identity_api.get_project_users(self.tenant_bar['id'])
self.assertNotIn(self.user_two['id'], [x['id'] for x in user_refs])
@@ -569,8 +574,7 @@ class IdentityTests(object):
- Check non-existing domain gives DomainNotFound
"""
- new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
- self.identity_api.create_domain(new_domain['id'], new_domain)
+ new_domain = self._get_domain_fixture()
new_user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'password': uuid.uuid4().hex, 'enabled': True,
'domain_id': new_domain['id']}
@@ -1581,12 +1585,14 @@ class IdentityTests(object):
self.assertTrue(x for x in users if x['id'] == test_user['id'])
def test_list_groups(self):
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
- self.identity_api.create_domain(domain['id'], domain)
- group1 = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],
- 'name': uuid.uuid4().hex}
- group2 = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],
- 'name': uuid.uuid4().hex}
+ group1 = {
+ 'id': uuid.uuid4().hex,
+ 'domain_id': CONF.identity.default_domain_id,
+ 'name': uuid.uuid4().hex}
+ group2 = {
+ 'id': uuid.uuid4().hex,
+ 'domain_id': CONF.identity.default_domain_id,
+ 'name': uuid.uuid4().hex}
self.identity_man.create_group({}, group1['id'], group1)
self.identity_man.create_group({}, group2['id'], group2)
groups = self.identity_api.list_groups()
@@ -1736,8 +1742,7 @@ class IdentityTests(object):
self.assertEqual(tenant_ref['enabled'], tenant['enabled'])
def test_add_user_to_group(self):
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
- self.identity_api.create_domain(domain['id'], domain)
+ domain = self._get_domain_fixture()
new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],
'name': uuid.uuid4().hex}
self.identity_man.create_group({}, new_group['id'], new_group)
@@ -1756,8 +1761,7 @@ class IdentityTests(object):
self.assertTrue(found)
def test_add_user_to_group_404(self):
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
- self.identity_api.create_domain(domain['id'], domain)
+ domain = self._get_domain_fixture()
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'password': uuid.uuid4().hex, 'enabled': True,
'domain_id': domain['id']}
@@ -1776,8 +1780,7 @@ class IdentityTests(object):
new_group['id'])
def test_check_user_in_group(self):
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
- self.identity_api.create_domain(domain['id'], domain)
+ domain = self._get_domain_fixture()
new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],
'name': uuid.uuid4().hex}
self.identity_man.create_group({}, new_group['id'], new_group)
@@ -1790,10 +1793,10 @@ class IdentityTests(object):
self.identity_api.check_user_in_group(new_user['id'], new_group['id'])
def test_check_user_not_in_group(self):
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
- self.identity_api.create_domain(domain['id'], domain)
- new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],
- 'name': uuid.uuid4().hex}
+ new_group = {
+ 'id': uuid.uuid4().hex,
+ 'domain_id': CONF.identity.default_domain_id,
+ 'name': uuid.uuid4().hex}
self.identity_man.create_group({}, new_group['id'], new_group)
self.assertRaises(exception.UserNotFound,
self.identity_api.check_user_in_group,
@@ -1801,8 +1804,7 @@ class IdentityTests(object):
new_group['id'])
def test_list_users_in_group(self):
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
- self.identity_api.create_domain(domain['id'], domain)
+ domain = self._get_domain_fixture()
new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],
'name': uuid.uuid4().hex}
self.identity_man.create_group({}, new_group['id'], new_group)
@@ -1820,8 +1822,7 @@ class IdentityTests(object):
self.assertTrue(found)
def test_remove_user_from_group(self):
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
- self.identity_api.create_domain(domain['id'], domain)
+ domain = self._get_domain_fixture()
new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],
'name': uuid.uuid4().hex}
self.identity_man.create_group({}, new_group['id'], new_group)
@@ -1839,8 +1840,7 @@ class IdentityTests(object):
self.assertFalse(x['id'] == new_group['id'])
def test_remove_user_from_group_404(self):
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
- self.identity_api.create_domain(domain['id'], domain)
+ domain = self._get_domain_fixture()
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'password': uuid.uuid4().hex, 'enabled': True,
'domain_id': domain['id']}
@@ -1976,9 +1976,8 @@ class IdentityTests(object):
domain['id'])
def test_user_crud(self):
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
- self.identity_api.create_domain(domain['id'], domain)
- user = {'domain_id': domain['id'], 'id': uuid.uuid4().hex,
+ user = {'domain_id': CONF.identity.default_domain_id,
+ 'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex, 'password': 'passw0rd'}
self.identity_api.create_user(user['id'], user)
user_ref = self.identity_api.get_user(user['id'])
diff --git a/tests/test_backend_ldap.py b/tests/test_backend_ldap.py
index ef409902..c0bceea5 100644
--- a/tests/test_backend_ldap.py
+++ b/tests/test_backend_ldap.py
@@ -32,6 +32,9 @@ CONF = config.CONF
class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
+ def _get_domain_fixture(self):
+ """Domains in LDAP are read-only, so just return the static one."""
+ return self.identity_api.get_domain(CONF.identity.default_domain_id)
def clear_database(self):
db = fakeldap.FakeShelve().get_instance()
@@ -392,8 +395,11 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
# In the tests below, the update is demonstrated by updating description.
# Refer to bug 1136403 for more detail.
def test_group_crud(self):
- group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex, 'description': uuid.uuid4().hex}
+ group = {
+ 'id': uuid.uuid4().hex,
+ 'domain_id': CONF.identity.default_domain_id,
+ 'name': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex}
self.identity_api.create_group(group['id'], group)
group_ref = self.identity_api.get_group(group['id'])
self.assertDictEqual(group_ref, group)
@@ -410,15 +416,25 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
def test_domain_crud(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'enabled': True, 'description': uuid.uuid4().hex}
- self.identity_api.create_domain(domain['id'], domain)
- domain_ref = self.identity_api.get_domain(domain['id'])
- self.assertDictEqual(domain_ref, domain)
- domain['description'] = uuid.uuid4().hex
- self.identity_api.update_domain(domain['id'], domain)
- domain_ref = self.identity_api.get_domain(domain['id'])
- self.assertDictEqual(domain_ref, domain)
-
- self.identity_api.delete_domain(domain['id'])
+ with self.assertRaises(exception.Forbidden):
+ self.identity_api.create_domain(domain['id'], domain)
+ with self.assertRaises(exception.Conflict):
+ self.identity_api.create_domain(
+ CONF.identity.default_domain_id, domain)
+ with self.assertRaises(exception.DomainNotFound):
+ domain_ref = self.identity_api.get_domain(domain['id'])
+ with self.assertRaises(exception.DomainNotFound):
+ domain['description'] = uuid.uuid4().hex
+ self.identity_api.update_domain(domain['id'], domain)
+ with self.assertRaises(exception.Forbidden):
+ self.identity_api.update_domain(
+ CONF.identity.default_domain_id, domain)
+ with self.assertRaises(exception.DomainNotFound):
+ self.identity_api.get_domain(domain['id'])
+ with self.assertRaises(exception.DomainNotFound):
+ self.identity_api.delete_domain(domain['id'])
+ with self.assertRaises(exception.Forbidden):
+ self.identity_api.delete_domain(CONF.identity.default_domain_id)
self.assertRaises(exception.DomainNotFound,
self.identity_api.get_domain,
domain['id'])
@@ -439,10 +455,10 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
raise nose.exc.SkipTest('Blocked by bug 1101287')
def test_get_and_remove_role_grant_by_group_and_domain(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_get_and_remove_role_grant_by_user_and_domain(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_get_and_remove_correct_role_grant_from_a_mix(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
@@ -452,7 +468,7 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
# updating of a project name so this method override
# provides a different update test
project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
- 'domain_id': uuid.uuid4().hex,
+ 'domain_id': CONF.identity.default_domain_id,
'description': uuid.uuid4().hex
}
self.identity_api.create_project(project['id'], project)
@@ -475,34 +491,34 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
project['id'])
def test_get_and_remove_role_grant_by_group_and_cross_domain(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_get_and_remove_role_grant_by_user_and_cross_domain(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_role_grant_by_group_and_cross_domain_project(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_role_grant_by_user_and_cross_domain_project(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_multi_role_grant_by_user_group_on_project_domain(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_delete_role_with_user_and_group_grants(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
def test_delete_user_with_group_project_domain_links(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_delete_group_with_user_project_domain_links(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_list_user_projects(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
def test_get_project_users(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_create_duplicate_user_name_in_different_domains(self):
raise nose.exc.SkipTest('Blocked by bug 1101276')
@@ -511,7 +527,8 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
raise nose.exc.SkipTest('Blocked by bug 1101276')
def test_create_duplicate_group_name_in_different_domains(self):
- raise nose.exc.SkipTest('Blocked by bug 1101276')
+ raise nose.exc.SkipTest(
+ 'N/A: LDAP does not support multiple domains')
def test_move_user_between_domains(self):
raise nose.exc.SkipTest('Blocked by bug 1101276')
@@ -520,7 +537,8 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
raise nose.exc.SkipTest('Blocked by bug 1101276')
def test_move_group_between_domains(self):
- raise nose.exc.SkipTest('Blocked by bug 1101276')
+ raise nose.exc.SkipTest(
+ 'N/A: LDAP does not support multiple domains')
def test_move_group_between_domains_with_clashing_names_fails(self):
raise nose.exc.SkipTest('Blocked by bug 1101276')
@@ -532,7 +550,7 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
raise nose.exc.SkipTest('Blocked by bug 1101276')
def test_get_roles_for_user_and_domain(self):
- raise nose.exc.SkipTest('Blocked by bug 1101287')
+ raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_list_group_members_missing_entry(self):
"""List group members with deleted user.
@@ -569,6 +587,14 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
self.assertEqual(len(res), 1, "Expected 1 entry (user_1)")
self.assertEqual(res[0]['id'], user_1_id, "Expected user 1 id")
+ def test_list_domains(self):
+ domains = self.identity_api.list_domains()
+ self.assertEquals(
+ domains,
+ [{'id': CONF.identity.default_domain_id,
+ 'name': 'Default',
+ 'enabled': True}])
+
class LDAPIdentityEnabledEmulation(LDAPIdentity):
def setUp(self):
@@ -614,10 +640,11 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
# NOTE(topol): LDAPIdentityEnabledEmulation will create an
# enabled key in the project dictionary so this
# method override handles this side-effect
- project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
- 'domain_id': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex
- }
+ project = {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': CONF.identity.default_domain_id,
+ 'description': uuid.uuid4().hex}
self.identity_api.create_project(project['id'], project)
project_ref = self.identity_api.get_project(project['id'])
@@ -639,8 +666,11 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
project['id'])
def test_user_crud(self):
- user = {'domain_id': uuid.uuid4().hex, 'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex, 'password': 'passw0rd'}
+ user = {
+ 'id': uuid.uuid4().hex,
+ 'domain_id': CONF.identity.default_domain_id,
+ 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex}
self.identity_man.create_user({}, user['id'], user)
user['enabled'] = True
user_ref = self.identity_api.get_user(user['id'])