From 0f6df0b51fbabf30259b61289cb2157f82bed3a8 Mon Sep 17 00:00:00 2001 From: Eric Windisch Date: Fri, 25 May 2012 09:59:47 -0400 Subject: New RPC tests, docstring fixes. Also fixes versioning problem in impl_fake Makes topics configurable by RPC test suites. Change-Id: I2c7b2413b8e642e7ac1e82d54c052b97d6149659 --- nova/rpc/impl_fake.py | 3 +- nova/tests/rpc/common.py | 101 +++++++++++++++++++++++++++++++++++------------ 2 files changed, 78 insertions(+), 26 deletions(-) diff --git a/nova/rpc/impl_fake.py b/nova/rpc/impl_fake.py index 70a8ca5f7..ccfd92943 100644 --- a/nova/rpc/impl_fake.py +++ b/nova/rpc/impl_fake.py @@ -169,10 +169,11 @@ def fanout_cast(conf, context, topic, msg): if not method: return args = msg.get('args', {}) + version = msg.get('version', None) for consumer in CONSUMERS.get(topic, []): try: - consumer.call(context, method, args, None) + consumer.call(context, version, method, args, None) except Exception: pass diff --git a/nova/tests/rpc/common.py b/nova/tests/rpc/common.py index 3c13869ba..84dd79890 100644 --- a/nova/tests/rpc/common.py +++ b/nova/tests/rpc/common.py @@ -21,6 +21,7 @@ Unit Tests for remote procedure calls shared between all implementations import time +import eventlet from eventlet import greenthread import nose @@ -39,28 +40,36 @@ LOG = logging.getLogger(__name__) class BaseRpcTestCase(test.TestCase): - def setUp(self, supports_timeouts=True): + def setUp(self, supports_timeouts=True, topic='test', + topic_nested='nested'): super(BaseRpcTestCase, self).setUp() + self.topic = topic or self.topic + self.topic_nested = topic_nested or self.topic_nested self.supports_timeouts = supports_timeouts self.context = context.get_admin_context() + if self.rpc: - self.conn = self.rpc.create_connection(FLAGS, True) receiver = TestReceiver() - self.dispatcher = rpc_dispatcher.RpcDispatcher([receiver]) - self.conn.create_consumer('test', self.dispatcher, False) - self.conn.consume_in_thread() + self.conn = self._create_consumer(receiver, self.topic) def tearDown(self): if self.rpc: self.conn.close() super(BaseRpcTestCase, self).tearDown() + def _create_consumer(self, proxy, topic, fanout=False): + dispatcher = rpc_dispatcher.RpcDispatcher([proxy]) + conn = self.rpc.create_connection(FLAGS, True) + conn.create_consumer(topic, dispatcher, fanout) + conn.consume_in_thread() + return conn + def test_call_succeed(self): if not self.rpc: raise nose.SkipTest('rpc driver not available.') value = 42 - result = self.rpc.call(FLAGS, self.context, 'test', + result = self.rpc.call(FLAGS, self.context, self.topic, {"method": "echo", "args": {"value": value}}) self.assertEqual(value, result) @@ -69,7 +78,7 @@ class BaseRpcTestCase(test.TestCase): raise nose.SkipTest('rpc driver not available.') value = 42 - result = self.rpc.call(FLAGS, self.context, 'test', + result = self.rpc.call(FLAGS, self.context, self.topic, {"method": "echo_three_times_yield", "args": {"value": value}}) self.assertEqual(value + 2, result) @@ -80,7 +89,7 @@ class BaseRpcTestCase(test.TestCase): value = 42 result = self.rpc.multicall(FLAGS, self.context, - 'test', + self.topic, {"method": "echo", "args": {"value": value}}) for i, x in enumerate(result): @@ -94,7 +103,7 @@ class BaseRpcTestCase(test.TestCase): value = 42 result = self.rpc.multicall(FLAGS, self.context, - 'test', + self.topic, {"method": "multicall_three_nones", "args": {"value": value}}) for i, x in enumerate(result): @@ -108,7 +117,7 @@ class BaseRpcTestCase(test.TestCase): value = 42 result = self.rpc.multicall(FLAGS, self.context, - 'test', + self.topic, {"method": "echo_three_times_yield", "args": {"value": value}}) for i, x in enumerate(result): @@ -121,10 +130,54 @@ class BaseRpcTestCase(test.TestCase): """Makes sure a context is passed through rpc call.""" value = 42 result = self.rpc.call(FLAGS, self.context, - 'test', {"method": "context", + self.topic, {"method": "context", "args": {"value": value}}) self.assertEqual(self.context.to_dict(), result) + def _test_cast(self, fanout=False): + """Test casts by pushing items through a channeled queue.""" + + # Not a true global, but capitalized so + # it is clear it is leaking scope into Nested() + QUEUE = eventlet.queue.Queue() + + if not self.rpc: + raise nose.SkipTest('rpc driver not available.') + + # We use the nested topic so we don't need QUEUE to be a proper + # global, and do not keep state outside this test. + class Nested(object): + @staticmethod + def put_queue(context, value): + LOG.debug("Got value in put_queue: %s", value) + QUEUE.put(value) + + nested = Nested() + conn = self._create_consumer(nested, self.topic_nested, fanout) + value = 42 + + method = (self.rpc.cast, self.rpc.fanout_cast)[fanout] + method(FLAGS, self.context, + self.topic_nested, + {"method": "put_queue", + "args": {"value": value}}) + + try: + # If it does not succeed in 2 seconds, give up and assume + # failure. + result = QUEUE.get(True, 2) + except Exception: + self.assertEqual(value, None) + + conn.close() + self.assertEqual(value, result) + + def test_cast_success(self): + self._test_cast(False) + + def test_fanout_success(self): + self._test_cast(True) + def test_nested_calls(self): if not self.rpc: raise nose.SkipTest('rpc driver not available.') @@ -133,7 +186,7 @@ class BaseRpcTestCase(test.TestCase): class Nested(object): @staticmethod def echo(context, queue, value): - """Calls echo in the passed queue""" + """Calls echo in the passed queue.""" LOG.debug(_("Nested received %(queue)s, %(value)s") % locals()) # TODO(comstud): @@ -147,15 +200,13 @@ class BaseRpcTestCase(test.TestCase): return value nested = Nested() - dispatcher = rpc_dispatcher.RpcDispatcher([nested]) - conn = self.rpc.create_connection(FLAGS, True) - conn.create_consumer('nested', dispatcher, False) - conn.consume_in_thread() + conn = self._create_consumer(nested, self.topic_nested) + value = 42 result = self.rpc.call(FLAGS, self.context, - 'nested', {"method": "echo", - "args": {"queue": "test", - "value": value}}) + self.topic_nested, + {"method": "echo", + "args": {"queue": "test", "value": value}}) conn.close() self.assertEqual(value, result) @@ -163,7 +214,7 @@ class BaseRpcTestCase(test.TestCase): if not self.rpc: raise nose.SkipTest('rpc driver not available.') - """Make sure rpc.call will time out""" + """Make sure rpc.call will time out.""" if not self.supports_timeouts: raise nose.SkipTest(_("RPC backend does not support timeouts")) @@ -171,12 +222,12 @@ class BaseRpcTestCase(test.TestCase): self.assertRaises(rpc_common.Timeout, self.rpc.call, FLAGS, self.context, - 'test', + self.topic, {"method": "block", "args": {"value": value}}, timeout=1) try: self.rpc.call(FLAGS, self.context, - 'test', + self.topic, {"method": "block", "args": {"value": value}}, timeout=1) @@ -186,7 +237,7 @@ class BaseRpcTestCase(test.TestCase): class BaseRpcAMQPTestCase(BaseRpcTestCase): - """Base test class for all AMQP-based RPC tests""" + """Base test class for all AMQP-based RPC tests.""" def test_proxycallback_handles_exceptions(self): """Make sure exceptions unpacking messages don't cause hangs.""" if not self.rpc: @@ -203,7 +254,7 @@ class BaseRpcAMQPTestCase(BaseRpcTestCase): self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context) value = 41 - self.rpc.cast(FLAGS, self.context, 'test', + self.rpc.cast(FLAGS, self.context, self.topic, {"method": "echo", "args": {"value": value}}) # Wait for the cast to complete. @@ -219,7 +270,7 @@ class BaseRpcAMQPTestCase(BaseRpcTestCase): self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack) value = 42 - result = self.rpc.call(FLAGS, self.context, 'test', + result = self.rpc.call(FLAGS, self.context, self.topic, {"method": "echo", "args": {"value": value}}) self.assertEqual(value, result) -- cgit