summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2011-12-23 10:19:53 +0000
committerGerrit Code Review <review@openstack.org>2011-12-23 10:19:53 +0000
commit7db38e4d40bf406f23e7ca2717824e230897cc41 (patch)
treeadad5b7efe8ad91dda32f327ce2f5b304ea45af5
parent787f76eb8ae3949cceed950c8cc9513d46b8277b (diff)
parentf2764375c37c005710943d25bd494558577f892c (diff)
downloadnova-7db38e4d40bf406f23e7ca2717824e230897cc41.tar.gz
nova-7db38e4d40bf406f23e7ca2717824e230897cc41.tar.xz
nova-7db38e4d40bf406f23e7ca2717824e230897cc41.zip
Merge "Help clarify rpc API with docs and a bit of code."
-rw-r--r--nova/rpc/__init__.py102
-rw-r--r--nova/rpc/common.py74
-rw-r--r--nova/rpc/impl_carrot.py5
-rw-r--r--nova/rpc/impl_kombu.py9
4 files changed, 174 insertions, 16 deletions
diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py
index c0cfdd5ce..eeb2791ba 100644
--- a/nova/rpc/__init__.py
+++ b/nova/rpc/__init__.py
@@ -3,6 +3,7 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -26,32 +27,107 @@ flags.DEFINE_string('rpc_backend',
'nova.rpc.impl_kombu',
"The messaging module to use, defaults to kombu.")
-_RPCIMPL = None
+def create_connection(new=True):
+ """Create a connection to the message bus used for rpc.
-def get_impl():
- """Delay import of rpc_backend until FLAGS are loaded."""
- global _RPCIMPL
- if _RPCIMPL is None:
- _RPCIMPL = import_object(FLAGS.rpc_backend)
- return _RPCIMPL
+ For some example usage of creating a connection and some consumers on that
+ connection, see nova.service.
+ :param new: Whether or not to create a new connection. A new connection
+ will be created by default. If new is False, the
+ implementation is free to return an existing connection from a
+ pool.
-def create_connection(new=True):
- return get_impl().create_connection(new=new)
+ :returns: An instance of nova.rpc.common.Connection
+ """
+ return _get_impl().create_connection(new=new)
def call(context, topic, msg):
- return get_impl().call(context, topic, msg)
+ """Invoke a remote method that returns something.
+
+ :param context: Information that identifies the user that has made this
+ request.
+ :param topic: The topic to send the rpc message to. This correlates to the
+ topic argument of
+ nova.rpc.common.Connection.create_consumer() and only applies
+ when the consumer was created with fanout=False.
+ :param msg: This is a dict in the form { "method" : "method_to_invoke",
+ "args" : dict_of_kwargs }
+
+ :returns: A dict from the remote method.
+ """
+ return _get_impl().call(context, topic, msg)
def cast(context, topic, msg):
- return get_impl().cast(context, topic, msg)
+ """Invoke a remote method that does not return anything.
+
+ :param context: Information that identifies the user that has made this
+ request.
+ :param topic: The topic to send the rpc message to. This correlates to the
+ topic argument of
+ nova.rpc.common.Connection.create_consumer() and only applies
+ when the consumer was created with fanout=False.
+ :param msg: This is a dict in the form { "method" : "method_to_invoke",
+ "args" : dict_of_kwargs }
+
+ :returns: None
+ """
+ return _get_impl().cast(context, topic, msg)
def fanout_cast(context, topic, msg):
- return get_impl().fanout_cast(context, topic, msg)
+ """Broadcast a remote method invocation with no return.
+
+ This method will get invoked on all consumers that were set up with this
+ topic name and fanout=True.
+
+ :param context: Information that identifies the user that has made this
+ request.
+ :param topic: The topic to send the rpc message to. This correlates to the
+ topic argument of
+ nova.rpc.common.Connection.create_consumer() and only applies
+ when the consumer was created with fanout=True.
+ :param msg: This is a dict in the form { "method" : "method_to_invoke",
+ "args" : dict_of_kwargs }
+
+ :returns: None
+ """
+ return _get_impl().fanout_cast(context, topic, msg)
def multicall(context, topic, msg):
- return get_impl().multicall(context, topic, msg)
+ """Invoke a remote method and get back an iterator.
+
+ In this case, the remote method will be returning multiple values in
+ separate messages, so the return values can be processed as the come in via
+ an iterator.
+
+ :param context: Information that identifies the user that has made this
+ request.
+ :param topic: The topic to send the rpc message to. This correlates to the
+ topic argument of
+ nova.rpc.common.Connection.create_consumer() and only applies
+ when the consumer was created with fanout=False.
+ :param msg: This is a dict in the form { "method" : "method_to_invoke",
+ "args" : dict_of_kwargs }
+
+ :returns: An iterator. The iterator will yield a tuple (N, X) where N is
+ an index that starts at 0 and increases by one for each value
+ returned and X is the Nth value that was returned by the remote
+ method.
+ """
+ return _get_impl().multicall(context, topic, msg)
+
+
+_RPCIMPL = None
+
+
+def _get_impl():
+ """Delay import of rpc_backend until FLAGS are loaded."""
+ global _RPCIMPL
+ if _RPCIMPL is None:
+ _RPCIMPL = import_object(FLAGS.rpc_backend)
+ return _RPCIMPL
diff --git a/nova/rpc/common.py b/nova/rpc/common.py
index a7597d29b..43c4a1fae 100644
--- a/nova/rpc/common.py
+++ b/nova/rpc/common.py
@@ -1,3 +1,23 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
from nova import exception
from nova import flags
from nova import log as logging
@@ -26,3 +46,57 @@ class RemoteError(exception.NovaException):
self.value = value
self.traceback = traceback
super(RemoteError, self).__init__(**self.__dict__)
+
+
+class Connection(object):
+ """A connection, returned by rpc.create_connection().
+
+ This class represents a connection to the message bus used for rpc.
+ An instance of this class should never be created by users of the rpc API.
+ Use rpc.create_connection() instead.
+ """
+ def close(self):
+ """Close the connection.
+
+ This method must be called when the connection will no longer be used.
+ It will ensure that any resources associated with the connection, such
+ as a network connection, and cleaned up.
+ """
+ raise NotImplementedError()
+
+ def create_consumer(self, topic, proxy, fanout=False):
+ """Create a consumer on this connection.
+
+ A consumer is associated with a message queue on the backend message
+ bus. The consumer will read messages from the queue, unpack them, and
+ dispatch them to the proxy object. The contents of the message pulled
+ off of the queue will determine which method gets called on the proxy
+ object.
+
+ :param topic: This is a name associated with what to consume from.
+ Multiple instances of a service may consume from the same
+ topic. For example, all instances of nova-compute consume
+ from a queue called "compute". In that case, the
+ messages will get distributed amongst the consumers in a
+ round-robin fashion if fanout=False. If fanout=True,
+ every consumer associated with this topic will get a
+ copy of every message.
+ :param proxy: The object that will handle all incoming messages.
+ :param fanout: Whether or not this is a fanout topic. See the
+ documentation for the topic parameter for some
+ additional comments on this.
+ """
+ raise NotImplementedError()
+
+ def consume_in_thread(self):
+ """Spawn a thread to handle incoming messages.
+
+ Spawn a thread that will be responsible for handling all incoming
+ messages for consumers that were set up on this connection.
+
+ Message dispatching inside of this is expected to be implemented in a
+ non-blocking manner. An example implementation would be having this
+ thread pull messages in for all of the consumers, but utilize a thread
+ pool for dispatching the messages to the proxy objects.
+ """
+ raise NotImplementedError()
diff --git a/nova/rpc/impl_carrot.py b/nova/rpc/impl_carrot.py
index eed8cb10d..ce119d655 100644
--- a/nova/rpc/impl_carrot.py
+++ b/nova/rpc/impl_carrot.py
@@ -42,6 +42,7 @@ import greenlet
from nova import context
from nova import exception
from nova import flags
+from nova.rpc import common as rpc_common
from nova.rpc.common import RemoteError, LOG
from nova.testing import fake
@@ -51,7 +52,7 @@ eventlet.monkey_patch()
FLAGS = flags.FLAGS
-class Connection(carrot_connection.BrokerConnection):
+class Connection(carrot_connection.BrokerConnection, rpc_common.Connection):
"""Connection instance object."""
def __init__(self, *args, **kwargs):
@@ -105,7 +106,7 @@ class Connection(carrot_connection.BrokerConnection):
# ignore all errors
pass
self._rpc_consumers = []
- super(Connection, self).close()
+ carrot_connection.BrokerConnection.close(self)
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py
index 757e7636a..b16bc3c79 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/rpc/impl_kombu.py
@@ -33,6 +33,7 @@ import greenlet
from nova import context
from nova import exception
from nova import flags
+from nova.rpc import common as rpc_common
from nova.rpc.common import RemoteError, LOG
# Needed for tests
@@ -512,7 +513,7 @@ ConnectionPool = Pool(
order_as_stack=True)
-class ConnectionContext(object):
+class ConnectionContext(rpc_common.Connection):
"""The class that is actually returned to the caller of
create_connection(). This is a essentially a wrapper around
Connection that supports 'with' and can return a new Connection or
@@ -569,6 +570,12 @@ class ConnectionContext(object):
"""Caller is done with this connection."""
self._done()
+ def create_consumer(self, topic, proxy, fanout=False):
+ self.connection.create_consumer(topic, proxy, fanout)
+
+ def consume_in_thread(self):
+ self.connection.consume_in_thread()
+
def __getattr__(self, key):
"""Proxy all other calls to the Connection instance"""
if self.connection: