summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDan Prince <dan.prince@rackspace.com>2011-03-24 15:35:41 -0400
committerDan Prince <dan.prince@rackspace.com>2011-03-24 15:35:41 -0400
commitf51e2f3091f19823a5ad5ec7039cc1d18ce041e6 (patch)
tree8d79f003e9a9008661d6119341daa7b2091fb3a5
parent6254069cdf0262e128bfa877f0c56e5aeba2b4c2 (diff)
parenta0ea76b26a7725efb2fc4a811dff66b4f8bff6b7 (diff)
Merge w/ trunk.
-rwxr-xr-xbin/nova-manage3
-rw-r--r--nova/api/ec2/__init__.py13
-rw-r--r--nova/api/ec2/admin.py2
-rw-r--r--nova/api/ec2/cloud.py4
-rw-r--r--nova/api/openstack/accounts.py7
-rw-r--r--nova/api/openstack/servers.py36
-rw-r--r--nova/api/openstack/zones.py34
-rw-r--r--nova/compute/api.py17
-rw-r--r--nova/compute/manager.py5
-rw-r--r--nova/db/api.py3
-rw-r--r--nova/db/sqlalchemy/api.py16
-rw-r--r--nova/flags.py5
-rw-r--r--nova/image/glance.py2
-rw-r--r--nova/manager.py31
-rw-r--r--nova/network/linux_net.py13
-rw-r--r--nova/network/manager.py5
-rw-r--r--nova/rpc.py77
-rw-r--r--nova/scheduler/api.py228
-rw-r--r--nova/scheduler/driver.py7
-rw-r--r--nova/scheduler/manager.py14
-rw-r--r--nova/scheduler/zone_manager.py39
-rw-r--r--nova/service.py10
-rw-r--r--nova/tests/api/openstack/test_zones.py29
-rw-r--r--nova/tests/test_rpc.py4
-rw-r--r--nova/tests/test_scheduler.py161
-rw-r--r--nova/tests/test_service.py24
-rw-r--r--nova/tests/test_test.py2
-rw-r--r--nova/tests/test_volume.py4
-rw-r--r--nova/tests/test_zones.py34
-rw-r--r--nova/utils.py9
-rw-r--r--nova/virt/fake.py2
-rw-r--r--nova/virt/libvirt_conn.py13
-rw-r--r--nova/volume/manager.py5
33 files changed, 735 insertions, 123 deletions
diff --git a/bin/nova-manage b/bin/nova-manage
index 69cbf6f95..cf0caf47e 100755
--- a/bin/nova-manage
+++ b/bin/nova-manage
@@ -97,6 +97,7 @@ flags.DECLARE('vlan_start', 'nova.network.manager')
flags.DECLARE('vpn_start', 'nova.network.manager')
flags.DECLARE('fixed_range_v6', 'nova.network.manager')
flags.DECLARE('images_path', 'nova.image.local')
+flags.DECLARE('libvirt_type', 'nova.virt.libvirt_conn')
flags.DEFINE_flag(flags.HelpFlag())
flags.DEFINE_flag(flags.HelpshortFlag())
flags.DEFINE_flag(flags.HelpXMLFlag())
@@ -610,7 +611,7 @@ class ServiceCommands(object):
args: [host] [service]"""
ctxt = context.get_admin_context()
now = datetime.datetime.utcnow()
- services = db.service_get_all(ctxt) + db.service_get_all(ctxt, True)
+ services = db.service_get_all(ctxt)
if host:
services = [s for s in services if s['host'] == host]
if service:
diff --git a/nova/api/ec2/__init__.py b/nova/api/ec2/__init__.py
index 20701cfa8..a3c3b25a1 100644
--- a/nova/api/ec2/__init__.py
+++ b/nova/api/ec2/__init__.py
@@ -61,10 +61,13 @@ class RequestLogging(wsgi.Middleware):
return rv
def log_request_completion(self, response, request, start):
- controller = request.environ.get('ec2.controller', None)
- if controller:
- controller = controller.__class__.__name__
- action = request.environ.get('ec2.action', None)
+ apireq = request.environ.get('ec2.request', None)
+ if apireq:
+ controller = apireq.controller
+ action = apireq.action
+ else:
+ controller = None
+ action = None
ctxt = request.environ.get('ec2.context', None)
delta = utils.utcnow() - start
seconds = delta.seconds
@@ -75,7 +78,7 @@ class RequestLogging(wsgi.Middleware):
microseconds,
request.remote_addr,
request.method,
- request.path_info,
+ "%s%s" % (request.script_name, request.path_info),
controller,
action,
response.status_int,
diff --git a/nova/api/ec2/admin.py b/nova/api/ec2/admin.py
index d8d90ad83..6a5609d4a 100644
--- a/nova/api/ec2/admin.py
+++ b/nova/api/ec2/admin.py
@@ -304,7 +304,7 @@ class AdminController(object):
* Volume (up, down, None)
* Volume Count
"""
- services = db.service_get_all(context)
+ services = db.service_get_all(context, False)
now = datetime.datetime.utcnow()
hosts = []
rv = []
diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py
index e257e44e7..2afcea77c 100644
--- a/nova/api/ec2/cloud.py
+++ b/nova/api/ec2/cloud.py
@@ -196,7 +196,7 @@ class CloudController(object):
def _describe_availability_zones(self, context, **kwargs):
ctxt = context.elevated()
- enabled_services = db.service_get_all(ctxt)
+ enabled_services = db.service_get_all(ctxt, False)
disabled_services = db.service_get_all(ctxt, True)
available_zones = []
for zone in [service.availability_zone for service
@@ -221,7 +221,7 @@ class CloudController(object):
rv = {'availabilityZoneInfo': [{'zoneName': 'nova',
'zoneState': 'available'}]}
- services = db.service_get_all(context)
+ services = db.service_get_all(context, False)
now = datetime.datetime.utcnow()
hosts = []
for host in [service['host'] for service in services]:
diff --git a/nova/api/openstack/accounts.py b/nova/api/openstack/accounts.py
index 2510ffb61..86066fa20 100644
--- a/nova/api/openstack/accounts.py
+++ b/nova/api/openstack/accounts.py
@@ -14,6 +14,7 @@
# under the License.
import common
+import webob.exc
from nova import exception
from nova import flags
@@ -51,10 +52,10 @@ class Controller(wsgi.Controller):
raise exception.NotAuthorized(_("Not admin user."))
def index(self, req):
- raise faults.Fault(exc.HTTPNotImplemented())
+ raise faults.Fault(webob.exc.HTTPNotImplemented())
def detail(self, req):
- raise faults.Fault(exc.HTTPNotImplemented())
+ raise faults.Fault(webob.exc.HTTPNotImplemented())
def show(self, req, id):
"""Return data about the given account id"""
@@ -69,7 +70,7 @@ class Controller(wsgi.Controller):
def create(self, req):
"""We use update with create-or-update semantics
because the id comes from an external source"""
- raise faults.Fault(exc.HTTPNotImplemented())
+ raise faults.Fault(webob.exc.HTTPNotImplemented())
def update(self, req, id):
"""This is really create or update."""
diff --git a/nova/api/openstack/servers.py b/nova/api/openstack/servers.py
index 443196146..0dad46268 100644
--- a/nova/api/openstack/servers.py
+++ b/nova/api/openstack/servers.py
@@ -15,19 +15,19 @@
import base64
import hashlib
-import json
import traceback
-from xml.dom import minidom
from webob import exc
+from xml.dom import minidom
from nova import compute
from nova import context
from nova import exception
from nova import flags
from nova import log as logging
-from nova import wsgi
+from nova import quota
from nova import utils
+from nova import wsgi
from nova.api.openstack import common
from nova.api.openstack import faults
import nova.api.openstack.views.addresses
@@ -36,8 +36,8 @@ import nova.api.openstack.views.servers
from nova.auth import manager as auth_manager
from nova.compute import instance_types
from nova.compute import power_state
-from nova.quota import QuotaError
import nova.api.openstack
+from nova.scheduler import api as scheduler_api
LOG = logging.getLogger('server')
@@ -88,15 +88,18 @@ class Controller(wsgi.Controller):
for inst in limited_list]
return dict(servers=servers)
+ @scheduler_api.redirect_handler
def show(self, req, id):
""" Returns server details by server id """
try:
- instance = self.compute_api.get(req.environ['nova.context'], id)
+ instance = self.compute_api.routing_get(
+ req.environ['nova.context'], id)
builder = self._get_view_builder(req)
return builder.build(instance, is_detail=True)
except exception.NotFound:
return faults.Fault(exc.HTTPNotFound())
+ @scheduler_api.redirect_handler
def delete(self, req, id):
""" Destroys a server """
try:
@@ -156,8 +159,8 @@ class Controller(wsgi.Controller):
key_data=key_data,
metadata=metadata,
injected_files=injected_files)
- except QuotaError as error:
- self._handle_quota_errors(error)
+ except quota.QuotaError as error:
+ self._handle_quota_error(error)
inst['instance_type'] = flavor_id
inst['image_id'] = requested_image_id
@@ -211,7 +214,7 @@ class Controller(wsgi.Controller):
injected_files.append((path, contents))
return injected_files
- def _handle_quota_errors(self, error):
+ def _handle_quota_error(self, error):
"""
Reraise quota errors as api-specific http exceptions
"""
@@ -227,6 +230,7 @@ class Controller(wsgi.Controller):
# if the original error is okay, just reraise it
raise error
+ @scheduler_api.redirect_handler
def update(self, req, id):
""" Updates the server name or password """
if len(req.body) == 0:
@@ -242,7 +246,7 @@ class Controller(wsgi.Controller):
update_dict['admin_pass'] = inst_dict['server']['adminPass']
try:
self.compute_api.set_admin_password(ctxt, id)
- except exception.TimeoutException, e:
+ except exception.TimeoutException:
return exc.HTTPRequestTimeout()
if 'name' in inst_dict['server']:
update_dict['display_name'] = inst_dict['server']['name']
@@ -252,6 +256,7 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPNotFound())
return exc.HTTPNoContent()
+ @scheduler_api.redirect_handler
def action(self, req, id):
"""Multi-purpose method used to reboot, rebuild, or
resize a server"""
@@ -317,6 +322,7 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
+ @scheduler_api.redirect_handler
def lock(self, req, id):
"""
lock the instance with id
@@ -332,6 +338,7 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
+ @scheduler_api.redirect_handler
def unlock(self, req, id):
"""
unlock the instance with id
@@ -347,6 +354,7 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
+ @scheduler_api.redirect_handler
def get_lock(self, req, id):
"""
return the boolean state of (instance with id)'s lock
@@ -361,6 +369,7 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
+ @scheduler_api.redirect_handler
def reset_network(self, req, id):
"""
Reset networking on an instance (admin only).
@@ -375,6 +384,7 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
+ @scheduler_api.redirect_handler
def inject_network_info(self, req, id):
"""
Inject network info for an instance (admin only).
@@ -389,6 +399,7 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
+ @scheduler_api.redirect_handler
def pause(self, req, id):
""" Permit Admins to Pause the server. """
ctxt = req.environ['nova.context']
@@ -400,6 +411,7 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
+ @scheduler_api.redirect_handler
def unpause(self, req, id):
""" Permit Admins to Unpause the server. """
ctxt = req.environ['nova.context']
@@ -411,6 +423,7 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
+ @scheduler_api.redirect_handler
def suspend(self, req, id):
"""permit admins to suspend the server"""
context = req.environ['nova.context']
@@ -422,6 +435,7 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
+ @scheduler_api.redirect_handler
def resume(self, req, id):
"""permit admins to resume the server from suspend"""
context = req.environ['nova.context']
@@ -433,6 +447,7 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
+ @scheduler_api.redirect_handler
def rescue(self, req, id):
"""Permit users to rescue the server."""
context = req.environ["nova.context"]
@@ -444,6 +459,7 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
+ @scheduler_api.redirect_handler
def unrescue(self, req, id):
"""Permit users to unrescue the server."""
context = req.environ["nova.context"]
@@ -455,6 +471,7 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
+ @scheduler_api.redirect_handler
def get_ajax_console(self, req, id):
""" Returns a url to an instance's ajaxterm console. """
try:
@@ -464,6 +481,7 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPNotFound())
return exc.HTTPAccepted()
+ @scheduler_api.redirect_handler
def diagnostics(self, req, id):
"""Permit Admins to retrieve server diagnostics."""
ctxt = req.environ["nova.context"]
diff --git a/nova/api/openstack/zones.py b/nova/api/openstack/zones.py
index 8fe84275a..846cb48a1 100644
--- a/nova/api/openstack/zones.py
+++ b/nova/api/openstack/zones.py
@@ -15,9 +15,10 @@
import common
+from nova import db
from nova import flags
+from nova import log as logging
from nova import wsgi
-from nova import db
from nova.scheduler import api
@@ -38,7 +39,8 @@ def _exclude_keys(item, keys):
def _scrub_zone(zone):
- return _filter_keys(zone, ('id', 'api_url'))
+ return _exclude_keys(zone, ('username', 'password', 'created_at',
+ 'deleted', 'deleted_at', 'updated_at'))
class Controller(wsgi.Controller):
@@ -52,13 +54,9 @@ class Controller(wsgi.Controller):
"""Return all zones in brief"""
# Ask the ZoneManager in the Scheduler for most recent data,
# or fall-back to the database ...
- items = api.API().get_zone_list(req.environ['nova.context'])
- if not items:
- items = db.zone_get_all(req.environ['nova.context'])
-
+ items = api.get_zone_list(req.environ['nova.context'])
items = common.limited(items, req)
- items = [_exclude_keys(item, ['username', 'password'])
- for item in items]
+ items = [_scrub_zone(item) for item in items]
return dict(zones=items)
def detail(self, req):
@@ -67,29 +65,37 @@ class Controller(wsgi.Controller):
def info(self, req):
"""Return name and capabilities for this zone."""
- return dict(zone=dict(name=FLAGS.zone_name,
- capabilities=FLAGS.zone_capabilities))
+ items = api.get_zone_capabilities(req.environ['nova.context'])
+
+ zone = dict(name=FLAGS.zone_name)
+ caps = FLAGS.zone_capabilities
+ for cap in caps:
+ key, value = cap.split('=')
+ zone[key] = value
+ for item, (min_value, max_value) in items.iteritems():
+ zone[item] = "%s,%s" % (min_value, max_value)
+ return dict(zone=zone)
def show(self, req, id):
"""Return data about the given zone id"""
zone_id = int(id)
- zone = db.zone_get(req.environ['nova.context'], zone_id)
+ zone = api.zone_get(req.environ['nova.context'], zone_id)
return dict(zone=_scrub_zone(zone))
def delete(self, req, id):
zone_id = int(id)
- db.zone_delete(req.environ['nova.context'], zone_id)
+ api.zone_delete(req.environ['nova.context'], zone_id)
return {}
def create(self, req):
context = req.environ['nova.context']
env = self._deserialize(req.body, req.get_content_type())
- zone = db.zone_create(context, env["zone"])
+ zone = api.zone_create(context, env["zone"])
return dict(zone=_scrub_zone(zone))
def update(self, req, id):
context = req.environ['nova.context']
env = self._deserialize(req.body, req.get_content_type())
zone_id = int(id)
- zone = db.zone_update(context, zone_id, env["zone"])
+ zone = api.zone_update(context, zone_id, env["zone"])
return dict(zone=_scrub_zone(zone))
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 825b50e48..c4f14f826 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -34,6 +34,7 @@ from nova import rpc
from nova import utils
from nova import volume
from nova.compute import instance_types
+from nova.scheduler import api as scheduler_api
from nova.db import base
FLAGS = flags.FLAGS
@@ -352,6 +353,7 @@ class API(base.Base):
rv = self.db.instance_update(context, instance_id, kwargs)
return dict(rv.iteritems())
+ @scheduler_api.reroute_compute("delete")
def delete(self, context, instance_id):
LOG.debug(_("Going to try to terminate %s"), instance_id)
try:
@@ -384,6 +386,13 @@ class API(base.Base):
rv = self.db.instance_get(context, instance_id)
return dict(rv.iteritems())
+ @scheduler_api.reroute_compute("get")
+ def routing_get(self, context, instance_id):
+ """Use this method instead of get() if this is the only
+ operation you intend to to. It will route to novaclient.get
+ if the instance is not found."""
+ return self.get(context, instance_id)
+
def get_all(self, context, project_id=None, reservation_id=None,
fixed_ip=None):
"""Get all instances, possibly filtered by one of the
@@ -527,14 +536,17 @@ class API(base.Base):
"instance_id": instance_id,
"flavor_id": flavor_id}})
+ @scheduler_api.reroute_compute("pause")
def pause(self, context, instance_id):
"""Pause the given instance."""
self._cast_compute_message('pause_instance', context, instance_id)
+ @scheduler_api.reroute_compute("unpause")
def unpause(self, context, instance_id):
"""Unpause the given instance."""
self._cast_compute_message('unpause_instance', context, instance_id)
+ @scheduler_api.reroute_compute("diagnostics")
def get_diagnostics(self, context, instance_id):
"""Retrieve diagnostics for the given instance."""
return self._call_compute_message(
@@ -546,18 +558,22 @@ class API(base.Base):
"""Retrieve actions for the given instance."""
return self.db.instance_get_actions(context, instance_id)
+ @scheduler_api.reroute_compute("suspend")
def suspend(self, context, instance_id):
"""suspend the instance with instance_id"""
self._cast_compute_message('suspend_instance', context, instance_id)
+ @scheduler_api.reroute_compute("resume")
def resume(self, context, instance_id):
"""resume the instance with instance_id"""
self._cast_compute_message('resume_instance', context, instance_id)
+ @scheduler_api.reroute_compute("rescue")
def rescue(self, context, instance_id):
"""Rescue the given instance."""
self._cast_compute_message('rescue_instance', context, instance_id)
+ @scheduler_api.reroute_compute("unrescue")
def unrescue(self, context, instance_id):
"""Unrescue the given instance."""
self._cast_compute_message('unrescue_instance', context, instance_id)
@@ -573,7 +589,6 @@ class API(base.Base):
def get_ajax_console(self, context, instance_id):
"""Get a url to an AJAX Console"""
- instance = self.get(context, instance_id)
output = self._call_compute_message('get_ajax_console',
context,
instance_id)
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 7316d1510..468771f46 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -111,7 +111,7 @@ def checks_instance_lock(function):
return decorated_function
-class ComputeManager(manager.Manager):
+class ComputeManager(manager.SchedulerDependentManager):
"""Manages the running instances from creation to destruction."""
@@ -132,7 +132,8 @@ class ComputeManager(manager.Manager):
self.network_manager = utils.import_object(FLAGS.network_manager)
self.volume_manager = utils.import_object(FLAGS.volume_manager)
- super(ComputeManager, self).__init__(*args, **kwargs)
+ super(ComputeManager, self).__init__(service_name="compute",
+ *args, **kwargs)
def init_host(self):
"""Do any initialization that needs to be run if this is a
diff --git a/nova/db/api.py b/nova/db/api.py
index 300723c62..fd3c63b76 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -71,6 +71,7 @@ class NoMoreTargets(exception.Error):
"""No more available blades"""
pass
+
###################
@@ -89,7 +90,7 @@ def service_get_by_host_and_topic(context, host, topic):
return IMPL.service_get_by_host_and_topic(context, host, topic)
-def service_get_all(context, disabled=False):
+def service_get_all(context, disabled=None):
"""Get all services."""
return IMPL.service_get_all(context, disabled)
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index 57c05623b..86eea105d 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -143,12 +143,15 @@ def service_get(context, service_id, session=None):
@require_admin_context
-def service_get_all(context, disabled=False):
+def service_get_all(context, disabled=None):
session = get_session()
- return session.query(models.Service).\
- filter_by(deleted=can_read_deleted(context)).\
- filter_by(disabled=disabled).\
- all()
+ query = session.query(models.Service).\
+ filter_by(deleted=can_read_deleted(context))
+
+ if disabled is not None:
+ query = query.filter_by(disabled=disabled)
+
+ return query.all()
@require_admin_context
@@ -2209,7 +2212,7 @@ def migration_get(context, id, session=None):
filter_by(id=id).first()
if not result:
raise exception.NotFound(_("No migration found with id %s")
- % migration_id)
+ % id)
return result
@@ -2432,6 +2435,7 @@ def zone_create(context, values):
@require_admin_context
def zone_update(context, zone_id, values):
+ session = get_session()
zone = session.query(models.Zone).filter_by(id=zone_id).first()
if not zone:
raise exception.NotFound(_("No zone with id %(zone_id)s") % locals())
diff --git a/nova/flags.py b/nova/flags.py
index 9123e9ac7..bf83b8e0f 100644
--- a/nova/flags.py
+++ b/nova/flags.py
@@ -358,5 +358,6 @@ DEFINE_string('node_availability_zone', 'nova',
'availability zone of this node')
DEFINE_string('zone_name', 'nova', 'name of this zone')
-DEFINE_string('zone_capabilities', 'kypervisor:xenserver;os:linux',
- 'Key/Value tags which represent capabilities of this zone')
+DEFINE_list('zone_capabilities',
+ ['hypervisor=xenserver;kvm', 'os=linux;windows'],
+ 'Key/Multi-value list representng capabilities of this zone')
diff --git a/nova/image/glance.py b/nova/image/glance.py
index 171b28fde..9984a3ba1 100644
--- a/nova/image/glance.py
+++ b/nova/image/glance.py
@@ -73,7 +73,7 @@ class GlanceImageService(service.BaseImageService):
Returns image with known timestamp fields converted to datetime objects
"""
for attr in ['created_at', 'updated_at', 'deleted_at']:
- if image.get(attr) is not None:
+ if image.get(attr):
image[attr] = self._parse_glance_iso8601_timestamp(image[attr])
return image
diff --git a/nova/manager.py b/nova/manager.py
index 3d38504bd..804a50479 100644
--- a/nova/manager.py
+++ b/nova/manager.py
@@ -53,11 +53,14 @@ This module provides Manager, a base class for managers.
from nova import utils
from nova import flags
+from nova import log as logging
from nova.db import base
-
+from nova.scheduler import api
FLAGS = flags.FLAGS
+LOG = logging.getLogger('nova.manager')
+
class Manager(base.Base):
def __init__(self, host=None, db_driver=None):
@@ -74,3 +77,29 @@ class Manager(base.Base):
"""Do any initialization that needs to be run if this is a standalone
service. Child classes should override this method."""
pass
+
+
+class SchedulerDependentManager(Manager):
+ """Periodically send capability updates to the Scheduler services.
+ Services that need to update the Scheduler of their capabilities
+ should derive from this class. Otherwise they can derive from
+ manager.Manager directly. Updates are only sent after
+ update_service_capabilities is called with non-None values."""
+
+ def __init__(self, host=None, db_driver=None, service_name="undefined"):
+ self.last_capabilities = None
+ self.service_name = service_name
+ super(SchedulerDependentManager, self).__init__(host, db_driver)
+
+ def update_service_capabilities(self, capabilities):
+ """Remember these capabilities to send on next periodic update."""
+ self.last_capabilities = capabilities
+
+ def periodic_tasks(self, context=None):
+ """Pass data back to the scheduler at a periodic interval"""
+ if self.last_capabilities:
+ LOG.debug(_("Notifying Schedulers of capabilities ..."))
+ api.update_service_capabilities(context, self.service_name,
+ self.host, self.last_capabilities)
+
+ super(SchedulerDependentManager, self).periodic_tasks(context)
diff --git a/nova/network/linux_net.py b/nova/network/linux_net.py
index 796d6ba31..06b05366a 100644
--- a/nova/network/linux_net.py
+++ b/nova/network/linux_net.py
@@ -1,3 +1,5 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
@@ -210,10 +212,7 @@ class IptablesManager(object):
"""
def __init__(self, execute=None):
if not execute:
- if FLAGS.fake_network:
- self.execute = lambda *args, **kwargs: ('', '')
- else:
- self.execute = utils.execute
+ self.execute = _execute
else:
self.execute = execute
@@ -352,9 +351,6 @@ class IptablesManager(object):
return new_filter
-iptables_manager = IptablesManager()
-
-
def metadata_forward():
"""Create forwarding rule for metadata"""
iptables_manager.ipv4['nat'].add_rule("PREROUTING",
@@ -767,3 +763,6 @@ def _ip_bridge_cmd(action, params, device):
cmd.extend(params)
cmd.extend(['dev', device])
return cmd
+
+
+iptables_manager = IptablesManager()
diff --git a/nova/network/manager.py b/nova/network/manager.py
index 34fc042e4..d994f7dc8 100644
--- a/nova/network/manager.py
+++ b/nova/network/manager.py
@@ -105,7 +105,7 @@ class AddressAlreadyAllocated(exception.Error):
pass
-class NetworkManager(manager.Manager):
+class NetworkManager(manager.SchedulerDependentManager):
"""Implements common network manager functionality.
This class must be subclassed to support specific topologies.
@@ -116,7 +116,8 @@ class NetworkManager(manager.Manager):
if not network_driver:
network_driver = FLAGS.network_driver
self.driver = utils.import_object(network_driver)
- super(NetworkManager, self).__init__(*args, **kwargs)
+ super(NetworkManager, self).__init__(service_name='network',
+ *args, **kwargs)
def init_host(self):
"""Do any initialization that needs to be run if this is a
diff --git a/nova/rpc.py b/nova/rpc.py
index 5935e1fb3..388f78d69 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -137,24 +137,7 @@ class Consumer(messaging.Consumer):
return timer
-class Publisher(messaging.Publisher):
- """Publisher base class"""
- pass
-
-
-class TopicConsumer(Consumer):
- """Consumes messages on a specific topic"""
- exchange_type = "topic"
-
- def __init__(self, connection=None, topic="broadcast"):
- self.queue = topic
- self.routing_key = topic
- self.exchange = FLAGS.control_exchange
- self.durable = False
- super(TopicConsumer, self).__init__(connection=connection)
-
-
-class AdapterConsumer(TopicConsumer):
+class AdapterConsumer(Consumer):
"""Calls methods on a proxy object based on method and args"""
def __init__(self, connection=None, topic="broadcast", proxy=None):
LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
@@ -207,6 +190,41 @@ class AdapterConsumer(TopicConsumer):
return
+class Publisher(messaging.Publisher):
+ """Publisher base class"""
+ pass
+
+
+class TopicAdapterConsumer(AdapterConsumer):
+ """Consumes messages on a specific topic"""
+ exchange_type = "topic"
+
+ def __init__(self, connection=None, topic="broadcast", proxy=None):
+ self.queue = topic
+ self.routing_key = topic
+ self.exchange = FLAGS.control_exchange
+ self.durable = False
+ super(TopicAdapterConsumer, self).__init__(connection=connection,
+ topic=topic, proxy=proxy)
+
+
+class FanoutAdapterConsumer(AdapterConsumer):
+ """Consumes messages from a fanout exchange"""
+ exchange_type = "fanout"
+
+ def __init__(self, connection=None, topic="broadcast", proxy=None):
+ self.exchange = "%s_fanout" % topic
+ self.routing_key = topic
+ unique = uuid.uuid4().hex
+ self.queue = "%s_fanout_%s" % (topic, unique)
+ self.durable = False
+ LOG.info(_("Created '%(exchange)s' fanout exchange "
+ "with '%(key)s' routing key"),
+ dict(exchange=self.exchange, key=self.routing_key))
+ super(FanoutAdapterConsumer, self).__init__(connection=connection,
+ topic=topic, proxy=proxy)
+
+
class TopicPublisher(Publisher):
"""Publishes messages on a specific topic"""
exchange_type = "topic"
@@ -218,6 +236,19 @@ class TopicPublisher(Publisher):
super(TopicPublisher, self).__init__(connection=connection)
+class FanoutPublisher(Publisher):
+ """Publishes messages to a fanout exchange."""
+ exchange_type = "fanout"
+
+ def __init__(self, topic, connection=None):
+ self.exchange = "%s_fanout" % topic
+ self.queue = "%s_fanout" % topic
+ self.durable = False
+ LOG.info(_("Creating '%(exchange)s' fanout exchange"),
+ dict(exchange=self.exchange))
+ super(FanoutPublisher, self).__init__(connection=connection)
+
+
class DirectConsumer(Consumer):
"""Consumes messages directly on a channel specified by msg_id"""
exchange_type = "direct"
@@ -360,6 +391,16 @@ def cast(context, topic, msg):
publisher.close()
+def fanout_cast(context, topic, msg):
+ """Sends a message on a fanout exchange without waiting for a response"""
+ LOG.debug(_("Making asynchronous fanout cast..."))
+ _pack_context(msg, context)
+ conn = Connection.instance()
+ publisher = FanoutPublisher(topic, connection=conn)
+ publisher.send(msg)
+ publisher.close()
+
+
def generic_response(message_data, message):
"""Logs a result and exits"""
LOG.debug(_('response %s'), message_data)
diff --git a/nova/scheduler/api.py b/nova/scheduler/api.py
index 2405f1343..6bb3bf3cd 100644
--- a/nova/scheduler/api.py
+++ b/nova/scheduler/api.py
@@ -17,33 +17,225 @@
Handles all requests relating to schedulers.
"""
+import novaclient
+
+from nova import db
+from nova import exception
from nova import flags
from nova import log as logging
from nova import rpc
+from eventlet import greenpool
+
FLAGS = flags.FLAGS
+flags.DEFINE_bool('enable_zone_routing',
+ False,
+ 'When True, routing to child zones will occur.')
+
LOG = logging.getLogger('nova.scheduler.api')
-class API(object):
- """API for interacting with the scheduler."""
+def _call_scheduler(method, context, params=None):
+ """Generic handler for RPC calls to the scheduler.
+
+ :param params: Optional dictionary of arguments to be passed to the
+ scheduler worker
+
+ :retval: Result returned by scheduler worker
+ """
+ if not params:
+ params = {}
+ queue = FLAGS.scheduler_topic
+ kwargs = {'method': method, 'args': params}
+ return rpc.call(context, queue, kwargs)
+
+
+def get_zone_list(context):
+ """Return a list of zones assoicated with this zone."""
+ items = _call_scheduler('get_zone_list', context)
+ for item in items:
+ item['api_url'] = item['api_url'].replace('\\/', '/')
+ if not items:
+ items = db.zone_get_all(context)
+ return items
+
+
+def zone_get(context, zone_id):
+ return db.zone_get(context, zone_id)
+
+
+def zone_delete(context, zone_id):
+ return db.zone_delete(context, zone_id)
+
+
+def zone_create(context, data):
+ return db.zone_create(context, data)
+
+
+def zone_update(context, zone_id, data):
+ return db.zone_update(context, zone_id, data)
+
+
+def get_zone_capabilities(context, service=None):
+ """Returns a dict of key, value capabilities for this zone,
+ or for a particular class of services running in this zone."""
+ return _call_scheduler('get_zone_capabilities', context=context,
+ params=dict(service=service))
+
+
+def update_service_capabilities(context, service_name, host, capabilities):
+ """Send an update to all the scheduler services informing them
+ of the capabilities of this service."""
+ kwargs = dict(method='update_service_capabilities',
+ args=dict(service_name=service_name, host=host,
+ capabilities=capabilities))
+ return rpc.fanout_cast(context, 'scheduler', kwargs)
+
+
+def _wrap_method(function, self):
+ """Wrap method to supply self."""
+ def _wrap(*args, **kwargs):
+ return function(self, *args, **kwargs)
+ return _wrap
+
+
+def _process(func, zone):
+ """Worker stub for green thread pool. Give the worker
+ an authenticated nova client and zone info."""
+ nova = novaclient.OpenStack(zone.username, zone.password, zone.api_url)
+ nova.authenticate()
+ return func(nova, zone)
+
+
+def child_zone_helper(zone_list, func):
+ """Fire off a command to each zone in the list.
+ The return is [novaclient return objects] from each child zone.
+ For example, if you are calling server.pause(), the list will
+ be whatever the response from server.pause() is. One entry
+ per child zone called."""
+ green_pool = greenpool.GreenPool()
+ return [result for result in green_pool.imap(
+ _wrap_method(_process, func), zone_list)]
+
+
+def _issue_novaclient_command(nova, zone, collection, method_name, item_id):
+ """Use novaclient to issue command to a single child zone.
+ One of these will be run in parallel for each child zone."""
+ manager = getattr(nova, collection)
+ result = None
+ try:
+ try:
+ result = manager.get(int(item_id))
+ except ValueError, e:
+ result = manager.find(name=item_id)
+ except novaclient.NotFound:
+ url = zone.api_url
+ LOG.debug(_("%(collection)s '%(item_id)s' not found on '%(url)s'" %
+ locals()))
+ return None
+
+ if method_name.lower() not in ['get', 'find']:
+ result = getattr(result, method_name)()
+ return result
+
+
+def wrap_novaclient_function(f, collection, method_name, item_id):
+ """Appends collection, method_name and item_id to the incoming
+ (nova, zone) call from child_zone_helper."""
+ def inner(nova, zone):
+ return f(nova, zone, collection, method_name, item_id)
+
+ return inner
+
+
+class RedirectResult(exception.Error):
+ """Used to the HTTP API know that these results are pre-cooked
+ and they can be returned to the caller directly."""
+ def __init__(self, results):
+ self.results = results
+ super(RedirectResult, self).__init__(
+ message=_("Uncaught Zone redirection exception"))
+
+
+class reroute_compute(object):
+ """Decorator used to indicate that the method should
+ delegate the call the child zones if the db query
+ can't find anything."""
+ def __init__(self, method_name):
+ self.method_name = method_name
+
+ def __call__(self, f):
+ def wrapped_f(*args, **kwargs):
+ collection, context, item_id = \
+ self.get_collection_context_and_id(args, kwargs)
+ try:
+ # Call the original function ...
+ return f(*args, **kwargs)
+ except exception.InstanceNotFound, e:
+ LOG.debug(_("Instance %(item_id)s not found "
+ "locally: '%(e)s'" % locals()))
+
+ if not FLAGS.enable_zone_routing:
+ raise
+
+ zones = db.zone_get_all(context)
+ if not zones:
+ raise
+
+ # Ask the children to provide an answer ...
+ LOG.debug(_("Asking child zones ..."))
+ result = self._call_child_zones(zones,
+ wrap_novaclient_function(_issue_novaclient_command,
+ collection, self.method_name, item_id))
+ # Scrub the results and raise another exception
+ # so the API layers can bail out gracefully ...
+ raise RedirectResult(self.unmarshall_result(result))
+ return wrapped_f
+
+ def _call_child_zones(self, zones, function):
+ """Ask the child zones to perform this operation.
+ Broken out for testing."""
+ return child_zone_helper(zones, function)
+
+ def get_collection_context_and_id(self, args, kwargs):
+ """Returns a tuple of (novaclient collection name, security
+ context and resource id. Derived class should override this."""
+ context = kwargs.get('context', None)
+ instance_id = kwargs.get('instance_id', None)
+ if len(args) > 0 and not context:
+ context = args[1]
+ if len(args) > 1 and not instance_id:
+ instance_id = args[2]
+ return ("servers", context, instance_id)
+
+ def unmarshall_result(self, zone_responses):
+ """Result is a list of responses from each child zone.
+ Each decorator derivation is responsible to turning this
+ into a format expected by the calling method. For
+ example, this one is expected to return a single Server
+ dict {'server':{k:v}}. Others may return a list of them, like
+ {'servers':[{k,v}]}"""
+ reduced_response = []
+ for zone_response in zone_responses:
+ if not zone_response:
+ continue
+
+ server = zone_response.__dict__
- def _call_scheduler(self, method, context, params=None):
- """Generic handler for RPC calls to the scheduler.
+ for k in server.keys():
+ if k[0] == '_' or k == 'manager':
+ del server[k]
- :param params: Optional dictionary of arguments to be passed to the
- scheduler worker
+ reduced_response.append(dict(server=server))
+ if reduced_response:
+ return reduced_response[0] # first for now.
+ return {}
- :retval: Result returned by scheduler worker
- """
- if not params:
- params = {}
- queue = FLAGS.scheduler_topic
- kwargs = {'method': method, 'args': params}
- return rpc.call(context, queue, kwargs)
- def get_zone_list(self, context):
- items = self._call_scheduler('get_zone_list', context)
- for item in items:
- item['api_url'] = item['api_url'].replace('\\/', '/')
- return items
+def redirect_handler(f):
+ def new_f(*args, **kwargs):
+ try:
+ return f(*args, **kwargs)
+ except RedirectResult, e:
+ return e.results
+ return new_f
diff --git a/nova/scheduler/driver.py b/nova/scheduler/driver.py
index ed3dfe1c0..ce05d9f6a 100644
--- a/nova/scheduler/driver.py
+++ b/nova/scheduler/driver.py
@@ -49,6 +49,13 @@ class WillNotSchedule(exception.Error):
class Scheduler(object):
"""The base class that all Scheduler clases should inherit from."""
+ def __init__(self):
+ self.zone_manager = None
+
+ def set_zone_manager(self, zone_manager):
+ """Called by the Scheduler Service to supply a ZoneManager."""
+ self.zone_manager = zone_manager
+
@staticmethod
def service_is_up(service):
"""Check whether a service is up based on last heartbeat."""
diff --git a/nova/scheduler/manager.py b/nova/scheduler/manager.py
index 053a53356..7d62cfc4e 100644
--- a/nova/scheduler/manager.py
+++ b/nova/scheduler/manager.py
@@ -41,10 +41,11 @@ flags.DEFINE_string('scheduler_driver',
class SchedulerManager(manager.Manager):
"""Chooses a host to run instances on."""
def __init__(self, scheduler_driver=None, *args, **kwargs):
+ self.zone_manager = zone_manager.ZoneManager()
if not scheduler_driver:
scheduler_driver = FLAGS.scheduler_driver
self.driver = utils.import_object(scheduler_driver)
- self.zone_manager = zone_manager.ZoneManager()
+ self.driver.set_zone_manager(self.zone_manager)
super(SchedulerManager, self).__init__(*args, **kwargs)
def __getattr__(self, key):
@@ -59,6 +60,17 @@ class SchedulerManager(manager.Manager):
"""Get a list of zones from the ZoneManager."""
return self.zone_manager.get_zone_list()
+ def get_zone_capabilities(self, context=None, service=None):
+ """Get the normalized set of capabilites for this zone,
+ or for a particular service."""
+ return self.zone_manager.get_zone_capabilities(context, service)
+
+ def update_service_capabilities(self, context=None, service_name=None,
+ host=None, capabilities={}):
+ """Process a capability update from a service node."""
+ self.zone_manager.update_service_capabilities(service_name,
+ host, capabilities)
+
def _schedule(self, method, context, topic, *args, **kwargs):
"""Tries to call schedule_* method on the driver to retrieve host.
diff --git a/nova/scheduler/zone_manager.py b/nova/scheduler/zone_manager.py
index edf9000cc..198f9d4cc 100644
--- a/nova/scheduler/zone_manager.py
+++ b/nova/scheduler/zone_manager.py
@@ -58,8 +58,9 @@ class ZoneState(object):
child zone."""
self.last_seen = datetime.now()
self.attempt = 0
- self.name = zone_metadata["name"]
- self.capabilities = zone_metadata["capabilities"]
+ self.name = zone_metadata.get("name", "n/a")
+ self.capabilities = ", ".join(["%s=%s" % (k, v)
+ for k, v in zone_metadata.iteritems() if k != 'name'])
self.is_active = True
def to_dict(self):
@@ -104,13 +105,37 @@ class ZoneManager(object):
"""Keeps the zone states updated."""
def __init__(self):
self.last_zone_db_check = datetime.min
- self.zone_states = {}
+ self.zone_states = {} # { <zone_id> : ZoneState }
+ self.service_states = {} # { <service> : { <host> : { cap k : v }}}
self.green_pool = greenpool.GreenPool()
def get_zone_list(self):
"""Return the list of zones we know about."""
return [zone.to_dict() for zone in self.zone_states.values()]
+ def get_zone_capabilities(self, context, service=None):
+ """Roll up all the individual host info to generic 'service'
+ capabilities. Each capability is aggregated into
+ <cap>_min and <cap>_max values."""
+ service_dict = self.service_states
+ if service:
+ service_dict = {service: self.service_states.get(service, {})}
+
+ # TODO(sandy) - be smarter about fabricating this structure.
+ # But it's likely to change once we understand what the Best-Match
+ # code will need better.
+ combined = {} # { <service>_<cap> : (min, max), ... }
+ for service_name, host_dict in service_dict.iteritems():
+ for host, caps_dict in host_dict.iteritems():
+ for cap, value in caps_dict.iteritems():
+ key = "%s_%s" % (service_name, cap)
+ min_value, max_value = combined.get(key, (value, value))
+ min_value = min(min_value, value)
+ max_value = max(max_value, value)
+ combined[key] = (min_value, max_value)
+
+ return combined
+
def _refresh_from_db(self, context):
"""Make our zone state map match the db."""
# Add/update existing zones ...
@@ -141,3 +166,11 @@ class ZoneManager(object):
self.last_zone_db_check = datetime.now()
self._refresh_from_db(context)
self._poll_zones(context)
+
+ def update_service_capabilities(self, service_name, host, capabilities):
+ """Update the per-service capabilities based on this notification."""
+ logging.debug(_("Received %(service_name)s service update from "
+ "%(host)s: %(capabilities)s") % locals())
+ service_caps = self.service_states.get(service_name, {})
+ service_caps[host] = capabilities
+ self.service_states[service_name] = service_caps
diff --git a/nova/service.py b/nova/service.py
index 52bb15ad7..47c0b96c0 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -97,18 +97,24 @@ class Service(object):
conn1 = rpc.Connection.instance(new=True)
conn2 = rpc.Connection.instance(new=True)
+ conn3 = rpc.Connection.instance(new=True)
if self.report_interval:
- consumer_all = rpc.AdapterConsumer(
+ consumer_all = rpc.TopicAdapterConsumer(
connection=conn1,
topic=self.topic,
proxy=self)
- consumer_node = rpc.AdapterConsumer(
+ consumer_node = rpc.TopicAdapterConsumer(
connection=conn2,
topic='%s.%s' % (self.topic, self.host),
proxy=self)
+ fanout = rpc.FanoutAdapterConsumer(
+ connection=conn3,
+ topic=self.topic,
+ proxy=self)
self.timers.append(consumer_all.attach_to_eventlet())
self.timers.append(consumer_node.attach_to_eventlet())
+ self.timers.append(fanout.attach_to_eventlet())
pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)
diff --git a/nova/tests/api/openstack/test_zones.py b/nova/tests/api/openstack/test_zones.py
index 38399bb3f..a3f191aaa 100644
--- a/nova/tests/api/openstack/test_zones.py
+++ b/nova/tests/api/openstack/test_zones.py
@@ -75,6 +75,10 @@ def zone_get_all_db(context):
]
+def zone_capabilities(method, context, params):
+ return dict()
+
+
class ZonesTest(test.TestCase):
def setUp(self):
super(ZonesTest, self).setUp()
@@ -93,13 +97,18 @@ class ZonesTest(test.TestCase):
self.stubs.Set(nova.db, 'zone_create', zone_create)
self.stubs.Set(nova.db, 'zone_delete', zone_delete)
+ self.old_zone_name = FLAGS.zone_name
+ self.old_zone_capabilities = FLAGS.zone_capabilities
+
def tearDown(self):
self.stubs.UnsetAll()
FLAGS.allow_admin_api = self.allow_admin
+ FLAGS.zone_name = self.old_zone_name
+ FLAGS.zone_capabilities = self.old_zone_capabilities
super(ZonesTest, self).tearDown()
def test_get_zone_list_scheduler(self):
- self.stubs.Set(api.API, '_call_scheduler', zone_get_all_scheduler)
+ self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler)
req = webob.Request.blank('/v1.0/zones')
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
@@ -108,8 +117,7 @@ class ZonesTest(test.TestCase):
self.assertEqual(len(res_dict['zones']), 2)
def test_get_zone_list_db(self):
- self.stubs.Set(api.API, '_call_scheduler',
- zone_get_all_scheduler_empty)
+ self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler_empty)
self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db)
req = webob.Request.blank('/v1.0/zones')
req.headers["Content-Type"] = "application/json"
@@ -167,3 +175,18 @@ class ZonesTest(test.TestCase):
self.assertEqual(res_dict['zone']['id'], 1)
self.assertEqual(res_dict['zone']['api_url'], 'http://example.com')
self.assertFalse('username' in res_dict['zone'])
+
+ def test_zone_info(self):
+ FLAGS.zone_name = 'darksecret'
+ FLAGS.zone_capabilities = ['cap1=a;b', 'cap2=c;d']
+ self.stubs.Set(api, '_call_scheduler', zone_capabilities)
+
+ body = dict(zone=dict(username='zeb', password='sneaky'))
+ req = webob.Request.blank('/v1.0/zones/info')
+
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res_dict['zone']['name'], 'darksecret')
+ self.assertEqual(res_dict['zone']['cap1'], 'a;b')
+ self.assertEqual(res_dict['zone']['cap2'], 'c;d')
diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py
index 4820e04fb..44d7c91eb 100644
--- a/nova/tests/test_rpc.py
+++ b/nova/tests/test_rpc.py
@@ -36,7 +36,7 @@ class RpcTestCase(test.TestCase):
super(RpcTestCase, self).setUp()
self.conn = rpc.Connection.instance(True)
self.receiver = TestReceiver()
- self.consumer = rpc.AdapterConsumer(connection=self.conn,
+ self.consumer = rpc.TopicAdapterConsumer(connection=self.conn,
topic='test',
proxy=self.receiver)
self.consumer.attach_to_eventlet()
@@ -97,7 +97,7 @@ class RpcTestCase(test.TestCase):
nested = Nested()
conn = rpc.Connection.instance(True)
- consumer = rpc.AdapterConsumer(connection=conn,
+ consumer = rpc.TopicAdapterConsumer(connection=conn,
topic='nested',
proxy=nested)
consumer.attach_to_eventlet()
diff --git a/nova/tests/test_scheduler.py b/nova/tests/test_scheduler.py
index 244e43bd9..6df74dd61 100644
--- a/nova/tests/test_scheduler.py
+++ b/nova/tests/test_scheduler.py
@@ -21,6 +21,9 @@ Tests For Scheduler
import datetime
import mox
+import novaclient.exceptions
+import stubout
+import webob
from mox import IgnoreArg
from nova import context
@@ -32,6 +35,7 @@ from nova import test
from nova import rpc
from nova import utils
from nova.auth import manager as auth_manager
+from nova.scheduler import api
from nova.scheduler import manager
from nova.scheduler import driver
from nova.compute import power_state
@@ -937,3 +941,160 @@ class SimpleDriverTestCase(test.TestCase):
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
db.service_destroy(self.context, s_ref2['id'])
+
+
+class FakeZone(object):
+ def __init__(self, api_url, username, password):
+ self.api_url = api_url
+ self.username = username
+ self.password = password
+
+
+def zone_get_all(context):
+ return [
+ FakeZone('http://example.com', 'bob', 'xxx'),
+ ]
+
+
+class FakeRerouteCompute(api.reroute_compute):
+ def _call_child_zones(self, zones, function):
+ return []
+
+ def get_collection_context_and_id(self, args, kwargs):
+ return ("servers", None, 1)
+
+ def unmarshall_result(self, zone_responses):
+ return dict(magic="found me")
+
+
+def go_boom(self, context, instance):
+ raise exception.InstanceNotFound("boom message", instance)
+
+
+def found_instance(self, context, instance):
+ return dict(name='myserver')
+
+
+class FakeResource(object):
+ def __init__(self, attribute_dict):
+ for k, v in attribute_dict.iteritems():
+ setattr(self, k, v)
+
+ def pause(self):
+ pass
+
+
+class ZoneRedirectTest(test.TestCase):
+ def setUp(self):
+ super(ZoneRedirectTest, self).setUp()
+ self.stubs = stubout.StubOutForTesting()
+
+ self.stubs.Set(db, 'zone_get_all', zone_get_all)
+
+ self.enable_zone_routing = FLAGS.enable_zone_routing
+ FLAGS.enable_zone_routing = True
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+ FLAGS.enable_zone_routing = self.enable_zone_routing
+ super(ZoneRedirectTest, self).tearDown()
+
+ def test_trap_found_locally(self):
+ decorator = FakeRerouteCompute("foo")
+ try:
+ result = decorator(found_instance)(None, None, 1)
+ except api.RedirectResult, e:
+ self.fail(_("Successful database hit should succeed"))
+
+ def test_trap_not_found_locally(self):
+ decorator = FakeRerouteCompute("foo")
+ try:
+ result = decorator(go_boom)(None, None, 1)
+ self.assertFail(_("Should have rerouted."))
+ except api.RedirectResult, e:
+ self.assertEquals(e.results['magic'], 'found me')
+
+ def test_routing_flags(self):
+ FLAGS.enable_zone_routing = False
+ decorator = FakeRerouteCompute("foo")
+ try:
+ result = decorator(go_boom)(None, None, 1)
+ self.assertFail(_("Should have thrown exception."))
+ except exception.InstanceNotFound, e:
+ self.assertEquals(e.message, 'boom message')
+
+ def test_get_collection_context_and_id(self):
+ decorator = api.reroute_compute("foo")
+ self.assertEquals(decorator.get_collection_context_and_id(
+ (None, 10, 20), {}), ("servers", 10, 20))
+ self.assertEquals(decorator.get_collection_context_and_id(
+ (None, 11,), dict(instance_id=21)), ("servers", 11, 21))
+ self.assertEquals(decorator.get_collection_context_and_id(
+ (None,), dict(context=12, instance_id=22)), ("servers", 12, 22))
+
+ def test_unmarshal_single_server(self):
+ decorator = api.reroute_compute("foo")
+ self.assertEquals(decorator.unmarshall_result([]), {})
+ self.assertEquals(decorator.unmarshall_result(
+ [FakeResource(dict(a=1, b=2)), ]),
+ dict(server=dict(a=1, b=2)))
+ self.assertEquals(decorator.unmarshall_result(
+ [FakeResource(dict(a=1, _b=2)), ]),
+ dict(server=dict(a=1,)))
+ self.assertEquals(decorator.unmarshall_result(
+ [FakeResource(dict(a=1, manager=2)), ]),
+ dict(server=dict(a=1,)))
+ self.assertEquals(decorator.unmarshall_result(
+ [FakeResource(dict(_a=1, manager=2)), ]),
+ dict(server={}))
+
+
+class FakeServerCollection(object):
+ def get(self, instance_id):
+ return FakeResource(dict(a=10, b=20))
+
+ def find(self, name):
+ return FakeResource(dict(a=11, b=22))
+
+
+class FakeEmptyServerCollection(object):
+ def get(self, f):
+ raise novaclient.NotFound(1)
+
+ def find(self, name):
+ raise novaclient.NotFound(2)
+
+
+class FakeNovaClient(object):
+ def __init__(self, collection):
+ self.servers = collection
+
+
+class DynamicNovaClientTest(test.TestCase):
+ def test_issue_novaclient_command_found(self):
+ zone = FakeZone('http://example.com', 'bob', 'xxx')
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeServerCollection()),
+ zone, "servers", "get", 100).a, 10)
+
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeServerCollection()),
+ zone, "servers", "find", "name").b, 22)
+
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeServerCollection()),
+ zone, "servers", "pause", 100), None)
+
+ def test_issue_novaclient_command_not_found(self):
+ zone = FakeZone('http://example.com', 'bob', 'xxx')
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeEmptyServerCollection()),
+ zone, "servers", "get", 100), None)
+
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeEmptyServerCollection()),
+ zone, "servers", "find", "name"), None)
+
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeEmptyServerCollection()),
+ zone, "servers", "any", "name"), None)
diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py
index 393f9d20b..d48de2057 100644
--- a/nova/tests/test_service.py
+++ b/nova/tests/test_service.py
@@ -109,20 +109,29 @@ class ServiceTestCase(test.TestCase):
app = service.Service.create(host=host, binary=binary)
self.mox.StubOutWithMock(rpc,
- 'AdapterConsumer',
+ 'TopicAdapterConsumer',
use_mock_anything=True)
- rpc.AdapterConsumer(connection=mox.IgnoreArg(),
+ self.mox.StubOutWithMock(rpc,
+ 'FanoutAdapterConsumer',
+ use_mock_anything=True)
+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
- rpc.AdapterConsumer)
+ rpc.TopicAdapterConsumer)
- rpc.AdapterConsumer(connection=mox.IgnoreArg(),
+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
topic='%s.%s' % (topic, host),
proxy=mox.IsA(service.Service)).AndReturn(
- rpc.AdapterConsumer)
+ rpc.TopicAdapterConsumer)
+
+ rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
+ topic=topic,
+ proxy=mox.IsA(service.Service)).AndReturn(
+ rpc.FanoutAdapterConsumer)
- rpc.AdapterConsumer.attach_to_eventlet()
- rpc.AdapterConsumer.attach_to_eventlet()
+ rpc.TopicAdapterConsumer.attach_to_eventlet()
+ rpc.TopicAdapterConsumer.attach_to_eventlet()
+ rpc.FanoutAdapterConsumer.attach_to_eventlet()
service_create = {'host': host,
'binary': binary,
@@ -279,6 +288,7 @@ class ServiceTestCase(test.TestCase):
self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
service.rpc.Connection.instance(new=mox.IgnoreArg())
service.rpc.Connection.instance(new=mox.IgnoreArg())
+ service.rpc.Connection.instance(new=mox.IgnoreArg())
self.mox.StubOutWithMock(serv.manager.driver,
'update_available_resource')
serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)
diff --git a/nova/tests/test_test.py b/nova/tests/test_test.py
index e237674e6..35c838065 100644
--- a/nova/tests/test_test.py
+++ b/nova/tests/test_test.py
@@ -34,7 +34,7 @@ class IsolationTestCase(test.TestCase):
def test_rpc_consumer_isolation(self):
connection = rpc.Connection.instance(new=True)
- consumer = rpc.TopicConsumer(connection, topic='compute')
+ consumer = rpc.TopicAdapterConsumer(connection, topic='compute')
consumer.register_callback(
lambda x, y: self.fail('I should never be called'))
consumer.attach_to_eventlet()
diff --git a/nova/tests/test_volume.py b/nova/tests/test_volume.py
index 5d68ca2ae..d71b75f3f 100644
--- a/nova/tests/test_volume.py
+++ b/nova/tests/test_volume.py
@@ -356,8 +356,8 @@ class ISCSITestCase(DriverTestCase):
tid = db.volume_get_iscsi_target_num(self.context, volume_id_list[0])
self.mox.StubOutWithMock(self.volume.driver, '_execute')
self.volume.driver._execute("sudo", "ietadm", "--op", "show",
- "--tid=%(tid)d" % locals()
- ).AndRaise(exception.ProcessExecutionError())
+ "--tid=%(tid)d" % locals()).AndRaise(
+ exception.ProcessExecutionError())
self.mox.ReplayAll()
self.assertRaises(exception.ProcessExecutionError,
diff --git a/nova/tests/test_zones.py b/nova/tests/test_zones.py
index 5a52a0506..688dc704d 100644
--- a/nova/tests/test_zones.py
+++ b/nova/tests/test_zones.py
@@ -76,6 +76,40 @@ class ZoneManagerTestCase(test.TestCase):
self.assertEquals(len(zm.zone_states), 1)
self.assertEquals(zm.zone_states[1].username, 'user1')
+ def test_service_capabilities(self):
+ zm = zone_manager.ZoneManager()
+ caps = zm.get_zone_capabilities(self, None)
+ self.assertEquals(caps, {})
+
+ zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
+ caps = zm.get_zone_capabilities(self, None)
+ self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2)))
+
+ zm.update_service_capabilities("svc1", "host1", dict(a=2, b=3))
+ caps = zm.get_zone_capabilities(self, None)
+ self.assertEquals(caps, dict(svc1_a=(2, 2), svc1_b=(3, 3)))
+
+ zm.update_service_capabilities("svc1", "host2", dict(a=20, b=30))
+ caps = zm.get_zone_capabilities(self, None)
+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30)))
+
+ zm.update_service_capabilities("svc10", "host1", dict(a=99, b=99))
+ caps = zm.get_zone_capabilities(self, None)
+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
+ svc10_a=(99, 99), svc10_b=(99, 99)))
+
+ zm.update_service_capabilities("svc1", "host3", dict(c=5))
+ caps = zm.get_zone_capabilities(self, None)
+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
+ svc1_c=(5, 5), svc10_a=(99, 99),
+ svc10_b=(99, 99)))
+
+ caps = zm.get_zone_capabilities(self, 'svc1')
+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
+ svc1_c=(5, 5)))
+ caps = zm.get_zone_capabilities(self, 'svc10')
+ self.assertEquals(caps, dict(svc10_a=(99, 99), svc10_b=(99, 99)))
+
def test_refresh_from_db_replace_existing(self):
zm = zone_manager.ZoneManager()
zone_state = zone_manager.ZoneState()
diff --git a/nova/utils.py b/nova/utils.py
index e4d8a70eb..2f568f739 100644
--- a/nova/utils.py
+++ b/nova/utils.py
@@ -171,10 +171,6 @@ def execute(*cmd, **kwargs):
stdout=stdout,
stderr=stderr,
cmd=' '.join(cmd))
- # NOTE(termie): this appears to be necessary to let the subprocess
- # call clean something up in between calls, without
- # it two execute calls in a row hangs the second one
- greenthread.sleep(0)
return result
except ProcessExecutionError:
if not attempts:
@@ -183,6 +179,11 @@ def execute(*cmd, **kwargs):
LOG.debug(_("%r failed. Retrying."), cmd)
if delay_on_retry:
greenthread.sleep(random.randint(20, 200) / 100.0)
+ finally:
+ # NOTE(termie): this appears to be necessary to let the subprocess
+ # call clean something up in between calls, without
+ # it two execute calls in a row hangs the second one
+ greenthread.sleep(0)
def ssh_execute(ssh, cmd, process_input=None,
diff --git a/nova/virt/fake.py b/nova/virt/fake.py
index 5b0fe1877..7018f8c1b 100644
--- a/nova/virt/fake.py
+++ b/nova/virt/fake.py
@@ -344,7 +344,7 @@ class FakeConnection(driver.ComputeDriver):
Note that this function takes an instance ID, not a
compute.service.Instance, so that it can be called by compute.monitor.
"""
- return [0L, 0L, 0L, 0L, null]
+ return [0L, 0L, 0L, 0L, None]
def interface_stats(self, instance_name, iface_id):
"""
diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py
index e1a0a6f29..2bb96f819 100644
--- a/nova/virt/libvirt_conn.py
+++ b/nova/virt/libvirt_conn.py
@@ -1008,7 +1008,18 @@ class LibvirtConnection(driver.ComputeDriver):
"""
- return self._conn.getVersion()
+ # NOTE(justinsb): getVersion moved between libvirt versions
+ # Trying to do be compatible with older versions is a lost cause
+ # But ... we can at least give the user a nice message
+ method = getattr(self._conn, 'getVersion', None)
+ if method is None:
+ raise exception.Error(_("libvirt version is too old"
+ " (does not support getVersion)"))
+ # NOTE(justinsb): If we wanted to get the version, we could:
+ # method = getattr(libvirt, 'getVersion', None)
+ # NOTE(justinsb): This would then rely on a proper version check
+
+ return method()
def get_cpu_info(self):
"""Get cpuinfo information.
diff --git a/nova/volume/manager.py b/nova/volume/manager.py
index 9dea35b35..2178389ce 100644
--- a/nova/volume/manager.py
+++ b/nova/volume/manager.py
@@ -64,14 +64,15 @@ flags.DEFINE_boolean('use_local_volumes', True,
'if True, will not discover local volumes')
-class VolumeManager(manager.Manager):
+class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
def __init__(self, volume_driver=None, *args, **kwargs):
"""Load the driver from the one specified in args, or from flags."""
if not volume_driver:
volume_driver = FLAGS.volume_driver
self.driver = utils.import_object(volume_driver)
- super(VolumeManager, self).__init__(*args, **kwargs)
+ super(VolumeManager, self).__init__(service_name='volume',
+ *args, **kwargs)
# NOTE(vish): Implementation specific db handling is done
# by the driver.
self.driver.db = self.db