summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-02-20 01:31:53 +0000
committerGerrit Code Review <review@openstack.org>2013-02-20 01:31:53 +0000
commit9e51e9a431b406d637183fd0d89c80f9ad52d061 (patch)
treee7d289796aa32c81f1b5f4cfa5e3fb42a43699f7 /openstack
parent0d171d65c0334b9558face4e07d22c4cbcaaadfb (diff)
parent22c497097b0d0bd461be40a7a03290aa0b4179f2 (diff)
downloadoslo-9e51e9a431b406d637183fd0d89c80f9ad52d061.tar.gz
oslo-9e51e9a431b406d637183fd0d89c80f9ad52d061.tar.xz
oslo-9e51e9a431b406d637183fd0d89c80f9ad52d061.zip
Merge "Fast serialization of RPC envelopes for Kombu"
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/impl_kombu.py28
1 files changed, 26 insertions, 2 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 3f0fa1f..fb5d16f 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -98,6 +98,21 @@ cfg.CONF.register_opts(kombu_opts)
LOG = rpc_common.LOG
+def _unflatten_envelope(packenv):
+ """Unflattens the RPC envelope.
+ Takes a list and returns a dictionary.
+ i.e. [1,2,3,4] => {1: 2, 3: 4}
+ """
+ i = iter(packenv)
+ h = {}
+ try:
+ while True:
+ k = i.next()
+ h[k] = i.next()
+ except StopIteration:
+ return h
+
+
def _get_queue_arguments(conf):
"""Construct the arguments for declaring a queue.
@@ -163,8 +178,13 @@ class ConsumerBase(object):
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
- msg = rpc_common.deserialize_msg(message.payload)
- callback(msg)
+ if '\0' in message.payload:
+ msg = _unflatten_envelope(message.payload.split('\0'))
+ else:
+ msg = message.payload
+
+ raw_msg = rpc_common.deserialize_msg(msg)
+ callback(raw_msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
@@ -306,6 +326,10 @@ class Publisher(object):
def send(self, msg, timeout=None):
"""Send a message"""
+ if isinstance(msg, dict) and rpc_common._VERSION_KEY in msg:
+ # Fast-serialize envelopes to avoid Kombu's JSON serialization.
+ msg = '\0'.join(reduce(lambda x, y: x + y, msg.items()))
+
if timeout:
#
# AMQP TTL is in milliseconds when set in the header.