summaryrefslogtreecommitdiffstats
path: root/ipaserver/dcerpc.py
diff options
context:
space:
mode:
Diffstat (limited to 'ipaserver/dcerpc.py')
-rw-r--r--ipaserver/dcerpc.py145
1 files changed, 95 insertions, 50 deletions
diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py
index 384044437..56d8b0319 100644
--- a/ipaserver/dcerpc.py
+++ b/ipaserver/dcerpc.py
@@ -164,16 +164,18 @@ class DomainValidator(object):
except errors.NotFound, e:
return []
- def is_trusted_sid_valid(self, sid):
+ def get_domain_by_sid(self, sid):
if not self.domain:
# our domain is not configured or self.is_configured() never run
# reject SIDs as we can't check correctness of them
- return False
+ raise errors.ValidationError(name='sid',
+ error=_('domain is not configured'))
# Parse sid string to see if it is really in a SID format
try:
test_sid = security.dom_sid(sid)
except TypeError, e:
- return False
+ raise errors.ValidationError(name='sid',
+ error=_('SID is not valid'))
# At this point we have SID_NT_AUTHORITY family SID and really need to
# check it against prefixes of domain SIDs we trust to
if not self._domains:
@@ -181,7 +183,8 @@ class DomainValidator(object):
if len(self._domains) == 0:
# Our domain is configured but no trusted domains are configured
# This means we can't check the correctness of a trusted domain SIDs
- return False
+ raise errors.ValidationError(name='sid',
+ error=_('no trusted domain is configured'))
# We have non-zero list of trusted domains and have to go through them
# one by one and check their sids as prefixes
test_sid_subauths = test_sid.sub_auths
@@ -190,44 +193,87 @@ class DomainValidator(object):
sub_auths = domsid.sub_auths
num_auths = min(test_sid.num_auths, domsid.num_auths)
if test_sid_subauths[:num_auths] == sub_auths[:num_auths]:
- return True
- return False
+ return domain
+ raise errors.NotFound(reason=_('SID does not match any trusted domain'))
+
+ def is_trusted_sid_valid(self, sid):
+ try:
+ self.get_domain_by_sid(sid)
+ except (errors.ValidationError, errors.NotFound):
+ return False
+ else:
+ return True
- def get_sid_trusted_domain_object(self, object_name):
+ def get_trusted_domain_objects(self, domain=None, flatname=None, filter="",
+ attrs=None, scope=_ldap.SCOPE_SUBTREE, basedn=None):
+ """
+ Search for LDAP objects in a trusted domain specified either by `domain'
+ or `flatname'. The actual LDAP search is specified by `filter', `attrs',
+ `scope' and `basedn'. When `basedn' is empty, database root DN is used.
+ """
+ assert domain is not None or flatname is not None
"""Returns SID for the trusted domain object (user or group only)"""
if not self.domain:
# our domain is not configured or self.is_configured() never run
- return None
+ raise errors.ValidationError(name=_('Trust setup'),
+ error=_('Our domain is not configured'))
if not self._domains:
self._domains = self.get_trusted_domains()
if len(self._domains) == 0:
# Our domain is configured but no trusted domains are configured
- return None
-
- components = normalize_name(object_name)
- if not ('domain' in components or 'flatname' in components):
- # No domain or realm specified, ambiguous search
- return False
-
- entry = None
- if 'domain' in components and components['domain'] in self._domains:
+ raise errors.ValidationError(name=_('Trust setup'),
+ error=_('No trusted domain is not configured'))
+
+ entries = None
+ if domain is not None:
+ if domain not in self._domains:
+ raise errors.ValidationError(name=_('trusted domain object'),
+ error= _('domain is not trusted'))
# Now we have a name to check against our list of trusted domains
- entry = self.resolve_against_gc(components['domain'], components['name'])
- elif 'flatname' in components:
+ entries = self.search_in_gc(domain, filter, attrs, scope, basedn)
+ elif flatname is not None:
# Flatname was specified, traverse through the list of trusted
# domains first to find the proper one
+ found_flatname = False
for domain in self._domains:
- if self._domains[domain][0] == components['flatname']:
- entry = self.resolve_against_gc(domain, components['name'])
- if entry:
+ if self._domains[domain][0] == flatname:
+ found_flatname = True
+ entries = self.search_in_gc(domain, filter, attrs, scope, basedn)
+ if entries:
break
- if entry:
- try:
- test_sid = security.dom_sid(entry)
- return unicode(test_sid)
- except TypeError, e:
- return False
- return False
+ if not found_flatname:
+ raise errors.ValidationError(name=_('trusted domain object'),
+ error= _('no trusted domain matched the specified flat name'))
+ if not entries:
+ raise errors.NotFound(reason=_('trusted domain object not found'))
+
+ return entries
+
+ def get_trusted_domain_object_sid(self, object_name):
+ components = normalize_name(object_name)
+ if not ('domain' in components or 'flatname' in components):
+ # No domain or realm specified, ambiguous search
+ raise errors.ValidationError(name=_('trusted domain object'),
+ error= _('Ambiguous search, user domain was not specified'))
+
+ attrs = ['objectSid']
+ filter = '(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' \
+ % dict(name=components['name'])
+ scope = _ldap.SCOPE_SUBTREE
+ entries = self.get_trusted_domain_objects(components.get('domain'),
+ components.get('flatname'), filter, attrs, scope)
+
+ if len(entries) > 1:
+ # Treat non-unique entries as invalid
+ raise errors.ValidationError(name=_('trusted domain object'),
+ error= _('Trusted domain did not return a unique object'))
+ sid = self.__sid_to_str(entries[0][1]['objectSid'][0])
+ try:
+ test_sid = security.dom_sid(sid)
+ return unicode(test_sid)
+ except TypeError, e:
+ raise errors.ValidationError(name=_('trusted domain object'),
+ error= _('Trusted domain did not return a valid SID for the object'))
def __sid_to_str(self, sid):
"""
@@ -280,36 +326,33 @@ class DomainValidator(object):
dict(domain=info['dns_domain'],message=stderr.strip()))
return (None, None)
- def resolve_against_gc(self, domain, name):
+ def search_in_gc(self, domain, filter, attrs, scope, basedn=None):
"""
- Resolves `name' against trusted domain `domain' using Global Catalog
- Returns SID of the `name' or None
+ Perform LDAP search in a trusted domain `domain' Global Catalog.
+ Returns resulting entries or None
"""
- entry = None
+ entries = None
sid = None
info = self.__retrieve_trusted_domain_gc_list(domain)
if not info:
- return None
+ raise errors.ValidationError(name=_('Trust setup'),
+ error=_('Cannot retrieve trusted domain GC list'))
for (host, port) in info['gc']:
- entry = self.__resolve_against_gc(info, host, port, name)
- if entry:
+ entries = self.__search_in_gc(info, host, port, filter, attrs, scope, basedn)
+ if entries:
break
- if entry:
- l = len(entry)
- if l > 2:
- # Treat non-unique entries as invalid
- return None
- sid = self.__sid_to_str(entry[0][1]['objectSid'][0])
- return sid
+ return entries
- def __resolve_against_gc(self, info, host, port, name):
+ def __search_in_gc(self, info, host, port, filter, attrs, scope, basedn=None):
"""
- Actual resolution against LDAP server, using SASL GSSAPI authentication
+ Actual search in AD LDAP server, using SASL GSSAPI authentication
Returns LDAP result or None
"""
conn = IPAdmin(host=host, port=port)
auth = self.__extract_trusted_auth(info)
+ if attrs is None:
+ attrs = []
if auth:
(ccache_name, principal) = self.__kinit_as_trusted_account(info, auth)
if ccache_name:
@@ -322,13 +365,15 @@ class DomainValidator(object):
# records pointing back to the same host name
conn.set_option(_ldap.OPT_X_SASL_NOCANON, _ldap.OPT_ON)
conn.sasl_interactive_bind_s(None, sasl_auth)
- base = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.')))
+ if basedn is None:
+ # Use domain root base DN
+ basedn = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.')))
# We don't use conn.getEntry() because it will attempt to fetch schema from GC and that will fail
- filterstr = conn.encode('(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' % dict(name=name))
- attrlist = conn.encode(['sAMAccountName', 'sAMAccountType', 'objectSid', 'groupType', 'description'])
- entry = conn.conn.search_s(str(base), _ldap.SCOPE_SUBTREE, filterstr, attrlist, 0)
+ filterstr = conn.encode(filter)
+ attrlist = conn.encode(attrs)
+ entries = conn.conn.search_s(str(basedn), scope, filterstr, attrlist, 0)
os.environ["KRB5CCNAME"] = old_ccache
- return entry
+ return entries
def __retrieve_trusted_domain_gc_list(self, domain):
"""