summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ipalib/__init__.py5
-rw-r--r--ipalib/ipauuid.py541
-rw-r--r--ipalib/plugins/f_host.py6
-rw-r--r--ipalib/plugins/f_hostgroup.py2
-rw-r--r--ipalib/plugins/f_netgroup.py461
-rw-r--r--ipaserver/ipaldap.py13
-rw-r--r--ipaserver/plugins/b_ldap.py16
-rw-r--r--ipaserver/servercore.py21
-rw-r--r--ipaserver/updates/host.update7
-rw-r--r--tests/test_xmlrpc/test_group_plugin.py29
-rw-r--r--tests/test_xmlrpc/test_netgroup_plugin.py320
-rw-r--r--tests/test_xmlrpc/test_user_plugin.py9
-rw-r--r--tests/test_xmlrpc/xmlrpc_test.py1
13 files changed, 1407 insertions, 24 deletions
diff --git a/ipalib/__init__.py b/ipalib/__init__.py
index 8abb90292..29344e182 100644
--- a/ipalib/__init__.py
+++ b/ipalib/__init__.py
@@ -877,6 +877,11 @@ from frontend import Object, Method, Property
from parameters import DefaultFrom, Bool, Flag, Int, Float, Bytes, Str, Password
from parameters import BytesEnum, StrEnum
+try:
+ import uuid
+except ImportError:
+ import ipauuid as uuid
+
def create_api(mode='dummy'):
"""
Return standard `plugable.API` instance.
diff --git a/ipalib/ipauuid.py b/ipalib/ipauuid.py
new file mode 100644
index 000000000..457639d83
--- /dev/null
+++ b/ipalib/ipauuid.py
@@ -0,0 +1,541 @@
+r"""UUID objects (universally unique identifiers) according to RFC 4122.
+
+This module provides immutable UUID objects (class UUID) and the functions
+uuid1(), uuid3(), uuid4(), uuid5() for generating version 1, 3, 4, and 5
+UUIDs as specified in RFC 4122.
+
+If all you want is a unique ID, you should probably call uuid1() or uuid4().
+Note that uuid1() may compromise privacy since it creates a UUID containing
+the computer's network address. uuid4() creates a random UUID.
+
+Typical usage:
+
+ >>> import uuid
+
+ # make a UUID based on the host ID and current time
+ >>> uuid.uuid1() #doctest: +SKIP
+ UUID('a8098c1a-f86e-11da-bd1a-00112444be1e')
+
+ # make a UUID using an MD5 hash of a namespace UUID and a name
+ >>> uuid.uuid3(uuid.NAMESPACE_DNS, 'python.org') #doctest: +SKIP
+ UUID('6fa459ea-ee8a-3ca4-894e-db77e160355e')
+
+ # make a random UUID
+ >>> uuid.uuid4() #doctest: +SKIP
+ UUID('16fd2706-8baf-433b-82eb-8c7fada847da')
+
+ # make a UUID using a SHA-1 hash of a namespace UUID and a name
+ >>> uuid.uuid5(uuid.NAMESPACE_DNS, 'python.org') #doctest: +SKIP
+ UUID('886313e1-3b8a-5372-9b90-0c9aee199e5d')
+
+ # make a UUID from a string of hex digits (braces and hyphens ignored)
+ >>> x = uuid.UUID('{00010203-0405-0607-0809-0a0b0c0d0e0f}') #doctest: +SKIP
+
+ # convert a UUID to a string of hex digits in standard form
+ >>> str(x) #doctest: +SKIP
+ '00010203-0405-0607-0809-0a0b0c0d0e0f'
+
+ # get the raw 16 bytes of the UUID
+ >>> x.bytes #doctest: +SKIP
+ '\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f'
+
+ # make a UUID from a 16-byte string
+ >>> uuid.UUID(bytes=x.bytes) #doctest: +SKIP
+ UUID('00010203-0405-0607-0809-0a0b0c0d0e0f')
+"""
+
+__author__ = 'Ka-Ping Yee <ping@zesty.ca>'
+
+RESERVED_NCS, RFC_4122, RESERVED_MICROSOFT, RESERVED_FUTURE = [
+ 'reserved for NCS compatibility', 'specified in RFC 4122',
+ 'reserved for Microsoft compatibility', 'reserved for future definition']
+
+class UUID(object):
+ """Instances of the UUID class represent UUIDs as specified in RFC 4122.
+ UUID objects are immutable, hashable, and usable as dictionary keys.
+ Converting a UUID to a string with str() yields something in the form
+ '12345678-1234-1234-1234-123456789abc'. The UUID constructor accepts
+ five possible forms: a similar string of hexadecimal digits, or a tuple
+ of six integer fields (with 32-bit, 16-bit, 16-bit, 8-bit, 8-bit, and
+ 48-bit values respectively) as an argument named 'fields', or a string
+ of 16 bytes (with all the integer fields in big-endian order) as an
+ argument named 'bytes', or a string of 16 bytes (with the first three
+ fields in little-endian order) as an argument named 'bytes_le', or a
+ single 128-bit integer as an argument named 'int'.
+
+ UUIDs have these read-only attributes:
+
+ bytes the UUID as a 16-byte string (containing the six
+ integer fields in big-endian byte order)
+
+ bytes_le the UUID as a 16-byte string (with time_low, time_mid,
+ and time_hi_version in little-endian byte order)
+
+ fields a tuple of the six integer fields of the UUID,
+ which are also available as six individual attributes
+ and two derived attributes:
+
+ time_low the first 32 bits of the UUID
+ time_mid the next 16 bits of the UUID
+ time_hi_version the next 16 bits of the UUID
+ clock_seq_hi_variant the next 8 bits of the UUID
+ clock_seq_low the next 8 bits of the UUID
+ node the last 48 bits of the UUID
+
+ time the 60-bit timestamp
+ clock_seq the 14-bit sequence number
+
+ hex the UUID as a 32-character hexadecimal string
+
+ int the UUID as a 128-bit integer
+
+ urn the UUID as a URN as specified in RFC 4122
+
+ variant the UUID variant (one of the constants RESERVED_NCS,
+ RFC_4122, RESERVED_MICROSOFT, or RESERVED_FUTURE)
+
+ version the UUID version number (1 through 5, meaningful only
+ when the variant is RFC_4122)
+ """
+
+ def __init__(self, hex=None, bytes=None, bytes_le=None, fields=None,
+ int=None, version=None):
+ r"""Create a UUID from either a string of 32 hexadecimal digits,
+ a string of 16 bytes as the 'bytes' argument, a string of 16 bytes
+ in little-endian order as the 'bytes_le' argument, a tuple of six
+ integers (32-bit time_low, 16-bit time_mid, 16-bit time_hi_version,
+ 8-bit clock_seq_hi_variant, 8-bit clock_seq_low, 48-bit node) as
+ the 'fields' argument, or a single 128-bit integer as the 'int'
+ argument. When a string of hex digits is given, curly braces,
+ hyphens, and a URN prefix are all optional. For example, these
+ expressions all yield the same UUID:
+
+ UUID('{12345678-1234-5678-1234-567812345678}')
+ UUID('12345678123456781234567812345678')
+ UUID('urn:uuid:12345678-1234-5678-1234-567812345678')
+ UUID(bytes='\x12\x34\x56\x78'*4)
+ UUID(bytes_le='\x78\x56\x34\x12\x34\x12\x78\x56' +
+ '\x12\x34\x56\x78\x12\x34\x56\x78')
+ UUID(fields=(0x12345678, 0x1234, 0x5678, 0x12, 0x34, 0x567812345678))
+ UUID(int=0x12345678123456781234567812345678)
+
+ Exactly one of 'hex', 'bytes', 'bytes_le', 'fields', or 'int' must
+ be given. The 'version' argument is optional; if given, the resulting
+ UUID will have its variant and version set according to RFC 4122,
+ overriding the given 'hex', 'bytes', 'bytes_le', 'fields', or 'int'.
+ """
+
+ if [hex, bytes, bytes_le, fields, int].count(None) != 4:
+ raise TypeError('need one of hex, bytes, bytes_le, fields, or int')
+ if hex is not None:
+ hex = hex.replace('urn:', '').replace('uuid:', '')
+ hex = hex.strip('{}').replace('-', '')
+ if len(hex) != 32:
+ raise ValueError('badly formed hexadecimal UUID string')
+ int = long(hex, 16)
+ if bytes_le is not None:
+ if len(bytes_le) != 16:
+ raise ValueError('bytes_le is not a 16-char string')
+ bytes = (bytes_le[3] + bytes_le[2] + bytes_le[1] + bytes_le[0] +
+ bytes_le[5] + bytes_le[4] + bytes_le[7] + bytes_le[6] +
+ bytes_le[8:])
+ if bytes is not None:
+ if len(bytes) != 16:
+ raise ValueError('bytes is not a 16-char string')
+ int = long(('%02x'*16) % tuple(map(ord, bytes)), 16)
+ if fields is not None:
+ if len(fields) != 6:
+ raise ValueError('fields is not a 6-tuple')
+ (time_low, time_mid, time_hi_version,
+ clock_seq_hi_variant, clock_seq_low, node) = fields
+ if not 0 <= time_low < 1<<32L:
+ raise ValueError('field 1 out of range (need a 32-bit value)')
+ if not 0 <= time_mid < 1<<16L:
+ raise ValueError('field 2 out of range (need a 16-bit value)')
+ if not 0 <= time_hi_version < 1<<16L:
+ raise ValueError('field 3 out of range (need a 16-bit value)')
+ if not 0 <= clock_seq_hi_variant < 1<<8L:
+ raise ValueError('field 4 out of range (need an 8-bit value)')
+ if not 0 <= clock_seq_low < 1<<8L:
+ raise ValueError('field 5 out of range (need an 8-bit value)')
+ if not 0 <= node < 1<<48L:
+ raise ValueError('field 6 out of range (need a 48-bit value)')
+ clock_seq = (clock_seq_hi_variant << 8L) | clock_seq_low
+ int = ((time_low << 96L) | (time_mid << 80L) |
+ (time_hi_version << 64L) | (clock_seq << 48L) | node)
+ if int is not None:
+ if not 0 <= int < 1<<128L:
+ raise ValueError('int is out of range (need a 128-bit value)')
+ if version is not None:
+ if not 1 <= version <= 5:
+ raise ValueError('illegal version number')
+ # Set the variant to RFC 4122.
+ int &= ~(0xc000 << 48L)
+ int |= 0x8000 << 48L
+ # Set the version number.
+ int &= ~(0xf000 << 64L)
+ int |= version << 76L
+ self.__dict__['int'] = int
+
+ def __cmp__(self, other):
+ if isinstance(other, UUID):
+ return cmp(self.int, other.int)
+ return NotImplemented
+
+ def __hash__(self):
+ return hash(self.int)
+
+ def __int__(self):
+ return self.int
+
+ def __repr__(self):
+ return 'UUID(%r)' % str(self)
+
+ def __setattr__(self, name, value):
+ raise TypeError('UUID objects are immutable')
+
+ def __str__(self):
+ hex = '%032x' % self.int
+ return '%s-%s-%s-%s-%s' % (
+ hex[:8], hex[8:12], hex[12:16], hex[16:20], hex[20:])
+
+ def get_bytes(self):
+ bytes = ''
+ for shift in range(0, 128, 8):
+ bytes = chr((self.int >> shift) & 0xff) + bytes
+ return bytes
+
+ bytes = property(get_bytes)
+
+ def get_bytes_le(self):
+ bytes = self.bytes
+ return (bytes[3] + bytes[2] + bytes[1] + bytes[0] +
+ bytes[5] + bytes[4] + bytes[7] + bytes[6] + bytes[8:])
+
+ bytes_le = property(get_bytes_le)
+
+ def get_fields(self):
+ return (self.time_low, self.time_mid, self.time_hi_version,
+ self.clock_seq_hi_variant, self.clock_seq_low, self.node)
+
+ fields = property(get_fields)
+
+ def get_time_low(self):
+ return self.int >> 96L
+
+ time_low = property(get_time_low)
+
+ def get_time_mid(self):
+ return (self.int >> 80L) & 0xffff
+
+ time_mid = property(get_time_mid)
+
+ def get_time_hi_version(self):
+ return (self.int >> 64L) & 0xffff
+
+ time_hi_version = property(get_time_hi_version)
+
+ def get_clock_seq_hi_variant(self):
+ return (self.int >> 56L) & 0xff
+
+ clock_seq_hi_variant = property(get_clock_seq_hi_variant)
+
+ def get_clock_seq_low(self):
+ return (self.int >> 48L) & 0xff
+
+ clock_seq_low = property(get_clock_seq_low)
+
+ def get_time(self):
+ return (((self.time_hi_version & 0x0fffL) << 48L) |
+ (self.time_mid << 32L) | self.time_low)
+
+ time = property(get_time)
+
+ def get_clock_seq(self):
+ return (((self.clock_seq_hi_variant & 0x3fL) << 8L) |
+ self.clock_seq_low)
+
+ clock_seq = property(get_clock_seq)
+
+ def get_node(self):
+ return self.int & 0xffffffffffff
+
+ node = property(get_node)
+
+ def get_hex(self):
+ return '%032x' % self.int
+
+ hex = property(get_hex)
+
+ def get_urn(self):
+ return 'urn:uuid:' + str(self)
+
+ urn = property(get_urn)
+
+ def get_variant(self):
+ if not self.int & (0x8000 << 48L):
+ return RESERVED_NCS
+ elif not self.int & (0x4000 << 48L):
+ return RFC_4122
+ elif not self.int & (0x2000 << 48L):
+ return RESERVED_MICROSOFT
+ else:
+ return RESERVED_FUTURE
+
+ variant = property(get_variant)
+
+ def get_version(self):
+ # The version bits are only meaningful for RFC 4122 UUIDs.
+ if self.variant == RFC_4122:
+ return int((self.int >> 76L) & 0xf)
+
+ version = property(get_version)
+
+def _find_mac(command, args, hw_identifiers, get_index):
+ import os
+ for dir in ['', '/sbin/', '/usr/sbin']:
+ executable = os.path.join(dir, command)
+ if not os.path.exists(executable):
+ continue
+
+ try:
+ # LC_ALL to get English output, 2>/dev/null to
+ # prevent output on stderr
+ cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
+ pipe = os.popen(cmd)
+ except IOError:
+ continue
+
+ for line in pipe:
+ words = line.lower().split()
+ for i in range(len(words)):
+ if words[i] in hw_identifiers:
+ return int(words[get_index(i)].replace(':', ''), 16)
+ return None
+
+def _ifconfig_getnode():
+ """Get the hardware address on Unix by running ifconfig."""
+
+ # This works on Linux ('' or '-a'), Tru64 ('-av'), but not all Unixes.
+ for args in ('', '-a', '-av'):
+ mac = _find_mac('ifconfig', args, ['hwaddr', 'ether'], lambda i: i+1)
+ if mac:
+ return mac
+
+ import socket
+ ip_addr = socket.gethostbyname(socket.gethostname())
+
+ # Try getting the MAC addr from arp based on our IP address (Solaris).
+ mac = _find_mac('arp', '-an', [ip_addr], lambda i: -1)
+ if mac:
+ return mac
+
+ # This might work on HP-UX.
+ mac = _find_mac('lanscan', '-ai', ['lan0'], lambda i: 0)
+ if mac:
+ return mac
+
+ return None
+
+def _ipconfig_getnode():
+ """Get the hardware address on Windows by running ipconfig.exe."""
+ import os, re
+ dirs = ['', r'c:\windows\system32', r'c:\winnt\system32']
+ try:
+ import ctypes
+ buffer = ctypes.create_string_buffer(300)
+ ctypes.windll.kernel32.GetSystemDirectoryA(buffer, 300)
+ dirs.insert(0, buffer.value.decode('mbcs'))
+ except:
+ pass
+ for dir in dirs:
+ try:
+ pipe = os.popen(os.path.join(dir, 'ipconfig') + ' /all')
+ except IOError:
+ continue
+ for line in pipe:
+ value = line.split(':')[-1].strip().lower()
+ if re.match('([0-9a-f][0-9a-f]-){5}[0-9a-f][0-9a-f]', value):
+ return int(value.replace('-', ''), 16)
+
+def _netbios_getnode():
+ """Get the hardware address on Windows using NetBIOS calls.
+ See http://support.microsoft.com/kb/118623 for details."""
+ import win32wnet, netbios
+ ncb = netbios.NCB()
+ ncb.Command = netbios.NCBENUM
+ ncb.Buffer = adapters = netbios.LANA_ENUM()
+ adapters._pack()
+ if win32wnet.Netbios(ncb) != 0:
+ return
+ adapters._unpack()
+ for i in range(adapters.length):
+ ncb.Reset()
+ ncb.Command = netbios.NCBRESET
+ ncb.Lana_num = ord(adapters.lana[i])
+ if win32wnet.Netbios(ncb) != 0:
+ continue
+ ncb.Reset()
+ ncb.Command = netbios.NCBASTAT
+ ncb.Lana_num = ord(adapters.lana[i])
+ ncb.Callname = '*'.ljust(16)
+ ncb.Buffer = status = netbios.ADAPTER_STATUS()
+ if win32wnet.Netbios(ncb) != 0:
+ continue
+ status._unpack()
+ bytes = map(ord, status.adapter_address)
+ return ((bytes[0]<<40L) + (bytes[1]<<32L) + (bytes[2]<<24L) +
+ (bytes[3]<<16L) + (bytes[4]<<8L) + bytes[5])
+
+# Thanks to Thomas Heller for ctypes and for his help with its use here.
+
+# If ctypes is available, use it to find system routines for UUID generation.
+_uuid_generate_random = _uuid_generate_time = _UuidCreate = None
+try:
+ import ctypes, ctypes.util
+ _buffer = ctypes.create_string_buffer(16)
+
+ # The uuid_generate_* routines are provided by libuuid on at least
+ # Linux and FreeBSD, and provided by libc on Mac OS X.
+ for libname in ['uuid', 'c']:
+ try:
+ lib = ctypes.CDLL(ctypes.util.find_library(libname))
+ except:
+ continue
+ if hasattr(lib, 'uuid_generate_random'):
+ _uuid_generate_random = lib.uuid_generate_random
+ if hasattr(lib, 'uuid_generate_time'):
+ _uuid_generate_time = lib.uuid_generate_time
+
+ # On Windows prior to 2000, UuidCreate gives a UUID containing the
+ # hardware address. On Windows 2000 and later, UuidCreate makes a
+ # random UUID and UuidCreateSequential gives a UUID containing the
+ # hardware address. These routines are provided by the RPC runtime.
+ # NOTE: at least on Tim's WinXP Pro SP2 desktop box, while the last
+ # 6 bytes returned by UuidCreateSequential are fixed, they don't appear
+ # to bear any relationship to the MAC address of any network device
+ # on the box.
+ try:
+ lib = ctypes.windll.rpcrt4
+ except:
+ lib = None
+ _UuidCreate = getattr(lib, 'UuidCreateSequential',
+ getattr(lib, 'UuidCreate', None))
+except:
+ pass
+
+def _unixdll_getnode():
+ """Get the hardware address on Unix using ctypes."""
+ _uuid_generate_time(_buffer)
+ return UUID(bytes=_buffer.raw).node
+
+def _windll_getnode():
+ """Get the hardware address on Windows using ctypes."""
+ if _UuidCreate(_buffer) == 0:
+ return UUID(bytes=_buffer.raw).node
+
+def _random_getnode():
+ """Get a random node ID, with eighth bit set as suggested by RFC 4122."""
+ import random
+ return random.randrange(0, 1<<48L) | 0x010000000000L
+
+_node = None
+
+def getnode():
+ """Get the hardware address as a 48-bit positive integer.
+
+ The first time this runs, it may launch a separate program, which could
+ be quite slow. If all attempts to obtain the hardware address fail, we
+ choose a random 48-bit number with its eighth bit set to 1 as recommended
+ in RFC 4122.
+ """
+
+ global _node
+ if _node is not None:
+ return _node
+
+ import sys
+ if sys.platform == 'win32':
+ getters = [_windll_getnode, _netbios_getnode, _ipconfig_getnode]
+ else:
+ getters = [_unixdll_getnode, _ifconfig_getnode]
+
+ for getter in getters + [_random_getnode]:
+ try:
+ _node = getter()
+ except:
+ continue
+ if _node is not None:
+ return _node
+
+_last_timestamp = None
+
+def uuid1(node=None, clock_seq=None):
+ """Generate a UUID from a host ID, sequence number, and the current time.
+ If 'node' is not given, getnode() is used to obtain the hardware
+ address. If 'clock_seq' is given, it is used as the sequence number;
+ otherwise a random 14-bit sequence number is chosen."""
+
+ # When the system provides a version-1 UUID generator, use it (but don't
+ # use UuidCreate here because its UUIDs don't conform to RFC 4122).
+ if _uuid_generate_time and node is clock_seq is None:
+ _uuid_generate_time(_buffer)
+ return UUID(bytes=_buffer.raw)
+
+ global _last_timestamp
+ import time
+ nanoseconds = int(time.time() * 1e9)
+ # 0x01b21dd213814000 is the number of 100-ns intervals between the
+ # UUID epoch 1582-10-15 00:00:00 and the Unix epoch 1970-01-01 00:00:00.
+ timestamp = int(nanoseconds/100) + 0x01b21dd213814000L
+ if timestamp <= _last_timestamp:
+ timestamp = _last_timestamp + 1
+ _last_timestamp = timestamp
+ if clock_seq is None:
+ import random
+ clock_seq = random.randrange(1<<14L) # instead of stable storage
+ time_low = timestamp & 0xffffffffL
+ time_mid = (timestamp >> 32L) & 0xffffL
+ time_hi_version = (timestamp >> 48L) & 0x0fffL
+ clock_seq_low = clock_seq & 0xffL
+ clock_seq_hi_variant = (clock_seq >> 8L) & 0x3fL
+ if node is None:
+ node = getnode()
+ return UUID(fields=(time_low, time_mid, time_hi_version,
+ clock_seq_hi_variant, clock_seq_low, node), version=1)
+
+def uuid3(namespace, name):
+ """Generate a UUID from the MD5 hash of a namespace UUID and a name."""
+ import md5
+ hash = md5.md5(namespace.bytes + name).digest()
+ return UUID(bytes=hash[:16], version=3)
+
+def uuid4():
+ """Generate a random UUID."""
+
+ # When the system provides a version-4 UUID generator, use it.
+ if _uuid_generate_random:
+ _uuid_generate_random(_buffer)
+ return UUID(bytes=_buffer.raw)
+
+ # Otherwise, get randomness from urandom or the 'random' module.
+ try:
+ import os
+ return UUID(bytes=os.urandom(16), version=4)
+ except:
+ import random
+ bytes = [chr(random.randrange(256)) for i in range(16)]
+ return UUID(bytes=bytes, version=4)
+
+def uuid5(namespace, name):
+ """Generate a UUID from the SHA-1 hash of a namespace UUID and a name."""
+ import sha
+ hash = sha.sha(namespace.bytes + name).digest()
+ return UUID(bytes=hash[:16], version=5)
+
+# The following standard UUIDs are for use with uuid3() or uuid5().
+
+NAMESPACE_DNS = UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')
+NAMESPACE_URL = UUID('6ba7b811-9dad-11d1-80b4-00c04fd430c8')
+NAMESPACE_OID = UUID('6ba7b812-9dad-11d1-80b4-00c04fd430c8')
+NAMESPACE_X500 = UUID('6ba7b814-9dad-11d1-80b4-00c04fd430c8')
diff --git a/ipalib/plugins/f_host.py b/ipalib/plugins/f_host.py
index 3fcda77c8..ea819a77d 100644
--- a/ipalib/plugins/f_host.py
+++ b/ipalib/plugins/f_host.py
@@ -21,7 +21,7 @@
Frontend plugins for host/machine Identity.
"""
-from ipalib import api, crud, errors
+from ipalib import api, crud, errors, util
from ipalib import Object # Plugin base class
from ipalib import Str, Flag # Parameter types
@@ -42,7 +42,7 @@ def get_host(hostname):
dn = ldap.find_entry_dn("serverhostname", hostname, "ipaHost")
return dn
-def validate_host(cn):
+def validate_host(ugettext, cn):
"""
Require at least one dot in the hostname (to support localhost.localdomain)
"""
@@ -129,7 +129,7 @@ class host_add(crud.Add):
# some required objectclasses
# FIXME: add this attribute to cn=ipaconfig
#kw['objectclass'] = config.get('ipahostobjectclasses')
- kw['objectclass'] = ['nsHost', 'ipaHost']
+ kw['objectclass'] = ['nsHost', 'ipaHost', 'pkiUser']
# Ensure the list of objectclasses is lower-case
kw['objectclass'] = map(lambda z: z.lower(), kw.get('objectclass'))
diff --git a/ipalib/plugins/f_hostgroup.py b/ipalib/plugins/f_hostgroup.py
index c365c918e..706712c9a 100644
--- a/ipalib/plugins/f_hostgroup.py
+++ b/ipalib/plugins/f_hostgroup.py
@@ -281,7 +281,7 @@ class hostgroup_add_member(Command):
for a in result:
print "\t'%s'" % a
else:
- textui.print_entry("Group membership updated.")
+ textui.print_plain("Group membership updated.")
api.register(hostgroup_add_member)
diff --git a/ipalib/plugins/f_netgroup.py b/ipalib/plugins/f_netgroup.py
new file mode 100644
index 000000000..6ee55b0db
--- /dev/null
+++ b/ipalib/plugins/f_netgroup.py
@@ -0,0 +1,461 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Frontend plugin for netgroups.
+"""
+
+from ipalib import api, crud, errors
+from ipalib import Object, Command # Plugin base classes
+from ipalib import Str # Parameter types
+from ipalib import uuid
+
+netgroup_base = "cn=ng, cn=alt"
+netgroup_filter = "ipaNISNetgroup"
+hostgroup_filter = "groupofnames)(!(objectclass=posixGroup)"
+
+def get_members(members):
+ """
+ Return a list of members.
+
+ It is possible that the value passed in is None.
+ """
+ if members:
+ members = members.split(',')
+ else:
+ members = []
+
+ return members
+
+def find_members(ldap, failed, members, attribute, filter=None):
+ """
+ Return 2 lists: one a list of DNs found, one a list of errors
+ """
+ found = []
+ for m in members:
+ if not m: continue
+ try:
+ member_dn = ldap.find_entry_dn(attribute, m, filter)
+ found.append(member_dn)
+ except errors.NotFound:
+ failed.append(m)
+ continue
+
+ return found, failed
+
+def add_members(ldap, completed, members, dn, memberattr):
+ add_failed = []
+ for member_dn in members:
+ try:
+ ldap.add_member_to_group(member_dn, dn, memberattr)
+ completed+=1
+ except:
+ add_failed.append(member_dn)
+
+ return completed, add_failed
+
+def add_external(ldap, completed, members, cn):
+ failed = []
+ netgroup = api.Command['netgroup_show'](cn)
+ external = netgroup.get('externalhost', [])
+ if not isinstance(external, list):
+ external = [external]
+ external_len = len(external)
+ for m in members:
+ if not m in external:
+ external.append(m)
+ completed+=1
+ else:
+ failed.append(m)
+ if len(external) > external_len:
+ kw = {'externalhost': external}
+ ldap.update(netgroup['dn'], **kw)
+
+ return completed, failed
+
+def remove_members(ldap, completed, members, dn, memberattr):
+ remove_failed = []
+ for member_dn in members:
+ try:
+ ldap.remove_member_from_group(member_dn, dn, memberattr)
+ completed+=1
+ except:
+ remove_failed.append(member_dn)
+
+ return completed, remove_failed
+
+def remove_external(ldap, completed, members, cn):
+ failed = []
+ netgroup = api.Command['netgroup_show'](cn)
+ external = netgroup.get('externalhost', [])
+ if not isinstance(external, list):
+ external = [external]
+ external_len = len(external)
+ for m in members:
+ try:
+ external.remove(m)
+ completed+=1
+ except ValueError:
+ failed.append(m)
+ if len(external) < external_len:
+ kw = {'externalhost': external}
+ ldap.update(netgroup['dn'], **kw)
+
+ return completed, failed
+
+class netgroup(Object):
+ """
+ netgroups object.
+ """
+ takes_params = (
+ Str('cn',
+ cli_name='name',
+ primary_key=True
+ ),
+ Str('description',
+ doc='Description',
+ ),
+ Str('nisdomainname?',
+ cli_name='domainname',
+ doc='Domain name',
+ ),
+ )
+api.register(netgroup)
+
+
+class netgroup_add(crud.Add):
+ 'Add a new netgroup.'
+
+ def execute(self, cn, **kw):
+ """
+ Execute the netgroup-add operation.
+
+ The dn should not be passed as a keyword argument as it is constructed
+ by this method.
+
+ Returns the entry as it will be created in LDAP.
+
+ :param cn: The name of the netgroup
+ :param kw: Keyword arguments for the other LDAP attributes.
+ """
+ self.log.info("IPA: netgroup-add '%s'" % cn)
+ assert 'cn' not in kw
+ assert 'dn' not in kw
+ ldap = self.api.Backend.ldap
+ kw['cn'] = cn
+# kw['dn'] = ldap.make_netgroup_dn()
+ kw['ipauniqueid'] = str(uuid.uuid1())
+ kw['dn'] = "ipauniqueid=%s,%s,%s" % (kw['ipauniqueid'], netgroup_base, api.env.basedn)
+
+ if not kw.get('nisdomainname', False):
+ kw['nisdomainname'] = api.env.domain
+
+ # some required objectclasses
+ kw['objectClass'] = ['top', 'ipaAssociation', 'ipaNISNetgroup']
+
+ return ldap.create(**kw)
+
+ def output_for_cli(self, textui, result, *args, **options):
+ """
+ Output result of this command to command line interface.
+ """
+ textui.print_name(self.name)
+ textui.print_entry(result)
+ textui.print_dashed('Added netgroup "%s"' % result.get('cn'))
+
+api.register(netgroup_add)
+
+
+class netgroup_del(crud.Del):
+ 'Delete an existing netgroup.'
+
+ def execute(self, cn, **kw):
+ """Delete a netgroup.
+
+ cn is the cn of the netgroup to delete
+
+ The memberOf plugin handles removing the netgroup from any other
+ groups.
+
+ :param cn: The name of the netgroup being removed.
+ :param kw: Not used.
+ """
+ self.log.info("IPA: netgroup-del '%s'" % cn)
+
+ ldap = self.api.Backend.ldap
+ dn = ldap.find_entry_dn("cn", cn, netgroup_filter, netgroup_base)
+ return ldap.delete(dn)
+
+ def output_for_cli(self, textui, result, cn):
+ """
+ Output result of this command to command line interface.
+ """
+ textui.print_plain('Deleted net group "%s"' % cn)
+
+api.register(netgroup_del)
+
+
+class netgroup_mod(crud.Mod):
+ 'Edit an existing netgroup.'
+ def execute(self, cn, **kw):
+ """
+ Execute the netgroup-mod operation.
+
+ The dn should not be passed as a keyword argument as it is constructed
+ by this method.
+
+ Returns the entry
+
+ :param cn: The name of the netgroup to retrieve.
+ :param kw: Keyword arguments for the other LDAP attributes.
+ """
+ self.log.info("IPA: netgroup-mod '%s'" % cn)
+ assert 'cn' not in kw
+ assert 'dn' not in kw
+ ldap = self.api.Backend.ldap
+ dn = ldap.find_entry_dn("cn", cn, netgroup_filter, netgroup_base)
+ return ldap.update(dn, **kw)
+
+ def output_for_cli(self, textui, result, cn, **options):
+ """
+ Output result of this command to command line interface.
+ """
+ textui.print_name(self.name)
+ textui.print_entry(result)
+ textui.print_dashed('Updated netgroup "%s"' % result['cn'])
+
+api.register(netgroup_mod)
+
+
+class netgroup_find(crud.Find):
+ 'Search the netgroups.'
+ def execute(self, term, **kw):
+ ldap = self.api.Backend.ldap
+
+ search_fields = ['ipauniqueid','description','nisdomainname','cn']
+
+ search_kw = {}
+ for s in search_fields:
+ search_kw[s] = term
+
+ search_kw['objectclass'] = netgroup_filter
+ search_kw['base'] = netgroup_base
+ return ldap.search(**search_kw)
+
+ def output_for_cli(self, textui, result, *args, **options):
+ counter = result[0]
+ groups = result[1:]
+ if counter == 0 or len(groups) == 0:
+ textui.print_plain("No entries found")
+ return
+ if len(groups) == 1:
+ textui.print_entry(groups[0])
+ return
+ textui.print_name(self.name)
+ for g in groups:
+ textui.print_entry(g)
+ textui.print_plain('')
+ if counter == -1:
+ textui.print_plain('These results are truncated.')
+ textui.print_plain('Please refine your search and try again.')
+ textui.print_count(groups, '%d netgroups matched')
+
+api.register(netgroup_find)
+
+
+class netgroup_show(crud.Get):
+ 'Examine an existing netgroup.'
+ def execute(self, cn, **kw):
+ """
+ Execute the netgroup-show operation.
+
+ The dn should not be passed as a keyword argument as it is constructed
+ by this method.
+
+ Returns the entry
+
+ :param cn: The name of the netgroup to retrieve.
+ :param kw: Unused
+ """
+ ldap = self.api.Backend.ldap
+ dn = ldap.find_entry_dn("cn", cn, netgroup_filter, netgroup_base)
+ return ldap.retrieve(dn)
+
+ def output_for_cli(self, textui, result, *args, **options):
+ textui.print_entry(result)
+
+api.register(netgroup_show)
+
+class netgroup_add_member(Command):
+ 'Add a member to a group.'
+ takes_args = (
+ Str('cn',
+ cli_name='name',
+ primary_key=True
+ ),
+ )
+ takes_options = (
+ Str('hosts?', doc='comma-separated list of hosts to add'),
+ Str('hostgroups?', doc='comma-separated list of host groups to add'),
+ Str('users?', doc='comma-separated list of users to add'),
+ Str('groups?', doc='comma-separated list of groups to add'),
+ )
+
+ def execute(self, cn, **kw):
+ """
+ Execute the netgroup-add-member operation.
+
+ Returns the updated group entry
+
+ :param cn: The netgroup name to add new members to.
+ :param kw: hosts is a comma-separated list of hosts to add
+ :param kw: hostgroups is a comma-separated list of host groups to add
+ :param kw: users is a comma-separated list of users to add
+ :param kw: groups is a comma-separated list of host to add
+ """
+ ldap = self.api.Backend.ldap
+ dn = ldap.find_entry_dn("cn", cn, netgroup_filter, netgroup_base)
+ add_failed = []
+ to_add = []
+ completed = 0
+
+ # Hosts
+ members = get_members(kw.get('hosts', ''))
+ (to_add, add_failed) = find_members(ldap, add_failed, members, "cn", "ipaHost")
+
+ # If a host is not found we'll consider it an externalHost. It will
+ # be up to the user to handle typos
+ if add_failed:
+ (completed, failed) = add_external(ldap, completed, add_failed, cn)
+ add_failed = failed
+
+ (completed, failed) = add_members(ldap, completed, to_add, dn, 'memberhost')
+ add_failed+=failed
+
+ # Host groups
+ members = get_members(kw.get('hostgroups', ''))
+ (to_add, add_failed) = find_members(ldap, add_failed, members, "cn", hostgroup_filter)
+ (completed, failed) = add_members(ldap, completed, to_add, dn, 'memberhost')
+ add_failed+=failed
+
+ # User
+ members = get_members(kw.get('users', ''))
+ (to_add, add_failed) = find_members(ldap, add_failed, members, "uid")
+ (completed, failed) = add_members(ldap, completed, to_add, dn, 'memberuser')
+ add_failed+=failed
+
+ # Groups
+ members = get_members(kw.get('groups', ''))
+ (to_add, add_failed) = find_members(ldap, add_failed, members, "cn", "posixGroup")
+ (completed, failed) = add_members(ldap, completed, to_add, dn, 'memberuser')
+ add_failed+=failed
+
+ return add_failed
+
+ def output_for_cli(self, textui, result, *args, **options):
+ """
+ Output result of this command to command line interface.
+ """
+ if result:
+ textui.print_plain("These entries failed to add to the group:")
+ for a in result:
+ print "\t'%s'" % a
+ else:
+ textui.print_plain("netgroup membership updated.")
+
+api.register(netgroup_add_member)
+
+
+class netgroup_remove_member(Command):
+ 'Remove a member from a group.'
+ takes_args = (
+ Str('cn',
+ cli_name='name',
+ primary_key=True
+ ),
+ )
+ takes_options = (
+ Str('hosts?', doc='comma-separated list of hosts to remove'),
+ Str('hostgroups?', doc='comma-separated list of groups to remove'),
+ Str('users?', doc='comma-separated list of users to remove'),
+ Str('groups?', doc='comma-separated list of groups to remove'),
+ )
+ def execute(self, cn, **kw):
+ """
+ Execute the group-remove-member operation.
+
+ Returns the members that could not be added
+
+ :param cn: The group name to add new members to.
+ :param kw: hosts is a comma-separated list of hosts to remove
+ :param kw: hostgroups is a comma-separated list of host groups to remove
+ :param kw: users is a comma-separated list of users to remove
+ :param kw: groups is a comma-separated list of host to remove
+ """
+ ldap = self.api.Backend.ldap
+ dn = ldap.find_entry_dn("cn", cn, netgroup_filter, netgroup_base)
+ remove_failed = []
+ to_remove = []
+ completed = 0
+
+ # Hosts
+ members = get_members(kw.get('hosts', ''))
+ (to_remove, remove_failed) = find_members(ldap, remove_failed, members, "cn", "ipaHost")
+
+ # If a host is not found we'll consider it an externalHost. It will
+ # be up to the user to handle typos
+ if remove_failed:
+ (completed, failed) = remove_external(ldap, completed, remove_failed, cn)
+ remove_failed = failed
+
+ (completed, failed) = remove_members(ldap, completed, to_remove, dn, 'memberhost')
+ remove_failed+=failed
+
+ # Host groups
+ members = get_members(kw.get('hostgroups', ''))
+ (to_remove, remove_failed) = find_members(ldap, remove_failed, members, "cn", hostgroup_filter)
+ (completed, failed) = remove_members(ldap, completed, to_remove, dn, 'memberhost')
+ remove_failed+=failed
+
+ # User
+ members = get_members(kw.get('users', ''))
+ (to_remove, remove_failed) = find_members(ldap, remove_failed, members, "uid")
+ (completed, failed) = remove_members(ldap, completed, to_remove, dn, 'memberuser')
+ remove_failed+=failed
+
+ # Groups
+ members = get_members(kw.get('groups', ''))
+ (to_remove, remove_failed) = find_members(ldap, remove_failed, members, "cn", "posixGroup")
+ (completed, failed) = remove_members(ldap, completed, to_remove, dn, 'memberuser')
+ remove_failed+=failed
+
+ return remove_failed
+
+ def output_for_cli(self, textui, result, *args, **options):
+ """
+ Output result of this command to command line interface.
+ """
+ if result:
+ textui.print_plain("These entries failed to be removed from the group:")
+ for a in result:
+ print "\t'%s'" % a
+ else:
+ textui.print_plain("netgroup membership updated.")
+
+api.register(netgroup_remove_member)
diff --git a/ipaserver/ipaldap.py b/ipaserver/ipaldap.py
index 19fd40efd..4a2e4e31c 100644
--- a/ipaserver/ipaldap.py
+++ b/ipaserver/ipaldap.py
@@ -111,6 +111,13 @@ class Entry:
setValues = setValue
+ def delAttr(self, name):
+ """
+ Entirely remove an attribute of this entry.
+ """
+ if self.hasAttr(name):
+ del self.data[name]
+
def toTupleList(self):
"""Convert the attrs and values to a list of 2-tuples. The first element
of the tuple is the attribute name. The second element is either a
@@ -375,7 +382,7 @@ class IPAdmin(SimpleLDAPObject):
except ldap.ALREADY_EXISTS, e:
raise errors.DuplicateEntry, "Entry already exists"
except ldap.LDAPError, e:
- raise DatabaseError, e
+ raise errors.DatabaseError, e
return True
def updateRDN(self, dn, newrdn):
@@ -392,7 +399,7 @@ class IPAdmin(SimpleLDAPObject):
self.set_option(ldap.OPT_SERVER_CONTROLS, sctrl)
self.modrdn_s(dn, newrdn, delold=1)
except ldap.LDAPError, e:
- raise DatabaseError, e
+ raise errors.DatabaseError, e
return True
def updateEntry(self,dn,oldentry,newentry):
@@ -474,7 +481,7 @@ class IPAdmin(SimpleLDAPObject):
self.set_option(ldap.OPT_SERVER_CONTROLS, sctrl)
self.modify_s(dn, modlist)
except ldap.LDAPError, e:
- raise DatabaseError, e
+ raise errors.DatabaseError, e
return True
def deleteEntry(self,*args):
diff --git a/ipaserver/plugins/b_ldap.py b/ipaserver/plugins/b_ldap.py
index 2d6ad6258..9e06ce51b 100644
--- a/ipaserver/plugins/b_ldap.py
+++ b/ipaserver/plugins/b_ldap.py
@@ -190,23 +190,23 @@ class ldap(CrudBackend):
def modify_password(self, dn, **kw):
return servercore.modify_password(dn, kw.get('oldpass'), kw.get('newpass'))
- def add_member_to_group(self, memberdn, groupdn):
+ def add_member_to_group(self, memberdn, groupdn, memberattr='member'):
"""
Add a new member to a group.
:param memberdn: the DN of the member to add
:param groupdn: the DN of the group to add a member to
"""
- return servercore.add_member_to_group(memberdn, groupdn)
+ return servercore.add_member_to_group(memberdn, groupdn, memberattr)
- def remove_member_from_group(self, memberdn, groupdn):
+ def remove_member_from_group(self, memberdn, groupdn, memberattr='member'):
"""
Remove a new member from a group.
:param memberdn: the DN of the member to remove
:param groupdn: the DN of the group to remove a member from
"""
- return servercore.remove_member_from_group(memberdn, groupdn)
+ return servercore.remove_member_from_group(memberdn, groupdn, memberattr)
# The CRUD operations
@@ -227,6 +227,7 @@ class ldap(CrudBackend):
else:
assert type(value) in (str, unicode, bool, int, float)
yield (key, value)
+ yield (key, value)
def create(self, **kw):
if servercore.entry_exists(kw['dn']):
@@ -251,13 +252,18 @@ class ldap(CrudBackend):
def update(self, dn, **kw):
result = self.retrieve(dn, ["*"])
+ start_keys = kw.keys()
entry = ipaldap.Entry((dn, servercore.convert_scalar_values(result)))
kw = dict(self.strip_none(kw))
for k in kw:
entry.setValues(k, kw[k])
- servercore.update_entry(entry.toDict())
+ remove_keys = list(set(start_keys) - set(kw.keys()))
+ for k in remove_keys:
+ entry.delAttr(k)
+
+ servercore.update_entry(entry.toDict(), remove_keys)
return self.retrieve(dn)
diff --git a/ipaserver/servercore.py b/ipaserver/servercore.py
index 6991989e5..362013401 100644
--- a/ipaserver/servercore.py
+++ b/ipaserver/servercore.py
@@ -227,16 +227,19 @@ def uid_too_long(uid):
return False
-def update_entry (entry):
+def update_entry (entry, remove_keys=[]):
"""Update an LDAP entry
entry is a dict
+ remove_keys is a list of attributes to remove from this entry
This refreshes the record from LDAP in order to obtain the list of
- attributes that has changed.
+ attributes that has changed. It only retrieves the attributes that
+ are in the update so attributes aren't inadvertantly lost.
"""
+ assert type(remove_keys) is list
attrs = entry.keys()
- o = get_base_entry(entry['dn'], "objectclass=*", attrs)
+ o = get_base_entry(entry['dn'], "objectclass=*", attrs + remove_keys)
oldentry = convert_scalar_values(o)
newentry = convert_scalar_values(entry)
@@ -395,7 +398,7 @@ def mark_entry_inactive (dn):
return res
-def add_member_to_group(member_dn, group_dn):
+def add_member_to_group(member_dn, group_dn, memberattr='member'):
"""
Add a member to an existing group.
"""
@@ -414,18 +417,18 @@ def add_member_to_group(member_dn, group_dn):
raise errors.NotFound
# Add the new member to the group member attribute
- members = group.get('member', [])
+ members = group.get(memberattr, [])
if isinstance(members, basestring):
members = [members]
members.append(member_dn)
- group['member'] = members
+ group[memberattr] = members
try:
return update_entry(group)
except errors.EmptyModlist:
raise
-def remove_member_from_group(member_dn, group_dn=None):
+def remove_member_from_group(member_dn, group_dn, memberattr='member'):
"""Remove a member_dn from an existing group."""
group = get_entry_by_dn(group_dn, None)
@@ -439,7 +442,7 @@ def remove_member_from_group(member_dn, group_dn=None):
"""
api.log.info("IPA: remove_member_from_group '%s' from '%s'" % (member_dn, group_dn))
- members = group.get('member', False)
+ members = group.get(memberattr, False)
if not members:
raise errors.NotGroupMember
@@ -456,7 +459,7 @@ def remove_member_from_group(member_dn, group_dn=None):
except Exception, e:
raise e
- group['member'] = members
+ group[memberattr] = members
try:
return update_entry(group)
diff --git a/ipaserver/updates/host.update b/ipaserver/updates/host.update
index dfc9723cf..f5ecda5ac 100644
--- a/ipaserver/updates/host.update
+++ b/ipaserver/updates/host.update
@@ -18,5 +18,8 @@ add: objectClasses:
AUXILIARY
MAY ( userPassword $ ipaClientVersion $ enrolledBy)
X-ORIGIN 'IPA v2' )
-
-
+add: objectClasses:
+ ( 2.5.6.21 NAME 'pkiUser'
+ SUP top AUXILIARY
+ MAY ( userCertificate )
+ X-ORIGIN 'RFC 2587' )
diff --git a/tests/test_xmlrpc/test_group_plugin.py b/tests/test_xmlrpc/test_group_plugin.py
index 7ac88f294..2b16cc8a5 100644
--- a/tests/test_xmlrpc/test_group_plugin.py
+++ b/tests/test_xmlrpc/test_group_plugin.py
@@ -52,6 +52,15 @@ class test_Group(XMLRPC_test):
def test_add2(self):
"""
+ Test the `xmlrpc.group_add` method duplicate detection.
+ """
+ try:
+ res = api.Command['group_add'](**self.kw)
+ except errors.DuplicateEntry:
+ pass
+
+ def test_add2(self):
+ """
Test the `xmlrpc.group_add` method.
"""
self.kw['cn'] = self.cn2
@@ -69,6 +78,16 @@ class test_Group(XMLRPC_test):
res = api.Command['group_add_member'](self.cn, **kw)
assert res == []
+ def test_add_member2(self):
+ """
+ Test the `xmlrpc.group_add_member` with a non-existent member
+ """
+ kw={}
+ kw['groups'] = "notfound"
+ res = api.Command['group_add_member'](self.cn, **kw)
+ # an error isn't thrown, the list of failed members is returned
+ assert res != []
+
def test_doshow(self):
"""
Test the `xmlrpc.group_show` method.
@@ -118,6 +137,16 @@ class test_Group(XMLRPC_test):
assert res
assert res.get('member','') == ''
+ def test_remove_member2(self):
+ """
+ Test the `xmlrpc.group_remove_member` method with non-member
+ """
+ kw={}
+ kw['groups'] = "notfound"
+ # an error isn't thrown, the list of failed members is returned
+ res = api.Command['group_remove_member'](self.cn, **kw)
+ assert res != []
+
def test_remove_x(self):
"""
Test the `xmlrpc.group_del` method.
diff --git a/tests/test_xmlrpc/test_netgroup_plugin.py b/tests/test_xmlrpc/test_netgroup_plugin.py
new file mode 100644
index 000000000..3d3e4afff
--- /dev/null
+++ b/tests/test_xmlrpc/test_netgroup_plugin.py
@@ -0,0 +1,320 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib/plugins/f_netgroup` module.
+"""
+
+import sys
+from xmlrpc_test import XMLRPC_test
+from ipalib import api
+from ipalib import errors
+from ipalib.cli import CLI
+
+try:
+ api.finalize()
+except StandardError:
+ pass
+
+def is_member_of(members, candidate):
+ if not isinstance(members, list):
+ members = [members]
+ for m in members:
+ if m.startswith(candidate):
+ return True
+ return False
+
+class test_Netgroup(XMLRPC_test):
+ """
+ Test the `f_netgroup` plugin.
+ """
+ ng_cn='ng1'
+ ng_description='Netgroup'
+ ng_kw={'cn': ng_cn, 'description': ng_description}
+
+ host_cn='ipaexample.%s' % api.env.domain
+ host_description='Test host'
+ host_localityname='Undisclosed location'
+ host_kw={'cn': host_cn, 'description': host_description, 'localityname': host_localityname}
+
+ hg_cn='ng1'
+ hg_description='Netgroup'
+ hg_kw={'cn': hg_cn, 'description': hg_description}
+
+ user_uid='jexample'
+ user_givenname='Jim'
+ user_sn='Example'
+ user_home='/home/%s' % user_uid
+ user_kw={'givenname':user_givenname,'sn':user_sn,'uid':user_uid,'homedirectory':user_home}
+
+ group_cn='testgroup'
+ group_description='This is a test'
+ group_kw={'description':group_description,'cn':group_cn}
+
+ def test_add(self):
+ """
+ Test the `xmlrpc.netgroup_add` method.
+ """
+ res = api.Command['netgroup_add'](**self.ng_kw)
+ assert res
+ assert res.get('description','') == self.ng_description
+ assert res.get('cn','') == self.ng_cn
+
+ def test_adddata(self):
+ """
+ Add the data needed to do additional testing.
+ """
+
+ # Add a host
+ res = api.Command['host_add'](**self.host_kw)
+ assert res
+ assert res.get('description','') == self.host_description
+ assert res.get('cn','') == self.host_cn
+
+ # Add a hostgroup
+ res = api.Command['hostgroup_add'](**self.hg_kw)
+ assert res
+ assert res.get('description','') == self.hg_description
+ assert res.get('cn','') == self.hg_cn
+
+ # Add a user
+ res = api.Command['user_add'](**self.user_kw)
+ assert res
+ assert res.get('givenname','') == self.user_givenname
+ assert res.get('uid','') == self.user_uid
+
+ # Add a group
+ res = api.Command['group_add'](**self.group_kw)
+ assert res
+ assert res.get('description','') == self.group_description
+ assert res.get('cn','') == self.group_cn
+
+ def test_addmembers(self):
+ """
+ Test the `xmlrpc.netgroup_add_member` method.
+ """
+ kw={}
+ kw['hosts'] = self.host_cn
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert res == []
+
+ kw={}
+ kw['hostgroups'] = self.hg_cn
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert res == []
+
+ kw={}
+ kw['users'] = self.user_uid
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert res == []
+
+ kw={}
+ kw['groups'] = self.group_cn
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert res == []
+
+ def test_addmembers2(self):
+ """
+ Test the `xmlrpc.netgroup_add_member` method again to test dupes.
+ """
+ kw={}
+ kw['hosts'] = self.host_cn
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'cn=%s' % self.host_cn)
+
+ kw={}
+ kw['hostgroups'] = self.hg_cn
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'cn=%s' % self.hg_cn)
+
+ kw={}
+ kw['users'] = self.user_uid
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'uid=%s' % self.user_uid)
+
+ kw={}
+ kw['groups'] = self.group_cn
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'cn=%s' % self.group_cn)
+
+ def test_addexternalmembers(self):
+ """
+ Test adding external hosts
+ """
+ kw={}
+ kw['hosts'] = "nosuchhost"
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert res == []
+ res = api.Command['netgroup_show'](self.ng_cn)
+ assert res
+ assert is_member_of(res.get('externalhost',[]), kw['hosts'])
+
+ def test_doshow(self):
+ """
+ Test the `xmlrpc.netgroup_show` method.
+ """
+ res = api.Command['netgroup_show'](self.ng_cn)
+ assert res
+ assert res.get('description','') == self.ng_description
+ assert res.get('cn','') == self.ng_cn
+ assert is_member_of(res.get('memberhost',[]), 'cn=%s' % self.host_cn)
+ assert is_member_of(res.get('memberhost',[]), 'cn=%s' % self.hg_cn)
+ assert is_member_of(res.get('memberuser',[]), 'uid=%s' % self.user_uid)
+ assert is_member_of(res.get('memberuser',[]), 'cn=%s' % self.group_cn)
+
+ def test_find(self):
+ """
+ Test the `xmlrpc.hostgroup_find` method.
+ """
+ res = api.Command['netgroup_find'](self.ng_cn)
+ assert res
+ assert len(res) == 2
+ assert res[1].get('description','') == self.ng_description
+ assert res[1].get('cn','') == self.ng_cn
+
+ def test_mod(self):
+ """
+ Test the `xmlrpc.hostgroup_mod` method.
+ """
+ newdesc='Updated host group'
+ modkw={'cn': self.ng_cn, 'description': newdesc}
+ res = api.Command['netgroup_mod'](**modkw)
+ assert res
+ assert res.get('description','') == newdesc
+
+ # Ok, double-check that it was changed
+ res = api.Command['netgroup_show'](self.ng_cn)
+ assert res
+ assert res.get('description','') == newdesc
+ assert res.get('cn','') == self.ng_cn
+
+ def test_member_remove(self):
+ """
+ Test the `xmlrpc.hostgroup_remove_member` method.
+ """
+ kw={}
+ kw['hosts'] = self.host_cn
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert res == []
+
+ kw={}
+ kw['hostgroups'] = self.hg_cn
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert res == []
+
+ kw={}
+ kw['users'] = self.user_uid
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert res == []
+
+ kw={}
+ kw['groups'] = self.group_cn
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert res == []
+
+ def test_member_remove2(self):
+ """
+ Test the `xmlrpc.netgroup_remove_member` method again to test not found.
+ """
+ kw={}
+ kw['hosts'] = self.host_cn
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'cn=%s' % self.host_cn)
+
+ kw={}
+ kw['hostgroups'] = self.hg_cn
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'cn=%s' % self.hg_cn)
+
+ kw={}
+ kw['users'] = self.user_uid
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'uid=%s' % self.user_uid)
+
+ kw={}
+ kw['groups'] = self.group_cn
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'cn=%s' % self.group_cn)
+
+ def test_remove(self):
+ """
+ Test the `xmlrpc.netgroup_del` method.
+ """
+ res = api.Command['netgroup_del'](self.ng_cn)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['netgroup_show'](self.ng_cn)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
+ def test_removedata(self):
+ """
+ Remove the test data we added
+ """
+ # Remove the host
+ res = api.Command['host_del'](self.host_cn)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['host_show'](self.host_cn)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
+ # Remove the hostgroup
+ res = api.Command['hostgroup_del'](self.hg_cn)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['hostgroup_show'](self.hg_cn)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
+ # Remove the user
+ res = api.Command['user_del'](self.user_uid)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['user_show'](self.user_uid)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
+ # Remove the group
+ res = api.Command['group_del'](self.group_cn)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['group_show'](self.group_cn)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
diff --git a/tests/test_xmlrpc/test_user_plugin.py b/tests/test_xmlrpc/test_user_plugin.py
index 6353d0b46..0189aa5ac 100644
--- a/tests/test_xmlrpc/test_user_plugin.py
+++ b/tests/test_xmlrpc/test_user_plugin.py
@@ -54,6 +54,15 @@ class test_User(XMLRPC_test):
assert res.get('uid','') == self.uid
assert res.get('homedirectory','') == self.home
+ def test_add2(self):
+ """
+ Test the `xmlrpc.user_add` method duplicate detection.
+ """
+ try:
+ res = api.Command['user_add'](**self.kw)
+ except errors.DuplicateEntry:
+ pass
+
def test_doshow(self):
"""
Test the `xmlrpc.user_show` method.
diff --git a/tests/test_xmlrpc/xmlrpc_test.py b/tests/test_xmlrpc/xmlrpc_test.py
index 28ca7f6d1..744c0c277 100644
--- a/tests/test_xmlrpc/xmlrpc_test.py
+++ b/tests/test_xmlrpc/xmlrpc_test.py
@@ -41,7 +41,6 @@ class XMLRPC_test:
def setUp(self):
# FIXME: changing Plugin.name from a property to an instance attribute
# somehow broke this.
- raise nose.SkipTest()
try:
res = api.Command['user_show']('notfound')
except socket.error: