diff options
author | Kamen Mazdrashki <kamenim@samba.org> | 2015-01-21 00:58:56 +0200 |
---|---|---|
committer | Andrew Bartlett <abartlet@samba.org> | 2015-02-03 05:02:11 +0100 |
commit | ea4786875d90d1865c9e45324319865f513d02aa (patch) | |
tree | 7a1fc6fc08034936fc1f450e725ae2228897b467 /source4/dsdb | |
parent | 599187ead61340d8d3bd3e9db7eab034175bfd7b (diff) | |
download | samba-ea4786875d90d1865c9e45324319865f513d02aa.tar.gz samba-ea4786875d90d1865c9e45324319865f513d02aa.tar.xz samba-ea4786875d90d1865c9e45324319865f513d02aa.zip |
s4-dsdb-test: Initial implementation for Tombstone restore test suite
Change-Id: Ib35ff930b6e7cee14317328b6fe25b59eec5262c
Signed-off-by: Kamen Mazdrashki <kamenim@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
Diffstat (limited to 'source4/dsdb')
-rw-r--r-- | source4/dsdb/tests/python/tombstone_reanimation.py | 285 |
1 files changed, 285 insertions, 0 deletions
diff --git a/source4/dsdb/tests/python/tombstone_reanimation.py b/source4/dsdb/tests/python/tombstone_reanimation.py new file mode 100644 index 0000000000..21df42d291 --- /dev/null +++ b/source4/dsdb/tests/python/tombstone_reanimation.py @@ -0,0 +1,285 @@ +#!/usr/bin/env python +# +# Tombstone reanimation tests +# +# Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import sys +import optparse +import unittest + +sys.path.insert(0, "bin/python") +import samba + +import samba.tests +import samba.getopt as options +from ldb import (SCOPE_BASE, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message, MessageElement) + + +class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase): + """ verify Samba restores required attributes when + user restores a Deleted object + """ + + def setUp(self): + super(RestoredObjectAttributesBaseTestCase, self).setUp() + # load LoadParm + lp = options.SambaOptions(optparse.OptionParser()).get_loadparm() + self.samdb = samba.tests.connect_samdb_env("TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD", lp=lp) + self.base_dn = self.samdb.domain_dn() + self.schema_dn = self.samdb.get_schema_basedn().get_linearized() + # Get the old "dSHeuristics" if it was set + self.dsheuristics = self.samdb.get_dsheuristics() + # Set the "dSHeuristics" to activate the correct "userPassword" behaviour + self.samdb.set_dsheuristics("000000001") + # Get the old "minPwdAge" + self.minPwdAge = self.samdb.get_minPwdAge() + # Set it temporary to "0" + self.samdb.set_minPwdAge("0") + + def tearDown(self): + super(RestoredObjectAttributesBaseTestCase, self).tearDown() + # Reset the "dSHeuristics" as they were before + self.samdb.set_dsheuristics(self.dsheuristics) + # Reset the "minPwdAge" as it was before + self.samdb.set_minPwdAge(self.minPwdAge) + + def GUID_string(self, guid): + return self.samdb.schema_format_value("objectGUID", guid) + + def search_guid(self, guid): + res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid), + scope=SCOPE_BASE, controls=["show_deleted:1"]) + self.assertEquals(len(res), 1) + return res[0] + + def search_dn(self, dn): + res = self.samdb.search(expression="(objectClass=*)", + base=dn, + scope=SCOPE_BASE, + controls=["show_recycled:1"]) + self.assertEquals(len(res), 1) + return res[0] + + def _create_object(self, msg): + """:param msg: dict with dn and attributes to create an object from""" + # delete an object if leftover from previous test + samba.tests.delete_force(self.samdb, msg['dn']) + self.samdb.add(msg) + return self.search_dn(msg['dn']) + + def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest): + self.assertSetEqual(attrs_orig, attrs_rest) + # remove volatile attributes, they can't be equal + attrs_orig -= set(["uSNChanged", "dSCorePropagationData", "whenChanged"]) + for attr in attrs_orig: + # convert original attr value to ldif + orig_val = obj_orig.get(attr) + if orig_val is None: + continue + if not isinstance(orig_val, MessageElement): + orig_val = MessageElement(str(orig_val), 0, attr ) + m = Message() + m.add(orig_val) + orig_ldif = self.samdb.write_ldif(m, 0) + # convert restored attr value to ldif + rest_val = obj_restored.get(attr) + self.assertIsNotNone(rest_val) + m = Message() + if not isinstance(rest_val, MessageElement): + rest_val = MessageElement(str(rest_val), 0, attr) + m.add(rest_val) + rest_ldif = self.samdb.write_ldif(m, 0) + # compare generated ldif's + self.assertEqual(orig_ldif.lower(), rest_ldif.lower()) + + @staticmethod + def restore_deleted_object(samdb, del_dn, new_dn): + """Restores a deleted object + :param samdb: SamDB connection to SAM + :param del_dn: str Deleted object DN + :param new_dn: str Where to restore the object + """ + msg = Message() + msg.dn = Dn(samdb, str(del_dn)) + msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted") + msg["distinguishedName"] = MessageElement([str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName") + samdb.modify(msg, ["show_deleted:1"]) + + +class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase): + """Test cases for delete/reanimate user objects""" + + def test_restore_user(self): + print "Test restored user attributes" + username = "restore_user" + usr_dn = "cn=%s,cn=users,%s" % (username, self.base_dn) + samba.tests.delete_force(self.samdb, usr_dn) + self.samdb.add({ + "dn": usr_dn, + "objectClass": "user", + "sAMAccountName": username}) + obj = self.search_dn(usr_dn) + guid = obj["objectGUID"][0] + self.samdb.delete(usr_dn) + obj_del = self.search_guid(guid) + # restore the user and fetch what's restored + self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn) + obj_restore = self.search_guid(guid) + # check original attributes and restored one are same + orig_attrs = set(obj.keys()) + # windows restore more attributes that originally we have + orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent']) + rest_attrs = set(obj_restore.keys()) + self.assertSetEqual(orig_attrs, rest_attrs) + + +class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase): + """Test different scenarios for delete/reanimate group objects""" + + def _make_object_dn(self, name): + return "cn=%s,cn=users,%s" % (name, self.base_dn) + + def _create_test_user(self, user_name): + user_dn = self._make_object_dn(user_name) + ldif = { + "dn": user_dn, + "objectClass": "user", + "sAMAccountName": user_name, + } + # delete an object if leftover from previous test + samba.tests.delete_force(self.samdb, user_dn) + # finally, create the group + self.samdb.add(ldif) + return self.search_dn(user_dn) + + def _create_test_group(self, group_name, members=None): + group_dn = self._make_object_dn(group_name) + ldif = { + "dn": group_dn, + "objectClass": "group", + "sAMAccountName": group_name, + } + try: + ldif["member"] = [str(usr_dn) for usr_dn in members] + except TypeError: + pass + # delete an object if leftover from previous test + samba.tests.delete_force(self.samdb, group_dn) + # finally, create the group + self.samdb.add(ldif) + return self.search_dn(group_dn) + + def test_plain_group(self): + print "Test restored Group attributes" + # create test group + obj = self._create_test_group("r_group") + guid = obj["objectGUID"][0] + # delete the group + self.samdb.delete(str(obj.dn)) + obj_del = self.search_guid(guid) + # restore the Group and fetch what's restored + self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn) + obj_restore = self.search_guid(guid) + # check original attributes and restored one are same + attr_orig = set(obj.keys()) + # windows restore more attributes that originally we have + attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent']) + attr_rest = set(obj_restore.keys()) + self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest) + + def test_group_with_members(self): + print "Test restored Group with members attributes" + # create test group + usr1 = self._create_test_user("r_user_1") + usr2 = self._create_test_user("r_user_2") + obj = self._create_test_group("r_group", [usr1.dn, usr2.dn]) + guid = obj["objectGUID"][0] + # delete the group + self.samdb.delete(str(obj.dn)) + obj_del = self.search_guid(guid) + # restore the Group and fetch what's restored + self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn) + obj_restore = self.search_guid(guid) + # check original attributes and restored one are same + attr_orig = set(obj.keys()) + # windows restore more attributes that originally we have + attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent']) + # and does not restore following attributes + attr_orig.remove("member") + attr_rest = set(obj_restore.keys()) + self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest) + + +class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase): + """Test different scenarios for delete/reanimate OU/container objects""" + + def _create_test_ou(self, rdn, name=None, description=None): + ou_dn = "OU=%s,%s" % (rdn, self.base_dn) + # delete an object if leftover from previous test + samba.tests.delete_force(self.samdb, ou_dn) + # create ou and return created object + self.samdb.create_ou(ou_dn, name=name, description=description) + return self.search_dn(ou_dn) + + def test_ou_with_name_description(self): + print "Test OU reanimation" + # create OU to test with + obj = self._create_test_ou(rdn="r_ou", + name="r_ou name", + description="r_ou description") + guid = obj["objectGUID"][0] + # delete the object + self.samdb.delete(str(obj.dn)) + obj_del = self.search_guid(guid) + # restore the Object and fetch what's restored + self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn) + obj_restore = self.search_guid(guid) + # check original attributes and restored one are same + attr_orig = set(obj.keys()) + attr_rest = set(obj_restore.keys()) + # windows restore more attributes that originally we have + attr_orig.update(["lastKnownParent"]) + # and does not restore following attributes + attr_orig -= {"description"} + self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest) + + def test_container(self): + print "Test Container reanimation" + # create test Container + obj = self._create_object({ + "dn": "CN=r_container,CN=Users,%s" % self.base_dn, + "objectClass": "container" + }) + guid = obj["objectGUID"][0] + # delete the object + self.samdb.delete(str(obj.dn)) + obj_del = self.search_guid(guid) + # restore the Object and fetch what's restored + self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn) + obj_restore = self.search_guid(guid) + # check original attributes and restored one are same + attr_orig = set(obj.keys()) + attr_rest = set(obj_restore.keys()) + # windows restore more attributes that originally we have + attr_orig.update(["lastKnownParent"]) + # and does not restore following attributes + attr_orig -= {"showInAdvancedViewOnly"} + self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest) + + +if __name__ == '__main__': + unittest.main() |