diff options
author | Nikolai Kondrashov <Nikolai.Kondrashov@redhat.com> | 2014-11-24 19:13:16 +0200 |
---|---|---|
committer | Jakub Hrozek <jhrozek@redhat.com> | 2015-05-28 13:55:52 +0200 |
commit | 9d453f1e8b28983b363b44c49b7cd701a994fd97 (patch) | |
tree | f681e8183b68cfcca3e7b618b119238489b46cce /src | |
parent | 9c5e4ae08ea41f9b1cdb3b3d0e9c35056baeab86 (diff) | |
download | sssd-9d453f1e8b28983b363b44c49b7cd701a994fd97.tar.gz sssd-9d453f1e8b28983b363b44c49b7cd701a994fd97.tar.xz sssd-9d453f1e8b28983b363b44c49b7cd701a994fd97.zip |
Add integration tests
Add "intgcheck" make target. Update CI to use it.
The "intgcheck" target configures and builds sssd in a sub-directory,
installs it into a prefix in another sub-directory, and then makes the
"intgcheck-installed" target from within src/tests/intg in that separate
build.
The "intgcheck-installed" target in src/tests/intg runs py.test for all
tests it can find in that directory, under fakeroot and
nss_wrapper/uid_wrapper environments emulating running under root.
It also adds the value of INTGCHECK_PYTEST_ARGS environment/make
variable to the py.test command line. You can use it to pass additional
py.test options, such as specifying a subset of tests to run. See
"py.test --help" output.
There are only two test suites in src/tests/intg at the moment:
ent_test.py and ldap_test.py.
The ent_test.py runs tests on ent.py - a module of assertion functions
for checking entries in NSS database (passwd and group), for use in
actual tests. The ent_test.py suite can be used as ent.py usage
reference.
The ldap_test.py suite sets up and starts a slapd instance, adds a few
user and group entries, configures and starts sssd and verifies that
those users and groups are retrieved correctly using various NSS
functions. The tests are very basic at the moment.
Reviewed-by: Lukáš Slebodník <lslebodn@redhat.com>
Reviewed-by: Michal Židek <mzidek@redhat.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/external/cwrap.m4 | 7 | ||||
-rw-r--r-- | src/external/intgcheck.m4 | 32 | ||||
-rw-r--r-- | src/external/ldap.m4 | 4 | ||||
-rw-r--r-- | src/tests/intg/Makefile.am | 61 | ||||
-rw-r--r-- | src/tests/intg/config.py.m4 | 13 | ||||
-rw-r--r-- | src/tests/intg/ds.py | 61 | ||||
-rw-r--r-- | src/tests/intg/ds_openldap.py | 279 | ||||
-rw-r--r-- | src/tests/intg/ent.py | 470 | ||||
-rw-r--r-- | src/tests/intg/ent_test.py | 417 | ||||
-rw-r--r-- | src/tests/intg/ldap_ent.py | 102 | ||||
-rw-r--r-- | src/tests/intg/ldap_test.py | 261 | ||||
-rw-r--r-- | src/tests/intg/util.py | 55 |
12 files changed, 1758 insertions, 4 deletions
diff --git a/src/external/cwrap.m4 b/src/external/cwrap.m4 index 0bd0bc9c9..b03d1ef00 100644 --- a/src/external/cwrap.m4 +++ b/src/external/cwrap.m4 @@ -4,20 +4,19 @@ dnl AM_CHECK_WRAPPER(name, conditional) dnl If the cwrap library is found, sets the HAVE_$name conditional AC_DEFUN([AM_CHECK_WRAPPER], [ - FOUND_WRAPPER=0 - AC_MSG_CHECKING([for $1]) PKG_CHECK_EXISTS([$1], [ AC_MSG_RESULT([yes]) - FOUND_WRAPPER=1 + AC_SUBST([$2], [yes]) ], [ AC_MSG_RESULT([no]) + AC_SUBST([$2], [no]) AC_MSG_WARN([cwrap library $1 not found, some tests will not run]) ]) - AM_CONDITIONAL($2, [ test x$FOUND_WRAPPER = x1]) + AM_CONDITIONAL($2, [ test x$2 = xyes]) ]) AC_DEFUN([AM_CHECK_UID_WRAPPER], diff --git a/src/external/intgcheck.m4 b/src/external/intgcheck.m4 new file mode 100644 index 000000000..80d41b599 --- /dev/null +++ b/src/external/intgcheck.m4 @@ -0,0 +1,32 @@ +AC_CHECK_PROG([HAVE_FAKEROOT], [fakeroot], [yes], [no]) + +AC_PATH_PROG([PYTEST], [py.test]) +AS_IF([test -n "$PYTEST"], [HAVE_PYTEST=yes], [HAVE_PYTEST=no]) + +dnl Check for variable and fail unless value is "yes" +dnl The second argument will be printed in error message in case of error +dnl Usage: +dnl SSS_INTGCHECK_REQ(variable, message) + +AC_DEFUN([SSS_INTGCHECK_REQ], [ + AS_IF([test x$$1 = xyes], , [ + AC_MSG_ERROR([cannot enable integration tests: $2 not found])]) +]) + +AC_DEFUN([SSS_ENABLE_INTGCHECK_REQS], [ + AC_ARG_ENABLE(intgcheck-reqs, + [AS_HELP_STRING([--enable-intgcheck-reqs], + [enable checking for integration test requirements [default=no]])], + [enable_intgcheck_reqs="$enableval"], + [enable_intgcheck_reqs="no"]) + if test x"$enable_intgcheck_reqs" = xyes; then + SSS_INTGCHECK_REQ([HAVE_UID_WRAPPER], [uid_wrapper]) + SSS_INTGCHECK_REQ([HAVE_NSS_WRAPPER], [nss_wrapper]) + SSS_INTGCHECK_REQ([HAVE_SLAPD], [slapd]) + SSS_INTGCHECK_REQ([HAVE_LDAPMODIFY], [ldapmodify]) + SSS_INTGCHECK_REQ([HAVE_FAKEROOT], [fakeroot]) + SSS_INTGCHECK_REQ([HAVE_PYTHON2], [python2]) + SSS_INTGCHECK_REQ([HAVE_PYTEST], [pytest]) + SSS_INTGCHECK_REQ([HAVE_PY2MOD_LDAP], [python-ldap]) + fi +]) diff --git a/src/external/ldap.m4 b/src/external/ldap.m4 index 3a99ddfcc..43a01efae 100644 --- a/src/external/ldap.m4 +++ b/src/external/ldap.m4 @@ -90,3 +90,7 @@ AC_CHECK_TYPE([LDAPDerefRes], CFLAGS=$SAVE_CFLAGS LIBS=$SAVE_LIBS +AC_PATH_PROG([SLAPD], [slapd], , + [$PATH$PATH_SEPARATOR/usr/sbin$PATH_SEPARATOR]) +AS_IF([test -n "$SLAPD"], [HAVE_SLAPD=yes], [HAVE_SLAPD=no]) +AC_CHECK_PROG([HAVE_LDAPMODIFY], [ldapmodify], [yes], [no]) diff --git a/src/tests/intg/Makefile.am b/src/tests/intg/Makefile.am new file mode 100644 index 000000000..9383e1120 --- /dev/null +++ b/src/tests/intg/Makefile.am @@ -0,0 +1,61 @@ +dist_noinst_DATA = \ + config.py.m4 \ + ds.py \ + ds_openldap.py \ + ent.py \ + ent_test.py \ + ldap_ent.py \ + ldap_test.py \ + util.py \ + $(NULL) + +config.py: config.py.m4 + m4 -D "prefix=\`$(prefix)'" \ + -D "sysconfdir=\`$(sysconfdir)'" \ + -D "dbpath=\`$(dbpath)'" \ + -D "pidpath=\`$(pidpath)'" \ + -D "logpath=\`$(logpath)'" \ + -D "mcpath=\`$(mcpath)'" \ + $< > $@ + +root: + : "Create directory for emulated root's D-Bus cookies." + : "See http://dbus.freedesktop.org/doc/dbus-specification.html#auth-mechanisms" + $(MKDIR_P) -m 0700 root/.dbus-keyrings + +passwd: root + echo "root:x:0:0:root:$(abs_builddir)/root:/bin/bash" > $@ + +group: + echo "root:x:0:" > $@ + +CLEANFILES=config.py config.pyc passwd group + +clean-local: + rm -Rf root + +intgcheck-installed: config.py passwd group + pipepath="$(DESTDIR)$(pipepath)"; \ + if test $${#pipepath} -gt 80; then \ + echo "error: Pipe directory path too long," \ + "D-Bus won't be able to open sockets" >&2; \ + exit 1; \ + fi + set -e; \ + cd "$(abs_srcdir)"; \ + nss_wrapper=$$(pkg-config --libs nss_wrapper); \ + uid_wrapper=$$(pkg-config --libs uid_wrapper); \ + PATH="$$(dirname -- $(SLAPD)):$$PATH" \ + PATH="$(DESTDIR)$(sbindir):$(DESTDIR)$(bindir):$$PATH" \ + PATH="$(abs_builddir):$(abs_srcdir):$$PATH" \ + PYTHONPATH="$(abs_builddir):$(abs_srcdir)" \ + LDB_MODULES_PATH="$(DESTDIR)$(ldblibdir)" \ + LD_PRELOAD="$$nss_wrapper $$uid_wrapper" \ + NSS_WRAPPER_PASSWD="$(abs_builddir)/passwd" \ + NSS_WRAPPER_GROUP="$(abs_builddir)/group" \ + NSS_WRAPPER_MODULE_SO_PATH="$(DESTDIR)$(nsslibdir)/libnss_sss.so.2" \ + NSS_WRAPPER_MODULE_FN_PREFIX="sss" \ + UID_WRAPPER=1 \ + UID_WRAPPER_ROOT=1 \ + fakeroot $(PYTHON2) $(PYTEST) -v --tb=native $(INTGCHECK_PYTEST_ARGS) . + rm -f $(DESTDIR)$(logpath)/* diff --git a/src/tests/intg/config.py.m4 b/src/tests/intg/config.py.m4 new file mode 100644 index 000000000..563127c6e --- /dev/null +++ b/src/tests/intg/config.py.m4 @@ -0,0 +1,13 @@ +""" +Build configuration variables. +""" + +PREFIX = "prefix" +SYSCONFDIR = "sysconfdir" +SSSDCONFDIR = SYSCONFDIR + "/sssd" +CONF_PATH = SSSDCONFDIR + "/sssd.conf" +DB_PATH = "dbpath" +PID_PATH = "pidpath" +PIDFILE_PATH = PID_PATH + "/sssd.pid" +LOG_PATH = "logpath" +MCACHE_PATH = "mcpath" diff --git a/src/tests/intg/ds.py b/src/tests/intg/ds.py new file mode 100644 index 000000000..a87190275 --- /dev/null +++ b/src/tests/intg/ds.py @@ -0,0 +1,61 @@ +# +# Abstract directory server instance class +# +# Copyright (c) 2015 Red Hat, Inc. +# Author: Nikolai Kondrashov <Nikolai.Kondrashov@redhat.com> +# +# This is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import ldap + +class DS: + """Abstract directory server instance.""" + + def __init__(self, dir, port, base_dn, admin_rdn, admin_pw): + """ + Initialize the instance. + + Arguments: + dir Path to the root of the filesystem hierarchy to create + the instance under. + port TCP port on localhost to bind the server to. + base_dn Base DN. + admin_rdn Administrator DN, relative to BASE_DN. + admin_pw Administrator password. + """ + self.dir = dir + self.port = port + self.ldap_url = "ldap://localhost:" + str(self.port) + self.base_dn = base_dn + self.admin_rdn = admin_rdn + self.admin_dn = admin_rdn + "," + base_dn + self.admin_pw = admin_pw + + def setup(self): + """Setup the instance""" + raise NotImplementedError() + + def teardown(self): + """Teardown the instance""" + raise NotImplementedError() + + def bind(self): + """Connect to the server and bind as admin, return connection.""" + conn = ldap.initialize(self.ldap_url) + conn.simple_bind_s(self.admin_dn, self.admin_pw) + return conn + + def __del__(self): + """Destroy the instance.""" + self.teardown() diff --git a/src/tests/intg/ds_openldap.py b/src/tests/intg/ds_openldap.py new file mode 100644 index 000000000..c58e53a2a --- /dev/null +++ b/src/tests/intg/ds_openldap.py @@ -0,0 +1,279 @@ +# +# OpenLDAP directory server instance class +# +# Copyright (c) 2015 Red Hat, Inc. +# Author: Nikolai Kondrashov <Nikolai.Kondrashov@redhat.com> +# +# This is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import hashlib +import base64 +import urllib +import time +import ldap +import os +import errno +import signal +import shutil +import sys +from util import * +from ds import DS + +def hash_password(password): + """Generate userPassword value for a password.""" + salt = os.urandom(4) + hash = hashlib.sha1(password) + hash.update(salt) + return "{SSHA}" + base64.standard_b64encode(hash.digest() + salt) + +class DSOpenLDAP(DS): + """OpenLDAP directory server instance.""" + + def __init__(self, dir, port, base_dn, admin_rdn, admin_pw): + """ + Initialize the instance. + + Arguments: + dir Path to the root of the filesystem hierarchy to create + the instance under. + port TCP port on localhost to bind the server to. + base_dn Base DN. + admin_rdn Administrator DN, relative to BASE_DN. + admin_pw Administrator password. + """ + DS.__init__(self, dir, port, base_dn, admin_rdn, admin_pw) + self.run_dir = self.dir + "/var/run/ldap" + self.pid_path = self.run_dir + "/slapd.pid" + self.conf_dir = self.dir + "/etc/ldap" + self.conf_slapd_d_dir = self.conf_dir + "/slapd.d" + self.data_dir = self.dir + "/var/lib/ldap" + + def _setup_config(self): + """Setup the instance initial configuration.""" + dist_lib_dir = first_dir("/usr/lib/openldap", + "/usr/lib64/openldap", + "/usr/lib/ldap") + dist_conf_dir = first_dir("/etc/ldap", + "/etc/openldap") + args_file = self.run_dir + "/slapd.args" + admin_pw_hash = hash_password(self.admin_pw) + uid = os.geteuid() + gid = os.getegid() + + # + # Add configuration + # + config = unindent(""" + dn: cn=config + objectClass: olcGlobal + cn: config + olcPidFile: {self.pid_path} + olcArgsFile: {args_file} + # Read slapd.conf(5) for possible values + olcLogLevel: none + + # Frontend settings + dn: olcDatabase={{-1}}frontend,cn=config + objectClass: olcDatabaseConfig + objectClass: olcFrontendConfig + olcDatabase: {{-1}}frontend + # The maximum number of entries that is returned for + # a search operation + olcSizeLimit: 500 + # Allow unlimited access to local connection from the local root + olcAccess: {{0}}to * by dn.exact=gidNumber={gid}+uidNumber={uid}, + cn=peercred,cn=external,cn=auth manage by * break + # Allow unauthenticated read access for schema and + # base DN autodiscovery + olcAccess: {{1}}to dn.exact="" by * read + olcAccess: {{2}}to dn.base="cn=Subschema" by * read + + # Config db settings + dn: olcDatabase=config,cn=config + objectClass: olcDatabaseConfig + olcDatabase: config + # Allow unlimited access to local connection from the local root + olcAccess: to * by dn.exact=gidNumber={gid}+uidNumber={uid}, + cn=peercred,cn=external,cn=auth manage by * break + olcRootDN: {self.admin_rdn},cn=config + olcRootPW: {admin_pw_hash} + + # Load schemas + dn: cn=schema,cn=config + objectClass: olcSchemaConfig + cn: schema + + include: file://{dist_conf_dir}/schema/core.ldif + include: file://{dist_conf_dir}/schema/cosine.ldif + include: file://{dist_conf_dir}/schema/nis.ldif + include: file://{dist_conf_dir}/schema/inetorgperson.ldif + + # Load module + dn: cn=module{{0}},cn=config + objectClass: olcModuleList + cn: module{{0}} + olcModulePath: {dist_lib_dir} + olcModuleLoad: back_hdb + + # Set defaults for the backend + dn: olcBackend=hdb,cn=config + objectClass: olcBackendConfig + olcBackend: hdb + + # The database definition. + dn: olcDatabase=hdb,cn=config + objectClass: olcDatabaseConfig + objectClass: olcHdbConfig + olcDatabase: hdb + olcDbCheckpoint: 512 30 + olcLastMod: TRUE + olcSuffix: {self.base_dn} + olcDbDirectory: {self.data_dir} + olcRootDN: {self.admin_dn} + olcRootPW: {admin_pw_hash} + olcDbIndex: objectClass eq + olcDbIndex: cn,uid eq + olcDbIndex: uidNumber,gidNumber eq + olcDbIndex: member,memberUid eq + olcAccess: to attrs=userPassword,shadowLastChange + by self write + by anonymous auth + by * none + olcAccess: to dn.base="" by * read + olcAccess: to * + by * read + """).format(**locals()) + + slapadd = subprocess.Popen( + ["slapadd", "-F", self.conf_slapd_d_dir, "-b", "cn=config"], + stdin = subprocess.PIPE, close_fds = True + ) + slapadd.communicate(config) + if slapadd.returncode != 0: + raise Exception("Failed to add configuration with slapadd") + + # + # Add database config (example from distribution) + # + db_config = unindent(""" + # One 0.25 GB cache + set_cachesize 0 268435456 1 + + # Transaction Log settings + set_lg_regionmax 262144 + set_lg_bsize 2097152 + """) + db_config_file = open(self.data_dir + "/DB_CONFIG", "w") + db_config_file.write(db_config) + db_config_file.close() + + def setup(self): + """Setup the instance.""" + ldapi_socket = self.run_dir + "/ldapi" + ldapi_url = "ldapi://" + urllib.quote(ldapi_socket, "") + url_list = ldapi_url + " " + self.ldap_url + + os.makedirs(self.conf_slapd_d_dir) + os.makedirs(self.run_dir) + os.makedirs(self.data_dir) + + # + # Setup initial configuration + # + self._setup_config() + + # + # Start the daemon + # + if subprocess.call(["slapd", "-F", self.conf_slapd_d_dir, + "-h", url_list]) != 0: + raise Exception("Failed to start slapd") + + # + # Wait until it is available + # + attempt = 0 + while True: + try: + ldap_conn = ldap.initialize(ldapi_url) + ldap_conn.simple_bind_s(self.admin_rdn + ",cn=config", self.admin_pw) + ldap_conn.unbind_s() + ldap_conn = ldap.initialize(self.ldap_url) + ldap_conn.simple_bind_s(self.admin_dn, self.admin_pw) + ldap_conn.unbind_s() + break + except ldap.SERVER_DOWN: + pass + if ++attempt > 30: + raise Exception("Failed to start slapd") + time.sleep(1) + + # + # Relax requirement of member attribute presence in groupOfNames + # + modlist = [ + (ldap.MOD_DELETE, "olcObjectClasses", + "{7}( 2.5.6.9 NAME 'groupOfNames' " + "DESC 'RFC2256: a group of names (DNs)' SUP top " + "STRUCTURAL MUST ( member $ cn ) MAY ( businessCategory $ " + "seeAlso $ owner $ ou $ o $ description ) )"), + (ldap.MOD_ADD, "olcObjectClasses", + "{7}( 2.5.6.9 NAME 'groupOfNames' " + "DESC 'RFC2256: a group of names (DNs)' SUP top " + "STRUCTURAL MUST ( cn ) MAY ( member $ businessCategory $ " + "seeAlso $ owner $ ou $ o $ description ) )"), + ] + ldap_conn = ldap.initialize(ldapi_url) + ldap_conn.simple_bind_s(self.admin_rdn + ",cn=config", self.admin_pw) + ldap_conn.modify_s("cn={0}core,cn=schema,cn=config", modlist) + ldap_conn.unbind_s() + + # + # Add data + # + ldap_conn = ldap.initialize(self.ldap_url) + ldap_conn.simple_bind_s(self.admin_dn, self.admin_pw) + ldap_conn.add_s(self.base_dn, [ + ("objectClass", ["dcObject", "organization"]), + ("o", "Example Company"), + ]) + ldap_conn.add_s("cn=Manager," + self.base_dn, [ + ("objectClass", "organizationalRole"), + ]) + for ou in ("Users", "Groups", "Netgroups", "Services", "Policies"): + ldap_conn.add_s("ou=" + ou + "," + self.base_dn, [ + ("objectClass", ["top", "organizationalUnit"]), + ]) + ldap_conn.unbind_s() + + def teardown(self): + """Teardown the instance.""" + # Wait for slapd to stop + try: + pid_file = open(self.pid_path, "r") + try: + os.kill(int(pid_file.read()), signal.SIGTERM) + finally: + pid_file.close() + attempt = 0 + while os.path.isfile(self.pid_path): + if ++attempt > 30: + raise Exception("Failed to stop slapd") + time.sleep(1) + except IOError, e: + if e.errno != errno.ENOENT: + raise + + for path in (self.conf_slapd_d_dir, self.run_dir, self.data_dir): + shutil.rmtree(path, True) diff --git a/src/tests/intg/ent.py b/src/tests/intg/ent.py new file mode 100644 index 000000000..13feaaf04 --- /dev/null +++ b/src/tests/intg/ent.py @@ -0,0 +1,470 @@ +# +# Abstract passwd/group entry management +# +# Copyright (c) 2015 Red Hat, Inc. +# Author: Nikolai Kondrashov <Nikolai.Kondrashov@redhat.com> +# +# This is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +from pprint import pformat +import pwd +import grp + +_PASSWD_LIST_DESC = {None: ("user", {})} +_GROUP_DESC = {"mem": ("member list", {None: ("member", {})})} +_GROUP_LIST_DESC = {None: ("group", _GROUP_DESC)} + +def _get_desc(desc_map, key): + """ + Get an item description from a container description map. + + Arguments: + desc_map Container description map. + key Item key, None for wildcard description. + """ + assert isinstance(desc_map, dict) + if key in desc_map: + return desc_map[key] + if None in desc_map: + desc = desc_map[None] + if key != None: + desc = (desc[0] + " " + pformat(key), desc[1]) + return desc + elif key == None: + return ("item", {}) + else: + return (pformat(key), {}) + +def _diff(ent, pattern, desc_map={}): + """ + Describe difference between an entry and a pattern. + Return None, if none. + + Arguments: + ent Entry. + pattern Pattern. + desc_map Container pattern description map. + + An entry is a value, a list of entries, or a dictionary of entries. + Entries are used to store passwd and group database entries as + dictionaries, in lists and dictionaries. + + A pattern is a value, a tuple, a list, or a dictionary of patterns. + + E.g. 123, "abc", [ 123, "abc" ], { "abc": 123 }, { "abc": ( 123 ) } + + A pattern can be matched against a value, a list, or a dictionary entry. + + A value is considered matching, if it's equal to the pattern. + + E.g. 123 == 123, 123 != 456, "abc" == "abc", "abc" != "def", 123 != "abc" + + A list is considered matching a pattern, if the pattern is a list or a + tuple, where each of pattern list items matches an entry list item and + vice versa, or where each pattern tuple item matches an entry list item, + but not necessarily the other way around. + + E.g. [] != "abc", [] == [], [ "abc", 123 ] == [ 123, "abc" ], + [ "abc" ] != [ 123 ], [ 123 ] != [], + [] == (), [ "abc", 123 ] == ( 123, "abc" ), + [ "abc" ] != ( 123 ), [ 123 ] == (), [ 123, "abc" ] == ( 123 ) + + NOTE: For the sake of readability, it is recommended to use + "contains_only" function to create patterns matching all entry list + items (list patterns), and "contains" function to create patterns + matching a subset of entry list items (tuple patterns). + + A dictionary is considered matching a pattern, if it is also a dictionary, + and all of pattern values match identically-keyed values of the + dictionary. + + E.g. {} == {}, {} != "abc", { "abc": 123, "def": 456 } == { "abc": 123 }, + { "abc": 123 } == {} + + Container pattern description map is a dictionary with keys being item + keys/indices and values being (name, description map) tuples. None key + points to a wildcard description, others to specific item descriptions. + The description map argument is optional, and is used to generate more + readable difference explanations. + """ + assert isinstance(desc_map, dict) + + if isinstance(pattern, dict): + if not isinstance(ent, dict): + return "not a dict, " + str(type(ent)) + + for key, value in pattern.iteritems(): + item_name, item_map = _get_desc(desc_map, key) + d = _diff(ent[key], value, item_map) + if d: + return item_name + " mismatch: " + d + elif isinstance(pattern, tuple): + if not isinstance(ent, list): + return "not a list, " + str(type(ent)) + + pattern_matches = [0 for pv in pattern] + + for ei, ev in enumerate(ent): + for pi, pv in enumerate(pattern): + d = _diff(ev, pv) + if not d: + pattern_matches[pi] += 1 + + unmatched_pattern = [ pattern[pi] for pi in \ + xrange(0, len(pattern)) \ + if pattern_matches[pi] == 0 ] + + items = _get_desc(desc_map, None)[0] + "s" + if len(unmatched_pattern) > 0: + return "\nexpected " + items + " not found:\n" + \ + pformat(unmatched_pattern) + elif isinstance(pattern, list): + if not isinstance(ent, list): + return "not a list, " + str(type(ent)) + + pattern_matches = [0 for pv in pattern] + ent_matches = [0 for ev in ent] + + for ei, ev in enumerate(ent): + for pi, pv in enumerate(pattern): + d = _diff(ev, pv) + if not d: + pattern_matches[pi] += 1 + ent_matches[ei] += 1 + + unmatched_pattern = [ pattern[pi] for pi in \ + xrange(0, len(pattern)) \ + if pattern_matches[pi] == 0 ] + unmatched_ent = [ ent[pi] for pi in \ + xrange(0, len(ent)) \ + if ent_matches[pi] == 0 ] + + items = _get_desc(desc_map, None)[0] + "s" + d = "" + if len(unmatched_pattern) > 0: + d += "\nexpected " + items + " not found:\n" + \ + pformat(unmatched_pattern) + if len(unmatched_ent) != 0: + d += "\nunexpected " + items + " found:\n" + \ + pformat(unmatched_ent) + if len(d) > 0: + return d + else: + if pattern != ent: + return pformat(pattern) + " != " + pformat(ent) + + return None + +def contains_only(*args): + """ + Produce a pattern matching all list items against arguments. + Use this function instead of constructing bare lists, for readability. + """ + return list(args) + +def contains(*args): + """ + Produce a pattern matching a subset of list items against arguments. + Use this function instead of constructing bare tuples, for readability. + """ + return args + +def _convert_passwd(passwd): + """ + Convert a passwd entry returned by pwd module to an entry dictionary. + """ + return dict( + name = passwd.pw_name, + passwd = passwd.pw_passwd, + uid = passwd.pw_uid, + gid = passwd.pw_gid, + gecos = passwd.pw_gecos, + dir = passwd.pw_dir, + shell = passwd.pw_shell + ) + +def get_passwd_by_name(name): + """Get a passwd database entry by name.""" + return _convert_passwd(pwd.getpwnam(name)) + +def get_passwd_by_uid(uid): + """Get a passwd database entry by UID.""" + return _convert_passwd(pwd.getpwuid(uid)) + +def assert_passwd_by_name(name, pattern): + """Assert a passwd entry, retrieved by name, matches a pattern.""" + try: + ent = get_passwd_by_name(name) + except KeyError, err: + assert False, err + d = _diff(ent, pattern) + assert not d, d + +def assert_passwd_by_uid(uid, pattern): + """Assert a passwd entry, retrieved by UID, matches a pattern.""" + try: + ent = get_passwd_by_uid(uid) + except KeyError, err: + assert False, err + d = _diff(ent, pattern) + assert not d, d + +def get_passwd_list(): + """Get passwd database entry list with root user removed.""" + passwd_list = pwd.getpwall() + for i, v in enumerate(passwd_list): + if v.pw_name == "root" and v.pw_uid == 0 and v.pw_gid == 0: + del passwd_list[i] + return map(_convert_passwd, passwd_list) + raise Exception("no root user found") + +def assert_passwd_list(pattern): + """Assert retrieved passwd list matches a pattern.""" + d = _diff(get_passwd_list(), pattern, _PASSWD_LIST_DESC) + assert not d, d + +def _diff_each_passwd_by_name(pattern_dict): + """ + Describe difference between each pattern_dict value and a passwd entry + retrieved by name being the corresponding key. + """ + try: + ent = dict((k, get_passwd_by_name(k)) for k in pattern_dict.keys()) + except KeyError, err: + return str(err) + return _diff(ent, pattern_dict, _PASSWD_LIST_DESC) + +def _diff_each_passwd_by_uid(pattern_dict): + """ + Describe difference between each pattern_dict value and a passwd entry + retrieved by UID being the corresponding key. + """ + try: + ent = dict((k, get_passwd_by_uid(k)) for k in pattern_dict.keys()) + except KeyError, err: + return str(err) + return _diff(ent, pattern_dict, _PASSWD_LIST_DESC) + +def _diff_each_passwd_with_name(pattern_seq): + """ + Describe difference between each pattern in pattern_seq sequence and a + passwd entry retrieved by name being the pattern's "name" value. + """ + return _diff_each_passwd_by_name(dict((p["name"], p) for p in pattern_seq)) + +def _diff_each_passwd_with_uid(pattern_seq): + """ + Describe difference between each pattern in pattern_seq sequence and a + passwd entry retrieved by UID being the pattern's "uid" value. + """ + return _diff_each_passwd_by_uid(dict((p["uid"], p) for p in pattern_seq)) + +def assert_each_passwd_by_name(pattern_dict): + """ + Assert each pattern_dict value matches a passwd entry retrieved by + name being the corresponding key. + """ + d = _diff_each_passwd_by_name(pattern_dict) + assert not d, d + +def assert_each_passwd_by_uid(pattern_dict): + """ + Assert each pattern_dict value matches a passwd entry retrieved by + UID being the corresponding key. + """ + d = _diff_each_passwd_by_uid(pattern_dict) + assert not d, d + +def assert_each_passwd_with_name(pattern_seq): + """ + Assert each pattern in pattern_seq sequence matches a passwd entry + retrieved by name being the pattern's "name" value. + """ + d = _diff_each_passwd_with_name(pattern_seq) + assert not d, d + +def assert_each_passwd_with_uid(pattern_seq): + """ + Assert each pattern in pattern_seq sequence matches a passwd entry + retrieved by UID being the pattern's "uid" value. + """ + d = _diff_each_passwd_with_uid(pattern_seq) + assert not d, d + +def _diff_passwd(pattern): + """ + Describe difference between passwd database and a pattern. + Each pattern entry must have "name" and "uid" attribute. + """ + d = _diff(get_passwd_list(), pattern, _PASSWD_LIST_DESC) + if d: + return "list mismatch: " + d + d = _diff_each_passwd_with_name(pattern) + if d: + return "name retrieval mismatch: " + d + d = _diff_each_passwd_with_uid(pattern) + if d: + return "UID retrieval mismatch: " + d + return None + +def assert_passwd(pattern): + """ + Assert passwd database matches a pattern. + Each pattern entry must have "name" and "uid" attribute. + """ + d = _diff_passwd(pattern) + assert not d, d + +def _convert_group(group): + """ + Convert a group entry returned by grp module to an entry dictionary. + """ + return dict( + name = group.gr_name, + passwd = group.gr_passwd, + gid = group.gr_gid, + mem = group.gr_mem + ) + +def get_group_by_name(name): + """Get a group database entry by name.""" + return _convert_group(grp.getgrnam(name)) + +def get_group_by_gid(gid): + """Get a group database entry by GID.""" + return _convert_group(grp.getgrgid(gid)) + +def assert_group_by_name(name, pattern): + """Assert a group entry, retrieved by name, matches a pattern.""" + try: + ent = get_group_by_name(name) + except KeyError, err: + assert False, err + d = _diff(ent, pattern, _GROUP_DESC) + assert not d, d + +def assert_group_by_gid(gid, pattern): + """Assert a group entry, retrieved by GID, matches a pattern.""" + try: + ent = get_group_by_gid(gid) + except KeyError, err: + assert False, err + d = _diff(ent, pattern, _GROUP_DESC) + assert not d, d + +def get_group_list(): + """Get group database entry list with root group removed.""" + group_list = grp.getgrall() + for i, v in enumerate(group_list): + if v.gr_name == "root" and v.gr_gid == 0: + del group_list[i] + return map(_convert_group, group_list) + raise Exception("no root group found") + +def assert_group_list(pattern): + """Assert retrieved group list matches a pattern.""" + d = _diff(get_group_list(), pattern, _GROUP_LIST_DESC) + assert not d, d + +def _diff_each_group_by_name(pattern_dict): + """ + Describe difference between each pattern_dict value and a group entry + retrieved by name being the corresponding key. + """ + try: + ent = dict((k, get_group_by_name(k)) for k in pattern_dict.keys()) + except KeyError, err: + return str(err) + return _diff(ent, pattern_dict, _GROUP_LIST_DESC) + +def _diff_each_group_by_gid(pattern_dict): + """ + Describe difference between each pattern_dict value and a group entry + retrieved by GID being the corresponding key. + """ + try: + ent = dict((k, get_group_by_gid(k)) for k in pattern_dict.keys()) + except KeyError, err: + return str(err) + return _diff(ent, pattern_dict, _GROUP_LIST_DESC) + +def _diff_each_group_with_name(pattern_seq): + """ + Describe difference between each pattern in pattern_seq sequence and a + group entry retrieved name being the pattern's "name" value. + """ + return _diff_each_group_by_name(dict((p["name"], p) for p in pattern_seq)) + +def _diff_each_group_with_gid(pattern_seq): + """ + Describe difference between each pattern in pattern_seq sequence and a + group entry retrieved by GID being the pattern's "gid" value. + """ + return _diff_each_group_by_gid(dict((p["gid"], p) for p in pattern_seq)) + +def assert_each_group_by_name(pattern_dict): + """ + Assert each pattern_dict value matches a group entry retrieved by + name being the corresponding key. + """ + d = _diff_each_group_by_name(pattern_dict) + assert not d, d + +def assert_each_group_by_gid(pattern_dict): + """ + Assert each pattern_dict value matches a group entry retrieved by + GID being the corresponding key. + """ + d = _diff_each_group_by_gid(pattern_dict) + assert not d, d + +def assert_each_group_with_name(pattern_seq): + """ + Assert each pattern in pattern_seq sequence matches a group entry + retrieved by name being the pattern's "name" value. + """ + d = _diff_each_group_with_name(pattern_seq) + assert not d, d + +def assert_each_group_with_gid(pattern_seq): + """ + Assert each pattern in pattern_seq sequence matches a group entry + retrieved by GID being the pattern's "gid" value. + """ + d = _diff_each_group_with_gid(pattern_seq) + assert not d, d + +def _diff_group(pattern): + """ + Describe difference between group database and a pattern. + Each pattern entry must have "name" and "gid" attribute. + """ + d = _diff(get_group_list(), pattern, _GROUP_LIST_DESC) + if d: + return "list mismatch: " + d + d = _diff_each_group_with_name(pattern) + if d: + return "name retrieval mismatch: " + d + d = _diff_each_group_with_gid(pattern) + if d: + return "GID retrieval mismatch: " + d + return None + +def assert_group(pattern): + """ + Assert group database matches a pattern. + Each pattern entry must have "name" and "gid" attribute. + """ + d = _diff_group(pattern) + assert not d, d diff --git a/src/tests/intg/ent_test.py b/src/tests/intg/ent_test.py new file mode 100644 index 000000000..896fcbe14 --- /dev/null +++ b/src/tests/intg/ent_test.py @@ -0,0 +1,417 @@ +# +# ent.py module tests +# +# Copyright (c) 2015 Red Hat, Inc. +# Author: Nikolai Kondrashov <Nikolai.Kondrashov@redhat.com> +# +# This is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +import re +import os +import io +import shutil +import pytest +import ent +from util import * + +def backup_envvar_file(name): + path = os.environ[name] + backup_path = path + ".bak" + shutil.copyfile(path, backup_path) + return path + +def restore_envvar_file(name): + path = os.environ[name] + backup_path = path + ".bak" + os.rename(backup_path, path) + +@pytest.fixture(scope="module") +def passwd_path(request): + name = "NSS_WRAPPER_PASSWD" + request.addfinalizer(lambda: restore_envvar_file(name)) + return backup_envvar_file(name) + +@pytest.fixture(scope="module") +def group_path(request): + name = "NSS_WRAPPER_GROUP" + request.addfinalizer(lambda: restore_envvar_file(name)) + return backup_envvar_file(name) + +USER1 = dict(name="user1", passwd="x", uid=1001, gid=2001, + gecos="User 1", dir="/home/user1", shell="/bin/bash") +USER2 = dict(name="user2", passwd="x", uid=1002, gid=2002, + gecos="User 2", dir="/home/user2", shell="/bin/bash") +USER_LIST = [USER1, USER2] +USER_NAME_DICT = dict((u["name"], u) for u in USER_LIST) +USER_UID_DICT = dict((u["uid"], u) for u in USER_LIST) + + +EMPTY_GROUP = dict(name="empty_group", passwd="x", gid=2000, + mem=ent.contains_only()) +GROUP1 = dict(name="group1", passwd="x", gid=2001, + mem=ent.contains_only()) +GROUP2 = dict(name="group2", passwd="x", gid=2002, + mem=ent.contains_only()) +ONE_USER_GROUP1 = dict(name="one_user_group1", passwd="x", gid=2011, + mem=ent.contains_only("user1")) +ONE_USER_GROUP2 = dict(name="one_user_group2", passwd="x", gid=2012, + mem=ent.contains_only("user2")) +TWO_USER_GROUP = dict(name="two_user_group", passwd="x", gid=2020, + mem=ent.contains_only("user1", "user2")) +GROUP_LIST = [EMPTY_GROUP, + GROUP1, + GROUP2, + ONE_USER_GROUP1, + ONE_USER_GROUP2, + TWO_USER_GROUP] +GROUP_NAME_DICT = dict((g["name"], g) for g in GROUP_LIST) +GROUP_GID_DICT = dict((g["gid"], g) for g in GROUP_LIST) + +@pytest.fixture(scope="module") +def users_and_groups(request, passwd_path, group_path): + passwd_contents = "".join([ + "{name}:{passwd}:{uid}:{gid}:{gecos}:{dir}:{shell}\n".format(**u) \ + for u in USER_LIST + ]) + group_contents = "".join([ + "%s:%s:%s:%s\n" % (g["name"], g["passwd"], g["gid"], + ",".join(g["mem"])) \ + for g in GROUP_LIST + ]) + + with open(passwd_path, "a") as f: + f.write(passwd_contents) + with open(group_path, "a") as f: + f.write(group_contents) + +def test_assert_passwd_by_name(users_and_groups): + ent.assert_passwd_by_name("user1", {}) + ent.assert_passwd_by_name("user1", dict(name="user1", uid=1001)) + ent.assert_passwd_by_name("user1", USER1) + + try: + ent.assert_passwd_by_name("user3", {}) + assert False + except AssertionError, e: + assert str(e) == "'getpwnam(): name not found: user3'" + + try: + ent.assert_passwd_by_name("user2", dict(name="user1")) + assert False + except AssertionError, e: + assert str(e) == "'name' mismatch: 'user1' != 'user2'" + +def test_assert_passwd_by_uid(users_and_groups): + ent.assert_passwd_by_uid(1001, {}) + ent.assert_passwd_by_uid(1001, dict(name="user1", uid=1001)) + ent.assert_passwd_by_uid(1001, USER1) + + try: + ent.assert_passwd_by_uid(1003, {}) + assert False + except AssertionError, e: + assert str(e) == "'getpwuid(): uid not found: 1003'" + + try: + ent.assert_passwd_by_uid(1002, dict(name="user1")) + assert False + except AssertionError, e: + assert str(e) == "'name' mismatch: 'user1' != 'user2'" + + +def test_assert_passwd_list(users_and_groups): + ent.assert_passwd_list(ent.contains()) + ent.assert_passwd_list(ent.contains(USER1)) + ent.assert_passwd_list(ent.contains_only(*USER_LIST)) + try: + ent.assert_passwd_list(ent.contains_only()) + assert False + except AssertionError, e: + assert not re.search("expected users not found:", str(e)) + assert re.search("unexpected users found:", str(e)) + try: + ent.assert_passwd_list(ent.contains(dict(name="non_existent"))) + assert False + except AssertionError, e: + assert re.search("expected users not found:", str(e)) + assert not re.search("unexpected users found:", str(e)) + +def test_assert_each_passwd_by_name(users_and_groups): + ent.assert_each_passwd_by_name({}) + ent.assert_each_passwd_by_name(dict(user1=USER1)) + ent.assert_each_passwd_by_name(USER_NAME_DICT) + try: + ent.assert_each_passwd_by_name(dict(user3={})) + assert False + except AssertionError, e: + assert str(e) == "'getpwnam(): name not found: user3'" + try: + ent.assert_each_passwd_by_name(dict(user1=dict(name="user2"))) + assert False + except AssertionError, e: + assert str(e) == \ + "user 'user1' mismatch: 'name' mismatch: 'user2' != 'user1'" + +def test_assert_each_passwd_by_uid(users_and_groups): + ent.assert_each_passwd_by_uid({}) + ent.assert_each_passwd_by_uid({1001:USER1}) + ent.assert_each_passwd_by_uid(USER_UID_DICT) + try: + ent.assert_each_passwd_by_uid({1003:{}}) + assert False + except AssertionError, e: + assert str(e) == "'getpwuid(): uid not found: 1003'" + try: + ent.assert_each_passwd_by_uid({1001:dict(uid=1002)}) + assert False + except AssertionError, e: + assert str(e) == \ + "user 1001 mismatch: 'uid' mismatch: 1002 != 1001" + +def test_assert_each_passwd_with_name(users_and_groups): + ent.assert_each_passwd_with_name([]) + ent.assert_each_passwd_with_name([USER1]) + ent.assert_each_passwd_with_name(USER_LIST) + try: + ent.assert_each_passwd_with_name([dict(name="user3")]) + assert False + except AssertionError, e: + assert str(e) == "'getpwnam(): name not found: user3'" + try: + ent.assert_each_passwd_with_name([dict(name="user1", uid=1002)]) + assert False + except AssertionError, e: + assert str(e) == \ + "user 'user1' mismatch: 'uid' mismatch: 1002 != 1001" + +def test_assert_each_passwd_with_uid(users_and_groups): + ent.assert_each_passwd_with_uid([]) + ent.assert_each_passwd_with_uid([USER1]) + ent.assert_each_passwd_with_uid(USER_LIST) + try: + ent.assert_each_passwd_with_uid([dict(uid=1003)]) + assert False + except AssertionError, e: + assert str(e) == "'getpwuid(): uid not found: 1003'" + try: + ent.assert_each_passwd_with_uid([dict(name="user2", uid=1001)]) + assert False + except AssertionError, e: + assert str(e) == \ + "user 1001 mismatch: 'name' mismatch: 'user2' != 'user1'" + +def test_assert_passwd(users_and_groups): + ent.assert_passwd(ent.contains()) + ent.assert_passwd(ent.contains(USER1)) + ent.assert_passwd(ent.contains_only(*USER_LIST)) + try: + ent.assert_passwd(ent.contains(dict(name="user3", uid=1003))) + assert False + except AssertionError, e: + assert re.search("list mismatch:", str(e)) + assert re.search("expected users not found:", str(e)) + assert not re.search("unexpected users found:", str(e)) + try: + ent.assert_passwd(ent.contains_only(USER1)) + assert False + except AssertionError, e: + assert re.search("list mismatch:", str(e)) + assert not re.search("expected users not found:", str(e)) + assert re.search("unexpected users found:", str(e)) + +def test_group_member_matching(users_and_groups): + ent.assert_group_by_name("empty_group", dict(mem=ent.contains())) + ent.assert_group_by_name("empty_group", dict(mem=ent.contains_only())) + try: + ent.assert_group_by_name("empty_group", + dict(mem=ent.contains("user1"))) + except AssertionError, e: + assert re.search("member list mismatch:", str(e)) + assert re.search("expected members not found:", str(e)) + + ent.assert_group_by_name("one_user_group1", dict(mem=ent.contains())) + ent.assert_group_by_name("one_user_group1", + dict(mem=ent.contains("user1"))) + ent.assert_group_by_name("one_user_group1", + dict(mem=ent.contains_only("user1"))) + try: + ent.assert_group_by_name("one_user_group1", + dict(mem=ent.contains_only())) + except AssertionError, e: + assert re.search("member list mismatch:", str(e)) + assert re.search("unexpected members found:", str(e)) + assert not re.search("expected members not found:", str(e)) + try: + ent.assert_group_by_name("one_user_group1", + dict(mem=ent.contains_only("user3"))) + except AssertionError, e: + assert re.search("member list mismatch:", str(e)) + assert re.search("unexpected members found:", str(e)) + assert re.search("expected members not found:", str(e)) + try: + ent.assert_group_by_name("one_user_group1", + dict(mem=ent.contains("user3"))) + except AssertionError, e: + assert re.search("member list mismatch:", str(e)) + assert not re.search("unexpected members found:", str(e)) + assert re.search("expected members not found:", str(e)) + + ent.assert_group_by_name("two_user_group", dict(mem=ent.contains())) + ent.assert_group_by_name("two_user_group", + dict(mem=ent.contains("user1"))) + ent.assert_group_by_name("two_user_group", + dict(mem=ent.contains("user1", "user2"))) + ent.assert_group_by_name("two_user_group", + dict(mem=ent.contains_only("user1", "user2"))) + try: + ent.assert_group_by_name("two_user_group", + dict(mem=ent.contains_only("user1"))) + except AssertionError, e: + assert re.search("member list mismatch:", str(e)) + assert re.search("unexpected members found:", str(e)) + assert not re.search("expected members not found:", str(e)) + +def test_assert_group_by_name(users_and_groups): + ent.assert_group_by_name("group1", {}) + ent.assert_group_by_name("group1", dict(name="group1", gid=2001)) + ent.assert_group_by_name("group1", GROUP1) + + try: + ent.assert_group_by_name("group3", {}) + assert False + except AssertionError, e: + assert str(e) == "'getgrnam(): name not found: group3'" + + try: + ent.assert_group_by_name("group2", dict(name="group1")) + assert False + except AssertionError, e: + assert str(e) == "'name' mismatch: 'group1' != 'group2'" + +def test_assert_group_by_gid(users_and_groups): + ent.assert_group_by_gid(2001, {}) + ent.assert_group_by_gid(2001, dict(name="group1", gid=2001)) + ent.assert_group_by_gid(2001, GROUP1) + + try: + ent.assert_group_by_gid(2003, {}) + assert False + except AssertionError, e: + assert str(e) == "'getgrgid(): gid not found: 2003'" + + try: + ent.assert_group_by_gid(2002, dict(name="group1")) + assert False + except AssertionError, e: + assert str(e) == "'name' mismatch: 'group1' != 'group2'" + + +def test_assert_group_list(users_and_groups): + ent.assert_group_list(ent.contains()) + ent.assert_group_list(ent.contains(GROUP1)) + ent.assert_group_list(ent.contains_only(*GROUP_LIST)) + try: + ent.assert_group_list(ent.contains_only()) + assert False + except AssertionError, e: + assert not re.search("expected groups not found:", str(e)) + assert re.search("unexpected groups found:", str(e)) + try: + ent.assert_group_list(ent.contains(dict(name="non_existent"))) + assert False + except AssertionError, e: + assert re.search("expected groups not found:", str(e)) + assert not re.search("unexpected groups found:", str(e)) + +def test_assert_each_group_by_name(users_and_groups): + ent.assert_each_group_by_name({}) + ent.assert_each_group_by_name(dict(group1=GROUP1)) + ent.assert_each_group_by_name(GROUP_NAME_DICT) + try: + ent.assert_each_group_by_name(dict(group3={})) + assert False + except AssertionError, e: + assert str(e) == "'getgrnam(): name not found: group3'" + try: + ent.assert_each_group_by_name(dict(group1=dict(name="group2"))) + assert False + except AssertionError, e: + assert str(e) == "group 'group1' mismatch: " + \ + "'name' mismatch: 'group2' != 'group1'" + +def test_assert_each_group_by_gid(users_and_groups): + ent.assert_each_group_by_gid({}) + ent.assert_each_group_by_gid({2001:GROUP1}) + ent.assert_each_group_by_gid(GROUP_GID_DICT) + try: + ent.assert_each_group_by_gid({2003:{}}) + assert False + except AssertionError, e: + assert str(e) == "'getgrgid(): gid not found: 2003'" + try: + ent.assert_each_group_by_gid({2001:dict(gid=2002)}) + assert False + except AssertionError, e: + assert str(e) == \ + "group 2001 mismatch: 'gid' mismatch: 2002 != 2001" + +def test_assert_each_group_with_name(users_and_groups): + ent.assert_each_group_with_name([]) + ent.assert_each_group_with_name([GROUP1]) + ent.assert_each_group_with_name(GROUP_LIST) + try: + ent.assert_each_group_with_name([dict(name="group3")]) + assert False + except AssertionError, e: + assert str(e) == "'getgrnam(): name not found: group3'" + try: + ent.assert_each_group_with_name([dict(name="group1", gid=2002)]) + assert False + except AssertionError, e: + assert str(e) == \ + "group 'group1' mismatch: 'gid' mismatch: 2002 != 2001" + +def test_assert_each_group_with_gid(users_and_groups): + ent.assert_each_group_with_gid([]) + ent.assert_each_group_with_gid([GROUP1]) + ent.assert_each_group_with_gid(GROUP_LIST) + try: + ent.assert_each_group_with_gid([dict(gid=2003)]) + assert False + except AssertionError, e: + assert str(e) == "'getgrgid(): gid not found: 2003'" + try: + ent.assert_each_group_with_gid([dict(name="group2", gid=2001)]) + assert False + except AssertionError, e: + assert str(e) == \ + "group 2001 mismatch: 'name' mismatch: 'group2' != 'group1'" + +def test_assert_group(users_and_groups): + ent.assert_group(ent.contains()) + ent.assert_group(ent.contains(GROUP1)) + ent.assert_group(ent.contains_only(*GROUP_LIST)) + try: + ent.assert_group(ent.contains(dict(name="group3", gid=2003))) + assert False + except AssertionError, e: + assert re.search("list mismatch:", str(e)) + assert re.search("expected groups not found:", str(e)) + assert not re.search("unexpected groups found:", str(e)) + try: + ent.assert_group(ent.contains_only(GROUP1)) + assert False + except AssertionError, e: + assert re.search("list mismatch:", str(e)) + assert not re.search("expected groups not found:", str(e)) + assert re.search("unexpected groups found:", str(e)) diff --git a/src/tests/intg/ldap_ent.py b/src/tests/intg/ldap_ent.py new file mode 100644 index 000000000..ef2d14727 --- /dev/null +++ b/src/tests/intg/ldap_ent.py @@ -0,0 +1,102 @@ +# +# LDAP modlist generation +# +# Copyright (c) 2015 Red Hat, Inc. +# Author: Nikolai Kondrashov <Nikolai.Kondrashov@redhat.com> +# +# This is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +def user(base_dn, uid, uidNumber, gidNumber): + """ + Generate an RFC2307(bis) user add-modlist for passing to ldap.add* + """ + uidNumber = str(uidNumber) + gidNumber = str(gidNumber) + return ( + "uid=" + uid + ",ou=Users," + base_dn, + [ + ('objectClass', ['top', 'inetOrgPerson', 'posixAccount']), + ('cn', [uidNumber]), + ('sn', ['User']), + ('uidNumber', [uidNumber]), + ('gidNumber', [gidNumber]), + ('userPassword', ['Password' + uidNumber]), + ('homeDirectory', ['/home/' + uid]), + ('loginShell', ['/bin/bash']), + ] + ) + +def group(base_dn, cn, gidNumber, member_uids=[]): + """ + Generate an RFC2307 group add-modlist for passing to ldap.add*. + """ + gidNumber = str(gidNumber) + attr_list = [ + ('objectClass', ['top', 'posixGroup']), + ('gidNumber', [gidNumber]) + ] + if len(member_uids) > 0: + attr_list.append(('memberUid', member_uids)) + return ("cn=" + cn + ",ou=Groups," + base_dn, attr_list) + +def group_bis(base_dn, cn, gidNumber, member_uids=[], member_gids=[]): + """ + Generate an RFC2307bis group add-modlist for passing to ldap.add*. + """ + gidNumber = str(gidNumber) + attr_list = [ + ('objectClass', ['top', 'extensibleObject', 'groupOfNames']), + ('gidNumber', [gidNumber]) + ] + if len(member_uids) > 0: + attr_list.append( + ('member', [ + "uid=" + uid + ",ou=Users," + base_dn for + uid in member_uids + ]) + ) + if len(member_gids) > 0: + attr_list.append( + ('member', [ + "cn=" + gid + ",ou=Groups," + base_dn for + gid in member_gids + ]) + ) + return ("cn=" + cn + ",ou=Groups," + base_dn, attr_list) + +class List(list): + """LDAP add-modlist list""" + + def __init__(self, base_dn): + self.base_dn = base_dn + + def add_user(self, uid, uidNumber, gidNumber, + base_dn=None): + """Add an RFC2307(bis) user add-modlist.""" + self.append(user(base_dn or self.base_dn, + uid, uidNumber, gidNumber)) + + def add_group(self, cn, gidNumber, member_uids=[], + base_dn=None): + """Add an RFC2307 group add-modlist.""" + self.append(group(base_dn or self.base_dn, + cn, gidNumber, member_uids)) + + def add_group_bis(self, cn, gidNumber, + member_uids=[], member_gids=[], + base_dn=None): + """Add an RFC2307bis group add-modlist.""" + self.append(group_bis(base_dn or self.base_dn, + cn, gidNumber, + member_uids, member_gids)) diff --git a/src/tests/intg/ldap_test.py b/src/tests/intg/ldap_test.py new file mode 100644 index 000000000..afc77d702 --- /dev/null +++ b/src/tests/intg/ldap_test.py @@ -0,0 +1,261 @@ +# +# LDAP integration test +# +# Copyright (c) 2015 Red Hat, Inc. +# Author: Nikolai Kondrashov <Nikolai.Kondrashov@redhat.com> +# +# This is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +import os +import sys +import stat +import pwd +import grp +import ent +import config +import signal +import subprocess +import time +import ldap +import pytest +import ds_openldap +import ldap_ent +from util import * + +LDAP_BASE_DN="dc=example,dc=com" + +@pytest.fixture(scope="module") +def ds_inst(request): + """LDAP server instance fixture""" + ds_inst = ds_openldap.DSOpenLDAP( + config.PREFIX, 10389, LDAP_BASE_DN, + "cn=admin", "Secret123") + try: + ds_inst.setup() + except: + ds_inst.teardown() + raise + request.addfinalizer(lambda: ds_inst.teardown()) + return ds_inst + +@pytest.fixture(scope="module") +def ldap_conn(request, ds_inst): + """LDAP server connection fixture""" + ldap_conn = ds_inst.bind() + ldap_conn.ds_inst = ds_inst + request.addfinalizer(lambda: ldap_conn.unbind_s()) + return ldap_conn + +def create_ldap_fixture(request, ldap_conn, ent_list): + """Add LDAP entries and add teardown for removing them""" + for entry in ent_list: + ldap_conn.add_s(entry[0], entry[1]) + def teardown(): + for entry in ent_list: + ldap_conn.delete_s(entry[0]) + request.addfinalizer(teardown) + +def create_conf_fixture(request, contents): + """Generate sssd.conf and add teardown for removing it""" + conf = open(config.CONF_PATH, "w") + conf.write(contents) + conf.close() + os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR) + request.addfinalizer(lambda: os.unlink(config.CONF_PATH)) + +def create_sssd_fixture(request): + """Start sssd and add teardown for stopping it and removing state""" + if subprocess.call(["sssd", "-D", "-f"]) != 0: + raise Exception("sssd start failed") + def teardown(): + try: + pid_file = open(config.PIDFILE_PATH, "r") + pid = int(pid_file.read()) + os.kill(pid, signal.SIGTERM) + while True: + try: + os.kill(pid, signal.SIGCONT) + except: + break + time.sleep(1) + except: + pass + for path in os.listdir(config.DB_PATH): + os.unlink(config.DB_PATH + "/" + path) + for path in os.listdir(config.MCACHE_PATH): + os.unlink(config.MCACHE_PATH + "/" + path) + request.addfinalizer(teardown) + +@pytest.fixture +def sanity_rfc2307(request, ldap_conn): + ent_list = ldap_ent.List(LDAP_BASE_DN) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_user("user2", 1002, 2002) + ent_list.add_user("user3", 1003, 2003) + + ent_list.add_group("group1", 2001) + ent_list.add_group("group2", 2002) + ent_list.add_group("group3", 2003) + + ent_list.add_group("empty_group", 2010) + + ent_list.add_group("two_user_group", 2012, ["user1", "user2"]) + create_ldap_fixture(request, ldap_conn, ent_list) + + conf = unindent("""\ + [sssd] + debug_level = 0xffff + config_file_version = 2 + domains = LDAP + services = nss, pam + + [nss] + debug_level = 0xffff + memcache_timeout = 0 + + [pam] + debug_level = 0xffff + + [domain/LDAP] + ldap_auth_disable_tls_never_use_in_production = true + debug_level = 0xffff + enumerate = true + ldap_schema = rfc2307 + id_provider = ldap + auth_provider = ldap + sudo_provider = ldap + ldap_uri = {ldap_conn.ds_inst.ldap_url} + ldap_search_base = {ldap_conn.ds_inst.base_dn} + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + +@pytest.fixture +def sanity_rfc2307_bis(request, ldap_conn): + ent_list = ldap_ent.List(LDAP_BASE_DN) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_user("user2", 1002, 2002) + ent_list.add_user("user3", 1003, 2003) + + ent_list.add_group_bis("group1", 2001) + ent_list.add_group_bis("group2", 2002) + ent_list.add_group_bis("group3", 2003) + + ent_list.add_group_bis("empty_group1", 2010) + ent_list.add_group_bis("empty_group2", 2011) + + ent_list.add_group_bis("two_user_group", 2012, ["user1", "user2"]) + ent_list.add_group_bis("group_empty_group", 2013, [], ["empty_group1"]) + ent_list.add_group_bis("group_two_empty_groups", 2014, + [], ["empty_group1", "empty_group2"]) + ent_list.add_group_bis("one_user_group1", 2015, ["user1"]) + ent_list.add_group_bis("one_user_group2", 2016, ["user2"]) + ent_list.add_group_bis("group_one_user_group", 2017, + [], ["one_user_group1"]) + ent_list.add_group_bis("group_two_user_group", 2018, + [], ["two_user_group"]) + ent_list.add_group_bis("group_two_one_user_groups", 2019, + [], ["one_user_group1", "one_user_group2"]) + + create_ldap_fixture(request, ldap_conn, ent_list) + + conf = unindent("""\ + [sssd] + debug_level = 0xffff + config_file_version = 2 + domains = LDAP + services = nss, pam + + [nss] + debug_level = 0xffff + memcache_timeout = 0 + + [pam] + debug_level = 0xffff + + [domain/LDAP] + ldap_auth_disable_tls_never_use_in_production = true + debug_level = 0xffff + enumerate = true + ldap_schema = rfc2307bis + ldap_group_object_class = groupOfNames + id_provider = ldap + auth_provider = ldap + sudo_provider = ldap + ldap_uri = {ldap_conn.ds_inst.ldap_url} + ldap_search_base = {ldap_conn.ds_inst.base_dn} + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + +def test_sanity_rfc2307(ldap_conn, sanity_rfc2307): + passwd_pattern = ent.contains_only( + dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', dir='/home/user1', shell='/bin/bash'), + dict(name='user2', passwd='*', uid=1002, gid=2002, gecos='1002', dir='/home/user2', shell='/bin/bash'), + dict(name='user3', passwd='*', uid=1003, gid=2003, gecos='1003', dir='/home/user3', shell='/bin/bash') + ) + ent.assert_passwd(passwd_pattern) + + group_pattern = ent.contains_only( + dict(name='group1', passwd='*', gid=2001, mem=ent.contains_only()), + dict(name='group2', passwd='*', gid=2002, mem=ent.contains_only()), + dict(name='group3', passwd='*', gid=2003, mem=ent.contains_only()), + dict(name='empty_group', passwd='*', gid=2010, mem=ent.contains_only()), + dict(name='two_user_group', passwd='*', gid=2012, mem=ent.contains_only("user1", "user2")) + ) + ent.assert_group(group_pattern) + + with pytest.raises(KeyError): + pwd.getpwnam("non_existent_user") + with pytest.raises(KeyError): + pwd.getpwuid(1) + with pytest.raises(KeyError): + grp.getgrnam("non_existent_group") + with pytest.raises(KeyError): + grp.getgrgid(1) + +def test_sanity_rfc2307_bis(ldap_conn, sanity_rfc2307_bis): + passwd_pattern = ent.contains_only( + dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', dir='/home/user1', shell='/bin/bash'), + dict(name='user2', passwd='*', uid=1002, gid=2002, gecos='1002', dir='/home/user2', shell='/bin/bash'), + dict(name='user3', passwd='*', uid=1003, gid=2003, gecos='1003', dir='/home/user3', shell='/bin/bash') + ) + ent.assert_passwd(passwd_pattern) + + group_pattern = ent.contains_only( + dict(name='group1', passwd='*', gid=2001, mem=ent.contains_only()), + dict(name='group2', passwd='*', gid=2002, mem=ent.contains_only()), + dict(name='group3', passwd='*', gid=2003, mem=ent.contains_only()), + dict(name='empty_group1', passwd='*', gid=2010, mem=ent.contains_only()), + dict(name='empty_group2', passwd='*', gid=2011, mem=ent.contains_only()), + dict(name='two_user_group', passwd='*', gid=2012, mem=ent.contains_only("user1", "user2")), + dict(name='group_empty_group', passwd='*', gid=2013, mem=ent.contains_only()), + dict(name='group_two_empty_groups', passwd='*', gid=2014, mem=ent.contains_only()), + dict(name='one_user_group1', passwd='*', gid=2015, mem=ent.contains_only("user1")), + dict(name='one_user_group2', passwd='*', gid=2016, mem=ent.contains_only("user2")), + dict(name='group_one_user_group', passwd='*', gid=2017, mem=ent.contains_only("user1")), + dict(name='group_two_user_group', passwd='*', gid=2018, mem=ent.contains_only("user1", "user2")), + dict(name='group_two_one_user_groups', passwd='*', gid=2019, mem=ent.contains_only("user1", "user2")) + ) + ent.assert_group(group_pattern) + + with pytest.raises(KeyError): + pwd.getpwnam("non_existent_user") + with pytest.raises(KeyError): + pwd.getpwuid(1) + with pytest.raises(KeyError): + grp.getgrnam("non_existent_group") + with pytest.raises(KeyError): + grp.getgrgid(1) diff --git a/src/tests/intg/util.py b/src/tests/intg/util.py new file mode 100644 index 000000000..5dd92b220 --- /dev/null +++ b/src/tests/intg/util.py @@ -0,0 +1,55 @@ +# +# Various functions +# +# Copyright (c) 2015 Red Hat, Inc. +# Author: Nikolai Kondrashov <Nikolai.Kondrashov@redhat.com> +# +# This is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import re +import os +import subprocess + +UNINDENT_RE = re.compile("^ +", re.MULTILINE) + +def unindent(text): + """ + Unindent text by removing at most the number of spaces present in + the first non-empty line from the beginning of every line. + """ + indent_ref = [0] + def replace(match): + if indent_ref[0] == 0: + indent_ref[0] = len(match.group()) + return match.group()[indent_ref[0]:] + return UNINDENT_RE.sub(replace, text) + +def run_shell(): + """ + Execute an interactive shell under "screen", preserving environment. + For use as a breakpoint for debugging. + """ + subprocess.call([ + "screen", "-D", "-m", "bash", "-c", + "PATH='" + os.getenv("PATH", "") + "' " + + "LD_LIBRARY_PATH='" + os.getenv("LD_LIBRARY_PATH", "") + "' " + + "LD_PRELOAD='" + os.getenv("LD_PRELOAD", "") + "' " + + "bash -i" + ]) + +def first_dir(*args): + """Return first argument that points to an existing directory.""" + for arg in args: + if os.path.isdir(arg): + return arg |