summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--nova/fakerabbit.py22
1 files changed, 15 insertions, 7 deletions
diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py
index a7dee8caf..a29ba9d86 100644
--- a/nova/fakerabbit.py
+++ b/nova/fakerabbit.py
@@ -77,6 +77,10 @@ class Queue(object):
class Backend(base.BaseBackend):
+ def __init__(self, connection, **kwargs):
+ super(Backend, self).__init__(connection, **kwargs)
+ self.consumers = []
+
def queue_declare(self, queue, **kwargs):
global QUEUES
if queue not in QUEUES:
@@ -97,16 +101,20 @@ class Backend(base.BaseBackend):
EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
def declare_consumer(self, queue, callback, *args, **kwargs):
- self.current_queue = queue
- self.current_callback = callback
+ self.consumers.append((queue, callback))
def consume(self, limit=None):
+ num = 0
while True:
- item = self.get(self.current_queue)
- if item:
- self.current_callback(item)
- raise StopIteration()
- greenthread.sleep(0)
+ for (queue, callback) in self.consumers:
+ item = self.get(queue)
+ if item:
+ callback(item)
+ num += 1
+ yield
+ if limit and num == limit:
+ raise StopIteration()
+ greenthread.sleep(0.1)
def get(self, queue, no_ack=False):
global QUEUES