diff options
| author | Doug Hellmann <doug.hellmann@dreamhost.com> | 2012-05-18 15:13:20 -0400 |
|---|---|---|
| committer | Doug Hellmann <doug.hellmann@dreamhost.com> | 2012-05-24 15:42:30 -0400 |
| commit | 89fccd50d740a0cd68e7993e94536e68641d3337 (patch) | |
| tree | 4b47834cdf125631eb23d90ea7f12f15d2616a65 | |
| parent | d7eef34fdea8f94068dd2d745cba1ae493af61a9 (diff) | |
| download | nova-89fccd50d740a0cd68e7993e94536e68641d3337.tar.gz nova-89fccd50d740a0cd68e7993e94536e68641d3337.tar.xz nova-89fccd50d740a0cd68e7993e94536e68641d3337.zip | |
add queue name argument to TopicConsumer
ceilometer is going to want to subscribe several worker processes
to the notifications.info topic queue. The pool of workers needs
to be assured of receiving all messages, without interference from
other clients listening for notifications. The TopicConsumer
class always assumes the topic and queue name should be the same,
but in the ceilometer case we want to use a separate named queue
with a different name. The only parameter that cannot be
controlled by the user of TopicConsumer is the queue name,
so this change adds the ability to set the queue name.
Change-Id: I41a525de2fd855ca30c24fafcfbfefd6ab615dd7
| -rw-r--r-- | nova/rpc/amqp.py | 3 | ||||
| -rw-r--r-- | nova/rpc/common.py | 19 | ||||
| -rw-r--r-- | nova/rpc/impl_kombu.py | 32 | ||||
| -rw-r--r-- | nova/rpc/impl_qpid.py | 32 | ||||
| -rw-r--r-- | nova/tests/rpc/test_kombu.py | 25 | ||||
| -rw-r--r-- | nova/tests/rpc/test_qpid.py | 29 |
6 files changed, 124 insertions, 16 deletions
diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py index 0e079f533..4850eb47e 100644 --- a/nova/rpc/amqp.py +++ b/nova/rpc/amqp.py @@ -132,6 +132,9 @@ class ConnectionContext(rpc_common.Connection): def create_consumer(self, topic, proxy, fanout=False): self.connection.create_consumer(topic, proxy, fanout) + def create_worker(self, topic, proxy, pool_name): + self.connection.create_worker(topic, proxy, pool_name) + def consume_in_thread(self): self.connection.consume_in_thread() diff --git a/nova/rpc/common.py b/nova/rpc/common.py index 886917733..120b41c14 100644 --- a/nova/rpc/common.py +++ b/nova/rpc/common.py @@ -131,6 +131,25 @@ class Connection(object): """ raise NotImplementedError() + def create_worker(self, conf, topic, proxy, pool_name): + """Create a worker on this connection. + + A worker is like a regular consumer of messages directed to a + topic, except that it is part of a set of such consumers (the + "pool") which may run in parallel. Every pool of workers will + receive a given message, but only one worker in the pool will + be asked to process it. Load is distributed across the members + of the pool in round-robin fashion. + + :param conf: An openstack.common.cfg configuration object. + :param topic: This is a name associated with what to consume from. + Multiple instances of a service may consume from the same + topic. + :param proxy: The object that will handle all incoming messages. + :param pool_name: String containing the name of the pool of workers + """ + raise NotImplementedError() + def consume_in_thread(self): """Spawn a thread to handle incoming messages. diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index 7285acbe3..ecabf4ded 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import functools import itertools import socket import ssl @@ -156,15 +157,19 @@ class DirectConsumer(ConsumerBase): class TopicConsumer(ConsumerBase): """Consumer class for 'topic'""" - def __init__(self, conf, channel, topic, callback, tag, **kwargs): + def __init__(self, conf, channel, topic, callback, tag, name=None, + **kwargs): """Init a 'topic' queue. - 'channel' is the amqp channel to use - 'topic' is the topic to listen on - 'callback' is the callback to call when messages are received - 'tag' is a unique ID for the consumer on the channel + :param channel: the amqp channel to use + :param topic: the topic to listen on + :paramtype topic: str + :param callback: the callback to call when messages are received + :param tag: a unique ID for the consumer on the channel + :param name: optional queue name, defaults to topic + :paramtype name: str - Other kombu options may be passed + Other kombu options may be passed as keyword arguments """ # Default options options = {'durable': conf.rabbit_durable_queues, @@ -180,7 +185,7 @@ class TopicConsumer(ConsumerBase): channel, callback, tag, - name=topic, + name=name or topic, exchange=exchange, routing_key=topic, **options) @@ -602,9 +607,12 @@ class Connection(object): """ self.declare_consumer(DirectConsumer, topic, callback) - def declare_topic_consumer(self, topic, callback=None): + def declare_topic_consumer(self, topic, callback=None, queue_name=None): """Create a 'topic' consumer.""" - self.declare_consumer(TopicConsumer, topic, callback) + self.declare_consumer(functools.partial(TopicConsumer, + name=queue_name, + ), + topic, callback) def declare_fanout_consumer(self, topic, callback): """Create a 'fanout' consumer""" @@ -656,6 +664,12 @@ class Connection(object): else: self.declare_topic_consumer(topic, proxy_cb) + def create_worker(self, topic, proxy, pool_name): + """Create a worker that calls a method in a proxy object""" + proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, + rpc_amqp.get_connection_pool(self, Connection)) + self.declare_topic_consumer(topic, proxy_cb, pool_name) + def create_connection(conf, new=True): """Create a connection""" diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py index f2f669a54..dad47defc 100644 --- a/nova/rpc/impl_qpid.py +++ b/nova/rpc/impl_qpid.py @@ -15,6 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. +import functools import itertools import json import time @@ -161,17 +162,19 @@ class DirectConsumer(ConsumerBase): class TopicConsumer(ConsumerBase): """Consumer class for 'topic'""" - def __init__(self, conf, session, topic, callback): + def __init__(self, conf, session, topic, callback, name=None): """Init a 'topic' queue. - 'session' is the amqp session to use - 'topic' is the topic to listen on - 'callback' is the callback to call when messages are received + :param session: the amqp session to use + :param topic: is the topic to listen on + :paramtype topic: str + :param callback: the callback to call when messages are received + :param name: optional queue name, defaults to topic """ super(TopicConsumer, self).__init__(session, callback, "%s/%s" % (conf.control_exchange, topic), {}, - topic, {}) + name or topic, {}) class FanoutConsumer(ConsumerBase): @@ -448,9 +451,12 @@ class Connection(object): """ self.declare_consumer(DirectConsumer, topic, callback) - def declare_topic_consumer(self, topic, callback=None): + def declare_topic_consumer(self, topic, callback=None, queue_name=None): """Create a 'topic' consumer.""" - self.declare_consumer(TopicConsumer, topic, callback) + self.declare_consumer(functools.partial(TopicConsumer, + name=queue_name, + ), + topic, callback) def declare_fanout_consumer(self, topic, callback): """Create a 'fanout' consumer""" @@ -506,6 +512,18 @@ class Connection(object): return consumer + def create_worker(self, topic, proxy, pool_name): + """Create a worker that calls a method in a proxy object""" + proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, + rpc_amqp.get_connection_pool(self, Connection)) + + consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, + name=pool_name) + + self._register_consumer(consumer) + + return consumer + def create_connection(conf, new=True): """Create a connection""" diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py index 837495273..a69dcdfe9 100644 --- a/nova/tests/rpc/test_kombu.py +++ b/nova/tests/rpc/test_kombu.py @@ -102,6 +102,31 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.assertEqual(self.received_message, message) @test.skip_if(kombu is None, "Test requires kombu") + def test_topic_multiple_queues(self): + """Test sending to a topic exchange with multiple queues""" + + conn = self.rpc.create_connection(FLAGS) + message = 'topic test message' + + self.received_message_1 = None + self.received_message_2 = None + + def _callback1(message): + self.received_message_1 = message + + def _callback2(message): + self.received_message_2 = message + + conn.declare_topic_consumer('a_topic', _callback1, queue_name='queue1') + conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2') + conn.topic_send('a_topic', message) + conn.consume(limit=2) + conn.close() + + self.assertEqual(self.received_message_1, message) + self.assertEqual(self.received_message_2, message) + + @test.skip_if(kombu is None, "Test requires kombu") def test_direct_send_receive(self): """Test sending to a direct exchange/queue""" conn = self.rpc.create_connection(FLAGS) diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py index f01c8a25b..c523f3fe3 100644 --- a/nova/tests/rpc/test_qpid.py +++ b/nova/tests/rpc/test_qpid.py @@ -147,6 +147,35 @@ class RpcQpidTestCase(test.TestCase): def test_create_consumer_fanout(self): self._test_create_consumer(fanout=True) + @test.skip_if(qpid is None, "Test requires qpid") + def test_create_worker(self): + self.mock_connection = self.mox.CreateMock(self.orig_connection) + self.mock_session = self.mox.CreateMock(self.orig_session) + self.mock_receiver = self.mox.CreateMock(self.orig_receiver) + + self.mock_connection.opened().AndReturn(False) + self.mock_connection.open() + self.mock_connection.session().AndReturn(self.mock_session) + expected_address = ( + 'nova/impl_qpid_test ; {"node": {"x-declare": ' + '{"auto-delete": true, "durable": true}, "type": "topic"}, ' + '"create": "always", "link": {"x-declare": {"auto-delete": ' + 'true, "exclusive": false, "durable": false}, "durable": ' + 'true, "name": "impl.qpid.test.workers"}}') + self.mock_session.receiver(expected_address).AndReturn( + self.mock_receiver) + self.mock_receiver.capacity = 1 + self.mock_connection.close() + + self.mox.ReplayAll() + + connection = impl_qpid.create_connection(FLAGS) + connection.create_worker("impl_qpid_test", + lambda *_x, **_y: None, + 'impl.qpid.test.workers', + ) + connection.close() + def _test_cast(self, fanout, server_params=None): self.mock_connection = self.mox.CreateMock(self.orig_connection) self.mock_session = self.mox.CreateMock(self.orig_session) |
