diff options
Diffstat (limited to 'openstack/common')
| -rw-r--r-- | openstack/common/rpc/impl_kombu.py | 8 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_qpid.py | 9 |
2 files changed, 11 insertions, 6 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 6c981b2..fc34de1 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -215,7 +215,7 @@ class TopicConsumer(ConsumerBase): """Consumer class for 'topic'""" def __init__(self, conf, channel, topic, callback, tag, name=None, - **kwargs): + exchange_name=None, **kwargs): """Init a 'topic' queue. :param channel: the amqp channel to use @@ -234,7 +234,7 @@ class TopicConsumer(ConsumerBase): 'auto_delete': False, 'exclusive': False} options.update(kwargs) - exchange_name = rpc_amqp.get_control_exchange(conf) + exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) exchange = kombu.entity.Exchange(name=exchange_name, type='topic', durable=options['durable'], @@ -664,10 +664,12 @@ class Connection(object): """ self.declare_consumer(DirectConsumer, topic, callback) - def declare_topic_consumer(self, topic, callback=None, queue_name=None): + def declare_topic_consumer(self, topic, callback=None, queue_name=None, + exchange_name=None): """Create a 'topic' consumer.""" self.declare_consumer(functools.partial(TopicConsumer, name=queue_name, + exchange_name=exchange_name, ), topic, callback) diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 97ac7c7..1b81cd9 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -170,7 +170,8 @@ class DirectConsumer(ConsumerBase): class TopicConsumer(ConsumerBase): """Consumer class for 'topic'""" - def __init__(self, conf, session, topic, callback, name=None): + def __init__(self, conf, session, topic, callback, name=None, + exchange_name=None): """Init a 'topic' queue. :param session: the amqp session to use @@ -180,7 +181,7 @@ class TopicConsumer(ConsumerBase): :param name: optional queue name, defaults to topic """ - exchange_name = rpc_amqp.get_control_exchange(conf) + exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) super(TopicConsumer, self).__init__(session, callback, "%s/%s" % (exchange_name, topic), {}, name or topic, {}) @@ -464,10 +465,12 @@ class Connection(object): """ self.declare_consumer(DirectConsumer, topic, callback) - def declare_topic_consumer(self, topic, callback=None, queue_name=None): + def declare_topic_consumer(self, topic, callback=None, queue_name=None, + exchange_name=None): """Create a 'topic' consumer.""" self.declare_consumer(functools.partial(TopicConsumer, name=queue_name, + exchange_name=exchange_name, ), topic, callback) |
