summaryrefslogtreecommitdiffstats
path: root/ipalib/plugins/aci.py
diff options
context:
space:
mode:
Diffstat (limited to 'ipalib/plugins/aci.py')
-rw-r--r--ipalib/plugins/aci.py386
1 files changed, 386 insertions, 0 deletions
diff --git a/ipalib/plugins/aci.py b/ipalib/plugins/aci.py
new file mode 100644
index 000000000..f728d2a4a
--- /dev/null
+++ b/ipalib/plugins/aci.py
@@ -0,0 +1,386 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+# Pavel Zuna <pzuna@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+"""
+Directory Server Access Control Instructions (ACIs)
+"""
+
+from ipalib import api, crud, errors
+from ipalib import Object, Command
+from ipalib import Flag, Int, List, Str, StrEnum
+from ipalib.aci import ACI
+
+_type_map = {
+ 'user': 'ldap:///uid=*,%s,%s' % (api.env.container_user, api.env.basedn),
+ 'group': 'ldap:///cn=*,%s,%s' % (api.env.container_group, api.env.basedn),
+ 'host': 'ldap:///cn=*,%s,%s' % (api.env.container_host, api.env.basedn)
+}
+
+_valid_permissions_values = [
+ u'read', u'write', u'add', u'delete', u'selfwrite', u'all'
+]
+
+
+def _make_aci(current, aciname, kw):
+ try:
+ (dn, entry_attrs) = api.Command['taskgroup2_show'](kw['taskgroup'])
+ except errors.NotFound:
+ # The task group doesn't exist, let's be helpful and add it
+ tgkw = {'description': aciname}
+ (dn, entry_attrs) = api.Command['taskgroup2_create'](
+ kw['taskgroup'], **tgkw
+ )
+
+ a = ACI(current)
+ a.name = aciname
+ a.permissions = kw['permissions']
+ a.set_bindrule('groupdn = "ldap:///%s"' % dn)
+ if 'attrs' in kw:
+ a.set_target_attr(kw['attrs'])
+ if 'memberof' in kw:
+ (dn, entry_attrs) = api.Command['group2_show'](kw['memberof'])
+ a.set_target_filter('memberOf=%s' % dn)
+ if 'filter' in kw:
+ a.set_target_filter(kw['filter'])
+ if 'type' in kw:
+ target = _type_map[kw['type']]
+ a.set_target(target)
+ if 'targetgroup' in kw:
+ # Purposely no try here so we'll raise a NotFound
+ (dn, entry_attrs) = api.Command['group2_show'](kw['targetgroup'])
+ target = 'ldap:///%s' % dn
+ a.set_target(target)
+ if 'subtree' in kw:
+ # See if the subtree is a full URI
+ target = kw['subtree']
+ if not target.startswith('ldap:///'):
+ target = 'ldap:///%s' % target
+ a.set_target(target)
+
+ return a
+
+def _convert_strings_to_acis(acistrs):
+ acis = []
+ for a in acistrs:
+ try:
+ acis.append(ACI(a))
+ except SyntaxError, e:
+ # FIXME: need to log syntax errors, ignore for now
+ pass
+ return acis
+
+def _find_aci_by_name(acis, aciname):
+ for a in acis:
+ if a.name.lower() == aciname.lower():
+ return a
+ raise errors.NotFound('ACI with name "%s" not found' % aciname)
+
+def _normalize_permissions(permissions):
+ valid_permissions = []
+ permissions = permissions.split(',')
+ for p in permissions:
+ p = p.strip().lower()
+ if p in _valid_permissions_values and p not in valid_permissions:
+ valid_permissions.append(p)
+ return ','.join(valid_permissions)
+
+
+class aci2(Object):
+ """
+ ACI object.
+ """
+ takes_params = (
+ Str('aciname',
+ cli_name='name',
+ doc='name',
+ primary_key=True,
+ ),
+ Str('taskgroup',
+ cli_name='taskgroup',
+ doc='taskgroup ACI grants access to',
+ ),
+ List('permissions',
+ cli_name='permissions',
+ doc='comma-separated list of permissions to grant' \
+ '(read, write, add, delete, selfwrite, all)',
+ normalizer=_normalize_permissions,
+ ),
+ List('attrs?',
+ cli_name='attrs',
+ doc='comma-separated list of attributes',
+ ),
+ StrEnum('type?',
+ cli_name='type',
+ doc='type of IPA object (user, group, host)',
+ values=(u'user', u'group', u'host'),
+ ),
+ Str('memberof?',
+ cli_name='memberof',
+ doc='member of a group',
+ ),
+ Str('filter?',
+ cli_name='filter',
+ doc='legal LDAP filter (e.g. ou=Engineering)',
+ ),
+ Str('subtree?',
+ cli_name='subtree',
+ doc='subtree to apply ACI to',
+ ),
+ Str('targetgroup?',
+ cli_name='targetgroup',
+ doc='group to apply ACI to',
+ ),
+ )
+
+api.register(aci2)
+
+
+class aci2_create(crud.Create):
+ """
+ Create new ACI.
+ """
+ def execute(self, aciname, **kw):
+ """
+ Execute the aci-create operation.
+
+ Returns the entry as it will be created in LDAP.
+
+ :param aciname: The name of the ACI being added.
+ :param kw: Keyword arguments for the other LDAP attributes.
+ """
+ assert 'aciname' not in kw
+ ldap = self.api.Backend.ldap2
+
+ newaci = _make_aci(None, aciname, kw)
+
+ (dn, entry_attrs) = ldap.get_entry(self.api.env.basedn, ['aci'])
+
+ acis = _convert_strings_to_acis(entry_attrs.get('aci', []))
+ for a in acis:
+ if a.isequal(newaci):
+ raise errors.DuplicateEntry()
+
+ newaci_str = str(newaci)
+ entry_attrs['acis'].append(newaci_str)
+
+ ldap.update_entry(dn, entry_attrs)
+
+ return newaci_str
+
+ def output_for_cli(self, textui, result, aciname, **options):
+ textui.print_name(self.name)
+ textui.print_plain(result)
+ textui.print_dashed('Created ACI "%s".' % aciname)
+
+api.register(aci2_create)
+
+
+class aci2_delete(crud.Delete):
+ """
+ Delete ACI.
+ """
+ def execute(self, aciname, **kw):
+ """
+ Execute the aci-delete operation.
+
+ :param aciname: The name of the ACI being added.
+ :param kw: unused
+ """
+ assert 'aciname' not in kw
+ ldap = self.api.Backend.ldap2
+
+ (dn, entry_attrs) = ldap.get_entry(self.api.env.basedn, ['aci'])
+
+ acistrs = entry_attrs.get('aci', [])
+ acis = _convert_strings_to_acis(acistrs)
+ aci = _find_aci_by_name(acis, aciname)
+ for a in acistrs:
+ if aci.isequal(a):
+ acistrs.remove(a)
+ break
+
+ entry_attrs['aci'] = acistrs
+
+ ldap.update_entry(dn, entry_attrs)
+
+ return True
+
+ def output_for_cli(self, textui, result, aciname, **options):
+ """
+ Output result of this command to command line interface.
+ """
+ textui.print_name(self.name)
+ textui.print_plain('Deleted ACI "%s".' % aciname)
+
+api.register(aci2_delete)
+
+
+class aci2_mod(crud.Update):
+ """
+ Modify ACI.
+ """
+ def execute(self, aciname, **kw):
+ ldap = self.api.Backend.ldap2
+
+ (dn, entry_attrs) = ldap.get_entry(self.api.env.basedn, ['aci'])
+
+ acis = _convert_strings_to_acis(entry_attrs.get('aci', []))
+ aci = _find_aci_by_name(acis, aciname)
+
+ kw.setdefault('aciname', aci.name)
+ kw.setdefault('taskgroup', aci.bindrule['expression'])
+ kw.setdefault('permissions', aci.permissions)
+ kw.setdefault('attrs', aci.target['targetattr']['expression'])
+ if 'type' not in kw and 'targetgroup' not in kw and 'subtree' not in kw:
+ kw['subtree'] = aci.target['target']['expression']
+ if 'memberof' not in kw and 'filter' not in kw:
+ kw['filter'] = aci.target['targetfilter']['expression']
+
+ self.api.Command['aci2_delete'](aciname)
+
+ return self.api.Command['aci2_create'](aciname, **kw)
+
+ def output_for_cli(self, textui, result, aciname, **options):
+ textui.print_name(self.name)
+ textui.print_plain(result)
+ textui.print_dashed('Modified ACI "%s".' % aciname)
+
+api.register(aci2_mod)
+
+
+class aci2_find(crud.Search):
+ """
+ Search for ACIs.
+ """
+ def execute(self, term, **kw):
+ ldap = self.api.Backend.ldap2
+
+ (dn, entry_attrs) = ldap.get_entry(self.api.env.basedn, ['aci'])
+
+ acis = _convert_strings_to_acis(entry_attrs.get('aci', []))
+ results = []
+
+ if term:
+ term = term.lower()
+ for a in acis:
+ if a.name.lower().find(term) != -1 and a not in results:
+ results.append(a)
+ acis = list(results)
+ else:
+ results = list(acis)
+
+ if 'aciname' in kw:
+ for a in acis:
+ if a.name != kw['aciname']:
+ results.remove(a)
+ acis = list(results)
+
+ if 'attrs' in kw:
+ for a in acis:
+ alist1 = sorted(
+ [t.lower() for t in a.target['targetattr']['expression']]
+ )
+ alist2 = sorted([t.lower() for t in kw['attrs']])
+ if alist1 != alist2:
+ results.remove(a)
+ acis = list(results)
+
+ if 'taskgroup' in kw:
+ try:
+ (dn, entry_attrs) = self.api.Command['taskgroup2_show'](
+ kw['taskgroup']
+ )
+ except errors.NotFound:
+ pass
+ else:
+ for a in acis:
+ if a.bindrule['expression'] != ('ldap:///%s' % dn):
+ results.remove(a)
+ acis = list(results)
+
+ if 'permissions' in kw:
+ for a in acis:
+ alist1 = sorted(a.permissions)
+ alist2 = sorted(kw['permissions'])
+ if alist1 != alist2:
+ results.remove(a)
+ acis = list(results)
+
+ if 'memberof' in kw:
+ try:
+ (dn, entry_attrs) = self.api.Command['group2_show'](
+ kw['memberof']
+ )
+ except errors.NotFound:
+ pass
+ else:
+ memberof_filter = '(memberOf=%s)' % dn
+ for a in acis:
+ if 'targetfilter' in a.target:
+ filter = a.target['targetfilter']['expression']
+ if filter != memberof_filter:
+ results.remove(a)
+ else:
+ results.remove(a)
+ # uncomment next line if you add more search criteria
+ # acis = list(results)
+
+ # TODO: searching by: type, filter, subtree
+
+ return [str(aci) for aci in results]
+
+ def output_for_cli(self, textui, result, term, **options):
+ textui.print_name(self.name)
+ for aci in result:
+ textui.print_plain(aci)
+ textui.print_plain('')
+ textui.print_count(
+ len(result), '%i ACI matched.', '%i ACIs matched.'
+ )
+
+api.register(aci2_find)
+
+
+class aci2_show(crud.Retrieve):
+ """
+ Display ACI.
+ """
+ def execute(self, aciname, **kw):
+ """
+ Execute the aci-show operation.
+
+ Returns the entry
+
+ :param uid: The login name of the user to retrieve.
+ :param kw: unused
+ """
+ ldap = self.api.Backend.ldap2
+
+ (dn, entry_attrs) = ldap.get_entry(self.api.env.basedn, ['aci'])
+
+ acis = _convert_strings_to_acis(entry_attrs.get('aci', []))
+
+ return str(_find_aci_by_name(acis, aciname))
+
+ def output_for_cli(self, textui, result, aciname, **options):
+ textui.print_name(self.name)
+ textui.print_plain(result)
+
+api.register(aci2_show)
+