diff options
| author | Doug Hellmann <doug.hellmann@dreamhost.com> | 2012-05-18 15:13:20 -0400 |
|---|---|---|
| committer | Doug Hellmann <doug.hellmann@dreamhost.com> | 2012-05-24 15:42:30 -0400 |
| commit | 89fccd50d740a0cd68e7993e94536e68641d3337 (patch) | |
| tree | 4b47834cdf125631eb23d90ea7f12f15d2616a65 /nova/tests | |
| parent | d7eef34fdea8f94068dd2d745cba1ae493af61a9 (diff) | |
add queue name argument to TopicConsumer
ceilometer is going to want to subscribe several worker processes
to the notifications.info topic queue. The pool of workers needs
to be assured of receiving all messages, without interference from
other clients listening for notifications. The TopicConsumer
class always assumes the topic and queue name should be the same,
but in the ceilometer case we want to use a separate named queue
with a different name. The only parameter that cannot be
controlled by the user of TopicConsumer is the queue name,
so this change adds the ability to set the queue name.
Change-Id: I41a525de2fd855ca30c24fafcfbfefd6ab615dd7
Diffstat (limited to 'nova/tests')
| -rw-r--r-- | nova/tests/rpc/test_kombu.py | 25 | ||||
| -rw-r--r-- | nova/tests/rpc/test_qpid.py | 29 |
2 files changed, 54 insertions, 0 deletions
diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py index 837495273..a69dcdfe9 100644 --- a/nova/tests/rpc/test_kombu.py +++ b/nova/tests/rpc/test_kombu.py @@ -102,6 +102,31 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.assertEqual(self.received_message, message) @test.skip_if(kombu is None, "Test requires kombu") + def test_topic_multiple_queues(self): + """Test sending to a topic exchange with multiple queues""" + + conn = self.rpc.create_connection(FLAGS) + message = 'topic test message' + + self.received_message_1 = None + self.received_message_2 = None + + def _callback1(message): + self.received_message_1 = message + + def _callback2(message): + self.received_message_2 = message + + conn.declare_topic_consumer('a_topic', _callback1, queue_name='queue1') + conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2') + conn.topic_send('a_topic', message) + conn.consume(limit=2) + conn.close() + + self.assertEqual(self.received_message_1, message) + self.assertEqual(self.received_message_2, message) + + @test.skip_if(kombu is None, "Test requires kombu") def test_direct_send_receive(self): """Test sending to a direct exchange/queue""" conn = self.rpc.create_connection(FLAGS) diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py index f01c8a25b..c523f3fe3 100644 --- a/nova/tests/rpc/test_qpid.py +++ b/nova/tests/rpc/test_qpid.py @@ -147,6 +147,35 @@ class RpcQpidTestCase(test.TestCase): def test_create_consumer_fanout(self): self._test_create_consumer(fanout=True) + @test.skip_if(qpid is None, "Test requires qpid") + def test_create_worker(self): + self.mock_connection = self.mox.CreateMock(self.orig_connection) + self.mock_session = self.mox.CreateMock(self.orig_session) + self.mock_receiver = self.mox.CreateMock(self.orig_receiver) + + self.mock_connection.opened().AndReturn(False) + self.mock_connection.open() + self.mock_connection.session().AndReturn(self.mock_session) + expected_address = ( + 'nova/impl_qpid_test ; {"node": {"x-declare": ' + '{"auto-delete": true, "durable": true}, "type": "topic"}, ' + '"create": "always", "link": {"x-declare": {"auto-delete": ' + 'true, "exclusive": false, "durable": false}, "durable": ' + 'true, "name": "impl.qpid.test.workers"}}') + self.mock_session.receiver(expected_address).AndReturn( + self.mock_receiver) + self.mock_receiver.capacity = 1 + self.mock_connection.close() + + self.mox.ReplayAll() + + connection = impl_qpid.create_connection(FLAGS) + connection.create_worker("impl_qpid_test", + lambda *_x, **_y: None, + 'impl.qpid.test.workers', + ) + connection.close() + def _test_cast(self, fanout, server_params=None): self.mock_connection = self.mox.CreateMock(self.orig_connection) self.mock_session = self.mox.CreateMock(self.orig_session) |
