summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris Behrens <cbehrens@codestud.com>2011-08-28 17:33:11 -0700
committerChris Behrens <cbehrens@codestud.com>2011-08-28 17:33:11 -0700
commit256cb956abeff85f3cddce499b488dd112c4137d (patch)
treede39ec816b7f94078b41fb7a340f0124f22d61ee
parent4faaf9c37d030d68cfea818d396963e3ed7deeaa (diff)
start to rework some consumer stuff
-rw-r--r--nova/rpc/impl_kombu.py127
-rw-r--r--nova/service.py24
2 files changed, 81 insertions, 70 deletions
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py
index db839dd2a..01871606c 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/rpc/impl_kombu.py
@@ -35,11 +35,11 @@ flags.DEFINE_integer('rpc_thread_pool_size', 1024,
'Size of RPC thread pool')
-class QueueBase(object):
- """Queue base class."""
+class ConsumerBase(object):
+ """Consumer base class."""
def __init__(self, channel, callback, tag, **kwargs):
- """Init the queue.
+ """Declare a queue on an amqp channel.
'channel' is the amqp channel to use
'callback' is the callback to call when messages are received
@@ -55,20 +55,21 @@ class QueueBase(object):
self.reconnect(channel)
def reconnect(self, channel):
- """Re-create the queue after a rabbit reconnect"""
+ """Re-declare the queue after a rabbit reconnect"""
self.channel = channel
self.kwargs['channel'] = channel
self.queue = kombu.entity.Queue(**self.kwargs)
self.queue.declare()
def consume(self, *args, **kwargs):
- """Consume from this queue.
+ """Actually declare the consumer on the amqp channel. This will
+ start the flow of messages from the queue. Using the
+ Connection.iterconsume() iterator will process the messages,
+ calling the appropriate callback.
+
If a callback is specified in kwargs, use that. Otherwise,
use the callback passed during __init__()
- The callback will be called if a message was read off of the
- queue.
-
If kwargs['nowait'] is True, then this call will block until
a message is read.
@@ -100,7 +101,7 @@ class QueueBase(object):
self.queue = None
-class DirectQueue(QueueBase):
+class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'"""
def __init__(self, channel, msg_id, callback, tag, **kwargs):
@@ -123,7 +124,7 @@ class DirectQueue(QueueBase):
type='direct',
durable=options['durable'],
auto_delete=options['auto_delete'])
- super(DirectQueue, self).__init__(
+ super(DirectConsumer, self).__init__(
channel,
callback,
tag,
@@ -133,8 +134,8 @@ class DirectQueue(QueueBase):
**options)
-class TopicQueue(QueueBase):
- """Queue/consumer class for 'topic'"""
+class TopicConsumer(ConsumerBase):
+ """Consumer class for 'topic'"""
def __init__(self, channel, topic, callback, tag, **kwargs):
"""Init a 'topic' queue.
@@ -156,7 +157,7 @@ class TopicQueue(QueueBase):
type='topic',
durable=options['durable'],
auto_delete=options['auto_delete'])
- super(TopicQueue, self).__init__(
+ super(TopicConsumer, self).__init__(
channel,
callback,
tag,
@@ -166,8 +167,8 @@ class TopicQueue(QueueBase):
**options)
-class FanoutQueue(QueueBase):
- """Queue/consumer class for 'fanout'"""
+class FanoutConsumer(ConsumerBase):
+ """Consumer class for 'fanout'"""
def __init__(self, channel, topic, callback, tag, **kwargs):
"""Init a 'fanout' queue.
@@ -193,7 +194,7 @@ class FanoutQueue(QueueBase):
type='fanout',
durable=options['durable'],
auto_delete=options['auto_delete'])
- super(FanoutQueue, self).__init__(
+ super(FanoutConsumer, self).__init__(
channel,
callback,
tag,
@@ -286,7 +287,8 @@ class Connection(object):
"""Connection instance object."""
def __init__(self):
- self.queues = []
+ self.consumers = []
+ self.consumer_thread = None
self.max_retries = FLAGS.rabbit_max_retries
# Try forever?
if self.max_retries <= 0:
@@ -334,9 +336,9 @@ class Connection(object):
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d' %
self.params))
self.channel = self.connection.channel()
- for consumer in self.queues:
+ for consumer in self.consumers:
consumer.reconnect(self.channel)
- if self.queues:
+ if self.consumers:
LOG.debug(_("Re-established AMQP queues"))
def get_channel(self):
@@ -354,30 +356,32 @@ class Connection(object):
def close(self):
"""Close/release this connection"""
+ self.cancel_consumer_thread()
self.connection.release()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
+ self.cancel_consumer_thread()
self.channel.close()
self.channel = self.connection.channel()
- self.queues = []
+ self.consumers = []
- def create_queue(self, queue_cls, topic, callback):
- """Create a queue using the class that was passed in and
- add it to our list of queues used for consuming
+ def declare_consumer(self, consumer_cls, topic, callback):
+ """Create a Consumer using the class that was passed in and
+ add it to our list of consumers
"""
- queue = queue_cls(self.channel, topic, callback,
- self.queue_num.next())
- self.queues.append(queue)
- return queue
+ consumer = consumer_cls(self.channel, topic, callback,
+ self.consumer_num.next())
+ self.consumers.append(consumer)
+ return consumer
- def consume(self, limit=None):
- """Consume from all queues"""
+ def iterconsume(self, limit=None):
+ """Return an iterator that will consume from all queues/consumers"""
while True:
try:
- queues_head = self.queues[:-1]
- queues_tail = self.queues[-1]
+ queues_head = self.consumers[:-1]
+ queues_tail = self.consumers[-1]
for queue in queues_head:
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
@@ -391,6 +395,36 @@ class Connection(object):
'%s' % str(e)))
self.reconnect()
+ def consume(self, limit=None):
+ """Consume from all queues/consumers"""
+ it = self.iterconsume(limit=limit)
+ while True:
+ try:
+ it.next()
+ except StopIteration:
+ return
+
+ def consume_in_thread(self):
+ """Consumer from all queues/consumers in a greenthread"""
+ def _consumer_thread():
+ try:
+ self.consume()
+ except greenlet.GreenletExit:
+ return
+ if not self.consumer_thread:
+ self.consumer_thread = eventlet.spawn(_consumer_thread)
+ return self.consumer_thread
+
+ def cancel_consumer_thread(self):
+ """Cancel a consumer thread"""
+ if self.consumer_thread:
+ self.consumer_thread.kill()
+ try:
+ self.consumer_thread.wait()
+ except greenlet.GreenletExit:
+ pass
+ self.consumer_thread = None
+
def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class"""
while True:
@@ -408,20 +442,20 @@ class Connection(object):
except self.connection.connection_errors, e:
pass
- def direct_consumer(self, topic, callback):
+ def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
In nova's use, this is generally a msg_id queue used for
responses for call/multicall
"""
- return self.create_queue(DirectQueue, topic, callback)
+ self.declare_consumer(DirectConsumer, topic, callback)
- def topic_consumer(self, topic, callback=None):
- """Create a 'topic' queue."""
- return self.create_queue(TopicQueue, topic, callback)
+ def declare_topic_consumer(self, topic, callback=None):
+ """Create a 'topic' consumer."""
+ self.declare_consumer(TopicConsumer, topic, callback)
- def fanout_consumer(self, topic, callback):
- """Create a 'fanout' queue"""
- return self.create_queue(FanoutQueue, topic, callback)
+ def declare_fanout_consumer(self, topic, callback):
+ """Create a 'fanout' consumer"""
+ self.declare_consumer(FanoutConsumer, topic, callback)
def direct_send(self, msg_id, msg):
"""Send a 'direct' message"""
@@ -638,18 +672,9 @@ def create_connection(new=True):
def create_consumer(conn, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
if fanout:
- return conn.fanout_consumer(topic, ProxyCallback(proxy))
+ conn.declare_fanout_consumer(topic, ProxyCallback(proxy))
else:
- return conn.topic_consumer(topic, ProxyCallback(proxy))
-
-
-def create_consumer_set(conn, consumers):
- # FIXME(comstud): Replace this however necessary
- # Returns an object that you can call .wait() on to consume
- # all queues?
- # Needs to have a .close() which will stop consuming?
- # Needs to also have an method for tests?
- raise NotImplemented
+ conn.declare_topic_consumer(topic, ProxyCallback(proxy))
def multicall(context, topic, msg):
@@ -666,7 +691,7 @@ def multicall(context, topic, msg):
conn = ConnectionContext()
wait_msg = MulticallWaiter(conn)
- conn.direct_consumer(msg_id, wait_msg)
+ conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg)
return wait_msg
diff --git a/nova/service.py b/nova/service.py
index a872a36ee..ab7925eb3 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -153,26 +153,17 @@ class Service(object):
self.topic)
# Share this same connection for these Consumers
- consumer_all = rpc.create_consumer(self.conn, self.topic, self,
+ rpc.create_consumer(self.conn, self.topic, self,
fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
- consumer_node = rpc.create_consumer(self.conn, node_topic, self,
+ rpc.create_consumer(self.conn, node_topic, self,
fanout=False)
- fanout = rpc.create_consumer(self.conn, self.topic, self, fanout=True)
+ rpc.create_consumer(self.conn, self.topic, self, fanout=True)
- consumers = [consumer_all, consumer_node, fanout]
- consumer_set = rpc.create_consumer_set(self.conn, consumers)
-
- # Wait forever, processing these consumers
- def _wait():
- try:
- consumer_set.wait()
- finally:
- consumer_set.close()
-
- self.consumer_set_thread = eventlet.spawn(_wait)
+ # Consume from all consumers in a thread
+ self.conn.consume_in_thread()
if self.report_interval:
pulse = utils.LoopingCall(self.report_state)
@@ -237,11 +228,6 @@ class Service(object):
logging.warn(_('Service killed that has no database entry'))
def stop(self):
- self.consumer_set_thread.kill()
- try:
- self.consumer_set_thread.wait()
- except greenlet.GreenletExit:
- pass
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try: