From e5117bd995aa10aab59f487058b4a57c8b695be9 Mon Sep 17 00:00:00 2001 From: Ana Krivokapic Date: Fri, 31 May 2013 12:01:23 +0200 Subject: Fail when adding a trust with a different range When adding a trust, if an id range already exists for this trust, and options --base-id/--range-size are provided with the trust-add command, trust-add should fail. https://fedorahosted.org/freeipa/ticket/3635 --- ipalib/plugins/trust.py | 217 +++++++++++++++++++++++++++++++++--------------- ipaserver/dcerpc.py | 15 +++- 2 files changed, 160 insertions(+), 72 deletions(-) diff --git a/ipalib/plugins/trust.py b/ipalib/plugins/trust.py index 3cb0ed980..5c9360b57 100644 --- a/ipalib/plugins/trust.py +++ b/ipalib/plugins/trust.py @@ -155,6 +155,8 @@ _trust_type_option = StrEnum('trust_type', autofill=True, ) +DEFAULT_RANGE_SIZE = 200000 + def trust_type_string(level): """ Returns a string representing a type of the trust. The original field is an enum: @@ -287,7 +289,7 @@ sides. Int('range_size?', cli_name='range_size', label=_('Size of the ID range reserved for the trusted domain'), - default=200000, + default=DEFAULT_RANGE_SIZE, autofill=True ), ) @@ -296,20 +298,12 @@ sides. has_output_params = LDAPCreate.has_output_params + trust_output_params def execute(self, *keys, **options): - if not _murmur_installed and 'base_id' not in options: - raise errors.ValidationError(name=_('missing base_id'), - error=_('pysss_murmur is not available on the server ' \ - 'and no base-id is given.')) - - if 'trust_type' in options: - if options['trust_type'] == u'ad': - result = self.execute_ad(*keys, **options) - else: - raise errors.ValidationError(name=_('trust type'), error=_('only "ad" is supported')) - else: - raise errors.RequirementError(name=_('trust type')) + full_join = self.validate_options(*keys, **options) + old_range, range_name, dom_sid = self.validate_range(*keys, **options) + result = self.execute_ad(full_join, *keys, **options) - self.add_range(*keys, **options) + if not old_range: + self.add_range(range_name, dom_sid, **options) trust_filter = "cn=%s" % result['value'] ldap = self.obj.backend @@ -325,32 +319,132 @@ sides. return result - def add_range(self, *keys, **options): - new_obj = api.Command['trust_show'](keys[-1]) - dom_sid = new_obj['result']['ipanttrusteddomainsid'][0]; + def validate_options(self, *keys, **options): + if not _bindings_installed: + raise errors.NotFound( + name=_('AD Trust setup'), + reason=_( + 'Cannot perform join operation without Samba 4 support ' + 'installed. Make sure you have installed server-trust-ad ' + 'sub-package of IPA' + ) + ) + + if not _murmur_installed and 'base_id' not in options: + raise errors.ValidationError( + name=_('missing base_id'), + error=_( + 'pysss_murmur is not available on the server ' + 'and no base-id is given.' + ) + ) + + if 'trust_type' not in options: + raise errors.RequirementError(name=_('trust type')) + + if options['trust_type'] != u'ad': + raise errors.ValidationError( + name=_('trust type'), + error=_('only "ad" is supported') + ) + + self.trustinstance = ipaserver.dcerpc.TrustDomainJoins(self.api) + if not self.trustinstance.configured: + raise errors.NotFound( + name=_('AD Trust setup'), + reason=_( + 'Cannot perform join operation without own domain ' + 'configured. Make sure you have run ipa-adtrust-install ' + 'on the IPA server first' + ) + ) + + self.realm_server = options.get('realm_server') + self.realm_admin = options.get('realm_admin') + self.realm_passwd = options.get('realm_passwd') + + if self.realm_admin: + names = self.realm_admin.split('@') + + if len(names) > 1: + # realm admin name is in UPN format, user@realm, check that + # realm is the same as the one that we are attempting to trust + if keys[-1].lower() != names[-1].lower(): + raise errors.ValidationError( + name=_('AD Trust setup'), + error=_( + 'Trusted domain and administrator account use ' + 'different realms' + ) + ) + self.realm_admin = names[0] + + if not self.realm_passwd: + raise errors.ValidationError( + name=_('AD Trust setup'), + error=_('Realm administrator password should be specified') + ) + return True + + return False + def validate_range(self, *keys, **options): + # If a range for this trusted domain already exists, + # '--base-id' or '--range-size' options should not be specified range_name = keys[-1].upper()+'_id_range' try: old_range = api.Command['idrange_show'](range_name) - except errors.NotFound, e: + except errors.NotFound: old_range = None - if old_range: - old_dom_sid = old_range['result']['ipanttrusteddomainsid'][0]; + base_id = options.get('base_id') + range_size = options.get('range_size') != DEFAULT_RANGE_SIZE + + if old_range and (base_id or range_size): + raise errors.ValidationError( + name=_('id range'), + error=_( + 'An id range already exists for this trust. ' + 'You should either delete the old range, or ' + 'exclude --base-id/--range-size options from the command.' + ) + ) + + # If a range for this trusted domain already exists, + # domain SID must also match + self.trustinstance.populate_remote_domain( + keys[-1], + self.realm_server, + self.realm_admin, + self.realm_passwd + ) + dom_sid = self.trustinstance.remote_domain.info['sid'] - if old_dom_sid == dom_sid: - return + if old_range: + old_dom_sid = old_range['result']['ipanttrusteddomainsid'][0] + + if old_dom_sid != dom_sid: + raise errors.ValidationError( + name=_('range exists'), + error=_( + 'ID range with the same name but different domain SID ' + 'already exists. The ID range for the new trusted ' + 'domain must be created manually.' + ) + ) - raise errors.ValidationError(name=_('range exists'), - error=_('ID range with the same name but different ' \ - 'domain SID already exists. The ID range for ' \ - 'the new trusted domain must be created manually.')) + return old_range, range_name, dom_sid - if 'base_id' in options: - base_id = options['base_id'] - else: - base_id = 200000 + (pysss_murmur.murmurhash3(dom_sid, len(dom_sid), 0xdeadbeef) % 10000) * 200000 + def add_range(self, range_name, dom_sid, **options): + base_id = options.get('base_id') + if not base_id: + base_id = DEFAULT_RANGE_SIZE + ( + pysss_murmur.murmurhash3( + dom_sid, + len(dom_sid), 0xdeadbeef + ) % 10000 + ) * DEFAULT_RANGE_SIZE # Add new ID range api.Command['idrange_add'](range_name, @@ -359,51 +453,26 @@ sides. ipabaserid=0, ipanttrusteddomainsid=dom_sid) - def execute_ad(self, *keys, **options): + def execute_ad(self, full_join, *keys, **options): # Join domain using full credentials and with random trustdom # secret (will be generated by the join method) - trustinstance = None - if not _bindings_installed: - raise errors.NotFound(name=_('AD Trust setup'), - reason=_('''Cannot perform join operation without Samba 4 support installed. - Make sure you have installed server-trust-ad sub-package of IPA''')) - - if 'realm_server' not in options: - realm_server = None - else: - realm_server = options['realm_server'] - - trustinstance = ipaserver.dcerpc.TrustDomainJoins(self.api) - if not trustinstance.configured: - raise errors.NotFound(name=_('AD Trust setup'), - reason=_('''Cannot perform join operation without own domain configured. - Make sure you have run ipa-adtrust-install on the IPA server first''')) - try: - existing_trust = api.Command['trust_show'](keys[-1]) + api.Command['trust_show'](keys[-1]) summary = _('Re-established trust to domain "%(value)s"') except errors.NotFound: summary = self.msg_summary + # 1. Full access to the remote domain. Use admin credentials and # generate random trustdom password to do work on both sides - if 'realm_admin' in options: - realm_admin = options['realm_admin'] - names = realm_admin.split('@') - if len(names) > 1: - # realm admin name is in UPN format, user@realm, check that - # realm is the same as the one that we are attempting to trust - if keys[-1].lower() != names[-1].lower(): - raise errors.ValidationError(name=_('AD Trust setup'), - error=_('Trusted domain and administrator account use different realms')) - realm_admin = names[0] - - if 'realm_passwd' not in options: - raise errors.ValidationError(name=_('AD Trust setup'), error=_('Realm administrator password should be specified')) - realm_passwd = options['realm_passwd'] - + if full_join: try: - result = trustinstance.join_ad_full_credentials(keys[-1], realm_server, realm_admin, realm_passwd) - except errors.NotFound, e: + result = self.trustinstance.join_ad_full_credentials( + keys[-1], + self.realm_server, + self.realm_admin, + self.realm_passwd + ) + except errors.NotFound: error_message=_("Unable to resolve domain controller for '%s' domain. ") % (keys[-1]) instructions=[] if dns_container_exists(self.obj.backend): @@ -432,7 +501,10 @@ sides. raise errors.ValidationError(name=_('AD Trust setup'), error=_('Unable to verify write permissions to the AD')) - ret = dict(value=trustinstance.remote_domain.info['dns_domain'], verified=result['verified']) + ret = dict( + value=self.trustinstance.remote_domain.info['dns_domain'], + verified=result['verified'] + ) ret['summary'] = summary % ret return ret @@ -441,8 +513,15 @@ sides. # is provided. Do the work on our side and inform what to do on remote # side. if 'trust_secret' in options: - result = trustinstance.join_ad_ipa_half(keys[-1], realm_server, options['trust_secret']) - ret = dict(value=trustinstance.remote_domain.info['dns_domain'], verified=result['verified']) + result = self.trustinstance.join_ad_ipa_half( + keys[-1], + self.realm_server, + options['trust_secret'] + ) + ret = dict( + value=self.trustinstance.remote_domain.info['dns_domain'], + verified=result['verified'] + ) ret['summary'] = summary % ret return ret raise errors.ValidationError(name=_('AD Trust setup'), diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py index 5d052ea4a..0f98ce83c 100644 --- a/ipaserver/dcerpc.py +++ b/ipaserver/dcerpc.py @@ -816,7 +816,7 @@ class TrustDomainJoins(object): ld.retrieve(installutils.get_fqdn()) self.local_domain = ld - def __populate_remote_domain(self, realm, realm_server=None, realm_admin=None, realm_passwd=None): + def populate_remote_domain(self, realm, realm_server=None, realm_admin=None, realm_passwd=None): def get_instance(self): # Fetch data from foreign domain using password only rd = TrustDomainInstance('') @@ -860,7 +860,14 @@ class TrustDomainJoins(object): if not self.configured: return None - self.__populate_remote_domain(realm, realm_server, realm_admin, realm_passwd) + if not(isinstance(self.remote_domain, TrustDomainInstance)): + self.populate_remote_domain( + realm, + realm_server, + realm_admin, + realm_passwd + ) + if not self.remote_domain.read_only: trustdom_pass = samba.generate_random_password(128, 128) self.remote_domain.establish_trust(self.local_domain, trustdom_pass) @@ -873,6 +880,8 @@ class TrustDomainJoins(object): if not self.configured: return None - self.__populate_remote_domain(realm, realm_server, realm_passwd=None) + if not(isinstance(self.remote_domain, TrustDomainInstance)): + self.populate_remote_domain(realm, realm_server, realm_passwd=None) + self.local_domain.establish_trust(self.remote_domain, trustdom_passwd) return dict(local=self.local_domain, remote=self.remote_domain, verified=False) -- cgit