diff options
| author | Jelmer Vernooij <jelmer@samba.org> | 2012-12-28 15:37:14 +0100 |
|---|---|---|
| committer | Andrew Bartlett <abartlet@samba.org> | 2013-03-02 03:57:34 +0100 |
| commit | 87afc3aee1ea593069322a49355dd8780d99e123 (patch) | |
| tree | 8e1ea6678d93b53f21b34c4940b7d5a64e0f5020 /python/samba/tests/samba_tool | |
| parent | 80fce353e740c793619005ac102ab07fb5e7d280 (diff) | |
| download | samba-87afc3aee1ea593069322a49355dd8780d99e123.tar.gz samba-87afc3aee1ea593069322a49355dd8780d99e123.tar.xz samba-87afc3aee1ea593069322a49355dd8780d99e123.zip | |
Move python modules from source4/scripting/python/ to python/.
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Autobuild-User(master): Andrew Bartlett <abartlet@samba.org>
Autobuild-Date(master): Sat Mar 2 03:57:34 CET 2013 on sn-devel-104
Diffstat (limited to 'python/samba/tests/samba_tool')
| -rw-r--r-- | python/samba/tests/samba_tool/__init__.py | 15 | ||||
| -rw-r--r-- | python/samba/tests/samba_tool/base.py | 114 | ||||
| -rw-r--r-- | python/samba/tests/samba_tool/gpo.py | 79 | ||||
| -rw-r--r-- | python/samba/tests/samba_tool/group.py | 169 | ||||
| -rw-r--r-- | python/samba/tests/samba_tool/ntacl.py | 135 | ||||
| -rw-r--r-- | python/samba/tests/samba_tool/processes.py | 35 | ||||
| -rw-r--r-- | python/samba/tests/samba_tool/timecmd.py | 43 | ||||
| -rw-r--r-- | python/samba/tests/samba_tool/user.py | 362 |
8 files changed, 952 insertions, 0 deletions
diff --git a/python/samba/tests/samba_tool/__init__.py b/python/samba/tests/samba_tool/__init__.py new file mode 100644 index 0000000000..3d7f0591e2 --- /dev/null +++ b/python/samba/tests/samba_tool/__init__.py @@ -0,0 +1,15 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. diff --git a/python/samba/tests/samba_tool/base.py b/python/samba/tests/samba_tool/base.py new file mode 100644 index 0000000000..60ccaa543d --- /dev/null +++ b/python/samba/tests/samba_tool/base.py @@ -0,0 +1,114 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +# This provides a wrapper around the cmd interface so that tests can +# easily be built on top of it and have minimal code to run basic tests +# of the commands. A list of the environmental variables can be found in +# ~/selftest/selftest.pl +# +# These can all be accesses via os.environ["VARIBLENAME"] when needed + +import random +import string +from samba.auth import system_session +from samba.samdb import SamDB +from cStringIO import StringIO +from samba.netcmd.main import cmd_sambatool +import samba.tests + +class SambaToolCmdTest(samba.tests.TestCaseInTempDir): + + def getSamDB(self, *argv): + """a convenience function to get a samdb instance so that we can query it""" + + # We build a fake command to get the options created the same + # way the command classes do it. It would be better if the command + # classes had a way to more cleanly do this, but this lets us write + # tests for now + cmd = cmd_sambatool.subcommands["user"].subcommands["setexpiry"] + parser, optiongroups = cmd._create_parser("user") + opts, args = parser.parse_args(list(argv)) + # Filter out options from option groups + args = args[1:] + kwargs = dict(opts.__dict__) + for option_group in parser.option_groups: + for option in option_group.option_list: + if option.dest is not None: + del kwargs[option.dest] + kwargs.update(optiongroups) + + H = kwargs.get("H", None) + sambaopts = kwargs.get("sambaopts", None) + credopts = kwargs.get("credopts", None) + + lp = sambaopts.get_loadparm() + creds = credopts.get_credentials(lp, fallback_machine=True) + + samdb = SamDB(url=H, session_info=system_session(), + credentials=creds, lp=lp) + return samdb + + + def runcmd(self, name, *args): + """run a single level command""" + cmd = cmd_sambatool.subcommands[name] + cmd.outf = StringIO() + cmd.errf = StringIO() + result = cmd._run(name, *args) + return (result, cmd.outf.getvalue(), cmd.errf.getvalue()) + + def runsubcmd(self, name, sub, *args): + """run a command with sub commands""" + # The reason we need this function seperate from runcmd is + # that the .outf StringIO assignment is overriden if we use + # runcmd, so we can't capture stdout and stderr + cmd = cmd_sambatool.subcommands[name].subcommands[sub] + cmd.outf = StringIO() + cmd.errf = StringIO() + result = cmd._run(name, *args) + return (result, cmd.outf.getvalue(), cmd.errf.getvalue()) + + def assertCmdSuccess(self, val, msg=""): + self.assertIsNone(val, msg) + + def assertCmdFail(self, val, msg=""): + self.assertIsNotNone(val, msg) + + def assertMatch(self, base, string, msg=""): + self.assertTrue(string in base, msg) + + def randomName(self, count=8): + """Create a random name, cap letters and numbers, and always starting with a letter""" + name = random.choice(string.ascii_uppercase) + name += ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase+ string.digits) for x in range(count - 1)) + return name + + def randomPass(self, count=16): + name = random.choice(string.ascii_uppercase) + name += random.choice(string.digits) + name += random.choice(string.ascii_lowercase) + name += ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase+ string.digits) for x in range(count - 3)) + return name + + def randomXid(self): + # pick some hopefully unused, high UID/GID range to avoid interference + # from the system the test runs on + xid = random.randint(4711000, 4799000) + return xid + + def assertWithin(self, val1, val2, delta, msg=""): + """Assert that val1 is within delta of val2, useful for time computations""" + self.assertTrue(((val1 + delta) > val2) and ((val1 - delta) < val2), msg) diff --git a/python/samba/tests/samba_tool/gpo.py b/python/samba/tests/samba_tool/gpo.py new file mode 100644 index 0000000000..e20a97794a --- /dev/null +++ b/python/samba/tests/samba_tool/gpo.py @@ -0,0 +1,79 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Andrew Bartlett 2012 +# +# based on time.py: +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +from samba.tests.samba_tool.base import SambaToolCmdTest +import shutil + +class GpoCmdTestCase(SambaToolCmdTest): + """Tests for samba-tool time subcommands""" + + gpo_name = "testgpo" + + def test_gpo_list(self): + """Run gpo list against the server and make sure it looks accurate""" + (result, out, err) = self.runsubcmd("gpo", "listall", "-H", "ldap://%s" % os.environ["SERVER"]) + self.assertCmdSuccess(result, "Ensuring gpo listall ran successfully") + + def test_fetchfail(self): + """Run against a non-existent GPO, and make sure it fails (this hard-coded UUID is very unlikely to exist""" + (result, out, err) = self.runsubcmd("gpo", "fetch", "c25cac17-a02a-4151-835d-fae17446ee43", "-H", "ldap://%s" % os.environ["SERVER"]) + self.assertEquals(result, -1, "check for result code") + + def test_fetch(self): + """Run against a real GPO, and make sure it passes""" + (result, out, err) = self.runsubcmd("gpo", "fetch", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"], "--tmpdir", self.tempdir) + self.assertCmdSuccess(result, "Ensuring gpo fetched successfully") + shutil.rmtree(os.path.join(self.tempdir, "policy")) + + def test_show(self): + """Show a real GPO, and make sure it passes""" + (result, out, err) = self.runsubcmd("gpo", "show", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"]) + self.assertCmdSuccess(result, "Ensuring gpo fetched successfully") + + def test_show_as_admin(self): + """Show a real GPO, and make sure it passes""" + (result, out, err) = self.runsubcmd("gpo", "show", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"], "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])) + self.assertCmdSuccess(result, "Ensuring gpo fetched successfully") + + def test_aclcheck(self): + """Check all the GPOs on the remote server have correct ACLs""" + (result, out, err) = self.runsubcmd("gpo", "aclcheck", "-H", "ldap://%s" % os.environ["SERVER"], "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])) + self.assertCmdSuccess(result, "Ensuring gpo checked successfully") + + def setUp(self): + """set up a temporary GPO to work with""" + super(GpoCmdTestCase, self).setUp() + (result, out, err) = self.runsubcmd("gpo", "create", self.gpo_name, + "-H", "ldap://%s" % os.environ["SERVER"], + "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"]), + "--tmpdir", self.tempdir) + shutil.rmtree(os.path.join(self.tempdir, "policy")) + self.assertCmdSuccess(result, "Ensuring gpo created successfully") + try: + self.gpo_guid = "{%s}" % out.split("{")[1].split("}")[0] + except IndexError: + self.fail("Failed to find GUID in output: %s" % out) + + def tearDown(self): + """remove the temporary GPO to work with""" + (result, out, err) = self.runsubcmd("gpo", "del", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"], "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])) + self.assertCmdSuccess(result, "Ensuring gpo deleted successfully") + super(GpoCmdTestCase, self).tearDown() diff --git a/python/samba/tests/samba_tool/group.py b/python/samba/tests/samba_tool/group.py new file mode 100644 index 0000000000..2c0c46e5dc --- /dev/null +++ b/python/samba/tests/samba_tool/group.py @@ -0,0 +1,169 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Michael Adam 2012 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +import time +import ldb +from samba.tests.samba_tool.base import SambaToolCmdTest +from samba import ( + nttime2unix, + dsdb + ) + +class GroupCmdTestCase(SambaToolCmdTest): + """Tests for samba-tool group subcommands""" + groups = [] + samdb = None + + def setUp(self): + super(GroupCmdTestCase, self).setUp() + self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + self.groups = [] + self.groups.append(self._randomGroup({"name": "testgroup1"})) + self.groups.append(self._randomGroup({"name": "testgroup2"})) + self.groups.append(self._randomGroup({"name": "testgroup3"})) + self.groups.append(self._randomGroup({"name": "testgroup4"})) + + # setup the 4 groups and ensure they are correct + for group in self.groups: + (result, out, err) = self._create_group(group) + + self.assertCmdSuccess(result) + self.assertEquals(err, "", "There shouldn't be any error message") + self.assertIn("Added group %s" % group["name"], out) + + found = self._find_group(group["name"]) + + self.assertIsNotNone(found) + + self.assertEquals("%s" % found.get("name"), group["name"]) + self.assertEquals("%s" % found.get("description"), group["description"]) + + def tearDown(self): + super(GroupCmdTestCase, self).tearDown() + # clean up all the left over groups, just in case + for group in self.groups: + if self._find_group(group["name"]): + self.runsubcmd("group", "delete", group["name"]) + + + def test_newgroup(self): + """This tests the "group add" and "group delete" commands""" + # try to add all the groups again, this should fail + for group in self.groups: + (result, out, err) = self._create_group(group) + self.assertCmdFail(result, "Succeeded to create existing group") + self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err) + + # try to delete all the groups we just added + for group in self.groups: + (result, out, err) = self.runsubcmd("group", "delete", group["name"]) + self.assertCmdSuccess(result, + "Failed to delete group '%s'" % group["name"]) + found = self._find_group(group["name"]) + self.assertIsNone(found, + "Deleted group '%s' still exists" % group["name"]) + + # test adding groups + for group in self.groups: + (result, out, err) = self.runsubcmd("group", "add", group["name"], + "--description=%s" % group["description"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + + self.assertCmdSuccess(result) + self.assertEquals(err,"","There shouldn't be any error message") + self.assertIn("Added group %s" % group["name"], out) + + found = self._find_group(group["name"]) + + self.assertEquals("%s" % found.get("samaccountname"), + "%s" % group["name"]) + + + def test_list(self): + (result, out, err) = self.runsubcmd("group", "list", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, "Error running list") + + search_filter = "(objectClass=group)" + + grouplist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=["samaccountname"]) + + self.assertTrue(len(grouplist) > 0, "no groups found in samdb") + + for groupobj in grouplist: + name = groupobj.get("samaccountname", idx=0) + found = self.assertMatch(out, name, + "group '%s' not found" % name) + + def test_listmembers(self): + (result, out, err) = self.runsubcmd("group", "listmembers", "Domain Users", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, "Error running listmembers") + + search_filter = "(|(primaryGroupID=513)(memberOf=CN=Domain Users,CN=Users,%s))" % self.samdb.domain_dn() + + grouplist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=["samAccountName"]) + + self.assertTrue(len(grouplist) > 0, "no groups found in samdb") + + for groupobj in grouplist: + name = groupobj.get("samAccountName", idx=0) + found = self.assertMatch(out, name, "group '%s' not found" % name) + + def _randomGroup(self, base={}): + """create a group with random attribute values, you can specify base attributes""" + group = { + "name": self.randomName(), + "description": self.randomName(count=100), + } + group.update(base) + return group + + def _create_group(self, group): + return self.runsubcmd("group", "add", group["name"], + "--description=%s" % group["description"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + + def _find_group(self, name): + search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" % + (ldb.binary_encode(name), + "CN=Group,CN=Schema,CN=Configuration", + self.samdb.domain_dn())) + grouplist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=[]) + if grouplist: + return grouplist[0] + else: + return None diff --git a/python/samba/tests/samba_tool/ntacl.py b/python/samba/tests/samba_tool/ntacl.py new file mode 100644 index 0000000000..2a329fe7d4 --- /dev/null +++ b/python/samba/tests/samba_tool/ntacl.py @@ -0,0 +1,135 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Andrew Bartlett 2012 +# +# Based on user.py: +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +import time +import ldb +from samba.tests.samba_tool.base import SambaToolCmdTest +import random + +class NtACLCmdSysvolTestCase(SambaToolCmdTest): + """Tests for samba-tool ntacl sysvol* subcommands""" + + + def test_ntvfs(self): + (result, out, err) = self.runsubcmd("ntacl", "sysvolreset", + "--use-ntvfs") + self.assertCmdSuccess(result) + self.assertEquals(out,"","Shouldn't be any output messages") + self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err) + + def test_s3fs(self): + (result, out, err) = self.runsubcmd("ntacl", "sysvolreset", + "--use-s3fs") + + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertEquals(out,"","Shouldn't be any output messages") + + def test_ntvfs_check(self): + (result, out, err) = self.runsubcmd("ntacl", "sysvolreset", + "--use-ntvfs") + self.assertCmdSuccess(result) + self.assertEquals(out,"","Shouldn't be any output messages") + self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err) + # Now check they were set correctly + (result, out, err) = self.runsubcmd("ntacl", "sysvolcheck") + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertEquals(out,"","Shouldn't be any output messages") + + def test_s3fs_check(self): + (result, out, err) = self.runsubcmd("ntacl", "sysvolreset", + "--use-s3fs") + + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertEquals(out,"","Shouldn't be any output messages") + + # Now check they were set correctly + (result, out, err) = self.runsubcmd("ntacl", "sysvolcheck") + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertEquals(out,"","Shouldn't be any output messages") + +class NtACLCmdGetSetTestCase(SambaToolCmdTest): + """Tests for samba-tool ntacl get/set subcommands""" + + acl = "O:DAG:DUD:P(A;OICI;0x001f01ff;;;DA)(A;OICI;0x001f01ff;;;EA)(A;OICIIO;0x001f01ff;;;CO)(A;OICI;0x001f01ff;;;DA)(A;OICI;0x001f01ff;;;SY)(A;OICI;0x001200a9;;;AU)(A;OICI;0x001200a9;;;ED)S:AI(OU;CIIDSA;WP;f30e3bbe-9ff0-11d1-b603-0000f80367c1;bf967aa5-0de6-11d0-a285-00aa003049e2;WD)(OU;CIIDSA;WP;f30e3bbf-9ff0-11d1-b603-0000f80367c1;bf967aa5-0de6-11d0-a285-00aa003049e2;WD)" + + + def test_ntvfs(self): + path = os.environ['SELFTEST_PREFIX'] + tempf = os.path.join(path,"pytests"+str(int(100000*random.random()))) + open(tempf, 'w').write("empty") + + (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf, + "--use-ntvfs") + self.assertCmdSuccess(result) + self.assertEquals(out,"","Shouldn't be any output messages") + self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err) + + def test_s3fs(self): + path = os.environ['SELFTEST_PREFIX'] + tempf = os.path.join(path,"pytests"+str(int(100000*random.random()))) + open(tempf, 'w').write("empty") + + (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf, + "--use-s3fs") + + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertEquals(out,"","Shouldn't be any output messages") + + def test_ntvfs_check(self): + path = os.environ['SELFTEST_PREFIX'] + tempf = os.path.join(path,"pytests"+str(int(100000*random.random()))) + open(tempf, 'w').write("empty") + + (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf, + "--use-ntvfs") + self.assertCmdSuccess(result) + self.assertEquals(out,"","Shouldn't be any output messages") + self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err) + + # Now check they were set correctly + (result, out, err) = self.runsubcmd("ntacl", "get", tempf, + "--use-ntvfs", "--as-sddl") + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertEquals(self.acl+"\n", out, "Output should be the ACL") + + def test_s3fs_check(self): + path = os.environ['SELFTEST_PREFIX'] + tempf = os.path.join(path,"pytests"+str(int(100000*random.random()))) + open(tempf, 'w').write("empty") + + (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf, + "--use-s3fs") + self.assertCmdSuccess(result) + self.assertEquals(out,"","Shouldn't be any output messages") + self.assertEquals(err,"","Shouldn't be any error messages") + + # Now check they were set correctly + (result, out, err) = self.runsubcmd("ntacl", "get", tempf, + "--use-s3fs", "--as-sddl") + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertEquals(self.acl+"\n", out,"Output should be the ACL") diff --git a/python/samba/tests/samba_tool/processes.py b/python/samba/tests/samba_tool/processes.py new file mode 100644 index 0000000000..91a5266b54 --- /dev/null +++ b/python/samba/tests/samba_tool/processes.py @@ -0,0 +1,35 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Andrew Bartlett 2012 +# +# based on time.py: +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +from samba.tests.samba_tool.base import SambaToolCmdTest + +class ProcessCmdTestCase(SambaToolCmdTest): + """Tests for samba-tool process subcommands""" + + def test_name(self): + """Run processes command""" + (result, out, err) = self.runcmd("processes", "--name", "samba") + self.assertCmdSuccess(result, "Ensuring processes ran successfully") + + def test_all(self): + """Run processes command""" + (result, out, err) = self.runcmd("processes") + self.assertCmdSuccess(result, "Ensuring processes ran successfully") diff --git a/python/samba/tests/samba_tool/timecmd.py b/python/samba/tests/samba_tool/timecmd.py new file mode 100644 index 0000000000..000f0f2828 --- /dev/null +++ b/python/samba/tests/samba_tool/timecmd.py @@ -0,0 +1,43 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +from time import localtime, strptime, mktime +from samba.tests.samba_tool.base import SambaToolCmdTest + +class TimeCmdTestCase(SambaToolCmdTest): + """Tests for samba-tool time subcommands""" + + def test_timeget(self): + """Run time against the server and make sure it looks accurate""" + (result, out, err) = self.runcmd("time", os.environ["SERVER"]) + self.assertCmdSuccess(result, "Ensuring time ran successfully") + + timefmt = strptime(out, "%a %b %d %H:%M:%S %Y %Z\n") + servertime = int(mktime(timefmt)) + now = int(mktime(localtime())) + + # because there is a race here, allow up to 5 seconds difference in times + delta = 5 + self.assertTrue((servertime > (now - delta) and (servertime < (now + delta)), "Time is now")) + + def test_timefail(self): + """Run time against a non-existent server, and make sure it fails""" + (result, out, err) = self.runcmd("time", "notaserver") + self.assertEquals(result, -1, "check for result code") + self.assertTrue(err.strip().endswith("NT_STATUS_OBJECT_NAME_NOT_FOUND"), "ensure right error string") + self.assertEquals(out, "", "ensure no output returned") diff --git a/python/samba/tests/samba_tool/user.py b/python/samba/tests/samba_tool/user.py new file mode 100644 index 0000000000..33344cd3d3 --- /dev/null +++ b/python/samba/tests/samba_tool/user.py @@ -0,0 +1,362 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +import time +import ldb +from samba.tests.samba_tool.base import SambaToolCmdTest +from samba import ( + nttime2unix, + dsdb + ) + +class UserCmdTestCase(SambaToolCmdTest): + """Tests for samba-tool user subcommands""" + users = [] + samdb = None + + def setUp(self): + super(UserCmdTestCase, self).setUp() + self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + self.users = [] + self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"})) + self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"})) + self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"})) + self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"})) + self.users.append(self._randomPosixUser({"name": "posixuser1"})) + self.users.append(self._randomPosixUser({"name": "posixuser2"})) + self.users.append(self._randomPosixUser({"name": "posixuser3"})) + self.users.append(self._randomPosixUser({"name": "posixuser4"})) + + # setup the 8 users and ensure they are correct + for user in self.users: + (result, out, err) = user["createUserFn"](user) + + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertIn("User '%s' created successfully" % user["name"], out) + + user["checkUserFn"](user) + + + def tearDown(self): + super(UserCmdTestCase, self).tearDown() + # clean up all the left over users, just in case + for user in self.users: + if self._find_user(user["name"]): + self.runsubcmd("user", "delete", user["name"]) + + + def test_newuser(self): + # try to add all the users again, this should fail + for user in self.users: + (result, out, err) = self._create_user(user) + self.assertCmdFail(result, "Ensure that create user fails") + self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err) + + # try to delete all the 4 users we just added + for user in self.users: + (result, out, err) = self.runsubcmd("user", "delete", user["name"]) + self.assertCmdSuccess(result, "Can we delete users") + found = self._find_user(user["name"]) + self.assertIsNone(found) + + # test adding users with --use-username-as-cn + for user in self.users: + (result, out, err) = self.runsubcmd("user", "add", user["name"], user["password"], + "--use-username-as-cn", + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertIn("User '%s' created successfully" % user["name"], out) + + found = self._find_user(user["name"]) + + self.assertEquals("%s" % found.get("cn"), "%(name)s" % user) + self.assertEquals("%s" % found.get("name"), "%(name)s" % user) + + + + def test_setpassword(self): + for user in self.users: + newpasswd = self.randomPass() + (result, out, err) = self.runsubcmd("user", "setpassword", + user["name"], + "--newpassword=%s" % newpasswd, + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + # self.assertCmdSuccess(result, "Ensure setpassword runs") + self.assertEquals(err,"","setpassword with url") + self.assertMatch(out, "Changed password OK", "setpassword with url") + + for user in self.users: + newpasswd = self.randomPass() + (result, out, err) = self.runsubcmd("user", "setpassword", + user["name"], + "--newpassword=%s" % newpasswd) + # self.assertCmdSuccess(result, "Ensure setpassword runs") + self.assertEquals(err,"","setpassword without url") + self.assertMatch(out, "Changed password OK", "setpassword without url") + + for user in self.users: + newpasswd = self.randomPass() + (result, out, err) = self.runsubcmd("user", "setpassword", + user["name"], + "--newpassword=%s" % newpasswd, + "--must-change-at-next-login", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + # self.assertCmdSuccess(result, "Ensure setpassword runs") + self.assertEquals(err,"","setpassword with forced change") + self.assertMatch(out, "Changed password OK", "setpassword with forced change") + + + + + def test_setexpiry(self): + twodays = time.time() + (2 * 24 * 60 * 60) + + for user in self.users: + (result, out, err) = self.runsubcmd("user", "setexpiry", user["name"], + "--days=2", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, "Can we run setexpiry with names") + self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out) + + for user in self.users: + found = self._find_user(user["name"]) + + expires = nttime2unix(int("%s" % found.get("accountExpires"))) + self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time") + + # TODO: renable this after the filter case is sorted out + if "filters are broken, bail now": + return + + # now run the expiration based on a filter + fourdays = time.time() + (4 * 24 * 60 * 60) + (result, out, err) = self.runsubcmd("user", "setexpiry", + "--filter", "(&(objectClass=user)(company=comp2))", + "--days=4", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, "Can we run setexpiry with a filter") + + for user in self.users: + found = self._find_user(user["name"]) + if ("%s" % found.get("company")) == "comp2": + expires = nttime2unix(int("%s" % found.get("accountExpires"))) + self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time") + else: + expires = nttime2unix(int("%s" % found.get("accountExpires"))) + self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time") + + + def test_list(self): + (result, out, err) = self.runsubcmd("user", "list", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, "Error running list") + + search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" % + (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT)) + + userlist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=["samaccountname"]) + + self.assertTrue(len(userlist) > 0, "no users found in samdb") + + for userobj in userlist: + name = userobj.get("samaccountname", idx=0) + found = self.assertMatch(out, name, + "user '%s' not found" % name) + def test_getpwent(self): + try: + import pwd + except ImportError: + self.skipTest("Skipping getpwent test, no 'pwd' module available") + return + + # get the current user's data for the test + uid = os.geteuid() + try: + u = pwd.getpwuid(uid) + except KeyError: + self.skipTest("Skipping getpwent test, current EUID not found in NSS") + return + + user = self._randomPosixUser({ + "name": u[0], + "uid": u[0], + "uidNumber": u[2], + "gidNumber": u[3], + "gecos": u[4], + "loginShell": u[6], + }) + # check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid() + (result, out, err) = self.runsubcmd("user", "add", user["name"], user["password"], + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "--rfc2307-from-nss", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertIn("User '%s' created successfully" % user["name"], out) + + self._check_posix_user(user) + self.runsubcmd("user", "delete", user["name"]) + + # Check if overriding the attributes from NSS with explicit values works + # + # get a user with all random posix attributes + user = self._randomPosixUser({"name": u[0]}) + # create a user with posix attributes from nss but override all of them with the + # random ones just obtained + (result, out, err) = self.runsubcmd("user", "add", user["name"], user["password"], + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "--rfc2307-from-nss", + "--gecos=%s" % user["gecos"], + "--login-shell=%s" % user["loginShell"], + "--uid=%s" % user["uid"], + "--uid-number=%s" % user["uidNumber"], + "--gid-number=%s" % user["gidNumber"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertIn("User '%s' created successfully" % user["name"], out) + + self._check_posix_user(user) + self.runsubcmd("user", "delete", user["name"]) + + def _randomUser(self, base={}): + """create a user with random attribute values, you can specify base attributes""" + user = { + "name": self.randomName(), + "password": self.randomPass(), + "surname": self.randomName(), + "given-name": self.randomName(), + "job-title": self.randomName(), + "department": self.randomName(), + "company": self.randomName(), + "description": self.randomName(count=100), + "createUserFn": self._create_user, + "checkUserFn": self._check_user, + } + user.update(base) + return user + + def _randomPosixUser(self, base={}): + """create a user with random attribute values and additional RFC2307 + attributes, you can specify base attributes""" + user = self._randomUser({}) + user.update(base) + posixAttributes = { + "uid": self.randomName(), + "loginShell": self.randomName(), + "gecos": self.randomName(), + "uidNumber": self.randomXid(), + "gidNumber": self.randomXid(), + "createUserFn": self._create_posix_user, + "checkUserFn": self._check_posix_user, + } + user.update(posixAttributes) + user.update(base) + return user + + def _check_user(self, user): + """ check if a user from SamDB has the same attributes as its template """ + found = self._find_user(user["name"]) + + self.assertEquals("%s" % found.get("name"), "%(given-name)s %(surname)s" % user) + self.assertEquals("%s" % found.get("title"), user["job-title"]) + self.assertEquals("%s" % found.get("company"), user["company"]) + self.assertEquals("%s" % found.get("description"), user["description"]) + self.assertEquals("%s" % found.get("department"), user["department"]) + + def _check_posix_user(self, user): + """ check if a posix_user from SamDB has the same attributes as its template """ + found = self._find_user(user["name"]) + + self.assertEquals("%s" % found.get("loginShell"), user["loginShell"]) + self.assertEquals("%s" % found.get("gecos"), user["gecos"]) + self.assertEquals("%s" % found.get("uidNumber"), "%s" % user["uidNumber"]) + self.assertEquals("%s" % found.get("gidNumber"), "%s" % user["gidNumber"]) + self.assertEquals("%s" % found.get("uid"), user["uid"]) + self._check_user(user) + + def _create_user(self, user): + return self.runsubcmd("user", "add", user["name"], user["password"], + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + def _create_posix_user(self, user): + """ create a new user with RFC2307 attributes """ + return self.runsubcmd("user", "create", user["name"], user["password"], + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "--gecos=%s" % user["gecos"], + "--login-shell=%s" % user["loginShell"], + "--uid=%s" % user["uid"], + "--uid-number=%s" % user["uidNumber"], + "--gid-number=%s" % user["gidNumber"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + + def _find_user(self, name): + search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn()) + userlist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, attrs=[]) + if userlist: + return userlist[0] + else: + return None |
