summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-01-08 02:18:33 +0000
committerGerrit Code Review <review@openstack.org>2013-01-08 02:18:33 +0000
commitad93e4e30e79cbb4c435068ce5bfa01e6d6b8953 (patch)
treead1beb74822d50e8a90dc00f7af0e20d979c9ee2
parentc7a5a8bf79c7c8a3878e54f8bae89cca65bd17e3 (diff)
parentd44918c256d2a80cecd4489917aed9589906f07c (diff)
downloadoslo-ad93e4e30e79cbb4c435068ce5bfa01e6d6b8953.tar.gz
oslo-ad93e4e30e79cbb4c435068ce5bfa01e6d6b8953.tar.xz
oslo-ad93e4e30e79cbb4c435068ce5bfa01e6d6b8953.zip
Merge "Fix serialization in impl_zmq."
-rw-r--r--openstack/common/rpc/impl_zmq.py37
1 files changed, 30 insertions, 7 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 7196125..044f312 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -28,6 +28,7 @@ import greenlet
from openstack.common import cfg
from openstack.common.gettextutils import _
from openstack.common import importutils
+from openstack.common import jsonutils
from openstack.common.rpc import common as rpc_common
@@ -76,6 +77,27 @@ ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
+def _serialize(data):
+ """
+ Serialization wrapper
+ We prefer using JSON, but it cannot encode all types.
+ Error if a developer passes us bad data.
+ """
+ try:
+ return str(jsonutils.dumps(data, ensure_ascii=True))
+ except TypeError:
+ LOG.error(_("JSON serialization failed."))
+ raise
+
+
+def _deserialize(data):
+ """
+ Deserialization wrapper
+ """
+ LOG.debug(_("Deserializing: %s"), data)
+ return jsonutils.loads(data)
+
+
class ZmqSocket(object):
"""
A tiny wrapper around ZeroMQ to simplify the send/recv protocol
@@ -186,7 +208,8 @@ class ZmqClient(object):
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
- self.outq.send([str(msg_id), str(topic), str('cast'), data])
+ self.outq.send([str(msg_id), str(topic), str('cast'),
+ _serialize(data)])
def close(self):
self.outq.close()
@@ -211,11 +234,11 @@ class RpcContext(rpc_common.CommonRpcContext):
@classmethod
def marshal(self, ctx):
ctx_data = ctx.to_dict()
- return rpc_common.serialize_msg(ctx_data)
+ return _serialize(ctx_data)
@classmethod
def unmarshal(self, data):
- return RpcContext.from_dict(rpc_common.deserialize_msg(data))
+ return RpcContext.from_dict(_deserialize(data))
class InternalContext(object):
@@ -412,11 +435,11 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = rpc_common.deserialize_msg(in_msg)
+ inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
- data = [str(msg_id), rpc_common.serialize_msg(response)]
+ data = [str(msg_id), _serialize(response)]
else:
sock_type = zmq.PUSH
@@ -459,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor):
msg_id, topic, style, in_msg = data
- ctx, request = rpc_common.deserialize_msg(in_msg)
+ ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
@@ -570,7 +593,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = rpc_common.deserialize_msg(msg[-1])
+ responses = _deserialize(msg[-1])
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")