summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKevin L. Mitchell <kevin.mitchell@rackspace.com>2012-06-04 10:25:38 -0500
committerKevin L. Mitchell <kevin.mitchell@rackspace.com>2012-06-04 10:28:41 -0500
commitdfe65cc17b197b3728e114761739cb8be1ec59d2 (patch)
tree38288ba6e67ad794225a430c585053b843c746f7
parentffbffc3dcaf20a82b91a0b5fa48b74d6edf07d8e (diff)
downloadoslo-dfe65cc17b197b3728e114761739cb8be1ec59d2.tar.gz
oslo-dfe65cc17b197b3728e114761739cb8be1ec59d2.tar.xz
oslo-dfe65cc17b197b3728e114761739cb8be1ec59d2.zip
Common-ize policies.
Both Glance and Nova make use of (almost) the same policy system, defined in glance/common/policy.py and nova/common/policy.py, respectively. They also have independent glue code (glance/api/policy.py and nova/policy.py), so that the common policy system is substantially similar. This makes policies a perfect candidate for incorporation into openstack-common, particularly given that this same code will soon be used by Quantum. This change adds the common policy module (along with some minor interface changes and bug fixes) to openstack-common, along with a test suite. Change-Id: I0022a91f16ded28f9dc6b4975ef1b6e4cc8460ac
-rw-r--r--openstack/common/policy.py238
-rw-r--r--tests/unit/test_policy.py401
2 files changed, 639 insertions, 0 deletions
diff --git a/openstack/common/policy.py b/openstack/common/policy.py
new file mode 100644
index 0000000..203995a
--- /dev/null
+++ b/openstack/common/policy.py
@@ -0,0 +1,238 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Common Policy Engine Implementation"""
+
+import json
+import logging
+import urllib
+import urllib2
+
+
+LOG = logging.getLogger(__name__)
+
+
+_BRAIN = None
+
+
+def set_brain(brain):
+ """Set the brain used by enforce().
+
+ Defaults use Brain() if not set.
+
+ """
+ global _BRAIN
+ _BRAIN = brain
+
+
+def reset():
+ """Clear the brain used by enforce()."""
+ global _BRAIN
+ _BRAIN = None
+
+
+def enforce(match_list, target_dict, credentials_dict, exc=None,
+ *args, **kwargs):
+ """Enforces authorization of some rules against credentials.
+
+ :param match_list: nested tuples of data to match against
+
+ The basic brain supports three types of match lists:
+
+ 1) rules
+
+ looks like: ``('rule:compute:get_instance',)``
+
+ Retrieves the named rule from the rules dict and recursively
+ checks against the contents of the rule.
+
+ 2) roles
+
+ looks like: ``('role:compute:admin',)``
+
+ Matches if the specified role is in credentials_dict['roles'].
+
+ 3) generic
+
+ looks like: ``('tenant_id:%(tenant_id)s',)``
+
+ Substitutes values from the target dict into the match using
+ the % operator and matches them against the creds dict.
+
+ Combining rules:
+
+ The brain returns True if any of the outer tuple of rules
+ match and also True if all of the inner tuples match. You
+ can use this to perform simple boolean logic. For
+ example, the following rule would return True if the creds
+ contain the role 'admin' OR the if the tenant_id matches
+ the target dict AND the the creds contains the role
+ 'compute_sysadmin':
+
+ ::
+
+ {
+ "rule:combined": (
+ 'role:admin',
+ ('tenant_id:%(tenant_id)s', 'role:compute_sysadmin')
+ )
+ }
+
+ Note that rule and role are reserved words in the credentials match, so
+ you can't match against properties with those names. Custom brains may
+ also add new reserved words. For example, the HttpBrain adds http as a
+ reserved word.
+
+ :param target_dict: dict of object properties
+
+ Target dicts contain as much information as we can about the object being
+ operated on.
+
+ :param credentials_dict: dict of actor properties
+
+ Credentials dicts contain as much information as we can about the user
+ performing the action.
+
+ :param exc: exception to raise
+
+ Class of the exception to raise if the check fails. Any remaining
+ arguments passed to enforce() (both positional and keyword arguments)
+ will be passed to the exception class. If exc is not provided, returns
+ False.
+
+ :return: True if the policy allows the action
+ :return: False if the policy does not allow the action and exc is not set
+ """
+ global _BRAIN
+ if not _BRAIN:
+ _BRAIN = Brain()
+ if not _BRAIN.check(match_list, target_dict, credentials_dict):
+ if exc:
+ raise exc(*args, **kwargs)
+ return False
+ return True
+
+
+class Brain(object):
+ """Implements policy checking."""
+ @classmethod
+ def load_json(cls, data, default_rule=None):
+ """Init a brain using json instead of a rules dictionary."""
+ rules_dict = json.loads(data)
+ return cls(rules=rules_dict, default_rule=default_rule)
+
+ def __init__(self, rules=None, default_rule=None):
+ self.rules = rules or {}
+ self.default_rule = default_rule
+
+ def add_rule(self, key, match):
+ self.rules[key] = match
+
+ def _check(self, match, target_dict, cred_dict):
+ try:
+ match_kind, match_value = match.split(':', 1)
+ except Exception:
+ LOG.exception(_("Failed to understand rule %(match)r") % locals())
+ # If the rule is invalid, fail closed
+ return False
+ try:
+ f = getattr(self, '_check_%s' % match_kind)
+ except AttributeError:
+ if not self._check_generic(match, target_dict, cred_dict):
+ return False
+ else:
+ if not f(match_value, target_dict, cred_dict):
+ return False
+ return True
+
+ def check(self, match_list, target_dict, cred_dict):
+ """Checks authorization of some rules against credentials.
+
+ Detailed description of the check with examples in policy.enforce().
+
+ :param match_list: nested tuples of data to match against
+ :param target_dict: dict of object properties
+ :param credentials_dict: dict of actor properties
+
+ :returns: True if the check passes
+
+ """
+ if not match_list:
+ return True
+ for and_list in match_list:
+ if isinstance(and_list, basestring):
+ and_list = (and_list,)
+ if all([self._check(item, target_dict, cred_dict)
+ for item in and_list]):
+ return True
+ return False
+
+ def _check_rule(self, match, target_dict, cred_dict):
+ """Recursively checks credentials based on the brains rules."""
+ try:
+ new_match_list = self.rules[match]
+ except KeyError:
+ if self.default_rule and match != self.default_rule:
+ new_match_list = ('rule:%s' % self.default_rule,)
+ else:
+ return False
+
+ return self.check(new_match_list, target_dict, cred_dict)
+
+ def _check_role(self, match, target_dict, cred_dict):
+ """Check that there is a matching role in the cred dict."""
+ return match.lower() in [x.lower() for x in cred_dict['roles']]
+
+ def _check_generic(self, match, target_dict, cred_dict):
+ """Check an individual match.
+
+ Matches look like:
+
+ tenant:%(tenant_id)s
+ role:compute:admin
+
+ """
+
+ # TODO(termie): do dict inspection via dot syntax
+ match = match % target_dict
+ key, value = match.split(':', 1)
+ if key in cred_dict:
+ return value == cred_dict[key]
+ return False
+
+
+class HttpBrain(Brain):
+ """A brain that can check external urls for policy.
+
+ Posts json blobs for target and credentials.
+
+ """
+
+ def _check_http(self, match, target_dict, cred_dict):
+ """Check http: rules by calling to a remote server.
+
+ This example implementation simply verifies that the response is
+ exactly 'True'. A custom brain using response codes could easily
+ be implemented.
+
+ """
+ url = match % target_dict
+ data = {'target': json.dumps(target_dict),
+ 'credentials': json.dumps(cred_dict)}
+ post_data = urllib.urlencode(data)
+ f = urllib2.urlopen(url, post_data)
+ return f.read() == "True"
diff --git a/tests/unit/test_policy.py b/tests/unit/test_policy.py
new file mode 100644
index 0000000..da70820
--- /dev/null
+++ b/tests/unit/test_policy.py
@@ -0,0 +1,401 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Test of Policy Engine For Nova"""
+
+import json
+import os.path
+import StringIO
+import unittest
+import urllib
+
+import mock
+import urllib2
+
+from openstack.common import policy
+
+
+class PolicySetAndResetTestCase(unittest.TestCase):
+ def tearDown(self):
+ # Make sure the policy brain is reset for remaining tests
+ policy._BRAIN = None
+
+ def test_set_brain(self):
+ # Make sure the brain is set properly
+ policy._BRAIN = None
+ policy.set_brain('spam')
+ self.assertEqual(policy._BRAIN, 'spam')
+
+ def test_reset(self):
+ # Make sure the brain is set to something
+ policy._BRAIN = 'spam'
+ policy.reset()
+ self.assertEqual(policy._BRAIN, None)
+
+
+class FakeBrain(object):
+ check_result = True
+
+ def check(self, match_list, target_dict, credentials_dict):
+ self.match_list = match_list
+ self.target_dict = target_dict
+ self.credentials_dict = credentials_dict
+ return self.check_result
+
+
+class PolicyEnforceTestCase(unittest.TestCase):
+ def setUp(self):
+ self.fake_brain = FakeBrain()
+ policy._BRAIN = self.fake_brain
+
+ def tearDown(self):
+ policy.reset()
+
+ def check_args(self, match_list, target_dict, credentials_dict):
+ self.assertEqual(self.fake_brain.match_list, match_list)
+ self.assertEqual(self.fake_brain.target_dict, target_dict)
+ self.assertEqual(self.fake_brain.credentials_dict, credentials_dict)
+
+ def test_make_new_brain(self):
+ with mock.patch.object(policy, "Brain") as fake_brain:
+ fake_brain.check.return_value = True
+
+ result = policy.enforce("match", "target", "credentials")
+
+ self.assertNotEqual(policy._BRAIN, None)
+ self.assertEqual(result, True)
+
+ def test_use_existing_brain(self):
+ result = policy.enforce("match", "target", "credentials")
+
+ self.assertNotEqual(policy._BRAIN, None)
+ self.assertEqual(result, True)
+ self.check_args("match", "target", "credentials")
+
+ def test_fail_no_exc(self):
+ self.fake_brain.check_result = False
+ result = policy.enforce("match", "target", "credentials")
+
+ self.assertEqual(result, False)
+ self.check_args("match", "target", "credentials")
+
+ def test_fail_with_exc(self):
+ class TestException(Exception):
+ def __init__(self, *args, **kwargs):
+ self.args = args
+ self.kwargs = kwargs
+
+ self.fake_brain.check_result = False
+ try:
+ result = policy.enforce("match", "target", "credentials",
+ TestException, "arg1", "arg2",
+ kw1="kwarg1", kw2="kwarg2")
+ except TestException as exc:
+ self.assertEqual(exc.args, ("arg1", "arg2"))
+ self.assertEqual(exc.kwargs, dict(kw1="kwarg1", kw2="kwarg2"))
+ else:
+ self.fail("policy.enforce() failed to raise requested exception")
+
+
+class BrainTestCase(unittest.TestCase):
+ def test_basic_init(self):
+ brain = policy.Brain()
+
+ self.assertEqual(brain.rules, {})
+ self.assertEqual(brain.default_rule, None)
+
+ def test_init_with_args(self):
+ brain = policy.Brain(rules=dict(a="foo", b="bar"), default_rule="a")
+
+ self.assertEqual(brain.rules, dict(a="foo", b="bar"))
+ self.assertEqual(brain.default_rule, "a")
+
+ def test_load_json(self):
+ exemplar = """{
+ "admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]],
+ "default": []
+}"""
+ brain = policy.Brain.load_json(exemplar, "default")
+
+ self.assertEqual(brain.rules, dict(
+ admin_or_owner=[["role:admin"], ["project_id:%(project_id)s"]],
+ default=[],
+ ))
+ self.assertEqual(brain.default_rule, "default")
+
+ def test_add_rule(self):
+ brain = policy.Brain()
+ brain.add_rule("rule1",
+ [["role:admin"], ["project_id:%(project_id)s"]])
+
+ self.assertEqual(brain.rules, dict(
+ rule1=[["role:admin"], ["project_id:%(project_id)s"]]))
+
+ def test_check_with_badmatch(self):
+ brain = policy.Brain()
+ result = brain._check("badmatch", "target", "credentials")
+
+ self.assertEqual(result, False)
+
+ def test_check_with_specific(self):
+ self.spam_called = False
+
+ class TestBrain(policy.Brain):
+ def _check_spam(inst, match, target_dict, cred_dict):
+ self.assertEqual(match, "check")
+ self.assertEqual(target_dict, "target")
+ self.assertEqual(cred_dict, "credentials")
+ self.spam_called = True
+
+ brain = TestBrain()
+ result = brain._check("spam:check", "target", "credentials")
+
+ self.assertEqual(self.spam_called, True)
+
+ def test_check_with_generic(self):
+ self.generic_called = False
+
+ class TestBrain(policy.Brain):
+ def _check_generic(inst, match, target_dict, cred_dict):
+ self.assertEqual(match, "spam:check")
+ self.assertEqual(target_dict, "target")
+ self.assertEqual(cred_dict, "credentials")
+ self.generic_called = True
+
+ brain = TestBrain()
+ result = brain._check("spam:check", "target", "credentials")
+
+ self.assertEqual(self.generic_called, True)
+
+ def test_check_empty(self):
+ class TestBrain(policy.Brain):
+ def _check(inst, match, target_dict, cred_dict):
+ self.fail("_check() called for empty match list!")
+
+ brain = TestBrain()
+ result = brain.check([], "target", "credentials")
+
+ self.assertEqual(result, True)
+
+ def stub__check(self):
+ self._check_called = 0
+ self.matches = []
+ self.targets = []
+ self.creds = []
+
+ class TestBrain(policy.Brain):
+ def _check(inst, match, target_dict, cred_dict):
+ self._check_called += 1
+ self.matches.append(match)
+ self.targets.append(target_dict)
+ self.creds.append(cred_dict)
+ return match == "True"
+
+ return TestBrain()
+
+ def test_check_basic_true(self):
+ brain = self.stub__check()
+ result = brain.check(["True"], "target", "creds")
+
+ self.assertEqual(result, True)
+ self.assertEqual(self._check_called, 1)
+ self.assertEqual(self.matches, ["True"])
+ self.assertEqual(self.targets, ["target"])
+ self.assertEqual(self.creds, ["creds"])
+
+ def test_check_basic_false(self):
+ brain = self.stub__check()
+ result = brain.check(["False"], "target", "creds")
+
+ self.assertEqual(result, False)
+ self.assertEqual(self._check_called, 1)
+ self.assertEqual(self.matches, ["False"])
+ self.assertEqual(self.targets, ["target"])
+ self.assertEqual(self.creds, ["creds"])
+
+ def test_check_or_true(self):
+ brain = self.stub__check()
+ result = brain.check([["False"], ["True"], ["False"]],
+ "target", "creds")
+
+ self.assertEqual(result, True)
+ self.assertEqual(self._check_called, 2)
+ self.assertEqual(self.matches, ["False", "True"])
+ self.assertEqual(self.targets, ["target", "target"])
+ self.assertEqual(self.creds, ["creds", "creds"])
+
+ def test_check_or_false(self):
+ brain = self.stub__check()
+ result = brain.check([["False"], ["False"], ["False"]],
+ "target", "creds")
+
+ self.assertEqual(result, False)
+ self.assertEqual(self._check_called, 3)
+ self.assertEqual(self.matches, ["False", "False", "False"])
+ self.assertEqual(self.targets, ["target", "target", "target"])
+ self.assertEqual(self.creds, ["creds", "creds", "creds"])
+
+ def test_check_and_true(self):
+ brain = self.stub__check()
+ result = brain.check([["True", "True", "True"]],
+ "target", "creds")
+
+ self.assertEqual(result, True)
+ self.assertEqual(self._check_called, 3)
+ self.assertEqual(self.matches, ["True", "True", "True"])
+ self.assertEqual(self.targets, ["target", "target", "target"])
+ self.assertEqual(self.creds, ["creds", "creds", "creds"])
+
+ def test_check_and_false(self):
+ brain = self.stub__check()
+ result = brain.check([["True", "True", "False"]],
+ "target", "creds")
+
+ self.assertEqual(result, False)
+ self.assertEqual(self._check_called, 3)
+ self.assertEqual(self.matches, ["True", "True", "False"])
+ self.assertEqual(self.targets, ["target", "target", "target"])
+ self.assertEqual(self.creds, ["creds", "creds", "creds"])
+
+ def stub__check_rule(self, rules=None, default_rule=None):
+ self.check_called = False
+
+ class TestBrain(policy.Brain):
+ def check(inst, matchs, target_dict, cred_dict):
+ self.check_called = True
+ self.target = target_dict
+ self.cred = cred_dict
+ return matchs
+
+ return TestBrain(rules=rules, default_rule=default_rule)
+
+ def test_rule_no_rules_no_default(self):
+ brain = self.stub__check_rule()
+ result = brain._check_rule("spam", "target", "creds")
+
+ self.assertEqual(result, False)
+ self.assertEqual(self.check_called, False)
+
+ def test_rule_no_rules_default(self):
+ brain = self.stub__check_rule(default_rule="spam")
+ result = brain._check_rule("spam", "target", "creds")
+
+ self.assertEqual(result, False)
+ self.assertEqual(self.check_called, False)
+
+ def test_rule_no_rules_non_default(self):
+ brain = self.stub__check_rule(default_rule="spam")
+ result = brain._check_rule("python", "target", "creds")
+
+ self.assertEqual(self.check_called, True)
+ self.assertEqual(result, ("rule:spam",))
+ self.assertEqual(self.target, "target")
+ self.assertEqual(self.cred, "creds")
+
+ def test_rule_with_rules(self):
+ brain = self.stub__check_rule(rules=dict(spam=["hiho:ni"]))
+ result = brain._check_rule("spam", "target", "creds")
+
+ self.assertEqual(self.check_called, True)
+ self.assertEqual(result, ["hiho:ni"])
+ self.assertEqual(self.target, "target")
+ self.assertEqual(self.cred, "creds")
+
+ def test_role_no_match(self):
+ brain = policy.Brain()
+ result = brain._check_role("SpAm", {}, dict(roles=["a", "b", "c"]))
+
+ self.assertEqual(result, False)
+
+ def test_role_with_match(self):
+ brain = policy.Brain()
+ result = brain._check_role("SpAm", {}, dict(roles=["a", "b", "sPaM"]))
+
+ self.assertEqual(result, True)
+
+ def test_generic_no_key(self):
+ brain = policy.Brain()
+ result = brain._check_generic("tenant:%(tenant_id)s",
+ dict(tenant_id="spam"),
+ {})
+
+ self.assertEqual(result, False)
+
+ def test_generic_with_key_mismatch(self):
+ brain = policy.Brain()
+ result = brain._check_generic("tenant:%(tenant_id)s",
+ dict(tenant_id="spam"),
+ dict(tenant="nospam"))
+
+ self.assertEqual(result, False)
+
+ def test_generic_with_key_match(self):
+ brain = policy.Brain()
+ result = brain._check_generic("tenant:%(tenant_id)s",
+ dict(tenant_id="spam"),
+ dict(tenant="spam"))
+
+ self.assertEqual(result, True)
+
+
+class HttpBrainTestCase(unittest.TestCase):
+ def setUp(self):
+ self.urlopen_result = ""
+
+ def fake_urlopen(url, post_data):
+ self.url = url
+ self.post_data = post_data
+ return StringIO.StringIO(self.urlopen_result)
+
+ self.patcher = mock.patch.object(urllib2, "urlopen", fake_urlopen)
+ self.patcher.start()
+
+ def tearDown(self):
+ self.patcher.stop()
+
+ def decode_post_data(self):
+ result = {}
+ for item in self.post_data.split('&'):
+ key, _sep, value = item.partition('=')
+ result[key] = json.loads(urllib.unquote_plus(value))
+
+ return result
+
+ def test_http_false(self):
+ brain = policy.HttpBrain()
+ result = brain._check_http("//spam.example.org/%(tenant)s",
+ dict(tenant="spam"),
+ dict(roles=["a", "b", "c"]))
+
+ self.assertEqual(result, False)
+ self.assertEqual(self.url, "//spam.example.org/spam")
+ self.assertEqual(self.decode_post_data(), dict(
+ target=dict(tenant="spam"),
+ credentials=dict(roles=["a", "b", "c"])))
+
+ def test_http_true(self):
+ self.urlopen_result = "True"
+ brain = policy.HttpBrain()
+ result = brain._check_http("//spam.example.org/%(tenant)s",
+ dict(tenant="spam"),
+ dict(roles=["a", "b", "c"]))
+
+ self.assertEqual(result, True)
+ self.assertEqual(self.url, "//spam.example.org/spam")
+ self.assertEqual(self.decode_post_data(), dict(
+ target=dict(tenant="spam"),
+ credentials=dict(roles=["a", "b", "c"])))