summaryrefslogtreecommitdiffstats
path: root/openstack/common
diff options
context:
space:
mode:
authorEric Windisch <eric@cloudscaling.com>2013-01-31 14:43:29 -0500
committerEric Windisch <eric@cloudscaling.com>2013-01-31 15:14:21 -0500
commitb51a7241db53d87d780849563b99b5eee41761ba (patch)
treeedbefe8a5d35bb5db2c8e85715d3c3aac135f5f3 /openstack/common
parent4552f1064fa50228ecdc4b5cf43c24f5401040b7 (diff)
downloadoslo-b51a7241db53d87d780849563b99b5eee41761ba.tar.gz
oslo-b51a7241db53d87d780849563b99b5eee41761ba.tar.xz
oslo-b51a7241db53d87d780849563b99b5eee41761ba.zip
Only unpack replies on caller.
Opening the messages is unnecessary in ZmqProxy, given that the msg_id is already in the (zmq-specific) message envelope. Additionally, by not opening the messages, we save a bit of complexity. This avoids having to deserialize and reserialize messages in ZmqProxy, a necessary improvement for trusted-messaging and should also have a markable performance benefit. Also cleans up msg_id handling a bit. Change-Id: I31f365b9e1040503206b1d8320a0266a8121c821
Diffstat (limited to 'openstack/common')
-rw-r--r--openstack/common/rpc/impl_zmq.py40
1 files changed, 18 insertions, 22 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index e788078..f2234d5 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -218,6 +218,8 @@ class ZmqClient(object):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+ msg_id = msg_id or 0
+
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
@@ -294,13 +296,13 @@ class InternalContext(object):
ctx.replies)
LOG.debug(_("Sending reply"))
- cast(CONF, ctx, topic, {
+ _multi_send(_cast, ctx, topic, {
'method': '-process_reply',
'args': {
- 'msg_id': msg_id,
+ 'msg_id': msg_id, # Include for Folsom compat.
'response': response
}
- })
+ }, _msg_id=msg_id)
class ConsumerBase(object):
@@ -435,16 +437,8 @@ class ZmqProxy(ZmqBaseReactor):
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
- # Handle zmq_replies magic
- if topic.startswith('fanout~'):
- sock_type = zmq.PUB
- elif topic.startswith('zmq_replies'):
+ if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = rpc_common.deserialize_msg(_deserialize(in_msg))
- msg_id = inside[-1]['args']['msg_id']
- response = inside[-1]['args']['response']
- LOG.debug(_("->response->%s"), response)
- data = map(bytes, (msg_id, _serialize(response)))
else:
sock_type = zmq.PUSH
@@ -599,8 +593,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
- force_envelope=False):
+def _cast(addr, context, topic, msg, timeout=None, serialize=True,
+ force_envelope=False, _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -609,7 +603,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload, serialize, force_envelope)
+ conn.cast(_msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -617,7 +611,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn.close()
-def _call(addr, context, msg_id, topic, msg, timeout=None,
+def _call(addr, context, topic, msg, timeout=None,
serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@@ -653,7 +647,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, msg_id, topic, payload,
+ _cast(addr, context, topic, payload,
serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply"))
@@ -661,10 +655,12 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])
+ responses = _deserialize(msg[-1])[-1]['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
+ except (IndexError, KeyError):
+ raise RPCException(_("RPC Message Invalid."))
finally:
if 'msg_waiter' in vars():
msg_waiter.close()
@@ -681,7 +677,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False):
+ force_envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -707,10 +703,10 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout, serialize,
- force_envelope)
+ _topic, msg, timeout, serialize,
+ force_envelope, _msg_id)
return
- return method(_addr, context, _topic, _topic, msg, timeout,
+ return method(_addr, context, _topic, msg, timeout,
serialize, force_envelope)