summaryrefslogtreecommitdiffstats
path: root/ipalib/plugins/aci.py
diff options
context:
space:
mode:
Diffstat (limited to 'ipalib/plugins/aci.py')
-rw-r--r--ipalib/plugins/aci.py438
1 files changed, 438 insertions, 0 deletions
diff --git a/ipalib/plugins/aci.py b/ipalib/plugins/aci.py
new file mode 100644
index 00000000..6eb48264
--- /dev/null
+++ b/ipalib/plugins/aci.py
@@ -0,0 +1,438 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Frontend plugins for managing DS ACIs
+"""
+
+from ipalib import api, crud, errors2
+from ipalib import Object, Command # Plugin base classes
+from ipalib import Str, Flag, Int # Parameter types
+from ipalib.aci import ACI
+
+def make_aci(current, aciname, kw):
+ try:
+ taskgroup = api.Command['taskgroup_show'](kw['taskgroup'])
+ except errors2.NotFound:
+ # The task group doesn't exist, let's be helpful and add it
+ tgkw = {'description':aciname}
+ taskgroup = api.Command['taskgroup_add'](kw['taskgroup'], **tgkw)
+
+ a = ACI(current)
+ a.name = aciname
+ a.permissions = kw['permissions'].replace(' ','').split(',')
+ a.set_bindrule("groupdn = \"ldap:///%s\"" % taskgroup['dn'])
+ if kw.get('attrs', None):
+ a.set_target_attr(kw['attrs'].split())
+ if kw.get('type', None):
+ a.set_target_attr(kw['attrs'].split())
+ if kw.get('memberof', None):
+ group = api.Command['group_show'](kw['memberof'])
+ a.set_target_filter("memberOf=%s" % group['dn'].decode('UTF-8'))
+ return a
+
+def search_by_name(acis, aciname):
+ """
+ Find an aci using the name field.
+
+ Must be an exact match of the entire name.
+ """
+ for a in acis:
+ try:
+ t = ACI(a)
+ if t.name == aciname:
+ return str(t)
+ except SyntaxError, e:
+ # FIXME: need to log syntax errors, ignore for now
+ pass
+
+ raise errors2.NotFound()
+
+def search_by_attr(acis, attrlist):
+ """
+ Find an aci by targetattr.
+
+ Returns an ACI list of all acis the attribute appears in.
+ """
+ results = []
+ for a in acis:
+ try:
+ t = ACI(a)
+ for attr in attrlist:
+ attr = attr.lower()
+ for v in t.target['targetattr'].get('expression'):
+ if attr == v.lower():
+ results.append(str(t))
+ except SyntaxError, e:
+ # FIXME: need to log syntax errors, ignore for now
+ pass
+
+ if results:
+ return results
+
+ raise errors2.NotFound()
+
+def search_by_taskgroup(acis, tgdn):
+ """
+ Find an aci by taskgroup. This searches the ACI bind rule.
+
+ Returns an ACI list of all acis that match.
+ """
+ results = []
+ for a in acis:
+ try:
+ t = ACI(a)
+ if t.bindrule['expression'] == "ldap:///" + tgdn:
+ results.append(str(t))
+ except SyntaxError, e:
+ # FIXME: need to log syntax errors, ignore for now
+ pass
+
+ if results:
+ return results
+
+ raise errors2.NotFound()
+
+def search_by_perm(acis, permlist):
+ """
+ Find an aci by permissions
+
+ Returns an ACI list of all acis the permission appears in.
+ """
+ results = []
+ for a in acis:
+ try:
+ t = ACI(a)
+ for perm in permlist:
+ if perm.lower() in t.permissions:
+ results.append(str(t))
+ except SyntaxError, e:
+ # FIXME: need to log syntax errors, ignore for now
+ pass
+
+ if results:
+ return results
+
+ raise errors2.NotFound()
+
+def search_by_memberof(acis, memberoffilter):
+ """
+ Find an aci by memberof
+
+ Returns an ACI list of all acis that has a matching memberOf as a
+ targetfilter.
+ """
+ results = []
+ memberoffilter = memberoffilter.lower()
+ for a in acis:
+ try:
+ t = ACI(a)
+ try:
+ if memberoffilter == t.target['targetfilter'].get('expression').lower():
+ results.append(str(t))
+ except KeyError:
+ pass
+ except SyntaxError, e:
+ # FIXME: need to log syntax errors, ignore for now
+ pass
+
+ if results:
+ return results
+
+ raise errors2.NotFound()
+
+class aci(Object):
+ """
+ ACI object.
+ """
+ takes_params = (
+ Str('aciname',
+ doc='Name of ACI',
+ primary_key=True,
+ ),
+ Str('taskgroup',
+ doc='Name of taskgroup this ACI grants access to',
+ ),
+ Str('permissions',
+ doc='Permissions to grant: read, write',
+ ),
+ Str('attrs?',
+ doc='Comma-separated list of attributes',
+ ),
+ Str('type?',
+ doc='type of IPA object: user, group, host',
+ ),
+ Str('memberof?',
+ doc='member of a group',
+ ),
+ Str('filter?',
+ doc='A legal LDAP filter (ou=Engineering)',
+ ),
+ Str('subtree?',
+ doc='A subtree to apply the ACI to',
+ ),
+ )
+api.register(aci)
+
+
+class aci_add(crud.Create):
+ """
+ Add a new aci.
+ """
+
+ def execute(self, aciname, **kw):
+ """
+ Execute the aci-add operation.
+
+ Returns the entry as it will be created in LDAP.
+
+ :param aciname: The name of the ACI being added.
+ :param kw: Keyword arguments for the other LDAP attributes.
+ """
+ assert 'aciname' not in kw
+ ldap = self.api.Backend.ldap
+
+ newaci = make_aci(None, aciname, kw)
+
+ currentaci = ldap.retrieve(self.api.env.basedn, ['aci'])
+
+ acilist = currentaci.get('aci')
+ for a in acilist:
+ try:
+ b = ACI(a)
+ if newaci.isequal(b):
+ raise errors2.DuplicateEntry()
+ except SyntaxError:
+ pass
+ acilist.append(str(newaci))
+ kwupdate = {'aci': acilist}
+
+ return ldap.update(currentaci.get('dn'), **kwupdate)
+
+api.register(aci_add)
+
+
+class aci_del(crud.Delete):
+ 'Delete an existing aci.'
+ """
+ Remove an aci by name.
+ """
+
+ def execute(self, aciname, **kw):
+ """
+ Execute the aci-del operation.
+
+ :param aciname: The name of the ACI being added.
+ :param kw: unused
+ """
+ assert 'aciname' not in kw
+ ldap = self.api.Backend.ldap
+
+ currentaci = ldap.retrieve(self.api.env.basedn, ['aci'])
+ acilist = currentaci.get('aci')
+ a = search_by_name(acilist, aciname)
+ i = acilist.index(str(a))
+ del acilist[i]
+
+ kwupdate = {'aci': acilist}
+
+ return ldap.update(currentaci.get('dn'), **kwupdate)
+
+ def output_for_cli(self, textui, result, aciname):
+ """
+ Output result of this command to command line interface.
+ """
+ textui.print_plain('Deleted aci "%s"' % aciname)
+
+api.register(aci_del)
+
+
+class aci_mod(crud.Update):
+ 'Edit an existing aci.'
+ def execute(self, aciname, **kw):
+ return "Not implemented"
+ def output_for_cli(self, textui, result, aciname, **options):
+ textui.print_plain(result)
+api.register(aci_mod)
+
+
+class aci_find(crud.Search):
+ 'Search for a aci.'
+ takes_options = (
+ Str('bindrule?',
+ doc='The bindrule (e.g. ldap:///self)'
+ ),
+ Flag('and?',
+ doc='Consider multiple options to be \"and\" so all are required.')
+ )
+ def execute(self, term, **kw):
+ ldap = self.api.Backend.ldap
+ currentaci = ldap.retrieve(self.api.env.basedn, ['aci'])
+ currentaci = currentaci.get('aci')
+ results = []
+
+ # aciname
+ if kw.get('aciname'):
+ try:
+ a = search_by_name(currentaci, kw.get('aciname'))
+ results = [a]
+ if kw.get('and'):
+ currentaci = results
+ except errors2.NotFound:
+ if kw.get('and'):
+ results = []
+ currentaci = []
+ pass
+
+ # attributes
+ if kw.get('attrs'):
+ try:
+ attrs = kw.get('attrs')
+ attrs = attrs.replace(' ','').split(',')
+ a=search_by_attr(currentaci, attrs)
+ if kw.get('and'):
+ results = a
+ currentaci = results
+ else:
+ results = results + a
+ except errors2.NotFound:
+ if kw.get('and'):
+ results = []
+ currentaci = []
+ pass
+
+ # taskgroup
+ if kw.get('taskgroup'):
+ try:
+ tg = api.Command['taskgroup_show'](kw.get('taskgroup'))
+ except errors2.NotFound:
+ # FIXME, need more precise error
+ raise
+ try:
+ a=search_by_taskgroup(currentaci, tg.get('dn'))
+ if kw.get('and'):
+ results = a
+ currentaci = results
+ else:
+ results = results + a
+ except errors2.NotFound:
+ if kw.get('and'):
+ results = []
+ currentaci = []
+ pass
+
+ # permissions
+ if kw.get('permissions'):
+ try:
+ permissions = kw.get('permissions')
+ permissions = permissions.replace(' ','').split(',')
+ a=search_by_perm(currentaci, permissions)
+ if kw.get('and'):
+ results = a
+ currentaci = results
+ else:
+ results = results + a
+ except errors2.NotFound:
+ if kw.get('and'):
+ results = []
+ currentaci = []
+ pass
+
+ # memberOf
+ if kw.get('memberof'):
+ try:
+ group = api.Command['group_show'](kw['memberof'])
+ memberof = "(memberOf=%s)" % group['dn'].decode('UTF-8')
+ a=search_by_memberof(currentaci, memberof)
+ results = results + a
+ if kw.get('and'):
+ currentaci = results
+ except errors2.NotFound:
+ if kw.get('and'):
+ results = []
+ currentaci = []
+ pass
+
+# TODO
+# --type=STR type of IPA object: user, group, host
+# --filter=STR A legal LDAP filter (ou=Engineering)
+# --subtree=STR A subtree to apply the ACI to
+# --bindrule=STR A subtree to apply the ACI to
+
+ # Make sure we have no dupes in the list
+ results = list(set(results))
+
+ # the first entry contains the count
+ counter = len(results)
+ return [counter] + results
+
+ def output_for_cli(self, textui, result, term, **options):
+ counter = result[0]
+ acis = result[1:]
+ if counter == 0 or len(acis) == 0:
+ textui.print_plain("No entries found")
+ return
+ textui.print_name(self.name)
+ for a in acis:
+ textui.print_plain(a)
+ textui.print_count(acis, '%d acis matched')
+
+api.register(aci_find)
+
+
+class aci_show(crud.Retrieve):
+ 'Examine an existing aci.'
+ def execute(self, aciname, **kw):
+ """
+ Execute the aci-show operation.
+
+ Returns the entry
+
+ :param uid: The login name of the user to retrieve.
+ :param kw: unused
+ """
+ ldap = self.api.Backend.ldap
+ currentaci = ldap.retrieve(self.api.env.basedn, ['aci'])
+
+ a = search_by_name(currentaci.get('aci'), aciname)
+ return str(a)
+
+ def output_for_cli(self, textui, result, aciname, **options):
+ textui.print_plain(result)
+
+api.register(aci_show)
+
+
+class aci_showall(Command):
+ 'Examine all existing acis.'
+ def execute(self):
+ """
+ Execute the aci-show operation.
+
+ Returns the entry
+
+ :param uid: The login name of the user to retrieve.
+ :param kw: unused
+ """
+ ldap = self.api.Backend.ldap
+ return ldap.retrieve(self.api.env.basedn, ['aci'])
+ def output_for_cli(self, textui, result, **options):
+ textui.print_entry(result)
+
+api.register(aci_showall)