From dea334a3f55ee7876fdb1b4717e7be8f21499b81 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Fri, 21 Jun 2013 15:44:18 +0200 Subject: Replace sys.exit by a RPCException This change replace sys.exit by a RPCException like the zmq implementation does, to allow library users to handle the case of the rpc failure in their applications. Change-Id: Iafda7bfa20840fa5488dece1d5ad49e2b14b73b5 --- openstack/common/rpc/impl_kombu.py | 13 +++++-------- tests/unit/rpc/test_kombu.py | 36 +++++++++++++++++++++++++++++------- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 8fb3504..36d2fc5 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -18,7 +18,6 @@ import functools import itertools import socket import ssl -import sys import time import uuid @@ -561,13 +560,11 @@ class Connection(object): log_info.update(params) if self.max_retries and attempt == self.max_retries: - LOG.error(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) - # NOTE(comstud): Copied from original code. There's - # really no better recourse because if this was a queue we - # need to consume on, we have no way to consume anymore. - sys.exit(1) + msg = _('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info + LOG.error(msg) + raise rpc_common.RPCException(msg) if attempt == 1: sleep_time = self.interval_start or 1 diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py index c8c3f32..cbe948d 100644 --- a/tests/unit/rpc/test_kombu.py +++ b/tests/unit/rpc/test_kombu.py @@ -41,6 +41,8 @@ from tests import utils try: import kombu + import kombu.connection + import kombu.entity from openstack.common.rpc import impl_kombu except ImportError: kombu = None @@ -712,6 +714,32 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase): "args": {"value": value}}) self.assertEqual(value, result) + def test_reconnect_max_retries(self): + self.config(rabbit_hosts=[ + 'host1:1234', 'host2:5678', '[::1]:2345', + '[2001:0db8:85a3:0042:0000:8a2e:0370:7334]'], + rabbit_max_retries=2, + rabbit_retry_interval=0.1, + rabbit_retry_backoff=0.1) + + info = {'attempt': 0} + + class MyConnection(kombu.connection.BrokerConnection): + def __init__(self, *args, **params): + super(MyConnection, self).__init__(*args, **params) + info['attempt'] += 1 + + def connect(self): + if info['attempt'] < 3: + # the word timeout is important (see impl_kombu.py:486) + raise Exception('connection timeout') + super(kombu.connection.BrokerConnection, self).connect() + + self.stubs.Set(kombu.connection, 'BrokerConnection', MyConnection) + + self.assertRaises(rpc_common.RPCException, self.rpc.Connection, FLAGS) + self.assertEqual(info['attempt'], 2) + class RpcKombuHATestCase(utils.BaseTestCase): def setUp(self): @@ -758,15 +786,13 @@ class RpcKombuHATestCase(utils.BaseTestCase): ] } - import kombu.connection - class MyConnection(kombu.connection.BrokerConnection): def __init__(myself, *args, **params): super(MyConnection, myself).__init__(*args, **params) self.assertEqual(params, info['params_list'][info['attempt'] % len(info['params_list'])]) - info['attempt'] = info['attempt'] + 1 + info['attempt'] += 1 def connect(myself): if info['attempt'] < 5: @@ -783,8 +809,6 @@ class RpcKombuHATestCase(utils.BaseTestCase): def test_queue_not_declared_ha_if_ha_off(self): self.config(rabbit_ha_queues=False) - import kombu.entity - def my_declare(myself): self.assertEqual(None, (myself.queue_arguments or {}).get('x-ha-policy')) @@ -797,8 +821,6 @@ class RpcKombuHATestCase(utils.BaseTestCase): def test_queue_declared_ha_if_ha_on(self): self.config(rabbit_ha_queues=True) - import kombu.entity - def my_declare(myself): self.assertEqual('all', (myself.queue_arguments or {}).get('x-ha-policy')) -- cgit