summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--openstack/common/rpc/impl_qpid.py47
-rw-r--r--tests/unit/rpc/test_qpid.py83
2 files changed, 130 insertions, 0 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 32680e1..7352517 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -31,6 +31,7 @@ from openstack.common import log as logging
from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common
+qpid_codec = importutils.try_import("qpid.codec010")
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
@@ -69,6 +70,8 @@ qpid_opts = [
cfg.CONF.register_opts(qpid_opts)
+JSON_CONTENT_TYPE = 'application/json; charset=utf8'
+
class ConsumerBase(object):
"""Consumer base class."""
@@ -123,10 +126,27 @@ class ConsumerBase(object):
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1
+ def _unpack_json_msg(self, msg):
+ """Load the JSON data in msg if msg.content_type indicates that it
+ is necessary. Put the loaded data back into msg.content and
+ update msg.content_type appropriately.
+
+ A Qpid Message containing a dict will have a content_type of
+ 'amqp/map', whereas one containing a string that needs to be converted
+ back from JSON will have a content_type of JSON_CONTENT_TYPE.
+
+ :param msg: a Qpid Message object
+ :returns: None
+ """
+ if msg.content_type == JSON_CONTENT_TYPE:
+ msg.content = jsonutils.loads(msg.content)
+ msg.content_type = 'amqp/map'
+
def consume(self):
"""Fetch the message and pass it to the callback object."""
message = self.receiver.fetch()
try:
+ self._unpack_json_msg(message)
msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception:
@@ -228,8 +248,35 @@ class Publisher(object):
"""Re-establish the Sender after a reconnection."""
self.sender = session.sender(self.address)
+ def _pack_json_msg(self, msg):
+ """Qpid cannot serialize dicts containing strings longer than 65535
+ characters. This function dumps the message content to a JSON
+ string, which Qpid is able to handle.
+
+ :param msg: May be either a Qpid Message object or a bare dict.
+ :returns: A Qpid Message with its content field JSON encoded.
+ """
+ try:
+ msg.content = jsonutils.dumps(msg.content)
+ except AttributeError:
+ # Need to have a Qpid message so we can set the content_type.
+ msg = qpid_messaging.Message(jsonutils.dumps(msg))
+ msg.content_type = JSON_CONTENT_TYPE
+ return msg
+
def send(self, msg):
"""Send a message."""
+ try:
+ # Check if Qpid can encode the message
+ check_msg = msg
+ if not hasattr(check_msg, 'content_type'):
+ check_msg = qpid_messaging.Message(msg)
+ content_type = check_msg.content_type
+ enc, dec = qpid_messaging.message.get_codec(content_type)
+ enc(check_msg.content)
+ except qpid_codec.CodecException:
+ # This means the message couldn't be serialized as a dict.
+ msg = self._pack_json_msg(msg)
self.sender.send(msg)
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index 1630ac4..4af4f35 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -28,6 +28,7 @@ import mox
from oslo.config import cfg
from openstack.common import context
+from openstack.common import jsonutils
from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common
from tests import utils
@@ -491,6 +492,88 @@ class RpcQpidTestCase(utils.BaseTestCase):
def test_multicall(self):
self._test_call(multi=True)
+ def _test_publisher(self, message=True):
+ """Test that messages containing long strings are correctly serialized
+ in a way that Qpid can handle.
+
+ :param message: The publisher may be passed either a Qpid Message
+ object or a bare dict. This parameter controls which of those the test
+ will send.
+ """
+ self.sent_msg = None
+
+ def send_stub(msg):
+ self.sent_msg = msg
+
+ # Qpid cannot serialize a dict containing a string > 65535 chars.
+ raw_msg = {'test': 'a' * 65536}
+ if message:
+ base_msg = qpid.messaging.Message(raw_msg)
+ else:
+ base_msg = raw_msg
+ expected_msg = qpid.messaging.Message(jsonutils.dumps(raw_msg))
+ expected_msg.content_type = impl_qpid.JSON_CONTENT_TYPE
+ mock_session = self.mox.CreateMock(self.orig_session)
+ mock_sender = self.mox.CreateMock(self.orig_sender)
+ mock_session.sender(mox.IgnoreArg()).AndReturn(mock_sender)
+ self.stubs.Set(mock_sender, 'send', send_stub)
+ self.mox.ReplayAll()
+
+ publisher = impl_qpid.Publisher(mock_session, 'test_node')
+ publisher.send(base_msg)
+
+ self.assertEqual(self.sent_msg.content, expected_msg.content)
+ self.assertEqual(self.sent_msg.content_type, expected_msg.content_type)
+
+ def test_publisher_long_message(self):
+ self._test_publisher(message=True)
+
+ def test_publisher_long_dict(self):
+ self._test_publisher(message=False)
+
+ def _test_consumer_long_message(self, json=True):
+ """Verify that the Qpid implementation correctly deserializes
+ message content.
+
+ :param json: For compatibility, this code needs to support both
+ messages that are and are not JSON encoded. This param
+ specifies which is being tested.
+ """
+ def fake_callback(msg):
+ self.received_msg = msg
+
+ # The longest string Qpid can handle itself
+ chars = 65535
+ if json:
+ # The first length that requires JSON encoding
+ chars = 65536
+ raw_msg = {'test': 'a' * chars}
+ if json:
+ fake_message = qpid.messaging.Message(jsonutils.dumps(raw_msg))
+ fake_message.content_type = impl_qpid.JSON_CONTENT_TYPE
+ else:
+ fake_message = qpid.messaging.Message(raw_msg)
+ mock_session = self.mox.CreateMock(self.orig_session)
+ mock_receiver = self.mox.CreateMock(self.orig_receiver)
+ mock_session.receiver(mox.IgnoreArg()).AndReturn(mock_receiver)
+ mock_receiver.fetch().AndReturn(fake_message)
+ mock_session.acknowledge(mox.IgnoreArg())
+ self.mox.ReplayAll()
+
+ consumer = impl_qpid.DirectConsumer(None,
+ mock_session,
+ 'bogus_msg_id',
+ fake_callback)
+ consumer.consume()
+
+ self.assertEqual(self.received_msg, raw_msg)
+
+ def test_consumer_long_message(self):
+ self._test_consumer_long_message(json=True)
+
+ def test_consumer_long_message_no_json(self):
+ self._test_consumer_long_message(json=False)
+
#
#from nova.tests.rpc import common