summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc/impl_zmq.py
diff options
context:
space:
mode:
Diffstat (limited to 'openstack/common/rpc/impl_zmq.py')
-rw-r--r--openstack/common/rpc/impl_zmq.py50
1 files changed, 15 insertions, 35 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 33562b0..880896b 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -28,7 +28,6 @@ import greenlet
from openstack.common import cfg
from openstack.common.gettextutils import _
from openstack.common import importutils
-from openstack.common import jsonutils
from openstack.common.rpc import common as rpc_common
@@ -77,27 +76,6 @@ ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
-def _serialize(data):
- """
- Serialization wrapper
- We prefer using JSON, but it cannot encode all types.
- Error if a developer passes us bad data.
- """
- try:
- return str(jsonutils.dumps(data, ensure_ascii=True))
- except TypeError:
- LOG.error(_("JSON serialization failed."))
- raise
-
-
-def _deserialize(data):
- """
- Deserialization wrapper
- """
- LOG.debug(_("Deserializing: %s"), data)
- return jsonutils.loads(data)
-
-
class ZmqSocket(object):
"""
A tiny wrapper around ZeroMQ to simplify the send/recv protocol
@@ -205,9 +183,10 @@ class ZmqClient(object):
def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data):
- self.outq.send([str(msg_id), str(topic), str('cast'),
- _serialize(data)])
+ def cast(self, msg_id, topic, data, serialize=True):
+ if serialize:
+ data = rpc_common.serialize_msg(data)
+ self.outq.send([str(msg_id), str(topic), str('cast'), data])
def close(self):
self.outq.close()
@@ -232,11 +211,11 @@ class RpcContext(rpc_common.CommonRpcContext):
@classmethod
def marshal(self, ctx):
ctx_data = ctx.to_dict()
- return _serialize(ctx_data)
+ return rpc_common.serialize_msg(ctx_data)
@classmethod
def unmarshal(self, data):
- return RpcContext.from_dict(_deserialize(data))
+ return RpcContext.from_dict(rpc_common.deserialize_msg(data))
class InternalContext(object):
@@ -433,11 +412,11 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = _deserialize(in_msg)
+ inside = rpc_common.deserialize_msg(in_msg)
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
- data = [str(msg_id), _serialize(response)]
+ data = [str(msg_id), rpc_common.serialize_msg(response)]
else:
sock_type = zmq.PUSH
@@ -480,7 +459,7 @@ class ZmqReactor(ZmqBaseReactor):
msg_id, topic, style, in_msg = data
- ctx, request = _deserialize(in_msg)
+ ctx, request = rpc_common.deserialize_msg(in_msg)
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
@@ -531,7 +510,7 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None):
+def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -540,7 +519,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload)
+ conn.cast(msg_id, topic, payload, serialize)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -590,7 +569,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])
+ responses = rpc_common.deserialize_msg(msg[-1])
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
@@ -609,7 +588,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None):
+def _multi_send(method, context, topic, msg, timeout=None, serialize=True):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -635,7 +614,7 @@ def _multi_send(method, context, topic, msg, timeout=None):
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout)
+ _topic, _topic, msg, timeout, serialize)
return
return method(_addr, context, _topic, _topic, msg, timeout)
@@ -676,6 +655,7 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
+ kwargs['serialize'] = False
cast(conf, context, topic, msg, **kwargs)