summaryrefslogtreecommitdiffstats
path: root/openstack/common
diff options
context:
space:
mode:
authorJulien Danjou <julien@danjou.info>2012-09-26 17:03:37 +0200
committerJulien Danjou <julien@danjou.info>2012-09-27 14:59:43 +0200
commit83b4cce116ec2f196c916ed238884a935e69235a (patch)
tree21d50e2c9155cf27bc31d66f332bfe75b676d0dd /openstack/common
parent8a0c03c9e57926f4bc6c1a0ad9a87b9d59953e6e (diff)
downloadoslo-83b4cce116ec2f196c916ed238884a935e69235a.tar.gz
oslo-83b4cce116ec2f196c916ed238884a935e69235a.tar.xz
oslo-83b4cce116ec2f196c916ed238884a935e69235a.zip
Add exchange_name parameter to TopicConsumer
In Ceilometer, we need to consume messages from different exchanges of type 'topic', so being able to specify the exchange name rather than using the default in TopicConsumer is necessary. This fixes bug #1055483 Change-Id: Ibbaec8496f24c0b221cf489258af1e7fe362e84a Signed-off-by: Julien Danjou <julien@danjou.info>
Diffstat (limited to 'openstack/common')
-rw-r--r--openstack/common/rpc/impl_kombu.py8
-rw-r--r--openstack/common/rpc/impl_qpid.py9
2 files changed, 11 insertions, 6 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 6c981b2..fc34de1 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -215,7 +215,7 @@ class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
def __init__(self, conf, channel, topic, callback, tag, name=None,
- **kwargs):
+ exchange_name=None, **kwargs):
"""Init a 'topic' queue.
:param channel: the amqp channel to use
@@ -234,7 +234,7 @@ class TopicConsumer(ConsumerBase):
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
- exchange_name = rpc_amqp.get_control_exchange(conf)
+ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
exchange = kombu.entity.Exchange(name=exchange_name,
type='topic',
durable=options['durable'],
@@ -664,10 +664,12 @@ class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+ exchange_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
+ exchange_name=exchange_name,
),
topic, callback)
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 97ac7c7..1b81cd9 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -170,7 +170,8 @@ class DirectConsumer(ConsumerBase):
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
- def __init__(self, conf, session, topic, callback, name=None):
+ def __init__(self, conf, session, topic, callback, name=None,
+ exchange_name=None):
"""Init a 'topic' queue.
:param session: the amqp session to use
@@ -180,7 +181,7 @@ class TopicConsumer(ConsumerBase):
:param name: optional queue name, defaults to topic
"""
- exchange_name = rpc_amqp.get_control_exchange(conf)
+ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback,
"%s/%s" % (exchange_name, topic),
{}, name or topic, {})
@@ -464,10 +465,12 @@ class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+ exchange_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
+ exchange_name=exchange_name,
),
topic, callback)