diff options
author | Chris Behrens <cbehrens@codestud.com> | 2012-01-11 12:35:42 -0800 |
---|---|---|
committer | Chris Behrens <cbehrens@codestud.com> | 2012-01-12 23:35:09 -0800 |
commit | 59e8ae1362f33ab30b2dc900dcbde30efc5a57c8 (patch) | |
tree | ea0f292623debe76296ebb24a7c2031abf5017f8 | |
parent | 6d80851279052a30b98e465106f158cb2afdb6a5 (diff) | |
download | nova-59e8ae1362f33ab30b2dc900dcbde30efc5a57c8.tar.gz nova-59e8ae1362f33ab30b2dc900dcbde30efc5a57c8.tar.xz nova-59e8ae1362f33ab30b2dc900dcbde30efc5a57c8.zip |
Implement more complete kombu reconnecting
Fixes bug 888621
We were missing some wrapping around when consumers are declared and
a case where we had an exception we weren't trapping. In the latter
case, it's not easy to trap it because you'd have to bypass the kombu
interface and import amqplib and try to trap one of its exceptions.
What I've implemented here looks for 'timeout' in any exception, even
though I really don't like it. :)
Fixes HACKING violations while I'm at it.
Change-Id: I0132fbc4377e221b0a366d0340652147ddb33c87
-rw-r--r-- | nova/rpc/impl_kombu.py | 203 | ||||
-rw-r--r-- | nova/tests/rpc/test_kombu.py | 114 |
2 files changed, 250 insertions, 67 deletions
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index 9ed1e4e6f..f0a8bc61e 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -14,10 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import kombu -import kombu.entity -import kombu.messaging -import kombu.connection import inspect import itertools import sys @@ -29,17 +25,18 @@ import eventlet from eventlet import greenpool from eventlet import pools import greenlet +import kombu +import kombu.entity +import kombu.messaging +import kombu.connection from nova import context from nova import exception from nova import flags from nova.rpc import common as rpc_common -from nova.rpc.common import RemoteError, LOG - -# Needed for tests -eventlet.monkey_patch() FLAGS = flags.FLAGS +LOG = rpc_common.LOG class ConsumerBase(object): @@ -340,58 +337,115 @@ class Connection(object): self.connection = None self.reconnect() - def reconnect(self): - """Handles reconnecting and re-establishing queues""" + def _connect(self): + """Connect to rabbit. Re-establish any queues that may have + been declared before if we are reconnecting. Exceptions should + be handled by the caller. + """ if self.connection: + LOG.info(_("Reconnecting to AMQP server on " + "%(hostname)s:%(port)d") % self.params) try: self.connection.close() - except self.connection.connection_errors: + except self.connection_errors: pass - time.sleep(1) - self.connection = kombu.connection.BrokerConnection(**self.params) + # Setting this in case the next statement fails, though + # it shouldn't be doing any network operations, yet. + self.connection = None + self.connection = kombu.connection.BrokerConnection( + **self.params) + self.connection_errors = self.connection.connection_errors if self.memory_transport: # Kludge to speed up tests. self.connection.transport.polling_interval = 0.0 self.consumer_num = itertools.count(1) - - try: - self.connection.ensure_connection(errback=self.connect_error, - max_retries=self.max_retries, - interval_start=self.interval_start, - interval_step=self.interval_stepping, - interval_max=self.interval_max) - except self.connection.connection_errors, e: - # We should only get here if max_retries is set. We'll go - # ahead and exit in this case. - err_str = str(e) - max_retries = self.max_retries - LOG.error(_('Unable to connect to AMQP server ' - 'after %(max_retries)d tries: %(err_str)s') % locals()) - sys.exit(1) - LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d' % - self.params)) + self.connection.connect() self.channel = self.connection.channel() # work around 'memory' transport bug in 1.1.3 if self.memory_transport: self.channel._new_queue('ae.undeliver') for consumer in self.consumers: consumer.reconnect(self.channel) - if self.consumers: - LOG.debug(_("Re-established AMQP queues")) + LOG.info(_('Connected to AMQP server on ' + '%(hostname)s:%(port)d' % self.params)) + + def reconnect(self): + """Handles reconnecting and re-establishing queues. + Will retry up to self.max_retries number of times. + self.max_retries = 0 means to retry forever. + Sleep between tries, starting at self.rabbit_retry_interval + seconds, backing off self.rabbit_retry_backoff number of seconds + each attempt. + """ + + attempt = 0 + while True: + attempt += 1 + try: + self._connect() + return + except self.connection_errors, e: + pass + except Exception, e: + # NOTE(comstud): Unfortunately it's possible for amqplib + # to return an error not covered by its transport + # connection_errors in the case of a timeout waiting for + # a protocol response. (See paste link in LP888621) + # So, we check all exceptions for 'timeout' in them + # and try to reconnect in this case. + if 'timeout' not in str(e): + raise + + log_info = {} + log_info['err_str'] = str(e) + log_info['max_retries'] = self.max_retries + log_info.update(self.params) + + if self.max_retries and attempt == self.max_retries: + LOG.exception(_('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info) + # NOTE(comstud): Copied from original code. There's + # really no better recourse because if this was a queue we + # need to consume on, we have no way to consume anymore. + sys.exit(1) + + if attempt == 1: + sleep_time = self.rabbit_retry_interval or 1 + elif attempt > 1: + sleep_time += self.rabbit_retry_backoff + if self.interval_max: + sleep_time = min(sleep_time, self.interval_max) + + log_info['sleep_time'] = sleep_time + LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' + ' unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) + time.sleep(sleep_time) + + def ensure(self, error_callback, method, *args, **kwargs): + while True: + try: + return method(*args, **kwargs) + except self.connection_errors, e: + pass + except Exception, e: + # NOTE(comstud): Unfortunately it's possible for amqplib + # to return an error not covered by its transport + # connection_errors in the case of a timeout waiting for + # a protocol response. (See paste link in LP888621) + # So, we check all exceptions for 'timeout' in them + # and try to reconnect in this case. + if 'timeout' not in str(e): + raise + if error_callback: + error_callback(e) + self.reconnect() def get_channel(self): """Convenience call for bin/clear_rabbit_queues""" return self.channel - def connect_error(self, exc, interval): - """Callback when there are connection re-tries by kombu""" - info = self.params.copy() - info['intv'] = interval - info['e'] = exc - LOG.error(_('AMQP server on %(hostname)s:%(port)d is' - ' unreachable: %(e)s. Trying again in %(intv)d' - ' seconds.') % info) - def close(self): """Close/release this connection""" self.cancel_consumer_thread() @@ -412,29 +466,44 @@ class Connection(object): """Create a Consumer using the class that was passed in and add it to our list of consumers """ - consumer = consumer_cls(self.channel, topic, callback, - self.consumer_num.next()) - self.consumers.append(consumer) - return consumer + + def _connect_error(exc): + log_info = {'topic': topic, 'err_str': str(exc)} + LOG.error(_("Failed to declare consumer for topic '%(topic)s': " + "%(err_str)s") % log_info) + + def _declare_consumer(): + consumer = consumer_cls(self.channel, topic, callback, + self.consumer_num.next()) + self.consumers.append(consumer) + return consumer + + return self.ensure(_connect_error, _declare_consumer) def iterconsume(self, limit=None): """Return an iterator that will consume from all queues/consumers""" - while True: - try: + + info = {'do_consume': True} + + def _error_callback(exc): + LOG.exception(_('Failed to consume message from queue: %s') % + str(exc)) + info['do_consume'] = True + + def _consume(): + if info['do_consume']: queues_head = self.consumers[:-1] queues_tail = self.consumers[-1] for queue in queues_head: queue.consume(nowait=True) queues_tail.consume(nowait=False) + info['do_consume'] = False + return self.connection.drain_events() - for iteration in itertools.count(0): - if limit and iteration >= limit: - raise StopIteration - yield self.connection.drain_events() - except self.connection.connection_errors, e: - LOG.exception(_('Failed to consume message from queue: ' - '%s' % str(e))) - self.reconnect() + for iteration in itertools.count(0): + if limit and iteration >= limit: + raise StopIteration + yield self.ensure(_error_callback, _consume) def cancel_consumer_thread(self): """Cancel a consumer thread""" @@ -448,17 +517,17 @@ class Connection(object): def publisher_send(self, cls, topic, msg, **kwargs): """Send to a publisher based on the publisher class""" - while True: - try: - publisher = cls(self.channel, topic, **kwargs) - publisher.send(msg) - return - except self.connection.connection_errors, e: - LOG.exception(_('Failed to publish message %s' % str(e))) - try: - self.reconnect() - except self.connection.connection_errors, e: - pass + + def _error_callback(exc): + log_info = {'topic': topic, 'err_str': str(exc)} + LOG.exception(_("Failed to publish message to topic " + "'%(topic)s': %(err_str)s") % log_info) + + def _publish(): + publisher = cls(self.channel, topic, **kwargs) + publisher.send(msg) + + self.ensure(_error_callback, _publish) def declare_direct_consumer(self, topic, callback): """Create a 'direct' queue. @@ -723,7 +792,7 @@ class MulticallWaiter(object): def __call__(self, data): """The consume() callback will call this. Store the result.""" if data['failure']: - self._result = RemoteError(*data['failure']) + self._result = rpc_common.RemoteError(*data['failure']) elif data.get('ending', False): self._got_ending = True else: diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py index 01b00f33d..dc100cf12 100644 --- a/nova/tests/rpc/test_kombu.py +++ b/nova/tests/rpc/test_kombu.py @@ -29,6 +29,23 @@ from nova.tests.rpc import common LOG = logging.getLogger('nova.tests.rpc') +class MyException(Exception): + pass + + +def _raise_exc_stub(stubs, times, obj, method, exc_msg): + info = {'called': 0} + orig_method = getattr(obj, method) + + def _raise_stub(*args, **kwargs): + info['called'] += 1 + if info['called'] <= times: + raise MyException(exc_msg) + orig_method(*args, **kwargs) + stubs.Set(obj, method, _raise_stub) + return info + + class RpcKombuTestCase(common._BaseRpcTestCase): def setUp(self): self.rpc = impl_kombu @@ -108,3 +125,100 @@ class RpcKombuTestCase(common._BaseRpcTestCase): conn2.consume(limit=1) conn2.close() self.assertEqual(self.received_message, message) + + def test_declare_consumer_errors_will_reconnect(self): + # Test that any exception with 'timeout' in it causes a + # reconnection + info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer, + '__init__', 'foo timeout foo') + + conn = self.rpc.Connection() + result = conn.declare_consumer(self.rpc.DirectConsumer, + 'test_topic', None) + + self.assertEqual(info['called'], 3) + self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) + + # Test that any exception in transport.connection_errors causes + # a reconnection + self.stubs.UnsetAll() + + info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer, + '__init__', 'meow') + + conn = self.rpc.Connection() + conn.connection_errors = (MyException, ) + + result = conn.declare_consumer(self.rpc.DirectConsumer, + 'test_topic', None) + + self.assertEqual(info['called'], 2) + self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) + + def test_publishing_errors_will_reconnect(self): + # Test that any exception with 'timeout' in it causes a + # reconnection when declaring the publisher class and when + # calling send() + info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher, + '__init__', 'foo timeout foo') + + conn = self.rpc.Connection() + conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') + + self.assertEqual(info['called'], 3) + self.stubs.UnsetAll() + + info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher, + 'send', 'foo timeout foo') + + conn = self.rpc.Connection() + conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') + + self.assertEqual(info['called'], 3) + + # Test that any exception in transport.connection_errors causes + # a reconnection when declaring the publisher class and when + # calling send() + self.stubs.UnsetAll() + + info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher, + '__init__', 'meow') + + conn = self.rpc.Connection() + conn.connection_errors = (MyException, ) + + conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') + + self.assertEqual(info['called'], 2) + self.stubs.UnsetAll() + + info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher, + 'send', 'meow') + + conn = self.rpc.Connection() + conn.connection_errors = (MyException, ) + + conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') + + self.assertEqual(info['called'], 2) + + def test_iterconsume_errors_will_reconnect(self): + conn = self.rpc.Connection() + message = 'reconnect test message' + + self.received_message = None + + def _callback(message): + self.received_message = message + + conn.declare_direct_consumer('a_direct', _callback) + conn.direct_send('a_direct', message) + + info = _raise_exc_stub(self.stubs, 1, conn.connection, + 'drain_events', 'foo timeout foo') + conn.consume(limit=1) + conn.close() + + self.assertEqual(self.received_message, message) + # Only called once, because our stub goes away during reconnection + self.assertEqual(info['called'], 1) |