summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc/proxy.py
diff options
context:
space:
mode:
Diffstat (limited to 'openstack/common/rpc/proxy.py')
-rw-r--r--openstack/common/rpc/proxy.py37
1 files changed, 34 insertions, 3 deletions
diff --git a/openstack/common/rpc/proxy.py b/openstack/common/rpc/proxy.py
index 0fb140f..3b74fe1 100644
--- a/openstack/common/rpc/proxy.py
+++ b/openstack/common/rpc/proxy.py
@@ -24,6 +24,7 @@ For more information about rpc API version numbers, see:
from openstack.common import rpc
from openstack.common.rpc import common as rpc_common
+from openstack.common.rpc import serializer as rpc_serializer
class RpcProxy(object):
@@ -38,7 +39,8 @@ class RpcProxy(object):
# The default namespace, which can be overriden in a subclass.
RPC_API_NAMESPACE = None
- def __init__(self, topic, default_version, version_cap=None):
+ def __init__(self, topic, default_version, version_cap=None,
+ serializer=None):
"""Initialize an RpcProxy.
:param topic: The topic to use for all messages.
@@ -47,10 +49,15 @@ class RpcProxy(object):
basis.
:param version_cap: Optionally cap the maximum version used for sent
messages.
+ :param serializer: Optionaly (de-)serialize entities with a
+ provided helper.
"""
self.topic = topic
self.default_version = default_version
self.version_cap = version_cap
+ if serializer is None:
+ serializer = rpc_serializer.NoOpSerializer()
+ self.serializer = serializer
super(RpcProxy, self).__init__()
def _set_version(self, msg, vers):
@@ -77,6 +84,22 @@ class RpcProxy(object):
return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
**kwargs)
+ def _serialize_msg_args(self, context, kwargs):
+ """Helper method called to serialize message arguments.
+
+ This calls our serializer on each argument, returning a new
+ set of args that have been serialized.
+
+ :param context: The request context
+ :param kwargs: The arguments to serialize
+ :returns: A new set of serialized arguments
+ """
+ new_kwargs = dict()
+ for argname, arg in kwargs.iteritems():
+ new_kwargs[argname] = self.serializer.serialize_entity(context,
+ arg)
+ return new_kwargs
+
def call(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.call() a remote method.
@@ -92,9 +115,11 @@ class RpcProxy(object):
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
- return rpc.call(context, real_topic, msg, timeout)
+ result = rpc.call(context, real_topic, msg, timeout)
+ return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
@@ -115,9 +140,11 @@ class RpcProxy(object):
from the remote method as they arrive.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
- return rpc.multicall(context, real_topic, msg, timeout)
+ result = rpc.multicall(context, real_topic, msg, timeout)
+ return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
@@ -135,6 +162,7 @@ class RpcProxy(object):
remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast(context, self._get_topic(topic), msg)
def fanout_cast(self, context, msg, topic=None, version=None):
@@ -150,6 +178,7 @@ class RpcProxy(object):
from the remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast(context, self._get_topic(topic), msg)
def cast_to_server(self, context, server_params, msg, topic=None,
@@ -168,6 +197,7 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
def fanout_cast_to_server(self, context, server_params, msg, topic=None,
@@ -186,5 +216,6 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast_to_server(context, server_params,
self._get_topic(topic), msg)