summaryrefslogtreecommitdiffstats
path: root/nova/fakerabbit.py
diff options
context:
space:
mode:
Diffstat (limited to 'nova/fakerabbit.py')
-rw-r--r--nova/fakerabbit.py140
1 files changed, 63 insertions, 77 deletions
diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py
index c64617931..79d8b894d 100644
--- a/nova/fakerabbit.py
+++ b/nova/fakerabbit.py
@@ -25,6 +25,10 @@ from carrot.backends import base
from eventlet import greenthread
+EXCHANGES = {}
+QUEUES = {}
+
+
class Message(base.BaseMessage):
pass
@@ -37,12 +41,12 @@ class Exchange(object):
self._routes = {}
def publish(self, message, routing_key=None):
- logging.debug('(%s) publish (key: %s) %s',
+ logging.debug(_('(%s) publish (key: %s) %s'),
self.name, routing_key, message)
routing_key = routing_key.split('.')[0]
if routing_key in self._routes:
for f in self._routes[routing_key]:
- logging.debug('Publishing to route %s', f)
+ logging.debug(_('Publishing to route %s'), f)
f(message, routing_key=routing_key)
def bind(self, callback, routing_key):
@@ -68,81 +72,63 @@ class Queue(object):
return self._queue.get()
-class Backend(object):
- """ Singleton backend for testing """
- class __impl(base.BaseBackend):
- def __init__(self, *args, **kwargs):
- #super(__impl, self).__init__(*args, **kwargs)
- self._exchanges = {}
- self._queues = {}
-
- def _reset_all(self):
- self._exchanges = {}
- self._queues = {}
-
- def queue_declare(self, queue, **kwargs):
- if queue not in self._queues:
- logging.debug('Declaring queue %s', queue)
- self._queues[queue] = Queue(queue)
-
- def exchange_declare(self, exchange, type, *args, **kwargs):
- if exchange not in self._exchanges:
- logging.debug('Declaring exchange %s', exchange)
- self._exchanges[exchange] = Exchange(exchange, type)
-
- def queue_bind(self, queue, exchange, routing_key, **kwargs):
- logging.debug('Binding %s to %s with key %s',
- queue, exchange, routing_key)
- self._exchanges[exchange].bind(self._queues[queue].push,
- routing_key)
-
- def declare_consumer(self, queue, callback, *args, **kwargs):
- self.current_queue = queue
- self.current_callback = callback
-
- def consume(self, *args, **kwargs):
- while True:
- item = self.get(self.current_queue)
- if item:
- self.current_callback(item)
- raise StopIteration()
- greenthread.sleep(0)
-
- def get(self, queue, no_ack=False):
- if not queue in self._queues or not self._queues[queue].size():
- return None
- (message_data, content_type, content_encoding) = \
- self._queues[queue].pop()
- message = Message(backend=self, body=message_data,
- content_type=content_type,
- content_encoding=content_encoding)
- message.result = True
- logging.debug('Getting from %s: %s', queue, message)
- return message
-
- def prepare_message(self, message_data, delivery_mode,
- content_type, content_encoding, **kwargs):
- """Prepare message for sending."""
- return (message_data, content_type, content_encoding)
-
- def publish(self, message, exchange, routing_key, **kwargs):
- if exchange in self._exchanges:
- self._exchanges[exchange].publish(
- message, routing_key=routing_key)
-
- __instance = None
-
- def __init__(self, *args, **kwargs):
- if Backend.__instance is None:
- Backend.__instance = Backend.__impl(*args, **kwargs)
- self.__dict__['_Backend__instance'] = Backend.__instance
-
- def __getattr__(self, attr):
- return getattr(self.__instance, attr)
-
- def __setattr__(self, attr, value):
- return setattr(self.__instance, attr, value)
+class Backend(base.BaseBackend):
+ def queue_declare(self, queue, **kwargs):
+ global QUEUES
+ if queue not in QUEUES:
+ logging.debug(_('Declaring queue %s'), queue)
+ QUEUES[queue] = Queue(queue)
+
+ def exchange_declare(self, exchange, type, *args, **kwargs):
+ global EXCHANGES
+ if exchange not in EXCHANGES:
+ logging.debug(_('Declaring exchange %s'), exchange)
+ EXCHANGES[exchange] = Exchange(exchange, type)
+
+ def queue_bind(self, queue, exchange, routing_key, **kwargs):
+ global EXCHANGES
+ global QUEUES
+ logging.debug(_('Binding %s to %s with key %s'),
+ queue, exchange, routing_key)
+ EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
+
+ def declare_consumer(self, queue, callback, *args, **kwargs):
+ self.current_queue = queue
+ self.current_callback = callback
+
+ def consume(self, limit=None):
+ while True:
+ item = self.get(self.current_queue)
+ if item:
+ self.current_callback(item)
+ raise StopIteration()
+ greenthread.sleep(0)
+
+ def get(self, queue, no_ack=False):
+ global QUEUES
+ if not queue in QUEUES or not QUEUES[queue].size():
+ return None
+ (message_data, content_type, content_encoding) = QUEUES[queue].pop()
+ message = Message(backend=self, body=message_data,
+ content_type=content_type,
+ content_encoding=content_encoding)
+ message.result = True
+ logging.debug(_('Getting from %s: %s'), queue, message)
+ return message
+
+ def prepare_message(self, message_data, delivery_mode,
+ content_type, content_encoding, **kwargs):
+ """Prepare message for sending."""
+ return (message_data, content_type, content_encoding)
+
+ def publish(self, message, exchange, routing_key, **kwargs):
+ global EXCHANGES
+ if exchange in EXCHANGES:
+ EXCHANGES[exchange].publish(message, routing_key=routing_key)
def reset_all():
- Backend()._reset_all()
+ global EXCHANGES
+ global QUEUES
+ EXCHANGES = {}
+ QUEUES = {}