summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDan Smith <danms@us.ibm.com>2013-05-21 09:17:35 -0700
committerDan Smith <danms@us.ibm.com>2013-05-21 09:17:35 -0700
commit04724f696da70064592f455ec6fb6e86e07167ed (patch)
treed16a5ba5d1b2a8f95e0d5a3d3d0be06fc44c3f34
parentddee491093984c19166dd2a7850ef10f9733a8aa (diff)
Sync RPC serializer changes from Oslo
Pulls in the serializer object and changes to the dispatcher and proxy implementations to support it. Needed for bp/rpc-support-for-objects Change-Id: I349de94c80a183f244cbb10009b6922996cc4d82
-rw-r--r--nova/openstack/common/rpc/dispatcher.py29
-rw-r--r--nova/openstack/common/rpc/proxy.py37
-rw-r--r--nova/openstack/common/rpc/serializer.py52
3 files changed, 113 insertions, 5 deletions
diff --git a/nova/openstack/common/rpc/dispatcher.py b/nova/openstack/common/rpc/dispatcher.py
index 3c84671df..81d16bcee 100644
--- a/nova/openstack/common/rpc/dispatcher.py
+++ b/nova/openstack/common/rpc/dispatcher.py
@@ -84,6 +84,7 @@ minimum version that supports the new parameter should be specified.
"""
from nova.openstack.common.rpc import common as rpc_common
+from nova.openstack.common.rpc import serializer as rpc_serializer
class RpcDispatcher(object):
@@ -93,16 +94,38 @@ class RpcDispatcher(object):
contains a list of underlying managers that have an API_VERSION attribute.
"""
- def __init__(self, callbacks):
+ def __init__(self, callbacks, serializer=None):
"""Initialize the rpc dispatcher.
:param callbacks: List of proxy objects that are an instance
of a class with rpc methods exposed. Each proxy
object should have an RPC_API_VERSION attribute.
+ :param serializer: The Serializer object that will be used to
+ deserialize arguments before the method call and
+ to serialize the result after it returns.
"""
self.callbacks = callbacks
+ if serializer is None:
+ serializer = rpc_serializer.NoOpSerializer()
+ self.serializer = serializer
super(RpcDispatcher, self).__init__()
+ def _deserialize_args(self, context, kwargs):
+ """Helper method called to deserialize args before dispatch.
+
+ This calls our serializer on each argument, returning a new set of
+ args that have been deserialized.
+
+ :param context: The request context
+ :param kwargs: The arguments to be deserialized
+ :returns: A new set of deserialized args
+ """
+ new_kwargs = dict()
+ for argname, arg in kwargs.iteritems():
+ new_kwargs[argname] = self.serializer.deserialize_entity(context,
+ arg)
+ return new_kwargs
+
def dispatch(self, ctxt, version, method, namespace, **kwargs):
"""Dispatch a message based on a requested version.
@@ -145,7 +168,9 @@ class RpcDispatcher(object):
if not hasattr(proxyobj, method):
continue
if is_compatible:
- return getattr(proxyobj, method)(ctxt, **kwargs)
+ kwargs = self._deserialize_args(ctxt, kwargs)
+ result = getattr(proxyobj, method)(ctxt, **kwargs)
+ return self.serializer.serialize_entity(ctxt, result)
if had_compatible:
raise AttributeError("No such RPC function '%s'" % method)
diff --git a/nova/openstack/common/rpc/proxy.py b/nova/openstack/common/rpc/proxy.py
index 4260ae355..dcdfc0864 100644
--- a/nova/openstack/common/rpc/proxy.py
+++ b/nova/openstack/common/rpc/proxy.py
@@ -24,6 +24,7 @@ For more information about rpc API version numbers, see:
from nova.openstack.common import rpc
from nova.openstack.common.rpc import common as rpc_common
+from nova.openstack.common.rpc import serializer as rpc_serializer
class RpcProxy(object):
@@ -38,7 +39,8 @@ class RpcProxy(object):
# The default namespace, which can be overriden in a subclass.
RPC_API_NAMESPACE = None
- def __init__(self, topic, default_version, version_cap=None):
+ def __init__(self, topic, default_version, version_cap=None,
+ serializer=None):
"""Initialize an RpcProxy.
:param topic: The topic to use for all messages.
@@ -47,10 +49,15 @@ class RpcProxy(object):
basis.
:param version_cap: Optionally cap the maximum version used for sent
messages.
+ :param serializer: Optionaly (de-)serialize entities with a
+ provided helper.
"""
self.topic = topic
self.default_version = default_version
self.version_cap = version_cap
+ if serializer is None:
+ serializer = rpc_serializer.NoOpSerializer()
+ self.serializer = serializer
super(RpcProxy, self).__init__()
def _set_version(self, msg, vers):
@@ -77,6 +84,22 @@ class RpcProxy(object):
return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
**kwargs)
+ def _serialize_msg_args(self, context, kwargs):
+ """Helper method called to serialize message arguments.
+
+ This calls our serializer on each argument, returning a new
+ set of args that have been serialized.
+
+ :param context: The request context
+ :param kwargs: The arguments to serialize
+ :returns: A new set of serialized arguments
+ """
+ new_kwargs = dict()
+ for argname, arg in kwargs.iteritems():
+ new_kwargs[argname] = self.serializer.serialize_entity(context,
+ arg)
+ return new_kwargs
+
def call(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.call() a remote method.
@@ -92,9 +115,11 @@ class RpcProxy(object):
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
- return rpc.call(context, real_topic, msg, timeout)
+ result = rpc.call(context, real_topic, msg, timeout)
+ return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
@@ -115,9 +140,11 @@ class RpcProxy(object):
from the remote method as they arrive.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
- return rpc.multicall(context, real_topic, msg, timeout)
+ result = rpc.multicall(context, real_topic, msg, timeout)
+ return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
@@ -135,6 +162,7 @@ class RpcProxy(object):
remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast(context, self._get_topic(topic), msg)
def fanout_cast(self, context, msg, topic=None, version=None):
@@ -150,6 +178,7 @@ class RpcProxy(object):
from the remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast(context, self._get_topic(topic), msg)
def cast_to_server(self, context, server_params, msg, topic=None,
@@ -168,6 +197,7 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
def fanout_cast_to_server(self, context, server_params, msg, topic=None,
@@ -186,5 +216,6 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast_to_server(context, server_params,
self._get_topic(topic), msg)
diff --git a/nova/openstack/common/rpc/serializer.py b/nova/openstack/common/rpc/serializer.py
new file mode 100644
index 000000000..0a2c9c4f1
--- /dev/null
+++ b/nova/openstack/common/rpc/serializer.py
@@ -0,0 +1,52 @@
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Provides the definition of an RPC serialization handler"""
+
+import abc
+
+
+class Serializer(object):
+ """Generic (de-)serialization definition base class"""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def serialize_entity(self, context, entity):
+ """Serialize something to primitive form.
+
+ :param context: Security context
+ :param entity: Entity to be serialized
+ :returns: Serialized form of entity
+ """
+ pass
+
+ @abc.abstractmethod
+ def deserialize_entity(self, context, entity):
+ """Deserialize something from primitive form.
+
+ :param context: Security context
+ :param entity: Primitive to be deserialized
+ :returns: Deserialized form of entity
+ """
+ pass
+
+
+class NoOpSerializer(Serializer):
+ """A serializer that does nothing"""
+
+ def serialize_entity(self, context, entity):
+ return entity
+
+ def deserialize_entity(self, context, entity):
+ return entity