# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2011 Piston Cloud Computing, Inc.
# All Rights Reserved.

#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Test of Policy Engine For Nova."""

import os.path
import StringIO
import urllib2

from nova import context
from nova import exception
from nova.openstack.common import policy as common_policy
from nova import policy
from nova import test
from nova import utils


class PolicyFileTestCase(test.TestCase):
    """Exercise policy enforcement against a policy file on disk."""

    def setUp(self):
        super(PolicyFileTestCase, self).setUp()
        self.target = {}
        self.context = context.RequestContext('fake', 'fake')

    @staticmethod
    def _write_policy(path, text):
        """Overwrite the file at ``path`` with the policy ``text``."""
        with open(path, "w") as handle:
            handle.write(text)

    def test_modified_policy_reloads(self):
        """Rewriting the policy file changes the outcome of enforce()."""
        action = "example:test"
        with utils.tempdir() as scratch_dir:
            policy_path = os.path.join(scratch_dir, 'policy')
            self.flags(policy_file=policy_path)

            # NOTE(uni): context construction invokes a policy check to
            # determine is_admin or not. As a side-effect, a policy reset
            # is needed here to flush the existing policy cache.
            policy.reset()

            # First version of the file: the action is permitted.
            self._write_policy(policy_path, '{"example:test": ""}')
            policy.enforce(self.context, action, self.target)

            # Second version of the file: the same action is refused.
            self._write_policy(policy_path, '{"example:test": "!"}')
            # NOTE(vish): reset stored policy cache so we don't have to
            # sleep(1)
            policy._POLICY_CACHE = {}
            self.assertRaises(exception.PolicyNotAuthorized,
                              policy.enforce,
                              self.context, action, self.target)


class PolicyTestCase(test.TestCase):
    """Exercise policy.enforce() against a fixed in-memory rule set."""

    def setUp(self):
        super(PolicyTestCase, self).setUp()
        # NOTE(review): self.policy is not defined in this file; presumably
        # it is supplied by the test.TestCase base class -- confirm.
        self.policy.set_rules({
            "true": '@',
            "example:allowed": '@',
            "example:denied": "!",
            "example:get_http": "http://www.example.com",
            "example:my_file": "role:compute_admin or "
                               "project_id:%(project_id)s",
            "example:early_and_fail": "! and @",
            "example:early_or_success": "@ or !",
            "example:lowercase_admin": "role:admin or role:sysadmin",
            "example:uppercase_admin": "role:ADMIN or role:sysadmin",
        })
        self.target = {}
        self.context = context.RequestContext('fake', 'fake',
                                              roles=['member'])

    def _assert_denied(self, action, target):
        """Expect PolicyNotAuthorized for ``action`` under self.context."""
        self.assertRaises(exception.PolicyNotAuthorized,
                          policy.enforce,
                          self.context, action, target)

    def _stub_urlopen(self, body):
        """Replace urllib2.urlopen with a fake whose response is ``body``."""
        def fakeurlopen(url, post_data):
            return StringIO.StringIO(body)

        self.stubs.Set(urllib2, 'urlopen', fakeurlopen)

    def test_enforce_nonexistent_action_throws(self):
        self._assert_denied("example:noexist", self.target)

    def test_enforce_bad_action_throws(self):
        self._assert_denied("example:denied", self.target)

    def test_enforce_bad_action_noraise(self):
        # The trailing positional False asks enforce() to hand back the
        # verdict; the test expects False rather than an exception.
        verdict = policy.enforce(self.context, "example:denied",
                                 self.target, False)
        self.assertEqual(verdict, False)

    def test_enforce_good_action(self):
        verdict = policy.enforce(self.context, "example:allowed",
                                 self.target)
        self.assertEqual(verdict, True)

    def test_enforce_http_true(self):
        self._stub_urlopen("True")
        verdict = policy.enforce(self.context, "example:get_http", {})
        self.assertEqual(verdict, True)

    def test_enforce_http_false(self):
        self._stub_urlopen("False")
        self._assert_denied("example:get_http", {})

    def test_templatized_enforcement(self):
        action = "example:my_file"
        own_project = {'project_id': 'fake'}
        foreign_project = {'project_id': 'another'}
        policy.enforce(self.context, action, own_project)
        self._assert_denied(action, foreign_project)

    def test_early_AND_enforcement(self):
        self._assert_denied("example:early_and_fail", self.target)

    def test_early_OR_enforcement(self):
        policy.enforce(self.context, "example:early_or_success", self.target)

    def test_ignore_case_role_check(self):
        # NOTE(dprince) we mix case in the Admin role here to ensure
        # case is ignored
        admin_context = context.RequestContext('admin', 'fake',
                                               roles=['AdMiN'])
        for action in ("example:lowercase_admin", "example:uppercase_admin"):
            policy.enforce(admin_context, action, self.target)


class DefaultPolicyTestCase(test.TestCase):
    """Exercise the fall-back to the default rule for unknown actions."""

    def setUp(self):
        super(DefaultPolicyTestCase, self).setUp()
        self.rules = {
            "default": '',
            "example:exist": "!",
        }
        self._set_rules('default')
        self.context = context.RequestContext('fake', 'fake')

    def _set_rules(self, default_rule):
        """Parse self.rules and install them with ``default_rule``."""
        parsed = {}
        for name, text in self.rules.items():
            parsed[name] = common_policy.parse_rule(text)
        common_policy.set_rules(common_policy.Rules(parsed, default_rule))

    def test_policy_called(self):
        self.assertRaises(exception.PolicyNotAuthorized,
                          policy.enforce,
                          self.context, "example:exist", {})

    def test_not_found_policy_calls_default(self):
        policy.enforce(self.context, "example:noexist", {})

    def test_default_not_found(self):
        # Name a default rule that is absent from self.rules.
        self._set_rules("default_noexist")
        self.assertRaises(exception.PolicyNotAuthorized,
                          policy.enforce,
                          self.context, "example:noexist", {})


class IsAdminCheckTestCase(test.TestCase):
    """Exercise construction and evaluation of policy.IsAdminCheck."""

    def test_init_true(self):
        check = policy.IsAdminCheck('is_admin', 'True')

        self.assertEqual(check.kind, 'is_admin')
        self.assertEqual(check.match, 'True')
        self.assertEqual(check.expected, True)

    def test_init_false(self):
        # The match text 'nottrue' is expected to be stored as 'False'.
        check = policy.IsAdminCheck('is_admin', 'nottrue')

        self.assertEqual(check.kind, 'is_admin')
        self.assertEqual(check.match, 'False')
        self.assertEqual(check.expected, False)

    def test_call_true(self):
        check = policy.IsAdminCheck('is_admin', 'True')

        self.assertEqual(check('target', {'is_admin': True}), True)
        self.assertEqual(check('target', {'is_admin': False}), False)

    def test_call_false(self):
        check = policy.IsAdminCheck('is_admin', 'False')

        self.assertEqual(check('target', {'is_admin': True}), False)
        self.assertEqual(check('target', {'is_admin': False}), True)