diff options
author | Brent Eagles <beagles@redhat.com> | 2013-01-28 13:34:55 -0330 |
---|---|---|
committer | Brent Eagles <beagles@redhat.com> | 2013-01-28 17:45:08 -0330 |
commit | 30a50c8a6c534f01d518eb3ce4cf0d35877d9a7f (patch) | |
tree | adb7220ed3ecacf1fc5cab2e94c6643f2ea3e185 /tests | |
parent | b5288ab022d3733abae25971ca8afb56f1a7011b (diff) | |
download | oslo-30a50c8a6c534f01d518eb3ce4cf0d35877d9a7f.tar.gz oslo-30a50c8a6c534f01d518eb3ce4cf0d35877d9a7f.tar.xz oslo-30a50c8a6c534f01d518eb3ce4cf0d35877d9a7f.zip |
Add TTL to AMQP messages
This patch sets a TTL derived from the timeout parameter used
when invoking the RPC.
Needed for:
* Bug 1053614
Change-Id: Idaf9a2a972feac3f95f170c52632033b027b4414
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unit/rpc/test_kombu.py | 19 | ||||
-rw-r--r-- | tests/unit/rpc/test_qpid.py | 109 |
2 files changed, 123 insertions, 5 deletions
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py index b28c568..a201940 100644 --- a/tests/unit/rpc/test_kombu.py +++ b/tests/unit/rpc/test_kombu.py @@ -24,6 +24,7 @@ eventlet.monkey_patch() import contextlib import logging +import time from openstack.common import cfg from openstack.common import exception @@ -109,6 +110,24 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.assertEqual(self.received_message, message) + def test_message_ttl_on_timeout(self): + """Test message ttl being set by request timeout. The message + should die on the vine and never arrive.""" + conn = self.rpc.create_connection(FLAGS) + message = 'topic test message' + + self.received_message = None + + def _callback(message): + self.received_message = message + self.fail("should not have received this message") + + conn.declare_topic_consumer('a_topic', _callback) + conn.topic_send('a_topic', rpc_common.serialize_msg(message), 0) + conn.iterconsume(1, 2) + + conn.close() + def test_topic_send_receive_exchange_name(self): """Test sending to a topic exchange/queue with an exchange name""" diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py index 27445ee..1df94d5 100644 --- a/tests/unit/rpc/test_qpid.py +++ b/tests/unit/rpc/test_qpid.py @@ -26,22 +26,24 @@ eventlet.monkey_patch() import logging import fixtures +import time +import testtools +import mox from openstack.common import cfg from openstack.common import context from openstack.common.rpc import amqp as rpc_amqp from tests import utils +from openstack.common.rpc import common as rpc_common try: - from openstack.common.rpc import impl_qpid import qpid + from openstack.common.rpc import impl_qpid except ImportError: qpid = None impl_qpid = None - FLAGS = cfg.CONF -LOG = logging.getLogger(__name__) class RpcQpidTestCase(utils.BaseTestCase): @@ -71,6 +73,12 @@ class RpcQpidTestCase(utils.BaseTestCase): self.mock_session = None self.mock_sender = None self.mock_receiver = None + self.mox = mox.Mox() + + self.orig_connection = qpid.messaging.Connection + self.orig_session = qpid.messaging.Session + self.orig_sender = qpid.messaging.Sender + self.orig_receiver = qpid.messaging.Receiver self.useFixture( fixtures.MonkeyPatch('qpid.messaging.Connection', @@ -82,9 +90,14 @@ class RpcQpidTestCase(utils.BaseTestCase): fixtures.MonkeyPatch('qpid.messaging.Sender', lambda *_x, **_y: self.mock_sender)) self.useFixture( - fixtures.MonekyPatch('qpid.messaging.Receiver', + fixtures.MonkeyPatch('qpid.messaging.Receiver', lambda *_x, **_y: self.mock_receiver)) - self.addCleanup(self._close_qpid) + + def cleanUp(self): + if impl_qpid: + # Need to reset this in case we changed the connection_cls + # in self._setup_to_server_tests() + impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection def test_create_connection(self): self.mock_connection = self.mox.CreateMock(self.orig_connection) @@ -368,6 +381,92 @@ class RpcQpidTestCase(utils.BaseTestCase): def test_call(self): self._test_call(multi=False) + def _test_call_with_timeout(self, timeout, expect_failure): + # TODO(beagles): should be possible to refactor this method and + # _test_call to share common code. Maybe making the messages + # and test checks parameters, etc. + self.mock_connection = self.mox.CreateMock(self.orig_connection) + self.mock_session = self.mox.CreateMock(self.orig_session) + self.mock_sender = self.mox.CreateMock(self.orig_sender) + self.mock_receiver = self.mox.CreateMock(self.orig_receiver) + + self.mock_connection.opened().AndReturn(False) + self.mock_connection.open() + self.mock_connection.session().AndReturn(self.mock_session) + rcv_addr = mox.Regex( + r'^.*/.* ; {"node": {"x-declare": {"auto-delete":' + ' true, "durable": true, "type": "direct"}, "type": ' + '"topic"}, "create": "always", "link": {"x-declare": ' + '{"auto-delete": true, "exclusive": true, "durable": ' + 'false}, "durable": true, "name": ".*"}}') + self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver) + self.mock_receiver.capacity = 1 + send_addr = ( + 'openstack/impl_qpid_test ; {"node": {"x-declare": ' + '{"auto-delete": true, "durable": false}, "type": "topic"}, ' + '"create": "always"}') + self.mock_session.sender(send_addr).AndReturn(self.mock_sender) + self.mock_sender.send(mox.IgnoreArg()) + + if expect_failure: + self.mock_session.next_receiver(timeout=mox.IsA(int)).AndRaise( + qpid.messaging.exceptions.Empty()) + self.mock_receiver.fetch() + self.mock_session.close() + self.mock_connection.session().AndReturn(self.mock_session) + else: + self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_receiver) + self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( + {"result": "foo", "failure": False, "ending": False})) + self.mock_session.acknowledge(mox.IgnoreArg()) + self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_receiver) + self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( + {"failure": False, "ending": True})) + self.mock_session.acknowledge(mox.IgnoreArg()) + self.mock_session.close() + self.mock_connection.session().AndReturn(self.mock_session) + + self.mox.ReplayAll() + + try: + ctx = context.RequestContext("user", "project") + method = impl_qpid.call + if expect_failure: + try: + res = method(FLAGS, ctx, "impl_qpid_test", + {"method": "test_method", "args": {}}, + timeout) + self.fail('Expected a timeout exception') + except rpc_common.Timeout: + # Good, this is expected! + pass + else: + res = method(FLAGS, ctx, "impl_qpid_test", + {"method": "test_method", "args": {}}, timeout) + self.assertEquals(res, "foo") + finally: + while impl_qpid.Connection.pool.free_items: + # Pull the mock connection object out of the connection pool so + # that it doesn't mess up other test cases. + impl_qpid.Connection.pool.get() + + def test_call(self): + self._test_call(multi=False) + + def test_call_with_timeout(self): + """A little more indepth for a timeout test. Specifically we are + looking to simulate the event sent to qpid dying on the vine due + to a TTL. A string test that actually involved qpid would be + excellent, but this at least verifies that the exceptions flow + like they should. TODO(beagles): is this really necessary or is + the the case for qpid at least the basic timeout test is + sufficient. + """ + self._test_call_with_timeout(timeout=5, expect_failure=False) + self._test_call_with_timeout(timeout=0, expect_failure=True) + def test_multicall(self): self._test_call(multi=True) |