diff options
-rwxr-xr-x | bin/nova-cells | 53 | ||||
-rw-r--r-- | nova/cells/__init__.py | 19 | ||||
-rw-r--r-- | nova/cells/driver.py | 41 | ||||
-rw-r--r-- | nova/cells/manager.py | 136 | ||||
-rw-r--r-- | nova/cells/messaging.py | 1047 | ||||
-rw-r--r-- | nova/cells/opts.py | 44 | ||||
-rw-r--r-- | nova/cells/rpc_driver.py | 165 | ||||
-rw-r--r-- | nova/cells/rpcapi.py | 138 | ||||
-rw-r--r-- | nova/cells/scheduler.py | 136 | ||||
-rw-r--r-- | nova/cells/state.py | 346 | ||||
-rw-r--r-- | nova/compute/api.py | 8 | ||||
-rw-r--r-- | nova/compute/cells_api.py | 471 | ||||
-rw-r--r-- | nova/db/api.py | 69 | ||||
-rw-r--r-- | nova/exception.py | 28 | ||||
-rw-r--r-- | nova/tests/cells/__init__.py | 19 | ||||
-rw-r--r-- | nova/tests/cells/fakes.py | 191 | ||||
-rw-r--r-- | nova/tests/cells/test_cells_manager.py | 151 | ||||
-rw-r--r-- | nova/tests/cells/test_cells_messaging.py | 913 | ||||
-rw-r--r-- | nova/tests/cells/test_cells_rpc_driver.py | 218 | ||||
-rw-r--r-- | nova/tests/cells/test_cells_rpcapi.py | 206 | ||||
-rw-r--r-- | nova/tests/cells/test_cells_scheduler.py | 206 | ||||
-rw-r--r-- | nova/tests/compute/test_compute.py | 4 | ||||
-rw-r--r-- | nova/tests/compute/test_compute_cells.py | 99 | ||||
-rw-r--r-- | setup.py | 1 |
24 files changed, 4696 insertions, 13 deletions
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Starter script for Nova Cells Service.

