summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc
diff options
context:
space:
mode:
authorRussell Bryant <rbryant@redhat.com>2012-12-05 11:31:00 -0500
committerRussell Bryant <rbryant@redhat.com>2013-01-02 12:47:32 -0500
commit77442689c676a7fb9ee5a277e498e8c3495346d9 (patch)
treec91b57967dd0d4a1f4cabbdc9fd52c05ef419f6b /openstack/common/rpc
parentffeb0855085617095f19296770a1223cb5641d1c (diff)
downloadoslo-77442689c676a7fb9ee5a277e498e8c3495346d9.tar.gz
oslo-77442689c676a7fb9ee5a277e498e8c3495346d9.tar.xz
oslo-77442689c676a7fb9ee5a277e498e8c3495346d9.zip
Add a rpc envelope format version number.
This patch adds a message envelope that includes a envelope format version number. This message envelope will allow us to embed additional metadata later on, such as a signature for the message payload. Up to this point, we've deferred message serialization as a responsibility of the messaging library we're using by passing it a message as Python types and letting it deal with how to pass it over a network. This patch adds json serialization in the rpc layer of the application message payload before passing the message down into the messaging library. There are some benefits to be gained by doing a pass at serialization ourselves. As an example, we occasionally hit serialization bugs that only affect some messaging drivers. The kombu driver has always had a nice advantage. It uses anyjson internally for serializing messages, which we hook into using our jsonutils module. When there is a problem serializing, we automatically use to_primitive() to fix it. This patch allows all drivers to take advantage of this automatic message fix-up. This also creates a convenient common hook point for messages coming in and out of the system, regardless of the driver in use. While this changes the base format of the messages sent between nodes, it has been done in a backwards compatible manner. The new message format will not be used by default. The idea is that all nodes will be upgraded to a version that is capable of receiving the new format (Grizzly) before switching it on. We will turn it on post-Grizzly. Implement blueprint version-rpc-messages. Change-Id: Ib6b2d11ca42abaa64c40986d72233e7048e504a0
Diffstat (limited to 'openstack/common/rpc')
-rw-r--r--openstack/common/rpc/amqp.py16
-rw-r--r--openstack/common/rpc/common.py118
-rw-r--r--openstack/common/rpc/dispatcher.py18
-rw-r--r--openstack/common/rpc/impl_kombu.py3
-rw-r--r--openstack/common/rpc/impl_qpid.py3
-rw-r--r--openstack/common/rpc/impl_zmq.py50
6 files changed, 149 insertions, 59 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 00b2f24..ce4c028 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
'failure': failure}
if ending:
msg['ending'] = True
- conn.direct_send(msg_id, msg)
+ conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
@@ -364,7 +364,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
return wait_msg
@@ -383,7 +383,7 @@ def cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast(conf, context, topic, msg, connection_pool):
@@ -391,7 +391,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- conn.fanout_send(topic, msg)
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
@@ -399,7 +399,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast_to_server(conf, context, server_params, topic, msg,
@@ -408,7 +408,7 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
- conn.fanout_send(topic, msg)
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def notify(conf, context, topic, msg, connection_pool):
@@ -418,6 +418,10 @@ def notify(conf, context, topic, msg, connection_pool):
topic=topic))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
+ # NOTE(russellb): We are explicitly *NOT* using serialize_msg() here.
+ # The messages sent out as notifications are intended to be consumed by
+ # 3rd party applications. The notification producer is entirely
+ # responsible for the message content and versioning.
conn.notify_send(topic, msg)
diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py
index efdf26f..a3a3699 100644
--- a/openstack/common/rpc/common.py
+++ b/openstack/common/rpc/common.py
@@ -21,6 +21,7 @@ import copy
import sys
import traceback
+from openstack.common import cfg
from openstack.common.gettextutils import _
from openstack.common import importutils
from openstack.common import jsonutils
@@ -28,9 +29,50 @@ from openstack.common import local
from openstack.common import log as logging
+CONF = cfg.CONF
LOG = logging.getLogger(__name__)
+'''RPC Envelope Version.
+
+This version number applies to the top level structure of messages sent out.
+It does *not* apply to the message payload, which must be versioned
+independently. For example, when using rpc APIs, a version number is applied
+for changes to the API being exposed over rpc. This version number is handled
+in the rpc proxy and dispatcher modules.
+
+This version number applies to the message envelope that is used in the
+serialization done inside the rpc layer. See serialize_msg() and
+deserialize_msg().
+
+The current message format (version 2.0) is very simple. It is:
+
+ {
+ 'oslo.version': <RPC Envelope Version as a String>,
+ 'oslo.message': <Application Message Payload, JSON encoded>
+ }
+
+Message format version '1.0' is just considered to be the messages we sent
+without a message envelope.
+
+So, the current message envelope just includes the envelope version. It may
+eventually contain additional information, such as a signature for the message
+payload.
+
+We will JSON encode the application message payload. The message envelope,
+which includes the JSON encoded application message body, will be passed down
+to the messaging libraries as a dict.
+'''
+_RPC_ENVELOPE_VERSION = '2.0'
+
+_VERSION_KEY = 'oslo.version'
+_MESSAGE_KEY = 'oslo.message'
+
+
+# TODO(russellb) Turn this on after Grizzly.
+_SEND_RPC_ENVELOPE = False
+
+
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
@@ -91,6 +133,11 @@ class UnsupportedRpcVersion(RPCException):
"this endpoint.")
+class UnsupportedRpcEnvelopeVersion(RPCException):
+ message = _("Specified RPC envelope version, %(version)s, "
+ "not supported by this endpoint.")
+
+
class Connection(object):
"""A connection, returned by rpc.create_connection().
@@ -344,3 +391,74 @@ def client_exceptions(*exceptions):
return catch_client_exception(exceptions, func, *args, **kwargs)
return inner
return outer
+
+
+def version_is_compatible(imp_version, version):
+ """Determine whether versions are compatible.
+
+ :param imp_version: The version implemented
+ :param version: The version requested by an incoming message.
+ """
+ version_parts = version.split('.')
+ imp_version_parts = imp_version.split('.')
+ if int(version_parts[0]) != int(imp_version_parts[0]): # Major
+ return False
+ if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
+ return False
+ return True
+
+
+def serialize_msg(raw_msg):
+ if not _SEND_RPC_ENVELOPE:
+ return raw_msg
+
+ # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
+ # information about this format.
+ msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
+ _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
+
+ return msg
+
+
+def deserialize_msg(msg):
+ # NOTE(russellb): Hang on to your hats, this road is about to
+ # get a little bumpy.
+ #
+ # Robustness Principle:
+ # "Be strict in what you send, liberal in what you accept."
+ #
+ # At this point we have to do a bit of guessing about what it
+ # is we just received. Here is the set of possibilities:
+ #
+ # 1) We received a dict. This could be 2 things:
+ #
+ # a) Inspect it to see if it looks like a standard message envelope.
+ # If so, great!
+ #
+ # b) If it doesn't look like a standard message envelope, it could either
+ # be a notification (which we don't wrap), or a message from before
+ # we added a message envelope (referred to as version 1.0).
+ # Just return the message as-is.
+ #
+ # 2) It's any other non-dict type. Just return it and hope for the best.
+ # This case covers return values from rpc.call() from before message
+ # envelopes were used. (messages to call a method were always a dict)
+
+ if not isinstance(msg, dict):
+ # See #2 above.
+ return msg
+
+ base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
+ if not all(map(lambda key: key in msg, base_envelope_keys)):
+ # See #1.b above.
+ return msg
+
+ # At this point we think we have the message envelope
+ # format we were expecting. (#1.a above)
+
+ if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
+ raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
+
+ raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
+
+ return raw_msg
diff --git a/openstack/common/rpc/dispatcher.py b/openstack/common/rpc/dispatcher.py
index f09879c..0adc4ef 100644
--- a/openstack/common/rpc/dispatcher.py
+++ b/openstack/common/rpc/dispatcher.py
@@ -103,21 +103,6 @@ class RpcDispatcher(object):
self.callbacks = callbacks
super(RpcDispatcher, self).__init__()
- @staticmethod
- def _is_compatible(mversion, version):
- """Determine whether versions are compatible.
-
- :param mversion: The API version implemented by a callback.
- :param version: The API version requested by an incoming message.
- """
- version_parts = version.split('.')
- mversion_parts = mversion.split('.')
- if int(version_parts[0]) != int(mversion_parts[0]): # Major
- return False
- if int(version_parts[1]) > int(mversion_parts[1]): # Minor
- return False
- return True
-
def dispatch(self, ctxt, version, method, **kwargs):
"""Dispatch a message based on a requested version.
@@ -139,7 +124,8 @@ class RpcDispatcher(object):
rpc_api_version = proxyobj.RPC_API_VERSION
else:
rpc_api_version = '1.0'
- is_compatible = self._is_compatible(rpc_api_version, version)
+ is_compatible = rpc_common.version_is_compatible(rpc_api_version,
+ version)
had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method):
continue
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index ced4e96..cdbba4e 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -162,7 +162,8 @@ class ConsumerBase(object):
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
- callback(message.payload)
+ msg = rpc_common.deserialize_msg(message.payload)
+ callback(msg)
message.ack()
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 7f21461..a6dd983 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -124,7 +124,8 @@ class ConsumerBase(object):
"""Fetch the message and pass it to the callback object"""
message = self.receiver.fetch()
try:
- self.callback(message.content)
+ msg = rpc_common.deserialize_msg(message.content)
+ self.callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 33562b0..880896b 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -28,7 +28,6 @@ import greenlet
from openstack.common import cfg
from openstack.common.gettextutils import _
from openstack.common import importutils
-from openstack.common import jsonutils
from openstack.common.rpc import common as rpc_common
@@ -77,27 +76,6 @@ ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
-def _serialize(data):
- """
- Serialization wrapper
- We prefer using JSON, but it cannot encode all types.
- Error if a developer passes us bad data.
- """
- try:
- return str(jsonutils.dumps(data, ensure_ascii=True))
- except TypeError:
- LOG.error(_("JSON serialization failed."))
- raise
-
-
-def _deserialize(data):
- """
- Deserialization wrapper
- """
- LOG.debug(_("Deserializing: %s"), data)
- return jsonutils.loads(data)
-
-
class ZmqSocket(object):
"""
A tiny wrapper around ZeroMQ to simplify the send/recv protocol
@@ -205,9 +183,10 @@ class ZmqClient(object):
def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data):
- self.outq.send([str(msg_id), str(topic), str('cast'),
- _serialize(data)])
+ def cast(self, msg_id, topic, data, serialize=True):
+ if serialize:
+ data = rpc_common.serialize_msg(data)
+ self.outq.send([str(msg_id), str(topic), str('cast'), data])
def close(self):
self.outq.close()
@@ -232,11 +211,11 @@ class RpcContext(rpc_common.CommonRpcContext):
@classmethod
def marshal(self, ctx):
ctx_data = ctx.to_dict()
- return _serialize(ctx_data)
+ return rpc_common.serialize_msg(ctx_data)
@classmethod
def unmarshal(self, data):
- return RpcContext.from_dict(_deserialize(data))
+ return RpcContext.from_dict(rpc_common.deserialize_msg(data))
class InternalContext(object):
@@ -433,11 +412,11 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = _deserialize(in_msg)
+ inside = rpc_common.deserialize_msg(in_msg)
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
- data = [str(msg_id), _serialize(response)]
+ data = [str(msg_id), rpc_common.serialize_msg(response)]
else:
sock_type = zmq.PUSH
@@ -480,7 +459,7 @@ class ZmqReactor(ZmqBaseReactor):
msg_id, topic, style, in_msg = data
- ctx, request = _deserialize(in_msg)
+ ctx, request = rpc_common.deserialize_msg(in_msg)
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
@@ -531,7 +510,7 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None):
+def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -540,7 +519,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload)
+ conn.cast(msg_id, topic, payload, serialize)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -590,7 +569,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])
+ responses = rpc_common.deserialize_msg(msg[-1])
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
@@ -609,7 +588,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None):
+def _multi_send(method, context, topic, msg, timeout=None, serialize=True):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -635,7 +614,7 @@ def _multi_send(method, context, topic, msg, timeout=None):
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout)
+ _topic, _topic, msg, timeout, serialize)
return
return method(_addr, context, _topic, _topic, msg, timeout)
@@ -676,6 +655,7 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
+ kwargs['serialize'] = False
cast(conf, context, topic, msg, **kwargs)