summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xbin/nova-cells53
-rw-r--r--nova/cells/__init__.py19
-rw-r--r--nova/cells/driver.py41
-rw-r--r--nova/cells/manager.py136
-rw-r--r--nova/cells/messaging.py1047
-rw-r--r--nova/cells/opts.py44
-rw-r--r--nova/cells/rpc_driver.py165
-rw-r--r--nova/cells/rpcapi.py138
-rw-r--r--nova/cells/scheduler.py136
-rw-r--r--nova/cells/state.py346
-rw-r--r--nova/compute/api.py8
-rw-r--r--nova/compute/cells_api.py471
-rw-r--r--nova/db/api.py69
-rw-r--r--nova/exception.py28
-rw-r--r--nova/tests/cells/__init__.py19
-rw-r--r--nova/tests/cells/fakes.py191
-rw-r--r--nova/tests/cells/test_cells_manager.py151
-rw-r--r--nova/tests/cells/test_cells_messaging.py913
-rw-r--r--nova/tests/cells/test_cells_rpc_driver.py218
-rw-r--r--nova/tests/cells/test_cells_rpcapi.py206
-rw-r--r--nova/tests/cells/test_cells_scheduler.py206
-rw-r--r--nova/tests/compute/test_compute.py4
-rw-r--r--nova/tests/compute/test_compute_cells.py99
-rw-r--r--setup.py1
24 files changed, 4696 insertions, 13 deletions
diff --git a/bin/nova-cells b/bin/nova-cells
new file mode 100755
index 000000000..a7e16ef53
--- /dev/null
+++ b/bin/nova-cells
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Starter script for Nova Cells Service."""
+
+import eventlet
+eventlet.monkey_patch()
+
+import os
+import sys
+
+# If ../nova/__init__.py exists, add ../ to Python search path, so that
+# it will override what happens to be installed in /usr/(local/)lib/python...
+possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
+ os.pardir,
+ os.pardir))
+if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
+ sys.path.insert(0, possible_topdir)
+
+from nova import config
+from nova.openstack.common import cfg
+from nova.openstack.common import log as logging
+from nova import service
+from nova import utils
+
+CONF = cfg.CONF
+CONF.import_opt('topic', 'nova.cells.opts', group='cells')
+CONF.import_opt('manager', 'nova.cells.opts', group='cells')
+
+if __name__ == '__main__':
+ config.parse_args(sys.argv)
+ logging.setup('nova')
+ utils.monkey_patch()
+ server = service.Service.create(binary='nova-cells',
+ topic=CONF.cells.topic,
+ manager=CONF.cells.manager)
+ service.serve(server)
+ service.wait()
diff --git a/nova/cells/__init__.py b/nova/cells/__init__.py
new file mode 100644
index 000000000..47d21a14b
--- /dev/null
+++ b/nova/cells/__init__.py
@@ -0,0 +1,19 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Cells
+"""
diff --git a/nova/cells/driver.py b/nova/cells/driver.py
new file mode 100644
index 000000000..04e29dddf
--- /dev/null
+++ b/nova/cells/driver.py
@@ -0,0 +1,41 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Cells Communication Driver
+"""
+
+
+class BaseCellsDriver(object):
+ """The base class for cells communication.
+
+ One instance of this class will be created for every neighbor cell
+ that we find in the DB and it will be associated with the cell in
+ its CellState.
+
+ One instance is also created by the cells manager for setting up
+ the consumers.
+ """
+ def start_consumers(self, msg_runner):
+ """Start any consumers the driver may need."""
+ raise NotImplementedError()
+
+ def stop_consumers(self):
+ """Stop consuming messages."""
+ raise NotImplementedError()
+
+ def send_message_to_cell(self, cell_state, message):
+ """Send a message to a cell."""
+ raise NotImplementedError()
diff --git a/nova/cells/manager.py b/nova/cells/manager.py
new file mode 100644
index 000000000..a1352601c
--- /dev/null
+++ b/nova/cells/manager.py
@@ -0,0 +1,136 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Cells Service Manager
+"""
+
+from nova.cells import messaging
+from nova.cells import state as cells_state
+from nova import context
+from nova import manager
+from nova.openstack.common import cfg
+from nova.openstack.common import importutils
+from nova.openstack.common import log as logging
+
+cell_manager_opts = [
+ cfg.StrOpt('driver',
+ default='nova.cells.rpc_driver.CellsRPCDriver',
+ help='Cells communication driver to use'),
+]
+
+
+LOG = logging.getLogger(__name__)
+
+CONF = cfg.CONF
+CONF.register_opts(cell_manager_opts, group='cells')
+
+
+class CellsManager(manager.Manager):
+ """The nova-cells manager class. This class defines RPC
+ methods that the local cell may call. This class is NOT used for
+ messages coming from other cells. That communication is
+ driver-specific.
+
+ Communication to other cells happens via the messaging module. The
+ MessageRunner from that module will handle routing the message to
+ the correct cell via the communications driver. Most methods below
+ create 'targeted' (where we want to route a message to a specific cell)
+ or 'broadcast' (where we want a message to go to multiple cells)
+ messages.
+
+ Scheduling requests get passed to the scheduler class.
+ """
+ RPC_API_VERSION = '1.0'
+
+ def __init__(self, *args, **kwargs):
+ # Mostly for tests.
+ cell_state_manager = kwargs.pop('cell_state_manager', None)
+ super(CellsManager, self).__init__(*args, **kwargs)
+ if cell_state_manager is None:
+ cell_state_manager = cells_state.CellStateManager
+ self.state_manager = cell_state_manager()
+ self.msg_runner = messaging.MessageRunner(self.state_manager)
+ cells_driver_cls = importutils.import_class(
+ CONF.cells.driver)
+ self.driver = cells_driver_cls()
+
+ def post_start_hook(self):
+ """Have the driver start its consumers for inter-cell communication.
+ Also ask our child cells for their capacities and capabilities so
+ we get them more quickly than just waiting for the next periodic
+ update. Receiving the updates from the children will cause us to
+ update our parents. If we don't have any children, just update
+ our parents immediately.
+ """
+ # FIXME(comstud): There's currently no hooks when services are
+ # stopping, so we have no way to stop consumers cleanly.
+ self.driver.start_consumers(self.msg_runner)
+ ctxt = context.get_admin_context()
+ if self.state_manager.get_child_cells():
+ self.msg_runner.ask_children_for_capabilities(ctxt)
+ self.msg_runner.ask_children_for_capacities(ctxt)
+ else:
+ self._update_our_parents(ctxt)
+
+ @manager.periodic_task
+ def _update_our_parents(self, ctxt):
+ """Update our parent cells with our capabilities and capacity
+ if we're at the bottom of the tree.
+ """
+ self.msg_runner.tell_parents_our_capabilities(ctxt)
+ self.msg_runner.tell_parents_our_capacities(ctxt)
+
+ def schedule_run_instance(self, ctxt, host_sched_kwargs):
+ """Pick a cell (possibly ourselves) to build new instance(s)
+ and forward the request accordingly.
+ """
+ # Target is ourselves first.
+ our_cell = self.state_manager.get_my_state()
+ self.msg_runner.schedule_run_instance(ctxt, our_cell,
+ host_sched_kwargs)
+
+ def run_compute_api_method(self, ctxt, cell_name, method_info, call):
+ """Call a compute API method in a specific cell."""
+ response = self.msg_runner.run_compute_api_method(ctxt,
+ cell_name,
+ method_info,
+ call)
+ if call:
+ return response.value_or_raise()
+
+ def instance_update_at_top(self, ctxt, instance):
+ """Update an instance at the top level cell."""
+ self.msg_runner.instance_update_at_top(ctxt, instance)
+
+ def instance_destroy_at_top(self, ctxt, instance):
+ """Destroy an instance at the top level cell."""
+ self.msg_runner.instance_destroy_at_top(ctxt, instance)
+
+ def instance_delete_everywhere(self, ctxt, instance, delete_type):
+ """This is used by API cell when it didn't know what cell
+ an instance was in, but the instance was requested to be
+ deleted or soft_deleted. So, we'll broadcast this everywhere.
+ """
+ self.msg_runner.instance_delete_everywhere(ctxt, instance,
+ delete_type)
+
+ def instance_fault_create_at_top(self, ctxt, instance_fault):
+ """Create an instance fault at the top level cell."""
+ self.msg_runner.instance_fault_create_at_top(ctxt, instance_fault)
+
+ def bw_usage_update_at_top(self, ctxt, bw_update_info):
+ """Update bandwidth usage at top level cell."""
+ self.msg_runner.bw_usage_update_at_top(ctxt, bw_update_info)
diff --git a/nova/cells/messaging.py b/nova/cells/messaging.py
new file mode 100644
index 000000000..e5617e742
--- /dev/null
+++ b/nova/cells/messaging.py
@@ -0,0 +1,1047 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Cell messaging module.
+
+This module defines the different message types that are passed between
+cells and the methods that they can call when the target cell has been
+reached.
+
+The interface into this module is the MessageRunner class.
+"""
+import sys
+
+from eventlet import queue
+
+from nova.cells import state as cells_state
+from nova import compute
+from nova import context
+from nova.db import base
+from nova import exception
+from nova.openstack.common import cfg
+from nova.openstack.common import excutils
+from nova.openstack.common import importutils
+from nova.openstack.common import jsonutils
+from nova.openstack.common import log as logging
+from nova.openstack.common.rpc import common as rpc_common
+from nova.openstack.common import uuidutils
+from nova import utils
+
+
+cell_messaging_opts = [
+ cfg.IntOpt('max_hop_count',
+ default=10,
+ help='Maximum number of hops for cells routing.'),
+ cfg.StrOpt('scheduler',
+ default='nova.cells.scheduler.CellsScheduler',
+ help='Cells scheduler to use')]
+
+CONF = cfg.CONF
+CONF.import_opt('name', 'nova.cells.opts', group='cells')
+CONF.import_opt('call_timeout', 'nova.cells.opts', group='cells')
+CONF.register_opts(cell_messaging_opts, group='cells')
+
+LOG = logging.getLogger(__name__)
+
+# Separator used between cell names for the 'full cell name' and routing
+# path.
+_PATH_CELL_SEP = '!'
+
+
+def _reverse_path(path):
+ """Reverse a path. Used for sending responses upstream."""
+ path_parts = path.split(_PATH_CELL_SEP)
+ path_parts.reverse()
+ return _PATH_CELL_SEP.join(path_parts)
+
+
+def _response_cell_name_from_path(routing_path, neighbor_only=False):
+ """Reverse the routing_path. If we only want to send to our parent,
+ set neighbor_only to True.
+ """
+ path = _reverse_path(routing_path)
+ if not neighbor_only or len(path) == 1:
+ return path
+ return _PATH_CELL_SEP.join(path.split(_PATH_CELL_SEP)[:2])
+
+
+#
+# Message classes.
+#
+
+
+class _BaseMessage(object):
+ """Base message class. It defines data that is passed with every
+ single message through every cell.
+
+ Messages are JSON-ified before sending and turned back into a
+ class instance when being received.
+
+ Every message has a unique ID. This is used to route responses
+ back to callers. In the future, this might be used to detect
+ receiving the same message more than once.
+
+ routing_path is updated on every hop through a cell. The current
+ cell name is appended to it (cells are separated by
+ _PATH_CELL_SEP ('!')). This is used to tell if we've reached the
+ target cell and also to determine the source of a message for
+ responses by reversing it.
+
+ hop_count is incremented and compared against max_hop_count. The
+ only current usefulness of this is to break out of a routing loop
+ if someone has a broken config.
+
+ fanout means to send to all nova-cells services running in a cell.
+ This is useful for capacity and capability broadcasting as well
+ as making sure responses get back to the nova-cells service that
+ is waiting.
+ """
+
+ # Override message_type in a subclass
+ message_type = None
+
+ base_attrs_to_json = ['message_type',
+ 'ctxt',
+ 'method_name',
+ 'method_kwargs',
+ 'direction',
+ 'need_response',
+ 'fanout',
+ 'uuid',
+ 'routing_path',
+ 'hop_count',
+ 'max_hop_count']
+
+ def __init__(self, msg_runner, ctxt, method_name, method_kwargs,
+ direction, need_response=False, fanout=False, uuid=None,
+ routing_path=None, hop_count=0, max_hop_count=None,
+ **kwargs):
+ self.ctxt = ctxt
+ self.resp_queue = None
+ self.msg_runner = msg_runner
+ self.state_manager = msg_runner.state_manager
+ # Copy these.
+ self.base_attrs_to_json = self.base_attrs_to_json[:]
+ # Normally this would just be CONF.cells.name, but going through
+ # the msg_runner allows us to stub it more easily.
+ self.our_path_part = self.msg_runner.our_name
+ self.uuid = uuid
+ if self.uuid is None:
+ self.uuid = uuidutils.generate_uuid()
+ self.method_name = method_name
+ self.method_kwargs = method_kwargs
+ self.direction = direction
+ self.need_response = need_response
+ self.fanout = fanout
+ self.routing_path = routing_path
+ self.hop_count = hop_count
+ if max_hop_count is None:
+ max_hop_count = CONF.cells.max_hop_count
+ self.max_hop_count = max_hop_count
+ self.is_broadcast = False
+ self._append_hop()
+ # Each sub-class should set this when the message is inited
+ self.next_hops = []
+ self.resp_queue = None
+
+ def __repr__(self):
+ _dict = self._to_dict()
+ _dict.pop('method_kwargs')
+ return "<%s: %s>" % (self.__class__.__name__, _dict)
+
+ def _append_hop(self):
+ """Add our hop to the routing_path."""
+ routing_path = (self.routing_path and
+ self.routing_path + _PATH_CELL_SEP or '')
+ self.routing_path = routing_path + self.our_path_part
+ self.hop_count += 1
+
+ def _at_max_hop_count(self, do_raise=True):
+ """Check if we're at the max hop count. If we are and do_raise is
+ True, raise CellMaxHopCountReached. If we are at the max and
+ do_raise is False... return True, else False.
+ """
+ if self.hop_count >= self.max_hop_count:
+ if do_raise:
+ raise exception.CellMaxHopCountReached(
+ hop_count=self.hop_count)
+ return True
+ return False
+
+ def _process_locally(self):
+ """Its been determined that we should process this message in this
+ cell. Go through the MessageRunner to call the appropriate
+ method for this message. Catch the response and/or exception and
+ encode it within a Response instance. Return it so the caller
+ can potentially return it to another cell... or return it to
+ a caller waiting in this cell.
+ """
+ try:
+ resp_value = self.msg_runner._process_message_locally(self)
+ failure = False
+ except Exception as exc:
+ resp_value = sys.exc_info()
+ failure = True
+ LOG.exception(_("Error processing message locally: %(exc)s"),
+ locals())
+ return Response(self.routing_path, resp_value, failure)
+
+ def _setup_response_queue(self):
+ """Shortcut to creating a response queue in the MessageRunner."""
+ self.resp_queue = self.msg_runner._setup_response_queue(self)
+
+ def _cleanup_response_queue(self):
+ """Shortcut to deleting a response queue in the MessageRunner."""
+ if self.resp_queue:
+ self.msg_runner._cleanup_response_queue(self)
+ self.resp_queue = None
+
+ def _wait_for_json_responses(self, num_responses=1):
+ """Wait for response(s) to be put into the eventlet queue. Since
+ each queue entry actually contains a list of JSON-ified responses,
+ combine them all into a single list to return.
+
+ Destroy the eventlet queue when done.
+ """
+ if not self.resp_queue:
+ # Source is not actually expecting a response
+ return
+ responses = []
+ wait_time = CONF.cells.call_timeout
+ try:
+ for x in xrange(num_responses):
+ json_responses = self.resp_queue.get(timeout=wait_time)
+ responses.extend(json_responses)
+ except queue.Empty:
+ raise exception.CellTimeout()
+ finally:
+ self._cleanup_response_queue()
+ return responses
+
+ def _send_json_responses(self, json_responses, neighbor_only=False,
+ fanout=False):
+ """Send list of responses to this message. Responses passed here
+ are JSON-ified. Targeted messages have a single response while
+ Broadcast messages may have multiple responses.
+
+ If this cell was the source of the message, these responses will
+ be returned from self.process().
+
+ Otherwise, we will route the response to the source of the
+ request. If 'neighbor_only' is True, the response will be sent
+ to the neighbor cell, not the original requester. Broadcast
+ messages get aggregated at each hop, so neighbor_only will be
+ True for those messages.
+ """
+ if not self.need_response:
+ return
+ if self.source_is_us():
+ responses = []
+ for json_response in json_responses:
+ responses.append(Response.from_json(json_response))
+ return responses
+ direction = self.direction == 'up' and 'down' or 'up'
+ response_kwargs = {'orig_message': self.to_json(),
+ 'responses': json_responses}
+ target_cell = _response_cell_name_from_path(self.routing_path,
+ neighbor_only=neighbor_only)
+ response = self.msg_runner._create_response_message(self.ctxt,
+ direction, target_cell, self.uuid, response_kwargs,
+ fanout=fanout)
+ response.process()
+
+ def _send_response(self, response, neighbor_only=False):
+ """Send a response to this message. If the source of the
+ request was ourselves, just return the response. It'll be
+ passed back to the caller of self.process(). See DocString for
+ _send_json_responses() as it handles most of the real work for
+ this method.
+
+ 'response' is an instance of Response class.
+ """
+ if not self.need_response:
+ return
+ if self.source_is_us():
+ return response
+ self._send_json_responses([response.to_json()],
+ neighbor_only=neighbor_only)
+
+ def _send_response_from_exception(self, exc_info):
+ """Take an exception as returned from sys.exc_info(), encode
+ it in a Response, and send it.
+ """
+ response = Response(self.routing_path, exc_info, True)
+ return self._send_response(response)
+
+ def _to_dict(self):
+ """Convert a message to a dictionary. Only used internally."""
+ _dict = {}
+ for key in self.base_attrs_to_json:
+ _dict[key] = getattr(self, key)
+ return _dict
+
+ def to_json(self):
+ """Convert a message into JSON for sending to a sibling cell."""
+ _dict = self._to_dict()
+ # Convert context to dict.
+ _dict['ctxt'] = _dict['ctxt'].to_dict()
+ return jsonutils.dumps(_dict)
+
+ def source_is_us(self):
+ """Did this cell create this message?"""
+ return self.routing_path == self.our_path_part
+
+ def process(self):
+ """Process a message. Deal with it locally and/or forward it to a
+ sibling cell.
+
+ Override in a subclass.
+ """
+ raise NotImplementedError()
+
+
+class _TargetedMessage(_BaseMessage):
+ """A targeted message is a message that is destined for a specific
+ single cell.
+
+ 'target_cell' can be a full cell name like 'api!child-cell' or it can
+ be an instance of the CellState class if the target is a neighbor cell.
+ """
+ message_type = 'targeted'
+
+ def __init__(self, msg_runner, ctxt, method_name, method_kwargs,
+ direction, target_cell, **kwargs):
+ super(_TargetedMessage, self).__init__(msg_runner, ctxt,
+ method_name, method_kwargs, direction, **kwargs)
+ if isinstance(target_cell, cells_state.CellState):
+ # Neighbor cell or ourselves. Convert it to a 'full path'.
+ if target_cell.is_me:
+ target_cell = self.our_path_part
+ else:
+ target_cell = '%s%s%s' % (self.our_path_part,
+ _PATH_CELL_SEP,
+ target_cell.name)
+ self.target_cell = target_cell
+ self.base_attrs_to_json.append('target_cell')
+
+ def _get_next_hop(self):
+ """Return the cell name for the next hop. If the next hop is
+ the current cell, return None.
+ """
+ if self.target_cell == self.routing_path:
+ return self.state_manager.my_cell_state
+ target_cell = self.target_cell
+ routing_path = self.routing_path
+ current_hops = routing_path.count(_PATH_CELL_SEP)
+ next_hop_num = current_hops + 1
+ dest_hops = target_cell.count(_PATH_CELL_SEP)
+ if dest_hops < current_hops:
+ reason = _("destination is %(target_cell)s but routing_path "
+ "is %(routing_path)s") % locals()
+ raise exception.CellRoutingInconsistency(reason=reason)
+ dest_name_parts = target_cell.split(_PATH_CELL_SEP)
+ if (_PATH_CELL_SEP.join(dest_name_parts[:next_hop_num]) !=
+ routing_path):
+ reason = _("destination is %(target_cell)s but routing_path "
+ "is %(routing_path)s") % locals()
+ raise exception.CellRoutingInconsistency(reason=reason)
+ next_hop_name = dest_name_parts[next_hop_num]
+ if self.direction == 'up':
+ next_hop = self.state_manager.get_parent_cell(next_hop_name)
+ else:
+ next_hop = self.state_manager.get_child_cell(next_hop_name)
+ if not next_hop:
+ cell_type = 'parent' if self.direction == 'up' else 'child'
+ reason = _("Unknown %(cell_type)s when routing to "
+ "%(target_cell)s") % locals()
+ raise exception.CellRoutingInconsistency(reason=reason)
+ return next_hop
+
+ def process(self):
+ """Process a targeted message. This is called for all cells
+ that touch this message. If the local cell is the one that
+ created this message, we reply directly with a Response instance.
+ If the local cell is not the target, an eventlet queue is created
+ and we wait for the response to show up via another thread
+ receiving the Response back.
+
+ Responses to targeted messages are routed directly back to the
+ source. No eventlet queues are created in intermediate hops.
+
+ All exceptions for processing the message across the whole
+ routing path are caught and encoded within the Response and
+ returned to the caller.
+ """
+ try:
+ next_hop = self._get_next_hop()
+ except Exception as exc:
+ exc_info = sys.exc_info()
+ LOG.exception(_("Error locating next hop for message: %(exc)s"),
+ locals())
+ return self._send_response_from_exception(exc_info)
+
+ if next_hop.is_me:
+ # Final destination.
+ response = self._process_locally()
+ return self._send_response(response)
+
+ # Need to forward via neighbor cell.
+ if self.need_response and self.source_is_us():
+ # A response is needed and the source of the message is
+ # this cell. Create the eventlet queue.
+ self._setup_response_queue()
+ wait_for_response = True
+ else:
+ wait_for_response = False
+
+ try:
+ # This is inside the try block, so we can encode the
+ # exception and return it to the caller.
+ if self.hop_count >= self.max_hop_count:
+ raise exception.CellMaxHopCountReached(
+ hop_count=self.hop_count)
+ next_hop.send_message(self)
+ except Exception as exc:
+ exc_info = sys.exc_info()
+ err_str = _("Failed to send message to cell: %(next_hop)s: "
+ "%(exc)s")
+ LOG.exception(err_str, locals())
+ self._cleanup_response_queue()
+ return self._send_response_from_exception(exc_info)
+
+ if wait_for_response:
+ # Targeted messages only have 1 response.
+ remote_response = self._wait_for_json_responses()[0]
+ return Response.from_json(remote_response)
+
+
+class _BroadcastMessage(_BaseMessage):
+ """A broadcast message. This means to call a method in every single
+ cell going in a certain direction.
+ """
+ message_type = 'broadcast'
+
+ def __init__(self, msg_runner, ctxt, method_name, method_kwargs,
+ direction, run_locally=True, **kwargs):
+ super(_BroadcastMessage, self).__init__(msg_runner, ctxt,
+ method_name, method_kwargs, direction, **kwargs)
+ # The local cell creating this message has the option
+ # to be able to process the message locally or not.
+ self.run_locally = run_locally
+ self.is_broadcast = True
+
+ def _get_next_hops(self):
+ """Set the next hops and return the number of hops. The next
+ hops may include ourself.
+ """
+ if self.hop_count >= self.max_hop_count:
+ return []
+ if self.direction == 'down':
+ return self.state_manager.get_child_cells()
+ else:
+ return self.state_manager.get_parent_cells()
+
+ def _send_to_cells(self, target_cells):
+ """Send a message to multiple cells."""
+ for cell in target_cells:
+ cell.send_message(self)
+
+ def _send_json_responses(self, json_responses):
+ """Responses to broadcast messages always need to go to the
+ neighbor cell from which we received this message. That
+ cell aggregates the responses and makes sure to forward them
+ to the correct source.
+ """
+ return super(_BroadcastMessage, self)._send_json_responses(
+ json_responses, neighbor_only=True, fanout=True)
+
+ def process(self):
+ """Process a broadcast message. This is called for all cells
+ that touch this message.
+
+ The message is sent to all cells in the certain direction and
+ the creator of this message has the option of whether or not
+ to process it locally as well.
+
+ If responses from all cells are required, each hop creates an
+ eventlet queue and waits for responses from its immediate
+ neighbor cells. All responses are then aggregated into a
+ single list and are returned to the neighbor cell until the
+ source is reached.
+
+ When the source is reached, a list of Response instances are
+ returned to the caller.
+
+ All exceptions for processing the message across the whole
+ routing path are caught and encoded within the Response and
+ returned to the caller. It is possible to get a mix of
+ successful responses and failure responses. The caller is
+ responsible for dealing with this.
+ """
+ try:
+ next_hops = self._get_next_hops()
+ except Exception as exc:
+ exc_info = sys.exc_info()
+ LOG.exception(_("Error locating next hops for message: %(exc)s"),
+ locals())
+ return self._send_response_from_exception(exc_info)
+
+ # Short circuit if we don't need to respond
+ if not self.need_response:
+ if self.run_locally:
+ self._process_locally()
+ self._send_to_cells(next_hops)
+ return
+
+ # We'll need to aggregate all of the responses (from ourself
+ # and our sibling cells) into 1 response
+ try:
+ self._setup_response_queue()
+ self._send_to_cells(next_hops)
+ except Exception as exc:
+ # Error just trying to send to cells. Send a single response
+ # with the failure.
+ exc_info = sys.exc_info()
+ LOG.exception(_("Error sending message to next hops: %(exc)s"),
+ locals())
+ self._cleanup_response_queue()
+ return self._send_response_from_exception(exc_info)
+
+ if self.run_locally:
+ # Run locally and store the Response.
+ local_response = self._process_locally()
+ else:
+ local_response = None
+
+ try:
+ remote_responses = self._wait_for_json_responses(
+ num_responses=len(next_hops))
+ except Exception as exc:
+ # Error waiting for responses, most likely a timeout.
+ # Send a single response back with the failure.
+ exc_info = sys.exc_info()
+ err_str = _("Error waiting for responses from neighbor cells: "
+ "%(exc)s")
+ LOG.exception(err_str, locals())
+ return self._send_response_from_exception(exc_info)
+
+ if local_response:
+ remote_responses.append(local_response.to_json())
+ return self._send_json_responses(remote_responses)
+
+
+class _ResponseMessage(_TargetedMessage):
+ """A response message is really just a special targeted message,
+ saying to call 'parse_responses' when we reach the source of a 'call'.
+
+ The 'fanout' attribute on this message may be true if we're responding
+ to a broadcast or if we're about to respond to the source of an
+ original target message. Because multiple nova-cells services may
+ be running within a cell, we need to make sure the response gets
+ back to the correct one, so we have to fanout.
+ """
+ message_type = 'response'
+
+ def __init__(self, msg_runner, ctxt, method_name, method_kwargs,
+ direction, target_cell, response_uuid, **kwargs):
+ super(_ResponseMessage, self).__init__(msg_runner, ctxt,
+ method_name, method_kwargs, direction, target_cell, **kwargs)
+ self.response_uuid = response_uuid
+ self.base_attrs_to_json.append('response_uuid')
+
+ def process(self):
+ """Process a response. If the target is the local cell, process
+ the response here. Otherwise, forward it to where it needs to
+ go.
+ """
+ next_hop = self._get_next_hop()
+ if next_hop.is_me:
+ self._process_locally()
+ return
+ if self.fanout is False:
+ # Really there's 1 more hop on each of these below, but
+ # it doesn't matter for this logic.
+ target_hops = self.target_cell.count(_PATH_CELL_SEP)
+ current_hops = self.routing_path.count(_PATH_CELL_SEP)
+ if current_hops + 1 == target_hops:
+ # Next hop is the target.. so we must fanout. See
+ # DocString above.
+ self.fanout = True
+ next_hop.send_message(self)
+
+
+#
+# Methods that may be called when processing messages after reaching
+# a target cell.
+#
+
+
+class _BaseMessageMethods(base.Base):
+ """Base class for defining methods by message types."""
+ def __init__(self, msg_runner):
+ super(_BaseMessageMethods, self).__init__()
+ self.msg_runner = msg_runner
+ self.state_manager = msg_runner.state_manager
+ self.compute_api = compute.API()
+
+
+class _ResponseMessageMethods(_BaseMessageMethods):
+ """Methods that are called from a ResponseMessage. There's only
+ 1 method (parse_responses) and it is called when the message reaches
+ the source of a 'call'. All we do is stuff the response into the
+ eventlet queue to signal the caller that's waiting.
+ """
+ def parse_responses(self, message, orig_message, responses):
+ self.msg_runner._put_response(message.response_uuid,
+ responses)
+
+
+class _TargetedMessageMethods(_BaseMessageMethods):
+ """These are the methods that can be called when routing a message
+ to a specific cell.
+ """
+ def __init__(self, *args, **kwargs):
+ super(_TargetedMessageMethods, self).__init__(*args, **kwargs)
+
+ def schedule_run_instance(self, message, host_sched_kwargs):
+ """Parent cell told us to schedule new instance creation."""
+ self.msg_runner.scheduler.run_instance(message, host_sched_kwargs)
+
+ def run_compute_api_method(self, message, method_info):
+ """Run a method in the compute api class."""
+ method = method_info['method']
+ fn = getattr(self.compute_api, method, None)
+ if not fn:
+ detail = _("Unknown method '%(method)s' in compute API")
+ raise exception.CellServiceAPIMethodNotFound(
+ detail=detail % locals())
+ args = list(method_info['method_args'])
+ # 1st arg is instance_uuid that we need to turn into the
+ # instance object.
+ instance_uuid = args[0]
+ try:
+ instance = self.db.instance_get_by_uuid(message.ctxt,
+ instance_uuid)
+ except exception.InstanceNotFound:
+ with excutils.save_and_reraise_exception():
+ # Must be a race condition. Let's try to resolve it by
+ # telling the top level cells that this instance doesn't
+ # exist.
+ instance = {'uuid': instance_uuid}
+ self.msg_runner.instance_destroy_at_top(message.ctxt,
+ instance)
+ args[0] = instance
+ return fn(message.ctxt, *args, **method_info['method_kwargs'])
+
+ def update_capabilities(self, message, cell_name, capabilities):
+ """A child cell told us about their capabilities."""
+ LOG.debug(_("Received capabilities from child cell "
+ "%(cell_name)s: %(capabilities)s"), locals())
+ self.state_manager.update_cell_capabilities(cell_name,
+ capabilities)
+ # Go ahead and update our parents now that a child updated us
+ self.msg_runner.tell_parents_our_capabilities(message.ctxt)
+
+ def update_capacities(self, message, cell_name, capacities):
+ """A child cell told us about their capacity."""
+ LOG.debug(_("Received capacities from child cell "
+ "%(cell_name)s: %(capacities)s"), locals())
+ self.state_manager.update_cell_capacities(cell_name,
+ capacities)
+ # Go ahead and update our parents now that a child updated us
+ self.msg_runner.tell_parents_our_capacities(message.ctxt)
+
+ def announce_capabilities(self, message):
+ """A parent cell has told us to send our capabilities, so let's
+ do so.
+ """
+ self.msg_runner.tell_parents_our_capabilities(message.ctxt)
+
+ def announce_capacities(self, message):
+ """A parent cell has told us to send our capacity, so let's
+ do so.
+ """
+ self.msg_runner.tell_parents_our_capacities(message.ctxt)
+
+
+class _BroadcastMessageMethods(_BaseMessageMethods):
+ """These are the methods that can be called as a part of a broadcast
+ message.
+ """
+ def _at_the_top(self):
+ """Are we the API level?"""
+ return not self.state_manager.get_parent_cells()
+
+ def instance_update_at_top(self, message, instance, **kwargs):
+ """Update an instance in the DB if we're a top level cell."""
+ if not self._at_the_top():
+ return
+ instance_uuid = instance['uuid']
+ routing_path = message.routing_path
+ instance['cell_name'] = _reverse_path(routing_path)
+ # Remove things that we can't update in the top level cells.
+ # 'cell_name' is included in this list.. because we'll set it
+ # ourselves based on the reverse of the routing path. metadata
+ # is only updated in the API cell, so we don't listen to what
+ # the child cell tells us.
+ items_to_remove = ['id', 'security_groups', 'instance_type',
+ 'volumes', 'cell_name', 'name', 'metadata']
+ for key in items_to_remove:
+ instance.pop(key, None)
+
+ # Fixup info_cache. We'll have to update this separately if
+ # it exists.
+ info_cache = instance.pop('info_cache', None)
+ if info_cache is not None:
+ info_cache.pop('id', None)
+ info_cache.pop('instance', None)
+
+ # Fixup system_metadata (should be a dict for update, not a list)
+ if ('system_metadata' in instance and
+ isinstance(instance['system_metadata'], list)):
+ sys_metadata = dict([(md['key'], md['value'])
+ for md in instance['system_metadata']])
+ instance['system_metadata'] = sys_metadata
+
+ LOG.debug(_("Got update for instance %(instance_uuid)s: "
+ "%(instance)s") % locals())
+
+ # It's possible due to some weird condition that the instance
+ # was already set as deleted... so we'll attempt to update
+ # it with permissions that allows us to read deleted.
+ with utils.temporary_mutation(message.ctxt, read_deleted="yes"):
+ try:
+ self.db.instance_update(message.ctxt, instance_uuid,
+ instance, update_cells=False)
+ except exception.NotFound:
+ # FIXME(comstud): Strange. Need to handle quotas here,
+ # if we actually want this code to remain..
+ self.db.instance_create(message.ctxt, instance)
+ if info_cache:
+ self.db.instance_info_cache_update(message.ctxt, instance_uuid,
+ info_cache, update_cells=False)
+
+ def instance_destroy_at_top(self, message, instance, **kwargs):
+ """Destroy an instance from the DB if we're a top level cell."""
+ if not self._at_the_top():
+ return
+ instance_uuid = instance['uuid']
+ LOG.debug(_("Got update to delete instance %(instance_uuid)s") %
+ locals())
+ try:
+ self.db.instance_destroy(message.ctxt, instance_uuid,
+ update_cells=False)
+ except exception.InstanceNotFound:
+ pass
+
+ def instance_delete_everywhere(self, message, instance, delete_type,
+ **kwargs):
+ """Call compute API delete() or soft_delete() in every cell.
+ This is used when the API cell doesn't know what cell an instance
+ belongs to but the instance was requested to be deleted or
+ soft-deleted. So, we'll run it everywhere.
+ """
+ LOG.debug(_("Got broadcast to %(delete_type)s delete instance"),
+ locals(), instance=instance)
+ if delete_type == 'soft':
+ self.compute_api.soft_delete(message.ctxt, instance)
+ else:
+ self.compute_api.delete(message.ctxt, instance)
+
+ def instance_fault_create_at_top(self, message, instance_fault, **kwargs):
+ """Destroy an instance from the DB if we're a top level cell."""
+ if not self._at_the_top():
+ return
+ items_to_remove = ['id']
+ for key in items_to_remove:
+ instance_fault.pop(key, None)
+ log_str = _("Got message to create instance fault: "
+ "%(instance_fault)s")
+ LOG.debug(log_str, locals())
+ self.db.instance_fault_create(message.ctxt, instance_fault)
+
+ def bw_usage_update_at_top(self, message, bw_update_info, **kwargs):
+ """Update Bandwidth usage in the DB if we're a top level cell."""
+ if not self._at_the_top():
+ return
+ self.db.bw_usage_update(message.ctxt, **bw_update_info)
+
+
+_CELL_MESSAGE_TYPE_TO_MESSAGE_CLS = {'targeted': _TargetedMessage,
+ 'broadcast': _BroadcastMessage,
+ 'response': _ResponseMessage}
+_CELL_MESSAGE_TYPE_TO_METHODS_CLS = {'targeted': _TargetedMessageMethods,
+ 'broadcast': _BroadcastMessageMethods,
+ 'response': _ResponseMessageMethods}
+
+
+#
+# Below are the public interfaces into this module.
+#
+
+
+class MessageRunner(object):
+ """This class is the main interface into creating messages and
+ processing them.
+
+ Public methods in this class are typically called by the CellsManager
+ to create a new message and process it with the exception of
+ 'message_from_json' which should be used by CellsDrivers to convert
+ a JSONified message it has received back into the appropriate Message
+ class.
+
+ Private methods are used internally when we need to keep some
+ 'global' state. For instance, eventlet queues used for responses are
+ held in this class. Also, when a Message is process()ed above and
+ it's determined we should take action locally,
+ _process_message_locally() will be called.
+
+ When needing to add a new method to call in a Cell2Cell message,
+ define the new method below and also add it to the appropriate
+ MessageMethods class where the real work will be done.
+ """
+
+ def __init__(self, state_manager):
+ self.state_manager = state_manager
+ cells_scheduler_cls = importutils.import_class(
+ CONF.cells.scheduler)
+ self.scheduler = cells_scheduler_cls(self)
+ self.response_queues = {}
+ self.methods_by_type = {}
+ self.our_name = CONF.cells.name
+ for msg_type, cls in _CELL_MESSAGE_TYPE_TO_METHODS_CLS.iteritems():
+ self.methods_by_type[msg_type] = cls(self)
+
+ def _process_message_locally(self, message):
+ """Message processing will call this when its determined that
+ the message should be processed within this cell. Find the
+ method to call based on the message type, and call it. The
+ caller is responsible for catching exceptions and returning
+ results to cells, if needed.
+ """
+ methods = self.methods_by_type[message.message_type]
+ fn = getattr(methods, message.method_name)
+ return fn(message, **message.method_kwargs)
+
+ def _put_response(self, response_uuid, response):
+ """Put a response into a response queue. This is called when
+ a _ResponseMessage is processed in the cell that initiated a
+ 'call' to another cell.
+ """
+ resp_queue = self.response_queues.get(response_uuid)
+ if not resp_queue:
+ # Response queue is gone. We must have restarted or we
+ # received a response after our timeout period.
+ return
+ resp_queue.put(response)
+
+ def _setup_response_queue(self, message):
+ """Set up an eventlet queue to use to wait for replies.
+
+ Replies come back from the target cell as a _ResponseMessage
+ being sent back to the source.
+ """
+ resp_queue = queue.Queue()
+ self.response_queues[message.uuid] = resp_queue
+ return resp_queue
+
+ def _cleanup_response_queue(self, message):
+ """Stop tracking the response queue either because we're
+ done receiving responses, or we've timed out.
+ """
+ try:
+ del self.response_queues[message.uuid]
+ except KeyError:
+ # Ignore if queue is gone already somehow.
+ pass
+
+ def _create_response_message(self, ctxt, direction, target_cell,
+ response_uuid, response_kwargs, **kwargs):
+ """Create a ResponseMessage. This is used internally within
+ the messaging module.
+ """
+ return _ResponseMessage(self, ctxt, 'parse_responses',
+ response_kwargs, direction, target_cell,
+ response_uuid, **kwargs)
+
+ def message_from_json(self, json_message):
+ """Turns a message in JSON format into an appropriate Message
+ instance. This is called when cells receive a message from
+ another cell.
+ """
+ message_dict = jsonutils.loads(json_message)
+ message_type = message_dict.pop('message_type')
+ # Need to convert context back.
+ ctxt = message_dict['ctxt']
+ message_dict['ctxt'] = context.RequestContext.from_dict(ctxt)
+ message_cls = _CELL_MESSAGE_TYPE_TO_MESSAGE_CLS[message_type]
+ return message_cls(self, **message_dict)
+
+ def ask_children_for_capabilities(self, ctxt):
+ """Tell child cells to send us capabilities. This is typically
+ called on startup of the nova-cells service.
+ """
+ child_cells = self.state_manager.get_child_cells()
+ for child_cell in child_cells:
+ message = _TargetedMessage(self, ctxt,
+ 'announce_capabilities',
+ dict(), 'down', child_cell)
+ message.process()
+
+ def ask_children_for_capacities(self, ctxt):
+ """Tell child cells to send us capacities. This is typically
+ called on startup of the nova-cells service.
+ """
+ child_cells = self.state_manager.get_child_cells()
+ for child_cell in child_cells:
+ message = _TargetedMessage(self, ctxt, 'announce_capacities',
+ dict(), 'down', child_cell)
+ message.process()
+
+ def tell_parents_our_capabilities(self, ctxt):
+ """Send our capabilities to parent cells."""
+ parent_cells = self.state_manager.get_parent_cells()
+ if not parent_cells:
+ return
+ my_cell_info = self.state_manager.get_my_state()
+ capabs = self.state_manager.get_our_capabilities()
+ LOG.debug(_("Updating parents with our capabilities: %(capabs)s"),
+ locals())
+ # We have to turn the sets into lists so they can potentially
+ # be json encoded when the raw message is sent.
+ for key, values in capabs.items():
+ capabs[key] = list(values)
+ method_kwargs = {'cell_name': my_cell_info.name,
+ 'capabilities': capabs}
+ for cell in parent_cells:
+ message = _TargetedMessage(self, ctxt, 'update_capabilities',
+ method_kwargs, 'up', cell, fanout=True)
+ message.process()
+
+ def tell_parents_our_capacities(self, ctxt):
+ """Send our capacities to parent cells."""
+ parent_cells = self.state_manager.get_parent_cells()
+ if not parent_cells:
+ return
+ my_cell_info = self.state_manager.get_my_state()
+ capacities = self.state_manager.get_our_capacities()
+ LOG.debug(_("Updating parents with our capacities: %(capacities)s"),
+ locals())
+ method_kwargs = {'cell_name': my_cell_info.name,
+ 'capacities': capacities}
+ for cell in parent_cells:
+ message = _TargetedMessage(self, ctxt, 'update_capacities',
+ method_kwargs, 'up', cell, fanout=True)
+ message.process()
+
+ def schedule_run_instance(self, ctxt, target_cell, host_sched_kwargs):
+ """Called by the scheduler to tell a child cell to schedule
+ a new instance for build.
+ """
+ method_kwargs = dict(host_sched_kwargs=host_sched_kwargs)
+ message = _TargetedMessage(self, ctxt, 'schedule_run_instance',
+ method_kwargs, 'down',
+ target_cell)
+ message.process()
+
+ def run_compute_api_method(self, ctxt, cell_name, method_info, call):
+ """Call a compute API method in a specific cell."""
+ message = _TargetedMessage(self, ctxt, 'run_compute_api_method',
+ dict(method_info=method_info), 'down',
+ cell_name, need_response=call)
+ return message.process()
+
+ def instance_update_at_top(self, ctxt, instance):
+ """Update an instance at the top level cell."""
+ message = _BroadcastMessage(self, ctxt, 'instance_update_at_top',
+ dict(instance=instance), 'up',
+ run_locally=False)
+ message.process()
+
+ def instance_destroy_at_top(self, ctxt, instance):
+ """Destroy an instance at the top level cell."""
+ message = _BroadcastMessage(self, ctxt, 'instance_destroy_at_top',
+ dict(instance=instance), 'up',
+ run_locally=False)
+ message.process()
+
+ def instance_delete_everywhere(self, ctxt, instance, delete_type):
+ """This is used by API cell when it didn't know what cell
+ an instance was in, but the instance was requested to be
+ deleted or soft_deleted. So, we'll broadcast this everywhere.
+ """
+ method_kwargs = dict(instance=instance, delete_type=delete_type)
+ message = _BroadcastMessage(self, ctxt,
+ 'instance_delete_everywhere',
+ method_kwargs, 'down',
+ run_locally=False)
+ message.process()
+
+ def instance_fault_create_at_top(self, ctxt, instance_fault):
+ """Create an instance fault at the top level cell."""
+ message = _BroadcastMessage(self, ctxt,
+ 'instance_fault_create_at_top',
+ dict(instance_fault=instance_fault),
+ 'up', run_locally=False)
+ message.process()
+
+ def bw_usage_update_at_top(self, ctxt, bw_update_info):
+ """Update bandwidth usage at top level cell."""
+ message = _BroadcastMessage(self, ctxt, 'bw_usage_update_at_top',
+ dict(bw_update_info=bw_update_info),
+ 'up', run_locally=False)
+ message.process()
+
+ @staticmethod
+ def get_message_types():
+ return _CELL_MESSAGE_TYPE_TO_MESSAGE_CLS.keys()
+
+
+class Response(object):
+ """Holds a response from a cell. If there was a failure, 'failure'
+ will be True and 'response' will contain an encoded Exception.
+ """
+ def __init__(self, cell_name, value, failure):
+ self.failure = failure
+ self.cell_name = cell_name
+ self.value = value
+
+ def to_json(self):
+ resp_value = self.value
+ if self.failure:
+ resp_value = rpc_common.serialize_remote_exception(resp_value,
+ log_failure=False)
+ _dict = {'cell_name': self.cell_name,
+ 'value': resp_value,
+ 'failure': self.failure}
+ return jsonutils.dumps(_dict)
+
+ @classmethod
+ def from_json(cls, json_message):
+ _dict = jsonutils.loads(json_message)
+ if _dict['failure']:
+ resp_value = rpc_common.deserialize_remote_exception(
+ CONF, _dict['value'])
+ _dict['value'] = resp_value
+ return cls(**_dict)
+
+ def value_or_raise(self):
+ if self.failure:
+ if isinstance(self.value, (tuple, list)):
+ raise self.value[0], self.value[1], self.value[2]
+ else:
+ raise self.value
+ return self.value
diff --git a/nova/cells/opts.py b/nova/cells/opts.py
new file mode 100644
index 000000000..45b453ebc
--- /dev/null
+++ b/nova/cells/opts.py
@@ -0,0 +1,44 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Global cells config options
+"""
+
+from nova.openstack.common import cfg
+
+cells_opts = [
+ cfg.BoolOpt('enable',
+ default=False,
+ help='Enable cell functionality'),
+ cfg.StrOpt('topic',
+ default='cells',
+ help='the topic cells nodes listen on'),
+ cfg.StrOpt('manager',
+ default='nova.cells.manager.CellsManager',
+ help='Manager for cells'),
+ cfg.StrOpt('name',
+ default='nova',
+ help='name of this cell'),
+ cfg.ListOpt('capabilities',
+ default=['hypervisor=xenserver;kvm', 'os=linux;windows'],
+ help='Key/Multi-value list with the capabilities of the cell'),
+ cfg.IntOpt('call_timeout',
+ default=60,
+ help='Seconds to wait for response from a call to a cell.'),
+]
+
+cfg.CONF.register_opts(cells_opts, group='cells')
diff --git a/nova/cells/rpc_driver.py b/nova/cells/rpc_driver.py
new file mode 100644
index 000000000..5e420aa8e
--- /dev/null
+++ b/nova/cells/rpc_driver.py
@@ -0,0 +1,165 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Cells RPC Communication Driver
+"""
+from nova.cells import driver
+from nova.openstack.common import cfg
+from nova.openstack.common import rpc
+from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
+from nova.openstack.common.rpc import proxy as rpc_proxy
+
+cell_rpc_driver_opts = [
+ cfg.StrOpt('rpc_driver_queue_base',
+ default='cells.intercell',
+ help="Base queue name to use when communicating between "
+ "cells. Various topics by message type will be "
+ "appended to this.")]
+
+CONF = cfg.CONF
+CONF.register_opts(cell_rpc_driver_opts, group='cells')
+CONF.import_opt('call_timeout', 'nova.cells.opts', group='cells')
+
+_CELL_TO_CELL_RPC_API_VERSION = '1.0'
+
+
+class CellsRPCDriver(driver.BaseCellsDriver):
+ """Driver for cell<->cell communication via RPC. This is used to
+ setup the RPC consumers as well as to send a message to another cell.
+
+ One instance of this class will be created for every neighbor cell
+ that we find in the DB and it will be associated with the cell in
+ its CellState.
+
+ One instance is also created by the cells manager for setting up
+ the consumers.
+ """
+ BASE_RPC_API_VERSION = _CELL_TO_CELL_RPC_API_VERSION
+
+ def __init__(self, *args, **kwargs):
+ super(CellsRPCDriver, self).__init__(*args, **kwargs)
+ self.rpc_connections = []
+ self.intercell_rpcapi = InterCellRPCAPI(
+ self.BASE_RPC_API_VERSION)
+
+ def _start_consumer(self, dispatcher, topic):
+ """Start an RPC consumer."""
+ conn = rpc.create_connection(new=True)
+ conn.create_consumer(topic, dispatcher, fanout=False)
+ conn.create_consumer(topic, dispatcher, fanout=True)
+ self.rpc_connections.append(conn)
+ conn.consume_in_thread()
+ return conn
+
+ def start_consumers(self, msg_runner):
+ """Start RPC consumers.
+
+ Start up 2 separate consumers for handling inter-cell
+ communication via RPC. Both handle the same types of
+ messages, but requests/replies are separated to solve
+ potential deadlocks. (If we used the same queue for both,
+ it's possible to exhaust the RPC thread pool while we wait
+ for replies.. such that we'd never consume a reply.)
+ """
+ topic_base = CONF.cells.rpc_driver_queue_base
+ proxy_manager = InterCellRPCDispatcher(msg_runner)
+ dispatcher = rpc_dispatcher.RpcDispatcher([proxy_manager])
+ for msg_type in msg_runner.get_message_types():
+ topic = '%s.%s' % (topic_base, msg_type)
+ self._start_consumer(dispatcher, topic)
+
+ def stop_consumers(self):
+ """Stop RPC consumers.
+
+ NOTE: Currently there's no hooks when stopping services
+ to have managers cleanup, so this is not currently called.
+ """
+ for conn in self.rpc_connections:
+ conn.close()
+
+ def send_message_to_cell(self, cell_state, message):
+ """Use the IntercellRPCAPI to send a message to a cell."""
+ self.intercell_rpcapi.send_message_to_cell(cell_state, message)
+
+
+class InterCellRPCAPI(rpc_proxy.RpcProxy):
+ """Client side of the Cell<->Cell RPC API.
+
+ The CellsRPCDriver uses this to make calls to another cell.
+
+ API version history:
+ 1.0 - Initial version.
+ """
+ def __init__(self, default_version):
+ super(InterCellRPCAPI, self).__init__(None, default_version)
+
+ @staticmethod
+ def _get_server_params_for_cell(next_hop):
+ """Turn the DB information for a cell into the parameters
+ needed for the RPC call.
+ """
+ param_map = {'username': 'username',
+ 'password': 'password',
+ 'rpc_host': 'hostname',
+ 'rpc_port': 'port',
+ 'rpc_virtual_host': 'virtual_host'}
+ server_params = {}
+ for source, target in param_map.items():
+ if next_hop.db_info[source]:
+ server_params[target] = next_hop.db_info[source]
+ return server_params
+
+ def send_message_to_cell(self, cell_state, message):
+ """Send a message to another cell by JSON-ifying the message and
+ making an RPC cast to 'process_message'. If the message says to
+ fanout, do it. The topic that is used will be
+ 'CONF.rpc_driver_queue_base.<message_type>'.
+ """
+ ctxt = message.ctxt
+ json_message = message.to_json()
+ rpc_message = self.make_msg('process_message', message=json_message)
+ topic_base = CONF.cells.rpc_driver_queue_base
+ topic = '%s.%s' % (topic_base, message.message_type)
+ server_params = self._get_server_params_for_cell(cell_state)
+ if message.fanout:
+ self.fanout_cast_to_server(ctxt, server_params,
+ rpc_message, topic=topic)
+ else:
+ self.cast_to_server(ctxt, server_params,
+ rpc_message, topic=topic)
+
+
+class InterCellRPCDispatcher(object):
+ """RPC Dispatcher to handle messages received from other cells.
+
+ All messages received here have come from a sibling cell. Depending
+ on the ultimate target and type of message, we may process the message
+ in this cell, relay the message to another sibling cell, or both. This
+ logic is defined by the message class in the messaging module.
+ """
+ BASE_RPC_API_VERSION = _CELL_TO_CELL_RPC_API_VERSION
+
+ def __init__(self, msg_runner):
+ """Init the Intercell RPC Dispatcher."""
+ self.msg_runner = msg_runner
+
+ def process_message(self, _ctxt, message):
+ """We received a message from another cell. Use the MessageRunner
+ to turn this from JSON back into an instance of the correct
+ Message class. Then process it!
+ """
+ message = self.msg_runner.message_from_json(message)
+ message.process()
diff --git a/nova/cells/rpcapi.py b/nova/cells/rpcapi.py
new file mode 100644
index 000000000..8ce298829
--- /dev/null
+++ b/nova/cells/rpcapi.py
@@ -0,0 +1,138 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Client side of nova-cells RPC API (for talking to the nova-cells service
+within a cell).
+
+This is different than communication between child and parent nova-cells
+services. That communication is handled by the cells driver via the
+messging module.
+"""
+
+from nova.openstack.common import cfg
+from nova.openstack.common import jsonutils
+from nova.openstack.common import log as logging
+from nova.openstack.common.rpc import proxy as rpc_proxy
+
+LOG = logging.getLogger(__name__)
+
+CONF = cfg.CONF
+CONF.import_opt('enable', 'nova.cells.opts', group='cells')
+CONF.import_opt('topic', 'nova.cells.opts', group='cells')
+
+
+class CellsAPI(rpc_proxy.RpcProxy):
+ '''Cells client-side RPC API
+
+ API version history:
+
+ 1.0 - Initial version.
+ '''
+ BASE_RPC_API_VERSION = '1.0'
+
+ def __init__(self):
+ super(CellsAPI, self).__init__(topic=CONF.cells.topic,
+ default_version=self.BASE_RPC_API_VERSION)
+
+ def cast_compute_api_method(self, ctxt, cell_name, method,
+ *args, **kwargs):
+ """Make a cast to a compute API method in a certain cell."""
+ method_info = {'method': method,
+ 'method_args': args,
+ 'method_kwargs': kwargs}
+ self.cast(ctxt, self.make_msg('run_compute_api_method',
+ cell_name=cell_name,
+ method_info=method_info,
+ call=False))
+
+ def call_compute_api_method(self, ctxt, cell_name, method,
+ *args, **kwargs):
+ """Make a call to a compute API method in a certain cell."""
+ method_info = {'method': method,
+ 'method_args': args,
+ 'method_kwargs': kwargs}
+ return self.call(ctxt, self.make_msg('run_compute_api_method',
+ cell_name=cell_name,
+ method_info=method_info,
+ call=True))
+
+ def schedule_run_instance(self, ctxt, **kwargs):
+ """Schedule a new instance for creation."""
+ self.cast(ctxt, self.make_msg('schedule_run_instance',
+ host_sched_kwargs=kwargs))
+
+ def instance_update_at_top(self, ctxt, instance):
+ """Update instance at API level."""
+ if not CONF.cells.enable:
+ return
+ # Make sure we have a dict, not a SQLAlchemy model
+ instance_p = jsonutils.to_primitive(instance)
+ self.cast(ctxt, self.make_msg('instance_update_at_top',
+ instance=instance_p))
+
+ def instance_destroy_at_top(self, ctxt, instance):
+ """Destroy instance at API level."""
+ if not CONF.cells.enable:
+ return
+ instance_p = jsonutils.to_primitive(instance)
+ self.cast(ctxt, self.make_msg('instance_destroy_at_top',
+ instance=instance_p))
+
+ def instance_delete_everywhere(self, ctxt, instance, delete_type):
+ """Delete instance everywhere. delete_type may be 'soft'
+ or 'hard'. This is generally only used to resolve races
+ when API cell doesn't know to what cell an instance belongs.
+ """
+ if not CONF.cells.enable:
+ return
+ instance_p = jsonutils.to_primitive(instance)
+ self.cast(ctxt, self.make_msg('instance_delete_everywhere',
+ instance=instance_p,
+ delete_type=delete_type))
+
+ def instance_fault_create_at_top(self, ctxt, instance_fault):
+ """Create an instance fault at the top."""
+ if not CONF.cells.enable:
+ return
+ instance_fault_p = jsonutils.to_primitive(instance_fault)
+ self.cast(ctxt, self.make_msg('instance_fault_create_at_top',
+ instance_fault=instance_fault_p))
+
+ def bw_usage_update_at_top(self, ctxt, uuid, mac, start_period,
+ bw_in, bw_out, last_ctr_in, last_ctr_out, last_refreshed=None):
+ """Broadcast upwards that bw_usage was updated."""
+ if not CONF.cells.enable:
+ return
+ bw_update_info = {'uuid': uuid,
+ 'mac': mac,
+ 'start_period': start_period,
+ 'bw_in': bw_in,
+ 'bw_out': bw_out,
+ 'last_ctr_in': last_ctr_in,
+ 'last_ctr_out': last_ctr_out,
+ 'last_refreshed': last_refreshed}
+ self.cast(ctxt, self.make_msg('bw_usage_update_at_top',
+ bw_update_info=bw_update_info))
+
+ def instance_info_cache_update_at_top(self, ctxt, instance_info_cache):
+ """Broadcast up that an instance's info_cache has changed."""
+ if not CONF.cells.enable:
+ return
+ iicache = jsonutils.to_primitive(instance_info_cache)
+ instance = {'uuid': iicache['instance_uuid'],
+ 'info_cache': iicache}
+ self.cast(ctxt, self.make_msg('instance_update_at_top',
+ instance=instance))
diff --git a/nova/cells/scheduler.py b/nova/cells/scheduler.py
new file mode 100644
index 000000000..0b730290a
--- /dev/null
+++ b/nova/cells/scheduler.py
@@ -0,0 +1,136 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Cells Scheduler
+"""
+import random
+import time
+
+from nova import compute
+from nova.compute import vm_states
+from nova.db import base
+from nova import exception
+from nova.openstack.common import cfg
+from nova.openstack.common import log as logging
+from nova.scheduler import rpcapi as scheduler_rpcapi
+
+cell_scheduler_opts = [
+ cfg.IntOpt('scheduler_retries',
+ default=10,
+ help='How many retries when no cells are available.'),
+ cfg.IntOpt('scheduler_retry_delay',
+ default=2,
+ help='How often to retry in seconds when no cells are '
+ 'available.')
+]
+
+LOG = logging.getLogger(__name__)
+
+CONF = cfg.CONF
+CONF.register_opts(cell_scheduler_opts, group='cells')
+
+
+class CellsScheduler(base.Base):
+ """The cells scheduler."""
+
+ def __init__(self, msg_runner):
+ super(CellsScheduler, self).__init__()
+ self.msg_runner = msg_runner
+ self.state_manager = msg_runner.state_manager
+ self.compute_api = compute.API()
+ self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
+
+ def _create_instances_here(self, ctxt, request_spec):
+ instance_values = request_spec['instance_properties']
+ for instance_uuid in request_spec['instance_uuids']:
+ instance_values['uuid'] = instance_uuid
+ instance = self.compute_api.create_db_entry_for_new_instance(
+ ctxt,
+ request_spec['instance_type'],
+ request_spec['image'],
+ instance_values,
+ request_spec['security_group'],
+ request_spec['block_device_mapping'])
+ self.msg_runner.instance_update_at_top(ctxt, instance)
+
+ def _get_possible_cells(self):
+ cells = set(self.state_manager.get_child_cells())
+ our_cell = self.state_manager.get_my_state()
+ # Include our cell in the list, if we have any capacity info
+ if not cells or our_cell.capacities:
+ cells.add(our_cell)
+ return cells
+
+ def _run_instance(self, message, host_sched_kwargs):
+ """Attempt to schedule instance(s). If we have no cells
+ to try, raise exception.NoCellsAvailable
+ """
+ ctxt = message.ctxt
+ request_spec = host_sched_kwargs['request_spec']
+
+ # The message we might forward to a child cell
+ cells = self._get_possible_cells()
+ if not cells:
+ raise exception.NoCellsAvailable()
+ cells = list(cells)
+
+ # Random selection for now
+ random.shuffle(cells)
+ target_cell = cells[0]
+
+ LOG.debug(_("Scheduling with routing_path=%(routing_path)s"),
+ locals())
+
+ if target_cell.is_me:
+ # Need to create instance DB entries as the host scheduler
+ # expects that the instance(s) already exists.
+ self._create_instances_here(ctxt, request_spec)
+ self.scheduler_rpcapi.run_instance(ctxt,
+ **host_sched_kwargs)
+ return
+ self.msg_runner.schedule_run_instance(ctxt, target_cell,
+ host_sched_kwargs)
+
+ def run_instance(self, message, host_sched_kwargs):
+ """Pick a cell where we should create a new instance."""
+ try:
+ for i in xrange(max(0, CONF.cells.scheduler_retries) + 1):
+ try:
+ return self._run_instance(message, host_sched_kwargs)
+ except exception.NoCellsAvailable:
+ if i == max(0, CONF.cells.scheduler_retries):
+ raise
+ sleep_time = max(1, CONF.cells.scheduler_retry_delay)
+ LOG.info(_("No cells available when scheduling. Will "
+ "retry in %(sleep_time)s second(s)"), locals())
+ time.sleep(sleep_time)
+ continue
+ except Exception:
+ request_spec = host_sched_kwargs['request_spec']
+ instance_uuids = request_spec['instance_uuids']
+ LOG.exception(_("Error scheduling instances %(instance_uuids)s"),
+ locals())
+ ctxt = message.ctxt
+ for instance_uuid in instance_uuids:
+ self.msg_runner.instance_update_at_top(ctxt,
+ {'uuid': instance_uuid,
+ 'vm_state': vm_states.ERROR})
+ try:
+ self.db.instance_update(ctxt,
+ instance_uuid,
+ {'vm_state': vm_states.ERROR})
+ except Exception:
+ pass
diff --git a/nova/cells/state.py b/nova/cells/state.py
new file mode 100644
index 000000000..c6f8f3220
--- /dev/null
+++ b/nova/cells/state.py
@@ -0,0 +1,346 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+CellState Manager
+"""
+import copy
+import datetime
+import functools
+
+from nova.cells import rpc_driver
+from nova import context
+from nova.db import base
+from nova.openstack.common import cfg
+from nova.openstack.common import lockutils
+from nova.openstack.common import log as logging
+from nova.openstack.common import timeutils
+
+cell_state_manager_opts = [
+ cfg.IntOpt('db_check_interval',
+ default=60,
+ help='Seconds between getting fresh cell info from db.'),
+]
+
+
+LOG = logging.getLogger(__name__)
+
+CONF = cfg.CONF
+CONF.import_opt('host', 'nova.config')
+CONF.import_opt('name', 'nova.cells.opts', group='cells')
+#CONF.import_opt('capabilities', 'nova.cells.opts', group='cells')
+CONF.register_opts(cell_state_manager_opts, group='cells')
+
+
+class CellState(object):
+ """Holds information for a particular cell."""
+ def __init__(self, cell_name, is_me=False):
+ self.name = cell_name
+ self.is_me = is_me
+ self.last_seen = datetime.datetime.min
+ self.capabilities = {}
+ self.capacities = {}
+ self.db_info = {}
+ # TODO(comstud): The DB will specify the driver to use to talk
+ # to this cell, but there's no column for this yet. The only
+ # available driver is the rpc driver.
+ self.driver = rpc_driver.CellsRPCDriver()
+
+ def update_db_info(self, cell_db_info):
+ """Update cell credentials from db"""
+ self.db_info = dict(
+ [(k, v) for k, v in cell_db_info.iteritems()
+ if k != 'name'])
+
+ def update_capabilities(self, cell_metadata):
+ """Update cell capabilities for a cell."""
+ self.last_seen = timeutils.utcnow()
+ self.capabilities = cell_metadata
+
+ def update_capacities(self, capacities):
+ """Update capacity information for a cell."""
+ self.last_seen = timeutils.utcnow()
+ self.capacities = capacities
+
+ def get_cell_info(self):
+ """Return subset of cell information for OS API use."""
+ db_fields_to_return = ['id', 'is_parent', 'weight_scale',
+ 'weight_offset', 'username', 'rpc_host', 'rpc_port']
+ cell_info = dict(name=self.name, capabilities=self.capabilities)
+ if self.db_info:
+ for field in db_fields_to_return:
+ cell_info[field] = self.db_info[field]
+ return cell_info
+
+ def send_message(self, message):
+ """Send a message to a cell. Just forward this to the driver,
+ passing ourselves and the message as arguments.
+ """
+ self.driver.send_message_to_cell(self, message)
+
+ def __repr__(self):
+ me = "me" if self.is_me else "not_me"
+ return "Cell '%s' (%s)" % (self.name, me)
+
+
+def sync_from_db(f):
+ """Use as a decorator to wrap methods that use cell information to
+ make sure they sync the latest information from the DB periodically.
+ """
+ @functools.wraps(f)
+ def wrapper(self, *args, **kwargs):
+ if self._time_to_sync():
+ self._cell_db_sync()
+ return f(self, *args, **kwargs)
+ return wrapper
+
+
+class CellStateManager(base.Base):
+ def __init__(self, cell_state_cls=None):
+ super(CellStateManager, self).__init__()
+ if not cell_state_cls:
+ cell_state_cls = CellState
+ self.cell_state_cls = cell_state_cls
+ self.my_cell_state = cell_state_cls(CONF.cells.name, is_me=True)
+ self.parent_cells = {}
+ self.child_cells = {}
+ self.last_cell_db_check = datetime.datetime.min
+ self._cell_db_sync()
+ my_cell_capabs = {}
+ for cap in CONF.cells.capabilities:
+ name, value = cap.split('=', 1)
+ if ';' in value:
+ values = set(value.split(';'))
+ else:
+ values = set([value])
+ my_cell_capabs[name] = values
+ self.my_cell_state.update_capabilities(my_cell_capabs)
+
+ def _refresh_cells_from_db(self, ctxt):
+ """Make our cell info map match the db."""
+ # Add/update existing cells ...
+ db_cells = self.db.cell_get_all(ctxt)
+ db_cells_dict = dict([(cell['name'], cell) for cell in db_cells])
+
+ # Update current cells. Delete ones that disappeared
+ for cells_dict in (self.parent_cells, self.child_cells):
+ for cell_name, cell_info in cells_dict.items():
+ is_parent = cell_info.db_info['is_parent']
+ db_dict = db_cells_dict.get(cell_name)
+ if db_dict and is_parent == db_dict['is_parent']:
+ cell_info.update_db_info(db_dict)
+ else:
+ del cells_dict[cell_name]
+
+ # Add new cells
+ for cell_name, db_info in db_cells_dict.items():
+ if db_info['is_parent']:
+ cells_dict = self.parent_cells
+ else:
+ cells_dict = self.child_cells
+ if cell_name not in cells_dict:
+ cells_dict[cell_name] = self.cell_state_cls(cell_name)
+ cells_dict[cell_name].update_db_info(db_info)
+
+ def _time_to_sync(self):
+ """Is it time to sync the DB against our memory cache?"""
+ diff = timeutils.utcnow() - self.last_cell_db_check
+ return diff.seconds >= CONF.cells.db_check_interval
+
+ def _update_our_capacity(self, context):
+ """Update our capacity in the self.my_cell_state CellState.
+
+ This will add/update 2 entries in our CellState.capacities,
+ 'ram_free' and 'disk_free'.
+
+ The values of these are both dictionaries with the following
+ format:
+
+ {'total_mb': <total_memory_free_in_the_cell>,
+ 'units_by_mb: <units_dictionary>}
+
+ <units_dictionary> contains the number of units that we can
+ build for every instance_type that we have. This number is
+ computed by looking at room available on every compute_node.
+
+ Take the following instance_types as an example:
+
+ [{'memory_mb': 1024, 'root_gb': 10, 'ephemeral_gb': 100},
+ {'memory_mb': 2048, 'root_gb': 20, 'ephemeral_gb': 200}]
+
+ capacities['ram_free']['units_by_mb'] would contain the following:
+
+ {'1024': <number_of_instances_that_will_fit>,
+ '2048': <number_of_instances_that_will_fit>}
+
+ capacities['disk_free']['units_by_mb'] would contain the following:
+
+ {'122880': <number_of_instances_that_will_fit>,
+ '225280': <number_of_instances_that_will_fit>}
+
+ Units are in MB, so 122880 = (10 + 100) * 1024.
+
+ NOTE(comstud): Perhaps we should only report a single number
+ available per instance_type.
+ """
+
+ compute_hosts = {}
+
+ def _get_compute_hosts():
+ compute_nodes = self.db.compute_node_get_all(context)
+ for compute in compute_nodes:
+ service = compute['service']
+ if not service or service['disabled']:
+ continue
+ host = service['host']
+ compute_hosts[host] = {
+ 'free_ram_mb': compute['free_ram_mb'],
+ 'free_disk_mb': compute['free_disk_gb'] * 1024}
+
+ _get_compute_hosts()
+ if not compute_hosts:
+ self.my_cell_state.update_capacities({})
+ return
+
+ ram_mb_free_units = {}
+ disk_mb_free_units = {}
+ total_ram_mb_free = 0
+ total_disk_mb_free = 0
+
+ def _free_units(tot, per_inst):
+ if per_inst:
+ return max(0, int(tot / per_inst))
+ else:
+ return 0
+
+ def _update_from_values(values, instance_type):
+ memory_mb = instance_type['memory_mb']
+ disk_mb = (instance_type['root_gb'] +
+ instance_type['ephemeral_gb']) * 1024
+ ram_mb_free_units.setdefault(str(memory_mb), 0)
+ disk_mb_free_units.setdefault(str(disk_mb), 0)
+ ram_free_units = _free_units(compute_values['free_ram_mb'],
+ memory_mb)
+ disk_free_units = _free_units(compute_values['free_disk_mb'],
+ disk_mb)
+ ram_mb_free_units[str(memory_mb)] += ram_free_units
+ disk_mb_free_units[str(disk_mb)] += disk_free_units
+
+ instance_types = self.db.instance_type_get_all(context)
+
+ for compute_values in compute_hosts.values():
+ total_ram_mb_free += compute_values['free_ram_mb']
+ total_disk_mb_free += compute_values['free_disk_mb']
+ for instance_type in instance_types:
+ _update_from_values(compute_values, instance_type)
+
+ capacities = {'ram_free': {'total_mb': total_ram_mb_free,
+ 'units_by_mb': ram_mb_free_units},
+ 'disk_free': {'total_mb': total_disk_mb_free,
+ 'units_by_mb': disk_mb_free_units}}
+ self.my_cell_state.update_capacities(capacities)
+
+ @lockutils.synchronized('cell-db-sync', 'nova-')
+ def _cell_db_sync(self):
+ """Update status for all cells if it's time. Most calls to
+ this are from the check_for_update() decorator that checks
+ the time, but it checks outside of a lock. The duplicate
+ check here is to prevent multiple threads from pulling the
+ information simultaneously.
+ """
+ if self._time_to_sync():
+ LOG.debug(_("Updating cell cache from db."))
+ self.last_cell_db_check = timeutils.utcnow()
+ ctxt = context.get_admin_context()
+ self._refresh_cells_from_db(ctxt)
+ self._update_our_capacity(ctxt)
+
+ @sync_from_db
+ def get_my_state(self):
+ """Return information for my (this) cell."""
+ return self.my_cell_state
+
+ @sync_from_db
+ def get_child_cells(self):
+ """Return list of child cell_infos."""
+ return self.child_cells.values()
+
+ @sync_from_db
+ def get_parent_cells(self):
+ """Return list of parent cell_infos."""
+ return self.parent_cells.values()
+
+ @sync_from_db
+ def get_parent_cell(self, cell_name):
+ return self.parent_cells.get(cell_name)
+
+ @sync_from_db
+ def get_child_cell(self, cell_name):
+ return self.child_cells.get(cell_name)
+
+ @sync_from_db
+ def update_cell_capabilities(self, cell_name, capabilities):
+ """Update capabilities for a cell."""
+ cell = self.child_cells.get(cell_name)
+ if not cell:
+ cell = self.parent_cells.get(cell_name)
+ if not cell:
+ LOG.error(_("Unknown cell '%(cell_name)s' when trying to "
+ "update capabilities"), locals())
+ return
+ # Make sure capabilities are sets.
+ for capab_name, values in capabilities.items():
+ capabilities[capab_name] = set(values)
+ cell.update_capabilities(capabilities)
+
+ @sync_from_db
+ def update_cell_capacities(self, cell_name, capacities):
+ """Update capacities for a cell."""
+ cell = self.child_cells.get(cell_name)
+ if not cell:
+ cell = self.parent_cells.get(cell_name)
+ if not cell:
+ LOG.error(_("Unknown cell '%(cell_name)s' when trying to "
+ "update capacities"), locals())
+ return
+ cell.update_capacities(capacities)
+
+ @sync_from_db
+ def get_our_capabilities(self, include_children=True):
+ capabs = copy.deepcopy(self.my_cell_state.capabilities)
+ if include_children:
+ for cell in self.child_cells.values():
+ for capab_name, values in cell.capabilities.items():
+ if capab_name not in capabs:
+ capabs[capab_name] = set([])
+ capabs[capab_name] |= values
+ return capabs
+
+ def _add_to_dict(self, target, src):
+ for key, value in src.items():
+ if isinstance(value, dict):
+ target.setdefault(key, {})
+ self._add_to_dict(target[key], value)
+ continue
+ target.setdefault(key, 0)
+ target[key] += value
+
+ @sync_from_db
+ def get_our_capacities(self, include_children=True):
+ capacities = copy.deepcopy(self.my_cell_state.capacities)
+ if include_children:
+ for cell in self.child_cells.values():
+ self._add_to_dict(capacities, cell.capacities)
+ return capacities
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 757f78f2d..abbc0bd92 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -1954,6 +1954,14 @@ class API(base.Base):
return {'url': connect_info['access_url']}
+ def get_vnc_connect_info(self, context, instance, console_type):
+ """Used in a child cell to get console info."""
+ if not instance['host']:
+ raise exception.InstanceNotReady(instance_id=instance['uuid'])
+ connect_info = self.compute_rpcapi.get_vnc_console(context,
+ instance=instance, console_type=console_type)
+ return connect_info
+
@wrap_check_policy
def get_console_output(self, context, instance, tail_length=None):
"""Get console output for an instance."""
diff --git a/nova/compute/cells_api.py b/nova/compute/cells_api.py
new file mode 100644
index 000000000..cdbccebb1
--- /dev/null
+++ b/nova/compute/cells_api.py
@@ -0,0 +1,471 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Compute API that proxies via Cells Service"""
+
+from nova import block_device
+from nova.cells import rpcapi as cells_rpcapi
+from nova.compute import api as compute_api
+from nova.compute import task_states
+from nova.compute import vm_states
+from nova import exception
+from nova.openstack.common import excutils
+from nova.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+check_instance_state = compute_api.check_instance_state
+wrap_check_policy = compute_api.wrap_check_policy
+check_policy = compute_api.check_policy
+check_instance_lock = compute_api.check_instance_lock
+
+
+def validate_cell(fn):
+ def _wrapped(self, context, instance, *args, **kwargs):
+ self._validate_cell(instance, fn.__name__)
+ return fn(self, context, instance, *args, **kwargs)
+ _wrapped.__name__ = fn.__name__
+ return _wrapped
+
+
+class ComputeRPCAPINoOp(object):
+ def __getattr__(self, key):
+ def _noop_rpc_wrapper(*args, **kwargs):
+ return None
+ return _noop_rpc_wrapper
+
+
+class SchedulerRPCAPIRedirect(object):
+ def __init__(self, cells_rpcapi_obj):
+ self.cells_rpcapi = cells_rpcapi_obj
+
+ def __getattr__(self, key):
+ def _noop_rpc_wrapper(*args, **kwargs):
+ return None
+ return _noop_rpc_wrapper
+
+ def run_instance(self, context, **kwargs):
+ self.cells_rpcapi.schedule_run_instance(context, **kwargs)
+
+
+class ComputeCellsAPI(compute_api.API):
+ def __init__(self, *args, **kwargs):
+ super(ComputeCellsAPI, self).__init__(*args, **kwargs)
+ self.cells_rpcapi = cells_rpcapi.CellsAPI()
+ # Avoid casts/calls directly to compute
+ self.compute_rpcapi = ComputeRPCAPINoOp()
+ # Redirect scheduler run_instance to cells.
+ self.scheduler_rpcapi = SchedulerRPCAPIRedirect(self.cells_rpcapi)
+
+ def _cell_read_only(self, cell_name):
+ """Is the target cell in a read-only mode?"""
+ # FIXME(comstud): Add support for this.
+ return False
+
+ def _validate_cell(self, instance, method):
+ cell_name = instance['cell_name']
+ if not cell_name:
+ raise exception.InstanceUnknownCell(
+ instance_uuid=instance['uuid'])
+ if self._cell_read_only(cell_name):
+ raise exception.InstanceInvalidState(
+ attr="vm_state",
+ instance_uuid=instance['uuid'],
+ state="temporary_readonly",
+ method=method)
+
+ def _cast_to_cells(self, context, instance, method, *args, **kwargs):
+ instance_uuid = instance['uuid']
+ cell_name = instance['cell_name']
+ if not cell_name:
+ raise exception.InstanceUnknownCell(instance_uuid=instance_uuid)
+ self.cells_rpcapi.cast_compute_api_method(context, cell_name,
+ method, instance_uuid, *args, **kwargs)
+
+ def _call_to_cells(self, context, instance, method, *args, **kwargs):
+ instance_uuid = instance['uuid']
+ cell_name = instance['cell_name']
+ if not cell_name:
+ raise exception.InstanceUnknownCell(instance_uuid=instance_uuid)
+ return self.cells_rpcapi.call_compute_api_method(context, cell_name,
+ method, instance_uuid, *args, **kwargs)
+
+ def _check_requested_networks(self, context, requested_networks):
+ """Override compute API's checking of this. It'll happen in
+ child cell
+ """
+ return
+
+ def _validate_image_href(self, context, image_href):
+ """Override compute API's checking of this. It'll happen in
+ child cell
+ """
+ return
+
+ def _create_image(self, context, instance, name, image_type,
+ backup_type=None, rotation=None, extra_properties=None):
+ if backup_type:
+ return self._call_to_cells(context, instance, 'backup',
+ name, backup_type, rotation,
+ extra_properties=extra_properties)
+ else:
+ return self._call_to_cells(context, instance, 'snapshot',
+ name, extra_properties=extra_properties)
+
+ def create(self, *args, **kwargs):
+ """We can use the base functionality, but I left this here just
+ for completeness.
+ """
+ return super(ComputeCellsAPI, self).create(*args, **kwargs)
+
+ @validate_cell
+ def update(self, context, instance, **kwargs):
+ """Update an instance."""
+ rv = super(ComputeCellsAPI, self).update(context,
+ instance, **kwargs)
+ # We need to skip vm_state/task_state updates... those will
+ # happen when via a a _cast_to_cells for running a different
+ # compute api method
+ kwargs_copy = kwargs.copy()
+ kwargs_copy.pop('vm_state', None)
+ kwargs_copy.pop('task_state', None)
+ if kwargs_copy:
+ try:
+ self._cast_to_cells(context, instance, 'update',
+ **kwargs_copy)
+ except exception.InstanceUnknownCell:
+ pass
+ return rv
+
+ def _local_delete(self, context, instance, bdms):
+ # This will get called for every delete in the API cell
+ # because _delete() in compute/api.py will not find a
+ # service when checking if it's up.
+ # We need to only take action if there's no cell_name. Our
+ # overrides of delete() and soft_delete() will take care of
+ # the rest.
+ cell_name = instance['cell_name']
+ if not cell_name:
+ return super(ComputeCellsAPI, self)._local_delete(context,
+ instance, bdms)
+
+ def soft_delete(self, context, instance):
+ self._handle_cell_delete(context, instance,
+ super(ComputeCellsAPI, self).soft_delete, 'soft_delete')
+
+ def delete(self, context, instance):
+ self._handle_cell_delete(context, instance,
+ super(ComputeCellsAPI, self).delete, 'delete')
+
+ def _handle_cell_delete(self, context, instance, method, method_name):
+ """Terminate an instance."""
+ # We can't use the decorator because we have special logic in the
+ # case we don't know the cell_name...
+ cell_name = instance['cell_name']
+ if cell_name and self._cell_read_only(cell_name):
+ raise exception.InstanceInvalidState(
+ attr="vm_state",
+ instance_uuid=instance['uuid'],
+ state="temporary_readonly",
+ method=method_name)
+ method(context, instance)
+ try:
+ self._cast_to_cells(context, instance, method_name)
+ except exception.InstanceUnknownCell:
+ # If there's no cell, there's also no host... which means
+ # the instance was destroyed from the DB here. Let's just
+ # broadcast a message down to all cells and hope this ends
+ # up resolving itself... Worse case.. the instance will
+ # show back up again here.
+ delete_type = method == 'soft_delete' and 'soft' or 'hard'
+ self.cells_rpcapi.instance_delete_everywhere(context,
+ instance['uuid'], delete_type)
+
+ @validate_cell
+ def restore(self, context, instance):
+ """Restore a previously deleted (but not reclaimed) instance."""
+ super(ComputeCellsAPI, self).restore(context, instance)
+ self._cast_to_cells(context, instance, 'restore')
+
+ @validate_cell
+ def force_delete(self, context, instance):
+ """Force delete a previously deleted (but not reclaimed) instance."""
+ super(ComputeCellsAPI, self).force_delete(context, instance)
+ self._cast_to_cells(context, instance, 'force_delete')
+
+ @validate_cell
+ def stop(self, context, instance, do_cast=True):
+ """Stop an instance."""
+ super(ComputeCellsAPI, self).stop(context, instance)
+ if do_cast:
+ self._cast_to_cells(context, instance, 'stop', do_cast=True)
+ else:
+ return self._call_to_cells(context, instance, 'stop',
+ do_cast=False)
+
+ @validate_cell
+ def start(self, context, instance):
+ """Start an instance."""
+ super(ComputeCellsAPI, self).start(context, instance)
+ self._cast_to_cells(context, instance, 'start')
+
+ @validate_cell
+ def reboot(self, context, instance, *args, **kwargs):
+ """Reboot the given instance."""
+ super(ComputeCellsAPI, self).reboot(context, instance,
+ *args, **kwargs)
+ self._cast_to_cells(context, instance, 'reboot', *args,
+ **kwargs)
+
+ @validate_cell
+ def rebuild(self, context, instance, *args, **kwargs):
+ """Rebuild the given instance with the provided attributes."""
+ super(ComputeCellsAPI, self).rebuild(context, instance, *args,
+ **kwargs)
+ self._cast_to_cells(context, instance, 'rebuild', *args, **kwargs)
+
+ @check_instance_state(vm_state=[vm_states.RESIZED])
+ @validate_cell
+ def revert_resize(self, context, instance):
+ """Reverts a resize, deleting the 'new' instance in the process."""
+ # NOTE(markwash): regular api manipulates the migration here, but we
+ # don't have access to it. So to preserve the interface just update the
+ # vm and task state.
+ self.update(context, instance,
+ task_state=task_states.RESIZE_REVERTING)
+ self._cast_to_cells(context, instance, 'revert_resize')
+
+ @check_instance_state(vm_state=[vm_states.RESIZED])
+ @validate_cell
+ def confirm_resize(self, context, instance):
+ """Confirms a migration/resize and deletes the 'old' instance."""
+ # NOTE(markwash): regular api manipulates migration here, but we don't
+ # have the migration in the api database. So to preserve the interface
+ # just update the vm and task state without calling super()
+ self.update(context, instance, task_state=None,
+ vm_state=vm_states.ACTIVE)
+ self._cast_to_cells(context, instance, 'confirm_resize')
+
+ @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED],
+ task_state=[None])
+ @validate_cell
+ def resize(self, context, instance, *args, **kwargs):
+ """Resize (ie, migrate) a running instance.
+
+ If flavor_id is None, the process is considered a migration, keeping
+ the original flavor_id. If flavor_id is not None, the instance should
+ be migrated to a new host and resized to the new flavor_id.
+ """
+ super(ComputeCellsAPI, self).resize(context, instance, *args,
+ **kwargs)
+ # FIXME(comstud): pass new instance_type object down to a method
+ # that'll unfold it
+ self._cast_to_cells(context, instance, 'resize', *args, **kwargs)
+
+ @validate_cell
+ def add_fixed_ip(self, context, instance, *args, **kwargs):
+ """Add fixed_ip from specified network to given instance."""
+ super(ComputeCellsAPI, self).add_fixed_ip(context, instance,
+ *args, **kwargs)
+ self._cast_to_cells(context, instance, 'add_fixed_ip',
+ *args, **kwargs)
+
+ @validate_cell
+ def remove_fixed_ip(self, context, instance, *args, **kwargs):
+ """Remove fixed_ip from specified network to given instance."""
+ super(ComputeCellsAPI, self).remove_fixed_ip(context, instance,
+ *args, **kwargs)
+ self._cast_to_cells(context, instance, 'remove_fixed_ip',
+ *args, **kwargs)
+
+ @validate_cell
+ def pause(self, context, instance):
+ """Pause the given instance."""
+ super(ComputeCellsAPI, self).pause(context, instance)
+ self._cast_to_cells(context, instance, 'pause')
+
+ @validate_cell
+ def unpause(self, context, instance):
+ """Unpause the given instance."""
+ super(ComputeCellsAPI, self).unpause(context, instance)
+ self._cast_to_cells(context, instance, 'unpause')
+
+ def set_host_enabled(self, context, host, enabled):
+ """Sets the specified host's ability to accept new instances."""
+ # FIXME(comstud): Since there's no instance here, we have no
+ # idea which cell should be the target.
+ pass
+
+ def host_power_action(self, context, host, action):
+ """Reboots, shuts down or powers up the host."""
+ # FIXME(comstud): Since there's no instance here, we have no
+ # idea which cell should be the target.
+ pass
+
+ def get_diagnostics(self, context, instance):
+ """Retrieve diagnostics for the given instance."""
+ # FIXME(comstud): Cache this?
+ # Also: only calling super() to get state/policy checking
+ super(ComputeCellsAPI, self).get_diagnostics(context, instance)
+ return self._call_to_cells(context, instance, 'get_diagnostics')
+
+ @validate_cell
+ def suspend(self, context, instance):
+ """Suspend the given instance."""
+ super(ComputeCellsAPI, self).suspend(context, instance)
+ self._cast_to_cells(context, instance, 'suspend')
+
+ @validate_cell
+ def resume(self, context, instance):
+ """Resume the given instance."""
+ super(ComputeCellsAPI, self).resume(context, instance)
+ self._cast_to_cells(context, instance, 'resume')
+
+ @validate_cell
+ def rescue(self, context, instance, rescue_password=None):
+ """Rescue the given instance."""
+ super(ComputeCellsAPI, self).rescue(context, instance,
+ rescue_password=rescue_password)
+ self._cast_to_cells(context, instance, 'rescue',
+ rescue_password=rescue_password)
+
+ @validate_cell
+ def unrescue(self, context, instance):
+ """Unrescue the given instance."""
+ super(ComputeCellsAPI, self).unrescue(context, instance)
+ self._cast_to_cells(context, instance, 'unrescue')
+
+ @validate_cell
+ def set_admin_password(self, context, instance, password=None):
+ """Set the root/admin password for the given instance."""
+ super(ComputeCellsAPI, self).set_admin_password(context, instance,
+ password=password)
+ self._cast_to_cells(context, instance, 'set_admin_password',
+ password=password)
+
+ @validate_cell
+ def inject_file(self, context, instance, *args, **kwargs):
+ """Write a file to the given instance."""
+ super(ComputeCellsAPI, self).inject_file(context, instance, *args,
+ **kwargs)
+ self._cast_to_cells(context, instance, 'inject_file', *args, **kwargs)
+
+ @wrap_check_policy
+ @validate_cell
+ def get_vnc_console(self, context, instance, console_type):
+ """Get a url to a VNC Console."""
+ if not instance['host']:
+ raise exception.InstanceNotReady(instance_id=instance['uuid'])
+
+ connect_info = self._call_to_cells(context, instance,
+ 'get_vnc_connect_info', console_type)
+
+ self.consoleauth_rpcapi.authorize_console(context,
+ connect_info['token'], console_type, connect_info['host'],
+ connect_info['port'], connect_info['internal_access_path'])
+ return {'url': connect_info['access_url']}
+
+ @validate_cell
+ def get_console_output(self, context, instance, *args, **kwargs):
+ """Get console output for an an instance."""
+ # NOTE(comstud): Calling super() just to get policy check
+ super(ComputeCellsAPI, self).get_console_output(context, instance,
+ *args, **kwargs)
+ return self._call_to_cells(context, instance, 'get_console_output',
+ *args, **kwargs)
+
+ def lock(self, context, instance):
+ """Lock the given instance."""
+ super(ComputeCellsAPI, self).lock(context, instance)
+ self._cast_to_cells(context, instance, 'lock')
+
+ def unlock(self, context, instance):
+ """Unlock the given instance."""
+ super(ComputeCellsAPI, self).lock(context, instance)
+ self._cast_to_cells(context, instance, 'unlock')
+
+ @validate_cell
+ def reset_network(self, context, instance):
+ """Reset networking on the instance."""
+ super(ComputeCellsAPI, self).reset_network(context, instance)
+ self._cast_to_cells(context, instance, 'reset_network')
+
+ @validate_cell
+ def inject_network_info(self, context, instance):
+ """Inject network info for the instance."""
+ super(ComputeCellsAPI, self).inject_network_info(context, instance)
+ self._cast_to_cells(context, instance, 'inject_network_info')
+
+ @wrap_check_policy
+ @validate_cell
+ def attach_volume(self, context, instance, volume_id, device=None):
+ """Attach an existing volume to an existing instance."""
+ if device and not block_device.match_device(device):
+ raise exception.InvalidDevicePath(path=device)
+ device = self.compute_rpcapi.reserve_block_device_name(
+ context, device=device, instance=instance, volume_id=volume_id)
+ try:
+ volume = self.volume_api.get(context, volume_id)
+ self.volume_api.check_attach(context, volume)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ self.db.block_device_mapping_destroy_by_instance_and_device(
+ context, instance['uuid'], device)
+ self._cast_to_cells(context, instance, 'attach_volume',
+ volume_id, device)
+
+ @check_instance_lock
+ @validate_cell
+ def _detach_volume(self, context, instance, volume_id):
+ """Detach a volume from an instance."""
+ check_policy(context, 'detach_volume', instance)
+
+ volume = self.volume_api.get(context, volume_id)
+ self.volume_api.check_detach(context, volume)
+ self._cast_to_cells(context, instance, 'detach_volume',
+ volume_id)
+
+ @wrap_check_policy
+ @validate_cell
+ def associate_floating_ip(self, context, instance, address):
+ """Makes calls to network_api to associate_floating_ip.
+
+ :param address: is a string floating ip address
+ """
+ self._cast_to_cells(context, instance, 'associate_floating_ip',
+ address)
+
+ @validate_cell
+ def delete_instance_metadata(self, context, instance, key):
+ """Delete the given metadata item from an instance."""
+ super(ComputeCellsAPI, self).delete_instance_metadata(context,
+ instance, key)
+ self._cast_to_cells(context, instance, 'delete_instance_metadata',
+ key)
+
+ @wrap_check_policy
+ @validate_cell
+ def update_instance_metadata(self, context, instance,
+ metadata, delete=False):
+ rv = super(ComputeCellsAPI, self).update_instance_metadata(context,
+ instance, metadata, delete=delete)
+ try:
+ self._cast_to_cells(context, instance,
+ 'update_instance_metadata',
+ metadata, delete=delete)
+ except exception.InstanceUnknownCell:
+ pass
+ return rv
diff --git a/nova/db/api.py b/nova/db/api.py
index 3e350fc75..1322c29e9 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -43,8 +43,10 @@ these objects be simple dictionaries.
"""
+from nova.cells import rpcapi as cells_rpcapi
from nova import exception
from nova.openstack.common import cfg
+from nova.openstack.common import log as logging
from nova import utils
@@ -68,6 +70,7 @@ CONF.register_opts(db_opts)
IMPL = utils.LazyPluggable('db_backend',
sqlalchemy='nova.db.sqlalchemy.api')
+LOG = logging.getLogger(__name__)
class NoMoreNetworks(exception.NovaException):
@@ -566,9 +569,16 @@ def instance_data_get_for_project(context, project_id, session=None):
session=session)
-def instance_destroy(context, instance_uuid, constraint=None):
+def instance_destroy(context, instance_uuid, constraint=None,
+ update_cells=True):
"""Destroy the instance or raise if it does not exist."""
- return IMPL.instance_destroy(context, instance_uuid, constraint)
+ rv = IMPL.instance_destroy(context, instance_uuid, constraint)
+ if update_cells:
+ try:
+ cells_rpcapi.CellsAPI().instance_destroy_at_top(context, rv)
+ except Exception:
+ LOG.exception(_("Failed to notify cells of instance destroy"))
+ return rv
def instance_get_by_uuid(context, uuid):
@@ -665,13 +675,19 @@ def instance_test_and_set(context, instance_uuid, attr, ok_states,
ok_states, new_state)
-def instance_update(context, instance_uuid, values):
+def instance_update(context, instance_uuid, values, update_cells=True):
"""Set the given properties on an instance and update it.
Raises NotFound if instance does not exist.
"""
- return IMPL.instance_update(context, instance_uuid, values)
+ rv = IMPL.instance_update(context, instance_uuid, values)
+ if update_cells:
+ try:
+ cells_rpcapi.CellsAPI().instance_update_at_top(context, rv)
+ except Exception:
+ LOG.exception(_("Failed to notify cells of instance update"))
+ return rv
def instance_update_and_get_original(context, instance_uuid, values):
@@ -687,8 +703,12 @@ def instance_update_and_get_original(context, instance_uuid, values):
Raises NotFound if instance does not exist.
"""
- return IMPL.instance_update_and_get_original(context, instance_uuid,
- values)
+ rv = IMPL.instance_update_and_get_original(context, instance_uuid, values)
+ try:
+ cells_rpcapi.CellsAPI().instance_update_at_top(context, rv[1])
+ except Exception:
+ LOG.exception(_("Failed to notify cells of instance update"))
+ return rv
def instance_add_security_group(context, instance_id, security_group_id):
@@ -714,13 +734,21 @@ def instance_info_cache_get(context, instance_uuid):
return IMPL.instance_info_cache_get(context, instance_uuid)
-def instance_info_cache_update(context, instance_uuid, values):
+def instance_info_cache_update(context, instance_uuid, values,
+ update_cells=True):
"""Update an instance info cache record in the table.
:param instance_uuid: = uuid of info cache's instance
:param values: = dict containing column values to update
"""
- return IMPL.instance_info_cache_update(context, instance_uuid, values)
+ rv = IMPL.instance_info_cache_update(context, instance_uuid, values)
+ try:
+ cells_rpcapi.CellsAPI().instance_info_cache_update_at_top(context,
+ rv)
+ except Exception:
+ LOG.exception(_("Failed to notify cells of instance info cache "
+ "update"))
+ return rv
def instance_info_cache_delete(context, instance_uuid):
@@ -1354,7 +1382,7 @@ def instance_metadata_delete(context, instance_uuid, key):
def instance_metadata_update(context, instance_uuid, metadata, delete):
"""Update metadata if it exists, otherwise create it."""
return IMPL.instance_metadata_update(context, instance_uuid,
- metadata, delete)
+ metadata, delete)
####################
@@ -1414,12 +1442,21 @@ def bw_usage_get_by_uuids(context, uuids, start_period):
def bw_usage_update(context, uuid, mac, start_period, bw_in, bw_out,
- last_ctr_in, last_ctr_out, last_refreshed=None):
+ last_ctr_in, last_ctr_out, last_refreshed=None,
+ update_cells=True):
"""Update cached bandwidth usage for an instance's network based on mac
address. Creates new record if needed.
"""
- return IMPL.bw_usage_update(context, uuid, mac, start_period, bw_in,
+ rv = IMPL.bw_usage_update(context, uuid, mac, start_period, bw_in,
bw_out, last_ctr_in, last_ctr_out, last_refreshed=last_refreshed)
+ if update_cells:
+ try:
+ cells_rpcapi.CellsAPI().bw_usage_update_at_top(context,
+ uuid, mac, start_period, bw_in, bw_out,
+ last_ctr_in, last_ctr_out, last_refreshed)
+ except Exception:
+ LOG.exception(_("Failed to notify cells of bw_usage update"))
+ return rv
####################
@@ -1555,9 +1592,15 @@ def aggregate_host_delete(context, aggregate_id, host):
####################
-def instance_fault_create(context, values):
+def instance_fault_create(context, values, update_cells=True):
"""Create a new Instance Fault."""
- return IMPL.instance_fault_create(context, values)
+ rv = IMPL.instance_fault_create(context, values)
+ if update_cells:
+ try:
+ cells_rpcapi.CellsAPI().instance_fault_create_at_top(context, rv)
+ except Exception:
+ LOG.exception(_("Failed to notify cells of instance fault"))
+ return rv
def instance_fault_get_by_instance_uuids(context, instance_uuids):
diff --git a/nova/exception.py b/nova/exception.py
index ee0a88a95..c484b5120 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -769,6 +769,34 @@ class CellNotFound(NotFound):
message = _("Cell %(cell_id)s could not be found.")
+class CellRoutingInconsistency(NovaException):
+ message = _("Inconsistency in cell routing: %(reason)s")
+
+
+class CellServiceAPIMethodNotFound(NotFound):
+ message = _("Service API method not found: %(detail)s")
+
+
+class CellTimeout(NotFound):
+ message = _("Timeout waiting for response from cell")
+
+
+class CellMaxHopCountReached(NovaException):
+ message = _("Cell message has reached maximum hop count: %(hop_count)s")
+
+
+class NoCellsAvailable(NovaException):
+ message = _("No cells available matching scheduling criteria.")
+
+
+class CellError(NovaException):
+ message = _("Exception received during cell processing: %(exc_name)s.")
+
+
+class InstanceUnknownCell(NotFound):
+ message = _("Cell is not known for instance %(instance_uuid)s")
+
+
class SchedulerHostFilterNotFound(NotFound):
message = _("Scheduler Host Filter %(filter_name)s could not be found.")
diff --git a/nova/tests/cells/__init__.py b/nova/tests/cells/__init__.py
new file mode 100644
index 000000000..d1bf725f7
--- /dev/null
+++ b/nova/tests/cells/__init__.py
@@ -0,0 +1,19 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work
+from nova.tests import *
diff --git a/nova/tests/cells/fakes.py b/nova/tests/cells/fakes.py
new file mode 100644
index 000000000..a9de530d1
--- /dev/null
+++ b/nova/tests/cells/fakes.py
@@ -0,0 +1,191 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Fakes For Cells tests.
+"""
+
+from nova.cells import driver
+from nova.cells import manager as cells_manager
+from nova.cells import messaging
+from nova.cells import state as cells_state
+import nova.db
+from nova.db import base
+from nova.openstack.common import cfg
+
+CONF = cfg.CONF
+CONF.import_opt('name', 'nova.cells.opts', group='cells')
+
+
+# Fake Cell Hierarchy
+FAKE_TOP_LEVEL_CELL_NAME = 'api-cell'
+FAKE_CELL_LAYOUT = [{'child-cell1': []},
+ {'child-cell2': [{'grandchild-cell1': []}]},
+ {'child-cell3': [{'grandchild-cell2': []},
+ {'grandchild-cell3': []}]},
+ {'child-cell4': []}]
+
+# build_cell_stub_infos() below will take the above layout and create
+# a fake view of the DB from the perspective of each of the cells.
+# For each cell, a CellStubInfo will be created with this info.
+CELL_NAME_TO_STUB_INFO = {}
+
+
+class FakeDBApi(object):
+ def __init__(self, cell_db_entries):
+ self.cell_db_entries = cell_db_entries
+
+ def __getattr__(self, key):
+ return getattr(nova.db, key)
+
+ def cell_get_all(self, ctxt):
+ return self.cell_db_entries
+
+ def compute_node_get_all(self, ctxt):
+ return []
+
+
+class FakeCellsDriver(driver.BaseCellsDriver):
+ pass
+
+
+class FakeCellState(cells_state.CellState):
+ def send_message(self, message):
+ message_runner = get_message_runner(self.name)
+ orig_ctxt = message.ctxt
+ json_message = message.to_json()
+ message = message_runner.message_from_json(json_message)
+ # Restore this so we can use mox and verify same context
+ message.ctxt = orig_ctxt
+ message.process()
+
+
+class FakeCellStateManager(cells_state.CellStateManager):
+ def __init__(self, *args, **kwargs):
+ super(FakeCellStateManager, self).__init__(*args,
+ cell_state_cls=FakeCellState, **kwargs)
+
+
+class FakeCellsManager(cells_manager.CellsManager):
+ def __init__(self, *args, **kwargs):
+ super(FakeCellsManager, self).__init__(*args,
+ cell_state_manager=FakeCellStateManager,
+ **kwargs)
+
+
+class CellStubInfo(object):
+ def __init__(self, test_case, cell_name, db_entries):
+ self.test_case = test_case
+ self.cell_name = cell_name
+ self.db_entries = db_entries
+
+ def fake_base_init(_self, *args, **kwargs):
+ _self.db = FakeDBApi(db_entries)
+
+ test_case.stubs.Set(base.Base, '__init__', fake_base_init)
+ self.cells_manager = FakeCellsManager()
+ # Fix the cell name, as it normally uses CONF.cells.name
+ msg_runner = self.cells_manager.msg_runner
+ msg_runner.our_name = self.cell_name
+ self.cells_manager.state_manager.my_cell_state.name = self.cell_name
+
+
+def _build_cell_stub_info(test_case, our_name, parent_path, children):
+ cell_db_entries = []
+ cur_db_id = 1
+ sep_char = messaging._PATH_CELL_SEP
+ if parent_path:
+ cell_db_entries.append(
+ dict(id=cur_db_id,
+ name=parent_path.split(sep_char)[-1],
+ is_parent=True,
+ username='username%s' % cur_db_id,
+ password='password%s' % cur_db_id,
+ rpc_host='rpc_host%s' % cur_db_id,
+ rpc_port='rpc_port%s' % cur_db_id,
+ rpc_virtual_host='rpc_vhost%s' % cur_db_id))
+ cur_db_id += 1
+ our_path = parent_path + sep_char + our_name
+ else:
+ our_path = our_name
+ for child in children:
+ for child_name, grandchildren in child.items():
+ _build_cell_stub_info(test_case, child_name, our_path,
+ grandchildren)
+ cell_entry = dict(id=cur_db_id,
+ name=child_name,
+ username='username%s' % cur_db_id,
+ password='password%s' % cur_db_id,
+ rpc_host='rpc_host%s' % cur_db_id,
+ rpc_port='rpc_port%s' % cur_db_id,
+ rpc_virtual_host='rpc_vhost%s' % cur_db_id,
+ is_parent=False)
+ cell_db_entries.append(cell_entry)
+ cur_db_id += 1
+ stub_info = CellStubInfo(test_case, our_name, cell_db_entries)
+ CELL_NAME_TO_STUB_INFO[our_name] = stub_info
+
+
+def _build_cell_stub_infos(test_case):
+ _build_cell_stub_info(test_case, FAKE_TOP_LEVEL_CELL_NAME, '',
+ FAKE_CELL_LAYOUT)
+
+
+def init(test_case):
+ global CELL_NAME_TO_STUB_INFO
+ test_case.flags(driver='nova.tests.cells.fakes.FakeCellsDriver',
+ group='cells')
+ CELL_NAME_TO_STUB_INFO = {}
+ _build_cell_stub_infos(test_case)
+
+
+def _get_cell_stub_info(cell_name):
+ return CELL_NAME_TO_STUB_INFO[cell_name]
+
+
+def get_state_manager(cell_name):
+ return _get_cell_stub_info(cell_name).cells_manager.state_manager
+
+
+def get_cell_state(cur_cell_name, tgt_cell_name):
+ state_manager = get_state_manager(cur_cell_name)
+ cell = state_manager.child_cells.get(tgt_cell_name)
+ if cell is None:
+ cell = state_manager.parent_cells.get(tgt_cell_name)
+ return cell
+
+
+def get_cells_manager(cell_name):
+ return _get_cell_stub_info(cell_name).cells_manager
+
+
+def get_message_runner(cell_name):
+ return _get_cell_stub_info(cell_name).cells_manager.msg_runner
+
+
+def stub_tgt_method(test_case, cell_name, method_name, method):
+ msg_runner = get_message_runner(cell_name)
+ tgt_msg_methods = msg_runner.methods_by_type['targeted']
+ setattr(tgt_msg_methods, method_name, method)
+
+
+def stub_bcast_method(test_case, cell_name, method_name, method):
+ msg_runner = get_message_runner(cell_name)
+ tgt_msg_methods = msg_runner.methods_by_type['broadcast']
+ setattr(tgt_msg_methods, method_name, method)
+
+
+def stub_bcast_methods(test_case, method_name, method):
+ for cell_name in CELL_NAME_TO_STUB_INFO.keys():
+ stub_bcast_method(test_case, cell_name, method_name, method)
diff --git a/nova/tests/cells/test_cells_manager.py b/nova/tests/cells/test_cells_manager.py
new file mode 100644
index 000000000..5a2b83145
--- /dev/null
+++ b/nova/tests/cells/test_cells_manager.py
@@ -0,0 +1,151 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For CellsManager
+"""
+from nova.cells import messaging
+from nova import context
+from nova import test
+from nova.tests.cells import fakes
+
+
+class CellsManagerClassTestCase(test.TestCase):
+ """Test case for CellsManager class"""
+
+ def setUp(self):
+ super(CellsManagerClassTestCase, self).setUp()
+ fakes.init(self)
+ # pick a child cell to use for tests.
+ self.our_cell = 'grandchild-cell1'
+ self.cells_manager = fakes.get_cells_manager(self.our_cell)
+ self.msg_runner = self.cells_manager.msg_runner
+ self.driver = self.cells_manager.driver
+ self.ctxt = 'fake_context'
+
+ def test_post_start_hook_child_cell(self):
+ self.mox.StubOutWithMock(self.driver, 'start_consumers')
+ self.mox.StubOutWithMock(context, 'get_admin_context')
+ self.mox.StubOutWithMock(self.cells_manager, '_update_our_parents')
+
+ self.driver.start_consumers(self.msg_runner)
+ context.get_admin_context().AndReturn(self.ctxt)
+ self.cells_manager._update_our_parents(self.ctxt)
+ self.mox.ReplayAll()
+ self.cells_manager.post_start_hook()
+
+ def test_post_start_hook_middle_cell(self):
+ cells_manager = fakes.get_cells_manager('child-cell2')
+ msg_runner = cells_manager.msg_runner
+ driver = cells_manager.driver
+
+ self.mox.StubOutWithMock(driver, 'start_consumers')
+ self.mox.StubOutWithMock(context, 'get_admin_context')
+ self.mox.StubOutWithMock(msg_runner,
+ 'ask_children_for_capabilities')
+ self.mox.StubOutWithMock(msg_runner,
+ 'ask_children_for_capacities')
+
+ driver.start_consumers(msg_runner)
+ context.get_admin_context().AndReturn(self.ctxt)
+ msg_runner.ask_children_for_capabilities(self.ctxt)
+ msg_runner.ask_children_for_capacities(self.ctxt)
+ self.mox.ReplayAll()
+ cells_manager.post_start_hook()
+
+ def test_update_our_parents(self):
+ self.mox.StubOutWithMock(self.msg_runner,
+ 'tell_parents_our_capabilities')
+ self.mox.StubOutWithMock(self.msg_runner,
+ 'tell_parents_our_capacities')
+
+ self.msg_runner.tell_parents_our_capabilities(self.ctxt)
+ self.msg_runner.tell_parents_our_capacities(self.ctxt)
+ self.mox.ReplayAll()
+ self.cells_manager._update_our_parents(self.ctxt)
+
+ def test_schedule_run_instance(self):
+ host_sched_kwargs = 'fake_host_sched_kwargs_silently_passed'
+ self.mox.StubOutWithMock(self.msg_runner, 'schedule_run_instance')
+ our_cell = self.msg_runner.state_manager.get_my_state()
+ self.msg_runner.schedule_run_instance(self.ctxt, our_cell,
+ host_sched_kwargs)
+ self.mox.ReplayAll()
+ self.cells_manager.schedule_run_instance(self.ctxt,
+ host_sched_kwargs=host_sched_kwargs)
+
+ def test_run_compute_api_method(self):
+ # Args should just be silently passed through
+ cell_name = 'fake-cell-name'
+ method_info = 'fake-method-info'
+
+ fake_response = messaging.Response('fake', 'fake', False)
+
+ self.mox.StubOutWithMock(self.msg_runner,
+ 'run_compute_api_method')
+ self.mox.StubOutWithMock(fake_response,
+ 'value_or_raise')
+ self.msg_runner.run_compute_api_method(self.ctxt,
+ cell_name,
+ method_info,
+ True).AndReturn(fake_response)
+ fake_response.value_or_raise().AndReturn('fake-response')
+ self.mox.ReplayAll()
+ response = self.cells_manager.run_compute_api_method(
+ self.ctxt, cell_name=cell_name, method_info=method_info,
+ call=True)
+ self.assertEqual('fake-response', response)
+
+ def test_instance_update_at_top(self):
+ self.mox.StubOutWithMock(self.msg_runner, 'instance_update_at_top')
+ self.msg_runner.instance_update_at_top(self.ctxt, 'fake-instance')
+ self.mox.ReplayAll()
+ self.cells_manager.instance_update_at_top(self.ctxt,
+ instance='fake-instance')
+
+ def test_instance_destroy_at_top(self):
+ self.mox.StubOutWithMock(self.msg_runner, 'instance_destroy_at_top')
+ self.msg_runner.instance_destroy_at_top(self.ctxt, 'fake-instance')
+ self.mox.ReplayAll()
+ self.cells_manager.instance_destroy_at_top(self.ctxt,
+ instance='fake-instance')
+
+ def test_instance_delete_everywhere(self):
+ self.mox.StubOutWithMock(self.msg_runner,
+ 'instance_delete_everywhere')
+ self.msg_runner.instance_delete_everywhere(self.ctxt,
+ 'fake-instance',
+ 'fake-type')
+ self.mox.ReplayAll()
+ self.cells_manager.instance_delete_everywhere(
+ self.ctxt, instance='fake-instance',
+ delete_type='fake-type')
+
+ def test_instance_fault_create_at_top(self):
+ self.mox.StubOutWithMock(self.msg_runner,
+ 'instance_fault_create_at_top')
+ self.msg_runner.instance_fault_create_at_top(self.ctxt,
+ 'fake-fault')
+ self.mox.ReplayAll()
+ self.cells_manager.instance_fault_create_at_top(
+ self.ctxt, instance_fault='fake-fault')
+
+ def test_bw_usage_update_at_top(self):
+ self.mox.StubOutWithMock(self.msg_runner,
+ 'bw_usage_update_at_top')
+ self.msg_runner.bw_usage_update_at_top(self.ctxt,
+ 'fake-bw-info')
+ self.mox.ReplayAll()
+ self.cells_manager.bw_usage_update_at_top(
+ self.ctxt, bw_update_info='fake-bw-info')
diff --git a/nova/tests/cells/test_cells_messaging.py b/nova/tests/cells/test_cells_messaging.py
new file mode 100644
index 000000000..d728c9474
--- /dev/null
+++ b/nova/tests/cells/test_cells_messaging.py
@@ -0,0 +1,913 @@
+# Copyright (c) 2012 Rackspace Hosting # All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For Cells Messaging module
+"""
+
+from nova.cells import messaging
+from nova import context
+from nova import exception
+from nova.openstack.common import cfg
+from nova import test
+from nova.tests.cells import fakes
+
+
+CONF = cfg.CONF
+CONF.import_opt('host', 'nova.config')
+CONF.import_opt('name', 'nova.cells.opts', group='cells')
+CONF.import_opt('allowed_rpc_exception_modules',
+ 'nova.openstack.common.rpc')
+
+
+class CellsMessageClassesTestCase(test.TestCase):
+ """Test case for the main Cells Message classes."""
+ def setUp(self):
+ super(CellsMessageClassesTestCase, self).setUp()
+ fakes.init(self)
+ self.ctxt = context.RequestContext('fake', 'fake')
+ # Need to be able to deserialize test.TestingException.
+ allowed_modules = CONF.allowed_rpc_exception_modules
+ allowed_modules.append('nova.test')
+ self.flags(allowed_rpc_exception_modules=allowed_modules)
+ self.our_name = 'api-cell'
+ self.msg_runner = fakes.get_message_runner(self.our_name)
+ self.state_manager = self.msg_runner.state_manager
+
+ def test_reverse_path(self):
+ path = 'a!b!c!d'
+ expected = 'd!c!b!a'
+ rev_path = messaging._reverse_path(path)
+ self.assertEqual(rev_path, expected)
+
+ def test_response_cell_name_from_path(self):
+ # test array with tuples of inputs/expected outputs
+ test_paths = [('cell1', 'cell1'),
+ ('cell1!cell2', 'cell2!cell1'),
+ ('cell1!cell2!cell3', 'cell3!cell2!cell1')]
+
+ for test_input, expected_output in test_paths:
+ self.assertEqual(expected_output,
+ messaging._response_cell_name_from_path(test_input))
+
+ def test_response_cell_name_from_path_neighbor_only(self):
+ # test array with tuples of inputs/expected outputs
+ test_paths = [('cell1', 'cell1'),
+ ('cell1!cell2', 'cell2!cell1'),
+ ('cell1!cell2!cell3', 'cell3!cell2')]
+
+ for test_input, expected_output in test_paths:
+ self.assertEqual(expected_output,
+ messaging._response_cell_name_from_path(test_input,
+ neighbor_only=True))
+
+ def test_targeted_message(self):
+ self.flags(max_hop_count=99, group='cells')
+ target_cell = 'api-cell!child-cell2!grandchild-cell1'
+ method = 'fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell)
+ self.assertEqual(self.ctxt, tgt_message.ctxt)
+ self.assertEqual(method, tgt_message.method_name)
+ self.assertEqual(method_kwargs, tgt_message.method_kwargs)
+ self.assertEqual(direction, tgt_message.direction)
+ self.assertEqual(target_cell, target_cell)
+ self.assertFalse(tgt_message.fanout)
+ self.assertFalse(tgt_message.need_response)
+ self.assertEqual(self.our_name, tgt_message.routing_path)
+ self.assertEqual(1, tgt_message.hop_count)
+ self.assertEqual(99, tgt_message.max_hop_count)
+ self.assertFalse(tgt_message.is_broadcast)
+ # Correct next hop?
+ next_hop = tgt_message._get_next_hop()
+ child_cell = self.state_manager.get_child_cell('child-cell2')
+ self.assertEqual(child_cell, next_hop)
+
+ def test_create_targeted_message_with_response(self):
+ self.flags(max_hop_count=99, group='cells')
+ our_name = 'child-cell1'
+ target_cell = 'child-cell1!api-cell'
+ msg_runner = fakes.get_message_runner(our_name)
+ method = 'fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'up'
+ tgt_message = messaging._TargetedMessage(msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell,
+ need_response=True)
+ self.assertEqual(self.ctxt, tgt_message.ctxt)
+ self.assertEqual(method, tgt_message.method_name)
+ self.assertEqual(method_kwargs, tgt_message.method_kwargs)
+ self.assertEqual(direction, tgt_message.direction)
+ self.assertEqual(target_cell, target_cell)
+ self.assertFalse(tgt_message.fanout)
+ self.assertTrue(tgt_message.need_response)
+ self.assertEqual(our_name, tgt_message.routing_path)
+ self.assertEqual(1, tgt_message.hop_count)
+ self.assertEqual(99, tgt_message.max_hop_count)
+ self.assertFalse(tgt_message.is_broadcast)
+ # Correct next hop?
+ next_hop = tgt_message._get_next_hop()
+ parent_cell = msg_runner.state_manager.get_parent_cell('api-cell')
+ self.assertEqual(parent_cell, next_hop)
+
+ def test_targeted_message_when_target_is_cell_state(self):
+ method = 'fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+ target_cell = self.state_manager.get_child_cell('child-cell2')
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell)
+ self.assertEqual('api-cell!child-cell2', tgt_message.target_cell)
+ # Correct next hop?
+ next_hop = tgt_message._get_next_hop()
+ self.assertEqual(target_cell, next_hop)
+
+ def test_targeted_message_when_target_cell_state_is_me(self):
+ method = 'fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+ target_cell = self.state_manager.get_my_state()
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell)
+ self.assertEqual('api-cell', tgt_message.target_cell)
+ # Correct next hop?
+ next_hop = tgt_message._get_next_hop()
+ self.assertEqual(target_cell, next_hop)
+
+ def test_create_broadcast_message(self):
+ self.flags(max_hop_count=99, group='cells')
+ self.flags(name='api-cell', max_hop_count=99, group='cells')
+ method = 'fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+ bcast_message = messaging._BroadcastMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction)
+ self.assertEqual(self.ctxt, bcast_message.ctxt)
+ self.assertEqual(method, bcast_message.method_name)
+ self.assertEqual(method_kwargs, bcast_message.method_kwargs)
+ self.assertEqual(direction, bcast_message.direction)
+ self.assertFalse(bcast_message.fanout)
+ self.assertFalse(bcast_message.need_response)
+ self.assertEqual(self.our_name, bcast_message.routing_path)
+ self.assertEqual(1, bcast_message.hop_count)
+ self.assertEqual(99, bcast_message.max_hop_count)
+ self.assertTrue(bcast_message.is_broadcast)
+ # Correct next hops?
+ next_hops = bcast_message._get_next_hops()
+ child_cells = self.state_manager.get_child_cells()
+ self.assertEqual(child_cells, next_hops)
+
+ def test_create_broadcast_message_with_response(self):
+ self.flags(max_hop_count=99, group='cells')
+ our_name = 'child-cell1'
+ msg_runner = fakes.get_message_runner(our_name)
+ method = 'fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'up'
+ bcast_message = messaging._BroadcastMessage(msg_runner, self.ctxt,
+ method, method_kwargs, direction, need_response=True)
+ self.assertEqual(self.ctxt, bcast_message.ctxt)
+ self.assertEqual(method, bcast_message.method_name)
+ self.assertEqual(method_kwargs, bcast_message.method_kwargs)
+ self.assertEqual(direction, bcast_message.direction)
+ self.assertFalse(bcast_message.fanout)
+ self.assertTrue(bcast_message.need_response)
+ self.assertEqual(our_name, bcast_message.routing_path)
+ self.assertEqual(1, bcast_message.hop_count)
+ self.assertEqual(99, bcast_message.max_hop_count)
+ self.assertTrue(bcast_message.is_broadcast)
+ # Correct next hops?
+ next_hops = bcast_message._get_next_hops()
+ parent_cells = msg_runner.state_manager.get_parent_cells()
+ self.assertEqual(parent_cells, next_hops)
+
+ def test_self_targeted_message(self):
+ target_cell = 'api-cell'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ call_info = {}
+
+ def our_fake_method(message, **kwargs):
+ call_info['context'] = message.ctxt
+ call_info['routing_path'] = message.routing_path
+ call_info['kwargs'] = kwargs
+
+ fakes.stub_tgt_method(self, 'api-cell', 'our_fake_method',
+ our_fake_method)
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell)
+ tgt_message.process()
+
+ self.assertEqual(self.ctxt, call_info['context'])
+ self.assertEqual(method_kwargs, call_info['kwargs'])
+ self.assertEqual(target_cell, call_info['routing_path'])
+
+ def test_child_targeted_message(self):
+ target_cell = 'api-cell!child-cell1'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ call_info = {}
+
+ def our_fake_method(message, **kwargs):
+ call_info['context'] = message.ctxt
+ call_info['routing_path'] = message.routing_path
+ call_info['kwargs'] = kwargs
+
+ fakes.stub_tgt_method(self, 'child-cell1', 'our_fake_method',
+ our_fake_method)
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell)
+ tgt_message.process()
+
+ self.assertEqual(self.ctxt, call_info['context'])
+ self.assertEqual(method_kwargs, call_info['kwargs'])
+ self.assertEqual(target_cell, call_info['routing_path'])
+
+ def test_grandchild_targeted_message(self):
+ target_cell = 'api-cell!child-cell2!grandchild-cell1'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ call_info = {}
+
+ def our_fake_method(message, **kwargs):
+ call_info['context'] = message.ctxt
+ call_info['routing_path'] = message.routing_path
+ call_info['kwargs'] = kwargs
+
+ fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method',
+ our_fake_method)
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell)
+ tgt_message.process()
+
+ self.assertEqual(self.ctxt, call_info['context'])
+ self.assertEqual(method_kwargs, call_info['kwargs'])
+ self.assertEqual(target_cell, call_info['routing_path'])
+
+ def test_grandchild_targeted_message_with_response(self):
+ target_cell = 'api-cell!child-cell2!grandchild-cell1'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ call_info = {}
+
+ def our_fake_method(message, **kwargs):
+ call_info['context'] = message.ctxt
+ call_info['routing_path'] = message.routing_path
+ call_info['kwargs'] = kwargs
+ return 'our_fake_response'
+
+ fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method',
+ our_fake_method)
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell,
+ need_response=True)
+ response = tgt_message.process()
+
+ self.assertEqual(self.ctxt, call_info['context'])
+ self.assertEqual(method_kwargs, call_info['kwargs'])
+ self.assertEqual(target_cell, call_info['routing_path'])
+ self.assertFalse(response.failure)
+ self.assertTrue(response.value_or_raise(), 'our_fake_response')
+
+ def test_grandchild_targeted_message_with_error(self):
+ target_cell = 'api-cell!child-cell2!grandchild-cell1'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ def our_fake_method(message, **kwargs):
+ raise test.TestingException('this should be returned')
+
+ fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method',
+ our_fake_method)
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell,
+ need_response=True)
+ response = tgt_message.process()
+ self.assertTrue(response.failure)
+ self.assertRaises(test.TestingException, response.value_or_raise)
+
+ def test_grandchild_targeted_message_max_hops(self):
+ self.flags(max_hop_count=2, group='cells')
+ target_cell = 'api-cell!child-cell2!grandchild-cell1'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ def our_fake_method(message, **kwargs):
+ raise test.TestingException('should not be reached')
+
+ fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method',
+ our_fake_method)
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell,
+ need_response=True)
+ response = tgt_message.process()
+ self.assertTrue(response.failure)
+ self.assertRaises(exception.CellMaxHopCountReached,
+ response.value_or_raise)
+
+ def test_targeted_message_invalid_cell(self):
+ target_cell = 'api-cell!child-cell2!grandchild-cell4'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell,
+ need_response=True)
+ response = tgt_message.process()
+ self.assertTrue(response.failure)
+ self.assertRaises(exception.CellRoutingInconsistency,
+ response.value_or_raise)
+
+ def test_targeted_message_invalid_cell2(self):
+ target_cell = 'unknown-cell!child-cell2'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell,
+ need_response=True)
+ response = tgt_message.process()
+ self.assertTrue(response.failure)
+ self.assertRaises(exception.CellRoutingInconsistency,
+ response.value_or_raise)
+
+ def test_broadcast_routing(self):
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ cells = set()
+
+ def our_fake_method(message, **kwargs):
+ cells.add(message.routing_path)
+
+ fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
+
+ bcast_message = messaging._BroadcastMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs,
+ direction,
+ run_locally=True)
+ bcast_message.process()
+ # fakes creates 8 cells (including ourself).
+ self.assertEqual(len(cells), 8)
+
+ def test_broadcast_routing_up(self):
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'up'
+ msg_runner = fakes.get_message_runner('grandchild-cell3')
+
+ cells = set()
+
+ def our_fake_method(message, **kwargs):
+ cells.add(message.routing_path)
+
+ fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
+
+ bcast_message = messaging._BroadcastMessage(msg_runner, self.ctxt,
+ method, method_kwargs,
+ direction,
+ run_locally=True)
+ bcast_message.process()
+ # Paths are reversed, since going 'up'
+ expected = set(['grandchild-cell3', 'grandchild-cell3!child-cell3',
+ 'grandchild-cell3!child-cell3!api-cell'])
+ self.assertEqual(expected, cells)
+
+ def test_broadcast_routing_without_ourselves(self):
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ cells = set()
+
+ def our_fake_method(message, **kwargs):
+ cells.add(message.routing_path)
+
+ fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
+
+ bcast_message = messaging._BroadcastMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs,
+ direction,
+ run_locally=False)
+ bcast_message.process()
+ # fakes creates 8 cells (including ourself). So we should see
+ # only 7 here.
+ self.assertEqual(len(cells), 7)
+
+ def test_broadcast_routing_with_response(self):
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ def our_fake_method(message, **kwargs):
+ return 'response-%s' % message.routing_path
+
+ fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
+
+ bcast_message = messaging._BroadcastMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs,
+ direction,
+ run_locally=True,
+ need_response=True)
+ responses = bcast_message.process()
+ self.assertEqual(len(responses), 8)
+ for response in responses:
+ self.assertFalse(response.failure)
+ self.assertEqual('response-%s' % response.cell_name,
+ response.value_or_raise())
+
+ def test_broadcast_routing_with_response_max_hops(self):
+ self.flags(max_hop_count=2, group='cells')
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ def our_fake_method(message, **kwargs):
+ return 'response-%s' % message.routing_path
+
+ fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
+
+ bcast_message = messaging._BroadcastMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs,
+ direction,
+ run_locally=True,
+ need_response=True)
+ responses = bcast_message.process()
+ # Should only get responses from our immediate children (and
+ # ourselves)
+ self.assertEqual(len(responses), 5)
+ for response in responses:
+ self.assertFalse(response.failure)
+ self.assertEqual('response-%s' % response.cell_name,
+ response.value_or_raise())
+
+ def test_broadcast_routing_with_all_erroring(self):
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ def our_fake_method(message, **kwargs):
+ raise test.TestingException('fake failure')
+
+ fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
+
+ bcast_message = messaging._BroadcastMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs,
+ direction,
+ run_locally=True,
+ need_response=True)
+ responses = bcast_message.process()
+ self.assertEqual(len(responses), 8)
+ for response in responses:
+ self.assertTrue(response.failure)
+ self.assertRaises(test.TestingException, response.value_or_raise)
+
+ def test_broadcast_routing_with_two_erroring(self):
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ def our_fake_method_failing(message, **kwargs):
+ raise test.TestingException('fake failure')
+
+ def our_fake_method(message, **kwargs):
+ return 'response-%s' % message.routing_path
+
+ fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
+ fakes.stub_bcast_method(self, 'child-cell2', 'our_fake_method',
+ our_fake_method_failing)
+ fakes.stub_bcast_method(self, 'grandchild-cell3', 'our_fake_method',
+ our_fake_method_failing)
+
+ bcast_message = messaging._BroadcastMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs,
+ direction,
+ run_locally=True,
+ need_response=True)
+ responses = bcast_message.process()
+ self.assertEqual(len(responses), 8)
+ failure_responses = [resp for resp in responses if resp.failure]
+ success_responses = [resp for resp in responses if not resp.failure]
+ self.assertEqual(len(failure_responses), 2)
+ self.assertEqual(len(success_responses), 6)
+
+ for response in success_responses:
+ self.assertFalse(response.failure)
+ self.assertEqual('response-%s' % response.cell_name,
+ response.value_or_raise())
+
+ for response in failure_responses:
+ self.assertIn(response.cell_name, ['api-cell!child-cell2',
+ 'api-cell!child-cell3!grandchild-cell3'])
+ self.assertTrue(response.failure)
+ self.assertRaises(test.TestingException, response.value_or_raise)
+
+
+class CellsTargetedMethodsTestCase(test.TestCase):
+ """Test case for _TargetedMessageMethods class. Most of these
+ tests actually test the full path from the MessageRunner through
+ to the functionality of the message method. Hits 2 birds with 1
+ stone, even though it's a little more than a unit test.
+ """
+ def setUp(self):
+ super(CellsTargetedMethodsTestCase, self).setUp()
+ fakes.init(self)
+ self.ctxt = context.RequestContext('fake', 'fake')
+ self._setup_attrs('api-cell', 'api-cell!child-cell2')
+
+ def _setup_attrs(self, source_cell, target_cell):
+ self.tgt_cell_name = target_cell
+ self.src_msg_runner = fakes.get_message_runner(source_cell)
+ self.src_state_manager = self.src_msg_runner.state_manager
+ tgt_shortname = target_cell.split('!')[-1]
+ self.tgt_cell_mgr = fakes.get_cells_manager(tgt_shortname)
+ self.tgt_msg_runner = self.tgt_cell_mgr.msg_runner
+ self.tgt_scheduler = self.tgt_msg_runner.scheduler
+ self.tgt_state_manager = self.tgt_msg_runner.state_manager
+ methods_cls = self.tgt_msg_runner.methods_by_type['targeted']
+ self.tgt_methods_cls = methods_cls
+ self.tgt_compute_api = methods_cls.compute_api
+ self.tgt_db_inst = methods_cls.db
+
+ def test_schedule_run_instance(self):
+ host_sched_kwargs = {'filter_properties': {},
+ 'key1': 'value1',
+ 'key2': 'value2'}
+ self.mox.StubOutWithMock(self.tgt_scheduler, 'run_instance')
+ self.tgt_scheduler.run_instance(self.ctxt, host_sched_kwargs)
+ self.mox.ReplayAll()
+ self.src_msg_runner.schedule_run_instance(self.ctxt,
+ self.tgt_cell_name,
+ host_sched_kwargs)
+
+ def test_call_compute_api_method(self):
+
+ instance_uuid = 'fake_instance_uuid'
+ method_info = {'method': 'reboot',
+ 'method_args': (instance_uuid, 2, 3),
+ 'method_kwargs': {'arg1': 'val1', 'arg2': 'val2'}}
+ self.mox.StubOutWithMock(self.tgt_compute_api, 'reboot')
+ self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_get_by_uuid')
+
+ self.tgt_db_inst.instance_get_by_uuid(self.ctxt,
+ instance_uuid).AndReturn(
+ 'fake_instance')
+ self.tgt_compute_api.reboot(self.ctxt, 'fake_instance', 2, 3,
+ arg1='val1', arg2='val2').AndReturn('fake_result')
+ self.mox.ReplayAll()
+
+ response = self.src_msg_runner.run_compute_api_method(
+ self.ctxt,
+ self.tgt_cell_name,
+ method_info,
+ True)
+ result = response.value_or_raise()
+ self.assertEqual('fake_result', result)
+
+ def test_call_compute_api_method_unknown_instance(self):
+ # Unknown instance should send a broadcast up that instance
+ # is gone.
+ instance_uuid = 'fake_instance_uuid'
+ instance = {'uuid': instance_uuid}
+ method_info = {'method': 'reboot',
+ 'method_args': (instance_uuid, 2, 3),
+ 'method_kwargs': {'arg1': 'val1', 'arg2': 'val2'}}
+
+ self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_get_by_uuid')
+ self.mox.StubOutWithMock(self.tgt_msg_runner,
+ 'instance_destroy_at_top')
+
+ self.tgt_db_inst.instance_get_by_uuid(self.ctxt,
+ 'fake_instance_uuid').AndRaise(
+ exception.InstanceNotFound(instance_id=instance_uuid))
+ self.tgt_msg_runner.instance_destroy_at_top(self.ctxt, instance)
+
+ self.mox.ReplayAll()
+
+ response = self.src_msg_runner.run_compute_api_method(
+ self.ctxt,
+ self.tgt_cell_name,
+ method_info,
+ True)
+ self.assertRaises(exception.InstanceNotFound,
+ response.value_or_raise)
+
+ def test_update_capabilities(self):
+ # Route up to API
+ self._setup_attrs('child-cell2', 'child-cell2!api-cell')
+ capabs = {'cap1': set(['val1', 'val2']),
+ 'cap2': set(['val3'])}
+ # The list(set([])) seems silly, but we can't assume the order
+ # of the list... This behavior should match the code we're
+ # testing... which is check that a set was converted to a list.
+ expected_capabs = {'cap1': list(set(['val1', 'val2'])),
+ 'cap2': ['val3']}
+ self.mox.StubOutWithMock(self.src_state_manager,
+ 'get_our_capabilities')
+ self.mox.StubOutWithMock(self.tgt_state_manager,
+ 'update_cell_capabilities')
+ self.mox.StubOutWithMock(self.tgt_msg_runner,
+ 'tell_parents_our_capabilities')
+ self.src_state_manager.get_our_capabilities().AndReturn(capabs)
+ self.tgt_state_manager.update_cell_capabilities('child-cell2',
+ expected_capabs)
+ self.tgt_msg_runner.tell_parents_our_capabilities(self.ctxt)
+
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.tell_parents_our_capabilities(self.ctxt)
+
+ def test_update_capacities(self):
+ self._setup_attrs('child-cell2', 'child-cell2!api-cell')
+ capacs = 'fake_capacs'
+ self.mox.StubOutWithMock(self.src_state_manager,
+ 'get_our_capacities')
+ self.mox.StubOutWithMock(self.tgt_state_manager,
+ 'update_cell_capacities')
+ self.mox.StubOutWithMock(self.tgt_msg_runner,
+ 'tell_parents_our_capacities')
+ self.src_state_manager.get_our_capacities().AndReturn(capacs)
+ self.tgt_state_manager.update_cell_capacities('child-cell2',
+ capacs)
+ self.tgt_msg_runner.tell_parents_our_capacities(self.ctxt)
+
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.tell_parents_our_capacities(self.ctxt)
+
+ def test_announce_capabilities(self):
+ self._setup_attrs('api-cell', 'api-cell!child-cell1')
+ # To make this easier to test, make us only have 1 child cell.
+ cell_state = self.src_state_manager.child_cells['child-cell1']
+ self.src_state_manager.child_cells = {'child-cell1': cell_state}
+
+ self.mox.StubOutWithMock(self.tgt_msg_runner,
+ 'tell_parents_our_capabilities')
+ self.tgt_msg_runner.tell_parents_our_capabilities(self.ctxt)
+
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.ask_children_for_capabilities(self.ctxt)
+
+ def test_announce_capacities(self):
+ self._setup_attrs('api-cell', 'api-cell!child-cell1')
+ # To make this easier to test, make us only have 1 child cell.
+ cell_state = self.src_state_manager.child_cells['child-cell1']
+ self.src_state_manager.child_cells = {'child-cell1': cell_state}
+
+ self.mox.StubOutWithMock(self.tgt_msg_runner,
+ 'tell_parents_our_capacities')
+ self.tgt_msg_runner.tell_parents_our_capacities(self.ctxt)
+
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.ask_children_for_capacities(self.ctxt)
+
+
+class CellsBroadcastMethodsTestCase(test.TestCase):
+ """Test case for _BroadcastMessageMethods class. Most of these
+ tests actually test the full path from the MessageRunner through
+ to the functionality of the message method. Hits 2 birds with 1
+ stone, even though it's a little more than a unit test.
+ """
+
+ def setUp(self):
+ super(CellsBroadcastMethodsTestCase, self).setUp()
+ fakes.init(self)
+ self.ctxt = context.RequestContext('fake', 'fake')
+ self._setup_attrs()
+
+ def _setup_attrs(self, up=True):
+ mid_cell = 'child-cell2'
+ if up:
+ src_cell = 'grandchild-cell1'
+ tgt_cell = 'api-cell'
+ else:
+ src_cell = 'api-cell'
+ tgt_cell = 'grandchild-cell1'
+
+ self.src_msg_runner = fakes.get_message_runner(src_cell)
+ methods_cls = self.src_msg_runner.methods_by_type['broadcast']
+ self.src_methods_cls = methods_cls
+ self.src_db_inst = methods_cls.db
+ self.src_compute_api = methods_cls.compute_api
+
+ self.mid_msg_runner = fakes.get_message_runner(mid_cell)
+ methods_cls = self.mid_msg_runner.methods_by_type['broadcast']
+ self.mid_methods_cls = methods_cls
+ self.mid_db_inst = methods_cls.db
+ self.mid_compute_api = methods_cls.compute_api
+
+ self.tgt_msg_runner = fakes.get_message_runner(tgt_cell)
+ methods_cls = self.tgt_msg_runner.methods_by_type['broadcast']
+ self.tgt_methods_cls = methods_cls
+ self.tgt_db_inst = methods_cls.db
+ self.tgt_compute_api = methods_cls.compute_api
+
+ def test_at_the_top(self):
+ self.assertTrue(self.tgt_methods_cls._at_the_top())
+ self.assertFalse(self.mid_methods_cls._at_the_top())
+ self.assertFalse(self.src_methods_cls._at_the_top())
+
+ def test_instance_update_at_top(self):
+ fake_info_cache = {'id': 1,
+ 'instance': 'fake_instance',
+ 'other': 'moo'}
+ fake_sys_metadata = [{'id': 1,
+ 'key': 'key1',
+ 'value': 'value1'},
+ {'id': 2,
+ 'key': 'key2',
+ 'value': 'value2'}]
+ fake_instance = {'id': 2,
+ 'uuid': 'fake_uuid',
+ 'security_groups': 'fake',
+ 'instance_type': 'fake',
+ 'volumes': 'fake',
+ 'cell_name': 'fake',
+ 'name': 'fake',
+ 'metadata': 'fake',
+ 'info_cache': fake_info_cache,
+ 'system_metadata': fake_sys_metadata,
+ 'other': 'meow'}
+ expected_sys_metadata = {'key1': 'value1',
+ 'key2': 'value2'}
+ expected_info_cache = {'other': 'moo'}
+ expected_instance = {'system_metadata': expected_sys_metadata,
+ 'other': 'meow',
+ 'uuid': 'fake_uuid'}
+
+ # To show these should not be called in src/mid-level cell
+ self.mox.StubOutWithMock(self.src_db_inst, 'instance_update')
+ self.mox.StubOutWithMock(self.src_db_inst,
+ 'instance_info_cache_update')
+ self.mox.StubOutWithMock(self.mid_db_inst, 'instance_update')
+ self.mox.StubOutWithMock(self.mid_db_inst,
+ 'instance_info_cache_update')
+
+ self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_update')
+ self.mox.StubOutWithMock(self.tgt_db_inst,
+ 'instance_info_cache_update')
+ self.tgt_db_inst.instance_update(self.ctxt, 'fake_uuid',
+ expected_instance,
+ update_cells=False)
+ self.tgt_db_inst.instance_info_cache_update(self.ctxt, 'fake_uuid',
+ expected_info_cache,
+ update_cells=False)
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.instance_update_at_top(self.ctxt, fake_instance)
+
+ def test_instance_destroy_at_top(self):
+ fake_instance = {'uuid': 'fake_uuid'}
+
+ # To show these should not be called in src/mid-level cell
+ self.mox.StubOutWithMock(self.src_db_inst, 'instance_destroy')
+
+ self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_destroy')
+ self.tgt_db_inst.instance_destroy(self.ctxt, 'fake_uuid',
+ update_cells=False)
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.instance_destroy_at_top(self.ctxt, fake_instance)
+
+ def test_instance_hard_delete_everywhere(self):
+ # Reset this, as this is a broadcast down.
+ self._setup_attrs(up=False)
+ instance = {'uuid': 'meow'}
+
+ # Should not be called in src (API cell)
+ self.mox.StubOutWithMock(self.src_compute_api, 'delete')
+
+ self.mox.StubOutWithMock(self.mid_compute_api, 'delete')
+ self.mox.StubOutWithMock(self.tgt_compute_api, 'delete')
+
+ self.mid_compute_api.delete(self.ctxt, instance)
+ self.tgt_compute_api.delete(self.ctxt, instance)
+
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.instance_delete_everywhere(self.ctxt,
+ instance, 'hard')
+
+ def test_instance_soft_delete_everywhere(self):
+ # Reset this, as this is a broadcast down.
+ self._setup_attrs(up=False)
+ instance = {'uuid': 'meow'}
+
+ # Should not be called in src (API cell)
+ self.mox.StubOutWithMock(self.src_compute_api, 'soft_delete')
+
+ self.mox.StubOutWithMock(self.mid_compute_api, 'soft_delete')
+ self.mox.StubOutWithMock(self.tgt_compute_api, 'soft_delete')
+
+ self.mid_compute_api.soft_delete(self.ctxt, instance)
+ self.tgt_compute_api.soft_delete(self.ctxt, instance)
+
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.instance_delete_everywhere(self.ctxt,
+ instance, 'soft')
+
+ def test_instance_fault_create_at_top(self):
+ fake_instance_fault = {'id': 1,
+ 'other stuff': 2,
+ 'more stuff': 3}
+ expected_instance_fault = {'other stuff': 2,
+ 'more stuff': 3}
+
+ # Shouldn't be called for these 2 cells
+ self.mox.StubOutWithMock(self.src_db_inst, 'instance_fault_create')
+ self.mox.StubOutWithMock(self.mid_db_inst, 'instance_fault_create')
+
+ self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_fault_create')
+ self.tgt_db_inst.instance_fault_create(self.ctxt,
+ expected_instance_fault)
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.instance_fault_create_at_top(self.ctxt,
+ fake_instance_fault)
+
+ def test_bw_usage_update_at_top(self):
+ fake_bw_update_info = {'uuid': 'fake_uuid',
+ 'mac': 'fake_mac',
+ 'start_period': 'fake_start_period',
+ 'bw_in': 'fake_bw_in',
+ 'bw_out': 'fake_bw_out',
+ 'last_ctr_in': 'fake_last_ctr_in',
+ 'last_ctr_out': 'fake_last_ctr_out',
+ 'last_refreshed': 'fake_last_refreshed'}
+
+ # Shouldn't be called for these 2 cells
+ self.mox.StubOutWithMock(self.src_db_inst, 'bw_usage_update')
+ self.mox.StubOutWithMock(self.mid_db_inst, 'bw_usage_update')
+
+ self.mox.StubOutWithMock(self.tgt_db_inst, 'bw_usage_update')
+ self.tgt_db_inst.bw_usage_update(self.ctxt, **fake_bw_update_info)
+
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.bw_usage_update_at_top(self.ctxt,
+ fake_bw_update_info)
diff --git a/nova/tests/cells/test_cells_rpc_driver.py b/nova/tests/cells/test_cells_rpc_driver.py
new file mode 100644
index 000000000..a44fe9376
--- /dev/null
+++ b/nova/tests/cells/test_cells_rpc_driver.py
@@ -0,0 +1,218 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For Cells RPC Communication Driver
+"""
+
+from nova.cells import messaging
+from nova.cells import rpc_driver
+from nova import context
+from nova.openstack.common import cfg
+from nova.openstack.common import rpc
+from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
+from nova import test
+from nova.tests.cells import fakes
+
+CONF = cfg.CONF
+CONF.import_opt('rpc_driver_queue_base', 'nova.cells.rpc_driver',
+ group='cells')
+
+
+class CellsRPCDriverTestCase(test.TestCase):
+ """Test case for Cells communication via RPC."""
+
+ def setUp(self):
+ super(CellsRPCDriverTestCase, self).setUp()
+ fakes.init(self)
+ self.ctxt = context.RequestContext('fake', 'fake')
+ self.driver = rpc_driver.CellsRPCDriver()
+
+ def test_start_consumers(self):
+ self.flags(rpc_driver_queue_base='cells.intercell42', group='cells')
+ rpc_consumers = []
+ rpc_conns = []
+ fake_msg_runner = fakes.get_message_runner('api-cell')
+ call_info = {}
+
+ class FakeInterCellRPCDispatcher(object):
+ def __init__(_self, msg_runner):
+ self.assertEqual(fake_msg_runner, msg_runner)
+ call_info['intercell_dispatcher'] = _self
+
+ class FakeRPCDispatcher(object):
+ def __init__(_self, proxy_objs):
+ self.assertEqual([call_info['intercell_dispatcher']],
+ proxy_objs)
+ call_info['rpc_dispatcher'] = _self
+
+ class FakeRPCConn(object):
+ def create_consumer(_self, topic, proxy_obj, **kwargs):
+ self.assertEqual(call_info['rpc_dispatcher'], proxy_obj)
+ rpc_consumers.append((topic, kwargs))
+
+ def consume_in_thread(_self):
+ pass
+
+ def _fake_create_connection(new):
+ self.assertTrue(new)
+ fake_conn = FakeRPCConn()
+ rpc_conns.append(fake_conn)
+ return fake_conn
+
+ self.stubs.Set(rpc, 'create_connection', _fake_create_connection)
+ self.stubs.Set(rpc_driver, 'InterCellRPCDispatcher',
+ FakeInterCellRPCDispatcher)
+ self.stubs.Set(rpc_dispatcher, 'RpcDispatcher', FakeRPCDispatcher)
+
+ self.driver.start_consumers(fake_msg_runner)
+
+ for message_type in ['broadcast', 'response', 'targeted']:
+ topic = 'cells.intercell42.' + message_type
+ self.assertIn((topic, {'fanout': True}), rpc_consumers)
+ self.assertIn((topic, {'fanout': False}), rpc_consumers)
+ self.assertEqual(rpc_conns, self.driver.rpc_connections)
+
+ def test_stop_consumers(self):
+ call_info = {'closed': []}
+
+ class FakeRPCConn(object):
+ def close(self):
+ call_info['closed'].append(self)
+
+ fake_conns = [FakeRPCConn() for x in xrange(5)]
+ self.driver.rpc_connections = fake_conns
+ self.driver.stop_consumers()
+ self.assertEqual(fake_conns, call_info['closed'])
+
+ def test_send_message_to_cell_cast(self):
+ msg_runner = fakes.get_message_runner('api-cell')
+ cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
+ message = messaging._TargetedMessage(msg_runner,
+ self.ctxt, 'fake', 'fake', 'down', cell_state, fanout=False)
+
+ call_info = {}
+
+ def _fake_make_msg(method, **kwargs):
+ call_info['rpc_method'] = method
+ call_info['rpc_kwargs'] = kwargs
+ return 'fake-message'
+
+ def _fake_cast_to_server(*args, **kwargs):
+ call_info['cast_args'] = args
+ call_info['cast_kwargs'] = kwargs
+
+ self.stubs.Set(rpc, 'cast_to_server', _fake_cast_to_server)
+ self.stubs.Set(self.driver.intercell_rpcapi, 'make_msg',
+ _fake_make_msg)
+ self.stubs.Set(self.driver.intercell_rpcapi, 'cast_to_server',
+ _fake_cast_to_server)
+
+ self.driver.send_message_to_cell(cell_state, message)
+ expected_server_params = {'hostname': 'rpc_host2',
+ 'password': 'password2',
+ 'port': 'rpc_port2',
+ 'username': 'username2',
+ 'virtual_host': 'rpc_vhost2'}
+ expected_cast_args = (self.ctxt, expected_server_params,
+ 'fake-message')
+ expected_cast_kwargs = {'topic': 'cells.intercell.targeted'}
+ expected_rpc_kwargs = {'message': message.to_json()}
+ self.assertEqual(expected_cast_args, call_info['cast_args'])
+ self.assertEqual(expected_cast_kwargs, call_info['cast_kwargs'])
+ self.assertEqual('process_message', call_info['rpc_method'])
+ self.assertEqual(expected_rpc_kwargs, call_info['rpc_kwargs'])
+
+ def test_send_message_to_cell_fanout_cast(self):
+ msg_runner = fakes.get_message_runner('api-cell')
+ cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
+ message = messaging._TargetedMessage(msg_runner,
+ self.ctxt, 'fake', 'fake', 'down', cell_state, fanout=True)
+
+ call_info = {}
+
+ def _fake_make_msg(method, **kwargs):
+ call_info['rpc_method'] = method
+ call_info['rpc_kwargs'] = kwargs
+ return 'fake-message'
+
+ def _fake_fanout_cast_to_server(*args, **kwargs):
+ call_info['cast_args'] = args
+ call_info['cast_kwargs'] = kwargs
+
+ self.stubs.Set(rpc, 'fanout_cast_to_server',
+ _fake_fanout_cast_to_server)
+ self.stubs.Set(self.driver.intercell_rpcapi, 'make_msg',
+ _fake_make_msg)
+ self.stubs.Set(self.driver.intercell_rpcapi,
+ 'fanout_cast_to_server', _fake_fanout_cast_to_server)
+
+ self.driver.send_message_to_cell(cell_state, message)
+ expected_server_params = {'hostname': 'rpc_host2',
+ 'password': 'password2',
+ 'port': 'rpc_port2',
+ 'username': 'username2',
+ 'virtual_host': 'rpc_vhost2'}
+ expected_cast_args = (self.ctxt, expected_server_params,
+ 'fake-message')
+ expected_cast_kwargs = {'topic': 'cells.intercell.targeted'}
+ expected_rpc_kwargs = {'message': message.to_json()}
+ self.assertEqual(expected_cast_args, call_info['cast_args'])
+ self.assertEqual(expected_cast_kwargs, call_info['cast_kwargs'])
+ self.assertEqual('process_message', call_info['rpc_method'])
+ self.assertEqual(expected_rpc_kwargs, call_info['rpc_kwargs'])
+
+ def test_rpc_topic_uses_message_type(self):
+ self.flags(rpc_driver_queue_base='cells.intercell42', group='cells')
+ msg_runner = fakes.get_message_runner('api-cell')
+ cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
+ message = messaging._BroadcastMessage(msg_runner,
+ self.ctxt, 'fake', 'fake', 'down', fanout=True)
+ message.message_type = 'fake-message-type'
+
+ call_info = {}
+
+ def _fake_fanout_cast_to_server(*args, **kwargs):
+ call_info['topic'] = kwargs.get('topic')
+
+ self.stubs.Set(self.driver.intercell_rpcapi,
+ 'fanout_cast_to_server', _fake_fanout_cast_to_server)
+
+ self.driver.send_message_to_cell(cell_state, message)
+ self.assertEqual('cells.intercell42.fake-message-type',
+ call_info['topic'])
+
+ def test_process_message(self):
+ msg_runner = fakes.get_message_runner('api-cell')
+ dispatcher = rpc_driver.InterCellRPCDispatcher(msg_runner)
+ message = messaging._BroadcastMessage(msg_runner,
+ self.ctxt, 'fake', 'fake', 'down', fanout=True)
+
+ call_info = {}
+
+ def _fake_message_from_json(json_message):
+ call_info['json_message'] = json_message
+ self.assertEqual(message.to_json(), json_message)
+ return message
+
+ def _fake_process():
+ call_info['process_called'] = True
+
+ self.stubs.Set(msg_runner, 'message_from_json',
+ _fake_message_from_json)
+ self.stubs.Set(message, 'process', _fake_process)
+
+ dispatcher.process_message(self.ctxt, message.to_json())
+ self.assertEqual(message.to_json(), call_info['json_message'])
+ self.assertTrue(call_info['process_called'])
diff --git a/nova/tests/cells/test_cells_rpcapi.py b/nova/tests/cells/test_cells_rpcapi.py
new file mode 100644
index 000000000..b51bfa0c1
--- /dev/null
+++ b/nova/tests/cells/test_cells_rpcapi.py
@@ -0,0 +1,206 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For Cells RPCAPI
+"""
+
+from nova.cells import rpcapi as cells_rpcapi
+from nova.openstack.common import cfg
+from nova.openstack.common import rpc
+from nova import test
+
+CONF = cfg.CONF
+CONF.import_opt('topic', 'nova.cells.opts', group='cells')
+
+
+class CellsAPITestCase(test.TestCase):
+ """Test case for cells.api interfaces."""
+
+ def setUp(self):
+ super(CellsAPITestCase, self).setUp()
+ self.fake_topic = 'fake_topic'
+ self.fake_context = 'fake_context'
+ self.flags(topic=self.fake_topic, enable=True, group='cells')
+ self.cells_rpcapi = cells_rpcapi.CellsAPI()
+
+ def _stub_rpc_method(self, rpc_method, result):
+ call_info = {}
+
+ def fake_rpc_method(ctxt, topic, msg, *args, **kwargs):
+ call_info['context'] = ctxt
+ call_info['topic'] = topic
+ call_info['msg'] = msg
+ return result
+
+ self.stubs.Set(rpc, rpc_method, fake_rpc_method)
+ return call_info
+
+ def _check_result(self, call_info, method, args, version=None):
+ if version is None:
+ version = self.cells_rpcapi.BASE_RPC_API_VERSION
+ self.assertEqual(self.fake_context, call_info['context'])
+ self.assertEqual(self.fake_topic, call_info['topic'])
+ self.assertEqual(method, call_info['msg']['method'])
+ self.assertEqual(version, call_info['msg']['version'])
+ self.assertEqual(args, call_info['msg']['args'])
+
+ def test_cast_compute_api_method(self):
+ fake_cell_name = 'fake_cell_name'
+ fake_method = 'fake_method'
+ fake_method_args = (1, 2)
+ fake_method_kwargs = {'kwarg1': 10, 'kwarg2': 20}
+
+ expected_method_info = {'method': fake_method,
+ 'method_args': fake_method_args,
+ 'method_kwargs': fake_method_kwargs}
+ expected_args = {'method_info': expected_method_info,
+ 'cell_name': fake_cell_name,
+ 'call': False}
+
+ call_info = self._stub_rpc_method('cast', None)
+
+ self.cells_rpcapi.cast_compute_api_method(self.fake_context,
+ fake_cell_name, fake_method,
+ *fake_method_args, **fake_method_kwargs)
+ self._check_result(call_info, 'run_compute_api_method',
+ expected_args)
+
+ def test_call_compute_api_method(self):
+ fake_cell_name = 'fake_cell_name'
+ fake_method = 'fake_method'
+ fake_method_args = (1, 2)
+ fake_method_kwargs = {'kwarg1': 10, 'kwarg2': 20}
+ fake_response = 'fake_response'
+
+ expected_method_info = {'method': fake_method,
+ 'method_args': fake_method_args,
+ 'method_kwargs': fake_method_kwargs}
+ expected_args = {'method_info': expected_method_info,
+ 'cell_name': fake_cell_name,
+ 'call': True}
+
+ call_info = self._stub_rpc_method('call', fake_response)
+
+ result = self.cells_rpcapi.call_compute_api_method(self.fake_context,
+ fake_cell_name, fake_method,
+ *fake_method_args, **fake_method_kwargs)
+ self._check_result(call_info, 'run_compute_api_method',
+ expected_args)
+ self.assertEqual(fake_response, result)
+
+ def test_schedule_run_instance(self):
+ call_info = self._stub_rpc_method('cast', None)
+
+ self.cells_rpcapi.schedule_run_instance(
+ self.fake_context, arg1=1, arg2=2, arg3=3)
+
+ expected_args = {'host_sched_kwargs': {'arg1': 1,
+ 'arg2': 2,
+ 'arg3': 3}}
+ self._check_result(call_info, 'schedule_run_instance',
+ expected_args)
+
+ def test_instance_update_at_top(self):
+ fake_info_cache = {'id': 1,
+ 'instance': 'fake_instance',
+ 'other': 'moo'}
+ fake_sys_metadata = [{'id': 1,
+ 'key': 'key1',
+ 'value': 'value1'},
+ {'id': 2,
+ 'key': 'key2',
+ 'value': 'value2'}]
+ fake_instance = {'id': 2,
+ 'security_groups': 'fake',
+ 'instance_type': 'fake',
+ 'volumes': 'fake',
+ 'cell_name': 'fake',
+ 'name': 'fake',
+ 'metadata': 'fake',
+ 'info_cache': fake_info_cache,
+ 'system_metadata': fake_sys_metadata,
+ 'other': 'meow'}
+
+ call_info = self._stub_rpc_method('cast', None)
+
+ self.cells_rpcapi.instance_update_at_top(
+ self.fake_context, fake_instance)
+
+ expected_args = {'instance': fake_instance}
+ self._check_result(call_info, 'instance_update_at_top',
+ expected_args)
+
+ def test_instance_destroy_at_top(self):
+ fake_instance = {'uuid': 'fake-uuid'}
+
+ call_info = self._stub_rpc_method('cast', None)
+
+ self.cells_rpcapi.instance_destroy_at_top(
+ self.fake_context, fake_instance)
+
+ expected_args = {'instance': fake_instance}
+ self._check_result(call_info, 'instance_destroy_at_top',
+ expected_args)
+
+ def test_instance_delete_everywhere(self):
+ fake_instance = {'uuid': 'fake-uuid'}
+
+ call_info = self._stub_rpc_method('cast', None)
+
+ self.cells_rpcapi.instance_delete_everywhere(
+ self.fake_context, fake_instance,
+ 'fake-type')
+
+ expected_args = {'instance': fake_instance,
+ 'delete_type': 'fake-type'}
+ self._check_result(call_info, 'instance_delete_everywhere',
+ expected_args)
+
+ def test_instance_fault_create_at_top(self):
+ fake_instance_fault = {'id': 2,
+ 'other': 'meow'}
+
+ call_info = self._stub_rpc_method('cast', None)
+
+ self.cells_rpcapi.instance_fault_create_at_top(
+ self.fake_context, fake_instance_fault)
+
+ expected_args = {'instance_fault': fake_instance_fault}
+ self._check_result(call_info, 'instance_fault_create_at_top',
+ expected_args)
+
+ def test_bw_usage_update_at_top(self):
+ update_args = ('fake_uuid', 'fake_mac', 'fake_start_period',
+ 'fake_bw_in', 'fake_bw_out', 'fake_ctr_in',
+ 'fake_ctr_out')
+ update_kwargs = {'last_refreshed': 'fake_refreshed'}
+
+ call_info = self._stub_rpc_method('cast', None)
+
+ self.cells_rpcapi.bw_usage_update_at_top(
+ self.fake_context, *update_args, **update_kwargs)
+
+ bw_update_info = {'uuid': 'fake_uuid',
+ 'mac': 'fake_mac',
+ 'start_period': 'fake_start_period',
+ 'bw_in': 'fake_bw_in',
+ 'bw_out': 'fake_bw_out',
+ 'last_ctr_in': 'fake_ctr_in',
+ 'last_ctr_out': 'fake_ctr_out',
+ 'last_refreshed': 'fake_refreshed'}
+
+ expected_args = {'bw_update_info': bw_update_info}
+ self._check_result(call_info, 'bw_usage_update_at_top',
+ expected_args)
diff --git a/nova/tests/cells/test_cells_scheduler.py b/nova/tests/cells/test_cells_scheduler.py
new file mode 100644
index 000000000..66e7e245e
--- /dev/null
+++ b/nova/tests/cells/test_cells_scheduler.py
@@ -0,0 +1,206 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For CellsScheduler
+"""
+import time
+
+from nova.compute import vm_states
+from nova import context
+from nova import db
+from nova import exception
+from nova.openstack.common import cfg
+from nova.openstack.common import uuidutils
+from nova import test
+from nova.tests.cells import fakes
+
+CONF = cfg.CONF
+CONF.import_opt('scheduler_retries', 'nova.cells.scheduler', group='cells')
+
+
+class CellsSchedulerTestCase(test.TestCase):
+ """Test case for CellsScheduler class"""
+
+ def setUp(self):
+ super(CellsSchedulerTestCase, self).setUp()
+ fakes.init(self)
+ self.msg_runner = fakes.get_message_runner('api-cell')
+ self.scheduler = self.msg_runner.scheduler
+ self.state_manager = self.msg_runner.state_manager
+ self.my_cell_state = self.state_manager.get_my_state()
+ self.ctxt = context.RequestContext('fake', 'fake')
+ instance_uuids = []
+ for x in xrange(3):
+ instance_uuids.append(uuidutils.generate_uuid())
+ self.instance_uuids = instance_uuids
+ self.request_spec = {'instance_uuids': instance_uuids,
+ 'other': 'stuff'}
+
+ def test_create_instances_here(self):
+ # Just grab the first instance type
+ inst_type = db.instance_type_get(self.ctxt, 1)
+ image = {'properties': {}}
+ instance_props = {'hostname': 'meow',
+ 'display_name': 'moo',
+ 'image_ref': 'fake_image_ref',
+ 'user_id': self.ctxt.user_id,
+ 'project_id': self.ctxt.project_id}
+ request_spec = {'instance_type': inst_type,
+ 'image': image,
+ 'security_group': ['default'],
+ 'block_device_mapping': [],
+ 'instance_properties': instance_props,
+ 'instance_uuids': self.instance_uuids}
+
+ call_info = {'uuids': []}
+
+ def _fake_instance_update_at_top(_ctxt, instance):
+ call_info['uuids'].append(instance['uuid'])
+
+ self.stubs.Set(self.msg_runner, 'instance_update_at_top',
+ _fake_instance_update_at_top)
+
+ self.scheduler._create_instances_here(self.ctxt, request_spec)
+ self.assertEqual(self.instance_uuids, call_info['uuids'])
+
+ for instance_uuid in self.instance_uuids:
+ instance = db.instance_get_by_uuid(self.ctxt, instance_uuid)
+ self.assertEqual('meow', instance['hostname'])
+ self.assertEqual('moo', instance['display_name'])
+ self.assertEqual('fake_image_ref', instance['image_ref'])
+
+ def test_run_instance_selects_child_cell(self):
+ # Make sure there's no capacity info so we're sure to
+ # select a child cell
+ our_cell_info = self.state_manager.get_my_state()
+ our_cell_info.capacities = {}
+
+ call_info = {'times': 0}
+
+ orig_fn = self.msg_runner.schedule_run_instance
+
+ def msg_runner_schedule_run_instance(ctxt, target_cell,
+ host_sched_kwargs):
+ # This gets called twice. Once for our running it
+ # in this cell.. and then it'll get called when the
+ # child cell is picked. So, first time.. just run it
+ # like normal.
+ if not call_info['times']:
+ call_info['times'] += 1
+ return orig_fn(ctxt, target_cell, host_sched_kwargs)
+ call_info['ctxt'] = ctxt
+ call_info['target_cell'] = target_cell
+ call_info['host_sched_kwargs'] = host_sched_kwargs
+
+ self.stubs.Set(self.msg_runner, 'schedule_run_instance',
+ msg_runner_schedule_run_instance)
+
+ host_sched_kwargs = {'request_spec': self.request_spec}
+ self.msg_runner.schedule_run_instance(self.ctxt,
+ self.my_cell_state, host_sched_kwargs)
+
+ self.assertEqual(self.ctxt, call_info['ctxt'])
+ self.assertEqual(host_sched_kwargs, call_info['host_sched_kwargs'])
+ child_cells = self.state_manager.get_child_cells()
+ self.assertIn(call_info['target_cell'], child_cells)
+
+ def test_run_instance_selects_current_cell(self):
+ # Make sure there's no child cells so that we will be
+ # selected
+ self.state_manager.child_cells = {}
+
+ call_info = {}
+
+ def fake_create_instances_here(ctxt, request_spec):
+ call_info['ctxt'] = ctxt
+ call_info['request_spec'] = request_spec
+
+ def fake_rpc_run_instance(ctxt, **host_sched_kwargs):
+ call_info['host_sched_kwargs'] = host_sched_kwargs
+
+ self.stubs.Set(self.scheduler, '_create_instances_here',
+ fake_create_instances_here)
+ self.stubs.Set(self.scheduler.scheduler_rpcapi,
+ 'run_instance', fake_rpc_run_instance)
+
+ host_sched_kwargs = {'request_spec': self.request_spec,
+ 'other': 'stuff'}
+ self.msg_runner.schedule_run_instance(self.ctxt,
+ self.my_cell_state, host_sched_kwargs)
+
+ self.assertEqual(self.ctxt, call_info['ctxt'])
+ self.assertEqual(self.request_spec, call_info['request_spec'])
+ self.assertEqual(host_sched_kwargs, call_info['host_sched_kwargs'])
+
+ def test_run_instance_retries_when_no_cells_avail(self):
+ self.flags(scheduler_retries=7, group='cells')
+
+ host_sched_kwargs = {'request_spec': self.request_spec}
+
+ call_info = {'num_tries': 0, 'errored_uuids': []}
+
+ def fake_run_instance(message, host_sched_kwargs):
+ call_info['num_tries'] += 1
+ raise exception.NoCellsAvailable()
+
+ def fake_sleep(_secs):
+ return
+
+ def fake_instance_update(ctxt, instance_uuid, values):
+ self.assertEqual(vm_states.ERROR, values['vm_state'])
+ call_info['errored_uuids'].append(instance_uuid)
+
+ self.stubs.Set(self.scheduler, '_run_instance', fake_run_instance)
+ self.stubs.Set(time, 'sleep', fake_sleep)
+ self.stubs.Set(db, 'instance_update', fake_instance_update)
+
+ self.msg_runner.schedule_run_instance(self.ctxt,
+ self.my_cell_state, host_sched_kwargs)
+
+ self.assertEqual(8, call_info['num_tries'])
+ self.assertEqual(self.instance_uuids, call_info['errored_uuids'])
+
+ def test_run_instance_on_random_exception(self):
+ self.flags(scheduler_retries=7, group='cells')
+
+ host_sched_kwargs = {'request_spec': self.request_spec}
+
+ call_info = {'num_tries': 0,
+ 'errored_uuids1': [],
+ 'errored_uuids2': []}
+
+ def fake_run_instance(message, host_sched_kwargs):
+ call_info['num_tries'] += 1
+ raise test.TestingException()
+
+ def fake_instance_update(ctxt, instance_uuid, values):
+ self.assertEqual(vm_states.ERROR, values['vm_state'])
+ call_info['errored_uuids1'].append(instance_uuid)
+
+ def fake_instance_update_at_top(ctxt, instance):
+ self.assertEqual(vm_states.ERROR, instance['vm_state'])
+ call_info['errored_uuids2'].append(instance['uuid'])
+
+ self.stubs.Set(self.scheduler, '_run_instance', fake_run_instance)
+ self.stubs.Set(db, 'instance_update', fake_instance_update)
+ self.stubs.Set(self.msg_runner, 'instance_update_at_top',
+ fake_instance_update_at_top)
+
+ self.msg_runner.schedule_run_instance(self.ctxt,
+ self.my_cell_state, host_sched_kwargs)
+ # Shouldn't retry
+ self.assertEqual(1, call_info['num_tries'])
+ self.assertEqual(self.instance_uuids, call_info['errored_uuids1'])
+ self.assertEqual(self.instance_uuids, call_info['errored_uuids2'])
diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py
index d335f6675..104fd4e68 100644
--- a/nova/tests/compute/test_compute.py
+++ b/nova/tests/compute/test_compute.py
@@ -3696,8 +3696,12 @@ class ComputeAPITestCase(BaseTestCase):
{'vm_state': vm_states.SOFT_DELETED,
'task_state': None})
+ # Ensure quotas are committed
self.mox.StubOutWithMock(nova.quota.QUOTAS, 'commit')
nova.quota.QUOTAS.commit(mox.IgnoreArg(), mox.IgnoreArg())
+ if self.__class__.__name__ == 'CellsComputeAPITestCase':
+ # Called a 2nd time (for the child cell) when testing cells
+ nova.quota.QUOTAS.commit(mox.IgnoreArg(), mox.IgnoreArg())
self.mox.ReplayAll()
self.compute_api.restore(self.context, instance)
diff --git a/nova/tests/compute/test_compute_cells.py b/nova/tests/compute/test_compute_cells.py
new file mode 100644
index 000000000..aa4b448d4
--- /dev/null
+++ b/nova/tests/compute/test_compute_cells.py
@@ -0,0 +1,99 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For Compute w/ Cells
+"""
+from nova.compute import cells_api as compute_cells_api
+from nova.openstack.common import log as logging
+from nova.tests.compute import test_compute
+
+
+LOG = logging.getLogger('nova.tests.test_compute_cells')
+
+ORIG_COMPUTE_API = None
+
+
+def stub_call_to_cells(context, instance, method, *args, **kwargs):
+ fn = getattr(ORIG_COMPUTE_API, method)
+ return fn(context, instance, *args, **kwargs)
+
+
+def stub_cast_to_cells(context, instance, method, *args, **kwargs):
+ fn = getattr(ORIG_COMPUTE_API, method)
+ fn(context, instance, *args, **kwargs)
+
+
+def deploy_stubs(stubs, api):
+ stubs.Set(api, '_call_to_cells', stub_call_to_cells)
+ stubs.Set(api, '_cast_to_cells', stub_cast_to_cells)
+
+
+class CellsComputeAPITestCase(test_compute.ComputeAPITestCase):
+ def setUp(self):
+ super(CellsComputeAPITestCase, self).setUp()
+ global ORIG_COMPUTE_API
+ ORIG_COMPUTE_API = self.compute_api
+
+ def _fake_cell_read_only(*args, **kwargs):
+ return False
+
+ def _fake_validate_cell(*args, **kwargs):
+ return
+
+ def _nop_update(context, instance, **kwargs):
+ return instance
+
+ self.compute_api = compute_cells_api.ComputeCellsAPI()
+ self.stubs.Set(self.compute_api, '_cell_read_only',
+ _fake_cell_read_only)
+ self.stubs.Set(self.compute_api, '_validate_cell',
+ _fake_validate_cell)
+
+ # NOTE(belliott) Don't update the instance state
+ # for the tests at the API layer. Let it happen after
+ # the stub cast to cells so that expected_task_states
+ # match.
+ self.stubs.Set(self.compute_api, 'update', _nop_update)
+
+ deploy_stubs(self.stubs, self.compute_api)
+
+ def tearDown(self):
+ global ORIG_COMPUTE_API
+ self.compute_api = ORIG_COMPUTE_API
+ super(CellsComputeAPITestCase, self).tearDown()
+
+ def test_instance_metadata(self):
+ self.skipTest("Test is incompatible with cells.")
+
+ def test_live_migrate(self):
+ self.skipTest("Test is incompatible with cells.")
+
+ def test_get_backdoor_port(self):
+ self.skipTest("Test is incompatible with cells.")
+
+
+class CellsComputePolicyTestCase(test_compute.ComputePolicyTestCase):
+ def setUp(self):
+ super(CellsComputePolicyTestCase, self).setUp()
+ global ORIG_COMPUTE_API
+ ORIG_COMPUTE_API = self.compute_api
+ self.compute_api = compute_cells_api.ComputeCellsAPI()
+ deploy_stubs(self.stubs, self.compute_api)
+
+ def tearDown(self):
+ global ORIG_COMPUTE_API
+ self.compute_api = ORIG_COMPUTE_API
+ super(CellsComputePolicyTestCase, self).tearDown()
diff --git a/setup.py b/setup.py
index f3da54618..e13ae4f64 100644
--- a/setup.py
+++ b/setup.py
@@ -50,6 +50,7 @@ setuptools.setup(name='nova',
'bin/nova-api-metadata',
'bin/nova-api-os-compute',
'bin/nova-rpc-zmq-receiver',
+ 'bin/nova-cells',
'bin/nova-cert',
'bin/nova-clear-rabbit-queues',
'bin/nova-compute',