summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZed Shaw <zedshaw@zedshaw.com>2011-07-26 16:29:50 -0700
committerZed Shaw <zedshaw@zedshaw.com>2011-07-26 16:29:50 -0700
commit534b8c3c5b2f6eb3d4c3545c3d5dc2d15061cf6e (patch)
tree1d3466e70042795742c37986504fd987e5cc39d7
parent4a52d4984e9349115f37d34e47e4d1141a8cf6fc (diff)
downloadnova-534b8c3c5b2f6eb3d4c3545c3d5dc2d15061cf6e.tar.gz
nova-534b8c3c5b2f6eb3d4c3545c3d5dc2d15061cf6e.tar.xz
nova-534b8c3c5b2f6eb3d4c3545c3d5dc2d15061cf6e.zip
Implements a simplified messaging abstraction with the least amount of impact to the code base.
-rw-r--r--nova/rpc.py598
-rw-r--r--nova/rpc_backends/__init__.py0
-rw-r--r--nova/rpc_backends/amqp.py591
-rw-r--r--nova/rpc_backends/common.py23
-rw-r--r--nova/service.py28
-rw-r--r--nova/test.py16
-rw-r--r--nova/tests/test_adminapi.py2
-rw-r--r--nova/tests/test_cloud.py68
-rw-r--r--nova/tests/test_rpc.py61
-rw-r--r--nova/tests/test_rpc_amqp.py68
-rw-r--r--nova/tests/test_service.py170
-rw-r--r--nova/tests/test_test.py13
-rw-r--r--nova/utils.py11
13 files changed, 760 insertions, 889 deletions
diff --git a/nova/rpc.py b/nova/rpc.py
index e2771ca88..8b0c6df67 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -16,597 +16,51 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""AMQP-based RPC.
-Queues have consumers and publishers.
-
-No fan-out support yet.
-
-"""
-
-import json
-import sys
-import time
-import traceback
-import types
-import uuid
-
-from carrot import connection as carrot_connection
-from carrot import messaging
-from eventlet import greenpool
-from eventlet import pools
-from eventlet import queue
-import greenlet
-
-from nova import context
-from nova import exception
-from nova import fakerabbit
+from nova.utils import load_module
+from nova.rpc_backends.common import RemoteError, LOG
from nova import flags
-from nova import log as logging
-from nova import utils
-
-
-LOG = logging.getLogger('nova.rpc')
-
FLAGS = flags.FLAGS
-flags.DEFINE_integer('rpc_thread_pool_size', 1024,
- 'Size of RPC thread pool')
-flags.DEFINE_integer('rpc_conn_pool_size', 30,
- 'Size of RPC connection pool')
-
-
-class Connection(carrot_connection.BrokerConnection):
- """Connection instance object."""
-
- @classmethod
- def instance(cls, new=True):
- """Returns the instance."""
- if new or not hasattr(cls, '_instance'):
- params = dict(hostname=FLAGS.rabbit_host,
- port=FLAGS.rabbit_port,
- ssl=FLAGS.rabbit_use_ssl,
- userid=FLAGS.rabbit_userid,
- password=FLAGS.rabbit_password,
- virtual_host=FLAGS.rabbit_virtual_host)
-
- if FLAGS.fake_rabbit:
- params['backend_cls'] = fakerabbit.Backend
-
- # NOTE(vish): magic is fun!
- # pylint: disable=W0142
- if new:
- return cls(**params)
- else:
- cls._instance = cls(**params)
- return cls._instance
-
- @classmethod
- def recreate(cls):
- """Recreates the connection instance.
-
- This is necessary to recover from some network errors/disconnects.
-
- """
- try:
- del cls._instance
- except AttributeError, e:
- # The _instance stuff is for testing purposes. Usually we don't use
- # it. So don't freak out if it doesn't exist.
- pass
- return cls.instance()
-
-
-class Pool(pools.Pool):
- """Class that implements a Pool of Connections."""
-
- # TODO(comstud): Timeout connections not used in a while
- def create(self):
- LOG.debug('Creating new connection')
- return Connection.instance(new=True)
-
-# Create a ConnectionPool to use for RPC calls. We'll order the
-# pool as a stack (LIFO), so that we can potentially loop through and
-# timeout old unused connections at some point
-ConnectionPool = Pool(
- max_size=FLAGS.rpc_conn_pool_size,
- order_as_stack=True)
-
-
-class Consumer(messaging.Consumer):
- """Consumer base class.
-
- Contains methods for connecting the fetch method to async loops.
-
- """
-
- def __init__(self, *args, **kwargs):
- for i in xrange(FLAGS.rabbit_max_retries):
- if i > 0:
- time.sleep(FLAGS.rabbit_retry_interval)
- try:
- super(Consumer, self).__init__(*args, **kwargs)
- self.failed_connection = False
- break
- except Exception as e: # Catching all because carrot sucks
- fl_host = FLAGS.rabbit_host
- fl_port = FLAGS.rabbit_port
- fl_intv = FLAGS.rabbit_retry_interval
- LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
- ' unreachable: %(e)s. Trying again in %(fl_intv)d'
- ' seconds.') % locals())
- self.failed_connection = True
- if self.failed_connection:
- LOG.error(_('Unable to connect to AMQP server '
- 'after %d tries. Shutting down.'),
- FLAGS.rabbit_max_retries)
- sys.exit(1)
-
- def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
- """Wraps the parent fetch with some logic for failed connection."""
- # TODO(vish): the logic for failed connections and logging should be
- # refactored into some sort of connection manager object
- try:
- if self.failed_connection:
- # NOTE(vish): connection is defined in the parent class, we can
- # recreate it as long as we create the backend too
- # pylint: disable=W0201
- self.connection = Connection.recreate()
- self.backend = self.connection.create_backend()
- self.declare()
- return super(Consumer, self).fetch(no_ack,
- auto_ack,
- enable_callbacks)
- if self.failed_connection:
- LOG.error(_('Reconnected to queue'))
- self.failed_connection = False
- # NOTE(vish): This is catching all errors because we really don't
- # want exceptions to be logged 10 times a second if some
- # persistent failure occurs.
- except Exception, e: # pylint: disable=W0703
- if not self.failed_connection:
- LOG.exception(_('Failed to fetch message from queue: %s' % e))
- self.failed_connection = True
-
- def attach_to_eventlet(self):
- """Only needed for unit tests!"""
- timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
- timer.start(0.1)
- return timer
-
-
-class AdapterConsumer(Consumer):
- """Calls methods on a proxy object based on method and args."""
-
- def __init__(self, connection=None, topic='broadcast', proxy=None):
- LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
- self.proxy = proxy
- self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
- super(AdapterConsumer, self).__init__(connection=connection,
- topic=topic)
- self.register_callback(self.process_data)
-
- def process_data(self, message_data, message):
- """Consumer callback to call a method on a proxy object.
-
- Parses the message for validity and fires off a thread to call the
- proxy object method.
-
- Message data should be a dictionary with two keys:
- method: string representing the method to call
- args: dictionary of arg: value
-
- Example: {'method': 'echo', 'args': {'value': 42}}
-
- """
- LOG.debug(_('received %s') % message_data)
- # This will be popped off in _unpack_context
- msg_id = message_data.get('_msg_id', None)
- ctxt = _unpack_context(message_data)
-
- method = message_data.get('method')
- args = message_data.get('args', {})
- message.ack()
- if not method:
- # NOTE(vish): we may not want to ack here, but that means that bad
- # messages stay in the queue indefinitely, so for now
- # we just log the message and send an error string
- # back to the caller
- LOG.warn(_('no method for message: %s') % message_data)
- if msg_id:
- msg_reply(msg_id,
- _('No method for message: %s') % message_data)
- return
- self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
-
- @exception.wrap_exception()
- def _process_data(self, msg_id, ctxt, method, args):
- """Thread that maigcally looks for a method on the proxy
- object and calls it.
- """
-
- node_func = getattr(self.proxy, str(method))
- node_args = dict((str(k), v) for k, v in args.iteritems())
- # NOTE(vish): magic is fun!
- try:
- rval = node_func(context=ctxt, **node_args)
- if msg_id:
- # Check if the result was a generator
- if isinstance(rval, types.GeneratorType):
- for x in rval:
- msg_reply(msg_id, x, None)
- else:
- msg_reply(msg_id, rval, None)
-
- # This final None tells multicall that it is done.
- msg_reply(msg_id, None, None)
- elif isinstance(rval, types.GeneratorType):
- # NOTE(vish): this iterates through the generator
- list(rval)
- except Exception as e:
- logging.exception('Exception during message handling')
- if msg_id:
- msg_reply(msg_id, None, sys.exc_info())
- return
-
-
-class TopicAdapterConsumer(AdapterConsumer):
- """Consumes messages on a specific topic."""
-
- exchange_type = 'topic'
-
- def __init__(self, connection=None, topic='broadcast', proxy=None):
- self.queue = topic
- self.routing_key = topic
- self.exchange = FLAGS.control_exchange
- self.durable = False
- super(TopicAdapterConsumer, self).__init__(connection=connection,
- topic=topic, proxy=proxy)
-
-
-class FanoutAdapterConsumer(AdapterConsumer):
- """Consumes messages from a fanout exchange."""
-
- exchange_type = 'fanout'
-
- def __init__(self, connection=None, topic='broadcast', proxy=None):
- self.exchange = '%s_fanout' % topic
- self.routing_key = topic
- unique = uuid.uuid4().hex
- self.queue = '%s_fanout_%s' % (topic, unique)
- self.durable = False
- # Fanout creates unique queue names, so we should auto-remove
- # them when done, so they're not left around on restart.
- # Also, we're the only one that should be consuming. exclusive
- # implies auto_delete, so we'll just set that..
- self.exclusive = True
- LOG.info(_('Created "%(exchange)s" fanout exchange '
- 'with "%(key)s" routing key'),
- dict(exchange=self.exchange, key=self.routing_key))
- super(FanoutAdapterConsumer, self).__init__(connection=connection,
- topic=topic, proxy=proxy)
-
-
-class ConsumerSet(object):
- """Groups consumers to listen on together on a single connection."""
-
- def __init__(self, connection, consumer_list):
- self.consumer_list = set(consumer_list)
- self.consumer_set = None
- self.enabled = True
- self.init(connection)
-
- def init(self, conn):
- if not conn:
- conn = Connection.instance(new=True)
- if self.consumer_set:
- self.consumer_set.close()
- self.consumer_set = messaging.ConsumerSet(conn)
- for consumer in self.consumer_list:
- consumer.connection = conn
- # consumer.backend is set for us
- self.consumer_set.add_consumer(consumer)
-
- def reconnect(self):
- self.init(None)
-
- def wait(self, limit=None):
- running = True
- while running:
- it = self.consumer_set.iterconsume(limit=limit)
- if not it:
- break
- while True:
- try:
- it.next()
- except StopIteration:
- return
- except greenlet.GreenletExit:
- running = False
- break
- except Exception as e:
- LOG.exception(_("Exception while processing consumer"))
- self.reconnect()
- # Break to outer loop
- break
+flags.DEFINE_string('rpc_backend',
+ 'nova.rpc_backends.amqp',
+ "The messaging module to use, defaults to AMQP.")
- def close(self):
- self.consumer_set.close()
+RPCIMPL = load_module(FLAGS.rpc_backend)
-class Publisher(messaging.Publisher):
- """Publisher base class."""
- pass
+def create_connection(new=True):
+ return RPCIMPL.Connection.instance(new=True)
-class TopicPublisher(Publisher):
- """Publishes messages on a specific topic."""
+def create_consumer(conn, topic, proxy, fanout=False):
+ if fanout:
+ return RPCIMPL.FanoutAdapterConsumer(
+ connection=conn,
+ topic=topic,
+ proxy=proxy)
+ else:
+ return RPCIMPL.TopicAdapterConsumer(
+ connection=conn,
+ topic=topic,
+ proxy=proxy)
- exchange_type = 'topic'
- def __init__(self, connection=None, topic='broadcast'):
- self.routing_key = topic
- self.exchange = FLAGS.control_exchange
- self.durable = False
- super(TopicPublisher, self).__init__(connection=connection)
-
-
-class FanoutPublisher(Publisher):
- """Publishes messages to a fanout exchange."""
-
- exchange_type = 'fanout'
-
- def __init__(self, topic, connection=None):
- self.exchange = '%s_fanout' % topic
- self.queue = '%s_fanout' % topic
- self.durable = False
- self.auto_delete = True
- LOG.info(_('Creating "%(exchange)s" fanout exchange'),
- dict(exchange=self.exchange))
- super(FanoutPublisher, self).__init__(connection=connection)
-
-
-class DirectConsumer(Consumer):
- """Consumes messages directly on a channel specified by msg_id."""
-
- exchange_type = 'direct'
-
- def __init__(self, connection=None, msg_id=None):
- self.queue = msg_id
- self.routing_key = msg_id
- self.exchange = msg_id
- self.auto_delete = True
- self.exclusive = True
- super(DirectConsumer, self).__init__(connection=connection)
-
-
-class DirectPublisher(Publisher):
- """Publishes messages directly on a channel specified by msg_id."""
-
- exchange_type = 'direct'
-
- def __init__(self, connection=None, msg_id=None):
- self.routing_key = msg_id
- self.exchange = msg_id
- self.auto_delete = True
- super(DirectPublisher, self).__init__(connection=connection)
-
-
-def msg_reply(msg_id, reply=None, failure=None):
- """Sends a reply or an error on the channel signified by msg_id.
-
- Failure should be a sys.exc_info() tuple.
-
- """
- if failure:
- message = str(failure[1])
- tb = traceback.format_exception(*failure)
- LOG.error(_("Returning exception %s to caller"), message)
- LOG.error(tb)
- failure = (failure[0].__name__, str(failure[1]), tb)
-
- with ConnectionPool.item() as conn:
- publisher = DirectPublisher(connection=conn, msg_id=msg_id)
- try:
- publisher.send({'result': reply, 'failure': failure})
- except TypeError:
- publisher.send(
- {'result': dict((k, repr(v))
- for k, v in reply.__dict__.iteritems()),
- 'failure': failure})
-
- publisher.close()
-
-
-class RemoteError(exception.Error):
- """Signifies that a remote class has raised an exception.
-
- Containes a string representation of the type of the original exception,
- the value of the original exception, and the traceback. These are
- sent to the parent as a joined string so printing the exception
- contains all of the relevent info.
-
- """
-
- def __init__(self, exc_type, value, traceback):
- self.exc_type = exc_type
- self.value = value
- self.traceback = traceback
- super(RemoteError, self).__init__('%s %s\n%s' % (exc_type,
- value,
- traceback))
-
-
-def _unpack_context(msg):
- """Unpack context from msg."""
- context_dict = {}
- for key in list(msg.keys()):
- # NOTE(vish): Some versions of python don't like unicode keys
- # in kwargs.
- key = str(key)
- if key.startswith('_context_'):
- value = msg.pop(key)
- context_dict[key[9:]] = value
- context_dict['msg_id'] = msg.pop('_msg_id', None)
- LOG.debug(_('unpacked context: %s'), context_dict)
- return RpcContext.from_dict(context_dict)
-
-
-def _pack_context(msg, context):
- """Pack context into msg.
-
- Values for message keys need to be less than 255 chars, so we pull
- context out into a bunch of separate keys. If we want to support
- more arguments in rabbit messages, we may want to do the same
- for args at some point.
-
- """
- context_d = dict([('_context_%s' % key, value)
- for (key, value) in context.to_dict().iteritems()])
- msg.update(context_d)
-
-
-class RpcContext(context.RequestContext):
- def __init__(self, *args, **kwargs):
- msg_id = kwargs.pop('msg_id', None)
- self.msg_id = msg_id
- super(RpcContext, self).__init__(*args, **kwargs)
-
- def reply(self, *args, **kwargs):
- msg_reply(self.msg_id, *args, **kwargs)
-
-
-def multicall(context, topic, msg):
- """Make a call that returns multiple times."""
- LOG.debug(_('Making asynchronous call on %s ...'), topic)
- msg_id = uuid.uuid4().hex
- msg.update({'_msg_id': msg_id})
- LOG.debug(_('MSG_ID is %s') % (msg_id))
- _pack_context(msg, context)
-
- con_conn = ConnectionPool.get()
- consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
- wait_msg = MulticallWaiter(consumer)
- consumer.register_callback(wait_msg)
-
- publisher = TopicPublisher(connection=con_conn, topic=topic)
- publisher.send(msg)
- publisher.close()
-
- return wait_msg
-
-
-class MulticallWaiter(object):
- def __init__(self, consumer):
- self._consumer = consumer
- self._results = queue.Queue()
- self._closed = False
-
- def close(self):
- self._closed = True
- self._consumer.close()
- ConnectionPool.put(self._consumer.connection)
-
- def __call__(self, data, message):
- """Acks message and sets result."""
- message.ack()
- if data['failure']:
- self._results.put(RemoteError(*data['failure']))
- else:
- self._results.put(data['result'])
-
- def __iter__(self):
- return self.wait()
-
- def wait(self):
- while True:
- rv = None
- while rv is None and not self._closed:
- try:
- rv = self._consumer.fetch(enable_callbacks=True)
- except Exception:
- self.close()
- raise
- time.sleep(0.01)
-
- result = self._results.get()
- if isinstance(result, Exception):
- self.close()
- raise result
- if result == None:
- self.close()
- raise StopIteration
- yield result
+def create_consumer_set(conn, consumers):
+ return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers)
def call(context, topic, msg):
- """Sends a message on a topic and wait for a response."""
- rv = multicall(context, topic, msg)
- # NOTE(vish): return the last result from the multicall
- rv = list(rv)
- if not rv:
- return
- return rv[-1]
+ return RPCIMPL.call(context, topic, msg)
def cast(context, topic, msg):
- """Sends a message on a topic without waiting for a response."""
- LOG.debug(_('Making asynchronous cast on %s...'), topic)
- _pack_context(msg, context)
- with ConnectionPool.item() as conn:
- publisher = TopicPublisher(connection=conn, topic=topic)
- publisher.send(msg)
- publisher.close()
+ return RPCIMPL.cast(context, topic, msg)
def fanout_cast(context, topic, msg):
- """Sends a message on a fanout exchange without waiting for a response."""
- LOG.debug(_('Making asynchronous fanout cast...'))
- _pack_context(msg, context)
- with ConnectionPool.item() as conn:
- publisher = FanoutPublisher(topic, connection=conn)
- publisher.send(msg)
- publisher.close()
-
-
-def generic_response(message_data, message):
- """Logs a result and exits."""
- LOG.debug(_('response %s'), message_data)
- message.ack()
- sys.exit(0)
+ return RPCIMPL.fanout_cast(context, topic, msg)
-def send_message(topic, message, wait=True):
- """Sends a message for testing."""
- msg_id = uuid.uuid4().hex
- message.update({'_msg_id': msg_id})
- LOG.debug(_('topic is %s'), topic)
- LOG.debug(_('message %s'), message)
-
- if wait:
- consumer = messaging.Consumer(connection=Connection.instance(),
- queue=msg_id,
- exchange=msg_id,
- auto_delete=True,
- exchange_type='direct',
- routing_key=msg_id)
- consumer.register_callback(generic_response)
-
- publisher = messaging.Publisher(connection=Connection.instance(),
- exchange=FLAGS.control_exchange,
- durable=False,
- exchange_type='topic',
- routing_key=topic)
- publisher.send(message)
- publisher.close()
-
- if wait:
- consumer.wait()
- consumer.close()
-
-
-if __name__ == '__main__':
- # You can send messages from the command line using
- # topic and a json string representing a dictionary
- # for the method
- send_message(sys.argv[1], json.loads(sys.argv[2]))
+def multicall(context, topic, msg):
+ return RPCIMPL.multicall(context, topic, msg)
diff --git a/nova/rpc_backends/__init__.py b/nova/rpc_backends/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/nova/rpc_backends/__init__.py
diff --git a/nova/rpc_backends/amqp.py b/nova/rpc_backends/amqp.py
new file mode 100644
index 000000000..efa178bd2
--- /dev/null
+++ b/nova/rpc_backends/amqp.py
@@ -0,0 +1,591 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""AMQP-based RPC.
+
+Queues have consumers and publishers.
+
+No fan-out support yet.
+
+"""
+
+import json
+import sys
+import time
+import traceback
+import types
+import uuid
+
+from carrot import connection as carrot_connection
+from carrot import messaging
+from eventlet import greenpool
+from eventlet import pools
+from eventlet import queue
+import greenlet
+
+from nova import context
+from nova import exception
+from nova import fakerabbit
+from nova import flags
+from nova import log as logging
+from nova import utils
+from nova.rpc_backends.common import RemoteError, LOG
+
+
+FLAGS = flags.FLAGS
+flags.DEFINE_integer('rpc_thread_pool_size', 1024,
+ 'Size of RPC thread pool')
+flags.DEFINE_integer('rpc_conn_pool_size', 30,
+ 'Size of RPC connection pool')
+
+
+class Connection(carrot_connection.BrokerConnection):
+ """Connection instance object."""
+
+ @classmethod
+ def instance(cls, new=True):
+ """Returns the instance."""
+ if new or not hasattr(cls, '_instance'):
+ params = dict(hostname=FLAGS.rabbit_host,
+ port=FLAGS.rabbit_port,
+ ssl=FLAGS.rabbit_use_ssl,
+ userid=FLAGS.rabbit_userid,
+ password=FLAGS.rabbit_password,
+ virtual_host=FLAGS.rabbit_virtual_host)
+
+ if FLAGS.fake_rabbit:
+ params['backend_cls'] = fakerabbit.Backend
+
+ # NOTE(vish): magic is fun!
+ # pylint: disable=W0142
+ if new:
+ return cls(**params)
+ else:
+ cls._instance = cls(**params)
+ return cls._instance
+
+ @classmethod
+ def recreate(cls):
+ """Recreates the connection instance.
+
+ This is necessary to recover from some network errors/disconnects.
+
+ """
+ try:
+ del cls._instance
+ except AttributeError, e:
+ # The _instance stuff is for testing purposes. Usually we don't use
+ # it. So don't freak out if it doesn't exist.
+ pass
+ return cls.instance()
+
+
+class Pool(pools.Pool):
+ """Class that implements a Pool of Connections."""
+
+ # TODO(comstud): Timeout connections not used in a while
+ def create(self):
+ LOG.debug('Creating new connection')
+ return Connection.instance(new=True)
+
+# Create a ConnectionPool to use for RPC calls. We'll order the
+# pool as a stack (LIFO), so that we can potentially loop through and
+# timeout old unused connections at some point
+ConnectionPool = Pool(
+ max_size=FLAGS.rpc_conn_pool_size,
+ order_as_stack=True)
+
+
+class Consumer(messaging.Consumer):
+ """Consumer base class.
+
+ Contains methods for connecting the fetch method to async loops.
+
+ """
+
+ def __init__(self, *args, **kwargs):
+ for i in xrange(FLAGS.rabbit_max_retries):
+ if i > 0:
+ time.sleep(FLAGS.rabbit_retry_interval)
+ try:
+ super(Consumer, self).__init__(*args, **kwargs)
+ self.failed_connection = False
+ break
+ except Exception as e: # Catching all because carrot sucks
+ fl_host = FLAGS.rabbit_host
+ fl_port = FLAGS.rabbit_port
+ fl_intv = FLAGS.rabbit_retry_interval
+ LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
+ ' unreachable: %(e)s. Trying again in %(fl_intv)d'
+ ' seconds.') % locals())
+ self.failed_connection = True
+ if self.failed_connection:
+ LOG.error(_('Unable to connect to AMQP server '
+ 'after %d tries. Shutting down.'),
+ FLAGS.rabbit_max_retries)
+ sys.exit(1)
+
+ def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
+ """Wraps the parent fetch with some logic for failed connection."""
+ # TODO(vish): the logic for failed connections and logging should be
+ # refactored into some sort of connection manager object
+ try:
+ if self.failed_connection:
+ # NOTE(vish): connection is defined in the parent class, we can
+ # recreate it as long as we create the backend too
+ # pylint: disable=W0201
+ self.connection = Connection.recreate()
+ self.backend = self.connection.create_backend()
+ self.declare()
+ return super(Consumer, self).fetch(no_ack,
+ auto_ack,
+ enable_callbacks)
+ if self.failed_connection:
+ LOG.error(_('Reconnected to queue'))
+ self.failed_connection = False
+ # NOTE(vish): This is catching all errors because we really don't
+ # want exceptions to be logged 10 times a second if some
+ # persistent failure occurs.
+ except Exception, e: # pylint: disable=W0703
+ if not self.failed_connection:
+ LOG.exception(_('Failed to fetch message from queue: %s' % e))
+ self.failed_connection = True
+
+ def attach_to_eventlet(self):
+ """Only needed for unit tests!"""
+ timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
+ timer.start(0.1)
+ return timer
+
+
+class AdapterConsumer(Consumer):
+ """Calls methods on a proxy object based on method and args."""
+
+ def __init__(self, connection=None, topic='broadcast', proxy=None):
+ LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
+ self.proxy = proxy
+ self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
+ super(AdapterConsumer, self).__init__(connection=connection,
+ topic=topic)
+ self.register_callback(self.process_data)
+
+ def process_data(self, message_data, message):
+ """Consumer callback to call a method on a proxy object.
+
+ Parses the message for validity and fires off a thread to call the
+ proxy object method.
+
+ Message data should be a dictionary with two keys:
+ method: string representing the method to call
+ args: dictionary of arg: value
+
+ Example: {'method': 'echo', 'args': {'value': 42}}
+
+ """
+ LOG.debug(_('received %s') % message_data)
+ # This will be popped off in _unpack_context
+ msg_id = message_data.get('_msg_id', None)
+ ctxt = _unpack_context(message_data)
+
+ method = message_data.get('method')
+ args = message_data.get('args', {})
+ message.ack()
+ if not method:
+ # NOTE(vish): we may not want to ack here, but that means that bad
+ # messages stay in the queue indefinitely, so for now
+ # we just log the message and send an error string
+ # back to the caller
+ LOG.warn(_('no method for message: %s') % message_data)
+ if msg_id:
+ msg_reply(msg_id,
+ _('No method for message: %s') % message_data)
+ return
+ self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
+
+ @exception.wrap_exception()
+ def _process_data(self, msg_id, ctxt, method, args):
+        """Thread that magically looks for a method on the proxy
+        object and calls it.
+ """
+
+ node_func = getattr(self.proxy, str(method))
+ node_args = dict((str(k), v) for k, v in args.iteritems())
+ # NOTE(vish): magic is fun!
+ try:
+ rval = node_func(context=ctxt, **node_args)
+ if msg_id:
+ # Check if the result was a generator
+ if isinstance(rval, types.GeneratorType):
+ for x in rval:
+ msg_reply(msg_id, x, None)
+ else:
+ msg_reply(msg_id, rval, None)
+
+ # This final None tells multicall that it is done.
+ msg_reply(msg_id, None, None)
+ elif isinstance(rval, types.GeneratorType):
+ # NOTE(vish): this iterates through the generator
+ list(rval)
+ except Exception as e:
+ logging.exception('Exception during message handling')
+ if msg_id:
+ msg_reply(msg_id, None, sys.exc_info())
+ return
+
+
+class TopicAdapterConsumer(AdapterConsumer):
+ """Consumes messages on a specific topic."""
+
+ exchange_type = 'topic'
+
+ def __init__(self, connection=None, topic='broadcast', proxy=None):
+ self.queue = topic
+ self.routing_key = topic
+ self.exchange = FLAGS.control_exchange
+ self.durable = False
+ super(TopicAdapterConsumer, self).__init__(connection=connection,
+ topic=topic, proxy=proxy)
+
+
+class FanoutAdapterConsumer(AdapterConsumer):
+ """Consumes messages from a fanout exchange."""
+
+ exchange_type = 'fanout'
+
+ def __init__(self, connection=None, topic='broadcast', proxy=None):
+ self.exchange = '%s_fanout' % topic
+ self.routing_key = topic
+ unique = uuid.uuid4().hex
+ self.queue = '%s_fanout_%s' % (topic, unique)
+ self.durable = False
+ # Fanout creates unique queue names, so we should auto-remove
+ # them when done, so they're not left around on restart.
+ # Also, we're the only one that should be consuming. exclusive
+ # implies auto_delete, so we'll just set that..
+ self.exclusive = True
+ LOG.info(_('Created "%(exchange)s" fanout exchange '
+ 'with "%(key)s" routing key'),
+ dict(exchange=self.exchange, key=self.routing_key))
+ super(FanoutAdapterConsumer, self).__init__(connection=connection,
+ topic=topic, proxy=proxy)
+
+
+class ConsumerSet(object):
+ """Groups consumers to listen on together on a single connection."""
+
+ def __init__(self, connection, consumer_list):
+ self.consumer_list = set(consumer_list)
+ self.consumer_set = None
+ self.enabled = True
+ self.init(connection)
+
+ def init(self, conn):
+ if not conn:
+ conn = Connection.instance(new=True)
+ if self.consumer_set:
+ self.consumer_set.close()
+ self.consumer_set = messaging.ConsumerSet(conn)
+ for consumer in self.consumer_list:
+ consumer.connection = conn
+ # consumer.backend is set for us
+ self.consumer_set.add_consumer(consumer)
+
+ def reconnect(self):
+ self.init(None)
+
+ def wait(self, limit=None):
+ running = True
+ while running:
+ it = self.consumer_set.iterconsume(limit=limit)
+ if not it:
+ break
+ while True:
+ try:
+ it.next()
+ except StopIteration:
+ return
+ except greenlet.GreenletExit:
+ running = False
+ break
+ except Exception as e:
+ LOG.exception(_("Exception while processing consumer"))
+ self.reconnect()
+ # Break to outer loop
+ break
+
+ def close(self):
+ self.consumer_set.close()
+
+
+class Publisher(messaging.Publisher):
+ """Publisher base class."""
+ pass
+
+
+class TopicPublisher(Publisher):
+ """Publishes messages on a specific topic."""
+
+ exchange_type = 'topic'
+
+ def __init__(self, connection=None, topic='broadcast'):
+ self.routing_key = topic
+ self.exchange = FLAGS.control_exchange
+ self.durable = False
+ super(TopicPublisher, self).__init__(connection=connection)
+
+
+class FanoutPublisher(Publisher):
+ """Publishes messages to a fanout exchange."""
+
+ exchange_type = 'fanout'
+
+ def __init__(self, topic, connection=None):
+ self.exchange = '%s_fanout' % topic
+ self.queue = '%s_fanout' % topic
+ self.durable = False
+ self.auto_delete = True
+ LOG.info(_('Creating "%(exchange)s" fanout exchange'),
+ dict(exchange=self.exchange))
+ super(FanoutPublisher, self).__init__(connection=connection)
+
+
+class DirectConsumer(Consumer):
+ """Consumes messages directly on a channel specified by msg_id."""
+
+ exchange_type = 'direct'
+
+ def __init__(self, connection=None, msg_id=None):
+ self.queue = msg_id
+ self.routing_key = msg_id
+ self.exchange = msg_id
+ self.auto_delete = True
+ self.exclusive = True
+ super(DirectConsumer, self).__init__(connection=connection)
+
+
+class DirectPublisher(Publisher):
+ """Publishes messages directly on a channel specified by msg_id."""
+
+ exchange_type = 'direct'
+
+ def __init__(self, connection=None, msg_id=None):
+ self.routing_key = msg_id
+ self.exchange = msg_id
+ self.auto_delete = True
+ super(DirectPublisher, self).__init__(connection=connection)
+
+
+def msg_reply(msg_id, reply=None, failure=None):
+ """Sends a reply or an error on the channel signified by msg_id.
+
+ Failure should be a sys.exc_info() tuple.
+
+ """
+ if failure:
+ message = str(failure[1])
+ tb = traceback.format_exception(*failure)
+ LOG.error(_("Returning exception %s to caller"), message)
+ LOG.error(tb)
+ failure = (failure[0].__name__, str(failure[1]), tb)
+
+ with ConnectionPool.item() as conn:
+ publisher = DirectPublisher(connection=conn, msg_id=msg_id)
+ try:
+ publisher.send({'result': reply, 'failure': failure})
+ except TypeError:
+ publisher.send(
+ {'result': dict((k, repr(v))
+ for k, v in reply.__dict__.iteritems()),
+ 'failure': failure})
+
+ publisher.close()
+
+
+def _unpack_context(msg):
+ """Unpack context from msg."""
+ context_dict = {}
+ for key in list(msg.keys()):
+ # NOTE(vish): Some versions of python don't like unicode keys
+ # in kwargs.
+ key = str(key)
+ if key.startswith('_context_'):
+ value = msg.pop(key)
+ context_dict[key[9:]] = value
+ context_dict['msg_id'] = msg.pop('_msg_id', None)
+ LOG.debug(_('unpacked context: %s'), context_dict)
+ return RpcContext.from_dict(context_dict)
+
+
+def _pack_context(msg, context):
+ """Pack context into msg.
+
+ Values for message keys need to be less than 255 chars, so we pull
+ context out into a bunch of separate keys. If we want to support
+ more arguments in rabbit messages, we may want to do the same
+ for args at some point.
+
+ """
+ context_d = dict([('_context_%s' % key, value)
+ for (key, value) in context.to_dict().iteritems()])
+ msg.update(context_d)
+
+
+class RpcContext(context.RequestContext):
+ def __init__(self, *args, **kwargs):
+ msg_id = kwargs.pop('msg_id', None)
+ self.msg_id = msg_id
+ super(RpcContext, self).__init__(*args, **kwargs)
+
+ def reply(self, *args, **kwargs):
+ msg_reply(self.msg_id, *args, **kwargs)
+
+
+def multicall(context, topic, msg):
+ """Make a call that returns multiple times."""
+ LOG.debug(_('Making asynchronous call on %s ...'), topic)
+ msg_id = uuid.uuid4().hex
+ msg.update({'_msg_id': msg_id})
+ LOG.debug(_('MSG_ID is %s') % (msg_id))
+ _pack_context(msg, context)
+
+ con_conn = ConnectionPool.get()
+ consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
+ wait_msg = MulticallWaiter(consumer)
+ consumer.register_callback(wait_msg)
+
+ publisher = TopicPublisher(connection=con_conn, topic=topic)
+ publisher.send(msg)
+ publisher.close()
+
+ return wait_msg
+
+
+class MulticallWaiter(object):
+ def __init__(self, consumer):
+ self._consumer = consumer
+ self._results = queue.Queue()
+ self._closed = False
+
+ def close(self):
+ self._closed = True
+ self._consumer.close()
+ ConnectionPool.put(self._consumer.connection)
+
+ def __call__(self, data, message):
+ """Acks message and sets result."""
+ message.ack()
+ if data['failure']:
+ self._results.put(RemoteError(*data['failure']))
+ else:
+ self._results.put(data['result'])
+
+ def __iter__(self):
+ return self.wait()
+
+ def wait(self):
+ while True:
+ rv = None
+ while rv is None and not self._closed:
+ try:
+ rv = self._consumer.fetch(enable_callbacks=True)
+ except Exception:
+ self.close()
+ raise
+ time.sleep(0.01)
+
+ result = self._results.get()
+ if isinstance(result, Exception):
+ self.close()
+ raise result
+ if result == None:
+ self.close()
+ raise StopIteration
+ yield result
+
+
+def call(context, topic, msg):
+    """Sends a message on a topic and waits for a response."""
+ rv = multicall(context, topic, msg)
+ # NOTE(vish): return the last result from the multicall
+ rv = list(rv)
+ if not rv:
+ return
+ return rv[-1]
+
+
+def cast(context, topic, msg):
+ """Sends a message on a topic without waiting for a response."""
+ LOG.debug(_('Making asynchronous cast on %s...'), topic)
+ _pack_context(msg, context)
+ with ConnectionPool.item() as conn:
+ publisher = TopicPublisher(connection=conn, topic=topic)
+ publisher.send(msg)
+ publisher.close()
+
+
+def fanout_cast(context, topic, msg):
+ """Sends a message on a fanout exchange without waiting for a response."""
+ LOG.debug(_('Making asynchronous fanout cast...'))
+ _pack_context(msg, context)
+ with ConnectionPool.item() as conn:
+ publisher = FanoutPublisher(topic, connection=conn)
+ publisher.send(msg)
+ publisher.close()
+
+
+def generic_response(message_data, message):
+ """Logs a result and exits."""
+ LOG.debug(_('response %s'), message_data)
+ message.ack()
+ sys.exit(0)
+
+
+def send_message(topic, message, wait=True):
+ """Sends a message for testing."""
+ msg_id = uuid.uuid4().hex
+ message.update({'_msg_id': msg_id})
+ LOG.debug(_('topic is %s'), topic)
+ LOG.debug(_('message %s'), message)
+
+ if wait:
+ consumer = messaging.Consumer(connection=Connection.instance(),
+ queue=msg_id,
+ exchange=msg_id,
+ auto_delete=True,
+ exchange_type='direct',
+ routing_key=msg_id)
+ consumer.register_callback(generic_response)
+
+ publisher = messaging.Publisher(connection=Connection.instance(),
+ exchange=FLAGS.control_exchange,
+ durable=False,
+ exchange_type='topic',
+ routing_key=topic)
+ publisher.send(message)
+ publisher.close()
+
+ if wait:
+ consumer.wait()
+ consumer.close()
+
+
+if __name__ == '__main__':
+ # You can send messages from the command line using
+ # topic and a json string representing a dictionary
+ # for the method
+ send_message(sys.argv[1], json.loads(sys.argv[2]))
diff --git a/nova/rpc_backends/common.py b/nova/rpc_backends/common.py
new file mode 100644
index 000000000..1d3065a83
--- /dev/null
+++ b/nova/rpc_backends/common.py
@@ -0,0 +1,23 @@
+from nova import exception
+from nova import log as logging
+
+LOG = logging.getLogger('nova.rpc')
+
+
+class RemoteError(exception.Error):
+ """Signifies that a remote class has raised an exception.
+
+    Contains a string representation of the type of the original exception,
+    the value of the original exception, and the traceback. These are
+    sent to the parent as a joined string so printing the exception
+    contains all of the relevant info.
+
+ """
+
+ def __init__(self, exc_type, value, traceback):
+ self.exc_type = exc_type
+ self.value = value
+ self.traceback = traceback
+ super(RemoteError, self).__init__('%s %s\n%s' % (exc_type,
+ value,
+ traceback))
diff --git a/nova/service.py b/nova/service.py
index 00e4f61e5..6e9eddc5a 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -149,26 +149,22 @@ class Service(object):
if 'nova-compute' == self.binary:
self.manager.update_available_resource(ctxt)
- self.conn = rpc.Connection.instance(new=True)
+ self.conn = rpc.create_connection(new=True)
logging.debug("Creating Consumer connection for Service %s" %
self.topic)
# Share this same connection for these Consumers
- consumer_all = rpc.TopicAdapterConsumer(
- connection=self.conn,
- topic=self.topic,
- proxy=self)
- consumer_node = rpc.TopicAdapterConsumer(
- connection=self.conn,
- topic='%s.%s' % (self.topic, self.host),
- proxy=self)
- fanout = rpc.FanoutAdapterConsumer(
- connection=self.conn,
- topic=self.topic,
- proxy=self)
- consumer_set = rpc.ConsumerSet(
- connection=self.conn,
- consumer_list=[consumer_all, consumer_node, fanout])
+ consumer_all = rpc.create_consumer(self.conn, self.topic, self,
+ fanout=False)
+
+ node_topic = '%s.%s' % (self.topic, self.host)
+ consumer_node = rpc.create_consumer(self.conn, node_topic, self,
+ fanout=False)
+
+ fanout = rpc.create_consumer(self.conn, self.topic, self, fanout=True)
+
+ consumers = [consumer_all, consumer_node, fanout]
+ consumer_set = rpc.create_consumer_set(self.conn, consumers)
# Wait forever, processing these consumers
def _wait():
diff --git a/nova/test.py b/nova/test.py
index 9790b0aa1..549aa6fcf 100644
--- a/nova/test.py
+++ b/nova/test.py
@@ -99,9 +99,7 @@ class TestCase(unittest.TestCase):
self.flag_overrides = {}
self.injected = []
self._services = []
- self._monkey_patch_attach()
self._original_flags = FLAGS.FlagValuesDict()
- rpc.ConnectionPool = rpc.Pool(max_size=FLAGS.rpc_conn_pool_size)
def tearDown(self):
"""Runs after each test method to tear down test environment."""
@@ -126,9 +124,6 @@ class TestCase(unittest.TestCase):
# Reset any overriden flags
self.reset_flags()
- # Reset our monkey-patches
- rpc.Consumer.attach_to_eventlet = self.original_attach
-
# Stop any timers
for x in self.injected:
try:
@@ -172,17 +167,6 @@ class TestCase(unittest.TestCase):
self._services.append(svc)
return svc
- def _monkey_patch_attach(self):
- self.original_attach = rpc.Consumer.attach_to_eventlet
-
- def _wrapped(inner_self):
- rv = self.original_attach(inner_self)
- self.injected.append(rv)
- return rv
-
- _wrapped.func_name = self.original_attach.func_name
- rpc.Consumer.attach_to_eventlet = _wrapped
-
# Useful assertions
def assertDictMatch(self, d1, d2, approx_equal=False, tolerance=0.001):
"""Assert two dicts are equivalent.
diff --git a/nova/tests/test_adminapi.py b/nova/tests/test_adminapi.py
index 877cf4ea1..6bbe15f53 100644
--- a/nova/tests/test_adminapi.py
+++ b/nova/tests/test_adminapi.py
@@ -39,7 +39,7 @@ class AdminApiTestCase(test.TestCase):
super(AdminApiTestCase, self).setUp()
self.flags(connection_type='fake')
- self.conn = rpc.Connection.instance()
+ self.conn = rpc.create_connection()
# set up our cloud
self.api = admin.AdminController()
diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py
index 136082cc1..a1b296a96 100644
--- a/nova/tests/test_cloud.py
+++ b/nova/tests/test_cloud.py
@@ -50,7 +50,7 @@ class CloudTestCase(test.TestCase):
self.flags(connection_type='fake',
stub_network=True)
- self.conn = rpc.Connection.instance()
+ self.conn = rpc.create_connection()
# set up our cloud
self.cloud = cloud.CloudController()
@@ -269,63 +269,24 @@ class CloudTestCase(test.TestCase):
delete = self.cloud.delete_security_group
self.assertRaises(exception.ApiError, delete, self.context)
- def test_authorize_security_group_ingress(self):
+ def test_authorize_revoke_security_group_ingress(self):
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
sec = db.security_group_create(self.context, kwargs)
authz = self.cloud.authorize_security_group_ingress
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
- self.assertTrue(authz(self.context, group_name=sec['name'], **kwargs))
-
- def test_authorize_security_group_ingress_ip_permissions_ip_ranges(self):
- kwargs = {'project_id': self.context.project_id, 'name': 'test'}
- sec = db.security_group_create(self.context, kwargs)
- authz = self.cloud.authorize_security_group_ingress
- kwargs = {'ip_permissions': [{'to_port': 81, 'from_port': 81,
- 'ip_ranges':
- {'1': {'cidr_ip': u'0.0.0.0/0'},
- '2': {'cidr_ip': u'10.10.10.10/32'}},
- 'ip_protocol': u'tcp'}]}
- self.assertTrue(authz(self.context, group_name=sec['name'], **kwargs))
-
- def test_authorize_security_group_ingress_ip_permissions_groups(self):
- kwargs = {'project_id': self.context.project_id, 'name': 'test'}
- sec = db.security_group_create(self.context, kwargs)
- authz = self.cloud.authorize_security_group_ingress
- kwargs = {'ip_permissions': [{'to_port': 81, 'from_port': 81,
- 'ip_ranges':{'1': {'cidr_ip': u'0.0.0.0/0'},
- '2': {'cidr_ip': u'10.10.10.10/32'}},
- 'groups': {'1': {'user_id': u'someuser',
- 'group_name': u'somegroup1'},
- '2': {'user_id': u'someuser',
- 'group_name': u'othergroup2'}},
- 'ip_protocol': u'tcp'}]}
- self.assertTrue(authz(self.context, group_name=sec['name'], **kwargs))
-
- def test_revoke_security_group_ingress(self):
- kwargs = {'project_id': self.context.project_id, 'name': 'test'}
- sec = db.security_group_create(self.context, kwargs)
- authz = self.cloud.authorize_security_group_ingress
- kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
- authz(self.context, group_id=sec['id'], **kwargs)
+ authz(self.context, group_name=sec['name'], **kwargs)
revoke = self.cloud.revoke_security_group_ingress
self.assertTrue(revoke(self.context, group_name=sec['name'], **kwargs))
- def test_revoke_security_group_ingress_by_id(self):
- kwargs = {'project_id': self.context.project_id, 'name': 'test'}
- sec = db.security_group_create(self.context, kwargs)
- authz = self.cloud.authorize_security_group_ingress
- kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
- authz(self.context, group_id=sec['id'], **kwargs)
- revoke = self.cloud.revoke_security_group_ingress
- self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
-
- def test_authorize_security_group_ingress_by_id(self):
+ def test_authorize_revoke_security_group_ingress_by_id(self):
sec = db.security_group_create(self.context,
{'project_id': self.context.project_id,
'name': 'test'})
authz = self.cloud.authorize_security_group_ingress
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
- self.assertTrue(authz(self.context, group_id=sec['id'], **kwargs))
+ authz(self.context, group_id=sec['id'], **kwargs)
+ revoke = self.cloud.revoke_security_group_ingress
+ self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
def test_authorize_security_group_ingress_missing_protocol_params(self):
sec = db.security_group_create(self.context,
@@ -947,21 +908,6 @@ class CloudTestCase(test.TestCase):
self._wait_for_running(ec2_instance_id)
return ec2_instance_id
- def test_rescue_unrescue_instance(self):
- instance_id = self._run_instance(
- image_id='ami-1',
- instance_type=FLAGS.default_instance_type,
- max_count=1)
- self.cloud.rescue_instance(context=self.context,
- instance_id=instance_id)
- # NOTE(vish): This currently does no validation, it simply makes sure
- # that the code path doesn't throw an exception.
- self.cloud.unrescue_instance(context=self.context,
- instance_id=instance_id)
- # TODO(soren): We need this until we can stop polling in the rpc code
- # for unit tests.
- self.cloud.terminate_instances(self.context, [instance_id])
-
def test_console_output(self):
instance_id = self._run_instance(
image_id='ami-1',
diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py
index ffd748efe..2d2436175 100644
--- a/nova/tests/test_rpc.py
+++ b/nova/tests/test_rpc.py
@@ -33,11 +33,12 @@ LOG = logging.getLogger('nova.tests.rpc')
class RpcTestCase(test.TestCase):
def setUp(self):
super(RpcTestCase, self).setUp()
- self.conn = rpc.Connection.instance(True)
+ self.conn = rpc.create_connection(True)
self.receiver = TestReceiver()
- self.consumer = rpc.TopicAdapterConsumer(connection=self.conn,
- topic='test',
- proxy=self.receiver)
+ self.consumer = rpc.create_consumer(self.conn,
+ 'test',
+ self.receiver,
+ False)
self.consumer.attach_to_eventlet()
self.context = context.get_admin_context()
@@ -129,6 +130,8 @@ class RpcTestCase(test.TestCase):
"""Calls echo in the passed queue"""
LOG.debug(_("Nested received %(queue)s, %(value)s")
% locals())
+ # TODO: so, it will replay the context and use the same REQID?
+ # that's bizarre.
ret = rpc.call(context,
queue,
{"method": "echo",
@@ -137,10 +140,11 @@ class RpcTestCase(test.TestCase):
return value
nested = Nested()
- conn = rpc.Connection.instance(True)
- consumer = rpc.TopicAdapterConsumer(connection=conn,
- topic='nested',
- proxy=nested)
+ conn = rpc.create_connection(True)
+ consumer = rpc.create_consumer(conn,
+ 'nested',
+ nested,
+ False)
consumer.attach_to_eventlet()
value = 42
result = rpc.call(self.context,
@@ -149,47 +153,6 @@ class RpcTestCase(test.TestCase):
"value": value}})
self.assertEqual(value, result)
- def test_connectionpool_single(self):
- """Test that ConnectionPool recycles a single connection."""
- conn1 = rpc.ConnectionPool.get()
- rpc.ConnectionPool.put(conn1)
- conn2 = rpc.ConnectionPool.get()
- rpc.ConnectionPool.put(conn2)
- self.assertEqual(conn1, conn2)
-
- def test_connectionpool_double(self):
- """Test that ConnectionPool returns and reuses separate connections.
-
- When called consecutively we should get separate connections and upon
- returning them those connections should be reused for future calls
- before generating a new connection.
-
- """
- conn1 = rpc.ConnectionPool.get()
- conn2 = rpc.ConnectionPool.get()
-
- self.assertNotEqual(conn1, conn2)
- rpc.ConnectionPool.put(conn1)
- rpc.ConnectionPool.put(conn2)
-
- conn3 = rpc.ConnectionPool.get()
- conn4 = rpc.ConnectionPool.get()
- self.assertEqual(conn1, conn3)
- self.assertEqual(conn2, conn4)
-
- def test_connectionpool_limit(self):
- """Test connection pool limit and connection uniqueness."""
- max_size = FLAGS.rpc_conn_pool_size
- conns = []
-
- for i in xrange(max_size):
- conns.append(rpc.ConnectionPool.get())
-
- self.assertFalse(rpc.ConnectionPool.free_items)
- self.assertEqual(rpc.ConnectionPool.current_size,
- rpc.ConnectionPool.max_size)
- self.assertEqual(len(set(conns)), max_size)
-
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
diff --git a/nova/tests/test_rpc_amqp.py b/nova/tests/test_rpc_amqp.py
new file mode 100644
index 000000000..e3df2393a
--- /dev/null
+++ b/nova/tests/test_rpc_amqp.py
@@ -0,0 +1,68 @@
+from nova import context
+from nova import flags
+from nova import log as logging
+from nova import rpc
+from nova.rpc_backends import amqp
+from nova import test
+
+
+FLAGS = flags.FLAGS
+LOG = logging.getLogger('nova.tests.rpc')
+
+
+class RpcAMQPTestCase(test.TestCase):
+ def setUp(self):
+ super(RpcAMQPTestCase, self).setUp()
+ self.conn = rpc.create_connection(True)
+ self.receiver = TestReceiver()
+ self.consumer = rpc.create_consumer(self.conn,
+ 'test',
+ self.receiver,
+ False)
+ self.consumer.attach_to_eventlet()
+ self.context = context.get_admin_context()
+
+ def test_connectionpool_single(self):
+ """Test that ConnectionPool recycles a single connection."""
+ conn1 = amqp.ConnectionPool.get()
+ amqp.ConnectionPool.put(conn1)
+ conn2 = amqp.ConnectionPool.get()
+ amqp.ConnectionPool.put(conn2)
+ self.assertEqual(conn1, conn2)
+
+
+class TestReceiver(object):
+ """Simple Proxy class so the consumer has methods to call.
+
+ Uses static methods because we aren't actually storing any state.
+
+ """
+
+ @staticmethod
+ def echo(context, value):
+ """Simply returns whatever value is sent in."""
+ LOG.debug(_("Received %s"), value)
+ return value
+
+ @staticmethod
+ def context(context, value):
+ """Returns dictionary version of context."""
+ LOG.debug(_("Received %s"), context)
+ return context.to_dict()
+
+ @staticmethod
+ def echo_three_times(context, value):
+ context.reply(value)
+ context.reply(value + 1)
+ context.reply(value + 2)
+
+ @staticmethod
+ def echo_three_times_yield(context, value):
+ yield value
+ yield value + 1
+ yield value + 2
+
+ @staticmethod
+ def fail(context, value):
+ """Raises an exception with the value sent in."""
+ raise Exception(value)
diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py
index f45f76b73..bbf47b50f 100644
--- a/nova/tests/test_service.py
+++ b/nova/tests/test_service.py
@@ -109,103 +109,8 @@ class ServiceTestCase(test.TestCase):
# the looping calls are created in StartService.
app = service.Service.create(host=host, binary=binary, topic=topic)
- self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
- service.rpc.Connection.instance(new=mox.IgnoreArg())
-
- self.mox.StubOutWithMock(rpc,
- 'TopicAdapterConsumer',
- use_mock_anything=True)
- self.mox.StubOutWithMock(rpc,
- 'FanoutAdapterConsumer',
- use_mock_anything=True)
-
- self.mox.StubOutWithMock(rpc,
- 'ConsumerSet',
- use_mock_anything=True)
-
- rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
- topic=topic,
- proxy=mox.IsA(service.Service)).AndReturn(
- rpc.TopicAdapterConsumer)
-
- rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
- topic='%s.%s' % (topic, host),
- proxy=mox.IsA(service.Service)).AndReturn(
- rpc.TopicAdapterConsumer)
-
- rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
- topic=topic,
- proxy=mox.IsA(service.Service)).AndReturn(
- rpc.FanoutAdapterConsumer)
-
- def wait_func(self, limit=None):
- return None
-
- mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
- {'wait': wait_func})
- rpc.ConsumerSet(connection=mox.IgnoreArg(),
- consumer_list=mox.IsA(list)).AndReturn(mock_cset)
- wait_func(mox.IgnoreArg())
-
- service_create = {'host': host,
- 'binary': binary,
- 'topic': topic,
- 'report_count': 0,
- 'availability_zone': 'nova'}
- service_ref = {'host': host,
- 'binary': binary,
- 'report_count': 0,
- 'id': 1}
-
- service.db.service_get_by_args(mox.IgnoreArg(),
- host,
- binary).AndRaise(exception.NotFound())
- service.db.service_create(mox.IgnoreArg(),
- service_create).AndReturn(service_ref)
- self.mox.ReplayAll()
-
- app.start()
- app.stop()
self.assert_(app)
- # We're testing sort of weird behavior in how report_state decides
- # whether it is disconnected, it looks for a variable on itself called
- # 'model_disconnected' and report_state doesn't really do much so this
- # these are mostly just for coverage
- def test_report_state_no_service(self):
- host = 'foo'
- binary = 'bar'
- topic = 'test'
- service_create = {'host': host,
- 'binary': binary,
- 'topic': topic,
- 'report_count': 0,
- 'availability_zone': 'nova'}
- service_ref = {'host': host,
- 'binary': binary,
- 'topic': topic,
- 'report_count': 0,
- 'availability_zone': 'nova',
- 'id': 1}
-
- service.db.service_get_by_args(mox.IgnoreArg(),
- host,
- binary).AndRaise(exception.NotFound())
- service.db.service_create(mox.IgnoreArg(),
- service_create).AndReturn(service_ref)
- service.db.service_get(mox.IgnoreArg(),
- service_ref['id']).AndReturn(service_ref)
- service.db.service_update(mox.IgnoreArg(), service_ref['id'],
- mox.ContainsKeyValue('report_count', 1))
-
- self.mox.ReplayAll()
- serv = service.Service(host,
- binary,
- topic,
- 'nova.tests.test_service.FakeManager')
- serv.start()
- serv.report_state()
-
def test_report_state_newly_disconnected(self):
host = 'foo'
binary = 'bar'
@@ -276,81 +181,6 @@ class ServiceTestCase(test.TestCase):
self.assert_(not serv.model_disconnected)
- def test_compute_can_update_available_resource(self):
- """Confirm compute updates their record of compute-service table."""
- host = 'foo'
- binary = 'nova-compute'
- topic = 'compute'
-
- # Any mocks are not working without UnsetStubs() here.
- self.mox.UnsetStubs()
- ctxt = context.get_admin_context()
- service_ref = db.service_create(ctxt, {'host': host,
- 'binary': binary,
- 'topic': topic})
- serv = service.Service(host,
- binary,
- topic,
- 'nova.compute.manager.ComputeManager')
-
- # This testcase want to test calling update_available_resource.
- # No need to call periodic call, then below variable must be set 0.
- serv.report_interval = 0
- serv.periodic_interval = 0
-
- # Creating mocks
- self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
- service.rpc.Connection.instance(new=mox.IgnoreArg())
-
- self.mox.StubOutWithMock(rpc,
- 'TopicAdapterConsumer',
- use_mock_anything=True)
- self.mox.StubOutWithMock(rpc,
- 'FanoutAdapterConsumer',
- use_mock_anything=True)
-
- self.mox.StubOutWithMock(rpc,
- 'ConsumerSet',
- use_mock_anything=True)
-
- rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
- topic=topic,
- proxy=mox.IsA(service.Service)).AndReturn(
- rpc.TopicAdapterConsumer)
-
- rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
- topic='%s.%s' % (topic, host),
- proxy=mox.IsA(service.Service)).AndReturn(
- rpc.TopicAdapterConsumer)
-
- rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
- topic=topic,
- proxy=mox.IsA(service.Service)).AndReturn(
- rpc.FanoutAdapterConsumer)
-
- def wait_func(self, limit=None):
- return None
-
- mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
- {'wait': wait_func})
- rpc.ConsumerSet(connection=mox.IgnoreArg(),
- consumer_list=mox.IsA(list)).AndReturn(mock_cset)
- wait_func(mox.IgnoreArg())
-
- self.mox.StubOutWithMock(serv.manager.driver,
- 'update_available_resource')
- serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)
-
- # Just doing start()-stop(), not confirm new db record is created,
- # because update_available_resource() works only in
- # libvirt environment. This testcase confirms
- # update_available_resource() is called. Otherwise, mox complains.
- self.mox.ReplayAll()
- serv.start()
- serv.stop()
-
- db.service_destroy(ctxt, service_ref['id'])
-
class TestWSGIService(test.TestCase):
diff --git a/nova/tests/test_test.py b/nova/tests/test_test.py
index 35c838065..64f11fa45 100644
--- a/nova/tests/test_test.py
+++ b/nova/tests/test_test.py
@@ -33,8 +33,13 @@ class IsolationTestCase(test.TestCase):
self.start_service('compute')
def test_rpc_consumer_isolation(self):
- connection = rpc.Connection.instance(new=True)
- consumer = rpc.TopicAdapterConsumer(connection, topic='compute')
- consumer.register_callback(
- lambda x, y: self.fail('I should never be called'))
+ class NeverCalled(object):
+
+ def __getattribute__(*args):
+ assert False, "I should never get called."
+
+ connection = rpc.create_connection(new=True)
+ proxy = NeverCalled()
+ consumer = rpc.create_consumer(connection, 'compute',
+ proxy, fanout=False)
consumer.attach_to_eventlet()
diff --git a/nova/utils.py b/nova/utils.py
index 8784a227d..ad31f88bd 100644
--- a/nova/utils.py
+++ b/nova/utils.py
@@ -809,3 +809,14 @@ class Bootstrapper(object):
for key in FLAGS:
value = FLAGS.get(key, None)
logging.audit(_("%(key)s : %(value)s" % locals()))
+
+
+def load_module(name):
+ mod = __import__(name)
+
+ components = name.split('.')
+
+ for comp in components[1:]:
+ mod = getattr(mod, comp)
+
+ return mod