summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2011-12-29 18:53:32 +0000
committerGerrit Code Review <review@openstack.org>2011-12-29 18:53:32 +0000
commit2654a3aa2d70f28c8bce4b49d270ca9c7ba97caf (patch)
tree20f540e65cede051eee790cdf6a56dc0a5525c87
parentb9500ff068d699ad6c846e0be05bf604900d723e (diff)
parent28ac41efda3869656d007db21e9dfb832d4c770e (diff)
downloadnova-2654a3aa2d70f28c8bce4b49d270ca9c7ba97caf.tar.gz
nova-2654a3aa2d70f28c8bce4b49d270ca9c7ba97caf.tar.xz
nova-2654a3aa2d70f28c8bce4b49d270ca9c7ba97caf.zip
Merge "Ensure queue is declared durable so messages aren't dropped"
-rw-r--r--nova/rpc/impl_kombu.py31
1 files changed, 28 insertions, 3 deletions
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py
index 8da8ff1ad..810a6629e 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/rpc/impl_kombu.py
@@ -290,6 +290,27 @@ class FanoutPublisher(Publisher):
**options)
+class NotifyPublisher(TopicPublisher):
+ """Publisher class for 'notify'"""
+
+ def __init__(self, *args, **kwargs):
+ self.durable = kwargs.pop('durable', FLAGS.rabbit_durable_queues)
+ super(NotifyPublisher, self).__init__(*args, **kwargs)
+
+ def reconnect(self, channel):
+ super(NotifyPublisher, self).reconnect(channel)
+
+ # NOTE(jerdfelt): Normally the consumer would create the queue, but
+ # we do this to ensure that messages don't get dropped if the
+ # consumer is started after we do
+ queue = kombu.entity.Queue(channel=channel,
+ exchange=self.exchange,
+ durable=self.durable,
+ name=self.routing_key,
+ routing_key=self.routing_key)
+ queue.declare()
+
+
class Connection(object):
"""Connection object."""
@@ -461,14 +482,18 @@ class Connection(object):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
- def topic_send(self, topic, msg, **kwargs):
+ def topic_send(self, topic, msg):
"""Send a 'topic' message"""
- self.publisher_send(TopicPublisher, topic, msg, **kwargs)
+ self.publisher_send(TopicPublisher, topic, msg)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
self.publisher_send(FanoutPublisher, topic, msg)
+ def notify_send(self, topic, msg, **kwargs):
+ """Send a notify message on a topic"""
+ self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
+
def consume(self, limit=None):
"""Consume from all queues/consumers"""
it = self.iterconsume(limit=limit)
@@ -778,7 +803,7 @@ def notify(context, topic, msg):
LOG.debug(_('Sending notification on %s...'), topic)
_pack_context(msg, context)
with ConnectionContext() as conn:
- conn.topic_send(topic, msg, durable=True)
+ conn.notify_send(topic, msg, durable=True)
def msg_reply(msg_id, reply=None, failure=None, ending=False):