summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDan Smith <danms@us.ibm.com>2013-05-21 09:17:35 -0700
committerDan Smith <danms@us.ibm.com>2013-05-21 09:17:35 -0700
commit04724f696da70064592f455ec6fb6e86e07167ed (patch)
treed16a5ba5d1b2a8f95e0d5a3d3d0be06fc44c3f34
parentddee491093984c19166dd2a7850ef10f9733a8aa (diff)
downloadnova-04724f696da70064592f455ec6fb6e86e07167ed.tar.gz
nova-04724f696da70064592f455ec6fb6e86e07167ed.tar.xz
nova-04724f696da70064592f455ec6fb6e86e07167ed.zip
Sync RPC serializer changes from Oslo
Pulls in the serializer object and changes to the dispatcher and proxy implementations to support it. Needed for bp/rpc-support-for-objects Change-Id: I349de94c80a183f244cbb10009b6922996cc4d82
-rw-r--r--nova/openstack/common/rpc/dispatcher.py29
-rw-r--r--nova/openstack/common/rpc/proxy.py37
-rw-r--r--nova/openstack/common/rpc/serializer.py52
3 files changed, 113 insertions, 5 deletions
diff --git a/nova/openstack/common/rpc/dispatcher.py b/nova/openstack/common/rpc/dispatcher.py
index 3c84671df..81d16bcee 100644
--- a/nova/openstack/common/rpc/dispatcher.py
+++ b/nova/openstack/common/rpc/dispatcher.py
@@ -84,6 +84,7 @@ minimum version that supports the new parameter should be specified.
"""
from nova.openstack.common.rpc import common as rpc_common
+from nova.openstack.common.rpc import serializer as rpc_serializer
class RpcDispatcher(object):
@@ -93,16 +94,38 @@ class RpcDispatcher(object):
contains a list of underlying managers that have an API_VERSION attribute.
"""
- def __init__(self, callbacks):
+ def __init__(self, callbacks, serializer=None):
"""Initialize the rpc dispatcher.
:param callbacks: List of proxy objects that are an instance
of a class with rpc methods exposed. Each proxy
object should have an RPC_API_VERSION attribute.
+ :param serializer: The Serializer object that will be used to
+ deserialize arguments before the method call and
+ to serialize the result after it returns.
"""
self.callbacks = callbacks
+ if serializer is None:
+ serializer = rpc_serializer.NoOpSerializer()
+ self.serializer = serializer
super(RpcDispatcher, self).__init__()
+ def _deserialize_args(self, context, kwargs):
+ """Helper method called to deserialize args before dispatch.
+
+ This calls our serializer on each argument, returning a new set of
+ args that have been deserialized.
+
+ :param context: The request context
+ :param kwargs: The arguments to be deserialized
+ :returns: A new set of deserialized args
+ """
+ new_kwargs = dict()
+ for argname, arg in kwargs.iteritems():
+ new_kwargs[argname] = self.serializer.deserialize_entity(context,
+ arg)
+ return new_kwargs
+
def dispatch(self, ctxt, version, method, namespace, **kwargs):
"""Dispatch a message based on a requested version.
@@ -145,7 +168,9 @@ class RpcDispatcher(object):
if not hasattr(proxyobj, method):
continue
if is_compatible:
- return getattr(proxyobj, method)(ctxt, **kwargs)
+ kwargs = self._deserialize_args(ctxt, kwargs)
+ result = getattr(proxyobj, method)(ctxt, **kwargs)
+ return self.serializer.serialize_entity(ctxt, result)
if had_compatible:
raise AttributeError("No such RPC function '%s'" % method)
diff --git a/nova/openstack/common/rpc/proxy.py b/nova/openstack/common/rpc/proxy.py
index 4260ae355..dcdfc0864 100644
--- a/nova/openstack/common/rpc/proxy.py
+++ b/nova/openstack/common/rpc/proxy.py
@@ -24,6 +24,7 @@ For more information about rpc API version numbers, see:
from nova.openstack.common import rpc
from nova.openstack.common.rpc import common as rpc_common
+from nova.openstack.common.rpc import serializer as rpc_serializer
class RpcProxy(object):
@@ -38,7 +39,8 @@ class RpcProxy(object):
# The default namespace, which can be overriden in a subclass.
RPC_API_NAMESPACE = None
- def __init__(self, topic, default_version, version_cap=None):
+ def __init__(self, topic, default_version, version_cap=None,
+ serializer=None):
"""Initialize an RpcProxy.
:param topic: The topic to use for all messages.
@@ -47,10 +49,15 @@ class RpcProxy(object):
basis.
:param version_cap: Optionally cap the maximum version used for sent
messages.
+ :param serializer: Optionaly (de-)serialize entities with a
+ provided helper.
"""
self.topic = topic
self.default_version = default_version
self.version_cap = version_cap
+ if serializer is None:
+ serializer = rpc_serializer.NoOpSerializer()
+ self.serializer = serializer
super(RpcProxy, self).__init__()
def _set_version(self, msg, vers):
@@ -77,6 +84,22 @@ class RpcProxy(object):
return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
**kwargs)
+ def _serialize_msg_args(self, context, kwargs):
+ """Helper method called to serialize message arguments.
+
+ This calls our serializer on each argument, returning a new
+ set of args that have been serialized.
+
+ :param context: The request context
+ :param kwargs: The arguments to serialize
+ :returns: A new set of serialized arguments
+ """
+ new_kwargs = dict()
+ for argname, arg in kwargs.iteritems():
+ new_kwargs[argname] = self.serializer.serialize_entity(context,
+ arg)
+ return new_kwargs
+
def call(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.call() a remote method.
@@ -92,9 +115,11 @@ class RpcProxy(object):
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
- return rpc.call(context, real_topic, msg, timeout)
+ result = rpc.call(context, real_topic, msg, timeout)
+ return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
@@ -115,9 +140,11 @@ class RpcProxy(object):
from the remote method as they arrive.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
- return rpc.multicall(context, real_topic, msg, timeout)
+ result = rpc.multicall(context, real_topic, msg, timeout)
+ return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
@@ -135,6 +162,7 @@ class RpcProxy(object):
remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast(context, self._get_topic(topic), msg)
def fanout_cast(self, context, msg, topic=None, version=None):
@@ -150,6 +178,7 @@ class RpcProxy(object):
from the remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast(context, self._get_topic(topic), msg)
def cast_to_server(self, context, server_params, msg, topic=None,
@@ -168,6 +197,7 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
def fanout_cast_to_server(self, context, server_params, msg, topic=None,
@@ -186,5 +216,6 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast_to_server(context, server_params,
self._get_topic(topic), msg)
diff --git a/nova/openstack/common/rpc/serializer.py b/nova/openstack/common/rpc/serializer.py
new file mode 100644
index 000000000..0a2c9c4f1
--- /dev/null
+++ b/nova/openstack/common/rpc/serializer.py
@@ -0,0 +1,52 @@
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Provides the definition of an RPC serialization handler"""
+
+import abc
+
+
+class Serializer(object):
+ """Generic (de-)serialization definition base class"""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def serialize_entity(self, context, entity):
+ """Serialize something to primitive form.
+
+ :param context: Security context
+ :param entity: Entity to be serialized
+ :returns: Serialized form of entity
+ """
+ pass
+
+ @abc.abstractmethod
+ def deserialize_entity(self, context, entity):
+ """Deserialize something from primitive form.
+
+ :param context: Security context
+ :param entity: Primitive to be deserialized
+ :returns: Deserialized form of entity
+ """
+ pass
+
+
+class NoOpSerializer(Serializer):
+ """A serializer that does nothing"""
+
+ def serialize_entity(self, context, entity):
+ return entity
+
+ def deserialize_entity(self, context, entity):
+ return entity