From 83570cf31d480284af888eb984227c7e6d2a8dbf Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Tue, 25 Sep 2012 11:33:50 +0100 Subject: DRY up mirrored Rabbit queue declaration Consolidate the setting of x-ha-policy to a single place and add soem documentation. Change-Id: I35751e43c3ac15c142996f291e0ec609cab4948a --- openstack/common/rpc/impl_kombu.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) (limited to 'openstack') diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 9eaa3cd..dda31d9 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -97,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts) LOG = rpc_common.LOG +def _get_queue_arguments(conf): + """Construct the arguments for declaring a queue. + + If the rabbit_ha_queues option is set, we declare a mirrored queue + as described here: + + http://www.rabbitmq.com/ha.html + + Setting x-ha-policy to all means that the queue will be mirrored + to all nodes in the cluster. + """ + return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} + + class ConsumerBase(object): """Consumer base class.""" @@ -215,9 +229,8 @@ class TopicConsumer(ConsumerBase): Other kombu options may be passed as keyword arguments """ # Default options - args = {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} options = {'durable': conf.rabbit_durable_queues, - 'queue_arguments': args, + 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': False, 'exclusive': False} options.update(kwargs) @@ -347,7 +360,7 @@ class NotifyPublisher(TopicPublisher): def __init__(self, conf, channel, topic, **kwargs): self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) - self.rabbit_ha_queues = conf.rabbit_ha_queues + self.queue_arguments = _get_queue_arguments(conf) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) def reconnect(self, channel): @@ -356,13 +369,12 @@ class NotifyPublisher(TopicPublisher): # NOTE(jerdfelt): Normally the consumer would create the queue, but # we do this to ensure that messages don't get dropped if the # consumer is started after we do - args = {'x-ha-policy': 'all'} if self.rabbit_ha_queues else {} queue = kombu.entity.Queue(channel=channel, exchange=self.exchange, durable=self.durable, name=self.routing_key, routing_key=self.routing_key, - queue_arguments=args) + queue_arguments=self.queue_arguments) queue.declare() -- cgit