diff options
author | Michael Wilson <geekinutah@gmail.com> | 2013-05-23 10:27:08 -0600 |
---|---|---|
committer | Michael Wilson <geekinutah@gmail.com> | 2013-06-12 09:58:18 -0600 |
commit | 719eba4cae7f2bc11753b17c65c7597b8d6ed72d (patch) | |
tree | b7e4b0f145e4cde01369cfcc5426db012629f729 | |
parent | 706fa4b31d1e13ab8774bcd10a917849d14033d9 (diff) | |
download | oslo-719eba4cae7f2bc11753b17c65c7597b8d6ed72d.tar.gz oslo-719eba4cae7f2bc11753b17c65c7597b8d6ed72d.tar.xz oslo-719eba4cae7f2bc11753b17c65c7597b8d6ed72d.zip |
Don't reconnect to exclusive fanout consumers.
See bug #1180166, basically we shouldn't be trying to reconnect to
exclusive queues.
Fixes: bug #1180166
Change-Id: I71ac5a150ea8c4d97a34404cb821ce8e8dfa152d
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 25 |
1 files changed, 24 insertions, 1 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 7352517..0434fdd 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -118,10 +118,17 @@ class ConsumerBase(object): self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) - self.reconnect(session) + self.connect(session) + + def connect(self, session): + """Declare the reciever on connect.""" + self._declare_receiver(session) def reconnect(self, session): """Re-declare the receiver after a qpid reconnect.""" + self._declare_receiver(session) + + def _declare_receiver(self, session): self.session = session self.receiver = session.receiver(self.address) self.receiver.capacity = 1 @@ -157,6 +164,9 @@ class ConsumerBase(object): def get_receiver(self): return self.receiver + def get_node_name(self): + return self.address.split(';')[0] + class DirectConsumer(ConsumerBase): """Queue/consumer class for 'direct'.""" @@ -206,6 +216,7 @@ class FanoutConsumer(ConsumerBase): 'topic' is the topic to listen on 'callback' is the callback to call when messages are received """ + self.conf = conf super(FanoutConsumer, self).__init__( session, callback, @@ -214,6 +225,18 @@ class FanoutConsumer(ConsumerBase): "%s_fanout_%s" % (topic, uuid.uuid4().hex), {"exclusive": True}) + def reconnect(self, session): + topic = self.get_node_name() + params = { + 'session': session, + 'topic': topic, + 'callback': self.callback, + } + + self.__init__(conf=self.conf, **params) + + super(FanoutConsumer, self).reconnect(session) + class Publisher(object): """Base Publisher class.""" |