summaryrefslogtreecommitdiffstats
path: root/ipaserver/dcerpc.py
diff options
context:
space:
mode:
Diffstat (limited to 'ipaserver/dcerpc.py')
-rw-r--r--ipaserver/dcerpc.py93
1 files changed, 85 insertions, 8 deletions
diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py
index 3bc8b63af..07e40c2d3 100644
--- a/ipaserver/dcerpc.py
+++ b/ipaserver/dcerpc.py
@@ -58,6 +58,79 @@ class ExtendedDNControl(_ldap.controls.RequestControl):
def encodeControlValue(self):
return '0\x03\x02\x01\x01'
+class DomainValidator(object):
+ ATTR_FLATNAME = 'ipantflatname'
+ ATTR_SID = 'ipantsecurityidentifier'
+ ATTR_TRUSTED_SID = 'ipanttrusteddomainsid'
+
+ def __init__(self, api):
+ self.api = api
+ self.ldap = self.api.Backend.ldap2
+ self.domain = None
+ self.flatname = None
+ self.dn = None
+ self.sid = None
+ self._domains = None
+
+ def is_configured(self):
+ cn_trust_local = DN(('cn', self.api.env.domain), self.api.env.container_cifsdomains, self.api.env.basedn)
+ try:
+ (dn, entry_attrs) = self.ldap.get_entry(unicode(cn_trust_local), [self.ATTR_FLATNAME, self.ATTR_SID])
+ self.flatname = entry_attrs[self.ATTR_FLATNAME][0]
+ self.sid = entry_attrs[self.ATTR_SID][0]
+ self.dn = dn
+ self.domain = self.api.env.domain
+ except errors.NotFound, e:
+ return False
+ return True
+
+ def get_trusted_domains(self):
+ cn_trust = DN(('cn', 'ad'), self.api.env.container_trusts, self.api.env.basedn)
+ try:
+ search_kw = {'objectClass': 'ipaNTTrustedDomain'}
+ filter = self.ldap.make_filter(search_kw, rules=self.ldap.MATCH_ALL)
+ (entries, truncated) = self.ldap.find_entries(filter=filter, base_dn=unicode(cn_trust),
+ attrs_list=[self.ATTR_TRUSTED_SID, 'dn'])
+
+ return entries
+ except errors.NotFound, e:
+ return []
+
+ def is_trusted_sid_valid(self, sid):
+ if not self.domain:
+ # our domain is not configured or self.is_configured() never run
+ # reject SIDs as we can't check correctness of them
+ return False
+ # Parse sid string to see if it is really in a SID format
+ try:
+ test_sid = security.dom_sid(sid)
+ except TypeError:
+ return False
+ (dom, sid_rid) = test_sid.split()
+ sid_dom = str(dom)
+ # Now we have domain prefix of the sid as sid_dom string and can
+ # analyze it against known prefixes
+ if sid_dom.find(security.SID_NT_AUTHORITY) != 0:
+ # Ignore any potential SIDs that are not S-1-5-*
+ return False
+ if sid_dom.find(self.sid) == 0:
+ # A SID from our own domain cannot be treated as trusted domain's SID
+ return False
+ # At this point we have SID_NT_AUTHORITY family SID and really need to
+ # check it against prefixes of domain SIDs we trust to
+ if not self._domains:
+ self._domains = self.get_trusted_domains()
+ if len(self._domains) == 0:
+ # Our domain is configured but no trusted domains are configured
+ # This means we can't check the correctness of a trusted domain SIDs
+ return False
+ # We have non-zero list of trusted domains and have to go through them
+ # one by one and check their sids as prefixes
+ for (dn, domaininfo) in self._domains:
+ if sid_dom.find(domaininfo[self.ATTR_TRUSTED_SID][0]) == 0:
+ return True
+ return False
+
class TrustDomainInstance(object):
def __init__(self, hostname, creds=None):
@@ -247,20 +320,18 @@ class TrustDomainInstance(object):
self._pipe.CreateTrustedDomainEx2(self._policy_handle, info, self.auth_info, security.SEC_STD_DELETE)
class TrustDomainJoins(object):
- ATTR_FLATNAME = 'ipantflatname'
-
def __init__(self, api):
self.api = api
self.local_domain = None
self.remote_domain = None
- self.ldap = self.api.Backend.ldap2
- cn_trust_local = DN(('cn', self.api.env.domain), self.api.env.container_cifsdomains, self.api.env.basedn)
- (dn, entry_attrs) = self.ldap.get_entry(unicode(cn_trust_local), [self.ATTR_FLATNAME])
- self.local_flatname = entry_attrs[self.ATTR_FLATNAME][0]
- self.local_dn = dn
+ domain_validator = DomainValidator(api)
+ self.configured = domain_validator.is_configured()
- self.__populate_local_domain()
+ if self.configured:
+ self.local_flatname = domain_validator.flatname
+ self.local_dn = domain_validator.dn
+ self.__populate_local_domain()
def __populate_local_domain(self):
# Initialize local domain info using kerberos only
@@ -308,6 +379,9 @@ class TrustDomainJoins(object):
self.remote_domain = rd
def join_ad_full_credentials(self, realm, realm_server, realm_admin, realm_passwd):
+ if not self.configured:
+ return None
+
self.__populate_remote_domain(realm, realm_server, realm_admin, realm_passwd)
if not self.remote_domain.read_only:
trustdom_pass = samba.generate_random_password(128, 128)
@@ -317,6 +391,9 @@ class TrustDomainJoins(object):
return None
def join_ad_ipa_half(self, realm, realm_server, trustdom_passwd):
+ if not self.configured:
+ return None
+
self.__populate_remote_domain(realm, realm_server, realm_passwd=None)
self.local_domain.establish_trust(self.remote_domain, trustdom_passwd)
return dict(local=self.local_domain, remote=self.remote_domain)