#
# LDAP integration test
#
# Copyright (c) 2015 Red Hat, Inc.
# Author: Lukas Slebodnik
#
# This is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""Integration tests of sssd lookups before and after sssd is stopped.

Every test starts sssd against a throw-away OpenLDAP instance, performs
NSS lookups, and (in the ``*_with_mc`` / invalidation tests) repeats them
after ``stop_sssd()`` to check what is still answered once the daemon is
gone.
"""
import grp
import os
import pwd
import signal
import stat
import subprocess
import time

import pytest

import config
import ds_openldap
import ent
import ldap_ent
import sssd_id
from util import unindent

LDAP_BASE_DN = "dc=example,dc=com"

# (name, uid, primary gid) of every user load_data_to_ldap() creates.
LDAP_USERS = (
    ("user1", 1001, 2001),
    ("user2", 1002, 2002),
    ("user3", 1003, 2003),
    ("user11", 1011, 2001),
    ("user12", 1012, 2002),
    ("user13", 1013, 2003),
    ("user21", 1021, 2001),
    ("user22", 1022, 2002),
    ("user23", 1023, 2003),
)

# (name, gid, member names) of every group load_data_to_ldap() creates.
LDAP_GROUPS = (
    ("group1", 2001, ("user1", "user11", "user21")),
    ("group2", 2002, ("user2", "user12", "user22")),
    ("group3", 2003, ("user3", "user13", "user23")),
    ("group0x", 2000, ("user1", "user2", "user3")),
    ("group1x", 2010, ("user11", "user12", "user13")),
    ("group2x", 2020, ("user21", "user22", "user23")),
)


@pytest.fixture(scope="module")
def ds_inst(request):
    """LDAP server instance fixture"""
    instance = ds_openldap.DSOpenLDAP(
        config.PREFIX, 10389, LDAP_BASE_DN,
        "cn=admin", "Secret123")
    try:
        instance.setup()
    except BaseException:
        # Clean up a half-initialized server on any failure, then re-raise.
        instance.teardown()
        raise
    request.addfinalizer(instance.teardown)
    return instance


@pytest.fixture(scope="module")
def ldap_conn(request, ds_inst):
    """LDAP server connection fixture"""
    conn = ds_inst.bind()
    # Tests reach the server parameters (URL, base DN) via the connection.
    conn.ds_inst = ds_inst
    request.addfinalizer(conn.unbind_s)
    return conn


def create_ldap_fixture(request, ldap_conn, ent_list):
    """Add LDAP entries and add teardown for removing them

    The finalizer is registered before the first entry is added and only
    removes entries that were actually added, so a failure part-way through
    does not leave stale entries in the (module-scoped) LDAP server.
    """
    added_dns = []

    def teardown():
        for dn in added_dns:
            ldap_conn.delete_s(dn)
    request.addfinalizer(teardown)

    for entry in ent_list:
        ldap_conn.add_s(entry[0], entry[1])
        added_dns.append(entry[0])


def create_conf_fixture(request, contents):
    """Generate sssd.conf and add teardown for removing it"""
    with open(config.CONF_PATH, "w") as conf:
        conf.write(contents)
    # Restrict the file to owner read/write.
    os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR)
    request.addfinalizer(lambda: os.unlink(config.CONF_PATH))


def stop_sssd():
    """Terminate sssd and wait until its process can no longer be signalled

    The pid is read from config.PIDFILE_PATH. After SIGTERM the process is
    probed once a second with SIGCONT; the first failing os.kill() ends the
    wait.
    """
    with open(config.PIDFILE_PATH, "r") as pid_file:
        pid = int(pid_file.read())
    os.kill(pid, signal.SIGTERM)
    while True:
        try:
            os.kill(pid, signal.SIGCONT)
        except OSError:
            break
        time.sleep(1)


def create_sssd_fixture(request):
    """Start sssd and add teardown for stopping it and removing state"""
    if subprocess.call(["sssd", "-D", "-f"]) != 0:
        raise Exception("sssd start failed")

    def teardown():
        try:
            stop_sssd()
        except (EnvironmentError, ValueError):
            # Best effort: many tests stop sssd themselves, so the pid file
            # may be missing or unparsable, or the process already gone.
            pass
        subprocess.call(["sss_cache", "-E"])
        for state_dir in (config.DB_PATH, config.MCACHE_PATH):
            for path in os.listdir(state_dir):
                os.unlink(os.path.join(state_dir, path))
    request.addfinalizer(teardown)


def load_data_to_ldap(request, ldap_conn):
    """Store LDAP_USERS and LDAP_GROUPS in LDAP for the duration of a test"""
    ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn)
    for name, uid, gid in LDAP_USERS:
        ent_list.add_user(name, uid, gid)
    for name, gid, members in LDAP_GROUPS:
        ent_list.add_group(name, gid, list(members))
    create_ldap_fixture(request, ldap_conn, ent_list)


def _make_sssd_conf(ldap_conn, extra_domain_options=()):
    """Return sssd.conf text for the LDAP domain served by ldap_conn

    extra_domain_options is a sequence of "key = value" lines appended to
    the [domain/LDAP] section.
    """
    return unindent("""\
        [sssd]
        domains = LDAP
        services = nss

        [nss]

        [domain/LDAP]
        ldap_auth_disable_tls_never_use_in_production = true
        ldap_schema = rfc2307
        id_provider = ldap
        auth_provider = ldap
        sudo_provider = ldap
        ldap_uri = {ldap_conn.ds_inst.ldap_url}
        ldap_search_base = {ldap_conn.ds_inst.base_dn}
        {extra_options}
    """).format(ldap_conn=ldap_conn,
                extra_options="\n".join(extra_domain_options))


@pytest.fixture
def sanity(request, ldap_conn):
    """Populate LDAP and run sssd with the plain LDAP domain configuration"""
    load_data_to_ldap(request, ldap_conn)
    create_conf_fixture(request, _make_sssd_conf(ldap_conn))
    create_sssd_fixture(request)
    return None


@pytest.fixture
def fqname(request, ldap_conn):
    """Like sanity, with use_fully_qualified_names enabled"""
    load_data_to_ldap(request, ldap_conn)
    conf = _make_sssd_conf(ldap_conn, ["use_fully_qualified_names = true"])
    create_conf_fixture(request, conf)
    create_sssd_fixture(request)
    return None


@pytest.fixture
def fqname_case_insensitive(request, ldap_conn):
    """Like fqname, with case_sensitive disabled"""
    load_data_to_ldap(request, ldap_conn)
    conf = _make_sssd_conf(ldap_conn, ["use_fully_qualified_names = true",
                                       "case_sensitive = false"])
    create_conf_fixture(request, conf)
    create_sssd_fixture(request)
    return None


def _passwd_entry(name, uid, gid):
    """Return the passwd attributes expected for a user stored in LDAP"""
    return dict(name=name, passwd='*', uid=uid, gid=gid,
                gecos=str(uid), shell='/bin/bash')


def _assert_user1_passwd():
    """Assert that user1 resolves both by name and by uid"""
    ent.assert_passwd_by_name('user1', _passwd_entry('user1', 1001, 2001))
    ent.assert_passwd_by_uid(1001, _passwd_entry('user1', 1001, 2001))


def _assert_group_members(name, gid, members):
    """Assert the exact membership of a group, by name and by gid"""
    ent.assert_group_by_name(name, dict(mem=ent.contains_only(*members)))
    ent.assert_group_by_gid(gid, dict(mem=ent.contains_only(*members)))


def test_getpwnam(ldap_conn, sanity):
    """Every LDAP user resolves by name and by uid"""
    for name, uid, gid in LDAP_USERS:
        ent.assert_passwd_by_name(name, _passwd_entry(name, uid, gid))
        ent.assert_passwd_by_uid(uid, _passwd_entry(name, uid, gid))


def test_getpwnam_with_mc(ldap_conn, sanity):
    """User lookups give the same answers after sssd is stopped"""
    test_getpwnam(ldap_conn, sanity)
    stop_sssd()
    test_getpwnam(ldap_conn, sanity)


def test_getgrnam_simple(ldap_conn, sanity):
    """Every LDAP group resolves by name and by gid"""
    for name, gid, _ in LDAP_GROUPS:
        ent.assert_group_by_name(name, dict(name=name, gid=gid))
        ent.assert_group_by_gid(gid, dict(name=name, gid=gid))


def test_getgrnam_simple_with_mc(ldap_conn, sanity):
    """Group lookups give the same answers after sssd is stopped"""
    test_getgrnam_simple(ldap_conn, sanity)
    stop_sssd()
    test_getgrnam_simple(ldap_conn, sanity)


def test_getgrnam_membership(ldap_conn, sanity):
    """Every LDAP group reports exactly its members"""
    for name, gid, members in LDAP_GROUPS:
        _assert_group_members(name, gid, members)


def test_getgrnam_membership_with_mc(ldap_conn, sanity):
    """Group membership is the same after sssd is stopped"""
    test_getgrnam_membership(ldap_conn, sanity)
    stop_sssd()
    test_getgrnam_membership(ldap_conn, sanity)


def _assert_gids(user, res, errno, gids, expected_gids):
    """Assert a successful group lookup returning exactly expected_gids

    The order of the gids is not significant.
    """
    assert res == sssd_id.NssReturnCode.SUCCESS, \
        "Could not find groups for user %s, %d" % (user, errno)

    assert sorted(gids) == sorted(expected_gids), \
        "result: %s\n expected %s" % (
            ", ".join(["%s" % s for s in sorted(gids)]),
            ", ".join(["%s" % s for s in sorted(expected_gids)])
        )


def assert_user_gids_equal(user, expected_gids):
    """Assert that sssd_id.get_user_gids(user) yields expected_gids"""
    (res, errno, gids) = sssd_id.get_user_gids(user)
    _assert_gids(user, res, errno, gids, expected_gids)


def test_initgroups(ldap_conn, sanity):
    """Every LDAP user belongs to exactly its two groups"""
    assert_user_gids_equal('user1', [2000, 2001])
    assert_user_gids_equal('user2', [2000, 2002])
    assert_user_gids_equal('user3', [2000, 2003])

    assert_user_gids_equal('user11', [2010, 2001])
    assert_user_gids_equal('user12', [2010, 2002])
    assert_user_gids_equal('user13', [2010, 2003])

    assert_user_gids_equal('user21', [2020, 2001])
    assert_user_gids_equal('user22', [2020, 2002])
    assert_user_gids_equal('user23', [2020, 2003])


def test_initgroups_with_mc(ldap_conn, sanity):
    """Group lists of users are the same after sssd is stopped"""
    test_initgroups(ldap_conn, sanity)
    stop_sssd()
    test_initgroups(ldap_conn, sanity)


def test_initgroups_fqname_with_mc(ldap_conn, fqname):
    """A fully qualified user keeps its group list after sssd is stopped"""
    assert_user_gids_equal('user1@LDAP', [2000, 2001])
    stop_sssd()
    assert_user_gids_equal('user1@LDAP', [2000, 2001])


def assert_initgroups_equal(user, primary_gid, expected_gids):
    """Assert that sssd_id.call_sssd_initgroups() yields expected_gids"""
    (res, errno, gids) = sssd_id.call_sssd_initgroups(user, primary_gid)
    _assert_gids(user, res, errno, gids, expected_gids)


def assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last,
                                  primary_gid, expected_gids):
    """Assert only the last-used spelling of a user survives stopping sssd

    All three spellings are resolved while sssd runs. After sssd is stopped
    the first two must fail with UNAVAIL and the last must still succeed.
    """
    assert_initgroups_equal(user1_case1, primary_gid, expected_gids)
    assert_initgroups_equal(user1_case2, primary_gid, expected_gids)
    assert_initgroups_equal(user1_case_last, primary_gid, expected_gids)
    stop_sssd()

    for user in (user1_case1, user1_case2):
        (res, errno, _) = sssd_id.call_sssd_initgroups(user, primary_gid)
        assert res == sssd_id.NssReturnCode.UNAVAIL, \
            "Initgroups for user shoudl fail user %s, %d, %d" % (
                user, res, errno)

    # Just the last invocation of initgroups should PASS
    # Otherwise, we would not be able to invalidate it
    assert_initgroups_equal(user1_case_last, primary_gid, expected_gids)


def test_initgroups_case_insensitive_with_mc1(ldap_conn,
                                              fqname_case_insensitive):
    """Last-spelling check with 'usEr1@LDAP' used last"""
    assert_stored_last_initgroups('User1@LDAP', 'uSer1@LDAP', 'usEr1@LDAP',
                                  2001, [2000, 2001])


def test_initgroups_case_insensitive_with_mc2(ldap_conn,
                                              fqname_case_insensitive):
    """Last-spelling check with 'uSer1@LDAP' used last"""
    assert_stored_last_initgroups('usEr1@LDAP', 'User1@LDAP', 'uSer1@LDAP',
                                  2001, [2000, 2001])


def test_initgroups_case_insensitive_with_mc3(ldap_conn,
                                              fqname_case_insensitive):
    """Last-spelling check with 'User1@LDAP' used last"""
    assert_stored_last_initgroups('uSer1@LDAP', 'usEr1@LDAP', 'User1@LDAP',
                                  2001, [2000, 2001])


def run_simple_test_with_initgroups():
    """Resolve user1, group1, the unrelated group2 and user1's initgroups"""
    _assert_user1_passwd()
    _assert_group_members("group1", 2001, ("user1", "user11", "user21"))

    # unrelated group to user1
    _assert_group_members("group2", 2002, ("user2", "user12", "user22"))

    assert_initgroups_equal("user1", 2001, [2000, 2001])


