diff options
author | Jenkins <jenkins@review.openstack.org> | 2013-06-11 21:42:59 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-06-11 21:42:59 +0000 |
commit | da554770974706ffa6f69f12d9051062cd0d0b80 (patch) | |
tree | d35b67c605fb6d63a343cc796116540982281f69 | |
parent | b13651d9142c7e928568a99b94bfad5ca2c99d31 (diff) | |
parent | 6d42ced0d6d73783e901d6c4f840d0f2465e31bf (diff) | |
download | oslo-da554770974706ffa6f69f12d9051062cd0d0b80.tar.gz oslo-da554770974706ffa6f69f12d9051062cd0d0b80.tar.xz oslo-da554770974706ffa6f69f12d9051062cd0d0b80.zip |
Merge "Remove the amqp_rpc_single_reply_queue option from Havana"
-rw-r--r-- | openstack/common/rpc/amqp.py | 100 | ||||
-rw-r--r-- | tests/unit/rpc/amqp.py | 74 | ||||
-rw-r--r-- | tests/unit/rpc/test_qpid.py | 114 |
3 files changed, 65 insertions, 223 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 64ef582..22e01d7 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -34,10 +34,6 @@ from eventlet import greenpool from eventlet import pools from eventlet import queue from eventlet import semaphore -# TODO(pekowsk): Remove import cfg and below comment in Havana. -# This import should no longer be needed when the amqp_rpc_single_reply_queue -# option is removed. -from oslo.config import cfg from openstack.common import excutils from openstack.common.gettextutils import _ @@ -46,16 +42,6 @@ from openstack.common import log as logging from openstack.common.rpc import common as rpc_common -# TODO(pekowski): Remove this option in Havana. -amqp_opts = [ - cfg.BoolOpt('amqp_rpc_single_reply_queue', - default=False, - help='Enable a fast single reply queue if using AMQP based ' - 'RPC like RabbitMQ or Qpid.'), -] - -cfg.CONF.register_opts(amqp_opts) - UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) @@ -83,7 +69,7 @@ class Pool(pools.Pool): # is the above "while loop" gets all the cached connections from the # pool and closes them, but never returns them to the pool, a pool # leak. The unit tests hang waiting for an item to be returned to the - # pool. The unit tests get here via the teatDown() method. In the run + # pool. The unit tests get here via the tearDown() method. In the run # time code, it gets here via cleanup() and only appears in service.py # just before doing a sys.exit(), so cleanup() only happens once and # the leakage is not a problem. @@ -197,9 +183,10 @@ class ReplyProxy(ConnectionContext): msg_id = message_data.pop('_msg_id', None) waiter = self._call_waiters.get(msg_id) if not waiter: - LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s' + LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s' ', message : %(data)s'), {'msg_id': msg_id, 'data': message_data}) + LOG.warn(_('_call_waiters: %s') % str(self._call_waiters)) else: waiter.put(message_data) @@ -516,59 +503,6 @@ class MulticallProxyWaiter(object): yield result -#TODO(pekowski): Remove MulticallWaiter() in Havana. -class MulticallWaiter(object): - def __init__(self, conf, connection, timeout): - self._connection = connection - self._iterator = connection.iterconsume(timeout=timeout or - conf.rpc_response_timeout) - self._result = None - self._done = False - self._got_ending = False - self._conf = conf - self.msg_id_cache = _MsgIdCache() - - def done(self): - if self._done: - return - self._done = True - self._iterator.close() - self._iterator = None - self._connection.close() - - def __call__(self, data): - """The consume() callback will call this. Store the result.""" - self.msg_id_cache.check_duplicate_message(data) - if data['failure']: - failure = data['failure'] - self._result = rpc_common.deserialize_remote_exception(self._conf, - failure) - - elif data.get('ending', False): - self._got_ending = True - else: - self._result = data['result'] - - def __iter__(self): - """Return a result until we get a 'None' response from consumer""" - if self._done: - raise StopIteration - while True: - try: - self._iterator.next() - except Exception: - with excutils.save_and_reraise_exception(): - self.done() - if self._got_ending: - self.done() - raise StopIteration - result = self._result - if isinstance(result, Exception): - self.done() - raise result - yield result - - def create_connection(conf, new, connection_pool): """Create a connection.""" return ConnectionContext(conf, connection_pool, pooled=not new) @@ -579,14 +513,6 @@ _reply_proxy_create_sem = semaphore.Semaphore() def multicall(conf, context, topic, msg, timeout, connection_pool): """Make a call that returns multiple times.""" - # TODO(pekowski): Remove all these comments in Havana. - # For amqp_rpc_single_reply_queue = False, - # Can't use 'with' for multicall, as it returns an iterator - # that will continue to use the connection. When it's done, - # connection.close() will get called which will put it back into - # the pool - # For amqp_rpc_single_reply_queue = True, - # The 'with' statement is mandatory for closing the connection LOG.debug(_('Making synchronous call on %s ...'), topic) msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) @@ -594,21 +520,13 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): _add_unique_id(msg) pack_context(msg, context) - # TODO(pekowski): Remove this flag and the code under the if clause - # in Havana. - if not conf.amqp_rpc_single_reply_queue: - conn = ConnectionContext(conf, connection_pool) - wait_msg = MulticallWaiter(conf, conn, timeout) - conn.declare_direct_consumer(msg_id, wait_msg) + with _reply_proxy_create_sem: + if not connection_pool.reply_proxy: + connection_pool.reply_proxy = ReplyProxy(conf, connection_pool) + msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()}) + wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool) + with ConnectionContext(conf, connection_pool) as conn: conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) - else: - with _reply_proxy_create_sem: - if not connection_pool.reply_proxy: - connection_pool.reply_proxy = ReplyProxy(conf, connection_pool) - msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()}) - wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool) - with ConnectionContext(conf, connection_pool) as conn: - conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) return wait_msg diff --git a/tests/unit/rpc/amqp.py b/tests/unit/rpc/amqp.py index 432dd35..83713c7 100644 --- a/tests/unit/rpc/amqp.py +++ b/tests/unit/rpc/amqp.py @@ -97,16 +97,11 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase): # Make sure the msg envelope was applied self.assertTrue('oslo.version' in self.test_msg) - def test_single_reply_queue_on_has_ids( + def test_single_reply_queue_caller_on( self, single_reply_queue_for_callee_off=False): if not self.rpc: self.skipTest('rpc driver not available.') - # TODO(pekowski): Remove these lines in Havana where the option will be - # removed and the default will be true. - self.assertFalse(FLAGS.amqp_rpc_single_reply_queue) - self.config(amqp_rpc_single_reply_queue=True) - self.orig_unpack_context = rpc_amqp.unpack_context def my_unpack_context(conf, msg): @@ -127,10 +122,7 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase): def _process_data(myself, message_data): #with open('mylog', 'a') as f: # f.write('my_process_data: ' + str(message_data) + '\n') - if single_reply_queue_for_callee_off: - self.assertTrue('_msg_id' not in message_data) - else: - self.assertTrue('_msg_id' in message_data) + self.assertTrue('_msg_id' in message_data) self.ReplyProxy_was_called = True super(MyReplyProxy, myself)._process_data(message_data) @@ -158,70 +150,10 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase): self.stubs.UnsetAll() self.conn.pool.reply_proxy = self.orig_reply_proxy - # TODO(pekowski): Remove this line in Havana - self.config(amqp_rpc_single_reply_queue=False) - - # TODO(pekowski): Unfortunately remove this test in Havana. - # The amqp_rpc_single_reply_queue option will go away in Havana. - # There will be no way to send a downlevel RPC in Havana, yet - # Havana will be able to receive downlevel RPCs. We would - # need a downlevel caller to test it. - def test_single_reply_queue_off_no_ids( - self, single_reply_queue_for_callee_on=False): - if not self.rpc: - self.skipTest('rpc driver not available.') - - self.assertFalse(FLAGS.amqp_rpc_single_reply_queue) - - def my_unpack_context(conf, msg): - self.assertTrue('_reply_q' not in msg) - if single_reply_queue_for_callee_on: - self.config(amqp_rpc_single_reply_queue=True) - return self.orig_unpack_context(conf, msg) - - self.orig_unpack_context = rpc_amqp.unpack_context - self.stubs.Set(rpc_amqp, 'unpack_context', my_unpack_context) - - self.MulticallWaiter_call_was_called = False - - def my_MulticallWaiter_call(myself, data): - #with open('mylog', 'a') as f: - # f.write('my_MulticallWaiter_call: ' + str(data) + '\n') - self.assertTrue('_reply_q' not in data) - self.MulticallWaiter_call_was_called = True - return self.orig_MulticallWaiter_call(myself, data) - - self.orig_MulticallWaiter_call = rpc_amqp.MulticallWaiter.__call__ - self.stubs.Set(rpc_amqp.MulticallWaiter, '__call__', - my_MulticallWaiter_call) - - value = 42 - result = self.rpc.call(FLAGS, self.context, self.topic, - {"method": "echo", "args": {"value": value}}) - self.assertEqual(value, result) - self.assertTrue(self.MulticallWaiter_call_was_called) - - self.config(amqp_rpc_single_reply_queue=False) - self.stubs.UnsetAll() - - # TODO(pekowski): Remove this test in Havana. - def test_single_reply_queue_caller_off_callee_on(self): - self.test_single_reply_queue_off_no_ids( - single_reply_queue_for_callee_on=True) - def test_single_reply_queue_caller_on_callee_off(self): - self.test_single_reply_queue_on_has_ids( + self.test_single_reply_queue_caller_on( single_reply_queue_for_callee_off=True) - #TODO(pekowski): remove this test in Havana - def test_single_reply_queue_mt_resp_rting(self): - if not self.rpc: - self.skipTest('rpc driver not available.') - - self.config(amqp_rpc_single_reply_queue=True) - self.test_multithreaded_resp_routing() - self.config(amqp_rpc_single_reply_queue=False) - def test_duplicate_message_check(self): """Test sending *not-dict* to a topic exchange/queue.""" diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py index 42a6e6b..0bad387 100644 --- a/tests/unit/rpc/test_qpid.py +++ b/tests/unit/rpc/test_qpid.py @@ -26,6 +26,7 @@ eventlet.monkey_patch() import fixtures import mox from oslo.config import cfg +import uuid from openstack.common import context from openstack.common import jsonutils @@ -89,6 +90,13 @@ class RpcQpidTestCase(utils.BaseTestCase): fixtures.MonkeyPatch('qpid.messaging.Receiver', lambda *_x, **_y: self.mock_receiver)) + self.uuid4 = uuid.uuid4() + self.useFixture( + fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4)) + + def mock_uuid4(self): + return self.uuid4 + def cleanUp(self): if impl_qpid: # Need to reset this in case we changed the connection_cls @@ -266,6 +274,7 @@ class RpcQpidTestCase(utils.BaseTestCase): # connection. self.mock_session.close() self.mock_connection.session().AndReturn(self.mock_session) + self.mock_connection.close() self.mox.ReplayAll() @@ -289,10 +298,8 @@ class RpcQpidTestCase(utils.BaseTestCase): method(*args) finally: - while impl_qpid.Connection.pool.free_items: - # Pull the mock connection object out of the connection pool so - # that it doesn't mess up other test cases. - impl_qpid.Connection.pool.get() + impl_qpid.cleanup() + self.uuid4 = uuid.uuid4() def test_cast(self): self._test_cast(fanout=False) @@ -331,7 +338,7 @@ class RpcQpidTestCase(utils.BaseTestCase): self._setup_to_server_tests(server_params) self._test_cast(fanout=True, server_params=server_params) - def _test_call(self, multi): + def _test_call_mock_common(self): self.mock_connection = self.mox.CreateMock(self.orig_connection) self.mock_session = self.mox.CreateMock(self.orig_session) self.mock_sender = self.mox.CreateMock(self.orig_sender) @@ -348,38 +355,53 @@ class RpcQpidTestCase(utils.BaseTestCase): 'false}, "durable": true, "name": ".*"}}') self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver) self.mock_receiver.capacity = 1 + self.mock_connection.opened().AndReturn(False) + self.mock_connection.open() + self.mock_connection.session().AndReturn(self.mock_session) send_addr = ( 'openstack/impl_qpid_test ; {"node": {"x-declare": ' '{"auto-delete": true, "durable": false}, "type": "topic"}, ' '"create": "always"}') self.mock_session.sender(send_addr).AndReturn(self.mock_sender) self.mock_sender.send(mox.IgnoreArg()) + self.mock_session.close() + self.mock_connection.session().AndReturn(self.mock_session) + + def _test_call(self, multi): + self._test_call_mock_common() - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( - {"result": "foo", "failure": False, "ending": False})) + {"_msg_id": self.uuid4.hex, "result": "foo", "failure": False, + "ending": False})) self.mock_session.acknowledge(mox.IgnoreArg()) if multi: - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn( - qpid.messaging.Message({"result": "bar", "failure": False, + qpid.messaging.Message({"_msg_id": self.uuid4.hex, + "result": "bar", + "failure": False, "ending": False})) self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn( - qpid.messaging.Message({"result": "baz", "failure": False, + qpid.messaging.Message({"_msg_id": self.uuid4.hex, + "result": "baz", + "failure": False, "ending": False})) self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( - {"failure": False, "ending": True})) + {"_msg_id": self.uuid4.hex, "failure": False, "ending": True})) self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.close() - self.mock_connection.session().AndReturn(self.mock_session) + # Normally the iterconsume() runs indefinitely, but we have to stop it + # here otherwise, the test won't end + self.mock_session.next_receiver(timeout=None).AndRaise(StopIteration) + self.mock_connection.close() self.mox.ReplayAll() @@ -399,59 +421,31 @@ class RpcQpidTestCase(utils.BaseTestCase): else: self.assertEquals(res, "foo") finally: - while impl_qpid.Connection.pool.free_items: - # Pull the mock connection object out of the connection pool so - # that it doesn't mess up other test cases. - impl_qpid.Connection.pool.get() + impl_qpid.cleanup() + self.uuid4 = uuid.uuid4() def test_call(self): self._test_call(multi=False) def _test_call_with_timeout(self, timeout, expect_failure): - # TODO(beagles): should be possible to refactor this method and - # _test_call to share common code. Maybe making the messages - # and test checks parameters, etc. - self.mock_connection = self.mox.CreateMock(self.orig_connection) - self.mock_session = self.mox.CreateMock(self.orig_session) - self.mock_sender = self.mox.CreateMock(self.orig_sender) - self.mock_receiver = self.mox.CreateMock(self.orig_receiver) + self._test_call_mock_common() - self.mock_connection.opened().AndReturn(False) - self.mock_connection.open() - self.mock_connection.session().AndReturn(self.mock_session) - rcv_addr = mox.Regex( - r'^.*/.* ; {"node": {"x-declare": {"auto-delete":' - ' true, "durable": true, "type": "direct"}, "type": ' - '"topic"}, "create": "always", "link": {"x-declare": ' - '{"auto-delete": true, "exclusive": true, "durable": ' - 'false}, "durable": true, "name": ".*"}}') - self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver) - self.mock_receiver.capacity = 1 - send_addr = ( - 'openstack/impl_qpid_test ; {"node": {"x-declare": ' - '{"auto-delete": true, "durable": false}, "type": "topic"}, ' - '"create": "always"}') - self.mock_session.sender(send_addr).AndReturn(self.mock_sender) - self.mock_sender.send(mox.IgnoreArg()) - - if expect_failure: - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndRaise( - qpid.messaging.exceptions.Empty()) - self.mock_session.close() - self.mock_connection.session().AndReturn(self.mock_session) - else: - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + if not expect_failure: + self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( - {"result": "foo", "failure": False, "ending": False})) + {"_msg_id": self.uuid4.hex, "result": "foo", "failure": False, + "ending": False})) self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( - {"failure": False, "ending": True})) + {"_msg_id": self.uuid4.hex, "failure": False, "ending": True})) self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.close() - self.mock_connection.session().AndReturn(self.mock_session) + # Normally the iterconsume() runs indefinitely, but we have to stop it + # here otherwise, the test won't end + self.mock_session.next_receiver(timeout=None).AndRaise(StopIteration) + self.mock_connection.close() self.mox.ReplayAll() @@ -472,10 +466,8 @@ class RpcQpidTestCase(utils.BaseTestCase): {"method": "test_method", "args": {}}, timeout) self.assertEquals(res, "foo") finally: - while impl_qpid.Connection.pool.free_items: - # Pull the mock connection object out of the connection pool so - # that it doesn't mess up other test cases. - impl_qpid.Connection.pool.get() + impl_qpid.cleanup() + self.uuid4 = uuid.uuid4() def test_call_with_timeout(self): """A little more indepth for a timeout test. @@ -488,7 +480,7 @@ class RpcQpidTestCase(utils.BaseTestCase): sufficient. """ self._test_call_with_timeout(timeout=5, expect_failure=False) - self._test_call_with_timeout(timeout=0, expect_failure=True) + self._test_call_with_timeout(timeout=0.1, expect_failure=True) def test_multicall(self): self._test_call(multi=True) |