From 542768bec7bd8b0b33d5161e560d591780e11e70 Mon Sep 17 00:00:00 2001 From: Rob Crittenden Date: Fri, 14 May 2010 15:58:34 -0400 Subject: Replace old pwpolicy plugin with new one using baseldap, fix tests. Fix deletion of policy when a group is removed. --- ipalib/plugins/pwpolicy.py | 587 ++++++++++++++++++--------------------------- 1 file changed, 229 insertions(+), 358 deletions(-) (limited to 'ipalib/plugins/pwpolicy.py') diff --git a/ipalib/plugins/pwpolicy.py b/ipalib/plugins/pwpolicy.py index bf8abcf82..71c355959 100644 --- a/ipalib/plugins/pwpolicy.py +++ b/ipalib/plugins/pwpolicy.py @@ -1,8 +1,7 @@ # Authors: -# Rob Crittenden # Pavel Zuna # -# Copyright (C) 2008 Red Hat +# Copyright (C) 2010 Red Hat # see file 'COPYING' for use and warranty information # # This program is free software; you can redistribute it and/or @@ -17,472 +16,344 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - """ Password policy """ -from ipalib import api, crud, errors -from ipalib import Method, Object +from ipalib import api from ipalib import Int, Str -from ipalib import output -from ipalib import _, ngettext -from ldap.functions import explode_dn - -_fields = { - 'group': 'Group policy', - 'krbminpwdlife': 'Minimum lifetime (in hours)', - 'krbmaxpwdlife': 'Maximum lifetime (in days)', - 'krbpwdmindiffchars': 'Minimum number of characters classes', - 'krbpwdminlength': 'Minimum length', - 'krbpwdhistorylength': 'History size', -} - -_global=u'global' - -def _convert_time_for_output(entry_attrs): - # Convert seconds to hours and days for displaying to user - if 'krbmaxpwdlife' in entry_attrs: - entry_attrs['krbmaxpwdlife'][0] = unicode( - int(entry_attrs['krbmaxpwdlife'][0]) / 86400 - ) - if 'krbminpwdlife' in entry_attrs: - entry_attrs['krbminpwdlife'][0] = unicode( - int(entry_attrs['krbminpwdlife'][0]) / 3600 - ) +from ipalib.plugins.baseldap import * +from ipalib import _ -def _convert_time_on_input(entry_attrs): - # Convert hours and days to seconds for writing to LDAP - if 'krbmaxpwdlife' in entry_attrs and entry_attrs['krbmaxpwdlife']: - entry_attrs['krbmaxpwdlife'] = entry_attrs['krbmaxpwdlife'] * 86400 - if 'krbminpwdlife' in entry_attrs and entry_attrs['krbminpwdlife']: - entry_attrs['krbminpwdlife'] = entry_attrs['krbminpwdlife'] * 3600 -def find_group_dn(group): - """ - Given a group name find the DN of that group +class cosentry(LDAPObject): """ - try: - entry = api.Command['group_show'](group)['result'] - except errors.NotFound: - raise errors.NotFound(reason="group '%s' does not exist" % group) - return entry['dn'] - -def make_cos_entry(group, cospriority=None): + Class of Service object used for linking policies with groups """ - Make the CoS dn and entry for this group. + INTERNAL = True - Returns (cos_dn, cos_entry) where: - cos_dn = DN of the new CoS entry - cos_entry = entry representing this new object - """ - ldap = api.Backend.ldap2 + container_dn = 'cn=costemplates,%s' % api.env.container_accounts + object_class = ['top', 'costemplate', 'extensibleobject', 'krbcontainer'] + default_attributes = ['cn', 'cospriority', 'krbpwdpolicyreference'] - groupdn = find_group_dn(group) + takes_params = ( + Str('cn', primary_key=True), + Str('krbpwdpolicyreference'), + Int('cospriority', minvalue=0), + ) - cos_entry = {} - if cospriority: - cos_entry['cospriority'] = cospriority - cos_entry['objectclass'] = ['top', 'costemplate', 'extensibleobject', 'krbcontainer'] - cos_dn = ldap.make_dn_from_attr( - 'cn', groupdn, 'cn=cosTemplates,%s' % api.env.container_accounts + priority_not_unique_msg = _( + 'priority must be a unique value (%(prio)d already used by %(gname)s)' ) - return (cos_dn, cos_entry) + def get_dn(self, *keys, **options): + group_dn = self.api.Object.group.get_dn(keys[-1]) + return self.backend.make_dn_from_attr( + 'cn', group_dn, self.container_dn + ) + def check_priority_uniqueness(self, *keys, **options): + if options.get('cospriority') is not None: + entries = self.methods.find( + cospriority=options['cospriority'] + )['result'] + if len(entries) > 0: + group_name = self.api.Object.group.get_primary_key_from_dn( + entries[0]['cn'][0] + ) + raise errors.ValidationError( + name='priority', + error=self.priority_not_unique_msg % { + 'prio': options['cospriority'], + 'gname': group_name, + } + ) -def make_policy_entry(group_cn, policy_entry): - """ - Make the krbpwdpolicy dn and entry for this group. +api.register(cosentry) - Returns (policy_dn, policy_entry) where: - policy_dn = DN of the new password policy entry - policy_entry = entry representing this new object - """ - ldap = api.Backend.ldap2 - # This DN must *NOT* have spaces between elements - policy_dn = ldap.make_dn_from_attr( - 'cn', api.env.realm, 'cn=kerberos,%s' % api.env.basedn - ) - policy_dn = ldap.make_dn_from_attr('cn', group_cn, policy_dn) +class cosentry_add(LDAPCreate): + INTERNAL = True - # Create the krb password policy entry. This MUST be located - # in the same container as the REALM or the kldap plugin won't - # recognize it. The usual CoS trick of putting the whole DN into - # the dn won't work either because the kldap plugin doesn't like - # quotes in the DN. - policy_entry['objectclass'] = ['top', 'nscontainer', 'krbpwdpolicy'] - policy_entry['cn'] = group_cn + def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): + # check for existence of the group + self.api.Command.group_show(keys[-1]) + self.obj.check_priority_uniqueness(*keys, **options) + del entry_attrs['cn'] + return dn - return (policy_dn, policy_entry) +api.register(cosentry_add) -def find_group_policy(ldap): - """ - Return all group policy entries. - """ - attrs = ('cn','krbminpwdlife', 'krbmaxpwdlife', 'krbpwdmindiffchars', 'krbpwdminlength', 'krbpwdhistorylength',) - attr_filter = ldap.make_filter({'objectclass':'krbpwdpolicy'}, rules=ldap.MATCH_ALL) +class cosentry_del(LDAPDelete): + INTERNAL = True - try: - (entries, truncated) = ldap.find_entries( - attr_filter, attrs, 'cn=%s,cn=kerberos,%s' % (api.env.realm, api.env.basedn), scope=ldap.SCOPE_ONELEVEL - ) - except errors.NotFound: - (entries, truncated) = (tuple(), False) +api.register(cosentry_del) - return (entries, truncated) -def unique_priority(ldap, priority): - """ - Return True if the given priority is unique, False otherwise +class cosentry_mod(LDAPUpdate): + INTERNAL = True - Having two cosPriority with the same value is undefined in the DS. + def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): + self.obj.check_priority_uniqueness(*keys, **options) + return dn - This isn't done as a validation on the attribute since we want it done - only on the server side. - """ - attrs = ('cospriority',) +api.register(cosentry_mod) - attr_filter = ldap.make_filter({'objectclass':'krbcontainer', 'cospriority':priority }, rules=ldap.MATCH_ALL) - try: - (entries, truncated) = ldap.find_entries( - attr_filter, attrs, 'cn=cosTemplates,%s' % api.env.container_accounts, scope=ldap.SCOPE_ONELEVEL - ) - return False - except errors.NotFound: - return True +class cosentry_show(LDAPRetrieve): + INTERNAL = True + +api.register(cosentry_show) - return True -class pwpolicy(Object): +class cosentry_find(LDAPSearch): + INTERNAL = True + +api.register(cosentry_find) + + +GLOBAL_POLICY_NAME = u'GLOBAL' + + +class pwpolicy(LDAPObject): """ - Password Policy object. + Password Policy object """ + container_dn = 'cn=%s,cn=kerberos' % api.env.realm + object_name = 'password policy' + object_name_plural = 'password policies' + object_class = ['top', 'nscontainer', 'krbpwdpolicy'] + default_attributes = [ + 'cn', 'cospriority', 'krbmaxpwdlife', 'krbminpwdlife', + 'krbpwdhistorylength', 'krbpwdmindiffchars', 'krbpwdminlength', + ] takes_params = ( Str('cn?', + cli_name='group', label=_('Group'), - flags=['no_create', 'no_update', 'no_search'], + doc=_('Manage password policy for specific group'), + primary_key=True, ), Int('krbmaxpwdlife?', cli_name='maxlife', label=_('Max lifetime (days)'), doc=_('Maximum password lifetime (in days)'), minvalue=0, - attribute=True, ), Int('krbminpwdlife?', cli_name='minlife', label=_('Min lifetime (hours)'), doc=_('Minimum password lifetime (in hours)'), minvalue=0, - attribute=True, ), Int('krbpwdhistorylength?', cli_name='history', label=_('History size'), doc=_('Password history size'), minvalue=0, - attribute=True, ), Int('krbpwdmindiffchars?', cli_name='minclasses', label=_('Character classes'), doc=_('Minimum number of character classes'), minvalue=0, - attribute=True, + maxvalue=5, ), Int('krbpwdminlength?', cli_name='minlength', label=_('Min length'), doc=_('Minimum length of password'), minvalue=0, - attribute=True, - ), - ) - -api.register(pwpolicy) - - -class pwpolicy_add(crud.Create): - """ - Create a new password policy associated with a group. - """ - - msg_summary = _('Added policy for group "%(value)s"') - - takes_options = ( - Str('group', - label=_('Group'), - doc=_('Group to set policy for'), - attribute=False, ), Int('cospriority', cli_name='priority', label=_('Priority'), - doc=_('Priority of the policy (higher number equals lower priority)'), + doc=_('Priority of the policy (higher number means lower priority'), minvalue=0, - attribute=True, ), ) - def execute(self, *args, **options): - ldap = self.api.Backend.ldap2 - - group_cn = options['group'] - - if 'cospriority' in options: - if not unique_priority(ldap, options['cospriority']): - raise errors.ValidationError(name='priority', error=_('Priority must be a unique value.')) - - # Create the CoS template - (cos_dn, cos_entry) = make_cos_entry(group_cn, options.get('cospriority', None)) - if 'cospriority' in options: - del options['cospriority'] - - # Create the new password policy - policy_entry = self.args_options_2_entry(*args, **options) - (policy_dn, policy_entry) = make_policy_entry(group_cn, policy_entry) - _convert_time_on_input(policy_entry) + def get_dn(self, *keys, **options): + if keys[-1] is not None: + return self.backend.make_dn_from_attr( + self.primary_key.name, keys[-1], self.container_dn + ) + return self.api.env.container_accounts + + def convert_time_for_output(self, entry_attrs, **options): + # Convert seconds to hours and days for displaying to user + if not options.get('raw', False): + if 'krbmaxpwdlife' in entry_attrs: + entry_attrs['krbmaxpwdlife'][0] = unicode( + int(entry_attrs['krbmaxpwdlife'][0]) / 86400 + ) + if 'krbminpwdlife' in entry_attrs: + entry_attrs['krbminpwdlife'][0] = unicode( + int(entry_attrs['krbminpwdlife'][0]) / 3600 + ) - # Link the two entries together - cos_entry['krbpwdpolicyreference'] = policy_dn + def convert_time_on_input(self, entry_attrs): + # Convert hours and days to seconds for writing to LDAP + if 'krbmaxpwdlife' in entry_attrs and entry_attrs['krbmaxpwdlife']: + entry_attrs['krbmaxpwdlife'] = entry_attrs['krbmaxpwdlife'] * 86400 + if 'krbminpwdlife' in entry_attrs and entry_attrs['krbminpwdlife']: + entry_attrs['krbminpwdlife'] = entry_attrs['krbminpwdlife'] * 3600 - ldap.add_entry(policy_dn, policy_entry) - ldap.add_entry(cos_dn, cos_entry) +api.register(pwpolicy) - # The policy is what is interesting, return that - (dn, entry_attrs) = ldap.get_entry(policy_dn, policy_entry.keys()) - _convert_time_for_output(entry_attrs) +class pwpolicy_add(LDAPCreate): + """ + Create new group password policy. + """ + def get_args(self): + yield self.obj.primary_key.clone(attribute=True, required=True) - entry_attrs['dn'] = dn - return dict(result=entry_attrs, value=group_cn) + def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): + self.api.Command.cosentry_add( + keys[-1], krbpwdpolicyreference=dn, + cospriority=options.get('cospriority') + ) + if 'cospriority' in entry_attrs: + del entry_attrs['cospriority'] + self.obj.convert_time_on_input(entry_attrs) + return dn + + def post_callback(self, ldap, dn, entry_attrs, *keys, **options): + self.log.info('%r' % entry_attrs) + if not options.get('raw', False): + if options.get('cospriority') is not None: + entry_attrs['cospriority'] = [unicode(options['cospriority'])] + self.obj.convert_time_for_output(entry_attrs, **options) + return dn api.register(pwpolicy_add) -class pwpolicy_mod(crud.Update): +class pwpolicy_del(LDAPDelete): """ - Modify password policy. + Delete group password policy. """ - msg_summary = _('Modified policy for group "%(value)s"') - takes_options = ( - Str('group?', - label=_('Group'), - doc=_('Group to set policy for'), - attribute=False, - ), - Int('cospriority?', - cli_name='priority', - label=_('Priority'), - doc=_('Priority of the policy (higher number equals lower priority)'), - minvalue=0, - attribute=True, - ), - ) + def get_args(self): + yield self.obj.primary_key.clone(attribute=True, required=True) - def execute(self, *args, **options): - assert 'dn' not in options - ldap = self.api.Backend.ldap2 - cospriority = None - - if 'group' in options: - group_cn = options['group'] - del options['group'] - else: - group_cn = None - if len(options) == 2: # 'all' and 'raw' are always sent - raise errors.EmptyModlist() - - if not group_cn: - group_cn = _global - if 'cospriority' in options: - raise errors.ValidationError(name='priority', error=_('priority cannot be set on global policy')) - dn = self.api.env.container_accounts - entry_attrs = self.args_options_2_entry(*args, **options) - else: - if 'cospriority' in options: - if options['cospriority'] is None: - raise errors.RequirementError(name='priority') - if not unique_priority(ldap, options['cospriority']): - raise errors.ValidationError(name='priority', error=_('Priority must be a unique value.')) - groupdn = find_group_dn(group_cn) - cos_dn = ldap.make_dn_from_attr( - 'cn', groupdn, - 'cn=cosTemplates,%s' % self.api.env.container_accounts - ) - ldap.update_entry(cos_dn, dict(cospriority = options['cospriority'])) - cospriority = options['cospriority'] - del options['cospriority'] - entry_attrs = self.args_options_2_entry(*args, **options) - (dn, entry_attrs) = make_policy_entry(group_cn, entry_attrs) - _convert_time_on_input(entry_attrs) + def post_callback(self, ldap, dn, *keys, **options): try: - ldap.update_entry(dn, entry_attrs) - except errors.EmptyModlist: + self.api.Command.cosentry_del(keys[-1]) + except errors.NotFound: pass + return True - (dn, entry_attrs) = ldap.get_entry(dn, entry_attrs.keys()) - - if cospriority: - entry_attrs['cospriority'] = cospriority - - _convert_time_for_output(entry_attrs) - - return dict(result=entry_attrs, value=group_cn) - -api.register(pwpolicy_mod) +api.register(pwpolicy_del) -class pwpolicy_del(crud.Delete): +class pwpolicy_mod(LDAPUpdate): """ - Delete a group password policy. + Modify group password policy. """ + def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): + if options.get('cospriority') is not None: + if keys[-1] is None: + raise errors.ValidationError( + name='priority', + error=_('priority cannot be set on global policy') + ) + try: + self.api.Command.cosentry_mod( + keys[-1], cospriority=options['cospriority'] + ) + except errors.NotFound: + self.api.Command.cosentry_add( + keys[-1], krbpwdpolicyreference=dn, + cospriority=options['cospriority'] + ) + del entry_attrs['cospriority'] + self.obj.convert_time_on_input(entry_attrs) + return dn + + def post_callback(self, ldap, dn, entry_attrs, *keys, **options): + if not options.get('raw', False): + if options.get('cospriority') is not None: + entry_attrs['cospriority'] = [unicode(options['cospriority'])] + if keys[-1] is None: + entry_attrs['cn'] = GLOBAL_POLICY_NAME + self.obj.convert_time_for_output(entry_attrs, **options) + return dn + + def exc_callback(self, keys, options, exc, call_func, *call_args, **call_kwargs): + if isinstance(exc, errors.EmptyModlist): + entry_attrs = call_args[1] + if not entry_attrs and 'cospriority' in options: + return + raise exc - msg_summary = _('Deleted policy for group "%(value)s"') - takes_options = ( - Str('group', - doc=_('Group to remove policy from'), - ), - ) - - def execute(self, *args, **options): - assert 'dn' not in options - ldap = self.api.Backend.ldap2 - - group_cn = options['group'] - - # Get the DN of the CoS template to delete - try: - (cos_dn, cos_entry) = make_cos_entry(group_cn, None) - except errors.NotFound: - # Ok, perhaps the group was deleted, try to make the group DN - rdn = ldap.make_rdn_from_attr('cn', group_cn) - group_dn = ldap.make_dn_from_rdn(rdn, api.env.container_group) - cos_dn = ldap.make_dn_from_attr( - 'cn', group_dn, - 'cn=cosTemplates,%s' % self.api.env.container_accounts - ) - policy_entry = self.args_options_2_entry(*args, **options) - (policy_dn, policy_entry) = make_policy_entry(group_cn, policy_entry) - ldap.delete_entry(policy_dn) - ldap.delete_entry(cos_dn) - return dict( - result=True, - value=group_cn, - ) - -api.register(pwpolicy_del) +api.register(pwpolicy_mod) -class pwpolicy_show(Method): +class pwpolicy_show(LDAPRetrieve): """ - Display password policy. + Display group password policy. """ - - has_output = ( - output.Entry('result'), - ) takes_options = ( - Str('group?', - label=_('Group'), - doc=_('Group to display policy'), - ), Str('user?', label=_('User'), - doc=_('Display policy applied to a given user'), - ), - Int('cospriority?', - cli_name='priority', - label=_('Priority'), - flags=['no_create', 'no_update', 'no_search'], + doc=_('Display effective policy for a specific user'), ), ) - def execute(self, *args, **options): - ldap = self.api.Backend.ldap2 - - dn = None - group = None - - if 'user' in options: - rdn = ldap.make_rdn_from_attr('uid', options['user']) - user_dn = ldap.make_dn_from_rdn(rdn, api.env.container_user) - try: - (user_dn, user_attrs) = ldap.get_entry(user_dn, ['krbpwdpolicyreference']) - if 'krbpwdpolicyreference' in user_attrs: - dn = user_attrs['krbpwdpolicyreference'][0] - rdn = explode_dn(dn) - group = rdn[0].replace('cn=','') - except errors.NotFound: - raise errors.NotFound(reason="user '%s' not found" % options['user']) - - if dn is None: - if not 'group' in options: - dn = self.api.env.container_accounts + def pre_callback(self, ldap, dn, attrs_list, *keys, **options): + if options.get('user') is not None: + user_entry = self.api.Command.user_show( + options['user'], all=True + )['result'] + if 'krbpwdpolicyreference' in user_entry: + return user_entry.get('krbpwdpolicyreference', [dn])[0] + return dn + + def post_callback(self, ldap, dn, entry_attrs, *keys, **options): + if not options.get('raw', False): + if keys[-1] is not None: + try: + cos_entry = self.api.Command.cosentry_show( + keys[-1] + )['result'] + if cos_entry.get('cospriority') is not None: + entry_attrs['cospriority'] = cos_entry['cospriority'] + except errors.NotFound: + pass else: - policy_entry = self.args_options_2_entry(*args, **options) - (dn, policy_entry) = make_policy_entry(options['group'], policy_entry) - (dn, entry_attrs) = ldap.get_entry(dn) - - if 'group' in options: - groupdn = find_group_dn(options['group']) - cos_dn = ldap.make_dn_from_attr( - 'cn', groupdn, - 'cn=cosTemplates,%s' % self.api.env.container_accounts - ) - (dn, cos_attrs) = ldap.get_entry(cos_dn) - entry_attrs['cospriority'] = cos_attrs['cospriority'] - else: - entry_attrs['cn'] = _global - - if 'user' in options: - if group: - entry_attrs['cn'] = unicode(group) - _convert_time_for_output(entry_attrs) - - return dict(result=entry_attrs) + entry_attrs['cn'] = GLOBAL_POLICY_NAME + self.obj.convert_time_for_output(entry_attrs, **options) + return dn api.register(pwpolicy_show) -class pwpolicy_find(Method): + +class pwpolicy_find(LDAPSearch): """ - Display all groups with a password policy. + Search for group password policies. """ - - has_output = output.standard_list_of_entries - - takes_options = ( - Int('cospriority?', - cli_name='priority', - label=_('Priority'), - flags=['no_create', 'no_update', 'no_search'], - ), - ) - - def execute(self, *args, **options): - ldap = self.api.Backend.ldap2 - - (entries, truncated) = find_group_policy(ldap) - for e in entries: - _convert_time_for_output(e[1]) - e[1]['dn'] = e[0] - groupdn = find_group_dn(e[1]['cn'][0]) - cos_dn = ldap.make_dn_from_attr( - 'cn', groupdn, - 'cn=cosTemplates,%s' % self.api.env.container_accounts - ) - (dn, cos_attrs) = ldap.get_entry(cos_dn) - e[1]['cospriority'] = cos_attrs['cospriority'] - entries = tuple(e for (dn, e) in entries) - - return dict(result=entries, - count=len(entries), - truncated=truncated, - ) + def post_callback(self, ldap, entries, truncated, *args, **options): + if not options.get('raw', False): + for e in entries: + try: + cos_entry = self.api.Command.cosentry_show( + e[1]['cn'][0] + )['result'] + if cos_entry.get('cospriority') is not None: + e[1]['cospriority'] = cos_entry['cospriority'] + except errors.NotFound: + pass + self.obj.convert_time_for_output(e[1], **options) + if not args[-1]: + global_entry = self.api.Command.pwpolicy_show( + all=options.get('all', False), raw=options.get('raw', False) + )['result'] + dn = global_entry['dn'] + del global_entry['dn'] + entries.insert(0, (dn, global_entry)) api.register(pwpolicy_find) + -- cgit