summaryrefslogtreecommitdiffstats
path: root/openstack/common
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-07-16 19:04:05 +0000
committerGerrit Code Review <review@openstack.org>2013-07-16 19:04:05 +0000
commit15f45d28c703026d8c607107d632c2fe86a00087 (patch)
tree6fee085f8abb012e0b9d3b85e9610f13af2553d9 /openstack/common
parentcc29fd378207b0f5716fb1a0775fd08f1fde5b20 (diff)
parent5ff534d15a136decd6d076fc1f2cd066806843c9 (diff)
downloadoslo-15f45d28c703026d8c607107d632c2fe86a00087.tar.gz
oslo-15f45d28c703026d8c607107d632c2fe86a00087.tar.xz
oslo-15f45d28c703026d8c607107d632c2fe86a00087.zip
Merge "Add config for amqp durable/auto_delete queues"
Diffstat (limited to 'openstack/common')
-rw-r--r--openstack/common/rpc/amqp.py14
-rw-r--r--openstack/common/rpc/impl_kombu.py13
-rw-r--r--openstack/common/rpc/impl_qpid.py26
3 files changed, 37 insertions, 16 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 23e0079..1afd2ab 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -34,6 +34,7 @@ from eventlet import greenpool
from eventlet import pools
from eventlet import queue
from eventlet import semaphore
+from oslo.config import cfg
from openstack.common import excutils
from openstack.common.gettextutils import _ # noqa
@@ -42,6 +43,19 @@ from openstack.common import log as logging
from openstack.common.rpc import common as rpc_common
+amqp_opts = [
+ cfg.BoolOpt('amqp_durable_queues',
+ default=False,
+ deprecated_name='rabbit_durable_queues',
+ deprecated_group='DEFAULT',
+ help='Use durable queues in amqp.'),
+ cfg.BoolOpt('amqp_auto_delete',
+ default=False,
+ help='Auto-delete queues in amqp.'),
+]
+
+cfg.CONF.register_opts(amqp_opts)
+
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 809fa3d..6b1ae93 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -86,9 +86,6 @@ kombu_opts = [
default=0,
help='maximum retries with trying to connect to RabbitMQ '
'(the default of 0 implies an infinite retry count)'),
- cfg.BoolOpt('rabbit_durable_queues',
- default=False,
- help='use durable queues in RabbitMQ'),
cfg.BoolOpt('rabbit_ha_queues',
default=False,
help='use H/A queues in RabbitMQ (x-ha-policy: all).'
@@ -260,9 +257,9 @@ class TopicConsumer(ConsumerBase):
Other kombu options may be passed as keyword arguments
"""
# Default options
- options = {'durable': conf.rabbit_durable_queues,
+ options = {'durable': conf.amqp_durable_queues,
'queue_arguments': _get_queue_arguments(conf),
- 'auto_delete': False,
+ 'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
options.update(kwargs)
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
@@ -366,8 +363,8 @@ class TopicPublisher(Publisher):
Kombu options may be passed as keyword args to override defaults
"""
- options = {'durable': conf.rabbit_durable_queues,
- 'auto_delete': False,
+ options = {'durable': conf.amqp_durable_queues,
+ 'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
options.update(kwargs)
exchange_name = rpc_amqp.get_control_exchange(conf)
@@ -397,7 +394,7 @@ class NotifyPublisher(TopicPublisher):
"""Publisher class for 'notify'."""
def __init__(self, conf, channel, topic, **kwargs):
- self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+ self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index dbfbdde..7fd835a 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -181,11 +181,16 @@ class DirectConsumer(ConsumerBase):
'callback' is the callback to call when messages are received
"""
- super(DirectConsumer, self).__init__(session, callback,
- "%s/%s" % (msg_id, msg_id),
- {"type": "direct"},
- msg_id,
- {"exclusive": True})
+ super(DirectConsumer, self).__init__(
+ session, callback,
+ "%s/%s" % (msg_id, msg_id),
+ {"type": "direct"},
+ msg_id,
+ {
+ "auto-delete": conf.amqp_auto_delete,
+ "exclusive": True,
+ "durable": conf.amqp_durable_queues,
+ })
class TopicConsumer(ConsumerBase):
@@ -203,9 +208,14 @@ class TopicConsumer(ConsumerBase):
"""
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
- super(TopicConsumer, self).__init__(session, callback,
- "%s/%s" % (exchange_name, topic),
- {}, name or topic, {})
+ super(TopicConsumer, self).__init__(
+ session, callback,
+ "%s/%s" % (exchange_name, topic),
+ {}, name or topic,
+ {
+ "auto-delete": conf.amqp_auto_delete,
+ "durable": conf.amqp_durable_queues,
+ })
class FanoutConsumer(ConsumerBase):