summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRussell Bryant <rbryant@redhat.com>2012-04-02 18:23:09 -0400
committerRussell Bryant <rbryant@redhat.com>2012-04-02 18:35:26 -0400
commit70a712921f1d9253653ebe0d25a2c23d5cf5d750 (patch)
treee429a81d90ba8f437736a73e697de7aff2fb723a
parenta8aa3ffdeb4d171ec8b7b07472ed9008df1efb75 (diff)
downloadnova-70a712921f1d9253653ebe0d25a2c23d5cf5d750.tar.gz
nova-70a712921f1d9253653ebe0d25a2c23d5cf5d750.tar.xz
nova-70a712921f1d9253653ebe0d25a2c23d5cf5d750.zip
Remove nova.rpc.impl_carrot.
This module was marked as deprecated and scheduled for removal in Essex. Remove it now that Folsom development is open. nova.rpc.impl_kombu should be used instead. This patch also removes nova.testing.fake.rabbit, since as far as I can tell, it isn't used anymore and was the last thing still using the carrot dependency. Change-Id: I8cfb2d09ee5eed439ec1d152261f7097faf08ad6
-rw-r--r--nova/rpc/impl_carrot.py684
-rw-r--r--nova/test.py5
-rw-r--r--nova/testing/fake/rabbit.py153
-rw-r--r--nova/tests/rpc/test_carrot.py41
-rw-r--r--tools/pip-requires1
5 files changed, 0 insertions, 884 deletions
diff --git a/nova/rpc/impl_carrot.py b/nova/rpc/impl_carrot.py
deleted file mode 100644
index 22586b1a9..000000000
--- a/nova/rpc/impl_carrot.py
+++ /dev/null
@@ -1,684 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""AMQP-based RPC.
-
-Queues have consumers and publishers.
-
-No fan-out support yet.
-
-"""
-
-import inspect
-import json
-import sys
-import time
-import traceback
-import uuid
-
-from carrot import connection as carrot_connection
-from carrot import messaging
-import eventlet
-from eventlet import greenpool
-from eventlet import pools
-from eventlet import queue
-import greenlet
-
-from nova import context
-from nova import exception
-from nova import flags
-from nova import local
-from nova import log as logging
-from nova.rpc import common as rpc_common
-from nova.testing import fake
-from nova import utils
-
-FLAGS = flags.FLAGS
-LOG = logging.getLogger(__name__)
-
-
-@utils.deprecated('Use of carrot will be removed in a future release. '
- 'Use kombu, instead.')
-class Connection(carrot_connection.BrokerConnection, rpc_common.Connection):
- """Connection instance object."""
-
- def __init__(self, *args, **kwargs):
- super(Connection, self).__init__(*args, **kwargs)
- self._rpc_consumers = []
- self._rpc_consumer_thread = None
-
- @classmethod
- def instance(cls, new=True):
- """Returns the instance."""
- if new or not hasattr(cls, '_instance'):
- params = dict(hostname=FLAGS.rabbit_host,
- port=FLAGS.rabbit_port,
- ssl=FLAGS.rabbit_use_ssl,
- userid=FLAGS.rabbit_userid,
- password=FLAGS.rabbit_password,
- virtual_host=FLAGS.rabbit_virtual_host)
-
- if FLAGS.fake_rabbit:
- params['backend_cls'] = fake.rabbit.Backend
-
- # NOTE(vish): magic is fun!
- # pylint: disable=W0142
- if new:
- return cls(**params)
- else:
- cls._instance = cls(**params)
- return cls._instance
-
- @classmethod
- def recreate(cls):
- """Recreates the connection instance.
-
- This is necessary to recover from some network errors/disconnects.
-
- """
- try:
- del cls._instance
- except AttributeError, e:
- # The _instance stuff is for testing purposes. Usually we don't use
- # it. So don't freak out if it doesn't exist.
- pass
- return cls.instance()
-
- def close(self):
- self.cancel_consumer_thread()
- for consumer in self._rpc_consumers:
- try:
- consumer.close()
- except Exception:
- # ignore all errors
- pass
- self._rpc_consumers = []
- carrot_connection.BrokerConnection.close(self)
-
- def consume_in_thread(self):
- """Consumer from all queues/consumers in a greenthread"""
-
- consumer_set = ConsumerSet(connection=self,
- consumer_list=self._rpc_consumers)
-
- def _consumer_thread():
- try:
- consumer_set.wait()
- except greenlet.GreenletExit:
- return
- if self._rpc_consumer_thread is None:
- self._rpc_consumer_thread = eventlet.spawn(_consumer_thread)
- return self._rpc_consumer_thread
-
- def cancel_consumer_thread(self):
- """Cancel a consumer thread"""
- if self._rpc_consumer_thread is not None:
- self._rpc_consumer_thread.kill()
- try:
- self._rpc_consumer_thread.wait()
- except greenlet.GreenletExit:
- pass
- self._rpc_consumer_thread = None
-
- def create_consumer(self, topic, proxy, fanout=False):
- """Create a consumer that calls methods in the proxy"""
- if fanout:
- consumer = FanoutAdapterConsumer(
- connection=self,
- topic=topic,
- proxy=proxy)
- else:
- consumer = TopicAdapterConsumer(
- connection=self,
- topic=topic,
- proxy=proxy)
- self._rpc_consumers.append(consumer)
-
-
-class Pool(pools.Pool):
- """Class that implements a Pool of Connections."""
-
- # TODO(comstud): Timeout connections not used in a while
- def create(self):
- LOG.debug('Pool creating new connection')
- return Connection.instance(new=True)
-
-# Create a ConnectionPool to use for RPC calls. We'll order the
-# pool as a stack (LIFO), so that we can potentially loop through and
-# timeout old unused connections at some point
-ConnectionPool = Pool(
- max_size=FLAGS.rpc_conn_pool_size,
- order_as_stack=True)
-
-
-class Consumer(messaging.Consumer):
- """Consumer base class.
-
- Contains methods for connecting the fetch method to async loops.
-
- """
-
- def __init__(self, *args, **kwargs):
- max_retries = FLAGS.rabbit_max_retries
- sleep_time = FLAGS.rabbit_retry_interval
- tries = 0
- while True:
- tries += 1
- if tries > 1:
- time.sleep(sleep_time)
- # backoff for next retry attempt.. if there is one
- sleep_time += FLAGS.rabbit_retry_backoff
- if sleep_time > 30:
- sleep_time = 30
- try:
- super(Consumer, self).__init__(*args, **kwargs)
- self.failed_connection = False
- break
- except Exception as e: # Catching all because carrot sucks
- self.failed_connection = True
- if max_retries > 0 and tries == max_retries:
- break
- fl_host = FLAGS.rabbit_host
- fl_port = FLAGS.rabbit_port
- fl_intv = sleep_time
- LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
- ' unreachable: %(e)s. Trying again in %(fl_intv)d'
- ' seconds.') % locals())
- if self.failed_connection:
- LOG.error(_('Unable to connect to AMQP server '
- 'after %(tries)d tries. Shutting down.') % locals())
- sys.exit(1)
-
- def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
- """Wraps the parent fetch with some logic for failed connection."""
- # TODO(vish): the logic for failed connections and logging should be
- # refactored into some sort of connection manager object
- try:
- if self.failed_connection:
- # NOTE(vish): connection is defined in the parent class, we can
- # recreate it as long as we create the backend too
- # pylint: disable=W0201
- self.connection = Connection.recreate()
- self.backend = self.connection.create_backend()
- self.declare()
- return super(Consumer, self).fetch(no_ack,
- auto_ack,
- enable_callbacks)
- if self.failed_connection:
- LOG.error(_('Reconnected to queue'))
- self.failed_connection = False
- # NOTE(vish): This is catching all errors because we really don't
- # want exceptions to be logged 10 times a second if some
- # persistent failure occurs.
- except Exception, e: # pylint: disable=W0703
- if not self.failed_connection:
- LOG.exception(_('Failed to fetch message from queue: %s') % e)
- self.failed_connection = True
-
-
-class AdapterConsumer(Consumer):
- """Calls methods on a proxy object based on method and args."""
-
- def __init__(self, connection=None, topic='broadcast', proxy=None):
- LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
- self.proxy = proxy
- self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
- super(AdapterConsumer, self).__init__(connection=connection,
- topic=topic)
- self.register_callback(self.process_data)
-
- def process_data(self, message_data, message):
- """Consumer callback to call a method on a proxy object.
-
- Parses the message for validity and fires off a thread to call the
- proxy object method.
-
- Message data should be a dictionary with two keys:
- method: string representing the method to call
- args: dictionary of arg: value
-
- Example: {'method': 'echo', 'args': {'value': 42}}
-
- """
- # It is important to clear the context here, because at this point
- # the previous context is stored in local.store.context
- if hasattr(local.store, 'context'):
- del local.store.context
- rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
- # This will be popped off in _unpack_context
- msg_id = message_data.get('_msg_id', None)
- ctxt = _unpack_context(message_data)
-
- method = message_data.get('method')
- args = message_data.get('args', {})
- message.ack()
- if not method:
- # NOTE(vish): we may not want to ack here, but that means that bad
- # messages stay in the queue indefinitely, so for now
- # we just log the message and send an error string
- # back to the caller
- LOG.warn(_('no method for message: %s') % message_data)
- ctxt.reply(msg_id,
- _('No method for message: %s') % message_data)
- return
- self.pool.spawn_n(self._process_data, ctxt, method, args)
-
- @exception.wrap_exception()
- def _process_data(self, ctxt, method, args):
- """Thread that magically looks for a method on the proxy
- object and calls it.
- """
- ctxt.update_store()
- node_func = getattr(self.proxy, str(method))
- node_args = dict((str(k), v) for k, v in args.iteritems())
- # NOTE(vish): magic is fun!
- try:
- rval = node_func(context=ctxt, **node_args)
- # Check if the result was a generator
- if inspect.isgenerator(rval):
- for x in rval:
- ctxt.reply(x, None)
- else:
- ctxt.reply(rval, None)
-
- # This final None tells multicall that it is done.
- ctxt.reply(ending=True)
- except Exception as e:
- LOG.exception('Exception during message handling')
- ctxt.reply(None, sys.exc_info())
- return
-
-
-class TopicAdapterConsumer(AdapterConsumer):
- """Consumes messages on a specific topic."""
-
- exchange_type = 'topic'
-
- def __init__(self, connection=None, topic='broadcast', proxy=None):
- self.queue = topic
- self.routing_key = topic
- self.exchange = FLAGS.control_exchange
- self.durable = FLAGS.rabbit_durable_queues
- super(TopicAdapterConsumer, self).__init__(connection=connection,
- topic=topic, proxy=proxy)
-
-
-class FanoutAdapterConsumer(AdapterConsumer):
- """Consumes messages from a fanout exchange."""
-
- exchange_type = 'fanout'
-
- def __init__(self, connection=None, topic='broadcast', proxy=None):
- self.exchange = '%s_fanout' % topic
- self.routing_key = topic
- unique = uuid.uuid4().hex
- self.queue = '%s_fanout_%s' % (topic, unique)
- self.durable = False
- # Fanout creates unique queue names, so we should auto-remove
- # them when done, so they're not left around on restart.
- # Also, we're the only one that should be consuming. exclusive
- # implies auto_delete, so we'll just set that..
- self.exclusive = True
- LOG.info(_('Created "%(exchange)s" fanout exchange '
- 'with "%(key)s" routing key'),
- dict(exchange=self.exchange, key=self.routing_key))
- super(FanoutAdapterConsumer, self).__init__(connection=connection,
- topic=topic, proxy=proxy)
-
-
-class ConsumerSet(object):
- """Groups consumers to listen on together on a single connection."""
-
- def __init__(self, connection, consumer_list):
- self.consumer_list = set(consumer_list)
- self.consumer_set = None
- self.enabled = True
- self.init(connection)
-
- def init(self, conn):
- if not conn:
- conn = Connection.instance(new=True)
- if self.consumer_set:
- self.consumer_set.close()
- self.consumer_set = messaging.ConsumerSet(conn)
- for consumer in self.consumer_list:
- consumer.connection = conn
- # consumer.backend is set for us
- self.consumer_set.add_consumer(consumer)
-
- def reconnect(self):
- self.init(None)
-
- def wait(self, limit=None):
- running = True
- while running:
- it = self.consumer_set.iterconsume(limit=limit)
- if not it:
- break
- while True:
- try:
- it.next()
- except StopIteration:
- return
- except greenlet.GreenletExit:
- running = False
- break
- except Exception as e:
- LOG.exception(_("Exception while processing consumer"))
- self.reconnect()
- # Break to outer loop
- break
-
- def close(self):
- self.consumer_set.close()
-
-
-class Publisher(messaging.Publisher):
- """Publisher base class."""
- pass
-
-
-class TopicPublisher(Publisher):
- """Publishes messages on a specific topic."""
-
- exchange_type = 'topic'
-
- def __init__(self, connection=None, topic='broadcast', durable=None):
- self.routing_key = topic
- self.exchange = FLAGS.control_exchange
- self.durable = (FLAGS.rabbit_durable_queues if durable is None
- else durable)
- super(TopicPublisher, self).__init__(connection=connection)
-
-
-class FanoutPublisher(Publisher):
- """Publishes messages to a fanout exchange."""
-
- exchange_type = 'fanout'
-
- def __init__(self, topic, connection=None):
- self.exchange = '%s_fanout' % topic
- self.queue = '%s_fanout' % topic
- self.durable = False
- self.auto_delete = True
- LOG.info(_('Creating "%(exchange)s" fanout exchange'),
- dict(exchange=self.exchange))
- super(FanoutPublisher, self).__init__(connection=connection)
-
-
-class DirectConsumer(Consumer):
- """Consumes messages directly on a channel specified by msg_id."""
-
- exchange_type = 'direct'
-
- def __init__(self, connection=None, msg_id=None):
- self.queue = msg_id
- self.routing_key = msg_id
- self.exchange = msg_id
- self.durable = False
- self.auto_delete = True
- self.exclusive = True
- super(DirectConsumer, self).__init__(connection=connection)
-
-
-class DirectPublisher(Publisher):
- """Publishes messages directly on a channel specified by msg_id."""
-
- exchange_type = 'direct'
-
- def __init__(self, connection=None, msg_id=None):
- self.routing_key = msg_id
- self.exchange = msg_id
- self.durable = False
- self.auto_delete = True
- super(DirectPublisher, self).__init__(connection=connection)
-
-
-def msg_reply(msg_id, reply=None, failure=None, ending=False):
- """Sends a reply or an error on the channel signified by msg_id.
-
- Failure should be a sys.exc_info() tuple.
-
- """
- if failure:
- message = str(failure[1])
- tb = traceback.format_exception(*failure)
- LOG.error(_("Returning exception %s to caller"), message)
- LOG.error(tb)
- failure = (failure[0].__name__, str(failure[1]), tb)
-
- with ConnectionPool.item() as conn:
- publisher = DirectPublisher(connection=conn, msg_id=msg_id)
- try:
- msg = {'result': reply, 'failure': failure}
- if ending:
- msg['ending'] = True
- publisher.send(msg)
- except TypeError:
- msg = {'result': dict((k, repr(v))
- for k, v in reply.__dict__.iteritems()),
- 'failure': failure}
- if ending:
- msg['ending'] = True
- publisher.send(msg)
-
- publisher.close()
-
-
-def _unpack_context(msg):
- """Unpack context from msg."""
- context_dict = {}
- for key in list(msg.keys()):
- # NOTE(vish): Some versions of python don't like unicode keys
- # in kwargs.
- key = str(key)
- if key.startswith('_context_'):
- value = msg.pop(key)
- context_dict[key[9:]] = value
- context_dict['msg_id'] = msg.pop('_msg_id', None)
- ctx = RpcContext.from_dict(context_dict)
- LOG.debug(_('unpacked context: %s'), ctx.to_dict())
- return ctx
-
-
-def _pack_context(msg, context):
- """Pack context into msg.
-
- Values for message keys need to be less than 255 chars, so we pull
- context out into a bunch of separate keys. If we want to support
- more arguments in rabbit messages, we may want to do the same
- for args at some point.
-
- """
- context_d = dict([('_context_%s' % key, value)
- for (key, value) in context.to_dict().iteritems()])
- msg.update(context_d)
-
-
-class RpcContext(context.RequestContext):
- def __init__(self, *args, **kwargs):
- msg_id = kwargs.pop('msg_id', None)
- self.msg_id = msg_id
- super(RpcContext, self).__init__(*args, **kwargs)
-
- def reply(self, reply=None, failure=None, ending=False):
- if self.msg_id:
- msg_reply(self.msg_id, reply, failure, ending)
- if ending:
- self.msg_id = None
-
-
-def multicall(context, topic, msg, timeout=None):
- """Make a call that returns multiple times."""
- # NOTE(russellb): carrot doesn't support timeouts
- LOG.debug(_('Making asynchronous call on %s ...'), topic)
- msg_id = uuid.uuid4().hex
- msg.update({'_msg_id': msg_id})
- LOG.debug(_('MSG_ID is %s') % (msg_id))
- _pack_context(msg, context)
-
- con_conn = ConnectionPool.get()
- consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
- wait_msg = MulticallWaiter(consumer)
- consumer.register_callback(wait_msg)
-
- publisher = TopicPublisher(connection=con_conn, topic=topic)
- publisher.send(msg)
- publisher.close()
-
- return wait_msg
-
-
-class MulticallWaiter(object):
- def __init__(self, consumer):
- self._consumer = consumer
- self._results = queue.Queue()
- self._closed = False
- self._got_ending = False
-
- def close(self):
- if self._closed:
- return
- self._closed = True
- self._consumer.close()
- ConnectionPool.put(self._consumer.connection)
-
- def __call__(self, data, message):
- """Acks message and sets result."""
- message.ack()
- if data['failure']:
- self._results.put(rpc_common.RemoteError(*data['failure']))
- elif data.get('ending', False):
- self._got_ending = True
- else:
- self._results.put(data['result'])
-
- def __iter__(self):
- return self.wait()
-
- def wait(self):
- while not self._closed:
- try:
- rv = self._consumer.fetch(enable_callbacks=True)
- except Exception:
- self.close()
- raise
- if rv is None:
- time.sleep(0.01)
- continue
- if self._got_ending:
- self.close()
- raise StopIteration
- result = self._results.get()
- if isinstance(result, Exception):
- self.close()
- raise result
- yield result
-
-
-def create_connection(new=True):
- """Create a connection"""
- return Connection.instance(new=new)
-
-
-def call(context, topic, msg, timeout=None):
- """Sends a message on a topic and wait for a response."""
- rv = multicall(context, topic, msg, timeout)
- # NOTE(vish): return the last result from the multicall
- rv = list(rv)
- if not rv:
- return
- return rv[-1]
-
-
-def cast(context, topic, msg):
- """Sends a message on a topic without waiting for a response."""
- LOG.debug(_('Making asynchronous cast on %s...'), topic)
- _pack_context(msg, context)
- with ConnectionPool.item() as conn:
- publisher = TopicPublisher(connection=conn, topic=topic)
- publisher.send(msg)
- publisher.close()
-
-
-def fanout_cast(context, topic, msg):
- """Sends a message on a fanout exchange without waiting for a response."""
- LOG.debug(_('Making asynchronous fanout cast...'))
- _pack_context(msg, context)
- with ConnectionPool.item() as conn:
- publisher = FanoutPublisher(topic, connection=conn)
- publisher.send(msg)
- publisher.close()
-
-
-def notify(context, topic, msg):
- """Sends a notification event on a topic."""
- LOG.debug(_('Sending notification on %s...'), topic)
- _pack_context(msg, context)
- with ConnectionPool.item() as conn:
- publisher = TopicPublisher(connection=conn, topic=topic,
- durable=True)
- publisher.send(msg)
- publisher.close()
-
-
-def cleanup():
- pass
-
-
-def generic_response(message_data, message):
- """Logs a result and exits."""
- LOG.debug(_('response %s'), message_data)
- message.ack()
- sys.exit(0)
-
-
-def send_message(topic, message, wait=True):
- """Sends a message for testing."""
- msg_id = uuid.uuid4().hex
- message.update({'_msg_id': msg_id})
- LOG.debug(_('topic is %s'), topic)
- LOG.debug(_('message %s'), message)
-
- if wait:
- consumer = messaging.Consumer(connection=Connection.instance(),
- queue=msg_id,
- exchange=msg_id,
- auto_delete=True,
- exchange_type='direct',
- routing_key=msg_id)
- consumer.register_callback(generic_response)
-
- publisher = messaging.Publisher(connection=Connection.instance(),
- exchange=FLAGS.control_exchange,
- durable=FLAGS.rabbit_durable_queues,
- exchange_type='topic',
- routing_key=topic)
- publisher.send(message)
- publisher.close()
-
- if wait:
- consumer.wait()
- consumer.close()
-
-
-if __name__ == '__main__':
- # You can send messages from the command line using
- # topic and a json string representing a dictionary
- # for the method
- send_message(sys.argv[1], json.loads(sys.argv[2]))
diff --git a/nova/test.py b/nova/test.py
index e44ad57ea..c269b23f7 100644
--- a/nova/test.py
+++ b/nova/test.py
@@ -39,7 +39,6 @@ from nova import log as logging
from nova.openstack.common import cfg
from nova import utils
from nova import service
-from nova.testing.fake import rabbit
from nova.tests import reset_db
from nova.virt import fake
@@ -149,10 +148,6 @@ class TestCase(unittest.TestCase):
self.mox.VerifyAll()
super(TestCase, self).tearDown()
finally:
- # Clean out fake_rabbit's queue if we used it
- if FLAGS.fake_rabbit:
- rabbit.reset_all()
-
if FLAGS.connection_type == 'fake':
if hasattr(fake.FakeConnection, '_instance'):
del fake.FakeConnection._instance
diff --git a/nova/testing/fake/rabbit.py b/nova/testing/fake/rabbit.py
deleted file mode 100644
index 316dc2509..000000000
--- a/nova/testing/fake/rabbit.py
+++ /dev/null
@@ -1,153 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""Based a bit on the carrot.backends.queue backend... but a lot better."""
-
-import Queue as queue
-
-from carrot.backends import base
-from eventlet import greenthread
-
-from nova import log as logging
-
-
-LOG = logging.getLogger(__name__)
-
-
-EXCHANGES = {}
-QUEUES = {}
-CONSUMERS = {}
-
-
-class Message(base.BaseMessage):
- pass
-
-
-class Exchange(object):
- def __init__(self, name, exchange_type):
- self.name = name
- self.exchange_type = exchange_type
- self._queue = queue.Queue()
- self._routes = {}
-
- def publish(self, message, routing_key=None):
- nm = self.name
- LOG.debug(_('(%(nm)s) publish (key: %(routing_key)s)'
- ' %(message)s') % locals())
- if routing_key in self._routes:
- for f in self._routes[routing_key]:
- LOG.debug(_('Publishing to route %s'), f)
- f(message, routing_key=routing_key)
-
- def bind(self, callback, routing_key):
- self._routes.setdefault(routing_key, [])
- self._routes[routing_key].append(callback)
-
-
-class Queue(object):
- def __init__(self, name):
- self.name = name
- self._queue = queue.Queue()
-
- def __repr__(self):
- return '<Queue: %s>' % self.name
-
- def push(self, message, routing_key=None):
- self._queue.put(message)
-
- def size(self):
- return self._queue.qsize()
-
- def pop(self):
- return self._queue.get()
-
-
-class Backend(base.BaseBackend):
- def queue_declare(self, queue, **kwargs):
- global QUEUES
- if queue not in QUEUES:
- LOG.debug(_('Declaring queue %s'), queue)
- QUEUES[queue] = Queue(queue)
-
- def exchange_declare(self, exchange, type, *args, **kwargs):
- global EXCHANGES
- if exchange not in EXCHANGES:
- LOG.debug(_('Declaring exchange %s'), exchange)
- EXCHANGES[exchange] = Exchange(exchange, type)
-
- def queue_bind(self, queue, exchange, routing_key, **kwargs):
- global EXCHANGES
- global QUEUES
- LOG.debug(_('Binding %(queue)s to %(exchange)s with'
- ' key %(routing_key)s') % locals())
- EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
-
- def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs):
- global CONSUMERS
- LOG.debug("Adding consumer %s", consumer_tag)
- CONSUMERS[consumer_tag] = (queue, callback)
-
- def cancel(self, consumer_tag):
- global CONSUMERS
- LOG.debug("Removing consumer %s", consumer_tag)
- del CONSUMERS[consumer_tag]
-
- def consume(self, limit=None):
- global CONSUMERS
- num = 0
- while True:
- for (queue, callback) in CONSUMERS.itervalues():
- item = self.get(queue)
- if item:
- callback(item)
- num += 1
- yield
- if limit and num == limit:
- raise StopIteration()
- greenthread.sleep(0.1)
-
- def get(self, queue, no_ack=False):
- global QUEUES
- if not queue in QUEUES or not QUEUES[queue].size():
- return None
- (message_data, content_type, content_encoding) = QUEUES[queue].pop()
- message = Message(backend=self, body=message_data,
- content_type=content_type,
- content_encoding=content_encoding)
- message.result = True
- LOG.debug(_('Getting from %(queue)s: %(message)s') % locals())
- return message
-
- def prepare_message(self, message_data, delivery_mode,
- content_type, content_encoding, **kwargs):
- """Prepare message for sending."""
- return (message_data, content_type, content_encoding)
-
- def publish(self, message, exchange, routing_key, **kwargs):
- global EXCHANGES
- if exchange in EXCHANGES:
- EXCHANGES[exchange].publish(message, routing_key=routing_key)
-
-
-def reset_all():
- global EXCHANGES
- global QUEUES
- global CONSUMERS
- EXCHANGES = {}
- QUEUES = {}
- CONSUMERS = {}
diff --git a/nova/tests/rpc/test_carrot.py b/nova/tests/rpc/test_carrot.py
deleted file mode 100644
index dae08e8e0..000000000
--- a/nova/tests/rpc/test_carrot.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""
-Unit Tests for remote procedure calls using carrot
-"""
-
-from nova import log as logging
-from nova.rpc import impl_carrot
-from nova.tests.rpc import common
-
-
-LOG = logging.getLogger(__name__)
-
-
-class RpcCarrotTestCase(common.BaseRpcTestCase):
- def setUp(self):
- self.rpc = impl_carrot
- super(RpcCarrotTestCase, self).setUp(supports_timeouts=False)
-
- def test_connectionpool_single(self):
- """Test that ConnectionPool recycles a single connection."""
- conn1 = self.rpc.ConnectionPool.get()
- self.rpc.ConnectionPool.put(conn1)
- conn2 = self.rpc.ConnectionPool.get()
- self.rpc.ConnectionPool.put(conn2)
- self.assertEqual(conn1, conn2)
diff --git a/tools/pip-requires b/tools/pip-requires
index 8949428f2..77ceb9b5a 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -3,7 +3,6 @@ Cheetah==2.4.4
amqplib==0.6.1
anyjson==0.2.4
boto==2.1.1
-carrot==0.10.5
eventlet
kombu==1.0.4
lockfile==0.8