summaryrefslogtreecommitdiffstats
path: root/openstack/common
diff options
context:
space:
mode:
Diffstat (limited to 'openstack/common')
-rw-r--r--openstack/common/rpc/impl_zmq.py93
1 files changed, 67 insertions, 26 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 822b1b5..f011955 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -216,12 +216,18 @@ class ZmqClient(object):
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+ def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0
- if serialize:
- data = rpc_common.serialize_msg(data, force_envelope)
- self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
+ if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
+ self.outq.send(map(bytes,
+ (msg_id, topic, 'cast', _serialize(data))))
+ return
+
+ rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
+ zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
+ self.outq.send(map(bytes,
+ (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
def close(self):
self.outq.close()
@@ -320,7 +326,7 @@ class ConsumerBase(object):
else:
return [result]
- def process(self, style, target, proxy, ctx, data):
+ def process(self, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})
@@ -432,7 +438,7 @@ class ZmqProxy(ZmqBaseReactor):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
- msg_id, topic, style, in_msg = data
+ topic = data[1]
topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
@@ -520,6 +526,21 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).consume_in_thread()
+def unflatten_envelope(packenv):
+ """Unflattens the RPC envelope.
+ Takes a list and returns a dictionary.
+ i.e. [1,2,3,4] => {1: 2, 3: 4}
+ """
+ i = iter(packenv)
+ h = {}
+ try:
+ while True:
+ k = i.next()
+ h[k] = i.next()
+ except StopIteration:
+ return h
+
+
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
@@ -540,15 +561,27 @@ class ZmqReactor(ZmqBaseReactor):
self.mapping[sock].send(data)
return
- msg_id, topic, style, in_msg = data
+ proxy = self.proxies[sock]
- ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
- ctx = RpcContext.unmarshal(ctx)
+ if data[2] == 'cast': # Legacy protocol
+ packenv = data[3]
- proxy = self.proxies[sock]
+ ctx, msg = _deserialize(packenv)
+ request = rpc_common.deserialize_msg(msg)
+ ctx = RpcContext.unmarshal(ctx)
+ elif data[2] == 'impl_zmq_v2':
+ packenv = data[4:]
- self.pool.spawn_n(self.process, style, topic,
- proxy, ctx, request)
+ msg = unflatten_envelope(packenv)
+ request = rpc_common.deserialize_msg(msg)
+
+ # Unmarshal only after verifying the message.
+ ctx = RpcContext.unmarshal(data[3])
+ else:
+ LOG.error(_("ZMQ Envelope version unsupported or unknown."))
+ return
+
+ self.pool.spawn_n(self.process, proxy, ctx, request)
class Connection(rpc_common.Connection):
@@ -593,8 +626,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
-def _cast(addr, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False, _msg_id=None):
+def _cast(addr, context, topic, msg, timeout=None, envelope=False,
+ _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -603,7 +636,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(_msg_id, topic, payload, serialize, force_envelope)
+ conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -612,7 +645,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
def _call(addr, context, topic, msg, timeout=None,
- serialize=True, force_envelope=False):
+ envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@@ -647,15 +680,24 @@ def _call(addr, context, topic, msg, timeout=None,
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, topic, payload,
- serialize=serialize, force_envelope=force_envelope)
+ _cast(addr, context, topic, payload, envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])[-1]['args']['response']
+
+ if msg[2] == 'cast': # Legacy version
+ raw_msg = _deserialize(msg[-1])[-1]
+ elif msg[2] == 'impl_zmq_v2':
+ rpc_envelope = unflatten_envelope(msg[4:])
+ raw_msg = rpc_common.deserialize_msg(rpc_envelope)
+ else:
+ raise rpc_common.UnsupportedRpcEnvelopeVersion(
+ _("Unsupported or unknown ZMQ envelope returned."))
+
+ responses = raw_msg['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
@@ -676,8 +718,8 @@ def _call(addr, context, topic, msg, timeout=None,
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False, _msg_id=None):
+def _multi_send(method, context, topic, msg, timeout=None,
+ envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -703,11 +745,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, msg, timeout, serialize,
- force_envelope, _msg_id)
+ _topic, msg, timeout, envelope,
+ _msg_id)
return
return method(_addr, context, _topic, msg, timeout,
- serialize, force_envelope)
+ envelope)
def create_connection(conf, new=True):
@@ -746,8 +788,7 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
- kwargs['serialize'] = kwargs.pop('envelope')
- kwargs['force_envelope'] = True
+ kwargs['envelope'] = kwargs.get('envelope', True)
cast(conf, context, topic, msg, **kwargs)