diff options
| author | Vishvananda Ishaya <vishvananda@gmail.com> | 2010-12-15 00:25:04 +0000 |
|---|---|---|
| committer | Vishvananda Ishaya <vishvananda@gmail.com> | 2010-12-15 00:25:04 +0000 |
| commit | 99347717ed2c7e92b3dc3bd33c12a3a05e8e349d (patch) | |
| tree | ec8401e992ce728032a325faab4b231b650f20f4 /nova/tests | |
| parent | e328d7bbb18a4b32671f2016ef3dc1bd63cbe3a0 (diff) | |
Lockout middleware for ec2 api
Diffstat (limited to 'nova/tests')
| -rw-r--r-- | nova/tests/middleware_unittest.py | 82 |
1 files changed, 82 insertions, 0 deletions
diff --git a/nova/tests/middleware_unittest.py b/nova/tests/middleware_unittest.py new file mode 100644 index 000000000..bbbd4a5a7 --- /dev/null +++ b/nova/tests/middleware_unittest.py @@ -0,0 +1,82 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import webob +import webob.dec +import webob.exc + +from nova.api import ec2 +from nova import flags +from nova import test + + +FLAGS = flags.FLAGS + + +@webob.dec.wsgify +def conditional_forbid(req): + """Helper wsgi app returns 403 if param 'die' is 1.""" + if 'die' in req.params and req.params['die'] == '1': + raise webob.exc.HTTPForbidden() + return 'OK' + + +class LockoutTestCase(test.TrialTestCase): + """Test case for the Lockout middleware.""" + def setUp(self): # pylint: disable-msg=C0103 + self.local_time = 0 + self.lockout = ec2.Lockout(conditional_forbid, + time_fn=self._constant_time) + super(LockoutTestCase, self).setUp() + + def _constant_time(self): + """Helper method to force timeouts.""" + return self.local_time + + def _trigger_lockout(self, access_key): + """Send x failed requests where x = lockout_attempts.""" + for i in xrange(FLAGS.lockout_attempts): + req = webob.Request.blank('/?AWSAccessKeyId=%s&die=1' % access_key) + self.assertEqual(req.get_response(self.lockout).status_int, 403) + + def _is_locked_out(self, access_key): + """Sends a test request to see if key is locked out.""" + req = webob.Request.blank('/?AWSAccessKeyId=%s' % access_key) + return (req.get_response(self.lockout).status_int == 403) + + def _timeout(self): + """Increment time to 1 second past the lockout.""" + self.local_time = 1 + self.local_time + FLAGS.lockout_minutes * 60 + + def test_lockout(self): + self._trigger_lockout('test') + self.assertTrue(self._is_locked_out('test')) + + def test_timeout(self): + self._trigger_lockout('test') + self.assertTrue(self._is_locked_out('test')) + self._timeout() + self.assertFalse(self._is_locked_out('test')) + + def test_multiple_keys(self): + self._trigger_lockout('test1') + self.assertTrue(self._is_locked_out('test1')) + self.assertFalse(self._is_locked_out('test2')) + self._timeout() + self.assertFalse(self._is_locked_out('test1')) + self.assertFalse(self._is_locked_out('test2')) |
