summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc/impl_kombu.py
diff options
context:
space:
mode:
authorDoug Hellmann <doug.hellmann@dreamhost.com>2013-02-13 18:02:22 -0500
committerDoug Hellmann <doug.hellmann@dreamhost.com>2013-02-13 18:02:22 -0500
commit74a9cbdc86580d256b6cd28ae57cd69213da6c44 (patch)
treeb1520d653dd54e676a5ad96b52e596796174a958 /openstack/common/rpc/impl_kombu.py
parenteaab5fae2502198e9fa57d0d90a7204a2bd83b16 (diff)
downloadoslo-74a9cbdc86580d256b6cd28ae57cd69213da6c44.tar.gz
oslo-74a9cbdc86580d256b6cd28ae57cd69213da6c44.tar.xz
oslo-74a9cbdc86580d256b6cd28ae57cd69213da6c44.zip
Add join_consumer_pool() to RPC connections
blueprint move-listener-framework-oslo bug 1047015 bug 1111632 Ceilometer and Quantum use private methods of the RPC connection object to configure themselves to listen to a queue shared among a group of workers. This change adds a public method to the RPC connection to support this use case, without resorting to using private API calls. Change-Id: I3a89f1dfdcf8accca70cf305f7a31315bea093d8 Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
Diffstat (limited to 'openstack/common/rpc/impl_kombu.py')
-rw-r--r--openstack/common/rpc/impl_kombu.py24
1 files changed, 24 insertions, 0 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 4f3cc7a..e9b715d 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -750,6 +750,30 @@ class Connection(object):
self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name)
+ def join_consumer_pool(self, callback, pool_name, topic,
+ exchange_name=None):
+ """Register as a member of a group of consumers for a given topic from
+ the specified exchange.
+
+ Exactly one member of a given pool will receive each message.
+
+ A message will be delivered to multiple pools, if more than
+ one is created.
+ """
+ callback_wrapper = rpc_amqp.CallbackWrapper(
+ conf=self.conf,
+ callback=callback,
+ connection_pool=rpc_amqp.get_connection_pool(self.conf,
+ Connection),
+ )
+ self.proxy_callbacks.append(callback_wrapper)
+ self.declare_topic_consumer(
+ queue_name=pool_name,
+ topic=topic,
+ exchange_name=exchange_name,
+ callback=callback_wrapper,
+ )
+
def create_connection(conf, new=True):
"""Create a connection"""