diff options
| author | Kevin L. Mitchell <kevin.mitchell@rackspace.com> | 2012-07-30 19:23:28 -0500 |
|---|---|---|
| committer | Kevin L. Mitchell <kevin.mitchell@rackspace.com> | 2012-07-30 19:23:54 -0500 |
| commit | dba9636e6df092d768d04cfaee839b76722e2393 (patch) | |
| tree | 4f3d740773f21600be6d43344575790f56ef8d5b | |
| parent | e0134fc3e2d7e59741b3643e2680f578cc9def41 (diff) | |
| download | oslo-dba9636e6df092d768d04cfaee839b76722e2393.tar.gz oslo-dba9636e6df092d768d04cfaee839b76722e2393.tar.xz oslo-dba9636e6df092d768d04cfaee839b76722e2393.zip | |
Use function registration for policy checks
The original policy framework allowed new policy checks to be created
through inheritance. This is somewhat clunky and unnecessary in
Python. This change refactors policy.py to allow new policy checks
to be registered using an @register() decorator. One consequence is
that HttpBrain is deprecated.
Care has been taken to ensure backwards compatibility; deprecation
warnings will be emitted for uses of HttpBrain or the inheritance-
based checks.
Change-Id: I3ccef5868906ef64a3c24d6c92533471e89682ba
| -rw-r--r-- | openstack/common/policy.py | 152 | ||||
| -rw-r--r-- | tests/unit/test_policy.py | 169 |
2 files changed, 219 insertions, 102 deletions
diff --git a/openstack/common/policy.py b/openstack/common/policy.py index 7d6eff9..fd1d7d3 100644 --- a/openstack/common/policy.py +++ b/openstack/common/policy.py @@ -131,6 +131,13 @@ def enforce(match_list, target_dict, credentials_dict, exc=None, class Brain(object): """Implements policy checking.""" + + _checks = {} + + @classmethod + def _register(cls, name, func): + cls._checks[name] = func + @classmethod def load_json(cls, data, default_rule=None): """Init a brain using json instead of a rules dictionary.""" @@ -138,6 +145,11 @@ class Brain(object): return cls(rules=rules_dict, default_rule=default_rule) def __init__(self, rules=None, default_rule=None): + if self.__class__ != Brain: + LOG.warning(_("Inheritance-based rules are deprecated; use " + "the default brain instead of %s.") % + self.__class__.__name__) + self.rules = rules or {} self.default_rule = default_rule @@ -151,15 +163,24 @@ class Brain(object): LOG.exception(_("Failed to understand rule %(match)r") % locals()) # If the rule is invalid, fail closed return False + + func = None try: - f = getattr(self, '_check_%s' % match_kind) + old_func = getattr(self, '_check_%s' % match_kind) except AttributeError: - if not self._check_generic(match, target_dict, cred_dict): - return False + func = self._checks.get(match_kind, self._checks.get(None, None)) else: - if not f(match_value, target_dict, cred_dict): - return False - return True + LOG.warning(_("Inheritance-based rules are deprecated; update " + "_check_%s") % match_kind) + func = (lambda brain, kind, value, target, cred: + old_func(value, target, cred)) + + if not func: + LOG.error(_("No handler for matches of kind %s") % match_kind) + # Fail closed + return False + + return func(self, match_kind, match_value, target_dict, cred_dict) def check(self, match_list, target_dict, cred_dict): """Checks authorization of some rules against credentials. @@ -183,58 +204,97 @@ class Brain(object): return True return False - def _check_rule(self, match, target_dict, cred_dict): - """Recursively checks credentials based on the brains rules.""" - try: - new_match_list = self.rules[match] - except KeyError: - if self.default_rule and match != self.default_rule: - new_match_list = ('rule:%s' % self.default_rule,) - else: - return False - return self.check(new_match_list, target_dict, cred_dict) +class HttpBrain(Brain): + """A brain that can check external urls for policy. - def _check_role(self, match, target_dict, cred_dict): - """Check that there is a matching role in the cred dict.""" - return match.lower() in [x.lower() for x in cred_dict['roles']] + Posts json blobs for target and credentials. - def _check_generic(self, match, target_dict, cred_dict): - """Check an individual match. + Note that this brain is deprecated; the http check is registered + by default. + """ - Matches look like: + pass - tenant:%(tenant_id)s - role:compute:admin - """ +def register(name, func=None): + """ + Register a function as a policy check. + + :param name: Gives the name of the check type, e.g., 'rule', + 'role', etc. If name is None, a default function + will be registered. + :param func: If given, provides the function to register. If not + given, returns a function taking one argument to + specify the function to register, allowing use as a + decorator. + """ - # TODO(termie): do dict inspection via dot syntax - match = match % target_dict - key, value = match.split(':', 1) - if key in cred_dict: - return value == cred_dict[key] - return False + # Perform the actual decoration by registering the function. + # Returns the function for compliance with the decorator + # interface. + def decorator(func): + # Register the function + Brain._register(name, func) + return func + + # If the function is given, do the registration + if func: + return decorator(func) + + return decorator + + +@register("rule") +def _check_rule(brain, match_kind, match, target_dict, cred_dict): + """Recursively checks credentials based on the brains rules.""" + try: + new_match_list = brain.rules[match] + except KeyError: + if brain.default_rule and match != brain.default_rule: + new_match_list = ('rule:%s' % brain.default_rule,) + else: + return False + return brain.check(new_match_list, target_dict, cred_dict) -class HttpBrain(Brain): - """A brain that can check external urls for policy. - Posts json blobs for target and credentials. +@register("role") +def _check_role(brain, match_kind, match, target_dict, cred_dict): + """Check that there is a matching role in the cred dict.""" + return match.lower() in [x.lower() for x in cred_dict['roles']] + + +@register('http') +def _check_http(brain, match_kind, match, target_dict, cred_dict): + """Check http: rules by calling to a remote server. + + This example implementation simply verifies that the response is + exactly 'True'. A custom brain using response codes could easily + be implemented. """ + url = 'http:' + (match % target_dict) + data = {'target': jsonutils.dumps(target_dict), + 'credentials': jsonutils.dumps(cred_dict)} + post_data = urllib.urlencode(data) + f = urllib2.urlopen(url, post_data) + return f.read() == "True" - def _check_http(self, match, target_dict, cred_dict): - """Check http: rules by calling to a remote server. - This example implementation simply verifies that the response is - exactly 'True'. A custom brain using response codes could easily - be implemented. +@register(None) +def _check_generic(brain, match_kind, match, target_dict, cred_dict): + """Check an individual match. - """ - url = match % target_dict - data = {'target': jsonutils.dumps(target_dict), - 'credentials': jsonutils.dumps(cred_dict)} - post_data = urllib.urlencode(data) - f = urllib2.urlopen(url, post_data) - return f.read() == "True" + Matches look like: + + tenant:%(tenant_id)s + role:compute:admin + + """ + + # TODO(termie): do dict inspection via dot syntax + match = match % target_dict + if match_kind in cred_dict: + return match == cred_dict[match_kind] + return False diff --git a/tests/unit/test_policy.py b/tests/unit/test_policy.py index 449c8a9..e7f391f 100644 --- a/tests/unit/test_policy.py +++ b/tests/unit/test_policy.py @@ -157,12 +157,15 @@ class BrainTestCase(unittest.TestCase): def test_check_with_specific(self): self.spam_called = False + def check_spam(brain, kind, match, target_dict, cred_dict): + self.assertEqual(kind, "spam") + self.assertEqual(match, "check") + self.assertEqual(target_dict, "target") + self.assertEqual(cred_dict, "credentials") + self.spam_called = True + class TestBrain(policy.Brain): - def _check_spam(inst, match, target_dict, cred_dict): - self.assertEqual(match, "check") - self.assertEqual(target_dict, "target") - self.assertEqual(cred_dict, "credentials") - self.spam_called = True + _checks = dict(spam=check_spam) brain = TestBrain() result = brain._check("spam:check", "target", "credentials") @@ -172,18 +175,45 @@ class BrainTestCase(unittest.TestCase): def test_check_with_generic(self): self.generic_called = False + def check_generic(brain, kind, match, target_dict, cred_dict): + self.assertEqual(kind, "spam") + self.assertEqual(match, "check") + self.assertEqual(target_dict, "target") + self.assertEqual(cred_dict, "credentials") + self.generic_called = True + class TestBrain(policy.Brain): - def _check_generic(inst, match, target_dict, cred_dict): - self.assertEqual(match, "spam:check") - self.assertEqual(target_dict, "target") - self.assertEqual(cred_dict, "credentials") - self.generic_called = True + _checks = {None: check_generic} brain = TestBrain() result = brain._check("spam:check", "target", "credentials") self.assertEqual(self.generic_called, True) + def test_check_with_inheritance(self): + self.inherited_called = False + + class TestBrain(policy.Brain): + _checks = {} + + def _check_inherited(inst, match, target_dict, cred_dict): + self.assertEqual(match, "check") + self.assertEqual(target_dict, "target") + self.assertEqual(cred_dict, "credentials") + self.inherited_called = True + + brain = TestBrain() + result = brain._check("inherited:check", "target", "credentials") + + self.assertEqual(self.inherited_called, True) + + def test_check_no_handler(self): + class TestBrain(policy.Brain): + _checks = {} + + brain = TestBrain() + self.assertEqual(brain._check('spam:mer', 'target', 'cred'), False) + def test_check_empty(self): class TestBrain(policy.Brain): def _check(inst, match, target_dict, cred_dict): @@ -274,6 +304,52 @@ class BrainTestCase(unittest.TestCase): self.assertEqual(self.targets, ["target", "target", "target"]) self.assertEqual(self.creds, ["creds", "creds", "creds"]) + +class CheckRegisterTestCase(unittest.TestCase): + def setUp(self): + self.brain_checks = policy.Brain._checks + policy.Brain._checks = {} + + def tearDown(self): + policy.Brain._checks = self.brain_checks + + def test_class_register(self): + policy.Brain._register('spam', 'func') + policy.Brain._register('spammer', 'funcer') + + self.assertEqual(policy.Brain._checks, + dict(spam='func', spammer='funcer')) + + def test_register_func(self): + policy.register('spam', 'func') + + self.assertEqual(policy.Brain._checks, + dict(spam='func')) + + def test_register_decorator(self): + @policy.register('spam') + def test_func(): + pass + + self.assertEqual(policy.Brain._checks, + dict(spam=test_func)) + + +class CheckTestCase(unittest.TestCase): + def setUp(self): + self.urlopen_result = "" + + def fake_urlopen(url, post_data): + self.url = url + self.post_data = post_data + return StringIO.StringIO(self.urlopen_result) + + self.patcher = mock.patch.object(urllib2, "urlopen", fake_urlopen) + self.patcher.start() + + def tearDown(self): + self.patcher.stop() + def stub__check_rule(self, rules=None, default_rule=None): self.check_called = False @@ -288,21 +364,21 @@ class BrainTestCase(unittest.TestCase): def test_rule_no_rules_no_default(self): brain = self.stub__check_rule() - result = brain._check_rule("spam", "target", "creds") + result = policy._check_rule(brain, "rule", "spam", "target", "creds") self.assertEqual(result, False) self.assertEqual(self.check_called, False) def test_rule_no_rules_default(self): brain = self.stub__check_rule(default_rule="spam") - result = brain._check_rule("spam", "target", "creds") + result = policy._check_rule(brain, "rule", "spam", "target", "creds") self.assertEqual(result, False) self.assertEqual(self.check_called, False) def test_rule_no_rules_non_default(self): brain = self.stub__check_rule(default_rule="spam") - result = brain._check_rule("python", "target", "creds") + result = policy._check_rule(brain, "rule", "python", "target", "creds") self.assertEqual(self.check_called, True) self.assertEqual(result, ("rule:spam",)) @@ -311,7 +387,7 @@ class BrainTestCase(unittest.TestCase): def test_rule_with_rules(self): brain = self.stub__check_rule(rules=dict(spam=["hiho:ni"])) - result = brain._check_rule("spam", "target", "creds") + result = policy._check_rule(brain, "rule", "spam", "target", "creds") self.assertEqual(self.check_called, True) self.assertEqual(result, ["hiho:ni"]) @@ -319,57 +395,38 @@ class BrainTestCase(unittest.TestCase): self.assertEqual(self.cred, "creds") def test_role_no_match(self): - brain = policy.Brain() - result = brain._check_role("SpAm", {}, dict(roles=["a", "b", "c"])) + result = policy._check_role(None, "role", "SpAm", {}, + dict(roles=["a", "b", "c"])) self.assertEqual(result, False) def test_role_with_match(self): - brain = policy.Brain() - result = brain._check_role("SpAm", {}, dict(roles=["a", "b", "sPaM"])) + result = policy._check_role(None, "role", "SpAm", {}, + dict(roles=["a", "b", "sPaM"])) self.assertEqual(result, True) def test_generic_no_key(self): - brain = policy.Brain() - result = brain._check_generic("tenant:%(tenant_id)s", - dict(tenant_id="spam"), - {}) + result = policy._check_generic(None, "tenant", "%(tenant_id)s", + dict(tenant_id="spam"), + {}) self.assertEqual(result, False) def test_generic_with_key_mismatch(self): - brain = policy.Brain() - result = brain._check_generic("tenant:%(tenant_id)s", - dict(tenant_id="spam"), - dict(tenant="nospam")) + result = policy._check_generic(None, "tenant", "%(tenant_id)s", + dict(tenant_id="spam"), + dict(tenant="nospam")) self.assertEqual(result, False) def test_generic_with_key_match(self): - brain = policy.Brain() - result = brain._check_generic("tenant:%(tenant_id)s", - dict(tenant_id="spam"), - dict(tenant="spam")) + result = policy._check_generic(None, "tenant", "%(tenant_id)s", + dict(tenant_id="spam"), + dict(tenant="spam")) self.assertEqual(result, True) - -class HttpBrainTestCase(unittest.TestCase): - def setUp(self): - self.urlopen_result = "" - - def fake_urlopen(url, post_data): - self.url = url - self.post_data = post_data - return StringIO.StringIO(self.urlopen_result) - - self.patcher = mock.patch.object(urllib2, "urlopen", fake_urlopen) - self.patcher.start() - - def tearDown(self): - self.patcher.stop() - def decode_post_data(self): result = {} for item in self.post_data.split('&'): @@ -379,26 +436,26 @@ class HttpBrainTestCase(unittest.TestCase): return result def test_http_false(self): - brain = policy.HttpBrain() - result = brain._check_http("//spam.example.org/%(tenant)s", - dict(tenant="spam"), - dict(roles=["a", "b", "c"])) + result = policy._check_http(None, "http", + "//spam.example.org/%(tenant)s", + dict(tenant="spam"), + dict(roles=["a", "b", "c"])) self.assertEqual(result, False) - self.assertEqual(self.url, "//spam.example.org/spam") + self.assertEqual(self.url, "http://spam.example.org/spam") self.assertEqual(self.decode_post_data(), dict( target=dict(tenant="spam"), credentials=dict(roles=["a", "b", "c"]))) def test_http_true(self): self.urlopen_result = "True" - brain = policy.HttpBrain() - result = brain._check_http("//spam.example.org/%(tenant)s", - dict(tenant="spam"), - dict(roles=["a", "b", "c"])) + result = policy._check_http(None, "http", + "//spam.example.org/%(tenant)s", + dict(tenant="spam"), + dict(roles=["a", "b", "c"])) self.assertEqual(result, True) - self.assertEqual(self.url, "//spam.example.org/spam") + self.assertEqual(self.url, "http://spam.example.org/spam") self.assertEqual(self.decode_post_data(), dict( target=dict(tenant="spam"), credentials=dict(roles=["a", "b", "c"]))) |
