author    Vishvananda Ishaya <vishvananda@yahoo.com>  2010-09-20 19:17:36 -0700
committer Vishvananda Ishaya <vishvananda@yahoo.com>  2010-09-20 19:17:36 -0700
commit    026f6f2b7f5457f15a258ebed66bb8dda3263eec (patch)
tree      146dcfe5e69bd4c7bea76e46fa97fbeb39828581
parent    c791229d11e3baf2a5828ee8efe98ed827a35dde (diff)
parent    d8861d04a85044ae57ffd7eb9ab682879beecf7d (diff)
merged trunk
-rwxr-xr-x  bin/nova-scheduler                           |  43
-rw-r--r--  nova/api/__init__.py                         |  13
-rw-r--r--  nova/api/rackspace/__init__.py               |  69
-rw-r--r--  nova/api/rackspace/ratelimiting/__init__.py  | 122
-rw-r--r--  nova/api/rackspace/ratelimiting/tests.py     | 237
-rw-r--r--  nova/compute/manager.py                      |  12
-rw-r--r--  nova/db/api.py                               |  72
-rw-r--r--  nova/db/sqlalchemy/api.py                    | 144
-rw-r--r--  nova/db/sqlalchemy/models.py                 |  41
-rwxr-xr-x  nova/endpoint/api.py                         |   5
-rw-r--r--  nova/endpoint/cloud.py                       |  86
-rw-r--r--  nova/exception.py                            |  12
-rw-r--r--  nova/flags.py                                |   3
-rw-r--r--  nova/network/manager.py                      |   4
-rw-r--r--  nova/process.py                              |   8
-rw-r--r--  nova/quota.py                                |  91
-rw-r--r--  nova/scheduler/__init__.py                   |  25
-rw-r--r--  nova/scheduler/chance.py                     |  38
-rw-r--r--  nova/scheduler/driver.py                     |  58
-rw-r--r--  nova/scheduler/manager.py                    |  66
-rw-r--r--  nova/scheduler/simple.py                     |  90
-rw-r--r--  nova/tests/api/__init__.py                   |   5
-rw-r--r--  nova/tests/api/rackspace/__init__.py         |  79
-rw-r--r--  nova/tests/compute_unittest.py               |   9
-rw-r--r--  nova/tests/quota_unittest.py                 | 155
-rw-r--r--  nova/tests/scheduler_unittest.py             | 231
-rw-r--r--  nova/utils.py                                |  18
-rw-r--r--  nova/volume/manager.py                       |  12
-rw-r--r--  run_tests.py                                 |   2
-rw-r--r--  tools/install_venv.py                        |   4
-rw-r--r--  tools/pip-requires                           |   5
31 files changed, 1702 insertions(+), 57 deletions(-)
diff --git a/bin/nova-scheduler b/bin/nova-scheduler
new file mode 100755
index 000000000..38a8f213f
--- /dev/null
+++ b/bin/nova-scheduler
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+ Twistd daemon for the nova scheduler nodes.
+"""
+
+import os
+import sys
+
+# If ../nova/__init__.py exists, add ../ to Python search path, so that
+# it will override what happens to be installed in /usr/(local/)lib/python...
+possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
+ os.pardir,
+ os.pardir))
+if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
+ sys.path.insert(0, possible_topdir)
+
+from nova import service
+from nova import twistd
+
+
+if __name__ == '__main__':
+ twistd.serve(__file__)
+
+if __name__ == '__builtin__':
+ application = service.Service.create()
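The `__builtin__` branch above is not dead code: twistd loads this file exec-style, and in Python 2 a name missing from an exec'd globals dict falls through to the __builtin__ module, whose own __name__ is '__builtin__'. A standalone sketch of that quirk (illustrative, not using nova.twistd):

# Python 2: exec with a bare globals dict resolves __name__ via the
# __builtin__ module, which is why the service-creation branch above
# fires when twistd loads this file instead of the '__main__' branch.
source = "print __name__"
exec source in {}                         # prints: __builtin__
exec source in {'__name__': '__main__'}  # prints: __main__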
diff --git a/nova/api/__init__.py b/nova/api/__init__.py
index b9b9e3988..9f116dada 100644
--- a/nova/api/__init__.py
+++ b/nova/api/__init__.py
@@ -21,6 +21,7 @@ Root WSGI middleware for all API controllers.
"""
import routes
+import webob.dec
from nova import wsgi
from nova.api import ec2
@@ -32,6 +33,18 @@ class API(wsgi.Router):
def __init__(self):
mapper = routes.Mapper()
+ mapper.connect("/", controller=self.versions)
mapper.connect("/v1.0/{path_info:.*}", controller=rackspace.API())
mapper.connect("/ec2/{path_info:.*}", controller=ec2.API())
super(API, self).__init__(mapper)
+
+ @webob.dec.wsgify
+ def versions(self, req):
+ """Respond to a request for all OpenStack API versions."""
+ response = {
+ "versions": [
+ dict(status="CURRENT", id="v1.0")]}
+ metadata = {
+ "application/xml": {
+ "attributes": dict(version=["status", "id"])}}
+ return wsgi.Serializer(req.environ, metadata).to_content_type(response)
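For reference, the JSON form of the payload built above is small enough to show inline; the XML shape is an assumption about how wsgi.Serializer applies the metadata attributes:

import json

response = {"versions": [dict(status="CURRENT", id="v1.0")]}
print json.dumps(response)
# {"versions": [{"status": "CURRENT", "id": "v1.0"}]}
# Presumed XML form, per the metadata attributes above:
#   <versions><version status="CURRENT" id="v1.0"/></versions>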
diff --git a/nova/api/rackspace/__init__.py b/nova/api/rackspace/__init__.py
index b4d666d63..ac5365310 100644
--- a/nova/api/rackspace/__init__.py
+++ b/nova/api/rackspace/__init__.py
@@ -31,6 +31,7 @@ from nova import flags
from nova import wsgi
from nova.api.rackspace import flavors
from nova.api.rackspace import images
+from nova.api.rackspace import ratelimiting
from nova.api.rackspace import servers
from nova.api.rackspace import sharedipgroups
from nova.auth import manager
@@ -40,7 +41,7 @@ class API(wsgi.Middleware):
"""WSGI entry point for all Rackspace API requests."""
def __init__(self):
- app = AuthMiddleware(APIRouter())
+ app = AuthMiddleware(RateLimitingMiddleware(APIRouter()))
super(API, self).__init__(app)
@@ -65,6 +66,72 @@ class AuthMiddleware(wsgi.Middleware):
return self.application
+class RateLimitingMiddleware(wsgi.Middleware):
+ """Rate limit incoming requests according to the OpenStack rate limits."""
+
+ def __init__(self, application, service_host=None):
+ """Create a rate limiting middleware that wraps the given application.
+
+ By default, rate counters are stored in memory. If service_host is
+ specified, the middleware instead relies on the ratelimiting.WSGIApp
+ at the given host+port to keep rate counters.
+ """
+ super(RateLimitingMiddleware, self).__init__(application)
+ if not service_host:
+ #TODO(gundlach): These limits were based on limitations of Cloud
+ #Servers. We should revisit them in Nova.
+ self.limiter = ratelimiting.Limiter(limits={
+ 'DELETE': (100, ratelimiting.PER_MINUTE),
+ 'PUT': (10, ratelimiting.PER_MINUTE),
+ 'POST': (10, ratelimiting.PER_MINUTE),
+ 'POST servers': (50, ratelimiting.PER_DAY),
+ 'GET changes-since': (3, ratelimiting.PER_MINUTE),
+ })
+ else:
+ self.limiter = ratelimiting.WSGIAppProxy(service_host)
+
+ @webob.dec.wsgify
+ def __call__(self, req):
+ """Rate limit the request.
+
+ If the request should be rate limited, return a 413 status with a
+ Retry-After header giving the time when the request would succeed.
+ """
+ username = req.headers['X-Auth-User']
+ action_name = self.get_action_name(req)
+ if not action_name: # not rate limited
+ return self.application
+ delay = self.get_delay(action_name, username)
+ if delay:
+ # TODO(gundlach): Get the retry-after format correct.
+ raise webob.exc.HTTPRequestEntityTooLarge(headers={
+ 'Retry-After': time.time() + delay})
+ return self.application
+
+ def get_delay(self, action_name, username):
+ """Return the delay for the given action and username, or None if
+ the action would not be rate limited.
+ """
+ if action_name == 'POST servers':
+ # "POST servers" is a POST, so it counts against "POST" too.
+ # Attempt the "POST" first, lest we are rate limited by "POST" but
+ # use up a precious "POST servers" call.
+ delay = self.limiter.perform("POST", username=username)
+ if delay:
+ return delay
+ return self.limiter.perform(action_name, username=username)
+
+ def get_action_name(self, req):
+ """Return the action name for this request."""
+ if req.method == 'GET' and 'changes-since' in req.GET:
+ return 'GET changes-since'
+ if req.method == 'POST' and req.path_info.startswith('/servers'):
+ return 'POST servers'
+ if req.method in ['PUT', 'POST', 'DELETE']:
+ return req.method
+ return None
+
+
class APIRouter(wsgi.Router):
"""
Routes requests on the Rackspace API to the appropriate controller
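A minimal sketch of the new middleware in use. The stub downstream app is hypothetical, and returning self.application from the wsgify'd __call__ relies on nova.wsgi.Middleware/webob pass-through behavior:

import webob
import webob.dec

from nova.api.rackspace import RateLimitingMiddleware


@webob.dec.wsgify
def stub_app(req):
    """Hypothetical downstream application."""
    return 'OK'

app = RateLimitingMiddleware(stub_app)
req = webob.Request.blank('/servers', dict(REQUEST_METHOD='POST'))
req.headers['X-Auth-User'] = 'alice'
# Counts against both 'POST' (10/min) and 'POST servers' (50/day);
# the 11th POST inside a minute gets a 413 with a Retry-After header.
print req.get_response(app).status_int  # 200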
diff --git a/nova/api/rackspace/ratelimiting/__init__.py b/nova/api/rackspace/ratelimiting/__init__.py
new file mode 100644
index 000000000..f843bac0f
--- /dev/null
+++ b/nova/api/rackspace/ratelimiting/__init__.py
@@ -0,0 +1,122 @@
+"""Rate limiting of arbitrary actions."""
+
+import httplib
+import time
+import urllib
+import webob.dec
+import webob.exc
+
+
+# Convenience constants for the limits dictionary passed to Limiter().
+PER_SECOND = 1
+PER_MINUTE = 60
+PER_HOUR = 60 * 60
+PER_DAY = 60 * 60 * 24
+
+class Limiter(object):
+
+ """Class providing rate limiting of arbitrary actions."""
+
+ def __init__(self, limits):
+ """Create a rate limiter.
+
+ limits: a dict mapping from action name to a tuple. The tuple contains
+ the number of times the action may be performed, and the time period
+ (in seconds) during which the number must not be exceeded for this
+ action. Example: dict(reboot=(10, ratelimiting.PER_MINUTE)) would
+ allow 10 'reboot' actions per minute.
+ """
+ self.limits = limits
+ self._levels = {}
+
+ def perform(self, action_name, username='nobody'):
+ """Attempt to perform an action by the given username.
+
+ action_name: the string name of the action to perform. This must
+ be a key in the limits dict passed to the ctor.
+
+ username: an optional string name of the user performing the action.
+ Each user has her own set of rate limiting counters. Defaults to
+ 'nobody' (so that if you never specify a username when calling
+ perform(), a single set of counters will be used.)
+
+ Return None if the action may proceed. If the action may not proceed
+ because it has been rate limited, return the float number of seconds
+ until the action would succeed.
+ """
+ # Think of rate limiting as a bucket leaking water at 1cc/second. The
+ # bucket can hold as many ccs as there are seconds in the rate
+ # limiting period (e.g. 3600 for per-hour ratelimits), and if you can
+ # perform N actions in that time, each action fills the bucket by
+ # 1/Nth of its volume. You may only perform an action if the bucket
+ # would not overflow.
+ now = time.time()
+ key = '%s:%s' % (username, action_name)
+ last_time_performed, water_level = self._levels.get(key, (now, 0))
+ # The bucket leaks 1cc/second.
+ water_level -= (now - last_time_performed)
+ if water_level < 0:
+ water_level = 0
+ num_allowed_per_period, period_in_secs = self.limits[action_name]
+ # Fill the bucket by 1/Nth its capacity, and hope it doesn't overflow.
+ capacity = period_in_secs
+ new_level = water_level + (capacity * 1.0 / num_allowed_per_period)
+ if new_level > capacity:
+ # Delay this many seconds.
+ return new_level - capacity
+ self._levels[key] = (now, new_level)
+ return None
+
+
+# If one instance of this WSGIApp is unable to handle your load, put a
+# sharding app in front that shards by username to one of many backends.
+
+class WSGIApp(object):
+
+ """Application that tracks rate limits in memory. Send requests to it of
+ this form:
+
+ POST /limiter/<username>/<urlencoded action>
+
+ and receive a 200 OK, or a 403 Forbidden with an X-Wait-Seconds header
+ containing the number of seconds to wait before the action would succeed.
+ """
+
+ def __init__(self, limiter):
+ """Create the WSGI application using the given Limiter instance."""
+ self.limiter = limiter
+
+ @webob.dec.wsgify
+ def __call__(self, req):
+ parts = req.path_info.split('/')
+ # format: /limiter/<username>/<urlencoded action>
+ if req.method != 'POST':
+ raise webob.exc.HTTPMethodNotAllowed()
+ if len(parts) != 4 or parts[1] != 'limiter':
+ raise webob.exc.HTTPNotFound()
+ username = parts[2]
+ action_name = urllib.unquote(parts[3])
+ delay = self.limiter.perform(action_name, username)
+ if delay:
+ return webob.exc.HTTPForbidden(
+ headers={'X-Wait-Seconds': "%.2f" % delay})
+ else:
+ return '' # 200 OK
+
+
+class WSGIAppProxy(object):
+
+ """Limiter lookalike that proxies to a ratelimiting.WSGIApp."""
+
+ def __init__(self, service_host):
+ """Creates a proxy pointing to a ratelimiting.WSGIApp at the given
+ host."""
+ self.service_host = service_host
+
+ def perform(self, action, username='nobody'):
+ conn = httplib.HTTPConnection(self.service_host)
+ conn.request('POST', '/limiter/%s/%s' % (username, action))
+ resp = conn.getresponse()
+ if resp.status == 200:
+ return None # no delay
+ return float(resp.getheader('X-Wait-Seconds'))
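A worked example of the bucket arithmetic in Limiter.perform(), using a made-up 5-per-minute action:

import nova.api.rackspace.ratelimiting as ratelimiting

rl = ratelimiting.Limiter(limits={'reboot': (5, ratelimiting.PER_MINUTE)})
# Capacity is 60 ccs (one per second of the period); each action adds
# 60/5 = 12 ccs. Five back-to-back actions fill the bucket exactly.
for _ in range(5):
    assert rl.perform('reboot') is None
# A sixth would overflow by one action's worth, so the caller is told to
# wait the ~12 seconds it takes 12 ccs to leak out at 1 cc/second.
print rl.perform('reboot')  # ~12.0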
diff --git a/nova/api/rackspace/ratelimiting/tests.py b/nova/api/rackspace/ratelimiting/tests.py
new file mode 100644
index 000000000..4c9510917
--- /dev/null
+++ b/nova/api/rackspace/ratelimiting/tests.py
@@ -0,0 +1,237 @@
+import httplib
+import StringIO
+import time
+import unittest
+import webob
+
+import nova.api.rackspace.ratelimiting as ratelimiting
+
+class LimiterTest(unittest.TestCase):
+
+ def setUp(self):
+ self.limits = {
+ 'a': (5, ratelimiting.PER_SECOND),
+ 'b': (5, ratelimiting.PER_MINUTE),
+ 'c': (5, ratelimiting.PER_HOUR),
+ 'd': (1, ratelimiting.PER_SECOND),
+ 'e': (100, ratelimiting.PER_SECOND)}
+ self.rl = ratelimiting.Limiter(self.limits)
+
+ def exhaust(self, action, times_until_exhausted, **kwargs):
+ for i in range(times_until_exhausted):
+ when = self.rl.perform(action, **kwargs)
+ self.assertEqual(when, None)
+ num, period = self.limits[action]
+ delay = period * 1.0 / num
+ # Verify that we are now thoroughly delayed
+ for i in range(10):
+ when = self.rl.perform(action, **kwargs)
+ self.assertAlmostEqual(when, delay, 2)
+
+ def test_second(self):
+ self.exhaust('a', 5)
+ time.sleep(0.2)
+ self.exhaust('a', 1)
+ time.sleep(1)
+ self.exhaust('a', 5)
+
+ def test_minute(self):
+ self.exhaust('b', 5)
+
+ def test_one_per_period(self):
+ def allow_once_and_deny_once():
+ when = self.rl.perform('d')
+ self.assertEqual(when, None)
+ when = self.rl.perform('d')
+ self.assertAlmostEqual(when, 1, 2)
+ return when
+ time.sleep(allow_once_and_deny_once())
+ time.sleep(allow_once_and_deny_once())
+ allow_once_and_deny_once()
+
+ def test_we_can_go_indefinitely_if_we_spread_out_requests(self):
+ for i in range(200):
+ when = self.rl.perform('e')
+ self.assertEqual(when, None)
+ time.sleep(0.01)
+
+ def test_users_get_separate_buckets(self):
+ self.exhaust('c', 5, username='alice')
+ self.exhaust('c', 5, username='bob')
+ self.exhaust('c', 5, username='chuck')
+ self.exhaust('c', 0, username='chuck')
+ self.exhaust('c', 0, username='bob')
+ self.exhaust('c', 0, username='alice')
+
+
+class FakeLimiter(object):
+ """Fake Limiter class that you can tell how to behave."""
+ def __init__(self, test):
+ self._action = self._username = self._delay = None
+ self.test = test
+ def mock(self, action, username, delay):
+ self._action = action
+ self._username = username
+ self._delay = delay
+ def perform(self, action, username):
+ self.test.assertEqual(action, self._action)
+ self.test.assertEqual(username, self._username)
+ return self._delay
+
+
+class WSGIAppTest(unittest.TestCase):
+
+ def setUp(self):
+ self.limiter = FakeLimiter(self)
+ self.app = ratelimiting.WSGIApp(self.limiter)
+
+ def test_invalid_methods(self):
+ requests = []
+ for method in ['GET', 'PUT', 'DELETE']:
+ req = webob.Request.blank('/limits/michael/breakdance',
+ dict(REQUEST_METHOD=method))
+ requests.append(req)
+ for req in requests:
+ self.assertEqual(req.get_response(self.app).status_int, 405)
+
+ def test_invalid_urls(self):
+ requests = []
+ for prefix in ['limit', '', 'limiter2', 'limiter/limits', 'limiter/1']:
+ req = webob.Request.blank('/%s/michael/breakdance' % prefix,
+ dict(REQUEST_METHOD='POST'))
+ requests.append(req)
+ for req in requests:
+ self.assertEqual(req.get_response(self.app).status_int, 404)
+
+ def verify(self, url, username, action, delay=None):
+ """Make sure that POSTing to the given url causes the given username
+ to perform the given action. Make the internal rate limiter return
+ delay and make sure that the WSGI app returns the correct response.
+ """
+ req = webob.Request.blank(url, dict(REQUEST_METHOD='POST'))
+ self.limiter.mock(action, username, delay)
+ resp = req.get_response(self.app)
+ if not delay:
+ self.assertEqual(resp.status_int, 200)
+ else:
+ self.assertEqual(resp.status_int, 403)
+ self.assertEqual(resp.headers['X-Wait-Seconds'], "%.2f" % delay)
+
+ def test_good_urls(self):
+ self.verify('/limiter/michael/hoot', 'michael', 'hoot')
+
+ def test_escaping(self):
+ self.verify('/limiter/michael/jump%20up', 'michael', 'jump up')
+
+ def test_response_to_delays(self):
+ self.verify('/limiter/michael/hoot', 'michael', 'hoot', 1)
+ self.verify('/limiter/michael/hoot', 'michael', 'hoot', 1.56)
+ self.verify('/limiter/michael/hoot', 'michael', 'hoot', 1000)
+
+
+class FakeHttplibSocket(object):
+    """A trivial fake socket implementation for httplib.HTTPResponse."""
+
+ def __init__(self, response_string):
+ self._buffer = StringIO.StringIO(response_string)
+
+ def makefile(self, _mode, _other):
+ """Returns the socket's internal buffer"""
+ return self._buffer
+
+
+class FakeHttplibConnection(object):
+ """A fake httplib.HTTPConnection
+
+ Requests made via this connection actually get translated and routed into
+ our WSGI app, we then wait for the response and turn it back into
+ an httplib.HTTPResponse.
+ """
+ def __init__(self, app, host, is_secure=False):
+ self.app = app
+ self.host = host
+
+ def request(self, method, path, data='', headers={}):
+ req = webob.Request.blank(path)
+ req.method = method
+ req.body = data
+ req.headers = headers
+ req.host = self.host
+ # Call the WSGI app, get the HTTP response
+ resp = str(req.get_response(self.app))
+ # For some reason, the response doesn't have "HTTP/1.0 " prepended; I
+ # guess that's a function the web server usually provides.
+ resp = "HTTP/1.0 %s" % resp
+ sock = FakeHttplibSocket(resp)
+ self.http_response = httplib.HTTPResponse(sock)
+ self.http_response.begin()
+
+ def getresponse(self):
+ return self.http_response
+
+
+def wire_HTTPConnection_to_WSGI(host, app):
+ """Monkeypatches HTTPConnection so that if you try to connect to host, you
+ are instead routed straight to the given WSGI app.
+
+ After calling this method, when any code calls
+
+ httplib.HTTPConnection(host)
+
+ the connection object will be a fake. Its requests will be sent directly
+ to the given WSGI app rather than through a socket.
+
+ Code connecting to hosts other than host will not be affected.
+
+ This method may be called multiple times to map different hosts to
+ different apps.
+ """
+ class HTTPConnectionDecorator(object):
+ """Wraps the real HTTPConnection class so that when you instantiate
+ the class you might instead get a fake instance."""
+ def __init__(self, wrapped):
+ self.wrapped = wrapped
+ def __call__(self, connection_host, *args, **kwargs):
+ if connection_host == host:
+ return FakeHttplibConnection(app, host)
+ else:
+ return self.wrapped(connection_host, *args, **kwargs)
+ httplib.HTTPConnection = HTTPConnectionDecorator(httplib.HTTPConnection)
+
+
+class WSGIAppProxyTest(unittest.TestCase):
+
+ def setUp(self):
+ """Our WSGIAppProxy is going to call across an HTTPConnection to a
+ WSGIApp running a limiter. The proxy will send input, and the proxy
+ should receive that same input, pass it to the limiter who gives a
+ result, and send the expected result back.
+
+ The HTTPConnection isn't real -- it's monkeypatched to point straight
+ at the WSGIApp. And the limiter isn't real -- it's a fake that
+ behaves the way we tell it to.
+ """
+ self.limiter = FakeLimiter(self)
+ app = ratelimiting.WSGIApp(self.limiter)
+ wire_HTTPConnection_to_WSGI('100.100.100.100:80', app)
+ self.proxy = ratelimiting.WSGIAppProxy('100.100.100.100:80')
+
+ def test_200(self):
+ self.limiter.mock('conquer', 'caesar', None)
+ when = self.proxy.perform('conquer', 'caesar')
+ self.assertEqual(when, None)
+
+ def test_403(self):
+ self.limiter.mock('grumble', 'proletariat', 1.5)
+ when = self.proxy.perform('grumble', 'proletariat')
+ self.assertEqual(when, 1.5)
+
+ def test_failure(self):
+ def shouldRaise():
+ self.limiter.mock('murder', 'brutus', None)
+ self.proxy.perform('stab', 'brutus')
+ self.assertRaises(AssertionError, shouldRaise)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index ae7099812..24538e4f1 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -85,7 +85,9 @@ class ComputeManager(manager.Manager):
try:
yield self.driver.spawn(instance_ref)
now = datetime.datetime.utcnow()
- self.db.instance_update(None, instance_id, {'launched_at': now})
+ self.db.instance_update(context,
+ instance_id,
+ {'launched_at': now})
except Exception: # pylint: disable-msg=W0702
logging.exception("instance %s: Failed to spawn",
instance_ref['name'])
@@ -100,8 +102,8 @@ class ComputeManager(manager.Manager):
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this machine."""
logging.debug("instance %s: terminating", instance_id)
- instance_ref = self.db.instance_get(context, instance_id)
+ instance_ref = self.db.instance_get(context, instance_id)
if instance_ref['state'] == power_state.SHUTOFF:
self.db.instance_destroy(context, instance_id)
raise exception.Error('trying to destroy already destroyed'
@@ -112,8 +114,6 @@ class ComputeManager(manager.Manager):
power_state.NOSTATE,
'shutting_down')
yield self.driver.destroy(instance_ref)
- now = datetime.datetime.utcnow()
- self.db.instance_update(None, instance_id, {'terminated_at': now})
# TODO(ja): should we keep it in a terminated state for a bit?
self.db.instance_destroy(context, instance_id)
@@ -189,7 +189,7 @@ class ComputeManager(manager.Manager):
volume_id)
instance_ref = self.db.instance_get(context, instance_id)
volume_ref = self.db.volume_get(context, volume_id)
- self.driver.detach_volume(instance_ref['str_id'],
- volume_ref['mountpoint'])
+ yield self.driver.detach_volume(instance_ref['str_id'],
+ volume_ref['mountpoint'])
self.db.volume_detached(context, volume_id)
defer.returnValue(True)
diff --git a/nova/db/api.py b/nova/db/api.py
index 2db462bdd..8e418f9f0 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -51,11 +51,45 @@ class NoMoreNetworks(exception.Error):
###################
+def service_destroy(context, service_id):
+    """Destroy the service or raise if it does not exist."""
+    return IMPL.service_destroy(context, service_id)
+
+
def service_get(context, service_id):
"""Get a service or raise if it does not exist."""
return IMPL.service_get(context, service_id)
+def service_get_all_by_topic(context, topic):
+    """Get all services for a given topic."""
+ return IMPL.service_get_all_by_topic(context, topic)
+
+
+def service_get_all_compute_sorted(context):
+ """Get all compute services sorted by instance count
+
+ Returns a list of (Service, instance_count) tuples
+ """
+ return IMPL.service_get_all_compute_sorted(context)
+
+
+def service_get_all_network_sorted(context):
+ """Get all network services sorted by network count
+
+ Returns a list of (Service, network_count) tuples
+ """
+ return IMPL.service_get_all_network_sorted(context)
+
+
+def service_get_all_volume_sorted(context):
+ """Get all volume services sorted by volume count
+
+ Returns a list of (Service, volume_count) tuples
+ """
+ return IMPL.service_get_all_volume_sorted(context)
+
+
def service_get_by_args(context, host, binary):
"""Get the state of a service by node name and binary."""
return IMPL.service_get_by_args(context, host, binary)
@@ -91,6 +125,11 @@ def floating_ip_create(context, values):
return IMPL.floating_ip_create(context, values)
+def floating_ip_count_by_project(context, project_id):
+ """Count floating ips used by project."""
+ return IMPL.floating_ip_count_by_project(context, project_id)
+
+
def floating_ip_deallocate(context, address):
"""Deallocate a floating ip by address."""
return IMPL.floating_ip_deallocate(context, address)
@@ -193,6 +232,11 @@ def instance_create(context, values):
return IMPL.instance_create(context, values)
+def instance_data_get_for_project(context, project_id):
+ """Get (instance_count, core_count) for project."""
+ return IMPL.instance_data_get_for_project(context, project_id)
+
+
def instance_destroy(context, instance_id):
"""Destroy the instance or raise if it does not exist."""
return IMPL.instance_destroy(context, instance_id)
@@ -405,6 +449,29 @@ def export_device_create(context, values):
###################
+def quota_create(context, values):
+ """Create a quota from the values dictionary."""
+ return IMPL.quota_create(context, values)
+
+
+def quota_get(context, project_id):
+ """Retrieve a quota or raise if it does not exist."""
+ return IMPL.quota_get(context, project_id)
+
+
+def quota_update(context, project_id, values):
+ """Update a quota from the values dictionary."""
+ return IMPL.quota_update(context, project_id, values)
+
+
+def quota_destroy(context, project_id):
+ """Destroy the quota or raise if it does not exist."""
+ return IMPL.quota_destroy(context, project_id)
+
+
+###################
+
+
def volume_allocate_shelf_and_blade(context, volume_id):
"""Atomically allocate a free shelf and blade from the pool."""
return IMPL.volume_allocate_shelf_and_blade(context, volume_id)
@@ -420,6 +487,11 @@ def volume_create(context, values):
return IMPL.volume_create(context, values)
+def volume_data_get_for_project(context, project_id):
+ """Get (volume_count, gigabytes) for project."""
+ return IMPL.volume_data_get_for_project(context, project_id)
+
+
def volume_destroy(context, volume_id):
"""Destroy the volume or raise if it does not exist."""
return IMPL.volume_destroy(context, volume_id)
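A sketch of the new quota call lifecycle. The project id and values are made up, and context is passed as None only because the current sqlalchemy implementations ignore it:

from nova import db

context = None  # ignored by the sqlalchemy layer today

# Override two limits for one project; keys left out of the dict stay
# None in the row and fall back to the flag defaults at read time.
db.quota_create(context, {'project_id': 'proj1', 'volumes': 20})
quota_ref = db.quota_get(context, 'proj1')  # raises NotFound if absent
db.quota_update(context, 'proj1', {'gigabytes': 500})
db.quota_destroy(context, 'proj1')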
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index be99a8eeb..30c550105 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -26,6 +26,7 @@ from nova.db.sqlalchemy import models
from nova.db.sqlalchemy.session import get_session
from sqlalchemy import or_
from sqlalchemy.orm import joinedload_all
+from sqlalchemy.sql import func
FLAGS = flags.FLAGS
@@ -46,10 +47,92 @@ def _deleted(context):
###################
+def service_destroy(context, service_id):
+ session = get_session()
+ with session.begin():
+ service_ref = models.Service.find(service_id, session=session)
+ service_ref.delete(session=session)
+
def service_get(_context, service_id):
return models.Service.find(service_id)
+def service_get_all_by_topic(context, topic):
+ session = get_session()
+ return session.query(models.Service
+ ).filter_by(deleted=False
+ ).filter_by(topic=topic
+ ).all()
+
+
+def _service_get_all_topic_subquery(_context, session, topic, subq, label):
+ sort_value = getattr(subq.c, label)
+ return session.query(models.Service, func.coalesce(sort_value, 0)
+ ).filter_by(topic=topic
+ ).filter_by(deleted=False
+ ).outerjoin((subq, models.Service.host == subq.c.host)
+ ).order_by(sort_value
+ ).all()
+
+
+def service_get_all_compute_sorted(context):
+ session = get_session()
+ with session.begin():
+ # NOTE(vish): The intended query is below
+ # SELECT services.*, COALESCE(inst_cores.instance_cores,
+ # 0)
+ # FROM services LEFT OUTER JOIN
+ # (SELECT host, SUM(instances.vcpus) AS instance_cores
+ # FROM instances GROUP BY host) AS inst_cores
+ # ON services.host = inst_cores.host
+ topic = 'compute'
+ label = 'instance_cores'
+ subq = session.query(models.Instance.host,
+ func.sum(models.Instance.vcpus).label(label)
+ ).filter_by(deleted=False
+ ).group_by(models.Instance.host
+ ).subquery()
+ return _service_get_all_topic_subquery(context,
+ session,
+ topic,
+ subq,
+ label)
+
+
+def service_get_all_network_sorted(context):
+ session = get_session()
+ with session.begin():
+ topic = 'network'
+ label = 'network_count'
+ subq = session.query(models.Network.host,
+ func.count(models.Network.id).label(label)
+ ).filter_by(deleted=False
+ ).group_by(models.Network.host
+ ).subquery()
+ return _service_get_all_topic_subquery(context,
+ session,
+ topic,
+ subq,
+ label)
+
+
+def service_get_all_volume_sorted(context):
+ session = get_session()
+ with session.begin():
+ topic = 'volume'
+ label = 'volume_gigabytes'
+ subq = session.query(models.Volume.host,
+ func.sum(models.Volume.size).label(label)
+ ).filter_by(deleted=False
+ ).group_by(models.Volume.host
+ ).subquery()
+ return _service_get_all_topic_subquery(context,
+ session,
+ topic,
+ subq,
+ label)
+
+
def service_get_by_args(_context, host, binary):
return models.Service.find_by_args(host, binary)
@@ -100,6 +183,14 @@ def floating_ip_create(_context, values):
return floating_ip_ref['address']
+def floating_ip_count_by_project(_context, project_id):
+ session = get_session()
+ return session.query(models.FloatingIp
+ ).filter_by(project_id=project_id
+ ).filter_by(deleted=False
+ ).count()
+
+
def floating_ip_fixed_ip_associate(_context, floating_address, fixed_address):
session = get_session()
with session.begin():
@@ -269,6 +360,17 @@ def instance_create(_context, values):
return instance_ref
+def instance_data_get_for_project(_context, project_id):
+ session = get_session()
+ result = session.query(func.count(models.Instance.id),
+ func.sum(models.Instance.vcpus)
+ ).filter_by(project_id=project_id
+ ).filter_by(deleted=False
+ ).first()
+ # NOTE(vish): convert None to 0
+ return (result[0] or 0, result[1] or 0)
+
+
def instance_destroy(_context, instance_id):
session = get_session()
with session.begin():
@@ -579,6 +681,37 @@ def export_device_create(_context, values):
###################
+def quota_create(_context, values):
+ quota_ref = models.Quota()
+ for (key, value) in values.iteritems():
+ quota_ref[key] = value
+ quota_ref.save()
+ return quota_ref
+
+
+def quota_get(_context, project_id):
+ return models.Quota.find_by_str(project_id)
+
+
+def quota_update(_context, project_id, values):
+ session = get_session()
+ with session.begin():
+ quota_ref = models.Quota.find_by_str(project_id, session=session)
+ for (key, value) in values.iteritems():
+ quota_ref[key] = value
+ quota_ref.save(session=session)
+
+
+def quota_destroy(_context, project_id):
+ session = get_session()
+ with session.begin():
+ quota_ref = models.Quota.find_by_str(project_id, session=session)
+ quota_ref.delete(session=session)
+
+
+###################
+
+
def volume_allocate_shelf_and_blade(_context, volume_id):
session = get_session()
with session.begin():
@@ -616,6 +749,17 @@ def volume_create(_context, values):
return volume_ref
+def volume_data_get_for_project(_context, project_id):
+ session = get_session()
+ result = session.query(func.count(models.Volume.id),
+ func.sum(models.Volume.size)
+ ).filter_by(project_id=project_id
+ ).filter_by(deleted=False
+ ).first()
+ # NOTE(vish): convert None to 0
+ return (result[0] or 0, result[1] or 0)
+
+
def volume_destroy(_context, volume_id):
session = get_session()
with session.begin():
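For symmetry with the NOTE(vish) comment in service_get_all_compute_sorted(), the intended SQL for the volume variant is roughly the following (a sketch, not captured SQLAlchemy output):

# SELECT services.*, COALESCE(vol_gigs.volume_gigabytes, 0)
# FROM services LEFT OUTER JOIN
#     (SELECT host, SUM(volumes.size) AS volume_gigabytes
#      FROM volumes WHERE deleted = 0 GROUP BY host) AS vol_gigs
# ON services.host = vol_gigs.host
# WHERE services.topic = 'volume' AND services.deleted = 0
# ORDER BY volume_gigabytes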
diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py
index 0ecc48bae..f62b79af8 100644
--- a/nova/db/sqlalchemy/models.py
+++ b/nova/db/sqlalchemy/models.py
@@ -103,7 +103,7 @@ class NovaBase(object):
def delete(self, session=None):
"""Delete this object"""
self.deleted = True
- self.deleted_at = datetime.datetime.now()
+ self.deleted_at = datetime.datetime.utcnow()
self.save(session=session)
def __setitem__(self, key, value):
@@ -229,6 +229,11 @@ class Instance(BASE, NovaBase):
state = Column(Integer)
state_description = Column(String(255))
+ memory_mb = Column(Integer)
+ vcpus = Column(Integer)
+ local_gb = Column(Integer)
+
+
hostname = Column(String(255))
host = Column(String(255)) # , ForeignKey('hosts.id'))
@@ -239,6 +244,7 @@ class Instance(BASE, NovaBase):
reservation_id = Column(String(255))
mac_address = Column(String(255))
+ scheduled_at = Column(DateTime)
launched_at = Column(DateTime)
terminated_at = Column(DateTime)
# TODO(vish): see Ewan's email about state improvements, probably
@@ -272,6 +278,39 @@ class Volume(BASE, NovaBase):
status = Column(String(255)) # TODO(vish): enum?
attach_status = Column(String(255)) # TODO(vish): enum
+ scheduled_at = Column(DateTime)
+ launched_at = Column(DateTime)
+ terminated_at = Column(DateTime)
+
+class Quota(BASE, NovaBase):
+ """Represents quota overrides for a project"""
+ __tablename__ = 'quotas'
+ id = Column(Integer, primary_key=True)
+
+ project_id = Column(String(255))
+
+ instances = Column(Integer)
+ cores = Column(Integer)
+ volumes = Column(Integer)
+ gigabytes = Column(Integer)
+ floating_ips = Column(Integer)
+
+ @property
+ def str_id(self):
+ return self.project_id
+
+ @classmethod
+ def find_by_str(cls, str_id, session=None, deleted=False):
+ if not session:
+ session = get_session()
+ try:
+ return session.query(cls
+ ).filter_by(project_id=str_id
+ ).filter_by(deleted=deleted
+ ).one()
+ except exc.NoResultFound:
+ new_exc = exception.NotFound("No model for project_id %s" % str_id)
+ raise new_exc.__class__, new_exc, sys.exc_info()[2]
class ExportDevice(BASE, NovaBase):
"""Represents a shelf and blade that a volume can be exported on"""
diff --git a/nova/endpoint/api.py b/nova/endpoint/api.py
index 40be00bb7..12eedfe67 100755
--- a/nova/endpoint/api.py
+++ b/nova/endpoint/api.py
@@ -304,7 +304,10 @@ class APIRequestHandler(tornado.web.RequestHandler):
try:
failure.raiseException()
except exception.ApiError as ex:
- self._error(type(ex).__name__ + "." + ex.code, ex.message)
+ if ex.code:
+ self._error(ex.code, ex.message)
+ else:
+ self._error(type(ex).__name__, ex.message)
# TODO(vish): do something more useful with unknown exceptions
except Exception as ex:
self._error(type(ex).__name__, str(ex))
diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py
index a88b1870e..53d9dc284 100644
--- a/nova/endpoint/cloud.py
+++ b/nova/endpoint/cloud.py
@@ -23,6 +23,7 @@ datastore.
"""
import base64
+import datetime
import logging
import os
import time
@@ -33,6 +34,7 @@ from nova import crypto
from nova import db
from nova import exception
from nova import flags
+from nova import quota
from nova import rpc
from nova import utils
from nova.auth import rbac
@@ -44,6 +46,11 @@ FLAGS = flags.FLAGS
flags.DECLARE('storage_availability_zone', 'nova.volume.manager')
+class QuotaError(exception.ApiError):
+    """Quota Exceeded"""
+ pass
+
+
def _gen_key(context, user_id, key_name):
"""Generate a key
@@ -297,6 +304,14 @@ class CloudController(object):
@rbac.allow('projectmanager', 'sysadmin')
def create_volume(self, context, size, **kwargs):
+ # check quota
+ size = int(size)
+ if quota.allowed_volumes(context, 1, size) < 1:
+            logging.warn("Quota exceeded for %s, tried to create %sG volume",
+ context.project.id, size)
+ raise QuotaError("Volume quota exceeded. You cannot "
+ "create a volume of size %s" %
+ size)
vol = {}
vol['size'] = size
vol['user_id'] = context.user.id
@@ -306,9 +321,11 @@ class CloudController(object):
vol['attach_status'] = "detached"
volume_ref = db.volume_create(context, vol)
- rpc.cast(FLAGS.volume_topic, {"method": "create_volume",
- "args": {"context": None,
- "volume_id": volume_ref['id']}})
+ rpc.cast(FLAGS.scheduler_topic,
+ {"method": "create_volume",
+ "args": {"context": None,
+ "topic": FLAGS.volume_topic,
+ "volume_id": volume_ref['id']}})
return {'volumeSet': [self._format_volume(context, volume_ref)]}
@@ -317,6 +334,8 @@ class CloudController(object):
def attach_volume(self, context, volume_id, instance_id, device, **kwargs):
volume_ref = db.volume_get_by_str(context, volume_id)
# TODO(vish): abstract status checking?
+ if volume_ref['status'] != "available":
+ raise exception.ApiError("Volume status must be available")
if volume_ref['attach_status'] == "attached":
raise exception.ApiError("Volume is already attached")
instance_ref = db.instance_get_by_str(context, instance_id)
@@ -339,10 +358,10 @@ class CloudController(object):
volume_ref = db.volume_get_by_str(context, volume_id)
instance_ref = db.volume_get_instance(context, volume_ref['id'])
if not instance_ref:
- raise exception.Error("Volume isn't attached to anything!")
+ raise exception.ApiError("Volume isn't attached to anything!")
# TODO(vish): abstract status checking?
if volume_ref['status'] == "available":
- raise exception.Error("Volume is already detached")
+ raise exception.ApiError("Volume is already detached")
try:
host = instance_ref['host']
rpc.cast(db.queue_get_for(context, FLAGS.compute_topic, host),
@@ -386,7 +405,7 @@ class CloudController(object):
instances = db.instance_get_by_reservation(context,
reservation_id)
else:
- if not context.user.is_admin():
+ if context.user.is_admin():
instances = db.instance_get_all(context)
else:
instances = db.instance_get_by_project(context,
@@ -461,6 +480,12 @@ class CloudController(object):
@rbac.allow('netadmin')
@defer.inlineCallbacks
def allocate_address(self, context, **kwargs):
+ # check quota
+ if quota.allowed_floating_ips(context, 1) < 1:
+            logging.warn("Quota exceeded for %s, tried to allocate address",
+ context.project.id)
+ raise QuotaError("Address quota exceeded. You cannot "
+ "allocate any more addresses")
network_topic = yield self._get_network_topic(context)
public_ip = yield rpc.call(network_topic,
{"method": "allocate_floating_ip",
@@ -520,6 +545,22 @@ class CloudController(object):
@rbac.allow('projectmanager', 'sysadmin')
@defer.inlineCallbacks
def run_instances(self, context, **kwargs):
+ instance_type = kwargs.get('instance_type', 'm1.small')
+ if instance_type not in INSTANCE_TYPES:
+            raise exception.ApiError("Unknown instance type: %s"
+                                     % instance_type)
+ # check quota
+ max_instances = int(kwargs.get('max_count', 1))
+ min_instances = int(kwargs.get('min_count', max_instances))
+ num_instances = quota.allowed_instances(context,
+ max_instances,
+ instance_type)
+ if num_instances < min_instances:
+            logging.warn("Quota exceeded for %s, tried to run %s instances",
+ context.project.id, min_instances)
+ raise QuotaError("Instance quota exceeded. You can only "
+ "run %s more instances of this type." %
+ num_instances, "InstanceLimitExceeded")
# make sure user can access the image
# vpn image is private so it doesn't show up on lists
vpn = kwargs['image_id'] == FLAGS.vpn_image_id
@@ -541,7 +582,7 @@ class CloudController(object):
images.get(context, kernel_id)
images.get(context, ramdisk_id)
- logging.debug("Going to run instances...")
+ logging.debug("Going to run %s instances...", num_instances)
launch_time = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
key_data = None
if kwargs.has_key('key_name'):
@@ -555,6 +596,7 @@ class CloudController(object):
reservation_id = utils.generate_uid('r')
base_options = {}
+ base_options['state_description'] = 'scheduling'
base_options['image_id'] = image_id
base_options['kernel_id'] = kernel_id
base_options['ramdisk_id'] = ramdisk_id
@@ -564,10 +606,15 @@ class CloudController(object):
base_options['user_id'] = context.user.id
base_options['project_id'] = context.project.id
base_options['user_data'] = kwargs.get('user_data', '')
- base_options['instance_type'] = kwargs.get('instance_type', 'm1.small')
base_options['security_group'] = security_group
+ base_options['instance_type'] = instance_type
+
+ type_data = INSTANCE_TYPES[instance_type]
+ base_options['memory_mb'] = type_data['memory_mb']
+ base_options['vcpus'] = type_data['vcpus']
+ base_options['local_gb'] = type_data['local_gb']
- for num in range(int(kwargs['max_count'])):
+ for num in range(num_instances):
instance_ref = db.instance_create(context, base_options)
inst_id = instance_ref['id']
@@ -588,11 +635,12 @@ class CloudController(object):
"args": {"context": None,
"address": address}})
- rpc.cast(FLAGS.compute_topic,
- {"method": "run_instance",
- "args": {"context": None,
- "instance_id": inst_id}})
- logging.debug("Casting to node for %s/%s's instance %s" %
+ rpc.cast(FLAGS.scheduler_topic,
+ {"method": "run_instance",
+ "args": {"context": None,
+ "topic": FLAGS.compute_topic,
+ "instance_id": inst_id}})
+ logging.debug("Casting to scheduler for %s/%s's instance %s" %
(context.project.name, context.user.name, inst_id))
defer.returnValue(self._format_run_instances(context,
reservation_id))
@@ -611,6 +659,10 @@ class CloudController(object):
% id_str)
continue
+ now = datetime.datetime.utcnow()
+ db.instance_update(context,
+ instance_ref['id'],
+ {'terminated_at': now})
# FIXME(ja): where should network deallocate occur?
address = db.instance_get_floating_address(context,
instance_ref['id'])
@@ -632,7 +684,7 @@ class CloudController(object):
# NOTE(vish): Currently, nothing needs to be done on the
# network node until release. If this changes,
# we will need to cast here.
- self.network.deallocate_fixed_ip(context, address)
+ self.network_manager.deallocate_fixed_ip(context, address)
host = instance_ref['host']
if host:
@@ -660,6 +712,10 @@ class CloudController(object):
def delete_volume(self, context, volume_id, **kwargs):
# TODO: return error if not authorized
volume_ref = db.volume_get_by_str(context, volume_id)
+ if volume_ref['status'] != "available":
+ raise exception.ApiError("Volume status must be available")
+ now = datetime.datetime.utcnow()
+ db.volume_update(context, volume_ref['id'], {'terminated_at': now})
host = volume_ref['host']
rpc.cast(db.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "delete_volume",
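Spelled out, the envelope the API now casts for new instances goes to the scheduler topic, with the eventual worker topic tucked into the args so the scheduler can re-cast to the chosen host's queue (the instance id here is illustrative):

from nova import flags
from nova import rpc

FLAGS = flags.FLAGS

rpc.cast(FLAGS.scheduler_topic,
         {"method": "run_instance",
          "args": {"context": None,  # contexts aren't serialized yet
                   "topic": FLAGS.compute_topic,
                   "instance_id": 42}})  # made-up id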
diff --git a/nova/exception.py b/nova/exception.py
index 29bcb17f8..b8894758f 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -26,6 +26,18 @@ import sys
import traceback
+class ProcessExecutionError(IOError):
+ def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
+ description=None):
+ if description is None:
+ description = "Unexpected error while running command."
+ if exit_code is None:
+ exit_code = '-'
+ message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % (
+ description, cmd, exit_code, stdout, stderr)
+ IOError.__init__(self, message)
+
+
class Error(Exception):
def __init__(self, message=None):
super(Error, self).__init__(message)
diff --git a/nova/flags.py b/nova/flags.py
index 7b0c95a3c..ed0baee65 100644
--- a/nova/flags.py
+++ b/nova/flags.py
@@ -171,6 +171,7 @@ DEFINE_string('connection_type', 'libvirt', 'libvirt, xenapi or fake')
DEFINE_integer('s3_port', 3333, 's3 port')
DEFINE_string('s3_host', '127.0.0.1', 's3 host')
DEFINE_string('compute_topic', 'compute', 'the topic compute nodes listen on')
+DEFINE_string('scheduler_topic', 'scheduler', 'the topic scheduler nodes listen on')
DEFINE_string('volume_topic', 'volume', 'the topic volume nodes listen on')
DEFINE_string('network_topic', 'network', 'the topic network nodes listen on')
@@ -213,6 +214,8 @@ DEFINE_string('network_manager', 'nova.network.manager.VlanManager',
'Manager for network')
DEFINE_string('volume_manager', 'nova.volume.manager.AOEManager',
'Manager for volume')
+DEFINE_string('scheduler_manager', 'nova.scheduler.manager.SchedulerManager',
+ 'Manager for scheduler')
DEFINE_string('host', socket.gethostname(),
'name of this node')
diff --git a/nova/network/manager.py b/nova/network/manager.py
index 7a3bcfc2f..191c1d364 100644
--- a/nova/network/manager.py
+++ b/nova/network/manager.py
@@ -90,7 +90,7 @@ class NetworkManager(manager.Manager):
network_id = network_ref['id']
host = self.db.network_set_host(context,
network_id,
- FLAGS.host)
+ self.host)
self._on_set_network_host(context, network_id)
return host
@@ -118,7 +118,7 @@
"""Gets a floating ip from the pool"""
# TODO(vish): add floating ips through manage command
return self.db.floating_ip_allocate_address(context,
- FLAGS.host,
+ self.host,
project_id)
def associate_floating_ip(self, context, floating_address, fixed_address):
diff --git a/nova/process.py b/nova/process.py
index 74725c157..b3cad894b 100644
--- a/nova/process.py
+++ b/nova/process.py
@@ -30,7 +30,7 @@ from twisted.internet import protocol
from twisted.internet import reactor
from nova import flags
-from nova.utils import ProcessExecutionError
+from nova.exception import ProcessExecutionError
FLAGS = flags.FLAGS
flags.DEFINE_integer('process_pool_size', 4,
@@ -127,7 +127,7 @@ def get_process_output(executable, args=None, env=None, path=None,
deferred = defer.Deferred()
cmd = executable
if args:
- cmd = cmd + " " + ' '.join(args)
+ cmd = " ".join([cmd] + args)
logging.debug("Running cmd: %s", cmd)
process_handler = BackRelayWithInput(
deferred,
@@ -141,8 +141,8 @@ def get_process_output(executable, args=None, env=None, path=None,
executable = str(executable)
if not args is None:
args = [str(x) for x in args]
- process_reactor.spawnProcess( process_handler, executable,
- (executable,)+tuple(args), env, path)
+ process_reactor.spawnProcess(process_handler, executable,
+ (executable,)+tuple(args), env, path)
return deferred
diff --git a/nova/quota.py b/nova/quota.py
new file mode 100644
index 000000000..f0e51feeb
--- /dev/null
+++ b/nova/quota.py
@@ -0,0 +1,91 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Quotas for instances, volumes, and floating ips
+"""
+
+from nova import db
+from nova import exception
+from nova import flags
+from nova.compute import instance_types
+
+FLAGS = flags.FLAGS
+
+flags.DEFINE_integer('quota_instances', 10,
+ 'number of instances allowed per project')
+flags.DEFINE_integer('quota_cores', 20,
+ 'number of instance cores allowed per project')
+flags.DEFINE_integer('quota_volumes', 10,
+ 'number of volumes allowed per project')
+flags.DEFINE_integer('quota_gigabytes', 1000,
+ 'number of volume gigabytes allowed per project')
+flags.DEFINE_integer('quota_floating_ips', 10,
+ 'number of floating ips allowed per project')
+
+def _get_quota(context, project_id):
+ rval = {'instances': FLAGS.quota_instances,
+ 'cores': FLAGS.quota_cores,
+ 'volumes': FLAGS.quota_volumes,
+ 'gigabytes': FLAGS.quota_gigabytes,
+ 'floating_ips': FLAGS.quota_floating_ips}
+ try:
+ quota = db.quota_get(context, project_id)
+ for key in rval.keys():
+ if quota[key] is not None:
+ rval[key] = quota[key]
+ except exception.NotFound:
+ pass
+ return rval
+
+def allowed_instances(context, num_instances, instance_type):
+ """Check quota and return min(num_instances, allowed_instances)"""
+ project_id = context.project.id
+ used_instances, used_cores = db.instance_data_get_for_project(context,
+ project_id)
+ quota = _get_quota(context, project_id)
+ allowed_instances = quota['instances'] - used_instances
+ allowed_cores = quota['cores'] - used_cores
+ type_cores = instance_types.INSTANCE_TYPES[instance_type]['vcpus']
+ num_cores = num_instances * type_cores
+ allowed_instances = min(allowed_instances,
+ int(allowed_cores // type_cores))
+ return min(num_instances, allowed_instances)
+
+
+def allowed_volumes(context, num_volumes, size):
+ """Check quota and return min(num_volumes, allowed_volumes)"""
+ project_id = context.project.id
+ used_volumes, used_gigabytes = db.volume_data_get_for_project(context,
+ project_id)
+ quota = _get_quota(context, project_id)
+ allowed_volumes = quota['volumes'] - used_volumes
+ allowed_gigabytes = quota['gigabytes'] - used_gigabytes
+ num_gigabytes = num_volumes * size
+ allowed_volumes = min(allowed_volumes,
+ int(allowed_gigabytes // size))
+ return min(num_volumes, allowed_volumes)
+
+
+def allowed_floating_ips(context, num_floating_ips):
+ """Check quota and return min(num_floating_ips, allowed_floating_ips)"""
+ project_id = context.project.id
+ used_floating_ips = db.floating_ip_count_by_project(context, project_id)
+ quota = _get_quota(context, project_id)
+ allowed_floating_ips = quota['floating_ips'] - used_floating_ips
+ return min(num_floating_ips, allowed_floating_ips)
+
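A worked example of allowed_instances() with made-up numbers, tracing the arithmetic above:

# project quota: 10 instances, 20 cores; used: 2 instances, 4 cores
# requested: 5 instances of a 4-vcpu type
allowed = 10 - 2                          # 8 left by instance count
cores_left = 20 - 4                       # 16 cores left
allowed = min(allowed, cores_left // 4)   # core quota caps it at 4
print min(5, allowed)                     # 4; if min_count were 5,
                                          # run_instances raises QuotaError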
diff --git a/nova/scheduler/__init__.py b/nova/scheduler/__init__.py
new file mode 100644
index 000000000..8359a7aeb
--- /dev/null
+++ b/nova/scheduler/__init__.py
@@ -0,0 +1,25 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Openstack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+:mod:`nova.scheduler` -- Scheduler Nodes
+=====================================================
+
+.. automodule:: nova.scheduler
+ :platform: Unix
+ :synopsis: Module that picks a compute node to run a VM instance.
+.. moduleauthor:: Chris Behrens <cbehrens@codestud.com>
+"""
diff --git a/nova/scheduler/chance.py b/nova/scheduler/chance.py
new file mode 100644
index 000000000..7fd09b053
--- /dev/null
+++ b/nova/scheduler/chance.py
@@ -0,0 +1,38 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Openstack, LLC.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Chance (Random) Scheduler implementation
+"""
+
+import random
+
+from nova.scheduler import driver
+
+
+class ChanceScheduler(driver.Scheduler):
+ """Implements Scheduler as a random node selector."""
+
+ def schedule(self, context, topic, *_args, **_kwargs):
+ """Picks a host that is up at random."""
+
+ hosts = self.hosts_up(context, topic)
+ if not hosts:
+ raise driver.NoValidHost("No hosts found")
+ return hosts[int(random.random() * len(hosts))]
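The index expression is equivalent to random.choice(); random.random() is in [0.0, 1.0), so the scaled index is always in range:

import random

hosts = ['alpha', 'beta', 'gamma']  # made-up host names
assert 0 <= int(random.random() * len(hosts)) < len(hosts)
print random.choice(hosts)          # the stdlib spelling of the same pick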
diff --git a/nova/scheduler/driver.py b/nova/scheduler/driver.py
new file mode 100644
index 000000000..2e6a5a835
--- /dev/null
+++ b/nova/scheduler/driver.py
@@ -0,0 +1,58 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Openstack, LLC.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Scheduler base class that all Schedulers should inherit from
+"""
+
+import datetime
+
+from nova import db
+from nova import exception
+from nova import flags
+
+FLAGS = flags.FLAGS
+flags.DEFINE_integer('service_down_time', 60,
+ 'maximum time since last checkin for up service')
+
+class NoValidHost(exception.Error):
+ """There is no valid host for the command."""
+ pass
+
+class Scheduler(object):
+    """The base class that all Scheduler classes should inherit from."""
+
+ @staticmethod
+ def service_is_up(service):
+ """Check whether a service is up based on last heartbeat."""
+ last_heartbeat = service['updated_at'] or service['created_at']
+        elapsed = datetime.datetime.utcnow() - last_heartbeat
+ return elapsed < datetime.timedelta(seconds=FLAGS.service_down_time)
+
+ def hosts_up(self, context, topic):
+ """Return the list of hosts that have a running service for topic."""
+
+ services = db.service_get_all_by_topic(context, topic)
+ return [service.host
+ for service in services
+ if self.service_is_up(service)]
+
+ def schedule(self, context, topic, *_args, **_kwargs):
+ """Must override at least this method for scheduler to work."""
+ raise NotImplementedError("Must implement a fallback schedule")
diff --git a/nova/scheduler/manager.py b/nova/scheduler/manager.py
new file mode 100644
index 000000000..0ad7ca86b
--- /dev/null
+++ b/nova/scheduler/manager.py
@@ -0,0 +1,66 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Openstack, LLC.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Scheduler Service
+"""
+
+import logging
+import functools
+
+from nova import db
+from nova import flags
+from nova import manager
+from nova import rpc
+from nova import utils
+
+FLAGS = flags.FLAGS
+flags.DEFINE_string('scheduler_driver',
+ 'nova.scheduler.chance.ChanceScheduler',
+ 'Driver to use for the scheduler')
+
+
+class SchedulerManager(manager.Manager):
+ """Chooses a host to run instances on."""
+ def __init__(self, scheduler_driver=None, *args, **kwargs):
+ if not scheduler_driver:
+ scheduler_driver = FLAGS.scheduler_driver
+ self.driver = utils.import_object(scheduler_driver)
+ super(SchedulerManager, self).__init__(*args, **kwargs)
+
+    def __getattr__(self, key):
+        """Converts unknown method calls into calls to _schedule()."""
+ return functools.partial(self._schedule, key)
+
+ def _schedule(self, method, context, topic, *args, **kwargs):
+ """Tries to call schedule_* method on the driver to retrieve host.
+
+ Falls back to schedule(context, topic) if method doesn't exist.
+ """
+ driver_method = 'schedule_%s' % method
+ try:
+ host = getattr(self.driver, driver_method)(context, *args, **kwargs)
+ except AttributeError:
+ host = self.driver.schedule(context, topic, *args, **kwargs)
+
+ kwargs.update({"context": None})
+ rpc.cast(db.queue_get_for(context, topic, host),
+ {"method": method,
+ "args": kwargs})
+ logging.debug("Casting to %s %s for %s", topic, host, method)
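The __getattr__/functools.partial pairing means any otherwise-undefined method called on the manager (run_instance, create_volume, ...) is funneled through _schedule with the method name as its first argument. A standalone sketch of the pattern with made-up names:

import functools


class Dispatcher(object):
    """Stand-in for SchedulerManager's dispatch; not nova code."""

    def __getattr__(self, key):
        # Fires only for attributes not found normally, so real
        # methods like _schedule are unaffected.
        return functools.partial(self._schedule, key)

    def _schedule(self, method, topic, **kwargs):
        print "would cast %s to topic %s with %r" % (method, topic, kwargs)


Dispatcher().run_instance('compute', instance_id=42)
# would cast run_instance to topic compute with {'instance_id': 42}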
diff --git a/nova/scheduler/simple.py b/nova/scheduler/simple.py
new file mode 100644
index 000000000..fdaff74d8
--- /dev/null
+++ b/nova/scheduler/simple.py
@@ -0,0 +1,90 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Openstack, LLC.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Simple Scheduler
+"""
+
+import datetime
+
+from nova import db
+from nova import flags
+from nova.scheduler import driver
+from nova.scheduler import chance
+
+FLAGS = flags.FLAGS
+flags.DEFINE_integer("max_cores", 16,
+ "maximum number of instance cores to allow per host")
+flags.DEFINE_integer("max_gigabytes", 10000,
+ "maximum number of volume gigabytes to allow per host")
+flags.DEFINE_integer("max_networks", 1000,
+ "maximum number of networks to allow per host")
+
+class SimpleScheduler(chance.ChanceScheduler):
+    """Naive scheduler that tries to find the least loaded host."""
+
+ def schedule_run_instance(self, context, instance_id, *_args, **_kwargs):
+ """Picks a host that is up and has the fewest running instances."""
+ instance_ref = db.instance_get(context, instance_id)
+ results = db.service_get_all_compute_sorted(context)
+ for result in results:
+ (service, instance_cores) = result
+ if instance_cores + instance_ref['vcpus'] > FLAGS.max_cores:
+ raise driver.NoValidHost("All hosts have too many cores")
+ if self.service_is_up(service):
+ # NOTE(vish): this probably belongs in the manager, if we
+ # can generalize this somehow
+ now = datetime.datetime.utcnow()
+ db.instance_update(context,
+ instance_id,
+ {'host': service['host'],
+ 'scheduled_at': now})
+ return service['host']
+ raise driver.NoValidHost("No hosts found")
+
+ def schedule_create_volume(self, context, volume_id, *_args, **_kwargs):
+ """Picks a host that is up and has the fewest volumes."""
+ volume_ref = db.volume_get(context, volume_id)
+ results = db.service_get_all_volume_sorted(context)
+ for result in results:
+ (service, volume_gigabytes) = result
+ if volume_gigabytes + volume_ref['size'] > FLAGS.max_gigabytes:
+ raise driver.NoValidHost("All hosts have too many gigabytes")
+ if self.service_is_up(service):
+ # NOTE(vish): this probably belongs in the manager, if we
+ # can generalize this somehow
+ now = datetime.datetime.utcnow()
+ db.volume_update(context,
+ volume_id,
+ {'host': service['host'],
+ 'scheduled_at': now})
+ return service['host']
+ raise driver.NoValidHost("No hosts found")
+
+ def schedule_set_network_host(self, context, *_args, **_kwargs):
+ """Picks a host that is up and has the fewest networks."""
+
+ results = db.service_get_all_network_sorted(context)
+ for result in results:
+            (service, network_count) = result
+            if network_count >= FLAGS.max_networks:
+ raise driver.NoValidHost("All hosts have too many networks")
+ if self.service_is_up(service):
+ return service['host']
+ raise driver.NoValidHost("No hosts found")
diff --git a/nova/tests/api/__init__.py b/nova/tests/api/__init__.py
index 59c4adc3d..4682c094e 100644
--- a/nova/tests/api/__init__.py
+++ b/nova/tests/api/__init__.py
@@ -52,8 +52,9 @@ class Test(unittest.TestCase):
result = webob.Request.blank('/test/cloud').get_response(api.API())
self.assertNotEqual(result.body, "/cloud")
- def test_query_api_version(self):
- pass
+ def test_query_api_versions(self):
+ result = webob.Request.blank('/').get_response(api.API())
+ self.assertTrue('CURRENT' in result.body)
if __name__ == '__main__':
unittest.main()
diff --git a/nova/tests/api/rackspace/__init__.py b/nova/tests/api/rackspace/__init__.py
index e69de29bb..622cb4335 100644
--- a/nova/tests/api/rackspace/__init__.py
+++ b/nova/tests/api/rackspace/__init__.py
@@ -0,0 +1,79 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from nova.api.rackspace import RateLimitingMiddleware
+from nova.tests.api.test_helper import *
+from webob import Request
+
+
+class RateLimitingMiddlewareTest(unittest.TestCase):
+
+ def test_get_action_name(self):
+ middleware = RateLimitingMiddleware(APIStub())
+ def verify(method, url, action_name):
+ req = Request.blank(url)
+ req.method = method
+ action = middleware.get_action_name(req)
+ self.assertEqual(action, action_name)
+ verify('PUT', '/servers/4', 'PUT')
+ verify('DELETE', '/servers/4', 'DELETE')
+ verify('POST', '/images/4', 'POST')
+ verify('POST', '/servers/4', 'POST servers')
+ verify('GET', '/foo?a=4&changes-since=never&b=5', 'GET changes-since')
+ verify('GET', '/foo?a=4&monkeys-since=never&b=5', None)
+ verify('GET', '/servers/4', None)
+ verify('HEAD', '/servers/4', None)
+
+ def exhaust(self, middleware, method, url, username, times):
+ req = Request.blank(url, dict(REQUEST_METHOD=method),
+ headers={'X-Auth-User': username})
+ for i in range(times):
+ resp = req.get_response(middleware)
+ self.assertEqual(resp.status_int, 200)
+ resp = req.get_response(middleware)
+ self.assertEqual(resp.status_int, 413)
+ self.assertTrue('Retry-After' in resp.headers)
+
+ def test_single_action(self):
+ middleware = RateLimitingMiddleware(APIStub())
+ self.exhaust(middleware, 'DELETE', '/servers/4', 'usr1', 100)
+ self.exhaust(middleware, 'DELETE', '/servers/4', 'usr2', 100)
+
+ def test_POST_servers_action_implies_POST_action(self):
+ middleware = RateLimitingMiddleware(APIStub())
+ self.exhaust(middleware, 'POST', '/servers/4', 'usr1', 10)
+ self.exhaust(middleware, 'POST', '/images/4', 'usr2', 10)
+ self.assertTrue(set(middleware.limiter._levels) ==
+ set(['usr1:POST', 'usr1:POST servers', 'usr2:POST']))
+
+ def test_POST_servers_action_correctly_ratelimited(self):
+ middleware = RateLimitingMiddleware(APIStub())
+ # Use up all of our "POST" allowance for the minute, 5 times
+ for i in range(5):
+ self.exhaust(middleware, 'POST', '/servers/4', 'usr1', 10)
+ # Reset the 'POST' action counter.
+ del middleware.limiter._levels['usr1:POST']
+        # All 50 daily "POST servers" actions should now be used up
+ self.exhaust(middleware, 'POST', '/servers/4', 'usr1', 0)
+
+ def test_proxy_ctor_works(self):
+ middleware = RateLimitingMiddleware(APIStub())
+ self.assertEqual(middleware.limiter.__class__.__name__, "Limiter")
+ middleware = RateLimitingMiddleware(APIStub(), service_host='foobar')
+ self.assertEqual(middleware.limiter.__class__.__name__, "WSGIAppProxy")
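
The assertions in test_get_action_name pin the mapping down: PUT and DELETE are limited by bare verb, POST to /servers gets its own bucket on top of plain POST (see test_POST_servers_action_implies_POST_action), GET is limited only when a changes-since query parameter is present, and anything else is unlimited. A sketch of logic consistent with those assertions, reconstructed from the tests rather than copied from the middleware:

    def get_action_name(req):
        """Return the rate-limit bucket for a webob request, or None."""
        if req.method == 'GET':
            return 'GET changes-since' if 'changes-since' in req.GET else None
        if req.method == 'POST':
            # The limiter also charges the plain 'POST' bucket; this
            # returns only the most specific action name.
            if req.path_info.startswith('/servers'):
                return 'POST servers'
            return 'POST'
        if req.method in ('PUT', 'DELETE'):
            return req.method
        return None

The exhaust counts likewise imply the default limits: 100 DELETEs and 10 POSTs per minute, and 50 "POST servers" actions per day.
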
diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py
index de2bf3d3b..f5c0f1c09 100644
--- a/nova/tests/compute_unittest.py
+++ b/nova/tests/compute_unittest.py
@@ -50,6 +50,7 @@ class ComputeTestCase(test.TrialTestCase):
def tearDown(self): # pylint: disable-msg=C0103
self.manager.delete_user(self.user)
self.manager.delete_project(self.project)
+ super(ComputeTestCase, self).tearDown()
def _create_instance(self):
"""Create a test instance"""
@@ -83,21 +84,21 @@ class ComputeTestCase(test.TrialTestCase):
@defer.inlineCallbacks
def test_run_terminate_timestamps(self):
- """Make sure it is possible to run and terminate instance"""
+ """Make sure timestamps are set for launched and destroyed"""
instance_id = self._create_instance()
instance_ref = db.instance_get(self.context, instance_id)
self.assertEqual(instance_ref['launched_at'], None)
- self.assertEqual(instance_ref['terminated_at'], None)
+ self.assertEqual(instance_ref['deleted_at'], None)
launch = datetime.datetime.utcnow()
yield self.compute.run_instance(self.context, instance_id)
instance_ref = db.instance_get(self.context, instance_id)
self.assert_(instance_ref['launched_at'] > launch)
- self.assertEqual(instance_ref['terminated_at'], None)
+ self.assertEqual(instance_ref['deleted_at'], None)
terminate = datetime.datetime.utcnow()
yield self.compute.terminate_instance(self.context, instance_id)
instance_ref = db.instance_get({'deleted': True}, instance_id)
self.assert_(instance_ref['launched_at'] < terminate)
- self.assert_(instance_ref['terminated_at'] > terminate)
+ self.assert_(instance_ref['deleted_at'] > terminate)
@defer.inlineCallbacks
def test_reboot(self):
diff --git a/nova/tests/quota_unittest.py b/nova/tests/quota_unittest.py
new file mode 100644
index 000000000..cab9f663d
--- /dev/null
+++ b/nova/tests/quota_unittest.py
@@ -0,0 +1,155 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from nova import db
+from nova import exception
+from nova import flags
+from nova import quota
+from nova import test
+from nova import utils
+from nova.auth import manager
+from nova.endpoint import cloud
+from nova.endpoint import api
+
+
+FLAGS = flags.FLAGS
+
+
+class QuotaTestCase(test.TrialTestCase):
+ def setUp(self): # pylint: disable-msg=C0103
+ logging.getLogger().setLevel(logging.DEBUG)
+ super(QuotaTestCase, self).setUp()
+ self.flags(connection_type='fake',
+ quota_instances=2,
+ quota_cores=4,
+ quota_volumes=2,
+ quota_gigabytes=20,
+ quota_floating_ips=1)
+
+ self.cloud = cloud.CloudController()
+ self.manager = manager.AuthManager()
+ self.user = self.manager.create_user('admin', 'admin', 'admin', True)
+ self.project = self.manager.create_project('admin', 'admin', 'admin')
+ self.network = utils.import_object(FLAGS.network_manager)
+ self.context = api.APIRequestContext(handler=None,
+ project=self.project,
+ user=self.user)
+
+ def tearDown(self): # pylint: disable-msg=C0103
+ manager.AuthManager().delete_project(self.project)
+ manager.AuthManager().delete_user(self.user)
+ super(QuotaTestCase, self).tearDown()
+
+ def _create_instance(self, cores=2):
+ """Create a test instance"""
+ inst = {}
+ inst['image_id'] = 'ami-test'
+ inst['reservation_id'] = 'r-fakeres'
+ inst['user_id'] = self.user.id
+ inst['project_id'] = self.project.id
+ inst['instance_type'] = 'm1.large'
+ inst['vcpus'] = cores
+ inst['mac_address'] = utils.generate_mac()
+ return db.instance_create(self.context, inst)['id']
+
+ def _create_volume(self, size=10):
+ """Create a test volume"""
+ vol = {}
+ vol['user_id'] = self.user.id
+ vol['project_id'] = self.project.id
+ vol['size'] = size
+ return db.volume_create(self.context, vol)['id']
+
+ def test_quota_overrides(self):
+ """Make sure overriding a projects quotas works"""
+ num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
+ self.assertEqual(num_instances, 2)
+ db.quota_create(self.context, {'project_id': self.project.id,
+ 'instances': 10})
+ num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
+ self.assertEqual(num_instances, 4)
+ db.quota_update(self.context, self.project.id, {'cores': 100})
+ num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
+ self.assertEqual(num_instances, 10)
+ db.quota_destroy(self.context, self.project.id)
+
+ def test_too_many_instances(self):
+ instance_ids = []
+ for i in range(FLAGS.quota_instances):
+ instance_id = self._create_instance()
+ instance_ids.append(instance_id)
+ self.assertFailure(self.cloud.run_instances(self.context,
+ min_count=1,
+ max_count=1,
+ instance_type='m1.small'),
+ cloud.QuotaError)
+ for instance_id in instance_ids:
+ db.instance_destroy(self.context, instance_id)
+
+ def test_too_many_cores(self):
+ instance_ids = []
+ instance_id = self._create_instance(cores=4)
+ instance_ids.append(instance_id)
+ self.assertFailure(self.cloud.run_instances(self.context,
+ min_count=1,
+ max_count=1,
+ instance_type='m1.small'),
+ cloud.QuotaError)
+ for instance_id in instance_ids:
+ db.instance_destroy(self.context, instance_id)
+
+ def test_too_many_volumes(self):
+ volume_ids = []
+ for i in range(FLAGS.quota_volumes):
+ volume_id = self._create_volume()
+ volume_ids.append(volume_id)
+ self.assertRaises(cloud.QuotaError,
+ self.cloud.create_volume,
+ self.context,
+ size=10)
+ for volume_id in volume_ids:
+ db.volume_destroy(self.context, volume_id)
+
+ def test_too_many_gigabytes(self):
+ volume_ids = []
+ volume_id = self._create_volume(size=20)
+ volume_ids.append(volume_id)
+ self.assertRaises(cloud.QuotaError,
+ self.cloud.create_volume,
+ self.context,
+ size=10)
+ for volume_id in volume_ids:
+ db.volume_destroy(self.context, volume_id)
+
+ def test_too_many_addresses(self):
+ address = '192.168.0.100'
+ try:
+ db.floating_ip_get_by_address(None, address)
+ except exception.NotFound:
+ db.floating_ip_create(None, {'address': address,
+ 'host': FLAGS.host})
+ float_addr = self.network.allocate_floating_ip(self.context,
+ self.project.id)
+ # NOTE(vish): This assert never fails. When cloud attempts to
+ # make an rpc.call, the test just finishes with OK. It
+ # appears to be something in the magic inline callbacks
+ # that is breaking.
+ self.assertFailure(self.cloud.allocate_address(self.context),
+ cloud.QuotaError)
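
The three assertions in test_quota_overrides pin down the arithmetic inside quota.allowed_instances: the requested count (100) is clipped by the remaining instance quota and by the remaining core quota divided by the requested type's cores (m1.small is 1 core, and nothing is running in this test). A self-contained reconstruction under those assumptions; the real function takes (context, max_count, instance_type) and looks the usage and quota rows up itself:

    def allowed_instances(requested, quota_instances, quota_cores,
                          used_instances, used_cores, cores_per_instance):
        free_instances = quota_instances - used_instances
        free_cores = quota_cores - used_cores
        return min(requested, free_instances,
                   free_cores // cores_per_instance)

    # Mirrors test_quota_overrides (m1.small = 1 core, nothing running):
    assert allowed_instances(100, 2, 4, 0, 0, 1) == 2    # instance quota binds
    assert allowed_instances(100, 10, 4, 0, 0, 1) == 4   # core quota binds
    assert allowed_instances(100, 10, 100, 0, 0, 1) == 10
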
diff --git a/nova/tests/scheduler_unittest.py b/nova/tests/scheduler_unittest.py
new file mode 100644
index 000000000..fde30f81e
--- /dev/null
+++ b/nova/tests/scheduler_unittest.py
@@ -0,0 +1,231 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For Scheduler
+"""
+
+from nova import db
+from nova import flags
+from nova import service
+from nova import test
+from nova import rpc
+from nova import utils
+from nova.auth import manager as auth_manager
+from nova.scheduler import manager
+from nova.scheduler import driver
+
+
+FLAGS = flags.FLAGS
+flags.DECLARE('max_cores', 'nova.scheduler.simple')
+
+class TestDriver(driver.Scheduler):
+ """Scheduler Driver for Tests"""
+    def schedule(self, context, topic, *args, **kwargs):
+        return 'fallback_host'
+
+    def schedule_named_method(self, context, num):
+        return 'named_host'
+
+class SchedulerTestCase(test.TrialTestCase):
+ """Test case for scheduler"""
+    def setUp(self): # pylint: disable-msg=C0103
+ super(SchedulerTestCase, self).setUp()
+ self.flags(scheduler_driver='nova.tests.scheduler_unittest.TestDriver')
+
+ def test_fallback(self):
+ scheduler = manager.SchedulerManager()
+ self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True)
+ rpc.cast('topic.fallback_host',
+ {'method': 'noexist',
+ 'args': {'context': None,
+ 'num': 7}})
+ self.mox.ReplayAll()
+ scheduler.noexist(None, 'topic', num=7)
+
+ def test_named_method(self):
+ scheduler = manager.SchedulerManager()
+ self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True)
+ rpc.cast('topic.named_host',
+ {'method': 'named_method',
+ 'args': {'context': None,
+ 'num': 7}})
+ self.mox.ReplayAll()
+ scheduler.named_method(None, 'topic', num=7)
+
+
+class SimpleDriverTestCase(test.TrialTestCase):
+ """Test case for simple driver"""
+ def setUp(self): # pylint: disable-msg=C0103
+ super(SimpleDriverTestCase, self).setUp()
+ self.flags(connection_type='fake',
+ max_cores=4,
+ max_gigabytes=4,
+ volume_driver='nova.volume.driver.FakeAOEDriver',
+ scheduler_driver='nova.scheduler.simple.SimpleScheduler')
+ self.scheduler = manager.SchedulerManager()
+ self.manager = auth_manager.AuthManager()
+ self.user = self.manager.create_user('fake', 'fake', 'fake')
+ self.project = self.manager.create_project('fake', 'fake', 'fake')
+ self.context = None
+
+ def tearDown(self): # pylint: disable-msg=C0103
+ self.manager.delete_user(self.user)
+        self.manager.delete_project(self.project)
+        super(SimpleDriverTestCase, self).tearDown()
+
+ def _create_instance(self):
+ """Create a test instance"""
+ inst = {}
+ inst['image_id'] = 'ami-test'
+ inst['reservation_id'] = 'r-fakeres'
+ inst['user_id'] = self.user.id
+ inst['project_id'] = self.project.id
+ inst['instance_type'] = 'm1.tiny'
+ inst['mac_address'] = utils.generate_mac()
+ inst['ami_launch_index'] = 0
+ inst['vcpus'] = 1
+ return db.instance_create(self.context, inst)['id']
+
+ def _create_volume(self):
+ """Create a test volume"""
+ vol = {}
+ vol['image_id'] = 'ami-test'
+ vol['reservation_id'] = 'r-fakeres'
+ vol['size'] = 1
+ return db.volume_create(self.context, vol)['id']
+
+ def test_hosts_are_up(self):
+ """Ensures driver can find the hosts that are up"""
+ # NOTE(vish): constructing service without create method
+ # because we are going to use it without queue
+ compute1 = service.Service('host1',
+ 'nova-compute',
+ 'compute',
+ FLAGS.compute_manager)
+ compute2 = service.Service('host2',
+ 'nova-compute',
+ 'compute',
+ FLAGS.compute_manager)
+ hosts = self.scheduler.driver.hosts_up(self.context, 'compute')
+ self.assertEqual(len(hosts), 2)
+ compute1.kill()
+ compute2.kill()
+
+ def test_least_busy_host_gets_instance(self):
+ """Ensures the host with less cores gets the next one"""
+ compute1 = service.Service('host1',
+ 'nova-compute',
+ 'compute',
+ FLAGS.compute_manager)
+ compute2 = service.Service('host2',
+ 'nova-compute',
+ 'compute',
+ FLAGS.compute_manager)
+ instance_id1 = self._create_instance()
+ compute1.run_instance(self.context, instance_id1)
+ instance_id2 = self._create_instance()
+ host = self.scheduler.driver.schedule_run_instance(self.context,
+ instance_id2)
+ self.assertEqual(host, 'host2')
+ compute1.terminate_instance(self.context, instance_id1)
+ db.instance_destroy(self.context, instance_id2)
+ compute1.kill()
+ compute2.kill()
+
+ def test_too_many_cores(self):
+ """Ensures we don't go over max cores"""
+ compute1 = service.Service('host1',
+ 'nova-compute',
+ 'compute',
+ FLAGS.compute_manager)
+ compute2 = service.Service('host2',
+ 'nova-compute',
+ 'compute',
+ FLAGS.compute_manager)
+ instance_ids1 = []
+ instance_ids2 = []
+ for index in xrange(FLAGS.max_cores):
+ instance_id = self._create_instance()
+ compute1.run_instance(self.context, instance_id)
+ instance_ids1.append(instance_id)
+ instance_id = self._create_instance()
+ compute2.run_instance(self.context, instance_id)
+ instance_ids2.append(instance_id)
+ instance_id = self._create_instance()
+ self.assertRaises(driver.NoValidHost,
+ self.scheduler.driver.schedule_run_instance,
+ self.context,
+ instance_id)
+ for instance_id in instance_ids1:
+ compute1.terminate_instance(self.context, instance_id)
+ for instance_id in instance_ids2:
+ compute2.terminate_instance(self.context, instance_id)
+ compute1.kill()
+ compute2.kill()
+
+ def test_least_busy_host_gets_volume(self):
+ """Ensures the host with less gigabytes gets the next one"""
+ volume1 = service.Service('host1',
+ 'nova-volume',
+ 'volume',
+ FLAGS.volume_manager)
+ volume2 = service.Service('host2',
+ 'nova-volume',
+ 'volume',
+ FLAGS.volume_manager)
+ volume_id1 = self._create_volume()
+ volume1.create_volume(self.context, volume_id1)
+ volume_id2 = self._create_volume()
+ host = self.scheduler.driver.schedule_create_volume(self.context,
+ volume_id2)
+ self.assertEqual(host, 'host2')
+ volume1.delete_volume(self.context, volume_id1)
+ db.volume_destroy(self.context, volume_id2)
+ volume1.kill()
+ volume2.kill()
+
+ def test_too_many_gigabytes(self):
+ """Ensures we don't go over max gigabytes"""
+ volume1 = service.Service('host1',
+ 'nova-volume',
+ 'volume',
+ FLAGS.volume_manager)
+ volume2 = service.Service('host2',
+ 'nova-volume',
+ 'volume',
+ FLAGS.volume_manager)
+ volume_ids1 = []
+ volume_ids2 = []
+ for index in xrange(FLAGS.max_gigabytes):
+ volume_id = self._create_volume()
+ volume1.create_volume(self.context, volume_id)
+ volume_ids1.append(volume_id)
+ volume_id = self._create_volume()
+ volume2.create_volume(self.context, volume_id)
+ volume_ids2.append(volume_id)
+ volume_id = self._create_volume()
+ self.assertRaises(driver.NoValidHost,
+ self.scheduler.driver.schedule_create_volume,
+ self.context,
+ volume_id)
+ for volume_id in volume_ids1:
+ volume1.delete_volume(self.context, volume_id)
+ for volume_id in volume_ids2:
+ volume2.delete_volume(self.context, volume_id)
+ volume1.kill()
+ volume2.kill()
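
The two SchedulerTestCase tests above lean on mox's record/replay protocol: StubOutWithMock replaces rpc.cast with a mock, the rpc.cast(...) call written next is recorded as an expectation rather than executed, and ReplayAll flips the mock into verification mode so the cast SchedulerManager actually performs must match the recording exactly. A minimal standalone illustration with a toy object (not from the Nova tree):

    import mox

    class Notifier(object):
        def send(self, msg):
            raise RuntimeError('the real method never runs under the stub')

    n = Notifier()
    m = mox.Mox()
    m.StubOutWithMock(n, 'send')
    n.send('boot host1')      # recorded as an expectation, not executed
    m.ReplayAll()             # stop recording, start verifying
    n.send('boot host1')      # matches; a different argument would fail
    m.VerifyAll()             # confirm every expectation was met
    m.UnsetStubs()

The expected payloads in test_fallback and test_named_method also document _schedule's behaviour: the caller's kwargs land under 'args' with context nulled out, mirroring kwargs.update({"context": None}) in SchedulerManager._schedule.
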
diff --git a/nova/utils.py b/nova/utils.py
index 011a5cb09..d18dd9843 100644
--- a/nova/utils.py
+++ b/nova/utils.py
@@ -33,22 +33,12 @@ from twisted.internet.threads import deferToThread
from nova import exception
from nova import flags
+from nova.exception import ProcessExecutionError
FLAGS = flags.FLAGS
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
-class ProcessExecutionError(IOError):
- def __init__( self, stdout=None, stderr=None, exit_code=None, cmd=None,
- description=None):
- if description is None:
- description = "Unexpected error while running command."
- if exit_code is None:
- exit_code = '-'
- message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % (
- description, cmd, exit_code, stdout, stderr)
- IOError.__init__(self, message)
-
def import_class(import_str):
"""Returns a class from a string including module and class"""
mod_str, _sep, class_str = import_str.rpartition('.')
@@ -129,8 +119,10 @@ def runthis(prompt, cmd, check_exit_code = True):
exit_code = subprocess.call(cmd.split(" "))
logging.debug(prompt % (exit_code))
if check_exit_code and exit_code <> 0:
- raise Exception( "Unexpected exit code: %s from cmd: %s"
- % (exit_code, cmd))
+ raise ProcessExecutionError(exit_code=exit_code,
+ stdout=None,
+ stderr=None,
+ cmd=cmd)
def generate_uid(topic, size=8):
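
With runthis now raising the structured ProcessExecutionError (relocated to nova.exception, per the import added above) instead of a bare Exception, callers can catch command failures specifically and still find the command and exit code in the message. A small hedged usage sketch; the command shown is illustrative, any cmd accepted by runthis works:

    import logging

    from nova import utils
    from nova.exception import ProcessExecutionError

    try:
        utils.runthis("aoe-discover returned %s", "aoe-discover")
    except ProcessExecutionError, exc:
        # The message embeds the command and exit code formatted by
        # ProcessExecutionError.__init__.
        logging.warn("command failed: %s", exc)
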
diff --git a/nova/volume/manager.py b/nova/volume/manager.py
index 174c036d6..034763512 100644
--- a/nova/volume/manager.py
+++ b/nova/volume/manager.py
@@ -22,6 +22,7 @@ destroying persistent storage volumes, ala EBS.
"""
import logging
+import datetime
from twisted.internet import defer
@@ -72,7 +73,7 @@ class AOEManager(manager.Manager):
self.db.volume_update(context,
volume_id,
- {'host': FLAGS.host})
+ {'host': self.host})
size = volume_ref['size']
logging.debug("volume %s: creating lv of size %sG", volume_id, size)
@@ -89,14 +90,13 @@ class AOEManager(manager.Manager):
yield self.driver.create_export(volume_ref['str_id'],
shelf_id,
blade_id)
- # TODO(joshua): We need to trigger a fanout message
- # for aoe-discover on all the nodes
-
- self.db.volume_update(context, volume_id, {'status': 'available'})
logging.debug("volume %s: re-exporting all values", volume_id)
yield self.driver.ensure_exports()
+ now = datetime.datetime.utcnow()
+ self.db.volume_update(context, volume_id, {'status': 'available',
+ 'launched_at': now})
logging.debug("volume %s: created successfully", volume_id)
defer.returnValue(volume_id)
@@ -107,7 +107,7 @@ class AOEManager(manager.Manager):
volume_ref = self.db.volume_get(context, volume_id)
if volume_ref['attach_status'] == "attached":
raise exception.Error("Volume is still attached")
- if volume_ref['host'] != FLAGS.host:
+ if volume_ref['host'] != self.host:
raise exception.Error("Volume is not local to this node")
shelf_id, blade_id = self.db.volume_get_shelf_and_blade(context,
volume_id)
diff --git a/run_tests.py b/run_tests.py
index d5dc5f934..4121f4c06 100644
--- a/run_tests.py
+++ b/run_tests.py
@@ -58,7 +58,9 @@ from nova.tests.flags_unittest import *
from nova.tests.network_unittest import *
from nova.tests.objectstore_unittest import *
from nova.tests.process_unittest import *
+from nova.tests.quota_unittest import *
from nova.tests.rpc_unittest import *
+from nova.tests.scheduler_unittest import *
from nova.tests.service_unittest import *
from nova.tests.validator_unittest import *
from nova.tests.volume_unittest import *
diff --git a/tools/install_venv.py b/tools/install_venv.py
index 5d2369a96..32c372352 100644
--- a/tools/install_venv.py
+++ b/tools/install_venv.py
@@ -88,6 +88,10 @@ def create_virtualenv(venv=VENV):
def install_dependencies(venv=VENV):
print 'Installing dependencies with pip (this can take a while)...'
+ # Install greenlet by hand - just listing it in the requires file does not
+    # get it installed in the right order
+ run_command(['tools/with_venv.sh', 'pip', 'install', '-E', venv, 'greenlet'],
+ redirect_output=False)
run_command(['tools/with_venv.sh', 'pip', 'install', '-E', venv, '-r', PIP_REQUIRES],
redirect_output=False)
run_command(['tools/with_venv.sh', 'pip', 'install', '-E', venv, TWISTED_NOVA],
diff --git a/tools/pip-requires b/tools/pip-requires
index dd69708ce..1e2707be7 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -7,15 +7,16 @@ amqplib==0.6.1
anyjson==0.2.4
boto==2.0b1
carrot==0.10.5
-eventlet==0.9.10
+eventlet==0.9.12
lockfile==0.8
python-daemon==1.5.5
python-gflags==1.3
redis==2.0.0
routes==1.12.3
tornado==1.0
-webob==0.9.8
+WebOb==0.9.8
wsgiref==0.1.2
zope.interface==3.6.1
mox==0.5.0
-f http://pymox.googlecode.com/files/mox-0.5.0.tar.gz
+greenlet==0.3.1