def test_invalidation_of_gids_after_initgroups(ldap_conn, sanity):
    """A membership change seen by initgroups invalidates the user's groups"""
    # the sssd cache was empty and not all user's group were
    # resolved with getgr{nm,gid}. Therefore there is a change in
    # group membership => user groups should be invalidated
    run_simple_test_with_initgroups()
    assert_initgroups_equal("user1", 2001, [2000, 2001])

    stop_sssd()

    _assert_user1_passwd()

    # unrelated group to user1 must be returned
    _assert_group_members("group2", 2002, ("user2", "user12", "user22"))

    assert_initgroups_equal("user1", 2001, [2000, 2001])

    # user groups must be invalidated
    for group in ["group1", "group0x"]:
        with pytest.raises(KeyError):
            grp.getgrnam(group)

    for gid in [2000, 2001]:
        with pytest.raises(KeyError):
            grp.getgrgid(gid)


def test_initgroups_without_change_in_membership(ldap_conn, sanity):
    """A refresh without membership change keeps the user's groups valid"""
    # the sssd cache was empty and not all user's group were
    # resolved with getgr{nm,gid}. Therefore there is a change in
    # group membership => user groups should be invalidated
    run_simple_test_with_initgroups()

    # invalidate cache
    subprocess.call(["sss_cache", "-E"])

    # all users and groups will be just refreshed from LDAP
    # but there will not be a change in group membership
    # user groups should not be invalidated
    run_simple_test_with_initgroups()

    stop_sssd()

    # everything should be in memory cache
    run_simple_test_with_initgroups()


