diff options
| author | Sandy Walsh <sandy.walsh@rackspace.com> | 2011-05-27 07:31:29 -0700 |
|---|---|---|
| committer | Sandy Walsh <sandy.walsh@rackspace.com> | 2011-05-27 07:31:29 -0700 |
| commit | ceb6eee5ddbbd202af80ae32795bbf53d2e9ef49 (patch) | |
| tree | 9cf90f9db1d5e563c5326c96d2d34e76a879d7a8 /nova/fakerabbit.py | |
| parent | 3f911877a2a9facdf153f173b3fb76a18e44a2ac (diff) | |
| parent | a7c36f68793a7db454d344187d4596ebecc8ade0 (diff) | |
| download | nova-ceb6eee5ddbbd202af80ae32795bbf53d2e9ef49.tar.gz nova-ceb6eee5ddbbd202af80ae32795bbf53d2e9ef49.tar.xz nova-ceb6eee5ddbbd202af80ae32795bbf53d2e9ef49.zip | |
trunk merge
Diffstat (limited to 'nova/fakerabbit.py')
| -rw-r--r-- | nova/fakerabbit.py | 31 |
1 files changed, 23 insertions, 8 deletions
diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py index a7dee8caf..e7e9dab77 100644 --- a/nova/fakerabbit.py +++ b/nova/fakerabbit.py @@ -31,6 +31,7 @@ LOG = logging.getLogger("nova.fakerabbit") EXCHANGES = {} QUEUES = {} +CONSUMERS = {} class Message(base.BaseMessage): @@ -96,17 +97,29 @@ class Backend(base.BaseBackend): ' key %(routing_key)s') % locals()) EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key) - def declare_consumer(self, queue, callback, *args, **kwargs): - self.current_queue = queue - self.current_callback = callback + def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs): + global CONSUMERS + LOG.debug("Adding consumer %s", consumer_tag) + CONSUMERS[consumer_tag] = (queue, callback) + + def cancel(self, consumer_tag): + global CONSUMERS + LOG.debug("Removing consumer %s", consumer_tag) + del CONSUMERS[consumer_tag] def consume(self, limit=None): + global CONSUMERS + num = 0 while True: - item = self.get(self.current_queue) - if item: - self.current_callback(item) - raise StopIteration() - greenthread.sleep(0) + for (queue, callback) in CONSUMERS.itervalues(): + item = self.get(queue) + if item: + callback(item) + num += 1 + yield + if limit and num == limit: + raise StopIteration() + greenthread.sleep(0.1) def get(self, queue, no_ack=False): global QUEUES @@ -134,5 +147,7 @@ class Backend(base.BaseBackend): def reset_all(): global EXCHANGES global QUEUES + global CONSUMERS EXCHANGES = {} QUEUES = {} + CONSUMERS = {} |
