summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDoug Hellmann <doug.hellmann@dreamhost.com>2013-02-13 18:02:22 -0500
committerDoug Hellmann <doug.hellmann@dreamhost.com>2013-02-13 18:02:22 -0500
commit74a9cbdc86580d256b6cd28ae57cd69213da6c44 (patch)
treeb1520d653dd54e676a5ad96b52e596796174a958
parenteaab5fae2502198e9fa57d0d90a7204a2bd83b16 (diff)
downloadoslo-74a9cbdc86580d256b6cd28ae57cd69213da6c44.tar.gz
oslo-74a9cbdc86580d256b6cd28ae57cd69213da6c44.tar.xz
oslo-74a9cbdc86580d256b6cd28ae57cd69213da6c44.zip
Add join_consumer_pool() to RPC connections
blueprint move-listener-framework-oslo bug 1047015 bug 1111632 Ceilometer and Quantum use private methods of the RPC connection object to configure themselves to listen to a queue shared among a group of workers. This change adds a public method to the RPC connection to support this use case, without resorting to using private API calls. Change-Id: I3a89f1dfdcf8accca70cf305f7a31315bea093d8 Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
-rw-r--r--.gitignore1
-rw-r--r--openstack/common/rpc/amqp.py57
-rw-r--r--openstack/common/rpc/common.py22
-rw-r--r--openstack/common/rpc/impl_kombu.py24
-rw-r--r--openstack/common/rpc/impl_qpid.py28
-rw-r--r--tests/unit/rpc/test_kombu.py34
-rw-r--r--tests/unit/rpc/test_qpid.py30
7 files changed, 188 insertions, 8 deletions
diff --git a/.gitignore b/.gitignore
index 772ac6e..383605d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ AUTHORS
ChangeLog
openstack/versioninfo
*.egg
+openstack/common/db/*.sqlite \ No newline at end of file
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index d1cc9ea..3d7ad5e 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -137,6 +137,12 @@ class ConnectionContext(rpc_common.Connection):
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
+ def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+ self.connection.join_consumer_pool(callback,
+ pool_name,
+ topic,
+ exchange_name)
+
def consume_in_thread(self):
self.connection.consume_in_thread()
@@ -224,15 +230,54 @@ def pack_context(msg, context):
msg.update(context_d)
-class ProxyCallback(object):
- """Calls methods on a proxy object based on method and args."""
+class _ThreadPoolWithWait(object):
+ """Base class for a delayed invocation manager used by
+ the Connection class to start up green threads
+ to handle incoming messages.
+ """
- def __init__(self, conf, proxy, connection_pool):
- self.proxy = proxy
+ def __init__(self, conf, connection_pool):
self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
self.connection_pool = connection_pool
self.conf = conf
+ def wait(self):
+ """Wait for all callback threads to exit."""
+ self.pool.waitall()
+
+
+class CallbackWrapper(_ThreadPoolWithWait):
+ """Wraps a straight callback to allow it to be invoked in a green
+ thread.
+ """
+
+ def __init__(self, conf, callback, connection_pool):
+ """
+ :param conf: cfg.CONF instance
+ :param callback: a callable (probably a function)
+ :param connection_pool: connection pool as returned by
+ get_connection_pool()
+ """
+ super(CallbackWrapper, self).__init__(
+ conf=conf,
+ connection_pool=connection_pool,
+ )
+ self.callback = callback
+
+ def __call__(self, message_data):
+ self.pool.spawn_n(self.callback, message_data)
+
+
+class ProxyCallback(_ThreadPoolWithWait):
+ """Calls methods on a proxy object based on method and args."""
+
+ def __init__(self, conf, proxy, connection_pool):
+ super(ProxyCallback, self).__init__(
+ conf=conf,
+ connection_pool=connection_pool,
+ )
+ self.proxy = proxy
+
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
@@ -293,10 +338,6 @@ class ProxyCallback(object):
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
- def wait(self):
- """Wait for all callback threads to exit."""
- self.pool.waitall()
-
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py
index 357ee9d..55dad2d 100644
--- a/openstack/common/rpc/common.py
+++ b/openstack/common/rpc/common.py
@@ -196,6 +196,28 @@ class Connection(object):
"""
raise NotImplementedError()
+ def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+ """Register as a member of a group of consumers for a given topic from
+ the specified exchange.
+
+ Exactly one member of a given pool will receive each message.
+
+ A message will be delivered to multiple pools, if more than
+ one is created.
+
+ :param callback: Callable to be invoked for each message.
+ :type callback: callable accepting one argument
+ :param pool_name: The name of the consumer pool.
+ :type pool_name: str
+ :param topic: The routing topic for desired messages.
+ :type topic: str
+ :param exchange_name: The name of the message exchange where
+ the client should attach. Defaults to
+ the configured exchange.
+ :type exchange_name: str
+ """
+ raise NotImplementedError()
+
def consume_in_thread(self):
"""Spawn a thread to handle incoming messages.
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 4f3cc7a..e9b715d 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -750,6 +750,30 @@ class Connection(object):
self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name)
+ def join_consumer_pool(self, callback, pool_name, topic,
+ exchange_name=None):
+ """Register as a member of a group of consumers for a given topic from
+ the specified exchange.
+
+ Exactly one member of a given pool will receive each message.
+
+ A message will be delivered to multiple pools, if more than
+ one is created.
+ """
+ callback_wrapper = rpc_amqp.CallbackWrapper(
+ conf=self.conf,
+ callback=callback,
+ connection_pool=rpc_amqp.get_connection_pool(self.conf,
+ Connection),
+ )
+ self.proxy_callbacks.append(callback_wrapper)
+ self.declare_topic_consumer(
+ queue_name=pool_name,
+ topic=topic,
+ exchange_name=exchange_name,
+ callback=callback_wrapper,
+ )
+
def create_connection(conf, new=True):
"""Create a connection"""
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 34262a3..b3c78bb 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -560,6 +560,34 @@ class Connection(object):
return consumer
+ def join_consumer_pool(self, callback, pool_name, topic,
+ exchange_name=None):
+ """Register as a member of a group of consumers for a given topic from
+ the specified exchange.
+
+ Exactly one member of a given pool will receive each message.
+
+ A message will be delivered to multiple pools, if more than
+ one is created.
+ """
+ callback_wrapper = rpc_amqp.CallbackWrapper(
+ conf=self.conf,
+ callback=callback,
+ connection_pool=rpc_amqp.get_connection_pool(self.conf,
+ Connection),
+ )
+ self.proxy_callbacks.append(callback_wrapper)
+
+ consumer = TopicConsumer(conf=self.conf,
+ session=self.session,
+ topic=topic,
+ callback=callback_wrapper,
+ name=pool_name,
+ exchange_name=exchange_name)
+
+ self._register_consumer(consumer)
+ return consumer
+
def create_connection(conf, new=True):
"""Create a connection"""
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index a201940..1bd23d5 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -26,6 +26,8 @@ import contextlib
import logging
import time
+import mock
+
from openstack.common import cfg
from openstack.common import exception
from openstack.common.rpc import amqp as rpc_amqp
@@ -495,6 +497,38 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
#Traceback should be included in exception message
self.assertTrue('exception.ApiError' in unicode(exc))
+ def test_create_worker(self):
+ meth = 'declare_topic_consumer'
+ with mock.patch.object(self.rpc.Connection, meth) as p:
+ conn = self.rpc.create_connection(FLAGS)
+ conn.create_worker(
+ 'topic.name',
+ lambda *a, **k: (a, k),
+ 'pool.name',
+ )
+ p.assert_called_with(
+ 'topic.name',
+ mock.ANY, # the proxy
+ 'pool.name',
+ )
+
+ def test_join_consumer_pool(self):
+ meth = 'declare_topic_consumer'
+ with mock.patch.object(self.rpc.Connection, meth) as p:
+ conn = self.rpc.create_connection(FLAGS)
+ conn.join_consumer_pool(
+ callback=lambda *a, **k: (a, k),
+ pool_name='pool.name',
+ topic='topic.name',
+ exchange_name='exchange.name',
+ )
+ p.assert_called_with(
+ callback=mock.ANY, # the callback wrapper
+ queue_name='pool.name',
+ exchange_name='exchange.name',
+ topic='topic.name',
+ )
+
class RpcKombuHATestCase(utils.BaseTestCase):
def setUp(self):
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index 1df94d5..016c5b9 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -184,6 +184,36 @@ class RpcQpidTestCase(utils.BaseTestCase):
)
connection.close()
+ def test_join_consumer_pool(self):
+ self.mock_connection = self.mox.CreateMock(self.orig_connection)
+ self.mock_session = self.mox.CreateMock(self.orig_session)
+ self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
+
+ self.mock_connection.opened().AndReturn(False)
+ self.mock_connection.open()
+ self.mock_connection.session().AndReturn(self.mock_session)
+ expected_address = (
+ 'exchange-name/impl_qpid_test ; {"node": {"x-declare": '
+ '{"auto-delete": true, "durable": true}, "type": "topic"}, '
+ '"create": "always", "link": {"x-declare": {"auto-delete": '
+ 'true, "exclusive": false, "durable": false}, "durable": '
+ 'true, "name": "impl.qpid.test.consumer.pool"}}')
+ self.mock_session.receiver(expected_address).AndReturn(
+ self.mock_receiver)
+ self.mock_receiver.capacity = 1
+ self.mock_connection.close()
+
+ self.mox.ReplayAll()
+
+ connection = impl_qpid.create_connection(FLAGS)
+ connection.join_consumer_pool(
+ callback=lambda *_x, **_y: None,
+ pool_name='impl.qpid.test.consumer.pool',
+ topic="impl_qpid_test",
+ exchange_name='exchange-name',
+ )
+ connection.close()
+
def test_topic_consumer(self):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)