summaryrefslogtreecommitdiffstats
path: root/nova/fakerabbit.py
diff options
context:
space:
mode:
authorSandy Walsh <sandy.walsh@rackspace.com>2011-05-27 07:31:29 -0700
committerSandy Walsh <sandy.walsh@rackspace.com>2011-05-27 07:31:29 -0700
commitceb6eee5ddbbd202af80ae32795bbf53d2e9ef49 (patch)
tree9cf90f9db1d5e563c5326c96d2d34e76a879d7a8 /nova/fakerabbit.py
parent3f911877a2a9facdf153f173b3fb76a18e44a2ac (diff)
parenta7c36f68793a7db454d344187d4596ebecc8ade0 (diff)
downloadnova-ceb6eee5ddbbd202af80ae32795bbf53d2e9ef49.tar.gz
nova-ceb6eee5ddbbd202af80ae32795bbf53d2e9ef49.tar.xz
nova-ceb6eee5ddbbd202af80ae32795bbf53d2e9ef49.zip
trunk merge
Diffstat (limited to 'nova/fakerabbit.py')
-rw-r--r--nova/fakerabbit.py31
1 files changed, 23 insertions, 8 deletions
diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py
index a7dee8caf..e7e9dab77 100644
--- a/nova/fakerabbit.py
+++ b/nova/fakerabbit.py
@@ -31,6 +31,7 @@ LOG = logging.getLogger("nova.fakerabbit")
EXCHANGES = {}
QUEUES = {}
+CONSUMERS = {}
class Message(base.BaseMessage):
@@ -96,17 +97,29 @@ class Backend(base.BaseBackend):
' key %(routing_key)s') % locals())
EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
- def declare_consumer(self, queue, callback, *args, **kwargs):
- self.current_queue = queue
- self.current_callback = callback
+ def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs):
+ global CONSUMERS
+ LOG.debug("Adding consumer %s", consumer_tag)
+ CONSUMERS[consumer_tag] = (queue, callback)
+
+ def cancel(self, consumer_tag):
+ global CONSUMERS
+ LOG.debug("Removing consumer %s", consumer_tag)
+ del CONSUMERS[consumer_tag]
def consume(self, limit=None):
+ global CONSUMERS
+ num = 0
while True:
- item = self.get(self.current_queue)
- if item:
- self.current_callback(item)
- raise StopIteration()
- greenthread.sleep(0)
+ for (queue, callback) in CONSUMERS.itervalues():
+ item = self.get(queue)
+ if item:
+ callback(item)
+ num += 1
+ yield
+ if limit and num == limit:
+ raise StopIteration()
+ greenthread.sleep(0.1)
def get(self, queue, no_ack=False):
global QUEUES
@@ -134,5 +147,7 @@ class Backend(base.BaseBackend):
def reset_all():
global EXCHANGES
global QUEUES
+ global CONSUMERS
EXCHANGES = {}
QUEUES = {}
+ CONSUMERS = {}