1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for rpc.proxy
"""
import copy
import stubout
import unittest
from openstack.common import context
from openstack.common import rpc
from openstack.common.rpc import proxy
class RpcProxyTestCase(unittest.TestCase):
def setUp(self):
self.stubs = stubout.StubOutForTesting()
super(RpcProxyTestCase, self).setUp()
def tearDown(self):
self.stubs.UnsetAll()
self.stubs.SmartUnsetAll()
super(RpcProxyTestCase, self).tearDown()
def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False,
server_params=None, supports_topic_override=True):
topic = 'fake_topic'
timeout = 123
rpc_proxy = proxy.RpcProxy(topic, '1.0')
ctxt = context.RequestContext('fake_user', 'fake_project')
msg = {'method': 'fake_method', 'args': {'x': 'y'}}
expected_msg = {'method': 'fake_method', 'args': {'x': 'y'},
'version': '1.0'}
expected_retval = 'hi' if has_retval else None
self.fake_args = None
self.fake_kwargs = None
def _fake_rpc_method(*args, **kwargs):
self.fake_args = args
self.fake_kwargs = kwargs
if has_retval:
return expected_retval
self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
args = [ctxt, msg]
if server_params:
args.insert(1, server_params)
# Base method usage
retval = getattr(rpc_proxy, rpc_method)(*args)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, topic, expected_msg]
if server_params:
expected_args.insert(1, server_params)
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
# overriding the version
retval = getattr(rpc_proxy, rpc_method)(*args, version='1.1')
self.assertEqual(retval, expected_retval)
new_msg = copy.deepcopy(expected_msg)
new_msg['version'] = '1.1'
expected_args = [ctxt, topic, new_msg]
if server_params:
expected_args.insert(1, server_params)
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
if has_timeout:
# set a timeout
retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=timeout)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, topic, expected_msg, timeout]
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
if supports_topic_override:
# set a topic
new_topic = 'foo.bar'
retval = getattr(rpc_proxy, rpc_method)(*args, topic=new_topic)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, new_topic, expected_msg]
if server_params:
expected_args.insert(1, server_params)
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
def test_call(self):
self._test_rpc_method('call', has_timeout=True, has_retval=True)
def test_multicall(self):
self._test_rpc_method('multicall', has_timeout=True, has_retval=True)
def test_cast(self):
self._test_rpc_method('cast')
def test_fanout_cast(self):
self._test_rpc_method('fanout_cast', supports_topic_override=False)
def test_cast_to_server(self):
self._test_rpc_method('cast_to_server', server_params={'blah': 1})
def test_fanout_cast_to_server(self):
self._test_rpc_method('fanout_cast_to_server',
server_params={'blah': 1}, supports_topic_override=False)
def test_make_msg(self):
self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2),
{'method': 'test_method', 'args': {'a': 1, 'b': 2}})
|