diff options
Diffstat (limited to 'openstack/common/rpc/impl_zmq.py')
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 50 |
1 files changed, 15 insertions, 35 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 33562b0..880896b 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -28,7 +28,6 @@ import greenlet from openstack.common import cfg from openstack.common.gettextutils import _ from openstack.common import importutils -from openstack.common import jsonutils from openstack.common.rpc import common as rpc_common @@ -77,27 +76,6 @@ ZMQ_CTX = None # ZeroMQ Context, must be global. matchmaker = None # memoized matchmaker object -def _serialize(data): - """ - Serialization wrapper - We prefer using JSON, but it cannot encode all types. - Error if a developer passes us bad data. - """ - try: - return str(jsonutils.dumps(data, ensure_ascii=True)) - except TypeError: - LOG.error(_("JSON serialization failed.")) - raise - - -def _deserialize(data): - """ - Deserialization wrapper - """ - LOG.debug(_("Deserializing: %s"), data) - return jsonutils.loads(data) - - class ZmqSocket(object): """ A tiny wrapper around ZeroMQ to simplify the send/recv protocol @@ -205,9 +183,10 @@ class ZmqClient(object): def __init__(self, addr, socket_type=zmq.PUSH, bind=False): self.outq = ZmqSocket(addr, socket_type, bind=bind) - def cast(self, msg_id, topic, data): - self.outq.send([str(msg_id), str(topic), str('cast'), - _serialize(data)]) + def cast(self, msg_id, topic, data, serialize=True): + if serialize: + data = rpc_common.serialize_msg(data) + self.outq.send([str(msg_id), str(topic), str('cast'), data]) def close(self): self.outq.close() @@ -232,11 +211,11 @@ class RpcContext(rpc_common.CommonRpcContext): @classmethod def marshal(self, ctx): ctx_data = ctx.to_dict() - return _serialize(ctx_data) + return rpc_common.serialize_msg(ctx_data) @classmethod def unmarshal(self, data): - return RpcContext.from_dict(_deserialize(data)) + return RpcContext.from_dict(rpc_common.deserialize_msg(data)) class InternalContext(object): @@ -433,11 +412,11 @@ class ZmqProxy(ZmqBaseReactor): sock_type = zmq.PUB elif topic.startswith('zmq_replies'): sock_type = zmq.PUB - inside = _deserialize(in_msg) + inside = rpc_common.deserialize_msg(in_msg) msg_id = inside[-1]['args']['msg_id'] response = inside[-1]['args']['response'] LOG.debug(_("->response->%s"), response) - data = [str(msg_id), _serialize(response)] + data = [str(msg_id), rpc_common.serialize_msg(response)] else: sock_type = zmq.PUSH @@ -480,7 +459,7 @@ class ZmqReactor(ZmqBaseReactor): msg_id, topic, style, in_msg = data - ctx, request = _deserialize(in_msg) + ctx, request = rpc_common.deserialize_msg(in_msg) ctx = RpcContext.unmarshal(ctx) proxy = self.proxies[sock] @@ -531,7 +510,7 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, msg_id, topic, msg, timeout=None): +def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -540,7 +519,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None): conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(msg_id, topic, payload) + conn.cast(msg_id, topic, payload, serialize) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -590,7 +569,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = _deserialize(msg[-1]) + responses = rpc_common.deserialize_msg(msg[-1]) # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") @@ -609,7 +588,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None): +def _multi_send(method, context, topic, msg, timeout=None, serialize=True): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -635,7 +614,7 @@ def _multi_send(method, context, topic, msg, timeout=None): if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, _topic, msg, timeout) + _topic, _topic, msg, timeout, serialize) return return method(_addr, context, _topic, _topic, msg, timeout) @@ -676,6 +655,7 @@ def notify(conf, context, topic, msg, **kwargs): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. topic.replace('.', '-') + kwargs['serialize'] = False cast(conf, context, topic, msg, **kwargs) |
