summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorKevin L. Mitchell <kevin.mitchell@rackspace.com>2012-10-09 13:23:26 +0100
committerMark McLoughlin <markmc@redhat.com>2012-10-09 21:16:03 +0100
commit451dd68b2d9194f4612074412acedfad800e799c (patch)
treeccba7e8b07f574fe8a6ebe9ec1168e2a4b198914 /tests
parent355ca27471218012a7604b751b1ffbbd2d001f05 (diff)
downloadoslo-451dd68b2d9194f4612074412acedfad800e799c.tar.gz
oslo-451dd68b2d9194f4612074412acedfad800e799c.tar.xz
oslo-451dd68b2d9194f4612074412acedfad800e799c.zip
Rewrite the policy engine from scratch
Implements blueprint fine-grained-policy Complete rewrite of the policy engine, with careful thought given to backwards-compatibility. Policy rules are now represented internally by a tree of Check objects. A new API is added for parsing and enforcing rules: set_rules(Rules.load_json(data, default_rule=...)) result = check(rule, target, credentials) reset() The old Brain/enforce API is deprecated and will be removed soon. A new API is also added for registering new check types: @register("key") class KeyCheck(Check): def __call__(self, target, creds): return self.match in creds Support for using functions as check types is deprecated and will also be removed soon. Change-Id: I2951a0de3751bd2ec868e7a661070fed624e4af2
Diffstat (limited to 'tests')
-rw-r--r--tests/unit/test_policy.py757
1 files changed, 452 insertions, 305 deletions
diff --git a/tests/unit/test_policy.py b/tests/unit/test_policy.py
index 5d77a6f..7ef9c85 100644
--- a/tests/unit/test_policy.py
+++ b/tests/unit/test_policy.py
@@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""Test of Policy Engine For Nova"""
+"""Test of Policy Engine"""
import os.path
import StringIO
@@ -29,116 +29,180 @@ from openstack.common import jsonutils
from openstack.common import policy
+class TestException(Exception):
+ def __init__(self, *args, **kwargs):
+ self.args = args
+ self.kwargs = kwargs
+
+
+class RulesTestCase(unittest.TestCase):
+ def test_init_basic(self):
+ rules = policy.Rules()
+
+ self.assertEqual(rules, {})
+ self.assertEqual(rules.default_rule, None)
+
+ def test_init(self):
+ rules = policy.Rules(dict(a=1, b=2, c=3), 'a')
+
+ self.assertEqual(rules, dict(a=1, b=2, c=3))
+ self.assertEqual(rules.default_rule, 'a')
+
+ def test_no_default(self):
+ rules = policy.Rules(dict(a=1, b=2, c=3))
+
+ self.assertRaises(KeyError, lambda: rules['d'])
+
+ def test_missing_default(self):
+ rules = policy.Rules(dict(a=1, c=3), 'b')
+
+ self.assertRaises(KeyError, lambda: rules['d'])
+
+ def test_with_default(self):
+ rules = policy.Rules(dict(a=1, b=2, c=3), 'b')
+
+ self.assertEqual(rules['d'], 2)
+
+ def test_retrieval(self):
+ rules = policy.Rules(dict(a=1, b=2, c=3), 'b')
+
+ self.assertEqual(rules['a'], 1)
+ self.assertEqual(rules['b'], 2)
+ self.assertEqual(rules['c'], 3)
+
+ @mock.patch.object(policy, 'parse_rule', lambda x: x)
+ def test_load_json(self):
+ exemplar = """{
+ "admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]],
+ "default": []
+}"""
+ rules = policy.Rules.load_json(exemplar, 'default')
+
+ self.assertEqual(rules.default_rule, 'default')
+ self.assertEqual(rules, dict(
+ admin_or_owner=[["role:admin"], ["project_id:%(project_id)s"]],
+ default=[],
+ ))
+
+ def test_str_true(self):
+ exemplar = """{
+ "admin_or_owner": ""
+}"""
+ rules = policy.Rules(dict(
+ admin_or_owner=policy.TrueCheck(),
+ ))
+
+ self.assertEqual(str(rules), exemplar)
+
+
class PolicySetAndResetTestCase(unittest.TestCase):
def tearDown(self):
- # Make sure the policy brain is reset for remaining tests
- policy._BRAIN = None
+ # Make sure the policy rules are reset for remaining tests
+ policy._rules = None
+
+ def test_set_rules(self):
+ # Make sure the rules are set properly
+ policy._rules = None
+ policy.set_rules('spam')
+ self.assertEqual(policy._rules, 'spam')
def test_set_brain(self):
# Make sure the brain is set properly
- policy._BRAIN = None
+ policy._rules = None
policy.set_brain('spam')
- self.assertEqual(policy._BRAIN, 'spam')
+ self.assertEqual(policy._rules, 'spam')
def test_reset(self):
- # Make sure the brain is set to something
- policy._BRAIN = 'spam'
+ # Make sure the rules are reset
+ policy._rules = 'spam'
policy.reset()
- self.assertEqual(policy._BRAIN, None)
+ self.assertEqual(policy._rules, None)
-class FakeBrain(object):
- check_result = True
+class FakeCheck(policy.BaseCheck):
+ def __init__(self, result=None):
+ self.result = result
- def check(self, match_list, target_dict, credentials_dict):
- self.match_list = match_list
- self.target_dict = target_dict
- self.credentials_dict = credentials_dict
- return self.check_result
+ def __str__(self):
+ return self.result
+ def __call__(self, target, creds):
+ if self.result is not None:
+ return self.result
+ return (target, creds)
-class PolicyEnforceTestCase(unittest.TestCase):
- def setUp(self):
- self.fake_brain = FakeBrain()
- policy._BRAIN = self.fake_brain
+class CheckFunctionTestCase(unittest.TestCase):
def tearDown(self):
- policy.reset()
+ # Make sure the policy rules are reset for remaining tests
+ policy._rules = None
- def check_args(self, match_list, target_dict, credentials_dict):
- self.assertEqual(self.fake_brain.match_list, match_list)
- self.assertEqual(self.fake_brain.target_dict, target_dict)
- self.assertEqual(self.fake_brain.credentials_dict, credentials_dict)
+ def test_check_explicit(self):
+ policy._rules = None
+ rule = FakeCheck()
+ result = policy.check(rule, "target", "creds")
- def test_make_new_brain(self):
- with mock.patch.object(policy, "Brain") as fake_brain:
- fake_brain.check.return_value = True
+ self.assertEqual(result, ("target", "creds"))
+ self.assertEqual(policy._rules, None)
- result = policy.enforce("match", "target", "credentials")
+ def test_check_no_rules(self):
+ policy._rules = None
+ result = policy.check('rule', "target", "creds")
- self.assertNotEqual(policy._BRAIN, None)
- self.assertEqual(result, True)
+ self.assertEqual(result, False)
+ self.assertEqual(policy._rules, None)
- def test_use_existing_brain(self):
- result = policy.enforce("match", "target", "credentials")
+ def test_check_missing_rule(self):
+ policy._rules = {}
+ result = policy.check('rule', 'target', 'creds')
- self.assertNotEqual(policy._BRAIN, None)
- self.assertEqual(result, True)
- self.check_args("match", "target", "credentials")
+ self.assertEqual(result, False)
- def test_fail_no_exc(self):
- self.fake_brain.check_result = False
- result = policy.enforce("match", "target", "credentials")
+ def test_check_with_rule(self):
+ policy._rules = dict(default=FakeCheck())
+ result = policy.check("default", "target", "creds")
- self.assertEqual(result, False)
- self.check_args("match", "target", "credentials")
+ self.assertEqual(result, ("target", "creds"))
- def test_fail_with_exc(self):
- class TestException(Exception):
- def __init__(self, *args, **kwargs):
- self.args = args
- self.kwargs = kwargs
+ def test_check_raises(self):
+ policy._rules = None
- self.fake_brain.check_result = False
try:
- result = policy.enforce("match", "target", "credentials",
- TestException, "arg1", "arg2",
- kw1="kwarg1", kw2="kwarg2")
+ result = policy.check('rule', 'target', 'creds', TestException,
+ "arg1", "arg2", kw1="kwarg1", kw2="kwarg2")
except TestException as exc:
self.assertEqual(exc.args, ("arg1", "arg2"))
self.assertEqual(exc.kwargs, dict(kw1="kwarg1", kw2="kwarg2"))
else:
- self.fail("policy.enforce() failed to raise requested exception")
+ self.fail("policy.check() failed to raise requested exception")
class BrainTestCase(unittest.TestCase):
def test_basic_init(self):
brain = policy.Brain()
+ self.assertEqual(brain, {})
self.assertEqual(brain.rules, {})
self.assertEqual(brain.default_rule, None)
+ @mock.patch.object(policy, 'parse_rule', lambda x: x)
def test_init_with_args(self):
brain = policy.Brain(rules=dict(a="foo", b="bar"), default_rule="a")
+ self.assertEqual(brain, dict(a="foo", b="bar"))
self.assertEqual(brain.rules, dict(a="foo", b="bar"))
self.assertEqual(brain.default_rule, "a")
- def test_load_json(self):
- exemplar = """{
- "admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]],
- "default": []
-}"""
- brain = policy.Brain.load_json(exemplar, "default")
+ @mock.patch.object(policy, 'parse_rule', lambda x: x)
+ def test_init_with_checks(self):
+ exemplar = dict(a=FakeCheck("a"), b=FakeCheck("b"))
+ brain = policy.Brain(rules=exemplar, default_rule="a")
- self.assertEqual(
- brain.rules, dict(
- admin_or_owner=[["role:admin"], ["project_id:%(project_id)s"]],
- default=[],
- )
- )
- self.assertEqual(brain.default_rule, "default")
+ self.assertEqual(brain, exemplar)
+ self.assertEqual(brain.rules, exemplar)
+ self.assertEqual(brain.default_rule, "a")
+ @mock.patch.object(policy, 'parse_rule', lambda x: x)
def test_add_rule(self):
brain = policy.Brain()
brain.add_rule("rule1",
@@ -148,321 +212,404 @@ class BrainTestCase(unittest.TestCase):
brain.rules, dict(
rule1=[["role:admin"], ["project_id:%(project_id)s"]]))
- def test_check_with_badmatch(self):
+ @mock.patch.object(policy, 'parse_rule', lambda x: x)
+ def test_check(self):
brain = policy.Brain()
- result = brain._check("badmatch", "target", "credentials")
+ result = brain.check(FakeCheck(), 'target', 'creds')
- self.assertEqual(result, False)
+ self.assertEqual(result, ('target', 'creds'))
- def test_check_with_specific(self):
- self.spam_called = False
- def check_spam(brain, kind, match, target_dict, cred_dict):
- self.assertEqual(kind, "spam")
- self.assertEqual(match, "check")
- self.assertEqual(target_dict, "target")
- self.assertEqual(cred_dict, "credentials")
- self.spam_called = True
+class PolicyEnforceTestCase(unittest.TestCase):
+ @mock.patch.object(policy, 'parse_rule', lambda x: 'spam')
+ def test_enforce(self):
+ with mock.patch.object(policy, 'check') as mock_check:
+ mock_check.return_value = "result"
- class TestBrain(policy.Brain):
- _checks = dict(spam=check_spam)
+ result = policy.enforce("fake_check", 'target', 'creds',
+ 'exception', 'arg1', 'arg2',
+ kw1='kwarg1', kw2='kwarg2')
- brain = TestBrain()
- result = brain._check("spam:check", "target", "credentials")
+ self.assertEqual(result, 'result')
+ mock_check.assert_called_once_with('spam', 'target', 'creds',
+ 'exception', 'arg1', 'arg2',
+ kw1='kwarg1', kw2='kwarg2')
- self.assertEqual(self.spam_called, True)
- def test_check_with_generic(self):
- self.generic_called = False
+class FalseCheckTestCase(unittest.TestCase):
+ def test_str(self):
+ check = policy.FalseCheck()
- def check_generic(brain, kind, match, target_dict, cred_dict):
- self.assertEqual(kind, "spam")
- self.assertEqual(match, "check")
- self.assertEqual(target_dict, "target")
- self.assertEqual(cred_dict, "credentials")
- self.generic_called = True
+ self.assertEqual(str(check), '!')
- class TestBrain(policy.Brain):
- _checks = {None: check_generic}
+ def test_call(self):
+ check = policy.FalseCheck()
- brain = TestBrain()
- result = brain._check("spam:check", "target", "credentials")
+ self.assertEqual(check('target', 'creds'), False)
- self.assertEqual(self.generic_called, True)
- def test_check_with_inheritance(self):
- self.inherited_called = False
+class TrueCheckTestCase(unittest.TestCase):
+ def test_str(self):
+ check = policy.TrueCheck()
- class TestBrain(policy.Brain):
- _checks = {}
+ self.assertEqual(str(check), '@')
- def _check_inherited(inst, match, target_dict, cred_dict):
- self.assertEqual(match, "check")
- self.assertEqual(target_dict, "target")
- self.assertEqual(cred_dict, "credentials")
- self.inherited_called = True
+ def test_call(self):
+ check = policy.TrueCheck()
- brain = TestBrain()
- result = brain._check("inherited:check", "target", "credentials")
+ self.assertEqual(check('target', 'creds'), True)
- self.assertEqual(self.inherited_called, True)
- def test_check_no_handler(self):
- class TestBrain(policy.Brain):
- _checks = {}
+class CheckForTest(policy.Check):
+ def __call__(self, target, creds):
+ pass
- brain = TestBrain()
- self.assertEqual(brain._check('spam:mer', 'target', 'cred'), False)
- def test_check_empty(self):
- class TestBrain(policy.Brain):
- def _check(inst, match, target_dict, cred_dict):
- self.fail("_check() called for empty match list!")
+class CheckTestCase(unittest.TestCase):
+ def test_init(self):
+ check = CheckForTest('kind', 'match')
- brain = TestBrain()
- result = brain.check([], "target", "credentials")
+ self.assertEqual(check.kind, 'kind')
+ self.assertEqual(check.match, 'match')
- self.assertEqual(result, True)
+ def test_str(self):
+ check = CheckForTest('kind', 'match')
- def stub__check(self):
- self._check_called = 0
- self.matches = []
- self.targets = []
- self.creds = []
+ self.assertEqual(str(check), 'kind:match')
- class TestBrain(policy.Brain):
- def _check(inst, match, target_dict, cred_dict):
- self._check_called += 1
- self.matches.append(match)
- self.targets.append(target_dict)
- self.creds.append(cred_dict)
- return match == "True"
- return TestBrain()
+class FuncCheckTestCase(unittest.TestCase):
+ def test_init(self):
+ check = policy.FuncCheck('func', 'kind', 'match')
- def test_check_basic_true(self):
- brain = self.stub__check()
- result = brain.check(["True"], "target", "creds")
+ self.assertEqual(check.func, 'func')
+ self.assertEqual(check.kind, 'kind')
+ self.assertEqual(check.match, 'match')
- self.assertEqual(result, True)
- self.assertEqual(self._check_called, 1)
- self.assertEqual(self.matches, ["True"])
- self.assertEqual(self.targets, ["target"])
- self.assertEqual(self.creds, ["creds"])
+ def test_str(self):
+ check = policy.FuncCheck('func', 'kind', 'match')
- def test_check_basic_false(self):
- brain = self.stub__check()
- result = brain.check(["False"], "target", "creds")
+ self.assertEqual(str(check), 'kind:match')
- self.assertEqual(result, False)
- self.assertEqual(self._check_called, 1)
- self.assertEqual(self.matches, ["False"])
- self.assertEqual(self.targets, ["target"])
- self.assertEqual(self.creds, ["creds"])
-
- def test_check_or_true(self):
- brain = self.stub__check()
- result = brain.check([["False"], ["True"], ["False"]],
- "target", "creds")
-
- self.assertEqual(result, True)
- self.assertEqual(self._check_called, 2)
- self.assertEqual(self.matches, ["False", "True"])
- self.assertEqual(self.targets, ["target", "target"])
- self.assertEqual(self.creds, ["creds", "creds"])
-
- def test_check_or_false(self):
- brain = self.stub__check()
- result = brain.check([["False"], ["False"], ["False"]],
- "target", "creds")
+ def test_call(self):
+ check = policy.FuncCheck(lambda a, b, c, d, e: (a, b, c, d, e),
+ 'kind', 'match')
- self.assertEqual(result, False)
- self.assertEqual(self._check_called, 3)
- self.assertEqual(self.matches, ["False", "False", "False"])
- self.assertEqual(self.targets, ["target", "target", "target"])
- self.assertEqual(self.creds, ["creds", "creds", "creds"])
-
- def test_check_and_true(self):
- brain = self.stub__check()
- result = brain.check([["True", "True", "True"]],
- "target", "creds")
-
- self.assertEqual(result, True)
- self.assertEqual(self._check_called, 3)
- self.assertEqual(self.matches, ["True", "True", "True"])
- self.assertEqual(self.targets, ["target", "target", "target"])
- self.assertEqual(self.creds, ["creds", "creds", "creds"])
-
- def test_check_and_false(self):
- brain = self.stub__check()
- result = brain.check([["True", "True", "False"]],
- "target", "creds")
+ self.assertEqual(check('target', 'creds'),
+ (None, 'kind', 'match', 'target', 'creds'))
- self.assertEqual(result, False)
- self.assertEqual(self._check_called, 3)
- self.assertEqual(self.matches, ["True", "True", "False"])
- self.assertEqual(self.targets, ["target", "target", "target"])
- self.assertEqual(self.creds, ["creds", "creds", "creds"])
+class OrCheckTestCase(unittest.TestCase):
+ def test_init(self):
+ check = policy.OrCheck(['rule1', 'rule2'])
-class CheckRegisterTestCase(unittest.TestCase):
- def setUp(self):
- self.brain_checks = policy.Brain._checks
- policy.Brain._checks = {}
+ self.assertEqual(check.rules, ['rule1', 'rule2'])
- def tearDown(self):
- policy.Brain._checks = self.brain_checks
+ def test_add_check(self):
+ check = policy.OrCheck(['rule1', 'rule2'])
+ check.add_check('rule3')
- def test_class_register(self):
- policy.Brain._register('spam', 'func')
- policy.Brain._register('spammer', 'funcer')
+ self.assertEqual(check.rules, ['rule1', 'rule2', 'rule3'])
- self.assertEqual(policy.Brain._checks,
- dict(spam='func', spammer='funcer'))
+ def test_str(self):
+ check = policy.OrCheck(['rule1', 'rule2'])
- def test_register_func(self):
- policy.register('spam', 'func')
+ self.assertEqual(str(check), '(rule1 or rule2)')
- self.assertEqual(policy.Brain._checks,
- dict(spam='func'))
+ def test_call_all_false(self):
+ rules = [mock.Mock(return_value=False), mock.Mock(return_value=False)]
+ check = policy.OrCheck(rules)
- def test_register_decorator(self):
- @policy.register('spam')
- def test_func():
- pass
+ self.assertEqual(check('target', 'cred'), False)
+ rules[0].assert_called_once_with('target', 'cred')
+ rules[1].assert_called_once_with('target', 'cred')
- self.assertEqual(policy.Brain._checks,
- dict(spam=test_func))
+ def test_call_first_true(self):
+ rules = [mock.Mock(return_value=True), mock.Mock(return_value=False)]
+ check = policy.OrCheck(rules)
+ self.assertEqual(check('target', 'cred'), True)
+ rules[0].assert_called_once_with('target', 'cred')
+ self.assertFalse(rules[1].called)
-class CheckTestCase(unittest.TestCase):
- def setUp(self):
- self.urlopen_result = ""
+ def test_call_second_true(self):
+ rules = [mock.Mock(return_value=False), mock.Mock(return_value=True)]
+ check = policy.OrCheck(rules)
- def fake_urlopen(url, post_data):
- self.url = url
- self.post_data = post_data
- return StringIO.StringIO(self.urlopen_result)
+ self.assertEqual(check('target', 'cred'), True)
+ rules[0].assert_called_once_with('target', 'cred')
+ rules[1].assert_called_once_with('target', 'cred')
- self.patcher = mock.patch.object(urllib2, "urlopen", fake_urlopen)
- self.patcher.start()
- def tearDown(self):
- self.patcher.stop()
+class ParseCheckTestCase(unittest.TestCase):
+ def test_bad_rule(self):
+ result = policy._parse_check('foobar')
- def stub__check_rule(self, rules=None, default_rule=None):
- self.check_called = False
+ self.assertTrue(isinstance(result, policy.FalseCheck))
- class TestBrain(policy.Brain):
- def check(inst, matchs, target_dict, cred_dict):
- self.check_called = True
- self.target = target_dict
- self.cred = cred_dict
- return matchs
+ @mock.patch.object(policy, '_checks', {})
+ @mock.patch.object(policy, '_functions', {})
+ def test_no_handler(self):
+ result = policy._parse_check('no:handler')
- return TestBrain(rules=rules, default_rule=default_rule)
+ self.assertTrue(isinstance(result, policy.FalseCheck))
- def test_rule_no_rules_no_default(self):
- brain = self.stub__check_rule()
- result = policy._check_rule(brain, "rule", "spam", "target", "creds")
+ @mock.patch.object(policy, '_checks', {
+ 'spam': mock.Mock(return_value="spam_check"),
+ None: mock.Mock(return_value="none_check"),
+ })
+ @mock.patch.object(policy, '_functions', {
+ 'spam': "spam_func",
+ None: "none_func",
+ })
+ def test_check(self):
+ result = policy._parse_check('spam:handler')
- self.assertEqual(result, False)
- self.assertEqual(self.check_called, False)
+ self.assertEqual(result, 'spam_check')
+ policy._checks['spam'].assert_called_once_with('spam', 'handler')
+ self.assertFalse(policy._checks[None].called)
- def test_rule_no_rules_default(self):
- brain = self.stub__check_rule(default_rule="spam")
- result = policy._check_rule(brain, "rule", "spam", "target", "creds")
+ @mock.patch.object(policy, '_checks', {
+ None: mock.Mock(return_value="none_check"),
+ })
+ @mock.patch.object(policy, '_functions', {
+ 'spam': "spam_func",
+ None: "none_func",
+ })
+ def test_func(self):
+ result = policy._parse_check('spam:handler')
+
+ self.assertTrue(isinstance(result, policy.FuncCheck))
+ self.assertEqual(result.func, 'spam_func')
+ self.assertEqual(result.kind, 'spam')
+ self.assertEqual(result.match, 'handler')
+ self.assertFalse(policy._checks[None].called)
+
+ @mock.patch.object(policy, '_checks', {
+ None: mock.Mock(return_value="none_check"),
+ })
+ @mock.patch.object(policy, '_functions', {
+ None: "none_func",
+ })
+ def test_check_default(self):
+ result = policy._parse_check('spam:handler')
+
+ self.assertEqual(result, 'none_check')
+ policy._checks[None].assert_called_once_with('spam', 'handler')
+
+ @mock.patch.object(policy, '_checks', {})
+ @mock.patch.object(policy, '_functions', {
+ None: "none_func",
+ })
+ def test_func_default(self):
+ result = policy._parse_check('spam:handler')
+
+ self.assertTrue(isinstance(result, policy.FuncCheck))
+ self.assertEqual(result.func, 'none_func')
+ self.assertEqual(result.kind, 'spam')
+ self.assertEqual(result.match, 'handler')
+
+
+class ParseListRuleTestCase(unittest.TestCase):
+ def test_empty(self):
+ result = policy._parse_list_rule([])
+
+ self.assertTrue(isinstance(result, policy.TrueCheck))
+ self.assertEqual(str(result), '@')
+
+ @mock.patch.object(policy, '_parse_check', FakeCheck)
+ def test_oneele_zeroele(self):
+ result = policy._parse_list_rule([[]])
+
+ self.assertTrue(isinstance(result, policy.FalseCheck))
+ self.assertEqual(str(result), '!')
+
+ @mock.patch.object(policy, '_parse_check', FakeCheck)
+ def test_oneele_bare(self):
+ result = policy._parse_list_rule(['rule'])
+
+ self.assertTrue(isinstance(result, FakeCheck))
+ self.assertEqual(result.result, 'rule')
+ self.assertEqual(str(result), 'rule')
+
+ @mock.patch.object(policy, '_parse_check', FakeCheck)
+ def test_oneele_oneele(self):
+ result = policy._parse_list_rule([['rule']])
+
+ self.assertTrue(isinstance(result, FakeCheck))
+ self.assertEqual(result.result, 'rule')
+ self.assertEqual(str(result), 'rule')
+
+ @mock.patch.object(policy, '_parse_check', FakeCheck)
+ def test_oneele_multi(self):
+ result = policy._parse_list_rule([['rule1', 'rule2']])
+
+ self.assertTrue(isinstance(result, policy.AndCheck))
+ self.assertEqual(len(result.rules), 2)
+ for i, value in enumerate(['rule1', 'rule2']):
+ self.assertTrue(isinstance(result.rules[i], FakeCheck))
+ self.assertEqual(result.rules[i].result, value)
+ self.assertEqual(str(result), '(rule1 and rule2)')
+
+ @mock.patch.object(policy, '_parse_check', FakeCheck)
+ def test_multi_oneele(self):
+ result = policy._parse_list_rule([['rule1'], ['rule2']])
+
+ self.assertTrue(isinstance(result, policy.OrCheck))
+ self.assertEqual(len(result.rules), 2)
+ for i, value in enumerate(['rule1', 'rule2']):
+ self.assertTrue(isinstance(result.rules[i], FakeCheck))
+ self.assertEqual(result.rules[i].result, value)
+ self.assertEqual(str(result), '(rule1 or rule2)')
+
+ @mock.patch.object(policy, '_parse_check', FakeCheck)
+ def test_multi_multi(self):
+ result = policy._parse_list_rule([['rule1', 'rule2'],
+ ['rule3', 'rule4']])
+
+ self.assertTrue(isinstance(result, policy.OrCheck))
+ self.assertEqual(len(result.rules), 2)
+ for i, values in enumerate([['rule1', 'rule2'], ['rule3', 'rule4']]):
+ self.assertTrue(isinstance(result.rules[i], policy.AndCheck))
+ self.assertEqual(len(result.rules[i].rules), 2)
+ for j, value in enumerate(values):
+ self.assertTrue(isinstance(result.rules[i].rules[j],
+ FakeCheck))
+ self.assertEqual(result.rules[i].rules[j].result, value)
+ self.assertEqual(str(result),
+ '((rule1 and rule2) or (rule3 and rule4))')
- self.assertEqual(result, False)
- self.assertEqual(self.check_called, False)
- def test_rule_no_rules_non_default(self):
- brain = self.stub__check_rule(default_rule="spam")
- result = policy._check_rule(brain, "rule", "python", "target", "creds")
+class CheckRegisterTestCase(unittest.TestCase):
+ @mock.patch.object(policy, '_functions', {})
+ def test_register_func(self):
+ def test_func():
+ pass
- self.assertEqual(self.check_called, True)
- self.assertEqual(result, ("rule:spam",))
- self.assertEqual(self.target, "target")
- self.assertEqual(self.cred, "creds")
+ policy.register('spam', test_func)
- def test_rule_with_rules(self):
- brain = self.stub__check_rule(rules=dict(spam=["hiho:ni"]))
- result = policy._check_rule(brain, "rule", "spam", "target", "creds")
+ self.assertEqual(policy._functions, dict(spam=test_func))
- self.assertEqual(self.check_called, True)
- self.assertEqual(result, ["hiho:ni"])
- self.assertEqual(self.target, "target")
- self.assertEqual(self.cred, "creds")
+ @mock.patch.object(policy, '_functions', {})
+ def test_register_func_decorator(self):
+ @policy.register('spam')
+ def test_func():
+ pass
- def test_role_no_match(self):
- result = policy._check_role(None, "role", "SpAm", {},
- dict(roles=["a", "b", "c"]))
+ self.assertEqual(policy._functions, dict(spam=test_func))
- self.assertEqual(result, False)
+ @mock.patch.object(policy, '_checks', {})
+ def test_register_check(self):
+ class TestCheck(policy.Check):
+ pass
- def test_role_with_match(self):
- result = policy._check_role(None, "role", "SpAm", {},
- dict(roles=["a", "b", "sPaM"]))
+ policy.register('spam', TestCheck)
- self.assertEqual(result, True)
+ self.assertEqual(policy._checks, dict(spam=TestCheck))
- def test_generic_boolean(self):
- result = policy._check_generic(None, "is_admin", "True",
- {},
- dict(is_admin=True))
+ @mock.patch.object(policy, '_checks', {})
+ def test_register_check_decorator(self):
+ @policy.register('spam')
+ class TestCheck(policy.Check):
+ pass
- self.assertEqual(result, True)
+ self.assertEqual(policy._checks, dict(spam=TestCheck))
- def test_generic_no_key(self):
- result = policy._check_generic(None, "tenant", "%(tenant_id)s",
- dict(tenant_id="spam"),
- {})
- self.assertEqual(result, False)
+class RuleCheckTestCase(unittest.TestCase):
+ @mock.patch.object(policy, '_rules', {})
+ def test_rule_missing(self):
+ check = policy.RuleCheck('rule', 'spam')
- def test_generic_with_key_mismatch(self):
- result = policy._check_generic(None, "tenant", "%(tenant_id)s",
- dict(tenant_id="spam"),
- dict(tenant="nospam"))
+ self.assertEqual(check('target', 'creds'), False)
- self.assertEqual(result, False)
+ @mock.patch.object(policy, '_rules',
+ dict(spam=mock.Mock(return_value=False)))
+ def test_rule_false(self):
+ check = policy.RuleCheck('rule', 'spam')
+
+ self.assertEqual(check('target', 'creds'), False)
+ policy._rules['spam'].assert_called_once_with('target', 'creds')
+
+ @mock.patch.object(policy, '_rules',
+ dict(spam=mock.Mock(return_value=True)))
+ def test_rule_true(self):
+ check = policy.RuleCheck('rule', 'spam')
+
+ self.assertEqual(check('target', 'creds'), True)
+ policy._rules['spam'].assert_called_once_with('target', 'creds')
+
+
+class RoleCheckTestCase(unittest.TestCase):
+ def test_accept(self):
+ check = policy.RoleCheck('role', 'sPaM')
- def test_generic_with_key_match(self):
- result = policy._check_generic(None, "tenant", "%(tenant_id)s",
- dict(tenant_id="spam"),
- dict(tenant="spam"))
+ self.assertEqual(check('target', dict(roles=['SpAm'])), True)
- self.assertEqual(result, True)
+ def test_reject(self):
+ check = policy.RoleCheck('role', 'spam')
- def decode_post_data(self):
+ self.assertEqual(check('target', dict(roles=[])), False)
+
+
+class HttpCheckTestCase(unittest.TestCase):
+ def decode_post_data(self, post_data):
result = {}
- for item in self.post_data.split('&'):
+ for item in post_data.split('&'):
key, _sep, value = item.partition('=')
result[key] = jsonutils.loads(urllib.unquote_plus(value))
return result
- def test_http_false(self):
- result = policy._check_http(None, "http",
- "//spam.example.org/%(tenant)s",
- dict(tenant="spam"),
- dict(roles=["a", "b", "c"]))
+ @mock.patch.object(urllib2, 'urlopen',
+ return_value=StringIO.StringIO('True'))
+ def test_accept(self, mock_urlopen):
+ check = policy.HttpCheck('http', '//example.com/%(name)s')
- self.assertEqual(result, False)
- self.assertEqual(self.url, "http://spam.example.org/spam")
- self.assertEqual(self.decode_post_data(), dict(
- target=dict(tenant="spam"),
- credentials=dict(roles=["a", "b", "c"])))
-
- def test_http_true(self):
- self.urlopen_result = "True"
- result = policy._check_http(None, "http",
- "//spam.example.org/%(tenant)s",
- dict(tenant="spam"),
- dict(roles=["a", "b", "c"]))
-
- self.assertEqual(result, True)
- self.assertEqual(self.url, "http://spam.example.org/spam")
- self.assertEqual(self.decode_post_data(), dict(
- target=dict(tenant="spam"),
- credentials=dict(roles=["a", "b", "c"])))
+ self.assertEqual(check(dict(name='target', spam='spammer'),
+ dict(user='user', roles=['a', 'b', 'c'])),
+ True)
+ self.assertEqual(mock_urlopen.call_count, 1)
+
+ args = mock_urlopen.call_args[0]
+
+ self.assertEqual(args[0], 'http://example.com/target')
+ self.assertEqual(self.decode_post_data(args[1]), dict(
+ target=dict(name='target', spam='spammer'),
+ credentials=dict(user='user', roles=['a', 'b', 'c']),
+ ))
+
+ @mock.patch.object(urllib2, 'urlopen',
+ return_value=StringIO.StringIO('other'))
+ def test_reject(self, mock_urlopen):
+ check = policy.HttpCheck('http', '//example.com/%(name)s')
+
+ self.assertEqual(check(dict(name='target', spam='spammer'),
+ dict(user='user', roles=['a', 'b', 'c'])),
+ False)
+ self.assertEqual(mock_urlopen.call_count, 1)
+
+ args = mock_urlopen.call_args[0]
+
+ self.assertEqual(args[0], 'http://example.com/target')
+ self.assertEqual(self.decode_post_data(args[1]), dict(
+ target=dict(name='target', spam='spammer'),
+ credentials=dict(user='user', roles=['a', 'b', 'c']),
+ ))
+
+
+class GenericCheckTestCase(unittest.TestCase):
+ def test_no_cred(self):
+ check = policy.GenericCheck('name', '%(name)s')
+
+ self.assertEqual(check(dict(name='spam'), {}), False)
+
+ def test_cred_mismatch(self):
+ check = policy.GenericCheck('name', '%(name)s')
+
+ self.assertEqual(check(dict(name='spam'), dict(name='ham')), False)
+
+ def test_accept(self):
+ check = policy.GenericCheck('name', '%(name)s')
+
+ self.assertEqual(check(dict(name='spam'), dict(name='spam')), True)