From b893608b77dac11fceb0b0392f8f908de586970b Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Thu, 29 Mar 2012 18:44:16 +0000 Subject: Make kombu failures retry on IOError Fixes bug 797770 Unfortunately if rabbit decides protocol negotiation is taking too long, it will close the socket on us. This ends up raising IOError with a 'socket closed' message. This patch will catch IOError and re-try. Change-Id: I9110c845b71118c0fad760d90e91c585e6db46ed --- nova/rpc/impl_kombu.py | 4 ++-- nova/tests/rpc/test_kombu.py | 17 +++++++++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index f78005e3c..f3b78edc5 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -445,7 +445,7 @@ class Connection(object): try: self._connect() return - except self.connection_errors, e: + except (self.connection_errors, IOError), e: pass except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib @@ -488,7 +488,7 @@ class Connection(object): while True: try: return method(*args, **kwargs) - except (self.connection_errors, socket.timeout), e: + except (self.connection_errors, socket.timeout, IOError), e: pass except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py index 47d15f844..aa49b5d51 100644 --- a/nova/tests/rpc/test_kombu.py +++ b/nova/tests/rpc/test_kombu.py @@ -35,14 +35,15 @@ class MyException(Exception): pass -def _raise_exc_stub(stubs, times, obj, method, exc_msg): +def _raise_exc_stub(stubs, times, obj, method, exc_msg, + exc_class=MyException): info = {'called': 0} orig_method = getattr(obj, method) def _raise_stub(*args, **kwargs): info['called'] += 1 if info['called'] <= times: - raise MyException(exc_msg) + raise exc_class(exc_msg) orig_method(*args, **kwargs) stubs.Set(obj, method, _raise_stub) return info @@ -213,6 +214,18 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.assertEqual(info['called'], 2) self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) + def test_declare_consumer_ioerrors_will_reconnect(self): + """Test that an IOError exception causes a reconnection""" + info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer, + '__init__', 'Socket closed', exc_class=IOError) + + conn = self.rpc.Connection() + result = conn.declare_consumer(self.rpc.DirectConsumer, + 'test_topic', None) + + self.assertEqual(info['called'], 3) + self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) + def test_publishing_errors_will_reconnect(self): # Test that any exception with 'timeout' in it causes a # reconnection when declaring the publisher class and when -- cgit