diff options
-rw-r--r-- | openstack/common/rpc/amqp.py | 1 | ||||
-rw-r--r-- | openstack/common/rpc/common.py | 20 | ||||
-rw-r--r-- | openstack/common/rpc/proxy.py | 22 | ||||
-rw-r--r-- | tests/unit/rpc/test_proxy.py | 60 |
4 files changed, 74 insertions, 29 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 81fb41e..1d45db6 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -495,7 +495,6 @@ class MulticallProxyWaiter(object): data = self._dataqueue.get(timeout=self._timeout) result = self._process_data(data) except queue.Empty: - LOG.exception(_('Timed out waiting for RPC response.')) self.done() raise rpc_common.Timeout() except Exception: diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py index 8bffc2b..42aaf09 100644 --- a/openstack/common/rpc/common.py +++ b/openstack/common/rpc/common.py @@ -118,7 +118,25 @@ class Timeout(RPCException): This exception is raised if the rpc_response_timeout is reached while waiting for a response from the remote side. """ - message = _("Timeout while waiting on RPC response.") + message = _('Timeout while waiting on RPC response - ' + 'topic: "%(topic)s", RPC method: "%(method)s" ' + 'info: "%(info)s"') + + def __init__(self, info=None, topic=None, method=None): + """ + :param info: Extra info to convey to the user + :param topic: The topic that the rpc call was sent to + :param rpc_method_name: The name of the rpc method being + called + """ + self.info = info + self.topic = topic + self.method = method + super(Timeout, self).__init__( + None, + info=info or _('<unknown>'), + topic=topic or _('<unknown>'), + method=method or _('<unknown>')) class DuplicateMessageError(RPCException): diff --git a/openstack/common/rpc/proxy.py b/openstack/common/rpc/proxy.py index fc09116..822248d 100644 --- a/openstack/common/rpc/proxy.py +++ b/openstack/common/rpc/proxy.py @@ -68,16 +68,21 @@ class RpcProxy(object): :param context: The request context :param msg: The message to send, including the method and args. :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. :param timeout: (Optional) A timeout to use when waiting for the response. If no timeout is specified, a default timeout will be used that is usually sufficient. - :param version: (Optional) Override the requested API version in this - message. :returns: The return value from the remote method. """ self._set_version(msg, version) - return rpc.call(context, self._get_topic(topic), msg, timeout) + real_topic = self._get_topic(topic) + try: + return rpc.call(context, real_topic, msg, timeout) + except rpc.common.Timeout as exc: + raise rpc.common.Timeout( + exc.info, real_topic, msg.get('method')) def multicall(self, context, msg, topic=None, version=None, timeout=None): """rpc.multicall() a remote method. @@ -85,17 +90,22 @@ class RpcProxy(object): :param context: The request context :param msg: The message to send, including the method and args. :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. :param timeout: (Optional) A timeout to use when waiting for the response. If no timeout is specified, a default timeout will be used that is usually sufficient. - :param version: (Optional) Override the requested API version in this - message. :returns: An iterator that lets you process each of the returned values from the remote method as they arrive. """ self._set_version(msg, version) - return rpc.multicall(context, self._get_topic(topic), msg, timeout) + real_topic = self._get_topic(topic) + try: + return rpc.multicall(context, real_topic, msg, timeout) + except rpc.common.Timeout as exc: + raise rpc.common.Timeout( + exc.info, real_topic, msg.get('method')) def cast(self, context, msg, topic=None, version=None): """rpc.cast() a remote method. diff --git a/tests/unit/rpc/test_proxy.py b/tests/unit/rpc/test_proxy.py index 6b08b91..5ac4cea 100644 --- a/tests/unit/rpc/test_proxy.py +++ b/tests/unit/rpc/test_proxy.py @@ -24,6 +24,7 @@ from openstack.common import context from openstack.common import lockutils from openstack.common import rpc from openstack.common.rpc import proxy +from openstack.common.rpc import common as rpc_common from tests import utils @@ -32,7 +33,7 @@ class RpcProxyTestCase(utils.BaseTestCase): def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False, server_params=None, supports_topic_override=True): topic = 'fake_topic' - timeout = 123 + timeout = None rpc_proxy = proxy.RpcProxy(topic, '1.0') ctxt = context.RequestContext('fake_user', 'fake_project') msg = {'method': 'fake_method', 'args': {'x': 'y'}} @@ -51,6 +52,20 @@ class RpcProxyTestCase(utils.BaseTestCase): if has_retval: return expected_retval + def _fake_rpc_method_timeout(*args, **kwargs): + rpc._check_for_lock() + self.fake_args = args + self.fake_kwargs = kwargs + raise rpc_common.Timeout("The spider got you") + + def _check_args(context, topic, msg, timeout=None): + expected_args = [context, topic, msg] + if server_params: + expected_args.insert(1, server_params) + if has_timeout: + expected_args.append(timeout) + self.assertEqual(tuple(expected_args), self.fake_args) + self.stubs.Set(rpc, rpc_method, _fake_rpc_method) args = [ctxt, msg] @@ -60,41 +75,44 @@ class RpcProxyTestCase(utils.BaseTestCase): # Base method usage retval = getattr(rpc_proxy, rpc_method)(*args) self.assertEqual(retval, expected_retval) - expected_args = [ctxt, topic, expected_msg] - if server_params: - expected_args.insert(1, server_params) - for arg, expected_arg in zip(self.fake_args, expected_args): - self.assertEqual(arg, expected_arg) + _check_args(ctxt, topic, expected_msg) # overriding the version retval = getattr(rpc_proxy, rpc_method)(*args, version='1.1') self.assertEqual(retval, expected_retval) new_msg = copy.deepcopy(expected_msg) new_msg['version'] = '1.1' - expected_args = [ctxt, topic, new_msg] - if server_params: - expected_args.insert(1, server_params) - for arg, expected_arg in zip(self.fake_args, expected_args): - self.assertEqual(arg, expected_arg) + _check_args(ctxt, topic, new_msg) if has_timeout: - # set a timeout - retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=timeout) + # Set a timeout + retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=42) self.assertEqual(retval, expected_retval) - expected_args = [ctxt, topic, expected_msg, timeout] - for arg, expected_arg in zip(self.fake_args, expected_args): - self.assertEqual(arg, expected_arg) + _check_args(ctxt, topic, expected_msg, timeout=42) + + # Make it timeout and check that the exception is written as + # expected + self.stubs.Set(rpc, rpc_method, _fake_rpc_method_timeout) + try: + getattr(rpc_proxy, rpc_method)(*args, timeout=42) + self.fail("This should have raised a Timeout exception") + except rpc_common.Timeout as exc: + self.assertEqual(exc.info, 'The spider got you') + self.assertEqual(exc.topic, 'fake_topic') + self.assertEqual(exc.method, 'fake_method') + self.assertEqual( + u'Timeout while waiting on RPC response - ' + 'topic: "fake_topic", RPC method: "fake_method" ' + 'info: "The spider got you"', unicode(exc)) + _check_args(ctxt, topic, expected_msg, timeout=42) + self.stubs.Set(rpc, rpc_method, _fake_rpc_method) if supports_topic_override: # set a topic new_topic = 'foo.bar' retval = getattr(rpc_proxy, rpc_method)(*args, topic=new_topic) self.assertEqual(retval, expected_retval) - expected_args = [ctxt, new_topic, expected_msg] - if server_params: - expected_args.insert(1, server_params) - for arg, expected_arg in zip(self.fake_args, expected_args): - self.assertEqual(arg, expected_arg) + _check_args(ctxt, new_topic, expected_msg) def test_call(self): self._test_rpc_method('call', has_timeout=True, has_retval=True) |