summaryrefslogtreecommitdiffstats
path: root/nova/api
diff options
context:
space:
mode:
authorMichael Gundlach <michael.gundlach@rackspace.com>2010-09-20 18:30:35 -0400
committerMichael Gundlach <michael.gundlach@rackspace.com>2010-09-20 18:30:35 -0400
commit9ea20110ae05a0bd5294774c2ee11626e9c4147f (patch)
tree0e3943e4868f92b9d3c8c8939ca51b2eceb6b1ed /nova/api
parent8fb32a956490f9de623fad12d9b2b1f08f88511a (diff)
parentd8861d04a85044ae57ffd7eb9ab682879beecf7d (diff)
downloadnova-9ea20110ae05a0bd5294774c2ee11626e9c4147f.tar.gz
nova-9ea20110ae05a0bd5294774c2ee11626e9c4147f.tar.xz
nova-9ea20110ae05a0bd5294774c2ee11626e9c4147f.zip
Merge from trunk
Diffstat (limited to 'nova/api')
-rw-r--r--nova/api/__init__.py17
-rw-r--r--nova/api/rackspace/__init__.py69
-rw-r--r--nova/api/rackspace/ratelimiting/__init__.py122
-rw-r--r--nova/api/rackspace/ratelimiting/tests.py237
4 files changed, 440 insertions, 5 deletions
diff --git a/nova/api/__init__.py b/nova/api/__init__.py
index 786b246ec..821f1deea 100644
--- a/nova/api/__init__.py
+++ b/nova/api/__init__.py
@@ -21,6 +21,7 @@ Root WSGI middleware for all API controllers.
"""
import routes
+import webob.dec
from nova import wsgi
from nova.api import ec2
@@ -32,10 +33,18 @@ class API(wsgi.Router):
def __init__(self):
mapper = routes.Mapper()
- # TODO(gundlach): EC2 RootController is replaced by this class;
- # MetadataRequestHandlers isn't part of the EC2 API and thus can
- # be dropped; and I'm leaving off CloudPipeRequestHandler until
- # I hear that we need it.
+ mapper.connect("/", controller=self.versions)
mapper.connect("/v1.0/{path_info:.*}", controller=rackspace.API())
mapper.connect("/services/{path_info:.*}", controller=ec2.API())
super(API, self).__init__(mapper)
+
+ @webob.dec.wsgify
+ def versions(self, req):
+ """Respond to a request for all OpenStack API versions."""
+ response = {
+ "versions": [
+ dict(status="CURRENT", id="v1.0")]}
+ metadata = {
+ "application/xml": {
+ "attributes": dict(version=["status", "id"])}}
+ return wsgi.Serializer(req.environ, metadata).to_content_type(response)
diff --git a/nova/api/rackspace/__init__.py b/nova/api/rackspace/__init__.py
index b4d666d63..ac5365310 100644
--- a/nova/api/rackspace/__init__.py
+++ b/nova/api/rackspace/__init__.py
@@ -31,6 +31,7 @@ from nova import flags
from nova import wsgi
from nova.api.rackspace import flavors
from nova.api.rackspace import images
+from nova.api.rackspace import ratelimiting
from nova.api.rackspace import servers
from nova.api.rackspace import sharedipgroups
from nova.auth import manager
@@ -40,7 +41,7 @@ class API(wsgi.Middleware):
"""WSGI entry point for all Rackspace API requests."""
def __init__(self):
- app = AuthMiddleware(APIRouter())
+ app = AuthMiddleware(RateLimitingMiddleware(APIRouter()))
super(API, self).__init__(app)
@@ -65,6 +66,72 @@ class AuthMiddleware(wsgi.Middleware):
return self.application
+class RateLimitingMiddleware(wsgi.Middleware):
+ """Rate limit incoming requests according to the OpenStack rate limits."""
+
+ def __init__(self, application, service_host=None):
+ """Create a rate limiting middleware that wraps the given application.
+
+ By default, rate counters are stored in memory. If service_host is
+ specified, the middleware instead relies on the ratelimiting.WSGIApp
+ at the given host+port to keep rate counters.
+ """
+ super(RateLimitingMiddleware, self).__init__(application)
+ if not service_host:
+ #TODO(gundlach): These limits were based on limitations of Cloud
+ #Servers. We should revisit them in Nova.
+ self.limiter = ratelimiting.Limiter(limits={
+ 'DELETE': (100, ratelimiting.PER_MINUTE),
+ 'PUT': (10, ratelimiting.PER_MINUTE),
+ 'POST': (10, ratelimiting.PER_MINUTE),
+ 'POST servers': (50, ratelimiting.PER_DAY),
+ 'GET changes-since': (3, ratelimiting.PER_MINUTE),
+ })
+ else:
+ self.limiter = ratelimiting.WSGIAppProxy(service_host)
+
+ @webob.dec.wsgify
+ def __call__(self, req):
+ """Rate limit the request.
+
+ If the request should be rate limited, return a 413 status with a
+ Retry-After header giving the time when the request would succeed.
+ """
+ username = req.headers['X-Auth-User']
+ action_name = self.get_action_name(req)
+ if not action_name: # not rate limited
+ return self.application
+ delay = self.get_delay(action_name, username)
+ if delay:
+ # TODO(gundlach): Get the retry-after format correct.
+ raise webob.exc.HTTPRequestEntityTooLarge(headers={
+ 'Retry-After': time.time() + delay})
+ return self.application
+
+ def get_delay(self, action_name, username):
+ """Return the delay for the given action and username, or None if
+ the action would not be rate limited.
+ """
+ if action_name == 'POST servers':
+ # "POST servers" is a POST, so it counts against "POST" too.
+ # Attempt the "POST" first, lest we are rate limited by "POST" but
+ # use up a precious "POST servers" call.
+ delay = self.limiter.perform("POST", username=username)
+ if delay:
+ return delay
+ return self.limiter.perform(action_name, username=username)
+
+ def get_action_name(self, req):
+ """Return the action name for this request."""
+ if req.method == 'GET' and 'changes-since' in req.GET:
+ return 'GET changes-since'
+ if req.method == 'POST' and req.path_info.startswith('/servers'):
+ return 'POST servers'
+ if req.method in ['PUT', 'POST', 'DELETE']:
+ return req.method
+ return None
+
+
class APIRouter(wsgi.Router):
"""
Routes requests on the Rackspace API to the appropriate controller
diff --git a/nova/api/rackspace/ratelimiting/__init__.py b/nova/api/rackspace/ratelimiting/__init__.py
new file mode 100644
index 000000000..f843bac0f
--- /dev/null
+++ b/nova/api/rackspace/ratelimiting/__init__.py
@@ -0,0 +1,122 @@
+"""Rate limiting of arbitrary actions."""
+
+import httplib
+import time
+import urllib
+import webob.dec
+import webob.exc
+
+
+# Convenience constants for the limits dictionary passed to Limiter().
+PER_SECOND = 1
+PER_MINUTE = 60
+PER_HOUR = 60 * 60
+PER_DAY = 60 * 60 * 24
+
+class Limiter(object):
+
+ """Class providing rate limiting of arbitrary actions."""
+
+ def __init__(self, limits):
+ """Create a rate limiter.
+
+ limits: a dict mapping from action name to a tuple. The tuple contains
+ the number of times the action may be performed, and the time period
+ (in seconds) during which the number must not be exceeded for this
+ action. Example: dict(reboot=(10, ratelimiting.PER_MINUTE)) would
+ allow 10 'reboot' actions per minute.
+ """
+ self.limits = limits
+ self._levels = {}
+
+ def perform(self, action_name, username='nobody'):
+ """Attempt to perform an action by the given username.
+
+ action_name: the string name of the action to perform. This must
+ be a key in the limits dict passed to the ctor.
+
+ username: an optional string name of the user performing the action.
+ Each user has her own set of rate limiting counters. Defaults to
+ 'nobody' (so that if you never specify a username when calling
+ perform(), a single set of counters will be used.)
+
+ Return None if the action may proceed. If the action may not proceed
+ because it has been rate limited, return the float number of seconds
+ until the action would succeed.
+ """
+ # Think of rate limiting as a bucket leaking water at 1cc/second. The
+ # bucket can hold as many ccs as there are seconds in the rate
+ # limiting period (e.g. 3600 for per-hour ratelimits), and if you can
+ # perform N actions in that time, each action fills the bucket by
+ # 1/Nth of its volume. You may only perform an action if the bucket
+ # would not overflow.
+ now = time.time()
+ key = '%s:%s' % (username, action_name)
+ last_time_performed, water_level = self._levels.get(key, (now, 0))
+ # The bucket leaks 1cc/second.
+ water_level -= (now - last_time_performed)
+ if water_level < 0:
+ water_level = 0
+ num_allowed_per_period, period_in_secs = self.limits[action_name]
+ # Fill the bucket by 1/Nth its capacity, and hope it doesn't overflow.
+ capacity = period_in_secs
+ new_level = water_level + (capacity * 1.0 / num_allowed_per_period)
+ if new_level > capacity:
+ # Delay this many seconds.
+ return new_level - capacity
+ self._levels[key] = (now, new_level)
+ return None
+
+
+# If one instance of this WSGIApps is unable to handle your load, put a
+# sharding app in front that shards by username to one of many backends.
+
+class WSGIApp(object):
+
+ """Application that tracks rate limits in memory. Send requests to it of
+ this form:
+
+ POST /limiter/<username>/<urlencoded action>
+
+ and receive a 200 OK, or a 403 Forbidden with an X-Wait-Seconds header
+ containing the number of seconds to wait before the action would succeed.
+ """
+
+ def __init__(self, limiter):
+ """Create the WSGI application using the given Limiter instance."""
+ self.limiter = limiter
+
+ @webob.dec.wsgify
+ def __call__(self, req):
+ parts = req.path_info.split('/')
+ # format: /limiter/<username>/<urlencoded action>
+ if req.method != 'POST':
+ raise webob.exc.HTTPMethodNotAllowed()
+ if len(parts) != 4 or parts[1] != 'limiter':
+ raise webob.exc.HTTPNotFound()
+ username = parts[2]
+ action_name = urllib.unquote(parts[3])
+ delay = self.limiter.perform(action_name, username)
+ if delay:
+ return webob.exc.HTTPForbidden(
+ headers={'X-Wait-Seconds': "%.2f" % delay})
+ else:
+ return '' # 200 OK
+
+
+class WSGIAppProxy(object):
+
+ """Limiter lookalike that proxies to a ratelimiting.WSGIApp."""
+
+ def __init__(self, service_host):
+ """Creates a proxy pointing to a ratelimiting.WSGIApp at the given
+ host."""
+ self.service_host = service_host
+
+ def perform(self, action, username='nobody'):
+ conn = httplib.HTTPConnection(self.service_host)
+ conn.request('POST', '/limiter/%s/%s' % (username, action))
+ resp = conn.getresponse()
+ if resp.status == 200:
+ return None # no delay
+ return float(resp.getheader('X-Wait-Seconds'))
diff --git a/nova/api/rackspace/ratelimiting/tests.py b/nova/api/rackspace/ratelimiting/tests.py
new file mode 100644
index 000000000..4c9510917
--- /dev/null
+++ b/nova/api/rackspace/ratelimiting/tests.py
@@ -0,0 +1,237 @@
+import httplib
+import StringIO
+import time
+import unittest
+import webob
+
+import nova.api.rackspace.ratelimiting as ratelimiting
+
+class LimiterTest(unittest.TestCase):
+
+ def setUp(self):
+ self.limits = {
+ 'a': (5, ratelimiting.PER_SECOND),
+ 'b': (5, ratelimiting.PER_MINUTE),
+ 'c': (5, ratelimiting.PER_HOUR),
+ 'd': (1, ratelimiting.PER_SECOND),
+ 'e': (100, ratelimiting.PER_SECOND)}
+ self.rl = ratelimiting.Limiter(self.limits)
+
+ def exhaust(self, action, times_until_exhausted, **kwargs):
+ for i in range(times_until_exhausted):
+ when = self.rl.perform(action, **kwargs)
+ self.assertEqual(when, None)
+ num, period = self.limits[action]
+ delay = period * 1.0 / num
+ # Verify that we are now thoroughly delayed
+ for i in range(10):
+ when = self.rl.perform(action, **kwargs)
+ self.assertAlmostEqual(when, delay, 2)
+
+ def test_second(self):
+ self.exhaust('a', 5)
+ time.sleep(0.2)
+ self.exhaust('a', 1)
+ time.sleep(1)
+ self.exhaust('a', 5)
+
+ def test_minute(self):
+ self.exhaust('b', 5)
+
+ def test_one_per_period(self):
+ def allow_once_and_deny_once():
+ when = self.rl.perform('d')
+ self.assertEqual(when, None)
+ when = self.rl.perform('d')
+ self.assertAlmostEqual(when, 1, 2)
+ return when
+ time.sleep(allow_once_and_deny_once())
+ time.sleep(allow_once_and_deny_once())
+ allow_once_and_deny_once()
+
+ def test_we_can_go_indefinitely_if_we_spread_out_requests(self):
+ for i in range(200):
+ when = self.rl.perform('e')
+ self.assertEqual(when, None)
+ time.sleep(0.01)
+
+ def test_users_get_separate_buckets(self):
+ self.exhaust('c', 5, username='alice')
+ self.exhaust('c', 5, username='bob')
+ self.exhaust('c', 5, username='chuck')
+ self.exhaust('c', 0, username='chuck')
+ self.exhaust('c', 0, username='bob')
+ self.exhaust('c', 0, username='alice')
+
+
+class FakeLimiter(object):
+ """Fake Limiter class that you can tell how to behave."""
+ def __init__(self, test):
+ self._action = self._username = self._delay = None
+ self.test = test
+ def mock(self, action, username, delay):
+ self._action = action
+ self._username = username
+ self._delay = delay
+ def perform(self, action, username):
+ self.test.assertEqual(action, self._action)
+ self.test.assertEqual(username, self._username)
+ return self._delay
+
+
+class WSGIAppTest(unittest.TestCase):
+
+ def setUp(self):
+ self.limiter = FakeLimiter(self)
+ self.app = ratelimiting.WSGIApp(self.limiter)
+
+ def test_invalid_methods(self):
+ requests = []
+ for method in ['GET', 'PUT', 'DELETE']:
+ req = webob.Request.blank('/limits/michael/breakdance',
+ dict(REQUEST_METHOD=method))
+ requests.append(req)
+ for req in requests:
+ self.assertEqual(req.get_response(self.app).status_int, 405)
+
+ def test_invalid_urls(self):
+ requests = []
+ for prefix in ['limit', '', 'limiter2', 'limiter/limits', 'limiter/1']:
+ req = webob.Request.blank('/%s/michael/breakdance' % prefix,
+ dict(REQUEST_METHOD='POST'))
+ requests.append(req)
+ for req in requests:
+ self.assertEqual(req.get_response(self.app).status_int, 404)
+
+ def verify(self, url, username, action, delay=None):
+ """Make sure that POSTing to the given url causes the given username
+ to perform the given action. Make the internal rate limiter return
+ delay and make sure that the WSGI app returns the correct response.
+ """
+ req = webob.Request.blank(url, dict(REQUEST_METHOD='POST'))
+ self.limiter.mock(action, username, delay)
+ resp = req.get_response(self.app)
+ if not delay:
+ self.assertEqual(resp.status_int, 200)
+ else:
+ self.assertEqual(resp.status_int, 403)
+ self.assertEqual(resp.headers['X-Wait-Seconds'], "%.2f" % delay)
+
+ def test_good_urls(self):
+ self.verify('/limiter/michael/hoot', 'michael', 'hoot')
+
+ def test_escaping(self):
+ self.verify('/limiter/michael/jump%20up', 'michael', 'jump up')
+
+ def test_response_to_delays(self):
+ self.verify('/limiter/michael/hoot', 'michael', 'hoot', 1)
+ self.verify('/limiter/michael/hoot', 'michael', 'hoot', 1.56)
+ self.verify('/limiter/michael/hoot', 'michael', 'hoot', 1000)
+
+
+class FakeHttplibSocket(object):
+ """a fake socket implementation for httplib.HTTPResponse, trivial"""
+
+ def __init__(self, response_string):
+ self._buffer = StringIO.StringIO(response_string)
+
+ def makefile(self, _mode, _other):
+ """Returns the socket's internal buffer"""
+ return self._buffer
+
+
+class FakeHttplibConnection(object):
+ """A fake httplib.HTTPConnection
+
+ Requests made via this connection actually get translated and routed into
+ our WSGI app, we then wait for the response and turn it back into
+ an httplib.HTTPResponse.
+ """
+ def __init__(self, app, host, is_secure=False):
+ self.app = app
+ self.host = host
+
+ def request(self, method, path, data='', headers={}):
+ req = webob.Request.blank(path)
+ req.method = method
+ req.body = data
+ req.headers = headers
+ req.host = self.host
+ # Call the WSGI app, get the HTTP response
+ resp = str(req.get_response(self.app))
+ # For some reason, the response doesn't have "HTTP/1.0 " prepended; I
+ # guess that's a function the web server usually provides.
+ resp = "HTTP/1.0 %s" % resp
+ sock = FakeHttplibSocket(resp)
+ self.http_response = httplib.HTTPResponse(sock)
+ self.http_response.begin()
+
+ def getresponse(self):
+ return self.http_response
+
+
+def wire_HTTPConnection_to_WSGI(host, app):
+ """Monkeypatches HTTPConnection so that if you try to connect to host, you
+ are instead routed straight to the given WSGI app.
+
+ After calling this method, when any code calls
+
+ httplib.HTTPConnection(host)
+
+ the connection object will be a fake. Its requests will be sent directly
+ to the given WSGI app rather than through a socket.
+
+ Code connecting to hosts other than host will not be affected.
+
+ This method may be called multiple times to map different hosts to
+ different apps.
+ """
+ class HTTPConnectionDecorator(object):
+ """Wraps the real HTTPConnection class so that when you instantiate
+ the class you might instead get a fake instance."""
+ def __init__(self, wrapped):
+ self.wrapped = wrapped
+ def __call__(self, connection_host, *args, **kwargs):
+ if connection_host == host:
+ return FakeHttplibConnection(app, host)
+ else:
+ return self.wrapped(connection_host, *args, **kwargs)
+ httplib.HTTPConnection = HTTPConnectionDecorator(httplib.HTTPConnection)
+
+
+class WSGIAppProxyTest(unittest.TestCase):
+
+ def setUp(self):
+ """Our WSGIAppProxy is going to call across an HTTPConnection to a
+ WSGIApp running a limiter. The proxy will send input, and the proxy
+ should receive that same input, pass it to the limiter who gives a
+ result, and send the expected result back.
+
+ The HTTPConnection isn't real -- it's monkeypatched to point straight
+ at the WSGIApp. And the limiter isn't real -- it's a fake that
+ behaves the way we tell it to.
+ """
+ self.limiter = FakeLimiter(self)
+ app = ratelimiting.WSGIApp(self.limiter)
+ wire_HTTPConnection_to_WSGI('100.100.100.100:80', app)
+ self.proxy = ratelimiting.WSGIAppProxy('100.100.100.100:80')
+
+ def test_200(self):
+ self.limiter.mock('conquer', 'caesar', None)
+ when = self.proxy.perform('conquer', 'caesar')
+ self.assertEqual(when, None)
+
+ def test_403(self):
+ self.limiter.mock('grumble', 'proletariat', 1.5)
+ when = self.proxy.perform('grumble', 'proletariat')
+ self.assertEqual(when, 1.5)
+
+ def test_failure(self):
+ def shouldRaise():
+ self.limiter.mock('murder', 'brutus', None)
+ self.proxy.perform('stab', 'brutus')
+ self.assertRaises(AssertionError, shouldRaise)
+
+
+if __name__ == '__main__':
+ unittest.main()