diff options
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/rpc/impl_kombu.py | 28 |
1 files changed, 26 insertions, 2 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 3f0fa1f..fb5d16f 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -98,6 +98,21 @@ cfg.CONF.register_opts(kombu_opts) LOG = rpc_common.LOG +def _unflatten_envelope(packenv): + """Unflattens the RPC envelope. + Takes a list and returns a dictionary. + i.e. [1,2,3,4] => {1: 2, 3: 4} + """ + i = iter(packenv) + h = {} + try: + while True: + k = i.next() + h[k] = i.next() + except StopIteration: + return h + + def _get_queue_arguments(conf): """Construct the arguments for declaring a queue. @@ -163,8 +178,13 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) try: - msg = rpc_common.deserialize_msg(message.payload) - callback(msg) + if '\0' in message.payload: + msg = _unflatten_envelope(message.payload.split('\0')) + else: + msg = message.payload + + raw_msg = rpc_common.deserialize_msg(msg) + callback(raw_msg) except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: @@ -306,6 +326,10 @@ class Publisher(object): def send(self, msg, timeout=None): """Send a message""" + if isinstance(msg, dict) and rpc_common._VERSION_KEY in msg: + # Fast-serialize envelopes to avoid Kombu's JSON serialization. + msg = '\0'.join(reduce(lambda x, y: x + y, msg.items())) + if timeout: # # AMQP TTL is in milliseconds when set in the header. |
