diff options
author | Jenkins <jenkins@review.openstack.org> | 2013-02-20 01:31:53 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-02-20 01:31:53 +0000 |
commit | 9e51e9a431b406d637183fd0d89c80f9ad52d061 (patch) | |
tree | e7d289796aa32c81f1b5f4cfa5e3fb42a43699f7 /openstack | |
parent | 0d171d65c0334b9558face4e07d22c4cbcaaadfb (diff) | |
parent | 22c497097b0d0bd461be40a7a03290aa0b4179f2 (diff) | |
download | oslo-9e51e9a431b406d637183fd0d89c80f9ad52d061.tar.gz oslo-9e51e9a431b406d637183fd0d89c80f9ad52d061.tar.xz oslo-9e51e9a431b406d637183fd0d89c80f9ad52d061.zip |
Merge "Fast serialization of RPC envelopes for Kombu"
Diffstat (limited to 'openstack')
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 28 |
1 files changed, 26 insertions, 2 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 3f0fa1f..fb5d16f 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -98,6 +98,21 @@ cfg.CONF.register_opts(kombu_opts) LOG = rpc_common.LOG +def _unflatten_envelope(packenv): + """Unflattens the RPC envelope. + Takes a list and returns a dictionary. + i.e. [1,2,3,4] => {1: 2, 3: 4} + """ + i = iter(packenv) + h = {} + try: + while True: + k = i.next() + h[k] = i.next() + except StopIteration: + return h + + def _get_queue_arguments(conf): """Construct the arguments for declaring a queue. @@ -163,8 +178,13 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) try: - msg = rpc_common.deserialize_msg(message.payload) - callback(msg) + if '\0' in message.payload: + msg = _unflatten_envelope(message.payload.split('\0')) + else: + msg = message.payload + + raw_msg = rpc_common.deserialize_msg(msg) + callback(raw_msg) except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: @@ -306,6 +326,10 @@ class Publisher(object): def send(self, msg, timeout=None): """Send a message""" + if isinstance(msg, dict) and rpc_common._VERSION_KEY in msg: + # Fast-serialize envelopes to avoid Kombu's JSON serialization. + msg = '\0'.join(reduce(lambda x, y: x + y, msg.items())) + if timeout: # # AMQP TTL is in milliseconds when set in the header. |