summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEric Windisch <eric@cloudscaling.com>2013-02-12 11:46:24 -0500
committerEric Windisch <eric@cloudscaling.com>2013-02-19 14:47:38 -0500
commitf1e5d569b6c9ceb6d7a4b338db9186e4f9c2fb7b (patch)
tree784e131dc3bdd1cbce62c5b0d3516a6a70c93e23
parent83162508572a494a169a319651e1d75d582618d7 (diff)
Support RPC envelopes in impl_zmq
This patch began as a set of tests verifying the functionality of sending and receiving RPC envelopes when using impl_zmq. It was discovered that when enabled, RPC envelopes were not actually working, The ZeroMQ driver includes its own envelopes. This patch introduce versioning to that envelope, eliminating the previously reserved 'style' field. A new iteration of the zeromq-envelope is introduced, 'impl_zmq_v2'. It specifies that the zeromq-envelope should be followed by an unpacked array representing key value pairs of the standard RPC Envelope. Because the key-values of the RPC Envelope can be successfully transformed with bytes(), this prevents the need to double-serialize the content traversing the message bus. Also removes some unused imports. Closes bug 1123709 Closes bug 1055446 Change-Id: Ib04e3d092c9596146f1048d3502ac248496d313b
-rw-r--r--openstack/common/rpc/impl_zmq.py93
-rw-r--r--tests/unit/rpc/test_zmq.py12
2 files changed, 77 insertions, 28 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 822b1b5..f011955 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -216,12 +216,18 @@ class ZmqClient(object):
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+ def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0
- if serialize:
- data = rpc_common.serialize_msg(data, force_envelope)
- self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
+ if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
+ self.outq.send(map(bytes,
+ (msg_id, topic, 'cast', _serialize(data))))
+ return
+
+ rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
+ zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
+ self.outq.send(map(bytes,
+ (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
def close(self):
self.outq.close()
@@ -320,7 +326,7 @@ class ConsumerBase(object):
else:
return [result]
- def process(self, style, target, proxy, ctx, data):
+ def process(self, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})
@@ -432,7 +438,7 @@ class ZmqProxy(ZmqBaseReactor):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
- msg_id, topic, style, in_msg = data
+ topic = data[1]
topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
@@ -520,6 +526,21 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).consume_in_thread()
+def unflatten_envelope(packenv):
+ """Unflattens the RPC envelope.
+ Takes a list and returns a dictionary.
+ i.e. [1,2,3,4] => {1: 2, 3: 4}
+ """
+ i = iter(packenv)
+ h = {}
+ try:
+ while True:
+ k = i.next()
+ h[k] = i.next()
+ except StopIteration:
+ return h
+
+
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
@@ -540,15 +561,27 @@ class ZmqReactor(ZmqBaseReactor):
self.mapping[sock].send(data)
return
- msg_id, topic, style, in_msg = data
+ proxy = self.proxies[sock]
- ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
- ctx = RpcContext.unmarshal(ctx)
+ if data[2] == 'cast': # Legacy protocol
+ packenv = data[3]
- proxy = self.proxies[sock]
+ ctx, msg = _deserialize(packenv)
+ request = rpc_common.deserialize_msg(msg)
+ ctx = RpcContext.unmarshal(ctx)
+ elif data[2] == 'impl_zmq_v2':
+ packenv = data[4:]
- self.pool.spawn_n(self.process, style, topic,
- proxy, ctx, request)
+ msg = unflatten_envelope(packenv)
+ request = rpc_common.deserialize_msg(msg)
+
+ # Unmarshal only after verifying the message.
+ ctx = RpcContext.unmarshal(data[3])
+ else:
+ LOG.error(_("ZMQ Envelope version unsupported or unknown."))
+ return
+
+ self.pool.spawn_n(self.process, proxy, ctx, request)
class Connection(rpc_common.Connection):
@@ -593,8 +626,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
-def _cast(addr, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False, _msg_id=None):
+def _cast(addr, context, topic, msg, timeout=None, envelope=False,
+ _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -603,7 +636,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(_msg_id, topic, payload, serialize, force_envelope)
+ conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -612,7 +645,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
def _call(addr, context, topic, msg, timeout=None,
- serialize=True, force_envelope=False):
+ envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@@ -647,15 +680,24 @@ def _call(addr, context, topic, msg, timeout=None,
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, topic, payload,
- serialize=serialize, force_envelope=force_envelope)
+ _cast(addr, context, topic, payload, envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])[-1]['args']['response']
+
+ if msg[2] == 'cast': # Legacy version
+ raw_msg = _deserialize(msg[-1])[-1]
+ elif msg[2] == 'impl_zmq_v2':
+ rpc_envelope = unflatten_envelope(msg[4:])
+ raw_msg = rpc_common.deserialize_msg(rpc_envelope)
+ else:
+ raise rpc_common.UnsupportedRpcEnvelopeVersion(
+ _("Unsupported or unknown ZMQ envelope returned."))
+
+ responses = raw_msg['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
@@ -676,8 +718,8 @@ def _call(addr, context, topic, msg, timeout=None,
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False, _msg_id=None):
+def _multi_send(method, context, topic, msg, timeout=None,
+ envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -703,11 +745,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, msg, timeout, serialize,
- force_envelope, _msg_id)
+ _topic, msg, timeout, envelope,
+ _msg_id)
return
return method(_addr, context, _topic, msg, timeout,
- serialize, force_envelope)
+ envelope)
def create_connection(conf, new=True):
@@ -746,8 +788,7 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
- kwargs['serialize'] = kwargs.pop('envelope')
- kwargs['force_envelope'] = True
+ kwargs['envelope'] = kwargs.get('envelope', True)
cast(conf, context, topic, msg, **kwargs)
diff --git a/tests/unit/rpc/test_zmq.py b/tests/unit/rpc/test_zmq.py
index 8c9a966..7a1e501 100644
--- a/tests/unit/rpc/test_zmq.py
+++ b/tests/unit/rpc/test_zmq.py
@@ -30,8 +30,7 @@ from oslo.config import cfg
from openstack.common import exception
from openstack.common.gettextutils import _
-from openstack.common import processutils
-from openstack.common import rpc
+from openstack.common.rpc import common as rpc_common
from tests.unit.rpc import common
try:
@@ -110,6 +109,15 @@ class RpcZmqBaseTopicTestCase(_RpcZmqBaseTestCase):
pass
+class RpcZmqEnvelopeEnabledTestCase(_RpcZmqBaseTestCase):
+ """
+ This sends messages with envelopes enabled.
+ """
+ def setUp(self):
+ super(RpcZmqEnvelopeEnabledTestCase, self).setUp()
+ self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', True)
+
+
class RpcZmqDirectTopicTestCase(_RpcZmqBaseTestCase):
"""
Test communication directly to a host,