summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenry Nash <henryn@linux.vnet.ibm.com>2013-02-18 10:29:43 +0000
committerHenry Nash <henryn@linux.vnet.ibm.com>2013-02-21 00:57:04 +0000
commit329aeca9f9db4badc82d72907e7891c7d2de2f4b (patch)
treec63466d6d94ded7c7ecae6dec7b192c6ed279b6a
parent9f812939d4b05384b0a7d48e6b916baeca0477dc (diff)
Pass query filter attributes to policy engine
With the v3 api, there will be cases when a cloud provider will want to be able to protect apis by matching items in the query filter string. A classic case would be: GET /users?domain_id=mydomain The change augments the v3 controller protection wrapper with one that will also pass in filter parameters. Since this filter list also equates to the filter_by_attribute code that the subsequent api call will make, the filterprotection wrapper passes the filter list into the api call, allowing the code body to not have to re-specify the same list. This also has the consequency of fixing all the missing filter_by_attribute statements in the current code base. Some tests cannot yet be run due to dependency on completion of v3/auth Fixes Bug #1126048 Fixes Bug #1101240 Change-Id: Ibd9867f6eed585414671bbab774df95b8acdf6a5
-rw-r--r--keystone/catalog/controllers.py15
-rw-r--r--keystone/common/controller.py136
-rw-r--r--keystone/identity/controllers.py50
-rw-r--r--keystone/policy/controllers.py7
-rw-r--r--tests/test_v3.py19
-rw-r--r--tests/test_v3_identity.py16
-rw-r--r--tests/test_v3_protection.py139
7 files changed, 306 insertions, 76 deletions
diff --git a/keystone/catalog/controllers.py b/keystone/catalog/controllers.py
index 78f1c9e1..24482fe7 100644
--- a/keystone/catalog/controllers.py
+++ b/keystone/catalog/controllers.py
@@ -131,11 +131,10 @@ class ServiceV3(controller.V3Controller):
ref = self.catalog_api.create_service(context, ref['id'], ref)
return ServiceV3.wrap_member(context, ref)
- @controller.protected
- def list_services(self, context):
+ @controller.filterprotected('type')
+ def list_services(self, context, filters):
refs = self.catalog_api.list_services(context)
- refs = self._filter_by_attribute(context, refs, 'type')
- return ServiceV3.wrap_collection(context, refs)
+ return ServiceV3.wrap_collection(context, refs, filters)
@controller.protected
def get_service(self, context, service_id):
@@ -169,12 +168,10 @@ class EndpointV3(controller.V3Controller):
ref = self.catalog_api.create_endpoint(context, ref['id'], ref)
return EndpointV3.wrap_member(context, ref)
- @controller.protected
- def list_endpoints(self, context):
+ @controller.filterprotected('interface', 'service_id')
+ def list_endpoints(self, context, filters):
refs = self.catalog_api.list_endpoints(context)
- refs = self._filter_by_attribute(context, refs, 'service_id')
- refs = self._filter_by_attribute(context, refs, 'interface')
- return EndpointV3.wrap_collection(context, refs)
+ return EndpointV3.wrap_collection(context, refs, filters)
@controller.protected
def get_endpoint(self, context, endpoint_id):
diff --git a/keystone/common/controller.py b/keystone/common/controller.py
index f4b8804a..6377692e 100644
--- a/keystone/common/controller.py
+++ b/keystone/common/controller.py
@@ -13,6 +13,44 @@ CONF = config.CONF
DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
+def _build_policy_check_credentials(self, action, context, kwargs):
+
+ LOG.debug(_('RBAC: Authorizing %s(%s)') % (
+ action,
+ ', '.join(['%s=%s' % (k, kwargs[k]) for k in kwargs])))
+
+ try:
+ token_ref = self.token_api.get_token(
+ context=context, token_id=context['token_id'])
+ except exception.TokenNotFound:
+ LOG.warning(_('RBAC: Invalid token'))
+ raise exception.Unauthorized()
+
+ creds = {}
+ token_data = token_ref['token_data']
+
+ try:
+ creds['user_id'] = token_data['user']['id']
+ except AttributeError:
+ LOG.warning(_('RBAC: Invalid user'))
+ raise exception.Unauthorized()
+
+ if 'project' in token_data:
+ creds['project_id'] = token_data['project']['id']
+ else:
+ LOG.debug(_('RBAC: Proceeding without project'))
+
+ if 'domain' in token_data:
+ creds['domain_id'] = token_data['domain']['id']
+
+ if 'roles' in token_data:
+ creds['roles'] = []
+ for role in token_data['roles']:
+ creds['roles'].append(role['name'])
+
+ return creds
+
+
def protected(f):
"""Wraps API calls with role based access controls (RBAC)."""
@@ -20,43 +58,65 @@ def protected(f):
def wrapper(self, context, **kwargs):
if not context['is_admin']:
action = 'identity:%s' % f.__name__
+ creds = _build_policy_check_credentials(self, action,
+ context, kwargs)
+ # Simply use the passed kwargs as the target dict, which
+ # would typically include the prime key of a get/update/delete
+ # call.
+ self.policy_api.enforce(context, creds, action, kwargs)
+ LOG.debug(_('RBAC: Authorization granted'))
+ else:
+ LOG.warning(_('RBAC: Bypassing authorization'))
- LOG.debug(_('RBAC: Authorizing %s(%s)') % (
- action,
- ', '.join(['%s=%s' % (k, kwargs[k]) for k in kwargs])))
+ return f(self, context, **kwargs)
+ return wrapper
- try:
- token_ref = self.token_api.get_token(
- context=context, token_id=context['token_id'])
- except exception.TokenNotFound:
- LOG.warning(_('RBAC: Invalid token'))
- raise exception.Unauthorized()
- creds = token_ref.get('metadata', {}).copy()
+def filterprotected(*filters):
+ """Wraps filtered API calls with role based access controls (RBAC)."""
- try:
- creds['user_id'] = token_ref['user'].get('id')
- except AttributeError:
- LOG.warning(_('RBAC: Invalid user'))
- raise exception.Unauthorized()
+ def _filterprotected(f):
+ @functools.wraps(f)
+ def wrapper(self, context, **kwargs):
+ if not context['is_admin']:
+ action = 'identity:%s' % f.__name__
+ creds = _build_policy_check_credentials(self, action,
+ context, kwargs)
+ # Now, build the target dict for policy check. We include:
+ #
+ # - Any query filter parameters
+ # - Data from the main url (which will be in the kwargs
+ # parameter) and would typically include the prime key
+ # of a get/update/delete call
+ #
+ # TODO(henry-nash) do we need to put the whole object
+ # in, which is part of kwargs? I kept this in as it was part
+ # of the previous implementation, but without a specific key
+ # reference in the target I don't see how it can be used.
- try:
- creds['tenant_id'] = token_ref['tenant'].get('id')
- except AttributeError:
- LOG.debug(_('RBAC: Proceeding without tenant'))
+ # First any query filter parameters
+ target = dict()
+ if len(filters) > 0:
+ for filter in filters:
+ if filter in context['query_string']:
+ target[filter] = context['query_string'][filter]
- # NOTE(vish): this is pretty inefficient
- creds['roles'] = [self.identity_api.get_role(context, role)['name']
- for role in creds.get('roles', [])]
+ LOG.debug(_('RBAC: Adding query filter params (%s)') % (
+ ', '.join(['%s=%s' % (filter, target[filter])
+ for filter in target])))
- self.policy_api.enforce(context, creds, action, kwargs)
+ # Now any formal url parameters
+ for key in kwargs:
+ target[key] = kwargs[key]
- LOG.debug(_('RBAC: Authorization granted'))
- else:
- LOG.warning(_('RBAC: Bypassing authorization'))
+ self.policy_api.enforce(context, creds, action, target)
- return f(self, context, **kwargs)
- return wrapper
+ LOG.debug(_('RBAC: Authorization granted'))
+ else:
+ LOG.warning(_('RBAC: Bypassing authorization'))
+ return f(self, context, filters, **kwargs)
+ return wrapper
+ return _filterprotected
@dependency.requires('identity_api', 'policy_api', 'token_api')
@@ -124,7 +184,10 @@ class V3Controller(V2Controller):
return {cls.member_name: ref}
@classmethod
- def wrap_collection(cls, context, refs):
+ def wrap_collection(cls, context, refs, filters=[]):
+ for f in filters:
+ refs = cls.filter_by_attribute(context, refs, f)
+
refs = cls.paginate(context, refs)
for ref in refs:
@@ -147,6 +210,14 @@ class V3Controller(V2Controller):
per_page = context['query_string'].get('per_page', 30)
return refs[per_page * (page - 1):per_page * page]
+ @classmethod
+ def filter_by_attribute(cls, context, refs, attr):
+ """Filters a list of references by query string value."""
+ if attr in context['query_string']:
+ value = context['query_string'][attr]
+ return [r for r in refs if r[attr] == value]
+ return refs
+
def _require_matching_id(self, value, ref):
"""Ensures the value matches the reference's ID, if any."""
if 'id' in ref and ref['id'] != value:
@@ -158,13 +229,6 @@ class V3Controller(V2Controller):
ref['id'] = uuid.uuid4().hex
return ref
- def _filter_by_attribute(self, context, refs, attr):
- """Filters a list of references by query string value."""
- if attr in context['query_string']:
- value = context['query_string'][attr]
- return [r for r in refs if r[attr] == value]
- return refs
-
def _normalize_domain_id(self, context, ref):
"""Fill in domain_id if not specified in a v3 call."""
diff --git a/keystone/identity/controllers.py b/keystone/identity/controllers.py
index ce1f54bc..e5f30e56 100644
--- a/keystone/identity/controllers.py
+++ b/keystone/identity/controllers.py
@@ -410,11 +410,10 @@ class DomainV3(controller.V3Controller):
ref = self.identity_api.create_domain(context, ref['id'], ref)
return DomainV3.wrap_member(context, ref)
- @controller.protected
- def list_domains(self, context):
+ @controller.filterprotected('enabled', 'name')
+ def list_domains(self, context, filters):
refs = self.identity_api.list_domains(context)
- refs = self._filter_by_attribute(context, refs, 'name')
- return DomainV3.wrap_collection(context, refs)
+ return DomainV3.wrap_collection(context, refs, filters)
@controller.protected
def get_domain(self, context, domain_id):
@@ -490,15 +489,15 @@ class ProjectV3(controller.V3Controller):
ref = self.identity_api.create_project(context, ref['id'], ref)
return ProjectV3.wrap_member(context, ref)
- @controller.protected
- def list_projects(self, context):
+ @controller.filterprotected('domain_id', 'enabled', 'name')
+ def list_projects(self, context, filters):
refs = self.identity_api.list_projects(context)
- return ProjectV3.wrap_collection(context, refs)
+ return ProjectV3.wrap_collection(context, refs, filters)
- @controller.protected
- def list_user_projects(self, context, user_id):
+ @controller.filterprotected('enabled', 'name')
+ def list_user_projects(self, context, filters, user_id):
refs = self.identity_api.list_user_projects(context, user_id)
- return ProjectV3.wrap_collection(context, refs)
+ return ProjectV3.wrap_collection(context, refs, filters)
@controller.protected
def get_project(self, context, project_id):
@@ -528,15 +527,15 @@ class UserV3(controller.V3Controller):
ref = self.identity_api.create_user(context, ref['id'], ref)
return UserV3.wrap_member(context, ref)
- @controller.protected
- def list_users(self, context):
+ @controller.filterprotected('domain_id', 'email', 'enabled', 'name')
+ def list_users(self, context, filters):
refs = self.identity_api.list_users(context)
- return UserV3.wrap_collection(context, refs)
+ return UserV3.wrap_collection(context, refs, filters)
- @controller.protected
- def list_users_in_group(self, context, group_id):
+ @controller.filterprotected('domain_id', 'email', 'enabled', 'name')
+ def list_users_in_group(self, context, filters, group_id):
refs = self.identity_api.list_users_in_group(context, group_id)
- return UserV3.wrap_collection(context, refs)
+ return UserV3.wrap_collection(context, refs, filters)
@controller.protected
def get_user(self, context, user_id):
@@ -546,7 +545,6 @@ class UserV3(controller.V3Controller):
@controller.protected
def update_user(self, context, user_id, user):
self._require_matching_id(user_id, user)
-
ref = self.identity_api.update_user(context, user_id, user)
if user.get('password') or not user.get('enabled', True):
@@ -588,15 +586,15 @@ class GroupV3(controller.V3Controller):
ref = self.identity_api.create_group(context, ref['id'], ref)
return GroupV3.wrap_member(context, ref)
- @controller.protected
- def list_groups(self, context):
+ @controller.filterprotected('domain_id', 'name')
+ def list_groups(self, context, filters):
refs = self.identity_api.list_groups(context)
- return GroupV3.wrap_collection(context, refs)
+ return GroupV3.wrap_collection(context, refs, filters)
- @controller.protected
- def list_groups_for_user(self, context, user_id):
+ @controller.filterprotected('name')
+ def list_groups_for_user(self, context, filters, user_id):
refs = self.identity_api.list_groups_for_user(context, user_id)
- return GroupV3.wrap_collection(context, refs)
+ return GroupV3.wrap_collection(context, refs, filters)
@controller.protected
def get_group(self, context, group_id):
@@ -660,10 +658,10 @@ class RoleV3(controller.V3Controller):
ref = self.identity_api.create_role(context, ref['id'], ref)
return RoleV3.wrap_member(context, ref)
- @controller.protected
- def list_roles(self, context):
+ @controller.filterprotected('name')
+ def list_roles(self, context, filters):
refs = self.identity_api.list_roles(context)
- return RoleV3.wrap_collection(context, refs)
+ return RoleV3.wrap_collection(context, refs, filters)
@controller.protected
def get_role(self, context, role_id):
diff --git a/keystone/policy/controllers.py b/keystone/policy/controllers.py
index ee2e4ee3..9fed6cf8 100644
--- a/keystone/policy/controllers.py
+++ b/keystone/policy/controllers.py
@@ -30,11 +30,10 @@ class PolicyV3(controller.V3Controller):
ref = self.policy_api.create_policy(context, ref['id'], ref)
return PolicyV3.wrap_member(context, ref)
- @controller.protected
- def list_policies(self, context):
+ @controller.filterprotected('type')
+ def list_policies(self, context, filters):
refs = self.policy_api.list_policies(context)
- refs = self._filter_by_attribute(context, refs, 'type')
- return PolicyV3.wrap_collection(context, refs)
+ return PolicyV3.wrap_collection(context, refs, filters)
@controller.protected
def get_policy(self, context, policy_id):
diff --git a/tests/test_v3.py b/tests/test_v3.py
index e2d367bb..ef1a1a6c 100644
--- a/tests/test_v3.py
+++ b/tests/test_v3.py
@@ -148,11 +148,28 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
})
return r.getheader('X-Subject-Token')
+ def get_requested_token(self, auth):
+ """Request the specific token we want."""
+
+ r = self.admin_request(
+ method='POST',
+ path='/v3/auth/tokens',
+ body=auth)
+ return r.getheader('X-Subject-Token')
+
def v3_request(self, path, **kwargs):
+ # Check if the caller has passed in auth details for
+ # use in requesting the token
+ auth = kwargs.get('auth', None)
+ if auth:
+ kwargs.pop('auth')
+ token = self.get_requested_token(auth)
+ else:
+ token = self.get_scoped_token()
path = '/v3' + path
return self.admin_request(
path=path,
- token=self.get_scoped_token(),
+ token=token,
**kwargs)
def get(self, path, **kwargs):
diff --git a/tests/test_v3_identity.py b/tests/test_v3_identity.py
index 77c5c898..47e50281 100644
--- a/tests/test_v3_identity.py
+++ b/tests/test_v3_identity.py
@@ -1,3 +1,19 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
import uuid
import test_v3
diff --git a/tests/test_v3_protection.py b/tests/test_v3_protection.py
new file mode 100644
index 00000000..b211af10
--- /dev/null
+++ b/tests/test_v3_protection.py
@@ -0,0 +1,139 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack LLC
+# Copyright 2013 IBM
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import tempfile
+import uuid
+
+import nose.exc
+
+from keystone.policy.backends import rules
+
+import test_v3
+
+
+class IdentityTestProtectedCase(test_v3.RestfulTestCase):
+ """Test policy protection of a sample of v3 identity apis"""
+
+ def setUp(self):
+ super(IdentityTestProtectedCase, self).setUp()
+ # Start by creating a couple of domains
+ self.domainA = self.new_domain_ref()
+ domainA_ref = self.identity_api.create_domain(self.domainA['id'],
+ self.domainA)
+
+ self.domainB = self.new_domain_ref()
+ domainB_ref = self.identity_api.create_domain(self.domainB['id'],
+ self.domainB)
+
+ # Now create some users, one in domainA and two of them in domainB
+ self.user1 = self.new_user_ref(
+ domain_id=self.domainA['id'])
+ self.user1['password'] = uuid.uuid4().hex
+ user_ref = self.identity_api.create_user(self.user1['id'], self.user1)
+
+ self.user2 = self.new_user_ref(
+ domain_id=self.domainB['id'])
+ self.user2['password'] = uuid.uuid4().hex
+ user_ref = self.identity_api.create_user(self.user2['id'], self.user2)
+
+ self.user3 = self.new_user_ref(
+ domain_id=self.domainB['id'])
+ self.user3['password'] = uuid.uuid4().hex
+ user_ref = self.identity_api.create_user(self.user3['id'], self.user3)
+
+ self.project = self.new_project_ref(
+ domain_id=self.domainA['id'])
+ project_ref = self.identity_api.create_project(self.project['id'],
+ self.project)
+
+ self.role = self.new_role_ref()
+ self.identity_api.create_role(self.role['id'], self.role)
+ self.identity_api.add_role_to_user_and_project(self.user1['id'],
+ self.project['id'],
+ self.role['id'])
+ self.identity_api.create_grant(self.role['id'],
+ user_id=self.user1['id'],
+ domain_id=self.domainA['id'])
+
+ # Initialize the policy engine and allow us to write to a temp
+ # file in each test to create the policies
+ rules.reset()
+ _unused, self.tmpfilename = tempfile.mkstemp()
+ self.opt(policy_file=self.tmpfilename)
+
+ # A default auth request we can use - un-scoped user token
+ self.auth = {}
+ self.auth['authentication'] = {'methods': []}
+ self.auth['authentication']['methods'].append('password')
+ self.auth['authentication']['password'] = {'user': {}}
+ self.auth['authentication']['password']['user']['id'] = (
+ self.user1['id'])
+ self.auth['authentication']['password']['user']['password'] = (
+ self.user1['password'])
+
+ def _get_id_list_from_ref_list(self, ref_list):
+ result_list = []
+ for x in ref_list:
+ result_list.append(x['id'])
+ return result_list
+
+ def test_list_users_unprotected(self):
+ """GET /users (unprotected)"""
+
+ # Make sure we get all the users back if no protection
+ # or filtering
+ with open(self.tmpfilename, "w") as policyfile:
+ policyfile.write("""{"identity:list_users": []}""")
+ r = self.get('/users', auth=self.auth)
+ id_list = self._get_id_list_from_ref_list(r.body.get('users'))
+ self.assertIn(self.user1['id'], id_list)
+ self.assertIn(self.user2['id'], id_list)
+ self.assertIn(self.user3['id'], id_list)
+
+ def test_list_users_filtered_by_domain(self):
+ """GET /users?domain_id=mydomain """
+
+ # Using no protection, make sure filtering works
+ with open(self.tmpfilename, "w") as policyfile:
+ policyfile.write("""{"identity:list_users": []}""")
+ url_by_name = '/users?domain_id=%s' % self.domainB['id']
+ r = self.get(url_by_name, auth=self.auth)
+ # We should get back two users, those in DomainB
+ id_list = self._get_id_list_from_ref_list(r.body.get('users'))
+ self.assertIn(self.user2['id'], id_list)
+ self.assertIn(self.user3['id'], id_list)
+
+ def test_list_users_protected_by_domain(self):
+ """GET /users?domain_id=mydomain (protected)"""
+
+ raise nose.exc.SkipTest('Blocked by incomplete '
+ 'domain scoping in v3/auth')
+ # Update policy to protect by domain, and then use a domain
+ # scoped token
+ new_policy = """{"identity:list_users": ["domain_id:%(domain_id)s"]}"""
+ with open(self.tmpfilename, "w") as policyfile:
+ policyfile.write(new_policy)
+ self.auth['scope'] = {'domain': []}
+ self.auth['domain']['id'] = self.domainA['id']
+ url_by_name = '/users?domain_id=%s' % self.user1['domain_id']
+ r = self.get(url_by_name, auth=self.auth)
+ # We should only get back one user, the one in DomainA
+ id_list = self._get_id_list_from_ref_list(r.body.get('users'))
+ self.assertIn(self.user2['id'], id_list)
+
+ # TODO (henry-nash) Add some more tests to cover the various likely
+ # protection filters