summaryrefslogtreecommitdiffstats
path: root/nova/openstack
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-07-02 18:56:37 +0000
committerGerrit Code Review <review@openstack.org>2012-07-02 18:56:37 +0000
commit2b1eaa0afaff8b905801092c73395150127ef55a (patch)
treed202180f74dc7a12f31fd4479887bcdf720d404a /nova/openstack
parent75364f36881eba043e3aad96fd06a5c660005aea (diff)
parent0cd5c7faa2dfe2fd2538fd5bdcfad063c2691544 (diff)
downloadnova-2b1eaa0afaff8b905801092c73395150127ef55a.tar.gz
nova-2b1eaa0afaff8b905801092c73395150127ef55a.tar.xz
nova-2b1eaa0afaff8b905801092c73395150127ef55a.zip
Merge changes I5b0d4b8e,I61f6734c,Ia2b6e1a7,Ifc1533fe,I306e0f00,I3da35898
* changes: Sync rpc from openstack-common. Sync jsonutils from openstack-common. Sync iniparser from openstack-common. Sync latest importutils from openstack-common. Sync excutils from openstack-common. Sync cfg from openstack-common.
Diffstat (limited to 'nova/openstack')
-rw-r--r--nova/openstack/common/cfg.py110
-rw-r--r--nova/openstack/common/excutils.py2
-rw-r--r--nova/openstack/common/importutils.py15
-rw-r--r--nova/openstack/common/iniparser.py5
-rw-r--r--nova/openstack/common/jsonutils.py11
-rw-r--r--nova/openstack/common/rpc/__init__.py2
-rw-r--r--nova/openstack/common/rpc/amqp.py21
-rw-r--r--nova/openstack/common/rpc/common.py7
-rw-r--r--nova/openstack/common/rpc/dispatcher.py10
-rw-r--r--nova/openstack/common/rpc/impl_kombu.py207
-rw-r--r--nova/openstack/common/rpc/impl_qpid.py112
-rw-r--r--nova/openstack/common/rpc/impl_zmq.py42
-rw-r--r--nova/openstack/common/rpc/matchmaker.py4
-rw-r--r--nova/openstack/common/rpc/proxy.py2
14 files changed, 312 insertions, 238 deletions
diff --git a/nova/openstack/common/cfg.py b/nova/openstack/common/cfg.py
index dd367aeb6..6d71b0304 100644
--- a/nova/openstack/common/cfg.py
+++ b/nova/openstack/common/cfg.py
@@ -391,7 +391,7 @@ def _get_config_dirs(project=None):
fix_path('~'),
os.path.join('/etc', project) if project else None,
'/etc'
- ]
+ ]
return filter(bool, cfg_dirs)
@@ -489,12 +489,13 @@ class Opt(object):
metavar:
the name shown as the argument to a CLI option in --help output
help:
- a string explaining how the options value is used
+ an string explaining how the options value is used
"""
multi = False
def __init__(self, name, dest=None, short=None, default=None,
- metavar=None, help=None, secret=False, required=False):
+ metavar=None, help=None, secret=False, required=False,
+ deprecated_name=None):
"""Construct an Opt object.
The only required parameter is the option's name. However, it is
@@ -508,6 +509,7 @@ class Opt(object):
:param help: an explanation of how the option is used
:param secret: true iff the value should be obfuscated in log output
:param required: true iff a value must be supplied for this option
+ :param deprecated_name: deprecated name option. Acts like an alias
"""
self.name = name
if dest is None:
@@ -520,6 +522,10 @@ class Opt(object):
self.help = help
self.secret = secret
self.required = required
+ if deprecated_name is not None:
+ self.deprecated_name = deprecated_name.replace('-', '_')
+ else:
+ self.deprecated_name = None
def _get_from_config_parser(self, cparser, section):
"""Retrieves the option value from a MultiConfigParser object.
@@ -531,7 +537,13 @@ class Opt(object):
:param cparser: a ConfigParser object
:param section: a section name
"""
- return cparser.get(section, self.dest)
+ return self._cparser_get_with_deprecated(cparser, section)
+
+ def _cparser_get_with_deprecated(self, cparser, section):
+ """If cannot find option as dest try deprecated_name alias."""
+ if self.deprecated_name is not None:
+ return cparser.get(section, [self.dest, self.deprecated_name])
+ return cparser.get(section, [self.dest])
def _add_to_cli(self, parser, group=None):
"""Makes the option available in the command line interface.
@@ -546,9 +558,11 @@ class Opt(object):
container = self._get_optparse_container(parser, group)
kwargs = self._get_optparse_kwargs(group)
prefix = self._get_optparse_prefix('', group)
- self._add_to_optparse(container, self.name, self.short, kwargs, prefix)
+ self._add_to_optparse(container, self.name, self.short, kwargs, prefix,
+ self.deprecated_name)
- def _add_to_optparse(self, container, name, short, kwargs, prefix=''):
+ def _add_to_optparse(self, container, name, short, kwargs, prefix='',
+ deprecated_name=None):
"""Add an option to an optparse parser or group.
:param container: an optparse.OptionContainer object
@@ -561,6 +575,8 @@ class Opt(object):
args = ['--' + prefix + name]
if short:
args += ['-' + short]
+ if deprecated_name:
+ args += ['--' + prefix + deprecated_name]
for a in args:
if container.has_option(a):
raise DuplicateOptError(a)
@@ -591,11 +607,9 @@ class Opt(object):
dest = self.dest
if group is not None:
dest = group.name + '_' + dest
- kwargs.update({
- 'dest': dest,
- 'metavar': self.metavar,
- 'help': self.help,
- })
+ kwargs.update({'dest': dest,
+ 'metavar': self.metavar,
+ 'help': self.help, })
return kwargs
def _get_optparse_prefix(self, prefix, group):
@@ -645,7 +659,8 @@ class BoolOpt(Opt):
return value
- return [convert_bool(v) for v in cparser.get(section, self.dest)]
+ return [convert_bool(v) for v in
+ self._cparser_get_with_deprecated(cparser, section)]
def _add_to_cli(self, parser, group=None):
"""Extends the base class method to add the --nooptname option."""
@@ -658,7 +673,8 @@ class BoolOpt(Opt):
kwargs = self._get_optparse_kwargs(group, action='store_false')
prefix = self._get_optparse_prefix('no', group)
kwargs["help"] = "The inverse of --" + self.name
- self._add_to_optparse(container, self.name, None, kwargs, prefix)
+ self._add_to_optparse(container, self.name, None, kwargs, prefix,
+ self.deprecated_name)
def _get_optparse_kwargs(self, group, action='store_true', **kwargs):
"""Extends the base optparse keyword dict for boolean options."""
@@ -671,8 +687,9 @@ class IntOpt(Opt):
"""Int opt values are converted to integers using the int() builtin."""
def _get_from_config_parser(self, cparser, section):
- """Retrieve the opt value as an integer from ConfigParser."""
- return [int(v) for v in cparser.get(section, self.dest)]
+ """Retrieve the opt value as a integer from ConfigParser."""
+ return [int(v) for v in self._cparser_get_with_deprecated(cparser,
+ section)]
def _get_optparse_kwargs(self, group, **kwargs):
"""Extends the base optparse keyword dict for integer options."""
@@ -686,7 +703,8 @@ class FloatOpt(Opt):
def _get_from_config_parser(self, cparser, section):
"""Retrieve the opt value as a float from ConfigParser."""
- return [float(v) for v in cparser.get(section, self.dest)]
+ return [float(v) for v in
+ self._cparser_get_with_deprecated(cparser, section)]
def _get_optparse_kwargs(self, group, **kwargs):
"""Extends the base optparse keyword dict for float options."""
@@ -703,7 +721,8 @@ class ListOpt(Opt):
def _get_from_config_parser(self, cparser, section):
"""Retrieve the opt value as a list from ConfigParser."""
- return [v.split(',') for v in cparser.get(section, self.dest)]
+ return [v.split(',') for v in
+ self._cparser_get_with_deprecated(cparser, section)]
def _get_optparse_kwargs(self, group, **kwargs):
"""Extends the base optparse keyword dict for list options."""
@@ -732,6 +751,13 @@ class MultiStrOpt(Opt):
return super(MultiStrOpt,
self)._get_optparse_kwargs(group, action='append')
+ def _cparser_get_with_deprecated(self, cparser, section):
+ """If cannot find option as dest try deprecated_name alias."""
+ if self.deprecated_name is not None:
+ return cparser.get(section, [self.dest, self.deprecated_name],
+ multi=True)
+ return cparser.get(section, [self.dest], multi=True)
+
class OptGroup(object):
@@ -846,25 +872,38 @@ class ConfigParser(iniparser.BaseParser):
class MultiConfigParser(object):
def __init__(self):
- self.sections = {}
+ self.parsed = []
def read(self, config_files):
read_ok = []
for filename in config_files:
- parser = ConfigParser(filename, self.sections)
+ sections = {}
+ parser = ConfigParser(filename, sections)
try:
parser.parse()
except IOError:
continue
-
+ self.parsed.insert(0, sections)
read_ok.append(filename)
return read_ok
- def get(self, section, name):
- return self.sections[section][name]
+ def get(self, section, names, multi=False):
+ rvalue = []
+ for sections in self.parsed:
+ if section not in sections:
+ continue
+ for name in names:
+ if name in sections[section]:
+ if multi:
+ rvalue = sections[section][name] + rvalue
+ else:
+ return sections[section][name]
+ if multi and rvalue != []:
+ return rvalue
+ raise KeyError
class ConfigOpts(collections.Mapping):
@@ -905,13 +944,13 @@ class ConfigOpts(collections.Mapping):
self._oparser.disable_interspersed_args()
self._config_opts = [
- MultiStrOpt('config-file',
- default=default_config_files,
- metavar='PATH',
- help='Path to a config file to use. Multiple config '
- 'files can be specified, with values in later '
- 'files taking precedence. The default files '
- ' used are: %s' % (default_config_files, )),
+ MultiStrOpt('config-file',
+ default=default_config_files,
+ metavar='PATH',
+ help='Path to a config file to use. Multiple config '
+ 'files can be specified, with values in later '
+ 'files taking precedence. The default files '
+ ' used are: %s' % (default_config_files, )),
StrOpt('config-dir',
metavar='DIR',
help='Path to a config directory to pull *.conf '
@@ -921,7 +960,7 @@ class ConfigOpts(collections.Mapping):
'the file(s), if any, specified via --config-file, '
'hence over-ridden options in the directory take '
'precedence.'),
- ]
+ ]
self.register_cli_opts(self._config_opts)
self.project = project
@@ -1323,7 +1362,7 @@ class ConfigOpts(collections.Mapping):
def _substitute(self, value):
"""Perform string template substitution.
- Substititue any template variables (e.g. $foo, ${bar}) in the supplied
+ Substitute any template variables (e.g. $foo, ${bar}) in the supplied
string value(s) with opt values.
:param value: the string value, or list of string values
@@ -1338,7 +1377,7 @@ class ConfigOpts(collections.Mapping):
return value
def _get_group(self, group_or_name, autocreate=False):
- """Looks up an OptGroup object.
+ """Looks up a OptGroup object.
Helper function to return an OptGroup given a parameter which can
either be the group's name or an OptGroup object.
@@ -1411,8 +1450,7 @@ class ConfigOpts(collections.Mapping):
default, opt, override = [info[k] for k in sorted(info.keys())]
if opt.required:
- if (default is not None or
- override is not None):
+ if (default is not None or override is not None):
continue
if self._get(opt.name, group) is None:
@@ -1516,7 +1554,7 @@ class CommonConfigOpts(ConfigOpts):
short='v',
default=False,
help='Print more verbose output'),
- ]
+ ]
logging_cli_opts = [
StrOpt('log-config',
@@ -1550,7 +1588,7 @@ class CommonConfigOpts(ConfigOpts):
StrOpt('syslog-log-facility',
default='LOG_USER',
help='syslog facility to receive log lines')
- ]
+ ]
def __init__(self):
super(CommonConfigOpts, self).__init__()
diff --git a/nova/openstack/common/excutils.py b/nova/openstack/common/excutils.py
index 3cb678e9d..67c9fa951 100644
--- a/nova/openstack/common/excutils.py
+++ b/nova/openstack/common/excutils.py
@@ -44,6 +44,6 @@ def save_and_reraise_exception():
yield
except Exception:
logging.error('Original exception being dropped: %s' %
- (traceback.format_exception(type_, value, tb)))
+ (traceback.format_exception(type_, value, tb)))
raise
raise type_, value, tb
diff --git a/nova/openstack/common/importutils.py b/nova/openstack/common/importutils.py
index 7654af5b9..67d94ad5f 100644
--- a/nova/openstack/common/importutils.py
+++ b/nova/openstack/common/importutils.py
@@ -30,7 +30,7 @@ def import_class(import_str):
return getattr(sys.modules[mod_str], class_str)
except (ImportError, ValueError, AttributeError), exc:
raise ImportError('Class %s cannot be found (%s)' %
- (class_str, str(exc)))
+ (class_str, str(exc)))
def import_object(import_str, *args, **kwargs):
@@ -38,6 +38,19 @@ def import_object(import_str, *args, **kwargs):
return import_class(import_str)(*args, **kwargs)
+def import_object_ns(name_space, import_str, *args, **kwargs):
+ """
+ Import a class and return an instance of it, first by trying
+ to find the class in a default namespace, then failing back to
+ a full path if not found in the default namespace.
+ """
+ import_value = "%s.%s" % (name_space, import_str)
+ try:
+ return import_class(import_value)(*args, **kwargs)
+ except ImportError:
+ return import_class(import_str)(*args, **kwargs)
+
+
def import_module(import_str):
"""Import a module."""
__import__(import_str)
diff --git a/nova/openstack/common/iniparser.py b/nova/openstack/common/iniparser.py
index 53ca02334..e91eea538 100644
--- a/nova/openstack/common/iniparser.py
+++ b/nova/openstack/common/iniparser.py
@@ -52,7 +52,10 @@ class BaseParser(object):
else:
key, value = line[:colon], line[colon + 1:]
- return key.strip(), [value.strip()]
+ value = value.strip()
+ if value[0] == value[-1] and value[0] == "\"" or value[0] == "'":
+ value = value[1:-1]
+ return key.strip(), [value]
def parse(self, lineiter):
key = None
diff --git a/nova/openstack/common/jsonutils.py b/nova/openstack/common/jsonutils.py
index fa8b8f9d1..752266981 100644
--- a/nova/openstack/common/jsonutils.py
+++ b/nova/openstack/common/jsonutils.py
@@ -37,6 +37,7 @@ import datetime
import inspect
import itertools
import json
+import xmlrpclib
def to_primitive(value, convert_instances=False, level=0):
@@ -81,6 +82,12 @@ def to_primitive(value, convert_instances=False, level=0):
# The try block may not be necessary after the class check above,
# but just in case ...
try:
+ # It's not clear why xmlrpclib created their own DateTime type, but
+ # for our purposes, make it a datetime type which is explicitly
+ # handled
+ if isinstance(value, xmlrpclib.DateTime):
+ value = datetime.datetime(*tuple(value.timetuple())[:6])
+
if isinstance(value, (list, tuple)):
o = []
for v in value:
@@ -115,8 +122,8 @@ def to_primitive(value, convert_instances=False, level=0):
return unicode(value)
-def dumps(value):
- return json.dumps(value, default=to_primitive)
+def dumps(value, default=to_primitive, **kwargs):
+ return json.dumps(value, default=default, **kwargs)
def loads(s):
diff --git a/nova/openstack/common/rpc/__init__.py b/nova/openstack/common/rpc/__init__.py
index 6f022b3b2..1532c666e 100644
--- a/nova/openstack/common/rpc/__init__.py
+++ b/nova/openstack/common/rpc/__init__.py
@@ -56,7 +56,7 @@ rpc_opts = [
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
- ]
+]
cfg.CONF.register_opts(rpc_opts)
diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py
index 5f62d35c5..510a03819 100644
--- a/nova/openstack/common/rpc/amqp.py
+++ b/nova/openstack/common/rpc/amqp.py
@@ -92,8 +92,9 @@ class ConnectionContext(rpc_common.Connection):
if pooled:
self.connection = connection_pool.get()
else:
- self.connection = connection_pool.connection_cls(conf,
- server_params=server_params)
+ self.connection = connection_pool.connection_cls(
+ conf,
+ server_params=server_params)
self.pooled = pooled
def __enter__(self):
@@ -161,8 +162,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
msg = {'result': reply, 'failure': failure}
except TypeError:
msg = {'result': dict((k, repr(v))
- for k, v in reply.__dict__.iteritems()),
- 'failure': failure}
+ for k, v in reply.__dict__.iteritems()),
+ 'failure': failure}
if ending:
msg['ending'] = True
conn.direct_send(msg_id, msg)
@@ -288,8 +289,8 @@ class ProxyCallback(object):
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
self._connection = connection
- self._iterator = connection.iterconsume(
- timeout=timeout or conf.rpc_response_timeout)
+ self._iterator = connection.iterconsume(timeout=timeout or
+ conf.rpc_response_timeout)
self._result = None
self._done = False
self._got_ending = False
@@ -308,7 +309,7 @@ class MulticallWaiter(object):
if data['failure']:
failure = data['failure']
self._result = rpc_common.deserialize_remote_exception(self._conf,
- failure)
+ failure)
elif data.get('ending', False):
self._got_ending = True
@@ -389,16 +390,16 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
- server_params=server_params) as conn:
+ server_params=server_params) as conn:
conn.topic_send(topic, msg)
def fanout_cast_to_server(conf, context, server_params, topic, msg,
- connection_pool):
+ connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
- server_params=server_params) as conn:
+ server_params=server_params) as conn:
conn.fanout_send(topic, msg)
diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py
index 0b927a0ee..7bc65a6a6 100644
--- a/nova/openstack/common/rpc/common.py
+++ b/nova/openstack/common/rpc/common.py
@@ -23,6 +23,7 @@ import sys
import traceback
from nova.openstack.common import cfg
+from nova.openstack.common.gettextutils import _
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import local
@@ -167,10 +168,8 @@ class Connection(object):
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""
- SANITIZE = {
- 'set_admin_password': ('new_pass',),
- 'run_instance': ('admin_password',),
- }
+ SANITIZE = {'set_admin_password': ('new_pass',),
+ 'run_instance': ('admin_password',), }
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
has_context_token = '_context_auth_token' in msg_data
diff --git a/nova/openstack/common/rpc/dispatcher.py b/nova/openstack/common/rpc/dispatcher.py
index 1d36f32ea..1fdb16c0b 100644
--- a/nova/openstack/common/rpc/dispatcher.py
+++ b/nova/openstack/common/rpc/dispatcher.py
@@ -92,14 +92,20 @@ class RpcDispatcher(object):
if not version:
version = '1.0'
+ had_compatible = False
for proxyobj in self.callbacks:
if hasattr(proxyobj, 'RPC_API_VERSION'):
rpc_api_version = proxyobj.RPC_API_VERSION
else:
rpc_api_version = '1.0'
+ is_compatible = self._is_compatible(rpc_api_version, version)
+ had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method):
continue
- if self._is_compatible(rpc_api_version, version):
+ if is_compatible:
return getattr(proxyobj, method)(ctxt, **kwargs)
- raise rpc_common.UnsupportedRpcVersion(version=version)
+ if had_compatible:
+ raise AttributeError("No such RPC function '%s'" % method)
+ else:
+ raise rpc_common.UnsupportedRpcVersion(version=version)
diff --git a/nova/openstack/common/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py
index 66152a9d5..fff1ed9ce 100644
--- a/nova/openstack/common/rpc/impl_kombu.py
+++ b/nova/openstack/common/rpc/impl_kombu.py
@@ -30,6 +30,7 @@ import kombu.entity
import kombu.messaging
from nova.openstack.common import cfg
+from nova.openstack.common.gettextutils import _
from nova.openstack.common.rpc import amqp as rpc_amqp
from nova.openstack.common.rpc import common as rpc_common
@@ -46,7 +47,7 @@ kombu_opts = [
cfg.StrOpt('kombu_ssl_ca_certs',
default='',
help=('SSL certification authority file '
- '(valid only if SSL enabled)')),
+ '(valid only if SSL enabled)')),
cfg.StrOpt('rabbit_host',
default='localhost',
help='the RabbitMQ host'),
@@ -80,7 +81,7 @@ kombu_opts = [
default=False,
help='use durable queues in RabbitMQ'),
- ]
+]
cfg.CONF.register_opts(kombu_opts)
@@ -171,22 +172,20 @@ class DirectConsumer(ConsumerBase):
"""
# Default options
options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': True}
+ 'auto_delete': True,
+ 'exclusive': True}
options.update(kwargs)
- exchange = kombu.entity.Exchange(
- name=msg_id,
- type='direct',
- durable=options['durable'],
- auto_delete=options['auto_delete'])
- super(DirectConsumer, self).__init__(
- channel,
- callback,
- tag,
- name=msg_id,
- exchange=exchange,
- routing_key=msg_id,
- **options)
+ exchange = kombu.entity.Exchange(name=msg_id,
+ type='direct',
+ durable=options['durable'],
+ auto_delete=options['auto_delete'])
+ super(DirectConsumer, self).__init__(channel,
+ callback,
+ tag,
+ name=msg_id,
+ exchange=exchange,
+ routing_key=msg_id,
+ **options)
class TopicConsumer(ConsumerBase):
@@ -208,22 +207,20 @@ class TopicConsumer(ConsumerBase):
"""
# Default options
options = {'durable': conf.rabbit_durable_queues,
- 'auto_delete': False,
- 'exclusive': False}
+ 'auto_delete': False,
+ 'exclusive': False}
options.update(kwargs)
- exchange = kombu.entity.Exchange(
- name=conf.control_exchange,
- type='topic',
- durable=options['durable'],
- auto_delete=options['auto_delete'])
- super(TopicConsumer, self).__init__(
- channel,
- callback,
- tag,
- name=name or topic,
- exchange=exchange,
- routing_key=topic,
- **options)
+ exchange = kombu.entity.Exchange(name=conf.control_exchange,
+ type='topic',
+ durable=options['durable'],
+ auto_delete=options['auto_delete'])
+ super(TopicConsumer, self).__init__(channel,
+ callback,
+ tag,
+ name=name or topic,
+ exchange=exchange,
+ routing_key=topic,
+ **options)
class FanoutConsumer(ConsumerBase):
@@ -245,22 +242,17 @@ class FanoutConsumer(ConsumerBase):
# Default options
options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': True}
+ 'auto_delete': True,
+ 'exclusive': True}
options.update(kwargs)
- exchange = kombu.entity.Exchange(
- name=exchange_name,
- type='fanout',
- durable=options['durable'],
- auto_delete=options['auto_delete'])
- super(FanoutConsumer, self).__init__(
- channel,
- callback,
- tag,
- name=queue_name,
- exchange=exchange,
- routing_key=topic,
- **options)
+ exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
+ durable=options['durable'],
+ auto_delete=options['auto_delete'])
+ super(FanoutConsumer, self).__init__(channel, callback, tag,
+ name=queue_name,
+ exchange=exchange,
+ routing_key=topic,
+ **options)
class Publisher(object):
@@ -278,9 +270,10 @@ class Publisher(object):
def reconnect(self, channel):
"""Re-establish the Producer after a rabbit reconnection"""
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
- **self.kwargs)
+ **self.kwargs)
self.producer = kombu.messaging.Producer(exchange=self.exchange,
- channel=channel, routing_key=self.routing_key)
+ channel=channel,
+ routing_key=self.routing_key)
def send(self, msg):
"""Send a message"""
@@ -296,14 +289,11 @@ class DirectPublisher(Publisher):
"""
options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': True}
+ 'auto_delete': True,
+ 'exclusive': True}
options.update(kwargs)
- super(DirectPublisher, self).__init__(channel,
- msg_id,
- msg_id,
- type='direct',
- **options)
+ super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
+ type='direct', **options)
class TopicPublisher(Publisher):
@@ -314,14 +304,11 @@ class TopicPublisher(Publisher):
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': conf.rabbit_durable_queues,
- 'auto_delete': False,
- 'exclusive': False}
+ 'auto_delete': False,
+ 'exclusive': False}
options.update(kwargs)
- super(TopicPublisher, self).__init__(channel,
- conf.control_exchange,
- topic,
- type='topic',
- **options)
+ super(TopicPublisher, self).__init__(channel, conf.control_exchange,
+ topic, type='topic', **options)
class FanoutPublisher(Publisher):
@@ -332,14 +319,11 @@ class FanoutPublisher(Publisher):
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': True}
+ 'auto_delete': True,
+ 'exclusive': True}
options.update(kwargs)
- super(FanoutPublisher, self).__init__(channel,
- '%s_fanout' % topic,
- None,
- type='fanout',
- **options)
+ super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
+ None, type='fanout', **options)
class NotifyPublisher(TopicPublisher):
@@ -356,10 +340,10 @@ class NotifyPublisher(TopicPublisher):
# we do this to ensure that messages don't get dropped if the
# consumer is started after we do
queue = kombu.entity.Queue(channel=channel,
- exchange=self.exchange,
- durable=self.durable,
- name=self.routing_key,
- routing_key=self.routing_key)
+ exchange=self.exchange,
+ durable=self.durable,
+ name=self.routing_key,
+ routing_key=self.routing_key)
queue.declare()
@@ -445,7 +429,7 @@ class Connection(object):
"""
if self.connection:
LOG.info(_("Reconnecting to AMQP server on "
- "%(hostname)s:%(port)d") % self.params)
+ "%(hostname)s:%(port)d") % self.params)
try:
self.connection.close()
except self.connection_errors:
@@ -453,8 +437,7 @@ class Connection(object):
# Setting this in case the next statement fails, though
# it shouldn't be doing any network operations, yet.
self.connection = None
- self.connection = kombu.connection.BrokerConnection(
- **self.params)
+ self.connection = kombu.connection.BrokerConnection(**self.params)
self.connection_errors = self.connection.connection_errors
if self.memory_transport:
# Kludge to speed up tests.
@@ -504,8 +487,8 @@ class Connection(object):
if self.max_retries and attempt == self.max_retries:
LOG.exception(_('Unable to connect to AMQP server on '
- '%(hostname)s:%(port)d after %(max_retries)d '
- 'tries: %(err_str)s') % log_info)
+ '%(hostname)s:%(port)d after %(max_retries)d '
+ 'tries: %(err_str)s') % log_info)
# NOTE(comstud): Copied from original code. There's
# really no better recourse because if this was a queue we
# need to consume on, we have no way to consume anymore.
@@ -519,9 +502,9 @@ class Connection(object):
sleep_time = min(sleep_time, self.interval_max)
log_info['sleep_time'] = sleep_time
- LOG.warn(_('AMQP server on %(hostname)s:%(port)d is'
- ' unreachable: %(err_str)s. Trying again in '
- '%(sleep_time)d seconds.') % log_info)
+ LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
+ ' unreachable: %(err_str)s. Trying again in '
+ '%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs):
@@ -571,11 +554,11 @@ class Connection(object):
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
- "%(err_str)s") % log_info)
+ "%(err_str)s") % log_info)
def _declare_consumer():
consumer = consumer_cls(self.conf, self.channel, topic, callback,
- self.consumer_num.next())
+ self.consumer_num.next())
self.consumers.append(consumer)
return consumer
@@ -589,11 +572,11 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, socket.timeout):
LOG.exception(_('Timed out waiting for RPC response: %s') %
- str(exc))
+ str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
- str(exc))
+ str(exc))
info['do_consume'] = True
def _consume():
@@ -627,7 +610,7 @@ class Connection(object):
def _error_callback(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
- "'%(topic)s': %(err_str)s") % log_info)
+ "'%(topic)s': %(err_str)s") % log_info)
def _publish():
publisher = cls(self.conf, self.channel, topic, **kwargs)
@@ -691,8 +674,9 @@ class Connection(object):
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
- proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
- rpc_amqp.get_connection_pool(self.conf, Connection))
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
if fanout:
self.declare_fanout_consumer(topic, proxy_cb)
@@ -701,57 +685,66 @@ class Connection(object):
def create_worker(self, topic, proxy, pool_name):
"""Create a worker that calls a method in a proxy object"""
- proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
- rpc_amqp.get_connection_pool(self.conf, Connection))
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
self.declare_topic_consumer(topic, proxy_cb, pool_name)
def create_connection(conf, new=True):
"""Create a connection"""
- return rpc_amqp.create_connection(conf, new,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.create_connection(
+ conf, new,
+ rpc_amqp.get_connection_pool(conf, Connection))
def multicall(conf, context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
- return rpc_amqp.multicall(conf, context, topic, msg, timeout,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.multicall(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
def call(conf, context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(conf, context, topic, msg, timeout,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.call(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
def cast(conf, context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
- return rpc_amqp.cast(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def fanout_cast(conf, context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
- return rpc_amqp.fanout_cast(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.fanout_cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
- return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def fanout_cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
- return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg):
"""Sends a notification event on a topic."""
- return rpc_amqp.notify(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.notify(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def cleanup():
diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py
index fce8cd07b..2eb1dfa7e 100644
--- a/nova/openstack/common/rpc/impl_qpid.py
+++ b/nova/openstack/common/rpc/impl_qpid.py
@@ -77,7 +77,7 @@ qpid_opts = [
cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
help='Disable Nagle algorithm'),
- ]
+]
cfg.CONF.register_opts(qpid_opts)
@@ -161,10 +161,10 @@ class DirectConsumer(ConsumerBase):
"""
super(DirectConsumer, self).__init__(session, callback,
- "%s/%s" % (msg_id, msg_id),
- {"type": "direct"},
- msg_id,
- {"exclusive": True})
+ "%s/%s" % (msg_id, msg_id),
+ {"type": "direct"},
+ msg_id,
+ {"exclusive": True})
class TopicConsumer(ConsumerBase):
@@ -181,8 +181,9 @@ class TopicConsumer(ConsumerBase):
"""
super(TopicConsumer, self).__init__(session, callback,
- "%s/%s" % (conf.control_exchange, topic), {},
- name or topic, {})
+ "%s/%s" % (conf.control_exchange,
+ topic),
+ {}, name or topic, {})
class FanoutConsumer(ConsumerBase):
@@ -196,11 +197,12 @@ class FanoutConsumer(ConsumerBase):
'callback' is the callback to call when messages are received
"""
- super(FanoutConsumer, self).__init__(session, callback,
- "%s_fanout" % topic,
- {"durable": False, "type": "fanout"},
- "%s_fanout_%s" % (topic, uuid.uuid4().hex),
- {"exclusive": True})
+ super(FanoutConsumer, self).__init__(
+ session, callback,
+ "%s_fanout" % topic,
+ {"durable": False, "type": "fanout"},
+ "%s_fanout_%s" % (topic, uuid.uuid4().hex),
+ {"exclusive": True})
class Publisher(object):
@@ -254,8 +256,9 @@ class TopicPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
- super(TopicPublisher, self).__init__(session,
- "%s/%s" % (conf.control_exchange, topic))
+ super(TopicPublisher, self).__init__(
+ session,
+ "%s/%s" % (conf.control_exchange, topic))
class FanoutPublisher(Publisher):
@@ -263,8 +266,9 @@ class FanoutPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'fanout' publisher.
"""
- super(FanoutPublisher, self).__init__(session,
- "%s_fanout" % topic, {"type": "fanout"})
+ super(FanoutPublisher, self).__init__(
+ session,
+ "%s_fanout" % topic, {"type": "fanout"})
class NotifyPublisher(Publisher):
@@ -272,9 +276,10 @@ class NotifyPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
- super(NotifyPublisher, self).__init__(session,
- "%s/%s" % (conf.control_exchange, topic),
- {"durable": True})
+ super(NotifyPublisher, self).__init__(
+ session,
+ "%s/%s" % (conf.control_exchange, topic),
+ {"durable": True})
class Connection(object):
@@ -292,9 +297,9 @@ class Connection(object):
server_params = {}
default_params = dict(hostname=self.conf.qpid_hostname,
- port=self.conf.qpid_port,
- username=self.conf.qpid_username,
- password=self.conf.qpid_password)
+ port=self.conf.qpid_port,
+ username=self.conf.qpid_username,
+ password=self.conf.qpid_password)
params = server_params
for key in default_params.keys():
@@ -312,18 +317,18 @@ class Connection(object):
self.connection.reconnect = self.conf.qpid_reconnect
if self.conf.qpid_reconnect_timeout:
self.connection.reconnect_timeout = (
- self.conf.qpid_reconnect_timeout)
+ self.conf.qpid_reconnect_timeout)
if self.conf.qpid_reconnect_limit:
self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
if self.conf.qpid_reconnect_interval_max:
self.connection.reconnect_interval_max = (
- self.conf.qpid_reconnect_interval_max)
+ self.conf.qpid_reconnect_interval_max)
if self.conf.qpid_reconnect_interval_min:
self.connection.reconnect_interval_min = (
- self.conf.qpid_reconnect_interval_min)
+ self.conf.qpid_reconnect_interval_min)
if self.conf.qpid_reconnect_interval:
self.connection.reconnect_interval = (
- self.conf.qpid_reconnect_interval)
+ self.conf.qpid_reconnect_interval)
self.connection.hearbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
@@ -395,7 +400,7 @@ class Connection(object):
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
- "%(err_str)s") % log_info)
+ "%(err_str)s") % log_info)
def _declare_consumer():
consumer = consumer_cls(self.conf, self.session, topic, callback)
@@ -410,11 +415,11 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, qpid.messaging.exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
- str(exc))
+ str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
- str(exc))
+ str(exc))
def _consume():
nxt_receiver = self.session.next_receiver(timeout=timeout)
@@ -444,7 +449,7 @@ class Connection(object):
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
- "'%(topic)s': %(err_str)s") % log_info)
+ "'%(topic)s': %(err_str)s") % log_info)
def _publisher_send():
publisher = cls(self.conf, self.session, topic)
@@ -508,8 +513,9 @@ class Connection(object):
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
- proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
- rpc_amqp.get_connection_pool(self.conf, Connection))
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
@@ -522,8 +528,9 @@ class Connection(object):
def create_worker(self, topic, proxy, pool_name):
"""Create a worker that calls a method in a proxy object"""
- proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
- rpc_amqp.get_connection_pool(self.conf, Connection))
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name)
@@ -535,50 +542,57 @@ class Connection(object):
def create_connection(conf, new=True):
"""Create a connection"""
- return rpc_amqp.create_connection(conf, new,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.create_connection(
+ conf, new,
+ rpc_amqp.get_connection_pool(conf, Connection))
def multicall(conf, context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
- return rpc_amqp.multicall(conf, context, topic, msg, timeout,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.multicall(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
def call(conf, context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(conf, context, topic, msg, timeout,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.call(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
def cast(conf, context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
- return rpc_amqp.cast(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def fanout_cast(conf, context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
- return rpc_amqp.fanout_cast(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.fanout_cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
- return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def fanout_cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
- return rpc_amqp.fanout_cast_to_server(conf, context, server_params, topic,
- msg, rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.fanout_cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection))
def cleanup():
diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py
index 77768dbec..8435cd020 100644
--- a/nova/openstack/common/rpc/impl_zmq.py
+++ b/nova/openstack/common/rpc/impl_zmq.py
@@ -40,25 +40,26 @@ RPCException = rpc_common.RPCException
zmq_opts = [
cfg.StrOpt('rpc_zmq_bind_address', default='*',
- help='ZeroMQ bind address. Should be a wildcard (*), '
- 'an ethernet interface, or IP. '
- 'The "host" option should point or resolve to this address.'),
+ help='ZeroMQ bind address. Should be a wildcard (*), '
+ 'an ethernet interface, or IP. '
+ 'The "host" option should point or resolve to this '
+ 'address.'),
# The module.Class to use for matchmaking.
cfg.StrOpt('rpc_zmq_matchmaker',
- default='openstack.common.rpc.matchmaker.MatchMakerLocalhost',
- help='MatchMaker driver'),
+ default='openstack.common.rpc.matchmaker.MatchMakerLocalhost',
+ help='MatchMaker driver'),
# The following port is unassigned by IANA as of 2012-05-21
cfg.IntOpt('rpc_zmq_port', default=9501,
- help='ZeroMQ receiver listening port'),
+ help='ZeroMQ receiver listening port'),
cfg.IntOpt('rpc_zmq_contexts', default=1,
- help='Number of ZeroMQ contexts, defaults to 1'),
+ help='Number of ZeroMQ contexts, defaults to 1'),
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
- help='Directory for holding IPC sockets'),
- ]
+ help='Directory for holding IPC sockets'),
+]
# These globals are defined in register_opts(conf),
@@ -119,10 +120,10 @@ class ZmqSocket(object):
self.subscribe(f)
LOG.debug(_("Connecting to %{addr}s with %{type}s"
- "\n-> Subscribed to %{subscribe}s"
- "\n-> bind: %{bind}s"),
- {'addr': addr, 'type': self.socket_s(),
- 'subscribe': subscribe, 'bind': bind})
+ "\n-> Subscribed to %{subscribe}s"
+ "\n-> bind: %{bind}s"),
+ {'addr': addr, 'type': self.socket_s(),
+ 'subscribe': subscribe, 'bind': bind})
try:
if bind:
@@ -197,7 +198,7 @@ class ZmqClient(object):
def cast(self, msg_id, topic, data):
self.outq.send([str(msg_id), str(topic), str('cast'),
- _serialize(data)])
+ _serialize(data)])
def close(self):
self.outq.close()
@@ -306,7 +307,7 @@ class ConsumerBase(object):
data.setdefault('version', None)
data.setdefault('args', [])
proxy.dispatch(ctx, data['version'],
- data['method'], **data['args'])
+ data['method'], **data['args'])
class ZmqBaseReactor(ConsumerBase):
@@ -339,7 +340,7 @@ class ZmqBaseReactor(ConsumerBase):
# Items push in.
inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
- subscribe=subscribe)
+ subscribe=subscribe)
self.proxies[inq] = proxy
self.sockets.append(inq)
@@ -353,8 +354,7 @@ class ZmqBaseReactor(ConsumerBase):
raise RPCException("Bad output socktype")
# Items push out.
- outq = ZmqSocket(out_addr, zmq_type_out,
- bind=out_bind)
+ outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
self.mapping[inq] = outq
self.mapping[outq] = inq
@@ -428,7 +428,7 @@ class ZmqProxy(ZmqBaseReactor):
if not topic in self.topic_proxy:
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
- sock_type, bind=True)
+ sock_type, bind=True)
self.topic_proxy[topic] = outq
self.sockets.append(outq)
LOG.info(_("Created topic proxy: %s"), topic)
@@ -486,7 +486,7 @@ class Connection(rpc_common.Connection):
topic = topic.split('.', 1)[0]
LOG.info(_("Create Consumer for topic (%(topic)s)") %
- {'topic': topic})
+ {'topic': topic})
# Subscription scenarios
if fanout:
@@ -502,7 +502,7 @@ class Connection(rpc_common.Connection):
(self.conf.rpc_zmq_ipc_dir, topic)
LOG.debug(_("Consumer is a zmq.%s"),
- ['PULL', 'SUB'][sock_type == zmq.SUB])
+ ['PULL', 'SUB'][sock_type == zmq.SUB])
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
diff --git a/nova/openstack/common/rpc/matchmaker.py b/nova/openstack/common/rpc/matchmaker.py
index f59e2555d..fe9896a04 100644
--- a/nova/openstack/common/rpc/matchmaker.py
+++ b/nova/openstack/common/rpc/matchmaker.py
@@ -29,8 +29,8 @@ from nova.openstack.common import cfg
matchmaker_opts = [
# Matchmaker ring file
cfg.StrOpt('matchmaker_ringfile',
- default='/etc/nova/matchmaker_ring.json',
- help='Matchmaker ring file (JSON)'),
+ default='/etc/nova/matchmaker_ring.json',
+ help='Matchmaker ring file (JSON)'),
]
CONF = cfg.CONF
diff --git a/nova/openstack/common/rpc/proxy.py b/nova/openstack/common/rpc/proxy.py
index cfa2b8cd0..fe28cdde5 100644
--- a/nova/openstack/common/rpc/proxy.py
+++ b/nova/openstack/common/rpc/proxy.py
@@ -127,7 +127,7 @@ class RpcProxy(object):
rpc.fanout_cast(context, self.topic, msg)
def cast_to_server(self, context, server_params, msg, topic=None,
- version=None):
+ version=None):
"""rpc.cast_to_server() a remote method.
:param context: The request context