author     Raymond Pekowski <pekowski@gmail.com>  2013-02-18 12:17:16 -0600
committer  Raymond Pekowski <pekowski@gmail.com>  2013-02-18 12:56:07 -0600
commit     70891c271e011f59792933eaf65c3214493ef14a (patch)
tree       1d6b27661ebf4d872fe8eb37c1ae65ffd154bc04
parent     538721d3fdab613ed3d3ab33df123f90da173e93 (diff)
download   oslo-70891c271e011f59792933eaf65c3214493ef14a.tar.gz
           oslo-70891c271e011f59792933eaf65c3214493ef14a.tar.xz
           oslo-70891c271e011f59792933eaf65c3214493ef14a.zip
blueprint amqp-rpc-fast-reply-queue
For AMQP-based RPC, specifically RabbitMQ and Qpid, this change replaces the dynamically created per-call reply queue with a single queue that is created on the first RPC call and reused for all subsequent calls. It stays backward compatible on the callee side by recognizing downlevel callers, and on the caller side by adding a config option that reverts to the old dynamically created queue keyed on the msg_id.

Change-Id: Idb09a71472866bd3950f58d4f7f45a3181eb40fc
-rw-r--r--  openstack/common/rpc/amqp.py   167
-rw-r--r--  tests/unit/rpc/amqp.py         235
-rw-r--r--  tests/unit/rpc/common.py       192
-rw-r--r--  tests/unit/rpc/test_kombu.py     4
4 files changed, 517 insertions(+), 81 deletions(-)
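
The core idea of the blueprint is a single shared reply queue per caller process, with each reply routed back to the right waiting call by its msg_id. The sketch below illustrates just that routing, using only the standard library; it is not the oslo implementation (which is built on eventlet and ConnectionContext), and ReplyRouter plus the simulated broker queue are made-up names.

import queue
import threading
import uuid


class ReplyRouter(object):
    """One shared reply queue per process; each reply carries the _msg_id
    of the call it answers, and a consumer thread hands it to the waiter
    registered under that id."""

    def __init__(self, incoming):
        self._waiters = {}          # msg_id -> per-call result queue
        self._incoming = incoming   # the single shared reply queue
        worker = threading.Thread(target=self._consume)
        worker.daemon = True
        worker.start()

    def add_call_waiter(self, msg_id):
        self._waiters[msg_id] = queue.Queue()
        return self._waiters[msg_id]

    def del_call_waiter(self, msg_id):
        self._waiters.pop(msg_id, None)

    def _consume(self):
        while True:
            reply = self._incoming.get()
            msg_id = reply.pop('_msg_id', None)
            waiter = self._waiters.get(msg_id)
            if waiter is None:
                print('no calling threads waiting for msg_id: %s' % msg_id)
            else:
                waiter.put(reply)


# Usage: the caller registers a waiter keyed by a fresh msg_id, sends the
# request with both the msg_id and the shared queue's name, and the callee
# copies _msg_id into its reply so the router can deliver it.
shared_reply_q = queue.Queue()
router = ReplyRouter(shared_reply_q)
msg_id = uuid.uuid4().hex
waiter = router.add_call_waiter(msg_id)
shared_reply_q.put({'_msg_id': msg_id, 'result': 42})  # simulated callee
print(waiter.get(timeout=1)['result'])                 # prints 42
router.del_call_waiter(msg_id)
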
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 3d7ad5e..98cfd9d 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -32,13 +32,27 @@ import uuid
from eventlet import greenpool
from eventlet import pools
from eventlet import semaphore
+from eventlet import queue
+# TODO(pekowski): Remove the cfg import and this comment block in Havana.
+# This import should no longer be needed when the amqp_rpc_single_reply_queue
+# option is removed.
+from oslo.config import cfg
from openstack.common import excutils
from openstack.common.gettextutils import _
from openstack.common import local
from openstack.common import log as logging
from openstack.common.rpc import common as rpc_common
+# TODO(pekowski): Remove this option in Havana.
+amqp_opts = [
+ cfg.BoolOpt('amqp_rpc_single_reply_queue',
+ default=False,
+ help='Enable a fast single reply queue if using AMQP-based '
+ 'RPC like RabbitMQ or Qpid.'),
+]
+
+cfg.CONF.register_opts(amqp_opts)
LOG = logging.getLogger(__name__)
@@ -51,6 +65,7 @@ class Pool(pools.Pool):
kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
+ self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
def create(self):
@@ -60,6 +75,16 @@ class Pool(pools.Pool):
def empty(self):
while self.free_items:
self.get().close()
+ # Force a new connection pool to be created.
+ # Note that this was added due to failing unit test cases. The issue
+ # is the above "while loop" gets all the cached connections from the
+ # pool and closes them, but never returns them to the pool, a pool
+ # leak. The unit tests hang waiting for an item to be returned to the
+ # pool. The unit tests get here via the tearDown() method. In the run
+ # time code, it gets here via cleanup() and only appears in service.py
+ # just before doing a sys.exit(), so cleanup() only happens once and
+ # the leakage is not a problem.
+ self.connection_cls.pool = None
_pool_create_sem = semaphore.Semaphore()
@@ -154,8 +179,45 @@ class ConnectionContext(rpc_common.Connection):
raise rpc_common.InvalidRPCConnectionReuse()
-def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
- ending=False, log_failure=True):
+class ReplyProxy(ConnectionContext):
+ """ Connection class for RPC replies / callbacks """
+ def __init__(self, conf, connection_pool):
+ self._call_waiters = {}
+ self._num_call_waiters = 0
+ self._num_call_waiters_wrn_threshhold = 10
+ self._reply_q = 'reply_' + uuid.uuid4().hex
+ super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
+ self.declare_direct_consumer(self._reply_q, self._process_data)
+ self.consume_in_thread()
+
+ def _process_data(self, message_data):
+ msg_id = message_data.pop('_msg_id', None)
+ waiter = self._call_waiters.get(msg_id)
+ if not waiter:
+ LOG.warn(_('No calling threads waiting for msg_id: %s, '
+ 'message: %s') % (msg_id, message_data))
+ else:
+ waiter.put(message_data)
+
+ def add_call_waiter(self, waiter, msg_id):
+ self._num_call_waiters += 1
+ if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
+ LOG.warn(_('Number of call waiters is greater than warning '
+ 'threshold: %d. There could be a MulticallProxyWaiter '
+ 'leak.') % self._num_call_waiters_wrn_threshhold)
+ self._num_call_waiters_wrn_threshhold *= 2
+ self._call_waiters[msg_id] = waiter
+
+ def del_call_waiter(self, msg_id):
+ self._num_call_waiters -= 1
+ del self._call_waiters[msg_id]
+
+ def get_reply_q(self):
+ return self._reply_q
+
+
+def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
+ failure=None, ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
@@ -174,13 +236,21 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
'failure': failure}
if ending:
msg['ending'] = True
- conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
+ # If a reply_q exists, add the msg_id to the reply and pass the
+ # reply_q to direct_send() to use it as the response queue.
+ # Otherwise use the msg_id for backward compatibility.
+ if reply_q:
+ msg['_msg_id'] = msg_id
+ conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
+ else:
+ conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call"""
def __init__(self, **kwargs):
self.msg_id = kwargs.pop('msg_id', None)
+ self.reply_q = kwargs.pop('reply_q', None)
self.conf = kwargs.pop('conf')
super(RpcContext, self).__init__(**kwargs)
@@ -188,13 +258,14 @@ class RpcContext(rpc_common.CommonRpcContext):
values = self.to_dict()
values['conf'] = self.conf
values['msg_id'] = self.msg_id
+ values['reply_q'] = self.reply_q
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
connection_pool=None, log_failure=True):
if self.msg_id:
- msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
- ending, log_failure)
+ msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
+ reply, failure, ending, log_failure)
if ending:
self.msg_id = None
@@ -210,6 +281,7 @@ def unpack_context(conf, msg):
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
+ context_dict['reply_q'] = msg.pop('_reply_q', None)
context_dict['conf'] = conf
ctx = RpcContext.from_dict(context_dict)
rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
@@ -339,6 +411,65 @@ class ProxyCallback(_ThreadPoolWithWait):
connection_pool=self.connection_pool)
+class MulticallProxyWaiter(object):
+ def __init__(self, conf, msg_id, timeout, connection_pool):
+ self._msg_id = msg_id
+ self._timeout = timeout or conf.rpc_response_timeout
+ self._reply_proxy = connection_pool.reply_proxy
+ self._done = False
+ self._got_ending = False
+ self._conf = conf
+ self._dataqueue = queue.LightQueue()
+ # Add this caller to the reply proxy's call_waiters
+ self._reply_proxy.add_call_waiter(self, self._msg_id)
+
+ def put(self, data):
+ self._dataqueue.put(data)
+
+ def done(self):
+ if self._done:
+ return
+ self._done = True
+ # Remove this caller from reply proxy's call_waiters
+ self._reply_proxy.del_call_waiter(self._msg_id)
+
+ def _process_data(self, data):
+ result = None
+ if data['failure']:
+ failure = data['failure']
+ result = rpc_common.deserialize_remote_exception(self._conf,
+ failure)
+ elif data.get('ending', False):
+ self._got_ending = True
+ else:
+ result = data['result']
+ return result
+
+ def __iter__(self):
+ """Return a result until we get a reply with an 'ending" flag"""
+ if self._done:
+ raise StopIteration
+ while True:
+ try:
+ data = self._dataqueue.get(timeout=self._timeout)
+ result = self._process_data(data)
+ except queue.Empty:
+ LOG.exception(_('Timed out waiting for RPC response.'))
+ self.done()
+ raise rpc_common.Timeout()
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ self.done()
+ if self._got_ending:
+ self.done()
+ raise StopIteration
+ if isinstance(result, Exception):
+ self.done()
+ raise result
+ yield result
+
+
+#TODO(pekowski): Remove MulticallWaiter() in Havana.
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
self._connection = connection
@@ -394,22 +525,40 @@ def create_connection(conf, new, connection_pool):
return ConnectionContext(conf, connection_pool, pooled=not new)
+_reply_proxy_create_sem = semaphore.Semaphore()
+
+
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
+ # TODO(pekowski): Remove all these comments in Havana.
+ # For amqp_rpc_single_reply_queue = False,
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
+ # For amqp_rpc_single_reply_queue = True,
+ # The 'with' statement is mandatory for closing the connection
LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
pack_context(msg, context)
- conn = ConnectionContext(conf, connection_pool)
- wait_msg = MulticallWaiter(conf, conn, timeout)
- conn.declare_direct_consumer(msg_id, wait_msg)
- conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+ # TODO(pekowski): Remove this flag and the code under the if clause
+ # in Havana.
+ if not conf.amqp_rpc_single_reply_queue:
+ conn = ConnectionContext(conf, connection_pool)
+ wait_msg = MulticallWaiter(conf, conn, timeout)
+ conn.declare_direct_consumer(msg_id, wait_msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+ else:
+ with _reply_proxy_create_sem:
+ if not connection_pool.reply_proxy:
+ connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+ msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+ wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+ with ConnectionContext(conf, connection_pool) as conn:
+ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg
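
For reference, the two reply paths that the reworked msg_reply() distinguishes can be reduced to a short sketch. send_reply() and fake_send() below are hypothetical stand-ins for the real direct_send() plumbing, not oslo code.

def send_reply(send, msg_id, reply_q, result, failure=None, ending=False):
    """send(queue_name, message) stands in for conn.direct_send()."""
    msg = {'result': result, 'failure': failure}
    if ending:
        msg['ending'] = True
    if reply_q:
        # Uplevel caller: it advertised a shared '_reply_q' in its request,
        # so tag the reply with the msg_id and publish to that queue.
        msg['_msg_id'] = msg_id
        send(reply_q, msg)
    else:
        # Downlevel caller: fall back to the per-call queue named after the
        # msg_id, exactly as before this change.
        send(msg_id, msg)


def fake_send(queue_name, message):
    print('publish to %s: %s' % (queue_name, message))


send_reply(fake_send, msg_id='abc123', reply_q='reply_deadbeef', result=42)
send_reply(fake_send, msg_id='abc123', reply_q=None, result=42)  # downlevel
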
diff --git a/tests/unit/rpc/amqp.py b/tests/unit/rpc/amqp.py
new file mode 100644
index 0000000..1e4733c
--- /dev/null
+++ b/tests/unit/rpc/amqp.py
@@ -0,0 +1,235 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Unit Tests for AMQP-based remote procedure calls
+"""
+
+import logging
+
+from eventlet import greenthread
+from oslo.config import cfg
+
+from openstack.common.gettextutils import _
+from openstack.common import jsonutils
+from openstack.common.rpc import amqp as rpc_amqp
+from openstack.common.rpc import common as rpc_common
+from tests.unit.rpc import common
+
+
+FLAGS = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class BaseRpcAMQPTestCase(common.BaseRpcTestCase):
+ """Base test class for all AMQP-based RPC tests."""
+ def test_proxycallback_handles_exceptions(self):
+ """Make sure exceptions unpacking messages don't cause hangs."""
+ if not self.rpc:
+ self.skipTest('rpc driver not available.')
+
+ orig_unpack = rpc_amqp.unpack_context
+
+ info = {'unpacked': False}
+
+ def fake_unpack_context(*args, **kwargs):
+ info['unpacked'] = True
+ raise test.TestingException('moo')
+
+ self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context)
+
+ value = 41
+ self.rpc.cast(FLAGS, self.context, self.topic,
+ {"method": "echo", "args": {"value": value}})
+
+ # Wait for the cast to complete.
+ for x in xrange(50):
+ if info['unpacked']:
+ break
+ greenthread.sleep(0.1)
+ else:
+ self.fail("Timeout waiting for message to be consumed")
+
+ # Now see if we get a response even though we raised an
+ # exception for the cast above.
+ self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack)
+
+ value = 42
+ result = self.rpc.call(FLAGS, self.context, self.topic,
+ {"method": "echo",
+ "args": {"value": value}})
+ self.assertEqual(value, result)
+
+ def test_notification_envelope(self):
+ raw_msg = {'a': 'b'}
+ self.test_msg = None
+
+ def fake_notify_send(_conn, topic, msg):
+ self.test_msg = msg
+
+ self.stubs.Set(self.rpc.Connection, 'notify_send', fake_notify_send)
+
+ self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
+ envelope=False)
+ self.assertEqual(self.test_msg, raw_msg)
+
+ # Envelopes enabled, but not enabled for notifications
+ self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', True)
+ self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
+ envelope=False)
+ self.assertEqual(self.test_msg, raw_msg)
+
+ # Now turn it on for notifications
+ msg = {
+ 'oslo.version': rpc_common._RPC_ENVELOPE_VERSION,
+ 'oslo.message': jsonutils.dumps(raw_msg),
+ }
+ self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
+ envelope=True)
+ self.assertEqual(self.test_msg, msg)
+
+ # Make sure envelopes are still on notifications, even if turned off
+ # for general messages.
+ self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', False)
+ self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
+ envelope=True)
+ self.assertEqual(self.test_msg, msg)
+
+ def test_single_reply_queue_on_has_ids(
+ self, single_reply_queue_for_callee_off=False):
+ if not self.rpc:
+ self.skipTest('rpc driver not available.')
+
+ # TODO(pekowski): Remove these lines in Havana where the option will be
+ # removed and the default will be true.
+ self.assertFalse(FLAGS.amqp_rpc_single_reply_queue)
+ self.config(amqp_rpc_single_reply_queue=True)
+
+ self.orig_unpack_context = rpc_amqp.unpack_context
+
+ def my_unpack_context(conf, msg):
+ self.assertTrue('_reply_q' in msg)
+ if single_reply_queue_for_callee_off:
+ # Simulate a downlevel RPC callee by removing the reply_q.
+ # This will make the callee think it got a request
+ # from a downlevel caller and thus respond in a downlevel
+ # way. In fact we are testing an uplevel caller.
+ msg.pop('_reply_q')
+ return self.orig_unpack_context(conf, msg)
+
+ self.stubs.Set(rpc_amqp, 'unpack_context', my_unpack_context)
+
+ self.ReplyProxy_was_called = False
+
+ class MyReplyProxy(rpc_amqp.ReplyProxy):
+ def _process_data(myself, message_data):
+ #with open('mylog', 'a') as f:
+ # f.write('my_process_data: ' + str(message_data) + '\n')
+ if single_reply_queue_for_callee_off:
+ self.assertTrue('_msg_id' not in message_data)
+ else:
+ self.assertTrue('_msg_id' in message_data)
+ self.ReplyProxy_was_called = True
+ super(MyReplyProxy, myself)._process_data(message_data)
+
+ self.orig_reply_proxy = self.conn.pool.reply_proxy
+ self.conn.pool.reply_proxy = MyReplyProxy(FLAGS, self.conn.pool)
+
+ value = 42
+ result = None
+ try:
+ result = self.rpc.call(
+ FLAGS, self.context, self.topic,
+ {"method": "echo", "args": {"value": value}},
+ timeout=1)
+ except rpc_common.Timeout as exc:
+ # expect a timeout in this case
+ if single_reply_queue_for_callee_off:
+ result = 42
+
+ self.assertEqual(value, result)
+ if single_reply_queue_for_callee_off:
+ self.assertFalse(self.ReplyProxy_was_called)
+ else:
+ self.assertTrue(self.ReplyProxy_was_called)
+
+ self.stubs.UnsetAll()
+ self.conn.pool.reply_proxy = self.orig_reply_proxy
+
+ # TODO(pekowski): Remove this line in Havana
+ self.config(amqp_rpc_single_reply_queue=False)
+
+ # TODO(pekowski): Unfortunately, this test must be removed in Havana.
+ # The amqp_rpc_single_reply_queue option will go away in Havana.
+ # There will be no way to send a downlevel RPC in Havana, yet
+ # Havana will be able to receive downlevel RPCs. We would
+ # need a downlevel caller to test it.
+ def test_single_reply_queue_off_no_ids(
+ self, single_reply_queue_for_callee_on=False):
+ if not self.rpc:
+ self.skipTest('rpc driver not available.')
+
+ self.assertFalse(FLAGS.amqp_rpc_single_reply_queue)
+
+ def my_unpack_context(conf, msg):
+ self.assertTrue('_reply_q' not in msg)
+ if single_reply_queue_for_callee_on:
+ self.config(amqp_rpc_single_reply_queue=True)
+ return self.orig_unpack_context(conf, msg)
+
+ self.orig_unpack_context = rpc_amqp.unpack_context
+ self.stubs.Set(rpc_amqp, 'unpack_context', my_unpack_context)
+
+ self.MulticallWaiter_call_was_called = False
+
+ def my_MulticallWaiter_call(myself, data):
+ #with open('mylog', 'a') as f:
+ # f.write('my_MulticallWaiter_call: ' + str(data) + '\n')
+ self.assertTrue('_reply_q' not in data)
+ self.MulticallWaiter_call_was_called = True
+ return self.orig_MulticallWaiter_call(myself, data)
+
+ self.orig_MulticallWaiter_call = rpc_amqp.MulticallWaiter.__call__
+ self.stubs.Set(rpc_amqp.MulticallWaiter, '__call__',
+ my_MulticallWaiter_call)
+
+ value = 42
+ result = self.rpc.call(FLAGS, self.context, self.topic,
+ {"method": "echo", "args": {"value": value}})
+ self.assertEqual(value, result)
+ self.assertTrue(self.MulticallWaiter_call_was_called)
+
+ self.config(amqp_rpc_single_reply_queue=False)
+ self.stubs.UnsetAll()
+
+ # TODO(pekowski): Remove this test in Havana.
+ def test_single_reply_queue_caller_off_callee_on(self):
+ self.test_single_reply_queue_off_no_ids(
+ single_reply_queue_for_callee_on=True)
+
+ def test_single_reply_queue_caller_on_callee_off(self):
+ self.test_single_reply_queue_on_has_ids(
+ single_reply_queue_for_callee_off=True)
+
+ #TODO(pekowski): remove this test in Havana
+ def test_single_reply_queue_mt_resp_rting(self):
+ if not self.rpc:
+ self.skipTest('rpc driver not available.')
+
+ self.config(amqp_rpc_single_reply_queue=True)
+ self.test_multithreaded_resp_routing()
+ self.config(amqp_rpc_single_reply_queue=False)
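
The tests above toggle the flag with self.config(); outside the test harness the opt-in would be expressed through oslo.config. A hedged sketch, assuming the openstack.common tree is importable so the option registration above has run:

from oslo.config import cfg

from openstack.common.rpc import amqp  # noqa: registers amqp_opts as a side effect

# Opt in to the single shared reply queue. It defaults to False, and the
# flag is slated for removal in Havana, when the behaviour is expected to
# become unconditional.
cfg.CONF.set_override('amqp_rpc_single_reply_queue', True)
assert cfg.CONF.amqp_rpc_single_reply_queue
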
diff --git a/tests/unit/rpc/common.py b/tests/unit/rpc/common.py
index 5a655c6..6359142 100644
--- a/tests/unit/rpc/common.py
+++ b/tests/unit/rpc/common.py
@@ -21,6 +21,7 @@ Unit Tests for remote procedure calls shared between all implementations
import logging
import time
+import datetime
import eventlet
from eventlet import greenthread
@@ -29,7 +30,6 @@ from oslo.config import cfg
from openstack.common import exception
from openstack.common.gettextutils import _
from openstack.common import jsonutils
-from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common
from openstack.common.rpc import dispatcher as rpc_dispatcher
from tests import utils as test_utils
@@ -252,80 +252,124 @@ class BaseRpcTestCase(test_utils.BaseTestCase):
except rpc_common.Timeout as exc:
pass
-
-class BaseRpcAMQPTestCase(BaseRpcTestCase):
- """Base test class for all AMQP-based RPC tests."""
- def test_proxycallback_handles_exceptions(self):
- """Make sure exceptions unpacking messages don't cause hangs."""
+ def test_multithreaded_resp_routing(self):
if not self.rpc:
self.skipTest('rpc driver not available.')
- orig_unpack = rpc_amqp.unpack_context
-
- info = {'unpacked': False}
-
- def fake_unpack_context(*args, **kwargs):
- info['unpacked'] = True
- raise test.TestingException('moo')
-
- self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context)
-
- value = 41
- self.rpc.cast(FLAGS, self.context, self.topic,
- {"method": "echo", "args": {"value": value}})
-
- # Wait for the cast to complete.
- for x in xrange(50):
- if info['unpacked']:
- break
- greenthread.sleep(0.1)
- else:
- self.fail("Timeout waiting for message to be consumed")
-
- # Now see if we get a response even though we raised an
- # exception for the cast above.
- self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack)
-
- value = 42
- result = self.rpc.call(FLAGS, self.context, self.topic,
- {"method": "echo",
- "args": {"value": value}})
- self.assertEqual(value, result)
-
- def test_notification_envelope(self):
- raw_msg = {'a': 'b'}
- self.test_msg = None
-
- def fake_notify_send(_conn, topic, msg):
- self.test_msg = msg
-
- self.stubs.Set(self.rpc.Connection, 'notify_send', fake_notify_send)
-
- self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
- envelope=False)
- self.assertEqual(self.test_msg, raw_msg)
-
- # Envelopes enabled, but not enabled for notifications
- self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', True)
- self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
- envelope=False)
- self.assertEqual(self.test_msg, raw_msg)
+ global synced_echo_call
+ synced_echo_call = SyncedEchoCall()
+
+ callid1 = synced_echo_call.spawn(self.rpc.call, FLAGS, self.context,
+ self.topic, value=1)
+ callid2 = synced_echo_call.spawn(self.rpc.call, FLAGS, self.context,
+ self.topic, value=2)
+ callid3 = synced_echo_call.spawn(self.rpc.call, FLAGS, self.context,
+ self.topic, value=3)
+
+ r3 = synced_echo_call.post(callid3)
+ self.assertEqual(synced_echo_call.wait_states(),
+ synced_echo_call.expected_wait_states())
+ r1 = synced_echo_call.post(callid1)
+ self.assertEqual(synced_echo_call.wait_states(),
+ synced_echo_call.expected_wait_states())
+ r2 = synced_echo_call.post(callid2)
+ self.assertEqual(synced_echo_call.wait_states(),
+ synced_echo_call.expected_wait_states())
+
+ #synced_echo_call.print_times() #for DEBUG
+ self.assertEqual((r1, r2, r3), (1, 2, 3))
+ self.assertTrue(synced_echo_call.verify_time_order(callid3, callid1,
+ callid2))
+
+synced_echo_call = None
+
+
+def rpc_wrapper(callid, func, *args):
+ """This wrapper was added because tests would hang when there was a bug
+ that caused the RPC to timeout. The post event would hang waiting for
+ the wait event. The missing wait is added here. It just makes
+ debugging the unit tests easier.
+ """
+ try:
+ ret = func(*args)
+ except rpc_common.Timeout as exc:
+ synced_echo_call.wait(callid)
+ ret = None
+ return ret
- # Now turn it on for notifications
- msg = {
- 'oslo.version': rpc_common._RPC_ENVELOPE_VERSION,
- 'oslo.message': jsonutils.dumps(raw_msg),
- }
- self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
- envelope=True)
- self.assertEqual(self.test_msg, msg)
- # Make sure envelopes are still on notifications, even if turned off
- # for general messages.
- self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', False)
- self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
- envelope=True)
- self.assertEqual(self.test_msg, msg)
+class SyncedEchoCall():
+ """Class to control the synchronization of the synced_echo method of the
+ TestReceiver class
+ """
+ class data():
+ def __init__(self):
+ self.gthread = None
+ self.event = eventlet.event.Event()
+ self.waiting = False
+ self.expected_wait_state = False
+ self.time = 0
+
+ def __init__(self):
+ self.list = []
+
+ def spawn(self, *args, **kwargs):
+ idx = len(self.list)
+ self.list.append(SyncedEchoCall.data())
+ args = list(args)
+ value = kwargs['value']
+ args.append({"method": "synced_echo", "args":
+ {"value": value, "callid": idx}})
+ args.insert(0, idx)
+ args.insert(0, rpc_wrapper)
+ self.list[idx].gthread = eventlet.spawn(*args)
+ self.list[idx].expected_wait_state = True
+ return idx
+
+ def wait_states(self):
+ rlist = []
+ for i in self.list:
+ rlist.append(i.waiting)
+ return rlist
+
+ def expected_wait_states(self):
+ rlist = []
+ for i in self.list:
+ rlist.append(i.expected_wait_state)
+ return rlist
+
+ def post(self, idx):
+ self.list[idx].event.send()
+ retval = self.list[idx].gthread.wait()
+ self.list[idx].expected_wait_state = False
+ #self.print_wait_states() #for DEBUG
+ return retval
+
+ def wait(self, idx):
+ self.list[idx].waiting = True
+ self.list[idx].event.wait()
+ self.list[idx].waiting = False
+ self.list[idx].time = datetime.datetime.now()
+
+ def verify_time_order(self, idx1, idx2, idx3):
+ return self.list[idx1].time < self.list[idx2].time and \
+ self.list[idx2].time < self.list[idx3].time
+
+ # for DEBUG
+ #def print_times(self):
+ # # change /dev/null to name to get output to a log file
+ # with open('mylog', 'a') as f:
+ # f.write('SyncedEchoCall times: ' + '\n')
+ # f.write(' ' + str(self.list[0].time) + '\n')
+ # f.write(' ' + str(self.list[1].time) + '\n')
+ # f.write(' ' + str(self.list[2].time) + '\n')
+
+ # for DEBUG
+ #def print_wait_states(self):
+ # # change /dev/null to name to get output to a log file
+ # with open('mylog', 'a') as f:
+ # f.write('SyncedEchoCall times: ' +
+ # str(self.wait_states()) + '\n')
class TestReceiver(object):
@@ -341,6 +385,14 @@ class TestReceiver(object):
return value
@staticmethod
+ def synced_echo(context, value, callid):
+ """Waits on the event identified by callid."""
+ LOG.debug(_("Received %s"), value)
+ global synced_echo_call
+ synced_echo_call.wait(callid)
+ return value
+
+ @staticmethod
def fortytwo(context):
"""Simply returns 42."""
return 42
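
The ordering trick used by SyncedEchoCall above, parking each spawned call on its own eventlet Event so the test decides when each callee replies, reduces to the following sketch. gated_call() and the gate list are illustrative names, not the test code.

import eventlet
from eventlet import event


def gated_call(value, gate):
    gate.wait()  # block this green thread until the test releases it
    return value


gates = [event.Event() for _ in range(3)]
threads = [eventlet.spawn(gated_call, i, gates[i]) for i in range(3)]

# Release the calls out of order; each green thread still returns its own
# value, which is what test_multithreaded_resp_routing asserts about
# response routing.
for i in (2, 0, 1):
    gates[i].send()
    assert threads[i].wait() == i
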
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index cadb630..4a76900 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -32,7 +32,7 @@ from oslo.config import cfg
from openstack.common import exception
from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common
-from tests.unit.rpc import common
+from tests.unit.rpc import amqp
from tests import utils
try:
@@ -77,7 +77,7 @@ class KombuStubs:
self.rpc = None
-class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
+class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
def setUp(self):
KombuStubs.setUp(self)
super(RpcKombuTestCase, self).setUp()