summaryrefslogtreecommitdiffstats
path: root/ipaserver
diff options
context:
space:
mode:
authorMartin Kosek <mkosek@redhat.com>2013-01-18 17:28:39 +0100
committerMartin Kosek <mkosek@redhat.com>2013-02-14 08:38:11 +0100
commite60e80e2b6710e581e417d9e7e05cea21ba9f6b0 (patch)
tree828e6b7d2142ca73a59bd747ed3db7db55511883 /ipaserver
parent4c4418fb9e9c2cf4fff8dec59b6d8fcdb05ea706 (diff)
downloadfreeipa-e60e80e2b6710e581e417d9e7e05cea21ba9f6b0.tar.gz
freeipa-e60e80e2b6710e581e417d9e7e05cea21ba9f6b0.tar.xz
freeipa-e60e80e2b6710e581e417d9e7e05cea21ba9f6b0.zip
Generalize AD GC search
Modify access methods to AD GC so that callers can specify a custom basedn, filter, scope and attribute list, thus allowing it to perform any LDAP search. Error checking methodology in these functions was changed, so that it rather raises an exception with a desription instead of simply returning a None or False value which would made an investigation why something does not work much more difficult. External membership method in group-add-member command was updated to match this approach. https://fedorahosted.org/freeipa/ticket/2997
Diffstat (limited to 'ipaserver')
-rw-r--r--ipaserver/dcerpc.py145
1 files changed, 95 insertions, 50 deletions
diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py
index 384044437..56d8b0319 100644
--- a/ipaserver/dcerpc.py
+++ b/ipaserver/dcerpc.py
@@ -164,16 +164,18 @@ class DomainValidator(object):
except errors.NotFound, e:
return []
- def is_trusted_sid_valid(self, sid):
+ def get_domain_by_sid(self, sid):
if not self.domain:
# our domain is not configured or self.is_configured() never run
# reject SIDs as we can't check correctness of them
- return False
+ raise errors.ValidationError(name='sid',
+ error=_('domain is not configured'))
# Parse sid string to see if it is really in a SID format
try:
test_sid = security.dom_sid(sid)
except TypeError, e:
- return False
+ raise errors.ValidationError(name='sid',
+ error=_('SID is not valid'))
# At this point we have SID_NT_AUTHORITY family SID and really need to
# check it against prefixes of domain SIDs we trust to
if not self._domains:
@@ -181,7 +183,8 @@ class DomainValidator(object):
if len(self._domains) == 0:
# Our domain is configured but no trusted domains are configured
# This means we can't check the correctness of a trusted domain SIDs
- return False
+ raise errors.ValidationError(name='sid',
+ error=_('no trusted domain is configured'))
# We have non-zero list of trusted domains and have to go through them
# one by one and check their sids as prefixes
test_sid_subauths = test_sid.sub_auths
@@ -190,44 +193,87 @@ class DomainValidator(object):
sub_auths = domsid.sub_auths
num_auths = min(test_sid.num_auths, domsid.num_auths)
if test_sid_subauths[:num_auths] == sub_auths[:num_auths]:
- return True
- return False
+ return domain
+ raise errors.NotFound(reason=_('SID does not match any trusted domain'))
+
+ def is_trusted_sid_valid(self, sid):
+ try:
+ self.get_domain_by_sid(sid)
+ except (errors.ValidationError, errors.NotFound):
+ return False
+ else:
+ return True
- def get_sid_trusted_domain_object(self, object_name):
+ def get_trusted_domain_objects(self, domain=None, flatname=None, filter="",
+ attrs=None, scope=_ldap.SCOPE_SUBTREE, basedn=None):
+ """
+ Search for LDAP objects in a trusted domain specified either by `domain'
+ or `flatname'. The actual LDAP search is specified by `filter', `attrs',
+ `scope' and `basedn'. When `basedn' is empty, database root DN is used.
+ """
+ assert domain is not None or flatname is not None
"""Returns SID for the trusted domain object (user or group only)"""
if not self.domain:
# our domain is not configured or self.is_configured() never run
- return None
+ raise errors.ValidationError(name=_('Trust setup'),
+ error=_('Our domain is not configured'))
if not self._domains:
self._domains = self.get_trusted_domains()
if len(self._domains) == 0:
# Our domain is configured but no trusted domains are configured
- return None
-
- components = normalize_name(object_name)
- if not ('domain' in components or 'flatname' in components):
- # No domain or realm specified, ambiguous search
- return False
-
- entry = None
- if 'domain' in components and components['domain'] in self._domains:
+ raise errors.ValidationError(name=_('Trust setup'),
+ error=_('No trusted domain is not configured'))
+
+ entries = None
+ if domain is not None:
+ if domain not in self._domains:
+ raise errors.ValidationError(name=_('trusted domain object'),
+ error= _('domain is not trusted'))
# Now we have a name to check against our list of trusted domains
- entry = self.resolve_against_gc(components['domain'], components['name'])
- elif 'flatname' in components:
+ entries = self.search_in_gc(domain, filter, attrs, scope, basedn)
+ elif flatname is not None:
# Flatname was specified, traverse through the list of trusted
# domains first to find the proper one
+ found_flatname = False
for domain in self._domains:
- if self._domains[domain][0] == components['flatname']:
- entry = self.resolve_against_gc(domain, components['name'])
- if entry:
+ if self._domains[domain][0] == flatname:
+ found_flatname = True
+ entries = self.search_in_gc(domain, filter, attrs, scope, basedn)
+ if entries:
break
- if entry:
- try:
- test_sid = security.dom_sid(entry)
- return unicode(test_sid)
- except TypeError, e:
- return False
- return False
+ if not found_flatname:
+ raise errors.ValidationError(name=_('trusted domain object'),
+ error= _('no trusted domain matched the specified flat name'))
+ if not entries:
+ raise errors.NotFound(reason=_('trusted domain object not found'))
+
+ return entries
+
+ def get_trusted_domain_object_sid(self, object_name):
+ components = normalize_name(object_name)
+ if not ('domain' in components or 'flatname' in components):
+ # No domain or realm specified, ambiguous search
+ raise errors.ValidationError(name=_('trusted domain object'),
+ error= _('Ambiguous search, user domain was not specified'))
+
+ attrs = ['objectSid']
+ filter = '(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' \
+ % dict(name=components['name'])
+ scope = _ldap.SCOPE_SUBTREE
+ entries = self.get_trusted_domain_objects(components.get('domain'),
+ components.get('flatname'), filter, attrs, scope)
+
+ if len(entries) > 1:
+ # Treat non-unique entries as invalid
+ raise errors.ValidationError(name=_('trusted domain object'),
+ error= _('Trusted domain did not return a unique object'))
+ sid = self.__sid_to_str(entries[0][1]['objectSid'][0])
+ try:
+ test_sid = security.dom_sid(sid)
+ return unicode(test_sid)
+ except TypeError, e:
+ raise errors.ValidationError(name=_('trusted domain object'),
+ error= _('Trusted domain did not return a valid SID for the object'))
def __sid_to_str(self, sid):
"""
@@ -280,36 +326,33 @@ class DomainValidator(object):
dict(domain=info['dns_domain'],message=stderr.strip()))
return (None, None)
- def resolve_against_gc(self, domain, name):
+ def search_in_gc(self, domain, filter, attrs, scope, basedn=None):
"""
- Resolves `name' against trusted domain `domain' using Global Catalog
- Returns SID of the `name' or None
+ Perform LDAP search in a trusted domain `domain' Global Catalog.
+ Returns resulting entries or None
"""
- entry = None
+ entries = None
sid = None
info = self.__retrieve_trusted_domain_gc_list(domain)
if not info:
- return None
+ raise errors.ValidationError(name=_('Trust setup'),
+ error=_('Cannot retrieve trusted domain GC list'))
for (host, port) in info['gc']:
- entry = self.__resolve_against_gc(info, host, port, name)
- if entry:
+ entries = self.__search_in_gc(info, host, port, filter, attrs, scope, basedn)
+ if entries:
break
- if entry:
- l = len(entry)
- if l > 2:
- # Treat non-unique entries as invalid
- return None
- sid = self.__sid_to_str(entry[0][1]['objectSid'][0])
- return sid
+ return entries
- def __resolve_against_gc(self, info, host, port, name):
+ def __search_in_gc(self, info, host, port, filter, attrs, scope, basedn=None):
"""
- Actual resolution against LDAP server, using SASL GSSAPI authentication
+ Actual search in AD LDAP server, using SASL GSSAPI authentication
Returns LDAP result or None
"""
conn = IPAdmin(host=host, port=port)
auth = self.__extract_trusted_auth(info)
+ if attrs is None:
+ attrs = []
if auth:
(ccache_name, principal) = self.__kinit_as_trusted_account(info, auth)
if ccache_name:
@@ -322,13 +365,15 @@ class DomainValidator(object):
# records pointing back to the same host name
conn.set_option(_ldap.OPT_X_SASL_NOCANON, _ldap.OPT_ON)
conn.sasl_interactive_bind_s(None, sasl_auth)
- base = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.')))
+ if basedn is None:
+ # Use domain root base DN
+ basedn = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.')))
# We don't use conn.getEntry() because it will attempt to fetch schema from GC and that will fail
- filterstr = conn.encode('(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' % dict(name=name))
- attrlist = conn.encode(['sAMAccountName', 'sAMAccountType', 'objectSid', 'groupType', 'description'])
- entry = conn.conn.search_s(str(base), _ldap.SCOPE_SUBTREE, filterstr, attrlist, 0)
+ filterstr = conn.encode(filter)
+ attrlist = conn.encode(attrs)
+ entries = conn.conn.search_s(str(basedn), scope, filterstr, attrlist, 0)
os.environ["KRB5CCNAME"] = old_ccache
- return entry
+ return entries
def __retrieve_trusted_domain_gc_list(self, domain):
"""