diff options
| author | Jenkins <jenkins@review.openstack.org> | 2012-01-13 18:50:10 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2012-01-13 18:50:10 +0000 |
| commit | 485328e48fc7dff1ce2ebfd262c442d8f993688b (patch) | |
| tree | bd14c184edfc8c80fac079f2a94c4b91966747b9 /nova/tests | |
| parent | 572c85da9843b549f3ac26ee7d4107a3ab0b0809 (diff) | |
| parent | 59e8ae1362f33ab30b2dc900dcbde30efc5a57c8 (diff) | |
Merge "Implement more complete kombu reconnecting"
Diffstat (limited to 'nova/tests')
| -rw-r--r-- | nova/tests/rpc/test_kombu.py | 114 |
1 files changed, 114 insertions, 0 deletions
diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py index 01b00f33d..dc100cf12 100644 --- a/nova/tests/rpc/test_kombu.py +++ b/nova/tests/rpc/test_kombu.py @@ -29,6 +29,23 @@ from nova.tests.rpc import common LOG = logging.getLogger('nova.tests.rpc') +class MyException(Exception): + pass + + +def _raise_exc_stub(stubs, times, obj, method, exc_msg): + info = {'called': 0} + orig_method = getattr(obj, method) + + def _raise_stub(*args, **kwargs): + info['called'] += 1 + if info['called'] <= times: + raise MyException(exc_msg) + orig_method(*args, **kwargs) + stubs.Set(obj, method, _raise_stub) + return info + + class RpcKombuTestCase(common._BaseRpcTestCase): def setUp(self): self.rpc = impl_kombu @@ -108,3 +125,100 @@ class RpcKombuTestCase(common._BaseRpcTestCase): conn2.consume(limit=1) conn2.close() self.assertEqual(self.received_message, message) + + def test_declare_consumer_errors_will_reconnect(self): + # Test that any exception with 'timeout' in it causes a + # reconnection + info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer, + '__init__', 'foo timeout foo') + + conn = self.rpc.Connection() + result = conn.declare_consumer(self.rpc.DirectConsumer, + 'test_topic', None) + + self.assertEqual(info['called'], 3) + self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) + + # Test that any exception in transport.connection_errors causes + # a reconnection + self.stubs.UnsetAll() + + info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer, + '__init__', 'meow') + + conn = self.rpc.Connection() + conn.connection_errors = (MyException, ) + + result = conn.declare_consumer(self.rpc.DirectConsumer, + 'test_topic', None) + + self.assertEqual(info['called'], 2) + self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) + + def test_publishing_errors_will_reconnect(self): + # Test that any exception with 'timeout' in it causes a + # reconnection when declaring the publisher class and when + # calling send() + info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher, + '__init__', 'foo timeout foo') + + conn = self.rpc.Connection() + conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') + + self.assertEqual(info['called'], 3) + self.stubs.UnsetAll() + + info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher, + 'send', 'foo timeout foo') + + conn = self.rpc.Connection() + conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') + + self.assertEqual(info['called'], 3) + + # Test that any exception in transport.connection_errors causes + # a reconnection when declaring the publisher class and when + # calling send() + self.stubs.UnsetAll() + + info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher, + '__init__', 'meow') + + conn = self.rpc.Connection() + conn.connection_errors = (MyException, ) + + conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') + + self.assertEqual(info['called'], 2) + self.stubs.UnsetAll() + + info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher, + 'send', 'meow') + + conn = self.rpc.Connection() + conn.connection_errors = (MyException, ) + + conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') + + self.assertEqual(info['called'], 2) + + def test_iterconsume_errors_will_reconnect(self): + conn = self.rpc.Connection() + message = 'reconnect test message' + + self.received_message = None + + def _callback(message): + self.received_message = message + + conn.declare_direct_consumer('a_direct', _callback) + conn.direct_send('a_direct', message) + + info = _raise_exc_stub(self.stubs, 1, conn.connection, + 'drain_events', 'foo timeout foo') + conn.consume(limit=1) + conn.close() + + self.assertEqual(self.received_message, message) + # Only called once, because our stub goes away during reconnection + self.assertEqual(info['called'], 1) |
