diff options
| author | Ben Nemec <bnemec@us.ibm.com> | 2013-05-09 19:06:45 +0000 |
|---|---|---|
| committer | Ben Nemec <bnemec@us.ibm.com> | 2013-06-03 22:08:12 +0000 |
| commit | 7ce54410485b33cffc39c7ffb96eae422b88601c (patch) | |
| tree | 5f4f6d38d5b8c9089e463068801e8a40a72a0106 /openstack/common | |
| parent | e3e497915f0cc47bba10aad4ef5e940669801cb3 (diff) | |
| download | oslo-7ce54410485b33cffc39c7ffb96eae422b88601c.tar.gz oslo-7ce54410485b33cffc39c7ffb96eae422b88601c.tar.xz oslo-7ce54410485b33cffc39c7ffb96eae422b88601c.zip | |
Fix problem with long messages in Qpid
Qpid has a limitation where it cannot serialize a dict containing
a string greater than 65535 characters. This change alters the
Qpid implementation to JSON encode the dict before sending it, but
only if Qpid would fail to serialize it. This maintains as much
backward compatibility as possible, though long messages will
still fail if they are sent to an older receiver.
Fixes bug 1175808
Change-Id: I5d104e099f523508dae2b657f7d06d96984b10f0
Diffstat (limited to 'openstack/common')
| -rw-r--r-- | openstack/common/rpc/impl_qpid.py | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 32680e1..7352517 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -31,6 +31,7 @@ from openstack.common import log as logging from openstack.common.rpc import amqp as rpc_amqp from openstack.common.rpc import common as rpc_common +qpid_codec = importutils.try_import("qpid.codec010") qpid_messaging = importutils.try_import("qpid.messaging") qpid_exceptions = importutils.try_import("qpid.messaging.exceptions") @@ -69,6 +70,8 @@ qpid_opts = [ cfg.CONF.register_opts(qpid_opts) +JSON_CONTENT_TYPE = 'application/json; charset=utf8' + class ConsumerBase(object): """Consumer base class.""" @@ -123,10 +126,27 @@ class ConsumerBase(object): self.receiver = session.receiver(self.address) self.receiver.capacity = 1 + def _unpack_json_msg(self, msg): + """Load the JSON data in msg if msg.content_type indicates that it + is necessary. Put the loaded data back into msg.content and + update msg.content_type appropriately. + + A Qpid Message containing a dict will have a content_type of + 'amqp/map', whereas one containing a string that needs to be converted + back from JSON will have a content_type of JSON_CONTENT_TYPE. + + :param msg: a Qpid Message object + :returns: None + """ + if msg.content_type == JSON_CONTENT_TYPE: + msg.content = jsonutils.loads(msg.content) + msg.content_type = 'amqp/map' + def consume(self): """Fetch the message and pass it to the callback object.""" message = self.receiver.fetch() try: + self._unpack_json_msg(message) msg = rpc_common.deserialize_msg(message.content) self.callback(msg) except Exception: @@ -228,8 +248,35 @@ class Publisher(object): """Re-establish the Sender after a reconnection.""" self.sender = session.sender(self.address) + def _pack_json_msg(self, msg): + """Qpid cannot serialize dicts containing strings longer than 65535 + characters. This function dumps the message content to a JSON + string, which Qpid is able to handle. + + :param msg: May be either a Qpid Message object or a bare dict. + :returns: A Qpid Message with its content field JSON encoded. + """ + try: + msg.content = jsonutils.dumps(msg.content) + except AttributeError: + # Need to have a Qpid message so we can set the content_type. + msg = qpid_messaging.Message(jsonutils.dumps(msg)) + msg.content_type = JSON_CONTENT_TYPE + return msg + def send(self, msg): """Send a message.""" + try: + # Check if Qpid can encode the message + check_msg = msg + if not hasattr(check_msg, 'content_type'): + check_msg = qpid_messaging.Message(msg) + content_type = check_msg.content_type + enc, dec = qpid_messaging.message.get_codec(content_type) + enc(check_msg.content) + except qpid_codec.CodecException: + # This means the message couldn't be serialized as a dict. + msg = self._pack_json_msg(msg) self.sender.send(msg) |
