summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Wilson <geekinutah@gmail.com>2013-05-23 10:27:08 -0600
committerMichael Wilson <geekinutah@gmail.com>2013-06-12 09:58:18 -0600
commit719eba4cae7f2bc11753b17c65c7597b8d6ed72d (patch)
treeb7e4b0f145e4cde01369cfcc5426db012629f729
parent706fa4b31d1e13ab8774bcd10a917849d14033d9 (diff)
downloadoslo-719eba4cae7f2bc11753b17c65c7597b8d6ed72d.tar.gz
oslo-719eba4cae7f2bc11753b17c65c7597b8d6ed72d.tar.xz
oslo-719eba4cae7f2bc11753b17c65c7597b8d6ed72d.zip
Don't reconnect to exclusive fanout consumers.
See bug #1180166, basically we shouldn't be trying to reconnect to exclusive queues. Fixes: bug #1180166 Change-Id: I71ac5a150ea8c4d97a34404cb821ce8e8dfa152d
-rw-r--r--openstack/common/rpc/impl_qpid.py25
1 files changed, 24 insertions, 1 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 7352517..0434fdd 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -118,10 +118,17 @@ class ConsumerBase(object):
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
- self.reconnect(session)
+ self.connect(session)
+
+ def connect(self, session):
+ """Declare the reciever on connect."""
+ self._declare_receiver(session)
def reconnect(self, session):
"""Re-declare the receiver after a qpid reconnect."""
+ self._declare_receiver(session)
+
+ def _declare_receiver(self, session):
self.session = session
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1
@@ -157,6 +164,9 @@ class ConsumerBase(object):
def get_receiver(self):
return self.receiver
+ def get_node_name(self):
+ return self.address.split(';')[0]
+
class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'."""
@@ -206,6 +216,7 @@ class FanoutConsumer(ConsumerBase):
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
"""
+ self.conf = conf
super(FanoutConsumer, self).__init__(
session, callback,
@@ -214,6 +225,18 @@ class FanoutConsumer(ConsumerBase):
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True})
+ def reconnect(self, session):
+ topic = self.get_node_name()
+ params = {
+ 'session': session,
+ 'topic': topic,
+ 'callback': self.callback,
+ }
+
+ self.__init__(conf=self.conf, **params)
+
+ super(FanoutConsumer, self).reconnect(session)
+
class Publisher(object):
"""Base Publisher class."""