summaryrefslogtreecommitdiffstats
path: root/tests/unit/rpc/test_kombu.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit/rpc/test_kombu.py')
-rw-r--r--tests/unit/rpc/test_kombu.py238
1 files changed, 221 insertions, 17 deletions
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index 159fefb..cbe948d 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -23,11 +23,15 @@ import eventlet
eventlet.monkey_patch()
import contextlib
+import functools
import logging
+import weakref
+import fixtures
import mock
from oslo.config import cfg
import six
+import time
from openstack.common import exception
from openstack.common.rpc import amqp as rpc_amqp
@@ -37,6 +41,8 @@ from tests import utils
try:
import kombu
+ import kombu.connection
+ import kombu.entity
from openstack.common.rpc import impl_kombu
except ImportError:
kombu = None
@@ -65,25 +71,50 @@ def _raise_exc_stub(stubs, times, obj, method, exc_msg,
return info
-class KombuStubs:
- @staticmethod
+class KombuStubs(fixtures.Fixture):
+ def __init__(self, test):
+ super(KombuStubs, self).__init__()
+
+ # NOTE(rpodolyaka): use a weak ref here to prevent ref cycles
+ self.test = weakref.ref(test)
+
def setUp(self):
+ super(KombuStubs, self).setUp()
+
+ test = self.test()
if kombu:
- self.config(fake_rabbit=True)
- self.config(rpc_response_timeout=5)
- self.rpc = impl_kombu
+ test.conf = FLAGS
+ test.config(fake_rabbit=True)
+ test.config(rpc_response_timeout=5)
+ test.rpc = impl_kombu
self.addCleanup(impl_kombu.cleanup)
else:
- self.rpc = None
+ test.rpc = None
+
+
+class FakeMessage(object):
+ acked = False
+ rejected = False
+
+ def __init__(self, payload):
+ self.payload = payload
+
+ def ack(self):
+ self.acked = True
+
+ def reject(self):
+ self.rejected = True
class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
def setUp(self):
- KombuStubs.setUp(self)
- super(RpcKombuTestCase, self).setUp()
if kombu is None:
self.skipTest("Test requires kombu")
+ self.useFixture(KombuStubs(self))
+
+ super(RpcKombuTestCase, self).setUp()
+
def test_reusing_connection(self):
"""Test that reusing a connection returns same one."""
conn_context = self.rpc.create_connection(FLAGS, new=False)
@@ -112,6 +143,74 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
self.assertEqual(self.received_message, message)
+ def test_callback_handler_ack_on_error(self):
+ """The default case will ack on error. Same as before.
+ """
+ def _callback(msg):
+ pass
+
+ conn = self.rpc.create_connection(FLAGS)
+ consumer = conn.declare_consumer(functools.partial(
+ impl_kombu.TopicConsumer,
+ name=None,
+ exchange_name=None),
+ "a_topic", _callback)
+ message = FakeMessage("some message")
+ consumer._callback_handler(message, _callback)
+ self.assertTrue(message.acked)
+ self.assertFalse(message.rejected)
+
+ def test_callback_handler_ack_on_error_exception(self):
+
+ def _callback(msg):
+ raise MyException()
+
+ conn = self.rpc.create_connection(FLAGS)
+ consumer = conn.declare_consumer(functools.partial(
+ impl_kombu.TopicConsumer,
+ name=None,
+ exchange_name=None,
+ ack_on_error=True),
+ "a_topic", _callback)
+ message = FakeMessage("some message")
+ consumer._callback_handler(message, _callback)
+ self.assertTrue(message.acked)
+ self.assertFalse(message.rejected)
+
+ def test_callback_handler_no_ack_on_error_exception(self):
+
+ def _callback(msg):
+ raise MyException()
+
+ conn = self.rpc.create_connection(FLAGS)
+ consumer = conn.declare_consumer(functools.partial(
+ impl_kombu.TopicConsumer,
+ name=None,
+ exchange_name=None,
+ ack_on_error=False),
+ "a_topic", _callback)
+ message = FakeMessage("some message")
+ consumer._callback_handler(message, _callback)
+ self.assertFalse(message.acked)
+ self.assertTrue(message.rejected)
+
+ def test_callback_handler_no_ack_on_error(self):
+
+ def _callback(msg):
+ pass
+
+ conn = self.rpc.create_connection(FLAGS)
+ consumer = conn.declare_consumer(functools.partial(
+ impl_kombu.TopicConsumer,
+ name=None,
+ exchange_name=None,
+ ack_on_error=False),
+ "a_topic", _callback)
+ message = FakeMessage("some message")
+ consumer._callback_handler(message, _callback)
+ self.assertTrue(message.acked)
+ self.assertFalse(message.rejected)
+
def test_message_ttl_on_timeout(self):
"""Test message ttl being set by request timeout. The message
should die on the vine and never arrive.
@@ -308,6 +407,22 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
impl_kombu.cast_to_server(FLAGS, ctxt, server_params,
'fake_topic', {'msg': 'fake'})
+ def test_fanout_success(self):
+ # Overriding the method in the base class because the test
+ # seems to fail for the same reason as
+ # test_fanout_send_receive().
+ self.skipTest("kombu memory transport seems buggy with "
+ "fanout queues as this test passes when "
+ "you use rabbit (fake_rabbit=False)")
+
+ def test_cast_success_despite_missing_args(self):
+ # Overriding the method in the base class because the test
+ # seems to fail for the same reason as
+ # test_fanout_send_receive().
+ self.skipTest("kombu memory transport seems buggy with "
+ "fanout queues as this test passes when "
+ "you use rabbit (fake_rabbit=False)")
+
def test_fanout_send_receive(self):
"""Test sending to a fanout exchange and consuming from 2 queues."""
@@ -514,7 +629,7 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
'pool.name',
)
- def test_join_consumer_pool(self):
+ def test_join_consumer_pool_default(self):
meth = 'declare_topic_consumer'
with mock.patch.object(self.rpc.Connection, meth) as p:
conn = self.rpc.create_connection(FLAGS)
@@ -529,13 +644,108 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
queue_name='pool.name',
exchange_name='exchange.name',
topic='topic.name',
+ ack_on_error=True,
)
+ def test_join_consumer_pool_no_ack(self):
+ meth = 'declare_topic_consumer'
+ with mock.patch.object(self.rpc.Connection, meth) as p:
+ conn = self.rpc.create_connection(FLAGS)
+ conn.join_consumer_pool(
+ callback=lambda *a, **k: (a, k),
+ pool_name='pool.name',
+ topic='topic.name',
+ exchange_name='exchange.name',
+ ack_on_error=False,
+ )
+ p.assert_called_with(
+ callback=mock.ANY, # the callback wrapper
+ queue_name='pool.name',
+ exchange_name='exchange.name',
+ topic='topic.name',
+ ack_on_error=False,
+ )
+
+ # used to make unexpected exception tests run faster
+ def my_time_sleep(self, sleep_time):
+ return
+
+ def test_service_consume_thread_unexpected_exceptions(self):
+
+ def my_TopicConsumer_consume(myself, *args, **kwargs):
+ self.consume_calls += 1
+ # see if it can sustain three failures
+ if self.consume_calls < 3:
+ raise Exception('unexpected exception')
+ else:
+ self.orig_TopicConsumer_consume(myself, *args, **kwargs)
+
+ self.consume_calls = 0
+ self.orig_TopicConsumer_consume = impl_kombu.TopicConsumer.consume
+ self.stubs.Set(impl_kombu.TopicConsumer, 'consume',
+ my_TopicConsumer_consume)
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
+
+ value = 42
+ result = self.rpc.call(FLAGS, self.context, self.topic,
+ {"method": "echo",
+ "args": {"value": value}})
+ self.assertEqual(value, result)
+
+ def test_replyproxy_consume_thread_unexpected_exceptions(self):
+
+ def my_DirectConsumer_consume(myself, *args, **kwargs):
+ self.consume_calls += 1
+ # see if it can sustain three failures
+ if self.consume_calls < 3:
+ raise Exception('unexpected exception')
+ else:
+ self.orig_DirectConsumer_consume(myself, *args, **kwargs)
+
+ self.consume_calls = 1
+ self.orig_DirectConsumer_consume = impl_kombu.DirectConsumer.consume
+ self.stubs.Set(impl_kombu.DirectConsumer, 'consume',
+ my_DirectConsumer_consume)
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
+
+ value = 42
+ result = self.rpc.call(FLAGS, self.context, self.topic,
+ {"method": "echo",
+ "args": {"value": value}})
+ self.assertEqual(value, result)
+
+ def test_reconnect_max_retries(self):
+ self.config(rabbit_hosts=[
+ 'host1:1234', 'host2:5678', '[::1]:2345',
+ '[2001:0db8:85a3:0042:0000:8a2e:0370:7334]'],
+ rabbit_max_retries=2,
+ rabbit_retry_interval=0.1,
+ rabbit_retry_backoff=0.1)
+
+ info = {'attempt': 0}
+
+ class MyConnection(kombu.connection.BrokerConnection):
+ def __init__(self, *args, **params):
+ super(MyConnection, self).__init__(*args, **params)
+ info['attempt'] += 1
+
+ def connect(self):
+ if info['attempt'] < 3:
+ # the word timeout is important (see impl_kombu.py:486)
+ raise Exception('connection timeout')
+ super(kombu.connection.BrokerConnection, self).connect()
+
+ self.stubs.Set(kombu.connection, 'BrokerConnection', MyConnection)
+
+ self.assertRaises(rpc_common.RPCException, self.rpc.Connection, FLAGS)
+ self.assertEqual(info['attempt'], 2)
+
class RpcKombuHATestCase(utils.BaseTestCase):
def setUp(self):
super(RpcKombuHATestCase, self).setUp()
- KombuStubs.setUp(self)
+
+ self.useFixture(KombuStubs(self))
self.addCleanup(FLAGS.reset)
def test_roundrobin_reconnect(self):
@@ -576,15 +786,13 @@ class RpcKombuHATestCase(utils.BaseTestCase):
]
}
- import kombu.connection
-
class MyConnection(kombu.connection.BrokerConnection):
def __init__(myself, *args, **params):
super(MyConnection, myself).__init__(*args, **params)
self.assertEqual(params,
info['params_list'][info['attempt'] %
len(info['params_list'])])
- info['attempt'] = info['attempt'] + 1
+ info['attempt'] += 1
def connect(myself):
if info['attempt'] < 5:
@@ -601,8 +809,6 @@ class RpcKombuHATestCase(utils.BaseTestCase):
def test_queue_not_declared_ha_if_ha_off(self):
self.config(rabbit_ha_queues=False)
- import kombu.entity
-
def my_declare(myself):
self.assertEqual(None,
(myself.queue_arguments or {}).get('x-ha-policy'))
@@ -615,8 +821,6 @@ class RpcKombuHATestCase(utils.BaseTestCase):
def test_queue_declared_ha_if_ha_on(self):
self.config(rabbit_ha_queues=True)
- import kombu.entity
-
def my_declare(myself):
self.assertEqual('all',
(myself.queue_arguments or {}).get('x-ha-policy'))