summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--openstack/common/rpc/amqp.py1
-rw-r--r--openstack/common/rpc/common.py20
-rw-r--r--openstack/common/rpc/proxy.py22
-rw-r--r--tests/unit/rpc/test_proxy.py60
4 files changed, 74 insertions, 29 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 81fb41e..1d45db6 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -495,7 +495,6 @@ class MulticallProxyWaiter(object):
data = self._dataqueue.get(timeout=self._timeout)
result = self._process_data(data)
except queue.Empty:
- LOG.exception(_('Timed out waiting for RPC response.'))
self.done()
raise rpc_common.Timeout()
except Exception:
diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py
index 8bffc2b..42aaf09 100644
--- a/openstack/common/rpc/common.py
+++ b/openstack/common/rpc/common.py
@@ -118,7 +118,25 @@ class Timeout(RPCException):
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
- message = _("Timeout while waiting on RPC response.")
+ message = _('Timeout while waiting on RPC response - '
+ 'topic: "%(topic)s", RPC method: "%(method)s" '
+ 'info: "%(info)s"')
+
+ def __init__(self, info=None, topic=None, method=None):
+ """
+ :param info: Extra info to convey to the user
+ :param topic: The topic that the rpc call was sent to
+ :param rpc_method_name: The name of the rpc method being
+ called
+ """
+ self.info = info
+ self.topic = topic
+ self.method = method
+ super(Timeout, self).__init__(
+ None,
+ info=info or _('<unknown>'),
+ topic=topic or _('<unknown>'),
+ method=method or _('<unknown>'))
class DuplicateMessageError(RPCException):
diff --git a/openstack/common/rpc/proxy.py b/openstack/common/rpc/proxy.py
index fc09116..822248d 100644
--- a/openstack/common/rpc/proxy.py
+++ b/openstack/common/rpc/proxy.py
@@ -68,16 +68,21 @@ class RpcProxy(object):
:param context: The request context
:param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
:param timeout: (Optional) A timeout to use when waiting for the
response. If no timeout is specified, a default timeout will be
used that is usually sufficient.
- :param version: (Optional) Override the requested API version in this
- message.
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
- return rpc.call(context, self._get_topic(topic), msg, timeout)
+ real_topic = self._get_topic(topic)
+ try:
+ return rpc.call(context, real_topic, msg, timeout)
+ except rpc.common.Timeout as exc:
+ raise rpc.common.Timeout(
+ exc.info, real_topic, msg.get('method'))
def multicall(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.multicall() a remote method.
@@ -85,17 +90,22 @@ class RpcProxy(object):
:param context: The request context
:param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
:param timeout: (Optional) A timeout to use when waiting for the
response. If no timeout is specified, a default timeout will be
used that is usually sufficient.
- :param version: (Optional) Override the requested API version in this
- message.
:returns: An iterator that lets you process each of the returned values
from the remote method as they arrive.
"""
self._set_version(msg, version)
- return rpc.multicall(context, self._get_topic(topic), msg, timeout)
+ real_topic = self._get_topic(topic)
+ try:
+ return rpc.multicall(context, real_topic, msg, timeout)
+ except rpc.common.Timeout as exc:
+ raise rpc.common.Timeout(
+ exc.info, real_topic, msg.get('method'))
def cast(self, context, msg, topic=None, version=None):
"""rpc.cast() a remote method.
diff --git a/tests/unit/rpc/test_proxy.py b/tests/unit/rpc/test_proxy.py
index 6b08b91..5ac4cea 100644
--- a/tests/unit/rpc/test_proxy.py
+++ b/tests/unit/rpc/test_proxy.py
@@ -24,6 +24,7 @@ from openstack.common import context
from openstack.common import lockutils
from openstack.common import rpc
from openstack.common.rpc import proxy
+from openstack.common.rpc import common as rpc_common
from tests import utils
@@ -32,7 +33,7 @@ class RpcProxyTestCase(utils.BaseTestCase):
def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False,
server_params=None, supports_topic_override=True):
topic = 'fake_topic'
- timeout = 123
+ timeout = None
rpc_proxy = proxy.RpcProxy(topic, '1.0')
ctxt = context.RequestContext('fake_user', 'fake_project')
msg = {'method': 'fake_method', 'args': {'x': 'y'}}
@@ -51,6 +52,20 @@ class RpcProxyTestCase(utils.BaseTestCase):
if has_retval:
return expected_retval
+ def _fake_rpc_method_timeout(*args, **kwargs):
+ rpc._check_for_lock()
+ self.fake_args = args
+ self.fake_kwargs = kwargs
+ raise rpc_common.Timeout("The spider got you")
+
+ def _check_args(context, topic, msg, timeout=None):
+ expected_args = [context, topic, msg]
+ if server_params:
+ expected_args.insert(1, server_params)
+ if has_timeout:
+ expected_args.append(timeout)
+ self.assertEqual(tuple(expected_args), self.fake_args)
+
self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
args = [ctxt, msg]
@@ -60,41 +75,44 @@ class RpcProxyTestCase(utils.BaseTestCase):
# Base method usage
retval = getattr(rpc_proxy, rpc_method)(*args)
self.assertEqual(retval, expected_retval)
- expected_args = [ctxt, topic, expected_msg]
- if server_params:
- expected_args.insert(1, server_params)
- for arg, expected_arg in zip(self.fake_args, expected_args):
- self.assertEqual(arg, expected_arg)
+ _check_args(ctxt, topic, expected_msg)
# overriding the version
retval = getattr(rpc_proxy, rpc_method)(*args, version='1.1')
self.assertEqual(retval, expected_retval)
new_msg = copy.deepcopy(expected_msg)
new_msg['version'] = '1.1'
- expected_args = [ctxt, topic, new_msg]
- if server_params:
- expected_args.insert(1, server_params)
- for arg, expected_arg in zip(self.fake_args, expected_args):
- self.assertEqual(arg, expected_arg)
+ _check_args(ctxt, topic, new_msg)
if has_timeout:
- # set a timeout
- retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=timeout)
+ # Set a timeout
+ retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=42)
self.assertEqual(retval, expected_retval)
- expected_args = [ctxt, topic, expected_msg, timeout]
- for arg, expected_arg in zip(self.fake_args, expected_args):
- self.assertEqual(arg, expected_arg)
+ _check_args(ctxt, topic, expected_msg, timeout=42)
+
+ # Make it timeout and check that the exception is written as
+ # expected
+ self.stubs.Set(rpc, rpc_method, _fake_rpc_method_timeout)
+ try:
+ getattr(rpc_proxy, rpc_method)(*args, timeout=42)
+ self.fail("This should have raised a Timeout exception")
+ except rpc_common.Timeout as exc:
+ self.assertEqual(exc.info, 'The spider got you')
+ self.assertEqual(exc.topic, 'fake_topic')
+ self.assertEqual(exc.method, 'fake_method')
+ self.assertEqual(
+ u'Timeout while waiting on RPC response - '
+ 'topic: "fake_topic", RPC method: "fake_method" '
+ 'info: "The spider got you"', unicode(exc))
+ _check_args(ctxt, topic, expected_msg, timeout=42)
+ self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
if supports_topic_override:
# set a topic
new_topic = 'foo.bar'
retval = getattr(rpc_proxy, rpc_method)(*args, topic=new_topic)
self.assertEqual(retval, expected_retval)
- expected_args = [ctxt, new_topic, expected_msg]
- if server_params:
- expected_args.insert(1, server_params)
- for arg, expected_arg in zip(self.fake_args, expected_args):
- self.assertEqual(arg, expected_arg)
+ _check_args(ctxt, new_topic, expected_msg)
def test_call(self):
self._test_rpc_method('call', has_timeout=True, has_retval=True)