summaryrefslogtreecommitdiffstats
path: root/tests/unit/rpc/test_proxy.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit/rpc/test_proxy.py')
-rw-r--r--tests/unit/rpc/test_proxy.py128
1 files changed, 128 insertions, 0 deletions
diff --git a/tests/unit/rpc/test_proxy.py b/tests/unit/rpc/test_proxy.py
new file mode 100644
index 0000000..1af37c7
--- /dev/null
+++ b/tests/unit/rpc/test_proxy.py
@@ -0,0 +1,128 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Unit Tests for rpc.proxy
+"""
+
+import copy
+import stubout
+import unittest
+
+from openstack.common import context
+from openstack.common import rpc
+from openstack.common.rpc import proxy
+
+
+class RpcProxyTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.stubs = stubout.StubOutForTesting()
+ super(RpcProxyTestCase, self).setUp()
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+ self.stubs.SmartUnsetAll()
+ super(RpcProxyTestCase, self).tearDown()
+
+ def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False,
+ server_params=None, supports_topic_override=True):
+ topic = 'fake_topic'
+ timeout = 123
+ rpc_proxy = proxy.RpcProxy(topic, '1.0')
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+ msg = {'method': 'fake_method', 'args': {'x': 'y'}}
+ expected_msg = {'method': 'fake_method', 'args': {'x': 'y'},
+ 'version': '1.0'}
+
+ expected_retval = 'hi' if has_retval else None
+
+ self.fake_args = None
+ self.fake_kwargs = None
+
+ def _fake_rpc_method(*args, **kwargs):
+ self.fake_args = args
+ self.fake_kwargs = kwargs
+ if has_retval:
+ return expected_retval
+
+ self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
+
+ args = [ctxt, msg]
+ if server_params:
+ args.insert(1, server_params)
+
+ # Base method usage
+ retval = getattr(rpc_proxy, rpc_method)(*args)
+ self.assertEqual(retval, expected_retval)
+ expected_args = [ctxt, topic, expected_msg]
+ if server_params:
+ expected_args.insert(1, server_params)
+ for arg, expected_arg in zip(self.fake_args, expected_args):
+ self.assertEqual(arg, expected_arg)
+
+ # overriding the version
+ retval = getattr(rpc_proxy, rpc_method)(*args, version='1.1')
+ self.assertEqual(retval, expected_retval)
+ new_msg = copy.deepcopy(expected_msg)
+ new_msg['version'] = '1.1'
+ expected_args = [ctxt, topic, new_msg]
+ if server_params:
+ expected_args.insert(1, server_params)
+ for arg, expected_arg in zip(self.fake_args, expected_args):
+ self.assertEqual(arg, expected_arg)
+
+ if has_timeout:
+ # set a timeout
+ retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=timeout)
+ self.assertEqual(retval, expected_retval)
+ expected_args = [ctxt, topic, expected_msg, timeout]
+ for arg, expected_arg in zip(self.fake_args, expected_args):
+ self.assertEqual(arg, expected_arg)
+
+ if supports_topic_override:
+ # set a topic
+ new_topic = 'foo.bar'
+ retval = getattr(rpc_proxy, rpc_method)(*args, topic=new_topic)
+ self.assertEqual(retval, expected_retval)
+ expected_args = [ctxt, new_topic, expected_msg]
+ if server_params:
+ expected_args.insert(1, server_params)
+ for arg, expected_arg in zip(self.fake_args, expected_args):
+ self.assertEqual(arg, expected_arg)
+
+ def test_call(self):
+ self._test_rpc_method('call', has_timeout=True, has_retval=True)
+
+ def test_multicall(self):
+ self._test_rpc_method('multicall', has_timeout=True, has_retval=True)
+
+ def test_cast(self):
+ self._test_rpc_method('cast')
+
+ def test_fanout_cast(self):
+ self._test_rpc_method('fanout_cast', supports_topic_override=False)
+
+ def test_cast_to_server(self):
+ self._test_rpc_method('cast_to_server', server_params={'blah': 1})
+
+ def test_fanout_cast_to_server(self):
+ self._test_rpc_method('fanout_cast_to_server',
+ server_params={'blah': 1}, supports_topic_override=False)
+
+ def test_make_msg(self):
+ self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2),
+ {'method': 'test_method', 'args': {'a': 1, 'b': 2}})