diff options
| author | Jenkins <jenkins@review.openstack.org> | 2013-07-16 19:04:05 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2013-07-16 19:04:05 +0000 |
| commit | 15f45d28c703026d8c607107d632c2fe86a00087 (patch) | |
| tree | 6fee085f8abb012e0b9d3b85e9610f13af2553d9 /openstack/common | |
| parent | cc29fd378207b0f5716fb1a0775fd08f1fde5b20 (diff) | |
| parent | 5ff534d15a136decd6d076fc1f2cd066806843c9 (diff) | |
| download | oslo-15f45d28c703026d8c607107d632c2fe86a00087.tar.gz oslo-15f45d28c703026d8c607107d632c2fe86a00087.tar.xz oslo-15f45d28c703026d8c607107d632c2fe86a00087.zip | |
Merge "Add config for amqp durable/auto_delete queues"
Diffstat (limited to 'openstack/common')
| -rw-r--r-- | openstack/common/rpc/amqp.py | 14 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_kombu.py | 13 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_qpid.py | 26 |
3 files changed, 37 insertions, 16 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 23e0079..1afd2ab 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -34,6 +34,7 @@ from eventlet import greenpool from eventlet import pools from eventlet import queue from eventlet import semaphore +from oslo.config import cfg from openstack.common import excutils from openstack.common.gettextutils import _ # noqa @@ -42,6 +43,19 @@ from openstack.common import log as logging from openstack.common.rpc import common as rpc_common +amqp_opts = [ + cfg.BoolOpt('amqp_durable_queues', + default=False, + deprecated_name='rabbit_durable_queues', + deprecated_group='DEFAULT', + help='Use durable queues in amqp.'), + cfg.BoolOpt('amqp_auto_delete', + default=False, + help='Auto-delete queues in amqp.'), +] + +cfg.CONF.register_opts(amqp_opts) + UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 809fa3d..6b1ae93 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -86,9 +86,6 @@ kombu_opts = [ default=0, help='maximum retries with trying to connect to RabbitMQ ' '(the default of 0 implies an infinite retry count)'), - cfg.BoolOpt('rabbit_durable_queues', - default=False, - help='use durable queues in RabbitMQ'), cfg.BoolOpt('rabbit_ha_queues', default=False, help='use H/A queues in RabbitMQ (x-ha-policy: all).' @@ -260,9 +257,9 @@ class TopicConsumer(ConsumerBase): Other kombu options may be passed as keyword arguments """ # Default options - options = {'durable': conf.rabbit_durable_queues, + options = {'durable': conf.amqp_durable_queues, 'queue_arguments': _get_queue_arguments(conf), - 'auto_delete': False, + 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} options.update(kwargs) exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) @@ -366,8 +363,8 @@ class TopicPublisher(Publisher): Kombu options may be passed as keyword args to override defaults """ - options = {'durable': conf.rabbit_durable_queues, - 'auto_delete': False, + options = {'durable': conf.amqp_durable_queues, + 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} options.update(kwargs) exchange_name = rpc_amqp.get_control_exchange(conf) @@ -397,7 +394,7 @@ class NotifyPublisher(TopicPublisher): """Publisher class for 'notify'.""" def __init__(self, conf, channel, topic, **kwargs): - self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) + self.durable = kwargs.pop('durable', conf.amqp_durable_queues) self.queue_arguments = _get_queue_arguments(conf) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index dbfbdde..7fd835a 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -181,11 +181,16 @@ class DirectConsumer(ConsumerBase): 'callback' is the callback to call when messages are received """ - super(DirectConsumer, self).__init__(session, callback, - "%s/%s" % (msg_id, msg_id), - {"type": "direct"}, - msg_id, - {"exclusive": True}) + super(DirectConsumer, self).__init__( + session, callback, + "%s/%s" % (msg_id, msg_id), + {"type": "direct"}, + msg_id, + { + "auto-delete": conf.amqp_auto_delete, + "exclusive": True, + "durable": conf.amqp_durable_queues, + }) class TopicConsumer(ConsumerBase): @@ -203,9 +208,14 @@ class TopicConsumer(ConsumerBase): """ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) - super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (exchange_name, topic), - {}, name or topic, {}) + super(TopicConsumer, self).__init__( + session, callback, + "%s/%s" % (exchange_name, topic), + {}, name or topic, + { + "auto-delete": conf.amqp_auto_delete, + "durable": conf.amqp_durable_queues, + }) class FanoutConsumer(ConsumerBase): |
