diff options
author | Julien Danjou <julien@danjou.info> | 2012-09-26 17:03:37 +0200 |
---|---|---|
committer | Julien Danjou <julien@danjou.info> | 2012-09-27 14:59:43 +0200 |
commit | 83b4cce116ec2f196c916ed238884a935e69235a (patch) | |
tree | 21d50e2c9155cf27bc31d66f332bfe75b676d0dd /tests | |
parent | 8a0c03c9e57926f4bc6c1a0ad9a87b9d59953e6e (diff) | |
download | oslo-83b4cce116ec2f196c916ed238884a935e69235a.tar.gz oslo-83b4cce116ec2f196c916ed238884a935e69235a.tar.xz oslo-83b4cce116ec2f196c916ed238884a935e69235a.zip |
Add exchange_name parameter to TopicConsumer
In Ceilometer, we need to consume messages from different exchanges of type
'topic', so being able to specify the exchange name rather than using the
default in TopicConsumer is necessary.
This fixes bug #1055483
Change-Id: Ibbaec8496f24c0b221cf489258af1e7fe362e84a
Signed-off-by: Julien Danjou <julien@danjou.info>
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unit/rpc/test_kombu.py | 80 | ||||
-rw-r--r-- | tests/unit/rpc/test_qpid.py | 29 |
2 files changed, 109 insertions, 0 deletions
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py index 16ee63e..5da05de 100644 --- a/tests/unit/rpc/test_kombu.py +++ b/tests/unit/rpc/test_kombu.py @@ -125,6 +125,26 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.assertEqual(self.received_message, message) @testutils.skip_if(kombu is None, "Test requires kombu") + def test_topic_send_receive_exchange_name(self): + """Test sending to a topic exchange/queue with an exchange name""" + + conn = self.rpc.create_connection(FLAGS) + message = 'topic test message' + + self.received_message = None + + def _callback(message): + self.received_message = message + + conn.declare_topic_consumer('a_topic', _callback, + exchange_name="foorbar") + conn.topic_send('a_topic', message) + conn.consume(limit=1) + conn.close() + + self.assertEqual(self.received_message, message) + + @testutils.skip_if(kombu is None, "Test requires kombu") def test_topic_multiple_queues(self): """Test sending to a topic exchange with multiple queues""" @@ -150,6 +170,66 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.assertEqual(self.received_message_2, message) @testutils.skip_if(kombu is None, "Test requires kombu") + def test_topic_multiple_queues_specify_exchange(self): + """Test sending to a topic exchange with multiple queues and one + exchange + + """ + + conn = self.rpc.create_connection(FLAGS) + message = 'topic test message' + + self.received_message_1 = None + self.received_message_2 = None + + def _callback1(message): + self.received_message_1 = message + + def _callback2(message): + self.received_message_2 = message + + conn.declare_topic_consumer('a_topic', _callback1, queue_name='queue1', + exchange_name="abc") + conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2', + exchange_name="abc") + conn.topic_send('a_topic', message) + conn.consume(limit=2) + conn.close() + + self.assertEqual(self.received_message_1, message) + self.assertEqual(self.received_message_2, message) + + @testutils.skip_if(kombu is None, "Test requires kombu") + def test_topic_one_queues_multiple_exchange(self): + """Test sending to a topic exchange with one queues and several + exchanges + + """ + + conn = self.rpc.create_connection(FLAGS) + message = 'topic test message' + + self.received_message_1 = None + self.received_message_2 = None + + def _callback1(message): + self.received_message_1 = message + + def _callback2(message): + self.received_message_2 = message + + conn.declare_topic_consumer('a_topic', _callback1, queue_name='queue1', + exchange_name="abc") + conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2', + exchange_name="def") + conn.topic_send('a_topic', message) + conn.consume(limit=2) + conn.close() + + self.assertEqual(self.received_message_1, message) + self.assertEqual(self.received_message_2, message) + + @testutils.skip_if(kombu is None, "Test requires kombu") def test_direct_send_receive(self): """Test sending to a direct exchange/queue""" conn = self.rpc.create_connection(FLAGS) diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py index 4e0cc29..1f1a595 100644 --- a/tests/unit/rpc/test_qpid.py +++ b/tests/unit/rpc/test_qpid.py @@ -188,6 +188,35 @@ class RpcQpidTestCase(unittest.TestCase): ) connection.close() + @testutils.skip_if(qpid is None, "Test requires qpid") + def test_topic_consumer(self): + self.mock_connection = self.mox.CreateMock(self.orig_connection) + self.mock_session = self.mox.CreateMock(self.orig_session) + self.mock_receiver = self.mox.CreateMock(self.orig_receiver) + + self.mock_connection.opened().AndReturn(False) + self.mock_connection.open() + self.mock_connection.session().AndReturn(self.mock_session) + expected_address = ( + 'foobar/impl_qpid_test ; {"node": {"x-declare": ' + '{"auto-delete": true, "durable": true}, "type": "topic"}, ' + '"create": "always", "link": {"x-declare": {"auto-delete": ' + 'true, "exclusive": false, "durable": false}, "durable": ' + 'true, "name": "impl.qpid.test.workers"}}') + self.mock_session.receiver(expected_address).AndReturn( + self.mock_receiver) + self.mock_receiver.capacity = 1 + self.mock_connection.close() + + self.mox.ReplayAll() + + connection = impl_qpid.create_connection(FLAGS) + connection.declare_topic_consumer("impl_qpid_test", + lambda *_x, **_y: None, + queue_name='impl.qpid.test.workers', + exchange_name='foobar') + connection.close() + def _test_cast(self, fanout, server_params=None): self.mock_connection = self.mox.CreateMock(self.orig_connection) self.mock_session = self.mox.CreateMock(self.orig_session) |