diff options
author | Jenkins <jenkins@review.openstack.org> | 2012-01-13 18:50:10 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2012-01-13 18:50:10 +0000 |
commit | 485328e48fc7dff1ce2ebfd262c442d8f993688b (patch) | |
tree | bd14c184edfc8c80fac079f2a94c4b91966747b9 | |
parent | 572c85da9843b549f3ac26ee7d4107a3ab0b0809 (diff) | |
parent | 59e8ae1362f33ab30b2dc900dcbde30efc5a57c8 (diff) | |
download | nova-485328e48fc7dff1ce2ebfd262c442d8f993688b.tar.gz nova-485328e48fc7dff1ce2ebfd262c442d8f993688b.tar.xz nova-485328e48fc7dff1ce2ebfd262c442d8f993688b.zip |
Merge "Implement more complete kombu reconnecting"
-rw-r--r-- | nova/rpc/impl_kombu.py | 203 | ||||
-rw-r--r-- | nova/tests/rpc/test_kombu.py | 114 |
2 files changed, 250 insertions, 67 deletions
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index 9ed1e4e6f..f0a8bc61e 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -14,10 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import kombu -import kombu.entity -import kombu.messaging -import kombu.connection import inspect import itertools import sys @@ -29,17 +25,18 @@ import eventlet from eventlet import greenpool from eventlet import pools import greenlet +import kombu +import kombu.entity +import kombu.messaging +import kombu.connection from nova import context from nova import exception from nova import flags from nova.rpc import common as rpc_common -from nova.rpc.common import RemoteError, LOG - -# Needed for tests -eventlet.monkey_patch() FLAGS = flags.FLAGS +LOG = rpc_common.LOG class ConsumerBase(object): @@ -340,58 +337,115 @@ class Connection(object): self.connection = None self.reconnect() - def reconnect(self): - """Handles reconnecting and re-establishing queues""" + def _connect(self): + """Connect to rabbit. Re-establish any queues that may have + been declared before if we are reconnecting. Exceptions should + be handled by the caller. + """ if self.connection: + LOG.info(_("Reconnecting to AMQP server on " + "%(hostname)s:%(port)d") % self.params) try: self.connection.close() - except self.connection.connection_errors: + except self.connection_errors: pass - time.sleep(1) - self.connection = kombu.connection.BrokerConnection(**self.params) + # Setting this in case the next statement fails, though + # it shouldn't be doing any network operations, yet. + self.connection = None + self.connection = kombu.connection.BrokerConnection( + **self.params) + self.connection_errors = self.connection.connection_errors if self.memory_transport: # Kludge to speed up tests. self.connection.transport.polling_interval = 0.0 self.consumer_num = itertools.count(1) - - try: - self.connection.ensure_connection(errback=self.connect_error, - max_retries=self.max_retries, - interval_start=self.interval_start, - interval_step=self.interval_stepping, - interval_max=self.interval_max) - except self.connection.connection_errors, e: - # We should only get here if max_retries is set. We'll go - # ahead and exit in this case. - err_str = str(e) - max_retries = self.max_retries - LOG.error(_('Unable to connect to AMQP server ' - 'after %(max_retries)d tries: %(err_str)s') % locals()) - sys.exit(1) - LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d' % - self.params)) + self.connection.connect() self.channel = self.connection.channel() # work around 'memory' transport bug in 1.1.3 if self.memory_transport: self.channel._new_queue('ae.undeliver') for consumer in self.consumers: consumer.reconnect(self.channel) - if self.consumers: - LOG.debug(_("Re-established AMQP queues")) + LOG.info(_('Connected to AMQP server on ' + '%(hostname)s:%(port)d' % self.params)) + + def reconnect(self): + """Handles reconnecting and re-establishing queues. + Will retry up to self.max_retries number of times. + self.max_retries = 0 means to retry forever. + Sleep between tries, starting at self.rabbit_retry_interval + seconds, backing off self.rabbit_retry_backoff number of seconds + each attempt. + """ + + attempt = 0 + while True: + attempt += 1 + try: + self._connect() + return + except self.connection_errors, e: + pass + except Exception, e: + # NOTE(comstud): Unfortunately it's possible for amqplib + # to return an error not covered by its transport + # connection_errors in the case of a timeout waiting for + # a protocol response. (See paste link in LP888621) + # So, we check all exceptions for 'timeout' in them + # and try to reconnect in this case. + if 'timeout' not in str(e): + raise + + log_info = {} + log_info['err_str'] = str(e) + log_info['max_retries'] = self.max_retries + log_info.update(self.params) + + if self.max_retries and attempt == self.max_retries: + LOG.exception(_('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info) + # NOTE(comstud): Copied from original code. There's + # really no better recourse because if this was a queue we + # need to consume on, we have no way to consume anymore. + sys.exit(1) + + if attempt == 1: + sleep_time = self.rabbit_retry_interval or 1 + elif attempt > 1: + sleep_time += self.rabbit_retry_backoff + if self.interval_max: + sleep_time = min(sleep_time, self.interval_max) + + log_info['sleep_time'] = sleep_time + LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' + ' unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) + time.sleep(sleep_time) + + def ensure(self, error_callback, method, *args, **kwargs): + while True: + try: + return method(*args, **kwargs) + except self.connection_errors, e: + pass + except Exception, e: + # NOTE(comstud): Unfortunately it's possible for amqplib + # to return an error not covered by its transport + # connection_errors in the case of a timeout waiting for + # a protocol response. (See paste link in LP888621) + # So, we check all exceptions for 'timeout' in them + # and try to reconnect in this case. + if 'timeout' not in str(e): + raise + if error_callback: + error_callback(e) + self.reconnect() def get_channel(self): """Convenience call for bin/clear_rabbit_queues""" return self.channel - def connect_error(self, exc, interval): - """Callback when there are connection re-tries by kombu""" - info = self.params.copy() - info['intv'] = interval - info['e'] = exc - LOG.error(_('AMQP server on %(hostname)s:%(port)d is' - ' unreachable: %(e)s. Trying again in %(intv)d' - ' seconds.') % info) - def close(self): """Close/release this connection""" self.cancel_consumer_thread() @@ -412,29 +466,44 @@ class Connection(object): """Create a Consumer using the class that was passed in and add it to our list of consumers """ - consumer = consumer_cls(self.channel, topic, callback, - self.consumer_num.next()) - self.consumers.append(consumer) - return consumer + + def _connect_error(exc): + log_info = {'topic': topic, 'err_str': str(exc)} + LOG.error(_("Failed to declare consumer for topic '%(topic)s': " + "%(err_str)s") % log_info) + + def _declare_consumer(): + consumer = consumer_cls(self.channel, topic, callback, + self.consumer_num.next()) + self.consumers.append(consumer) + return consumer + + return self.ensure(_connect_error, _declare_consumer) def iterconsume(self, limit=None): """Return an iterator that will consume from all queues/consumers""" - while True: - try: + + info = {'do_consume': True} + + def _error_callback(exc): + LOG.exception(_('Failed to consume message from queue: %s') % + str(exc)) + info['do_consume'] = True + + def _consume(): + if info['do_consume']: queues_head = self.consumers[:-1] queues_tail = self.consumers[-1] for queue in queues_head: queue.consume(nowait=True) queues_tail.consume(nowait=False) + info['do_consume'] = False + return self.connection.drain_events() - for iteration in itertools.count(0): - if limit and iteration >= limit: - raise StopIteration - yield self.connection.drain_events() - except self.connection.connection_errors, e: - LOG.exception(_('Failed to consume message from queue: ' - '%s' % str(e))) - self.reconnect() + for iteration in itertools.count(0): + if limit and iteration >= limit: + raise StopIteration + yield self.ensure(_error_callback, _consume) def cancel_consumer_thread(self): """Cancel a consumer thread""" @@ -448,17 +517,17 @@ class Connection(object): def publisher_send(self, cls, topic, msg, **kwargs): """Send to a publisher based on the publisher class""" - while True: - try: - publisher = cls(self.channel, topic, **kwargs) - publisher.send(msg) - return - except self.connection.connection_errors, e: - LOG.exception(_('Failed to publish message %s' % str(e))) - try: - self.reconnect() - except self.connection.connection_errors, e: - pass + + def _error_callback(exc): + log_info = {'topic': topic, 'err_str': str(exc)} + LOG.exception(_("Failed to publish message to topic " + "'%(topic)s': %(err_str)s") % log_info) + + def _publish(): + publisher = cls(self.channel, topic, **kwargs) + publisher.send(msg) + + self.ensure(_error_callback, _publish) def declare_direct_consumer(self, topic, callback): """Create a 'direct' queue. @@ -723,7 +792,7 @@ class MulticallWaiter(object): def __call__(self, data): """The consume() callback will call this. Store the result.""" if data['failure']: - self._result = RemoteError(*data['failure']) + self._result = rpc_common.RemoteError(*data['failure']) elif data.get('ending', False): self._got_ending = True else: diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py index 01b00f33d..dc100cf12 100644 --- a/nova/tests/rpc/test_kombu.py +++ b/nova/tests/rpc/test_kombu.py @@ -29,6 +29,23 @@ from nova.tests.rpc import common LOG = logging.getLogger('nova.tests.rpc') +class MyException(Exception): + pass + + +def _raise_exc_stub(stubs, times, obj, method, exc_msg): + info = {'called': 0} + orig_method = getattr(obj, method) + + def _raise_stub(*args, **kwargs): + info['called'] += 1 + if info['called'] <= times: + raise MyException(exc_msg) + orig_method(*args, **kwargs) + stubs.Set(obj, method, _raise_stub) + return info + + class RpcKombuTestCase(common._BaseRpcTestCase): def setUp(self): self.rpc = impl_kombu @@ -108,3 +125,100 @@ class RpcKombuTestCase(common._BaseRpcTestCase): conn2.consume(limit=1) conn2.close() self.assertEqual(self.received_message, message) + + def test_declare_consumer_errors_will_reconnect(self): + # Test that any exception with 'timeout' in it causes a + # reconnection + info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer, + '__init__', 'foo timeout foo') + + conn = self.rpc.Connection() + result = conn.declare_consumer(self.rpc.DirectConsumer, + 'test_topic', None) + + self.assertEqual(info['called'], 3) + self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) + + # Test that any exception in transport.connection_errors causes + # a reconnection + self.stubs.UnsetAll() + + info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer, + '__init__', 'meow') + + conn = self.rpc.Connection() + conn.connection_errors = (MyException, ) + + result = conn.declare_consumer(self.rpc.DirectConsumer, + 'test_topic', None) + + self.assertEqual(info['called'], 2) + self.assertTrue(isinstance(result, self.rpc.DirectConsumer)) + + def test_publishing_errors_will_reconnect(self): + # Test that any exception with 'timeout' in it causes a + # reconnection when declaring the publisher class and when + # calling send() + info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher, + '__init__', 'foo timeout foo') + + conn = self.rpc.Connection() + conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') + + self.assertEqual(info['called'], 3) + self.stubs.UnsetAll() + + info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher, + 'send', 'foo timeout foo') + + conn = self.rpc.Connection() + conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') + + self.assertEqual(info['called'], 3) + + # Test that any exception in transport.connection_errors causes + # a reconnection when declaring the publisher class and when + # calling send() + self.stubs.UnsetAll() + + info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher, + '__init__', 'meow') + + conn = self.rpc.Connection() + conn.connection_errors = (MyException, ) + + conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') + + self.assertEqual(info['called'], 2) + self.stubs.UnsetAll() + + info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher, + 'send', 'meow') + + conn = self.rpc.Connection() + conn.connection_errors = (MyException, ) + + conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg') + + self.assertEqual(info['called'], 2) + + def test_iterconsume_errors_will_reconnect(self): + conn = self.rpc.Connection() + message = 'reconnect test message' + + self.received_message = None + + def _callback(message): + self.received_message = message + + conn.declare_direct_consumer('a_direct', _callback) + conn.direct_send('a_direct', message) + + info = _raise_exc_stub(self.stubs, 1, conn.connection, + 'drain_events', 'foo timeout foo') + conn.consume(limit=1) + conn.close() + + self.assertEqual(self.received_message, message) + # Only called once, because our stub goes away during reconnection + self.assertEqual(info['called'], 1) |