summaryrefslogtreecommitdiffstats
path: root/openstack/common
diff options
context:
space:
mode:
Diffstat (limited to 'openstack/common')
-rw-r--r--openstack/common/rpc/impl_kombu.py8
-rw-r--r--openstack/common/rpc/impl_qpid.py9
2 files changed, 11 insertions, 6 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 6c981b2..fc34de1 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -215,7 +215,7 @@ class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
def __init__(self, conf, channel, topic, callback, tag, name=None,
- **kwargs):
+ exchange_name=None, **kwargs):
"""Init a 'topic' queue.
:param channel: the amqp channel to use
@@ -234,7 +234,7 @@ class TopicConsumer(ConsumerBase):
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
- exchange_name = rpc_amqp.get_control_exchange(conf)
+ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
exchange = kombu.entity.Exchange(name=exchange_name,
type='topic',
durable=options['durable'],
@@ -664,10 +664,12 @@ class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+ exchange_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
+ exchange_name=exchange_name,
),
topic, callback)
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 97ac7c7..1b81cd9 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -170,7 +170,8 @@ class DirectConsumer(ConsumerBase):
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
- def __init__(self, conf, session, topic, callback, name=None):
+ def __init__(self, conf, session, topic, callback, name=None,
+ exchange_name=None):
"""Init a 'topic' queue.
:param session: the amqp session to use
@@ -180,7 +181,7 @@ class TopicConsumer(ConsumerBase):
:param name: optional queue name, defaults to topic
"""
- exchange_name = rpc_amqp.get_control_exchange(conf)
+ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback,
"%s/%s" % (exchange_name, topic),
{}, name or topic, {})
@@ -464,10 +465,12 @@ class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+ exchange_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
+ exchange_name=exchange_name,
),
topic, callback)