def assert_mc_records_for_user1():
    """Assert that user1, both its groups and its initgroups all resolve"""
    _assert_user1_passwd()
    _assert_group_members("group1", 2001, ("user1", "user11", "user21"))
    _assert_group_members("group0x", 2000, ("user1", "user2", "user3"))

    assert_initgroups_equal("user1", 2001, [2000, 2001])


def assert_missing_mc_records_for_user1():
    """Assert that user1, its groups and its initgroups no longer resolve"""
    with pytest.raises(KeyError):
        pwd.getpwnam("user1")
    with pytest.raises(KeyError):
        pwd.getpwuid(1001)

    for gid in [2000, 2001]:
        with pytest.raises(KeyError):
            grp.getgrgid(gid)
    for group in ["group0x", "group1"]:
        with pytest.raises(KeyError):
            grp.getgrnam(group)

    (res, err, _) = sssd_id.call_sssd_initgroups("user1", 2001)
    assert res == sssd_id.NssReturnCode.UNAVAIL, \
        "Initgroups should not find anything after invalidation of mc.\n" \
        "User user1, errno:%d" % err


def _prime_mc_for_user1():
    """Resolve the full identity of user1 and check every record is there"""
    # initialize cache with full ID
    (res, errno, _) = sssd_id.get_user_groups("user1")
    # The message takes exactly one argument; a mismatched format string
    # would raise TypeError here instead of reporting the failure.
    assert res == sssd_id.NssReturnCode.SUCCESS, \
        "Could not find groups for user1, %d" % errno
    assert_mc_records_for_user1()


