diff options
-rw-r--r-- | ipalib/plugins/group.py | 2 | ||||
-rw-r--r-- | ipalib/plugins/pwpolicy.py | 587 | ||||
-rw-r--r-- | ipalib/plugins/pwpolicy2.py | 359 | ||||
-rw-r--r-- | tests/test_xmlrpc/test_pwpolicy.py | 54 | ||||
-rw-r--r-- | tests/test_xmlrpc/test_pwpolicy2.py | 171 |
5 files changed, 254 insertions, 919 deletions
diff --git a/ipalib/plugins/group.py b/ipalib/plugins/group.py index a78529404..296366f6d 100644 --- a/ipalib/plugins/group.py +++ b/ipalib/plugins/group.py @@ -129,7 +129,7 @@ class group_del(LDAPDelete): def post_callback(self, ldap, dn, *keys, **options): try: - api.Command['pwpolicy_del'](group=keys[-1]) + api.Command['pwpolicy_del'](keys[-1]) except errors.NotFound: pass diff --git a/ipalib/plugins/pwpolicy.py b/ipalib/plugins/pwpolicy.py index bf8abcf82..71c355959 100644 --- a/ipalib/plugins/pwpolicy.py +++ b/ipalib/plugins/pwpolicy.py @@ -1,8 +1,7 @@ # Authors: -# Rob Crittenden <rcritten@redhat.com> # Pavel Zuna <pzuna@redhat.com> # -# Copyright (C) 2008 Red Hat +# Copyright (C) 2010 Red Hat # see file 'COPYING' for use and warranty information # # This program is free software; you can redistribute it and/or @@ -17,472 +16,344 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - """ Password policy """ -from ipalib import api, crud, errors -from ipalib import Method, Object +from ipalib import api from ipalib import Int, Str -from ipalib import output -from ipalib import _, ngettext -from ldap.functions import explode_dn - -_fields = { - 'group': 'Group policy', - 'krbminpwdlife': 'Minimum lifetime (in hours)', - 'krbmaxpwdlife': 'Maximum lifetime (in days)', - 'krbpwdmindiffchars': 'Minimum number of characters classes', - 'krbpwdminlength': 'Minimum length', - 'krbpwdhistorylength': 'History size', -} - -_global=u'global' - -def _convert_time_for_output(entry_attrs): - # Convert seconds to hours and days for displaying to user - if 'krbmaxpwdlife' in entry_attrs: - entry_attrs['krbmaxpwdlife'][0] = unicode( - int(entry_attrs['krbmaxpwdlife'][0]) / 86400 - ) - if 'krbminpwdlife' in entry_attrs: - entry_attrs['krbminpwdlife'][0] = unicode( - int(entry_attrs['krbminpwdlife'][0]) / 3600 - ) +from ipalib.plugins.baseldap import * +from ipalib import _ -def _convert_time_on_input(entry_attrs): - # Convert hours and days to seconds for writing to LDAP - if 'krbmaxpwdlife' in entry_attrs and entry_attrs['krbmaxpwdlife']: - entry_attrs['krbmaxpwdlife'] = entry_attrs['krbmaxpwdlife'] * 86400 - if 'krbminpwdlife' in entry_attrs and entry_attrs['krbminpwdlife']: - entry_attrs['krbminpwdlife'] = entry_attrs['krbminpwdlife'] * 3600 -def find_group_dn(group): - """ - Given a group name find the DN of that group +class cosentry(LDAPObject): """ - try: - entry = api.Command['group_show'](group)['result'] - except errors.NotFound: - raise errors.NotFound(reason="group '%s' does not exist" % group) - return entry['dn'] - -def make_cos_entry(group, cospriority=None): + Class of Service object used for linking policies with groups """ - Make the CoS dn and entry for this group. + INTERNAL = True - Returns (cos_dn, cos_entry) where: - cos_dn = DN of the new CoS entry - cos_entry = entry representing this new object - """ - ldap = api.Backend.ldap2 + container_dn = 'cn=costemplates,%s' % api.env.container_accounts + object_class = ['top', 'costemplate', 'extensibleobject', 'krbcontainer'] + default_attributes = ['cn', 'cospriority', 'krbpwdpolicyreference'] - groupdn = find_group_dn(group) + takes_params = ( + Str('cn', primary_key=True), + Str('krbpwdpolicyreference'), + Int('cospriority', minvalue=0), + ) - cos_entry = {} - if cospriority: - cos_entry['cospriority'] = cospriority - cos_entry['objectclass'] = ['top', 'costemplate', 'extensibleobject', 'krbcontainer'] - cos_dn = ldap.make_dn_from_attr( - 'cn', groupdn, 'cn=cosTemplates,%s' % api.env.container_accounts + priority_not_unique_msg = _( + 'priority must be a unique value (%(prio)d already used by %(gname)s)' ) - return (cos_dn, cos_entry) + def get_dn(self, *keys, **options): + group_dn = self.api.Object.group.get_dn(keys[-1]) + return self.backend.make_dn_from_attr( + 'cn', group_dn, self.container_dn + ) + def check_priority_uniqueness(self, *keys, **options): + if options.get('cospriority') is not None: + entries = self.methods.find( + cospriority=options['cospriority'] + )['result'] + if len(entries) > 0: + group_name = self.api.Object.group.get_primary_key_from_dn( + entries[0]['cn'][0] + ) + raise errors.ValidationError( + name='priority', + error=self.priority_not_unique_msg % { + 'prio': options['cospriority'], + 'gname': group_name, + } + ) -def make_policy_entry(group_cn, policy_entry): - """ - Make the krbpwdpolicy dn and entry for this group. +api.register(cosentry) - Returns (policy_dn, policy_entry) where: - policy_dn = DN of the new password policy entry - policy_entry = entry representing this new object - """ - ldap = api.Backend.ldap2 - # This DN must *NOT* have spaces between elements - policy_dn = ldap.make_dn_from_attr( - 'cn', api.env.realm, 'cn=kerberos,%s' % api.env.basedn - ) - policy_dn = ldap.make_dn_from_attr('cn', group_cn, policy_dn) +class cosentry_add(LDAPCreate): + INTERNAL = True - # Create the krb password policy entry. This MUST be located - # in the same container as the REALM or the kldap plugin won't - # recognize it. The usual CoS trick of putting the whole DN into - # the dn won't work either because the kldap plugin doesn't like - # quotes in the DN. - policy_entry['objectclass'] = ['top', 'nscontainer', 'krbpwdpolicy'] - policy_entry['cn'] = group_cn + def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): + # check for existence of the group + self.api.Command.group_show(keys[-1]) + self.obj.check_priority_uniqueness(*keys, **options) + del entry_attrs['cn'] + return dn - return (policy_dn, policy_entry) +api.register(cosentry_add) -def find_group_policy(ldap): - """ - Return all group policy entries. - """ - attrs = ('cn','krbminpwdlife', 'krbmaxpwdlife', 'krbpwdmindiffchars', 'krbpwdminlength', 'krbpwdhistorylength',) - attr_filter = ldap.make_filter({'objectclass':'krbpwdpolicy'}, rules=ldap.MATCH_ALL) +class cosentry_del(LDAPDelete): + INTERNAL = True - try: - (entries, truncated) = ldap.find_entries( - attr_filter, attrs, 'cn=%s,cn=kerberos,%s' % (api.env.realm, api.env.basedn), scope=ldap.SCOPE_ONELEVEL - ) - except errors.NotFound: - (entries, truncated) = (tuple(), False) +api.register(cosentry_del) - return (entries, truncated) -def unique_priority(ldap, priority): - """ - Return True if the given priority is unique, False otherwise +class cosentry_mod(LDAPUpdate): + INTERNAL = True - Having two cosPriority with the same value is undefined in the DS. + def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): + self.obj.check_priority_uniqueness(*keys, **options) + return dn - This isn't done as a validation on the attribute since we want it done - only on the server side. - """ - attrs = ('cospriority',) +api.register(cosentry_mod) - attr_filter = ldap.make_filter({'objectclass':'krbcontainer', 'cospriority':priority }, rules=ldap.MATCH_ALL) - try: - (entries, truncated) = ldap.find_entries( - attr_filter, attrs, 'cn=cosTemplates,%s' % api.env.container_accounts, scope=ldap.SCOPE_ONELEVEL - ) - return False - except errors.NotFound: - return True +class cosentry_show(LDAPRetrieve): + INTERNAL = True + +api.register(cosentry_show) - return True -class pwpolicy(Object): +class cosentry_find(LDAPSearch): + INTERNAL = True + +api.register(cosentry_find) + + +GLOBAL_POLICY_NAME = u'GLOBAL' + + +class pwpolicy(LDAPObject): """ - Password Policy object. + Password Policy object """ + container_dn = 'cn=%s,cn=kerberos' % api.env.realm + object_name = 'password policy' + object_name_plural = 'password policies' + object_class = ['top', 'nscontainer', 'krbpwdpolicy'] + default_attributes = [ + 'cn', 'cospriority', 'krbmaxpwdlife', 'krbminpwdlife', + 'krbpwdhistorylength', 'krbpwdmindiffchars', 'krbpwdminlength', + ] takes_params = ( Str('cn?', + cli_name='group', label=_('Group'), - flags=['no_create', 'no_update', 'no_search'], + doc=_('Manage password policy for specific group'), + primary_key=True, ), Int('krbmaxpwdlife?', cli_name='maxlife', label=_('Max lifetime (days)'), doc=_('Maximum password lifetime (in days)'), minvalue=0, - attribute=True, ), Int('krbminpwdlife?', cli_name='minlife', label=_('Min lifetime (hours)'), doc=_('Minimum password lifetime (in hours)'), minvalue=0, - attribute=True, ), Int('krbpwdhistorylength?', cli_name='history', label=_('History size'), doc=_('Password history size'), minvalue=0, - attribute=True, ), Int('krbpwdmindiffchars?', cli_name='minclasses', label=_('Character classes'), doc=_('Minimum number of character classes'), minvalue=0, - attribute=True, + maxvalue=5, ), Int('krbpwdminlength?', cli_name='minlength', label=_('Min length'), doc=_('Minimum length of password'), minvalue=0, - attribute=True, - ), - ) - -api.register(pwpolicy) - - -class pwpolicy_add(crud.Create): - """ - Create a new password policy associated with a group. - """ - - msg_summary = _('Added policy for group "%(value)s"') - - takes_options = ( - Str('group', - label=_('Group'), - doc=_('Group to set policy for'), - attribute=False, ), Int('cospriority', cli_name='priority', label=_('Priority'), - doc=_('Priority of the policy (higher number equals lower priority)'), + doc=_('Priority of the policy (higher number means lower priority'), minvalue=0, - attribute=True, ), ) - def execute(self, *args, **options): - ldap = self.api.Backend.ldap2 - - group_cn = options['group'] - - if 'cospriority' in options: - if not unique_priority(ldap, options['cospriority']): - raise errors.ValidationError(name='priority', error=_('Priority must be a unique value.')) - - # Create the CoS template - (cos_dn, cos_entry) = make_cos_entry(group_cn, options.get('cospriority', None)) - if 'cospriority' in options: - del options['cospriority'] - - # Create the new password policy - policy_entry = self.args_options_2_entry(*args, **options) - (policy_dn, policy_entry) = make_policy_entry(group_cn, policy_entry) - _convert_time_on_input(policy_entry) + def get_dn(self, *keys, **options): + if keys[-1] is not None: + return self.backend.make_dn_from_attr( + self.primary_key.name, keys[-1], self.container_dn + ) + return self.api.env.container_accounts + + def convert_time_for_output(self, entry_attrs, **options): + # Convert seconds to hours and days for displaying to user + if not options.get('raw', False): + if 'krbmaxpwdlife' in entry_attrs: + entry_attrs['krbmaxpwdlife'][0] = unicode( + int(entry_attrs['krbmaxpwdlife'][0]) / 86400 + ) + if 'krbminpwdlife' in entry_attrs: + entry_attrs['krbminpwdlife'][0] = unicode( + int(entry_attrs['krbminpwdlife'][0]) / 3600 + ) - # Link the two entries together - cos_entry['krbpwdpolicyreference'] = policy_dn + def convert_time_on_input(self, entry_attrs): + # Convert hours and days to seconds for writing to LDAP + if 'krbmaxpwdlife' in entry_attrs and entry_attrs['krbmaxpwdlife']: + entry_attrs['krbmaxpwdlife'] = entry_attrs['krbmaxpwdlife'] * 86400 + if 'krbminpwdlife' in entry_attrs and entry_attrs['krbminpwdlife']: + entry_attrs['krbminpwdlife'] = entry_attrs['krbminpwdlife'] * 3600 - ldap.add_entry(policy_dn, policy_entry) - ldap.add_entry(cos_dn, cos_entry) +api.register(pwpolicy) - # The policy is what is interesting, return that - (dn, entry_attrs) = ldap.get_entry(policy_dn, policy_entry.keys()) - _convert_time_for_output(entry_attrs) +class pwpolicy_add(LDAPCreate): + """ + Create new group password policy. + """ + def get_args(self): + yield self.obj.primary_key.clone(attribute=True, required=True) - entry_attrs['dn'] = dn - return dict(result=entry_attrs, value=group_cn) + def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): + self.api.Command.cosentry_add( + keys[-1], krbpwdpolicyreference=dn, + cospriority=options.get('cospriority') + ) + if 'cospriority' in entry_attrs: + del entry_attrs['cospriority'] + self.obj.convert_time_on_input(entry_attrs) + return dn + + def post_callback(self, ldap, dn, entry_attrs, *keys, **options): + self.log.info('%r' % entry_attrs) + if not options.get('raw', False): + if options.get('cospriority') is not None: + entry_attrs['cospriority'] = [unicode(options['cospriority'])] + self.obj.convert_time_for_output(entry_attrs, **options) + return dn api.register(pwpolicy_add) -class pwpolicy_mod(crud.Update): +class pwpolicy_del(LDAPDelete): """ - Modify password policy. + Delete group password policy. """ - msg_summary = _('Modified policy for group "%(value)s"') - takes_options = ( - Str('group?', - label=_('Group'), - doc=_('Group to set policy for'), - attribute=False, - ), - Int('cospriority?', - cli_name='priority', - label=_('Priority'), - doc=_('Priority of the policy (higher number equals lower priority)'), - minvalue=0, - attribute=True, - ), - ) + def get_args(self): + yield self.obj.primary_key.clone(attribute=True, required=True) - def execute(self, *args, **options): - assert 'dn' not in options - ldap = self.api.Backend.ldap2 - cospriority = None - - if 'group' in options: - group_cn = options['group'] - del options['group'] - else: - group_cn = None - if len(options) == 2: # 'all' and 'raw' are always sent - raise errors.EmptyModlist() - - if not group_cn: - group_cn = _global - if 'cospriority' in options: - raise errors.ValidationError(name='priority', error=_('priority cannot be set on global policy')) - dn = self.api.env.container_accounts - entry_attrs = self.args_options_2_entry(*args, **options) - else: - if 'cospriority' in options: - if options['cospriority'] is None: - raise errors.RequirementError(name='priority') - if not unique_priority(ldap, options['cospriority']): - raise errors.ValidationError(name='priority', error=_('Priority must be a unique value.')) - groupdn = find_group_dn(group_cn) - cos_dn = ldap.make_dn_from_attr( - 'cn', groupdn, - 'cn=cosTemplates,%s' % self.api.env.container_accounts - ) - ldap.update_entry(cos_dn, dict(cospriority = options['cospriority'])) - cospriority = options['cospriority'] - del options['cospriority'] - entry_attrs = self.args_options_2_entry(*args, **options) - (dn, entry_attrs) = make_policy_entry(group_cn, entry_attrs) - _convert_time_on_input(entry_attrs) + def post_callback(self, ldap, dn, *keys, **options): try: - ldap.update_entry(dn, entry_attrs) - except errors.EmptyModlist: + self.api.Command.cosentry_del(keys[-1]) + except errors.NotFound: pass + return True - (dn, entry_attrs) = ldap.get_entry(dn, entry_attrs.keys()) - - if cospriority: - entry_attrs['cospriority'] = cospriority - - _convert_time_for_output(entry_attrs) - - return dict(result=entry_attrs, value=group_cn) - -api.register(pwpolicy_mod) +api.register(pwpolicy_del) -class pwpolicy_del(crud.Delete): +class pwpolicy_mod(LDAPUpdate): """ - Delete a group password policy. + Modify group password policy. """ + def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): + if options.get('cospriority') is not None: + if keys[-1] is None: + raise errors.ValidationError( + name='priority', + error=_('priority cannot be set on global policy') + ) + try: + self.api.Command.cosentry_mod( + keys[-1], cospriority=options['cospriority'] + ) + except errors.NotFound: + self.api.Command.cosentry_add( + keys[-1], krbpwdpolicyreference=dn, + cospriority=options['cospriority'] + ) + del entry_attrs['cospriority'] + self.obj.convert_time_on_input(entry_attrs) + return dn + + def post_callback(self, ldap, dn, entry_attrs, *keys, **options): + if not options.get('raw', False): + if options.get('cospriority') is not None: + entry_attrs['cospriority'] = [unicode(options['cospriority'])] + if keys[-1] is None: + entry_attrs['cn'] = GLOBAL_POLICY_NAME + self.obj.convert_time_for_output(entry_attrs, **options) + return dn + + def exc_callback(self, keys, options, exc, call_func, *call_args, **call_kwargs): + if isinstance(exc, errors.EmptyModlist): + entry_attrs = call_args[1] + if not entry_attrs and 'cospriority' in options: + return + raise exc - msg_summary = _('Deleted policy for group "%(value)s"') - takes_options = ( - Str('group', - doc=_('Group to remove policy from'), - ), - ) - - def execute(self, *args, **options): - assert 'dn' not in options - ldap = self.api.Backend.ldap2 - - group_cn = options['group'] - - # Get the DN of the CoS template to delete - try: - (cos_dn, cos_entry) = make_cos_entry(group_cn, None) - except errors.NotFound: - # Ok, perhaps the group was deleted, try to make the group DN - rdn = ldap.make_rdn_from_attr('cn', group_cn) - group_dn = ldap.make_dn_from_rdn(rdn, api.env.container_group) - cos_dn = ldap.make_dn_from_attr( - 'cn', group_dn, - 'cn=cosTemplates,%s' % self.api.env.container_accounts - ) - policy_entry = self.args_options_2_entry(*args, **options) - (policy_dn, policy_entry) = make_policy_entry(group_cn, policy_entry) - ldap.delete_entry(policy_dn) - ldap.delete_entry(cos_dn) - return dict( - result=True, - value=group_cn, - ) - -api.register(pwpolicy_del) +api.register(pwpolicy_mod) -class pwpolicy_show(Method): +class pwpolicy_show(LDAPRetrieve): """ - Display password policy. + Display group password policy. """ - - has_output = ( - output.Entry('result'), - ) takes_options = ( - Str('group?', - label=_('Group'), - doc=_('Group to display policy'), - ), Str('user?', label=_('User'), - doc=_('Display policy applied to a given user'), - ), - Int('cospriority?', - cli_name='priority', - label=_('Priority'), - flags=['no_create', 'no_update', 'no_search'], + doc=_('Display effective policy for a specific user'), ), ) - def execute(self, *args, **options): - ldap = self.api.Backend.ldap2 - - dn = None - group = None - - if 'user' in options: - rdn = ldap.make_rdn_from_attr('uid', options['user']) - user_dn = ldap.make_dn_from_rdn(rdn, api.env.container_user) - try: - (user_dn, user_attrs) = ldap.get_entry(user_dn, ['krbpwdpolicyreference']) - if 'krbpwdpolicyreference' in user_attrs: - dn = user_attrs['krbpwdpolicyreference'][0] - rdn = explode_dn(dn) - group = rdn[0].replace('cn=','') - except errors.NotFound: - raise errors.NotFound(reason="user '%s' not found" % options['user']) - - if dn is None: - if not 'group' in options: - dn = self.api.env.container_accounts + def pre_callback(self, ldap, dn, attrs_list, *keys, **options): + if options.get('user') is not None: + user_entry = self.api.Command.user_show( + options['user'], all=True + )['result'] + if 'krbpwdpolicyreference' in user_entry: + return user_entry.get('krbpwdpolicyreference', [dn])[0] + return dn + + def post_callback(self, ldap, dn, entry_attrs, *keys, **options): + if not options.get('raw', False): + if keys[-1] is not None: + try: + cos_entry = self.api.Command.cosentry_show( + keys[-1] + )['result'] + if cos_entry.get('cospriority') is not None: + entry_attrs['cospriority'] = cos_entry['cospriority'] + except errors.NotFound: + pass else: - policy_entry = self.args_options_2_entry(*args, **options) - (dn, policy_entry) = make_policy_entry(options['group'], policy_entry) - (dn, entry_attrs) = ldap.get_entry(dn) - - if 'group' in options: - groupdn = find_group_dn(options['group']) - cos_dn = ldap.make_dn_from_attr( - 'cn', groupdn, - 'cn=cosTemplates,%s' % self.api.env.container_accounts - ) - (dn, cos_attrs) = ldap.get_entry(cos_dn) - entry_attrs['cospriority'] = cos_attrs['cospriority'] - else: - entry_attrs['cn'] = _global - - if 'user' in options: - if group: - entry_attrs['cn'] = unicode(group) - _convert_time_for_output(entry_attrs) - - return dict(result=entry_attrs) + entry_attrs['cn'] = GLOBAL_POLICY_NAME + self.obj.convert_time_for_output(entry_attrs, **options) + return dn api.register(pwpolicy_show) -class pwpolicy_find(Method): + +class pwpolicy_find(LDAPSearch): """ - Display all groups with a password policy. + Search for group password policies. """ - - has_output = output.standard_list_of_entries - - takes_options = ( - Int('cospriority?', - cli_name='priority', - label=_('Priority'), - flags=['no_create', 'no_update', 'no_search'], - ), - ) - - def execute(self, *args, **options): - ldap = self.api.Backend.ldap2 - - (entries, truncated) = find_group_policy(ldap) - for e in entries: - _convert_time_for_output(e[1]) - e[1]['dn'] = e[0] - groupdn = find_group_dn(e[1]['cn'][0]) - cos_dn = ldap.make_dn_from_attr( - 'cn', groupdn, - 'cn=cosTemplates,%s' % self.api.env.container_accounts - ) - (dn, cos_attrs) = ldap.get_entry(cos_dn) - e[1]['cospriority'] = cos_attrs['cospriority'] - entries = tuple(e for (dn, e) in entries) - - return dict(result=entries, - count=len(entries), - truncated=truncated, - ) + def post_callback(self, ldap, entries, truncated, *args, **options): + if not options.get('raw', False): + for e in entries: + try: + cos_entry = self.api.Command.cosentry_show( + e[1]['cn'][0] + )['result'] + if cos_entry.get('cospriority') is not None: + e[1]['cospriority'] = cos_entry['cospriority'] + except errors.NotFound: + pass + self.obj.convert_time_for_output(e[1], **options) + if not args[-1]: + global_entry = self.api.Command.pwpolicy_show( + all=options.get('all', False), raw=options.get('raw', False) + )['result'] + dn = global_entry['dn'] + del global_entry['dn'] + entries.insert(0, (dn, global_entry)) api.register(pwpolicy_find) + diff --git a/ipalib/plugins/pwpolicy2.py b/ipalib/plugins/pwpolicy2.py deleted file mode 100644 index d67229276..000000000 --- a/ipalib/plugins/pwpolicy2.py +++ /dev/null @@ -1,359 +0,0 @@ -# Authors: -# Pavel Zuna <pzuna@redhat.com> -# -# Copyright (C) 2010 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -""" -Password policy -""" - -from ipalib import api -from ipalib import Int, Str -from ipalib.plugins.baseldap import * -from ipalib import _ - - -class cosentry(LDAPObject): - """ - Class of Service object used for linking policies with groups - """ - INTERNAL = True - - container_dn = 'cn=costemplates,%s' % api.env.container_accounts - object_class = ['top', 'costemplate', 'extensibleobject', 'krbcontainer'] - default_attributes = ['cn', 'cospriority', 'krbpwdpolicyreference'] - - takes_params = ( - Str('cn', primary_key=True), - Str('krbpwdpolicyreference'), - Int('cospriority', minvalue=0), - ) - - priority_not_unique_msg = _( - 'priority must be a unique value (%(prio)d already used by %(gname)s)' - ) - - def get_dn(self, *keys, **options): - group_dn = self.api.Object.group.get_dn(keys[-1]) - return self.backend.make_dn_from_attr( - 'cn', group_dn, self.container_dn - ) - - def check_priority_uniqueness(self, *keys, **options): - if options.get('cospriority') is not None: - entries = self.methods.find( - cospriority=options['cospriority'] - )['result'] - if len(entries) > 0: - group_name = self.api.Object.group.get_primary_key_from_dn( - entries[0]['cn'][0] - ) - raise errors.ValidationError( - name='priority', - error=self.priority_not_unique_msg % { - 'prio': options['cospriority'], - 'gname': group_name, - } - ) - -api.register(cosentry) - - -class cosentry_add(LDAPCreate): - INTERNAL = True - - def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): - # check for existence of the group - self.api.Command.group_show(keys[-1]) - self.obj.check_priority_uniqueness(*keys, **options) - del entry_attrs['cn'] - return dn - -api.register(cosentry_add) - - -class cosentry_del(LDAPDelete): - INTERNAL = True - -api.register(cosentry_del) - - -class cosentry_mod(LDAPUpdate): - INTERNAL = True - - def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): - self.obj.check_priority_uniqueness(*keys, **options) - return dn - -api.register(cosentry_mod) - - -class cosentry_show(LDAPRetrieve): - INTERNAL = True - -api.register(cosentry_show) - - -class cosentry_find(LDAPSearch): - INTERNAL = True - -api.register(cosentry_find) - - -GLOBAL_POLICY_NAME = u'GLOBAL' - - -class pwpolicy2(LDAPObject): - """ - Password Policy object - """ - container_dn = 'cn=%s,cn=kerberos' % api.env.realm - object_name = 'password policy' - object_name_plural = 'password policies' - object_class = ['top', 'nscontainer', 'krbpwdpolicy'] - default_attributes = [ - 'cn', 'cospriority', 'krbmaxpwdlife', 'krbminpwdlife', - 'krbpwdhistorylength', 'krbpwdmindiffchars', 'krbpwdminlength', - ] - - takes_params = ( - Str('cn?', - cli_name='group', - label=_('Group'), - doc=_('Manage password policy for specific group'), - primary_key=True, - ), - Int('krbmaxpwdlife?', - cli_name='maxlife', - label=_('Max lifetime (days)'), - doc=_('Maximum password lifetime (in days)'), - minvalue=0, - ), - Int('krbminpwdlife?', - cli_name='minlife', - label=_('Min lifetime (hours)'), - doc=_('Minimum password lifetime (in hours)'), - minvalue=0, - ), - Int('krbpwdhistorylength?', - cli_name='history', - label=_('History size'), - doc=_('Password history size'), - minvalue=0, - ), - Int('krbpwdmindiffchars?', - cli_name='minclasses', - label=_('Character classes'), - doc=_('Minimum number of character classes'), - minvalue=0, - maxvalue=5, - ), - Int('krbpwdminlength?', - cli_name='minlength', - label=_('Min length'), - doc=_('Minimum length of password'), - minvalue=0, - ), - Int('cospriority', - cli_name='priority', - label=_('Priority'), - doc=_('Priority of the policy (higher number means lower priority'), - minvalue=0, - ), - ) - - def get_dn(self, *keys, **options): - if keys[-1] is not None: - return self.backend.make_dn_from_attr( - self.primary_key.name, keys[-1], self.container_dn - ) - return self.api.env.container_accounts - - def convert_time_for_output(self, entry_attrs, **options): - # Convert seconds to hours and days for displaying to user - if not options.get('raw', False): - if 'krbmaxpwdlife' in entry_attrs: - entry_attrs['krbmaxpwdlife'][0] = unicode( - int(entry_attrs['krbmaxpwdlife'][0]) / 86400 - ) - if 'krbminpwdlife' in entry_attrs: - entry_attrs['krbminpwdlife'][0] = unicode( - int(entry_attrs['krbminpwdlife'][0]) / 3600 - ) - - def convert_time_on_input(self, entry_attrs): - # Convert hours and days to seconds for writing to LDAP - if 'krbmaxpwdlife' in entry_attrs and entry_attrs['krbmaxpwdlife']: - entry_attrs['krbmaxpwdlife'] = entry_attrs['krbmaxpwdlife'] * 86400 - if 'krbminpwdlife' in entry_attrs and entry_attrs['krbminpwdlife']: - entry_attrs['krbminpwdlife'] = entry_attrs['krbminpwdlife'] * 3600 - -api.register(pwpolicy2) - - -class pwpolicy2_add(LDAPCreate): - """ - Create new group password policy. - """ - def get_args(self): - yield self.obj.primary_key.clone(attribute=True, required=True) - - def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): - self.api.Command.cosentry_add( - keys[-1], krbpwdpolicyreference=dn, - cospriority=options.get('cospriority') - ) - if 'cospriority' in entry_attrs: - del entry_attrs['cospriority'] - self.obj.convert_time_on_input(entry_attrs) - return dn - - def post_callback(self, ldap, dn, entry_attrs, *keys, **options): - self.log.info('%r' % entry_attrs) - if not options.get('raw', False): - if options.get('cospriority') is not None: - entry_attrs['cospriority'] = [unicode(options['cospriority'])] - self.obj.convert_time_for_output(entry_attrs, **options) - return dn - -api.register(pwpolicy2_add) - - -class pwpolicy2_del(LDAPDelete): - """ - Delete group password policy. - """ - def get_args(self): - yield self.obj.primary_key.clone(attribute=True, required=True) - - def post_callback(self, ldap, dn, *keys, **options): - try: - self.api.Command.cosentry_del(keys[-1]) - except errors.NotFound: - pass - return True - -api.register(pwpolicy2_del) - - -class pwpolicy2_mod(LDAPUpdate): - """ - Modify group password policy. - """ - def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): - if options.get('cospriority') is not None: - if keys[-1] is None: - raise errors.ValidationError( - name='priority', - error=_('priority cannot be set on global policy') - ) - try: - self.api.Command.cosentry_mod( - keys[-1], cospriority=options['cospriority'] - ) - except errors.NotFound: - self.api.Command.cosentry_add( - keys[-1], krbpwdpolicyreference=dn, - cospriority=options['cospriority'] - ) - del entry_attrs['cospriority'] - self.obj.convert_time_on_input(entry_attrs) - return dn - - def post_callback(self, ldap, dn, entry_attrs, *keys, **options): - if not options.get('raw', False): - if options.get('cospriority') is not None: - entry_attrs['cospriority'] = [unicode(options['cospriority'])] - if keys[-1] is None: - entry_attrs['cn'] = GLOBAL_POLICY_NAME - self.obj.convert_time_for_output(entry_attrs, **options) - return dn - - def exc_callback(self, keys, options, exc, call_func, *call_args, **call_kwargs): - if isinstance(exc, errors.EmptyModlist): - entry_attrs = call_args[1] - if not entry_attrs and 'cospriority' in options: - return - raise exc - -api.register(pwpolicy2_mod) - - -class pwpolicy2_show(LDAPRetrieve): - """ - Display group password policy. - """ - takes_options = ( - Str('user?', - label=_('User'), - doc=_('Display effective policy for a specific user'), - ), - ) - - def pre_callback(self, ldap, dn, attrs_list, *keys, **options): - if options.get('user') is not None: - user_entry = self.api.Command.user_show( - options['user'], all=True - )['result'] - if 'krbpwdpolicyreference' in user_entry: - return user_entry.get('krbpwdpolicyreference', [dn])[0] - return dn - - def post_callback(self, ldap, dn, entry_attrs, *keys, **options): - if not options.get('raw', False): - if keys[-1] is not None: - try: - cos_entry = self.api.Command.cosentry_show( - keys[-1] - )['result'] - if cos_entry.get('cospriority') is not None: - entry_attrs['cospriority'] = cos_entry['cospriority'] - except errors.NotFound: - pass - else: - entry_attrs['cn'] = GLOBAL_POLICY_NAME - self.obj.convert_time_for_output(entry_attrs, **options) - return dn - -api.register(pwpolicy2_show) - - -class pwpolicy2_find(LDAPSearch): - """ - Search for group password policies. - """ - def post_callback(self, ldap, entries, truncated, *args, **options): - if not options.get('raw', False): - for e in entries: - try: - cos_entry = self.api.Command.cosentry_show( - e[1]['cn'][0] - )['result'] - if cos_entry.get('cospriority') is not None: - e[1]['cospriority'] = cos_entry['cospriority'] - except errors.NotFound: - pass - self.obj.convert_time_for_output(e[1], **options) - if not args[-1]: - global_entry = self.api.Command.pwpolicy2_show( - all=options.get('all', False), raw=options.get('raw', False) - )['result'] - dn = global_entry['dn'] - del global_entry['dn'] - entries.insert(0, (dn, global_entry)) - -api.register(pwpolicy2_find) - diff --git a/tests/test_xmlrpc/test_pwpolicy.py b/tests/test_xmlrpc/test_pwpolicy.py index 1a34c02ae..94063c568 100644 --- a/tests/test_xmlrpc/test_pwpolicy.py +++ b/tests/test_xmlrpc/test_pwpolicy.py @@ -1,7 +1,8 @@ # Authors: # Rob Crittenden <rcritten@redhat.com> +# Pavel Zuna <pzuna@redhat.com> # -# Copyright (C) 2009 Red Hat +# Copyright (C) 2010 Red Hat # see file 'COPYING' for use and warranty information # # This program is free software; you can redistribute it and/or @@ -30,11 +31,11 @@ class test_pwpolicy(XMLRPC_test): """ Test the `pwpolicy` plugin. """ - group = u'testgroup1' - group2 = u'testgroup2' - user = u'testuser1' - kw = {'group': group, 'cospriority': 1, 'krbminpwdlife': 30, 'krbmaxpwdlife': 40, 'krbpwdhistorylength': 5, 'krbpwdminlength': 6 } - kw2 = {'group': group2, 'cospriority': 2, 'krbminpwdlife': 40, 'krbmaxpwdlife': 60, 'krbpwdhistorylength': 8, 'krbpwdminlength': 9 } + group = u'testgroup12' + group2 = u'testgroup22' + user = u'testuser12' + kw = {'cospriority': 1, 'krbminpwdlife': 30, 'krbmaxpwdlife': 40, 'krbpwdhistorylength': 5, 'krbpwdminlength': 6 } + kw2 = {'cospriority': 2, 'krbminpwdlife': 40, 'krbmaxpwdlife': 60, 'krbpwdhistorylength': 8, 'krbpwdminlength': 9 } def test_1_pwpolicy_add(self): """ @@ -49,21 +50,22 @@ class test_pwpolicy(XMLRPC_test): ) api.Command.group_add_member(self.group, users=self.user) - entry = api.Command['pwpolicy_add'](**self.kw)['result'] + entry = api.Command['pwpolicy_add'](self.group, **self.kw)['result'] assert_attr_equal(entry, 'krbminpwdlife', '30') assert_attr_equal(entry, 'krbmaxpwdlife', '40') assert_attr_equal(entry, 'krbpwdhistorylength', '5') assert_attr_equal(entry, 'krbpwdminlength', '6') + assert_attr_equal(entry, 'cospriority', '1') def test_2_pwpolicy_add(self): """ - Add a policy with a duplicate priority + Add a policy with a already used priority. - The priority validation is done first so it's ok that the group + The priority validation is done first, so it's OK that the group is the same here. """ try: - api.Command['pwpolicy_add'](**self.kw) + api.Command['pwpolicy_add'](self.group, **self.kw) except errors.ValidationError: pass else: @@ -71,12 +73,12 @@ class test_pwpolicy(XMLRPC_test): def test_3_pwpolicy_add(self): """ - Add a policy that already exists + Add a policy that already exists. """ try: # cospriority needs to be unique self.kw['cospriority'] = 3 - api.Command['pwpolicy_add'](**self.kw) + api.Command['pwpolicy_add'](self.group, **self.kw) except errors.DuplicateEntry: pass else: @@ -89,18 +91,19 @@ class test_pwpolicy(XMLRPC_test): self.failsafe_add( api.Object.group, self.group2, description=u'pwpolicy test group 2' ) - entry = api.Command['pwpolicy_add'](**self.kw2)['result'] + entry = api.Command['pwpolicy_add'](self.group2, **self.kw2)['result'] assert_attr_equal(entry, 'krbminpwdlife', '40') assert_attr_equal(entry, 'krbmaxpwdlife', '60') assert_attr_equal(entry, 'krbpwdhistorylength', '8') assert_attr_equal(entry, 'krbpwdminlength', '9') + assert_attr_equal(entry, 'cospriority', '2') def test_5_pwpolicy_add(self): """ Add a pwpolicy for a non-existent group """ try: - api.Command['pwpolicy_add'](group=u'nopwpolicy',cospriority=4,krbminpwdlife=1) + api.Command['pwpolicy_add'](u'nopwpolicy', cospriority=1, krbminpwdlife=1) except errors.NotFound: pass else: @@ -121,11 +124,12 @@ class test_pwpolicy(XMLRPC_test): """ Test the `xmlrpc.pwpolicy_show` method. """ - entry = api.Command['pwpolicy_show'](group=self.group)['result'] + entry = api.Command['pwpolicy_show'](self.group)['result'] assert_attr_equal(entry, 'krbminpwdlife', '30') assert_attr_equal(entry, 'krbmaxpwdlife', '40') assert_attr_equal(entry, 'krbpwdhistorylength', '5') assert_attr_equal(entry, 'krbpwdminlength', '6') + assert_attr_equal(entry, 'cospriority', '1') def test_8_pwpolicy_mod(self): """ @@ -142,28 +146,17 @@ class test_pwpolicy(XMLRPC_test): """ Test the `xmlrpc.pwpolicy_mod` method. """ - entry = api.Command['pwpolicy_mod'](group=self.group, krbminpwdlife=50)['result'] + entry = api.Command['pwpolicy_mod'](self.group, krbminpwdlife=50)['result'] assert_attr_equal(entry, 'krbminpwdlife', '50') - def test_a_pwpolicy_mod(self): - """ - Test `xmlrpc.pwpolicy_mod` with a duplicate priority - """ - try: - api.Command['pwpolicy_mod'](group=self.group, cospriority=1) - except errors.ValidationError: - pass - else: - assert False - - def test_b_pwpolicy_del(self): + def test_a_pwpolicy_del(self): """ Test the `xmlrpc.pwpolicy_del` method. """ - assert api.Command['pwpolicy_del'](group=self.group)['result'] is True + assert api.Command['pwpolicy_del'](self.group)['result'] is True # Verify that it is gone try: - api.Command['pwpolicy_show'](group=self.group) + api.Command['pwpolicy_show'](self.group) except errors.NotFound: pass else: @@ -175,3 +168,4 @@ class test_pwpolicy(XMLRPC_test): # Remove the user we created api.Command['user_del'](self.user) + diff --git a/tests/test_xmlrpc/test_pwpolicy2.py b/tests/test_xmlrpc/test_pwpolicy2.py deleted file mode 100644 index 72c65beda..000000000 --- a/tests/test_xmlrpc/test_pwpolicy2.py +++ /dev/null @@ -1,171 +0,0 @@ -# Authors: -# Rob Crittenden <rcritten@redhat.com> -# Pavel Zuna <pzuna@redhat.com> -# -# Copyright (C) 2010 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -""" -Test the `ipalib/plugins/pwpolicy2.py` module. -""" - -import sys -from xmlrpc_test import XMLRPC_test, assert_attr_equal -from ipalib import api -from ipalib import errors - - -class test_pwpolicy2(XMLRPC_test): - """ - Test the `pwpolicy2` plugin. - """ - group = u'testgroup12' - group2 = u'testgroup22' - user = u'testuser12' - kw = {'cospriority': 1, 'krbminpwdlife': 30, 'krbmaxpwdlife': 40, 'krbpwdhistorylength': 5, 'krbpwdminlength': 6 } - kw2 = {'cospriority': 2, 'krbminpwdlife': 40, 'krbmaxpwdlife': 60, 'krbpwdhistorylength': 8, 'krbpwdminlength': 9 } - - def test_1_pwpolicy2_add(self): - """ - Test adding a per-group policy using the `xmlrpc.pwpolicy2_add` method. - """ - # First set up a group and user that will use this policy - self.failsafe_add( - api.Object.group, self.group, description=u'pwpolicy test group', - ) - self.failsafe_add( - api.Object.user, self.user, givenname=u'Test', sn=u'User' - ) - api.Command.group_add_member(self.group, users=self.user) - - entry = api.Command['pwpolicy2_add'](self.group, **self.kw)['result'] - assert_attr_equal(entry, 'krbminpwdlife', '30') - assert_attr_equal(entry, 'krbmaxpwdlife', '40') - assert_attr_equal(entry, 'krbpwdhistorylength', '5') - assert_attr_equal(entry, 'krbpwdminlength', '6') - assert_attr_equal(entry, 'cospriority', '1') - - def test_2_pwpolicy2_add(self): - """ - Add a policy with a already used priority. - - The priority validation is done first, so it's OK that the group - is the same here. - """ - try: - api.Command['pwpolicy2_add'](self.group, **self.kw) - except errors.ValidationError: - pass - else: - assert False - - def test_3_pwpolicy2_add(self): - """ - Add a policy that already exists. - """ - try: - # cospriority needs to be unique - self.kw['cospriority'] = 3 - api.Command['pwpolicy2_add'](self.group, **self.kw) - except errors.DuplicateEntry: - pass - else: - assert False - - def test_4_pwpolicy2_add(self): - """ - Test adding another per-group policy using the `xmlrpc.pwpolicy2_add` method. - """ - self.failsafe_add( - api.Object.group, self.group2, description=u'pwpolicy test group 2' - ) - entry = api.Command['pwpolicy2_add'](self.group2, **self.kw2)['result'] - assert_attr_equal(entry, 'krbminpwdlife', '40') - assert_attr_equal(entry, 'krbmaxpwdlife', '60') - assert_attr_equal(entry, 'krbpwdhistorylength', '8') - assert_attr_equal(entry, 'krbpwdminlength', '9') - assert_attr_equal(entry, 'cospriority', '2') - - def test_5_pwpolicy2_add(self): - """ - Add a pwpolicy for a non-existent group - """ - try: - api.Command['pwpolicy2_add'](u'nopwpolicy', cospriority=1, krbminpwdlife=1) - except errors.NotFound: - pass - else: - assert False - - def test_6_pwpolicy2_show(self): - """ - Test the `xmlrpc.pwpolicy2_show` method with global policy. - """ - entry = api.Command['pwpolicy2_show']()['result'] - # Note that this assumes an unchanged global policy - assert_attr_equal(entry, 'krbminpwdlife', '1') - assert_attr_equal(entry, 'krbmaxpwdlife', '90') - assert_attr_equal(entry, 'krbpwdhistorylength', '0') - assert_attr_equal(entry, 'krbpwdminlength', '8') - - def test_7_pwpolicy2_show(self): - """ - Test the `xmlrpc.pwpolicy2_show` method. - """ - entry = api.Command['pwpolicy2_show'](self.group)['result'] - assert_attr_equal(entry, 'krbminpwdlife', '30') - assert_attr_equal(entry, 'krbmaxpwdlife', '40') - assert_attr_equal(entry, 'krbpwdhistorylength', '5') - assert_attr_equal(entry, 'krbpwdminlength', '6') - assert_attr_equal(entry, 'cospriority', '1') - - def test_8_pwpolicy2_mod(self): - """ - Test the `xmlrpc.pwpolicy2_mod` method for global policy. - """ - entry = api.Command['pwpolicy2_mod'](krbminpwdlife=50)['result'] - assert_attr_equal(entry, 'krbminpwdlife', '50') - - # Great, now change it back - entry = api.Command['pwpolicy2_mod'](krbminpwdlife=1)['result'] - assert_attr_equal(entry, 'krbminpwdlife', '1') - - def test_9_pwpolicy2_mod(self): - """ - Test the `xmlrpc.pwpolicy2_mod` method. - """ - entry = api.Command['pwpolicy2_mod'](self.group, krbminpwdlife=50)['result'] - assert_attr_equal(entry, 'krbminpwdlife', '50') - - def test_a_pwpolicy_del(self): - """ - Test the `xmlrpc.pwpolicy2_del` method. - """ - assert api.Command['pwpolicy2_del'](self.group)['result'] is True - # Verify that it is gone - try: - api.Command['pwpolicy2_show'](self.group) - except errors.NotFound: - pass - else: - assert False - - # Remove the groups we created - api.Command['group_del'](self.group) - api.Command['group_del'](self.group2) - - # Remove the user we created - api.Command['user_del'](self.user) - |