diff options
| author | Naveed Massjouni <naveedm9@gmail.com> | 2011-03-24 18:32:00 -0400 |
|---|---|---|
| committer | Naveed Massjouni <naveedm9@gmail.com> | 2011-03-24 18:32:00 -0400 |
| commit | 74c226c564d5357b8b09edc67cc0bdfec6b8d871 (patch) | |
| tree | e68bd07d29c3effbe7cd99c874d4eba08f74e75f | |
| parent | a6174e64b541560989c305b50787c96fb5890679 (diff) | |
| parent | f186c8ecc21cbcddf6e1e94053d6e250717852cb (diff) | |
| download | nova-74c226c564d5357b8b09edc67cc0bdfec6b8d871.tar.gz nova-74c226c564d5357b8b09edc67cc0bdfec6b8d871.tar.xz nova-74c226c564d5357b8b09edc67cc0bdfec6b8d871.zip | |
Merge from trunk
50 files changed, 1894 insertions, 332 deletions
diff --git a/bin/nova-direct-api b/bin/nova-direct-api index a2c9f1557..83ec72722 100755 --- a/bin/nova-direct-api +++ b/bin/nova-direct-api @@ -34,12 +34,14 @@ if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')): gettext.install('nova', unicode=1) +from nova import compute from nova import flags from nova import log as logging +from nova import network from nova import utils +from nova import volume from nova import wsgi from nova.api import direct -from nova.compute import api as compute_api FLAGS = flags.FLAGS @@ -50,13 +52,42 @@ flags.DEFINE_flag(flags.HelpshortFlag()) flags.DEFINE_flag(flags.HelpXMLFlag()) +# An example of an API that only exposes read-only methods. +# In this case we're just limiting which methods are exposed. +class ReadOnlyCompute(direct.Limited): + """Read-only Compute API.""" + + _allowed = ['get', 'get_all', 'get_console_output'] + + +# An example of an API that provides a backwards compatibility layer. +# In this case we're overwriting the implementation to ensure +# compatibility with an older version. In reality we would want the +# "description=None" to be part of the actual API so that code +# like this isn't even necessary, but this example shows what one can +# do if that isn't the situation. +class VolumeVersionOne(direct.Limited): + _allowed = ['create', 'delete', 'update', 'get'] + + def create(self, context, size, name): + self.proxy.create(context, size, name, description=None) + + if __name__ == '__main__': utils.default_flagfile() FLAGS(sys.argv) logging.setup() - direct.register_service('compute', compute_api.API()) + direct.register_service('compute', compute.API()) + direct.register_service('volume', volume.API()) + direct.register_service('network', network.API()) direct.register_service('reflect', direct.Reflection()) + + # Here is how we could expose the code in the examples above. + #direct.register_service('compute-readonly', + # ReadOnlyCompute(compute.API())) + #direct.register_service('volume-v1', VolumeVersionOne(volume.API())) + router = direct.Router() with_json = direct.JsonParamsMiddleware(router) with_req = direct.PostParamsMiddleware(with_json) diff --git a/bin/nova-manage b/bin/nova-manage index 69cbf6f95..cf0caf47e 100755 --- a/bin/nova-manage +++ b/bin/nova-manage @@ -97,6 +97,7 @@ flags.DECLARE('vlan_start', 'nova.network.manager') flags.DECLARE('vpn_start', 'nova.network.manager') flags.DECLARE('fixed_range_v6', 'nova.network.manager') flags.DECLARE('images_path', 'nova.image.local') +flags.DECLARE('libvirt_type', 'nova.virt.libvirt_conn') flags.DEFINE_flag(flags.HelpFlag()) flags.DEFINE_flag(flags.HelpshortFlag()) flags.DEFINE_flag(flags.HelpXMLFlag()) @@ -610,7 +611,7 @@ class ServiceCommands(object): args: [host] [service]""" ctxt = context.get_admin_context() now = datetime.datetime.utcnow() - services = db.service_get_all(ctxt) + db.service_get_all(ctxt, True) + services = db.service_get_all(ctxt) if host: services = [s for s in services if s['host'] == host] if service: @@ -59,11 +59,21 @@ USAGE = """usage: stack [options] <controller> <method> [arg1=value arg2=value] def format_help(d): """Format help text, keys are labels and values are descriptions.""" + MAX_INDENT = 30 indent = max([len(k) for k in d]) + if indent > MAX_INDENT: + indent = MAX_INDENT - 6 + out = [] for k, v in d.iteritems(): - t = textwrap.TextWrapper(initial_indent=' %s ' % k.ljust(indent), - subsequent_indent=' ' * (indent + 6)) + if (len(k) + 6) > MAX_INDENT: + out.extend([' %s' % k]) + initial_indent = ' ' * (indent + 6) + else: + initial_indent = ' %s ' % k.ljust(indent) + subsequent_indent = ' ' * (indent + 6) + t = textwrap.TextWrapper(initial_indent=initial_indent, + subsequent_indent=subsequent_indent) out.extend(t.wrap(v)) return out diff --git a/etc/api-paste.ini b/etc/api-paste.ini index 169163289..35d4a8391 100644 --- a/etc/api-paste.ini +++ b/etc/api-paste.ini @@ -74,7 +74,7 @@ use = egg:Paste#urlmap pipeline = faultwrap auth ratelimit osapiapp10 [pipeline:openstackapi11] -pipeline = faultwrap auth ratelimit osapiapp11 +pipeline = faultwrap auth ratelimit extensions osapiapp11 [filter:faultwrap] paste.filter_factory = nova.api.openstack:FaultWrapper.factory @@ -85,6 +85,9 @@ paste.filter_factory = nova.api.openstack.auth:AuthMiddleware.factory [filter:ratelimit] paste.filter_factory = nova.api.openstack.limits:RateLimitingMiddleware.factory +[filter:extensions] +paste.filter_factory = nova.api.openstack.extensions:ExtensionMiddleware.factory + [app:osapiapp10] paste.app_factory = nova.api.openstack:APIRouterV10.factory diff --git a/nova/api/direct.py b/nova/api/direct.py index dfca250e0..e5f33cee4 100644 --- a/nova/api/direct.py +++ b/nova/api/direct.py @@ -38,6 +38,7 @@ import routes import webob from nova import context +from nova import exception from nova import flags from nova import utils from nova import wsgi @@ -205,10 +206,53 @@ class ServiceWrapper(wsgi.Controller): # NOTE(vish): make sure we have no unicode keys for py2.6. params = dict([(str(k), v) for (k, v) in params.iteritems()]) result = method(context, **params) - if type(result) is dict or type(result) is list: - return self._serialize(result, req.best_match_content_type()) - else: + if result is None or type(result) is str or type(result) is unicode: return result + try: + return self._serialize(result, req.best_match_content_type()) + except: + raise exception.Error("returned non-serializable type: %s" + % result) + + +class Limited(object): + __notdoc = """Limit the available methods on a given object. + + (Not a docstring so that the docstring can be conditionally overriden.) + + Useful when defining a public API that only exposes a subset of an + internal API. + + Expected usage of this class is to define a subclass that lists the allowed + methods in the 'allowed' variable. + + Additionally where appropriate methods can be added or overwritten, for + example to provide backwards compatibility. + + The wrapping approach has been chosen so that the wrapped API can maintain + its own internal consistency, for example if it calls "self.create" it + should get its own create method rather than anything we do here. + + """ + + _allowed = None + + def __init__(self, proxy): + self._proxy = proxy + if not self.__doc__: + self.__doc__ = proxy.__doc__ + if not self._allowed: + self._allowed = [] + + def __getattr__(self, key): + """Only return methods that are named in self._allowed.""" + if key not in self._allowed: + raise AttributeError() + return getattr(self._proxy, key) + + def __dir__(self): + """Only return methods that are named in self._allowed.""" + return [x for x in dir(self._proxy) if x in self._allowed] class Proxy(object): diff --git a/nova/api/ec2/__init__.py b/nova/api/ec2/__init__.py index 20701cfa8..a3c3b25a1 100644 --- a/nova/api/ec2/__init__.py +++ b/nova/api/ec2/__init__.py @@ -61,10 +61,13 @@ class RequestLogging(wsgi.Middleware): return rv def log_request_completion(self, response, request, start): - controller = request.environ.get('ec2.controller', None) - if controller: - controller = controller.__class__.__name__ - action = request.environ.get('ec2.action', None) + apireq = request.environ.get('ec2.request', None) + if apireq: + controller = apireq.controller + action = apireq.action + else: + controller = None + action = None ctxt = request.environ.get('ec2.context', None) delta = utils.utcnow() - start seconds = delta.seconds @@ -75,7 +78,7 @@ class RequestLogging(wsgi.Middleware): microseconds, request.remote_addr, request.method, - request.path_info, + "%s%s" % (request.script_name, request.path_info), controller, action, response.status_int, diff --git a/nova/api/ec2/admin.py b/nova/api/ec2/admin.py index d8d90ad83..6a5609d4a 100644 --- a/nova/api/ec2/admin.py +++ b/nova/api/ec2/admin.py @@ -304,7 +304,7 @@ class AdminController(object): * Volume (up, down, None) * Volume Count """ - services = db.service_get_all(context) + services = db.service_get_all(context, False) now = datetime.datetime.utcnow() hosts = [] rv = [] diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py index e257e44e7..0da642318 100644 --- a/nova/api/ec2/cloud.py +++ b/nova/api/ec2/cloud.py @@ -196,7 +196,7 @@ class CloudController(object): def _describe_availability_zones(self, context, **kwargs): ctxt = context.elevated() - enabled_services = db.service_get_all(ctxt) + enabled_services = db.service_get_all(ctxt, False) disabled_services = db.service_get_all(ctxt, True) available_zones = [] for zone in [service.availability_zone for service @@ -221,7 +221,7 @@ class CloudController(object): rv = {'availabilityZoneInfo': [{'zoneName': 'nova', 'zoneState': 'available'}]} - services = db.service_get_all(context) + services = db.service_get_all(context, False) now = datetime.datetime.utcnow() hosts = [] for host in [service['host'] for service in services]: @@ -541,7 +541,7 @@ class CloudController(object): volumes = [] for ec2_id in volume_id: internal_id = ec2utils.ec2_id_to_id(ec2_id) - volume = self.volume_api.get(context, internal_id) + volume = self.volume_api.get(context, volume_id=internal_id) volumes.append(volume) else: volumes = self.volume_api.get_all(context) @@ -585,9 +585,11 @@ class CloudController(object): def create_volume(self, context, size, **kwargs): LOG.audit(_("Create volume of %s GB"), size, context=context) - volume = self.volume_api.create(context, size, - kwargs.get('display_name'), - kwargs.get('display_description')) + volume = self.volume_api.create( + context, + size=size, + name=kwargs.get('display_name'), + description=kwargs.get('display_description')) # TODO(vish): Instance should be None at db layer instead of # trying to lazy load, but for now we turn it into # a dict to avoid an error. @@ -606,7 +608,9 @@ class CloudController(object): if field in kwargs: changes[field] = kwargs[field] if changes: - self.volume_api.update(context, volume_id, kwargs) + self.volume_api.update(context, + volume_id=volume_id, + fields=changes) return True def attach_volume(self, context, volume_id, instance_id, device, **kwargs): @@ -619,7 +623,7 @@ class CloudController(object): instance_id=instance_id, volume_id=volume_id, device=device) - volume = self.volume_api.get(context, volume_id) + volume = self.volume_api.get(context, volume_id=volume_id) return {'attachTime': volume['attach_time'], 'device': volume['mountpoint'], 'instanceId': ec2utils.id_to_ec2_id(instance_id), @@ -630,7 +634,7 @@ class CloudController(object): def detach_volume(self, context, volume_id, **kwargs): volume_id = ec2utils.ec2_id_to_id(volume_id) LOG.audit(_("Detach volume %s"), volume_id, context=context) - volume = self.volume_api.get(context, volume_id) + volume = self.volume_api.get(context, volume_id=volume_id) instance = self.compute_api.detach_volume(context, volume_id=volume_id) return {'attachTime': volume['attach_time'], 'device': volume['mountpoint'], @@ -768,7 +772,7 @@ class CloudController(object): def release_address(self, context, public_ip, **kwargs): LOG.audit(_("Release address %s"), public_ip, context=context) - self.network_api.release_floating_ip(context, public_ip) + self.network_api.release_floating_ip(context, address=public_ip) return {'releaseResponse': ["Address released."]} def associate_address(self, context, instance_id, public_ip, **kwargs): @@ -782,7 +786,7 @@ class CloudController(object): def disassociate_address(self, context, public_ip, **kwargs): LOG.audit(_("Disassociate address %s"), public_ip, context=context) - self.network_api.disassociate_floating_ip(context, public_ip) + self.network_api.disassociate_floating_ip(context, address=public_ip) return {'disassociateResponse': ["Address disassociated."]} def run_instances(self, context, **kwargs): diff --git a/nova/api/openstack/__init__.py b/nova/api/openstack/__init__.py index 143b1d2b2..0531b3504 100644 --- a/nova/api/openstack/__init__.py +++ b/nova/api/openstack/__init__.py @@ -71,7 +71,7 @@ class APIRouter(wsgi.Router): """Simple paste factory, :class:`nova.wsgi.Router` doesn't have one""" return cls() - def __init__(self): + def __init__(self, ext_mgr=None): self.server_members = {} mapper = routes.Mapper() self._setup_routes(mapper) diff --git a/nova/api/openstack/accounts.py b/nova/api/openstack/accounts.py index 2510ffb61..86066fa20 100644 --- a/nova/api/openstack/accounts.py +++ b/nova/api/openstack/accounts.py @@ -14,6 +14,7 @@ # under the License. import common +import webob.exc from nova import exception from nova import flags @@ -51,10 +52,10 @@ class Controller(wsgi.Controller): raise exception.NotAuthorized(_("Not admin user.")) def index(self, req): - raise faults.Fault(exc.HTTPNotImplemented()) + raise faults.Fault(webob.exc.HTTPNotImplemented()) def detail(self, req): - raise faults.Fault(exc.HTTPNotImplemented()) + raise faults.Fault(webob.exc.HTTPNotImplemented()) def show(self, req, id): """Return data about the given account id""" @@ -69,7 +70,7 @@ class Controller(wsgi.Controller): def create(self, req): """We use update with create-or-update semantics because the id comes from an external source""" - raise faults.Fault(exc.HTTPNotImplemented()) + raise faults.Fault(webob.exc.HTTPNotImplemented()) def update(self, req, id): """This is really create or update.""" diff --git a/nova/api/openstack/extensions.py b/nova/api/openstack/extensions.py new file mode 100644 index 000000000..9d98d849a --- /dev/null +++ b/nova/api/openstack/extensions.py @@ -0,0 +1,369 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import imp +import os +import sys +import routes +import webob.dec +import webob.exc + +from nova import flags +from nova import log as logging +from nova import wsgi +from nova.api.openstack import faults + + +LOG = logging.getLogger('extensions') + + +FLAGS = flags.FLAGS + + +class ActionExtensionController(wsgi.Controller): + + def __init__(self, application): + + self.application = application + self.action_handlers = {} + + def add_action(self, action_name, handler): + self.action_handlers[action_name] = handler + + def action(self, req, id): + + input_dict = self._deserialize(req.body, req.get_content_type()) + for action_name, handler in self.action_handlers.iteritems(): + if action_name in input_dict: + return handler(input_dict, req, id) + # no action handler found (bump to downstream application) + res = self.application + return res + + +class ResponseExtensionController(wsgi.Controller): + + def __init__(self, application): + self.application = application + self.handlers = [] + + def add_handler(self, handler): + self.handlers.append(handler) + + def process(self, req, *args, **kwargs): + res = req.get_response(self.application) + content_type = req.best_match_content_type() + # currently response handlers are un-ordered + for handler in self.handlers: + res = handler(res) + try: + body = res.body + headers = res.headers + except AttributeError: + body = self._serialize(res, content_type) + headers = {"Content-Type": content_type} + res = webob.Response() + res.body = body + res.headers = headers + return res + + +class ExtensionController(wsgi.Controller): + + def __init__(self, extension_manager): + self.extension_manager = extension_manager + + def _translate(self, ext): + ext_data = {} + ext_data['name'] = ext.get_name() + ext_data['alias'] = ext.get_alias() + ext_data['description'] = ext.get_description() + ext_data['namespace'] = ext.get_namespace() + ext_data['updated'] = ext.get_updated() + ext_data['links'] = [] # TODO: implement extension links + return ext_data + + def index(self, req): + extensions = [] + for alias, ext in self.extension_manager.extensions.iteritems(): + extensions.append(self._translate(ext)) + return dict(extensions=extensions) + + def show(self, req, id): + # NOTE: the extensions alias is used as the 'id' for show + ext = self.extension_manager.extensions[id] + return self._translate(ext) + + def delete(self, req, id): + raise faults.Fault(exc.HTTPNotFound()) + + def create(self, req): + raise faults.Fault(exc.HTTPNotFound()) + + def delete(self, req, id): + raise faults.Fault(exc.HTTPNotFound()) + + +class ExtensionMiddleware(wsgi.Middleware): + """ + Extensions middleware that intercepts configured routes for extensions. + """ + @classmethod + def factory(cls, global_config, **local_config): + """ paste factory """ + def _factory(app): + return cls(app, **local_config) + return _factory + + def _action_ext_controllers(self, application, ext_mgr, mapper): + """ + Return a dict of ActionExtensionController objects by collection + """ + action_controllers = {} + for action in ext_mgr.get_actions(): + if not action.collection in action_controllers.keys(): + controller = ActionExtensionController(application) + mapper.connect("/%s/:(id)/action.:(format)" % + action.collection, + action='action', + controller=controller, + conditions=dict(method=['POST'])) + mapper.connect("/%s/:(id)/action" % action.collection, + action='action', + controller=controller, + conditions=dict(method=['POST'])) + action_controllers[action.collection] = controller + + return action_controllers + + def _response_ext_controllers(self, application, ext_mgr, mapper): + """ + Return a dict of ResponseExtensionController objects by collection + """ + response_ext_controllers = {} + for resp_ext in ext_mgr.get_response_extensions(): + if not resp_ext.key in response_ext_controllers.keys(): + controller = ResponseExtensionController(application) + mapper.connect(resp_ext.url_route + '.:(format)', + action='process', + controller=controller, + conditions=resp_ext.conditions) + + mapper.connect(resp_ext.url_route, + action='process', + controller=controller, + conditions=resp_ext.conditions) + response_ext_controllers[resp_ext.key] = controller + + return response_ext_controllers + + def __init__(self, application, ext_mgr=None): + + if ext_mgr is None: + ext_mgr = ExtensionManager(FLAGS.osapi_extensions_path) + self.ext_mgr = ext_mgr + + mapper = routes.Mapper() + + # extended resources + for resource in ext_mgr.get_resources(): + LOG.debug(_('Extended resource: %s'), + resource.collection) + mapper.resource(resource.collection, resource.collection, + controller=resource.controller, + collection=resource.collection_actions, + member=resource.member_actions, + parent_resource=resource.parent) + + # extended actions + action_controllers = self._action_ext_controllers(application, ext_mgr, + mapper) + for action in ext_mgr.get_actions(): + LOG.debug(_('Extended action: %s'), action.action_name) + controller = action_controllers[action.collection] + controller.add_action(action.action_name, action.handler) + + # extended responses + resp_controllers = self._response_ext_controllers(application, ext_mgr, + mapper) + for response_ext in ext_mgr.get_response_extensions(): + LOG.debug(_('Extended response: %s'), response_ext.key) + controller = resp_controllers[response_ext.key] + controller.add_handler(response_ext.handler) + + self._router = routes.middleware.RoutesMiddleware(self._dispatch, + mapper) + + super(ExtensionMiddleware, self).__init__(application) + + @webob.dec.wsgify(RequestClass=wsgi.Request) + def __call__(self, req): + """ + Route the incoming request with router. + """ + req.environ['extended.app'] = self.application + return self._router + + @staticmethod + @webob.dec.wsgify(RequestClass=wsgi.Request) + def _dispatch(req): + """ + Returns the routed WSGI app's response or defers to the extended + application. + """ + match = req.environ['wsgiorg.routing_args'][1] + if not match: + return req.environ['extended.app'] + app = match['controller'] + return app + + +class ExtensionManager(object): + """ + Load extensions from the configured extension path. + See nova/tests/api/openstack/extensions/foxinsocks.py for an example + extension implementation. + """ + + def __init__(self, path): + LOG.audit(_('Initializing extension manager.')) + + self.path = path + self.extensions = {} + self._load_extensions() + + def get_resources(self): + """ + returns a list of ResourceExtension objects + """ + resources = [] + resources.append(ResourceExtension('extensions', + ExtensionController(self))) + for alias, ext in self.extensions.iteritems(): + try: + resources.extend(ext.get_resources()) + except AttributeError: + # NOTE: Extension aren't required to have resource extensions + pass + return resources + + def get_actions(self): + """ + returns a list of ActionExtension objects + """ + actions = [] + for alias, ext in self.extensions.iteritems(): + try: + actions.extend(ext.get_actions()) + except AttributeError: + # NOTE: Extension aren't required to have action extensions + pass + return actions + + def get_response_extensions(self): + """ + returns a list of ResponseExtension objects + """ + response_exts = [] + for alias, ext in self.extensions.iteritems(): + try: + response_exts.extend(ext.get_response_extensions()) + except AttributeError: + # NOTE: Extension aren't required to have response extensions + pass + return response_exts + + def _check_extension(self, extension): + """ + Checks for required methods in extension objects. + """ + try: + LOG.debug(_('Ext name: %s'), extension.get_name()) + LOG.debug(_('Ext alias: %s'), extension.get_alias()) + LOG.debug(_('Ext description: %s'), extension.get_description()) + LOG.debug(_('Ext namespace: %s'), extension.get_namespace()) + LOG.debug(_('Ext updated: %s'), extension.get_updated()) + except AttributeError as ex: + LOG.exception(_("Exception loading extension: %s"), unicode(ex)) + + def _load_extensions(self): + """ + Load extensions from the configured path. The extension name is + constructed from the module_name. If your extension module was named + widgets.py the extension class within that module should be + 'Widgets'. + + See nova/tests/api/openstack/extensions/foxinsocks.py for an example + extension implementation. + """ + if not os.path.exists(self.path): + return + + for f in os.listdir(self.path): + LOG.audit(_('Loading extension file: %s'), f) + mod_name, file_ext = os.path.splitext(os.path.split(f)[-1]) + ext_path = os.path.join(self.path, f) + if file_ext.lower() == '.py': + mod = imp.load_source(mod_name, ext_path) + ext_name = mod_name[0].upper() + mod_name[1:] + try: + new_ext = getattr(mod, ext_name)() + self._check_extension(new_ext) + self.extensions[new_ext.get_alias()] = new_ext + except AttributeError as ex: + LOG.exception(_("Exception loading extension: %s"), + unicode(ex)) + + +class ResponseExtension(object): + """ + ResponseExtension objects can be used to add data to responses from + core nova OpenStack API controllers. + """ + + def __init__(self, method, url_route, handler): + self.url_route = url_route + self.handler = handler + self.conditions = dict(method=[method]) + self.key = "%s-%s" % (method, url_route) + + +class ActionExtension(object): + """ + ActionExtension objects can be used to add custom actions to core nova + nova OpenStack API controllers. + """ + + def __init__(self, collection, action_name, handler): + self.collection = collection + self.action_name = action_name + self.handler = handler + + +class ResourceExtension(object): + """ + ResourceExtension objects can be used to add top level resources + to the OpenStack API in nova. + """ + + def __init__(self, collection, controller, parent=None, + collection_actions={}, member_actions={}): + self.collection = collection + self.controller = controller + self.parent = parent + self.collection_actions = collection_actions + self.member_actions = member_actions diff --git a/nova/api/openstack/servers.py b/nova/api/openstack/servers.py index 740a9bb57..144d14536 100644 --- a/nova/api/openstack/servers.py +++ b/nova/api/openstack/servers.py @@ -15,19 +15,19 @@ import base64 import hashlib -import json import traceback -from xml.dom import minidom from webob import exc +from xml.dom import minidom from nova import compute from nova import context from nova import exception from nova import flags from nova import log as logging -from nova import wsgi +from nova import quota from nova import utils +from nova import wsgi from nova.api.openstack import common from nova.api.openstack import faults import nova.api.openstack.views.addresses @@ -36,8 +36,8 @@ import nova.api.openstack.views.servers from nova.auth import manager as auth_manager from nova.compute import instance_types from nova.compute import power_state -from nova.quota import QuotaError import nova.api.openstack +from nova.scheduler import api as scheduler_api LOG = logging.getLogger('server') @@ -88,15 +88,18 @@ class Controller(wsgi.Controller): for inst in limited_list] return dict(servers=servers) + @scheduler_api.redirect_handler def show(self, req, id): """ Returns server details by server id """ try: - instance = self.compute_api.get(req.environ['nova.context'], id) + instance = self.compute_api.routing_get( + req.environ['nova.context'], id) builder = self._get_view_builder(req) return builder.build(instance, is_detail=True) except exception.NotFound: return faults.Fault(exc.HTTPNotFound()) + @scheduler_api.redirect_handler def delete(self, req, id): """ Destroys a server """ try: @@ -156,8 +159,8 @@ class Controller(wsgi.Controller): key_data=key_data, metadata=metadata, injected_files=injected_files) - except QuotaError as error: - self._handle_quota_errors(error) + except quota.QuotaError as error: + self._handle_quota_error(error) inst['instance_type'] = flavor_id inst['image_id'] = requested_image_id @@ -211,7 +214,7 @@ class Controller(wsgi.Controller): injected_files.append((path, contents)) return injected_files - def _handle_quota_errors(self, error): + def _handle_quota_error(self, error): """ Reraise quota errors as api-specific http exceptions """ @@ -227,6 +230,7 @@ class Controller(wsgi.Controller): # if the original error is okay, just reraise it raise error + @scheduler_api.redirect_handler def update(self, req, id): """ Updates the server name or password """ if len(req.body) == 0: @@ -242,7 +246,7 @@ class Controller(wsgi.Controller): update_dict['admin_pass'] = inst_dict['server']['adminPass'] try: self.compute_api.set_admin_password(ctxt, id) - except exception.TimeoutException, e: + except exception.TimeoutException: return exc.HTTPRequestTimeout() if 'name' in inst_dict['server']: update_dict['display_name'] = inst_dict['server']['name'] @@ -252,6 +256,7 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPNotFound()) return exc.HTTPNoContent() + @scheduler_api.redirect_handler def action(self, req, id): """Multi-purpose method used to reboot, rebuild, or resize a server""" @@ -317,6 +322,7 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPUnprocessableEntity()) return exc.HTTPAccepted() + @scheduler_api.redirect_handler def lock(self, req, id): """ lock the instance with id @@ -332,6 +338,7 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPUnprocessableEntity()) return exc.HTTPAccepted() + @scheduler_api.redirect_handler def unlock(self, req, id): """ unlock the instance with id @@ -347,6 +354,7 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPUnprocessableEntity()) return exc.HTTPAccepted() + @scheduler_api.redirect_handler def get_lock(self, req, id): """ return the boolean state of (instance with id)'s lock @@ -361,6 +369,7 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPUnprocessableEntity()) return exc.HTTPAccepted() + @scheduler_api.redirect_handler def reset_network(self, req, id): """ Reset networking on an instance (admin only). @@ -375,6 +384,7 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPUnprocessableEntity()) return exc.HTTPAccepted() + @scheduler_api.redirect_handler def inject_network_info(self, req, id): """ Inject network info for an instance (admin only). @@ -389,6 +399,7 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPUnprocessableEntity()) return exc.HTTPAccepted() + @scheduler_api.redirect_handler def pause(self, req, id): """ Permit Admins to Pause the server. """ ctxt = req.environ['nova.context'] @@ -400,6 +411,7 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPUnprocessableEntity()) return exc.HTTPAccepted() + @scheduler_api.redirect_handler def unpause(self, req, id): """ Permit Admins to Unpause the server. """ ctxt = req.environ['nova.context'] @@ -411,6 +423,7 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPUnprocessableEntity()) return exc.HTTPAccepted() + @scheduler_api.redirect_handler def suspend(self, req, id): """permit admins to suspend the server""" context = req.environ['nova.context'] @@ -422,6 +435,7 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPUnprocessableEntity()) return exc.HTTPAccepted() + @scheduler_api.redirect_handler def resume(self, req, id): """permit admins to resume the server from suspend""" context = req.environ['nova.context'] @@ -433,6 +447,7 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPUnprocessableEntity()) return exc.HTTPAccepted() + @scheduler_api.redirect_handler def rescue(self, req, id): """Permit users to rescue the server.""" context = req.environ["nova.context"] @@ -444,6 +459,7 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPUnprocessableEntity()) return exc.HTTPAccepted() + @scheduler_api.redirect_handler def unrescue(self, req, id): """Permit users to unrescue the server.""" context = req.environ["nova.context"] @@ -455,6 +471,7 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPUnprocessableEntity()) return exc.HTTPAccepted() + @scheduler_api.redirect_handler def get_ajax_console(self, req, id): """ Returns a url to an instance's ajaxterm console. """ try: @@ -464,6 +481,7 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPNotFound()) return exc.HTTPAccepted() + @scheduler_api.redirect_handler def diagnostics(self, req, id): """Permit Admins to retrieve server diagnostics.""" ctxt = req.environ["nova.context"] diff --git a/nova/api/openstack/zones.py b/nova/api/openstack/zones.py index 8fe84275a..846cb48a1 100644 --- a/nova/api/openstack/zones.py +++ b/nova/api/openstack/zones.py @@ -15,9 +15,10 @@ import common +from nova import db from nova import flags +from nova import log as logging from nova import wsgi -from nova import db from nova.scheduler import api @@ -38,7 +39,8 @@ def _exclude_keys(item, keys): def _scrub_zone(zone): - return _filter_keys(zone, ('id', 'api_url')) + return _exclude_keys(zone, ('username', 'password', 'created_at', + 'deleted', 'deleted_at', 'updated_at')) class Controller(wsgi.Controller): @@ -52,13 +54,9 @@ class Controller(wsgi.Controller): """Return all zones in brief""" # Ask the ZoneManager in the Scheduler for most recent data, # or fall-back to the database ... - items = api.API().get_zone_list(req.environ['nova.context']) - if not items: - items = db.zone_get_all(req.environ['nova.context']) - + items = api.get_zone_list(req.environ['nova.context']) items = common.limited(items, req) - items = [_exclude_keys(item, ['username', 'password']) - for item in items] + items = [_scrub_zone(item) for item in items] return dict(zones=items) def detail(self, req): @@ -67,29 +65,37 @@ class Controller(wsgi.Controller): def info(self, req): """Return name and capabilities for this zone.""" - return dict(zone=dict(name=FLAGS.zone_name, - capabilities=FLAGS.zone_capabilities)) + items = api.get_zone_capabilities(req.environ['nova.context']) + + zone = dict(name=FLAGS.zone_name) + caps = FLAGS.zone_capabilities + for cap in caps: + key, value = cap.split('=') + zone[key] = value + for item, (min_value, max_value) in items.iteritems(): + zone[item] = "%s,%s" % (min_value, max_value) + return dict(zone=zone) def show(self, req, id): """Return data about the given zone id""" zone_id = int(id) - zone = db.zone_get(req.environ['nova.context'], zone_id) + zone = api.zone_get(req.environ['nova.context'], zone_id) return dict(zone=_scrub_zone(zone)) def delete(self, req, id): zone_id = int(id) - db.zone_delete(req.environ['nova.context'], zone_id) + api.zone_delete(req.environ['nova.context'], zone_id) return {} def create(self, req): context = req.environ['nova.context'] env = self._deserialize(req.body, req.get_content_type()) - zone = db.zone_create(context, env["zone"]) + zone = api.zone_create(context, env["zone"]) return dict(zone=_scrub_zone(zone)) def update(self, req, id): context = req.environ['nova.context'] env = self._deserialize(req.body, req.get_content_type()) zone_id = int(id) - zone = db.zone_update(context, zone_id, env["zone"]) + zone = api.zone_update(context, zone_id, env["zone"]) return dict(zone=_scrub_zone(zone)) diff --git a/nova/compute/api.py b/nova/compute/api.py index 01eead4ac..f4aab97de 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -34,6 +34,7 @@ from nova import rpc from nova import utils from nova import volume from nova.compute import instance_types +from nova.scheduler import api as scheduler_api from nova.db import base FLAGS = flags.FLAGS @@ -352,6 +353,7 @@ class API(base.Base): rv = self.db.instance_update(context, instance_id, kwargs) return dict(rv.iteritems()) + @scheduler_api.reroute_compute("delete") def delete(self, context, instance_id): LOG.debug(_("Going to try to terminate %s"), instance_id) try: @@ -384,6 +386,13 @@ class API(base.Base): rv = self.db.instance_get(context, instance_id) return dict(rv.iteritems()) + @scheduler_api.reroute_compute("get") + def routing_get(self, context, instance_id): + """Use this method instead of get() if this is the only + operation you intend to to. It will route to novaclient.get + if the instance is not found.""" + return self.get(context, instance_id) + def get_all(self, context, project_id=None, reservation_id=None, fixed_ip=None): """Get all instances, possibly filtered by one of the @@ -527,14 +536,17 @@ class API(base.Base): "instance_id": instance_id, "flavor_id": flavor_id}}) + @scheduler_api.reroute_compute("pause") def pause(self, context, instance_id): """Pause the given instance.""" self._cast_compute_message('pause_instance', context, instance_id) + @scheduler_api.reroute_compute("unpause") def unpause(self, context, instance_id): """Unpause the given instance.""" self._cast_compute_message('unpause_instance', context, instance_id) + @scheduler_api.reroute_compute("diagnostics") def get_diagnostics(self, context, instance_id): """Retrieve diagnostics for the given instance.""" return self._call_compute_message( @@ -546,18 +558,22 @@ class API(base.Base): """Retrieve actions for the given instance.""" return self.db.instance_get_actions(context, instance_id) + @scheduler_api.reroute_compute("suspend") def suspend(self, context, instance_id): """suspend the instance with instance_id""" self._cast_compute_message('suspend_instance', context, instance_id) + @scheduler_api.reroute_compute("resume") def resume(self, context, instance_id): """resume the instance with instance_id""" self._cast_compute_message('resume_instance', context, instance_id) + @scheduler_api.reroute_compute("rescue") def rescue(self, context, instance_id): """Rescue the given instance.""" self._cast_compute_message('rescue_instance', context, instance_id) + @scheduler_api.reroute_compute("unrescue") def unrescue(self, context, instance_id): """Unrescue the given instance.""" self._cast_compute_message('unrescue_instance', context, instance_id) @@ -573,7 +589,6 @@ class API(base.Base): def get_ajax_console(self, context, instance_id): """Get a url to an AJAX Console""" - instance = self.get(context, instance_id) output = self._call_compute_message('get_ajax_console', context, instance_id) @@ -621,7 +636,7 @@ class API(base.Base): if not re.match("^/dev/[a-z]d[a-z]+$", device): raise exception.ApiError(_("Invalid device specified: %s. " "Example device: /dev/vdb") % device) - self.volume_api.check_attach(context, volume_id) + self.volume_api.check_attach(context, volume_id=volume_id) instance = self.get(context, instance_id) host = instance['host'] rpc.cast(context, @@ -635,7 +650,7 @@ class API(base.Base): instance = self.db.volume_get_instance(context.elevated(), volume_id) if not instance: raise exception.ApiError(_("Volume isn't attached to anything!")) - self.volume_api.check_detach(context, volume_id) + self.volume_api.check_detach(context, volume_id=volume_id) host = instance['host'] rpc.cast(context, self.db.queue_get_for(context, FLAGS.compute_topic, host), @@ -646,5 +661,6 @@ class API(base.Base): def associate_floating_ip(self, context, instance_id, address): instance = self.get(context, instance_id) - self.network_api.associate_floating_ip(context, address, - instance['fixed_ip']) + self.network_api.associate_floating_ip(context, + floating_ip=address, + fixed_ip=instance['fixed_ip']) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 7316d1510..468771f46 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -111,7 +111,7 @@ def checks_instance_lock(function): return decorated_function -class ComputeManager(manager.Manager): +class ComputeManager(manager.SchedulerDependentManager): """Manages the running instances from creation to destruction.""" @@ -132,7 +132,8 @@ class ComputeManager(manager.Manager): self.network_manager = utils.import_object(FLAGS.network_manager) self.volume_manager = utils.import_object(FLAGS.volume_manager) - super(ComputeManager, self).__init__(*args, **kwargs) + super(ComputeManager, self).__init__(service_name="compute", + *args, **kwargs) def init_host(self): """Do any initialization that needs to be run if this is a diff --git a/nova/db/api.py b/nova/db/api.py index afc1bff2f..91e7cbf31 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -71,6 +71,7 @@ class NoMoreTargets(exception.Error): """No more available blades""" pass + ################### @@ -89,7 +90,7 @@ def service_get_by_host_and_topic(context, host, topic): return IMPL.service_get_by_host_and_topic(context, host, topic) -def service_get_all(context, disabled=False): +def service_get_all(context, disabled=None): """Get all services.""" return IMPL.service_get_all(context, disabled) diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index d7b5aff46..bfceaa94c 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -143,12 +143,15 @@ def service_get(context, service_id, session=None): @require_admin_context -def service_get_all(context, disabled=False): +def service_get_all(context, disabled=None): session = get_session() - return session.query(models.Service).\ - filter_by(deleted=can_read_deleted(context)).\ - filter_by(disabled=disabled).\ - all() + query = session.query(models.Service).\ + filter_by(deleted=can_read_deleted(context)) + + if disabled is not None: + query = query.filter_by(disabled=disabled) + + return query.all() @require_admin_context @@ -2209,7 +2212,7 @@ def migration_get(context, id, session=None): filter_by(id=id).first() if not result: raise exception.NotFound(_("No migration found with id %s") - % migration_id) + % id) return result @@ -2432,6 +2435,7 @@ def zone_create(context, values): @require_admin_context def zone_update(context, zone_id, values): + session = get_session() zone = session.query(models.Zone).filter_by(id=zone_id).first() if not zone: raise exception.NotFound(_("No zone with id %(zone_id)s") % locals()) diff --git a/nova/flags.py b/nova/flags.py index d1817dc3b..f011ab383 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -298,6 +298,8 @@ DEFINE_string('ec2_dmz_host', '$my_ip', 'internal ip of api server') DEFINE_integer('ec2_port', 8773, 'cloud controller port') DEFINE_string('ec2_scheme', 'http', 'prefix for ec2') DEFINE_string('ec2_path', '/services/Cloud', 'suffix for ec2') +DEFINE_string('osapi_extensions_path', '/var/lib/nova/extensions', + 'default directory for nova extensions') DEFINE_string('osapi_host', '$my_ip', 'ip of api server') DEFINE_string('osapi_scheme', 'http', 'prefix for openstack') DEFINE_integer('osapi_port', 8774, 'OpenStack API port') @@ -360,5 +362,6 @@ DEFINE_string('node_availability_zone', 'nova', 'availability zone of this node') DEFINE_string('zone_name', 'nova', 'name of this zone') -DEFINE_string('zone_capabilities', 'kypervisor:xenserver;os:linux', - 'Key/Value tags which represent capabilities of this zone') +DEFINE_list('zone_capabilities', + ['hypervisor=xenserver;kvm', 'os=linux;windows'], + 'Key/Multi-value list representng capabilities of this zone') diff --git a/nova/image/glance.py b/nova/image/glance.py index 171b28fde..9984a3ba1 100644 --- a/nova/image/glance.py +++ b/nova/image/glance.py @@ -73,7 +73,7 @@ class GlanceImageService(service.BaseImageService): Returns image with known timestamp fields converted to datetime objects """ for attr in ['created_at', 'updated_at', 'deleted_at']: - if image.get(attr) is not None: + if image.get(attr): image[attr] = self._parse_glance_iso8601_timestamp(image[attr]) return image diff --git a/nova/manager.py b/nova/manager.py index 3d38504bd..804a50479 100644 --- a/nova/manager.py +++ b/nova/manager.py @@ -53,11 +53,14 @@ This module provides Manager, a base class for managers. from nova import utils from nova import flags +from nova import log as logging from nova.db import base - +from nova.scheduler import api FLAGS = flags.FLAGS +LOG = logging.getLogger('nova.manager') + class Manager(base.Base): def __init__(self, host=None, db_driver=None): @@ -74,3 +77,29 @@ class Manager(base.Base): """Do any initialization that needs to be run if this is a standalone service. Child classes should override this method.""" pass + + +class SchedulerDependentManager(Manager): + """Periodically send capability updates to the Scheduler services. + Services that need to update the Scheduler of their capabilities + should derive from this class. Otherwise they can derive from + manager.Manager directly. Updates are only sent after + update_service_capabilities is called with non-None values.""" + + def __init__(self, host=None, db_driver=None, service_name="undefined"): + self.last_capabilities = None + self.service_name = service_name + super(SchedulerDependentManager, self).__init__(host, db_driver) + + def update_service_capabilities(self, capabilities): + """Remember these capabilities to send on next periodic update.""" + self.last_capabilities = capabilities + + def periodic_tasks(self, context=None): + """Pass data back to the scheduler at a periodic interval""" + if self.last_capabilities: + LOG.debug(_("Notifying Schedulers of capabilities ...")) + api.update_service_capabilities(context, self.service_name, + self.host, self.last_capabilities) + + super(SchedulerDependentManager, self).periodic_tasks(context) diff --git a/nova/network/linux_net.py b/nova/network/linux_net.py index 796d6ba31..06b05366a 100644 --- a/nova/network/linux_net.py +++ b/nova/network/linux_net.py @@ -1,3 +1,5 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -210,10 +212,7 @@ class IptablesManager(object): """ def __init__(self, execute=None): if not execute: - if FLAGS.fake_network: - self.execute = lambda *args, **kwargs: ('', '') - else: - self.execute = utils.execute + self.execute = _execute else: self.execute = execute @@ -352,9 +351,6 @@ class IptablesManager(object): return new_filter -iptables_manager = IptablesManager() - - def metadata_forward(): """Create forwarding rule for metadata""" iptables_manager.ipv4['nat'].add_rule("PREROUTING", @@ -767,3 +763,6 @@ def _ip_bridge_cmd(action, params, device): cmd.extend(params) cmd.extend(['dev', device]) return cmd + + +iptables_manager = IptablesManager() diff --git a/nova/network/manager.py b/nova/network/manager.py index 34fc042e4..d994f7dc8 100644 --- a/nova/network/manager.py +++ b/nova/network/manager.py @@ -105,7 +105,7 @@ class AddressAlreadyAllocated(exception.Error): pass -class NetworkManager(manager.Manager): +class NetworkManager(manager.SchedulerDependentManager): """Implements common network manager functionality. This class must be subclassed to support specific topologies. @@ -116,7 +116,8 @@ class NetworkManager(manager.Manager): if not network_driver: network_driver = FLAGS.network_driver self.driver = utils.import_object(network_driver) - super(NetworkManager, self).__init__(*args, **kwargs) + super(NetworkManager, self).__init__(service_name='network', + *args, **kwargs) def init_host(self): """Do any initialization that needs to be run if this is a diff --git a/nova/rpc.py b/nova/rpc.py index 5935e1fb3..388f78d69 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -137,24 +137,7 @@ class Consumer(messaging.Consumer): return timer -class Publisher(messaging.Publisher): - """Publisher base class""" - pass - - -class TopicConsumer(Consumer): - """Consumes messages on a specific topic""" - exchange_type = "topic" - - def __init__(self, connection=None, topic="broadcast"): - self.queue = topic - self.routing_key = topic - self.exchange = FLAGS.control_exchange - self.durable = False - super(TopicConsumer, self).__init__(connection=connection) - - -class AdapterConsumer(TopicConsumer): +class AdapterConsumer(Consumer): """Calls methods on a proxy object based on method and args""" def __init__(self, connection=None, topic="broadcast", proxy=None): LOG.debug(_('Initing the Adapter Consumer for %s') % topic) @@ -207,6 +190,41 @@ class AdapterConsumer(TopicConsumer): return +class Publisher(messaging.Publisher): + """Publisher base class""" + pass + + +class TopicAdapterConsumer(AdapterConsumer): + """Consumes messages on a specific topic""" + exchange_type = "topic" + + def __init__(self, connection=None, topic="broadcast", proxy=None): + self.queue = topic + self.routing_key = topic + self.exchange = FLAGS.control_exchange + self.durable = False + super(TopicAdapterConsumer, self).__init__(connection=connection, + topic=topic, proxy=proxy) + + +class FanoutAdapterConsumer(AdapterConsumer): + """Consumes messages from a fanout exchange""" + exchange_type = "fanout" + + def __init__(self, connection=None, topic="broadcast", proxy=None): + self.exchange = "%s_fanout" % topic + self.routing_key = topic + unique = uuid.uuid4().hex + self.queue = "%s_fanout_%s" % (topic, unique) + self.durable = False + LOG.info(_("Created '%(exchange)s' fanout exchange " + "with '%(key)s' routing key"), + dict(exchange=self.exchange, key=self.routing_key)) + super(FanoutAdapterConsumer, self).__init__(connection=connection, + topic=topic, proxy=proxy) + + class TopicPublisher(Publisher): """Publishes messages on a specific topic""" exchange_type = "topic" @@ -218,6 +236,19 @@ class TopicPublisher(Publisher): super(TopicPublisher, self).__init__(connection=connection) +class FanoutPublisher(Publisher): + """Publishes messages to a fanout exchange.""" + exchange_type = "fanout" + + def __init__(self, topic, connection=None): + self.exchange = "%s_fanout" % topic + self.queue = "%s_fanout" % topic + self.durable = False + LOG.info(_("Creating '%(exchange)s' fanout exchange"), + dict(exchange=self.exchange)) + super(FanoutPublisher, self).__init__(connection=connection) + + class DirectConsumer(Consumer): """Consumes messages directly on a channel specified by msg_id""" exchange_type = "direct" @@ -360,6 +391,16 @@ def cast(context, topic, msg): publisher.close() +def fanout_cast(context, topic, msg): + """Sends a message on a fanout exchange without waiting for a response""" + LOG.debug(_("Making asynchronous fanout cast...")) + _pack_context(msg, context) + conn = Connection.instance() + publisher = FanoutPublisher(topic, connection=conn) + publisher.send(msg) + publisher.close() + + def generic_response(message_data, message): """Logs a result and exits""" LOG.debug(_('response %s'), message_data) diff --git a/nova/scheduler/api.py b/nova/scheduler/api.py index 2405f1343..6bb3bf3cd 100644 --- a/nova/scheduler/api.py +++ b/nova/scheduler/api.py @@ -17,33 +17,225 @@ Handles all requests relating to schedulers. """ +import novaclient + +from nova import db +from nova import exception from nova import flags from nova import log as logging from nova import rpc +from eventlet import greenpool + FLAGS = flags.FLAGS +flags.DEFINE_bool('enable_zone_routing', + False, + 'When True, routing to child zones will occur.') + LOG = logging.getLogger('nova.scheduler.api') -class API(object): - """API for interacting with the scheduler.""" +def _call_scheduler(method, context, params=None): + """Generic handler for RPC calls to the scheduler. + + :param params: Optional dictionary of arguments to be passed to the + scheduler worker + + :retval: Result returned by scheduler worker + """ + if not params: + params = {} + queue = FLAGS.scheduler_topic + kwargs = {'method': method, 'args': params} + return rpc.call(context, queue, kwargs) + + +def get_zone_list(context): + """Return a list of zones assoicated with this zone.""" + items = _call_scheduler('get_zone_list', context) + for item in items: + item['api_url'] = item['api_url'].replace('\\/', '/') + if not items: + items = db.zone_get_all(context) + return items + + +def zone_get(context, zone_id): + return db.zone_get(context, zone_id) + + +def zone_delete(context, zone_id): + return db.zone_delete(context, zone_id) + + +def zone_create(context, data): + return db.zone_create(context, data) + + +def zone_update(context, zone_id, data): + return db.zone_update(context, zone_id, data) + + +def get_zone_capabilities(context, service=None): + """Returns a dict of key, value capabilities for this zone, + or for a particular class of services running in this zone.""" + return _call_scheduler('get_zone_capabilities', context=context, + params=dict(service=service)) + + +def update_service_capabilities(context, service_name, host, capabilities): + """Send an update to all the scheduler services informing them + of the capabilities of this service.""" + kwargs = dict(method='update_service_capabilities', + args=dict(service_name=service_name, host=host, + capabilities=capabilities)) + return rpc.fanout_cast(context, 'scheduler', kwargs) + + +def _wrap_method(function, self): + """Wrap method to supply self.""" + def _wrap(*args, **kwargs): + return function(self, *args, **kwargs) + return _wrap + + +def _process(func, zone): + """Worker stub for green thread pool. Give the worker + an authenticated nova client and zone info.""" + nova = novaclient.OpenStack(zone.username, zone.password, zone.api_url) + nova.authenticate() + return func(nova, zone) + + +def child_zone_helper(zone_list, func): + """Fire off a command to each zone in the list. + The return is [novaclient return objects] from each child zone. + For example, if you are calling server.pause(), the list will + be whatever the response from server.pause() is. One entry + per child zone called.""" + green_pool = greenpool.GreenPool() + return [result for result in green_pool.imap( + _wrap_method(_process, func), zone_list)] + + +def _issue_novaclient_command(nova, zone, collection, method_name, item_id): + """Use novaclient to issue command to a single child zone. + One of these will be run in parallel for each child zone.""" + manager = getattr(nova, collection) + result = None + try: + try: + result = manager.get(int(item_id)) + except ValueError, e: + result = manager.find(name=item_id) + except novaclient.NotFound: + url = zone.api_url + LOG.debug(_("%(collection)s '%(item_id)s' not found on '%(url)s'" % + locals())) + return None + + if method_name.lower() not in ['get', 'find']: + result = getattr(result, method_name)() + return result + + +def wrap_novaclient_function(f, collection, method_name, item_id): + """Appends collection, method_name and item_id to the incoming + (nova, zone) call from child_zone_helper.""" + def inner(nova, zone): + return f(nova, zone, collection, method_name, item_id) + + return inner + + +class RedirectResult(exception.Error): + """Used to the HTTP API know that these results are pre-cooked + and they can be returned to the caller directly.""" + def __init__(self, results): + self.results = results + super(RedirectResult, self).__init__( + message=_("Uncaught Zone redirection exception")) + + +class reroute_compute(object): + """Decorator used to indicate that the method should + delegate the call the child zones if the db query + can't find anything.""" + def __init__(self, method_name): + self.method_name = method_name + + def __call__(self, f): + def wrapped_f(*args, **kwargs): + collection, context, item_id = \ + self.get_collection_context_and_id(args, kwargs) + try: + # Call the original function ... + return f(*args, **kwargs) + except exception.InstanceNotFound, e: + LOG.debug(_("Instance %(item_id)s not found " + "locally: '%(e)s'" % locals())) + + if not FLAGS.enable_zone_routing: + raise + + zones = db.zone_get_all(context) + if not zones: + raise + + # Ask the children to provide an answer ... + LOG.debug(_("Asking child zones ...")) + result = self._call_child_zones(zones, + wrap_novaclient_function(_issue_novaclient_command, + collection, self.method_name, item_id)) + # Scrub the results and raise another exception + # so the API layers can bail out gracefully ... + raise RedirectResult(self.unmarshall_result(result)) + return wrapped_f + + def _call_child_zones(self, zones, function): + """Ask the child zones to perform this operation. + Broken out for testing.""" + return child_zone_helper(zones, function) + + def get_collection_context_and_id(self, args, kwargs): + """Returns a tuple of (novaclient collection name, security + context and resource id. Derived class should override this.""" + context = kwargs.get('context', None) + instance_id = kwargs.get('instance_id', None) + if len(args) > 0 and not context: + context = args[1] + if len(args) > 1 and not instance_id: + instance_id = args[2] + return ("servers", context, instance_id) + + def unmarshall_result(self, zone_responses): + """Result is a list of responses from each child zone. + Each decorator derivation is responsible to turning this + into a format expected by the calling method. For + example, this one is expected to return a single Server + dict {'server':{k:v}}. Others may return a list of them, like + {'servers':[{k,v}]}""" + reduced_response = [] + for zone_response in zone_responses: + if not zone_response: + continue + + server = zone_response.__dict__ - def _call_scheduler(self, method, context, params=None): - """Generic handler for RPC calls to the scheduler. + for k in server.keys(): + if k[0] == '_' or k == 'manager': + del server[k] - :param params: Optional dictionary of arguments to be passed to the - scheduler worker + reduced_response.append(dict(server=server)) + if reduced_response: + return reduced_response[0] # first for now. + return {} - :retval: Result returned by scheduler worker - """ - if not params: - params = {} - queue = FLAGS.scheduler_topic - kwargs = {'method': method, 'args': params} - return rpc.call(context, queue, kwargs) - def get_zone_list(self, context): - items = self._call_scheduler('get_zone_list', context) - for item in items: - item['api_url'] = item['api_url'].replace('\\/', '/') - return items +def redirect_handler(f): + def new_f(*args, **kwargs): + try: + return f(*args, **kwargs) + except RedirectResult, e: + return e.results + return new_f diff --git a/nova/scheduler/driver.py b/nova/scheduler/driver.py index ed3dfe1c0..ce05d9f6a 100644 --- a/nova/scheduler/driver.py +++ b/nova/scheduler/driver.py @@ -49,6 +49,13 @@ class WillNotSchedule(exception.Error): class Scheduler(object): """The base class that all Scheduler clases should inherit from.""" + def __init__(self): + self.zone_manager = None + + def set_zone_manager(self, zone_manager): + """Called by the Scheduler Service to supply a ZoneManager.""" + self.zone_manager = zone_manager + @staticmethod def service_is_up(service): """Check whether a service is up based on last heartbeat.""" diff --git a/nova/scheduler/manager.py b/nova/scheduler/manager.py index 053a53356..7d62cfc4e 100644 --- a/nova/scheduler/manager.py +++ b/nova/scheduler/manager.py @@ -41,10 +41,11 @@ flags.DEFINE_string('scheduler_driver', class SchedulerManager(manager.Manager): """Chooses a host to run instances on.""" def __init__(self, scheduler_driver=None, *args, **kwargs): + self.zone_manager = zone_manager.ZoneManager() if not scheduler_driver: scheduler_driver = FLAGS.scheduler_driver self.driver = utils.import_object(scheduler_driver) - self.zone_manager = zone_manager.ZoneManager() + self.driver.set_zone_manager(self.zone_manager) super(SchedulerManager, self).__init__(*args, **kwargs) def __getattr__(self, key): @@ -59,6 +60,17 @@ class SchedulerManager(manager.Manager): """Get a list of zones from the ZoneManager.""" return self.zone_manager.get_zone_list() + def get_zone_capabilities(self, context=None, service=None): + """Get the normalized set of capabilites for this zone, + or for a particular service.""" + return self.zone_manager.get_zone_capabilities(context, service) + + def update_service_capabilities(self, context=None, service_name=None, + host=None, capabilities={}): + """Process a capability update from a service node.""" + self.zone_manager.update_service_capabilities(service_name, + host, capabilities) + def _schedule(self, method, context, topic, *args, **kwargs): """Tries to call schedule_* method on the driver to retrieve host. diff --git a/nova/scheduler/zone_manager.py b/nova/scheduler/zone_manager.py index edf9000cc..198f9d4cc 100644 --- a/nova/scheduler/zone_manager.py +++ b/nova/scheduler/zone_manager.py @@ -58,8 +58,9 @@ class ZoneState(object): child zone.""" self.last_seen = datetime.now() self.attempt = 0 - self.name = zone_metadata["name"] - self.capabilities = zone_metadata["capabilities"] + self.name = zone_metadata.get("name", "n/a") + self.capabilities = ", ".join(["%s=%s" % (k, v) + for k, v in zone_metadata.iteritems() if k != 'name']) self.is_active = True def to_dict(self): @@ -104,13 +105,37 @@ class ZoneManager(object): """Keeps the zone states updated.""" def __init__(self): self.last_zone_db_check = datetime.min - self.zone_states = {} + self.zone_states = {} # { <zone_id> : ZoneState } + self.service_states = {} # { <service> : { <host> : { cap k : v }}} self.green_pool = greenpool.GreenPool() def get_zone_list(self): """Return the list of zones we know about.""" return [zone.to_dict() for zone in self.zone_states.values()] + def get_zone_capabilities(self, context, service=None): + """Roll up all the individual host info to generic 'service' + capabilities. Each capability is aggregated into + <cap>_min and <cap>_max values.""" + service_dict = self.service_states + if service: + service_dict = {service: self.service_states.get(service, {})} + + # TODO(sandy) - be smarter about fabricating this structure. + # But it's likely to change once we understand what the Best-Match + # code will need better. + combined = {} # { <service>_<cap> : (min, max), ... } + for service_name, host_dict in service_dict.iteritems(): + for host, caps_dict in host_dict.iteritems(): + for cap, value in caps_dict.iteritems(): + key = "%s_%s" % (service_name, cap) + min_value, max_value = combined.get(key, (value, value)) + min_value = min(min_value, value) + max_value = max(max_value, value) + combined[key] = (min_value, max_value) + + return combined + def _refresh_from_db(self, context): """Make our zone state map match the db.""" # Add/update existing zones ... @@ -141,3 +166,11 @@ class ZoneManager(object): self.last_zone_db_check = datetime.now() self._refresh_from_db(context) self._poll_zones(context) + + def update_service_capabilities(self, service_name, host, capabilities): + """Update the per-service capabilities based on this notification.""" + logging.debug(_("Received %(service_name)s service update from " + "%(host)s: %(capabilities)s") % locals()) + service_caps = self.service_states.get(service_name, {}) + service_caps[host] = capabilities + self.service_states[service_name] = service_caps diff --git a/nova/service.py b/nova/service.py index 52bb15ad7..47c0b96c0 100644 --- a/nova/service.py +++ b/nova/service.py @@ -97,18 +97,24 @@ class Service(object): conn1 = rpc.Connection.instance(new=True) conn2 = rpc.Connection.instance(new=True) + conn3 = rpc.Connection.instance(new=True) if self.report_interval: - consumer_all = rpc.AdapterConsumer( + consumer_all = rpc.TopicAdapterConsumer( connection=conn1, topic=self.topic, proxy=self) - consumer_node = rpc.AdapterConsumer( + consumer_node = rpc.TopicAdapterConsumer( connection=conn2, topic='%s.%s' % (self.topic, self.host), proxy=self) + fanout = rpc.FanoutAdapterConsumer( + connection=conn3, + topic=self.topic, + proxy=self) self.timers.append(consumer_all.attach_to_eventlet()) self.timers.append(consumer_node.attach_to_eventlet()) + self.timers.append(fanout.attach_to_eventlet()) pulse = utils.LoopingCall(self.report_state) pulse.start(interval=self.report_interval, now=False) diff --git a/nova/tests/api/openstack/extensions/foxinsocks.py b/nova/tests/api/openstack/extensions/foxinsocks.py new file mode 100644 index 000000000..0860b51ac --- /dev/null +++ b/nova/tests/api/openstack/extensions/foxinsocks.py @@ -0,0 +1,98 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json + +from nova import wsgi + +from nova.api.openstack import extensions + + +class FoxInSocksController(wsgi.Controller): + + def index(self, req): + return "Try to say this Mr. Knox, sir..." + + +class Foxinsocks(object): + + def __init__(self): + pass + + def get_name(self): + return "Fox In Socks" + + def get_alias(self): + return "FOXNSOX" + + def get_description(self): + return "The Fox In Socks Extension" + + def get_namespace(self): + return "http://www.fox.in.socks/api/ext/pie/v1.0" + + def get_updated(self): + return "2011-01-22T13:25:27-06:00" + + def get_resources(self): + resources = [] + resource = extensions.ResourceExtension('foxnsocks', + FoxInSocksController()) + resources.append(resource) + return resources + + def get_actions(self): + actions = [] + actions.append(extensions.ActionExtension('servers', 'add_tweedle', + self._add_tweedle)) + actions.append(extensions.ActionExtension('servers', 'delete_tweedle', + self._delete_tweedle)) + return actions + + def get_response_extensions(self): + response_exts = [] + + def _goose_handler(res): + #NOTE: This only handles JSON responses. + # You can use content type header to test for XML. + data = json.loads(res.body) + data['flavor']['googoose'] = "Gooey goo for chewy chewing!" + return data + + resp_ext = extensions.ResponseExtension('GET', '/v1.1/flavors/:(id)', + _goose_handler) + response_exts.append(resp_ext) + + def _bands_handler(res): + #NOTE: This only handles JSON responses. + # You can use content type header to test for XML. + data = json.loads(res.body) + data['big_bands'] = 'Pig Bands!' + return data + + resp_ext2 = extensions.ResponseExtension('GET', '/v1.1/flavors/:(id)', + _bands_handler) + response_exts.append(resp_ext2) + return response_exts + + def _add_tweedle(self, input_dict, req, id): + + return "Tweedle Beetle Added." + + def _delete_tweedle(self, input_dict, req, id): + + return "Tweedle Beetle Deleted." diff --git a/nova/tests/api/openstack/test_extensions.py b/nova/tests/api/openstack/test_extensions.py new file mode 100644 index 000000000..481d34ed1 --- /dev/null +++ b/nova/tests/api/openstack/test_extensions.py @@ -0,0 +1,236 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import stubout +import unittest +import webob +import os.path + +from nova import context +from nova import flags +from nova.api import openstack +from nova.api.openstack import extensions +from nova.api.openstack import flavors +from nova.tests.api.openstack import fakes +import nova.wsgi + +FLAGS = flags.FLAGS + +response_body = "Try to say this Mr. Knox, sir..." + + +class StubController(nova.wsgi.Controller): + + def __init__(self, body): + self.body = body + + def index(self, req): + return self.body + + +class StubExtensionManager(object): + + def __init__(self, resource_ext=None, action_ext=None, response_ext=None): + self.resource_ext = resource_ext + self.action_ext = action_ext + self.response_ext = response_ext + + def get_name(self): + return "Tweedle Beetle Extension" + + def get_alias(self): + return "TWDLBETL" + + def get_description(self): + return "Provides access to Tweedle Beetles" + + def get_resources(self): + resource_exts = [] + if self.resource_ext: + resource_exts.append(self.resource_ext) + return resource_exts + + def get_actions(self): + action_exts = [] + if self.action_ext: + action_exts.append(self.action_ext) + return action_exts + + def get_response_extensions(self): + response_exts = [] + if self.response_ext: + response_exts.append(self.response_ext) + return response_exts + + +class ExtensionControllerTest(unittest.TestCase): + + def test_index(self): + app = openstack.APIRouterV11() + ext_midware = extensions.ExtensionMiddleware(app) + request = webob.Request.blank("/extensions") + response = request.get_response(ext_midware) + self.assertEqual(200, response.status_int) + + def test_get_by_alias(self): + app = openstack.APIRouterV11() + ext_midware = extensions.ExtensionMiddleware(app) + request = webob.Request.blank("/extensions/FOXNSOX") + response = request.get_response(ext_midware) + self.assertEqual(200, response.status_int) + + +class ResourceExtensionTest(unittest.TestCase): + + def test_no_extension_present(self): + manager = StubExtensionManager(None) + app = openstack.APIRouterV11() + ext_midware = extensions.ExtensionMiddleware(app, manager) + request = webob.Request.blank("/blah") + response = request.get_response(ext_midware) + self.assertEqual(404, response.status_int) + + def test_get_resources(self): + res_ext = extensions.ResourceExtension('tweedles', + StubController(response_body)) + manager = StubExtensionManager(res_ext) + app = openstack.APIRouterV11() + ext_midware = extensions.ExtensionMiddleware(app, manager) + request = webob.Request.blank("/tweedles") + response = request.get_response(ext_midware) + self.assertEqual(200, response.status_int) + self.assertEqual(response_body, response.body) + + def test_get_resources_with_controller(self): + res_ext = extensions.ResourceExtension('tweedles', + StubController(response_body)) + manager = StubExtensionManager(res_ext) + app = openstack.APIRouterV11() + ext_midware = extensions.ExtensionMiddleware(app, manager) + request = webob.Request.blank("/tweedles") + response = request.get_response(ext_midware) + self.assertEqual(200, response.status_int) + self.assertEqual(response_body, response.body) + + +class ExtensionManagerTest(unittest.TestCase): + + response_body = "Try to say this Mr. Knox, sir..." + + def setUp(self): + FLAGS.osapi_extensions_path = os.path.join(os.path.dirname(__file__), + "extensions") + + def test_get_resources(self): + app = openstack.APIRouterV11() + ext_midware = extensions.ExtensionMiddleware(app) + request = webob.Request.blank("/foxnsocks") + response = request.get_response(ext_midware) + self.assertEqual(200, response.status_int) + self.assertEqual(response_body, response.body) + + +class ActionExtensionTest(unittest.TestCase): + + def setUp(self): + FLAGS.osapi_extensions_path = os.path.join(os.path.dirname(__file__), + "extensions") + + def _send_server_action_request(self, url, body): + app = openstack.APIRouterV11() + ext_midware = extensions.ExtensionMiddleware(app) + request = webob.Request.blank(url) + request.method = 'POST' + request.content_type = 'application/json' + request.body = json.dumps(body) + response = request.get_response(ext_midware) + return response + + def test_extended_action(self): + body = dict(add_tweedle=dict(name="test")) + response = self._send_server_action_request("/servers/1/action", body) + self.assertEqual(200, response.status_int) + self.assertEqual("Tweedle Beetle Added.", response.body) + + body = dict(delete_tweedle=dict(name="test")) + response = self._send_server_action_request("/servers/1/action", body) + self.assertEqual(200, response.status_int) + self.assertEqual("Tweedle Beetle Deleted.", response.body) + + def test_invalid_action_body(self): + body = dict(blah=dict(name="test")) # Doesn't exist + response = self._send_server_action_request("/servers/1/action", body) + self.assertEqual(501, response.status_int) + + def test_invalid_action(self): + body = dict(blah=dict(name="test")) + response = self._send_server_action_request("/asdf/1/action", body) + self.assertEqual(404, response.status_int) + + +class ResponseExtensionTest(unittest.TestCase): + + def setUp(self): + super(ResponseExtensionTest, self).setUp() + self.stubs = stubout.StubOutForTesting() + fakes.FakeAuthManager.reset_fake_data() + fakes.FakeAuthDatabase.data = {} + fakes.stub_out_auth(self.stubs) + self.context = context.get_admin_context() + + def tearDown(self): + self.stubs.UnsetAll() + super(ResponseExtensionTest, self).tearDown() + + def test_get_resources_with_stub_mgr(self): + + test_resp = "Gooey goo for chewy chewing!" + + def _resp_handler(res): + # only handle JSON responses + data = json.loads(res.body) + data['flavor']['googoose'] = test_resp + return data + + resp_ext = extensions.ResponseExtension('GET', + '/v1.1/flavors/:(id)', + _resp_handler) + + manager = StubExtensionManager(None, None, resp_ext) + app = fakes.wsgi_app() + ext_midware = extensions.ExtensionMiddleware(app, manager) + request = webob.Request.blank("/v1.1/flavors/1") + request.environ['api.version'] = '1.1' + response = request.get_response(ext_midware) + self.assertEqual(200, response.status_int) + response_data = json.loads(response.body) + self.assertEqual(test_resp, response_data['flavor']['googoose']) + + def test_get_resources_with_mgr(self): + + test_resp = "Gooey goo for chewy chewing!" + + app = fakes.wsgi_app() + ext_midware = extensions.ExtensionMiddleware(app) + request = webob.Request.blank("/v1.1/flavors/1") + request.environ['api.version'] = '1.1' + response = request.get_response(ext_midware) + self.assertEqual(200, response.status_int) + response_data = json.loads(response.body) + self.assertEqual(test_resp, response_data['flavor']['googoose']) + self.assertEqual("Pig Bands!", response_data['big_bands']) diff --git a/nova/tests/api/openstack/test_zones.py b/nova/tests/api/openstack/test_zones.py index 38399bb3f..a3f191aaa 100644 --- a/nova/tests/api/openstack/test_zones.py +++ b/nova/tests/api/openstack/test_zones.py @@ -75,6 +75,10 @@ def zone_get_all_db(context): ] +def zone_capabilities(method, context, params): + return dict() + + class ZonesTest(test.TestCase): def setUp(self): super(ZonesTest, self).setUp() @@ -93,13 +97,18 @@ class ZonesTest(test.TestCase): self.stubs.Set(nova.db, 'zone_create', zone_create) self.stubs.Set(nova.db, 'zone_delete', zone_delete) + self.old_zone_name = FLAGS.zone_name + self.old_zone_capabilities = FLAGS.zone_capabilities + def tearDown(self): self.stubs.UnsetAll() FLAGS.allow_admin_api = self.allow_admin + FLAGS.zone_name = self.old_zone_name + FLAGS.zone_capabilities = self.old_zone_capabilities super(ZonesTest, self).tearDown() def test_get_zone_list_scheduler(self): - self.stubs.Set(api.API, '_call_scheduler', zone_get_all_scheduler) + self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler) req = webob.Request.blank('/v1.0/zones') res = req.get_response(fakes.wsgi_app()) res_dict = json.loads(res.body) @@ -108,8 +117,7 @@ class ZonesTest(test.TestCase): self.assertEqual(len(res_dict['zones']), 2) def test_get_zone_list_db(self): - self.stubs.Set(api.API, '_call_scheduler', - zone_get_all_scheduler_empty) + self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler_empty) self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db) req = webob.Request.blank('/v1.0/zones') req.headers["Content-Type"] = "application/json" @@ -167,3 +175,18 @@ class ZonesTest(test.TestCase): self.assertEqual(res_dict['zone']['id'], 1) self.assertEqual(res_dict['zone']['api_url'], 'http://example.com') self.assertFalse('username' in res_dict['zone']) + + def test_zone_info(self): + FLAGS.zone_name = 'darksecret' + FLAGS.zone_capabilities = ['cap1=a;b', 'cap2=c;d'] + self.stubs.Set(api, '_call_scheduler', zone_capabilities) + + body = dict(zone=dict(username='zeb', password='sneaky')) + req = webob.Request.blank('/v1.0/zones/info') + + res = req.get_response(fakes.wsgi_app()) + res_dict = json.loads(res.body) + self.assertEqual(res.status_int, 200) + self.assertEqual(res_dict['zone']['name'], 'darksecret') + self.assertEqual(res_dict['zone']['cap1'], 'a;b') + self.assertEqual(res_dict['zone']['cap2'], 'c;d') diff --git a/nova/tests/test_direct.py b/nova/tests/test_direct.py index 80e4d2e1f..588a24b35 100644 --- a/nova/tests/test_direct.py +++ b/nova/tests/test_direct.py @@ -25,12 +25,18 @@ import webob from nova import compute from nova import context from nova import exception +from nova import network from nova import test +from nova import volume from nova import utils from nova.api import direct from nova.tests import test_cloud +class ArbitraryObject(object): + pass + + class FakeService(object): def echo(self, context, data): return {'data': data} @@ -39,6 +45,9 @@ class FakeService(object): return {'user': context.user_id, 'project': context.project_id} + def invalid_return(self, context): + return ArbitraryObject() + class DirectTestCase(test.TestCase): def setUp(self): @@ -84,6 +93,12 @@ class DirectTestCase(test.TestCase): resp_parsed = json.loads(resp.body) self.assertEqual(resp_parsed['data'], 'foo') + def test_invalid(self): + req = webob.Request.blank('/fake/invalid_return') + req.environ['openstack.context'] = self.context + req.method = 'POST' + self.assertRaises(exception.Error, req.get_response, self.router) + def test_proxy(self): proxy = direct.Proxy(self.router) rv = proxy.fake.echo(self.context, data='baz') @@ -93,12 +108,20 @@ class DirectTestCase(test.TestCase): class DirectCloudTestCase(test_cloud.CloudTestCase): def setUp(self): super(DirectCloudTestCase, self).setUp() - compute_handle = compute.API(network_api=self.cloud.network_api, - volume_api=self.cloud.volume_api) + compute_handle = compute.API(image_service=self.cloud.image_service) + volume_handle = volume.API() + network_handle = network.API() direct.register_service('compute', compute_handle) + direct.register_service('volume', volume_handle) + direct.register_service('network', network_handle) + self.router = direct.JsonParamsMiddleware(direct.Router()) proxy = direct.Proxy(self.router) self.cloud.compute_api = proxy.compute + self.cloud.volume_api = proxy.volume + self.cloud.network_api = proxy.network + compute_handle.volume_api = proxy.volume + compute_handle.network_api = proxy.network def tearDown(self): super(DirectCloudTestCase, self).tearDown() diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index 4820e04fb..44d7c91eb 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -36,7 +36,7 @@ class RpcTestCase(test.TestCase): super(RpcTestCase, self).setUp() self.conn = rpc.Connection.instance(True) self.receiver = TestReceiver() - self.consumer = rpc.AdapterConsumer(connection=self.conn, + self.consumer = rpc.TopicAdapterConsumer(connection=self.conn, topic='test', proxy=self.receiver) self.consumer.attach_to_eventlet() @@ -97,7 +97,7 @@ class RpcTestCase(test.TestCase): nested = Nested() conn = rpc.Connection.instance(True) - consumer = rpc.AdapterConsumer(connection=conn, + consumer = rpc.TopicAdapterConsumer(connection=conn, topic='nested', proxy=nested) consumer.attach_to_eventlet() diff --git a/nova/tests/test_scheduler.py b/nova/tests/test_scheduler.py index 244e43bd9..6df74dd61 100644 --- a/nova/tests/test_scheduler.py +++ b/nova/tests/test_scheduler.py @@ -21,6 +21,9 @@ Tests For Scheduler import datetime import mox +import novaclient.exceptions +import stubout +import webob from mox import IgnoreArg from nova import context @@ -32,6 +35,7 @@ from nova import test from nova import rpc from nova import utils from nova.auth import manager as auth_manager +from nova.scheduler import api from nova.scheduler import manager from nova.scheduler import driver from nova.compute import power_state @@ -937,3 +941,160 @@ class SimpleDriverTestCase(test.TestCase): db.instance_destroy(self.context, instance_id) db.service_destroy(self.context, s_ref['id']) db.service_destroy(self.context, s_ref2['id']) + + +class FakeZone(object): + def __init__(self, api_url, username, password): + self.api_url = api_url + self.username = username + self.password = password + + +def zone_get_all(context): + return [ + FakeZone('http://example.com', 'bob', 'xxx'), + ] + + +class FakeRerouteCompute(api.reroute_compute): + def _call_child_zones(self, zones, function): + return [] + + def get_collection_context_and_id(self, args, kwargs): + return ("servers", None, 1) + + def unmarshall_result(self, zone_responses): + return dict(magic="found me") + + +def go_boom(self, context, instance): + raise exception.InstanceNotFound("boom message", instance) + + +def found_instance(self, context, instance): + return dict(name='myserver') + + +class FakeResource(object): + def __init__(self, attribute_dict): + for k, v in attribute_dict.iteritems(): + setattr(self, k, v) + + def pause(self): + pass + + +class ZoneRedirectTest(test.TestCase): + def setUp(self): + super(ZoneRedirectTest, self).setUp() + self.stubs = stubout.StubOutForTesting() + + self.stubs.Set(db, 'zone_get_all', zone_get_all) + + self.enable_zone_routing = FLAGS.enable_zone_routing + FLAGS.enable_zone_routing = True + + def tearDown(self): + self.stubs.UnsetAll() + FLAGS.enable_zone_routing = self.enable_zone_routing + super(ZoneRedirectTest, self).tearDown() + + def test_trap_found_locally(self): + decorator = FakeRerouteCompute("foo") + try: + result = decorator(found_instance)(None, None, 1) + except api.RedirectResult, e: + self.fail(_("Successful database hit should succeed")) + + def test_trap_not_found_locally(self): + decorator = FakeRerouteCompute("foo") + try: + result = decorator(go_boom)(None, None, 1) + self.assertFail(_("Should have rerouted.")) + except api.RedirectResult, e: + self.assertEquals(e.results['magic'], 'found me') + + def test_routing_flags(self): + FLAGS.enable_zone_routing = False + decorator = FakeRerouteCompute("foo") + try: + result = decorator(go_boom)(None, None, 1) + self.assertFail(_("Should have thrown exception.")) + except exception.InstanceNotFound, e: + self.assertEquals(e.message, 'boom message') + + def test_get_collection_context_and_id(self): + decorator = api.reroute_compute("foo") + self.assertEquals(decorator.get_collection_context_and_id( + (None, 10, 20), {}), ("servers", 10, 20)) + self.assertEquals(decorator.get_collection_context_and_id( + (None, 11,), dict(instance_id=21)), ("servers", 11, 21)) + self.assertEquals(decorator.get_collection_context_and_id( + (None,), dict(context=12, instance_id=22)), ("servers", 12, 22)) + + def test_unmarshal_single_server(self): + decorator = api.reroute_compute("foo") + self.assertEquals(decorator.unmarshall_result([]), {}) + self.assertEquals(decorator.unmarshall_result( + [FakeResource(dict(a=1, b=2)), ]), + dict(server=dict(a=1, b=2))) + self.assertEquals(decorator.unmarshall_result( + [FakeResource(dict(a=1, _b=2)), ]), + dict(server=dict(a=1,))) + self.assertEquals(decorator.unmarshall_result( + [FakeResource(dict(a=1, manager=2)), ]), + dict(server=dict(a=1,))) + self.assertEquals(decorator.unmarshall_result( + [FakeResource(dict(_a=1, manager=2)), ]), + dict(server={})) + + +class FakeServerCollection(object): + def get(self, instance_id): + return FakeResource(dict(a=10, b=20)) + + def find(self, name): + return FakeResource(dict(a=11, b=22)) + + +class FakeEmptyServerCollection(object): + def get(self, f): + raise novaclient.NotFound(1) + + def find(self, name): + raise novaclient.NotFound(2) + + +class FakeNovaClient(object): + def __init__(self, collection): + self.servers = collection + + +class DynamicNovaClientTest(test.TestCase): + def test_issue_novaclient_command_found(self): + zone = FakeZone('http://example.com', 'bob', 'xxx') + self.assertEquals(api._issue_novaclient_command( + FakeNovaClient(FakeServerCollection()), + zone, "servers", "get", 100).a, 10) + + self.assertEquals(api._issue_novaclient_command( + FakeNovaClient(FakeServerCollection()), + zone, "servers", "find", "name").b, 22) + + self.assertEquals(api._issue_novaclient_command( + FakeNovaClient(FakeServerCollection()), + zone, "servers", "pause", 100), None) + + def test_issue_novaclient_command_not_found(self): + zone = FakeZone('http://example.com', 'bob', 'xxx') + self.assertEquals(api._issue_novaclient_command( + FakeNovaClient(FakeEmptyServerCollection()), + zone, "servers", "get", 100), None) + + self.assertEquals(api._issue_novaclient_command( + FakeNovaClient(FakeEmptyServerCollection()), + zone, "servers", "find", "name"), None) + + self.assertEquals(api._issue_novaclient_command( + FakeNovaClient(FakeEmptyServerCollection()), + zone, "servers", "any", "name"), None) diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py index 393f9d20b..d48de2057 100644 --- a/nova/tests/test_service.py +++ b/nova/tests/test_service.py @@ -109,20 +109,29 @@ class ServiceTestCase(test.TestCase): app = service.Service.create(host=host, binary=binary) self.mox.StubOutWithMock(rpc, - 'AdapterConsumer', + 'TopicAdapterConsumer', use_mock_anything=True) - rpc.AdapterConsumer(connection=mox.IgnoreArg(), + self.mox.StubOutWithMock(rpc, + 'FanoutAdapterConsumer', + use_mock_anything=True) + rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), topic=topic, proxy=mox.IsA(service.Service)).AndReturn( - rpc.AdapterConsumer) + rpc.TopicAdapterConsumer) - rpc.AdapterConsumer(connection=mox.IgnoreArg(), + rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), topic='%s.%s' % (topic, host), proxy=mox.IsA(service.Service)).AndReturn( - rpc.AdapterConsumer) + rpc.TopicAdapterConsumer) + + rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(), + topic=topic, + proxy=mox.IsA(service.Service)).AndReturn( + rpc.FanoutAdapterConsumer) - rpc.AdapterConsumer.attach_to_eventlet() - rpc.AdapterConsumer.attach_to_eventlet() + rpc.TopicAdapterConsumer.attach_to_eventlet() + rpc.TopicAdapterConsumer.attach_to_eventlet() + rpc.FanoutAdapterConsumer.attach_to_eventlet() service_create = {'host': host, 'binary': binary, @@ -279,6 +288,7 @@ class ServiceTestCase(test.TestCase): self.mox.StubOutWithMock(service.rpc.Connection, 'instance') service.rpc.Connection.instance(new=mox.IgnoreArg()) service.rpc.Connection.instance(new=mox.IgnoreArg()) + service.rpc.Connection.instance(new=mox.IgnoreArg()) self.mox.StubOutWithMock(serv.manager.driver, 'update_available_resource') serv.manager.driver.update_available_resource(mox.IgnoreArg(), host) diff --git a/nova/tests/test_test.py b/nova/tests/test_test.py index e237674e6..35c838065 100644 --- a/nova/tests/test_test.py +++ b/nova/tests/test_test.py @@ -34,7 +34,7 @@ class IsolationTestCase(test.TestCase): def test_rpc_consumer_isolation(self): connection = rpc.Connection.instance(new=True) - consumer = rpc.TopicConsumer(connection, topic='compute') + consumer = rpc.TopicAdapterConsumer(connection, topic='compute') consumer.register_callback( lambda x, y: self.fail('I should never be called')) consumer.attach_to_eventlet() diff --git a/nova/tests/test_virt.py b/nova/tests/test_virt.py index fb0ba53b1..3a03159ff 100644 --- a/nova/tests/test_virt.py +++ b/nova/tests/test_virt.py @@ -796,7 +796,8 @@ class NWFilterTestCase(test.TestCase): instance_ref = db.instance_create(self.context, {'user_id': 'fake', - 'project_id': 'fake'}) + 'project_id': 'fake', + 'mac_address': '00:A0:C9:14:C8:29'}) inst_id = instance_ref['id'] ip = '10.11.12.13' @@ -813,7 +814,8 @@ class NWFilterTestCase(test.TestCase): 'instance_id': instance_ref['id']}) def _ensure_all_called(): - instance_filter = 'nova-instance-%s' % instance_ref['name'] + instance_filter = 'nova-instance-%s-%s' % (instance_ref['name'], + '00A0C914C829') secgroup_filter = 'nova-secgroup-%s' % self.security_group['id'] for required in [secgroup_filter, 'allow-dhcp-server', 'no-arp-spoofing', 'no-ip-spoofing', diff --git a/nova/tests/test_volume.py b/nova/tests/test_volume.py index 5d68ca2ae..d71b75f3f 100644 --- a/nova/tests/test_volume.py +++ b/nova/tests/test_volume.py @@ -356,8 +356,8 @@ class ISCSITestCase(DriverTestCase): tid = db.volume_get_iscsi_target_num(self.context, volume_id_list[0]) self.mox.StubOutWithMock(self.volume.driver, '_execute') self.volume.driver._execute("sudo", "ietadm", "--op", "show", - "--tid=%(tid)d" % locals() - ).AndRaise(exception.ProcessExecutionError()) + "--tid=%(tid)d" % locals()).AndRaise( + exception.ProcessExecutionError()) self.mox.ReplayAll() self.assertRaises(exception.ProcessExecutionError, diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index 66a973a78..e54ffe712 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -186,6 +186,7 @@ class XenAPIVMTestCase(test.TestCase): stubs.stubout_stream_disk(self.stubs) stubs.stubout_is_vdi_pv(self.stubs) self.stubs.Set(VMOps, 'reset_network', reset_network) + stubs.stub_out_vm_methods(self.stubs) glance_stubs.stubout_glance_client(self.stubs, glance_stubs.FakeGlance) self.conn = xenapi_conn.get_connection(False) @@ -369,6 +370,17 @@ class XenAPIVMTestCase(test.TestCase): self.assertEquals(vif_rec['qos_algorithm_params']['kbps'], str(4 * 1024)) + def test_rescue(self): + instance = self._create_instance() + conn = xenapi_conn.get_connection(False) + conn.rescue(instance, None) + + def test_unrescue(self): + instance = self._create_instance() + conn = xenapi_conn.get_connection(False) + # Ensure that it will not unrescue a non-rescued instance. + self.assertRaises(Exception, conn.unrescue, instance, None) + def tearDown(self): super(XenAPIVMTestCase, self).tearDown() self.manager.delete_project(self.project) diff --git a/nova/tests/test_zones.py b/nova/tests/test_zones.py index 5a52a0506..688dc704d 100644 --- a/nova/tests/test_zones.py +++ b/nova/tests/test_zones.py @@ -76,6 +76,40 @@ class ZoneManagerTestCase(test.TestCase): self.assertEquals(len(zm.zone_states), 1) self.assertEquals(zm.zone_states[1].username, 'user1') + def test_service_capabilities(self): + zm = zone_manager.ZoneManager() + caps = zm.get_zone_capabilities(self, None) + self.assertEquals(caps, {}) + + zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2)) + caps = zm.get_zone_capabilities(self, None) + self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2))) + + zm.update_service_capabilities("svc1", "host1", dict(a=2, b=3)) + caps = zm.get_zone_capabilities(self, None) + self.assertEquals(caps, dict(svc1_a=(2, 2), svc1_b=(3, 3))) + + zm.update_service_capabilities("svc1", "host2", dict(a=20, b=30)) + caps = zm.get_zone_capabilities(self, None) + self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30))) + + zm.update_service_capabilities("svc10", "host1", dict(a=99, b=99)) + caps = zm.get_zone_capabilities(self, None) + self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30), + svc10_a=(99, 99), svc10_b=(99, 99))) + + zm.update_service_capabilities("svc1", "host3", dict(c=5)) + caps = zm.get_zone_capabilities(self, None) + self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30), + svc1_c=(5, 5), svc10_a=(99, 99), + svc10_b=(99, 99))) + + caps = zm.get_zone_capabilities(self, 'svc1') + self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30), + svc1_c=(5, 5))) + caps = zm.get_zone_capabilities(self, 'svc10') + self.assertEquals(caps, dict(svc10_a=(99, 99), svc10_b=(99, 99))) + def test_refresh_from_db_replace_existing(self): zm = zone_manager.ZoneManager() zone_state = zone_manager.ZoneState() diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py index 7f9706a3d..7c33710c0 100644 --- a/nova/tests/xenapi/stubs.py +++ b/nova/tests/xenapi/stubs.py @@ -185,6 +185,25 @@ class FakeSessionForVMTests(fake.SessionBase): pass +def stub_out_vm_methods(stubs): + def fake_shutdown(self, inst, vm, method="clean"): + pass + + def fake_acquire_bootlock(self, vm): + pass + + def fake_release_bootlock(self, vm): + pass + + def fake_spawn_rescue(self, inst): + pass + + stubs.Set(vmops.VMOps, "_shutdown", fake_shutdown) + stubs.Set(vmops.VMOps, "_acquire_bootlock", fake_acquire_bootlock) + stubs.Set(vmops.VMOps, "_release_bootlock", fake_release_bootlock) + stubs.Set(vmops.VMOps, "spawn_rescue", fake_spawn_rescue) + + class FakeSessionForVolumeTests(fake.SessionBase): """ Stubs out a XenAPISession for Volume tests """ def __init__(self, uri): diff --git a/nova/utils.py b/nova/utils.py index e4d8a70eb..fa01f2c94 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -171,10 +171,6 @@ def execute(*cmd, **kwargs): stdout=stdout, stderr=stderr, cmd=' '.join(cmd)) - # NOTE(termie): this appears to be necessary to let the subprocess - # call clean something up in between calls, without - # it two execute calls in a row hangs the second one - greenthread.sleep(0) return result except ProcessExecutionError: if not attempts: @@ -183,6 +179,11 @@ def execute(*cmd, **kwargs): LOG.debug(_("%r failed. Retrying."), cmd) if delay_on_retry: greenthread.sleep(random.randint(20, 200) / 100.0) + finally: + # NOTE(termie): this appears to be necessary to let the subprocess + # call clean something up in between calls, without + # it two execute calls in a row hangs the second one + greenthread.sleep(0) def ssh_execute(ssh, cmd, process_input=None, @@ -310,11 +311,15 @@ def get_my_linklocal(interface): def to_global_ipv6(prefix, mac): - mac64 = netaddr.EUI(mac).eui64().words - int_addr = int(''.join(['%02x' % i for i in mac64]), 16) - mac64_addr = netaddr.IPAddress(int_addr) - maskIP = netaddr.IPNetwork(prefix).ip - return (mac64_addr ^ netaddr.IPAddress('::0200:0:0:0') | maskIP).format() + try: + mac64 = netaddr.EUI(mac).eui64().words + int_addr = int(''.join(['%02x' % i for i in mac64]), 16) + mac64_addr = netaddr.IPAddress(int_addr) + maskIP = netaddr.IPNetwork(prefix).ip + return (mac64_addr ^ netaddr.IPAddress('::0200:0:0:0') | maskIP).\ + format() + except TypeError: + raise TypeError(_("Bad mac for to_global_ipv6: %s") % mac) def to_mac(ipv6_address): @@ -336,11 +341,8 @@ utcnow.override_time = None def is_older_than(before, seconds): - """Return True if before is older than 'seconds'""" - if utcnow() - before > datetime.timedelta(seconds=seconds): - return True - else: - return False + """Return True if before is older than seconds""" + return utcnow() - before > datetime.timedelta(seconds=seconds) def utcnow_ts(): diff --git a/nova/virt/driver.py b/nova/virt/driver.py index 0e3a4aa3b..f9cf1b8aa 100644 --- a/nova/virt/driver.py +++ b/nova/virt/driver.py @@ -61,7 +61,7 @@ class ComputeDriver(object): """Return a list of InstanceInfo for all registered VMs""" raise NotImplementedError() - def spawn(self, instance): + def spawn(self, instance, network_info=None): """Launch a VM for the specified instance""" raise NotImplementedError() diff --git a/nova/virt/fake.py b/nova/virt/fake.py index 5b0fe1877..7018f8c1b 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -344,7 +344,7 @@ class FakeConnection(driver.ComputeDriver): Note that this function takes an instance ID, not a compute.service.Instance, so that it can be called by compute.monitor. """ - return [0L, 0L, 0L, 0L, null] + return [0L, 0L, 0L, 0L, None] def interface_stats(self, instance_name, iface_id): """ diff --git a/nova/virt/interfaces.template b/nova/virt/interfaces.template index 3b34e54f4..e527cf35c 100644 --- a/nova/virt/interfaces.template +++ b/nova/virt/interfaces.template @@ -5,19 +5,20 @@ auto lo iface lo inet loopback -# The primary network interface -auto eth0 -iface eth0 inet static - address ${address} - netmask ${netmask} - broadcast ${broadcast} - gateway ${gateway} - dns-nameservers ${dns} +#for $ifc in $interfaces +auto ${ifc.name} +iface ${ifc.name} inet static + address ${ifc.address} + netmask ${ifc.netmask} + broadcast ${ifc.broadcast} + gateway ${ifc.gateway} + dns-nameservers ${ifc.dns} #if $use_ipv6 -iface eth0 inet6 static - address ${address_v6} - netmask ${netmask_v6} - gateway ${gateway_v6} +iface ${ifc.name} inet6 static + address ${ifc.address_v6} + netmask ${ifc.netmask_v6} + gateway ${ifc.gateway_v6} #end if +#end for diff --git a/nova/virt/libvirt.xml.template b/nova/virt/libvirt.xml.template index ef2d2cd6b..d74a9e85b 100644 --- a/nova/virt/libvirt.xml.template +++ b/nova/virt/libvirt.xml.template @@ -69,21 +69,24 @@ </disk> #end if #end if + +#for $nic in $nics <interface type='bridge'> - <source bridge='${bridge_name}'/> - <mac address='${mac_address}'/> + <source bridge='${nic.bridge_name}'/> + <mac address='${nic.mac_address}'/> <!-- <model type='virtio'/> CANT RUN virtio network right now --> - <filterref filter="nova-instance-${name}"> - <parameter name="IP" value="${ip_address}" /> - <parameter name="DHCPSERVER" value="${dhcp_server}" /> -#if $getVar('extra_params', False) - ${extra_params} + <filterref filter="nova-instance-${name}-${nic.id}"> + <parameter name="IP" value="${nic.ip_address}" /> + <parameter name="DHCPSERVER" value="${nic.dhcp_server}" /> +#if $getVar('nic.extra_params', False) + ${nic.extra_params} #end if -#if $getVar('gateway_v6', False) - <parameter name="RASERVER" value="${gateway_v6}" /> +#if $getVar('nic.gateway_v6', False) + <parameter name="RASERVER" value="${nic.gateway_v6}" /> #end if </filterref> </interface> +#end for <!-- The order is significant here. File must be defined first --> <serial type="file"> diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py index e1a0a6f29..2cecb010d 100644 --- a/nova/virt/libvirt_conn.py +++ b/nova/virt/libvirt_conn.py @@ -153,6 +153,51 @@ def _get_ip_version(cidr): return int(net.version()) +def _get_network_info(instance): + # TODO(adiantum) If we will keep this function + # we should cache network_info + admin_context = context.get_admin_context() + + ip_addresses = db.fixed_ip_get_all_by_instance(admin_context, + instance['id']) + + networks = db.network_get_all_by_instance(admin_context, + instance['id']) + network_info = [] + + def ip_dict(ip): + return { + "ip": ip.address, + "netmask": network["netmask"], + "enabled": "1"} + + def ip6_dict(ip6): + prefix = ip6.network.cidr_v6 + mac = instance.mac_address + return { + "ip": utils.to_global_ipv6(prefix, mac), + "netmask": ip6.network.netmask_v6, + "gateway": ip6.network.gateway_v6, + "enabled": "1"} + + for network in networks: + network_ips = [ip for ip in ip_addresses + if ip.network_id == network.id] + + mapping = { + 'label': network['label'], + 'gateway': network['gateway'], + 'mac': instance.mac_address, + 'dns': [network['dns']], + 'ips': [ip_dict(ip) for ip in network_ips]} + + if FLAGS.use_ipv6: + mapping['ip6s'] = [ip6_dict(ip) for ip in network_ips] + + network_info.append((network, mapping)) + return network_info + + class LibvirtConnection(driver.ComputeDriver): def __init__(self, read_only): @@ -444,16 +489,18 @@ class LibvirtConnection(driver.ComputeDriver): def poll_rescued_instances(self, timeout): pass + # NOTE(ilyaalekseyev): Implementation like in multinics + # for xenapi(tr3buchet) @exception.wrap_exception - def spawn(self, instance): - xml = self.to_xml(instance) + def spawn(self, instance, network_info=None): + xml = self.to_xml(instance, network_info) db.instance_set_state(context.get_admin_context(), instance['id'], power_state.NOSTATE, 'launching') - self.firewall_driver.setup_basic_filtering(instance) - self.firewall_driver.prepare_instance_filter(instance) - self._create_image(instance, xml) + self.firewall_driver.setup_basic_filtering(instance, network_info) + self.firewall_driver.prepare_instance_filter(instance, network_info) + self._create_image(instance, xml, network_info) self._conn.createXML(xml, 0) LOG.debug(_("instance %s: is running"), instance['name']) self.firewall_driver.apply_instance_filter(instance) @@ -609,7 +656,14 @@ class LibvirtConnection(driver.ComputeDriver): utils.execute('truncate', target, '-s', "%dG" % local_gb) # TODO(vish): should we format disk by default? - def _create_image(self, inst, libvirt_xml, suffix='', disk_images=None): + def _create_image(self, inst, libvirt_xml, suffix='', disk_images=None, + network_info=None): + if not network_info: + network_info = _get_network_info(inst) + + if not suffix: + suffix = '' + # syntactic nicety def basepath(fname='', suffix=suffix): return os.path.join(FLAGS.instances_path, @@ -685,28 +739,35 @@ class LibvirtConnection(driver.ComputeDriver): key = str(inst['key_data']) net = None - network_ref = db.network_get_by_instance(context.get_admin_context(), - inst['id']) - if network_ref['injected']: - admin_context = context.get_admin_context() - address = db.instance_get_fixed_address(admin_context, inst['id']) + + nets = [] + ifc_template = open(FLAGS.injected_network_template).read() + ifc_num = -1 + admin_context = context.get_admin_context() + for (network_ref, mapping) in network_info: + ifc_num += 1 + + if not 'injected' in network_ref: + continue + + address = mapping['ips'][0]['ip'] address_v6 = None if FLAGS.use_ipv6: - address_v6 = db.instance_get_fixed_address_v6(admin_context, - inst['id']) - - interfaces_info = {'address': address, - 'netmask': network_ref['netmask'], - 'gateway': network_ref['gateway'], - 'broadcast': network_ref['broadcast'], - 'dns': network_ref['dns'], - 'address_v6': address_v6, - 'gateway_v6': network_ref['gateway_v6'], - 'netmask_v6': network_ref['netmask_v6'], - 'use_ipv6': FLAGS.use_ipv6} - - net = str(Template(self.interfaces_xml, - searchList=[interfaces_info])) + address_v6 = mapping['ip6s'][0]['ip'] + net_info = {'name': 'eth%d' % ifc_num, + 'address': address, + 'netmask': network_ref['netmask'], + 'gateway': network_ref['gateway'], + 'broadcast': network_ref['broadcast'], + 'dns': network_ref['dns'], + 'address_v6': address_v6, + 'gateway_v6': network_ref['gateway_v6'], + 'netmask_v6': network_ref['netmask_v6'], + 'use_ipv6': FLAGS.use_ipv6} + nets.append(net_info) + + net = str(Template(ifc_template, searchList=[{'interfaces': nets}])) + if key or net: inst_name = inst['name'] img_id = inst.image_id @@ -728,20 +789,11 @@ class LibvirtConnection(driver.ComputeDriver): if FLAGS.libvirt_type == 'uml': utils.execute('sudo', 'chown', 'root', basepath('disk')) - def to_xml(self, instance, rescue=False): - # TODO(termie): cache? - LOG.debug(_('instance %s: starting toXML method'), instance['name']) - network = db.network_get_by_instance(context.get_admin_context(), - instance['id']) - # FIXME(vish): stick this in db - instance_type = instance['instance_type'] - # instance_type = test.INSTANCE_TYPES[instance_type] - instance_type = instance_types.get_instance_type(instance_type) - ip_address = db.instance_get_fixed_address(context.get_admin_context(), - instance['id']) + def _get_nic_for_xml(self, network, mapping): # Assume that the gateway also acts as the dhcp server. dhcp_server = network['gateway'] gateway_v6 = network['gateway_v6'] + mac_id = mapping['mac'].replace(':', '') if FLAGS.allow_project_net_traffic: if FLAGS.use_ipv6: @@ -766,6 +818,38 @@ class LibvirtConnection(driver.ComputeDriver): (net, mask) else: extra_params = "\n" + + result = { + 'id': mac_id, + 'bridge_name': network['bridge'], + 'mac_address': mapping['mac'], + 'ip_address': mapping['ips'][0]['ip'], + 'dhcp_server': dhcp_server, + 'extra_params': extra_params, + } + + if gateway_v6: + result['gateway_v6'] = gateway_v6 + "/128" + + return result + + def to_xml(self, instance, rescue=False, network_info=None): + # TODO(termie): cache? + LOG.debug(_('instance %s: starting toXML method'), instance['name']) + + # TODO(adiantum) remove network_info creation code + # when multinics will be completed + if not network_info: + network_info = _get_network_info(instance) + + nics = [] + for (network, mapping) in network_info: + nics.append(self._get_nic_for_xml(network, + mapping)) + # FIXME(vish): stick this in db + instance_type_name = instance['instance_type'] + instance_type = instance_types.get_instance_type(instance_type_name) + if FLAGS.use_cow_images: driver_type = 'qcow2' else: @@ -777,17 +861,11 @@ class LibvirtConnection(driver.ComputeDriver): instance['name']), 'memory_kb': instance_type['memory_mb'] * 1024, 'vcpus': instance_type['vcpus'], - 'bridge_name': network['bridge'], - 'mac_address': instance['mac_address'], - 'ip_address': ip_address, - 'dhcp_server': dhcp_server, - 'extra_params': extra_params, 'rescue': rescue, 'local': instance_type['local_gb'], - 'driver_type': driver_type} + 'driver_type': driver_type, + 'nics': nics} - if gateway_v6: - xml_info['gateway_v6'] = gateway_v6 + "/128" if not rescue: if instance['kernel_id']: xml_info['kernel'] = xml_info['basepath'] + "/kernel" @@ -800,7 +878,6 @@ class LibvirtConnection(driver.ComputeDriver): xml = str(Template(self.libvirt_xml, searchList=[xml_info])) LOG.debug(_('instance %s: finished toXML method'), instance['name']) - return xml def get_info(self, instance_name): @@ -1008,7 +1085,18 @@ class LibvirtConnection(driver.ComputeDriver): """ - return self._conn.getVersion() + # NOTE(justinsb): getVersion moved between libvirt versions + # Trying to do be compatible with older versions is a lost cause + # But ... we can at least give the user a nice message + method = getattr(self._conn, 'getVersion', None) + if method is None: + raise exception.Error(_("libvirt version is too old" + " (does not support getVersion)")) + # NOTE(justinsb): If we wanted to get the version, we could: + # method = getattr(libvirt, 'getVersion', None) + # NOTE(justinsb): This would then rely on a proper version check + + return method() def get_cpu_info(self): """Get cpuinfo information. @@ -1305,7 +1393,7 @@ class LibvirtConnection(driver.ComputeDriver): class FirewallDriver(object): - def prepare_instance_filter(self, instance): + def prepare_instance_filter(self, instance, network_info=None): """Prepare filters for the instance. At this point, the instance isn't running yet.""" @@ -1339,7 +1427,7 @@ class FirewallDriver(object): the security group.""" raise NotImplementedError() - def setup_basic_filtering(self, instance): + def setup_basic_filtering(self, instance, network_info=None): """Create rules to block spoofing and allow dhcp. This gets called when spawning an instance, before @@ -1348,11 +1436,6 @@ class FirewallDriver(object): """ raise NotImplementedError() - def _gateway_v6_for_instance(self, instance): - network = db.network_get_by_instance(context.get_admin_context(), - instance['id']) - return network['gateway_v6'] - class NWFilterFirewall(FirewallDriver): """ @@ -1444,10 +1527,13 @@ class NWFilterFirewall(FirewallDriver): </rule> </filter>''' - def setup_basic_filtering(self, instance): + def setup_basic_filtering(self, instance, network_info=None): """Set up basic filtering (MAC, IP, and ARP spoofing protection)""" logging.info('called setup_basic_filtering in nwfilter') + if not network_info: + network_info = _get_network_info(instance) + if self.handle_security_groups: # No point in setting up a filter set that we'll be overriding # anyway. @@ -1456,9 +1542,11 @@ class NWFilterFirewall(FirewallDriver): logging.info('ensuring static filters') self._ensure_static_filters() - instance_filter_name = self._instance_filter_name(instance) - self._define_filter(self._filter_container(instance_filter_name, - ['nova-base'])) + for (network, mapping) in network_info: + nic_id = mapping['mac'].replace(':', '') + instance_filter_name = self._instance_filter_name(instance, nic_id) + self._define_filter(self._filter_container(instance_filter_name, + ['nova-base'])) def _ensure_static_filters(self): if self.static_filters_configured: @@ -1549,48 +1637,60 @@ class NWFilterFirewall(FirewallDriver): # Nothing to do pass - def prepare_instance_filter(self, instance): + def prepare_instance_filter(self, instance, network_info=None): """ Creates an NWFilter for the given instance. In the process, it makes sure the filters for the security groups as well as the base filter are all in place. """ + if not network_info: + network_info = _get_network_info(instance) if instance['image_id'] == FLAGS.vpn_image_id: base_filter = 'nova-vpn' else: base_filter = 'nova-base' - instance_filter_name = self._instance_filter_name(instance) - instance_secgroup_filter_name = '%s-secgroup' % (instance_filter_name,) - instance_filter_children = [base_filter, instance_secgroup_filter_name] + ctxt = context.get_admin_context() + + instance_secgroup_filter_name = \ + '%s-secgroup' % (self._instance_filter_name(instance)) + #% (instance_filter_name,) + instance_secgroup_filter_children = ['nova-base-ipv4', 'nova-base-ipv6', 'nova-allow-dhcp-server'] - if FLAGS.use_ipv6: - gateway_v6 = self._gateway_v6_for_instance(instance) - if gateway_v6: - instance_secgroup_filter_children += ['nova-allow-ra-server'] - - ctxt = context.get_admin_context() - - if FLAGS.allow_project_net_traffic: - instance_filter_children += ['nova-project'] - if FLAGS.use_ipv6: - instance_filter_children += ['nova-project-v6'] - for security_group in db.security_group_get_by_instance(ctxt, - instance['id']): + for security_group in \ + db.security_group_get_by_instance(ctxt, instance['id']): self.refresh_security_group_rules(security_group['id']) instance_secgroup_filter_children += [('nova-secgroup-%s' % - security_group['id'])] + security_group['id'])] - self._define_filter( + self._define_filter( self._filter_container(instance_secgroup_filter_name, instance_secgroup_filter_children)) - self._define_filter( + for (network, mapping) in network_info: + nic_id = mapping['mac'].replace(':', '') + instance_filter_name = self._instance_filter_name(instance, nic_id) + instance_filter_children = \ + [base_filter, instance_secgroup_filter_name] + + if FLAGS.use_ipv6: + gateway_v6 = network['gateway_v6'] + + if gateway_v6: + instance_secgroup_filter_children += \ + ['nova-allow-ra-server'] + + if FLAGS.allow_project_net_traffic: + instance_filter_children += ['nova-project'] + if FLAGS.use_ipv6: + instance_filter_children += ['nova-project-v6'] + + self._define_filter( self._filter_container(instance_filter_name, instance_filter_children)) @@ -1638,8 +1738,10 @@ class NWFilterFirewall(FirewallDriver): xml += "chain='ipv4'>%s</filter>" % rule_xml return xml - def _instance_filter_name(self, instance): - return 'nova-instance-%s' % instance['name'] + def _instance_filter_name(self, instance, nic_id=None): + if not nic_id: + return 'nova-instance-%s' % (instance['name']) + return 'nova-instance-%s-%s' % (instance['name'], nic_id) class IptablesFirewallDriver(FirewallDriver): @@ -1654,9 +1756,11 @@ class IptablesFirewallDriver(FirewallDriver): self.iptables.ipv6['filter'].add_chain('sg-fallback') self.iptables.ipv6['filter'].add_rule('sg-fallback', '-j DROP') - def setup_basic_filtering(self, instance): + def setup_basic_filtering(self, instance, network_info=None): """Use NWFilter from libvirt for this.""" - return self.nwfilter.setup_basic_filtering(instance) + if not network_info: + network_info = _get_network_info(instance) + return self.nwfilter.setup_basic_filtering(instance, network_info) def apply_instance_filter(self, instance): """No-op. Everything is done in prepare_instance_filter""" @@ -1670,29 +1774,40 @@ class IptablesFirewallDriver(FirewallDriver): LOG.info(_('Attempted to unfilter instance %s which is not ' 'filtered'), instance['id']) - def prepare_instance_filter(self, instance): + def prepare_instance_filter(self, instance, network_info=None): + if not network_info: + network_info = _get_network_info(instance) self.instances[instance['id']] = instance - self.add_filters_for_instance(instance) + self.add_filters_for_instance(instance, network_info) self.iptables.apply() - def add_filters_for_instance(self, instance): + def add_filters_for_instance(self, instance, network_info=None): + if not network_info: + network_info = _get_network_info(instance) chain_name = self._instance_chain_name(instance) self.iptables.ipv4['filter'].add_chain(chain_name) - ipv4_address = self._ip_for_instance(instance) - self.iptables.ipv4['filter'].add_rule('local', - '-d %s -j $%s' % - (ipv4_address, chain_name)) + + ips_v4 = [ip['ip'] for (_, mapping) in network_info + for ip in mapping['ips']] + + for ipv4_address in ips_v4: + self.iptables.ipv4['filter'].add_rule('local', + '-d %s -j $%s' % + (ipv4_address, chain_name)) if FLAGS.use_ipv6: self.iptables.ipv6['filter'].add_chain(chain_name) - ipv6_address = self._ip_for_instance_v6(instance) - self.iptables.ipv6['filter'].add_rule('local', - '-d %s -j $%s' % - (ipv6_address, - chain_name)) + ips_v6 = [ip['ip'] for (_, mapping) in network_info + for ip in mapping['ip6s']] + + for ipv6_address in ips_v6: + self.iptables.ipv6['filter'].add_rule('local', + '-d %s -j $%s' % + (ipv6_address, + chain_name)) - ipv4_rules, ipv6_rules = self.instance_rules(instance) + ipv4_rules, ipv6_rules = self.instance_rules(instance, network_info) for rule in ipv4_rules: self.iptables.ipv4['filter'].add_rule(chain_name, rule) @@ -1708,7 +1823,9 @@ class IptablesFirewallDriver(FirewallDriver): if FLAGS.use_ipv6: self.iptables.ipv6['filter'].remove_chain(chain_name) - def instance_rules(self, instance): + def instance_rules(self, instance, network_info=None): + if not network_info: + network_info = _get_network_info(instance) ctxt = context.get_admin_context() ipv4_rules = [] @@ -1722,28 +1839,36 @@ class IptablesFirewallDriver(FirewallDriver): ipv4_rules += ['-m state --state ESTABLISHED,RELATED -j ACCEPT'] ipv6_rules += ['-m state --state ESTABLISHED,RELATED -j ACCEPT'] - dhcp_server = self._dhcp_server_for_instance(instance) - ipv4_rules += ['-s %s -p udp --sport 67 --dport 68 ' - '-j ACCEPT' % (dhcp_server,)] + dhcp_servers = [network['gateway'] for (network, _m) in network_info] + + for dhcp_server in dhcp_servers: + ipv4_rules.append('-s %s -p udp --sport 67 --dport 68 ' + '-j ACCEPT' % (dhcp_server,)) #Allow project network traffic if FLAGS.allow_project_net_traffic: - cidr = self._project_cidr_for_instance(instance) - ipv4_rules += ['-s %s -j ACCEPT' % (cidr,)] + cidrs = [network['cidr'] for (network, _m) in network_info] + for cidr in cidrs: + ipv4_rules.append('-s %s -j ACCEPT' % (cidr,)) # We wrap these in FLAGS.use_ipv6 because they might cause # a DB lookup. The other ones are just list operations, so # they're not worth the clutter. if FLAGS.use_ipv6: # Allow RA responses - gateway_v6 = self._gateway_v6_for_instance(instance) - if gateway_v6: - ipv6_rules += ['-s %s/128 -p icmpv6 -j ACCEPT' % (gateway_v6,)] + gateways_v6 = [network['gateway_v6'] for (network, _) in + network_info] + for gateway_v6 in gateways_v6: + ipv6_rules.append( + '-s %s/128 -p icmpv6 -j ACCEPT' % (gateway_v6,)) #Allow project network traffic if FLAGS.allow_project_net_traffic: - cidrv6 = self._project_cidrv6_for_instance(instance) - ipv6_rules += ['-s %s -j ACCEPT' % (cidrv6,)] + cidrv6s = [network['cidr_v6'] for (network, _m) + in network_info] + + for cidrv6 in cidrv6s: + ipv6_rules.append('-s %s -j ACCEPT' % (cidrv6,)) security_groups = db.security_group_get_by_instance(ctxt, instance['id']) @@ -1825,31 +1950,3 @@ class IptablesFirewallDriver(FirewallDriver): def _instance_chain_name(self, instance): return 'inst-%s' % (instance['id'],) - - def _ip_for_instance(self, instance): - return db.instance_get_fixed_address(context.get_admin_context(), - instance['id']) - - def _ip_for_instance_v6(self, instance): - return db.instance_get_fixed_address_v6(context.get_admin_context(), - instance['id']) - - def _dhcp_server_for_instance(self, instance): - network = db.network_get_by_instance(context.get_admin_context(), - instance['id']) - return network['gateway'] - - def _gateway_v6_for_instance(self, instance): - network = db.network_get_by_instance(context.get_admin_context(), - instance['id']) - return network['gateway_v6'] - - def _project_cidr_for_instance(self, instance): - network = db.network_get_by_instance(context.get_admin_context(), - instance['id']) - return network['cidr'] - - def _project_cidrv6_for_instance(self, instance): - network = db.network_get_by_instance(context.get_admin_context(), - instance['id']) - return network['cidr_v6'] diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index af39a3def..419b9ad90 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -117,6 +117,10 @@ class VMOps(object): vm_ref = self._create_vm(instance, vdi_uuid, network_info) self._spawn(instance, vm_ref) + def spawn_rescue(self, instance): + """Spawn a rescue instance""" + self.spawn(instance) + def _create_vm(self, instance, vdi_uuid, network_info=None): """Create VM instance""" instance_name = instance.name @@ -543,7 +547,7 @@ class VMOps(object): vbd_refs = self._session.get_xenapi().VM.get_VBDs(rescue_vm_ref) for vbd_ref in vbd_refs: vbd_rec = self._session.get_xenapi().VBD.get_record(vbd_ref) - if vbd_rec["userdevice"] == "1": # primary VBD is always 1 + if vbd_rec.get("userdevice", None) == "1": # VBD is always 1 VMHelper.unplug_vbd(self._session, vbd_ref) VMHelper.destroy_vbd(self._session, vbd_ref) @@ -680,18 +684,18 @@ class VMOps(object): """ rescue_vm_ref = VMHelper.lookup(self._session, - instance.name + "-rescue") + "%s-rescue" % instance.name) if rescue_vm_ref: raise RuntimeError(_( "Instance is already in Rescue Mode: %s" % instance.name)) - vm_ref = self._get_vm_opaque_ref(instance) + vm_ref = VMHelper.lookup(self._session, instance.name) self._shutdown(instance, vm_ref) self._acquire_bootlock(vm_ref) instance._rescue = True - self.spawn(instance) - rescue_vm_ref = self._get_vm_opaque_ref(instance) + self.spawn_rescue(instance) + rescue_vm_ref = VMHelper.lookup(self._session, instance.name) vbd_ref = self._session.get_xenapi().VM.get_VBDs(vm_ref)[0] vdi_ref = self._session.get_xenapi().VBD.get_record(vbd_ref)["VDI"] @@ -708,13 +712,13 @@ class VMOps(object): """ rescue_vm_ref = VMHelper.lookup(self._session, - instance.name + "-rescue") + "%s-rescue" % instance.name) if not rescue_vm_ref: raise exception.NotFound(_( "Instance is not in Rescue Mode: %s" % instance.name)) - original_vm_ref = self._get_vm_opaque_ref(instance) + original_vm_ref = VMHelper.lookup(self._session, instance.name) instance._rescue = False self._destroy_rescue_instance(rescue_vm_ref) @@ -727,24 +731,24 @@ class VMOps(object): in rescue mode for >= the provided timeout """ last_ran = self.poll_rescue_last_ran - if last_ran: - if not utils.is_older_than(last_ran, timeout): - # Do not run. Let's bail. - return - else: - # Update the time tracker and proceed. - self.poll_rescue_last_ran = utils.utcnow() - else: + if not last_ran: # We need a base time to start tracking. self.poll_rescue_last_ran = utils.utcnow() return + if not utils.is_older_than(last_ran, timeout): + # Do not run. Let's bail. + return + + # Update the time tracker and proceed. + self.poll_rescue_last_ran = utils.utcnow() + rescue_vms = [] for instance in self.list_instances(): if instance.endswith("-rescue"): rescue_vms.append(dict(name=instance, - vm_ref=VMHelper.lookup(self._session, - instance))) + vm_ref=VMHelper.lookup(self._session, + instance))) for vm in rescue_vms: rescue_name = vm["name"] diff --git a/nova/volume/api.py b/nova/volume/api.py index 2f4494845..4b4bb9dc5 100644 --- a/nova/volume/api.py +++ b/nova/volume/api.py @@ -82,7 +82,8 @@ class API(base.Base): self.db.volume_update(context, volume_id, fields) def get(self, context, volume_id): - return self.db.volume_get(context, volume_id) + rv = self.db.volume_get(context, volume_id) + return dict(rv.iteritems()) def get_all(self, context): if context.is_admin: diff --git a/nova/volume/manager.py b/nova/volume/manager.py index 9dea35b35..2178389ce 100644 --- a/nova/volume/manager.py +++ b/nova/volume/manager.py @@ -64,14 +64,15 @@ flags.DEFINE_boolean('use_local_volumes', True, 'if True, will not discover local volumes') -class VolumeManager(manager.Manager): +class VolumeManager(manager.SchedulerDependentManager): """Manages attachable block storage devices.""" def __init__(self, volume_driver=None, *args, **kwargs): """Load the driver from the one specified in args, or from flags.""" if not volume_driver: volume_driver = FLAGS.volume_driver self.driver = utils.import_object(volume_driver) - super(VolumeManager, self).__init__(*args, **kwargs) + super(VolumeManager, self).__init__(service_name='volume', + *args, **kwargs) # NOTE(vish): Implementation specific db handling is done # by the driver. self.driver.db = self.db |
