diff options
author | Russell Bryant <rbryant@redhat.com> | 2012-06-13 10:48:54 -0400 |
---|---|---|
committer | Russell Bryant <rbryant@redhat.com> | 2012-06-20 12:57:21 -0400 |
commit | ba3754e3ff672a877d90c78486c7f4d5fd4bf7b0 (patch) | |
tree | 47f35e1ce9c22ec66155986484e54acb4089efdf | |
parent | 83e6cf7b92ae6a845939adf1771f0422a5e5f2ca (diff) | |
download | nova-ba3754e3ff672a877d90c78486c7f4d5fd4bf7b0.tar.gz nova-ba3754e3ff672a877d90c78486c7f4d5fd4bf7b0.tar.xz nova-ba3754e3ff672a877d90c78486c7f4d5fd4bf7b0.zip |
Use rpc from openstack-common.
Final patch for blueprint common-rpc.
This patch removes nova.rpc in favor of the copy in openstack-common.
Change-Id: I9c2f6bdbe8cd0c44417f75284131dbf3c126d1dd
66 files changed, 114 insertions, 1867 deletions
diff --git a/bin/nova-clear-rabbit-queues b/bin/nova-clear-rabbit-queues index 578681790..aff9da14e 100755 --- a/bin/nova-clear-rabbit-queues +++ b/bin/nova-clear-rabbit-queues @@ -45,7 +45,7 @@ from nova import exception from nova import flags from nova import log as logging from nova.openstack.common import cfg -from nova import rpc +from nova.openstack.common import rpc delete_exchange_opt = cfg.BoolOpt('delete_exchange', diff --git a/bin/nova-dhcpbridge b/bin/nova-dhcpbridge index f30a0877e..d5179d074 100755 --- a/bin/nova-dhcpbridge +++ b/bin/nova-dhcpbridge @@ -41,7 +41,7 @@ from nova import flags from nova import log as logging from nova.network import linux_net from nova.openstack.common import importutils -from nova import rpc +from nova.openstack.common import rpc from nova import utils FLAGS = flags.FLAGS diff --git a/bin/nova-instance-usage-audit b/bin/nova-instance-usage-audit index 3d60edfec..9fc71db05 100755 --- a/bin/nova-instance-usage-audit +++ b/bin/nova-instance-usage-audit @@ -56,7 +56,7 @@ from nova import db from nova import exception from nova import flags from nova import log as logging -from nova import rpc +from nova.openstack.common import rpc from nova import utils diff --git a/bin/nova-manage b/bin/nova-manage index 6bda2ada5..ece879418 100755 --- a/bin/nova-manage +++ b/bin/nova-manage @@ -90,9 +90,9 @@ from nova import log as logging from nova.openstack.common import cfg from nova.openstack.common import importutils from nova.openstack.common import jsonutils +from nova.openstack.common import rpc from nova.openstack.common import timeutils from nova import quota -from nova import rpc from nova.scheduler import rpcapi as scheduler_rpcapi from nova import utils from nova import version diff --git a/bin/nova-rpc-zmq-receiver b/bin/nova-rpc-zmq-receiver index 76b67a840..2fe569b77 100755 --- a/bin/nova-rpc-zmq-receiver +++ b/bin/nova-rpc-zmq-receiver @@ -35,8 +35,8 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'nova', '__init__.py')): from nova import exception from nova.openstack.common import cfg -from nova import rpc -from nova.rpc import impl_zmq +from nova.openstack.common import rpc +from nova.openstack.common.rpc import impl_zmq from nova import utils CONF = cfg.CONF diff --git a/bin/nova-volume-usage-audit b/bin/nova-volume-usage-audit index 2c01da56f..aff650aa4 100755 --- a/bin/nova-volume-usage-audit +++ b/bin/nova-volume-usage-audit @@ -55,7 +55,7 @@ from nova import db from nova import exception from nova import flags from nova import log as logging -from nova import rpc +from nova.openstack.common import rpc from nova import utils from nova.volume import utils as volume_utils diff --git a/bin/nova-xvpvncproxy b/bin/nova-xvpvncproxy index bdbe20997..1e3708072 100755 --- a/bin/nova-xvpvncproxy +++ b/bin/nova-xvpvncproxy @@ -33,7 +33,7 @@ if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')): from nova import flags from nova import log as logging -from nova import rpc +from nova.openstack.common import rpc from nova import service from nova.vnc import xvp_proxy diff --git a/nova/api/openstack/compute/servers.py b/nova/api/openstack/compute/servers.py index f0b1c355c..69f1b2f3d 100644 --- a/nova/api/openstack/compute/servers.py +++ b/nova/api/openstack/compute/servers.py @@ -32,8 +32,8 @@ from nova.compute import instance_types from nova import exception from nova import flags from nova import log as logging +from nova.openstack.common.rpc import common as rpc_common from nova.openstack.common import timeutils -from nova.rpc import common as rpc_common from nova import utils diff --git a/nova/cert/rpcapi.py b/nova/cert/rpcapi.py index d062026da..cbf1af61f 100644 --- a/nova/cert/rpcapi.py +++ b/nova/cert/rpcapi.py @@ -19,13 +19,13 @@ Client side of the cert manager RPC API. """ from nova import flags -import nova.rpc.proxy +import nova.openstack.common.rpc.proxy FLAGS = flags.FLAGS -class CertAPI(nova.rpc.proxy.RpcProxy): +class CertAPI(nova.openstack.common.rpc.proxy.RpcProxy): '''Client side of the cert rpc API. API version history: diff --git a/nova/compute/manager.py b/nova/compute/manager.py index f33dd3162..7e5070bce 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -68,8 +68,8 @@ from nova.openstack.common import cfg from nova.openstack.common import excutils from nova.openstack.common import importutils from nova.openstack.common import jsonutils +from nova.openstack.common import rpc from nova.openstack.common import timeutils -from nova import rpc from nova import utils from nova.virt import driver from nova import volume diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py index e7945c7d4..7a531fcf5 100644 --- a/nova/compute/rpcapi.py +++ b/nova/compute/rpcapi.py @@ -20,8 +20,8 @@ Client side of the compute RPC API. from nova import exception from nova import flags -from nova import rpc -import nova.rpc.proxy +from nova.openstack.common import rpc +import nova.openstack.common.rpc.proxy FLAGS = flags.FLAGS @@ -48,7 +48,7 @@ def _compute_topic(topic, ctxt, host, instance): return rpc.queue_get_for(ctxt, topic, host) -class ComputeAPI(nova.rpc.proxy.RpcProxy): +class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy): '''Client side of the compute rpc API. API version history: @@ -358,7 +358,7 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy): topic=_compute_topic(self.topic, ctxt, None, instance)) -class SecurityGroupAPI(nova.rpc.proxy.RpcProxy): +class SecurityGroupAPI(nova.openstack.common.rpc.proxy.RpcProxy): '''Client side of the security group rpc API. API version history: diff --git a/nova/console/api.py b/nova/console/api.py index 46718a49e..ecbdbf2fa 100644 --- a/nova/console/api.py +++ b/nova/console/api.py @@ -21,7 +21,7 @@ from nova.compute import rpcapi as compute_rpcapi from nova.console import rpcapi as console_rpcapi from nova.db import base from nova import flags -from nova import rpc +from nova.openstack.common import rpc from nova import utils diff --git a/nova/console/rpcapi.py b/nova/console/rpcapi.py index 8f0c5b97f..66c963403 100644 --- a/nova/console/rpcapi.py +++ b/nova/console/rpcapi.py @@ -19,13 +19,13 @@ Client side of the console RPC API. """ from nova import flags -import nova.rpc.proxy +import nova.openstack.common.rpc.proxy FLAGS = flags.FLAGS -class ConsoleAPI(nova.rpc.proxy.RpcProxy): +class ConsoleAPI(nova.openstack.common.rpc.proxy.RpcProxy): '''Client side of the console rpc API. API version history: diff --git a/nova/consoleauth/rpcapi.py b/nova/consoleauth/rpcapi.py index 5cb940a34..c6a02e949 100644 --- a/nova/consoleauth/rpcapi.py +++ b/nova/consoleauth/rpcapi.py @@ -19,13 +19,13 @@ Client side of the consoleauth RPC API. """ from nova import flags -import nova.rpc.proxy +import nova.openstack.common.rpc.proxy FLAGS = flags.FLAGS -class ConsoleAuthAPI(nova.rpc.proxy.RpcProxy): +class ConsoleAuthAPI(nova.openstack.common.rpc.proxy.RpcProxy): '''Client side of the consoleauth rpc API. API version history: diff --git a/nova/manager.py b/nova/manager.py index a63b6cfcd..1d73040b4 100644 --- a/nova/manager.py +++ b/nova/manager.py @@ -56,7 +56,7 @@ This module provides Manager, a base class for managers. from nova.db import base from nova import flags from nova import log as logging -from nova.rpc import dispatcher as rpc_dispatcher +from nova.openstack.common.rpc import dispatcher as rpc_dispatcher from nova.scheduler import rpcapi as scheduler_rpcapi from nova import version diff --git a/nova/network/api.py b/nova/network/api.py index 70ad5d4c2..00f694d7a 100644 --- a/nova/network/api.py +++ b/nova/network/api.py @@ -23,7 +23,7 @@ from nova.db import base from nova import flags from nova import log as logging from nova.network import model as network_model -from nova import rpc +from nova.openstack.common import rpc FLAGS = flags.FLAGS diff --git a/nova/network/manager.py b/nova/network/manager.py index 684a9e79d..fb37499e4 100644 --- a/nova/network/manager.py +++ b/nova/network/manager.py @@ -67,10 +67,10 @@ from nova.openstack.common import cfg from nova.openstack.common import excutils from nova.openstack.common import importutils from nova.openstack.common import jsonutils +from nova.openstack.common import rpc from nova.openstack.common import timeutils import nova.policy from nova import quota -from nova import rpc from nova import utils diff --git a/nova/network/quantum/manager.py b/nova/network/quantum/manager.py index 73a484570..95ee06c9f 100644 --- a/nova/network/quantum/manager.py +++ b/nova/network/quantum/manager.py @@ -28,7 +28,7 @@ from nova.network import manager from nova.network.quantum import melange_ipam_lib from nova.network.quantum import quantum_connection from nova.openstack.common import cfg -from nova import rpc +from nova.openstack.common import rpc from nova import utils LOG = logging.getLogger(__name__) diff --git a/nova/notifier/rabbit_notifier.py b/nova/notifier/rabbit_notifier.py index cf39ba9be..27f6ea209 100644 --- a/nova/notifier/rabbit_notifier.py +++ b/nova/notifier/rabbit_notifier.py @@ -19,7 +19,7 @@ import nova.context from nova import flags from nova import log as logging from nova.openstack.common import cfg -from nova import rpc +from nova.openstack.common import rpc LOG = logging.getLogger(__name__) diff --git a/nova/rpc/__init__.py b/nova/openstack/common/rpc/__init__.py index 1ce43d650..6f022b3b2 100644 --- a/nova/rpc/__init__.py +++ b/nova/openstack/common/rpc/__init__.py @@ -31,7 +31,7 @@ from nova.openstack.common import importutils rpc_opts = [ cfg.StrOpt('rpc_backend', - default='nova.rpc.impl_kombu', + default='%s.impl_kombu' % __package__, help="The messaging module to use, defaults to kombu."), cfg.IntOpt('rpc_thread_pool_size', default=64, @@ -47,9 +47,9 @@ rpc_opts = [ help='Seconds to wait before a cast expires (TTL). ' 'Only supported by impl_zmq.'), cfg.ListOpt('allowed_rpc_exception_modules', - default=['nova.exception'], - help='Modules of exceptions that are permitted to be recreated' - 'upon receiving exception data from an rpc call.'), + default=['openstack.common.exception', 'nova.exception'], + help='Modules of exceptions that are permitted to be recreated' + 'upon receiving exception data from an rpc call.'), cfg.StrOpt('control_exchange', default='nova', help='AMQP exchange to connect to if using RabbitMQ or Qpid'), @@ -72,7 +72,7 @@ def create_connection(new=True): implementation is free to return an existing connection from a pool. - :returns: An instance of nova.rpc.common.Connection + :returns: An instance of openstack.common.rpc.common.Connection """ return _get_impl().create_connection(cfg.CONF, new=new) @@ -84,8 +84,9 @@ def call(context, topic, msg, timeout=None): request. :param topic: The topic to send the rpc message to. This correlates to the topic argument of - nova.rpc.common.Connection.create_consumer() and only applies - when the consumer was created with fanout=False. + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=False. :param msg: This is a dict in the form { "method" : "method_to_invoke", "args" : dict_of_kwargs } :param timeout: int, number of seconds to use for a response timeout. @@ -93,8 +94,8 @@ def call(context, topic, msg, timeout=None): :returns: A dict from the remote method. - :raises: nova.rpc.common.Timeout if a complete response is not received - before the timeout is reached. + :raises: openstack.common.rpc.common.Timeout if a complete response + is not received before the timeout is reached. """ return _get_impl().call(cfg.CONF, context, topic, msg, timeout) @@ -106,8 +107,9 @@ def cast(context, topic, msg): request. :param topic: The topic to send the rpc message to. This correlates to the topic argument of - nova.rpc.common.Connection.create_consumer() and only applies - when the consumer was created with fanout=False. + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=False. :param msg: This is a dict in the form { "method" : "method_to_invoke", "args" : dict_of_kwargs } @@ -126,8 +128,9 @@ def fanout_cast(context, topic, msg): request. :param topic: The topic to send the rpc message to. This correlates to the topic argument of - nova.rpc.common.Connection.create_consumer() and only applies - when the consumer was created with fanout=True. + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=True. :param msg: This is a dict in the form { "method" : "method_to_invoke", "args" : dict_of_kwargs } @@ -147,8 +150,9 @@ def multicall(context, topic, msg, timeout=None): request. :param topic: The topic to send the rpc message to. This correlates to the topic argument of - nova.rpc.common.Connection.create_consumer() and only applies - when the consumer was created with fanout=False. + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=False. :param msg: This is a dict in the form { "method" : "method_to_invoke", "args" : dict_of_kwargs } :param timeout: int, number of seconds to use for a response timeout. @@ -159,8 +163,8 @@ def multicall(context, topic, msg, timeout=None): returned and X is the Nth value that was returned by the remote method. - :raises: nova.rpc.common.Timeout if a complete response is not received - before the timeout is reached. + :raises: openstack.common.rpc.common.Timeout if a complete response + is not received before the timeout is reached. """ return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) @@ -248,5 +252,11 @@ def _get_impl(): """Delay import of rpc_backend until configuration is loaded.""" global _RPCIMPL if _RPCIMPL is None: - _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend) + try: + _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend) + except ImportError: + # For backwards compatibility with older nova config. + impl = cfg.CONF.rpc_backend.replace('nova.rpc', + 'nova.openstack.common.rpc') + _RPCIMPL = importutils.import_module(impl) return _RPCIMPL diff --git a/nova/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py index 8e5d685d5..5f62d35c5 100644 --- a/nova/rpc/amqp.py +++ b/nova/openstack/common/rpc/amqp.py @@ -18,7 +18,7 @@ # under the License. """ -Shared code between AMQP based nova.rpc implementations. +Shared code between AMQP based openstack.common.rpc implementations. The code in this module is shared between the rpc implemenations based on AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses @@ -36,7 +36,7 @@ from eventlet import semaphore from nova.openstack.common import excutils from nova.openstack.common import local -import nova.rpc.common as rpc_common +from nova.openstack.common.rpc import common as rpc_common LOG = logging.getLogger(__name__) diff --git a/nova/rpc/common.py b/nova/openstack/common/rpc/common.py index 57f7053ee..0b927a0ee 100644 --- a/nova/rpc/common.py +++ b/nova/openstack/common/rpc/common.py @@ -19,8 +19,10 @@ import copy import logging +import sys import traceback +from nova.openstack.common import cfg from nova.openstack.common import importutils from nova.openstack.common import jsonutils from nova.openstack.common import local diff --git a/nova/rpc/dispatcher.py b/nova/openstack/common/rpc/dispatcher.py index 3f46398a9..1d36f32ea 100644 --- a/nova/rpc/dispatcher.py +++ b/nova/openstack/common/rpc/dispatcher.py @@ -42,7 +42,7 @@ there can be both versioned and unversioned APIs implemented in the same code base. """ -from nova.rpc import common as rpc_common +from nova.openstack.common.rpc import common as rpc_common class RpcDispatcher(object): diff --git a/nova/rpc/impl_fake.py b/nova/openstack/common/rpc/impl_fake.py index ea9303434..07e8bdc08 100644 --- a/nova/rpc/impl_fake.py +++ b/nova/openstack/common/rpc/impl_fake.py @@ -18,12 +18,12 @@ queues. Casts will block, but this is very useful for tests. """ import inspect +import json import time import eventlet -from nova.openstack.common import jsonutils -from nova.rpc import common as rpc_common +from nova.openstack.common.rpc import common as rpc_common CONSUMERS = {} @@ -121,7 +121,7 @@ def create_connection(conf, new=True): def check_serialize(msg): """Make sure a message intended for rpc can be serialized.""" - jsonutils.dumps(msg) + json.dumps(msg) def multicall(conf, context, topic, msg, timeout=None): diff --git a/nova/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py index ac4b412bb..66152a9d5 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/openstack/common/rpc/impl_kombu.py @@ -30,8 +30,8 @@ import kombu.entity import kombu.messaging from nova.openstack.common import cfg -from nova.rpc import amqp as rpc_amqp -from nova.rpc import common as rpc_common +from nova.openstack.common.rpc import amqp as rpc_amqp +from nova.openstack.common.rpc import common as rpc_common kombu_opts = [ cfg.StrOpt('kombu_ssl_version', @@ -139,10 +139,9 @@ class ConsumerBase(object): message = self.channel.message_to_python(raw_message) try: callback(message.payload) + message.ack() except Exception: LOG.exception(_("Failed to process message... skipping it.")) - finally: - message.ack() self.queue.consume(*args, callback=_callback, **options) diff --git a/nova/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py index c5ab4a1d5..fce8cd07b 100644 --- a/nova/rpc/impl_qpid.py +++ b/nova/openstack/common/rpc/impl_qpid.py @@ -17,6 +17,7 @@ import functools import itertools +import json import logging import time import uuid @@ -27,9 +28,9 @@ import qpid.messaging import qpid.messaging.exceptions from nova.openstack.common import cfg -from nova.openstack.common import jsonutils -from nova.rpc import amqp as rpc_amqp -from nova.rpc import common as rpc_common +from nova.openstack.common.gettextutils import _ +from nova.openstack.common.rpc import amqp as rpc_amqp +from nova.openstack.common.rpc import common as rpc_common LOG = logging.getLogger(__name__) @@ -124,7 +125,7 @@ class ConsumerBase(object): addr_opts["node"]["x-declare"].update(node_opts) addr_opts["link"]["x-declare"].update(link_opts) - self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) + self.address = "%s ; %s" % (node_name, json.dumps(addr_opts)) self.reconnect(session) @@ -227,7 +228,7 @@ class Publisher(object): if node_opts: addr_opts["node"]["x-declare"].update(node_opts) - self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) + self.address = "%s ; %s" % (node_name, json.dumps(addr_opts)) self.reconnect(session) diff --git a/nova/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py index 35ae0094c..77768dbec 100644 --- a/nova/rpc/impl_zmq.py +++ b/nova/openstack/common/rpc/impl_zmq.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import json import pprint import string import sys @@ -25,9 +26,9 @@ from eventlet.green import zmq import greenlet from nova.openstack.common import cfg +from nova.openstack.common.gettextutils import _ from nova.openstack.common import importutils -from nova.openstack.common import jsonutils -from nova.rpc import common as rpc_common +from nova.openstack.common.rpc import common as rpc_common # for convenience, are not modified. @@ -45,7 +46,7 @@ zmq_opts = [ # The module.Class to use for matchmaking. cfg.StrOpt('rpc_zmq_matchmaker', - default='nova.rpc.matchmaker.MatchMakerLocalhost', + default='openstack.common.rpc.matchmaker.MatchMakerLocalhost', help='MatchMaker driver'), # The following port is unassigned by IANA as of 2012-05-21 @@ -55,7 +56,7 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), - cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/nova', + cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', help='Directory for holding IPC sockets'), ] @@ -74,7 +75,7 @@ def _serialize(data): Error if a developer passes us bad data. """ try: - return str(jsonutils.dumps(data)) + return str(json.dumps(data, ensure_ascii=True)) except TypeError: LOG.error(_("JSON serialization failed.")) raise @@ -85,7 +86,7 @@ def _deserialize(data): Deserialization wrapper """ LOG.debug(_("Deserializing: %s"), data) - return jsonutils.loads(data) + return json.loads(data) class ZmqSocket(object): diff --git a/nova/rpc/matchmaker.py b/nova/openstack/common/rpc/matchmaker.py index f59e2555d..f59e2555d 100644 --- a/nova/rpc/matchmaker.py +++ b/nova/openstack/common/rpc/matchmaker.py diff --git a/nova/rpc/proxy.py b/nova/openstack/common/rpc/proxy.py index 79a90dc3a..cfa2b8cd0 100644 --- a/nova/rpc/proxy.py +++ b/nova/openstack/common/rpc/proxy.py @@ -22,7 +22,7 @@ For more information about rpc API version numbers, see: """ -from nova import rpc +from nova.openstack.common import rpc class RpcProxy(object): diff --git a/nova/scheduler/driver.py b/nova/scheduler/driver.py index 3e25e8516..9cc528049 100644 --- a/nova/scheduler/driver.py +++ b/nova/scheduler/driver.py @@ -33,8 +33,8 @@ from nova import notifications from nova.openstack.common import cfg from nova.openstack.common import importutils from nova.openstack.common import jsonutils +from nova.openstack.common import rpc from nova.openstack.common import timeutils -from nova import rpc from nova import utils diff --git a/nova/scheduler/rpcapi.py b/nova/scheduler/rpcapi.py index 0e4052a7a..dea17b912 100644 --- a/nova/scheduler/rpcapi.py +++ b/nova/scheduler/rpcapi.py @@ -19,13 +19,13 @@ Client side of the scheduler manager RPC API. """ from nova import flags -import nova.rpc.proxy +import nova.openstack.common.rpc.proxy FLAGS = flags.FLAGS -class SchedulerAPI(nova.rpc.proxy.RpcProxy): +class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy): '''Client side of the scheduler rpc API. API version history: diff --git a/nova/service.py b/nova/service.py index ab1fd339a..c701d3813 100644 --- a/nova/service.py +++ b/nova/service.py @@ -35,7 +35,7 @@ from nova import flags from nova import log as logging from nova.openstack.common import cfg from nova.openstack.common import importutils -from nova import rpc +from nova.openstack.common import rpc from nova import utils from nova import version from nova import wsgi diff --git a/nova/tests/api/ec2/test_cloud.py b/nova/tests/api/ec2/test_cloud.py index 26fb81faa..b981314dc 100644 --- a/nova/tests/api/ec2/test_cloud.py +++ b/nova/tests/api/ec2/test_cloud.py @@ -40,7 +40,7 @@ from nova.image import fake from nova.image import s3 from nova import log as logging from nova.network import api as network_api -from nova import rpc +from nova.openstack.common import rpc from nova import test from nova import utils diff --git a/nova/tests/api/ec2/test_ec2_validate.py b/nova/tests/api/ec2/test_ec2_validate.py index ee3d5bba0..d917c3352 100644 --- a/nova/tests/api/ec2/test_ec2_validate.py +++ b/nova/tests/api/ec2/test_ec2_validate.py @@ -25,7 +25,7 @@ from nova import flags from nova.image import fake from nova import log as logging from nova.openstack.common import importutils -from nova import rpc +from nova.openstack.common import rpc from nova import test LOG = logging.getLogger(__name__) diff --git a/nova/tests/api/openstack/compute/contrib/test_certificates.py b/nova/tests/api/openstack/compute/contrib/test_certificates.py index 803cf5601..9c9625e8e 100644 --- a/nova/tests/api/openstack/compute/contrib/test_certificates.py +++ b/nova/tests/api/openstack/compute/contrib/test_certificates.py @@ -17,7 +17,7 @@ from lxml import etree from nova.api.openstack.compute.contrib import certificates from nova import context -from nova import rpc +from nova.openstack.common import rpc from nova import test from nova.tests.api.openstack import fakes diff --git a/nova/tests/api/openstack/compute/contrib/test_disk_config.py b/nova/tests/api/openstack/compute/contrib/test_disk_config.py index 4b5c390e2..e63c28080 100644 --- a/nova/tests/api/openstack/compute/contrib/test_disk_config.py +++ b/nova/tests/api/openstack/compute/contrib/test_disk_config.py @@ -21,7 +21,7 @@ from nova.api.openstack import compute import nova.db.api from nova import flags from nova.openstack.common import jsonutils -import nova.rpc +import nova.openstack.common.rpc from nova import test from nova.tests.api.openstack import fakes diff --git a/nova/tests/api/openstack/compute/contrib/test_floating_ips.py b/nova/tests/api/openstack/compute/contrib/test_floating_ips.py index b1f87c6df..6ce0daa05 100644 --- a/nova/tests/api/openstack/compute/contrib/test_floating_ips.py +++ b/nova/tests/api/openstack/compute/contrib/test_floating_ips.py @@ -24,7 +24,7 @@ from nova import context from nova import db from nova import exception from nova import network -from nova import rpc +from nova.openstack.common import rpc from nova import test from nova.tests.api.openstack import fakes from nova.tests import fake_network diff --git a/nova/tests/api/openstack/compute/contrib/test_scheduler_hints.py b/nova/tests/api/openstack/compute/contrib/test_scheduler_hints.py index 5b545ec88..c651444f4 100644 --- a/nova/tests/api/openstack/compute/contrib/test_scheduler_hints.py +++ b/nova/tests/api/openstack/compute/contrib/test_scheduler_hints.py @@ -18,7 +18,7 @@ from nova.api.openstack import compute import nova.db.api from nova.openstack.common import jsonutils -import nova.rpc +import nova.openstack.common.rpc from nova import test from nova.tests.api.openstack import fakes diff --git a/nova/tests/api/openstack/compute/test_servers.py b/nova/tests/api/openstack/compute/test_servers.py index f0d06cf21..fbc60da68 100644 --- a/nova/tests/api/openstack/compute/test_servers.py +++ b/nova/tests/api/openstack/compute/test_servers.py @@ -37,7 +37,7 @@ from nova.db.sqlalchemy import models from nova import flags import nova.image.fake from nova.openstack.common import jsonutils -import nova.rpc +import nova.openstack.common.rpc from nova import test from nova.tests.api.openstack import fakes from nova.tests import fake_network @@ -1493,11 +1493,12 @@ class ServersControllerCreateTest(test.TestCase): self.stubs.Set(nova.db, 'instance_system_metadata_update', fake_method) self.stubs.Set(nova.db, 'instance_get', instance_get) - self.stubs.Set(nova.rpc, 'cast', fake_method) - self.stubs.Set(nova.rpc, 'call', rpc_call_wrapper) + self.stubs.Set(nova.openstack.common.rpc, 'cast', fake_method) + self.stubs.Set(nova.openstack.common.rpc, 'call', rpc_call_wrapper) self.stubs.Set(nova.db, 'instance_update_and_get_original', server_update) - self.stubs.Set(nova.rpc, 'queue_get_for', queue_get_for) + self.stubs.Set(nova.openstack.common.rpc, 'queue_get_for', + queue_get_for) self.stubs.Set(nova.network.manager.VlanManager, 'allocate_fixed_ip', fake_method) diff --git a/nova/tests/cert/test_rpcapi.py b/nova/tests/cert/test_rpcapi.py index 2e3feeaaf..1deb82c98 100644 --- a/nova/tests/cert/test_rpcapi.py +++ b/nova/tests/cert/test_rpcapi.py @@ -21,7 +21,7 @@ Unit Tests for nova.cert.rpcapi from nova.cert import rpcapi as cert_rpcapi from nova import context from nova import flags -from nova import rpc +from nova.openstack.common import rpc from nova import test diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py index 51cbb5b1f..e4d224584 100644 --- a/nova/tests/compute/test_compute.py +++ b/nova/tests/compute/test_compute.py @@ -45,10 +45,10 @@ from nova.notifier import test_notifier from nova.openstack.common import importutils from nova.openstack.common import policy as common_policy from nova.openstack.common import timeutils +from nova.openstack.common import rpc +from nova.openstack.common.rpc import common as rpc_common import nova.policy from nova import quota -from nova import rpc -from nova.rpc import common as rpc_common from nova.scheduler import driver as scheduler_driver from nova import test from nova.tests import fake_network diff --git a/nova/tests/compute/test_rpcapi.py b/nova/tests/compute/test_rpcapi.py index a0da63918..37427477f 100644 --- a/nova/tests/compute/test_rpcapi.py +++ b/nova/tests/compute/test_rpcapi.py @@ -21,7 +21,7 @@ Unit Tests for nova.compute.rpcapi from nova.compute import rpcapi as compute_rpcapi from nova import context from nova import flags -from nova import rpc +from nova.openstack.common import rpc from nova import test diff --git a/nova/tests/console/test_rpcapi.py b/nova/tests/console/test_rpcapi.py index 132657697..016451928 100644 --- a/nova/tests/console/test_rpcapi.py +++ b/nova/tests/console/test_rpcapi.py @@ -21,7 +21,7 @@ Unit Tests for nova.console.rpcapi from nova.console import rpcapi as console_rpcapi from nova import context from nova import flags -from nova import rpc +from nova.openstack.common import rpc from nova import test diff --git a/nova/tests/consoleauth/test_rpcapi.py b/nova/tests/consoleauth/test_rpcapi.py index 546183185..91626c765 100644 --- a/nova/tests/consoleauth/test_rpcapi.py +++ b/nova/tests/consoleauth/test_rpcapi.py @@ -21,7 +21,7 @@ Unit Tests for nova.consoleauth.rpcapi from nova.consoleauth import rpcapi as consoleauth_rpcapi from nova import context from nova import flags -from nova import rpc +from nova.openstack.common import rpc from nova import test diff --git a/nova/tests/fake_flags.py b/nova/tests/fake_flags.py index 2f5bc2a3b..3d05d7605 100644 --- a/nova/tests/fake_flags.py +++ b/nova/tests/fake_flags.py @@ -42,7 +42,7 @@ def set_defaults(conf): conf.set_default('iscsi_num_targets', 8) conf.set_default('network_size', 8) conf.set_default('num_networks', 2) - conf.set_default('rpc_backend', 'nova.rpc.impl_fake') + conf.set_default('rpc_backend', 'nova.openstack.common.rpc.impl_fake') conf.set_default('sql_connection', "sqlite://") conf.set_default('sqlite_synchronous', False) conf.set_default('use_ipv6', True) diff --git a/nova/tests/network/test_manager.py b/nova/tests/network/test_manager.py index 817b7271d..11e293291 100644 --- a/nova/tests/network/test_manager.py +++ b/nova/tests/network/test_manager.py @@ -27,8 +27,8 @@ from nova import log as logging from nova.network import linux_net from nova.network import manager as network_manager from nova.openstack.common import importutils +from nova.openstack.common import rpc import nova.policy -from nova import rpc from nova import test from nova.tests import fake_network from nova import utils diff --git a/nova/tests/rpc/__init__.py b/nova/tests/rpc/__init__.py deleted file mode 100644 index 7e04e7c73..000000000 --- a/nova/tests/rpc/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011 OpenStack LLC. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work -from nova.tests import * diff --git a/nova/tests/rpc/common.py b/nova/tests/rpc/common.py deleted file mode 100644 index 84dd79890..000000000 --- a/nova/tests/rpc/common.py +++ /dev/null @@ -1,321 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Unit Tests for remote procedure calls shared between all implementations -""" - -import time - -import eventlet -from eventlet import greenthread -import nose - -from nova import context -from nova import exception -from nova import flags -from nova import log as logging -from nova.rpc import amqp as rpc_amqp -from nova.rpc import common as rpc_common -from nova.rpc import dispatcher as rpc_dispatcher -from nova import test - - -FLAGS = flags.FLAGS -LOG = logging.getLogger(__name__) - - -class BaseRpcTestCase(test.TestCase): - def setUp(self, supports_timeouts=True, topic='test', - topic_nested='nested'): - super(BaseRpcTestCase, self).setUp() - self.topic = topic or self.topic - self.topic_nested = topic_nested or self.topic_nested - self.supports_timeouts = supports_timeouts - self.context = context.get_admin_context() - - if self.rpc: - receiver = TestReceiver() - self.conn = self._create_consumer(receiver, self.topic) - - def tearDown(self): - if self.rpc: - self.conn.close() - super(BaseRpcTestCase, self).tearDown() - - def _create_consumer(self, proxy, topic, fanout=False): - dispatcher = rpc_dispatcher.RpcDispatcher([proxy]) - conn = self.rpc.create_connection(FLAGS, True) - conn.create_consumer(topic, dispatcher, fanout) - conn.consume_in_thread() - return conn - - def test_call_succeed(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - value = 42 - result = self.rpc.call(FLAGS, self.context, self.topic, - {"method": "echo", "args": {"value": value}}) - self.assertEqual(value, result) - - def test_call_succeed_despite_multiple_returns_yield(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - value = 42 - result = self.rpc.call(FLAGS, self.context, self.topic, - {"method": "echo_three_times_yield", - "args": {"value": value}}) - self.assertEqual(value + 2, result) - - def test_multicall_succeed_once(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - value = 42 - result = self.rpc.multicall(FLAGS, self.context, - self.topic, - {"method": "echo", - "args": {"value": value}}) - for i, x in enumerate(result): - if i > 0: - self.fail('should only receive one response') - self.assertEqual(value + i, x) - - def test_multicall_three_nones(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - value = 42 - result = self.rpc.multicall(FLAGS, self.context, - self.topic, - {"method": "multicall_three_nones", - "args": {"value": value}}) - for i, x in enumerate(result): - self.assertEqual(x, None) - # i should have been 0, 1, and finally 2: - self.assertEqual(i, 2) - - def test_multicall_succeed_three_times_yield(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - value = 42 - result = self.rpc.multicall(FLAGS, self.context, - self.topic, - {"method": "echo_three_times_yield", - "args": {"value": value}}) - for i, x in enumerate(result): - self.assertEqual(value + i, x) - - def test_context_passed(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - """Makes sure a context is passed through rpc call.""" - value = 42 - result = self.rpc.call(FLAGS, self.context, - self.topic, {"method": "context", - "args": {"value": value}}) - self.assertEqual(self.context.to_dict(), result) - - def _test_cast(self, fanout=False): - """Test casts by pushing items through a channeled queue.""" - - # Not a true global, but capitalized so - # it is clear it is leaking scope into Nested() - QUEUE = eventlet.queue.Queue() - - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - # We use the nested topic so we don't need QUEUE to be a proper - # global, and do not keep state outside this test. - class Nested(object): - @staticmethod - def put_queue(context, value): - LOG.debug("Got value in put_queue: %s", value) - QUEUE.put(value) - - nested = Nested() - conn = self._create_consumer(nested, self.topic_nested, fanout) - value = 42 - - method = (self.rpc.cast, self.rpc.fanout_cast)[fanout] - method(FLAGS, self.context, - self.topic_nested, - {"method": "put_queue", - "args": {"value": value}}) - - try: - # If it does not succeed in 2 seconds, give up and assume - # failure. - result = QUEUE.get(True, 2) - except Exception: - self.assertEqual(value, None) - - conn.close() - self.assertEqual(value, result) - - def test_cast_success(self): - self._test_cast(False) - - def test_fanout_success(self): - self._test_cast(True) - - def test_nested_calls(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - """Test that we can do an rpc.call inside another call.""" - class Nested(object): - @staticmethod - def echo(context, queue, value): - """Calls echo in the passed queue.""" - LOG.debug(_("Nested received %(queue)s, %(value)s") - % locals()) - # TODO(comstud): - # so, it will replay the context and use the same REQID? - # that's bizarre. - ret = self.rpc.call(FLAGS, context, - queue, - {"method": "echo", - "args": {"value": value}}) - LOG.debug(_("Nested return %s"), ret) - return value - - nested = Nested() - conn = self._create_consumer(nested, self.topic_nested) - - value = 42 - result = self.rpc.call(FLAGS, self.context, - self.topic_nested, - {"method": "echo", - "args": {"queue": "test", "value": value}}) - conn.close() - self.assertEqual(value, result) - - def test_call_timeout(self): - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - """Make sure rpc.call will time out.""" - if not self.supports_timeouts: - raise nose.SkipTest(_("RPC backend does not support timeouts")) - - value = 42 - self.assertRaises(rpc_common.Timeout, - self.rpc.call, - FLAGS, self.context, - self.topic, - {"method": "block", - "args": {"value": value}}, timeout=1) - try: - self.rpc.call(FLAGS, self.context, - self.topic, - {"method": "block", - "args": {"value": value}}, - timeout=1) - self.fail("should have thrown Timeout") - except rpc_common.Timeout as exc: - pass - - -class BaseRpcAMQPTestCase(BaseRpcTestCase): - """Base test class for all AMQP-based RPC tests.""" - def test_proxycallback_handles_exceptions(self): - """Make sure exceptions unpacking messages don't cause hangs.""" - if not self.rpc: - raise nose.SkipTest('rpc driver not available.') - - orig_unpack = rpc_amqp.unpack_context - - info = {'unpacked': False} - - def fake_unpack_context(*args, **kwargs): - info['unpacked'] = True - raise test.TestingException('moo') - - self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context) - - value = 41 - self.rpc.cast(FLAGS, self.context, self.topic, - {"method": "echo", "args": {"value": value}}) - - # Wait for the cast to complete. - for x in xrange(50): - if info['unpacked']: - break - greenthread.sleep(0.1) - else: - self.fail("Timeout waiting for message to be consumed") - - # Now see if we get a response even though we raised an - # exception for the cast above. - self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack) - - value = 42 - result = self.rpc.call(FLAGS, self.context, self.topic, - {"method": "echo", - "args": {"value": value}}) - self.assertEqual(value, result) - - -class TestReceiver(object): - """Simple Proxy class so the consumer has methods to call. - - Uses static methods because we aren't actually storing any state. - - """ - @staticmethod - def echo(context, value): - """Simply returns whatever value is sent in.""" - LOG.debug(_("Received %s"), value) - return value - - @staticmethod - def context(context, value): - """Returns dictionary version of context.""" - LOG.debug(_("Received %s"), context) - return context.to_dict() - - @staticmethod - def multicall_three_nones(context, value): - yield None - yield None - yield None - - @staticmethod - def echo_three_times_yield(context, value): - yield value - yield value + 1 - yield value + 2 - - @staticmethod - def fail(context, value): - """Raises an exception with the value sent in.""" - raise NotImplementedError(value) - - @staticmethod - def fail_converted(context, value): - """Raises an exception with the value sent in.""" - raise exception.ConvertedException(explanation=value) - - @staticmethod - def block(context, value): - time.sleep(2) diff --git a/nova/tests/rpc/test_common.py b/nova/tests/rpc/test_common.py deleted file mode 100644 index 7a6199c0f..000000000 --- a/nova/tests/rpc/test_common.py +++ /dev/null @@ -1,144 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2012 OpenStack, LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Unit Tests for 'common' functons used through rpc code. -""" - -import sys - -from nova import exception -from nova import flags -from nova import log as logging -from nova.openstack.common import jsonutils -from nova.rpc import common as rpc_common -from nova import test - -FLAGS = flags.FLAGS -LOG = logging.getLogger(__name__) - - -def raise_exception(): - raise Exception("test") - - -class FakeUserDefinedException(Exception): - def __init__(self): - Exception.__init__(self, "Test Message") - - -class RpcCommonTestCase(test.TestCase): - def test_serialize_remote_exception(self): - expected = { - 'class': 'Exception', - 'module': 'exceptions', - 'message': 'test', - } - - try: - raise_exception() - except Exception as exc: - failure = rpc_common.serialize_remote_exception(sys.exc_info()) - - failure = jsonutils.loads(failure) - #assure the traceback was added - self.assertEqual(expected['class'], failure['class']) - self.assertEqual(expected['module'], failure['module']) - self.assertEqual(expected['message'], failure['message']) - - def test_serialize_remote_nova_exception(self): - def raise_nova_exception(): - raise exception.NovaException("test", code=500) - - expected = { - 'class': 'NovaException', - 'module': 'nova.exception', - 'kwargs': {'code': 500}, - 'message': 'test' - } - - try: - raise_nova_exception() - except Exception as exc: - failure = rpc_common.serialize_remote_exception(sys.exc_info()) - - failure = jsonutils.loads(failure) - #assure the traceback was added - self.assertEqual(expected['class'], failure['class']) - self.assertEqual(expected['module'], failure['module']) - self.assertEqual(expected['kwargs'], failure['kwargs']) - self.assertEqual(expected['message'], failure['message']) - - def test_deserialize_remote_exception(self): - failure = { - 'class': 'NovaException', - 'module': 'nova.exception', - 'message': 'test message', - 'tb': ['raise NovaException'], - } - serialized = jsonutils.dumps(failure) - - after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized) - self.assertTrue(isinstance(after_exc, exception.NovaException)) - self.assertTrue('test message' in unicode(after_exc)) - #assure the traceback was added - self.assertTrue('raise NovaException' in unicode(after_exc)) - - def test_deserialize_remote_exception_bad_module(self): - failure = { - 'class': 'popen2', - 'module': 'os', - 'kwargs': {'cmd': '/bin/echo failed'}, - 'message': 'foo', - } - serialized = jsonutils.dumps(failure) - - after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized) - self.assertTrue(isinstance(after_exc, rpc_common.RemoteError)) - - def test_deserialize_remote_exception_user_defined_exception(self): - """Ensure a user defined exception can be deserialized.""" - self.flags(allowed_rpc_exception_modules=[self.__class__.__module__]) - failure = { - 'class': 'FakeUserDefinedException', - 'module': self.__class__.__module__, - 'tb': ['raise FakeUserDefinedException'], - } - serialized = jsonutils.dumps(failure) - - after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized) - self.assertTrue(isinstance(after_exc, FakeUserDefinedException)) - #assure the traceback was added - self.assertTrue('raise FakeUserDefinedException' in unicode(after_exc)) - - def test_deserialize_remote_exception_cannot_recreate(self): - """Ensure a RemoteError is returned on initialization failure. - - If an exception cannot be recreated with it's original class then a - RemoteError with the exception informations should still be returned. - - """ - self.flags(allowed_rpc_exception_modules=[self.__class__.__module__]) - failure = { - 'class': 'FakeIDontExistException', - 'module': self.__class__.__module__, - 'tb': ['raise FakeIDontExistException'], - } - serialized = jsonutils.dumps(failure) - - after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized) - self.assertTrue(isinstance(after_exc, rpc_common.RemoteError)) - #assure the traceback was added - self.assertTrue('raise FakeIDontExistException' in unicode(after_exc)) diff --git a/nova/tests/rpc/test_dispatcher.py b/nova/tests/rpc/test_dispatcher.py deleted file mode 100644 index d67f7b8ec..000000000 --- a/nova/tests/rpc/test_dispatcher.py +++ /dev/null @@ -1,109 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2012, Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Unit Tests for rpc.dispatcher -""" - -from nova import context -from nova.rpc import common as rpc_common -from nova.rpc import dispatcher -from nova import test - - -class RpcDispatcherTestCase(test.TestCase): - class API1(object): - RPC_API_VERSION = '1.0' - - def __init__(self): - self.test_method_ctxt = None - self.test_method_arg1 = None - - def test_method(self, ctxt, arg1): - self.test_method_ctxt = ctxt - self.test_method_arg1 = arg1 - - class API2(object): - RPC_API_VERSION = '2.1' - - def __init__(self): - self.test_method_ctxt = None - self.test_method_arg1 = None - - def test_method(self, ctxt, arg1): - self.test_method_ctxt = ctxt - self.test_method_arg1 = arg1 - - class API3(object): - RPC_API_VERSION = '3.5' - - def __init__(self): - self.test_method_ctxt = None - self.test_method_arg1 = None - - def test_method(self, ctxt, arg1): - self.test_method_ctxt = ctxt - self.test_method_arg1 = arg1 - - def setUp(self): - self.ctxt = context.RequestContext('fake_user', 'fake_project') - super(RpcDispatcherTestCase, self).setUp() - - def tearDown(self): - super(RpcDispatcherTestCase, self).tearDown() - - def _test_dispatch(self, version, expectations): - v2 = self.API2() - v3 = self.API3() - disp = dispatcher.RpcDispatcher([v2, v3]) - - disp.dispatch(self.ctxt, version, 'test_method', arg1=1) - - self.assertEqual(v2.test_method_ctxt, expectations[0]) - self.assertEqual(v2.test_method_arg1, expectations[1]) - self.assertEqual(v3.test_method_ctxt, expectations[2]) - self.assertEqual(v3.test_method_arg1, expectations[3]) - - def test_dispatch(self): - self._test_dispatch('2.1', (self.ctxt, 1, None, None)) - self._test_dispatch('3.5', (None, None, self.ctxt, 1)) - - def test_dispatch_lower_minor_version(self): - self._test_dispatch('2.0', (self.ctxt, 1, None, None)) - self._test_dispatch('3.1', (None, None, self.ctxt, 1)) - - def test_dispatch_higher_minor_version(self): - self.assertRaises(rpc_common.UnsupportedRpcVersion, - self._test_dispatch, '2.6', (None, None, None, None)) - self.assertRaises(rpc_common.UnsupportedRpcVersion, - self._test_dispatch, '3.6', (None, None, None, None)) - - def test_dispatch_lower_major_version(self): - self.assertRaises(rpc_common.UnsupportedRpcVersion, - self._test_dispatch, '1.0', (None, None, None, None)) - - def test_dispatch_higher_major_version(self): - self.assertRaises(rpc_common.UnsupportedRpcVersion, - self._test_dispatch, '4.0', (None, None, None, None)) - - def test_dispatch_no_version_uses_v1(self): - v1 = self.API1() - disp = dispatcher.RpcDispatcher([v1]) - - disp.dispatch(self.ctxt, None, 'test_method', arg1=1) - - self.assertEqual(v1.test_method_ctxt, self.ctxt) - self.assertEqual(v1.test_method_arg1, 1) diff --git a/nova/tests/rpc/test_fake.py b/nova/tests/rpc/test_fake.py deleted file mode 100644 index 8d6878ca4..000000000 --- a/nova/tests/rpc/test_fake.py +++ /dev/null @@ -1,33 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Unit Tests for remote procedure calls using fake_impl -""" - -from nova import log as logging -from nova.rpc import impl_fake -from nova.tests.rpc import common - - -LOG = logging.getLogger(__name__) - - -class RpcFakeTestCase(common.BaseRpcTestCase): - def setUp(self): - self.rpc = impl_fake - super(RpcFakeTestCase, self).setUp() diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py deleted file mode 100644 index 0055f253c..000000000 --- a/nova/tests/rpc/test_kombu.py +++ /dev/null @@ -1,395 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Unit Tests for remote procedure calls using kombu -""" - -from nova import context -from nova import exception -from nova import flags -from nova import log as logging -from nova.rpc import amqp as rpc_amqp -from nova import test -from nova.tests.rpc import common - -try: - import kombu - from nova.rpc import impl_kombu -except ImportError: - kombu = None - impl_kombu = None - - -FLAGS = flags.FLAGS -LOG = logging.getLogger(__name__) - - -class MyException(Exception): - pass - - -def _raise_exc_stub(stubs, times, obj, method, exc_msg, - exc_class=MyException): - info = {'called': 0} - orig_method = getattr(obj, method) - - def _raise_stub(*args, **kwargs): - info['called'] += 1 - if info['called'] <= times: - raise exc_class(exc_msg) - orig_method(*args, **kwargs) - stubs.Set(obj, method, _raise_stub) - return info - - -class RpcKombuTestCase(common.BaseRpcAMQPTestCase): - def setUp(self): - if kombu: - self.rpc = impl_kombu - else: - self.rpc = None - super(RpcKombuTestCase, self).setUp() - - def tearDown(self): - if kombu: - impl_kombu.cleanup() - super(RpcKombuTestCase, self).tearDown() - - @test.skip_if(kombu is None, "Test requires kombu") - def test_reusing_connection(self): - """Test that reusing a connection returns same one.""" - conn_context = self.rpc.create_connection(FLAGS, new=False) - conn1 = conn_context.connection - conn_context.close() - conn_context = self.rpc.create_connection(FLAGS, new=False) - conn2 = conn_context.connection - conn_context.close() - self.assertEqual(conn1, conn2) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_topic_send_receive(self): - """Test sending to a topic exchange/queue""" - - conn = self.rpc.create_connection(FLAGS) - message = 'topic test message' - - self.received_message = None - - def _callback(message): - self.received_message = message - - conn.declare_topic_consumer('a_topic', _callback) - conn.topic_send('a_topic', message) - conn.consume(limit=1) - conn.close() - - self.assertEqual(self.received_message, message) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_topic_multiple_queues(self): - """Test sending to a topic exchange with multiple queues""" - - conn = self.rpc.create_connection(FLAGS) - message = 'topic test message' - - self.received_message_1 = None - self.received_message_2 = None - - def _callback1(message): - self.received_message_1 = message - - def _callback2(message): - self.received_message_2 = message - - conn.declare_topic_consumer('a_topic', _callback1, queue_name='queue1') - conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2') - conn.topic_send('a_topic', message) - conn.consume(limit=2) - conn.close() - - self.assertEqual(self.received_message_1, message) - self.assertEqual(self.received_message_2, message) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_direct_send_receive(self): - """Test sending to a direct exchange/queue""" - conn = self.rpc.create_connection(FLAGS) - message = 'direct test message' - - self.received_message = None - - def _callback(message): - self.received_message = message - - conn.declare_direct_consumer('a_direct', _callback) - conn.direct_send('a_direct', message) - conn.consume(limit=1) - conn.close() - - self.assertEqual(self.received_message, message) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_cast_interface_uses_default_options(self): - """Test kombu rpc.cast""" - - ctxt = context.RequestContext('fake_user', 'fake_project') - - class MyConnection(impl_kombu.Connection): - def __init__(myself, *args, **kwargs): - super(MyConnection, myself).__init__(*args, **kwargs) - self.assertEqual(myself.params, - {'hostname': FLAGS.rabbit_host, - 'userid': FLAGS.rabbit_userid, - 'password': FLAGS.rabbit_password, - 'port': FLAGS.rabbit_port, - 'virtual_host': FLAGS.rabbit_virtual_host, - 'transport': 'memory'}) - - def topic_send(_context, topic, msg): - pass - - MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection) - self.stubs.Set(impl_kombu, 'Connection', MyConnection) - - impl_kombu.cast(FLAGS, ctxt, 'fake_topic', {'msg': 'fake'}) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_cast_to_server_uses_server_params(self): - """Test kombu rpc.cast""" - - ctxt = context.RequestContext('fake_user', 'fake_project') - - server_params = {'username': 'fake_username', - 'password': 'fake_password', - 'hostname': 'fake_hostname', - 'port': 31337, - 'virtual_host': 'fake_virtual_host'} - - class MyConnection(impl_kombu.Connection): - def __init__(myself, *args, **kwargs): - super(MyConnection, myself).__init__(*args, **kwargs) - self.assertEqual(myself.params, - {'hostname': server_params['hostname'], - 'userid': server_params['username'], - 'password': server_params['password'], - 'port': server_params['port'], - 'virtual_host': server_params['virtual_host'], - 'transport': 'memory'}) - - def topic_send(_context, topic, msg): - pass - - MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection) - self.stubs.Set(impl_kombu, 'Connection', MyConnection) - - impl_kombu.cast_to_server(FLAGS, ctxt, server_params, - 'fake_topic', {'msg': 'fake'}) - - @test.skip_test("kombu memory transport seems buggy with fanout queues " - "as this test passes when you use rabbit (fake_rabbit=False)") - def test_fanout_send_receive(self): - """Test sending to a fanout exchange and consuming from 2 queues""" - - conn = self.rpc.create_connection() - conn2 = self.rpc.create_connection() - message = 'fanout test message' - - self.received_message = None - - def _callback(message): - self.received_message = message - - conn.declare_fanout_consumer('a_fanout', _callback) - conn2.declare_fanout_consumer('a_fanout', _callback) - conn.fanout_send('a_fanout', message) - - conn.consume(limit=1) - conn.close() - self.assertEqual(self.received_message, message) - - self.received_message = None - conn2.consume(limit=1) - conn2.close() - self.assertEqual(self.received_message, message) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_declare_consumer_errors_will_reconnect(self): - # Test that any exception with 'timeout' in it causes a - # reconnection - info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer, - '__init__', 'foo timeout foo') - - conn = self.rpc.Connection(FLAGS) - result = conn.declare_consumer(self.rpc.DirectConsumer, - 'test_topic', None) - - self.assertEqual(info['called'], 3) - self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) - - # Test that any exception in transport.connection_errors causes - # a reconnection - self.stubs.UnsetAll() - - info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer, - '__init__', 'meow') - - conn = self.rpc.Connection(FLAGS) - conn.connection_errors = (MyException, ) - - result = conn.declare_consumer(self.rpc.DirectConsumer, - 'test_topic', None) - - self.assertEqual(info['called'], 2) - self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_declare_consumer_ioerrors_will_reconnect(self): - """Test that an IOError exception causes a reconnection""" - info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer, - '__init__', 'Socket closed', exc_class=IOError) - - conn = self.rpc.Connection(FLAGS) - result = conn.declare_consumer(self.rpc.DirectConsumer, - 'test_topic', None) - - self.assertEqual(info['called'], 3) - self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_publishing_errors_will_reconnect(self): - # Test that any exception with 'timeout' in it causes a - # reconnection when declaring the publisher class and when - # calling send() - info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher, - '__init__', 'foo timeout foo') - - conn = self.rpc.Connection(FLAGS) - conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') - - self.assertEqual(info['called'], 3) - self.stubs.UnsetAll() - - info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher, - 'send', 'foo timeout foo') - - conn = self.rpc.Connection(FLAGS) - conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') - - self.assertEqual(info['called'], 3) - - # Test that any exception in transport.connection_errors causes - # a reconnection when declaring the publisher class and when - # calling send() - self.stubs.UnsetAll() - - info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher, - '__init__', 'meow') - - conn = self.rpc.Connection(FLAGS) - conn.connection_errors = (MyException, ) - - conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') - - self.assertEqual(info['called'], 2) - self.stubs.UnsetAll() - - info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher, - 'send', 'meow') - - conn = self.rpc.Connection(FLAGS) - conn.connection_errors = (MyException, ) - - conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') - - self.assertEqual(info['called'], 2) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_iterconsume_errors_will_reconnect(self): - conn = self.rpc.Connection(FLAGS) - message = 'reconnect test message' - - self.received_message = None - - def _callback(message): - self.received_message = message - - conn.declare_direct_consumer('a_direct', _callback) - conn.direct_send('a_direct', message) - - info = _raise_exc_stub(self.stubs, 1, conn.connection, - 'drain_events', 'foo timeout foo') - conn.consume(limit=1) - conn.close() - - self.assertEqual(self.received_message, message) - # Only called once, because our stub goes away during reconnection - - @test.skip_if(kombu is None, "Test requires kombu") - def test_call_exception(self): - """Test that exception gets passed back properly. - - rpc.call returns an Exception object. The value of the - exception is converted to a string. - - """ - self.flags(allowed_rpc_exception_modules=['exceptions']) - value = "This is the exception message" - self.assertRaises(NotImplementedError, - self.rpc.call, - FLAGS, - self.context, - 'test', - {"method": "fail", - "args": {"value": value}}) - try: - self.rpc.call(FLAGS, self.context, - 'test', - {"method": "fail", - "args": {"value": value}}) - self.fail("should have thrown Exception") - except NotImplementedError as exc: - self.assertTrue(value in unicode(exc)) - #Traceback should be included in exception message - self.assertTrue('raise NotImplementedError(value)' in unicode(exc)) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_call_converted_exception(self): - """Test that exception gets passed back properly. - - rpc.call returns an Exception object. The value of the - exception is converted to a string. - - """ - value = "This is the exception message" - self.assertRaises(exception.ConvertedException, - self.rpc.call, - FLAGS, - self.context, - 'test', - {"method": "fail_converted", - "args": {"value": value}}) - try: - self.rpc.call(FLAGS, self.context, - 'test', - {"method": "fail_converted", - "args": {"value": value}}) - self.fail("should have thrown Exception") - except exception.ConvertedException as exc: - self.assertTrue(value in unicode(exc)) - #Traceback should be included in exception message - self.assertTrue('exception.ConvertedException' in unicode(exc)) diff --git a/nova/tests/rpc/test_kombu_ssl.py b/nova/tests/rpc/test_kombu_ssl.py deleted file mode 100644 index f74d38492..000000000 --- a/nova/tests/rpc/test_kombu_ssl.py +++ /dev/null @@ -1,66 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Unit Tests for remote procedure calls using kombu + ssl -""" - -from nova import flags -from nova import test - -try: - import kombu - from nova.rpc import impl_kombu -except ImportError: - kombu = None - impl_kombu = None - - -# Flag settings we will ensure get passed to amqplib -SSL_VERSION = "SSLv2" -SSL_CERT = "/tmp/cert.blah.blah" -SSL_CA_CERT = "/tmp/cert.ca.blah.blah" -SSL_KEYFILE = "/tmp/keyfile.blah.blah" - -FLAGS = flags.FLAGS - - -class RpcKombuSslTestCase(test.TestCase): - - def setUp(self): - super(RpcKombuSslTestCase, self).setUp() - if kombu: - self.flags(kombu_ssl_keyfile=SSL_KEYFILE, - kombu_ssl_ca_certs=SSL_CA_CERT, - kombu_ssl_certfile=SSL_CERT, - kombu_ssl_version=SSL_VERSION, - rabbit_use_ssl=True) - - @test.skip_if(kombu is None, "Test requires kombu") - def test_ssl_on_extended(self): - rpc = impl_kombu - conn = rpc.create_connection(FLAGS, True) - c = conn.connection - #This might be kombu version dependent... - #Since we are now peaking into the internals of kombu... - self.assertTrue(isinstance(c.connection.ssl, dict)) - self.assertEqual(SSL_VERSION, c.connection.ssl.get("ssl_version")) - self.assertEqual(SSL_CERT, c.connection.ssl.get("certfile")) - self.assertEqual(SSL_CA_CERT, c.connection.ssl.get("ca_certs")) - self.assertEqual(SSL_KEYFILE, c.connection.ssl.get("keyfile")) - #That hash then goes into amqplib which then goes - #Into python ssl creation... diff --git a/nova/tests/rpc/test_matchmaker.py b/nova/tests/rpc/test_matchmaker.py deleted file mode 100644 index f7bffa67a..000000000 --- a/nova/tests/rpc/test_matchmaker.py +++ /dev/null @@ -1,58 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2012 Cloudscaling Group, Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from nova import log as logging -from nova.rpc import matchmaker -from nova import test - -LOG = logging.getLogger(__name__) - - -class _MatchMakerTestCase(test.TestCase): - def test_valid_host_matches(self): - queues = self.driver.queues(self.topic) - matched_hosts = map(lambda x: x[1], queues) - - for host in matched_hosts: - self.assertIn(host, self.hosts) - - def test_fanout_host_matches(self): - """For known hosts, see if they're in fanout.""" - queues = self.driver.queues("fanout~" + self.topic) - matched_hosts = map(lambda x: x[1], queues) - - LOG.info("Received result from matchmaker: %s", queues) - for host in self.hosts: - self.assertIn(host, matched_hosts) - - -class MatchMakerFileTestCase(_MatchMakerTestCase): - def setUp(self): - self.topic = "test" - self.hosts = ['hello', 'world', 'foo', 'bar', 'baz'] - ring = { - self.topic: self.hosts - } - self.driver = matchmaker.MatchMakerRing(ring) - super(MatchMakerFileTestCase, self).setUp() - - -class MatchMakerLocalhostTestCase(_MatchMakerTestCase): - def setUp(self): - self.driver = matchmaker.MatchMakerLocalhost() - self.topic = "test" - self.hosts = ['localhost'] - super(MatchMakerLocalhostTestCase, self).setUp() diff --git a/nova/tests/rpc/test_proxy.py b/nova/tests/rpc/test_proxy.py deleted file mode 100644 index 9ef504a0d..000000000 --- a/nova/tests/rpc/test_proxy.py +++ /dev/null @@ -1,124 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2012, Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Unit Tests for rpc.proxy -""" - -import copy - -from nova import context -from nova import rpc -from nova.rpc import proxy -from nova import test - - -class RpcProxyTestCase(test.TestCase): - - def setUp(self): - super(RpcProxyTestCase, self).setUp() - - def tearDown(self): - super(RpcProxyTestCase, self).tearDown() - - def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False, - server_params=None, supports_topic_override=True): - topic = 'fake_topic' - timeout = 123 - rpc_proxy = proxy.RpcProxy(topic, '1.0') - ctxt = context.RequestContext('fake_user', 'fake_project') - msg = {'method': 'fake_method', 'args': {'x': 'y'}} - expected_msg = {'method': 'fake_method', 'args': {'x': 'y'}, - 'version': '1.0'} - - expected_retval = 'hi' if has_retval else None - - self.fake_args = None - self.fake_kwargs = None - - def _fake_rpc_method(*args, **kwargs): - self.fake_args = args - self.fake_kwargs = kwargs - if has_retval: - return expected_retval - - self.stubs.Set(rpc, rpc_method, _fake_rpc_method) - - args = [ctxt, msg] - if server_params: - args.insert(1, server_params) - - # Base method usage - retval = getattr(rpc_proxy, rpc_method)(*args) - self.assertEqual(retval, expected_retval) - expected_args = [ctxt, topic, expected_msg] - if server_params: - expected_args.insert(1, server_params) - for arg, expected_arg in zip(self.fake_args, expected_args): - self.assertEqual(arg, expected_arg) - - # overriding the version - retval = getattr(rpc_proxy, rpc_method)(*args, version='1.1') - self.assertEqual(retval, expected_retval) - new_msg = copy.deepcopy(expected_msg) - new_msg['version'] = '1.1' - expected_args = [ctxt, topic, new_msg] - if server_params: - expected_args.insert(1, server_params) - for arg, expected_arg in zip(self.fake_args, expected_args): - self.assertEqual(arg, expected_arg) - - if has_timeout: - # set a timeout - retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=timeout) - self.assertEqual(retval, expected_retval) - expected_args = [ctxt, topic, expected_msg, timeout] - for arg, expected_arg in zip(self.fake_args, expected_args): - self.assertEqual(arg, expected_arg) - - if supports_topic_override: - # set a topic - new_topic = 'foo.bar' - retval = getattr(rpc_proxy, rpc_method)(*args, topic=new_topic) - self.assertEqual(retval, expected_retval) - expected_args = [ctxt, new_topic, expected_msg] - if server_params: - expected_args.insert(1, server_params) - for arg, expected_arg in zip(self.fake_args, expected_args): - self.assertEqual(arg, expected_arg) - - def test_call(self): - self._test_rpc_method('call', has_timeout=True, has_retval=True) - - def test_multicall(self): - self._test_rpc_method('multicall', has_timeout=True, has_retval=True) - - def test_cast(self): - self._test_rpc_method('cast') - - def test_fanout_cast(self): - self._test_rpc_method('fanout_cast', supports_topic_override=False) - - def test_cast_to_server(self): - self._test_rpc_method('cast_to_server', server_params={'blah': 1}) - - def test_fanout_cast_to_server(self): - self._test_rpc_method('fanout_cast_to_server', - server_params={'blah': 1}, supports_topic_override=False) - - def test_make_msg(self): - self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2), - {'method': 'test_method', 'args': {'a': 1, 'b': 2}}) diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py deleted file mode 100644 index b8553873b..000000000 --- a/nova/tests/rpc/test_qpid.py +++ /dev/null @@ -1,370 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# Copyright 2012, Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Unit Tests for remote procedure calls using qpid -""" - -import mox - -from nova import context -from nova import flags -from nova import log as logging -from nova.rpc import amqp as rpc_amqp -from nova import test - -try: - from nova.rpc import impl_qpid - import qpid -except ImportError: - qpid = None - impl_qpid = None - - -FLAGS = flags.FLAGS -LOG = logging.getLogger(__name__) - - -class RpcQpidTestCase(test.TestCase): - """ - Exercise the public API of impl_qpid utilizing mox. - - This set of tests utilizes mox to replace the Qpid objects and ensures - that the right operations happen on them when the various public rpc API - calls are exercised. The API calls tested here include: - - nova.rpc.create_connection() - nova.rpc.common.Connection.create_consumer() - nova.rpc.common.Connection.close() - nova.rpc.cast() - nova.rpc.fanout_cast() - nova.rpc.call() - nova.rpc.multicall() - """ - - def setUp(self): - super(RpcQpidTestCase, self).setUp() - - self.mock_connection = None - self.mock_session = None - self.mock_sender = None - self.mock_receiver = None - - if qpid: - self.orig_connection = qpid.messaging.Connection - self.orig_session = qpid.messaging.Session - self.orig_sender = qpid.messaging.Sender - self.orig_receiver = qpid.messaging.Receiver - qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection - qpid.messaging.Session = lambda *_x, **_y: self.mock_session - qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender - qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver - - def tearDown(self): - if qpid: - qpid.messaging.Connection = self.orig_connection - qpid.messaging.Session = self.orig_session - qpid.messaging.Sender = self.orig_sender - qpid.messaging.Receiver = self.orig_receiver - if impl_qpid: - # Need to reset this in case we changed the connection_cls - # in self._setup_to_server_tests() - impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection - - super(RpcQpidTestCase, self).tearDown() - - @test.skip_if(qpid is None, "Test requires qpid") - def test_create_connection(self): - self.mock_connection = self.mox.CreateMock(self.orig_connection) - self.mock_session = self.mox.CreateMock(self.orig_session) - - self.mock_connection.opened().AndReturn(False) - self.mock_connection.open() - self.mock_connection.session().AndReturn(self.mock_session) - self.mock_connection.close() - - self.mox.ReplayAll() - - connection = impl_qpid.create_connection(FLAGS) - connection.close() - - def _test_create_consumer(self, fanout): - self.mock_connection = self.mox.CreateMock(self.orig_connection) - self.mock_session = self.mox.CreateMock(self.orig_session) - self.mock_receiver = self.mox.CreateMock(self.orig_receiver) - - self.mock_connection.opened().AndReturn(False) - self.mock_connection.open() - self.mock_connection.session().AndReturn(self.mock_session) - if fanout: - # The link name includes a UUID, so match it with a regex. - expected_address = mox.Regex(r'^impl_qpid_test_fanout ; ' - '{"node": {"x-declare": {"auto-delete": true, "durable": ' - 'false, "type": "fanout"}, "type": "topic"}, "create": ' - '"always", "link": {"x-declare": {"auto-delete": true, ' - '"exclusive": true, "durable": false}, "durable": true, ' - '"name": "impl_qpid_test_fanout_.*"}}$') - else: - expected_address = ('nova/impl_qpid_test ; {"node": {"x-declare": ' - '{"auto-delete": true, "durable": true}, "type": "topic"}, ' - '"create": "always", "link": {"x-declare": {"auto-delete": ' - 'true, "exclusive": false, "durable": false}, "durable": ' - 'true, "name": "impl_qpid_test"}}') - self.mock_session.receiver(expected_address).AndReturn( - self.mock_receiver) - self.mock_receiver.capacity = 1 - self.mock_connection.close() - - self.mox.ReplayAll() - - connection = impl_qpid.create_connection(FLAGS) - connection.create_consumer("impl_qpid_test", - lambda *_x, **_y: None, - fanout) - connection.close() - - @test.skip_if(qpid is None, "Test requires qpid") - def test_create_consumer(self): - self._test_create_consumer(fanout=False) - - @test.skip_if(qpid is None, "Test requires qpid") - def test_create_consumer_fanout(self): - self._test_create_consumer(fanout=True) - - @test.skip_if(qpid is None, "Test requires qpid") - def test_create_worker(self): - self.mock_connection = self.mox.CreateMock(self.orig_connection) - self.mock_session = self.mox.CreateMock(self.orig_session) - self.mock_receiver = self.mox.CreateMock(self.orig_receiver) - - self.mock_connection.opened().AndReturn(False) - self.mock_connection.open() - self.mock_connection.session().AndReturn(self.mock_session) - expected_address = ( - 'nova/impl_qpid_test ; {"node": {"x-declare": ' - '{"auto-delete": true, "durable": true}, "type": "topic"}, ' - '"create": "always", "link": {"x-declare": {"auto-delete": ' - 'true, "exclusive": false, "durable": false}, "durable": ' - 'true, "name": "impl.qpid.test.workers"}}') - self.mock_session.receiver(expected_address).AndReturn( - self.mock_receiver) - self.mock_receiver.capacity = 1 - self.mock_connection.close() - - self.mox.ReplayAll() - - connection = impl_qpid.create_connection(FLAGS) - connection.create_worker("impl_qpid_test", - lambda *_x, **_y: None, - 'impl.qpid.test.workers', - ) - connection.close() - - def _test_cast(self, fanout, server_params=None): - self.mock_connection = self.mox.CreateMock(self.orig_connection) - self.mock_session = self.mox.CreateMock(self.orig_session) - self.mock_sender = self.mox.CreateMock(self.orig_sender) - - self.mock_connection.opened().AndReturn(False) - self.mock_connection.open() - - self.mock_connection.session().AndReturn(self.mock_session) - if fanout: - expected_address = ('impl_qpid_test_fanout ; ' - '{"node": {"x-declare": {"auto-delete": true, ' - '"durable": false, "type": "fanout"}, ' - '"type": "topic"}, "create": "always"}') - else: - expected_address = ('nova/impl_qpid_test ; {"node": {"x-declare": ' - '{"auto-delete": true, "durable": false}, "type": "topic"}, ' - '"create": "always"}') - self.mock_session.sender(expected_address).AndReturn(self.mock_sender) - self.mock_sender.send(mox.IgnoreArg()) - if not server_params: - # This is a pooled connection, so instead of closing it, it - # gets reset, which is just creating a new session on the - # connection. - self.mock_session.close() - self.mock_connection.session().AndReturn(self.mock_session) - - self.mox.ReplayAll() - - try: - ctx = context.RequestContext("user", "project") - - args = [FLAGS, ctx, "impl_qpid_test", - {"method": "test_method", "args": {}}] - - if server_params: - args.insert(2, server_params) - if fanout: - method = impl_qpid.fanout_cast_to_server - else: - method = impl_qpid.cast_to_server - else: - if fanout: - method = impl_qpid.fanout_cast - else: - method = impl_qpid.cast - - method(*args) - finally: - while impl_qpid.Connection.pool.free_items: - # Pull the mock connection object out of the connection pool so - # that it doesn't mess up other test cases. - impl_qpid.Connection.pool.get() - - @test.skip_if(qpid is None, "Test requires qpid") - def test_cast(self): - self._test_cast(fanout=False) - - @test.skip_if(qpid is None, "Test requires qpid") - def test_fanout_cast(self): - self._test_cast(fanout=True) - - def _setup_to_server_tests(self, server_params): - class MyConnection(impl_qpid.Connection): - def __init__(myself, *args, **kwargs): - super(MyConnection, myself).__init__(*args, **kwargs) - self.assertEqual(myself.connection.username, - server_params['username']) - self.assertEqual(myself.connection.password, - server_params['password']) - self.assertEqual(myself.broker, - server_params['hostname'] + ':' + - str(server_params['port'])) - - MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection) - self.stubs.Set(impl_qpid, 'Connection', MyConnection) - - @test.skip_if(qpid is None, "Test requires qpid") - def test_cast_to_server(self): - server_params = {'username': 'fake_username', - 'password': 'fake_password', - 'hostname': 'fake_hostname', - 'port': 31337} - self._setup_to_server_tests(server_params) - self._test_cast(fanout=False, server_params=server_params) - - @test.skip_if(qpid is None, "Test requires qpid") - def test_fanout_cast_to_server(self): - server_params = {'username': 'fake_username', - 'password': 'fake_password', - 'hostname': 'fake_hostname', - 'port': 31337} - self._setup_to_server_tests(server_params) - self._test_cast(fanout=True, server_params=server_params) - - def _test_call(self, multi): - self.mock_connection = self.mox.CreateMock(self.orig_connection) - self.mock_session = self.mox.CreateMock(self.orig_session) - self.mock_sender = self.mox.CreateMock(self.orig_sender) - self.mock_receiver = self.mox.CreateMock(self.orig_receiver) - - self.mock_connection.opened().AndReturn(False) - self.mock_connection.open() - self.mock_connection.session().AndReturn(self.mock_session) - rcv_addr = mox.Regex(r'^.*/.* ; {"node": {"x-declare": {"auto-delete":' - ' true, "durable": true, "type": "direct"}, "type": ' - '"topic"}, "create": "always", "link": {"x-declare": ' - '{"auto-delete": true, "exclusive": true, "durable": ' - 'false}, "durable": true, "name": ".*"}}') - self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver) - self.mock_receiver.capacity = 1 - send_addr = ('nova/impl_qpid_test ; {"node": {"x-declare": ' - '{"auto-delete": true, "durable": false}, "type": "topic"}, ' - '"create": "always"}') - self.mock_session.sender(send_addr).AndReturn(self.mock_sender) - self.mock_sender.send(mox.IgnoreArg()) - - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( - self.mock_receiver) - self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( - {"result": "foo", "failure": False, "ending": False})) - self.mock_session.acknowledge(mox.IgnoreArg()) - if multi: - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( - self.mock_receiver) - self.mock_receiver.fetch().AndReturn( - qpid.messaging.Message( - {"result": "bar", "failure": False, - "ending": False})) - self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( - self.mock_receiver) - self.mock_receiver.fetch().AndReturn( - qpid.messaging.Message( - {"result": "baz", "failure": False, - "ending": False})) - self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( - self.mock_receiver) - self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( - {"failure": False, "ending": True})) - self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.close() - self.mock_connection.session().AndReturn(self.mock_session) - - self.mox.ReplayAll() - - try: - ctx = context.RequestContext("user", "project") - - if multi: - method = impl_qpid.multicall - else: - method = impl_qpid.call - - res = method(FLAGS, ctx, "impl_qpid_test", - {"method": "test_method", "args": {}}) - - if multi: - self.assertEquals(list(res), ["foo", "bar", "baz"]) - else: - self.assertEquals(res, "foo") - finally: - while impl_qpid.Connection.pool.free_items: - # Pull the mock connection object out of the connection pool so - # that it doesn't mess up other test cases. - impl_qpid.Connection.pool.get() - - @test.skip_if(qpid is None, "Test requires qpid") - def test_call(self): - self._test_call(multi=False) - - @test.skip_if(qpid is None, "Test requires qpid") - def test_multicall(self): - self._test_call(multi=True) - - -# -#from nova.tests.rpc import common -# -# Qpid does not have a handy in-memory transport like kombu, so it's not -# terribly straight forward to take advantage of the common unit tests. -# However, at least at the time of this writing, the common unit tests all pass -# with qpidd running. -# -# class RpcQpidCommonTestCase(common._BaseRpcTestCase): -# def setUp(self): -# self.rpc = impl_qpid -# super(RpcQpidCommonTestCase, self).setUp() -# -# def tearDown(self): -# super(RpcQpidCommonTestCase, self).tearDown() -# diff --git a/nova/tests/rpc/test_zmq.py b/nova/tests/rpc/test_zmq.py deleted file mode 100644 index aa7032850..000000000 --- a/nova/tests/rpc/test_zmq.py +++ /dev/null @@ -1,128 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2012 Cloudscaling Group, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Unit Tests for remote procedure calls using zeromq -""" - -import os - -from nova import exception -from nova import flags -from nova import log as logging -from nova import rpc -from nova import test -from nova.tests.rpc import common -from nova import utils - -try: - from eventlet.green import zmq - from nova.rpc import impl_zmq -except ImportError: - zmq = None - impl_zmq = None - -LOG = logging.getLogger(__name__) -FLAGS = flags.FLAGS - - -class _RpcZmqBaseTestCase(common.BaseRpcTestCase): - @test.skip_if(zmq is None, "Test requires zmq") - def setUp(self, topic='test', topic_nested='nested'): - if not impl_zmq: - return None - - self.reactor = None - FLAGS.register_opts(rpc.rpc_opts) - self.rpc = impl_zmq - self.rpc.register_opts(FLAGS) - FLAGS.set_default('rpc_zmq_matchmaker', - 'mod_matchmaker.MatchMakerLocalhost') - - # We'll change this if we detect no daemon running. - ipc_dir = FLAGS.rpc_zmq_ipc_dir - - # Only launch the router if it isn't running independently. - if not os.path.exists(os.path.join(ipc_dir, "zmq_topic_zmq_replies")): - LOG.info(_("Running internal zmq receiver.")) - # The normal ipc_dir default needs to run as root, - # /tmp is easier within a testing environment. - FLAGS.set_default('rpc_zmq_ipc_dir', '/tmp/nova-zmq.ipc.test') - - # Value has changed. - ipc_dir = FLAGS.rpc_zmq_ipc_dir - - try: - # Only launch the receiver if it isn't running independently. - # This is checked again, with the (possibly) new ipc_dir. - if os.path.exists(os.path.join(ipc_dir, "zmq_topic_zmq_replies")): - LOG.warning(_("Detected zmq-receiver socket. " - "Assuming nova-rpc-zmq-receiver is running.")) - return - - if not os.path.isdir(ipc_dir): - os.mkdir(ipc_dir) - - self.reactor = impl_zmq.ZmqProxy(FLAGS) - consume_in = "tcp://%s:%s" % \ - (FLAGS.rpc_zmq_bind_address, - FLAGS.rpc_zmq_port) - consumption_proxy = impl_zmq.InternalContext(None) - - self.reactor.register(consumption_proxy, - consume_in, zmq.PULL, out_bind=True) - self.reactor.consume_in_thread() - except zmq.ZMQError: - assert False, _("Could not create ZeroMQ receiver daemon. " - "Socket may already be in use.") - except OSError: - assert False, _("Could not create IPC directory %s") % \ - (ipc_dir, ) - finally: - super(_RpcZmqBaseTestCase, self).setUp( - topic=topic, topic_nested=topic_nested) - - def tearDown(self): - if not impl_zmq: - return None - if self.reactor: - self.reactor.close() - - try: - utils.execute('rm', '-rf', FLAGS.rpc_zmq_ipc_dir) - except exception.ProcessExecutionError: - pass - - super(_RpcZmqBaseTestCase, self).tearDown() - - -class RpcZmqBaseTopicTestCase(_RpcZmqBaseTestCase): - """ - This tests with topics such as 'test' and 'nested', - without any .host appended. Stresses the matchmaker. - """ - pass - - -class RpcZmqDirectTopicTestCase(_RpcZmqBaseTestCase): - """ - Test communication directly to a host, - tests use 'localhost'. - """ - def setUp(self): - super(RpcZmqDirectTopicTestCase, self).setUp( - topic='test.localhost', - topic_nested='nested.localhost') diff --git a/nova/tests/scheduler/test_rpcapi.py b/nova/tests/scheduler/test_rpcapi.py index 3617cecdf..fed367c70 100644 --- a/nova/tests/scheduler/test_rpcapi.py +++ b/nova/tests/scheduler/test_rpcapi.py @@ -20,7 +20,7 @@ Unit Tests for nova.scheduler.rpcapi from nova import context from nova import flags -from nova import rpc +from nova.openstack.common import rpc from nova.scheduler import rpcapi as scheduler_rpcapi from nova import test diff --git a/nova/tests/scheduler/test_scheduler.py b/nova/tests/scheduler/test_scheduler.py index 2c2598dbc..9979c5e04 100644 --- a/nova/tests/scheduler/test_scheduler.py +++ b/nova/tests/scheduler/test_scheduler.py @@ -28,9 +28,9 @@ from nova import db from nova import exception from nova import flags from nova.openstack.common import jsonutils +from nova.openstack.common import rpc +from nova.openstack.common.rpc import common as rpc_common from nova.openstack.common import timeutils -from nova import rpc -from nova.rpc import common as rpc_common from nova.scheduler import driver from nova.scheduler import manager from nova import test diff --git a/nova/tests/test_notifier.py b/nova/tests/test_notifier.py index 7a74c74e8..773ffe49b 100644 --- a/nova/tests/test_notifier.py +++ b/nova/tests/test_notifier.py @@ -73,7 +73,7 @@ class NotifierTestCase(test.TestCase): def mock_notify(cls, *args): self.mock_notify = True - self.stubs.Set(nova.rpc, 'notify', mock_notify) + self.stubs.Set(nova.openstack.common.rpc, 'notify', mock_notify) notifier_api.notify(ctxt, 'publisher_id', 'event_type', nova.notifier.api.WARN, dict(a=3)) @@ -96,7 +96,7 @@ class NotifierTestCase(test.TestCase): def mock_notify(context, topic, msg): self.test_topic = topic - self.stubs.Set(nova.rpc, 'notify', mock_notify) + self.stubs.Set(nova.openstack.common.rpc, 'notify', mock_notify) notifier_api.notify(ctxt, 'publisher_id', 'event_type', 'DEBUG', dict(a=3)) self.assertEqual(self.test_topic, 'testnotify.debug') @@ -112,7 +112,7 @@ class NotifierTestCase(test.TestCase): def mock_notify(context, topic, data): msgs.append(data) - self.stubs.Set(nova.rpc, 'notify', mock_notify) + self.stubs.Set(nova.openstack.common.rpc, 'notify', mock_notify) LOG.error('foo') self.assertEqual(1, len(msgs)) msg = msgs[0] diff --git a/nova/tests/test_quota.py b/nova/tests/test_quota.py index d65bc89a9..54489918b 100644 --- a/nova/tests/test_quota.py +++ b/nova/tests/test_quota.py @@ -26,9 +26,9 @@ from nova.db.sqlalchemy import api as sqa_api from nova.db.sqlalchemy import models as sqa_models from nova import exception from nova import flags +from nova.openstack.common import rpc from nova.openstack.common import timeutils from nova import quota -from nova import rpc from nova.scheduler import driver as scheduler_driver from nova import test from nova import volume diff --git a/nova/tests/test_test.py b/nova/tests/test_test.py index 3482ff6a0..f89a5bb94 100644 --- a/nova/tests/test_test.py +++ b/nova/tests/test_test.py @@ -18,7 +18,7 @@ """Tests for the testing base code.""" -from nova import rpc +from nova.openstack.common import rpc from nova import test diff --git a/nova/tests/test_volume.py b/nova/tests/test_volume.py index c8788f3ad..0aac9e8cf 100644 --- a/nova/tests/test_volume.py +++ b/nova/tests/test_volume.py @@ -31,9 +31,9 @@ from nova import flags from nova import log as logging from nova.notifier import test_notifier from nova.openstack.common import importutils +from nova.openstack.common import rpc import nova.policy from nova import quota -from nova import rpc from nova import test import nova.volume.api diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py index 6b12934ff..7b0c576bf 100644 --- a/nova/virt/xenapi/pool.py +++ b/nova/virt/xenapi/pool.py @@ -28,7 +28,7 @@ from nova import flags from nova import log as logging from nova.openstack.common import cfg from nova.openstack.common import jsonutils -from nova import rpc +from nova.openstack.common import rpc from nova.virt.xenapi import vm_utils LOG = logging.getLogger(__name__) diff --git a/nova/volume/api.py b/nova/volume/api.py index d8dbd4a6b..748b2d16b 100644 --- a/nova/volume/api.py +++ b/nova/volume/api.py @@ -26,10 +26,10 @@ from nova.db import base from nova import exception from nova import flags from nova import log as logging +from nova.openstack.common import rpc from nova.openstack.common import timeutils import nova.policy from nova import quota -from nova import rpc FLAGS = flags.FLAGS flags.DECLARE('storage_availability_zone', 'nova.volume.manager') diff --git a/openstack-common.conf b/openstack-common.conf index e9681f705..efbecba3b 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,7 +1,7 @@ [DEFAULT] # The list of modules to copy from openstack-common -modules=cfg,excutils,importutils,iniparser,jsonutils,local,policy,setup,timeutils +modules=cfg,excutils,importutils,iniparser,jsonutils,local,policy,setup,timeutils,rpc # The base module to hold the copy of openstack.common base=nova |