diff options
| author | Jelmer Vernooij <jelmer@samba.org> | 2012-12-28 15:37:14 +0100 |
|---|---|---|
| committer | Andrew Bartlett <abartlet@samba.org> | 2013-03-02 03:57:34 +0100 |
| commit | 87afc3aee1ea593069322a49355dd8780d99e123 (patch) | |
| tree | 8e1ea6678d93b53f21b34c4940b7d5a64e0f5020 /python/samba/tests/dcerpc | |
| parent | 80fce353e740c793619005ac102ab07fb5e7d280 (diff) | |
| download | samba-87afc3aee1ea593069322a49355dd8780d99e123.tar.gz samba-87afc3aee1ea593069322a49355dd8780d99e123.tar.xz samba-87afc3aee1ea593069322a49355dd8780d99e123.zip | |
Move python modules from source4/scripting/python/ to python/.
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Autobuild-User(master): Andrew Bartlett <abartlet@samba.org>
Autobuild-Date(master): Sat Mar 2 03:57:34 CET 2013 on sn-devel-104
Diffstat (limited to 'python/samba/tests/dcerpc')
| -rw-r--r-- | python/samba/tests/dcerpc/__init__.py | 20 | ||||
| -rw-r--r-- | python/samba/tests/dcerpc/bare.py | 51 | ||||
| -rw-r--r-- | python/samba/tests/dcerpc/dnsserver.py | 241 | ||||
| -rw-r--r-- | python/samba/tests/dcerpc/misc.py | 62 | ||||
| -rw-r--r-- | python/samba/tests/dcerpc/registry.py | 51 | ||||
| -rw-r--r-- | python/samba/tests/dcerpc/rpc_talloc.py | 84 | ||||
| -rw-r--r-- | python/samba/tests/dcerpc/rpcecho.py | 71 | ||||
| -rw-r--r-- | python/samba/tests/dcerpc/sam.py | 50 | ||||
| -rw-r--r-- | python/samba/tests/dcerpc/srvsvc.py | 68 | ||||
| -rw-r--r-- | python/samba/tests/dcerpc/testrpc.py | 141 | ||||
| -rw-r--r-- | python/samba/tests/dcerpc/unix.py | 49 |
11 files changed, 888 insertions, 0 deletions
diff --git a/python/samba/tests/dcerpc/__init__.py b/python/samba/tests/dcerpc/__init__.py new file mode 100644 index 0000000000..d84cb57a09 --- /dev/null +++ b/python/samba/tests/dcerpc/__init__.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# +# Unix SMB/CIFS implementation. +# Copyright © Jelmer Vernooij <jelmer@samba.org> 2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""Tests for the DCE/RPC Python bindings.""" + diff --git a/python/samba/tests/dcerpc/bare.py b/python/samba/tests/dcerpc/bare.py new file mode 100644 index 0000000000..3efbf9d4cf --- /dev/null +++ b/python/samba/tests/dcerpc/bare.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# +# Unix SMB/CIFS implementation. +# Copyright © Jelmer Vernooij <jelmer@samba.org> 2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.tests.dcerpc.bare.""" + +from samba.dcerpc import ClientConnection +import samba.tests + +class BareTestCase(samba.tests.TestCase): + + def test_bare(self): + # Connect to the echo pipe + x = ClientConnection("ncalrpc:localhost[DEFAULT]", + ("60a15ec5-4de8-11d7-a637-005056a20182", 1), + lp_ctx=samba.tests.env_loadparm()) + self.assertEquals("\x01\x00\x00\x00", x.request(0, chr(0) * 4)) + + def test_alter_context(self): + x = ClientConnection("ncalrpc:localhost[DEFAULT]", + ("12345778-1234-abcd-ef00-0123456789ac", 1), + lp_ctx=samba.tests.env_loadparm()) + y = ClientConnection("ncalrpc:localhost", + ("60a15ec5-4de8-11d7-a637-005056a20182", 1), + basis_connection=x, lp_ctx=samba.tests.env_loadparm()) + x.alter_context(("60a15ec5-4de8-11d7-a637-005056a20182", 1)) + # FIXME: self.assertEquals("\x01\x00\x00\x00", x.request(0, chr(0) * 4)) + + def test_two_connections(self): + x = ClientConnection("ncalrpc:localhost[DEFAULT]", + ("60a15ec5-4de8-11d7-a637-005056a20182", 1), + lp_ctx=samba.tests.env_loadparm()) + y = ClientConnection("ncalrpc:localhost", + ("60a15ec5-4de8-11d7-a637-005056a20182", 1), + basis_connection=x, lp_ctx=samba.tests.env_loadparm()) + self.assertEquals("\x01\x00\x00\x00", y.request(0, chr(0) * 4)) diff --git a/python/samba/tests/dcerpc/dnsserver.py b/python/samba/tests/dcerpc/dnsserver.py new file mode 100644 index 0000000000..59d6eee761 --- /dev/null +++ b/python/samba/tests/dcerpc/dnsserver.py @@ -0,0 +1,241 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Amitay Isaacs <amitay@gmail.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dcerpc.dnsserver""" + +from samba.dcerpc import dnsp, dnsserver +from samba.tests import RpcInterfaceTestCase, env_get_var_value +from samba.netcmd.dns import ARecord + +class DnsserverTests(RpcInterfaceTestCase): + + def setUp(self): + super(DnsserverTests, self).setUp() + self.server = env_get_var_value("SERVER_IP") + self.zone = env_get_var_value("REALM").lower() + self.conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server), + self.get_loadparm(), + self.get_credentials()) + + def test_operation2(self): + pass + + + def test_query2(self): + typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_W2K, + 0, + self.server, + None, + 'ServerInfo') + self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO_W2K, typeid) + + typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_DOTNET, + 0, + self.server, + None, + 'ServerInfo') + self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO_DOTNET, typeid) + + typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + None, + 'ServerInfo') + self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO, typeid) + + def test_operation2(self): + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + rev_zone = '1.168.192.in-addr.arpa' + + zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN() + zone_create.pszZoneName = rev_zone + zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY + zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE + zone_create.fAging = 0 + zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT + + # Create zone + self.conn.DnssrvOperation2(client_version, + 0, + self.server, + None, + 0, + 'ZoneCreate', + dnsserver.DNSSRV_TYPEID_ZONE_CREATE, + zone_create) + + request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE | + dnsserver.DNS_ZONE_REQUEST_PRIMARY) + typeid, zones = self.conn.DnssrvComplexOperation2(client_version, + 0, + self.server, + None, + 'EnumZones', + dnsserver.DNSSRV_TYPEID_DWORD, + request_filter) + self.assertEquals(1, zones.dwZoneCount) + + # Delete zone + self.conn.DnssrvOperation2(client_version, + 0, + self.server, + rev_zone, + 0, + 'DeleteZoneFromDs', + dnsserver.DNSSRV_TYPEID_NULL, + None) + + typeid, zones = self.conn.DnssrvComplexOperation2(client_version, + 0, + self.server, + None, + 'EnumZones', + dnsserver.DNSSRV_TYPEID_DWORD, + request_filter) + self.assertEquals(0, zones.dwZoneCount) + + + def test_complexoperation2(self): + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + request_filter = (dnsserver.DNS_ZONE_REQUEST_FORWARD | + dnsserver.DNS_ZONE_REQUEST_PRIMARY) + typeid, zones = self.conn.DnssrvComplexOperation2(client_version, + 0, + self.server, + None, + 'EnumZones', + dnsserver.DNSSRV_TYPEID_DWORD, + request_filter) + self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid) + self.assertEquals(2, zones.dwZoneCount) + + request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE | + dnsserver.DNS_ZONE_REQUEST_PRIMARY) + typeid, zones = self.conn.DnssrvComplexOperation2(client_version, + 0, + self.server, + None, + 'EnumZones', + dnsserver.DNSSRV_TYPEID_DWORD, + request_filter) + self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid) + self.assertEquals(0, zones.dwZoneCount) + + + def test_enumrecords2(self): + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + record_type = dnsp.DNS_TYPE_NS + select_flags = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA | + dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA) + buflen, roothints = self.conn.DnssrvEnumRecords2(client_version, + 0, + self.server, + '..RootHints', + '.', + None, + record_type, + select_flags, + None, + None) + self.assertEquals(14, roothints.count) # 1 NS + 13 A records (a-m) + + + def test_updaterecords2(self): + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + record_type = dnsp.DNS_TYPE_A + select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA + + name = 'dummy' + rec = ARecord('1.2.3.4') + rec2 = ARecord('5.6.7.8') + + # Add record + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + self.conn.DnssrvUpdateRecord2(client_version, + 0, + self.server, + self.zone, + name, + add_rec_buf, + None) + + buflen, result = self.conn.DnssrvEnumRecords2(client_version, + 0, + self.server, + self.zone, + name, + None, + record_type, + select_flags, + None, + None) + self.assertEquals(1, result.count) + self.assertEquals(1, result.rec[0].wRecordCount) + self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType) + self.assertEquals('1.2.3.4', result.rec[0].records[0].data) + + # Update record + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec2 + del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + del_rec_buf.rec = rec + self.conn.DnssrvUpdateRecord2(client_version, + 0, + self.server, + self.zone, + name, + add_rec_buf, + del_rec_buf) + + buflen, result = self.conn.DnssrvEnumRecords2(client_version, + 0, + self.server, + self.zone, + name, + None, + record_type, + select_flags, + None, + None) + self.assertEquals(1, result.count) + self.assertEquals(1, result.rec[0].wRecordCount) + self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType) + self.assertEquals('5.6.7.8', result.rec[0].records[0].data) + + # Delete record + del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + del_rec_buf.rec = rec2 + self.conn.DnssrvUpdateRecord2(client_version, + 0, + self.server, + self.zone, + name, + None, + del_rec_buf) + + self.assertRaises(RuntimeError, self.conn.DnssrvEnumRecords2, + client_version, + 0, + self.server, + self.zone, + name, + None, + record_type, + select_flags, + None, + None) diff --git a/python/samba/tests/dcerpc/misc.py b/python/samba/tests/dcerpc/misc.py new file mode 100644 index 0000000000..11e14aadfa --- /dev/null +++ b/python/samba/tests/dcerpc/misc.py @@ -0,0 +1,62 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dcerpc.misc.""" + +from samba.dcerpc import misc +import samba.tests + +text1 = "76f53846-a7c2-476a-ae2c-20e2b80d7b34" +text2 = "344edffa-330a-4b39-b96e-2c34da52e8b1" + +class GUIDTests(samba.tests.TestCase): + + def test_str(self): + guid = misc.GUID(text1) + self.assertEquals(text1, str(guid)) + + def test_repr(self): + guid = misc.GUID(text1) + self.assertEquals("GUID('%s')" % text1, repr(guid)) + + def test_compare_different(self): + guid1 = misc.GUID(text1) + guid2 = misc.GUID(text2) + self.assertTrue(cmp(guid1, guid2) > 0) + + def test_compare_same(self): + guid1 = misc.GUID(text1) + guid2 = misc.GUID(text1) + self.assertEquals(0, cmp(guid1, guid2)) + self.assertEquals(guid1, guid2) + + +class PolicyHandleTests(samba.tests.TestCase): + + def test_init(self): + x = misc.policy_handle(text1, 1) + self.assertEquals(1, x.handle_type) + self.assertEquals(text1, str(x.uuid)) + + def test_repr(self): + x = misc.policy_handle(text1, 42) + self.assertEquals("policy_handle(%d, '%s')" % (42, text1), repr(x)) + + def test_str(self): + x = misc.policy_handle(text1, 42) + self.assertEquals("%d, %s" % (42, text1), str(x)) + diff --git a/python/samba/tests/dcerpc/registry.py b/python/samba/tests/dcerpc/registry.py new file mode 100644 index 0000000000..c7bcbfd530 --- /dev/null +++ b/python/samba/tests/dcerpc/registry.py @@ -0,0 +1,51 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dcerpc.registry.""" + +from samba.dcerpc import winreg +from samba.tests import RpcInterfaceTestCase + + +class WinregTests(RpcInterfaceTestCase): + + def setUp(self): + super(WinregTests, self).setUp() + self.conn = winreg.winreg("ncalrpc:", self.get_loadparm(), + self.get_credentials()) + + def get_hklm(self): + return self.conn.OpenHKLM(None, + winreg.KEY_QUERY_VALUE | winreg.KEY_ENUMERATE_SUB_KEYS) + + def test_hklm(self): + handle = self.conn.OpenHKLM(None, + winreg.KEY_QUERY_VALUE | winreg.KEY_ENUMERATE_SUB_KEYS) + self.conn.CloseKey(handle) + + def test_getversion(self): + handle = self.get_hklm() + version = self.conn.GetVersion(handle) + self.assertEquals(int, version.__class__) + self.conn.CloseKey(handle) + + def test_getkeyinfo(self): + handle = self.conn.OpenHKLM(None, + winreg.KEY_QUERY_VALUE | winreg.KEY_ENUMERATE_SUB_KEYS) + x = self.conn.QueryInfoKey(handle, winreg.String()) + self.assertEquals(9, len(x)) # should return a 9-tuple + self.conn.CloseKey(handle) diff --git a/python/samba/tests/dcerpc/rpc_talloc.py b/python/samba/tests/dcerpc/rpc_talloc.py new file mode 100644 index 0000000000..c091f26c1b --- /dev/null +++ b/python/samba/tests/dcerpc/rpc_talloc.py @@ -0,0 +1,84 @@ +# test generated python code from pidl +# Copyright (C) Andrew Tridgell August 2010 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# +# to run this test, use one of these: +# +# python -m testtools.run samba.tests.dcerpc.rpc_talloc +# +# or if you have trial installed (from twisted), use +# +# trial samba.tests.dcerpc.rpc_talloc + +"""Tests for the talloc handling in the generated Python DCE/RPC bindings.""" + +import sys + +sys.path.insert(0, "bin/python") + +import samba +import samba.tests +from samba.dcerpc import drsuapi +import talloc + +talloc.enable_null_tracking() + + +class TallocTests(samba.tests.TestCase): + '''test talloc behaviour of pidl generated python code''' + + def check_blocks(self, object, num_expected): + '''check that the number of allocated blocks is correct''' + nblocks = talloc.total_blocks(object) + if object is None: + nblocks -= self.initial_blocks + self.assertEquals(nblocks, num_expected) + + def get_rodc_partial_attribute_set(self): + '''get a list of attributes for RODC replication''' + partial_attribute_set = drsuapi.DsPartialAttributeSet() + + # we expect one block for the object, and one for the structure + self.check_blocks(partial_attribute_set, 2) + + attids = [1, 2, 3] + partial_attribute_set.version = 1 + partial_attribute_set.attids = attids + partial_attribute_set.num_attids = len(attids) + + # we expect one block object, a structure, an ARRAY, and a + # reference to the array + self.check_blocks(partial_attribute_set, 3) + + return partial_attribute_set + + def pas_test(self): + pas = self.get_rodc_partial_attribute_set() + self.check_blocks(pas, 3) + req8 = drsuapi.DsGetNCChangesRequest8() + self.check_blocks(req8, 2) + self.check_blocks(None, 5) + req8.partial_attribute_set = pas + if req8.partial_attribute_set.attids[1] != 2: + raise Exception("Wrong value in attids[2]") + # we now get an additional reference + self.check_blocks(None, 6) + + def test_run(self): + self.initial_blocks = talloc.total_blocks(None) + self.check_blocks(None, 0) + self.pas_test() + self.check_blocks(None, 0) diff --git a/python/samba/tests/dcerpc/rpcecho.py b/python/samba/tests/dcerpc/rpcecho.py new file mode 100644 index 0000000000..099f8f619c --- /dev/null +++ b/python/samba/tests/dcerpc/rpcecho.py @@ -0,0 +1,71 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dceprc.rpcecho.""" + +from samba.dcerpc import echo +from samba.ndr import ndr_pack, ndr_unpack +from samba.tests import RpcInterfaceTestCase, TestCase + + +class RpcEchoTests(RpcInterfaceTestCase): + + def setUp(self): + super(RpcEchoTests, self).setUp() + self.conn = echo.rpcecho("ncalrpc:", self.get_loadparm()) + + def test_two_contexts(self): + self.conn2 = echo.rpcecho("ncalrpc:", self.get_loadparm(), basis_connection=self.conn) + self.assertEquals(3, self.conn2.AddOne(2)) + + def test_abstract_syntax(self): + self.assertEquals(("60a15ec5-4de8-11d7-a637-005056a20182", 1), + self.conn.abstract_syntax) + + def test_addone(self): + self.assertEquals(2, self.conn.AddOne(1)) + + def test_echodata(self): + self.assertEquals([1,2,3], self.conn.EchoData([1, 2, 3])) + + def test_call(self): + self.assertEquals(u"foobar", self.conn.TestCall(u"foobar")) + + def test_surrounding(self): + surrounding_struct = echo.Surrounding() + surrounding_struct.x = 4 + surrounding_struct.surrounding = [1,2,3,4] + y = self.conn.TestSurrounding(surrounding_struct) + self.assertEquals(8 * [0], y.surrounding) + + def test_manual_request(self): + self.assertEquals("\x01\x00\x00\x00", self.conn.request(0, chr(0) * 4)) + + def test_server_name(self): + self.assertEquals(None, self.conn.server_name) + + +class NdrEchoTests(TestCase): + + def test_info1_push(self): + x = echo.info1() + x.v = 42 + self.assertEquals("\x2a", ndr_pack(x)) + + def test_info1_pull(self): + x = ndr_unpack(echo.info1, "\x42") + self.assertEquals(x.v, 66) diff --git a/python/samba/tests/dcerpc/sam.py b/python/samba/tests/dcerpc/sam.py new file mode 100644 index 0000000000..0e09323adb --- /dev/null +++ b/python/samba/tests/dcerpc/sam.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# +# Unix SMB/CIFS implementation. +# Copyright © Jelmer Vernooij <jelmer@samba.org> 2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dcerpc.sam.""" + +from samba.dcerpc import samr, security +from samba.tests import RpcInterfaceTestCase + +# FIXME: Pidl should be doing this for us +def toArray((handle, array, num_entries)): + ret = [] + for x in range(num_entries): + ret.append((array.entries[x].idx, array.entries[x].name)) + return ret + + +class SamrTests(RpcInterfaceTestCase): + + def setUp(self): + super(SamrTests, self).setUp() + self.conn = samr.samr("ncalrpc:", self.get_loadparm()) + + def test_connect5(self): + (level, info, handle) = self.conn.Connect5(None, 0, 1, samr.ConnectInfo1()) + + def test_connect2(self): + handle = self.conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED) + self.assertTrue(handle is not None) + + def test_EnumDomains(self): + handle = self.conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED) + domains = toArray(self.conn.EnumDomains(handle, 0, -1)) + self.conn.Close(handle) + diff --git a/python/samba/tests/dcerpc/srvsvc.py b/python/samba/tests/dcerpc/srvsvc.py new file mode 100644 index 0000000000..3206a27e67 --- /dev/null +++ b/python/samba/tests/dcerpc/srvsvc.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# +# Unix SMB/CIFS implementation. +# Copyright © Dhananjay Sathe <dhanajaysathe@gmail.com> 2011 +# Copyright © Jelmer Vernooij <jelmer@samba.org> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dcerpc.srvsvc.""" + +from samba.dcerpc import srvsvc +from samba.tests import RpcInterfaceTestCase + + +class SrvsvcTests(RpcInterfaceTestCase): + + def setUp(self): + super(SrvsvcTests, self).setUp() + self.conn = srvsvc.srvsvc("ncalrpc:", self.get_loadparm()) + self.server_unc = "\\\\." + + def getDummyShareObject(self): + share = srvsvc.NetShareInfo2() + + share.name = u'test' + share.comment = u'test share' + share.type = srvsvc.STYPE_DISKTREE + share.current_users = 0x00000000 + share.max_users = -1 + share.password = None + share.path = u'C:\\tmp' # some random path + share.permissions = 123434566 + return share + + def test_NetShareAdd(self): + self.skip("Dangerous test") + share = self.getDummyShareObject() + self.conn.NetShareAdd(self.server_unc, 2, share, None) + + def test_NetShareSetInfo(self): + self.skip("Dangerous test") + share = self.getDummyShareObject() + parm_error = 0x00000000 + self.conn.NetShareAdd(self.server_unc, 502, share, parm_error) + name = share.name + share.comment = "now sucessfully modified " + parm_error = self.pipe.NetShareSetInfo(self.server_unc, name, + 502, share, parm_error) + + def test_NetShareDel(self): + self.skip("Dangerous test") + share = self.getDummyShareObject() + parm_error = 0x00000000 + self.expectFailure("NetShareAdd doesn't work properly from Python", + self.conn.NetShareAdd, self.server_unc, 502, share, parm_error) + self.conn.NetShareDel(self.server_unc, share.name, 0) diff --git a/python/samba/tests/dcerpc/testrpc.py b/python/samba/tests/dcerpc/testrpc.py new file mode 100644 index 0000000000..e35d6b5544 --- /dev/null +++ b/python/samba/tests/dcerpc/testrpc.py @@ -0,0 +1,141 @@ +# test generated python code from pidl +# Copyright (C) Andrew Tridgell August 2010 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +import sys + +sys.path.insert(0, "bin/python") + +import samba +import samba.tests +from samba.dcerpc import drsuapi +import talloc + +talloc.enable_null_tracking() + +class RpcTests(object): + '''test type behaviour of pidl generated python RPC code''' + + def check_blocks(self, object, num_expected): + '''check that the number of allocated blocks is correct''' + nblocks = talloc.total_blocks(object) + if object is None: + nblocks -= self.initial_blocks + leaked_blocks = (nblocks - num_expected) + if leaked_blocks != 0: + print "Leaked %d blocks" % leaked_blocks + + def check_type(self, interface, typename, type): + print "Checking type %s" % typename + v = type() + for n in dir(v): + if n[0] == '_': + continue + try: + value = getattr(v, n) + except TypeError, errstr: + if str(errstr) == "unknown union level": + print "ERROR: Unknown union level in %s.%s" % (typename, n) + self.errcount += 1 + continue + print str(errstr)[1:21] + if str(errstr)[0:21] == "Can not convert C Type": + print "ERROR: Unknown C type for %s.%s" % (typename, n) + self.errcount += 1 + continue + else: + print "ERROR: Failed to instantiate %s.%s" % (typename, n) + self.errcount += 1 + continue + except Exception: + print "ERROR: Failed to instantiate %s.%s" % (typename, n) + self.errcount += 1 + continue + + # now try setting the value back + try: + print "Setting %s.%s" % (typename, n) + setattr(v, n, value) + except Exception, e: + if isinstance(e, AttributeError) and str(e).endswith("is read-only"): + # readonly, ignore + continue + else: + print "ERROR: Failed to set %s.%s: %r: %s" % (typename, n, e.__class__, e) + self.errcount += 1 + continue + + # and try a comparison + try: + if value != getattr(v, n): + print "ERROR: Comparison failed for %s.%s: %r != %r" % (typename, n, value, getattr(v, n)) + continue + except Exception, e: + print "ERROR: compare exception for %s.%s: %r: %s" % (typename, n, e.__class__, e) + continue + + def check_interface(self, interface, iname): + errcount = self.errcount + for n in dir(interface): + if n[0] == '_' or n == iname: + # skip the special ones + continue + value = getattr(interface, n) + if isinstance(value, str): + #print "%s=\"%s\"" % (n, value) + pass + elif isinstance(value, int) or isinstance(value, long): + #print "%s=%d" % (n, value) + pass + elif isinstance(value, type): + try: + initial_blocks = talloc.total_blocks(None) + self.check_type(interface, n, value) + self.check_blocks(None, initial_blocks) + except Exception, e: + print "ERROR: Failed to check_type %s.%s: %r: %s" % (iname, n, e.__class__, e) + self.errcount += 1 + elif callable(value): + pass # Method + else: + print "UNKNOWN: %s=%s" % (n, value) + if self.errcount - errcount != 0: + print "Found %d errors in %s" % (self.errcount - errcount, iname) + + def check_all_interfaces(self): + for iname in dir(samba.dcerpc): + if iname[0] == '_': + continue + if iname == 'ClientConnection' or iname == 'base': + continue + print "Checking interface %s" % iname + iface = getattr(samba.dcerpc, iname) + initial_blocks = talloc.total_blocks(None) + self.check_interface(iface, iname) + self.check_blocks(None, initial_blocks) + + def run(self): + self.initial_blocks = talloc.total_blocks(None) + self.errcount = 0 + self.check_all_interfaces() + return self.errcount + +tests = RpcTests() +errcount = tests.run() +if errcount == 0: + sys.exit(0) +else: + print "%d failures" % errcount + sys.exit(1) diff --git a/python/samba/tests/dcerpc/unix.py b/python/samba/tests/dcerpc/unix.py new file mode 100644 index 0000000000..e8ef4da863 --- /dev/null +++ b/python/samba/tests/dcerpc/unix.py @@ -0,0 +1,49 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dcerpc.unixinfo.""" + + +from samba.dcerpc import unixinfo +from samba.tests import RpcInterfaceTestCase + +class UnixinfoTests(RpcInterfaceTestCase): + + def setUp(self): + super(UnixinfoTests, self).setUp() + self.conn = unixinfo.unixinfo("ncalrpc:", self.get_loadparm()) + + def test_getpwuid_int(self): + infos = self.conn.GetPWUid(range(512)) + self.assertEquals(512, len(infos)) + self.assertEquals("/bin/false", infos[0].shell) + self.assertTrue(isinstance(infos[0].homedir, unicode)) + + def test_getpwuid(self): + infos = self.conn.GetPWUid(map(long, range(512))) + self.assertEquals(512, len(infos)) + self.assertEquals("/bin/false", infos[0].shell) + self.assertTrue(isinstance(infos[0].homedir, unicode)) + + def test_gidtosid(self): + self.conn.GidToSid(1000L) + + def test_uidtosid(self): + self.conn.UidToSid(1000) + + def test_uidtosid_fail(self): + self.assertRaises(TypeError, self.conn.UidToSid, "100") |
