diff options
Diffstat (limited to 'openstack/common')
| -rw-r--r-- | openstack/common/rpc/impl_kombu.py | 28 |
1 files changed, 2 insertions, 26 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 91d81b0..b63edea 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -98,21 +98,6 @@ cfg.CONF.register_opts(kombu_opts) LOG = rpc_common.LOG -def _unflatten_envelope(packenv): - """Unflattens the RPC envelope. - Takes a list and returns a dictionary. - i.e. [1,2,3,4] => {1: 2, 3: 4} - """ - i = iter(packenv) - h = {} - try: - while True: - k = i.next() - h[k] = i.next() - except StopIteration: - return h - - def _get_queue_arguments(conf): """Construct the arguments for declaring a queue. @@ -178,13 +163,8 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) try: - if '\0' in message.payload: - msg = _unflatten_envelope(message.payload.split('\0')) - else: - msg = message.payload - - raw_msg = rpc_common.deserialize_msg(msg) - callback(raw_msg) + msg = rpc_common.deserialize_msg(message.payload) + callback(msg) except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: @@ -327,10 +307,6 @@ class Publisher(object): def send(self, msg, timeout=None): """Send a message""" - if isinstance(msg, dict) and rpc_common._VERSION_KEY in msg: - # Fast-serialize envelopes to avoid Kombu's JSON serialization. - msg = '\0'.join(reduce(lambda x, y: x + y, msg.items())) - if timeout: # # AMQP TTL is in milliseconds when set in the header. |
