summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorRussell Bryant <rbryant@redhat.com>2013-01-03 15:18:54 -0500
committerRussell Bryant <rbryant@redhat.com>2013-01-07 15:08:23 -0500
commitd44918c256d2a80cecd4489917aed9589906f07c (patch)
treee88cdb97587252a0499838d8af5049b52d5f7cc8 /openstack
parent1e50543ca215fc5d160f8b740115835e291c7dde (diff)
downloadoslo-d44918c256d2a80cecd4489917aed9589906f07c.tar.gz
oslo-d44918c256d2a80cecd4489917aed9589906f07c.tar.xz
oslo-d44918c256d2a80cecd4489917aed9589906f07c.zip
Fix serialization in impl_zmq.
When the rpc message envelope patch went in, the serialization done in impl_zmq was replaced with what is done in rpc.common. Instead, it just needs to be done as an additional stage of serialization. The reason is that the amqp drivers can handle passing dicts into the messaging libraries. The zeromq library can not handle that, so it has to ensure it is a string before passing it down to the library. This patch restores the original (unmodified) _serialize() and _deserialize() methods in impl_zmq that were removed and calls them as appropriate. Change-Id: I09cac780c9a9b15390229c63d71e360120548b3d
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/impl_zmq.py37
1 files changed, 30 insertions, 7 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 7196125..044f312 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -28,6 +28,7 @@ import greenlet
from openstack.common import cfg
from openstack.common.gettextutils import _
from openstack.common import importutils
+from openstack.common import jsonutils
from openstack.common.rpc import common as rpc_common
@@ -76,6 +77,27 @@ ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
+def _serialize(data):
+ """
+ Serialization wrapper
+ We prefer using JSON, but it cannot encode all types.
+ Error if a developer passes us bad data.
+ """
+ try:
+ return str(jsonutils.dumps(data, ensure_ascii=True))
+ except TypeError:
+ LOG.error(_("JSON serialization failed."))
+ raise
+
+
+def _deserialize(data):
+ """
+ Deserialization wrapper
+ """
+ LOG.debug(_("Deserializing: %s"), data)
+ return jsonutils.loads(data)
+
+
class ZmqSocket(object):
"""
A tiny wrapper around ZeroMQ to simplify the send/recv protocol
@@ -186,7 +208,8 @@ class ZmqClient(object):
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
- self.outq.send([str(msg_id), str(topic), str('cast'), data])
+ self.outq.send([str(msg_id), str(topic), str('cast'),
+ _serialize(data)])
def close(self):
self.outq.close()
@@ -211,11 +234,11 @@ class RpcContext(rpc_common.CommonRpcContext):
@classmethod
def marshal(self, ctx):
ctx_data = ctx.to_dict()
- return rpc_common.serialize_msg(ctx_data)
+ return _serialize(ctx_data)
@classmethod
def unmarshal(self, data):
- return RpcContext.from_dict(rpc_common.deserialize_msg(data))
+ return RpcContext.from_dict(_deserialize(data))
class InternalContext(object):
@@ -412,11 +435,11 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = rpc_common.deserialize_msg(in_msg)
+ inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
- data = [str(msg_id), rpc_common.serialize_msg(response)]
+ data = [str(msg_id), _serialize(response)]
else:
sock_type = zmq.PUSH
@@ -459,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor):
msg_id, topic, style, in_msg = data
- ctx, request = rpc_common.deserialize_msg(in_msg)
+ ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
@@ -570,7 +593,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = rpc_common.deserialize_msg(msg[-1])
+ responses = _deserialize(msg[-1])
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")