summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Jenkins <jenkins@review.openstack.org> 2012-01-13 18:50:10 +0000
committer: Gerrit Code Review <review@openstack.org> 2012-01-13 18:50:10 +0000
commit: 485328e48fc7dff1ce2ebfd262c442d8f993688b (patch)
tree: bd14c184edfc8c80fac079f2a94c4b91966747b9
parent: 572c85da9843b549f3ac26ee7d4107a3ab0b0809 (diff)
parent: 59e8ae1362f33ab30b2dc900dcbde30efc5a57c8 (diff)
download: nova-485328e48fc7dff1ce2ebfd262c442d8f993688b.tar.gz
nova-485328e48fc7dff1ce2ebfd262c442d8f993688b.tar.xz
nova-485328e48fc7dff1ce2ebfd262c442d8f993688b.zip
Merge "Implement more complete kombu reconnecting"
-rw-r--r-- nova/rpc/impl_kombu.py 203
-rw-r--r-- nova/tests/rpc/test_kombu.py 114
2 files changed, 250 insertions, 67 deletions
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py
index 9ed1e4e6f..f0a8bc61e 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/rpc/impl_kombu.py
@@ -14,10 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import kombu
-import kombu.entity
-import kombu.messaging
-import kombu.connection
import inspect
import itertools
import sys
@@ -29,17 +25,18 @@ import eventlet
from eventlet import greenpool
from eventlet import pools
import greenlet
+import kombu
+import kombu.entity
+import kombu.messaging
+import kombu.connection
from nova import context
from nova import exception
from nova import flags
from nova.rpc import common as rpc_common
-from nova.rpc.common import RemoteError, LOG
-
-# Needed for tests
-eventlet.monkey_patch()
FLAGS = flags.FLAGS
+LOG = rpc_common.LOG
class ConsumerBase(object):
@@ -340,58 +337,115 @@ class Connection(object):
self.connection = None
self.reconnect()
- def reconnect(self):
- """Handles reconnecting and re-establishing queues"""
+ def _connect(self):
+ """Connect to rabbit. Re-establish any queues that may have
+ been declared before if we are reconnecting. Exceptions should
+ be handled by the caller.
+ """
if self.connection:
+ LOG.info(_("Reconnecting to AMQP server on "
+ "%(hostname)s:%(port)d") % self.params)
try:
self.connection.close()
- except self.connection.connection_errors:
+ except self.connection_errors:
pass
- time.sleep(1)
- self.connection = kombu.connection.BrokerConnection(**self.params)
+ # Setting this in case the next statement fails, though
+ # it shouldn't be doing any network operations, yet.
+ self.connection = None
+ self.connection = kombu.connection.BrokerConnection(
+ **self.params)
+ self.connection_errors = self.connection.connection_errors
if self.memory_transport:
# Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0
self.consumer_num = itertools.count(1)
-
- try:
- self.connection.ensure_connection(errback=self.connect_error,
- max_retries=self.max_retries,
- interval_start=self.interval_start,
- interval_step=self.interval_stepping,
- interval_max=self.interval_max)
- except self.connection.connection_errors, e:
- # We should only get here if max_retries is set. We'll go
- # ahead and exit in this case.
- err_str = str(e)
- max_retries = self.max_retries
- LOG.error(_('Unable to connect to AMQP server '
- 'after %(max_retries)d tries: %(err_str)s') % locals())
- sys.exit(1)
- LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d' %
- self.params))
+ self.connection.connect()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
if self.memory_transport:
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
- if self.consumers:
- LOG.debug(_("Re-established AMQP queues"))
+ LOG.info(_('Connected to AMQP server on '
+ '%(hostname)s:%(port)d' % self.params))
+
+ def reconnect(self):
+ """Handles reconnecting and re-establishing queues.
+ Will retry up to self.max_retries number of times.
+ self.max_retries = 0 means to retry forever.
+ Sleep between tries, starting at self.rabbit_retry_interval
+ seconds, backing off self.rabbit_retry_backoff number of seconds
+ each attempt.
+ """
+
+ attempt = 0
+ while True:
+ attempt += 1
+ try:
+ self._connect()
+ return
+ except self.connection_errors, e:
+ pass
+ except Exception, e:
+ # NOTE(comstud): Unfortunately it's possible for amqplib
+ # to return an error not covered by its transport
+ # connection_errors in the case of a timeout waiting for
+ # a protocol response. (See paste link in LP888621)
+ # So, we check all exceptions for 'timeout' in them
+ # and try to reconnect in this case.
+ if 'timeout' not in str(e):
+ raise
+
+ log_info = {}
+ log_info['err_str'] = str(e)
+ log_info['max_retries'] = self.max_retries
+ log_info.update(self.params)
+
+ if self.max_retries and attempt == self.max_retries:
+ LOG.exception(_('Unable to connect to AMQP server on '
+ '%(hostname)s:%(port)d after %(max_retries)d '
+ 'tries: %(err_str)s') % log_info)
+ # NOTE(comstud): Copied from original code. There's
+ # really no better recourse because if this was a queue we
+ # need to consume on, we have no way to consume anymore.
+ sys.exit(1)
+
+ if attempt == 1:
+ sleep_time = self.rabbit_retry_interval or 1
+ elif attempt > 1:
+ sleep_time += self.rabbit_retry_backoff
+ if self.interval_max:
+ sleep_time = min(sleep_time, self.interval_max)
+
+ log_info['sleep_time'] = sleep_time
+ LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
+ ' unreachable: %(err_str)s. Trying again in '
+ '%(sleep_time)d seconds.') % log_info)
+ time.sleep(sleep_time)
+
+ def ensure(self, error_callback, method, *args, **kwargs):
+ while True:
+ try:
+ return method(*args, **kwargs)
+ except self.connection_errors, e:
+ pass
+ except Exception, e:
+ # NOTE(comstud): Unfortunately it's possible for amqplib
+ # to return an error not covered by its transport
+ # connection_errors in the case of a timeout waiting for
+ # a protocol response. (See paste link in LP888621)
+ # So, we check all exceptions for 'timeout' in them
+ # and try to reconnect in this case.
+ if 'timeout' not in str(e):
+ raise
+ if error_callback:
+ error_callback(e)
+ self.reconnect()
def get_channel(self):
"""Convenience call for bin/clear_rabbit_queues"""
return self.channel
- def connect_error(self, exc, interval):
- """Callback when there are connection re-tries by kombu"""
- info = self.params.copy()
- info['intv'] = interval
- info['e'] = exc
- LOG.error(_('AMQP server on %(hostname)s:%(port)d is'
- ' unreachable: %(e)s. Trying again in %(intv)d'
- ' seconds.') % info)
-
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
@@ -412,29 +466,44 @@ class Connection(object):
"""Create a Consumer using the class that was passed in and
add it to our list of consumers
"""
- consumer = consumer_cls(self.channel, topic, callback,
- self.consumer_num.next())
- self.consumers.append(consumer)
- return consumer
+
+ def _connect_error(exc):
+ log_info = {'topic': topic, 'err_str': str(exc)}
+ LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+ "%(err_str)s") % log_info)
+
+ def _declare_consumer():
+ consumer = consumer_cls(self.channel, topic, callback,
+ self.consumer_num.next())
+ self.consumers.append(consumer)
+ return consumer
+
+ return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None):
"""Return an iterator that will consume from all queues/consumers"""
- while True:
- try:
+
+ info = {'do_consume': True}
+
+ def _error_callback(exc):
+ LOG.exception(_('Failed to consume message from queue: %s') %
+ str(exc))
+ info['do_consume'] = True
+
+ def _consume():
+ if info['do_consume']:
queues_head = self.consumers[:-1]
queues_tail = self.consumers[-1]
for queue in queues_head:
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
+ info['do_consume'] = False
+ return self.connection.drain_events()
- for iteration in itertools.count(0):
- if limit and iteration >= limit:
- raise StopIteration
- yield self.connection.drain_events()
- except self.connection.connection_errors, e:
- LOG.exception(_('Failed to consume message from queue: '
- '%s' % str(e)))
- self.reconnect()
+ for iteration in itertools.count(0):
+ if limit and iteration >= limit:
+ raise StopIteration
+ yield self.ensure(_error_callback, _consume)
def cancel_consumer_thread(self):
"""Cancel a consumer thread"""
@@ -448,17 +517,17 @@ class Connection(object):
def publisher_send(self, cls, topic, msg, **kwargs):
"""Send to a publisher based on the publisher class"""
- while True:
- try:
- publisher = cls(self.channel, topic, **kwargs)
- publisher.send(msg)
- return
- except self.connection.connection_errors, e:
- LOG.exception(_('Failed to publish message %s' % str(e)))
- try:
- self.reconnect()
- except self.connection.connection_errors, e:
- pass
+
+ def _error_callback(exc):
+ log_info = {'topic': topic, 'err_str': str(exc)}
+ LOG.exception(_("Failed to publish message to topic "
+ "'%(topic)s': %(err_str)s") % log_info)
+
+ def _publish():
+ publisher = cls(self.channel, topic, **kwargs)
+ publisher.send(msg)
+
+ self.ensure(_error_callback, _publish)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
@@ -723,7 +792,7 @@ class MulticallWaiter(object):
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
if data['failure']:
- self._result = RemoteError(*data['failure'])
+ self._result = rpc_common.RemoteError(*data['failure'])
elif data.get('ending', False):
self._got_ending = True
else:
diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py
index 01b00f33d..dc100cf12 100644
--- a/nova/tests/rpc/test_kombu.py
+++ b/nova/tests/rpc/test_kombu.py
@@ -29,6 +29,23 @@ from nova.tests.rpc import common
LOG = logging.getLogger('nova.tests.rpc')
+class MyException(Exception):
+ pass
+
+
+def _raise_exc_stub(stubs, times, obj, method, exc_msg):
+ info = {'called': 0}
+ orig_method = getattr(obj, method)
+
+ def _raise_stub(*args, **kwargs):
+ info['called'] += 1
+ if info['called'] <= times:
+ raise MyException(exc_msg)
+ orig_method(*args, **kwargs)
+ stubs.Set(obj, method, _raise_stub)
+ return info
+
+
class RpcKombuTestCase(common._BaseRpcTestCase):
def setUp(self):
self.rpc = impl_kombu
@@ -108,3 +125,100 @@ class RpcKombuTestCase(common._BaseRpcTestCase):
conn2.consume(limit=1)
conn2.close()
self.assertEqual(self.received_message, message)
+
+ def test_declare_consumer_errors_will_reconnect(self):
+ # Test that any exception with 'timeout' in it causes a
+ # reconnection
+ info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
+ '__init__', 'foo timeout foo')
+
+ conn = self.rpc.Connection()
+ result = conn.declare_consumer(self.rpc.DirectConsumer,
+ 'test_topic', None)
+
+ self.assertEqual(info['called'], 3)
+ self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
+
+ # Test that any exception in transport.connection_errors causes
+ # a reconnection
+ self.stubs.UnsetAll()
+
+ info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer,
+ '__init__', 'meow')
+
+ conn = self.rpc.Connection()
+ conn.connection_errors = (MyException, )
+
+ result = conn.declare_consumer(self.rpc.DirectConsumer,
+ 'test_topic', None)
+
+ self.assertEqual(info['called'], 2)
+ self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
+
+ def test_publishing_errors_will_reconnect(self):
+ # Test that any exception with 'timeout' in it causes a
+ # reconnection when declaring the publisher class and when
+ # calling send()
+ info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
+ '__init__', 'foo timeout foo')
+
+ conn = self.rpc.Connection()
+ conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
+
+ self.assertEqual(info['called'], 3)
+ self.stubs.UnsetAll()
+
+ info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
+ 'send', 'foo timeout foo')
+
+ conn = self.rpc.Connection()
+ conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
+
+ self.assertEqual(info['called'], 3)
+
+ # Test that any exception in transport.connection_errors causes
+ # a reconnection when declaring the publisher class and when
+ # calling send()
+ self.stubs.UnsetAll()
+
+ info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
+ '__init__', 'meow')
+
+ conn = self.rpc.Connection()
+ conn.connection_errors = (MyException, )
+
+ conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
+
+ self.assertEqual(info['called'], 2)
+ self.stubs.UnsetAll()
+
+ info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
+ 'send', 'meow')
+
+ conn = self.rpc.Connection()
+ conn.connection_errors = (MyException, )
+
+ conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
+
+ self.assertEqual(info['called'], 2)
+
+ def test_iterconsume_errors_will_reconnect(self):
+ conn = self.rpc.Connection()
+ message = 'reconnect test message'
+
+ self.received_message = None
+
+ def _callback(message):
+ self.received_message = message
+
+ conn.declare_direct_consumer('a_direct', _callback)
+ conn.direct_send('a_direct', message)
+
+ info = _raise_exc_stub(self.stubs, 1, conn.connection,
+ 'drain_events', 'foo timeout foo')
+ conn.consume(limit=1)
+ conn.close()
+
+ self.assertEqual(self.received_message, message)
+ # Only called once, because our stub goes away during reconnection
+ self.assertEqual(info['called'], 1)