summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEugene Kirpichov <ekirpichov@gmail.com>2012-07-24 04:14:51 +0000
committerMark McLoughlin <markmc@redhat.com>2012-09-25 11:52:46 +0100
commit1481849e0d55051f97bed7b1183d07ab724ccaa8 (patch)
tree4cace36d2f1e6a92391dc173109e4afcd0891bfc
parent2265258dad5c6a1e535336867d9cea5c7a2355d3 (diff)
downloadoslo-1481849e0d55051f97bed7b1183d07ab724ccaa8.tar.gz
oslo-1481849e0d55051f97bed7b1183d07ab724ccaa8.tar.xz
oslo-1481849e0d55051f97bed7b1183d07ab724ccaa8.zip
Support for several HA RabbitMQ servers.
Use the rabbit_ha_queues boolean option. It is off by default, and you need to wipe RabbitMQ Mnesia database (/var/lib/rabbitmq/mnesia) when changing it. Change-Id: Id06692a142d1e5316602f6ceed4ccad615ce74c1
-rw-r--r--openstack/common/rpc/impl_kombu.py78
-rw-r--r--tests/unit/rpc/test_kombu.py148
2 files changed, 180 insertions, 46 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 8617bfc..9eaa3cd 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -33,6 +33,7 @@ from openstack.common import cfg
from openstack.common.gettextutils import _
from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common
+from openstack.common import utils
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
@@ -50,10 +51,13 @@ kombu_opts = [
'(valid only if SSL enabled)')),
cfg.StrOpt('rabbit_host',
default='localhost',
- help='the RabbitMQ host'),
+ help='Deprecated: Use rabbit_hosts instead. The RabbitMQ host'),
cfg.IntOpt('rabbit_port',
default=5672,
- help='the RabbitMQ port'),
+ help='Deprecated: Use rabbit_hosts instead. The RabbitMQ port'),
+ cfg.ListOpt('rabbit_hosts',
+ default=['$rabbit_host:$rabbit_port'],
+ help='RabbitMQ HA cluster host:port pairs'),
cfg.BoolOpt('rabbit_use_ssl',
default=False,
help='connect over SSL for RabbitMQ'),
@@ -80,6 +84,11 @@ kombu_opts = [
cfg.BoolOpt('rabbit_durable_queues',
default=False,
help='use durable queues in RabbitMQ'),
+ cfg.BoolOpt('rabbit_ha_queues',
+ default=False,
+ help='use H/A queues in RabbitMQ (x-ha-policy: all).'
+ 'You need to wipe RabbitMQ database when '
+ 'changing this option.'),
]
@@ -206,7 +215,9 @@ class TopicConsumer(ConsumerBase):
Other kombu options may be passed as keyword arguments
"""
# Default options
+ args = {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
options = {'durable': conf.rabbit_durable_queues,
+ 'queue_arguments': args,
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
@@ -336,6 +347,7 @@ class NotifyPublisher(TopicPublisher):
def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+ self.rabbit_ha_queues = conf.rabbit_ha_queues
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
def reconnect(self, channel):
@@ -344,11 +356,13 @@ class NotifyPublisher(TopicPublisher):
# NOTE(jerdfelt): Normally the consumer would create the queue, but
# we do this to ensure that messages don't get dropped if the
# consumer is started after we do
+ args = {'x-ha-policy': 'all'} if self.rabbit_ha_queues else {}
queue = kombu.entity.Queue(channel=channel,
exchange=self.exchange,
durable=self.durable,
name=self.routing_key,
- routing_key=self.routing_key)
+ routing_key=self.routing_key,
+ queue_arguments=args)
queue.declare()
@@ -373,31 +387,36 @@ class Connection(object):
if server_params is None:
server_params = {}
-
# Keys to translate from server_params to kombu params
server_params_to_kombu_params = {'username': 'userid'}
- params = {}
- for sp_key, value in server_params.iteritems():
- p_key = server_params_to_kombu_params.get(sp_key, sp_key)
- params[p_key] = value
+ ssl_params = self._fetch_ssl_params()
+ params_list = []
+ for adr in self.conf.rabbit_hosts:
+ hostname, port = utils.parse_host_port(adr, default_port=5672)
- params.setdefault('hostname', self.conf.rabbit_host)
- params.setdefault('port', self.conf.rabbit_port)
- params.setdefault('userid', self.conf.rabbit_userid)
- params.setdefault('password', self.conf.rabbit_password)
- params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+ params = {}
- self.params = params
+ for sp_key, value in server_params.iteritems():
+ p_key = server_params_to_kombu_params.get(sp_key, sp_key)
+ params[p_key] = value
- if self.conf.fake_rabbit:
- self.params['transport'] = 'memory'
- self.memory_transport = True
- else:
- self.memory_transport = False
+ params.setdefault('hostname', hostname)
+ params.setdefault('port', port)
+ params.setdefault('userid', self.conf.rabbit_userid)
+ params.setdefault('password', self.conf.rabbit_password)
+ params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+
+ if self.conf.fake_rabbit:
+ params['transport'] = 'memory'
+ if self.conf.rabbit_use_ssl:
+ params['ssl'] = ssl_params
+
+ params_list.append(params)
+
+ self.params_list = params_list
- if self.conf.rabbit_use_ssl:
- self.params['ssl'] = self._fetch_ssl_params()
+ self.memory_transport = self.conf.fake_rabbit
self.connection = None
self.reconnect()
@@ -427,14 +446,14 @@ class Connection(object):
# Return the extended behavior
return ssl_params
- def _connect(self):
+ def _connect(self, params):
"""Connect to rabbit. Re-establish any queues that may have
been declared before if we are reconnecting. Exceptions should
be handled by the caller.
"""
if self.connection:
LOG.info(_("Reconnecting to AMQP server on "
- "%(hostname)s:%(port)d") % self.params)
+ "%(hostname)s:%(port)d") % params)
try:
self.connection.close()
except self.connection_errors:
@@ -442,7 +461,7 @@ class Connection(object):
# Setting this in case the next statement fails, though
# it shouldn't be doing any network operations, yet.
self.connection = None
- self.connection = kombu.connection.BrokerConnection(**self.params)
+ self.connection = kombu.connection.BrokerConnection(**params)
self.connection_errors = self.connection.connection_errors
if self.memory_transport:
# Kludge to speed up tests.
@@ -455,8 +474,8 @@ class Connection(object):
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
- LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
- self.params)
+ LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
+ params)
def reconnect(self):
"""Handles reconnecting and re-establishing queues.
@@ -469,11 +488,12 @@ class Connection(object):
attempt = 0
while True:
+ params = self.params_list[attempt % len(self.params_list)]
attempt += 1
try:
- self._connect()
+ self._connect(params)
return
- except (self.connection_errors, IOError), e:
+ except (IOError, self.connection_errors) as e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
@@ -488,7 +508,7 @@ class Connection(object):
log_info = {}
log_info['err_str'] = str(e)
log_info['max_retries'] = self.max_retries
- log_info.update(self.params)
+ log_info.update(params)
if self.max_retries and attempt == self.max_retries:
LOG.exception(_('Unable to connect to AMQP server on '
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index 2cdca15..16ee63e 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -22,6 +22,7 @@ Unit Tests for remote procedure calls using kombu
import eventlet
eventlet.monkey_patch()
+import contextlib
import logging
import unittest
@@ -32,6 +33,7 @@ from openstack.common import exception
from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common
from openstack.common import testutils
+from tests import utils
from tests.unit.rpc import common
try:
@@ -64,7 +66,8 @@ def _raise_exc_stub(stubs, times, obj, method, exc_msg,
return info
-class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
+class KombuStubs:
+ @staticmethod
def setUp(self):
self.stubs = stubout.StubOutForTesting()
if kombu:
@@ -73,14 +76,23 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
self.rpc = impl_kombu
else:
self.rpc = None
- super(RpcKombuTestCase, self).setUp()
+ @staticmethod
def tearDown(self):
self.stubs.UnsetAll()
self.stubs.SmartUnsetAll()
if kombu:
impl_kombu.cleanup()
- super(RpcKombuTestCase, self).tearDown()
+
+
+class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
+ def setUp(self):
+ KombuStubs.setUp(self)
+ common.BaseRpcAMQPTestCase.setUp(self)
+
+ def tearDown(self):
+ common.BaseRpcAMQPTestCase.tearDown(self)
+ KombuStubs.tearDown(self)
@testutils.skip_if(kombu is None, "Test requires kombu")
def test_reusing_connection(self):
@@ -166,13 +178,13 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
def __init__(myself, *args, **kwargs):
super(MyConnection, myself).__init__(*args, **kwargs)
self.assertEqual(
- myself.params,
- {'hostname': FLAGS.rabbit_host,
- 'userid': FLAGS.rabbit_userid,
- 'password': FLAGS.rabbit_password,
- 'port': FLAGS.rabbit_port,
- 'virtual_host': FLAGS.rabbit_virtual_host,
- 'transport': 'memory'})
+ myself.params_list,
+ [{'hostname': FLAGS.rabbit_host,
+ 'userid': FLAGS.rabbit_userid,
+ 'password': FLAGS.rabbit_password,
+ 'port': FLAGS.rabbit_port,
+ 'virtual_host': FLAGS.rabbit_virtual_host,
+ 'transport': 'memory'}])
def topic_send(_context, topic, msg):
pass
@@ -199,13 +211,13 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
def __init__(myself, *args, **kwargs):
super(MyConnection, myself).__init__(*args, **kwargs)
self.assertEqual(
- myself.params,
- {'hostname': server_params['hostname'],
- 'userid': server_params['username'],
- 'password': server_params['password'],
- 'port': server_params['port'],
- 'virtual_host': server_params['virtual_host'],
- 'transport': 'memory'})
+ myself.params_list,
+ [{'hostname': server_params['hostname'],
+ 'userid': server_params['username'],
+ 'password': server_params['password'],
+ 'port': server_params['port'],
+ 'virtual_host': server_params['virtual_host'],
+ 'transport': 'memory'}])
def topic_send(_context, topic, msg):
pass
@@ -411,3 +423,105 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
self.assertTrue(value in unicode(exc))
#Traceback should be included in exception message
self.assertTrue('exception.ApiError' in unicode(exc))
+
+
+class RpcKombuHATestCase(utils.BaseTestCase):
+ def setUp(self):
+ utils.BaseTestCase.setUp(self)
+ KombuStubs.setUp(self)
+
+ def tearDown(self):
+ FLAGS.reset()
+ KombuStubs.tearDown(self)
+ utils.BaseTestCase.tearDown(self)
+
+ @testutils.skip_if(kombu is None, "Test requires kombu")
+ def test_roundrobin_reconnect(self):
+ """Test that rabbits are tried in roundrobin at connection failures."""
+ self.config(rabbit_hosts=[
+ 'host1:1234', 'host2:5678', '[::1]:2345',
+ '[2001:0db8:85a3:0042:0000:8a2e:0370:7334]'],
+ rabbit_retry_interval=0.1,
+ rabbit_retry_backoff=0.1)
+
+ info = {
+ 'attempt': 0,
+ 'params_list': [
+ {'hostname': 'host1',
+ 'userid': FLAGS.rabbit_userid,
+ 'password': FLAGS.rabbit_password,
+ 'port': 1234,
+ 'virtual_host': FLAGS.rabbit_virtual_host,
+ 'transport': 'memory'},
+ {'hostname': 'host2',
+ 'userid': FLAGS.rabbit_userid,
+ 'password': FLAGS.rabbit_password,
+ 'port': 5678,
+ 'virtual_host': FLAGS.rabbit_virtual_host,
+ 'transport': 'memory'},
+ {'hostname': '::1',
+ 'userid': FLAGS.rabbit_userid,
+ 'password': FLAGS.rabbit_password,
+ 'port': 2345,
+ 'virtual_host': FLAGS.rabbit_virtual_host,
+ 'transport': 'memory'},
+ {'hostname': '2001:0db8:85a3:0042:0000:8a2e:0370:7334',
+ 'userid': FLAGS.rabbit_userid,
+ 'password': FLAGS.rabbit_password,
+ 'port': 5672,
+ 'virtual_host': FLAGS.rabbit_virtual_host,
+ 'transport': 'memory'},
+ ]
+ }
+
+ import kombu.connection
+
+ class MyConnection(kombu.connection.BrokerConnection):
+ def __init__(myself, *args, **params):
+ super(MyConnection, myself).__init__(*args, **params)
+ self.assertEqual(params,
+ info['params_list'][info['attempt'] %
+ len(info['params_list'])])
+ info['attempt'] = info['attempt'] + 1
+
+ def connect(myself):
+ if info['attempt'] < 5:
+ # the word timeout is important (see impl_kombu.py:486)
+ raise Exception('connection timeout')
+ super(kombu.connection.BrokerConnection, myself).connect()
+
+ self.stubs.Set(kombu.connection, 'BrokerConnection', MyConnection)
+
+ conn = self.rpc.Connection(FLAGS)
+
+ self.assertEqual(info['attempt'], 5)
+
+ @testutils.skip_if(kombu is None, "Test requires kombu")
+ def test_queue_not_declared_ha_if_ha_off(self):
+ self.config(rabbit_ha_queues=False)
+
+ import kombu.entity
+
+ def my_declare(myself):
+ self.assertEqual(None,
+ (myself.queue_arguments or {}).get('x-ha-policy'))
+
+ self.stubs.Set(kombu.entity.Queue, 'declare', my_declare)
+
+ with contextlib.closing(self.rpc.create_connection(FLAGS)) as conn:
+ conn.declare_topic_consumer('a_topic', lambda *args: None)
+
+ @testutils.skip_if(kombu is None, "Test requires kombu")
+ def test_queue_declared_ha_if_ha_on(self):
+ self.config(rabbit_ha_queues=True)
+
+ import kombu.entity
+
+ def my_declare(myself):
+ self.assertEqual('all',
+ (myself.queue_arguments or {}).get('x-ha-policy'))
+
+ self.stubs.Set(kombu.entity.Queue, 'declare', my_declare)
+
+ with contextlib.closing(self.rpc.create_connection(FLAGS)) as conn:
+ conn.declare_topic_consumer('a_topic', lambda *args: None)