diff options
-rw-r--r-- | nova/rpc/impl_kombu.py | 7 | ||||
-rw-r--r-- | nova/rpc/impl_qpid.py | 5 | ||||
-rw-r--r-- | nova/tests/rpc/common.py | 45 | ||||
-rw-r--r-- | nova/tests/rpc/test_carrot.py | 2 | ||||
-rw-r--r-- | nova/tests/rpc/test_fake.py | 2 | ||||
-rw-r--r-- | nova/tests/rpc/test_kombu.py | 2 |
6 files changed, 54 insertions, 9 deletions
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index 8f527467f..f78005e3c 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -104,8 +104,11 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) - callback(message.payload) - message.ack() + try: + callback(message.payload) + message.ack() + except Exception: + LOG.exception(_("Failed to process message... skipping it.")) self.queue.consume(*args, callback=_callback, **options) diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py index 88e820e82..8b10b7547 100644 --- a/nova/rpc/impl_qpid.py +++ b/nova/rpc/impl_qpid.py @@ -406,7 +406,10 @@ class Connection(object): def _consume(): nxt_receiver = self.session.next_receiver(timeout=timeout) - self._lookup_consumer(nxt_receiver).consume() + try: + self._lookup_consumer(nxt_receiver).consume() + except Exception: + LOG.exception(_("Error processing message. Skipping it.")) for iteration in itertools.count(0): if limit and iteration >= limit: diff --git a/nova/tests/rpc/common.py b/nova/tests/rpc/common.py index 320df834d..4ccc8ef6c 100644 --- a/nova/tests/rpc/common.py +++ b/nova/tests/rpc/common.py @@ -21,10 +21,12 @@ Unit Tests for remote procedure calls shared between all implementations import time +from eventlet import greenthread import nose from nova import context from nova import log as logging +from nova.rpc import amqp as rpc_amqp from nova.rpc import common as rpc_common from nova import test @@ -32,9 +34,9 @@ from nova import test LOG = logging.getLogger(__name__) -class _BaseRpcTestCase(test.TestCase): +class BaseRpcTestCase(test.TestCase): def setUp(self, supports_timeouts=True): - super(_BaseRpcTestCase, self).setUp() + super(BaseRpcTestCase, self).setUp() self.conn = self.rpc.create_connection(True) self.receiver = TestReceiver() self.conn.create_consumer('test', self.receiver, False) @@ -44,7 +46,7 @@ class _BaseRpcTestCase(test.TestCase): def tearDown(self): self.conn.close() - super(_BaseRpcTestCase, self).tearDown() + super(BaseRpcTestCase, self).tearDown() def test_call_succeed(self): value = 42 @@ -174,6 +176,43 @@ class _BaseRpcTestCase(test.TestCase): pass +class BaseRpcAMQPTestCase(BaseRpcTestCase): + """Base test class for all AMQP-based RPC tests""" + def test_proxycallback_handles_exceptions(self): + """Make sure exceptions unpacking messages don't cause hangs.""" + orig_unpack = rpc_amqp.unpack_context + + info = {'unpacked': False} + + def fake_unpack_context(*args, **kwargs): + info['unpacked'] = True + raise test.TestingException('moo') + + self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context) + + value = 41 + self.rpc.cast(self.context, 'test', {"method": "echo", + "args": {"value": value}}) + + # Wait for the cast to complete. + for x in xrange(50): + if info['unpacked']: + break + greenthread.sleep(0.1) + else: + self.fail("Timeout waiting for message to be consued") + + # Now see if we get a response even though we raised an + # exception for the cast above. + self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack) + + value = 42 + result = self.rpc.call(self.context, 'test', + {"method": "echo", + "args": {"value": value}}) + self.assertEqual(value, result) + + class TestReceiver(object): """Simple Proxy class so the consumer has methods to call. diff --git a/nova/tests/rpc/test_carrot.py b/nova/tests/rpc/test_carrot.py index 239863048..dae08e8e0 100644 --- a/nova/tests/rpc/test_carrot.py +++ b/nova/tests/rpc/test_carrot.py @@ -27,7 +27,7 @@ from nova.tests.rpc import common LOG = logging.getLogger(__name__) -class RpcCarrotTestCase(common._BaseRpcTestCase): +class RpcCarrotTestCase(common.BaseRpcTestCase): def setUp(self): self.rpc = impl_carrot super(RpcCarrotTestCase, self).setUp(supports_timeouts=False) diff --git a/nova/tests/rpc/test_fake.py b/nova/tests/rpc/test_fake.py index 72333c139..8d6878ca4 100644 --- a/nova/tests/rpc/test_fake.py +++ b/nova/tests/rpc/test_fake.py @@ -27,7 +27,7 @@ from nova.tests.rpc import common LOG = logging.getLogger(__name__) -class RpcFakeTestCase(common._BaseRpcTestCase): +class RpcFakeTestCase(common.BaseRpcTestCase): def setUp(self): self.rpc = impl_fake super(RpcFakeTestCase, self).setUp() diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py index f90d111fc..47d15f844 100644 --- a/nova/tests/rpc/test_kombu.py +++ b/nova/tests/rpc/test_kombu.py @@ -48,7 +48,7 @@ def _raise_exc_stub(stubs, times, obj, method, exc_msg): return info -class RpcKombuTestCase(common._BaseRpcTestCase): +class RpcKombuTestCase(common.BaseRpcAMQPTestCase): def setUp(self): self.rpc = impl_kombu super(RpcKombuTestCase, self).setUp() |