summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-03-25 08:32:19 +0000
committerGerrit Code Review <review@openstack.org>2013-03-25 08:32:19 +0000
commitace737b1a25d03136b22b4150aae64e0384a2546 (patch)
treebe5c948b4c033370878c1eaa1594e6ee639dd6fc
parent552e632fa8c513c86da6713c012a96ea38995a7b (diff)
parent81204d4020712cf5b5b368b75d934d637b2c001b (diff)
downloadnova-ace737b1a25d03136b22b4150aae64e0384a2546.tar.gz
nova-ace737b1a25d03136b22b4150aae64e0384a2546.tar.xz
nova-ace737b1a25d03136b22b4150aae64e0384a2546.zip
Merge "Sync rpc from oslo-incubator."
-rw-r--r--nova/openstack/common/rpc/amqp.py2
-rw-r--r--nova/openstack/common/rpc/common.py29
-rw-r--r--nova/openstack/common/rpc/impl_zmq.py25
-rw-r--r--nova/openstack/common/rpc/proxy.py22
4 files changed, 57 insertions, 21 deletions
diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py
index cc0098cc3..e6b899cd9 100644
--- a/nova/openstack/common/rpc/amqp.py
+++ b/nova/openstack/common/rpc/amqp.py
@@ -661,7 +661,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:
- msg = rpc_common.serialize_msg(msg, force_envelope=True)
+ msg = rpc_common.serialize_msg(msg)
conn.notify_send(topic, msg)
diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py
index 5fca30717..a3be2fe10 100644
--- a/nova/openstack/common/rpc/common.py
+++ b/nova/openstack/common/rpc/common.py
@@ -70,10 +70,6 @@ _VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'
-# TODO(russellb) Turn this on after Grizzly.
-_SEND_RPC_ENVELOPE = False
-
-
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
@@ -122,7 +118,25 @@ class Timeout(RPCException):
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
- message = _("Timeout while waiting on RPC response.")
+ message = _('Timeout while waiting on RPC response - '
+ 'topic: "%(topic)s", RPC method: "%(method)s" '
+ 'info: "%(info)s"')
+
+ def __init__(self, info=None, topic=None, method=None):
+ """
+ :param info: Extra info to convey to the user
+ :param topic: The topic that the rpc call was sent to
+ :param rpc_method_name: The name of the rpc method being
+ called
+ """
+ self.info = info
+ self.topic = topic
+ self.method = method
+ super(Timeout, self).__init__(
+ None,
+ info=info or _('<unknown>'),
+ topic=topic or _('<unknown>'),
+ method=method or _('<unknown>'))
class DuplicateMessageError(RPCException):
@@ -441,10 +455,7 @@ def version_is_compatible(imp_version, version):
return True
-def serialize_msg(raw_msg, force_envelope=False):
- if not _SEND_RPC_ENVELOPE and not force_envelope:
- return raw_msg
-
+def serialize_msg(raw_msg):
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py
index 4102146fb..c501504cb 100644
--- a/nova/openstack/common/rpc/impl_zmq.py
+++ b/nova/openstack/common/rpc/impl_zmq.py
@@ -16,6 +16,7 @@
import os
import pprint
+import re
import socket
import sys
import types
@@ -220,7 +221,7 @@ class ZmqClient(object):
def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0
- if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
+ if not envelope:
self.outq.send(map(bytes,
(msg_id, topic, 'cast', _serialize(data))))
return
@@ -294,11 +295,16 @@ class InternalContext(object):
def reply(self, ctx, proxy,
msg_id=None, context=None, topic=None, msg=None):
"""Reply to a casted call."""
- # Our real method is curried into msg['args']
+ # NOTE(ewindisch): context kwarg exists for Grizzly compat.
+ # this may be able to be removed earlier than
+ # 'I' if ConsumerBase.process were refactored.
+ if type(msg) is list:
+ payload = msg[-1]
+ else:
+ payload = msg
- child_ctx = RpcContext.unmarshal(msg[0])
response = ConsumerBase.normalize_reply(
- self._get_response(child_ctx, proxy, topic, msg[1]),
+ self._get_response(ctx, proxy, topic, payload),
ctx.replies)
LOG.debug(_("Sending reply"))
@@ -431,6 +437,8 @@ class ZmqProxy(ZmqBaseReactor):
def __init__(self, conf):
super(ZmqProxy, self).__init__(conf)
+ pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
+ self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
self.topic_proxy = {}
@@ -456,6 +464,13 @@ class ZmqProxy(ZmqBaseReactor):
LOG.info(_("Creating proxy for topic: %s"), topic)
try:
+ # The topic is received over the network,
+ # don't trust this input.
+ if self.badchars.search(topic) is not None:
+ emsg = _("Topic contained dangerous characters.")
+ LOG.warn(emsg)
+ raise RPCException(emsg)
+
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
(ipc_dir, topic),
sock_type, bind=True)
@@ -675,8 +690,8 @@ def _call(addr, context, topic, msg, timeout=None,
'method': '-reply',
'args': {
'msg_id': msg_id,
- 'context': mcontext,
'topic': reply_topic,
+ # TODO(ewindisch): safe to remove mcontext in I.
'msg': [mcontext, msg]
}
}
diff --git a/nova/openstack/common/rpc/proxy.py b/nova/openstack/common/rpc/proxy.py
index a0775528e..f6a7dbc74 100644
--- a/nova/openstack/common/rpc/proxy.py
+++ b/nova/openstack/common/rpc/proxy.py
@@ -68,16 +68,21 @@ class RpcProxy(object):
:param context: The request context
:param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
:param timeout: (Optional) A timeout to use when waiting for the
response. If no timeout is specified, a default timeout will be
used that is usually sufficient.
- :param version: (Optional) Override the requested API version in this
- message.
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
- return rpc.call(context, self._get_topic(topic), msg, timeout)
+ real_topic = self._get_topic(topic)
+ try:
+ return rpc.call(context, real_topic, msg, timeout)
+ except rpc.common.Timeout as exc:
+ raise rpc.common.Timeout(
+ exc.info, real_topic, msg.get('method'))
def multicall(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.multicall() a remote method.
@@ -85,17 +90,22 @@ class RpcProxy(object):
:param context: The request context
:param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
:param timeout: (Optional) A timeout to use when waiting for the
response. If no timeout is specified, a default timeout will be
used that is usually sufficient.
- :param version: (Optional) Override the requested API version in this
- message.
:returns: An iterator that lets you process each of the returned values
from the remote method as they arrive.
"""
self._set_version(msg, version)
- return rpc.multicall(context, self._get_topic(topic), msg, timeout)
+ real_topic = self._get_topic(topic)
+ try:
+ return rpc.multicall(context, real_topic, msg, timeout)
+ except rpc.common.Timeout as exc:
+ raise rpc.common.Timeout(
+ exc.info, real_topic, msg.get('method'))
def cast(self, context, msg, topic=None, version=None):
"""rpc.cast() a remote method.