diff options
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/rpc/amqp.py | 5 | ||||
| -rw-r--r-- | openstack/common/rpc/dispatcher.py | 29 | ||||
| -rw-r--r-- | openstack/common/rpc/matchmaker.py | 3 | ||||
| -rw-r--r-- | openstack/common/rpc/proxy.py | 46 | ||||
| -rw-r--r-- | openstack/common/rpc/serializer.py | 52 |
5 files changed, 124 insertions, 11 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index dc9aadb..946501b 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -197,8 +197,9 @@ class ReplyProxy(ConnectionContext): msg_id = message_data.pop('_msg_id', None) waiter = self._call_waiters.get(msg_id) if not waiter: - LOG.warn(_('no calling threads waiting for msg_id : %s' - ', message : %s') % (msg_id, message_data)) + LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s' + ', message : %(data)s'), {'msg_id': msg_id, + 'data': message_data}) else: waiter.put(message_data) diff --git a/openstack/common/rpc/dispatcher.py b/openstack/common/rpc/dispatcher.py index 2efa5f2..34eadff 100644 --- a/openstack/common/rpc/dispatcher.py +++ b/openstack/common/rpc/dispatcher.py @@ -84,6 +84,7 @@ minimum version that supports the new parameter should be specified. """ from openstack.common.rpc import common as rpc_common +from openstack.common.rpc import serializer as rpc_serializer class RpcDispatcher(object): @@ -93,16 +94,38 @@ class RpcDispatcher(object): contains a list of underlying managers that have an API_VERSION attribute. """ - def __init__(self, callbacks): + def __init__(self, callbacks, serializer=None): """Initialize the rpc dispatcher. :param callbacks: List of proxy objects that are an instance of a class with rpc methods exposed. Each proxy object should have an RPC_API_VERSION attribute. + :param serializer: The Serializer object that will be used to + deserialize arguments before the method call and + to serialize the result after it returns. """ self.callbacks = callbacks + if serializer is None: + serializer = rpc_serializer.NoOpSerializer() + self.serializer = serializer super(RpcDispatcher, self).__init__() + def _deserialize_args(self, context, kwargs): + """Helper method called to deserialize args before dispatch. + + This calls our serializer on each argument, returning a new set of + args that have been deserialized. + + :param context: The request context + :param kwargs: The arguments to be deserialized + :returns: A new set of deserialized args + """ + new_kwargs = dict() + for argname, arg in kwargs.iteritems(): + new_kwargs[argname] = self.serializer.deserialize_entity(context, + arg) + return new_kwargs + def dispatch(self, ctxt, version, method, namespace, **kwargs): """Dispatch a message based on a requested version. @@ -145,7 +168,9 @@ class RpcDispatcher(object): if not hasattr(proxyobj, method): continue if is_compatible: - return getattr(proxyobj, method)(ctxt, **kwargs) + kwargs = self._deserialize_args(ctxt, kwargs) + result = getattr(proxyobj, method)(ctxt, **kwargs) + return self.serializer.serialize_entity(ctxt, result) if had_compatible: raise AttributeError("No such RPC function '%s'" % method) diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py index 8072023..fd10f27 100644 --- a/openstack/common/rpc/matchmaker.py +++ b/openstack/common/rpc/matchmaker.py @@ -230,7 +230,8 @@ class HeartbeatMatchMakerBase(MatchMakerBase): self.hosts.discard(host) self.backend_unregister(key, '.'.join((key, host))) - LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host))) + LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"), + {'key': key, 'host': host}) def start_heartbeat(self): """ diff --git a/openstack/common/rpc/proxy.py b/openstack/common/rpc/proxy.py index 0b311de..3b74fe1 100644 --- a/openstack/common/rpc/proxy.py +++ b/openstack/common/rpc/proxy.py @@ -24,6 +24,7 @@ For more information about rpc API version numbers, see: from openstack.common import rpc from openstack.common.rpc import common as rpc_common +from openstack.common.rpc import serializer as rpc_serializer class RpcProxy(object): @@ -35,7 +36,11 @@ class RpcProxy(object): rpc API. """ - def __init__(self, topic, default_version, version_cap=None): + # The default namespace, which can be overriden in a subclass. + RPC_API_NAMESPACE = None + + def __init__(self, topic, default_version, version_cap=None, + serializer=None): """Initialize an RpcProxy. :param topic: The topic to use for all messages. @@ -44,10 +49,15 @@ class RpcProxy(object): basis. :param version_cap: Optionally cap the maximum version used for sent messages. + :param serializer: Optionaly (de-)serialize entities with a + provided helper. """ self.topic = topic self.default_version = default_version self.version_cap = version_cap + if serializer is None: + serializer = rpc_serializer.NoOpSerializer() + self.serializer = serializer super(RpcProxy, self).__init__() def _set_version(self, msg, vers): @@ -70,9 +80,25 @@ class RpcProxy(object): def make_namespaced_msg(method, namespace, **kwargs): return {'method': method, 'namespace': namespace, 'args': kwargs} - @staticmethod - def make_msg(method, **kwargs): - return RpcProxy.make_namespaced_msg(method, None, **kwargs) + def make_msg(self, method, **kwargs): + return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE, + **kwargs) + + def _serialize_msg_args(self, context, kwargs): + """Helper method called to serialize message arguments. + + This calls our serializer on each argument, returning a new + set of args that have been serialized. + + :param context: The request context + :param kwargs: The arguments to serialize + :returns: A new set of serialized arguments + """ + new_kwargs = dict() + for argname, arg in kwargs.iteritems(): + new_kwargs[argname] = self.serializer.serialize_entity(context, + arg) + return new_kwargs def call(self, context, msg, topic=None, version=None, timeout=None): """rpc.call() a remote method. @@ -89,9 +115,11 @@ class RpcProxy(object): :returns: The return value from the remote method. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) real_topic = self._get_topic(topic) try: - return rpc.call(context, real_topic, msg, timeout) + result = rpc.call(context, real_topic, msg, timeout) + return self.serializer.deserialize_entity(context, result) except rpc.common.Timeout as exc: raise rpc.common.Timeout( exc.info, real_topic, msg.get('method')) @@ -112,9 +140,11 @@ class RpcProxy(object): from the remote method as they arrive. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) real_topic = self._get_topic(topic) try: - return rpc.multicall(context, real_topic, msg, timeout) + result = rpc.multicall(context, real_topic, msg, timeout) + return self.serializer.deserialize_entity(context, result) except rpc.common.Timeout as exc: raise rpc.common.Timeout( exc.info, real_topic, msg.get('method')) @@ -132,6 +162,7 @@ class RpcProxy(object): remote method. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.cast(context, self._get_topic(topic), msg) def fanout_cast(self, context, msg, topic=None, version=None): @@ -147,6 +178,7 @@ class RpcProxy(object): from the remote method. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.fanout_cast(context, self._get_topic(topic), msg) def cast_to_server(self, context, server_params, msg, topic=None, @@ -165,6 +197,7 @@ class RpcProxy(object): return values. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.cast_to_server(context, server_params, self._get_topic(topic), msg) def fanout_cast_to_server(self, context, server_params, msg, topic=None, @@ -183,5 +216,6 @@ class RpcProxy(object): return values. """ self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) rpc.fanout_cast_to_server(context, server_params, self._get_topic(topic), msg) diff --git a/openstack/common/rpc/serializer.py b/openstack/common/rpc/serializer.py new file mode 100644 index 0000000..0a2c9c4 --- /dev/null +++ b/openstack/common/rpc/serializer.py @@ -0,0 +1,52 @@ +# Copyright 2013 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Provides the definition of an RPC serialization handler""" + +import abc + + +class Serializer(object): + """Generic (de-)serialization definition base class""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def serialize_entity(self, context, entity): + """Serialize something to primitive form. + + :param context: Security context + :param entity: Entity to be serialized + :returns: Serialized form of entity + """ + pass + + @abc.abstractmethod + def deserialize_entity(self, context, entity): + """Deserialize something from primitive form. + + :param context: Security context + :param entity: Primitive to be deserialized + :returns: Deserialized form of entity + """ + pass + + +class NoOpSerializer(Serializer): + """A serializer that does nothing""" + + def serialize_entity(self, context, entity): + return entity + + def deserialize_entity(self, context, entity): + return entity |
