diff options
author | Jakub Hrozek <jhrozek@redhat.com> | 2011-05-30 09:57:19 +0200 |
---|---|---|
committer | Stephen Gallagher <sgallagh@redhat.com> | 2011-10-26 10:29:38 -0400 |
commit | 817d3fe806506c637988cf99c7ab774345292e05 (patch) | |
tree | fb8cb01d7b5a6198244bff4ee8de49c9240e23e4 | |
parent | 9a58bc432bb9bc4ae4c452c5e600c94d4fc1c5a0 (diff) | |
download | sssd-817d3fe806506c637988cf99c7ab774345292e05.tar.gz sssd-817d3fe806506c637988cf99c7ab774345292e05.tar.xz sssd-817d3fe806506c637988cf99c7ab774345292e05.zip |
Provide python bindings for the HBAC evaluator library
Fixes for python HBAC bindings
These changes were proposed during a review:
* Change the signature of str_concat_sequence() to const char *
* use a getsetter for HbacRule.enabled to allow string true/false and
integer 1/0 in addition to bool
* fix a minor memory leak (HbacRequest.rule_name)
* remove overzealous discard consts
Fix python HBAC bindings for python <= 2.4
Several parts of the HBAC python bindings did not work with old Python
versions, such as the one shipped in RHEL5.
The changes include:
* a compatibility wrapper around python set object
* PyModule_AddIntMacro compat macro
* Py_ssize_t compat definition
* Do not use PyUnicode_FromFormat
* several function prototypes and structures used to have "char
arguments where they have "const char *" in recent versions.
This caused compilation warnings this patch mitigates by using
the discard_const hack on python 2.4
Remove dead code from python HBAC bindings
https://fedorahosted.org/sssd/ticket/935
Handle allocation error in python HBAC bindings
https://fedorahosted.org/sssd/ticket/934
HBAC rule validation Python bindings
https://fedorahosted.org/sssd/ticket/943
-rw-r--r-- | Makefile.am | 23 | ||||
-rw-r--r-- | configure.ac | 1 | ||||
-rw-r--r-- | contrib/sssd.spec.in | 17 | ||||
-rw-r--r-- | src/external/python.m4 | 20 | ||||
-rw-r--r-- | src/python/pyhbac.c | 1948 | ||||
-rwxr-xr-x | src/tests/pyhbac-test.py | 522 | ||||
-rw-r--r-- | src/util/sss_python.c | 104 | ||||
-rw-r--r-- | src/util/sss_python.h | 63 |
8 files changed, 2694 insertions, 4 deletions
diff --git a/Makefile.am b/Makefile.am index d75300bdb..44c9b7c42 100644 --- a/Makefile.am +++ b/Makefile.am @@ -92,7 +92,8 @@ check_PROGRAMS = \ PYTHON_TESTS = if BUILD_PYTHON_BINDINGS -PYTHON_TESTS += $(srcdir)/src/config/SSSDConfigTest.py +PYTHON_TESTS += $(srcdir)/src/config/SSSDConfigTest.py \ + $(srcdir)/src/tests/pyhbac-test.py endif TESTS = \ @@ -140,7 +141,8 @@ libsss_crypt_la_LIBADD = \ if BUILD_PYTHON_BINDINGS pyexec_LTLIBRARIES = \ - pysss.la + pysss.la \ + pyhbac.la endif dist_noinst_SCRIPTS = \ @@ -149,7 +151,8 @@ dist_noinst_SCRIPTS = \ src/config/ipachangeconf.py \ src/config/SSSDConfig.py \ src/config/SSSDConfigTest.py \ - src/config/sssd_upgrade_config.py + src/config/sssd_upgrade_config.py \ + src/tests/pyhbac-test.py dist_noinst_DATA = \ src/config/testconfigs/sssd-valid.conf \ @@ -308,6 +311,7 @@ dist_noinst_HEADERS = \ src/util/util.h \ src/util/strtonum.h \ src/util/sss_ldap.h \ + src/util/sss_python.h \ src/util/sss_krb5.h \ src/util/refcount.h \ src/util/find_uid.h \ @@ -989,6 +993,19 @@ pysss_la_LIBADD = \ pysss_la_LDFLAGS = \ -avoid-version \ -module + +pyhbac_la_SOURCES = \ + src/python/pyhbac.c \ + src/util/sss_python.c +pyhbac_la_CFLAGS = \ + $(AM_CFLAGS) \ + $(PYTHON_CFLAGS) +pyhbac_la_LIBADD = \ + $(PYTHON_LIBS) \ + libipa_hbac.la +pyhbac_la_LDFLAGS = \ + -avoid-version \ + -module endif ################ diff --git a/configure.ac b/configure.ac index e86fa73a5..60434e2f2 100644 --- a/configure.ac +++ b/configure.ac @@ -154,6 +154,7 @@ if test x$HAVE_PYTHON_BINDINGS != x; then AM_CHECK_PYTHON_HEADERS([], AC_MSG_ERROR([Could not find python headers])) AM_PYTHON_CONFIG + AM_CHECK_PYTHON_COMPAT fi if test x$HAVE_SELINUX != x; then diff --git a/contrib/sssd.spec.in b/contrib/sssd.spec.in index db0ecb3f4..291029b11 100644 --- a/contrib/sssd.spec.in +++ b/contrib/sssd.spec.in @@ -124,6 +124,16 @@ Requires: libipa_hbac = %{version}-%{release} %description -n libipa_hbac-devel Utility library to validate FreeIPA HBAC rules for authorization requests +%package -n libipa_hbac-python +Summary: Python bindings for the FreeIPA HBAC Evaluator library +Group: Development/Libraries +License: LGPLv3+ +Requires: libipa_hbac = %{version}-%{release} + +%description -n libipa_hbac-python +The libipa_hbac-python contains the bindings so that libipa_hbac can be +used by Python applications. + %prep %setup -q @@ -179,7 +189,8 @@ rm -f \ $RPM_BUILD_ROOT/%{_libdir}/sssd/libsss_simple.la \ $RPM_BUILD_ROOT/%{_libdir}/krb5/plugins/libkrb5/sssd_krb5_locator_plugin.la \ $RPM_BUILD_ROOT/%{_libdir}/libipa_hbac.la \ - $RPM_BUILD_ROOT/%{python_sitearch}/pysss.la + $RPM_BUILD_ROOT/%{python_sitearch}/pysss.la \ + $RPM_BUILD_ROOT/%{python_sitearch}/pyhbac.la # Older versions of rpmbuild can only handle one -f option # So we need to append to the sssd.lang file @@ -264,6 +275,10 @@ rm -rf $RPM_BUILD_ROOT %{_libdir}/libipa_hbac.so %{_libdir}/pkgconfig/ipa_hbac.pc +%files -n libipa_hbac-python +%defattr(-,root,root,-) +%{python_sitearch}/pyhbac.so + %post /sbin/ldconfig /sbin/chkconfig --add %{servicename} diff --git a/src/external/python.m4 b/src/external/python.m4 index db5986385..00487a746 100644 --- a/src/external/python.m4 +++ b/src/external/python.m4 @@ -56,3 +56,23 @@ AC_DEFUN([AM_CHECK_PYTHON_HEADERS], ]) +dnl Checks for a couple of functions we use that may not be defined +dnl in some older python versions used e.g. on RHEL5 +AC_DEFUN([AM_CHECK_PYTHON_COMPAT], +[AC_REQUIRE([AM_CHECK_PYTHON_HEADERS]) + save_CPPFLAGS="$CPPFLAGS" + save_LIBS="$LIBS" + CPPFLAGS="$CPPFLAGS $PYTHON_INCLUDES" + LIBS="$LIBS $PYTHON_LIBS" + + AC_CHECK_TYPE(Py_ssize_t, + [ AC_DEFINE_UNQUOTED(HAVE_PY_SSIZE_T, 1, [Native Py_ssize_t type]) ], + [], + [[#include <Python.h>]]) + + AC_CHECK_FUNCS([PySet_New PySet_Add PyErr_NewExceptionWithDoc]) + AC_CHECK_DECLS([PySet_Check, PyModule_AddIntMacro, PyUnicode_FromString], [], [], [[#include <Python.h>]]) + + CPPFLAGS="$save_CPPFLAGS" + LIBS="$save_LIBS" +]) diff --git a/src/python/pyhbac.c b/src/python/pyhbac.c new file mode 100644 index 000000000..230b9316a --- /dev/null +++ b/src/python/pyhbac.c @@ -0,0 +1,1948 @@ +/* + Authors: + Jakub Hrozek <jhrozek@redhat.com> + + Copyright (C) 2011 Red Hat + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <Python.h> +#include <structmember.h> + +#include "util/util.h" +#include "util/sss_python.h" +#include "providers/ipa/ipa_hbac.h" + +#define PYTHON_MODULE_NAME "pyhbac" + +#ifndef PYHBAC_ENCODING +#define PYHBAC_ENCODING "UTF-8" +#endif + +#define PYHBAC_ENCODING_ERRORS "strict" + +#define CHECK_ATTRIBUTE_DELETE(attr, attrname) do { \ + if (attr == NULL) { \ + PyErr_Format(PyExc_TypeError, \ + "Cannot delete the %s attribute", \ + attrname); \ + return -1; \ + } \ +} while(0); + +static PyObject *PyExc_HbacError; + +/* ==================== Utility functions ========================*/ +static char * +py_strdup(const char *string) +{ + char *copy; + + copy = PyMem_New(char, strlen(string)+1); + if (copy == NULL) { + PyErr_NoMemory(); + return NULL; + } + + return strcpy(copy, string); +} + +static char * +py_strcat_realloc(char *first, const char *second) +{ + char *new_first; + new_first = PyMem_Realloc(first, strlen(first) + strlen(second) + 1); + if (new_first == NULL) { + PyErr_NoMemory(); + return NULL; + } + + return strcat(new_first, second); +} + +static PyObject * +get_utf8_string(PyObject *obj, const char *attrname) +{ + const char *a = attrname ? attrname : "attribute"; + PyObject *obj_utf8 = NULL; + + if (PyString_Check(obj)) { + obj_utf8 = obj; + Py_INCREF(obj_utf8); /* Make sure we can DECREF later */ + } else if (PyUnicode_Check(obj)) { + if ((obj_utf8 = PyUnicode_AsUTF8String(obj)) == NULL) { + return NULL; + } + } else { + PyErr_Format(PyExc_TypeError, "%s must be a string", a); + return NULL; + } + + return obj_utf8; +} + +static void +free_string_list(const char **list) +{ + int i; + + if (!list) return; + + for (i=0; list[i]; i++) { + PyMem_Free(discard_const_p(char, list[i])); + } + PyMem_Free(list); +} + +static const char ** +sequence_as_string_list(PyObject *seq, const char *paramname) +{ + const char *p = paramname ? paramname : "attribute values"; + const char **ret; + PyObject *utf_item; + int i; + Py_ssize_t len; + PyObject *item; + + if (!PySequence_Check(seq)) { + PyErr_Format(PyExc_TypeError, + "The object must be a sequence\n"); + return NULL; + } + + len = PySequence_Size(seq); + if (len == -1) return NULL; + + ret = PyMem_New(const char *, (len+1)); + if (!ret) { + PyErr_NoMemory(); + return NULL; + } + + for (i = 0; i < len; i++) { + item = PySequence_GetItem(seq, i); + if (item == NULL) { + break; + } + + utf_item = get_utf8_string(item, p); + if (utf_item == NULL) { + return NULL; + } + + ret[i] = py_strdup(PyString_AsString(utf_item)); + Py_DECREF(utf_item); + if (!ret[i]) { + return NULL; + } + } + + ret[i] = NULL; + return ret; +} + +static bool +verify_sequence(PyObject *seq, const char *attrname) +{ + const char *a = attrname ? attrname : "attribute"; + + if (!PySequence_Check(seq)) { + PyErr_Format(PyExc_TypeError, "%s must be a sequence", a); + return false; + } + + return true; +} + +static int +pyobject_to_category(PyObject *o) +{ + int c; + + c = PyInt_AsLong(o); + if (c == -1 && PyErr_Occurred()) { + PyErr_Format(PyExc_TypeError, + "Invalid type for category element - must be an int\n"); + return -1; + } + + switch (c) { + case HBAC_CATEGORY_NULL: + case HBAC_CATEGORY_ALL: + return c; + } + + PyErr_Format(PyExc_ValueError, "Invalid value %d for category\n", c); + return -1; +} + +static uint32_t +native_category(PyObject *pycat) +{ + PyObject *iterator; + PyObject *item; + uint32_t cat; + int ret; + + iterator = PyObject_GetIter(pycat); + if (iterator == NULL) { + PyErr_Format(PyExc_RuntimeError, "Cannot iterate category\n"); + return -1; + } + + cat = 0; + while ((item = PyIter_Next(iterator))) { + ret = pyobject_to_category(item); + Py_DECREF(item); + if (ret == -1) { + Py_DECREF(iterator); + return -1; + } + + cat |= ret; + } + + Py_DECREF(iterator); + return cat; +} + +static char * +str_concat_sequence(PyObject *seq, const char *delim) +{ + Py_ssize_t size; + Py_ssize_t i; + PyObject *item; + char *s = NULL; + char *part; + + size = PySequence_Size(seq); + + if (size == 0) { + s = py_strdup(""); + if (s == NULL) { + return NULL; + } + return s; + } + + for (i=0; i < size; i++) { + item = PySequence_GetItem(seq, i); + if (item == NULL) goto fail; + + part = PyString_AsString(item); + if (part == NULL) { + Py_DECREF(item); + goto fail; + } + + if (s) { + s = py_strcat_realloc(s, delim); + if (s == NULL) goto fail; + s = py_strcat_realloc(s, part); + if (s == NULL) goto fail; + } else { + s = py_strdup(part); + if (s == NULL) goto fail; + } + Py_DECREF(item); + } + + return s; +fail: + PyMem_Free(s); + return NULL; +} + +/* ================= HBAC Exception handling =====================*/ +static void +set_hbac_exception(PyObject *exc, struct hbac_info *error) +{ + PyErr_SetObject(exc, + Py_BuildValue(sss_py_const_p(char, "(i,s)"), + error->code, + error->rule_name ? \ + error->rule_name : "no rule")); +} + +/* ==================== HBAC Rule Element ========================*/ +typedef struct { + PyObject_HEAD + + PyObject *category; + PyObject *names; + PyObject *groups; +} HbacRuleElement; + +static PyObject * +HbacRuleElement_new(PyTypeObject *type, PyObject *args, PyObject *kwds) +{ + HbacRuleElement *self; + + self = (HbacRuleElement *) type->tp_alloc(type, 0); + if (self == NULL) { + PyErr_NoMemory(); + return NULL; + } + + self->category = sss_python_set_new(); + self->names = PyList_New(0); + self->groups = PyList_New(0); + if (!self->names || !self->groups || !self->category) { + Py_DECREF(self); + PyErr_NoMemory(); + return NULL; + } + + return (PyObject *) self; +} + +static int +HbacRuleElement_clear(HbacRuleElement *self) +{ + Py_CLEAR(self->names); + Py_CLEAR(self->groups); + Py_CLEAR(self->category); + return 0; +} + +static void +HbacRuleElement_dealloc(HbacRuleElement *self) +{ + HbacRuleElement_clear(self); + self->ob_type->tp_free((PyObject*) self); +} + +static int +HbacRuleElement_traverse(HbacRuleElement *self, visitproc visit, void *arg) +{ + Py_VISIT(self->groups); + Py_VISIT(self->names); + Py_VISIT(self->category); + return 0; +} + +static int +hbac_rule_element_set_names(HbacRuleElement *self, PyObject *names, + void *closure); +static int +hbac_rule_element_set_groups(HbacRuleElement *self, PyObject *groups, + void *closure); +static int +hbac_rule_element_set_category(HbacRuleElement *self, PyObject *category, + void *closure); + +static int +HbacRuleElement_init(HbacRuleElement *self, PyObject *args, PyObject *kwargs) +{ + const char * const kwlist[] = { "names", "groups", "category", NULL }; + PyObject *names = NULL; + PyObject *groups = NULL; + PyObject *category = NULL; + PyObject *tmp = NULL; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, + sss_py_const_p(char, "|OOO"), + discard_const_p(char *, kwlist), + &names, &groups, &category)) { + return -1; + } + + if (names) { + if (hbac_rule_element_set_names(self, names, NULL) != 0) { + return -1; + } + } + + if (groups) { + if (hbac_rule_element_set_groups(self, groups, NULL) != 0) { + return -1; + } + } + + if (category) { + if (hbac_rule_element_set_category(self, category, NULL) != 0) { + return -1; + } + } else { + tmp = PyInt_FromLong(HBAC_CATEGORY_NULL); + if (!tmp) { + return -1; + } + + if (sss_python_set_add(self->category, tmp) != 0) { + Py_DECREF(tmp); + return -1; + } + } + + return 0; +} + +static int +hbac_rule_element_set_names(HbacRuleElement *self, + PyObject *names, + void *closure) +{ + CHECK_ATTRIBUTE_DELETE(names, "names"); + + if (!verify_sequence(names, "names")) { + return -1; + } + + SAFE_SET(self->names, names); + return 0; +} + +static PyObject * +hbac_rule_element_get_names(HbacRuleElement *self, void *closure) +{ + Py_INCREF(self->names); + return self->names; +} + +static int +hbac_rule_element_set_groups(HbacRuleElement *self, + PyObject *groups, + void *closure) +{ + CHECK_ATTRIBUTE_DELETE(groups, "groups"); + + if (!verify_sequence(groups, "groups")) { + return -1; + } + + SAFE_SET(self->groups, groups); + return 0; +} + +static PyObject * +hbac_rule_element_get_groups(HbacRuleElement *self, void *closure) +{ + Py_INCREF(self->groups); + return self->groups; +} + +static int +hbac_rule_element_set_category(HbacRuleElement *self, + PyObject *category, + void *closure) +{ + PyObject *iterator; + PyObject *item; + int ret; + + CHECK_ATTRIBUTE_DELETE(category, "category"); + + if (!sss_python_set_check(category)) { + PyErr_Format(PyExc_TypeError, "The category must be a set type\n"); + return -1; + } + + /* Check the values, too */ + iterator = PyObject_GetIter(category); + if (iterator == NULL) { + PyErr_Format(PyExc_RuntimeError, "Cannot iterate a set?\n"); + return -1; + } + + while ((item = PyIter_Next(iterator))) { + ret = pyobject_to_category(item); + Py_DECREF(item); + if (ret == -1) { + Py_DECREF(iterator); + return -1; + } + } + + SAFE_SET(self->category, category); + Py_DECREF(iterator); + return 0; +} + +static PyObject * +hbac_rule_element_get_category(HbacRuleElement *self, void *closure) +{ + Py_INCREF(self->category); + return self->category; +} + +static PyObject * +HbacRuleElement_repr(HbacRuleElement *self) +{ + char *strnames = NULL; + char *strgroups = NULL; + uint32_t category; + PyObject *o, *format, *args; + + format = sss_python_unicode_from_string("<category %lu names [%s] groups [%s]>"); + if (format == NULL) { + return NULL; + } + + strnames = str_concat_sequence(self->names, + discard_const_p(char, ",")); + strgroups = str_concat_sequence(self->groups, + discard_const_p(char, ",")); + category = native_category(self->category); + if (strnames == NULL || strgroups == NULL || category == -1) { + PyMem_Free(strnames); + PyMem_Free(strgroups); + Py_DECREF(format); + return NULL; + } + + args = Py_BuildValue(sss_py_const_p(char, "Kss"), + (unsigned long long ) category, + strnames, strgroups); + if (args == NULL) { + PyMem_Free(strnames); + PyMem_Free(strgroups); + Py_DECREF(format); + return NULL; + } + + o = PyUnicode_Format(format, args); + PyMem_Free(strnames); + PyMem_Free(strgroups); + Py_DECREF(format); + Py_DECREF(args); + return o; +} + +PyDoc_STRVAR(HbacRuleElement_names__doc__, +"(sequence of strings) A list of object names this element applies to"); +PyDoc_STRVAR(HbacRuleElement_groups__doc__, +"(sequence of strings) A list of group names this element applies to"); +PyDoc_STRVAR(HbacRuleElement_category__doc__, +"(set) A set of categories this rule falls into"); + +static PyGetSetDef py_hbac_rule_element_getset[] = { + { discard_const_p(char, "names"), + (getter) hbac_rule_element_get_names, + (setter) hbac_rule_element_set_names, + HbacRuleElement_names__doc__, + NULL }, + + { discard_const_p(char, "groups"), + (getter) hbac_rule_element_get_groups, + (setter) hbac_rule_element_set_groups, + HbacRuleElement_groups__doc__, + NULL }, + + { discard_const_p(char, "category"), + (getter) hbac_rule_element_get_category, + (setter) hbac_rule_element_set_category, + HbacRuleElement_category__doc__, + NULL }, + + { NULL, 0, 0, 0, NULL } /* Sentinel */ +}; + +PyDoc_STRVAR(HbacRuleElement__doc__, +"IPA HBAC Rule Element\n\n" +"HbacRuleElement() -> new empty rule element\n" +"HbacRuleElement([names], [groups], [category]) -> optionally, provide\n" +"names and/or groups and/or category\n"); + +static PyTypeObject pyhbac_hbacrule_element_type = { + PyObject_HEAD_INIT(NULL) + .tp_name = sss_py_const_p(char, "pyhbac.HbacRuleElement"), + .tp_basicsize = sizeof(HbacRuleElement), + .tp_new = HbacRuleElement_new, + .tp_dealloc = (destructor) HbacRuleElement_dealloc, + .tp_traverse = (traverseproc) HbacRuleElement_traverse, + .tp_clear = (inquiry) HbacRuleElement_clear, + .tp_init = (initproc) HbacRuleElement_init, + .tp_repr = (reprfunc) HbacRuleElement_repr, + .tp_getset = py_hbac_rule_element_getset, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_GC, + .tp_doc = HbacRuleElement__doc__ +}; + +static void +free_hbac_rule_element(struct hbac_rule_element *el) +{ + if (!el) return; + + free_string_list(el->names); + free_string_list(el->groups); + PyMem_Free(el); +} + +struct hbac_rule_element * +HbacRuleElement_to_native(HbacRuleElement *pyel) +{ + struct hbac_rule_element *el = NULL; + + /* check the type, None would wreak havoc here because for some reason + * it would pass the sequence check */ + if (!PyObject_IsInstance((PyObject *) pyel, + (PyObject *) &pyhbac_hbacrule_element_type)) { + PyErr_Format(PyExc_TypeError, + "The element must be of type HbacRuleElement\n"); + goto fail; + } + + el = PyMem_Malloc(sizeof(struct hbac_rule_element)); + if (!el) { + PyErr_NoMemory(); + goto fail; + } + + el->category = native_category(pyel->category); + el->names = sequence_as_string_list(pyel->names, "names"); + el->groups = sequence_as_string_list(pyel->groups, "groups"); + if (!el->names || !el->groups || el->category == -1) { + goto fail; + } + + return el; + +fail: + free_hbac_rule_element(el); + return NULL; +} + +/* ==================== HBAC Rule ========================*/ +typedef struct { + PyObject_HEAD + + PyObject *name; + bool enabled; + + HbacRuleElement *users; + HbacRuleElement *services; + HbacRuleElement *targethosts; + HbacRuleElement *srchosts; +} HbacRuleObject; + +static void +free_hbac_rule(struct hbac_rule *rule); +static struct hbac_rule * +HbacRule_to_native(HbacRuleObject *pyrule); + +static PyObject * +HbacRule_new(PyTypeObject *type, PyObject *args, PyObject *kwds) +{ + HbacRuleObject *self; + + self = (HbacRuleObject *) type->tp_alloc(type, 0); + if (self == NULL) { + PyErr_NoMemory(); + return NULL; + } + + self->name = sss_python_unicode_from_string(""); + if (self->name == NULL) { + Py_DECREF(self); + PyErr_NoMemory(); + return NULL; + } + + self->enabled = false; + + self->services = (HbacRuleElement *) HbacRuleElement_new( + &pyhbac_hbacrule_element_type, + NULL, NULL); + self->users = (HbacRuleElement *) HbacRuleElement_new( + &pyhbac_hbacrule_element_type, + NULL, NULL); + self->targethosts = (HbacRuleElement *) HbacRuleElement_new( + &pyhbac_hbacrule_element_type, + NULL, NULL); + self->srchosts = (HbacRuleElement *) HbacRuleElement_new( + &pyhbac_hbacrule_element_type, + NULL, NULL); + if (self->services == NULL || self->users == NULL || + self->targethosts == NULL || self->srchosts == NULL) { + Py_XDECREF(self->services); + Py_XDECREF(self->users); + Py_XDECREF(self->targethosts); + Py_XDECREF(self->srchosts); + Py_DECREF(self->name); + Py_DECREF(self); + PyErr_NoMemory(); + return NULL; + } + + return (PyObject *) self; +} + +static int +HbacRule_clear(HbacRuleObject *self) +{ + Py_CLEAR(self->name); + Py_CLEAR(self->services); + Py_CLEAR(self->users); + Py_CLEAR(self->targethosts); + Py_CLEAR(self->srchosts); + return 0; +} + +static void +HbacRule_dealloc(HbacRuleObject *self) +{ + HbacRule_clear(self); + self->ob_type->tp_free((PyObject*) self); +} + +static int +HbacRule_traverse(HbacRuleObject *self, visitproc visit, void *arg) +{ + Py_VISIT((PyObject *) self->name); + Py_VISIT((PyObject *) self->services); + Py_VISIT((PyObject *) self->users); + Py_VISIT((PyObject *) self->targethosts); + Py_VISIT((PyObject *) self->srchosts); + return 0; +} + +static int +hbac_rule_set_name(HbacRuleObject *self, PyObject *name, void *closure); + +static int +HbacRule_init(HbacRuleObject *self, PyObject *args, PyObject *kwargs) +{ + const char * const kwlist[] = { "name", "enabled", NULL }; + PyObject *name = NULL; + PyObject *empty_tuple = NULL; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, + sss_py_const_p(char, "O|i"), + discard_const_p(char *, kwlist), + &name, &self->enabled)) { + return -1; + } + + if (hbac_rule_set_name(self, name, NULL) == -1) { + return -1; + } + + empty_tuple = PyTuple_New(0); + if (!empty_tuple) { + return -1; + } + + if (HbacRuleElement_init(self->users, empty_tuple, NULL) == -1 || + HbacRuleElement_init(self->services, empty_tuple, NULL) == -1 || + HbacRuleElement_init(self->targethosts, empty_tuple, NULL) == -1 || + HbacRuleElement_init(self->srchosts, empty_tuple, NULL) == -1) { + Py_DECREF(empty_tuple); + return -1; + } + + Py_DECREF(empty_tuple); + return 0; +} + +static int +hbac_rule_set_enabled(HbacRuleObject *self, PyObject *enabled, void *closure) +{ + CHECK_ATTRIBUTE_DELETE(enabled, "enabled"); + + if (PyString_Check(enabled) || PyUnicode_Check(enabled)) { + PyObject *utf8_str; + char *str; + + utf8_str = get_utf8_string(enabled, "enabled"); + if (!utf8_str) return -1; + str = PyString_AsString(utf8_str); + if (!str) { + Py_DECREF(utf8_str); + return -1; + } + + if (strcasecmp(str, "true") == 0) { + self->enabled = true; + } else if (strcasecmp(str, "false") == 0) { + self->enabled = false; + } else { + PyErr_Format(PyExc_ValueError, + "enabled only accepts 'true' of 'false' " + "string literals"); + Py_DECREF(utf8_str); + return -1; + } + + Py_DECREF(utf8_str); + return 0; + } else if (PyBool_Check(enabled)) { + self->enabled = (enabled == Py_True); + return 0; + } else if (PyInt_Check(enabled)) { + switch(PyInt_AsLong(enabled)) { + case 0: + self->enabled = false; + break; + case 1: + self->enabled = true; + break; + default: + PyErr_Format(PyExc_ValueError, + "enabled only accepts '0' of '1' " + "integer constants"); + return -1; + } + return 0; + } + + PyErr_Format(PyExc_TypeError, "enabled must be a boolean, an integer " + "1 or 0 or a string constant true/false"); + return -1; + +} + +static PyObject * +hbac_rule_get_enabled(HbacRuleObject *self, void *closure) +{ + if (self->enabled) { + Py_RETURN_TRUE; + } + + Py_RETURN_FALSE; +} + +static int +hbac_rule_set_name(HbacRuleObject *self, PyObject *name, void *closure) +{ + CHECK_ATTRIBUTE_DELETE(name, "name"); + + if (!PyString_Check(name) && !PyUnicode_Check(name)) { + PyErr_Format(PyExc_TypeError, "name must be a string or Unicode"); + return -1; + } + + SAFE_SET(self->name, name); + return 0; +} + +static PyObject * +hbac_rule_get_name(HbacRuleObject *self, void *closure) +{ + if (PyUnicode_Check(self->name)) { + Py_INCREF(self->name); + return self->name; + } else if (PyString_Check(self->name)) { + return PyUnicode_FromEncodedObject(self->name, + PYHBAC_ENCODING, PYHBAC_ENCODING_ERRORS); + } + + /* setter does typechecking but let us be paranoid */ + PyErr_Format(PyExc_TypeError, "name must be a string or Unicode"); + return NULL; +} + +static PyObject * +HbacRule_repr(HbacRuleObject *self) +{ + PyObject *users_repr; + PyObject *services_repr; + PyObject *targethosts_repr; + PyObject *srchosts_repr; + PyObject *o, *format, *args; + + format = sss_python_unicode_from_string("<name %s enabled %d " + "users %s services %s " + "targethosts %s srchosts %s>"); + if (format == NULL) { + return NULL; + } + + users_repr = HbacRuleElement_repr(self->users); + services_repr = HbacRuleElement_repr(self->services); + targethosts_repr = HbacRuleElement_repr(self->targethosts); + srchosts_repr = HbacRuleElement_repr(self->srchosts); + if (users_repr == NULL || services_repr == NULL || + targethosts_repr == NULL || srchosts_repr == NULL) { + Py_XDECREF(users_repr); + Py_XDECREF(services_repr); + Py_XDECREF(targethosts_repr); + Py_XDECREF(srchosts_repr); + Py_DECREF(format); + return NULL; + } + + args = Py_BuildValue(sss_py_const_p(char, "OiOOOO"), + self->name, self->enabled, + users_repr, services_repr, + targethosts_repr, srchosts_repr); + if (args == NULL) { + Py_DECREF(users_repr); + Py_DECREF(services_repr); + Py_DECREF(targethosts_repr); + Py_DECREF(srchosts_repr); + Py_DECREF(format); + return NULL; + } + + o = PyUnicode_Format(format, args); + Py_DECREF(users_repr); + Py_DECREF(services_repr); + Py_DECREF(targethosts_repr); + Py_DECREF(srchosts_repr); + Py_DECREF(format); + Py_DECREF(args); + return o; +} + +static PyObject * +py_hbac_rule_validate(HbacRuleObject *self, PyObject *args) +{ + struct hbac_rule *rule; + bool is_valid; + uint32_t missing; + uint32_t attr; + PyObject *ret = NULL; + PyObject *py_is_valid = NULL; + PyObject *py_missing = NULL; + PyObject *py_attr = NULL; + + rule = HbacRule_to_native(self); + if (!rule) { + /* Make sure there is at least a generic exception */ + if (!PyErr_Occurred()) { + PyErr_Format(PyExc_IOError, + "Could not convert HbacRule to native type\n"); + } + goto fail; + } + + is_valid = hbac_rule_is_complete(rule, &missing); + free_hbac_rule(rule); + + ret = PyTuple_New(2); + if (!ret) { + PyErr_NoMemory(); + goto fail; + } + + py_is_valid = PyBool_FromLong(is_valid); + py_missing = sss_python_set_new(); + if (!py_missing || !py_is_valid) { + PyErr_NoMemory(); + goto fail; + } + + for (attr = HBAC_RULE_ELEMENT_USERS; + attr <= HBAC_RULE_ELEMENT_SOURCEHOSTS; + attr <<= 1) { + if (!(missing & attr)) continue; + + py_attr = PyInt_FromLong(attr); + if (!py_attr) { + PyErr_NoMemory(); + goto fail; + } + + if (sss_python_set_add(py_missing, py_attr) != 0) { + /* If the set-add succeeded, it would steal the reference */ + Py_DECREF(py_attr); + goto fail; + } + } + + PyTuple_SET_ITEM(ret, 0, py_is_valid); + PyTuple_SET_ITEM(ret, 1, py_missing); + return ret; + +fail: + Py_XDECREF(ret); + Py_XDECREF(py_missing); + Py_XDECREF(py_is_valid); + return NULL; +} + +PyDoc_STRVAR(py_hbac_rule_validate__doc__, +"validate() -> (valid, missing)\n\n" +"Validate an HBAC rule\n" +"Returns a tuple of (bool, set). The boolean value describes whether\n" +"the rule is valid. If it is False, then the set lists all the missing " +"rule elements as HBAC_RULE_ELEMENT_* constants\n"); + +static PyMethodDef py_hbac_rule_methods[] = { + { sss_py_const_p(char, "validate"), + (PyCFunction) py_hbac_rule_validate, + METH_VARARGS, py_hbac_rule_validate__doc__, + }, + { NULL, NULL, 0, NULL } /* Sentinel */ +}; + +PyDoc_STRVAR(HbacRuleObject_users__doc__, +"(HbacRuleElement) Users and user groups for which this rule applies"); +PyDoc_STRVAR(HbacRuleObject_services__doc__, +"(HbacRuleElement) Services and service groups for which this rule applies"); +PyDoc_STRVAR(HbacRuleObject_targethosts__doc__, +"(HbacRuleElement) Target hosts for which this rule applies"); +PyDoc_STRVAR(HbacRuleObject_srchosts__doc__, +"(HbacRuleElement) Source hosts for which this rule applies"); + +static PyMemberDef py_hbac_rule_members[] = { + { discard_const_p(char, "users"), T_OBJECT_EX, + offsetof(HbacRuleObject, users), 0, + HbacRuleObject_users__doc__ }, + + { discard_const_p(char, "services"), T_OBJECT_EX, + offsetof(HbacRuleObject, services), 0, + HbacRuleObject_services__doc__ }, + + { discard_const_p(char, "targethosts"), T_OBJECT_EX, + offsetof(HbacRuleObject, targethosts), 0, + HbacRuleObject_targethosts__doc__}, + + { discard_const_p(char, "srchosts"), T_OBJECT_EX, + offsetof(HbacRuleObject, srchosts), 0, + HbacRuleObject_srchosts__doc__}, + + { NULL, 0, 0, 0, NULL } /* Sentinel */ +}; + +PyDoc_STRVAR(HbacRuleObject_enabled__doc__, +"(bool) Is the rule enabled"); +PyDoc_STRVAR(HbacRuleObject_name__doc__, +"(string) The name of the rule"); + +static PyGetSetDef py_hbac_rule_getset[] = { + { discard_const_p(char, "enabled"), + (getter) hbac_rule_get_enabled, + (setter) hbac_rule_set_enabled, + HbacRuleObject_enabled__doc__, + NULL }, + + { discard_const_p(char, "name"), + (getter) hbac_rule_get_name, + (setter) hbac_rule_set_name, + HbacRuleObject_name__doc__, + NULL }, + + {NULL, 0, 0, 0, NULL} /* Sentinel */ +}; + +PyDoc_STRVAR(HbacRuleObject__doc__, +"IPA HBAC Rule\n\n" +"HbacRule(name, [enabled]) -> instantiate an empty rule, optionally\n" +"specify whether it is enabled. Rules are created disabled by default and\n" +"contain empty HbacRuleElement instances in services, users, targethosts\n" +"and srchosts attributes.\n"); + +static PyTypeObject pyhbac_hbacrule_type = { + PyObject_HEAD_INIT(NULL) + .tp_name = sss_py_const_p(char, "pyhbac.HbacRule"), + .tp_basicsize = sizeof(HbacRuleObject), + .tp_new = HbacRule_new, + .tp_dealloc = (destructor) HbacRule_dealloc, + .tp_traverse = (traverseproc) HbacRule_traverse, + .tp_clear = (inquiry) HbacRule_clear, + .tp_init = (initproc) HbacRule_init, + .tp_repr = (reprfunc) HbacRule_repr, + .tp_members = py_hbac_rule_members, + .tp_methods = py_hbac_rule_methods, + .tp_getset = py_hbac_rule_getset, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_GC, + .tp_doc = HbacRuleObject__doc__ +}; + +static void +free_hbac_rule(struct hbac_rule *rule) +{ + if (!rule) return; + + free_hbac_rule_element(rule->services); + free_hbac_rule_element(rule->users); + free_hbac_rule_element(rule->targethosts); + free_hbac_rule_element(rule->srchosts); + + PyMem_Free(discard_const_p(char, rule->name)); + PyMem_Free(rule); +} + +static struct hbac_rule * +HbacRule_to_native(HbacRuleObject *pyrule) +{ + struct hbac_rule *rule = NULL; + PyObject *utf_name; + + rule = PyMem_Malloc(sizeof(struct hbac_rule)); + if (!rule) { + PyErr_NoMemory(); + goto fail; + } + + if (!PyObject_IsInstance((PyObject *) pyrule, + (PyObject *) &pyhbac_hbacrule_type)) { + PyErr_Format(PyExc_TypeError, + "The rule must be of type HbacRule\n"); + goto fail; + } + + utf_name = get_utf8_string(pyrule->name, "name"); + if (utf_name == NULL) { + return NULL; + } + + rule->name = py_strdup(PyString_AsString(utf_name)); + Py_DECREF(utf_name); + if (rule->name == NULL) { + goto fail; + } + + rule->services = HbacRuleElement_to_native(pyrule->services); + rule->users = HbacRuleElement_to_native(pyrule->users); + rule->targethosts = HbacRuleElement_to_native(pyrule->targethosts); + rule->srchosts = HbacRuleElement_to_native(pyrule->srchosts); + if (!rule->services || !rule->users || + !rule->targethosts || !rule->srchosts) { + goto fail; + } + + rule->enabled = pyrule->enabled; + return rule; + +fail: + free_hbac_rule(rule); + return NULL; +} + +/* ==================== HBAC Request Element ========================*/ +typedef struct { + PyObject_HEAD + + PyObject *name; + PyObject *groups; +} HbacRequestElement; + +static PyObject * +HbacRequestElement_new(PyTypeObject *type, PyObject *args, PyObject *kwds) +{ + HbacRequestElement *self; + + self = (HbacRequestElement *) type->tp_alloc(type, 0); + if (self == NULL) { + PyErr_NoMemory(); + return NULL; + } + + self->name = sss_python_unicode_from_string(""); + if (self->name == NULL) { + PyErr_NoMemory(); + Py_DECREF(self); + return NULL; + } + + self->groups = PyList_New(0); + if (self->groups == NULL) { + Py_DECREF(self->name); + Py_DECREF(self); + PyErr_NoMemory(); + return NULL; + } + + return (PyObject *) self; +} + +static int +HbacRequestElement_clear(HbacRequestElement *self) +{ + Py_CLEAR(self->name); + Py_CLEAR(self->groups); + return 0; +} + +static void +HbacRequestElement_dealloc(HbacRequestElement *self) +{ + HbacRequestElement_clear(self); + self->ob_type->tp_free((PyObject*) self); +} + +static int +HbacRequestElement_traverse(HbacRequestElement *self, + visitproc visit, void *arg) +{ + Py_VISIT(self->name); + Py_VISIT(self->groups); + return 0; +} + +static int +hbac_request_element_set_groups(HbacRequestElement *self, + PyObject *groups, + void *closure); +static int +hbac_request_element_set_name(HbacRequestElement *self, + PyObject *name, + void *closure); + +static int +HbacRequestElement_init(HbacRequestElement *self, + PyObject *args, + PyObject *kwargs) +{ + const char * const kwlist[] = { "name", "groups", NULL }; + PyObject *name = NULL; + PyObject *groups = NULL; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, + sss_py_const_p(char, "|OO"), + discard_const_p(char *, kwlist), + &name, &groups)) { + return -1; + } + + if (name) { + if (hbac_request_element_set_name(self, name, NULL) != 0) { + return -1; + } + } + + if (groups) { + if (hbac_request_element_set_groups(self, groups, NULL) != 0) { + return -1; + } + } + + return 0; +} + +static int +hbac_request_element_set_name(HbacRequestElement *self, + PyObject *name, + void *closure) +{ + CHECK_ATTRIBUTE_DELETE(name, "name"); + + if (!PyString_Check(name) && !PyUnicode_Check(name)) { + PyErr_Format(PyExc_TypeError, "name must be a string or Unicode"); + return -1; + } + + SAFE_SET(self->name, name); + return 0; +} + +static PyObject * +hbac_request_element_get_name(HbacRequestElement *self, void *closure) +{ + if (PyUnicode_Check(self->name)) { + Py_INCREF(self->name); + return self->name; + } else if (PyString_Check(self->name)) { + return PyUnicode_FromEncodedObject(self->name, + PYHBAC_ENCODING, PYHBAC_ENCODING_ERRORS); + } + + /* setter does typechecking but let us be paranoid */ + PyErr_Format(PyExc_TypeError, "name must be a string or Unicode"); + return NULL; +} + +static int +hbac_request_element_set_groups(HbacRequestElement *self, + PyObject *groups, + void *closure) +{ + CHECK_ATTRIBUTE_DELETE(groups, "groups"); + + if (!verify_sequence(groups, "groups")) { + return -1; + } + + SAFE_SET(self->groups, groups); + return 0; +} + +static PyObject * +hbac_request_element_get_groups(HbacRequestElement *self, void *closure) +{ + Py_INCREF(self->groups); + return self->groups; +} + +static PyObject * +HbacRequestElement_repr(HbacRequestElement *self) +{ + char *strgroups; + PyObject *o, *format, *args; + + format = sss_python_unicode_from_string("<name %s groups [%s]>"); + if (format == NULL) { + return NULL; + } + + strgroups = str_concat_sequence(self->groups, discard_const_p(char, ",")); + if (strgroups == NULL) { + Py_DECREF(format); + return NULL; + } + + args = Py_BuildValue(sss_py_const_p(char, "Os"), self->name, strgroups); + if (args == NULL) { + PyMem_Free(strgroups); + Py_DECREF(format); + return NULL; + } + + o = PyUnicode_Format(format, args); + PyMem_Free(strgroups); + Py_DECREF(format); + Py_DECREF(args); + return o; +} + +PyDoc_STRVAR(HbacRequestElement_name__doc__, +"(string) An object name this element applies to"); +PyDoc_STRVAR(HbacRequestElement_groups__doc__, +"(list of strings) A list of group names this element applies to"); + +static PyGetSetDef py_hbac_request_element_getset[] = { + { discard_const_p(char, "name"), + (getter) hbac_request_element_get_name, + (setter) hbac_request_element_set_name, + HbacRequestElement_name__doc__, + NULL }, + + { discard_const_p(char, "groups"), + (getter) hbac_request_element_get_groups, + (setter) hbac_request_element_set_groups, + HbacRequestElement_groups__doc__, + NULL }, + + { NULL, 0, 0, 0, NULL } /* Sentinel */ +}; + +PyDoc_STRVAR(HbacRequestElement__doc__, +"IPA HBAC Request Element\n\n" +"HbacRequestElement() -> new empty request element\n" +"HbacRequestElement([name], [groups]) -> optionally, provide name and/or " +"groups\n"); + +static PyTypeObject pyhbac_hbacrequest_element_type = { + PyObject_HEAD_INIT(NULL) + .tp_name = sss_py_const_p(char, "pyhbac.HbacRequestElement"), + .tp_basicsize = sizeof(HbacRequestElement), + .tp_new = HbacRequestElement_new, + .tp_dealloc = (destructor) HbacRequestElement_dealloc, + .tp_traverse = (traverseproc) HbacRequestElement_traverse, + .tp_clear = (inquiry) HbacRequestElement_clear, + .tp_init = (initproc) HbacRequestElement_init, + .tp_repr = (reprfunc) HbacRequestElement_repr, + .tp_getset = py_hbac_request_element_getset, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_GC, + .tp_doc = HbacRequestElement__doc__ +}; + +static void +free_hbac_request_element(struct hbac_request_element *el) +{ + if (!el) return; + + PyMem_Free(discard_const_p(char, el->name)); + free_string_list(el->groups); + PyMem_Free(el); +} + +static struct hbac_request_element * +HbacRequestElement_to_native(HbacRequestElement *pyel) +{ + struct hbac_request_element *el = NULL; + PyObject *utf_name; + + if (!PyObject_IsInstance((PyObject *) pyel, + (PyObject *) &pyhbac_hbacrequest_element_type)) { + PyErr_Format(PyExc_TypeError, + "The element must be of type HbacRequestElement\n"); + goto fail; + } + + el = PyMem_Malloc(sizeof(struct hbac_request_element)); + if (!el) { + PyErr_NoMemory(); + goto fail; + } + + utf_name = get_utf8_string(pyel->name, "name"); + if (utf_name == NULL) { + return NULL; + } + + el->name = py_strdup(PyString_AsString(utf_name)); + Py_DECREF(utf_name); + if (!el->name) { + goto fail; + } + + el->groups = sequence_as_string_list(pyel->groups, "groups"); + if (!el->groups) { + goto fail; + } + + return el; + +fail: + free_hbac_request_element(el); + return NULL; +} + +/* ==================== HBAC Request ========================*/ +typedef struct { + PyObject_HEAD + + HbacRequestElement *service; + HbacRequestElement *user; + HbacRequestElement *targethost; + HbacRequestElement *srchost; + + PyObject *rule_name; +} HbacRequest; + +static PyObject * +HbacRequest_new(PyTypeObject *type, PyObject *args, PyObject *kwds) +{ + HbacRequest *self; + + self = (HbacRequest *) type->tp_alloc(type, 0); + if (self == NULL) { + PyErr_NoMemory(); + return NULL; + } + + self->service = (HbacRequestElement *) HbacRequestElement_new( + &pyhbac_hbacrequest_element_type, + NULL, NULL); + self->user = (HbacRequestElement *) HbacRequestElement_new( + &pyhbac_hbacrequest_element_type, + NULL, NULL); + self->targethost = (HbacRequestElement *) HbacRequestElement_new( + &pyhbac_hbacrequest_element_type, + NULL, NULL); + self->srchost = (HbacRequestElement *) HbacRequestElement_new( + &pyhbac_hbacrequest_element_type, + NULL, NULL); + if (self->service == NULL || self->user == NULL || + self->targethost == NULL || self->srchost == NULL) { + Py_XDECREF(self->service); + Py_XDECREF(self->user); + Py_XDECREF(self->targethost); + Py_XDECREF(self->srchost); + Py_DECREF(self); + PyErr_NoMemory(); + return NULL; + } + + return (PyObject *) self; +} + +static int +HbacRequest_clear(HbacRequest *self) +{ + Py_CLEAR(self->service); + Py_CLEAR(self->user); + Py_CLEAR(self->targethost); + Py_CLEAR(self->srchost); + Py_CLEAR(self->rule_name); + return 0; +} + +static void +HbacRequest_dealloc(HbacRequest *self) +{ + HbacRequest_clear(self); + self->ob_type->tp_free((PyObject*) self); +} + +static int +HbacRequest_traverse(HbacRequest *self, visitproc visit, void *arg) +{ + Py_VISIT((PyObject *) self->service); + Py_VISIT((PyObject *) self->user); + Py_VISIT((PyObject *) self->targethost); + Py_VISIT((PyObject *) self->srchost); + return 0; +} + +static int +HbacRequest_init(HbacRequest *self, PyObject *args, PyObject *kwargs) +{ + PyObject *empty_tuple = NULL; + + empty_tuple = PyTuple_New(0); + if (!empty_tuple) { + PyErr_NoMemory(); + return -1; + } + + self->rule_name = NULL; + + if (HbacRequestElement_init(self->user, empty_tuple, NULL) == -1 || + HbacRequestElement_init(self->service, empty_tuple, NULL) == -1 || + HbacRequestElement_init(self->targethost, empty_tuple, NULL) == -1 || + HbacRequestElement_init(self->srchost, empty_tuple, NULL) == -1) { + Py_DECREF(empty_tuple); + return -1; + } + + Py_DECREF(empty_tuple); + return 0; +} + +PyDoc_STRVAR(py_hbac_evaluate__doc__, +"evaluate(rules) -> int\n\n" +"Evaluate a set of HBAC rules.\n" +"rules is a sequence of HbacRule objects. The returned value describes\n" +"the result of evaluation and will have one of HBAC_EVAL_* values.\n" +"Use hbac_result_string() to get textual representation of the result\n" +"On error, HbacError exception is raised.\n" +"If HBAC_EVAL_ALLOW is returned, the class attribute rule_name would\n" +"contain the name of the rule that matched. Otherwise, the attribute\n" +"contains None\n"); + +static struct hbac_eval_req * +HbacRequest_to_native(HbacRequest *pyreq); + +static void +free_hbac_rule_list(struct hbac_rule **rules) +{ + int i; + + if (!rules) return; + + for(i=0; rules[i]; i++) { + free_hbac_rule(rules[i]); + } + PyMem_Free(rules); +} + +static void +free_hbac_eval_req(struct hbac_eval_req *req); + +static PyObject * +py_hbac_evaluate(HbacRequest *self, PyObject *args) +{ + PyObject *py_rules_list = NULL; + PyObject *py_rule = NULL; + Py_ssize_t num_rules; + struct hbac_rule **rules = NULL; + struct hbac_eval_req *hbac_req = NULL; + enum hbac_eval_result eres; + struct hbac_info *info = NULL; + PyObject *ret = NULL; + long i; + + if (!PyArg_ParseTuple(args, sss_py_const_p(char, "O"), &py_rules_list)) { + goto fail; + } + + if (!PySequence_Check(py_rules_list)) { + PyErr_Format(PyExc_TypeError, + "The parameter rules must be a sequence\n"); + goto fail; + } + + num_rules = PySequence_Size(py_rules_list); + rules = PyMem_New(struct hbac_rule *, num_rules+1); + if (!rules) { + PyErr_NoMemory(); + goto fail; + } + + for (i=0; i < num_rules; i++) { + py_rule = PySequence_GetItem(py_rules_list, i); + + if (!PyObject_IsInstance(py_rule, + (PyObject *) &pyhbac_hbacrule_type)) { + PyErr_Format(PyExc_TypeError, + "A rule must be of type HbacRule\n"); + goto fail; + } + + rules[i] = HbacRule_to_native((HbacRuleObject *) py_rule); + if (!rules[i]) { + /* Make sure there is at least a generic exception */ + if (!PyErr_Occurred()) { + PyErr_Format(PyExc_IOError, + "Could not convert HbacRule to native type\n"); + } + goto fail; + } + } + rules[num_rules] = NULL; + + hbac_req = HbacRequest_to_native(self); + if (!hbac_req) { + if (!PyErr_Occurred()) { + PyErr_Format(PyExc_IOError, + "Could not convert HbacRequest to native type\n"); + } + goto fail; + } + + Py_XDECREF(self->rule_name); + self->rule_name = NULL; + + eres = hbac_evaluate(rules, hbac_req, &info); + switch (eres) { + case HBAC_EVAL_ALLOW: + self->rule_name = sss_python_unicode_from_string(info->rule_name); + if (!self->rule_name) { + PyErr_NoMemory(); + goto fail; + } + /* FALLTHROUGH */ + case HBAC_EVAL_DENY: + ret = PyInt_FromLong(eres); + break; + case HBAC_EVAL_ERROR: + set_hbac_exception(PyExc_HbacError, info); + goto fail; + case HBAC_EVAL_OOM: + PyErr_NoMemory(); + goto fail; + } + + free_hbac_eval_req(hbac_req); + free_hbac_rule_list(rules); + hbac_free_info(info); + return ret; + +fail: + hbac_free_info(info); + free_hbac_eval_req(hbac_req); + free_hbac_rule_list(rules); + return NULL; +} + +static PyObject * +hbac_request_element_get_rule_name(HbacRequest *self, void *closure) +{ + if (self->rule_name == NULL) { + Py_INCREF(Py_None); + return Py_None; + } else if (PyUnicode_Check(self->rule_name)) { + Py_INCREF(self->rule_name); + return self->rule_name; + } + + PyErr_Format(PyExc_TypeError, "rule_name is not Unicode"); + return NULL; +} + +static PyObject * +HbacRequest_repr(HbacRequest *self) +{ + PyObject *user_repr; + PyObject *service_repr; + PyObject *targethost_repr; + PyObject *srchost_repr; + PyObject *o, *format, *args; + + format = sss_python_unicode_from_string("<user %s service %s " + "targethost %s srchost %s>"); + if (format == NULL) { + return NULL; + } + + user_repr = HbacRequestElement_repr(self->user); + service_repr = HbacRequestElement_repr(self->service); + targethost_repr = HbacRequestElement_repr(self->targethost); + srchost_repr = HbacRequestElement_repr(self->srchost); + if (user_repr == NULL || service_repr == NULL || + targethost_repr == NULL || srchost_repr == NULL) { + Py_XDECREF(user_repr); + Py_XDECREF(service_repr); + Py_XDECREF(targethost_repr); + Py_XDECREF(srchost_repr); + Py_DECREF(format); + return NULL; + } + + args = Py_BuildValue(sss_py_const_p(char, "OOOO"), + user_repr, service_repr, + targethost_repr, srchost_repr); + if (args == NULL) { + Py_DECREF(user_repr); + Py_DECREF(service_repr); + Py_DECREF(targethost_repr); + Py_DECREF(srchost_repr); + Py_DECREF(format); + } + + o = PyUnicode_Format(format, args); + Py_DECREF(user_repr); + Py_DECREF(service_repr); + Py_DECREF(targethost_repr); + Py_DECREF(srchost_repr); + Py_DECREF(format); + Py_DECREF(args); + return o; +} + +static PyMethodDef py_hbac_request_methods[] = { + { sss_py_const_p(char, "evaluate"), + (PyCFunction) py_hbac_evaluate, + METH_VARARGS, py_hbac_evaluate__doc__ + }, + { NULL, NULL, 0, NULL } /* Sentinel */ +}; + +PyDoc_STRVAR(HbacRequest_service__doc__, +"(HbacRequestElement) This is a list of service DNs to check, it must\n" +"consist of the actual service requested, as well as all parent groups\n" +"containing that service"); +PyDoc_STRVAR(HbacRequest_user__doc__, +"(HbacRequestElement) This is a list of user DNs to check, it must consist\n" +"of the actual user requested, as well as all parent groups containing\n" +"that user."); +PyDoc_STRVAR(HbacRequest_targethost__doc__, +"(HbacRequestElement) This is a list of target hosts to check, it must\n" +"consist of the actual target host requested, as well as all parent groups\n" +"containing that target host."); +PyDoc_STRVAR(HbacRequest_srchost__doc__, +"(HbacRequestElement) This is a list of source hosts to check, it must\n" +"consist of the actual source host requested, as well as all parent groups\n" +"containing that source host."); + +static PyMemberDef py_hbac_request_members[] = { + { discard_const_p(char, "service"), T_OBJECT_EX, + offsetof(HbacRequest, service), 0, + HbacRequest_service__doc__ }, + + { discard_const_p(char, "user"), T_OBJECT_EX, + offsetof(HbacRequest, user), 0, + HbacRequest_user__doc__ }, + + { discard_const_p(char, "targethost"), T_OBJECT_EX, + offsetof(HbacRequest, targethost), 0, + HbacRequest_targethost__doc__ }, + + { discard_const_p(char, "srchost"), T_OBJECT_EX, + offsetof(HbacRequest, srchost), 0, + HbacRequest_srchost__doc__ }, + + { NULL, 0, 0, 0, NULL } /* Sentinel */ +}; + +PyDoc_STRVAR(HbacRequest_rule_name__doc__, +"(string) If result of evaluation was to allow access, this member contains\n" +"the name of the rule that allowed it. Otherwise, this attribute contains \n" +"None. This attribute is read-only.\n"); + +static PyGetSetDef py_hbac_request_getset[] = { + { discard_const_p(char, "rule_name"), + (getter) hbac_request_element_get_rule_name, + NULL, /* read only */ + HbacRequest_rule_name__doc__, + NULL }, + + { NULL, 0, 0, 0, NULL } /* Sentinel */ +}; + +PyDoc_STRVAR(HbacRequest__doc__, +"IPA HBAC Request\n\n" +"HbacRequest() -> new empty HBAC request"); + +static PyTypeObject pyhbac_hbacrequest_type = { + PyObject_HEAD_INIT(NULL) + .tp_name = sss_py_const_p(char, "pyhbac.HbacRequest"), + .tp_basicsize = sizeof(HbacRequest), + .tp_new = HbacRequest_new, + .tp_dealloc = (destructor) HbacRequest_dealloc, + .tp_traverse = (traverseproc) HbacRequest_traverse, + .tp_clear = (inquiry) HbacRequest_clear, + .tp_init = (initproc) HbacRequest_init, + .tp_repr = (reprfunc) HbacRequest_repr, + .tp_methods = py_hbac_request_methods, + .tp_members = py_hbac_request_members, + .tp_getset = py_hbac_request_getset, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, + .tp_doc = HbacRequest__doc__ +}; + +static void +free_hbac_eval_req(struct hbac_eval_req *req) +{ + if (!req) return; + + free_hbac_request_element(req->service); + free_hbac_request_element(req->user); + free_hbac_request_element(req->targethost); + free_hbac_request_element(req->srchost); + + PyMem_Free(req); +} + +static struct hbac_eval_req * +HbacRequest_to_native(HbacRequest *pyreq) +{ + struct hbac_eval_req *req = NULL; + + req = PyMem_Malloc(sizeof(struct hbac_eval_req)); + if (!req) { + PyErr_NoMemory(); + goto fail; + } + + if (!PyObject_IsInstance((PyObject *) pyreq, + (PyObject *) &pyhbac_hbacrequest_type)) { + PyErr_Format(PyExc_TypeError, + "The request must be of type HbacRequest\n"); + goto fail; + } + + req->service = HbacRequestElement_to_native(pyreq->service); + req->user = HbacRequestElement_to_native(pyreq->user); + req->targethost = HbacRequestElement_to_native(pyreq->targethost); + req->srchost = HbacRequestElement_to_native(pyreq->srchost); + if (!req->service || !req->user || + !req->targethost || !req->srchost) { + goto fail; + } + return req; + +fail: + free_hbac_eval_req(req); + return NULL; +} + +/* =================== the pyhbac module initialization =====================*/ +PyDoc_STRVAR(py_hbac_result_string__doc__, +"hbac_result_string(code) -> string\n" +"Returns a string representation of the HBAC result code"); + +static PyObject * +py_hbac_result_string(PyObject *module, PyObject *args) +{ + enum hbac_eval_result result; + const char *str; + + if (!PyArg_ParseTuple(args, sss_py_const_p(char, "i"), &result)) { + return NULL; + } + + str = hbac_result_string(result); + if (str == NULL) { + /* None needs to be referenced, too */ + Py_INCREF(Py_None); + return Py_None; + } + + return sss_python_unicode_from_string(str); +} + +PyDoc_STRVAR(py_hbac_error_string__doc__, +"hbac_error_string(code) -> string\n" +"Returns a string representation of the HBAC error code"); + +static PyObject * +py_hbac_error_string(PyObject *module, PyObject *args) +{ + enum hbac_error_code code; + const char *str; + + if (!PyArg_ParseTuple(args, sss_py_const_p(char, "i"), &code)) { + return NULL; + } + + str = hbac_error_string(code); + if (str == NULL) { + /* None needs to be referenced, too */ + Py_INCREF(Py_None); + return Py_None; + } + + return sss_python_unicode_from_string(str); +} + +static PyMethodDef pyhbac_module_methods[] = { + { sss_py_const_p(char, "hbac_result_string"), + (PyCFunction) py_hbac_result_string, + METH_VARARGS, + py_hbac_result_string__doc__, + }, + + { sss_py_const_p(char, "hbac_error_string"), + (PyCFunction) py_hbac_error_string, + METH_VARARGS, + py_hbac_error_string__doc__, + }, + + {NULL, NULL, 0, NULL} /* Sentinel */ +}; + +PyDoc_STRVAR(HbacError__doc__, +"An HBAC processing exception\n\n" +"This exception is raised when there is an internal error during the\n" +"HBAC processing, such as an Out-Of-Memory situation or unparseable\n" +"rule. HbacError.args argument is a tuple that contains error code and\n" +"the name of the rule that was being processed. Use hbac_error_string()\n" +"to get the text representation of the HBAC error"); + +PyMODINIT_FUNC +initpyhbac(void) +{ + PyObject *m; + int ret; + + m = Py_InitModule(sss_py_const_p(char, PYTHON_MODULE_NAME), pyhbac_module_methods); + if (m == NULL) return; + + /* The HBAC module exception */ + PyExc_HbacError = sss_exception_with_doc( + discard_const_p(char, "hbac.HbacError"), + HbacError__doc__, + PyExc_EnvironmentError, NULL); + Py_INCREF(PyExc_HbacError); + ret = PyModule_AddObject(m, sss_py_const_p(char, "HbacError"), PyExc_HbacError); + if (ret == -1) return; + + /* HBAC rule categories */ + ret = PyModule_AddIntMacro(m, HBAC_CATEGORY_NULL); + if (ret == -1) return; + ret = PyModule_AddIntMacro(m, HBAC_CATEGORY_ALL); + if (ret == -1) return; + + /* HBAC rule elements */ + ret = PyModule_AddIntMacro(m, HBAC_RULE_ELEMENT_USERS); + if (ret == -1) return; + ret = PyModule_AddIntMacro(m, HBAC_RULE_ELEMENT_SERVICES); + if (ret == -1) return; + ret = PyModule_AddIntMacro(m, HBAC_RULE_ELEMENT_TARGETHOSTS); + if (ret == -1) return; + ret = PyModule_AddIntMacro(m, HBAC_RULE_ELEMENT_SOURCEHOSTS); + if (ret == -1) return; + + /* enum hbac_eval_result */ + ret = PyModule_AddIntMacro(m, HBAC_EVAL_ALLOW); + if (ret == -1) return; + ret = PyModule_AddIntMacro(m, HBAC_EVAL_DENY); + if (ret == -1) return; + ret = PyModule_AddIntMacro(m, HBAC_EVAL_ERROR); + if (ret == -1) return; + + /* enum hbac_error_code */ + ret = PyModule_AddIntMacro(m, HBAC_ERROR_UNKNOWN); + if (ret == -1) return; + ret = PyModule_AddIntMacro(m, HBAC_SUCCESS); + if (ret == -1) return; + ret = PyModule_AddIntMacro(m, HBAC_ERROR_NOT_IMPLEMENTED); + if (ret == -1) return; + ret = PyModule_AddIntMacro(m, HBAC_ERROR_OUT_OF_MEMORY); + if (ret == -1) return; + ret = PyModule_AddIntMacro(m, HBAC_ERROR_UNPARSEABLE_RULE); + if (ret == -1) return; + + TYPE_READY(m, pyhbac_hbacrule_type, "HbacRule"); + TYPE_READY(m, pyhbac_hbacrule_element_type, "HbacRuleElement"); + TYPE_READY(m, pyhbac_hbacrequest_element_type, "HbacRequestElement"); + TYPE_READY(m, pyhbac_hbacrequest_type, "HbacRequest"); +} diff --git a/src/tests/pyhbac-test.py b/src/tests/pyhbac-test.py new file mode 100755 index 000000000..5579180b0 --- /dev/null +++ b/src/tests/pyhbac-test.py @@ -0,0 +1,522 @@ +#!/usr/bin/python + +import unittest +import sys +import os +import copy + +srcdir = os.getenv('builddir') +if not srcdir: + srcdir = "." +MODPATH = srcdir + "/.libs" #FIXME - is there a way to get this from libtool? + +def compat_assertItemsEqual(this, expected_seq, actual_seq, msg=None): + return this.assertEqual(sorted(expected_seq), sorted(actual_seq)) + +def compat_assertIsInstance(this, obj, cls, msg=None): + return this.assertTrue(isinstance(obj, cls)) + +# add compat methods for old unittest.TestCase versions +# (python < 2.7, RHEL5 for instance) +if not hasattr(unittest.TestCase, "assertItemsEqual"): + setattr(unittest.TestCase, "assertItemsEqual", compat_assertItemsEqual) +if not hasattr(unittest.TestCase, "assertIsInstance"): + setattr(unittest.TestCase, "assertIsInstance", compat_assertIsInstance) + +class PyHbacImport(unittest.TestCase): + def setUp(self): + " Make sure we load the in-tree module " + self.system_path = sys.path[:] + sys.path = [ MODPATH ] + + def tearDown(self): + " Restore the system path " + sys.path = self.system_path + + def testImport(self): + " Import the module and assert it comes from tree " + try: + import pyhbac + except ImportError, e: + print >>sys.stderr, "Could not load the pyhbac module. Please check if it is compiled" + raise e + self.assertEqual(pyhbac.__file__, MODPATH + "/pyhbac.so") + +class PyHbacRuleElementTest(unittest.TestCase): + def testInstantiateEmpty(self): + el = pyhbac.HbacRuleElement() + self.assertItemsEqual(el.names, []) + self.assertItemsEqual(el.groups, []) + self.assertItemsEqual(el.category, set([pyhbac.HBAC_CATEGORY_NULL])) + + def testInit(self): + names = [ "foo", "bar" ] + el = pyhbac.HbacRuleElement(names=names) + self.assertItemsEqual(el.names, names) + + groups = [ "abc", "def" ] + el = pyhbac.HbacRuleElement(groups=groups) + self.assertItemsEqual(el.groups, groups) + + def testGetSet(self): + names = [ "foo", "bar" ] + el = pyhbac.HbacRuleElement() + self.assertItemsEqual(el.names, []) + el.names = names + self.assertItemsEqual(el.names, names) + + groups = [ "abc", "def" ] + el = pyhbac.HbacRuleElement() + self.assertItemsEqual(el.groups, []) + el.groups = groups + self.assertItemsEqual(el.groups, groups) + + # Test other iterables than list + groups = ( "abc", "def" ) + el = pyhbac.HbacRuleElement() + self.assertItemsEqual(el.groups, []) + el.groups = groups + self.assertItemsEqual(el.groups, groups) + + def testCategory(self): + el = pyhbac.HbacRuleElement() + assert pyhbac.HBAC_CATEGORY_NULL in el.category + assert pyhbac.HBAC_CATEGORY_ALL not in el.category + + el.category.add(pyhbac.HBAC_CATEGORY_ALL) + assert pyhbac.HBAC_CATEGORY_ALL in el.category + + el.category = set([pyhbac.HBAC_CATEGORY_ALL]) + assert pyhbac.HBAC_CATEGORY_ALL in el.category + + # negative tests + self.assertRaises(TypeError, el.__setattr__, "category", [pyhbac.HBAC_CATEGORY_ALL]) + self.assertRaises(TypeError, el.__setattr__, "category", None) + self.assertRaises(TypeError, el.__setattr__, "category", 1) + + def testNotIterable(self): + self.assertRaises(TypeError, pyhbac.HbacRuleElement, names=123) + self.assertRaises(TypeError, pyhbac.HbacRuleElement, names=None) + + def testRuleElementReference(self): + def _get_rule(): + users = [ "foo", "bar" ] + user_groups = [ "abc", "def" ] + return pyhbac.HbacRuleElement(names=users, groups=user_groups) + + el = _get_rule() + self.assertItemsEqual(el.names, [ "foo", "bar" ]) + self.assertItemsEqual(el.groups, [ "abc", "def" ]) + + def testRepr(self): + el = pyhbac.HbacRuleElement() + self.assertEquals(el.__repr__(), u'<category 0 names [] groups []>') + + el.category.add(pyhbac.HBAC_CATEGORY_ALL) + el.names = ['foo'] + el.groups = ['bar, baz'] + self.assertEquals(el.__repr__(), u'<category 1 names [foo] groups [bar, baz]>') + + +class PyHbacRuleTest(unittest.TestCase): + def testRuleGetSetName(self): + name = "testGetRule" + new_name = "testGetNewRule" + + rule = pyhbac.HbacRule(name) + self.assertEqual(rule.name, unicode(name)) + + rule.name = new_name + self.assertEqual(rule.name, unicode(new_name)) + + def testRuleGetSetEnabled(self): + rule = pyhbac.HbacRule("testRuleGetSetEnabled") + + rule.enabled = True + self.assertEqual(rule.enabled, True) + rule.enabled = False + self.assertEqual(rule.enabled, False) + + rule.enabled = "TRUE" + self.assertEqual(rule.enabled, True) + rule.enabled = "FALSE" + self.assertEqual(rule.enabled, False) + + rule.enabled = "true" + self.assertEqual(rule.enabled, True) + rule.enabled = "false" + self.assertEqual(rule.enabled, False) + + rule.enabled = "True" + self.assertEqual(rule.enabled, True) + rule.enabled = "False" + self.assertEqual(rule.enabled, False) + + rule.enabled = 1 + self.assertEqual(rule.enabled, True) + rule.enabled = 0 + self.assertEqual(rule.enabled, False) + + # negative test + self.assertRaises(TypeError, rule.__setattr__, "enabled", None) + self.assertRaises(TypeError, rule.__setattr__, "enabled", []) + self.assertRaises(ValueError, rule.__setattr__, "enabled", "foo") + self.assertRaises(ValueError, rule.__setattr__, "enabled", 5) + + def testRuleElementInRule(self): + users = [ "foo", "bar" ] + user_groups = [ "abc", "def" ] + + # rule should contain empty elements after instantiation + rule = pyhbac.HbacRule("testRuleElement") + self.assertIsInstance(rule.users, pyhbac.HbacRuleElement) + self.assertIsInstance(rule.services, pyhbac.HbacRuleElement) + self.assertIsInstance(rule.targethosts, pyhbac.HbacRuleElement) + self.assertIsInstance(rule.srchosts, pyhbac.HbacRuleElement) + + self.assertIsInstance(rule.users.names, list) + self.assertIsInstance(rule.users.groups, list) + self.assertItemsEqual(rule.users.names, []) + self.assertItemsEqual(rule.users.groups, []) + + # Assign by copying a HbacRuleElement + user_el = pyhbac.HbacRuleElement(names=users, groups=user_groups) + rule = pyhbac.HbacRule("testRuleElement") + rule.users = user_el + self.assertItemsEqual(rule.users.names, users) + self.assertItemsEqual(rule.users.groups, user_groups) + + # Assign directly + rule = pyhbac.HbacRule("testRuleElement") + rule.users.names = users + rule.users.groups = user_groups + self.assertItemsEqual(rule.users.names, users) + self.assertItemsEqual(rule.users.groups, user_groups) + + def testRuleElementInRuleReference(self): + " Test that references to RuleElement are kept even if element goes out of scope " + def _get_rule(): + users = [ "foo", "bar" ] + user_groups = [ "abc", "def" ] + el = pyhbac.HbacRuleElement(names=users, groups=user_groups) + rule = pyhbac.HbacRule("testRuleElement") + rule.users = el + return rule + + rule = _get_rule() + self.assertItemsEqual(rule.users.names, [ "foo", "bar" ]) + self.assertItemsEqual(rule.users.groups, [ "abc", "def" ]) + + def testRepr(self): + r = pyhbac.HbacRule('foo') + self.assertEqual(r.__repr__(), u"<name foo enabled 0 " + "users <category 0 names [] groups []> " + "services <category 0 names [] groups []> " + "targethosts <category 0 names [] groups []> " + "srchosts <category 0 names [] groups []>>") + + name = "someuser" + service = "ssh" + srchost = "host1" + targethost = "host2" + + r.users.names = [ name ] + r.services.names = [ service ] + r.srchosts.names = [ srchost ] + r.targethosts.names = [ targethost ] + + self.assertEqual(r.__repr__(), u"<name foo enabled 0 " + "users <category 0 names [%s] groups []> " + "services <category 0 names [%s] groups []> " + "targethosts <category 0 names [%s] groups []> " + "srchosts <category 0 names [%s] groups []>>" % + (name, service, targethost, srchost)) + + def testValidate(self): + r = pyhbac.HbacRule('valid_rule') + + valid, missing = r.validate() + self.assertEqual(valid, False) + self.assertItemsEqual(missing, ( pyhbac.HBAC_RULE_ELEMENT_USERS, + pyhbac.HBAC_RULE_ELEMENT_SERVICES, + pyhbac.HBAC_RULE_ELEMENT_TARGETHOSTS, + pyhbac.HBAC_RULE_ELEMENT_SOURCEHOSTS )) + + r.users.names = [ "someuser" ] + r.services.names = [ "ssh" ] + + valid, missing = r.validate() + self.assertEqual(valid, False) + self.assertItemsEqual(missing, ( pyhbac.HBAC_RULE_ELEMENT_TARGETHOSTS, + pyhbac.HBAC_RULE_ELEMENT_SOURCEHOSTS )) + + r.srchosts.names = [ "host1" ] + r.targethosts.names = [ "host2" ] + + valid, missing = r.validate() + self.assertEqual(valid, True) + +class PyHbacRequestElementTest(unittest.TestCase): + def testInstantiateEmpty(self): + el = pyhbac.HbacRequestElement() + self.assertItemsEqual(el.name, "") + self.assertItemsEqual(el.groups, []) + + def testInit(self): + name = "foo" + el = pyhbac.HbacRequestElement(name=name) + self.assertItemsEqual(el.name, name) + + groups = [ "abc", "def" ] + el = pyhbac.HbacRequestElement(groups=groups) + self.assertItemsEqual(el.groups, groups) + + def testGetSet(self): + name = "foo" + el = pyhbac.HbacRequestElement() + self.assertItemsEqual(el.name, "") + el.name = name + self.assertItemsEqual(el.name, name) + + groups = [ "abc", "def" ] + el = pyhbac.HbacRequestElement() + self.assertItemsEqual(el.groups, []) + el.groups = groups + self.assertItemsEqual(el.groups, groups) + + # Test other iterables than list + groups = ( "abc", "def" ) + el = pyhbac.HbacRequestElement() + self.assertItemsEqual(el.groups, []) + el.groups = groups + self.assertItemsEqual(el.groups, groups) + + def testGroupsNotIterable(self): + self.assertRaises(TypeError, pyhbac.HbacRequestElement, groups=None) + self.assertRaises(TypeError, pyhbac.HbacRequestElement, groups=123) + + def testRepr(self): + r = pyhbac.HbacRequestElement() + self.assertEqual(r.__repr__(), u"<name groups []>") + + r.name = 'foo' + r.groups = ['bar', 'baz'] + self.assertEqual(r.__repr__(), u"<name foo groups [bar,baz]>") + +class PyHbacRequestTest(unittest.TestCase): + def testRequestElementHandling(self): + name = "req_name" + groups = [ "g1", "g2" ] + + # The request should be empty after instantiation + req = pyhbac.HbacRequest() + self.assertIsInstance(req.user, pyhbac.HbacRequestElement) + self.assertIsInstance(req.service, pyhbac.HbacRequestElement) + self.assertIsInstance(req.targethost, pyhbac.HbacRequestElement) + self.assertIsInstance(req.srchost, pyhbac.HbacRequestElement) + + self.assertEqual(req.user.name, "") + self.assertIsInstance(req.user.groups, list) + self.assertItemsEqual(req.user.groups, []) + + # Assign by copying a HbacRequestElement + user_el = pyhbac.HbacRequestElement(name=name, groups=groups) + req = pyhbac.HbacRequest() + req.user = user_el + self.assertItemsEqual(req.user.name, name) + self.assertItemsEqual(req.user.groups, groups) + + # Assign directly + req = pyhbac.HbacRequest() + req.user.name = name + req.user.groups = groups + self.assertItemsEqual(req.user.name, name) + self.assertItemsEqual(req.user.groups, groups) + + def testRuleName(self): + req = pyhbac.HbacRequest() + self.assertEqual(req.rule_name, None) + # python 2.4 raises TypError, 2.7 raises AttributeError + self.assertRaises((TypeError, AttributeError), req.__setattr__, "rule_name", "foo") + + def testEvaluate(self): + name = "someuser" + service = "ssh" + srchost = "host1" + targethost = "host2" + + allow_rule = pyhbac.HbacRule("allowRule", enabled=True) + allow_rule.users.names = [ name ] + allow_rule.services.names = [ service ] + allow_rule.srchosts.names = [ srchost ] + allow_rule.targethosts.names = [ targethost ] + + req = pyhbac.HbacRequest() + req.user.name = name + req.service.name = service + req.srchost.name = srchost + req.targethost.name = targethost + + # Test that an allow rule on its own allows access + res = req.evaluate((allow_rule,)) + self.assertEqual(res, pyhbac.HBAC_EVAL_ALLOW) + self.assertEqual(req.rule_name, "allowRule") + + # Test that a user not in the rule is not allowed + savename = req.user.name + req.user.name = "someotheruser" + res = req.evaluate((allow_rule, )) + self.assertEqual(res, pyhbac.HBAC_EVAL_DENY) + self.assertEqual(req.rule_name, None) + + # But allows if the rule is an ALL rule + allow_rule.users.category.add(pyhbac.HBAC_CATEGORY_ALL) + res = req.evaluate((allow_rule, )) + self.assertEqual(res, pyhbac.HBAC_EVAL_ALLOW) + + def testRepr(self): + name = "someuser" + service = "ssh" + srchost = "host1" + targethost = "host2" + + req = pyhbac.HbacRequest() + + self.assertEqual(req.__repr__(), "<user <name groups []> " + "service <name groups []> " + "targethost <name groups []> " + "srchost <name groups []>>") + + req.user.name = name + req.service.name = service + req.srchost.name = srchost + req.targethost.name = targethost + + self.assertEqual(req.__repr__(), "<user <name %s groups []> " + "service <name %s groups []> " + "targethost <name %s groups []> " + "srchost <name %s groups []>>" % + (name, service, targethost, srchost)) + + def testEvaluateNegative(self): + name = "someuser" + service = "ssh" + srchost = "host1" + targethost = "host2" + + allow_rule = pyhbac.HbacRule("allowRule", enabled=True) + allow_rule.users.names = [ name ] + allow_rule.services.names = [ service ] + allow_rule.srchosts.names = [ srchost ] + allow_rule.targethosts.names = [ targethost ] + + req = pyhbac.HbacRequest() + req.service.name = service + req.srchost.name = srchost + req.targethost.name = targethost + req.user.name = name + + saveuser = req.user + req.user = None # need to catch this + + # catch invalid category value + savecat = copy.copy(allow_rule.users.category) + allow_rule.users.category.add(pyhbac.HBAC_EVAL_ERROR) + self.assertRaises(ValueError, req.evaluate, (allow_rule,)) + allow_rule.users.category = savecat + + # Test that invalid type is raised + self.assertRaises(TypeError, req.evaluate, (allow_rule,)) + + req.user = saveuser + allow_rule.users = None # need to catch this + self.assertRaises(TypeError, req.evaluate, (allow_rule,)) + + # catch invalid rule type + self.assertRaises(TypeError, req.evaluate, (allow_rule, None)) + +class PyHbacModuleTest(unittest.TestCase): + def testHasResultTypes(self): + assert hasattr(pyhbac, "HBAC_EVAL_ALLOW") + assert hasattr(pyhbac, "HBAC_EVAL_DENY") + assert hasattr(pyhbac, "HBAC_EVAL_ERROR") + + def testHasErrorTypes(self): + assert hasattr(pyhbac, "HBAC_ERROR_UNKNOWN") + assert hasattr(pyhbac, "HBAC_SUCCESS") + assert hasattr(pyhbac, "HBAC_ERROR_NOT_IMPLEMENTED") + assert hasattr(pyhbac, "HBAC_ERROR_OUT_OF_MEMORY") + assert hasattr(pyhbac, "HBAC_ERROR_UNPARSEABLE_RULE") + + def testHasCategories(self): + assert hasattr(pyhbac, "HBAC_CATEGORY_NULL") + assert hasattr(pyhbac, "HBAC_CATEGORY_ALL") + + def testHasRuleElementTypes(self): + assert hasattr(pyhbac, "HBAC_RULE_ELEMENT_USERS") + assert hasattr(pyhbac, "HBAC_RULE_ELEMENT_SERVICES") + assert hasattr(pyhbac, "HBAC_RULE_ELEMENT_TARGETHOSTS") + assert hasattr(pyhbac, "HBAC_RULE_ELEMENT_SOURCEHOSTS") + + def testHbacResultString(self): + results = [ pyhbac.HBAC_EVAL_ALLOW, pyhbac.HBAC_EVAL_DENY, + pyhbac.HBAC_EVAL_ERROR ] + for r in results: + s = pyhbac.hbac_result_string(r) + self.assertIsInstance(s, unicode) + assert len(s) > 0 + + def testHbacErrorString(self): + errors = [ pyhbac.HBAC_ERROR_UNKNOWN, + pyhbac.HBAC_SUCCESS, + pyhbac.HBAC_ERROR_NOT_IMPLEMENTED, + pyhbac.HBAC_ERROR_OUT_OF_MEMORY, + pyhbac.HBAC_ERROR_UNPARSEABLE_RULE ] + for e in errors: + s = pyhbac.hbac_error_string(e) + self.assertIsInstance(s, unicode) + assert len(s) > 0 + + +if __name__ == "__main__": + error = 0 + + suite = unittest.TestLoader().loadTestsFromTestCase(PyHbacImport) + res = unittest.TextTestRunner().run(suite) + if not res.wasSuccessful(): + error |= 0x1 + # need to bail out here because pyhbac could not be imported + sys.exit(error) + + # import the pyhbac module into the global namespace, but make sure it's + # the one in tree + sys.path.insert(0, MODPATH) + import pyhbac + + suite = unittest.TestLoader().loadTestsFromTestCase(PyHbacRuleElementTest) + res = unittest.TextTestRunner().run(suite) + if not res.wasSuccessful(): + error |= 0x2 + + suite = unittest.TestLoader().loadTestsFromTestCase(PyHbacRuleTest) + res = unittest.TextTestRunner().run(suite) + if not res.wasSuccessful(): + error |= 0x3 + + suite = unittest.TestLoader().loadTestsFromTestCase(PyHbacRequestElementTest) + res = unittest.TextTestRunner().run(suite) + if not res.wasSuccessful(): + error |= 0x4 + + suite = unittest.TestLoader().loadTestsFromTestCase(PyHbacRequestTest) + res = unittest.TextTestRunner().run(suite) + if not res.wasSuccessful(): + error |= 0x5 + + suite = unittest.TestLoader().loadTestsFromTestCase(PyHbacModuleTest) + res = unittest.TextTestRunner().run(suite) + if not res.wasSuccessful(): + error |= 0x6 + + sys.exit(error) + diff --git a/src/util/sss_python.c b/src/util/sss_python.c new file mode 100644 index 000000000..19717a55c --- /dev/null +++ b/src/util/sss_python.c @@ -0,0 +1,104 @@ +/* + Authors: + Jakub Hrozek <jhrozek@redhat.com> + + Copyright (C) 2011 Red Hat + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "src/util/sss_python.h" +#include "config.h" + +PyObject * +sss_python_set_new(void) +{ +#ifdef HAVE_PYSET_NEW + return PySet_New(NULL); +#else + return PyObject_CallObject((PyObject *) &PySet_Type, NULL); +#endif +} + +int +sss_python_set_add(PyObject *set, PyObject *key) +{ +#ifdef HAVE_PYSET_ADD + return PySet_Add(set, key); +#else + PyObject *pyret; + int ret; + + pyret = PyObject_CallMethod(set, sss_py_const_p(char, "add"), + sss_py_const_p(char, "O"), key); + ret = (pyret == NULL) ? -1 : 0; + Py_XDECREF(pyret); + return ret; +#endif +} + +bool +sss_python_set_check(PyObject *set) +{ +#if HAVE_DECL_PYSET_CHECK + return PySet_Check(set); +#else + return PyObject_TypeCheck(set, &PySet_Type); +#endif +} + +PyObject * +sss_python_unicode_from_string(const char *u) +{ +#ifdef HAVE_PYUNICODE_FROMSTRING + return PyUnicode_FromString(u); +#else + return PyUnicode_DecodeUTF8(u, strlen(u), NULL); +#endif +} + +PyObject * +sss_exception_with_doc(char *name, char *doc, PyObject *base, PyObject *dict) +{ +#ifdef HAVE_PYERR_NEWEXCEPTIONWITHDOC + return PyErr_NewExceptionWithDoc(name, doc, base, dict); +#else + int result; + PyObject *ret = NULL; + PyObject *mydict = NULL; /* points to the dict only if we create it */ + PyObject *docobj; + + if (dict == NULL) { + dict = mydict = PyDict_New(); + if (dict == NULL) { + return NULL; + } + } + + if (doc != NULL) { + docobj = PyString_FromString(doc); + if (docobj == NULL) + goto failure; + result = PyDict_SetItemString(dict, "__doc__", docobj); + Py_DECREF(docobj); + if (result < 0) + goto failure; + } + + ret = PyErr_NewException(name, base, dict); + failure: + Py_XDECREF(mydict); + return ret; +#endif +} diff --git a/src/util/sss_python.h b/src/util/sss_python.h new file mode 100644 index 000000000..135c28771 --- /dev/null +++ b/src/util/sss_python.h @@ -0,0 +1,63 @@ +#ifndef __SSS_PYTHON_H__ +#define __SSS_PYTHON_H__ + +#include <Python.h> +#include <stdbool.h> +#include "util/util.h" + +#if PY_VERSION_HEX < 0x02050000 +#define sss_py_const_p(type, value) discard_const_p(type, (value)) +#else +#define sss_py_const_p(type, value) (value) +#endif + +/* Py_ssize_t compatibility for python < 2.5 as per + * http://www.python.org/dev/peps/pep-0353/ */ +#ifndef HAVE_PY_SSIZE_T +typedef int Py_ssize_t; +#endif + +#ifndef PY_SSIZE_T_MAX +#define PY_SSIZE_T_MAX INT_MAX +#endif + +#ifndef PY_SSIZE_T_MIN +#define PY_SSIZE_T_MIN INT_MIN +#endif + +/* Wrappers providing the subset of C API for python's set objects we use */ +PyObject *sss_python_set_new(void); +int sss_python_set_add(PyObject *set, PyObject *key); +bool sss_python_set_check(PyObject *set); + +/* Unicode compatibility */ +PyObject *sss_python_unicode_from_string(const char *u); + +/* Exceptions compatibility */ +PyObject * +sss_exception_with_doc(char *name, char *doc, PyObject *base, PyObject *dict); + +/* PyModule_AddIntMacro() compatibility */ +#if !HAVE_DECL_PYMODULE_ADDINTMACRO +#define PyModule_AddIntMacro(m, c) PyModule_AddIntConstant(m, sss_py_const_p(char, #c), c) +#endif + +/* Convenience macros */ +#define TYPE_READY(module, type, name) do { \ + if (PyType_Ready(&type) < 0) \ + return; \ + Py_INCREF(&type); \ + PyModule_AddObject(module, \ + discard_const_p(char, name), \ + (PyObject *) &type); \ +} while(0); \ + +#define SAFE_SET(old, new) do { \ + PyObject *__simple_set_tmp = NULL; \ + __simple_set_tmp = old; \ + Py_INCREF(new); \ + old = new; \ + Py_XDECREF(__simple_set_tmp); \ +} while(0); + +#endif /* __SSS_PYTHON_H__ */ |