summaryrefslogtreecommitdiffstats
path: root/src/tests
diff options
context:
space:
mode:
authorJakub Hrozek <jhrozek@redhat.com>2016-06-01 16:47:48 +0200
committerJakub Hrozek <jhrozek@redhat.com>2016-06-23 13:47:24 +0200
commitb8946a5dbde01a87465de707092716349a35248b (patch)
tree24a14bd8edfd221fe93cc7504eebc34902f06b7a /src/tests
parentd36f4db9bb5efc63b94190cca25adb08ee56971c (diff)
downloadsssd-b8946a5dbde01a87465de707092716349a35248b.tar.gz
sssd-b8946a5dbde01a87465de707092716349a35248b.tar.xz
sssd-b8946a5dbde01a87465de707092716349a35248b.zip
TESTS: Add an integration test for the timestamps cache
Reviewed-by: Sumit Bose <sbose@redhat.com>
Diffstat (limited to 'src/tests')
-rw-r--r--src/tests/intg/Makefile.am2
-rw-r--r--src/tests/intg/sssd_ldb.py84
-rw-r--r--src/tests/intg/test_ts_cache.py589
3 files changed, 675 insertions, 0 deletions
diff --git a/src/tests/intg/Makefile.am b/src/tests/intg/Makefile.am
index 739499731..d79f6424e 100644
--- a/src/tests/intg/Makefile.am
+++ b/src/tests/intg/Makefile.am
@@ -1,6 +1,7 @@
dist_noinst_DATA = \
config.py.m4 \
sssd_id.py \
+ sssd_ldb.py \
ds.py \
ds_openldap.py \
ent.py \
@@ -11,6 +12,7 @@ dist_noinst_DATA = \
test_local_domain.py \
util.py \
test_memory_cache.py \
+ test_ts_cache.py \
$(NULL)
config.py: config.py.m4
diff --git a/src/tests/intg/sssd_ldb.py b/src/tests/intg/sssd_ldb.py
new file mode 100644
index 000000000..bf5264b77
--- /dev/null
+++ b/src/tests/intg/sssd_ldb.py
@@ -0,0 +1,84 @@
+#
+# SSSD integration test - access the ldb cache
+#
+# Copyright (c) 2016 Red Hat, Inc.
+#
+# This is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import os
+import ldb
+import config
+
+
+class CacheType(object):
+ sysdb = 1
+ timestamps = 2
+
+
+class TsCacheEntry(object):
+ user = 1
+ group = 2
+
+
+class SssdLdb(object):
+ def __init__(self, domain_name):
+ self._domain_name = domain_name
+ self._sysdb = self._create_dbconn(CacheType.sysdb,
+ domain_name)
+ self._timestamps = self._create_dbconn(CacheType.timestamps,
+ domain_name)
+
+ def _create_dbconn(self, cache_type, domain_name):
+ if cache_type == CacheType.sysdb:
+ db_path = os.path.join(config.DB_PATH,
+ "cache_%s.ldb" % domain_name)
+ elif cache_type == CacheType.timestamps:
+ db_path = os.path.join(config.DB_PATH,
+ "timestamps_%s.ldb" % domain_name)
+ else:
+ raise ValueError("Unknown cache type\n")
+
+ pyldb = ldb.Ldb()
+ pyldb.connect(db_path)
+ return pyldb
+
+ def _get_dbconn(self, cache_type):
+ dbconn = None
+ if cache_type == CacheType.sysdb:
+ dbconn = self._sysdb
+ elif cache_type == CacheType.timestamps:
+ dbconn = self._timestamps
+ return dbconn
+
+ def _entry_basedn(self, entry_type):
+ if entry_type == TsCacheEntry.user:
+ rdn = "users"
+ elif entry_type == TsCacheEntry.group:
+ rdn = "groups"
+ else:
+ raise ValueError("Unknown entry type\n")
+ return "cn=%s,cn=%s,cn=sysdb" % (rdn, self._domain_name)
+
+ def _basedn(self, name, entry_type):
+ return "name=%s,%s" % (name, self._entry_basedn(entry_type))
+
+ def get_entry_attr(self, cache_type, entry_type, name, attr):
+ dbconn = self._get_dbconn(cache_type)
+ basedn = self._basedn(name, entry_type)
+
+ res = dbconn.search(base=basedn, scope=ldb.SCOPE_BASE, attrs=[attr])
+ if res.count != 1:
+ return None
+
+ return res.msgs[0].get(attr).get(0)
diff --git a/src/tests/intg/test_ts_cache.py b/src/tests/intg/test_ts_cache.py
new file mode 100644
index 000000000..6497d3680
--- /dev/null
+++ b/src/tests/intg/test_ts_cache.py
@@ -0,0 +1,589 @@
+#
+# LDAP integration test - test updating the sysdb and timestamp
+# cache
+#
+# Copyright (c) 2016 Red Hat, Inc.
+#
+# This is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import os
+import stat
+import ent
+import grp
+import pwd
+import config
+import signal
+import subprocess
+import time
+import ldap
+import pytest
+import ds_openldap
+import ldap_ent
+import sssd_ldb
+import sssd_id
+from util import unindent
+from util import run_shell
+
+LDAP_BASE_DN = "dc=example,dc=com"
+
+SCHEMA_RFC2307 = "rfc2307"
+SCHEMA_RFC2307_BIS = "rfc2307bis"
+
+TS_ATTRLIST = ("dataExpireTimestamp", "originalModifyTimestamp")
+
+
+@pytest.fixture(scope="module")
+def ds_inst(request):
+ """LDAP server instance fixture"""
+ ds_inst = ds_openldap.DSOpenLDAP(
+ config.PREFIX, 10389, LDAP_BASE_DN,
+ "cn=admin", "Secret123")
+ try:
+ ds_inst.setup()
+ except:
+ ds_inst.teardown()
+ raise
+ request.addfinalizer(lambda: ds_inst.teardown())
+ return ds_inst
+
+
+@pytest.fixture(scope="module")
+def ldap_conn(request, ds_inst):
+ """LDAP server connection fixture"""
+ ldap_conn = ds_inst.bind()
+ ldap_conn.ds_inst = ds_inst
+ request.addfinalizer(lambda: ldap_conn.unbind_s())
+ return ldap_conn
+
+
+def create_ldap_fixture(request, ldap_conn, ent_list):
+ """Add LDAP entries and add teardown for removing them"""
+ for entry in ent_list:
+ ldap_conn.add_s(entry[0], entry[1])
+
+ def teardown():
+ for entry in ent_list:
+ try:
+ ldap_conn.delete_s(entry[0])
+ except ldap.NO_SUCH_OBJECT:
+ # if the test already removed an object, it's fine
+ # to not care in the teardown
+ pass
+ request.addfinalizer(teardown)
+
+
+def create_conf_fixture(request, contents):
+ """Generate sssd.conf and add teardown for removing it"""
+ conf = open(config.CONF_PATH, "w")
+ conf.write(contents)
+ conf.close()
+ os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR)
+ request.addfinalizer(lambda: os.unlink(config.CONF_PATH))
+
+
+def stop_sssd():
+ pid_file = open(config.PIDFILE_PATH, "r")
+ pid = int(pid_file.read())
+ os.kill(pid, signal.SIGTERM)
+ while True:
+ try:
+ os.kill(pid, signal.SIGCONT)
+ except:
+ break
+ time.sleep(1)
+
+
+def create_sssd_fixture(request):
+ """Start sssd and add teardown for stopping it and removing state"""
+ if subprocess.call(["sssd", "-D", "-f"]) != 0:
+ raise Exception("sssd start failed")
+
+ def teardown():
+ try:
+ stop_sssd()
+ except:
+ pass
+ for path in os.listdir(config.DB_PATH):
+ os.unlink(config.DB_PATH + "/" + path)
+ for path in os.listdir(config.MCACHE_PATH):
+ os.unlink(config.MCACHE_PATH + "/" + path)
+ request.addfinalizer(teardown)
+
+
+def load_data_to_ldap(request, ldap_conn, schema):
+ ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn)
+ ent_list.add_user("user1", 1001, 2001)
+ ent_list.add_user("user11", 1011, 2001)
+ ent_list.add_user("user21", 1021, 2001)
+
+ if schema == SCHEMA_RFC2307_BIS:
+ ent_list.add_group_bis("group1", 2001, ("user1", "user11", "user21"))
+ elif schema == SCHEMA_RFC2307:
+ ent_list.add_group("group1", 2001, ("user1", "user11", "user21"))
+ create_ldap_fixture(request, ldap_conn, ent_list)
+
+
+def load_2307bis_data_to_ldap(request, ldap_conn):
+ return load_data_to_ldap(request, ldap_conn, SCHEMA_RFC2307_BIS)
+
+
+def load_2307_data_to_ldap(request, ldap_conn):
+ return load_data_to_ldap(request, ldap_conn, SCHEMA_RFC2307)
+
+
+@pytest.fixture
+def setup_rfc2307bis(request, ldap_conn):
+ load_2307bis_data_to_ldap(request, ldap_conn)
+
+ conf = unindent("""\
+ [sssd]
+ domains = LDAP
+ services = nss
+
+ [nss]
+ memcache_timeout = 1
+
+ [domain/LDAP]
+ ldap_schema = rfc2307bis
+ id_provider = ldap
+ auth_provider = ldap
+ sudo_provider = ldap
+ ldap_group_object_class = groupOfNames
+ ldap_uri = {ldap_conn.ds_inst.ldap_url}
+ ldap_search_base = {ldap_conn.ds_inst.base_dn}
+ """).format(**locals())
+ create_conf_fixture(request, conf)
+ create_sssd_fixture(request)
+ return None
+
+
+@pytest.fixture
+def setup_rfc2307(request, ldap_conn):
+ load_2307_data_to_ldap(request, ldap_conn)
+
+ conf = unindent("""\
+ [sssd]
+ domains = LDAP
+ services = nss
+
+ [nss]
+ memcache_timeout = 1
+
+ [domain/LDAP]
+ ldap_schema = rfc2307
+ id_provider = ldap
+ auth_provider = ldap
+ sudo_provider = ldap
+ ldap_uri = {ldap_conn.ds_inst.ldap_url}
+ ldap_search_base = {ldap_conn.ds_inst.base_dn}
+ """).format(**locals())
+ create_conf_fixture(request, conf)
+ create_sssd_fixture(request)
+ return None
+
+
+@pytest.fixture
+def ldb_examine(request):
+ ldb_conn = sssd_ldb.SssdLdb('LDAP')
+ return ldb_conn
+
+
+def invalidate_group(name):
+ subprocess.call(["sss_cache", "-g", name])
+
+
+def invalidate_user(name):
+ subprocess.call(["sss_cache", "-u", name])
+
+
+def get_attrs(ldb_conn, type, name, attr_list):
+ sysdb_attrs = dict()
+ ts_attrs = dict()
+
+ for attr in attr_list:
+ sysdb_attrs[attr] = ldb_conn.get_entry_attr(
+ sssd_ldb.CacheType.sysdb,
+ type, name, attr)
+ ts_attrs[attr] = ldb_conn.get_entry_attr(
+ sssd_ldb.CacheType.timestamps,
+ type, name, attr)
+ return (sysdb_attrs, ts_attrs)
+
+
+def get_group_attrs(ldb_conn, name, attr_list):
+ return get_attrs(ldb_conn, sssd_ldb.TsCacheEntry.group, name, attr_list)
+
+
+def get_user_attrs(ldb_conn, name, attr_list):
+ return get_attrs(ldb_conn, sssd_ldb.TsCacheEntry.user, name, attr_list)
+
+
+def assert_same_attrval(adict1, adict2, attr_name):
+ assert adict1.get(attr_name) is not None and \
+ adict1.get(attr_name) == adict2.get(attr_name)
+
+
+def assert_diff_attrval(adict1, adict2, attr_name):
+ assert adict1.get(attr_name) is not None and \
+ adict1.get(attr_name) != adict2.get(attr_name)
+
+
+def prime_cache_group(ldb_conn, name, members):
+ ent.assert_group_by_name(
+ name,
+ dict(mem=ent.contains_only(*members)))
+ sysdb_attrs, ts_attrs = get_group_attrs(ldb_conn, name, TS_ATTRLIST)
+ assert_same_attrval(sysdb_attrs, ts_attrs, "dataExpireTimestamp")
+ assert_same_attrval(sysdb_attrs, ts_attrs, "originalModifyTimestamp")
+
+ # just to force different stamps and make sure memcache is gone
+ time.sleep(1)
+ invalidate_group(name)
+
+ return sysdb_attrs, ts_attrs
+
+
+def prime_cache_user(ldb_conn, name, primary_gid):
+ # calling initgroups would add the initgExpire timestamp attribute and make sure
+ # that sss_cache doesn't add it with a value of 1, triggering a sysdb update
+ (res, errno, gids) = sssd_id.call_sssd_initgroups(name, primary_gid)
+ assert res == sssd_id.NssReturnCode.SUCCESS
+
+ sysdb_attrs, ts_attrs = get_user_attrs(ldb_conn, name, TS_ATTRLIST)
+ assert_same_attrval(sysdb_attrs, ts_attrs, "dataExpireTimestamp")
+ assert_same_attrval(sysdb_attrs, ts_attrs, "originalModifyTimestamp")
+
+ # just to force different stamps and make sure memcache is gone
+ time.sleep(1)
+ invalidate_user(name)
+
+ return sysdb_attrs, ts_attrs
+
+
+def test_group_2307bis_update_same_modstamp(ldap_conn,
+ ldb_examine,
+ setup_rfc2307bis):
+ """
+ Test that a group update with the same modifyTimestamp does not trigger
+ sysdb cache update
+ """
+ ldb_conn = ldb_examine
+ old_sysdb_attrs, old_ts_attrs = prime_cache_group(
+ ldb_conn, "group1",
+ ("user1", "user11", "user21"))
+
+ ent.assert_group_by_name(
+ "group1",
+ dict(mem=ent.contains_only("user1", "user11", "user21")))
+ sysdb_attrs, ts_attrs = get_group_attrs(ldb_conn, "group1", TS_ATTRLIST)
+
+ assert_same_attrval(sysdb_attrs, old_sysdb_attrs, "dataExpireTimestamp")
+ assert_same_attrval(sysdb_attrs, old_sysdb_attrs, "originalModifyTimestamp")
+
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "dataExpireTimestamp")
+ assert_same_attrval(ts_attrs, old_ts_attrs, "originalModifyTimestamp")
+
+
+def test_group_2307bis_update_same_attrs(ldap_conn,
+ ldb_examine,
+ setup_rfc2307bis):
+ """
+ Test that a group update with a different modifyTimestamp but the same
+ attrs does not trigger sysdb cache update
+ """
+ ldb_conn = ldb_examine
+ old_sysdb_attrs, old_ts_attrs = prime_cache_group(
+ ldb_conn, "group1",
+ ("user1", "user11", "user21"))
+
+ # modify an argument we don't save to the cache. This will bump the
+ # modifyTimestamp attribute, but the attributes themselves will be the same
+ # from sssd's point of view
+ ldap_conn.modify_s("cn=group1,ou=Groups," + ldap_conn.ds_inst.base_dn,
+ [(ldap.MOD_ADD, "description", "group one")])
+ # wait for slapd to change its database
+ time.sleep(1)
+
+ ent.assert_group_by_name(
+ "group1",
+ dict(mem=ent.contains_only("user1", "user11", "user21")))
+ sysdb_attrs, ts_attrs = get_group_attrs(ldb_conn, "group1", TS_ATTRLIST)
+
+ assert_same_attrval(sysdb_attrs, old_sysdb_attrs, "dataExpireTimestamp")
+ assert_same_attrval(sysdb_attrs, old_sysdb_attrs, "originalModifyTimestamp")
+
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "dataExpireTimestamp")
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "originalModifyTimestamp")
+
+
+def test_group_2307bis_update_diff_attrs(ldap_conn,
+ ldb_examine,
+ setup_rfc2307bis):
+ """
+ Test that a group update with different attribute triggers cache update
+ """
+ ldb_conn = ldb_examine
+ old_sysdb_attrs, old_ts_attrs = prime_cache_group(
+ ldb_conn, "group1",
+ ("user1", "user11", "user21"))
+
+ ldap_conn.modify_s("cn=group1,ou=Groups," + ldap_conn.ds_inst.base_dn,
+ [(ldap.MOD_DELETE, "member",
+ "uid=user1,ou=Users," + ldap_conn.ds_inst.base_dn)])
+ # wait for slapd to change its database
+ time.sleep(1)
+
+ ent.assert_group_by_name(
+ "group1",
+ dict(mem=ent.contains_only("user11", "user21")))
+ sysdb_attrs, ts_attrs = get_group_attrs(ldb_conn, "group1", TS_ATTRLIST)
+
+ assert_diff_attrval(sysdb_attrs, old_sysdb_attrs, "dataExpireTimestamp")
+ assert_diff_attrval(sysdb_attrs, old_sysdb_attrs, "originalModifyTimestamp")
+
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "dataExpireTimestamp")
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "originalModifyTimestamp")
+
+
+def test_group_2307bis_delete_group(ldap_conn,
+ ldb_examine,
+ setup_rfc2307bis):
+ """
+ Test that deleting a group removes it from both caches
+ """
+ ldb_conn = ldb_examine
+ old_sysdb_attrs, old_ts_attrs = prime_cache_group(
+ ldb_conn, "group1",
+ ("user1", "user11", "user21"))
+
+ e = ldap_ent.group_bis(ldap_conn.ds_inst.base_dn, "group1", 2001)
+ ldap_conn.delete_s(e[0])
+ # wait for slapd to change its database
+ time.sleep(1)
+
+ with pytest.raises(KeyError):
+ grp.getgrnam("group1")
+
+ sysdb_attrs, ts_attrs = get_group_attrs(ldb_conn, "group1", TS_ATTRLIST)
+ assert sysdb_attrs.get("dataExpireTimestamp") is None
+ assert sysdb_attrs.get("originalModifyTimestamp") is None
+ assert ts_attrs.get("dataExpireTimestamp") is None
+ assert ts_attrs.get("originalModifyTimestamp") is None
+
+
+def test_group_2307_update_same_modstamp(ldap_conn,
+ ldb_examine,
+ setup_rfc2307):
+ """
+ Test that a group update with the same modifyTimestamp does not trigger
+ sysdb cache update
+ """
+ ldb_conn = ldb_examine
+ old_sysdb_attrs, old_ts_attrs = prime_cache_group(
+ ldb_conn, "group1",
+ ("user1", "user11", "user21"))
+
+ ent.assert_group_by_name(
+ "group1",
+ dict(mem=ent.contains_only("user1", "user11", "user21")))
+ sysdb_attrs, ts_attrs = get_group_attrs(ldb_conn, "group1", TS_ATTRLIST)
+
+ assert_same_attrval(sysdb_attrs, old_sysdb_attrs, "dataExpireTimestamp")
+ assert_same_attrval(sysdb_attrs, old_sysdb_attrs, "originalModifyTimestamp")
+
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "dataExpireTimestamp")
+ assert_same_attrval(ts_attrs, old_ts_attrs, "originalModifyTimestamp")
+
+
+def test_group_2307_update_same_attrs(ldap_conn,
+ ldb_examine,
+ setup_rfc2307):
+ """
+ Test that a group update with a different modifyTimestamp but the same
+ attrs does not trigger sysdb cache update
+ """
+ ldb_conn = ldb_examine
+ old_sysdb_attrs, old_ts_attrs = prime_cache_group(
+ ldb_conn, "group1",
+ ("user1", "user11", "user21"))
+
+ # modify an argument we don't save to the cache. This will bump the
+ # modifyTimestamp attribute, but the attributes themselves will be the same
+ # from sssd's point of view
+ ldap_conn.modify_s("cn=group1,ou=Groups," + ldap_conn.ds_inst.base_dn,
+ [(ldap.MOD_ADD, "description", "group one")])
+ # wait for slapd to change its database
+ time.sleep(1)
+
+ ent.assert_group_by_name(
+ "group1",
+ dict(mem=ent.contains_only("user1", "user11", "user21")))
+ sysdb_attrs, ts_attrs = get_group_attrs(ldb_conn, "group1", TS_ATTRLIST)
+
+ assert_same_attrval(sysdb_attrs, old_sysdb_attrs, "dataExpireTimestamp")
+ assert_same_attrval(sysdb_attrs, old_sysdb_attrs, "originalModifyTimestamp")
+
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "dataExpireTimestamp")
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "originalModifyTimestamp")
+
+
+def test_group_2307_update_diff_attrs(ldap_conn,
+ ldb_examine,
+ setup_rfc2307):
+ """
+ Test that a group update with different attribute triggers cache update
+ """
+ ldb_conn = ldb_examine
+ old_sysdb_attrs, old_ts_attrs = prime_cache_group(
+ ldb_conn, "group1",
+ ("user1", "user11", "user21"))
+
+ ldap_conn.modify_s("cn=group1,ou=Groups," + ldap_conn.ds_inst.base_dn,
+ [(ldap.MOD_DELETE, "memberUid", "user1")])
+ # wait for slapd to change its database
+ time.sleep(1)
+
+ ent.assert_group_by_name(
+ "group1",
+ dict(mem=ent.contains_only("user11", "user21")))
+ sysdb_attrs, ts_attrs = get_group_attrs(ldb_conn, "group1", TS_ATTRLIST)
+
+ assert_diff_attrval(sysdb_attrs, old_sysdb_attrs, "dataExpireTimestamp")
+ assert_diff_attrval(sysdb_attrs, old_sysdb_attrs, "originalModifyTimestamp")
+
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "dataExpireTimestamp")
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "originalModifyTimestamp")
+
+
+def test_group_2307_delete_group(ldap_conn,
+ ldb_examine,
+ setup_rfc2307):
+ """
+ Test that deleting a group removes it from both caches
+ """
+ ldb_conn = ldb_examine
+ old_sysdb_attrs, old_ts_attrs = prime_cache_group(
+ ldb_conn, "group1",
+ ("user1", "user11", "user21"))
+
+ e = ldap_ent.group_bis(ldap_conn.ds_inst.base_dn, "group1", 2001)
+ ldap_conn.delete_s(e[0])
+ # wait for slapd to change its database
+ time.sleep(1)
+
+ with pytest.raises(KeyError):
+ grp.getgrnam("group1")
+
+ sysdb_attrs, ts_attrs = get_group_attrs(ldb_conn, "group1", TS_ATTRLIST)
+ assert sysdb_attrs.get("dataExpireTimestamp") is None
+ assert sysdb_attrs.get("originalModifyTimestamp") is None
+ assert ts_attrs.get("dataExpireTimestamp") is None
+ assert ts_attrs.get("originalModifyTimestamp") is None
+
+
+def test_user_update_same_modstamp(ldap_conn,
+ ldb_examine,
+ setup_rfc2307bis):
+ """
+ Test that a user update with the same modifyTimestamp does not trigger
+ sysdb cache update
+ """
+ ldb_conn = ldb_examine
+ old_sysdb_attrs, old_ts_attrs = prime_cache_user(ldb_conn, "user1", 2001)
+
+ ent.assert_passwd_by_name("user1", dict(name="user1"))
+
+ sysdb_attrs, ts_attrs = get_user_attrs(ldb_conn, "user1", TS_ATTRLIST)
+ assert_same_attrval(sysdb_attrs, old_sysdb_attrs, "dataExpireTimestamp")
+ assert_same_attrval(sysdb_attrs, old_sysdb_attrs, "originalModifyTimestamp")
+
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "dataExpireTimestamp")
+ assert_same_attrval(ts_attrs, old_ts_attrs, "originalModifyTimestamp")
+
+
+def test_user_update_same_attrs(ldap_conn,
+ ldb_examine,
+ setup_rfc2307bis):
+ """
+ Test that a user update with the same modifyTimestamp does not trigger
+ sysdb cache update
+ """
+ ldb_conn = ldb_examine
+ old_sysdb_attrs, old_ts_attrs = prime_cache_user(ldb_conn, "user1", 2001)
+
+ # modify an argument we don't save to the cache. This will bump the
+ # modifyTimestamp attribute, but the attributes themselves will be the same
+ # from sssd's point of view
+ ldap_conn.modify_s("uid=user1,ou=Users," + ldap_conn.ds_inst.base_dn,
+ [(ldap.MOD_ADD, "description", "user one")])
+ # wait for slapd to change its database
+ time.sleep(1)
+
+ ent.assert_passwd_by_name("user1", dict(name="user1"))
+ sysdb_attrs, ts_attrs = get_user_attrs(ldb_conn, "user1", TS_ATTRLIST)
+ assert_same_attrval(sysdb_attrs, old_sysdb_attrs, "dataExpireTimestamp")
+ assert_same_attrval(sysdb_attrs, old_sysdb_attrs, "originalModifyTimestamp")
+
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "dataExpireTimestamp")
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "originalModifyTimestamp")
+
+
+def test_user_update_diff_attrs(ldap_conn,
+ ldb_examine,
+ setup_rfc2307bis):
+ """
+ Test that a user update with the same modifyTimestamp does not trigger
+ sysdb cache update
+ """
+ ldb_conn = ldb_examine
+ old_sysdb_attrs, old_ts_attrs = prime_cache_user(ldb_conn, "user1", 2001)
+
+ # modify an argument we don't save to the cache. This will bump the
+ # modifyTimestamp attribute, but the attributes themselves will be the same
+ # from sssd's point of view
+ ldap_conn.modify_s("uid=user1,ou=Users," + ldap_conn.ds_inst.base_dn,
+ [(ldap.MOD_REPLACE, "loginShell", "/bin/zsh")])
+ # wait for slapd to change its database
+ time.sleep(1)
+
+ ent.assert_passwd_by_name("user1", dict(name="user1"))
+ sysdb_attrs, ts_attrs = get_user_attrs(ldb_conn, "user1", TS_ATTRLIST)
+ assert_diff_attrval(sysdb_attrs, old_sysdb_attrs, "dataExpireTimestamp")
+ assert_diff_attrval(sysdb_attrs, old_sysdb_attrs, "originalModifyTimestamp")
+
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "dataExpireTimestamp")
+ assert_diff_attrval(ts_attrs, old_ts_attrs, "originalModifyTimestamp")
+
+def test_user_2307bis_delete_user(ldap_conn,
+ ldb_examine,
+ setup_rfc2307bis):
+ """
+ Test that deleting a user removes it from both caches
+ """
+ ldb_conn = ldb_examine
+ old_sysdb_attrs, old_ts_attrs = prime_cache_user(ldb_conn, "user1", 2001)
+
+ e = ldap_ent.user(ldap_conn.ds_inst.base_dn, "user1", 1001, 2001)
+
+ ldap_conn.delete_s(e[0])
+ # wait for slapd to change its database
+ time.sleep(1)
+
+ with pytest.raises(KeyError):
+ pwd.getpwnam("user1")
+ sysdb_attrs, ts_attrs = get_user_attrs(ldb_conn, "user1", TS_ATTRLIST)
+ assert sysdb_attrs.get("dataExpireTimestamp") is None
+ assert sysdb_attrs.get("originalModifyTimestamp") is None
+ assert ts_attrs.get("dataExpireTimestamp") is None
+ assert ts_attrs.get("originalModifyTimestamp") is None