summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJakub Hrozek <jhrozek@redhat.com>2016-10-25 16:16:01 +0200
committerJakub Hrozek <jhrozek@redhat.com>2017-02-15 14:53:52 +0100
commit8bdb8c0970dc9acb5b0a54dab0bae306ca964944 (patch)
treeb94bb6b4d8002276bd66b5c69524bdd7b5c65663
parent3728db53ac32da51fcaae96b132e8e56ebbaebfa (diff)
downloadsssd-8bdb8c0970dc9acb5b0a54dab0bae306ca964944.tar.gz
sssd-8bdb8c0970dc9acb5b0a54dab0bae306ca964944.tar.xz
sssd-8bdb8c0970dc9acb5b0a54dab0bae306ca964944.zip
TESTS: Add files provider integration tests
Implements integration tests for the files provider. In order to change entries in the nss-wrapped passwd and group files, this commit also implements a helper module that creates a new passwd and group file and moves it in place of the nss-wrapped files. We move the files instead of modifying them in-place in order to trigger similar inotify notifications as shadow-utils would. The unit test uses sleep on several places. This is suboptimal, but during testing especially on slow machines, it became apparent that sometimes the inotify message arrives later than the test would check for the changed entries. Therefore, the check would query the NSS responder even before the sss-files domain was invalidated. Reviewed-by: Pavel Březina <pbrezina@redhat.com> Reviewed-by: Lukáš Slebodník <lslebodn@redhat.com>
-rw-r--r--src/tests/intg/Makefile.am3
-rw-r--r--src/tests/intg/files_ops.py158
-rw-r--r--src/tests/intg/test_files_ops.py84
-rw-r--r--src/tests/intg/test_files_provider.py692
4 files changed, 937 insertions, 0 deletions
diff --git a/src/tests/intg/Makefile.am b/src/tests/intg/Makefile.am
index 03245b35d..64f90b490 100644
--- a/src/tests/intg/Makefile.am
+++ b/src/tests/intg/Makefile.am
@@ -23,6 +23,9 @@ dist_noinst_DATA = \
secrets.py \
test_secrets.py \
test_sssctl.py \
+ files_ops.py \
+ test_files_ops.py \
+ test_files_provider.py \
$(NULL)
config.py: config.py.m4
diff --git a/src/tests/intg/files_ops.py b/src/tests/intg/files_ops.py
new file mode 100644
index 000000000..65b3e5ee4
--- /dev/null
+++ b/src/tests/intg/files_ops.py
@@ -0,0 +1,158 @@
+#
+# SSSD integration test - operations on UNIX user and group database
+#
+# Copyright (c) 2016 Red Hat, Inc.
+#
+# This is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import os
+import os.path
+import tempfile
+import pytest
+
+import ent
+from util import backup_envvar_file, restore_envvar_file
+
+
+@pytest.fixture
+def passwd_ops_setup(request):
+ pwd_file = os.environ["NSS_WRAPPER_PASSWD"]
+ backup_envvar_file("NSS_WRAPPER_PASSWD")
+ request.addfinalizer(lambda: restore_envvar_file("NSS_WRAPPER_PASSWD"))
+ pwd_ops = PasswdOps(pwd_file)
+ return pwd_ops
+
+
+@pytest.fixture
+def group_ops_setup(request):
+ grp_file = os.environ["NSS_WRAPPER_GROUP"]
+ backup_envvar_file("NSS_WRAPPER_GROUP")
+ request.addfinalizer(lambda: restore_envvar_file("NSS_WRAPPER_GROUP"))
+ grp_ops = GroupOps(grp_file)
+ return grp_ops
+
+
+@pytest.fixture
+def group_db_setup(request):
+ group = request.param
+ grp_ops = group_ops_setup(request)
+ grp_ops.groupadd(**group)
+ ent.assert_group_by_name(group['name'], group)
+ return grp_ops
+
+
+class FilesOps(object):
+ """
+ A naive implementation of operations as a basis for user or group
+ operations. Uses rename to (hopefully) trigger the same fs-level
+ notifications as shadow-utils would.
+ """
+ def __init__(self, file_name):
+ self.file_name = file_name
+ self.tmp_dir = os.path.dirname(self.file_name)
+
+ @staticmethod
+ def _get_named_line(name, contents):
+ for num, line in enumerate(contents, 0):
+ pname = line.split(':')[0]
+ if name == pname:
+ return num
+ raise KeyError("%s not found" % name)
+
+ def _read_contents(self):
+ with open(self.file_name, "r") as pfile:
+ contents = pfile.readlines()
+ return contents
+
+ def _write_contents(self, contents):
+ tmp_file = tempfile.NamedTemporaryFile(dir=self.tmp_dir, delete=False)
+ tmp_file.writelines(contents)
+ tmp_file.flush()
+
+ os.rename(tmp_file.name, self.file_name)
+
+ def _append_line(self, new_line):
+ contents = self._read_contents()
+ contents.extend(new_line)
+ self._write_contents(contents)
+
+ def _subst_line(self, key, line):
+ contents = self._read_contents()
+ kindex = self._get_named_line(key, contents)
+ contents[kindex] = line
+ self._write_contents(contents)
+
+ def _del_line(self, key):
+ contents = self._read_contents()
+ kindex = self._get_named_line(key, contents)
+ contents.pop(kindex)
+ self._write_contents(contents)
+
+ contents = self._read_contents()
+
+
+class PasswdOps(FilesOps):
+ """
+ A naive implementation of user operations
+ """
+ def __init__(self, file_name):
+ super(PasswdOps, self).__init__(file_name)
+
+ def _pwd2line(self, name, uid, gid, passwd, gecos, homedir, shell):
+ pwd_fmt = "{name}:{passwd}:{uid}:{gid}:{gecos}:{homedir}:{shell}\n"
+ return pwd_fmt.format(name=name,
+ passwd=passwd,
+ uid=uid,
+ gid=gid,
+ gecos=gecos,
+ homedir=homedir,
+ shell=shell)
+
+ def useradd(self, name, uid, gid, passwd='', gecos='', dir='', shell=''):
+ pwd_line = self._pwd2line(name, uid, gid, passwd, gecos, dir, shell)
+ self._append_line(pwd_line)
+
+ def usermod(self, name, uid, gid, passwd='', gecos='', dir='', shell=''):
+ pwd_line = self._pwd2line(name, uid, gid, passwd, gecos, dir, shell)
+ self._subst_line(name, pwd_line)
+
+ def userdel(self, name):
+ self._del_line(name)
+
+
+class GroupOps(FilesOps):
+ """
+ A naive implementation of group operations
+ """
+ def __init__(self, file_name):
+ super(GroupOps, self).__init__(file_name)
+
+ def _grp2line(self, name, gid, mem, passwd):
+ member_list = ",".join(m for m in mem)
+ grp_fmt = "{name}:{passwd}:{gid}:{member_list}\n"
+ return grp_fmt.format(name=name,
+ passwd=passwd,
+ gid=gid,
+ member_list=member_list)
+
+ def groupadd(self, name, gid, mem, passwd="*"):
+ grp_line = self._grp2line(name, gid, mem, passwd)
+ self._append_line(grp_line)
+
+ def groupmod(self, old_name, name, gid, mem, passwd="*"):
+ grp_line = self._grp2line(name, gid, mem, passwd)
+ self._subst_line(old_name, grp_line)
+
+ def groupdel(self, name):
+ self._del_line(name)
diff --git a/src/tests/intg/test_files_ops.py b/src/tests/intg/test_files_ops.py
new file mode 100644
index 000000000..63816acbf
--- /dev/null
+++ b/src/tests/intg/test_files_ops.py
@@ -0,0 +1,84 @@
+#
+# SSSD integration test - operations on UNIX user and group database
+#
+# Copyright (c) 2016 Red Hat, Inc.
+#
+# This is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+import pwd
+import grp
+import pytest
+
+import ent
+from files_ops import passwd_ops_setup, group_ops_setup
+
+USER1 = dict(name='user1', passwd='*', uid=10001, gid=20001,
+ gecos='User for tests',
+ dir='/home/user1',
+ shell='/bin/bash')
+
+GROUP1 = dict(name='group1',
+ gid=30001,
+ mem=['user1'])
+
+
+def test_useradd(passwd_ops_setup):
+ with pytest.raises(KeyError):
+ pwd.getpwnam("user1")
+ passwd_ops_setup.useradd(**USER1)
+ ent.assert_passwd_by_name("user1", USER1)
+
+
+def test_usermod(passwd_ops_setup):
+ passwd_ops_setup.useradd(**USER1)
+ ent.assert_passwd_by_name("user1", USER1)
+
+ USER1['shell'] = '/bin/zsh'
+ passwd_ops_setup.usermod(**USER1)
+ ent.assert_passwd_by_name("user1", USER1)
+
+
+def test_userdel(passwd_ops_setup):
+ passwd_ops_setup.useradd(**USER1)
+ ent.assert_passwd_by_name("user1", USER1)
+
+ passwd_ops_setup.userdel("user1")
+ with pytest.raises(KeyError):
+ pwd.getpwnam("user1")
+
+
+def test_groupadd(group_ops_setup):
+ with pytest.raises(KeyError):
+ grp.getgrnam("group1")
+ group_ops_setup.groupadd(**GROUP1)
+ ent.assert_group_by_name("group1", GROUP1)
+
+
+def test_groupmod(group_ops_setup):
+ group_ops_setup.groupadd(**GROUP1)
+ ent.assert_group_by_name("group1", GROUP1)
+
+ modgroup = dict(GROUP1)
+ modgroup['mem'] = []
+
+ group_ops_setup.groupmod(old_name=GROUP1["name"], **modgroup)
+ ent.assert_group_by_name("group1", modgroup)
+
+
+def test_groupdel(group_ops_setup):
+ group_ops_setup.groupadd(**GROUP1)
+ ent.assert_group_by_name("group1", GROUP1)
+
+ group_ops_setup.groupdel("group1")
+ with pytest.raises(KeyError):
+ grp.getgrnam("group1")
diff --git a/src/tests/intg/test_files_provider.py b/src/tests/intg/test_files_provider.py
new file mode 100644
index 000000000..0a2e5a7f4
--- /dev/null
+++ b/src/tests/intg/test_files_provider.py
@@ -0,0 +1,692 @@
+#
+# SSSD files domain tests
+#
+# Copyright (c) 2016 Red Hat, Inc.
+#
+# This is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import os
+import stat
+import time
+import config
+import signal
+import subprocess
+import pwd
+import grp
+import pytest
+
+import ent
+import sssd_id
+from sssd_nss import NssReturnCode
+from sssd_passwd import call_sssd_getpwnam, call_sssd_enumeration
+from sssd_group import call_sssd_getgrnam
+from files_ops import passwd_ops_setup, group_ops_setup
+from util import unindent
+
+CANARY = dict(name='canary', passwd='x', uid=100001, gid=200001,
+ gecos='Used to check if passwd is resolvable',
+ dir='/home/canary',
+ shell='/bin/bash')
+
+USER1 = dict(name='user1', passwd='x', uid=10001, gid=20001,
+ gecos='User for tests',
+ dir='/home/user1',
+ shell='/bin/bash')
+
+USER2 = dict(name='user2', passwd='x', uid=10002, gid=20001,
+ gecos='User2 for tests',
+ dir='/home/user2',
+ shell='/bin/bash')
+
+CANARY_GR = dict(name='canary',
+ gid=300001,
+ mem=[])
+
+GROUP1 = dict(name='group1',
+ gid=30001,
+ mem=['user1'])
+
+GROUP12 = dict(name='group12',
+ gid=30012,
+ mem=['user1', 'user2'])
+
+GROUP_NOMEM = dict(name='group_nomem',
+ gid=40000,
+ mem=[])
+
+
+def stop_sssd():
+ pid_file = open(config.PIDFILE_PATH, "r")
+ pid = int(pid_file.read())
+ os.kill(pid, signal.SIGTERM)
+ while True:
+ try:
+ os.kill(pid, signal.SIGCONT)
+ except:
+ break
+ time.sleep(1)
+
+
+def create_conf_fixture(request, contents):
+ """Generate sssd.conf and add teardown for removing it"""
+ conf = open(config.CONF_PATH, "w")
+ conf.write(contents)
+ conf.close()
+ os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR)
+ request.addfinalizer(lambda: os.unlink(config.CONF_PATH))
+
+
+def create_sssd_fixture(request):
+ """Start sssd and add teardown for stopping it and removing state"""
+ os.environ["SSS_FILES_PASSWD"] = os.environ["NSS_WRAPPER_PASSWD"]
+ os.environ["SSS_FILES_GROUP"] = os.environ["NSS_WRAPPER_GROUP"]
+ if subprocess.call(["sssd", "-D", "-f"]) != 0:
+ raise Exception("sssd start failed")
+
+ def teardown():
+ try:
+ stop_sssd()
+ except:
+ pass
+ for path in os.listdir(config.DB_PATH):
+ os.unlink(config.DB_PATH + "/" + path)
+ for path in os.listdir(config.MCACHE_PATH):
+ os.unlink(config.MCACHE_PATH + "/" + path)
+ request.addfinalizer(teardown)
+
+
+# Fixtures
+@pytest.fixture
+def files_domain_only(request):
+ conf = unindent("""\
+ [sssd]
+ domains = files
+ services = nss
+
+ [domain/files]
+ id_provider = files
+ """).format(**locals())
+ create_conf_fixture(request, conf)
+ create_sssd_fixture(request)
+ return None
+
+
+def setup_pw_with_list(request, user_list):
+ pwd_ops = passwd_ops_setup(request)
+ for user in user_list:
+ pwd_ops.useradd(**user)
+ ent.assert_passwd_by_name(CANARY['name'], CANARY)
+ return pwd_ops
+
+
+@pytest.fixture
+def add_user_with_canary(request):
+ return setup_pw_with_list(request, [CANARY, USER1])
+
+
+@pytest.fixture
+def setup_pw_with_canary(request):
+ return setup_pw_with_list(request, [CANARY])
+
+
+def setup_gr_with_list(request, group_list):
+ grp_ops = group_ops_setup(request)
+ for group in group_list:
+ grp_ops.groupadd(**group)
+ ent.assert_group_by_name(CANARY_GR['name'], CANARY_GR)
+ return grp_ops
+
+
+@pytest.fixture
+def add_group_with_canary(request):
+ return setup_gr_with_list(request, [GROUP1, CANARY_GR])
+
+
+@pytest.fixture
+def setup_gr_with_canary(request):
+ return setup_gr_with_list(request, [CANARY_GR])
+
+
+def poll_canary(fn, name, threshold=20):
+ """
+ If we query SSSD while it's updating its cache, it would return NOTFOUND
+ rather than a result from potentially outdated or incomplete cache. In
+ reality this doesn't hurt because the order of the modules is normally
+ "sss files" so the user lookup would fall back to files. But in tests
+ we use this loop to wait until the canary user who is always there is
+ resolved.
+ """
+ for _ in range(0, threshold):
+ res, _ = fn(name)
+ if res == NssReturnCode.SUCCESS:
+ return True
+ elif res == NssReturnCode.NOTFOUND:
+ time.sleep(0.1)
+ continue
+ else:
+ return False
+ return False
+
+
+def sssd_getpwnam_sync(name):
+ ret = poll_canary(call_sssd_getpwnam, CANARY["name"])
+ if ret is False:
+ return NssReturnCode.NOTFOUND, None
+
+ return call_sssd_getpwnam(name)
+
+
+def sssd_getgrnam_sync(name):
+ ret = poll_canary(call_sssd_getgrnam, CANARY_GR["name"])
+ if ret is False:
+ return NssReturnCode.NOTFOUND, None
+
+ return call_sssd_getgrnam(name)
+
+
+def sssd_id_sync(name):
+ sssd_getpwnam_sync(CANARY["name"])
+ res, _, groups = sssd_id.get_user_groups(name)
+ return res, groups
+
+
+# Helper functions
+def user_generator(seqnum):
+ return dict(name='user%d' % seqnum,
+ passwd='*',
+ uid=10000 + seqnum,
+ gid=20000 + seqnum,
+ gecos='User for tests',
+ dir='/home/user%d' % seqnum,
+ shell='/bin/bash')
+
+
+def check_user(exp_user, delay=1.0):
+ if delay > 0:
+ time.sleep(delay)
+
+ res, found_user = sssd_getpwnam_sync(exp_user["name"])
+ assert res == NssReturnCode.SUCCESS
+ assert found_user == exp_user
+
+
+def check_group(exp_group, delay=1.0):
+ if delay > 0:
+ time.sleep(delay)
+
+ res, found_group = sssd_getgrnam_sync(exp_group["name"])
+ assert res == NssReturnCode.SUCCESS
+ assert found_group == exp_group
+
+
+def check_group_list(exp_groups_list):
+ for exp_group in exp_groups_list:
+ check_group(exp_group)
+
+
+# User tests
+def test_getpwnam_after_start(add_user_with_canary, files_domain_only):
+ """
+ Test that after startup without any additional operations, a user
+ can be resolved through sssd
+ """
+ res, user = sssd_getpwnam_sync(USER1["name"])
+ assert res == NssReturnCode.SUCCESS
+ assert user == USER1
+
+
+def test_getpwnam_neg(files_domain_only):
+ """
+ Test that a nonexistant user cannot be resolved
+ """
+ res, _ = call_sssd_getpwnam("nosuchuser")
+ assert res == NssReturnCode.NOTFOUND
+
+
+def test_root_does_not_resolve(files_domain_only):
+ """
+ SSSD currently does not resolve the root user even though it can
+ be resolved through the NSS interface
+ """
+ nss_root = pwd.getpwnam("root")
+ assert nss_root is not None
+
+ res, _ = call_sssd_getpwnam("root")
+ assert res == NssReturnCode.NOTFOUND
+
+
+def test_add_remove_add_file_user(setup_pw_with_canary, files_domain_only):
+ """
+ Test that removing a user is detected and the user
+ is removed from the sssd database. Similarly, an add
+ should be detected. Do this several times to test retaining
+ the inotify watch for moved and unlinked files.
+ """
+ res, _ = call_sssd_getpwnam(USER1["name"])
+ assert res == NssReturnCode.NOTFOUND
+
+ setup_pw_with_canary.useradd(**USER1)
+ check_user(USER1)
+
+ setup_pw_with_canary.userdel(USER1["name"])
+ time.sleep(1.0)
+ res, _ = sssd_getpwnam_sync(USER1["name"])
+ assert res == NssReturnCode.NOTFOUND
+
+ setup_pw_with_canary.useradd(**USER1)
+ check_user(USER1)
+
+
+def test_mod_user_shell(add_user_with_canary, files_domain_only):
+ """
+ Test that modifying a user shell is detected and the user
+ is modified in the sssd database
+ """
+ res, user = sssd_getpwnam_sync(USER1["name"])
+ assert res == NssReturnCode.SUCCESS
+ assert user == USER1
+
+ moduser = dict(USER1)
+ moduser['shell'] = '/bin/zsh'
+ add_user_with_canary.usermod(**moduser)
+
+ check_user(moduser)
+
+
+def test_enum_users(setup_pw_with_canary, files_domain_only):
+ """
+ Test that enumerating all users works with the default configuration. Also
+ test that removing all entries and then enumerating again returns an empty
+ set
+ """
+ num_users = 10
+ for i in range(1, num_users+1):
+ user = user_generator(i)
+ setup_pw_with_canary.useradd(**user)
+
+ sssd_getpwnam_sync(CANARY["name"])
+ user_list = call_sssd_enumeration()
+ # +1 because the canary is added
+ assert len(user_list) == num_users+1
+
+
+def incomplete_user_setup(pwd_ops, del_field, exp_field):
+ adduser = dict(USER1)
+ del adduser[del_field]
+ exp_user = dict(USER1)
+ exp_user[del_field] = exp_field
+
+ pwd_ops.useradd(**adduser)
+
+ return exp_user
+
+
+def test_user_no_shell(setup_pw_with_canary, files_domain_only):
+ """
+ Test that resolving a user without a shell defined works and returns
+ a fallback value
+ """
+ check_user(incomplete_user_setup(setup_pw_with_canary, 'shell', ''))
+
+
+def test_user_no_dir(setup_pw_with_canary, files_domain_only):
+ """
+ Test that resolving a user without a homedir defined works and returns
+ a fallback value
+ """
+ check_user(incomplete_user_setup(setup_pw_with_canary, 'dir', '/'))
+
+
+def test_user_no_gecos(setup_pw_with_canary, files_domain_only):
+ """
+ Test that resolving a user without a gecos defined works and returns
+ a fallback value
+ """
+ check_user(incomplete_user_setup(setup_pw_with_canary, 'gecos', ''))
+
+
+def test_user_no_passwd(setup_pw_with_canary, files_domain_only):
+ """
+ Test that resolving a user without a password defined works and returns
+ a fallback value
+ """
+ check_user(incomplete_user_setup(setup_pw_with_canary, 'passwd', 'x'))
+
+
+def bad_incomplete_user_setup(pwd_ops, del_field):
+ adduser = dict(USER1)
+ adduser[del_field] = ''
+
+ pwd_ops.useradd(**adduser)
+
+
+def test_incomplete_user_fail(setup_pw_with_canary, files_domain_only):
+ """
+ Test resolving an incomplete user where the missing field is required
+ to be present in the user record and thus the user shouldn't resolve.
+
+ We cannot test uid and gid missing because nss_wrapper doesn't even
+ load the malformed passwd file, then.
+ """
+ bad_incomplete_user_setup(setup_pw_with_canary, 'name')
+ res, user = sssd_getpwnam_sync(USER1["name"])
+ assert res == NssReturnCode.NOTFOUND
+
+
+def test_getgrnam_after_start(add_group_with_canary, files_domain_only):
+ """
+ Test that after startup without any additional operations, a group
+ can be resolved through sssd
+ """
+ check_group(GROUP1)
+
+
+def test_getgrnam_neg(files_domain_only):
+ """
+ Test that a nonexistant group cannot be resolved
+ """
+ res, user = sssd_getgrnam_sync("nosuchgroup")
+ assert res == NssReturnCode.NOTFOUND
+
+
+def test_root_group_does_not_resolve(files_domain_only):
+ """
+ SSSD currently does not resolve the root group even though it can
+ be resolved through the NSS interface
+ """
+ nss_root = grp.getgrnam("root")
+ assert nss_root is not None
+
+ res, user = call_sssd_getgrnam("root")
+ assert res == NssReturnCode.NOTFOUND
+
+
+def test_add_remove_add_file_group(setup_gr_with_canary, files_domain_only):
+ """
+ Test that removing a group is detected and the group
+ is removed from the sssd database. Similarly, an add
+ should be detected. Do this several times to test retaining
+ the inotify watch for moved and unlinked files.
+ """
+ res, group = call_sssd_getgrnam(GROUP1["name"])
+ assert res == NssReturnCode.NOTFOUND
+
+ setup_gr_with_canary.groupadd(**GROUP1)
+ check_group(GROUP1)
+
+ setup_gr_with_canary.groupdel(GROUP1["name"])
+ time.sleep(1)
+ res, group = call_sssd_getgrnam(GROUP1["name"])
+ assert res == NssReturnCode.NOTFOUND
+
+ setup_gr_with_canary.groupadd(**GROUP1)
+ check_group(GROUP1)
+
+
+def test_mod_group_name(add_group_with_canary, files_domain_only):
+ """
+ Test that modifying a group name is detected and the group
+ is modified in the sssd database
+ """
+ check_group(GROUP1)
+
+ modgroup = dict(GROUP1)
+ modgroup['name'] = 'group1_mod'
+ add_group_with_canary.groupmod(old_name=GROUP1["name"], **modgroup)
+
+ check_group(modgroup)
+
+
+def test_mod_group_gid(add_group_with_canary, files_domain_only):
+ """
+ Test that modifying a group name is detected and the group
+ is modified in the sssd database
+ """
+ check_group(GROUP1)
+
+ modgroup = dict(GROUP1)
+ modgroup['gid'] = 30002
+ add_group_with_canary.groupmod(old_name=GROUP1["name"], **modgroup)
+
+ check_group(modgroup)
+
+
+@pytest.fixture
+def add_group_nomem_with_canary(request):
+ return setup_gr_with_list(request, [GROUP_NOMEM, CANARY_GR])
+
+
+def test_getgrnam_no_members(add_group_nomem_with_canary, files_domain_only):
+ """
+ Test that after startup without any additional operations, a group
+ can be resolved through sssd
+ """
+ check_group(GROUP_NOMEM)
+
+
+def groupadd_list(grp_ops, groups):
+ for grp in groups:
+ grp_ops.groupadd(**grp)
+
+
+def useradd_list(pwd_ops, users):
+ for usr in users:
+ pwd_ops.useradd(**usr)
+
+
+def user_and_group_setup(pwd_ops, grp_ops, users, groups, reverse):
+ """
+ The reverse is added so that we test cases where a group is added first,
+ then a user for this group is created -- in that case, we need to properly
+ link the group after the user is added.
+ """
+ if reverse is False:
+ useradd_list(pwd_ops, users)
+ groupadd_list(grp_ops, groups)
+ else:
+ groupadd_list(grp_ops, groups)
+ useradd_list(pwd_ops, users)
+
+
+def members_check(added_groups):
+ # Test that users are members as per getgrnam
+ check_group_list(added_groups)
+
+ # Test that users are members as per initgroups
+ for group in added_groups:
+ for member in group['mem']:
+ res, groups = sssd_id_sync(member)
+ assert res == sssd_id.NssReturnCode.SUCCESS
+ assert group['name'] in groups
+
+
+def test_getgrnam_members_users_first(setup_pw_with_canary,
+ setup_gr_with_canary,
+ files_domain_only):
+ """
+ A user is linked with a group
+ """
+ user_and_group_setup(setup_pw_with_canary,
+ setup_gr_with_canary,
+ [USER1],
+ [GROUP1],
+ False)
+ members_check([GROUP1])
+
+
+def test_getgrnam_members_users_multiple(setup_pw_with_canary,
+ setup_gr_with_canary,
+ files_domain_only):
+ """
+ Multiple users are linked with a group
+ """
+ user_and_group_setup(setup_pw_with_canary,
+ setup_gr_with_canary,
+ [USER1, USER2],
+ [GROUP12],
+ False)
+ members_check([GROUP12])
+
+
+def test_getgrnam_members_groups_first(setup_pw_with_canary,
+ setup_gr_with_canary,
+ files_domain_only):
+ """
+ A group is linked with a user
+ """
+ user_and_group_setup(setup_pw_with_canary,
+ setup_gr_with_canary,
+ [USER1],
+ [GROUP1],
+ True)
+ members_check([GROUP1])
+
+
+def test_getgrnam_ghost(setup_pw_with_canary,
+ setup_gr_with_canary,
+ files_domain_only):
+ """
+ Test that a group with members while the members are not present
+ are added as ghosts. This is also what nss_files does, getgrnam would
+ return group members that do not exist as well.
+ """
+ user_and_group_setup(setup_pw_with_canary,
+ setup_gr_with_canary,
+ [],
+ [GROUP12],
+ False)
+ check_group(GROUP12)
+ for member in GROUP12['mem']:
+ res, _ = call_sssd_getpwnam(member)
+ assert res == NssReturnCode.NOTFOUND
+
+
+def ghost_and_member_test(pw_ops, grp_ops, reverse):
+ user_and_group_setup(pw_ops,
+ grp_ops,
+ [USER1],
+ [GROUP12],
+ reverse)
+ check_group(GROUP12)
+
+ # We checked that the group added has the same members as group12,
+ # so both user1 and user2. Now check that user1 is a member of
+ # group12 and its own primary GID but user2 doesn't exist, it's
+ # just a ghost entry
+ res, groups = sssd_id_sync('user1')
+ assert res == sssd_id.NssReturnCode.SUCCESS
+ assert len(groups) == 2
+ assert 'group12' in groups
+
+ res, _ = call_sssd_getpwnam('user2')
+ assert res == NssReturnCode.NOTFOUND
+
+
+def test_getgrnam_user_ghost_and_member(setup_pw_with_canary,
+ setup_gr_with_canary,
+ files_domain_only):
+ """
+ Test that a group with one member and one ghost.
+ """
+ ghost_and_member_test(setup_pw_with_canary,
+ setup_gr_with_canary,
+ False)
+
+
+def test_getgrnam_user_member_and_ghost(setup_pw_with_canary,
+ setup_gr_with_canary,
+ files_domain_only):
+ """
+ Test that a group with one member and one ghost, adding the group
+ first and then linking the member
+ """
+ ghost_and_member_test(setup_pw_with_canary,
+ setup_gr_with_canary,
+ True)
+
+
+def test_getgrnam_add_remove_members(setup_pw_with_canary,
+ add_group_nomem_with_canary,
+ files_domain_only):
+ """
+ Test that a user is linked with a group
+ """
+ pwd_ops = setup_pw_with_canary
+
+ check_group(GROUP_NOMEM)
+
+ for usr in [USER1, USER2]:
+ pwd_ops.useradd(**usr)
+
+ modgroup = dict(GROUP_NOMEM)
+ modgroup['mem'] = ['user1', 'user2']
+ add_group_nomem_with_canary.groupmod(old_name=modgroup['name'], **modgroup)
+ check_group(modgroup)
+
+ res, groups = sssd_id_sync('user1')
+ assert res == sssd_id.NssReturnCode.SUCCESS
+ assert len(groups) == 2
+ assert 'group_nomem' in groups
+
+ res, groups = sssd_id_sync('user2')
+ assert res == sssd_id.NssReturnCode.SUCCESS
+ assert 'group_nomem' in groups
+
+ modgroup['mem'] = ['user2']
+ add_group_nomem_with_canary.groupmod(old_name=modgroup['name'], **modgroup)
+ check_group(modgroup)
+
+ # User1 exists, but is not a member of any supplementary group anymore
+ res, _ = call_sssd_getpwnam('user1')
+ assert res == sssd_id.NssReturnCode.SUCCESS
+ res, groups = sssd_id_sync('user1')
+ assert res == sssd_id.NssReturnCode.NOTFOUND
+
+ # user2 still is
+ res, groups = sssd_id_sync('user2')
+ assert res == sssd_id.NssReturnCode.SUCCESS
+ assert len(groups) == 2
+ assert 'group_nomem' in groups
+
+
+def test_getgrnam_add_remove_ghosts(setup_pw_with_canary,
+ add_group_nomem_with_canary,
+ files_domain_only):
+ """
+ Test that a user is linked with a group
+ """
+ pwd_ops = setup_pw_with_canary
+
+ check_group(GROUP_NOMEM)
+
+ modgroup = dict(GROUP_NOMEM)
+ modgroup['mem'] = ['user1', 'user2']
+ add_group_nomem_with_canary.groupmod(old_name=modgroup['name'], **modgroup)
+ check_group(modgroup)
+
+ modgroup['mem'] = ['user2']
+ add_group_nomem_with_canary.groupmod(old_name=modgroup['name'], **modgroup)
+ check_group(modgroup)
+
+ res, _ = call_sssd_getpwnam('user1')
+ assert res == NssReturnCode.NOTFOUND
+ res, _ = call_sssd_getpwnam('user2')
+ assert res == NssReturnCode.NOTFOUND
+
+ # Add this user and verify it's been added as a member
+ pwd_ops.useradd(**USER2)
+ res, groups = sssd_id_sync('user2')
+ assert res == sssd_id.NssReturnCode.SUCCESS
+ assert len(groups) == 2
+ assert 'group_nomem' in groups