diff options
author | Soren Hansen <soren.hansen@rackspace.com> | 2010-09-22 13:26:36 +0200 |
---|---|---|
committer | Soren Hansen <soren.hansen@rackspace.com> | 2010-09-22 13:26:36 +0200 |
commit | d42e168763d232476407a07b79056fb745c6075e (patch) | |
tree | 25e98655b2fcf9a05ecc832ee17e0c1d9e7f2221 | |
parent | 28336ed41e0d44d7600588a6014f6253e4b87a42 (diff) | |
parent | 4e727faf450154d89687b1a33dae2159d5b691a0 (diff) | |
download | nova-d42e168763d232476407a07b79056fb745c6075e.tar.gz nova-d42e168763d232476407a07b79056fb745c6075e.tar.xz nova-d42e168763d232476407a07b79056fb745c6075e.zip |
Merge trunk
48 files changed, 2458 insertions, 549 deletions
diff --git a/bin/nova-dhcpbridge b/bin/nova-dhcpbridge index c4795cca2..a127ed03c 100755 --- a/bin/nova-dhcpbridge +++ b/bin/nova-dhcpbridge @@ -44,18 +44,20 @@ flags.DECLARE('auth_driver', 'nova.auth.manager') flags.DECLARE('redis_db', 'nova.datastore') flags.DECLARE('network_size', 'nova.network.manager') flags.DECLARE('num_networks', 'nova.network.manager') +flags.DECLARE('update_dhcp_on_disassociate', 'nova.network.manager') -def add_lease(_mac, ip_address, _hostname, _interface): +def add_lease(mac, ip_address, _hostname, _interface): """Set the IP that was assigned by the DHCP server.""" if FLAGS.fake_rabbit: logging.debug("leasing ip") network_manager = utils.import_object(FLAGS.network_manager) - network_manager.lease_fixed_ip(None, ip_address) + network_manager.lease_fixed_ip(None, mac, ip_address) else: - rpc.cast("%s.%s" % (FLAGS.network_topic, FLAGS.node_name), + rpc.cast("%s.%s" % (FLAGS.network_topic, FLAGS.host), {"method": "lease_fixed_ip", "args": {"context": None, + "mac": mac, "address": ip_address}}) @@ -64,16 +66,17 @@ def old_lease(_mac, _ip_address, _hostname, _interface): logging.debug("Adopted old lease or got a change of mac/hostname") -def del_lease(_mac, ip_address, _hostname, _interface): +def del_lease(mac, ip_address, _hostname, _interface): """Called when a lease expires.""" if FLAGS.fake_rabbit: logging.debug("releasing ip") network_manager = utils.import_object(FLAGS.network_manager) - network_manager.release_fixed_ip(None, ip_address) + network_manager.release_fixed_ip(None, mac, ip_address) else: - rpc.cast("%s.%s" % (FLAGS.network_topic, FLAGS.node_name), + rpc.cast("%s.%s" % (FLAGS.network_topic, FLAGS.host), {"method": "release_fixed_ip", "args": {"context": None, + "mac": mac, "address": ip_address}}) diff --git a/bin/nova-manage b/bin/nova-manage index ecef5d555..325245ac4 100755 --- a/bin/nova-manage +++ b/bin/nova-manage @@ -17,6 +17,37 @@ # License for the specific language governing permissions and limitations # under the License. +# Interactive shell based on Django: +# +# Copyright (c) 2005, the Lawrence Journal-World +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# 3. Neither the name of Django nor the names of its contributors may be used +# to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR +# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + """ CLI interface for nova management. Connects to the running ADMIN api in the api daemon. @@ -26,6 +57,8 @@ import os import sys import time +import IPy + # If ../nova/__init__.py exists, add ../ to Python search path, so that # it will override what happens to be installed in /usr/(local/)lib/python... possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), @@ -101,6 +134,29 @@ class VpnCommands(object): self.pipe.launch_vpn_instance(project_id) +class ShellCommands(object): + def run(self): + "Runs a Python interactive interpreter. Tries to use IPython, if it's available." + try: + import IPython + # Explicitly pass an empty list as arguments, because otherwise IPython + # would use sys.argv from this script. + shell = IPython.Shell.IPShell(argv=[]) + shell.mainloop() + except ImportError: + import code + try: # Try activating rlcompleter, because it's handy. + import readline + except ImportError: + pass + else: + # We don't have to wrap the following import in a 'try', because + # we already know 'readline' was imported successfully. + import rlcompleter + readline.parse_and_bind("tab:complete") + code.interact() + + class RoleCommands(object): """Class for managing roles.""" @@ -218,12 +274,45 @@ class ProjectCommands(object): with open(filename, 'w') as f: f.write(zip_file) +class FloatingIpCommands(object): + """Class for managing floating ip.""" + + def create(self, host, range): + """Creates floating ips for host by range + arguments: host ip_range""" + for address in IPy.IP(range): + db.floating_ip_create(None, {'address': str(address), + 'host': host}) + + def delete(self, ip_range): + """Deletes floating ips by range + arguments: range""" + for address in IPy.IP(ip_range): + db.floating_ip_destroy(None, str(address)) + + + def list(self, host=None): + """Lists all floating ips (optionally by host) + arguments: [host]""" + if host == None: + floating_ips = db.floating_ip_get_all(None) + else: + floating_ips = db.floating_ip_get_all_by_host(None, host) + for floating_ip in floating_ips: + instance = None + if floating_ip['fixed_ip']: + instance = floating_ip['fixed_ip']['instance']['str_id'] + print "%s\t%s\t%s" % (floating_ip['host'], + floating_ip['address'], + instance) CATEGORIES = [ ('user', UserCommands), ('project', ProjectCommands), ('role', RoleCommands), + ('shell', ShellCommands), ('vpn', VpnCommands), + ('floating', FloatingIpCommands) ] diff --git a/bin/nova-scheduler b/bin/nova-scheduler new file mode 100755 index 000000000..38a8f213f --- /dev/null +++ b/bin/nova-scheduler @@ -0,0 +1,43 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" + Twistd daemon for the nova scheduler nodes. +""" + +import os +import sys + +# If ../nova/__init__.py exists, add ../ to Python search path, so that +# it will override what happens to be installed in /usr/(local/)lib/python... +possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), + os.pardir, + os.pardir)) +if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')): + sys.path.insert(0, possible_topdir) + +from nova import service +from nova import twistd + + +if __name__ == '__main__': + twistd.serve(__file__) + +if __name__ == '__builtin__': + application = service.Service.create() diff --git a/nova/api/__init__.py b/nova/api/__init__.py index b9b9e3988..9f116dada 100644 --- a/nova/api/__init__.py +++ b/nova/api/__init__.py @@ -21,6 +21,7 @@ Root WSGI middleware for all API controllers. """ import routes +import webob.dec from nova import wsgi from nova.api import ec2 @@ -32,6 +33,18 @@ class API(wsgi.Router): def __init__(self): mapper = routes.Mapper() + mapper.connect("/", controller=self.versions) mapper.connect("/v1.0/{path_info:.*}", controller=rackspace.API()) mapper.connect("/ec2/{path_info:.*}", controller=ec2.API()) super(API, self).__init__(mapper) + + @webob.dec.wsgify + def versions(self, req): + """Respond to a request for all OpenStack API versions.""" + response = { + "versions": [ + dict(status="CURRENT", id="v1.0")]} + metadata = { + "application/xml": { + "attributes": dict(version=["status", "id"])}} + return wsgi.Serializer(req.environ, metadata).to_content_type(response) diff --git a/nova/api/rackspace/__init__.py b/nova/api/rackspace/__init__.py index b4d666d63..ac5365310 100644 --- a/nova/api/rackspace/__init__.py +++ b/nova/api/rackspace/__init__.py @@ -31,6 +31,7 @@ from nova import flags from nova import wsgi from nova.api.rackspace import flavors from nova.api.rackspace import images +from nova.api.rackspace import ratelimiting from nova.api.rackspace import servers from nova.api.rackspace import sharedipgroups from nova.auth import manager @@ -40,7 +41,7 @@ class API(wsgi.Middleware): """WSGI entry point for all Rackspace API requests.""" def __init__(self): - app = AuthMiddleware(APIRouter()) + app = AuthMiddleware(RateLimitingMiddleware(APIRouter())) super(API, self).__init__(app) @@ -65,6 +66,72 @@ class AuthMiddleware(wsgi.Middleware): return self.application +class RateLimitingMiddleware(wsgi.Middleware): + """Rate limit incoming requests according to the OpenStack rate limits.""" + + def __init__(self, application, service_host=None): + """Create a rate limiting middleware that wraps the given application. + + By default, rate counters are stored in memory. If service_host is + specified, the middleware instead relies on the ratelimiting.WSGIApp + at the given host+port to keep rate counters. + """ + super(RateLimitingMiddleware, self).__init__(application) + if not service_host: + #TODO(gundlach): These limits were based on limitations of Cloud + #Servers. We should revisit them in Nova. + self.limiter = ratelimiting.Limiter(limits={ + 'DELETE': (100, ratelimiting.PER_MINUTE), + 'PUT': (10, ratelimiting.PER_MINUTE), + 'POST': (10, ratelimiting.PER_MINUTE), + 'POST servers': (50, ratelimiting.PER_DAY), + 'GET changes-since': (3, ratelimiting.PER_MINUTE), + }) + else: + self.limiter = ratelimiting.WSGIAppProxy(service_host) + + @webob.dec.wsgify + def __call__(self, req): + """Rate limit the request. + + If the request should be rate limited, return a 413 status with a + Retry-After header giving the time when the request would succeed. + """ + username = req.headers['X-Auth-User'] + action_name = self.get_action_name(req) + if not action_name: # not rate limited + return self.application + delay = self.get_delay(action_name, username) + if delay: + # TODO(gundlach): Get the retry-after format correct. + raise webob.exc.HTTPRequestEntityTooLarge(headers={ + 'Retry-After': time.time() + delay}) + return self.application + + def get_delay(self, action_name, username): + """Return the delay for the given action and username, or None if + the action would not be rate limited. + """ + if action_name == 'POST servers': + # "POST servers" is a POST, so it counts against "POST" too. + # Attempt the "POST" first, lest we are rate limited by "POST" but + # use up a precious "POST servers" call. + delay = self.limiter.perform("POST", username=username) + if delay: + return delay + return self.limiter.perform(action_name, username=username) + + def get_action_name(self, req): + """Return the action name for this request.""" + if req.method == 'GET' and 'changes-since' in req.GET: + return 'GET changes-since' + if req.method == 'POST' and req.path_info.startswith('/servers'): + return 'POST servers' + if req.method in ['PUT', 'POST', 'DELETE']: + return req.method + return None + + class APIRouter(wsgi.Router): """ Routes requests on the Rackspace API to the appropriate controller diff --git a/nova/api/rackspace/ratelimiting/__init__.py b/nova/api/rackspace/ratelimiting/__init__.py new file mode 100644 index 000000000..f843bac0f --- /dev/null +++ b/nova/api/rackspace/ratelimiting/__init__.py @@ -0,0 +1,122 @@ +"""Rate limiting of arbitrary actions.""" + +import httplib +import time +import urllib +import webob.dec +import webob.exc + + +# Convenience constants for the limits dictionary passed to Limiter(). +PER_SECOND = 1 +PER_MINUTE = 60 +PER_HOUR = 60 * 60 +PER_DAY = 60 * 60 * 24 + +class Limiter(object): + + """Class providing rate limiting of arbitrary actions.""" + + def __init__(self, limits): + """Create a rate limiter. + + limits: a dict mapping from action name to a tuple. The tuple contains + the number of times the action may be performed, and the time period + (in seconds) during which the number must not be exceeded for this + action. Example: dict(reboot=(10, ratelimiting.PER_MINUTE)) would + allow 10 'reboot' actions per minute. + """ + self.limits = limits + self._levels = {} + + def perform(self, action_name, username='nobody'): + """Attempt to perform an action by the given username. + + action_name: the string name of the action to perform. This must + be a key in the limits dict passed to the ctor. + + username: an optional string name of the user performing the action. + Each user has her own set of rate limiting counters. Defaults to + 'nobody' (so that if you never specify a username when calling + perform(), a single set of counters will be used.) + + Return None if the action may proceed. If the action may not proceed + because it has been rate limited, return the float number of seconds + until the action would succeed. + """ + # Think of rate limiting as a bucket leaking water at 1cc/second. The + # bucket can hold as many ccs as there are seconds in the rate + # limiting period (e.g. 3600 for per-hour ratelimits), and if you can + # perform N actions in that time, each action fills the bucket by + # 1/Nth of its volume. You may only perform an action if the bucket + # would not overflow. + now = time.time() + key = '%s:%s' % (username, action_name) + last_time_performed, water_level = self._levels.get(key, (now, 0)) + # The bucket leaks 1cc/second. + water_level -= (now - last_time_performed) + if water_level < 0: + water_level = 0 + num_allowed_per_period, period_in_secs = self.limits[action_name] + # Fill the bucket by 1/Nth its capacity, and hope it doesn't overflow. + capacity = period_in_secs + new_level = water_level + (capacity * 1.0 / num_allowed_per_period) + if new_level > capacity: + # Delay this many seconds. + return new_level - capacity + self._levels[key] = (now, new_level) + return None + + +# If one instance of this WSGIApps is unable to handle your load, put a +# sharding app in front that shards by username to one of many backends. + +class WSGIApp(object): + + """Application that tracks rate limits in memory. Send requests to it of + this form: + + POST /limiter/<username>/<urlencoded action> + + and receive a 200 OK, or a 403 Forbidden with an X-Wait-Seconds header + containing the number of seconds to wait before the action would succeed. + """ + + def __init__(self, limiter): + """Create the WSGI application using the given Limiter instance.""" + self.limiter = limiter + + @webob.dec.wsgify + def __call__(self, req): + parts = req.path_info.split('/') + # format: /limiter/<username>/<urlencoded action> + if req.method != 'POST': + raise webob.exc.HTTPMethodNotAllowed() + if len(parts) != 4 or parts[1] != 'limiter': + raise webob.exc.HTTPNotFound() + username = parts[2] + action_name = urllib.unquote(parts[3]) + delay = self.limiter.perform(action_name, username) + if delay: + return webob.exc.HTTPForbidden( + headers={'X-Wait-Seconds': "%.2f" % delay}) + else: + return '' # 200 OK + + +class WSGIAppProxy(object): + + """Limiter lookalike that proxies to a ratelimiting.WSGIApp.""" + + def __init__(self, service_host): + """Creates a proxy pointing to a ratelimiting.WSGIApp at the given + host.""" + self.service_host = service_host + + def perform(self, action, username='nobody'): + conn = httplib.HTTPConnection(self.service_host) + conn.request('POST', '/limiter/%s/%s' % (username, action)) + resp = conn.getresponse() + if resp.status == 200: + return None # no delay + return float(resp.getheader('X-Wait-Seconds')) diff --git a/nova/api/rackspace/ratelimiting/tests.py b/nova/api/rackspace/ratelimiting/tests.py new file mode 100644 index 000000000..4c9510917 --- /dev/null +++ b/nova/api/rackspace/ratelimiting/tests.py @@ -0,0 +1,237 @@ +import httplib +import StringIO +import time +import unittest +import webob + +import nova.api.rackspace.ratelimiting as ratelimiting + +class LimiterTest(unittest.TestCase): + + def setUp(self): + self.limits = { + 'a': (5, ratelimiting.PER_SECOND), + 'b': (5, ratelimiting.PER_MINUTE), + 'c': (5, ratelimiting.PER_HOUR), + 'd': (1, ratelimiting.PER_SECOND), + 'e': (100, ratelimiting.PER_SECOND)} + self.rl = ratelimiting.Limiter(self.limits) + + def exhaust(self, action, times_until_exhausted, **kwargs): + for i in range(times_until_exhausted): + when = self.rl.perform(action, **kwargs) + self.assertEqual(when, None) + num, period = self.limits[action] + delay = period * 1.0 / num + # Verify that we are now thoroughly delayed + for i in range(10): + when = self.rl.perform(action, **kwargs) + self.assertAlmostEqual(when, delay, 2) + + def test_second(self): + self.exhaust('a', 5) + time.sleep(0.2) + self.exhaust('a', 1) + time.sleep(1) + self.exhaust('a', 5) + + def test_minute(self): + self.exhaust('b', 5) + + def test_one_per_period(self): + def allow_once_and_deny_once(): + when = self.rl.perform('d') + self.assertEqual(when, None) + when = self.rl.perform('d') + self.assertAlmostEqual(when, 1, 2) + return when + time.sleep(allow_once_and_deny_once()) + time.sleep(allow_once_and_deny_once()) + allow_once_and_deny_once() + + def test_we_can_go_indefinitely_if_we_spread_out_requests(self): + for i in range(200): + when = self.rl.perform('e') + self.assertEqual(when, None) + time.sleep(0.01) + + def test_users_get_separate_buckets(self): + self.exhaust('c', 5, username='alice') + self.exhaust('c', 5, username='bob') + self.exhaust('c', 5, username='chuck') + self.exhaust('c', 0, username='chuck') + self.exhaust('c', 0, username='bob') + self.exhaust('c', 0, username='alice') + + +class FakeLimiter(object): + """Fake Limiter class that you can tell how to behave.""" + def __init__(self, test): + self._action = self._username = self._delay = None + self.test = test + def mock(self, action, username, delay): + self._action = action + self._username = username + self._delay = delay + def perform(self, action, username): + self.test.assertEqual(action, self._action) + self.test.assertEqual(username, self._username) + return self._delay + + +class WSGIAppTest(unittest.TestCase): + + def setUp(self): + self.limiter = FakeLimiter(self) + self.app = ratelimiting.WSGIApp(self.limiter) + + def test_invalid_methods(self): + requests = [] + for method in ['GET', 'PUT', 'DELETE']: + req = webob.Request.blank('/limits/michael/breakdance', + dict(REQUEST_METHOD=method)) + requests.append(req) + for req in requests: + self.assertEqual(req.get_response(self.app).status_int, 405) + + def test_invalid_urls(self): + requests = [] + for prefix in ['limit', '', 'limiter2', 'limiter/limits', 'limiter/1']: + req = webob.Request.blank('/%s/michael/breakdance' % prefix, + dict(REQUEST_METHOD='POST')) + requests.append(req) + for req in requests: + self.assertEqual(req.get_response(self.app).status_int, 404) + + def verify(self, url, username, action, delay=None): + """Make sure that POSTing to the given url causes the given username + to perform the given action. Make the internal rate limiter return + delay and make sure that the WSGI app returns the correct response. + """ + req = webob.Request.blank(url, dict(REQUEST_METHOD='POST')) + self.limiter.mock(action, username, delay) + resp = req.get_response(self.app) + if not delay: + self.assertEqual(resp.status_int, 200) + else: + self.assertEqual(resp.status_int, 403) + self.assertEqual(resp.headers['X-Wait-Seconds'], "%.2f" % delay) + + def test_good_urls(self): + self.verify('/limiter/michael/hoot', 'michael', 'hoot') + + def test_escaping(self): + self.verify('/limiter/michael/jump%20up', 'michael', 'jump up') + + def test_response_to_delays(self): + self.verify('/limiter/michael/hoot', 'michael', 'hoot', 1) + self.verify('/limiter/michael/hoot', 'michael', 'hoot', 1.56) + self.verify('/limiter/michael/hoot', 'michael', 'hoot', 1000) + + +class FakeHttplibSocket(object): + """a fake socket implementation for httplib.HTTPResponse, trivial""" + + def __init__(self, response_string): + self._buffer = StringIO.StringIO(response_string) + + def makefile(self, _mode, _other): + """Returns the socket's internal buffer""" + return self._buffer + + +class FakeHttplibConnection(object): + """A fake httplib.HTTPConnection + + Requests made via this connection actually get translated and routed into + our WSGI app, we then wait for the response and turn it back into + an httplib.HTTPResponse. + """ + def __init__(self, app, host, is_secure=False): + self.app = app + self.host = host + + def request(self, method, path, data='', headers={}): + req = webob.Request.blank(path) + req.method = method + req.body = data + req.headers = headers + req.host = self.host + # Call the WSGI app, get the HTTP response + resp = str(req.get_response(self.app)) + # For some reason, the response doesn't have "HTTP/1.0 " prepended; I + # guess that's a function the web server usually provides. + resp = "HTTP/1.0 %s" % resp + sock = FakeHttplibSocket(resp) + self.http_response = httplib.HTTPResponse(sock) + self.http_response.begin() + + def getresponse(self): + return self.http_response + + +def wire_HTTPConnection_to_WSGI(host, app): + """Monkeypatches HTTPConnection so that if you try to connect to host, you + are instead routed straight to the given WSGI app. + + After calling this method, when any code calls + + httplib.HTTPConnection(host) + + the connection object will be a fake. Its requests will be sent directly + to the given WSGI app rather than through a socket. + + Code connecting to hosts other than host will not be affected. + + This method may be called multiple times to map different hosts to + different apps. + """ + class HTTPConnectionDecorator(object): + """Wraps the real HTTPConnection class so that when you instantiate + the class you might instead get a fake instance.""" + def __init__(self, wrapped): + self.wrapped = wrapped + def __call__(self, connection_host, *args, **kwargs): + if connection_host == host: + return FakeHttplibConnection(app, host) + else: + return self.wrapped(connection_host, *args, **kwargs) + httplib.HTTPConnection = HTTPConnectionDecorator(httplib.HTTPConnection) + + +class WSGIAppProxyTest(unittest.TestCase): + + def setUp(self): + """Our WSGIAppProxy is going to call across an HTTPConnection to a + WSGIApp running a limiter. The proxy will send input, and the proxy + should receive that same input, pass it to the limiter who gives a + result, and send the expected result back. + + The HTTPConnection isn't real -- it's monkeypatched to point straight + at the WSGIApp. And the limiter isn't real -- it's a fake that + behaves the way we tell it to. + """ + self.limiter = FakeLimiter(self) + app = ratelimiting.WSGIApp(self.limiter) + wire_HTTPConnection_to_WSGI('100.100.100.100:80', app) + self.proxy = ratelimiting.WSGIAppProxy('100.100.100.100:80') + + def test_200(self): + self.limiter.mock('conquer', 'caesar', None) + when = self.proxy.perform('conquer', 'caesar') + self.assertEqual(when, None) + + def test_403(self): + self.limiter.mock('grumble', 'proletariat', 1.5) + when = self.proxy.perform('grumble', 'proletariat') + self.assertEqual(when, 1.5) + + def test_failure(self): + def shouldRaise(): + self.limiter.mock('murder', 'brutus', None) + self.proxy.perform('stab', 'brutus') + self.assertRaises(AssertionError, shouldRaise) + + +if __name__ == '__main__': + unittest.main() diff --git a/nova/api/rackspace/servers.py b/nova/api/rackspace/servers.py index 44174ca52..1815f7523 100644 --- a/nova/api/rackspace/servers.py +++ b/nova/api/rackspace/servers.py @@ -72,7 +72,7 @@ class Controller(base.Controller): inst['reservation_id'] = reservation inst['launch_time'] = ltime inst['mac_address'] = utils.generate_mac() - inst_id = db.instance_create(None, inst) + inst_id = db.instance_create(None, inst)['id'] address = self.network_manager.allocate_fixed_ip(None, inst_id) # key_data, key_name, ami_launch_index # TODO(todd): key data or root password diff --git a/nova/auth/fakeldap.py b/nova/auth/fakeldap.py index bfc3433c5..2791dfde6 100644 --- a/nova/auth/fakeldap.py +++ b/nova/auth/fakeldap.py @@ -33,6 +33,7 @@ SCOPE_ONELEVEL = 1 # not implemented SCOPE_SUBTREE = 2 MOD_ADD = 0 MOD_DELETE = 1 +MOD_REPLACE = 2 class NO_SUCH_OBJECT(Exception): # pylint: disable-msg=C0103 @@ -175,7 +176,7 @@ class FakeLDAP(object): Args: dn -- a dn attrs -- a list of tuples in the following form: - ([MOD_ADD | MOD_DELETE], attribute, value) + ([MOD_ADD | MOD_DELETE | MOD_REPACE], attribute, value) """ redis = datastore.Redis.instance() @@ -185,6 +186,8 @@ class FakeLDAP(object): values = _from_json(redis.hget(key, k)) if cmd == MOD_ADD: values.append(v) + elif cmd == MOD_REPLACE: + values = [v] else: values.remove(v) values = redis.hset(key, k, _to_json(values)) diff --git a/nova/auth/ldapdriver.py b/nova/auth/ldapdriver.py index 74ba011b5..021851ebf 100644 --- a/nova/auth/ldapdriver.py +++ b/nova/auth/ldapdriver.py @@ -99,13 +99,6 @@ class LdapDriver(object): dn = FLAGS.ldap_user_subtree return self.__to_user(self.__find_object(dn, query)) - def get_key_pair(self, uid, key_name): - """Retrieve key pair by uid and key name""" - dn = 'cn=%s,%s' % (key_name, - self.__uid_to_dn(uid)) - attr = self.__find_object(dn, '(objectclass=novaKeyPair)') - return self.__to_key_pair(uid, attr) - def get_project(self, pid): """Retrieve project by id""" dn = 'cn=%s,%s' % (pid, @@ -119,12 +112,6 @@ class LdapDriver(object): '(objectclass=novaUser)') return [self.__to_user(attr) for attr in attrs] - def get_key_pairs(self, uid): - """Retrieve list of key pairs""" - attrs = self.__find_objects(self.__uid_to_dn(uid), - '(objectclass=novaKeyPair)') - return [self.__to_key_pair(uid, attr) for attr in attrs] - def get_projects(self, uid=None): """Retrieve list of projects""" pattern = '(objectclass=novaProject)' @@ -154,21 +141,6 @@ class LdapDriver(object): self.conn.add_s(self.__uid_to_dn(name), attr) return self.__to_user(dict(attr)) - def create_key_pair(self, uid, key_name, public_key, fingerprint): - """Create a key pair""" - # TODO(vish): possibly refactor this to store keys in their own ou - # and put dn reference in the user object - attr = [ - ('objectclass', ['novaKeyPair']), - ('cn', [key_name]), - ('sshPublicKey', [public_key]), - ('keyFingerprint', [fingerprint]), - ] - self.conn.add_s('cn=%s,%s' % (key_name, - self.__uid_to_dn(uid)), - attr) - return self.__to_key_pair(uid, dict(attr)) - def create_project(self, name, manager_uid, description=None, member_uids=None): """Create a project""" @@ -202,6 +174,24 @@ class LdapDriver(object): self.conn.add_s('cn=%s,%s' % (name, FLAGS.ldap_project_subtree), attr) return self.__to_project(dict(attr)) + def modify_project(self, project_id, manager_uid=None, description=None): + """Modify an existing project""" + if not manager_uid and not description: + return + attr = [] + if manager_uid: + if not self.__user_exists(manager_uid): + raise exception.NotFound("Project can't be modified because " + "manager %s doesn't exist" % + manager_uid) + manager_dn = self.__uid_to_dn(manager_uid) + attr.append((self.ldap.MOD_REPLACE, 'projectManager', manager_dn)) + if description: + attr.append((self.ldap.MOD_REPLACE, 'description', description)) + self.conn.modify_s('cn=%s,%s' % (project_id, + FLAGS.ldap_project_subtree), + attr) + def add_to_project(self, uid, project_id): """Add user to project""" dn = 'cn=%s,%s' % (project_id, FLAGS.ldap_project_subtree) @@ -265,19 +255,10 @@ class LdapDriver(object): """Delete a user""" if not self.__user_exists(uid): raise exception.NotFound("User %s doesn't exist" % uid) - self.__delete_key_pairs(uid) self.__remove_from_all(uid) self.conn.delete_s('uid=%s,%s' % (uid, FLAGS.ldap_user_subtree)) - def delete_key_pair(self, uid, key_name): - """Delete a key pair""" - if not self.__key_pair_exists(uid, key_name): - raise exception.NotFound("Key Pair %s doesn't exist for user %s" % - (key_name, uid)) - self.conn.delete_s('cn=%s,uid=%s,%s' % (key_name, uid, - FLAGS.ldap_user_subtree)) - def delete_project(self, project_id): """Delete a project""" project_dn = 'cn=%s,%s' % (project_id, FLAGS.ldap_project_subtree) @@ -288,10 +269,6 @@ class LdapDriver(object): """Check if user exists""" return self.get_user(uid) != None - def __key_pair_exists(self, uid, key_name): - """Check if key pair exists""" - return self.get_key_pair(uid, key_name) != None - def __project_exists(self, project_id): """Check if project exists""" return self.get_project(project_id) != None @@ -341,13 +318,6 @@ class LdapDriver(object): """Check if group exists""" return self.__find_object(dn, '(objectclass=groupOfNames)') != None - def __delete_key_pairs(self, uid): - """Delete all key pairs for user""" - keys = self.get_key_pairs(uid) - if keys != None: - for key in keys: - self.delete_key_pair(uid, key['name']) - @staticmethod def __role_to_dn(role, project_id=None): """Convert role to corresponding dn""" @@ -472,18 +442,6 @@ class LdapDriver(object): 'secret': attr['secretKey'][0], 'admin': (attr['isAdmin'][0] == 'TRUE')} - @staticmethod - def __to_key_pair(owner, attr): - """Convert ldap attributes to KeyPair object""" - if attr == None: - return None - return { - 'id': attr['cn'][0], - 'name': attr['cn'][0], - 'owner_id': owner, - 'public_key': attr['sshPublicKey'][0], - 'fingerprint': attr['keyFingerprint'][0]} - def __to_project(self, attr): """Convert ldap attributes to Project object""" if attr == None: diff --git a/nova/auth/manager.py b/nova/auth/manager.py index 323c48dd0..c6f366087 100644 --- a/nova/auth/manager.py +++ b/nova/auth/manager.py @@ -128,24 +128,6 @@ class User(AuthBase): def is_project_manager(self, project): return AuthManager().is_project_manager(self, project) - def generate_key_pair(self, name): - return AuthManager().generate_key_pair(self.id, name) - - def create_key_pair(self, name, public_key, fingerprint): - return AuthManager().create_key_pair(self.id, - name, - public_key, - fingerprint) - - def get_key_pair(self, name): - return AuthManager().get_key_pair(self.id, name) - - def delete_key_pair(self, name): - return AuthManager().delete_key_pair(self.id, name) - - def get_key_pairs(self): - return AuthManager().get_key_pairs(self.id) - def __repr__(self): return "User('%s', '%s', '%s', '%s', %s)" % (self.id, self.name, @@ -154,29 +136,6 @@ class User(AuthBase): self.admin) -class KeyPair(AuthBase): - """Represents an ssh key returned from the datastore - - Even though this object is named KeyPair, only the public key and - fingerprint is stored. The user's private key is not saved. - """ - - def __init__(self, id, name, owner_id, public_key, fingerprint): - AuthBase.__init__(self) - self.id = id - self.name = name - self.owner_id = owner_id - self.public_key = public_key - self.fingerprint = fingerprint - - def __repr__(self): - return "KeyPair('%s', '%s', '%s', '%s', '%s')" % (self.id, - self.name, - self.owner_id, - self.public_key, - self.fingerprint) - - class Project(AuthBase): """Represents a Project returned from the datastore""" @@ -539,6 +498,26 @@ class AuthManager(object): db.security_group_create({}, values) return project + def modify_project(self, project, manager_user=None, description=None): + """Modify a project + + @type name: Project or project_id + @param project: The project to modify. + + @type manager_user: User or uid + @param manager_user: This user will be the new project manager. + + @type description: str + @param project: This will be the new description of the project. + + """ + if manager_user: + manager_user = User.safe_id(manager_user) + with self.driver() as drv: + drv.modify_project(Project.safe_id(project), + manager_user, + description) + def add_to_project(self, user, project): """Add user to project""" with self.driver() as drv: @@ -659,67 +638,13 @@ class AuthManager(object): return User(**user_dict) def delete_user(self, user): - """Deletes a user""" - with self.driver() as drv: - drv.delete_user(User.safe_id(user)) - - def generate_key_pair(self, user, key_name): - """Generates a key pair for a user - - Generates a public and private key, stores the public key using the - key_name, and returns the private key and fingerprint. - - @type user: User or uid - @param user: User for which to create key pair. + """Deletes a user - @type key_name: str - @param key_name: Name to use for the generated KeyPair. - - @rtype: tuple (private_key, fingerprint) - @return: A tuple containing the private_key and fingerprint. - """ - # NOTE(vish): generating key pair is slow so check for legal - # creation before creating keypair + Additionally deletes all users key_pairs""" uid = User.safe_id(user) + db.key_pair_destroy_all_by_user(None, uid) with self.driver() as drv: - if not drv.get_user(uid): - raise exception.NotFound("User %s doesn't exist" % user) - if drv.get_key_pair(uid, key_name): - raise exception.Duplicate("The keypair %s already exists" - % key_name) - private_key, public_key, fingerprint = crypto.generate_key_pair() - self.create_key_pair(uid, key_name, public_key, fingerprint) - return private_key, fingerprint - - def create_key_pair(self, user, key_name, public_key, fingerprint): - """Creates a key pair for user""" - with self.driver() as drv: - kp_dict = drv.create_key_pair(User.safe_id(user), - key_name, - public_key, - fingerprint) - if kp_dict: - return KeyPair(**kp_dict) - - def get_key_pair(self, user, key_name): - """Retrieves a key pair for user""" - with self.driver() as drv: - kp_dict = drv.get_key_pair(User.safe_id(user), key_name) - if kp_dict: - return KeyPair(**kp_dict) - - def get_key_pairs(self, user): - """Retrieves all key pairs for user""" - with self.driver() as drv: - kp_list = drv.get_key_pairs(User.safe_id(user)) - if not kp_list: - return [] - return [KeyPair(**kp_dict) for kp_dict in kp_list] - - def delete_key_pair(self, user, key_name): - """Deletes a key pair for user""" - with self.driver() as drv: - drv.delete_key_pair(User.safe_id(user), key_name) + drv.delete_user(uid) def get_credentials(self, user, project=None): """Get credential zip for user in project""" diff --git a/nova/cloudpipe/pipelib.py b/nova/cloudpipe/pipelib.py index 2867bcb21..de6a97fb6 100644 --- a/nova/cloudpipe/pipelib.py +++ b/nova/cloudpipe/pipelib.py @@ -58,7 +58,7 @@ class CloudPipe(object): z.write(FLAGS.boot_script_template,'autorun.sh') z.close() - key_name = self.setup_keypair(project.project_manager_id, project_id) + key_name = self.setup_key_pair(project.project_manager_id, project_id) zippy = open(zippath, "r") context = api.APIRequestContext(handler=None, user=project.project_manager, project=project) @@ -74,7 +74,7 @@ class CloudPipe(object): security_groups=["vpn-secgroup"]) zippy.close() - def setup_keypair(self, user_id, project_id): + def setup_key_pair(self, user_id, project_id): key_name = '%s%s' % (project_id, FLAGS.vpn_key_suffix) try: private_key, fingerprint = self.manager.generate_key_pair(user_id, key_name) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 1f3a181ff..94cea1c50 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -21,6 +21,7 @@ Handles all code relating to instances (guest vms) """ import base64 +import datetime import logging import os @@ -88,6 +89,10 @@ class ComputeManager(manager.Manager): try: yield self.driver.spawn(instance_ref) + now = datetime.datetime.utcnow() + self.db.instance_update(context, + instance_id, + {'launched_at': now}) except Exception: # pylint: disable-msg=W0702 logging.exception("instance %s: Failed to spawn", instance_ref['name']) @@ -102,8 +107,8 @@ class ComputeManager(manager.Manager): def terminate_instance(self, context, instance_id): """Terminate an instance on this machine.""" logging.debug("instance %s: terminating", instance_id) - instance_ref = self.db.instance_get(context, instance_id) + instance_ref = self.db.instance_get(context, instance_id) if instance_ref['state'] == power_state.SHUTOFF: self.db.instance_destroy(context, instance_id) raise exception.Error('trying to destroy already destroyed' @@ -189,7 +194,7 @@ class ComputeManager(manager.Manager): volume_id) instance_ref = self.db.instance_get(context, instance_id) volume_ref = self.db.volume_get(context, volume_id) - self.driver.detach_volume(instance_ref['str_id'], - volume_ref['mountpoint']) + yield self.driver.detach_volume(instance_ref['str_id'], + volume_ref['mountpoint']) self.db.volume_detached(context, volume_id) defer.returnValue(True) diff --git a/nova/crypto.py b/nova/crypto.py index b05548ea1..1c6fe57ad 100644 --- a/nova/crypto.py +++ b/nova/crypto.py @@ -18,7 +18,7 @@ """ Wrappers around standard crypto, including root and intermediate CAs, -SSH keypairs and x509 certificates. +SSH key_pairs and x509 certificates. """ import base64 diff --git a/nova/db/api.py b/nova/db/api.py index fa937dab2..e4655fd23 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -51,11 +51,45 @@ class NoMoreNetworks(exception.Error): ################### +def service_destroy(context, instance_id): + """Destroy the service or raise if it does not exist.""" + return IMPL.service_destroy(context, instance_id) + + def service_get(context, service_id): """Get an service or raise if it does not exist.""" return IMPL.service_get(context, service_id) +def service_get_all_by_topic(context, topic): + """Get all compute services for a given topic """ + return IMPL.service_get_all_by_topic(context, topic) + + +def service_get_all_compute_sorted(context): + """Get all compute services sorted by instance count + + Returns a list of (Service, instance_count) tuples + """ + return IMPL.service_get_all_compute_sorted(context) + + +def service_get_all_network_sorted(context): + """Get all network services sorted by network count + + Returns a list of (Service, network_count) tuples + """ + return IMPL.service_get_all_network_sorted(context) + + +def service_get_all_volume_sorted(context): + """Get all volume services sorted by volume count + + Returns a list of (Service, volume_count) tuples + """ + return IMPL.service_get_all_volume_sorted(context) + + def service_get_by_args(context, host, binary): """Get the state of an service by node name and binary.""" return IMPL.service_get_by_args(context, host, binary) @@ -91,6 +125,21 @@ def floating_ip_create(context, values): return IMPL.floating_ip_create(context, values) +def floating_ip_count_by_project(context, project_id): + """Count floating ips used by project.""" + return IMPL.floating_ip_count_by_project(context, project_id) + + +def floating_ip_deallocate(context, address): + """Deallocate an floating ip by address""" + return IMPL.floating_ip_deallocate(context, address) + + +def floating_ip_destroy(context, address): + """Destroy the floating_ip or raise if it does not exist.""" + return IMPL.floating_ip_destroy(context, address) + + def floating_ip_disassociate(context, address): """Disassociate an floating ip from a fixed ip by address. @@ -99,11 +148,6 @@ def floating_ip_disassociate(context, address): return IMPL.floating_ip_disassociate(context, address) -def floating_ip_deallocate(context, address): - """Deallocate an floating ip by address""" - return IMPL.floating_ip_deallocate(context, address) - - def floating_ip_fixed_ip_associate(context, floating_address, fixed_address): """Associate an floating ip to a fixed_ip by address.""" return IMPL.floating_ip_fixed_ip_associate(context, @@ -111,6 +155,16 @@ def floating_ip_fixed_ip_associate(context, floating_address, fixed_address): fixed_address) +def floating_ip_get_all(context): + """Get all floating ips.""" + return IMPL.floating_ip_get_all(context) + + +def floating_ip_get_all_by_host(context, host): + """Get all floating ips.""" + return IMPL.floating_ip_get_all_by_host(context, host) + + def floating_ip_get_by_address(context, address): """Get a floating ip by address or raise if it doesn't exist.""" return IMPL.floating_ip_get_by_address(context, address) @@ -124,12 +178,20 @@ def floating_ip_get_instance(context, address): #################### -def fixed_ip_allocate(context, network_id): - """Allocate free fixed ip and return the address. +def fixed_ip_associate(context, address, instance_id): + """Associate fixed ip to instance. + + Raises if fixed ip is not available. + """ + return IMPL.fixed_ip_associate(context, address, instance_id) + + +def fixed_ip_associate_pool(context, network_id, instance_id): + """Find free ip in network and associate it to instance. Raises if one is not available. """ - return IMPL.fixed_ip_allocate(context, network_id) + return IMPL.fixed_ip_associate_pool(context, network_id, instance_id) def fixed_ip_create(context, values): @@ -137,9 +199,9 @@ def fixed_ip_create(context, values): return IMPL.fixed_ip_create(context, values) -def fixed_ip_deallocate(context, address): - """Deallocate a fixed ip by address.""" - return IMPL.fixed_ip_deallocate(context, address) +def fixed_ip_disassociate(context, address): + """Disassociate a fixed ip from an instance by address.""" + return IMPL.fixed_ip_disassociate(context, address) def fixed_ip_get_by_address(context, address): @@ -157,16 +219,6 @@ def fixed_ip_get_network(context, address): return IMPL.fixed_ip_get_network(context, address) -def fixed_ip_instance_associate(context, address, instance_id): - """Associate a fixed ip to an instance by address.""" - return IMPL.fixed_ip_instance_associate(context, address, instance_id) - - -def fixed_ip_instance_disassociate(context, address): - """Disassociate a fixed ip from an instance by address.""" - return IMPL.fixed_ip_instance_disassociate(context, address) - - def fixed_ip_update(context, address, values): """Create a fixed ip from the values dictionary.""" return IMPL.fixed_ip_update(context, address, values) @@ -180,6 +232,11 @@ def instance_create(context, values): return IMPL.instance_create(context, values) +def instance_data_get_for_project(context, project_id): + """Get (instance_count, core_count) for project.""" + return IMPL.instance_data_get_for_project(context, project_id) + + def instance_destroy(context, instance_id): """Destroy the instance or raise if it does not exist.""" return IMPL.instance_destroy(context, instance_id) @@ -220,11 +277,6 @@ def instance_get_by_str(context, str_id): return IMPL.instance_get_by_str(context, str_id) -def instance_get_host(context, instance_id): - """Get the host that the instance is running on.""" - return IMPL.instance_get_host(context, instance_id) - - def instance_is_vpn(context, instance_id): """True if instance is a vpn.""" return IMPL.instance_is_vpn(context, instance_id) @@ -249,6 +301,34 @@ def instance_add_security_group(context, instance_id, security_group_id): return IMPL.instance_add_security_group(context, instance_id, security_group_id) +################### + + +def key_pair_create(context, values): + """Create a key_pair from the values dictionary.""" + return IMPL.key_pair_create(context, values) + + +def key_pair_destroy(context, user_id, name): + """Destroy the key_pair or raise if it does not exist.""" + return IMPL.key_pair_destroy(context, user_id, name) + + +def key_pair_destroy_all_by_user(context, user_id): + """Destroy all key_pairs by user.""" + return IMPL.key_pair_destroy_all_by_user(context, user_id) + + +def key_pair_get(context, user_id, name): + """Get a key_pair or raise if it does not exist.""" + return IMPL.key_pair_get(context, user_id, name) + + +def key_pair_get_all_by_user(context, user_id): + """Get all key_pairs by user.""" + return IMPL.key_pair_get_all_by_user(context, user_id) + + #################### @@ -303,11 +383,6 @@ def network_get_by_bridge(context, bridge): return IMPL.network_get_by_bridge(context, bridge) -def network_get_host(context, network_id): - """Get host assigned to network or raise""" - return IMPL.network_get_host(context, network_id) - - def network_get_index(context, network_id): """Get non-conflicting index for network""" return IMPL.network_get_index(context, network_id) @@ -379,6 +454,29 @@ def export_device_create(context, values): ################### +def quota_create(context, values): + """Create a quota from the values dictionary.""" + return IMPL.quota_create(context, values) + + +def quota_get(context, project_id): + """Retrieve a quota or raise if it does not exist.""" + return IMPL.quota_get(context, project_id) + + +def quota_update(context, project_id, values): + """Update a quota from the values dictionary.""" + return IMPL.quota_update(context, project_id, values) + + +def quota_destroy(context, project_id): + """Destroy the quota or raise if it does not exist.""" + return IMPL.quota_destroy(context, project_id) + + +################### + + def volume_allocate_shelf_and_blade(context, volume_id): """Atomically allocate a free shelf and blade from the pool.""" return IMPL.volume_allocate_shelf_and_blade(context, volume_id) @@ -394,6 +492,11 @@ def volume_create(context, values): return IMPL.volume_create(context, values) +def volume_data_get_for_project(context, project_id): + """Get (volume_count, gigabytes) for project.""" + return IMPL.volume_data_get_for_project(context, project_id) + + def volume_destroy(context, volume_id): """Destroy the volume or raise if it does not exist.""" return IMPL.volume_destroy(context, volume_id) @@ -429,11 +532,6 @@ def volume_get_by_str(context, str_id): return IMPL.volume_get_by_str(context, str_id) -def volume_get_host(context, volume_id): - """Get the host that the volume is running on.""" - return IMPL.volume_get_host(context, volume_id) - - def volume_get_shelf_and_blade(context, volume_id): """Get the shelf and blade allocated to the volume.""" return IMPL.volume_get_shelf_and_blade(context, volume_id) diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 038bb7f23..858b955f3 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -26,20 +26,114 @@ from nova.db.sqlalchemy import models from nova.db.sqlalchemy.session import get_session from sqlalchemy import or_ from sqlalchemy.orm import eagerload +from sqlalchemy.orm import joinedload_all +from sqlalchemy.sql import func FLAGS = flags.FLAGS + # NOTE(vish): disabling docstring pylint because the docstrings are # in the interface definition # pylint: disable-msg=C0111 +def _deleted(context): + """Calculates whether to include deleted objects based on context. + + Currently just looks for a flag called deleted in the context dict. + """ + if not hasattr(context, 'get'): + return False + return context.get('deleted', False) + ################### +def service_destroy(context, service_id): + session = get_session() + with session.begin(): + service_ref = models.Service.find(service_id, session=session) + service_ref.delete(session=session) + def service_get(_context, service_id): return models.Service.find(service_id) +def service_get_all_by_topic(context, topic): + session = get_session() + return session.query(models.Service + ).filter_by(deleted=False + ).filter_by(topic=topic + ).all() + + +def _service_get_all_topic_subquery(_context, session, topic, subq, label): + sort_value = getattr(subq.c, label) + return session.query(models.Service, func.coalesce(sort_value, 0) + ).filter_by(topic=topic + ).filter_by(deleted=False + ).outerjoin((subq, models.Service.host == subq.c.host) + ).order_by(sort_value + ).all() + + +def service_get_all_compute_sorted(context): + session = get_session() + with session.begin(): + # NOTE(vish): The intended query is below + # SELECT services.*, COALESCE(inst_cores.instance_cores, + # 0) + # FROM services LEFT OUTER JOIN + # (SELECT host, SUM(instances.vcpus) AS instance_cores + # FROM instances GROUP BY host) AS inst_cores + # ON services.host = inst_cores.host + topic = 'compute' + label = 'instance_cores' + subq = session.query(models.Instance.host, + func.sum(models.Instance.vcpus).label(label) + ).filter_by(deleted=False + ).group_by(models.Instance.host + ).subquery() + return _service_get_all_topic_subquery(context, + session, + topic, + subq, + label) + + +def service_get_all_network_sorted(context): + session = get_session() + with session.begin(): + topic = 'network' + label = 'network_count' + subq = session.query(models.Network.host, + func.count(models.Network.id).label(label) + ).filter_by(deleted=False + ).group_by(models.Network.host + ).subquery() + return _service_get_all_topic_subquery(context, + session, + topic, + subq, + label) + + +def service_get_all_volume_sorted(context): + session = get_session() + with session.begin(): + topic = 'volume' + label = 'volume_gigabytes' + subq = session.query(models.Volume.host, + func.sum(models.Volume.size).label(label) + ).filter_by(deleted=False + ).group_by(models.Volume.host + ).subquery() + return _service_get_all_topic_subquery(context, + session, + topic, + subq, + label) + + def service_get_by_args(_context, host, binary): return models.Service.find_by_args(host, binary) @@ -49,7 +143,7 @@ def service_create(_context, values): for (key, value) in values.iteritems(): service_ref[key] = value service_ref.save() - return service_ref.id + return service_ref def service_update(_context, service_id, values): @@ -90,6 +184,14 @@ def floating_ip_create(_context, values): return floating_ip_ref['address'] +def floating_ip_count_by_project(_context, project_id): + session = get_session() + return session.query(models.FloatingIp + ).filter_by(project_id=project_id + ).filter_by(deleted=False + ).count() + + def floating_ip_fixed_ip_associate(_context, floating_address, fixed_address): session = get_session() with session.begin(): @@ -101,6 +203,23 @@ def floating_ip_fixed_ip_associate(_context, floating_address, fixed_address): floating_ip_ref.save(session=session) +def floating_ip_deallocate(_context, address): + session = get_session() + with session.begin(): + floating_ip_ref = models.FloatingIp.find_by_str(address, + session=session) + floating_ip_ref['project_id'] = None + floating_ip_ref.save(session=session) + + +def floating_ip_destroy(_context, address): + session = get_session() + with session.begin(): + floating_ip_ref = models.FloatingIp.find_by_str(address, + session=session) + floating_ip_ref.delete(session=session) + + def floating_ip_disassociate(_context, address): session = get_session() with session.begin(): @@ -116,14 +235,21 @@ def floating_ip_disassociate(_context, address): return fixed_ip_address -def floating_ip_deallocate(_context, address): +def floating_ip_get_all(_context): session = get_session() - with session.begin(): - floating_ip_ref = models.FloatingIp.find_by_str(address, - session=session) - floating_ip_ref['project_id'] = None - floating_ip_ref.save(session=session) + return session.query(models.FloatingIp + ).options(joinedload_all('fixed_ip.instance') + ).filter_by(deleted=False + ).all() + +def floating_ip_get_all_by_host(_context, host): + session = get_session() + return session.query(models.FloatingIp + ).options(joinedload_all('fixed_ip.instance') + ).filter_by(host=host + ).filter_by(deleted=False + ).all() def floating_ip_get_by_address(_context, address): return models.FloatingIp.find_by_str(address) @@ -140,7 +266,25 @@ def floating_ip_get_instance(_context, address): ################### -def fixed_ip_allocate(_context, network_id): +def fixed_ip_associate(_context, address, instance_id): + session = get_session() + with session.begin(): + fixed_ip_ref = session.query(models.FixedIp + ).filter_by(address=address + ).filter_by(deleted=False + ).filter_by(instance=None + ).with_lockmode('update' + ).first() + # NOTE(vish): if with_lockmode isn't supported, as in sqlite, + # then this has concurrency issues + if not fixed_ip_ref: + raise db.NoMoreAddresses() + fixed_ip_ref.instance = models.Instance.find(instance_id, + session=session) + session.add(fixed_ip_ref) + + +def fixed_ip_associate_pool(_context, network_id, instance_id): session = get_session() with session.begin(): network_or_none = or_(models.FixedIp.network_id == network_id, @@ -148,9 +292,8 @@ def fixed_ip_allocate(_context, network_id): fixed_ip_ref = session.query(models.FixedIp ).filter(network_or_none ).filter_by(reserved=False - ).filter_by(allocated=False - ).filter_by(leased=False ).filter_by(deleted=False + ).filter_by(instance=None ).with_lockmode('update' ).first() # NOTE(vish): if with_lockmode isn't supported, as in sqlite, @@ -160,7 +303,8 @@ def fixed_ip_allocate(_context, network_id): if not fixed_ip_ref.network: fixed_ip_ref.network = models.Network.find(network_id, session=session) - fixed_ip_ref['allocated'] = True + fixed_ip_ref.instance = models.Instance.find(instance_id, + session=session) session.add(fixed_ip_ref) return fixed_ip_ref['address'] @@ -173,6 +317,14 @@ def fixed_ip_create(_context, values): return fixed_ip_ref['address'] +def fixed_ip_disassociate(_context, address): + session = get_session() + with session.begin(): + fixed_ip_ref = models.FixedIp.find_by_str(address, session=session) + fixed_ip_ref.instance = None + fixed_ip_ref.save(session=session) + + def fixed_ip_get_by_address(_context, address): return models.FixedIp.find_by_str(address) @@ -189,27 +341,6 @@ def fixed_ip_get_network(_context, address): return models.FixedIp.find_by_str(address, session=session).network -def fixed_ip_deallocate(context, address): - db.fixed_ip_update(context, address, {'allocated': False}) - - -def fixed_ip_instance_associate(_context, address, instance_id): - session = get_session() - with session.begin(): - fixed_ip_ref = models.FixedIp.find_by_str(address, session=session) - instance_ref = models.Instance.find(instance_id, session=session) - fixed_ip_ref.instance = instance_ref - fixed_ip_ref.save(session=session) - - -def fixed_ip_instance_disassociate(_context, address): - session = get_session() - with session.begin(): - fixed_ip_ref = models.FixedIp.find_by_str(address, session=session) - fixed_ip_ref.instance = None - fixed_ip_ref.save(session=session) - - def fixed_ip_update(_context, address, values): session = get_session() with session.begin(): @@ -227,7 +358,18 @@ def instance_create(_context, values): for (key, value) in values.iteritems(): instance_ref[key] = value instance_ref.save() - return instance_ref.id + return instance_ref + + +def instance_data_get_for_project(_context, project_id): + session = get_session() + result = session.query(func.count(models.Instance.id), + func.sum(models.Instance.vcpus) + ).filter_by(project_id=project_id + ).filter_by(deleted=False + ).first() + # NOTE(vish): convert None to 0 + return (result[0] or 0, result[1] or 0) def instance_destroy(_context, instance_id): @@ -237,35 +379,41 @@ def instance_destroy(_context, instance_id): instance_ref.delete(session=session) -def instance_get(_context, instance_id): +def instance_get(context, instance_id): session = get_session() return session.query(models.Instance ).options(eagerload('security_groups') - ).get(instance_id) + ).find(instance_id, deleted=_deleted(context)) -def instance_get_all(_context): - return models.Instance.all() +def instance_get_all(context): + session = get_session() + return session.query(models.Instance + ).options(joinedload_all('fixed_ip.floating_ips') + ).filter_by(deleted=_deleted(context) + ).all() -def instance_get_by_project(_context, project_id): +def instance_get_by_project(context, project_id): session = get_session() return session.query(models.Instance + ).options(joinedload_all('fixed_ip.floating_ips') ).filter_by(project_id=project_id - ).filter_by(deleted=False + ).filter_by(deleted=_deleted(context) ).all() def instance_get_by_reservation(_context, reservation_id): session = get_session() return session.query(models.Instance + ).options(joinedload_all('fixed_ip.floating_ips') ).filter_by(reservation_id=reservation_id ).filter_by(deleted=False ).all() -def instance_get_by_str(_context, str_id): - return models.Instance.find_by_str(str_id) +def instance_get_by_str(context, str_id): + return models.Instance.find_by_str(str_id, deleted=_deleted(context)) def instance_get_fixed_address(_context, instance_id): @@ -289,11 +437,6 @@ def instance_get_floating_address(_context, instance_id): return instance_ref.fixed_ip.floating_ips[0]['address'] -def instance_get_host(context, instance_id): - instance_ref = instance_get(context, instance_id) - return instance_ref['host'] - - def instance_is_vpn(context, instance_id): # TODO(vish): Move this into image code somewhere instance_ref = instance_get(context, instance_id) @@ -334,6 +477,46 @@ def instance_add_security_group(context, instance_id, security_group_id): ################### +def key_pair_create(_context, values): + key_pair_ref = models.KeyPair() + for (key, value) in values.iteritems(): + key_pair_ref[key] = value + key_pair_ref.save() + return key_pair_ref + + +def key_pair_destroy(_context, user_id, name): + session = get_session() + with session.begin(): + key_pair_ref = models.KeyPair.find_by_args(user_id, + name, + session=session) + key_pair_ref.delete(session=session) + + +def key_pair_destroy_all_by_user(_context, user_id): + session = get_session() + with session.begin(): + # TODO(vish): do we have to use sql here? + session.execute('update key_pairs set deleted=1 where user_id=:id', + {'id': user_id}) + + +def key_pair_get(_context, user_id, name): + return models.KeyPair.find_by_args(user_id, name) + + +def key_pair_get_all_by_user(_context, user_id): + session = get_session() + return session.query(models.KeyPair + ).filter_by(user_id=user_id + ).filter_by(deleted=False + ).all() + + +################### + + def network_count(_context): return models.Network.count() @@ -419,11 +602,6 @@ def network_get_by_bridge(_context, bridge): return rv -def network_get_host(context, network_id): - network_ref = network_get(context, network_id) - return network_ref['host'] - - def network_get_index(_context, network_id): session = get_session() with session.begin(): @@ -518,6 +696,37 @@ def export_device_create(_context, values): ################### +def quota_create(_context, values): + quota_ref = models.Quota() + for (key, value) in values.iteritems(): + quota_ref[key] = value + quota_ref.save() + return quota_ref + + +def quota_get(_context, project_id): + return models.Quota.find_by_str(project_id) + + +def quota_update(_context, project_id, values): + session = get_session() + with session.begin(): + quota_ref = models.Quota.find_by_str(project_id, session=session) + for (key, value) in values.iteritems(): + quota_ref[key] = value + quota_ref.save(session=session) + + +def quota_destroy(_context, project_id): + session = get_session() + with session.begin(): + quota_ref = models.Quota.find_by_str(project_id, session=session) + quota_ref.delete(session=session) + + +################### + + def volume_allocate_shelf_and_blade(_context, volume_id): session = get_session() with session.begin(): @@ -555,6 +764,17 @@ def volume_create(_context, values): return volume_ref +def volume_data_get_for_project(_context, project_id): + session = get_session() + result = session.query(func.count(models.Volume.id), + func.sum(models.Volume.size) + ).filter_by(project_id=project_id + ).filter_by(deleted=False + ).first() + # NOTE(vish): convert None to 0 + return (result[0] or 0, result[1] or 0) + + def volume_destroy(_context, volume_id): session = get_session() with session.begin(): @@ -577,29 +797,24 @@ def volume_detached(_context, volume_id): volume_ref.save(session=session) -def volume_get(_context, volume_id): - return models.Volume.find(volume_id) +def volume_get(context, volume_id): + return models.Volume.find(volume_id, deleted=_deleted(context)) -def volume_get_all(_context): - return models.Volume.all() +def volume_get_all(context): + return models.Volume.all(deleted=_deleted(context)) -def volume_get_by_project(_context, project_id): +def volume_get_by_project(context, project_id): session = get_session() return session.query(models.Volume ).filter_by(project_id=project_id - ).filter_by(deleted=False + ).filter_by(deleted=_deleted(context) ).all() -def volume_get_by_str(_context, str_id): - return models.Volume.find_by_str(str_id) - - -def volume_get_host(context, volume_id): - volume_ref = volume_get(context, volume_id) - return volume_ref['host'] +def volume_get_by_str(context, str_id): + return models.Volume.find_by_str(str_id, deleted=_deleted(context)) def volume_get_instance(_context, volume_id): diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py index 424906c1f..2392f1c86 100644 --- a/nova/db/sqlalchemy/models.py +++ b/nova/db/sqlalchemy/models.py @@ -24,9 +24,8 @@ import sys import datetime # TODO(vish): clean up these imports -from sqlalchemy.orm import relationship, backref, validates, exc -from sqlalchemy.sql import func -from sqlalchemy import Column, Integer, String, Table +from sqlalchemy.orm import relationship, backref, exc, object_mapper +from sqlalchemy import Table, Column, Integer, String from sqlalchemy import ForeignKey, DateTime, Boolean, Text from sqlalchemy.ext.declarative import declarative_base @@ -46,47 +45,48 @@ class NovaBase(object): __table_args__ = {'mysql_engine': 'InnoDB'} __table_initialized__ = False __prefix__ = 'none' - created_at = Column(DateTime, default=func.now()) - updated_at = Column(DateTime, onupdate=datetime.datetime.now) + created_at = Column(DateTime, default=datetime.datetime.utcnow) + updated_at = Column(DateTime, onupdate=datetime.datetime.utcnow) + deleted_at = Column(DateTime) deleted = Column(Boolean, default=False) @classmethod - def all(cls, session=None): + def all(cls, session=None, deleted=False): """Get all objects of this type""" if not session: session = get_session() return session.query(cls - ).filter_by(deleted=False + ).filter_by(deleted=deleted ).all() @classmethod - def count(cls, session=None): + def count(cls, session=None, deleted=False): """Count objects of this type""" if not session: session = get_session() return session.query(cls - ).filter_by(deleted=False + ).filter_by(deleted=deleted ).count() @classmethod - def find(cls, obj_id, session=None): + def find(cls, obj_id, session=None, deleted=False): """Find object by id""" if not session: session = get_session() try: return session.query(cls ).filter_by(id=obj_id - ).filter_by(deleted=False + ).filter_by(deleted=deleted ).one() except exc.NoResultFound: new_exc = exception.NotFound("No model for id %s" % obj_id) raise new_exc.__class__, new_exc, sys.exc_info()[2] @classmethod - def find_by_str(cls, str_id, session=None): + def find_by_str(cls, str_id, session=None, deleted=False): """Find object by str_id""" int_id = int(str_id.rpartition('-')[2]) - return cls.find(int_id, session=session) + return cls.find(int_id, session=session, deleted=deleted) @property def str_id(self): @@ -103,6 +103,7 @@ class NovaBase(object): def delete(self, session=None): """Delete this object""" self.deleted = True + self.deleted_at = datetime.datetime.utcnow() self.save(session=session) def __setitem__(self, key, value): @@ -111,6 +112,14 @@ class NovaBase(object): def __getitem__(self, key): return getattr(self, key) + def __iter__(self): + self._i = iter(object_mapper(self).columns) + return self + + def next(self): + n = self._i.next().name + return n, getattr(self, n) + # TODO(vish): Store images in the database instead of file system #class Image(BASE, NovaBase): # """Represents an image in the datastore""" @@ -166,14 +175,14 @@ class Service(BASE, NovaBase): report_count = Column(Integer, nullable=False, default=0) @classmethod - def find_by_args(cls, host, binary, session=None): + def find_by_args(cls, host, binary, session=None, deleted=False): if not session: session = get_session() try: return session.query(cls ).filter_by(host=host ).filter_by(binary=binary - ).filter_by(deleted=False + ).filter_by(deleted=deleted ).one() except exc.NoResultFound: new_exc = exception.NotFound("No model for %s, %s" % (host, @@ -219,6 +228,11 @@ class Instance(BASE, NovaBase): state = Column(Integer) state_description = Column(String(255)) + memory_mb = Column(Integer) + vcpus = Column(Integer) + local_gb = Column(Integer) + + hostname = Column(String(255)) host = Column(String(255)) # , ForeignKey('hosts.id')) @@ -229,6 +243,9 @@ class Instance(BASE, NovaBase): reservation_id = Column(String(255)) mac_address = Column(String(255)) + scheduled_at = Column(DateTime) + launched_at = Column(DateTime) + terminated_at = Column(DateTime) # TODO(vish): see Ewan's email about state improvements, probably # should be in a driver base class or some such # vmstate_state = running, halted, suspended, paused @@ -260,6 +277,39 @@ class Volume(BASE, NovaBase): status = Column(String(255)) # TODO(vish): enum? attach_status = Column(String(255)) # TODO(vish): enum + scheduled_at = Column(DateTime) + launched_at = Column(DateTime) + terminated_at = Column(DateTime) + +class Quota(BASE, NovaBase): + """Represents quota overrides for a project""" + __tablename__ = 'quotas' + id = Column(Integer, primary_key=True) + + project_id = Column(String(255)) + + instances = Column(Integer) + cores = Column(Integer) + volumes = Column(Integer) + gigabytes = Column(Integer) + floating_ips = Column(Integer) + + @property + def str_id(self): + return self.project_id + + @classmethod + def find_by_str(cls, str_id, session=None, deleted=False): + if not session: + session = get_session() + try: + return session.query(cls + ).filter_by(project_id=str_id + ).filter_by(deleted=deleted + ).one() + except exc.NoResultFound: + new_exc = exception.NotFound("No model for project_id %s" % str_id) + raise new_exc.__class__, new_exc, sys.exc_info()[2] class ExportDevice(BASE, NovaBase): """Represates a shelf and blade that a volume can be exported on""" @@ -283,10 +333,8 @@ class SecurityGroup(BASE, NovaBase): """Represents a security group""" __tablename__ = 'security_group' id = Column(Integer, primary_key=True) - name = Column(String(255)) description = Column(String(255)) - user_id = Column(String(255)) project_id = Column(String(255)) @@ -323,6 +371,39 @@ class SecurityGroupIngressRule(BASE, NovaBase): group_id = Column(Integer, ForeignKey('security_group.id')) +class KeyPair(BASE, NovaBase): + """Represents a public key pair for ssh""" + __tablename__ = 'key_pairs' + id = Column(Integer, primary_key=True) + name = Column(String(255)) + user_id = Column(String(255)) + fingerprint = Column(String(255)) + public_key = Column(Text) + + @property + def str_id(self): + return '%s.%s' % (self.user_id, self.name) + + @classmethod + def find_by_str(cls, str_id, session=None, deleted=False): + user_id, _sep, name = str_id.partition('.') + return cls.find_by_str(user_id, name, session, deleted) + + @classmethod + def find_by_args(cls, user_id, name, session=None, deleted=False): + if not session: + session = get_session() + try: + return session.query(cls + ).filter_by(user_id=user_id + ).filter_by(name=name + ).filter_by(deleted=deleted + ).one() + except exc.NoResultFound: + new_exc = exception.NotFound("No model for user %s, name %s" % + (user_id, name)) + raise new_exc.__class__, new_exc, sys.exc_info()[2] + class Network(BASE, NovaBase): """Represents a network""" __tablename__ = 'networks' @@ -381,13 +462,13 @@ class FixedIp(BASE, NovaBase): return self.address @classmethod - def find_by_str(cls, str_id, session=None): + def find_by_str(cls, str_id, session=None, deleted=False): if not session: session = get_session() try: return session.query(cls ).filter_by(address=str_id - ).filter_by(deleted=False + ).filter_by(deleted=deleted ).one() except exc.NoResultFound: new_exc = exception.NotFound("No model for address %s" % str_id) @@ -410,13 +491,13 @@ class FloatingIp(BASE, NovaBase): return self.address @classmethod - def find_by_str(cls, str_id, session=None): + def find_by_str(cls, str_id, session=None, deleted=False): if not session: session = get_session() try: return session.query(cls ).filter_by(address=str_id - ).filter_by(deleted=False + ).filter_by(deleted=deleted ).one() except exc.NoResultFound: new_exc = exception.NotFound("No model for address %s" % str_id) diff --git a/nova/endpoint/api.py b/nova/endpoint/api.py index 40be00bb7..12eedfe67 100755 --- a/nova/endpoint/api.py +++ b/nova/endpoint/api.py @@ -304,7 +304,10 @@ class APIRequestHandler(tornado.web.RequestHandler): try: failure.raiseException() except exception.ApiError as ex: - self._error(type(ex).__name__ + "." + ex.code, ex.message) + if ex.code: + self._error(ex.code, ex.message) + else: + self._error(type(ex).__name__, ex.message) # TODO(vish): do something more useful with unknown exceptions except Exception as ex: self._error(type(ex).__name__, str(ex)) diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py index d1ccf24ff..82af63aba 100644 --- a/nova/endpoint/cloud.py +++ b/nova/endpoint/cloud.py @@ -23,6 +23,7 @@ datastore. """ import base64 +import datetime import logging import os import time @@ -31,13 +32,14 @@ import IPy from twisted.internet import defer +from nova import crypto from nova import db from nova import exception from nova import flags +from nova import quota from nova import rpc from nova import utils from nova.auth import rbac -from nova.auth import manager from nova.compute.instance_types import INSTANCE_TYPES from nova.endpoint import images @@ -47,14 +49,35 @@ flags.DECLARE('storage_availability_zone', 'nova.volume.manager') InvalidInputException = exception.InvalidInputException -def _gen_key(user_id, key_name): - """ Tuck this into AuthManager """ +class QuotaError(exception.ApiError): + """Quota Exceeeded""" + pass + + +def _gen_key(context, user_id, key_name): + """Generate a key + + This is a module level method because it is slow and we need to defer + it into a process pool.""" try: - mgr = manager.AuthManager() - private_key, fingerprint = mgr.generate_key_pair(user_id, key_name) + # NOTE(vish): generating key pair is slow so check for legal + # creation before creating key_pair + try: + db.key_pair_get(context, user_id, key_name) + raise exception.Duplicate("The key_pair %s already exists" + % key_name) + except exception.NotFound: + pass + private_key, public_key, fingerprint = crypto.generate_key_pair() + key = {} + key['user_id'] = user_id + key['name'] = key_name + key['public_key'] = public_key + key['fingerprint'] = fingerprint + db.key_pair_create(context, key) + return {'private_key': private_key, 'fingerprint': fingerprint} except Exception as ex: return {'exception': ex} - return {'private_key': private_key, 'fingerprint': fingerprint} class CloudController(object): @@ -87,13 +110,15 @@ class CloudController(object): def _get_mpi_data(self, project_id): result = {} - for instance in db.instance_get_by_project(project_id): - line = '%s slots=%d' % (instance.fixed_ip['str_id'], - INSTANCE_TYPES[instance['instance_type']]['vcpus']) - if instance['key_name'] in result: - result[instance['key_name']].append(line) - else: - result[instance['key_name']] = [line] + for instance in db.instance_get_by_project(None, project_id): + if instance['fixed_ip']: + line = '%s slots=%d' % (instance['fixed_ip']['str_id'], + INSTANCE_TYPES[instance['instance_type']]['vcpus']) + key = str(instance['key_name']) + if key in result: + result[key].append(line) + else: + result[key] = [line] return result def _trigger_refresh_security_group(self, security_group): @@ -119,13 +144,13 @@ class CloudController(object): else: keys = '' hostname = instance_ref['hostname'] - floating_ip = db.instance_get_floating_ip_address(None, - instance_ref['id']) + floating_ip = db.instance_get_floating_address(None, + instance_ref['id']) data = { 'user-data': base64.b64decode(instance_ref['user_data']), 'meta-data': { 'ami-id': instance_ref['image_id'], - 'ami-launch-index': instance_ref['ami_launch_index'], + 'ami-launch-index': instance_ref['launch_index'], 'ami-manifest-path': 'FIXME', 'block-device-mapping': { # TODO(vish): replace with real data 'ami': 'sda1', @@ -141,7 +166,7 @@ class CloudController(object): 'local-ipv4': address, 'kernel-id': instance_ref['kernel_id'], 'placement': { - 'availaibility-zone': instance_ref['availability_zone'], + 'availability-zone': 'nova' # TODO(vish): real zone }, 'public-hostname': hostname, 'public-ipv4': floating_ip or '', @@ -165,9 +190,18 @@ class CloudController(object): @rbac.allow('all') def describe_regions(self, context, region_name=None, **kwargs): - # TODO(vish): region_name is an array. Support filtering - return {'regionInfo': [{'regionName': 'nova', - 'regionUrl': FLAGS.ec2_url}]} + if FLAGS.region_list: + regions = [] + for region in FLAGS.region_list: + name, _sep, url = region.partition('=') + regions.append({'regionName': name, + 'regionEndpoint': url}) + else: + regions = [{'regionName': 'nova', + 'regionEndpoint': FLAGS.ec2_url}] + if region_name: + regions = [r for r in regions if r['regionName'] in region_name] + return {'regionInfo': regions } @rbac.allow('all') def describe_snapshots(self, @@ -187,18 +221,18 @@ class CloudController(object): @rbac.allow('all') def describe_key_pairs(self, context, key_name=None, **kwargs): - key_pairs = context.user.get_key_pairs() + key_pairs = db.key_pair_get_all_by_user(context, context.user.id) if not key_name is None: - key_pairs = [x for x in key_pairs if x.name in key_name] + key_pairs = [x for x in key_pairs if x['name'] in key_name] result = [] for key_pair in key_pairs: # filter out the vpn keys suffix = FLAGS.vpn_key_suffix - if context.user.is_admin() or not key_pair.name.endswith(suffix): + if context.user.is_admin() or not key_pair['name'].endswith(suffix): result.append({ - 'keyName': key_pair.name, - 'keyFingerprint': key_pair.fingerprint, + 'keyName': key_pair['name'], + 'keyFingerprint': key_pair['fingerprint'], }) return {'keypairsSet': result} @@ -214,14 +248,18 @@ class CloudController(object): dcall.callback({'keyName': key_name, 'keyFingerprint': kwargs['fingerprint'], 'keyMaterial': kwargs['private_key']}) - pool.apply_async(_gen_key, [context.user.id, key_name], + # TODO(vish): when context is no longer an object, pass it here + pool.apply_async(_gen_key, [None, context.user.id, key_name], callback=_complete) return dcall @rbac.allow('all') def delete_key_pair(self, context, key_name, **kwargs): - context.user.delete_key_pair(key_name) - # aws returns true even if the key doens't exist + try: + db.key_pair_destroy(context, context.user.id, key_name) + except exception.NotFound: + # aws returns true even if the key doesn't exist + pass return True @rbac.allow('all') @@ -419,7 +457,7 @@ class CloudController(object): v['status'] = '%s (%s, %s, %s, %s)' % ( volume['status'], volume['user_id'], - 'host', + volume['host'], volume['instance_id'], volume['mountpoint']) if volume['attach_status'] == 'attached': @@ -435,6 +473,14 @@ class CloudController(object): @rbac.allow('projectmanager', 'sysadmin') def create_volume(self, context, size, **kwargs): + # check quota + size = int(size) + if quota.allowed_volumes(context, 1, size) < 1: + logging.warn("Quota exceeeded for %s, tried to create %sG volume", + context.project.id, size) + raise QuotaError("Volume quota exceeded. You cannot " + "create a volume of size %s" % + size) vol = {} vol['size'] = size vol['user_id'] = context.user.id @@ -444,9 +490,11 @@ class CloudController(object): vol['attach_status'] = "detached" volume_ref = db.volume_create(context, vol) - rpc.cast(FLAGS.volume_topic, {"method": "create_volume", - "args": {"context": None, - "volume_id": volume_ref['id']}}) + rpc.cast(FLAGS.scheduler_topic, + {"method": "create_volume", + "args": {"context": None, + "topic": FLAGS.volume_topic, + "volume_id": volume_ref['id']}}) return {'volumeSet': [self._format_volume(context, volume_ref)]} @@ -455,10 +503,12 @@ class CloudController(object): def attach_volume(self, context, volume_id, instance_id, device, **kwargs): volume_ref = db.volume_get_by_str(context, volume_id) # TODO(vish): abstract status checking? + if volume_ref['status'] != "available": + raise exception.ApiError("Volume status must be available") if volume_ref['attach_status'] == "attached": raise exception.ApiError("Volume is already attached") instance_ref = db.instance_get_by_str(context, instance_id) - host = db.instance_get_host(context, instance_ref['id']) + host = instance_ref['host'] rpc.cast(db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "attach_volume", "args": {"context": None, @@ -477,12 +527,12 @@ class CloudController(object): volume_ref = db.volume_get_by_str(context, volume_id) instance_ref = db.volume_get_instance(context, volume_ref['id']) if not instance_ref: - raise exception.Error("Volume isn't attached to anything!") + raise exception.ApiError("Volume isn't attached to anything!") # TODO(vish): abstract status checking? if volume_ref['status'] == "available": - raise exception.Error("Volume is already detached") + raise exception.ApiError("Volume is already detached") try: - host = db.instance_get_host(context, instance_ref['id']) + host = instance_ref['host'] rpc.cast(db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "detach_volume", "args": {"context": None, @@ -521,12 +571,14 @@ class CloudController(object): def _format_instances(self, context, reservation_id=None): reservations = {} if reservation_id: - instances = db.instance_get_by_reservation(context, reservation_id) + instances = db.instance_get_by_reservation(context, + reservation_id) else: - if not context.user.is_admin(): + if context.user.is_admin(): instances = db.instance_get_all(context) else: - instances = db.instance_get_by_project(context, context.project.id) + instances = db.instance_get_by_project(context, + context.project.id) for instance in instances: if not context.user.is_admin(): if instance['image_id'] == FLAGS.vpn_image_id: @@ -538,15 +590,16 @@ class CloudController(object): 'code': instance['state'], 'name': instance['state_description'] } - floating_addr = db.instance_get_floating_address(context, - instance['id']) - i['publicDnsName'] = floating_addr - fixed_addr = db.instance_get_fixed_address(context, - instance['id']) + fixed_addr = None + floating_addr = None + if instance['fixed_ip']: + fixed_addr = instance['fixed_ip']['str_id'] + if instance['fixed_ip']['floating_ips']: + fixed = instance['fixed_ip'] + floating_addr = fixed['floating_ips'][0]['str_id'] i['privateDnsName'] = fixed_addr - if not i['publicDnsName']: - i['publicDnsName'] = i['privateDnsName'] - i['dnsName'] = None + i['publicDnsName'] = floating_addr + i['dnsName'] = i['publicDnsName'] or i['privateDnsName'] i['keyName'] = instance['key_name'] if context.user.is_admin(): i['keyName'] = '%s (%s, %s)' % (i['keyName'], @@ -579,10 +632,13 @@ class CloudController(object): iterator = db.floating_ip_get_by_project(context, context.project.id) for floating_ip_ref in iterator: - address = floating_ip_ref['id_str'] - instance_ref = db.floating_ip_get_instance(address) + address = floating_ip_ref['str_id'] + instance_id = None + if (floating_ip_ref['fixed_ip'] + and floating_ip_ref['fixed_ip']['instance']): + instance_id = floating_ip_ref['fixed_ip']['instance']['str_id'] address_rv = {'public_ip': address, - 'instance_id': instance_ref['id_str']} + 'instance_id': instance_id} if context.user.is_admin(): details = "%s (%s)" % (address_rv['instance_id'], floating_ip_ref['project_id']) @@ -593,6 +649,12 @@ class CloudController(object): @rbac.allow('netadmin') @defer.inlineCallbacks def allocate_address(self, context, **kwargs): + # check quota + if quota.allowed_floating_ips(context, 1) < 1: + logging.warn("Quota exceeeded for %s, tried to allocate address", + context.project.id) + raise QuotaError("Address quota exceeded. You cannot " + "allocate any more addresses") network_topic = yield self._get_network_topic(context) public_ip = yield rpc.call(network_topic, {"method": "allocate_floating_ip", @@ -607,9 +669,9 @@ class CloudController(object): floating_ip_ref = db.floating_ip_get_by_address(context, public_ip) network_topic = yield self._get_network_topic(context) rpc.cast(network_topic, - {"method": "deallocate_floating_ip", - "args": {"context": None, - "floating_ip": floating_ip_ref['str_id']}}) + {"method": "deallocate_floating_ip", + "args": {"context": None, + "floating_address": floating_ip_ref['str_id']}}) defer.returnValue({'releaseResponse': ["Address released."]}) @rbac.allow('netadmin') @@ -620,11 +682,10 @@ class CloudController(object): floating_ip_ref = db.floating_ip_get_by_address(context, public_ip) network_topic = yield self._get_network_topic(context) rpc.cast(network_topic, - {"method": "associate_floating_ip", - "args": {"context": None, - "floating_ip": floating_ip_ref['str_id'], - "fixed_ip": fixed_ip_ref['str_id'], - "instance_id": instance_ref['id']}}) + {"method": "associate_floating_ip", + "args": {"context": None, + "floating_address": floating_ip_ref['str_id'], + "fixed_address": fixed_ip_ref['str_id']}}) defer.returnValue({'associateResponse': ["Address associated."]}) @rbac.allow('netadmin') @@ -633,26 +694,42 @@ class CloudController(object): floating_ip_ref = db.floating_ip_get_by_address(context, public_ip) network_topic = yield self._get_network_topic(context) rpc.cast(network_topic, - {"method": "disassociate_floating_ip", - "args": {"context": None, - "floating_ip": floating_ip_ref['str_id']}}) + {"method": "disassociate_floating_ip", + "args": {"context": None, + "floating_address": floating_ip_ref['str_id']}}) defer.returnValue({'disassociateResponse': ["Address disassociated."]}) @defer.inlineCallbacks def _get_network_topic(self, context): """Retrieves the network host for a project""" network_ref = db.project_get_network(context, context.project.id) - host = db.network_get_host(context, network_ref['id']) + host = network_ref['host'] if not host: host = yield rpc.call(FLAGS.network_topic, - {"method": "set_network_host", - "args": {"context": None, - "project_id": context.project.id}}) + {"method": "set_network_host", + "args": {"context": None, + "project_id": context.project.id}}) defer.returnValue(db.queue_get_for(context, FLAGS.network_topic, host)) @rbac.allow('projectmanager', 'sysadmin') @defer.inlineCallbacks def run_instances(self, context, **kwargs): + instance_type = kwargs.get('instance_type', 'm1.small') + if instance_type not in INSTANCE_TYPES: + raise exception.ApiError("Unknown instance type: %s", + instance_type) + # check quota + max_instances = int(kwargs.get('max_count', 1)) + min_instances = int(kwargs.get('min_count', max_instances)) + num_instances = quota.allowed_instances(context, + max_instances, + instance_type) + if num_instances < min_instances: + logging.warn("Quota exceeeded for %s, tried to run %s instances", + context.project.id, min_instances) + raise QuotaError("Instance quota exceeded. You can only " + "run %s more instances of this type." % + num_instances, "InstanceLimitExceeded") # make sure user can access the image # vpn image is private so it doesn't show up on lists vpn = kwargs['image_id'] == FLAGS.vpn_image_id @@ -674,15 +751,14 @@ class CloudController(object): images.get(context, kernel_id) images.get(context, ramdisk_id) - logging.debug("Going to run instances...") + logging.debug("Going to run %s instances...", num_instances) launch_time = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) key_data = None if kwargs.has_key('key_name'): - key_pair = context.user.get_key_pair(kwargs['key_name']) - if not key_pair: - raise exception.ApiError('Key Pair %s not found' % - kwargs['key_name']) - key_data = key_pair.public_key + key_pair_ref = db.key_pair_get(context, + context.user.id, + kwargs['key_name']) + key_data = key_pair_ref['public_key'] security_group_arg = kwargs.get('security_group', ["default"]) if not type(security_group_arg) is list: @@ -697,6 +773,7 @@ class CloudController(object): reservation_id = utils.generate_uid('r') base_options = {} + base_options['state_description'] = 'scheduling' base_options['image_id'] = image_id base_options['kernel_id'] = kernel_id base_options['ramdisk_id'] = ramdisk_id @@ -706,10 +783,15 @@ class CloudController(object): base_options['user_id'] = context.user.id base_options['project_id'] = context.project.id base_options['user_data'] = kwargs.get('user_data', '') - base_options['instance_type'] = kwargs.get('instance_type', 'm1.small') - for num in range(int(kwargs['max_count'])): - inst_id = db.instance_create(context, base_options) + type_data = INSTANCE_TYPES[instance_type] + base_options['memory_mb'] = type_data['memory_mb'] + base_options['vcpus'] = type_data['vcpus'] + base_options['local_gb'] = type_data['local_gb'] + + for num in range(num_instances): + instance_ref = db.instance_create(context, base_options) + inst_id = instance_ref['id'] for security_group_id in security_groups: db.instance_add_security_group(context, inst_id, @@ -718,7 +800,7 @@ class CloudController(object): inst = {} inst['mac_address'] = utils.generate_mac() inst['launch_index'] = num - inst['hostname'] = inst_id + inst['hostname'] = instance_ref['str_id'] db.instance_update(context, inst_id, inst) address = self.network_manager.allocate_fixed_ip(context, inst_id, @@ -732,11 +814,12 @@ class CloudController(object): "args": {"context": None, "address": address}}) - rpc.cast(FLAGS.compute_topic, - {"method": "run_instance", - "args": {"context": None, - "instance_id": inst_id}}) - logging.debug("Casting to node for %s/%s's instance %s" % + rpc.cast(FLAGS.scheduler_topic, + {"method": "run_instance", + "args": {"context": None, + "topic": FLAGS.compute_topic, + "instance_id": inst_id}}) + logging.debug("Casting to scheduler for %s/%s's instance %s" % (context.project.name, context.user.name, inst_id)) defer.returnValue(self._format_run_instances(context, reservation_id)) @@ -755,6 +838,10 @@ class CloudController(object): % id_str) continue + now = datetime.datetime.utcnow() + db.instance_update(context, + instance_ref['id'], + {'terminated_at': now}) # FIXME(ja): where should network deallocate occur? address = db.instance_get_floating_address(context, instance_ref['id']) @@ -776,9 +863,9 @@ class CloudController(object): # NOTE(vish): Currently, nothing needs to be done on the # network node until release. If this changes, # we will need to cast here. - db.fixed_ip_deallocate(context, address) + self.network_manager.deallocate_fixed_ip(context, address) - host = db.instance_get_host(context, instance_ref['id']) + host = instance_ref['host'] if host: rpc.cast(db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "terminate_instance", @@ -793,7 +880,7 @@ class CloudController(object): """instance_id is a list of instance ids""" for id_str in instance_id: instance_ref = db.instance_get_by_str(context, id_str) - host = db.instance_get_host(context, instance_ref['id']) + host = instance_ref['host'] rpc.cast(db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "reboot_instance", "args": {"context": None, @@ -804,11 +891,15 @@ class CloudController(object): def delete_volume(self, context, volume_id, **kwargs): # TODO: return error if not authorized volume_ref = db.volume_get_by_str(context, volume_id) - host = db.volume_get_host(context, volume_ref['id']) - rpc.cast(db.queue_get_for(context, FLAGS.compute_topic, host), + if volume_ref['status'] != "available": + raise exception.ApiError("Volume status must be available") + now = datetime.datetime.utcnow() + db.volume_update(context, volume_ref['id'], {'terminated_at': now}) + host = volume_ref['host'] + rpc.cast(db.queue_get_for(context, FLAGS.volume_topic, host), {"method": "delete_volume", "args": {"context": None, - "volume_id": volume_id}}) + "volume_id": volume_ref['id']}}) return defer.succeed(True) @rbac.allow('all') diff --git a/nova/exception.py b/nova/exception.py index 43e5c36c6..f157fab2d 100644 --- a/nova/exception.py +++ b/nova/exception.py @@ -26,6 +26,18 @@ import sys import traceback +class ProcessExecutionError(IOError): + def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None, + description=None): + if description is None: + description = "Unexpected error while running command." + if exit_code is None: + exit_code = '-' + message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % ( + description, cmd, exit_code, stdout, stderr) + IOError.__init__(self, message) + + class Error(Exception): def __init__(self, message=None): super(Error, self).__init__(message) diff --git a/nova/flags.py b/nova/flags.py index 7b0c95a3c..6a1c14490 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -167,10 +167,14 @@ def DECLARE(name, module_string, flag_values=FLAGS): # Define any app-specific flags in their own files, docs at: # http://code.google.com/p/python-gflags/source/browse/trunk/gflags.py#39 +DEFINE_list('region_list', + [], + 'list of region=url pairs separated by commas') DEFINE_string('connection_type', 'libvirt', 'libvirt, xenapi or fake') DEFINE_integer('s3_port', 3333, 's3 port') DEFINE_string('s3_host', '127.0.0.1', 's3 host') DEFINE_string('compute_topic', 'compute', 'the topic compute nodes listen on') +DEFINE_string('scheduler_topic', 'scheduler', 'the topic scheduler nodes listen on') DEFINE_string('volume_topic', 'volume', 'the topic volume nodes listen on') DEFINE_string('network_topic', 'network', 'the topic network nodes listen on') @@ -213,6 +217,8 @@ DEFINE_string('network_manager', 'nova.network.manager.VlanManager', 'Manager for network') DEFINE_string('volume_manager', 'nova.volume.manager.AOEManager', 'Manager for volume') +DEFINE_string('scheduler_manager', 'nova.scheduler.manager.SchedulerManager', + 'Manager for scheduler') DEFINE_string('host', socket.gethostname(), 'name of this node') diff --git a/nova/network/manager.py b/nova/network/manager.py index bca3217f0..e2dc846a6 100644 --- a/nova/network/manager.py +++ b/nova/network/manager.py @@ -61,6 +61,8 @@ flags.DEFINE_integer('cnt_vpn_clients', 5, 'Number of addresses reserved for vpn clients') flags.DEFINE_string('network_driver', 'nova.network.linux_net', 'Driver to use for network creation') +flags.DEFINE_bool('update_dhcp_on_disassociate', False, + 'Whether to update dhcp when fixed_ip is disassocated') class AddressAlreadyAllocated(exception.Error): @@ -68,11 +70,6 @@ class AddressAlreadyAllocated(exception.Error): pass -class AddressNotAllocated(exception.Error): - """Address has not been allocated""" - pass - - class NetworkManager(manager.Manager): """Implements common network manager functionality @@ -93,7 +90,7 @@ class NetworkManager(manager.Manager): network_id = network_ref['id'] host = self.db.network_set_host(context, network_id, - FLAGS.host) + self.host) self._on_set_network_host(context, network_id) return host @@ -101,6 +98,10 @@ class NetworkManager(manager.Manager): """Gets a fixed ip from the pool""" raise NotImplementedError() + def deallocate_fixed_ip(self, context, instance_id, *args, **kwargs): + """Returns a fixed ip to the pool""" + raise NotImplementedError() + def setup_fixed_ip(self, context, address): """Sets up rules for fixed ip""" raise NotImplementedError() @@ -117,7 +118,7 @@ class NetworkManager(manager.Manager): """Gets an floating ip from the pool""" # TODO(vish): add floating ips through manage command return self.db.floating_ip_allocate_address(context, - FLAGS.host, + self.host, project_id) def associate_floating_ip(self, context, floating_address, fixed_address): @@ -175,10 +176,17 @@ class FlatManager(NetworkManager): def allocate_fixed_ip(self, context, instance_id, *args, **kwargs): """Gets a fixed ip from the pool""" network_ref = self.db.project_get_network(context, context.project.id) - address = self.db.fixed_ip_allocate(context, network_ref['id']) - self.db.fixed_ip_instance_associate(context, address, instance_id) + address = self.db.fixed_ip_associate_pool(context, + network_ref['id'], + instance_id) + self.db.fixed_ip_update(context, address, {'allocated': True}) return address + def deallocate_fixed_ip(self, context, address, *args, **kwargs): + """Returns a fixed ip to the pool""" + self.db.fixed_ip_update(context, address, {'allocated': False}) + self.db.fixed_ip_disassociate(context, address) + def setup_compute_network(self, context, project_id): """Network is created manually""" pass @@ -213,13 +221,29 @@ class VlanManager(NetworkManager): """Gets a fixed ip from the pool""" network_ref = self.db.project_get_network(context, context.project.id) if kwargs.get('vpn', None): - address = self._allocate_vpn_ip(context, network_ref['id']) + address = network_ref['vpn_private_address'] + self.db.fixed_ip_associate(context, address, instance_id) else: - address = self.db.fixed_ip_allocate(context, - network_ref['id']) - self.db.fixed_ip_instance_associate(context, address, instance_id) + address = self.db.fixed_ip_associate_pool(context, + network_ref['id'], + instance_id) + self.db.fixed_ip_update(context, address, {'allocated': True}) return address + def deallocate_fixed_ip(self, context, address, *args, **kwargs): + """Returns a fixed ip to the pool""" + self.db.fixed_ip_update(context, address, {'allocated': False}) + fixed_ip_ref = self.db.fixed_ip_get_by_address(context, address) + if not fixed_ip_ref['leased']: + self.db.fixed_ip_disassociate(context, address) + # NOTE(vish): dhcp server isn't updated until next setup, this + # means there will stale entries in the conf file + # the code below will update the file if necessary + if FLAGS.update_dhcp_on_disassociate: + network_ref = self.db.fixed_ip_get_network(context, address) + self.driver.update_dhcp(context, network_ref['id']) + + def setup_fixed_ip(self, context, address): """Sets forwarding rules and dhcp for fixed ip""" fixed_ip_ref = self.db.fixed_ip_get_by_address(context, address) @@ -230,22 +254,47 @@ class VlanManager(NetworkManager): network_ref['vpn_private_address']) self.driver.update_dhcp(context, network_ref['id']) - def lease_fixed_ip(self, context, address): + def lease_fixed_ip(self, context, mac, address): """Called by dhcp-bridge when ip is leased""" logging.debug("Leasing IP %s", address) fixed_ip_ref = self.db.fixed_ip_get_by_address(context, address) if not fixed_ip_ref['allocated']: - raise AddressNotAllocated(address) + logging.warn("IP %s leased that was already deallocated", address) + return + instance_ref = self.db.fixed_ip_get_instance(context, address) + if not instance_ref: + raise exception.Error("IP %s leased that isn't associated" % + address) + if instance_ref['mac_address'] != mac: + raise exception.Error("IP %s leased to bad mac %s vs %s" % + (address, instance_ref['mac_address'], mac)) self.db.fixed_ip_update(context, fixed_ip_ref['str_id'], {'leased': True}) - def release_fixed_ip(self, context, address): + def release_fixed_ip(self, context, mac, address): """Called by dhcp-bridge when ip is released""" logging.debug("Releasing IP %s", address) - self.db.fixed_ip_update(context, address, {'allocated': False, - 'leased': False}) - self.db.fixed_ip_instance_disassociate(context, address) + fixed_ip_ref = self.db.fixed_ip_get_by_address(context, address) + if not fixed_ip_ref['leased']: + logging.warn("IP %s released that was not leased", address) + return + instance_ref = self.db.fixed_ip_get_instance(context, address) + if not instance_ref: + raise exception.Error("IP %s released that isn't associated" % + address) + if instance_ref['mac_address'] != mac: + raise exception.Error("IP %s released from bad mac %s vs %s" % + (address, instance_ref['mac_address'], mac)) + self.db.fixed_ip_update(context, address, {'leased': False}) + if not fixed_ip_ref['allocated']: + self.db.fixed_ip_disassociate(context, address) + # NOTE(vish): dhcp server isn't updated until next setup, this + # means there will stale entries in the conf file + # the code below will update the file if necessary + if FLAGS.update_dhcp_on_disassociate: + network_ref = self.db.fixed_ip_get_network(context, address) + self.driver.update_dhcp(context, network_ref['id']) def allocate_network(self, context, project_id): """Set up the network""" @@ -287,19 +336,6 @@ class VlanManager(NetworkManager): # TODO(vish): Implement this pass - @staticmethod - def _allocate_vpn_ip(context, network_id): - """Allocate vpn ip for network""" - # TODO(vish): There is a possible concurrency issue here. - network_ref = db.network_get(context, network_id) - address = network_ref['vpn_private_address'] - fixed_ip_ref = db.fixed_ip_get_by_address(context, address) - # TODO(vish): Should this be fixed_ip_is_allocated? - if fixed_ip_ref['allocated']: - raise AddressAlreadyAllocated() - db.fixed_ip_update(context, fixed_ip_ref['id'], {'allocated': True}) - return fixed_ip_ref['str_id'] - def _ensure_indexes(self, context): """Ensure the indexes for the network exist diff --git a/nova/process.py b/nova/process.py index bda8147d5..13cb90e82 100644 --- a/nova/process.py +++ b/nova/process.py @@ -30,7 +30,7 @@ from twisted.internet import protocol from twisted.internet import reactor from nova import flags -from nova.utils import ProcessExecutionError +from nova.exception import ProcessExecutionError FLAGS = flags.FLAGS flags.DEFINE_integer('process_pool_size', 4, @@ -127,7 +127,7 @@ def get_process_output(executable, args=None, env=None, path=None, deferred = defer.Deferred() cmd = executable if args: - cmd = cmd + " " + ' '.join(args) + cmd = " ".join([cmd] + args) logging.debug("Running cmd: %s", cmd) process_handler = BackRelayWithInput( deferred, @@ -141,8 +141,8 @@ def get_process_output(executable, args=None, env=None, path=None, executable = str(executable) if not args is None: args = [str(x) for x in args] - process_reactor.spawnProcess( process_handler, executable, - (executable,)+tuple(args), env, path) + process_reactor.spawnProcess(process_handler, executable, + (executable,)+tuple(args), env, path) return deferred diff --git a/nova/quota.py b/nova/quota.py new file mode 100644 index 000000000..f0e51feeb --- /dev/null +++ b/nova/quota.py @@ -0,0 +1,91 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Quotas for instances, volumes, and floating ips +""" + +from nova import db +from nova import exception +from nova import flags +from nova.compute import instance_types + +FLAGS = flags.FLAGS + +flags.DEFINE_integer('quota_instances', 10, + 'number of instances allowed per project') +flags.DEFINE_integer('quota_cores', 20, + 'number of instance cores allowed per project') +flags.DEFINE_integer('quota_volumes', 10, + 'number of volumes allowed per project') +flags.DEFINE_integer('quota_gigabytes', 1000, + 'number of volume gigabytes allowed per project') +flags.DEFINE_integer('quota_floating_ips', 10, + 'number of floating ips allowed per project') + +def _get_quota(context, project_id): + rval = {'instances': FLAGS.quota_instances, + 'cores': FLAGS.quota_cores, + 'volumes': FLAGS.quota_volumes, + 'gigabytes': FLAGS.quota_gigabytes, + 'floating_ips': FLAGS.quota_floating_ips} + try: + quota = db.quota_get(context, project_id) + for key in rval.keys(): + if quota[key] is not None: + rval[key] = quota[key] + except exception.NotFound: + pass + return rval + +def allowed_instances(context, num_instances, instance_type): + """Check quota and return min(num_instances, allowed_instances)""" + project_id = context.project.id + used_instances, used_cores = db.instance_data_get_for_project(context, + project_id) + quota = _get_quota(context, project_id) + allowed_instances = quota['instances'] - used_instances + allowed_cores = quota['cores'] - used_cores + type_cores = instance_types.INSTANCE_TYPES[instance_type]['vcpus'] + num_cores = num_instances * type_cores + allowed_instances = min(allowed_instances, + int(allowed_cores // type_cores)) + return min(num_instances, allowed_instances) + + +def allowed_volumes(context, num_volumes, size): + """Check quota and return min(num_volumes, allowed_volumes)""" + project_id = context.project.id + used_volumes, used_gigabytes = db.volume_data_get_for_project(context, + project_id) + quota = _get_quota(context, project_id) + allowed_volumes = quota['volumes'] - used_volumes + allowed_gigabytes = quota['gigabytes'] - used_gigabytes + num_gigabytes = num_volumes * size + allowed_volumes = min(allowed_volumes, + int(allowed_gigabytes // size)) + return min(num_volumes, allowed_volumes) + + +def allowed_floating_ips(context, num_floating_ips): + """Check quota and return min(num_floating_ips, allowed_floating_ips)""" + project_id = context.project.id + used_floating_ips = db.floating_ip_count_by_project(context, project_id) + quota = _get_quota(context, project_id) + allowed_floating_ips = quota['floating_ips'] - used_floating_ips + return min(num_floating_ips, allowed_floating_ips) + diff --git a/nova/scheduler/__init__.py b/nova/scheduler/__init__.py new file mode 100644 index 000000000..8359a7aeb --- /dev/null +++ b/nova/scheduler/__init__.py @@ -0,0 +1,25 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Openstack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +:mod:`nova.scheduler` -- Scheduler Nodes +===================================================== + +.. automodule:: nova.scheduler + :platform: Unix + :synopsis: Module that picks a compute node to run a VM instance. +.. moduleauthor:: Chris Behrens <cbehrens@codestud.com> +""" diff --git a/nova/scheduler/chance.py b/nova/scheduler/chance.py new file mode 100644 index 000000000..7fd09b053 --- /dev/null +++ b/nova/scheduler/chance.py @@ -0,0 +1,38 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Openstack, LLC. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Chance (Random) Scheduler implementation +""" + +import random + +from nova.scheduler import driver + + +class ChanceScheduler(driver.Scheduler): + """Implements Scheduler as a random node selector.""" + + def schedule(self, context, topic, *_args, **_kwargs): + """Picks a host that is up at random.""" + + hosts = self.hosts_up(context, topic) + if not hosts: + raise driver.NoValidHost("No hosts found") + return hosts[int(random.random() * len(hosts))] diff --git a/nova/scheduler/driver.py b/nova/scheduler/driver.py new file mode 100644 index 000000000..2e6a5a835 --- /dev/null +++ b/nova/scheduler/driver.py @@ -0,0 +1,58 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Openstack, LLC. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Scheduler base class that all Schedulers should inherit from +""" + +import datetime + +from nova import db +from nova import exception +from nova import flags + +FLAGS = flags.FLAGS +flags.DEFINE_integer('service_down_time', 60, + 'maximum time since last checkin for up service') + +class NoValidHost(exception.Error): + """There is no valid host for the command.""" + pass + +class Scheduler(object): + """The base class that all Scheduler clases should inherit from.""" + + @staticmethod + def service_is_up(service): + """Check whether a service is up based on last heartbeat.""" + last_heartbeat = service['updated_at'] or service['created_at'] + elapsed = datetime.datetime.now() - last_heartbeat + return elapsed < datetime.timedelta(seconds=FLAGS.service_down_time) + + def hosts_up(self, context, topic): + """Return the list of hosts that have a running service for topic.""" + + services = db.service_get_all_by_topic(context, topic) + return [service.host + for service in services + if self.service_is_up(service)] + + def schedule(self, context, topic, *_args, **_kwargs): + """Must override at least this method for scheduler to work.""" + raise NotImplementedError("Must implement a fallback schedule") diff --git a/nova/scheduler/manager.py b/nova/scheduler/manager.py new file mode 100644 index 000000000..0ad7ca86b --- /dev/null +++ b/nova/scheduler/manager.py @@ -0,0 +1,66 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Openstack, LLC. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Scheduler Service +""" + +import logging +import functools + +from nova import db +from nova import flags +from nova import manager +from nova import rpc +from nova import utils + +FLAGS = flags.FLAGS +flags.DEFINE_string('scheduler_driver', + 'nova.scheduler.chance.ChanceScheduler', + 'Driver to use for the scheduler') + + +class SchedulerManager(manager.Manager): + """Chooses a host to run instances on.""" + def __init__(self, scheduler_driver=None, *args, **kwargs): + if not scheduler_driver: + scheduler_driver = FLAGS.scheduler_driver + self.driver = utils.import_object(scheduler_driver) + super(SchedulerManager, self).__init__(*args, **kwargs) + + def __getattr__(self, key): + """Converts all method calls to use the schedule method""" + return functools.partial(self._schedule, key) + + def _schedule(self, method, context, topic, *args, **kwargs): + """Tries to call schedule_* method on the driver to retrieve host. + + Falls back to schedule(context, topic) if method doesn't exist. + """ + driver_method = 'schedule_%s' % method + try: + host = getattr(self.driver, driver_method)(context, *args, **kwargs) + except AttributeError: + host = self.driver.schedule(context, topic, *args, **kwargs) + + kwargs.update({"context": None}) + rpc.cast(db.queue_get_for(context, topic, host), + {"method": method, + "args": kwargs}) + logging.debug("Casting to %s %s for %s", topic, host, method) diff --git a/nova/scheduler/simple.py b/nova/scheduler/simple.py new file mode 100644 index 000000000..fdaff74d8 --- /dev/null +++ b/nova/scheduler/simple.py @@ -0,0 +1,90 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Openstack, LLC. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Simple Scheduler +""" + +import datetime + +from nova import db +from nova import flags +from nova.scheduler import driver +from nova.scheduler import chance + +FLAGS = flags.FLAGS +flags.DEFINE_integer("max_cores", 16, + "maximum number of instance cores to allow per host") +flags.DEFINE_integer("max_gigabytes", 10000, + "maximum number of volume gigabytes to allow per host") +flags.DEFINE_integer("max_networks", 1000, + "maximum number of networks to allow per host") + +class SimpleScheduler(chance.ChanceScheduler): + """Implements Naive Scheduler that tries to find least loaded host.""" + + def schedule_run_instance(self, context, instance_id, *_args, **_kwargs): + """Picks a host that is up and has the fewest running instances.""" + instance_ref = db.instance_get(context, instance_id) + results = db.service_get_all_compute_sorted(context) + for result in results: + (service, instance_cores) = result + if instance_cores + instance_ref['vcpus'] > FLAGS.max_cores: + raise driver.NoValidHost("All hosts have too many cores") + if self.service_is_up(service): + # NOTE(vish): this probably belongs in the manager, if we + # can generalize this somehow + now = datetime.datetime.utcnow() + db.instance_update(context, + instance_id, + {'host': service['host'], + 'scheduled_at': now}) + return service['host'] + raise driver.NoValidHost("No hosts found") + + def schedule_create_volume(self, context, volume_id, *_args, **_kwargs): + """Picks a host that is up and has the fewest volumes.""" + volume_ref = db.volume_get(context, volume_id) + results = db.service_get_all_volume_sorted(context) + for result in results: + (service, volume_gigabytes) = result + if volume_gigabytes + volume_ref['size'] > FLAGS.max_gigabytes: + raise driver.NoValidHost("All hosts have too many gigabytes") + if self.service_is_up(service): + # NOTE(vish): this probably belongs in the manager, if we + # can generalize this somehow + now = datetime.datetime.utcnow() + db.volume_update(context, + volume_id, + {'host': service['host'], + 'scheduled_at': now}) + return service['host'] + raise driver.NoValidHost("No hosts found") + + def schedule_set_network_host(self, context, *_args, **_kwargs): + """Picks a host that is up and has the fewest networks.""" + + results = db.service_get_all_network_sorted(context) + for result in results: + (service, instance_count) = result + if instance_count >= FLAGS.max_networks: + raise driver.NoValidHost("All hosts have too many networks") + if self.service_is_up(service): + return service['host'] + raise driver.NoValidHost("No hosts found") diff --git a/nova/service.py b/nova/service.py index 60583dcdb..870dd6ceb 100644 --- a/nova/service.py +++ b/nova/service.py @@ -62,10 +62,11 @@ class Service(object, service.Service): def _create_service_ref(self): - self.service_id = db.service_create(None, {'host': self.host, - 'binary': self.binary, - 'topic': self.topic, - 'report_count': 0}) + service_ref = db.service_create(None, {'host': self.host, + 'binary': self.binary, + 'topic': self.topic, + 'report_count': 0}) + self.service_id = service_ref['id'] def __getattr__(self, key): try: diff --git a/nova/tests/api/__init__.py b/nova/tests/api/__init__.py index 59c4adc3d..4682c094e 100644 --- a/nova/tests/api/__init__.py +++ b/nova/tests/api/__init__.py @@ -52,8 +52,9 @@ class Test(unittest.TestCase): result = webob.Request.blank('/test/cloud').get_response(api.API()) self.assertNotEqual(result.body, "/cloud") - def test_query_api_version(self): - pass + def test_query_api_versions(self): + result = webob.Request.blank('/').get_response(api.API()) + self.assertTrue('CURRENT' in result.body) if __name__ == '__main__': unittest.main() diff --git a/nova/tests/api/rackspace/__init__.py b/nova/tests/api/rackspace/__init__.py index e69de29bb..622cb4335 100644 --- a/nova/tests/api/rackspace/__init__.py +++ b/nova/tests/api/rackspace/__init__.py @@ -0,0 +1,79 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest + +from nova.api.rackspace import RateLimitingMiddleware +from nova.tests.api.test_helper import * +from webob import Request + + +class RateLimitingMiddlewareTest(unittest.TestCase): + + def test_get_action_name(self): + middleware = RateLimitingMiddleware(APIStub()) + def verify(method, url, action_name): + req = Request.blank(url) + req.method = method + action = middleware.get_action_name(req) + self.assertEqual(action, action_name) + verify('PUT', '/servers/4', 'PUT') + verify('DELETE', '/servers/4', 'DELETE') + verify('POST', '/images/4', 'POST') + verify('POST', '/servers/4', 'POST servers') + verify('GET', '/foo?a=4&changes-since=never&b=5', 'GET changes-since') + verify('GET', '/foo?a=4&monkeys-since=never&b=5', None) + verify('GET', '/servers/4', None) + verify('HEAD', '/servers/4', None) + + def exhaust(self, middleware, method, url, username, times): + req = Request.blank(url, dict(REQUEST_METHOD=method), + headers={'X-Auth-User': username}) + for i in range(times): + resp = req.get_response(middleware) + self.assertEqual(resp.status_int, 200) + resp = req.get_response(middleware) + self.assertEqual(resp.status_int, 413) + self.assertTrue('Retry-After' in resp.headers) + + def test_single_action(self): + middleware = RateLimitingMiddleware(APIStub()) + self.exhaust(middleware, 'DELETE', '/servers/4', 'usr1', 100) + self.exhaust(middleware, 'DELETE', '/servers/4', 'usr2', 100) + + def test_POST_servers_action_implies_POST_action(self): + middleware = RateLimitingMiddleware(APIStub()) + self.exhaust(middleware, 'POST', '/servers/4', 'usr1', 10) + self.exhaust(middleware, 'POST', '/images/4', 'usr2', 10) + self.assertTrue(set(middleware.limiter._levels) == + set(['usr1:POST', 'usr1:POST servers', 'usr2:POST'])) + + def test_POST_servers_action_correctly_ratelimited(self): + middleware = RateLimitingMiddleware(APIStub()) + # Use up all of our "POST" allowance for the minute, 5 times + for i in range(5): + self.exhaust(middleware, 'POST', '/servers/4', 'usr1', 10) + # Reset the 'POST' action counter. + del middleware.limiter._levels['usr1:POST'] + # All 50 daily "POST servers" actions should be all used up + self.exhaust(middleware, 'POST', '/servers/4', 'usr1', 0) + + def test_proxy_ctor_works(self): + middleware = RateLimitingMiddleware(APIStub()) + self.assertEqual(middleware.limiter.__class__.__name__, "Limiter") + middleware = RateLimitingMiddleware(APIStub(), service_host='foobar') + self.assertEqual(middleware.limiter.__class__.__name__, "WSGIAppProxy") diff --git a/nova/tests/api_unittest.py b/nova/tests/api_unittest.py index 70669206c..78c686571 100644 --- a/nova/tests/api_unittest.py +++ b/nova/tests/api_unittest.py @@ -41,8 +41,8 @@ FLAGS = flags.FLAGS # it's pretty damn circuitous so apologies if you have to fix # a bug in it # NOTE(jaypipes) The pylint disables here are for R0913 (too many args) which -# isn't controllable since boto's HTTPRequest needs that many -# args, and for the version-differentiated import of tornado's +# isn't controllable since boto's HTTPRequest needs that many +# args, and for the version-differentiated import of tornado's # httputil. # NOTE(jaypipes): The disable-msg=E1101 and E1103 below is because pylint is # unable to introspect the deferred's return value properly @@ -224,7 +224,8 @@ class ApiEc2TestCase(test.BaseTestCase): for x in range(random.randint(4, 8))) user = self.manager.create_user('fake', 'fake', 'fake') project = self.manager.create_project('fake', 'fake', 'fake') - self.manager.generate_key_pair(user.id, keyname) + # NOTE(vish): create depends on pool, so call helper directly + cloud._gen_key(None, user.id, keyname) rv = self.ec2.get_all_key_pairs() results = [k for k in rv if k.name == keyname] diff --git a/nova/tests/auth_unittest.py b/nova/tests/auth_unittest.py index b54e68274..cbf7b22e2 100644 --- a/nova/tests/auth_unittest.py +++ b/nova/tests/auth_unittest.py @@ -17,8 +17,6 @@ # under the License. import logging -from M2Crypto import BIO -from M2Crypto import RSA from M2Crypto import X509 import unittest @@ -65,35 +63,6 @@ class AuthTestCase(test.BaseTestCase): 'export S3_URL="http://127.0.0.1:3333/"\n' + 'export EC2_USER_ID="test1"\n') - def test_006_test_key_storage(self): - user = self.manager.get_user('test1') - user.create_key_pair('public', 'key', 'fingerprint') - key = user.get_key_pair('public') - self.assertEqual('key', key.public_key) - self.assertEqual('fingerprint', key.fingerprint) - - def test_007_test_key_generation(self): - user = self.manager.get_user('test1') - private_key, fingerprint = user.generate_key_pair('public2') - key = RSA.load_key_string(private_key, callback=lambda: None) - bio = BIO.MemoryBuffer() - public_key = user.get_key_pair('public2').public_key - key.save_pub_key_bio(bio) - converted = crypto.ssl_pub_to_ssh_pub(bio.read()) - # assert key fields are equal - self.assertEqual(public_key.split(" ")[1].strip(), - converted.split(" ")[1].strip()) - - def test_008_can_list_key_pairs(self): - keys = self.manager.get_user('test1').get_key_pairs() - self.assertTrue(filter(lambda k: k.name == 'public', keys)) - self.assertTrue(filter(lambda k: k.name == 'public2', keys)) - - def test_009_can_delete_key_pair(self): - self.manager.get_user('test1').delete_key_pair('public') - keys = self.manager.get_user('test1').get_key_pairs() - self.assertFalse(filter(lambda k: k.name == 'public', keys)) - def test_010_can_list_users(self): users = self.manager.get_users() logging.warn(users) @@ -204,6 +173,12 @@ class AuthTestCase(test.BaseTestCase): self.assert_(len(self.manager.get_projects()) > 1) self.assertEqual(len(self.manager.get_projects('test2')), 1) + def test_220_can_modify_project(self): + self.manager.modify_project('testproj', 'test2', 'new description') + project = self.manager.get_project('testproj') + self.assertEqual(project.project_manager_id, 'test2') + self.assertEqual(project.description, 'new description') + def test_299_can_delete_project(self): self.manager.delete_project('testproj') self.assertFalse(filter(lambda p: p.name == 'testproj', self.manager.get_projects())) diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py index 29947e03c..317200e01 100644 --- a/nova/tests/cloud_unittest.py +++ b/nova/tests/cloud_unittest.py @@ -17,18 +17,24 @@ # under the License. import logging +from M2Crypto import BIO +from M2Crypto import RSA import StringIO import time + from tornado import ioloop from twisted.internet import defer import unittest from xml.etree import ElementTree +from nova import crypto +from nova import db from nova import flags from nova import rpc from nova import test from nova import utils from nova.auth import manager +from nova.compute import power_state from nova.endpoint import api from nova.endpoint import cloud @@ -54,16 +60,21 @@ class CloudTestCase(test.BaseTestCase): proxy=self.compute) self.injected.append(self.compute_consumer.attach_to_tornado(self.ioloop)) - try: - manager.AuthManager().create_user('admin', 'admin', 'admin') - except: pass - admin = manager.AuthManager().get_user('admin') - project = manager.AuthManager().create_project('proj', 'admin', 'proj') - self.context = api.APIRequestContext(handler=None,project=project,user=admin) + self.manager = manager.AuthManager() + self.user = self.manager.create_user('admin', 'admin', 'admin', True) + self.project = self.manager.create_project('proj', 'admin', 'proj') + self.context = api.APIRequestContext(handler=None, + user=self.user, + project=self.project) def tearDown(self): - manager.AuthManager().delete_project('proj') - manager.AuthManager().delete_user('admin') + self.manager.delete_project(self.project) + self.manager.delete_user(self.user) + super(CloudTestCase, self).setUp() + + def _create_key(self, name): + # NOTE(vish): create depends on pool, so just call helper directly + return cloud._gen_key(self.context, self.context.user.id, name) def test_console_output(self): if FLAGS.connection_type == 'fake': @@ -76,6 +87,33 @@ class CloudTestCase(test.BaseTestCase): self.assert_(output) rv = yield self.compute.terminate_instance(instance_id) + + def test_key_generation(self): + result = self._create_key('test') + private_key = result['private_key'] + key = RSA.load_key_string(private_key, callback=lambda: None) + bio = BIO.MemoryBuffer() + public_key = db.key_pair_get(self.context, + self.context.user.id, + 'test')['public_key'] + key.save_pub_key_bio(bio) + converted = crypto.ssl_pub_to_ssh_pub(bio.read()) + # assert key fields are equal + self.assertEqual(public_key.split(" ")[1].strip(), + converted.split(" ")[1].strip()) + + def test_describe_key_pairs(self): + self._create_key('test1') + self._create_key('test2') + result = self.cloud.describe_key_pairs(self.context) + keys = result["keypairsSet"] + self.assertTrue(filter(lambda k: k['keyName'] == 'test1', keys)) + self.assertTrue(filter(lambda k: k['keyName'] == 'test2', keys)) + + def test_delete_key_pair(self): + self._create_key('test') + self.cloud.delete_key_pair(self.context, 'test') + def test_run_instances(self): if FLAGS.connection_type == 'fake': logging.debug("Can't test instances without a real virtual env.") @@ -94,7 +132,7 @@ class CloudTestCase(test.BaseTestCase): rv = yield defer.succeed(time.sleep(1)) info = self.cloud._get_instance(instance['instance_id']) logging.debug(info['state']) - if info['state'] == node.Instance.RUNNING: + if info['state'] == power_state.RUNNING: break self.assert_(rv) diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py index 746c035d6..f5c0f1c09 100644 --- a/nova/tests/compute_unittest.py +++ b/nova/tests/compute_unittest.py @@ -18,6 +18,8 @@ """ Tests For Compute """ + +import datetime import logging from twisted.internet import defer @@ -48,6 +50,7 @@ class ComputeTestCase(test.TrialTestCase): def tearDown(self): # pylint: disable-msg=C0103 self.manager.delete_user(self.user) self.manager.delete_project(self.project) + super(ComputeTestCase, self).tearDown() def _create_instance(self): """Create a test instance""" @@ -60,7 +63,7 @@ class ComputeTestCase(test.TrialTestCase): inst['instance_type'] = 'm1.tiny' inst['mac_address'] = utils.generate_mac() inst['ami_launch_index'] = 0 - return db.instance_create(self.context, inst) + return db.instance_create(self.context, inst)['id'] @defer.inlineCallbacks def test_run_terminate(self): @@ -80,6 +83,24 @@ class ComputeTestCase(test.TrialTestCase): self.assertEqual(len(instances), 0) @defer.inlineCallbacks + def test_run_terminate_timestamps(self): + """Make sure timestamps are set for launched and destroyed""" + instance_id = self._create_instance() + instance_ref = db.instance_get(self.context, instance_id) + self.assertEqual(instance_ref['launched_at'], None) + self.assertEqual(instance_ref['deleted_at'], None) + launch = datetime.datetime.utcnow() + yield self.compute.run_instance(self.context, instance_id) + instance_ref = db.instance_get(self.context, instance_id) + self.assert_(instance_ref['launched_at'] > launch) + self.assertEqual(instance_ref['deleted_at'], None) + terminate = datetime.datetime.utcnow() + yield self.compute.terminate_instance(self.context, instance_id) + instance_ref = db.instance_get({'deleted': True}, instance_id) + self.assert_(instance_ref['launched_at'] < terminate) + self.assert_(instance_ref['deleted_at'] > terminate) + + @defer.inlineCallbacks def test_reboot(self): """Ensure instance can be rebooted""" instance_id = self._create_instance() diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py index a89f1d622..dc5277f02 100644 --- a/nova/tests/network_unittest.py +++ b/nova/tests/network_unittest.py @@ -28,6 +28,7 @@ from nova import flags from nova import test from nova import utils from nova.auth import manager +from nova.endpoint import api FLAGS = flags.FLAGS @@ -48,7 +49,7 @@ class NetworkTestCase(test.TrialTestCase): self.user = self.manager.create_user('netuser', 'netuser', 'netuser') self.projects = [] self.network = utils.import_object(FLAGS.network_manager) - self.context = None + self.context = api.APIRequestContext(None, project=None, user=self.user) for i in range(5): name = 'project%s' % i self.projects.append(self.manager.create_project(name, @@ -56,12 +57,12 @@ class NetworkTestCase(test.TrialTestCase): name)) # create the necessary network data for the project self.network.set_network_host(self.context, self.projects[i].id) - instance_id = db.instance_create(None, + instance_ref = db.instance_create(None, {'mac_address': utils.generate_mac()}) - self.instance_id = instance_id - instance_id = db.instance_create(None, + self.instance_id = instance_ref['id'] + instance_ref = db.instance_create(None, {'mac_address': utils.generate_mac()}) - self.instance2_id = instance_id + self.instance2_id = instance_ref['id'] def tearDown(self): # pylint: disable-msg=C0103 super(NetworkTestCase, self).tearDown() @@ -75,12 +76,10 @@ class NetworkTestCase(test.TrialTestCase): def _create_address(self, project_num, instance_id=None): """Create an address in given project num""" - net = db.project_get_network(None, self.projects[project_num].id) - address = db.fixed_ip_allocate(None, net['id']) if instance_id is None: instance_id = self.instance_id - db.fixed_ip_instance_associate(None, address, instance_id) - return address + self.context.project = self.projects[project_num] + return self.network.allocate_fixed_ip(self.context, instance_id) def test_public_network_association(self): """Makes sure that we can allocaate a public ip""" @@ -103,14 +102,14 @@ class NetworkTestCase(test.TrialTestCase): address = db.instance_get_floating_address(None, self.instance_id) self.assertEqual(address, None) self.network.deallocate_floating_ip(self.context, float_addr) - db.fixed_ip_deallocate(None, fix_addr) + self.network.deallocate_fixed_ip(self.context, fix_addr) def test_allocate_deallocate_fixed_ip(self): """Makes sure that we can allocate and deallocate a fixed ip""" address = self._create_address(0) self.assertTrue(is_allocated_in_project(address, self.projects[0].id)) lease_ip(address) - db.fixed_ip_deallocate(None, address) + self.network.deallocate_fixed_ip(self.context, address) # Doesn't go away until it's dhcp released self.assertTrue(is_allocated_in_project(address, self.projects[0].id)) @@ -131,14 +130,14 @@ class NetworkTestCase(test.TrialTestCase): lease_ip(address) lease_ip(address2) - db.fixed_ip_deallocate(None, address) + self.network.deallocate_fixed_ip(self.context, address) release_ip(address) self.assertFalse(is_allocated_in_project(address, self.projects[0].id)) # First address release shouldn't affect the second self.assertTrue(is_allocated_in_project(address2, self.projects[1].id)) - db.fixed_ip_deallocate(None, address2) + self.network.deallocate_fixed_ip(self.context, address2) release_ip(address2) self.assertFalse(is_allocated_in_project(address2, self.projects[1].id)) @@ -147,10 +146,23 @@ class NetworkTestCase(test.TrialTestCase): """Makes sure that private ips don't overlap""" first = self._create_address(0) lease_ip(first) + instance_ids = [] for i in range(1, 5): - address = self._create_address(i) - address2 = self._create_address(i) - address3 = self._create_address(i) + mac = utils.generate_mac() + instance_ref = db.instance_create(None, + {'mac_address': mac}) + instance_ids.append(instance_ref['id']) + address = self._create_address(i, instance_ref['id']) + mac = utils.generate_mac() + instance_ref = db.instance_create(None, + {'mac_address': mac}) + instance_ids.append(instance_ref['id']) + address2 = self._create_address(i, instance_ref['id']) + mac = utils.generate_mac() + instance_ref = db.instance_create(None, + {'mac_address': mac}) + instance_ids.append(instance_ref['id']) + address3 = self._create_address(i, instance_ref['id']) lease_ip(address) lease_ip(address2) lease_ip(address3) @@ -160,14 +172,16 @@ class NetworkTestCase(test.TrialTestCase): self.projects[0].id)) self.assertFalse(is_allocated_in_project(address3, self.projects[0].id)) - db.fixed_ip_deallocate(None, address) - db.fixed_ip_deallocate(None, address2) - db.fixed_ip_deallocate(None, address3) + self.network.deallocate_fixed_ip(self.context, address) + self.network.deallocate_fixed_ip(self.context, address2) + self.network.deallocate_fixed_ip(self.context, address3) release_ip(address) release_ip(address2) release_ip(address3) + for instance_id in instance_ids: + db.instance_destroy(None, instance_id) release_ip(first) - db.fixed_ip_deallocate(None, first) + self.network.deallocate_fixed_ip(self.context, first) def test_vpn_ip_and_port_looks_valid(self): """Ensure the vpn ip and port are reasonable""" @@ -194,12 +208,12 @@ class NetworkTestCase(test.TrialTestCase): """Makes sure that ip addresses that are deallocated get reused""" address = self._create_address(0) lease_ip(address) - db.fixed_ip_deallocate(None, address) + self.network.deallocate_fixed_ip(self.context, address) release_ip(address) address2 = self._create_address(0) self.assertEqual(address, address2) - db.fixed_ip_deallocate(None, address2) + self.network.deallocate_fixed_ip(self.context, address2) def test_available_ips(self): """Make sure the number of available ips for the network is correct @@ -226,21 +240,27 @@ class NetworkTestCase(test.TrialTestCase): num_available_ips = db.network_count_available_ips(None, network['id']) addresses = [] + instance_ids = [] for i in range(num_available_ips): - address = self._create_address(0) + mac = utils.generate_mac() + instance_ref = db.instance_create(None, + {'mac_address': mac}) + instance_ids.append(instance_ref['id']) + address = self._create_address(0, instance_ref['id']) addresses.append(address) lease_ip(address) self.assertEqual(db.network_count_available_ips(None, network['id']), 0) self.assertRaises(db.NoMoreAddresses, - db.fixed_ip_allocate, - None, - network['id']) + self.network.allocate_fixed_ip, + self.context, + 'foo') - for i in range(len(addresses)): - db.fixed_ip_deallocate(None, addresses[i]) + for i in range(num_available_ips): + self.network.deallocate_fixed_ip(self.context, addresses[i]) release_ip(addresses[i]) + db.instance_destroy(None, instance_ids[i]) self.assertEqual(db.network_count_available_ips(None, network['id']), num_available_ips) @@ -263,7 +283,10 @@ def binpath(script): def lease_ip(private_ip): """Run add command on dhcpbridge""" network_ref = db.fixed_ip_get_network(None, private_ip) - cmd = "%s add fake %s fake" % (binpath('nova-dhcpbridge'), private_ip) + instance_ref = db.fixed_ip_get_instance(None, private_ip) + cmd = "%s add %s %s fake" % (binpath('nova-dhcpbridge'), + instance_ref['mac_address'], + private_ip) env = {'DNSMASQ_INTERFACE': network_ref['bridge'], 'TESTING': '1', 'FLAGFILE': FLAGS.dhcpbridge_flagfile} @@ -274,7 +297,10 @@ def lease_ip(private_ip): def release_ip(private_ip): """Run del command on dhcpbridge""" network_ref = db.fixed_ip_get_network(None, private_ip) - cmd = "%s del fake %s fake" % (binpath('nova-dhcpbridge'), private_ip) + instance_ref = db.fixed_ip_get_instance(None, private_ip) + cmd = "%s del %s %s fake" % (binpath('nova-dhcpbridge'), + instance_ref['mac_address'], + private_ip) env = {'DNSMASQ_INTERFACE': network_ref['bridge'], 'TESTING': '1', 'FLAGFILE': FLAGS.dhcpbridge_flagfile} diff --git a/nova/tests/quota_unittest.py b/nova/tests/quota_unittest.py new file mode 100644 index 000000000..cab9f663d --- /dev/null +++ b/nova/tests/quota_unittest.py @@ -0,0 +1,155 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from nova import db +from nova import exception +from nova import flags +from nova import quota +from nova import test +from nova import utils +from nova.auth import manager +from nova.endpoint import cloud +from nova.endpoint import api + + +FLAGS = flags.FLAGS + + +class QuotaTestCase(test.TrialTestCase): + def setUp(self): # pylint: disable-msg=C0103 + logging.getLogger().setLevel(logging.DEBUG) + super(QuotaTestCase, self).setUp() + self.flags(connection_type='fake', + quota_instances=2, + quota_cores=4, + quota_volumes=2, + quota_gigabytes=20, + quota_floating_ips=1) + + self.cloud = cloud.CloudController() + self.manager = manager.AuthManager() + self.user = self.manager.create_user('admin', 'admin', 'admin', True) + self.project = self.manager.create_project('admin', 'admin', 'admin') + self.network = utils.import_object(FLAGS.network_manager) + self.context = api.APIRequestContext(handler=None, + project=self.project, + user=self.user) + + def tearDown(self): # pylint: disable-msg=C0103 + manager.AuthManager().delete_project(self.project) + manager.AuthManager().delete_user(self.user) + super(QuotaTestCase, self).tearDown() + + def _create_instance(self, cores=2): + """Create a test instance""" + inst = {} + inst['image_id'] = 'ami-test' + inst['reservation_id'] = 'r-fakeres' + inst['user_id'] = self.user.id + inst['project_id'] = self.project.id + inst['instance_type'] = 'm1.large' + inst['vcpus'] = cores + inst['mac_address'] = utils.generate_mac() + return db.instance_create(self.context, inst)['id'] + + def _create_volume(self, size=10): + """Create a test volume""" + vol = {} + vol['user_id'] = self.user.id + vol['project_id'] = self.project.id + vol['size'] = size + return db.volume_create(self.context, vol)['id'] + + def test_quota_overrides(self): + """Make sure overriding a projects quotas works""" + num_instances = quota.allowed_instances(self.context, 100, 'm1.small') + self.assertEqual(num_instances, 2) + db.quota_create(self.context, {'project_id': self.project.id, + 'instances': 10}) + num_instances = quota.allowed_instances(self.context, 100, 'm1.small') + self.assertEqual(num_instances, 4) + db.quota_update(self.context, self.project.id, {'cores': 100}) + num_instances = quota.allowed_instances(self.context, 100, 'm1.small') + self.assertEqual(num_instances, 10) + db.quota_destroy(self.context, self.project.id) + + def test_too_many_instances(self): + instance_ids = [] + for i in range(FLAGS.quota_instances): + instance_id = self._create_instance() + instance_ids.append(instance_id) + self.assertFailure(self.cloud.run_instances(self.context, + min_count=1, + max_count=1, + instance_type='m1.small'), + cloud.QuotaError) + for instance_id in instance_ids: + db.instance_destroy(self.context, instance_id) + + def test_too_many_cores(self): + instance_ids = [] + instance_id = self._create_instance(cores=4) + instance_ids.append(instance_id) + self.assertFailure(self.cloud.run_instances(self.context, + min_count=1, + max_count=1, + instance_type='m1.small'), + cloud.QuotaError) + for instance_id in instance_ids: + db.instance_destroy(self.context, instance_id) + + def test_too_many_volumes(self): + volume_ids = [] + for i in range(FLAGS.quota_volumes): + volume_id = self._create_volume() + volume_ids.append(volume_id) + self.assertRaises(cloud.QuotaError, + self.cloud.create_volume, + self.context, + size=10) + for volume_id in volume_ids: + db.volume_destroy(self.context, volume_id) + + def test_too_many_gigabytes(self): + volume_ids = [] + volume_id = self._create_volume(size=20) + volume_ids.append(volume_id) + self.assertRaises(cloud.QuotaError, + self.cloud.create_volume, + self.context, + size=10) + for volume_id in volume_ids: + db.volume_destroy(self.context, volume_id) + + def test_too_many_addresses(self): + address = '192.168.0.100' + try: + db.floating_ip_get_by_address(None, address) + except exception.NotFound: + db.floating_ip_create(None, {'address': address, + 'host': FLAGS.host}) + float_addr = self.network.allocate_floating_ip(self.context, + self.project.id) + # NOTE(vish): This assert never fails. When cloud attempts to + # make an rpc.call, the test just finishes with OK. It + # appears to be something in the magic inline callbacks + # that is breaking. + self.assertFailure(self.cloud.allocate_address(self.context), + cloud.QuotaError) diff --git a/nova/tests/scheduler_unittest.py b/nova/tests/scheduler_unittest.py new file mode 100644 index 000000000..fde30f81e --- /dev/null +++ b/nova/tests/scheduler_unittest.py @@ -0,0 +1,231 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Tests For Scheduler +""" + +from nova import db +from nova import flags +from nova import service +from nova import test +from nova import rpc +from nova import utils +from nova.auth import manager as auth_manager +from nova.scheduler import manager +from nova.scheduler import driver + + +FLAGS = flags.FLAGS +flags.DECLARE('max_cores', 'nova.scheduler.simple') + +class TestDriver(driver.Scheduler): + """Scheduler Driver for Tests""" + def schedule(context, topic, *args, **kwargs): + return 'fallback_host' + + def schedule_named_method(context, topic, num): + return 'named_host' + +class SchedulerTestCase(test.TrialTestCase): + """Test case for scheduler""" + def setUp(self): # pylint: disable=C0103 + super(SchedulerTestCase, self).setUp() + self.flags(scheduler_driver='nova.tests.scheduler_unittest.TestDriver') + + def test_fallback(self): + scheduler = manager.SchedulerManager() + self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True) + rpc.cast('topic.fallback_host', + {'method': 'noexist', + 'args': {'context': None, + 'num': 7}}) + self.mox.ReplayAll() + scheduler.noexist(None, 'topic', num=7) + + def test_named_method(self): + scheduler = manager.SchedulerManager() + self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True) + rpc.cast('topic.named_host', + {'method': 'named_method', + 'args': {'context': None, + 'num': 7}}) + self.mox.ReplayAll() + scheduler.named_method(None, 'topic', num=7) + + +class SimpleDriverTestCase(test.TrialTestCase): + """Test case for simple driver""" + def setUp(self): # pylint: disable-msg=C0103 + super(SimpleDriverTestCase, self).setUp() + self.flags(connection_type='fake', + max_cores=4, + max_gigabytes=4, + volume_driver='nova.volume.driver.FakeAOEDriver', + scheduler_driver='nova.scheduler.simple.SimpleScheduler') + self.scheduler = manager.SchedulerManager() + self.context = None + self.manager = auth_manager.AuthManager() + self.user = self.manager.create_user('fake', 'fake', 'fake') + self.project = self.manager.create_project('fake', 'fake', 'fake') + self.context = None + + def tearDown(self): # pylint: disable-msg=C0103 + self.manager.delete_user(self.user) + self.manager.delete_project(self.project) + + def _create_instance(self): + """Create a test instance""" + inst = {} + inst['image_id'] = 'ami-test' + inst['reservation_id'] = 'r-fakeres' + inst['user_id'] = self.user.id + inst['project_id'] = self.project.id + inst['instance_type'] = 'm1.tiny' + inst['mac_address'] = utils.generate_mac() + inst['ami_launch_index'] = 0 + inst['vcpus'] = 1 + return db.instance_create(self.context, inst)['id'] + + def _create_volume(self): + """Create a test volume""" + vol = {} + vol['image_id'] = 'ami-test' + vol['reservation_id'] = 'r-fakeres' + vol['size'] = 1 + return db.volume_create(self.context, vol)['id'] + + def test_hosts_are_up(self): + """Ensures driver can find the hosts that are up""" + # NOTE(vish): constructing service without create method + # because we are going to use it without queue + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2 = service.Service('host2', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + hosts = self.scheduler.driver.hosts_up(self.context, 'compute') + self.assertEqual(len(hosts), 2) + compute1.kill() + compute2.kill() + + def test_least_busy_host_gets_instance(self): + """Ensures the host with less cores gets the next one""" + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2 = service.Service('host2', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + instance_id1 = self._create_instance() + compute1.run_instance(self.context, instance_id1) + instance_id2 = self._create_instance() + host = self.scheduler.driver.schedule_run_instance(self.context, + instance_id2) + self.assertEqual(host, 'host2') + compute1.terminate_instance(self.context, instance_id1) + db.instance_destroy(self.context, instance_id2) + compute1.kill() + compute2.kill() + + def test_too_many_cores(self): + """Ensures we don't go over max cores""" + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2 = service.Service('host2', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + instance_ids1 = [] + instance_ids2 = [] + for index in xrange(FLAGS.max_cores): + instance_id = self._create_instance() + compute1.run_instance(self.context, instance_id) + instance_ids1.append(instance_id) + instance_id = self._create_instance() + compute2.run_instance(self.context, instance_id) + instance_ids2.append(instance_id) + instance_id = self._create_instance() + self.assertRaises(driver.NoValidHost, + self.scheduler.driver.schedule_run_instance, + self.context, + instance_id) + for instance_id in instance_ids1: + compute1.terminate_instance(self.context, instance_id) + for instance_id in instance_ids2: + compute2.terminate_instance(self.context, instance_id) + compute1.kill() + compute2.kill() + + def test_least_busy_host_gets_volume(self): + """Ensures the host with less gigabytes gets the next one""" + volume1 = service.Service('host1', + 'nova-volume', + 'volume', + FLAGS.volume_manager) + volume2 = service.Service('host2', + 'nova-volume', + 'volume', + FLAGS.volume_manager) + volume_id1 = self._create_volume() + volume1.create_volume(self.context, volume_id1) + volume_id2 = self._create_volume() + host = self.scheduler.driver.schedule_create_volume(self.context, + volume_id2) + self.assertEqual(host, 'host2') + volume1.delete_volume(self.context, volume_id1) + db.volume_destroy(self.context, volume_id2) + volume1.kill() + volume2.kill() + + def test_too_many_gigabytes(self): + """Ensures we don't go over max gigabytes""" + volume1 = service.Service('host1', + 'nova-volume', + 'volume', + FLAGS.volume_manager) + volume2 = service.Service('host2', + 'nova-volume', + 'volume', + FLAGS.volume_manager) + volume_ids1 = [] + volume_ids2 = [] + for index in xrange(FLAGS.max_gigabytes): + volume_id = self._create_volume() + volume1.create_volume(self.context, volume_id) + volume_ids1.append(volume_id) + volume_id = self._create_volume() + volume2.create_volume(self.context, volume_id) + volume_ids2.append(volume_id) + volume_id = self._create_volume() + self.assertRaises(driver.NoValidHost, + self.scheduler.driver.schedule_create_volume, + self.context, + volume_id) + for volume_id in volume_ids1: + volume1.delete_volume(self.context, volume_id) + for volume_id in volume_ids2: + volume2.delete_volume(self.context, volume_id) + volume1.kill() + volume2.kill() diff --git a/nova/tests/service_unittest.py b/nova/tests/service_unittest.py index 097a045e0..01da0eb8a 100644 --- a/nova/tests/service_unittest.py +++ b/nova/tests/service_unittest.py @@ -87,7 +87,7 @@ class ServiceTestCase(test.BaseTestCase): host, binary).AndRaise(exception.NotFound()) service.db.service_create(None, - service_create).AndReturn(service_ref['id']) + service_create).AndReturn(service_ref) self.mox.ReplayAll() app = service.Service.create(host=host, binary=binary) @@ -131,7 +131,7 @@ class ServiceTestCase(test.BaseTestCase): host, binary).AndRaise(exception.NotFound()) service.db.service_create(None, - service_create).AndReturn(service_ref['id']) + service_create).AndReturn(service_ref) service.db.service_get(None, service_ref['id']).AndReturn(service_ref) service.db.service_update(None, service_ref['id'], mox.ContainsKeyValue('report_count', 1)) diff --git a/nova/tests/volume_unittest.py b/nova/tests/volume_unittest.py index 9e35d2a1c..1d665b502 100644 --- a/nova/tests/volume_unittest.py +++ b/nova/tests/volume_unittest.py @@ -108,7 +108,7 @@ class VolumeTestCase(test.TrialTestCase): inst['instance_type'] = 'm1.tiny' inst['mac_address'] = utils.generate_mac() inst['ami_launch_index'] = 0 - instance_id = db.instance_create(self.context, inst) + instance_id = db.instance_create(self.context, inst)['id'] mountpoint = "/dev/sdf" volume_id = self._create_volume() yield self.volume.create_volume(self.context, volume_id) diff --git a/nova/utils.py b/nova/utils.py index 011a5cb09..d18dd9843 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -33,22 +33,12 @@ from twisted.internet.threads import deferToThread from nova import exception from nova import flags +from nova.exception import ProcessExecutionError FLAGS = flags.FLAGS TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" -class ProcessExecutionError(IOError): - def __init__( self, stdout=None, stderr=None, exit_code=None, cmd=None, - description=None): - if description is None: - description = "Unexpected error while running command." - if exit_code is None: - exit_code = '-' - message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % ( - description, cmd, exit_code, stdout, stderr) - IOError.__init__(self, message) - def import_class(import_str): """Returns a class from a string including module and class""" mod_str, _sep, class_str = import_str.rpartition('.') @@ -129,8 +119,10 @@ def runthis(prompt, cmd, check_exit_code = True): exit_code = subprocess.call(cmd.split(" ")) logging.debug(prompt % (exit_code)) if check_exit_code and exit_code <> 0: - raise Exception( "Unexpected exit code: %s from cmd: %s" - % (exit_code, cmd)) + raise ProcessExecutionError(exit_code=exit_code, + stdout=None, + stderr=None, + cmd=cmd) def generate_uid(topic, size=8): diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py index aaa2c69b6..4c4c7980b 100644 --- a/nova/virt/libvirt_conn.py +++ b/nova/virt/libvirt_conn.py @@ -140,10 +140,9 @@ class LibvirtConnection(object): timer = task.LoopingCall(f=None) def _wait_for_shutdown(): try: - db.instance_set_state(None, - instance['id'], - self.get_info(instance['name'])['state']) - if instance.state == power_state.SHUTDOWN: + state = self.get_info(instance['name'])['state'] + db.instance_set_state(None, instance['id'], state) + if state == power_state.SHUTDOWN: timer.stop() d.callback(None) except Exception: @@ -191,10 +190,9 @@ class LibvirtConnection(object): timer = task.LoopingCall(f=None) def _wait_for_reboot(): try: - db.instance_set_state(None, - instance['id'], - self.get_info(instance['name'])['state']) - if instance.state == power_state.RUNNING: + state = self.get_info(instance['name'])['state'] + db.instance_set_state(None, instance['id'], state) + if state == power_state.RUNNING: logging.debug('instance %s: rebooted', instance['name']) timer.stop() d.callback(None) @@ -228,10 +226,9 @@ class LibvirtConnection(object): timer = task.LoopingCall(f=None) def _wait_for_boot(): try: - db.instance_set_state(None, - instance['id'], - self.get_info(instance['name'])['state']) - if instance.state == power_state.RUNNING: + state = self.get_info(instance['name'])['state'] + db.instance_set_state(None, instance['id'], state) + if state == power_state.RUNNING: logging.debug('instance %s: booted', instance['name']) timer.stop() local_d.callback(None) diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py index b44ac383a..1c6de4403 100644 --- a/nova/virt/xenapi.py +++ b/nova/virt/xenapi.py @@ -274,9 +274,19 @@ class XenAPIConnection(object): def destroy(self, instance): vm = yield self._lookup(instance.name) if vm is None: - raise Exception('instance not present %s' % instance.name) - task = yield self._call_xenapi('Async.VM.destroy', vm) - yield self._wait_for_task(task) + # Don't complain, just return. This lets us clean up instances + # that have already disappeared from the underlying platform. + defer.returnValue(None) + try: + task = yield self._call_xenapi('Async.VM.hard_shutdown', vm) + yield self._wait_for_task(task) + except Exception, exc: + logging.warn(exc) + try: + task = yield self._call_xenapi('Async.VM.destroy', vm) + yield self._wait_for_task(task) + except Exception, exc: + logging.warn(exc) def get_info(self, instance_id): vm = self._lookup_blocking(instance_id) @@ -330,9 +340,9 @@ class XenAPIConnection(object): error_info) deferred.errback(XenAPI.Failure(error_info)) #logging.debug('Polling task %s done.', task) - except Exception, exn: - logging.warn(exn) - deferred.errback(exn) + except Exception, exc: + logging.warn(exc) + deferred.errback(exc) @utils.deferredToThread def _call_xenapi(self, method, *args): @@ -358,21 +368,21 @@ class XenAPIConnection(object): def _unwrap_plugin_exceptions(func, *args, **kwargs): try: return func(*args, **kwargs) - except XenAPI.Failure, exn: - logging.debug("Got exception: %s", exn) - if (len(exn.details) == 4 and - exn.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and - exn.details[2] == 'Failure'): + except XenAPI.Failure, exc: + logging.debug("Got exception: %s", exc) + if (len(exc.details) == 4 and + exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and + exc.details[2] == 'Failure'): params = None try: - params = eval(exn.details[3]) + params = eval(exc.details[3]) except: - raise exn + raise exc raise XenAPI.Failure(params) else: raise - except xmlrpclib.ProtocolError, exn: - logging.debug("Got exception: %s", exn) + except xmlrpclib.ProtocolError, exc: + logging.debug("Got exception: %s", exc) raise diff --git a/nova/volume/manager.py b/nova/volume/manager.py index 174c036d6..034763512 100644 --- a/nova/volume/manager.py +++ b/nova/volume/manager.py @@ -22,6 +22,7 @@ destroying persistent storage volumes, ala EBS. """ import logging +import datetime from twisted.internet import defer @@ -72,7 +73,7 @@ class AOEManager(manager.Manager): self.db.volume_update(context, volume_id, - {'host': FLAGS.host}) + {'host': self.host}) size = volume_ref['size'] logging.debug("volume %s: creating lv of size %sG", volume_id, size) @@ -89,14 +90,13 @@ class AOEManager(manager.Manager): yield self.driver.create_export(volume_ref['str_id'], shelf_id, blade_id) - # TODO(joshua): We need to trigger a fanout message - # for aoe-discover on all the nodes - - self.db.volume_update(context, volume_id, {'status': 'available'}) logging.debug("volume %s: re-exporting all values", volume_id) yield self.driver.ensure_exports() + now = datetime.datetime.utcnow() + self.db.volume_update(context, volume_id, {'status': 'available', + 'launched_at': now}) logging.debug("volume %s: created successfully", volume_id) defer.returnValue(volume_id) @@ -107,7 +107,7 @@ class AOEManager(manager.Manager): volume_ref = self.db.volume_get(context, volume_id) if volume_ref['attach_status'] == "attached": raise exception.Error("Volume is still attached") - if volume_ref['host'] != FLAGS.host: + if volume_ref['host'] != self.host: raise exception.Error("Volume is not local to this node") shelf_id, blade_id = self.db.volume_get_shelf_and_blade(context, volume_id) diff --git a/run_tests.py b/run_tests.py index 75ab561a1..8bb068ed1 100644 --- a/run_tests.py +++ b/run_tests.py @@ -58,7 +58,9 @@ from nova.tests.flags_unittest import * from nova.tests.network_unittest import * from nova.tests.objectstore_unittest import * from nova.tests.process_unittest import * +from nova.tests.quota_unittest import * from nova.tests.rpc_unittest import * +from nova.tests.scheduler_unittest import * from nova.tests.service_unittest import * from nova.tests.validator_unittest import * from nova.tests.volume_unittest import * diff --git a/tools/install_venv.py b/tools/install_venv.py index 5d2369a96..32c372352 100644 --- a/tools/install_venv.py +++ b/tools/install_venv.py @@ -88,6 +88,10 @@ def create_virtualenv(venv=VENV): def install_dependencies(venv=VENV): print 'Installing dependencies with pip (this can take a while)...' + # Install greenlet by hand - just listing it in the requires file does not + # get it in stalled in the right order + run_command(['tools/with_venv.sh', 'pip', 'install', '-E', venv, 'greenlet'], + redirect_output=False) run_command(['tools/with_venv.sh', 'pip', 'install', '-E', venv, '-r', PIP_REQUIRES], redirect_output=False) run_command(['tools/with_venv.sh', 'pip', 'install', '-E', venv, TWISTED_NOVA], diff --git a/tools/pip-requires b/tools/pip-requires index dd69708ce..1e2707be7 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -7,15 +7,16 @@ amqplib==0.6.1 anyjson==0.2.4 boto==2.0b1 carrot==0.10.5 -eventlet==0.9.10 +eventlet==0.9.12 lockfile==0.8 python-daemon==1.5.5 python-gflags==1.3 redis==2.0.0 routes==1.12.3 tornado==1.0 -webob==0.9.8 +WebOb==0.9.8 wsgiref==0.1.2 zope.interface==3.6.1 mox==0.5.0 -f http://pymox.googlecode.com/files/mox-0.5.0.tar.gz +greenlet==0.3.1 |