diff options
author | Eric Windisch <eric@cloudscaling.com> | 2013-02-19 14:39:09 -0500 |
---|---|---|
committer | Eric Windisch <eric@cloudscaling.com> | 2013-02-19 15:07:54 -0500 |
commit | 22c497097b0d0bd461be40a7a03290aa0b4179f2 (patch) | |
tree | 27032113e286bd65a99d30dec310a36c988d1f18 /openstack/common/rpc | |
parent | 70891c271e011f59792933eaf65c3214493ef14a (diff) | |
download | oslo-22c497097b0d0bd461be40a7a03290aa0b4179f2.tar.gz oslo-22c497097b0d0bd461be40a7a03290aa0b4179f2.tar.xz oslo-22c497097b0d0bd461be40a7a03290aa0b4179f2.zip |
Fast serialization of RPC envelopes for Kombu
By flattening the dictionary and turning
it into a fast-serialized string, we save
ourselves from Kombu's very slow JSON serialization
routines.
Change-Id: I64796265c7cc89a05406faabd8d7b253fe3b8acb
Diffstat (limited to 'openstack/common/rpc')
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 28 |
1 files changed, 26 insertions, 2 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 3f0fa1f..fb5d16f 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -98,6 +98,21 @@ cfg.CONF.register_opts(kombu_opts) LOG = rpc_common.LOG +def _unflatten_envelope(packenv): + """Unflattens the RPC envelope. + Takes a list and returns a dictionary. + i.e. [1,2,3,4] => {1: 2, 3: 4} + """ + i = iter(packenv) + h = {} + try: + while True: + k = i.next() + h[k] = i.next() + except StopIteration: + return h + + def _get_queue_arguments(conf): """Construct the arguments for declaring a queue. @@ -163,8 +178,13 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) try: - msg = rpc_common.deserialize_msg(message.payload) - callback(msg) + if '\0' in message.payload: + msg = _unflatten_envelope(message.payload.split('\0')) + else: + msg = message.payload + + raw_msg = rpc_common.deserialize_msg(msg) + callback(raw_msg) except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: @@ -306,6 +326,10 @@ class Publisher(object): def send(self, msg, timeout=None): """Send a message""" + if isinstance(msg, dict) and rpc_common._VERSION_KEY in msg: + # Fast-serialize envelopes to avoid Kombu's JSON serialization. + msg = '\0'.join(reduce(lambda x, y: x + y, msg.items())) + if timeout: # # AMQP TTL is in milliseconds when set in the header. |