summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-01-13 18:50:10 +0000
committerGerrit Code Review <review@openstack.org>2012-01-13 18:50:10 +0000
commit485328e48fc7dff1ce2ebfd262c442d8f993688b (patch)
treebd14c184edfc8c80fac079f2a94c4b91966747b9 /nova/tests
parent572c85da9843b549f3ac26ee7d4107a3ab0b0809 (diff)
parent59e8ae1362f33ab30b2dc900dcbde30efc5a57c8 (diff)
Merge "Implement more complete kombu reconnecting"
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/rpc/test_kombu.py114
1 files changed, 114 insertions, 0 deletions
diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py
index 01b00f33d..dc100cf12 100644
--- a/nova/tests/rpc/test_kombu.py
+++ b/nova/tests/rpc/test_kombu.py
@@ -29,6 +29,23 @@ from nova.tests.rpc import common
LOG = logging.getLogger('nova.tests.rpc')
+class MyException(Exception):
+ pass
+
+
+def _raise_exc_stub(stubs, times, obj, method, exc_msg):
+ info = {'called': 0}
+ orig_method = getattr(obj, method)
+
+ def _raise_stub(*args, **kwargs):
+ info['called'] += 1
+ if info['called'] <= times:
+ raise MyException(exc_msg)
+ orig_method(*args, **kwargs)
+ stubs.Set(obj, method, _raise_stub)
+ return info
+
+
class RpcKombuTestCase(common._BaseRpcTestCase):
def setUp(self):
self.rpc = impl_kombu
@@ -108,3 +125,100 @@ class RpcKombuTestCase(common._BaseRpcTestCase):
conn2.consume(limit=1)
conn2.close()
self.assertEqual(self.received_message, message)
+
+ def test_declare_consumer_errors_will_reconnect(self):
+ # Test that any exception with 'timeout' in it causes a
+ # reconnection
+ info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
+ '__init__', 'foo timeout foo')
+
+ conn = self.rpc.Connection()
+ result = conn.declare_consumer(self.rpc.DirectConsumer,
+ 'test_topic', None)
+
+ self.assertEqual(info['called'], 3)
+ self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
+
+ # Test that any exception in transport.connection_errors causes
+ # a reconnection
+ self.stubs.UnsetAll()
+
+ info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer,
+ '__init__', 'meow')
+
+ conn = self.rpc.Connection()
+ conn.connection_errors = (MyException, )
+
+ result = conn.declare_consumer(self.rpc.DirectConsumer,
+ 'test_topic', None)
+
+ self.assertEqual(info['called'], 2)
+ self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
+
+ def test_publishing_errors_will_reconnect(self):
+ # Test that any exception with 'timeout' in it causes a
+ # reconnection when declaring the publisher class and when
+ # calling send()
+ info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
+ '__init__', 'foo timeout foo')
+
+ conn = self.rpc.Connection()
+ conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
+
+ self.assertEqual(info['called'], 3)
+ self.stubs.UnsetAll()
+
+ info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
+ 'send', 'foo timeout foo')
+
+ conn = self.rpc.Connection()
+ conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
+
+ self.assertEqual(info['called'], 3)
+
+ # Test that any exception in transport.connection_errors causes
+ # a reconnection when declaring the publisher class and when
+ # calling send()
+ self.stubs.UnsetAll()
+
+ info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
+ '__init__', 'meow')
+
+ conn = self.rpc.Connection()
+ conn.connection_errors = (MyException, )
+
+ conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
+
+ self.assertEqual(info['called'], 2)
+ self.stubs.UnsetAll()
+
+ info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
+ 'send', 'meow')
+
+ conn = self.rpc.Connection()
+ conn.connection_errors = (MyException, )
+
+ conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
+
+ self.assertEqual(info['called'], 2)
+
+ def test_iterconsume_errors_will_reconnect(self):
+ conn = self.rpc.Connection()
+ message = 'reconnect test message'
+
+ self.received_message = None
+
+ def _callback(message):
+ self.received_message = message
+
+ conn.declare_direct_consumer('a_direct', _callback)
+ conn.direct_send('a_direct', message)
+
+ info = _raise_exc_stub(self.stubs, 1, conn.connection,
+ 'drain_events', 'foo timeout foo')
+ conn.consume(limit=1)
+ conn.close()
+
+ self.assertEqual(self.received_message, message)
+ # Only called once, because our stub goes away during reconnection
+ self.assertEqual(info['called'], 1)