From b51a7241db53d87d780849563b99b5eee41761ba Mon Sep 17 00:00:00 2001 From: Eric Windisch Date: Thu, 31 Jan 2013 14:43:29 -0500 Subject: Only unpack replies on caller. Opening the messages is unnecessary in ZmqProxy, given that the msg_id is already in the (zmq-specific) message envelope. Additionally, by not opening the messages, we save a bit of complexity. This avoids having to deserialize and reserialize messages in ZmqProxy, a necessary improvement for trusted-messaging and should also have a markable performance benefit. Also cleans up msg_id handling a bit. Change-Id: I31f365b9e1040503206b1d8320a0266a8121c821 --- openstack/common/rpc/impl_zmq.py | 40 ++++++++++++++++++---------------------- 1 file changed, 18 insertions(+), 22 deletions(-) (limited to 'openstack/common/rpc') diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index e788078..f2234d5 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -218,6 +218,8 @@ class ZmqClient(object): self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): + msg_id = msg_id or 0 + if serialize: data = rpc_common.serialize_msg(data, force_envelope) self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data)))) @@ -294,13 +296,13 @@ class InternalContext(object): ctx.replies) LOG.debug(_("Sending reply")) - cast(CONF, ctx, topic, { + _multi_send(_cast, ctx, topic, { 'method': '-process_reply', 'args': { - 'msg_id': msg_id, + 'msg_id': msg_id, # Include for Folsom compat. 'response': response } - }) + }, _msg_id=msg_id) class ConsumerBase(object): @@ -435,16 +437,8 @@ class ZmqProxy(ZmqBaseReactor): LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) - # Handle zmq_replies magic - if topic.startswith('fanout~'): - sock_type = zmq.PUB - elif topic.startswith('zmq_replies'): + if topic.startswith('fanout~') or topic.startswith('zmq_replies'): sock_type = zmq.PUB - inside = rpc_common.deserialize_msg(_deserialize(in_msg)) - msg_id = inside[-1]['args']['msg_id'] - response = inside[-1]['args']['response'] - LOG.debug(_("->response->%s"), response) - data = map(bytes, (msg_id, _serialize(response))) else: sock_type = zmq.PUSH @@ -599,8 +593,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, - force_envelope=False): +def _cast(addr, context, topic, msg, timeout=None, serialize=True, + force_envelope=False, _msg_id=None): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -609,7 +603,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(msg_id, topic, payload, serialize, force_envelope) + conn.cast(_msg_id, topic, payload, serialize, force_envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -617,7 +611,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, conn.close() -def _call(addr, context, msg_id, topic, msg, timeout=None, +def _call(addr, context, topic, msg, timeout=None, serialize=True, force_envelope=False): # timeout_response is how long we wait for a response timeout = timeout or CONF.rpc_response_timeout @@ -653,7 +647,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None, ) LOG.debug(_("Sending cast")) - _cast(addr, context, msg_id, topic, payload, + _cast(addr, context, topic, payload, serialize=serialize, force_envelope=force_envelope) LOG.debug(_("Cast sent; Waiting reply")) @@ -661,10 +655,12 @@ def _call(addr, context, msg_id, topic, msg, timeout=None, msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = _deserialize(msg[-1]) + responses = _deserialize(msg[-1])[-1]['args']['response'] # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") + except (IndexError, KeyError): + raise RPCException(_("RPC Message Invalid.")) finally: if 'msg_waiter' in vars(): msg_waiter.close() @@ -681,7 +677,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None, def _multi_send(method, context, topic, msg, timeout=None, serialize=True, - force_envelope=False): + force_envelope=False, _msg_id=None): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -707,10 +703,10 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, _topic, msg, timeout, serialize, - force_envelope) + _topic, msg, timeout, serialize, + force_envelope, _msg_id) return - return method(_addr, context, _topic, _topic, msg, timeout, + return method(_addr, context, _topic, msg, timeout, serialize, force_envelope) -- cgit