diff options
author | Doug Hellmann <doug.hellmann@dreamhost.com> | 2013-02-13 18:02:22 -0500 |
---|---|---|
committer | Doug Hellmann <doug.hellmann@dreamhost.com> | 2013-02-13 18:02:22 -0500 |
commit | 74a9cbdc86580d256b6cd28ae57cd69213da6c44 (patch) | |
tree | b1520d653dd54e676a5ad96b52e596796174a958 /openstack/common/rpc/impl_kombu.py | |
parent | eaab5fae2502198e9fa57d0d90a7204a2bd83b16 (diff) | |
download | oslo-74a9cbdc86580d256b6cd28ae57cd69213da6c44.tar.gz oslo-74a9cbdc86580d256b6cd28ae57cd69213da6c44.tar.xz oslo-74a9cbdc86580d256b6cd28ae57cd69213da6c44.zip |
Add join_consumer_pool() to RPC connections
blueprint move-listener-framework-oslo
bug 1047015
bug 1111632
Ceilometer and Quantum use private methods of the RPC connection
object to configure themselves to listen to a queue shared among a
group of workers. This change adds a public method to the RPC
connection to support this use case, without resorting to using
private API calls.
Change-Id: I3a89f1dfdcf8accca70cf305f7a31315bea093d8
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
Diffstat (limited to 'openstack/common/rpc/impl_kombu.py')
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 4f3cc7a..e9b715d 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -750,6 +750,30 @@ class Connection(object): self.proxy_callbacks.append(proxy_cb) self.declare_topic_consumer(topic, proxy_cb, pool_name) + def join_consumer_pool(self, callback, pool_name, topic, + exchange_name=None): + """Register as a member of a group of consumers for a given topic from + the specified exchange. + + Exactly one member of a given pool will receive each message. + + A message will be delivered to multiple pools, if more than + one is created. + """ + callback_wrapper = rpc_amqp.CallbackWrapper( + conf=self.conf, + callback=callback, + connection_pool=rpc_amqp.get_connection_pool(self.conf, + Connection), + ) + self.proxy_callbacks.append(callback_wrapper) + self.declare_topic_consumer( + queue_name=pool_name, + topic=topic, + exchange_name=exchange_name, + callback=callback_wrapper, + ) + def create_connection(conf, new=True): """Create a connection""" |