summaryrefslogtreecommitdiffstats
path: root/ipaserver/plugins/idviews.py
diff options
context:
space:
mode:
authorJan Cholasta <jcholast@redhat.com>2016-04-28 10:30:05 +0200
committerJan Cholasta <jcholast@redhat.com>2016-06-03 09:00:34 +0200
commit6e44557b601f769d23ee74555a72e8b5cc62c0c9 (patch)
treeeedd3e054b0709341b9f58c190ea54f999f7d13a /ipaserver/plugins/idviews.py
parentec841e5d7ab29d08de294b3fa863a631cd50e30a (diff)
downloadfreeipa-6e44557b601f769d23ee74555a72e8b5cc62c0c9.tar.gz
freeipa-6e44557b601f769d23ee74555a72e8b5cc62c0c9.tar.xz
freeipa-6e44557b601f769d23ee74555a72e8b5cc62c0c9.zip
ipalib: move server-side plugins to ipaserver
Move the remaining plugin code from ipalib.plugins to ipaserver.plugins. Remove the now unused ipalib.plugins package. https://fedorahosted.org/freeipa/ticket/4739 Reviewed-By: David Kupka <dkupka@redhat.com>
Diffstat (limited to 'ipaserver/plugins/idviews.py')
-rw-r--r--ipaserver/plugins/idviews.py1123
1 files changed, 1123 insertions, 0 deletions
diff --git a/ipaserver/plugins/idviews.py b/ipaserver/plugins/idviews.py
new file mode 100644
index 000000000..537f924ce
--- /dev/null
+++ b/ipaserver/plugins/idviews.py
@@ -0,0 +1,1123 @@
+# Authors:
+# Alexander Bokovoy <abokovoy@redhat.com>
+# Tomas Babej <tbabej@redhat.com>
+#
+# Copyright (C) 2014 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import re
+
+import six
+
+from .baseldap import (LDAPQuery, LDAPObject, LDAPCreate,
+ LDAPDelete, LDAPUpdate, LDAPSearch,
+ LDAPAddAttribute, LDAPRemoveAttribute,
+ LDAPRetrieve, global_output_params)
+from .hostgroup import get_complete_hostgroup_member_list
+from .service import validate_certificate
+from ipalib import api, Str, Int, Bytes, Flag, _, ngettext, errors, output
+from ipalib.constants import IPA_ANCHOR_PREFIX, SID_ANCHOR_PREFIX
+from ipalib.plugable import Registry
+from ipalib.util import (normalize_sshpubkey, validate_sshpubkey,
+ convert_sshpubkey_post)
+
+from ipapython.dn import DN
+
+if six.PY3:
+ unicode = str
+
+_dcerpc_bindings_installed = False
+
+if api.env.in_server and api.env.context in ['lite', 'server']:
+ try:
+ import ipaserver.dcerpc
+ _dcerpc_bindings_installed = True
+ except ImportError:
+ pass
+
+__doc__ = _("""
+ID Views
+Manage ID Views
+IPA allows to override certain properties of users and groups per each host.
+This functionality is primarily used to allow migration from older systems or
+other Identity Management solutions.
+""")
+
+register = Registry()
+
+protected_default_trust_view_error = errors.ProtectedEntryError(
+ label=_('ID View'),
+ key=u"Default Trust View",
+ reason=_('system ID View')
+)
+
+fallback_to_ldap_option = Flag(
+ 'fallback_to_ldap?',
+ default=False,
+ label=_('Fallback to AD DC LDAP'),
+ doc=_("Allow falling back to AD DC LDAP when resolving AD "
+ "trusted objects. For two-way trusts only."),
+)
+
+DEFAULT_TRUST_VIEW_NAME = "default trust view"
+
+ANCHOR_REGEX = re.compile(
+ r':IPA:.*:[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}'
+ r'|'
+ r':SID:S-[0-9\-]+'
+)
+
+
+@register()
+class idview(LDAPObject):
+ """
+ ID View object.
+ """
+
+ container_dn = api.env.container_views
+ object_name = _('ID View')
+ object_name_plural = _('ID Views')
+ object_class = ['ipaIDView', 'top']
+ default_attributes = ['cn', 'description']
+ rdn_is_primary_key = True
+
+ label = _('ID Views')
+ label_singular = _('ID View')
+
+ takes_params = (
+ Str('cn',
+ cli_name='name',
+ label=_('ID View Name'),
+ primary_key=True,
+ ),
+ Str('description?',
+ cli_name='desc',
+ label=_('Description'),
+ ),
+ )
+
+ permission_filter_objectclasses = ['nsContainer']
+ managed_permissions = {
+ 'System: Read ID Views': {
+ 'ipapermbindruletype': 'all',
+ 'ipapermright': {'read', 'search', 'compare'},
+ 'ipapermdefaultattr': {
+ 'cn', 'description', 'objectClass',
+ },
+ },
+ }
+
+
+@register()
+class idview_add(LDAPCreate):
+ __doc__ = _('Add a new ID View.')
+ msg_summary = _('Added ID View "%(value)s"')
+
+
+@register()
+class idview_del(LDAPDelete):
+ __doc__ = _('Delete an ID View.')
+ msg_summary = _('Deleted ID View "%(value)s"')
+
+ def pre_callback(self, ldap, dn, *keys, **options):
+ for key in keys:
+ if key.lower() == DEFAULT_TRUST_VIEW_NAME:
+ raise protected_default_trust_view_error
+
+ return dn
+
+
+@register()
+class idview_mod(LDAPUpdate):
+ __doc__ = _('Modify an ID View.')
+ msg_summary = _('Modified an ID View "%(value)s"')
+
+ def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
+ for key in keys:
+ if key.lower() == DEFAULT_TRUST_VIEW_NAME:
+ raise protected_default_trust_view_error
+
+ return dn
+
+
+@register()
+class idview_find(LDAPSearch):
+ __doc__ = _('Search for an ID View.')
+ msg_summary = ngettext('%(count)d ID View matched',
+ '%(count)d ID Views matched', 0)
+
+
+@register()
+class idview_show(LDAPRetrieve):
+ __doc__ = _('Display information about an ID View.')
+
+ takes_options = LDAPRetrieve.takes_options + (
+ Flag('show_hosts?',
+ cli_name='show_hosts',
+ doc=_('Enumerate all the hosts the view applies to.'),
+ ),
+ )
+
+ has_output_params = global_output_params + (
+ Str('useroverrides',
+ label=_('User object overrides'),
+ ),
+ Str('groupoverrides',
+ label=_('Group object overrides'),
+ ),
+ Str('appliedtohosts',
+ label=_('Hosts the view applies to')
+ ),
+ )
+
+ def show_id_overrides(self, dn, entry_attrs):
+ ldap = self.obj.backend
+
+ for objectclass, obj_type in [('ipaUserOverride', 'user'),
+ ('ipaGroupOverride', 'group')]:
+
+ # Attribute to store results is called (user|group)overrides
+ attr_name = obj_type + 'overrides'
+
+ try:
+ (overrides, truncated) = ldap.find_entries(
+ filter="objectclass=%s" % objectclass,
+ attrs_list=['ipaanchoruuid'],
+ base_dn=dn,
+ scope=ldap.SCOPE_ONELEVEL,
+ paged_search=True)
+
+ resolved_overrides = []
+ for override in overrides:
+ anchor = override.single_value['ipaanchoruuid']
+
+ try:
+ name = resolve_anchor_to_object_name(ldap, obj_type,
+ anchor)
+ resolved_overrides.append(name)
+
+ except (errors.NotFound, errors.ValidationError):
+ # Anchor could not be resolved, use raw
+ resolved_overrides.append(anchor)
+
+ entry_attrs[attr_name] = resolved_overrides
+
+ except errors.NotFound:
+ # No overrides found, nothing to do
+ pass
+
+ def enumerate_hosts(self, dn, entry_attrs):
+ ldap = self.obj.backend
+
+ filter_params = {
+ 'ipaAssignedIDView': dn,
+ 'objectClass': 'ipaHost',
+ }
+
+ try:
+ (hosts, truncated) = ldap.find_entries(
+ filter=ldap.make_filter(filter_params, rules=ldap.MATCH_ALL),
+ attrs_list=['cn'],
+ base_dn=api.env.container_host + api.env.basedn,
+ scope=ldap.SCOPE_ONELEVEL,
+ paged_search=True)
+
+ entry_attrs['appliedtohosts'] = [host.single_value['cn']
+ for host in hosts]
+ except errors.NotFound:
+ pass
+
+ def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
+ self.show_id_overrides(dn, entry_attrs)
+
+ # Enumerating hosts is a potentially expensive operation (uses paged
+ # search to list all the hosts the ID view applies to). Show the list
+ # of the hosts only if explicitly asked for (or asked for --all).
+ # Do not display with --raw, since this attribute does not exist in
+ # LDAP.
+
+ if ((options.get('show_hosts') or options.get('all'))
+ and not options.get('raw')):
+ self.enumerate_hosts(dn, entry_attrs)
+
+ return dn
+
+
+class baseidview_apply(LDAPQuery):
+ """
+ Base class for idview_apply and idview_unapply commands.
+ """
+
+ has_output_params = global_output_params
+
+ def execute(self, *keys, **options):
+ view = keys[-1] if keys else None
+ ldap = self.obj.backend
+
+ # Test if idview actually exists, if it does not, NotFound is raised
+ if not options.get('clear_view', False):
+ view_dn = self.api.Object['idview'].get_dn_if_exists(view)
+ assert isinstance(view_dn, DN)
+
+ # Check that we're not applying the Default Trust View
+ if view.lower() == DEFAULT_TRUST_VIEW_NAME:
+ raise errors.ValidationError(
+ name=_('ID View'),
+ error=_('Default Trust View cannot be applied on hosts')
+ )
+
+ else:
+ # In case we are removing assigned view, we modify the host setting
+ # the ipaAssignedIDView to None
+ view_dn = None
+
+ completed = 0
+ succeeded = {'host': []}
+ failed = {
+ 'host': [],
+ 'hostgroup': [],
+ }
+
+ # Make sure we ignore None passed via host or hostgroup, since it does
+ # not make sense
+ for key in ('host', 'hostgroup'):
+ if key in options and options[key] is None:
+ del options[key]
+
+ # Generate a list of all hosts to apply the view to
+ hosts_to_apply = list(options.get('host', []))
+
+ for hostgroup in options.get('hostgroup', ()):
+ try:
+ hosts_to_apply += get_complete_hostgroup_member_list(hostgroup)
+ except errors.NotFound:
+ failed['hostgroup'].append((hostgroup, unicode(_("not found"))))
+ except errors.PublicError as e:
+ failed['hostgroup'].append((hostgroup, "%s : %s" % (
+ e.__class__.__name__, str(e))))
+
+ for host in hosts_to_apply:
+ try:
+ host_dn = api.Object['host'].get_dn_if_exists(host)
+
+ host_entry = ldap.get_entry(host_dn,
+ attrs_list=['ipaassignedidview'])
+ host_entry['ipaassignedidview'] = view_dn
+
+ ldap.update_entry(host_entry)
+
+ # If no exception was raised, view assigment went well
+ completed = completed + 1
+ succeeded['host'].append(host)
+ except errors.EmptyModlist:
+ # If view was already applied, complain about it
+ failed['host'].append((host,
+ unicode(_("ID View already applied"))))
+ except errors.NotFound:
+ failed['host'].append((host, unicode(_("not found"))))
+ except errors.PublicError as e:
+ failed['host'].append((host, str(e)))
+
+ # Wrap dictionary containing failures in another dictionary under key
+ # 'memberhost', since that is output parameter in global_output_params
+ # and thus we get nice output in the CLI
+ failed = {'memberhost': failed}
+
+ # Sort the list of affected hosts
+ succeeded['host'].sort()
+
+ # Note that we're returning the list of affected hosts even if they
+ # were passed via referencing a hostgroup. This is desired, since we
+ # want to stress the fact that view is applied on all the current
+ # member hosts of the hostgroup and not tied with the hostgroup itself.
+
+ return dict(
+ summary=unicode(_(self.msg_summary % {'value': view})),
+ succeeded=succeeded,
+ completed=completed,
+ failed=failed,
+ )
+
+
+@register()
+class idview_apply(baseidview_apply):
+ __doc__ = _('Applies ID View to specified hosts or current members of '
+ 'specified hostgroups. If any other ID View is applied to '
+ 'the host, it is overridden.')
+
+ member_count_out = (_('ID View applied to %i host.'),
+ _('ID View applied to %i hosts.'))
+
+ msg_summary = 'Applied ID View "%(value)s"'
+
+ takes_options = (
+ Str('host*',
+ cli_name='hosts',
+ doc=_('Hosts to apply the ID View to'),
+ label=_('hosts'),
+ ),
+ Str('hostgroup*',
+ cli_name='hostgroups',
+ doc=_('Hostgroups to whose hosts apply the ID View to. Please note '
+ 'that view is not applied automatically to any hosts added '
+ 'to the hostgroup after running the idview-apply command.'),
+ label=_('hostgroups'),
+ ),
+ )
+
+ has_output = (
+ output.summary,
+ output.Output('succeeded',
+ type=dict,
+ doc=_('Hosts that this ID View was applied to.'),
+ ),
+ output.Output('failed',
+ type=dict,
+ doc=_('Hosts or hostgroups that this ID View could not be '
+ 'applied to.'),
+ ),
+ output.Output('completed',
+ type=int,
+ doc=_('Number of hosts the ID View was applied to:'),
+ ),
+ )
+
+
+@register()
+class idview_unapply(baseidview_apply):
+ __doc__ = _('Clears ID View from specified hosts or current members of '
+ 'specified hostgroups.')
+
+ member_count_out = (_('ID View cleared from %i host.'),
+ _('ID View cleared from %i hosts.'))
+
+ msg_summary = 'Cleared ID Views'
+
+ takes_options = (
+ Str('host*',
+ cli_name='hosts',
+ doc=_('Hosts to clear (any) ID View from.'),
+ label=_('hosts'),
+ ),
+ Str('hostgroup*',
+ cli_name='hostgroups',
+ doc=_('Hostgroups whose hosts should have ID Views cleared. Note '
+ 'that view is not cleared automatically from any host added '
+ 'to the hostgroup after running idview-unapply command.'),
+ label=_('hostgroups'),
+ ),
+ )
+
+ has_output = (
+ output.summary,
+ output.Output('succeeded',
+ type=dict,
+ doc=_('Hosts that ID View was cleared from.'),
+ ),
+ output.Output('failed',
+ type=dict,
+ doc=_('Hosts or hostgroups that ID View could not be cleared '
+ 'from.'),
+ ),
+ output.Output('completed',
+ type=int,
+ doc=_('Number of hosts that had a ID View was unset:'),
+ ),
+ )
+
+ # Take no arguments, since ID View reference is not needed to clear
+ # the hosts
+ def get_args(self):
+ return ()
+
+ def execute(self, *keys, **options):
+ options['clear_view'] = True
+ return super(idview_unapply, self).execute(*keys, **options)
+
+
+# ID overrides helper methods
+def verify_trusted_domain_object_type(validator, desired_type, name_or_sid):
+
+ object_type = validator.get_trusted_domain_object_type(name_or_sid)
+
+ if object_type == desired_type:
+ # In case SSSD returns the same type as the type being
+ # searched, no problems here.
+ return True
+
+ elif desired_type == 'user' and object_type == 'both':
+ # Type both denotes users with magic private groups.
+ # Overriding attributes for such users is OK.
+ return True
+
+ elif desired_type == 'group' and object_type == 'both':
+ # However, overriding attributes for magic private groups
+ # does not make sense. One should override the GID of
+ # the user itself.
+
+ raise errors.ConversionError(
+ name='identifier',
+ error=_('You are trying to reference a magic private group '
+ 'which is not allowed to be overridden. '
+ 'Try overriding the GID attribute of the '
+ 'corresponding user instead.')
+ )
+
+ return False
+
+
+def resolve_object_to_anchor(ldap, obj_type, obj, fallback_to_ldap):
+ """
+ Resolves the user/group name to the anchor uuid:
+ - first it tries to find the object as user or group in IPA (depending
+ on the passed obj_type)
+ - if the IPA lookup failed, lookup object SID in the trusted domains
+
+ Takes options:
+ ldap - the backend
+ obj_type - either 'user' or 'group'
+ obj - the name of the object, e.g 'admin' or 'testuser'
+ """
+
+ try:
+ entry = ldap.get_entry(api.Object[obj_type].get_dn(obj),
+ attrs_list=['ipaUniqueID', 'objectClass'])
+
+ # First we check this is a valid object to override
+ # - for groups, it must have ipaUserGroup objectclass
+ # - for users, it must have posixAccount objectclass
+
+ required_objectclass = {
+ 'user': 'posixaccount',
+ 'group': 'ipausergroup',
+ }[obj_type]
+
+ if required_objectclass not in entry['objectclass']:
+ raise errors.ValidationError(
+ name=_('IPA object'),
+ error=_('system IPA objects (e.g system groups, user '
+ 'private groups) cannot be overridden')
+ )
+
+ # The domain prefix, this will need to be reworked once we
+ # introduce IPA-IPA trusts
+ domain = api.env.domain
+ uuid = entry.single_value['ipaUniqueID']
+
+ return "%s%s:%s" % (IPA_ANCHOR_PREFIX, domain, uuid)
+ except errors.NotFound:
+ pass
+
+ # If not successfull, try looking up the object in the trusted domain
+ try:
+ if _dcerpc_bindings_installed:
+ domain_validator = ipaserver.dcerpc.DomainValidator(api)
+ if domain_validator.is_configured():
+ sid = domain_validator.get_trusted_domain_object_sid(obj,
+ fallback_to_ldap=fallback_to_ldap)
+
+ # We need to verify that the object type is correct
+ type_correct = verify_trusted_domain_object_type(
+ domain_validator, obj_type, sid)
+
+ if type_correct:
+ # There is no domain prefix since SID contains information
+ # about the domain
+ return SID_ANCHOR_PREFIX + sid
+
+ except errors.ValidationError:
+ # Domain validator raises Validation Error if object name does not
+ # contain domain part (either NETBIOS\ prefix or @domain.name suffix)
+ pass
+
+ # No acceptable object was found
+ api.Object[obj_type].handle_not_found(obj)
+
+
+def resolve_anchor_to_object_name(ldap, obj_type, anchor):
+ """
+ Resolves IPA Anchor UUID to the actual common object name (uid for users,
+ cn for groups).
+
+ Takes options:
+ ldap - the backend
+ anchor - the anchor, e.g.
+ ':IPA:ipa.example.com:2cb604ea-39a5-11e4-a37e-001a4a22216f'
+ """
+
+ if anchor.startswith(IPA_ANCHOR_PREFIX):
+
+ # Prepare search parameters
+ accounts_dn = DN(api.env.container_accounts, api.env.basedn)
+
+ # Anchor of the form :IPA:<domain>:<uuid>
+ # Strip the IPA prefix and the domain prefix
+ uuid = anchor.rpartition(':')[-1].strip()
+
+ # Set the object type-specific search attributes
+ objectclass, name_attr = {
+ 'user': ('posixaccount', 'uid'),
+ 'group': ('ipausergroup', 'cn'),
+ }[obj_type]
+
+ entry = ldap.find_entry_by_attr(attr='ipaUniqueID',
+ value=uuid,
+ object_class=objectclass,
+ attrs_list=[name_attr],
+ base_dn=accounts_dn)
+
+ # Return the name of the object, which is either cn for
+ # groups or uid for users
+ return entry.single_value[name_attr]
+
+ elif anchor.startswith(SID_ANCHOR_PREFIX):
+
+ # Parse the SID out from the anchor
+ sid = anchor[len(SID_ANCHOR_PREFIX):].strip()
+
+ if _dcerpc_bindings_installed:
+ domain_validator = ipaserver.dcerpc.DomainValidator(api)
+ if domain_validator.is_configured():
+ name = domain_validator.get_trusted_domain_object_from_sid(sid)
+
+ # We need to verify that the object type is correct
+ type_correct = verify_trusted_domain_object_type(
+ domain_validator, obj_type, name)
+
+ if type_correct:
+ return name
+
+ # No acceptable object was found
+ raise errors.NotFound(
+ reason=_("Anchor '%(anchor)s' could not be resolved.")
+ % dict(anchor=anchor))
+
+
+def remove_ipaobject_overrides(ldap, api, dn):
+ """
+ Removes all ID overrides for given object. This method is to be
+ consumed by -del commands of the given objects (users, groups).
+ """
+
+ entry = ldap.get_entry(dn, attrs_list=['ipaUniqueID'])
+ object_uuid = entry.single_value['ipaUniqueID']
+
+ override_filter = '(ipaanchoruuid=:IPA:{0}:{1})'.format(api.env.domain,
+ object_uuid)
+ try:
+ entries, truncated = ldap.find_entries(
+ override_filter,
+ base_dn=DN(api.env.container_views, api.env.basedn),
+ paged_search=True
+ )
+ except errors.EmptyResult:
+ pass
+ else:
+ # In case we found something, delete it
+ for entry in entries:
+ ldap.delete_entry(entry)
+
+
+# This is not registered on purpose, it's a base class for ID overrides
+class baseidoverride(LDAPObject):
+ """
+ Base ID override object.
+ """
+
+ parent_object = 'idview'
+ container_dn = api.env.container_views
+
+ object_class = ['ipaOverrideAnchor', 'top']
+ default_attributes = [
+ 'description', 'ipaAnchorUUID',
+ ]
+
+ takes_params = (
+ Str('ipaanchoruuid',
+ cli_name='anchor',
+ primary_key=True,
+ label=_('Anchor to override'),
+ ),
+ Str('description?',
+ cli_name='desc',
+ label=_('Description'),
+ ),
+ )
+
+ override_object = None
+
+ def get_dn(self, *keys, **options):
+ # If user passed raw anchor, do not try
+ # to translate it.
+ if ANCHOR_REGEX.match(keys[-1]):
+ anchor = keys[-1]
+
+ # Otherwise, translate object into a
+ # legitimate object anchor.
+ else:
+ anchor = resolve_object_to_anchor(
+ self.backend,
+ self.override_object,
+ keys[-1],
+ fallback_to_ldap=options['fallback_to_ldap']
+ )
+
+ keys = keys[:-1] + (anchor, )
+ return super(baseidoverride, self).get_dn(*keys, **options)
+
+ def set_anchoruuid_from_dn(self, dn, entry_attrs):
+ # TODO: Use entry_attrs.single_value once LDAPUpdate supports
+ # lists in primary key fields (baseldap.LDAPUpdate.execute)
+ entry_attrs['ipaanchoruuid'] = dn[0].value
+
+ def convert_anchor_to_human_readable_form(self, entry_attrs, **options):
+ if not options.get('raw'):
+ anchor = entry_attrs.single_value['ipaanchoruuid']
+
+ if anchor:
+ try:
+ object_name = resolve_anchor_to_object_name(
+ self.backend,
+ self.override_object,
+ anchor
+ )
+ entry_attrs.single_value['ipaanchoruuid'] = object_name
+ except errors.NotFound:
+ # If we were unable to resolve the anchor,
+ # keep it in the raw form
+ pass
+ except errors.ValidationError:
+ # Same as above, ValidationError may be raised when SIDs
+ # are attempted to be converted, but the domain is no
+ # longer trusted
+ pass
+
+ def prohibit_ipa_users_in_default_view(self, dn, entry_attrs):
+ # Check if parent object is Default Trust View, if so, prohibit
+ # adding overrides for IPA objects
+
+ if dn[1].value.lower() == DEFAULT_TRUST_VIEW_NAME:
+ if dn[0].value.startswith(IPA_ANCHOR_PREFIX):
+ raise errors.ValidationError(
+ name=_('ID View'),
+ error=_('Default Trust View cannot contain IPA users')
+ )
+
+class baseidoverride_add(LDAPCreate):
+ __doc__ = _('Add a new ID override.')
+ msg_summary = _('Added ID override "%(value)s"')
+
+ takes_options = LDAPCreate.takes_options + (fallback_to_ldap_option,)
+
+ def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
+ self.obj.set_anchoruuid_from_dn(dn, entry_attrs)
+ self.obj.prohibit_ipa_users_in_default_view(dn, entry_attrs)
+ return dn
+
+ def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
+ self.obj.convert_anchor_to_human_readable_form(entry_attrs, **options)
+ return dn
+
+
+class baseidoverride_del(LDAPDelete):
+ __doc__ = _('Delete an ID override.')
+ msg_summary = _('Deleted ID override "%(value)s"')
+
+ takes_options = LDAPDelete.takes_options + (fallback_to_ldap_option,)
+
+ def pre_callback(self, ldap, dn, *keys, **options):
+ assert isinstance(dn, DN)
+
+ # Make sure the entry we're deleting has all the objectclasses
+ # this object requires
+ try:
+ entry = ldap.get_entry(dn, ['objectclass'])
+ except errors.NotFound:
+ self.obj.handle_not_found(*keys)
+
+ required_object_classes = set(self.obj.object_class)
+ actual_object_classes = set(entry['objectclass'])
+
+ # If not, treat it as a failed search
+ if not required_object_classes.issubset(actual_object_classes):
+ self.obj.handle_not_found(*keys)
+
+ return dn
+
+
+class baseidoverride_mod(LDAPUpdate):
+ __doc__ = _('Modify an ID override.')
+ msg_summary = _('Modified an ID override "%(value)s"')
+
+ takes_options = LDAPUpdate.takes_options + (fallback_to_ldap_option,)
+
+ def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
+ if 'rename' in options:
+ raise errors.ValidationError(
+ name=_('ID override'),
+ error=_('ID overrides cannot be renamed')
+ )
+
+ self.obj.prohibit_ipa_users_in_default_view(dn, entry_attrs)
+ return dn
+
+ def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
+ self.obj.convert_anchor_to_human_readable_form(entry_attrs, **options)
+ return dn
+
+
+class baseidoverride_find(LDAPSearch):
+ __doc__ = _('Search for an ID override.')
+ msg_summary = ngettext('%(count)d ID override matched',
+ '%(count)d ID overrides matched', 0)
+
+ takes_options = LDAPSearch.takes_options + (fallback_to_ldap_option,)
+
+ def post_callback(self, ldap, entries, truncated, *args, **options):
+ for entry in entries:
+ self.obj.convert_anchor_to_human_readable_form(entry, **options)
+ return truncated
+
+
+class baseidoverride_show(LDAPRetrieve):
+ __doc__ = _('Display information about an ID override.')
+
+ takes_options = LDAPRetrieve.takes_options + (fallback_to_ldap_option,)
+
+ def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
+ self.obj.convert_anchor_to_human_readable_form(entry_attrs, **options)
+ return dn
+
+
+@register()
+class idoverrideuser(baseidoverride):
+
+ object_name = _('User ID override')
+ object_name_plural = _('User ID overrides')
+
+ label = _('User ID overrides')
+ label_singular = _('User ID override')
+ rdn_is_primary_key = True
+
+ permission_filter_objectclasses = ['ipaUserOverride']
+ managed_permissions = {
+ 'System: Read User ID Overrides': {
+ 'ipapermbindruletype': 'all',
+ 'ipapermright': {'read', 'search', 'compare'},
+ 'ipapermdefaultattr': {
+ 'objectClass', 'ipaAnchorUUID', 'uidNumber', 'description',
+ 'homeDirectory', 'uid', 'ipaOriginalUid', 'loginShell', 'gecos',
+ 'gidNumber', 'ipaSshPubkey', 'usercertificate'
+ },
+ },
+ }
+
+ object_class = baseidoverride.object_class + ['ipaUserOverride']
+ possible_objectclasses = ['ipasshuser', 'ipaSshGroupOfPubKeys']
+ default_attributes = baseidoverride.default_attributes + [
+ 'homeDirectory', 'uidNumber', 'uid', 'ipaOriginalUid', 'loginShell',
+ 'ipaSshPubkey', 'gidNumber', 'gecos', 'usercertificate;binary',
+ ]
+
+ search_display_attributes = baseidoverride.default_attributes + [
+ 'homeDirectory', 'uidNumber', 'uid', 'ipaOriginalUid', 'loginShell',
+ 'ipaSshPubkey', 'gidNumber', 'gecos',
+ ]
+
+ takes_params = baseidoverride.takes_params + (
+ Str('uid?',
+ pattern='^[a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,252}[a-zA-Z0-9_.$-]?$',
+ pattern_errmsg='may only include letters, numbers, _, -, . and $',
+ maxlength=255,
+ cli_name='login',
+ label=_('User login'),
+ normalizer=lambda value: value.lower(),
+ ),
+ Int('uidnumber?',
+ cli_name='uid',
+ label=_('UID'),
+ doc=_('User ID Number'),
+ minvalue=1,
+ ),
+ Str('gecos?',
+ label=_('GECOS'),
+ ),
+ Int('gidnumber?',
+ label=_('GID'),
+ doc=_('Group ID Number'),
+ minvalue=1,
+ ),
+ Str('homedirectory?',
+ cli_name='homedir',
+ label=_('Home directory'),
+ ),
+ Str('loginshell?',
+ cli_name='shell',
+ label=_('Login shell'),
+ ),
+ Str('ipaoriginaluid?',
+ flags=['no_option', 'no_output']
+ ),
+ Str('ipasshpubkey*', validate_sshpubkey,
+ cli_name='sshpubkey',
+ label=_('SSH public key'),
+ normalizer=normalize_sshpubkey,
+ flags=['no_search'],
+ ),
+ Bytes('usercertificate*', validate_certificate,
+ cli_name='certificate',
+ label=_('Certificate'),
+ doc=_('Base-64 encoded user certificate'),
+ flags=['no_search',],
+ ),
+ )
+
+ override_object = 'user'
+
+ def update_original_uid_reference(self, entry_attrs):
+ anchor = entry_attrs.single_value['ipaanchoruuid']
+ try:
+ original_uid = resolve_anchor_to_object_name(self.backend,
+ self.override_object,
+ anchor)
+ entry_attrs['ipaOriginalUid'] = original_uid
+
+ except (errors.NotFound, errors.ValidationError):
+ # Anchor could not be resolved, this means we had to specify the
+ # object to manipulate using a raw anchor value already, hence
+ # we have no way to update the original_uid
+ pass
+
+ def convert_usercertificate_pre(self, entry_attrs):
+ if 'usercertificate' in entry_attrs:
+ entry_attrs['usercertificate;binary'] = entry_attrs.pop(
+ 'usercertificate')
+
+ def convert_usercertificate_post(self, entry_attrs, **options):
+ if 'usercertificate;binary' in entry_attrs:
+ entry_attrs['usercertificate'] = entry_attrs.pop(
+ 'usercertificate;binary')
+
+
+
+@register()
+class idoverridegroup(baseidoverride):
+
+ object_name = _('Group ID override')
+ object_name_plural = _('Group ID overrides')
+
+ label = _('Group ID overrides')
+ label_singular = _('Group ID override')
+ rdn_is_primary_key = True
+
+ permission_filter_objectclasses = ['ipaGroupOverride']
+ managed_permissions = {
+ 'System: Read Group ID Overrides': {
+ 'ipapermbindruletype': 'all',
+ 'ipapermright': {'read', 'search', 'compare'},
+ 'ipapermdefaultattr': {
+ 'objectClass', 'ipaAnchorUUID', 'gidNumber',
+ 'description', 'cn',
+ },
+ },
+ }
+
+ object_class = baseidoverride.object_class + ['ipaGroupOverride']
+ default_attributes = baseidoverride.default_attributes + [
+ 'gidNumber', 'cn',
+ ]
+
+ takes_params = baseidoverride.takes_params + (
+ Str('cn?',
+ pattern='^[a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,252}[a-zA-Z0-9_.$-]?$',
+ pattern_errmsg='may only include letters, numbers, _, -, . and $',
+ maxlength=255,
+ cli_name='group_name',
+ label=_('Group name'),
+ normalizer=lambda value: value.lower(),
+ ),
+ Int('gidnumber?',
+ cli_name='gid',
+ label=_('GID'),
+ doc=_('Group ID Number'),
+ minvalue=1,
+ ),
+ )
+
+ override_object = 'group'
+
+@register()
+class idoverrideuser_add_cert(LDAPAddAttribute):
+ __doc__ = _('Add one or more certificates to the idoverrideuser entry')
+ msg_summary = _('Added certificates to idoverrideuser "%(value)s"')
+ attribute = 'usercertificate'
+
+ takes_options = LDAPAddAttribute.takes_options + (fallback_to_ldap_option,)
+
+ def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys,
+ **options):
+ dn = self.obj.get_dn(*keys, **options)
+ self.obj.convert_usercertificate_pre(entry_attrs)
+
+ return dn
+
+ def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
+ assert isinstance(dn, DN)
+ self.obj.convert_usercertificate_post(entry_attrs, **options)
+ self.obj.convert_anchor_to_human_readable_form(entry_attrs, **options)
+ return dn
+
+
+@register()
+class idoverrideuser_remove_cert(LDAPRemoveAttribute):
+ __doc__ = _('Remove one or more certificates to the idoverrideuser entry')
+ msg_summary = _('Removed certificates from idoverrideuser "%(value)s"')
+ attribute = 'usercertificate'
+
+ takes_options = LDAPRemoveAttribute.takes_options + (fallback_to_ldap_option,)
+
+ def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys,
+ **options):
+ dn = self.obj.get_dn(*keys, **options)
+ self.obj.convert_usercertificate_pre(entry_attrs)
+
+ return dn
+
+ def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
+ assert isinstance(dn, DN)
+ self.obj.convert_usercertificate_post(entry_attrs, **options)
+ self.obj.convert_anchor_to_human_readable_form(entry_attrs, **options)
+
+ return dn
+
+
+@register()
+class idoverrideuser_add(baseidoverride_add):
+ __doc__ = _('Add a new User ID override.')
+ msg_summary = _('Added User ID override "%(value)s"')
+
+ def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
+ dn = super(idoverrideuser_add, self).pre_callback(ldap, dn,
+ entry_attrs, attrs_list, *keys, **options)
+
+ entry_attrs['objectclass'].append('ipasshuser')
+ self.obj.convert_usercertificate_pre(entry_attrs)
+
+ # Update the ipaOriginalUid
+ self.obj.update_original_uid_reference(entry_attrs)
+ return dn
+
+ def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
+ dn = super(idoverrideuser_add, self).post_callback(ldap, dn,
+ entry_attrs, *keys, **options)
+ convert_sshpubkey_post(entry_attrs)
+ self.obj.convert_usercertificate_post(entry_attrs, **options)
+ return dn
+
+
+
+@register()
+class idoverrideuser_del(baseidoverride_del):
+ __doc__ = _('Delete an User ID override.')
+ msg_summary = _('Deleted User ID override "%(value)s"')
+
+
+@register()
+class idoverrideuser_mod(baseidoverride_mod):
+ __doc__ = _('Modify an User ID override.')
+ msg_summary = _('Modified an User ID override "%(value)s"')
+
+ def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
+ dn = super(idoverrideuser_mod, self).pre_callback(ldap, dn,
+ entry_attrs, attrs_list, *keys, **options)
+
+ # Update the ipaOriginalUid
+ self.obj.set_anchoruuid_from_dn(dn, entry_attrs)
+ self.obj.update_original_uid_reference(entry_attrs)
+ if 'objectclass' in entry_attrs:
+ obj_classes = entry_attrs['objectclass']
+ else:
+ _entry_attrs = ldap.get_entry(dn, ['objectclass'])
+ obj_classes = entry_attrs['objectclass'] = _entry_attrs['objectclass']
+
+ if 'ipasshpubkey' in entry_attrs and 'ipasshuser' not in obj_classes:
+ obj_classes.append('ipasshuser')
+
+ self.obj.convert_usercertificate_pre(entry_attrs)
+ return dn
+
+ def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
+ dn = super(idoverrideuser_mod, self).post_callback(ldap, dn,
+ entry_attrs, *keys, **options)
+ convert_sshpubkey_post(entry_attrs)
+ self.obj.convert_usercertificate_post(entry_attrs, **options)
+ return dn
+
+
+@register()
+class idoverrideuser_find(baseidoverride_find):
+ __doc__ = _('Search for an User ID override.')
+ msg_summary = ngettext('%(count)d User ID override matched',
+ '%(count)d User ID overrides matched', 0)
+
+ def post_callback(self, ldap, entries, truncated, *args, **options):
+ truncated = super(idoverrideuser_find, self).post_callback(
+ ldap, entries, truncated, *args, **options)
+ for entry in entries:
+ convert_sshpubkey_post(entry)
+ self.obj.convert_usercertificate_post(entry, **options)
+ return truncated
+
+
+@register()
+class idoverrideuser_show(baseidoverride_show):
+ __doc__ = _('Display information about an User ID override.')
+
+ def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
+ dn = super(idoverrideuser_show, self).post_callback(ldap, dn,
+ entry_attrs, *keys, **options)
+ convert_sshpubkey_post(entry_attrs)
+ self.obj.convert_usercertificate_post(entry_attrs, **options)
+ return dn
+
+
+@register()
+class idoverridegroup_add(baseidoverride_add):
+ __doc__ = _('Add a new Group ID override.')
+ msg_summary = _('Added Group ID override "%(value)s"')
+
+
+@register()
+class idoverridegroup_del(baseidoverride_del):
+ __doc__ = _('Delete an Group ID override.')
+ msg_summary = _('Deleted Group ID override "%(value)s"')
+
+
+@register()
+class idoverridegroup_mod(baseidoverride_mod):
+ __doc__ = _('Modify an Group ID override.')
+ msg_summary = _('Modified an Group ID override "%(value)s"')
+
+
+@register()
+class idoverridegroup_find(baseidoverride_find):
+ __doc__ = _('Search for an Group ID override.')
+ msg_summary = ngettext('%(count)d Group ID override matched',
+ '%(count)d Group ID overrides matched', 0)
+
+
+@register()
+class idoverridegroup_show(baseidoverride_show):
+ __doc__ = _('Display information about an Group ID override.')