summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--openstack/common/rpc/amqp.py16
-rw-r--r--openstack/common/rpc/common.py118
-rw-r--r--openstack/common/rpc/dispatcher.py18
-rw-r--r--openstack/common/rpc/impl_kombu.py3
-rw-r--r--openstack/common/rpc/impl_qpid.py3
-rw-r--r--openstack/common/rpc/impl_zmq.py50
-rw-r--r--tests/unit/rpc/test_common.py29
-rw-r--r--tests/unit/rpc/test_kombu.py14
8 files changed, 185 insertions, 66 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 00b2f24..ce4c028 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
'failure': failure}
if ending:
msg['ending'] = True
- conn.direct_send(msg_id, msg)
+ conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
@@ -364,7 +364,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
return wait_msg
@@ -383,7 +383,7 @@ def cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast(conf, context, topic, msg, connection_pool):
@@ -391,7 +391,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- conn.fanout_send(topic, msg)
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
@@ -399,7 +399,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast_to_server(conf, context, server_params, topic, msg,
@@ -408,7 +408,7 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
- conn.fanout_send(topic, msg)
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def notify(conf, context, topic, msg, connection_pool):
@@ -418,6 +418,10 @@ def notify(conf, context, topic, msg, connection_pool):
topic=topic))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
+ # NOTE(russellb): We are explicitly *NOT* using serialize_msg() here.
+ # The messages sent out as notifications are intended to be consumed by
+ # 3rd party applications. The notification producer is entirely
+ # responsible for the message content and versioning.
conn.notify_send(topic, msg)
diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py
index efdf26f..a3a3699 100644
--- a/openstack/common/rpc/common.py
+++ b/openstack/common/rpc/common.py
@@ -21,6 +21,7 @@ import copy
import sys
import traceback
+from openstack.common import cfg
from openstack.common.gettextutils import _
from openstack.common import importutils
from openstack.common import jsonutils
@@ -28,9 +29,50 @@ from openstack.common import local
from openstack.common import log as logging
+CONF = cfg.CONF
LOG = logging.getLogger(__name__)
+'''RPC Envelope Version.
+
+This version number applies to the top level structure of messages sent out.
+It does *not* apply to the message payload, which must be versioned
+independently. For example, when using rpc APIs, a version number is applied
+for changes to the API being exposed over rpc. This version number is handled
+in the rpc proxy and dispatcher modules.
+
+This version number applies to the message envelope that is used in the
+serialization done inside the rpc layer. See serialize_msg() and
+deserialize_msg().
+
+The current message format (version 2.0) is very simple. It is:
+
+ {
+ 'oslo.version': <RPC Envelope Version as a String>,
+ 'oslo.message': <Application Message Payload, JSON encoded>
+ }
+
+Message format version '1.0' is just considered to be the messages we sent
+without a message envelope.
+
+So, the current message envelope just includes the envelope version. It may
+eventually contain additional information, such as a signature for the message
+payload.
+
+We will JSON encode the application message payload. The message envelope,
+which includes the JSON encoded application message body, will be passed down
+to the messaging libraries as a dict.
+'''
+_RPC_ENVELOPE_VERSION = '2.0'
+
+_VERSION_KEY = 'oslo.version'
+_MESSAGE_KEY = 'oslo.message'
+
+
+# TODO(russellb) Turn this on after Grizzly.
+_SEND_RPC_ENVELOPE = False
+
+
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
@@ -91,6 +133,11 @@ class UnsupportedRpcVersion(RPCException):
"this endpoint.")
+class UnsupportedRpcEnvelopeVersion(RPCException):
+ message = _("Specified RPC envelope version, %(version)s, "
+ "not supported by this endpoint.")
+
+
class Connection(object):
"""A connection, returned by rpc.create_connection().
@@ -344,3 +391,74 @@ def client_exceptions(*exceptions):
return catch_client_exception(exceptions, func, *args, **kwargs)
return inner
return outer
+
+
+def version_is_compatible(imp_version, version):
+ """Determine whether versions are compatible.
+
+ :param imp_version: The version implemented
+ :param version: The version requested by an incoming message.
+ """
+ version_parts = version.split('.')
+ imp_version_parts = imp_version.split('.')
+ if int(version_parts[0]) != int(imp_version_parts[0]): # Major
+ return False
+ if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
+ return False
+ return True
+
+
+def serialize_msg(raw_msg):
+ if not _SEND_RPC_ENVELOPE:
+ return raw_msg
+
+ # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
+ # information about this format.
+ msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
+ _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
+
+ return msg
+
+
+def deserialize_msg(msg):
+ # NOTE(russellb): Hang on to your hats, this road is about to
+ # get a little bumpy.
+ #
+ # Robustness Principle:
+ # "Be strict in what you send, liberal in what you accept."
+ #
+ # At this point we have to do a bit of guessing about what it
+ # is we just received. Here is the set of possibilities:
+ #
+ # 1) We received a dict. This could be 2 things:
+ #
+ # a) Inspect it to see if it looks like a standard message envelope.
+ # If so, great!
+ #
+ # b) If it doesn't look like a standard message envelope, it could either
+ # be a notification (which we don't wrap), or a message from before
+ # we added a message envelope (referred to as version 1.0).
+ # Just return the message as-is.
+ #
+ # 2) It's any other non-dict type. Just return it and hope for the best.
+ # This case covers return values from rpc.call() from before message
+ # envelopes were used. (messages to call a method were always a dict)
+
+ if not isinstance(msg, dict):
+ # See #2 above.
+ return msg
+
+ base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
+ if not all(map(lambda key: key in msg, base_envelope_keys)):
+ # See #1.b above.
+ return msg
+
+ # At this point we think we have the message envelope
+ # format we were expecting. (#1.a above)
+
+ if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
+ raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
+
+ raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
+
+ return raw_msg
diff --git a/openstack/common/rpc/dispatcher.py b/openstack/common/rpc/dispatcher.py
index f09879c..0adc4ef 100644
--- a/openstack/common/rpc/dispatcher.py
+++ b/openstack/common/rpc/dispatcher.py
@@ -103,21 +103,6 @@ class RpcDispatcher(object):
self.callbacks = callbacks
super(RpcDispatcher, self).__init__()
- @staticmethod
- def _is_compatible(mversion, version):
- """Determine whether versions are compatible.
-
- :param mversion: The API version implemented by a callback.
- :param version: The API version requested by an incoming message.
- """
- version_parts = version.split('.')
- mversion_parts = mversion.split('.')
- if int(version_parts[0]) != int(mversion_parts[0]): # Major
- return False
- if int(version_parts[1]) > int(mversion_parts[1]): # Minor
- return False
- return True
-
def dispatch(self, ctxt, version, method, **kwargs):
"""Dispatch a message based on a requested version.
@@ -139,7 +124,8 @@ class RpcDispatcher(object):
rpc_api_version = proxyobj.RPC_API_VERSION
else:
rpc_api_version = '1.0'
- is_compatible = self._is_compatible(rpc_api_version, version)
+ is_compatible = rpc_common.version_is_compatible(rpc_api_version,
+ version)
had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method):
continue
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index ced4e96..cdbba4e 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -162,7 +162,8 @@ class ConsumerBase(object):
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
- callback(message.payload)
+ msg = rpc_common.deserialize_msg(message.payload)
+ callback(msg)
message.ack()
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 7f21461..a6dd983 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -124,7 +124,8 @@ class ConsumerBase(object):
"""Fetch the message and pass it to the callback object"""
message = self.receiver.fetch()
try:
- self.callback(message.content)
+ msg = rpc_common.deserialize_msg(message.content)
+ self.callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 33562b0..880896b 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -28,7 +28,6 @@ import greenlet
from openstack.common import cfg
from openstack.common.gettextutils import _
from openstack.common import importutils
-from openstack.common import jsonutils
from openstack.common.rpc import common as rpc_common
@@ -77,27 +76,6 @@ ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
-def _serialize(data):
- """
- Serialization wrapper
- We prefer using JSON, but it cannot encode all types.
- Error if a developer passes us bad data.
- """
- try:
- return str(jsonutils.dumps(data, ensure_ascii=True))
- except TypeError:
- LOG.error(_("JSON serialization failed."))
- raise
-
-
-def _deserialize(data):
- """
- Deserialization wrapper
- """
- LOG.debug(_("Deserializing: %s"), data)
- return jsonutils.loads(data)
-
-
class ZmqSocket(object):
"""
A tiny wrapper around ZeroMQ to simplify the send/recv protocol
@@ -205,9 +183,10 @@ class ZmqClient(object):
def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data):
- self.outq.send([str(msg_id), str(topic), str('cast'),
- _serialize(data)])
+ def cast(self, msg_id, topic, data, serialize=True):
+ if serialize:
+ data = rpc_common.serialize_msg(data)
+ self.outq.send([str(msg_id), str(topic), str('cast'), data])
def close(self):
self.outq.close()
@@ -232,11 +211,11 @@ class RpcContext(rpc_common.CommonRpcContext):
@classmethod
def marshal(self, ctx):
ctx_data = ctx.to_dict()
- return _serialize(ctx_data)
+ return rpc_common.serialize_msg(ctx_data)
@classmethod
def unmarshal(self, data):
- return RpcContext.from_dict(_deserialize(data))
+ return RpcContext.from_dict(rpc_common.deserialize_msg(data))
class InternalContext(object):
@@ -433,11 +412,11 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = _deserialize(in_msg)
+ inside = rpc_common.deserialize_msg(in_msg)
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
- data = [str(msg_id), _serialize(response)]
+ data = [str(msg_id), rpc_common.serialize_msg(response)]
else:
sock_type = zmq.PUSH
@@ -480,7 +459,7 @@ class ZmqReactor(ZmqBaseReactor):
msg_id, topic, style, in_msg = data
- ctx, request = _deserialize(in_msg)
+ ctx, request = rpc_common.deserialize_msg(in_msg)
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
@@ -531,7 +510,7 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None):
+def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -540,7 +519,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload)
+ conn.cast(msg_id, topic, payload, serialize)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -590,7 +569,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])
+ responses = rpc_common.deserialize_msg(msg[-1])
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
@@ -609,7 +588,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None):
+def _multi_send(method, context, topic, msg, timeout=None, serialize=True):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -635,7 +614,7 @@ def _multi_send(method, context, topic, msg, timeout=None):
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout)
+ _topic, _topic, msg, timeout, serialize)
return
return method(_addr, context, _topic, _topic, msg, timeout)
@@ -676,6 +655,7 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
+ kwargs['serialize'] = False
cast(conf, context, topic, msg, **kwargs)
diff --git a/tests/unit/rpc/test_common.py b/tests/unit/rpc/test_common.py
index 6f8b7ff..78f4b93 100644
--- a/tests/unit/rpc/test_common.py
+++ b/tests/unit/rpc/test_common.py
@@ -236,3 +236,32 @@ class RpcCommonTestCase(test_utils.BaseTestCase):
self.assertRaises(rpc_common.ClientException, naughty)
self.assertRaises(ValueError, really_naughty)
+
+ def test_serialize_msg_v1(self):
+ self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', False)
+ msg = {'foo': 'bar'}
+ self.assertEqual(msg, rpc_common.serialize_msg(msg))
+
+ def test_serialize_msg_v2(self):
+ self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', True)
+ msg = {'foo': 'bar'}
+ s_msg = {'oslo.version': rpc_common._RPC_ENVELOPE_VERSION,
+ 'oslo.message': jsonutils.dumps(msg)}
+ serialized = rpc_common.serialize_msg(msg)
+
+ self.assertEqual(s_msg, rpc_common.serialize_msg(msg))
+
+ self.assertEqual(msg, rpc_common.deserialize_msg(serialized))
+
+ def test_deserialize_msg_no_envelope(self):
+ self.assertEqual(1, rpc_common.deserialize_msg(1))
+ self.assertEqual([], rpc_common.deserialize_msg([]))
+ self.assertEqual({}, rpc_common.deserialize_msg({}))
+ self.assertEqual('foo', rpc_common.deserialize_msg('foo'))
+
+ def test_deserialize_msg_bad_version(self):
+ s_msg = {'oslo.version': '8675309.0',
+ 'oslo.message': 'whatever'}
+
+ self.assertRaises(rpc_common.UnsupportedRpcEnvelopeVersion,
+ rpc_common.deserialize_msg, s_msg)
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index 5da05de..921415d 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -118,7 +118,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
self.received_message = message
conn.declare_topic_consumer('a_topic', _callback)
- conn.topic_send('a_topic', message)
+ conn.topic_send('a_topic', rpc_common.serialize_msg(message))
conn.consume(limit=1)
conn.close()
@@ -138,7 +138,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
conn.declare_topic_consumer('a_topic', _callback,
exchange_name="foorbar")
- conn.topic_send('a_topic', message)
+ conn.topic_send('a_topic', rpc_common.serialize_msg(message))
conn.consume(limit=1)
conn.close()
@@ -162,7 +162,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
conn.declare_topic_consumer('a_topic', _callback1, queue_name='queue1')
conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2')
- conn.topic_send('a_topic', message)
+ conn.topic_send('a_topic', rpc_common.serialize_msg(message))
conn.consume(limit=2)
conn.close()
@@ -192,7 +192,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
exchange_name="abc")
conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2',
exchange_name="abc")
- conn.topic_send('a_topic', message)
+ conn.topic_send('a_topic', rpc_common.serialize_msg(message))
conn.consume(limit=2)
conn.close()
@@ -222,7 +222,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
exchange_name="abc")
conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2',
exchange_name="def")
- conn.topic_send('a_topic', message)
+ conn.topic_send('a_topic', rpc_common.serialize_msg(message))
conn.consume(limit=2)
conn.close()
@@ -241,7 +241,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
self.received_message = message
conn.declare_direct_consumer('a_direct', _callback)
- conn.direct_send('a_direct', message)
+ conn.direct_send('a_direct', rpc_common.serialize_msg(message))
conn.consume(limit=1)
conn.close()
@@ -438,7 +438,7 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
self.received_message = message
conn.declare_direct_consumer('a_direct', _callback)
- conn.direct_send('a_direct', message)
+ conn.direct_send('a_direct', rpc_common.serialize_msg(message))
info = _raise_exc_stub(self.stubs, 1, conn.connection,
'drain_events', 'foo timeout foo')