summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc
diff options
context:
space:
mode:
authorEric Windisch <eric@cloudscaling.com>2013-02-19 14:39:09 -0500
committerEric Windisch <eric@cloudscaling.com>2013-02-19 15:07:54 -0500
commit22c497097b0d0bd461be40a7a03290aa0b4179f2 (patch)
tree27032113e286bd65a99d30dec310a36c988d1f18 /openstack/common/rpc
parent70891c271e011f59792933eaf65c3214493ef14a (diff)
downloadoslo-22c497097b0d0bd461be40a7a03290aa0b4179f2.tar.gz
oslo-22c497097b0d0bd461be40a7a03290aa0b4179f2.tar.xz
oslo-22c497097b0d0bd461be40a7a03290aa0b4179f2.zip
Fast serialization of RPC envelopes for Kombu
By flattening the dictionary and turning it into a fast-serialized string, we save ourselves from Kombu's very slow JSON serialization routines. Change-Id: I64796265c7cc89a05406faabd8d7b253fe3b8acb
Diffstat (limited to 'openstack/common/rpc')
-rw-r--r--openstack/common/rpc/impl_kombu.py28
1 files changed, 26 insertions, 2 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 3f0fa1f..fb5d16f 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -98,6 +98,21 @@ cfg.CONF.register_opts(kombu_opts)
LOG = rpc_common.LOG
+def _unflatten_envelope(packenv):
+ """Unflattens the RPC envelope.
+ Takes a list and returns a dictionary.
+ i.e. [1,2,3,4] => {1: 2, 3: 4}
+ """
+ i = iter(packenv)
+ h = {}
+ try:
+ while True:
+ k = i.next()
+ h[k] = i.next()
+ except StopIteration:
+ return h
+
+
def _get_queue_arguments(conf):
"""Construct the arguments for declaring a queue.
@@ -163,8 +178,13 @@ class ConsumerBase(object):
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
- msg = rpc_common.deserialize_msg(message.payload)
- callback(msg)
+ if '\0' in message.payload:
+ msg = _unflatten_envelope(message.payload.split('\0'))
+ else:
+ msg = message.payload
+
+ raw_msg = rpc_common.deserialize_msg(msg)
+ callback(raw_msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
@@ -306,6 +326,10 @@ class Publisher(object):
def send(self, msg, timeout=None):
"""Send a message"""
+ if isinstance(msg, dict) and rpc_common._VERSION_KEY in msg:
+ # Fast-serialize envelopes to avoid Kombu's JSON serialization.
+ msg = '\0'.join(reduce(lambda x, y: x + y, msg.items()))
+
if timeout:
#
# AMQP TTL is in milliseconds when set in the header.