summaryrefslogtreecommitdiffstats
path: root/python/samba/tests/dcerpc
diff options
context:
space:
mode:
authorJelmer Vernooij <jelmer@samba.org>2012-12-28 15:37:14 +0100
committerAndrew Bartlett <abartlet@samba.org>2013-03-02 03:57:34 +0100
commit87afc3aee1ea593069322a49355dd8780d99e123 (patch)
tree8e1ea6678d93b53f21b34c4940b7d5a64e0f5020 /python/samba/tests/dcerpc
parent80fce353e740c793619005ac102ab07fb5e7d280 (diff)
downloadsamba-87afc3aee1ea593069322a49355dd8780d99e123.tar.gz
samba-87afc3aee1ea593069322a49355dd8780d99e123.tar.xz
samba-87afc3aee1ea593069322a49355dd8780d99e123.zip
Move python modules from source4/scripting/python/ to python/.
Reviewed-by: Andrew Bartlett <abartlet@samba.org> Autobuild-User(master): Andrew Bartlett <abartlet@samba.org> Autobuild-Date(master): Sat Mar 2 03:57:34 CET 2013 on sn-devel-104
Diffstat (limited to 'python/samba/tests/dcerpc')
-rw-r--r--python/samba/tests/dcerpc/__init__.py20
-rw-r--r--python/samba/tests/dcerpc/bare.py51
-rw-r--r--python/samba/tests/dcerpc/dnsserver.py241
-rw-r--r--python/samba/tests/dcerpc/misc.py62
-rw-r--r--python/samba/tests/dcerpc/registry.py51
-rw-r--r--python/samba/tests/dcerpc/rpc_talloc.py84
-rw-r--r--python/samba/tests/dcerpc/rpcecho.py71
-rw-r--r--python/samba/tests/dcerpc/sam.py50
-rw-r--r--python/samba/tests/dcerpc/srvsvc.py68
-rw-r--r--python/samba/tests/dcerpc/testrpc.py141
-rw-r--r--python/samba/tests/dcerpc/unix.py49
11 files changed, 888 insertions, 0 deletions
diff --git a/python/samba/tests/dcerpc/__init__.py b/python/samba/tests/dcerpc/__init__.py
new file mode 100644
index 0000000000..d84cb57a09
--- /dev/null
+++ b/python/samba/tests/dcerpc/__init__.py
@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+#
+# Unix SMB/CIFS implementation.
+# Copyright © Jelmer Vernooij <jelmer@samba.org> 2008
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""Tests for the DCE/RPC Python bindings."""
+
diff --git a/python/samba/tests/dcerpc/bare.py b/python/samba/tests/dcerpc/bare.py
new file mode 100644
index 0000000000..3efbf9d4cf
--- /dev/null
+++ b/python/samba/tests/dcerpc/bare.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+#
+# Unix SMB/CIFS implementation.
+# Copyright © Jelmer Vernooij <jelmer@samba.org> 2008
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for samba.tests.dcerpc.bare."""
+
+from samba.dcerpc import ClientConnection
+import samba.tests
+
+class BareTestCase(samba.tests.TestCase):
+
+ def test_bare(self):
+ # Connect to the echo pipe
+ x = ClientConnection("ncalrpc:localhost[DEFAULT]",
+ ("60a15ec5-4de8-11d7-a637-005056a20182", 1),
+ lp_ctx=samba.tests.env_loadparm())
+ self.assertEquals("\x01\x00\x00\x00", x.request(0, chr(0) * 4))
+
+ def test_alter_context(self):
+ x = ClientConnection("ncalrpc:localhost[DEFAULT]",
+ ("12345778-1234-abcd-ef00-0123456789ac", 1),
+ lp_ctx=samba.tests.env_loadparm())
+ y = ClientConnection("ncalrpc:localhost",
+ ("60a15ec5-4de8-11d7-a637-005056a20182", 1),
+ basis_connection=x, lp_ctx=samba.tests.env_loadparm())
+ x.alter_context(("60a15ec5-4de8-11d7-a637-005056a20182", 1))
+ # FIXME: self.assertEquals("\x01\x00\x00\x00", x.request(0, chr(0) * 4))
+
+ def test_two_connections(self):
+ x = ClientConnection("ncalrpc:localhost[DEFAULT]",
+ ("60a15ec5-4de8-11d7-a637-005056a20182", 1),
+ lp_ctx=samba.tests.env_loadparm())
+ y = ClientConnection("ncalrpc:localhost",
+ ("60a15ec5-4de8-11d7-a637-005056a20182", 1),
+ basis_connection=x, lp_ctx=samba.tests.env_loadparm())
+ self.assertEquals("\x01\x00\x00\x00", y.request(0, chr(0) * 4))
diff --git a/python/samba/tests/dcerpc/dnsserver.py b/python/samba/tests/dcerpc/dnsserver.py
new file mode 100644
index 0000000000..59d6eee761
--- /dev/null
+++ b/python/samba/tests/dcerpc/dnsserver.py
@@ -0,0 +1,241 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Amitay Isaacs <amitay@gmail.com> 2011
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for samba.dcerpc.dnsserver"""
+
+from samba.dcerpc import dnsp, dnsserver
+from samba.tests import RpcInterfaceTestCase, env_get_var_value
+from samba.netcmd.dns import ARecord
+
+class DnsserverTests(RpcInterfaceTestCase):
+
+ def setUp(self):
+ super(DnsserverTests, self).setUp()
+ self.server = env_get_var_value("SERVER_IP")
+ self.zone = env_get_var_value("REALM").lower()
+ self.conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server),
+ self.get_loadparm(),
+ self.get_credentials())
+
+ def test_operation2(self):
+ pass
+
+
+ def test_query2(self):
+ typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_W2K,
+ 0,
+ self.server,
+ None,
+ 'ServerInfo')
+ self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO_W2K, typeid)
+
+ typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_DOTNET,
+ 0,
+ self.server,
+ None,
+ 'ServerInfo')
+ self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO_DOTNET, typeid)
+
+ typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0,
+ self.server,
+ None,
+ 'ServerInfo')
+ self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO, typeid)
+
+ def test_operation2(self):
+ client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
+ rev_zone = '1.168.192.in-addr.arpa'
+
+ zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
+ zone_create.pszZoneName = rev_zone
+ zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
+ zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
+ zone_create.fAging = 0
+ zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
+
+ # Create zone
+ self.conn.DnssrvOperation2(client_version,
+ 0,
+ self.server,
+ None,
+ 0,
+ 'ZoneCreate',
+ dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
+ zone_create)
+
+ request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
+ dnsserver.DNS_ZONE_REQUEST_PRIMARY)
+ typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
+ 0,
+ self.server,
+ None,
+ 'EnumZones',
+ dnsserver.DNSSRV_TYPEID_DWORD,
+ request_filter)
+ self.assertEquals(1, zones.dwZoneCount)
+
+ # Delete zone
+ self.conn.DnssrvOperation2(client_version,
+ 0,
+ self.server,
+ rev_zone,
+ 0,
+ 'DeleteZoneFromDs',
+ dnsserver.DNSSRV_TYPEID_NULL,
+ None)
+
+ typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
+ 0,
+ self.server,
+ None,
+ 'EnumZones',
+ dnsserver.DNSSRV_TYPEID_DWORD,
+ request_filter)
+ self.assertEquals(0, zones.dwZoneCount)
+
+
+ def test_complexoperation2(self):
+ client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
+ request_filter = (dnsserver.DNS_ZONE_REQUEST_FORWARD |
+ dnsserver.DNS_ZONE_REQUEST_PRIMARY)
+ typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
+ 0,
+ self.server,
+ None,
+ 'EnumZones',
+ dnsserver.DNSSRV_TYPEID_DWORD,
+ request_filter)
+ self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
+ self.assertEquals(2, zones.dwZoneCount)
+
+ request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
+ dnsserver.DNS_ZONE_REQUEST_PRIMARY)
+ typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
+ 0,
+ self.server,
+ None,
+ 'EnumZones',
+ dnsserver.DNSSRV_TYPEID_DWORD,
+ request_filter)
+ self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
+ self.assertEquals(0, zones.dwZoneCount)
+
+
+ def test_enumrecords2(self):
+ client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
+ record_type = dnsp.DNS_TYPE_NS
+ select_flags = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA |
+ dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA)
+ buflen, roothints = self.conn.DnssrvEnumRecords2(client_version,
+ 0,
+ self.server,
+ '..RootHints',
+ '.',
+ None,
+ record_type,
+ select_flags,
+ None,
+ None)
+ self.assertEquals(14, roothints.count) # 1 NS + 13 A records (a-m)
+
+
+ def test_updaterecords2(self):
+ client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
+ record_type = dnsp.DNS_TYPE_A
+ select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA
+
+ name = 'dummy'
+ rec = ARecord('1.2.3.4')
+ rec2 = ARecord('5.6.7.8')
+
+ # Add record
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+ self.conn.DnssrvUpdateRecord2(client_version,
+ 0,
+ self.server,
+ self.zone,
+ name,
+ add_rec_buf,
+ None)
+
+ buflen, result = self.conn.DnssrvEnumRecords2(client_version,
+ 0,
+ self.server,
+ self.zone,
+ name,
+ None,
+ record_type,
+ select_flags,
+ None,
+ None)
+ self.assertEquals(1, result.count)
+ self.assertEquals(1, result.rec[0].wRecordCount)
+ self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
+ self.assertEquals('1.2.3.4', result.rec[0].records[0].data)
+
+ # Update record
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec2
+ del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ del_rec_buf.rec = rec
+ self.conn.DnssrvUpdateRecord2(client_version,
+ 0,
+ self.server,
+ self.zone,
+ name,
+ add_rec_buf,
+ del_rec_buf)
+
+ buflen, result = self.conn.DnssrvEnumRecords2(client_version,
+ 0,
+ self.server,
+ self.zone,
+ name,
+ None,
+ record_type,
+ select_flags,
+ None,
+ None)
+ self.assertEquals(1, result.count)
+ self.assertEquals(1, result.rec[0].wRecordCount)
+ self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
+ self.assertEquals('5.6.7.8', result.rec[0].records[0].data)
+
+ # Delete record
+ del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ del_rec_buf.rec = rec2
+ self.conn.DnssrvUpdateRecord2(client_version,
+ 0,
+ self.server,
+ self.zone,
+ name,
+ None,
+ del_rec_buf)
+
+ self.assertRaises(RuntimeError, self.conn.DnssrvEnumRecords2,
+ client_version,
+ 0,
+ self.server,
+ self.zone,
+ name,
+ None,
+ record_type,
+ select_flags,
+ None,
+ None)
diff --git a/python/samba/tests/dcerpc/misc.py b/python/samba/tests/dcerpc/misc.py
new file mode 100644
index 0000000000..11e14aadfa
--- /dev/null
+++ b/python/samba/tests/dcerpc/misc.py
@@ -0,0 +1,62 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for samba.dcerpc.misc."""
+
+from samba.dcerpc import misc
+import samba.tests
+
+text1 = "76f53846-a7c2-476a-ae2c-20e2b80d7b34"
+text2 = "344edffa-330a-4b39-b96e-2c34da52e8b1"
+
+class GUIDTests(samba.tests.TestCase):
+
+ def test_str(self):
+ guid = misc.GUID(text1)
+ self.assertEquals(text1, str(guid))
+
+ def test_repr(self):
+ guid = misc.GUID(text1)
+ self.assertEquals("GUID('%s')" % text1, repr(guid))
+
+ def test_compare_different(self):
+ guid1 = misc.GUID(text1)
+ guid2 = misc.GUID(text2)
+ self.assertTrue(cmp(guid1, guid2) > 0)
+
+ def test_compare_same(self):
+ guid1 = misc.GUID(text1)
+ guid2 = misc.GUID(text1)
+ self.assertEquals(0, cmp(guid1, guid2))
+ self.assertEquals(guid1, guid2)
+
+
+class PolicyHandleTests(samba.tests.TestCase):
+
+ def test_init(self):
+ x = misc.policy_handle(text1, 1)
+ self.assertEquals(1, x.handle_type)
+ self.assertEquals(text1, str(x.uuid))
+
+ def test_repr(self):
+ x = misc.policy_handle(text1, 42)
+ self.assertEquals("policy_handle(%d, '%s')" % (42, text1), repr(x))
+
+ def test_str(self):
+ x = misc.policy_handle(text1, 42)
+ self.assertEquals("%d, %s" % (42, text1), str(x))
+
diff --git a/python/samba/tests/dcerpc/registry.py b/python/samba/tests/dcerpc/registry.py
new file mode 100644
index 0000000000..c7bcbfd530
--- /dev/null
+++ b/python/samba/tests/dcerpc/registry.py
@@ -0,0 +1,51 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for samba.dcerpc.registry."""
+
+from samba.dcerpc import winreg
+from samba.tests import RpcInterfaceTestCase
+
+
+class WinregTests(RpcInterfaceTestCase):
+
+ def setUp(self):
+ super(WinregTests, self).setUp()
+ self.conn = winreg.winreg("ncalrpc:", self.get_loadparm(),
+ self.get_credentials())
+
+ def get_hklm(self):
+ return self.conn.OpenHKLM(None,
+ winreg.KEY_QUERY_VALUE | winreg.KEY_ENUMERATE_SUB_KEYS)
+
+ def test_hklm(self):
+ handle = self.conn.OpenHKLM(None,
+ winreg.KEY_QUERY_VALUE | winreg.KEY_ENUMERATE_SUB_KEYS)
+ self.conn.CloseKey(handle)
+
+ def test_getversion(self):
+ handle = self.get_hklm()
+ version = self.conn.GetVersion(handle)
+ self.assertEquals(int, version.__class__)
+ self.conn.CloseKey(handle)
+
+ def test_getkeyinfo(self):
+ handle = self.conn.OpenHKLM(None,
+ winreg.KEY_QUERY_VALUE | winreg.KEY_ENUMERATE_SUB_KEYS)
+ x = self.conn.QueryInfoKey(handle, winreg.String())
+ self.assertEquals(9, len(x)) # should return a 9-tuple
+ self.conn.CloseKey(handle)
diff --git a/python/samba/tests/dcerpc/rpc_talloc.py b/python/samba/tests/dcerpc/rpc_talloc.py
new file mode 100644
index 0000000000..c091f26c1b
--- /dev/null
+++ b/python/samba/tests/dcerpc/rpc_talloc.py
@@ -0,0 +1,84 @@
+# test generated python code from pidl
+# Copyright (C) Andrew Tridgell August 2010
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+# to run this test, use one of these:
+#
+# python -m testtools.run samba.tests.dcerpc.rpc_talloc
+#
+# or if you have trial installed (from twisted), use
+#
+# trial samba.tests.dcerpc.rpc_talloc
+
+"""Tests for the talloc handling in the generated Python DCE/RPC bindings."""
+
+import sys
+
+sys.path.insert(0, "bin/python")
+
+import samba
+import samba.tests
+from samba.dcerpc import drsuapi
+import talloc
+
+talloc.enable_null_tracking()
+
+
+class TallocTests(samba.tests.TestCase):
+ '''test talloc behaviour of pidl generated python code'''
+
+ def check_blocks(self, object, num_expected):
+ '''check that the number of allocated blocks is correct'''
+ nblocks = talloc.total_blocks(object)
+ if object is None:
+ nblocks -= self.initial_blocks
+ self.assertEquals(nblocks, num_expected)
+
+ def get_rodc_partial_attribute_set(self):
+ '''get a list of attributes for RODC replication'''
+ partial_attribute_set = drsuapi.DsPartialAttributeSet()
+
+ # we expect one block for the object, and one for the structure
+ self.check_blocks(partial_attribute_set, 2)
+
+ attids = [1, 2, 3]
+ partial_attribute_set.version = 1
+ partial_attribute_set.attids = attids
+ partial_attribute_set.num_attids = len(attids)
+
+ # we expect one block object, a structure, an ARRAY, and a
+ # reference to the array
+ self.check_blocks(partial_attribute_set, 3)
+
+ return partial_attribute_set
+
+ def pas_test(self):
+ pas = self.get_rodc_partial_attribute_set()
+ self.check_blocks(pas, 3)
+ req8 = drsuapi.DsGetNCChangesRequest8()
+ self.check_blocks(req8, 2)
+ self.check_blocks(None, 5)
+ req8.partial_attribute_set = pas
+ if req8.partial_attribute_set.attids[1] != 2:
+ raise Exception("Wrong value in attids[2]")
+ # we now get an additional reference
+ self.check_blocks(None, 6)
+
+ def test_run(self):
+ self.initial_blocks = talloc.total_blocks(None)
+ self.check_blocks(None, 0)
+ self.pas_test()
+ self.check_blocks(None, 0)
diff --git a/python/samba/tests/dcerpc/rpcecho.py b/python/samba/tests/dcerpc/rpcecho.py
new file mode 100644
index 0000000000..099f8f619c
--- /dev/null
+++ b/python/samba/tests/dcerpc/rpcecho.py
@@ -0,0 +1,71 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for samba.dceprc.rpcecho."""
+
+from samba.dcerpc import echo
+from samba.ndr import ndr_pack, ndr_unpack
+from samba.tests import RpcInterfaceTestCase, TestCase
+
+
+class RpcEchoTests(RpcInterfaceTestCase):
+
+ def setUp(self):
+ super(RpcEchoTests, self).setUp()
+ self.conn = echo.rpcecho("ncalrpc:", self.get_loadparm())
+
+ def test_two_contexts(self):
+ self.conn2 = echo.rpcecho("ncalrpc:", self.get_loadparm(), basis_connection=self.conn)
+ self.assertEquals(3, self.conn2.AddOne(2))
+
+ def test_abstract_syntax(self):
+ self.assertEquals(("60a15ec5-4de8-11d7-a637-005056a20182", 1),
+ self.conn.abstract_syntax)
+
+ def test_addone(self):
+ self.assertEquals(2, self.conn.AddOne(1))
+
+ def test_echodata(self):
+ self.assertEquals([1,2,3], self.conn.EchoData([1, 2, 3]))
+
+ def test_call(self):
+ self.assertEquals(u"foobar", self.conn.TestCall(u"foobar"))
+
+ def test_surrounding(self):
+ surrounding_struct = echo.Surrounding()
+ surrounding_struct.x = 4
+ surrounding_struct.surrounding = [1,2,3,4]
+ y = self.conn.TestSurrounding(surrounding_struct)
+ self.assertEquals(8 * [0], y.surrounding)
+
+ def test_manual_request(self):
+ self.assertEquals("\x01\x00\x00\x00", self.conn.request(0, chr(0) * 4))
+
+ def test_server_name(self):
+ self.assertEquals(None, self.conn.server_name)
+
+
+class NdrEchoTests(TestCase):
+
+ def test_info1_push(self):
+ x = echo.info1()
+ x.v = 42
+ self.assertEquals("\x2a", ndr_pack(x))
+
+ def test_info1_pull(self):
+ x = ndr_unpack(echo.info1, "\x42")
+ self.assertEquals(x.v, 66)
diff --git a/python/samba/tests/dcerpc/sam.py b/python/samba/tests/dcerpc/sam.py
new file mode 100644
index 0000000000..0e09323adb
--- /dev/null
+++ b/python/samba/tests/dcerpc/sam.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+#
+# Unix SMB/CIFS implementation.
+# Copyright © Jelmer Vernooij <jelmer@samba.org> 2008
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for samba.dcerpc.sam."""
+
+from samba.dcerpc import samr, security
+from samba.tests import RpcInterfaceTestCase
+
+# FIXME: Pidl should be doing this for us
+def toArray((handle, array, num_entries)):
+ ret = []
+ for x in range(num_entries):
+ ret.append((array.entries[x].idx, array.entries[x].name))
+ return ret
+
+
+class SamrTests(RpcInterfaceTestCase):
+
+ def setUp(self):
+ super(SamrTests, self).setUp()
+ self.conn = samr.samr("ncalrpc:", self.get_loadparm())
+
+ def test_connect5(self):
+ (level, info, handle) = self.conn.Connect5(None, 0, 1, samr.ConnectInfo1())
+
+ def test_connect2(self):
+ handle = self.conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
+ self.assertTrue(handle is not None)
+
+ def test_EnumDomains(self):
+ handle = self.conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
+ domains = toArray(self.conn.EnumDomains(handle, 0, -1))
+ self.conn.Close(handle)
+
diff --git a/python/samba/tests/dcerpc/srvsvc.py b/python/samba/tests/dcerpc/srvsvc.py
new file mode 100644
index 0000000000..3206a27e67
--- /dev/null
+++ b/python/samba/tests/dcerpc/srvsvc.py
@@ -0,0 +1,68 @@
+# -*- coding: utf-8 -*-
+#
+# Unix SMB/CIFS implementation.
+# Copyright © Dhananjay Sathe <dhanajaysathe@gmail.com> 2011
+# Copyright © Jelmer Vernooij <jelmer@samba.org> 2011
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for samba.dcerpc.srvsvc."""
+
+from samba.dcerpc import srvsvc
+from samba.tests import RpcInterfaceTestCase
+
+
+class SrvsvcTests(RpcInterfaceTestCase):
+
+ def setUp(self):
+ super(SrvsvcTests, self).setUp()
+ self.conn = srvsvc.srvsvc("ncalrpc:", self.get_loadparm())
+ self.server_unc = "\\\\."
+
+ def getDummyShareObject(self):
+ share = srvsvc.NetShareInfo2()
+
+ share.name = u'test'
+ share.comment = u'test share'
+ share.type = srvsvc.STYPE_DISKTREE
+ share.current_users = 0x00000000
+ share.max_users = -1
+ share.password = None
+ share.path = u'C:\\tmp' # some random path
+ share.permissions = 123434566
+ return share
+
+ def test_NetShareAdd(self):
+ self.skip("Dangerous test")
+ share = self.getDummyShareObject()
+ self.conn.NetShareAdd(self.server_unc, 2, share, None)
+
+ def test_NetShareSetInfo(self):
+ self.skip("Dangerous test")
+ share = self.getDummyShareObject()
+ parm_error = 0x00000000
+ self.conn.NetShareAdd(self.server_unc, 502, share, parm_error)
+ name = share.name
+ share.comment = "now sucessfully modified "
+ parm_error = self.pipe.NetShareSetInfo(self.server_unc, name,
+ 502, share, parm_error)
+
+ def test_NetShareDel(self):
+ self.skip("Dangerous test")
+ share = self.getDummyShareObject()
+ parm_error = 0x00000000
+ self.expectFailure("NetShareAdd doesn't work properly from Python",
+ self.conn.NetShareAdd, self.server_unc, 502, share, parm_error)
+ self.conn.NetShareDel(self.server_unc, share.name, 0)
diff --git a/python/samba/tests/dcerpc/testrpc.py b/python/samba/tests/dcerpc/testrpc.py
new file mode 100644
index 0000000000..e35d6b5544
--- /dev/null
+++ b/python/samba/tests/dcerpc/testrpc.py
@@ -0,0 +1,141 @@
+# test generated python code from pidl
+# Copyright (C) Andrew Tridgell August 2010
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+import sys
+
+sys.path.insert(0, "bin/python")
+
+import samba
+import samba.tests
+from samba.dcerpc import drsuapi
+import talloc
+
+talloc.enable_null_tracking()
+
+class RpcTests(object):
+ '''test type behaviour of pidl generated python RPC code'''
+
+ def check_blocks(self, object, num_expected):
+ '''check that the number of allocated blocks is correct'''
+ nblocks = talloc.total_blocks(object)
+ if object is None:
+ nblocks -= self.initial_blocks
+ leaked_blocks = (nblocks - num_expected)
+ if leaked_blocks != 0:
+ print "Leaked %d blocks" % leaked_blocks
+
+ def check_type(self, interface, typename, type):
+ print "Checking type %s" % typename
+ v = type()
+ for n in dir(v):
+ if n[0] == '_':
+ continue
+ try:
+ value = getattr(v, n)
+ except TypeError, errstr:
+ if str(errstr) == "unknown union level":
+ print "ERROR: Unknown union level in %s.%s" % (typename, n)
+ self.errcount += 1
+ continue
+ print str(errstr)[1:21]
+ if str(errstr)[0:21] == "Can not convert C Type":
+ print "ERROR: Unknown C type for %s.%s" % (typename, n)
+ self.errcount += 1
+ continue
+ else:
+ print "ERROR: Failed to instantiate %s.%s" % (typename, n)
+ self.errcount += 1
+ continue
+ except Exception:
+ print "ERROR: Failed to instantiate %s.%s" % (typename, n)
+ self.errcount += 1
+ continue
+
+ # now try setting the value back
+ try:
+ print "Setting %s.%s" % (typename, n)
+ setattr(v, n, value)
+ except Exception, e:
+ if isinstance(e, AttributeError) and str(e).endswith("is read-only"):
+ # readonly, ignore
+ continue
+ else:
+ print "ERROR: Failed to set %s.%s: %r: %s" % (typename, n, e.__class__, e)
+ self.errcount += 1
+ continue
+
+ # and try a comparison
+ try:
+ if value != getattr(v, n):
+ print "ERROR: Comparison failed for %s.%s: %r != %r" % (typename, n, value, getattr(v, n))
+ continue
+ except Exception, e:
+ print "ERROR: compare exception for %s.%s: %r: %s" % (typename, n, e.__class__, e)
+ continue
+
+ def check_interface(self, interface, iname):
+ errcount = self.errcount
+ for n in dir(interface):
+ if n[0] == '_' or n == iname:
+ # skip the special ones
+ continue
+ value = getattr(interface, n)
+ if isinstance(value, str):
+ #print "%s=\"%s\"" % (n, value)
+ pass
+ elif isinstance(value, int) or isinstance(value, long):
+ #print "%s=%d" % (n, value)
+ pass
+ elif isinstance(value, type):
+ try:
+ initial_blocks = talloc.total_blocks(None)
+ self.check_type(interface, n, value)
+ self.check_blocks(None, initial_blocks)
+ except Exception, e:
+ print "ERROR: Failed to check_type %s.%s: %r: %s" % (iname, n, e.__class__, e)
+ self.errcount += 1
+ elif callable(value):
+ pass # Method
+ else:
+ print "UNKNOWN: %s=%s" % (n, value)
+ if self.errcount - errcount != 0:
+ print "Found %d errors in %s" % (self.errcount - errcount, iname)
+
+ def check_all_interfaces(self):
+ for iname in dir(samba.dcerpc):
+ if iname[0] == '_':
+ continue
+ if iname == 'ClientConnection' or iname == 'base':
+ continue
+ print "Checking interface %s" % iname
+ iface = getattr(samba.dcerpc, iname)
+ initial_blocks = talloc.total_blocks(None)
+ self.check_interface(iface, iname)
+ self.check_blocks(None, initial_blocks)
+
+ def run(self):
+ self.initial_blocks = talloc.total_blocks(None)
+ self.errcount = 0
+ self.check_all_interfaces()
+ return self.errcount
+
+tests = RpcTests()
+errcount = tests.run()
+if errcount == 0:
+ sys.exit(0)
+else:
+ print "%d failures" % errcount
+ sys.exit(1)
diff --git a/python/samba/tests/dcerpc/unix.py b/python/samba/tests/dcerpc/unix.py
new file mode 100644
index 0000000000..e8ef4da863
--- /dev/null
+++ b/python/samba/tests/dcerpc/unix.py
@@ -0,0 +1,49 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for samba.dcerpc.unixinfo."""
+
+
+from samba.dcerpc import unixinfo
+from samba.tests import RpcInterfaceTestCase
+
+class UnixinfoTests(RpcInterfaceTestCase):
+
+ def setUp(self):
+ super(UnixinfoTests, self).setUp()
+ self.conn = unixinfo.unixinfo("ncalrpc:", self.get_loadparm())
+
+ def test_getpwuid_int(self):
+ infos = self.conn.GetPWUid(range(512))
+ self.assertEquals(512, len(infos))
+ self.assertEquals("/bin/false", infos[0].shell)
+ self.assertTrue(isinstance(infos[0].homedir, unicode))
+
+ def test_getpwuid(self):
+ infos = self.conn.GetPWUid(map(long, range(512)))
+ self.assertEquals(512, len(infos))
+ self.assertEquals("/bin/false", infos[0].shell)
+ self.assertTrue(isinstance(infos[0].homedir, unicode))
+
+ def test_gidtosid(self):
+ self.conn.GidToSid(1000L)
+
+ def test_uidtosid(self):
+ self.conn.UidToSid(1000)
+
+ def test_uidtosid_fail(self):
+ self.assertRaises(TypeError, self.conn.UidToSid, "100")