diff options
Diffstat (limited to 'nova/fakerabbit.py')
| -rw-r--r-- | nova/fakerabbit.py | 31 |
1 files changed, 23 insertions, 8 deletions
diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py index a7dee8caf..e7e9dab77 100644 --- a/nova/fakerabbit.py +++ b/nova/fakerabbit.py @@ -31,6 +31,7 @@ LOG = logging.getLogger("nova.fakerabbit") EXCHANGES = {} QUEUES = {} +CONSUMERS = {} class Message(base.BaseMessage): @@ -96,17 +97,29 @@ class Backend(base.BaseBackend): ' key %(routing_key)s') % locals()) EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key) - def declare_consumer(self, queue, callback, *args, **kwargs): - self.current_queue = queue - self.current_callback = callback + def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs): + global CONSUMERS + LOG.debug("Adding consumer %s", consumer_tag) + CONSUMERS[consumer_tag] = (queue, callback) + + def cancel(self, consumer_tag): + global CONSUMERS + LOG.debug("Removing consumer %s", consumer_tag) + del CONSUMERS[consumer_tag] def consume(self, limit=None): + global CONSUMERS + num = 0 while True: - item = self.get(self.current_queue) - if item: - self.current_callback(item) - raise StopIteration() - greenthread.sleep(0) + for (queue, callback) in CONSUMERS.itervalues(): + item = self.get(queue) + if item: + callback(item) + num += 1 + yield + if limit and num == limit: + raise StopIteration() + greenthread.sleep(0.1) def get(self, queue, no_ack=False): global QUEUES @@ -134,5 +147,7 @@ class Backend(base.BaseBackend): def reset_all(): global EXCHANGES global QUEUES + global CONSUMERS EXCHANGES = {} QUEUES = {} + CONSUMERS = {} |
