diff options
author | Doug Hellmann <doug.hellmann@dreamhost.com> | 2013-02-13 18:02:22 -0500 |
---|---|---|
committer | Doug Hellmann <doug.hellmann@dreamhost.com> | 2013-02-13 18:02:22 -0500 |
commit | 74a9cbdc86580d256b6cd28ae57cd69213da6c44 (patch) | |
tree | b1520d653dd54e676a5ad96b52e596796174a958 | |
parent | eaab5fae2502198e9fa57d0d90a7204a2bd83b16 (diff) | |
download | oslo-74a9cbdc86580d256b6cd28ae57cd69213da6c44.tar.gz oslo-74a9cbdc86580d256b6cd28ae57cd69213da6c44.tar.xz oslo-74a9cbdc86580d256b6cd28ae57cd69213da6c44.zip |
Add join_consumer_pool() to RPC connections
blueprint move-listener-framework-oslo
bug 1047015
bug 1111632
Ceilometer and Quantum use private methods of the RPC connection
object to configure themselves to listen to a queue shared among a
group of workers. This change adds a public method to the RPC
connection to support this use case, without resorting to using
private API calls.
Change-Id: I3a89f1dfdcf8accca70cf305f7a31315bea093d8
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | openstack/common/rpc/amqp.py | 57 | ||||
-rw-r--r-- | openstack/common/rpc/common.py | 22 | ||||
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 24 | ||||
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 28 | ||||
-rw-r--r-- | tests/unit/rpc/test_kombu.py | 34 | ||||
-rw-r--r-- | tests/unit/rpc/test_qpid.py | 30 |
7 files changed, 188 insertions, 8 deletions
@@ -17,3 +17,4 @@ AUTHORS ChangeLog openstack/versioninfo *.egg +openstack/common/db/*.sqlite
\ No newline at end of file diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index d1cc9ea..3d7ad5e 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -137,6 +137,12 @@ class ConnectionContext(rpc_common.Connection): def create_worker(self, topic, proxy, pool_name): self.connection.create_worker(topic, proxy, pool_name) + def join_consumer_pool(self, callback, pool_name, topic, exchange_name): + self.connection.join_consumer_pool(callback, + pool_name, + topic, + exchange_name) + def consume_in_thread(self): self.connection.consume_in_thread() @@ -224,15 +230,54 @@ def pack_context(msg, context): msg.update(context_d) -class ProxyCallback(object): - """Calls methods on a proxy object based on method and args.""" +class _ThreadPoolWithWait(object): + """Base class for a delayed invocation manager used by + the Connection class to start up green threads + to handle incoming messages. + """ - def __init__(self, conf, proxy, connection_pool): - self.proxy = proxy + def __init__(self, conf, connection_pool): self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size) self.connection_pool = connection_pool self.conf = conf + def wait(self): + """Wait for all callback threads to exit.""" + self.pool.waitall() + + +class CallbackWrapper(_ThreadPoolWithWait): + """Wraps a straight callback to allow it to be invoked in a green + thread. + """ + + def __init__(self, conf, callback, connection_pool): + """ + :param conf: cfg.CONF instance + :param callback: a callable (probably a function) + :param connection_pool: connection pool as returned by + get_connection_pool() + """ + super(CallbackWrapper, self).__init__( + conf=conf, + connection_pool=connection_pool, + ) + self.callback = callback + + def __call__(self, message_data): + self.pool.spawn_n(self.callback, message_data) + + +class ProxyCallback(_ThreadPoolWithWait): + """Calls methods on a proxy object based on method and args.""" + + def __init__(self, conf, proxy, connection_pool): + super(ProxyCallback, self).__init__( + conf=conf, + connection_pool=connection_pool, + ) + self.proxy = proxy + def __call__(self, message_data): """Consumer callback to call a method on a proxy object. @@ -293,10 +338,6 @@ class ProxyCallback(object): ctxt.reply(None, sys.exc_info(), connection_pool=self.connection_pool) - def wait(self): - """Wait for all callback threads to exit.""" - self.pool.waitall() - class MulticallWaiter(object): def __init__(self, conf, connection, timeout): diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py index 357ee9d..55dad2d 100644 --- a/openstack/common/rpc/common.py +++ b/openstack/common/rpc/common.py @@ -196,6 +196,28 @@ class Connection(object): """ raise NotImplementedError() + def join_consumer_pool(self, callback, pool_name, topic, exchange_name): + """Register as a member of a group of consumers for a given topic from + the specified exchange. + + Exactly one member of a given pool will receive each message. + + A message will be delivered to multiple pools, if more than + one is created. + + :param callback: Callable to be invoked for each message. + :type callback: callable accepting one argument + :param pool_name: The name of the consumer pool. + :type pool_name: str + :param topic: The routing topic for desired messages. + :type topic: str + :param exchange_name: The name of the message exchange where + the client should attach. Defaults to + the configured exchange. + :type exchange_name: str + """ + raise NotImplementedError() + def consume_in_thread(self): """Spawn a thread to handle incoming messages. diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 4f3cc7a..e9b715d 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -750,6 +750,30 @@ class Connection(object): self.proxy_callbacks.append(proxy_cb) self.declare_topic_consumer(topic, proxy_cb, pool_name) + def join_consumer_pool(self, callback, pool_name, topic, + exchange_name=None): + """Register as a member of a group of consumers for a given topic from + the specified exchange. + + Exactly one member of a given pool will receive each message. + + A message will be delivered to multiple pools, if more than + one is created. + """ + callback_wrapper = rpc_amqp.CallbackWrapper( + conf=self.conf, + callback=callback, + connection_pool=rpc_amqp.get_connection_pool(self.conf, + Connection), + ) + self.proxy_callbacks.append(callback_wrapper) + self.declare_topic_consumer( + queue_name=pool_name, + topic=topic, + exchange_name=exchange_name, + callback=callback_wrapper, + ) + def create_connection(conf, new=True): """Create a connection""" diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 34262a3..b3c78bb 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -560,6 +560,34 @@ class Connection(object): return consumer + def join_consumer_pool(self, callback, pool_name, topic, + exchange_name=None): + """Register as a member of a group of consumers for a given topic from + the specified exchange. + + Exactly one member of a given pool will receive each message. + + A message will be delivered to multiple pools, if more than + one is created. + """ + callback_wrapper = rpc_amqp.CallbackWrapper( + conf=self.conf, + callback=callback, + connection_pool=rpc_amqp.get_connection_pool(self.conf, + Connection), + ) + self.proxy_callbacks.append(callback_wrapper) + + consumer = TopicConsumer(conf=self.conf, + session=self.session, + topic=topic, + callback=callback_wrapper, + name=pool_name, + exchange_name=exchange_name) + + self._register_consumer(consumer) + return consumer + def create_connection(conf, new=True): """Create a connection""" diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py index a201940..1bd23d5 100644 --- a/tests/unit/rpc/test_kombu.py +++ b/tests/unit/rpc/test_kombu.py @@ -26,6 +26,8 @@ import contextlib import logging import time +import mock + from openstack.common import cfg from openstack.common import exception from openstack.common.rpc import amqp as rpc_amqp @@ -495,6 +497,38 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): #Traceback should be included in exception message self.assertTrue('exception.ApiError' in unicode(exc)) + def test_create_worker(self): + meth = 'declare_topic_consumer' + with mock.patch.object(self.rpc.Connection, meth) as p: + conn = self.rpc.create_connection(FLAGS) + conn.create_worker( + 'topic.name', + lambda *a, **k: (a, k), + 'pool.name', + ) + p.assert_called_with( + 'topic.name', + mock.ANY, # the proxy + 'pool.name', + ) + + def test_join_consumer_pool(self): + meth = 'declare_topic_consumer' + with mock.patch.object(self.rpc.Connection, meth) as p: + conn = self.rpc.create_connection(FLAGS) + conn.join_consumer_pool( + callback=lambda *a, **k: (a, k), + pool_name='pool.name', + topic='topic.name', + exchange_name='exchange.name', + ) + p.assert_called_with( + callback=mock.ANY, # the callback wrapper + queue_name='pool.name', + exchange_name='exchange.name', + topic='topic.name', + ) + class RpcKombuHATestCase(utils.BaseTestCase): def setUp(self): diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py index 1df94d5..016c5b9 100644 --- a/tests/unit/rpc/test_qpid.py +++ b/tests/unit/rpc/test_qpid.py @@ -184,6 +184,36 @@ class RpcQpidTestCase(utils.BaseTestCase): ) connection.close() + def test_join_consumer_pool(self): + self.mock_connection = self.mox.CreateMock(self.orig_connection) + self.mock_session = self.mox.CreateMock(self.orig_session) + self.mock_receiver = self.mox.CreateMock(self.orig_receiver) + + self.mock_connection.opened().AndReturn(False) + self.mock_connection.open() + self.mock_connection.session().AndReturn(self.mock_session) + expected_address = ( + 'exchange-name/impl_qpid_test ; {"node": {"x-declare": ' + '{"auto-delete": true, "durable": true}, "type": "topic"}, ' + '"create": "always", "link": {"x-declare": {"auto-delete": ' + 'true, "exclusive": false, "durable": false}, "durable": ' + 'true, "name": "impl.qpid.test.consumer.pool"}}') + self.mock_session.receiver(expected_address).AndReturn( + self.mock_receiver) + self.mock_receiver.capacity = 1 + self.mock_connection.close() + + self.mox.ReplayAll() + + connection = impl_qpid.create_connection(FLAGS) + connection.join_consumer_pool( + callback=lambda *_x, **_y: None, + pool_name='impl.qpid.test.consumer.pool', + topic="impl_qpid_test", + exchange_name='exchange-name', + ) + connection.close() + def test_topic_consumer(self): self.mock_connection = self.mox.CreateMock(self.orig_connection) self.mock_session = self.mox.CreateMock(self.orig_session) |