diff options
author | Russell Bryant <rbryant@redhat.com> | 2012-05-07 14:10:29 -0400 |
---|---|---|
committer | Russell Bryant <rbryant@redhat.com> | 2012-05-17 12:16:11 -0400 |
commit | 8ed3059cb4cb6ff777b59716db32c3133900b393 (patch) | |
tree | e0460aad9a32e5a7a6a077d5994e042a757f1464 /nova | |
parent | f00b4e060008d319ae813710deec9866f9e93335 (diff) | |
download | nova-8ed3059cb4cb6ff777b59716db32c3133900b393.tar.gz nova-8ed3059cb4cb6ff777b59716db32c3133900b393.tar.xz nova-8ed3059cb4cb6ff777b59716db32c3133900b393.zip |
Add base support for rpc API versioning.
Part of blueprint versioned-rpc-apis.
This commit includes the base support for versioned RPC APIs. It
introduces the RpcProxy and RpcDispatcher classes that have common code
for handling versioning on the client and server sides, respectively.
RPC APIs will be converted one at a time using this infrastructure.
Change-Id: I07bd82e9ff60c356123950e466caaffdfce79eba
Diffstat (limited to 'nova')
-rw-r--r-- | nova/manager.py | 12 | ||||
-rw-r--r-- | nova/rpc/__init__.py | 8 | ||||
-rw-r--r-- | nova/rpc/amqp.py | 19 | ||||
-rw-r--r-- | nova/rpc/common.py | 5 | ||||
-rw-r--r-- | nova/rpc/dispatcher.py | 105 | ||||
-rw-r--r-- | nova/rpc/impl_fake.py | 9 | ||||
-rw-r--r-- | nova/rpc/proxy.py | 161 | ||||
-rw-r--r-- | nova/service.py | 8 | ||||
-rw-r--r-- | nova/tests/rpc/common.py | 10 | ||||
-rw-r--r-- | nova/tests/rpc/test_dispatcher.py | 109 | ||||
-rw-r--r-- | nova/tests/rpc/test_proxy.py | 124 |
11 files changed, 550 insertions, 20 deletions
diff --git a/nova/manager.py b/nova/manager.py index 7a0b2888e..cbe67e008 100644 --- a/nova/manager.py +++ b/nova/manager.py @@ -56,6 +56,7 @@ This module provides Manager, a base class for managers. from nova.db import base from nova import flags from nova import log as logging +from nova.rpc import dispatcher as rpc_dispatcher from nova.scheduler import api from nova import version @@ -130,12 +131,23 @@ class ManagerMeta(type): class Manager(base.Base): __metaclass__ = ManagerMeta + # Set RPC API version to 1.0 by default. + RPC_API_VERSION = '1.0' + def __init__(self, host=None, db_driver=None): if not host: host = FLAGS.host self.host = host super(Manager, self).__init__(db_driver) + def create_rpc_dispatcher(self): + '''Get the rpc dispatcher for this manager. + + If a manager would like to set an rpc API version, or support more than + one class as the target of rpc messages, override this method. + ''' + return rpc_dispatcher.RpcDispatcher([self]) + def periodic_tasks(self, context, raise_on_error=False): """Tasks to be run at a periodic interval.""" for task_name, task in self._periodic_tasks: diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py index acfc10e7e..b48e47610 100644 --- a/nova/rpc/__init__.py +++ b/nova/rpc/__init__.py @@ -17,6 +17,14 @@ # License for the specific language governing permissions and limitations # under the License. +""" +A remote procedure call (rpc) abstraction. + +For some wrappers that add message versioning to rpc, see: + rpc.dispatcher + rpc.proxy +""" + from nova.openstack.common import cfg from nova.openstack.common import importutils diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py index b0a0d5d12..0e079f533 100644 --- a/nova/rpc/amqp.py +++ b/nova/rpc/amqp.py @@ -242,23 +242,26 @@ class ProxyCallback(object): ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') args = message_data.get('args', {}) + version = message_data.get('version', None) if not method: LOG.warn(_('no method for message: %s') % message_data) ctxt.reply(_('No method for message: %s') % message_data, connection_pool=self.connection_pool) return - self.pool.spawn_n(self._process_data, ctxt, method, args) + self.pool.spawn_n(self._process_data, ctxt, version, method, args) - def _process_data(self, ctxt, method, args): - """Thread that magically looks for a method on the proxy - object and calls it. + def _process_data(self, ctxt, version, method, args): + """Process a message in a new thread. + + If the proxy object we have has a dispatch method + (see rpc.dispatcher.RpcDispatcher), pass it the version, + method, and args and let it dispatch as appropriate. If not, use + the old behavior of magically calling the specified method on the + proxy we have here. """ ctxt.update_store() try: - node_func = getattr(self.proxy, str(method)) - node_args = dict((str(k), v) for k, v in args.iteritems()) - # NOTE(vish): magic is fun! - rval = node_func(context=ctxt, **node_args) + rval = self.proxy.dispatch(ctxt, version, method, **args) # Check if the result was a generator if inspect.isgenerator(rval): for x in rval: diff --git a/nova/rpc/common.py b/nova/rpc/common.py index d15d1f3f1..aee243800 100644 --- a/nova/rpc/common.py +++ b/nova/rpc/common.py @@ -85,6 +85,11 @@ class InvalidRPCConnectionReuse(RPCException): message = _("Invalid reuse of an RPC connection.") +class UnsupportedRpcVersion(RPCException): + message = _("Specified RPC version, %(version)s, not supported by " + "this endpoint.") + + class Connection(object): """A connection, returned by rpc.create_connection(). diff --git a/nova/rpc/dispatcher.py b/nova/rpc/dispatcher.py new file mode 100644 index 000000000..3f46398a9 --- /dev/null +++ b/nova/rpc/dispatcher.py @@ -0,0 +1,105 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Code for rpc message dispatching. + +Messages that come in have a version number associated with them. RPC API +version numbers are in the form: + + Major.Minor + +For a given message with version X.Y, the receiver must be marked as able to +handle messages of version A.B, where: + + A = X + + B >= Y + +The Major version number would be incremented for an almost completely new API. +The Minor version number would be incremented for backwards compatible changes +to an existing API. A backwards compatible change could be something like +adding a new method, adding an argument to an existing method (but not +requiring it), or changing the type for an existing argument (but still +handling the old type as well). + +The conversion over to a versioned API must be done on both the client side and +server side of the API at the same time. However, as the code stands today, +there can be both versioned and unversioned APIs implemented in the same code +base. +""" + +from nova.rpc import common as rpc_common + + +class RpcDispatcher(object): + """Dispatch rpc messages according to the requested API version. + + This class can be used as the top level 'manager' for a service. It + contains a list of underlying managers that have an API_VERSION attribute. + """ + + def __init__(self, callbacks): + """Initialize the rpc dispatcher. + + :param callbacks: List of proxy objects that are an instance + of a class with rpc methods exposed. Each proxy + object should have an RPC_API_VERSION attribute. + """ + self.callbacks = callbacks + super(RpcDispatcher, self).__init__() + + @staticmethod + def _is_compatible(mversion, version): + """Determine whether versions are compatible. + + :param mversion: The API version implemented by a callback. + :param version: The API version requested by an incoming message. + """ + version_parts = version.split('.') + mversion_parts = mversion.split('.') + if int(version_parts[0]) != int(mversion_parts[0]): # Major + return False + if int(version_parts[1]) > int(mversion_parts[1]): # Minor + return False + return True + + def dispatch(self, ctxt, version, method, **kwargs): + """Dispatch a message based on a requested version. + + :param ctxt: The request context + :param version: The requested API version from the incoming message + :param method: The method requested to be called by the incoming + message. + :param kwargs: A dict of keyword arguments to be passed to the method. + + :returns: Whatever is returned by the underlying method that gets + called. + """ + if not version: + version = '1.0' + + for proxyobj in self.callbacks: + if hasattr(proxyobj, 'RPC_API_VERSION'): + rpc_api_version = proxyobj.RPC_API_VERSION + else: + rpc_api_version = '1.0' + if not hasattr(proxyobj, method): + continue + if self._is_compatible(rpc_api_version, version): + return getattr(proxyobj, method)(ctxt, **kwargs) + + raise rpc_common.UnsupportedRpcVersion(version=version) diff --git a/nova/rpc/impl_fake.py b/nova/rpc/impl_fake.py index 99c686901..70a8ca5f7 100644 --- a/nova/rpc/impl_fake.py +++ b/nova/rpc/impl_fake.py @@ -47,15 +47,13 @@ class Consumer(object): self.topic = topic self.proxy = proxy - def call(self, context, method, args, timeout): - node_func = getattr(self.proxy, method) - node_args = dict((str(k), v) for k, v in args.iteritems()) + def call(self, context, version, method, args, timeout): done = eventlet.event.Event() def _inner(): ctxt = RpcContext.from_dict(context.to_dict()) try: - rval = node_func(context=ctxt, **node_args) + rval = self.proxy.dispatch(context, version, method, **args) res = [] # Caller might have called ctxt.reply() manually for (reply, failure) in ctxt._response: @@ -129,13 +127,14 @@ def multicall(conf, context, topic, msg, timeout=None): if not method: return args = msg.get('args', {}) + version = msg.get('version', None) try: consumer = CONSUMERS[topic][0] except (KeyError, IndexError): return iter([None]) else: - return consumer.call(context, method, args, timeout) + return consumer.call(context, version, method, args, timeout) def call(conf, context, topic, msg, timeout=None): diff --git a/nova/rpc/proxy.py b/nova/rpc/proxy.py new file mode 100644 index 000000000..79a90dc3a --- /dev/null +++ b/nova/rpc/proxy.py @@ -0,0 +1,161 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +A helper class for proxy objects to remote APIs. + +For more information about rpc API version numbers, see: + rpc/dispatcher.py +""" + + +from nova import rpc + + +class RpcProxy(object): + """A helper class for rpc clients. + + This class is a wrapper around the RPC client API. It allows you to + specify the topic and API version in a single place. This is intended to + be used as a base class for a class that implements the client side of an + rpc API. + """ + + def __init__(self, topic, default_version): + """Initialize an RpcProxy. + + :param topic: The topic to use for all messages. + :param default_version: The default API version to request in all + outgoing messages. This can be overridden on a per-message + basis. + """ + self.topic = topic + self.default_version = default_version + super(RpcProxy, self).__init__() + + def _set_version(self, msg, vers): + """Helper method to set the version in a message. + + :param msg: The message having a version added to it. + :param vers: The version number to add to the message. + """ + msg['version'] = vers if vers else self.default_version + + def _get_topic(self, topic): + """Return the topic to use for a message.""" + return topic if topic else self.topic + + @staticmethod + def make_msg(method, **kwargs): + return {'method': method, 'args': kwargs} + + def call(self, context, msg, topic=None, version=None, timeout=None): + """rpc.call() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param timeout: (Optional) A timeout to use when waiting for the + response. If no timeout is specified, a default timeout will be + used that is usually sufficient. + :param version: (Optional) Override the requested API version in this + message. + + :returns: The return value from the remote method. + """ + self._set_version(msg, version) + return rpc.call(context, self._get_topic(topic), msg, timeout) + + def multicall(self, context, msg, topic=None, version=None, timeout=None): + """rpc.multicall() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param timeout: (Optional) A timeout to use when waiting for the + response. If no timeout is specified, a default timeout will be + used that is usually sufficient. + :param version: (Optional) Override the requested API version in this + message. + + :returns: An iterator that lets you process each of the returned values + from the remote method as they arrive. + """ + self._set_version(msg, version) + return rpc.multicall(context, self._get_topic(topic), msg, timeout) + + def cast(self, context, msg, topic=None, version=None): + """rpc.cast() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.cast() does not wait on any return value from the + remote method. + """ + self._set_version(msg, version) + rpc.cast(context, self._get_topic(topic), msg) + + def fanout_cast(self, context, msg, version=None): + """rpc.fanout_cast() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.fanout_cast() does not wait on any return value + from the remote method. + """ + self._set_version(msg, version) + rpc.fanout_cast(context, self.topic, msg) + + def cast_to_server(self, context, server_params, msg, topic=None, + version=None): + """rpc.cast_to_server() a remote method. + + :param context: The request context + :param server_params: Server parameters. See rpc.cast_to_server() for + details. + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.cast_to_server() does not wait on any + return values. + """ + self._set_version(msg, version) + rpc.cast_to_server(context, server_params, self._get_topic(topic), msg) + + def fanout_cast_to_server(self, context, server_params, msg, version=None): + """rpc.fanout_cast_to_server() a remote method. + + :param context: The request context + :param server_params: Server parameters. See rpc.cast_to_server() for + details. + :param msg: The message to send, including the method and args. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.fanout_cast_to_server() does not wait on any + return values. + """ + self._set_version(msg, version) + rpc.fanout_cast_to_server(context, server_params, self.topic, msg) diff --git a/nova/service.py b/nova/service.py index 7d5db5a0a..dcd1205f2 100644 --- a/nova/service.py +++ b/nova/service.py @@ -197,13 +197,15 @@ class Service(object): LOG.debug(_("Creating Consumer connection for Service %s") % self.topic) + rpc_dispatcher = self.manager.create_rpc_dispatcher() + # Share this same connection for these Consumers - self.conn.create_consumer(self.topic, self, fanout=False) + self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False) node_topic = '%s.%s' % (self.topic, self.host) - self.conn.create_consumer(node_topic, self, fanout=False) + self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False) - self.conn.create_consumer(self.topic, self, fanout=True) + self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True) # Consume from all consumers in a thread self.conn.consume_in_thread() diff --git a/nova/tests/rpc/common.py b/nova/tests/rpc/common.py index 0c8c11487..c07ddfa1a 100644 --- a/nova/tests/rpc/common.py +++ b/nova/tests/rpc/common.py @@ -30,6 +30,7 @@ from nova import flags from nova import log as logging from nova.rpc import amqp as rpc_amqp from nova.rpc import common as rpc_common +from nova.rpc import dispatcher as rpc_dispatcher from nova import test @@ -44,8 +45,9 @@ class BaseRpcTestCase(test.TestCase): self.context = context.get_admin_context() if self.rpc: self.conn = self.rpc.create_connection(FLAGS, True) - self.receiver = TestReceiver() - self.conn.create_consumer('test', self.receiver, False) + receiver = TestReceiver() + self.dispatcher = rpc_dispatcher.RpcDispatcher([receiver]) + self.conn.create_consumer('test', self.dispatcher, False) self.conn.consume_in_thread() def tearDown(self): @@ -145,8 +147,9 @@ class BaseRpcTestCase(test.TestCase): return value nested = Nested() + dispatcher = rpc_dispatcher.RpcDispatcher([nested]) conn = self.rpc.create_connection(FLAGS, True) - conn.create_consumer('nested', nested, False) + conn.create_consumer('nested', dispatcher, False) conn.consume_in_thread() value = 42 result = self.rpc.call(FLAGS, self.context, @@ -228,7 +231,6 @@ class TestReceiver(object): Uses static methods because we aren't actually storing any state. """ - @staticmethod def echo(context, value): """Simply returns whatever value is sent in.""" diff --git a/nova/tests/rpc/test_dispatcher.py b/nova/tests/rpc/test_dispatcher.py new file mode 100644 index 000000000..0c16c6a34 --- /dev/null +++ b/nova/tests/rpc/test_dispatcher.py @@ -0,0 +1,109 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Unit Tests for rpc.dispatcher +""" + +from nova import context +from nova.rpc import dispatcher +from nova.rpc import common as rpc_common +from nova import test + + +class RpcDispatcherTestCase(test.TestCase): + class API1(object): + RPC_API_VERSION = '1.0' + + def __init__(self): + self.test_method_ctxt = None + self.test_method_arg1 = None + + def test_method(self, ctxt, arg1): + self.test_method_ctxt = ctxt + self.test_method_arg1 = arg1 + + class API2(object): + RPC_API_VERSION = '2.1' + + def __init__(self): + self.test_method_ctxt = None + self.test_method_arg1 = None + + def test_method(self, ctxt, arg1): + self.test_method_ctxt = ctxt + self.test_method_arg1 = arg1 + + class API3(object): + RPC_API_VERSION = '3.5' + + def __init__(self): + self.test_method_ctxt = None + self.test_method_arg1 = None + + def test_method(self, ctxt, arg1): + self.test_method_ctxt = ctxt + self.test_method_arg1 = arg1 + + def setUp(self): + self.ctxt = context.RequestContext('fake_user', 'fake_project') + super(RpcDispatcherTestCase, self).setUp() + + def tearDown(self): + super(RpcDispatcherTestCase, self).tearDown() + + def _test_dispatch(self, version, expectations): + v2 = self.API2() + v3 = self.API3() + disp = dispatcher.RpcDispatcher([v2, v3]) + + disp.dispatch(self.ctxt, version, 'test_method', arg1=1) + + self.assertEqual(v2.test_method_ctxt, expectations[0]) + self.assertEqual(v2.test_method_arg1, expectations[1]) + self.assertEqual(v3.test_method_ctxt, expectations[2]) + self.assertEqual(v3.test_method_arg1, expectations[3]) + + def test_dispatch(self): + self._test_dispatch('2.1', (self.ctxt, 1, None, None)) + self._test_dispatch('3.5', (None, None, self.ctxt, 1)) + + def test_dispatch_lower_minor_version(self): + self._test_dispatch('2.0', (self.ctxt, 1, None, None)) + self._test_dispatch('3.1', (None, None, self.ctxt, 1)) + + def test_dispatch_higher_minor_version(self): + self.assertRaises(rpc_common.UnsupportedRpcVersion, + self._test_dispatch, '2.6', (None, None, None, None)) + self.assertRaises(rpc_common.UnsupportedRpcVersion, + self._test_dispatch, '3.6', (None, None, None, None)) + + def test_dispatch_lower_major_version(self): + self.assertRaises(rpc_common.UnsupportedRpcVersion, + self._test_dispatch, '1.0', (None, None, None, None)) + + def test_dispatch_higher_major_version(self): + self.assertRaises(rpc_common.UnsupportedRpcVersion, + self._test_dispatch, '4.0', (None, None, None, None)) + + def test_dispatch_no_version_uses_v1(self): + v1 = self.API1() + disp = dispatcher.RpcDispatcher([v1]) + + disp.dispatch(self.ctxt, None, 'test_method', arg1=1) + + self.assertEqual(v1.test_method_ctxt, self.ctxt) + self.assertEqual(v1.test_method_arg1, 1) diff --git a/nova/tests/rpc/test_proxy.py b/nova/tests/rpc/test_proxy.py new file mode 100644 index 000000000..9ef504a0d --- /dev/null +++ b/nova/tests/rpc/test_proxy.py @@ -0,0 +1,124 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Unit Tests for rpc.proxy +""" + +import copy + +from nova import context +from nova import rpc +from nova.rpc import proxy +from nova import test + + +class RpcProxyTestCase(test.TestCase): + + def setUp(self): + super(RpcProxyTestCase, self).setUp() + + def tearDown(self): + super(RpcProxyTestCase, self).tearDown() + + def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False, + server_params=None, supports_topic_override=True): + topic = 'fake_topic' + timeout = 123 + rpc_proxy = proxy.RpcProxy(topic, '1.0') + ctxt = context.RequestContext('fake_user', 'fake_project') + msg = {'method': 'fake_method', 'args': {'x': 'y'}} + expected_msg = {'method': 'fake_method', 'args': {'x': 'y'}, + 'version': '1.0'} + + expected_retval = 'hi' if has_retval else None + + self.fake_args = None + self.fake_kwargs = None + + def _fake_rpc_method(*args, **kwargs): + self.fake_args = args + self.fake_kwargs = kwargs + if has_retval: + return expected_retval + + self.stubs.Set(rpc, rpc_method, _fake_rpc_method) + + args = [ctxt, msg] + if server_params: + args.insert(1, server_params) + + # Base method usage + retval = getattr(rpc_proxy, rpc_method)(*args) + self.assertEqual(retval, expected_retval) + expected_args = [ctxt, topic, expected_msg] + if server_params: + expected_args.insert(1, server_params) + for arg, expected_arg in zip(self.fake_args, expected_args): + self.assertEqual(arg, expected_arg) + + # overriding the version + retval = getattr(rpc_proxy, rpc_method)(*args, version='1.1') + self.assertEqual(retval, expected_retval) + new_msg = copy.deepcopy(expected_msg) + new_msg['version'] = '1.1' + expected_args = [ctxt, topic, new_msg] + if server_params: + expected_args.insert(1, server_params) + for arg, expected_arg in zip(self.fake_args, expected_args): + self.assertEqual(arg, expected_arg) + + if has_timeout: + # set a timeout + retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=timeout) + self.assertEqual(retval, expected_retval) + expected_args = [ctxt, topic, expected_msg, timeout] + for arg, expected_arg in zip(self.fake_args, expected_args): + self.assertEqual(arg, expected_arg) + + if supports_topic_override: + # set a topic + new_topic = 'foo.bar' + retval = getattr(rpc_proxy, rpc_method)(*args, topic=new_topic) + self.assertEqual(retval, expected_retval) + expected_args = [ctxt, new_topic, expected_msg] + if server_params: + expected_args.insert(1, server_params) + for arg, expected_arg in zip(self.fake_args, expected_args): + self.assertEqual(arg, expected_arg) + + def test_call(self): + self._test_rpc_method('call', has_timeout=True, has_retval=True) + + def test_multicall(self): + self._test_rpc_method('multicall', has_timeout=True, has_retval=True) + + def test_cast(self): + self._test_rpc_method('cast') + + def test_fanout_cast(self): + self._test_rpc_method('fanout_cast', supports_topic_override=False) + + def test_cast_to_server(self): + self._test_rpc_method('cast_to_server', server_params={'blah': 1}) + + def test_fanout_cast_to_server(self): + self._test_rpc_method('fanout_cast_to_server', + server_params={'blah': 1}, supports_topic_override=False) + + def test_make_msg(self): + self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2), + {'method': 'test_method', 'args': {'a': 1, 'b': 2}}) |