summaryrefslogtreecommitdiffstats
path: root/source4/dsdb
diff options
context:
space:
mode:
authorKamen Mazdrashki <kamenim@samba.org>2015-01-21 00:58:56 +0200
committerAndrew Bartlett <abartlet@samba.org>2015-02-03 05:02:11 +0100
commitea4786875d90d1865c9e45324319865f513d02aa (patch)
tree7a1fc6fc08034936fc1f450e725ae2228897b467 /source4/dsdb
parent599187ead61340d8d3bd3e9db7eab034175bfd7b (diff)
downloadsamba-ea4786875d90d1865c9e45324319865f513d02aa.tar.gz
samba-ea4786875d90d1865c9e45324319865f513d02aa.tar.xz
samba-ea4786875d90d1865c9e45324319865f513d02aa.zip
s4-dsdb-test: Initial implementation for Tombstone restore test suite
Change-Id: Ib35ff930b6e7cee14317328b6fe25b59eec5262c Signed-off-by: Kamen Mazdrashki <kamenim@samba.org> Reviewed-by: Andrew Bartlett <abartlet@samba.org> Reviewed-by: Garming Sam <garming@catalyst.net.nz>
Diffstat (limited to 'source4/dsdb')
-rw-r--r--source4/dsdb/tests/python/tombstone_reanimation.py285
1 files changed, 285 insertions, 0 deletions
diff --git a/source4/dsdb/tests/python/tombstone_reanimation.py b/source4/dsdb/tests/python/tombstone_reanimation.py
new file mode 100644
index 0000000000..21df42d291
--- /dev/null
+++ b/source4/dsdb/tests/python/tombstone_reanimation.py
@@ -0,0 +1,285 @@
+#!/usr/bin/env python
+#
+# Tombstone reanimation tests
+#
+# Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import sys
+import optparse
+import unittest
+
+sys.path.insert(0, "bin/python")
+import samba
+
+import samba.tests
+import samba.getopt as options
+from ldb import (SCOPE_BASE, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message, MessageElement)
+
+
+class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
+ """ verify Samba restores required attributes when
+ user restores a Deleted object
+ """
+
+ def setUp(self):
+ super(RestoredObjectAttributesBaseTestCase, self).setUp()
+ # load LoadParm
+ lp = options.SambaOptions(optparse.OptionParser()).get_loadparm()
+ self.samdb = samba.tests.connect_samdb_env("TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD", lp=lp)
+ self.base_dn = self.samdb.domain_dn()
+ self.schema_dn = self.samdb.get_schema_basedn().get_linearized()
+ # Get the old "dSHeuristics" if it was set
+ self.dsheuristics = self.samdb.get_dsheuristics()
+ # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
+ self.samdb.set_dsheuristics("000000001")
+ # Get the old "minPwdAge"
+ self.minPwdAge = self.samdb.get_minPwdAge()
+ # Set it temporary to "0"
+ self.samdb.set_minPwdAge("0")
+
+ def tearDown(self):
+ super(RestoredObjectAttributesBaseTestCase, self).tearDown()
+ # Reset the "dSHeuristics" as they were before
+ self.samdb.set_dsheuristics(self.dsheuristics)
+ # Reset the "minPwdAge" as it was before
+ self.samdb.set_minPwdAge(self.minPwdAge)
+
+ def GUID_string(self, guid):
+ return self.samdb.schema_format_value("objectGUID", guid)
+
+ def search_guid(self, guid):
+ res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid),
+ scope=SCOPE_BASE, controls=["show_deleted:1"])
+ self.assertEquals(len(res), 1)
+ return res[0]
+
+ def search_dn(self, dn):
+ res = self.samdb.search(expression="(objectClass=*)",
+ base=dn,
+ scope=SCOPE_BASE,
+ controls=["show_recycled:1"])
+ self.assertEquals(len(res), 1)
+ return res[0]
+
+ def _create_object(self, msg):
+ """:param msg: dict with dn and attributes to create an object from"""
+ # delete an object if leftover from previous test
+ samba.tests.delete_force(self.samdb, msg['dn'])
+ self.samdb.add(msg)
+ return self.search_dn(msg['dn'])
+
+ def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest):
+ self.assertSetEqual(attrs_orig, attrs_rest)
+ # remove volatile attributes, they can't be equal
+ attrs_orig -= set(["uSNChanged", "dSCorePropagationData", "whenChanged"])
+ for attr in attrs_orig:
+ # convert original attr value to ldif
+ orig_val = obj_orig.get(attr)
+ if orig_val is None:
+ continue
+ if not isinstance(orig_val, MessageElement):
+ orig_val = MessageElement(str(orig_val), 0, attr )
+ m = Message()
+ m.add(orig_val)
+ orig_ldif = self.samdb.write_ldif(m, 0)
+ # convert restored attr value to ldif
+ rest_val = obj_restored.get(attr)
+ self.assertIsNotNone(rest_val)
+ m = Message()
+ if not isinstance(rest_val, MessageElement):
+ rest_val = MessageElement(str(rest_val), 0, attr)
+ m.add(rest_val)
+ rest_ldif = self.samdb.write_ldif(m, 0)
+ # compare generated ldif's
+ self.assertEqual(orig_ldif.lower(), rest_ldif.lower())
+
+ @staticmethod
+ def restore_deleted_object(samdb, del_dn, new_dn):
+ """Restores a deleted object
+ :param samdb: SamDB connection to SAM
+ :param del_dn: str Deleted object DN
+ :param new_dn: str Where to restore the object
+ """
+ msg = Message()
+ msg.dn = Dn(samdb, str(del_dn))
+ msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
+ msg["distinguishedName"] = MessageElement([str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName")
+ samdb.modify(msg, ["show_deleted:1"])
+
+
+class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
+ """Test cases for delete/reanimate user objects"""
+
+ def test_restore_user(self):
+ print "Test restored user attributes"
+ username = "restore_user"
+ usr_dn = "cn=%s,cn=users,%s" % (username, self.base_dn)
+ samba.tests.delete_force(self.samdb, usr_dn)
+ self.samdb.add({
+ "dn": usr_dn,
+ "objectClass": "user",
+ "sAMAccountName": username})
+ obj = self.search_dn(usr_dn)
+ guid = obj["objectGUID"][0]
+ self.samdb.delete(usr_dn)
+ obj_del = self.search_guid(guid)
+ # restore the user and fetch what's restored
+ self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn)
+ obj_restore = self.search_guid(guid)
+ # check original attributes and restored one are same
+ orig_attrs = set(obj.keys())
+ # windows restore more attributes that originally we have
+ orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
+ rest_attrs = set(obj_restore.keys())
+ self.assertSetEqual(orig_attrs, rest_attrs)
+
+
+class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):
+ """Test different scenarios for delete/reanimate group objects"""
+
+ def _make_object_dn(self, name):
+ return "cn=%s,cn=users,%s" % (name, self.base_dn)
+
+ def _create_test_user(self, user_name):
+ user_dn = self._make_object_dn(user_name)
+ ldif = {
+ "dn": user_dn,
+ "objectClass": "user",
+ "sAMAccountName": user_name,
+ }
+ # delete an object if leftover from previous test
+ samba.tests.delete_force(self.samdb, user_dn)
+ # finally, create the group
+ self.samdb.add(ldif)
+ return self.search_dn(user_dn)
+
+ def _create_test_group(self, group_name, members=None):
+ group_dn = self._make_object_dn(group_name)
+ ldif = {
+ "dn": group_dn,
+ "objectClass": "group",
+ "sAMAccountName": group_name,
+ }
+ try:
+ ldif["member"] = [str(usr_dn) for usr_dn in members]
+ except TypeError:
+ pass
+ # delete an object if leftover from previous test
+ samba.tests.delete_force(self.samdb, group_dn)
+ # finally, create the group
+ self.samdb.add(ldif)
+ return self.search_dn(group_dn)
+
+ def test_plain_group(self):
+ print "Test restored Group attributes"
+ # create test group
+ obj = self._create_test_group("r_group")
+ guid = obj["objectGUID"][0]
+ # delete the group
+ self.samdb.delete(str(obj.dn))
+ obj_del = self.search_guid(guid)
+ # restore the Group and fetch what's restored
+ self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
+ obj_restore = self.search_guid(guid)
+ # check original attributes and restored one are same
+ attr_orig = set(obj.keys())
+ # windows restore more attributes that originally we have
+ attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
+ attr_rest = set(obj_restore.keys())
+ self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
+
+ def test_group_with_members(self):
+ print "Test restored Group with members attributes"
+ # create test group
+ usr1 = self._create_test_user("r_user_1")
+ usr2 = self._create_test_user("r_user_2")
+ obj = self._create_test_group("r_group", [usr1.dn, usr2.dn])
+ guid = obj["objectGUID"][0]
+ # delete the group
+ self.samdb.delete(str(obj.dn))
+ obj_del = self.search_guid(guid)
+ # restore the Group and fetch what's restored
+ self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
+ obj_restore = self.search_guid(guid)
+ # check original attributes and restored one are same
+ attr_orig = set(obj.keys())
+ # windows restore more attributes that originally we have
+ attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
+ # and does not restore following attributes
+ attr_orig.remove("member")
+ attr_rest = set(obj_restore.keys())
+ self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
+
+
+class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase):
+ """Test different scenarios for delete/reanimate OU/container objects"""
+
+ def _create_test_ou(self, rdn, name=None, description=None):
+ ou_dn = "OU=%s,%s" % (rdn, self.base_dn)
+ # delete an object if leftover from previous test
+ samba.tests.delete_force(self.samdb, ou_dn)
+ # create ou and return created object
+ self.samdb.create_ou(ou_dn, name=name, description=description)
+ return self.search_dn(ou_dn)
+
+ def test_ou_with_name_description(self):
+ print "Test OU reanimation"
+ # create OU to test with
+ obj = self._create_test_ou(rdn="r_ou",
+ name="r_ou name",
+ description="r_ou description")
+ guid = obj["objectGUID"][0]
+ # delete the object
+ self.samdb.delete(str(obj.dn))
+ obj_del = self.search_guid(guid)
+ # restore the Object and fetch what's restored
+ self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
+ obj_restore = self.search_guid(guid)
+ # check original attributes and restored one are same
+ attr_orig = set(obj.keys())
+ attr_rest = set(obj_restore.keys())
+ # windows restore more attributes that originally we have
+ attr_orig.update(["lastKnownParent"])
+ # and does not restore following attributes
+ attr_orig -= {"description"}
+ self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
+
+ def test_container(self):
+ print "Test Container reanimation"
+ # create test Container
+ obj = self._create_object({
+ "dn": "CN=r_container,CN=Users,%s" % self.base_dn,
+ "objectClass": "container"
+ })
+ guid = obj["objectGUID"][0]
+ # delete the object
+ self.samdb.delete(str(obj.dn))
+ obj_del = self.search_guid(guid)
+ # restore the Object and fetch what's restored
+ self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
+ obj_restore = self.search_guid(guid)
+ # check original attributes and restored one are same
+ attr_orig = set(obj.keys())
+ attr_rest = set(obj_restore.keys())
+ # windows restore more attributes that originally we have
+ attr_orig.update(["lastKnownParent"])
+ # and does not restore following attributes
+ attr_orig -= {"showInAdvancedViewOnly"}
+ self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
+
+
+if __name__ == '__main__':
+ unittest.main()