diff options
author | Jenkins <jenkins@review.openstack.org> | 2013-06-11 21:42:59 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-06-11 21:42:59 +0000 |
commit | da554770974706ffa6f69f12d9051062cd0d0b80 (patch) | |
tree | d35b67c605fb6d63a343cc796116540982281f69 /tests | |
parent | b13651d9142c7e928568a99b94bfad5ca2c99d31 (diff) | |
parent | 6d42ced0d6d73783e901d6c4f840d0f2465e31bf (diff) | |
download | oslo-da554770974706ffa6f69f12d9051062cd0d0b80.tar.gz oslo-da554770974706ffa6f69f12d9051062cd0d0b80.tar.xz oslo-da554770974706ffa6f69f12d9051062cd0d0b80.zip |
Merge "Remove the amqp_rpc_single_reply_queue option from Havana"
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unit/rpc/amqp.py | 74 | ||||
-rw-r--r-- | tests/unit/rpc/test_qpid.py | 114 |
2 files changed, 56 insertions, 132 deletions
diff --git a/tests/unit/rpc/amqp.py b/tests/unit/rpc/amqp.py index 432dd35..83713c7 100644 --- a/tests/unit/rpc/amqp.py +++ b/tests/unit/rpc/amqp.py @@ -97,16 +97,11 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase): # Make sure the msg envelope was applied self.assertTrue('oslo.version' in self.test_msg) - def test_single_reply_queue_on_has_ids( + def test_single_reply_queue_caller_on( self, single_reply_queue_for_callee_off=False): if not self.rpc: self.skipTest('rpc driver not available.') - # TODO(pekowski): Remove these lines in Havana where the option will be - # removed and the default will be true. - self.assertFalse(FLAGS.amqp_rpc_single_reply_queue) - self.config(amqp_rpc_single_reply_queue=True) - self.orig_unpack_context = rpc_amqp.unpack_context def my_unpack_context(conf, msg): @@ -127,10 +122,7 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase): def _process_data(myself, message_data): #with open('mylog', 'a') as f: # f.write('my_process_data: ' + str(message_data) + '\n') - if single_reply_queue_for_callee_off: - self.assertTrue('_msg_id' not in message_data) - else: - self.assertTrue('_msg_id' in message_data) + self.assertTrue('_msg_id' in message_data) self.ReplyProxy_was_called = True super(MyReplyProxy, myself)._process_data(message_data) @@ -158,70 +150,10 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase): self.stubs.UnsetAll() self.conn.pool.reply_proxy = self.orig_reply_proxy - # TODO(pekowski): Remove this line in Havana - self.config(amqp_rpc_single_reply_queue=False) - - # TODO(pekowski): Unfortunately remove this test in Havana. - # The amqp_rpc_single_reply_queue option will go away in Havana. - # There will be no way to send a downlevel RPC in Havana, yet - # Havana will be able to receive downlevel RPCs. We would - # need a downlevel caller to test it. - def test_single_reply_queue_off_no_ids( - self, single_reply_queue_for_callee_on=False): - if not self.rpc: - self.skipTest('rpc driver not available.') - - self.assertFalse(FLAGS.amqp_rpc_single_reply_queue) - - def my_unpack_context(conf, msg): - self.assertTrue('_reply_q' not in msg) - if single_reply_queue_for_callee_on: - self.config(amqp_rpc_single_reply_queue=True) - return self.orig_unpack_context(conf, msg) - - self.orig_unpack_context = rpc_amqp.unpack_context - self.stubs.Set(rpc_amqp, 'unpack_context', my_unpack_context) - - self.MulticallWaiter_call_was_called = False - - def my_MulticallWaiter_call(myself, data): - #with open('mylog', 'a') as f: - # f.write('my_MulticallWaiter_call: ' + str(data) + '\n') - self.assertTrue('_reply_q' not in data) - self.MulticallWaiter_call_was_called = True - return self.orig_MulticallWaiter_call(myself, data) - - self.orig_MulticallWaiter_call = rpc_amqp.MulticallWaiter.__call__ - self.stubs.Set(rpc_amqp.MulticallWaiter, '__call__', - my_MulticallWaiter_call) - - value = 42 - result = self.rpc.call(FLAGS, self.context, self.topic, - {"method": "echo", "args": {"value": value}}) - self.assertEqual(value, result) - self.assertTrue(self.MulticallWaiter_call_was_called) - - self.config(amqp_rpc_single_reply_queue=False) - self.stubs.UnsetAll() - - # TODO(pekowski): Remove this test in Havana. - def test_single_reply_queue_caller_off_callee_on(self): - self.test_single_reply_queue_off_no_ids( - single_reply_queue_for_callee_on=True) - def test_single_reply_queue_caller_on_callee_off(self): - self.test_single_reply_queue_on_has_ids( + self.test_single_reply_queue_caller_on( single_reply_queue_for_callee_off=True) - #TODO(pekowski): remove this test in Havana - def test_single_reply_queue_mt_resp_rting(self): - if not self.rpc: - self.skipTest('rpc driver not available.') - - self.config(amqp_rpc_single_reply_queue=True) - self.test_multithreaded_resp_routing() - self.config(amqp_rpc_single_reply_queue=False) - def test_duplicate_message_check(self): """Test sending *not-dict* to a topic exchange/queue.""" diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py index 42a6e6b..0bad387 100644 --- a/tests/unit/rpc/test_qpid.py +++ b/tests/unit/rpc/test_qpid.py @@ -26,6 +26,7 @@ eventlet.monkey_patch() import fixtures import mox from oslo.config import cfg +import uuid from openstack.common import context from openstack.common import jsonutils @@ -89,6 +90,13 @@ class RpcQpidTestCase(utils.BaseTestCase): fixtures.MonkeyPatch('qpid.messaging.Receiver', lambda *_x, **_y: self.mock_receiver)) + self.uuid4 = uuid.uuid4() + self.useFixture( + fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4)) + + def mock_uuid4(self): + return self.uuid4 + def cleanUp(self): if impl_qpid: # Need to reset this in case we changed the connection_cls @@ -266,6 +274,7 @@ class RpcQpidTestCase(utils.BaseTestCase): # connection. self.mock_session.close() self.mock_connection.session().AndReturn(self.mock_session) + self.mock_connection.close() self.mox.ReplayAll() @@ -289,10 +298,8 @@ class RpcQpidTestCase(utils.BaseTestCase): method(*args) finally: - while impl_qpid.Connection.pool.free_items: - # Pull the mock connection object out of the connection pool so - # that it doesn't mess up other test cases. - impl_qpid.Connection.pool.get() + impl_qpid.cleanup() + self.uuid4 = uuid.uuid4() def test_cast(self): self._test_cast(fanout=False) @@ -331,7 +338,7 @@ class RpcQpidTestCase(utils.BaseTestCase): self._setup_to_server_tests(server_params) self._test_cast(fanout=True, server_params=server_params) - def _test_call(self, multi): + def _test_call_mock_common(self): self.mock_connection = self.mox.CreateMock(self.orig_connection) self.mock_session = self.mox.CreateMock(self.orig_session) self.mock_sender = self.mox.CreateMock(self.orig_sender) @@ -348,38 +355,53 @@ class RpcQpidTestCase(utils.BaseTestCase): 'false}, "durable": true, "name": ".*"}}') self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver) self.mock_receiver.capacity = 1 + self.mock_connection.opened().AndReturn(False) + self.mock_connection.open() + self.mock_connection.session().AndReturn(self.mock_session) send_addr = ( 'openstack/impl_qpid_test ; {"node": {"x-declare": ' '{"auto-delete": true, "durable": false}, "type": "topic"}, ' '"create": "always"}') self.mock_session.sender(send_addr).AndReturn(self.mock_sender) self.mock_sender.send(mox.IgnoreArg()) + self.mock_session.close() + self.mock_connection.session().AndReturn(self.mock_session) + + def _test_call(self, multi): + self._test_call_mock_common() - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( - {"result": "foo", "failure": False, "ending": False})) + {"_msg_id": self.uuid4.hex, "result": "foo", "failure": False, + "ending": False})) self.mock_session.acknowledge(mox.IgnoreArg()) if multi: - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn( - qpid.messaging.Message({"result": "bar", "failure": False, + qpid.messaging.Message({"_msg_id": self.uuid4.hex, + "result": "bar", + "failure": False, "ending": False})) self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn( - qpid.messaging.Message({"result": "baz", "failure": False, + qpid.messaging.Message({"_msg_id": self.uuid4.hex, + "result": "baz", + "failure": False, "ending": False})) self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( - {"failure": False, "ending": True})) + {"_msg_id": self.uuid4.hex, "failure": False, "ending": True})) self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.close() - self.mock_connection.session().AndReturn(self.mock_session) + # Normally the iterconsume() runs indefinitely, but we have to stop it + # here otherwise, the test won't end + self.mock_session.next_receiver(timeout=None).AndRaise(StopIteration) + self.mock_connection.close() self.mox.ReplayAll() @@ -399,59 +421,31 @@ class RpcQpidTestCase(utils.BaseTestCase): else: self.assertEquals(res, "foo") finally: - while impl_qpid.Connection.pool.free_items: - # Pull the mock connection object out of the connection pool so - # that it doesn't mess up other test cases. - impl_qpid.Connection.pool.get() + impl_qpid.cleanup() + self.uuid4 = uuid.uuid4() def test_call(self): self._test_call(multi=False) def _test_call_with_timeout(self, timeout, expect_failure): - # TODO(beagles): should be possible to refactor this method and - # _test_call to share common code. Maybe making the messages - # and test checks parameters, etc. - self.mock_connection = self.mox.CreateMock(self.orig_connection) - self.mock_session = self.mox.CreateMock(self.orig_session) - self.mock_sender = self.mox.CreateMock(self.orig_sender) - self.mock_receiver = self.mox.CreateMock(self.orig_receiver) + self._test_call_mock_common() - self.mock_connection.opened().AndReturn(False) - self.mock_connection.open() - self.mock_connection.session().AndReturn(self.mock_session) - rcv_addr = mox.Regex( - r'^.*/.* ; {"node": {"x-declare": {"auto-delete":' - ' true, "durable": true, "type": "direct"}, "type": ' - '"topic"}, "create": "always", "link": {"x-declare": ' - '{"auto-delete": true, "exclusive": true, "durable": ' - 'false}, "durable": true, "name": ".*"}}') - self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver) - self.mock_receiver.capacity = 1 - send_addr = ( - 'openstack/impl_qpid_test ; {"node": {"x-declare": ' - '{"auto-delete": true, "durable": false}, "type": "topic"}, ' - '"create": "always"}') - self.mock_session.sender(send_addr).AndReturn(self.mock_sender) - self.mock_sender.send(mox.IgnoreArg()) - - if expect_failure: - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndRaise( - qpid.messaging.exceptions.Empty()) - self.mock_session.close() - self.mock_connection.session().AndReturn(self.mock_session) - else: - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + if not expect_failure: + self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( - {"result": "foo", "failure": False, "ending": False})) + {"_msg_id": self.uuid4.hex, "result": "foo", "failure": False, + "ending": False})) self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_session.next_receiver(timeout=None).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( - {"failure": False, "ending": True})) + {"_msg_id": self.uuid4.hex, "failure": False, "ending": True})) self.mock_session.acknowledge(mox.IgnoreArg()) - self.mock_session.close() - self.mock_connection.session().AndReturn(self.mock_session) + # Normally the iterconsume() runs indefinitely, but we have to stop it + # here otherwise, the test won't end + self.mock_session.next_receiver(timeout=None).AndRaise(StopIteration) + self.mock_connection.close() self.mox.ReplayAll() @@ -472,10 +466,8 @@ class RpcQpidTestCase(utils.BaseTestCase): {"method": "test_method", "args": {}}, timeout) self.assertEquals(res, "foo") finally: - while impl_qpid.Connection.pool.free_items: - # Pull the mock connection object out of the connection pool so - # that it doesn't mess up other test cases. - impl_qpid.Connection.pool.get() + impl_qpid.cleanup() + self.uuid4 = uuid.uuid4() def test_call_with_timeout(self): """A little more indepth for a timeout test. @@ -488,7 +480,7 @@ class RpcQpidTestCase(utils.BaseTestCase): sufficient. """ self._test_call_with_timeout(timeout=5, expect_failure=False) - self._test_call_with_timeout(timeout=0, expect_failure=True) + self._test_call_with_timeout(timeout=0.1, expect_failure=True) def test_multicall(self): self._test_call(multi=True) |