-rw-r--r-- | keystone/identity/backends/ldap/core.py | 83 | ||||
-rw-r--r-- | tests/_ldap_livetest.py | 8 | ||||
-rw-r--r-- | tests/test_backend_ldap.py | 85 |
3 files changed, 80 insertions, 96 deletions
diff --git a/keystone/identity/backends/ldap/core.py b/keystone/identity/backends/ldap/core.py index cf8c53e1..2839a175 100644 --- a/keystone/identity/backends/ldap/core.py +++ b/keystone/identity/backends/ldap/core.py @@ -167,6 +167,13 @@ class Identity(identity.Driver): tenant_list.append(tenant['id']) return tenant_list + def get_tenant_users(self, tenant_id): + self.get_tenant(tenant_id) + user_list = [] + for user in self.tenant.get_users(tenant_id): + user_list.append(user) + return user_list + def get_roles_for_user_and_tenant(self, user_id, tenant_id): self.get_user(user_id) self.get_tenant(tenant_id) @@ -230,6 +237,30 @@ class Identity(identity.Driver): except ldap.NO_SUCH_OBJECT: raise exception.RoleNotFound(role_id=role_id) + def delete_tenant(self, tenant_id): + try: + return self.tenant.delete(tenant_id) + except ldap.NO_SUCH_OBJECT: + raise exception.TenantNotFound(tenant_id=tenant_id) + + def delete_user(self, user_id): + try: + return self.user.delete(user_id) + except ldap.NO_SUCH_OBJECT: + raise exception.UserNotFound(user_id=user_id) + + def remove_role_from_user_and_tenant(self, user_id, tenant_id, role_id): + return self.role.delete_user(role_id, user_id, tenant_id) + + def remove_user_from_tenant(self, tenant_id, user_id): + self.get_user(user_id) + self.get_tenant(tenant_id) + return self.tenant.remove_user(tenant_id, user_id) + + def update_role(self, role_id, role): + self.get_role(role_id) + self.role.update(role_id, role) + # TODO(termie): remove this and move cross-api calls into driver class ApiShim(object): @@ -338,8 +369,7 @@ class UserApi(common_ldap.BaseLdap, ApiShimMixin): except exception.NotFound: raise exception.UserNotFound(user_id=id) if old_obj.get('name') != values['name']: - raise exception.ValidationError('Cannot change user name') - + raise exception.Conflict('Cannot change user name') try: new_tenant = values['tenant_id'] except KeyError: 
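Review bot: In the `@@ -230` hunk, `remove_role_from_user_and_tenant` hands `user_id` and `tenant_id` straight to `self.role.delete_user` without the `self.get_user(user_id)` / `self.get_tenant(tenant_id)` lookups that `remove_user_from_tenant` in the same hunk and the existing `get_roles_for_user_and_tenant` perform, so a request naming an unknown user or tenant is answered by whatever the LDAP layer happens to raise instead of `UserNotFound` / `TenantNotFound`. Adding the same two lookups ahead of the `return` keeps the 404 semantics uniform across the driver: `self.get_user(user_id)` and `self.get_tenant(tenant_id)` before `return self.role.delete_user(role_id, user_id, tenant_id)`. `update_role` returns nothing while the sibling `delete_*` wrappers return the backend result; if callers of the driver expect the updated role back, `return self.role.update(role_id, role)` would be the consistent form (verify against the abstract driver). The `Identity` class continues outside the hunk.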
@@ -356,7 +386,7 @@ class UserApi(common_ldap.BaseLdap, ApiShimMixin): def delete(self, id): user = self.get(id) - if user.tenant_id: + if hasattr(user, 'tenant_id'): self.tenant_api.remove_user(user.tenant_id, id) super(UserApi, self).delete(id) @@ -501,10 +531,13 @@ class TenantApi(common_ldap.BaseLdap, ApiShimMixin): def remove_user(self, tenant_id, user_id): conn = self.get_connection() - conn.modify_s(self._id_to_dn(tenant_id), - [(ldap.MOD_DELETE, - self.member_attribute, - self.user_api._id_to_dn(user_id))]) + try: + conn.modify_s(self._id_to_dn(tenant_id), + [(ldap.MOD_DELETE, + self.member_attribute, + self.user_api._id_to_dn(user_id))]) + except ldap.NO_SUCH_ATTRIBUTE: + raise exception.NotFound(user_id) def get_users(self, tenant_id, role_id=None): tenant = self._ldap_get(tenant_id) @@ -655,6 +688,29 @@ class RoleApi(common_ldap.BaseLdap, ApiShimMixin): user_id=user_id, tenant_id=tenant_id) + def delete_user(self, role_id, user_id, tenant_id): + role_dn = self._subrole_id_to_dn(role_id, tenant_id) + conn = self.get_connection() + user_dn = self.user_api._id_to_dn(user_id) + try: + conn.modify_s(role_dn, [(ldap.MOD_DELETE, + self.member_attribute, user_dn)]) + except ldap.NO_SUCH_OBJECT: + if tenant_id is None or self.get(role_id) is None: + raise exception.RoleNotFound(role_id=roll_id) + attrs = [('objectClass', [self.object_class]), + (self.member_attribute, [user_dn])] + + if self.use_dumb_member: + attrs[1][1].append(self.DUMB_MEMBER_DN) + try: + conn.add_s(role_dn, attrs) + except Exception as inst: + raise inst + + except ldap.NO_SUCH_ATTRIBUTE: + raise exception.UserNotFound(user_id=user_id) + def get_by_service(self, service_id): roles = self.get_all('(service_id=%s)' % ldap_filter.escape_filter_chars(service_id)) 
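Review bot: The new `except ldap.NO_SUCH_ATTRIBUTE` branch in `TenantApi.remove_user` (the `@@ -501` hunk) raises `exception.NotFound(user_id)` with a positional argument, whereas every other raise in this file passes keyword arguments (`exception.UserNotFound(user_id=user_id)`); if the exception classes format a message template from keywords, the positional id is likely to be dropped from the message. `NO_SUCH_ATTRIBUTE` on a `MOD_DELETE` of the member attribute means the user DN was not a member of this tenant, not that the user is missing — the caller `remove_user_from_tenant` already verified both ids exist — so the generic `NotFound` also hides what was actually absent. Consider `raise exception.NotFound('User %s is not a member of tenant %s' % (user_id, tenant_id))`, or `exception.UserNotFound(user_id=user_id)` if the driver contract expects that type.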
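Review bot: `RoleApi.delete_user`'s `except ldap.NO_SUCH_OBJECT` handler (the `@@ -655` hunk) is an add path, not a delete path: when the tenant-scoped role entry does not exist it builds `attrs` containing the user's DN (plus the dumb member) and calls `conn.add_s(role_dn, attrs)`, so a request to *revoke* a role from a user whose tenant has no such role entry instead creates that entry with the user as its member — a grant performed by the revocation API. The same branch references `roll_id`, which is undefined, so even the intended `RoleNotFound` would surface as a `NameError`, and `self.get(role_id) is None` probably never holds if `get` raises `NotFound` like the other `*Api` classes in this file (verify). The handler should only raise: replace the whole `NO_SUCH_OBJECT` body with `raise exception.RoleNotFound(role_id=role_id)`, dropping the `attrs`, `conn.add_s` and `except Exception as inst: raise inst` lines; the `NO_SUCH_ATTRIBUTE` → `UserNotFound` mapping can stay.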
@@ -874,3 +930,16 @@ class RoleApi(common_ldap.BaseLdap, ApiShimMixin): role_id=role.id, user_id=user_id) return None + + def update(self, role_id, role): + if role['id'] != role_id: + raise exception.ValidationError('Cannot change role ID') + try: + old_name = self.get_by_name(role['name']) + raise exception.Conflict('Cannot duplicate name %s' % role['name']) + except exception.NotFound: + pass + try: + super(RoleApi, self).update(id, role) + except exception.NotFound: + raise exception.UserNotFound(user_id=id) diff --git a/tests/_ldap_livetest.py b/tests/_ldap_livetest.py index 1a7e3276..73a9bad0 100644 --- a/tests/_ldap_livetest.py +++ b/tests/_ldap_livetest.py @@ -22,6 +22,7 @@ from keystone import test import default_fixtures import test_backend +import test_backend_ldap CONF = config.CONF @@ -40,10 +41,9 @@ def delete_object(name): def clear_live_database(): - roles = ['keystone_admin'] + roles = ['keystone_admin', 'fake1', 'fake2', 'useless'] groups = ['baz', 'bar', 'tenent4add', 'fake1', 'fake2'] users = ['foo', 'two', 'fake1', 'fake2', 'no_meta'] - roles = ['keystone_admin', 'useless'] for group in groups: for role in roles: @@ -57,9 +57,9 @@ def clear_live_database(): delete_object('cn=%s,ou=Roles' % role) -class LDAPIdentity(test.TestCase, test_backend.IdentityTests): +class LiveLDAPIdentity(test_backend_ldap.LDAPIdentity): def setUp(self): - super(LDAPIdentity, self).setUp() + super(LiveLDAPIdentity, self).setUp() self.config([test.etcdir('keystone.conf.sample'), test.testsdir('test_overrides.conf'), test.testsdir('backend_liveldap.conf')]) diff --git a/tests/test_backend_ldap.py b/tests/test_backend_ldap.py index a8a80d97..5f0137c5 100644 --- a/tests/test_backend_ldap.py +++ b/tests/test_backend_ldap.py 
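Review bot: `RoleApi.update` (the `@@ -874` hunk) calls `super(RoleApi, self).update(id, role)` and `exception.UserNotFound(user_id=id)` — `id` here is the Python builtin, not the `role_id` parameter, so the base update is handed a function object as the entry id, and a missing role would be reported as a missing user. The duplicate-name check also does not exempt the role being updated: `get_by_name(role['name'])` finds the role itself whenever the name is unchanged, so every update that keeps the existing name raises `Conflict`, and `old_name` is assigned but never read. The rewrite below keeps the signature and the `ValidationError` on an id change; it assumes `get_by_name` returns an object with an `.id` attribute like the `role.id` used earlier in this class — confirm that before merging.
```python
    def update(self, role_id, role):
        """Rename a role in place; the id is immutable."""
        if role['id'] != role_id:
            raise exception.ValidationError('Cannot change role ID')
        try:
            existing = self.get_by_name(role['name'])
        except exception.NotFound:
            pass
        else:
            # Only a *different* role carrying this name is a conflict.
            if existing.id != role_id:
                raise exception.Conflict(
                    'Cannot duplicate name %s' % role['name'])
        try:
            super(RoleApi, self).update(role_id, role)
        except exception.NotFound:
            raise exception.RoleNotFound(role_id=role_id)
```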
@@ -44,102 +44,17 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests): self.identity_api = identity_ldap.Identity() self.load_fixtures(default_fixtures) - def test_delete_tenant_404(self): - self.assertRaises(exception.NotImplemented, - self.identity_api.delete_tenant, - uuid.uuid4().hex) - - def test_delete_user_404(self): - self.assertRaises(exception.NotImplemented, - self.identity_api.delete_user, - uuid.uuid4().hex) - - def test_rename_duplicate_role_name_fails(self): - role1 = { - 'id': 'fake1', - 'name': 'fake1name' - } - role2 = { - 'id': 'fake2', - 'name': 'fake2name' - } - self.identity_api.create_role('fake1', role1) - self.identity_api.create_role('fake2', role2) - role1['name'] = 'fake2name' - self.assertRaises(exception.NotImplemented, - self.identity_api.update_role, - 'fake1', - role1) - - def test_rename_duplicate_user_name_fails(self): - user1 = {'id': 'fake1', - 'name': 'fake1', - 'password': 'fakepass', - 'tenants': ['bar']} - user2 = {'id': 'fake2', - 'name': 'fake2', - 'password': 'fakepass', - 'tenants': ['bar']} - self.identity_api.create_user('fake1', user1) - self.identity_api.create_user('fake2', user2) - user2['name'] = 'fake1' - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - 'fake2', - user2) - - def test_delete_user_with_tenant_association(self): - self.assertRaises(exception.NotImplemented, - self.identity_api.delete_user, - uuid.uuid4().hex) - - def test_remove_user_from_tenant(self): - self.assertRaises(exception.NotImplemented, - self.identity_api.remove_user_from_tenant, - self.tenant_bar['id'], - self.user_foo['id']) - - def test_remove_user_from_tenant_404(self): - self.assertRaises(exception.NotImplemented, - self.identity_api.remove_user_from_tenant, - self.tenant_bar['id'], - self.user_foo['id']) - - def test_remove_role_from_user_and_tenant(self): - self.assertRaises(exception.NotImplemented, - self.identity_api.remove_role_from_user_and_tenant, - self.tenant_bar['id'], - 
self.user_foo['id'], - 'useless') - def test_role_crud(self): role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(role['id'], role) role_ref = self.identity_api.get_role(role['id']) role_ref_dict = dict((x, role_ref[x]) for x in role_ref) self.assertDictEqual(role_ref_dict, role) - - self.assertRaises(exception.NotImplemented, - self.identity_api.update_role, - role['id'], - role) - self.identity_api.delete_role(role['id']) self.assertRaises(exception.RoleNotFound, self.identity_api.get_role, role['id']) - def test_update_role_404(self): - self.assertRaises(exception.NotImplemented, - self.identity_api.update_role, - uuid.uuid4().hex, - {}) - - def test_get_tenant_users_404(self): - self.assertRaises(exception.NotImplemented, - self.identity_api.get_tenant_users, - tenant_id=uuid.uuid4().hex) - def test_build_tree(self): """Regression test for building the tree names """ |