diff options
author | Flaper Fesp <flaper87@gmail.com> | 2013-04-29 16:58:23 +0200 |
---|---|---|
committer | Flaper Fesp <flaper87@gmail.com> | 2013-05-21 17:18:59 +0200 |
commit | 1091b4f3bed9b0ad8c23e5db6d4a5cee15fc786c (patch) | |
tree | fe47d22af56baf2919a41c09930ffbf9ec75d63a /tests | |
parent | 3e0114f570d97c47b1b8eefce0bcd27146098b8d (diff) | |
download | oslo-1091b4f3bed9b0ad8c23e5db6d4a5cee15fc786c.tar.gz oslo-1091b4f3bed9b0ad8c23e5db6d4a5cee15fc786c.tar.xz oslo-1091b4f3bed9b0ad8c23e5db6d4a5cee15fc786c.zip |
Reduce duplicated code related to policies
This patch adds some logic that is currently duplicated throughout
Openstack. As part of this de-duplication, the patch also modifies
current implementation.
Major Changes:
* check, set_rules, reset, init are now part of the Enforcer class
* check was renamed into enforce
* init was renamed into load_rules
* It is now possible to load multiple files and have per instance
rules instead of global rules.
* There's a global instance of the Enforcer class that can be used as
main enforcer.
from openstack.common import policy
ENFORCER = policy.ENFORCER
ENFORCER.enforce(rule, target, creds)
Minor Changes:
* Added do_raise to the enforce method
* Enforcer instance is now passed to the Check call.
NOTE: If / once this patch gets in, I'll update other projects and port
them to the latest version.
Change-Id: Ife909bdf3277ef33c2fb1eae16ae261fa6374c63
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unit/test_fileutils.py | 48 | ||||
-rw-r--r-- | tests/unit/test_policy.py | 155 | ||||
-rw-r--r-- | tests/var/policy.json | 4 |
3 files changed, 153 insertions, 54 deletions
diff --git a/tests/unit/test_fileutils.py b/tests/unit/test_fileutils.py index c0bf0ac..7cd067c 100644 --- a/tests/unit/test_fileutils.py +++ b/tests/unit/test_fileutils.py @@ -15,10 +15,13 @@ # License for the specific language governing permissions and limitations # under the License. +import __builtin__ import os import shutil import tempfile +import mox + from openstack.common import fileutils from tests import utils @@ -34,3 +37,48 @@ class EnsureTree(utils.BaseTestCase): finally: if os.path.exists(tmpdir): shutil.rmtree(tmpdir) + + +class TestCachedFile(utils.BaseTestCase): + + def setUp(self): + super(TestCachedFile, self).setUp() + self.mox = mox.Mox() + self.addCleanup(self.mox.UnsetStubs) + + def test_read_cached_file(self): + self.mox.StubOutWithMock(os.path, "getmtime") + os.path.getmtime(mox.IgnoreArg()).AndReturn(1) + self.mox.ReplayAll() + + fileutils._FILE_CACHE = { + '/this/is/a/fake': {"data": 1123, "mtime": 1} + } + fresh, data = fileutils.read_cached_file("/this/is/a/fake") + fdata = fileutils._FILE_CACHE['/this/is/a/fake']["data"] + self.assertEqual(fdata, data) + + def test_read_modified_cached_file(self): + self.mox.StubOutWithMock(os.path, "getmtime") + self.mox.StubOutWithMock(__builtin__, 'open') + os.path.getmtime(mox.IgnoreArg()).AndReturn(2) + + fake_contents = "lorem ipsum" + fake_file = self.mox.CreateMockAnything() + fake_file.read().AndReturn(fake_contents) + fake_context_manager = self.mox.CreateMockAnything() + fake_context_manager.__enter__().AndReturn(fake_file) + fake_context_manager.__exit__(mox.IgnoreArg(), + mox.IgnoreArg(), + mox.IgnoreArg()) + + __builtin__.open(mox.IgnoreArg()).AndReturn(fake_context_manager) + + self.mox.ReplayAll() + fileutils._FILE_CACHE = { + '/this/is/a/fake': {"data": 1123, "mtime": 1} + } + + fresh, data = fileutils.read_cached_file("/this/is/a/fake") + self.assertEqual(data, fake_contents) + self.assertTrue(fresh) diff --git a/tests/unit/test_policy.py b/tests/unit/test_policy.py index 7e56796..582021b 100644 --- a/tests/unit/test_policy.py +++ b/tests/unit/test_policy.py @@ -17,16 +17,25 @@ """Test of Policy Engine""" +import os import StringIO import urllib import mock import urllib2 +from oslo.config import cfg from openstack.common import jsonutils from openstack.common import policy from tests import utils +CONF = cfg.CONF + +TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), + '..', 'var')) + +ENFORCER = policy.Enforcer() + class TestException(Exception): def __init__(self, *args, **kwargs): @@ -35,6 +44,7 @@ class TestException(Exception): class RulesTestCase(utils.BaseTestCase): + def test_init_basic(self): rules = policy.Rules() @@ -104,24 +114,50 @@ class RulesTestCase(utils.BaseTestCase): self.assertEqual(str(rules), exemplar) -class PolicySetAndResetTestCase(utils.BaseTestCase): +class PolicyBaseTestCase(utils.BaseTestCase): def setUp(self): - super(PolicySetAndResetTestCase, self).setUp() + super(PolicyBaseTestCase, self).setUp() + CONF(args=['--config-dir', TEST_VAR_DIR]) + self.enforcer = ENFORCER + + def tearDown(self): + super(PolicyBaseTestCase, self).tearDown() # Make sure the policy rules are reset for remaining tests - self.addCleanup(setattr, policy, '_rules', None) + self.enforcer.clear() + + +class EnforcerTest(PolicyBaseTestCase): + + def test_load_file(self): + self.enforcer.load_rules(True) + self.assertIsNotNone(self.enforcer.rules) + self.assertIn('default', self.enforcer.rules) + self.assertIn('admin', self.enforcer.rules) + + def test_reload(self): + self.enforcer.set_rules({'test': 'test'}) + self.enforcer.load_rules(force_reload=True) + + self.assertNotEquals(self.enforcer.rules, {'test': 'test'}) + self.assertIn('default', self.enforcer.rules) def test_set_rules(self): # Make sure the rules are set properly - policy._rules = None - policy.set_rules('spam') - self.assertEqual(policy._rules, 'spam') + self.enforcer.rules = None + self.enforcer.set_rules({'test': 1}) + self.assertEqual(self.enforcer.rules, {'test': 1}) + + def test_set_rules_type(self): + self.assertRaises(TypeError, + self.enforcer.set_rules, + 'dummy') - def test_reset(self): + def test_clear(self): # Make sure the rules are reset - policy._rules = 'spam' - policy.reset() - self.assertEqual(policy._rules, None) + self.enforcer.rules = 'spam' + self.enforcer.clear() + self.assertEqual(self.enforcer.rules, {}) class FakeCheck(policy.BaseCheck): @@ -129,59 +165,58 @@ class FakeCheck(policy.BaseCheck): self.result = result def __str__(self): - return self.result + return str(self.result) - def __call__(self, target, creds): + def __call__(self, target, creds, enforcer): if self.result is not None: return self.result - return (target, creds) - + return (target, creds, enforcer) -class CheckFunctionTestCase(utils.BaseTestCase): - def setUp(self): - super(CheckFunctionTestCase, self).setUp() - # Make sure the policy rules are reset for remaining tests - self.addCleanup(setattr, policy, '_rules', None) +class CheckFunctionTestCase(PolicyBaseTestCase): def test_check_explicit(self): - policy._rules = None + self.enforcer.load_rules() + self.enforcer.rules = None rule = FakeCheck() - result = policy.check(rule, "target", "creds") + result = self.enforcer.enforce(rule, "target", "creds") - self.assertEqual(result, ("target", "creds")) - self.assertEqual(policy._rules, None) + self.assertEqual(result, ("target", "creds", self.enforcer)) + self.assertEqual(self.enforcer.rules, None) def test_check_no_rules(self): - policy._rules = None - result = policy.check('rule', "target", "creds") + self.enforcer.load_rules() + self.enforcer.rules = None + result = self.enforcer.enforce('rule', "target", "creds") self.assertEqual(result, False) - self.assertEqual(policy._rules, None) + self.assertEqual(self.enforcer.rules, None) def test_check_missing_rule(self): - policy._rules = {} - result = policy.check('rule', 'target', 'creds') + self.enforcer.rules = {} + result = self.enforcer.enforce('rule', 'target', 'creds') self.assertEqual(result, False) def test_check_with_rule(self): - policy._rules = dict(default=FakeCheck()) - result = policy.check("default", "target", "creds") + self.enforcer.load_rules() + self.enforcer.rules = dict(default=FakeCheck()) + result = self.enforcer.enforce("default", "target", "creds") - self.assertEqual(result, ("target", "creds")) + self.assertEqual(result, ("target", "creds", self.enforcer)) def test_check_raises(self): - policy._rules = None + self.enforcer.rules = None try: - policy.check('rule', 'target', 'creds', TestException, - "arg1", "arg2", kw1="kwarg1", kw2="kwarg2") + self.enforcer.enforce('rule', 'target', 'creds', + True, TestException, "arg1", + "arg2", kw1="kwarg1", kw2="kwarg2") except TestException as exc: self.assertEqual(exc.args, ("arg1", "arg2")) self.assertEqual(exc.kwargs, dict(kw1="kwarg1", kw2="kwarg2")) else: - self.fail("policy.check() failed to raise requested exception") + self.fail("enforcer.enforce() failed to raise requested exception") class FalseCheckTestCase(utils.BaseTestCase): @@ -209,7 +244,7 @@ class TrueCheckTestCase(utils.BaseTestCase): class CheckForTest(policy.Check): - def __call__(self, target, creds): + def __call__(self, target, creds, enforcer): pass @@ -693,42 +728,48 @@ class CheckRegisterTestCase(utils.BaseTestCase): class RuleCheckTestCase(utils.BaseTestCase): - @mock.patch.object(policy, '_rules', {}) + @mock.patch.object(ENFORCER, 'rules', {}) def test_rule_missing(self): check = policy.RuleCheck('rule', 'spam') - self.assertEqual(check('target', 'creds'), False) + self.assertEqual(check('target', 'creds', ENFORCER), False) - @mock.patch.object(policy, '_rules', + @mock.patch.object(ENFORCER, 'rules', dict(spam=mock.Mock(return_value=False))) def test_rule_false(self): + enforcer = ENFORCER + check = policy.RuleCheck('rule', 'spam') - self.assertEqual(check('target', 'creds'), False) - policy._rules['spam'].assert_called_once_with('target', 'creds') + self.assertEqual(check('target', 'creds', enforcer), False) + enforcer.rules['spam'].assert_called_once_with('target', 'creds', + enforcer) - @mock.patch.object(policy, '_rules', + @mock.patch.object(ENFORCER, 'rules', dict(spam=mock.Mock(return_value=True))) def test_rule_true(self): + enforcer = ENFORCER check = policy.RuleCheck('rule', 'spam') - self.assertEqual(check('target', 'creds'), True) - policy._rules['spam'].assert_called_once_with('target', 'creds') + self.assertEqual(check('target', 'creds', enforcer), True) + enforcer.rules['spam'].assert_called_once_with('target', 'creds', + enforcer) -class RoleCheckTestCase(utils.BaseTestCase): +class RoleCheckTestCase(PolicyBaseTestCase): def test_accept(self): check = policy.RoleCheck('role', 'sPaM') - self.assertEqual(check('target', dict(roles=['SpAm'])), True) + self.assertEqual(check('target', dict(roles=['SpAm']), + self.enforcer), True) def test_reject(self): check = policy.RoleCheck('role', 'spam') - self.assertEqual(check('target', dict(roles=[])), False) + self.assertEqual(check('target', dict(roles=[]), self.enforcer), False) -class HttpCheckTestCase(utils.BaseTestCase): +class HttpCheckTestCase(PolicyBaseTestCase): def decode_post_data(self, post_data): result = {} for item in post_data.split('&'): @@ -743,7 +784,8 @@ class HttpCheckTestCase(utils.BaseTestCase): check = policy.HttpCheck('http', '//example.com/%(name)s') self.assertEqual(check(dict(name='target', spam='spammer'), - dict(user='user', roles=['a', 'b', 'c'])), + dict(user='user', roles=['a', 'b', 'c']), + self.enforcer), True) self.assertEqual(mock_urlopen.call_count, 1) @@ -761,7 +803,8 @@ class HttpCheckTestCase(utils.BaseTestCase): check = policy.HttpCheck('http', '//example.com/%(name)s') self.assertEqual(check(dict(name='target', spam='spammer'), - dict(user='user', roles=['a', 'b', 'c'])), + dict(user='user', roles=['a', 'b', 'c']), + self.enforcer), False) self.assertEqual(mock_urlopen.call_count, 1) @@ -774,18 +817,22 @@ class HttpCheckTestCase(utils.BaseTestCase): )) -class GenericCheckTestCase(utils.BaseTestCase): +class GenericCheckTestCase(PolicyBaseTestCase): def test_no_cred(self): check = policy.GenericCheck('name', '%(name)s') - self.assertEqual(check(dict(name='spam'), {}), False) + self.assertEqual(check(dict(name='spam'), {}, self.enforcer), False) def test_cred_mismatch(self): check = policy.GenericCheck('name', '%(name)s') - self.assertEqual(check(dict(name='spam'), dict(name='ham')), False) + self.assertEqual(check(dict(name='spam'), + dict(name='ham'), + self.enforcer), False) def test_accept(self): check = policy.GenericCheck('name', '%(name)s') - self.assertEqual(check(dict(name='spam'), dict(name='spam')), True) + self.assertEqual(check(dict(name='spam'), + dict(name='spam'), + self.enforcer), True) diff --git a/tests/var/policy.json b/tests/var/policy.json new file mode 100644 index 0000000..73730ae --- /dev/null +++ b/tests/var/policy.json @@ -0,0 +1,4 @@ +{ + "default": "rule:admin", + "admin": "is_admin:True" +} |