summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-07-22 21:00:54 +0000
committerGerrit Code Review <review@openstack.org>2013-07-22 21:00:54 +0000
commite1ccaf46205057e6a557fdee9f00ec9962d4e292 (patch)
tree429a11683c571e278a2a3b572410022c0835b77f
parentcd78a6dbd48c346aabbc5554386d42ac5a4a5771 (diff)
parent34a684200e2f34be2f7c0b86da48c0c46691fe46 (diff)
downloadoslo-e1ccaf46205057e6a557fdee9f00ec9962d4e292.tar.gz
oslo-e1ccaf46205057e6a557fdee9f00ec9962d4e292.tar.xz
oslo-e1ccaf46205057e6a557fdee9f00ec9962d4e292.zip
Merge "On reconnecting a FanoutConsumer, don't grow the topic name"
-rw-r--r--openstack/common/rpc/impl_qpid.py2
-rw-r--r--tests/unit/rpc/test_qpid.py42
2 files changed, 43 insertions, 1 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 7fd835a..99c4619 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -238,7 +238,7 @@ class FanoutConsumer(ConsumerBase):
{"exclusive": True})
def reconnect(self, session):
- topic = self.get_node_name()
+ topic = self.get_node_name().rpartition('_fanout')[0]
params = {
'session': session,
'topic': topic,
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index 9ef0dcd..a4bce1c 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -649,6 +649,48 @@ class RpcQpidTestCase(utils.BaseTestCase):
lambda *_x, **_y: None)
connection.close()
+ def test_fanout_reconnect(self):
+ expected_address = mox.Regex(
+ r'^impl_qpid_test_fanout ; '
+ '{"node": {"x-declare": {"auto-delete": true, "durable": '
+ 'false, "type": "fanout"}, "type": "topic"}, "create": '
+ '"always", "link": {"x-declare": {"auto-delete": true, '
+ '"exclusive": true, "durable": false}, "durable": true, '
+ '"name": "impl_qpid_test_fanout_.*"}}$')
+ self.mock_connection = self.mox.CreateMock(self.orig_connection)
+ self.mock_session = self.mox.CreateMock(self.orig_session)
+ self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
+
+ # First connection and create_consumer
+ self.mock_connection.opened().AndReturn(False)
+ self.mock_connection.open()
+ self.mock_connection.session().AndReturn(self.mock_session)
+ self.mock_session.receiver(expected_address).AndReturn(
+ self.mock_receiver)
+ self.mock_receiver.capacity = 1
+
+ # Now call reconnect
+ self.mock_connection.opened().AndReturn(False)
+ self.mock_connection.open()
+ self.mock_connection.session().AndReturn(self.mock_session)
+ # FIXME(wilsonmh): connect is currently part of __init__(),
+ # causing a reconnect to declare two receivers :(
+ self.mock_session.receiver(expected_address).AndReturn(
+ self.mock_receiver)
+ self.mock_session.receiver(expected_address).AndReturn(
+ self.mock_receiver)
+ self.mock_receiver.capacity = 1
+ self.mock_connection.close()
+
+ self.mox.ReplayAll()
+
+ connection = impl_qpid.create_connection(FLAGS)
+ connection.create_consumer("impl_qpid_test",
+ None,
+ True)
+ connection.reconnect()
+ connection.close()
+
#
#from nova.tests.rpc import common