summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorBrent Eagles <beagles@redhat.com>2013-01-28 13:34:55 -0330
committerBrent Eagles <beagles@redhat.com>2013-01-28 17:45:08 -0330
commit30a50c8a6c534f01d518eb3ce4cf0d35877d9a7f (patch)
treeadb7220ed3ecacf1fc5cab2e94c6643f2ea3e185 /tests
parentb5288ab022d3733abae25971ca8afb56f1a7011b (diff)
downloadoslo-30a50c8a6c534f01d518eb3ce4cf0d35877d9a7f.tar.gz
oslo-30a50c8a6c534f01d518eb3ce4cf0d35877d9a7f.tar.xz
oslo-30a50c8a6c534f01d518eb3ce4cf0d35877d9a7f.zip
Add TTL to AMQP messages
This patch sets a TTL derived from the timeout parameter used when invoking the RPC. Needed for: * Bug 1053614 Change-Id: Idaf9a2a972feac3f95f170c52632033b027b4414
Diffstat (limited to 'tests')
-rw-r--r--tests/unit/rpc/test_kombu.py19
-rw-r--r--tests/unit/rpc/test_qpid.py109
2 files changed, 123 insertions, 5 deletions
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index b28c568..a201940 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -24,6 +24,7 @@ eventlet.monkey_patch()
import contextlib
import logging
+import time
from openstack.common import cfg
from openstack.common import exception
@@ -109,6 +110,24 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
self.assertEqual(self.received_message, message)
+ def test_message_ttl_on_timeout(self):
+ """Test message ttl being set by request timeout. The message
+ should die on the vine and never arrive."""
+ conn = self.rpc.create_connection(FLAGS)
+ message = 'topic test message'
+
+ self.received_message = None
+
+ def _callback(message):
+ self.received_message = message
+ self.fail("should not have received this message")
+
+ conn.declare_topic_consumer('a_topic', _callback)
+ conn.topic_send('a_topic', rpc_common.serialize_msg(message), 0)
+ conn.iterconsume(1, 2)
+
+ conn.close()
+
def test_topic_send_receive_exchange_name(self):
"""Test sending to a topic exchange/queue with an exchange name"""
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index 27445ee..1df94d5 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -26,22 +26,24 @@ eventlet.monkey_patch()
import logging
import fixtures
+import time
+import testtools
+import mox
from openstack.common import cfg
from openstack.common import context
from openstack.common.rpc import amqp as rpc_amqp
from tests import utils
+from openstack.common.rpc import common as rpc_common
try:
- from openstack.common.rpc import impl_qpid
import qpid
+ from openstack.common.rpc import impl_qpid
except ImportError:
qpid = None
impl_qpid = None
-
FLAGS = cfg.CONF
-LOG = logging.getLogger(__name__)
class RpcQpidTestCase(utils.BaseTestCase):
@@ -71,6 +73,12 @@ class RpcQpidTestCase(utils.BaseTestCase):
self.mock_session = None
self.mock_sender = None
self.mock_receiver = None
+ self.mox = mox.Mox()
+
+ self.orig_connection = qpid.messaging.Connection
+ self.orig_session = qpid.messaging.Session
+ self.orig_sender = qpid.messaging.Sender
+ self.orig_receiver = qpid.messaging.Receiver
self.useFixture(
fixtures.MonkeyPatch('qpid.messaging.Connection',
@@ -82,9 +90,14 @@ class RpcQpidTestCase(utils.BaseTestCase):
fixtures.MonkeyPatch('qpid.messaging.Sender',
lambda *_x, **_y: self.mock_sender))
self.useFixture(
- fixtures.MonekyPatch('qpid.messaging.Receiver',
+ fixtures.MonkeyPatch('qpid.messaging.Receiver',
lambda *_x, **_y: self.mock_receiver))
- self.addCleanup(self._close_qpid)
+
+ def cleanUp(self):
+ if impl_qpid:
+ # Need to reset this in case we changed the connection_cls
+ # in self._setup_to_server_tests()
+ impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection
def test_create_connection(self):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
@@ -368,6 +381,92 @@ class RpcQpidTestCase(utils.BaseTestCase):
def test_call(self):
self._test_call(multi=False)
+ def _test_call_with_timeout(self, timeout, expect_failure):
+ # TODO(beagles): should be possible to refactor this method and
+ # _test_call to share common code. Maybe making the messages
+ # and test checks parameters, etc.
+ self.mock_connection = self.mox.CreateMock(self.orig_connection)
+ self.mock_session = self.mox.CreateMock(self.orig_session)
+ self.mock_sender = self.mox.CreateMock(self.orig_sender)
+ self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
+
+ self.mock_connection.opened().AndReturn(False)
+ self.mock_connection.open()
+ self.mock_connection.session().AndReturn(self.mock_session)
+ rcv_addr = mox.Regex(
+ r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
+ ' true, "durable": true, "type": "direct"}, "type": '
+ '"topic"}, "create": "always", "link": {"x-declare": '
+ '{"auto-delete": true, "exclusive": true, "durable": '
+ 'false}, "durable": true, "name": ".*"}}')
+ self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
+ self.mock_receiver.capacity = 1
+ send_addr = (
+ 'openstack/impl_qpid_test ; {"node": {"x-declare": '
+ '{"auto-delete": true, "durable": false}, "type": "topic"}, '
+ '"create": "always"}')
+ self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
+ self.mock_sender.send(mox.IgnoreArg())
+
+ if expect_failure:
+ self.mock_session.next_receiver(timeout=mox.IsA(int)).AndRaise(
+ qpid.messaging.exceptions.Empty())
+ self.mock_receiver.fetch()
+ self.mock_session.close()
+ self.mock_connection.session().AndReturn(self.mock_session)
+ else:
+ self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_receiver)
+ self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
+ {"result": "foo", "failure": False, "ending": False}))
+ self.mock_session.acknowledge(mox.IgnoreArg())
+ self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_receiver)
+ self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
+ {"failure": False, "ending": True}))
+ self.mock_session.acknowledge(mox.IgnoreArg())
+ self.mock_session.close()
+ self.mock_connection.session().AndReturn(self.mock_session)
+
+ self.mox.ReplayAll()
+
+ try:
+ ctx = context.RequestContext("user", "project")
+ method = impl_qpid.call
+ if expect_failure:
+ try:
+ res = method(FLAGS, ctx, "impl_qpid_test",
+ {"method": "test_method", "args": {}},
+ timeout)
+ self.fail('Expected a timeout exception')
+ except rpc_common.Timeout:
+ # Good, this is expected!
+ pass
+ else:
+ res = method(FLAGS, ctx, "impl_qpid_test",
+ {"method": "test_method", "args": {}}, timeout)
+ self.assertEquals(res, "foo")
+ finally:
+ while impl_qpid.Connection.pool.free_items:
+ # Pull the mock connection object out of the connection pool so
+ # that it doesn't mess up other test cases.
+ impl_qpid.Connection.pool.get()
+
+ def test_call(self):
+ self._test_call(multi=False)
+
+ def test_call_with_timeout(self):
+ """A little more indepth for a timeout test. Specifically we are
+ looking to simulate the event sent to qpid dying on the vine due
+ to a TTL. A string test that actually involved qpid would be
+ excellent, but this at least verifies that the exceptions flow
+ like they should. TODO(beagles): is this really necessary or is
+ the the case for qpid at least the basic timeout test is
+ sufficient.
+ """
+ self._test_call_with_timeout(timeout=5, expect_failure=False)
+ self._test_call_with_timeout(timeout=0, expect_failure=True)
+
def test_multicall(self):
self._test_call(multi=True)