diff options
Diffstat (limited to 'ipaserver/dcerpc.py')
-rw-r--r-- | ipaserver/dcerpc.py | 93 |
1 files changed, 85 insertions, 8 deletions
diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py index 3bc8b63af..07e40c2d3 100644 --- a/ipaserver/dcerpc.py +++ b/ipaserver/dcerpc.py @@ -58,6 +58,79 @@ class ExtendedDNControl(_ldap.controls.RequestControl): def encodeControlValue(self): return '0\x03\x02\x01\x01' +class DomainValidator(object): + ATTR_FLATNAME = 'ipantflatname' + ATTR_SID = 'ipantsecurityidentifier' + ATTR_TRUSTED_SID = 'ipanttrusteddomainsid' + + def __init__(self, api): + self.api = api + self.ldap = self.api.Backend.ldap2 + self.domain = None + self.flatname = None + self.dn = None + self.sid = None + self._domains = None + + def is_configured(self): + cn_trust_local = DN(('cn', self.api.env.domain), self.api.env.container_cifsdomains, self.api.env.basedn) + try: + (dn, entry_attrs) = self.ldap.get_entry(unicode(cn_trust_local), [self.ATTR_FLATNAME, self.ATTR_SID]) + self.flatname = entry_attrs[self.ATTR_FLATNAME][0] + self.sid = entry_attrs[self.ATTR_SID][0] + self.dn = dn + self.domain = self.api.env.domain + except errors.NotFound, e: + return False + return True + + def get_trusted_domains(self): + cn_trust = DN(('cn', 'ad'), self.api.env.container_trusts, self.api.env.basedn) + try: + search_kw = {'objectClass': 'ipaNTTrustedDomain'} + filter = self.ldap.make_filter(search_kw, rules=self.ldap.MATCH_ALL) + (entries, truncated) = self.ldap.find_entries(filter=filter, base_dn=unicode(cn_trust), + attrs_list=[self.ATTR_TRUSTED_SID, 'dn']) + + return entries + except errors.NotFound, e: + return [] + + def is_trusted_sid_valid(self, sid): + if not self.domain: + # our domain is not configured or self.is_configured() never run + # reject SIDs as we can't check correctness of them + return False + # Parse sid string to see if it is really in a SID format + try: + test_sid = security.dom_sid(sid) + except TypeError: + return False + (dom, sid_rid) = test_sid.split() + sid_dom = str(dom) + # Now we have domain prefix of the sid as sid_dom string and can + # analyze it against known prefixes + if sid_dom.find(security.SID_NT_AUTHORITY) != 0: + # Ignore any potential SIDs that are not S-1-5-* + return False + if sid_dom.find(self.sid) == 0: + # A SID from our own domain cannot be treated as trusted domain's SID + return False + # At this point we have SID_NT_AUTHORITY family SID and really need to + # check it against prefixes of domain SIDs we trust to + if not self._domains: + self._domains = self.get_trusted_domains() + if len(self._domains) == 0: + # Our domain is configured but no trusted domains are configured + # This means we can't check the correctness of a trusted domain SIDs + return False + # We have non-zero list of trusted domains and have to go through them + # one by one and check their sids as prefixes + for (dn, domaininfo) in self._domains: + if sid_dom.find(domaininfo[self.ATTR_TRUSTED_SID][0]) == 0: + return True + return False + class TrustDomainInstance(object): def __init__(self, hostname, creds=None): @@ -247,20 +320,18 @@ class TrustDomainInstance(object): self._pipe.CreateTrustedDomainEx2(self._policy_handle, info, self.auth_info, security.SEC_STD_DELETE) class TrustDomainJoins(object): - ATTR_FLATNAME = 'ipantflatname' - def __init__(self, api): self.api = api self.local_domain = None self.remote_domain = None - self.ldap = self.api.Backend.ldap2 - cn_trust_local = DN(('cn', self.api.env.domain), self.api.env.container_cifsdomains, self.api.env.basedn) - (dn, entry_attrs) = self.ldap.get_entry(unicode(cn_trust_local), [self.ATTR_FLATNAME]) - self.local_flatname = entry_attrs[self.ATTR_FLATNAME][0] - self.local_dn = dn + domain_validator = DomainValidator(api) + self.configured = domain_validator.is_configured() - self.__populate_local_domain() + if self.configured: + self.local_flatname = domain_validator.flatname + self.local_dn = domain_validator.dn + self.__populate_local_domain() def __populate_local_domain(self): # Initialize local domain info using kerberos only @@ -308,6 +379,9 @@ class TrustDomainJoins(object): self.remote_domain = rd def join_ad_full_credentials(self, realm, realm_server, realm_admin, realm_passwd): + if not self.configured: + return None + self.__populate_remote_domain(realm, realm_server, realm_admin, realm_passwd) if not self.remote_domain.read_only: trustdom_pass = samba.generate_random_password(128, 128) @@ -317,6 +391,9 @@ class TrustDomainJoins(object): return None def join_ad_ipa_half(self, realm, realm_server, trustdom_passwd): + if not self.configured: + return None + self.__populate_remote_domain(realm, realm_server, realm_passwd=None) self.local_domain.establish_trust(self.remote_domain, trustdom_passwd) return dict(local=self.local_domain, remote=self.remote_domain) |