summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLukas Slebodnik <lslebodn@redhat.com>2015-08-04 12:47:58 +0200
committerJakub Hrozek <jhrozek@redhat.com>2015-08-05 12:51:43 +0200
commit089db891b8a7a94b5666e8cffb1d7b359d6aeb6e (patch)
treed70d2c28581358a6afc5863fc90e1970656ea5bd
parentc3baf4d7c0cbd139d96fd04f6b3c175d2f99de6c (diff)
downloadsssd-089db891b8a7a94b5666e8cffb1d7b359d6aeb6e.tar.gz
sssd-089db891b8a7a94b5666e8cffb1d7b359d6aeb6e.tar.xz
sssd-089db891b8a7a94b5666e8cffb1d7b359d6aeb6e.zip
test_memory_cache: Test invalidation with sss_cache
Reviewed-by: Michal Židek <mzidek@redhat.com>
-rw-r--r--src/tests/intg/test_memory_cache.py176
1 files changed, 176 insertions, 0 deletions
diff --git a/src/tests/intg/test_memory_cache.py b/src/tests/intg/test_memory_cache.py
index c809a4b6d..1fd577e65 100644
--- a/src/tests/intg/test_memory_cache.py
+++ b/src/tests/intg/test_memory_cache.py
@@ -20,6 +20,7 @@ import os
import stat
import ent
import grp
+import pwd
import config
import signal
import subprocess
@@ -570,3 +571,178 @@ def test_initgroups_without_change_in_membership(ldap_conn, sanity_rfc2307):
# everything should be in memory cache
run_simple_test_with_initgroups()
+
+
+def assert_mc_records_for_user1():
+ ent.assert_passwd_by_name(
+ 'user1',
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1001,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ ent.assert_group_by_name(
+ "group1",
+ dict(mem=ent.contains_only("user1", "user11", "user21")))
+ ent.assert_group_by_gid(
+ 2001,
+ dict(mem=ent.contains_only("user1", "user11", "user21")))
+ ent.assert_group_by_name(
+ "group0x",
+ dict(mem=ent.contains_only("user1", "user2", "user3")))
+ ent.assert_group_by_gid(
+ 2000,
+ dict(mem=ent.contains_only("user1", "user2", "user3")))
+
+ assert_initgroups_equal("user1", 2001, [2000, 2001])
+
+
+def assert_missing_mc_records_for_user1():
+ with pytest.raises(KeyError):
+ pwd.getpwnam("user1")
+ with pytest.raises(KeyError):
+ pwd.getpwuid(1001)
+
+ for gid in [2000, 2001]:
+ with pytest.raises(KeyError):
+ grp.getgrgid(gid)
+ for group in ["group0x", "group1"]:
+ with pytest.raises(KeyError):
+ grp.getgrnam(group)
+
+ (res, err, _) = sssd_id.call_sssd_initgroups("user1", 2001)
+ assert res == sssd_id.NssReturnCode.UNAVAIL, \
+ "Initgroups should not find anything after invalidation of mc.\n" \
+ "User %s, errno:%d" % (user, err)
+
+
+def test_invalidate_user_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-u", "user1"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_user_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-u", "user1"])
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_users_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-U"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_users_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-U"])
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_group_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-g", "group1"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_group_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-g", "group1"])
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_groups_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-G"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_groups_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-G"])
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_everything_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-E"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_everything_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-E"])
+
+ assert_missing_mc_records_for_user1()