summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEric Windisch <eric@cloudscaling.com>2012-05-25 09:59:47 -0400
committerEric Windisch <eric@cloudscaling.com>2012-05-30 13:42:14 -0400
commit0f6df0b51fbabf30259b61289cb2157f82bed3a8 (patch)
tree599872b913110e3b11d46dbe1a3176ac50c42454
parentf06624ef9415dfb3343a357c156f035c29fa01ef (diff)
downloadnova-0f6df0b51fbabf30259b61289cb2157f82bed3a8.tar.gz
nova-0f6df0b51fbabf30259b61289cb2157f82bed3a8.tar.xz
nova-0f6df0b51fbabf30259b61289cb2157f82bed3a8.zip
New RPC tests, docstring fixes.
Also fixes versioning problem in impl_fake Makes topics configurable by RPC test suites. Change-Id: I2c7b2413b8e642e7ac1e82d54c052b97d6149659
-rw-r--r--nova/rpc/impl_fake.py3
-rw-r--r--nova/tests/rpc/common.py101
2 files changed, 78 insertions, 26 deletions
diff --git a/nova/rpc/impl_fake.py b/nova/rpc/impl_fake.py
index 70a8ca5f7..ccfd92943 100644
--- a/nova/rpc/impl_fake.py
+++ b/nova/rpc/impl_fake.py
@@ -169,10 +169,11 @@ def fanout_cast(conf, context, topic, msg):
if not method:
return
args = msg.get('args', {})
+ version = msg.get('version', None)
for consumer in CONSUMERS.get(topic, []):
try:
- consumer.call(context, method, args, None)
+ consumer.call(context, version, method, args, None)
except Exception:
pass
diff --git a/nova/tests/rpc/common.py b/nova/tests/rpc/common.py
index 3c13869ba..84dd79890 100644
--- a/nova/tests/rpc/common.py
+++ b/nova/tests/rpc/common.py
@@ -21,6 +21,7 @@ Unit Tests for remote procedure calls shared between all implementations
import time
+import eventlet
from eventlet import greenthread
import nose
@@ -39,28 +40,36 @@ LOG = logging.getLogger(__name__)
class BaseRpcTestCase(test.TestCase):
- def setUp(self, supports_timeouts=True):
+ def setUp(self, supports_timeouts=True, topic='test',
+ topic_nested='nested'):
super(BaseRpcTestCase, self).setUp()
+ self.topic = topic or self.topic
+ self.topic_nested = topic_nested or self.topic_nested
self.supports_timeouts = supports_timeouts
self.context = context.get_admin_context()
+
if self.rpc:
- self.conn = self.rpc.create_connection(FLAGS, True)
receiver = TestReceiver()
- self.dispatcher = rpc_dispatcher.RpcDispatcher([receiver])
- self.conn.create_consumer('test', self.dispatcher, False)
- self.conn.consume_in_thread()
+ self.conn = self._create_consumer(receiver, self.topic)
def tearDown(self):
if self.rpc:
self.conn.close()
super(BaseRpcTestCase, self).tearDown()
+ def _create_consumer(self, proxy, topic, fanout=False):
+ dispatcher = rpc_dispatcher.RpcDispatcher([proxy])
+ conn = self.rpc.create_connection(FLAGS, True)
+ conn.create_consumer(topic, dispatcher, fanout)
+ conn.consume_in_thread()
+ return conn
+
def test_call_succeed(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
value = 42
- result = self.rpc.call(FLAGS, self.context, 'test',
+ result = self.rpc.call(FLAGS, self.context, self.topic,
{"method": "echo", "args": {"value": value}})
self.assertEqual(value, result)
@@ -69,7 +78,7 @@ class BaseRpcTestCase(test.TestCase):
raise nose.SkipTest('rpc driver not available.')
value = 42
- result = self.rpc.call(FLAGS, self.context, 'test',
+ result = self.rpc.call(FLAGS, self.context, self.topic,
{"method": "echo_three_times_yield",
"args": {"value": value}})
self.assertEqual(value + 2, result)
@@ -80,7 +89,7 @@ class BaseRpcTestCase(test.TestCase):
value = 42
result = self.rpc.multicall(FLAGS, self.context,
- 'test',
+ self.topic,
{"method": "echo",
"args": {"value": value}})
for i, x in enumerate(result):
@@ -94,7 +103,7 @@ class BaseRpcTestCase(test.TestCase):
value = 42
result = self.rpc.multicall(FLAGS, self.context,
- 'test',
+ self.topic,
{"method": "multicall_three_nones",
"args": {"value": value}})
for i, x in enumerate(result):
@@ -108,7 +117,7 @@ class BaseRpcTestCase(test.TestCase):
value = 42
result = self.rpc.multicall(FLAGS, self.context,
- 'test',
+ self.topic,
{"method": "echo_three_times_yield",
"args": {"value": value}})
for i, x in enumerate(result):
@@ -121,10 +130,54 @@ class BaseRpcTestCase(test.TestCase):
"""Makes sure a context is passed through rpc call."""
value = 42
result = self.rpc.call(FLAGS, self.context,
- 'test', {"method": "context",
+ self.topic, {"method": "context",
"args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
+ def _test_cast(self, fanout=False):
+ """Test casts by pushing items through a channeled queue."""
+
+ # Not a true global, but capitalized so
+ # it is clear it is leaking scope into Nested()
+ QUEUE = eventlet.queue.Queue()
+
+ if not self.rpc:
+ raise nose.SkipTest('rpc driver not available.')
+
+ # We use the nested topic so we don't need QUEUE to be a proper
+ # global, and do not keep state outside this test.
+ class Nested(object):
+ @staticmethod
+ def put_queue(context, value):
+ LOG.debug("Got value in put_queue: %s", value)
+ QUEUE.put(value)
+
+ nested = Nested()
+ conn = self._create_consumer(nested, self.topic_nested, fanout)
+ value = 42
+
+ method = (self.rpc.cast, self.rpc.fanout_cast)[fanout]
+ method(FLAGS, self.context,
+ self.topic_nested,
+ {"method": "put_queue",
+ "args": {"value": value}})
+
+ try:
+ # If it does not succeed in 2 seconds, give up and assume
+ # failure.
+ result = QUEUE.get(True, 2)
+ except Exception:
+ self.assertEqual(value, None)
+
+ conn.close()
+ self.assertEqual(value, result)
+
+ def test_cast_success(self):
+ self._test_cast(False)
+
+ def test_fanout_success(self):
+ self._test_cast(True)
+
def test_nested_calls(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
@@ -133,7 +186,7 @@ class BaseRpcTestCase(test.TestCase):
class Nested(object):
@staticmethod
def echo(context, queue, value):
- """Calls echo in the passed queue"""
+ """Calls echo in the passed queue."""
LOG.debug(_("Nested received %(queue)s, %(value)s")
% locals())
# TODO(comstud):
@@ -147,15 +200,13 @@ class BaseRpcTestCase(test.TestCase):
return value
nested = Nested()
- dispatcher = rpc_dispatcher.RpcDispatcher([nested])
- conn = self.rpc.create_connection(FLAGS, True)
- conn.create_consumer('nested', dispatcher, False)
- conn.consume_in_thread()
+ conn = self._create_consumer(nested, self.topic_nested)
+
value = 42
result = self.rpc.call(FLAGS, self.context,
- 'nested', {"method": "echo",
- "args": {"queue": "test",
- "value": value}})
+ self.topic_nested,
+ {"method": "echo",
+ "args": {"queue": "test", "value": value}})
conn.close()
self.assertEqual(value, result)
@@ -163,7 +214,7 @@ class BaseRpcTestCase(test.TestCase):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
- """Make sure rpc.call will time out"""
+ """Make sure rpc.call will time out."""
if not self.supports_timeouts:
raise nose.SkipTest(_("RPC backend does not support timeouts"))
@@ -171,12 +222,12 @@ class BaseRpcTestCase(test.TestCase):
self.assertRaises(rpc_common.Timeout,
self.rpc.call,
FLAGS, self.context,
- 'test',
+ self.topic,
{"method": "block",
"args": {"value": value}}, timeout=1)
try:
self.rpc.call(FLAGS, self.context,
- 'test',
+ self.topic,
{"method": "block",
"args": {"value": value}},
timeout=1)
@@ -186,7 +237,7 @@ class BaseRpcTestCase(test.TestCase):
class BaseRpcAMQPTestCase(BaseRpcTestCase):
- """Base test class for all AMQP-based RPC tests"""
+ """Base test class for all AMQP-based RPC tests."""
def test_proxycallback_handles_exceptions(self):
"""Make sure exceptions unpacking messages don't cause hangs."""
if not self.rpc:
@@ -203,7 +254,7 @@ class BaseRpcAMQPTestCase(BaseRpcTestCase):
self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context)
value = 41
- self.rpc.cast(FLAGS, self.context, 'test',
+ self.rpc.cast(FLAGS, self.context, self.topic,
{"method": "echo", "args": {"value": value}})
# Wait for the cast to complete.
@@ -219,7 +270,7 @@ class BaseRpcAMQPTestCase(BaseRpcTestCase):
self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack)
value = 42
- result = self.rpc.call(FLAGS, self.context, 'test',
+ result = self.rpc.call(FLAGS, self.context, self.topic,
{"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)