summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDoug Hellmann <doug.hellmann@dreamhost.com>2012-05-18 15:13:20 -0400
committerDoug Hellmann <doug.hellmann@dreamhost.com>2012-05-24 15:42:30 -0400
commit89fccd50d740a0cd68e7993e94536e68641d3337 (patch)
tree4b47834cdf125631eb23d90ea7f12f15d2616a65
parentd7eef34fdea8f94068dd2d745cba1ae493af61a9 (diff)
downloadnova-89fccd50d740a0cd68e7993e94536e68641d3337.tar.gz
nova-89fccd50d740a0cd68e7993e94536e68641d3337.tar.xz
nova-89fccd50d740a0cd68e7993e94536e68641d3337.zip
add queue name argument to TopicConsumer
ceilometer is going to want to subscribe several worker processes to the notifications.info topic queue. The pool of workers needs to be assured of receiving all messages, without interference from other clients listening for notifications. The TopicConsumer class always assumes the topic and queue name should be the same, but in the ceilometer case we want to use a separate named queue with a different name. The only parameter that cannot be controlled by the user of TopicConsumer is the queue name, so this change adds the ability to set the queue name. Change-Id: I41a525de2fd855ca30c24fafcfbfefd6ab615dd7
-rw-r--r--nova/rpc/amqp.py3
-rw-r--r--nova/rpc/common.py19
-rw-r--r--nova/rpc/impl_kombu.py32
-rw-r--r--nova/rpc/impl_qpid.py32
-rw-r--r--nova/tests/rpc/test_kombu.py25
-rw-r--r--nova/tests/rpc/test_qpid.py29
6 files changed, 124 insertions, 16 deletions
diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py
index 0e079f533..4850eb47e 100644
--- a/nova/rpc/amqp.py
+++ b/nova/rpc/amqp.py
@@ -132,6 +132,9 @@ class ConnectionContext(rpc_common.Connection):
def create_consumer(self, topic, proxy, fanout=False):
self.connection.create_consumer(topic, proxy, fanout)
+ def create_worker(self, topic, proxy, pool_name):
+ self.connection.create_worker(topic, proxy, pool_name)
+
def consume_in_thread(self):
self.connection.consume_in_thread()
diff --git a/nova/rpc/common.py b/nova/rpc/common.py
index 886917733..120b41c14 100644
--- a/nova/rpc/common.py
+++ b/nova/rpc/common.py
@@ -131,6 +131,25 @@ class Connection(object):
"""
raise NotImplementedError()
+ def create_worker(self, conf, topic, proxy, pool_name):
+ """Create a worker on this connection.
+
+ A worker is like a regular consumer of messages directed to a
+ topic, except that it is part of a set of such consumers (the
+ "pool") which may run in parallel. Every pool of workers will
+ receive a given message, but only one worker in the pool will
+ be asked to process it. Load is distributed across the members
+ of the pool in round-robin fashion.
+
+ :param conf: An openstack.common.cfg configuration object.
+ :param topic: This is a name associated with what to consume from.
+ Multiple instances of a service may consume from the same
+ topic.
+ :param proxy: The object that will handle all incoming messages.
+ :param pool_name: String containing the name of the pool of workers
+ """
+ raise NotImplementedError()
+
def consume_in_thread(self):
"""Spawn a thread to handle incoming messages.
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py
index 7285acbe3..ecabf4ded 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/rpc/impl_kombu.py
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import functools
import itertools
import socket
import ssl
@@ -156,15 +157,19 @@ class DirectConsumer(ConsumerBase):
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
- def __init__(self, conf, channel, topic, callback, tag, **kwargs):
+ def __init__(self, conf, channel, topic, callback, tag, name=None,
+ **kwargs):
"""Init a 'topic' queue.
- 'channel' is the amqp channel to use
- 'topic' is the topic to listen on
- 'callback' is the callback to call when messages are received
- 'tag' is a unique ID for the consumer on the channel
+ :param channel: the amqp channel to use
+ :param topic: the topic to listen on
+ :paramtype topic: str
+ :param callback: the callback to call when messages are received
+ :param tag: a unique ID for the consumer on the channel
+ :param name: optional queue name, defaults to topic
+ :paramtype name: str
- Other kombu options may be passed
+ Other kombu options may be passed as keyword arguments
"""
# Default options
options = {'durable': conf.rabbit_durable_queues,
@@ -180,7 +185,7 @@ class TopicConsumer(ConsumerBase):
channel,
callback,
tag,
- name=topic,
+ name=name or topic,
exchange=exchange,
routing_key=topic,
**options)
@@ -602,9 +607,12 @@ class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None):
"""Create a 'topic' consumer."""
- self.declare_consumer(TopicConsumer, topic, callback)
+ self.declare_consumer(functools.partial(TopicConsumer,
+ name=queue_name,
+ ),
+ topic, callback)
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
@@ -656,6 +664,12 @@ class Connection(object):
else:
self.declare_topic_consumer(topic, proxy_cb)
+ def create_worker(self, topic, proxy, pool_name):
+ """Create a worker that calls a method in a proxy object"""
+ proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
+ rpc_amqp.get_connection_pool(self, Connection))
+ self.declare_topic_consumer(topic, proxy_cb, pool_name)
+
def create_connection(conf, new=True):
"""Create a connection"""
diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py
index f2f669a54..dad47defc 100644
--- a/nova/rpc/impl_qpid.py
+++ b/nova/rpc/impl_qpid.py
@@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import functools
import itertools
import json
import time
@@ -161,17 +162,19 @@ class DirectConsumer(ConsumerBase):
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
- def __init__(self, conf, session, topic, callback):
+ def __init__(self, conf, session, topic, callback, name=None):
"""Init a 'topic' queue.
- 'session' is the amqp session to use
- 'topic' is the topic to listen on
- 'callback' is the callback to call when messages are received
+ :param session: the amqp session to use
+ :param topic: is the topic to listen on
+ :paramtype topic: str
+ :param callback: the callback to call when messages are received
+ :param name: optional queue name, defaults to topic
"""
super(TopicConsumer, self).__init__(session, callback,
"%s/%s" % (conf.control_exchange, topic), {},
- topic, {})
+ name or topic, {})
class FanoutConsumer(ConsumerBase):
@@ -448,9 +451,12 @@ class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None):
"""Create a 'topic' consumer."""
- self.declare_consumer(TopicConsumer, topic, callback)
+ self.declare_consumer(functools.partial(TopicConsumer,
+ name=queue_name,
+ ),
+ topic, callback)
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
@@ -506,6 +512,18 @@ class Connection(object):
return consumer
+ def create_worker(self, topic, proxy, pool_name):
+ """Create a worker that calls a method in a proxy object"""
+ proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
+ rpc_amqp.get_connection_pool(self, Connection))
+
+ consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
+ name=pool_name)
+
+ self._register_consumer(consumer)
+
+ return consumer
+
def create_connection(conf, new=True):
"""Create a connection"""
diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py
index 837495273..a69dcdfe9 100644
--- a/nova/tests/rpc/test_kombu.py
+++ b/nova/tests/rpc/test_kombu.py
@@ -102,6 +102,31 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
self.assertEqual(self.received_message, message)
@test.skip_if(kombu is None, "Test requires kombu")
+ def test_topic_multiple_queues(self):
+ """Test sending to a topic exchange with multiple queues"""
+
+ conn = self.rpc.create_connection(FLAGS)
+ message = 'topic test message'
+
+ self.received_message_1 = None
+ self.received_message_2 = None
+
+ def _callback1(message):
+ self.received_message_1 = message
+
+ def _callback2(message):
+ self.received_message_2 = message
+
+ conn.declare_topic_consumer('a_topic', _callback1, queue_name='queue1')
+ conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2')
+ conn.topic_send('a_topic', message)
+ conn.consume(limit=2)
+ conn.close()
+
+ self.assertEqual(self.received_message_1, message)
+ self.assertEqual(self.received_message_2, message)
+
+ @test.skip_if(kombu is None, "Test requires kombu")
def test_direct_send_receive(self):
"""Test sending to a direct exchange/queue"""
conn = self.rpc.create_connection(FLAGS)
diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py
index f01c8a25b..c523f3fe3 100644
--- a/nova/tests/rpc/test_qpid.py
+++ b/nova/tests/rpc/test_qpid.py
@@ -147,6 +147,35 @@ class RpcQpidTestCase(test.TestCase):
def test_create_consumer_fanout(self):
self._test_create_consumer(fanout=True)
+ @test.skip_if(qpid is None, "Test requires qpid")
+ def test_create_worker(self):
+ self.mock_connection = self.mox.CreateMock(self.orig_connection)
+ self.mock_session = self.mox.CreateMock(self.orig_session)
+ self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
+
+ self.mock_connection.opened().AndReturn(False)
+ self.mock_connection.open()
+ self.mock_connection.session().AndReturn(self.mock_session)
+ expected_address = (
+ 'nova/impl_qpid_test ; {"node": {"x-declare": '
+ '{"auto-delete": true, "durable": true}, "type": "topic"}, '
+ '"create": "always", "link": {"x-declare": {"auto-delete": '
+ 'true, "exclusive": false, "durable": false}, "durable": '
+ 'true, "name": "impl.qpid.test.workers"}}')
+ self.mock_session.receiver(expected_address).AndReturn(
+ self.mock_receiver)
+ self.mock_receiver.capacity = 1
+ self.mock_connection.close()
+
+ self.mox.ReplayAll()
+
+ connection = impl_qpid.create_connection(FLAGS)
+ connection.create_worker("impl_qpid_test",
+ lambda *_x, **_y: None,
+ 'impl.qpid.test.workers',
+ )
+ connection.close()
+
def _test_cast(self, fanout, server_params=None):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)