diff options
| author | Paul Mathews <pmathews@bluehost.com> | 2013-07-12 15:24:54 -0600 |
|---|---|---|
| committer | Paul Mathews <pmathews@bluehost.com> | 2013-07-16 09:52:55 -0600 |
| commit | 5ff534d15a136decd6d076fc1f2cd066806843c9 (patch) | |
| tree | 85116bdb5d58c4920925eb2217ddfc8e4b7d6177 /openstack | |
| parent | ca0ca29e3f04ced20f62c33ad8b5bf7492775824 (diff) | |
| download | oslo-5ff534d15a136decd6d076fc1f2cd066806843c9.tar.gz oslo-5ff534d15a136decd6d076fc1f2cd066806843c9.tar.xz oslo-5ff534d15a136decd6d076fc1f2cd066806843c9.zip | |
Add config for amqp durable/auto_delete queues
This patch makes two new config options (amqp_durable_queues and
amqp_auto_delete) that enable durable and auto delete queues in amqp.
Fixes: bug #1033915
DocImpact
Change-Id: I56e5c92f1ed8ac2d429a306f3f38a963f0138c6c
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/rpc/amqp.py | 14 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_kombu.py | 13 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_qpid.py | 26 |
3 files changed, 37 insertions, 16 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 23e0079..1afd2ab 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -34,6 +34,7 @@ from eventlet import greenpool from eventlet import pools from eventlet import queue from eventlet import semaphore +from oslo.config import cfg from openstack.common import excutils from openstack.common.gettextutils import _ # noqa @@ -42,6 +43,19 @@ from openstack.common import log as logging from openstack.common.rpc import common as rpc_common +amqp_opts = [ + cfg.BoolOpt('amqp_durable_queues', + default=False, + deprecated_name='rabbit_durable_queues', + deprecated_group='DEFAULT', + help='Use durable queues in amqp.'), + cfg.BoolOpt('amqp_auto_delete', + default=False, + help='Auto-delete queues in amqp.'), +] + +cfg.CONF.register_opts(amqp_opts) + UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 809fa3d..6b1ae93 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -86,9 +86,6 @@ kombu_opts = [ default=0, help='maximum retries with trying to connect to RabbitMQ ' '(the default of 0 implies an infinite retry count)'), - cfg.BoolOpt('rabbit_durable_queues', - default=False, - help='use durable queues in RabbitMQ'), cfg.BoolOpt('rabbit_ha_queues', default=False, help='use H/A queues in RabbitMQ (x-ha-policy: all).' @@ -260,9 +257,9 @@ class TopicConsumer(ConsumerBase): Other kombu options may be passed as keyword arguments """ # Default options - options = {'durable': conf.rabbit_durable_queues, + options = {'durable': conf.amqp_durable_queues, 'queue_arguments': _get_queue_arguments(conf), - 'auto_delete': False, + 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} options.update(kwargs) exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) @@ -366,8 +363,8 @@ class TopicPublisher(Publisher): Kombu options may be passed as keyword args to override defaults """ - options = {'durable': conf.rabbit_durable_queues, - 'auto_delete': False, + options = {'durable': conf.amqp_durable_queues, + 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} options.update(kwargs) exchange_name = rpc_amqp.get_control_exchange(conf) @@ -397,7 +394,7 @@ class NotifyPublisher(TopicPublisher): """Publisher class for 'notify'.""" def __init__(self, conf, channel, topic, **kwargs): - self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) + self.durable = kwargs.pop('durable', conf.amqp_durable_queues) self.queue_arguments = _get_queue_arguments(conf) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index dbfbdde..7fd835a 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -181,11 +181,16 @@ class DirectConsumer(ConsumerBase): 'callback' is the callback to call when messages are received """ - super(DirectConsumer, self).__init__(session, callback, - "%s/%s" % (msg_id, msg_id), - {"type": "direct"}, - msg_id, - {"exclusive": True}) + super(DirectConsumer, self).__init__( + session, callback, + "%s/%s" % (msg_id, msg_id), + {"type": "direct"}, + msg_id, + { + "auto-delete": conf.amqp_auto_delete, + "exclusive": True, + "durable": conf.amqp_durable_queues, + }) class TopicConsumer(ConsumerBase): @@ -203,9 +208,14 @@ class TopicConsumer(ConsumerBase): """ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) - super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (exchange_name, topic), - {}, name or topic, {}) + super(TopicConsumer, self).__init__( + session, callback, + "%s/%s" % (exchange_name, topic), + {}, name or topic, + { + "auto-delete": conf.amqp_auto_delete, + "durable": conf.amqp_durable_queues, + }) class FanoutConsumer(ConsumerBase): |
