summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-08-19 13:01:30 +0000
committerGerrit Code Review <review@openstack.org>2013-08-19 13:01:30 +0000
commit1a9966e52d7bf35774b9a009afe6412e6c5f014a (patch)
tree9f189a88a85878bed3277805eb56c7dc5fb664b6
parent3295a9024a9c4e07c8691b047ea32917605d9351 (diff)
parenta047a353438d1e9449f94850b29d79bb2dd8f9b1 (diff)
downloadoslo-1a9966e52d7bf35774b9a009afe6412e6c5f014a.tar.gz
oslo-1a9966e52d7bf35774b9a009afe6412e6c5f014a.tar.xz
oslo-1a9966e52d7bf35774b9a009afe6412e6c5f014a.zip
Merge "Make ZeroMQ based RPC consumer threads more robust"
-rw-r--r--openstack/common/rpc/impl_zmq.py1
-rw-r--r--tests/unit/rpc/test_zmq.py21
2 files changed, 22 insertions, 0 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 33d5cb6..a5ddccf 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -383,6 +383,7 @@ class ZmqBaseReactor(ConsumerBase):
LOG.info(_("In reactor registered"))
def consume_in_thread(self):
+ @excutils.forever_retry_uncaught_exceptions
def _consume(sock):
LOG.info(_("Consuming socket"))
while True:
diff --git a/tests/unit/rpc/test_zmq.py b/tests/unit/rpc/test_zmq.py
index 11d3fcd..888dbb0 100644
--- a/tests/unit/rpc/test_zmq.py
+++ b/tests/unit/rpc/test_zmq.py
@@ -112,6 +112,27 @@ class _RpcZmqBaseTestCase(common.BaseRpcTestCase):
finally:
self.topic_nested = tmp_topic
+ def test_service_consume_thread_unexpected_exceptions(self):
+ def my_TopicConsumer_consume(myself, *args, **kwargs):
+ self.consume_calls += 1
+ # see if it can sustain three failures
+ if self.consume_calls < 3:
+ raise Exception('unexpected exception')
+ else:
+ self.orig_ZmqReactor_consume(myself, *args, **kwargs)
+
+ self.consume_calls = 0
+ self.orig_ZmqReactor_consume = impl_zmq.ZmqReactor.consume
+ self.stubs.Set(impl_zmq.ZmqReactor, 'consume',
+ my_TopicConsumer_consume)
+
+ value = 42
+ result = self.rpc.call(FLAGS, self.context, self.topic,
+ {"method": "echo",
+ "args": {"value": value}})
+ self.assertEqual(value, result)
+ self.stubs.UnsetAll()
+
class RpcZmqBaseTopicTestCase(_RpcZmqBaseTestCase):
"""Base topic RPC ZMQ test case.