summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--nova/rpc/impl_kombu.py7
-rw-r--r--nova/rpc/impl_qpid.py5
-rw-r--r--nova/tests/rpc/common.py45
-rw-r--r--nova/tests/rpc/test_carrot.py2
-rw-r--r--nova/tests/rpc/test_fake.py2
-rw-r--r--nova/tests/rpc/test_kombu.py2
6 files changed, 54 insertions, 9 deletions
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py
index 8f527467f..f78005e3c 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/rpc/impl_kombu.py
@@ -104,8 +104,11 @@ class ConsumerBase(object):
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
- callback(message.payload)
- message.ack()
+ try:
+ callback(message.payload)
+ message.ack()
+ except Exception:
+ LOG.exception(_("Failed to process message... skipping it."))
self.queue.consume(*args, callback=_callback, **options)
diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py
index 88e820e82..8b10b7547 100644
--- a/nova/rpc/impl_qpid.py
+++ b/nova/rpc/impl_qpid.py
@@ -406,7 +406,10 @@ class Connection(object):
def _consume():
nxt_receiver = self.session.next_receiver(timeout=timeout)
- self._lookup_consumer(nxt_receiver).consume()
+ try:
+ self._lookup_consumer(nxt_receiver).consume()
+ except Exception:
+ LOG.exception(_("Error processing message. Skipping it."))
for iteration in itertools.count(0):
if limit and iteration >= limit:
diff --git a/nova/tests/rpc/common.py b/nova/tests/rpc/common.py
index 320df834d..4ccc8ef6c 100644
--- a/nova/tests/rpc/common.py
+++ b/nova/tests/rpc/common.py
@@ -21,10 +21,12 @@ Unit Tests for remote procedure calls shared between all implementations
import time
+from eventlet import greenthread
import nose
from nova import context
from nova import log as logging
+from nova.rpc import amqp as rpc_amqp
from nova.rpc import common as rpc_common
from nova import test
@@ -32,9 +34,9 @@ from nova import test
LOG = logging.getLogger(__name__)
-class _BaseRpcTestCase(test.TestCase):
+class BaseRpcTestCase(test.TestCase):
def setUp(self, supports_timeouts=True):
- super(_BaseRpcTestCase, self).setUp()
+ super(BaseRpcTestCase, self).setUp()
self.conn = self.rpc.create_connection(True)
self.receiver = TestReceiver()
self.conn.create_consumer('test', self.receiver, False)
@@ -44,7 +46,7 @@ class _BaseRpcTestCase(test.TestCase):
def tearDown(self):
self.conn.close()
- super(_BaseRpcTestCase, self).tearDown()
+ super(BaseRpcTestCase, self).tearDown()
def test_call_succeed(self):
value = 42
@@ -174,6 +176,43 @@ class _BaseRpcTestCase(test.TestCase):
pass
+class BaseRpcAMQPTestCase(BaseRpcTestCase):
+ """Base test class for all AMQP-based RPC tests"""
+ def test_proxycallback_handles_exceptions(self):
+ """Make sure exceptions unpacking messages don't cause hangs."""
+ orig_unpack = rpc_amqp.unpack_context
+
+ info = {'unpacked': False}
+
+ def fake_unpack_context(*args, **kwargs):
+ info['unpacked'] = True
+ raise test.TestingException('moo')
+
+ self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context)
+
+ value = 41
+ self.rpc.cast(self.context, 'test', {"method": "echo",
+ "args": {"value": value}})
+
+ # Wait for the cast to complete.
+ for x in xrange(50):
+ if info['unpacked']:
+ break
+ greenthread.sleep(0.1)
+ else:
+ self.fail("Timeout waiting for message to be consued")
+
+ # Now see if we get a response even though we raised an
+ # exception for the cast above.
+ self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack)
+
+ value = 42
+ result = self.rpc.call(self.context, 'test',
+ {"method": "echo",
+ "args": {"value": value}})
+ self.assertEqual(value, result)
+
+
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
diff --git a/nova/tests/rpc/test_carrot.py b/nova/tests/rpc/test_carrot.py
index 239863048..dae08e8e0 100644
--- a/nova/tests/rpc/test_carrot.py
+++ b/nova/tests/rpc/test_carrot.py
@@ -27,7 +27,7 @@ from nova.tests.rpc import common
LOG = logging.getLogger(__name__)
-class RpcCarrotTestCase(common._BaseRpcTestCase):
+class RpcCarrotTestCase(common.BaseRpcTestCase):
def setUp(self):
self.rpc = impl_carrot
super(RpcCarrotTestCase, self).setUp(supports_timeouts=False)
diff --git a/nova/tests/rpc/test_fake.py b/nova/tests/rpc/test_fake.py
index 72333c139..8d6878ca4 100644
--- a/nova/tests/rpc/test_fake.py
+++ b/nova/tests/rpc/test_fake.py
@@ -27,7 +27,7 @@ from nova.tests.rpc import common
LOG = logging.getLogger(__name__)
-class RpcFakeTestCase(common._BaseRpcTestCase):
+class RpcFakeTestCase(common.BaseRpcTestCase):
def setUp(self):
self.rpc = impl_fake
super(RpcFakeTestCase, self).setUp()
diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py
index f90d111fc..47d15f844 100644
--- a/nova/tests/rpc/test_kombu.py
+++ b/nova/tests/rpc/test_kombu.py
@@ -48,7 +48,7 @@ def _raise_exc_stub(stubs, times, obj, method, exc_msg):
return info
-class RpcKombuTestCase(common._BaseRpcTestCase):
+class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
def setUp(self):
self.rpc = impl_kombu
super(RpcKombuTestCase, self).setUp()