summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-02-14 15:16:46 +0000
committerGerrit Code Review <review@openstack.org>2013-02-14 15:16:46 +0000
commit399f27b4f75e1eadb9ff5f9a2c9328130ee8c7a8 (patch)
treea9099238b338c77aeb6e110b36dd6e823a1a6283 /tests
parentbfd17fd6cf35f26661c8e446f1a42095abc2eaf4 (diff)
parent74a9cbdc86580d256b6cd28ae57cd69213da6c44 (diff)
downloadoslo-399f27b4f75e1eadb9ff5f9a2c9328130ee8c7a8.tar.gz
oslo-399f27b4f75e1eadb9ff5f9a2c9328130ee8c7a8.tar.xz
oslo-399f27b4f75e1eadb9ff5f9a2c9328130ee8c7a8.zip
Merge "Add join_consumer_pool() to RPC connections"
Diffstat (limited to 'tests')
-rw-r--r--tests/unit/rpc/test_kombu.py34
-rw-r--r--tests/unit/rpc/test_qpid.py30
2 files changed, 64 insertions, 0 deletions
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index a201940..1bd23d5 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -26,6 +26,8 @@ import contextlib
import logging
import time
+import mock
+
from openstack.common import cfg
from openstack.common import exception
from openstack.common.rpc import amqp as rpc_amqp
@@ -495,6 +497,38 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
#Traceback should be included in exception message
self.assertTrue('exception.ApiError' in unicode(exc))
+ def test_create_worker(self):
+ meth = 'declare_topic_consumer'
+ with mock.patch.object(self.rpc.Connection, meth) as p:
+ conn = self.rpc.create_connection(FLAGS)
+ conn.create_worker(
+ 'topic.name',
+ lambda *a, **k: (a, k),
+ 'pool.name',
+ )
+ p.assert_called_with(
+ 'topic.name',
+ mock.ANY, # the proxy
+ 'pool.name',
+ )
+
+ def test_join_consumer_pool(self):
+ meth = 'declare_topic_consumer'
+ with mock.patch.object(self.rpc.Connection, meth) as p:
+ conn = self.rpc.create_connection(FLAGS)
+ conn.join_consumer_pool(
+ callback=lambda *a, **k: (a, k),
+ pool_name='pool.name',
+ topic='topic.name',
+ exchange_name='exchange.name',
+ )
+ p.assert_called_with(
+ callback=mock.ANY, # the callback wrapper
+ queue_name='pool.name',
+ exchange_name='exchange.name',
+ topic='topic.name',
+ )
+
class RpcKombuHATestCase(utils.BaseTestCase):
def setUp(self):
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index 1df94d5..016c5b9 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -184,6 +184,36 @@ class RpcQpidTestCase(utils.BaseTestCase):
)
connection.close()
+ def test_join_consumer_pool(self):
+ self.mock_connection = self.mox.CreateMock(self.orig_connection)
+ self.mock_session = self.mox.CreateMock(self.orig_session)
+ self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
+
+ self.mock_connection.opened().AndReturn(False)
+ self.mock_connection.open()
+ self.mock_connection.session().AndReturn(self.mock_session)
+ expected_address = (
+ 'exchange-name/impl_qpid_test ; {"node": {"x-declare": '
+ '{"auto-delete": true, "durable": true}, "type": "topic"}, '
+ '"create": "always", "link": {"x-declare": {"auto-delete": '
+ 'true, "exclusive": false, "durable": false}, "durable": '
+ 'true, "name": "impl.qpid.test.consumer.pool"}}')
+ self.mock_session.receiver(expected_address).AndReturn(
+ self.mock_receiver)
+ self.mock_receiver.capacity = 1
+ self.mock_connection.close()
+
+ self.mox.ReplayAll()
+
+ connection = impl_qpid.create_connection(FLAGS)
+ connection.join_consumer_pool(
+ callback=lambda *_x, **_y: None,
+ pool_name='impl.qpid.test.consumer.pool',
+ topic="impl_qpid_test",
+ exchange_name='exchange-name',
+ )
+ connection.close()
+
def test_topic_consumer(self):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)