summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPavel Zuna <pzuna@redhat.com>2009-09-30 16:24:25 +0200
committerRob Crittenden <rcritten@redhat.com>2009-10-05 15:55:27 -0400
commita6eb928f9871700d4c749e6fb1a8161940dda02b (patch)
treeac8022a7a8d0c815433cce91318bb74aa953e50c
parentdac224c25a2ff8a1400d0a746f600f81cfad6901 (diff)
downloadfreeipa-a6eb928f9871700d4c749e6fb1a8161940dda02b.tar.gz
freeipa-a6eb928f9871700d4c749e6fb1a8161940dda02b.tar.xz
freeipa-a6eb928f9871700d4c749e6fb1a8161940dda02b.zip
Add HBAC plugin and introduce GeneralizedTime parameter type.
-rw-r--r--install/share/bootstrap-template.ldif6
-rw-r--r--ipalib/__init__.py2
-rw-r--r--ipalib/parameters.py166
-rw-r--r--ipalib/plugins/hbac.py260
-rw-r--r--tests/test_xmlrpc/test_hbac_plugin.py305
5 files changed, 738 insertions, 1 deletions
diff --git a/install/share/bootstrap-template.ldif b/install/share/bootstrap-template.ldif
index 4c6e5575d..45d989f92 100644
--- a/install/share/bootstrap-template.ldif
+++ b/install/share/bootstrap-template.ldif
@@ -34,6 +34,12 @@ objectClass: top
objectClass: nsContainer
cn: computers
+dn: cn=hbac,$SUFFIX
+changetype: add
+objectClass: top
+objectClass: nsContainer
+cn: hbac
+
dn: cn=etc,$SUFFIX
changetype: add
objectClass: nsContainer
diff --git a/ipalib/__init__.py b/ipalib/__init__.py
index 844f5b468..3b177daa7 100644
--- a/ipalib/__init__.py
+++ b/ipalib/__init__.py
@@ -870,7 +870,7 @@ from frontend import Command, LocalOrRemote, Application
from frontend import Object, Method, Property
from crud import Create, Retrieve, Update, Delete, Search
from parameters import DefaultFrom, Bool, Flag, Int, Float, Bytes, Str, Password,List
-from parameters import BytesEnum, StrEnum
+from parameters import BytesEnum, StrEnum, GeneralizedTime
from errors import SkipPluginModule
# We can't import the python uuid since it includes ctypes which makes
diff --git a/ipalib/parameters.py b/ipalib/parameters.py
index f05c1ed48..ef440c727 100644
--- a/ipalib/parameters.py
+++ b/ipalib/parameters.py
@@ -1201,6 +1201,172 @@ class List(Param):
def _validate_scalar(self, value, index=None):
return
+
+class GeneralizedTime(Str):
+ """
+ Generalized time parameter type.
+
+ Accepts values conforming to generalizedTime as defined in RFC 4517
+ section 3.3.13 without time zone information.
+ """
+ def _check_HHMM(self, t):
+ if len(t) != 4:
+ raise ValueError('HHMM must be exactly 4 characters long')
+ if not t.isnumeric():
+ raise ValueError('HHMM non-numeric')
+ hh = int(t[0:2])
+ if hh < 0 or hh > 23:
+ raise ValueError('HH out of range')
+ mm = int(t[2:4])
+ if mm < 0 or mm > 59:
+ raise ValueError('MM out of range')
+
+ def _check_dotw(self, t):
+ if t.isnumeric():
+ value = int(t)
+ if value < 1 or value > 7:
+ raise ValueError('day of the week out of range')
+ elif t not in ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'):
+ raise ValueError('invalid day of the week')
+
+ def _check_dotm(self, t, month_num=1, year=4):
+ if not t.isnumeric():
+ raise ValueError('day of the month non-numeric')
+ value = int(t)
+ if month_num in (1, 3, 5, 7, 8, 10, 12):
+ if value < 1 or value > 31:
+ raise ValueError('day of the month out of range')
+ elif month_num in (4, 6, 9, 11):
+ if value < 1 or value > 30:
+ raise ValueError('day of the month out of range')
+ elif month_num == 2:
+ if year % 4 == 0 and (year % 100 != 0 or year % 400 == 0):
+ if value < 1 or value > 29:
+ raise ValueError('day of the month out of range')
+ else:
+ if value < 1 or value > 28:
+ raise ValueError('day of the month out of range')
+
+ def _check_wotm(self, t):
+ if not t.isnumeric():
+ raise ValueError('week of the month non-numeric')
+ value = int(t)
+ if value < 1 or value > 4:
+ raise ValueError('week of the month out of range')
+
+ def _check_woty(self, t):
+ if not t.isnumeric():
+ raise ValueError('week of the year non-numeric')
+ value = int(t)
+ if value < 1 or value > 52:
+ raise ValueError('week of the year out of range')
+
+ def _check_month_num(self, t):
+ if not t.isnumeric():
+ raise ValueError('month number non-numeric')
+ value = int(t)
+ if value < 1 or value > 12:
+ raise ValueError('month number out of range')
+
+ def _check_interval(self, t, check_func):
+ intervals = t.split(',')
+ for i in intervals:
+ if not i:
+ raise ValueError('invalid time range')
+ values = i.split('-')
+ if len(values) > 2:
+ raise ValueError('invalid time range')
+ for v in values:
+ check_func(v)
+ if len(values) == 2:
+ if int(v[0]) > int(v[1]):
+ raise ValueError('invalid time range')
+
+ def _check_W_spec(self, ts, index):
+ if ts[index] != 'day':
+ raise ValueError('invalid week specifier')
+ index += 1
+ self._check_interval(ts[index], self._check_dotw)
+ return index
+
+ def _check_M_spec(self, ts, index):
+ if ts[index] == 'week':
+ self._check_interval(ts[index + 1], self._check_wotm)
+ index = self._check_W_spec(ts, index + 2)
+ elif ts[index] == 'day':
+ index += 1
+ self._check_interval(ts[index], self._check_dotm)
+ else:
+ raise ValueError('invalid month specifier')
+ return index
+
+ def _check_Y_spec(self, ts, index):
+ if ts[index] == 'month':
+ index += 1
+ self._check_interval(ts[index], self._check_month_num)
+ month_num = int(ts[index])
+ index = self._check_M_spec(ts, index + 1, month_num)
+ elif ts[index] == 'week':
+ self._check_interval(ts[index + 1], self._check_woty)
+ index = self._check_W_spec(ts, index + 2)
+ elif ts[index] == 'day':
+ index += 1
+ self._check_interval(ts[index], self._check_doty)
+ else:
+ raise ValueError('invalid year specifier')
+ return index
+
+ def _check_generalized(self, t):
+ if len(t) not in (10, 12, 14):
+ raise ValueError('incomplete generalized time')
+ if not t.isnumeric():
+ raise ValueError('generalized time non-numeric')
+ # don't check year value, with time travel and all :)
+ self._check_month_num(t[4:6])
+ year_num = int(t[0:4])
+ month_num = int(t[4:6])
+ self._check_dotm(t[6:8], month_num, year_num)
+ if len(t) >= 12:
+ self._check_HHMM(t[8:12])
+ else:
+ self._check_HHMM('%s00' % t[8:10])
+ if len(t) == 14:
+ s = int(t[12:14])
+ if s < 0 or s > 60:
+ raise ValueError('seconds out of range')
+
+ def _check(self, time):
+ ts = time.split()
+ if ts[0] == 'absolute':
+ self._check_generalized(ts[1])
+ if ts[2] != '~':
+ raise ValueError('invalid time range separator')
+ self._check_generalized(ts[3])
+ if int(ts[1]) >= int(ts[3]):
+ raise ValueError('invalid generalized time range')
+ elif ts[0] == 'periodic':
+ if ts[1] == 'yearly':
+ index = self._check_Y_spec(ts, 2)
+ elif ts[1] == 'monthly':
+ index = self._check_M_spec(ts, 2)
+ elif ts[1] == 'daily':
+ index = 1
+ self._check_interval(ts[index + 1], self._check_HHMM)
+ else:
+ raise ValueError('time neither absolute or periodic')
+
+ def _rule_required(self, _, value):
+ try:
+ self._check(value)
+ except ValueError, e:
+ raise ValidationError(name=self.cli_name, error=e.message)
+ except IndexError:
+ raise ValidationError(
+ name=self.cli_name, errors='incomplete time value'
+ )
+ return None
+
+
def create_param(spec):
"""
Create an `Str` instance from the shorthand ``spec``.
diff --git a/ipalib/plugins/hbac.py b/ipalib/plugins/hbac.py
new file mode 100644
index 000000000..16a93d28c
--- /dev/null
+++ b/ipalib/plugins/hbac.py
@@ -0,0 +1,260 @@
+# Authors:
+# Pavel Zuna <pzuna@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+"""
+Host based access control
+"""
+
+from ipalib import api, errors
+from ipalib import GeneralizedTime, Password, Str, StrEnum
+from ipalib.plugins.baseldap import *
+
+class hbac(LDAPObject):
+ """
+ HBAC object.
+ """
+ container_dn = api.env.container_hbac
+ object_name = 'HBAC rule'
+ object_name_plural = 'HBAC rules'
+ object_class = ['ipaassociation', 'ipahbacrule']
+ default_attributes = [
+ 'cn', 'accessruletype', 'ipaenabledflag', 'servicename',
+ 'accesstime', 'description',
+
+ ]
+ uuid_attribute = 'ipauniqueid'
+ attribute_names = {
+ 'cn': 'name',
+ 'accessruletype': 'type',
+ 'ipaenabledflag': 'status',
+ 'servicename': 'service',
+ 'ipauniqueid': 'unique id',
+ 'memberuser user': 'affected users',
+ 'memberuser group': 'affected groups',
+ 'memberhost host': 'affected hosts',
+ 'memberhost hostgroup': 'affected hostgroups',
+ 'sourcehost host': 'affected source hosts',
+ 'sourcehost hostgroup': 'affected source hostgroups',
+ }
+ attribute_order = ['cn', 'accessruletype', 'ipaenabledflag', 'servicename']
+ attribute_members = {
+ 'memberuser': ['user', 'group'],
+ 'memberhost': ['host', 'hostgroup'],
+ 'sourcehost': ['host', 'hostgroup'],
+ }
+
+ takes_params = (
+ Str('cn',
+ cli_name='name',
+ doc='rule name',
+ primary_key=True,
+ ),
+ StrEnum('accessruletype',
+ cli_name='type',
+ doc='rule type (allow or deny)',
+ values=(u'allow', u'deny'),
+ ),
+ Str('servicename?',
+ cli_name='service',
+ doc='name of service the rule applies to (e.g. ssh)',
+ ),
+ GeneralizedTime('accesstime?',
+ cli_name='time',
+ doc='access time in generalizedTime format (RFC 4517)',
+ ),
+ Str('description?',
+ cli_name='desc',
+ doc='description',
+ ),
+ )
+
+ def get_dn(self, *keys, **kwargs):
+ try:
+ (dn, entry_attrs) = self.backend.find_entry_by_attr(
+ self.primary_key.name, keys[-1], self.object_class, [''],
+ self.container_dn
+ )
+ except errors.NotFound:
+ dn = super(hbac, self).get_dn(*keys, **kwargs)
+ return dn
+
+ def get_primary_key_from_dn(self, dn):
+ pkey = self.primary_key.name
+ (dn, entry_attrs) = self.backend.get_entry(dn, [pkey])
+ return entry_attrs.get(pkey, '')
+
+api.register(hbac)
+
+
+class hbac_add(LDAPCreate):
+ """
+ Create new HBAC rule.
+ """
+ def pre_callback(self, ldap, dn, entry_attrs, *keys, **options):
+ if not dn.startswith('cn='):
+ msg = 'HBAC rule with name "%s" already exists' % keys[-1]
+ raise errors.DuplicateEntry(message=msg)
+ # HBAC rules are enabled by default
+ entry_attrs['ipaenabledflag'] = 'enabled'
+ return ldap.make_dn(
+ entry_attrs, self.obj.uuid_attribute, self.obj.container_dn
+ )
+
+api.register(hbac_add)
+
+
+class hbac_del(LDAPDelete):
+ """
+ Delete HBAC rule.
+ """
+
+api.register(hbac_del)
+
+
+class hbac_mod(LDAPUpdate):
+ """
+ Modify HBAC rule.
+ """
+
+api.register(hbac_mod)
+
+
+class hbac_find(LDAPSearch):
+ """
+ Search for HBAC rules.
+ """
+
+api.register(hbac_find)
+
+
+class hbac_show(LDAPRetrieve):
+ """
+ Dispaly HBAC rule.
+ """
+
+api.register(hbac_show)
+
+
+class hbac_enable(LDAPQuery):
+ """
+ Enable HBAC rule.
+ """
+ def execute(self, cn):
+ ldap = self.obj.backend
+
+ dn = self.obj.get_dn(cn)
+ entry_attrs = {'ipaenabledflag': 'enabled'}
+
+ try:
+ ldap.update_entry(dn, entry_attrs)
+ except errors.EmptyModlist:
+ pass
+
+ return True
+
+ def output_for_cli(self, textui, result, cn):
+ textui.print_name(self.name)
+ textui.print_dashed('Enabled HBAC rule "%s".' % cn)
+
+api.register(hbac_enable)
+
+
+class hbac_disable(LDAPQuery):
+ """
+ Disable HBAC rule.
+ """
+ def execute(self, cn):
+ ldap = self.obj.backend
+
+ dn = self.obj.get_dn(cn)
+ entry_attrs = {'ipaenabledflag': 'disabled'}
+
+ try:
+ ldap.update_entry(dn, entry_attrs)
+ except errors.EmptyModlist:
+ pass
+
+ return True
+
+ def output_for_cli(self, textui, result, cn):
+ textui.print_name(self.name)
+ textui.print_dashed('Disabled HBAC rule "%s".' % cn)
+
+api.register(hbac_disable)
+
+
+class hbac_add_user(LDAPAddMember):
+ """
+ Add users and groups affected by HBAC rule.
+ """
+ member_attributes = ['memberuser']
+ member_count_out = ('%i object added.', '%i objects added.')
+
+api.register(hbac_add_user)
+
+
+class hbac_remove_user(LDAPRemoveMember):
+ """
+ Remove users and groups affected by HBAC rule.
+ """
+ member_attributes = ['memberuser']
+ member_count_out = ('%i object removed.', '%i objects removed.')
+
+api.register(hbac_remove_user)
+
+
+class hbac_add_host(LDAPAddMember):
+ """
+ Add hosts and hostgroups affected by HBAC rule.
+ """
+ member_attributes = ['memberhost']
+ member_count_out = ('%i object added.', '%i objects added.')
+
+api.register(hbac_add_host)
+
+
+class hbac_remove_host(LDAPRemoveMember):
+ """
+ Remove hosts and hostgroups affected by HBAC rule.
+ """
+ member_attributes = ['memberhost']
+ member_count_out = ('%i object removed.', '%i objects removed.')
+
+api.register(hbac_remove_host)
+
+
+class hbac_add_sourcehost(LDAPAddMember):
+ """
+ Add source hosts and hostgroups affected by HBAC rule.
+ """
+ member_attributes = ['sourcehost']
+ member_count_out = ('%i object added.', '%i objects added.')
+
+api.register(hbac_add_sourcehost)
+
+
+class hbac_remove_sourcehost(LDAPRemoveMember):
+ """
+ Remove source hosts and hostgroups affected by HBAC rule.
+ """
+ member_attributes = ['sourcehost']
+ member_count_out = ('%i object removed.', '%i objects removed.')
+
+api.register(hbac_remove_sourcehost)
+
+
diff --git a/tests/test_xmlrpc/test_hbac_plugin.py b/tests/test_xmlrpc/test_hbac_plugin.py
new file mode 100644
index 000000000..caa916acf
--- /dev/null
+++ b/tests/test_xmlrpc/test_hbac_plugin.py
@@ -0,0 +1,305 @@
+# Authors:
+# Pavel Zuna <pzuna@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+"""
+Test the `ipalib/plugins/hbac.py` module.
+"""
+
+from xmlrpc_test import XMLRPC_test, assert_attr_equal
+from ipalib import api
+from ipalib import errors
+
+
+class test_hbac(XMLRPC_test):
+ """
+ Test the `hbac` plugin.
+ """
+ rule_name = u'testing_rule1234'
+ rule_type = u'allow'
+ rule_type_fail = u'value not allowed'
+ rule_service = u'ssh'
+ rule_time = u'absolute 20081010000000 ~ 20081015120000'
+ # wrong time, has 30th day in February in first date
+ rule_time_fail = u'absolute 20080230000000 ~ 20081015120000'
+ rule_desc = u'description'
+ rule_desc_mod = u'description modified'
+
+ test_user = u'hbac_test_user'
+ test_group = u'hbac_test_group'
+ test_host = u'hbac._test_netgroup'
+ test_hostgroup = u'hbac_test_hostgroup'
+ test_sourcehost = u'hbac._test_src_host'
+ test_sourcehostgroup = u'hbac_test_src_hostgroup'
+
+ def test_0_hbac_add(self):
+ """
+ Test adding a new HBAC rule using `xmlrpc.hbac_add`.
+ """
+ (dn, res) = api.Command['hbac_add'](
+ self.rule_name, accessruletype=self.rule_type,
+ servicename=self.rule_service, accesstime=self.rule_time,
+ description=self.rule_desc
+ )
+ assert res
+ assert_attr_equal(res, 'cn', self.rule_name)
+ assert_attr_equal(res, 'accessruletype', self.rule_type)
+ assert_attr_equal(res, 'servicename', self.rule_service)
+ assert_attr_equal(res, 'ipaenabledflag', 'enabled')
+ assert_attr_equal(res, 'accesstime', self.rule_time)
+ assert_attr_equal(res, 'description', self.rule_desc)
+
+ def test_1_hbac_add(self):
+ """
+ Test adding an existing HBAC rule using `xmlrpc.hbac_add'.
+ """
+ try:
+ (dn, res) = api.Command['hbac_add'](
+ self.rule_name, accessruletype=self.rule_type
+ )
+ except errors.DuplicateEntry:
+ pass
+ else:
+ assert False
+
+ def test_2_hbac_show(self):
+ """
+ Test displaying a HBAC rule using `xmlrpc.hbac_show`.
+ """
+ (dn, res) = api.Command['hbac_show'](self.rule_name)
+ assert res
+ assert_attr_equal(res, 'cn', self.rule_name)
+ assert_attr_equal(res, 'accessruletype', self.rule_type)
+ assert_attr_equal(res, 'servicename', self.rule_service)
+ assert_attr_equal(res, 'ipaenabledflag', 'enabled')
+ assert_attr_equal(res, 'accesstime', self.rule_time)
+ assert_attr_equal(res, 'description', self.rule_desc)
+
+ def test_3_hbac_mod(self):
+ """
+ Test modifying a HBAC rule using `xmlrpc.hbac_mod`.
+ """
+ (dn, res) = api.Command['hbac_mod'](
+ self.rule_name, description=self.rule_desc_mod
+ )
+ assert res
+ assert_attr_equal(res, 'description', self.rule_desc_mod)
+
+ def test_4_hbac_mod(self):
+ """
+ Test setting invalid type of HBAC rule using `xmlrpc.hbac_mod`.
+ """
+ try:
+ (dn, res) = api.Command['hbac_mod'](
+ self.rule_name, accessruletype=self.rule_type_fail
+ )
+ except errors.ValidationError:
+ pass
+ else:
+ assert False
+
+ def test_5_hbac_mod(self):
+ """
+ Test setting invalid time in HBAC rule using `xmlrpc.hbac_mod`.
+ """
+ try:
+ (dn, res) = api.Command['hbac_mod'](
+ self.rule_name, accesstime=self.rule_time_fail
+ )
+ except errors.ValidationError:
+ pass
+ else:
+ assert False
+
+ def test_6_hbac_find(self):
+ """
+ Test searching for HBAC rules using `xmlrpc.hbac_find`.
+ """
+ (res, truncated) = api.Command['hbac_find'](
+ name=self.rule_name, accessruletype=self.rule_type,
+ description=self.rule_desc_mod
+ )
+ assert res
+ assert res[0]
+ assert_attr_equal(res[0][1], 'cn', self.rule_name)
+ assert_attr_equal(res[0][1], 'accessruletype', self.rule_type)
+ assert_attr_equal(res[0][1], 'description', self.rule_desc_mod)
+
+ def test_7_hbac_init_testing_data(self):
+ """
+ Initialize data for more HBAC plugin testing.
+ """
+ api.Command['user_add'](self.test_user, givenname=u'first', sn=u'last')
+ api.Command['group_add'](self.test_group, description=u'description')
+ api.Command['host_add'](self.test_host)
+ api.Command['hostgroup_add'](
+ self.test_hostgroup, description=u'description'
+ )
+ api.Command['host_add'](self.test_sourcehost)
+ api.Command['hostgroup_add'](
+ self.test_sourcehostgroup, description=u'desc'
+ )
+
+ def test_8_hbac_add_user(self):
+ """
+ Test adding user and group to HBAC rule using `xmlrpc.hbac_add_user`.
+ """
+ (completed, failed, res) = api.Command['hbac_add_user'](
+ self.rule_name, user=self.test_user, group=self.test_group
+ )
+ assert completed == 2
+ assert 'memberuser' in failed
+ assert 'user' in failed['memberuser']
+ assert not failed['memberuser']['user']
+ assert 'group' in failed['memberuser']
+ assert not failed['memberuser']['group']
+ assert res
+ assert_attr_equal(res[1], 'memberuser user', self.test_user)
+ assert_attr_equal(res[1], 'memberuser group', self.test_group)
+
+ def test_9_hbac_remove_user(self):
+ """
+ Test removing user and group from HBAC rule using `xmlrpc.hbac_remove_user'.
+ """
+ (completed, failed, res) = api.Command['hbac_remove_user'](
+ self.rule_name, user=self.test_user, group=self.test_group
+ )
+ assert completed == 2
+ assert 'memberuser' in failed
+ assert 'user' in failed['memberuser']
+ assert not failed['memberuser']['user']
+ assert 'group' in failed['memberuser']
+ assert not failed['memberuser']['group']
+ assert res
+ assert 'memberuser user' not in res[1]
+ assert 'memberuser group' not in res[1]
+
+ def test_a_hbac_add_host(self):
+ """
+ Test adding host and hostgroup to HBAC rule using `xmlrpc.hbac_add_host`.
+ """
+ (completed, failed, res) = api.Command['hbac_add_host'](
+ self.rule_name, host=self.test_host, hostgroup=self.test_hostgroup
+ )
+ assert completed == 2
+ assert 'memberhost' in failed
+ assert 'host' in failed['memberhost']
+ assert not failed['memberhost']['host']
+ assert 'hostgroup' in failed['memberhost']
+ assert not failed['memberhost']['hostgroup']
+ assert res
+ assert_attr_equal(res[1], 'memberhost host', self.test_host)
+ assert_attr_equal(res[1], 'memberhost hostgroup', self.test_hostgroup)
+
+ def test_b_hbac_remove_host(self):
+ """
+ Test removing host and hostgroup from HBAC rule using `xmlrpc.hbac_remove_host`.
+ """
+ (completed, failed, res) = api.Command['hbac_remove_host'](
+ self.rule_name, host=self.test_host, hostgroup=self.test_hostgroup
+ )
+ assert completed == 2
+ assert 'memberhost' in failed
+ assert 'host' in failed['memberhost']
+ assert not failed['memberhost']['host']
+ assert 'hostgroup' in failed['memberhost']
+ assert not failed['memberhost']['hostgroup']
+ assert res
+ assert 'memberhost host' not in res[1]
+ assert 'memberhost hostgroup' not in res[1]
+
+ def test_a_hbac_add_sourcehost(self):
+ """
+ Test adding source host and hostgroup to HBAC rule using `xmlrpc.hbac_add_host`.
+ """
+ (completed, failed, res) = api.Command['hbac_add_sourcehost'](
+ self.rule_name, host=self.test_host, hostgroup=self.test_hostgroup
+ )
+ assert completed == 2
+ assert 'sourcehost' in failed
+ assert 'host' in failed['sourcehost']
+ assert not failed['sourcehost']['host']
+ assert 'hostgroup' in failed['sourcehost']
+ assert not failed['sourcehost']['hostgroup']
+ assert res
+ assert_attr_equal(res[1], 'sourcehost host', self.test_host)
+ assert_attr_equal(res[1], 'sourcehost hostgroup', self.test_hostgroup)
+
+ def test_b_hbac_remove_host(self):
+ """
+ Test removing source host and hostgroup from HBAC rule using `xmlrpc.hbac_remove_host`.
+ """
+ (completed, failed, res) = api.Command['hbac_remove_sourcehost'](
+ self.rule_name, host=self.test_host, hostgroup=self.test_hostgroup
+ )
+ assert completed == 2
+ assert 'sourcehost' in failed
+ assert 'host' in failed['sourcehost']
+ assert not failed['sourcehost']['host']
+ assert 'hostgroup' in failed['sourcehost']
+ assert not failed['sourcehost']['hostgroup']
+ assert res
+ assert 'sourcehost host' not in res[1]
+ assert 'sourcehost hostgroup' not in res[1]
+
+ def test_c_hbac_clear_testing_data(self):
+ """
+ Clear data for HBAC plugin testing.
+ """
+ api.Command['user_del'](self.test_user)
+ api.Command['group_del'](self.test_group)
+ api.Command['host_del'](self.test_host)
+ api.Command['hostgroup_del'](self.test_hostgroup)
+ api.Command['host_del'](self.test_sourcehost)
+ api.Command['hostgroup_del'](self.test_sourcehostgroup)
+
+ def test_d_hbac_disable(self):
+ """
+ Test disabling HBAC rule using `xmlrpc.hbac_disable`.
+ """
+ res = api.Command['hbac_disable'](self.rule_name)
+ assert res == True
+ # check it's really disabled
+ (dn, res) = api.Command['hbac_show'](self.rule_name)
+ assert res
+ assert_attr_equal(res, 'ipaenabledflag', 'disabled')
+
+ def test_e_hbac_enabled(self):
+ """
+ Test enabling HBAC rule using `xmlrpc.hbac_enable`.
+ """
+ res = api.Command['hbac_enable'](self.rule_name)
+ assert res == True
+ # check it's really enabled
+ (dn, res) = api.Command['hbac_show'](self.rule_name)
+ assert res
+ assert_attr_equal(res, 'ipaenabledflag', 'enabled')
+
+ def test_f_hbac_del(self):
+ """
+ Test deleting a HBAC rule using `xmlrpc.hbac_remove_sourcehost`.
+ """
+ res = api.Command['hbac_del'](self.rule_name)
+ assert res == True
+ # verify that it's gone
+ try:
+ api.Command['hbac_show'](self.rule_name)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+