diff options
Diffstat (limited to 'openstack/common')
| -rw-r--r-- | openstack/common/rpc/impl_kombu.py | 3 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_qpid.py | 3 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 45 |
3 files changed, 24 insertions, 27 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index f1d46c4..4f3cc7a 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -66,7 +66,8 @@ kombu_opts = [ help='the RabbitMQ userid'), cfg.StrOpt('rabbit_password', default='guest', - help='the RabbitMQ password'), + help='the RabbitMQ password', + secret=True), cfg.StrOpt('rabbit_virtual_host', default='/', help='the RabbitMQ virtual host'), diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 9d3a3a7..34262a3 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -51,7 +51,8 @@ qpid_opts = [ help='Username for qpid connection'), cfg.StrOpt('qpid_password', default='', - help='Password for qpid connection'), + help='Password for qpid connection', + secret=True), cfg.StrOpt('qpid_sasl_mechanisms', default='', help='Space separated list of SASL mechanisms to use for auth'), diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 0e998e9..cd974e5 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -89,7 +89,7 @@ def _serialize(data): Error if a developer passes us bad data. """ try: - return str(jsonutils.dumps(data, ensure_ascii=True)) + return jsonutils.dumps(data, ensure_ascii=True) except TypeError: LOG.error(_("JSON serialization failed.")) raise @@ -217,10 +217,11 @@ class ZmqClient(object): self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): + msg_id = msg_id or 0 + if serialize: data = rpc_common.serialize_msg(data, force_envelope) - self.outq.send([str(msg_id), str(topic), str('cast'), - _serialize(data)]) + self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data)))) def close(self): self.outq.close() @@ -294,13 +295,13 @@ class InternalContext(object): ctx.replies) LOG.debug(_("Sending reply")) - cast(CONF, ctx, topic, { + _multi_send(_cast, ctx, topic, { 'method': '-process_reply', 'args': { - 'msg_id': msg_id, + 'msg_id': msg_id, # Include for Folsom compat. 'response': response } - }) + }, _msg_id=msg_id) class ConsumerBase(object): @@ -436,16 +437,8 @@ class ZmqProxy(ZmqBaseReactor): LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) - # Handle zmq_replies magic - if topic.startswith('fanout~'): - sock_type = zmq.PUB - elif topic.startswith('zmq_replies'): + if topic.startswith('fanout~') or topic.startswith('zmq_replies'): sock_type = zmq.PUB - inside = rpc_common.deserialize_msg(_deserialize(in_msg)) - msg_id = inside[-1]['args']['msg_id'] - response = inside[-1]['args']['response'] - LOG.debug(_("->response->%s"), response) - data = [str(msg_id), _serialize(response)] else: sock_type = zmq.PUSH @@ -600,8 +593,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, - force_envelope=False): +def _cast(addr, context, topic, msg, timeout=None, serialize=True, + force_envelope=False, _msg_id=None): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -610,7 +603,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(msg_id, topic, payload, serialize, force_envelope) + conn.cast(_msg_id, topic, payload, serialize, force_envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -618,7 +611,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, conn.close() -def _call(addr, context, msg_id, topic, msg, timeout=None, +def _call(addr, context, topic, msg, timeout=None, serialize=True, force_envelope=False): # timeout_response is how long we wait for a response timeout = timeout or CONF.rpc_response_timeout @@ -654,7 +647,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None, ) LOG.debug(_("Sending cast")) - _cast(addr, context, msg_id, topic, payload, + _cast(addr, context, topic, payload, serialize=serialize, force_envelope=force_envelope) LOG.debug(_("Cast sent; Waiting reply")) @@ -662,10 +655,12 @@ def _call(addr, context, msg_id, topic, msg, timeout=None, msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = _deserialize(msg[-1]) + responses = _deserialize(msg[-1])[-1]['args']['response'] # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") + except (IndexError, KeyError): + raise RPCException(_("RPC Message Invalid.")) finally: if 'msg_waiter' in vars(): msg_waiter.close() @@ -682,7 +677,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None, def _multi_send(method, context, topic, msg, timeout=None, serialize=True, - force_envelope=False): + force_envelope=False, _msg_id=None): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -708,10 +703,10 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, _topic, msg, timeout, serialize, - force_envelope) + _topic, msg, timeout, serialize, + force_envelope, _msg_id) return - return method(_addr, context, _topic, _topic, msg, timeout, + return method(_addr, context, _topic, msg, timeout, serialize, force_envelope) |
