summaryrefslogtreecommitdiffstats
path: root/nova
diff options
context:
space:
mode:
authorRussell Bryant <rbryant@redhat.com>2012-06-13 10:48:54 -0400
committerRussell Bryant <rbryant@redhat.com>2012-06-20 12:57:21 -0400
commitba3754e3ff672a877d90c78486c7f4d5fd4bf7b0 (patch)
tree47f35e1ce9c22ec66155986484e54acb4089efdf /nova
parent83e6cf7b92ae6a845939adf1771f0422a5e5f2ca (diff)
downloadnova-ba3754e3ff672a877d90c78486c7f4d5fd4bf7b0.tar.gz
nova-ba3754e3ff672a877d90c78486c7f4d5fd4bf7b0.tar.xz
nova-ba3754e3ff672a877d90c78486c7f4d5fd4bf7b0.zip
Use rpc from openstack-common.
Final patch for blueprint common-rpc. This patch removes nova.rpc in favor of the copy in openstack-common. Change-Id: I9c2f6bdbe8cd0c44417f75284131dbf3c126d1dd
Diffstat (limited to 'nova')
-rw-r--r--nova/api/openstack/compute/servers.py2
-rw-r--r--nova/cert/rpcapi.py4
-rw-r--r--nova/compute/manager.py2
-rw-r--r--nova/compute/rpcapi.py8
-rw-r--r--nova/console/api.py2
-rw-r--r--nova/console/rpcapi.py4
-rw-r--r--nova/consoleauth/rpcapi.py4
-rw-r--r--nova/manager.py2
-rw-r--r--nova/network/api.py2
-rw-r--r--nova/network/manager.py2
-rw-r--r--nova/network/quantum/manager.py2
-rw-r--r--nova/notifier/rabbit_notifier.py2
-rw-r--r--nova/openstack/common/rpc/__init__.py (renamed from nova/rpc/__init__.py)46
-rw-r--r--nova/openstack/common/rpc/amqp.py (renamed from nova/rpc/amqp.py)4
-rw-r--r--nova/openstack/common/rpc/common.py (renamed from nova/rpc/common.py)2
-rw-r--r--nova/openstack/common/rpc/dispatcher.py (renamed from nova/rpc/dispatcher.py)2
-rw-r--r--nova/openstack/common/rpc/impl_fake.py (renamed from nova/rpc/impl_fake.py)6
-rw-r--r--nova/openstack/common/rpc/impl_kombu.py (renamed from nova/rpc/impl_kombu.py)7
-rw-r--r--nova/openstack/common/rpc/impl_qpid.py (renamed from nova/rpc/impl_qpid.py)11
-rw-r--r--nova/openstack/common/rpc/impl_zmq.py (renamed from nova/rpc/impl_zmq.py)13
-rw-r--r--nova/openstack/common/rpc/matchmaker.py (renamed from nova/rpc/matchmaker.py)0
-rw-r--r--nova/openstack/common/rpc/proxy.py (renamed from nova/rpc/proxy.py)2
-rw-r--r--nova/scheduler/driver.py2
-rw-r--r--nova/scheduler/rpcapi.py4
-rw-r--r--nova/service.py2
-rw-r--r--nova/tests/api/ec2/test_cloud.py2
-rw-r--r--nova/tests/api/ec2/test_ec2_validate.py2
-rw-r--r--nova/tests/api/openstack/compute/contrib/test_certificates.py2
-rw-r--r--nova/tests/api/openstack/compute/contrib/test_disk_config.py2
-rw-r--r--nova/tests/api/openstack/compute/contrib/test_floating_ips.py2
-rw-r--r--nova/tests/api/openstack/compute/contrib/test_scheduler_hints.py2
-rw-r--r--nova/tests/api/openstack/compute/test_servers.py9
-rw-r--r--nova/tests/cert/test_rpcapi.py2
-rw-r--r--nova/tests/compute/test_compute.py4
-rw-r--r--nova/tests/compute/test_rpcapi.py2
-rw-r--r--nova/tests/console/test_rpcapi.py2
-rw-r--r--nova/tests/consoleauth/test_rpcapi.py2
-rw-r--r--nova/tests/fake_flags.py2
-rw-r--r--nova/tests/network/test_manager.py2
-rw-r--r--nova/tests/rpc/__init__.py19
-rw-r--r--nova/tests/rpc/common.py321
-rw-r--r--nova/tests/rpc/test_common.py144
-rw-r--r--nova/tests/rpc/test_dispatcher.py109
-rw-r--r--nova/tests/rpc/test_fake.py33
-rw-r--r--nova/tests/rpc/test_kombu.py395
-rw-r--r--nova/tests/rpc/test_kombu_ssl.py66
-rw-r--r--nova/tests/rpc/test_matchmaker.py58
-rw-r--r--nova/tests/rpc/test_proxy.py124
-rw-r--r--nova/tests/rpc/test_qpid.py370
-rw-r--r--nova/tests/rpc/test_zmq.py128
-rw-r--r--nova/tests/scheduler/test_rpcapi.py2
-rw-r--r--nova/tests/scheduler/test_scheduler.py4
-rw-r--r--nova/tests/test_notifier.py6
-rw-r--r--nova/tests/test_quota.py2
-rw-r--r--nova/tests/test_test.py2
-rw-r--r--nova/tests/test_volume.py2
-rw-r--r--nova/virt/xenapi/pool.py2
-rw-r--r--nova/volume/api.py2
58 files changed, 105 insertions, 1858 deletions
diff --git a/nova/api/openstack/compute/servers.py b/nova/api/openstack/compute/servers.py
index f0b1c355c..69f1b2f3d 100644
--- a/nova/api/openstack/compute/servers.py
+++ b/nova/api/openstack/compute/servers.py
@@ -32,8 +32,8 @@ from nova.compute import instance_types
from nova import exception
from nova import flags
from nova import log as logging
+from nova.openstack.common.rpc import common as rpc_common
from nova.openstack.common import timeutils
-from nova.rpc import common as rpc_common
from nova import utils
diff --git a/nova/cert/rpcapi.py b/nova/cert/rpcapi.py
index d062026da..cbf1af61f 100644
--- a/nova/cert/rpcapi.py
+++ b/nova/cert/rpcapi.py
@@ -19,13 +19,13 @@ Client side of the cert manager RPC API.
"""
from nova import flags
-import nova.rpc.proxy
+import nova.openstack.common.rpc.proxy
FLAGS = flags.FLAGS
-class CertAPI(nova.rpc.proxy.RpcProxy):
+class CertAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the cert rpc API.
API version history:
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index f33dd3162..7e5070bce 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -68,8 +68,8 @@ from nova.openstack.common import cfg
from nova.openstack.common import excutils
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
+from nova.openstack.common import rpc
from nova.openstack.common import timeutils
-from nova import rpc
from nova import utils
from nova.virt import driver
from nova import volume
diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py
index e7945c7d4..7a531fcf5 100644
--- a/nova/compute/rpcapi.py
+++ b/nova/compute/rpcapi.py
@@ -20,8 +20,8 @@ Client side of the compute RPC API.
from nova import exception
from nova import flags
-from nova import rpc
-import nova.rpc.proxy
+from nova.openstack.common import rpc
+import nova.openstack.common.rpc.proxy
FLAGS = flags.FLAGS
@@ -48,7 +48,7 @@ def _compute_topic(topic, ctxt, host, instance):
return rpc.queue_get_for(ctxt, topic, host)
-class ComputeAPI(nova.rpc.proxy.RpcProxy):
+class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the compute rpc API.
API version history:
@@ -358,7 +358,7 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy):
topic=_compute_topic(self.topic, ctxt, None, instance))
-class SecurityGroupAPI(nova.rpc.proxy.RpcProxy):
+class SecurityGroupAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the security group rpc API.
API version history:
diff --git a/nova/console/api.py b/nova/console/api.py
index 46718a49e..ecbdbf2fa 100644
--- a/nova/console/api.py
+++ b/nova/console/api.py
@@ -21,7 +21,7 @@ from nova.compute import rpcapi as compute_rpcapi
from nova.console import rpcapi as console_rpcapi
from nova.db import base
from nova import flags
-from nova import rpc
+from nova.openstack.common import rpc
from nova import utils
diff --git a/nova/console/rpcapi.py b/nova/console/rpcapi.py
index 8f0c5b97f..66c963403 100644
--- a/nova/console/rpcapi.py
+++ b/nova/console/rpcapi.py
@@ -19,13 +19,13 @@ Client side of the console RPC API.
"""
from nova import flags
-import nova.rpc.proxy
+import nova.openstack.common.rpc.proxy
FLAGS = flags.FLAGS
-class ConsoleAPI(nova.rpc.proxy.RpcProxy):
+class ConsoleAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the console rpc API.
API version history:
diff --git a/nova/consoleauth/rpcapi.py b/nova/consoleauth/rpcapi.py
index 5cb940a34..c6a02e949 100644
--- a/nova/consoleauth/rpcapi.py
+++ b/nova/consoleauth/rpcapi.py
@@ -19,13 +19,13 @@ Client side of the consoleauth RPC API.
"""
from nova import flags
-import nova.rpc.proxy
+import nova.openstack.common.rpc.proxy
FLAGS = flags.FLAGS
-class ConsoleAuthAPI(nova.rpc.proxy.RpcProxy):
+class ConsoleAuthAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the consoleauth rpc API.
API version history:
diff --git a/nova/manager.py b/nova/manager.py
index a63b6cfcd..1d73040b4 100644
--- a/nova/manager.py
+++ b/nova/manager.py
@@ -56,7 +56,7 @@ This module provides Manager, a base class for managers.
from nova.db import base
from nova import flags
from nova import log as logging
-from nova.rpc import dispatcher as rpc_dispatcher
+from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import version
diff --git a/nova/network/api.py b/nova/network/api.py
index 70ad5d4c2..00f694d7a 100644
--- a/nova/network/api.py
+++ b/nova/network/api.py
@@ -23,7 +23,7 @@ from nova.db import base
from nova import flags
from nova import log as logging
from nova.network import model as network_model
-from nova import rpc
+from nova.openstack.common import rpc
FLAGS = flags.FLAGS
diff --git a/nova/network/manager.py b/nova/network/manager.py
index 684a9e79d..fb37499e4 100644
--- a/nova/network/manager.py
+++ b/nova/network/manager.py
@@ -67,10 +67,10 @@ from nova.openstack.common import cfg
from nova.openstack.common import excutils
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
+from nova.openstack.common import rpc
from nova.openstack.common import timeutils
import nova.policy
from nova import quota
-from nova import rpc
from nova import utils
diff --git a/nova/network/quantum/manager.py b/nova/network/quantum/manager.py
index 73a484570..95ee06c9f 100644
--- a/nova/network/quantum/manager.py
+++ b/nova/network/quantum/manager.py
@@ -28,7 +28,7 @@ from nova.network import manager
from nova.network.quantum import melange_ipam_lib
from nova.network.quantum import quantum_connection
from nova.openstack.common import cfg
-from nova import rpc
+from nova.openstack.common import rpc
from nova import utils
LOG = logging.getLogger(__name__)
diff --git a/nova/notifier/rabbit_notifier.py b/nova/notifier/rabbit_notifier.py
index cf39ba9be..27f6ea209 100644
--- a/nova/notifier/rabbit_notifier.py
+++ b/nova/notifier/rabbit_notifier.py
@@ -19,7 +19,7 @@ import nova.context
from nova import flags
from nova import log as logging
from nova.openstack.common import cfg
-from nova import rpc
+from nova.openstack.common import rpc
LOG = logging.getLogger(__name__)
diff --git a/nova/rpc/__init__.py b/nova/openstack/common/rpc/__init__.py
index 1ce43d650..6f022b3b2 100644
--- a/nova/rpc/__init__.py
+++ b/nova/openstack/common/rpc/__init__.py
@@ -31,7 +31,7 @@ from nova.openstack.common import importutils
rpc_opts = [
cfg.StrOpt('rpc_backend',
- default='nova.rpc.impl_kombu',
+ default='%s.impl_kombu' % __package__,
help="The messaging module to use, defaults to kombu."),
cfg.IntOpt('rpc_thread_pool_size',
default=64,
@@ -47,9 +47,9 @@ rpc_opts = [
help='Seconds to wait before a cast expires (TTL). '
'Only supported by impl_zmq.'),
cfg.ListOpt('allowed_rpc_exception_modules',
- default=['nova.exception'],
- help='Modules of exceptions that are permitted to be recreated'
- 'upon receiving exception data from an rpc call.'),
+ default=['openstack.common.exception', 'nova.exception'],
+ help='Modules of exceptions that are permitted to be recreated'
+ 'upon receiving exception data from an rpc call.'),
cfg.StrOpt('control_exchange',
default='nova',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
@@ -72,7 +72,7 @@ def create_connection(new=True):
implementation is free to return an existing connection from a
pool.
- :returns: An instance of nova.rpc.common.Connection
+ :returns: An instance of openstack.common.rpc.common.Connection
"""
return _get_impl().create_connection(cfg.CONF, new=new)
@@ -84,8 +84,9 @@ def call(context, topic, msg, timeout=None):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
- nova.rpc.common.Connection.create_consumer() and only applies
- when the consumer was created with fanout=False.
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
@@ -93,8 +94,8 @@ def call(context, topic, msg, timeout=None):
:returns: A dict from the remote method.
- :raises: nova.rpc.common.Timeout if a complete response is not received
- before the timeout is reached.
+ :raises: openstack.common.rpc.common.Timeout if a complete response
+ is not received before the timeout is reached.
"""
return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
@@ -106,8 +107,9 @@ def cast(context, topic, msg):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
- nova.rpc.common.Connection.create_consumer() and only applies
- when the consumer was created with fanout=False.
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
@@ -126,8 +128,9 @@ def fanout_cast(context, topic, msg):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
- nova.rpc.common.Connection.create_consumer() and only applies
- when the consumer was created with fanout=True.
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=True.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
@@ -147,8 +150,9 @@ def multicall(context, topic, msg, timeout=None):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
- nova.rpc.common.Connection.create_consumer() and only applies
- when the consumer was created with fanout=False.
+ openstack.common.rpc.common.Connection.create_consumer()
+ and only applies when the consumer was created with
+ fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
@@ -159,8 +163,8 @@ def multicall(context, topic, msg, timeout=None):
returned and X is the Nth value that was returned by the remote
method.
- :raises: nova.rpc.common.Timeout if a complete response is not received
- before the timeout is reached.
+ :raises: openstack.common.rpc.common.Timeout if a complete response
+ is not received before the timeout is reached.
"""
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
@@ -248,5 +252,11 @@ def _get_impl():
"""Delay import of rpc_backend until configuration is loaded."""
global _RPCIMPL
if _RPCIMPL is None:
- _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
+ try:
+ _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
+ except ImportError:
+ # For backwards compatibility with older nova config.
+ impl = cfg.CONF.rpc_backend.replace('nova.rpc',
+ 'nova.openstack.common.rpc')
+ _RPCIMPL = importutils.import_module(impl)
return _RPCIMPL
diff --git a/nova/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py
index 8e5d685d5..5f62d35c5 100644
--- a/nova/rpc/amqp.py
+++ b/nova/openstack/common/rpc/amqp.py
@@ -18,7 +18,7 @@
# under the License.
"""
-Shared code between AMQP based nova.rpc implementations.
+Shared code between AMQP based openstack.common.rpc implementations.
The code in this module is shared between the rpc implemenations based on AMQP.
Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
@@ -36,7 +36,7 @@ from eventlet import semaphore
from nova.openstack.common import excutils
from nova.openstack.common import local
-import nova.rpc.common as rpc_common
+from nova.openstack.common.rpc import common as rpc_common
LOG = logging.getLogger(__name__)
diff --git a/nova/rpc/common.py b/nova/openstack/common/rpc/common.py
index 57f7053ee..0b927a0ee 100644
--- a/nova/rpc/common.py
+++ b/nova/openstack/common/rpc/common.py
@@ -19,8 +19,10 @@
import copy
import logging
+import sys
import traceback
+from nova.openstack.common import cfg
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import local
diff --git a/nova/rpc/dispatcher.py b/nova/openstack/common/rpc/dispatcher.py
index 3f46398a9..1d36f32ea 100644
--- a/nova/rpc/dispatcher.py
+++ b/nova/openstack/common/rpc/dispatcher.py
@@ -42,7 +42,7 @@ there can be both versioned and unversioned APIs implemented in the same code
base.
"""
-from nova.rpc import common as rpc_common
+from nova.openstack.common.rpc import common as rpc_common
class RpcDispatcher(object):
diff --git a/nova/rpc/impl_fake.py b/nova/openstack/common/rpc/impl_fake.py
index ea9303434..07e8bdc08 100644
--- a/nova/rpc/impl_fake.py
+++ b/nova/openstack/common/rpc/impl_fake.py
@@ -18,12 +18,12 @@ queues. Casts will block, but this is very useful for tests.
"""
import inspect
+import json
import time
import eventlet
-from nova.openstack.common import jsonutils
-from nova.rpc import common as rpc_common
+from nova.openstack.common.rpc import common as rpc_common
CONSUMERS = {}
@@ -121,7 +121,7 @@ def create_connection(conf, new=True):
def check_serialize(msg):
"""Make sure a message intended for rpc can be serialized."""
- jsonutils.dumps(msg)
+ json.dumps(msg)
def multicall(conf, context, topic, msg, timeout=None):
diff --git a/nova/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py
index ac4b412bb..66152a9d5 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/openstack/common/rpc/impl_kombu.py
@@ -30,8 +30,8 @@ import kombu.entity
import kombu.messaging
from nova.openstack.common import cfg
-from nova.rpc import amqp as rpc_amqp
-from nova.rpc import common as rpc_common
+from nova.openstack.common.rpc import amqp as rpc_amqp
+from nova.openstack.common.rpc import common as rpc_common
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
@@ -139,10 +139,9 @@ class ConsumerBase(object):
message = self.channel.message_to_python(raw_message)
try:
callback(message.payload)
+ message.ack()
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
- finally:
- message.ack()
self.queue.consume(*args, callback=_callback, **options)
diff --git a/nova/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py
index c5ab4a1d5..fce8cd07b 100644
--- a/nova/rpc/impl_qpid.py
+++ b/nova/openstack/common/rpc/impl_qpid.py
@@ -17,6 +17,7 @@
import functools
import itertools
+import json
import logging
import time
import uuid
@@ -27,9 +28,9 @@ import qpid.messaging
import qpid.messaging.exceptions
from nova.openstack.common import cfg
-from nova.openstack.common import jsonutils
-from nova.rpc import amqp as rpc_amqp
-from nova.rpc import common as rpc_common
+from nova.openstack.common.gettextutils import _
+from nova.openstack.common.rpc import amqp as rpc_amqp
+from nova.openstack.common.rpc import common as rpc_common
LOG = logging.getLogger(__name__)
@@ -124,7 +125,7 @@ class ConsumerBase(object):
addr_opts["node"]["x-declare"].update(node_opts)
addr_opts["link"]["x-declare"].update(link_opts)
- self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
+ self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
self.reconnect(session)
@@ -227,7 +228,7 @@ class Publisher(object):
if node_opts:
addr_opts["node"]["x-declare"].update(node_opts)
- self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
+ self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
self.reconnect(session)
diff --git a/nova/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py
index 35ae0094c..77768dbec 100644
--- a/nova/rpc/impl_zmq.py
+++ b/nova/openstack/common/rpc/impl_zmq.py
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import json
import pprint
import string
import sys
@@ -25,9 +26,9 @@ from eventlet.green import zmq
import greenlet
from nova.openstack.common import cfg
+from nova.openstack.common.gettextutils import _
from nova.openstack.common import importutils
-from nova.openstack.common import jsonutils
-from nova.rpc import common as rpc_common
+from nova.openstack.common.rpc import common as rpc_common
# for convenience, are not modified.
@@ -45,7 +46,7 @@ zmq_opts = [
# The module.Class to use for matchmaking.
cfg.StrOpt('rpc_zmq_matchmaker',
- default='nova.rpc.matchmaker.MatchMakerLocalhost',
+ default='openstack.common.rpc.matchmaker.MatchMakerLocalhost',
help='MatchMaker driver'),
# The following port is unassigned by IANA as of 2012-05-21
@@ -55,7 +56,7 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
- cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/nova',
+ cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),
]
@@ -74,7 +75,7 @@ def _serialize(data):
Error if a developer passes us bad data.
"""
try:
- return str(jsonutils.dumps(data))
+ return str(json.dumps(data, ensure_ascii=True))
except TypeError:
LOG.error(_("JSON serialization failed."))
raise
@@ -85,7 +86,7 @@ def _deserialize(data):
Deserialization wrapper
"""
LOG.debug(_("Deserializing: %s"), data)
- return jsonutils.loads(data)
+ return json.loads(data)
class ZmqSocket(object):
diff --git a/nova/rpc/matchmaker.py b/nova/openstack/common/rpc/matchmaker.py
index f59e2555d..f59e2555d 100644
--- a/nova/rpc/matchmaker.py
+++ b/nova/openstack/common/rpc/matchmaker.py
diff --git a/nova/rpc/proxy.py b/nova/openstack/common/rpc/proxy.py
index 79a90dc3a..cfa2b8cd0 100644
--- a/nova/rpc/proxy.py
+++ b/nova/openstack/common/rpc/proxy.py
@@ -22,7 +22,7 @@ For more information about rpc API version numbers, see:
"""
-from nova import rpc
+from nova.openstack.common import rpc
class RpcProxy(object):
diff --git a/nova/scheduler/driver.py b/nova/scheduler/driver.py
index 3e25e8516..9cc528049 100644
--- a/nova/scheduler/driver.py
+++ b/nova/scheduler/driver.py
@@ -33,8 +33,8 @@ from nova import notifications
from nova.openstack.common import cfg
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
+from nova.openstack.common import rpc
from nova.openstack.common import timeutils
-from nova import rpc
from nova import utils
diff --git a/nova/scheduler/rpcapi.py b/nova/scheduler/rpcapi.py
index 0e4052a7a..dea17b912 100644
--- a/nova/scheduler/rpcapi.py
+++ b/nova/scheduler/rpcapi.py
@@ -19,13 +19,13 @@ Client side of the scheduler manager RPC API.
"""
from nova import flags
-import nova.rpc.proxy
+import nova.openstack.common.rpc.proxy
FLAGS = flags.FLAGS
-class SchedulerAPI(nova.rpc.proxy.RpcProxy):
+class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the scheduler rpc API.
API version history:
diff --git a/nova/service.py b/nova/service.py
index ab1fd339a..c701d3813 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -35,7 +35,7 @@ from nova import flags
from nova import log as logging
from nova.openstack.common import cfg
from nova.openstack.common import importutils
-from nova import rpc
+from nova.openstack.common import rpc
from nova import utils
from nova import version
from nova import wsgi
diff --git a/nova/tests/api/ec2/test_cloud.py b/nova/tests/api/ec2/test_cloud.py
index 26fb81faa..b981314dc 100644
--- a/nova/tests/api/ec2/test_cloud.py
+++ b/nova/tests/api/ec2/test_cloud.py
@@ -40,7 +40,7 @@ from nova.image import fake
from nova.image import s3
from nova import log as logging
from nova.network import api as network_api
-from nova import rpc
+from nova.openstack.common import rpc
from nova import test
from nova import utils
diff --git a/nova/tests/api/ec2/test_ec2_validate.py b/nova/tests/api/ec2/test_ec2_validate.py
index ee3d5bba0..d917c3352 100644
--- a/nova/tests/api/ec2/test_ec2_validate.py
+++ b/nova/tests/api/ec2/test_ec2_validate.py
@@ -25,7 +25,7 @@ from nova import flags
from nova.image import fake
from nova import log as logging
from nova.openstack.common import importutils
-from nova import rpc
+from nova.openstack.common import rpc
from nova import test
LOG = logging.getLogger(__name__)
diff --git a/nova/tests/api/openstack/compute/contrib/test_certificates.py b/nova/tests/api/openstack/compute/contrib/test_certificates.py
index 803cf5601..9c9625e8e 100644
--- a/nova/tests/api/openstack/compute/contrib/test_certificates.py
+++ b/nova/tests/api/openstack/compute/contrib/test_certificates.py
@@ -17,7 +17,7 @@ from lxml import etree
from nova.api.openstack.compute.contrib import certificates
from nova import context
-from nova import rpc
+from nova.openstack.common import rpc
from nova import test
from nova.tests.api.openstack import fakes
diff --git a/nova/tests/api/openstack/compute/contrib/test_disk_config.py b/nova/tests/api/openstack/compute/contrib/test_disk_config.py
index 4b5c390e2..e63c28080 100644
--- a/nova/tests/api/openstack/compute/contrib/test_disk_config.py
+++ b/nova/tests/api/openstack/compute/contrib/test_disk_config.py
@@ -21,7 +21,7 @@ from nova.api.openstack import compute
import nova.db.api
from nova import flags
from nova.openstack.common import jsonutils
-import nova.rpc
+import nova.openstack.common.rpc
from nova import test
from nova.tests.api.openstack import fakes
diff --git a/nova/tests/api/openstack/compute/contrib/test_floating_ips.py b/nova/tests/api/openstack/compute/contrib/test_floating_ips.py
index b1f87c6df..6ce0daa05 100644
--- a/nova/tests/api/openstack/compute/contrib/test_floating_ips.py
+++ b/nova/tests/api/openstack/compute/contrib/test_floating_ips.py
@@ -24,7 +24,7 @@ from nova import context
from nova import db
from nova import exception
from nova import network
-from nova import rpc
+from nova.openstack.common import rpc
from nova import test
from nova.tests.api.openstack import fakes
from nova.tests import fake_network
diff --git a/nova/tests/api/openstack/compute/contrib/test_scheduler_hints.py b/nova/tests/api/openstack/compute/contrib/test_scheduler_hints.py
index 5b545ec88..c651444f4 100644
--- a/nova/tests/api/openstack/compute/contrib/test_scheduler_hints.py
+++ b/nova/tests/api/openstack/compute/contrib/test_scheduler_hints.py
@@ -18,7 +18,7 @@
from nova.api.openstack import compute
import nova.db.api
from nova.openstack.common import jsonutils
-import nova.rpc
+import nova.openstack.common.rpc
from nova import test
from nova.tests.api.openstack import fakes
diff --git a/nova/tests/api/openstack/compute/test_servers.py b/nova/tests/api/openstack/compute/test_servers.py
index f0d06cf21..fbc60da68 100644
--- a/nova/tests/api/openstack/compute/test_servers.py
+++ b/nova/tests/api/openstack/compute/test_servers.py
@@ -37,7 +37,7 @@ from nova.db.sqlalchemy import models
from nova import flags
import nova.image.fake
from nova.openstack.common import jsonutils
-import nova.rpc
+import nova.openstack.common.rpc
from nova import test
from nova.tests.api.openstack import fakes
from nova.tests import fake_network
@@ -1493,11 +1493,12 @@ class ServersControllerCreateTest(test.TestCase):
self.stubs.Set(nova.db, 'instance_system_metadata_update',
fake_method)
self.stubs.Set(nova.db, 'instance_get', instance_get)
- self.stubs.Set(nova.rpc, 'cast', fake_method)
- self.stubs.Set(nova.rpc, 'call', rpc_call_wrapper)
+ self.stubs.Set(nova.openstack.common.rpc, 'cast', fake_method)
+ self.stubs.Set(nova.openstack.common.rpc, 'call', rpc_call_wrapper)
self.stubs.Set(nova.db, 'instance_update_and_get_original',
server_update)
- self.stubs.Set(nova.rpc, 'queue_get_for', queue_get_for)
+ self.stubs.Set(nova.openstack.common.rpc, 'queue_get_for',
+ queue_get_for)
self.stubs.Set(nova.network.manager.VlanManager, 'allocate_fixed_ip',
fake_method)
diff --git a/nova/tests/cert/test_rpcapi.py b/nova/tests/cert/test_rpcapi.py
index 2e3feeaaf..1deb82c98 100644
--- a/nova/tests/cert/test_rpcapi.py
+++ b/nova/tests/cert/test_rpcapi.py
@@ -21,7 +21,7 @@ Unit Tests for nova.cert.rpcapi
from nova.cert import rpcapi as cert_rpcapi
from nova import context
from nova import flags
-from nova import rpc
+from nova.openstack.common import rpc
from nova import test
diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py
index 51cbb5b1f..e4d224584 100644
--- a/nova/tests/compute/test_compute.py
+++ b/nova/tests/compute/test_compute.py
@@ -45,10 +45,10 @@ from nova.notifier import test_notifier
from nova.openstack.common import importutils
from nova.openstack.common import policy as common_policy
from nova.openstack.common import timeutils
+from nova.openstack.common import rpc
+from nova.openstack.common.rpc import common as rpc_common
import nova.policy
from nova import quota
-from nova import rpc
-from nova.rpc import common as rpc_common
from nova.scheduler import driver as scheduler_driver
from nova import test
from nova.tests import fake_network
diff --git a/nova/tests/compute/test_rpcapi.py b/nova/tests/compute/test_rpcapi.py
index a0da63918..37427477f 100644
--- a/nova/tests/compute/test_rpcapi.py
+++ b/nova/tests/compute/test_rpcapi.py
@@ -21,7 +21,7 @@ Unit Tests for nova.compute.rpcapi
from nova.compute import rpcapi as compute_rpcapi
from nova import context
from nova import flags
-from nova import rpc
+from nova.openstack.common import rpc
from nova import test
diff --git a/nova/tests/console/test_rpcapi.py b/nova/tests/console/test_rpcapi.py
index 132657697..016451928 100644
--- a/nova/tests/console/test_rpcapi.py
+++ b/nova/tests/console/test_rpcapi.py
@@ -21,7 +21,7 @@ Unit Tests for nova.console.rpcapi
from nova.console import rpcapi as console_rpcapi
from nova import context
from nova import flags
-from nova import rpc
+from nova.openstack.common import rpc
from nova import test
diff --git a/nova/tests/consoleauth/test_rpcapi.py b/nova/tests/consoleauth/test_rpcapi.py
index 546183185..91626c765 100644
--- a/nova/tests/consoleauth/test_rpcapi.py
+++ b/nova/tests/consoleauth/test_rpcapi.py
@@ -21,7 +21,7 @@ Unit Tests for nova.consoleauth.rpcapi
from nova.consoleauth import rpcapi as consoleauth_rpcapi
from nova import context
from nova import flags
-from nova import rpc
+from nova.openstack.common import rpc
from nova import test
diff --git a/nova/tests/fake_flags.py b/nova/tests/fake_flags.py
index 2f5bc2a3b..3d05d7605 100644
--- a/nova/tests/fake_flags.py
+++ b/nova/tests/fake_flags.py
@@ -42,7 +42,7 @@ def set_defaults(conf):
conf.set_default('iscsi_num_targets', 8)
conf.set_default('network_size', 8)
conf.set_default('num_networks', 2)
- conf.set_default('rpc_backend', 'nova.rpc.impl_fake')
+ conf.set_default('rpc_backend', 'nova.openstack.common.rpc.impl_fake')
conf.set_default('sql_connection', "sqlite://")
conf.set_default('sqlite_synchronous', False)
conf.set_default('use_ipv6', True)
diff --git a/nova/tests/network/test_manager.py b/nova/tests/network/test_manager.py
index 817b7271d..11e293291 100644
--- a/nova/tests/network/test_manager.py
+++ b/nova/tests/network/test_manager.py
@@ -27,8 +27,8 @@ from nova import log as logging
from nova.network import linux_net
from nova.network import manager as network_manager
from nova.openstack.common import importutils
+from nova.openstack.common import rpc
import nova.policy
-from nova import rpc
from nova import test
from nova.tests import fake_network
from nova import utils
diff --git a/nova/tests/rpc/__init__.py b/nova/tests/rpc/__init__.py
deleted file mode 100644
index 7e04e7c73..000000000
--- a/nova/tests/rpc/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011 OpenStack LLC.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work
-from nova.tests import *
diff --git a/nova/tests/rpc/common.py b/nova/tests/rpc/common.py
deleted file mode 100644
index 84dd79890..000000000
--- a/nova/tests/rpc/common.py
+++ /dev/null
@@ -1,321 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Unit Tests for remote procedure calls shared between all implementations
-"""
-
-import time
-
-import eventlet
-from eventlet import greenthread
-import nose
-
-from nova import context
-from nova import exception
-from nova import flags
-from nova import log as logging
-from nova.rpc import amqp as rpc_amqp
-from nova.rpc import common as rpc_common
-from nova.rpc import dispatcher as rpc_dispatcher
-from nova import test
-
-
-FLAGS = flags.FLAGS
-LOG = logging.getLogger(__name__)
-
-
-class BaseRpcTestCase(test.TestCase):
- def setUp(self, supports_timeouts=True, topic='test',
- topic_nested='nested'):
- super(BaseRpcTestCase, self).setUp()
- self.topic = topic or self.topic
- self.topic_nested = topic_nested or self.topic_nested
- self.supports_timeouts = supports_timeouts
- self.context = context.get_admin_context()
-
- if self.rpc:
- receiver = TestReceiver()
- self.conn = self._create_consumer(receiver, self.topic)
-
- def tearDown(self):
- if self.rpc:
- self.conn.close()
- super(BaseRpcTestCase, self).tearDown()
-
- def _create_consumer(self, proxy, topic, fanout=False):
- dispatcher = rpc_dispatcher.RpcDispatcher([proxy])
- conn = self.rpc.create_connection(FLAGS, True)
- conn.create_consumer(topic, dispatcher, fanout)
- conn.consume_in_thread()
- return conn
-
- def test_call_succeed(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- value = 42
- result = self.rpc.call(FLAGS, self.context, self.topic,
- {"method": "echo", "args": {"value": value}})
- self.assertEqual(value, result)
-
- def test_call_succeed_despite_multiple_returns_yield(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- value = 42
- result = self.rpc.call(FLAGS, self.context, self.topic,
- {"method": "echo_three_times_yield",
- "args": {"value": value}})
- self.assertEqual(value + 2, result)
-
- def test_multicall_succeed_once(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- value = 42
- result = self.rpc.multicall(FLAGS, self.context,
- self.topic,
- {"method": "echo",
- "args": {"value": value}})
- for i, x in enumerate(result):
- if i > 0:
- self.fail('should only receive one response')
- self.assertEqual(value + i, x)
-
- def test_multicall_three_nones(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- value = 42
- result = self.rpc.multicall(FLAGS, self.context,
- self.topic,
- {"method": "multicall_three_nones",
- "args": {"value": value}})
- for i, x in enumerate(result):
- self.assertEqual(x, None)
- # i should have been 0, 1, and finally 2:
- self.assertEqual(i, 2)
-
- def test_multicall_succeed_three_times_yield(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- value = 42
- result = self.rpc.multicall(FLAGS, self.context,
- self.topic,
- {"method": "echo_three_times_yield",
- "args": {"value": value}})
- for i, x in enumerate(result):
- self.assertEqual(value + i, x)
-
- def test_context_passed(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- """Makes sure a context is passed through rpc call."""
- value = 42
- result = self.rpc.call(FLAGS, self.context,
- self.topic, {"method": "context",
- "args": {"value": value}})
- self.assertEqual(self.context.to_dict(), result)
-
- def _test_cast(self, fanout=False):
- """Test casts by pushing items through a channeled queue."""
-
- # Not a true global, but capitalized so
- # it is clear it is leaking scope into Nested()
- QUEUE = eventlet.queue.Queue()
-
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- # We use the nested topic so we don't need QUEUE to be a proper
- # global, and do not keep state outside this test.
- class Nested(object):
- @staticmethod
- def put_queue(context, value):
- LOG.debug("Got value in put_queue: %s", value)
- QUEUE.put(value)
-
- nested = Nested()
- conn = self._create_consumer(nested, self.topic_nested, fanout)
- value = 42
-
- method = (self.rpc.cast, self.rpc.fanout_cast)[fanout]
- method(FLAGS, self.context,
- self.topic_nested,
- {"method": "put_queue",
- "args": {"value": value}})
-
- try:
- # If it does not succeed in 2 seconds, give up and assume
- # failure.
- result = QUEUE.get(True, 2)
- except Exception:
- self.assertEqual(value, None)
-
- conn.close()
- self.assertEqual(value, result)
-
- def test_cast_success(self):
- self._test_cast(False)
-
- def test_fanout_success(self):
- self._test_cast(True)
-
- def test_nested_calls(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- """Test that we can do an rpc.call inside another call."""
- class Nested(object):
- @staticmethod
- def echo(context, queue, value):
- """Calls echo in the passed queue."""
- LOG.debug(_("Nested received %(queue)s, %(value)s")
- % locals())
- # TODO(comstud):
- # so, it will replay the context and use the same REQID?
- # that's bizarre.
- ret = self.rpc.call(FLAGS, context,
- queue,
- {"method": "echo",
- "args": {"value": value}})
- LOG.debug(_("Nested return %s"), ret)
- return value
-
- nested = Nested()
- conn = self._create_consumer(nested, self.topic_nested)
-
- value = 42
- result = self.rpc.call(FLAGS, self.context,
- self.topic_nested,
- {"method": "echo",
- "args": {"queue": "test", "value": value}})
- conn.close()
- self.assertEqual(value, result)
-
- def test_call_timeout(self):
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- """Make sure rpc.call will time out."""
- if not self.supports_timeouts:
- raise nose.SkipTest(_("RPC backend does not support timeouts"))
-
- value = 42
- self.assertRaises(rpc_common.Timeout,
- self.rpc.call,
- FLAGS, self.context,
- self.topic,
- {"method": "block",
- "args": {"value": value}}, timeout=1)
- try:
- self.rpc.call(FLAGS, self.context,
- self.topic,
- {"method": "block",
- "args": {"value": value}},
- timeout=1)
- self.fail("should have thrown Timeout")
- except rpc_common.Timeout as exc:
- pass
-
-
-class BaseRpcAMQPTestCase(BaseRpcTestCase):
- """Base test class for all AMQP-based RPC tests."""
- def test_proxycallback_handles_exceptions(self):
- """Make sure exceptions unpacking messages don't cause hangs."""
- if not self.rpc:
- raise nose.SkipTest('rpc driver not available.')
-
- orig_unpack = rpc_amqp.unpack_context
-
- info = {'unpacked': False}
-
- def fake_unpack_context(*args, **kwargs):
- info['unpacked'] = True
- raise test.TestingException('moo')
-
- self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context)
-
- value = 41
- self.rpc.cast(FLAGS, self.context, self.topic,
- {"method": "echo", "args": {"value": value}})
-
- # Wait for the cast to complete.
- for x in xrange(50):
- if info['unpacked']:
- break
- greenthread.sleep(0.1)
- else:
- self.fail("Timeout waiting for message to be consumed")
-
- # Now see if we get a response even though we raised an
- # exception for the cast above.
- self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack)
-
- value = 42
- result = self.rpc.call(FLAGS, self.context, self.topic,
- {"method": "echo",
- "args": {"value": value}})
- self.assertEqual(value, result)
-
-
-class TestReceiver(object):
- """Simple Proxy class so the consumer has methods to call.
-
- Uses static methods because we aren't actually storing any state.
-
- """
- @staticmethod
- def echo(context, value):
- """Simply returns whatever value is sent in."""
- LOG.debug(_("Received %s"), value)
- return value
-
- @staticmethod
- def context(context, value):
- """Returns dictionary version of context."""
- LOG.debug(_("Received %s"), context)
- return context.to_dict()
-
- @staticmethod
- def multicall_three_nones(context, value):
- yield None
- yield None
- yield None
-
- @staticmethod
- def echo_three_times_yield(context, value):
- yield value
- yield value + 1
- yield value + 2
-
- @staticmethod
- def fail(context, value):
- """Raises an exception with the value sent in."""
- raise NotImplementedError(value)
-
- @staticmethod
- def fail_converted(context, value):
- """Raises an exception with the value sent in."""
- raise exception.ConvertedException(explanation=value)
-
- @staticmethod
- def block(context, value):
- time.sleep(2)
diff --git a/nova/tests/rpc/test_common.py b/nova/tests/rpc/test_common.py
deleted file mode 100644
index 7a6199c0f..000000000
--- a/nova/tests/rpc/test_common.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2012 OpenStack, LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Unit Tests for 'common' functons used through rpc code.
-"""
-
-import sys
-
-from nova import exception
-from nova import flags
-from nova import log as logging
-from nova.openstack.common import jsonutils
-from nova.rpc import common as rpc_common
-from nova import test
-
-FLAGS = flags.FLAGS
-LOG = logging.getLogger(__name__)
-
-
-def raise_exception():
- raise Exception("test")
-
-
-class FakeUserDefinedException(Exception):
- def __init__(self):
- Exception.__init__(self, "Test Message")
-
-
-class RpcCommonTestCase(test.TestCase):
- def test_serialize_remote_exception(self):
- expected = {
- 'class': 'Exception',
- 'module': 'exceptions',
- 'message': 'test',
- }
-
- try:
- raise_exception()
- except Exception as exc:
- failure = rpc_common.serialize_remote_exception(sys.exc_info())
-
- failure = jsonutils.loads(failure)
- #assure the traceback was added
- self.assertEqual(expected['class'], failure['class'])
- self.assertEqual(expected['module'], failure['module'])
- self.assertEqual(expected['message'], failure['message'])
-
- def test_serialize_remote_nova_exception(self):
- def raise_nova_exception():
- raise exception.NovaException("test", code=500)
-
- expected = {
- 'class': 'NovaException',
- 'module': 'nova.exception',
- 'kwargs': {'code': 500},
- 'message': 'test'
- }
-
- try:
- raise_nova_exception()
- except Exception as exc:
- failure = rpc_common.serialize_remote_exception(sys.exc_info())
-
- failure = jsonutils.loads(failure)
- #assure the traceback was added
- self.assertEqual(expected['class'], failure['class'])
- self.assertEqual(expected['module'], failure['module'])
- self.assertEqual(expected['kwargs'], failure['kwargs'])
- self.assertEqual(expected['message'], failure['message'])
-
- def test_deserialize_remote_exception(self):
- failure = {
- 'class': 'NovaException',
- 'module': 'nova.exception',
- 'message': 'test message',
- 'tb': ['raise NovaException'],
- }
- serialized = jsonutils.dumps(failure)
-
- after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
- self.assertTrue(isinstance(after_exc, exception.NovaException))
- self.assertTrue('test message' in unicode(after_exc))
- #assure the traceback was added
- self.assertTrue('raise NovaException' in unicode(after_exc))
-
- def test_deserialize_remote_exception_bad_module(self):
- failure = {
- 'class': 'popen2',
- 'module': 'os',
- 'kwargs': {'cmd': '/bin/echo failed'},
- 'message': 'foo',
- }
- serialized = jsonutils.dumps(failure)
-
- after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
- self.assertTrue(isinstance(after_exc, rpc_common.RemoteError))
-
- def test_deserialize_remote_exception_user_defined_exception(self):
- """Ensure a user defined exception can be deserialized."""
- self.flags(allowed_rpc_exception_modules=[self.__class__.__module__])
- failure = {
- 'class': 'FakeUserDefinedException',
- 'module': self.__class__.__module__,
- 'tb': ['raise FakeUserDefinedException'],
- }
- serialized = jsonutils.dumps(failure)
-
- after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
- self.assertTrue(isinstance(after_exc, FakeUserDefinedException))
- #assure the traceback was added
- self.assertTrue('raise FakeUserDefinedException' in unicode(after_exc))
-
- def test_deserialize_remote_exception_cannot_recreate(self):
- """Ensure a RemoteError is returned on initialization failure.
-
- If an exception cannot be recreated with it's original class then a
- RemoteError with the exception informations should still be returned.
-
- """
- self.flags(allowed_rpc_exception_modules=[self.__class__.__module__])
- failure = {
- 'class': 'FakeIDontExistException',
- 'module': self.__class__.__module__,
- 'tb': ['raise FakeIDontExistException'],
- }
- serialized = jsonutils.dumps(failure)
-
- after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
- self.assertTrue(isinstance(after_exc, rpc_common.RemoteError))
- #assure the traceback was added
- self.assertTrue('raise FakeIDontExistException' in unicode(after_exc))
diff --git a/nova/tests/rpc/test_dispatcher.py b/nova/tests/rpc/test_dispatcher.py
deleted file mode 100644
index d67f7b8ec..000000000
--- a/nova/tests/rpc/test_dispatcher.py
+++ /dev/null
@@ -1,109 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2012, Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Unit Tests for rpc.dispatcher
-"""
-
-from nova import context
-from nova.rpc import common as rpc_common
-from nova.rpc import dispatcher
-from nova import test
-
-
-class RpcDispatcherTestCase(test.TestCase):
- class API1(object):
- RPC_API_VERSION = '1.0'
-
- def __init__(self):
- self.test_method_ctxt = None
- self.test_method_arg1 = None
-
- def test_method(self, ctxt, arg1):
- self.test_method_ctxt = ctxt
- self.test_method_arg1 = arg1
-
- class API2(object):
- RPC_API_VERSION = '2.1'
-
- def __init__(self):
- self.test_method_ctxt = None
- self.test_method_arg1 = None
-
- def test_method(self, ctxt, arg1):
- self.test_method_ctxt = ctxt
- self.test_method_arg1 = arg1
-
- class API3(object):
- RPC_API_VERSION = '3.5'
-
- def __init__(self):
- self.test_method_ctxt = None
- self.test_method_arg1 = None
-
- def test_method(self, ctxt, arg1):
- self.test_method_ctxt = ctxt
- self.test_method_arg1 = arg1
-
- def setUp(self):
- self.ctxt = context.RequestContext('fake_user', 'fake_project')
- super(RpcDispatcherTestCase, self).setUp()
-
- def tearDown(self):
- super(RpcDispatcherTestCase, self).tearDown()
-
- def _test_dispatch(self, version, expectations):
- v2 = self.API2()
- v3 = self.API3()
- disp = dispatcher.RpcDispatcher([v2, v3])
-
- disp.dispatch(self.ctxt, version, 'test_method', arg1=1)
-
- self.assertEqual(v2.test_method_ctxt, expectations[0])
- self.assertEqual(v2.test_method_arg1, expectations[1])
- self.assertEqual(v3.test_method_ctxt, expectations[2])
- self.assertEqual(v3.test_method_arg1, expectations[3])
-
- def test_dispatch(self):
- self._test_dispatch('2.1', (self.ctxt, 1, None, None))
- self._test_dispatch('3.5', (None, None, self.ctxt, 1))
-
- def test_dispatch_lower_minor_version(self):
- self._test_dispatch('2.0', (self.ctxt, 1, None, None))
- self._test_dispatch('3.1', (None, None, self.ctxt, 1))
-
- def test_dispatch_higher_minor_version(self):
- self.assertRaises(rpc_common.UnsupportedRpcVersion,
- self._test_dispatch, '2.6', (None, None, None, None))
- self.assertRaises(rpc_common.UnsupportedRpcVersion,
- self._test_dispatch, '3.6', (None, None, None, None))
-
- def test_dispatch_lower_major_version(self):
- self.assertRaises(rpc_common.UnsupportedRpcVersion,
- self._test_dispatch, '1.0', (None, None, None, None))
-
- def test_dispatch_higher_major_version(self):
- self.assertRaises(rpc_common.UnsupportedRpcVersion,
- self._test_dispatch, '4.0', (None, None, None, None))
-
- def test_dispatch_no_version_uses_v1(self):
- v1 = self.API1()
- disp = dispatcher.RpcDispatcher([v1])
-
- disp.dispatch(self.ctxt, None, 'test_method', arg1=1)
-
- self.assertEqual(v1.test_method_ctxt, self.ctxt)
- self.assertEqual(v1.test_method_arg1, 1)
diff --git a/nova/tests/rpc/test_fake.py b/nova/tests/rpc/test_fake.py
deleted file mode 100644
index 8d6878ca4..000000000
--- a/nova/tests/rpc/test_fake.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Unit Tests for remote procedure calls using fake_impl
-"""
-
-from nova import log as logging
-from nova.rpc import impl_fake
-from nova.tests.rpc import common
-
-
-LOG = logging.getLogger(__name__)
-
-
-class RpcFakeTestCase(common.BaseRpcTestCase):
- def setUp(self):
- self.rpc = impl_fake
- super(RpcFakeTestCase, self).setUp()
diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py
deleted file mode 100644
index 0055f253c..000000000
--- a/nova/tests/rpc/test_kombu.py
+++ /dev/null
@@ -1,395 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Unit Tests for remote procedure calls using kombu
-"""
-
-from nova import context
-from nova import exception
-from nova import flags
-from nova import log as logging
-from nova.rpc import amqp as rpc_amqp
-from nova import test
-from nova.tests.rpc import common
-
-try:
- import kombu
- from nova.rpc import impl_kombu
-except ImportError:
- kombu = None
- impl_kombu = None
-
-
-FLAGS = flags.FLAGS
-LOG = logging.getLogger(__name__)
-
-
-class MyException(Exception):
- pass
-
-
-def _raise_exc_stub(stubs, times, obj, method, exc_msg,
- exc_class=MyException):
- info = {'called': 0}
- orig_method = getattr(obj, method)
-
- def _raise_stub(*args, **kwargs):
- info['called'] += 1
- if info['called'] <= times:
- raise exc_class(exc_msg)
- orig_method(*args, **kwargs)
- stubs.Set(obj, method, _raise_stub)
- return info
-
-
-class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
- def setUp(self):
- if kombu:
- self.rpc = impl_kombu
- else:
- self.rpc = None
- super(RpcKombuTestCase, self).setUp()
-
- def tearDown(self):
- if kombu:
- impl_kombu.cleanup()
- super(RpcKombuTestCase, self).tearDown()
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_reusing_connection(self):
- """Test that reusing a connection returns same one."""
- conn_context = self.rpc.create_connection(FLAGS, new=False)
- conn1 = conn_context.connection
- conn_context.close()
- conn_context = self.rpc.create_connection(FLAGS, new=False)
- conn2 = conn_context.connection
- conn_context.close()
- self.assertEqual(conn1, conn2)
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_topic_send_receive(self):
- """Test sending to a topic exchange/queue"""
-
- conn = self.rpc.create_connection(FLAGS)
- message = 'topic test message'
-
- self.received_message = None
-
- def _callback(message):
- self.received_message = message
-
- conn.declare_topic_consumer('a_topic', _callback)
- conn.topic_send('a_topic', message)
- conn.consume(limit=1)
- conn.close()
-
- self.assertEqual(self.received_message, message)
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_topic_multiple_queues(self):
- """Test sending to a topic exchange with multiple queues"""
-
- conn = self.rpc.create_connection(FLAGS)
- message = 'topic test message'
-
- self.received_message_1 = None
- self.received_message_2 = None
-
- def _callback1(message):
- self.received_message_1 = message
-
- def _callback2(message):
- self.received_message_2 = message
-
- conn.declare_topic_consumer('a_topic', _callback1, queue_name='queue1')
- conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2')
- conn.topic_send('a_topic', message)
- conn.consume(limit=2)
- conn.close()
-
- self.assertEqual(self.received_message_1, message)
- self.assertEqual(self.received_message_2, message)
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_direct_send_receive(self):
- """Test sending to a direct exchange/queue"""
- conn = self.rpc.create_connection(FLAGS)
- message = 'direct test message'
-
- self.received_message = None
-
- def _callback(message):
- self.received_message = message
-
- conn.declare_direct_consumer('a_direct', _callback)
- conn.direct_send('a_direct', message)
- conn.consume(limit=1)
- conn.close()
-
- self.assertEqual(self.received_message, message)
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_cast_interface_uses_default_options(self):
- """Test kombu rpc.cast"""
-
- ctxt = context.RequestContext('fake_user', 'fake_project')
-
- class MyConnection(impl_kombu.Connection):
- def __init__(myself, *args, **kwargs):
- super(MyConnection, myself).__init__(*args, **kwargs)
- self.assertEqual(myself.params,
- {'hostname': FLAGS.rabbit_host,
- 'userid': FLAGS.rabbit_userid,
- 'password': FLAGS.rabbit_password,
- 'port': FLAGS.rabbit_port,
- 'virtual_host': FLAGS.rabbit_virtual_host,
- 'transport': 'memory'})
-
- def topic_send(_context, topic, msg):
- pass
-
- MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
- self.stubs.Set(impl_kombu, 'Connection', MyConnection)
-
- impl_kombu.cast(FLAGS, ctxt, 'fake_topic', {'msg': 'fake'})
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_cast_to_server_uses_server_params(self):
- """Test kombu rpc.cast"""
-
- ctxt = context.RequestContext('fake_user', 'fake_project')
-
- server_params = {'username': 'fake_username',
- 'password': 'fake_password',
- 'hostname': 'fake_hostname',
- 'port': 31337,
- 'virtual_host': 'fake_virtual_host'}
-
- class MyConnection(impl_kombu.Connection):
- def __init__(myself, *args, **kwargs):
- super(MyConnection, myself).__init__(*args, **kwargs)
- self.assertEqual(myself.params,
- {'hostname': server_params['hostname'],
- 'userid': server_params['username'],
- 'password': server_params['password'],
- 'port': server_params['port'],
- 'virtual_host': server_params['virtual_host'],
- 'transport': 'memory'})
-
- def topic_send(_context, topic, msg):
- pass
-
- MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
- self.stubs.Set(impl_kombu, 'Connection', MyConnection)
-
- impl_kombu.cast_to_server(FLAGS, ctxt, server_params,
- 'fake_topic', {'msg': 'fake'})
-
- @test.skip_test("kombu memory transport seems buggy with fanout queues "
- "as this test passes when you use rabbit (fake_rabbit=False)")
- def test_fanout_send_receive(self):
- """Test sending to a fanout exchange and consuming from 2 queues"""
-
- conn = self.rpc.create_connection()
- conn2 = self.rpc.create_connection()
- message = 'fanout test message'
-
- self.received_message = None
-
- def _callback(message):
- self.received_message = message
-
- conn.declare_fanout_consumer('a_fanout', _callback)
- conn2.declare_fanout_consumer('a_fanout', _callback)
- conn.fanout_send('a_fanout', message)
-
- conn.consume(limit=1)
- conn.close()
- self.assertEqual(self.received_message, message)
-
- self.received_message = None
- conn2.consume(limit=1)
- conn2.close()
- self.assertEqual(self.received_message, message)
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_declare_consumer_errors_will_reconnect(self):
- # Test that any exception with 'timeout' in it causes a
- # reconnection
- info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
- '__init__', 'foo timeout foo')
-
- conn = self.rpc.Connection(FLAGS)
- result = conn.declare_consumer(self.rpc.DirectConsumer,
- 'test_topic', None)
-
- self.assertEqual(info['called'], 3)
- self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
-
- # Test that any exception in transport.connection_errors causes
- # a reconnection
- self.stubs.UnsetAll()
-
- info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer,
- '__init__', 'meow')
-
- conn = self.rpc.Connection(FLAGS)
- conn.connection_errors = (MyException, )
-
- result = conn.declare_consumer(self.rpc.DirectConsumer,
- 'test_topic', None)
-
- self.assertEqual(info['called'], 2)
- self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_declare_consumer_ioerrors_will_reconnect(self):
- """Test that an IOError exception causes a reconnection"""
- info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
- '__init__', 'Socket closed', exc_class=IOError)
-
- conn = self.rpc.Connection(FLAGS)
- result = conn.declare_consumer(self.rpc.DirectConsumer,
- 'test_topic', None)
-
- self.assertEqual(info['called'], 3)
- self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_publishing_errors_will_reconnect(self):
- # Test that any exception with 'timeout' in it causes a
- # reconnection when declaring the publisher class and when
- # calling send()
- info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
- '__init__', 'foo timeout foo')
-
- conn = self.rpc.Connection(FLAGS)
- conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
-
- self.assertEqual(info['called'], 3)
- self.stubs.UnsetAll()
-
- info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
- 'send', 'foo timeout foo')
-
- conn = self.rpc.Connection(FLAGS)
- conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
-
- self.assertEqual(info['called'], 3)
-
- # Test that any exception in transport.connection_errors causes
- # a reconnection when declaring the publisher class and when
- # calling send()
- self.stubs.UnsetAll()
-
- info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
- '__init__', 'meow')
-
- conn = self.rpc.Connection(FLAGS)
- conn.connection_errors = (MyException, )
-
- conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
-
- self.assertEqual(info['called'], 2)
- self.stubs.UnsetAll()
-
- info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
- 'send', 'meow')
-
- conn = self.rpc.Connection(FLAGS)
- conn.connection_errors = (MyException, )
-
- conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
-
- self.assertEqual(info['called'], 2)
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_iterconsume_errors_will_reconnect(self):
- conn = self.rpc.Connection(FLAGS)
- message = 'reconnect test message'
-
- self.received_message = None
-
- def _callback(message):
- self.received_message = message
-
- conn.declare_direct_consumer('a_direct', _callback)
- conn.direct_send('a_direct', message)
-
- info = _raise_exc_stub(self.stubs, 1, conn.connection,
- 'drain_events', 'foo timeout foo')
- conn.consume(limit=1)
- conn.close()
-
- self.assertEqual(self.received_message, message)
- # Only called once, because our stub goes away during reconnection
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_call_exception(self):
- """Test that exception gets passed back properly.
-
- rpc.call returns an Exception object. The value of the
- exception is converted to a string.
-
- """
- self.flags(allowed_rpc_exception_modules=['exceptions'])
- value = "This is the exception message"
- self.assertRaises(NotImplementedError,
- self.rpc.call,
- FLAGS,
- self.context,
- 'test',
- {"method": "fail",
- "args": {"value": value}})
- try:
- self.rpc.call(FLAGS, self.context,
- 'test',
- {"method": "fail",
- "args": {"value": value}})
- self.fail("should have thrown Exception")
- except NotImplementedError as exc:
- self.assertTrue(value in unicode(exc))
- #Traceback should be included in exception message
- self.assertTrue('raise NotImplementedError(value)' in unicode(exc))
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_call_converted_exception(self):
- """Test that exception gets passed back properly.
-
- rpc.call returns an Exception object. The value of the
- exception is converted to a string.
-
- """
- value = "This is the exception message"
- self.assertRaises(exception.ConvertedException,
- self.rpc.call,
- FLAGS,
- self.context,
- 'test',
- {"method": "fail_converted",
- "args": {"value": value}})
- try:
- self.rpc.call(FLAGS, self.context,
- 'test',
- {"method": "fail_converted",
- "args": {"value": value}})
- self.fail("should have thrown Exception")
- except exception.ConvertedException as exc:
- self.assertTrue(value in unicode(exc))
- #Traceback should be included in exception message
- self.assertTrue('exception.ConvertedException' in unicode(exc))
diff --git a/nova/tests/rpc/test_kombu_ssl.py b/nova/tests/rpc/test_kombu_ssl.py
deleted file mode 100644
index f74d38492..000000000
--- a/nova/tests/rpc/test_kombu_ssl.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Unit Tests for remote procedure calls using kombu + ssl
-"""
-
-from nova import flags
-from nova import test
-
-try:
- import kombu
- from nova.rpc import impl_kombu
-except ImportError:
- kombu = None
- impl_kombu = None
-
-
-# Flag settings we will ensure get passed to amqplib
-SSL_VERSION = "SSLv2"
-SSL_CERT = "/tmp/cert.blah.blah"
-SSL_CA_CERT = "/tmp/cert.ca.blah.blah"
-SSL_KEYFILE = "/tmp/keyfile.blah.blah"
-
-FLAGS = flags.FLAGS
-
-
-class RpcKombuSslTestCase(test.TestCase):
-
- def setUp(self):
- super(RpcKombuSslTestCase, self).setUp()
- if kombu:
- self.flags(kombu_ssl_keyfile=SSL_KEYFILE,
- kombu_ssl_ca_certs=SSL_CA_CERT,
- kombu_ssl_certfile=SSL_CERT,
- kombu_ssl_version=SSL_VERSION,
- rabbit_use_ssl=True)
-
- @test.skip_if(kombu is None, "Test requires kombu")
- def test_ssl_on_extended(self):
- rpc = impl_kombu
- conn = rpc.create_connection(FLAGS, True)
- c = conn.connection
- #This might be kombu version dependent...
- #Since we are now peaking into the internals of kombu...
- self.assertTrue(isinstance(c.connection.ssl, dict))
- self.assertEqual(SSL_VERSION, c.connection.ssl.get("ssl_version"))
- self.assertEqual(SSL_CERT, c.connection.ssl.get("certfile"))
- self.assertEqual(SSL_CA_CERT, c.connection.ssl.get("ca_certs"))
- self.assertEqual(SSL_KEYFILE, c.connection.ssl.get("keyfile"))
- #That hash then goes into amqplib which then goes
- #Into python ssl creation...
diff --git a/nova/tests/rpc/test_matchmaker.py b/nova/tests/rpc/test_matchmaker.py
deleted file mode 100644
index f7bffa67a..000000000
--- a/nova/tests/rpc/test_matchmaker.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2012 Cloudscaling Group, Inc
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from nova import log as logging
-from nova.rpc import matchmaker
-from nova import test
-
-LOG = logging.getLogger(__name__)
-
-
-class _MatchMakerTestCase(test.TestCase):
- def test_valid_host_matches(self):
- queues = self.driver.queues(self.topic)
- matched_hosts = map(lambda x: x[1], queues)
-
- for host in matched_hosts:
- self.assertIn(host, self.hosts)
-
- def test_fanout_host_matches(self):
- """For known hosts, see if they're in fanout."""
- queues = self.driver.queues("fanout~" + self.topic)
- matched_hosts = map(lambda x: x[1], queues)
-
- LOG.info("Received result from matchmaker: %s", queues)
- for host in self.hosts:
- self.assertIn(host, matched_hosts)
-
-
-class MatchMakerFileTestCase(_MatchMakerTestCase):
- def setUp(self):
- self.topic = "test"
- self.hosts = ['hello', 'world', 'foo', 'bar', 'baz']
- ring = {
- self.topic: self.hosts
- }
- self.driver = matchmaker.MatchMakerRing(ring)
- super(MatchMakerFileTestCase, self).setUp()
-
-
-class MatchMakerLocalhostTestCase(_MatchMakerTestCase):
- def setUp(self):
- self.driver = matchmaker.MatchMakerLocalhost()
- self.topic = "test"
- self.hosts = ['localhost']
- super(MatchMakerLocalhostTestCase, self).setUp()
diff --git a/nova/tests/rpc/test_proxy.py b/nova/tests/rpc/test_proxy.py
deleted file mode 100644
index 9ef504a0d..000000000
--- a/nova/tests/rpc/test_proxy.py
+++ /dev/null
@@ -1,124 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2012, Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Unit Tests for rpc.proxy
-"""
-
-import copy
-
-from nova import context
-from nova import rpc
-from nova.rpc import proxy
-from nova import test
-
-
-class RpcProxyTestCase(test.TestCase):
-
- def setUp(self):
- super(RpcProxyTestCase, self).setUp()
-
- def tearDown(self):
- super(RpcProxyTestCase, self).tearDown()
-
- def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False,
- server_params=None, supports_topic_override=True):
- topic = 'fake_topic'
- timeout = 123
- rpc_proxy = proxy.RpcProxy(topic, '1.0')
- ctxt = context.RequestContext('fake_user', 'fake_project')
- msg = {'method': 'fake_method', 'args': {'x': 'y'}}
- expected_msg = {'method': 'fake_method', 'args': {'x': 'y'},
- 'version': '1.0'}
-
- expected_retval = 'hi' if has_retval else None
-
- self.fake_args = None
- self.fake_kwargs = None
-
- def _fake_rpc_method(*args, **kwargs):
- self.fake_args = args
- self.fake_kwargs = kwargs
- if has_retval:
- return expected_retval
-
- self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
-
- args = [ctxt, msg]
- if server_params:
- args.insert(1, server_params)
-
- # Base method usage
- retval = getattr(rpc_proxy, rpc_method)(*args)
- self.assertEqual(retval, expected_retval)
- expected_args = [ctxt, topic, expected_msg]
- if server_params:
- expected_args.insert(1, server_params)
- for arg, expected_arg in zip(self.fake_args, expected_args):
- self.assertEqual(arg, expected_arg)
-
- # overriding the version
- retval = getattr(rpc_proxy, rpc_method)(*args, version='1.1')
- self.assertEqual(retval, expected_retval)
- new_msg = copy.deepcopy(expected_msg)
- new_msg['version'] = '1.1'
- expected_args = [ctxt, topic, new_msg]
- if server_params:
- expected_args.insert(1, server_params)
- for arg, expected_arg in zip(self.fake_args, expected_args):
- self.assertEqual(arg, expected_arg)
-
- if has_timeout:
- # set a timeout
- retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=timeout)
- self.assertEqual(retval, expected_retval)
- expected_args = [ctxt, topic, expected_msg, timeout]
- for arg, expected_arg in zip(self.fake_args, expected_args):
- self.assertEqual(arg, expected_arg)
-
- if supports_topic_override:
- # set a topic
- new_topic = 'foo.bar'
- retval = getattr(rpc_proxy, rpc_method)(*args, topic=new_topic)
- self.assertEqual(retval, expected_retval)
- expected_args = [ctxt, new_topic, expected_msg]
- if server_params:
- expected_args.insert(1, server_params)
- for arg, expected_arg in zip(self.fake_args, expected_args):
- self.assertEqual(arg, expected_arg)
-
- def test_call(self):
- self._test_rpc_method('call', has_timeout=True, has_retval=True)
-
- def test_multicall(self):
- self._test_rpc_method('multicall', has_timeout=True, has_retval=True)
-
- def test_cast(self):
- self._test_rpc_method('cast')
-
- def test_fanout_cast(self):
- self._test_rpc_method('fanout_cast', supports_topic_override=False)
-
- def test_cast_to_server(self):
- self._test_rpc_method('cast_to_server', server_params={'blah': 1})
-
- def test_fanout_cast_to_server(self):
- self._test_rpc_method('fanout_cast_to_server',
- server_params={'blah': 1}, supports_topic_override=False)
-
- def test_make_msg(self):
- self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2),
- {'method': 'test_method', 'args': {'a': 1, 'b': 2}})
diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py
deleted file mode 100644
index b8553873b..000000000
--- a/nova/tests/rpc/test_qpid.py
+++ /dev/null
@@ -1,370 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-# Copyright 2012, Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Unit Tests for remote procedure calls using qpid
-"""
-
-import mox
-
-from nova import context
-from nova import flags
-from nova import log as logging
-from nova.rpc import amqp as rpc_amqp
-from nova import test
-
-try:
- from nova.rpc import impl_qpid
- import qpid
-except ImportError:
- qpid = None
- impl_qpid = None
-
-
-FLAGS = flags.FLAGS
-LOG = logging.getLogger(__name__)
-
-
-class RpcQpidTestCase(test.TestCase):
- """
- Exercise the public API of impl_qpid utilizing mox.
-
- This set of tests utilizes mox to replace the Qpid objects and ensures
- that the right operations happen on them when the various public rpc API
- calls are exercised. The API calls tested here include:
-
- nova.rpc.create_connection()
- nova.rpc.common.Connection.create_consumer()
- nova.rpc.common.Connection.close()
- nova.rpc.cast()
- nova.rpc.fanout_cast()
- nova.rpc.call()
- nova.rpc.multicall()
- """
-
- def setUp(self):
- super(RpcQpidTestCase, self).setUp()
-
- self.mock_connection = None
- self.mock_session = None
- self.mock_sender = None
- self.mock_receiver = None
-
- if qpid:
- self.orig_connection = qpid.messaging.Connection
- self.orig_session = qpid.messaging.Session
- self.orig_sender = qpid.messaging.Sender
- self.orig_receiver = qpid.messaging.Receiver
- qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection
- qpid.messaging.Session = lambda *_x, **_y: self.mock_session
- qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender
- qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver
-
- def tearDown(self):
- if qpid:
- qpid.messaging.Connection = self.orig_connection
- qpid.messaging.Session = self.orig_session
- qpid.messaging.Sender = self.orig_sender
- qpid.messaging.Receiver = self.orig_receiver
- if impl_qpid:
- # Need to reset this in case we changed the connection_cls
- # in self._setup_to_server_tests()
- impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection
-
- super(RpcQpidTestCase, self).tearDown()
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_create_connection(self):
- self.mock_connection = self.mox.CreateMock(self.orig_connection)
- self.mock_session = self.mox.CreateMock(self.orig_session)
-
- self.mock_connection.opened().AndReturn(False)
- self.mock_connection.open()
- self.mock_connection.session().AndReturn(self.mock_session)
- self.mock_connection.close()
-
- self.mox.ReplayAll()
-
- connection = impl_qpid.create_connection(FLAGS)
- connection.close()
-
- def _test_create_consumer(self, fanout):
- self.mock_connection = self.mox.CreateMock(self.orig_connection)
- self.mock_session = self.mox.CreateMock(self.orig_session)
- self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
-
- self.mock_connection.opened().AndReturn(False)
- self.mock_connection.open()
- self.mock_connection.session().AndReturn(self.mock_session)
- if fanout:
- # The link name includes a UUID, so match it with a regex.
- expected_address = mox.Regex(r'^impl_qpid_test_fanout ; '
- '{"node": {"x-declare": {"auto-delete": true, "durable": '
- 'false, "type": "fanout"}, "type": "topic"}, "create": '
- '"always", "link": {"x-declare": {"auto-delete": true, '
- '"exclusive": true, "durable": false}, "durable": true, '
- '"name": "impl_qpid_test_fanout_.*"}}$')
- else:
- expected_address = ('nova/impl_qpid_test ; {"node": {"x-declare": '
- '{"auto-delete": true, "durable": true}, "type": "topic"}, '
- '"create": "always", "link": {"x-declare": {"auto-delete": '
- 'true, "exclusive": false, "durable": false}, "durable": '
- 'true, "name": "impl_qpid_test"}}')
- self.mock_session.receiver(expected_address).AndReturn(
- self.mock_receiver)
- self.mock_receiver.capacity = 1
- self.mock_connection.close()
-
- self.mox.ReplayAll()
-
- connection = impl_qpid.create_connection(FLAGS)
- connection.create_consumer("impl_qpid_test",
- lambda *_x, **_y: None,
- fanout)
- connection.close()
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_create_consumer(self):
- self._test_create_consumer(fanout=False)
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_create_consumer_fanout(self):
- self._test_create_consumer(fanout=True)
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_create_worker(self):
- self.mock_connection = self.mox.CreateMock(self.orig_connection)
- self.mock_session = self.mox.CreateMock(self.orig_session)
- self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
-
- self.mock_connection.opened().AndReturn(False)
- self.mock_connection.open()
- self.mock_connection.session().AndReturn(self.mock_session)
- expected_address = (
- 'nova/impl_qpid_test ; {"node": {"x-declare": '
- '{"auto-delete": true, "durable": true}, "type": "topic"}, '
- '"create": "always", "link": {"x-declare": {"auto-delete": '
- 'true, "exclusive": false, "durable": false}, "durable": '
- 'true, "name": "impl.qpid.test.workers"}}')
- self.mock_session.receiver(expected_address).AndReturn(
- self.mock_receiver)
- self.mock_receiver.capacity = 1
- self.mock_connection.close()
-
- self.mox.ReplayAll()
-
- connection = impl_qpid.create_connection(FLAGS)
- connection.create_worker("impl_qpid_test",
- lambda *_x, **_y: None,
- 'impl.qpid.test.workers',
- )
- connection.close()
-
- def _test_cast(self, fanout, server_params=None):
- self.mock_connection = self.mox.CreateMock(self.orig_connection)
- self.mock_session = self.mox.CreateMock(self.orig_session)
- self.mock_sender = self.mox.CreateMock(self.orig_sender)
-
- self.mock_connection.opened().AndReturn(False)
- self.mock_connection.open()
-
- self.mock_connection.session().AndReturn(self.mock_session)
- if fanout:
- expected_address = ('impl_qpid_test_fanout ; '
- '{"node": {"x-declare": {"auto-delete": true, '
- '"durable": false, "type": "fanout"}, '
- '"type": "topic"}, "create": "always"}')
- else:
- expected_address = ('nova/impl_qpid_test ; {"node": {"x-declare": '
- '{"auto-delete": true, "durable": false}, "type": "topic"}, '
- '"create": "always"}')
- self.mock_session.sender(expected_address).AndReturn(self.mock_sender)
- self.mock_sender.send(mox.IgnoreArg())
- if not server_params:
- # This is a pooled connection, so instead of closing it, it
- # gets reset, which is just creating a new session on the
- # connection.
- self.mock_session.close()
- self.mock_connection.session().AndReturn(self.mock_session)
-
- self.mox.ReplayAll()
-
- try:
- ctx = context.RequestContext("user", "project")
-
- args = [FLAGS, ctx, "impl_qpid_test",
- {"method": "test_method", "args": {}}]
-
- if server_params:
- args.insert(2, server_params)
- if fanout:
- method = impl_qpid.fanout_cast_to_server
- else:
- method = impl_qpid.cast_to_server
- else:
- if fanout:
- method = impl_qpid.fanout_cast
- else:
- method = impl_qpid.cast
-
- method(*args)
- finally:
- while impl_qpid.Connection.pool.free_items:
- # Pull the mock connection object out of the connection pool so
- # that it doesn't mess up other test cases.
- impl_qpid.Connection.pool.get()
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_cast(self):
- self._test_cast(fanout=False)
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_fanout_cast(self):
- self._test_cast(fanout=True)
-
- def _setup_to_server_tests(self, server_params):
- class MyConnection(impl_qpid.Connection):
- def __init__(myself, *args, **kwargs):
- super(MyConnection, myself).__init__(*args, **kwargs)
- self.assertEqual(myself.connection.username,
- server_params['username'])
- self.assertEqual(myself.connection.password,
- server_params['password'])
- self.assertEqual(myself.broker,
- server_params['hostname'] + ':' +
- str(server_params['port']))
-
- MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
- self.stubs.Set(impl_qpid, 'Connection', MyConnection)
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_cast_to_server(self):
- server_params = {'username': 'fake_username',
- 'password': 'fake_password',
- 'hostname': 'fake_hostname',
- 'port': 31337}
- self._setup_to_server_tests(server_params)
- self._test_cast(fanout=False, server_params=server_params)
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_fanout_cast_to_server(self):
- server_params = {'username': 'fake_username',
- 'password': 'fake_password',
- 'hostname': 'fake_hostname',
- 'port': 31337}
- self._setup_to_server_tests(server_params)
- self._test_cast(fanout=True, server_params=server_params)
-
- def _test_call(self, multi):
- self.mock_connection = self.mox.CreateMock(self.orig_connection)
- self.mock_session = self.mox.CreateMock(self.orig_session)
- self.mock_sender = self.mox.CreateMock(self.orig_sender)
- self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
-
- self.mock_connection.opened().AndReturn(False)
- self.mock_connection.open()
- self.mock_connection.session().AndReturn(self.mock_session)
- rcv_addr = mox.Regex(r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
- ' true, "durable": true, "type": "direct"}, "type": '
- '"topic"}, "create": "always", "link": {"x-declare": '
- '{"auto-delete": true, "exclusive": true, "durable": '
- 'false}, "durable": true, "name": ".*"}}')
- self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
- self.mock_receiver.capacity = 1
- send_addr = ('nova/impl_qpid_test ; {"node": {"x-declare": '
- '{"auto-delete": true, "durable": false}, "type": "topic"}, '
- '"create": "always"}')
- self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
- self.mock_sender.send(mox.IgnoreArg())
-
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
- self.mock_receiver)
- self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"result": "foo", "failure": False, "ending": False}))
- self.mock_session.acknowledge(mox.IgnoreArg())
- if multi:
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
- self.mock_receiver)
- self.mock_receiver.fetch().AndReturn(
- qpid.messaging.Message(
- {"result": "bar", "failure": False,
- "ending": False}))
- self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
- self.mock_receiver)
- self.mock_receiver.fetch().AndReturn(
- qpid.messaging.Message(
- {"result": "baz", "failure": False,
- "ending": False}))
- self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
- self.mock_receiver)
- self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"failure": False, "ending": True}))
- self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.close()
- self.mock_connection.session().AndReturn(self.mock_session)
-
- self.mox.ReplayAll()
-
- try:
- ctx = context.RequestContext("user", "project")
-
- if multi:
- method = impl_qpid.multicall
- else:
- method = impl_qpid.call
-
- res = method(FLAGS, ctx, "impl_qpid_test",
- {"method": "test_method", "args": {}})
-
- if multi:
- self.assertEquals(list(res), ["foo", "bar", "baz"])
- else:
- self.assertEquals(res, "foo")
- finally:
- while impl_qpid.Connection.pool.free_items:
- # Pull the mock connection object out of the connection pool so
- # that it doesn't mess up other test cases.
- impl_qpid.Connection.pool.get()
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_call(self):
- self._test_call(multi=False)
-
- @test.skip_if(qpid is None, "Test requires qpid")
- def test_multicall(self):
- self._test_call(multi=True)
-
-
-#
-#from nova.tests.rpc import common
-#
-# Qpid does not have a handy in-memory transport like kombu, so it's not
-# terribly straight forward to take advantage of the common unit tests.
-# However, at least at the time of this writing, the common unit tests all pass
-# with qpidd running.
-#
-# class RpcQpidCommonTestCase(common._BaseRpcTestCase):
-# def setUp(self):
-# self.rpc = impl_qpid
-# super(RpcQpidCommonTestCase, self).setUp()
-#
-# def tearDown(self):
-# super(RpcQpidCommonTestCase, self).tearDown()
-#
diff --git a/nova/tests/rpc/test_zmq.py b/nova/tests/rpc/test_zmq.py
deleted file mode 100644
index aa7032850..000000000
--- a/nova/tests/rpc/test_zmq.py
+++ /dev/null
@@ -1,128 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2012 Cloudscaling Group, Inc.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Unit Tests for remote procedure calls using zeromq
-"""
-
-import os
-
-from nova import exception
-from nova import flags
-from nova import log as logging
-from nova import rpc
-from nova import test
-from nova.tests.rpc import common
-from nova import utils
-
-try:
- from eventlet.green import zmq
- from nova.rpc import impl_zmq
-except ImportError:
- zmq = None
- impl_zmq = None
-
-LOG = logging.getLogger(__name__)
-FLAGS = flags.FLAGS
-
-
-class _RpcZmqBaseTestCase(common.BaseRpcTestCase):
- @test.skip_if(zmq is None, "Test requires zmq")
- def setUp(self, topic='test', topic_nested='nested'):
- if not impl_zmq:
- return None
-
- self.reactor = None
- FLAGS.register_opts(rpc.rpc_opts)
- self.rpc = impl_zmq
- self.rpc.register_opts(FLAGS)
- FLAGS.set_default('rpc_zmq_matchmaker',
- 'mod_matchmaker.MatchMakerLocalhost')
-
- # We'll change this if we detect no daemon running.
- ipc_dir = FLAGS.rpc_zmq_ipc_dir
-
- # Only launch the router if it isn't running independently.
- if not os.path.exists(os.path.join(ipc_dir, "zmq_topic_zmq_replies")):
- LOG.info(_("Running internal zmq receiver."))
- # The normal ipc_dir default needs to run as root,
- # /tmp is easier within a testing environment.
- FLAGS.set_default('rpc_zmq_ipc_dir', '/tmp/nova-zmq.ipc.test')
-
- # Value has changed.
- ipc_dir = FLAGS.rpc_zmq_ipc_dir
-
- try:
- # Only launch the receiver if it isn't running independently.
- # This is checked again, with the (possibly) new ipc_dir.
- if os.path.exists(os.path.join(ipc_dir, "zmq_topic_zmq_replies")):
- LOG.warning(_("Detected zmq-receiver socket. "
- "Assuming nova-rpc-zmq-receiver is running."))
- return
-
- if not os.path.isdir(ipc_dir):
- os.mkdir(ipc_dir)
-
- self.reactor = impl_zmq.ZmqProxy(FLAGS)
- consume_in = "tcp://%s:%s" % \
- (FLAGS.rpc_zmq_bind_address,
- FLAGS.rpc_zmq_port)
- consumption_proxy = impl_zmq.InternalContext(None)
-
- self.reactor.register(consumption_proxy,
- consume_in, zmq.PULL, out_bind=True)
- self.reactor.consume_in_thread()
- except zmq.ZMQError:
- assert False, _("Could not create ZeroMQ receiver daemon. "
- "Socket may already be in use.")
- except OSError:
- assert False, _("Could not create IPC directory %s") % \
- (ipc_dir, )
- finally:
- super(_RpcZmqBaseTestCase, self).setUp(
- topic=topic, topic_nested=topic_nested)
-
- def tearDown(self):
- if not impl_zmq:
- return None
- if self.reactor:
- self.reactor.close()
-
- try:
- utils.execute('rm', '-rf', FLAGS.rpc_zmq_ipc_dir)
- except exception.ProcessExecutionError:
- pass
-
- super(_RpcZmqBaseTestCase, self).tearDown()
-
-
-class RpcZmqBaseTopicTestCase(_RpcZmqBaseTestCase):
- """
- This tests with topics such as 'test' and 'nested',
- without any .host appended. Stresses the matchmaker.
- """
- pass
-
-
-class RpcZmqDirectTopicTestCase(_RpcZmqBaseTestCase):
- """
- Test communication directly to a host,
- tests use 'localhost'.
- """
- def setUp(self):
- super(RpcZmqDirectTopicTestCase, self).setUp(
- topic='test.localhost',
- topic_nested='nested.localhost')
diff --git a/nova/tests/scheduler/test_rpcapi.py b/nova/tests/scheduler/test_rpcapi.py
index 3617cecdf..fed367c70 100644
--- a/nova/tests/scheduler/test_rpcapi.py
+++ b/nova/tests/scheduler/test_rpcapi.py
@@ -20,7 +20,7 @@ Unit Tests for nova.scheduler.rpcapi
from nova import context
from nova import flags
-from nova import rpc
+from nova.openstack.common import rpc
from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import test
diff --git a/nova/tests/scheduler/test_scheduler.py b/nova/tests/scheduler/test_scheduler.py
index 2c2598dbc..9979c5e04 100644
--- a/nova/tests/scheduler/test_scheduler.py
+++ b/nova/tests/scheduler/test_scheduler.py
@@ -28,9 +28,9 @@ from nova import db
from nova import exception
from nova import flags
from nova.openstack.common import jsonutils
+from nova.openstack.common import rpc
+from nova.openstack.common.rpc import common as rpc_common
from nova.openstack.common import timeutils
-from nova import rpc
-from nova.rpc import common as rpc_common
from nova.scheduler import driver
from nova.scheduler import manager
from nova import test
diff --git a/nova/tests/test_notifier.py b/nova/tests/test_notifier.py
index 7a74c74e8..773ffe49b 100644
--- a/nova/tests/test_notifier.py
+++ b/nova/tests/test_notifier.py
@@ -73,7 +73,7 @@ class NotifierTestCase(test.TestCase):
def mock_notify(cls, *args):
self.mock_notify = True
- self.stubs.Set(nova.rpc, 'notify', mock_notify)
+ self.stubs.Set(nova.openstack.common.rpc, 'notify', mock_notify)
notifier_api.notify(ctxt, 'publisher_id', 'event_type',
nova.notifier.api.WARN, dict(a=3))
@@ -96,7 +96,7 @@ class NotifierTestCase(test.TestCase):
def mock_notify(context, topic, msg):
self.test_topic = topic
- self.stubs.Set(nova.rpc, 'notify', mock_notify)
+ self.stubs.Set(nova.openstack.common.rpc, 'notify', mock_notify)
notifier_api.notify(ctxt, 'publisher_id',
'event_type', 'DEBUG', dict(a=3))
self.assertEqual(self.test_topic, 'testnotify.debug')
@@ -112,7 +112,7 @@ class NotifierTestCase(test.TestCase):
def mock_notify(context, topic, data):
msgs.append(data)
- self.stubs.Set(nova.rpc, 'notify', mock_notify)
+ self.stubs.Set(nova.openstack.common.rpc, 'notify', mock_notify)
LOG.error('foo')
self.assertEqual(1, len(msgs))
msg = msgs[0]
diff --git a/nova/tests/test_quota.py b/nova/tests/test_quota.py
index d65bc89a9..54489918b 100644
--- a/nova/tests/test_quota.py
+++ b/nova/tests/test_quota.py
@@ -26,9 +26,9 @@ from nova.db.sqlalchemy import api as sqa_api
from nova.db.sqlalchemy import models as sqa_models
from nova import exception
from nova import flags
+from nova.openstack.common import rpc
from nova.openstack.common import timeutils
from nova import quota
-from nova import rpc
from nova.scheduler import driver as scheduler_driver
from nova import test
from nova import volume
diff --git a/nova/tests/test_test.py b/nova/tests/test_test.py
index 3482ff6a0..f89a5bb94 100644
--- a/nova/tests/test_test.py
+++ b/nova/tests/test_test.py
@@ -18,7 +18,7 @@
"""Tests for the testing base code."""
-from nova import rpc
+from nova.openstack.common import rpc
from nova import test
diff --git a/nova/tests/test_volume.py b/nova/tests/test_volume.py
index c8788f3ad..0aac9e8cf 100644
--- a/nova/tests/test_volume.py
+++ b/nova/tests/test_volume.py
@@ -31,9 +31,9 @@ from nova import flags
from nova import log as logging
from nova.notifier import test_notifier
from nova.openstack.common import importutils
+from nova.openstack.common import rpc
import nova.policy
from nova import quota
-from nova import rpc
from nova import test
import nova.volume.api
diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py
index 6b12934ff..7b0c576bf 100644
--- a/nova/virt/xenapi/pool.py
+++ b/nova/virt/xenapi/pool.py
@@ -28,7 +28,7 @@ from nova import flags
from nova import log as logging
from nova.openstack.common import cfg
from nova.openstack.common import jsonutils
-from nova import rpc
+from nova.openstack.common import rpc
from nova.virt.xenapi import vm_utils
LOG = logging.getLogger(__name__)
diff --git a/nova/volume/api.py b/nova/volume/api.py
index d8dbd4a6b..748b2d16b 100644
--- a/nova/volume/api.py
+++ b/nova/volume/api.py
@@ -26,10 +26,10 @@ from nova.db import base
from nova import exception
from nova import flags
from nova import log as logging
+from nova.openstack.common import rpc
from nova.openstack.common import timeutils
import nova.policy
from nova import quota
-from nova import rpc
FLAGS = flags.FLAGS
flags.DECLARE('storage_availability_zone', 'nova.volume.manager')