diff options
author | Gary Kotton <gkotton@redhat.com> | 2012-06-17 04:05:37 -0400 |
---|---|---|
committer | Gary Kotton <gkotton@redhat.com> | 2012-06-18 01:15:09 -0400 |
commit | 9f938720f158889252fa1db44be96745fa48e1ff (patch) | |
tree | 5383ca2084fc6e188c59bef6224c78b2719a5ed9 /openstack/common | |
parent | 925edb3ee8bbd97afaa43b2888ab45d2bca50faf (diff) | |
download | oslo-9f938720f158889252fa1db44be96745fa48e1ff.tar.gz oslo-9f938720f158889252fa1db44be96745fa48e1ff.tar.xz oslo-9f938720f158889252fa1db44be96745fa48e1ff.zip |
Update common code to support pep 1.3.
bug 1014216
Change-Id: I3f8fa2e11c9d3f3d34fb20f65ce886bb9c94463d
Diffstat (limited to 'openstack/common')
-rw-r--r-- | openstack/common/cfg.py | 39 | ||||
-rw-r--r-- | openstack/common/config.py | 16 | ||||
-rw-r--r-- | openstack/common/excutils.py | 2 | ||||
-rw-r--r-- | openstack/common/extensions.py | 33 | ||||
-rw-r--r-- | openstack/common/importutils.py | 2 | ||||
-rw-r--r-- | openstack/common/rpc/__init__.py | 2 | ||||
-rw-r--r-- | openstack/common/rpc/amqp.py | 21 | ||||
-rw-r--r-- | openstack/common/rpc/common.py | 6 | ||||
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 204 | ||||
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 112 | ||||
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 42 | ||||
-rw-r--r-- | openstack/common/rpc/matchmaker.py | 4 | ||||
-rw-r--r-- | openstack/common/rpc/proxy.py | 2 | ||||
-rw-r--r-- | openstack/common/utils.py | 10 |
14 files changed, 248 insertions, 247 deletions
diff --git a/openstack/common/cfg.py b/openstack/common/cfg.py index f97f718..a6c0dea 100644 --- a/openstack/common/cfg.py +++ b/openstack/common/cfg.py @@ -391,7 +391,7 @@ def _get_config_dirs(project=None): fix_path('~'), os.path.join('/etc', project) if project else None, '/etc' - ] + ] return filter(bool, cfg_dirs) @@ -559,10 +559,10 @@ class Opt(object): kwargs = self._get_optparse_kwargs(group) prefix = self._get_optparse_prefix('', group) self._add_to_optparse(container, self.name, self.short, kwargs, prefix, - self.deprecated_name) + self.deprecated_name) def _add_to_optparse(self, container, name, short, kwargs, prefix='', - deprecated_name=None): + deprecated_name=None): """Add an option to an optparse parser or group. :param container: an optparse.OptionContainer object @@ -607,11 +607,9 @@ class Opt(object): dest = self.dest if group is not None: dest = group.name + '_' + dest - kwargs.update({ - 'dest': dest, - 'metavar': self.metavar, - 'help': self.help, - }) + kwargs.update({'dest': dest, + 'metavar': self.metavar, + 'help': self.help, }) return kwargs def _get_optparse_prefix(self, prefix, group): @@ -676,7 +674,7 @@ class BoolOpt(Opt): prefix = self._get_optparse_prefix('no', group) kwargs["help"] = "The inverse of --" + self.name self._add_to_optparse(container, self.name, None, kwargs, prefix, - self.deprecated_name) + self.deprecated_name) def _get_optparse_kwargs(self, group, action='store_true', **kwargs): """Extends the base optparse keyword dict for boolean options.""" @@ -946,13 +944,13 @@ class ConfigOpts(collections.Mapping): self._oparser.disable_interspersed_args() self._config_opts = [ - MultiStrOpt('config-file', - default=default_config_files, - metavar='PATH', - help='Path to a config file to use. Multiple config ' - 'files can be specified, with values in later ' - 'files taking precedence. The default files ' - ' used are: %s' % (default_config_files, )), + MultiStrOpt('config-file', + default=default_config_files, + metavar='PATH', + help='Path to a config file to use. Multiple config ' + 'files can be specified, with values in later ' + 'files taking precedence. The default files ' + ' used are: %s' % (default_config_files, )), StrOpt('config-dir', metavar='DIR', help='Path to a config directory to pull *.conf ' @@ -962,7 +960,7 @@ class ConfigOpts(collections.Mapping): 'the file(s), if any, specified via --config-file, ' 'hence over-ridden options in the directory take ' 'precedence.'), - ] + ] self.register_cli_opts(self._config_opts) self.project = project @@ -1452,8 +1450,7 @@ class ConfigOpts(collections.Mapping): default, opt, override = [info[k] for k in sorted(info.keys())] if opt.required: - if (default is not None or - override is not None): + if (default is not None or override is not None): continue if self._get(opt.name, group) is None: @@ -1557,7 +1554,7 @@ class CommonConfigOpts(ConfigOpts): short='v', default=False, help='Print more verbose output'), - ] + ] logging_cli_opts = [ StrOpt('log-config', @@ -1591,7 +1588,7 @@ class CommonConfigOpts(ConfigOpts): StrOpt('syslog-log-facility', default='LOG_USER', help='syslog facility to receive log lines') - ] + ] def __init__(self): super(CommonConfigOpts, self).__init__() diff --git a/openstack/common/config.py b/openstack/common/config.py index 7ae703e..12645f3 100644 --- a/openstack/common/config.py +++ b/openstack/common/config.py @@ -99,15 +99,15 @@ def add_log_options(parser): "the Python logging module documentation for " "details on logging configuration files.") group.add_option('--log-date-format', metavar="FORMAT", - default=DEFAULT_LOG_DATE_FORMAT, - help="Format string for %(asctime)s in log records. " - "Default: %default") + default=DEFAULT_LOG_DATE_FORMAT, + help="Format string for %(asctime)s in log records. " + "Default: %default") group.add_option('--log-file', default=None, metavar="PATH", - help="(Optional) Name of log file to output to. " - "If not set, logging will go to stdout.") + help="(Optional) Name of log file to output to. " + "If not set, logging will go to stdout.") group.add_option("--log-dir", default=None, - help="(Optional) The directory to keep log files in " - "(will be prepended to --logfile)") + help="(Optional) The directory to keep log files in " + "(will be prepended to --logfile)") group.add_option('--use-syslog', default=False, dest="use_syslog", action="store_true", help="Use syslog for logging.") @@ -249,7 +249,7 @@ def load_paste_config(app_name, options, args, config_dir=None): conf_file = find_config_file(app_name, options, args, config_dir) if not conf_file: raise RuntimeError("Unable to locate any configuration file. " - "Cannot load application %s" % app_name) + "Cannot load application %s" % app_name) try: conf = deploy.appconfig("config:%s" % conf_file, name=app_name) return conf_file, conf diff --git a/openstack/common/excutils.py b/openstack/common/excutils.py index 3cb678e..67c9fa9 100644 --- a/openstack/common/excutils.py +++ b/openstack/common/excutils.py @@ -44,6 +44,6 @@ def save_and_reraise_exception(): yield except Exception: logging.error('Original exception being dropped: %s' % - (traceback.format_exception(type_, value, tb))) + (traceback.format_exception(type_, value, tb))) raise raise type_, value, tb diff --git a/openstack/common/extensions.py b/openstack/common/extensions.py index 162a02a..f2f5d81 100644 --- a/openstack/common/extensions.py +++ b/openstack/common/extensions.py @@ -220,15 +220,15 @@ class ExtensionMiddleware(wsgi.Middleware): if not action.collection in action_resources.keys(): resource = ActionExtensionResource(application) mapper.connect("/%s/:(id)/action.:(format)" % - action.collection, - action='action', - controller=resource, - conditions=dict(method=['POST'])) + action.collection, + action='action', + controller=resource, + conditions=dict(method=['POST'])) mapper.connect("/%s/:(id)/action" % - action.collection, - action='action', - controller=resource, - conditions=dict(method=['POST'])) + action.collection, + action='action', + controller=resource, + conditions=dict(method=['POST'])) action_resources[action.collection] = resource return action_resources @@ -240,21 +240,20 @@ class ExtensionMiddleware(wsgi.Middleware): if not req_ext.key in request_ext_resources.keys(): resource = RequestExtensionResource(application) mapper.connect(req_ext.url_route + '.:(format)', - action='process', - controller=resource, - conditions=req_ext.conditions) + action='process', + controller=resource, + conditions=req_ext.conditions) mapper.connect(req_ext.url_route, - action='process', - controller=resource, - conditions=req_ext.conditions) + action='process', + controller=resource, + conditions=req_ext.conditions) request_ext_resources[req_ext.key] = resource return request_ext_resources def __init__(self, application, config, ext_mgr=None): - ext_mgr = ext_mgr or ExtensionManager( - config['api_extensions_path']) + ext_mgr = ext_mgr or ExtensionManager(config['api_extensions_path']) mapper = routes.Mapper() # extended resources @@ -275,7 +274,7 @@ class ExtensionMiddleware(wsgi.Middleware): # extended actions action_resources = self._action_ext_resources(application, ext_mgr, - mapper) + mapper) for action in ext_mgr.get_actions(): LOG.debug(_('Extended action: %s'), action.action_name) resource = action_resources[action.collection] diff --git a/openstack/common/importutils.py b/openstack/common/importutils.py index 7654af5..b507d22 100644 --- a/openstack/common/importutils.py +++ b/openstack/common/importutils.py @@ -30,7 +30,7 @@ def import_class(import_str): return getattr(sys.modules[mod_str], class_str) except (ImportError, ValueError, AttributeError), exc: raise ImportError('Class %s cannot be found (%s)' % - (class_str, str(exc))) + (class_str, str(exc))) def import_object(import_str, *args, **kwargs): diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py index 26bd048..dc4b185 100644 --- a/openstack/common/rpc/__init__.py +++ b/openstack/common/rpc/__init__.py @@ -56,7 +56,7 @@ rpc_opts = [ cfg.BoolOpt('fake_rabbit', default=False, help='If passed, use a fake RabbitMQ provider'), - ] +] cfg.CONF.register_opts(rpc_opts) diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index a79a3aa..cc40ff3 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -92,8 +92,9 @@ class ConnectionContext(rpc_common.Connection): if pooled: self.connection = connection_pool.get() else: - self.connection = connection_pool.connection_cls(conf, - server_params=server_params) + self.connection = connection_pool.connection_cls( + conf, + server_params=server_params) self.pooled = pooled def __enter__(self): @@ -161,8 +162,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, msg = {'result': reply, 'failure': failure} except TypeError: msg = {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()), - 'failure': failure} + for k, v in reply.__dict__.iteritems()), + 'failure': failure} if ending: msg['ending'] = True conn.direct_send(msg_id, msg) @@ -288,8 +289,8 @@ class ProxyCallback(object): class MulticallWaiter(object): def __init__(self, conf, connection, timeout): self._connection = connection - self._iterator = connection.iterconsume( - timeout=timeout or conf.rpc_response_timeout) + self._iterator = connection.iterconsume(timeout=timeout or + conf.rpc_response_timeout) self._result = None self._done = False self._got_ending = False @@ -308,7 +309,7 @@ class MulticallWaiter(object): if data['failure']: failure = data['failure'] self._result = rpc_common.deserialize_remote_exception(self._conf, - failure) + failure) elif data.get('ending', False): self._got_ending = True @@ -389,16 +390,16 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): """Sends a message on a topic to a specific server.""" pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, - server_params=server_params) as conn: + server_params=server_params) as conn: conn.topic_send(topic, msg) def fanout_cast_to_server(conf, context, server_params, topic, msg, - connection_pool): + connection_pool): """Sends a message on a fanout exchange to a specific server.""" pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, - server_params=server_params) as conn: + server_params=server_params) as conn: conn.fanout_send(topic, msg) diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py index 6acd72c..a724ea2 100644 --- a/openstack/common/rpc/common.py +++ b/openstack/common/rpc/common.py @@ -167,10 +167,8 @@ class Connection(object): def _safe_log(log_func, msg, msg_data): """Sanitizes the msg_data field before logging.""" - SANITIZE = { - 'set_admin_password': ('new_pass',), - 'run_instance': ('admin_password',), - } + SANITIZE = {'set_admin_password': ('new_pass',), + 'run_instance': ('admin_password',), } has_method = 'method' in msg_data and msg_data['method'] in SANITIZE has_context_token = '_context_auth_token' in msg_data diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index c32497a..b78df37 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -46,7 +46,7 @@ kombu_opts = [ cfg.StrOpt('kombu_ssl_ca_certs', default='', help=('SSL certification authority file ' - '(valid only if SSL enabled)')), + '(valid only if SSL enabled)')), cfg.StrOpt('rabbit_host', default='localhost', help='the RabbitMQ host'), @@ -80,7 +80,7 @@ kombu_opts = [ default=False, help='use durable queues in RabbitMQ'), - ] +] cfg.CONF.register_opts(kombu_opts) @@ -171,22 +171,20 @@ class DirectConsumer(ConsumerBase): """ # Default options options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=msg_id, - type='direct', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(DirectConsumer, self).__init__( - channel, - callback, - tag, - name=msg_id, - exchange=exchange, - routing_key=msg_id, - **options) + exchange = kombu.entity.Exchange(name=msg_id, + type='direct', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(DirectConsumer, self).__init__(channel, + callback, + tag, + name=msg_id, + exchange=exchange, + routing_key=msg_id, + **options) class TopicConsumer(ConsumerBase): @@ -208,22 +206,20 @@ class TopicConsumer(ConsumerBase): """ # Default options options = {'durable': conf.rabbit_durable_queues, - 'auto_delete': False, - 'exclusive': False} + 'auto_delete': False, + 'exclusive': False} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=conf.control_exchange, - type='topic', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(TopicConsumer, self).__init__( - channel, - callback, - tag, - name=name or topic, - exchange=exchange, - routing_key=topic, - **options) + exchange = kombu.entity.Exchange(name=conf.control_exchange, + type='topic', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(TopicConsumer, self).__init__(channel, + callback, + tag, + name=name or topic, + exchange=exchange, + routing_key=topic, + **options) class FanoutConsumer(ConsumerBase): @@ -245,22 +241,17 @@ class FanoutConsumer(ConsumerBase): # Default options options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=exchange_name, - type='fanout', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(FanoutConsumer, self).__init__( - channel, - callback, - tag, - name=queue_name, - exchange=exchange, - routing_key=topic, - **options) + exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(FanoutConsumer, self).__init__(channel, callback, tag, + name=queue_name, + exchange=exchange, + routing_key=topic, + **options) class Publisher(object): @@ -278,9 +269,10 @@ class Publisher(object): def reconnect(self, channel): """Re-establish the Producer after a rabbit reconnection""" self.exchange = kombu.entity.Exchange(name=self.exchange_name, - **self.kwargs) + **self.kwargs) self.producer = kombu.messaging.Producer(exchange=self.exchange, - channel=channel, routing_key=self.routing_key) + channel=channel, + routing_key=self.routing_key) def send(self, msg): """Send a message""" @@ -296,14 +288,11 @@ class DirectPublisher(Publisher): """ options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - super(DirectPublisher, self).__init__(channel, - msg_id, - msg_id, - type='direct', - **options) + super(DirectPublisher, self).__init__(channel, msg_id, msg_id, + type='direct', **options) class TopicPublisher(Publisher): @@ -314,14 +303,11 @@ class TopicPublisher(Publisher): Kombu options may be passed as keyword args to override defaults """ options = {'durable': conf.rabbit_durable_queues, - 'auto_delete': False, - 'exclusive': False} + 'auto_delete': False, + 'exclusive': False} options.update(kwargs) - super(TopicPublisher, self).__init__(channel, - conf.control_exchange, - topic, - type='topic', - **options) + super(TopicPublisher, self).__init__(channel, conf.control_exchange, + topic, type='topic', **options) class FanoutPublisher(Publisher): @@ -332,14 +318,11 @@ class FanoutPublisher(Publisher): Kombu options may be passed as keyword args to override defaults """ options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - super(FanoutPublisher, self).__init__(channel, - '%s_fanout' % topic, - None, - type='fanout', - **options) + super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, + None, type='fanout', **options) class NotifyPublisher(TopicPublisher): @@ -356,10 +339,10 @@ class NotifyPublisher(TopicPublisher): # we do this to ensure that messages don't get dropped if the # consumer is started after we do queue = kombu.entity.Queue(channel=channel, - exchange=self.exchange, - durable=self.durable, - name=self.routing_key, - routing_key=self.routing_key) + exchange=self.exchange, + durable=self.durable, + name=self.routing_key, + routing_key=self.routing_key) queue.declare() @@ -445,7 +428,7 @@ class Connection(object): """ if self.connection: LOG.info(_("Reconnecting to AMQP server on " - "%(hostname)s:%(port)d") % self.params) + "%(hostname)s:%(port)d") % self.params) try: self.connection.close() except self.connection_errors: @@ -453,8 +436,7 @@ class Connection(object): # Setting this in case the next statement fails, though # it shouldn't be doing any network operations, yet. self.connection = None - self.connection = kombu.connection.BrokerConnection( - **self.params) + self.connection = kombu.connection.BrokerConnection(**self.params) self.connection_errors = self.connection.connection_errors if self.memory_transport: # Kludge to speed up tests. @@ -504,8 +486,8 @@ class Connection(object): if self.max_retries and attempt == self.max_retries: LOG.exception(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info) # NOTE(comstud): Copied from original code. There's # really no better recourse because if this was a queue we # need to consume on, we have no way to consume anymore. @@ -520,8 +502,8 @@ class Connection(object): log_info['sleep_time'] = sleep_time LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' - ' unreachable: %(err_str)s. Trying again in ' - '%(sleep_time)d seconds.') % log_info) + ' unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) time.sleep(sleep_time) def ensure(self, error_callback, method, *args, **kwargs): @@ -571,11 +553,11 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.error(_("Failed to declare consumer for topic '%(topic)s': " - "%(err_str)s") % log_info) + "%(err_str)s") % log_info) def _declare_consumer(): consumer = consumer_cls(self.conf, self.channel, topic, callback, - self.consumer_num.next()) + self.consumer_num.next()) self.consumers.append(consumer) return consumer @@ -589,11 +571,11 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, socket.timeout): LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % - str(exc)) + str(exc)) info['do_consume'] = True def _consume(): @@ -627,7 +609,7 @@ class Connection(object): def _error_callback(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.exception(_("Failed to publish message to topic " - "'%(topic)s': %(err_str)s") % log_info) + "'%(topic)s': %(err_str)s") % log_info) def _publish(): publisher = cls(self.conf, self.channel, topic, **kwargs) @@ -691,8 +673,9 @@ class Connection(object): def create_consumer(self, topic, proxy, fanout=False): """Create a consumer that calls a method in a proxy object""" - proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, - rpc_amqp.get_connection_pool(self.conf, Connection)) + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) if fanout: self.declare_fanout_consumer(topic, proxy_cb) @@ -701,57 +684,66 @@ class Connection(object): def create_worker(self, topic, proxy, pool_name): """Create a worker that calls a method in a proxy object""" - proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, - rpc_amqp.get_connection_pool(self.conf, Connection)) + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) self.declare_topic_consumer(topic, proxy_cb, pool_name) def create_connection(conf, new=True): """Create a connection""" - return rpc_amqp.create_connection(conf, new, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.create_connection( + conf, new, + rpc_amqp.get_connection_pool(conf, Connection)) def multicall(conf, context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.multicall( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def call(conf, context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.call( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def cast(conf, context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast(conf, context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" - return rpc_amqp.fanout_cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.fanout_cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a topic to a specific server.""" - return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a fanout exchange to a specific server.""" - return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def notify(conf, context, topic, msg): """Sends a notification event on a topic.""" - return rpc_amqp.notify(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.notify( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def cleanup(): diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 78236f2..850a60c 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -77,7 +77,7 @@ qpid_opts = [ cfg.BoolOpt('qpid_tcp_nodelay', default=True, help='Disable Nagle algorithm'), - ] +] cfg.CONF.register_opts(qpid_opts) @@ -161,10 +161,10 @@ class DirectConsumer(ConsumerBase): """ super(DirectConsumer, self).__init__(session, callback, - "%s/%s" % (msg_id, msg_id), - {"type": "direct"}, - msg_id, - {"exclusive": True}) + "%s/%s" % (msg_id, msg_id), + {"type": "direct"}, + msg_id, + {"exclusive": True}) class TopicConsumer(ConsumerBase): @@ -181,8 +181,9 @@ class TopicConsumer(ConsumerBase): """ super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (conf.control_exchange, topic), {}, - name or topic, {}) + "%s/%s" % (conf.control_exchange, + topic), + {}, name or topic, {}) class FanoutConsumer(ConsumerBase): @@ -196,11 +197,12 @@ class FanoutConsumer(ConsumerBase): 'callback' is the callback to call when messages are received """ - super(FanoutConsumer, self).__init__(session, callback, - "%s_fanout" % topic, - {"durable": False, "type": "fanout"}, - "%s_fanout_%s" % (topic, uuid.uuid4().hex), - {"exclusive": True}) + super(FanoutConsumer, self).__init__( + session, callback, + "%s_fanout" % topic, + {"durable": False, "type": "fanout"}, + "%s_fanout_%s" % (topic, uuid.uuid4().hex), + {"exclusive": True}) class Publisher(object): @@ -254,8 +256,9 @@ class TopicPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ - super(TopicPublisher, self).__init__(session, - "%s/%s" % (conf.control_exchange, topic)) + super(TopicPublisher, self).__init__( + session, + "%s/%s" % (conf.control_exchange, topic)) class FanoutPublisher(Publisher): @@ -263,8 +266,9 @@ class FanoutPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'fanout' publisher. """ - super(FanoutPublisher, self).__init__(session, - "%s_fanout" % topic, {"type": "fanout"}) + super(FanoutPublisher, self).__init__( + session, + "%s_fanout" % topic, {"type": "fanout"}) class NotifyPublisher(Publisher): @@ -272,9 +276,10 @@ class NotifyPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ - super(NotifyPublisher, self).__init__(session, - "%s/%s" % (conf.control_exchange, topic), - {"durable": True}) + super(NotifyPublisher, self).__init__( + session, + "%s/%s" % (conf.control_exchange, topic), + {"durable": True}) class Connection(object): @@ -292,9 +297,9 @@ class Connection(object): server_params = {} default_params = dict(hostname=self.conf.qpid_hostname, - port=self.conf.qpid_port, - username=self.conf.qpid_username, - password=self.conf.qpid_password) + port=self.conf.qpid_port, + username=self.conf.qpid_username, + password=self.conf.qpid_password) params = server_params for key in default_params.keys(): @@ -312,18 +317,18 @@ class Connection(object): self.connection.reconnect = self.conf.qpid_reconnect if self.conf.qpid_reconnect_timeout: self.connection.reconnect_timeout = ( - self.conf.qpid_reconnect_timeout) + self.conf.qpid_reconnect_timeout) if self.conf.qpid_reconnect_limit: self.connection.reconnect_limit = self.conf.qpid_reconnect_limit if self.conf.qpid_reconnect_interval_max: self.connection.reconnect_interval_max = ( - self.conf.qpid_reconnect_interval_max) + self.conf.qpid_reconnect_interval_max) if self.conf.qpid_reconnect_interval_min: self.connection.reconnect_interval_min = ( - self.conf.qpid_reconnect_interval_min) + self.conf.qpid_reconnect_interval_min) if self.conf.qpid_reconnect_interval: self.connection.reconnect_interval = ( - self.conf.qpid_reconnect_interval) + self.conf.qpid_reconnect_interval) self.connection.hearbeat = self.conf.qpid_heartbeat self.connection.protocol = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay @@ -395,7 +400,7 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.error(_("Failed to declare consumer for topic '%(topic)s': " - "%(err_str)s") % log_info) + "%(err_str)s") % log_info) def _declare_consumer(): consumer = consumer_cls(self.conf, self.session, topic, callback) @@ -410,11 +415,11 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, qpid.messaging.exceptions.Empty): LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % - str(exc)) + str(exc)) def _consume(): nxt_receiver = self.session.next_receiver(timeout=timeout) @@ -444,7 +449,7 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.exception(_("Failed to publish message to topic " - "'%(topic)s': %(err_str)s") % log_info) + "'%(topic)s': %(err_str)s") % log_info) def _publisher_send(): publisher = cls(self.conf, self.session, topic) @@ -508,8 +513,9 @@ class Connection(object): def create_consumer(self, topic, proxy, fanout=False): """Create a consumer that calls a method in a proxy object""" - proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, - rpc_amqp.get_connection_pool(self.conf, Connection)) + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) if fanout: consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) @@ -522,8 +528,9 @@ class Connection(object): def create_worker(self, topic, proxy, pool_name): """Create a worker that calls a method in a proxy object""" - proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, - rpc_amqp.get_connection_pool(self.conf, Connection)) + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, name=pool_name) @@ -535,50 +542,57 @@ class Connection(object): def create_connection(conf, new=True): """Create a connection""" - return rpc_amqp.create_connection(conf, new, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.create_connection( + conf, new, + rpc_amqp.get_connection_pool(conf, Connection)) def multicall(conf, context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.multicall( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def call(conf, context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.call( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def cast(conf, context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast(conf, context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" - return rpc_amqp.fanout_cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.fanout_cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a topic to a specific server.""" - return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a fanout exchange to a specific server.""" - return rpc_amqp.fanout_cast_to_server(conf, context, server_params, topic, - msg, rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.fanout_cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def notify(conf, context, topic, msg): """Sends a notification event on a topic.""" return rpc_amqp.notify(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection)) def cleanup(): diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index da69dd7..560e649 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -40,25 +40,26 @@ RPCException = rpc_common.RPCException zmq_opts = [ cfg.StrOpt('rpc_zmq_bind_address', default='*', - help='ZeroMQ bind address. Should be a wildcard (*), ' - 'an ethernet interface, or IP. ' - 'The "host" option should point or resolve to this address.'), + help='ZeroMQ bind address. Should be a wildcard (*), ' + 'an ethernet interface, or IP. ' + 'The "host" option should point or resolve to this ' + 'address.'), # The module.Class to use for matchmaking. cfg.StrOpt('rpc_zmq_matchmaker', - default='openstack.common.rpc.matchmaker.MatchMakerLocalhost', - help='MatchMaker driver'), + default='openstack.common.rpc.matchmaker.MatchMakerLocalhost', + help='MatchMaker driver'), # The following port is unassigned by IANA as of 2012-05-21 cfg.IntOpt('rpc_zmq_port', default=9501, - help='ZeroMQ receiver listening port'), + help='ZeroMQ receiver listening port'), cfg.IntOpt('rpc_zmq_contexts', default=1, - help='Number of ZeroMQ contexts, defaults to 1'), + help='Number of ZeroMQ contexts, defaults to 1'), cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', - help='Directory for holding IPC sockets'), - ] + help='Directory for holding IPC sockets'), +] # These globals are defined in register_opts(conf), @@ -119,10 +120,10 @@ class ZmqSocket(object): self.subscribe(f) LOG.debug(_("Connecting to %{addr}s with %{type}s" - "\n-> Subscribed to %{subscribe}s" - "\n-> bind: %{bind}s"), - {'addr': addr, 'type': self.socket_s(), - 'subscribe': subscribe, 'bind': bind}) + "\n-> Subscribed to %{subscribe}s" + "\n-> bind: %{bind}s"), + {'addr': addr, 'type': self.socket_s(), + 'subscribe': subscribe, 'bind': bind}) try: if bind: @@ -197,7 +198,7 @@ class ZmqClient(object): def cast(self, msg_id, topic, data): self.outq.send([str(msg_id), str(topic), str('cast'), - _serialize(data)]) + _serialize(data)]) def close(self): self.outq.close() @@ -306,7 +307,7 @@ class ConsumerBase(object): data.setdefault('version', None) data.setdefault('args', []) proxy.dispatch(ctx, data['version'], - data['method'], **data['args']) + data['method'], **data['args']) class ZmqBaseReactor(ConsumerBase): @@ -339,7 +340,7 @@ class ZmqBaseReactor(ConsumerBase): # Items push in. inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind, - subscribe=subscribe) + subscribe=subscribe) self.proxies[inq] = proxy self.sockets.append(inq) @@ -353,8 +354,7 @@ class ZmqBaseReactor(ConsumerBase): raise RPCException("Bad output socktype") # Items push out. - outq = ZmqSocket(out_addr, zmq_type_out, - bind=out_bind) + outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind) self.mapping[inq] = outq self.mapping[outq] = inq @@ -428,7 +428,7 @@ class ZmqProxy(ZmqBaseReactor): if not topic in self.topic_proxy: outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) + sock_type, bind=True) self.topic_proxy[topic] = outq self.sockets.append(outq) LOG.info(_("Created topic proxy: %s"), topic) @@ -486,7 +486,7 @@ class Connection(rpc_common.Connection): topic = topic.split('.', 1)[0] LOG.info(_("Create Consumer for topic (%(topic)s)") % - {'topic': topic}) + {'topic': topic}) # Subscription scenarios if fanout: @@ -502,7 +502,7 @@ class Connection(rpc_common.Connection): (self.conf.rpc_zmq_ipc_dir, topic) LOG.debug(_("Consumer is a zmq.%s"), - ['PULL', 'SUB'][sock_type == zmq.SUB]) + ['PULL', 'SUB'][sock_type == zmq.SUB]) self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py index 4da1dcd..34a9ae4 100644 --- a/openstack/common/rpc/matchmaker.py +++ b/openstack/common/rpc/matchmaker.py @@ -29,8 +29,8 @@ from openstack.common import cfg matchmaker_opts = [ # Matchmaker ring file cfg.StrOpt('matchmaker_ringfile', - default='/etc/nova/matchmaker_ring.json', - help='Matchmaker ring file (JSON)'), + default='/etc/nova/matchmaker_ring.json', + help='Matchmaker ring file (JSON)'), ] CONF = cfg.CONF diff --git a/openstack/common/rpc/proxy.py b/openstack/common/rpc/proxy.py index 4f4dff5..112a66f 100644 --- a/openstack/common/rpc/proxy.py +++ b/openstack/common/rpc/proxy.py @@ -127,7 +127,7 @@ class RpcProxy(object): rpc.fanout_cast(context, self.topic, msg) def cast_to_server(self, context, server_params, msg, topic=None, - version=None): + version=None): """rpc.cast_to_server() a remote method. :param context: The request context diff --git a/openstack/common/utils.py b/openstack/common/utils.py index dee09ac..e782e89 100644 --- a/openstack/common/utils.py +++ b/openstack/common/utils.py @@ -118,13 +118,13 @@ def execute(*cmd, **kwargs): LOG.debug(_('Result was %s') % _returncode) if (isinstance(check_exit_code, int) and not isinstance(check_exit_code, bool) and - _returncode != check_exit_code): + _returncode != check_exit_code): (stdout, stderr) = result raise exception.ProcessExecutionError( - exit_code=_returncode, - stdout=stdout, - stderr=stderr, - cmd=' '.join(cmd)) + exit_code=_returncode, + stdout=stdout, + stderr=stderr, + cmd=' '.join(cmd)) return result except exception.ProcessExecutionError: if not attempts: |