def _check_invalidation(sss_cache_args, invalidate_before_stop):
    """Run one invalidation scenario for user1

    The records of user1 are primed, then `sss_cache <sss_cache_args>` is
    run either before or after sssd is stopped; afterwards none of the
    records of user1 may resolve.
    """
    _prime_mc_for_user1()

    command = ["sss_cache"] + list(sss_cache_args)
    if invalidate_before_stop:
        subprocess.call(command)
        stop_sssd()
    else:
        stop_sssd()
        subprocess.call(command)

    assert_missing_mc_records_for_user1()


def test_invalidate_user_before_stop(ldap_conn, sanity):
    """sss_cache -u user1, then stop sssd"""
    _check_invalidation(["-u", "user1"], True)


def test_invalidate_user_after_stop(ldap_conn, sanity):
    """Stop sssd, then sss_cache -u user1"""
    _check_invalidation(["-u", "user1"], False)


def test_invalidate_users_before_stop(ldap_conn, sanity):
    """sss_cache -U, then stop sssd"""
    _check_invalidation(["-U"], True)


def test_invalidate_users_after_stop(ldap_conn, sanity):
    """Stop sssd, then sss_cache -U"""
    _check_invalidation(["-U"], False)


def test_invalidate_group_before_stop(ldap_conn, sanity):
    """sss_cache -g group1, then stop sssd"""
    _check_invalidation(["-g", "group1"], True)


def test_invalidate_group_after_stop(ldap_conn, sanity):
    """Stop sssd, then sss_cache -g group1"""
    _check_invalidation(["-g", "group1"], False)


def test_invalidate_groups_before_stop(ldap_conn, sanity):
    """sss_cache -G, then stop sssd"""
    _check_invalidation(["-G"], True)


def test_invalidate_groups_after_stop(ldap_conn, sanity):
    """Stop sssd, then sss_cache -G"""
    _check_invalidation(["-G"], False)


def test_invalidate_everything_before_stop(ldap_conn, sanity):
    """sss_cache -E, then stop sssd"""
    _check_invalidation(["-E"], True)


def test_invalidate_everything_after_stop(ldap_conn, sanity):
    """Stop sssd, then sss_cache -E"""
    _check_invalidation(["-E"], False)