From 3b848ae38bd087cef263b9072d5da559d4508c73 Mon Sep 17 00:00:00 2001 From: Russell Bryant Date: Wed, 20 Feb 2013 17:02:20 -0500 Subject: Revert "Fast serialization of RPC envelopes for Kombu" This reverts commit 22c497097b0d0bd461be40a7a03290aa0b4179f2. I'm not convinced that this isn't just a micro-optimization when compared to the rest of the work that a given OpenStack service performs as the result of these messages. The implementation is also problematic. It depends on using a '\0' byte as a separator and assumes that '\0' will not exist anywhere else in the message. This seems to be asking for trouble. If future data could ever have a '\0' in it, this will be broken. Further, if a user could get a '\0' in a message directly with user-supplied input, this could result in a security vulnerability. Lastly, this has a significant impact on consumers of notifications that are outside of OpenStack code, which have been the primary use case of notifications (Ceilometer is changing that to a degree). I don't think consumers of notifications should have to implement this deserialization method. Change-Id: Ib3163ca98f568bf9f789d4b64bcc6d72e0fcb459 --- openstack/common/rpc/impl_kombu.py | 28 ++-------------------------- 1 file changed, 2 insertions(+), 26 deletions(-) (limited to 'openstack') diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 91d81b0..b63edea 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -98,21 +98,6 @@ cfg.CONF.register_opts(kombu_opts) LOG = rpc_common.LOG -def _unflatten_envelope(packenv): - """Unflattens the RPC envelope. - Takes a list and returns a dictionary. - i.e. [1,2,3,4] => {1: 2, 3: 4} - """ - i = iter(packenv) - h = {} - try: - while True: - k = i.next() - h[k] = i.next() - except StopIteration: - return h - - def _get_queue_arguments(conf): """Construct the arguments for declaring a queue. @@ -178,13 +163,8 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) try: - if '\0' in message.payload: - msg = _unflatten_envelope(message.payload.split('\0')) - else: - msg = message.payload - - raw_msg = rpc_common.deserialize_msg(msg) - callback(raw_msg) + msg = rpc_common.deserialize_msg(message.payload) + callback(msg) except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: @@ -327,10 +307,6 @@ class Publisher(object): def send(self, msg, timeout=None): """Send a message""" - if isinstance(msg, dict) and rpc_common._VERSION_KEY in msg: - # Fast-serialize envelopes to avoid Kombu's JSON serialization. - msg = '\0'.join(reduce(lambda x, y: x + y, msg.items())) - if timeout: # # AMQP TTL is in milliseconds when set in the header. -- cgit