summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc
diff options
context:
space:
mode:
authorEric Windisch <eric@cloudscaling.com>2013-02-12 11:46:24 -0500
committerEric Windisch <eric@cloudscaling.com>2013-02-19 14:47:38 -0500
commitf1e5d569b6c9ceb6d7a4b338db9186e4f9c2fb7b (patch)
tree784e131dc3bdd1cbce62c5b0d3516a6a70c93e23 /openstack/common/rpc
parent83162508572a494a169a319651e1d75d582618d7 (diff)
downloadoslo-f1e5d569b6c9ceb6d7a4b338db9186e4f9c2fb7b.tar.gz
oslo-f1e5d569b6c9ceb6d7a4b338db9186e4f9c2fb7b.tar.xz
oslo-f1e5d569b6c9ceb6d7a4b338db9186e4f9c2fb7b.zip
Support RPC envelopes in impl_zmq
This patch began as a set of tests verifying the functionality of sending and receiving RPC envelopes when using impl_zmq. It was discovered that when enabled, RPC envelopes were not actually working, The ZeroMQ driver includes its own envelopes. This patch introduce versioning to that envelope, eliminating the previously reserved 'style' field. A new iteration of the zeromq-envelope is introduced, 'impl_zmq_v2'. It specifies that the zeromq-envelope should be followed by an unpacked array representing key value pairs of the standard RPC Envelope. Because the key-values of the RPC Envelope can be successfully transformed with bytes(), this prevents the need to double-serialize the content traversing the message bus. Also removes some unused imports. Closes bug 1123709 Closes bug 1055446 Change-Id: Ib04e3d092c9596146f1048d3502ac248496d313b
Diffstat (limited to 'openstack/common/rpc')
-rw-r--r--openstack/common/rpc/impl_zmq.py93
1 files changed, 67 insertions, 26 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 822b1b5..f011955 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -216,12 +216,18 @@ class ZmqClient(object):
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+ def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0
- if serialize:
- data = rpc_common.serialize_msg(data, force_envelope)
- self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
+ if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
+ self.outq.send(map(bytes,
+ (msg_id, topic, 'cast', _serialize(data))))
+ return
+
+ rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
+ zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
+ self.outq.send(map(bytes,
+ (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
def close(self):
self.outq.close()
@@ -320,7 +326,7 @@ class ConsumerBase(object):
else:
return [result]
- def process(self, style, target, proxy, ctx, data):
+ def process(self, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})
@@ -432,7 +438,7 @@ class ZmqProxy(ZmqBaseReactor):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
- msg_id, topic, style, in_msg = data
+ topic = data[1]
topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
@@ -520,6 +526,21 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).consume_in_thread()
+def unflatten_envelope(packenv):
+ """Unflattens the RPC envelope.
+ Takes a list and returns a dictionary.
+ i.e. [1,2,3,4] => {1: 2, 3: 4}
+ """
+ i = iter(packenv)
+ h = {}
+ try:
+ while True:
+ k = i.next()
+ h[k] = i.next()
+ except StopIteration:
+ return h
+
+
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
@@ -540,15 +561,27 @@ class ZmqReactor(ZmqBaseReactor):
self.mapping[sock].send(data)
return
- msg_id, topic, style, in_msg = data
+ proxy = self.proxies[sock]
- ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
- ctx = RpcContext.unmarshal(ctx)
+ if data[2] == 'cast': # Legacy protocol
+ packenv = data[3]
- proxy = self.proxies[sock]
+ ctx, msg = _deserialize(packenv)
+ request = rpc_common.deserialize_msg(msg)
+ ctx = RpcContext.unmarshal(ctx)
+ elif data[2] == 'impl_zmq_v2':
+ packenv = data[4:]
- self.pool.spawn_n(self.process, style, topic,
- proxy, ctx, request)
+ msg = unflatten_envelope(packenv)
+ request = rpc_common.deserialize_msg(msg)
+
+ # Unmarshal only after verifying the message.
+ ctx = RpcContext.unmarshal(data[3])
+ else:
+ LOG.error(_("ZMQ Envelope version unsupported or unknown."))
+ return
+
+ self.pool.spawn_n(self.process, proxy, ctx, request)
class Connection(rpc_common.Connection):
@@ -593,8 +626,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
-def _cast(addr, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False, _msg_id=None):
+def _cast(addr, context, topic, msg, timeout=None, envelope=False,
+ _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -603,7 +636,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(_msg_id, topic, payload, serialize, force_envelope)
+ conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -612,7 +645,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
def _call(addr, context, topic, msg, timeout=None,
- serialize=True, force_envelope=False):
+ envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@@ -647,15 +680,24 @@ def _call(addr, context, topic, msg, timeout=None,
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, topic, payload,
- serialize=serialize, force_envelope=force_envelope)
+ _cast(addr, context, topic, payload, envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])[-1]['args']['response']
+
+ if msg[2] == 'cast': # Legacy version
+ raw_msg = _deserialize(msg[-1])[-1]
+ elif msg[2] == 'impl_zmq_v2':
+ rpc_envelope = unflatten_envelope(msg[4:])
+ raw_msg = rpc_common.deserialize_msg(rpc_envelope)
+ else:
+ raise rpc_common.UnsupportedRpcEnvelopeVersion(
+ _("Unsupported or unknown ZMQ envelope returned."))
+
+ responses = raw_msg['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
@@ -676,8 +718,8 @@ def _call(addr, context, topic, msg, timeout=None,
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False, _msg_id=None):
+def _multi_send(method, context, topic, msg, timeout=None,
+ envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -703,11 +745,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, msg, timeout, serialize,
- force_envelope, _msg_id)
+ _topic, msg, timeout, envelope,
+ _msg_id)
return
return method(_addr, context, _topic, msg, timeout,
- serialize, force_envelope)
+ envelope)
def create_connection(conf, new=True):
@@ -746,8 +788,7 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
- kwargs['serialize'] = kwargs.pop('envelope')
- kwargs['force_envelope'] = True
+ kwargs['envelope'] = kwargs.get('envelope', True)
cast(conf, context, topic, msg, **kwargs)