From 6a0856da9cc075acaa7fcb6bad614f8f403df9c7 Mon Sep 17 00:00:00 2001 From: Matthieu Patou Date: Wed, 16 Jun 2010 18:47:18 +0400 Subject: s4 dsdb: Use the changereplmetadata control This control allow to specify the replPropertyMetaData attribute to be specified on modify request. It can be used for very specific needs to tweak the content of the replication data. Signed-off-by: Andrew Bartlett --- source4/scripting/python/samba/tests/dsdb.py | 89 ++++++++++++++++++++++++---- 1 file changed, 78 insertions(+), 11 deletions(-) (limited to 'source4/scripting/python/samba/tests') diff --git a/source4/scripting/python/samba/tests/dsdb.py b/source4/scripting/python/samba/tests/dsdb.py index c19dbaab47..d28be82984 100644 --- a/source4/scripting/python/samba/tests/dsdb.py +++ b/source4/scripting/python/samba/tests/dsdb.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Unix SMB/CIFS implementation. Tests for dsdb +# Unix SMB/CIFS implementation. Tests for dsdb # Copyright (C) Matthieu Patou 2010 # # This program is free software; you can redistribute it and/or modify @@ -17,26 +17,93 @@ # along with this program. If not, see . # -import samba.dsdb from samba.credentials import Credentials from samba.samdb import SamDB from samba.auth import system_session from testtools.testcase import TestCase +from samba.ndr import ndr_unpack, ndr_pack +from samba.dcerpc import drsblobs +import ldb import os +import samba.dsdb class DsdbTests(TestCase): - def _baseprovpath(self): + + def setUp(self): + super(DsdbTests, self).setUp() + self.lp = samba.param.LoadParm() + self.lp.load(os.path.join(os.path.join(self.baseprovpath(), "etc"), "smb.conf")) + self.creds = Credentials() + self.creds.guess(self.lp) + self.session = system_session() + self.samdb = SamDB(os.path.join(self.baseprovpath(), "private", "sam.ldb"), + session_info=self.session, credentials=self.creds,lp=self.lp) + + + def baseprovpath(self): return os.path.join(os.environ['SELFTEST_PREFIX'], "dc") + def test_get_oid_from_attrid(self): - lp = samba.param.LoadParm() - lp.load(os.path.join(os.path.join(self._baseprovpath(), "etc"), "smb.conf")) - creds = Credentials() - creds.guess(lp) - session = system_session() - test_ldb = SamDB(os.path.join(self._baseprovpath(), "private", "sam.ldb"), - session_info=session, credentials=creds,lp=lp) - oid = test_ldb.get_oid_from_attid(591614) + oid = self.samdb.get_oid_from_attid(591614) self.assertEquals(oid, "1.2.840.113556.1.4.1790") + + def test_error_replpropertymetadata(self): + res = self.samdb.search(expression="cn=Administrator", + scope=ldb.SCOPE_SUBTREE, + attrs=["replPropertyMetaData"]) + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + str(res[0]["replPropertyMetaData"])) + ctr = repl.ctr + for o in ctr.array: + # Search for Description + if o.attid == 13: + old_version = o.version + o.version = o.version + 1 + replBlob = ndr_pack(repl) + msg = ldb.Message() + msg.dn = res[0].dn + msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") + self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) + + def test_twoatt_replpropertymetadata(self): + res = self.samdb.search(expression="cn=Administrator", + scope=ldb.SCOPE_SUBTREE, + attrs=["replPropertyMetaData", "uSNChanged"]) + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + str(res[0]["replPropertyMetaData"])) + ctr = repl.ctr + for o in ctr.array: + # Search for Description + if o.attid == 13: + old_version = o.version + o.version = o.version + 1 + o.local_usn = long(str(res[0]["uSNChanged"])) + 1 + replBlob = ndr_pack(repl) + msg = ldb.Message() + msg.dn = res[0].dn + msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") + msg["description"] = ldb.MessageElement("new val", ldb.FLAG_MOD_REPLACE, "description") + self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) + + def test_set_replpropertymetadata(self): + res = self.samdb.search(expression="cn=Administrator", + scope=ldb.SCOPE_SUBTREE, + attrs=["replPropertyMetaData", "uSNChanged"]) + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + str(res[0]["replPropertyMetaData"])) + ctr = repl.ctr + for o in ctr.array: + # Search for Description + if o.attid == 13: + old_version = o.version + o.version = o.version + 1 + o.local_usn = long(str(res[0]["uSNChanged"])) + 1 + o.originating_usn = long(str(res[0]["uSNChanged"])) + 1 + replBlob = ndr_pack(repl) + msg = ldb.Message() + msg.dn = res[0].dn + msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") + self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) -- cgit From f97c90c9cd124314b4a0862e702dd021bd2df9a0 Mon Sep 17 00:00:00 2001 From: Matthieu Patou Date: Tue, 22 Jun 2010 20:03:15 +0400 Subject: s4 python: Add functions to samdb to manipulate version of replPropertyMetaData attribute This change contains also helpers for attribute id to attribute oid conversion and from attribute id to attribute name. It brings also unit tests Signed-off-by: Andrew Bartlett --- source4/scripting/python/samba/tests/dsdb.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) (limited to 'source4/scripting/python/samba/tests') diff --git a/source4/scripting/python/samba/tests/dsdb.py b/source4/scripting/python/samba/tests/dsdb.py index d28be82984..4a50c96e25 100644 --- a/source4/scripting/python/samba/tests/dsdb.py +++ b/source4/scripting/python/samba/tests/dsdb.py @@ -25,7 +25,7 @@ from samba.ndr import ndr_unpack, ndr_pack from samba.dcerpc import drsblobs import ldb import os -import samba.dsdb +import samba class DsdbTests(TestCase): @@ -107,3 +107,27 @@ class DsdbTests(TestCase): msg.dn = res[0].dn msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) + + def test_ok_get_attribute_from_attid(self): + self.assertEquals(self.samdb.get_attribute_from_attid(13), "description") + + def test_ko_get_attribute_from_attid(self): + self.assertEquals(self.samdb.get_attribute_from_attid(11979), None) + + def test_get_attribute_replmetadata_version(self): + res = self.samdb.search(expression="cn=Administrator", + scope=ldb.SCOPE_SUBTREE, + attrs=["dn"]) + self.assertEquals(len(res), 1) + dn = str(res[0].dn) + self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 1) + + def test_set_attribute_replmetadata_version(self): + res = self.samdb.search(expression="cn=Administrator", + scope=ldb.SCOPE_SUBTREE, + attrs=["dn"]) + self.assertEquals(len(res), 1) + dn = str(res[0].dn) + version = self.samdb.get_attribute_replmetadata_version(dn, "description") + self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2) + self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2) -- cgit From 62a32975c86e71eb1c5efeec0a4dee9d1ac20c10 Mon Sep 17 00:00:00 2001 From: Matthieu Patou Date: Tue, 15 Jun 2010 12:54:05 +0400 Subject: s4: Add unit test for increment_calculated_keyversion_number Signed-off-by: Andrew Bartlett --- .../python/samba/tests/upgradeprovisionneeddc.py | 26 +++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) (limited to 'source4/scripting/python/samba/tests') diff --git a/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py b/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py index b32ffc6155..e30906fc6b 100644 --- a/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py +++ b/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py @@ -29,7 +29,8 @@ from samba.upgradehelpers import (get_paths, get_ldbs, find_provision_key_parameters, identic_rename, updateOEMInfo, getOEMInfo, update_gpo, delta_update_basesamdb, - search_constructed_attrs_stored) + search_constructed_attrs_stored, + increment_calculated_keyversion_number) from samba.tests import env_loadparm, TestCaseInTempDir from samba.tests.provision import create_dummy_secretsdb import ldb @@ -91,6 +92,29 @@ class UpgradeProvisionWithLdbTestCase(TestCaseInTempDir): ["msds-KeyVersionNumber"]) self.assertFalse(hashAtt.has_key("msds-KeyVersionNumber")) + def test_increment_calculated_keyversion_number(self): + dn = "CN=Administrator,CN=Users,%s" % self.names.rootdn + # We conctruct a simple hash for the user administrator + hash = {} + # And we want the version to be 140 + hash[dn.lower()] = 140 + + increment_calculated_keyversion_number(self.ldbs.sam, + self.names.rootdn, + hash) + self.assertEqual(self.ldbs.sam.get_attribute_replmetadata_version(dn, + "unicodePwd"), + 140) + # This function should not decrement the version + hash[dn.lower()] = 130 + + increment_calculated_keyversion_number(self.ldbs.sam, + self.names.rootdn, + hash) + self.assertEqual(self.ldbs.sam.get_attribute_replmetadata_version(dn, + "unicodePwd"), + 140) + def test_identic_rename(self): rootdn = "DC=samba,DC=example,DC=com" -- cgit