diff options
author | Russell Bryant <rbryant@redhat.com> | 2012-12-05 11:31:00 -0500 |
---|---|---|
committer | Russell Bryant <rbryant@redhat.com> | 2013-01-02 12:47:32 -0500 |
commit | 77442689c676a7fb9ee5a277e498e8c3495346d9 (patch) | |
tree | c91b57967dd0d4a1f4cabbdc9fd52c05ef419f6b /openstack/common/rpc/impl_zmq.py | |
parent | ffeb0855085617095f19296770a1223cb5641d1c (diff) | |
download | oslo-77442689c676a7fb9ee5a277e498e8c3495346d9.tar.gz oslo-77442689c676a7fb9ee5a277e498e8c3495346d9.tar.xz oslo-77442689c676a7fb9ee5a277e498e8c3495346d9.zip |
Add a rpc envelope format version number.
This patch adds a message envelope that includes a envelope format
version number. This message envelope will allow us to embed
additional metadata later on, such as a signature for the message
payload.
Up to this point, we've deferred message serialization as a
responsibility of the messaging library we're using by passing it a
message as Python types and letting it deal with how to pass it over a
network. This patch adds json serialization in the rpc layer of the
application message payload before passing the message down into the
messaging library.
There are some benefits to be gained by doing a pass at serialization
ourselves. As an example, we occasionally hit serialization bugs that
only affect some messaging drivers. The kombu driver has always had a
nice advantage. It uses anyjson internally for serializing messages,
which we hook into using our jsonutils module. When there is a problem
serializing, we automatically use to_primitive() to fix it. This patch
allows all drivers to take advantage of this automatic message fix-up.
This also creates a convenient common hook point for messages coming in
and out of the system, regardless of the driver in use.
While this changes the base format of the messages sent between nodes,
it has been done in a backwards compatible manner. The new message
format will not be used by default. The idea is that all nodes will be
upgraded to a version that is capable of receiving the new format
(Grizzly) before switching it on. We will turn it on post-Grizzly.
Implement blueprint version-rpc-messages.
Change-Id: Ib6b2d11ca42abaa64c40986d72233e7048e504a0
Diffstat (limited to 'openstack/common/rpc/impl_zmq.py')
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 50 |
1 files changed, 15 insertions, 35 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 33562b0..880896b 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -28,7 +28,6 @@ import greenlet from openstack.common import cfg from openstack.common.gettextutils import _ from openstack.common import importutils -from openstack.common import jsonutils from openstack.common.rpc import common as rpc_common @@ -77,27 +76,6 @@ ZMQ_CTX = None # ZeroMQ Context, must be global. matchmaker = None # memoized matchmaker object -def _serialize(data): - """ - Serialization wrapper - We prefer using JSON, but it cannot encode all types. - Error if a developer passes us bad data. - """ - try: - return str(jsonutils.dumps(data, ensure_ascii=True)) - except TypeError: - LOG.error(_("JSON serialization failed.")) - raise - - -def _deserialize(data): - """ - Deserialization wrapper - """ - LOG.debug(_("Deserializing: %s"), data) - return jsonutils.loads(data) - - class ZmqSocket(object): """ A tiny wrapper around ZeroMQ to simplify the send/recv protocol @@ -205,9 +183,10 @@ class ZmqClient(object): def __init__(self, addr, socket_type=zmq.PUSH, bind=False): self.outq = ZmqSocket(addr, socket_type, bind=bind) - def cast(self, msg_id, topic, data): - self.outq.send([str(msg_id), str(topic), str('cast'), - _serialize(data)]) + def cast(self, msg_id, topic, data, serialize=True): + if serialize: + data = rpc_common.serialize_msg(data) + self.outq.send([str(msg_id), str(topic), str('cast'), data]) def close(self): self.outq.close() @@ -232,11 +211,11 @@ class RpcContext(rpc_common.CommonRpcContext): @classmethod def marshal(self, ctx): ctx_data = ctx.to_dict() - return _serialize(ctx_data) + return rpc_common.serialize_msg(ctx_data) @classmethod def unmarshal(self, data): - return RpcContext.from_dict(_deserialize(data)) + return RpcContext.from_dict(rpc_common.deserialize_msg(data)) class InternalContext(object): @@ -433,11 +412,11 @@ class ZmqProxy(ZmqBaseReactor): sock_type = zmq.PUB elif topic.startswith('zmq_replies'): sock_type = zmq.PUB - inside = _deserialize(in_msg) + inside = rpc_common.deserialize_msg(in_msg) msg_id = inside[-1]['args']['msg_id'] response = inside[-1]['args']['response'] LOG.debug(_("->response->%s"), response) - data = [str(msg_id), _serialize(response)] + data = [str(msg_id), rpc_common.serialize_msg(response)] else: sock_type = zmq.PUSH @@ -480,7 +459,7 @@ class ZmqReactor(ZmqBaseReactor): msg_id, topic, style, in_msg = data - ctx, request = _deserialize(in_msg) + ctx, request = rpc_common.deserialize_msg(in_msg) ctx = RpcContext.unmarshal(ctx) proxy = self.proxies[sock] @@ -531,7 +510,7 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, msg_id, topic, msg, timeout=None): +def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -540,7 +519,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None): conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(msg_id, topic, payload) + conn.cast(msg_id, topic, payload, serialize) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -590,7 +569,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = _deserialize(msg[-1]) + responses = rpc_common.deserialize_msg(msg[-1]) # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") @@ -609,7 +588,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None): +def _multi_send(method, context, topic, msg, timeout=None, serialize=True): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -635,7 +614,7 @@ def _multi_send(method, context, topic, msg, timeout=None): if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, _topic, msg, timeout) + _topic, _topic, msg, timeout, serialize) return return method(_addr, context, _topic, _topic, msg, timeout) @@ -676,6 +655,7 @@ def notify(conf, context, topic, msg, **kwargs): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. topic.replace('.', '-') + kwargs['serialize'] = False cast(conf, context, topic, msg, **kwargs) |