summaryrefslogtreecommitdiffstats
path: root/ipaserver/dcerpc.py
diff options
context:
space:
mode:
Diffstat (limited to 'ipaserver/dcerpc.py')
-rw-r--r--ipaserver/dcerpc.py164
1 files changed, 127 insertions, 37 deletions
diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py
index 0f98ce83..88ad928e 100644
--- a/ipaserver/dcerpc.py
+++ b/ipaserver/dcerpc.py
@@ -61,6 +61,7 @@ The code in this module relies heavily on samba4-python package
and Samba4 python bindings.
""")
+
def is_sid_valid(sid):
try:
security.dom_sid(sid)
@@ -69,6 +70,7 @@ def is_sid_valid(sid):
else:
return True
+
access_denied_error = errors.ACIError(info=_('CIFS server denied your credentials'))
dcerpc_error_codes = {
-1073741823:
@@ -113,6 +115,7 @@ class ExtendedDNControl(LDAPControl):
def encodeControlValue(self, value=None):
return '0\x03\x02\x01\x01'
+
class DomainValidator(object):
ATTR_FLATNAME = 'ipantflatname'
ATTR_SID = 'ipantsecurityidentifier'
@@ -184,6 +187,18 @@ class DomainValidator(object):
except errors.NotFound, e:
return []
+ def set_trusted_domains(self):
+ # At this point we have SID_NT_AUTHORITY family SID and really need to
+ # check it against prefixes of domain SIDs we trust to
+ if not self._domains:
+ self._domains = self.get_trusted_domains()
+ if len(self._domains) == 0:
+ # Our domain is configured but no trusted domains are configured
+ # This means we can't check the correctness of a trusted
+ # domain SIDs
+ raise errors.ValidationError(name='sid',
+ error=_('no trusted domain is configured'))
+
def get_domain_by_sid(self, sid, exact_match=False):
if not self.domain:
# our domain is not configured or self.is_configured() never run
@@ -200,14 +215,7 @@ class DomainValidator(object):
# At this point we have SID_NT_AUTHORITY family SID and really need to
# check it against prefixes of domain SIDs we trust to
- if not self._domains:
- self._domains = self.get_trusted_domains()
- if len(self._domains) == 0:
- # Our domain is configured but no trusted domains are configured
- # This means we can't check the correctness of a trusted
- # domain SIDs
- raise errors.ValidationError(name='sid',
- error=_('no trusted domain is configured'))
+ self.set_trusted_domains()
# We have non-zero list of trusted domains and have to go through
# them one by one and check their sids as prefixes / exact match
@@ -284,7 +292,7 @@ class DomainValidator(object):
raise errors.ValidationError(name=_('trusted domain object'),
error= _('domain is not trusted'))
# Now we have a name to check against our list of trusted domains
- entries = self.search_in_gc(domain, filter, attrs, scope, basedn)
+ entries = self.search_in_dc(domain, filter, attrs, scope, basedn)
elif flatname is not None:
# Flatname was specified, traverse through the list of trusted
# domains first to find the proper one
@@ -292,7 +300,7 @@ class DomainValidator(object):
for domain in self._domains:
if self._domains[domain][0] == flatname:
found_flatname = True
- entries = self.search_in_gc(domain, filter, attrs, scope, basedn)
+ entries = self.search_in_dc(domain, filter, attrs, scope, basedn)
if entries:
break
if not found_flatname:
@@ -436,48 +444,126 @@ class DomainValidator(object):
dict(domain=info['dns_domain'],message=stderr.strip()))
return (None, None)
- def search_in_gc(self, domain, filter, attrs, scope, basedn=None):
+ def kinit_as_http(self, domain):
+ """
+ Initializes ccache with http service credentials.
+
+ Applies session code defaults for ccache directory and naming prefix.
+ Session code uses krbccache_prefix+<pid>, we use
+ krbccache_prefix+<TD>+<domain netbios name> so there is no clash.
+
+ Returns tuple (ccache path, principal) where (None, None) signifes an
+ error on ccache initialization
+ """
+
+ domain_suffix = domain.replace('.', '-')
+
+ ccache_name = "%sTD%s" % (krbccache_prefix, domain_suffix)
+ ccache_path = os.path.join(krbccache_dir, ccache_name)
+
+ realm = api.env.realm
+ hostname = api.env.host
+ principal = 'HTTP/%s@%s' % (hostname, realm)
+ keytab = '/etc/httpd/conf/ipa.keytab'
+
+ # Destroy the contents of the ccache
+ root_logger.debug('Destroying the contents of the separate ccache')
+
+ (stdout, stderr, returncode) = ipautil.run(
+ ['/usr/bin/kdestroy', '-A', '-c', ccache_path],
+ env={'KRB5CCNAME': ccache_path},
+ raiseonerr=False)
+
+ # Destroy the contents of the ccache
+ root_logger.debug('Running kinit from ipa.keytab to obtain HTTP '
+ 'service principal with MS-PAC attached.')
+
+ (stdout, stderr, returncode) = ipautil.run(
+ ['/usr/bin/kinit', '-kt', keytab, principal],
+ env={'KRB5CCNAME': ccache_path},
+ raiseonerr=False)
+
+ if returncode == 0:
+ return (ccache_path, principal)
+ else:
+ return (None, None)
+
+ def search_in_dc(self, domain, filter, attrs, scope, basedn=None,
+ use_http=False, quiet=False):
"""
- Perform LDAP search in a trusted domain `domain' Global Catalog.
- Returns resulting entries or None
+ Perform LDAP search in a trusted domain `domain' Domain Controller.
+ Returns resulting entries or None.
+
+ If use_http is set to True, the search is conducted using
+ HTTP service credentials.
"""
+
entries = None
- sid = None
+
info = self.__retrieve_trusted_domain_gc_list(domain)
+
if not info:
- raise errors.ValidationError(name=_('Trust setup'),
+ raise errors.ValidationError(
+ name=_('Trust setup'),
error=_('Cannot retrieve trusted domain GC list'))
+
for (host, port) in info['gc']:
- entries = self.__search_in_gc(info, host, port, filter, attrs, scope, basedn)
+ entries = self.__search_in_dc(info, host, port, filter, attrs,
+ scope, basedn=basedn,
+ use_http=use_http,
+ quiet=quiet)
if entries:
break
return entries
- def __search_in_gc(self, info, host, port, filter, attrs, scope, basedn=None):
+ def __search_in_dc(self, info, host, port, filter, attrs, scope,
+ basedn=None, use_http=False, quiet=False):
"""
Actual search in AD LDAP server, using SASL GSSAPI authentication
- Returns LDAP result or None
+ Returns LDAP result or None.
"""
- conn = IPAdmin(host=host, port=port, no_schema=True, decode_attrs=False)
- auth = self.__extract_trusted_auth(info)
- if attrs is None:
- attrs = []
- if auth:
- (ccache_name, principal) = self.__kinit_as_trusted_account(info, auth)
- if ccache_name:
- old_ccache = os.environ.get('KRB5CCNAME')
- os.environ["KRB5CCNAME"] = ccache_name
- # OPT_X_SASL_NOCANON is used to avoid hard requirement for PTR
- # records pointing back to the same host name
- conn.set_option(_ldap.OPT_X_SASL_NOCANON, _ldap.OPT_ON)
- conn.do_sasl_gssapi_bind()
- if basedn is None:
- # Use domain root base DN
- basedn = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.')))
- entries = conn.get_entries(basedn, scope, filter, attrs)
- os.environ["KRB5CCNAME"] = old_ccache
- return entries
+
+ if use_http:
+ (ccache_name, principal) = self.kinit_as_http(info['dns_domain'])
+ else:
+ auth = self.__extract_trusted_auth(info)
+
+ if not auth:
+ return None
+
+ (ccache_name, principal) = self.__kinit_as_trusted_account(info,
+ auth)
+
+ if ccache_name:
+ with installutils.private_ccache(path=ccache_name):
+ entries = None
+
+ try:
+ conn = IPAdmin(host=host,
+ port=389, # query the AD DC
+ no_schema=True,
+ decode_attrs=False,
+ sasl_nocanon=True)
+ # sasl_nocanon used to avoid hard requirement for PTR
+ # records pointing back to the same host name
+
+ conn.do_sasl_gssapi_bind()
+
+ if basedn is None:
+ # Use domain root base DN
+ basedn = ipautil.realm_to_suffix(info['dns_domain'])
+
+ entries = conn.get_entries(basedn, scope, filter, attrs)
+ except Exception, e:
+ msg = "Search on AD DC {host}:{port} failed with: {err}"\
+ .format(host=host, port=str(port), err=str(e))
+ if quiet:
+ root_logger.debug(msg)
+ else:
+ root_logger.warning(msg)
+ finally:
+ return entries
def __retrieve_trusted_domain_gc_list(self, domain):
"""
@@ -508,9 +594,13 @@ class DomainValidator(object):
except RuntimeError, e:
finddc_error = e
+ if not self._domains:
+ self._domains = self.get_trusted_domains()
+
info = dict()
info['auth'] = self._domains[domain][2]
servers = []
+
if result:
info['name'] = unicode(result.domain_name)
info['dns_domain'] = unicode(result.dns_domain)