summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-02-05 16:23:23 +0000
committerGerrit Code Review <review@openstack.org>2013-02-05 16:23:23 +0000
commitad4192ad1ecfab5a63ce126bfb5defda7dcb1796 (patch)
treeee325f2c1d353add3bc5bb36e4809639a624b1cb /openstack/common/rpc
parent04ab97e0e987712678b9ce96f54c41b161490dd5 (diff)
parentb51a7241db53d87d780849563b99b5eee41761ba (diff)
downloadoslo-ad4192ad1ecfab5a63ce126bfb5defda7dcb1796.tar.gz
oslo-ad4192ad1ecfab5a63ce126bfb5defda7dcb1796.tar.xz
oslo-ad4192ad1ecfab5a63ce126bfb5defda7dcb1796.zip
Merge "Only unpack replies on caller."
Diffstat (limited to 'openstack/common/rpc')
-rw-r--r--openstack/common/rpc/impl_zmq.py40
1 files changed, 18 insertions, 22 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index db88c38..cd974e5 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -217,6 +217,8 @@ class ZmqClient(object):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+ msg_id = msg_id or 0
+
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
@@ -293,13 +295,13 @@ class InternalContext(object):
ctx.replies)
LOG.debug(_("Sending reply"))
- cast(CONF, ctx, topic, {
+ _multi_send(_cast, ctx, topic, {
'method': '-process_reply',
'args': {
- 'msg_id': msg_id,
+ 'msg_id': msg_id, # Include for Folsom compat.
'response': response
}
- })
+ }, _msg_id=msg_id)
class ConsumerBase(object):
@@ -435,16 +437,8 @@ class ZmqProxy(ZmqBaseReactor):
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
- # Handle zmq_replies magic
- if topic.startswith('fanout~'):
- sock_type = zmq.PUB
- elif topic.startswith('zmq_replies'):
+ if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = rpc_common.deserialize_msg(_deserialize(in_msg))
- msg_id = inside[-1]['args']['msg_id']
- response = inside[-1]['args']['response']
- LOG.debug(_("->response->%s"), response)
- data = map(bytes, (msg_id, _serialize(response)))
else:
sock_type = zmq.PUSH
@@ -599,8 +593,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
- force_envelope=False):
+def _cast(addr, context, topic, msg, timeout=None, serialize=True,
+ force_envelope=False, _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -609,7 +603,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload, serialize, force_envelope)
+ conn.cast(_msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -617,7 +611,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn.close()
-def _call(addr, context, msg_id, topic, msg, timeout=None,
+def _call(addr, context, topic, msg, timeout=None,
serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@@ -653,7 +647,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, msg_id, topic, payload,
+ _cast(addr, context, topic, payload,
serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply"))
@@ -661,10 +655,12 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])
+ responses = _deserialize(msg[-1])[-1]['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
+ except (IndexError, KeyError):
+ raise RPCException(_("RPC Message Invalid."))
finally:
if 'msg_waiter' in vars():
msg_waiter.close()
@@ -681,7 +677,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False):
+ force_envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -707,10 +703,10 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout, serialize,
- force_envelope)
+ _topic, msg, timeout, serialize,
+ force_envelope, _msg_id)
return
- return method(_addr, context, _topic, _topic, msg, timeout,
+ return method(_addr, context, _topic, msg, timeout,
serialize, force_envelope)