diff options
author | Mark McLoughlin <markmc@redhat.com> | 2012-09-25 11:33:50 +0100 |
---|---|---|
committer | Mark McLoughlin <markmc@redhat.com> | 2012-09-25 11:55:52 +0100 |
commit | 83570cf31d480284af888eb984227c7e6d2a8dbf (patch) | |
tree | fcd4004cfd2471f71be8a3414f5221514333d94c /openstack/common/rpc/impl_kombu.py | |
parent | 1481849e0d55051f97bed7b1183d07ab724ccaa8 (diff) | |
download | oslo-83570cf31d480284af888eb984227c7e6d2a8dbf.tar.gz oslo-83570cf31d480284af888eb984227c7e6d2a8dbf.tar.xz oslo-83570cf31d480284af888eb984227c7e6d2a8dbf.zip |
DRY up mirrored Rabbit queue declaration
Consolidate the setting of x-ha-policy to a single place and add
soem documentation.
Change-Id: I35751e43c3ac15c142996f291e0ec609cab4948a
Diffstat (limited to 'openstack/common/rpc/impl_kombu.py')
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 22 |
1 files changed, 17 insertions, 5 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 9eaa3cd..dda31d9 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -97,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts) LOG = rpc_common.LOG +def _get_queue_arguments(conf): + """Construct the arguments for declaring a queue. + + If the rabbit_ha_queues option is set, we declare a mirrored queue + as described here: + + http://www.rabbitmq.com/ha.html + + Setting x-ha-policy to all means that the queue will be mirrored + to all nodes in the cluster. + """ + return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} + + class ConsumerBase(object): """Consumer base class.""" @@ -215,9 +229,8 @@ class TopicConsumer(ConsumerBase): Other kombu options may be passed as keyword arguments """ # Default options - args = {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} options = {'durable': conf.rabbit_durable_queues, - 'queue_arguments': args, + 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': False, 'exclusive': False} options.update(kwargs) @@ -347,7 +360,7 @@ class NotifyPublisher(TopicPublisher): def __init__(self, conf, channel, topic, **kwargs): self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) - self.rabbit_ha_queues = conf.rabbit_ha_queues + self.queue_arguments = _get_queue_arguments(conf) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) def reconnect(self, channel): @@ -356,13 +369,12 @@ class NotifyPublisher(TopicPublisher): # NOTE(jerdfelt): Normally the consumer would create the queue, but # we do this to ensure that messages don't get dropped if the # consumer is started after we do - args = {'x-ha-policy': 'all'} if self.rabbit_ha_queues else {} queue = kombu.entity.Queue(channel=channel, exchange=self.exchange, durable=self.durable, name=self.routing_key, routing_key=self.routing_key, - queue_arguments=args) + queue_arguments=self.queue_arguments) queue.declare() |