diff options
author | Dan Smith <danms@us.ibm.com> | 2013-05-21 09:17:35 -0700 |
---|---|---|
committer | Dan Smith <danms@us.ibm.com> | 2013-05-21 09:17:35 -0700 |
commit | 04724f696da70064592f455ec6fb6e86e07167ed (patch) | |
tree | d16a5ba5d1b2a8f95e0d5a3d3d0be06fc44c3f34 | |
parent | ddee491093984c19166dd2a7850ef10f9733a8aa (diff) | |
download | nova-04724f696da70064592f455ec6fb6e86e07167ed.tar.gz nova-04724f696da70064592f455ec6fb6e86e07167ed.tar.xz nova-04724f696da70064592f455ec6fb6e86e07167ed.zip |
Sync RPC serializer changes from Oslo
Pulls in the serializer object and changes to the dispatcher and
proxy implementations to support it.
Needed for bp/rpc-support-for-objects
Change-Id: I349de94c80a183f244cbb10009b6922996cc4d82
-rw-r--r-- | nova/openstack/common/rpc/dispatcher.py | 29 | ||||
-rw-r--r-- | nova/openstack/common/rpc/proxy.py | 37 | ||||
-rw-r--r-- | nova/openstack/common/rpc/serializer.py | 52 |
3 files changed, 113 insertions, 5 deletions
diff --git a/nova/openstack/common/rpc/dispatcher.py b/nova/openstack/common/rpc/dispatcher.py index 3c84671df..81d16bcee 100644 --- a/nova/openstack/common/rpc/dispatcher.py +++ b/nova/openstack/common/rpc/dispatcher.py @@ -84,6 +84,7 @@ minimum version that supports the new parameter should be specified. """ from nova.openstack.common.rpc import common as rpc_common +from nova.openstack.common.rpc import serializer as rpc_serializer class RpcDispatcher(object): @@ -93,16 +94,38 @@ class RpcDispatcher(object): contains a list of underlying managers that have an API_VERSION attribute. """ - def __init__(self, callbacks): + def __init__(self, callbacks, serializer=None): """Initialize the rpc dispatcher. :param callbacks: List of proxy objects that are an instance of a class with rpc methods exposed. Each proxy object should have an RPC_API_VERSION attribute. + :param serializer: The Serializer object that will be used to + deserialize arguments before the method call and + to serialize the result after it returns. """ self.callbacks = callbacks + if serializer is None: + serializer = rpc_serializer.NoOpSerializer() + self.serializer = serializer super(RpcDispatcher, self).__init__() + def _deserialize_args(self, context, kwargs): + """Helper method called to deserialize args before dispatch. + + This calls our serializer on each argument, returning a new set of + args that have been deserialized. + + :param context: The request context + :param kwargs: The arguments to be deserialized + :returns: A new set of deserialized args + """ + new_kwargs = dict() + for argname, arg in kwargs.iteritems(): + new_kwargs[argname] = self.serializer.deserialize_entity(context, + arg) + return new_kwargs + def dispatch(self, ctxt, version, method, namespace, **kwargs): """Dispatch a message based on a requested version. @@ -145,7 +168,9 @@ class RpcDispatcher(object): if not hasattr(proxyobj, method): continue if is_compatible: - return getattr(proxyobj, method)(ctxt, **kwargs) + kwargs = self._deserialize_args(ctxt, kwargs) + result = getattr(proxyobj, method)(ctxt, **kwargs) + return self.serializer.serialize_entity(ctxt, result) if had_compatible: raise AttributeError("No such RPC function '%s'" % method) diff --git a/nova/openstack/common/rpc/proxy.py b/nova/openstack/common/rpc/proxy.py index 4260ae355..dcdfc0864 100644 --- a/nova/openstack/common/rpc/proxy.py +++ b/nova/openstack/common/rpc/proxy.py @@ -24,6 +24,7 @@ For more information about rpc API version numbers, see: from nova.openstack.common import rpc from nova.openstack.common.rpc import common as rpc_common +from nova.openstack.common.rpc import serializer as rpc_serializer class RpcProxy(object): @@ -38,7 +39,8 @@ class RpcProxy(object): # The default namespace, which can be overriden in a subclass. RPC_API_NAMESPACE = None - def __init__(self, topic, default_version, version_cap=None): + def __init__(self, topic, default_version, version_cap=None, + serializer=None): """Initialize an RpcProxy. :param topic: The topic to use for all messages. @@ -47,10 +49,15 @@ class RpcProxy(object): basis. :param version_cap: Optionally cap the maximum version used for sent messages. + :param serializer: Optionaly (de-)serialize entities with a + provided helper. """ self.topic = topic self.default_version = default_version self.version_cap = version_cap + if serializer is None: + serializer = rpc_serializer.NoOpSerializer() + self.serializer = serializer super(RpcProxy, self).__init__() def _set_version(self, msg, vers): @@ -77,6 +84,22 @@ class RpcProxy(object): return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE, **kwargs) + def _serialize_msg_args(self, context, kwargs): + """Helper method called to serialize message arguments. + + This calls our serializer on each argument, returning a new + set of args that have been serialized. + + :param context: The request context + :param kwargs: The arguments to serialize + :returns: A new set of serialized arguments + """ + new_kwargs = dict() + for argname, arg in kwargs.iteritems(): + new_kwargs[argname] = self.serializer.serialize_entity(context, + arg) + return new_kwargs + def call(self, context, msg, topic=None, version=None, timeout=None): """rpc.call() a remote method. @@ -92,9 +115,11 @@ class RpcProxy(object): :returns: The return value from the remote method. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) real_topic = self._get_topic(topic) try: - return rpc.call(context, real_topic, msg, timeout) + result = rpc.call(context, real_topic, msg, timeout) + return self.serializer.deserialize_entity(context, result) except rpc.common.Timeout as exc: raise rpc.common.Timeout( exc.info, real_topic, msg.get('method')) @@ -115,9 +140,11 @@ class RpcProxy(object): from the remote method as they arrive. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) real_topic = self._get_topic(topic) try: - return rpc.multicall(context, real_topic, msg, timeout) + result = rpc.multicall(context, real_topic, msg, timeout) + return self.serializer.deserialize_entity(context, result) except rpc.common.Timeout as exc: raise rpc.common.Timeout( exc.info, real_topic, msg.get('method')) @@ -135,6 +162,7 @@ class RpcProxy(object): remote method. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.cast(context, self._get_topic(topic), msg) def fanout_cast(self, context, msg, topic=None, version=None): @@ -150,6 +178,7 @@ class RpcProxy(object): from the remote method. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.fanout_cast(context, self._get_topic(topic), msg) def cast_to_server(self, context, server_params, msg, topic=None, @@ -168,6 +197,7 @@ class RpcProxy(object): return values. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.cast_to_server(context, server_params, self._get_topic(topic), msg) def fanout_cast_to_server(self, context, server_params, msg, topic=None, @@ -186,5 +216,6 @@ class RpcProxy(object): return values. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.fanout_cast_to_server(context, server_params, self._get_topic(topic), msg) diff --git a/nova/openstack/common/rpc/serializer.py b/nova/openstack/common/rpc/serializer.py new file mode 100644 index 000000000..0a2c9c4f1 --- /dev/null +++ b/nova/openstack/common/rpc/serializer.py @@ -0,0 +1,52 @@ +# Copyright 2013 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Provides the definition of an RPC serialization handler""" + +import abc + + +class Serializer(object): + """Generic (de-)serialization definition base class""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def serialize_entity(self, context, entity): + """Serialize something to primitive form. + + :param context: Security context + :param entity: Entity to be serialized + :returns: Serialized form of entity + """ + pass + + @abc.abstractmethod + def deserialize_entity(self, context, entity): + """Deserialize something from primitive form. + + :param context: Security context + :param entity: Primitive to be deserialized + :returns: Deserialized form of entity + """ + pass + + +class NoOpSerializer(Serializer): + """A serializer that does nothing""" + + def serialize_entity(self, context, entity): + return entity + + def deserialize_entity(self, context, entity): + return entity |