summaryrefslogtreecommitdiffstats
path: root/openstack/common
diff options
context:
space:
mode:
authorBen Nemec <bnemec@us.ibm.com>2013-05-09 19:06:45 +0000
committerBen Nemec <bnemec@us.ibm.com>2013-06-03 22:08:12 +0000
commit7ce54410485b33cffc39c7ffb96eae422b88601c (patch)
tree5f4f6d38d5b8c9089e463068801e8a40a72a0106 /openstack/common
parente3e497915f0cc47bba10aad4ef5e940669801cb3 (diff)
downloadoslo-7ce54410485b33cffc39c7ffb96eae422b88601c.tar.gz
oslo-7ce54410485b33cffc39c7ffb96eae422b88601c.tar.xz
oslo-7ce54410485b33cffc39c7ffb96eae422b88601c.zip
Fix problem with long messages in Qpid
Qpid has a limitation where it cannot serialize a dict containing a string greater than 65535 characters. This change alters the Qpid implementation to JSON encode the dict before sending it, but only if Qpid would fail to serialize it. This maintains as much backward compatibility as possible, though long messages will still fail if they are sent to an older receiver. Fixes bug 1175808 Change-Id: I5d104e099f523508dae2b657f7d06d96984b10f0
Diffstat (limited to 'openstack/common')
-rw-r--r--openstack/common/rpc/impl_qpid.py47
1 files changed, 47 insertions, 0 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 32680e1..7352517 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -31,6 +31,7 @@ from openstack.common import log as logging
from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common
+qpid_codec = importutils.try_import("qpid.codec010")
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
@@ -69,6 +70,8 @@ qpid_opts = [
cfg.CONF.register_opts(qpid_opts)
+JSON_CONTENT_TYPE = 'application/json; charset=utf8'
+
class ConsumerBase(object):
"""Consumer base class."""
@@ -123,10 +126,27 @@ class ConsumerBase(object):
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1
+ def _unpack_json_msg(self, msg):
+ """Load the JSON data in msg if msg.content_type indicates that it
+ is necessary. Put the loaded data back into msg.content and
+ update msg.content_type appropriately.
+
+ A Qpid Message containing a dict will have a content_type of
+ 'amqp/map', whereas one containing a string that needs to be converted
+ back from JSON will have a content_type of JSON_CONTENT_TYPE.
+
+ :param msg: a Qpid Message object
+ :returns: None
+ """
+ if msg.content_type == JSON_CONTENT_TYPE:
+ msg.content = jsonutils.loads(msg.content)
+ msg.content_type = 'amqp/map'
+
def consume(self):
"""Fetch the message and pass it to the callback object."""
message = self.receiver.fetch()
try:
+ self._unpack_json_msg(message)
msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception:
@@ -228,8 +248,35 @@ class Publisher(object):
"""Re-establish the Sender after a reconnection."""
self.sender = session.sender(self.address)
+ def _pack_json_msg(self, msg):
+ """Qpid cannot serialize dicts containing strings longer than 65535
+ characters. This function dumps the message content to a JSON
+ string, which Qpid is able to handle.
+
+ :param msg: May be either a Qpid Message object or a bare dict.
+ :returns: A Qpid Message with its content field JSON encoded.
+ """
+ try:
+ msg.content = jsonutils.dumps(msg.content)
+ except AttributeError:
+ # Need to have a Qpid message so we can set the content_type.
+ msg = qpid_messaging.Message(jsonutils.dumps(msg))
+ msg.content_type = JSON_CONTENT_TYPE
+ return msg
+
def send(self, msg):
"""Send a message."""
+ try:
+ # Check if Qpid can encode the message
+ check_msg = msg
+ if not hasattr(check_msg, 'content_type'):
+ check_msg = qpid_messaging.Message(msg)
+ content_type = check_msg.content_type
+ enc, dec = qpid_messaging.message.get_codec(content_type)
+ enc(check_msg.content)
+ except qpid_codec.CodecException:
+ # This means the message couldn't be serialized as a dict.
+ msg = self._pack_json_msg(msg)
self.sender.send(msg)