summaryrefslogtreecommitdiffstats
path: root/tests/unit/rpc/test_qpid.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit/rpc/test_qpid.py')
-rw-r--r--tests/unit/rpc/test_qpid.py31
1 files changed, 27 insertions, 4 deletions
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index 0bad387..5d51a4b 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -26,6 +26,7 @@ eventlet.monkey_patch()
import fixtures
import mox
from oslo.config import cfg
+import time
import uuid
from openstack.common import context
@@ -218,7 +219,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
)
connection.close()
- def test_topic_consumer(self):
+ def test_topic_consumer(self, consume_thread_exc=False):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
@@ -235,6 +236,9 @@ class RpcQpidTestCase(utils.BaseTestCase):
self.mock_session.receiver(expected_address).AndReturn(
self.mock_receiver)
self.mock_receiver.capacity = 1
+ if consume_thread_exc:
+ self.mock_session.next_receiver(timeout=None).AndRaise(
+ Exception('unexpected exception'))
self.mock_connection.close()
self.mox.ReplayAll()
@@ -244,8 +248,14 @@ class RpcQpidTestCase(utils.BaseTestCase):
lambda *_x, **_y: None,
queue_name='impl.qpid.test.workers',
exchange_name='foobar')
+ if consume_thread_exc:
+ connection.consume_in_thread()
+ time.sleep(0)
connection.close()
+ def test_consume_thread_exception(self):
+ self.test_topic_consumer(consume_thread_exc=True)
+
def _test_cast(self, fanout, server_params=None):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
@@ -338,7 +348,11 @@ class RpcQpidTestCase(utils.BaseTestCase):
self._setup_to_server_tests(server_params)
self._test_cast(fanout=True, server_params=server_params)
+ def my_time_sleep(self, arg):
+ pass
+
def _test_call_mock_common(self):
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_sender = self.mox.CreateMock(self.orig_sender)
@@ -367,9 +381,12 @@ class RpcQpidTestCase(utils.BaseTestCase):
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)
- def _test_call(self, multi):
+ def _test_call(self, multi, reply_proxy_exc):
self._test_call_mock_common()
+ if reply_proxy_exc:
+ self.mock_session.next_receiver(timeout=None).AndRaise(
+ Exception('unexpected exception'))
self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
@@ -393,6 +410,9 @@ class RpcQpidTestCase(utils.BaseTestCase):
"failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
+ if reply_proxy_exc:
+ self.mock_session.next_receiver(timeout=None).AndRaise(
+ Exception('unexpected exception'))
self.mock_session.next_receiver(timeout=None).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
@@ -425,7 +445,10 @@ class RpcQpidTestCase(utils.BaseTestCase):
self.uuid4 = uuid.uuid4()
def test_call(self):
- self._test_call(multi=False)
+ self._test_call(multi=False, reply_proxy_exc=False)
+
+ def test_replyproxy_consume_thread_unexpected_exceptions(self):
+ self._test_call(multi=False, reply_proxy_exc=True)
def _test_call_with_timeout(self, timeout, expect_failure):
self._test_call_mock_common()
@@ -483,7 +506,7 @@ class RpcQpidTestCase(utils.BaseTestCase):
self._test_call_with_timeout(timeout=0.1, expect_failure=True)
def test_multicall(self):
- self._test_call(multi=True)
+ self._test_call(multi=True, reply_proxy_exc=False)
def _test_publisher(self, message=True):
"""Test that messages containing long strings are correctly serialized