diff options
Diffstat (limited to 'openstack/common/rpc/proxy.py')
| -rw-r--r-- | openstack/common/rpc/proxy.py | 37 |
1 files changed, 34 insertions, 3 deletions
diff --git a/openstack/common/rpc/proxy.py b/openstack/common/rpc/proxy.py index 0fb140f..3b74fe1 100644 --- a/openstack/common/rpc/proxy.py +++ b/openstack/common/rpc/proxy.py @@ -24,6 +24,7 @@ For more information about rpc API version numbers, see: from openstack.common import rpc from openstack.common.rpc import common as rpc_common +from openstack.common.rpc import serializer as rpc_serializer class RpcProxy(object): @@ -38,7 +39,8 @@ class RpcProxy(object): # The default namespace, which can be overriden in a subclass. RPC_API_NAMESPACE = None - def __init__(self, topic, default_version, version_cap=None): + def __init__(self, topic, default_version, version_cap=None, + serializer=None): """Initialize an RpcProxy. :param topic: The topic to use for all messages. @@ -47,10 +49,15 @@ class RpcProxy(object): basis. :param version_cap: Optionally cap the maximum version used for sent messages. + :param serializer: Optionaly (de-)serialize entities with a + provided helper. """ self.topic = topic self.default_version = default_version self.version_cap = version_cap + if serializer is None: + serializer = rpc_serializer.NoOpSerializer() + self.serializer = serializer super(RpcProxy, self).__init__() def _set_version(self, msg, vers): @@ -77,6 +84,22 @@ class RpcProxy(object): return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE, **kwargs) + def _serialize_msg_args(self, context, kwargs): + """Helper method called to serialize message arguments. + + This calls our serializer on each argument, returning a new + set of args that have been serialized. + + :param context: The request context + :param kwargs: The arguments to serialize + :returns: A new set of serialized arguments + """ + new_kwargs = dict() + for argname, arg in kwargs.iteritems(): + new_kwargs[argname] = self.serializer.serialize_entity(context, + arg) + return new_kwargs + def call(self, context, msg, topic=None, version=None, timeout=None): """rpc.call() a remote method. @@ -92,9 +115,11 @@ class RpcProxy(object): :returns: The return value from the remote method. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) real_topic = self._get_topic(topic) try: - return rpc.call(context, real_topic, msg, timeout) + result = rpc.call(context, real_topic, msg, timeout) + return self.serializer.deserialize_entity(context, result) except rpc.common.Timeout as exc: raise rpc.common.Timeout( exc.info, real_topic, msg.get('method')) @@ -115,9 +140,11 @@ class RpcProxy(object): from the remote method as they arrive. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) real_topic = self._get_topic(topic) try: - return rpc.multicall(context, real_topic, msg, timeout) + result = rpc.multicall(context, real_topic, msg, timeout) + return self.serializer.deserialize_entity(context, result) except rpc.common.Timeout as exc: raise rpc.common.Timeout( exc.info, real_topic, msg.get('method')) @@ -135,6 +162,7 @@ class RpcProxy(object): remote method. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.cast(context, self._get_topic(topic), msg) def fanout_cast(self, context, msg, topic=None, version=None): @@ -150,6 +178,7 @@ class RpcProxy(object): from the remote method. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.fanout_cast(context, self._get_topic(topic), msg) def cast_to_server(self, context, server_params, msg, topic=None, @@ -168,6 +197,7 @@ class RpcProxy(object): return values. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.cast_to_server(context, server_params, self._get_topic(topic), msg) def fanout_cast_to_server(self, context, server_params, msg, topic=None, @@ -186,5 +216,6 @@ class RpcProxy(object): return values. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.fanout_cast_to_server(context, server_params, self._get_topic(topic), msg) |
