summaryrefslogtreecommitdiffstats
path: root/keystone/openstack/common/rpc/proxy.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone/openstack/common/rpc/proxy.py')
-rw-r--r--keystone/openstack/common/rpc/proxy.py226
1 files changed, 226 insertions, 0 deletions
diff --git a/keystone/openstack/common/rpc/proxy.py b/keystone/openstack/common/rpc/proxy.py
new file mode 100644
index 00000000..9cc61920
--- /dev/null
+++ b/keystone/openstack/common/rpc/proxy.py
@@ -0,0 +1,226 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012-2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+A helper class for proxy objects to remote APIs.
+
+For more information about rpc API version numbers, see:
+ rpc/dispatcher.py
+"""
+
+
+from keystone.openstack.common import rpc
+from keystone.openstack.common.rpc import common as rpc_common
+from keystone.openstack.common.rpc import serializer as rpc_serializer
+
+
+class RpcProxy(object):
+ """A helper class for rpc clients.
+
+ This class is a wrapper around the RPC client API. It allows you to
+ specify the topic and API version in a single place. This is intended to
+ be used as a base class for a class that implements the client side of an
+ rpc API.
+ """
+
+ # The default namespace, which can be overriden in a subclass.
+ RPC_API_NAMESPACE = None
+
+ def __init__(self, topic, default_version, version_cap=None,
+ serializer=None):
+ """Initialize an RpcProxy.
+
+ :param topic: The topic to use for all messages.
+ :param default_version: The default API version to request in all
+ outgoing messages. This can be overridden on a per-message
+ basis.
+ :param version_cap: Optionally cap the maximum version used for sent
+ messages.
+ :param serializer: Optionaly (de-)serialize entities with a
+ provided helper.
+ """
+ self.topic = topic
+ self.default_version = default_version
+ self.version_cap = version_cap
+ if serializer is None:
+ serializer = rpc_serializer.NoOpSerializer()
+ self.serializer = serializer
+ super(RpcProxy, self).__init__()
+
+ def _set_version(self, msg, vers):
+ """Helper method to set the version in a message.
+
+ :param msg: The message having a version added to it.
+ :param vers: The version number to add to the message.
+ """
+ v = vers if vers else self.default_version
+ if (self.version_cap and not
+ rpc_common.version_is_compatible(self.version_cap, v)):
+ raise rpc_common.RpcVersionCapError(version_cap=self.version_cap)
+ msg['version'] = v
+
+ def _get_topic(self, topic):
+ """Return the topic to use for a message."""
+ return topic if topic else self.topic
+
+ def can_send_version(self, version):
+ """Check to see if a version is compatible with the version cap."""
+ return (not self.version_cap or
+ rpc_common.version_is_compatible(self.version_cap, version))
+
+ @staticmethod
+ def make_namespaced_msg(method, namespace, **kwargs):
+ return {'method': method, 'namespace': namespace, 'args': kwargs}
+
+ def make_msg(self, method, **kwargs):
+ return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
+ **kwargs)
+
+ def _serialize_msg_args(self, context, kwargs):
+ """Helper method called to serialize message arguments.
+
+ This calls our serializer on each argument, returning a new
+ set of args that have been serialized.
+
+ :param context: The request context
+ :param kwargs: The arguments to serialize
+ :returns: A new set of serialized arguments
+ """
+ new_kwargs = dict()
+ for argname, arg in kwargs.iteritems():
+ new_kwargs[argname] = self.serializer.serialize_entity(context,
+ arg)
+ return new_kwargs
+
+ def call(self, context, msg, topic=None, version=None, timeout=None):
+ """rpc.call() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
+ :param timeout: (Optional) A timeout to use when waiting for the
+ response. If no timeout is specified, a default timeout will be
+ used that is usually sufficient.
+
+ :returns: The return value from the remote method.
+ """
+ self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
+ real_topic = self._get_topic(topic)
+ try:
+ result = rpc.call(context, real_topic, msg, timeout)
+ return self.serializer.deserialize_entity(context, result)
+ except rpc.common.Timeout as exc:
+ raise rpc.common.Timeout(
+ exc.info, real_topic, msg.get('method'))
+
+ def multicall(self, context, msg, topic=None, version=None, timeout=None):
+ """rpc.multicall() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
+ :param timeout: (Optional) A timeout to use when waiting for the
+ response. If no timeout is specified, a default timeout will be
+ used that is usually sufficient.
+
+ :returns: An iterator that lets you process each of the returned values
+ from the remote method as they arrive.
+ """
+ self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
+ real_topic = self._get_topic(topic)
+ try:
+ result = rpc.multicall(context, real_topic, msg, timeout)
+ return self.serializer.deserialize_entity(context, result)
+ except rpc.common.Timeout as exc:
+ raise rpc.common.Timeout(
+ exc.info, real_topic, msg.get('method'))
+
+ def cast(self, context, msg, topic=None, version=None):
+ """rpc.cast() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.cast() does not wait on any return value from the
+ remote method.
+ """
+ self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
+ rpc.cast(context, self._get_topic(topic), msg)
+
+ def fanout_cast(self, context, msg, topic=None, version=None):
+ """rpc.fanout_cast() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.fanout_cast() does not wait on any return value
+ from the remote method.
+ """
+ self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
+ rpc.fanout_cast(context, self._get_topic(topic), msg)
+
+ def cast_to_server(self, context, server_params, msg, topic=None,
+ version=None):
+ """rpc.cast_to_server() a remote method.
+
+ :param context: The request context
+ :param server_params: Server parameters. See rpc.cast_to_server() for
+ details.
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.cast_to_server() does not wait on any
+ return values.
+ """
+ self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
+ rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
+
+ def fanout_cast_to_server(self, context, server_params, msg, topic=None,
+ version=None):
+ """rpc.fanout_cast_to_server() a remote method.
+
+ :param context: The request context
+ :param server_params: Server parameters. See rpc.cast_to_server() for
+ details.
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.fanout_cast_to_server() does not wait on any
+ return values.
+ """
+ self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
+ rpc.fanout_cast_to_server(context, server_params,
+ self._get_topic(topic), msg)