diff options
author | Eric Windisch <eric@cloudscaling.com> | 2013-02-12 11:46:24 -0500 |
---|---|---|
committer | Eric Windisch <eric@cloudscaling.com> | 2013-02-19 14:47:38 -0500 |
commit | f1e5d569b6c9ceb6d7a4b338db9186e4f9c2fb7b (patch) | |
tree | 784e131dc3bdd1cbce62c5b0d3516a6a70c93e23 /openstack/common/rpc/impl_zmq.py | |
parent | 83162508572a494a169a319651e1d75d582618d7 (diff) | |
download | oslo-f1e5d569b6c9ceb6d7a4b338db9186e4f9c2fb7b.tar.gz oslo-f1e5d569b6c9ceb6d7a4b338db9186e4f9c2fb7b.tar.xz oslo-f1e5d569b6c9ceb6d7a4b338db9186e4f9c2fb7b.zip |
Support RPC envelopes in impl_zmq
This patch began as a set
of tests verifying the functionality of
sending and receiving RPC envelopes when
using impl_zmq. It was discovered that
when enabled, RPC envelopes were not
actually working,
The ZeroMQ driver includes its own envelopes.
This patch introduce versioning to that
envelope, eliminating the previously reserved
'style' field.
A new iteration of the zeromq-envelope is
introduced, 'impl_zmq_v2'. It specifies
that the zeromq-envelope should be followed
by an unpacked array representing key value
pairs of the standard RPC Envelope.
Because the key-values of the RPC Envelope
can be successfully transformed with bytes(),
this prevents the need to double-serialize
the content traversing the message bus.
Also removes some unused imports.
Closes bug 1123709
Closes bug 1055446
Change-Id: Ib04e3d092c9596146f1048d3502ac248496d313b
Diffstat (limited to 'openstack/common/rpc/impl_zmq.py')
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 93 |
1 files changed, 67 insertions, 26 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 822b1b5..f011955 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -216,12 +216,18 @@ class ZmqClient(object): socket_type = zmq.PUSH self.outq = ZmqSocket(addr, socket_type, bind=bind) - def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): + def cast(self, msg_id, topic, data, envelope=False): msg_id = msg_id or 0 - if serialize: - data = rpc_common.serialize_msg(data, force_envelope) - self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data)))) + if not (envelope or rpc_common._SEND_RPC_ENVELOPE): + self.outq.send(map(bytes, + (msg_id, topic, 'cast', _serialize(data)))) + return + + rpc_envelope = rpc_common.serialize_msg(data[1], envelope) + zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items()) + self.outq.send(map(bytes, + (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg)) def close(self): self.outq.close() @@ -320,7 +326,7 @@ class ConsumerBase(object): else: return [result] - def process(self, style, target, proxy, ctx, data): + def process(self, proxy, ctx, data): data.setdefault('version', None) data.setdefault('args', {}) @@ -432,7 +438,7 @@ class ZmqProxy(ZmqBaseReactor): #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() - msg_id, topic, style, in_msg = data + topic = data[1] topic = topic.split('.', 1)[0] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) @@ -520,6 +526,21 @@ class ZmqProxy(ZmqBaseReactor): super(ZmqProxy, self).consume_in_thread() +def unflatten_envelope(packenv): + """Unflattens the RPC envelope. + Takes a list and returns a dictionary. + i.e. [1,2,3,4] => {1: 2, 3: 4} + """ + i = iter(packenv) + h = {} + try: + while True: + k = i.next() + h[k] = i.next() + except StopIteration: + return h + + class ZmqReactor(ZmqBaseReactor): """ A consumer class implementing a @@ -540,15 +561,27 @@ class ZmqReactor(ZmqBaseReactor): self.mapping[sock].send(data) return - msg_id, topic, style, in_msg = data + proxy = self.proxies[sock] - ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg)) - ctx = RpcContext.unmarshal(ctx) + if data[2] == 'cast': # Legacy protocol + packenv = data[3] - proxy = self.proxies[sock] + ctx, msg = _deserialize(packenv) + request = rpc_common.deserialize_msg(msg) + ctx = RpcContext.unmarshal(ctx) + elif data[2] == 'impl_zmq_v2': + packenv = data[4:] - self.pool.spawn_n(self.process, style, topic, - proxy, ctx, request) + msg = unflatten_envelope(packenv) + request = rpc_common.deserialize_msg(msg) + + # Unmarshal only after verifying the message. + ctx = RpcContext.unmarshal(data[3]) + else: + LOG.error(_("ZMQ Envelope version unsupported or unknown.")) + return + + self.pool.spawn_n(self.process, proxy, ctx, request) class Connection(rpc_common.Connection): @@ -593,8 +626,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, topic, msg, timeout=None, serialize=True, - force_envelope=False, _msg_id=None): +def _cast(addr, context, topic, msg, timeout=None, envelope=False, + _msg_id=None): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -603,7 +636,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True, conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(_msg_id, topic, payload, serialize, force_envelope) + conn.cast(_msg_id, topic, payload, envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -612,7 +645,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True, def _call(addr, context, topic, msg, timeout=None, - serialize=True, force_envelope=False): + envelope=False): # timeout_response is how long we wait for a response timeout = timeout or CONF.rpc_response_timeout @@ -647,15 +680,24 @@ def _call(addr, context, topic, msg, timeout=None, ) LOG.debug(_("Sending cast")) - _cast(addr, context, topic, payload, - serialize=serialize, force_envelope=force_envelope) + _cast(addr, context, topic, payload, envelope) LOG.debug(_("Cast sent; Waiting reply")) # Blocks until receives reply msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = _deserialize(msg[-1])[-1]['args']['response'] + + if msg[2] == 'cast': # Legacy version + raw_msg = _deserialize(msg[-1])[-1] + elif msg[2] == 'impl_zmq_v2': + rpc_envelope = unflatten_envelope(msg[4:]) + raw_msg = rpc_common.deserialize_msg(rpc_envelope) + else: + raise rpc_common.UnsupportedRpcEnvelopeVersion( + _("Unsupported or unknown ZMQ envelope returned.")) + + responses = raw_msg['args']['response'] # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") @@ -676,8 +718,8 @@ def _call(addr, context, topic, msg, timeout=None, return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None, serialize=True, - force_envelope=False, _msg_id=None): +def _multi_send(method, context, topic, msg, timeout=None, + envelope=False, _msg_id=None): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -703,11 +745,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, msg, timeout, serialize, - force_envelope, _msg_id) + _topic, msg, timeout, envelope, + _msg_id) return return method(_addr, context, _topic, msg, timeout, - serialize, force_envelope) + envelope) def create_connection(conf, new=True): @@ -746,8 +788,7 @@ def notify(conf, context, topic, msg, **kwargs): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. topic.replace('.', '-') - kwargs['serialize'] = kwargs.pop('envelope') - kwargs['force_envelope'] = True + kwargs['envelope'] = kwargs.get('envelope', True) cast(conf, context, topic, msg, **kwargs) |