From f2c2a593c828fc86e298d3eb31672a09b498c41f Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Wed, 25 May 2011 15:42:24 -0700 Subject: fakerabbit's declare_consumer should support more than 1 consumer. also: make fakerabbit Backend.consume be an iterator like it should be.. --- nova/fakerabbit.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) (limited to 'nova/fakerabbit.py') diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py index a7dee8caf..a29ba9d86 100644 --- a/nova/fakerabbit.py +++ b/nova/fakerabbit.py @@ -77,6 +77,10 @@ class Queue(object): class Backend(base.BaseBackend): + def __init__(self, connection, **kwargs): + super(Backend, self).__init__(connection, **kwargs) + self.consumers = [] + def queue_declare(self, queue, **kwargs): global QUEUES if queue not in QUEUES: @@ -97,16 +101,20 @@ class Backend(base.BaseBackend): EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key) def declare_consumer(self, queue, callback, *args, **kwargs): - self.current_queue = queue - self.current_callback = callback + self.consumers.append((queue, callback)) def consume(self, limit=None): + num = 0 while True: - item = self.get(self.current_queue) - if item: - self.current_callback(item) - raise StopIteration() - greenthread.sleep(0) + for (queue, callback) in self.consumers: + item = self.get(queue) + if item: + callback(item) + num += 1 + yield + if limit and num == limit: + raise StopIteration() + greenthread.sleep(0.1) def get(self, queue, no_ack=False): global QUEUES -- cgit From 90e30806a2e0c235612eb09792656cd861997f84 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Wed, 25 May 2011 15:42:24 -0700 Subject: fix consumers to actually be deleted and clean up cloud test --- nova/fakerabbit.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'nova/fakerabbit.py') diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py index a29ba9d86..5f3e75c48 100644 --- a/nova/fakerabbit.py +++ b/nova/fakerabbit.py @@ -79,7 +79,7 @@ class Queue(object): class Backend(base.BaseBackend): def __init__(self, connection, **kwargs): super(Backend, self).__init__(connection, **kwargs) - self.consumers = [] + self.consumers = {} def queue_declare(self, queue, **kwargs): global QUEUES @@ -100,13 +100,18 @@ class Backend(base.BaseBackend): ' key %(routing_key)s') % locals()) EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key) - def declare_consumer(self, queue, callback, *args, **kwargs): - self.consumers.append((queue, callback)) + def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs): + LOG.debug("Adding consumer %s", consumer_tag) + self.consumers[consumer_tag] = (queue, callback) + + def cancel(self, consumer_tag): + LOG.debug("Removing consumer %s", consumer_tag) + del self.consumers[consumer_tag] def consume(self, limit=None): num = 0 while True: - for (queue, callback) in self.consumers: + for (queue, callback) in self.consumers.itervalues(): item = self.get(queue) if item: callback(item) -- cgit From c9b21b0619891c069251c568e4d89be791af56c3 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Wed, 25 May 2011 15:42:24 -0700 Subject: lots of fixes for rpc and extra imports --- nova/fakerabbit.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'nova/fakerabbit.py') diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py index 5f3e75c48..ff993e29a 100644 --- a/nova/fakerabbit.py +++ b/nova/fakerabbit.py @@ -31,6 +31,7 @@ LOG = logging.getLogger("nova.fakerabbit") EXCHANGES = {} QUEUES = {} +CONSUMERS = {} class Message(base.BaseMessage): @@ -101,17 +102,20 @@ class Backend(base.BaseBackend): EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key) def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs): + global CONSUMERS LOG.debug("Adding consumer %s", consumer_tag) - self.consumers[consumer_tag] = (queue, callback) + CONSUMERS[consumer_tag] = (queue, callback) def cancel(self, consumer_tag): + global CONSUMERS LOG.debug("Removing consumer %s", consumer_tag) - del self.consumers[consumer_tag] + del CONSUMERS[consumer_tag] def consume(self, limit=None): + global CONSUMERS num = 0 while True: - for (queue, callback) in self.consumers.itervalues(): + for (queue, callback) in CONSUMERS.itervalues(): item = self.get(queue) if item: callback(item) @@ -147,5 +151,7 @@ class Backend(base.BaseBackend): def reset_all(): global EXCHANGES global QUEUES + global CONSUMERS EXCHANGES = {} QUEUES = {} + CONSUMERS = {} -- cgit From c7fe7e5e28b9f4bb999c8309f56953f6609cbc57 Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:49 -0700 Subject: cleanup the code for merging --- nova/fakerabbit.py | 4 ---- 1 file changed, 4 deletions(-) (limited to 'nova/fakerabbit.py') diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py index ff993e29a..e7e9dab77 100644 --- a/nova/fakerabbit.py +++ b/nova/fakerabbit.py @@ -78,10 +78,6 @@ class Queue(object): class Backend(base.BaseBackend): - def __init__(self, connection, **kwargs): - super(Backend, self).__init__(connection, **kwargs) - self.consumers = {} - def queue_declare(self, queue, **kwargs): global QUEUES if queue not in QUEUES: -- cgit