summaryrefslogtreecommitdiffstats
path: root/tests/unit/rpc/test_qpid.py
diff options
context:
space:
mode:
authorRaymond Pekowski <pekowski@gmail.com>2013-06-05 11:40:23 -0500
committerRaymond Pekowski <pekowski@gmail.com>2013-06-05 11:41:09 -0500
commit6d42ced0d6d73783e901d6c4f840d0f2465e31bf (patch)
treee3435060ae75941944c375d6c8272a209dab3cbc /tests/unit/rpc/test_qpid.py
parent3f315db8dab8a859e7204e7ac184ea9796273a74 (diff)
downloadoslo-6d42ced0d6d73783e901d6c4f840d0f2465e31bf.tar.gz
oslo-6d42ced0d6d73783e901d6c4f840d0f2465e31bf.tar.xz
oslo-6d42ced0d6d73783e901d6c4f840d0f2465e31bf.zip
Remove the amqp_rpc_single_reply_queue option from Havana
blueprint amqp-rpc-fast-reply-queue As part of the Grizzly code review for the above blueprint (Idb09a714), the Grizzly code base is "aware" of this feature in terms of receiving RPCs and provided with an option for enabling it on the RPC send side. It was decided that in Havana the option would be removed, RPCs would only be sent using the new functionality yet awareness of prior functionality be retained for the receiving of RPCs. This code change implements the Havana piece. Change-Id: I5f2a8e432cf84a637c1aa813656c1cbc8db6e7eb
Diffstat (limited to 'tests/unit/rpc/test_qpid.py')
-rw-r--r--tests/unit/rpc/test_qpid.py114
1 files changed, 53 insertions, 61 deletions
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index 02e8e20..40f0e37 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -26,6 +26,7 @@ eventlet.monkey_patch()
import fixtures
import mox
from oslo.config import cfg
+import uuid
from openstack.common import context
from openstack.common import jsonutils
@@ -90,6 +91,13 @@ class RpcQpidTestCase(utils.BaseTestCase):
fixtures.MonkeyPatch('qpid.messaging.Receiver',
lambda *_x, **_y: self.mock_receiver))
+ self.uuid4 = uuid.uuid4()
+ self.useFixture(
+ fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4))
+
+ def mock_uuid4(self):
+ return self.uuid4
+
def cleanUp(self):
if impl_qpid:
# Need to reset this in case we changed the connection_cls
@@ -267,6 +275,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
# connection.
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)
+ self.mock_connection.close()
self.mox.ReplayAll()
@@ -290,10 +299,8 @@ class RpcQpidTestCase(utils.BaseTestCase):
method(*args)
finally:
- while impl_qpid.Connection.pool.free_items:
- # Pull the mock connection object out of the connection pool so
- # that it doesn't mess up other test cases.
- impl_qpid.Connection.pool.get()
+ impl_qpid.cleanup()
+ self.uuid4 = uuid.uuid4()
def test_cast(self):
self._test_cast(fanout=False)
@@ -332,7 +339,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
self._setup_to_server_tests(server_params)
self._test_cast(fanout=True, server_params=server_params)
- def _test_call(self, multi):
+ def _test_call_mock_common(self):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_sender = self.mox.CreateMock(self.orig_sender)
@@ -349,38 +356,53 @@ class RpcQpidTestCase(utils.BaseTestCase):
'false}, "durable": true, "name": ".*"}}')
self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
self.mock_receiver.capacity = 1
+ self.mock_connection.opened().AndReturn(False)
+ self.mock_connection.open()
+ self.mock_connection.session().AndReturn(self.mock_session)
send_addr = (
'openstack/impl_qpid_test ; {"node": {"x-declare": '
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
'"create": "always"}')
self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
self.mock_sender.send(mox.IgnoreArg())
+ self.mock_session.close()
+ self.mock_connection.session().AndReturn(self.mock_session)
+
+ def _test_call(self, multi):
+ self._test_call_mock_common()
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"result": "foo", "failure": False, "ending": False}))
+ {"_msg_id": self.uuid4.hex, "result": "foo", "failure": False,
+ "ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
if multi:
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
- qpid.messaging.Message({"result": "bar", "failure": False,
+ qpid.messaging.Message({"_msg_id": self.uuid4.hex,
+ "result": "bar",
+ "failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
- qpid.messaging.Message({"result": "baz", "failure": False,
+ qpid.messaging.Message({"_msg_id": self.uuid4.hex,
+ "result": "baz",
+ "failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"failure": False, "ending": True}))
+ {"_msg_id": self.uuid4.hex, "failure": False, "ending": True}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.close()
- self.mock_connection.session().AndReturn(self.mock_session)
+ # Normally the iterconsume() runs indefinitely, but we have to stop it
+ # here otherwise, the test won't end
+ self.mock_session.next_receiver(timeout=None).AndRaise(StopIteration)
+ self.mock_connection.close()
self.mox.ReplayAll()
@@ -400,59 +422,31 @@ class RpcQpidTestCase(utils.BaseTestCase):
else:
self.assertEquals(res, "foo")
finally:
- while impl_qpid.Connection.pool.free_items:
- # Pull the mock connection object out of the connection pool so
- # that it doesn't mess up other test cases.
- impl_qpid.Connection.pool.get()
+ impl_qpid.cleanup()
+ self.uuid4 = uuid.uuid4()
def test_call(self):
self._test_call(multi=False)
def _test_call_with_timeout(self, timeout, expect_failure):
- # TODO(beagles): should be possible to refactor this method and
- # _test_call to share common code. Maybe making the messages
- # and test checks parameters, etc.
- self.mock_connection = self.mox.CreateMock(self.orig_connection)
- self.mock_session = self.mox.CreateMock(self.orig_session)
- self.mock_sender = self.mox.CreateMock(self.orig_sender)
- self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
+ self._test_call_mock_common()
- self.mock_connection.opened().AndReturn(False)
- self.mock_connection.open()
- self.mock_connection.session().AndReturn(self.mock_session)
- rcv_addr = mox.Regex(
- r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
- ' true, "durable": true, "type": "direct"}, "type": '
- '"topic"}, "create": "always", "link": {"x-declare": '
- '{"auto-delete": true, "exclusive": true, "durable": '
- 'false}, "durable": true, "name": ".*"}}')
- self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
- self.mock_receiver.capacity = 1
- send_addr = (
- 'openstack/impl_qpid_test ; {"node": {"x-declare": '
- '{"auto-delete": true, "durable": false}, "type": "topic"}, '
- '"create": "always"}')
- self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
- self.mock_sender.send(mox.IgnoreArg())
-
- if expect_failure:
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndRaise(
- qpid.messaging.exceptions.Empty())
- self.mock_session.close()
- self.mock_connection.session().AndReturn(self.mock_session)
- else:
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ if not expect_failure:
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"result": "foo", "failure": False, "ending": False}))
+ {"_msg_id": self.uuid4.hex, "result": "foo", "failure": False,
+ "ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
- {"failure": False, "ending": True}))
+ {"_msg_id": self.uuid4.hex, "failure": False, "ending": True}))
self.mock_session.acknowledge(mox.IgnoreArg())
- self.mock_session.close()
- self.mock_connection.session().AndReturn(self.mock_session)
+ # Normally the iterconsume() runs indefinitely, but we have to stop it
+ # here otherwise, the test won't end
+ self.mock_session.next_receiver(timeout=None).AndRaise(StopIteration)
+ self.mock_connection.close()
self.mox.ReplayAll()
@@ -473,10 +467,8 @@ class RpcQpidTestCase(utils.BaseTestCase):
{"method": "test_method", "args": {}}, timeout)
self.assertEquals(res, "foo")
finally:
- while impl_qpid.Connection.pool.free_items:
- # Pull the mock connection object out of the connection pool so
- # that it doesn't mess up other test cases.
- impl_qpid.Connection.pool.get()
+ impl_qpid.cleanup()
+ self.uuid4 = uuid.uuid4()
def test_call_with_timeout(self):
"""A little more indepth for a timeout test. Specifically we are
@@ -488,7 +480,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
sufficient.
"""
self._test_call_with_timeout(timeout=5, expect_failure=False)
- self._test_call_with_timeout(timeout=0, expect_failure=True)
+ self._test_call_with_timeout(timeout=0.1, expect_failure=True)
def test_multicall(self):
self._test_call(multi=True)