From d1691eee88c5462ef1d015617fd5b65eec0319b9 Mon Sep 17 00:00:00 2001 From: "Thierry bordaz (tbordaz)" Date: Thu, 5 Mar 2015 14:25:33 +0100 Subject: User life cycle: stageuser-add verb Add a accounts plugin (accounts class) that defines variables and methods common to 'users' and 'stageuser'. accounts is a superclass of users/stageuser Add the stageuser plugin, with support of stageuser-add verb. Reviewed By: David Kupka, Martin Basti, Jan Cholasta https://fedorahosted.org/freeipa/ticket/3813 Reviewed-By: Jan Cholasta Reviewed-By: David Kupka --- ipalib/plugins/user.py | 439 +++++-------------------------------------------- 1 file changed, 38 insertions(+), 401 deletions(-) (limited to 'ipalib/plugins/user.py') diff --git a/ipalib/plugins/user.py b/ipalib/plugins/user.py index abe5ee26b..dea946e35 100644 --- a/ipalib/plugins/user.py +++ b/ipalib/plugins/user.py @@ -25,6 +25,12 @@ import os from ipalib import api, errors from ipalib import Flag, Int, Password, Str, Bool, StrEnum, DateTime +from ipalib.plugins.baseuser import baseuser, baseuser_add, baseuser_del, \ + baseuser_mod, baseuser_find, baseuser_show, \ + NO_UPG_MAGIC, UPG_DEFINITION_DN, baseuser_output_params, \ + status_baseuser_output_params, baseuser_pwdchars, \ + validate_nsaccountlock, radius_dn2pk, convert_nsaccountlock, split_principal, validate_principal, \ + normalize_principal, fix_addressbook_permission_bindrule from ipalib.plugable import Registry from ipalib.plugins.baseldap import * from ipalib.plugins import baseldap @@ -85,105 +91,10 @@ EXAMPLES: register = Registry() -NO_UPG_MAGIC = '__no_upg__' - -user_output_params = ( - Flag('has_keytab', - label=_('Kerberos keys available'), - ), - Str('sshpubkeyfp*', - label=_('SSH public key fingerprint'), - ), - ) - -status_output_params = ( - Str('server', - label=_('Server'), - ), - Str('krbloginfailedcount', - label=_('Failed logins'), - ), - Str('krblastsuccessfulauth', - label=_('Last successful authentication'), - ), - Str('krblastfailedauth', - label=_('Last failed authentication'), - ), - Str('now', - label=_('Time now'), - ), - ) - -UPG_DEFINITION_DN = DN(('cn', 'UPG Definition'), - ('cn', 'Definitions'), - ('cn', 'Managed Entries'), - ('cn', 'etc'), - api.env.basedn) - -# characters to be used for generating random user passwords -user_pwdchars = string.digits + string.ascii_letters + '_,.@+-=' - -def validate_nsaccountlock(entry_attrs): - if 'nsaccountlock' in entry_attrs: - nsaccountlock = entry_attrs['nsaccountlock'] - if not isinstance(nsaccountlock, (bool, Bool)): - if not isinstance(nsaccountlock, basestring): - raise errors.OnlyOneValueAllowed(attr='nsaccountlock') - if nsaccountlock.lower() not in ('true', 'false'): - raise errors.ValidationError(name='nsaccountlock', - error=_('must be TRUE or FALSE')) - -def radius_dn2pk(api, entry_attrs): - cl = entry_attrs.get('ipatokenradiusconfiglink', None) - if cl: - pk = api.Object['radiusproxy'].get_primary_key_from_dn(cl[0]) - entry_attrs['ipatokenradiusconfiglink'] = [pk] - -def convert_nsaccountlock(entry_attrs): - if not 'nsaccountlock' in entry_attrs: - entry_attrs['nsaccountlock'] = False - else: - nsaccountlock = Bool('temp') - entry_attrs['nsaccountlock'] = nsaccountlock.convert(entry_attrs['nsaccountlock'][0]) - -def split_principal(principal): - """ - Split the principal into its components and do some basic validation. - Automatically append our realm if it wasn't provided. - """ - realm = None - parts = principal.split('@') - user = parts[0].lower() - if len(parts) > 2: - raise errors.MalformedUserPrincipal(principal=principal) - - if len(parts) == 2: - realm = parts[1].upper() - # At some point we'll support multiple realms - if realm != api.env.realm: - raise errors.RealmMismatch() - else: - realm = api.env.realm - - return (user, realm) - -def validate_principal(ugettext, principal): - """ - All the real work is done in split_principal. - """ - (user, realm) = split_principal(principal) - return None +user_output_params = baseuser_output_params -def normalize_principal(principal): - """ - Ensure that the name in the principal is lower-case. The realm is - upper-case by convention but it isn't required. - - The principal is validated at this point. - """ - (user, realm) = split_principal(principal) - return unicode('%s@%s' % (user, realm)) +status_output_params = status_baseuser_output_params def check_protected_member(user, protected_group_name=u'admins'): @@ -204,60 +115,17 @@ def check_protected_member(user, protected_group_name=u'admins'): raise errors.LastMemberError(key=user, label=_(u'group'), container=protected_group_name) - -def fix_addressbook_permission_bindrule(name, template, is_new, - anonymous_read_aci, - **other_options): - """Fix bind rule type for Read User Addressbook/IPA Attributes permission - - When upgrading from an old IPA that had the global read ACI, - or when installing the first replica with granular read permissions, - we need to keep allowing anonymous access to many user attributes. - This fixup_function changes the bind rule type accordingly. - """ - if is_new and anonymous_read_aci: - template['ipapermbindruletype'] = 'anonymous' - - @register() -class user(LDAPObject): +class user(baseuser): """ User object. """ - container_dn = api.env.container_user - object_name = _('user') - object_name_plural = _('users') - object_class = ['posixaccount'] - object_class_config = 'ipauserobjectclasses' - possible_objectclasses = [ - 'meporiginentry', 'ipauserauthtypeclass', 'ipauser', - 'ipatokenradiusproxyuser' - ] - disallow_object_classes = ['krbticketpolicyaux'] - permission_filter_objectclasses = ['posixaccount'] - search_attributes_config = 'ipausersearchfields' - default_attributes = [ - 'uid', 'givenname', 'sn', 'homedirectory', 'loginshell', - 'uidnumber', 'gidnumber', 'mail', 'ou', - 'telephonenumber', 'title', 'memberof', 'nsaccountlock', - 'memberofindirect', 'ipauserauthtype', 'userclass', - 'ipatokenradiusconfiglink', 'ipatokenradiususername', - 'krbprincipalexpiration' - ] - search_display_attributes = [ - 'uid', 'givenname', 'sn', 'homedirectory', 'loginshell', - 'mail', 'telephonenumber', 'title', 'nsaccountlock', - 'uidnumber', 'gidnumber', 'sshpubkeyfp', - ] - uuid_attribute = 'ipauniqueid' - attribute_members = { - 'memberof': ['group', 'netgroup', 'role', 'hbacrule', 'sudorule'], - 'memberofindirect': ['group', 'netgroup', 'role', 'hbacrule', 'sudorule'], - } - rdn_is_primary_key = True - bindable = True - password_attributes = [('userpassword', 'has_password'), - ('krbprincipalkey', 'has_keytab')] + + container_dn = baseuser.active_container_dn + label = _('Users') + label_singular = _('User') + object_name = _('user') + object_name_plural = _('users') managed_permissions = { 'System: Read User Standard Attributes': { 'replaces_global_anonymous_aci': True, @@ -460,259 +328,28 @@ class user(LDAPObject): }, } - label = _('Users') - label_singular = _('User') - - takes_params = ( - Str('uid', - pattern='^[a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,252}[a-zA-Z0-9_.$-]?$', - pattern_errmsg='may only include letters, numbers, _, -, . and $', - maxlength=255, - cli_name='login', - label=_('User login'), - primary_key=True, - default_from=lambda givenname, sn: givenname[0] + sn, - normalizer=lambda value: value.lower(), - ), - Str('givenname', - cli_name='first', - label=_('First name'), - ), - Str('sn', - cli_name='last', - label=_('Last name'), - ), - Str('cn', - label=_('Full name'), - default_from=lambda givenname, sn: '%s %s' % (givenname, sn), - autofill=True, - ), - Str('displayname?', - label=_('Display name'), - default_from=lambda givenname, sn: '%s %s' % (givenname, sn), - autofill=True, - ), - Str('initials?', - label=_('Initials'), - default_from=lambda givenname, sn: '%c%c' % (givenname[0], sn[0]), - autofill=True, - ), - Str('homedirectory?', - cli_name='homedir', - label=_('Home directory'), - ), - Str('gecos?', - label=_('GECOS'), - default_from=lambda givenname, sn: '%s %s' % (givenname, sn), - autofill=True, - ), - Str('loginshell?', - cli_name='shell', - label=_('Login shell'), - ), - Str('krbprincipalname?', validate_principal, - cli_name='principal', - label=_('Kerberos principal'), - default_from=lambda uid: '%s@%s' % (uid.lower(), api.env.realm), - autofill=True, - flags=['no_update'], - normalizer=lambda value: normalize_principal(value), - ), - DateTime('krbprincipalexpiration?', - cli_name='principal_expiration', - label=_('Kerberos principal expiration'), - ), - Str('mail*', - cli_name='email', - label=_('Email address'), - ), - Password('userpassword?', - cli_name='password', - label=_('Password'), - doc=_('Prompt to set the user password'), - # FIXME: This is temporary till bug is fixed causing updates to - # bomb out via the webUI. - exclude='webui', - ), - Flag('random?', - doc=_('Generate a random user password'), - flags=('no_search', 'virtual_attribute'), - default=False, - ), - Str('randompassword?', - label=_('Random password'), - flags=('no_create', 'no_update', 'no_search', 'virtual_attribute'), - ), - Int('uidnumber?', - cli_name='uid', - label=_('UID'), - doc=_('User ID Number (system will assign one if not provided)'), - minvalue=1, - ), - Int('gidnumber?', - label=_('GID'), - doc=_('Group ID Number'), - minvalue=1, - ), - Str('street?', - cli_name='street', - label=_('Street address'), - ), - Str('l?', - cli_name='city', - label=_('City'), - ), - Str('st?', - cli_name='state', - label=_('State/Province'), - ), - Str('postalcode?', - label=_('ZIP'), - ), - Str('telephonenumber*', - cli_name='phone', - label=_('Telephone Number') - ), - Str('mobile*', - label=_('Mobile Telephone Number') - ), - Str('pager*', - label=_('Pager Number') - ), - Str('facsimiletelephonenumber*', - cli_name='fax', - label=_('Fax Number'), - ), - Str('ou?', - cli_name='orgunit', - label=_('Org. Unit'), - ), - Str('title?', - label=_('Job Title'), - ), - Str('manager?', - label=_('Manager'), - ), - Str('carlicense*', - label=_('Car License'), - ), + takes_params = baseuser.takes_params + ( Bool('nsaccountlock?', label=_('Account disabled'), flags=['no_option'], ), - Str('ipasshpubkey*', validate_sshpubkey, - cli_name='sshpubkey', - label=_('SSH public key'), - normalizer=normalize_sshpubkey, - csv=True, - flags=['no_search'], - ), - StrEnum('ipauserauthtype*', - cli_name='user_auth_type', - label=_('User authentication types'), - doc=_('Types of supported user authentication'), - values=(u'password', u'radius', u'otp'), - csv=True, - ), - Str('userclass*', - cli_name='class', - label=_('Class'), - doc=_('User category (semantics placed on this attribute are for ' - 'local interpretation)'), - ), - Str('ipatokenradiusconfiglink?', - cli_name='radius', - label=_('RADIUS proxy configuration'), - ), - Str('ipatokenradiususername?', - cli_name='radius_username', - label=_('RADIUS proxy username'), - ), - Str('departmentnumber*', - label=_('Department Number'), - ), - Str('employeenumber?', - label=_('Employee Number'), - ), - Str('employeetype?', - label=_('Employee Type'), - ), - Str('preferredlanguage?', - label=_('Preferred Language'), - pattern='^(([a-zA-Z]{1,8}(-[a-zA-Z]{1,8})?(;q\=((0(\.[0-9]{0,3})?)|(1(\.0{0,3})?)))?' \ - + '(\s*,\s*[a-zA-Z]{1,8}(-[a-zA-Z]{1,8})?(;q\=((0(\.[0-9]{0,3})?)|(1(\.0{0,3})?)))?)*)|(\*))$', - pattern_errmsg='must match RFC 2068 - 14.4, e.g., "da, en-gb;q=0.8, en;q=0.7"', - ), ) - def _normalize_and_validate_email(self, email, config=None): - if not config: - config = self.backend.get_ipa_config() - - # check if default email domain should be added - defaultdomain = config.get('ipadefaultemaildomain', [None])[0] - if email: - norm_email = [] - if not isinstance(email, (list, tuple)): - email = [email] - for m in email: - if isinstance(m, basestring): - if '@' not in m and defaultdomain: - m = m + u'@' + defaultdomain - if not Email(m): - raise errors.ValidationError(name='email', error=_('invalid e-mail format: %(email)s') % dict(email=m)) - norm_email.append(m) - else: - if not Email(m): - raise errors.ValidationError(name='email', error=_('invalid e-mail format: %(email)s') % dict(email=m)) - norm_email.append(m) - return norm_email - - return email def _normalize_manager(self, manager): """ Given a userid verify the user's existence and return the dn. """ - if not manager: - return None - - if not isinstance(manager, list): - manager = [manager] - try: - container_dn = DN(self.container_dn, api.env.basedn) - for m in xrange(len(manager)): - if isinstance(manager[m], DN) and manager[m].endswith(container_dn): - continue - entry_attrs = self.backend.find_entry_by_attr( - self.primary_key.name, manager[m], self.object_class, [''], - container_dn - ) - manager[m] = entry_attrs.dn - except errors.NotFound: - raise errors.NotFound(reason=_('manager %(manager)s not found') % dict(manager=manager[m])) - - return manager - - def _convert_manager(self, entry_attrs, **options): - """ - Convert a manager dn into a userid - """ - if options.get('raw', False): - return - - if 'manager' in entry_attrs: - for m in xrange(len(entry_attrs['manager'])): - entry_attrs['manager'][m] = self.get_primary_key_from_dn(entry_attrs['manager'][m]) + return super(user, self).normalize_manager(manager, self.active_container_dn) @register() -class user_add(LDAPCreate): +class user_add(baseuser_add): __doc__ = _('Add a new user.') msg_summary = _('Added user "%(value)s"') - has_output_params = LDAPCreate.has_output_params + user_output_params + has_output_params = baseuser_add.has_output_params + user_output_params takes_options = LDAPCreate.takes_options + ( Flag('noprivate', @@ -798,21 +435,21 @@ class user_add(LDAPCreate): entry_attrs['gidnumber'] = group_attrs['gidnumber'] if 'userpassword' not in entry_attrs and options.get('random'): - entry_attrs['userpassword'] = ipa_generate_password(user_pwdchars) + entry_attrs['userpassword'] = ipa_generate_password(baseuser_pwdchars) # save the password so it can be displayed in post_callback setattr(context, 'randompassword', entry_attrs['userpassword']) if 'mail' in entry_attrs: - entry_attrs['mail'] = self.obj._normalize_and_validate_email(entry_attrs['mail'], config) + entry_attrs['mail'] = self.obj.normalize_and_validate_email(entry_attrs['mail'], config) else: # No e-mail passed in. If we have a default e-mail domain set # then we'll add it automatically. defaultdomain = config.get('ipadefaultemaildomain', [None])[0] if defaultdomain: - entry_attrs['mail'] = self.obj._normalize_and_validate_email(keys[-1], config) + entry_attrs['mail'] = self.obj.normalize_and_validate_email(keys[-1], config) if 'manager' in entry_attrs: - entry_attrs['manager'] = self.obj._normalize_manager(entry_attrs['manager']) + entry_attrs['manager'] = self.obj.normalize_manager(entry_attrs['manager'], self.obj.active_container_dn) if 'userclass' in entry_attrs and \ 'ipauser' not in entry_attrs['objectclass']: @@ -847,7 +484,7 @@ class user_add(LDAPCreate): except errors.AlreadyGroupMember: pass - self.obj._convert_manager(entry_attrs, **options) + self.obj.convert_manager(entry_attrs, **options) # delete description attribute NO_UPG_MAGIC if present if options.get('noprivate', False): if not options.get('all', False): @@ -880,7 +517,7 @@ class user_add(LDAPCreate): @register() -class user_del(LDAPDelete): +class user_del(baseuser_del): __doc__ = _('Delete a user.') msg_summary = _('Deleted user "%(value)s"') @@ -905,12 +542,12 @@ class user_del(LDAPDelete): @register() -class user_mod(LDAPUpdate): +class user_mod(baseuser_mod): __doc__ = _('Modify a user.') msg_summary = _('Modified user "%(value)s"') - has_output_params = LDAPUpdate.has_output_params + user_output_params + has_output_params = baseuser_mod.has_output_params + user_output_params def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): assert isinstance(dn, DN) @@ -925,12 +562,12 @@ class user_mod(LDAPUpdate): ) ) if 'mail' in entry_attrs: - entry_attrs['mail'] = self.obj._normalize_and_validate_email(entry_attrs['mail']) + entry_attrs['mail'] = self.obj.normalize_and_validate_email(entry_attrs['mail']) if 'manager' in entry_attrs: - entry_attrs['manager'] = self.obj._normalize_manager(entry_attrs['manager']) + entry_attrs['manager'] = self.obj.normalize_manager(entry_attrs['manager'], self.obj.active_container_dn) validate_nsaccountlock(entry_attrs) if 'userpassword' not in entry_attrs and options.get('random'): - entry_attrs['userpassword'] = ipa_generate_password(user_pwdchars) + entry_attrs['userpassword'] = ipa_generate_password(baseuser_pwdchars) # save the password so it can be displayed in post_callback setattr(context, 'randompassword', entry_attrs['userpassword']) if ('ipasshpubkey' in entry_attrs or 'ipauserauthtype' in entry_attrs @@ -970,7 +607,7 @@ class user_mod(LDAPUpdate): # if both randompassword and userpassword options were used pass convert_nsaccountlock(entry_attrs) - self.obj._convert_manager(entry_attrs, **options) + self.obj.convert_manager(entry_attrs, **options) self.obj.get_password_attributes(ldap, dn, entry_attrs) convert_sshpubkey_post(ldap, dn, entry_attrs) radius_dn2pk(self.api, entry_attrs) @@ -978,11 +615,11 @@ class user_mod(LDAPUpdate): @register() -class user_find(LDAPSearch): +class user_find(baseuser_find): __doc__ = _('Search for users.') member_attributes = ['memberof'] - has_output_params = LDAPSearch.has_output_params + user_output_params + has_output_params = baseuser_find.has_output_params + user_output_params takes_options = LDAPSearch.takes_options + ( Flag('whoami', @@ -995,7 +632,7 @@ class user_find(LDAPSearch): # assure the manager attr is a dn, not just a bare uid manager = options.get('manager') if manager is not None: - options['manager'] = self.obj._normalize_manager(manager) + options['manager'] = self.obj.normalize_manager(manager, self.obj.active_container_dn) # Ensure that the RADIUS config link is a dn, not just the name cl = 'ipatokenradiusconfiglink' @@ -1016,7 +653,7 @@ class user_find(LDAPSearch): if options.get('pkey_only', False): return truncated for attrs in entries: - self.obj._convert_manager(attrs, **options) + self.obj.convert_manager(attrs, **options) self.obj.get_password_attributes(ldap, attrs.dn, attrs) convert_nsaccountlock(attrs) convert_sshpubkey_post(ldap, attrs.dn, attrs) @@ -1028,15 +665,15 @@ class user_find(LDAPSearch): @register() -class user_show(LDAPRetrieve): +class user_show(baseuser_show): __doc__ = _('Display information about a user.') - has_output_params = LDAPRetrieve.has_output_params + user_output_params + has_output_params = baseuser_show.has_output_params + user_output_params def post_callback(self, ldap, dn, entry_attrs, *keys, **options): assert isinstance(dn, DN) convert_nsaccountlock(entry_attrs) - self.obj._convert_manager(entry_attrs, **options) + self.obj.convert_manager(entry_attrs, **options) self.obj.get_password_attributes(ldap, dn, entry_attrs) convert_sshpubkey_post(ldap, dn, entry_attrs) radius_dn2pk(self.api, entry_attrs) -- cgit