summaryrefslogtreecommitdiffstats
path: root/nova/tests/test_middleware.py
blob: 9d49167baf8b7483a9a4eb0b1c7ec81d93632835 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import datetime
import webob
import webob.dec
import webob.exc

from nova.api import ec2
from nova import flags
from nova import test
from nova import utils


FLAGS = flags.FLAGS


@webob.dec.wsgify
def conditional_forbid(req):
    """Helper wsgi app returns 403 if param 'die' is 1."""
    if 'die' in req.params and req.params['die'] == '1':
        raise webob.exc.HTTPForbidden()
    return 'OK'


class LockoutTestCase(test.TestCase):
    """Test case for the Lockout middleware."""
    def setUp(self):  # pylint: disable-msg=C0103
        super(LockoutTestCase, self).setUp()
        utils.set_time_override()
        self.lockout = ec2.Lockout(conditional_forbid)

    def tearDown(self):  # pylint: disable-msg=C0103
        utils.clear_time_override()
        super(LockoutTestCase, self).tearDown()

    def _send_bad_attempts(self, access_key, num_attempts=1):
        """Fail x."""
        for i in xrange(num_attempts):
            req = webob.Request.blank('/?AWSAccessKeyId=%s&die=1' % access_key)
            self.assertEqual(req.get_response(self.lockout).status_int, 403)

    def _is_locked_out(self, access_key):
        """Sends a test request to see if key is locked out."""
        req = webob.Request.blank('/?AWSAccessKeyId=%s' % access_key)
        return (req.get_response(self.lockout).status_int == 403)

    def test_lockout(self):
        self._send_bad_attempts('test', FLAGS.lockout_attempts)
        self.assertTrue(self._is_locked_out('test'))

    def test_timeout(self):
        self._send_bad_attempts('test', FLAGS.lockout_attempts)
        self.assertTrue(self._is_locked_out('test'))
        utils.advance_time_seconds(FLAGS.lockout_minutes * 60)
        self.assertFalse(self._is_locked_out('test'))

    def test_multiple_keys(self):
        self._send_bad_attempts('test1', FLAGS.lockout_attempts)
        self.assertTrue(self._is_locked_out('test1'))
        self.assertFalse(self._is_locked_out('test2'))
        utils.advance_time_seconds(FLAGS.lockout_minutes * 60)
        self.assertFalse(self._is_locked_out('test1'))
        self.assertFalse(self._is_locked_out('test2'))

    def test_window_timeout(self):
        self._send_bad_attempts('test', FLAGS.lockout_attempts - 1)
        self.assertFalse(self._is_locked_out('test'))
        utils.advance_time_seconds(FLAGS.lockout_window * 60)
        self._send_bad_attempts('test', FLAGS.lockout_attempts - 1)
        self.assertFalse(self._is_locked_out('test'))