summaryrefslogtreecommitdiffstats
path: root/nova/openstack
diff options
context:
space:
mode:
authorRussell Bryant <rbryant@redhat.com>2013-01-08 14:31:46 -0500
committerRussell Bryant <rbryant@redhat.com>2013-01-08 14:35:32 -0500
commitdff51fbf3ebdc5ab1a85b4457414f1f7a9a1dce1 (patch)
tree06f0d4c7f22460bcbdd7177425181c1103c01615 /nova/openstack
parent7da89a7947fd2613b3425fa1ded104878b78bda9 (diff)
downloadnova-dff51fbf3ebdc5ab1a85b4457414f1f7a9a1dce1.tar.gz
nova-dff51fbf3ebdc5ab1a85b4457414f1f7a9a1dce1.tar.xz
nova-dff51fbf3ebdc5ab1a85b4457414f1f7a9a1dce1.zip
Fix serialization in impl_zmq.
Sync rpc from oslo-incubator to include some fixes to impl_zmq: https://review.openstack.org/#/c/18913/ Change-Id: I5f5c157e81026c108df04bd385776a9985d4a497
Diffstat (limited to 'nova/openstack')
-rw-r--r--nova/openstack/common/rpc/impl_zmq.py41
1 files changed, 32 insertions, 9 deletions
diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py
index ef4aa21db..d99d390f2 100644
--- a/nova/openstack/common/rpc/impl_zmq.py
+++ b/nova/openstack/common/rpc/impl_zmq.py
@@ -28,6 +28,7 @@ import greenlet
from nova.openstack.common import cfg
from nova.openstack.common.gettextutils import _
from nova.openstack.common import importutils
+from nova.openstack.common import jsonutils
from nova.openstack.common.rpc import common as rpc_common
@@ -76,6 +77,27 @@ ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
+def _serialize(data):
+ """
+ Serialization wrapper
+ We prefer using JSON, but it cannot encode all types.
+ Error if a developer passes us bad data.
+ """
+ try:
+ return str(jsonutils.dumps(data, ensure_ascii=True))
+ except TypeError:
+ LOG.error(_("JSON serialization failed."))
+ raise
+
+
+def _deserialize(data):
+ """
+ Deserialization wrapper
+ """
+ LOG.debug(_("Deserializing: %s"), data)
+ return jsonutils.loads(data)
+
+
class ZmqSocket(object):
"""
A tiny wrapper around ZeroMQ to simplify the send/recv protocol
@@ -186,7 +208,8 @@ class ZmqClient(object):
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
- self.outq.send([str(msg_id), str(topic), str('cast'), data])
+ self.outq.send([str(msg_id), str(topic), str('cast'),
+ _serialize(data)])
def close(self):
self.outq.close()
@@ -211,11 +234,11 @@ class RpcContext(rpc_common.CommonRpcContext):
@classmethod
def marshal(self, ctx):
ctx_data = ctx.to_dict()
- return rpc_common.serialize_msg(ctx_data)
+ return _serialize(ctx_data)
@classmethod
def unmarshal(self, data):
- return RpcContext.from_dict(rpc_common.deserialize_msg(data))
+ return RpcContext.from_dict(_deserialize(data))
class InternalContext(object):
@@ -229,7 +252,7 @@ class InternalContext(object):
"""Process a curried message and cast the result to topic."""
LOG.debug(_("Running func with context: %s"), ctx.to_dict())
data.setdefault('version', None)
- data.setdefault('args', [])
+ data.setdefault('args', {})
try:
result = proxy.dispatch(
@@ -300,7 +323,7 @@ class ConsumerBase(object):
return
data.setdefault('version', None)
- data.setdefault('args', [])
+ data.setdefault('args', {})
proxy.dispatch(ctx, data['version'],
data['method'], **data['args'])
@@ -412,11 +435,11 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = rpc_common.deserialize_msg(in_msg)
+ inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
- data = [str(msg_id), rpc_common.serialize_msg(response)]
+ data = [str(msg_id), _serialize(response)]
else:
sock_type = zmq.PUSH
@@ -459,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor):
msg_id, topic, style, in_msg = data
- ctx, request = rpc_common.deserialize_msg(in_msg)
+ ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
@@ -570,7 +593,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = rpc_common.deserialize_msg(msg[-1])
+ responses = _deserialize(msg[-1])
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")