summaryrefslogtreecommitdiffstats
path: root/tests/unit/rpc/common.py
diff options
context:
space:
mode:
authorRaymond Pekowski <pekowski@gmail.com>2013-02-18 12:17:16 -0600
committerRaymond Pekowski <pekowski@gmail.com>2013-02-18 12:56:07 -0600
commit70891c271e011f59792933eaf65c3214493ef14a (patch)
tree1d6b27661ebf4d872fe8eb37c1ae65ffd154bc04 /tests/unit/rpc/common.py
parent538721d3fdab613ed3d3ab33df123f90da173e93 (diff)
downloadoslo-70891c271e011f59792933eaf65c3214493ef14a.tar.gz
oslo-70891c271e011f59792933eaf65c3214493ef14a.tar.xz
oslo-70891c271e011f59792933eaf65c3214493ef14a.zip
blueprint amqp-rpc-fast-reply-queue
For AMQP based RPC, specifically RabbitMQ and Qpid, this change replaces the dynamically created RPC call reply queue with a single queue that is created on the first RPC call and used on all subsequent calls. It provides backward compatibility on the callee side by recognizing downlevel callers and on the caller side by adding a config option to revert to the old dynamically created queue based upon the msg_id. Change-Id: Idb09a71472866bd3950f58d4f7f45a3181eb40fc
Diffstat (limited to 'tests/unit/rpc/common.py')
-rw-r--r--tests/unit/rpc/common.py192
1 files changed, 122 insertions, 70 deletions
diff --git a/tests/unit/rpc/common.py b/tests/unit/rpc/common.py
index 5a655c6..6359142 100644
--- a/tests/unit/rpc/common.py
+++ b/tests/unit/rpc/common.py
@@ -21,6 +21,7 @@ Unit Tests for remote procedure calls shared between all implementations
import logging
import time
+import datetime
import eventlet
from eventlet import greenthread
@@ -29,7 +30,6 @@ from oslo.config import cfg
from openstack.common import exception
from openstack.common.gettextutils import _
from openstack.common import jsonutils
-from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common
from openstack.common.rpc import dispatcher as rpc_dispatcher
from tests import utils as test_utils
@@ -252,80 +252,124 @@ class BaseRpcTestCase(test_utils.BaseTestCase):
except rpc_common.Timeout as exc:
pass
-
-class BaseRpcAMQPTestCase(BaseRpcTestCase):
- """Base test class for all AMQP-based RPC tests."""
- def test_proxycallback_handles_exceptions(self):
- """Make sure exceptions unpacking messages don't cause hangs."""
+ def test_multithreaded_resp_routing(self):
if not self.rpc:
self.skipTest('rpc driver not available.')
- orig_unpack = rpc_amqp.unpack_context
-
- info = {'unpacked': False}
-
- def fake_unpack_context(*args, **kwargs):
- info['unpacked'] = True
- raise test.TestingException('moo')
-
- self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context)
-
- value = 41
- self.rpc.cast(FLAGS, self.context, self.topic,
- {"method": "echo", "args": {"value": value}})
-
- # Wait for the cast to complete.
- for x in xrange(50):
- if info['unpacked']:
- break
- greenthread.sleep(0.1)
- else:
- self.fail("Timeout waiting for message to be consumed")
-
- # Now see if we get a response even though we raised an
- # exception for the cast above.
- self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack)
-
- value = 42
- result = self.rpc.call(FLAGS, self.context, self.topic,
- {"method": "echo",
- "args": {"value": value}})
- self.assertEqual(value, result)
-
- def test_notification_envelope(self):
- raw_msg = {'a': 'b'}
- self.test_msg = None
-
- def fake_notify_send(_conn, topic, msg):
- self.test_msg = msg
-
- self.stubs.Set(self.rpc.Connection, 'notify_send', fake_notify_send)
-
- self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
- envelope=False)
- self.assertEqual(self.test_msg, raw_msg)
-
- # Envelopes enabled, but not enabled for notifications
- self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', True)
- self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
- envelope=False)
- self.assertEqual(self.test_msg, raw_msg)
+ global synced_echo_call
+ synced_echo_call = SyncedEchoCall()
+
+ callid1 = synced_echo_call.spawn(self.rpc.call, FLAGS, self.context,
+ self.topic, value=1)
+ callid2 = synced_echo_call.spawn(self.rpc.call, FLAGS, self.context,
+ self.topic, value=2)
+ callid3 = synced_echo_call.spawn(self.rpc.call, FLAGS, self.context,
+ self.topic, value=3)
+
+ r3 = synced_echo_call.post(callid3)
+ self.assertEqual(synced_echo_call.wait_states(),
+ synced_echo_call.expected_wait_states())
+ r1 = synced_echo_call.post(callid1)
+ self.assertEqual(synced_echo_call.wait_states(),
+ synced_echo_call.expected_wait_states())
+ r2 = synced_echo_call.post(callid2)
+ self.assertEqual(synced_echo_call.wait_states(),
+ synced_echo_call.expected_wait_states())
+
+ #synced_echo_call.print_times() #for DEBUG
+ self.assertEqual((r1, r2, r3), (1, 2, 3))
+ self.assertTrue(synced_echo_call.verify_time_order(callid3, callid1,
+ callid2))
+
+synced_echo_call = None
+
+
+def rpc_wrapper(callid, func, *args):
+ """This wrapper was added because tests would hang when there was a bug
+ that caused the RPC to timeout. The post event would hang waiting for
+ the wait event. The missing wait is added here. It just makes
+ debugging the unit tests easier.
+ """
+ try:
+ ret = func(*args)
+ except rpc_common.Timeout as exc:
+ synced_echo_call.wait(callid)
+ ret = None
+ return ret
- # Now turn it on for notifications
- msg = {
- 'oslo.version': rpc_common._RPC_ENVELOPE_VERSION,
- 'oslo.message': jsonutils.dumps(raw_msg),
- }
- self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
- envelope=True)
- self.assertEqual(self.test_msg, msg)
- # Make sure envelopes are still on notifications, even if turned off
- # for general messages.
- self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', False)
- self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
- envelope=True)
- self.assertEqual(self.test_msg, msg)
+class SyncedEchoCall():
+ """Class to control the synchronization of the synced_echo method of the
+ TestReceiver class
+ """
+ class data():
+ def __init__(self):
+ self.gthread = None
+ self.event = eventlet.event.Event()
+ self.waiting = False
+ self.expected_wait_state = False
+ self.time = 0
+
+ def __init__(self):
+ self.list = []
+
+ def spawn(self, *args, **kwargs):
+ idx = len(self.list)
+ self.list.append(SyncedEchoCall.data())
+ args = list(args)
+ value = kwargs['value']
+ args.append({"method": "synced_echo", "args":
+ {"value": value, "callid": idx}})
+ args.insert(0, idx)
+ args.insert(0, rpc_wrapper)
+ self.list[idx].gthread = eventlet.spawn(*args)
+ self.list[idx].expected_wait_state = True
+ return idx
+
+ def wait_states(self):
+ rlist = []
+ for i in self.list:
+ rlist.append(i.waiting)
+ return rlist
+
+ def expected_wait_states(self):
+ rlist = []
+ for i in self.list:
+ rlist.append(i.expected_wait_state)
+ return rlist
+
+ def post(self, idx):
+ self.list[idx].event.send()
+ retval = self.list[idx].gthread.wait()
+ self.list[idx].expected_wait_state = False
+ #self.print_wait_states() #for DEBUG
+ return retval
+
+ def wait(self, idx):
+ self.list[idx].waiting = True
+ self.list[idx].event.wait()
+ self.list[idx].waiting = False
+ self.list[idx].time = datetime.datetime.now()
+
+ def verify_time_order(self, idx1, idx2, idx3):
+ return self.list[idx1].time < self.list[idx2].time and \
+ self.list[idx2].time < self.list[idx3].time
+
+ # for DEBUG
+ #def print_times(self):
+ # # change /dev/null to name to get output to a log file
+ # with open('mylog', 'a') as f:
+ # f.write('SyncedEchoCall times: ' + '\n')
+ # f.write(' ' + str(self.list[0].time) + '\n')
+ # f.write(' ' + str(self.list[1].time) + '\n')
+ # f.write(' ' + str(self.list[2].time) + '\n')
+
+ # for DEBUG
+ #def print_wait_states(self):
+ # # change /dev/null to name to get output to a log file
+ # with open('mylog', 'a') as f:
+ # f.write('SyncedEchoCall times: ' +
+ # str(self.wait_states()) + '\n')
class TestReceiver(object):
@@ -341,6 +385,14 @@ class TestReceiver(object):
return value
@staticmethod
+ def synced_echo(context, value, callid):
+ """Waits on the event identified by callid."""
+ LOG.debug(_("Received %s"), value)
+ global synced_echo_call
+ synced_echo_call.wait(callid)
+ return value
+
+ @staticmethod
def fortytwo(context):
"""Simply returns 42."""
return 42