diff options
author | Kevin L. Mitchell <kevin.mitchell@rackspace.com> | 2012-10-09 13:23:26 +0100 |
---|---|---|
committer | Mark McLoughlin <markmc@redhat.com> | 2012-10-09 21:16:03 +0100 |
commit | 451dd68b2d9194f4612074412acedfad800e799c (patch) | |
tree | ccba7e8b07f574fe8a6ebe9ec1168e2a4b198914 /tests | |
parent | 355ca27471218012a7604b751b1ffbbd2d001f05 (diff) | |
download | oslo-451dd68b2d9194f4612074412acedfad800e799c.tar.gz oslo-451dd68b2d9194f4612074412acedfad800e799c.tar.xz oslo-451dd68b2d9194f4612074412acedfad800e799c.zip |
Rewrite the policy engine from scratch
Implements blueprint fine-grained-policy
Complete rewrite of the policy engine, with careful thought given to
backwards-compatibility.
Policy rules are now represented internally by a tree of Check
objects.
A new API is added for parsing and enforcing rules:
set_rules(Rules.load_json(data, default_rule=...))
result = check(rule, target, credentials)
reset()
The old Brain/enforce API is deprecated and will be removed soon.
A new API is also added for registering new check types:
@register("key")
class KeyCheck(Check):
def __call__(self, target, creds):
return self.match in creds
Support for using functions as check types is deprecated and will
also be removed soon.
Change-Id: I2951a0de3751bd2ec868e7a661070fed624e4af2
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unit/test_policy.py | 757 |
1 files changed, 452 insertions, 305 deletions
diff --git a/tests/unit/test_policy.py b/tests/unit/test_policy.py index 5d77a6f..7ef9c85 100644 --- a/tests/unit/test_policy.py +++ b/tests/unit/test_policy.py @@ -15,7 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. -"""Test of Policy Engine For Nova""" +"""Test of Policy Engine""" import os.path import StringIO @@ -29,116 +29,180 @@ from openstack.common import jsonutils from openstack.common import policy +class TestException(Exception): + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + +class RulesTestCase(unittest.TestCase): + def test_init_basic(self): + rules = policy.Rules() + + self.assertEqual(rules, {}) + self.assertEqual(rules.default_rule, None) + + def test_init(self): + rules = policy.Rules(dict(a=1, b=2, c=3), 'a') + + self.assertEqual(rules, dict(a=1, b=2, c=3)) + self.assertEqual(rules.default_rule, 'a') + + def test_no_default(self): + rules = policy.Rules(dict(a=1, b=2, c=3)) + + self.assertRaises(KeyError, lambda: rules['d']) + + def test_missing_default(self): + rules = policy.Rules(dict(a=1, c=3), 'b') + + self.assertRaises(KeyError, lambda: rules['d']) + + def test_with_default(self): + rules = policy.Rules(dict(a=1, b=2, c=3), 'b') + + self.assertEqual(rules['d'], 2) + + def test_retrieval(self): + rules = policy.Rules(dict(a=1, b=2, c=3), 'b') + + self.assertEqual(rules['a'], 1) + self.assertEqual(rules['b'], 2) + self.assertEqual(rules['c'], 3) + + @mock.patch.object(policy, 'parse_rule', lambda x: x) + def test_load_json(self): + exemplar = """{ + "admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]], + "default": [] +}""" + rules = policy.Rules.load_json(exemplar, 'default') + + self.assertEqual(rules.default_rule, 'default') + self.assertEqual(rules, dict( + admin_or_owner=[["role:admin"], ["project_id:%(project_id)s"]], + default=[], + )) + + def test_str_true(self): + exemplar = """{ + "admin_or_owner": "" +}""" + rules = policy.Rules(dict( + admin_or_owner=policy.TrueCheck(), + )) + + self.assertEqual(str(rules), exemplar) + + class PolicySetAndResetTestCase(unittest.TestCase): def tearDown(self): - # Make sure the policy brain is reset for remaining tests - policy._BRAIN = None + # Make sure the policy rules are reset for remaining tests + policy._rules = None + + def test_set_rules(self): + # Make sure the rules are set properly + policy._rules = None + policy.set_rules('spam') + self.assertEqual(policy._rules, 'spam') def test_set_brain(self): # Make sure the brain is set properly - policy._BRAIN = None + policy._rules = None policy.set_brain('spam') - self.assertEqual(policy._BRAIN, 'spam') + self.assertEqual(policy._rules, 'spam') def test_reset(self): - # Make sure the brain is set to something - policy._BRAIN = 'spam' + # Make sure the rules are reset + policy._rules = 'spam' policy.reset() - self.assertEqual(policy._BRAIN, None) + self.assertEqual(policy._rules, None) -class FakeBrain(object): - check_result = True +class FakeCheck(policy.BaseCheck): + def __init__(self, result=None): + self.result = result - def check(self, match_list, target_dict, credentials_dict): - self.match_list = match_list - self.target_dict = target_dict - self.credentials_dict = credentials_dict - return self.check_result + def __str__(self): + return self.result + def __call__(self, target, creds): + if self.result is not None: + return self.result + return (target, creds) -class PolicyEnforceTestCase(unittest.TestCase): - def setUp(self): - self.fake_brain = FakeBrain() - policy._BRAIN = self.fake_brain +class CheckFunctionTestCase(unittest.TestCase): def tearDown(self): - policy.reset() + # Make sure the policy rules are reset for remaining tests + policy._rules = None - def check_args(self, match_list, target_dict, credentials_dict): - self.assertEqual(self.fake_brain.match_list, match_list) - self.assertEqual(self.fake_brain.target_dict, target_dict) - self.assertEqual(self.fake_brain.credentials_dict, credentials_dict) + def test_check_explicit(self): + policy._rules = None + rule = FakeCheck() + result = policy.check(rule, "target", "creds") - def test_make_new_brain(self): - with mock.patch.object(policy, "Brain") as fake_brain: - fake_brain.check.return_value = True + self.assertEqual(result, ("target", "creds")) + self.assertEqual(policy._rules, None) - result = policy.enforce("match", "target", "credentials") + def test_check_no_rules(self): + policy._rules = None + result = policy.check('rule', "target", "creds") - self.assertNotEqual(policy._BRAIN, None) - self.assertEqual(result, True) + self.assertEqual(result, False) + self.assertEqual(policy._rules, None) - def test_use_existing_brain(self): - result = policy.enforce("match", "target", "credentials") + def test_check_missing_rule(self): + policy._rules = {} + result = policy.check('rule', 'target', 'creds') - self.assertNotEqual(policy._BRAIN, None) - self.assertEqual(result, True) - self.check_args("match", "target", "credentials") + self.assertEqual(result, False) - def test_fail_no_exc(self): - self.fake_brain.check_result = False - result = policy.enforce("match", "target", "credentials") + def test_check_with_rule(self): + policy._rules = dict(default=FakeCheck()) + result = policy.check("default", "target", "creds") - self.assertEqual(result, False) - self.check_args("match", "target", "credentials") + self.assertEqual(result, ("target", "creds")) - def test_fail_with_exc(self): - class TestException(Exception): - def __init__(self, *args, **kwargs): - self.args = args - self.kwargs = kwargs + def test_check_raises(self): + policy._rules = None - self.fake_brain.check_result = False try: - result = policy.enforce("match", "target", "credentials", - TestException, "arg1", "arg2", - kw1="kwarg1", kw2="kwarg2") + result = policy.check('rule', 'target', 'creds', TestException, + "arg1", "arg2", kw1="kwarg1", kw2="kwarg2") except TestException as exc: self.assertEqual(exc.args, ("arg1", "arg2")) self.assertEqual(exc.kwargs, dict(kw1="kwarg1", kw2="kwarg2")) else: - self.fail("policy.enforce() failed to raise requested exception") + self.fail("policy.check() failed to raise requested exception") class BrainTestCase(unittest.TestCase): def test_basic_init(self): brain = policy.Brain() + self.assertEqual(brain, {}) self.assertEqual(brain.rules, {}) self.assertEqual(brain.default_rule, None) + @mock.patch.object(policy, 'parse_rule', lambda x: x) def test_init_with_args(self): brain = policy.Brain(rules=dict(a="foo", b="bar"), default_rule="a") + self.assertEqual(brain, dict(a="foo", b="bar")) self.assertEqual(brain.rules, dict(a="foo", b="bar")) self.assertEqual(brain.default_rule, "a") - def test_load_json(self): - exemplar = """{ - "admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]], - "default": [] -}""" - brain = policy.Brain.load_json(exemplar, "default") + @mock.patch.object(policy, 'parse_rule', lambda x: x) + def test_init_with_checks(self): + exemplar = dict(a=FakeCheck("a"), b=FakeCheck("b")) + brain = policy.Brain(rules=exemplar, default_rule="a") - self.assertEqual( - brain.rules, dict( - admin_or_owner=[["role:admin"], ["project_id:%(project_id)s"]], - default=[], - ) - ) - self.assertEqual(brain.default_rule, "default") + self.assertEqual(brain, exemplar) + self.assertEqual(brain.rules, exemplar) + self.assertEqual(brain.default_rule, "a") + @mock.patch.object(policy, 'parse_rule', lambda x: x) def test_add_rule(self): brain = policy.Brain() brain.add_rule("rule1", @@ -148,321 +212,404 @@ class BrainTestCase(unittest.TestCase): brain.rules, dict( rule1=[["role:admin"], ["project_id:%(project_id)s"]])) - def test_check_with_badmatch(self): + @mock.patch.object(policy, 'parse_rule', lambda x: x) + def test_check(self): brain = policy.Brain() - result = brain._check("badmatch", "target", "credentials") + result = brain.check(FakeCheck(), 'target', 'creds') - self.assertEqual(result, False) + self.assertEqual(result, ('target', 'creds')) - def test_check_with_specific(self): - self.spam_called = False - def check_spam(brain, kind, match, target_dict, cred_dict): - self.assertEqual(kind, "spam") - self.assertEqual(match, "check") - self.assertEqual(target_dict, "target") - self.assertEqual(cred_dict, "credentials") - self.spam_called = True +class PolicyEnforceTestCase(unittest.TestCase): + @mock.patch.object(policy, 'parse_rule', lambda x: 'spam') + def test_enforce(self): + with mock.patch.object(policy, 'check') as mock_check: + mock_check.return_value = "result" - class TestBrain(policy.Brain): - _checks = dict(spam=check_spam) + result = policy.enforce("fake_check", 'target', 'creds', + 'exception', 'arg1', 'arg2', + kw1='kwarg1', kw2='kwarg2') - brain = TestBrain() - result = brain._check("spam:check", "target", "credentials") + self.assertEqual(result, 'result') + mock_check.assert_called_once_with('spam', 'target', 'creds', + 'exception', 'arg1', 'arg2', + kw1='kwarg1', kw2='kwarg2') - self.assertEqual(self.spam_called, True) - def test_check_with_generic(self): - self.generic_called = False +class FalseCheckTestCase(unittest.TestCase): + def test_str(self): + check = policy.FalseCheck() - def check_generic(brain, kind, match, target_dict, cred_dict): - self.assertEqual(kind, "spam") - self.assertEqual(match, "check") - self.assertEqual(target_dict, "target") - self.assertEqual(cred_dict, "credentials") - self.generic_called = True + self.assertEqual(str(check), '!') - class TestBrain(policy.Brain): - _checks = {None: check_generic} + def test_call(self): + check = policy.FalseCheck() - brain = TestBrain() - result = brain._check("spam:check", "target", "credentials") + self.assertEqual(check('target', 'creds'), False) - self.assertEqual(self.generic_called, True) - def test_check_with_inheritance(self): - self.inherited_called = False +class TrueCheckTestCase(unittest.TestCase): + def test_str(self): + check = policy.TrueCheck() - class TestBrain(policy.Brain): - _checks = {} + self.assertEqual(str(check), '@') - def _check_inherited(inst, match, target_dict, cred_dict): - self.assertEqual(match, "check") - self.assertEqual(target_dict, "target") - self.assertEqual(cred_dict, "credentials") - self.inherited_called = True + def test_call(self): + check = policy.TrueCheck() - brain = TestBrain() - result = brain._check("inherited:check", "target", "credentials") + self.assertEqual(check('target', 'creds'), True) - self.assertEqual(self.inherited_called, True) - def test_check_no_handler(self): - class TestBrain(policy.Brain): - _checks = {} +class CheckForTest(policy.Check): + def __call__(self, target, creds): + pass - brain = TestBrain() - self.assertEqual(brain._check('spam:mer', 'target', 'cred'), False) - def test_check_empty(self): - class TestBrain(policy.Brain): - def _check(inst, match, target_dict, cred_dict): - self.fail("_check() called for empty match list!") +class CheckTestCase(unittest.TestCase): + def test_init(self): + check = CheckForTest('kind', 'match') - brain = TestBrain() - result = brain.check([], "target", "credentials") + self.assertEqual(check.kind, 'kind') + self.assertEqual(check.match, 'match') - self.assertEqual(result, True) + def test_str(self): + check = CheckForTest('kind', 'match') - def stub__check(self): - self._check_called = 0 - self.matches = [] - self.targets = [] - self.creds = [] + self.assertEqual(str(check), 'kind:match') - class TestBrain(policy.Brain): - def _check(inst, match, target_dict, cred_dict): - self._check_called += 1 - self.matches.append(match) - self.targets.append(target_dict) - self.creds.append(cred_dict) - return match == "True" - return TestBrain() +class FuncCheckTestCase(unittest.TestCase): + def test_init(self): + check = policy.FuncCheck('func', 'kind', 'match') - def test_check_basic_true(self): - brain = self.stub__check() - result = brain.check(["True"], "target", "creds") + self.assertEqual(check.func, 'func') + self.assertEqual(check.kind, 'kind') + self.assertEqual(check.match, 'match') - self.assertEqual(result, True) - self.assertEqual(self._check_called, 1) - self.assertEqual(self.matches, ["True"]) - self.assertEqual(self.targets, ["target"]) - self.assertEqual(self.creds, ["creds"]) + def test_str(self): + check = policy.FuncCheck('func', 'kind', 'match') - def test_check_basic_false(self): - brain = self.stub__check() - result = brain.check(["False"], "target", "creds") + self.assertEqual(str(check), 'kind:match') - self.assertEqual(result, False) - self.assertEqual(self._check_called, 1) - self.assertEqual(self.matches, ["False"]) - self.assertEqual(self.targets, ["target"]) - self.assertEqual(self.creds, ["creds"]) - - def test_check_or_true(self): - brain = self.stub__check() - result = brain.check([["False"], ["True"], ["False"]], - "target", "creds") - - self.assertEqual(result, True) - self.assertEqual(self._check_called, 2) - self.assertEqual(self.matches, ["False", "True"]) - self.assertEqual(self.targets, ["target", "target"]) - self.assertEqual(self.creds, ["creds", "creds"]) - - def test_check_or_false(self): - brain = self.stub__check() - result = brain.check([["False"], ["False"], ["False"]], - "target", "creds") + def test_call(self): + check = policy.FuncCheck(lambda a, b, c, d, e: (a, b, c, d, e), + 'kind', 'match') - self.assertEqual(result, False) - self.assertEqual(self._check_called, 3) - self.assertEqual(self.matches, ["False", "False", "False"]) - self.assertEqual(self.targets, ["target", "target", "target"]) - self.assertEqual(self.creds, ["creds", "creds", "creds"]) - - def test_check_and_true(self): - brain = self.stub__check() - result = brain.check([["True", "True", "True"]], - "target", "creds") - - self.assertEqual(result, True) - self.assertEqual(self._check_called, 3) - self.assertEqual(self.matches, ["True", "True", "True"]) - self.assertEqual(self.targets, ["target", "target", "target"]) - self.assertEqual(self.creds, ["creds", "creds", "creds"]) - - def test_check_and_false(self): - brain = self.stub__check() - result = brain.check([["True", "True", "False"]], - "target", "creds") + self.assertEqual(check('target', 'creds'), + (None, 'kind', 'match', 'target', 'creds')) - self.assertEqual(result, False) - self.assertEqual(self._check_called, 3) - self.assertEqual(self.matches, ["True", "True", "False"]) - self.assertEqual(self.targets, ["target", "target", "target"]) - self.assertEqual(self.creds, ["creds", "creds", "creds"]) +class OrCheckTestCase(unittest.TestCase): + def test_init(self): + check = policy.OrCheck(['rule1', 'rule2']) -class CheckRegisterTestCase(unittest.TestCase): - def setUp(self): - self.brain_checks = policy.Brain._checks - policy.Brain._checks = {} + self.assertEqual(check.rules, ['rule1', 'rule2']) - def tearDown(self): - policy.Brain._checks = self.brain_checks + def test_add_check(self): + check = policy.OrCheck(['rule1', 'rule2']) + check.add_check('rule3') - def test_class_register(self): - policy.Brain._register('spam', 'func') - policy.Brain._register('spammer', 'funcer') + self.assertEqual(check.rules, ['rule1', 'rule2', 'rule3']) - self.assertEqual(policy.Brain._checks, - dict(spam='func', spammer='funcer')) + def test_str(self): + check = policy.OrCheck(['rule1', 'rule2']) - def test_register_func(self): - policy.register('spam', 'func') + self.assertEqual(str(check), '(rule1 or rule2)') - self.assertEqual(policy.Brain._checks, - dict(spam='func')) + def test_call_all_false(self): + rules = [mock.Mock(return_value=False), mock.Mock(return_value=False)] + check = policy.OrCheck(rules) - def test_register_decorator(self): - @policy.register('spam') - def test_func(): - pass + self.assertEqual(check('target', 'cred'), False) + rules[0].assert_called_once_with('target', 'cred') + rules[1].assert_called_once_with('target', 'cred') - self.assertEqual(policy.Brain._checks, - dict(spam=test_func)) + def test_call_first_true(self): + rules = [mock.Mock(return_value=True), mock.Mock(return_value=False)] + check = policy.OrCheck(rules) + self.assertEqual(check('target', 'cred'), True) + rules[0].assert_called_once_with('target', 'cred') + self.assertFalse(rules[1].called) -class CheckTestCase(unittest.TestCase): - def setUp(self): - self.urlopen_result = "" + def test_call_second_true(self): + rules = [mock.Mock(return_value=False), mock.Mock(return_value=True)] + check = policy.OrCheck(rules) - def fake_urlopen(url, post_data): - self.url = url - self.post_data = post_data - return StringIO.StringIO(self.urlopen_result) + self.assertEqual(check('target', 'cred'), True) + rules[0].assert_called_once_with('target', 'cred') + rules[1].assert_called_once_with('target', 'cred') - self.patcher = mock.patch.object(urllib2, "urlopen", fake_urlopen) - self.patcher.start() - def tearDown(self): - self.patcher.stop() +class ParseCheckTestCase(unittest.TestCase): + def test_bad_rule(self): + result = policy._parse_check('foobar') - def stub__check_rule(self, rules=None, default_rule=None): - self.check_called = False + self.assertTrue(isinstance(result, policy.FalseCheck)) - class TestBrain(policy.Brain): - def check(inst, matchs, target_dict, cred_dict): - self.check_called = True - self.target = target_dict - self.cred = cred_dict - return matchs + @mock.patch.object(policy, '_checks', {}) + @mock.patch.object(policy, '_functions', {}) + def test_no_handler(self): + result = policy._parse_check('no:handler') - return TestBrain(rules=rules, default_rule=default_rule) + self.assertTrue(isinstance(result, policy.FalseCheck)) - def test_rule_no_rules_no_default(self): - brain = self.stub__check_rule() - result = policy._check_rule(brain, "rule", "spam", "target", "creds") + @mock.patch.object(policy, '_checks', { + 'spam': mock.Mock(return_value="spam_check"), + None: mock.Mock(return_value="none_check"), + }) + @mock.patch.object(policy, '_functions', { + 'spam': "spam_func", + None: "none_func", + }) + def test_check(self): + result = policy._parse_check('spam:handler') - self.assertEqual(result, False) - self.assertEqual(self.check_called, False) + self.assertEqual(result, 'spam_check') + policy._checks['spam'].assert_called_once_with('spam', 'handler') + self.assertFalse(policy._checks[None].called) - def test_rule_no_rules_default(self): - brain = self.stub__check_rule(default_rule="spam") - result = policy._check_rule(brain, "rule", "spam", "target", "creds") + @mock.patch.object(policy, '_checks', { + None: mock.Mock(return_value="none_check"), + }) + @mock.patch.object(policy, '_functions', { + 'spam': "spam_func", + None: "none_func", + }) + def test_func(self): + result = policy._parse_check('spam:handler') + + self.assertTrue(isinstance(result, policy.FuncCheck)) + self.assertEqual(result.func, 'spam_func') + self.assertEqual(result.kind, 'spam') + self.assertEqual(result.match, 'handler') + self.assertFalse(policy._checks[None].called) + + @mock.patch.object(policy, '_checks', { + None: mock.Mock(return_value="none_check"), + }) + @mock.patch.object(policy, '_functions', { + None: "none_func", + }) + def test_check_default(self): + result = policy._parse_check('spam:handler') + + self.assertEqual(result, 'none_check') + policy._checks[None].assert_called_once_with('spam', 'handler') + + @mock.patch.object(policy, '_checks', {}) + @mock.patch.object(policy, '_functions', { + None: "none_func", + }) + def test_func_default(self): + result = policy._parse_check('spam:handler') + + self.assertTrue(isinstance(result, policy.FuncCheck)) + self.assertEqual(result.func, 'none_func') + self.assertEqual(result.kind, 'spam') + self.assertEqual(result.match, 'handler') + + +class ParseListRuleTestCase(unittest.TestCase): + def test_empty(self): + result = policy._parse_list_rule([]) + + self.assertTrue(isinstance(result, policy.TrueCheck)) + self.assertEqual(str(result), '@') + + @mock.patch.object(policy, '_parse_check', FakeCheck) + def test_oneele_zeroele(self): + result = policy._parse_list_rule([[]]) + + self.assertTrue(isinstance(result, policy.FalseCheck)) + self.assertEqual(str(result), '!') + + @mock.patch.object(policy, '_parse_check', FakeCheck) + def test_oneele_bare(self): + result = policy._parse_list_rule(['rule']) + + self.assertTrue(isinstance(result, FakeCheck)) + self.assertEqual(result.result, 'rule') + self.assertEqual(str(result), 'rule') + + @mock.patch.object(policy, '_parse_check', FakeCheck) + def test_oneele_oneele(self): + result = policy._parse_list_rule([['rule']]) + + self.assertTrue(isinstance(result, FakeCheck)) + self.assertEqual(result.result, 'rule') + self.assertEqual(str(result), 'rule') + + @mock.patch.object(policy, '_parse_check', FakeCheck) + def test_oneele_multi(self): + result = policy._parse_list_rule([['rule1', 'rule2']]) + + self.assertTrue(isinstance(result, policy.AndCheck)) + self.assertEqual(len(result.rules), 2) + for i, value in enumerate(['rule1', 'rule2']): + self.assertTrue(isinstance(result.rules[i], FakeCheck)) + self.assertEqual(result.rules[i].result, value) + self.assertEqual(str(result), '(rule1 and rule2)') + + @mock.patch.object(policy, '_parse_check', FakeCheck) + def test_multi_oneele(self): + result = policy._parse_list_rule([['rule1'], ['rule2']]) + + self.assertTrue(isinstance(result, policy.OrCheck)) + self.assertEqual(len(result.rules), 2) + for i, value in enumerate(['rule1', 'rule2']): + self.assertTrue(isinstance(result.rules[i], FakeCheck)) + self.assertEqual(result.rules[i].result, value) + self.assertEqual(str(result), '(rule1 or rule2)') + + @mock.patch.object(policy, '_parse_check', FakeCheck) + def test_multi_multi(self): + result = policy._parse_list_rule([['rule1', 'rule2'], + ['rule3', 'rule4']]) + + self.assertTrue(isinstance(result, policy.OrCheck)) + self.assertEqual(len(result.rules), 2) + for i, values in enumerate([['rule1', 'rule2'], ['rule3', 'rule4']]): + self.assertTrue(isinstance(result.rules[i], policy.AndCheck)) + self.assertEqual(len(result.rules[i].rules), 2) + for j, value in enumerate(values): + self.assertTrue(isinstance(result.rules[i].rules[j], + FakeCheck)) + self.assertEqual(result.rules[i].rules[j].result, value) + self.assertEqual(str(result), + '((rule1 and rule2) or (rule3 and rule4))') - self.assertEqual(result, False) - self.assertEqual(self.check_called, False) - def test_rule_no_rules_non_default(self): - brain = self.stub__check_rule(default_rule="spam") - result = policy._check_rule(brain, "rule", "python", "target", "creds") +class CheckRegisterTestCase(unittest.TestCase): + @mock.patch.object(policy, '_functions', {}) + def test_register_func(self): + def test_func(): + pass - self.assertEqual(self.check_called, True) - self.assertEqual(result, ("rule:spam",)) - self.assertEqual(self.target, "target") - self.assertEqual(self.cred, "creds") + policy.register('spam', test_func) - def test_rule_with_rules(self): - brain = self.stub__check_rule(rules=dict(spam=["hiho:ni"])) - result = policy._check_rule(brain, "rule", "spam", "target", "creds") + self.assertEqual(policy._functions, dict(spam=test_func)) - self.assertEqual(self.check_called, True) - self.assertEqual(result, ["hiho:ni"]) - self.assertEqual(self.target, "target") - self.assertEqual(self.cred, "creds") + @mock.patch.object(policy, '_functions', {}) + def test_register_func_decorator(self): + @policy.register('spam') + def test_func(): + pass - def test_role_no_match(self): - result = policy._check_role(None, "role", "SpAm", {}, - dict(roles=["a", "b", "c"])) + self.assertEqual(policy._functions, dict(spam=test_func)) - self.assertEqual(result, False) + @mock.patch.object(policy, '_checks', {}) + def test_register_check(self): + class TestCheck(policy.Check): + pass - def test_role_with_match(self): - result = policy._check_role(None, "role", "SpAm", {}, - dict(roles=["a", "b", "sPaM"])) + policy.register('spam', TestCheck) - self.assertEqual(result, True) + self.assertEqual(policy._checks, dict(spam=TestCheck)) - def test_generic_boolean(self): - result = policy._check_generic(None, "is_admin", "True", - {}, - dict(is_admin=True)) + @mock.patch.object(policy, '_checks', {}) + def test_register_check_decorator(self): + @policy.register('spam') + class TestCheck(policy.Check): + pass - self.assertEqual(result, True) + self.assertEqual(policy._checks, dict(spam=TestCheck)) - def test_generic_no_key(self): - result = policy._check_generic(None, "tenant", "%(tenant_id)s", - dict(tenant_id="spam"), - {}) - self.assertEqual(result, False) +class RuleCheckTestCase(unittest.TestCase): + @mock.patch.object(policy, '_rules', {}) + def test_rule_missing(self): + check = policy.RuleCheck('rule', 'spam') - def test_generic_with_key_mismatch(self): - result = policy._check_generic(None, "tenant", "%(tenant_id)s", - dict(tenant_id="spam"), - dict(tenant="nospam")) + self.assertEqual(check('target', 'creds'), False) - self.assertEqual(result, False) + @mock.patch.object(policy, '_rules', + dict(spam=mock.Mock(return_value=False))) + def test_rule_false(self): + check = policy.RuleCheck('rule', 'spam') + + self.assertEqual(check('target', 'creds'), False) + policy._rules['spam'].assert_called_once_with('target', 'creds') + + @mock.patch.object(policy, '_rules', + dict(spam=mock.Mock(return_value=True))) + def test_rule_true(self): + check = policy.RuleCheck('rule', 'spam') + + self.assertEqual(check('target', 'creds'), True) + policy._rules['spam'].assert_called_once_with('target', 'creds') + + +class RoleCheckTestCase(unittest.TestCase): + def test_accept(self): + check = policy.RoleCheck('role', 'sPaM') - def test_generic_with_key_match(self): - result = policy._check_generic(None, "tenant", "%(tenant_id)s", - dict(tenant_id="spam"), - dict(tenant="spam")) + self.assertEqual(check('target', dict(roles=['SpAm'])), True) - self.assertEqual(result, True) + def test_reject(self): + check = policy.RoleCheck('role', 'spam') - def decode_post_data(self): + self.assertEqual(check('target', dict(roles=[])), False) + + +class HttpCheckTestCase(unittest.TestCase): + def decode_post_data(self, post_data): result = {} - for item in self.post_data.split('&'): + for item in post_data.split('&'): key, _sep, value = item.partition('=') result[key] = jsonutils.loads(urllib.unquote_plus(value)) return result - def test_http_false(self): - result = policy._check_http(None, "http", - "//spam.example.org/%(tenant)s", - dict(tenant="spam"), - dict(roles=["a", "b", "c"])) + @mock.patch.object(urllib2, 'urlopen', + return_value=StringIO.StringIO('True')) + def test_accept(self, mock_urlopen): + check = policy.HttpCheck('http', '//example.com/%(name)s') - self.assertEqual(result, False) - self.assertEqual(self.url, "http://spam.example.org/spam") - self.assertEqual(self.decode_post_data(), dict( - target=dict(tenant="spam"), - credentials=dict(roles=["a", "b", "c"]))) - - def test_http_true(self): - self.urlopen_result = "True" - result = policy._check_http(None, "http", - "//spam.example.org/%(tenant)s", - dict(tenant="spam"), - dict(roles=["a", "b", "c"])) - - self.assertEqual(result, True) - self.assertEqual(self.url, "http://spam.example.org/spam") - self.assertEqual(self.decode_post_data(), dict( - target=dict(tenant="spam"), - credentials=dict(roles=["a", "b", "c"]))) + self.assertEqual(check(dict(name='target', spam='spammer'), + dict(user='user', roles=['a', 'b', 'c'])), + True) + self.assertEqual(mock_urlopen.call_count, 1) + + args = mock_urlopen.call_args[0] + + self.assertEqual(args[0], 'http://example.com/target') + self.assertEqual(self.decode_post_data(args[1]), dict( + target=dict(name='target', spam='spammer'), + credentials=dict(user='user', roles=['a', 'b', 'c']), + )) + + @mock.patch.object(urllib2, 'urlopen', + return_value=StringIO.StringIO('other')) + def test_reject(self, mock_urlopen): + check = policy.HttpCheck('http', '//example.com/%(name)s') + + self.assertEqual(check(dict(name='target', spam='spammer'), + dict(user='user', roles=['a', 'b', 'c'])), + False) + self.assertEqual(mock_urlopen.call_count, 1) + + args = mock_urlopen.call_args[0] + + self.assertEqual(args[0], 'http://example.com/target') + self.assertEqual(self.decode_post_data(args[1]), dict( + target=dict(name='target', spam='spammer'), + credentials=dict(user='user', roles=['a', 'b', 'c']), + )) + + +class GenericCheckTestCase(unittest.TestCase): + def test_no_cred(self): + check = policy.GenericCheck('name', '%(name)s') + + self.assertEqual(check(dict(name='spam'), {}), False) + + def test_cred_mismatch(self): + check = policy.GenericCheck('name', '%(name)s') + + self.assertEqual(check(dict(name='spam'), dict(name='ham')), False) + + def test_accept(self): + check = policy.GenericCheck('name', '%(name)s') + + self.assertEqual(check(dict(name='spam'), dict(name='spam')), True) |