Creates a nova-cells service from the topic and manager configured in
the 'cells' option group, then serves it until it exits.
"""

# NOTE: eventlet patching is applied before every other import in this
# script; keep this ordering (presumably so later imports see the patched
# standard library -- confirm against eventlet's documentation).
import eventlet
eventlet.monkey_patch()

import os
import sys

# If ../nova/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
                                   os.pardir,
                                   os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
    sys.path.insert(0, possible_topdir)

# These imports must stay below the sys.path adjustment above so that a
# source checkout, when present, is the one that gets imported.
from nova import config
from nova.openstack.common import cfg
from nova.openstack.common import log as logging
from nova import service
from nova import utils

CONF = cfg.CONF
# The 'topic' and 'manager' options are declared in nova.cells.opts; import
# them so CONF.cells.topic / CONF.cells.manager can be read below.
CONF.import_opt('topic', 'nova.cells.opts', group='cells')
CONF.import_opt('manager', 'nova.cells.opts', group='cells')

if __name__ == '__main__':
    config.parse_args(sys.argv)
    logging.setup('nova')
    utils.monkey_patch()
    server = service.Service.create(binary='nova-cells',
                                    topic=CONF.cells.topic,
                                    manager=CONF.cells.manager)
    service.serve(server)
    # Block until the served service finishes.
    service.wait()
class BaseCellsDriver(object):
    """Interface that every cells communication driver must implement.

    An instance of a driver is created for each neighbor cell found in
    the DB and is associated with that cell through its CellState.  The
    cells manager creates one additional instance that is used for
    setting up the consumers.

    Every method here raises NotImplementedError; concrete drivers are
    expected to override all of them.
    """

    def start_consumers(self, msg_runner):
        """Start whatever consumers this driver requires.

        :param msg_runner: the message runner handed to the driver by
                           the cells manager
        """
        raise NotImplementedError()

    def stop_consumers(self):
        """Stop consuming messages."""
        raise NotImplementedError()

    def send_message_to_cell(self, cell_state, message):
        """Deliver ``message`` to the cell described by ``cell_state``."""
        raise NotImplementedError()
class CellsManager(manager.Manager):
    """Manager for the nova-cells service.

    The methods defined here are the RPC entry points the *local* cell
    may call.  Messages arriving from other cells do NOT come through
    this class; that communication is driver-specific.

    Talking to other cells is delegated to the messaging module: its
    MessageRunner routes each message to the right cell through the
    communications driver.  Most methods below simply create either a
    'targeted' message (routed to one specific cell) or a 'broadcast'
    message (delivered to multiple cells).

    Scheduling requests get passed to the scheduler class.
    """
    RPC_API_VERSION = '1.0'

    def __init__(self, *args, **kwargs):
        """Build the state manager, message runner and cells driver.

        An optional 'cell_state_manager' keyword (a callable returning a
        state manager) may be supplied; this is mostly for tests.  It is
        removed from kwargs before the base manager is initialised.
        """
        state_manager_factory = kwargs.pop('cell_state_manager', None)
        super(CellsManager, self).__init__(*args, **kwargs)
        if state_manager_factory is None:
            state_manager_factory = cells_state.CellStateManager
        self.state_manager = state_manager_factory()
        self.msg_runner = messaging.MessageRunner(self.state_manager)
        driver_factory = importutils.import_class(CONF.cells.driver)
        self.driver = driver_factory()

    def post_start_hook(self):
        """Start the driver's consumers and kick off an initial sync.

        If we have child cells, ask them for their capacities and
        capabilities so we learn them sooner than the next periodic
        update; receiving those updates will cause us to update our
        parents.  With no children, update our parents immediately.
        """
        # FIXME(comstud): There's currently no hooks when services are
        # stopping, so we have no way to stop consumers cleanly.
        self.driver.start_consumers(self.msg_runner)
        ctxt = context.get_admin_context()
        if not self.state_manager.get_child_cells():
            self._update_our_parents(ctxt)
            return
        self.msg_runner.ask_children_for_capabilities(ctxt)
        self.msg_runner.ask_children_for_capacities(ctxt)

    @manager.periodic_task
    def _update_our_parents(self, ctxt):
        """Send our capabilities and capacity to our parent cells.

        Intended for cells at the bottom of the tree.
        """
        runner = self.msg_runner
        runner.tell_parents_our_capabilities(ctxt)
        runner.tell_parents_our_capacities(ctxt)

    def schedule_run_instance(self, ctxt, host_sched_kwargs):
        """Pick a cell (possibly ourselves) to build new instance(s)
        and forward the request accordingly.
        """
        # The first target is always our own cell.
        local_cell = self.state_manager.get_my_state()
        self.msg_runner.schedule_run_instance(ctxt, local_cell,
                                              host_sched_kwargs)

    def run_compute_api_method(self, ctxt, cell_name, method_info, call):
        """Call a compute API method in a specific cell.

        When ``call`` is true the remote result is returned (or the
        remote failure raised); otherwise nothing is returned.
        """
        response = self.msg_runner.run_compute_api_method(
                ctxt, cell_name, method_info, call)
        if not call:
            return None
        return response.value_or_raise()

    def instance_update_at_top(self, ctxt, instance):
        """Update an instance at the top level cell."""
        self.msg_runner.instance_update_at_top(ctxt, instance)

    def instance_destroy_at_top(self, ctxt, instance):
        """Destroy an instance at the top level cell."""
        self.msg_runner.instance_destroy_at_top(ctxt, instance)

    def instance_delete_everywhere(self, ctxt, instance, delete_type):
        """Broadcast a delete (or soft delete) of an instance.

        Used by the API cell when it did not know which cell an instance
        was in but the instance was requested to be deleted or
        soft_deleted.
        """
        self.msg_runner.instance_delete_everywhere(ctxt, instance,
                                                   delete_type)

    def instance_fault_create_at_top(self, ctxt, instance_fault):
        """Create an instance fault at the top level cell."""
        self.msg_runner.instance_fault_create_at_top(ctxt, instance_fault)

    def bw_usage_update_at_top(self, ctxt, bw_update_info):
        """Update bandwidth usage at top level cell."""
        self.msg_runner.bw_usage_update_at_top(ctxt, bw_update_info)
# Separator used between cell names for the 'full cell name' and routing
# path.
_PATH_CELL_SEP = '!'


def _reverse_path(path):
    """Return ``path`` with its cell names in reverse order.

    Used for sending responses upstream.
    """
    path_parts = path.split(_PATH_CELL_SEP)
    path_parts.reverse()
    return _PATH_CELL_SEP.join(path_parts)


def _response_cell_name_from_path(routing_path, neighbor_only=False):
    """Return the cell name a response to ``routing_path`` is sent to.

    The routing path is reversed.  If ``neighbor_only`` is True, only
    the first two names of the reversed path are kept, i.e. the
    responding cell and the neighbor the message arrived from.  A path
    holding a single cell is returned unchanged.
    """
    # Work on the list of cell names rather than on the joined string so
    # that the hop count, not the character count, decides what is kept.
    path_parts = routing_path.split(_PATH_CELL_SEP)
    path_parts.reverse()
    if neighbor_only:
        path_parts = path_parts[:2]
    return _PATH_CELL_SEP.join(path_parts)


#
# Message classes.
#


class _BaseMessage(object):
    """Base message class.  It defines data that is passed with every
    single message through every cell.

    Messages are JSON-ified before sending and turned back into a
    class instance when being received.

    Every message has a unique ID.  This is used to route responses
    back to callers.  In the future, this might be used to detect
    receiving the same message more than once.

    routing_path is updated on every hop through a cell.  The current
    cell name is appended to it (cells are separated by
    _PATH_CELL_SEP ('!')).  This is used to tell if we've reached the
    target cell and also to determine the source of a message for
    responses by reversing it.

    hop_count is incremented and compared against max_hop_count.  The
    only current usefulness of this is to break out of a routing loop
    if someone has a broken config.

    fanout means to send to all nova-cells services running in a cell.
    This is useful for capacity and capability broadcasting as well
    as making sure responses get back to the nova-cells service that
    is waiting.
    """

    # Override message_type in a subclass
    message_type = None

    # Attribute names serialised by _to_dict()/to_json().  Each instance
    # works on its own copy so subclasses can append to it safely.
    base_attrs_to_json = ['message_type',
                          'ctxt',
                          'method_name',
                          'method_kwargs',
                          'direction',
                          'need_response',
                          'fanout',
                          'uuid',
                          'routing_path',
                          'hop_count',
                          'max_hop_count']

    def __init__(self, msg_runner, ctxt, method_name, method_kwargs,
            direction, need_response=False, fanout=False, uuid=None,
            routing_path=None, hop_count=0, max_hop_count=None,
            **kwargs):
        """Initialise the message and record this cell as a hop.

        :param msg_runner: runner providing ``state_manager`` and
                           ``our_name``
        :param uuid: message ID; a new one is generated when None
        :param routing_path: path travelled so far, or None for a new
                             message
        :param max_hop_count: hop limit; defaults to
                              CONF.cells.max_hop_count when None
        :param kwargs: accepted and ignored
        """
        self.ctxt = ctxt
        self.msg_runner = msg_runner
        self.state_manager = msg_runner.state_manager
        # Copy these.
        self.base_attrs_to_json = self.base_attrs_to_json[:]
        # Normally this would just be CONF.cells.name, but going through
        # the msg_runner allows us to stub it more easily.
        self.our_path_part = self.msg_runner.our_name
        if uuid is None:
            uuid = uuidutils.generate_uuid()
        self.uuid = uuid
        self.method_name = method_name
        self.method_kwargs = method_kwargs
        self.direction = direction
        self.need_response = need_response
        self.fanout = fanout
        self.routing_path = routing_path
        self.hop_count = hop_count
        if max_hop_count is None:
            max_hop_count = CONF.cells.max_hop_count
        self.max_hop_count = max_hop_count
        self.is_broadcast = False
        self._append_hop()
        # Each sub-class should set this when the message is inited
        self.next_hops = []
        # Created on demand by _setup_response_queue().
        self.resp_queue = None

    def __repr__(self):
        _dict = self._to_dict()
        # The method arguments are left out of the representation.
        _dict.pop('method_kwargs')
        return "<%s: %s>" % (self.__class__.__name__, _dict)

    def _append_hop(self):
        """Add our hop to the routing_path and bump the hop count."""
        if self.routing_path:
            self.routing_path = (self.routing_path + _PATH_CELL_SEP +
                                 self.our_path_part)
        else:
            self.routing_path = self.our_path_part
        self.hop_count += 1

    def _at_max_hop_count(self, do_raise=True):
        """Check if we're at the max hop count.  If we are and do_raise is
        True, raise CellMaxHopCountReached.  If we are at the max and
        do_raise is False... return True, else False.
        """
        if self.hop_count >= self.max_hop_count:
            if do_raise:
                raise exception.CellMaxHopCountReached(
                        hop_count=self.hop_count)
            return True
        return False

    def _process_locally(self):
        """It's been determined that we should process this message in
        this cell.  Go through the MessageRunner to call the appropriate
        method for this message.  Catch the response and/or exception and
        encode it within a Response instance.  Return it so the caller
        can potentially return it to another cell... or return it to
        a caller waiting in this cell.
        """
        try:
            resp_value = self.msg_runner._process_message_locally(self)
            failure = False
        except Exception as exc:
            resp_value = sys.exc_info()
            failure = True
            LOG.exception(_("Error processing message locally: %(exc)s"),
                          {'exc': exc})
        return Response(self.routing_path, resp_value, failure)

    def _setup_response_queue(self):
        """Shortcut to creating a response queue in the MessageRunner."""
        self.resp_queue = self.msg_runner._setup_response_queue(self)

    def _cleanup_response_queue(self):
        """Shortcut to deleting a response queue in the MessageRunner."""
        if self.resp_queue:
            self.msg_runner._cleanup_response_queue(self)
            self.resp_queue = None

    def _wait_for_json_responses(self, num_responses=1):
        """Wait for response(s) to be put into the eventlet queue.  Since
        each queue entry actually contains a list of JSON-ified responses,
        combine them all into a single list to return.

        Destroy the eventlet queue when done.

        Returns None when no response queue was set up.  Raises
        CellTimeout when a response does not arrive within
        CONF.cells.call_timeout.
        """
        if not self.resp_queue:
            # Source is not actually expecting a response
            return
        responses = []
        wait_time = CONF.cells.call_timeout
        try:
            for _unused in xrange(num_responses):
                json_responses = self.resp_queue.get(timeout=wait_time)
                responses.extend(json_responses)
        except queue.Empty:
            raise exception.CellTimeout()
        finally:
            self._cleanup_response_queue()
        return responses

    def _send_json_responses(self, json_responses, neighbor_only=False,
            fanout=False):
        """Send list of responses to this message.  Responses passed here
        are JSON-ified.  Targeted messages have a single response while
        Broadcast messages may have multiple responses.

        If this cell was the source of the message, these responses will
        be returned from self.process().

        Otherwise, we will route the response to the source of the
        request.  If 'neighbor_only' is True, the response will be sent
        to the neighbor cell, not the original requester.  Broadcast
        messages get aggregated at each hop, so neighbor_only will be
        True for those messages.
        """
        if not self.need_response:
            return
        if self.source_is_us():
            return [Response.from_json(json_response)
                    for json_response in json_responses]
        # A response travels in the opposite direction of the request.
        direction = 'down' if self.direction == 'up' else 'up'
        response_kwargs = {'orig_message': self.to_json(),
                           'responses': json_responses}
        target_cell = _response_cell_name_from_path(self.routing_path,
                neighbor_only=neighbor_only)
        response = self.msg_runner._create_response_message(self.ctxt,
                direction, target_cell, self.uuid, response_kwargs,
                fanout=fanout)
        response.process()

    def _send_response(self, response, neighbor_only=False):
        """Send a response to this message.  If the source of the
        request was ourselves, just return the response.  It'll be
        passed back to the caller of self.process().  See DocString for
        _send_json_responses() as it handles most of the real work for
        this method.

        'response' is an instance of Response class.
        """
        if not self.need_response:
            return
        if self.source_is_us():
            return response
        self._send_json_responses([response.to_json()],
                                  neighbor_only=neighbor_only)

    def _send_response_from_exception(self, exc_info):
        """Take an exception as returned from sys.exc_info(), encode
        it in a Response, and send it.
        """
        response = Response(self.routing_path, exc_info, True)
        return self._send_response(response)

    def _to_dict(self):
        """Convert a message to a dictionary.  Only used internally."""
        return dict((key, getattr(self, key))
                    for key in self.base_attrs_to_json)

    def to_json(self):
        """Convert a message into JSON for sending to a sibling cell."""
        _dict = self._to_dict()
        # Convert context to dict.
        _dict['ctxt'] = _dict['ctxt'].to_dict()
        return jsonutils.dumps(_dict)

    def source_is_us(self):
        """Did this cell create this message?"""
        return self.routing_path == self.our_path_part

    def process(self):
        """Process a message.  Deal with it locally and/or forward it to a
        sibling cell.

        Override in a subclass.
        """
        raise NotImplementedError()
class _TargetedMessage(_BaseMessage):
    """A targeted message is a message that is destined for a specific
    single cell.

    'target_cell' can be a full cell name like 'api!child-cell' or it can
    be an instance of the CellState class if the target is a neighbor cell.
    """
    message_type = 'targeted'

    def __init__(self, msg_runner, ctxt, method_name, method_kwargs,
            direction, target_cell, **kwargs):
        super(_TargetedMessage, self).__init__(msg_runner, ctxt,
                method_name, method_kwargs, direction, **kwargs)
        if isinstance(target_cell, cells_state.CellState):
            # Neighbor cell or ourselves.  Convert it to a 'full path'.
            if target_cell.is_me:
                target_cell = self.our_path_part
            else:
                target_cell = '%s%s%s' % (self.our_path_part,
                                          _PATH_CELL_SEP,
                                          target_cell.name)
        self.target_cell = target_cell
        self.base_attrs_to_json.append('target_cell')

    def _routing_inconsistency(self):
        """Build the error raised when target and routing path disagree."""
        reason = _("destination is %(target_cell)s but routing_path "
                   "is %(routing_path)s") % {
                           'target_cell': self.target_cell,
                           'routing_path': self.routing_path}
        return exception.CellRoutingInconsistency(reason=reason)

    def _get_next_hop(self):
        """Return the cell state for the next hop.

        If the target is the current cell, the state manager's
        ``my_cell_state`` is returned.  Otherwise the parent (direction
        'up') or child (direction 'down') named by the next element of
        the target path is looked up.

        Raises CellRoutingInconsistency when the target path does not
        extend the routing path travelled so far, or when the next cell
        is unknown to the state manager.
        """
        if self.target_cell == self.routing_path:
            return self.state_manager.my_cell_state
        target_cell = self.target_cell
        routing_path = self.routing_path
        current_hops = routing_path.count(_PATH_CELL_SEP)
        next_hop_num = current_hops + 1
        dest_hops = target_cell.count(_PATH_CELL_SEP)
        if dest_hops < current_hops:
            raise self._routing_inconsistency()
        dest_name_parts = target_cell.split(_PATH_CELL_SEP)
        # The path travelled so far must be a prefix of the target path.
        if (_PATH_CELL_SEP.join(dest_name_parts[:next_hop_num]) !=
                routing_path):
            raise self._routing_inconsistency()
        next_hop_name = dest_name_parts[next_hop_num]
        if self.direction == 'up':
            next_hop = self.state_manager.get_parent_cell(next_hop_name)
        else:
            next_hop = self.state_manager.get_child_cell(next_hop_name)
        if not next_hop:
            cell_type = 'parent' if self.direction == 'up' else 'child'
            reason = _("Unknown %(cell_type)s when routing to "
                       "%(target_cell)s") % {'cell_type': cell_type,
                                             'target_cell': target_cell}
            raise exception.CellRoutingInconsistency(reason=reason)
        return next_hop

    def process(self):
        """Process a targeted message.  This is called for all cells
        that touch this message.  If the local cell is the one that
        created this message, we reply directly with a Response instance.
        If the local cell is not the target, an eventlet queue is created
        and we wait for the response to show up via another thread
        receiving the Response back.

        Responses to targeted messages are routed directly back to the
        source.  No eventlet queues are created in intermediate hops.

        All exceptions for processing the message across the whole
        routing path are caught and encoded within the Response and
        returned to the caller.
        """
        try:
            next_hop = self._get_next_hop()
        except Exception as exc:
            exc_info = sys.exc_info()
            LOG.exception(_("Error locating next hop for message: %(exc)s"),
                          {'exc': exc})
            return self._send_response_from_exception(exc_info)

        if next_hop.is_me:
            # Final destination.
            response = self._process_locally()
            return self._send_response(response)

        # Need to forward via neighbor cell.  Only the cell that created
        # a message needing a response sets up the eventlet queue and
        # waits on it.
        wait_for_response = self.need_response and self.source_is_us()
        if wait_for_response:
            self._setup_response_queue()

        try:
            # This is inside the try block, so we can encode the
            # exception and return it to the caller.
            self._at_max_hop_count()
            next_hop.send_message(self)
        except Exception as exc:
            exc_info = sys.exc_info()
            err_str = _("Failed to send message to cell: %(next_hop)s: "
                        "%(exc)s")
            LOG.exception(err_str, {'next_hop': next_hop, 'exc': exc})
            self._cleanup_response_queue()
            return self._send_response_from_exception(exc_info)

        if wait_for_response:
            # Targeted messages only have 1 response.
            remote_response = self._wait_for_json_responses()[0]
            return Response.from_json(remote_response)
class _BroadcastMessage(_BaseMessage):
    """A broadcast message.  This means to call a method in every single
    cell going in a certain direction.
    """
    message_type = 'broadcast'

    def __init__(self, msg_runner, ctxt, method_name, method_kwargs,
            direction, run_locally=True, **kwargs):
        super(_BroadcastMessage, self).__init__(msg_runner, ctxt,
                method_name, method_kwargs, direction, **kwargs)
        # The local cell creating this message has the option
        # to be able to process the message locally or not.
        self.run_locally = run_locally
        self.is_broadcast = True

    def _get_next_hops(self):
        """Return the list of cells this message is forwarded to.

        These are the child cells for direction 'down' and the parent
        cells otherwise.  An empty list is returned once the maximum hop
        count has been reached.
        """
        if self._at_max_hop_count(do_raise=False):
            return []
        if self.direction == 'down':
            return self.state_manager.get_child_cells()
        return self.state_manager.get_parent_cells()

    def _send_to_cells(self, target_cells):
        """Send a message to multiple cells."""
        for cell in target_cells:
            cell.send_message(self)

    def _send_json_responses(self, json_responses, neighbor_only=True,
            fanout=True):
        """Responses to broadcast messages always need to go to the
        neighbor cell from which we received this message.  That
        cell aggregates the responses and makes sure to forward them
        to the correct source.

        'neighbor_only' and 'fanout' are accepted so this override can be
        called with the same keywords as the _BaseMessage method (the
        inherited _send_response() passes 'neighbor_only'); their values
        are ignored and both are always sent as True.
        """
        return super(_BroadcastMessage, self)._send_json_responses(
                json_responses, neighbor_only=True, fanout=True)

    def process(self):
        """Process a broadcast message.  This is called for all cells
        that touch this message.

        The message is sent to all cells in the certain direction and
        the creator of this message has the option of whether or not
        to process it locally as well.

        If responses from all cells are required, each hop creates an
        eventlet queue and waits for responses from its immediate
        neighbor cells.  All responses are then aggregated into a
        single list and are returned to the neighbor cell until the
        source is reached.

        When the source is reached, a list of Response instances are
        returned to the caller.

        All exceptions for processing the message across the whole
        routing path are caught and encoded within the Response and
        returned to the caller.  It is possible to get a mix of
        successful responses and failure responses.  The caller is
        responsible for dealing with this.
        """
        try:
            next_hops = self._get_next_hops()
        except Exception as exc:
            exc_info = sys.exc_info()
            LOG.exception(_("Error locating next hops for message: %(exc)s"),
                          {'exc': exc})
            return self._send_response_from_exception(exc_info)

        # Short circuit if we don't need to respond
        if not self.need_response:
            if self.run_locally:
                self._process_locally()
            self._send_to_cells(next_hops)
            return

        # We'll need to aggregate all of the responses (from ourself
        # and our sibling cells) into 1 response
        try:
            self._setup_response_queue()
            self._send_to_cells(next_hops)
        except Exception as exc:
            # Error just trying to send to cells.  Send a single response
            # with the failure.
            exc_info = sys.exc_info()
            LOG.exception(_("Error sending message to next hops: %(exc)s"),
                          {'exc': exc})
            self._cleanup_response_queue()
            return self._send_response_from_exception(exc_info)

        if self.run_locally:
            # Run locally and store the Response.
            local_response = self._process_locally()
        else:
            local_response = None

        try:
            remote_responses = self._wait_for_json_responses(
                    num_responses=len(next_hops))
        except Exception as exc:
            # Error waiting for responses, most likely a timeout.
            # Send a single response back with the failure.
            exc_info = sys.exc_info()
            err_str = _("Error waiting for responses from neighbor cells: "
                        "%(exc)s")
            LOG.exception(err_str, {'exc': exc})
            return self._send_response_from_exception(exc_info)

        if local_response:
            remote_responses.append(local_response.to_json())
        return self._send_json_responses(remote_responses)
class _ResponseMessage(_TargetedMessage):
    """A targeted message whose only job is to carry responses back.

    It says to call 'parse_responses' when we reach the source of a
    'call'.

    The 'fanout' attribute on this message may be true if we're
    responding to a broadcast or if we're about to respond to the source
    of an original target message.  Because multiple nova-cells services
    may be running within a cell, we need to make sure the response gets
    back to the correct one, so we have to fanout.
    """
    message_type = 'response'

    def __init__(self, msg_runner, ctxt, method_name, method_kwargs,
            direction, target_cell, response_uuid, **kwargs):
        super(_ResponseMessage, self).__init__(msg_runner, ctxt,
                method_name, method_kwargs, direction, target_cell, **kwargs)
        self.response_uuid = response_uuid
        # Serialise the uuid of the message being answered as well.
        self.base_attrs_to_json.append('response_uuid')

    def _next_hop_is_target(self):
        """Tell whether the hop after this cell is the target cell."""
        # Really there's 1 more hop on each of these below, but
        # it doesn't matter for this logic.
        hops_to_target = self.target_cell.count(_PATH_CELL_SEP)
        hops_so_far = self.routing_path.count(_PATH_CELL_SEP)
        return hops_so_far + 1 == hops_to_target

    def process(self):
        """Handle the response here when this cell is the target,
        otherwise pass it on towards where it needs to go.
        """
        hop = self._get_next_hop()
        if hop.is_me:
            self._process_locally()
            return
        if self.fanout is False and self._next_hop_is_target():
            # Next hop is the target.. so we must fanout.  See
            # the class DocString.
            self.fanout = True
        hop.send_message(self)


#
# Methods that may be called when processing messages after reaching
# a target cell.
#


class _BaseMessageMethods(base.Base):
    """Common base for the per-message-type method collections.

    Keeps references to the message runner and its state manager and
    creates a compute API object for subclasses to use.
    """
    def __init__(self, msg_runner):
        super(_BaseMessageMethods, self).__init__()
        self.msg_runner = msg_runner
        self.state_manager = msg_runner.state_manager
        self.compute_api = compute.API()
class _ResponseMessageMethods(_BaseMessageMethods):
    """Methods that are called from a ResponseMessage.

    There's only 1 method (parse_responses) and it is called when the
    message reaches the source of a 'call'.  All we do is stuff the
    response into the eventlet queue to signal the caller that's
    waiting.
    """
    def parse_responses(self, message, orig_message, responses):
        """Hand ``responses`` to whoever waits on message.response_uuid."""
        self.msg_runner._put_response(message.response_uuid, responses)


class _TargetedMessageMethods(_BaseMessageMethods):
    """Methods that may be invoked once a targeted message has been
    routed to its specific cell.
    """
    def __init__(self, *args, **kwargs):
        super(_TargetedMessageMethods, self).__init__(*args, **kwargs)

    def schedule_run_instance(self, message, host_sched_kwargs):
        """Parent cell told us to schedule new instance creation."""
        scheduler = self.msg_runner.scheduler
        scheduler.run_instance(message, host_sched_kwargs)

    def run_compute_api_method(self, message, method_info):
        """Run a method in the compute api class.

        ``method_info`` supplies 'method', 'method_args' and
        'method_kwargs'.  The first positional argument is an instance
        uuid; it is replaced by the instance loaded from the DB before
        the call is made.

        Raises CellServiceAPIMethodNotFound for an unknown method name.
        If the instance cannot be found, the top level cells are told to
        destroy it and InstanceNotFound is re-raised.
        """
        method = method_info['method']
        api_method = getattr(self.compute_api, method, None)
        if not api_method:
            detail = _("Unknown method '%(method)s' in compute API")
            raise exception.CellServiceAPIMethodNotFound(
                    detail=detail % locals())
        call_args = list(method_info['method_args'])
        # 1st arg is instance_uuid that we need to turn into the
        # instance object.
        instance_uuid = call_args[0]
        try:
            instance = self.db.instance_get_by_uuid(message.ctxt,
                                                    instance_uuid)
        except exception.InstanceNotFound:
            with excutils.save_and_reraise_exception():
                # Must be a race condition.  Let's try to resolve it by
                # telling the top level cells that this instance doesn't
                # exist.
                instance = {'uuid': instance_uuid}
                self.msg_runner.instance_destroy_at_top(message.ctxt,
                                                        instance)
        call_args[0] = instance
        call_kwargs = method_info['method_kwargs']
        return api_method(message.ctxt, *call_args, **call_kwargs)

    def update_capabilities(self, message, cell_name, capabilities):
        """A child cell told us about their capabilities."""
        LOG.debug(_("Received capabilities from child cell "
                    "%(cell_name)s: %(capabilities)s"),
                  {'cell_name': cell_name, 'capabilities': capabilities})
        self.state_manager.update_cell_capabilities(cell_name,
                                                    capabilities)
        # Go ahead and update our parents now that a child updated us
        self.msg_runner.tell_parents_our_capabilities(message.ctxt)

    def update_capacities(self, message, cell_name, capacities):
        """A child cell told us about their capacity."""
        LOG.debug(_("Received capacities from child cell "
                    "%(cell_name)s: %(capacities)s"),
                  {'cell_name': cell_name, 'capacities': capacities})
        self.state_manager.update_cell_capacities(cell_name,
                                                  capacities)
        # Go ahead and update our parents now that a child updated us
        self.msg_runner.tell_parents_our_capacities(message.ctxt)

    def announce_capabilities(self, message):
        """A parent cell has told us to send our capabilities, so let's
        do so.
        """
        self.msg_runner.tell_parents_our_capabilities(message.ctxt)

    def announce_capacities(self, message):
        """A parent cell has told us to send our capacity, so let's
        do so.
        """
        self.msg_runner.tell_parents_our_capacities(message.ctxt)
+ """ + def _at_the_top(self): + """Are we the API level?""" + return not self.state_manager.get_parent_cells() + + def instance_update_at_top(self, message, instance, **kwargs): + """Update an instance in the DB if we're a top level cell.""" + if not self._at_the_top(): + return + instance_uuid = instance['uuid'] + routing_path = message.routing_path + instance['cell_name'] = _reverse_path(routing_path) + # Remove things that we can't update in the top level cells. + # 'cell_name' is included in this list.. because we'll set it + # ourselves based on the reverse of the routing path. metadata + # is only updated in the API cell, so we don't listen to what + # the child cell tells us. + items_to_remove = ['id', 'security_groups', 'instance_type', + 'volumes', 'cell_name', 'name', 'metadata'] + for key in items_to_remove: + instance.pop(key, None) + + # Fixup info_cache. We'll have to update this separately if + # it exists. + info_cache = instance.pop('info_cache', None) + if info_cache is not None: + info_cache.pop('id', None) + info_cache.pop('instance', None) + + # Fixup system_metadata (should be a dict for update, not a list) + if ('system_metadata' in instance and + isinstance(instance['system_metadata'], list)): + sys_metadata = dict([(md['key'], md['value']) + for md in instance['system_metadata']]) + instance['system_metadata'] = sys_metadata + + LOG.debug(_("Got update for instance %(instance_uuid)s: " + "%(instance)s") % locals()) + + # It's possible due to some weird condition that the instance + # was already set as deleted... so we'll attempt to update + # it with permissions that allows us to read deleted. + with utils.temporary_mutation(message.ctxt, read_deleted="yes"): + try: + self.db.instance_update(message.ctxt, instance_uuid, + instance, update_cells=False) + except exception.NotFound: + # FIXME(comstud): Strange. Need to handle quotas here, + # if we actually want this code to remain.. 
+ self.db.instance_create(message.ctxt, instance) + if info_cache: + self.db.instance_info_cache_update(message.ctxt, instance_uuid, + info_cache, update_cells=False) + + def instance_destroy_at_top(self, message, instance, **kwargs): + """Destroy an instance from the DB if we're a top level cell.""" + if not self._at_the_top(): + return + instance_uuid = instance['uuid'] + LOG.debug(_("Got update to delete instance %(instance_uuid)s") % + locals()) + try: + self.db.instance_destroy(message.ctxt, instance_uuid, + update_cells=False) + except exception.InstanceNotFound: + pass + + def instance_delete_everywhere(self, message, instance, delete_type, + **kwargs): + """Call compute API delete() or soft_delete() in every cell. + This is used when the API cell doesn't know what cell an instance + belongs to but the instance was requested to be deleted or + soft-deleted. So, we'll run it everywhere. + """ + LOG.debug(_("Got broadcast to %(delete_type)s delete instance"), + locals(), instance=instance) + if delete_type == 'soft': + self.compute_api.soft_delete(message.ctxt, instance) + else: + self.compute_api.delete(message.ctxt, instance) + + def instance_fault_create_at_top(self, message, instance_fault, **kwargs): + """Create an instance fault in the DB if we're a top level cell.""" + if not self._at_the_top(): + return + items_to_remove = ['id'] + for key in items_to_remove: + instance_fault.pop(key, None) + log_str = _("Got message to create instance fault: " + "%(instance_fault)s") + LOG.debug(log_str, locals()) + self.db.instance_fault_create(message.ctxt, instance_fault) + + def bw_usage_update_at_top(self, message, bw_update_info, **kwargs): + """Update Bandwidth usage in the DB if we're a top level cell.""" + if not self._at_the_top(): + return + self.db.bw_usage_update(message.ctxt, **bw_update_info) + + +_CELL_MESSAGE_TYPE_TO_MESSAGE_CLS = {'targeted': _TargetedMessage, + 'broadcast': _BroadcastMessage, + 'response': _ResponseMessage} 
+_CELL_MESSAGE_TYPE_TO_METHODS_CLS = {'targeted': _TargetedMessageMethods, + 'broadcast': _BroadcastMessageMethods, + 'response': _ResponseMessageMethods} + + +# +# Below are the public interfaces into this module. +# + + +class MessageRunner(object): + """This class is the main interface into creating messages and + processing them. + + Public methods in this class are typically called by the CellsManager + to create a new message and process it with the exception of + 'message_from_json' which should be used by CellsDrivers to convert + a JSONified message it has received back into the appropriate Message + class. + + Private methods are used internally when we need to keep some + 'global' state. For instance, eventlet queues used for responses are + held in this class. Also, when a Message is process()ed above and + it's determined we should take action locally, + _process_message_locally() will be called. + + When needing to add a new method to call in a Cell2Cell message, + define the new method below and also add it to the appropriate + MessageMethods class where the real work will be done. + """ + + def __init__(self, state_manager): + self.state_manager = state_manager + cells_scheduler_cls = importutils.import_class( + CONF.cells.scheduler) + self.scheduler = cells_scheduler_cls(self) + self.response_queues = {} + self.methods_by_type = {} + self.our_name = CONF.cells.name + for msg_type, cls in _CELL_MESSAGE_TYPE_TO_METHODS_CLS.iteritems(): + self.methods_by_type[msg_type] = cls(self) + + def _process_message_locally(self, message): + """Message processing will call this when it's determined that + the message should be processed within this cell. Find the + method to call based on the message type, and call it. The + caller is responsible for catching exceptions and returning + results to cells, if needed. 
+ """ + methods = self.methods_by_type[message.message_type] + fn = getattr(methods, message.method_name) + return fn(message, **message.method_kwargs) + + def _put_response(self, response_uuid, response): + """Put a response into a response queue. This is called when + a _ResponseMessage is processed in the cell that initiated a + 'call' to another cell. + """ + resp_queue = self.response_queues.get(response_uuid) + if not resp_queue: + # Response queue is gone. We must have restarted or we + # received a response after our timeout period. + return + resp_queue.put(response) + + def _setup_response_queue(self, message): + """Set up an eventlet queue to use to wait for replies. + + Replies come back from the target cell as a _ResponseMessage + being sent back to the source. + """ + resp_queue = queue.Queue() + self.response_queues[message.uuid] = resp_queue + return resp_queue + + def _cleanup_response_queue(self, message): + """Stop tracking the response queue either because we're + done receiving responses, or we've timed out. + """ + try: + del self.response_queues[message.uuid] + except KeyError: + # Ignore if queue is gone already somehow. + pass + + def _create_response_message(self, ctxt, direction, target_cell, + response_uuid, response_kwargs, **kwargs): + """Create a ResponseMessage. This is used internally within + the messaging module. + """ + return _ResponseMessage(self, ctxt, 'parse_responses', + response_kwargs, direction, target_cell, + response_uuid, **kwargs) + + def message_from_json(self, json_message): + """Turns a message in JSON format into an appropriate Message + instance. This is called when cells receive a message from + another cell. + """ + message_dict = jsonutils.loads(json_message) + message_type = message_dict.pop('message_type') + # Need to convert context back. 
+ ctxt = message_dict['ctxt'] + message_dict['ctxt'] = context.RequestContext.from_dict(ctxt) + message_cls = _CELL_MESSAGE_TYPE_TO_MESSAGE_CLS[message_type] + return message_cls(self, **message_dict) + + def ask_children_for_capabilities(self, ctxt): + """Tell child cells to send us capabilities. This is typically + called on startup of the nova-cells service. + """ + child_cells = self.state_manager.get_child_cells() + for child_cell in child_cells: + message = _TargetedMessage(self, ctxt, + 'announce_capabilities', + dict(), 'down', child_cell) + message.process() + + def ask_children_for_capacities(self, ctxt): + """Tell child cells to send us capacities. This is typically + called on startup of the nova-cells service. + """ + child_cells = self.state_manager.get_child_cells() + for child_cell in child_cells: + message = _TargetedMessage(self, ctxt, 'announce_capacities', + dict(), 'down', child_cell) + message.process() + + def tell_parents_our_capabilities(self, ctxt): + """Send our capabilities to parent cells.""" + parent_cells = self.state_manager.get_parent_cells() + if not parent_cells: + return + my_cell_info = self.state_manager.get_my_state() + capabs = self.state_manager.get_our_capabilities() + LOG.debug(_("Updating parents with our capabilities: %(capabs)s"), + locals()) + # We have to turn the sets into lists so they can potentially + # be json encoded when the raw message is sent. 
+ for key, values in capabs.items(): + capabs[key] = list(values) + method_kwargs = {'cell_name': my_cell_info.name, + 'capabilities': capabs} + for cell in parent_cells: + message = _TargetedMessage(self, ctxt, 'update_capabilities', + method_kwargs, 'up', cell, fanout=True) + message.process() + + def tell_parents_our_capacities(self, ctxt): + """Send our capacities to parent cells.""" + parent_cells = self.state_manager.get_parent_cells() + if not parent_cells: + return + my_cell_info = self.state_manager.get_my_state() + capacities = self.state_manager.get_our_capacities() + LOG.debug(_("Updating parents with our capacities: %(capacities)s"), + locals()) + method_kwargs = {'cell_name': my_cell_info.name, + 'capacities': capacities} + for cell in parent_cells: + message = _TargetedMessage(self, ctxt, 'update_capacities', + method_kwargs, 'up', cell, fanout=True) + message.process() + + def schedule_run_instance(self, ctxt, target_cell, host_sched_kwargs): + """Called by the scheduler to tell a child cell to schedule + a new instance for build. 
+ """ + method_kwargs = dict(host_sched_kwargs=host_sched_kwargs) + message = _TargetedMessage(self, ctxt, 'schedule_run_instance', + method_kwargs, 'down', + target_cell) + message.process() + + def run_compute_api_method(self, ctxt, cell_name, method_info, call): + """Call a compute API method in a specific cell.""" + message = _TargetedMessage(self, ctxt, 'run_compute_api_method', + dict(method_info=method_info), 'down', + cell_name, need_response=call) + return message.process() + + def instance_update_at_top(self, ctxt, instance): + """Update an instance at the top level cell.""" + message = _BroadcastMessage(self, ctxt, 'instance_update_at_top', + dict(instance=instance), 'up', + run_locally=False) + message.process() + + def instance_destroy_at_top(self, ctxt, instance): + """Destroy an instance at the top level cell.""" + message = _BroadcastMessage(self, ctxt, 'instance_destroy_at_top', + dict(instance=instance), 'up', + run_locally=False) + message.process() + + def instance_delete_everywhere(self, ctxt, instance, delete_type): + """This is used by API cell when it didn't know what cell + an instance was in, but the instance was requested to be + deleted or soft_deleted. So, we'll broadcast this everywhere. 
+ """ + method_kwargs = dict(instance=instance, delete_type=delete_type) + message = _BroadcastMessage(self, ctxt, + 'instance_delete_everywhere', + method_kwargs, 'down', + run_locally=False) + message.process() + + def instance_fault_create_at_top(self, ctxt, instance_fault): + """Create an instance fault at the top level cell.""" + message = _BroadcastMessage(self, ctxt, + 'instance_fault_create_at_top', + dict(instance_fault=instance_fault), + 'up', run_locally=False) + message.process() + + def bw_usage_update_at_top(self, ctxt, bw_update_info): + """Update bandwidth usage at top level cell.""" + message = _BroadcastMessage(self, ctxt, 'bw_usage_update_at_top', + dict(bw_update_info=bw_update_info), + 'up', run_locally=False) + message.process() + + @staticmethod + def get_message_types(): + return _CELL_MESSAGE_TYPE_TO_MESSAGE_CLS.keys() + + +class Response(object): + """Holds a response from a cell. If there was a failure, 'failure' + will be True and 'response' will contain an encoded Exception. 
+ """ + def __init__(self, cell_name, value, failure): + self.failure = failure + self.cell_name = cell_name + self.value = value + + def to_json(self): + resp_value = self.value + if self.failure: + resp_value = rpc_common.serialize_remote_exception(resp_value, + log_failure=False) + _dict = {'cell_name': self.cell_name, + 'value': resp_value, + 'failure': self.failure} + return jsonutils.dumps(_dict) + + @classmethod + def from_json(cls, json_message): + _dict = jsonutils.loads(json_message) + if _dict['failure']: + resp_value = rpc_common.deserialize_remote_exception( + CONF, _dict['value']) + _dict['value'] = resp_value + return cls(**_dict) + + def value_or_raise(self): + if self.failure: + if isinstance(self.value, (tuple, list)): + raise self.value[0], self.value[1], self.value[2] + else: + raise self.value + return self.value diff --git a/nova/cells/opts.py b/nova/cells/opts.py new file mode 100644 index 000000000..45b453ebc --- /dev/null +++ b/nova/cells/opts.py @@ -0,0 +1,44 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2012 Rackspace Hosting +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +""" +Global cells config options +""" + +from nova.openstack.common import cfg + +cells_opts = [ + cfg.BoolOpt('enable', + default=False, + help='Enable cell functionality'), + cfg.StrOpt('topic', + default='cells', + help='the topic cells nodes listen on'), + cfg.StrOpt('manager', + default='nova.cells.manager.CellsManager', + help='Manager for cells'), + cfg.StrOpt('name', + default='nova', + help='name of this cell'), + cfg.ListOpt('capabilities', + default=['hypervisor=xenserver;kvm', 'os=linux;windows'], + help='Key/Multi-value list with the capabilities of the cell'), + cfg.IntOpt('call_timeout', + default=60, + help='Seconds to wait for response from a call to a cell.'), +] + +cfg.CONF.register_opts(cells_opts, group='cells') diff --git a/nova/cells/rpc_driver.py b/nova/cells/rpc_driver.py new file mode 100644 index 000000000..5e420aa8e --- /dev/null +++ b/nova/cells/rpc_driver.py @@ -0,0 +1,165 @@ +# Copyright (c) 2012 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Cells RPC Communication Driver +""" +from nova.cells import driver +from nova.openstack.common import cfg +from nova.openstack.common import rpc +from nova.openstack.common.rpc import dispatcher as rpc_dispatcher +from nova.openstack.common.rpc import proxy as rpc_proxy + +cell_rpc_driver_opts = [ + cfg.StrOpt('rpc_driver_queue_base', + default='cells.intercell', + help="Base queue name to use when communicating between " + "cells. 
Various topics by message type will be " + "appended to this.")] + +CONF = cfg.CONF +CONF.register_opts(cell_rpc_driver_opts, group='cells') +CONF.import_opt('call_timeout', 'nova.cells.opts', group='cells') + +_CELL_TO_CELL_RPC_API_VERSION = '1.0' + + +class CellsRPCDriver(driver.BaseCellsDriver): + """Driver for cell<->cell communication via RPC. This is used to + setup the RPC consumers as well as to send a message to another cell. + + One instance of this class will be created for every neighbor cell + that we find in the DB and it will be associated with the cell in + its CellState. + + One instance is also created by the cells manager for setting up + the consumers. + """ + BASE_RPC_API_VERSION = _CELL_TO_CELL_RPC_API_VERSION + + def __init__(self, *args, **kwargs): + super(CellsRPCDriver, self).__init__(*args, **kwargs) + self.rpc_connections = [] + self.intercell_rpcapi = InterCellRPCAPI( + self.BASE_RPC_API_VERSION) + + def _start_consumer(self, dispatcher, topic): + """Start an RPC consumer.""" + conn = rpc.create_connection(new=True) + conn.create_consumer(topic, dispatcher, fanout=False) + conn.create_consumer(topic, dispatcher, fanout=True) + self.rpc_connections.append(conn) + conn.consume_in_thread() + return conn + + def start_consumers(self, msg_runner): + """Start RPC consumers. + + Start up 2 separate consumers for handling inter-cell + communication via RPC. Both handle the same types of + messages, but requests/replies are separated to solve + potential deadlocks. (If we used the same queue for both, + it's possible to exhaust the RPC thread pool while we wait + for replies.. such that we'd never consume a reply.) 
+ """ + topic_base = CONF.cells.rpc_driver_queue_base + proxy_manager = InterCellRPCDispatcher(msg_runner) + dispatcher = rpc_dispatcher.RpcDispatcher([proxy_manager]) + for msg_type in msg_runner.get_message_types(): + topic = '%s.%s' % (topic_base, msg_type) + self._start_consumer(dispatcher, topic) + + def stop_consumers(self): + """Stop RPC consumers. + + NOTE: Currently there are no hooks when stopping services + to have managers cleanup, so this is not currently called. + """ + for conn in self.rpc_connections: + conn.close() + + def send_message_to_cell(self, cell_state, message): + """Use the InterCellRPCAPI to send a message to a cell.""" + self.intercell_rpcapi.send_message_to_cell(cell_state, message) + + +class InterCellRPCAPI(rpc_proxy.RpcProxy): + """Client side of the Cell<->Cell RPC API. + + The CellsRPCDriver uses this to make calls to another cell. + + API version history: + 1.0 - Initial version. + """ + def __init__(self, default_version): + super(InterCellRPCAPI, self).__init__(None, default_version) + + @staticmethod + def _get_server_params_for_cell(next_hop): + """Turn the DB information for a cell into the parameters + needed for the RPC call. + """ + param_map = {'username': 'username', + 'password': 'password', + 'rpc_host': 'hostname', + 'rpc_port': 'port', + 'rpc_virtual_host': 'virtual_host'} + server_params = {} + for source, target in param_map.items(): + if next_hop.db_info[source]: + server_params[target] = next_hop.db_info[source] + return server_params + + def send_message_to_cell(self, cell_state, message): + """Send a message to another cell by JSON-ifying the message and + making an RPC cast to 'process_message'. If the message says to + fanout, do it. The topic that is used will be + 'CONF.cells.rpc_driver_queue_base.<message_type>'. 
+ """ + ctxt = message.ctxt + json_message = message.to_json() + rpc_message = self.make_msg('process_message', message=json_message) + topic_base = CONF.cells.rpc_driver_queue_base + topic = '%s.%s' % (topic_base, message.message_type) + server_params = self._get_server_params_for_cell(cell_state) + if message.fanout: + self.fanout_cast_to_server(ctxt, server_params, + rpc_message, topic=topic) + else: + self.cast_to_server(ctxt, server_params, + rpc_message, topic=topic) + + +class InterCellRPCDispatcher(object): + """RPC Dispatcher to handle messages received from other cells. + + All messages received here have come from a sibling cell. Depending + on the ultimate target and type of message, we may process the message + in this cell, relay the message to another sibling cell, or both. This + logic is defined by the message class in the messaging module. + """ + BASE_RPC_API_VERSION = _CELL_TO_CELL_RPC_API_VERSION + + def __init__(self, msg_runner): + """Init the Intercell RPC Dispatcher.""" + self.msg_runner = msg_runner + + def process_message(self, _ctxt, message): + """We received a message from another cell. Use the MessageRunner + to turn this from JSON back into an instance of the correct + Message class. Then process it! + """ + message = self.msg_runner.message_from_json(message) + message.process() diff --git a/nova/cells/rpcapi.py b/nova/cells/rpcapi.py new file mode 100644 index 000000000..8ce298829 --- /dev/null +++ b/nova/cells/rpcapi.py @@ -0,0 +1,138 @@ +# Copyright (c) 2012 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Client side of nova-cells RPC API (for talking to the nova-cells service +within a cell). + +This is different than communication between child and parent nova-cells +services. That communication is handled by the cells driver via the +messaging module. +""" + +from nova.openstack.common import cfg +from nova.openstack.common import jsonutils +from nova.openstack.common import log as logging +from nova.openstack.common.rpc import proxy as rpc_proxy + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF +CONF.import_opt('enable', 'nova.cells.opts', group='cells') +CONF.import_opt('topic', 'nova.cells.opts', group='cells') + + +class CellsAPI(rpc_proxy.RpcProxy): + '''Cells client-side RPC API + + API version history: + + 1.0 - Initial version. 
+ ''' + BASE_RPC_API_VERSION = '1.0' + + def __init__(self): + super(CellsAPI, self).__init__(topic=CONF.cells.topic, + default_version=self.BASE_RPC_API_VERSION) + + def cast_compute_api_method(self, ctxt, cell_name, method, + *args, **kwargs): + """Make a cast to a compute API method in a certain cell.""" + method_info = {'method': method, + 'method_args': args, + 'method_kwargs': kwargs} + self.cast(ctxt, self.make_msg('run_compute_api_method', + cell_name=cell_name, + method_info=method_info, + call=False)) + + def call_compute_api_method(self, ctxt, cell_name, method, + *args, **kwargs): + """Make a call to a compute API method in a certain cell.""" + method_info = {'method': method, + 'method_args': args, + 'method_kwargs': kwargs} + return self.call(ctxt, self.make_msg('run_compute_api_method', + cell_name=cell_name, + method_info=method_info, + call=True)) + + def schedule_run_instance(self, ctxt, **kwargs): + """Schedule a new instance for creation.""" + self.cast(ctxt, self.make_msg('schedule_run_instance', + host_sched_kwargs=kwargs)) + + def instance_update_at_top(self, ctxt, instance): + """Update instance at API level.""" + if not CONF.cells.enable: + return + # Make sure we have a dict, not a SQLAlchemy model + instance_p = jsonutils.to_primitive(instance) + self.cast(ctxt, self.make_msg('instance_update_at_top', + instance=instance_p)) + + def instance_destroy_at_top(self, ctxt, instance): + """Destroy instance at API level.""" + if not CONF.cells.enable: + return + instance_p = jsonutils.to_primitive(instance) + self.cast(ctxt, self.make_msg('instance_destroy_at_top', + instance=instance_p)) + + def instance_delete_everywhere(self, ctxt, instance, delete_type): + """Delete instance everywhere. delete_type may be 'soft' + or 'hard'. This is generally only used to resolve races + when API cell doesn't know to what cell an instance belongs. 
+ """ + if not CONF.cells.enable: + return + instance_p = jsonutils.to_primitive(instance) + self.cast(ctxt, self.make_msg('instance_delete_everywhere', + instance=instance_p, + delete_type=delete_type)) + + def instance_fault_create_at_top(self, ctxt, instance_fault): + """Create an instance fault at the top.""" + if not CONF.cells.enable: + return + instance_fault_p = jsonutils.to_primitive(instance_fault) + self.cast(ctxt, self.make_msg('instance_fault_create_at_top', + instance_fault=instance_fault_p)) + + def bw_usage_update_at_top(self, ctxt, uuid, mac, start_period, + bw_in, bw_out, last_ctr_in, last_ctr_out, last_refreshed=None): + """Broadcast upwards that bw_usage was updated.""" + if not CONF.cells.enable: + return + bw_update_info = {'uuid': uuid, + 'mac': mac, + 'start_period': start_period, + 'bw_in': bw_in, + 'bw_out': bw_out, + 'last_ctr_in': last_ctr_in, + 'last_ctr_out': last_ctr_out, + 'last_refreshed': last_refreshed} + self.cast(ctxt, self.make_msg('bw_usage_update_at_top', + bw_update_info=bw_update_info)) + + def instance_info_cache_update_at_top(self, ctxt, instance_info_cache): + """Broadcast up that an instance's info_cache has changed.""" + if not CONF.cells.enable: + return + iicache = jsonutils.to_primitive(instance_info_cache) + instance = {'uuid': iicache['instance_uuid'], + 'info_cache': iicache} + self.cast(ctxt, self.make_msg('instance_update_at_top', + instance=instance)) diff --git a/nova/cells/scheduler.py b/nova/cells/scheduler.py new file mode 100644 index 000000000..0b730290a --- /dev/null +++ b/nova/cells/scheduler.py @@ -0,0 +1,136 @@ +# Copyright (c) 2012 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Cells Scheduler +""" +import random +import time + +from nova import compute +from nova.compute import vm_states +from nova.db import base +from nova import exception +from nova.openstack.common import cfg +from nova.openstack.common import log as logging +from nova.scheduler import rpcapi as scheduler_rpcapi + +cell_scheduler_opts = [ + cfg.IntOpt('scheduler_retries', + default=10, + help='How many retries when no cells are available.'), + cfg.IntOpt('scheduler_retry_delay', + default=2, + help='How often to retry in seconds when no cells are ' + 'available.') +] + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF +CONF.register_opts(cell_scheduler_opts, group='cells') + + +class CellsScheduler(base.Base): + """The cells scheduler.""" + + def __init__(self, msg_runner): + super(CellsScheduler, self).__init__() + self.msg_runner = msg_runner + self.state_manager = msg_runner.state_manager + self.compute_api = compute.API() + self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI() + + def _create_instances_here(self, ctxt, request_spec): + instance_values = request_spec['instance_properties'] + for instance_uuid in request_spec['instance_uuids']: + instance_values['uuid'] = instance_uuid + instance = self.compute_api.create_db_entry_for_new_instance( + ctxt, + request_spec['instance_type'], + request_spec['image'], + instance_values, + request_spec['security_group'], + request_spec['block_device_mapping']) + self.msg_runner.instance_update_at_top(ctxt, instance) + + def _get_possible_cells(self): + cells = 
set(self.state_manager.get_child_cells()) + our_cell = self.state_manager.get_my_state() + # Include our cell in the list, if we have any capacity info + if not cells or our_cell.capacities: + cells.add(our_cell) + return cells + + def _run_instance(self, message, host_sched_kwargs): + """Attempt to schedule instance(s). If we have no cells + to try, raise exception.NoCellsAvailable + """ + ctxt = message.ctxt + request_spec = host_sched_kwargs['request_spec'] + + # The message we might forward to a child cell + cells = self._get_possible_cells() + if not cells: + raise exception.NoCellsAvailable() + cells = list(cells) + + # Random selection for now + random.shuffle(cells) + target_cell = cells[0] + + LOG.debug(_("Scheduling with routing_path=%(routing_path)s"), + locals()) + + if target_cell.is_me: + # Need to create instance DB entries as the host scheduler + # expects that the instance(s) already exists. + self._create_instances_here(ctxt, request_spec) + self.scheduler_rpcapi.run_instance(ctxt, + **host_sched_kwargs) + return + self.msg_runner.schedule_run_instance(ctxt, target_cell, + host_sched_kwargs) + + def run_instance(self, message, host_sched_kwargs): + """Pick a cell where we should create a new instance.""" + try: + for i in xrange(max(0, CONF.cells.scheduler_retries) + 1): + try: + return self._run_instance(message, host_sched_kwargs) + except exception.NoCellsAvailable: + if i == max(0, CONF.cells.scheduler_retries): + raise + sleep_time = max(1, CONF.cells.scheduler_retry_delay) + LOG.info(_("No cells available when scheduling. 
Will " + "retry in %(sleep_time)s second(s)"), locals()) + time.sleep(sleep_time) + continue + except Exception: + request_spec = host_sched_kwargs['request_spec'] + instance_uuids = request_spec['instance_uuids'] + LOG.exception(_("Error scheduling instances %(instance_uuids)s"), + locals()) + ctxt = message.ctxt + for instance_uuid in instance_uuids: + self.msg_runner.instance_update_at_top(ctxt, + {'uuid': instance_uuid, + 'vm_state': vm_states.ERROR}) + try: + self.db.instance_update(ctxt, + instance_uuid, + {'vm_state': vm_states.ERROR}) + except Exception: + pass diff --git a/nova/cells/state.py b/nova/cells/state.py new file mode 100644 index 000000000..c6f8f3220 --- /dev/null +++ b/nova/cells/state.py @@ -0,0 +1,346 @@ +# Copyright (c) 2012 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +""" +CellState Manager +""" +import copy +import datetime +import functools + +from nova.cells import rpc_driver +from nova import context +from nova.db import base +from nova.openstack.common import cfg +from nova.openstack.common import lockutils +from nova.openstack.common import log as logging +from nova.openstack.common import timeutils + +cell_state_manager_opts = [ + cfg.IntOpt('db_check_interval', + default=60, + help='Seconds between getting fresh cell info from db.'), +] + + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF +CONF.import_opt('host', 'nova.config') +CONF.import_opt('name', 'nova.cells.opts', group='cells') +#CONF.import_opt('capabilities', 'nova.cells.opts', group='cells') +CONF.register_opts(cell_state_manager_opts, group='cells') + + +class CellState(object): + """Holds information for a particular cell.""" + def __init__(self, cell_name, is_me=False): + self.name = cell_name + self.is_me = is_me + self.last_seen = datetime.datetime.min + self.capabilities = {} + self.capacities = {} + self.db_info = {} + # TODO(comstud): The DB will specify the driver to use to talk + # to this cell, but there's no column for this yet. The only + # available driver is the rpc driver. 
+ self.driver = rpc_driver.CellsRPCDriver() + + def update_db_info(self, cell_db_info): + """Update cell credentials from db""" + self.db_info = dict( + [(k, v) for k, v in cell_db_info.iteritems() + if k != 'name']) + + def update_capabilities(self, cell_metadata): + """Update cell capabilities for a cell.""" + self.last_seen = timeutils.utcnow() + self.capabilities = cell_metadata + + def update_capacities(self, capacities): + """Update capacity information for a cell.""" + self.last_seen = timeutils.utcnow() + self.capacities = capacities + + def get_cell_info(self): + """Return subset of cell information for OS API use.""" + db_fields_to_return = ['id', 'is_parent', 'weight_scale', + 'weight_offset', 'username', 'rpc_host', 'rpc_port'] + cell_info = dict(name=self.name, capabilities=self.capabilities) + if self.db_info: + for field in db_fields_to_return: + cell_info[field] = self.db_info[field] + return cell_info + + def send_message(self, message): + """Send a message to a cell. Just forward this to the driver, + passing ourselves and the message as arguments. + """ + self.driver.send_message_to_cell(self, message) + + def __repr__(self): + me = "me" if self.is_me else "not_me" + return "Cell '%s' (%s)" % (self.name, me) + + +def sync_from_db(f): + """Use as a decorator to wrap methods that use cell information to + make sure they sync the latest information from the DB periodically. 
+ """ + @functools.wraps(f) + def wrapper(self, *args, **kwargs): + if self._time_to_sync(): + self._cell_db_sync() + return f(self, *args, **kwargs) + return wrapper + + +class CellStateManager(base.Base): + def __init__(self, cell_state_cls=None): + super(CellStateManager, self).__init__() + if not cell_state_cls: + cell_state_cls = CellState + self.cell_state_cls = cell_state_cls + self.my_cell_state = cell_state_cls(CONF.cells.name, is_me=True) + self.parent_cells = {} + self.child_cells = {} + self.last_cell_db_check = datetime.datetime.min + self._cell_db_sync() + my_cell_capabs = {} + for cap in CONF.cells.capabilities: + name, value = cap.split('=', 1) + if ';' in value: + values = set(value.split(';')) + else: + values = set([value]) + my_cell_capabs[name] = values + self.my_cell_state.update_capabilities(my_cell_capabs) + + def _refresh_cells_from_db(self, ctxt): + """Make our cell info map match the db.""" + # Add/update existing cells ... + db_cells = self.db.cell_get_all(ctxt) + db_cells_dict = dict([(cell['name'], cell) for cell in db_cells]) + + # Update current cells. 
Delete ones that disappeared + for cells_dict in (self.parent_cells, self.child_cells): + for cell_name, cell_info in cells_dict.items(): + is_parent = cell_info.db_info['is_parent'] + db_dict = db_cells_dict.get(cell_name) + if db_dict and is_parent == db_dict['is_parent']: + cell_info.update_db_info(db_dict) + else: + del cells_dict[cell_name] + + # Add new cells + for cell_name, db_info in db_cells_dict.items(): + if db_info['is_parent']: + cells_dict = self.parent_cells + else: + cells_dict = self.child_cells + if cell_name not in cells_dict: + cells_dict[cell_name] = self.cell_state_cls(cell_name) + cells_dict[cell_name].update_db_info(db_info) + + def _time_to_sync(self): + """Is it time to sync the DB against our memory cache?""" + diff = timeutils.utcnow() - self.last_cell_db_check + return diff.total_seconds() >= CONF.cells.db_check_interval + + def _update_our_capacity(self, context): + """Update our capacity in the self.my_cell_state CellState. + + This will add/update 2 entries in our CellState.capacities, + 'ram_free' and 'disk_free'. + + The values of these are both dictionaries with the following + format: + + {'total_mb': <total_memory_free_in_the_cell>, + 'units_by_mb': <units_dictionary>} + + <units_dictionary> contains the number of units that we can + build for every instance_type that we have. This number is + computed by looking at room available on every compute_node. + + Take the following instance_types as an example: + + [{'memory_mb': 1024, 'root_gb': 10, 'ephemeral_gb': 100}, + {'memory_mb': 2048, 'root_gb': 20, 'ephemeral_gb': 200}] + + capacities['ram_free']['units_by_mb'] would contain the following: + + {'1024': <number_of_instances_that_will_fit>, + '2048': <number_of_instances_that_will_fit>} + + capacities['disk_free']['units_by_mb'] would contain the following: + + {'122880': <number_of_instances_that_will_fit>, + '225280': <number_of_instances_that_will_fit>} + + Units are in MB, so 122880 = (10 + 100) * 1024.
+ + NOTE(comstud): Perhaps we should only report a single number + available per instance_type. + """ + + compute_hosts = {} + + def _get_compute_hosts(): + compute_nodes = self.db.compute_node_get_all(context) + for compute in compute_nodes: + service = compute['service'] + if not service or service['disabled']: + continue + host = service['host'] + compute_hosts[host] = { + 'free_ram_mb': compute['free_ram_mb'], + 'free_disk_mb': compute['free_disk_gb'] * 1024} + + _get_compute_hosts() + if not compute_hosts: + self.my_cell_state.update_capacities({}) + return + + ram_mb_free_units = {} + disk_mb_free_units = {} + total_ram_mb_free = 0 + total_disk_mb_free = 0 + + def _free_units(tot, per_inst): + if per_inst: + return max(0, int(tot / per_inst)) + else: + return 0 + + def _update_from_values(values, instance_type): + memory_mb = instance_type['memory_mb'] + disk_mb = (instance_type['root_gb'] + + instance_type['ephemeral_gb']) * 1024 + ram_mb_free_units.setdefault(str(memory_mb), 0) + disk_mb_free_units.setdefault(str(disk_mb), 0) + ram_free_units = _free_units(values['free_ram_mb'], + memory_mb) + disk_free_units = _free_units(values['free_disk_mb'], + disk_mb) + ram_mb_free_units[str(memory_mb)] += ram_free_units + disk_mb_free_units[str(disk_mb)] += disk_free_units + + instance_types = self.db.instance_type_get_all(context) + + for compute_values in compute_hosts.values(): + total_ram_mb_free += compute_values['free_ram_mb'] + total_disk_mb_free += compute_values['free_disk_mb'] + for instance_type in instance_types: + _update_from_values(compute_values, instance_type) + + capacities = {'ram_free': {'total_mb': total_ram_mb_free, + 'units_by_mb': ram_mb_free_units}, + 'disk_free': {'total_mb': total_disk_mb_free, + 'units_by_mb': disk_mb_free_units}} + self.my_cell_state.update_capacities(capacities) + + @lockutils.synchronized('cell-db-sync', 'nova-') + def _cell_db_sync(self): + """Update status for all cells if it's time.
Most calls to + this are from the sync_from_db() decorator that checks + the time, but it checks outside of a lock. The duplicate + check here is to prevent multiple threads from pulling the + information simultaneously. + """ + if self._time_to_sync(): + LOG.debug(_("Updating cell cache from db.")) + self.last_cell_db_check = timeutils.utcnow() + ctxt = context.get_admin_context() + self._refresh_cells_from_db(ctxt) + self._update_our_capacity(ctxt) + + @sync_from_db + def get_my_state(self): + """Return information for my (this) cell.""" + return self.my_cell_state + + @sync_from_db + def get_child_cells(self): + """Return list of child cell_infos.""" + return self.child_cells.values() + + @sync_from_db + def get_parent_cells(self): + """Return list of parent cell_infos.""" + return self.parent_cells.values() + + @sync_from_db + def get_parent_cell(self, cell_name): + return self.parent_cells.get(cell_name) + + @sync_from_db + def get_child_cell(self, cell_name): + return self.child_cells.get(cell_name) + + @sync_from_db + def update_cell_capabilities(self, cell_name, capabilities): + """Update capabilities for a cell.""" + cell = self.child_cells.get(cell_name) + if not cell: + cell = self.parent_cells.get(cell_name) + if not cell: + LOG.error(_("Unknown cell '%(cell_name)s' when trying to " + "update capabilities"), locals()) + return + # Make sure capabilities are sets.
+ for capab_name, values in capabilities.items(): + capabilities[capab_name] = set(values) + cell.update_capabilities(capabilities) + + @sync_from_db + def update_cell_capacities(self, cell_name, capacities): + """Update capacities for a cell.""" + cell = self.child_cells.get(cell_name) + if not cell: + cell = self.parent_cells.get(cell_name) + if not cell: + LOG.error(_("Unknown cell '%(cell_name)s' when trying to " + "update capacities"), locals()) + return + cell.update_capacities(capacities) + + @sync_from_db + def get_our_capabilities(self, include_children=True): + capabs = copy.deepcopy(self.my_cell_state.capabilities) + if include_children: + for cell in self.child_cells.values(): + for capab_name, values in cell.capabilities.items(): + if capab_name not in capabs: + capabs[capab_name] = set([]) + capabs[capab_name] |= values + return capabs + + def _add_to_dict(self, target, src): + for key, value in src.items(): + if isinstance(value, dict): + target.setdefault(key, {}) + self._add_to_dict(target[key], value) + continue + target.setdefault(key, 0) + target[key] += value + + @sync_from_db + def get_our_capacities(self, include_children=True): + capacities = copy.deepcopy(self.my_cell_state.capacities) + if include_children: + for cell in self.child_cells.values(): + self._add_to_dict(capacities, cell.capacities) + return capacities diff --git a/nova/compute/api.py b/nova/compute/api.py index 757f78f2d..abbc0bd92 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -1954,6 +1954,14 @@ class API(base.Base): return {'url': connect_info['access_url']} + def get_vnc_connect_info(self, context, instance, console_type): + """Used in a child cell to get console info.""" + if not instance['host']: + raise exception.InstanceNotReady(instance_id=instance['uuid']) + connect_info = self.compute_rpcapi.get_vnc_console(context, + instance=instance, console_type=console_type) + return connect_info + @wrap_check_policy def get_console_output(self, context, 
instance, tail_length=None): """Get console output for an instance.""" diff --git a/nova/compute/cells_api.py b/nova/compute/cells_api.py new file mode 100644 index 000000000..cdbccebb1 --- /dev/null +++ b/nova/compute/cells_api.py @@ -0,0 +1,471 @@ +# Copyright (c) 2012 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Compute API that proxies via Cells Service""" + +from nova import block_device +from nova.cells import rpcapi as cells_rpcapi +from nova.compute import api as compute_api +from nova.compute import task_states +from nova.compute import vm_states +from nova import exception +from nova.openstack.common import excutils +from nova.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +check_instance_state = compute_api.check_instance_state +wrap_check_policy = compute_api.wrap_check_policy +check_policy = compute_api.check_policy +check_instance_lock = compute_api.check_instance_lock + + +def validate_cell(fn): + def _wrapped(self, context, instance, *args, **kwargs): + self._validate_cell(instance, fn.__name__) + return fn(self, context, instance, *args, **kwargs) + _wrapped.__name__ = fn.__name__ + return _wrapped + + +class ComputeRPCAPINoOp(object): + def __getattr__(self, key): + def _noop_rpc_wrapper(*args, **kwargs): + return None + return _noop_rpc_wrapper + + +class SchedulerRPCAPIRedirect(object): + def __init__(self, cells_rpcapi_obj): + self.cells_rpcapi = cells_rpcapi_obj + 
+ def __getattr__(self, key): + def _noop_rpc_wrapper(*args, **kwargs): + return None + return _noop_rpc_wrapper + + def run_instance(self, context, **kwargs): + self.cells_rpcapi.schedule_run_instance(context, **kwargs) + + +class ComputeCellsAPI(compute_api.API): + def __init__(self, *args, **kwargs): + super(ComputeCellsAPI, self).__init__(*args, **kwargs) + self.cells_rpcapi = cells_rpcapi.CellsAPI() + # Avoid casts/calls directly to compute + self.compute_rpcapi = ComputeRPCAPINoOp() + # Redirect scheduler run_instance to cells. + self.scheduler_rpcapi = SchedulerRPCAPIRedirect(self.cells_rpcapi) + + def _cell_read_only(self, cell_name): + """Is the target cell in a read-only mode?""" + # FIXME(comstud): Add support for this. + return False + + def _validate_cell(self, instance, method): + cell_name = instance['cell_name'] + if not cell_name: + raise exception.InstanceUnknownCell( + instance_uuid=instance['uuid']) + if self._cell_read_only(cell_name): + raise exception.InstanceInvalidState( + attr="vm_state", + instance_uuid=instance['uuid'], + state="temporary_readonly", + method=method) + + def _cast_to_cells(self, context, instance, method, *args, **kwargs): + instance_uuid = instance['uuid'] + cell_name = instance['cell_name'] + if not cell_name: + raise exception.InstanceUnknownCell(instance_uuid=instance_uuid) + self.cells_rpcapi.cast_compute_api_method(context, cell_name, + method, instance_uuid, *args, **kwargs) + + def _call_to_cells(self, context, instance, method, *args, **kwargs): + instance_uuid = instance['uuid'] + cell_name = instance['cell_name'] + if not cell_name: + raise exception.InstanceUnknownCell(instance_uuid=instance_uuid) + return self.cells_rpcapi.call_compute_api_method(context, cell_name, + method, instance_uuid, *args, **kwargs) + + def _check_requested_networks(self, context, requested_networks): + """Override compute API's checking of this. 
It'll happen in + child cell + """ + return + + def _validate_image_href(self, context, image_href): + """Override compute API's checking of this. It'll happen in + child cell + """ + return + + def _create_image(self, context, instance, name, image_type, + backup_type=None, rotation=None, extra_properties=None): + if backup_type: + return self._call_to_cells(context, instance, 'backup', + name, backup_type, rotation, + extra_properties=extra_properties) + else: + return self._call_to_cells(context, instance, 'snapshot', + name, extra_properties=extra_properties) + + def create(self, *args, **kwargs): + """We can use the base functionality, but I left this here just + for completeness. + """ + return super(ComputeCellsAPI, self).create(*args, **kwargs) + + @validate_cell + def update(self, context, instance, **kwargs): + """Update an instance.""" + rv = super(ComputeCellsAPI, self).update(context, + instance, **kwargs) + # We need to skip vm_state/task_state updates... those will + # happen via a _cast_to_cells when running a different + # compute api method + kwargs_copy = kwargs.copy() + kwargs_copy.pop('vm_state', None) + kwargs_copy.pop('task_state', None) + if kwargs_copy: + try: + self._cast_to_cells(context, instance, 'update', + **kwargs_copy) + except exception.InstanceUnknownCell: + pass + return rv + + def _local_delete(self, context, instance, bdms): + # This will get called for every delete in the API cell + # because _delete() in compute/api.py will not find a + # service when checking if it's up. + # We need to only take action if there's no cell_name. Our + # overrides of delete() and soft_delete() will take care of + # the rest.
+ cell_name = instance['cell_name'] + if not cell_name: + return super(ComputeCellsAPI, self)._local_delete(context, + instance, bdms) + + def soft_delete(self, context, instance): + self._handle_cell_delete(context, instance, + super(ComputeCellsAPI, self).soft_delete, 'soft_delete') + + def delete(self, context, instance): + self._handle_cell_delete(context, instance, + super(ComputeCellsAPI, self).delete, 'delete') + + def _handle_cell_delete(self, context, instance, method, method_name): + """Terminate an instance.""" + # We can't use the decorator because we have special logic in the + # case we don't know the cell_name... + cell_name = instance['cell_name'] + if cell_name and self._cell_read_only(cell_name): + raise exception.InstanceInvalidState( + attr="vm_state", + instance_uuid=instance['uuid'], + state="temporary_readonly", + method=method_name) + method(context, instance) + try: + self._cast_to_cells(context, instance, method_name) + except exception.InstanceUnknownCell: + # If there's no cell, there's also no host... which means + # the instance was destroyed from the DB here. Let's just + # broadcast a message down to all cells and hope this ends + # up resolving itself... Worse case.. the instance will + # show back up again here. 
+ delete_type = 'soft' if method_name == 'soft_delete' else 'hard' + self.cells_rpcapi.instance_delete_everywhere(context, + instance['uuid'], delete_type) + + @validate_cell + def restore(self, context, instance): + """Restore a previously deleted (but not reclaimed) instance.""" + super(ComputeCellsAPI, self).restore(context, instance) + self._cast_to_cells(context, instance, 'restore') + + @validate_cell + def force_delete(self, context, instance): + """Force delete a previously deleted (but not reclaimed) instance.""" + super(ComputeCellsAPI, self).force_delete(context, instance) + self._cast_to_cells(context, instance, 'force_delete') + + @validate_cell + def stop(self, context, instance, do_cast=True): + """Stop an instance.""" + super(ComputeCellsAPI, self).stop(context, instance) + if do_cast: + self._cast_to_cells(context, instance, 'stop', do_cast=True) + else: + return self._call_to_cells(context, instance, 'stop', + do_cast=False) + + @validate_cell + def start(self, context, instance): + """Start an instance.""" + super(ComputeCellsAPI, self).start(context, instance) + self._cast_to_cells(context, instance, 'start') + + @validate_cell + def reboot(self, context, instance, *args, **kwargs): + """Reboot the given instance.""" + super(ComputeCellsAPI, self).reboot(context, instance, + *args, **kwargs) + self._cast_to_cells(context, instance, 'reboot', *args, + **kwargs) + + @validate_cell + def rebuild(self, context, instance, *args, **kwargs): + """Rebuild the given instance with the provided attributes.""" + super(ComputeCellsAPI, self).rebuild(context, instance, *args, + **kwargs) + self._cast_to_cells(context, instance, 'rebuild', *args, **kwargs) + + @check_instance_state(vm_state=[vm_states.RESIZED]) + @validate_cell + def revert_resize(self, context, instance): + """Reverts a resize, deleting the 'new' instance in the process.""" + # NOTE(markwash): regular api manipulates the migration here, but we + # don't have access to it.
So to preserve the interface just update the + # vm and task state. + self.update(context, instance, + task_state=task_states.RESIZE_REVERTING) + self._cast_to_cells(context, instance, 'revert_resize') + + @check_instance_state(vm_state=[vm_states.RESIZED]) + @validate_cell + def confirm_resize(self, context, instance): + """Confirms a migration/resize and deletes the 'old' instance.""" + # NOTE(markwash): regular api manipulates migration here, but we don't + # have the migration in the api database. So to preserve the interface + # just update the vm and task state without calling super() + self.update(context, instance, task_state=None, + vm_state=vm_states.ACTIVE) + self._cast_to_cells(context, instance, 'confirm_resize') + + @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED], + task_state=[None]) + @validate_cell + def resize(self, context, instance, *args, **kwargs): + """Resize (ie, migrate) a running instance. + + If flavor_id is None, the process is considered a migration, keeping + the original flavor_id. If flavor_id is not None, the instance should + be migrated to a new host and resized to the new flavor_id. 
+ """ + super(ComputeCellsAPI, self).resize(context, instance, *args, + **kwargs) + # FIXME(comstud): pass new instance_type object down to a method + # that'll unfold it + self._cast_to_cells(context, instance, 'resize', *args, **kwargs) + + @validate_cell + def add_fixed_ip(self, context, instance, *args, **kwargs): + """Add fixed_ip from specified network to given instance.""" + super(ComputeCellsAPI, self).add_fixed_ip(context, instance, + *args, **kwargs) + self._cast_to_cells(context, instance, 'add_fixed_ip', + *args, **kwargs) + + @validate_cell + def remove_fixed_ip(self, context, instance, *args, **kwargs): + """Remove fixed_ip from specified network to given instance.""" + super(ComputeCellsAPI, self).remove_fixed_ip(context, instance, + *args, **kwargs) + self._cast_to_cells(context, instance, 'remove_fixed_ip', + *args, **kwargs) + + @validate_cell + def pause(self, context, instance): + """Pause the given instance.""" + super(ComputeCellsAPI, self).pause(context, instance) + self._cast_to_cells(context, instance, 'pause') + + @validate_cell + def unpause(self, context, instance): + """Unpause the given instance.""" + super(ComputeCellsAPI, self).unpause(context, instance) + self._cast_to_cells(context, instance, 'unpause') + + def set_host_enabled(self, context, host, enabled): + """Sets the specified host's ability to accept new instances.""" + # FIXME(comstud): Since there's no instance here, we have no + # idea which cell should be the target. + pass + + def host_power_action(self, context, host, action): + """Reboots, shuts down or powers up the host.""" + # FIXME(comstud): Since there's no instance here, we have no + # idea which cell should be the target. + pass + + def get_diagnostics(self, context, instance): + """Retrieve diagnostics for the given instance.""" + # FIXME(comstud): Cache this? 
+ # Also: only calling super() to get state/policy checking + super(ComputeCellsAPI, self).get_diagnostics(context, instance) + return self._call_to_cells(context, instance, 'get_diagnostics') + + @validate_cell + def suspend(self, context, instance): + """Suspend the given instance.""" + super(ComputeCellsAPI, self).suspend(context, instance) + self._cast_to_cells(context, instance, 'suspend') + + @validate_cell + def resume(self, context, instance): + """Resume the given instance.""" + super(ComputeCellsAPI, self).resume(context, instance) + self._cast_to_cells(context, instance, 'resume') + + @validate_cell + def rescue(self, context, instance, rescue_password=None): + """Rescue the given instance.""" + super(ComputeCellsAPI, self).rescue(context, instance, + rescue_password=rescue_password) + self._cast_to_cells(context, instance, 'rescue', + rescue_password=rescue_password) + + @validate_cell + def unrescue(self, context, instance): + """Unrescue the given instance.""" + super(ComputeCellsAPI, self).unrescue(context, instance) + self._cast_to_cells(context, instance, 'unrescue') + + @validate_cell + def set_admin_password(self, context, instance, password=None): + """Set the root/admin password for the given instance.""" + super(ComputeCellsAPI, self).set_admin_password(context, instance, + password=password) + self._cast_to_cells(context, instance, 'set_admin_password', + password=password) + + @validate_cell + def inject_file(self, context, instance, *args, **kwargs): + """Write a file to the given instance.""" + super(ComputeCellsAPI, self).inject_file(context, instance, *args, + **kwargs) + self._cast_to_cells(context, instance, 'inject_file', *args, **kwargs) + + @wrap_check_policy + @validate_cell + def get_vnc_console(self, context, instance, console_type): + """Get a url to a VNC Console.""" + if not instance['host']: + raise exception.InstanceNotReady(instance_id=instance['uuid']) + + connect_info = self._call_to_cells(context, instance, + 
'get_vnc_connect_info', console_type) + + self.consoleauth_rpcapi.authorize_console(context, + connect_info['token'], console_type, connect_info['host'], + connect_info['port'], connect_info['internal_access_path']) + return {'url': connect_info['access_url']} + + @validate_cell + def get_console_output(self, context, instance, *args, **kwargs): + """Get console output for an instance.""" + # NOTE(comstud): Calling super() just to get policy check + super(ComputeCellsAPI, self).get_console_output(context, instance, + *args, **kwargs) + return self._call_to_cells(context, instance, 'get_console_output', + *args, **kwargs) + + def lock(self, context, instance): + """Lock the given instance.""" + super(ComputeCellsAPI, self).lock(context, instance) + self._cast_to_cells(context, instance, 'lock') + + def unlock(self, context, instance): + """Unlock the given instance.""" + super(ComputeCellsAPI, self).unlock(context, instance) + self._cast_to_cells(context, instance, 'unlock') + + @validate_cell + def reset_network(self, context, instance): + """Reset networking on the instance.""" + super(ComputeCellsAPI, self).reset_network(context, instance) + self._cast_to_cells(context, instance, 'reset_network') + + @validate_cell + def inject_network_info(self, context, instance): + """Inject network info for the instance.""" + super(ComputeCellsAPI, self).inject_network_info(context, instance) + self._cast_to_cells(context, instance, 'inject_network_info') + + @wrap_check_policy + @validate_cell + def attach_volume(self, context, instance, volume_id, device=None): + """Attach an existing volume to an existing instance.""" + if device and not block_device.match_device(device): + raise exception.InvalidDevicePath(path=device) + device = self.compute_rpcapi.reserve_block_device_name( + context, device=device, instance=instance, volume_id=volume_id) + try: + volume = self.volume_api.get(context, volume_id) + self.volume_api.check_attach(context, volume) + except Exception: + with
excutils.save_and_reraise_exception(): + self.db.block_device_mapping_destroy_by_instance_and_device( + context, instance['uuid'], device) + self._cast_to_cells(context, instance, 'attach_volume', + volume_id, device) + + @check_instance_lock + @validate_cell + def _detach_volume(self, context, instance, volume_id): + """Detach a volume from an instance.""" + check_policy(context, 'detach_volume', instance) + + volume = self.volume_api.get(context, volume_id) + self.volume_api.check_detach(context, volume) + self._cast_to_cells(context, instance, 'detach_volume', + volume_id) + + @wrap_check_policy + @validate_cell + def associate_floating_ip(self, context, instance, address): + """Makes calls to network_api to associate_floating_ip. + + :param address: is a string floating ip address + """ + self._cast_to_cells(context, instance, 'associate_floating_ip', + address) + + @validate_cell + def delete_instance_metadata(self, context, instance, key): + """Delete the given metadata item from an instance.""" + super(ComputeCellsAPI, self).delete_instance_metadata(context, + instance, key) + self._cast_to_cells(context, instance, 'delete_instance_metadata', + key) + + @wrap_check_policy + @validate_cell + def update_instance_metadata(self, context, instance, + metadata, delete=False): + rv = super(ComputeCellsAPI, self).update_instance_metadata(context, + instance, metadata, delete=delete) + try: + self._cast_to_cells(context, instance, + 'update_instance_metadata', + metadata, delete=delete) + except exception.InstanceUnknownCell: + pass + return rv diff --git a/nova/db/api.py b/nova/db/api.py index 3e350fc75..1322c29e9 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -43,8 +43,10 @@ these objects be simple dictionaries. 
""" +from nova.cells import rpcapi as cells_rpcapi from nova import exception from nova.openstack.common import cfg +from nova.openstack.common import log as logging from nova import utils @@ -68,6 +70,7 @@ CONF.register_opts(db_opts) IMPL = utils.LazyPluggable('db_backend', sqlalchemy='nova.db.sqlalchemy.api') +LOG = logging.getLogger(__name__) class NoMoreNetworks(exception.NovaException): @@ -566,9 +569,16 @@ def instance_data_get_for_project(context, project_id, session=None): session=session) -def instance_destroy(context, instance_uuid, constraint=None): +def instance_destroy(context, instance_uuid, constraint=None, + update_cells=True): """Destroy the instance or raise if it does not exist.""" - return IMPL.instance_destroy(context, instance_uuid, constraint) + rv = IMPL.instance_destroy(context, instance_uuid, constraint) + if update_cells: + try: + cells_rpcapi.CellsAPI().instance_destroy_at_top(context, rv) + except Exception: + LOG.exception(_("Failed to notify cells of instance destroy")) + return rv def instance_get_by_uuid(context, uuid): @@ -665,13 +675,19 @@ def instance_test_and_set(context, instance_uuid, attr, ok_states, ok_states, new_state) -def instance_update(context, instance_uuid, values): +def instance_update(context, instance_uuid, values, update_cells=True): """Set the given properties on an instance and update it. Raises NotFound if instance does not exist. """ - return IMPL.instance_update(context, instance_uuid, values) + rv = IMPL.instance_update(context, instance_uuid, values) + if update_cells: + try: + cells_rpcapi.CellsAPI().instance_update_at_top(context, rv) + except Exception: + LOG.exception(_("Failed to notify cells of instance update")) + return rv def instance_update_and_get_original(context, instance_uuid, values): @@ -687,8 +703,12 @@ def instance_update_and_get_original(context, instance_uuid, values): Raises NotFound if instance does not exist. 
""" - return IMPL.instance_update_and_get_original(context, instance_uuid, - values) + rv = IMPL.instance_update_and_get_original(context, instance_uuid, values) + try: + cells_rpcapi.CellsAPI().instance_update_at_top(context, rv[1]) + except Exception: + LOG.exception(_("Failed to notify cells of instance update")) + return rv def instance_add_security_group(context, instance_id, security_group_id): @@ -714,13 +734,21 @@ def instance_info_cache_get(context, instance_uuid): return IMPL.instance_info_cache_get(context, instance_uuid) -def instance_info_cache_update(context, instance_uuid, values): +def instance_info_cache_update(context, instance_uuid, values, + update_cells=True): """Update an instance info cache record in the table. :param instance_uuid: = uuid of info cache's instance :param values: = dict containing column values to update """ - return IMPL.instance_info_cache_update(context, instance_uuid, values) + rv = IMPL.instance_info_cache_update(context, instance_uuid, values) + try: + cells_rpcapi.CellsAPI().instance_info_cache_update_at_top(context, + rv) + except Exception: + LOG.exception(_("Failed to notify cells of instance info cache " + "update")) + return rv def instance_info_cache_delete(context, instance_uuid): @@ -1354,7 +1382,7 @@ def instance_metadata_delete(context, instance_uuid, key): def instance_metadata_update(context, instance_uuid, metadata, delete): """Update metadata if it exists, otherwise create it.""" return IMPL.instance_metadata_update(context, instance_uuid, - metadata, delete) + metadata, delete) #################### @@ -1414,12 +1442,21 @@ def bw_usage_get_by_uuids(context, uuids, start_period): def bw_usage_update(context, uuid, mac, start_period, bw_in, bw_out, - last_ctr_in, last_ctr_out, last_refreshed=None): + last_ctr_in, last_ctr_out, last_refreshed=None, + update_cells=True): """Update cached bandwidth usage for an instance's network based on mac address. Creates new record if needed. 
""" - return IMPL.bw_usage_update(context, uuid, mac, start_period, bw_in, + rv = IMPL.bw_usage_update(context, uuid, mac, start_period, bw_in, bw_out, last_ctr_in, last_ctr_out, last_refreshed=last_refreshed) + if update_cells: + try: + cells_rpcapi.CellsAPI().bw_usage_update_at_top(context, + uuid, mac, start_period, bw_in, bw_out, + last_ctr_in, last_ctr_out, last_refreshed) + except Exception: + LOG.exception(_("Failed to notify cells of bw_usage update")) + return rv #################### @@ -1555,9 +1592,15 @@ def aggregate_host_delete(context, aggregate_id, host): #################### -def instance_fault_create(context, values): +def instance_fault_create(context, values, update_cells=True): """Create a new Instance Fault.""" - return IMPL.instance_fault_create(context, values) + rv = IMPL.instance_fault_create(context, values) + if update_cells: + try: + cells_rpcapi.CellsAPI().instance_fault_create_at_top(context, rv) + except Exception: + LOG.exception(_("Failed to notify cells of instance fault")) + return rv def instance_fault_get_by_instance_uuids(context, instance_uuids): diff --git a/nova/exception.py b/nova/exception.py index ee0a88a95..c484b5120 100644 --- a/nova/exception.py +++ b/nova/exception.py @@ -769,6 +769,34 @@ class CellNotFound(NotFound): message = _("Cell %(cell_id)s could not be found.") +class CellRoutingInconsistency(NovaException): + message = _("Inconsistency in cell routing: %(reason)s") + + +class CellServiceAPIMethodNotFound(NotFound): + message = _("Service API method not found: %(detail)s") + + +class CellTimeout(NotFound): + message = _("Timeout waiting for response from cell") + + +class CellMaxHopCountReached(NovaException): + message = _("Cell message has reached maximum hop count: %(hop_count)s") + + +class NoCellsAvailable(NovaException): + message = _("No cells available matching scheduling criteria.") + + +class CellError(NovaException): + message = _("Exception received during cell processing: %(exc_name)s.") + + 
+class InstanceUnknownCell(NotFound): + message = _("Cell is not known for instance %(instance_uuid)s") + + class SchedulerHostFilterNotFound(NotFound): message = _("Scheduler Host Filter %(filter_name)s could not be found.") diff --git a/nova/tests/cells/__init__.py b/nova/tests/cells/__init__.py new file mode 100644 index 000000000..d1bf725f7 --- /dev/null +++ b/nova/tests/cells/__init__.py @@ -0,0 +1,19 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2012 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work +from nova.tests import * diff --git a/nova/tests/cells/fakes.py b/nova/tests/cells/fakes.py new file mode 100644 index 000000000..a9de530d1 --- /dev/null +++ b/nova/tests/cells/fakes.py @@ -0,0 +1,191 @@ +# Copyright (c) 2012 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+""" +Fakes For Cells tests. +""" + +from nova.cells import driver +from nova.cells import manager as cells_manager +from nova.cells import messaging +from nova.cells import state as cells_state +import nova.db +from nova.db import base +from nova.openstack.common import cfg + +CONF = cfg.CONF +CONF.import_opt('name', 'nova.cells.opts', group='cells') + + +# Fake Cell Hierarchy +FAKE_TOP_LEVEL_CELL_NAME = 'api-cell' +FAKE_CELL_LAYOUT = [{'child-cell1': []}, + {'child-cell2': [{'grandchild-cell1': []}]}, + {'child-cell3': [{'grandchild-cell2': []}, + {'grandchild-cell3': []}]}, + {'child-cell4': []}] + +# build_cell_stub_infos() below will take the above layout and create +# a fake view of the DB from the perspective of each of the cells. +# For each cell, a CellStubInfo will be created with this info. +CELL_NAME_TO_STUB_INFO = {} + + +class FakeDBApi(object): + def __init__(self, cell_db_entries): + self.cell_db_entries = cell_db_entries + + def __getattr__(self, key): + return getattr(nova.db, key) + + def cell_get_all(self, ctxt): + return self.cell_db_entries + + def compute_node_get_all(self, ctxt): + return [] + + +class FakeCellsDriver(driver.BaseCellsDriver): + pass + + +class FakeCellState(cells_state.CellState): + def send_message(self, message): + message_runner = get_message_runner(self.name) + orig_ctxt = message.ctxt + json_message = message.to_json() + message = message_runner.message_from_json(json_message) + # Restore this so we can use mox and verify same context + message.ctxt = orig_ctxt + message.process() + + +class FakeCellStateManager(cells_state.CellStateManager): + def __init__(self, *args, **kwargs): + super(FakeCellStateManager, self).__init__(*args, + cell_state_cls=FakeCellState, **kwargs) + + +class FakeCellsManager(cells_manager.CellsManager): + def __init__(self, *args, **kwargs): + super(FakeCellsManager, self).__init__(*args, + cell_state_manager=FakeCellStateManager, + **kwargs) + + +class CellStubInfo(object): + def 
__init__(self, test_case, cell_name, db_entries): + self.test_case = test_case + self.cell_name = cell_name + self.db_entries = db_entries + + def fake_base_init(_self, *args, **kwargs): + _self.db = FakeDBApi(db_entries) + + test_case.stubs.Set(base.Base, '__init__', fake_base_init) + self.cells_manager = FakeCellsManager() + # Fix the cell name, as it normally uses CONF.cells.name + msg_runner = self.cells_manager.msg_runner + msg_runner.our_name = self.cell_name + self.cells_manager.state_manager.my_cell_state.name = self.cell_name + + +def _build_cell_stub_info(test_case, our_name, parent_path, children): + cell_db_entries = [] + cur_db_id = 1 + sep_char = messaging._PATH_CELL_SEP + if parent_path: + cell_db_entries.append( + dict(id=cur_db_id, + name=parent_path.split(sep_char)[-1], + is_parent=True, + username='username%s' % cur_db_id, + password='password%s' % cur_db_id, + rpc_host='rpc_host%s' % cur_db_id, + rpc_port='rpc_port%s' % cur_db_id, + rpc_virtual_host='rpc_vhost%s' % cur_db_id)) + cur_db_id += 1 + our_path = parent_path + sep_char + our_name + else: + our_path = our_name + for child in children: + for child_name, grandchildren in child.items(): + _build_cell_stub_info(test_case, child_name, our_path, + grandchildren) + cell_entry = dict(id=cur_db_id, + name=child_name, + username='username%s' % cur_db_id, + password='password%s' % cur_db_id, + rpc_host='rpc_host%s' % cur_db_id, + rpc_port='rpc_port%s' % cur_db_id, + rpc_virtual_host='rpc_vhost%s' % cur_db_id, + is_parent=False) + cell_db_entries.append(cell_entry) + cur_db_id += 1 + stub_info = CellStubInfo(test_case, our_name, cell_db_entries) + CELL_NAME_TO_STUB_INFO[our_name] = stub_info + + +def _build_cell_stub_infos(test_case): + _build_cell_stub_info(test_case, FAKE_TOP_LEVEL_CELL_NAME, '', + FAKE_CELL_LAYOUT) + + +def init(test_case): + global CELL_NAME_TO_STUB_INFO + test_case.flags(driver='nova.tests.cells.fakes.FakeCellsDriver', + group='cells') + CELL_NAME_TO_STUB_INFO = {} + 
_build_cell_stub_infos(test_case) + + +def _get_cell_stub_info(cell_name): + return CELL_NAME_TO_STUB_INFO[cell_name] + + +def get_state_manager(cell_name): + return _get_cell_stub_info(cell_name).cells_manager.state_manager + + +def get_cell_state(cur_cell_name, tgt_cell_name): + state_manager = get_state_manager(cur_cell_name) + cell = state_manager.child_cells.get(tgt_cell_name) + if cell is None: + cell = state_manager.parent_cells.get(tgt_cell_name) + return cell + + +def get_cells_manager(cell_name): + return _get_cell_stub_info(cell_name).cells_manager + + +def get_message_runner(cell_name): + return _get_cell_stub_info(cell_name).cells_manager.msg_runner + + +def stub_tgt_method(test_case, cell_name, method_name, method): + msg_runner = get_message_runner(cell_name) + tgt_msg_methods = msg_runner.methods_by_type['targeted'] + setattr(tgt_msg_methods, method_name, method) + + +def stub_bcast_method(test_case, cell_name, method_name, method): + msg_runner = get_message_runner(cell_name) + tgt_msg_methods = msg_runner.methods_by_type['broadcast'] + setattr(tgt_msg_methods, method_name, method) + + +def stub_bcast_methods(test_case, method_name, method): + for cell_name in CELL_NAME_TO_STUB_INFO.keys(): + stub_bcast_method(test_case, cell_name, method_name, method) diff --git a/nova/tests/cells/test_cells_manager.py b/nova/tests/cells/test_cells_manager.py new file mode 100644 index 000000000..5a2b83145 --- /dev/null +++ b/nova/tests/cells/test_cells_manager.py @@ -0,0 +1,151 @@ +# Copyright (c) 2012 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Tests For CellsManager +""" +from nova.cells import messaging +from nova import context +from nova import test +from nova.tests.cells import fakes + + +class CellsManagerClassTestCase(test.TestCase): + """Test case for CellsManager class""" + + def setUp(self): + super(CellsManagerClassTestCase, self).setUp() + fakes.init(self) + # pick a child cell to use for tests. + self.our_cell = 'grandchild-cell1' + self.cells_manager = fakes.get_cells_manager(self.our_cell) + self.msg_runner = self.cells_manager.msg_runner + self.driver = self.cells_manager.driver + self.ctxt = 'fake_context' + + def test_post_start_hook_child_cell(self): + self.mox.StubOutWithMock(self.driver, 'start_consumers') + self.mox.StubOutWithMock(context, 'get_admin_context') + self.mox.StubOutWithMock(self.cells_manager, '_update_our_parents') + + self.driver.start_consumers(self.msg_runner) + context.get_admin_context().AndReturn(self.ctxt) + self.cells_manager._update_our_parents(self.ctxt) + self.mox.ReplayAll() + self.cells_manager.post_start_hook() + + def test_post_start_hook_middle_cell(self): + cells_manager = fakes.get_cells_manager('child-cell2') + msg_runner = cells_manager.msg_runner + driver = cells_manager.driver + + self.mox.StubOutWithMock(driver, 'start_consumers') + self.mox.StubOutWithMock(context, 'get_admin_context') + self.mox.StubOutWithMock(msg_runner, + 'ask_children_for_capabilities') + self.mox.StubOutWithMock(msg_runner, + 'ask_children_for_capacities') + + driver.start_consumers(msg_runner) + context.get_admin_context().AndReturn(self.ctxt) + 
msg_runner.ask_children_for_capabilities(self.ctxt) + msg_runner.ask_children_for_capacities(self.ctxt) + self.mox.ReplayAll() + cells_manager.post_start_hook() + + def test_update_our_parents(self): + self.mox.StubOutWithMock(self.msg_runner, + 'tell_parents_our_capabilities') + self.mox.StubOutWithMock(self.msg_runner, + 'tell_parents_our_capacities') + + self.msg_runner.tell_parents_our_capabilities(self.ctxt) + self.msg_runner.tell_parents_our_capacities(self.ctxt) + self.mox.ReplayAll() + self.cells_manager._update_our_parents(self.ctxt) + + def test_schedule_run_instance(self): + host_sched_kwargs = 'fake_host_sched_kwargs_silently_passed' + self.mox.StubOutWithMock(self.msg_runner, 'schedule_run_instance') + our_cell = self.msg_runner.state_manager.get_my_state() + self.msg_runner.schedule_run_instance(self.ctxt, our_cell, + host_sched_kwargs) + self.mox.ReplayAll() + self.cells_manager.schedule_run_instance(self.ctxt, + host_sched_kwargs=host_sched_kwargs) + + def test_run_compute_api_method(self): + # Args should just be silently passed through + cell_name = 'fake-cell-name' + method_info = 'fake-method-info' + + fake_response = messaging.Response('fake', 'fake', False) + + self.mox.StubOutWithMock(self.msg_runner, + 'run_compute_api_method') + self.mox.StubOutWithMock(fake_response, + 'value_or_raise') + self.msg_runner.run_compute_api_method(self.ctxt, + cell_name, + method_info, + True).AndReturn(fake_response) + fake_response.value_or_raise().AndReturn('fake-response') + self.mox.ReplayAll() + response = self.cells_manager.run_compute_api_method( + self.ctxt, cell_name=cell_name, method_info=method_info, + call=True) + self.assertEqual('fake-response', response) + + def test_instance_update_at_top(self): + self.mox.StubOutWithMock(self.msg_runner, 'instance_update_at_top') + self.msg_runner.instance_update_at_top(self.ctxt, 'fake-instance') + self.mox.ReplayAll() + self.cells_manager.instance_update_at_top(self.ctxt, + instance='fake-instance') + + def 
test_instance_destroy_at_top(self): + self.mox.StubOutWithMock(self.msg_runner, 'instance_destroy_at_top') + self.msg_runner.instance_destroy_at_top(self.ctxt, 'fake-instance') + self.mox.ReplayAll() + self.cells_manager.instance_destroy_at_top(self.ctxt, + instance='fake-instance') + + def test_instance_delete_everywhere(self): + self.mox.StubOutWithMock(self.msg_runner, + 'instance_delete_everywhere') + self.msg_runner.instance_delete_everywhere(self.ctxt, + 'fake-instance', + 'fake-type') + self.mox.ReplayAll() + self.cells_manager.instance_delete_everywhere( + self.ctxt, instance='fake-instance', + delete_type='fake-type') + + def test_instance_fault_create_at_top(self): + self.mox.StubOutWithMock(self.msg_runner, + 'instance_fault_create_at_top') + self.msg_runner.instance_fault_create_at_top(self.ctxt, + 'fake-fault') + self.mox.ReplayAll() + self.cells_manager.instance_fault_create_at_top( + self.ctxt, instance_fault='fake-fault') + + def test_bw_usage_update_at_top(self): + self.mox.StubOutWithMock(self.msg_runner, + 'bw_usage_update_at_top') + self.msg_runner.bw_usage_update_at_top(self.ctxt, + 'fake-bw-info') + self.mox.ReplayAll() + self.cells_manager.bw_usage_update_at_top( + self.ctxt, bw_update_info='fake-bw-info') diff --git a/nova/tests/cells/test_cells_messaging.py b/nova/tests/cells/test_cells_messaging.py new file mode 100644 index 000000000..d728c9474 --- /dev/null +++ b/nova/tests/cells/test_cells_messaging.py @@ -0,0 +1,913 @@ +# Copyright (c) 2012 Rackspace Hosting # All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Tests For Cells Messaging module +""" + +from nova.cells import messaging +from nova import context +from nova import exception +from nova.openstack.common import cfg +from nova import test +from nova.tests.cells import fakes + + +CONF = cfg.CONF +CONF.import_opt('host', 'nova.config') +CONF.import_opt('name', 'nova.cells.opts', group='cells') +CONF.import_opt('allowed_rpc_exception_modules', + 'nova.openstack.common.rpc') + + +class CellsMessageClassesTestCase(test.TestCase): + """Test case for the main Cells Message classes.""" + def setUp(self): + super(CellsMessageClassesTestCase, self).setUp() + fakes.init(self) + self.ctxt = context.RequestContext('fake', 'fake') + # Need to be able to deserialize test.TestingException. + allowed_modules = CONF.allowed_rpc_exception_modules + allowed_modules.append('nova.test') + self.flags(allowed_rpc_exception_modules=allowed_modules) + self.our_name = 'api-cell' + self.msg_runner = fakes.get_message_runner(self.our_name) + self.state_manager = self.msg_runner.state_manager + + def test_reverse_path(self): + path = 'a!b!c!d' + expected = 'd!c!b!a' + rev_path = messaging._reverse_path(path) + self.assertEqual(rev_path, expected) + + def test_response_cell_name_from_path(self): + # test array with tuples of inputs/expected outputs + test_paths = [('cell1', 'cell1'), + ('cell1!cell2', 'cell2!cell1'), + ('cell1!cell2!cell3', 'cell3!cell2!cell1')] + + for test_input, expected_output in test_paths: + self.assertEqual(expected_output, + messaging._response_cell_name_from_path(test_input)) + + def test_response_cell_name_from_path_neighbor_only(self): + # test array with tuples of inputs/expected outputs + test_paths = [('cell1', 'cell1'), + ('cell1!cell2', 'cell2!cell1'), + ('cell1!cell2!cell3', 'cell3!cell2')] + + for test_input, expected_output in test_paths: + self.assertEqual(expected_output, + 
messaging._response_cell_name_from_path(test_input, + neighbor_only=True)) + + def test_targeted_message(self): + self.flags(max_hop_count=99, group='cells') + target_cell = 'api-cell!child-cell2!grandchild-cell1' + method = 'fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + tgt_message = messaging._TargetedMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, direction, + target_cell) + self.assertEqual(self.ctxt, tgt_message.ctxt) + self.assertEqual(method, tgt_message.method_name) + self.assertEqual(method_kwargs, tgt_message.method_kwargs) + self.assertEqual(direction, tgt_message.direction) + self.assertEqual(target_cell, target_cell) + self.assertFalse(tgt_message.fanout) + self.assertFalse(tgt_message.need_response) + self.assertEqual(self.our_name, tgt_message.routing_path) + self.assertEqual(1, tgt_message.hop_count) + self.assertEqual(99, tgt_message.max_hop_count) + self.assertFalse(tgt_message.is_broadcast) + # Correct next hop? + next_hop = tgt_message._get_next_hop() + child_cell = self.state_manager.get_child_cell('child-cell2') + self.assertEqual(child_cell, next_hop) + + def test_create_targeted_message_with_response(self): + self.flags(max_hop_count=99, group='cells') + our_name = 'child-cell1' + target_cell = 'child-cell1!api-cell' + msg_runner = fakes.get_message_runner(our_name) + method = 'fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'up' + tgt_message = messaging._TargetedMessage(msg_runner, + self.ctxt, method, + method_kwargs, direction, + target_cell, + need_response=True) + self.assertEqual(self.ctxt, tgt_message.ctxt) + self.assertEqual(method, tgt_message.method_name) + self.assertEqual(method_kwargs, tgt_message.method_kwargs) + self.assertEqual(direction, tgt_message.direction) + self.assertEqual(target_cell, target_cell) + self.assertFalse(tgt_message.fanout) + self.assertTrue(tgt_message.need_response) + self.assertEqual(our_name, tgt_message.routing_path) + self.assertEqual(1, 
tgt_message.hop_count) + self.assertEqual(99, tgt_message.max_hop_count) + self.assertFalse(tgt_message.is_broadcast) + # Correct next hop? + next_hop = tgt_message._get_next_hop() + parent_cell = msg_runner.state_manager.get_parent_cell('api-cell') + self.assertEqual(parent_cell, next_hop) + + def test_targeted_message_when_target_is_cell_state(self): + method = 'fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + target_cell = self.state_manager.get_child_cell('child-cell2') + tgt_message = messaging._TargetedMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, direction, + target_cell) + self.assertEqual('api-cell!child-cell2', tgt_message.target_cell) + # Correct next hop? + next_hop = tgt_message._get_next_hop() + self.assertEqual(target_cell, next_hop) + + def test_targeted_message_when_target_cell_state_is_me(self): + method = 'fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + target_cell = self.state_manager.get_my_state() + tgt_message = messaging._TargetedMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, direction, + target_cell) + self.assertEqual('api-cell', tgt_message.target_cell) + # Correct next hop? 
+ next_hop = tgt_message._get_next_hop() + self.assertEqual(target_cell, next_hop) + + def test_create_broadcast_message(self): + self.flags(max_hop_count=99, group='cells') + self.flags(name='api-cell', max_hop_count=99, group='cells') + method = 'fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + bcast_message = messaging._BroadcastMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, direction) + self.assertEqual(self.ctxt, bcast_message.ctxt) + self.assertEqual(method, bcast_message.method_name) + self.assertEqual(method_kwargs, bcast_message.method_kwargs) + self.assertEqual(direction, bcast_message.direction) + self.assertFalse(bcast_message.fanout) + self.assertFalse(bcast_message.need_response) + self.assertEqual(self.our_name, bcast_message.routing_path) + self.assertEqual(1, bcast_message.hop_count) + self.assertEqual(99, bcast_message.max_hop_count) + self.assertTrue(bcast_message.is_broadcast) + # Correct next hops? + next_hops = bcast_message._get_next_hops() + child_cells = self.state_manager.get_child_cells() + self.assertEqual(child_cells, next_hops) + + def test_create_broadcast_message_with_response(self): + self.flags(max_hop_count=99, group='cells') + our_name = 'child-cell1' + msg_runner = fakes.get_message_runner(our_name) + method = 'fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'up' + bcast_message = messaging._BroadcastMessage(msg_runner, self.ctxt, + method, method_kwargs, direction, need_response=True) + self.assertEqual(self.ctxt, bcast_message.ctxt) + self.assertEqual(method, bcast_message.method_name) + self.assertEqual(method_kwargs, bcast_message.method_kwargs) + self.assertEqual(direction, bcast_message.direction) + self.assertFalse(bcast_message.fanout) + self.assertTrue(bcast_message.need_response) + self.assertEqual(our_name, bcast_message.routing_path) + self.assertEqual(1, bcast_message.hop_count) + self.assertEqual(99, bcast_message.max_hop_count) + 
self.assertTrue(bcast_message.is_broadcast) + # Correct next hops? + next_hops = bcast_message._get_next_hops() + parent_cells = msg_runner.state_manager.get_parent_cells() + self.assertEqual(parent_cells, next_hops) + + def test_self_targeted_message(self): + target_cell = 'api-cell' + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + + call_info = {} + + def our_fake_method(message, **kwargs): + call_info['context'] = message.ctxt + call_info['routing_path'] = message.routing_path + call_info['kwargs'] = kwargs + + fakes.stub_tgt_method(self, 'api-cell', 'our_fake_method', + our_fake_method) + + tgt_message = messaging._TargetedMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, direction, + target_cell) + tgt_message.process() + + self.assertEqual(self.ctxt, call_info['context']) + self.assertEqual(method_kwargs, call_info['kwargs']) + self.assertEqual(target_cell, call_info['routing_path']) + + def test_child_targeted_message(self): + target_cell = 'api-cell!child-cell1' + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + + call_info = {} + + def our_fake_method(message, **kwargs): + call_info['context'] = message.ctxt + call_info['routing_path'] = message.routing_path + call_info['kwargs'] = kwargs + + fakes.stub_tgt_method(self, 'child-cell1', 'our_fake_method', + our_fake_method) + + tgt_message = messaging._TargetedMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, direction, + target_cell) + tgt_message.process() + + self.assertEqual(self.ctxt, call_info['context']) + self.assertEqual(method_kwargs, call_info['kwargs']) + self.assertEqual(target_cell, call_info['routing_path']) + + def test_grandchild_targeted_message(self): + target_cell = 'api-cell!child-cell2!grandchild-cell1' + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + + call_info = {} + + def our_fake_method(message, **kwargs): + call_info['context'] = 
message.ctxt + call_info['routing_path'] = message.routing_path + call_info['kwargs'] = kwargs + + fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method', + our_fake_method) + + tgt_message = messaging._TargetedMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, direction, + target_cell) + tgt_message.process() + + self.assertEqual(self.ctxt, call_info['context']) + self.assertEqual(method_kwargs, call_info['kwargs']) + self.assertEqual(target_cell, call_info['routing_path']) + + def test_grandchild_targeted_message_with_response(self): + target_cell = 'api-cell!child-cell2!grandchild-cell1' + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + + call_info = {} + + def our_fake_method(message, **kwargs): + call_info['context'] = message.ctxt + call_info['routing_path'] = message.routing_path + call_info['kwargs'] = kwargs + return 'our_fake_response' + + fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method', + our_fake_method) + + tgt_message = messaging._TargetedMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, direction, + target_cell, + need_response=True) + response = tgt_message.process() + + self.assertEqual(self.ctxt, call_info['context']) + self.assertEqual(method_kwargs, call_info['kwargs']) + self.assertEqual(target_cell, call_info['routing_path']) + self.assertFalse(response.failure) + self.assertTrue(response.value_or_raise(), 'our_fake_response') + + def test_grandchild_targeted_message_with_error(self): + target_cell = 'api-cell!child-cell2!grandchild-cell1' + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + + def our_fake_method(message, **kwargs): + raise test.TestingException('this should be returned') + + fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method', + our_fake_method) + + tgt_message = messaging._TargetedMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, direction, + target_cell, + 
need_response=True) + response = tgt_message.process() + self.assertTrue(response.failure) + self.assertRaises(test.TestingException, response.value_or_raise) + + def test_grandchild_targeted_message_max_hops(self): + self.flags(max_hop_count=2, group='cells') + target_cell = 'api-cell!child-cell2!grandchild-cell1' + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + + def our_fake_method(message, **kwargs): + raise test.TestingException('should not be reached') + + fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method', + our_fake_method) + + tgt_message = messaging._TargetedMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, direction, + target_cell, + need_response=True) + response = tgt_message.process() + self.assertTrue(response.failure) + self.assertRaises(exception.CellMaxHopCountReached, + response.value_or_raise) + + def test_targeted_message_invalid_cell(self): + target_cell = 'api-cell!child-cell2!grandchild-cell4' + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + + tgt_message = messaging._TargetedMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, direction, + target_cell, + need_response=True) + response = tgt_message.process() + self.assertTrue(response.failure) + self.assertRaises(exception.CellRoutingInconsistency, + response.value_or_raise) + + def test_targeted_message_invalid_cell2(self): + target_cell = 'unknown-cell!child-cell2' + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + + tgt_message = messaging._TargetedMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, direction, + target_cell, + need_response=True) + response = tgt_message.process() + self.assertTrue(response.failure) + self.assertRaises(exception.CellRoutingInconsistency, + response.value_or_raise) + + def test_broadcast_routing(self): + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 
'down' + + cells = set() + + def our_fake_method(message, **kwargs): + cells.add(message.routing_path) + + fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method) + + bcast_message = messaging._BroadcastMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, + direction, + run_locally=True) + bcast_message.process() + # fakes creates 8 cells (including ourself). + self.assertEqual(len(cells), 8) + + def test_broadcast_routing_up(self): + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'up' + msg_runner = fakes.get_message_runner('grandchild-cell3') + + cells = set() + + def our_fake_method(message, **kwargs): + cells.add(message.routing_path) + + fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method) + + bcast_message = messaging._BroadcastMessage(msg_runner, self.ctxt, + method, method_kwargs, + direction, + run_locally=True) + bcast_message.process() + # Paths are reversed, since going 'up' + expected = set(['grandchild-cell3', 'grandchild-cell3!child-cell3', + 'grandchild-cell3!child-cell3!api-cell']) + self.assertEqual(expected, cells) + + def test_broadcast_routing_without_ourselves(self): + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + + cells = set() + + def our_fake_method(message, **kwargs): + cells.add(message.routing_path) + + fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method) + + bcast_message = messaging._BroadcastMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, + direction, + run_locally=False) + bcast_message.process() + # fakes creates 8 cells (including ourself). So we should see + # only 7 here. 
+ self.assertEqual(len(cells), 7) + + def test_broadcast_routing_with_response(self): + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + + def our_fake_method(message, **kwargs): + return 'response-%s' % message.routing_path + + fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method) + + bcast_message = messaging._BroadcastMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, + direction, + run_locally=True, + need_response=True) + responses = bcast_message.process() + self.assertEqual(len(responses), 8) + for response in responses: + self.assertFalse(response.failure) + self.assertEqual('response-%s' % response.cell_name, + response.value_or_raise()) + + def test_broadcast_routing_with_response_max_hops(self): + self.flags(max_hop_count=2, group='cells') + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + + def our_fake_method(message, **kwargs): + return 'response-%s' % message.routing_path + + fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method) + + bcast_message = messaging._BroadcastMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, + direction, + run_locally=True, + need_response=True) + responses = bcast_message.process() + # Should only get responses from our immediate children (and + # ourselves) + self.assertEqual(len(responses), 5) + for response in responses: + self.assertFalse(response.failure) + self.assertEqual('response-%s' % response.cell_name, + response.value_or_raise()) + + def test_broadcast_routing_with_all_erroring(self): + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + + def our_fake_method(message, **kwargs): + raise test.TestingException('fake failure') + + fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method) + + bcast_message = messaging._BroadcastMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, + direction, + run_locally=True, + 
need_response=True) + responses = bcast_message.process() + self.assertEqual(len(responses), 8) + for response in responses: + self.assertTrue(response.failure) + self.assertRaises(test.TestingException, response.value_or_raise) + + def test_broadcast_routing_with_two_erroring(self): + method = 'our_fake_method' + method_kwargs = dict(arg1=1, arg2=2) + direction = 'down' + + def our_fake_method_failing(message, **kwargs): + raise test.TestingException('fake failure') + + def our_fake_method(message, **kwargs): + return 'response-%s' % message.routing_path + + fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method) + fakes.stub_bcast_method(self, 'child-cell2', 'our_fake_method', + our_fake_method_failing) + fakes.stub_bcast_method(self, 'grandchild-cell3', 'our_fake_method', + our_fake_method_failing) + + bcast_message = messaging._BroadcastMessage(self.msg_runner, + self.ctxt, method, + method_kwargs, + direction, + run_locally=True, + need_response=True) + responses = bcast_message.process() + self.assertEqual(len(responses), 8) + failure_responses = [resp for resp in responses if resp.failure] + success_responses = [resp for resp in responses if not resp.failure] + self.assertEqual(len(failure_responses), 2) + self.assertEqual(len(success_responses), 6) + + for response in success_responses: + self.assertFalse(response.failure) + self.assertEqual('response-%s' % response.cell_name, + response.value_or_raise()) + + for response in failure_responses: + self.assertIn(response.cell_name, ['api-cell!child-cell2', + 'api-cell!child-cell3!grandchild-cell3']) + self.assertTrue(response.failure) + self.assertRaises(test.TestingException, response.value_or_raise) + + +class CellsTargetedMethodsTestCase(test.TestCase): + """Test case for _TargetedMessageMethods class. Most of these + tests actually test the full path from the MessageRunner through + to the functionality of the message method. 
Hits 2 birds with 1 + stone, even though it's a little more than a unit test. + """ + def setUp(self): + super(CellsTargetedMethodsTestCase, self).setUp() + fakes.init(self) + self.ctxt = context.RequestContext('fake', 'fake') + self._setup_attrs('api-cell', 'api-cell!child-cell2') + + def _setup_attrs(self, source_cell, target_cell): + self.tgt_cell_name = target_cell + self.src_msg_runner = fakes.get_message_runner(source_cell) + self.src_state_manager = self.src_msg_runner.state_manager + tgt_shortname = target_cell.split('!')[-1] + self.tgt_cell_mgr = fakes.get_cells_manager(tgt_shortname) + self.tgt_msg_runner = self.tgt_cell_mgr.msg_runner + self.tgt_scheduler = self.tgt_msg_runner.scheduler + self.tgt_state_manager = self.tgt_msg_runner.state_manager + methods_cls = self.tgt_msg_runner.methods_by_type['targeted'] + self.tgt_methods_cls = methods_cls + self.tgt_compute_api = methods_cls.compute_api + self.tgt_db_inst = methods_cls.db + + def test_schedule_run_instance(self): + host_sched_kwargs = {'filter_properties': {}, + 'key1': 'value1', + 'key2': 'value2'} + self.mox.StubOutWithMock(self.tgt_scheduler, 'run_instance') + self.tgt_scheduler.run_instance(self.ctxt, host_sched_kwargs) + self.mox.ReplayAll() + self.src_msg_runner.schedule_run_instance(self.ctxt, + self.tgt_cell_name, + host_sched_kwargs) + + def test_call_compute_api_method(self): + + instance_uuid = 'fake_instance_uuid' + method_info = {'method': 'reboot', + 'method_args': (instance_uuid, 2, 3), + 'method_kwargs': {'arg1': 'val1', 'arg2': 'val2'}} + self.mox.StubOutWithMock(self.tgt_compute_api, 'reboot') + self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_get_by_uuid') + + self.tgt_db_inst.instance_get_by_uuid(self.ctxt, + instance_uuid).AndReturn( + 'fake_instance') + self.tgt_compute_api.reboot(self.ctxt, 'fake_instance', 2, 3, + arg1='val1', arg2='val2').AndReturn('fake_result') + self.mox.ReplayAll() + + response = self.src_msg_runner.run_compute_api_method( + self.ctxt, + 
self.tgt_cell_name, + method_info, + True) + result = response.value_or_raise() + self.assertEqual('fake_result', result) + + def test_call_compute_api_method_unknown_instance(self): + # Unknown instance should send a broadcast up that instance + # is gone. + instance_uuid = 'fake_instance_uuid' + instance = {'uuid': instance_uuid} + method_info = {'method': 'reboot', + 'method_args': (instance_uuid, 2, 3), + 'method_kwargs': {'arg1': 'val1', 'arg2': 'val2'}} + + self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_get_by_uuid') + self.mox.StubOutWithMock(self.tgt_msg_runner, + 'instance_destroy_at_top') + + self.tgt_db_inst.instance_get_by_uuid(self.ctxt, + 'fake_instance_uuid').AndRaise( + exception.InstanceNotFound(instance_id=instance_uuid)) + self.tgt_msg_runner.instance_destroy_at_top(self.ctxt, instance) + + self.mox.ReplayAll() + + response = self.src_msg_runner.run_compute_api_method( + self.ctxt, + self.tgt_cell_name, + method_info, + True) + self.assertRaises(exception.InstanceNotFound, + response.value_or_raise) + + def test_update_capabilities(self): + # Route up to API + self._setup_attrs('child-cell2', 'child-cell2!api-cell') + capabs = {'cap1': set(['val1', 'val2']), + 'cap2': set(['val3'])} + # The list(set([])) seems silly, but we can't assume the order + # of the list... This behavior should match the code we're + # testing... which is check that a set was converted to a list. 
+ expected_capabs = {'cap1': list(set(['val1', 'val2'])), + 'cap2': ['val3']} + self.mox.StubOutWithMock(self.src_state_manager, + 'get_our_capabilities') + self.mox.StubOutWithMock(self.tgt_state_manager, + 'update_cell_capabilities') + self.mox.StubOutWithMock(self.tgt_msg_runner, + 'tell_parents_our_capabilities') + self.src_state_manager.get_our_capabilities().AndReturn(capabs) + self.tgt_state_manager.update_cell_capabilities('child-cell2', + expected_capabs) + self.tgt_msg_runner.tell_parents_our_capabilities(self.ctxt) + + self.mox.ReplayAll() + + self.src_msg_runner.tell_parents_our_capabilities(self.ctxt) + + def test_update_capacities(self): + self._setup_attrs('child-cell2', 'child-cell2!api-cell') + capacs = 'fake_capacs' + self.mox.StubOutWithMock(self.src_state_manager, + 'get_our_capacities') + self.mox.StubOutWithMock(self.tgt_state_manager, + 'update_cell_capacities') + self.mox.StubOutWithMock(self.tgt_msg_runner, + 'tell_parents_our_capacities') + self.src_state_manager.get_our_capacities().AndReturn(capacs) + self.tgt_state_manager.update_cell_capacities('child-cell2', + capacs) + self.tgt_msg_runner.tell_parents_our_capacities(self.ctxt) + + self.mox.ReplayAll() + + self.src_msg_runner.tell_parents_our_capacities(self.ctxt) + + def test_announce_capabilities(self): + self._setup_attrs('api-cell', 'api-cell!child-cell1') + # To make this easier to test, make us only have 1 child cell. + cell_state = self.src_state_manager.child_cells['child-cell1'] + self.src_state_manager.child_cells = {'child-cell1': cell_state} + + self.mox.StubOutWithMock(self.tgt_msg_runner, + 'tell_parents_our_capabilities') + self.tgt_msg_runner.tell_parents_our_capabilities(self.ctxt) + + self.mox.ReplayAll() + + self.src_msg_runner.ask_children_for_capabilities(self.ctxt) + + def test_announce_capacities(self): + self._setup_attrs('api-cell', 'api-cell!child-cell1') + # To make this easier to test, make us only have 1 child cell. 
+ cell_state = self.src_state_manager.child_cells['child-cell1'] + self.src_state_manager.child_cells = {'child-cell1': cell_state} + + self.mox.StubOutWithMock(self.tgt_msg_runner, + 'tell_parents_our_capacities') + self.tgt_msg_runner.tell_parents_our_capacities(self.ctxt) + + self.mox.ReplayAll() + + self.src_msg_runner.ask_children_for_capacities(self.ctxt) + + +class CellsBroadcastMethodsTestCase(test.TestCase): + """Test case for _BroadcastMessageMethods class. Most of these + tests actually test the full path from the MessageRunner through + to the functionality of the message method. Hits 2 birds with 1 + stone, even though it's a little more than a unit test. + """ + + def setUp(self): + super(CellsBroadcastMethodsTestCase, self).setUp() + fakes.init(self) + self.ctxt = context.RequestContext('fake', 'fake') + self._setup_attrs() + + def _setup_attrs(self, up=True): + mid_cell = 'child-cell2' + if up: + src_cell = 'grandchild-cell1' + tgt_cell = 'api-cell' + else: + src_cell = 'api-cell' + tgt_cell = 'grandchild-cell1' + + self.src_msg_runner = fakes.get_message_runner(src_cell) + methods_cls = self.src_msg_runner.methods_by_type['broadcast'] + self.src_methods_cls = methods_cls + self.src_db_inst = methods_cls.db + self.src_compute_api = methods_cls.compute_api + + self.mid_msg_runner = fakes.get_message_runner(mid_cell) + methods_cls = self.mid_msg_runner.methods_by_type['broadcast'] + self.mid_methods_cls = methods_cls + self.mid_db_inst = methods_cls.db + self.mid_compute_api = methods_cls.compute_api + + self.tgt_msg_runner = fakes.get_message_runner(tgt_cell) + methods_cls = self.tgt_msg_runner.methods_by_type['broadcast'] + self.tgt_methods_cls = methods_cls + self.tgt_db_inst = methods_cls.db + self.tgt_compute_api = methods_cls.compute_api + + def test_at_the_top(self): + self.assertTrue(self.tgt_methods_cls._at_the_top()) + self.assertFalse(self.mid_methods_cls._at_the_top()) + self.assertFalse(self.src_methods_cls._at_the_top()) + + def 
test_instance_update_at_top(self): + fake_info_cache = {'id': 1, + 'instance': 'fake_instance', + 'other': 'moo'} + fake_sys_metadata = [{'id': 1, + 'key': 'key1', + 'value': 'value1'}, + {'id': 2, + 'key': 'key2', + 'value': 'value2'}] + fake_instance = {'id': 2, + 'uuid': 'fake_uuid', + 'security_groups': 'fake', + 'instance_type': 'fake', + 'volumes': 'fake', + 'cell_name': 'fake', + 'name': 'fake', + 'metadata': 'fake', + 'info_cache': fake_info_cache, + 'system_metadata': fake_sys_metadata, + 'other': 'meow'} + expected_sys_metadata = {'key1': 'value1', + 'key2': 'value2'} + expected_info_cache = {'other': 'moo'} + expected_instance = {'system_metadata': expected_sys_metadata, + 'other': 'meow', + 'uuid': 'fake_uuid'} + + # To show these should not be called in src/mid-level cell + self.mox.StubOutWithMock(self.src_db_inst, 'instance_update') + self.mox.StubOutWithMock(self.src_db_inst, + 'instance_info_cache_update') + self.mox.StubOutWithMock(self.mid_db_inst, 'instance_update') + self.mox.StubOutWithMock(self.mid_db_inst, + 'instance_info_cache_update') + + self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_update') + self.mox.StubOutWithMock(self.tgt_db_inst, + 'instance_info_cache_update') + self.tgt_db_inst.instance_update(self.ctxt, 'fake_uuid', + expected_instance, + update_cells=False) + self.tgt_db_inst.instance_info_cache_update(self.ctxt, 'fake_uuid', + expected_info_cache, + update_cells=False) + self.mox.ReplayAll() + + self.src_msg_runner.instance_update_at_top(self.ctxt, fake_instance) + + def test_instance_destroy_at_top(self): + fake_instance = {'uuid': 'fake_uuid'} + + # To show these should not be called in src/mid-level cell + self.mox.StubOutWithMock(self.src_db_inst, 'instance_destroy') + + self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_destroy') + self.tgt_db_inst.instance_destroy(self.ctxt, 'fake_uuid', + update_cells=False) + self.mox.ReplayAll() + + self.src_msg_runner.instance_destroy_at_top(self.ctxt, fake_instance) + + def 
test_instance_hard_delete_everywhere(self): + # Reset this, as this is a broadcast down. + self._setup_attrs(up=False) + instance = {'uuid': 'meow'} + + # Should not be called in src (API cell) + self.mox.StubOutWithMock(self.src_compute_api, 'delete') + + self.mox.StubOutWithMock(self.mid_compute_api, 'delete') + self.mox.StubOutWithMock(self.tgt_compute_api, 'delete') + + self.mid_compute_api.delete(self.ctxt, instance) + self.tgt_compute_api.delete(self.ctxt, instance) + + self.mox.ReplayAll() + + self.src_msg_runner.instance_delete_everywhere(self.ctxt, + instance, 'hard') + + def test_instance_soft_delete_everywhere(self): + # Reset this, as this is a broadcast down. + self._setup_attrs(up=False) + instance = {'uuid': 'meow'} + + # Should not be called in src (API cell) + self.mox.StubOutWithMock(self.src_compute_api, 'soft_delete') + + self.mox.StubOutWithMock(self.mid_compute_api, 'soft_delete') + self.mox.StubOutWithMock(self.tgt_compute_api, 'soft_delete') + + self.mid_compute_api.soft_delete(self.ctxt, instance) + self.tgt_compute_api.soft_delete(self.ctxt, instance) + + self.mox.ReplayAll() + + self.src_msg_runner.instance_delete_everywhere(self.ctxt, + instance, 'soft') + + def test_instance_fault_create_at_top(self): + fake_instance_fault = {'id': 1, + 'other stuff': 2, + 'more stuff': 3} + expected_instance_fault = {'other stuff': 2, + 'more stuff': 3} + + # Shouldn't be called for these 2 cells + self.mox.StubOutWithMock(self.src_db_inst, 'instance_fault_create') + self.mox.StubOutWithMock(self.mid_db_inst, 'instance_fault_create') + + self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_fault_create') + self.tgt_db_inst.instance_fault_create(self.ctxt, + expected_instance_fault) + self.mox.ReplayAll() + + self.src_msg_runner.instance_fault_create_at_top(self.ctxt, + fake_instance_fault) + + def test_bw_usage_update_at_top(self): + fake_bw_update_info = {'uuid': 'fake_uuid', + 'mac': 'fake_mac', + 'start_period': 'fake_start_period', + 'bw_in': 
'fake_bw_in', + 'bw_out': 'fake_bw_out', + 'last_ctr_in': 'fake_last_ctr_in', + 'last_ctr_out': 'fake_last_ctr_out', + 'last_refreshed': 'fake_last_refreshed'} + + # Shouldn't be called for these 2 cells + self.mox.StubOutWithMock(self.src_db_inst, 'bw_usage_update') + self.mox.StubOutWithMock(self.mid_db_inst, 'bw_usage_update') + + self.mox.StubOutWithMock(self.tgt_db_inst, 'bw_usage_update') + self.tgt_db_inst.bw_usage_update(self.ctxt, **fake_bw_update_info) + + self.mox.ReplayAll() + + self.src_msg_runner.bw_usage_update_at_top(self.ctxt, + fake_bw_update_info) diff --git a/nova/tests/cells/test_cells_rpc_driver.py b/nova/tests/cells/test_cells_rpc_driver.py new file mode 100644 index 000000000..a44fe9376 --- /dev/null +++ b/nova/tests/cells/test_cells_rpc_driver.py @@ -0,0 +1,218 @@ +# Copyright (c) 2012 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+""" +Tests For Cells RPC Communication Driver +""" + +from nova.cells import messaging +from nova.cells import rpc_driver +from nova import context +from nova.openstack.common import cfg +from nova.openstack.common import rpc +from nova.openstack.common.rpc import dispatcher as rpc_dispatcher +from nova import test +from nova.tests.cells import fakes + +CONF = cfg.CONF +CONF.import_opt('rpc_driver_queue_base', 'nova.cells.rpc_driver', + group='cells') + + +class CellsRPCDriverTestCase(test.TestCase): + """Test case for Cells communication via RPC.""" + + def setUp(self): + super(CellsRPCDriverTestCase, self).setUp() + fakes.init(self) + self.ctxt = context.RequestContext('fake', 'fake') + self.driver = rpc_driver.CellsRPCDriver() + + def test_start_consumers(self): + self.flags(rpc_driver_queue_base='cells.intercell42', group='cells') + rpc_consumers = [] + rpc_conns = [] + fake_msg_runner = fakes.get_message_runner('api-cell') + call_info = {} + + class FakeInterCellRPCDispatcher(object): + def __init__(_self, msg_runner): + self.assertEqual(fake_msg_runner, msg_runner) + call_info['intercell_dispatcher'] = _self + + class FakeRPCDispatcher(object): + def __init__(_self, proxy_objs): + self.assertEqual([call_info['intercell_dispatcher']], + proxy_objs) + call_info['rpc_dispatcher'] = _self + + class FakeRPCConn(object): + def create_consumer(_self, topic, proxy_obj, **kwargs): + self.assertEqual(call_info['rpc_dispatcher'], proxy_obj) + rpc_consumers.append((topic, kwargs)) + + def consume_in_thread(_self): + pass + + def _fake_create_connection(new): + self.assertTrue(new) + fake_conn = FakeRPCConn() + rpc_conns.append(fake_conn) + return fake_conn + + self.stubs.Set(rpc, 'create_connection', _fake_create_connection) + self.stubs.Set(rpc_driver, 'InterCellRPCDispatcher', + FakeInterCellRPCDispatcher) + self.stubs.Set(rpc_dispatcher, 'RpcDispatcher', FakeRPCDispatcher) + + self.driver.start_consumers(fake_msg_runner) + + for message_type in ['broadcast', 
'response', 'targeted']: + topic = 'cells.intercell42.' + message_type + self.assertIn((topic, {'fanout': True}), rpc_consumers) + self.assertIn((topic, {'fanout': False}), rpc_consumers) + self.assertEqual(rpc_conns, self.driver.rpc_connections) + + def test_stop_consumers(self): + call_info = {'closed': []} + + class FakeRPCConn(object): + def close(self): + call_info['closed'].append(self) + + fake_conns = [FakeRPCConn() for x in xrange(5)] + self.driver.rpc_connections = fake_conns + self.driver.stop_consumers() + self.assertEqual(fake_conns, call_info['closed']) + + def test_send_message_to_cell_cast(self): + msg_runner = fakes.get_message_runner('api-cell') + cell_state = fakes.get_cell_state('api-cell', 'child-cell2') + message = messaging._TargetedMessage(msg_runner, + self.ctxt, 'fake', 'fake', 'down', cell_state, fanout=False) + + call_info = {} + + def _fake_make_msg(method, **kwargs): + call_info['rpc_method'] = method + call_info['rpc_kwargs'] = kwargs + return 'fake-message' + + def _fake_cast_to_server(*args, **kwargs): + call_info['cast_args'] = args + call_info['cast_kwargs'] = kwargs + + self.stubs.Set(rpc, 'cast_to_server', _fake_cast_to_server) + self.stubs.Set(self.driver.intercell_rpcapi, 'make_msg', + _fake_make_msg) + self.stubs.Set(self.driver.intercell_rpcapi, 'cast_to_server', + _fake_cast_to_server) + + self.driver.send_message_to_cell(cell_state, message) + expected_server_params = {'hostname': 'rpc_host2', + 'password': 'password2', + 'port': 'rpc_port2', + 'username': 'username2', + 'virtual_host': 'rpc_vhost2'} + expected_cast_args = (self.ctxt, expected_server_params, + 'fake-message') + expected_cast_kwargs = {'topic': 'cells.intercell.targeted'} + expected_rpc_kwargs = {'message': message.to_json()} + self.assertEqual(expected_cast_args, call_info['cast_args']) + self.assertEqual(expected_cast_kwargs, call_info['cast_kwargs']) + self.assertEqual('process_message', call_info['rpc_method']) + self.assertEqual(expected_rpc_kwargs, 
call_info['rpc_kwargs']) + + def test_send_message_to_cell_fanout_cast(self): + msg_runner = fakes.get_message_runner('api-cell') + cell_state = fakes.get_cell_state('api-cell', 'child-cell2') + message = messaging._TargetedMessage(msg_runner, + self.ctxt, 'fake', 'fake', 'down', cell_state, fanout=True) + + call_info = {} + + def _fake_make_msg(method, **kwargs): + call_info['rpc_method'] = method + call_info['rpc_kwargs'] = kwargs + return 'fake-message' + + def _fake_fanout_cast_to_server(*args, **kwargs): + call_info['cast_args'] = args + call_info['cast_kwargs'] = kwargs + + self.stubs.Set(rpc, 'fanout_cast_to_server', + _fake_fanout_cast_to_server) + self.stubs.Set(self.driver.intercell_rpcapi, 'make_msg', + _fake_make_msg) + self.stubs.Set(self.driver.intercell_rpcapi, + 'fanout_cast_to_server', _fake_fanout_cast_to_server) + + self.driver.send_message_to_cell(cell_state, message) + expected_server_params = {'hostname': 'rpc_host2', + 'password': 'password2', + 'port': 'rpc_port2', + 'username': 'username2', + 'virtual_host': 'rpc_vhost2'} + expected_cast_args = (self.ctxt, expected_server_params, + 'fake-message') + expected_cast_kwargs = {'topic': 'cells.intercell.targeted'} + expected_rpc_kwargs = {'message': message.to_json()} + self.assertEqual(expected_cast_args, call_info['cast_args']) + self.assertEqual(expected_cast_kwargs, call_info['cast_kwargs']) + self.assertEqual('process_message', call_info['rpc_method']) + self.assertEqual(expected_rpc_kwargs, call_info['rpc_kwargs']) + + def test_rpc_topic_uses_message_type(self): + self.flags(rpc_driver_queue_base='cells.intercell42', group='cells') + msg_runner = fakes.get_message_runner('api-cell') + cell_state = fakes.get_cell_state('api-cell', 'child-cell2') + message = messaging._BroadcastMessage(msg_runner, + self.ctxt, 'fake', 'fake', 'down', fanout=True) + message.message_type = 'fake-message-type' + + call_info = {} + + def _fake_fanout_cast_to_server(*args, **kwargs): + call_info['topic'] = 
kwargs.get('topic') + + self.stubs.Set(self.driver.intercell_rpcapi, + 'fanout_cast_to_server', _fake_fanout_cast_to_server) + + self.driver.send_message_to_cell(cell_state, message) + self.assertEqual('cells.intercell42.fake-message-type', + call_info['topic']) + + def test_process_message(self): + msg_runner = fakes.get_message_runner('api-cell') + dispatcher = rpc_driver.InterCellRPCDispatcher(msg_runner) + message = messaging._BroadcastMessage(msg_runner, + self.ctxt, 'fake', 'fake', 'down', fanout=True) + + call_info = {} + + def _fake_message_from_json(json_message): + call_info['json_message'] = json_message + self.assertEqual(message.to_json(), json_message) + return message + + def _fake_process(): + call_info['process_called'] = True + + self.stubs.Set(msg_runner, 'message_from_json', + _fake_message_from_json) + self.stubs.Set(message, 'process', _fake_process) + + dispatcher.process_message(self.ctxt, message.to_json()) + self.assertEqual(message.to_json(), call_info['json_message']) + self.assertTrue(call_info['process_called']) diff --git a/nova/tests/cells/test_cells_rpcapi.py b/nova/tests/cells/test_cells_rpcapi.py new file mode 100644 index 000000000..b51bfa0c1 --- /dev/null +++ b/nova/tests/cells/test_cells_rpcapi.py @@ -0,0 +1,206 @@ +# Copyright (c) 2012 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+""" +Tests For Cells RPCAPI +""" + +from nova.cells import rpcapi as cells_rpcapi +from nova.openstack.common import cfg +from nova.openstack.common import rpc +from nova import test + +CONF = cfg.CONF +CONF.import_opt('topic', 'nova.cells.opts', group='cells') + + +class CellsAPITestCase(test.TestCase): + """Test case for cells.api interfaces.""" + + def setUp(self): + super(CellsAPITestCase, self).setUp() + self.fake_topic = 'fake_topic' + self.fake_context = 'fake_context' + self.flags(topic=self.fake_topic, enable=True, group='cells') + self.cells_rpcapi = cells_rpcapi.CellsAPI() + + def _stub_rpc_method(self, rpc_method, result): + call_info = {} + + def fake_rpc_method(ctxt, topic, msg, *args, **kwargs): + call_info['context'] = ctxt + call_info['topic'] = topic + call_info['msg'] = msg + return result + + self.stubs.Set(rpc, rpc_method, fake_rpc_method) + return call_info + + def _check_result(self, call_info, method, args, version=None): + if version is None: + version = self.cells_rpcapi.BASE_RPC_API_VERSION + self.assertEqual(self.fake_context, call_info['context']) + self.assertEqual(self.fake_topic, call_info['topic']) + self.assertEqual(method, call_info['msg']['method']) + self.assertEqual(version, call_info['msg']['version']) + self.assertEqual(args, call_info['msg']['args']) + + def test_cast_compute_api_method(self): + fake_cell_name = 'fake_cell_name' + fake_method = 'fake_method' + fake_method_args = (1, 2) + fake_method_kwargs = {'kwarg1': 10, 'kwarg2': 20} + + expected_method_info = {'method': fake_method, + 'method_args': fake_method_args, + 'method_kwargs': fake_method_kwargs} + expected_args = {'method_info': expected_method_info, + 'cell_name': fake_cell_name, + 'call': False} + + call_info = self._stub_rpc_method('cast', None) + + self.cells_rpcapi.cast_compute_api_method(self.fake_context, + fake_cell_name, fake_method, + *fake_method_args, **fake_method_kwargs) + self._check_result(call_info, 'run_compute_api_method', + expected_args) + 
+ def test_call_compute_api_method(self): + fake_cell_name = 'fake_cell_name' + fake_method = 'fake_method' + fake_method_args = (1, 2) + fake_method_kwargs = {'kwarg1': 10, 'kwarg2': 20} + fake_response = 'fake_response' + + expected_method_info = {'method': fake_method, + 'method_args': fake_method_args, + 'method_kwargs': fake_method_kwargs} + expected_args = {'method_info': expected_method_info, + 'cell_name': fake_cell_name, + 'call': True} + + call_info = self._stub_rpc_method('call', fake_response) + + result = self.cells_rpcapi.call_compute_api_method(self.fake_context, + fake_cell_name, fake_method, + *fake_method_args, **fake_method_kwargs) + self._check_result(call_info, 'run_compute_api_method', + expected_args) + self.assertEqual(fake_response, result) + + def test_schedule_run_instance(self): + call_info = self._stub_rpc_method('cast', None) + + self.cells_rpcapi.schedule_run_instance( + self.fake_context, arg1=1, arg2=2, arg3=3) + + expected_args = {'host_sched_kwargs': {'arg1': 1, + 'arg2': 2, + 'arg3': 3}} + self._check_result(call_info, 'schedule_run_instance', + expected_args) + + def test_instance_update_at_top(self): + fake_info_cache = {'id': 1, + 'instance': 'fake_instance', + 'other': 'moo'} + fake_sys_metadata = [{'id': 1, + 'key': 'key1', + 'value': 'value1'}, + {'id': 2, + 'key': 'key2', + 'value': 'value2'}] + fake_instance = {'id': 2, + 'security_groups': 'fake', + 'instance_type': 'fake', + 'volumes': 'fake', + 'cell_name': 'fake', + 'name': 'fake', + 'metadata': 'fake', + 'info_cache': fake_info_cache, + 'system_metadata': fake_sys_metadata, + 'other': 'meow'} + + call_info = self._stub_rpc_method('cast', None) + + self.cells_rpcapi.instance_update_at_top( + self.fake_context, fake_instance) + + expected_args = {'instance': fake_instance} + self._check_result(call_info, 'instance_update_at_top', + expected_args) + + def test_instance_destroy_at_top(self): + fake_instance = {'uuid': 'fake-uuid'} + + call_info = 
self._stub_rpc_method('cast', None) + + self.cells_rpcapi.instance_destroy_at_top( + self.fake_context, fake_instance) + + expected_args = {'instance': fake_instance} + self._check_result(call_info, 'instance_destroy_at_top', + expected_args) + + def test_instance_delete_everywhere(self): + fake_instance = {'uuid': 'fake-uuid'} + + call_info = self._stub_rpc_method('cast', None) + + self.cells_rpcapi.instance_delete_everywhere( + self.fake_context, fake_instance, + 'fake-type') + + expected_args = {'instance': fake_instance, + 'delete_type': 'fake-type'} + self._check_result(call_info, 'instance_delete_everywhere', + expected_args) + + def test_instance_fault_create_at_top(self): + fake_instance_fault = {'id': 2, + 'other': 'meow'} + + call_info = self._stub_rpc_method('cast', None) + + self.cells_rpcapi.instance_fault_create_at_top( + self.fake_context, fake_instance_fault) + + expected_args = {'instance_fault': fake_instance_fault} + self._check_result(call_info, 'instance_fault_create_at_top', + expected_args) + + def test_bw_usage_update_at_top(self): + update_args = ('fake_uuid', 'fake_mac', 'fake_start_period', + 'fake_bw_in', 'fake_bw_out', 'fake_ctr_in', + 'fake_ctr_out') + update_kwargs = {'last_refreshed': 'fake_refreshed'} + + call_info = self._stub_rpc_method('cast', None) + + self.cells_rpcapi.bw_usage_update_at_top( + self.fake_context, *update_args, **update_kwargs) + + bw_update_info = {'uuid': 'fake_uuid', + 'mac': 'fake_mac', + 'start_period': 'fake_start_period', + 'bw_in': 'fake_bw_in', + 'bw_out': 'fake_bw_out', + 'last_ctr_in': 'fake_ctr_in', + 'last_ctr_out': 'fake_ctr_out', + 'last_refreshed': 'fake_refreshed'} + + expected_args = {'bw_update_info': bw_update_info} + self._check_result(call_info, 'bw_usage_update_at_top', + expected_args) diff --git a/nova/tests/cells/test_cells_scheduler.py b/nova/tests/cells/test_cells_scheduler.py new file mode 100644 index 000000000..66e7e245e --- /dev/null +++ b/nova/tests/cells/test_cells_scheduler.py 
@@ -0,0 +1,206 @@ +# Copyright (c) 2012 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Tests For CellsScheduler +""" +import time + +from nova.compute import vm_states +from nova import context +from nova import db +from nova import exception +from nova.openstack.common import cfg +from nova.openstack.common import uuidutils +from nova import test +from nova.tests.cells import fakes + +CONF = cfg.CONF +CONF.import_opt('scheduler_retries', 'nova.cells.scheduler', group='cells') + + +class CellsSchedulerTestCase(test.TestCase): + """Test case for CellsScheduler class""" + + def setUp(self): + super(CellsSchedulerTestCase, self).setUp() + fakes.init(self) + self.msg_runner = fakes.get_message_runner('api-cell') + self.scheduler = self.msg_runner.scheduler + self.state_manager = self.msg_runner.state_manager + self.my_cell_state = self.state_manager.get_my_state() + self.ctxt = context.RequestContext('fake', 'fake') + instance_uuids = [] + for x in xrange(3): + instance_uuids.append(uuidutils.generate_uuid()) + self.instance_uuids = instance_uuids + self.request_spec = {'instance_uuids': instance_uuids, + 'other': 'stuff'} + + def test_create_instances_here(self): + # Just grab the first instance type + inst_type = db.instance_type_get(self.ctxt, 1) + image = {'properties': {}} + instance_props = {'hostname': 'meow', + 'display_name': 'moo', + 'image_ref': 'fake_image_ref', + 'user_id': self.ctxt.user_id, + 'project_id': 
self.ctxt.project_id} + request_spec = {'instance_type': inst_type, + 'image': image, + 'security_group': ['default'], + 'block_device_mapping': [], + 'instance_properties': instance_props, + 'instance_uuids': self.instance_uuids} + + call_info = {'uuids': []} + + def _fake_instance_update_at_top(_ctxt, instance): + call_info['uuids'].append(instance['uuid']) + + self.stubs.Set(self.msg_runner, 'instance_update_at_top', + _fake_instance_update_at_top) + + self.scheduler._create_instances_here(self.ctxt, request_spec) + self.assertEqual(self.instance_uuids, call_info['uuids']) + + for instance_uuid in self.instance_uuids: + instance = db.instance_get_by_uuid(self.ctxt, instance_uuid) + self.assertEqual('meow', instance['hostname']) + self.assertEqual('moo', instance['display_name']) + self.assertEqual('fake_image_ref', instance['image_ref']) + + def test_run_instance_selects_child_cell(self): + # Make sure there's no capacity info so we're sure to + # select a child cell + our_cell_info = self.state_manager.get_my_state() + our_cell_info.capacities = {} + + call_info = {'times': 0} + + orig_fn = self.msg_runner.schedule_run_instance + + def msg_runner_schedule_run_instance(ctxt, target_cell, + host_sched_kwargs): + # This gets called twice. Once for our running it + # in this cell.. and then it'll get called when the + # child cell is picked. So, first time.. just run it + # like normal. 
+ if not call_info['times']: + call_info['times'] += 1 + return orig_fn(ctxt, target_cell, host_sched_kwargs) + call_info['ctxt'] = ctxt + call_info['target_cell'] = target_cell + call_info['host_sched_kwargs'] = host_sched_kwargs + + self.stubs.Set(self.msg_runner, 'schedule_run_instance', + msg_runner_schedule_run_instance) + + host_sched_kwargs = {'request_spec': self.request_spec} + self.msg_runner.schedule_run_instance(self.ctxt, + self.my_cell_state, host_sched_kwargs) + + self.assertEqual(self.ctxt, call_info['ctxt']) + self.assertEqual(host_sched_kwargs, call_info['host_sched_kwargs']) + child_cells = self.state_manager.get_child_cells() + self.assertIn(call_info['target_cell'], child_cells) + + def test_run_instance_selects_current_cell(self): + # Make sure there's no child cells so that we will be + # selected + self.state_manager.child_cells = {} + + call_info = {} + + def fake_create_instances_here(ctxt, request_spec): + call_info['ctxt'] = ctxt + call_info['request_spec'] = request_spec + + def fake_rpc_run_instance(ctxt, **host_sched_kwargs): + call_info['host_sched_kwargs'] = host_sched_kwargs + + self.stubs.Set(self.scheduler, '_create_instances_here', + fake_create_instances_here) + self.stubs.Set(self.scheduler.scheduler_rpcapi, + 'run_instance', fake_rpc_run_instance) + + host_sched_kwargs = {'request_spec': self.request_spec, + 'other': 'stuff'} + self.msg_runner.schedule_run_instance(self.ctxt, + self.my_cell_state, host_sched_kwargs) + + self.assertEqual(self.ctxt, call_info['ctxt']) + self.assertEqual(self.request_spec, call_info['request_spec']) + self.assertEqual(host_sched_kwargs, call_info['host_sched_kwargs']) + + def test_run_instance_retries_when_no_cells_avail(self): + self.flags(scheduler_retries=7, group='cells') + + host_sched_kwargs = {'request_spec': self.request_spec} + + call_info = {'num_tries': 0, 'errored_uuids': []} + + def fake_run_instance(message, host_sched_kwargs): + call_info['num_tries'] += 1 + raise 
exception.NoCellsAvailable() + + def fake_sleep(_secs): + return + + def fake_instance_update(ctxt, instance_uuid, values): + self.assertEqual(vm_states.ERROR, values['vm_state']) + call_info['errored_uuids'].append(instance_uuid) + + self.stubs.Set(self.scheduler, '_run_instance', fake_run_instance) + self.stubs.Set(time, 'sleep', fake_sleep) + self.stubs.Set(db, 'instance_update', fake_instance_update) + + self.msg_runner.schedule_run_instance(self.ctxt, + self.my_cell_state, host_sched_kwargs) + + self.assertEqual(8, call_info['num_tries']) + self.assertEqual(self.instance_uuids, call_info['errored_uuids']) + + def test_run_instance_on_random_exception(self): + self.flags(scheduler_retries=7, group='cells') + + host_sched_kwargs = {'request_spec': self.request_spec} + + call_info = {'num_tries': 0, + 'errored_uuids1': [], + 'errored_uuids2': []} + + def fake_run_instance(message, host_sched_kwargs): + call_info['num_tries'] += 1 + raise test.TestingException() + + def fake_instance_update(ctxt, instance_uuid, values): + self.assertEqual(vm_states.ERROR, values['vm_state']) + call_info['errored_uuids1'].append(instance_uuid) + + def fake_instance_update_at_top(ctxt, instance): + self.assertEqual(vm_states.ERROR, instance['vm_state']) + call_info['errored_uuids2'].append(instance['uuid']) + + self.stubs.Set(self.scheduler, '_run_instance', fake_run_instance) + self.stubs.Set(db, 'instance_update', fake_instance_update) + self.stubs.Set(self.msg_runner, 'instance_update_at_top', + fake_instance_update_at_top) + + self.msg_runner.schedule_run_instance(self.ctxt, + self.my_cell_state, host_sched_kwargs) + # Shouldn't retry + self.assertEqual(1, call_info['num_tries']) + self.assertEqual(self.instance_uuids, call_info['errored_uuids1']) + self.assertEqual(self.instance_uuids, call_info['errored_uuids2']) diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py index d335f6675..104fd4e68 100644 --- a/nova/tests/compute/test_compute.py +++ 
b/nova/tests/compute/test_compute.py @@ -3696,8 +3696,12 @@ class ComputeAPITestCase(BaseTestCase): {'vm_state': vm_states.SOFT_DELETED, 'task_state': None}) + # Ensure quotas are committed self.mox.StubOutWithMock(nova.quota.QUOTAS, 'commit') nova.quota.QUOTAS.commit(mox.IgnoreArg(), mox.IgnoreArg()) + if self.__class__.__name__ == 'CellsComputeAPITestCase': + # Called a 2nd time (for the child cell) when testing cells + nova.quota.QUOTAS.commit(mox.IgnoreArg(), mox.IgnoreArg()) self.mox.ReplayAll() self.compute_api.restore(self.context, instance) diff --git a/nova/tests/compute/test_compute_cells.py b/nova/tests/compute/test_compute_cells.py new file mode 100644 index 000000000..aa4b448d4 --- /dev/null +++ b/nova/tests/compute/test_compute_cells.py @@ -0,0 +1,99 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright (c) 2012 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+""" +Tests For Compute w/ Cells +""" +from nova.compute import cells_api as compute_cells_api +from nova.openstack.common import log as logging +from nova.tests.compute import test_compute + + +LOG = logging.getLogger('nova.tests.test_compute_cells') + +ORIG_COMPUTE_API = None + + +def stub_call_to_cells(context, instance, method, *args, **kwargs): + fn = getattr(ORIG_COMPUTE_API, method) + return fn(context, instance, *args, **kwargs) + + +def stub_cast_to_cells(context, instance, method, *args, **kwargs): + fn = getattr(ORIG_COMPUTE_API, method) + fn(context, instance, *args, **kwargs) + + +def deploy_stubs(stubs, api): + stubs.Set(api, '_call_to_cells', stub_call_to_cells) + stubs.Set(api, '_cast_to_cells', stub_cast_to_cells) + + +class CellsComputeAPITestCase(test_compute.ComputeAPITestCase): + def setUp(self): + super(CellsComputeAPITestCase, self).setUp() + global ORIG_COMPUTE_API + ORIG_COMPUTE_API = self.compute_api + + def _fake_cell_read_only(*args, **kwargs): + return False + + def _fake_validate_cell(*args, **kwargs): + return + + def _nop_update(context, instance, **kwargs): + return instance + + self.compute_api = compute_cells_api.ComputeCellsAPI() + self.stubs.Set(self.compute_api, '_cell_read_only', + _fake_cell_read_only) + self.stubs.Set(self.compute_api, '_validate_cell', + _fake_validate_cell) + + # NOTE(belliott) Don't update the instance state + # for the tests at the API layer. Let it happen after + # the stub cast to cells so that expected_task_states + # match. 
+ self.stubs.Set(self.compute_api, 'update', _nop_update) + + deploy_stubs(self.stubs, self.compute_api) + + def tearDown(self): + global ORIG_COMPUTE_API + self.compute_api = ORIG_COMPUTE_API + super(CellsComputeAPITestCase, self).tearDown() + + def test_instance_metadata(self): + self.skipTest("Test is incompatible with cells.") + + def test_live_migrate(self): + self.skipTest("Test is incompatible with cells.") + + def test_get_backdoor_port(self): + self.skipTest("Test is incompatible with cells.") + + +class CellsComputePolicyTestCase(test_compute.ComputePolicyTestCase): + def setUp(self): + super(CellsComputePolicyTestCase, self).setUp() + global ORIG_COMPUTE_API + ORIG_COMPUTE_API = self.compute_api + self.compute_api = compute_cells_api.ComputeCellsAPI() + deploy_stubs(self.stubs, self.compute_api) + + def tearDown(self): + global ORIG_COMPUTE_API + self.compute_api = ORIG_COMPUTE_API + super(CellsComputePolicyTestCase, self).tearDown() @@ -50,6 +50,7 @@ setuptools.setup(name='nova', 'bin/nova-api-metadata', 'bin/nova-api-os-compute', 'bin/nova-rpc-zmq-receiver', + 'bin/nova-cells', 'bin/nova-cert', 'bin/nova-clear-rabbit-queues', 'bin/nova-compute', |