summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris Behrens <cbehrens@codestud.com>2011-08-31 11:54:19 -0700
committerChris Behrens <cbehrens@codestud.com>2011-08-31 11:54:19 -0700
commitba8163fed57bcd1948be4cfb021fb32391702cc5 (patch)
tree01fb783fd86774da38b96301d5a006f54da1ffdd
parente43ffb5137ba256a21b3241b549d7c66cb7e5e04 (diff)
kludge for kombu 1.1.3 memory transport bug
-rw-r--r--nova/rpc/impl_kombu.py12
1 files changed, 11 insertions, 1 deletions
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py
index ab70e7cfb..b994a6a10 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/rpc/impl_kombu.py
@@ -303,6 +303,7 @@ class Connection(object):
self.interval_stepping = FLAGS.rabbit_retry_backoff
# max retry-interval = 30 seconds
self.interval_max = 30
+ self.memory_transport = False
self.params = dict(hostname=FLAGS.rabbit_host,
port=FLAGS.rabbit_port,
@@ -311,6 +312,9 @@ class Connection(object):
virtual_host=FLAGS.rabbit_virtual_host)
if FLAGS.fake_rabbit:
self.params['transport'] = 'memory'
+ self.memory_transport = True
+ else:
+ self.memory_transport = False
self.connection = None
self.reconnect()
@@ -323,7 +327,7 @@ class Connection(object):
pass
time.sleep(1)
self.connection = kombu.connection.BrokerConnection(**self.params)
- if FLAGS.fake_rabbit:
+ if self.memory_transport:
# Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0
self.consumer_num = itertools.count(1)
@@ -345,6 +349,9 @@ class Connection(object):
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d' %
self.params))
self.channel = self.connection.channel()
+ # work around 'memory' transport bug in 1.1.3
+ if self.memory_transport:
+ self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
if self.consumers:
@@ -374,6 +381,9 @@ class Connection(object):
self.cancel_consumer_thread()
self.channel.close()
self.channel = self.connection.channel()
+ # work around 'memory' transport bug in 1.1.3
+ if self.memory_transport:
+ self.channel._new_queue('ae.undeliver')
self.consumers = []
def declare_consumer(self, consumer_cls, topic, callback):