summaryrefslogtreecommitdiffstats
path: root/openstack/common
diff options
context:
space:
mode:
Diffstat (limited to 'openstack/common')
-rw-r--r--openstack/common/rpc/impl_kombu.py28
1 files changed, 2 insertions, 26 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 91d81b0..b63edea 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -98,21 +98,6 @@ cfg.CONF.register_opts(kombu_opts)
LOG = rpc_common.LOG
-def _unflatten_envelope(packenv):
- """Unflattens the RPC envelope.
- Takes a list and returns a dictionary.
- i.e. [1,2,3,4] => {1: 2, 3: 4}
- """
- i = iter(packenv)
- h = {}
- try:
- while True:
- k = i.next()
- h[k] = i.next()
- except StopIteration:
- return h
-
-
def _get_queue_arguments(conf):
"""Construct the arguments for declaring a queue.
@@ -178,13 +163,8 @@ class ConsumerBase(object):
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
- if '\0' in message.payload:
- msg = _unflatten_envelope(message.payload.split('\0'))
- else:
- msg = message.payload
-
- raw_msg = rpc_common.deserialize_msg(msg)
- callback(raw_msg)
+ msg = rpc_common.deserialize_msg(message.payload)
+ callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
@@ -327,10 +307,6 @@ class Publisher(object):
def send(self, msg, timeout=None):
"""Send a message"""
- if isinstance(msg, dict) and rpc_common._VERSION_KEY in msg:
- # Fast-serialize envelopes to avoid Kombu's JSON serialization.
- msg = '\0'.join(reduce(lambda x, y: x + y, msg.items()))
-
if timeout:
#
# AMQP TTL is in milliseconds when set in the header.