summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBen Nemec <bnemec@us.ibm.com>2013-05-09 19:06:45 +0000
committerBen Nemec <bnemec@us.ibm.com>2013-06-03 22:08:12 +0000
commit7ce54410485b33cffc39c7ffb96eae422b88601c (patch)
tree5f4f6d38d5b8c9089e463068801e8a40a72a0106
parente3e497915f0cc47bba10aad4ef5e940669801cb3 (diff)
downloadoslo-7ce54410485b33cffc39c7ffb96eae422b88601c.tar.gz
oslo-7ce54410485b33cffc39c7ffb96eae422b88601c.tar.xz
oslo-7ce54410485b33cffc39c7ffb96eae422b88601c.zip
Fix problem with long messages in Qpid
Qpid has a limitation where it cannot serialize a dict containing a string greater than 65535 characters. This change alters the Qpid implementation to JSON encode the dict before sending it, but only if Qpid would fail to serialize it. This maintains as much backward compatibility as possible, though long messages will still fail if they are sent to an older receiver. Fixes bug 1175808 Change-Id: I5d104e099f523508dae2b657f7d06d96984b10f0
-rw-r--r--openstack/common/rpc/impl_qpid.py47
-rw-r--r--tests/unit/rpc/test_qpid.py83
2 files changed, 130 insertions, 0 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 32680e1..7352517 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -31,6 +31,7 @@ from openstack.common import log as logging
from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common
+qpid_codec = importutils.try_import("qpid.codec010")
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
@@ -69,6 +70,8 @@ qpid_opts = [
cfg.CONF.register_opts(qpid_opts)
+JSON_CONTENT_TYPE = 'application/json; charset=utf8'
+
class ConsumerBase(object):
"""Consumer base class."""
@@ -123,10 +126,27 @@ class ConsumerBase(object):
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1
+ def _unpack_json_msg(self, msg):
+ """Load the JSON data in msg if msg.content_type indicates that it
+ is necessary. Put the loaded data back into msg.content and
+ update msg.content_type appropriately.
+
+ A Qpid Message containing a dict will have a content_type of
+ 'amqp/map', whereas one containing a string that needs to be converted
+ back from JSON will have a content_type of JSON_CONTENT_TYPE.
+
+ :param msg: a Qpid Message object
+ :returns: None
+ """
+ if msg.content_type == JSON_CONTENT_TYPE:
+ msg.content = jsonutils.loads(msg.content)
+ msg.content_type = 'amqp/map'
+
def consume(self):
"""Fetch the message and pass it to the callback object."""
message = self.receiver.fetch()
try:
+ self._unpack_json_msg(message)
msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception:
@@ -228,8 +248,35 @@ class Publisher(object):
"""Re-establish the Sender after a reconnection."""
self.sender = session.sender(self.address)
+ def _pack_json_msg(self, msg):
+ """Qpid cannot serialize dicts containing strings longer than 65535
+ characters. This function dumps the message content to a JSON
+ string, which Qpid is able to handle.
+
+ :param msg: May be either a Qpid Message object or a bare dict.
+ :returns: A Qpid Message with its content field JSON encoded.
+ """
+ try:
+ msg.content = jsonutils.dumps(msg.content)
+ except AttributeError:
+ # Need to have a Qpid message so we can set the content_type.
+ msg = qpid_messaging.Message(jsonutils.dumps(msg))
+ msg.content_type = JSON_CONTENT_TYPE
+ return msg
+
def send(self, msg):
"""Send a message."""
+ try:
+ # Check if Qpid can encode the message
+ check_msg = msg
+ if not hasattr(check_msg, 'content_type'):
+ check_msg = qpid_messaging.Message(msg)
+ content_type = check_msg.content_type
+ enc, dec = qpid_messaging.message.get_codec(content_type)
+ enc(check_msg.content)
+ except qpid_codec.CodecException:
+ # This means the message couldn't be serialized as a dict.
+ msg = self._pack_json_msg(msg)
self.sender.send(msg)
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index 1630ac4..4af4f35 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -28,6 +28,7 @@ import mox
from oslo.config import cfg
from openstack.common import context
+from openstack.common import jsonutils
from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common
from tests import utils
@@ -491,6 +492,88 @@ class RpcQpidTestCase(utils.BaseTestCase):
def test_multicall(self):
self._test_call(multi=True)
+ def _test_publisher(self, message=True):
+ """Test that messages containing long strings are correctly serialized
+ in a way that Qpid can handle.
+
+ :param message: The publisher may be passed either a Qpid Message
+ object or a bare dict. This parameter controls which of those the test
+ will send.
+ """
+ self.sent_msg = None
+
+ def send_stub(msg):
+ self.sent_msg = msg
+
+ # Qpid cannot serialize a dict containing a string > 65535 chars.
+ raw_msg = {'test': 'a' * 65536}
+ if message:
+ base_msg = qpid.messaging.Message(raw_msg)
+ else:
+ base_msg = raw_msg
+ expected_msg = qpid.messaging.Message(jsonutils.dumps(raw_msg))
+ expected_msg.content_type = impl_qpid.JSON_CONTENT_TYPE
+ mock_session = self.mox.CreateMock(self.orig_session)
+ mock_sender = self.mox.CreateMock(self.orig_sender)
+ mock_session.sender(mox.IgnoreArg()).AndReturn(mock_sender)
+ self.stubs.Set(mock_sender, 'send', send_stub)
+ self.mox.ReplayAll()
+
+ publisher = impl_qpid.Publisher(mock_session, 'test_node')
+ publisher.send(base_msg)
+
+ self.assertEqual(self.sent_msg.content, expected_msg.content)
+ self.assertEqual(self.sent_msg.content_type, expected_msg.content_type)
+
+ def test_publisher_long_message(self):
+ self._test_publisher(message=True)
+
+ def test_publisher_long_dict(self):
+ self._test_publisher(message=False)
+
+ def _test_consumer_long_message(self, json=True):
+ """Verify that the Qpid implementation correctly deserializes
+ message content.
+
+ :param json: For compatibility, this code needs to support both
+ messages that are and are not JSON encoded. This param
+ specifies which is being tested.
+ """
+ def fake_callback(msg):
+ self.received_msg = msg
+
+ # The longest string Qpid can handle itself
+ chars = 65535
+ if json:
+ # The first length that requires JSON encoding
+ chars = 65536
+ raw_msg = {'test': 'a' * chars}
+ if json:
+ fake_message = qpid.messaging.Message(jsonutils.dumps(raw_msg))
+ fake_message.content_type = impl_qpid.JSON_CONTENT_TYPE
+ else:
+ fake_message = qpid.messaging.Message(raw_msg)
+ mock_session = self.mox.CreateMock(self.orig_session)
+ mock_receiver = self.mox.CreateMock(self.orig_receiver)
+ mock_session.receiver(mox.IgnoreArg()).AndReturn(mock_receiver)
+ mock_receiver.fetch().AndReturn(fake_message)
+ mock_session.acknowledge(mox.IgnoreArg())
+ self.mox.ReplayAll()
+
+ consumer = impl_qpid.DirectConsumer(None,
+ mock_session,
+ 'bogus_msg_id',
+ fake_callback)
+ consumer.consume()
+
+ self.assertEqual(self.received_msg, raw_msg)
+
+ def test_consumer_long_message(self):
+ self._test_consumer_long_message(json=True)
+
+ def test_consumer_long_message_no_json(self):
+ self._test_consumer_long_message(json=False)
+
#
#from nova.tests.rpc import common