diff options
author | Jenkins <jenkins@review.openstack.org> | 2013-03-25 08:32:19 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-03-25 08:32:19 +0000 |
commit | ace737b1a25d03136b22b4150aae64e0384a2546 (patch) | |
tree | be5c948b4c033370878c1eaa1594e6ee639dd6fc | |
parent | 552e632fa8c513c86da6713c012a96ea38995a7b (diff) | |
parent | 81204d4020712cf5b5b368b75d934d637b2c001b (diff) | |
download | nova-ace737b1a25d03136b22b4150aae64e0384a2546.tar.gz nova-ace737b1a25d03136b22b4150aae64e0384a2546.tar.xz nova-ace737b1a25d03136b22b4150aae64e0384a2546.zip |
Merge "Sync rpc from oslo-incubator."
-rw-r--r-- | nova/openstack/common/rpc/amqp.py | 2 | ||||
-rw-r--r-- | nova/openstack/common/rpc/common.py | 29 | ||||
-rw-r--r-- | nova/openstack/common/rpc/impl_zmq.py | 25 | ||||
-rw-r--r-- | nova/openstack/common/rpc/proxy.py | 22 |
4 files changed, 57 insertions, 21 deletions
diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py index cc0098cc3..e6b899cd9 100644 --- a/nova/openstack/common/rpc/amqp.py +++ b/nova/openstack/common/rpc/amqp.py @@ -661,7 +661,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope): pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: if envelope: - msg = rpc_common.serialize_msg(msg, force_envelope=True) + msg = rpc_common.serialize_msg(msg) conn.notify_send(topic, msg) diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py index 5fca30717..a3be2fe10 100644 --- a/nova/openstack/common/rpc/common.py +++ b/nova/openstack/common/rpc/common.py @@ -70,10 +70,6 @@ _VERSION_KEY = 'oslo.version' _MESSAGE_KEY = 'oslo.message' -# TODO(russellb) Turn this on after Grizzly. -_SEND_RPC_ENVELOPE = False - - class RPCException(Exception): message = _("An unknown RPC related exception occurred.") @@ -122,7 +118,25 @@ class Timeout(RPCException): This exception is raised if the rpc_response_timeout is reached while waiting for a response from the remote side. """ - message = _("Timeout while waiting on RPC response.") + message = _('Timeout while waiting on RPC response - ' + 'topic: "%(topic)s", RPC method: "%(method)s" ' + 'info: "%(info)s"') + + def __init__(self, info=None, topic=None, method=None): + """ + :param info: Extra info to convey to the user + :param topic: The topic that the rpc call was sent to + :param rpc_method_name: The name of the rpc method being + called + """ + self.info = info + self.topic = topic + self.method = method + super(Timeout, self).__init__( + None, + info=info or _('<unknown>'), + topic=topic or _('<unknown>'), + method=method or _('<unknown>')) class DuplicateMessageError(RPCException): @@ -441,10 +455,7 @@ def version_is_compatible(imp_version, version): return True -def serialize_msg(raw_msg, force_envelope=False): - if not _SEND_RPC_ENVELOPE and not force_envelope: - return raw_msg - +def serialize_msg(raw_msg): # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more # information about this format. msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION, diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py index 4102146fb..c501504cb 100644 --- a/nova/openstack/common/rpc/impl_zmq.py +++ b/nova/openstack/common/rpc/impl_zmq.py @@ -16,6 +16,7 @@ import os import pprint +import re import socket import sys import types @@ -220,7 +221,7 @@ class ZmqClient(object): def cast(self, msg_id, topic, data, envelope=False): msg_id = msg_id or 0 - if not (envelope or rpc_common._SEND_RPC_ENVELOPE): + if not envelope: self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data)))) return @@ -294,11 +295,16 @@ class InternalContext(object): def reply(self, ctx, proxy, msg_id=None, context=None, topic=None, msg=None): """Reply to a casted call.""" - # Our real method is curried into msg['args'] + # NOTE(ewindisch): context kwarg exists for Grizzly compat. + # this may be able to be removed earlier than + # 'I' if ConsumerBase.process were refactored. + if type(msg) is list: + payload = msg[-1] + else: + payload = msg - child_ctx = RpcContext.unmarshal(msg[0]) response = ConsumerBase.normalize_reply( - self._get_response(child_ctx, proxy, topic, msg[1]), + self._get_response(ctx, proxy, topic, payload), ctx.replies) LOG.debug(_("Sending reply")) @@ -431,6 +437,8 @@ class ZmqProxy(ZmqBaseReactor): def __init__(self, conf): super(ZmqProxy, self).__init__(conf) + pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\')) + self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep))) self.topic_proxy = {} @@ -456,6 +464,13 @@ class ZmqProxy(ZmqBaseReactor): LOG.info(_("Creating proxy for topic: %s"), topic) try: + # The topic is received over the network, + # don't trust this input. + if self.badchars.search(topic) is not None: + emsg = _("Topic contained dangerous characters.") + LOG.warn(emsg) + raise RPCException(emsg) + out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), sock_type, bind=True) @@ -675,8 +690,8 @@ def _call(addr, context, topic, msg, timeout=None, 'method': '-reply', 'args': { 'msg_id': msg_id, - 'context': mcontext, 'topic': reply_topic, + # TODO(ewindisch): safe to remove mcontext in I. 'msg': [mcontext, msg] } } diff --git a/nova/openstack/common/rpc/proxy.py b/nova/openstack/common/rpc/proxy.py index a0775528e..f6a7dbc74 100644 --- a/nova/openstack/common/rpc/proxy.py +++ b/nova/openstack/common/rpc/proxy.py @@ -68,16 +68,21 @@ class RpcProxy(object): :param context: The request context :param msg: The message to send, including the method and args. :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. :param timeout: (Optional) A timeout to use when waiting for the response. If no timeout is specified, a default timeout will be used that is usually sufficient. - :param version: (Optional) Override the requested API version in this - message. :returns: The return value from the remote method. """ self._set_version(msg, version) - return rpc.call(context, self._get_topic(topic), msg, timeout) + real_topic = self._get_topic(topic) + try: + return rpc.call(context, real_topic, msg, timeout) + except rpc.common.Timeout as exc: + raise rpc.common.Timeout( + exc.info, real_topic, msg.get('method')) def multicall(self, context, msg, topic=None, version=None, timeout=None): """rpc.multicall() a remote method. @@ -85,17 +90,22 @@ class RpcProxy(object): :param context: The request context :param msg: The message to send, including the method and args. :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. :param timeout: (Optional) A timeout to use when waiting for the response. If no timeout is specified, a default timeout will be used that is usually sufficient. - :param version: (Optional) Override the requested API version in this - message. :returns: An iterator that lets you process each of the returned values from the remote method as they arrive. """ self._set_version(msg, version) - return rpc.multicall(context, self._get_topic(topic), msg, timeout) + real_topic = self._get_topic(topic) + try: + return rpc.multicall(context, real_topic, msg, timeout) + except rpc.common.Timeout as exc: + raise rpc.common.Timeout( + exc.info, real_topic, msg.get('method')) def cast(self, context, msg, topic=None, version=None): """rpc.cast() a remote method. |