summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKevin L. Mitchell <kevin.mitchell@rackspace.com>2012-07-30 19:23:28 -0500
committerKevin L. Mitchell <kevin.mitchell@rackspace.com>2012-07-30 19:23:54 -0500
commitdba9636e6df092d768d04cfaee839b76722e2393 (patch)
tree4f3d740773f21600be6d43344575790f56ef8d5b
parente0134fc3e2d7e59741b3643e2680f578cc9def41 (diff)
downloadoslo-dba9636e6df092d768d04cfaee839b76722e2393.tar.gz
oslo-dba9636e6df092d768d04cfaee839b76722e2393.tar.xz
oslo-dba9636e6df092d768d04cfaee839b76722e2393.zip
Use function registration for policy checks
The original policy framework allowed new policy checks to be created through inheritance. This is somewhat clunky and unnecessary in Python. This change refactors policy.py to allow new policy checks to be registered using an @register() decorator. One consequence is that HttpBrain is deprecated. Care has been taken to ensure backwards compatibility; deprecation warnings will be emitted for uses of HttpBrain or the inheritance- based checks. Change-Id: I3ccef5868906ef64a3c24d6c92533471e89682ba
-rw-r--r--openstack/common/policy.py152
-rw-r--r--tests/unit/test_policy.py169
2 files changed, 219 insertions, 102 deletions
diff --git a/openstack/common/policy.py b/openstack/common/policy.py
index 7d6eff9..fd1d7d3 100644
--- a/openstack/common/policy.py
+++ b/openstack/common/policy.py
@@ -131,6 +131,13 @@ def enforce(match_list, target_dict, credentials_dict, exc=None,
class Brain(object):
"""Implements policy checking."""
+
+ _checks = {}
+
+ @classmethod
+ def _register(cls, name, func):
+ cls._checks[name] = func
+
@classmethod
def load_json(cls, data, default_rule=None):
"""Init a brain using json instead of a rules dictionary."""
@@ -138,6 +145,11 @@ class Brain(object):
return cls(rules=rules_dict, default_rule=default_rule)
def __init__(self, rules=None, default_rule=None):
+ if self.__class__ != Brain:
+ LOG.warning(_("Inheritance-based rules are deprecated; use "
+ "the default brain instead of %s.") %
+ self.__class__.__name__)
+
self.rules = rules or {}
self.default_rule = default_rule
@@ -151,15 +163,24 @@ class Brain(object):
LOG.exception(_("Failed to understand rule %(match)r") % locals())
# If the rule is invalid, fail closed
return False
+
+ func = None
try:
- f = getattr(self, '_check_%s' % match_kind)
+ old_func = getattr(self, '_check_%s' % match_kind)
except AttributeError:
- if not self._check_generic(match, target_dict, cred_dict):
- return False
+ func = self._checks.get(match_kind, self._checks.get(None, None))
else:
- if not f(match_value, target_dict, cred_dict):
- return False
- return True
+ LOG.warning(_("Inheritance-based rules are deprecated; update "
+ "_check_%s") % match_kind)
+ func = (lambda brain, kind, value, target, cred:
+ old_func(value, target, cred))
+
+ if not func:
+ LOG.error(_("No handler for matches of kind %s") % match_kind)
+ # Fail closed
+ return False
+
+ return func(self, match_kind, match_value, target_dict, cred_dict)
def check(self, match_list, target_dict, cred_dict):
"""Checks authorization of some rules against credentials.
@@ -183,58 +204,97 @@ class Brain(object):
return True
return False
- def _check_rule(self, match, target_dict, cred_dict):
- """Recursively checks credentials based on the brains rules."""
- try:
- new_match_list = self.rules[match]
- except KeyError:
- if self.default_rule and match != self.default_rule:
- new_match_list = ('rule:%s' % self.default_rule,)
- else:
- return False
- return self.check(new_match_list, target_dict, cred_dict)
+class HttpBrain(Brain):
+ """A brain that can check external urls for policy.
- def _check_role(self, match, target_dict, cred_dict):
- """Check that there is a matching role in the cred dict."""
- return match.lower() in [x.lower() for x in cred_dict['roles']]
+ Posts json blobs for target and credentials.
- def _check_generic(self, match, target_dict, cred_dict):
- """Check an individual match.
+ Note that this brain is deprecated; the http check is registered
+ by default.
+ """
- Matches look like:
+ pass
- tenant:%(tenant_id)s
- role:compute:admin
- """
+def register(name, func=None):
+ """
+ Register a function as a policy check.
+
+ :param name: Gives the name of the check type, e.g., 'rule',
+ 'role', etc. If name is None, a default function
+ will be registered.
+ :param func: If given, provides the function to register. If not
+ given, returns a function taking one argument to
+ specify the function to register, allowing use as a
+ decorator.
+ """
- # TODO(termie): do dict inspection via dot syntax
- match = match % target_dict
- key, value = match.split(':', 1)
- if key in cred_dict:
- return value == cred_dict[key]
- return False
+ # Perform the actual decoration by registering the function.
+ # Returns the function for compliance with the decorator
+ # interface.
+ def decorator(func):
+ # Register the function
+ Brain._register(name, func)
+ return func
+
+ # If the function is given, do the registration
+ if func:
+ return decorator(func)
+
+ return decorator
+
+
+@register("rule")
+def _check_rule(brain, match_kind, match, target_dict, cred_dict):
+ """Recursively checks credentials based on the brains rules."""
+ try:
+ new_match_list = brain.rules[match]
+ except KeyError:
+ if brain.default_rule and match != brain.default_rule:
+ new_match_list = ('rule:%s' % brain.default_rule,)
+ else:
+ return False
+ return brain.check(new_match_list, target_dict, cred_dict)
-class HttpBrain(Brain):
- """A brain that can check external urls for policy.
- Posts json blobs for target and credentials.
+@register("role")
+def _check_role(brain, match_kind, match, target_dict, cred_dict):
+ """Check that there is a matching role in the cred dict."""
+ return match.lower() in [x.lower() for x in cred_dict['roles']]
+
+
+@register('http')
+def _check_http(brain, match_kind, match, target_dict, cred_dict):
+ """Check http: rules by calling to a remote server.
+
+ This example implementation simply verifies that the response is
+ exactly 'True'. A custom brain using response codes could easily
+ be implemented.
"""
+ url = 'http:' + (match % target_dict)
+ data = {'target': jsonutils.dumps(target_dict),
+ 'credentials': jsonutils.dumps(cred_dict)}
+ post_data = urllib.urlencode(data)
+ f = urllib2.urlopen(url, post_data)
+ return f.read() == "True"
- def _check_http(self, match, target_dict, cred_dict):
- """Check http: rules by calling to a remote server.
- This example implementation simply verifies that the response is
- exactly 'True'. A custom brain using response codes could easily
- be implemented.
+@register(None)
+def _check_generic(brain, match_kind, match, target_dict, cred_dict):
+ """Check an individual match.
- """
- url = match % target_dict
- data = {'target': jsonutils.dumps(target_dict),
- 'credentials': jsonutils.dumps(cred_dict)}
- post_data = urllib.urlencode(data)
- f = urllib2.urlopen(url, post_data)
- return f.read() == "True"
+ Matches look like:
+
+ tenant:%(tenant_id)s
+ role:compute:admin
+
+ """
+
+ # TODO(termie): do dict inspection via dot syntax
+ match = match % target_dict
+ if match_kind in cred_dict:
+ return match == cred_dict[match_kind]
+ return False
diff --git a/tests/unit/test_policy.py b/tests/unit/test_policy.py
index 449c8a9..e7f391f 100644
--- a/tests/unit/test_policy.py
+++ b/tests/unit/test_policy.py
@@ -157,12 +157,15 @@ class BrainTestCase(unittest.TestCase):
def test_check_with_specific(self):
self.spam_called = False
+ def check_spam(brain, kind, match, target_dict, cred_dict):
+ self.assertEqual(kind, "spam")
+ self.assertEqual(match, "check")
+ self.assertEqual(target_dict, "target")
+ self.assertEqual(cred_dict, "credentials")
+ self.spam_called = True
+
class TestBrain(policy.Brain):
- def _check_spam(inst, match, target_dict, cred_dict):
- self.assertEqual(match, "check")
- self.assertEqual(target_dict, "target")
- self.assertEqual(cred_dict, "credentials")
- self.spam_called = True
+ _checks = dict(spam=check_spam)
brain = TestBrain()
result = brain._check("spam:check", "target", "credentials")
@@ -172,18 +175,45 @@ class BrainTestCase(unittest.TestCase):
def test_check_with_generic(self):
self.generic_called = False
+ def check_generic(brain, kind, match, target_dict, cred_dict):
+ self.assertEqual(kind, "spam")
+ self.assertEqual(match, "check")
+ self.assertEqual(target_dict, "target")
+ self.assertEqual(cred_dict, "credentials")
+ self.generic_called = True
+
class TestBrain(policy.Brain):
- def _check_generic(inst, match, target_dict, cred_dict):
- self.assertEqual(match, "spam:check")
- self.assertEqual(target_dict, "target")
- self.assertEqual(cred_dict, "credentials")
- self.generic_called = True
+ _checks = {None: check_generic}
brain = TestBrain()
result = brain._check("spam:check", "target", "credentials")
self.assertEqual(self.generic_called, True)
+ def test_check_with_inheritance(self):
+ self.inherited_called = False
+
+ class TestBrain(policy.Brain):
+ _checks = {}
+
+ def _check_inherited(inst, match, target_dict, cred_dict):
+ self.assertEqual(match, "check")
+ self.assertEqual(target_dict, "target")
+ self.assertEqual(cred_dict, "credentials")
+ self.inherited_called = True
+
+ brain = TestBrain()
+ result = brain._check("inherited:check", "target", "credentials")
+
+ self.assertEqual(self.inherited_called, True)
+
+ def test_check_no_handler(self):
+ class TestBrain(policy.Brain):
+ _checks = {}
+
+ brain = TestBrain()
+ self.assertEqual(brain._check('spam:mer', 'target', 'cred'), False)
+
def test_check_empty(self):
class TestBrain(policy.Brain):
def _check(inst, match, target_dict, cred_dict):
@@ -274,6 +304,52 @@ class BrainTestCase(unittest.TestCase):
self.assertEqual(self.targets, ["target", "target", "target"])
self.assertEqual(self.creds, ["creds", "creds", "creds"])
+
+class CheckRegisterTestCase(unittest.TestCase):
+ def setUp(self):
+ self.brain_checks = policy.Brain._checks
+ policy.Brain._checks = {}
+
+ def tearDown(self):
+ policy.Brain._checks = self.brain_checks
+
+ def test_class_register(self):
+ policy.Brain._register('spam', 'func')
+ policy.Brain._register('spammer', 'funcer')
+
+ self.assertEqual(policy.Brain._checks,
+ dict(spam='func', spammer='funcer'))
+
+ def test_register_func(self):
+ policy.register('spam', 'func')
+
+ self.assertEqual(policy.Brain._checks,
+ dict(spam='func'))
+
+ def test_register_decorator(self):
+ @policy.register('spam')
+ def test_func():
+ pass
+
+ self.assertEqual(policy.Brain._checks,
+ dict(spam=test_func))
+
+
+class CheckTestCase(unittest.TestCase):
+ def setUp(self):
+ self.urlopen_result = ""
+
+ def fake_urlopen(url, post_data):
+ self.url = url
+ self.post_data = post_data
+ return StringIO.StringIO(self.urlopen_result)
+
+ self.patcher = mock.patch.object(urllib2, "urlopen", fake_urlopen)
+ self.patcher.start()
+
+ def tearDown(self):
+ self.patcher.stop()
+
def stub__check_rule(self, rules=None, default_rule=None):
self.check_called = False
@@ -288,21 +364,21 @@ class BrainTestCase(unittest.TestCase):
def test_rule_no_rules_no_default(self):
brain = self.stub__check_rule()
- result = brain._check_rule("spam", "target", "creds")
+ result = policy._check_rule(brain, "rule", "spam", "target", "creds")
self.assertEqual(result, False)
self.assertEqual(self.check_called, False)
def test_rule_no_rules_default(self):
brain = self.stub__check_rule(default_rule="spam")
- result = brain._check_rule("spam", "target", "creds")
+ result = policy._check_rule(brain, "rule", "spam", "target", "creds")
self.assertEqual(result, False)
self.assertEqual(self.check_called, False)
def test_rule_no_rules_non_default(self):
brain = self.stub__check_rule(default_rule="spam")
- result = brain._check_rule("python", "target", "creds")
+ result = policy._check_rule(brain, "rule", "python", "target", "creds")
self.assertEqual(self.check_called, True)
self.assertEqual(result, ("rule:spam",))
@@ -311,7 +387,7 @@ class BrainTestCase(unittest.TestCase):
def test_rule_with_rules(self):
brain = self.stub__check_rule(rules=dict(spam=["hiho:ni"]))
- result = brain._check_rule("spam", "target", "creds")
+ result = policy._check_rule(brain, "rule", "spam", "target", "creds")
self.assertEqual(self.check_called, True)
self.assertEqual(result, ["hiho:ni"])
@@ -319,57 +395,38 @@ class BrainTestCase(unittest.TestCase):
self.assertEqual(self.cred, "creds")
def test_role_no_match(self):
- brain = policy.Brain()
- result = brain._check_role("SpAm", {}, dict(roles=["a", "b", "c"]))
+ result = policy._check_role(None, "role", "SpAm", {},
+ dict(roles=["a", "b", "c"]))
self.assertEqual(result, False)
def test_role_with_match(self):
- brain = policy.Brain()
- result = brain._check_role("SpAm", {}, dict(roles=["a", "b", "sPaM"]))
+ result = policy._check_role(None, "role", "SpAm", {},
+ dict(roles=["a", "b", "sPaM"]))
self.assertEqual(result, True)
def test_generic_no_key(self):
- brain = policy.Brain()
- result = brain._check_generic("tenant:%(tenant_id)s",
- dict(tenant_id="spam"),
- {})
+ result = policy._check_generic(None, "tenant", "%(tenant_id)s",
+ dict(tenant_id="spam"),
+ {})
self.assertEqual(result, False)
def test_generic_with_key_mismatch(self):
- brain = policy.Brain()
- result = brain._check_generic("tenant:%(tenant_id)s",
- dict(tenant_id="spam"),
- dict(tenant="nospam"))
+ result = policy._check_generic(None, "tenant", "%(tenant_id)s",
+ dict(tenant_id="spam"),
+ dict(tenant="nospam"))
self.assertEqual(result, False)
def test_generic_with_key_match(self):
- brain = policy.Brain()
- result = brain._check_generic("tenant:%(tenant_id)s",
- dict(tenant_id="spam"),
- dict(tenant="spam"))
+ result = policy._check_generic(None, "tenant", "%(tenant_id)s",
+ dict(tenant_id="spam"),
+ dict(tenant="spam"))
self.assertEqual(result, True)
-
-class HttpBrainTestCase(unittest.TestCase):
- def setUp(self):
- self.urlopen_result = ""
-
- def fake_urlopen(url, post_data):
- self.url = url
- self.post_data = post_data
- return StringIO.StringIO(self.urlopen_result)
-
- self.patcher = mock.patch.object(urllib2, "urlopen", fake_urlopen)
- self.patcher.start()
-
- def tearDown(self):
- self.patcher.stop()
-
def decode_post_data(self):
result = {}
for item in self.post_data.split('&'):
@@ -379,26 +436,26 @@ class HttpBrainTestCase(unittest.TestCase):
return result
def test_http_false(self):
- brain = policy.HttpBrain()
- result = brain._check_http("//spam.example.org/%(tenant)s",
- dict(tenant="spam"),
- dict(roles=["a", "b", "c"]))
+ result = policy._check_http(None, "http",
+ "//spam.example.org/%(tenant)s",
+ dict(tenant="spam"),
+ dict(roles=["a", "b", "c"]))
self.assertEqual(result, False)
- self.assertEqual(self.url, "//spam.example.org/spam")
+ self.assertEqual(self.url, "http://spam.example.org/spam")
self.assertEqual(self.decode_post_data(), dict(
target=dict(tenant="spam"),
credentials=dict(roles=["a", "b", "c"])))
def test_http_true(self):
self.urlopen_result = "True"
- brain = policy.HttpBrain()
- result = brain._check_http("//spam.example.org/%(tenant)s",
- dict(tenant="spam"),
- dict(roles=["a", "b", "c"]))
+ result = policy._check_http(None, "http",
+ "//spam.example.org/%(tenant)s",
+ dict(tenant="spam"),
+ dict(roles=["a", "b", "c"]))
self.assertEqual(result, True)
- self.assertEqual(self.url, "//spam.example.org/spam")
+ self.assertEqual(self.url, "http://spam.example.org/spam")
self.assertEqual(self.decode_post_data(), dict(
target=dict(tenant="spam"),
credentials=dict(roles=["a", "b", "c"])))