summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-06-11 21:42:59 +0000
committerGerrit Code Review <review@openstack.org>2013-06-11 21:42:59 +0000
commitda554770974706ffa6f69f12d9051062cd0d0b80 (patch)
treed35b67c605fb6d63a343cc796116540982281f69
parentb13651d9142c7e928568a99b94bfad5ca2c99d31 (diff)
parent6d42ced0d6d73783e901d6c4f840d0f2465e31bf (diff)
downloadoslo-da554770974706ffa6f69f12d9051062cd0d0b80.tar.gz
oslo-da554770974706ffa6f69f12d9051062cd0d0b80.tar.xz
oslo-da554770974706ffa6f69f12d9051062cd0d0b80.zip
Merge "Remove the amqp_rpc_single_reply_queue option from Havana"
-rw-r--r--openstack/common/rpc/amqp.py100
-rw-r--r--tests/unit/rpc/amqp.py74
-rw-r--r--tests/unit/rpc/test_qpid.py114
3 files changed, 65 insertions, 223 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 64ef582..22e01d7 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -34,10 +34,6 @@ from eventlet import greenpool
from eventlet import pools
from eventlet import queue
from eventlet import semaphore
-# TODO(pekowsk): Remove import cfg and below comment in Havana.
-# This import should no longer be needed when the amqp_rpc_single_reply_queue
-# option is removed.
-from oslo.config import cfg
from openstack.common import excutils
from openstack.common.gettextutils import _
@@ -46,16 +42,6 @@ from openstack.common import log as logging
from openstack.common.rpc import common as rpc_common
-# TODO(pekowski): Remove this option in Havana.
-amqp_opts = [
- cfg.BoolOpt('amqp_rpc_single_reply_queue',
- default=False,
- help='Enable a fast single reply queue if using AMQP based '
- 'RPC like RabbitMQ or Qpid.'),
-]
-
-cfg.CONF.register_opts(amqp_opts)
-
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
@@ -83,7 +69,7 @@ class Pool(pools.Pool):
# is the above "while loop" gets all the cached connections from the
# pool and closes them, but never returns them to the pool, a pool
# leak. The unit tests hang waiting for an item to be returned to the
- # pool. The unit tests get here via the teatDown() method. In the run
+ # pool. The unit tests get here via the tearDown() method. In the run
# time code, it gets here via cleanup() and only appears in service.py
# just before doing a sys.exit(), so cleanup() only happens once and
# the leakage is not a problem.
@@ -197,9 +183,10 @@ class ReplyProxy(ConnectionContext):
msg_id = message_data.pop('_msg_id', None)
waiter = self._call_waiters.get(msg_id)
if not waiter:
- LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s'
+ LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s'
', message : %(data)s'), {'msg_id': msg_id,
'data': message_data})
+ LOG.warn(_('_call_waiters: %s') % str(self._call_waiters))
else:
waiter.put(message_data)
@@ -516,59 +503,6 @@ class MulticallProxyWaiter(object):
yield result
-#TODO(pekowski): Remove MulticallWaiter() in Havana.
-class MulticallWaiter(object):
- def __init__(self, conf, connection, timeout):
- self._connection = connection
- self._iterator = connection.iterconsume(timeout=timeout or
- conf.rpc_response_timeout)
- self._result = None
- self._done = False
- self._got_ending = False
- self._conf = conf
- self.msg_id_cache = _MsgIdCache()
-
- def done(self):
- if self._done:
- return
- self._done = True
- self._iterator.close()
- self._iterator = None
- self._connection.close()
-
- def __call__(self, data):
- """The consume() callback will call this. Store the result."""
- self.msg_id_cache.check_duplicate_message(data)
- if data['failure']:
- failure = data['failure']
- self._result = rpc_common.deserialize_remote_exception(self._conf,
- failure)
-
- elif data.get('ending', False):
- self._got_ending = True
- else:
- self._result = data['result']
-
- def __iter__(self):
- """Return a result until we get a 'None' response from consumer"""
- if self._done:
- raise StopIteration
- while True:
- try:
- self._iterator.next()
- except Exception:
- with excutils.save_and_reraise_exception():
- self.done()
- if self._got_ending:
- self.done()
- raise StopIteration
- result = self._result
- if isinstance(result, Exception):
- self.done()
- raise result
- yield result
-
-
def create_connection(conf, new, connection_pool):
"""Create a connection."""
return ConnectionContext(conf, connection_pool, pooled=not new)
@@ -579,14 +513,6 @@ _reply_proxy_create_sem = semaphore.Semaphore()
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
- # TODO(pekowski): Remove all these comments in Havana.
- # For amqp_rpc_single_reply_queue = False,
- # Can't use 'with' for multicall, as it returns an iterator
- # that will continue to use the connection. When it's done,
- # connection.close() will get called which will put it back into
- # the pool
- # For amqp_rpc_single_reply_queue = True,
- # The 'with' statement is mandatory for closing the connection
LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
@@ -594,21 +520,13 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
_add_unique_id(msg)
pack_context(msg, context)
- # TODO(pekowski): Remove this flag and the code under the if clause
- # in Havana.
- if not conf.amqp_rpc_single_reply_queue:
- conn = ConnectionContext(conf, connection_pool)
- wait_msg = MulticallWaiter(conf, conn, timeout)
- conn.declare_direct_consumer(msg_id, wait_msg)
+ with _reply_proxy_create_sem:
+ if not connection_pool.reply_proxy:
+ connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+ msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+ wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+ with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
- else:
- with _reply_proxy_create_sem:
- if not connection_pool.reply_proxy:
- connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
- msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
- wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
- with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg
diff --git a/tests/unit/rpc/amqp.py b/tests/unit/rpc/amqp.py
index 432dd35..83713c7 100644
--- a/tests/unit/rpc/amqp.py
+++ b/tests/unit/rpc/amqp.py
@@ -97,16 +97,11 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase):
# Make sure the msg envelope was applied
self.assertTrue('oslo.version' in self.test_msg)
- def test_single_reply_queue_on_has_ids(
+ def test_single_reply_queue_caller_on(
self, single_reply_queue_for_callee_off=False):
if not self.rpc:
self.skipTest('rpc driver not available.')
- # TODO(pekowski): Remove these lines in Havana where the option will be
- # removed and the default will be true.
- self.assertFalse(FLAGS.amqp_rpc_single_reply_queue)
- self.config(amqp_rpc_single_reply_queue=True)
-
self.orig_unpack_context = rpc_amqp.unpack_context
def my_unpack_context(conf, msg):
@@ -127,10 +122,7 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase):
def _process_data(myself, message_data):
#with open('mylog', 'a') as f:
# f.write('my_process_data: ' + str(message_data) + '\n')
- if single_reply_queue_for_callee_off:
- self.assertTrue('_msg_id' not in message_data)
- else:
- self.assertTrue('_msg_id' in message_data)
+ self.assertTrue('_msg_id' in message_data)
self.ReplyProxy_was_called = True
super(MyReplyProxy, myself)._process_data(message_data)
@@ -158,70 +150,10 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase):
self.stubs.UnsetAll()
self.conn.pool.reply_proxy = self.orig_reply_proxy
- # TODO(pekowski): Remove this line in Havana
- self.config(amqp_rpc_single_reply_queue=False)
-
- # TODO(pekowski): Unfortunately remove this test in Havana.
- # The amqp_rpc_single_reply_queue option will go away in Havana.
- # There will be no way to send a downlevel RPC in Havana, yet
- # Havana will be able to receive downlevel RPCs. We would
- # need a downlevel caller to test it.
- def test_single_reply_queue_off_no_ids(
- self, single_reply_queue_for_callee_on=False):
- if not self.rpc:
- self.skipTest('rpc driver not available.')
-
- self.assertFalse(FLAGS.amqp_rpc_single_reply_queue)
-
- def my_unpack_context(conf, msg):
- self.assertTrue('_reply_q' not in msg)
- if single_reply_queue_for_callee_on:
- self.config(amqp_rpc_single_reply_queue=True)
- return self.orig_unpack_context(conf, msg)
-
- self.orig_unpack_context = rpc_amqp.unpack_context
- self.stubs.Set(rpc_amqp, 'unpack_context', my_unpack_context)
-
- self.MulticallWaiter_call_was_called = False
-
- def my_MulticallWaiter_call(myself, data):
- #with open('mylog', 'a') as f:
- # f.write('my_MulticallWaiter_call: ' + str(data) + '\n')
- self.assertTrue('_reply_q' not in data)
- self.MulticallWaiter_call_was_called = True
- return self.orig_MulticallWaiter_call(myself, data)
-
- self.orig_MulticallWaiter_call = rpc_amqp.MulticallWaiter.__call__
- self.stubs.Set(rpc_amqp.MulticallWaiter, '__call__',
- my_MulticallWaiter_call)
-
- value = 42
- result = self.rpc.call(FLAGS, self.context, self.topic,
- {"method": "echo", "args": {"value": value}})
- self.assertEqual(value, result)
- self.assertTrue(self.MulticallWaiter_call_was_called)
-
- self.config(amqp_rpc_single_reply_queue=False)
- self.stubs.UnsetAll()
-
- # TODO(pekowski): Remove this test in Havana.
- def test_single_reply_queue_caller_off_callee_on(self):
- self.test_single_reply_queue_off_no_ids(
- single_reply_queue_for_callee_on=True)
-
def test_single_reply_queue_caller_on_callee_off(self):
- self.test_single_reply_queue_on_has_ids(
+ self.test_single_reply_queue_caller_on(
single_reply_queue_for_callee_off=True)
- #TODO(pekowski): remove this test in Havana
- def test_single_reply_queue_mt_resp_rting(self):
- if not self.rpc:
- self.skipTest('rpc driver not available.')
-
- self.config(amqp_rpc_single_reply_queue=True)
- self.test_multithreaded_resp_routing()
- self.config(amqp_rpc_single_reply_queue=False)
-
def test_duplicate_message_check(self):
"""Test sending *not-dict* to a topic exchange/queue."""
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index 42a6e6b..0bad387 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -26,6 +26,7 @@ eventlet.monkey_patch()
import fixtures
import mox
from oslo.config import cfg
+import uuid
from openstack.common import context
from openstack.common import jsonutils
@@ -89,6 +90,13 @@ class RpcQpidTestCase(utils.BaseTestCase):
fixtures.MonkeyPatch('qpid.messaging.Receiver',
lambda *_x, **_y: self.mock_receiver))
+ self.uuid4 = uuid.uuid4()
+ self.useFixture(
+ fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4))
+
+ def mock_uuid4(self):
+ return self.uuid4
+
def cleanUp(self):
if impl_qpid:
# Need to reset this in case we changed the connection_cls
@@ -266,6 +274,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
# connection.
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)
+ self.mock_connection.close()
self.mox.ReplayAll()
@@ -289,10 +298,8 @@ class RpcQpidTestCase(utils.BaseTestCase):
method(*args)
finally:
- while impl_qpid.Connection.pool.free_items:
- # Pull the mock connection object out of the connection pool so
- # that it doesn't mess up other test cases.
- impl_qpid.Connection.pool.get()
+ impl_qpid.cleanup()
+ self.uuid4 = uuid.uuid4()
def test_cast(self):
self._test_cast(fanout=False)
@@ -331,7 +338,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
self._setup_to_server_tests(server_params)
self._test_cast(fanout=True, server_params=server_params)
- def _test_call(self, multi):
+ def _test_call_mock_common(self):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_sender = self.mox.CreateMock(self.orig_sender)
@@ -348,38 +355,53 @@ class RpcQpidTestCase(utils.BaseTestCase):
'false}, "durable": true, "name": ".*"}}')
self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
self.mock_receiver.capacity = 1
+ self.mock_connection.opened().AndReturn(False)
+ self.mock_connection.open()
+ self.mock_connection.session().AndReturn(self.mock_session)
send_addr = (
'openstack/impl_qpid_test ; {"node": {"x-declare": '
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
'"create": "always"}')
self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
self.mock_sender.send(mox.IgnoreArg())
+ self.mock_session.close()
+ self.mock_connection.session().AndReturn(self.mock_session)
+
+ def _test_call(self, multi):
+ self._test_call_mock_common()
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"result": "foo", "failure": False, "ending": False}))
+ {"_msg_id": self.uuid4.hex, "result": "foo", "failure": False,
+ "ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
if multi:
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
- qpid.messaging.Message({"result": "bar", "failure": False,
+ qpid.messaging.Message({"_msg_id": self.uuid4.hex,
+ "result": "bar",
+ "failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
- qpid.messaging.Message({"result": "baz", "failure": False,
+ qpid.messaging.Message({"_msg_id": self.uuid4.hex,
+ "result": "baz",
+ "failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"failure": False, "ending": True}))
+ {"_msg_id": self.uuid4.hex, "failure": False, "ending": True}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.close()
- self.mock_connection.session().AndReturn(self.mock_session)
+ # Normally the iterconsume() runs indefinitely, but we have to stop it
+ # here otherwise, the test won't end
+ self.mock_session.next_receiver(timeout=None).AndRaise(StopIteration)
+ self.mock_connection.close()
self.mox.ReplayAll()
@@ -399,59 +421,31 @@ class RpcQpidTestCase(utils.BaseTestCase):
else:
self.assertEquals(res, "foo")
finally:
- while impl_qpid.Connection.pool.free_items:
- # Pull the mock connection object out of the connection pool so
- # that it doesn't mess up other test cases.
- impl_qpid.Connection.pool.get()
+ impl_qpid.cleanup()
+ self.uuid4 = uuid.uuid4()
def test_call(self):
self._test_call(multi=False)
def _test_call_with_timeout(self, timeout, expect_failure):
- # TODO(beagles): should be possible to refactor this method and
- # _test_call to share common code. Maybe making the messages
- # and test checks parameters, etc.
- self.mock_connection = self.mox.CreateMock(self.orig_connection)
- self.mock_session = self.mox.CreateMock(self.orig_session)
- self.mock_sender = self.mox.CreateMock(self.orig_sender)
- self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
+ self._test_call_mock_common()
- self.mock_connection.opened().AndReturn(False)
- self.mock_connection.open()
- self.mock_connection.session().AndReturn(self.mock_session)
- rcv_addr = mox.Regex(
- r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
- ' true, "durable": true, "type": "direct"}, "type": '
- '"topic"}, "create": "always", "link": {"x-declare": '
- '{"auto-delete": true, "exclusive": true, "durable": '
- 'false}, "durable": true, "name": ".*"}}')
- self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
- self.mock_receiver.capacity = 1
- send_addr = (
- 'openstack/impl_qpid_test ; {"node": {"x-declare": '
- '{"auto-delete": true, "durable": false}, "type": "topic"}, '
- '"create": "always"}')
- self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
- self.mock_sender.send(mox.IgnoreArg())
-
- if expect_failure:
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndRaise(
- qpid.messaging.exceptions.Empty())
- self.mock_session.close()
- self.mock_connection.session().AndReturn(self.mock_session)
- else:
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ if not expect_failure:
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"result": "foo", "failure": False, "ending": False}))
+ {"_msg_id": self.uuid4.hex, "result": "foo", "failure": False,
+ "ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"failure": False, "ending": True}))
+ {"_msg_id": self.uuid4.hex, "failure": False, "ending": True}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.close()
- self.mock_connection.session().AndReturn(self.mock_session)
+ # Normally the iterconsume() runs indefinitely, but we have to stop it
+ # here otherwise, the test won't end
+ self.mock_session.next_receiver(timeout=None).AndRaise(StopIteration)
+ self.mock_connection.close()
self.mox.ReplayAll()
@@ -472,10 +466,8 @@ class RpcQpidTestCase(utils.BaseTestCase):
{"method": "test_method", "args": {}}, timeout)
self.assertEquals(res, "foo")
finally:
- while impl_qpid.Connection.pool.free_items:
- # Pull the mock connection object out of the connection pool so
- # that it doesn't mess up other test cases.
- impl_qpid.Connection.pool.get()
+ impl_qpid.cleanup()
+ self.uuid4 = uuid.uuid4()
def test_call_with_timeout(self):
"""A little more indepth for a timeout test.
@@ -488,7 +480,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
sufficient.
"""
self._test_call_with_timeout(timeout=5, expect_failure=False)
- self._test_call_with_timeout(timeout=0, expect_failure=True)
+ self._test_call_with_timeout(timeout=0.1, expect_failure=True)
def test_multicall(self):
self._test_call(multi=True)