diff options
| author | Jenkins <jenkins@review.openstack.org> | 2012-09-25 14:53:14 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2012-09-25 14:53:14 +0000 |
| commit | 411f7f0999e907193012f2afec9c05aaf9b31981 (patch) | |
| tree | 1911ff1c53a62c95f7051c3d02cefc90effe3c45 /openstack | |
| parent | 974989d9b1b67ad40e22034b60f9b2640e9f5824 (diff) | |
| parent | 83570cf31d480284af888eb984227c7e6d2a8dbf (diff) | |
| download | oslo-411f7f0999e907193012f2afec9c05aaf9b31981.tar.gz oslo-411f7f0999e907193012f2afec9c05aaf9b31981.tar.xz oslo-411f7f0999e907193012f2afec9c05aaf9b31981.zip | |
Merge "DRY up mirrored Rabbit queue declaration"
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/rpc/impl_kombu.py | 22 |
1 files changed, 17 insertions, 5 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 9eaa3cd..dda31d9 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -97,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts) LOG = rpc_common.LOG +def _get_queue_arguments(conf): + """Construct the arguments for declaring a queue. + + If the rabbit_ha_queues option is set, we declare a mirrored queue + as described here: + + http://www.rabbitmq.com/ha.html + + Setting x-ha-policy to all means that the queue will be mirrored + to all nodes in the cluster. + """ + return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} + + class ConsumerBase(object): """Consumer base class.""" @@ -215,9 +229,8 @@ class TopicConsumer(ConsumerBase): Other kombu options may be passed as keyword arguments """ # Default options - args = {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} options = {'durable': conf.rabbit_durable_queues, - 'queue_arguments': args, + 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': False, 'exclusive': False} options.update(kwargs) @@ -347,7 +360,7 @@ class NotifyPublisher(TopicPublisher): def __init__(self, conf, channel, topic, **kwargs): self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) - self.rabbit_ha_queues = conf.rabbit_ha_queues + self.queue_arguments = _get_queue_arguments(conf) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) def reconnect(self, channel): @@ -356,13 +369,12 @@ class NotifyPublisher(TopicPublisher): # NOTE(jerdfelt): Normally the consumer would create the queue, but # we do this to ensure that messages don't get dropped if the # consumer is started after we do - args = {'x-ha-policy': 'all'} if self.rabbit_ha_queues else {} queue = kombu.entity.Queue(channel=channel, exchange=self.exchange, durable=self.durable, name=self.routing_key, routing_key=self.routing_key, - queue_arguments=args) + queue_arguments=self.queue_arguments) queue.declare() |
