diff options
| author | Jenkins <jenkins@review.openstack.org> | 2013-01-08 02:18:33 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2013-01-08 02:18:33 +0000 |
| commit | ad93e4e30e79cbb4c435068ce5bfa01e6d6b8953 (patch) | |
| tree | ad1beb74822d50e8a90dc00f7af0e20d979c9ee2 | |
| parent | c7a5a8bf79c7c8a3878e54f8bae89cca65bd17e3 (diff) | |
| parent | d44918c256d2a80cecd4489917aed9589906f07c (diff) | |
| download | oslo-ad93e4e30e79cbb4c435068ce5bfa01e6d6b8953.tar.gz oslo-ad93e4e30e79cbb4c435068ce5bfa01e6d6b8953.tar.xz oslo-ad93e4e30e79cbb4c435068ce5bfa01e6d6b8953.zip | |
Merge "Fix serialization in impl_zmq."
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 37 |
1 files changed, 30 insertions, 7 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 7196125..044f312 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -28,6 +28,7 @@ import greenlet from openstack.common import cfg from openstack.common.gettextutils import _ from openstack.common import importutils +from openstack.common import jsonutils from openstack.common.rpc import common as rpc_common @@ -76,6 +77,27 @@ ZMQ_CTX = None # ZeroMQ Context, must be global. matchmaker = None # memoized matchmaker object +def _serialize(data): + """ + Serialization wrapper + We prefer using JSON, but it cannot encode all types. + Error if a developer passes us bad data. + """ + try: + return str(jsonutils.dumps(data, ensure_ascii=True)) + except TypeError: + LOG.error(_("JSON serialization failed.")) + raise + + +def _deserialize(data): + """ + Deserialization wrapper + """ + LOG.debug(_("Deserializing: %s"), data) + return jsonutils.loads(data) + + class ZmqSocket(object): """ A tiny wrapper around ZeroMQ to simplify the send/recv protocol @@ -186,7 +208,8 @@ class ZmqClient(object): def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): if serialize: data = rpc_common.serialize_msg(data, force_envelope) - self.outq.send([str(msg_id), str(topic), str('cast'), data]) + self.outq.send([str(msg_id), str(topic), str('cast'), + _serialize(data)]) def close(self): self.outq.close() @@ -211,11 +234,11 @@ class RpcContext(rpc_common.CommonRpcContext): @classmethod def marshal(self, ctx): ctx_data = ctx.to_dict() - return rpc_common.serialize_msg(ctx_data) + return _serialize(ctx_data) @classmethod def unmarshal(self, data): - return RpcContext.from_dict(rpc_common.deserialize_msg(data)) + return RpcContext.from_dict(_deserialize(data)) class InternalContext(object): @@ -412,11 +435,11 @@ class ZmqProxy(ZmqBaseReactor): sock_type = zmq.PUB elif topic.startswith('zmq_replies'): sock_type = zmq.PUB - inside = rpc_common.deserialize_msg(in_msg) + inside = rpc_common.deserialize_msg(_deserialize(in_msg)) msg_id = inside[-1]['args']['msg_id'] response = inside[-1]['args']['response'] LOG.debug(_("->response->%s"), response) - data = [str(msg_id), rpc_common.serialize_msg(response)] + data = [str(msg_id), _serialize(response)] else: sock_type = zmq.PUSH @@ -459,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor): msg_id, topic, style, in_msg = data - ctx, request = rpc_common.deserialize_msg(in_msg) + ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg)) ctx = RpcContext.unmarshal(ctx) proxy = self.proxies[sock] @@ -570,7 +593,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = rpc_common.deserialize_msg(msg[-1]) + responses = _deserialize(msg[-1]) # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") |
