diff options
Diffstat (limited to 'openstack/common')
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 93 |
1 files changed, 67 insertions, 26 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 822b1b5..f011955 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -216,12 +216,18 @@ class ZmqClient(object): socket_type = zmq.PUSH self.outq = ZmqSocket(addr, socket_type, bind=bind) - def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): + def cast(self, msg_id, topic, data, envelope=False): msg_id = msg_id or 0 - if serialize: - data = rpc_common.serialize_msg(data, force_envelope) - self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data)))) + if not (envelope or rpc_common._SEND_RPC_ENVELOPE): + self.outq.send(map(bytes, + (msg_id, topic, 'cast', _serialize(data)))) + return + + rpc_envelope = rpc_common.serialize_msg(data[1], envelope) + zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items()) + self.outq.send(map(bytes, + (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg)) def close(self): self.outq.close() @@ -320,7 +326,7 @@ class ConsumerBase(object): else: return [result] - def process(self, style, target, proxy, ctx, data): + def process(self, proxy, ctx, data): data.setdefault('version', None) data.setdefault('args', {}) @@ -432,7 +438,7 @@ class ZmqProxy(ZmqBaseReactor): #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() - msg_id, topic, style, in_msg = data + topic = data[1] topic = topic.split('.', 1)[0] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) @@ -520,6 +526,21 @@ class ZmqProxy(ZmqBaseReactor): super(ZmqProxy, self).consume_in_thread() +def unflatten_envelope(packenv): + """Unflattens the RPC envelope. + Takes a list and returns a dictionary. + i.e. [1,2,3,4] => {1: 2, 3: 4} + """ + i = iter(packenv) + h = {} + try: + while True: + k = i.next() + h[k] = i.next() + except StopIteration: + return h + + class ZmqReactor(ZmqBaseReactor): """ A consumer class implementing a @@ -540,15 +561,27 @@ class ZmqReactor(ZmqBaseReactor): self.mapping[sock].send(data) return - msg_id, topic, style, in_msg = data + proxy = self.proxies[sock] - ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg)) - ctx = RpcContext.unmarshal(ctx) + if data[2] == 'cast': # Legacy protocol + packenv = data[3] - proxy = self.proxies[sock] + ctx, msg = _deserialize(packenv) + request = rpc_common.deserialize_msg(msg) + ctx = RpcContext.unmarshal(ctx) + elif data[2] == 'impl_zmq_v2': + packenv = data[4:] - self.pool.spawn_n(self.process, style, topic, - proxy, ctx, request) + msg = unflatten_envelope(packenv) + request = rpc_common.deserialize_msg(msg) + + # Unmarshal only after verifying the message. + ctx = RpcContext.unmarshal(data[3]) + else: + LOG.error(_("ZMQ Envelope version unsupported or unknown.")) + return + + self.pool.spawn_n(self.process, proxy, ctx, request) class Connection(rpc_common.Connection): @@ -593,8 +626,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, topic, msg, timeout=None, serialize=True, - force_envelope=False, _msg_id=None): +def _cast(addr, context, topic, msg, timeout=None, envelope=False, + _msg_id=None): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -603,7 +636,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True, conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(_msg_id, topic, payload, serialize, force_envelope) + conn.cast(_msg_id, topic, payload, envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -612,7 +645,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True, def _call(addr, context, topic, msg, timeout=None, - serialize=True, force_envelope=False): + envelope=False): # timeout_response is how long we wait for a response timeout = timeout or CONF.rpc_response_timeout @@ -647,15 +680,24 @@ def _call(addr, context, topic, msg, timeout=None, ) LOG.debug(_("Sending cast")) - _cast(addr, context, topic, payload, - serialize=serialize, force_envelope=force_envelope) + _cast(addr, context, topic, payload, envelope) LOG.debug(_("Cast sent; Waiting reply")) # Blocks until receives reply msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = _deserialize(msg[-1])[-1]['args']['response'] + + if msg[2] == 'cast': # Legacy version + raw_msg = _deserialize(msg[-1])[-1] + elif msg[2] == 'impl_zmq_v2': + rpc_envelope = unflatten_envelope(msg[4:]) + raw_msg = rpc_common.deserialize_msg(rpc_envelope) + else: + raise rpc_common.UnsupportedRpcEnvelopeVersion( + _("Unsupported or unknown ZMQ envelope returned.")) + + responses = raw_msg['args']['response'] # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") @@ -676,8 +718,8 @@ def _call(addr, context, topic, msg, timeout=None, return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None, serialize=True, - force_envelope=False, _msg_id=None): +def _multi_send(method, context, topic, msg, timeout=None, + envelope=False, _msg_id=None): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -703,11 +745,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, msg, timeout, serialize, - force_envelope, _msg_id) + _topic, msg, timeout, envelope, + _msg_id) return return method(_addr, context, _topic, msg, timeout, - serialize, force_envelope) + envelope) def create_connection(conf, new=True): @@ -746,8 +788,7 @@ def notify(conf, context, topic, msg, **kwargs): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. topic.replace('.', '-') - kwargs['serialize'] = kwargs.pop('envelope') - kwargs['force_envelope'] = True + kwargs['envelope'] = kwargs.get('envelope', True) cast(conf, context, topic, msg, **kwargs) |
