diff options
| -rw-r--r-- | openstack/common/policy.py | 152 | ||||
| -rw-r--r-- | tests/unit/test_policy.py | 169 |
2 files changed, 219 insertions, 102 deletions
diff --git a/openstack/common/policy.py b/openstack/common/policy.py index 7d6eff9..fd1d7d3 100644 --- a/openstack/common/policy.py +++ b/openstack/common/policy.py @@ -131,6 +131,13 @@ def enforce(match_list, target_dict, credentials_dict, exc=None, class Brain(object): """Implements policy checking.""" + + _checks = {} + + @classmethod + def _register(cls, name, func): + cls._checks[name] = func + @classmethod def load_json(cls, data, default_rule=None): """Init a brain using json instead of a rules dictionary.""" @@ -138,6 +145,11 @@ class Brain(object): return cls(rules=rules_dict, default_rule=default_rule) def __init__(self, rules=None, default_rule=None): + if self.__class__ != Brain: + LOG.warning(_("Inheritance-based rules are deprecated; use " + "the default brain instead of %s.") % + self.__class__.__name__) + self.rules = rules or {} self.default_rule = default_rule @@ -151,15 +163,24 @@ class Brain(object): LOG.exception(_("Failed to understand rule %(match)r") % locals()) # If the rule is invalid, fail closed return False + + func = None try: - f = getattr(self, '_check_%s' % match_kind) + old_func = getattr(self, '_check_%s' % match_kind) except AttributeError: - if not self._check_generic(match, target_dict, cred_dict): - return False + func = self._checks.get(match_kind, self._checks.get(None, None)) else: - if not f(match_value, target_dict, cred_dict): - return False - return True + LOG.warning(_("Inheritance-based rules are deprecated; update " + "_check_%s") % match_kind) + func = (lambda brain, kind, value, target, cred: + old_func(value, target, cred)) + + if not func: + LOG.error(_("No handler for matches of kind %s") % match_kind) + # Fail closed + return False + + return func(self, match_kind, match_value, target_dict, cred_dict) def check(self, match_list, target_dict, cred_dict): """Checks authorization of some rules against credentials. @@ -183,58 +204,97 @@ class Brain(object): return True return False - def _check_rule(self, match, target_dict, cred_dict): - """Recursively checks credentials based on the brains rules.""" - try: - new_match_list = self.rules[match] - except KeyError: - if self.default_rule and match != self.default_rule: - new_match_list = ('rule:%s' % self.default_rule,) - else: - return False - return self.check(new_match_list, target_dict, cred_dict) +class HttpBrain(Brain): + """A brain that can check external urls for policy. - def _check_role(self, match, target_dict, cred_dict): - """Check that there is a matching role in the cred dict.""" - return match.lower() in [x.lower() for x in cred_dict['roles']] + Posts json blobs for target and credentials. - def _check_generic(self, match, target_dict, cred_dict): - """Check an individual match. + Note that this brain is deprecated; the http check is registered + by default. + """ - Matches look like: + pass - tenant:%(tenant_id)s - role:compute:admin - """ +def register(name, func=None): + """ + Register a function as a policy check. + + :param name: Gives the name of the check type, e.g., 'rule', + 'role', etc. If name is None, a default function + will be registered. + :param func: If given, provides the function to register. If not + given, returns a function taking one argument to + specify the function to register, allowing use as a + decorator. + """ - # TODO(termie): do dict inspection via dot syntax - match = match % target_dict - key, value = match.split(':', 1) - if key in cred_dict: - return value == cred_dict[key] - return False + # Perform the actual decoration by registering the function. + # Returns the function for compliance with the decorator + # interface. + def decorator(func): + # Register the function + Brain._register(name, func) + return func + + # If the function is given, do the registration + if func: + return decorator(func) + + return decorator + + +@register("rule") +def _check_rule(brain, match_kind, match, target_dict, cred_dict): + """Recursively checks credentials based on the brains rules.""" + try: + new_match_list = brain.rules[match] + except KeyError: + if brain.default_rule and match != brain.default_rule: + new_match_list = ('rule:%s' % brain.default_rule,) + else: + return False + return brain.check(new_match_list, target_dict, cred_dict) -class HttpBrain(Brain): - """A brain that can check external urls for policy. - Posts json blobs for target and credentials. +@register("role") +def _check_role(brain, match_kind, match, target_dict, cred_dict): + """Check that there is a matching role in the cred dict.""" + return match.lower() in [x.lower() for x in cred_dict['roles']] + + +@register('http') +def _check_http(brain, match_kind, match, target_dict, cred_dict): + """Check http: rules by calling to a remote server. + + This example implementation simply verifies that the response is + exactly 'True'. A custom brain using response codes could easily + be implemented. """ + url = 'http:' + (match % target_dict) + data = {'target': jsonutils.dumps(target_dict), + 'credentials': jsonutils.dumps(cred_dict)} + post_data = urllib.urlencode(data) + f = urllib2.urlopen(url, post_data) + return f.read() == "True" - def _check_http(self, match, target_dict, cred_dict): - """Check http: rules by calling to a remote server. - This example implementation simply verifies that the response is - exactly 'True'. A custom brain using response codes could easily - be implemented. +@register(None) +def _check_generic(brain, match_kind, match, target_dict, cred_dict): + """Check an individual match. - """ - url = match % target_dict - data = {'target': jsonutils.dumps(target_dict), - 'credentials': jsonutils.dumps(cred_dict)} - post_data = urllib.urlencode(data) - f = urllib2.urlopen(url, post_data) - return f.read() == "True" + Matches look like: + + tenant:%(tenant_id)s + role:compute:admin + + """ + + # TODO(termie): do dict inspection via dot syntax + match = match % target_dict + if match_kind in cred_dict: + return match == cred_dict[match_kind] + return False diff --git a/tests/unit/test_policy.py b/tests/unit/test_policy.py index 449c8a9..e7f391f 100644 --- a/tests/unit/test_policy.py +++ b/tests/unit/test_policy.py @@ -157,12 +157,15 @@ class BrainTestCase(unittest.TestCase): def test_check_with_specific(self): self.spam_called = False + def check_spam(brain, kind, match, target_dict, cred_dict): + self.assertEqual(kind, "spam") + self.assertEqual(match, "check") + self.assertEqual(target_dict, "target") + self.assertEqual(cred_dict, "credentials") + self.spam_called = True + class TestBrain(policy.Brain): - def _check_spam(inst, match, target_dict, cred_dict): - self.assertEqual(match, "check") - self.assertEqual(target_dict, "target") - self.assertEqual(cred_dict, "credentials") - self.spam_called = True + _checks = dict(spam=check_spam) brain = TestBrain() result = brain._check("spam:check", "target", "credentials") @@ -172,18 +175,45 @@ class BrainTestCase(unittest.TestCase): def test_check_with_generic(self): self.generic_called = False + def check_generic(brain, kind, match, target_dict, cred_dict): + self.assertEqual(kind, "spam") + self.assertEqual(match, "check") + self.assertEqual(target_dict, "target") + self.assertEqual(cred_dict, "credentials") + self.generic_called = True + class TestBrain(policy.Brain): - def _check_generic(inst, match, target_dict, cred_dict): - self.assertEqual(match, "spam:check") - self.assertEqual(target_dict, "target") - self.assertEqual(cred_dict, "credentials") - self.generic_called = True + _checks = {None: check_generic} brain = TestBrain() result = brain._check("spam:check", "target", "credentials") self.assertEqual(self.generic_called, True) + def test_check_with_inheritance(self): + self.inherited_called = False + + class TestBrain(policy.Brain): + _checks = {} + + def _check_inherited(inst, match, target_dict, cred_dict): + self.assertEqual(match, "check") + self.assertEqual(target_dict, "target") + self.assertEqual(cred_dict, "credentials") + self.inherited_called = True + + brain = TestBrain() + result = brain._check("inherited:check", "target", "credentials") + + self.assertEqual(self.inherited_called, True) + + def test_check_no_handler(self): + class TestBrain(policy.Brain): + _checks = {} + + brain = TestBrain() + self.assertEqual(brain._check('spam:mer', 'target', 'cred'), False) + def test_check_empty(self): class TestBrain(policy.Brain): def _check(inst, match, target_dict, cred_dict): @@ -274,6 +304,52 @@ class BrainTestCase(unittest.TestCase): self.assertEqual(self.targets, ["target", "target", "target"]) self.assertEqual(self.creds, ["creds", "creds", "creds"]) + +class CheckRegisterTestCase(unittest.TestCase): + def setUp(self): + self.brain_checks = policy.Brain._checks + policy.Brain._checks = {} + + def tearDown(self): + policy.Brain._checks = self.brain_checks + + def test_class_register(self): + policy.Brain._register('spam', 'func') + policy.Brain._register('spammer', 'funcer') + + self.assertEqual(policy.Brain._checks, + dict(spam='func', spammer='funcer')) + + def test_register_func(self): + policy.register('spam', 'func') + + self.assertEqual(policy.Brain._checks, + dict(spam='func')) + + def test_register_decorator(self): + @policy.register('spam') + def test_func(): + pass + + self.assertEqual(policy.Brain._checks, + dict(spam=test_func)) + + +class CheckTestCase(unittest.TestCase): + def setUp(self): + self.urlopen_result = "" + + def fake_urlopen(url, post_data): + self.url = url + self.post_data = post_data + return StringIO.StringIO(self.urlopen_result) + + self.patcher = mock.patch.object(urllib2, "urlopen", fake_urlopen) + self.patcher.start() + + def tearDown(self): + self.patcher.stop() + def stub__check_rule(self, rules=None, default_rule=None): self.check_called = False @@ -288,21 +364,21 @@ class BrainTestCase(unittest.TestCase): def test_rule_no_rules_no_default(self): brain = self.stub__check_rule() - result = brain._check_rule("spam", "target", "creds") + result = policy._check_rule(brain, "rule", "spam", "target", "creds") self.assertEqual(result, False) self.assertEqual(self.check_called, False) def test_rule_no_rules_default(self): brain = self.stub__check_rule(default_rule="spam") - result = brain._check_rule("spam", "target", "creds") + result = policy._check_rule(brain, "rule", "spam", "target", "creds") self.assertEqual(result, False) self.assertEqual(self.check_called, False) def test_rule_no_rules_non_default(self): brain = self.stub__check_rule(default_rule="spam") - result = brain._check_rule("python", "target", "creds") + result = policy._check_rule(brain, "rule", "python", "target", "creds") self.assertEqual(self.check_called, True) self.assertEqual(result, ("rule:spam",)) @@ -311,7 +387,7 @@ class BrainTestCase(unittest.TestCase): def test_rule_with_rules(self): brain = self.stub__check_rule(rules=dict(spam=["hiho:ni"])) - result = brain._check_rule("spam", "target", "creds") + result = policy._check_rule(brain, "rule", "spam", "target", "creds") self.assertEqual(self.check_called, True) self.assertEqual(result, ["hiho:ni"]) @@ -319,57 +395,38 @@ class BrainTestCase(unittest.TestCase): self.assertEqual(self.cred, "creds") def test_role_no_match(self): - brain = policy.Brain() - result = brain._check_role("SpAm", {}, dict(roles=["a", "b", "c"])) + result = policy._check_role(None, "role", "SpAm", {}, + dict(roles=["a", "b", "c"])) self.assertEqual(result, False) def test_role_with_match(self): - brain = policy.Brain() - result = brain._check_role("SpAm", {}, dict(roles=["a", "b", "sPaM"])) + result = policy._check_role(None, "role", "SpAm", {}, + dict(roles=["a", "b", "sPaM"])) self.assertEqual(result, True) def test_generic_no_key(self): - brain = policy.Brain() - result = brain._check_generic("tenant:%(tenant_id)s", - dict(tenant_id="spam"), - {}) + result = policy._check_generic(None, "tenant", "%(tenant_id)s", + dict(tenant_id="spam"), + {}) self.assertEqual(result, False) def test_generic_with_key_mismatch(self): - brain = policy.Brain() - result = brain._check_generic("tenant:%(tenant_id)s", - dict(tenant_id="spam"), - dict(tenant="nospam")) + result = policy._check_generic(None, "tenant", "%(tenant_id)s", + dict(tenant_id="spam"), + dict(tenant="nospam")) self.assertEqual(result, False) def test_generic_with_key_match(self): - brain = policy.Brain() - result = brain._check_generic("tenant:%(tenant_id)s", - dict(tenant_id="spam"), - dict(tenant="spam")) + result = policy._check_generic(None, "tenant", "%(tenant_id)s", + dict(tenant_id="spam"), + dict(tenant="spam")) self.assertEqual(result, True) - -class HttpBrainTestCase(unittest.TestCase): - def setUp(self): - self.urlopen_result = "" - - def fake_urlopen(url, post_data): - self.url = url - self.post_data = post_data - return StringIO.StringIO(self.urlopen_result) - - self.patcher = mock.patch.object(urllib2, "urlopen", fake_urlopen) - self.patcher.start() - - def tearDown(self): - self.patcher.stop() - def decode_post_data(self): result = {} for item in self.post_data.split('&'): @@ -379,26 +436,26 @@ class HttpBrainTestCase(unittest.TestCase): return result def test_http_false(self): - brain = policy.HttpBrain() - result = brain._check_http("//spam.example.org/%(tenant)s", - dict(tenant="spam"), - dict(roles=["a", "b", "c"])) + result = policy._check_http(None, "http", + "//spam.example.org/%(tenant)s", + dict(tenant="spam"), + dict(roles=["a", "b", "c"])) self.assertEqual(result, False) - self.assertEqual(self.url, "//spam.example.org/spam") + self.assertEqual(self.url, "http://spam.example.org/spam") self.assertEqual(self.decode_post_data(), dict( target=dict(tenant="spam"), credentials=dict(roles=["a", "b", "c"]))) def test_http_true(self): self.urlopen_result = "True" - brain = policy.HttpBrain() - result = brain._check_http("//spam.example.org/%(tenant)s", - dict(tenant="spam"), - dict(roles=["a", "b", "c"])) + result = policy._check_http(None, "http", + "//spam.example.org/%(tenant)s", + dict(tenant="spam"), + dict(roles=["a", "b", "c"])) self.assertEqual(result, True) - self.assertEqual(self.url, "//spam.example.org/spam") + self.assertEqual(self.url, "http://spam.example.org/spam") self.assertEqual(self.decode_post_data(), dict( target=dict(tenant="spam"), credentials=dict(roles=["a", "b", "c"]))) |
