diff options
-rw-r--r-- | openstack/common/rpc/amqp.py | 16 | ||||
-rw-r--r-- | openstack/common/rpc/common.py | 118 | ||||
-rw-r--r-- | openstack/common/rpc/dispatcher.py | 18 | ||||
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 3 | ||||
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 3 | ||||
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 50 | ||||
-rw-r--r-- | tests/unit/rpc/test_common.py | 29 | ||||
-rw-r--r-- | tests/unit/rpc/test_kombu.py | 14 |
8 files changed, 185 insertions, 66 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 00b2f24..ce4c028 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, 'failure': failure} if ending: msg['ending'] = True - conn.direct_send(msg_id, msg) + conn.direct_send(msg_id, rpc_common.serialize_msg(msg)) class RpcContext(rpc_common.CommonRpcContext): @@ -364,7 +364,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): conn = ConnectionContext(conf, connection_pool) wait_msg = MulticallWaiter(conf, conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) return wait_msg @@ -383,7 +383,7 @@ def cast(conf, context, topic, msg, connection_pool): LOG.debug(_('Making asynchronous cast on %s...'), topic) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) def fanout_cast(conf, context, topic, msg, connection_pool): @@ -391,7 +391,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool): LOG.debug(_('Making asynchronous fanout cast...')) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.fanout_send(topic, msg) + conn.fanout_send(topic, rpc_common.serialize_msg(msg)) def cast_to_server(conf, context, server_params, topic, msg, connection_pool): @@ -399,7 +399,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) def fanout_cast_to_server(conf, context, server_params, topic, msg, @@ -408,7 +408,7 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg, pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.fanout_send(topic, msg) + conn.fanout_send(topic, rpc_common.serialize_msg(msg)) def notify(conf, context, topic, msg, connection_pool): @@ -418,6 +418,10 @@ def notify(conf, context, topic, msg, connection_pool): topic=topic)) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: + # NOTE(russellb): We are explicitly *NOT* using serialize_msg() here. + # The messages sent out as notifications are intended to be consumed by + # 3rd party applications. The notification producer is entirely + # responsible for the message content and versioning. conn.notify_send(topic, msg) diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py index efdf26f..a3a3699 100644 --- a/openstack/common/rpc/common.py +++ b/openstack/common/rpc/common.py @@ -21,6 +21,7 @@ import copy import sys import traceback +from openstack.common import cfg from openstack.common.gettextutils import _ from openstack.common import importutils from openstack.common import jsonutils @@ -28,9 +29,50 @@ from openstack.common import local from openstack.common import log as logging +CONF = cfg.CONF LOG = logging.getLogger(__name__) +'''RPC Envelope Version. + +This version number applies to the top level structure of messages sent out. +It does *not* apply to the message payload, which must be versioned +independently. For example, when using rpc APIs, a version number is applied +for changes to the API being exposed over rpc. This version number is handled +in the rpc proxy and dispatcher modules. + +This version number applies to the message envelope that is used in the +serialization done inside the rpc layer. See serialize_msg() and +deserialize_msg(). + +The current message format (version 2.0) is very simple. It is: + + { + 'oslo.version': <RPC Envelope Version as a String>, + 'oslo.message': <Application Message Payload, JSON encoded> + } + +Message format version '1.0' is just considered to be the messages we sent +without a message envelope. + +So, the current message envelope just includes the envelope version. It may +eventually contain additional information, such as a signature for the message +payload. + +We will JSON encode the application message payload. The message envelope, +which includes the JSON encoded application message body, will be passed down +to the messaging libraries as a dict. +''' +_RPC_ENVELOPE_VERSION = '2.0' + +_VERSION_KEY = 'oslo.version' +_MESSAGE_KEY = 'oslo.message' + + +# TODO(russellb) Turn this on after Grizzly. +_SEND_RPC_ENVELOPE = False + + class RPCException(Exception): message = _("An unknown RPC related exception occurred.") @@ -91,6 +133,11 @@ class UnsupportedRpcVersion(RPCException): "this endpoint.") +class UnsupportedRpcEnvelopeVersion(RPCException): + message = _("Specified RPC envelope version, %(version)s, " + "not supported by this endpoint.") + + class Connection(object): """A connection, returned by rpc.create_connection(). @@ -344,3 +391,74 @@ def client_exceptions(*exceptions): return catch_client_exception(exceptions, func, *args, **kwargs) return inner return outer + + +def version_is_compatible(imp_version, version): + """Determine whether versions are compatible. + + :param imp_version: The version implemented + :param version: The version requested by an incoming message. + """ + version_parts = version.split('.') + imp_version_parts = imp_version.split('.') + if int(version_parts[0]) != int(imp_version_parts[0]): # Major + return False + if int(version_parts[1]) > int(imp_version_parts[1]): # Minor + return False + return True + + +def serialize_msg(raw_msg): + if not _SEND_RPC_ENVELOPE: + return raw_msg + + # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more + # information about this format. + msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION, + _MESSAGE_KEY: jsonutils.dumps(raw_msg)} + + return msg + + +def deserialize_msg(msg): + # NOTE(russellb): Hang on to your hats, this road is about to + # get a little bumpy. + # + # Robustness Principle: + # "Be strict in what you send, liberal in what you accept." + # + # At this point we have to do a bit of guessing about what it + # is we just received. Here is the set of possibilities: + # + # 1) We received a dict. This could be 2 things: + # + # a) Inspect it to see if it looks like a standard message envelope. + # If so, great! + # + # b) If it doesn't look like a standard message envelope, it could either + # be a notification (which we don't wrap), or a message from before + # we added a message envelope (referred to as version 1.0). + # Just return the message as-is. + # + # 2) It's any other non-dict type. Just return it and hope for the best. + # This case covers return values from rpc.call() from before message + # envelopes were used. (messages to call a method were always a dict) + + if not isinstance(msg, dict): + # See #2 above. + return msg + + base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY) + if not all(map(lambda key: key in msg, base_envelope_keys)): + # See #1.b above. + return msg + + # At this point we think we have the message envelope + # format we were expecting. (#1.a above) + + if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]): + raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY]) + + raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + + return raw_msg diff --git a/openstack/common/rpc/dispatcher.py b/openstack/common/rpc/dispatcher.py index f09879c..0adc4ef 100644 --- a/openstack/common/rpc/dispatcher.py +++ b/openstack/common/rpc/dispatcher.py @@ -103,21 +103,6 @@ class RpcDispatcher(object): self.callbacks = callbacks super(RpcDispatcher, self).__init__() - @staticmethod - def _is_compatible(mversion, version): - """Determine whether versions are compatible. - - :param mversion: The API version implemented by a callback. - :param version: The API version requested by an incoming message. - """ - version_parts = version.split('.') - mversion_parts = mversion.split('.') - if int(version_parts[0]) != int(mversion_parts[0]): # Major - return False - if int(version_parts[1]) > int(mversion_parts[1]): # Minor - return False - return True - def dispatch(self, ctxt, version, method, **kwargs): """Dispatch a message based on a requested version. @@ -139,7 +124,8 @@ class RpcDispatcher(object): rpc_api_version = proxyobj.RPC_API_VERSION else: rpc_api_version = '1.0' - is_compatible = self._is_compatible(rpc_api_version, version) + is_compatible = rpc_common.version_is_compatible(rpc_api_version, + version) had_compatible = had_compatible or is_compatible if not hasattr(proxyobj, method): continue diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index ced4e96..cdbba4e 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -162,7 +162,8 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) try: - callback(message.payload) + msg = rpc_common.deserialize_msg(message.payload) + callback(msg) message.ack() except Exception: LOG.exception(_("Failed to process message... skipping it.")) diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 7f21461..a6dd983 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -124,7 +124,8 @@ class ConsumerBase(object): """Fetch the message and pass it to the callback object""" message = self.receiver.fetch() try: - self.callback(message.content) + msg = rpc_common.deserialize_msg(message.content) + self.callback(msg) except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 33562b0..880896b 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -28,7 +28,6 @@ import greenlet from openstack.common import cfg from openstack.common.gettextutils import _ from openstack.common import importutils -from openstack.common import jsonutils from openstack.common.rpc import common as rpc_common @@ -77,27 +76,6 @@ ZMQ_CTX = None # ZeroMQ Context, must be global. matchmaker = None # memoized matchmaker object -def _serialize(data): - """ - Serialization wrapper - We prefer using JSON, but it cannot encode all types. - Error if a developer passes us bad data. - """ - try: - return str(jsonutils.dumps(data, ensure_ascii=True)) - except TypeError: - LOG.error(_("JSON serialization failed.")) - raise - - -def _deserialize(data): - """ - Deserialization wrapper - """ - LOG.debug(_("Deserializing: %s"), data) - return jsonutils.loads(data) - - class ZmqSocket(object): """ A tiny wrapper around ZeroMQ to simplify the send/recv protocol @@ -205,9 +183,10 @@ class ZmqClient(object): def __init__(self, addr, socket_type=zmq.PUSH, bind=False): self.outq = ZmqSocket(addr, socket_type, bind=bind) - def cast(self, msg_id, topic, data): - self.outq.send([str(msg_id), str(topic), str('cast'), - _serialize(data)]) + def cast(self, msg_id, topic, data, serialize=True): + if serialize: + data = rpc_common.serialize_msg(data) + self.outq.send([str(msg_id), str(topic), str('cast'), data]) def close(self): self.outq.close() @@ -232,11 +211,11 @@ class RpcContext(rpc_common.CommonRpcContext): @classmethod def marshal(self, ctx): ctx_data = ctx.to_dict() - return _serialize(ctx_data) + return rpc_common.serialize_msg(ctx_data) @classmethod def unmarshal(self, data): - return RpcContext.from_dict(_deserialize(data)) + return RpcContext.from_dict(rpc_common.deserialize_msg(data)) class InternalContext(object): @@ -433,11 +412,11 @@ class ZmqProxy(ZmqBaseReactor): sock_type = zmq.PUB elif topic.startswith('zmq_replies'): sock_type = zmq.PUB - inside = _deserialize(in_msg) + inside = rpc_common.deserialize_msg(in_msg) msg_id = inside[-1]['args']['msg_id'] response = inside[-1]['args']['response'] LOG.debug(_("->response->%s"), response) - data = [str(msg_id), _serialize(response)] + data = [str(msg_id), rpc_common.serialize_msg(response)] else: sock_type = zmq.PUSH @@ -480,7 +459,7 @@ class ZmqReactor(ZmqBaseReactor): msg_id, topic, style, in_msg = data - ctx, request = _deserialize(in_msg) + ctx, request = rpc_common.deserialize_msg(in_msg) ctx = RpcContext.unmarshal(ctx) proxy = self.proxies[sock] @@ -531,7 +510,7 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, msg_id, topic, msg, timeout=None): +def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -540,7 +519,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None): conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(msg_id, topic, payload) + conn.cast(msg_id, topic, payload, serialize) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -590,7 +569,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = _deserialize(msg[-1]) + responses = rpc_common.deserialize_msg(msg[-1]) # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") @@ -609,7 +588,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None): +def _multi_send(method, context, topic, msg, timeout=None, serialize=True): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -635,7 +614,7 @@ def _multi_send(method, context, topic, msg, timeout=None): if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, _topic, msg, timeout) + _topic, _topic, msg, timeout, serialize) return return method(_addr, context, _topic, _topic, msg, timeout) @@ -676,6 +655,7 @@ def notify(conf, context, topic, msg, **kwargs): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. topic.replace('.', '-') + kwargs['serialize'] = False cast(conf, context, topic, msg, **kwargs) diff --git a/tests/unit/rpc/test_common.py b/tests/unit/rpc/test_common.py index 6f8b7ff..78f4b93 100644 --- a/tests/unit/rpc/test_common.py +++ b/tests/unit/rpc/test_common.py @@ -236,3 +236,32 @@ class RpcCommonTestCase(test_utils.BaseTestCase): self.assertRaises(rpc_common.ClientException, naughty) self.assertRaises(ValueError, really_naughty) + + def test_serialize_msg_v1(self): + self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', False) + msg = {'foo': 'bar'} + self.assertEqual(msg, rpc_common.serialize_msg(msg)) + + def test_serialize_msg_v2(self): + self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', True) + msg = {'foo': 'bar'} + s_msg = {'oslo.version': rpc_common._RPC_ENVELOPE_VERSION, + 'oslo.message': jsonutils.dumps(msg)} + serialized = rpc_common.serialize_msg(msg) + + self.assertEqual(s_msg, rpc_common.serialize_msg(msg)) + + self.assertEqual(msg, rpc_common.deserialize_msg(serialized)) + + def test_deserialize_msg_no_envelope(self): + self.assertEqual(1, rpc_common.deserialize_msg(1)) + self.assertEqual([], rpc_common.deserialize_msg([])) + self.assertEqual({}, rpc_common.deserialize_msg({})) + self.assertEqual('foo', rpc_common.deserialize_msg('foo')) + + def test_deserialize_msg_bad_version(self): + s_msg = {'oslo.version': '8675309.0', + 'oslo.message': 'whatever'} + + self.assertRaises(rpc_common.UnsupportedRpcEnvelopeVersion, + rpc_common.deserialize_msg, s_msg) diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py index 5da05de..921415d 100644 --- a/tests/unit/rpc/test_kombu.py +++ b/tests/unit/rpc/test_kombu.py @@ -118,7 +118,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.received_message = message conn.declare_topic_consumer('a_topic', _callback) - conn.topic_send('a_topic', message) + conn.topic_send('a_topic', rpc_common.serialize_msg(message)) conn.consume(limit=1) conn.close() @@ -138,7 +138,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): conn.declare_topic_consumer('a_topic', _callback, exchange_name="foorbar") - conn.topic_send('a_topic', message) + conn.topic_send('a_topic', rpc_common.serialize_msg(message)) conn.consume(limit=1) conn.close() @@ -162,7 +162,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): conn.declare_topic_consumer('a_topic', _callback1, queue_name='queue1') conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2') - conn.topic_send('a_topic', message) + conn.topic_send('a_topic', rpc_common.serialize_msg(message)) conn.consume(limit=2) conn.close() @@ -192,7 +192,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): exchange_name="abc") conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2', exchange_name="abc") - conn.topic_send('a_topic', message) + conn.topic_send('a_topic', rpc_common.serialize_msg(message)) conn.consume(limit=2) conn.close() @@ -222,7 +222,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): exchange_name="abc") conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2', exchange_name="def") - conn.topic_send('a_topic', message) + conn.topic_send('a_topic', rpc_common.serialize_msg(message)) conn.consume(limit=2) conn.close() @@ -241,7 +241,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.received_message = message conn.declare_direct_consumer('a_direct', _callback) - conn.direct_send('a_direct', message) + conn.direct_send('a_direct', rpc_common.serialize_msg(message)) conn.consume(limit=1) conn.close() @@ -438,7 +438,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.received_message = message conn.declare_direct_consumer('a_direct', _callback) - conn.direct_send('a_direct', message) + conn.direct_send('a_direct', rpc_common.serialize_msg(message)) info = _raise_exc_stub(self.stubs, 1, conn.connection, 'drain_events', 'foo timeout foo') |