diff options
-rw-r--r-- | ipalib/plugins/pwpolicy.py | 64 | ||||
-rw-r--r-- | tests/test_xmlrpc/test_pwpolicy.py | 43 |
2 files changed, 89 insertions, 18 deletions
diff --git a/ipalib/plugins/pwpolicy.py b/ipalib/plugins/pwpolicy.py index dbf636e65..22af9fb78 100644 --- a/ipalib/plugins/pwpolicy.py +++ b/ipalib/plugins/pwpolicy.py @@ -110,6 +110,41 @@ def make_policy_entry(group_cn, policy_entry): return (policy_dn, policy_entry) +def find_group_policy(ldap): + """ + Return all group policy entries. + """ + attrs = ('cn','krbminpwdlife', 'krbmaxpwdlife', 'krbpwdmindiffchars', 'krbpwdminlength', 'krbpwdhistorylength',) + + attr_filter = ldap.make_filter({'objectclass':'krbpwdpolicy'}, rules=ldap.MATCH_ALL) + + try: + (entries, truncated) = ldap.find_entries( + attr_filter, attrs, 'cn=%s,cn=kerberos,%s' % (api.env.realm, api.env.basedn), scope=ldap.SCOPE_ONELEVEL + ) + except errors.NotFound: + (entries, truncated) = (tuple(), False) + + return (entries, truncated) + +def unique_priority(ldap, priority): + """ + Return True if the given priority is unique, False otherwise + + Having two cosPriority with the same value is undefined in the DS. + + This isn't done as a validation on the attribute since we want it done + only on the server side. + """ + (entries, truncated) = find_group_policy(ldap) + for e in entries: + groupdn = find_group_dn(e[1]['cn'][0]) + cos_dn = 'cn="%s", cn=cosTemplates, cn=accounts, %s' % (groupdn, api.env.basedn) + (dn, cos_attrs) = ldap.get_entry(cos_dn, normalize=False) + if priority == int(cos_attrs['cospriority'][0]): + return False + + return True class pwpolicy(Object): """ @@ -188,6 +223,10 @@ class pwpolicy_add(crud.Create): group_cn = options['group'] + if 'cospriority' in options: + if not unique_priority(ldap, options['cospriority']): + raise errors.ValidationError(name='priority', error=_('Priority must be a unique value.')) + # Create the CoS template (cos_dn, cos_entry) = make_cos_entry(group_cn, options.get('cospriority', None)) if 'cospriority' in options: @@ -258,6 +297,8 @@ class pwpolicy_mod(crud.Update): if 'cospriority' in options: if options['cospriority'] is None: raise errors.RequirementError(name='priority') + if not unique_priority(ldap, options['cospriority']): + raise errors.ValidationError(name='priority', error=_('Priority must be a unique value.')) groupdn = find_group_dn(group_cn) cos_dn = 'cn="%s", cn=cosTemplates, cn=accounts, %s' % (groupdn, api.env.basedn) self.log.debug('%s' % cos_dn) @@ -397,22 +438,25 @@ class pwpolicy_find(Method): has_output = output.standard_list_of_entries + takes_options = ( + Int('cospriority?', + cli_name='priority', + label=_('Priority'), + flags=['no_create', 'no_update', 'no_search'], + ), + ) + def execute(self, *args, **options): ldap = self.api.Backend.ldap2 - attrs = ('cn','krbminpwdlife', 'krbmaxpwdlife', 'krbpwdmindiffchars', 'krbpwdminlength', 'krbpwdhistorylength',) - - attr_filter = ldap.make_filter({'objectclass':'krbpwdpolicy'}, rules=ldap.MATCH_ALL) - - try: - (entries, truncated) = ldap.find_entries( - attr_filter, attrs, 'cn=%s,cn=kerberos,%s' % (api.env.realm, api.env.basedn), scope=ldap.SCOPE_ONELEVEL - ) - except errors.NotFound: - (entries, truncated) = (tuple(), False) + (entries, truncated) = find_group_policy(ldap) for e in entries: _convert_time_for_output(e[1]) e[1]['dn'] = e[0] + groupdn = find_group_dn(e[1]['cn'][0]) + cos_dn = 'cn="%s", cn=cosTemplates, cn=accounts, %s' % (groupdn, api.env.basedn) + (dn, cos_attrs) = ldap.get_entry(cos_dn, normalize=False) + e[1]['cospriority'] = cos_attrs['cospriority'] entries = tuple(e for (dn, e) in entries) return dict(result=entries, diff --git a/tests/test_xmlrpc/test_pwpolicy.py b/tests/test_xmlrpc/test_pwpolicy.py index ceb4f8b62..1a34c02ae 100644 --- a/tests/test_xmlrpc/test_pwpolicy.py +++ b/tests/test_xmlrpc/test_pwpolicy.py @@ -57,16 +57,32 @@ class test_pwpolicy(XMLRPC_test): def test_2_pwpolicy_add(self): """ + Add a policy with a duplicate priority + + The priority validation is done first so it's ok that the group + is the same here. + """ + try: + api.Command['pwpolicy_add'](**self.kw) + except errors.ValidationError: + pass + else: + assert False + + def test_3_pwpolicy_add(self): + """ Add a policy that already exists """ try: + # cospriority needs to be unique + self.kw['cospriority'] = 3 api.Command['pwpolicy_add'](**self.kw) except errors.DuplicateEntry: pass else: assert False - def test_3_pwpolicy_add(self): + def test_4_pwpolicy_add(self): """ Test adding another per-group policy using the `xmlrpc.pwpolicy_add` method. """ @@ -79,18 +95,18 @@ class test_pwpolicy(XMLRPC_test): assert_attr_equal(entry, 'krbpwdhistorylength', '8') assert_attr_equal(entry, 'krbpwdminlength', '9') - def test_4_pwpolicy_add(self): + def test_5_pwpolicy_add(self): """ Add a pwpolicy for a non-existent group """ try: - api.Command['pwpolicy_add'](group=u'nopwpolicy',cospriority=1,krbminpwdlife=1) + api.Command['pwpolicy_add'](group=u'nopwpolicy',cospriority=4,krbminpwdlife=1) except errors.NotFound: pass else: assert False - def test_5_pwpolicy_show(self): + def test_6_pwpolicy_show(self): """ Test the `xmlrpc.pwpolicy_show` method with global policy. """ @@ -101,7 +117,7 @@ class test_pwpolicy(XMLRPC_test): assert_attr_equal(entry, 'krbpwdhistorylength', '0') assert_attr_equal(entry, 'krbpwdminlength', '8') - def test_6_pwpolicy_show(self): + def test_7_pwpolicy_show(self): """ Test the `xmlrpc.pwpolicy_show` method. """ @@ -111,7 +127,7 @@ class test_pwpolicy(XMLRPC_test): assert_attr_equal(entry, 'krbpwdhistorylength', '5') assert_attr_equal(entry, 'krbpwdminlength', '6') - def test_7_pwpolicy_mod(self): + def test_8_pwpolicy_mod(self): """ Test the `xmlrpc.pwpolicy_mod` method for global policy. """ @@ -122,14 +138,25 @@ class test_pwpolicy(XMLRPC_test): entry = api.Command['pwpolicy_mod'](krbminpwdlife=1)['result'] assert_attr_equal(entry, 'krbminpwdlife', '1') - def test_8_pwpolicy_mod(self): + def test_9_pwpolicy_mod(self): """ Test the `xmlrpc.pwpolicy_mod` method. """ entry = api.Command['pwpolicy_mod'](group=self.group, krbminpwdlife=50)['result'] assert_attr_equal(entry, 'krbminpwdlife', '50') - def test_9_pwpolicy_del(self): + def test_a_pwpolicy_mod(self): + """ + Test `xmlrpc.pwpolicy_mod` with a duplicate priority + """ + try: + api.Command['pwpolicy_mod'](group=self.group, cospriority=1) + except errors.ValidationError: + pass + else: + assert False + + def test_b_pwpolicy_del(self): """ Test the `xmlrpc.pwpolicy_del` method. """ |