summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--nova/manager.py12
-rw-r--r--nova/rpc/__init__.py8
-rw-r--r--nova/rpc/amqp.py19
-rw-r--r--nova/rpc/common.py5
-rw-r--r--nova/rpc/dispatcher.py105
-rw-r--r--nova/rpc/impl_fake.py9
-rw-r--r--nova/rpc/proxy.py161
-rw-r--r--nova/service.py8
-rw-r--r--nova/tests/rpc/common.py10
-rw-r--r--nova/tests/rpc/test_dispatcher.py109
-rw-r--r--nova/tests/rpc/test_proxy.py124
11 files changed, 550 insertions, 20 deletions
diff --git a/nova/manager.py b/nova/manager.py
index 7a0b2888e..cbe67e008 100644
--- a/nova/manager.py
+++ b/nova/manager.py
@@ -56,6 +56,7 @@ This module provides Manager, a base class for managers.
from nova.db import base
from nova import flags
from nova import log as logging
+from nova.rpc import dispatcher as rpc_dispatcher
from nova.scheduler import api
from nova import version
@@ -130,12 +131,23 @@ class ManagerMeta(type):
class Manager(base.Base):
__metaclass__ = ManagerMeta
+ # Set RPC API version to 1.0 by default.
+ RPC_API_VERSION = '1.0'
+
def __init__(self, host=None, db_driver=None):
if not host:
host = FLAGS.host
self.host = host
super(Manager, self).__init__(db_driver)
+ def create_rpc_dispatcher(self):
+ '''Get the rpc dispatcher for this manager.
+
+ If a manager would like to set an rpc API version, or support more than
+ one class as the target of rpc messages, override this method.
+ '''
+ return rpc_dispatcher.RpcDispatcher([self])
+
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
for task_name, task in self._periodic_tasks:
diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py
index acfc10e7e..b48e47610 100644
--- a/nova/rpc/__init__.py
+++ b/nova/rpc/__init__.py
@@ -17,6 +17,14 @@
# License for the specific language governing permissions and limitations
# under the License.
+"""
+A remote procedure call (rpc) abstraction.
+
+For some wrappers that add message versioning to rpc, see:
+ rpc.dispatcher
+ rpc.proxy
+"""
+
from nova.openstack.common import cfg
from nova.openstack.common import importutils
diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py
index b0a0d5d12..0e079f533 100644
--- a/nova/rpc/amqp.py
+++ b/nova/rpc/amqp.py
@@ -242,23 +242,26 @@ class ProxyCallback(object):
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
+ version = message_data.get('version', None)
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data,
connection_pool=self.connection_pool)
return
- self.pool.spawn_n(self._process_data, ctxt, method, args)
+ self.pool.spawn_n(self._process_data, ctxt, version, method, args)
- def _process_data(self, ctxt, method, args):
- """Thread that magically looks for a method on the proxy
- object and calls it.
+ def _process_data(self, ctxt, version, method, args):
+ """Process a message in a new thread.
+
+ If the proxy object we have has a dispatch method
+ (see rpc.dispatcher.RpcDispatcher), pass it the version,
+ method, and args and let it dispatch as appropriate. If not, use
+ the old behavior of magically calling the specified method on the
+ proxy we have here.
"""
ctxt.update_store()
try:
- node_func = getattr(self.proxy, str(method))
- node_args = dict((str(k), v) for k, v in args.iteritems())
- # NOTE(vish): magic is fun!
- rval = node_func(context=ctxt, **node_args)
+ rval = self.proxy.dispatch(ctxt, version, method, **args)
# Check if the result was a generator
if inspect.isgenerator(rval):
for x in rval:
diff --git a/nova/rpc/common.py b/nova/rpc/common.py
index d15d1f3f1..aee243800 100644
--- a/nova/rpc/common.py
+++ b/nova/rpc/common.py
@@ -85,6 +85,11 @@ class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.")
+class UnsupportedRpcVersion(RPCException):
+ message = _("Specified RPC version, %(version)s, not supported by "
+ "this endpoint.")
+
+
class Connection(object):
"""A connection, returned by rpc.create_connection().
diff --git a/nova/rpc/dispatcher.py b/nova/rpc/dispatcher.py
new file mode 100644
index 000000000..3f46398a9
--- /dev/null
+++ b/nova/rpc/dispatcher.py
@@ -0,0 +1,105 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Code for rpc message dispatching.
+
+Messages that come in have a version number associated with them. RPC API
+version numbers are in the form:
+
+ Major.Minor
+
+For a given message with version X.Y, the receiver must be marked as able to
+handle messages of version A.B, where:
+
+ A = X
+
+ B >= Y
+
+The Major version number would be incremented for an almost completely new API.
+The Minor version number would be incremented for backwards compatible changes
+to an existing API. A backwards compatible change could be something like
+adding a new method, adding an argument to an existing method (but not
+requiring it), or changing the type for an existing argument (but still
+handling the old type as well).
+
+The conversion over to a versioned API must be done on both the client side and
+server side of the API at the same time. However, as the code stands today,
+there can be both versioned and unversioned APIs implemented in the same code
+base.
+"""
+
+from nova.rpc import common as rpc_common
+
+
+class RpcDispatcher(object):
+ """Dispatch rpc messages according to the requested API version.
+
+ This class can be used as the top level 'manager' for a service. It
+ contains a list of underlying managers that have an API_VERSION attribute.
+ """
+
+ def __init__(self, callbacks):
+ """Initialize the rpc dispatcher.
+
+ :param callbacks: List of proxy objects that are an instance
+ of a class with rpc methods exposed. Each proxy
+ object should have an RPC_API_VERSION attribute.
+ """
+ self.callbacks = callbacks
+ super(RpcDispatcher, self).__init__()
+
+ @staticmethod
+ def _is_compatible(mversion, version):
+ """Determine whether versions are compatible.
+
+ :param mversion: The API version implemented by a callback.
+ :param version: The API version requested by an incoming message.
+ """
+ version_parts = version.split('.')
+ mversion_parts = mversion.split('.')
+ if int(version_parts[0]) != int(mversion_parts[0]): # Major
+ return False
+ if int(version_parts[1]) > int(mversion_parts[1]): # Minor
+ return False
+ return True
+
+ def dispatch(self, ctxt, version, method, **kwargs):
+ """Dispatch a message based on a requested version.
+
+ :param ctxt: The request context
+ :param version: The requested API version from the incoming message
+ :param method: The method requested to be called by the incoming
+ message.
+ :param kwargs: A dict of keyword arguments to be passed to the method.
+
+ :returns: Whatever is returned by the underlying method that gets
+ called.
+ """
+ if not version:
+ version = '1.0'
+
+ for proxyobj in self.callbacks:
+ if hasattr(proxyobj, 'RPC_API_VERSION'):
+ rpc_api_version = proxyobj.RPC_API_VERSION
+ else:
+ rpc_api_version = '1.0'
+ if not hasattr(proxyobj, method):
+ continue
+ if self._is_compatible(rpc_api_version, version):
+ return getattr(proxyobj, method)(ctxt, **kwargs)
+
+ raise rpc_common.UnsupportedRpcVersion(version=version)
diff --git a/nova/rpc/impl_fake.py b/nova/rpc/impl_fake.py
index 99c686901..70a8ca5f7 100644
--- a/nova/rpc/impl_fake.py
+++ b/nova/rpc/impl_fake.py
@@ -47,15 +47,13 @@ class Consumer(object):
self.topic = topic
self.proxy = proxy
- def call(self, context, method, args, timeout):
- node_func = getattr(self.proxy, method)
- node_args = dict((str(k), v) for k, v in args.iteritems())
+ def call(self, context, version, method, args, timeout):
done = eventlet.event.Event()
def _inner():
ctxt = RpcContext.from_dict(context.to_dict())
try:
- rval = node_func(context=ctxt, **node_args)
+ rval = self.proxy.dispatch(context, version, method, **args)
res = []
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
@@ -129,13 +127,14 @@ def multicall(conf, context, topic, msg, timeout=None):
if not method:
return
args = msg.get('args', {})
+ version = msg.get('version', None)
try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
return iter([None])
else:
- return consumer.call(context, method, args, timeout)
+ return consumer.call(context, version, method, args, timeout)
def call(conf, context, topic, msg, timeout=None):
diff --git a/nova/rpc/proxy.py b/nova/rpc/proxy.py
new file mode 100644
index 000000000..79a90dc3a
--- /dev/null
+++ b/nova/rpc/proxy.py
@@ -0,0 +1,161 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+A helper class for proxy objects to remote APIs.
+
+For more information about rpc API version numbers, see:
+ rpc/dispatcher.py
+"""
+
+
+from nova import rpc
+
+
+class RpcProxy(object):
+ """A helper class for rpc clients.
+
+ This class is a wrapper around the RPC client API. It allows you to
+ specify the topic and API version in a single place. This is intended to
+ be used as a base class for a class that implements the client side of an
+ rpc API.
+ """
+
+ def __init__(self, topic, default_version):
+ """Initialize an RpcProxy.
+
+ :param topic: The topic to use for all messages.
+ :param default_version: The default API version to request in all
+ outgoing messages. This can be overridden on a per-message
+ basis.
+ """
+ self.topic = topic
+ self.default_version = default_version
+ super(RpcProxy, self).__init__()
+
+ def _set_version(self, msg, vers):
+ """Helper method to set the version in a message.
+
+ :param msg: The message having a version added to it.
+ :param vers: The version number to add to the message.
+ """
+ msg['version'] = vers if vers else self.default_version
+
+ def _get_topic(self, topic):
+ """Return the topic to use for a message."""
+ return topic if topic else self.topic
+
+ @staticmethod
+ def make_msg(method, **kwargs):
+ return {'method': method, 'args': kwargs}
+
+ def call(self, context, msg, topic=None, version=None, timeout=None):
+ """rpc.call() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param timeout: (Optional) A timeout to use when waiting for the
+ response. If no timeout is specified, a default timeout will be
+ used that is usually sufficient.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: The return value from the remote method.
+ """
+ self._set_version(msg, version)
+ return rpc.call(context, self._get_topic(topic), msg, timeout)
+
+ def multicall(self, context, msg, topic=None, version=None, timeout=None):
+ """rpc.multicall() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param timeout: (Optional) A timeout to use when waiting for the
+ response. If no timeout is specified, a default timeout will be
+ used that is usually sufficient.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: An iterator that lets you process each of the returned values
+ from the remote method as they arrive.
+ """
+ self._set_version(msg, version)
+ return rpc.multicall(context, self._get_topic(topic), msg, timeout)
+
+ def cast(self, context, msg, topic=None, version=None):
+ """rpc.cast() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.cast() does not wait on any return value from the
+ remote method.
+ """
+ self._set_version(msg, version)
+ rpc.cast(context, self._get_topic(topic), msg)
+
+ def fanout_cast(self, context, msg, version=None):
+ """rpc.fanout_cast() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.fanout_cast() does not wait on any return value
+ from the remote method.
+ """
+ self._set_version(msg, version)
+ rpc.fanout_cast(context, self.topic, msg)
+
+ def cast_to_server(self, context, server_params, msg, topic=None,
+ version=None):
+ """rpc.cast_to_server() a remote method.
+
+ :param context: The request context
+ :param server_params: Server parameters. See rpc.cast_to_server() for
+ details.
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.cast_to_server() does not wait on any
+ return values.
+ """
+ self._set_version(msg, version)
+ rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
+
+ def fanout_cast_to_server(self, context, server_params, msg, version=None):
+ """rpc.fanout_cast_to_server() a remote method.
+
+ :param context: The request context
+ :param server_params: Server parameters. See rpc.cast_to_server() for
+ details.
+ :param msg: The message to send, including the method and args.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.fanout_cast_to_server() does not wait on any
+ return values.
+ """
+ self._set_version(msg, version)
+ rpc.fanout_cast_to_server(context, server_params, self.topic, msg)
diff --git a/nova/service.py b/nova/service.py
index 6ff50d98e..b179cda6c 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -198,13 +198,15 @@ class Service(object):
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
+ rpc_dispatcher = self.manager.create_rpc_dispatcher()
+
# Share this same connection for these Consumers
- self.conn.create_consumer(self.topic, self, fanout=False)
+ self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
- self.conn.create_consumer(node_topic, self, fanout=False)
+ self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
- self.conn.create_consumer(self.topic, self, fanout=True)
+ self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
diff --git a/nova/tests/rpc/common.py b/nova/tests/rpc/common.py
index 0c8c11487..c07ddfa1a 100644
--- a/nova/tests/rpc/common.py
+++ b/nova/tests/rpc/common.py
@@ -30,6 +30,7 @@ from nova import flags
from nova import log as logging
from nova.rpc import amqp as rpc_amqp
from nova.rpc import common as rpc_common
+from nova.rpc import dispatcher as rpc_dispatcher
from nova import test
@@ -44,8 +45,9 @@ class BaseRpcTestCase(test.TestCase):
self.context = context.get_admin_context()
if self.rpc:
self.conn = self.rpc.create_connection(FLAGS, True)
- self.receiver = TestReceiver()
- self.conn.create_consumer('test', self.receiver, False)
+ receiver = TestReceiver()
+ self.dispatcher = rpc_dispatcher.RpcDispatcher([receiver])
+ self.conn.create_consumer('test', self.dispatcher, False)
self.conn.consume_in_thread()
def tearDown(self):
@@ -145,8 +147,9 @@ class BaseRpcTestCase(test.TestCase):
return value
nested = Nested()
+ dispatcher = rpc_dispatcher.RpcDispatcher([nested])
conn = self.rpc.create_connection(FLAGS, True)
- conn.create_consumer('nested', nested, False)
+ conn.create_consumer('nested', dispatcher, False)
conn.consume_in_thread()
value = 42
result = self.rpc.call(FLAGS, self.context,
@@ -228,7 +231,6 @@ class TestReceiver(object):
Uses static methods because we aren't actually storing any state.
"""
-
@staticmethod
def echo(context, value):
"""Simply returns whatever value is sent in."""
diff --git a/nova/tests/rpc/test_dispatcher.py b/nova/tests/rpc/test_dispatcher.py
new file mode 100644
index 000000000..0c16c6a34
--- /dev/null
+++ b/nova/tests/rpc/test_dispatcher.py
@@ -0,0 +1,109 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Unit Tests for rpc.dispatcher
+"""
+
+from nova import context
+from nova.rpc import dispatcher
+from nova.rpc import common as rpc_common
+from nova import test
+
+
+class RpcDispatcherTestCase(test.TestCase):
+ class API1(object):
+ RPC_API_VERSION = '1.0'
+
+ def __init__(self):
+ self.test_method_ctxt = None
+ self.test_method_arg1 = None
+
+ def test_method(self, ctxt, arg1):
+ self.test_method_ctxt = ctxt
+ self.test_method_arg1 = arg1
+
+ class API2(object):
+ RPC_API_VERSION = '2.1'
+
+ def __init__(self):
+ self.test_method_ctxt = None
+ self.test_method_arg1 = None
+
+ def test_method(self, ctxt, arg1):
+ self.test_method_ctxt = ctxt
+ self.test_method_arg1 = arg1
+
+ class API3(object):
+ RPC_API_VERSION = '3.5'
+
+ def __init__(self):
+ self.test_method_ctxt = None
+ self.test_method_arg1 = None
+
+ def test_method(self, ctxt, arg1):
+ self.test_method_ctxt = ctxt
+ self.test_method_arg1 = arg1
+
+ def setUp(self):
+ self.ctxt = context.RequestContext('fake_user', 'fake_project')
+ super(RpcDispatcherTestCase, self).setUp()
+
+ def tearDown(self):
+ super(RpcDispatcherTestCase, self).tearDown()
+
+ def _test_dispatch(self, version, expectations):
+ v2 = self.API2()
+ v3 = self.API3()
+ disp = dispatcher.RpcDispatcher([v2, v3])
+
+ disp.dispatch(self.ctxt, version, 'test_method', arg1=1)
+
+ self.assertEqual(v2.test_method_ctxt, expectations[0])
+ self.assertEqual(v2.test_method_arg1, expectations[1])
+ self.assertEqual(v3.test_method_ctxt, expectations[2])
+ self.assertEqual(v3.test_method_arg1, expectations[3])
+
+ def test_dispatch(self):
+ self._test_dispatch('2.1', (self.ctxt, 1, None, None))
+ self._test_dispatch('3.5', (None, None, self.ctxt, 1))
+
+ def test_dispatch_lower_minor_version(self):
+ self._test_dispatch('2.0', (self.ctxt, 1, None, None))
+ self._test_dispatch('3.1', (None, None, self.ctxt, 1))
+
+ def test_dispatch_higher_minor_version(self):
+ self.assertRaises(rpc_common.UnsupportedRpcVersion,
+ self._test_dispatch, '2.6', (None, None, None, None))
+ self.assertRaises(rpc_common.UnsupportedRpcVersion,
+ self._test_dispatch, '3.6', (None, None, None, None))
+
+ def test_dispatch_lower_major_version(self):
+ self.assertRaises(rpc_common.UnsupportedRpcVersion,
+ self._test_dispatch, '1.0', (None, None, None, None))
+
+ def test_dispatch_higher_major_version(self):
+ self.assertRaises(rpc_common.UnsupportedRpcVersion,
+ self._test_dispatch, '4.0', (None, None, None, None))
+
+ def test_dispatch_no_version_uses_v1(self):
+ v1 = self.API1()
+ disp = dispatcher.RpcDispatcher([v1])
+
+ disp.dispatch(self.ctxt, None, 'test_method', arg1=1)
+
+ self.assertEqual(v1.test_method_ctxt, self.ctxt)
+ self.assertEqual(v1.test_method_arg1, 1)
diff --git a/nova/tests/rpc/test_proxy.py b/nova/tests/rpc/test_proxy.py
new file mode 100644
index 000000000..9ef504a0d
--- /dev/null
+++ b/nova/tests/rpc/test_proxy.py
@@ -0,0 +1,124 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Unit Tests for rpc.proxy
+"""
+
+import copy
+
+from nova import context
+from nova import rpc
+from nova.rpc import proxy
+from nova import test
+
+
+class RpcProxyTestCase(test.TestCase):
+
+ def setUp(self):
+ super(RpcProxyTestCase, self).setUp()
+
+ def tearDown(self):
+ super(RpcProxyTestCase, self).tearDown()
+
+ def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False,
+ server_params=None, supports_topic_override=True):
+ topic = 'fake_topic'
+ timeout = 123
+ rpc_proxy = proxy.RpcProxy(topic, '1.0')
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+ msg = {'method': 'fake_method', 'args': {'x': 'y'}}
+ expected_msg = {'method': 'fake_method', 'args': {'x': 'y'},
+ 'version': '1.0'}
+
+ expected_retval = 'hi' if has_retval else None
+
+ self.fake_args = None
+ self.fake_kwargs = None
+
+ def _fake_rpc_method(*args, **kwargs):
+ self.fake_args = args
+ self.fake_kwargs = kwargs
+ if has_retval:
+ return expected_retval
+
+ self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
+
+ args = [ctxt, msg]
+ if server_params:
+ args.insert(1, server_params)
+
+ # Base method usage
+ retval = getattr(rpc_proxy, rpc_method)(*args)
+ self.assertEqual(retval, expected_retval)
+ expected_args = [ctxt, topic, expected_msg]
+ if server_params:
+ expected_args.insert(1, server_params)
+ for arg, expected_arg in zip(self.fake_args, expected_args):
+ self.assertEqual(arg, expected_arg)
+
+ # overriding the version
+ retval = getattr(rpc_proxy, rpc_method)(*args, version='1.1')
+ self.assertEqual(retval, expected_retval)
+ new_msg = copy.deepcopy(expected_msg)
+ new_msg['version'] = '1.1'
+ expected_args = [ctxt, topic, new_msg]
+ if server_params:
+ expected_args.insert(1, server_params)
+ for arg, expected_arg in zip(self.fake_args, expected_args):
+ self.assertEqual(arg, expected_arg)
+
+ if has_timeout:
+ # set a timeout
+ retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=timeout)
+ self.assertEqual(retval, expected_retval)
+ expected_args = [ctxt, topic, expected_msg, timeout]
+ for arg, expected_arg in zip(self.fake_args, expected_args):
+ self.assertEqual(arg, expected_arg)
+
+ if supports_topic_override:
+ # set a topic
+ new_topic = 'foo.bar'
+ retval = getattr(rpc_proxy, rpc_method)(*args, topic=new_topic)
+ self.assertEqual(retval, expected_retval)
+ expected_args = [ctxt, new_topic, expected_msg]
+ if server_params:
+ expected_args.insert(1, server_params)
+ for arg, expected_arg in zip(self.fake_args, expected_args):
+ self.assertEqual(arg, expected_arg)
+
+ def test_call(self):
+ self._test_rpc_method('call', has_timeout=True, has_retval=True)
+
+ def test_multicall(self):
+ self._test_rpc_method('multicall', has_timeout=True, has_retval=True)
+
+ def test_cast(self):
+ self._test_rpc_method('cast')
+
+ def test_fanout_cast(self):
+ self._test_rpc_method('fanout_cast', supports_topic_override=False)
+
+ def test_cast_to_server(self):
+ self._test_rpc_method('cast_to_server', server_params={'blah': 1})
+
+ def test_fanout_cast_to_server(self):
+ self._test_rpc_method('fanout_cast_to_server',
+ server_params={'blah': 1}, supports_topic_override=False)
+
+ def test_make_msg(self):
+ self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2),
+ {'method': 'test_method', 'args': {'a': 1, 'b': 2}})