summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRaymond Pekowski <pekowski@gmail.com>2013-06-05 11:40:23 -0500
committerRaymond Pekowski <pekowski@gmail.com>2013-06-05 11:41:09 -0500
commit6d42ced0d6d73783e901d6c4f840d0f2465e31bf (patch)
treee3435060ae75941944c375d6c8272a209dab3cbc
parent3f315db8dab8a859e7204e7ac184ea9796273a74 (diff)
downloadoslo-6d42ced0d6d73783e901d6c4f840d0f2465e31bf.tar.gz
oslo-6d42ced0d6d73783e901d6c4f840d0f2465e31bf.tar.xz
oslo-6d42ced0d6d73783e901d6c4f840d0f2465e31bf.zip
Remove the amqp_rpc_single_reply_queue option from Havana
blueprint amqp-rpc-fast-reply-queue As part of the Grizzly code review for the above blueprint (Idb09a714), the Grizzly code base is "aware" of this feature in terms of receiving RPCs and provided with an option for enabling it on the RPC send side. It was decided that in Havana the option would be removed, RPCs would only be sent using the new functionality yet awareness of prior functionality be retained for the receiving of RPCs. This code change implements the Havana piece. Change-Id: I5f2a8e432cf84a637c1aa813656c1cbc8db6e7eb
-rw-r--r--openstack/common/rpc/amqp.py100
-rw-r--r--tests/unit/rpc/amqp.py74
-rw-r--r--tests/unit/rpc/test_qpid.py114
3 files changed, 65 insertions, 223 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index f5b7cab..f46f990 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -34,10 +34,6 @@ from eventlet import greenpool
from eventlet import pools
from eventlet import queue
from eventlet import semaphore
-# TODO(pekowsk): Remove import cfg and below comment in Havana.
-# This import should no longer be needed when the amqp_rpc_single_reply_queue
-# option is removed.
-from oslo.config import cfg
from openstack.common import excutils
from openstack.common.gettextutils import _
@@ -46,16 +42,6 @@ from openstack.common import log as logging
from openstack.common.rpc import common as rpc_common
-# TODO(pekowski): Remove this option in Havana.
-amqp_opts = [
- cfg.BoolOpt('amqp_rpc_single_reply_queue',
- default=False,
- help='Enable a fast single reply queue if using AMQP based '
- 'RPC like RabbitMQ or Qpid.'),
-]
-
-cfg.CONF.register_opts(amqp_opts)
-
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
@@ -83,7 +69,7 @@ class Pool(pools.Pool):
# is the above "while loop" gets all the cached connections from the
# pool and closes them, but never returns them to the pool, a pool
# leak. The unit tests hang waiting for an item to be returned to the
- # pool. The unit tests get here via the teatDown() method. In the run
+ # pool. The unit tests get here via the tearDown() method. In the run
# time code, it gets here via cleanup() and only appears in service.py
# just before doing a sys.exit(), so cleanup() only happens once and
# the leakage is not a problem.
@@ -197,9 +183,10 @@ class ReplyProxy(ConnectionContext):
msg_id = message_data.pop('_msg_id', None)
waiter = self._call_waiters.get(msg_id)
if not waiter:
- LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s'
+ LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s'
', message : %(data)s'), {'msg_id': msg_id,
'data': message_data})
+ LOG.warn(_('_call_waiters: %s') % str(self._call_waiters))
else:
waiter.put(message_data)
@@ -513,59 +500,6 @@ class MulticallProxyWaiter(object):
yield result
-#TODO(pekowski): Remove MulticallWaiter() in Havana.
-class MulticallWaiter(object):
- def __init__(self, conf, connection, timeout):
- self._connection = connection
- self._iterator = connection.iterconsume(timeout=timeout or
- conf.rpc_response_timeout)
- self._result = None
- self._done = False
- self._got_ending = False
- self._conf = conf
- self.msg_id_cache = _MsgIdCache()
-
- def done(self):
- if self._done:
- return
- self._done = True
- self._iterator.close()
- self._iterator = None
- self._connection.close()
-
- def __call__(self, data):
- """The consume() callback will call this. Store the result."""
- self.msg_id_cache.check_duplicate_message(data)
- if data['failure']:
- failure = data['failure']
- self._result = rpc_common.deserialize_remote_exception(self._conf,
- failure)
-
- elif data.get('ending', False):
- self._got_ending = True
- else:
- self._result = data['result']
-
- def __iter__(self):
- """Return a result until we get a 'None' response from consumer"""
- if self._done:
- raise StopIteration
- while True:
- try:
- self._iterator.next()
- except Exception:
- with excutils.save_and_reraise_exception():
- self.done()
- if self._got_ending:
- self.done()
- raise StopIteration
- result = self._result
- if isinstance(result, Exception):
- self.done()
- raise result
- yield result
-
-
def create_connection(conf, new, connection_pool):
"""Create a connection."""
return ConnectionContext(conf, connection_pool, pooled=not new)
@@ -576,14 +510,6 @@ _reply_proxy_create_sem = semaphore.Semaphore()
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
- # TODO(pekowski): Remove all these comments in Havana.
- # For amqp_rpc_single_reply_queue = False,
- # Can't use 'with' for multicall, as it returns an iterator
- # that will continue to use the connection. When it's done,
- # connection.close() will get called which will put it back into
- # the pool
- # For amqp_rpc_single_reply_queue = True,
- # The 'with' statement is mandatory for closing the connection
LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
@@ -591,21 +517,13 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
_add_unique_id(msg)
pack_context(msg, context)
- # TODO(pekowski): Remove this flag and the code under the if clause
- # in Havana.
- if not conf.amqp_rpc_single_reply_queue:
- conn = ConnectionContext(conf, connection_pool)
- wait_msg = MulticallWaiter(conf, conn, timeout)
- conn.declare_direct_consumer(msg_id, wait_msg)
+ with _reply_proxy_create_sem:
+ if not connection_pool.reply_proxy:
+ connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+ msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+ wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+ with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
- else:
- with _reply_proxy_create_sem:
- if not connection_pool.reply_proxy:
- connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
- msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
- wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
- with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg
diff --git a/tests/unit/rpc/amqp.py b/tests/unit/rpc/amqp.py
index 432dd35..83713c7 100644
--- a/tests/unit/rpc/amqp.py
+++ b/tests/unit/rpc/amqp.py
@@ -97,16 +97,11 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase):
# Make sure the msg envelope was applied
self.assertTrue('oslo.version' in self.test_msg)
- def test_single_reply_queue_on_has_ids(
+ def test_single_reply_queue_caller_on(
self, single_reply_queue_for_callee_off=False):
if not self.rpc:
self.skipTest('rpc driver not available.')
- # TODO(pekowski): Remove these lines in Havana where the option will be
- # removed and the default will be true.
- self.assertFalse(FLAGS.amqp_rpc_single_reply_queue)
- self.config(amqp_rpc_single_reply_queue=True)
-
self.orig_unpack_context = rpc_amqp.unpack_context
def my_unpack_context(conf, msg):
@@ -127,10 +122,7 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase):
def _process_data(myself, message_data):
#with open('mylog', 'a') as f:
# f.write('my_process_data: ' + str(message_data) + '\n')
- if single_reply_queue_for_callee_off:
- self.assertTrue('_msg_id' not in message_data)
- else:
- self.assertTrue('_msg_id' in message_data)
+ self.assertTrue('_msg_id' in message_data)
self.ReplyProxy_was_called = True
super(MyReplyProxy, myself)._process_data(message_data)
@@ -158,70 +150,10 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase):
self.stubs.UnsetAll()
self.conn.pool.reply_proxy = self.orig_reply_proxy
- # TODO(pekowski): Remove this line in Havana
- self.config(amqp_rpc_single_reply_queue=False)
-
- # TODO(pekowski): Unfortunately remove this test in Havana.
- # The amqp_rpc_single_reply_queue option will go away in Havana.
- # There will be no way to send a downlevel RPC in Havana, yet
- # Havana will be able to receive downlevel RPCs. We would
- # need a downlevel caller to test it.
- def test_single_reply_queue_off_no_ids(
- self, single_reply_queue_for_callee_on=False):
- if not self.rpc:
- self.skipTest('rpc driver not available.')
-
- self.assertFalse(FLAGS.amqp_rpc_single_reply_queue)
-
- def my_unpack_context(conf, msg):
- self.assertTrue('_reply_q' not in msg)
- if single_reply_queue_for_callee_on:
- self.config(amqp_rpc_single_reply_queue=True)
- return self.orig_unpack_context(conf, msg)
-
- self.orig_unpack_context = rpc_amqp.unpack_context
- self.stubs.Set(rpc_amqp, 'unpack_context', my_unpack_context)
-
- self.MulticallWaiter_call_was_called = False
-
- def my_MulticallWaiter_call(myself, data):
- #with open('mylog', 'a') as f:
- # f.write('my_MulticallWaiter_call: ' + str(data) + '\n')
- self.assertTrue('_reply_q' not in data)
- self.MulticallWaiter_call_was_called = True
- return self.orig_MulticallWaiter_call(myself, data)
-
- self.orig_MulticallWaiter_call = rpc_amqp.MulticallWaiter.__call__
- self.stubs.Set(rpc_amqp.MulticallWaiter, '__call__',
- my_MulticallWaiter_call)
-
- value = 42
- result = self.rpc.call(FLAGS, self.context, self.topic,
- {"method": "echo", "args": {"value": value}})
- self.assertEqual(value, result)
- self.assertTrue(self.MulticallWaiter_call_was_called)
-
- self.config(amqp_rpc_single_reply_queue=False)
- self.stubs.UnsetAll()
-
- # TODO(pekowski): Remove this test in Havana.
- def test_single_reply_queue_caller_off_callee_on(self):
- self.test_single_reply_queue_off_no_ids(
- single_reply_queue_for_callee_on=True)
-
def test_single_reply_queue_caller_on_callee_off(self):
- self.test_single_reply_queue_on_has_ids(
+ self.test_single_reply_queue_caller_on(
single_reply_queue_for_callee_off=True)
- #TODO(pekowski): remove this test in Havana
- def test_single_reply_queue_mt_resp_rting(self):
- if not self.rpc:
- self.skipTest('rpc driver not available.')
-
- self.config(amqp_rpc_single_reply_queue=True)
- self.test_multithreaded_resp_routing()
- self.config(amqp_rpc_single_reply_queue=False)
-
def test_duplicate_message_check(self):
"""Test sending *not-dict* to a topic exchange/queue."""
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index 02e8e20..40f0e37 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -26,6 +26,7 @@ eventlet.monkey_patch()
import fixtures
import mox
from oslo.config import cfg
+import uuid
from openstack.common import context
from openstack.common import jsonutils
@@ -90,6 +91,13 @@ class RpcQpidTestCase(utils.BaseTestCase):
fixtures.MonkeyPatch('qpid.messaging.Receiver',
lambda *_x, **_y: self.mock_receiver))
+ self.uuid4 = uuid.uuid4()
+ self.useFixture(
+ fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4))
+
+ def mock_uuid4(self):
+ return self.uuid4
+
def cleanUp(self):
if impl_qpid:
# Need to reset this in case we changed the connection_cls
@@ -267,6 +275,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
# connection.
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)
+ self.mock_connection.close()
self.mox.ReplayAll()
@@ -290,10 +299,8 @@ class RpcQpidTestCase(utils.BaseTestCase):
method(*args)
finally:
- while impl_qpid.Connection.pool.free_items:
- # Pull the mock connection object out of the connection pool so
- # that it doesn't mess up other test cases.
- impl_qpid.Connection.pool.get()
+ impl_qpid.cleanup()
+ self.uuid4 = uuid.uuid4()
def test_cast(self):
self._test_cast(fanout=False)
@@ -332,7 +339,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
self._setup_to_server_tests(server_params)
self._test_cast(fanout=True, server_params=server_params)
- def _test_call(self, multi):
+ def _test_call_mock_common(self):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_sender = self.mox.CreateMock(self.orig_sender)
@@ -349,38 +356,53 @@ class RpcQpidTestCase(utils.BaseTestCase):
'false}, "durable": true, "name": ".*"}}')
self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
self.mock_receiver.capacity = 1
+ self.mock_connection.opened().AndReturn(False)
+ self.mock_connection.open()
+ self.mock_connection.session().AndReturn(self.mock_session)
send_addr = (
'openstack/impl_qpid_test ; {"node": {"x-declare": '
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
'"create": "always"}')
self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
self.mock_sender.send(mox.IgnoreArg())
+ self.mock_session.close()
+ self.mock_connection.session().AndReturn(self.mock_session)
+
+ def _test_call(self, multi):
+ self._test_call_mock_common()
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"result": "foo", "failure": False, "ending": False}))
+ {"_msg_id": self.uuid4.hex, "result": "foo", "failure": False,
+ "ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
if multi:
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
- qpid.messaging.Message({"result": "bar", "failure": False,
+ qpid.messaging.Message({"_msg_id": self.uuid4.hex,
+ "result": "bar",
+ "failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
- qpid.messaging.Message({"result": "baz", "failure": False,
+ qpid.messaging.Message({"_msg_id": self.uuid4.hex,
+ "result": "baz",
+ "failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"failure": False, "ending": True}))
+ {"_msg_id": self.uuid4.hex, "failure": False, "ending": True}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.close()
- self.mock_connection.session().AndReturn(self.mock_session)
+ # Normally the iterconsume() runs indefinitely, but we have to stop it
+ # here otherwise, the test won't end
+ self.mock_session.next_receiver(timeout=None).AndRaise(StopIteration)
+ self.mock_connection.close()
self.mox.ReplayAll()
@@ -400,59 +422,31 @@ class RpcQpidTestCase(utils.BaseTestCase):
else:
self.assertEquals(res, "foo")
finally:
- while impl_qpid.Connection.pool.free_items:
- # Pull the mock connection object out of the connection pool so
- # that it doesn't mess up other test cases.
- impl_qpid.Connection.pool.get()
+ impl_qpid.cleanup()
+ self.uuid4 = uuid.uuid4()
def test_call(self):
self._test_call(multi=False)
def _test_call_with_timeout(self, timeout, expect_failure):
- # TODO(beagles): should be possible to refactor this method and
- # _test_call to share common code. Maybe making the messages
- # and test checks parameters, etc.
- self.mock_connection = self.mox.CreateMock(self.orig_connection)
- self.mock_session = self.mox.CreateMock(self.orig_session)
- self.mock_sender = self.mox.CreateMock(self.orig_sender)
- self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
+ self._test_call_mock_common()
- self.mock_connection.opened().AndReturn(False)
- self.mock_connection.open()
- self.mock_connection.session().AndReturn(self.mock_session)
- rcv_addr = mox.Regex(
- r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
- ' true, "durable": true, "type": "direct"}, "type": '
- '"topic"}, "create": "always", "link": {"x-declare": '
- '{"auto-delete": true, "exclusive": true, "durable": '
- 'false}, "durable": true, "name": ".*"}}')
- self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
- self.mock_receiver.capacity = 1
- send_addr = (
- 'openstack/impl_qpid_test ; {"node": {"x-declare": '
- '{"auto-delete": true, "durable": false}, "type": "topic"}, '
- '"create": "always"}')
- self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
- self.mock_sender.send(mox.IgnoreArg())
-
- if expect_failure:
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndRaise(
- qpid.messaging.exceptions.Empty())
- self.mock_session.close()
- self.mock_connection.session().AndReturn(self.mock_session)
- else:
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ if not expect_failure:
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"result": "foo", "failure": False, "ending": False}))
+ {"_msg_id": self.uuid4.hex, "result": "foo", "failure": False,
+ "ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"failure": False, "ending": True}))
+ {"_msg_id": self.uuid4.hex, "failure": False, "ending": True}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.close()
- self.mock_connection.session().AndReturn(self.mock_session)
+ # Normally the iterconsume() runs indefinitely, but we have to stop it
+ # here otherwise, the test won't end
+ self.mock_session.next_receiver(timeout=None).AndRaise(StopIteration)
+ self.mock_connection.close()
self.mox.ReplayAll()
@@ -473,10 +467,8 @@ class RpcQpidTestCase(utils.BaseTestCase):
{"method": "test_method", "args": {}}, timeout)
self.assertEquals(res, "foo")
finally:
- while impl_qpid.Connection.pool.free_items:
- # Pull the mock connection object out of the connection pool so
- # that it doesn't mess up other test cases.
- impl_qpid.Connection.pool.get()
+ impl_qpid.cleanup()
+ self.uuid4 = uuid.uuid4()
def test_call_with_timeout(self):
"""A little more indepth for a timeout test. Specifically we are
@@ -488,7 +480,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
sufficient.
"""
self._test_call_with_timeout(timeout=5, expect_failure=False)
- self._test_call_with_timeout(timeout=0, expect_failure=True)
+ self._test_call_with_timeout(timeout=0.1, expect_failure=True)
def test_multicall(self):
self._test_call(multi=True)