summaryrefslogtreecommitdiffstats
path: root/nova/openstack
diff options
context:
space:
mode:
authorEugene Kirpichov <ekirpichov@gmail.com>2012-09-25 20:33:28 +0000
committerEugene Kirpichov <ekirpichov@gmail.com>2012-09-26 18:59:20 +0000
commit4a4fcc468506a533d22f5438b4230db70bfffb54 (patch)
tree8c113b9e19670f6c62d4e225c6036f138fb2f9b8 /nova/openstack
parentfe478bd49f031d4f9edaa8def9737af85aef6be6 (diff)
downloadnova-4a4fcc468506a533d22f5438b4230db70bfffb54.tar.gz
nova-4a4fcc468506a533d22f5438b4230db70bfffb54.tar.xz
nova-4a4fcc468506a533d22f5438b4230db70bfffb54.zip
Support for several HA RabbitMQ servers.
Port from openstack-common: https://review.openstack.org/#/c/10305/ Change-Id: Ib44abf115b42c3df42771344f6722ce1db043bbd
Diffstat (limited to 'nova/openstack')
-rw-r--r--nova/openstack/common/network_utils.py68
-rw-r--r--nova/openstack/common/rpc/impl_kombu.py110
2 files changed, 140 insertions, 38 deletions
diff --git a/nova/openstack/common/network_utils.py b/nova/openstack/common/network_utils.py
new file mode 100644
index 000000000..69f673216
--- /dev/null
+++ b/nova/openstack/common/network_utils.py
@@ -0,0 +1,68 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Network-related utilities and helper functions.
+"""
+
+import logging
+
+LOG = logging.getLogger(__name__)
+
+
+def parse_host_port(address, default_port=None):
+ """
+ Interpret a string as a host:port pair.
+ An IPv6 address MUST be escaped if accompanied by a port,
+ because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
+ means both [2001:db8:85a3::8a2e:370:7334] and
+ [2001:db8:85a3::8a2e:370]:7334.
+
+ >>> parse_host_port('server01:80')
+ ('server01', 80)
+ >>> parse_host_port('server01')
+ ('server01', None)
+ >>> parse_host_port('server01', default_port=1234)
+ ('server01', 1234)
+ >>> parse_host_port('[::1]:80')
+ ('::1', 80)
+ >>> parse_host_port('[::1]')
+ ('::1', None)
+ >>> parse_host_port('[::1]', default_port=1234)
+ ('::1', 1234)
+ >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
+ ('2001:db8:85a3::8a2e:370:7334', 1234)
+
+ """
+ if address[0] == '[':
+ # Escaped ipv6
+ _host, _port = address[1:].split(']')
+ host = _host
+ if ':' in _port:
+ port = _port.split(':')[1]
+ else:
+ port = default_port
+ else:
+ if address.count(':') == 1:
+ host, port = address.split(':')
+ else:
+ # 0 means ipv4, >1 means ipv6.
+ # We prohibit unescaped ipv6 addresses with port.
+ host = address
+ port = default_port
+
+ return (host, None if port is None else int(port))
diff --git a/nova/openstack/common/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py
index fff1ed9ce..979ad88bd 100644
--- a/nova/openstack/common/rpc/impl_kombu.py
+++ b/nova/openstack/common/rpc/impl_kombu.py
@@ -33,6 +33,7 @@ from nova.openstack.common import cfg
from nova.openstack.common.gettextutils import _
from nova.openstack.common.rpc import amqp as rpc_amqp
from nova.openstack.common.rpc import common as rpc_common
+from nova.openstack.common import network_utils
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
@@ -50,10 +51,13 @@ kombu_opts = [
'(valid only if SSL enabled)')),
cfg.StrOpt('rabbit_host',
default='localhost',
- help='the RabbitMQ host'),
+ help='The RabbitMQ broker address where a single node is used'),
cfg.IntOpt('rabbit_port',
default=5672,
- help='the RabbitMQ port'),
+ help='The RabbitMQ broker port where a single node is used'),
+ cfg.ListOpt('rabbit_hosts',
+ default=['$rabbit_host:$rabbit_port'],
+ help='RabbitMQ HA cluster host:port pairs'),
cfg.BoolOpt('rabbit_use_ssl',
default=False,
help='connect over SSL for RabbitMQ'),
@@ -80,6 +84,11 @@ kombu_opts = [
cfg.BoolOpt('rabbit_durable_queues',
default=False,
help='use durable queues in RabbitMQ'),
+ cfg.BoolOpt('rabbit_ha_queues',
+ default=False,
+ help='use H/A queues in RabbitMQ (x-ha-policy: all).'
+ 'You need to wipe RabbitMQ database when '
+ 'changing this option.'),
]
@@ -88,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts)
LOG = rpc_common.LOG
+def _get_queue_arguments(conf):
+ """Construct the arguments for declaring a queue.
+
+ If the rabbit_ha_queues option is set, we declare a mirrored queue
+ as described here:
+
+ http://www.rabbitmq.com/ha.html
+
+ Setting x-ha-policy to all means that the queue will be mirrored
+ to all nodes in the cluster.
+ """
+ return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
+
+
class ConsumerBase(object):
"""Consumer base class."""
@@ -207,6 +230,7 @@ class TopicConsumer(ConsumerBase):
"""
# Default options
options = {'durable': conf.rabbit_durable_queues,
+ 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
@@ -331,6 +355,7 @@ class NotifyPublisher(TopicPublisher):
def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+ self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
def reconnect(self, channel):
@@ -343,7 +368,8 @@ class NotifyPublisher(TopicPublisher):
exchange=self.exchange,
durable=self.durable,
name=self.routing_key,
- routing_key=self.routing_key)
+ routing_key=self.routing_key,
+ queue_arguments=self.queue_arguments)
queue.declare()
@@ -368,31 +394,37 @@ class Connection(object):
if server_params is None:
server_params = {}
-
# Keys to translate from server_params to kombu params
server_params_to_kombu_params = {'username': 'userid'}
- params = {}
- for sp_key, value in server_params.iteritems():
- p_key = server_params_to_kombu_params.get(sp_key, sp_key)
- params[p_key] = value
+ ssl_params = self._fetch_ssl_params()
+ params_list = []
+ for adr in self.conf.rabbit_hosts:
+ hostname, port = network_utils.parse_host_port(
+ adr, default_port=self.conf.rabbit_port)
- params.setdefault('hostname', self.conf.rabbit_host)
- params.setdefault('port', self.conf.rabbit_port)
- params.setdefault('userid', self.conf.rabbit_userid)
- params.setdefault('password', self.conf.rabbit_password)
- params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+ params = {}
- self.params = params
+ for sp_key, value in server_params.iteritems():
+ p_key = server_params_to_kombu_params.get(sp_key, sp_key)
+ params[p_key] = value
- if self.conf.fake_rabbit:
- self.params['transport'] = 'memory'
- self.memory_transport = True
- else:
- self.memory_transport = False
+ params.setdefault('hostname', hostname)
+ params.setdefault('port', port)
+ params.setdefault('userid', self.conf.rabbit_userid)
+ params.setdefault('password', self.conf.rabbit_password)
+ params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
+
+ if self.conf.fake_rabbit:
+ params['transport'] = 'memory'
+ if self.conf.rabbit_use_ssl:
+ params['ssl'] = ssl_params
- if self.conf.rabbit_use_ssl:
- self.params['ssl'] = self._fetch_ssl_params()
+ params_list.append(params)
+
+ self.params_list = params_list
+
+ self.memory_transport = self.conf.fake_rabbit
self.connection = None
self.reconnect()
@@ -422,14 +454,14 @@ class Connection(object):
# Return the extended behavior
return ssl_params
- def _connect(self):
+ def _connect(self, params):
"""Connect to rabbit. Re-establish any queues that may have
been declared before if we are reconnecting. Exceptions should
be handled by the caller.
"""
if self.connection:
LOG.info(_("Reconnecting to AMQP server on "
- "%(hostname)s:%(port)d") % self.params)
+ "%(hostname)s:%(port)d") % params)
try:
self.connection.close()
except self.connection_errors:
@@ -437,7 +469,7 @@ class Connection(object):
# Setting this in case the next statement fails, though
# it shouldn't be doing any network operations, yet.
self.connection = None
- self.connection = kombu.connection.BrokerConnection(**self.params)
+ self.connection = kombu.connection.BrokerConnection(**params)
self.connection_errors = self.connection.connection_errors
if self.memory_transport:
# Kludge to speed up tests.
@@ -450,8 +482,8 @@ class Connection(object):
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
- LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
- self.params)
+ LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
+ params)
def reconnect(self):
"""Handles reconnecting and re-establishing queues.
@@ -464,11 +496,12 @@ class Connection(object):
attempt = 0
while True:
+ params = self.params_list[attempt % len(self.params_list)]
attempt += 1
try:
- self._connect()
+ self._connect(params)
return
- except (self.connection_errors, IOError), e:
+ except (IOError, self.connection_errors) as e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
@@ -483,12 +516,12 @@ class Connection(object):
log_info = {}
log_info['err_str'] = str(e)
log_info['max_retries'] = self.max_retries
- log_info.update(self.params)
+ log_info.update(params)
if self.max_retries and attempt == self.max_retries:
- LOG.exception(_('Unable to connect to AMQP server on '
- '%(hostname)s:%(port)d after %(max_retries)d '
- 'tries: %(err_str)s') % log_info)
+ LOG.error(_('Unable to connect to AMQP server on '
+ '%(hostname)s:%(port)d after %(max_retries)d '
+ 'tries: %(err_str)s') % log_info)
# NOTE(comstud): Copied from original code. There's
# really no better recourse because if this was a queue we
# need to consume on, we have no way to consume anymore.
@@ -502,9 +535,9 @@ class Connection(object):
sleep_time = min(sleep_time, self.interval_max)
log_info['sleep_time'] = sleep_time
- LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
- ' unreachable: %(err_str)s. Trying again in '
- '%(sleep_time)d seconds.') % log_info)
+ LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
+ 'unreachable: %(err_str)s. Trying again in '
+ '%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs):
@@ -512,7 +545,8 @@ class Connection(object):
try:
return method(*args, **kwargs)
except (self.connection_errors, socket.timeout, IOError), e:
- pass
+ if error_callback:
+ error_callback(e)
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
@@ -522,8 +556,8 @@ class Connection(object):
# and try to reconnect in this case.
if 'timeout' not in str(e):
raise
- if error_callback:
- error_callback(e)
+ if error_callback:
+ error_callback(e)
self.reconnect()
def get_channel(self):