summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-02-02 18:58:22 +0000
committerGerrit Code Review <review@openstack.org>2012-02-02 18:58:22 +0000
commitdef8544e804d4af22b89350b2d54c2c042833169 (patch)
tree48363f1f45cee070b270388b2db8ed42ae3d9c9c
parent44bd90224c83750b4d78241a10891ae427477a5f (diff)
parente0c59afc6dca4c6cf737e6e5d24da3415c258090 (diff)
downloadnova-def8544e804d4af22b89350b2d54c2c042833169.tar.gz
nova-def8544e804d4af22b89350b2d54c2c042833169.tar.xz
nova-def8544e804d4af22b89350b2d54c2c042833169.zip
Merge "Don't block forever for rpc.(multi)call response."
-rw-r--r--nova/rpc/__init__.py18
-rw-r--r--nova/rpc/amqp.py13
-rw-r--r--nova/rpc/common.py12
-rw-r--r--nova/rpc/impl_carrot.py7
-rw-r--r--nova/rpc/impl_fake.py79
-rw-r--r--nova/rpc/impl_kombu.py26
-rw-r--r--nova/rpc/impl_qpid.py25
-rw-r--r--nova/tests/rpc/common.py35
-rw-r--r--nova/tests/rpc/test_carrot.py2
-rw-r--r--nova/tests/rpc/test_qpid.py12
10 files changed, 163 insertions, 66 deletions
diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py
index 1fbd9aead..a6067432e 100644
--- a/nova/rpc/__init__.py
+++ b/nova/rpc/__init__.py
@@ -48,7 +48,7 @@ def create_connection(new=True):
return _get_impl().create_connection(new=new)
-def call(context, topic, msg):
+def call(context, topic, msg, timeout=None):
"""Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this
@@ -59,10 +59,15 @@ def call(context, topic, msg):
when the consumer was created with fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
+ :param timeout: int, number of seconds to use for a response timeout.
+ If set, this overrides the rpc_response_timeout option.
:returns: A dict from the remote method.
+
+ :raises: nova.rpc.common.Timeout if a complete response is not received
+ before the timeout is reached.
"""
- return _get_impl().call(context, topic, msg)
+ return _get_impl().call(context, topic, msg, timeout)
def cast(context, topic, msg):
@@ -102,7 +107,7 @@ def fanout_cast(context, topic, msg):
return _get_impl().fanout_cast(context, topic, msg)
-def multicall(context, topic, msg):
+def multicall(context, topic, msg, timeout=None):
"""Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in
@@ -117,13 +122,18 @@ def multicall(context, topic, msg):
when the consumer was created with fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
+ :param timeout: int, number of seconds to use for a response timeout.
+ If set, this overrides the rpc_response_timeout option.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value
returned and X is the Nth value that was returned by the remote
method.
+
+ :raises: nova.rpc.common.Timeout if a complete response is not received
+ before the timeout is reached.
"""
- return _get_impl().multicall(context, topic, msg)
+ return _get_impl().multicall(context, topic, msg, timeout)
def notify(context, topic, msg):
diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py
index 483100806..0995d9ab8 100644
--- a/nova/rpc/amqp.py
+++ b/nova/rpc/amqp.py
@@ -262,9 +262,10 @@ class ProxyCallback(object):
class MulticallWaiter(object):
- def __init__(self, connection):
+ def __init__(self, connection, timeout):
self._connection = connection
- self._iterator = connection.iterconsume()
+ self._iterator = connection.iterconsume(
+ timeout=timeout or FLAGS.rpc_response_timeout)
self._result = None
self._done = False
self._got_ending = False
@@ -307,7 +308,7 @@ def create_connection(new=True):
return ConnectionContext(pooled=not new)
-def multicall(context, topic, msg):
+def multicall(context, topic, msg, timeout):
"""Make a call that returns multiple times."""
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
@@ -320,15 +321,15 @@ def multicall(context, topic, msg):
pack_context(msg, context)
conn = ConnectionContext()
- wait_msg = MulticallWaiter(conn)
+ wait_msg = MulticallWaiter(conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg)
return wait_msg
-def call(context, topic, msg):
+def call(context, topic, msg, timeout):
"""Sends a message on a topic and wait for a response."""
- rv = multicall(context, topic, msg)
+ rv = multicall(context, topic, msg, timeout)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
diff --git a/nova/rpc/common.py b/nova/rpc/common.py
index ff0577011..70d5d07ba 100644
--- a/nova/rpc/common.py
+++ b/nova/rpc/common.py
@@ -34,6 +34,9 @@ rpc_opts = [
cfg.IntOpt('rpc_conn_pool_size',
default=30,
help='Size of RPC connection pool'),
+ cfg.IntOpt('rpc_response_timeout',
+ default=3600,
+ help='Seconds to wait for a response from call or multicall'),
]
flags.FLAGS.add_options(rpc_opts)
@@ -59,6 +62,15 @@ class RemoteError(exception.NovaException):
traceback=traceback)
+class Timeout(exception.NovaException):
+ """Signifies that a timeout has occurred.
+
+ This exception is raised if the rpc_response_timeout is reached while
+ waiting for a response from the remote side.
+ """
+ message = _("Timeout while waiting on RPC response.")
+
+
class Connection(object):
"""A connection, returned by rpc.create_connection().
diff --git a/nova/rpc/impl_carrot.py b/nova/rpc/impl_carrot.py
index 5750e5989..7ce377941 100644
--- a/nova/rpc/impl_carrot.py
+++ b/nova/rpc/impl_carrot.py
@@ -522,8 +522,9 @@ class RpcContext(context.RequestContext):
self.msg_id = None
-def multicall(context, topic, msg):
+def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
+ # NOTE(russellb): carrot doesn't support timeouts
LOG.debug(_('Making asynchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
@@ -594,9 +595,9 @@ def create_connection(new=True):
return Connection.instance(new=new)
-def call(context, topic, msg):
+def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- rv = multicall(context, topic, msg)
+ rv = multicall(context, topic, msg, timeout)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
diff --git a/nova/rpc/impl_fake.py b/nova/rpc/impl_fake.py
index dc30522b8..6e4d2f6ec 100644
--- a/nova/rpc/impl_fake.py
+++ b/nova/rpc/impl_fake.py
@@ -18,14 +18,21 @@ queues. Casts will block, but this is very useful for tests.
"""
import inspect
+import signal
import sys
+import time
import traceback
+import eventlet
+
from nova import context
+from nova import flags
from nova.rpc import common as rpc_common
CONSUMERS = {}
+FLAGS = flags.FLAGS
+
class RpcContext(context.RequestContext):
def __init__(self, *args, **kwargs):
@@ -45,31 +52,49 @@ class Consumer(object):
self.topic = topic
self.proxy = proxy
- def call(self, context, method, args):
+ def call(self, context, method, args, timeout):
node_func = getattr(self.proxy, method)
node_args = dict((str(k), v) for k, v in args.iteritems())
-
- ctxt = RpcContext.from_dict(context.to_dict())
- try:
- rval = node_func(context=ctxt, **node_args)
- # Caller might have called ctxt.reply() manually
- for (reply, failure) in ctxt._response:
- if failure:
- raise failure[0], failure[1], failure[2]
- yield reply
- # if ending not 'sent'...we might have more data to
- # return from the function itself
- if not ctxt._done:
- if inspect.isgenerator(rval):
- for val in rval:
- yield val
- else:
- yield rval
- except Exception:
- exc_info = sys.exc_info()
- raise rpc_common.RemoteError(exc_info[0].__name__,
- str(exc_info[1]),
- ''.join(traceback.format_exception(*exc_info)))
+ done = eventlet.event.Event()
+
+ def _inner():
+ ctxt = RpcContext.from_dict(context.to_dict())
+ try:
+ rval = node_func(context=ctxt, **node_args)
+ res = []
+ # Caller might have called ctxt.reply() manually
+ for (reply, failure) in ctxt._response:
+ if failure:
+ raise failure[0], failure[1], failure[2]
+ res.append(reply)
+ # if ending not 'sent'...we might have more data to
+ # return from the function itself
+ if not ctxt._done:
+ if inspect.isgenerator(rval):
+ for val in rval:
+ res.append(val)
+ else:
+ res.append(rval)
+ done.send(res)
+ except Exception:
+ exc_info = sys.exc_info()
+ done.send_exception(
+ rpc_common.RemoteError(exc_info[0].__name__,
+ str(exc_info[1]),
+ ''.join(traceback.format_exception(*exc_info))))
+
+ thread = eventlet.greenthread.spawn(_inner)
+
+ if timeout:
+ start_time = time.time()
+ while not done.ready():
+ eventlet.greenthread.sleep(1)
+ cur_time = time.time()
+ if (cur_time - start_time) > timeout:
+ thread.kill()
+ raise rpc_common.Timeout()
+
+ return done.wait()
class Connection(object):
@@ -99,7 +124,7 @@ def create_connection(new=True):
return Connection()
-def multicall(context, topic, msg):
+def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
method = msg.get('method')
@@ -112,12 +137,12 @@ def multicall(context, topic, msg):
except (KeyError, IndexError):
return iter([None])
else:
- return consumer.call(context, method, args)
+ return consumer.call(context, method, args, timeout)
-def call(context, topic, msg):
+def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- rv = multicall(context, topic, msg)
+ rv = multicall(context, topic, msg, timeout)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py
index e2c0b9036..50459e5ad 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/rpc/impl_kombu.py
@@ -15,6 +15,7 @@
# under the License.
import itertools
+import socket
import sys
import time
import uuid
@@ -425,7 +426,7 @@ class Connection(object):
while True:
try:
return method(*args, **kwargs)
- except self.connection_errors, e:
+ except (self.connection_errors, socket.timeout), e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
@@ -478,15 +479,20 @@ class Connection(object):
return self.ensure(_connect_error, _declare_consumer)
- def iterconsume(self, limit=None):
+ def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers"""
info = {'do_consume': True}
def _error_callback(exc):
- LOG.exception(_('Failed to consume message from queue: %s') %
- str(exc))
- info['do_consume'] = True
+ if isinstance(exc, socket.timeout):
+ LOG.exception(_('Timed out waiting for RPC response: %s') %
+ str(exc))
+ raise rpc_common.Timeout()
+ else:
+ LOG.exception(_('Failed to consume message from queue: %s') %
+ str(exc))
+ info['do_consume'] = True
def _consume():
if info['do_consume']:
@@ -496,7 +502,7 @@ class Connection(object):
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
info['do_consume'] = False
- return self.connection.drain_events()
+ return self.connection.drain_events(timeout=timeout)
for iteration in itertools.count(0):
if limit and iteration >= limit:
@@ -595,14 +601,14 @@ def create_connection(new=True):
return rpc_amqp.create_connection(new)
-def multicall(context, topic, msg):
+def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
- return rpc_amqp.multicall(context, topic, msg)
+ return rpc_amqp.multicall(context, topic, msg, timeout)
-def call(context, topic, msg):
+def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(context, topic, msg)
+ return rpc_amqp.call(context, topic, msg, timeout)
def cast(context, topic, msg):
diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py
index f4b6b9ffa..353c7e502 100644
--- a/nova/rpc/impl_qpid.py
+++ b/nova/rpc/impl_qpid.py
@@ -28,6 +28,7 @@ import qpid.messaging.exceptions
from nova.common import cfg
from nova import flags
from nova.rpc import amqp as rpc_amqp
+from nova.rpc import common as rpc_common
from nova.rpc.common import LOG
@@ -338,7 +339,8 @@ class Connection(object):
while True:
try:
return method(*args, **kwargs)
- except qpid.messaging.exceptions.ConnectionError, e:
+ except (qpid.messaging.exceptions.Empty,
+ qpid.messaging.exceptions.ConnectionError), e:
if error_callback:
error_callback(e)
self.reconnect()
@@ -372,15 +374,20 @@ class Connection(object):
return self.ensure(_connect_error, _declare_consumer)
- def iterconsume(self, limit=None):
+ def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers"""
def _error_callback(exc):
- LOG.exception(_('Failed to consume message from queue: %s') %
- str(exc))
+ if isinstance(exc, qpid.messaging.exceptions.Empty):
+ LOG.exception(_('Timed out waiting for RPC response: %s') %
+ str(exc))
+ raise rpc_common.Timeout()
+ else:
+ LOG.exception(_('Failed to consume message from queue: %s') %
+ str(exc))
def _consume():
- nxt_receiver = self.session.next_receiver()
+ nxt_receiver = self.session.next_receiver(timeout=timeout)
self._lookup_consumer(nxt_receiver).consume()
for iteration in itertools.count(0):
@@ -483,14 +490,14 @@ def create_connection(new=True):
return rpc_amqp.create_connection(new)
-def multicall(context, topic, msg):
+def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
- return rpc_amqp.multicall(context, topic, msg)
+ return rpc_amqp.multicall(context, topic, msg, timeout)
-def call(context, topic, msg):
+def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(context, topic, msg)
+ return rpc_amqp.call(context, topic, msg, timeout)
def cast(context, topic, msg):
diff --git a/nova/tests/rpc/common.py b/nova/tests/rpc/common.py
index dc8aafcfe..c41375ace 100644
--- a/nova/tests/rpc/common.py
+++ b/nova/tests/rpc/common.py
@@ -19,9 +19,13 @@
Unit Tests for remote procedure calls shared between all implementations
"""
+import time
+
+import nose
+
from nova import context
from nova import log as logging
-from nova.rpc.common import RemoteError
+from nova.rpc.common import RemoteError, Timeout
from nova import test
@@ -29,13 +33,14 @@ LOG = logging.getLogger('nova.tests.rpc')
class _BaseRpcTestCase(test.TestCase):
- def setUp(self):
+ def setUp(self, supports_timeouts=True):
super(_BaseRpcTestCase, self).setUp()
self.conn = self.rpc.create_connection(True)
self.receiver = TestReceiver()
self.conn.create_consumer('test', self.receiver, False)
self.conn.consume_in_thread()
self.context = context.get_admin_context()
+ self.supports_timeouts = supports_timeouts
def tearDown(self):
self.conn.close()
@@ -162,6 +167,28 @@ class _BaseRpcTestCase(test.TestCase):
conn.close()
self.assertEqual(value, result)
+ def test_call_timeout(self):
+ """Make sure rpc.call will time out"""
+ if not self.supports_timeouts:
+ raise nose.SkipTest(_("RPC backend does not support timeouts"))
+
+ value = 42
+ self.assertRaises(Timeout,
+ self.rpc.call,
+ self.context,
+ 'test',
+ {"method": "block",
+ "args": {"value": value}}, timeout=1)
+ try:
+ self.rpc.call(self.context,
+ 'test',
+ {"method": "block",
+ "args": {"value": value}},
+ timeout=1)
+ self.fail("should have thrown Timeout")
+ except Timeout as exc:
+ pass
+
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
@@ -205,3 +232,7 @@ class TestReceiver(object):
def fail(context, value):
"""Raises an exception with the value sent in."""
raise Exception(value)
+
+ @staticmethod
+ def block(context, value):
+ time.sleep(2)
diff --git a/nova/tests/rpc/test_carrot.py b/nova/tests/rpc/test_carrot.py
index 2523810d8..153747da2 100644
--- a/nova/tests/rpc/test_carrot.py
+++ b/nova/tests/rpc/test_carrot.py
@@ -30,7 +30,7 @@ LOG = logging.getLogger('nova.tests.rpc')
class RpcCarrotTestCase(common._BaseRpcTestCase):
def setUp(self):
self.rpc = impl_carrot
- super(RpcCarrotTestCase, self).setUp()
+ super(RpcCarrotTestCase, self).setUp(supports_timeouts=False)
def tearDown(self):
super(RpcCarrotTestCase, self).tearDown()
diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py
index 0417674b8..9e318fbfd 100644
--- a/nova/tests/rpc/test_qpid.py
+++ b/nova/tests/rpc/test_qpid.py
@@ -221,21 +221,25 @@ class RpcQpidTestCase(test.TestCase):
self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
self.mock_sender.send(mox.IgnoreArg())
- self.mock_session.next_receiver().AndReturn(self.mock_receiver)
+ self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
{"result": "foo", "failure": False, "ending": False}))
if multi:
- self.mock_session.next_receiver().AndReturn(self.mock_receiver)
+ self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
qpid.messaging.Message(
{"result": "bar", "failure": False,
"ending": False}))
- self.mock_session.next_receiver().AndReturn(self.mock_receiver)
+ self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
qpid.messaging.Message(
{"result": "baz", "failure": False,
"ending": False}))
- self.mock_session.next_receiver().AndReturn(self.mock_receiver)
+ self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
{"failure": False, "ending": True}))
self.mock_session.close()