summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMehdi Abaakouk <mehdi.abaakouk@enovance.com>2013-06-21 15:44:18 +0200
committerMehdi Abaakouk <mehdi.abaakouk@enovance.com>2013-07-02 09:48:25 +0200
commitdea334a3f55ee7876fdb1b4717e7be8f21499b81 (patch)
tree5320ced3aee67cb55e6dd1eaf6f2a80ea3296c82
parent262d6a5f53e1ecfdcf7f605e011b9985fcd62822 (diff)
downloadoslo-dea334a3f55ee7876fdb1b4717e7be8f21499b81.tar.gz
oslo-dea334a3f55ee7876fdb1b4717e7be8f21499b81.tar.xz
oslo-dea334a3f55ee7876fdb1b4717e7be8f21499b81.zip
Replace sys.exit by a RPCException
This change replace sys.exit by a RPCException like the zmq implementation does, to allow library users to handle the case of the rpc failure in their applications. Change-Id: Iafda7bfa20840fa5488dece1d5ad49e2b14b73b5
-rw-r--r--openstack/common/rpc/impl_kombu.py13
-rw-r--r--tests/unit/rpc/test_kombu.py36
2 files changed, 34 insertions, 15 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 8fb3504..36d2fc5 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -18,7 +18,6 @@ import functools
import itertools
import socket
import ssl
-import sys
import time
import uuid
@@ -561,13 +560,11 @@ class Connection(object):
log_info.update(params)
if self.max_retries and attempt == self.max_retries:
- LOG.error(_('Unable to connect to AMQP server on '
- '%(hostname)s:%(port)d after %(max_retries)d '
- 'tries: %(err_str)s') % log_info)
- # NOTE(comstud): Copied from original code. There's
- # really no better recourse because if this was a queue we
- # need to consume on, we have no way to consume anymore.
- sys.exit(1)
+ msg = _('Unable to connect to AMQP server on '
+ '%(hostname)s:%(port)d after %(max_retries)d '
+ 'tries: %(err_str)s') % log_info
+ LOG.error(msg)
+ raise rpc_common.RPCException(msg)
if attempt == 1:
sleep_time = self.interval_start or 1
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index c8c3f32..cbe948d 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -41,6 +41,8 @@ from tests import utils
try:
import kombu
+ import kombu.connection
+ import kombu.entity
from openstack.common.rpc import impl_kombu
except ImportError:
kombu = None
@@ -712,6 +714,32 @@ class RpcKombuTestCase(amqp.BaseRpcAMQPTestCase):
"args": {"value": value}})
self.assertEqual(value, result)
+ def test_reconnect_max_retries(self):
+ self.config(rabbit_hosts=[
+ 'host1:1234', 'host2:5678', '[::1]:2345',
+ '[2001:0db8:85a3:0042:0000:8a2e:0370:7334]'],
+ rabbit_max_retries=2,
+ rabbit_retry_interval=0.1,
+ rabbit_retry_backoff=0.1)
+
+ info = {'attempt': 0}
+
+ class MyConnection(kombu.connection.BrokerConnection):
+ def __init__(self, *args, **params):
+ super(MyConnection, self).__init__(*args, **params)
+ info['attempt'] += 1
+
+ def connect(self):
+ if info['attempt'] < 3:
+ # the word timeout is important (see impl_kombu.py:486)
+ raise Exception('connection timeout')
+ super(kombu.connection.BrokerConnection, self).connect()
+
+ self.stubs.Set(kombu.connection, 'BrokerConnection', MyConnection)
+
+ self.assertRaises(rpc_common.RPCException, self.rpc.Connection, FLAGS)
+ self.assertEqual(info['attempt'], 2)
+
class RpcKombuHATestCase(utils.BaseTestCase):
def setUp(self):
@@ -758,15 +786,13 @@ class RpcKombuHATestCase(utils.BaseTestCase):
]
}
- import kombu.connection
-
class MyConnection(kombu.connection.BrokerConnection):
def __init__(myself, *args, **params):
super(MyConnection, myself).__init__(*args, **params)
self.assertEqual(params,
info['params_list'][info['attempt'] %
len(info['params_list'])])
- info['attempt'] = info['attempt'] + 1
+ info['attempt'] += 1
def connect(myself):
if info['attempt'] < 5:
@@ -783,8 +809,6 @@ class RpcKombuHATestCase(utils.BaseTestCase):
def test_queue_not_declared_ha_if_ha_off(self):
self.config(rabbit_ha_queues=False)
- import kombu.entity
-
def my_declare(myself):
self.assertEqual(None,
(myself.queue_arguments or {}).get('x-ha-policy'))
@@ -797,8 +821,6 @@ class RpcKombuHATestCase(utils.BaseTestCase):
def test_queue_declared_ha_if_ha_on(self):
self.config(rabbit_ha_queues=True)
- import kombu.entity
-
def my_declare(myself):
self.assertEqual('all',
(myself.queue_arguments or {}).get('x-ha-policy'))