summaryrefslogtreecommitdiffstats
path: root/ipaserver/dcerpc.py
diff options
context:
space:
mode:
authorTomas Babej <tbabej@redhat.com>2013-07-17 15:55:36 +0200
committerAlexander Bokovoy <abokovoy@redhat.com>2013-07-23 16:24:33 +0300
commit17c7d46c254de8301a1c328155d245631d5c03e4 (patch)
tree03bf7d5b8c970f6d353c5a082eb992dc969c7c9d /ipaserver/dcerpc.py
parent84b2269589c115cae2d2bcec32fec602143fc42e (diff)
downloadfreeipa-17c7d46c254de8301a1c328155d245631d5c03e4.tar.gz
freeipa-17c7d46c254de8301a1c328155d245631d5c03e4.tar.xz
freeipa-17c7d46c254de8301a1c328155d245631d5c03e4.zip
Use AD LDAP probing to create trusted domain ID range
When creating a trusted domain ID range, probe AD DC to get information about ID space leveraged by POSIX users already defined in AD, and create an ID range with according parameters. For more details: http://www.freeipa.org/page/V3/Use_posix_attributes_defined_in_AD https://fedorahosted.org/freeipa/ticket/3649
Diffstat (limited to 'ipaserver/dcerpc.py')
-rw-r--r--ipaserver/dcerpc.py164
1 files changed, 127 insertions, 37 deletions
diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py
index 0f98ce83c..88ad928eb 100644
--- a/ipaserver/dcerpc.py
+++ b/ipaserver/dcerpc.py
@@ -61,6 +61,7 @@ The code in this module relies heavily on samba4-python package
and Samba4 python bindings.
""")
+
def is_sid_valid(sid):
try:
security.dom_sid(sid)
@@ -69,6 +70,7 @@ def is_sid_valid(sid):
else:
return True
+
access_denied_error = errors.ACIError(info=_('CIFS server denied your credentials'))
dcerpc_error_codes = {
-1073741823:
@@ -113,6 +115,7 @@ class ExtendedDNControl(LDAPControl):
def encodeControlValue(self, value=None):
return '0\x03\x02\x01\x01'
+
class DomainValidator(object):
ATTR_FLATNAME = 'ipantflatname'
ATTR_SID = 'ipantsecurityidentifier'
@@ -184,6 +187,18 @@ class DomainValidator(object):
except errors.NotFound, e:
return []
+ def set_trusted_domains(self):
+ # At this point we have SID_NT_AUTHORITY family SID and really need to
+ # check it against prefixes of domain SIDs we trust to
+ if not self._domains:
+ self._domains = self.get_trusted_domains()
+ if len(self._domains) == 0:
+ # Our domain is configured but no trusted domains are configured
+ # This means we can't check the correctness of a trusted
+ # domain SIDs
+ raise errors.ValidationError(name='sid',
+ error=_('no trusted domain is configured'))
+
def get_domain_by_sid(self, sid, exact_match=False):
if not self.domain:
# our domain is not configured or self.is_configured() never run
@@ -200,14 +215,7 @@ class DomainValidator(object):
# At this point we have SID_NT_AUTHORITY family SID and really need to
# check it against prefixes of domain SIDs we trust to
- if not self._domains:
- self._domains = self.get_trusted_domains()
- if len(self._domains) == 0:
- # Our domain is configured but no trusted domains are configured
- # This means we can't check the correctness of a trusted
- # domain SIDs
- raise errors.ValidationError(name='sid',
- error=_('no trusted domain is configured'))
+ self.set_trusted_domains()
# We have non-zero list of trusted domains and have to go through
# them one by one and check their sids as prefixes / exact match
@@ -284,7 +292,7 @@ class DomainValidator(object):
raise errors.ValidationError(name=_('trusted domain object'),
error= _('domain is not trusted'))
# Now we have a name to check against our list of trusted domains
- entries = self.search_in_gc(domain, filter, attrs, scope, basedn)
+ entries = self.search_in_dc(domain, filter, attrs, scope, basedn)
elif flatname is not None:
# Flatname was specified, traverse through the list of trusted
# domains first to find the proper one
@@ -292,7 +300,7 @@ class DomainValidator(object):
for domain in self._domains:
if self._domains[domain][0] == flatname:
found_flatname = True
- entries = self.search_in_gc(domain, filter, attrs, scope, basedn)
+ entries = self.search_in_dc(domain, filter, attrs, scope, basedn)
if entries:
break
if not found_flatname:
@@ -436,48 +444,126 @@ class DomainValidator(object):
dict(domain=info['dns_domain'],message=stderr.strip()))
return (None, None)
- def search_in_gc(self, domain, filter, attrs, scope, basedn=None):
+ def kinit_as_http(self, domain):
+ """
+ Initializes ccache with http service credentials.
+
+ Applies session code defaults for ccache directory and naming prefix.
+ Session code uses krbccache_prefix+<pid>, we use
+ krbccache_prefix+<TD>+<domain netbios name> so there is no clash.
+
+ Returns tuple (ccache path, principal) where (None, None) signifes an
+ error on ccache initialization
+ """
+
+ domain_suffix = domain.replace('.', '-')
+
+ ccache_name = "%sTD%s" % (krbccache_prefix, domain_suffix)
+ ccache_path = os.path.join(krbccache_dir, ccache_name)
+
+ realm = api.env.realm
+ hostname = api.env.host
+ principal = 'HTTP/%s@%s' % (hostname, realm)
+ keytab = '/etc/httpd/conf/ipa.keytab'
+
+ # Destroy the contents of the ccache
+ root_logger.debug('Destroying the contents of the separate ccache')
+
+ (stdout, stderr, returncode) = ipautil.run(
+ ['/usr/bin/kdestroy', '-A', '-c', ccache_path],
+ env={'KRB5CCNAME': ccache_path},
+ raiseonerr=False)
+
+ # Destroy the contents of the ccache
+ root_logger.debug('Running kinit from ipa.keytab to obtain HTTP '
+ 'service principal with MS-PAC attached.')
+
+ (stdout, stderr, returncode) = ipautil.run(
+ ['/usr/bin/kinit', '-kt', keytab, principal],
+ env={'KRB5CCNAME': ccache_path},
+ raiseonerr=False)
+
+ if returncode == 0:
+ return (ccache_path, principal)
+ else:
+ return (None, None)
+
+ def search_in_dc(self, domain, filter, attrs, scope, basedn=None,
+ use_http=False, quiet=False):
"""
- Perform LDAP search in a trusted domain `domain' Global Catalog.
- Returns resulting entries or None
+ Perform LDAP search in a trusted domain `domain' Domain Controller.
+ Returns resulting entries or None.
+
+ If use_http is set to True, the search is conducted using
+ HTTP service credentials.
"""
+
entries = None
- sid = None
+
info = self.__retrieve_trusted_domain_gc_list(domain)
+
if not info:
- raise errors.ValidationError(name=_('Trust setup'),
+ raise errors.ValidationError(
+ name=_('Trust setup'),
error=_('Cannot retrieve trusted domain GC list'))
+
for (host, port) in info['gc']:
- entries = self.__search_in_gc(info, host, port, filter, attrs, scope, basedn)
+ entries = self.__search_in_dc(info, host, port, filter, attrs,
+ scope, basedn=basedn,
+ use_http=use_http,
+ quiet=quiet)
if entries:
break
return entries
- def __search_in_gc(self, info, host, port, filter, attrs, scope, basedn=None):
+ def __search_in_dc(self, info, host, port, filter, attrs, scope,
+ basedn=None, use_http=False, quiet=False):
"""
Actual search in AD LDAP server, using SASL GSSAPI authentication
- Returns LDAP result or None
+ Returns LDAP result or None.
"""
- conn = IPAdmin(host=host, port=port, no_schema=True, decode_attrs=False)
- auth = self.__extract_trusted_auth(info)
- if attrs is None:
- attrs = []
- if auth:
- (ccache_name, principal) = self.__kinit_as_trusted_account(info, auth)
- if ccache_name:
- old_ccache = os.environ.get('KRB5CCNAME')
- os.environ["KRB5CCNAME"] = ccache_name
- # OPT_X_SASL_NOCANON is used to avoid hard requirement for PTR
- # records pointing back to the same host name
- conn.set_option(_ldap.OPT_X_SASL_NOCANON, _ldap.OPT_ON)
- conn.do_sasl_gssapi_bind()
- if basedn is None:
- # Use domain root base DN
- basedn = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.')))
- entries = conn.get_entries(basedn, scope, filter, attrs)
- os.environ["KRB5CCNAME"] = old_ccache
- return entries
+
+ if use_http:
+ (ccache_name, principal) = self.kinit_as_http(info['dns_domain'])
+ else:
+ auth = self.__extract_trusted_auth(info)
+
+ if not auth:
+ return None
+
+ (ccache_name, principal) = self.__kinit_as_trusted_account(info,
+ auth)
+
+ if ccache_name:
+ with installutils.private_ccache(path=ccache_name):
+ entries = None
+
+ try:
+ conn = IPAdmin(host=host,
+ port=389, # query the AD DC
+ no_schema=True,
+ decode_attrs=False,
+ sasl_nocanon=True)
+ # sasl_nocanon used to avoid hard requirement for PTR
+ # records pointing back to the same host name
+
+ conn.do_sasl_gssapi_bind()
+
+ if basedn is None:
+ # Use domain root base DN
+ basedn = ipautil.realm_to_suffix(info['dns_domain'])
+
+ entries = conn.get_entries(basedn, scope, filter, attrs)
+ except Exception, e:
+ msg = "Search on AD DC {host}:{port} failed with: {err}"\
+ .format(host=host, port=str(port), err=str(e))
+ if quiet:
+ root_logger.debug(msg)
+ else:
+ root_logger.warning(msg)
+ finally:
+ return entries
def __retrieve_trusted_domain_gc_list(self, domain):
"""
@@ -508,9 +594,13 @@ class DomainValidator(object):
except RuntimeError, e:
finddc_error = e
+ if not self._domains:
+ self._domains = self.get_trusted_domains()
+
info = dict()
info['auth'] = self._domains[domain][2]
servers = []
+
if result:
info['name'] = unicode(result.domain_name)
info['dns_domain'] = unicode(result.dns_domain)