diff options
| author | Russell Bryant <rbryant@redhat.com> | 2013-01-03 15:18:54 -0500 |
|---|---|---|
| committer | Russell Bryant <rbryant@redhat.com> | 2013-01-07 15:08:23 -0500 |
| commit | d44918c256d2a80cecd4489917aed9589906f07c (patch) | |
| tree | e88cdb97587252a0499838d8af5049b52d5f7cc8 /openstack | |
| parent | 1e50543ca215fc5d160f8b740115835e291c7dde (diff) | |
| download | oslo-d44918c256d2a80cecd4489917aed9589906f07c.tar.gz oslo-d44918c256d2a80cecd4489917aed9589906f07c.tar.xz oslo-d44918c256d2a80cecd4489917aed9589906f07c.zip | |
Fix serialization in impl_zmq.
When the rpc message envelope patch went in, the serialization done in
impl_zmq was replaced with what is done in rpc.common. Instead, it just
needs to be done as an additional stage of serialization. The reason is
that the amqp drivers can handle passing dicts into the messaging
libraries. The zeromq library can not handle that, so it has to ensure
it is a string before passing it down to the library.
This patch restores the original (unmodified) _serialize() and
_deserialize() methods in impl_zmq that were removed and calls them as
appropriate.
Change-Id: I09cac780c9a9b15390229c63d71e360120548b3d
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 37 |
1 files changed, 30 insertions, 7 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 7196125..044f312 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -28,6 +28,7 @@ import greenlet from openstack.common import cfg from openstack.common.gettextutils import _ from openstack.common import importutils +from openstack.common import jsonutils from openstack.common.rpc import common as rpc_common @@ -76,6 +77,27 @@ ZMQ_CTX = None # ZeroMQ Context, must be global. matchmaker = None # memoized matchmaker object +def _serialize(data): + """ + Serialization wrapper + We prefer using JSON, but it cannot encode all types. + Error if a developer passes us bad data. + """ + try: + return str(jsonutils.dumps(data, ensure_ascii=True)) + except TypeError: + LOG.error(_("JSON serialization failed.")) + raise + + +def _deserialize(data): + """ + Deserialization wrapper + """ + LOG.debug(_("Deserializing: %s"), data) + return jsonutils.loads(data) + + class ZmqSocket(object): """ A tiny wrapper around ZeroMQ to simplify the send/recv protocol @@ -186,7 +208,8 @@ class ZmqClient(object): def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): if serialize: data = rpc_common.serialize_msg(data, force_envelope) - self.outq.send([str(msg_id), str(topic), str('cast'), data]) + self.outq.send([str(msg_id), str(topic), str('cast'), + _serialize(data)]) def close(self): self.outq.close() @@ -211,11 +234,11 @@ class RpcContext(rpc_common.CommonRpcContext): @classmethod def marshal(self, ctx): ctx_data = ctx.to_dict() - return rpc_common.serialize_msg(ctx_data) + return _serialize(ctx_data) @classmethod def unmarshal(self, data): - return RpcContext.from_dict(rpc_common.deserialize_msg(data)) + return RpcContext.from_dict(_deserialize(data)) class InternalContext(object): @@ -412,11 +435,11 @@ class ZmqProxy(ZmqBaseReactor): sock_type = zmq.PUB elif topic.startswith('zmq_replies'): sock_type = zmq.PUB - inside = rpc_common.deserialize_msg(in_msg) + inside = rpc_common.deserialize_msg(_deserialize(in_msg)) msg_id = inside[-1]['args']['msg_id'] response = inside[-1]['args']['response'] LOG.debug(_("->response->%s"), response) - data = [str(msg_id), rpc_common.serialize_msg(response)] + data = [str(msg_id), _serialize(response)] else: sock_type = zmq.PUSH @@ -459,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor): msg_id, topic, style, in_msg = data - ctx, request = rpc_common.deserialize_msg(in_msg) + ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg)) ctx = RpcContext.unmarshal(ctx) proxy = self.proxies[sock] @@ -570,7 +593,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = rpc_common.deserialize_msg(msg[-1]) + responses = _deserialize(msg[-1]) # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") |
