diff options
| author | Kevin L. Mitchell <kevin.mitchell@rackspace.com> | 2012-06-04 10:25:38 -0500 |
|---|---|---|
| committer | Kevin L. Mitchell <kevin.mitchell@rackspace.com> | 2012-06-04 10:28:41 -0500 |
| commit | dfe65cc17b197b3728e114761739cb8be1ec59d2 (patch) | |
| tree | 38288ba6e67ad794225a430c585053b843c746f7 | |
| parent | ffbffc3dcaf20a82b91a0b5fa48b74d6edf07d8e (diff) | |
| download | oslo-dfe65cc17b197b3728e114761739cb8be1ec59d2.tar.gz oslo-dfe65cc17b197b3728e114761739cb8be1ec59d2.tar.xz oslo-dfe65cc17b197b3728e114761739cb8be1ec59d2.zip | |
Common-ize policies.
Both Glance and Nova make use of (almost) the same policy system,
defined in glance/common/policy.py and nova/common/policy.py,
respectively. They also have independent glue code
(glance/api/policy.py and nova/policy.py), so that the common
policy system is substantially similar. This makes policies a
perfect candidate for incorporation into openstack-common,
particularly given that this same code will soon be used by
Quantum.
This change adds the common policy module (along with some minor
interface changes and bug fixes) to openstack-common, along with
a test suite.
Change-Id: I0022a91f16ded28f9dc6b4975ef1b6e4cc8460ac
| -rw-r--r-- | openstack/common/policy.py | 238 | ||||
| -rw-r--r-- | tests/unit/test_policy.py | 401 |
2 files changed, 639 insertions, 0 deletions
diff --git a/openstack/common/policy.py b/openstack/common/policy.py new file mode 100644 index 0000000..203995a --- /dev/null +++ b/openstack/common/policy.py @@ -0,0 +1,238 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2011 OpenStack, LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Common Policy Engine Implementation""" + +import json +import logging +import urllib +import urllib2 + + +LOG = logging.getLogger(__name__) + + +_BRAIN = None + + +def set_brain(brain): + """Set the brain used by enforce(). + + Defaults use Brain() if not set. + + """ + global _BRAIN + _BRAIN = brain + + +def reset(): + """Clear the brain used by enforce().""" + global _BRAIN + _BRAIN = None + + +def enforce(match_list, target_dict, credentials_dict, exc=None, + *args, **kwargs): + """Enforces authorization of some rules against credentials. + + :param match_list: nested tuples of data to match against + + The basic brain supports three types of match lists: + + 1) rules + + looks like: ``('rule:compute:get_instance',)`` + + Retrieves the named rule from the rules dict and recursively + checks against the contents of the rule. + + 2) roles + + looks like: ``('role:compute:admin',)`` + + Matches if the specified role is in credentials_dict['roles']. + + 3) generic + + looks like: ``('tenant_id:%(tenant_id)s',)`` + + Substitutes values from the target dict into the match using + the % operator and matches them against the creds dict. + + Combining rules: + + The brain returns True if any of the outer tuple of rules + match and also True if all of the inner tuples match. You + can use this to perform simple boolean logic. For + example, the following rule would return True if the creds + contain the role 'admin' OR the if the tenant_id matches + the target dict AND the the creds contains the role + 'compute_sysadmin': + + :: + + { + "rule:combined": ( + 'role:admin', + ('tenant_id:%(tenant_id)s', 'role:compute_sysadmin') + ) + } + + Note that rule and role are reserved words in the credentials match, so + you can't match against properties with those names. Custom brains may + also add new reserved words. For example, the HttpBrain adds http as a + reserved word. + + :param target_dict: dict of object properties + + Target dicts contain as much information as we can about the object being + operated on. + + :param credentials_dict: dict of actor properties + + Credentials dicts contain as much information as we can about the user + performing the action. + + :param exc: exception to raise + + Class of the exception to raise if the check fails. Any remaining + arguments passed to enforce() (both positional and keyword arguments) + will be passed to the exception class. If exc is not provided, returns + False. + + :return: True if the policy allows the action + :return: False if the policy does not allow the action and exc is not set + """ + global _BRAIN + if not _BRAIN: + _BRAIN = Brain() + if not _BRAIN.check(match_list, target_dict, credentials_dict): + if exc: + raise exc(*args, **kwargs) + return False + return True + + +class Brain(object): + """Implements policy checking.""" + @classmethod + def load_json(cls, data, default_rule=None): + """Init a brain using json instead of a rules dictionary.""" + rules_dict = json.loads(data) + return cls(rules=rules_dict, default_rule=default_rule) + + def __init__(self, rules=None, default_rule=None): + self.rules = rules or {} + self.default_rule = default_rule + + def add_rule(self, key, match): + self.rules[key] = match + + def _check(self, match, target_dict, cred_dict): + try: + match_kind, match_value = match.split(':', 1) + except Exception: + LOG.exception(_("Failed to understand rule %(match)r") % locals()) + # If the rule is invalid, fail closed + return False + try: + f = getattr(self, '_check_%s' % match_kind) + except AttributeError: + if not self._check_generic(match, target_dict, cred_dict): + return False + else: + if not f(match_value, target_dict, cred_dict): + return False + return True + + def check(self, match_list, target_dict, cred_dict): + """Checks authorization of some rules against credentials. + + Detailed description of the check with examples in policy.enforce(). + + :param match_list: nested tuples of data to match against + :param target_dict: dict of object properties + :param credentials_dict: dict of actor properties + + :returns: True if the check passes + + """ + if not match_list: + return True + for and_list in match_list: + if isinstance(and_list, basestring): + and_list = (and_list,) + if all([self._check(item, target_dict, cred_dict) + for item in and_list]): + return True + return False + + def _check_rule(self, match, target_dict, cred_dict): + """Recursively checks credentials based on the brains rules.""" + try: + new_match_list = self.rules[match] + except KeyError: + if self.default_rule and match != self.default_rule: + new_match_list = ('rule:%s' % self.default_rule,) + else: + return False + + return self.check(new_match_list, target_dict, cred_dict) + + def _check_role(self, match, target_dict, cred_dict): + """Check that there is a matching role in the cred dict.""" + return match.lower() in [x.lower() for x in cred_dict['roles']] + + def _check_generic(self, match, target_dict, cred_dict): + """Check an individual match. + + Matches look like: + + tenant:%(tenant_id)s + role:compute:admin + + """ + + # TODO(termie): do dict inspection via dot syntax + match = match % target_dict + key, value = match.split(':', 1) + if key in cred_dict: + return value == cred_dict[key] + return False + + +class HttpBrain(Brain): + """A brain that can check external urls for policy. + + Posts json blobs for target and credentials. + + """ + + def _check_http(self, match, target_dict, cred_dict): + """Check http: rules by calling to a remote server. + + This example implementation simply verifies that the response is + exactly 'True'. A custom brain using response codes could easily + be implemented. + + """ + url = match % target_dict + data = {'target': json.dumps(target_dict), + 'credentials': json.dumps(cred_dict)} + post_data = urllib.urlencode(data) + f = urllib2.urlopen(url, post_data) + return f.read() == "True" diff --git a/tests/unit/test_policy.py b/tests/unit/test_policy.py new file mode 100644 index 0000000..da70820 --- /dev/null +++ b/tests/unit/test_policy.py @@ -0,0 +1,401 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2012 OpenStack, LLC. +# All Rights Reserved. + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Test of Policy Engine For Nova""" + +import json +import os.path +import StringIO +import unittest +import urllib + +import mock +import urllib2 + +from openstack.common import policy + + +class PolicySetAndResetTestCase(unittest.TestCase): + def tearDown(self): + # Make sure the policy brain is reset for remaining tests + policy._BRAIN = None + + def test_set_brain(self): + # Make sure the brain is set properly + policy._BRAIN = None + policy.set_brain('spam') + self.assertEqual(policy._BRAIN, 'spam') + + def test_reset(self): + # Make sure the brain is set to something + policy._BRAIN = 'spam' + policy.reset() + self.assertEqual(policy._BRAIN, None) + + +class FakeBrain(object): + check_result = True + + def check(self, match_list, target_dict, credentials_dict): + self.match_list = match_list + self.target_dict = target_dict + self.credentials_dict = credentials_dict + return self.check_result + + +class PolicyEnforceTestCase(unittest.TestCase): + def setUp(self): + self.fake_brain = FakeBrain() + policy._BRAIN = self.fake_brain + + def tearDown(self): + policy.reset() + + def check_args(self, match_list, target_dict, credentials_dict): + self.assertEqual(self.fake_brain.match_list, match_list) + self.assertEqual(self.fake_brain.target_dict, target_dict) + self.assertEqual(self.fake_brain.credentials_dict, credentials_dict) + + def test_make_new_brain(self): + with mock.patch.object(policy, "Brain") as fake_brain: + fake_brain.check.return_value = True + + result = policy.enforce("match", "target", "credentials") + + self.assertNotEqual(policy._BRAIN, None) + self.assertEqual(result, True) + + def test_use_existing_brain(self): + result = policy.enforce("match", "target", "credentials") + + self.assertNotEqual(policy._BRAIN, None) + self.assertEqual(result, True) + self.check_args("match", "target", "credentials") + + def test_fail_no_exc(self): + self.fake_brain.check_result = False + result = policy.enforce("match", "target", "credentials") + + self.assertEqual(result, False) + self.check_args("match", "target", "credentials") + + def test_fail_with_exc(self): + class TestException(Exception): + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + self.fake_brain.check_result = False + try: + result = policy.enforce("match", "target", "credentials", + TestException, "arg1", "arg2", + kw1="kwarg1", kw2="kwarg2") + except TestException as exc: + self.assertEqual(exc.args, ("arg1", "arg2")) + self.assertEqual(exc.kwargs, dict(kw1="kwarg1", kw2="kwarg2")) + else: + self.fail("policy.enforce() failed to raise requested exception") + + +class BrainTestCase(unittest.TestCase): + def test_basic_init(self): + brain = policy.Brain() + + self.assertEqual(brain.rules, {}) + self.assertEqual(brain.default_rule, None) + + def test_init_with_args(self): + brain = policy.Brain(rules=dict(a="foo", b="bar"), default_rule="a") + + self.assertEqual(brain.rules, dict(a="foo", b="bar")) + self.assertEqual(brain.default_rule, "a") + + def test_load_json(self): + exemplar = """{ + "admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]], + "default": [] +}""" + brain = policy.Brain.load_json(exemplar, "default") + + self.assertEqual(brain.rules, dict( + admin_or_owner=[["role:admin"], ["project_id:%(project_id)s"]], + default=[], + )) + self.assertEqual(brain.default_rule, "default") + + def test_add_rule(self): + brain = policy.Brain() + brain.add_rule("rule1", + [["role:admin"], ["project_id:%(project_id)s"]]) + + self.assertEqual(brain.rules, dict( + rule1=[["role:admin"], ["project_id:%(project_id)s"]])) + + def test_check_with_badmatch(self): + brain = policy.Brain() + result = brain._check("badmatch", "target", "credentials") + + self.assertEqual(result, False) + + def test_check_with_specific(self): + self.spam_called = False + + class TestBrain(policy.Brain): + def _check_spam(inst, match, target_dict, cred_dict): + self.assertEqual(match, "check") + self.assertEqual(target_dict, "target") + self.assertEqual(cred_dict, "credentials") + self.spam_called = True + + brain = TestBrain() + result = brain._check("spam:check", "target", "credentials") + + self.assertEqual(self.spam_called, True) + + def test_check_with_generic(self): + self.generic_called = False + + class TestBrain(policy.Brain): + def _check_generic(inst, match, target_dict, cred_dict): + self.assertEqual(match, "spam:check") + self.assertEqual(target_dict, "target") + self.assertEqual(cred_dict, "credentials") + self.generic_called = True + + brain = TestBrain() + result = brain._check("spam:check", "target", "credentials") + + self.assertEqual(self.generic_called, True) + + def test_check_empty(self): + class TestBrain(policy.Brain): + def _check(inst, match, target_dict, cred_dict): + self.fail("_check() called for empty match list!") + + brain = TestBrain() + result = brain.check([], "target", "credentials") + + self.assertEqual(result, True) + + def stub__check(self): + self._check_called = 0 + self.matches = [] + self.targets = [] + self.creds = [] + + class TestBrain(policy.Brain): + def _check(inst, match, target_dict, cred_dict): + self._check_called += 1 + self.matches.append(match) + self.targets.append(target_dict) + self.creds.append(cred_dict) + return match == "True" + + return TestBrain() + + def test_check_basic_true(self): + brain = self.stub__check() + result = brain.check(["True"], "target", "creds") + + self.assertEqual(result, True) + self.assertEqual(self._check_called, 1) + self.assertEqual(self.matches, ["True"]) + self.assertEqual(self.targets, ["target"]) + self.assertEqual(self.creds, ["creds"]) + + def test_check_basic_false(self): + brain = self.stub__check() + result = brain.check(["False"], "target", "creds") + + self.assertEqual(result, False) + self.assertEqual(self._check_called, 1) + self.assertEqual(self.matches, ["False"]) + self.assertEqual(self.targets, ["target"]) + self.assertEqual(self.creds, ["creds"]) + + def test_check_or_true(self): + brain = self.stub__check() + result = brain.check([["False"], ["True"], ["False"]], + "target", "creds") + + self.assertEqual(result, True) + self.assertEqual(self._check_called, 2) + self.assertEqual(self.matches, ["False", "True"]) + self.assertEqual(self.targets, ["target", "target"]) + self.assertEqual(self.creds, ["creds", "creds"]) + + def test_check_or_false(self): + brain = self.stub__check() + result = brain.check([["False"], ["False"], ["False"]], + "target", "creds") + + self.assertEqual(result, False) + self.assertEqual(self._check_called, 3) + self.assertEqual(self.matches, ["False", "False", "False"]) + self.assertEqual(self.targets, ["target", "target", "target"]) + self.assertEqual(self.creds, ["creds", "creds", "creds"]) + + def test_check_and_true(self): + brain = self.stub__check() + result = brain.check([["True", "True", "True"]], + "target", "creds") + + self.assertEqual(result, True) + self.assertEqual(self._check_called, 3) + self.assertEqual(self.matches, ["True", "True", "True"]) + self.assertEqual(self.targets, ["target", "target", "target"]) + self.assertEqual(self.creds, ["creds", "creds", "creds"]) + + def test_check_and_false(self): + brain = self.stub__check() + result = brain.check([["True", "True", "False"]], + "target", "creds") + + self.assertEqual(result, False) + self.assertEqual(self._check_called, 3) + self.assertEqual(self.matches, ["True", "True", "False"]) + self.assertEqual(self.targets, ["target", "target", "target"]) + self.assertEqual(self.creds, ["creds", "creds", "creds"]) + + def stub__check_rule(self, rules=None, default_rule=None): + self.check_called = False + + class TestBrain(policy.Brain): + def check(inst, matchs, target_dict, cred_dict): + self.check_called = True + self.target = target_dict + self.cred = cred_dict + return matchs + + return TestBrain(rules=rules, default_rule=default_rule) + + def test_rule_no_rules_no_default(self): + brain = self.stub__check_rule() + result = brain._check_rule("spam", "target", "creds") + + self.assertEqual(result, False) + self.assertEqual(self.check_called, False) + + def test_rule_no_rules_default(self): + brain = self.stub__check_rule(default_rule="spam") + result = brain._check_rule("spam", "target", "creds") + + self.assertEqual(result, False) + self.assertEqual(self.check_called, False) + + def test_rule_no_rules_non_default(self): + brain = self.stub__check_rule(default_rule="spam") + result = brain._check_rule("python", "target", "creds") + + self.assertEqual(self.check_called, True) + self.assertEqual(result, ("rule:spam",)) + self.assertEqual(self.target, "target") + self.assertEqual(self.cred, "creds") + + def test_rule_with_rules(self): + brain = self.stub__check_rule(rules=dict(spam=["hiho:ni"])) + result = brain._check_rule("spam", "target", "creds") + + self.assertEqual(self.check_called, True) + self.assertEqual(result, ["hiho:ni"]) + self.assertEqual(self.target, "target") + self.assertEqual(self.cred, "creds") + + def test_role_no_match(self): + brain = policy.Brain() + result = brain._check_role("SpAm", {}, dict(roles=["a", "b", "c"])) + + self.assertEqual(result, False) + + def test_role_with_match(self): + brain = policy.Brain() + result = brain._check_role("SpAm", {}, dict(roles=["a", "b", "sPaM"])) + + self.assertEqual(result, True) + + def test_generic_no_key(self): + brain = policy.Brain() + result = brain._check_generic("tenant:%(tenant_id)s", + dict(tenant_id="spam"), + {}) + + self.assertEqual(result, False) + + def test_generic_with_key_mismatch(self): + brain = policy.Brain() + result = brain._check_generic("tenant:%(tenant_id)s", + dict(tenant_id="spam"), + dict(tenant="nospam")) + + self.assertEqual(result, False) + + def test_generic_with_key_match(self): + brain = policy.Brain() + result = brain._check_generic("tenant:%(tenant_id)s", + dict(tenant_id="spam"), + dict(tenant="spam")) + + self.assertEqual(result, True) + + +class HttpBrainTestCase(unittest.TestCase): + def setUp(self): + self.urlopen_result = "" + + def fake_urlopen(url, post_data): + self.url = url + self.post_data = post_data + return StringIO.StringIO(self.urlopen_result) + + self.patcher = mock.patch.object(urllib2, "urlopen", fake_urlopen) + self.patcher.start() + + def tearDown(self): + self.patcher.stop() + + def decode_post_data(self): + result = {} + for item in self.post_data.split('&'): + key, _sep, value = item.partition('=') + result[key] = json.loads(urllib.unquote_plus(value)) + + return result + + def test_http_false(self): + brain = policy.HttpBrain() + result = brain._check_http("//spam.example.org/%(tenant)s", + dict(tenant="spam"), + dict(roles=["a", "b", "c"])) + + self.assertEqual(result, False) + self.assertEqual(self.url, "//spam.example.org/spam") + self.assertEqual(self.decode_post_data(), dict( + target=dict(tenant="spam"), + credentials=dict(roles=["a", "b", "c"]))) + + def test_http_true(self): + self.urlopen_result = "True" + brain = policy.HttpBrain() + result = brain._check_http("//spam.example.org/%(tenant)s", + dict(tenant="spam"), + dict(roles=["a", "b", "c"])) + + self.assertEqual(result, True) + self.assertEqual(self.url, "//spam.example.org/spam") + self.assertEqual(self.decode_post_data(), dict( + target=dict(tenant="spam"), + credentials=dict(roles=["a", "b", "c"]))) |
