summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc/impl_kombu.py
diff options
context:
space:
mode:
authorMark McLoughlin <markmc@redhat.com>2012-09-25 11:33:50 +0100
committerMark McLoughlin <markmc@redhat.com>2012-09-25 11:55:52 +0100
commit83570cf31d480284af888eb984227c7e6d2a8dbf (patch)
treefcd4004cfd2471f71be8a3414f5221514333d94c /openstack/common/rpc/impl_kombu.py
parent1481849e0d55051f97bed7b1183d07ab724ccaa8 (diff)
downloadoslo-83570cf31d480284af888eb984227c7e6d2a8dbf.tar.gz
oslo-83570cf31d480284af888eb984227c7e6d2a8dbf.tar.xz
oslo-83570cf31d480284af888eb984227c7e6d2a8dbf.zip
DRY up mirrored Rabbit queue declaration
Consolidate the setting of x-ha-policy to a single place and add soem documentation. Change-Id: I35751e43c3ac15c142996f291e0ec609cab4948a
Diffstat (limited to 'openstack/common/rpc/impl_kombu.py')
-rw-r--r--openstack/common/rpc/impl_kombu.py22
1 files changed, 17 insertions, 5 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 9eaa3cd..dda31d9 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -97,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts)
LOG = rpc_common.LOG
+def _get_queue_arguments(conf):
+ """Construct the arguments for declaring a queue.
+
+ If the rabbit_ha_queues option is set, we declare a mirrored queue
+ as described here:
+
+ http://www.rabbitmq.com/ha.html
+
+ Setting x-ha-policy to all means that the queue will be mirrored
+ to all nodes in the cluster.
+ """
+ return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
+
+
class ConsumerBase(object):
"""Consumer base class."""
@@ -215,9 +229,8 @@ class TopicConsumer(ConsumerBase):
Other kombu options may be passed as keyword arguments
"""
# Default options
- args = {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
options = {'durable': conf.rabbit_durable_queues,
- 'queue_arguments': args,
+ 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
@@ -347,7 +360,7 @@ class NotifyPublisher(TopicPublisher):
def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
- self.rabbit_ha_queues = conf.rabbit_ha_queues
+ self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
def reconnect(self, channel):
@@ -356,13 +369,12 @@ class NotifyPublisher(TopicPublisher):
# NOTE(jerdfelt): Normally the consumer would create the queue, but
# we do this to ensure that messages don't get dropped if the
# consumer is started after we do
- args = {'x-ha-policy': 'all'} if self.rabbit_ha_queues else {}
queue = kombu.entity.Queue(channel=channel,
exchange=self.exchange,
durable=self.durable,
name=self.routing_key,
routing_key=self.routing_key,
- queue_arguments=args)
+ queue_arguments=self.queue_arguments)
queue.declare()