summaryrefslogtreecommitdiffstats
path: root/tests/test_v3_protection.py
diff options
context:
space:
mode:
authorHenry Nash <henryn@linux.vnet.ibm.com>2013-02-18 10:29:43 +0000
committerHenry Nash <henryn@linux.vnet.ibm.com>2013-02-21 00:57:04 +0000
commit329aeca9f9db4badc82d72907e7891c7d2de2f4b (patch)
treec63466d6d94ded7c7ecae6dec7b192c6ed279b6a /tests/test_v3_protection.py
parent9f812939d4b05384b0a7d48e6b916baeca0477dc (diff)
downloadkeystone-329aeca9f9db4badc82d72907e7891c7d2de2f4b.tar.gz
keystone-329aeca9f9db4badc82d72907e7891c7d2de2f4b.tar.xz
keystone-329aeca9f9db4badc82d72907e7891c7d2de2f4b.zip
Pass query filter attributes to policy engine
With the v3 api, there will be cases when a cloud provider will want to be able to protect apis by matching items in the query filter string. A classic case would be: GET /users?domain_id=mydomain The change augments the v3 controller protection wrapper with one that will also pass in filter parameters. Since this filter list also equates to the filter_by_attribute code that the subsequent api call will make, the filterprotection wrapper passes the filter list into the api call, allowing the code body to not have to re-specify the same list. This also has the consequency of fixing all the missing filter_by_attribute statements in the current code base. Some tests cannot yet be run due to dependency on completion of v3/auth Fixes Bug #1126048 Fixes Bug #1101240 Change-Id: Ibd9867f6eed585414671bbab774df95b8acdf6a5
Diffstat (limited to 'tests/test_v3_protection.py')
-rw-r--r--tests/test_v3_protection.py139
1 files changed, 139 insertions, 0 deletions
diff --git a/tests/test_v3_protection.py b/tests/test_v3_protection.py
new file mode 100644
index 00000000..b211af10
--- /dev/null
+++ b/tests/test_v3_protection.py
@@ -0,0 +1,139 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack LLC
+# Copyright 2013 IBM
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import tempfile
+import uuid
+
+import nose.exc
+
+from keystone.policy.backends import rules
+
+import test_v3
+
+
+class IdentityTestProtectedCase(test_v3.RestfulTestCase):
+ """Test policy protection of a sample of v3 identity apis"""
+
+ def setUp(self):
+ super(IdentityTestProtectedCase, self).setUp()
+ # Start by creating a couple of domains
+ self.domainA = self.new_domain_ref()
+ domainA_ref = self.identity_api.create_domain(self.domainA['id'],
+ self.domainA)
+
+ self.domainB = self.new_domain_ref()
+ domainB_ref = self.identity_api.create_domain(self.domainB['id'],
+ self.domainB)
+
+ # Now create some users, one in domainA and two of them in domainB
+ self.user1 = self.new_user_ref(
+ domain_id=self.domainA['id'])
+ self.user1['password'] = uuid.uuid4().hex
+ user_ref = self.identity_api.create_user(self.user1['id'], self.user1)
+
+ self.user2 = self.new_user_ref(
+ domain_id=self.domainB['id'])
+ self.user2['password'] = uuid.uuid4().hex
+ user_ref = self.identity_api.create_user(self.user2['id'], self.user2)
+
+ self.user3 = self.new_user_ref(
+ domain_id=self.domainB['id'])
+ self.user3['password'] = uuid.uuid4().hex
+ user_ref = self.identity_api.create_user(self.user3['id'], self.user3)
+
+ self.project = self.new_project_ref(
+ domain_id=self.domainA['id'])
+ project_ref = self.identity_api.create_project(self.project['id'],
+ self.project)
+
+ self.role = self.new_role_ref()
+ self.identity_api.create_role(self.role['id'], self.role)
+ self.identity_api.add_role_to_user_and_project(self.user1['id'],
+ self.project['id'],
+ self.role['id'])
+ self.identity_api.create_grant(self.role['id'],
+ user_id=self.user1['id'],
+ domain_id=self.domainA['id'])
+
+ # Initialize the policy engine and allow us to write to a temp
+ # file in each test to create the policies
+ rules.reset()
+ _unused, self.tmpfilename = tempfile.mkstemp()
+ self.opt(policy_file=self.tmpfilename)
+
+ # A default auth request we can use - un-scoped user token
+ self.auth = {}
+ self.auth['authentication'] = {'methods': []}
+ self.auth['authentication']['methods'].append('password')
+ self.auth['authentication']['password'] = {'user': {}}
+ self.auth['authentication']['password']['user']['id'] = (
+ self.user1['id'])
+ self.auth['authentication']['password']['user']['password'] = (
+ self.user1['password'])
+
+ def _get_id_list_from_ref_list(self, ref_list):
+ result_list = []
+ for x in ref_list:
+ result_list.append(x['id'])
+ return result_list
+
+ def test_list_users_unprotected(self):
+ """GET /users (unprotected)"""
+
+ # Make sure we get all the users back if no protection
+ # or filtering
+ with open(self.tmpfilename, "w") as policyfile:
+ policyfile.write("""{"identity:list_users": []}""")
+ r = self.get('/users', auth=self.auth)
+ id_list = self._get_id_list_from_ref_list(r.body.get('users'))
+ self.assertIn(self.user1['id'], id_list)
+ self.assertIn(self.user2['id'], id_list)
+ self.assertIn(self.user3['id'], id_list)
+
+ def test_list_users_filtered_by_domain(self):
+ """GET /users?domain_id=mydomain """
+
+ # Using no protection, make sure filtering works
+ with open(self.tmpfilename, "w") as policyfile:
+ policyfile.write("""{"identity:list_users": []}""")
+ url_by_name = '/users?domain_id=%s' % self.domainB['id']
+ r = self.get(url_by_name, auth=self.auth)
+ # We should get back two users, those in DomainB
+ id_list = self._get_id_list_from_ref_list(r.body.get('users'))
+ self.assertIn(self.user2['id'], id_list)
+ self.assertIn(self.user3['id'], id_list)
+
+ def test_list_users_protected_by_domain(self):
+ """GET /users?domain_id=mydomain (protected)"""
+
+ raise nose.exc.SkipTest('Blocked by incomplete '
+ 'domain scoping in v3/auth')
+ # Update policy to protect by domain, and then use a domain
+ # scoped token
+ new_policy = """{"identity:list_users": ["domain_id:%(domain_id)s"]}"""
+ with open(self.tmpfilename, "w") as policyfile:
+ policyfile.write(new_policy)
+ self.auth['scope'] = {'domain': []}
+ self.auth['domain']['id'] = self.domainA['id']
+ url_by_name = '/users?domain_id=%s' % self.user1['domain_id']
+ r = self.get(url_by_name, auth=self.auth)
+ # We should only get back one user, the one in DomainA
+ id_list = self._get_id_list_from_ref_list(r.body.get('users'))
+ self.assertIn(self.user2['id'], id_list)
+
+ # TODO (henry-nash) Add some more tests to cover the various likely
+ # protection filters