summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorFlaper Fesp <flaper87@gmail.com>2013-04-29 16:58:23 +0200
committerFlaper Fesp <flaper87@gmail.com>2013-05-21 17:18:59 +0200
commit1091b4f3bed9b0ad8c23e5db6d4a5cee15fc786c (patch)
treefe47d22af56baf2919a41c09930ffbf9ec75d63a /tests
parent3e0114f570d97c47b1b8eefce0bcd27146098b8d (diff)
downloadoslo-1091b4f3bed9b0ad8c23e5db6d4a5cee15fc786c.tar.gz
oslo-1091b4f3bed9b0ad8c23e5db6d4a5cee15fc786c.tar.xz
oslo-1091b4f3bed9b0ad8c23e5db6d4a5cee15fc786c.zip
Reduce duplicated code related to policies
This patch adds some logic that is currently duplicated throughout Openstack. As part of this de-duplication, the patch also modifies current implementation. Major Changes: * check, set_rules, reset, init are now part of the Enforcer class * check was renamed into enforce * init was renamed into load_rules * It is now possible to load multiple files and have per instance rules instead of global rules. * There's a global instance of the Enforcer class that can be used as main enforcer. from openstack.common import policy ENFORCER = policy.ENFORCER ENFORCER.enforce(rule, target, creds) Minor Changes: * Added do_raise to the enforce method * Enforcer instance is now passed to the Check call. NOTE: If / once this patch gets in, I'll update other projects and port them to the latest version. Change-Id: Ife909bdf3277ef33c2fb1eae16ae261fa6374c63
Diffstat (limited to 'tests')
-rw-r--r--tests/unit/test_fileutils.py48
-rw-r--r--tests/unit/test_policy.py155
-rw-r--r--tests/var/policy.json4
3 files changed, 153 insertions, 54 deletions
diff --git a/tests/unit/test_fileutils.py b/tests/unit/test_fileutils.py
index c0bf0ac..7cd067c 100644
--- a/tests/unit/test_fileutils.py
+++ b/tests/unit/test_fileutils.py
@@ -15,10 +15,13 @@
# License for the specific language governing permissions and limitations
# under the License.
+import __builtin__
import os
import shutil
import tempfile
+import mox
+
from openstack.common import fileutils
from tests import utils
@@ -34,3 +37,48 @@ class EnsureTree(utils.BaseTestCase):
finally:
if os.path.exists(tmpdir):
shutil.rmtree(tmpdir)
+
+
+class TestCachedFile(utils.BaseTestCase):
+
+ def setUp(self):
+ super(TestCachedFile, self).setUp()
+ self.mox = mox.Mox()
+ self.addCleanup(self.mox.UnsetStubs)
+
+ def test_read_cached_file(self):
+ self.mox.StubOutWithMock(os.path, "getmtime")
+ os.path.getmtime(mox.IgnoreArg()).AndReturn(1)
+ self.mox.ReplayAll()
+
+ fileutils._FILE_CACHE = {
+ '/this/is/a/fake': {"data": 1123, "mtime": 1}
+ }
+ fresh, data = fileutils.read_cached_file("/this/is/a/fake")
+ fdata = fileutils._FILE_CACHE['/this/is/a/fake']["data"]
+ self.assertEqual(fdata, data)
+
+ def test_read_modified_cached_file(self):
+ self.mox.StubOutWithMock(os.path, "getmtime")
+ self.mox.StubOutWithMock(__builtin__, 'open')
+ os.path.getmtime(mox.IgnoreArg()).AndReturn(2)
+
+ fake_contents = "lorem ipsum"
+ fake_file = self.mox.CreateMockAnything()
+ fake_file.read().AndReturn(fake_contents)
+ fake_context_manager = self.mox.CreateMockAnything()
+ fake_context_manager.__enter__().AndReturn(fake_file)
+ fake_context_manager.__exit__(mox.IgnoreArg(),
+ mox.IgnoreArg(),
+ mox.IgnoreArg())
+
+ __builtin__.open(mox.IgnoreArg()).AndReturn(fake_context_manager)
+
+ self.mox.ReplayAll()
+ fileutils._FILE_CACHE = {
+ '/this/is/a/fake': {"data": 1123, "mtime": 1}
+ }
+
+ fresh, data = fileutils.read_cached_file("/this/is/a/fake")
+ self.assertEqual(data, fake_contents)
+ self.assertTrue(fresh)
diff --git a/tests/unit/test_policy.py b/tests/unit/test_policy.py
index 7e56796..582021b 100644
--- a/tests/unit/test_policy.py
+++ b/tests/unit/test_policy.py
@@ -17,16 +17,25 @@
"""Test of Policy Engine"""
+import os
import StringIO
import urllib
import mock
import urllib2
+from oslo.config import cfg
from openstack.common import jsonutils
from openstack.common import policy
from tests import utils
+CONF = cfg.CONF
+
+TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ '..', 'var'))
+
+ENFORCER = policy.Enforcer()
+
class TestException(Exception):
def __init__(self, *args, **kwargs):
@@ -35,6 +44,7 @@ class TestException(Exception):
class RulesTestCase(utils.BaseTestCase):
+
def test_init_basic(self):
rules = policy.Rules()
@@ -104,24 +114,50 @@ class RulesTestCase(utils.BaseTestCase):
self.assertEqual(str(rules), exemplar)
-class PolicySetAndResetTestCase(utils.BaseTestCase):
+class PolicyBaseTestCase(utils.BaseTestCase):
def setUp(self):
- super(PolicySetAndResetTestCase, self).setUp()
+ super(PolicyBaseTestCase, self).setUp()
+ CONF(args=['--config-dir', TEST_VAR_DIR])
+ self.enforcer = ENFORCER
+
+ def tearDown(self):
+ super(PolicyBaseTestCase, self).tearDown()
# Make sure the policy rules are reset for remaining tests
- self.addCleanup(setattr, policy, '_rules', None)
+ self.enforcer.clear()
+
+
+class EnforcerTest(PolicyBaseTestCase):
+
+ def test_load_file(self):
+ self.enforcer.load_rules(True)
+ self.assertIsNotNone(self.enforcer.rules)
+ self.assertIn('default', self.enforcer.rules)
+ self.assertIn('admin', self.enforcer.rules)
+
+ def test_reload(self):
+ self.enforcer.set_rules({'test': 'test'})
+ self.enforcer.load_rules(force_reload=True)
+
+ self.assertNotEquals(self.enforcer.rules, {'test': 'test'})
+ self.assertIn('default', self.enforcer.rules)
def test_set_rules(self):
# Make sure the rules are set properly
- policy._rules = None
- policy.set_rules('spam')
- self.assertEqual(policy._rules, 'spam')
+ self.enforcer.rules = None
+ self.enforcer.set_rules({'test': 1})
+ self.assertEqual(self.enforcer.rules, {'test': 1})
+
+ def test_set_rules_type(self):
+ self.assertRaises(TypeError,
+ self.enforcer.set_rules,
+ 'dummy')
- def test_reset(self):
+ def test_clear(self):
# Make sure the rules are reset
- policy._rules = 'spam'
- policy.reset()
- self.assertEqual(policy._rules, None)
+ self.enforcer.rules = 'spam'
+ self.enforcer.clear()
+ self.assertEqual(self.enforcer.rules, {})
class FakeCheck(policy.BaseCheck):
@@ -129,59 +165,58 @@ class FakeCheck(policy.BaseCheck):
self.result = result
def __str__(self):
- return self.result
+ return str(self.result)
- def __call__(self, target, creds):
+ def __call__(self, target, creds, enforcer):
if self.result is not None:
return self.result
- return (target, creds)
-
+ return (target, creds, enforcer)
-class CheckFunctionTestCase(utils.BaseTestCase):
- def setUp(self):
- super(CheckFunctionTestCase, self).setUp()
- # Make sure the policy rules are reset for remaining tests
- self.addCleanup(setattr, policy, '_rules', None)
+class CheckFunctionTestCase(PolicyBaseTestCase):
def test_check_explicit(self):
- policy._rules = None
+ self.enforcer.load_rules()
+ self.enforcer.rules = None
rule = FakeCheck()
- result = policy.check(rule, "target", "creds")
+ result = self.enforcer.enforce(rule, "target", "creds")
- self.assertEqual(result, ("target", "creds"))
- self.assertEqual(policy._rules, None)
+ self.assertEqual(result, ("target", "creds", self.enforcer))
+ self.assertEqual(self.enforcer.rules, None)
def test_check_no_rules(self):
- policy._rules = None
- result = policy.check('rule', "target", "creds")
+ self.enforcer.load_rules()
+ self.enforcer.rules = None
+ result = self.enforcer.enforce('rule', "target", "creds")
self.assertEqual(result, False)
- self.assertEqual(policy._rules, None)
+ self.assertEqual(self.enforcer.rules, None)
def test_check_missing_rule(self):
- policy._rules = {}
- result = policy.check('rule', 'target', 'creds')
+ self.enforcer.rules = {}
+ result = self.enforcer.enforce('rule', 'target', 'creds')
self.assertEqual(result, False)
def test_check_with_rule(self):
- policy._rules = dict(default=FakeCheck())
- result = policy.check("default", "target", "creds")
+ self.enforcer.load_rules()
+ self.enforcer.rules = dict(default=FakeCheck())
+ result = self.enforcer.enforce("default", "target", "creds")
- self.assertEqual(result, ("target", "creds"))
+ self.assertEqual(result, ("target", "creds", self.enforcer))
def test_check_raises(self):
- policy._rules = None
+ self.enforcer.rules = None
try:
- policy.check('rule', 'target', 'creds', TestException,
- "arg1", "arg2", kw1="kwarg1", kw2="kwarg2")
+ self.enforcer.enforce('rule', 'target', 'creds',
+ True, TestException, "arg1",
+ "arg2", kw1="kwarg1", kw2="kwarg2")
except TestException as exc:
self.assertEqual(exc.args, ("arg1", "arg2"))
self.assertEqual(exc.kwargs, dict(kw1="kwarg1", kw2="kwarg2"))
else:
- self.fail("policy.check() failed to raise requested exception")
+ self.fail("enforcer.enforce() failed to raise requested exception")
class FalseCheckTestCase(utils.BaseTestCase):
@@ -209,7 +244,7 @@ class TrueCheckTestCase(utils.BaseTestCase):
class CheckForTest(policy.Check):
- def __call__(self, target, creds):
+ def __call__(self, target, creds, enforcer):
pass
@@ -693,42 +728,48 @@ class CheckRegisterTestCase(utils.BaseTestCase):
class RuleCheckTestCase(utils.BaseTestCase):
- @mock.patch.object(policy, '_rules', {})
+ @mock.patch.object(ENFORCER, 'rules', {})
def test_rule_missing(self):
check = policy.RuleCheck('rule', 'spam')
- self.assertEqual(check('target', 'creds'), False)
+ self.assertEqual(check('target', 'creds', ENFORCER), False)
- @mock.patch.object(policy, '_rules',
+ @mock.patch.object(ENFORCER, 'rules',
dict(spam=mock.Mock(return_value=False)))
def test_rule_false(self):
+ enforcer = ENFORCER
+
check = policy.RuleCheck('rule', 'spam')
- self.assertEqual(check('target', 'creds'), False)
- policy._rules['spam'].assert_called_once_with('target', 'creds')
+ self.assertEqual(check('target', 'creds', enforcer), False)
+ enforcer.rules['spam'].assert_called_once_with('target', 'creds',
+ enforcer)
- @mock.patch.object(policy, '_rules',
+ @mock.patch.object(ENFORCER, 'rules',
dict(spam=mock.Mock(return_value=True)))
def test_rule_true(self):
+ enforcer = ENFORCER
check = policy.RuleCheck('rule', 'spam')
- self.assertEqual(check('target', 'creds'), True)
- policy._rules['spam'].assert_called_once_with('target', 'creds')
+ self.assertEqual(check('target', 'creds', enforcer), True)
+ enforcer.rules['spam'].assert_called_once_with('target', 'creds',
+ enforcer)
-class RoleCheckTestCase(utils.BaseTestCase):
+class RoleCheckTestCase(PolicyBaseTestCase):
def test_accept(self):
check = policy.RoleCheck('role', 'sPaM')
- self.assertEqual(check('target', dict(roles=['SpAm'])), True)
+ self.assertEqual(check('target', dict(roles=['SpAm']),
+ self.enforcer), True)
def test_reject(self):
check = policy.RoleCheck('role', 'spam')
- self.assertEqual(check('target', dict(roles=[])), False)
+ self.assertEqual(check('target', dict(roles=[]), self.enforcer), False)
-class HttpCheckTestCase(utils.BaseTestCase):
+class HttpCheckTestCase(PolicyBaseTestCase):
def decode_post_data(self, post_data):
result = {}
for item in post_data.split('&'):
@@ -743,7 +784,8 @@ class HttpCheckTestCase(utils.BaseTestCase):
check = policy.HttpCheck('http', '//example.com/%(name)s')
self.assertEqual(check(dict(name='target', spam='spammer'),
- dict(user='user', roles=['a', 'b', 'c'])),
+ dict(user='user', roles=['a', 'b', 'c']),
+ self.enforcer),
True)
self.assertEqual(mock_urlopen.call_count, 1)
@@ -761,7 +803,8 @@ class HttpCheckTestCase(utils.BaseTestCase):
check = policy.HttpCheck('http', '//example.com/%(name)s')
self.assertEqual(check(dict(name='target', spam='spammer'),
- dict(user='user', roles=['a', 'b', 'c'])),
+ dict(user='user', roles=['a', 'b', 'c']),
+ self.enforcer),
False)
self.assertEqual(mock_urlopen.call_count, 1)
@@ -774,18 +817,22 @@ class HttpCheckTestCase(utils.BaseTestCase):
))
-class GenericCheckTestCase(utils.BaseTestCase):
+class GenericCheckTestCase(PolicyBaseTestCase):
def test_no_cred(self):
check = policy.GenericCheck('name', '%(name)s')
- self.assertEqual(check(dict(name='spam'), {}), False)
+ self.assertEqual(check(dict(name='spam'), {}, self.enforcer), False)
def test_cred_mismatch(self):
check = policy.GenericCheck('name', '%(name)s')
- self.assertEqual(check(dict(name='spam'), dict(name='ham')), False)
+ self.assertEqual(check(dict(name='spam'),
+ dict(name='ham'),
+ self.enforcer), False)
def test_accept(self):
check = policy.GenericCheck('name', '%(name)s')
- self.assertEqual(check(dict(name='spam'), dict(name='spam')), True)
+ self.assertEqual(check(dict(name='spam'),
+ dict(name='spam'),
+ self.enforcer), True)
diff --git a/tests/var/policy.json b/tests/var/policy.json
new file mode 100644
index 0000000..73730ae
--- /dev/null
+++ b/tests/var/policy.json
@@ -0,0 +1,4 @@
+{
+ "default": "rule:admin",
+ "admin": "is_admin:True"
+}