summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--openstack/common/policy.py152
-rw-r--r--tests/unit/test_policy.py169
2 files changed, 219 insertions, 102 deletions
diff --git a/openstack/common/policy.py b/openstack/common/policy.py
index 7d6eff9..fd1d7d3 100644
--- a/openstack/common/policy.py
+++ b/openstack/common/policy.py
@@ -131,6 +131,13 @@ def enforce(match_list, target_dict, credentials_dict, exc=None,
class Brain(object):
"""Implements policy checking."""
+
+ _checks = {}
+
+ @classmethod
+ def _register(cls, name, func):
+ cls._checks[name] = func
+
@classmethod
def load_json(cls, data, default_rule=None):
"""Init a brain using json instead of a rules dictionary."""
@@ -138,6 +145,11 @@ class Brain(object):
return cls(rules=rules_dict, default_rule=default_rule)
def __init__(self, rules=None, default_rule=None):
+ if self.__class__ != Brain:
+ LOG.warning(_("Inheritance-based rules are deprecated; use "
+ "the default brain instead of %s.") %
+ self.__class__.__name__)
+
self.rules = rules or {}
self.default_rule = default_rule
@@ -151,15 +163,24 @@ class Brain(object):
LOG.exception(_("Failed to understand rule %(match)r") % locals())
# If the rule is invalid, fail closed
return False
+
+ func = None
try:
- f = getattr(self, '_check_%s' % match_kind)
+ old_func = getattr(self, '_check_%s' % match_kind)
except AttributeError:
- if not self._check_generic(match, target_dict, cred_dict):
- return False
+ func = self._checks.get(match_kind, self._checks.get(None, None))
else:
- if not f(match_value, target_dict, cred_dict):
- return False
- return True
+ LOG.warning(_("Inheritance-based rules are deprecated; update "
+ "_check_%s") % match_kind)
+ func = (lambda brain, kind, value, target, cred:
+ old_func(value, target, cred))
+
+ if not func:
+ LOG.error(_("No handler for matches of kind %s") % match_kind)
+ # Fail closed
+ return False
+
+ return func(self, match_kind, match_value, target_dict, cred_dict)
def check(self, match_list, target_dict, cred_dict):
"""Checks authorization of some rules against credentials.
@@ -183,58 +204,97 @@ class Brain(object):
return True
return False
- def _check_rule(self, match, target_dict, cred_dict):
- """Recursively checks credentials based on the brains rules."""
- try:
- new_match_list = self.rules[match]
- except KeyError:
- if self.default_rule and match != self.default_rule:
- new_match_list = ('rule:%s' % self.default_rule,)
- else:
- return False
- return self.check(new_match_list, target_dict, cred_dict)
+class HttpBrain(Brain):
+ """A brain that can check external urls for policy.
- def _check_role(self, match, target_dict, cred_dict):
- """Check that there is a matching role in the cred dict."""
- return match.lower() in [x.lower() for x in cred_dict['roles']]
+ Posts json blobs for target and credentials.
- def _check_generic(self, match, target_dict, cred_dict):
- """Check an individual match.
+ Note that this brain is deprecated; the http check is registered
+ by default.
+ """
- Matches look like:
+ pass
- tenant:%(tenant_id)s
- role:compute:admin
- """
+def register(name, func=None):
+ """
+ Register a function as a policy check.
+
+ :param name: Gives the name of the check type, e.g., 'rule',
+ 'role', etc. If name is None, a default function
+ will be registered.
+ :param func: If given, provides the function to register. If not
+ given, returns a function taking one argument to
+ specify the function to register, allowing use as a
+ decorator.
+ """
- # TODO(termie): do dict inspection via dot syntax
- match = match % target_dict
- key, value = match.split(':', 1)
- if key in cred_dict:
- return value == cred_dict[key]
- return False
+ # Perform the actual decoration by registering the function.
+ # Returns the function for compliance with the decorator
+ # interface.
+ def decorator(func):
+ # Register the function
+ Brain._register(name, func)
+ return func
+
+ # If the function is given, do the registration
+ if func:
+ return decorator(func)
+
+ return decorator
+
+
+@register("rule")
+def _check_rule(brain, match_kind, match, target_dict, cred_dict):
+ """Recursively checks credentials based on the brains rules."""
+ try:
+ new_match_list = brain.rules[match]
+ except KeyError:
+ if brain.default_rule and match != brain.default_rule:
+ new_match_list = ('rule:%s' % brain.default_rule,)
+ else:
+ return False
+ return brain.check(new_match_list, target_dict, cred_dict)
-class HttpBrain(Brain):
- """A brain that can check external urls for policy.
- Posts json blobs for target and credentials.
+@register("role")
+def _check_role(brain, match_kind, match, target_dict, cred_dict):
+ """Check that there is a matching role in the cred dict."""
+ return match.lower() in [x.lower() for x in cred_dict['roles']]
+
+
+@register('http')
+def _check_http(brain, match_kind, match, target_dict, cred_dict):
+ """Check http: rules by calling to a remote server.
+
+ This example implementation simply verifies that the response is
+ exactly 'True'. A custom brain using response codes could easily
+ be implemented.
"""
+ url = 'http:' + (match % target_dict)
+ data = {'target': jsonutils.dumps(target_dict),
+ 'credentials': jsonutils.dumps(cred_dict)}
+ post_data = urllib.urlencode(data)
+ f = urllib2.urlopen(url, post_data)
+ return f.read() == "True"
- def _check_http(self, match, target_dict, cred_dict):
- """Check http: rules by calling to a remote server.
- This example implementation simply verifies that the response is
- exactly 'True'. A custom brain using response codes could easily
- be implemented.
+@register(None)
+def _check_generic(brain, match_kind, match, target_dict, cred_dict):
+ """Check an individual match.
- """
- url = match % target_dict
- data = {'target': jsonutils.dumps(target_dict),
- 'credentials': jsonutils.dumps(cred_dict)}
- post_data = urllib.urlencode(data)
- f = urllib2.urlopen(url, post_data)
- return f.read() == "True"
+ Matches look like:
+
+ tenant:%(tenant_id)s
+ role:compute:admin
+
+ """
+
+ # TODO(termie): do dict inspection via dot syntax
+ match = match % target_dict
+ if match_kind in cred_dict:
+ return match == cred_dict[match_kind]
+ return False
diff --git a/tests/unit/test_policy.py b/tests/unit/test_policy.py
index 449c8a9..e7f391f 100644
--- a/tests/unit/test_policy.py
+++ b/tests/unit/test_policy.py
@@ -157,12 +157,15 @@ class BrainTestCase(unittest.TestCase):
def test_check_with_specific(self):
self.spam_called = False
+ def check_spam(brain, kind, match, target_dict, cred_dict):
+ self.assertEqual(kind, "spam")
+ self.assertEqual(match, "check")
+ self.assertEqual(target_dict, "target")
+ self.assertEqual(cred_dict, "credentials")
+ self.spam_called = True
+
class TestBrain(policy.Brain):
- def _check_spam(inst, match, target_dict, cred_dict):
- self.assertEqual(match, "check")
- self.assertEqual(target_dict, "target")
- self.assertEqual(cred_dict, "credentials")
- self.spam_called = True
+ _checks = dict(spam=check_spam)
brain = TestBrain()
result = brain._check("spam:check", "target", "credentials")
@@ -172,18 +175,45 @@ class BrainTestCase(unittest.TestCase):
def test_check_with_generic(self):
self.generic_called = False
+ def check_generic(brain, kind, match, target_dict, cred_dict):
+ self.assertEqual(kind, "spam")
+ self.assertEqual(match, "check")
+ self.assertEqual(target_dict, "target")
+ self.assertEqual(cred_dict, "credentials")
+ self.generic_called = True
+
class TestBrain(policy.Brain):
- def _check_generic(inst, match, target_dict, cred_dict):
- self.assertEqual(match, "spam:check")
- self.assertEqual(target_dict, "target")
- self.assertEqual(cred_dict, "credentials")
- self.generic_called = True
+ _checks = {None: check_generic}
brain = TestBrain()
result = brain._check("spam:check", "target", "credentials")
self.assertEqual(self.generic_called, True)
+ def test_check_with_inheritance(self):
+ self.inherited_called = False
+
+ class TestBrain(policy.Brain):
+ _checks = {}
+
+ def _check_inherited(inst, match, target_dict, cred_dict):
+ self.assertEqual(match, "check")
+ self.assertEqual(target_dict, "target")
+ self.assertEqual(cred_dict, "credentials")
+ self.inherited_called = True
+
+ brain = TestBrain()
+ result = brain._check("inherited:check", "target", "credentials")
+
+ self.assertEqual(self.inherited_called, True)
+
+ def test_check_no_handler(self):
+ class TestBrain(policy.Brain):
+ _checks = {}
+
+ brain = TestBrain()
+ self.assertEqual(brain._check('spam:mer', 'target', 'cred'), False)
+
def test_check_empty(self):
class TestBrain(policy.Brain):
def _check(inst, match, target_dict, cred_dict):
@@ -274,6 +304,52 @@ class BrainTestCase(unittest.TestCase):
self.assertEqual(self.targets, ["target", "target", "target"])
self.assertEqual(self.creds, ["creds", "creds", "creds"])
+
+class CheckRegisterTestCase(unittest.TestCase):
+ def setUp(self):
+ self.brain_checks = policy.Brain._checks
+ policy.Brain._checks = {}
+
+ def tearDown(self):
+ policy.Brain._checks = self.brain_checks
+
+ def test_class_register(self):
+ policy.Brain._register('spam', 'func')
+ policy.Brain._register('spammer', 'funcer')
+
+ self.assertEqual(policy.Brain._checks,
+ dict(spam='func', spammer='funcer'))
+
+ def test_register_func(self):
+ policy.register('spam', 'func')
+
+ self.assertEqual(policy.Brain._checks,
+ dict(spam='func'))
+
+ def test_register_decorator(self):
+ @policy.register('spam')
+ def test_func():
+ pass
+
+ self.assertEqual(policy.Brain._checks,
+ dict(spam=test_func))
+
+
+class CheckTestCase(unittest.TestCase):
+ def setUp(self):
+ self.urlopen_result = ""
+
+ def fake_urlopen(url, post_data):
+ self.url = url
+ self.post_data = post_data
+ return StringIO.StringIO(self.urlopen_result)
+
+ self.patcher = mock.patch.object(urllib2, "urlopen", fake_urlopen)
+ self.patcher.start()
+
+ def tearDown(self):
+ self.patcher.stop()
+
def stub__check_rule(self, rules=None, default_rule=None):
self.check_called = False
@@ -288,21 +364,21 @@ class BrainTestCase(unittest.TestCase):
def test_rule_no_rules_no_default(self):
brain = self.stub__check_rule()
- result = brain._check_rule("spam", "target", "creds")
+ result = policy._check_rule(brain, "rule", "spam", "target", "creds")
self.assertEqual(result, False)
self.assertEqual(self.check_called, False)
def test_rule_no_rules_default(self):
brain = self.stub__check_rule(default_rule="spam")
- result = brain._check_rule("spam", "target", "creds")
+ result = policy._check_rule(brain, "rule", "spam", "target", "creds")
self.assertEqual(result, False)
self.assertEqual(self.check_called, False)
def test_rule_no_rules_non_default(self):
brain = self.stub__check_rule(default_rule="spam")
- result = brain._check_rule("python", "target", "creds")
+ result = policy._check_rule(brain, "rule", "python", "target", "creds")
self.assertEqual(self.check_called, True)
self.assertEqual(result, ("rule:spam",))
@@ -311,7 +387,7 @@ class BrainTestCase(unittest.TestCase):
def test_rule_with_rules(self):
brain = self.stub__check_rule(rules=dict(spam=["hiho:ni"]))
- result = brain._check_rule("spam", "target", "creds")
+ result = policy._check_rule(brain, "rule", "spam", "target", "creds")
self.assertEqual(self.check_called, True)
self.assertEqual(result, ["hiho:ni"])
@@ -319,57 +395,38 @@ class BrainTestCase(unittest.TestCase):
self.assertEqual(self.cred, "creds")
def test_role_no_match(self):
- brain = policy.Brain()
- result = brain._check_role("SpAm", {}, dict(roles=["a", "b", "c"]))
+ result = policy._check_role(None, "role", "SpAm", {},
+ dict(roles=["a", "b", "c"]))
self.assertEqual(result, False)
def test_role_with_match(self):
- brain = policy.Brain()
- result = brain._check_role("SpAm", {}, dict(roles=["a", "b", "sPaM"]))
+ result = policy._check_role(None, "role", "SpAm", {},
+ dict(roles=["a", "b", "sPaM"]))
self.assertEqual(result, True)
def test_generic_no_key(self):
- brain = policy.Brain()
- result = brain._check_generic("tenant:%(tenant_id)s",
- dict(tenant_id="spam"),
- {})
+ result = policy._check_generic(None, "tenant", "%(tenant_id)s",
+ dict(tenant_id="spam"),
+ {})
self.assertEqual(result, False)
def test_generic_with_key_mismatch(self):
- brain = policy.Brain()
- result = brain._check_generic("tenant:%(tenant_id)s",
- dict(tenant_id="spam"),
- dict(tenant="nospam"))
+ result = policy._check_generic(None, "tenant", "%(tenant_id)s",
+ dict(tenant_id="spam"),
+ dict(tenant="nospam"))
self.assertEqual(result, False)
def test_generic_with_key_match(self):
- brain = policy.Brain()
- result = brain._check_generic("tenant:%(tenant_id)s",
- dict(tenant_id="spam"),
- dict(tenant="spam"))
+ result = policy._check_generic(None, "tenant", "%(tenant_id)s",
+ dict(tenant_id="spam"),
+ dict(tenant="spam"))
self.assertEqual(result, True)
-
-class HttpBrainTestCase(unittest.TestCase):
- def setUp(self):
- self.urlopen_result = ""
-
- def fake_urlopen(url, post_data):
- self.url = url
- self.post_data = post_data
- return StringIO.StringIO(self.urlopen_result)
-
- self.patcher = mock.patch.object(urllib2, "urlopen", fake_urlopen)
- self.patcher.start()
-
- def tearDown(self):
- self.patcher.stop()
-
def decode_post_data(self):
result = {}
for item in self.post_data.split('&'):
@@ -379,26 +436,26 @@ class HttpBrainTestCase(unittest.TestCase):
return result
def test_http_false(self):
- brain = policy.HttpBrain()
- result = brain._check_http("//spam.example.org/%(tenant)s",
- dict(tenant="spam"),
- dict(roles=["a", "b", "c"]))
+ result = policy._check_http(None, "http",
+ "//spam.example.org/%(tenant)s",
+ dict(tenant="spam"),
+ dict(roles=["a", "b", "c"]))
self.assertEqual(result, False)
- self.assertEqual(self.url, "//spam.example.org/spam")
+ self.assertEqual(self.url, "http://spam.example.org/spam")
self.assertEqual(self.decode_post_data(), dict(
target=dict(tenant="spam"),
credentials=dict(roles=["a", "b", "c"])))
def test_http_true(self):
self.urlopen_result = "True"
- brain = policy.HttpBrain()
- result = brain._check_http("//spam.example.org/%(tenant)s",
- dict(tenant="spam"),
- dict(roles=["a", "b", "c"]))
+ result = policy._check_http(None, "http",
+ "//spam.example.org/%(tenant)s",
+ dict(tenant="spam"),
+ dict(roles=["a", "b", "c"]))
self.assertEqual(result, True)
- self.assertEqual(self.url, "//spam.example.org/spam")
+ self.assertEqual(self.url, "http://spam.example.org/spam")
self.assertEqual(self.decode_post_data(), dict(
target=dict(tenant="spam"),
credentials=dict(roles=["a", "b", "c"])))