summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorPaul Mathews <pmathews@bluehost.com>2013-07-12 15:24:54 -0600
committerPaul Mathews <pmathews@bluehost.com>2013-07-16 09:52:55 -0600
commit5ff534d15a136decd6d076fc1f2cd066806843c9 (patch)
tree85116bdb5d58c4920925eb2217ddfc8e4b7d6177 /openstack
parentca0ca29e3f04ced20f62c33ad8b5bf7492775824 (diff)
downloadoslo-5ff534d15a136decd6d076fc1f2cd066806843c9.tar.gz
oslo-5ff534d15a136decd6d076fc1f2cd066806843c9.tar.xz
oslo-5ff534d15a136decd6d076fc1f2cd066806843c9.zip
Add config for amqp durable/auto_delete queues
This patch makes two new config options (amqp_durable_queues and amqp_auto_delete) that enable durable and auto delete queues in amqp. Fixes: bug #1033915 DocImpact Change-Id: I56e5c92f1ed8ac2d429a306f3f38a963f0138c6c
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/amqp.py14
-rw-r--r--openstack/common/rpc/impl_kombu.py13
-rw-r--r--openstack/common/rpc/impl_qpid.py26
3 files changed, 37 insertions, 16 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 23e0079..1afd2ab 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -34,6 +34,7 @@ from eventlet import greenpool
from eventlet import pools
from eventlet import queue
from eventlet import semaphore
+from oslo.config import cfg
from openstack.common import excutils
from openstack.common.gettextutils import _ # noqa
@@ -42,6 +43,19 @@ from openstack.common import log as logging
from openstack.common.rpc import common as rpc_common
+amqp_opts = [
+ cfg.BoolOpt('amqp_durable_queues',
+ default=False,
+ deprecated_name='rabbit_durable_queues',
+ deprecated_group='DEFAULT',
+ help='Use durable queues in amqp.'),
+ cfg.BoolOpt('amqp_auto_delete',
+ default=False,
+ help='Auto-delete queues in amqp.'),
+]
+
+cfg.CONF.register_opts(amqp_opts)
+
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 809fa3d..6b1ae93 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -86,9 +86,6 @@ kombu_opts = [
default=0,
help='maximum retries with trying to connect to RabbitMQ '
'(the default of 0 implies an infinite retry count)'),
- cfg.BoolOpt('rabbit_durable_queues',
- default=False,
- help='use durable queues in RabbitMQ'),
cfg.BoolOpt('rabbit_ha_queues',
default=False,
help='use H/A queues in RabbitMQ (x-ha-policy: all).'
@@ -260,9 +257,9 @@ class TopicConsumer(ConsumerBase):
Other kombu options may be passed as keyword arguments
"""
# Default options
- options = {'durable': conf.rabbit_durable_queues,
+ options = {'durable': conf.amqp_durable_queues,
'queue_arguments': _get_queue_arguments(conf),
- 'auto_delete': False,
+ 'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
options.update(kwargs)
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
@@ -366,8 +363,8 @@ class TopicPublisher(Publisher):
Kombu options may be passed as keyword args to override defaults
"""
- options = {'durable': conf.rabbit_durable_queues,
- 'auto_delete': False,
+ options = {'durable': conf.amqp_durable_queues,
+ 'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
options.update(kwargs)
exchange_name = rpc_amqp.get_control_exchange(conf)
@@ -397,7 +394,7 @@ class NotifyPublisher(TopicPublisher):
"""Publisher class for 'notify'."""
def __init__(self, conf, channel, topic, **kwargs):
- self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+ self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index dbfbdde..7fd835a 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -181,11 +181,16 @@ class DirectConsumer(ConsumerBase):
'callback' is the callback to call when messages are received
"""
- super(DirectConsumer, self).__init__(session, callback,
- "%s/%s" % (msg_id, msg_id),
- {"type": "direct"},
- msg_id,
- {"exclusive": True})
+ super(DirectConsumer, self).__init__(
+ session, callback,
+ "%s/%s" % (msg_id, msg_id),
+ {"type": "direct"},
+ msg_id,
+ {
+ "auto-delete": conf.amqp_auto_delete,
+ "exclusive": True,
+ "durable": conf.amqp_durable_queues,
+ })
class TopicConsumer(ConsumerBase):
@@ -203,9 +208,14 @@ class TopicConsumer(ConsumerBase):
"""
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
- super(TopicConsumer, self).__init__(session, callback,
- "%s/%s" % (exchange_name, topic),
- {}, name or topic, {})
+ super(TopicConsumer, self).__init__(
+ session, callback,
+ "%s/%s" % (exchange_name, topic),
+ {}, name or topic,
+ {
+ "auto-delete": conf.amqp_auto_delete,
+ "durable": conf.amqp_durable_queues,
+ })
class FanoutConsumer(ConsumerBase):