From f3d2a462209a9f2dd3faa1c5ca271f304eaa16d5 Mon Sep 17 00:00:00 2001 From: Adam Young Date: Mon, 25 Feb 2013 14:53:03 -0500 Subject: flatten payload for policy allows the policy rules to run over a JSON payload. Nestes values en up in dotted notation Change-Id: I9a2ec870c79369d308a23cd742aaeda25400f33a --- keystone/common/controller.py | 85 +++++++++++++++++++++++++++++-------------- tests/test_v3_protection.py | 20 ++++++---- 2 files changed, 70 insertions(+), 35 deletions(-) diff --git a/keystone/common/controller.py b/keystone/common/controller.py index 6377692e..4d8c5ddc 100644 --- a/keystone/common/controller.py +++ b/keystone/common/controller.py @@ -1,3 +1,4 @@ +import collections import functools import uuid @@ -27,46 +28,78 @@ def _build_policy_check_credentials(self, action, context, kwargs): raise exception.Unauthorized() creds = {} - token_data = token_ref['token_data'] + if 'token_data' in token_ref: + #V3 Tokens + token_data = token_ref['token_data'] + try: + creds['user_id'] = token_data['user']['id'] + except AttributeError: + LOG.warning(_('RBAC: Invalid user')) + raise exception.Unauthorized() + + if 'project' in token_data: + creds['project_id'] = token_data['project']['id'] + else: + LOG.debug(_('RBAC: Proceeding without project')) - try: - creds['user_id'] = token_data['user']['id'] - except AttributeError: - LOG.warning(_('RBAC: Invalid user')) - raise exception.Unauthorized() + if 'domain' in token_data: + creds['domain_id'] = token_data['domain']['id'] - if 'project' in token_data: - creds['project_id'] = token_data['project']['id'] + if 'roles' in token_data: + creds['roles'] = [] + for role in token_data['roles']: + creds['roles'].append(role['name']) else: - LOG.debug(_('RBAC: Proceeding without project')) + #v2 Tokens + creds = token_ref.get('metadata', {}).copy() + try: + creds['user_id'] = token_ref['user'].get('id') + except AttributeError: + LOG.warning(_('RBAC: Invalid user')) + raise exception.Unauthorized() + try: + creds['project_id'] = token_ref['tenant'].get('id') + except AttributeError: + LOG.debug(_('RBAC: Proceeding without tenant')) + # NOTE(vish): this is pretty inefficient + creds['roles'] = [self.identity_api.get_role(context, role)['name'] + for role in creds.get('roles', [])] + + return creds - if 'domain' in token_data: - creds['domain_id'] = token_data['domain']['id'] - if 'roles' in token_data: - creds['roles'] = [] - for role in token_data['roles']: - creds['roles'].append(role['name']) +def flatten(d, parent_key=''): + """Flatten a nested dictionary - return creds + Converts a dictionary with nested values to a single level flat + dictionary, with dotted notation for each key. + + """ + items = [] + for k, v in d.items(): + new_key = parent_key + '.' + k if parent_key else k + if isinstance(v, collections.MutableMapping): + items.extend(flatten(v, new_key).items()) + else: + items.append((new_key, v)) + return dict(items) def protected(f): """Wraps API calls with role based access controls (RBAC).""" - @functools.wraps(f) def wrapper(self, context, **kwargs): - if not context['is_admin']: + if 'is_admin' in context and context['is_admin']: + LOG.warning(_('RBAC: Bypassing authorization')) + else: action = 'identity:%s' % f.__name__ creds = _build_policy_check_credentials(self, action, context, kwargs) # Simply use the passed kwargs as the target dict, which # would typically include the prime key of a get/update/delete # call. - self.policy_api.enforce(context, creds, action, kwargs) + self.policy_api.enforce(context, creds, action, flatten(kwargs)) LOG.debug(_('RBAC: Authorization granted')) - else: - LOG.warning(_('RBAC: Bypassing authorization')) return f(self, context, **kwargs) return wrapper @@ -89,11 +122,6 @@ def filterprotected(*filters): # parameter) and would typically include the prime key # of a get/update/delete call # - # TODO(henry-nash) do we need to put the whole object - # in, which is part of kwargs? I kept this in as it was part - # of the previous implementation, but without a specific key - # reference in the target I don't see how it can be used. - # First any query filter parameters target = dict() if len(filters) > 0: @@ -109,7 +137,8 @@ def filterprotected(*filters): for key in kwargs: target[key] = kwargs[key] - self.policy_api.enforce(context, creds, action, target) + self.policy_api.enforce(context, creds, action, + flatten(target)) LOG.debug(_('RBAC: Authorization granted')) else: @@ -119,7 +148,7 @@ def filterprotected(*filters): return _filterprotected -@dependency.requires('identity_api', 'policy_api', 'token_api') +@dependency.requires('identity_api', 'policy_api', 'token_api', 'catalog_api') class V2Controller(wsgi.Application): """Base controller class for Identity API v2.""" diff --git a/tests/test_v3_protection.py b/tests/test_v3_protection.py index bda73415..999dfa86 100644 --- a/tests/test_v3_protection.py +++ b/tests/test_v3_protection.py @@ -15,6 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. +import json import tempfile import uuid @@ -130,20 +131,25 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase): def test_list_users_protected_by_domain(self): """GET /users?domain_id=mydomain (protected)""" - raise nose.exc.SkipTest('Blocked by incomplete ' - 'domain scoping in v3/auth') # Update policy to protect by domain, and then use a domain # scoped token new_policy = """{"identity:list_users": ["domain_id:%(domain_id)s"]}""" with open(self.tmpfilename, "w") as policyfile: policyfile.write(new_policy) - self.auth['scope'] = {'domain': []} - self.auth['domain']['id'] = self.domainA['id'] + self.auth['scope'] = {'domain': {'id': self.domainA['id']}} url_by_name = '/users?domain_id=%s' % self.user1['domain_id'] r = self.get(url_by_name, auth=self.auth) # We should only get back one user, the one in DomainA id_list = self._get_id_list_from_ref_list(r.body.get('users')) - self.assertIn(self.user2['id'], id_list) + self.assertIn(self.user1['id'], id_list) - # TODO (henry-nash) Add some more tests to cover the various likely - # protection filters + def test_get_user_protected_match_id(self): + """GET /users/{id} (match payload)""" + # Tests the flattening of the payload + policy = {"identity:get_user": [["user_id:%(user_id)s"]]} + with open(self.tmpfilename, "w") as policyfile: + policyfile.write(json.dumps(policy)) + url_by_name = '/users/%s' % self.user1['id'] + r = self.get(url_by_name, auth=self.auth) + body = r.body + self.assertEquals(self.user1['id'], body['user']['id']) -- cgit