diff options
author | Martin Kosek <mkosek@redhat.com> | 2013-01-18 17:28:39 +0100 |
---|---|---|
committer | Martin Kosek <mkosek@redhat.com> | 2013-02-14 08:38:11 +0100 |
commit | e60e80e2b6710e581e417d9e7e05cea21ba9f6b0 (patch) | |
tree | 828e6b7d2142ca73a59bd747ed3db7db55511883 /ipaserver | |
parent | 4c4418fb9e9c2cf4fff8dec59b6d8fcdb05ea706 (diff) | |
download | freeipa-e60e80e2b6710e581e417d9e7e05cea21ba9f6b0.tar.gz freeipa-e60e80e2b6710e581e417d9e7e05cea21ba9f6b0.tar.xz freeipa-e60e80e2b6710e581e417d9e7e05cea21ba9f6b0.zip |
Generalize AD GC search
Modify access methods to AD GC so that callers can specify a custom
basedn, filter, scope and attribute list, thus allowing it to perform
any LDAP search.
Error checking methodology in these functions was changed, so that it
rather raises an exception with a desription instead of simply returning
a None or False value which would made an investigation why something
does not work much more difficult. External membership method in
group-add-member command was updated to match this approach.
https://fedorahosted.org/freeipa/ticket/2997
Diffstat (limited to 'ipaserver')
-rw-r--r-- | ipaserver/dcerpc.py | 145 |
1 files changed, 95 insertions, 50 deletions
diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py index 384044437..56d8b0319 100644 --- a/ipaserver/dcerpc.py +++ b/ipaserver/dcerpc.py @@ -164,16 +164,18 @@ class DomainValidator(object): except errors.NotFound, e: return [] - def is_trusted_sid_valid(self, sid): + def get_domain_by_sid(self, sid): if not self.domain: # our domain is not configured or self.is_configured() never run # reject SIDs as we can't check correctness of them - return False + raise errors.ValidationError(name='sid', + error=_('domain is not configured')) # Parse sid string to see if it is really in a SID format try: test_sid = security.dom_sid(sid) except TypeError, e: - return False + raise errors.ValidationError(name='sid', + error=_('SID is not valid')) # At this point we have SID_NT_AUTHORITY family SID and really need to # check it against prefixes of domain SIDs we trust to if not self._domains: @@ -181,7 +183,8 @@ class DomainValidator(object): if len(self._domains) == 0: # Our domain is configured but no trusted domains are configured # This means we can't check the correctness of a trusted domain SIDs - return False + raise errors.ValidationError(name='sid', + error=_('no trusted domain is configured')) # We have non-zero list of trusted domains and have to go through them # one by one and check their sids as prefixes test_sid_subauths = test_sid.sub_auths @@ -190,44 +193,87 @@ class DomainValidator(object): sub_auths = domsid.sub_auths num_auths = min(test_sid.num_auths, domsid.num_auths) if test_sid_subauths[:num_auths] == sub_auths[:num_auths]: - return True - return False + return domain + raise errors.NotFound(reason=_('SID does not match any trusted domain')) + + def is_trusted_sid_valid(self, sid): + try: + self.get_domain_by_sid(sid) + except (errors.ValidationError, errors.NotFound): + return False + else: + return True - def get_sid_trusted_domain_object(self, object_name): + def get_trusted_domain_objects(self, domain=None, flatname=None, filter="", + attrs=None, scope=_ldap.SCOPE_SUBTREE, basedn=None): + """ + Search for LDAP objects in a trusted domain specified either by `domain' + or `flatname'. The actual LDAP search is specified by `filter', `attrs', + `scope' and `basedn'. When `basedn' is empty, database root DN is used. + """ + assert domain is not None or flatname is not None """Returns SID for the trusted domain object (user or group only)""" if not self.domain: # our domain is not configured or self.is_configured() never run - return None + raise errors.ValidationError(name=_('Trust setup'), + error=_('Our domain is not configured')) if not self._domains: self._domains = self.get_trusted_domains() if len(self._domains) == 0: # Our domain is configured but no trusted domains are configured - return None - - components = normalize_name(object_name) - if not ('domain' in components or 'flatname' in components): - # No domain or realm specified, ambiguous search - return False - - entry = None - if 'domain' in components and components['domain'] in self._domains: + raise errors.ValidationError(name=_('Trust setup'), + error=_('No trusted domain is not configured')) + + entries = None + if domain is not None: + if domain not in self._domains: + raise errors.ValidationError(name=_('trusted domain object'), + error= _('domain is not trusted')) # Now we have a name to check against our list of trusted domains - entry = self.resolve_against_gc(components['domain'], components['name']) - elif 'flatname' in components: + entries = self.search_in_gc(domain, filter, attrs, scope, basedn) + elif flatname is not None: # Flatname was specified, traverse through the list of trusted # domains first to find the proper one + found_flatname = False for domain in self._domains: - if self._domains[domain][0] == components['flatname']: - entry = self.resolve_against_gc(domain, components['name']) - if entry: + if self._domains[domain][0] == flatname: + found_flatname = True + entries = self.search_in_gc(domain, filter, attrs, scope, basedn) + if entries: break - if entry: - try: - test_sid = security.dom_sid(entry) - return unicode(test_sid) - except TypeError, e: - return False - return False + if not found_flatname: + raise errors.ValidationError(name=_('trusted domain object'), + error= _('no trusted domain matched the specified flat name')) + if not entries: + raise errors.NotFound(reason=_('trusted domain object not found')) + + return entries + + def get_trusted_domain_object_sid(self, object_name): + components = normalize_name(object_name) + if not ('domain' in components or 'flatname' in components): + # No domain or realm specified, ambiguous search + raise errors.ValidationError(name=_('trusted domain object'), + error= _('Ambiguous search, user domain was not specified')) + + attrs = ['objectSid'] + filter = '(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' \ + % dict(name=components['name']) + scope = _ldap.SCOPE_SUBTREE + entries = self.get_trusted_domain_objects(components.get('domain'), + components.get('flatname'), filter, attrs, scope) + + if len(entries) > 1: + # Treat non-unique entries as invalid + raise errors.ValidationError(name=_('trusted domain object'), + error= _('Trusted domain did not return a unique object')) + sid = self.__sid_to_str(entries[0][1]['objectSid'][0]) + try: + test_sid = security.dom_sid(sid) + return unicode(test_sid) + except TypeError, e: + raise errors.ValidationError(name=_('trusted domain object'), + error= _('Trusted domain did not return a valid SID for the object')) def __sid_to_str(self, sid): """ @@ -280,36 +326,33 @@ class DomainValidator(object): dict(domain=info['dns_domain'],message=stderr.strip())) return (None, None) - def resolve_against_gc(self, domain, name): + def search_in_gc(self, domain, filter, attrs, scope, basedn=None): """ - Resolves `name' against trusted domain `domain' using Global Catalog - Returns SID of the `name' or None + Perform LDAP search in a trusted domain `domain' Global Catalog. + Returns resulting entries or None """ - entry = None + entries = None sid = None info = self.__retrieve_trusted_domain_gc_list(domain) if not info: - return None + raise errors.ValidationError(name=_('Trust setup'), + error=_('Cannot retrieve trusted domain GC list')) for (host, port) in info['gc']: - entry = self.__resolve_against_gc(info, host, port, name) - if entry: + entries = self.__search_in_gc(info, host, port, filter, attrs, scope, basedn) + if entries: break - if entry: - l = len(entry) - if l > 2: - # Treat non-unique entries as invalid - return None - sid = self.__sid_to_str(entry[0][1]['objectSid'][0]) - return sid + return entries - def __resolve_against_gc(self, info, host, port, name): + def __search_in_gc(self, info, host, port, filter, attrs, scope, basedn=None): """ - Actual resolution against LDAP server, using SASL GSSAPI authentication + Actual search in AD LDAP server, using SASL GSSAPI authentication Returns LDAP result or None """ conn = IPAdmin(host=host, port=port) auth = self.__extract_trusted_auth(info) + if attrs is None: + attrs = [] if auth: (ccache_name, principal) = self.__kinit_as_trusted_account(info, auth) if ccache_name: @@ -322,13 +365,15 @@ class DomainValidator(object): # records pointing back to the same host name conn.set_option(_ldap.OPT_X_SASL_NOCANON, _ldap.OPT_ON) conn.sasl_interactive_bind_s(None, sasl_auth) - base = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.'))) + if basedn is None: + # Use domain root base DN + basedn = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.'))) # We don't use conn.getEntry() because it will attempt to fetch schema from GC and that will fail - filterstr = conn.encode('(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' % dict(name=name)) - attrlist = conn.encode(['sAMAccountName', 'sAMAccountType', 'objectSid', 'groupType', 'description']) - entry = conn.conn.search_s(str(base), _ldap.SCOPE_SUBTREE, filterstr, attrlist, 0) + filterstr = conn.encode(filter) + attrlist = conn.encode(attrs) + entries = conn.conn.search_s(str(basedn), scope, filterstr, attrlist, 0) os.environ["KRB5CCNAME"] = old_ccache - return entry + return entries def __retrieve_trusted_domain_gc_list(self, domain): """ |