diff options
author | Vishvananda Ishaya <vishvananda@gmail.com> | 2010-12-21 21:07:53 +0000 |
---|---|---|
committer | Vishvananda Ishaya <vishvananda@gmail.com> | 2010-12-21 21:07:53 +0000 |
commit | 1f7103964045a5433fa8b3580ed492d9d34acd0c (patch) | |
tree | 80328a684e1aafdcdb209b140682c646099e89d4 /nova/fakerabbit.py | |
parent | 132b12e27fa69319f85dee0089fad1ba1a342fd8 (diff) | |
parent | 40dba7dc0b8faef6dace3e9d54f43b19398c73fc (diff) | |
download | nova-1f7103964045a5433fa8b3580ed492d9d34acd0c.tar.gz nova-1f7103964045a5433fa8b3580ed492d9d34acd0c.tar.xz nova-1f7103964045a5433fa8b3580ed492d9d34acd0c.zip |
merge fakerabbit fix and turn fake back on for cloud unit tests
Diffstat (limited to 'nova/fakerabbit.py')
-rw-r--r-- | nova/fakerabbit.py | 135 |
1 files changed, 60 insertions, 75 deletions
diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py index c64617931..792e4c344 100644 --- a/nova/fakerabbit.py +++ b/nova/fakerabbit.py @@ -25,6 +25,9 @@ from carrot.backends import base from eventlet import greenthread +EXCHANGES = {} +QUEUES = {} + class Message(base.BaseMessage): pass @@ -68,81 +71,63 @@ class Queue(object): return self._queue.get() -class Backend(object): - """ Singleton backend for testing """ - class __impl(base.BaseBackend): - def __init__(self, *args, **kwargs): - #super(__impl, self).__init__(*args, **kwargs) - self._exchanges = {} - self._queues = {} - - def _reset_all(self): - self._exchanges = {} - self._queues = {} - - def queue_declare(self, queue, **kwargs): - if queue not in self._queues: - logging.debug('Declaring queue %s', queue) - self._queues[queue] = Queue(queue) - - def exchange_declare(self, exchange, type, *args, **kwargs): - if exchange not in self._exchanges: - logging.debug('Declaring exchange %s', exchange) - self._exchanges[exchange] = Exchange(exchange, type) - - def queue_bind(self, queue, exchange, routing_key, **kwargs): - logging.debug('Binding %s to %s with key %s', - queue, exchange, routing_key) - self._exchanges[exchange].bind(self._queues[queue].push, - routing_key) - - def declare_consumer(self, queue, callback, *args, **kwargs): - self.current_queue = queue - self.current_callback = callback - - def consume(self, *args, **kwargs): - while True: - item = self.get(self.current_queue) - if item: - self.current_callback(item) - raise StopIteration() - greenthread.sleep(0) - - def get(self, queue, no_ack=False): - if not queue in self._queues or not self._queues[queue].size(): - return None - (message_data, content_type, content_encoding) = \ - self._queues[queue].pop() - message = Message(backend=self, body=message_data, - content_type=content_type, - content_encoding=content_encoding) - message.result = True - logging.debug('Getting from %s: %s', queue, message) - return message - - def prepare_message(self, message_data, delivery_mode, - content_type, content_encoding, **kwargs): - """Prepare message for sending.""" - return (message_data, content_type, content_encoding) - - def publish(self, message, exchange, routing_key, **kwargs): - if exchange in self._exchanges: - self._exchanges[exchange].publish( - message, routing_key=routing_key) - - __instance = None - - def __init__(self, *args, **kwargs): - if Backend.__instance is None: - Backend.__instance = Backend.__impl(*args, **kwargs) - self.__dict__['_Backend__instance'] = Backend.__instance - - def __getattr__(self, attr): - return getattr(self.__instance, attr) - - def __setattr__(self, attr, value): - return setattr(self.__instance, attr, value) +class Backend(base.BaseBackend): + def queue_declare(self, queue, **kwargs): + global QUEUES + if queue not in QUEUES: + logging.debug('Declaring queue %s', queue) + QUEUES[queue] = Queue(queue) + + def exchange_declare(self, exchange, type, *args, **kwargs): + global EXCHANGES + if exchange not in EXCHANGES: + logging.debug('Declaring exchange %s', exchange) + EXCHANGES[exchange] = Exchange(exchange, type) + + def queue_bind(self, queue, exchange, routing_key, **kwargs): + global EXCHANGES + global QUEUES + logging.debug('Binding %s to %s with key %s', + queue, exchange, routing_key) + EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key) + + def declare_consumer(self, queue, callback, *args, **kwargs): + self.current_queue = queue + self.current_callback = callback + + def consume(self, limit=None): + while True: + item = self.get(self.current_queue) + if item: + self.current_callback(item) + raise StopIteration() + greenthread.sleep(0) + + def get(self, queue, no_ack=False): + global QUEUES + if not queue in QUEUES or not QUEUES[queue].size(): + return None + (message_data, content_type, content_encoding) = QUEUES[queue].pop() + message = Message(backend=self, body=message_data, + content_type=content_type, + content_encoding=content_encoding) + message.result = True + logging.debug('Getting from %s: %s', queue, message) + return message + + def prepare_message(self, message_data, delivery_mode, + content_type, content_encoding, **kwargs): + """Prepare message for sending.""" + return (message_data, content_type, content_encoding) + + def publish(self, message, exchange, routing_key, **kwargs): + global EXCHANGES + if exchange in EXCHANGES: + EXCHANGES[exchange].publish(message, routing_key=routing_key) def reset_all(): - Backend()._reset_all() + global EXCHANGES + global QUEUES + EXCHANGES = {} + QUEUES = {} |